#![cfg(test)]
#![allow(unused_imports)]
use super::super::affinity::*;
use super::super::config::*;
use super::super::types::*;
use super::super::worker::*;
use super::testing::*;
use super::*;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
#[test]
fn worker_group_size_wake_chain() {
    // The WakeChain worker-group size must mirror the configured ring
    // depth regardless of wake mechanism or per-hop work duration.
    let cases = [
        (8usize, WakeMechanism::Futex, 100u64),
        (3, WakeMechanism::Pipe, 50),
    ];
    for (depth, mechanism, hop_micros) in cases {
        let work = WorkType::wake_chain(depth, mechanism, Duration::from_micros(hop_micros));
        assert_eq!(work.worker_group_size(), Some(depth));
    }
}
#[test]
fn wake_chain_pipe_bootstrap_once_invariant() {
    // One chain spanning all workers: one worker per ring stage.
    const DEPTH: usize = 4;
    // 50ms of work per hop over a 1000ms window => correct steady-state
    // total of ~window/work_per_hop = 20 iterations summed across stages.
    const WORK_PER_HOP_MS: u64 = 50;
    const TEST_WINDOW_MS: u64 = 1000;
    // Upper bound: the repeat-bootstrap bug multiplies throughput by
    // DEPTH (~80 total), so 40 cleanly separates correct (~20) from
    // buggy even with scheduler jitter.
    const TOTAL_ITER_THRESHOLD: u64 = 40;
    // Lower bound: one full lap of the ring gives each of the DEPTH
    // stages exactly one iteration.  Derived from DEPTH (previously a
    // magic `4`) so the bound tracks any future depth change.
    const TOTAL_ITER_FLOOR: u64 = DEPTH as u64;
    // Presumably returns true when the required isolated CPUs are not
    // available, in which case the timing-sensitive test skips itself
    // -- TODO confirm against testing::require_isolated_cpus.
    if require_isolated_cpus(DEPTH, "wake_chain_pipe_bootstrap_once_invariant") {
        return;
    }
    let config = WorkloadConfig {
        num_workers: DEPTH,
        work_type: WorkType::WakeChain {
            depth: DEPTH,
            wake: WakeMechanism::Pipe,
            work_per_hop: Duration::from_millis(WORK_PER_HOP_MS),
        },
        ..Default::default()
    };
    let mut h = WorkloadHandle::spawn(&config).expect("WakeChain wake=Pipe spawn must succeed");
    h.start();
    // Run for the full measurement window, then stop and collect
    // per-worker reports.
    std::thread::sleep(Duration::from_millis(TEST_WINDOW_MS));
    let reports = h.stop_and_collect();
    assert_eq!(
        reports.len(),
        DEPTH,
        "WakeChain wake=Pipe collects one report per worker"
    );
    let total_iters: u64 = reports.iter().map(|r| r.iterations).sum();
    assert!(
        total_iters <= TOTAL_ITER_THRESHOLD,
        "WakeChain wake=Pipe total iterations across {DEPTH} stages \
         exceeded {TOTAL_ITER_THRESHOLD} over {TEST_WINDOW_MS}ms with \
         work_per_hop={WORK_PER_HOP_MS}ms (got {total_iters}). The \
         bootstrap-once invariant requires only stage 0 to fire the \
         initial pipe write; if every stage fires a bootstrap byte at \
         iteration 0, the ring carries {DEPTH} simultaneous bytes and \
         per-stage throughput rises by factor {DEPTH}. Expected \
         correct total ~{}; expected buggy total ~{}. Per-worker \
         reports: {:?}",
        TEST_WINDOW_MS / WORK_PER_HOP_MS,
        (TEST_WINDOW_MS / WORK_PER_HOP_MS) * (DEPTH as u64),
        reports,
    );
    assert!(
        total_iters >= TOTAL_ITER_FLOOR,
        "WakeChain wake=Pipe made fewer than one ring round-trip \
         over {TEST_WINDOW_MS}ms (got {total_iters}, expected ≥ \
         {TOTAL_ITER_FLOOR}) — the bootstrap byte never completed a \
         full lap. Per-worker reports: {:?}",
        reports,
    );
}
#[test]
fn wake_chain_pipe_no_repeat_bootstrap_invariant() {
    // Smallest non-trivial ring: one chain of two stages.
    const DEPTH: usize = 2;
    const NUM_WORKERS: usize = 2;
    // 50ms of work per hop over a 1000ms window => correct steady-state
    // total of ~20 iterations summed across both stages.
    const WORK_PER_HOP_MS: u64 = 50;
    const TEST_WINDOW_MS: u64 = 1000;
    // Acceptance band around the ~20 steady-state total; the
    // repeat-bootstrap regression described in the assert message below
    // pushes the total past the upper bound.
    const TOTAL_ITER_LOWER: u64 = 4;
    const TOTAL_ITER_UPPER: u64 = 25;
    // Presumably returns true when the isolated CPUs this timing test
    // needs are unavailable, so the test skips itself -- TODO confirm
    // against testing::require_isolated_cpus.
    if require_isolated_cpus(DEPTH, "wake_chain_pipe_no_repeat_bootstrap_invariant") {
        return;
    }
    let config = WorkloadConfig {
        num_workers: NUM_WORKERS,
        work_type: WorkType::WakeChain {
            depth: DEPTH,
            wake: WakeMechanism::Pipe,
            work_per_hop: Duration::from_millis(WORK_PER_HOP_MS),
        },
        ..Default::default()
    };
    let mut h =
        WorkloadHandle::spawn(&config).expect("WakeChain wake=Pipe depth=2 spawn must succeed");
    h.start();
    // Run for the full measurement window, then stop and collect
    // per-worker reports.
    std::thread::sleep(Duration::from_millis(TEST_WINDOW_MS));
    let reports = h.stop_and_collect();
    assert_eq!(
        reports.len(),
        NUM_WORKERS,
        "WakeChain wake=Pipe depth=2 collects one report per worker"
    );
    // Aggregate throughput check: total wakes must land in the band.
    let total_iters: u64 = reports.iter().map(|r| r.iterations).sum();
    assert!(
        (TOTAL_ITER_LOWER..=TOTAL_ITER_UPPER).contains(&total_iters),
        "WakeChain wake=Pipe depth=2 total iterations over \
         {TEST_WINDOW_MS}ms with work_per_hop={WORK_PER_HOP_MS}ms must \
         land in [{TOTAL_ITER_LOWER}, {TOTAL_ITER_UPPER}] (got \
         {total_iters}). Correct steady-state is ~20 (one wake per \
         work_per_hop, summed across stages); a regression that drops \
         the iterations==0 guard has stage 0 fire its bootstrap byte \
         every iteration, queueing extra bytes in pipe[0] and \
         unblocking stage 1's poll instantly — stage 0's poll on \
         pipe[1] then resolves on stage 1's prior-iteration write, \
         pushing stage 0 above the work_per_hop-bounded ceiling. \
         Per-worker reports: {:?}",
        reports,
    );
    // Per-stage balance check.  NOTE(review): assumes reports[i] is the
    // report for stage i -- verify stop_and_collect preserves worker
    // order before relying on this elsewhere.
    let stage0_iters = reports[0].iterations;
    let stage1_iters = reports[1].iterations;
    let max_iters = stage0_iters.max(stage1_iters);
    let min_iters = stage0_iters.min(stage1_iters);
    assert!(
        min_iters > 0 && max_iters <= min_iters.saturating_mul(2),
        "WakeChain wake=Pipe depth=2 per-stage iteration counts must \
         stay within 2× of each other (stage0={stage0_iters}, \
         stage1={stage1_iters}). A regression that drops the \
         iterations==0 guard has stage 0 fire its bootstrap byte every \
         iteration, queueing bytes that bypass stage 1's wait and \
         letting stage 0's poll resolve instantly on stage 1's prior \
         write — symptom: stage 0 runs ahead of stage 1. Per-worker \
         reports: {:?}",
        reports,
    );
}
#[test]
fn wake_chain_pipe_multi_chain_bootstrap_independence() {
    // Two independent rings: 8 workers split into NUM_CHAINS = 2 chains
    // of DEPTH = 4 stages each.
    const DEPTH: usize = 4;
    const NUM_WORKERS: usize = 8;
    const NUM_CHAINS: usize = NUM_WORKERS / DEPTH;
    // 50ms of work per hop over a 1000ms window => correct steady-state
    // total of ~20 iterations per chain.
    const WORK_PER_HOP_MS: u64 = 50;
    const TEST_WINDOW_MS: u64 = 1000;
    // Per-chain acceptance band around the ~20 steady-state total.
    const PER_CHAIN_LOWER: u64 = 4;
    const PER_CHAIN_UPPER: u64 = 30;
    // Presumably returns true when 8 isolated CPUs are unavailable, so
    // the timing-sensitive test skips itself -- TODO confirm against
    // testing::require_isolated_cpus.
    if require_isolated_cpus(
        NUM_WORKERS,
        "wake_chain_pipe_multi_chain_bootstrap_independence",
    ) {
        return;
    }
    let config = WorkloadConfig {
        num_workers: NUM_WORKERS,
        work_type: WorkType::WakeChain {
            depth: DEPTH,
            wake: WakeMechanism::Pipe,
            work_per_hop: Duration::from_millis(WORK_PER_HOP_MS),
        },
        ..Default::default()
    };
    let mut h =
        WorkloadHandle::spawn(&config).expect("WakeChain wake=Pipe multi-chain spawn must succeed");
    h.start();
    // Run for the full measurement window, then stop and collect
    // per-worker reports.
    std::thread::sleep(Duration::from_millis(TEST_WINDOW_MS));
    let reports = h.stop_and_collect();
    assert_eq!(
        reports.len(),
        NUM_WORKERS,
        "WakeChain wake=Pipe multi-chain collects one report per worker"
    );
    // Bucket per-worker iteration counts by chain.  NOTE(review):
    // assumes worker i belongs to chain i / DEPTH (contiguous chain
    // assignment) and that reports preserve worker order -- verify
    // against the spawn/collect implementation.
    let mut per_chain_totals: [u64; NUM_CHAINS] = [0; NUM_CHAINS];
    for (i, r) in reports.iter().enumerate() {
        let chain_idx = i / DEPTH;
        per_chain_totals[chain_idx] += r.iterations;
    }
    // Each chain must individually land in the steady-state band.
    for (chain_idx, &chain_total) in per_chain_totals.iter().enumerate() {
        assert!(
            (PER_CHAIN_LOWER..=PER_CHAIN_UPPER).contains(&chain_total),
            "WakeChain wake=Pipe multi-chain: chain {chain_idx} total \
             iterations over {TEST_WINDOW_MS}ms with \
             work_per_hop={WORK_PER_HOP_MS}ms must land in \
             [{PER_CHAIN_LOWER}, {PER_CHAIN_UPPER}] (got \
             {chain_total}). Correct steady-state is ~20 per chain \
             (one wake per work_per_hop across {DEPTH} stages); a \
             chain-mixing regression has one chain stall while the \
             other absorbs both bootstraps, or has the wrong stage \
             fire its bootstrap. Per-chain totals: {:?}. Per-worker \
             reports: {:?}",
            per_chain_totals,
            reports,
        );
    }
    // Cross-chain fairness: the chains are independent, so their totals
    // must stay within 2× of each other.
    let max_chain = *per_chain_totals.iter().max().unwrap();
    let min_chain = *per_chain_totals.iter().min().unwrap();
    assert!(
        min_chain > 0 && max_chain <= min_chain.saturating_mul(2),
        "WakeChain wake=Pipe multi-chain: cross-chain iteration \
         ratio must stay within 2× (max={max_chain}, min={min_chain}). \
         Both chains receive the same scheduler attention modulo \
         jitter; a > 2× spread indicates one chain is starving the \
         other, which under independent fd ownership cannot happen \
         unless a regression crosses chain indices. Per-chain totals: \
         {:?}. Per-worker reports: {:?}",
        per_chain_totals,
        reports,
    );
}
#[test]
fn pathology_wake_chain_iterates() {
    // Smoke test: a futex-based two-stage ring must make forward
    // progress — the summed iteration count is non-zero after a short run.
    let config = WorkloadConfig {
        num_workers: 2,
        work_type: WorkType::WakeChain {
            depth: 2,
            wake: WakeMechanism::Futex,
            work_per_hop: Duration::from_micros(50),
        },
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).expect("WakeChain must spawn");
    handle.start();
    std::thread::sleep(Duration::from_millis(200));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 2);
    let total = reports.iter().fold(0u64, |acc, r| acc + r.iterations);
    assert!(total > 0, "WakeChain ring must iterate: {reports:?}");
}
#[test]
fn pathology_wake_chain_sync_iterates() {
    // Smoke test for the pipe wake mechanism: every worker in a
    // two-stage ring must record at least one iteration.
    let config = WorkloadConfig {
        num_workers: 2,
        work_type: WorkType::WakeChain {
            depth: 2,
            wake: WakeMechanism::Pipe,
            work_per_hop: Duration::from_micros(50),
        },
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).expect("WakeChain wake=Pipe must spawn");
    handle.start();
    std::thread::sleep(Duration::from_millis(200));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 2);
    reports.iter().for_each(|r| {
        assert!(
            r.iterations > 0,
            "WakeChain wake=Pipe worker must iterate: {r:?}"
        );
    });
}
#[test]
fn pathology_wake_chain_sync_deeper_chain() {
    // A four-deep pipe ring (a single chain spanning all four workers)
    // must keep every stage iterating over a short run.
    let hop = Duration::from_micros(20);
    let config = WorkloadConfig {
        num_workers: 4,
        work_type: WorkType::WakeChain {
            depth: 4,
            wake: WakeMechanism::Pipe,
            work_per_hop: hop,
        },
        ..Default::default()
    };
    let mut handle =
        WorkloadHandle::spawn(&config).expect("WakeChain wake=Pipe depth=4 must spawn");
    handle.start();
    std::thread::sleep(Duration::from_millis(300));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 4);
    reports.iter().for_each(|r| {
        assert!(
            r.iterations > 0,
            "WakeChain wake=Pipe depth=4 worker must iterate: {r:?}"
        );
    });
}
#[test]
fn pathology_wake_chain_sync_multi_chain() {
    // Four workers at depth 2 form two independent pipe rings; every
    // stage of both rings must make progress.
    const WORKERS: usize = 4;
    let config = WorkloadConfig {
        num_workers: WORKERS,
        work_type: WorkType::WakeChain {
            depth: 2,
            wake: WakeMechanism::Pipe,
            work_per_hop: Duration::from_micros(50),
        },
        ..Default::default()
    };
    let mut handle =
        WorkloadHandle::spawn(&config).expect("WakeChain wake=Pipe multi-chain must spawn");
    handle.start();
    std::thread::sleep(Duration::from_millis(200));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), WORKERS);
    for r in reports.iter() {
        assert!(
            r.iterations > 0,
            "WakeChain wake=Pipe multi-chain worker must iterate: {r:?}"
        );
    }
}
#[test]
fn wake_chain_spawn_rejects_non_multiple_num_workers() {
    // Every (num_workers, depth) pair below has num_workers that is NOT
    // a positive multiple of depth; spawn must reject each with a typed
    // NonDivisibleWorkerCount error whose fields echo the inputs.
    for &(num_workers, depth) in &[
        (1usize, 2usize),
        (3, 2),
        (5, 2),
        (7, 2),
        (1, 4),
        (2, 4),
        (3, 4),
        (5, 4),
        (6, 4),
        (7, 4),
    ] {
        let cfg = WorkloadConfig {
            num_workers,
            work_type: WorkType::WakeChain {
                depth,
                wake: WakeMechanism::Pipe,
                work_per_hop: Duration::from_micros(50),
            },
            ..Default::default()
        };
        // Spawn must fail; unwrap_or_else (rather than expect) lets the
        // panic message name the specific offending pair.
        let err = WorkloadHandle::spawn(&cfg).err().unwrap_or_else(|| {
            panic!(
                "WakeChain spawn must reject num_workers={num_workers} \
                 with depth={depth} (not a positive multiple)",
            )
        });
        // Alternate (`{:#}`) rendering -- presumably an anyhow-style
        // error whose alternate form includes the cause chain; TODO
        // confirm the concrete error type.
        let rendered = format!("{err:#}");
        // The human-readable diagnostic must name both the constraint
        // and the offending depth value.
        assert!(
            rendered.contains("divisible by"),
            "WakeChain rejection diagnostic must mention `divisible by`; \
             num_workers={num_workers}, depth={depth}, got: {rendered}",
        );
        assert!(
            rendered.contains(&depth.to_string()),
            "WakeChain rejection diagnostic must name the offending depth \
             ({depth}); num_workers={num_workers}, got: {rendered}",
        );
        // Beyond the rendered text, callers rely on the typed error:
        // downcast and verify every field round-trips the inputs.
        let typed = err
            .downcast_ref::<WorkTypeValidationError>()
            .unwrap_or_else(|| {
                panic!(
                    "error must downcast to WorkTypeValidationError; \
                     num_workers={num_workers}, depth={depth}, err: {rendered}"
                )
            });
        match typed {
            WorkTypeValidationError::NonDivisibleWorkerCount {
                name,
                group_idx,
                group_size,
                num_workers: nw,
            } => {
                assert_eq!(
                    name, "WakeChain",
                    "name field must be WakeChain; got: {name}",
                );
                // WakeChain has a single worker group, so its index is 0.
                assert_eq!(
                    *group_idx, 0,
                    "primary group has group_idx == 0; got: {group_idx}",
                );
                assert_eq!(
                    *group_size, depth,
                    "group_size must equal depth; got: {group_size}",
                );
                assert_eq!(
                    *nw, num_workers,
                    "num_workers field must echo input; got: {nw}",
                );
            }
            other => panic!(
                "expected NonDivisibleWorkerCount; got: {other:?}; \
                 num_workers={num_workers}, depth={depth}"
            ),
        }
    }
}
#[test]
fn wake_chain_spawn_rejects_depth_one_pipe() {
    // A depth-1 pipe ring is degenerate (a stage would have to wake
    // itself); spawn must refuse it with a typed, descriptive error.
    let config = WorkloadConfig {
        num_workers: 1,
        work_type: WorkType::WakeChain {
            depth: 1,
            wake: WakeMechanism::Pipe,
            work_per_hop: Duration::from_micros(50),
        },
        ..Default::default()
    };
    let spawn_result = WorkloadHandle::spawn(&config);
    let err = spawn_result
        .err()
        .expect("WakeChain wake=Pipe with depth=1 must be rejected at spawn");
    // The human-readable rendering must name the violated constraint.
    let rendered = format!("{err:#}");
    assert!(
        rendered.contains("depth must be >= 2"),
        "diagnostic must mention `depth must be >= 2`; got: {rendered}",
    );
    // Callers also rely on the typed error carrying the offending depth
    // and group index.
    let typed = err
        .downcast_ref::<WorkTypeValidationError>()
        .expect("error must downcast to WorkTypeValidationError");
    match typed {
        WorkTypeValidationError::InsufficientWakeChainDepth {
            depth: 1,
            group_idx: 0,
        } => {}
        _ => panic!(
            "expected InsufficientWakeChainDepth {{ depth: 1, group_idx: 0 }}, got: {typed:?}"
        ),
    }
}
#[test]
fn wake_chain_spawn_accepts_positive_multiples_of_depth() {
    // Every (num_workers, depth) pair where num_workers is a positive
    // multiple of depth must spawn, run briefly, and shut down cleanly.
    let cases: [(usize, usize); 6] = [(2, 2), (4, 2), (6, 2), (4, 4), (8, 4), (12, 4)];
    for (num_workers, depth) in cases {
        let config = WorkloadConfig {
            num_workers,
            work_type: WorkType::WakeChain {
                depth,
                wake: WakeMechanism::Pipe,
                work_per_hop: Duration::from_micros(20),
            },
            ..Default::default()
        };
        let mut handle = WorkloadHandle::spawn(&config).unwrap_or_else(|e| {
            panic!(
                "WakeChain spawn must accept num_workers={num_workers} \
                 with depth={depth} (positive multiple); err: {e:#}"
            )
        });
        handle.start();
        std::thread::sleep(Duration::from_millis(20));
        let _ = handle.stop_and_collect();
    }
}
#[test]
fn pathology_wake_chain_sync_stop_responsive() {
    // Shutdown responsiveness: stopping a pipe-wake ring must not hang.
    // stop_and_collect has to return well under 500ms and every worker
    // must report a clean completion.
    let config = WorkloadConfig {
        num_workers: 2,
        work_type: WorkType::WakeChain {
            depth: 2,
            wake: WakeMechanism::Pipe,
            work_per_hop: Duration::from_micros(50),
        },
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).expect("WakeChain wake=Pipe must spawn");
    handle.start();
    std::thread::sleep(Duration::from_millis(200));
    let stopwatch = Instant::now();
    let reports = handle.stop_and_collect();
    let stop_elapsed = stopwatch.elapsed();
    assert!(
        stop_elapsed < Duration::from_millis(500),
        "stop_and_collect took {stop_elapsed:?}, expected < 500ms"
    );
    assert_eq!(reports.len(), 2);
    reports.iter().for_each(|r| {
        assert!(
            r.completed,
            "WakeChain wake=Pipe worker must complete on stop: {r:?}"
        );
    });
}