#![cfg(test)]
#![allow(unused_imports)]
use super::super::affinity::*;
use super::super::config::*;
use super::super::types::*;
use super::super::worker::*;
use super::testing::*;
use super::*;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
#[test]
fn spawn_futex_fan_out_produces_work() {
    // One fan-out group (4 receivers + 1 messenger = 5 workers) observed for
    // 500 ms: every worker must have accumulated at least one work unit.
    let work = WorkType::FutexFanOut {
        fan_out: 4,
        spin_iters: 1024,
    };
    let reports = spawn_and_collect_after(work, 5, 500);
    assert_eq!(reports.len(), 5);
    for report in reports.iter() {
        assert!(
            report.work_units > 0,
            "FutexFanOut worker {} did no work",
            report.tid
        );
    }
}
#[test]
fn spawn_futex_fan_out_receivers_record_wake_latency() {
    // After a 500 ms run, at least one worker must have sampled a
    // resume/wake latency.
    let cfg = WorkloadConfig {
        num_workers: 5,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::FutexFanOut {
            fan_out: 4,
            spin_iters: 512,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).unwrap();
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(500));
    let reports = handle.stop_and_collect();
    let any_sampled = reports
        .iter()
        .any(|report| !report.resume_latencies_ns.is_empty());
    assert!(any_sampled, "receivers should record wake latencies");
}
#[test]
fn spawn_futex_fan_out_bad_worker_count_fails() {
    // fan_out = 4 implies a group size of 5 (4 receivers + 1 messenger), so
    // 3 workers cannot form whole groups and spawn must be rejected with a
    // divisibility error.
    let cfg = WorkloadConfig {
        num_workers: 3,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::FutexFanOut {
            fan_out: 4,
            spin_iters: 1024,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let result = WorkloadHandle::spawn(&cfg);
    assert!(result.is_err());
    let msg = format!("{:#}", result.err().unwrap());
    assert!(
        msg.contains("divisible by 5"),
        "expected divisibility error: {msg}"
    );
}
#[test]
fn spawn_futex_fan_out_two_groups() {
    // 10 workers with a group size of 5 form two independent fan-out
    // groups; all of them must make progress during the 500 ms window.
    let cfg = WorkloadConfig {
        num_workers: 10,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::FutexFanOut {
            fan_out: 4,
            spin_iters: 512,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).unwrap();
    assert_eq!(handle.worker_pids().len(), 10);
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(500));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 10);
    for report in reports.iter() {
        assert!(report.work_units > 0, "worker {} did no work", report.tid);
    }
}
#[test]
fn spawn_futex_fan_out_single_receiver() {
    // Degenerate group: fan_out = 1 gives a single receiver plus the
    // messenger (2 workers total); both must still do work.
    let cfg = WorkloadConfig {
        num_workers: 2,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::FutexFanOut {
            fan_out: 1,
            spin_iters: 1024,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).unwrap();
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(300));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 2);
    for report in reports.iter() {
        assert!(report.work_units > 0, "worker {} did no work", report.tid);
    }
}
#[test]
fn work_type_futex_fan_out_name() {
    // name() must report the canonical variant identifier regardless of the
    // variant's parameter values.
    let work = WorkType::FutexFanOut {
        fan_out: 4,
        spin_iters: 1024,
    };
    assert_eq!(work.name(), "FutexFanOut");
}
#[test]
fn work_type_futex_fan_out_from_name() {
    // from_name must produce the FutexFanOut variant carrying its default
    // parameters (fan_out = 4, spin_iters = 1024).
    match WorkType::from_name("FutexFanOut").unwrap() {
        WorkType::FutexFanOut {
            fan_out,
            spin_iters,
        } => {
            assert_eq!(fan_out, 4);
            assert_eq!(spin_iters, 1024);
        }
        _ => panic!("expected FutexFanOut"),
    }
}
#[test]
fn work_type_futex_fan_out_group_size() {
    // A fan_out of 4 means a group of 4 receivers plus 1 messenger, so the
    // reported group size must be 5.
    let work = WorkType::FutexFanOut {
        fan_out: 4,
        spin_iters: 1024,
    };
    assert_eq!(work.worker_group_size(), Some(5));
}
#[test]
fn resolve_work_type_fan_out_group_size() {
    // fan_out = 3 implies a group size of 4. With 8 workers (two whole
    // groups) the override is accepted; with 6 workers it is not, and
    // resolution falls back to the base work type.
    let base = WorkType::SpinWait;
    let over = WorkType::futex_fan_out(3, 100);

    let accepted = resolve_work_type(&base, Some(&over), true, 8);
    assert!(matches!(accepted, WorkType::FutexFanOut { .. }));

    let rejected = resolve_work_type(&base, Some(&over), true, 6);
    assert!(matches!(rejected, WorkType::SpinWait));
}
#[test]
fn spawn_fan_out_compute_produces_work() {
    // One FanOutCompute group (4 receivers + 1 messenger) run for 500 ms:
    // every worker must do work, every non-messenger must record at least
    // one wake-latency sample, and no sample may be implausibly large.
    const MAX_PLAUSIBLE_LATENCY_NS: u64 = 10_000_000_000;
    let cfg = WorkloadConfig {
        num_workers: 5,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::FanOutCompute {
            fan_out: 4,
            cache_footprint_kb: 256,
            operations: 5,
            sleep_usec: 100,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).unwrap();
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(500));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 5);
    for report in reports.iter() {
        assert!(
            report.work_units > 0,
            "FanOutCompute worker {} did no work",
            report.tid
        );
    }
    let receivers_all_sampled = reports
        .iter()
        .filter(|report| !report.is_messenger)
        .all(|report| !report.resume_latencies_ns.is_empty());
    assert!(
        receivers_all_sampled,
        "every FanOutCompute receiver must record at least one \
         wake latency sample; got {:?}",
        reports
            .iter()
            .map(|report| (report.tid, report.is_messenger, report.resume_latencies_ns.len()))
            .collect::<Vec<_>>(),
    );
    for report in reports.iter() {
        for &lat in report.resume_latencies_ns.iter() {
            assert!(
                lat < MAX_PLAUSIBLE_LATENCY_NS,
                "worker {} recorded implausible wake latency {} ns \
                 (expected < {} ns); indicates wake_ns/generation \
                 ordering race. NB: lat==0 is LEGITIMATE under \
                 correct ordering — a Relaxed `wake_atom.load` \
                 paired with an Acquire gen load can see a wake_ns \
                 from a LATER round (gen+1's store becomes visible \
                 ahead of gen+1's wake_ns re-load), making \
                 now_ns < wake_ns and `saturating_sub` = 0. The \
                 reservoir-sampling of real latencies is dominated \
                 by positive values; a stray zero from this race \
                 is not a bug, so no lower bound is asserted.",
                report.tid,
                lat,
                MAX_PLAUSIBLE_LATENCY_NS
            );
        }
    }
}
#[test]
fn spawn_fan_out_compute_bad_worker_count_fails() {
    // FanOutCompute with fan_out = 4 requires worker counts in multiples of
    // 5; spawning 3 workers must fail with a divisibility error.
    let cfg = WorkloadConfig {
        num_workers: 3,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::FanOutCompute {
            fan_out: 4,
            cache_footprint_kb: 256,
            operations: 5,
            sleep_usec: 100,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let result = WorkloadHandle::spawn(&cfg);
    assert!(result.is_err());
    let msg = format!("{:#}", result.err().unwrap());
    assert!(
        msg.contains("divisible by 5"),
        "expected divisibility error: {msg}"
    );
}
#[test]
fn spawn_fan_out_compute_two_groups() {
    // Two FanOutCompute groups (10 workers, group size 5) run for 500 ms:
    // every worker must do work, every non-messenger in either group must
    // record a wake-latency sample, and no sample may be implausibly large.
    const MAX_PLAUSIBLE_LATENCY_NS: u64 = 10_000_000_000;
    let cfg = WorkloadConfig {
        num_workers: 10,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::FanOutCompute {
            fan_out: 4,
            cache_footprint_kb: 256,
            operations: 5,
            sleep_usec: 100,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).unwrap();
    assert_eq!(handle.worker_pids().len(), 10);
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(500));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 10);
    for report in reports.iter() {
        assert!(
            report.work_units > 0,
            "FanOutCompute worker {} did no work",
            report.tid
        );
    }
    let receivers_all_sampled = reports
        .iter()
        .filter(|report| !report.is_messenger)
        .all(|report| !report.resume_latencies_ns.is_empty());
    assert!(
        receivers_all_sampled,
        "every FanOutCompute receiver in both groups must record \
         at least one wake latency sample; got {:?}",
        reports
            .iter()
            .map(|report| (report.tid, report.is_messenger, report.resume_latencies_ns.len()))
            .collect::<Vec<_>>(),
    );
    for report in reports.iter() {
        for &lat in report.resume_latencies_ns.iter() {
            assert!(
                lat < MAX_PLAUSIBLE_LATENCY_NS,
                "worker {} recorded implausible wake latency {} ns \
                 (expected < {} ns); indicates wake_ns/generation \
                 ordering race. NB: lat==0 is LEGITIMATE under \
                 correct ordering — a Relaxed `wake_atom.load` \
                 paired with an Acquire gen load can see a wake_ns \
                 from a LATER round (gen+1's store becomes visible \
                 ahead of gen+1's wake_ns re-load), making \
                 now_ns < wake_ns and `saturating_sub` = 0. The \
                 reservoir-sampling of real latencies is dominated \
                 by positive values; a stray zero from this race \
                 is not a bug, so no lower bound is asserted.",
                report.tid,
                lat,
                MAX_PLAUSIBLE_LATENCY_NS
            );
        }
    }
}