pub mod blobs;
pub mod cgroup_sandbox;
pub mod console;
pub mod disk_config;
pub mod disk_template;
pub mod host_topology;
pub mod initramfs;
pub(crate) mod kvm_stats;
pub mod topology;
#[cfg(feature = "wprof")]
pub mod wprof;
pub(crate) mod builder;
pub(crate) mod capture_numa;
pub(crate) mod capture_scx;
pub(crate) mod capture_tasks;
pub(crate) mod cast_analysis_load;
pub(crate) mod contention;
pub(crate) mod exit_dispatch;
pub(crate) mod freeze_coord;
pub(crate) mod initramfs_cache;
pub(crate) mod net_config;
pub(crate) mod numa_mem;
pub(crate) mod result;
pub(crate) mod rust_init;
pub(crate) mod sched_stats;
pub(crate) mod setup;
pub(crate) mod vcpu;
pub(crate) mod virtio_blk;
pub(crate) mod virtio_console;
pub(crate) mod virtio_net;
pub(crate) mod bulk;
pub(crate) mod guest_comms;
pub(crate) mod host_comms;
pub mod wire;
mod memory_budget;
mod pi_mutex;
mod terminal;
mod vcpu_panic;
mod vmlinux;
#[allow(unused_imports)]
pub use net_config::NetConfig;
#[allow(unused_imports)]
pub use virtio_blk::VirtioBlkCountersSnapshot;
#[allow(unused_imports)]
pub use virtio_net::VirtioNetCountersSnapshot;
pub use builder::KtstrVmBuilder;
#[allow(unused_imports)]
pub use result::KVM_INTERESTING_STATS;
pub use result::{KvmStatsTotals, VmResult};
#[allow(unused_imports)]
pub use sched_stats::{SchedStatsClient, SchedStatsError, StatsRequest, StatsResponse};
pub(crate) use contention::{
create_vm_with_retry, host_resource_snapshot, map_transient_to_contention,
};
pub(crate) use pi_mutex::PiMutex;
pub(crate) use terminal::TerminalRawGuard;
pub(crate) use vcpu::{
BpfMapWriteParams, ImmediateExitHandle, register_vcpu_signal_handler, set_thread_cpumask,
vcpu_signal,
};
pub(crate) use vmlinux::find_vmlinux;
#[cfg(target_arch = "aarch64")]
pub mod aarch64;
#[cfg(target_arch = "x86_64")]
pub mod x86_64;
/// The 64-bit value with bits 47 through 63 set and all lower bits clear.
// NOTE(review): by its name this is presumably the lowest address of the
// upper ("kernel") half of the canonical virtual address space — confirm
// against the call sites, which are outside this file section.
pub(crate) const KERNEL_HALF_CANONICAL: u64 = 0xFFFF_8000_0000_0000;
#[cfg(target_arch = "x86_64")]
#[allow(unused_imports)]
pub use x86_64::acpi;
#[cfg(target_arch = "x86_64")]
#[allow(unused_imports)]
pub use x86_64::boot;
#[cfg(target_arch = "x86_64")]
pub use x86_64::kvm;
#[cfg(target_arch = "x86_64")]
#[allow(unused_imports)]
pub use x86_64::mptable;
#[cfg(target_arch = "aarch64")]
#[allow(unused_imports)]
pub use aarch64::boot;
#[cfg(target_arch = "aarch64")]
pub use aarch64::kvm;
// On x86_64 the handle type is the real one exported by the arch module.
#[cfg(target_arch = "x86_64")]
pub(crate) use x86_64::kvm::IoapicHandle;
/// Stand-in for `IoapicHandle` on every non-x86_64 target.
///
/// The enum has no variants, so no value of it can be constructed; it exists
/// so that signatures mentioning `Option<Arc<IoapicHandle>>` still name a
/// type on those targets (`run_interactive` always passes `None` there).
#[cfg(not(target_arch = "x86_64"))]
pub(crate) enum IoapicHandle {}
pub use topology::Topology;
use anyhow::{Context, Result};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
// Per-architecture DRAM base: 0 on x86_64, `kvm::DRAM_START` on aarch64.
// NOTE(review): presumably the guest-physical address at which guest RAM
// begins — confirm at the use sites, which are not in this file section.
#[cfg(target_arch = "x86_64")]
const DRAM_BASE: u64 = 0;
#[cfg(target_arch = "aarch64")]
const DRAM_BASE: u64 = kvm::DRAM_START;
/// Configuration for one guest VM, together with the methods that boot it
/// ([`KtstrVm::run`] and [`KtstrVm::run_interactive`]).
///
/// All fields are crate-visible; [`KtstrVm::builder`] hands out the
/// associated `KtstrVmBuilder`.
// NOTE(review): field notes below only describe uses visible in this file;
// fields without a note are consumed by code in other modules.
pub struct KtstrVm {
pub(crate) kernel: PathBuf,
/// Forwarded as the initramfs `payload`; `suffix_params` debug-asserts it
/// is set ("the /init payload").
pub(crate) init_binary: Option<PathBuf>,
pub(crate) scheduler_binary: Option<PathBuf>,
pub(crate) staged_schedulers: Vec<crate::vmm::builder::StagedScheduler>,
/// Forwarded to the initramfs suffix as `staged_sched_args`.
pub(crate) staged_sched_args_packed: Vec<(String, Vec<String>)>,
/// Forwarded to the initramfs suffix as `args`.
pub(crate) run_args: Vec<String>,
/// Forwarded to the initramfs suffix as `sched_args`.
pub(crate) sched_args: Vec<String>,
/// Forwarded to the initramfs suffix under the same name.
pub(crate) workload_root_cgroup: Option<String>,
/// Forwarded to the initramfs suffix under the same name.
pub(crate) scheduler_cgroup_parent: Option<String>,
/// Guest topology; its `llcs` count sizes the LLC slots tried by
/// `acquire_run_locks` in the default mode.
pub(crate) topology: Topology,
pub(crate) memory_mib: Option<u32>,
pub(crate) memory_min_mib: u32,
pub(crate) cmdline_extra: String,
pub(crate) timeout: Duration,
pub(crate) monitor_thresholds: Option<crate::monitor::MonitorThresholds>,
pub(crate) watchdog_timeout: Option<Duration>,
pub(crate) rendezvous_timeout: Option<Duration>,
pub(crate) bpf_map_writes: Vec<BpfMapWriteParams>,
/// When set (and `no_perf_mode` is not), `acquire_run_locks` takes the
/// LLC locks of `pinning_plan` in exclusive mode.
pub(crate) performance_mode: bool,
/// Checked before `performance_mode`; when set, `acquire_run_locks` takes
/// shared locks on the LLCs listed in `no_perf_plan`.
pub(crate) no_perf_mode: bool,
/// Pinning plan used for exclusive locking in performance mode, and the
/// fallback plan handed to `run_vm` when lock acquisition picks none.
pub(crate) pinning_plan: Option<host_topology::PinningPlan>,
pub(crate) mbind_node_map: Vec<Vec<usize>>,
/// Supplies `locked_llcs` for shared locking in no-perf mode and the
/// `cpus` mask applied to threads in `run_interactive`.
#[allow(dead_code)]
pub(crate) no_perf_plan: Option<host_topology::LlcPlan>,
/// Host topology used to compute candidate pinning plans when neither
/// mode flag is set.
#[allow(dead_code)]
pub(crate) host_topo: Option<host_topology::HostTopology>,
/// Forwarded to the initramfs suffix as `sched_enable`.
pub(crate) sched_enable_cmds: Vec<String>,
/// Forwarded to the initramfs suffix as `sched_disable`.
pub(crate) sched_disable_cmds: Vec<String>,
pub(crate) include_files: Vec<(String, PathBuf)>,
pub(crate) disks: Vec<disk_config::DiskConfig>,
pub(crate) network: Option<net_config::NetConfig>,
pub(crate) template_staging_image: Option<PathBuf>,
pub(crate) busybox_bytes: Option<Vec<u8>>,
#[cfg(feature = "wprof")]
pub(crate) wprof: Option<crate::vmm::wprof::WprofConfig>,
/// In `run_interactive`: when true, COM1 output is streamed to stderr by
/// a dedicated thread; when false it is printed once after the VM stops.
pub(crate) dmesg: bool,
/// `Some` selects exec mode in `run_interactive` (no TTY requirement, a
/// timeout watchdog, and an exit code is expected from the guest); also
/// forwarded to the initramfs suffix.
pub(crate) exec_cmd: Option<String>,
/// Deadline enforced by the exec-mode watchdog in `run_interactive`.
pub(crate) exec_timeout: Duration,
pub(crate) jemalloc_probe_binary: Option<PathBuf>,
pub(crate) jemalloc_alloc_worker_binary: Option<PathBuf>,
pub(crate) failure_dump_path: Option<PathBuf>,
pub(crate) dual_snapshot: bool,
#[allow(dead_code)]
pub(crate) workload_duration: Option<Duration>,
pub(crate) num_snapshots: u32,
pub(crate) cast_map: std::sync::Arc<crate::vmm::cast_analysis_load::LazyCastMap>,
}
/// What `acquire_run_locks` hands back: the held lock descriptors plus the
/// placement decisions that go with them.
struct RunLocks {
// Never read; the descriptors are kept only so they stay open until this
// struct is dropped (`OwnedFd` closes its descriptor on drop).
#[allow(dead_code)]
locks: Vec<std::os::fd::OwnedFd>,
// Passed to `run_vm` by `run`; every constructor in this file sets `None`.
default_cpu_mask: Option<Vec<usize>>,
// Set only in the default (neither perf nor no-perf) mode, to the candidate
// plan whose locks were obtained; `run` prefers it over `self.pinning_plan`.
pinning_plan: Option<host_topology::PinningPlan>,
}
/// Builds the end-of-run warning about failed device-IRQ routing updates.
///
/// Returns `None` when `n` is zero, otherwise a single-line warning that
/// embeds the failure count.
#[cfg(target_arch = "x86_64")]
fn routing_failure_summary(n: u64) -> Option<String> {
    if n == 0 {
        return None;
    }
    let warning = format!(
        "WARNING: {n} device-IRQ routing failure(s) during this run \
         (KVM_SET_GSI_ROUTING errored) — affected devices' interrupts \
         were not delivered, so those devices may have hung"
    );
    Some(warning)
}
impl KtstrVm {
/// Returns a `KtstrVmBuilder` in its default state.
pub fn builder() -> KtstrVmBuilder {
KtstrVmBuilder::default()
}
/// Borrows the fields of `self` that make up the initramfs suffix into an
/// `initramfs::SuffixParams`.
///
/// Debug builds assert that `init_binary` is set, since it becomes the
/// `payload` (the guest `/init`).
fn suffix_params(&self) -> initramfs::SuffixParams<'_> {
    // Resolve the optional borrows up front so the struct literal below is a
    // plain field-by-field mapping.
    let payload = self.init_binary.as_deref();
    debug_assert!(
        payload.is_some(),
        "suffix_params: production initramfs path requires init_binary (the /init payload)"
    );
    let exec_cmd = self.exec_cmd.as_deref();
    let workload_root_cgroup = self.workload_root_cgroup.as_deref();
    let scheduler_cgroup_parent = self.scheduler_cgroup_parent.as_deref();

