//! Adaptive load shedder (`rs_zero/resil/shedder.rs`).

1use std::{future::Future, sync::Arc, time::Duration};
2
3use thiserror::Error;
4use tokio::sync::Mutex;
5
6use crate::resil::{
7    SharedCpuUsageProvider, WindowConfig, WindowSnapshot, cpu::default_cpu_provider,
8    shedder_state::ShedderState,
9};
10
11/// Adaptive shedder configuration.
12#[derive(Debug, Clone, PartialEq, Eq)]
13pub struct AdaptiveShedderConfig {
14    /// Maximum in-flight operations before immediate rejection.
15    pub max_in_flight: usize,
16    /// Minimum samples before latency-based shedding can reject requests.
17    pub min_request_count: u64,
18    /// Average latency threshold used for overload decisions.
19    pub max_latency: Duration,
20    /// In-flight percentage of `max_in_flight` required before latency overload drops.
21    pub overload_in_flight_percent: u8,
22    /// Rolling window used for latency and throughput samples.
23    pub window: WindowConfig,
24    /// CPU usage threshold in millicpu-style units where `1000` means 100%.
25    pub cpu_threshold_millis: u32,
26    /// Duration during which a recent drop keeps the shedder conservative.
27    pub cool_off: Duration,
28    /// Minimum overload factor when CPU is above threshold, in percent.
29    pub min_overload_factor_percent: u8,
30}
31
32impl Default for AdaptiveShedderConfig {
33    fn default() -> Self {
34        Self {
35            max_in_flight: 1024,
36            min_request_count: 20,
37            max_latency: Duration::from_millis(250),
38            overload_in_flight_percent: 80,
39            window: WindowConfig::default(),
40            cpu_threshold_millis: 900,
41            cool_off: Duration::from_secs(1),
42            min_overload_factor_percent: 10,
43        }
44    }
45}
46
47/// Snapshot of the current shedder state.
48#[derive(Debug, Clone, PartialEq)]
49pub struct ShedderSnapshot {
50    /// Current in-flight operations.
51    pub in_flight: usize,
52    /// Smoothed in-flight estimate updated when calls finish.
53    pub avg_in_flight: f64,
54    /// Last CPU usage sample in millicpu-style units.
55    pub cpu_usage_millis: u32,
56    /// Rolling window statistics.
57    pub window: WindowSnapshot,
58}
59
60/// Error returned when adaptive shedding rejects an operation.
61#[derive(Debug, Error, Clone, Copy, PartialEq, Eq)]
62#[error("service overloaded")]
63pub struct ShedderError;
64
65/// Lightweight adaptive shedder using in-flight and rolling latency signals.
66pub struct AdaptiveShedder {
67    state: Arc<Mutex<ShedderState>>,
68    cpu: SharedCpuUsageProvider,
69}
70
71impl Clone for AdaptiveShedder {
72    fn clone(&self) -> Self {
73        Self {
74            state: self.state.clone(),
75            cpu: self.cpu.clone(),
76        }
77    }
78}
79
80impl std::fmt::Debug for AdaptiveShedder {
81    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        formatter
83            .debug_struct("AdaptiveShedder")
84            .finish_non_exhaustive()
85    }
86}
87
88impl AdaptiveShedder {
89    /// Creates a shedder from configuration.
90    pub fn new(config: AdaptiveShedderConfig) -> Self {
91        Self::with_cpu_usage_provider(config, default_cpu_provider())
92    }
93
94    /// Creates a shedder with an explicit CPU usage provider.
95    pub fn with_cpu_usage_provider(
96        config: AdaptiveShedderConfig,
97        cpu: SharedCpuUsageProvider,
98    ) -> Self {
99        Self {
100            state: Arc::new(Mutex::new(ShedderState::new(config))),
101            cpu,
102        }
103    }
104
105    /// Attempts to enter the protected operation.
106    pub async fn allow(&self) -> Result<ShedderGuard, ShedderError> {
107        let cpu = self.cpu.cpu_usage_millis();
108        self.state.lock().await.allow(cpu)?;
109        Ok(ShedderGuard {
110            shedder: self.clone(),
111            started_at: std::time::Instant::now(),
112            completed: false,
113        })
114    }
115
116    /// Runs a future after acquiring shedder capacity.
117    pub async fn run<F, T>(&self, future: F) -> Result<T, ShedderError>
118    where
119        F: Future<Output = T>,
120    {
121        let guard = self.allow().await?;
122        let value = future.await;
123        guard.record_success().await;
124        Ok(value)
125    }
126
127    /// Returns a snapshot of the current shedder state.
128    pub async fn snapshot(&self) -> ShedderSnapshot {
129        self.state.lock().await.snapshot()
130    }
131
132    async fn record_success(&self, latency: Duration) {
133        self.state.lock().await.record_success(latency);
134    }
135
136    async fn record_failure(&self, latency: Duration) {
137        self.state.lock().await.record_failure(latency);
138    }
139}
140
141/// Guard returned by [`AdaptiveShedder::allow`].
142#[derive(Debug)]
143pub struct ShedderGuard {
144    shedder: AdaptiveShedder,
145    started_at: std::time::Instant,
146    completed: bool,
147}
148
149impl ShedderGuard {
150    /// Marks the protected operation as successful.
151    pub async fn record_success(mut self) {
152        self.shedder.record_success(self.started_at.elapsed()).await;
153        self.completed = true;
154    }
155
156    /// Marks the protected operation as failed.
157    pub async fn record_failure(mut self) {
158        self.shedder.record_failure(self.started_at.elapsed()).await;
159        self.completed = true;
160    }
161}
162
163impl Drop for ShedderGuard {
164    fn drop(&mut self) {
165        if !self.completed {
166            let shedder = self.shedder.clone();
167            let latency = self.started_at.elapsed();
168            if let Ok(handle) = tokio::runtime::Handle::try_current() {
169                handle.spawn(async move {
170                    shedder.record_failure(latency).await;
171                });
172            }
173        }
174    }
175}
176
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::resil::CpuUsageProvider;

    /// CPU provider stub that always reports the wrapped value.
    struct FixedCpu(u32);

    impl CpuUsageProvider for FixedCpu {
        fn cpu_usage_millis(&self) -> u32 {
            let Self(usage) = self;
            *usage
        }
    }

    /// Builds a shedder that admits at most one in-flight operation.
    fn single_slot_shedder() -> AdaptiveShedder {
        let config = AdaptiveShedderConfig {
            max_in_flight: 1,
            ..AdaptiveShedderConfig::default()
        };
        AdaptiveShedder::new(config)
    }

    #[tokio::test]
    async fn rejects_when_in_flight_is_full() {
        let shedder = single_slot_shedder();
        // Keep the first guard alive so the only slot stays occupied.
        let _guard = shedder.allow().await.expect("first");

        let second = shedder.allow().await;
        assert!(second.is_err());
    }

    #[tokio::test]
    async fn recovers_after_request_finishes() {
        let shedder = single_slot_shedder();
        let first = shedder.allow().await.expect("first");
        first.record_success().await;

        let second = shedder.allow().await;
        assert!(second.is_ok());
    }

    #[tokio::test]
    async fn cpu_snapshot_is_recorded() {
        let cpu = Arc::new(FixedCpu(950));
        let shedder =
            AdaptiveShedder::with_cpu_usage_provider(AdaptiveShedderConfig::default(), cpu);

        let guard = shedder.allow().await.expect("allowed");
        guard.record_success().await;

        let snapshot = shedder.snapshot().await;
        assert_eq!(snapshot.cpu_usage_millis, 950);
    }
}