1use std::{future::Future, sync::Arc, time::Duration};
2
3use thiserror::Error;
4use tokio::sync::Mutex;
5
6use crate::resil::{
7 SharedCpuUsageProvider, WindowConfig, WindowSnapshot, cpu::default_cpu_provider,
8 shedder_state::ShedderState,
9};
10
11#[derive(Debug, Clone, PartialEq, Eq)]
13pub struct AdaptiveShedderConfig {
14 pub max_in_flight: usize,
16 pub min_request_count: u64,
18 pub max_latency: Duration,
20 pub overload_in_flight_percent: u8,
22 pub window: WindowConfig,
24 pub cpu_threshold_millis: u32,
26 pub cool_off: Duration,
28 pub min_overload_factor_percent: u8,
30}
31
32impl Default for AdaptiveShedderConfig {
33 fn default() -> Self {
34 Self {
35 max_in_flight: 1024,
36 min_request_count: 20,
37 max_latency: Duration::from_millis(250),
38 overload_in_flight_percent: 80,
39 window: WindowConfig::default(),
40 cpu_threshold_millis: 900,
41 cool_off: Duration::from_secs(1),
42 min_overload_factor_percent: 10,
43 }
44 }
45}
46
47#[derive(Debug, Clone, PartialEq)]
49pub struct ShedderSnapshot {
50 pub in_flight: usize,
52 pub avg_in_flight: f64,
54 pub cpu_usage_millis: u32,
56 pub window: WindowSnapshot,
58}
59
60#[derive(Debug, Error, Clone, Copy, PartialEq, Eq)]
62#[error("service overloaded")]
63pub struct ShedderError;
64
65pub struct AdaptiveShedder {
67 state: Arc<Mutex<ShedderState>>,
68 cpu: SharedCpuUsageProvider,
69}
70
71impl Clone for AdaptiveShedder {
72 fn clone(&self) -> Self {
73 Self {
74 state: self.state.clone(),
75 cpu: self.cpu.clone(),
76 }
77 }
78}
79
80impl std::fmt::Debug for AdaptiveShedder {
81 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 formatter
83 .debug_struct("AdaptiveShedder")
84 .finish_non_exhaustive()
85 }
86}
87
88impl AdaptiveShedder {
89 pub fn new(config: AdaptiveShedderConfig) -> Self {
91 Self::with_cpu_usage_provider(config, default_cpu_provider())
92 }
93
94 pub fn with_cpu_usage_provider(
96 config: AdaptiveShedderConfig,
97 cpu: SharedCpuUsageProvider,
98 ) -> Self {
99 Self {
100 state: Arc::new(Mutex::new(ShedderState::new(config))),
101 cpu,
102 }
103 }
104
105 pub async fn allow(&self) -> Result<ShedderGuard, ShedderError> {
107 let cpu = self.cpu.cpu_usage_millis();
108 self.state.lock().await.allow(cpu)?;
109 Ok(ShedderGuard {
110 shedder: self.clone(),
111 started_at: std::time::Instant::now(),
112 completed: false,
113 })
114 }
115
116 pub async fn run<F, T>(&self, future: F) -> Result<T, ShedderError>
118 where
119 F: Future<Output = T>,
120 {
121 let guard = self.allow().await?;
122 let value = future.await;
123 guard.record_success().await;
124 Ok(value)
125 }
126
127 pub async fn snapshot(&self) -> ShedderSnapshot {
129 self.state.lock().await.snapshot()
130 }
131
132 async fn record_success(&self, latency: Duration) {
133 self.state.lock().await.record_success(latency);
134 }
135
136 async fn record_failure(&self, latency: Duration) {
137 self.state.lock().await.record_failure(latency);
138 }
139}
140
141#[derive(Debug)]
143pub struct ShedderGuard {
144 shedder: AdaptiveShedder,
145 started_at: std::time::Instant,
146 completed: bool,
147}
148
149impl ShedderGuard {
150 pub async fn record_success(mut self) {
152 self.shedder.record_success(self.started_at.elapsed()).await;
153 self.completed = true;
154 }
155
156 pub async fn record_failure(mut self) {
158 self.shedder.record_failure(self.started_at.elapsed()).await;
159 self.completed = true;
160 }
161}
162
163impl Drop for ShedderGuard {
164 fn drop(&mut self) {
165 if !self.completed {
166 let shedder = self.shedder.clone();
167 let latency = self.started_at.elapsed();
168 if let Ok(handle) = tokio::runtime::Handle::try_current() {
169 handle.spawn(async move {
170 shedder.record_failure(latency).await;
171 });
172 }
173 }
174 }
175}
176
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::resil::CpuUsageProvider;

    /// CPU provider stub that always reports the wrapped reading.
    struct FixedCpu(u32);

    impl CpuUsageProvider for FixedCpu {
        fn cpu_usage_millis(&self) -> u32 {
            let FixedCpu(reading) = self;
            *reading
        }
    }

    /// Builds a shedder that admits at most one request at a time.
    fn single_slot_shedder() -> AdaptiveShedder {
        let config = AdaptiveShedderConfig {
            max_in_flight: 1,
            ..AdaptiveShedderConfig::default()
        };
        AdaptiveShedder::new(config)
    }

    #[tokio::test]
    async fn rejects_when_in_flight_is_full() {
        let shedder = single_slot_shedder();
        // Keep the first guard alive so its slot stays occupied.
        let _guard = shedder.allow().await.expect("first");

        let second = shedder.allow().await;
        assert!(second.is_err());
    }

    #[tokio::test]
    async fn recovers_after_request_finishes() {
        let shedder = single_slot_shedder();
        let first = shedder.allow().await.expect("first");
        first.record_success().await;

        assert!(shedder.allow().await.is_ok());
    }

    #[tokio::test]
    async fn cpu_snapshot_is_recorded() {
        let shedder = AdaptiveShedder::with_cpu_usage_provider(
            AdaptiveShedderConfig::default(),
            Arc::new(FixedCpu(950)),
        );

        let guard = shedder.allow().await.expect("allowed");
        guard.record_success().await;

        let observed = shedder.snapshot().await;
        assert_eq!(observed.cpu_usage_millis, 950);
    }
}