use super::*;
/// Maximum number of bytes handed to a single `send_sched_log` call (64 KiB).
///
/// `send_sched_log_file` splits a log into pieces of at most this size, while
/// `send_sched_log_text` forwards at most this many bytes and drops the remainder.
const SCHED_LOG_CHUNK_BYTES: usize = 64 * 1024;
pub(crate) fn dump_sched_output(log_path: &str) {
crate::vmm::guest_comms::send_sched_log(crate::verifier::SCHED_OUTPUT_START.as_bytes());
send_sched_log_file(log_path);
crate::vmm::guest_comms::send_sched_log(crate::verifier::SCHED_OUTPUT_END.as_bytes());
}
pub(crate) fn dump_staged_scheduler_logs() {
let Ok(entries) = fs::read_dir("/tmp") else {
return;
};
let mut paths: Vec<std::path::PathBuf> = entries
.flatten()
.map(|e| e.path())
.filter(|p| {
p.file_name()
.and_then(|n| n.to_str())
.is_some_and(|n| n.starts_with("sched_") && n.ends_with(".log"))
})
.collect();
paths.sort();
for p in paths {
if let Some(s) = p.to_str() {
dump_sched_output(s);
}
}
}
/// Streams the raw bytes of the file at `path` to the host via `send_sched_log`,
/// in pieces of at most [`SCHED_LOG_CHUNK_BYTES`] bytes.
///
/// The file is read as bytes, not as UTF-8 text. The content is only ever
/// forwarded as bytes, and with `read_to_string` a single invalid UTF-8 sequence
/// anywhere in the log would make the read fail and drop the entire log.
///
/// A missing or unreadable file, or an empty file, sends nothing.
fn send_sched_log_file(path: &str) {
    let Ok(bytes) = fs::read(path) else {
        return;
    };
    for chunk in bytes.chunks(SCHED_LOG_CHUNK_BYTES) {
        crate::vmm::guest_comms::send_sched_log(chunk);
    }
}
/// Forwards `s` to the host scheduler log in a single `send_sched_log` call.
///
/// At most [`SCHED_LOG_CHUNK_BYTES`] bytes are sent, and anything beyond that is
/// silently dropped. The cut is made at a byte offset, so it may split a multi-byte
/// UTF-8 character. An empty `s` still produces one (empty) call.
pub(crate) fn send_sched_log_text(s: &str) {
    let limit = s.len().min(SCHED_LOG_CHUNK_BYTES);
    let head = &s.as_bytes()[..limit];
    crate::vmm::guest_comms::send_sched_log(head);
}
/// Starts forwarding the kernel trace pipe to COM1 on a background "trace-pipe" thread.
///
/// Activation:
/// - Only acts when `TRACE_SCHED_EXT_DUMP_ENABLE` exists.
/// - In that case it first writes "1" to that file; a failed write is ignored.
///
/// What the thread does:
/// - Opens `TRACE_PIPE` non-blocking and `COM1` for writing.
/// - Copies every chunk read from the trace pipe to `COM1`.
/// - Feeds every chunk to `scan_dump_markers`.
/// - Exits silently if either file fails to open.
///
/// Returns `(stop, handle)`:
/// - Storing `true` into `stop` ends the thread. The flag is checked before every
///   `poll` (200 ms timeout) and after every successful read.
/// - `handle` is `None` if the thread could not be spawned.
/// - Both are `None` when the enable file does not exist.
pub(crate) fn start_trace_pipe() -> (Option<Arc<AtomicBool>>, Option<std::thread::JoinHandle<()>>) {
if Path::new(TRACE_SCHED_EXT_DUMP_ENABLE).exists() {
let _ = fs::write(TRACE_SCHED_EXT_DUMP_ENABLE, "1");
let stop = Arc::new(AtomicBool::new(false));
let stop_clone = stop.clone();
let handle = std::thread::Builder::new()
.name("trace-pipe".into())
.spawn(move || {
use std::os::unix::fs::OpenOptionsExt;
// Non-blocking so the drain loop below ends with WouldBlock instead of hanging.
let Ok(mut trace) = fs::OpenOptions::new()
.read(true)
.custom_flags(libc::O_NONBLOCK)
.open(TRACE_PIPE)
else {
return;
};
let Ok(mut com1) = fs::OpenOptions::new().write(true).open(COM1) else {
return;
};
let mut buf = [0u8; 4096];
// Carry-over bytes that let scan_dump_markers match markers split across reads.
let mut scan_tail: Vec<u8> = Vec::new();
loop {
if stop_clone.load(Ordering::Acquire) {
break;
}
let mut pollfds = [PollFd::new(trace.as_fd(), PollFlags::POLLIN)];
// 200 ms timeout bounds how long a stop request can go unnoticed while idle.
match poll(&mut pollfds, PollTimeout::from(200u16)) {
Ok(0) => continue,
Ok(_) => {}
Err(nix::errno::Errno::EINTR) => continue,
Err(_) => break,
}
// Error/invalid fd ends the thread; hangup without readable data ends it;
// any other event without POLLIN just goes back to poll.
if let Some(revents) = pollfds[0].revents() {
if revents.intersects(PollFlags::POLLERR | PollFlags::POLLNVAL) {
break;
}
if !revents.contains(PollFlags::POLLIN) {
if revents.contains(PollFlags::POLLHUP) {
break;
}
continue;
}
}
// Drain everything currently readable; WouldBlock or 0 returns to poll.
loop {
match trace.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
// COM1 write failures are deliberately ignored (best-effort mirror).
let _ = com1.write_all(&buf[..n]);
scan_dump_markers(&buf[..n], &mut scan_tail);
if stop_clone.load(Ordering::Acquire) {
break;
}
}
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
// NOTE(review): other read errors only leave the drain loop; the outer loop
// keeps polling rather than exiting the thread.
Err(_) => break,
}
}
}
})
.ok();
(Some(stop), handle)
} else {
(None, None)
}
}
/// Number of trailing bytes `scan_dump_markers` keeps between chunks, so a marker
/// that straddles two trace-pipe reads is still matched.
///
/// This must be at least one less than the longest marker searched for, which is
/// `SCX_EV_SUB_BYPASS_DISPATCH` (26 bytes).
const SCAN_TAIL_KEEP: usize = 32;
/// Latch that `scan_dump_markers` sets once `sched_ext_dump:` appears in the
/// trace-pipe output.
static SCX_DUMP_STARTED_LATCH: OnceLock<Arc<Latch>> = OnceLock::new();
/// Latch that `scan_dump_markers` sets once a dump-terminating marker
/// (`SCX_EV_SUB_BYPASS_DISPATCH` or `~~~~ TRUNCATED ~~~~`) appears.
static SCX_DUMP_COMPLETE_LATCH: OnceLock<Arc<Latch>> = OnceLock::new();

