use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
#[path = "../src/runtime.rs"]
mod runtime;
#[path = "../src/metrics.rs"]
mod metrics;
/// Acceptance gate: a periodic probe on the control runtime must keep being
/// scheduled promptly while the app runtime is kept busy with CPU-bound tasks.
///
/// The test:
/// 1. builds a split runtime with 2 control threads and 2 app threads;
/// 2. spawns a probe on the control handle that sleeps 100 ms in a loop and
///    records how much longer than 100 ms each sleep actually took;
/// 3. spawns 20 busy-loop tasks on the app handle;
/// 4. lets everything run for 5 s, stops it, and checks that the probe took at
///    least 30 samples and that the recorded p99 lag is within the limit.
///
/// The limit is `0.010` when the process can obtain `SCHED_FIFO` and `0.100`
/// otherwise. The messages multiply these values by 1000 and label them `ms`,
/// so they are treated as seconds here.
#[test]
fn raft_task_scheduling_latency_p99_stays_bounded_under_app_saturation() {
    let cfg = runtime::RuntimeConfig {
        control_threads: 2,
        app_threads: 2,
        control_priority: runtime::ControlPriority::default(),
    };
    let split = runtime::SplitRuntime::new(cfg).expect("build split runtime");
    let ctrl = split.control_handle();
    let app = split.app_handle();

    // One flag stops both the probe and every load task.
    let stop = Arc::new(AtomicBool::new(false));

    // Probe: measures how late each 100 ms sleep wakes up on the control runtime.
    let stop_clone = Arc::clone(&stop);
    let probe = ctrl.spawn(async move {
        let interval = Duration::from_millis(100);
        let mut samples_recorded = 0u32;
        while !stop_clone.load(Ordering::Relaxed) {
            let start = std::time::Instant::now();
            tokio::time::sleep(interval).await;
            let elapsed = start.elapsed();
            // Anything beyond the requested interval is counted as scheduling lag;
            // `saturating_sub` clamps an early wake-up to zero.
            let lag = elapsed.saturating_sub(interval);
            metrics::record_raft_task_poll_latency(lag);
            samples_recorded += 1;
        }
        samples_recorded
    });

    // Load: many more CPU-bound tasks than app threads, each doing a long
    // stretch of arithmetic between cooperative yields.
    let n_load = 20;
    let load_handles: Vec<_> = (0..n_load)
        .map(|_| {
            let stop_clone = Arc::clone(&stop);
            app.spawn(async move {
                let mut acc: u64 = 0;
                while !stop_clone.load(Ordering::Relaxed) {
                    for i in 0..1_000_000u64 {
                        // Without `black_box` an optimized build can collapse this
                        // constant-bound sum of squares into a single constant, leaving
                        // the task with no real work between yields.
                        let i = std::hint::black_box(i);
                        acc = acc.wrapping_add(i.wrapping_mul(i));
                    }
                    tokio::task::yield_now().await;
                }
                acc
            })
        })
        .collect();

    // Measurement window. This runs on the test thread, not inside a runtime,
    // so a blocking sleep is fine here.
    std::thread::sleep(Duration::from_secs(5));
    stop.store(true, Ordering::Relaxed);

    let samples = ctrl.block_on(probe).expect("probe task did not panic");
    for h in load_handles {
        // The load tasks' results are irrelevant; we only wait for them to finish.
        let _ = ctrl.block_on(h);
    }

    assert!(
        samples >= 30,
        "scheduling probe ran too few samples: {samples}; the probe was starved of CPU \
         (which would itself indicate runtime split is broken)"
    );

    let p99 = metrics::raft_task_poll_latency_p99();
    let has_priority_privilege = priority_privilege_available();
    // Tighter bound when the process can obtain a real-time scheduling policy.
    let limit = if has_priority_privilege { 0.010 } else { 0.100 };
    assert!(
        p99 <= limit,
        "ACCEPTANCE GATE FAILED: raft_task_poll_latency p99 = {:.3}ms exceeds limit {:.3}ms \
         (privilege={}, samples={}). The runtime split + admission caps are NOT keeping the \
         Raft task scheduled promptly under app-runtime saturation. Investigate before \
         merging RFC 009 PR-1.",
        p99 * 1000.0,
        limit * 1000.0,
        has_priority_privilege,
        samples
    );
    eprintln!(
        "[acceptance gate] p99 = {:.3}ms (limit {:.3}ms, privilege={}, samples={})",
        p99 * 1000.0,
        limit * 1000.0,
        has_priority_privilege,
        samples
    );
    split.shutdown_timeout(Duration::from_secs(2));
}
fn priority_privilege_available() -> bool {
#[cfg(target_os = "linux")]
{
let handle = std::thread::spawn(|| unsafe {
let param = libc::sched_param { sched_priority: 1 };
libc::pthread_setschedparam(libc::pthread_self(), libc::SCHED_FIFO, ¶m) == 0
});
handle.join().unwrap_or(false)
}
#[cfg(not(target_os = "linux"))]
{
false
}
}