1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::time::Duration;
8
/// Thresholds and sampling cadence used to classify system load.
///
/// Each monitored signal (queue depth, pending I/O, memory usage) has a
/// *warning* threshold, at which it starts contributing to the backpressure
/// level, and a *critical* threshold, at which it forces the critical level
/// on its own. All comparisons are inclusive (`>=`).
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
    /// Queue depth at or above which the queue starts adding pressure.
    pub queue_depth_warning: usize,
    /// Queue depth at or above which the level is critical.
    pub queue_depth_critical: usize,
    /// Memory usage (percent of total memory) at or above which memory
    /// starts adding pressure.
    pub memory_warning_pct: u8,
    /// Memory usage (percent of total memory) at or above which the level is
    /// critical.
    pub memory_critical_pct: u8,
    /// Number of pending I/O operations at or above which I/O starts adding
    /// pressure.
    pub pending_io_warning: usize,
    /// Number of pending I/O operations at or above which the level is
    /// critical.
    pub pending_io_critical: usize,
    /// Minimum time that must pass between two granted samples.
    pub sample_interval: Duration,
}

impl Default for BackpressureConfig {
    /// Returns the stock thresholds: queue 1000/5000, memory 70%/90%,
    /// pending I/O 100/500, sampled every 100 ms.
    fn default() -> Self {
        const SAMPLE_INTERVAL_MS: u64 = 100;

        BackpressureConfig {
            queue_depth_warning: 1_000,
            queue_depth_critical: 5_000,
            memory_warning_pct: 70,
            memory_critical_pct: 90,
            pending_io_warning: 100,
            pending_io_critical: 500,
            sample_interval: Duration::from_millis(SAMPLE_INTERVAL_MS),
        }
    }
}
41
/// Severity of the load currently observed, listed from least to most severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureLevel {
    /// No pressure: all work, including background work, is allowed.
    None,
    /// Mild pressure: background work is still allowed.
    Light,
    /// Noticeable pressure: background work is no longer allowed.
    Moderate,
    /// Severe pressure: new work is still accepted, background work is not.
    Heavy,
    /// Overloaded: new work is not accepted.
    Critical,
}

impl BackpressureLevel {
    /// Returns the multiplier associated with this level, from `1.0` for
    /// [`BackpressureLevel::None`] up to `10.0` for
    /// [`BackpressureLevel::Critical`].
    ///
    /// NOTE(review): presumably callers scale their delays/back-off by this
    /// factor — confirm against call sites.
    #[must_use]
    pub fn delay_multiplier(&self) -> f64 {
        match *self {
            Self::None => 1.0,
            Self::Light => 1.5,
            Self::Moderate => 2.0,
            Self::Heavy => 4.0,
            Self::Critical => 10.0,
        }
    }

    /// Returns `true` for every level except [`BackpressureLevel::Critical`].
    #[must_use]
    pub fn accepts_work(&self) -> bool {
        *self != Self::Critical
    }