/// Returns a shared handle to the "sched_ext dump started" latch.
///
/// The latch is created on first use; every call returns the same latch.
pub(crate) fn scx_dump_started_latch() -> Arc<Latch> {
    let latch: &Arc<Latch> = SCX_DUMP_STARTED_LATCH.get_or_init(|| Arc::new(Latch::new()));
    Arc::clone(latch)
}

/// Returns a shared handle to the "sched_ext dump complete" latch.
///
/// The latch is created on first use; every call returns the same latch.
pub(crate) fn scx_dump_complete_latch() -> Arc<Latch> {
    let latch: &Arc<Latch> = SCX_DUMP_COMPLETE_LATCH.get_or_init(|| Arc::new(Latch::new()));
    Arc::clone(latch)
}
/// Returns `true` if `needle` occurs as a contiguous subslice of `haystack`.
///
/// An empty `needle` matches any haystack, mirroring `str::contains("")`. The
/// explicit check is required: `slice::windows(0)` panics, so an empty needle would
/// otherwise crash the caller.
fn slice_find(haystack: &[u8], needle: &[u8]) -> bool {
    if needle.is_empty() {
        return true;
    }
    needle.len() <= haystack.len() && haystack.windows(needle.len()).any(|w| w == needle)
}
/// Feeds one chunk of trace-pipe output to the sched_ext dump marker detector.
///
/// `tail` is caller-owned scratch state that must be passed back on every call.
/// Each new chunk is appended to it and searched. Afterwards the buffer is trimmed
/// to its last [`SCAN_TAIL_KEEP`] bytes, so a marker split across two chunks is
/// still found.
///
/// Markers:
/// - `sched_ext_dump:` sets the "dump started" latch.
/// - `SCX_EV_SUB_BYPASS_DISPATCH` or `~~~~ TRUNCATED ~~~~` sets the "dump complete"
///   latch and clears `tail`.
///
/// Once the "dump complete" latch is set, calls return immediately without touching
/// `tail`.
pub(crate) fn scan_dump_markers(chunk: &[u8], tail: &mut Vec<u8>) {
    const DUMP_START_MARKER: &[u8] = b"sched_ext_dump:";
    const DUMP_END_MARKERS: [&[u8]; 2] = [b"SCX_EV_SUB_BYPASS_DISPATCH", b"~~~~ TRUNCATED ~~~~"];

    let complete = scx_dump_complete_latch();
    if complete.is_set() {
        return;
    }
    tail.extend_from_slice(chunk);

    let started = scx_dump_started_latch();
    if !started.is_set() && slice_find(tail.as_slice(), DUMP_START_MARKER) {
        started.set();
    }

    let window = tail.as_slice();
    if DUMP_END_MARKERS.iter().any(|marker| slice_find(window, marker)) {
        complete.set();
        tail.clear();
    } else if tail.len() > SCAN_TAIL_KEEP {
        let excess = tail.len() - SCAN_TAIL_KEEP;
        tail.drain(..excess);
    }
}
/// Latch set by `hvc0_poll_loop` when the host sends `SIGNAL_BPF_WRITE_DONE`.
static BPF_MAP_WRITE_DONE_LATCH: OnceLock<Arc<Latch>> = OnceLock::new();
/// Latch set by `hvc0_poll_loop` when the host sends `SIGNAL_ACCESSOR_READY`.
static ACCESSOR_READY_LATCH: OnceLock<Arc<Latch>> = OnceLock::new();

