// veda_rs/util/backpressure.rs
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use parking_lot::RwLock;
7
8#[derive(Debug)]
10pub struct BackpressureController {
11 max_queue_size: AtomicUsize,
12 current_queue_size: AtomicUsize,
13 throttle_rate: AtomicU64, last_update: RwLock<Instant>,
15 config: BackpressureConfig,
16}
17
/// Tunable parameters for a `BackpressureController`.
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
    /// Initial admission limit: enqueue attempts are rejected once this many
    /// items are counted as queued.
    pub max_queue_size: usize,
    /// Latency target in milliseconds. Observations above it lower the rate;
    /// observations below half of it raise the rate.
    pub target_latency_ms: u64,
    /// Optional rate ceiling (per second). When set it is also the initial
    /// rate; when `None` the controller starts at `1000.0` and rate increases
    /// are not capped.
    pub rate_limit_per_sec: Option<f64>,
    /// Factor the rate is multiplied by when backing off, and divided by when
    /// recovering.
    pub backoff_factor: f64,
}
25
26impl Default for BackpressureConfig {
27 fn default() -> Self {
28 Self {
29 max_queue_size: 10_000,
30 target_latency_ms: 100,
31 rate_limit_per_sec: None,
32 backoff_factor: 0.5,
33 }
34 }
35}
36
37impl BackpressureController {
38 pub fn new(config: BackpressureConfig) -> Self {
39 let initial_rate = config.rate_limit_per_sec.unwrap_or(1000.0);
40 Self {
41 max_queue_size: AtomicUsize::new(config.max_queue_size),
42 current_queue_size: AtomicUsize::new(0),
43 throttle_rate: AtomicU64::new(initial_rate.to_bits()),
44 last_update: RwLock::new(Instant::now()),
45 config,
46 }
47 }
48
49 pub fn should_admit(&self) -> bool {
51 let current = self.current_queue_size.load(Ordering::Relaxed);
52 let max = self.max_queue_size.load(Ordering::Relaxed);
53 current < max
54 }
55
56 pub fn on_enqueue(&self) -> bool {
58 let current = self.current_queue_size.fetch_add(1, Ordering::Relaxed);
59 let max = self.max_queue_size.load(Ordering::Relaxed);
60
61 if current >= max {
62 self.current_queue_size.fetch_sub(1, Ordering::Relaxed);
64 return false;
65 }
66 true
67 }
68
69 pub fn on_complete(&self) {
71 self.current_queue_size.fetch_sub(1, Ordering::Relaxed);
72 }
73
74 pub fn update_rate(&self, observed_latency_ms: u64) {
76 let target = self.config.target_latency_ms;
77
78 if observed_latency_ms > target {
79 let current_rate = f64::from_bits(self.throttle_rate.load(Ordering::Relaxed));
81 let new_rate = current_rate * self.config.backoff_factor;
82 self.throttle_rate.store(new_rate.to_bits(), Ordering::Relaxed);
83 } else if observed_latency_ms < target / 2 {
84 let current_rate = f64::from_bits(self.throttle_rate.load(Ordering::Relaxed));
86 let new_rate = current_rate * (1.0 / self.config.backoff_factor);
87
88 let new_rate = if let Some(limit) = self.config.rate_limit_per_sec {
90 new_rate.min(limit)
91 } else {
92 new_rate
93 };
94
95 self.throttle_rate.store(new_rate.to_bits(), Ordering::Relaxed);
96 }
97
98 *self.last_update.write() = Instant::now();
99 }
100
101 pub fn current_rate(&self) -> f64 {
103 f64::from_bits(self.throttle_rate.load(Ordering::Relaxed))
104 }
105
106 pub fn queue_size(&self) -> usize {
108 self.current_queue_size.load(Ordering::Relaxed)
109 }
110
111 pub fn compute_delay(&self) -> Option<Duration> {
113 let rate = self.current_rate();
114 if rate <= 0.0 {
115 return Some(Duration::from_millis(100));
116 }
117
118 let elapsed = self.last_update.read().elapsed();
119 let interval = Duration::from_secs_f64(1.0 / rate);
120
121 if elapsed < interval {
122 Some(interval - elapsed)
123 } else {
124 None
125 }
126 }
127
128 pub fn set_max_queue_size(&self, size: usize) {
130 self.max_queue_size.store(size, Ordering::Relaxed);
131 }
132}
133
134impl Default for BackpressureController {
135 fn default() -> Self {
136 Self::new(BackpressureConfig::default())
137 }
138}
139
#[cfg(test)]
mod tests {
    use super::*;

    /// Admission stops at `max_queue_size` and resumes once a slot is freed.
    #[test]
    fn test_backpressure_admit() {
        let config = BackpressureConfig {
            max_queue_size: 10,
            ..Default::default()
        };
        let controller = BackpressureController::new(config);

        // Fill the queue exactly to its limit.
        for _ in 0..10 {
            assert!(controller.on_enqueue());
        }
        assert_eq!(controller.queue_size(), 10);
        assert!(!controller.should_admit());

        // One more is rejected and must not change the count.
        assert!(!controller.on_enqueue());
        assert_eq!(controller.queue_size(), 10);

        // Completing one item frees exactly one slot.
        controller.on_complete();
        assert!(controller.should_admit());
        assert!(controller.on_enqueue());
    }

    /// High latency lowers the rate; low latency raises it again.
    #[test]
    fn test_rate_adjustment() {
        let controller = BackpressureController::default();
        let initial_rate = controller.current_rate();

        // 200 ms is above the default 100 ms target.
        controller.update_rate(200);
        assert!(controller.current_rate() < initial_rate);

        // 10 ms is below half of the default target.
        controller.update_rate(10);
        assert!(controller.current_rate() > initial_rate * 0.5);
    }

    /// A freshly built controller at 10/s asks for at most one 100 ms interval.
    #[test]
    fn test_compute_delay() {
        let config = BackpressureConfig {
            rate_limit_per_sec: Some(10.0),
            ..Default::default()
        };
        let controller = BackpressureController::new(config);

        let delay = controller
            .compute_delay()
            .expect("a controller created just now should still be inside its interval");
        assert!(delay <= Duration::from_millis(100));
    }
}