veda_rs/util/
backpressure.rs

//! Backpressure control for rate limiting.
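//!
//! A minimal usage sketch (illustrative only; the surrounding task loop, the
//! latency measurement, and the `veda_rs::util::backpressure` import path are
//! assumptions, not part of this module's API):
//!
//! ```ignore
//! use veda_rs::util::backpressure::{BackpressureController, BackpressureConfig};
//!
//! let controller = BackpressureController::new(BackpressureConfig::default());
//!
//! // Admission control: only enqueue work while capacity remains.
//! if controller.on_enqueue() {
//!     // ... run the task, measuring its latency in milliseconds ...
//!     let observed_latency_ms = 42; // hypothetical measurement
//!     controller.on_complete();
//!     controller.update_rate(observed_latency_ms);
//! }
//!
//! // Pacing: sleep for the suggested delay before admitting the next task.
//! if let Some(delay) = controller.compute_delay() {
//!     std::thread::sleep(delay);
//! }
//! ```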

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use parking_lot::RwLock;

/// Backpressure controller for automatic rate limiting
#[derive(Debug)]
pub struct BackpressureController {
    max_queue_size: AtomicUsize,
    current_queue_size: AtomicUsize,
    throttle_rate: AtomicU64, // tasks per second, stored as f64 bits
    last_update: RwLock<Instant>,
    config: BackpressureConfig,
}

#[derive(Debug, Clone)]
pub struct BackpressureConfig {
    pub max_queue_size: usize,
    pub target_latency_ms: u64,
    pub rate_limit_per_sec: Option<f64>,
    pub backoff_factor: f64,
}

impl Default for BackpressureConfig {
    fn default() -> Self {
        Self {
            max_queue_size: 10_000,
            target_latency_ms: 100,
            rate_limit_per_sec: None,
            backoff_factor: 0.5,
        }
    }
}

impl BackpressureController {
    pub fn new(config: BackpressureConfig) -> Self {
        // Default to 1000 tasks/sec when no explicit rate limit is configured
        let initial_rate = config.rate_limit_per_sec.unwrap_or(1000.0);
        Self {
            max_queue_size: AtomicUsize::new(config.max_queue_size),
            current_queue_size: AtomicUsize::new(0),
            throttle_rate: AtomicU64::new(initial_rate.to_bits()),
            last_update: RwLock::new(Instant::now()),
            config,
        }
    }

    /// Check if we should admit a new task
    pub fn should_admit(&self) -> bool {
        let current = self.current_queue_size.load(Ordering::Relaxed);
        let max = self.max_queue_size.load(Ordering::Relaxed);
        current < max
    }

    /// Increment queue size when a task is enqueued; returns `false` if the
    /// queue is full and the task should be rejected
    pub fn on_enqueue(&self) -> bool {
        let current = self.current_queue_size.fetch_add(1, Ordering::Relaxed);
        let max = self.max_queue_size.load(Ordering::Relaxed);

        if current >= max {
            // Over capacity: undo the increment and reject
            self.current_queue_size.fetch_sub(1, Ordering::Relaxed);
            return false;
        }
        true
    }

    /// Decrement queue size when a task completes
    pub fn on_complete(&self) {
        self.current_queue_size.fetch_sub(1, Ordering::Relaxed);
    }

    /// Update throttle rate based on observed latency
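    ///
    /// A sketch of the adjustment, assuming the default config (100ms target
    /// latency, 0.5 backoff factor); the exact numbers depend on
    /// `BackpressureConfig` and are illustrative, not a stable contract:
    ///
    /// ```ignore
    /// let controller = BackpressureController::default();
    /// controller.update_rate(200); // above target: rate is multiplied by 0.5
    /// controller.update_rate(10);  // below target/2: rate is multiplied by 1/0.5 = 2
    /// let _tasks_per_sec = controller.current_rate();
    /// ```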
    pub fn update_rate(&self, observed_latency_ms: u64) {
        let target = self.config.target_latency_ms;

        if observed_latency_ms > target {
            // Latency too high, reduce rate
            let current_rate = f64::from_bits(self.throttle_rate.load(Ordering::Relaxed));
            let new_rate = current_rate * self.config.backoff_factor;
            self.throttle_rate.store(new_rate.to_bits(), Ordering::Relaxed);
        } else if observed_latency_ms < target / 2 {
            // Latency low, can increase rate
            let current_rate = f64::from_bits(self.throttle_rate.load(Ordering::Relaxed));
            let new_rate = current_rate * (1.0 / self.config.backoff_factor);

            // Cap at configured limit if any
            let new_rate = if let Some(limit) = self.config.rate_limit_per_sec {
                new_rate.min(limit)
            } else {
                new_rate
            };

            self.throttle_rate.store(new_rate.to_bits(), Ordering::Relaxed);
        }

        *self.last_update.write() = Instant::now();
    }

    /// Get current throttle rate (tasks per second)
    pub fn current_rate(&self) -> f64 {
        f64::from_bits(self.throttle_rate.load(Ordering::Relaxed))
    }

    /// Get current queue size
    pub fn queue_size(&self) -> usize {
        self.current_queue_size.load(Ordering::Relaxed)
    }

    /// Calculate delay needed before next task admission
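    ///
    /// A minimal pacing sketch for a blocking caller (illustrative; an async
    /// caller would await a timer instead of calling `std::thread::sleep`):
    ///
    /// ```ignore
    /// if let Some(delay) = controller.compute_delay() {
    ///     std::thread::sleep(delay);
    /// }
    /// // No delay required: admit the next task here.
    /// ```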
    pub fn compute_delay(&self) -> Option<Duration> {
        let rate = self.current_rate();
        if rate <= 0.0 {
            // Non-positive rate: fall back to a fixed 100ms delay
            return Some(Duration::from_millis(100));
        }

        let elapsed = self.last_update.read().elapsed();
        let interval = Duration::from_secs_f64(1.0 / rate);

        if elapsed < interval {
            Some(interval - elapsed)
        } else {
            None
        }
    }

    /// Adjust max queue size dynamically
    pub fn set_max_queue_size(&self, size: usize) {
        self.max_queue_size.store(size, Ordering::Relaxed);
    }
}

impl Default for BackpressureController {
    fn default() -> Self {
        Self::new(BackpressureConfig::default())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backpressure_admit() {
        let config = BackpressureConfig {
            max_queue_size: 10,
            ..Default::default()
        };
        let controller = BackpressureController::new(config);

        // Should admit up to max
        for _ in 0..10 {
            assert!(controller.on_enqueue());
        }

        // Should reject beyond max
        assert!(!controller.on_enqueue());

        // Complete one task
        controller.on_complete();

        // Should admit again
        assert!(controller.on_enqueue());
    }

    #[test]
    fn test_rate_adjustment() {
        let controller = BackpressureController::default();
        let initial_rate = controller.current_rate();

        // High latency should reduce rate
        controller.update_rate(200);
        assert!(controller.current_rate() < initial_rate);

        // Low latency should increase rate
        controller.update_rate(10);
        assert!(controller.current_rate() > initial_rate * 0.5);
    }

    #[test]
    fn test_compute_delay() {
        let config = BackpressureConfig {
            rate_limit_per_sec: Some(10.0), // 10 tasks per second
            ..Default::default()
        };
        let controller = BackpressureController::new(config);

        // Immediately after construction, elapsed < interval, so a delay is expected
        let delay = controller.compute_delay();
        assert!(delay.is_some());

        // Rate is 10/s, so the interval (and maximum delay) is 100ms
        if let Some(d) = delay {
            assert!(d <= Duration::from_millis(100));
        }
    }
}