/// Returns a shared handle to the "BPF map write done" latch.
///
/// The latch is created on first use; every call returns the same latch.
pub(crate) fn bpf_map_write_done_latch() -> Arc<Latch> {
    Arc::clone(BPF_MAP_WRITE_DONE_LATCH.get_or_init(|| Arc::new(Latch::new())))
}

/// Returns a shared handle to the "accessor ready" latch.
///
/// The latch is created on first use; every call returns the same latch.
pub(crate) fn accessor_ready_latch() -> Arc<Latch> {
    Arc::clone(ACCESSOR_READY_LATCH.get_or_init(|| Arc::new(Latch::new())))
}
/// Spawns the detached "hvc0-poll" thread that runs `hvc0_poll_loop`.
///
/// If `trace_stop` is given, it is handed to the loop, which sets it when the host
/// requests shutdown.
///
/// Returns the loop's stop flag. The flag is returned even when the thread fails to
/// spawn (the spawn error is discarded), so the result is always `Some`.
pub(crate) fn start_hvc0_poll(trace_stop: Option<Arc<AtomicBool>>) -> Option<Arc<AtomicBool>> {
    let stop = Arc::new(AtomicBool::new(false));
    let worker_stop = Arc::clone(&stop);
    let worker = move || hvc0_poll_loop(&worker_stop, trace_stop.as_deref());
    // Dropping the JoinHandle detaches the thread; nothing ever joins it.
    let _ = std::thread::Builder::new()
        .name("hvc0-poll".into())
        .spawn(worker);
    Some(stop)
}
fn hvc0_poll_loop(stop: &AtomicBool, trace_stop: Option<&AtomicBool>) {
use std::os::unix::io::AsRawFd;
let hvc0 = match fs::OpenOptions::new()
.read(true)
.custom_flags(libc::O_NONBLOCK)
.open(HVC0)
{
Ok(f) => f,
Err(e) => {
write_com2(&format!(
"ktstr-init: hvc0 poll loop disabled — open {HVC0}: {e}"
));
return;
}
};
let poll_timeout_ms: PollTimeout = 1000u16.into();
while !stop.load(Ordering::Acquire) {
let borrowed = unsafe { BorrowedFd::borrow_raw(hvc0.as_raw_fd()) };
let mut fds = [PollFd::new(borrowed, PollFlags::POLLIN)];
match poll(&mut fds, poll_timeout_ms) {
Ok(0) => continue,
Ok(_) => {}
Err(nix::errno::Errno::EINTR) => continue,
Err(_) => break,
}
if let Some(revents) = fds[0].revents() {
if revents.intersects(PollFlags::POLLERR | PollFlags::POLLNVAL) {
break;
}
if !revents.contains(PollFlags::POLLIN) {
if revents.contains(PollFlags::POLLHUP) {
break;
}
continue;
}
}
let mut buf = [0u8; 16];
let mut hvc_ref: &fs::File = &hvc0;
let n = 'read_retry: loop {
match hvc_ref.read(&mut buf) {
Ok(n) => break 'read_retry Some(n),
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => {
tracing::warn!(
err = %e,
"ktstr-init: hvc0 read failed; aborting poll loop"
);
break 'read_retry None;
}
}
};
let Some(n) = n else { break };
if buf[..n].contains(&crate::vmm::virtio_console::SIGNAL_VC_DUMP) {
let _ = fs::write("/proc/sysrq-trigger", "D");
}
if buf[..n].contains(&crate::vmm::virtio_console::SIGNAL_BPF_WRITE_DONE) {
bpf_map_write_done_latch().set();
}
if buf[..n].contains(&crate::vmm::virtio_console::SIGNAL_ACCESSOR_READY) {
accessor_ready_latch().set();
}
if buf[..n].contains(&crate::vmm::virtio_console::SIGNAL_VC_SHUTDOWN) {
tracing::info!("ktstr-init: shutdown request received, draining");
if let Some(ts) = trace_stop {
ts.store(true, Ordering::Release);
}
let _ = fs::write(TRACE_TRACING_ON, "0");
let _ = std::io::stdout().flush();
let _ = std::io::stderr().flush();
if let Ok(f) = fs::OpenOptions::new().write(true).open(COM1) {
unsafe {
libc::tcdrain(std::os::unix::io::AsRawFd::as_raw_fd(&f));
}
}
if let Ok(f) = fs::OpenOptions::new().write(true).open(COM2) {
unsafe {
libc::tcdrain(std::os::unix::io::AsRawFd::as_raw_fd(&f));
}
}
break;
}
}
}
/// Handle returned by `start_sched_exit_monitor` for shutting down the monitor thread.
pub(crate) struct SchedExitStop {
    /// Stop flag shared with the monitor thread, which checks it on every loop iteration.
    pub(crate) stop: Arc<AtomicBool>,
    /// Duplicate of the monitor's eventfd. Writing to it wakes the monitor's `poll`.
    /// `None` when no usable eventfd pair could be created.
    wake_fd: Option<OwnedFd>,
    /// Monitor thread handle; `None` when the thread failed to spawn.
    join_handle: Option<std::thread::JoinHandle<()>>,
}

