use super::*;
/// Directory watched (via inotify) for the scheduler's stats socket to appear.
const SCHED_STATS_SOCKET_DIR: &str = "/var/run/scx/root";
/// File name of the stats socket inside [`SCHED_STATS_SOCKET_DIR`].
const SCHED_STATS_SOCKET_NAME: &str = "stats";
/// Full path the relay connects to; kept equal to
/// `SCHED_STATS_SOCKET_DIR` + `/` + `SCHED_STATS_SOCKET_NAME`.
const SCHED_STATS_SOCKET: &str = "/var/run/scx/root/stats";
/// Guest character device through which the host pushes stats requests
/// and receives replies.
const SCHED_STATS_PORT_DEV: &str = "/dev/vport0p2";
/// Size (256 KiB) of each scratch buffer used for a single read in
/// either relay direction.
const RELAY_BUFFER_BYTES: usize = 256 * 1024;
/// Newline-terminated JSON envelope written to the port in place of a
/// scheduler reply whenever no scheduler connection can serve a request.
const SCHED_STATS_RELAY_NO_SCHEDULER_REPLY: &[u8] =
    b"{\"ktstr_relay_error\":\"no scheduler available\"}\n";
/// Owner-side handle for asking the stats-relay thread to exit.
///
/// Stopping has two parts: `flag` is the condition the relay loops test,
/// and `evt` is an eventfd the relay polls so that a write wakes a
/// thread blocked in `poll`.
pub(crate) struct RelayStopSignal {
    /// Becomes `true` once a stop has been requested.
    flag: Arc<AtomicBool>,
    /// Wake-up eventfd; writing to it makes it readable.
    evt: Arc<vmm_sys_util::eventfd::EventFd>,
}

impl RelayStopSignal {
    /// Requests that the relay thread stop.
    ///
    /// The flag is published with `Release` ordering before the eventfd is
    /// bumped. A failed eventfd write is ignored (best effort); the flag
    /// stays set for the next time a relay loop checks it.
    pub(crate) fn signal_stop(&self) {
        let Self { flag, evt } = self;
        flag.store(true, Ordering::Release);
        let _ = evt.write(1);
    }
}
pub(crate) fn start_sched_stats_relay() -> RelayStopSignal {
use vmm_sys_util::eventfd::{EFD_NONBLOCK, EventFd};
let flag = Arc::new(AtomicBool::new(false));
let evt = match EventFd::new(EFD_NONBLOCK) {
Ok(e) => Arc::new(e),
Err(err) => {
tracing::error!(
error = %err,
"stats relay: eventfd create failed; relay disabled \
(host SchedStatsClient calls will hang on shutdown)"
);
return RelayStopSignal {
flag,
evt: Arc::new(EventFd::new(0).unwrap_or_else(|_| {
panic!("stats relay: cannot create any eventfd")
})),
};
}
};
let flag_for_thread = flag.clone();
let evt_for_thread = evt.clone();
let _ = std::thread::Builder::new()
.name("ktstr-sched-stats-relay".into())
.spawn(move || {
sched_stats_relay_loop(flag_for_thread, evt_for_thread);
});
RelayStopSignal { flag, evt }
}
/// How many `Ok(0)` reads from the host port in a row (across wait phases
/// and relay sessions) the relay tolerates before its thread gives up.
const SCHED_STATS_RELAY_MAX_CONSECUTIVE_PORT_EOF: u32 = 3;