    initramfs::SuffixParams {
        payload,
        args: &self.run_args,
        sched_args: &self.sched_args,
        sched_enable: &self.sched_enable_cmds,
        sched_disable: &self.sched_disable_cmds,
        exec_cmd,
        staged_sched_args: &self.staged_sched_args_packed,
        workload_root_cgroup,
        scheduler_cgroup_parent,
    }
}
/// Boots the VM, runs it to completion and returns the collected result.
///
/// The steps run strictly in this order: start initramfs resolution, create
/// the VM and load the kernel, set up guest memory and vCPUs, open the KVM
/// stats context, acquire host resource locks, run the VM, release the
/// locks, collect results, and finally attach KVM stats when a stats context
/// could be opened. Setup timings are printed to stderr along the way.
///
/// # Errors
///
/// Propagates the first error from VM creation, memory or vCPU setup, lock
/// acquisition, the VM run itself, or result collection.
pub fn run(&self) -> Result<VmResult> {
let start = Instant::now();
// Kicked off first; the handle is consumed by the memory-setup step below
// (whose timing label notes that it joins the initramfs work).
let initramfs_handle = self.spawn_initramfs_resolve();
eprintln!(" initramfs spawn: {:?}", start.elapsed());
let (mut vm, kernel_result) = self.create_vm_and_load_kernel()?;
eprintln!(" kvm+kernel: {:?}", start.elapsed());
// The kernel load result is bound to `_kernel_result` so it stays alive
// until this function returns, even though it is not read again.
#[cfg(target_arch = "x86_64")]
let _kernel_result = {
let kr = self.setup_memory(&mut vm, kernel_result, initramfs_handle)?;
eprintln!(" setup_memory (joins initramfs): {:?}", start.elapsed());
self.setup_vcpus(&vm, kr.entry)?;
eprintln!(" setup_vcpus: {:?}", start.elapsed());
kr
};
#[cfg(target_arch = "aarch64")]
let _kernel_result = {
let kr = self.setup_memory_aarch64(&mut vm, kernel_result, initramfs_handle)?;
self.setup_vcpus_aarch64(&vm, kr.entry)?;
kr
};
// Opened before the vCPUs are handed to `run_vm`; read only after the run.
let stats_ctx = kvm_stats::open_stats_context(&vm.vcpus);
if stats_ctx.is_none() {
tracing::debug!("KVM_GET_STATS_FD not supported, skipping stats collection");
}
eprintln!("VM setup total: {:?}", start.elapsed());
tracing::debug!(elapsed_us = start.elapsed().as_micros(), "total_setup");
let run_start = Instant::now();
// Host locks are taken only after setup has finished, and are held for
// exactly the duration of `run_vm`.
let run_locks = self.acquire_run_locks()?;
// A plan chosen during lock acquisition wins over the configured one.
let effective_plan = run_locks
.pinning_plan
.as_ref()
.or(self.pinning_plan.as_ref());
let run = self.run_vm(
run_start,
vm,
run_locks.default_cpu_mask.as_deref(),
effective_plan,
)?;
// Release the host locks before the (lock-free) result collection.
drop(run_locks);
let mut result = self.collect_results(start, run)?;
if let Some(ctx) = stats_ctx {
result.kvm_stats = Some(ctx.read_stats());
}
Ok(result)
}
fn acquire_run_locks(&self) -> Result<RunLocks> {
if self.no_perf_mode {
if let Some(ref plan) = self.no_perf_plan {
let stub = host_topology::PinningPlan {
assignments: Vec::new(),
service_cpu: None,
llc_indices: plan.locked_llcs.clone(),
locks: Vec::new(),
};
match host_topology::acquire_resource_locks(
&stub,
&stub.llc_indices,
host_topology::LlcLockMode::Shared,
)? {
host_topology::LockOutcome::Acquired { locks, .. } => Ok(RunLocks {
locks,
default_cpu_mask: None,
pinning_plan: None,
}),
host_topology::LockOutcome::Unavailable(reason) => {
Err(anyhow::Error::new(host_topology::ResourceContention {
reason,
}))
}
}
} else {
Ok(RunLocks {
locks: Vec::new(),
default_cpu_mask: None,
pinning_plan: None,
})
}
} else if self.performance_mode {
if let Some(ref plan) = self.pinning_plan {
match host_topology::acquire_resource_locks(
plan,
&plan.llc_indices,
host_topology::LlcLockMode::Exclusive,
)? {
host_topology::LockOutcome::Acquired { locks, .. } => Ok(RunLocks {
locks,
default_cpu_mask: None,
pinning_plan: None,
}),
host_topology::LockOutcome::Unavailable(reason) => {
Err(anyhow::Error::new(host_topology::ResourceContention {
reason,
}))
}
}
} else {
Ok(RunLocks {
locks: Vec::new(),
default_cpu_mask: None,
pinning_plan: None,
})
}
} else {
if let Some(ref host_topo) = self.host_topo {
let num_llcs = host_topo.llc_groups.len();
let llcs_needed = (self.topology.llcs as usize).max(1);
let max_slots = num_llcs.checked_div(llcs_needed).unwrap_or(1).max(1);
let start = host_topology::pid_window_offset(std::process::id(), max_slots);
for i in 0..max_slots {
let slot = (start + i) % max_slots;
let offset = slot * llcs_needed;
let Ok(candidate) = host_topo.compute_pinning(&self.topology, false, offset)
else {
continue;
};
match host_topology::acquire_resource_locks(
&candidate,
&candidate.llc_indices,
host_topology::LlcLockMode::Shared,
)? {
host_topology::LockOutcome::Acquired { locks, .. } => {
return Ok(RunLocks {
locks,
default_cpu_mask: None,
pinning_plan: Some(candidate),
});
}
host_topology::LockOutcome::Unavailable(_) => continue,
}
}
Err(anyhow::Error::new(host_topology::ResourceContention {
reason: format!(
"all {max_slots} LLC slots busy (LOCK_SH)\n \
hint: a performance_mode test may hold LOCK_EX; \
nextest retry will resolve after it finishes"
),
}))
} else {
Ok(RunLocks {
locks: Vec::new(),
default_cpu_mask: None,
pinning_plan: None,
})
}
}
}
/// Boots the VM and drives it interactively, running the boot vCPU on the
/// calling thread.
///
/// The mode is chosen by `exec_cmd`:
///
/// * **Shell mode** (`exec_cmd` is `None`): stdin must be a terminal, which
///   is switched to raw mode for the duration. Typing Ctrl-A followed by
///   `x`/`X` terminates the VM. Returns `Ok(None)`.
/// * **Exec mode** (`exec_cmd` is `Some`): a watchdog thread force-stops the
///   VM once `exec_timeout` has elapsed. Returns `Ok(Some(code))` with the
///   exit code taken from the guest's exec-exit frame.
///
/// Helper threads forward stdin to the virtio console, write console output
/// to stdout, and (when `dmesg` is set) stream COM1 output to stderr.
///
/// # Errors
///
/// Fails if any setup step fails, if stdin is not a terminal in shell mode,
/// or — in exec mode — if no valid exec-exit frame was received (with a
/// distinct message when the watchdog timed the run out).
pub fn run_interactive(&self) -> Result<Option<i32>> {
let start = Instant::now();
let initramfs_handle = self.spawn_initramfs_resolve();
let (mut vm, kernel_result) = self.create_vm_and_load_kernel()?;
#[cfg(target_arch = "x86_64")]
{
let kr = self.setup_memory(&mut vm, kernel_result, initramfs_handle)?;
self.setup_vcpus(&vm, kr.entry)?;
}
#[cfg(target_arch = "aarch64")]
{
let kr = self.setup_memory_aarch64(&mut vm, kernel_result, initramfs_handle)?;
self.setup_vcpus_aarch64(&vm, kr.entry)?;
}
// --- Devices: two serial ports, optional IOAPIC handle, virtio console ---
let com1 = Arc::new(PiMutex::new(console::Serial::new(console::COM1_BASE)));
let com2 = Arc::new(PiMutex::new(console::Serial::new(console::COM2_BASE)));
#[cfg(target_arch = "x86_64")]
let ioapic_handle: Option<Arc<crate::vmm::IoapicHandle>> = vm.ioapic.as_ref().map(|io| {
Arc::new(crate::vmm::IoapicHandle::new(
io.clone(),
std::os::unix::io::AsRawFd::as_raw_fd(&*vm.vm_fd),
))
});
// `IoapicHandle` is an uninhabited stub off x86_64, so this is always `None`.
#[cfg(not(target_arch = "x86_64"))]
let ioapic_handle: Option<Arc<crate::vmm::IoapicHandle>> = None;
let mut vc = virtio_console::VirtioConsole::new();
vc.set_mem((*vm.guest_mem).clone());
let virtio_con = Arc::new(PiMutex::new(vc));
// Hook each device's interrupt eventfd up to its IRQ line; the IRQ numbers
// differ per architecture.
#[cfg(target_arch = "x86_64")]
{
vm.vm_fd
.register_irqfd(com1.lock().irq_evt(), console::COM1_IRQ)
.context("register COM1 irqfd")?;
vm.vm_fd
.register_irqfd(com2.lock().irq_evt(), console::COM2_IRQ)
.context("register COM2 irqfd")?;
vm.vm_fd
.register_irqfd(virtio_con.lock().irq_evt(), kvm::VIRTIO_CONSOLE_IRQ)
.context("register virtio-console irqfd")?;
}
#[cfg(target_arch = "aarch64")]
{
vm.vm_fd
.register_irqfd(com1.lock().irq_evt(), kvm::SERIAL_IRQ)
.context("register serial irqfd")?;
vm.vm_fd
.register_irqfd(com2.lock().irq_evt(), kvm::SERIAL2_IRQ)
.context("register serial2 irqfd")?;
vm.vm_fd
.register_irqfd(virtio_con.lock().irq_evt(), kvm::VIRTIO_CONSOLE_IRQ)
.context("register virtio-console irqfd")?;
}
let virtio_blk = self.init_virtio_blk(&vm)?;
let virtio_net = self.init_virtio_net(&vm)?;
// --- Mode selection and terminal preparation ---
let exec_mode = self.exec_cmd.is_some();
if !exec_mode {
use std::os::unix::io::AsRawFd;
let stdin_fd = std::io::stdin().as_raw_fd();
// SAFETY: the descriptor comes from `std::io::stdin()` and is borrowed only
// for the `isatty` query inside this block.
let borrowed = unsafe { std::os::unix::io::BorrowedFd::borrow_raw(stdin_fd) };
anyhow::ensure!(
nix::unistd::isatty(borrowed).unwrap_or(false),
"stdin must be a terminal for interactive shell mode",
);
}
// Raw mode is entered only for the interactive shell; the guard is dropped
// explicitly during teardown below.
let _raw_guard = if exec_mode {
None
} else {
Some(TerminalRawGuard::enter().context("failed to set terminal to raw mode")?)
};
// Pipe used to wake the stdin thread out of `poll`: the read end moves into
// that thread, and one byte is written to the write end during teardown.
let (wakeup_r, wakeup_w) = nix::unistd::pipe().context("create stdin wakeup pipe")?;
// --- Shared run-state flags handed to the vCPU loops and helper threads ---
let kill = Arc::new(AtomicBool::new(false));
let kill_evt = Arc::new(
vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK)
.context("create shell kill eventfd")?,
);
let freeze = Arc::new(AtomicBool::new(false));
let watchpoint =
Arc::new(vcpu::WatchpointArm::new().context("create WatchpointArm.hit_evt EventFd")?);
let bsp_parked = Arc::new(AtomicBool::new(false));
let bsp_regs: Arc<std::sync::Mutex<Option<exit_dispatch::VcpuRegSnapshot>>> =
Arc::new(std::sync::Mutex::new(None));
let has_immediate_exit = vm.has_immediate_exit;
// Split the vCPUs: index 0 is the boot processor (run on this thread), the
// rest are application processors, each given no pin (`None`).
let mut vcpus = std::mem::take(&mut vm.vcpus);
let mut bsp = vcpus.remove(0);
let ap_pins = vec![None; vcpus.len()];
// Held until this function returns.
let _run_locks = self.acquire_run_locks()?;
let no_perf_mask: Option<&[usize]> = self.no_perf_plan.as_ref().map(|p| p.cpus.as_slice());
let n_aps = vcpus.len();
// One (tid, latch) slot per AP, both freshly initialised.
let ap_tid_slots: Vec<(Arc<AtomicI32>, Arc<crate::sync::Latch>)> = (0..n_aps)
.map(|_| {
(
Arc::new(AtomicI32::new(0)),
Arc::new(crate::sync::Latch::new()),
)
})
.collect();
let (ap_threads, _ap_freeze) = self.spawn_ap_threads(
vcpus,
has_immediate_exit,
&com1,
&com2,
Some(&virtio_con),
virtio_blk.as_ref(),
virtio_net.as_ref(),
ioapic_handle.as_ref(),
&kill,
&kill_evt,
&freeze,
&watchpoint,
&ap_pins,
no_perf_mask,
&ap_tid_slots,
None,
None,
)?;
// Handle through which other threads can request an immediate exit of the
// BSP's KVM_RUN, when the VM supports it.
let bsp_ie_for_stdin = if has_immediate_exit {
Some(ImmediateExitHandle::from_vcpu(&mut bsp))
} else {
None
};
// The calling thread runs the BSP loop below, so this is the thread the
// helper threads signal with `vcpu_signal()`.
// SAFETY: `pthread_self` has no preconditions.
let bsp_tid = unsafe { libc::pthread_self() };
let timed_out = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
// --- Exec-mode watchdog: enforces `exec_timeout`, polling every 100 ms ---
let exec_watchdog = if exec_mode {
let bsp_ie_for_wd = bsp_ie_for_stdin;
let kill_for_wd = kill.clone();
let timed_out_for_wd = timed_out.clone();
let deadline = self.exec_timeout;
Some(
std::thread::Builder::new()
.name("interactive-exec-watchdog".into())
.spawn(move || {
let start = std::time::Instant::now();
loop {
// The run already ended (or was killed): nothing to enforce.
if kill_for_wd.load(Ordering::Acquire) {
return;
}
if start.elapsed() >= deadline {
// Re-check so a run that finished right at the deadline is not
// reported as timed out.
if kill_for_wd.load(Ordering::Acquire) {
return;
}
timed_out_for_wd.store(true, Ordering::Release);
kill_for_wd.store(true, Ordering::Release);
if let Some(ref ie) = bsp_ie_for_wd {
ie.set(1);
std::sync::atomic::fence(Ordering::Release);
}
// SAFETY: NOTE(review) — relies on the BSP thread still being alive;
// the `CrossThreadKickGuard` below joins this thread before
// `run_interactive` returns on that thread. Confirm.
unsafe {
libc::pthread_kill(bsp_tid, vcpu_signal());
}
return;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
})
.context("spawn interactive-exec-watchdog thread")?,
)
} else {
None
};
// Drop guard that sets `kill` and joins the watchdog and stdin threads, so
// both are finished by the time this function's locals are torn down — on
// the normal path as well as on an early `?` return.
struct CrossThreadKickGuard {
watchdog: Option<std::thread::JoinHandle<()>>,
stdin: Option<std::thread::JoinHandle<()>>,
kill: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl Drop for CrossThreadKickGuard {
fn drop(&mut self) {
self.kill.store(true, std::sync::atomic::Ordering::Release);
if let Some(h) = self.watchdog.take() {
let _ = h.join();
}
if let Some(h) = self.stdin.take() {
let _ = h.join();
}
}
}
let mut kick_guard = CrossThreadKickGuard {
watchdog: exec_watchdog,
stdin: None,
kill: kill.clone(),
};
// --- stdin thread: forwards host stdin to the virtio console ---
let vc_for_stdin = virtio_con.clone();
let kill_for_stdin = kill.clone();
let stdin_thread = std::thread::Builder::new()
.name("interactive-stdin".into())
.spawn(move || {
use std::io::Read;
use std::os::unix::io::{AsFd, AsRawFd};
let wakeup_fd = wakeup_r;
let stdin_fd = std::io::stdin().as_raw_fd();
let mut buf = [0u8; 4096];
// True when the previous byte was Ctrl-A (0x01) and has been held back;
// persists across reads.
let mut saw_ctrl_a = false;
loop {
if kill_for_stdin.load(Ordering::Acquire) {
break;
}
// SAFETY: the descriptor comes from `std::io::stdin()` and the borrow
// does not outlive this loop iteration.
let stdin_borrowed =
unsafe { std::os::unix::io::BorrowedFd::borrow_raw(stdin_fd) };
let wakeup_borrowed = wakeup_fd.as_fd();
let mut fds = [
nix::poll::PollFd::new(stdin_borrowed, nix::poll::PollFlags::POLLIN),
nix::poll::PollFd::new(wakeup_borrowed, nix::poll::PollFlags::POLLIN),
];
// 100 ms timeout so the kill flag is re-checked even without input.
match nix::poll::poll(&mut fds, 100u16) {
Ok(0) => continue, Err(nix::errno::Errno::EINTR) => continue,
Err(_) => break,
Ok(_) => {}
}
// A readable wakeup pipe means teardown has started.
if fds[1]
.revents()
.is_some_and(|r| r.intersects(nix::poll::PollFlags::POLLIN))
{
break;
}
if fds[0]
.revents()
.is_some_and(|r| r.intersects(nix::poll::PollFlags::POLLIN))
{
let mut stdin = std::io::stdin().lock();
match stdin.read(&mut buf) {
// End of input.
Ok(0) => break,
Ok(n) => {
// Start of the not-yet-forwarded run of bytes in `buf`.
let mut forward_start = 0usize;
for i in 0..n {
if saw_ctrl_a {
saw_ctrl_a = false;
// Ctrl-A followed by x/X: terminate the VM.
if buf[i] == b'x' || buf[i] == b'X' {
eprintln!("\r\nTerminated.");
kill_for_stdin.store(true, Ordering::Release);
if let Some(ref ie) = bsp_ie_for_stdin {
ie.set(1);
std::sync::atomic::fence(Ordering::Release);
}
// SAFETY: NOTE(review) — same reasoning as the watchdog: the guard
// joins this thread before the BSP thread leaves
// `run_interactive`. Confirm.
unsafe {
libc::pthread_kill(bsp_tid, vcpu_signal());
}
return;
}
// Not an escape sequence: flush anything pending, then forward the
// held-back Ctrl-A; the current byte stays in the pending run.
if forward_start < i {
vc_for_stdin.lock().queue_input(&buf[forward_start..i]);
forward_start = i;
}
vc_for_stdin.lock().queue_input(&[0x01]);
}
// Ctrl-A: forward what precedes it and hold the Ctrl-A itself back
// until the next byte is known.
if buf[i] == 0x01 {
if forward_start < i {
vc_for_stdin.lock().queue_input(&buf[forward_start..i]);
}
saw_ctrl_a = true;
forward_start = i + 1;
continue;
}
}
// Forward the remaining tail of this read.
if forward_start < n {
vc_for_stdin.lock().queue_input(&buf[forward_start..n]);
}
}
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(_) => break,
}
}
}
})
.context("spawn stdin reader thread")?;
kick_guard.stdin = Some(stdin_thread);
// --- stdout thread: writes virtio-console output to host stdout ---
// Its return value reports whether anything was written.
let vc_for_stdout = virtio_con.clone();
let kill_for_stdout = kill.clone();
let stdout_thread: JoinHandle<bool> = std::thread::Builder::new()
.name("interactive-stdout".into())
.spawn(move || {
use std::io::Write;
let mut wrote_any = false;
// Capture the raw fd once so the console lock is not held while polling.
let tx_evt_raw_fd = {
let guard = vc_for_stdout.lock();
std::os::unix::io::AsRawFd::as_raw_fd(guard.tx_evt())
};
let mut stdout = std::io::stdout().lock();
loop {
if kill_for_stdout.load(Ordering::Acquire) {
break;
}
// SAFETY: NOTE(review) — assumes the console's tx eventfd stays open for
// as long as `vc_for_stdout` (held by this thread) is alive. Confirm.
let borrowed =
unsafe { std::os::unix::io::BorrowedFd::borrow_raw(tx_evt_raw_fd) };
let mut fds = [nix::poll::PollFd::new(
borrowed,
nix::poll::PollFlags::POLLIN,
)];
// 50 ms timeout so the kill flag is re-checked regularly.
match nix::poll::poll(&mut fds, 50u16) {
Ok(0) => continue,
Err(nix::errno::Errno::EINTR) => continue,
Err(_) => break,
Ok(_) => {
// Consume the eventfd notification; the result is ignored.
let _ = vc_for_stdout.lock().tx_evt().read();
}
}
if kill_for_stdout.load(Ordering::Acquire) {
break;
}
let data = vc_for_stdout.lock().drain_output();
if !data.is_empty() {
// Only the leading valid-UTF-8 portion of the drained chunk is
// written; bytes from the first invalid sequence on are not.
let valid_len = match std::str::from_utf8(&data) {
Ok(_) => data.len(),
Err(e) => e.valid_up_to(),
};
if valid_len > 0 {
// A failed write to stdout stops the whole VM.
if stdout.write_all(&data[..valid_len]).is_err()
|| stdout.flush().is_err()
{
kill_for_stdout.store(true, Ordering::Release);
break;
}
wrote_any = true;
}
}
}
// Final drain after the loop, so output produced just before shutdown is
// still written (best effort; write errors are ignored here).
let data = vc_for_stdout.lock().drain_output();
if !data.is_empty() {
let valid_len = match std::str::from_utf8(&data) {
Ok(_) => data.len(),
Err(e) => e.valid_up_to(),
};
if valid_len > 0 {
let _ = stdout.write_all(&data[..valid_len]);
let _ = stdout.flush();
wrote_any = true;
}
}
wrote_any
})
.context("spawn stdout writer thread")?;
// --- Optional dmesg thread: streams COM1 output to stderr ---
let (dmesg_thread, dmesg_wakeup_evt) = if self.dmesg {
use std::os::unix::io::AsRawFd;
use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
use vmm_sys_util::eventfd::{EFD_NONBLOCK, EventFd};
let data_evt = com1
.lock()
.install_data_evt()
.context("install COM1 dmesg data eventfd")?;
// Written once during teardown to get the thread out of its untimed wait.
let wakeup_evt =
Arc::new(EventFd::new(EFD_NONBLOCK).context("create dmesg wakeup eventfd")?);
let com1_for_dmesg = com1.clone();
let kill_for_dmesg = kill.clone();
let wakeup_for_thread = wakeup_evt.clone();
const DATA_TOKEN: u64 = 0;
const WAKEUP_TOKEN: u64 = 1;
let handle = std::thread::Builder::new()
.name("interactive-dmesg".into())
.spawn(move || {
use std::io::Write;
// Any epoll setup failure is logged and ends the thread; the VM
// itself keeps running.
let epoll = match Epoll::new() {
Ok(e) => e,
Err(e) => {
tracing::warn!(%e, "interactive-dmesg: epoll_create1 failed");
return;
}
};
if let Err(e) = epoll.ctl(
ControlOperation::Add,
data_evt.as_raw_fd(),
EpollEvent::new(EventSet::IN, DATA_TOKEN),
) {
tracing::warn!(%e, "interactive-dmesg: add data_evt to epoll");
return;
}
if let Err(e) = epoll.ctl(
ControlOperation::Add,
wakeup_for_thread.as_raw_fd(),
EpollEvent::new(EventSet::IN, WAKEUP_TOKEN),
) {
tracing::warn!(%e, "interactive-dmesg: add wakeup to epoll");
return;
}
let mut events = [EpollEvent::default(); 2];
loop {
if kill_for_dmesg.load(Ordering::Acquire) {
break;
}
// Blocks without a timeout; teardown relies on the wakeup eventfd.
match epoll.wait(-1, &mut events) {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => {
tracing::warn!(%e, "interactive-dmesg: epoll_wait failed");
break;
}
}
// Both eventfds are read regardless of which one fired; the read
// results are ignored.
let _ = data_evt.read();
let _ = wakeup_for_thread.read();
let data = com1_for_dmesg.lock().drain_output();
if !data.is_empty() {
let mut stderr = std::io::stderr().lock();
let _ = stderr.write_all(&data);
let _ = stderr.flush();
}
}
// Final drain after the loop.
let data = com1_for_dmesg.lock().drain_output();
if !data.is_empty() {
let mut stderr = std::io::stderr().lock();
let _ = stderr.write_all(&data);
let _ = stderr.flush();
}
})
.context("spawn dmesg thread")?;
(Some(handle), Some(wakeup_evt))
} else {
(None, None)
};
// --- Run the boot vCPU on this thread ---
if let Some(mask) = self.no_perf_plan.as_ref().map(|p| p.cpus.as_slice()) {
set_thread_cpumask(mask, "BSP (shell)");
}
register_vcpu_signal_handler();
// Loop timeout passed to the BSP loop: 24 hours.
let interactive_timeout = Duration::from_secs(24 * 60 * 60);
// The trailing arguments are `None`, zero, or freshly created placeholders.
// NOTE(review): their meaning is defined by `run_bsp_loop`, which is not
// part of this file section.
self.run_bsp_loop(
&mut bsp,
&com1,
&com2,
Some(&virtio_con),
virtio_blk.as_ref(),
virtio_net.as_ref(),
ioapic_handle.as_ref(),
&kill,
&freeze,
&watchpoint,
&bsp_parked,
&bsp_regs,
has_immediate_exit,
start,
interactive_timeout,
None,
None,
None,
None,
&std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
&std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
&std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
&vmm_sys_util::eventfd::EventFd::new(0)
.expect("eventfd for interactive-shell kern_virt_kaslr publish"),
0,
);
// --- Teardown: stop every helper, then join them ---
kill.store(true, Ordering::Release);
// Wake the stdin thread out of `poll`.
let _ = nix::unistd::write(&wakeup_w, &[0u8]);
drop(wakeup_w);
// Wake the dmesg thread out of its untimed `epoll_wait`.
if let Some(ref evt) = dmesg_wakeup_evt {
let _ = evt.write(1);
}
// Kick every AP that has not exited yet, then wait up to 5 s for each
// before joining its thread.
for vt in &ap_threads {
if !vt.exited.load(Ordering::Acquire) {
vt.kick();
}
}
for vt in ap_threads {
vt.wait_for_exit(Duration::from_secs(5));
let _ = vt.handle.join();
}
// A panicked stdout thread counts as "nothing written".
let stdout_wrote = stdout_thread.join().unwrap_or(false);
if let Some(dt) = dmesg_thread {
let _ = dt.join();
}
drop(dmesg_wakeup_evt);
// Drop the raw-mode guard before the diagnostics below are printed.
drop(_raw_guard);
#[cfg(target_arch = "x86_64")]
if let Some(io) = &ioapic_handle
&& let Some(msg) = routing_failure_summary(io.routing_failures())
{
eprintln!("{msg}");
}
// Exec mode: if the virtio console produced nothing on stdout, print
// whatever was captured on COM2 instead.
if exec_mode && !stdout_wrote {
let app_output = com2.lock().output();
if !app_output.is_empty() {
use std::io::Write;
let mut stdout = std::io::stdout().lock();
let _ = stdout.write_all(app_output.as_bytes());
let _ = stdout.flush();
}
}
// Without the streaming dmesg thread, print the captured COM1 output once.
if !self.dmesg {
let console_output = com1.lock().output();
if !console_output.is_empty() {
eprintln!("{console_output}");
}
}
if !exec_mode {
eprintln!("Connection to VM closed.");
return Ok(None);
}
// --- Exec mode: recover the payload's exit code from the console stream ---
let bulk = virtio_con.lock().final_drain();
let entries = crate::vmm::host_comms::parse_tlv_stream(&bulk).entries;
match Self::exec_exit_from_entries(&entries) {
Some(code) => Ok(Some(code)),
None => {
// No exit frame: report the timeout if the watchdog fired, otherwise
// describe what the serial logs suggest happened.
if timed_out.load(Ordering::Acquire) {
anyhow::bail!(
"shell --exec '{}' exceeded the {:?} exec-timeout and \
was force-killed; the payload's exit code is unknown. \
Raise --exec-timeout for a legitimately long-running \
payload.",
self.exec_cmd.as_deref().unwrap_or("?"),
self.exec_timeout,
)
}
let com1_out = com1.lock().output();
let com2_out = com2.lock().output();
anyhow::bail!(
"shell --exec '{}' finished but the guest delivered no CRC-valid \
MSG_TYPE_EXEC_EXIT frame; the payload's exit code is unknown.{}",
self.exec_cmd.as_deref().unwrap_or("?"),
Self::detect_guest_failure(&com1_out, &com2_out)
)
}
}
}
/// Extracts the exec payload's exit code from parsed stream entries.
///
/// Picks the last entry that is a `MSG_TYPE_EXEC_EXIT` frame with a valid
/// CRC and a payload of exactly four bytes, and decodes that payload as a
/// little-endian `i32`. Returns `None` when no entry qualifies.
fn exec_exit_from_entries(entries: &[crate::vmm::wire::ShmEntry]) -> Option<i32> {
    let is_exit_frame = |entry: &&crate::vmm::wire::ShmEntry| {
        entry.msg_type == crate::vmm::wire::MSG_TYPE_EXEC_EXIT
            && entry.crc_ok
            && entry.payload.len() == 4
    };
    // Search from the back: the most recent qualifying frame wins.
    let frame = entries.iter().rfind(is_exit_frame)?;
    // The predicate guarantees exactly four payload bytes, so the conversion
    // cannot fail.
    let raw: [u8; 4] = frame.payload[..4].try_into().unwrap();
    Some(i32::from_le_bytes(raw))
}
/// Produces a diagnostic suffix explaining why the guest delivered no exit
/// code, based on the captured serial logs.
///
/// Checks, in priority order:
/// 1. an allocation-failure line (`com2` searched before `com1`);
/// 2. a kernel-panic / init-killed line (`com1` searched before `com2`);
/// 3. otherwise a generic hint.
///
/// Quoted log lines are trimmed and clipped to 200 characters, with `…`
/// appended when clipped. The returned text starts with a space so it can be
/// appended directly to a sentence.
fn detect_guest_failure(com1: &str, com2: &str) -> String {
    const ALLOC_FAIL: &str = "memory allocation of";
    const PANIC: &str = "Kernel panic - not syncing";
    const INIT_KILL: &str = "Attempted to kill init";
    const MAX_CHARS: usize = 200;

    // Trim a log line and cap it at MAX_CHARS characters (not bytes).
    let clip = |raw: &str| -> String {
        let trimmed = raw.trim();
        match trimmed.char_indices().nth(MAX_CHARS) {
            // A character exists past the cap: cut right before it.
            Some((cut, _)) => format!("{}…", &trimmed[..cut]),
            None => trimmed.to_string(),
        }
    };

    let alloc_hit = com2
        .lines()
        .chain(com1.lines())
        .find(|line| line.contains(ALLOC_FAIL));
    if let Some(line) = alloc_hit {
        return format!(
            " The guest /init aborted on a failed allocation: '{}'. The \
             /init is the test binary itself — raise memory_mib or check \
             the guest overcommit policy (vm.overcommit_memory).",
            clip(line)
        );
    }

    let panic_hit = com1
        .lines()
        .chain(com2.lines())
        .find(|line| line.contains(PANIC) || line.contains(INIT_KILL));
    match panic_hit {
        Some(line) => format!(" Guest kernel panic: '{}'.", clip(line)),
        None => " (the guest may have panicked or rebooted before send_exec_exit)".to_string(),
    }
}
}
#[cfg(test)]
mod tests;