impl SchedExitStop {
    /// Wakes the monitor thread out of `poll` by adding 1 to the eventfd counter.
    ///
    /// Best-effort: does nothing without a wake fd, and a failed `write` is ignored.
    pub(crate) fn wake(&self) {
        let Some(fd) = self.wake_fd.as_ref() else {
            return;
        };
        let increment = 1u64.to_ne_bytes();
        // SAFETY: `fd` is a live descriptor owned by `self`, and the pointer/length pair
        // describes the 8-byte `increment` buffer, which outlives the call.
        let _ = unsafe {
            libc::write(
                fd.as_raw_fd(),
                increment.as_ptr().cast::<libc::c_void>(),
                increment.len(),
            )
        };
    }

    /// Raises the stop flag, wakes the monitor, and waits for its thread to finish.
    ///
    /// A panic in the monitor thread is swallowed.
    pub(crate) fn stop_and_join(self) {
        self.stop.store(true, Ordering::Release);
        self.wake();
        let Self { join_handle, .. } = self;
        if let Some(handle) = join_handle {
            let _ = handle.join();
        }
    }
}
/// Allocates the eventfd pair used to wake the sched-exit monitor out of `poll`.
///
/// Returns `(monitor_fd, writer_fd)`, and the two are always both `Some` or both
/// `None`:
/// - Both `Some`: the monitor polls `monitor_fd` with no timeout, and
///   `SchedExitStop::wake` writes to `writer_fd`.
/// - Both `None`: the monitor falls back to a 250 ms poll timeout.
///
/// A monitor fd without a writer would leave the monitor blocked forever on an
/// eventfd nobody can signal. So if the dup fails, the monitor fd is closed as well,
/// which makes the logged 250 ms fallback actually take effect.
fn sched_exit_wake_eventfds() -> (Option<OwnedFd>, Option<OwnedFd>) {
    // SAFETY: eventfd takes no pointers; the return value is checked below.
    let raw = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) };
    if raw < 0 {
        let err = std::io::Error::last_os_error();
        tracing::warn!(
            err = %err,
            "ktstr-init: sched-exit-mon eventfd allocation failed; \
             falling back to 250 ms stop poll cadence"
        );
        return (None, None);
    }
    // SAFETY: `raw` is a freshly created, valid descriptor owned by nothing else.
    let monitor_fd = unsafe { OwnedFd::from_raw_fd(raw) };
    match monitor_fd.try_clone() {
        Ok(writer_fd) => (Some(monitor_fd), Some(writer_fd)),
        Err(e) => {
            tracing::warn!(
                err = %e,
                "ktstr-init: sched-exit-mon eventfd dup failed; \
                 falling back to 250 ms stop poll cadence"
            );
            // `monitor_fd` is dropped (closed) here so the monitor uses the timed fallback.
            (None, None)
        }
    }
}