    /// Returns `true` only for [`BackpressureLevel::None`] and
    /// [`BackpressureLevel::Light`].
    #[must_use]
    pub fn allows_background_work(&self) -> bool {
        match *self {
            Self::None | Self::Light => true,
            Self::Moderate | Self::Heavy | Self::Critical => false,
        }
    }
}
82
83pub struct BackpressureMonitor {
85 config: BackpressureConfig,
86 queue_depth: AtomicU64,
88 pending_io: AtomicU64,
90 memory_bytes: AtomicU64,
92 memory_total: AtomicU64,
94 active: AtomicBool,
96 last_sample_ns: AtomicU64,
98}
99
100impl BackpressureMonitor {
101 #[must_use]
102 pub fn new(config: BackpressureConfig) -> Self {
103 Self {
104 config,
105 queue_depth: AtomicU64::new(0),
106 pending_io: AtomicU64::new(0),
107 memory_bytes: AtomicU64::new(0),
108 memory_total: AtomicU64::new(Self::detect_total_memory()),
109 active: AtomicBool::new(false),
110 last_sample_ns: AtomicU64::new(Self::now_ns()),
111 }
112 }
113
114 fn detect_total_memory() -> u64 {
115 #[cfg(target_os = "linux")]
117 {
118 if let Ok(content) = std::fs::read_to_string("/proc/meminfo") {
119 for line in content.lines() {
120 if line.starts_with("MemTotal:") {
121 let parts: Vec<&str> = line.split_whitespace().collect();
122 if parts.len() >= 2 {
123 if let Ok(kb) = parts[1].parse::<u64>() {
124 return kb * 1024;
125 }
126 }
127 }
128 }
129 }
130 }
131 8 * 1024 * 1024 * 1024
133 }
134
135 pub fn set_queue_depth(&self, depth: usize) {
137 self.queue_depth.store(depth as u64, Ordering::Relaxed);
138 }
139
140 pub fn increment_queue_depth(&self) {
142 self.queue_depth.fetch_add(1, Ordering::Relaxed);
143 }
144
145 pub fn decrement_queue_depth(&self) {
147 self.queue_depth.fetch_sub(1, Ordering::Relaxed);
148 }
149
150 pub fn set_pending_io(&self, count: usize) {
152 self.pending_io.store(count as u64, Ordering::Relaxed);
153 }
154
155 pub fn set_memory_usage(&self, bytes: u64) {
157 self.memory_bytes.store(bytes, Ordering::Relaxed);
158 }
159
160 #[must_use]
162 pub fn level(&self) -> BackpressureLevel {
163 #[allow(clippy::cast_possible_truncation)]
164 let queue = self.queue_depth.load(Ordering::Relaxed) as usize;
165 #[allow(clippy::cast_possible_truncation)]
166 let pending = self.pending_io.load(Ordering::Relaxed) as usize;
167 let mem_bytes = self.memory_bytes.load(Ordering::Relaxed);
168 let mem_total = self.memory_total.load(Ordering::Relaxed);
169 #[allow(clippy::cast_possible_truncation)]
170 let mem_pct = if mem_total > 0 {
171 ((mem_bytes * 100) / mem_total) as u8
172 } else {
173 0
174 };
175
176 if queue >= self.config.queue_depth_critical
178 || pending >= self.config.pending_io_critical
179 || mem_pct >= self.config.memory_critical_pct
180 {
181 return BackpressureLevel::Critical;
182 }
183
184 let mut score = 0u8;
186
187 if queue >= self.config.queue_depth_warning {
188 score += 2;
189 if queue >= self.config.queue_depth_critical / 2 {
190 score += 1;
191 }
192 }
193
194 if pending >= self.config.pending_io_warning {
195 score += 2;
196 if pending >= self.config.pending_io_critical / 2 {
197 score += 1;
198 }
199 }
200
201 if mem_pct >= self.config.memory_warning_pct {
202 score += 2;
203 if mem_pct
204 >= u8::midpoint(
205 self.config.memory_warning_pct,
206 self.config.memory_critical_pct,
207 )
208 {
209 score += 1;
210 }
211 }
212
213 match score {
214 0 => BackpressureLevel::None,
215 1..=2 => BackpressureLevel::Light,
216 3..=4 => BackpressureLevel::Moderate,
217 5..=6 => BackpressureLevel::Heavy,
218 _ => BackpressureLevel::Critical,
219 }
220 }
221
222 pub fn is_active(&self) -> bool {
224 let level = self.level();
225 let active = !matches!(level, BackpressureLevel::None);
226 self.active.store(active, Ordering::Relaxed);
227 active
228 }
229
230 #[must_use]
232 #[allow(clippy::cast_possible_truncation)]
233 pub fn snapshot(&self) -> BackpressureSnapshot {
234 BackpressureSnapshot {
235 queue_depth: self.queue_depth.load(Ordering::Relaxed) as usize,
236 pending_io: self.pending_io.load(Ordering::Relaxed) as usize,
237 memory_bytes: self.memory_bytes.load(Ordering::Relaxed),
238 memory_total: self.memory_total.load(Ordering::Relaxed),
239 level: self.level(),
240 }
241 }
242
243 #[allow(clippy::cast_possible_truncation)]
245 fn now_ns() -> u64 {
246 use std::time::{SystemTime, UNIX_EPOCH};
247 SystemTime::now()
248 .duration_since(UNIX_EPOCH)
249 .map(|d| d.as_nanos() as u64)
250 .unwrap_or(0)
251 }
252
253 #[allow(clippy::cast_possible_truncation)]
255 pub fn should_sample(&self) -> bool {
256 let now = Self::now_ns();
257 let last = self.last_sample_ns.load(Ordering::Relaxed);
258 let interval_ns = self.config.sample_interval.as_nanos() as u64;
259
260 if now.saturating_sub(last) >= interval_ns {
261 let _ = self.last_sample_ns.compare_exchange(
264 last,
265 now,
266 Ordering::Relaxed,
267 Ordering::Relaxed,
268 );
269 return true;
270 }
271 false
272 }
273}
274
275impl Default for BackpressureMonitor {
276 fn default() -> Self {
277 Self::new(BackpressureConfig::default())
278 }
279}
280
281#[derive(Debug, Clone)]
283pub struct BackpressureSnapshot {
284 pub queue_depth: usize,
285 pub pending_io: usize,
286 pub memory_bytes: u64,
287 pub memory_total: u64,
288 pub level: BackpressureLevel,
289}
290
291impl BackpressureSnapshot {
292 #[must_use]
294 #[allow(clippy::cast_possible_truncation)]
295 pub fn memory_pct(&self) -> u8 {
296 if self.memory_total > 0 {
297 ((self.memory_bytes * 100) / self.memory_total) as u8
298 } else {
299 0
300 }
301 }
302}
303
304pub struct BackpressureGuard<'a> {
306 monitor: &'a BackpressureMonitor,
307}
308
309impl<'a> BackpressureGuard<'a> {
310 pub fn new(monitor: &'a BackpressureMonitor) -> Self {
311 monitor.increment_queue_depth();
312 Self { monitor }
313 }
314}
315
316impl Drop for BackpressureGuard<'_> {
317 fn drop(&mut self) {
318 self.monitor.decrement_queue_depth();
319 }
320}
321
#[cfg(test)]
// Exact float comparison is intended: the multipliers are literal constants.
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;

    /// Builds a monitor with the given queue thresholds and default values
    /// for every other setting.
    fn monitor_with_queue_limits(warning: usize, critical: usize) -> BackpressureMonitor {
        BackpressureMonitor::new(BackpressureConfig {
            queue_depth_warning: warning,
            queue_depth_critical: critical,
            ..Default::default()
        })
    }

    /// A freshly created monitor reports no pressure.
    #[test]
    fn test_backpressure_level_none() {
        let idle = BackpressureMonitor::default();

        assert_eq!(idle.level(), BackpressureLevel::None);
        assert!(!idle.is_active());
    }

    /// Crossing the queue warning threshold activates backpressure at a
    /// non-critical level.
    #[test]
    fn test_backpressure_queue_warning() {
        let monitor = monitor_with_queue_limits(10, 100);
        monitor.set_queue_depth(15);

        assert!(monitor.is_active());
        let level = monitor.level();
        assert!(matches!(
            level,
            BackpressureLevel::Light | BackpressureLevel::Moderate
        ));
    }

    /// Reaching the queue critical threshold stops new work from being
    /// accepted.
    #[test]
    fn test_backpressure_critical() {
        let monitor = monitor_with_queue_limits(10, 100);
        monitor.set_queue_depth(100);

        assert_eq!(monitor.level(), BackpressureLevel::Critical);
        assert!(!monitor.level().accepts_work());
    }

    /// The guard raises the queue depth for exactly as long as it is alive.
    #[test]
    fn test_backpressure_guard() {
        let monitor = BackpressureMonitor::default();
        let depth = || monitor.snapshot().queue_depth;
        assert_eq!(depth(), 0);

        {
            let _guard = BackpressureGuard::new(&monitor);
            assert_eq!(depth(), 1);
        }

        assert_eq!(depth(), 0);
    }

    /// `None` leaves delays untouched, while `Critical` lengthens them.
    #[test]
    fn test_delay_multiplier() {
        let baseline = BackpressureLevel::None.delay_multiplier();
        let critical = BackpressureLevel::Critical.delay_multiplier();

        assert_eq!(baseline, 1.0);
        assert!(critical > 1.0);
    }
}