#![cfg(feature = "resil")]
#[cfg(target_os = "linux")]
use std::{sync::Arc, time::Duration};
#[cfg(target_os = "linux")]
use rs_zero::resil::{AdaptiveShedder, AdaptiveShedderConfig, LinuxCpuUsageProvider, WindowConfig};
#[cfg(target_os = "linux")]
use tokio::sync::Barrier;
#[cfg(target_os = "linux")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "Linux-only CPU provider pressure test"]
async fn linux_cpu_provider_and_shedder_pressure_are_observable() {
let seconds = env_u64("RS_ZERO_CPU_PRESSURE_SECONDS", 3);
let tasks = env_usize("RS_ZERO_CPU_PRESSURE_TASKS", 4).max(2);
let in_flight = env_usize("RS_ZERO_CPU_SHEDDER_IN_FLIGHT", 4).max(1);
let provider = Arc::new(LinuxCpuUsageProvider::new());
burn_cpu(Duration::from_millis(400), tasks).await;
let sampled_cpu = sample_cpu(provider.clone(), Duration::from_secs(seconds)).await;
assert!(
sampled_cpu <= 1000,
"CPU usage must stay in millicpu range, sampled={sampled_cpu}"
);
let shedder = AdaptiveShedder::with_cpu_usage_provider(
AdaptiveShedderConfig {
max_in_flight: in_flight,
min_request_count: 1,
max_latency: Duration::from_millis(1),
overload_in_flight_percent: 10,
cpu_threshold_millis: 1,
min_overload_factor_percent: 10,
cool_off: Duration::from_millis(100),
window: WindowConfig {
buckets: 8,
bucket_duration: Duration::from_millis(50),
},
},
provider,
);
seed_shedder_latency(&shedder).await;
let (accepted, rejected) = race_shedder(&shedder, tasks * 8).await;
let snapshot = shedder.snapshot().await;
let p95_proxy = snapshot.window.average_latency().unwrap_or_default();
eprintln!(
"linux-cpu-pressure accepted={accepted} rejected={rejected} sampled_cpu_millis={sampled_cpu} snapshot_cpu_millis={} p95_proxy_ms={} drops={}",
snapshot.cpu_usage_millis,
p95_proxy.as_millis(),
snapshot.window.drops,
);
assert!(accepted > 0, "pressure test should accept some work");
assert!(rejected > 0, "pressure test should reject overloaded work");
assert_eq!(
snapshot.in_flight, 0,
"guards should release in-flight count"
);
assert!(snapshot.window.drops >= rejected as u64);
}
#[cfg(not(target_os = "linux"))]
#[test]
#[ignore = "Linux-only CPU provider pressure test"]
fn linux_cpu_provider_and_shedder_pressure_are_observable() {
eprintln!("skipping Linux CPU pressure test on non-Linux target");
}
#[cfg(target_os = "linux")]
async fn sample_cpu(provider: Arc<LinuxCpuUsageProvider>, duration: Duration) -> u32 {
let deadline = std::time::Instant::now() + duration;
let mut max_cpu = 0;
while std::time::Instant::now() < deadline {
max_cpu = max_cpu.max(rs_zero::resil::CpuUsageProvider::cpu_usage_millis(
&*provider,
));
tokio::time::sleep(Duration::from_millis(250)).await;
}
max_cpu
}
#[cfg(target_os = "linux")]
async fn burn_cpu(duration: Duration, tasks: usize) {
let mut handles = Vec::new();
for _ in 0..tasks {
handles.push(tokio::task::spawn_blocking(move || {
let deadline = std::time::Instant::now() + duration;
let mut value = 0_u64;
while std::time::Instant::now() < deadline {
value = value.wrapping_mul(1_664_525).wrapping_add(1_013_904_223);
std::hint::black_box(value);
}
}));
}
for handle in handles {
handle.await.expect("cpu burner join");
}
}
#[cfg(target_os = "linux")]
async fn seed_shedder_latency(shedder: &AdaptiveShedder) {
for _ in 0..4 {
let guard = shedder.allow().await.expect("seed guard");
tokio::time::sleep(Duration::from_millis(3)).await;
guard.record_success().await;
}
}
#[cfg(target_os = "linux")]
async fn race_shedder(shedder: &AdaptiveShedder, calls: usize) -> (usize, usize) {
let barrier = Arc::new(Barrier::new(calls));
let mut handles = Vec::new();
for _ in 0..calls {
let shedder = shedder.clone();
let barrier = barrier.clone();
handles.push(tokio::spawn(async move {
barrier.wait().await;
match shedder.allow().await {
Ok(guard) => {
tokio::time::sleep(Duration::from_millis(10)).await;
guard.record_success().await;
true
}
Err(_) => false,
}
}));
}
let mut accepted = 0;
let mut rejected = 0;
for handle in handles {
if handle.await.expect("shedder join") {
accepted += 1;
} else {
rejected += 1;
}
}
(accepted, rejected)
}
#[cfg(target_os = "linux")]
fn env_u64(name: &str, default: u64) -> u64 {
std::env::var(name)
.ok()
.and_then(|value| value.parse().ok())
.unwrap_or(default)
}
#[cfg(target_os = "linux")]
fn env_usize(name: &str, default: usize) -> usize {
std::env::var(name)
.ok()
.and_then(|value| value.parse().ok())
.unwrap_or(default)
}