/// Spawns the "sched-exit-mon" thread, which watches the scheduler process
/// `sched_pid` and reports its exit to the host.
///
/// Returns `None` only when `sched_pid` is `None`; otherwise returns a
/// `SchedExitStop` for stopping the monitor. If `pidfd_open` fails, the thread logs
/// an error and exits without monitoring.
///
/// The process counts as exited once `/proc/{pid}` no longer exists, checked each
/// time `poll` returns. On exit:
/// - If `suppress_com2` is set, the thread waits on `probe_output_done` (if given).
/// - Otherwise it dumps the log at `log_path` (if given) via `dump_sched_output`.
/// - Then, unless a stop was requested in the meantime, it sends `send_sched_exit(1)`.
pub(crate) fn start_sched_exit_monitor(
    sched_pid: Option<u32>,
    log_path: Option<&str>,
    suppress_com2: Arc<AtomicBool>,
    probe_output_done: Option<Arc<crate::sync::Latch>>,
) -> Option<SchedExitStop> {
    let pid = sched_pid?;
    let proc_path = format!("/proc/{pid}");
    let log_path = log_path.map(|s| s.to_string());
    let stop = Arc::new(AtomicBool::new(false));
    let stop_clone = stop.clone();
    let (monitor_fd, writer_fd) = sched_exit_wake_eventfds();
    let join_handle = std::thread::Builder::new()
        .name("sched-exit-mon".into())
        .spawn(move || {
            // SAFETY: pidfd_open takes a pid and flags (no pointers); result checked below.
            let raw_pidfd = unsafe {
                libc::syscall(libc::SYS_pidfd_open, pid as libc::c_int, 0u32) as libc::c_int
            };
            if raw_pidfd < 0 {
                tracing::error!(
                    pid,
                    err = %std::io::Error::last_os_error(),
                    "ktstr-init: pidfd_open failed for sched — sched exit monitor disabled",
                );
                return;
            }
            // SAFETY: `raw_pidfd` was just returned by pidfd_open and is owned by nothing
            // else. Wrapping it closes it on every return path.
            let pidfd = unsafe { OwnedFd::from_raw_fd(raw_pidfd) };
            // poll() ignores entries whose fd is negative, so -1 disables the stop slot.
            let stop_fd = monitor_fd.as_ref().map_or(-1, |f| f.as_raw_fd());
            // Block indefinitely when the eventfd can wake us; otherwise re-check every 250 ms.
            let poll_timeout: i32 = if stop_fd >= 0 { -1 } else { 250 };
            while !stop_clone.load(Ordering::Acquire) {
                let mut pfds = [
                    libc::pollfd {
                        fd: pidfd.as_raw_fd(),
                        events: libc::POLLIN,
                        revents: 0,
                    },
                    libc::pollfd {
                        fd: stop_fd,
                        events: libc::POLLIN,
                        revents: 0,
                    },
                ];
                // SAFETY: `pfds` is a valid, writable array whose length is passed alongside.
                let _ = unsafe {
                    libc::poll(pfds.as_mut_ptr(), pfds.len() as libc::nfds_t, poll_timeout)
                };
                // NOTE(review): a pidfd becomes readable once the process exits, but
                // `/proc/{pid}` remains until the process is reaped. Until then this loop
                // re-polls without blocking — confirm the parent reaps the scheduler promptly.
                let exited = !Path::new(&proc_path).exists();
                if exited {
                    if suppress_com2.load(Ordering::Acquire) {
                        if let Some(done) = &probe_output_done {
                            done.wait();
                        }
                    } else if let Some(path) = &log_path {
                        dump_sched_output(path);
                    }
                    // A stop requested while the exit was being handled means the shutdown is
                    // deliberate, so the exit is not reported.
                    if !stop_clone.load(Ordering::Acquire) {
                        let exit_code: i32 = 1;
                        crate::vmm::guest_comms::send_sched_exit(exit_code);
                    }
                    return;
                }
                if stop_fd >= 0 {
                    // Reset the eventfd counter so the next poll blocks again.
                    let mut buf = [0u8; 8];
                    // SAFETY: `buf` is a valid writable 8-byte buffer and `stop_fd` is owned by
                    // `monitor_fd`, which lives for the whole closure.
                    let _ = unsafe {
                        libc::read(stop_fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len())
                    };
                }
            }
        })
        .ok();
    Some(SchedExitStop {
        stop,
        wake_fd: writer_fd,
        join_handle,
    })
}
#[tracing::instrument]
pub(crate) fn exec_shell_script(path: &str) {
let content = match fs::read_to_string(path) {
Ok(c) => c,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
tracing::debug!(path, "ktstr-init: exec_shell_script: no script (skipping)");
return;
}
Err(e) => {
tracing::error!(path, err = %e, "ktstr-init: exec_shell_script: read failed");
return;
}
};
let mut ok_count = 0u32;
let mut fail_count = 0u32;
for line in content.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if exec_shell_line(line).is_ok() {
ok_count += 1;
} else {
fail_count += 1;
}
}
if fail_count > 0 {
tracing::error!(
path,
ok_count,
fail_count,
"ktstr-init: exec_shell_script partial-apply: {fail_count} line(s) failed, {ok_count} line(s) ok"
);
}
}
pub(crate) fn exec_shell_line(line: &str) -> Result<(), ()> {
if let Some(rest) = line.strip_prefix("echo ")
&& let Some((value, path)) = rest.split_once(" > ")
{
let value = value.trim();
let path = path.trim();
if let Err(e) = fs::write(path, format!("{value}\n")) {
tracing::error!(value, path, err = %e, "ktstr-init: echo redirect failed");
return Err(());
}
return Ok(());
}
tracing::error!(line, "ktstr-init: unsupported command");
Err(())
}