#![cfg(test)]
#![allow(unused_imports)]
use super::super::affinity::*;
use super::super::config::*;
use super::super::types::*;
use super::super::worker::*;
use super::testing::*;
use super::*;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
#[test]
fn join_thread_with_timeout_returns_result_on_quick_completion() {
    use std::sync::Arc;
    // Worker finishes immediately and signals the exit eventfd on its way out.
    let evt = Arc::new(vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK).unwrap());
    let evt_for_worker = Arc::clone(&evt);
    let handle = std::thread::spawn(move || {
        let _ = evt_for_worker.write(1);
        WorkerReport {
            tid: 7,
            ..WorkerReport::default()
        }
    });
    // A 2s budget is generous for a thread that returns right away, so the
    // only acceptable outcome is Some(Ok(_)) carrying the report back intact.
    match join_thread_with_timeout(handle, &evt, Duration::from_secs(2)) {
        Some(Ok(report)) => assert_eq!(report.tid, 7),
        Some(Err(_)) => panic!("clean thread must not produce join Err"),
        None => panic!("clean thread must not time out within 2s"),
    }
}
#[test]
fn join_thread_with_timeout_returns_none_on_timeout() {
    use std::sync::Arc;
    let evt = Arc::new(vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK).unwrap());
    // The worker deliberately outlives the join budget by 8x and never
    // signals the eventfd, so the join helper has nothing to wake on.
    let slow = std::thread::spawn(|| {
        std::thread::sleep(Duration::from_millis(800));
        WorkerReport::default()
    });
    let outcome = join_thread_with_timeout(slow, &evt, Duration::from_millis(100));
    assert!(outcome.is_none(), "100ms timeout vs 800ms thread must time out");
}
// ThreadWorker's Drop impl must actually join its JoinHandle: if it merely
// dropped the handle (Rust's default detaches the thread), `observed` would
// still be false when the assertion below runs.
#[test]
fn thread_worker_drop_joins_handle() {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::mpsc;
// `stop` is the cooperative shutdown flag the worker polls; `observed` is
// flipped by the worker only AFTER it exits that polling loop.
let stop = Arc::new(AtomicBool::new(false));
let observed = Arc::new(AtomicBool::new(false));
let stop_thread = Arc::clone(&stop);
let observed_thread = Arc::clone(&observed);
// Zero-capacity rendezvous channel: the worker blocks until start is sent.
let (start_tx, start_rx) = mpsc::sync_channel::<()>(0);
let tid = Arc::new(AtomicI32::new(0));
let tid_thread = Arc::clone(&tid);
let exit_evt = Arc::new(vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK).unwrap());
let exit_evt_thread = Arc::clone(&exit_evt);
let join = std::thread::spawn(move || {
// Publish this task's kernel tid so the harness-side struct can see it.
tid_thread.store(
unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t },
Ordering::Relaxed,
);
let _ = start_rx.recv();
while !stop_thread.load(Ordering::Relaxed) {
std::thread::sleep(Duration::from_millis(20));
}
// Reached only after the stop flag flips — this is the proof the drop
// path waited for the worker instead of detaching it.
observed_thread.store(true, Ordering::Relaxed);
let _ = exit_evt_thread.write(1);
WorkerReport::default()
});
// Assemble the worker by hand so Drop is the only cleanup path exercised.
let tw = ThreadWorker {
tid,
stop,
start_tx: Some(start_tx),
join: Some(join),
exit_evt,
};
// Release the worker from its rendezvous so it enters the polling loop.
if let Some(ref tx) = tw.start_tx {
let _ = tx.send(());
}
std::thread::sleep(Duration::from_millis(50));
// Drop is expected to raise the stop flag and join the handle (the
// assertion below encodes that contract).
drop(tw);
assert!(
observed.load(Ordering::Relaxed),
"ThreadWorker::drop must join its JoinHandle — observed=false \
means the drop returned without waiting for the worker, which \
would mean the worker was detached (Rust's default for \
JoinHandle::drop) instead of explicitly joined"
);
}
#[test]
fn spawn_thread_clone_mode_runs_to_completion() {
    // Two spin-wait workers running as plain threads in this process.
    let cfg = WorkloadConfig {
        num_workers: 2,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::SpinWait,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).expect("Thread mode must spawn");
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(150));
    // Both workers must be visible as distinct, real kernel task ids.
    let tids = handle.worker_pids();
    assert_eq!(tids.len(), 2, "worker_pids must reflect both threads");
    for tid in &tids {
        assert!(*tid > 0, "thread tid must be a real gettid() value: {tid}");
    }
    assert_ne!(tids[0], tids[1], "sibling thread tids must differ");
    let reports = handle.stop_and_collect();
    assert_eq!(
        reports.len(),
        2,
        "thread mode collects one report per worker"
    );
    for report in &reports {
        assert!(report.completed, "thread worker must complete: {:?}", report);
        assert!(
            report.work_units > 0,
            "thread worker must do work: {}",
            report.work_units
        );
    }
}
#[test]
fn spawn_thread_with_forkexit_rejected_at_spawn_time() {
    // ForkExit work needs real child processes, which Thread mode cannot
    // provide — the incompatibility must surface at spawn, not at runtime.
    let cfg = WorkloadConfig {
        num_workers: 1,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::ForkExit,
        ..Default::default()
    };
    let err = match WorkloadHandle::spawn(&cfg) {
        Err(e) => e,
        Ok(_) => panic!("Thread + ForkExit must bail at spawn"),
    };
    // The alternate-format rendering includes the full error chain.
    let msg = format!("{err:#}");
    let names_everything = msg.contains("CloneMode::Thread")
        && msg.contains("WorkType::ForkExit")
        && msg.contains("CloneMode::Fork");
    assert!(
        names_everything,
        "diagnostic must name both incompatible variants and the safe \
         alternative: {msg}"
    );
}
#[test]
fn spawn_thread_with_cgroupchurn_rejected_at_spawn_time() {
    // CgroupChurn is only meaningful for forked children; Thread mode must
    // refuse the combination up front with an actionable diagnostic.
    let cfg = WorkloadConfig {
        num_workers: 1,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::CgroupChurn {
            groups: 2,
            cycle_ms: 100,
        },
        ..Default::default()
    };
    let err = match WorkloadHandle::spawn(&cfg) {
        Err(e) => e,
        Ok(_) => panic!("Thread + CgroupChurn must bail at spawn"),
    };
    let msg = format!("{err:#}");
    let names_everything = msg.contains("CloneMode::Thread")
        && msg.contains("WorkType::CgroupChurn")
        && msg.contains("CloneMode::Fork");
    assert!(
        names_everything,
        "diagnostic must name both incompatible variants and the safe \
         alternative: {msg}"
    );
}
#[test]
fn spawn_fork_with_epollstorm_rejected_at_spawn_time() {
    // EpollStorm relies on a shared fd table across workers, so Fork mode
    // must be rejected at spawn time with a diagnostic naming the fix.
    let cfg = WorkloadConfig {
        num_workers: 2,
        clone_mode: CloneMode::Fork,
        work_type: WorkType::EpollStorm {
            producers: 1,
            consumers: 1,
            events_per_burst: 1,
        },
        ..Default::default()
    };
    let err = match WorkloadHandle::spawn(&cfg) {
        Err(e) => e,
        Ok(_) => panic!("Fork + EpollStorm must bail at spawn"),
    };
    let msg = format!("{err:#}");
    let names_everything = msg.contains("CloneMode::Fork")
        && msg.contains("WorkType::EpollStorm")
        && msg.contains("CloneMode::Thread");
    assert!(
        names_everything,
        "diagnostic must name both incompatible variants and the safe \
         alternative: {msg}"
    );
}
#[test]
fn spawn_thread_panic_yields_panicked_exit_info() {
    // Worker body that aborts instantly via panic!(); the harness must turn
    // that into a non-completed report carrying the panic payload.
    fn panic_immediately(_stop: &AtomicBool) -> WorkerReport {
        panic!("test panic from thread worker");
    }
    let cfg = WorkloadConfig {
        num_workers: 1,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::custom("panic_immediately", panic_immediately),
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).expect("Thread spawn must succeed");
    handle.start();
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 1);
    let report = &reports[0];
    assert!(
        !report.completed,
        "panicked worker must NOT report completed=true"
    );
    // The panic payload must survive the unwind and land in exit_info.
    match &report.exit_info {
        Some(WorkerExitInfo::Panicked(msg)) => {
            assert!(
                msg.contains("test panic from thread worker"),
                "panic message must round-trip from panic!() to exit_info: {msg}"
            );
        }
        other => panic!("expected Panicked(_) exit_info on thread panic, got {other:?}",),
    }
}
// Thread-mode shutdown must use each worker's private stop flag and never
// touch the process-global STOP used by the signal handler.
#[test]
fn spawn_thread_custom_stop_does_not_touch_global_stop() {
// Custom worker body: spin until the per-worker stop flag is raised.
fn spin_until_stop(stop: &AtomicBool) -> WorkerReport {
let tid: libc::pid_t = unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t };
while !stop_requested(stop) {
std::thread::sleep(Duration::from_millis(10));
}
WorkerReport {
tid,
completed: true,
..WorkerReport::default()
}
}
// Reset the global flag first so a stale `true` left by an earlier test
// in the same process cannot mask a regression here.
STOP.store(false, Ordering::Relaxed);
let stop_before = STOP.load(Ordering::Relaxed);
assert!(
!stop_before,
"global STOP must be false before the test runs — \
a stale true from a prior test would mask the assertion"
);
let config = WorkloadConfig {
num_workers: 1,
clone_mode: CloneMode::Thread,
work_type: WorkType::custom("spin_until_stop", spin_until_stop),
..Default::default()
};
let mut h = WorkloadHandle::spawn(&config).expect("Thread spawn must succeed");
h.start();
std::thread::sleep(Duration::from_millis(50));
let reports = h.stop_and_collect();
assert_eq!(reports.len(), 1);
// completed=true proves the worker saw its per-worker flag flip.
assert!(
reports[0].completed,
"Custom thread worker must observe per-worker stop and \
return completed=true: got {:?}",
reports[0]
);
// The worker stopped via its own flag; the global one must be untouched.
let stop_after = STOP.load(Ordering::Relaxed);
assert!(
!stop_after,
"global STOP must remain false after Thread-mode \
stop_and_collect — Thread mode flips per-worker flags \
only, never the global signal-handler flag"
);
}
// Thread-mode workers must be real kernel tasks inside the harness process:
// same getpid() (thread-group id) as the parent, distinct gettid() each.
#[test]
fn spawn_thread_workers_share_tgid() {
use std::sync::Mutex;
// Shared scratchpad the workers push their (pid, tid) pairs into.
static WORKER_PIDTIDS: Mutex<Vec<(libc::pid_t, libc::pid_t)>> = Mutex::new(Vec::new());
fn record_pid_tid_then_spin(stop: &AtomicBool) -> WorkerReport {
let pid: libc::pid_t = unsafe { libc::getpid() };
let tid: libc::pid_t = unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t };
WORKER_PIDTIDS.lock().unwrap().push((pid, tid));
// Hold the worker alive until stop so both record before collection.
while !stop_requested(stop) {
std::thread::sleep(Duration::from_millis(10));
}
WorkerReport {
tid,
completed: true,
..WorkerReport::default()
}
}
// Snapshot the harness identity before spawning anything.
let parent_pid: libc::pid_t = unsafe { libc::getpid() };
let parent_tid: libc::pid_t = unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t };
let config = WorkloadConfig {
num_workers: 2,
clone_mode: CloneMode::Thread,
work_type: WorkType::custom("record_pid_tid_then_spin", record_pid_tid_then_spin),
..Default::default()
};
let mut h = WorkloadHandle::spawn(&config).expect("Thread spawn must succeed");
h.start();
// 50ms gives both workers time to record before stop is requested.
std::thread::sleep(Duration::from_millis(50));
let _reports = h.stop_and_collect();
let captured = WORKER_PIDTIDS.lock().unwrap().clone();
assert_eq!(
captured.len(),
2,
"both workers must record their (pid, tid) before stop: got {captured:?}"
);
for (worker_pid, worker_tid) in &captured {
// Same pid as the harness: std::thread never leaves the thread group.
assert_eq!(
*worker_pid, parent_pid,
"Thread worker getpid()={worker_pid} must match parent \
getpid()={parent_pid} — std::thread shares the tgid",
);
// Distinct tid: each worker is its own kernel task, not the parent's.
assert_ne!(
*worker_tid, parent_tid,
"Thread worker gettid()={worker_tid} must differ from parent \
gettid()={parent_tid} — each std::thread is a distinct \
kernel task",
);
}
}
#[test]
fn spawn_thread_with_nicesweep_succeeds() {
    // NiceSweep is compatible with Thread mode — spawn must not reject it,
    // and the single worker must finish cleanly.
    let cfg = WorkloadConfig {
        num_workers: 1,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::NiceSweep,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg)
        .expect("Thread + NiceSweep spawn must succeed (no incompatibility)");
    handle.start();
    std::thread::sleep(Duration::from_millis(150));
    let reports = handle.stop_and_collect();
    assert_eq!(
        reports.len(),
        1,
        "Thread + NiceSweep must collect one report"
    );
    assert!(
        reports[0].completed,
        "Thread + NiceSweep worker must complete cleanly: {:?}",
        reports[0]
    );
}
#[test]
fn spawn_thread_drop_cleanup() {
    use std::sync::atomic::AtomicUsize;
    // Incremented by each worker only after it has left its stop loop, so a
    // count of 2 proves Drop waited for both threads.
    static EXITED_COUNT: AtomicUsize = AtomicUsize::new(0);
    fn spin_then_record_exit(stop: &AtomicBool) -> WorkerReport {
        let tid: libc::pid_t = unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t };
        while !stop_requested(stop) {
            std::thread::sleep(Duration::from_millis(5));
        }
        EXITED_COUNT.fetch_add(1, Ordering::SeqCst);
        WorkerReport {
            tid,
            completed: true,
            ..WorkerReport::default()
        }
    }
    let cfg = WorkloadConfig {
        num_workers: 2,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::custom("spin_then_record_exit", spin_then_record_exit),
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).expect("Thread spawn must succeed");
    handle.start();
    std::thread::sleep(Duration::from_millis(50));
    // Dropping the handle — not stop_and_collect — is the cleanup path under test.
    drop(handle);
    let count = EXITED_COUNT.load(Ordering::SeqCst);
    assert_eq!(
        count, 2,
        "both Thread workers must run to completion under \
         WorkloadHandle::drop's join path (got {count}); a count \
         below 2 indicates Drop timed out or abandoned a thread \
         instead of joining it",
    );
}
#[test]
fn spawn_thread_with_pipe_io() {
    // Two PipeIo partners under Thread mode share one fd table, so each
    // must both do work and observe at least one wake from its peer.
    let cfg = WorkloadConfig {
        num_workers: 2,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::PipeIo { burst_iters: 1024 },
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).expect("Thread + PipeIo spawn must succeed");
    handle.start();
    std::thread::sleep(Duration::from_millis(200));
    let reports = handle.stop_and_collect();
    assert_eq!(
        reports.len(),
        2,
        "Thread + PipeIo collects one report per worker"
    );
    for r in &reports {
        assert!(
            r.work_units > 0,
            "Thread + PipeIo worker tid={} ran zero iterations: {:?}",
            r.tid,
            r,
        );
        assert!(
            !r.resume_latencies_ns.is_empty(),
            "Thread + PipeIo worker tid={} captured zero wake-latency \
             samples — its pipe ops never observed a partner write, \
             which under shared-fd-table semantics means the pipe fds \
             were closed before the worker reached pipe_exchange. \
             work_units={} (bumped regardless of pipe success). Full \
             report: {:?}",
            r.tid,
            r.work_units,
            r,
        );
    }
}
// Throughput invariant for a pipe wake chain under Thread mode: exactly one
// bootstrap token circulates the ring, so total iterations are bounded by
// wall-clock / work_per_hop — NOT multiplied by the chain depth.
#[test]
fn wake_chain_pipe_thread_mode_bootstrap_throughput() {
const DEPTH: usize = 4;
const WORK_PER_HOP_MS: u64 = 50;
const TEST_WINDOW_MS: u64 = 1000;
// Correct total ≈ 1000/50 = 20; a bootstrap-per-stage bug yields ≈ 20*4 = 80.
// 40 sits between the two regimes so the verdict is unambiguous.
const TOTAL_ITER_THRESHOLD: u64 = 40;
// Timing-sensitive test: bail out early when the host lacks DEPTH isolated
// CPUs (NOTE(review): assumes the helper returns true when the test should
// skip — confirm against testing.rs).
if require_isolated_cpus(DEPTH, "wake_chain_pipe_thread_mode_bootstrap_throughput") {
return;
}
let config = WorkloadConfig {
num_workers: DEPTH,
clone_mode: CloneMode::Thread,
work_type: WorkType::WakeChain {
depth: DEPTH,
wake: WakeMechanism::Pipe,
work_per_hop: Duration::from_millis(WORK_PER_HOP_MS),
},
..Default::default()
};
let mut h =
WorkloadHandle::spawn(&config).expect("Thread + WakeChain wake=Pipe spawn must succeed");
h.start();
// Let the ring run for the full measurement window before collecting.
std::thread::sleep(Duration::from_millis(TEST_WINDOW_MS));
let reports = h.stop_and_collect();
assert_eq!(
reports.len(),
DEPTH,
"Thread + WakeChain wake=Pipe collects one report per worker"
);
let total_iters: u64 = reports.iter().map(|r| r.iterations).sum();
// Upper bound: catches every stage bootstrapping its own token.
assert!(
total_iters <= TOTAL_ITER_THRESHOLD,
"Thread + WakeChain wake=Pipe total iterations across {DEPTH} \
stages exceeded {TOTAL_ITER_THRESHOLD} over {TEST_WINDOW_MS}ms \
with work_per_hop={WORK_PER_HOP_MS}ms (got {total_iters}). \
Throughput is wall-clock-bounded; the bootstrap-once invariant \
holds identically under Thread mode. Expected correct total \
~{}; expected buggy total ~{}. Per-worker reports: {:?}",
TEST_WINDOW_MS / WORK_PER_HOP_MS,
(TEST_WINDOW_MS / WORK_PER_HOP_MS) * (DEPTH as u64),
reports,
);
// Lower bound: at least one full lap (one iteration per stage, DEPTH total)
// must have completed, proving the bootstrap byte actually circulated.
assert!(
total_iters >= 4,
"Thread + WakeChain wake=Pipe made fewer than one ring \
round-trip over {TEST_WINDOW_MS}ms (got {total_iters}, \
expected ≥ 4) — the bootstrap byte never completed a full \
lap. Under Thread mode this typically means the pipe fds \
were closed before the workers reached the chain handoff \
site (a regression in `WorkloadHandle::chain_pipes` \
ownership transfer). Per-worker reports: {:?}",
reports,
);
}
#[test]
fn spawn_thread_with_wake_chain_pipe() {
    // Two-stage pipe wake chain with a tiny per-hop workload; every stage
    // must both iterate and record at least one wake-latency sample overall.
    let cfg = WorkloadConfig {
        num_workers: 2,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::WakeChain {
            depth: 2,
            wake: WakeMechanism::Pipe,
            work_per_hop: Duration::from_micros(100),
        },
        ..Default::default()
    };
    let mut handle =
        WorkloadHandle::spawn(&cfg).expect("Thread + WakeChain wake=Pipe spawn must succeed");
    handle.start();
    std::thread::sleep(Duration::from_millis(200));
    let reports = handle.stop_and_collect();
    assert_eq!(
        reports.len(),
        2,
        "Thread + WakeChain wake=Pipe collects one report per worker"
    );
    let total_samples: usize = reports.iter().map(|r| r.resume_latencies_ns.len()).sum();
    assert!(
        total_samples > 0,
        "Thread + WakeChain wake=Pipe captured zero wake-latency \
         samples across {} workers — the chain pipes never routed a \
         stage handoff, which under shared-fd-table semantics means \
         the pipe fds were closed before the workers reached the \
         chain handoff site. Per-worker reports: {:?}",
        reports.len(),
        reports,
    );
    for r in &reports {
        assert!(
            r.work_units > 0,
            "Thread + WakeChain wake=Pipe worker tid={} ran zero \
             iterations: {:?}",
            r.tid,
            r,
        );
    }
}
#[test]
fn spawn_thread_with_futex_ping_pong() {
    // A futex ping-pong pair must make visible progress under Thread mode.
    let cfg = WorkloadConfig {
        num_workers: 2,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::FutexPingPong { spin_iters: 1024 },
        ..Default::default()
    };
    let mut handle =
        WorkloadHandle::spawn(&cfg).expect("Thread + FutexPingPong spawn must succeed");
    handle.start();
    std::thread::sleep(Duration::from_millis(200));
    let reports = handle.stop_and_collect();
    assert_eq!(
        reports.len(),
        2,
        "Thread + FutexPingPong collects one report per worker",
    );
    for r in &reports {
        assert!(
            r.work_units > 0,
            "Thread + FutexPingPong worker tid={} did no work: {:?}",
            r.tid,
            r,
        );
    }
}
#[test]
fn spawn_thread_set_affinity_works_post_start() {
    let cfg = WorkloadConfig {
        num_workers: 1,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::SpinWait,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).expect("Thread spawn must succeed");
    handle.start();
    std::thread::sleep(Duration::from_millis(50));
    // Pin worker 0 onto CPU 0 while it is already running.
    let cpus = BTreeSet::from([0usize]);
    let result = handle.set_affinity(0, &cpus);
    assert!(
        result.is_ok(),
        "set_affinity(0, {{0}}) on a started Thread worker must succeed; \
         got {:?}",
        result.err(),
    );
    let _reports = handle.stop_and_collect();
}
#[test]
fn thread_workers_share_tgid_with_harness() {
    let cfg = WorkloadConfig {
        num_workers: 3,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::SpinWait,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).expect("Thread spawn must succeed");
    handle.start();
    std::thread::sleep(Duration::from_millis(100));
    let pids = handle.worker_pids();
    assert_eq!(pids.len(), 3);
    let harness_pid = unsafe { libc::getpid() };
    for &tid in &pids {
        // Consult the kernel's own view of this task's thread-group id.
        let status = std::fs::read_to_string(format!("/proc/{tid}/status"))
            .expect("must read /proc/<tid>/status for thread worker");
        let tgid: i32 = status
            .lines()
            .find(|l| l.starts_with("Tgid:"))
            .expect("status must include Tgid line")
            .trim_start_matches("Tgid:")
            .trim()
            .parse()
            .expect("Tgid must be a parseable integer");
        assert_eq!(
            tgid, harness_pid,
            "Thread worker tid={tid} must share tgid with test harness pid={harness_pid}; \
             found Tgid={tgid}. Thread workers run inside the harness process — a \
             distinct tgid would mean the dispatch silently forked instead."
        );
    }
    let _ = handle.stop_and_collect();
}
#[test]
fn thread_stop_and_collect_returns_within_bounded_deadline() {
    // Cooperative worker: returns as soon as its stop flag is raised, so
    // stop_and_collect has no excuse to block for long.
    fn spin_until_stop(stop: &AtomicBool) -> WorkerReport {
        let tid: libc::pid_t = unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t };
        while !stop_requested(stop) {
            std::thread::sleep(Duration::from_millis(10));
        }
        WorkerReport {
            tid,
            completed: true,
            ..WorkerReport::default()
        }
    }
    let cfg = WorkloadConfig {
        num_workers: 4,
        clone_mode: CloneMode::Thread,
        work_type: WorkType::custom("spin_until_stop", spin_until_stop),
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).expect("Thread spawn must succeed");
    handle.start();
    std::thread::sleep(Duration::from_millis(50));
    // Time only the shutdown path itself.
    let shutdown_clock = std::time::Instant::now();
    let reports = handle.stop_and_collect();
    let elapsed = shutdown_clock.elapsed();
    assert!(
        elapsed < Duration::from_secs(5),
        "stop_and_collect must return inside 5s for cooperating workers; took {elapsed:?}"
    );
    assert_eq!(reports.len(), 4);
    for r in &reports {
        assert!(
            r.completed,
            "every worker must observe stop and return: {r:?}"
        );
    }
}