rs-zero 0.2.8

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::{future::Future, sync::Arc, time::Duration};

use thiserror::Error;
use tokio::sync::Mutex;

use crate::resil::{
    SharedCpuUsageProvider, WindowConfig, WindowSnapshot, cpu::default_cpu_provider,
    shedder_state::ShedderState,
};

/// Adaptive shedder configuration.
///
/// Groups every tunable consumed by the shedder state machine. The
/// [`Default`] implementation supplies a baseline value for each field, so
/// callers typically override individual fields with struct-update syntax
/// (`AdaptiveShedderConfig { max_in_flight: 1, ..Default::default() }`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdaptiveShedderConfig {
    /// Maximum in-flight operations before immediate rejection.
    ///
    /// Defaults to `1024`.
    pub max_in_flight: usize,
    /// Minimum samples before latency-based shedding can reject requests.
    ///
    /// Defaults to `20`.
    pub min_request_count: u64,
    /// Average latency threshold used for overload decisions.
    ///
    /// Defaults to 250 milliseconds.
    pub max_latency: Duration,
    /// In-flight percentage of `max_in_flight` required before latency overload drops.
    ///
    /// Expressed as a whole-number percentage; defaults to `80`.
    pub overload_in_flight_percent: u8,
    /// Rolling window used for latency and throughput samples.
    ///
    /// Defaults to `WindowConfig::default()`.
    pub window: WindowConfig,
    /// CPU usage threshold in millicpu-style units where `1000` means 100%.
    ///
    /// Defaults to `900` (i.e. 90% on that scale).
    pub cpu_threshold_millis: u32,
    /// Duration during which a recent drop keeps the shedder conservative.
    ///
    /// Defaults to one second.
    pub cool_off: Duration,
    /// Minimum overload factor when CPU is above threshold, in percent.
    ///
    /// Defaults to `10`.
    pub min_overload_factor_percent: u8,
}

impl Default for AdaptiveShedderConfig {
    fn default() -> Self {
        Self {
            max_in_flight: 1024,
            min_request_count: 20,
            max_latency: Duration::from_millis(250),
            overload_in_flight_percent: 80,
            window: WindowConfig::default(),
            cpu_threshold_millis: 900,
            cool_off: Duration::from_secs(1),
            min_overload_factor_percent: 10,
        }
    }
}

/// Snapshot of the current shedder state.
///
/// Returned by [`AdaptiveShedder::snapshot`]. The value is an owned copy, so
/// it does not track changes made to the shedder after it was taken.
#[derive(Debug, Clone, PartialEq)]
pub struct ShedderSnapshot {
    /// Current in-flight operations.
    pub in_flight: usize,
    /// Smoothed in-flight estimate updated when calls finish.
    pub avg_in_flight: f64,
    /// Last CPU usage sample in millicpu-style units.
    ///
    /// This is the value most recently handed to the state by
    /// [`AdaptiveShedder::allow`].
    pub cpu_usage_millis: u32,
    /// Rolling window statistics.
    pub window: WindowSnapshot,
}

/// Error returned when adaptive shedding rejects an operation.
///
/// Carries no payload; its `Display` output is the fixed text
/// `service overloaded`. Produced by [`AdaptiveShedder::allow`] and
/// [`AdaptiveShedder::run`].
#[derive(Debug, Error, Clone, Copy, PartialEq, Eq)]
#[error("service overloaded")]
pub struct ShedderError;

/// Lightweight adaptive shedder using in-flight and rolling latency signals.
///
/// Cloning is cheap: every clone points at the same underlying state, so
/// admission decisions and recorded outcomes are shared between clones.
pub struct AdaptiveShedder {
    // Shared bookkeeping, serialized through an async (Tokio) mutex.
    state: Arc<Mutex<ShedderState>>,
    // Source of the CPU usage sample read on every admission attempt.
    cpu: SharedCpuUsageProvider,
}

impl Clone for AdaptiveShedder {
    fn clone(&self) -> Self {
        Self {
            state: self.state.clone(),
            cpu: self.cpu.clone(),
        }
    }
}

impl std::fmt::Debug for AdaptiveShedder {
    /// Formats the shedder as an opaque struct.
    ///
    /// No fields are printed; the output only names the type and marks it as
    /// non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("AdaptiveShedder");
        repr.finish_non_exhaustive()
    }
}

impl AdaptiveShedder {
    /// Builds a shedder from `config`, sampling CPU usage through the
    /// provider returned by `default_cpu_provider()`.
    pub fn new(config: AdaptiveShedderConfig) -> Self {
        let provider = default_cpu_provider();
        Self::with_cpu_usage_provider(config, provider)
    }

    /// Builds a shedder from `config` that reads CPU usage from `cpu`.
    ///
    /// Useful when the CPU signal has to be injected, e.g. in tests.
    pub fn with_cpu_usage_provider(
        config: AdaptiveShedderConfig,
        cpu: SharedCpuUsageProvider,
    ) -> Self {
        let state = Arc::new(Mutex::new(ShedderState::new(config)));
        Self { state, cpu }
    }

    /// Asks for permission to start a protected operation.
    ///
    /// The CPU provider is sampled first, then the sample is handed to the
    /// shared state for the admission decision.
    ///
    /// # Errors
    ///
    /// Returns [`ShedderError`] when the state refuses the operation.
    pub async fn allow(&self) -> Result<ShedderGuard, ShedderError> {
        let cpu_sample = self.cpu.cpu_usage_millis();
        // The lock guard is a temporary and is released at the end of this statement.
        self.state.lock().await.allow(cpu_sample)?;

        let guard = ShedderGuard {
            shedder: self.clone(),
            started_at: std::time::Instant::now(),
            completed: false,
        };
        Ok(guard)
    }

