// rs-zero 0.2.8
//
// Rust-first microservice framework inspired by go-zero engineering practices.
// Documentation
use std::time::Duration;

use crate::resil::{
    AdaptiveShedderConfig, RollingWindow, ShedderError, ShedderSnapshot, WindowOutcome,
};

// Latency assumed by `max_flight` when the window reports no minimum average latency.
const DEFAULT_MIN_RT: Duration = Duration::from_secs(1);
// Largest CPU usage reading `overload_factor` distinguishes; higher readings are
// treated as this value, and `new` caps the configured threshold at the same number.
const CPU_MAX_MILLIS: u32 = 1000;
// Floor applied to the configured minimum overload factor in `min_overload_factor`.
const OVERLOAD_FACTOR_LOWER_BOUND: f64 = 0.1;
// Weight kept from the previous in-flight average on each `update_average_in_flight`.
const FLYING_BETA: f64 = 0.9;

/// Mutable bookkeeping behind the adaptive load shedder: admission counters,
/// overload tracking, and the rolling window of request outcomes.
#[derive(Debug)]
pub(crate) struct ShedderState {
    /// Configuration as stored by `new`, i.e. after its bounds have been clamped.
    config: AdaptiveShedderConfig,
    /// Requests admitted by `allow` that have not yet been reported through
    /// `record_success` / `record_failure`.
    in_flight: usize,
    /// Exponentially weighted average of `in_flight`, refreshed on every completion.
    avg_in_flight: f64,
    /// Instant at which the current overload episode was first observed;
    /// `None` while the state is not considered overloaded.
    overload_started_at: Option<std::time::Instant>,
    /// Set when `allow` rejects a request; cleared once `should_drop` finds
    /// the state no longer overloaded.
    dropped_recently: bool,
    /// CPU usage value passed to the most recent `allow` call.
    last_cpu_usage_millis: u32,
    /// Rolling window recording drops, successes and failures (with latency).
    window: RollingWindow,
}

impl ShedderState {
    pub(crate) fn new(config: AdaptiveShedderConfig) -> Self {
        Self {
            window: RollingWindow::new(config.window),
            config: AdaptiveShedderConfig {
                max_in_flight: config.max_in_flight.max(1),
                overload_in_flight_percent: config.overload_in_flight_percent.min(100),
                cpu_threshold_millis: config.cpu_threshold_millis.min(1000),
                min_overload_factor_percent: config.min_overload_factor_percent.min(100),
                ..config
            },
            in_flight: 0,
            avg_in_flight: 0.0,
            overload_started_at: None,
            dropped_recently: false,
            last_cpu_usage_millis: 0,
        }
    }

    pub(crate) fn allow(&mut self, cpu_usage_millis: u32) -> Result<(), ShedderError> {
        self.last_cpu_usage_millis = cpu_usage_millis;
        if self.should_drop(cpu_usage_millis) {
            self.window.record(WindowOutcome::Drop);
            self.dropped_recently = true;
            return Err(ShedderError);
        }

        self.in_flight += 1;
        Ok(())
    }

    pub(crate) fn record_success(&mut self, latency: Duration) {
        self.in_flight = self.in_flight.saturating_sub(1);
        self.update_average_in_flight();
        self.window
            .record_with_latency(WindowOutcome::Success, latency);
    }

    pub(crate) fn record_failure(&mut self, latency: Duration) {
        self.in_flight = self.in_flight.saturating_sub(1);
        self.update_average_in_flight();
        self.window
            .record_with_latency(WindowOutcome::Failure, latency);
    }

    pub(crate) fn snapshot(&self) -> ShedderSnapshot {
        ShedderSnapshot {
            in_flight: self.in_flight,
            avg_in_flight: self.avg_in_flight,
            cpu_usage_millis: self.last_cpu_usage_millis,
            window: self.window.snapshot(),
        }
    }

