Skip to main content

lnc_core/
backpressure.rs

1//! Backpressure monitoring and flow control.
2//!
3//! Tracks system resource utilization and provides signals for adaptive
4//! rate limiting to prevent overload and maintain stable performance.
5
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::time::Duration;
8
9/// Thresholds for backpressure activation
10#[derive(Debug, Clone)]
11pub struct BackpressureConfig {
12    /// Queue depth threshold to start applying backpressure
13    pub queue_depth_warning: usize,
14    /// Queue depth threshold for critical backpressure
15    pub queue_depth_critical: usize,
16    /// Memory usage percentage to start backpressure (0-100)
17    pub memory_warning_pct: u8,
18    /// Memory usage percentage for critical backpressure (0-100)
19    pub memory_critical_pct: u8,
20    /// Pending I/O operations threshold
21    pub pending_io_warning: usize,
22    /// Pending I/O operations critical threshold
23    pub pending_io_critical: usize,
24    /// Sampling interval for metrics
25    pub sample_interval: Duration,
26}
27
28impl Default for BackpressureConfig {
29    fn default() -> Self {
30        Self {
31            queue_depth_warning: 1000,
32            queue_depth_critical: 5000,
33            memory_warning_pct: 70,
34            memory_critical_pct: 90,
35            pending_io_warning: 100,
36            pending_io_critical: 500,
37            sample_interval: Duration::from_millis(100),
38        }
39    }
40}
41
42/// Current backpressure level
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44pub enum BackpressureLevel {
45    /// No backpressure - operate normally
46    None,
47    /// Light backpressure - slow down non-critical work
48    Light,
49    /// Moderate backpressure - reduce throughput
50    Moderate,
51    /// Heavy backpressure - only critical operations
52    Heavy,
53    /// Critical - reject new work
54    Critical,
55}
56
57impl BackpressureLevel {
58    /// Get a delay multiplier for rate limiting (1.0 = no delay)
59    #[must_use]
60    pub fn delay_multiplier(&self) -> f64 {
61        match self {
62            BackpressureLevel::None => 1.0,
63            BackpressureLevel::Light => 1.5,
64            BackpressureLevel::Moderate => 2.0,
65            BackpressureLevel::Heavy => 4.0,
66            BackpressureLevel::Critical => 10.0,
67        }
68    }
69
70    /// Check if new work should be accepted
71    #[must_use]
72    pub fn accepts_work(&self) -> bool {
73        !matches!(self, BackpressureLevel::Critical)
74    }
75
76    /// Check if background work should proceed
77    #[must_use]
78    pub fn allows_background_work(&self) -> bool {
79        matches!(self, BackpressureLevel::None | BackpressureLevel::Light)
80    }
81}
82
83/// Backpressure monitor tracking system resource utilization
84pub struct BackpressureMonitor {
85    config: BackpressureConfig,
86    /// Current queue depth
87    queue_depth: AtomicU64,
88    /// Pending I/O operations
89    pending_io: AtomicU64,
90    /// Memory bytes in use
91    memory_bytes: AtomicU64,
92    /// Total memory available
93    memory_total: AtomicU64,
94    /// Whether backpressure is currently active
95    active: AtomicBool,
96    /// Last sample time (stored as epoch nanoseconds for lock-free access)
97    last_sample_ns: AtomicU64,
98}
99
100impl BackpressureMonitor {
101    #[must_use]
102    pub fn new(config: BackpressureConfig) -> Self {
103        Self {
104            config,
105            queue_depth: AtomicU64::new(0),
106            pending_io: AtomicU64::new(0),
107            memory_bytes: AtomicU64::new(0),
108            memory_total: AtomicU64::new(Self::detect_total_memory()),
109            active: AtomicBool::new(false),
110            last_sample_ns: AtomicU64::new(Self::now_ns()),
111        }
112    }
113
114    fn detect_total_memory() -> u64 {
115        // Try to detect system memory
116        #[cfg(target_os = "linux")]
117        {
118            if let Ok(content) = std::fs::read_to_string("/proc/meminfo") {
119                for line in content.lines() {
120                    if line.starts_with("MemTotal:") {
121                        let parts: Vec<&str> = line.split_whitespace().collect();
122                        if parts.len() >= 2 {
123                            if let Ok(kb) = parts[1].parse::<u64>() {
124                                return kb * 1024;
125                            }
126                        }
127                    }
128                }
129            }
130        }
131        // Default: 8GB
132        8 * 1024 * 1024 * 1024
133    }
134
135    /// Update queue depth metric
136    pub fn set_queue_depth(&self, depth: usize) {
137        self.queue_depth.store(depth as u64, Ordering::Relaxed);
138    }
139
140    /// Increment queue depth
141    pub fn increment_queue_depth(&self) {
142        self.queue_depth.fetch_add(1, Ordering::Relaxed);
143    }
144
145    /// Decrement queue depth
146    pub fn decrement_queue_depth(&self) {
147        self.queue_depth.fetch_sub(1, Ordering::Relaxed);
148    }
149
150    /// Update pending I/O count
151    pub fn set_pending_io(&self, count: usize) {
152        self.pending_io.store(count as u64, Ordering::Relaxed);
153    }
154
155    /// Update memory usage
156    pub fn set_memory_usage(&self, bytes: u64) {
157        self.memory_bytes.store(bytes, Ordering::Relaxed);
158    }
159
160    /// Get current backpressure level
161    #[must_use]
162    pub fn level(&self) -> BackpressureLevel {
163        #[allow(clippy::cast_possible_truncation)]
164        let queue = self.queue_depth.load(Ordering::Relaxed) as usize;
165        #[allow(clippy::cast_possible_truncation)]
166        let pending = self.pending_io.load(Ordering::Relaxed) as usize;
167        let mem_bytes = self.memory_bytes.load(Ordering::Relaxed);
168        let mem_total = self.memory_total.load(Ordering::Relaxed);
169        #[allow(clippy::cast_possible_truncation)]
170        let mem_pct = if mem_total > 0 {
171            ((mem_bytes * 100) / mem_total) as u8
172        } else {
173            0
174        };
175
176        // Check critical thresholds first
177        if queue >= self.config.queue_depth_critical
178            || pending >= self.config.pending_io_critical
179            || mem_pct >= self.config.memory_critical_pct
180        {
181            return BackpressureLevel::Critical;
182        }
183
184        // Calculate severity score
185        let mut score = 0u8;
186
187        if queue >= self.config.queue_depth_warning {
188            score += 2;
189            if queue >= self.config.queue_depth_critical / 2 {
190                score += 1;
191            }
192        }
193
194        if pending >= self.config.pending_io_warning {
195            score += 2;
196            if pending >= self.config.pending_io_critical / 2 {
197                score += 1;
198            }
199        }
200
201        if mem_pct >= self.config.memory_warning_pct {
202            score += 2;
203            if mem_pct
204                >= u8::midpoint(
205                    self.config.memory_warning_pct,
206                    self.config.memory_critical_pct,
207                )
208            {
209                score += 1;
210            }
211        }
212
213        match score {
214            0 => BackpressureLevel::None,
215            1..=2 => BackpressureLevel::Light,
216            3..=4 => BackpressureLevel::Moderate,
217            5..=6 => BackpressureLevel::Heavy,
218            _ => BackpressureLevel::Critical,
219        }
220    }
221
222    /// Check if backpressure is currently active
223    pub fn is_active(&self) -> bool {
224        let level = self.level();
225        let active = !matches!(level, BackpressureLevel::None);
226        self.active.store(active, Ordering::Relaxed);
227        active
228    }
229
230    /// Get current metrics snapshot
231    #[must_use]
232    #[allow(clippy::cast_possible_truncation)]
233    pub fn snapshot(&self) -> BackpressureSnapshot {
234        BackpressureSnapshot {
235            queue_depth: self.queue_depth.load(Ordering::Relaxed) as usize,
236            pending_io: self.pending_io.load(Ordering::Relaxed) as usize,
237            memory_bytes: self.memory_bytes.load(Ordering::Relaxed),
238            memory_total: self.memory_total.load(Ordering::Relaxed),
239            level: self.level(),
240        }
241    }
242
243    /// Get current time as nanoseconds since an arbitrary epoch
244    #[allow(clippy::cast_possible_truncation)]
245    fn now_ns() -> u64 {
246        use std::time::{SystemTime, UNIX_EPOCH};
247        SystemTime::now()
248            .duration_since(UNIX_EPOCH)
249            .map(|d| d.as_nanos() as u64)
250            .unwrap_or(0)
251    }
252
253    /// Should we sample metrics now? (lock-free implementation)
254    #[allow(clippy::cast_possible_truncation)]
255    pub fn should_sample(&self) -> bool {
256        let now = Self::now_ns();
257        let last = self.last_sample_ns.load(Ordering::Relaxed);
258        let interval_ns = self.config.sample_interval.as_nanos() as u64;
259
260        if now.saturating_sub(last) >= interval_ns {
261            // Try to atomically update the last sample time
262            // If another thread beat us, that's fine - we skip this sample
263            let _ = self.last_sample_ns.compare_exchange(
264                last,
265                now,
266                Ordering::Relaxed,
267                Ordering::Relaxed,
268            );
269            return true;
270        }
271        false
272    }
273}
274
275impl Default for BackpressureMonitor {
276    fn default() -> Self {
277        Self::new(BackpressureConfig::default())
278    }
279}
280
281/// Snapshot of backpressure metrics
282#[derive(Debug, Clone)]
283pub struct BackpressureSnapshot {
284    pub queue_depth: usize,
285    pub pending_io: usize,
286    pub memory_bytes: u64,
287    pub memory_total: u64,
288    pub level: BackpressureLevel,
289}
290
291impl BackpressureSnapshot {
292    /// Memory usage percentage
293    #[must_use]
294    #[allow(clippy::cast_possible_truncation)]
295    pub fn memory_pct(&self) -> u8 {
296        if self.memory_total > 0 {
297            ((self.memory_bytes * 100) / self.memory_total) as u8
298        } else {
299            0
300        }
301    }
302}
303
304/// Guard that tracks an operation for backpressure
305pub struct BackpressureGuard<'a> {
306    monitor: &'a BackpressureMonitor,
307}
308
309impl<'a> BackpressureGuard<'a> {
310    pub fn new(monitor: &'a BackpressureMonitor) -> Self {
311        monitor.increment_queue_depth();
312        Self { monitor }
313    }
314}
315
316impl Drop for BackpressureGuard<'_> {
317    fn drop(&mut self) {
318        self.monitor.decrement_queue_depth();
319    }
320}
321
322#[cfg(test)]
323#[allow(clippy::float_cmp)]
324mod tests {
325    use super::*;
326
327    #[test]
328    fn test_backpressure_level_none() {
329        let monitor = BackpressureMonitor::default();
330        assert_eq!(monitor.level(), BackpressureLevel::None);
331        assert!(!monitor.is_active());
332    }
333
334    #[test]
335    fn test_backpressure_queue_warning() {
336        let config = BackpressureConfig {
337            queue_depth_warning: 10,
338            queue_depth_critical: 100,
339            ..Default::default()
340        };
341        let monitor = BackpressureMonitor::new(config);
342
343        monitor.set_queue_depth(15);
344        assert!(monitor.is_active());
345        assert!(matches!(
346            monitor.level(),
347            BackpressureLevel::Light | BackpressureLevel::Moderate
348        ));
349    }
350
351    #[test]
352    fn test_backpressure_critical() {
353        let config = BackpressureConfig {
354            queue_depth_warning: 10,
355            queue_depth_critical: 100,
356            ..Default::default()
357        };
358        let monitor = BackpressureMonitor::new(config);
359
360        monitor.set_queue_depth(100);
361        assert_eq!(monitor.level(), BackpressureLevel::Critical);
362        assert!(!monitor.level().accepts_work());
363    }
364
365    #[test]
366    fn test_backpressure_guard() {
367        let monitor = BackpressureMonitor::default();
368        assert_eq!(monitor.snapshot().queue_depth, 0);
369
370        {
371            let _guard = BackpressureGuard::new(&monitor);
372            assert_eq!(monitor.snapshot().queue_depth, 1);
373        }
374
375        assert_eq!(monitor.snapshot().queue_depth, 0);
376    }
377
378    #[test]
379    fn test_delay_multiplier() {
380        assert_eq!(BackpressureLevel::None.delay_multiplier(), 1.0);
381        assert!(BackpressureLevel::Critical.delay_multiplier() > 1.0);
382    }
383}