    /// Drives `future` to completion under shedder protection.
    ///
    /// When admission is refused the future is dropped without ever being
    /// polled. Otherwise its output is returned and the call is recorded as
    /// a success, whatever that output is.
    ///
    /// # Errors
    ///
    /// Returns [`ShedderError`] when [`AdaptiveShedder::allow`] rejects.
    pub async fn run<F, T>(&self, future: F) -> Result<T, ShedderError>
    where
        F: Future<Output = T>,
    {
        let permit = self.allow().await?;
        let output = future.await;
        permit.record_success().await;
        Ok(output)
    }

    /// Captures the current shedder state as an owned [`ShedderSnapshot`].
    pub async fn snapshot(&self) -> ShedderSnapshot {
        self.state.lock().await.snapshot()
    }

    // Forwards a successful completion and its latency to the shared state.
    async fn record_success(&self, latency: Duration) {
        self.state.lock().await.record_success(latency);
    }

    // Forwards a failed completion and its latency to the shared state.
    async fn record_failure(&self, latency: Duration) {
        self.state.lock().await.record_failure(latency);
    }
}

/// Guard returned by [`AdaptiveShedder::allow`].
///
/// Finish the operation by calling [`ShedderGuard::record_success`] or
/// [`ShedderGuard::record_failure`]. If the guard is dropped without either
/// call, its `Drop` implementation makes a best-effort attempt to record a
/// failure instead.
#[derive(Debug)]
pub struct ShedderGuard {
    // Handle to the shedder that admitted this operation.
    shedder: AdaptiveShedder,
    // Admission time; latency is measured from this instant.
    started_at: std::time::Instant,
    // Set once an outcome was recorded so `Drop` does not record another one.
    completed: bool,
}

impl ShedderGuard {
    /// Reports the protected operation as successful.
    ///
    /// The latency handed to the shedder is the time elapsed since the guard
    /// was created. Consumes the guard.
    pub async fn record_success(mut self) {
        let latency = self.started_at.elapsed();
        self.shedder.record_success(latency).await;
        // Only flag completion after the outcome is stored; if this future is
        // cancelled mid-await, `Drop` still sees an unfinished guard.
        self.completed = true;
    }

    /// Reports the protected operation as failed.
    ///
    /// The latency handed to the shedder is the time elapsed since the guard
    /// was created. Consumes the guard.
    pub async fn record_failure(mut self) {
        let latency = self.started_at.elapsed();
        self.shedder.record_failure(latency).await;
        // Same ordering rationale as in `record_success`.
        self.completed = true;
    }
}

impl Drop for ShedderGuard {
    /// Records a failure for a guard that was dropped without an explicit
    /// outcome (early return, panic, or a cancelled future).
    ///
    /// The outcome is written synchronously whenever the state lock is free.
    /// This keeps the bookkeeping correct even when no Tokio runtime is
    /// current, and avoids a window in which the outcome is still pending in
    /// a spawned task. Only when the lock is contended is the work deferred
    /// to a task on the current runtime; without a runtime in that case the
    /// record is skipped, as `Drop` must neither block nor panic.
    fn drop(&mut self) {
        if self.completed {
            return;
        }
        let latency = self.started_at.elapsed();

        // Fast path: `try_lock` never blocks, so it is safe inside `Drop`.
        if let Ok(mut state) = self.shedder.state.try_lock() {
            state.record_failure(latency);
            return;
        }

        // Contended lock: hand the update to the runtime, if there is one.
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let shedder = self.shedder.clone();
            handle.spawn(async move {
                shedder.record_failure(latency).await;
            });
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::resil::CpuUsageProvider;

    /// CPU provider stub that always reports the wrapped reading.
    struct FixedCpu(u32);

    impl CpuUsageProvider for FixedCpu {
        fn cpu_usage_millis(&self) -> u32 {
            let Self(reading) = self;
            *reading
        }
    }

    /// Builds a shedder that admits a single in-flight operation.
    fn single_slot_shedder() -> AdaptiveShedder {
        let config = AdaptiveShedderConfig {
            max_in_flight: 1,
            ..AdaptiveShedderConfig::default()
        };
        AdaptiveShedder::new(config)
    }

    #[tokio::test]
    async fn rejects_when_in_flight_is_full() {
        let shedder = single_slot_shedder();
        // Keep the only slot occupied for the rest of the test.
        let _held = shedder.allow().await.expect("first");

        let second_attempt = shedder.allow().await;
        assert!(second_attempt.is_err());
    }

    #[tokio::test]
    async fn recovers_after_request_finishes() {
        let shedder = single_slot_shedder();
        let first = shedder.allow().await.expect("first");
        first.record_success().await;

        let second_attempt = shedder.allow().await;
        assert!(second_attempt.is_ok());
    }

    #[tokio::test]
    async fn cpu_snapshot_is_recorded() {
        let provider = Arc::new(FixedCpu(950));
        let shedder =
            AdaptiveShedder::with_cpu_usage_provider(AdaptiveShedderConfig::default(), provider);

        let guard = shedder.allow().await.expect("allowed");
        guard.record_success().await;

        let snapshot = shedder.snapshot().await;
        assert_eq!(snapshot.cpu_usage_millis, 950);
    }
}