    fn should_drop(&mut self, cpu_usage_millis: u32) -> bool {
        if self.in_flight >= self.config.max_in_flight {
            return true;
        }

        let snapshot = self.window.snapshot();
        if snapshot.total() < self.config.min_request_count {
            return false;
        }

        let average_latency = snapshot.average_latency();
        let overloaded = self.system_overloaded(cpu_usage_millis)
            || self.still_hot()
            || average_latency.is_some_and(|latency| latency >= self.config.max_latency);
        if !overloaded {
            self.overload_started_at = None;
            self.dropped_recently = false;
            return false;
        }

        self.overload_started_at
            .get_or_insert_with(std::time::Instant::now);
        self.high_throughput(cpu_usage_millis)
    }

    fn high_throughput(&self, cpu_usage_millis: u32) -> bool {
        let max_flight = self.max_flight() * self.overload_factor(cpu_usage_millis);
        self.avg_in_flight > max_flight && self.in_flight as f64 > max_flight
    }

    fn max_flight(&self) -> f64 {
        let max_pass = self.window.max_successes_per_bucket().max(1) as f64;
        let min_rt = self.window.min_average_latency().unwrap_or(DEFAULT_MIN_RT);
        let bucket_duration = self
            .config
            .window
            .bucket_duration
            .max(Duration::from_millis(1));
        let window_scale = 1.0 / bucket_duration.as_secs_f64() / 1000.0;

        (max_pass * min_rt.as_secs_f64() * 1000.0 * window_scale).max(1.0)
    }

    fn overload_factor(&self, cpu_usage_millis: u32) -> f64 {
        if cpu_usage_millis < self.config.cpu_threshold_millis {
            return 1.0;
        }

        let threshold = self.config.cpu_threshold_millis.min(CPU_MAX_MILLIS - 1);
        let denominator = CPU_MAX_MILLIS.saturating_sub(threshold).max(1) as f64;
        let raw = CPU_MAX_MILLIS.saturating_sub(cpu_usage_millis.min(CPU_MAX_MILLIS)) as f64
            / denominator;
        raw.clamp(self.min_overload_factor(), 1.0)
    }

    fn min_overload_factor(&self) -> f64 {
        (self.config.min_overload_factor_percent as f64 / 100.0)
            .clamp(OVERLOAD_FACTOR_LOWER_BOUND, 1.0)
    }

    fn system_overloaded(&mut self, cpu_usage_millis: u32) -> bool {
        if cpu_usage_millis >= self.config.cpu_threshold_millis {
            self.overload_started_at
                .get_or_insert_with(std::time::Instant::now);
            true
        } else {
            false
        }
    }

    fn still_hot(&self) -> bool {
        self.dropped_recently
            && self
                .overload_started_at
                .is_some_and(|started| started.elapsed() < self.config.cool_off)
    }

    fn update_average_in_flight(&mut self) {
        self.avg_in_flight = self.avg_in_flight * FLYING_BETA + self.in_flight as f64 * 0.1;
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::resil::AdaptiveShedderConfig;

    /// Builds a state from `config`, seeds the window with a single 1 ms
    /// success, and pins both in-flight gauges at 2.
    fn primed_state(config: AdaptiveShedderConfig) -> ShedderState {
        let mut state = ShedderState::new(config);
        state
            .window
            .record_with_latency(WindowOutcome::Success, Duration::from_millis(1));
        state.avg_in_flight = 2.0;
        state.in_flight = 2;
        state
    }

    /// With `max_latency` of 1 ns the seeded 1 ms sample counts as too slow,
    /// so a request is shed even at a CPU reading of 0.
    #[test]
    fn latency_overload_can_reject() {
        let config = AdaptiveShedderConfig {
            max_in_flight: 10,
            min_request_count: 1,
            max_latency: Duration::from_nanos(1),
            ..AdaptiveShedderConfig::default()
        };
        let mut state = primed_state(config);

        assert!(state.should_drop(0));
    }

    /// A CPU reading of 950 against a threshold of 800 marks the state as
    /// overloaded, and the pinned in-flight gauges lead to a shed.
    #[test]
    fn high_cpu_and_throughput_can_reject() {
        let config = AdaptiveShedderConfig {
            max_in_flight: 10,
            min_request_count: 1,
            cpu_threshold_millis: 800,
            ..AdaptiveShedderConfig::default()
        };
        let mut state = primed_state(config);

        assert!(state.should_drop(950));
    }
}