rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
#![cfg(feature = "resil")]

#[cfg(target_os = "linux")]
use std::{sync::Arc, time::Duration};

#[cfg(target_os = "linux")]
use rs_zero::resil::{AdaptiveShedder, AdaptiveShedderConfig, LinuxCpuUsageProvider, WindowConfig};
#[cfg(target_os = "linux")]
use tokio::sync::Barrier;

#[cfg(target_os = "linux")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "Linux-only CPU provider pressure test"]
async fn linux_cpu_provider_and_shedder_pressure_are_observable() {
    let seconds = env_u64("RS_ZERO_CPU_PRESSURE_SECONDS", 3);
    let tasks = env_usize("RS_ZERO_CPU_PRESSURE_TASKS", 4).max(2);
    let in_flight = env_usize("RS_ZERO_CPU_SHEDDER_IN_FLIGHT", 4).max(1);
    let provider = Arc::new(LinuxCpuUsageProvider::new());

    burn_cpu(Duration::from_millis(400), tasks).await;
    let sampled_cpu = sample_cpu(provider.clone(), Duration::from_secs(seconds)).await;
    assert!(
        sampled_cpu <= 1000,
        "CPU usage must stay in millicpu range, sampled={sampled_cpu}"
    );

    let shedder = AdaptiveShedder::with_cpu_usage_provider(
        AdaptiveShedderConfig {
            max_in_flight: in_flight,
            min_request_count: 1,
            max_latency: Duration::from_millis(1),
            overload_in_flight_percent: 10,
            cpu_threshold_millis: 1,
            min_overload_factor_percent: 10,
            cool_off: Duration::from_millis(100),
            window: WindowConfig {
                buckets: 8,
                bucket_duration: Duration::from_millis(50),
            },
        },
        provider,
    );
    seed_shedder_latency(&shedder).await;

    let (accepted, rejected) = race_shedder(&shedder, tasks * 8).await;
    let snapshot = shedder.snapshot().await;
    let p95_proxy = snapshot.window.average_latency().unwrap_or_default();

    eprintln!(
        "linux-cpu-pressure accepted={accepted} rejected={rejected} sampled_cpu_millis={sampled_cpu} snapshot_cpu_millis={} p95_proxy_ms={} drops={}",
        snapshot.cpu_usage_millis,
        p95_proxy.as_millis(),
        snapshot.window.drops,
    );

    assert!(accepted > 0, "pressure test should accept some work");
    assert!(rejected > 0, "pressure test should reject overloaded work");
    assert_eq!(
        snapshot.in_flight, 0,
        "guards should release in-flight count"
    );
    assert!(snapshot.window.drops >= rejected as u64);
}

#[cfg(not(target_os = "linux"))]
#[test]
#[ignore = "Linux-only CPU provider pressure test"]
fn linux_cpu_provider_and_shedder_pressure_are_observable() {
    eprintln!("skipping Linux CPU pressure test on non-Linux target");
}

#[cfg(target_os = "linux")]
async fn sample_cpu(provider: Arc<LinuxCpuUsageProvider>, duration: Duration) -> u32 {
    let deadline = std::time::Instant::now() + duration;
    let mut max_cpu = 0;
    while std::time::Instant::now() < deadline {
        max_cpu = max_cpu.max(rs_zero::resil::CpuUsageProvider::cpu_usage_millis(
            &*provider,
        ));
        tokio::time::sleep(Duration::from_millis(250)).await;
    }
    max_cpu
}

#[cfg(target_os = "linux")]
async fn burn_cpu(duration: Duration, tasks: usize) {
    let mut handles = Vec::new();
    for _ in 0..tasks {
        handles.push(tokio::task::spawn_blocking(move || {
            let deadline = std::time::Instant::now() + duration;
            let mut value = 0_u64;
            while std::time::Instant::now() < deadline {
                value = value.wrapping_mul(1_664_525).wrapping_add(1_013_904_223);
                std::hint::black_box(value);
            }
        }));
    }
    for handle in handles {
        handle.await.expect("cpu burner join");
    }
}

#[cfg(target_os = "linux")]
async fn seed_shedder_latency(shedder: &AdaptiveShedder) {
    for _ in 0..4 {
        let guard = shedder.allow().await.expect("seed guard");
        tokio::time::sleep(Duration::from_millis(3)).await;
        guard.record_success().await;
    }
}

#[cfg(target_os = "linux")]
async fn race_shedder(shedder: &AdaptiveShedder, calls: usize) -> (usize, usize) {
    let barrier = Arc::new(Barrier::new(calls));
    let mut handles = Vec::new();
    for _ in 0..calls {
        let shedder = shedder.clone();
        let barrier = barrier.clone();
        handles.push(tokio::spawn(async move {
            barrier.wait().await;
            match shedder.allow().await {
                Ok(guard) => {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    guard.record_success().await;
                    true
                }
                Err(_) => false,
            }
        }));
    }

    let mut accepted = 0;
    let mut rejected = 0;
    for handle in handles {
        if handle.await.expect("shedder join") {
            accepted += 1;
        } else {
            rejected += 1;
        }
    }
    (accepted, rejected)
}

#[cfg(target_os = "linux")]
fn env_u64(name: &str, default: u64) -> u64 {
    std::env::var(name)
        .ok()
        .and_then(|value| value.parse().ok())
        .unwrap_or(default)
}

#[cfg(target_os = "linux")]
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|value| value.parse().ok())
        .unwrap_or(default)
}