use anyhow::Result;
use ktstr::assert::{AssertDetail, AssertResult, DetailKind};
use ktstr::ktstr_test;
use ktstr::scenario::Ctx;
use ktstr::workload::{
AffinityIntent, CloneMode, SchedPolicy, WorkType, WorkloadConfig, WorkloadHandle,
};
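
/// Thread-mode SpinWait smoke test: spawn two thread workers, verify that
/// their tids published correctly, run them for the scenario duration, and
/// check that every worker completed and made forward progress.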
#[ktstr_test(llcs = 1, cores = 2, threads = 1, memory_mb = 1024)]
fn thread_integration_spin_wait(ctx: &Ctx) -> Result<AssertResult> {
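    // Two thread-mode (shared-mm) workers inheriting the parent's affinity.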
let config = WorkloadConfig {
num_workers: 2,
clone_mode: CloneMode::Thread,
work_type: WorkType::SpinWait,
affinity: AffinityIntent::Inherit,
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let mut handle = WorkloadHandle::spawn(&config)?;
let pids = handle.worker_pids();
if pids.len() != 2 {
return Ok(failing_result(format!(
"Thread SpinWait expected 2 workers, got {}; spawn broken",
pids.len(),
)));
}
for tid in &pids {
if *tid <= 0 {
return Ok(failing_result(format!(
"Thread SpinWait worker reported non-positive tid={tid}; \
the gettid() publish path is broken under VM conditions",
)));
}
}
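    // All tids look sane; start the workers and let them run for the
    // scenario duration.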
handle.start();
std::thread::sleep(ctx.duration);
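    // Stop the workers and gather one report per worker.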
let reports = handle.stop_and_collect();
let mut result = AssertResult::pass();
if reports.len() != 2 {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread SpinWait expected 2 reports, got {}; collection \
broken",
reports.len(),
),
));
return Ok(result);
}
for r in &reports {
if !r.completed {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread SpinWait worker tid={} did not complete; \
stop signaling broken under VM. exit_info={:?}",
r.tid, r.exit_info,
),
));
return Ok(result);
}
if r.work_units == 0 {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread SpinWait worker tid={} did no work; the \
spin loop never advanced under guest-kernel \
scheduling",
r.tid,
),
));
return Ok(result);
}
}
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread SpinWait completed cleanly across {} workers; \
total work_units={}",
reports.len(),
reports.iter().map(|r| r.work_units).sum::<u64>(),
),
));
Ok(result)
}
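
/// Thread-mode FutexPingPong test: two workers alternate futex wait/wake
/// with a short spin between wakes. Verifies that each worker captured
/// resume-latency samples and made forward progress.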
#[ktstr_test(llcs = 1, cores = 2, threads = 1, memory_mb = 1024)]
fn thread_integration_futex_ping_pong(ctx: &Ctx) -> Result<AssertResult> {
let config = WorkloadConfig {
num_workers: 2,
clone_mode: CloneMode::Thread,
work_type: WorkType::FutexPingPong { spin_iters: 256 },
affinity: AffinityIntent::Inherit,
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let mut handle = WorkloadHandle::spawn(&config)?;
handle.start();
std::thread::sleep(ctx.duration);
let reports = handle.stop_and_collect();
let mut result = AssertResult::pass();
if reports.len() != 2 {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread FutexPingPong expected 2 reports, got {}; \
spawn broken",
reports.len(),
),
));
return Ok(result);
}
for r in &reports {
if r.resume_latencies_ns.is_empty() {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread FutexPingPong worker tid={} captured zero \
wake-latency samples — the partner's futex wake \
never arrived. Under shared-mm semantics this \
means the futex word allocation is broken or the \
pair (0, 1) routing is mis-wired.",
r.tid,
),
));
return Ok(result);
}
if r.work_units == 0 {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread FutexPingPong worker tid={} did no work; \
the spin loop between wakes never advanced.",
r.tid,
),
));
return Ok(result);
}
}
let total_samples: usize = reports.iter().map(|r| r.resume_latencies_ns.len()).sum();
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread FutexPingPong populated resume_latencies_ns: \
total_samples={total_samples} across {} workers",
reports.len(),
),
));
Ok(result)
}
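
/// Thread-mode PageFaultChurn test: each worker repeatedly drops pages in
/// its own mmap region with madvise(DONTNEED) and faults them back in.
/// Verifies that the churn loop advanced and that every worker did work.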
#[ktstr_test(llcs = 1, cores = 2, threads = 1, memory_mb = 1024)]
fn thread_integration_page_fault_churn(ctx: &Ctx) -> Result<AssertResult> {
let config = WorkloadConfig {
num_workers: 2,
clone_mode: CloneMode::Thread,
work_type: WorkType::page_fault_churn(4096, 64, 8),
affinity: AffinityIntent::Inherit,
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let mut handle = WorkloadHandle::spawn(&config)?;
handle.start();
std::thread::sleep(ctx.duration);
let reports = handle.stop_and_collect();
let mut result = AssertResult::pass();
if reports.len() != 2 {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread PageFaultChurn expected 2 reports, got {}; \
spawn broken",
reports.len(),
),
));
return Ok(result);
}
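    // Zero total iterations means the churn loop never ran on any worker.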
let total_iters: u64 = reports.iter().map(|r| r.iterations).sum();
if total_iters == 0 {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
"Thread PageFaultChurn reported zero total iterations — \
the madvise(DONTNEED) → fault loop never advanced. \
Under shared-mm thread workers, each must still hold \
its own per-worker mmap region; check the per-iteration \
dispatch arm."
.to_string(),
));
return Ok(result);
}
for r in &reports {
if r.work_units == 0 {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread PageFaultChurn worker tid={} did no \
work; the first-touch fault never fired.",
r.tid,
),
));
return Ok(result);
}
}
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread PageFaultChurn populated iterations: \
total_iterations={total_iters} across {} workers",
reports.len(),
),
));
Ok(result)
}
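
/// Thread-mode MutexContention test: four workers contend on a shared
/// mutex, holding it briefly and doing work between acquisitions. Verifies
/// that acquisitions happened and that no contender sat idle.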
#[ktstr_test(llcs = 1, cores = 4, threads = 1, memory_mb = 1024)]
fn thread_integration_mutex_contention(ctx: &Ctx) -> Result<AssertResult> {
let config = WorkloadConfig {
num_workers: 4,
clone_mode: CloneMode::Thread,
work_type: WorkType::MutexContention {
contenders: 4,
hold_iters: 256,
work_iters: 1024,
},
affinity: AffinityIntent::Inherit,
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let mut handle = WorkloadHandle::spawn(&config)?;
handle.start();
std::thread::sleep(ctx.duration);
let reports = handle.stop_and_collect();
let mut result = AssertResult::pass();
if reports.len() != 4 {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread MutexContention expected 4 reports, got {}; \
spawn broken",
reports.len(),
),
));
return Ok(result);
}
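    // Zero total iterations means no worker ever acquired the shared mutex.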
let total_iters: u64 = reports.iter().map(|r| r.iterations).sum();
if total_iters == 0 {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
"Thread MutexContention reported zero total iterations — \
every worker failed to acquire the shared mutex. The \
futex_fastpath or its contention fallback is broken \
under VM."
.to_string(),
));
return Ok(result);
}
for r in &reports {
if r.work_units == 0 {
result.passed = false;
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread MutexContention worker tid={} did no \
work; spawn produced an idle contender.",
r.tid,
),
));
return Ok(result);
}
}
result.details.push(AssertDetail::new(
DetailKind::Other,
format!(
"Thread MutexContention populated iterations: \
total_iterations={total_iters} across {} workers",
reports.len(),
),
));
Ok(result)
}
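
/// Builds a failed `AssertResult` carrying a single diagnostic detail.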
fn failing_result(msg: String) -> AssertResult {
let mut r = AssertResult::pass();
r.passed = false;
r.details.push(AssertDetail::new(DetailKind::Other, msg));
r
}