pub mod console;
pub mod host_topology;
pub mod initramfs;
pub(crate) mod rust_init;
pub mod shm_ring;
pub mod topology;
pub(crate) mod virtio_console;
#[cfg(target_arch = "aarch64")]
pub mod aarch64;
#[cfg(target_arch = "x86_64")]
pub mod x86_64;
#[cfg(target_arch = "x86_64")]
pub use x86_64::acpi;
#[cfg(target_arch = "x86_64")]
pub use x86_64::boot;
#[cfg(target_arch = "x86_64")]
pub use x86_64::kvm;
#[cfg(target_arch = "x86_64")]
pub use x86_64::kvm_stats;
#[cfg(target_arch = "x86_64")]
pub use x86_64::mptable;
#[cfg(target_arch = "aarch64")]
pub use aarch64::boot;
#[cfg(target_arch = "aarch64")]
pub use aarch64::kvm;
pub use topology::Topology;
use anyhow::{Context, Result};
use kvm_ioctls::VcpuExit;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::os::unix::thread::JoinHandleExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use vm_memory::{Bytes, GuestAddress, GuestMemory, GuestMemoryMmap};
use crate::monitor;
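/// Allocate guest memory backed by 2 MiB hugepages and pre-fault it with
/// `MADV_POPULATE_WRITE`. Falls back to regular anonymous pages (with a
/// warning) when not enough hugepages are free or the mapping fails.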
pub(crate) fn allocate_hugepage_memory(size: usize, base: GuestAddress) -> Result<GuestMemoryMmap> {
use vm_memory::mmap::{GuestRegionMmap, MmapRegionBuilder};
let needed_pages = size / (2 << 20);
let free_pages = host_topology::hugepages_free();
if free_pages < needed_pages as u64 {
eprintln!(
"performance_mode: WARNING: not enough hugepages \
(needed {} MB = {} pages, available {} pages). \
Using regular pages.",
size >> 20,
needed_pages,
free_pages,
);
return GuestMemoryMmap::<()>::from_ranges(&[(base, size)])
.context("allocate guest memory (hugepage fallback)");
}
let flags = libc::MAP_ANONYMOUS | libc::MAP_PRIVATE | libc::MAP_HUGETLB | libc::MAP_HUGE_2MB;
let region = MmapRegionBuilder::new(size)
.with_mmap_prot(libc::PROT_READ | libc::PROT_WRITE)
.with_mmap_flags(flags)
.with_hugetlbfs(true)
.build();
match region {
Ok(r) => {
let ret = unsafe {
libc::madvise(
r.as_ptr() as *mut libc::c_void,
r.size(),
libc::MADV_POPULATE_WRITE,
)
};
if ret != 0 {
let err = std::io::Error::last_os_error();
eprintln!(
"performance_mode: WARNING: hugepage pre-fault failed ({err}), \
not enough hugepages (needed {} MB = {} pages, available {} pages). \
Using regular pages.",
size >> 20,
needed_pages,
free_pages,
);
return GuestMemoryMmap::<()>::from_ranges(&[(base, size)])
.context("allocate guest memory (hugepage fallback)");
}
eprintln!(
"performance_mode: allocated {} MB with 2MB hugepages",
size >> 20
);
let guest_region = GuestRegionMmap::new(r, base)
.ok_or_else(|| anyhow::anyhow!("hugepage region overflow"))?;
GuestMemoryMmap::from_regions(vec![guest_region])
.context("create guest memory from hugepage region")
}
Err(e) => {
eprintln!(
"performance_mode: WARNING: hugepage allocation failed ({e}), \
not enough hugepages (needed {} MB = {} pages, available {} pages). \
Using regular pages.",
size >> 20,
needed_pages,
free_pages,
);
GuestMemoryMmap::<()>::from_ranges(&[(base, size)])
.context("allocate guest memory (hugepage fallback)")
}
}
}
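/// Mutex backed by a `pthread_mutex_t` configured with `PTHREAD_PRIO_INHERIT`,
/// so a high-priority waiter (e.g. a SCHED_FIFO vCPU thread) lends its
/// priority to the current holder instead of priority-inverting behind it.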
pub(crate) struct PiMutex<T> {
inner: std::cell::UnsafeCell<T>,
mutex: std::cell::UnsafeCell<libc::pthread_mutex_t>,
}
unsafe impl<T: Send> Send for PiMutex<T> {}
unsafe impl<T: Send> Sync for PiMutex<T> {}
impl<T> PiMutex<T> {
pub(crate) fn new(value: T) -> Self {
unsafe {
let mut attr: libc::pthread_mutexattr_t = std::mem::zeroed();
libc::pthread_mutexattr_init(&mut attr);
libc::pthread_mutexattr_setprotocol(&mut attr, libc::PTHREAD_PRIO_INHERIT);
let mut mutex: libc::pthread_mutex_t = std::mem::zeroed();
libc::pthread_mutex_init(&mut mutex, &attr);
libc::pthread_mutexattr_destroy(&mut attr);
PiMutex {
inner: std::cell::UnsafeCell::new(value),
mutex: std::cell::UnsafeCell::new(mutex),
}
}
}
pub(crate) fn lock(&self) -> PiMutexGuard<'_, T> {
unsafe {
let rc = libc::pthread_mutex_lock(self.mutex.get());
debug_assert_eq!(rc, 0, "pthread_mutex_lock failed: {rc}");
}
PiMutexGuard { mutex: self }
}
}
impl<T> Drop for PiMutex<T> {
fn drop(&mut self) {
unsafe {
libc::pthread_mutex_destroy(self.mutex.get());
}
}
}
pub(crate) struct PiMutexGuard<'a, T> {
mutex: &'a PiMutex<T>,
}
impl<T> std::ops::Deref for PiMutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.mutex.inner.get() }
}
}
impl<T> std::ops::DerefMut for PiMutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.mutex.inner.get() }
}
}
impl<T> Drop for PiMutexGuard<'_, T> {
fn drop(&mut self) {
unsafe {
let rc = libc::pthread_mutex_unlock(self.mutex.mutex.get());
debug_assert_eq!(rc, 0, "pthread_mutex_unlock failed: {rc}");
}
}
}
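/// Cache key for a prepared initramfs base: a hash over the ktstr build, the
/// payload and scheduler binaries, their resolved shared libraries, and (for
/// shell images) the busybox flag and included files.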
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct BaseKey(u64);
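/// Hash a file's contents with `DefaultHasher`, reading it in 64 KiB chunks.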
pub(crate) fn hash_file(path: &Path) -> Result<u64> {
use std::io::Read;
let mut f =
std::fs::File::open(path).with_context(|| format!("open for hash: {}", path.display()))?;
let mut hasher = std::hash::DefaultHasher::new();
let mut buf = [0u8; 65536];
loop {
let n = f
.read(&mut buf)
.with_context(|| format!("read for hash: {}", path.display()))?;
if n == 0 {
break;
}
hasher.write(&buf[..n]);
}
Ok(hasher.finish())
}
impl BaseKey {
pub(crate) fn new(payload: &Path, scheduler: Option<&Path>) -> Result<Self> {
let mut hasher = std::hash::DefaultHasher::new();
crate::GIT_FULL_HASH.hash(&mut hasher);
hash_file(payload)?.hash(&mut hasher);
Self::hash_shared_libs(payload, &mut hasher);
match scheduler {
Some(s) => {
1u8.hash(&mut hasher);
hash_file(s)?.hash(&mut hasher);
Self::hash_shared_libs(s, &mut hasher);
}
None => 0u8.hash(&mut hasher),
}
Ok(BaseKey(hasher.finish()))
}
pub(crate) fn new_shell(
payload: &Path,
scheduler: Option<&Path>,
include_files: &[(String, PathBuf)],
busybox: bool,
) -> Result<Self> {
let mut hasher = std::hash::DefaultHasher::new();
crate::GIT_FULL_HASH.hash(&mut hasher);
"ktstr-shell".hash(&mut hasher);
busybox.hash(&mut hasher);
hash_file(payload)?.hash(&mut hasher);
Self::hash_shared_libs(payload, &mut hasher);
match scheduler {
Some(s) => {
1u8.hash(&mut hasher);
hash_file(s)?.hash(&mut hasher);
Self::hash_shared_libs(s, &mut hasher);
}
None => 0u8.hash(&mut hasher),
}
let mut sorted: Vec<(&str, &Path)> = include_files
.iter()
.map(|(a, p)| (a.as_str(), p.as_path()))
.collect();
sorted.sort_by_key(|(a, _)| *a);
sorted.len().hash(&mut hasher);
for (archive_path, host_path) in &sorted {
archive_path.hash(&mut hasher);
hash_file(host_path)?.hash(&mut hasher);
Self::hash_shared_libs(host_path, &mut hasher);
}
Ok(BaseKey(hasher.finish()))
}
fn hash_shared_libs(binary: &Path, hasher: &mut std::hash::DefaultHasher) {
if let Ok(result) = initramfs::resolve_shared_libs(binary) {
let mut entries: Vec<_> = result.found.iter().map(|(_, p)| p.clone()).collect();
entries.sort();
for p in &entries {
p.to_str().unwrap_or("").hash(hasher);
if let Ok(sample) = hash_file(p) {
sample.hash(hasher);
}
}
}
}
}
fn base_cache() -> &'static Mutex<HashMap<BaseKey, Arc<Vec<u8>>>> {
static CACHE: OnceLock<Mutex<HashMap<BaseKey, Arc<Vec<u8>>>>> = OnceLock::new();
CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}
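/// A built initramfs base image, either mapped from a `/dev/shm` segment
/// shared with other processes or owned by this process.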
pub(crate) enum BaseRef {
Mapped(initramfs::MappedShm),
Owned(Arc<Vec<u8>>),
}
impl AsRef<[u8]> for BaseRef {
fn as_ref(&self) -> &[u8] {
match self {
BaseRef::Mapped(m) => m.as_ref(),
BaseRef::Owned(a) => a,
}
}
}
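/// Return the initramfs base for `key`: first from the in-process cache, then
/// by racing for the `/dev/shm` segment with `O_EXCL` (the winner builds and
/// publishes it, losers map the published copy), and finally by building it
/// locally if the shm path fails.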
pub(crate) fn get_or_build_base(
payload: &Path,
extras: &[(&str, &Path)],
include_files: &[(&str, &Path)],
busybox: bool,
key: &BaseKey,
) -> Result<BaseRef> {
cleanup_stale_shm(key);
if let Some(arc) = base_cache().lock().unwrap().get(key).cloned() {
tracing::debug!("initramfs base cache hit (process)");
return Ok(BaseRef::Owned(arc));
}
let seg_name = initramfs::shm_segment_name(key.0);
match shm_try_create_excl(&seg_name) {
ShmCreateResult::Winner(fd) => {
tracing::debug!("initramfs shm: builder (O_EXCL won)");
let t0 = std::time::Instant::now();
let data = initramfs::create_initramfs_base(payload, extras, include_files, busybox)?;
tracing::debug!(
elapsed_us = t0.elapsed().as_micros(),
bytes = data.len(),
"create_initramfs_base",
);
shm_write_and_release(fd, &data, &seg_name);
if let Some(mapped) = initramfs::shm_load_base(key.0) {
return Ok(BaseRef::Mapped(mapped));
}
let arc = Arc::new(data);
base_cache()
.lock()
.unwrap()
.insert(key.clone(), arc.clone());
return Ok(BaseRef::Owned(arc));
}
ShmCreateResult::Exists => {
tracing::debug!("initramfs shm: waiting for builder (EEXIST)");
if let Some(mapped) = initramfs::shm_load_base(key.0) {
tracing::debug!("initramfs base cache hit (shm, after wait)");
return Ok(BaseRef::Mapped(mapped));
}
}
ShmCreateResult::Error => {
if let Some(mapped) = initramfs::shm_load_base(key.0) {
tracing::debug!("initramfs base cache hit (shm)");
return Ok(BaseRef::Mapped(mapped));
}
}
}
let t0 = std::time::Instant::now();
let data = initramfs::create_initramfs_base(payload, extras, include_files, busybox)?;
let arc = Arc::new(data);
tracing::debug!(
elapsed_us = t0.elapsed().as_micros(),
bytes = arc.len(),
"create_initramfs_base (fallback)",
);
base_cache()
.lock()
.unwrap()
.insert(key.clone(), arc.clone());
if let Err(e) = initramfs::shm_store_base(key.0, &arc) {
tracing::warn!("shm_store_base: {e:#}");
}
Ok(BaseRef::Owned(arc))
}
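/// Unlink `/dev/shm` segments (`ktstr-base-`/`ktstr-lz4-`/`ktstr-gz-`) left
/// over from other keys, skipping the current key and any segment on which an
/// exclusive flock cannot be taken.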
fn cleanup_stale_shm(current: &BaseKey) {
let current_suffix = format!("{:016x}", current.0);
let shm_dir = match std::fs::read_dir("/dev/shm") {
Ok(d) => d,
Err(_) => return,
};
for entry in shm_dir.flatten() {
let name = entry.file_name();
let Some(name_str) = name.to_str() else {
continue;
};
let hash_suffix = if let Some(s) = name_str.strip_prefix("ktstr-base-") {
s
} else if let Some(s) = name_str.strip_prefix("ktstr-lz4-") {
s
} else if let Some(s) = name_str.strip_prefix("ktstr-gz-") {
s
} else {
continue;
};
if hash_suffix == current_suffix {
continue;
}
let shm_name = format!("/{name_str}");
let Ok(cname) = std::ffi::CString::new(shm_name) else {
continue;
};
unsafe {
let fd = libc::shm_open(cname.as_ptr(), libc::O_RDONLY, 0);
if fd < 0 {
continue;
}
if libc::flock(fd, libc::LOCK_EX | libc::LOCK_NB) == 0 {
libc::shm_unlink(cname.as_ptr());
libc::flock(fd, libc::LOCK_UN);
}
libc::close(fd);
}
}
}
enum ShmCreateResult {
Winner(std::os::unix::io::RawFd),
Exists,
Error,
}
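/// Try to create the shm segment with `O_CREAT | O_EXCL`. The winner gets the
/// fd back with an exclusive flock held; `Exists` means another process is
/// (or was) building it.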
fn shm_try_create_excl(name: &str) -> ShmCreateResult {
let Ok(cname) = std::ffi::CString::new(name) else {
return ShmCreateResult::Error;
};
unsafe {
let fd = libc::shm_open(
cname.as_ptr(),
libc::O_CREAT | libc::O_EXCL | libc::O_RDWR,
0o644,
);
if fd < 0 {
let err = *libc::__errno_location();
return if err == libc::EEXIST {
ShmCreateResult::Exists
} else {
ShmCreateResult::Error
};
}
if libc::flock(fd, libc::LOCK_EX) != 0 {
libc::close(fd);
return ShmCreateResult::Error;
}
ShmCreateResult::Winner(fd)
}
}
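/// Size the freshly created segment, copy `data` into it via a temporary
/// mapping, then release the flock. On failure the segment is unlinked so
/// readers never see a truncated base.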
fn shm_write_and_release(fd: std::os::unix::io::RawFd, data: &[u8], seg_name: &str) {
unsafe {
if libc::ftruncate(fd, data.len() as libc::off_t) != 0 {
if let Ok(cname) = std::ffi::CString::new(seg_name) {
libc::shm_unlink(cname.as_ptr());
}
libc::flock(fd, libc::LOCK_UN);
libc::close(fd);
return;
}
let ptr = libc::mmap(
std::ptr::null_mut(),
data.len(),
libc::PROT_WRITE,
libc::MAP_SHARED,
fd,
0,
);
if ptr == libc::MAP_FAILED {
libc::ftruncate(fd, 0);
if let Ok(cname) = std::ffi::CString::new(seg_name) {
libc::shm_unlink(cname.as_ptr());
}
} else {
std::ptr::copy_nonoverlapping(data.as_ptr(), ptr as *mut u8, data.len());
libc::munmap(ptr, data.len());
}
libc::flock(fd, libc::LOCK_UN);
libc::close(fd);
}
}
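/// Sizes that feed into the minimum guest memory computation.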
pub(crate) struct MemoryBudget {
pub uncompressed_initramfs_bytes: u64,
pub compressed_initrd_bytes: u64,
pub kernel_init_size: u64,
pub shm_bytes: u64,
}
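/// Read the kernel's load-time footprint: `init_size` from the bzImage setup
/// header (offset 0x260) on x86_64, or `image_size` from the arm64 Image
/// header, transparently decompressing a gzip-compressed vmlinuz.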
pub(crate) fn read_kernel_init_size(kernel_path: &Path) -> Result<u64> {
use std::io::{Read, Seek, SeekFrom};
let mut f = std::fs::File::open(kernel_path)
.with_context(|| format!("open kernel for init_size: {}", kernel_path.display()))?;
#[cfg(target_arch = "x86_64")]
{
f.seek(SeekFrom::Start(0x260))
.context("seek to init_size in bzImage")?;
let mut buf = [0u8; 4];
f.read_exact(&mut buf)
.context("read init_size from bzImage")?;
Ok(u32::from_le_bytes(buf) as u64)
}
#[cfg(target_arch = "aarch64")]
{
let mut magic = [0u8; 2];
f.read_exact(&mut magic).context("read kernel magic")?;
if magic == [0x1f, 0x8b] {
f.seek(SeekFrom::Start(0))
.context("seek vmlinuz to start")?;
let mut decoder = flate2::read::GzDecoder::new(&mut f);
let mut header = [0u8; 24];
decoder
.read_exact(&mut header)
.context("decompress arm64 vmlinuz header for image_size")?;
return Ok(u64::from_le_bytes(header[16..24].try_into().unwrap()));
}
f.seek(SeekFrom::Start(16))
.context("seek to image_size in arm64 Image")?;
let mut buf = [0u8; 8];
f.read_exact(&mut buf)
.context("read image_size from arm64 Image")?;
Ok(u64::from_le_bytes(buf))
}
}
const WORKLOAD_MB: u64 = 256;
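/// Minimum guest memory (MB) needed to unpack the initramfs: the uncompressed
/// archive scaled by 10/9, plus the kernel's init_size and the compressed
/// initrd, scaled by 64/63 for boot-time overhead, plus a fixed workload
/// allowance and the SHM ring.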
pub(crate) fn initramfs_min_memory_mb(budget: &MemoryBudget) -> u32 {
let ceil_mb = |bytes: u64| -> u64 { (bytes + (1 << 20) - 1) >> 20 };
let init_size_mb = ceil_mb(budget.kernel_init_size);
let compressed_mb = ceil_mb(budget.compressed_initrd_bytes);
let shm_mb = ceil_mb(budget.shm_bytes);
let uncompressed_mb = ceil_mb(budget.uncompressed_initramfs_bytes);
let uncompressed_scaled = (uncompressed_mb * 10).div_ceil(9);
let content_mb = uncompressed_scaled + init_size_mb + compressed_mb;
let boot_mb = (content_mb * 64).div_ceil(63);
(boot_mb + WORKLOAD_MB + shm_mb) as u32
}
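/// Raw pointer to a vCPU's `kvm_run.immediate_exit` flag, letting another
/// thread force the next `KVM_RUN` to return without entering the guest.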
struct ImmediateExitHandle {
ptr: *mut u8,
}
unsafe impl Send for ImmediateExitHandle {}
unsafe impl Sync for ImmediateExitHandle {}
impl ImmediateExitHandle {
fn from_vcpu(vcpu: &mut kvm_ioctls::VcpuFd) -> Self {
let kvm_run = vcpu.get_kvm_run();
let ptr: *mut u8 = &mut kvm_run.immediate_exit;
Self { ptr }
}
fn set(&self, val: u8) {
unsafe {
std::ptr::write_volatile(self.ptr, val);
}
}
}
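/// Real-time signal used to kick vCPU threads out of `KVM_RUN`; the handler is
/// a no-op, the resulting `EINTR` is what matters.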
fn vcpu_signal() -> libc::c_int {
libc::SIGRTMIN()
}
extern "C" fn vcpu_signal_handler(_: libc::c_int, _: *mut libc::siginfo_t, _: *mut libc::c_void) {
std::sync::atomic::fence(Ordering::Acquire);
}
fn register_vcpu_signal_handler() {
unsafe {
let mut sa: libc::sigaction = std::mem::zeroed();
sa.sa_sigaction = vcpu_signal_handler as *const () as usize;
sa.sa_flags = libc::SA_SIGINFO;
libc::sigemptyset(&mut sa.sa_mask);
libc::sigaction(vcpu_signal(), &sa, std::ptr::null_mut());
let mut set: libc::sigset_t = std::mem::zeroed();
libc::sigemptyset(&mut set);
libc::sigaddset(&mut set, vcpu_signal());
libc::pthread_sigmask(libc::SIG_UNBLOCK, &set, std::ptr::null_mut());
}
}
fn pin_current_thread(cpu: usize, label: &str) {
let mut cpuset = nix::sched::CpuSet::new();
if let Err(e) = cpuset.set(cpu) {
eprintln!("performance_mode: WARNING: cpuset.set({cpu}) for {label}: {e}");
return;
}
match nix::sched::sched_setaffinity(nix::unistd::Pid::from_raw(0), &cpuset) {
Ok(()) => eprintln!("performance_mode: pinned {label} to host CPU {cpu}"),
Err(e) => eprintln!("performance_mode: WARNING: pin {label} to CPU {cpu}: {e}"),
}
}
fn set_rt_priority(priority: i32, label: &str) {
let param = libc::sched_param {
sched_priority: priority,
};
let rc = unsafe { libc::sched_setscheduler(0, libc::SCHED_FIFO, &param) };
if rc == 0 {
eprintln!("performance_mode: {label} set to SCHED_FIFO priority {priority}");
} else {
let err = std::io::Error::last_os_error();
eprintln!("performance_mode: WARNING: SCHED_FIFO for {label}: {err} (need CAP_SYS_NICE)");
}
}
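/// Outcome of a VM run: exit status, captured console and application output,
/// monitor report, drained SHM ring data, and per-vCPU KVM stats.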
#[derive(Debug)]
pub struct VmResult {
pub success: bool,
pub exit_code: i32,
pub duration: Duration,
pub timed_out: bool,
pub output: String,
pub stderr: String,
pub monitor: Option<monitor::MonitorReport>,
pub shm_data: Option<shm_ring::ShmDrainResult>,
pub stimulus_events: Vec<shm_ring::StimulusEvent>,
pub verifier_stats: Vec<monitor::bpf_prog::ProgVerifierStats>,
pub kvm_stats: Option<KvmStatsTotals>,
pub crash_message: Option<String>,
}
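/// Per-vCPU KVM stat counters collected via `KVM_GET_STATS_FD`.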
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct KvmStatsTotals {
pub per_vcpu: Vec<HashMap<String, u64>>,
}
pub const KVM_INTERESTING_STATS: &[&str] = &[
"exits",
"halt_exits",
"halt_successful_poll",
"halt_attempted_poll",
"halt_wait_ns",
"preemption_reported",
"signal_exits",
"hypercalls",
];
impl KvmStatsTotals {
pub fn sum(&self, name: &str) -> u64 {
self.per_vcpu.iter().filter_map(|m| m.get(name)).sum()
}
pub fn avg(&self, name: &str) -> u64 {
if self.per_vcpu.is_empty() {
return 0;
}
self.sum(name) / self.per_vcpu.len() as u64
}
}
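/// State handed from `run_vm` to `collect_results`: the BSP's exit status plus
/// the threads, consoles and VM that still need to be joined and drained.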
struct VmRunState {
exit_code: i32,
timed_out: bool,
ap_threads: Vec<VcpuThread>,
monitor_handle: Option<JoinHandle<(Vec<monitor::MonitorSample>, shm_ring::ShmDrainResult)>>,
bpf_write_handle: Option<JoinHandle<()>>,
com1: Arc<PiMutex<console::Serial>>,
com2: Arc<PiMutex<console::Serial>>,
kill: Arc<AtomicBool>,
vm: kvm::KtstrKvm,
}
#[cfg(target_arch = "x86_64")]
const DRAM_BASE: u64 = 0;
#[cfg(target_arch = "aarch64")]
const DRAM_BASE: u64 = kvm::DRAM_START;
#[cfg(target_arch = "x86_64")]
const INITRD_ADDR: u64 = 0x800_0000;
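/// Place the initrd just below the FDT, aligned down to a 4 KiB boundary.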
#[cfg(target_arch = "aarch64")]
fn aarch64_initrd_addr(memory_mb: u32, shm_size: u64, initrd_max_size: u64) -> u64 {
let fdt_addr = aarch64::fdt::fdt_address(memory_mb, shm_size);
(fdt_addr - initrd_max_size) & !0xFFF
}
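/// An AP vCPU thread plus the handles needed to stop it: its exit flag and,
/// when supported, a pointer to its `immediate_exit` flag.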
struct VcpuThread {
handle: JoinHandle<kvm_ioctls::VcpuFd>,
exited: Arc<AtomicBool>,
immediate_exit: Option<ImmediateExitHandle>,
}
impl VcpuThread {
fn kick(&self) {
if let Some(ref ie) = self.immediate_exit {
ie.set(1);
std::sync::atomic::fence(Ordering::Release);
}
self.signal();
}
fn signal(&self) {
unsafe {
libc::pthread_kill(self.handle.as_pthread_t() as libc::pthread_t, vcpu_signal());
}
}
fn wait_for_exit(&self, timeout: Duration) {
let start = Instant::now();
let mut last_kick = Instant::now();
while !self.exited.load(Ordering::Acquire) {
if start.elapsed() > timeout {
break;
}
if last_kick.elapsed() > Duration::from_millis(10) {
self.kick();
last_kick = Instant::now();
}
std::thread::yield_now();
}
}
}
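/// A fully configured ktstr microVM: kernel, init payload, topology, memory
/// and time budgets, and performance-mode tuning. Construct via
/// `KtstrVm::builder()`, then call `run()` or `run_interactive()`.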
pub struct KtstrVm {
kernel: PathBuf,
init_binary: Option<PathBuf>,
scheduler_binary: Option<PathBuf>,
run_args: Vec<String>,
sched_args: Vec<String>,
topology: Topology,
memory_mb: Option<u32>,
memory_min_mb: u32,
cmdline_extra: String,
timeout: Duration,
shm_size: u64,
monitor_thresholds: Option<crate::monitor::MonitorThresholds>,
watchdog_timeout: Option<Duration>,
bpf_map_write: Option<BpfMapWriteParams>,
performance_mode: bool,
pinning_plan: Option<host_topology::PinningPlan>,
mbind_nodes: Vec<usize>,
#[allow(dead_code)]
cpu_locks: Vec<std::os::fd::OwnedFd>,
sched_enable_cmds: Vec<String>,
sched_disable_cmds: Vec<String>,
include_files: Vec<(String, PathBuf)>,
busybox: bool,
dmesg: bool,
exec_cmd: Option<String>,
}
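/// Parameters for the optional bpf-map-write helper thread: which guest BPF
/// map to patch (matched by name suffix), at which offset, with which value.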
#[derive(Clone)]
struct BpfMapWriteParams {
map_name_suffix: String,
offset: usize,
value: u32,
}
impl KtstrVm {
pub fn builder() -> KtstrVmBuilder {
KtstrVmBuilder::default()
}
pub fn run(&self) -> Result<VmResult> {
let start = Instant::now();
let initramfs_handle = self.spawn_initramfs_resolve();
let (mut vm, kernel_result) = self.create_vm_and_load_kernel()?;
#[cfg(target_arch = "x86_64")]
let _kernel_result = {
let kr = self.setup_memory(&mut vm, kernel_result, initramfs_handle)?;
self.setup_vcpus(&vm, kr.entry)?;
kr
};
#[cfg(target_arch = "aarch64")]
let _kernel_result = {
let kr = self.setup_memory_aarch64(&mut vm, kernel_result, initramfs_handle)?;
self.setup_vcpus_aarch64(&vm, kr.entry)?;
kr
};
#[cfg(target_arch = "x86_64")]
let stats_ctx = kvm_stats::open_stats_context(&vm.vcpus);
#[cfg(target_arch = "x86_64")]
if stats_ctx.is_none() {
tracing::debug!("KVM_GET_STATS_FD not supported, skipping stats collection");
}
tracing::debug!(elapsed_us = start.elapsed().as_micros(), "total_setup");
let run = self.run_vm(start, vm)?;
#[allow(unused_mut)]
let mut result = self.collect_results(start, run)?;
#[cfg(target_arch = "x86_64")]
if let Some(ctx) = stats_ctx {
result.kvm_stats = Some(ctx.read_stats());
}
Ok(result)
}
pub fn run_interactive(&self) -> Result<()> {
let start = Instant::now();
let initramfs_handle = self.spawn_initramfs_resolve();
let (mut vm, kernel_result) = self.create_vm_and_load_kernel()?;
#[cfg(target_arch = "x86_64")]
{
let kr = self.setup_memory(&mut vm, kernel_result, initramfs_handle)?;
self.setup_vcpus(&vm, kr.entry)?;
}
#[cfg(target_arch = "aarch64")]
{
let kr = self.setup_memory_aarch64(&mut vm, kernel_result, initramfs_handle)?;
self.setup_vcpus_aarch64(&vm, kr.entry)?;
}
let com1 = Arc::new(PiMutex::new(console::Serial::new(console::COM1_BASE)));
let com2 = Arc::new(PiMutex::new(console::Serial::new(console::COM2_BASE)));
let mut vc = virtio_console::VirtioConsole::new();
vc.set_mem(vm.guest_mem.clone());
let virtio_con = Arc::new(PiMutex::new(vc));
#[cfg(target_arch = "x86_64")]
if !vm.split_irqchip {
vm.vm_fd
.register_irqfd(com1.lock().irq_evt(), console::COM1_IRQ)
.context("register COM1 irqfd")?;
vm.vm_fd
.register_irqfd(com2.lock().irq_evt(), console::COM2_IRQ)
.context("register COM2 irqfd")?;
vm.vm_fd
.register_irqfd(virtio_con.lock().irq_evt(), kvm::VIRTIO_CONSOLE_IRQ)
.context("register virtio-console irqfd")?;
}
#[cfg(target_arch = "aarch64")]
{
vm.vm_fd
.register_irqfd(com1.lock().irq_evt(), kvm::SERIAL_IRQ)
.context("register serial irqfd")?;
vm.vm_fd
.register_irqfd(com2.lock().irq_evt(), kvm::SERIAL2_IRQ)
.context("register serial2 irqfd")?;
vm.vm_fd
.register_irqfd(virtio_con.lock().irq_evt(), kvm::VIRTIO_CONSOLE_IRQ)
.context("register virtio-console irqfd")?;
}
let exec_mode = self.exec_cmd.is_some();
if !exec_mode {
use std::os::unix::io::AsRawFd;
let stdin_fd = std::io::stdin().as_raw_fd();
let borrowed = unsafe { std::os::unix::io::BorrowedFd::borrow_raw(stdin_fd) };
anyhow::ensure!(
nix::unistd::isatty(borrowed).unwrap_or(false),
"stdin must be a terminal for interactive shell mode",
);
}
let _raw_guard = if exec_mode {
None
} else {
Some(TerminalRawGuard::enter().context("failed to set terminal to raw mode")?)
};
let (wakeup_r, wakeup_w) = nix::unistd::pipe().context("create stdin wakeup pipe")?;
let kill = Arc::new(AtomicBool::new(false));
let has_immediate_exit = vm.has_immediate_exit;
let mut vcpus = std::mem::take(&mut vm.vcpus);
let mut bsp = vcpus.remove(0);
let ap_pins = vec![None; vcpus.len()];
let ap_threads = self.spawn_ap_threads(
vcpus,
has_immediate_exit,
&com1,
&com2,
Some(&virtio_con),
&kill,
&ap_pins,
)?;
let bsp_ie_for_stdin = if has_immediate_exit {
Some(ImmediateExitHandle::from_vcpu(&mut bsp))
} else {
None
};
let bsp_tid = unsafe { libc::pthread_self() };
let vc_for_stdin = virtio_con.clone();
let kill_for_stdin = kill.clone();
let stdin_thread = std::thread::Builder::new()
.name("interactive-stdin".into())
.spawn(move || {
use std::io::Read;
use std::os::unix::io::{AsFd, AsRawFd};
let wakeup_fd = wakeup_r;
let stdin_fd = std::io::stdin().as_raw_fd();
let mut buf = [0u8; 4096];
let mut saw_ctrl_a = false;
loop {
if kill_for_stdin.load(Ordering::Acquire) {
break;
}
let stdin_borrowed =
unsafe { std::os::unix::io::BorrowedFd::borrow_raw(stdin_fd) };
let wakeup_borrowed = wakeup_fd.as_fd();
let mut fds = [
nix::poll::PollFd::new(stdin_borrowed, nix::poll::PollFlags::POLLIN),
nix::poll::PollFd::new(wakeup_borrowed, nix::poll::PollFlags::POLLIN),
];
match nix::poll::poll(&mut fds, 100u16) {
Ok(0) => continue,
Err(nix::errno::Errno::EINTR) => continue,
Err(_) => break,
Ok(_) => {}
}
if fds[1]
.revents()
.is_some_and(|r| r.intersects(nix::poll::PollFlags::POLLIN))
{
break;
}
if fds[0]
.revents()
.is_some_and(|r| r.intersects(nix::poll::PollFlags::POLLIN))
{
let mut stdin = std::io::stdin().lock();
match stdin.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let mut forward_start = 0usize;
for i in 0..n {
if saw_ctrl_a {
saw_ctrl_a = false;
if buf[i] == b'x' || buf[i] == b'X' {
eprintln!("\r\nTerminated.");
kill_for_stdin.store(true, Ordering::Release);
if let Some(ref ie) = bsp_ie_for_stdin {
ie.set(1);
std::sync::atomic::fence(Ordering::Release);
}
unsafe {
libc::pthread_kill(bsp_tid, vcpu_signal());
}
return;
}
vc_for_stdin.lock().queue_input(&[0x01]);
}
if buf[i] == 0x01 {
if forward_start < i {
vc_for_stdin.lock().queue_input(&buf[forward_start..i]);
}
saw_ctrl_a = true;
forward_start = i + 1;
continue;
}
}
if forward_start < n {
vc_for_stdin.lock().queue_input(&buf[forward_start..n]);
}
}
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(_) => break,
}
}
}
})
.context("spawn stdin reader thread")?;
let vc_for_stdout = virtio_con.clone();
let kill_for_stdout = kill.clone();
let stdout_thread: JoinHandle<bool> = std::thread::Builder::new()
.name("interactive-stdout".into())
.spawn(move || {
use std::io::Write;
let mut wrote_any = false;
let tx_evt_raw_fd = {
let guard = vc_for_stdout.lock();
std::os::unix::io::AsRawFd::as_raw_fd(guard.tx_evt())
};
let mut stdout = std::io::stdout().lock();
loop {
if kill_for_stdout.load(Ordering::Acquire) {
break;
}
let borrowed =
unsafe { std::os::unix::io::BorrowedFd::borrow_raw(tx_evt_raw_fd) };
let mut fds = [nix::poll::PollFd::new(
borrowed,
nix::poll::PollFlags::POLLIN,
)];
match nix::poll::poll(&mut fds, 50u16) {
Ok(0) => continue,
Err(nix::errno::Errno::EINTR) => continue,
Err(_) => break,
Ok(_) => {
let _ = vc_for_stdout.lock().tx_evt().read();
}
}
if kill_for_stdout.load(Ordering::Acquire) {
break;
}
let data = vc_for_stdout.lock().drain_output();
if !data.is_empty() {
let valid_len = match std::str::from_utf8(&data) {
Ok(_) => data.len(),
Err(e) => e.valid_up_to(),
};
if valid_len > 0 {
if stdout.write_all(&data[..valid_len]).is_err()
|| stdout.flush().is_err()
{
kill_for_stdout.store(true, Ordering::Release);
break;
}
wrote_any = true;
}
}
}
let data = vc_for_stdout.lock().drain_output();
if !data.is_empty() {
let valid_len = match std::str::from_utf8(&data) {
Ok(_) => data.len(),
Err(e) => e.valid_up_to(),
};
if valid_len > 0 {
let _ = stdout.write_all(&data[..valid_len]);
let _ = stdout.flush();
wrote_any = true;
}
}
wrote_any
})
.context("spawn stdout writer thread")?;
let dmesg_thread = if self.dmesg {
let com1_for_dmesg = com1.clone();
let kill_for_dmesg = kill.clone();
Some(
std::thread::Builder::new()
.name("interactive-dmesg".into())
.spawn(move || {
use std::io::Write;
loop {
if kill_for_dmesg.load(Ordering::Acquire) {
break;
}
std::thread::sleep(std::time::Duration::from_millis(50));
let data = com1_for_dmesg.lock().drain_output();
if !data.is_empty() {
let mut stderr = std::io::stderr().lock();
let _ = stderr.write_all(&data);
let _ = stderr.flush();
}
}
let data = com1_for_dmesg.lock().drain_output();
if !data.is_empty() {
let mut stderr = std::io::stderr().lock();
let _ = stderr.write_all(&data);
let _ = stderr.flush();
}
})
.context("spawn dmesg thread")?,
)
} else {
None
};
register_vcpu_signal_handler();
let interactive_timeout = Duration::from_secs(24 * 60 * 60);
self.run_bsp_loop(
&mut bsp,
&com1,
&com2,
Some(&virtio_con),
&kill,
has_immediate_exit,
start,
interactive_timeout,
);
kill.store(true, Ordering::Release);
let _ = nix::unistd::write(&wakeup_w, &[0u8]);
drop(wakeup_w);
for vt in &ap_threads {
if !vt.exited.load(Ordering::Acquire) {
vt.kick();
}
}
for vt in ap_threads {
vt.wait_for_exit(Duration::from_secs(5));
let _ = vt.handle.join();
}
let stdout_wrote = stdout_thread.join().unwrap_or(false);
let _ = stdin_thread.join();
if let Some(dt) = dmesg_thread {
let _ = dt.join();
}
drop(_raw_guard);
if exec_mode && !stdout_wrote {
let app_output = com2.lock().output();
if !app_output.is_empty() {
use std::io::Write;
let mut stdout = std::io::stdout().lock();
for line in app_output.lines() {
if !line.starts_with("KTSTR_EXEC_EXIT=") {
let _ = writeln!(stdout, "{line}");
}
}
let _ = stdout.flush();
}
}
if !self.dmesg {
let console_output = com1.lock().output();
if !console_output.is_empty() {
eprintln!("{console_output}");
}
}
if !exec_mode {
eprintln!("Connection to VM closed.");
}
Ok(())
}
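/// Create the KVM VM (with hugepages when performance mode is on and enough
/// are free). If the memory size is already known, mbind it to the requested
/// NUMA nodes and load the kernel; otherwise defer both until the initramfs
/// size is computed.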
fn create_vm_and_load_kernel(&self) -> Result<(kvm::KtstrKvm, Option<boot::KernelLoadResult>)> {
let t0 = Instant::now();
let use_hugepages = self.performance_mode
&& self.memory_mb.is_some_and(|mb| {
host_topology::hugepages_free() >= host_topology::hugepages_needed(mb)
});
let vm = match self.memory_mb {
Some(mb) => {
if use_hugepages {
kvm::KtstrKvm::new_with_hugepages(self.topology, mb, self.performance_mode)
.context("create VM with hugepages")?
} else {
kvm::KtstrKvm::new(self.topology, mb, self.performance_mode)
.context("create VM")?
}
}
None => {
kvm::KtstrKvm::new_deferred(self.topology, use_hugepages, self.performance_mode)
.context("create VM (deferred memory)")?
}
};
tracing::debug!(elapsed_us = t0.elapsed().as_micros(), "kvm_create");
let kernel_result = if let Some(mb) = self.memory_mb {
if self.performance_mode
&& !self.mbind_nodes.is_empty()
&& let Ok(host_addr) = vm.guest_mem.get_host_address(GuestAddress(DRAM_BASE))
{
let mem_size = (mb as u64) << 20;
host_topology::mbind_to_nodes(host_addr, mem_size as usize, &self.mbind_nodes);
}
let t0 = Instant::now();
let kr = boot::load_kernel(&vm.guest_mem, &self.kernel).context("load kernel")?;
tracing::debug!(elapsed_us = t0.elapsed().as_micros(), "load_kernel");
Some(kr)
} else {
None
};
Ok((vm, kernel_result))
}
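/// Start resolving/building the initramfs base on a background thread so it
/// overlaps with VM creation; returns `None` when there is no init binary.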
fn spawn_initramfs_resolve(&self) -> Option<JoinHandle<Result<(BaseRef, BaseKey)>>> {
let bin = self.init_binary.as_ref()?;
let payload = bin.clone();
let scheduler = self.scheduler_binary.clone();
let include_files = self.include_files.clone();
let busybox = self.busybox;
std::thread::Builder::new()
.name("initramfs-resolve".into())
.spawn(move || -> Result<(BaseRef, BaseKey)> {
let extras: Vec<(&str, &std::path::Path)> = scheduler
.as_deref()
.map(|p| vec![("scheduler", p)])
.unwrap_or_default();
let shell_mode = busybox || !include_files.is_empty();
let key = if shell_mode {
BaseKey::new_shell(&payload, scheduler.as_deref(), &include_files, busybox)?
} else {
BaseKey::new(&payload, scheduler.as_deref())?
};
let include_refs: Vec<(&str, &std::path::Path)> = include_files
.iter()
.map(|(a, p)| (a.as_str(), p.as_path()))
.collect();
let base = get_or_build_base(&payload, &extras, &include_refs, busybox, &key)?;
Ok((base, key))
})
.ok()
}
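/// Compress the shared base and the per-run suffix as separate LZ4-legacy
/// streams and write them back-to-back at `load_addr`, preferring a
/// copy-on-write overlay of the cached compressed base; returns the total
/// compressed size.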
#[cfg(target_arch = "x86_64")]
fn compress_and_load_initrd(
&self,
guest_mem: &GuestMemoryMmap,
base_bytes: &[u8],
suffix: &[u8],
key: &BaseKey,
load_addr: u64,
) -> Result<u32> {
let uncompressed_size = base_bytes.len() + suffix.len();
let t0 = Instant::now();
let lz4_base = self.get_or_compress_base(base_bytes, key)?;
let lz4_suffix = initramfs::lz4_legacy_compress(suffix);
let total_compressed = lz4_base.len() + lz4_suffix.len();
tracing::debug!(
elapsed_us = t0.elapsed().as_micros(),
uncompressed = uncompressed_size,
lz4_base = lz4_base.len(),
lz4_suffix = lz4_suffix.len(),
ratio = format!("{:.1}x", uncompressed_size as f64 / total_compressed as f64),
"lz4_initramfs",
);
tracing::debug!(
base_magic = format!(
"{:02x}{:02x}{:02x}{:02x}",
lz4_base[0], lz4_base[1], lz4_base[2], lz4_base[3]
),
suffix_magic = format!(
"{:02x}{:02x}{:02x}{:02x}",
lz4_suffix[0], lz4_suffix[1], lz4_suffix[2], lz4_suffix[3]
),
base_len = lz4_base.len(),
suffix_len = lz4_suffix.len(),
total = total_compressed,
load_addr = format!("{:#x}", load_addr),
suffix_addr = format!("{:#x}", load_addr + lz4_base.len() as u64),
"initrd_load_debug",
);
let t0 = Instant::now();
let cow_ok = self.try_cow_overlay(guest_mem, key, lz4_base.len(), load_addr);
if cow_ok {
guest_mem
.write_slice(&lz4_suffix, GuestAddress(load_addr + lz4_base.len() as u64))
.context("write lz4 suffix after COW base")?;
tracing::debug!(
elapsed_us = t0.elapsed().as_micros(),
cow = true,
"initrd_write"
);
} else {
initramfs::load_initramfs_parts(guest_mem, &[&lz4_base, &lz4_suffix], load_addr)?;
tracing::debug!(
elapsed_us = t0.elapsed().as_micros(),
cow = false,
"initrd_write"
);
}
let mut verify_buf = [0u8; 8];
guest_mem
.read_slice(&mut verify_buf, GuestAddress(load_addr))
.context("read-back initrd verify")?;
tracing::debug!(
first_8 = format!(
"{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
verify_buf[0],
verify_buf[1],
verify_buf[2],
verify_buf[3],
verify_buf[4],
verify_buf[5],
verify_buf[6],
verify_buf[7]
),
expected_magic = "02214c18",
"initrd_verify",
);
Ok(total_compressed as u32)
}
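/// Join the background initramfs build, append the per-run suffix, verify the
/// configured memory covers the computed minimum, and load the compressed
/// initrd at `load_addr`.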
#[cfg(target_arch = "x86_64")]
fn join_and_load_initramfs(
&self,
vm: &kvm::KtstrKvm,
handle: JoinHandle<Result<(BaseRef, BaseKey)>>,
load_addr: u64,
) -> Result<(Option<u64>, Option<u32>)> {
let t0 = Instant::now();
let (base, key) = handle
.join()
.map_err(|_| anyhow::anyhow!("initramfs-resolve thread panicked"))??;
tracing::debug!(elapsed_us = t0.elapsed().as_micros(), "initramfs_join");
let base_bytes: &[u8] = base.as_ref();
let t0 = Instant::now();
let enable_refs: Vec<&str> = self.sched_enable_cmds.iter().map(|s| s.as_str()).collect();
let disable_refs: Vec<&str> = self.sched_disable_cmds.iter().map(|s| s.as_str()).collect();
let suffix = initramfs::build_suffix_full(
base_bytes.len(),
&self.run_args,
&self.sched_args,
&enable_refs,
&disable_refs,
self.exec_cmd.as_deref(),
)?;
let uncompressed_size = base_bytes.len() + suffix.len();
tracing::debug!(
elapsed_us = t0.elapsed().as_micros(),
base_bytes = base_bytes.len(),
suffix_bytes = suffix.len(),
"build_suffix",
);
let memory_mb = self.memory_mb.expect(
"join_and_load_initramfs called in deferred mode; \
use join_compute_memory_and_load instead",
);
let lz4_base = self.get_or_compress_base(base_bytes, &key)?;
let lz4_suffix = initramfs::lz4_legacy_compress(&suffix);
let compressed_size = lz4_base.len() + lz4_suffix.len();
let kernel_init_size = read_kernel_init_size(&self.kernel).unwrap_or(0);
let budget = MemoryBudget {
uncompressed_initramfs_bytes: uncompressed_size as u64,
compressed_initrd_bytes: compressed_size as u64,
kernel_init_size,
shm_bytes: self.shm_size,
};
let min_mb = initramfs_min_memory_mb(&budget);
if memory_mb < min_mb {
anyhow::bail!(
"VM memory {}MB insufficient for initramfs \
(uncompressed={}MB, compressed={}MB, \
init_size={}MB): need {}MB",
memory_mb,
uncompressed_size >> 20,
compressed_size >> 20,
kernel_init_size >> 20,
min_mb,
);
}
let size =
self.compress_and_load_initrd(&vm.guest_mem, base_bytes, &suffix, &key, load_addr)?;
Ok((Some(load_addr), Some(size)))
}
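/// Deferred-memory variant: join the initramfs build, derive the guest memory
/// size from the memory budget, allocate and register it, then load the
/// compressed initrd; returns the initrd location, size and chosen memory.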
#[cfg(target_arch = "x86_64")]
fn join_compute_memory_and_load(
&self,
vm: &mut kvm::KtstrKvm,
handle: JoinHandle<Result<(BaseRef, BaseKey)>>,
load_addr: u64,
) -> Result<(Option<u64>, Option<u32>, u32)> {
let t0 = Instant::now();
let (base, key) = handle
.join()
.map_err(|_| anyhow::anyhow!("initramfs-resolve thread panicked"))??;
tracing::debug!(elapsed_us = t0.elapsed().as_micros(), "initramfs_join");
let base_bytes: &[u8] = base.as_ref();
let t0 = Instant::now();
let enable_refs: Vec<&str> = self.sched_enable_cmds.iter().map(|s| s.as_str()).collect();
let disable_refs: Vec<&str> = self.sched_disable_cmds.iter().map(|s| s.as_str()).collect();
let suffix = initramfs::build_suffix_full(
base_bytes.len(),
&self.run_args,
&self.sched_args,
&enable_refs,
&disable_refs,
self.exec_cmd.as_deref(),
)?;
let uncompressed_size = base_bytes.len() + suffix.len();
tracing::debug!(
elapsed_us = t0.elapsed().as_micros(),
base_bytes = base_bytes.len(),
suffix_bytes = suffix.len(),
"build_suffix",
);
let t0_compress = Instant::now();
let lz4_base = self.get_or_compress_base(base_bytes, &key)?;
let lz4_suffix = initramfs::lz4_legacy_compress(&suffix);
let compressed_size = lz4_base.len() + lz4_suffix.len();
tracing::debug!(
elapsed_us = t0_compress.elapsed().as_micros(),
uncompressed = uncompressed_size,
compressed = compressed_size,
ratio = format!("{:.1}x", uncompressed_size as f64 / compressed_size as f64),
"deferred_lz4_compress",
);
let kernel_init_size = read_kernel_init_size(&self.kernel).unwrap_or(0);
let budget = MemoryBudget {
uncompressed_initramfs_bytes: uncompressed_size as u64,
compressed_initrd_bytes: compressed_size as u64,
kernel_init_size,
shm_bytes: self.shm_size,
};
let memory_mb = initramfs_min_memory_mb(&budget).max(self.memory_min_mb);
tracing::debug!(
uncompressed_mb = uncompressed_size >> 20,
compressed_mb = compressed_size >> 20,
init_size_mb = kernel_init_size >> 20,
memory_min_mb = self.memory_min_mb,
memory_mb,
"deferred_memory_computed",
);
vm.allocate_and_register_memory(memory_mb)
.with_context(|| format!("allocate deferred memory ({memory_mb}MB)"))?;
let size =
self.compress_and_load_initrd(&vm.guest_mem, base_bytes, &suffix, &key, load_addr)?;
Ok((Some(load_addr), Some(size), memory_mb))
}
fn effective_memory_mb(&self, guest_mem: &GuestMemoryMmap) -> u32 {
use vm_memory::GuestMemoryRegion;
match self.memory_mb {
Some(mb) => mb,
None => {
let total_bytes: u64 = guest_mem.iter().map(|r| r.len()).sum();
(total_bytes >> 20) as u32
}
}
}
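/// Return the LZ4-legacy-compressed base, reusing the cached `/dev/shm` copy
/// when its magic checks out, otherwise compressing it and storing the result.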
#[cfg(target_arch = "x86_64")]
fn get_or_compress_base(&self, base_bytes: &[u8], key: &BaseKey) -> Result<Vec<u8>> {
if let Some((fd, len)) = initramfs::shm_open_lz4(key.0) {
let mut buf = vec![0u8; len];
unsafe {
let ptr = libc::mmap(
std::ptr::null_mut(),
len,
libc::PROT_READ,
libc::MAP_SHARED,
fd,
0,
);
if ptr != libc::MAP_FAILED {
std::ptr::copy_nonoverlapping(ptr as *const u8, buf.as_mut_ptr(), len);
libc::munmap(ptr, len);
initramfs::shm_close_fd(fd);
if buf.len() >= 4 && buf[..4] == initramfs::LZ4_LEGACY_MAGIC {
tracing::debug!(bytes = len, "lz4_base cache hit (shm)");
return Ok(buf);
}
tracing::warn!(
bytes = len,
magic = format!("{:02x?}", &buf[..buf.len().min(4)]),
"stale compressed shm segment (wrong magic), recompressing"
);
} else {
initramfs::shm_close_fd(fd);
}
}
}
let lz4 = initramfs::lz4_legacy_compress(base_bytes);
if let Err(e) = initramfs::shm_store_lz4(key.0, &lz4) {
tracing::warn!("shm_store_lz4: {e:#}");
}
Ok(lz4)
}
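/// Map the cached compressed base copy-on-write directly over guest memory at
/// `load_addr`. Returns `false` (so the caller copies instead) if the segment
/// is missing, the wrong length, or has a bad magic.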
#[cfg(target_arch = "x86_64")]
fn try_cow_overlay(
&self,
guest_mem: &GuestMemoryMmap,
key: &BaseKey,
expected_len: usize,
load_addr: u64,
) -> bool {
let Some((fd, len)) = initramfs::shm_open_lz4(key.0) else {
return false;
};
if len != expected_len {
initramfs::shm_close_fd(fd);
return false;
}
let mut magic = [0u8; 4];
unsafe {
let ptr = libc::mmap(
std::ptr::null_mut(),
len,
libc::PROT_READ,
libc::MAP_SHARED,
fd,
0,
);
if ptr == libc::MAP_FAILED {
initramfs::shm_close_fd(fd);
return false;
}
std::ptr::copy_nonoverlapping(ptr as *const u8, magic.as_mut_ptr(), 4);
libc::munmap(ptr, len);
}
if magic != initramfs::LZ4_LEGACY_MAGIC {
tracing::warn!(
magic = format!(
"{:02x}{:02x}{:02x}{:02x}",
magic[0], magic[1], magic[2], magic[3]
),
"stale compressed shm segment in COW path, skipping"
);
initramfs::shm_close_fd(fd);
return false;
}
let Ok(host_addr) = guest_mem.get_host_address(GuestAddress(load_addr)) else {
initramfs::shm_close_fd(fd);
return false;
};
let ok = unsafe { initramfs::cow_overlay(host_addr, len, fd) };
initramfs::shm_close_fd(fd);
ok
}
fn init_shm_region(&self, guest_mem: &GuestMemoryMmap, shm_base: u64) -> Result<()> {
let shm_size = self.shm_size as usize;
let header = shm_ring::ShmRingHeader {
magic: shm_ring::SHM_RING_MAGIC,
version: shm_ring::SHM_RING_VERSION,
capacity: (shm_size - shm_ring::HEADER_SIZE) as u32,
_pad: 0,
write_ptr: 0,
read_ptr: 0,
drops: 0,
};
guest_mem
.write_slice(
zerocopy::IntoBytes::as_bytes(&header),
GuestAddress(shm_base),
)
.context("write SHM header")
}
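/// Load the initrd (computing and registering deferred memory first, if
/// needed), build the kernel command line, write boot params, initialize the
/// SHM ring, and set up the MP table and ACPI tables.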
#[cfg(target_arch = "x86_64")]
fn setup_memory(
&self,
vm: &mut kvm::KtstrKvm,
kernel_result: Option<boot::KernelLoadResult>,
initramfs_handle: Option<JoinHandle<Result<(BaseRef, BaseKey)>>>,
) -> Result<boot::KernelLoadResult> {
let (kernel_result, initrd_addr, initrd_size) = if let Some(kr) = kernel_result {
let (initrd_addr, initrd_size) = match initramfs_handle {
Some(handle) => self.join_and_load_initramfs(vm, handle, INITRD_ADDR)?,
None => (None, None),
};
(kr, initrd_addr, initrd_size)
} else {
let (initrd_addr, initrd_size, memory_mb) = match initramfs_handle {
Some(handle) => self.join_compute_memory_and_load(vm, handle, INITRD_ADDR)?,
None => {
let memory_mb = 256u32;
vm.allocate_and_register_memory(memory_mb)
.context("allocate deferred memory (no initramfs)")?;
(None, None, memory_mb)
}
};
if self.performance_mode
&& !self.mbind_nodes.is_empty()
&& let Ok(host_addr) = vm.guest_mem.get_host_address(GuestAddress(DRAM_BASE))
{
let mem_size = (memory_mb as u64) << 20;
host_topology::mbind_to_nodes(host_addr, mem_size as usize, &self.mbind_nodes);
}
let t0 = Instant::now();
let kr = boot::load_kernel(&vm.guest_mem, &self.kernel).context("load kernel")?;
tracing::debug!(elapsed_us = t0.elapsed().as_micros(), "load_kernel");
(kr, initrd_addr, initrd_size)
};
let memory_mb = self.effective_memory_mb(&vm.guest_mem);
let mut cmdline = concat!(
"console=ttyS0 nomodules mitigations=off ",
"no_timer_check clocksource=kvm-clock ",
"random.trust_cpu=on swiotlb=noforce ",
"i8042.noaux i8042.nomux i8042.nopnp i8042.dumbkbd ",
"pci=off reboot=k panic=-1 iomem=relaxed nokaslr lockdown=none ",
"sysctl.kernel.unprivileged_bpf_disabled=0 ",
"sysctl.kernel.sched_schedstats=1",
)
.to_string();
let verbose = std::env::var("KTSTR_VERBOSE")
.map(|v| v == "1")
.unwrap_or(false)
|| std::env::var("RUST_BACKTRACE").is_ok_and(|v| v == "1" || v == "full");
if verbose {
cmdline.push_str(" earlyprintk=serial loglevel=7");
} else {
cmdline.push_str(" loglevel=0");
}
if self.init_binary.is_some() {
cmdline.push_str(" rdinit=/init initramfs_options=size=90%");
}
cmdline.push_str(&format!(
" virtio_mmio.device={:#x}@{:#x}:{}",
virtio_console::VIRTIO_MMIO_SIZE,
kvm::VIRTIO_CONSOLE_MMIO_BASE,
kvm::VIRTIO_CONSOLE_IRQ,
));
if self.shm_size > 0 {
let mem_size = (memory_mb as u64) << 20;
let shm_base = mem_size - self.shm_size;
cmdline.push_str(&format!(
" KTSTR_SHM_BASE={:#x} KTSTR_SHM_SIZE={:#x}",
shm_base, self.shm_size
));
}
if !self.cmdline_extra.is_empty() {
cmdline.push(' ');
cmdline.push_str(&self.cmdline_extra);
}
let t0 = Instant::now();
boot::write_cmdline(&vm.guest_mem, &cmdline)?;
boot::write_boot_params(
&vm.guest_mem,
&cmdline,
memory_mb,
initrd_addr,
initrd_size,
kernel_result.setup_header.as_ref(),
self.shm_size,
)?;
tracing::debug!(elapsed_us = t0.elapsed().as_micros(), "cmdline_boot_params");
let t0 = Instant::now();
if self.shm_size > 0 {
let mem_size = (memory_mb as u64) << 20;
let shm_base = mem_size - self.shm_size;
self.init_shm_region(&vm.guest_mem, shm_base)?;
}
tracing::debug!(elapsed_us = t0.elapsed().as_micros(), "shm_ring_init");
let t0 = Instant::now();
mptable::setup_mptable(&vm.guest_mem, &self.topology)?;
let _acpi_layout =
acpi::setup_acpi(&vm.guest_mem, &self.topology, memory_mb, self.shm_size)?;
tracing::debug!(elapsed_us = t0.elapsed().as_micros(), "mptable_acpi");
Ok(kernel_result)
}
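/// Initialize the BSP (sregs, regs, FPU, MSRs, LAPIC, runnable) and leave the
/// APs in `KVM_MP_STATE_UNINITIALIZED` for the guest to bring up.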
#[cfg(target_arch = "x86_64")]
fn setup_vcpus(&self, vm: &kvm::KtstrKvm, kernel_entry: u64) -> Result<()> {
let t0 = Instant::now();
boot::setup_sregs(&vm.guest_mem, &vm.vcpus[0], vm.split_irqchip)?;
boot::setup_regs(&vm.vcpus[0], kernel_entry)?;
boot::setup_fpu(&vm.vcpus[0])?;
boot::setup_msrs(&vm.vcpus[0], None)?;
boot::setup_lapic(&vm.vcpus[0], true)?;
vm.vcpus[0]
.set_mp_state(kvm_bindings::kvm_mp_state {
mp_state: kvm_bindings::KVM_MP_STATE_RUNNABLE,
})
.context("set BSP mp_state")?;
tracing::debug!(elapsed_us = t0.elapsed().as_micros(), "bsp_setup");
let t0 = Instant::now();
for vcpu in &vm.vcpus[1..] {
boot::setup_fpu(vcpu)?;
boot::setup_lapic(vcpu, false)?;
vcpu.set_mp_state(kvm_bindings::kvm_mp_state {
mp_state: kvm_bindings::KVM_MP_STATE_UNINITIALIZED,
})
.context("set AP mp_state")?;
}
tracing::debug!(
elapsed_us = t0.elapsed().as_micros(),
ap_count = vm.vcpus.len().saturating_sub(1),
"ap_setup"
);
Ok(())
}
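/// Wire up serial irqfds, spawn AP vCPU threads and the monitor, bpf-map-write
/// and watchdog helpers, then run the BSP loop on the calling thread.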
#[allow(clippy::type_complexity)]
fn run_vm(&self, start: Instant, mut vm: kvm::KtstrKvm) -> Result<VmRunState> {
let com1 = Arc::new(PiMutex::new(console::Serial::new(console::COM1_BASE)));
let com2 = Arc::new(PiMutex::new(console::Serial::new(console::COM2_BASE)));
#[cfg(target_arch = "x86_64")]
if !vm.split_irqchip {
vm.vm_fd
.register_irqfd(com1.lock().irq_evt(), console::COM1_IRQ)
.context("register COM1 irqfd")?;
vm.vm_fd
.register_irqfd(com2.lock().irq_evt(), console::COM2_IRQ)
.context("register COM2 irqfd")?;
}
#[cfg(target_arch = "aarch64")]
{
vm.vm_fd
.register_irqfd(com1.lock().irq_evt(), kvm::SERIAL_IRQ)
.context("register serial irqfd")?;
vm.vm_fd
.register_irqfd(com2.lock().irq_evt(), kvm::SERIAL2_IRQ)
.context("register serial2 irqfd")?;
}
let kill = Arc::new(AtomicBool::new(false));
let has_immediate_exit = vm.has_immediate_exit;
let mut vcpus = std::mem::take(&mut vm.vcpus);
let mut bsp = vcpus.remove(0);
let pin_targets: Vec<Option<usize>> = if let Some(ref plan) = self.pinning_plan {
let total = self.topology.total_cpus() as usize;
let mut targets = vec![None; total];
for &(vcpu_id, host_cpu) in &plan.assignments {
if (vcpu_id as usize) < total {
targets[vcpu_id as usize] = Some(host_cpu);
}
}
targets
} else {
Vec::new()
};
let ap_pins: Vec<Option<usize>> = if pin_targets.len() > 1 {
pin_targets[1..].to_vec()
} else {
vec![None; vcpus.len()]
};
let ap_threads = self.spawn_ap_threads(
vcpus,
has_immediate_exit,
&com1,
&com2,
None,
&kill,
&ap_pins,
)?;
if let Some(Some(host_cpu)) = pin_targets.first() {
pin_current_thread(*host_cpu, "BSP (vCPU 0)");
}
if self.performance_mode {
set_rt_priority(1, "BSP (vCPU 0)");
}
let vcpu_pthreads = {
let mut pts = Vec::with_capacity(1 + ap_threads.len());
pts.push(unsafe { libc::pthread_self() } as libc::pthread_t);
for vt in &ap_threads {
pts.push(vt.handle.as_pthread_t() as libc::pthread_t);
}
pts
};
let monitor_handle = self.start_monitor(&vm, &kill, start, vcpu_pthreads)?;
let bpf_write_handle = self.start_bpf_map_write(&vm, &kill)?;
register_vcpu_signal_handler();
let timeout = self.timeout;
let bsp_ie = if has_immediate_exit {
Some(ImmediateExitHandle::from_vcpu(&mut bsp))
} else {
None
};
let bsp_tid = unsafe { libc::pthread_self() };
let bsp_done = Arc::new(AtomicBool::new(false));
let bsp_done_for_wd = bsp_done.clone();
let kill_for_watchdog = kill.clone();
let rt_watchdog = self.performance_mode;
let wd_service_cpu = self.pinning_plan.as_ref().and_then(|p| p.service_cpu);
let wd_shm = if self.shm_size > 0 {
vm.guest_mem
.get_host_address(GuestAddress(DRAM_BASE))
.ok()
.map(|host_base| {
let mem_size = (self.effective_memory_mb(&vm.guest_mem) as u64) << 20;
let mem = monitor::reader::GuestMem::new(host_base, mem_size);
let shm_base = mem_size - self.shm_size;
(mem, shm_base)
})
} else {
None
};
let watchdog = std::thread::Builder::new()
.name("vmm-watchdog".into())
.spawn(move || {
if let Some(cpu) = wd_service_cpu {
pin_current_thread(cpu, "watchdog");
}
if rt_watchdog {
set_rt_priority(2, "watchdog");
}
let hard_deadline = Instant::now() + timeout;
let soft_deadline = if timeout > Duration::from_secs(5) {
Some(hard_deadline - Duration::from_secs(3))
} else {
None
};
let mut soft_fired = false;
eprintln!("watchdog: started, timeout={timeout:?}");
loop {
if bsp_done_for_wd.load(Ordering::Acquire) {
eprintln!("watchdog: BSP done, returning");
return;
}
if kill_for_watchdog.load(Ordering::Acquire) || Instant::now() >= hard_deadline
{
if bsp_done_for_wd.load(Ordering::Acquire) {
eprintln!("watchdog: BSP already done, returning");
return;
}
let reason = if Instant::now() >= hard_deadline {
"hard timeout expired"
} else {
"kill set by AP"
};
eprintln!("watchdog: {reason}, kicking BSP");
if let Some(ref ie) = bsp_ie {
ie.set(1);
std::sync::atomic::fence(Ordering::Release);
}
unsafe {
libc::pthread_kill(bsp_tid, vcpu_signal());
}
eprintln!("watchdog: BSP kicked");
return;
}
if !soft_fired && soft_deadline.is_some_and(|d| Instant::now() >= d) {
soft_fired = true;
if let Some((ref mem, shm_base)) = wd_shm {
eprintln!("watchdog: soft deadline, requesting graceful shutdown");
shm_ring::signal_guest_value(
mem,
shm_base,
0,
shm_ring::SIGNAL_SHUTDOWN_REQ,
);
}
}
std::thread::sleep(Duration::from_millis(100));
}
})
.context("spawn watchdog thread")?;
eprintln!("BSP: entering run loop");
let (exit_code, timed_out) = self.run_bsp_loop(
&mut bsp,
&com1,
&com2,
None,
&kill,
has_immediate_exit,
start,
timeout,
);
bsp_done.store(true, Ordering::Release);
eprintln!("BSP: exited run loop, code={exit_code} timed_out={timed_out}");
let _ = watchdog.join();
Ok(VmRunState {
exit_code,
timed_out,
ap_threads,
monitor_handle,
bpf_write_handle,
com1,
com2,
kill,
vm,
})
}
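/// Spawn one thread per AP vCPU. Each installs the kick-signal handler,
/// optionally pins itself and switches to SCHED_FIFO, and runs the unified
/// vCPU loop until `kill` is set.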
#[allow(clippy::too_many_arguments)]
fn spawn_ap_threads(
&self,
vcpus: Vec<kvm_ioctls::VcpuFd>,
has_immediate_exit: bool,
com1: &Arc<PiMutex<console::Serial>>,
com2: &Arc<PiMutex<console::Serial>>,
virtio_con: Option<&Arc<PiMutex<virtio_console::VirtioConsole>>>,
kill: &Arc<AtomicBool>,
pin_targets: &[Option<usize>],
) -> Result<Vec<VcpuThread>> {
let mut ap_threads: Vec<VcpuThread> = Vec::new();
for (i, mut vcpu) in vcpus.into_iter().enumerate() {
let ie_handle = if has_immediate_exit {
Some(ImmediateExitHandle::from_vcpu(&mut vcpu))
} else {
None
};
let kill_clone = kill.clone();
let com1_clone = com1.clone();
let com2_clone = com2.clone();
let vc_clone = virtio_con.cloned();
let exited = Arc::new(AtomicBool::new(false));
let exited_clone = exited.clone();
let pin_cpu = pin_targets.get(i).copied().flatten();
let rt = self.performance_mode;
let handle = std::thread::Builder::new()
.name(format!("vcpu-{}", i + 1))
.spawn(move || {
register_vcpu_signal_handler();
if let Some(cpu) = pin_cpu {
pin_current_thread(cpu, &format!("vCPU {}", i + 1));
}
if rt {
set_rt_priority(1, &format!("vCPU {}", i + 1));
}
vcpu_run_loop_unified(
&mut vcpu,
&com1_clone,
&com2_clone,
vc_clone.as_ref(),
&kill_clone,
);
exited_clone.store(true, Ordering::Release);
vcpu
})
.with_context(|| format!("spawn vCPU {} thread", i + 1))?;
ap_threads.push(VcpuThread {
handle,
exited,
immediate_exit: ie_handle,
});
}
Ok(ap_threads)
}
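/// If a vmlinux with usable BTF offsets and symbols is found for the kernel,
/// spawn the monitor thread that samples guest runqueues, scx events and vCPU
/// preemption directly from guest memory until `kill` is set.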
#[allow(clippy::type_complexity)]
fn start_monitor(
&self,
vm: &kvm::KtstrKvm,
kill: &Arc<AtomicBool>,
start: Instant,
vcpu_pthreads: Vec<libc::pthread_t>,
) -> Result<Option<JoinHandle<(Vec<monitor::MonitorSample>, shm_ring::ShmDrainResult)>>> {
let Some(vmlinux) = find_vmlinux(&self.kernel) else {
return Ok(None);
};
let offsets = monitor::btf_offsets::KernelOffsets::from_vmlinux(&vmlinux);
let symbols = monitor::symbols::KernelSymbols::from_vmlinux(&vmlinux);
let (Ok(offsets), Ok(symbols)) = (offsets, symbols) else {
return Ok(None);
};
let host_base = vm
.guest_mem
.get_host_address(GuestAddress(DRAM_BASE))
.unwrap();
let mem_size = (self.effective_memory_mb(&vm.guest_mem) as u64) << 20;
let mem = monitor::reader::GuestMem::new(host_base, mem_size);
let num_cpus = self.topology.total_cpus();
let kill_clone = kill.clone();
let dump_trigger =
self.monitor_thresholds
.filter(|_| self.shm_size > 0)
.map(|thresholds| {
let shm_base_pa = mem_size - self.shm_size;
monitor::reader::DumpTrigger {
shm_base_pa,
thresholds,
}
});
let hz = monitor::guest_kernel_hz(Some(&self.kernel));
let watchdog_jiffies = self.watchdog_timeout.map(|d| d.as_secs() * hz);
let preemption_threshold_ns = monitor::vcpu_preemption_threshold_ns(Some(&self.kernel));
let rt_monitor = self.performance_mode;
let service_cpu = self.pinning_plan.as_ref().and_then(|p| p.service_cpu);
let shm_base_pa = if self.shm_size > 0 {
Some(mem_size - self.shm_size)
} else {
None
};
let vmlinux_clone = vmlinux.clone();
let handle = std::thread::Builder::new()
.name("vmm-monitor".into())
.spawn(move || {
if let Some(cpu) = service_cpu {
pin_current_thread(cpu, "monitor");
}
if rt_monitor {
set_rt_priority(2, "monitor");
}
std::thread::sleep(Duration::from_millis(500));
let page_offset = monitor::symbols::resolve_page_offset(&mem, &symbols);
let pco_pa = monitor::symbols::text_kva_to_pa(symbols.per_cpu_offset);
let offsets_arr = unsafe {
monitor::symbols::read_per_cpu_offsets(mem.base_ptr(), pco_pa, num_cpus)
};
let rq_pas =
monitor::symbols::compute_rq_pas(symbols.runqueues, &offsets_arr, page_offset);
let watchdog_override = watchdog_jiffies.and_then(|jiffies| {
symbols.scx_watchdog_timeout.map(|kva| {
let pa = monitor::symbols::text_kva_to_pa(kva);
monitor::reader::WatchdogOverride { pa, jiffies }
})
});
if watchdog_jiffies.is_some() && watchdog_override.is_none() {
tracing::warn!("scx_watchdog_timeout symbol not found in vmlinux",);
}
let event_pcpu_pas = symbols
.scx_root
.zip(offsets.event_offsets.as_ref())
.and_then(|(scx_root_kva, ev)| {
let scx_root_pa = monitor::symbols::text_kva_to_pa(scx_root_kva);
monitor::reader::resolve_event_pcpu_pas(
&mem,
scx_root_pa,
ev,
&offsets_arr,
page_offset,
)
});
let vcpu_timing = monitor::reader::VcpuTiming {
pthreads: vcpu_pthreads,
};
if let Some(base) = shm_base_pa {
let slot_pa = base + shm_ring::SIGNAL_SLOT_BASE as u64 + 1;
let deadline = start + Duration::from_secs(30);
while std::time::Instant::now() < deadline {
if kill_clone.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
if mem.read_u8(slot_pa, 0) != 0 {
break;
}
std::thread::sleep(Duration::from_millis(100));
}
}
let prog_stats_ctx =
monitor::btf_offsets::BpfProgOffsets::from_vmlinux(&vmlinux_clone)
.ok()
.and_then(|prog_offsets| {
let prog_idr_kva = symbols.prog_idr?;
let cached = monitor::bpf_prog::discover_struct_ops_stats(
&mem,
monitor::symbols::text_kva_to_pa(symbols.init_top_pgt.unwrap_or(0)),
page_offset,
prog_idr_kva,
&prog_offsets,
monitor::symbols::resolve_pgtable_l5(&mem, &symbols),
);
if cached.is_empty() {
return None;
}
Some(monitor::reader::ProgStatsCtx {
cached,
per_cpu_offsets: offsets_arr.clone(),
page_offset,
offsets: prog_offsets,
})
});
let mon_cfg = monitor::reader::MonitorConfig {
event_pcpu_pas: event_pcpu_pas.as_deref(),
dump_trigger: dump_trigger.as_ref(),
watchdog_override: watchdog_override.as_ref(),
vcpu_timing: Some(&vcpu_timing),
preemption_threshold_ns,
shm_base_pa,
prog_stats_ctx: prog_stats_ctx.as_ref(),
page_offset,
};
monitor::reader::monitor_loop(
&mem,
&rq_pas,
&offsets,
Duration::from_millis(100),
&kill_clone,
start,
&mon_cfg,
)
})
.context("spawn monitor thread")?;
Ok(Some(handle))
}
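/// Optionally spawn a thread that waits for the guest's BPF maps to become
/// visible, then writes the configured value into the target map and signals
/// the guest via the SHM ring.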
fn start_bpf_map_write(
&self,
vm: &kvm::KtstrKvm,
kill: &Arc<AtomicBool>,
) -> Result<Option<JoinHandle<()>>> {
let Some(ref params) = self.bpf_map_write else {
return Ok(None);
};
let Some(vmlinux) = find_vmlinux(&self.kernel) else {
eprintln!("bpf_map_write: vmlinux not found, skipping");
return Ok(None);
};
let host_base = vm
.guest_mem
.get_host_address(GuestAddress(DRAM_BASE))
.unwrap();
let mem_size = (self.effective_memory_mb(&vm.guest_mem) as u64) << 20;
let mem = monitor::reader::GuestMem::new(host_base, mem_size);
let kill_clone = kill.clone();
let params = params.clone();
let shm_size = self.shm_size;
let handle = std::thread::Builder::new()
.name("bpf-map-write".into())
.spawn(move || {
if kill_clone.load(Ordering::Acquire) {
return;
}
let phase1_deadline =
std::time::Instant::now() + std::time::Duration::from_secs(30);
let accessor = loop {
match monitor::bpf_map::BpfMapAccessorOwned::new(&mem, &vmlinux) {
Ok(a) => break a,
Err(e) => {
if kill_clone.load(Ordering::Acquire) {
return;
}
if std::time::Instant::now() >= phase1_deadline {
eprintln!("bpf_map_write: accessor init timed out: {e:#}");
return;
}
std::thread::sleep(std::time::Duration::from_millis(200));
}
}
};
let retry_deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
let mut attempt = 0u32;
let map_info = loop {
attempt += 1;
if let Some(info) = accessor.find_map(&params.map_name_suffix) {
break info;
}
if kill_clone.load(Ordering::Acquire) {
eprintln!("bpf_map_write: VM exited during map search");
return;
}
if std::time::Instant::now() >= retry_deadline {
eprintln!(
"bpf_map_write: map *{} not found after {} attempts",
params.map_name_suffix, attempt,
);
return;
}
std::thread::sleep(std::time::Duration::from_millis(200));
};
eprintln!(
"bpf_map_write: map '{}' found after {} attempts",
map_info.name, attempt,
);
if shm_size > 0 {
let shm_base = mem.size() - shm_size;
let ready_deadline =
std::time::Instant::now() + std::time::Duration::from_secs(30);
loop {
if kill_clone.load(Ordering::Acquire) {
return;
}
if std::time::Instant::now() >= ready_deadline {
eprintln!("bpf_map_write: timed out waiting for probes ready");
return;
}
let val = mem.read_u8(shm_base, shm_ring::SIGNAL_SLOT_BASE + 1);
if val >= shm_ring::SIGNAL_PROBES_READY {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
eprintln!("bpf_map_write: guest probes ready, writing crash trigger");
}
let all_maps = accessor.maps();
eprintln!(
"bpf_map_write: maps() found {} map(s): [{}]",
all_maps.len(),
all_maps
.iter()
.map(|m| format!("{}(type={})", m.name, m.map_type))
.collect::<Vec<_>>()
.join(", "),
);
let before = accessor.read_value_u32(&map_info, params.offset);
let ok = accessor.write_value_u32(&map_info, params.offset, params.value);
let after = accessor.read_value_u32(&map_info, params.offset);
eprintln!(
"bpf_map_write: map '{}' write={} (value={} offset={} before={:?} after={:?})",
map_info.name, ok, params.value, params.offset, before, after,
);
if ok && shm_size > 0 {
let shm_base = mem.size() - shm_size;
shm_ring::signal_guest(&mem, shm_base, 0);
eprintln!("bpf_map_write: signaled slot 0");
}
})
.context("spawn bpf-map-write thread")?;
Ok(Some(handle))
}
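/// Run the BSP vCPU on the calling thread until the guest shuts down, a fatal
/// error occurs, `kill` is set, or `timeout` elapses; returns
/// `(exit_code, timed_out)`.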
#[allow(clippy::too_many_arguments)]
fn run_bsp_loop(
&self,
bsp: &mut kvm_ioctls::VcpuFd,
com1: &Arc<PiMutex<console::Serial>>,
com2: &Arc<PiMutex<console::Serial>>,
virtio_con: Option<&Arc<PiMutex<virtio_console::VirtioConsole>>>,
kill: &Arc<AtomicBool>,
has_immediate_exit: bool,
start: Instant,
timeout: Duration,
) -> (i32, bool) {
let mut exit_code: i32 = -1;
loop {
if start.elapsed() > timeout {
return (exit_code, true);
}
if kill.load(Ordering::Acquire) {
break;
}
match bsp.run() {
Ok(mut exit) => {
if matches!(exit, VcpuExit::Hlt) {
if kill.load(Ordering::Acquire) {
break;
}
continue;
}
match classify_exit(com1, com2, virtio_con.map(|a| a.as_ref()), &mut exit) {
Some(ExitAction::Continue) | None => {}
Some(ExitAction::Shutdown) => {
exit_code = 0;
break;
}
Some(ExitAction::Fatal(reason)) => {
if let Some(r) = reason {
tracing::error!(r, "BSP VM entry failed");
} else {
tracing::error!("BSP internal error");
}
break;
}
}
}
Err(e) => {
if e.errno() == libc::EAGAIN || e.errno() == libc::EINTR {
if has_immediate_exit {
bsp.set_kvm_immediate_exit(0);
}
continue;
}
tracing::error!(%e, "BSP run failed");
break;
}
}
}
(exit_code, false)
}
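/// Tear down a completed run: raise the kill flag, kick and join AP threads,
/// join the monitor and bpf-map-write threads, drain the SHM ring (mid-flight
/// plus post-mortem), extract the exit code from the SHM exit record or the
/// `KTSTR_EXIT=` console marker, and assemble the final `VmResult`.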
fn collect_results(&self, start: Instant, run: VmRunState) -> Result<VmResult> {
let mut exit_code = run.exit_code;
let timed_out = run.timed_out;
run.kill.store(true, Ordering::Release);
for vt in &run.ap_threads {
if !vt.exited.load(Ordering::Acquire) {
vt.kick();
}
}
for vt in run.ap_threads {
vt.wait_for_exit(Duration::from_secs(5));
let _ = vt.handle.join();
}
let (monitor_report, mid_flight_drain) =
match run.monitor_handle.and_then(|h| h.join().ok()) {
Some((samples, drain)) => {
let preemption_threshold_ns =
monitor::vcpu_preemption_threshold_ns(Some(&self.kernel));
let summary = monitor::MonitorSummary::from_samples_with_threshold(
&samples,
preemption_threshold_ns,
);
let report = monitor::MonitorReport {
samples,
summary,
preemption_threshold_ns,
};
(Some(report), drain)
}
None => (None, shm_ring::ShmDrainResult::default()),
};
if let Some(h) = run.bpf_write_handle {
let _ = h.join();
}
let (shm_data, stimulus_events) = if self.shm_size > 0 {
let mem_size = (self.effective_memory_mb(&run.vm.guest_mem) as u64) << 20;
let shm_base = DRAM_BASE + mem_size - self.shm_size;
let shm_size = self.shm_size as usize;
let mut shm_buf = vec![0u8; shm_size];
run.vm
.guest_mem
.read_slice(&mut shm_buf, GuestAddress(shm_base))
.context("read SHM region")?;
let post_mortem = shm_ring::shm_drain(&shm_buf, 0);
let mut all_entries = mid_flight_drain.entries;
all_entries.extend(post_mortem.entries);
let drops = mid_flight_drain.drops.max(post_mortem.drops);
let events: Vec<shm_ring::StimulusEvent> = all_entries
.iter()
.filter(|e| e.msg_type == shm_ring::MSG_TYPE_STIMULUS && e.crc_ok)
.filter_map(|e| shm_ring::StimulusEvent::from_payload(&e.payload))
.collect();
(
Some(shm_ring::ShmDrainResult {
entries: all_entries,
drops,
}),
events,
)
} else {
(None, Vec::new())
};
let app_output = run.com2.lock().output();
let console_output = run.com1.lock().output();
let shm_exit = shm_data.as_ref().and_then(|d| {
d.entries
.iter()
.rev()
.find(|e| e.msg_type == shm_ring::MSG_TYPE_EXIT && e.crc_ok && e.payload.len() == 4)
.map(|e| i32::from_ne_bytes(e.payload[..4].try_into().unwrap()))
});
if let Some(code) = shm_exit {
exit_code = code;
} else if let Some(line) = app_output
.lines()
.rev()
.find(|l| l.starts_with("KTSTR_EXIT="))
&& let Ok(code) = line.trim_start_matches("KTSTR_EXIT=").trim().parse::<i32>()
{
exit_code = code;
}
let crash_message = shm_data.as_ref().and_then(|d| {
d.entries
.iter()
.find(|e| e.msg_type == shm_ring::MSG_TYPE_CRASH && e.crc_ok)
.and_then(|e| String::from_utf8(e.payload.clone()).ok())
});
let verifier_stats = self.collect_verifier_stats(&run.vm);
Ok(VmResult {
success: !timed_out && exit_code == 0,
exit_code,
duration: start.elapsed(),
timed_out,
output: app_output,
stderr: console_output,
monitor: monitor_report,
shm_data,
stimulus_events,
verifier_stats,
kvm_stats: None,
crash_message,
})
}
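/// Best-effort collection of struct_ops BPF program verifier statistics from
/// guest memory via vmlinux symbols; any failure yields an empty vector.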
fn collect_verifier_stats(
&self,
vm: &kvm::KtstrKvm,
) -> Vec<monitor::bpf_prog::ProgVerifierStats> {
let vmlinux = match find_vmlinux(&self.kernel) {
Some(v) => v,
None => return Vec::new(),
};
let host_base = match vm.guest_mem.get_host_address(GuestAddress(DRAM_BASE)) {
Ok(ptr) => ptr,
Err(_) => return Vec::new(),
};
let mem_size = (self.effective_memory_mb(&vm.guest_mem) as u64) << 20;
let mem = monitor::reader::GuestMem::new(host_base, mem_size);
let kernel = match monitor::guest::GuestKernel::new(&mem, &vmlinux) {
Ok(k) => k,
Err(_) => return Vec::new(),
};
let accessor =
match monitor::bpf_prog::BpfProgAccessor::from_guest_kernel(&kernel, &vmlinux) {
Ok(a) => a,
Err(_) => return Vec::new(),
};
accessor.struct_ops_progs()
}
}
#[cfg(target_arch = "aarch64")]
impl KtstrVm {
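/// aarch64 memory and initrd setup. When memory sizing was deferred, join the
/// initramfs-resolve thread, size guest memory from the initramfs budget,
/// allocate and register it, then load the kernel and compressed initrd;
/// otherwise only build and load the initrd into already-registered memory.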
fn setup_memory_aarch64(
&self,
vm: &mut kvm::KtstrKvm,
kernel_result: Option<boot::KernelLoadResult>,
initramfs_handle: Option<JoinHandle<Result<(BaseRef, BaseKey)>>>,
) -> Result<boot::KernelLoadResult> {
let kernel_result = if let Some(kr) = kernel_result {
kr
} else {
if let Some(handle) = initramfs_handle {
let (base, _key) = handle
.join()
.map_err(|_| anyhow::anyhow!("initramfs-resolve thread panicked"))??;
let base_bytes: &[u8] = base.as_ref();
let enable_refs: Vec<&str> =
self.sched_enable_cmds.iter().map(|s| s.as_str()).collect();
let disable_refs: Vec<&str> =
self.sched_disable_cmds.iter().map(|s| s.as_str()).collect();
let suffix = initramfs::build_suffix_full(
base_bytes.len(),
&self.run_args,
&self.sched_args,
&enable_refs,
&disable_refs,
self.exec_cmd.as_deref(),
)?;
let uncompressed_size = base_bytes.len() + suffix.len();
let mut full = Vec::with_capacity(base_bytes.len() + suffix.len());
full.extend_from_slice(base_bytes);
full.extend_from_slice(&suffix);
let initrd_data = initramfs::lz4_legacy_compress(&full);
let total_size = initrd_data.len() as u64;
let kernel_init_size = read_kernel_init_size(&self.kernel).unwrap_or(0);
let budget = MemoryBudget {
uncompressed_initramfs_bytes: uncompressed_size as u64,
compressed_initrd_bytes: total_size,
kernel_init_size,
shm_bytes: self.shm_size,
};
let memory_mb = initramfs_min_memory_mb(&budget).max(self.memory_min_mb);
vm.allocate_and_register_memory(memory_mb)
.with_context(|| {
format!("allocate deferred memory ({memory_mb}MB, aarch64)")
})?;
let kr = boot::load_kernel(&vm.guest_mem, &self.kernel)
.context("load kernel (aarch64)")?;
let load_addr = aarch64_initrd_addr(memory_mb, self.shm_size, total_size);
initramfs::load_initramfs_parts(&vm.guest_mem, &[&initrd_data], load_addr)?;
return self.finish_aarch64_setup(vm, kr, Some(load_addr), Some(total_size as u32));
} else {
let memory_mb = 256u32;
vm.allocate_and_register_memory(memory_mb)
.context("allocate deferred memory (no initramfs, aarch64)")?;
let kr = boot::load_kernel(&vm.guest_mem, &self.kernel)
.context("load kernel (aarch64)")?;
return self.finish_aarch64_setup(vm, kr, None, None);
}
};
let (initrd_addr, initrd_size) = match initramfs_handle {
Some(handle) => {
let memory_mb = self.memory_mb.unwrap();
let (base, _key) = handle
.join()
.map_err(|_| anyhow::anyhow!("initramfs-resolve thread panicked"))??;
let base_bytes: &[u8] = base.as_ref();
let enable_refs: Vec<&str> =
self.sched_enable_cmds.iter().map(|s| s.as_str()).collect();
let disable_refs: Vec<&str> =
self.sched_disable_cmds.iter().map(|s| s.as_str()).collect();
let suffix = initramfs::build_suffix_full(
base_bytes.len(),
&self.run_args,
&self.sched_args,
&enable_refs,
&disable_refs,
self.exec_cmd.as_deref(),
)?;
let mut full = Vec::with_capacity(base_bytes.len() + suffix.len());
full.extend_from_slice(base_bytes);
full.extend_from_slice(&suffix);
let initrd_data = initramfs::lz4_legacy_compress(&full);
let total_size = initrd_data.len() as u64;
let load_addr = aarch64_initrd_addr(memory_mb, self.shm_size, total_size);
initramfs::load_initramfs_parts(&vm.guest_mem, &[&initrd_data], load_addr)?;
(Some(load_addr), Some(total_size as u32))
}
None => (None, None),
};
self.finish_aarch64_setup(vm, kernel_result, initrd_addr, initrd_size)
}
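/// Assemble the kernel command line, build the FDT (vCPU MPIDRs, memory size,
/// optional initrd range, SHM window), write it into guest memory, and
/// initialize the SHM ring when SHM is enabled.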
#[cfg(target_arch = "aarch64")]
fn finish_aarch64_setup(
&self,
vm: &kvm::KtstrKvm,
kernel_result: boot::KernelLoadResult,
initrd_addr: Option<u64>,
initrd_size: Option<u32>,
) -> Result<boot::KernelLoadResult> {
let memory_mb = self.effective_memory_mb(&vm.guest_mem);
let mut cmdline = concat!(
"console=ttyS0 ",
"nomodules mitigations=off ",
"random.trust_cpu=on swiotlb=noforce ",
"panic=-1 iomem=relaxed nokaslr lockdown=none ",
"sysctl.kernel.unprivileged_bpf_disabled=0 ",
"sysctl.kernel.sched_schedstats=1 ",
"kfence.sample_interval=0",
)
.to_string();
cmdline.push_str(" earlycon=uart,mmio,0x09000000");
let verbose = std::env::var("KTSTR_VERBOSE")
.map(|v| v == "1")
.unwrap_or(false)
|| std::env::var("RUST_BACKTRACE").is_ok_and(|v| v == "1" || v == "full");
if verbose {
cmdline.push_str(" loglevel=7");
} else {
cmdline.push_str(" loglevel=0");
}
if self.init_binary.is_some() {
cmdline.push_str(" rdinit=/init initramfs_options=size=90%");
}
if self.shm_size > 0 {
let mem_size = (memory_mb as u64) << 20;
let shm_base = kvm::DRAM_START + mem_size - self.shm_size;
cmdline.push_str(&format!(
" KTSTR_SHM_BASE={:#x} KTSTR_SHM_SIZE={:#x}",
shm_base, self.shm_size
));
}
if !self.cmdline_extra.is_empty() {
cmdline.push(' ');
cmdline.push_str(&self.cmdline_extra);
}
let t0 = Instant::now();
boot::validate_cmdline(&cmdline)?;
let fdt_addr = aarch64::fdt::fdt_address(memory_mb, self.shm_size);
let mpidrs =
aarch64::topology::read_mpidrs(&vm.vcpus).context("read vCPU MPIDRs for FDT")?;
let dtb = aarch64::fdt::create_fdt(
&mpidrs,
memory_mb,
&cmdline,
initrd_addr,
initrd_size,
self.shm_size,
)
.context("create FDT")?;
vm.guest_mem
.write_slice(&dtb, GuestAddress(fdt_addr))
.context("write FDT to guest memory")?;
tracing::debug!(
elapsed_us = t0.elapsed().as_micros(),
fdt_addr,
fdt_len = dtb.len(),
"cmdline_fdt",
);
let t0 = Instant::now();
if self.shm_size > 0 {
let mem_size = (memory_mb as u64) << 20;
let shm_base = kvm::DRAM_START + mem_size - self.shm_size;
self.init_shm_region(&vm.guest_mem, shm_base)?;
}
tracing::debug!(elapsed_us = t0.elapsed().as_micros(), "shm_ring_init");
Ok(kernel_result)
}
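/// Program the BSP's initial register state with the kernel entry point and
/// the FDT address, following the arm64 boot protocol.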
#[cfg(target_arch = "aarch64")]
fn setup_vcpus_aarch64(&self, vm: &kvm::KtstrKvm, kernel_entry: u64) -> Result<()> {
let t0 = Instant::now();
let memory_mb = self.effective_memory_mb(&vm.guest_mem);
let fdt_addr = aarch64::fdt::fdt_address(memory_mb, self.shm_size);
boot::setup_regs(&vm.vcpus[0], kernel_entry, fdt_addr)?;
tracing::debug!(elapsed_us = t0.elapsed().as_micros(), "bsp_setup");
Ok(())
}
}
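/// Route an aarch64 MMIO write to COM1, COM2, or the virtio console based on
/// the guest physical address. Always returns `false`: MMIO writes never
/// signal shutdown on this architecture.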
#[cfg(target_arch = "aarch64")]
fn dispatch_mmio_write(
com1: &PiMutex<console::Serial>,
com2: &PiMutex<console::Serial>,
virtio_con: Option<&PiMutex<virtio_console::VirtioConsole>>,
addr: u64,
data: &[u8],
) -> bool {
if let Some(offset) = mmio_serial_offset(addr, kvm::SERIAL_MMIO_BASE) {
if let Some(&byte) = data.first() {
com1.lock().inner_write(offset, byte);
}
} else if let Some(offset) = mmio_serial_offset(addr, kvm::SERIAL2_MMIO_BASE)
&& let Some(&byte) = data.first()
{
com2.lock().inner_write(offset, byte);
} else if let Some(vc) = virtio_con {
let base = kvm::VIRTIO_CONSOLE_MMIO_BASE;
if addr >= base && addr < base + virtio_console::VIRTIO_MMIO_SIZE {
vc.lock().mmio_write(addr - base, data);
}
}
false
}
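/// Route an aarch64 MMIO read to the matching device model; reads from
/// unmapped addresses return all-ones (0xff) bytes.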
#[cfg(target_arch = "aarch64")]
fn dispatch_mmio_read(
com1: &PiMutex<console::Serial>,
com2: &PiMutex<console::Serial>,
virtio_con: Option<&PiMutex<virtio_console::VirtioConsole>>,
addr: u64,
data: &mut [u8],
) {
if let Some(offset) = mmio_serial_offset(addr, kvm::SERIAL_MMIO_BASE) {
if let Some(first) = data.first_mut() {
*first = com1.lock().inner_read(offset);
}
} else if let Some(offset) = mmio_serial_offset(addr, kvm::SERIAL2_MMIO_BASE) {
if let Some(first) = data.first_mut() {
*first = com2.lock().inner_read(offset);
}
} else if let Some(vc) = virtio_con
&& (kvm::VIRTIO_CONSOLE_MMIO_BASE
..kvm::VIRTIO_CONSOLE_MMIO_BASE + virtio_console::VIRTIO_MMIO_SIZE)
.contains(&addr)
{
vc.lock()
.mmio_read(addr - kvm::VIRTIO_CONSOLE_MMIO_BASE, data);
} else {
for b in data.iter_mut() {
*b = 0xff;
}
}
}
#[cfg(target_arch = "aarch64")]
fn mmio_serial_offset(addr: u64, base: u64) -> Option<u8> {
let size = kvm::SERIAL_MMIO_SIZE;
if addr >= base && addr < base + size {
Some((addr - base) as u8)
} else {
None
}
}
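/// Shared AP vCPU run loop: call `vcpu.run()` repeatedly and dispatch exits
/// via `classify_exit` until the shared kill flag is set, a shutdown exit is
/// seen (which also raises the flag), or a fatal exit occurs. EINTR/EAGAIN
/// from a kick clears the immediate-exit flag and re-checks the kill flag.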
fn vcpu_run_loop_unified(
vcpu: &mut kvm_ioctls::VcpuFd,
com1: &Arc<PiMutex<console::Serial>>,
com2: &Arc<PiMutex<console::Serial>>,
virtio_con: Option<&Arc<PiMutex<virtio_console::VirtioConsole>>>,
kill: &Arc<AtomicBool>,
) {
loop {
if kill.load(Ordering::Acquire) {
break;
}
match vcpu.run() {
Ok(mut exit) => {
if matches!(exit, VcpuExit::Hlt) {
if kill.load(Ordering::Acquire) {
break;
}
continue;
}
match classify_exit(com1, com2, virtio_con.map(|a| a.as_ref()), &mut exit) {
Some(ExitAction::Continue) | None => {}
Some(ExitAction::Shutdown) => {
kill.store(true, Ordering::Release);
break;
}
Some(ExitAction::Fatal(_)) => break,
}
}
Err(e) => {
if e.errno() == libc::EINTR || e.errno() == libc::EAGAIN {
vcpu.set_kvm_immediate_exit(0);
if kill.load(Ordering::Acquire) {
break;
}
continue;
}
if kill.load(Ordering::Acquire) {
break;
}
}
}
if kill.load(Ordering::Acquire) {
break;
}
}
}
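// KVM_EXIT_SYSTEM_EVENT types from the kernel UAPI: shutdown = 1, reset = 2.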
const KVM_SYSTEM_EVENT_SHUTDOWN: u32 = 1;
const KVM_SYSTEM_EVENT_RESET: u32 = 2;
enum ExitAction {
Continue,
Shutdown,
Fatal(Option<u64>),
}
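/// Map a raw `VcpuExit` onto an `ExitAction`: serial and virtio-console I/O is
/// forwarded to the device models and continues, i8042 reset and KVM
/// shutdown/reset system events shut the VM down, entry failures and internal
/// errors are fatal, and `None` means no action is required from the caller.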
fn classify_exit(
com1: &PiMutex<console::Serial>,
com2: &PiMutex<console::Serial>,
virtio_con: Option<&PiMutex<virtio_console::VirtioConsole>>,
exit: &mut VcpuExit,
) -> Option<ExitAction> {
match exit {
#[cfg(target_arch = "x86_64")]
VcpuExit::IoOut(port, data) => {
if dispatch_io_out(com1, com2, *port, data) {
Some(ExitAction::Shutdown)
} else {
Some(ExitAction::Continue)
}
}
#[cfg(target_arch = "x86_64")]
VcpuExit::IoIn(port, data) => {
dispatch_io_in(com1, com2, *port, data);
Some(ExitAction::Continue)
}
#[cfg(target_arch = "aarch64")]
VcpuExit::MmioWrite(addr, data) => {
if dispatch_mmio_write(com1, com2, virtio_con, *addr, data) {
Some(ExitAction::Shutdown)
} else {
Some(ExitAction::Continue)
}
}
#[cfg(target_arch = "aarch64")]
VcpuExit::MmioRead(addr, data) => {
dispatch_mmio_read(com1, com2, virtio_con, *addr, data);
Some(ExitAction::Continue)
}
VcpuExit::Hlt => None,
VcpuExit::Shutdown => Some(ExitAction::Shutdown),
VcpuExit::SystemEvent(event_type, _) => {
if *event_type == KVM_SYSTEM_EVENT_SHUTDOWN || *event_type == KVM_SYSTEM_EVENT_RESET {
Some(ExitAction::Shutdown)
} else {
Some(ExitAction::Continue)
}
}
VcpuExit::FailEntry(reason, _cpu) => Some(ExitAction::Fatal(Some(*reason))),
VcpuExit::InternalError => Some(ExitAction::Fatal(None)),
#[cfg(target_arch = "x86_64")]
VcpuExit::MmioRead(addr, data) => {
if let Some(vc) = virtio_con {
let base = kvm::VIRTIO_CONSOLE_MMIO_BASE;
if *addr >= base && *addr < base + virtio_console::VIRTIO_MMIO_SIZE {
vc.lock().mmio_read(*addr - base, data);
return Some(ExitAction::Continue);
}
}
for b in data.iter_mut() {
*b = 0xff;
}
Some(ExitAction::Continue)
}
#[cfg(target_arch = "x86_64")]
VcpuExit::MmioWrite(addr, data) => {
if let Some(vc) = virtio_con {
let base = kvm::VIRTIO_CONSOLE_MMIO_BASE;
if *addr >= base && *addr < base + virtio_console::VIRTIO_MMIO_SIZE {
vc.lock().mmio_write(*addr - base, data);
return Some(ExitAction::Continue);
}
}
Some(ExitAction::Continue)
}
_ => None,
}
}
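// Minimal i8042 keyboard-controller stub: only the 0xFE "reset CPU" command
// is recognized (treated as a shutdown request); reads return 0.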
#[cfg(target_arch = "x86_64")]
const I8042_DATA_PORT: u16 = 0x60;
#[cfg(target_arch = "x86_64")]
const I8042_CMD_PORT: u16 = 0x64;
#[cfg(target_arch = "x86_64")]
const I8042_CMD_RESET_CPU: u8 = 0xFE;
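/// Handle a guest port-I/O write. Returns `true` only for an i8042 reset
/// request, which callers treat as an orderly shutdown; writes to the COM1 and
/// COM2 ranges are forwarded to the serial device models.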
#[cfg(target_arch = "x86_64")]
fn dispatch_io_out(
com1: &PiMutex<console::Serial>,
com2: &PiMutex<console::Serial>,
port: u16,
data: &[u8],
) -> bool {
if port == I8042_CMD_PORT && data.first() == Some(&I8042_CMD_RESET_CPU) {
return true;
}
if (console::COM1_BASE..console::COM1_BASE + 8).contains(&port) {
com1.lock().handle_out(port, data);
} else if (console::COM2_BASE..console::COM2_BASE + 8).contains(&port) {
com2.lock().handle_out(port, data);
}
false
}
#[cfg(target_arch = "x86_64")]
fn dispatch_io_in(
com1: &PiMutex<console::Serial>,
com2: &PiMutex<console::Serial>,
port: u16,
data: &mut [u8],
) {
match port {
I8042_CMD_PORT => {
if let Some(b) = data.first_mut() {
*b = 0;
}
}
I8042_DATA_PORT => {
if let Some(b) = data.first_mut() {
*b = 0;
}
}
p if (console::COM1_BASE..console::COM1_BASE + 8).contains(&p) => {
com1.lock().handle_in(port, data);
}
p if (console::COM2_BASE..console::COM2_BASE + 8).contains(&p) => {
com2.lock().handle_in(port, data);
}
_ => {}
}
}
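/// Locate a symbol-bearing vmlinux for `kernel_path`: try a sibling
/// `vmlinux`, then the kernel build root three directories up, then the
/// distro debug image under /usr/lib/debug/boot.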
#[cfg(target_arch = "x86_64")]
pub(crate) fn find_vmlinux(kernel_path: &Path) -> Option<PathBuf> {
let dir = kernel_path.parent()?;
let candidate = dir.join("vmlinux");
if candidate.exists() {
return Some(candidate);
}
if let Ok(root) = dir.join("../../..").canonicalize() {
let candidate = root.join("vmlinux");
if candidate.exists() {
return Some(candidate);
}
}
if let Some(name) = kernel_path.file_name().and_then(|n| n.to_str()) {
let version = name.strip_prefix("vmlinuz-").unwrap_or(name);
let debug = PathBuf::from(format!("/usr/lib/debug/boot/vmlinux-{version}"));
if debug.exists() {
return Some(debug);
}
}
None
}
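/// Non-x86 variant: in addition to the sibling and build-root locations,
/// search /boot/vmlinux-<version>, /lib/modules/<version>/build/vmlinux, and
/// paths derived from the kernel's parent directory name.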
#[cfg(not(target_arch = "x86_64"))]
pub(crate) fn find_vmlinux(kernel_path: &Path) -> Option<PathBuf> {
let dir = kernel_path.parent()?;
let candidate = dir.join("vmlinux");
if candidate.exists() {
return Some(candidate);
}
if let Ok(root) = dir.join("../../..").canonicalize() {
let candidate = root.join("vmlinux");
if candidate.exists() {
return Some(candidate);
}
}
if let Some(name) = kernel_path.file_name().and_then(|n| n.to_str()) {
let version = name.strip_prefix("vmlinuz-").unwrap_or(name);
let boot = PathBuf::from(format!("/boot/vmlinux-{version}"));
if boot.exists() {
return Some(boot);
}
let modules = PathBuf::from(format!("/lib/modules/{version}/build/vmlinux"));
if modules.exists() {
return Some(modules);
}
}
if let Some(parent_name) = dir.file_name().and_then(|n| n.to_str()) {
let build = dir.join("build/vmlinux");
if build.exists() {
return Some(build);
}
let boot = PathBuf::from(format!("/boot/vmlinux-{parent_name}"));
if boot.exists() {
return Some(boot);
}
}
None
}
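/// Builder for `KtstrVm`. Defaults to a single-CPU topology, 256 MB of guest
/// memory, a 60-second run timeout, and a 4-second watchdog; `build()`
/// validates paths and topology and, in performance mode, reserves host CPUs.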
pub struct KtstrVmBuilder {
kernel: Option<PathBuf>,
init_binary: Option<PathBuf>,
scheduler_binary: Option<PathBuf>,
run_args: Vec<String>,
sched_args: Vec<String>,
topology: Topology,
memory_mb: Option<u32>,
memory_min_mb: u32,
cmdline_extra: String,
timeout: Duration,
shm_size: u64,
monitor_thresholds: Option<crate::monitor::MonitorThresholds>,
watchdog_timeout: Option<Duration>,
bpf_map_write: Option<BpfMapWriteParams>,
performance_mode: bool,
sched_enable_cmds: Vec<String>,
sched_disable_cmds: Vec<String>,
include_files: Vec<(String, PathBuf)>,
busybox: bool,
dmesg: bool,
exec_cmd: Option<String>,
}
impl Default for KtstrVmBuilder {
fn default() -> Self {
KtstrVmBuilder {
kernel: None,
init_binary: None,
scheduler_binary: None,
run_args: Vec::new(),
sched_args: Vec::new(),
topology: Topology {
sockets: 1,
cores_per_socket: 1,
threads_per_core: 1,
},
memory_mb: Some(256),
memory_min_mb: 0,
cmdline_extra: String::new(),
timeout: Duration::from_secs(60),
shm_size: 0,
monitor_thresholds: None,
watchdog_timeout: Some(Duration::from_secs(4)),
bpf_map_write: None,
performance_mode: false,
sched_enable_cmds: Vec::new(),
sched_disable_cmds: Vec::new(),
include_files: Vec::new(),
busybox: false,
dmesg: false,
exec_cmd: None,
}
}
}
impl KtstrVmBuilder {
pub fn kernel(mut self, path: impl Into<PathBuf>) -> Self {
self.kernel = Some(path.into());
self
}
pub fn init_binary(mut self, path: impl Into<PathBuf>) -> Self {
self.init_binary = Some(path.into());
self
}
pub fn scheduler_binary(mut self, path: impl Into<PathBuf>) -> Self {
self.scheduler_binary = Some(path.into());
self
}
pub fn run_args(mut self, args: &[String]) -> Self {
self.run_args = args.to_vec();
self
}
#[allow(dead_code)]
pub fn sched_args(mut self, args: &[String]) -> Self {
self.sched_args = args.to_vec();
self
}
#[allow(dead_code)]
pub fn kernel_dir(mut self, path: impl Into<PathBuf>) -> Self {
let dir: PathBuf = path.into();
#[cfg(target_arch = "x86_64")]
{
self.kernel = Some(dir.join("arch/x86/boot/bzImage"));
}
#[cfg(target_arch = "aarch64")]
{
self.kernel = Some(dir.join("arch/arm64/boot/Image"));
}
self
}
pub fn topology(mut self, sockets: u32, cores: u32, threads: u32) -> Self {
self.topology = Topology {
sockets,
cores_per_socket: cores,
threads_per_core: threads,
};
self
}
pub fn memory_mb(mut self, mb: u32) -> Self {
self.memory_mb = Some(mb);
self.memory_min_mb = 0;
self
}
pub fn memory_deferred(mut self) -> Self {
self.memory_mb = None;
self.memory_min_mb = 0;
self
}
pub fn memory_deferred_min(mut self, min_mb: u32) -> Self {
self.memory_mb = None;
self.memory_min_mb = min_mb;
self
}
#[allow(dead_code)]
pub fn cmdline(mut self, extra: &str) -> Self {
self.cmdline_extra = extra.to_string();
self
}
pub fn timeout(mut self, t: Duration) -> Self {
self.timeout = t;
self
}
#[allow(dead_code)]
pub fn shm_size(mut self, bytes: u64) -> Self {
self.shm_size = bytes;
self
}
#[allow(dead_code)]
pub fn monitor_thresholds(mut self, thresholds: crate::monitor::MonitorThresholds) -> Self {
self.monitor_thresholds = Some(thresholds);
self
}
#[allow(dead_code)]
pub fn watchdog_timeout(mut self, timeout: Duration) -> Self {
self.watchdog_timeout = Some(timeout);
self
}
#[allow(dead_code)]
pub fn bpf_map_write(mut self, map_name_suffix: &str, offset: usize, value: u32) -> Self {
self.bpf_map_write = Some(BpfMapWriteParams {
map_name_suffix: map_name_suffix.to_string(),
offset,
value,
});
self
}
#[allow(dead_code)]
pub fn performance_mode(mut self, enabled: bool) -> Self {
self.performance_mode = enabled;
self
}
pub fn sched_enable_cmds(mut self, cmds: &[&str]) -> Self {
self.sched_enable_cmds = cmds.iter().map(|s| s.to_string()).collect();
self
}
pub fn sched_disable_cmds(mut self, cmds: &[&str]) -> Self {
self.sched_disable_cmds = cmds.iter().map(|s| s.to_string()).collect();
self
}
#[allow(dead_code)]
pub fn include_files(mut self, files: Vec<(String, PathBuf)>) -> Self {
self.include_files = files;
self
}
#[allow(dead_code)]
pub fn busybox(mut self, enabled: bool) -> Self {
self.busybox = enabled;
self
}
pub fn dmesg(mut self, enabled: bool) -> Self {
self.dmesg = enabled;
self
}
#[allow(dead_code)]
pub fn exec_cmd(mut self, cmd: String) -> Self {
self.exec_cmd = Some(cmd);
self
}
pub fn build(mut self) -> Result<KtstrVm> {
let (pinning_plan, mbind_nodes, cpu_locks) = if self.performance_mode {
let (plan, host_topo) = self.validate_performance_mode()?;
let pinned_cpus: Vec<usize> = plan.assignments.iter().map(|a| a.1).collect();
let nodes = host_topo.numa_nodes_for_cpus(&pinned_cpus);
(Some(plan), nodes, Vec::new())
} else {
let total_cpus = self.topology.total_cpus() as usize;
let host_topo = host_topology::HostTopology::from_sysfs().ok();
let host_cpus = host_topo
.as_ref()
.map(|h| h.total_cpus())
.unwrap_or(total_cpus);
let locks =
host_topology::acquire_cpu_locks(total_cpus, host_cpus, host_topo.as_ref())?;
(None, Vec::new(), locks)
};
let kernel = self.kernel.context("kernel path required")?;
anyhow::ensure!(kernel.exists(), "kernel not found: {}", kernel.display());
let t = &self.topology;
anyhow::ensure!(t.sockets > 0, "sockets must be > 0");
anyhow::ensure!(t.cores_per_socket > 0, "cores_per_socket must be > 0");
anyhow::ensure!(t.threads_per_core > 0, "threads_per_core must be > 0");
if let Some(ref bin) = self.init_binary
&& !bin.starts_with("/proc/")
{
anyhow::ensure!(bin.exists(), "init binary not found: {}", bin.display());
}
if let Some(ref bin) = self.scheduler_binary {
anyhow::ensure!(
bin.exists(),
"scheduler binary not found: {}",
bin.display()
);
}
Ok(KtstrVm {
kernel,
init_binary: self.init_binary,
scheduler_binary: self.scheduler_binary,
run_args: self.run_args,
sched_args: self.sched_args,
topology: self.topology,
memory_mb: self.memory_mb,
memory_min_mb: self.memory_min_mb,
cmdline_extra: self.cmdline_extra,
timeout: self.timeout,
shm_size: self.shm_size,
monitor_thresholds: self.monitor_thresholds,
watchdog_timeout: self.watchdog_timeout,
bpf_map_write: self.bpf_map_write,
performance_mode: self.performance_mode,
pinning_plan,
mbind_nodes,
cpu_locks,
sched_enable_cmds: self.sched_enable_cmds,
sched_disable_cmds: self.sched_disable_cmds,
include_files: self.include_files,
busybox: self.busybox,
dmesg: self.dmesg,
exec_cmd: self.exec_cmd,
})
}
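/// Verify the host can back performance mode: enough CPUs to cover the
/// requested LLCs plus one service CPU, then reserve an LLC slot. Hugepage
/// shortfall and host load only produce warnings, not errors.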
fn validate_performance_mode(
&mut self,
) -> Result<(host_topology::PinningPlan, host_topology::HostTopology)> {
let host_topo = host_topology::HostTopology::from_sysfs()
.context("performance_mode: read host topology")?;
let t = &self.topology;
let total_vcpus = t.total_cpus();
let llcs_needed = t.sockets as usize;
let reserved: usize = host_topo
.llc_groups
.iter()
.take(llcs_needed)
.map(|g| g.cpus.len())
.sum();
let total_reserved = reserved + 1;
if total_reserved > host_topo.total_cpus() {
return Err(anyhow::Error::new(host_topology::ResourceContention {
reason: format!(
"performance_mode: need {} CPUs ({} across {} LLCs + 1 service) \
but only {} host CPUs available",
total_reserved,
reserved,
llcs_needed,
host_topo.total_cpus(),
),
}));
}
let plan = acquire_slot_with_locks(
&host_topo,
t.sockets,
t.cores_per_socket,
t.threads_per_core,
)?;
if let Some(mb) = self.memory_mb {
let free = host_topology::hugepages_free();
let needed = host_topology::hugepages_needed(mb);
if free == 0 {
eprintln!(
"performance_mode: WARNING: no 2MB hugepages available, \
guest memory will use regular pages",
);
} else if free < needed {
eprintln!(
"performance_mode: WARNING: need {} 2MB hugepages, \
only {} free — falling back to regular pages",
needed, free,
);
}
}
if let Some((running, total)) = host_topology::host_load_estimate() {
let threshold = (total_vcpus as f64 * 0.5) as usize;
if running > threshold {
eprintln!(
"performance_mode: WARNING: {} processes running on {} CPUs \
(threshold {} for {} vCPUs) — results may be noisy",
running, total, threshold, total_vcpus,
);
}
}
Ok((plan, host_topo))
}
}
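/// Walk LLC-aligned slots on the host, computing a pinning plan for each and
/// trying to take exclusive LLC locks; return the first plan whose locks were
/// acquired, or a `ResourceContention` error when every slot is busy.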
fn acquire_slot_with_locks(
host_topo: &host_topology::HostTopology,
sockets: u32,
cores_per_socket: u32,
threads_per_core: u32,
) -> Result<host_topology::PinningPlan> {
let num_llcs = host_topo.llc_groups.len();
let sockets_needed = sockets as usize;
let max_slots = num_llcs
.checked_div(sockets_needed)
.unwrap_or(num_llcs)
.max(1);
let llc_mode = host_topology::LlcLockMode::Exclusive;
for slot in 0..max_slots {
let offset = slot * sockets_needed;
let llc_indices: Vec<usize> = (offset..offset + sockets_needed).collect();
let candidate = host_topo
.compute_pinning(sockets, cores_per_socket, threads_per_core, true, offset)
.context("performance_mode: topology mapping")?;
match host_topology::acquire_resource_locks(&candidate, &llc_indices, llc_mode)? {
host_topology::LockOutcome::Acquired { locks, .. } => {
let mut plan = candidate;
plan.locks = locks;
eprintln!(
"performance_mode: reserved LLC slot {} (offset {}, max {})",
slot, offset, max_slots,
);
return Ok(plan);
}
host_topology::LockOutcome::Unavailable(_) => continue,
}
}
Err(anyhow::Error::new(host_topology::ResourceContention {
reason: format!("all {max_slots} LLC slots busy"),
}))
}
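// Terminal state for the async-signal-safe restore path: the fd and original
// termios are stashed in statics so the signal handler below can restore the
// terminal before re-raising the fatal signal (SA_RESETHAND has already
// reinstalled the default disposition).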
static SAVED_TERMIOS_FD: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(-1);
static mut SAVED_TERMIOS: std::mem::MaybeUninit<libc::termios> = std::mem::MaybeUninit::uninit();
extern "C" fn terminal_restore_signal_handler(sig: libc::c_int) {
let fd = SAVED_TERMIOS_FD.load(std::sync::atomic::Ordering::Acquire);
if fd >= 0 {
unsafe {
libc::tcsetattr(
fd,
libc::TCSANOW,
std::ptr::addr_of!(SAVED_TERMIOS).cast::<libc::termios>(),
);
}
}
unsafe {
libc::raise(sig);
}
}
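/// RAII guard that switches stdin into raw mode and installs
/// SIGINT/SIGTERM/SIGQUIT handlers that restore the terminal before the
/// process dies; `drop` restores the original termios and signal handlers.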
struct TerminalRawGuard {
original: nix::sys::termios::Termios,
fd: std::os::unix::io::RawFd,
prev_sigint: libc::sigaction,
prev_sigterm: libc::sigaction,
prev_sigquit: libc::sigaction,
}
impl TerminalRawGuard {
fn enter() -> Result<Self> {
use nix::sys::termios::{self, SetArg};
use std::os::unix::io::AsRawFd;
let fd = std::io::stdin().as_raw_fd();
let borrowed = unsafe { std::os::unix::io::BorrowedFd::borrow_raw(fd) };
let original = termios::tcgetattr(borrowed).context("tcgetattr")?;
let mut raw = original.clone();
termios::cfmakeraw(&mut raw);
termios::tcsetattr(borrowed, SetArg::TCSANOW, &raw).context("tcsetattr raw")?;
unsafe {
let ptr = std::ptr::addr_of_mut!(SAVED_TERMIOS);
(*ptr).write(original.clone().into());
}
SAVED_TERMIOS_FD.store(fd, std::sync::atomic::Ordering::Release);
let mut prev_sigint: libc::sigaction = unsafe { std::mem::zeroed() };
let mut prev_sigterm: libc::sigaction = unsafe { std::mem::zeroed() };
let mut prev_sigquit: libc::sigaction = unsafe { std::mem::zeroed() };
unsafe {
let mut sa: libc::sigaction = std::mem::zeroed();
sa.sa_sigaction = terminal_restore_signal_handler as *const () as usize;
sa.sa_flags = libc::SA_RESETHAND;
libc::sigemptyset(&mut sa.sa_mask);
libc::sigaction(libc::SIGINT, &sa, &mut prev_sigint);
libc::sigaction(libc::SIGTERM, &sa, &mut prev_sigterm);
libc::sigaction(libc::SIGQUIT, &sa, &mut prev_sigquit);
}
Ok(Self {
original,
fd,
prev_sigint,
prev_sigterm,
prev_sigquit,
})
}
}
impl Drop for TerminalRawGuard {
fn drop(&mut self) {
SAVED_TERMIOS_FD.store(-1, std::sync::atomic::Ordering::Release);
let borrowed = unsafe { std::os::unix::io::BorrowedFd::borrow_raw(self.fd) };
let _ = nix::sys::termios::tcsetattr(
borrowed,
nix::sys::termios::SetArg::TCSANOW,
&self.original,
);
unsafe {
libc::sigaction(libc::SIGINT, &self.prev_sigint, std::ptr::null_mut());
libc::sigaction(libc::SIGTERM, &self.prev_sigterm, std::ptr::null_mut());
libc::sigaction(libc::SIGQUIT, &self.prev_sigquit, std::ptr::null_mut());
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn builder_default() {
let b = KtstrVmBuilder::default();
assert_eq!(b.memory_mb, Some(256));
assert_eq!(b.topology.total_cpus(), 1);
}
#[test]
fn builder_topology() {
let b = KtstrVmBuilder::default().topology(2, 4, 2);
assert_eq!(b.topology.total_cpus(), 16);
assert_eq!(b.topology.sockets, 2);
}
#[test]
fn builder_requires_kernel() {
let result = KtstrVmBuilder::default().build();
assert!(result.is_err());
}
#[test]
fn builder_rejects_missing_kernel() {
let result = KtstrVmBuilder::default()
.kernel("/nonexistent/vmlinuz")
.build();
assert!(result.is_err());
}
#[test]
fn builder_chain() {
let b = KtstrVmBuilder::default()
.topology(2, 2, 2)
.memory_mb(4096)
.cmdline("root=/dev/sda")
.timeout(Duration::from_secs(300));
assert_eq!(b.memory_mb, Some(4096));
assert_eq!(b.topology.total_cpus(), 8);
assert_eq!(b.cmdline_extra, "root=/dev/sda");
assert_eq!(b.timeout, Duration::from_secs(300));
}
#[test]
fn builder_with_init_binary() {
let exe = crate::resolve_current_exe().unwrap();
let b = KtstrVmBuilder::default().init_binary(&exe);
assert_eq!(b.init_binary.as_deref(), Some(exe.as_path()));
}
#[test]
fn builder_rejects_missing_init_binary() {
let result = KtstrVmBuilder::default()
.kernel("/nonexistent/vmlinuz")
.init_binary("/nonexistent/binary")
.build();
assert!(result.is_err());
}
#[test]
fn builder_rejects_missing_scheduler_binary() {
let exe = crate::resolve_current_exe().unwrap();
let result = KtstrVmBuilder::default()
.kernel(&exe)
.scheduler_binary("/nonexistent/scheduler")
.build();
assert!(result.is_err());
}
#[test]
fn builder_run_args() {
let b = KtstrVmBuilder::default().run_args(&["run".into(), "--json".into()]);
assert_eq!(b.run_args, vec!["run", "--json"]);
}
#[test]
#[cfg(target_arch = "x86_64")]
fn builder_kernel_dir_resolves_bzimage() {
let b = KtstrVmBuilder::default().kernel_dir("/some/linux");
assert_eq!(
b.kernel.as_deref(),
Some(std::path::Path::new("/some/linux/arch/x86/boot/bzImage"))
);
}
#[test]
fn vm_result_fields_carry_values() {
let r = VmResult {
success: true,
exit_code: 0,
duration: Duration::from_secs(5),
timed_out: false,
output: "hello world".into(),
stderr: "boot log".into(),
monitor: None,
shm_data: None,
stimulus_events: Vec::new(),
verifier_stats: Vec::new(),
kvm_stats: None,
crash_message: None,
};
assert!(r.success);
assert_eq!(r.exit_code, 0);
assert!(!r.timed_out);
assert_eq!(r.duration, Duration::from_secs(5));
assert_eq!(r.output, "hello world");
assert_eq!(r.stderr, "boot log");
assert!(r.monitor.is_none());
assert!(r.shm_data.is_none());
assert!(r.stimulus_events.is_empty());
let r2 = VmResult {
success: false,
exit_code: 1,
duration: Duration::from_millis(500),
timed_out: true,
output: String::new(),
stderr: String::new(),
monitor: None,
shm_data: None,
stimulus_events: Vec::new(),
verifier_stats: Vec::new(),
kvm_stats: None,
crash_message: None,
};
assert!(!r2.success);
assert_eq!(r2.exit_code, 1);
assert!(r2.timed_out);
assert_eq!(r2.duration, Duration::from_millis(500));
}
#[test]
fn vcpu_exit_flag_transitions() {
let exited = Arc::new(AtomicBool::new(false));
assert!(
!exited.load(Ordering::Acquire),
"initial state must be false"
);
let exited_clone = Arc::clone(&exited);
let handle = std::thread::spawn(move || {
exited_clone.store(true, Ordering::Release);
});
handle.join().unwrap();
assert!(
exited.load(Ordering::Acquire),
"flag must be true after cross-thread store"
);
}
#[test]
#[cfg(target_arch = "x86_64")]
fn ap_mp_state_set_correctly() {
let topo = Topology {
sockets: 2,
cores_per_socket: 2,
threads_per_core: 1,
};
let vm = kvm::KtstrKvm::new(topo, 128, false).unwrap();
for vcpu in &vm.vcpus[1..] {
let state = vcpu.get_mp_state().unwrap();
assert_eq!(
state.mp_state,
kvm_bindings::KVM_MP_STATE_UNINITIALIZED,
"AP should default to UNINITIALIZED"
);
}
}
#[test]
fn vcpu_signal_is_sigrtmin() {
let sig = vcpu_signal();
assert!(sig >= libc::SIGRTMIN(), "signal should be >= SIGRTMIN");
assert!(sig <= libc::SIGRTMAX(), "signal should be <= SIGRTMAX");
}
#[test]
#[cfg(target_arch = "x86_64")]
fn boot_kernel_produces_output() {
let Some(kernel) = crate::find_kernel().unwrap() else {
return;
};
let vm = match KtstrVm::builder()
.kernel(&kernel)
.topology(1, 1, 1)
.memory_mb(256)
.timeout(Duration::from_secs(10))
.cmdline("loglevel=7")
.build()
{
Ok(vm) => vm,
Err(e)
if e.downcast_ref::<host_topology::ResourceContention>()
.is_some() =>
{
return;
}
Err(e) => panic!("{e:#}"),
};
let result = vm.run().unwrap();
assert!(
result.stderr.contains("Linux") || result.stderr.contains("Booting"),
"kernel console should contain boot messages"
);
}
#[test]
#[cfg(target_arch = "x86_64")]
fn boot_kernel_smp_topology() {
let Some(kernel) = crate::find_kernel().unwrap() else {
return;
};
let vm = match KtstrVm::builder()
.kernel(&kernel)
.topology(2, 2, 1)
.memory_mb(256)
.timeout(Duration::from_secs(10))
.cmdline("loglevel=7")
.build()
{
Ok(vm) => vm,
Err(e)
if e.downcast_ref::<host_topology::ResourceContention>()
.is_some() =>
{
return;
}
Err(e) => panic!("{e:#}"),
};
let result = vm.run().unwrap();
assert!(!result.stderr.is_empty(), "no console output from SMP boot");
}
#[test]
#[cfg(target_arch = "x86_64")]
fn bench_boot_time() {
let Some(kernel) = crate::find_kernel().unwrap() else {
return;
};
for (label, sockets, cores, threads, mem) in
[("1cpu", 1, 1, 1, 256), ("4cpu", 2, 2, 1, 512)]
{
let start = Instant::now();
let vm = match KtstrVm::builder()
.kernel(&kernel)
.topology(sockets, cores, threads)
.memory_mb(mem)
.timeout(Duration::from_secs(10))
.build()
{
Ok(vm) => vm,
Err(e)
if e.downcast_ref::<host_topology::ResourceContention>()
.is_some() =>
{
continue;
}
Err(e) => panic!("{e:#}"),
};
let setup = start.elapsed();
let result = vm.run().unwrap();
let boot_ms = result
.stderr
.lines()
.rev()
.find(|l| l.contains("Kernel panic") || l.contains("end Kernel panic"))
.and_then(|l| {
l.trim()
.strip_prefix('[')
.and_then(|s| s.split(']').next())
.and_then(|s| s.trim().parse::<f64>().ok())
})
.map(|s| (s * 1000.0) as u64)
.unwrap_or(0);
eprintln!(
"BENCH {label}: setup={:.0}ms kernel_boot={boot_ms}ms wall={:.0}ms timed_out={}",
setup.as_millis(),
result.duration.as_millis(),
result.timed_out,
);
}
}
#[test]
#[cfg(target_arch = "x86_64")]
fn kvm_has_immediate_exit_cap() {
let topo = Topology {
sockets: 1,
cores_per_socket: 1,
threads_per_core: 1,
};
let vm = kvm::KtstrKvm::new(topo, 64, false).unwrap();
assert!(
vm.has_immediate_exit,
"KVM_CAP_IMMEDIATE_EXIT should be available on modern kernels"
);
}
#[test]
#[cfg(target_arch = "x86_64")]
fn immediate_exit_handle_set_clear() {
let topo = Topology {
sockets: 1,
cores_per_socket: 1,
threads_per_core: 1,
};
let mut vm = kvm::KtstrKvm::new(topo, 64, false).unwrap();
let handle = ImmediateExitHandle::from_vcpu(&mut vm.vcpus[0]);
assert_eq!(
vm.vcpus[0].get_kvm_run().immediate_exit,
0,
"immediate_exit should start at 0"
);
handle.set(1);
assert_eq!(
vm.vcpus[0].get_kvm_run().immediate_exit,
1,
"handle.set(1) should be visible via get_kvm_run()"
);
vm.vcpus[0].set_kvm_immediate_exit(0);
assert_eq!(
vm.vcpus[0].get_kvm_run().immediate_exit,
0,
"set_kvm_immediate_exit(0) should clear the flag"
);
}
#[test]
#[cfg(target_arch = "x86_64")]
fn immediate_exit_handle_cross_vcpu() {
let topo = Topology {
sockets: 1,
cores_per_socket: 2,
threads_per_core: 1,
};
let mut vm = kvm::KtstrKvm::new(topo, 64, false).unwrap();
let h0 = ImmediateExitHandle::from_vcpu(&mut vm.vcpus[0]);
let h1 = ImmediateExitHandle::from_vcpu(&mut vm.vcpus[1]);
h0.set(1);
assert_eq!(vm.vcpus[0].get_kvm_run().immediate_exit, 1);
assert_eq!(
vm.vcpus[1].get_kvm_run().immediate_exit,
0,
"setting vcpu0 handle should not affect vcpu1"
);
h1.set(1);
assert_eq!(vm.vcpus[1].get_kvm_run().immediate_exit, 1);
h0.set(0);
h1.set(0);
assert_eq!(vm.vcpus[0].get_kvm_run().immediate_exit, 0);
assert_eq!(vm.vcpus[1].get_kvm_run().immediate_exit, 0);
}
#[test]
#[cfg(target_arch = "x86_64")]
fn vcpu_thread_kick_sets_immediate_exit() {
let topo = Topology {
sockets: 1,
cores_per_socket: 1,
threads_per_core: 1,
};
let mut vm = kvm::KtstrKvm::new(topo, 64, false).unwrap();
let ie = ImmediateExitHandle::from_vcpu(&mut vm.vcpus[0]);
ie.set(1);
std::sync::atomic::fence(Ordering::Release);
assert_eq!(
vm.vcpus[0].get_kvm_run().immediate_exit,
1,
"kick pattern should set immediate_exit=1"
);
vm.vcpus[0].set_kvm_immediate_exit(0);
assert_eq!(vm.vcpus[0].get_kvm_run().immediate_exit, 0);
}
#[test]
#[cfg(target_arch = "x86_64")]
fn find_vmlinux_from_bzimage_path() {
let tmp = std::env::temp_dir().join("ktstr-find-vmlinux-test");
let boot_dir = tmp.join("arch/x86/boot");
std::fs::create_dir_all(&boot_dir).unwrap();
let vmlinux = tmp.join("vmlinux");
std::fs::write(&vmlinux, b"ELF").unwrap();
let bzimage = boot_dir.join("bzImage");
std::fs::write(&bzimage, b"kernel").unwrap();
let found = find_vmlinux(&bzimage);
assert_eq!(found, Some(vmlinux));
std::fs::remove_dir_all(&tmp).unwrap();
}
#[test]
fn find_vmlinux_sibling() {
let tmp = std::env::temp_dir().join("ktstr-find-vmlinux-sibling");
std::fs::create_dir_all(&tmp).unwrap();
let vmlinux = tmp.join("vmlinux");
std::fs::write(&vmlinux, b"ELF").unwrap();
let kernel = tmp.join("bzImage");
std::fs::write(&kernel, b"kernel").unwrap();
let found = find_vmlinux(&kernel);
assert_eq!(found, Some(vmlinux));
std::fs::remove_dir_all(&tmp).unwrap();
}
#[test]
fn find_vmlinux_bare_filename() {
assert_eq!(find_vmlinux(Path::new("vmlinuz")), None);
}
#[test]
fn find_vmlinux_root_parent() {
let result = find_vmlinux(Path::new("/vmlinuz"));
if !Path::new("/vmlinux").exists() {
assert_eq!(result, None);
}
}
#[test]
fn builder_rejects_zero_sockets() {
let exe = crate::resolve_current_exe().unwrap();
let result = KtstrVmBuilder::default()
.kernel(&exe)
.topology(0, 2, 2)
.build();
assert!(result.is_err(), "sockets=0 should fail validation");
}
#[test]
fn builder_rejects_zero_cores() {
let exe = crate::resolve_current_exe().unwrap();
let result = KtstrVmBuilder::default()
.kernel(&exe)
.topology(2, 0, 2)
.build();
assert!(result.is_err(), "cores=0 should fail validation");
}
#[test]
fn builder_rejects_zero_threads() {
let exe = crate::resolve_current_exe().unwrap();
let result = KtstrVmBuilder::default()
.kernel(&exe)
.topology(2, 2, 0)
.build();
assert!(result.is_err(), "threads=0 should fail validation");
}
#[test]
fn vm_result_without_monitor_has_no_samples() {
let r = VmResult {
success: true,
exit_code: 0,
duration: Duration::from_secs(1),
timed_out: false,
output: "test output".into(),
stderr: String::new(),
monitor: None,
shm_data: None,
stimulus_events: Vec::new(),
verifier_stats: Vec::new(),
kvm_stats: None,
crash_message: None,
};
assert!(r.monitor.is_none());
assert_eq!(r.output, "test output");
assert_eq!(r.exit_code, 0);
}
#[test]
fn vm_result_with_monitor_carries_summary() {
use crate::monitor;
let summary = monitor::MonitorSummary {
prog_stats_deltas: None,
total_samples: 5,
max_imbalance_ratio: 3.5,
max_local_dsq_depth: 10,
stall_detected: true,
event_deltas: None,
schedstat_deltas: None,
..Default::default()
};
let report = monitor::MonitorReport {
samples: vec![],
summary: summary.clone(),
..Default::default()
};
let r = VmResult {
success: false,
exit_code: 1,
duration: Duration::from_millis(500),
timed_out: true,
output: String::new(),
stderr: "kernel panic".into(),
monitor: Some(report),
shm_data: None,
stimulus_events: Vec::new(),
verifier_stats: Vec::new(),
kvm_stats: None,
crash_message: None,
};
let mon = r.monitor.as_ref().unwrap();
assert_eq!(mon.summary.total_samples, 5);
assert!((mon.summary.max_imbalance_ratio - 3.5).abs() < f64::EPSILON);
assert_eq!(mon.summary.max_local_dsq_depth, 10);
assert!(mon.summary.stall_detected);
assert!(r.timed_out);
assert_eq!(r.exit_code, 1);
assert_eq!(r.stderr, "kernel panic");
}
#[test]
fn find_vmlinux_missing_returns_none() {
let tmp = std::env::temp_dir().join("ktstr-find-vmlinux-none");
std::fs::create_dir_all(&tmp).unwrap();
let kernel = tmp.join("bzImage");
std::fs::write(&kernel, b"kernel").unwrap();
assert_eq!(find_vmlinux(&kernel), None);
std::fs::remove_dir_all(&tmp).unwrap();
}
#[test]
#[cfg(target_arch = "x86_64")]
fn boot_kernel_with_monitor() {
let Some(kernel) = crate::find_kernel().unwrap() else {
return;
};
let Some(_vmlinux) = find_vmlinux(&kernel) else {
return;
};
let vm = match KtstrVm::builder()
.kernel(&kernel)
.topology(1, 2, 1)
.memory_mb(256)
.timeout(Duration::from_secs(10))
.build()
{
Ok(vm) => vm,
Err(e)
if e.downcast_ref::<host_topology::ResourceContention>()
.is_some() =>
{
return;
}
Err(e) => panic!("{e:#}"),
};
let result = vm.run().unwrap();
if let Some(ref report) = result.monitor {
assert!(
report.summary.total_samples > 0,
"monitor should have collected at least one sample"
);
}
}
#[test]
fn base_key_same_inputs_match() {
let exe = crate::resolve_current_exe().unwrap();
let k1 = BaseKey::new(&exe, None).unwrap();
let k2 = BaseKey::new(&exe, None).unwrap();
assert_eq!(k1, k2);
}
#[test]
fn base_key_nonexistent_payload_fails() {
let result = BaseKey::new(Path::new("/nonexistent/binary"), None);
assert!(result.is_err());
}
#[test]
fn base_key_different_content_differs() {
let tmp =
std::env::temp_dir().join(format!("ktstr-cache-content-test-{}", std::process::id()));
std::fs::create_dir_all(&tmp).unwrap();
let bin = tmp.join("payload");
std::fs::write(&bin, b"content_v1").unwrap();
let k1 = BaseKey::new(&bin, None).unwrap();
std::fs::write(&bin, b"content_v2").unwrap();
let k2 = BaseKey::new(&bin, None).unwrap();
assert_ne!(
k1, k2,
"different file content should produce different key"
);
std::fs::remove_dir_all(&tmp).unwrap();
}
#[test]
fn base_key_with_scheduler() {
let exe = crate::resolve_current_exe().unwrap();
let k1 = BaseKey::new(&exe, None).unwrap();
let k2 = BaseKey::new(&exe, Some(&exe)).unwrap();
assert_ne!(k1, k2, "with vs without scheduler should differ");
}
#[test]
fn hash_file_large_file() {
let tmp =
std::env::temp_dir().join(format!("ktstr-hash-sample-test-{}", std::process::id()));
std::fs::create_dir_all(&tmp).unwrap();
let f = tmp.join("big");
let data: Vec<u8> = (0..16384).map(|i| (i % 256) as u8).collect();
std::fs::write(&f, &data).unwrap();
let h = hash_file(&f).unwrap();
assert_eq!(h, hash_file(&f).unwrap());
std::fs::remove_dir_all(&tmp).unwrap();
}
#[test]
fn base_cache_hit() {
let exe = crate::resolve_current_exe().unwrap();
let key = BaseKey::new(&exe, None).unwrap();
let sentinel = Arc::new(vec![0xDE, 0xAD]);
base_cache()
.lock()
.unwrap()
.insert(key.clone(), sentinel.clone());
let cached = base_cache().lock().unwrap().get(&key).cloned();
assert!(cached.is_some());
assert!(Arc::ptr_eq(&cached.unwrap(), &sentinel));
base_cache().lock().unwrap().remove(&key);
}
#[test]
fn shm_store_and_load_roundtrip() {
let hash = 0xDEAD_BEEF_CAFE_1234u64;
let data = vec![0x07u8, 0x07, 0x01];
initramfs::shm_store_base(hash, &data).unwrap();
let loaded = initramfs::shm_load_base(hash);
assert!(loaded.is_some(), "shm_load_base should return Some");
assert_eq!(loaded.unwrap().as_ref(), &data[..]);
initramfs::shm_unlink_base(hash);
}
#[test]
#[cfg(target_arch = "x86_64")]
fn dispatch_io_out_i8042_reset_is_shutdown_signal() {
let com1 = PiMutex::new(console::Serial::new(console::COM1_BASE));
let com2 = PiMutex::new(console::Serial::new(console::COM2_BASE));
assert!(
dispatch_io_out(&com1, &com2, I8042_CMD_PORT, &[I8042_CMD_RESET_CPU]),
"I8042 reset (0xFE to port 0x64) must signal shutdown"
);
}
#[test]
#[cfg(target_arch = "x86_64")]
fn dispatch_io_out_i8042_non_reset() {
let com1 = PiMutex::new(console::Serial::new(console::COM1_BASE));
let com2 = PiMutex::new(console::Serial::new(console::COM2_BASE));
assert!(!dispatch_io_out(&com1, &com2, I8042_CMD_PORT, &[0x00]));
}
#[test]
#[cfg(target_arch = "x86_64")]
fn dispatch_io_out_serial_com1() {
let com1 = PiMutex::new(console::Serial::new(console::COM1_BASE));
let com2 = PiMutex::new(console::Serial::new(console::COM2_BASE));
assert!(!dispatch_io_out(&com1, &com2, console::COM1_BASE, b"A"));
}
#[test]
#[cfg(target_arch = "x86_64")]
fn dispatch_io_out_serial_com2() {
let com1 = PiMutex::new(console::Serial::new(console::COM1_BASE));
let com2 = PiMutex::new(console::Serial::new(console::COM2_BASE));
assert!(!dispatch_io_out(&com1, &com2, console::COM2_BASE, b"B"));
let output = com2.lock().output();
assert!(output.contains('B'));
}
#[test]
#[cfg(target_arch = "x86_64")]
fn dispatch_io_out_unknown_port() {
let com1 = PiMutex::new(console::Serial::new(console::COM1_BASE));
let com2 = PiMutex::new(console::Serial::new(console::COM2_BASE));
assert!(!dispatch_io_out(&com1, &com2, 0x1234, &[0xFF]));
}
#[test]
#[cfg(target_arch = "x86_64")]
fn dispatch_io_in_i8042_status() {
let com1 = PiMutex::new(console::Serial::new(console::COM1_BASE));
let com2 = PiMutex::new(console::Serial::new(console::COM2_BASE));
let mut data = [0xFFu8; 1];
dispatch_io_in(&com1, &com2, I8042_CMD_PORT, &mut data);
assert_eq!(data[0], 0);
}
#[test]
#[cfg(target_arch = "x86_64")]
fn dispatch_io_in_i8042_data() {
let com1 = PiMutex::new(console::Serial::new(console::COM1_BASE));
let com2 = PiMutex::new(console::Serial::new(console::COM2_BASE));
let mut data = [0xFFu8; 1];
dispatch_io_in(&com1, &com2, I8042_DATA_PORT, &mut data);
assert_eq!(data[0], 0);
}
#[test]
#[cfg(target_arch = "x86_64")]
fn dispatch_io_in_unknown_port() {
let com1 = PiMutex::new(console::Serial::new(console::COM1_BASE));
let com2 = PiMutex::new(console::Serial::new(console::COM2_BASE));
let mut data = [0xFFu8; 1];
dispatch_io_in(&com1, &com2, 0x1234, &mut data);
assert_eq!(data[0], 0xFF, "unknown port should not modify data");
}
#[test]
fn pi_mutex_lock_unlock() {
let m = PiMutex::new(42u32);
{
let mut guard = m.lock();
assert_eq!(*guard, 42);
*guard = 99;
}
assert_eq!(*m.lock(), 99);
}
#[test]
fn pi_mutex_cross_thread() {
let m = Arc::new(PiMutex::new(0u32));
let m2 = m.clone();
let handle = std::thread::spawn(move || {
*m2.lock() += 1;
});
handle.join().unwrap();
assert_eq!(*m.lock(), 1);
}
#[test]
fn builder_watchdog_timeout_default() {
let b = KtstrVmBuilder::default();
assert_eq!(b.watchdog_timeout, Some(Duration::from_secs(4)));
}
#[test]
fn builder_watchdog_timeout_override() {
let b = KtstrVmBuilder::default().watchdog_timeout(Duration::from_secs(5));
assert_eq!(b.watchdog_timeout, Some(Duration::from_secs(5)));
}
#[test]
fn builder_monitor_thresholds_sets() {
let t = crate::monitor::MonitorThresholds {
max_imbalance_ratio: 2.0,
..Default::default()
};
let b = KtstrVmBuilder::default().monitor_thresholds(t);
assert!(b.monitor_thresholds.is_some());
}
#[test]
fn builder_shm_size() {
let b = KtstrVmBuilder::default().shm_size(65536);
assert_eq!(b.shm_size, 65536);
}
#[test]
fn builder_sched_args() {
let b = KtstrVmBuilder::default().sched_args(&["--enable-borrow".into()]);
assert_eq!(b.sched_args, vec!["--enable-borrow"]);
}
#[test]
fn builder_performance_mode_default_false() {
let b = KtstrVmBuilder::default();
assert!(!b.performance_mode);
}
#[test]
fn builder_performance_mode_set() {
let b = KtstrVmBuilder::default().performance_mode(true);
assert!(b.performance_mode);
}
#[test]
fn builder_performance_mode_false_no_validation() {
let exe = crate::resolve_current_exe().unwrap();
let result = KtstrVmBuilder::default()
.kernel(&exe)
.topology(1, 1, 1)
.performance_mode(false)
.build();
match result {
Ok(_) => {}
Err(e)
if e.downcast_ref::<host_topology::ResourceContention>()
.is_some() =>
{
}
Err(e) => panic!("performance_mode=false should not validate host topology: {e:#}",),
}
}
#[test]
fn builder_performance_mode_oversubscribed_fails() {
let exe = crate::resolve_current_exe().unwrap();
let host_topo = host_topology::HostTopology::from_sysfs().unwrap();
let too_many = host_topo.total_cpus() as u32 + 1;
let result = KtstrVmBuilder::default()
.kernel(&exe)
.topology(1, too_many, 1)
.performance_mode(true)
.build();
match result {
Ok(_) => panic!("oversubscribed topology should fail"),
Err(e) => {
let msg = format!("{e}");
assert!(
msg.contains("performance_mode"),
"error should mention performance_mode: {msg}",
);
}
}
}
#[test]
fn builder_performance_mode_too_many_sockets_fails() {
let exe = crate::resolve_current_exe().unwrap();
let host_topo = host_topology::HostTopology::from_sysfs().unwrap();
let too_many_sockets = host_topo.llc_groups.len() as u32 + 1;
if (too_many_sockets as usize + 1) <= host_topo.total_cpus() {
let result = KtstrVmBuilder::default()
.kernel(&exe)
.topology(too_many_sockets, 1, 1)
.performance_mode(true)
.build();
assert!(result.is_err(), "more sockets than LLCs should fail",);
}
}
#[test]
fn builder_performance_mode_valid_succeeds() {
let exe = crate::resolve_current_exe().unwrap();
let host_topo = host_topology::HostTopology::from_sysfs().unwrap();
if host_topo.total_cpus() < 3 {
return;
}
let result = KtstrVmBuilder::default()
.kernel(&exe)
.topology(1, 2, 1)
.performance_mode(true)
.build();
match result {
Ok(_) => {}
Err(e)
if e.downcast_ref::<host_topology::ResourceContention>()
.is_some() =>
{
}
Err(e) => panic!("valid topology with performance_mode should build: {e:#}",),
}
}
#[test]
fn builder_performance_mode_preserves_in_vm() {
let exe = crate::resolve_current_exe().unwrap();
let host_topo = host_topology::HostTopology::from_sysfs().unwrap();
if host_topo.total_cpus() < 3 {
return;
}
let vm = match KtstrVmBuilder::default()
.kernel(&exe)
.topology(1, 2, 1)
.performance_mode(true)
.build()
{
Ok(vm) => vm,
Err(e)
if e.downcast_ref::<host_topology::ResourceContention>()
.is_some() =>
{
return;
}
Err(e) => panic!("{e:#}"),
};
assert!(vm.performance_mode);
}
#[test]
fn builder_performance_mode_false_preserves_in_vm() {
let exe = crate::resolve_current_exe().unwrap();
let vm = match KtstrVmBuilder::default()
.kernel(&exe)
.topology(1, 1, 1)
.performance_mode(false)
.build()
{
Ok(vm) => vm,
Err(e)
if e.downcast_ref::<host_topology::ResourceContention>()
.is_some() =>
{
return;
}
Err(e) => panic!("{e:#}"),
};
assert!(!vm.performance_mode);
}
#[test]
fn builder_performance_mode_mbind_nodes_populated() {
let exe = crate::resolve_current_exe().unwrap();
let host_topo = host_topology::HostTopology::from_sysfs().unwrap();
if host_topo.total_cpus() < 3 {
return;
}
let vm = KtstrVmBuilder::default()
.kernel(&exe)
.topology(1, 2, 1)
.performance_mode(true)
.build();
if let Ok(vm) = vm {
assert!(
!vm.mbind_nodes.is_empty(),
"mbind_nodes should be populated for performance_mode",
);
}
}
#[test]
fn shm_different_hashes_independent() {
let h1 = 0x1111_2222_3333_4444u64;
let h2 = 0x5555_6666_7777_8888u64;
let d1 = vec![0xAAu8; 16];
let d2 = vec![0xBBu8; 32];
initramfs::shm_store_base(h1, &d1).unwrap();
initramfs::shm_store_base(h2, &d2).unwrap();
assert_eq!(initramfs::shm_load_base(h1).unwrap().as_ref(), &d1[..]);
assert_eq!(initramfs::shm_load_base(h2).unwrap().as_ref(), &d2[..]);
initramfs::shm_unlink_base(h1);
initramfs::shm_unlink_base(h2);
}
#[test]
fn pi_mutex_concurrent_increment() {
let m = Arc::new(PiMutex::new(0u64));
let threads: Vec<_> = (0..8)
.map(|_| {
let m = m.clone();
std::thread::spawn(move || {
for _ in 0..1000 {
*m.lock() += 1;
}
})
})
.collect();
for t in threads {
t.join().unwrap();
}
assert_eq!(*m.lock(), 8000);
}
#[test]
fn pi_mutex_protocol_is_inherit() {
unsafe {
let mut attr: libc::pthread_mutexattr_t = std::mem::zeroed();
assert_eq!(libc::pthread_mutexattr_init(&mut attr), 0);
assert_eq!(
libc::pthread_mutexattr_setprotocol(&mut attr, libc::PTHREAD_PRIO_INHERIT),
0,
);
let mut protocol: libc::c_int = 0;
assert_eq!(libc::pthread_mutexattr_getprotocol(&attr, &mut protocol), 0);
assert_eq!(protocol, libc::PTHREAD_PRIO_INHERIT);
libc::pthread_mutexattr_destroy(&mut attr);
}
}
#[test]
fn set_rt_priority_applies_when_capable() {
let param = libc::sched_param { sched_priority: 1 };
let rc = unsafe { libc::sched_setscheduler(0, libc::SCHED_FIFO, ¶m) };
if rc != 0 {
eprintln!("skipping set_rt_priority test: no CAP_SYS_NICE");
return;
}
let policy = unsafe { libc::sched_getscheduler(0) };
assert_eq!(policy, libc::SCHED_FIFO);
let mut out_param: libc::sched_param = unsafe { std::mem::zeroed() };
unsafe { libc::sched_getparam(0, &mut out_param) };
assert_eq!(out_param.sched_priority, 1);
let restore = libc::sched_param { sched_priority: 0 };
unsafe { libc::sched_setscheduler(0, libc::SCHED_OTHER, &restore) };
}
#[test]
fn set_rt_priority_warns_without_cap() {
set_rt_priority(1, "test-thread");
}
#[cfg(target_arch = "aarch64")]
fn find_aarch64_image() -> Option<std::path::PathBuf> {
crate::find_kernel().unwrap()
}
#[test]
#[cfg(target_arch = "aarch64")]
fn boot_kernel_produces_output_aarch64() {
let Some(kernel) = find_aarch64_image() else {
eprintln!("skipping: no aarch64 Image found (only compressed vmlinuz available)");
return;
};
let vm = match KtstrVm::builder()
.kernel(&kernel)
.topology(1, 1, 1)
.memory_mb(256)
.timeout(Duration::from_secs(10))
.cmdline("loglevel=7")
.build()
{
Ok(vm) => vm,
Err(e)
if e.downcast_ref::<host_topology::ResourceContention>()
.is_some() =>
{
return;
}
Err(e) => panic!("{e:#}"),
};
let result = vm.run().unwrap();
assert!(
result.stderr.contains("Linux") || result.stderr.contains("Booting"),
"kernel console should contain boot messages, got: {}",
&result.stderr[..result.stderr.len().min(200)],
);
}
#[test]
#[cfg(target_arch = "aarch64")]
fn boot_kernel_smp_topology_aarch64() {
let Some(kernel) = find_aarch64_image() else {
eprintln!("skipping: no aarch64 Image found");
return;
};
let vm = match KtstrVm::builder()
.kernel(&kernel)
.topology(2, 2, 1)
.memory_mb(256)
.timeout(Duration::from_secs(10))
.cmdline("loglevel=7")
.build()
{
Ok(vm) => vm,
Err(e)
if e.downcast_ref::<host_topology::ResourceContention>()
.is_some() =>
{
return;
}
Err(e) => panic!("{e:#}"),
};
let result = vm.run().unwrap();
assert!(!result.stderr.is_empty(), "no console output from SMP boot");
}
#[test]
#[cfg(target_arch = "aarch64")]
fn aarch64_kvm_has_immediate_exit() {
let topo = Topology {
sockets: 1,
cores_per_socket: 1,
threads_per_core: 1,
};
let vm = kvm::KtstrKvm::new(topo, 64, false).unwrap();
assert!(
vm.has_immediate_exit,
"KVM_CAP_IMMEDIATE_EXIT should be available on modern kernels"
);
}
#[test]
#[cfg(target_arch = "aarch64")]
fn builder_kernel_dir_resolves_image() {
let b = KtstrVmBuilder::default().kernel_dir("/some/linux");
assert_eq!(
b.kernel.as_deref(),
Some(std::path::Path::new("/some/linux/arch/arm64/boot/Image"))
);
}
}