/// Why a relay session over a connected scheduler socket ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RelaySessionExit {
    /// A read from the host-facing port returned `Ok(0)`.
    PortEof,
    /// Any other ending: stop requested, an I/O or poll error, or the
    /// scheduler side going away.
    Other,
}
/// Relay thread body.
///
/// Opens the host-facing port, then alternates between waiting for the
/// scheduler's stats socket and relaying a session over it, until a stop
/// is requested.
///
/// A port that keeps reading as EOF (`Ok(0)`) would otherwise make this
/// loop spin. Consecutive EOFs are therefore counted across both the wait
/// phase and relay sessions. After
/// [`SCHED_STATS_RELAY_MAX_CONSECUTIVE_PORT_EOF`] of them in a row the
/// thread logs and returns.
///
/// Only a session that ends for a non-EOF reason resets the count.
/// Successfully connecting to the scheduler says nothing about the port's
/// health. Resetting on connect made the session guard unreachable and
/// let a dead port busy-loop through connect/EOF/reconnect forever.
///
/// Returns early (relay disabled) if the port cannot be opened, or when
/// the wait phase reports `Stopped`.
fn sched_stats_relay_loop(stop: Arc<AtomicBool>, stop_evt: Arc<vmm_sys_util::eventfd::EventFd>) {
    let mut port = match fs::OpenOptions::new()
        .read(true)
        .write(true)
        .open(SCHED_STATS_PORT_DEV)
    {
        Ok(f) => f,
        Err(e) => {
            tracing::warn!(
                error = %e,
                path = SCHED_STATS_PORT_DEV,
                "stats relay: open vport0p2 failed; relay disabled"
            );
            return;
        }
    };
    let mut consecutive_port_eof: u32 = 0;
    while !stop.load(Ordering::Acquire) {
        match wait_for_stats_socket(&mut port, &stop, &stop_evt) {
            WaitSocketResult::Connected(socket) => {
                // No reset here: only the session outcome decides.
                match run_relay_session(&mut port, socket, &stop, &stop_evt) {
                    RelaySessionExit::PortEof => consecutive_port_eof += 1,
                    RelaySessionExit::Other => consecutive_port_eof = 0,
                }
                if consecutive_port_eof >= SCHED_STATS_RELAY_MAX_CONSECUTIVE_PORT_EOF {
                    tracing::warn!(
                        consecutive_port_eof,
                        "stats relay: vport0p2 returned Ok(0) on \
                         {SCHED_STATS_RELAY_MAX_CONSECUTIVE_PORT_EOF} consecutive \
                         relay sessions — assuming the port is permanently dead and \
                         exiting the relay thread to avoid a busy-loop"
                    );
                    return;
                }
            }
            WaitSocketResult::PortEof => {
                consecutive_port_eof += 1;
                if consecutive_port_eof >= SCHED_STATS_RELAY_MAX_CONSECUTIVE_PORT_EOF {
                    tracing::warn!(
                        consecutive_port_eof,
                        "stats relay: vport0p2 returned Ok(0) on \
                         {SCHED_STATS_RELAY_MAX_CONSECUTIVE_PORT_EOF} consecutive \
                         inotify-wait drains — assuming the port is permanently \
                         dead and exiting the relay thread to avoid a busy-loop"
                    );
                    return;
                }
            }
            WaitSocketResult::Stopped => return,
        }
    }
}
/// Outcome of waiting for the scheduler's stats socket.
enum WaitSocketResult {
    /// A connection to the stats socket was established.
    Connected(::std::os::unix::net::UnixStream),
    /// A read from the host-facing port returned `Ok(0)` while waiting.
    PortEof,
    /// A stop was requested, or waiting hit an unrecoverable error.
    Stopped,
}
/// Blocks until the scheduler's stats socket accepts a connection, a stop
/// is requested, or `port` reports EOF.
///
/// An inotify watch on [`SCHED_STATS_SOCKET_DIR`] is armed *before* the
/// first connect probe, so a socket created after the probe still
/// produces an event. While waiting, each read of host data from `port`
/// is answered with [`SCHED_STATS_RELAY_NO_SCHEDULER_REPLY`].
///
/// If a directory event arrives but `connect` fails (typically the
/// scheduler has bound but not yet listened), the connect is retried on a
/// short timer for a bounded number of attempts. `listen()` produces no
/// filesystem event, so waiting only for the next inotify edge could block
/// forever with the scheduler up.
///
/// Returns:
/// - `Connected(stream)` once a connect succeeds;
/// - `PortEof` when a read on `port` returns `Ok(0)`;
/// - `Stopped` on a stop request, or on any inotify/poll/port I/O error
///   (logged at warn).
fn wait_for_stats_socket(
    port: &mut std::fs::File,
    stop: &Arc<AtomicBool>,
    stop_evt: &Arc<vmm_sys_util::eventfd::EventFd>,
) -> WaitSocketResult {
    use nix::poll::{PollFd, PollFlags, PollTimeout, poll};
    use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify};
    use std::ffi::OsStr;
    use std::os::unix::io::AsFd;

    /// Delay between timed connect retries, in milliseconds.
    const CONNECT_RETRY_INTERVAL_MS: u16 = 20;
    /// Timed retries armed by each failed edge-triggered connect
    /// (about one second in total at `CONNECT_RETRY_INTERVAL_MS`).
    const CONNECT_RETRY_ATTEMPTS: u32 = 50;

    // Best effort: the directory must exist for add_watch below; if this
    // fails, add_watch reports the error.
    let _ = fs::create_dir_all(SCHED_STATS_SOCKET_DIR);
    let inotify = match Inotify::init(InitFlags::IN_CLOEXEC | InitFlags::IN_NONBLOCK) {
        Ok(i) => i,
        Err(e) => {
            tracing::warn!(error = %e, "stats relay: inotify_init failed");
            return WaitSocketResult::Stopped;
        }
    };
    if let Err(e) = inotify.add_watch(
        SCHED_STATS_SOCKET_DIR,
        AddWatchFlags::IN_CREATE
            | AddWatchFlags::IN_MOVED_TO
            | AddWatchFlags::IN_ATTRIB
            | AddWatchFlags::IN_OPEN,
    ) {
        tracing::warn!(
            error = %e,
            dir = SCHED_STATS_SOCKET_DIR,
            "stats relay: inotify add_watch failed"
        );
        return WaitSocketResult::Stopped;
    }
    if stop.load(Ordering::Acquire) {
        return WaitSocketResult::Stopped;
    }
    // Probe only after the watch is armed so no creation can slip between.
    if let Ok(s) = std::os::unix::net::UnixStream::connect(SCHED_STATS_SOCKET) {
        tracing::debug!("stats relay: connected to scheduler socket (race-free initial probe)");
        return WaitSocketResult::Connected(s);
    }
    let target = OsStr::new(SCHED_STATS_SOCKET_NAME);
    let mut buf = vec![0u8; RELAY_BUFFER_BYTES];
    // Pending timed connect retries; while non-zero, poll uses a finite timeout.
    let mut connect_retries_left: u32 = 0;
    loop {
        if stop.load(Ordering::Acquire) {
            return WaitSocketResult::Stopped;
        }
        let timeout = if connect_retries_left > 0 {
            PollTimeout::from(CONNECT_RETRY_INTERVAL_MS)
        } else {
            PollTimeout::NONE
        };
        let inotify_fd = inotify.as_fd();
        let port_fd = port.as_fd();
        // SAFETY: `stop_evt` is borrowed for this whole call, so its fd
        // stays open while this BorrowedFd (used only by the poll below)
        // is alive.
        let stop_evt_fd =
            unsafe { std::os::unix::io::BorrowedFd::borrow_raw(stop_evt.as_raw_fd()) };
        let mut fds = [
            PollFd::new(inotify_fd, PollFlags::POLLIN),
            PollFd::new(port_fd, PollFlags::POLLIN),
            PollFd::new(stop_evt_fd, PollFlags::POLLIN),
        ];
        let ready = match poll(&mut fds, timeout) {
            Ok(n) => n,
            Err(nix::errno::Errno::EINTR) => continue,
            Err(e) => {
                tracing::warn!(error = %e, "stats relay: poll on inotify failed");
                return WaitSocketResult::Stopped;
            }
        };
        let inotify_ready = fds[0]
            .revents()
            .is_some_and(|r| r.contains(PollFlags::POLLIN));
        let port_ready = fds[1]
            .revents()
            .is_some_and(|r| r.contains(PollFlags::POLLIN));
        let stop_ready = fds[2]
            .revents()
            .is_some_and(|r| r.contains(PollFlags::POLLIN));
        if stop_ready {
            let _ = stop_evt.read();
            return WaitSocketResult::Stopped;
        }
        if port_ready {
            match port.read(&mut buf) {
                Ok(0) => {
                    tracing::debug!(
                        "stats relay: port read EOF in inotify wait; \
                         returning to outer loop for EOF accounting"
                    );
                    return WaitSocketResult::PortEof;
                }
                Ok(n) => {
                    tracing::debug!(
                        bytes = n,
                        "stats relay: host pushed request while waiting for scheduler; \
                         emitting no-scheduler error envelope"
                    );
                    if let Err(e) = port.write_all(SCHED_STATS_RELAY_NO_SCHEDULER_REPLY) {
                        tracing::warn!(
                            error = %e,
                            "stats relay: port write failed in inotify wait; exiting"
                        );
                        return WaitSocketResult::Stopped;
                    }
                }
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "stats relay: port read error in inotify wait; exiting"
                    );
                    return WaitSocketResult::Stopped;
                }
            }
        }
        // Any named directory event is worth a connect attempt (the exact
        // socket name is simply the common case).
        let edge_probe = if inotify_ready {
            let events = match inotify.read_events() {
                Ok(e) => e,
                Err(nix::errno::Errno::EINTR) => continue,
                Err(nix::errno::Errno::EAGAIN) => continue,
                Err(e) => {
                    tracing::warn!(error = %e, "stats relay: inotify read_events failed");
                    return WaitSocketResult::Stopped;
                }
            };
            events
                .iter()
                .any(|ev| ev.name.as_deref() == Some(target) || ev.name.is_some())
        } else {
            false
        };
        // A timed retry fires only when the poll timed out with a retry pending.
        let timer_probe = ready == 0 && connect_retries_left > 0;
        if timer_probe {
            connect_retries_left -= 1;
        }
        if !edge_probe && !timer_probe {
            continue;
        }
        match std::os::unix::net::UnixStream::connect(SCHED_STATS_SOCKET) {
            Ok(s) => {
                if edge_probe {
                    tracing::debug!("stats relay: connected to scheduler socket via inotify edge");
                } else {
                    tracing::debug!("stats relay: connected to scheduler socket on timed retry");
                }
                return WaitSocketResult::Connected(s);
            }
            Err(e) => {
                if edge_probe {
                    // A fresh edge re-arms the full retry budget.
                    connect_retries_left = CONNECT_RETRY_ATTEMPTS;
                }
                tracing::debug!(
                    error = %e,
                    retries_left = connect_retries_left,
                    "stats relay: socket appeared but connect failed (likely \
                     bind-without-listen race); retrying on a short timer, then \
                     on the next inotify edge"
                );
            }
        }
    }
}
/// Answers, without blocking, every request already readable on `port`
/// with [`SCHED_STATS_RELAY_NO_SCHEDULER_REPLY`].
///
/// One envelope is written per successful read, and each read takes up to
/// [`RELAY_BUFFER_BYTES`]. Returns as soon as any of the following happens:
/// - a zero-timeout poll shows nothing readable, or the poll itself fails;
/// - the port reads as EOF;
/// - a read fails with anything other than `Interrupted`;
/// - a reply cannot be written.
///
/// All errors are swallowed: this is best-effort cleanup.
fn drain_port_emit_errors(port: &mut std::fs::File) {
    use nix::poll::{PollFd, PollFlags, PollTimeout, poll};
    use std::io::ErrorKind;
    use std::os::unix::io::AsFd;
    let mut scratch = vec![0u8; RELAY_BUFFER_BYTES];
    loop {
        // Zero timeout: continue only if a read would not block right now.
        let readable = {
            let mut fds = [PollFd::new(port.as_fd(), PollFlags::POLLIN)];
            poll(&mut fds, PollTimeout::ZERO).is_ok()
                && fds[0]
                    .revents()
                    .is_some_and(|r| r.contains(PollFlags::POLLIN))
        };
        if !readable {
            return;
        }
        match port.read(&mut scratch) {
            // Interrupted: simply re-poll on the next iteration.
            Err(e) if e.kind() == ErrorKind::Interrupted => {}
            Ok(0) | Err(_) => return,
            Ok(_) => {
                if port.write_all(SCHED_STATS_RELAY_NO_SCHEDULER_REPLY).is_err() {
                    return;
                }
            }
        }
    }
}
fn run_relay_session(
port: &mut std::fs::File,
mut socket: std::os::unix::net::UnixStream,
stop: &Arc<AtomicBool>,
stop_evt: &Arc<vmm_sys_util::eventfd::EventFd>,
) -> RelaySessionExit {
use nix::poll::{PollFd, PollFlags, PollTimeout, poll};
use std::io::ErrorKind;
use std::os::unix::io::AsFd;
let mut buf = vec![0u8; RELAY_BUFFER_BYTES];
let mut socket_healthy = true;
while !stop.load(Ordering::Acquire) {
let (port_ready, socket_in, socket_hup_seen, stop_ready) = {
let port_fd = port.as_fd();
let socket_fd = socket.as_fd();
let stop_evt_fd =
unsafe { std::os::unix::io::BorrowedFd::borrow_raw(stop_evt.as_raw_fd()) };
let mut fds = [
PollFd::new(port_fd, PollFlags::POLLIN),
PollFd::new(socket_fd, PollFlags::POLLIN),
PollFd::new(stop_evt_fd, PollFlags::POLLIN),
];
match poll(&mut fds, PollTimeout::NONE) {
Ok(_) => {}
Err(nix::errno::Errno::EINTR) => continue,
Err(e) => {
tracing::warn!(error = %e, "stats relay: poll failed; exiting session");
return RelaySessionExit::Other;
}
}
let port_rev = fds[0].revents();
let socket_rev = fds[1].revents();
let stop_rev = fds[2].revents();
let port_ready = port_rev.is_some_and(|r| r.contains(PollFlags::POLLIN));
let socket_in = socket_rev.is_some_and(|r| r.contains(PollFlags::POLLIN));
let socket_hup_seen = socket_rev
.is_some_and(|r| r.contains(PollFlags::POLLHUP) || r.contains(PollFlags::POLLERR));
let stop_ready = stop_rev.is_some_and(|r| r.contains(PollFlags::POLLIN));
(port_ready, socket_in, socket_hup_seen, stop_ready)
};
if socket_hup_seen {
socket_healthy = false;
}
if stop_ready {
let _ = stop_evt.read();
return RelaySessionExit::Other;
}
if port_ready {
let n = match port.read(&mut buf) {
Ok(0) => {
tracing::debug!(
"stats relay: port read EOF; returning to outer loop \
for EOF accounting"
);
return RelaySessionExit::PortEof;
}
Ok(n) => n,
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => {
tracing::warn!(error = %e, "stats relay: port read error; exiting session");
return RelaySessionExit::Other;
}
};
if !socket_healthy {
tracing::debug!(
bytes = n,
"stats relay: port→socket forward skipped (socket already \
unhealthy); emitting error envelopes and reconnecting"
);
let _ = port.write_all(SCHED_STATS_RELAY_NO_SCHEDULER_REPLY);
drain_port_emit_errors(port);
return RelaySessionExit::Other;
}
if let Err(e) = socket.write_all(&buf[..n]) {
tracing::debug!(
error = %e,
"stats relay: socket write failed; emitting error envelopes and reconnecting"
);
let _ = port.write_all(SCHED_STATS_RELAY_NO_SCHEDULER_REPLY);
drain_port_emit_errors(port);
return RelaySessionExit::Other;
}
}
if socket_in {
let m = match socket.read(&mut buf) {
Ok(0) => {
tracing::debug!(
"stats relay: socket EOF; emitting error envelopes and reconnecting"
);
let _ = port.write_all(SCHED_STATS_RELAY_NO_SCHEDULER_REPLY);
drain_port_emit_errors(port);
return RelaySessionExit::Other;
}
Ok(m) => m,
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => {
tracing::debug!(
error = %e,
"stats relay: socket read error; emitting error envelopes and reconnecting"
);
let _ = port.write_all(SCHED_STATS_RELAY_NO_SCHEDULER_REPLY);
drain_port_emit_errors(port);
return RelaySessionExit::Other;
}
};
if let Err(e) = port.write_all(&buf[..m]) {
tracing::warn!(error = %e, "stats relay: port write failed; exiting session");
return RelaySessionExit::Other;
}
}
if !socket_healthy {
tracing::debug!(
drained_in = socket_in,
"stats relay: socket POLLHUP/POLLERR; reconnecting after draining"
);
let _ = port.write_all(SCHED_STATS_RELAY_NO_SCHEDULER_REPLY);
drain_port_emit_errors(port);
return RelaySessionExit::Other;
}
}
RelaySessionExit::Other
}