datasynth_core/streaming/
backpressure.rs1use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use crate::traits::BackpressureStrategy;
10
11#[derive(Debug)]
13pub struct BackpressureMonitor {
14 strategy: BackpressureStrategy,
16 capacity: usize,
18 current_fill: AtomicU64,
20 high_watermark: f64,
22 low_watermark: f64,
24 items_dropped: AtomicU64,
26 blocked_time_ns: AtomicU64,
28 backpressure_events: AtomicU64,
30}
31
32impl BackpressureMonitor {
33 pub fn new(strategy: BackpressureStrategy, capacity: usize) -> Self {
35 Self {
36 strategy,
37 capacity,
38 current_fill: AtomicU64::new(0),
39 high_watermark: 0.8,
40 low_watermark: 0.5,
41 items_dropped: AtomicU64::new(0),
42 blocked_time_ns: AtomicU64::new(0),
43 backpressure_events: AtomicU64::new(0),
44 }
45 }
46
47 pub fn with_watermarks(mut self, high: f64, low: f64) -> Self {
49 self.high_watermark = high.clamp(0.0, 1.0);
50 self.low_watermark = low.clamp(0.0, self.high_watermark);
51 self
52 }
53
54 pub fn update_fill(&self, current: usize) {
56 self.current_fill.store(current as u64, Ordering::Relaxed);
57 }
58
59 pub fn fill_ratio(&self) -> f64 {
61 self.current_fill.load(Ordering::Relaxed) as f64 / self.capacity as f64
62 }
63
64 pub fn should_apply_backpressure(&self) -> bool {
66 self.fill_ratio() >= self.high_watermark
67 }
68
69 pub fn has_recovered(&self) -> bool {
71 self.fill_ratio() <= self.low_watermark
72 }
73
74 pub fn record_backpressure(&self) {
76 self.backpressure_events.fetch_add(1, Ordering::Relaxed);
77 }
78
79 pub fn record_dropped(&self, count: u64) {
81 self.items_dropped.fetch_add(count, Ordering::Relaxed);
82 }
83
84 pub fn record_blocked_time(&self, duration: Duration) {
86 self.blocked_time_ns
87 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
88 }
89
90 pub fn stats(&self) -> BackpressureStats {
92 BackpressureStats {
93 strategy: self.strategy,
94 fill_ratio: self.fill_ratio(),
95 items_dropped: self.items_dropped.load(Ordering::Relaxed),
96 blocked_time_ms: self.blocked_time_ns.load(Ordering::Relaxed) / 1_000_000,
97 backpressure_events: self.backpressure_events.load(Ordering::Relaxed),
98 is_under_pressure: self.should_apply_backpressure(),
99 }
100 }
101
102 pub fn strategy(&self) -> BackpressureStrategy {
104 self.strategy
105 }
106}
107
108#[derive(Debug, Clone)]
110pub struct BackpressureStats {
111 pub strategy: BackpressureStrategy,
113 pub fill_ratio: f64,
115 pub items_dropped: u64,
117 pub blocked_time_ms: u64,
119 pub backpressure_events: u64,
121 pub is_under_pressure: bool,
123}
124
125#[derive(Debug)]
127pub struct AdaptiveBackpressure {
128 target_fill: f64,
130 min_delay_ns: u64,
132 max_delay_ns: u64,
134 current_delay_ns: AtomicU64,
136 last_adjustment: std::sync::Mutex<Instant>,
138 adjustment_interval: Duration,
140}
141
142impl AdaptiveBackpressure {
143 pub fn new() -> Self {
145 Self {
146 target_fill: 0.7,
147 min_delay_ns: 0,
148 max_delay_ns: 10_000_000, current_delay_ns: AtomicU64::new(0),
150 last_adjustment: std::sync::Mutex::new(Instant::now()),
151 adjustment_interval: Duration::from_millis(100),
152 }
153 }
154
155 pub fn with_target_fill(mut self, target: f64) -> Self {
157 self.target_fill = target.clamp(0.1, 0.9);
158 self
159 }
160
161 pub fn with_delay_bounds(mut self, min: Duration, max: Duration) -> Self {
163 self.min_delay_ns = min.as_nanos() as u64;
164 self.max_delay_ns = max.as_nanos() as u64;
165 self
166 }
167
168 pub fn adjust(&self, current_fill: f64) {
170 let mut last_adj = self.last_adjustment.lock().unwrap();
171 if last_adj.elapsed() < self.adjustment_interval {
172 return;
173 }
174 *last_adj = Instant::now();
175 drop(last_adj);
176
177 let current_delay = self.current_delay_ns.load(Ordering::Relaxed);
178 let error = current_fill - self.target_fill;
179
180 let new_delay = if current_delay == 0 && error > 0.0 {
183 let step = (self.max_delay_ns / 10).max(1000);
185 (step as f64 * error * 2.0) as u64
186 } else {
187 let adjustment_factor = 1.0 + error * 0.5;
188 (current_delay as f64 * adjustment_factor) as u64
189 };
190
191 let clamped_delay = new_delay.clamp(self.min_delay_ns, self.max_delay_ns);
193 self.current_delay_ns
194 .store(clamped_delay, Ordering::Relaxed);
195 }
196
197 pub fn current_delay(&self) -> Duration {
199 Duration::from_nanos(self.current_delay_ns.load(Ordering::Relaxed))
200 }
201
202 pub fn reset(&self) {
204 self.current_delay_ns
205 .store(self.min_delay_ns, Ordering::Relaxed);
206 }
207}
208
209impl Default for AdaptiveBackpressure {
210 fn default() -> Self {
211 Self::new()
212 }
213}
214
215pub struct BackpressureAwareProducer {
217 monitor: BackpressureMonitor,
219 adaptive: Option<AdaptiveBackpressure>,
221 state: BackpressureState,
223}
224
225#[derive(Debug, Clone, Copy, PartialEq, Eq)]
227pub enum BackpressureState {
228 Normal,
230 SlowingDown,
232 Blocked,
234 Recovering,
236}
237
238impl BackpressureAwareProducer {
239 pub fn new(strategy: BackpressureStrategy, capacity: usize) -> Self {
241 Self {
242 monitor: BackpressureMonitor::new(strategy, capacity),
243 adaptive: None,
244 state: BackpressureState::Normal,
245 }
246 }
247
248 pub fn with_adaptive(mut self) -> Self {
250 self.adaptive = Some(AdaptiveBackpressure::new());
251 self
252 }
253
254 pub fn update(&mut self, fill_level: usize) {
256 self.monitor.update_fill(fill_level);
257 let ratio = self.monitor.fill_ratio();
258
259 if let Some(ref adaptive) = self.adaptive {
261 adaptive.adjust(ratio);
262 }
263
264 self.state = if ratio >= 1.0 {
266 BackpressureState::Blocked
267 } else if self.monitor.should_apply_backpressure() {
268 if self.state == BackpressureState::Normal {
269 self.monitor.record_backpressure();
270 }
271 BackpressureState::SlowingDown
272 } else if self.monitor.has_recovered() {
273 BackpressureState::Normal
274 } else if self.state == BackpressureState::SlowingDown {
275 BackpressureState::Recovering
276 } else {
277 self.state
278 };
279 }
280
281 pub fn state(&self) -> BackpressureState {
283 self.state
284 }
285
286 pub fn recommended_delay(&self) -> Duration {
288 match self.state {
289 BackpressureState::Normal => Duration::ZERO,
290 BackpressureState::SlowingDown | BackpressureState::Recovering => self
291 .adaptive
292 .as_ref()
293 .map(|a| a.current_delay())
294 .unwrap_or(Duration::from_micros(100)),
295 BackpressureState::Blocked => Duration::from_millis(1),
296 }
297 }
298
299 pub fn record_dropped(&self, count: u64) {
301 self.monitor.record_dropped(count);
302 }
303
304 pub fn stats(&self) -> BackpressureStats {
306 self.monitor.stats()
307 }
308}
309
310#[cfg(test)]
311mod tests {
312 use super::*;
313
314 #[test]
315 fn test_backpressure_monitor() {
316 let monitor = BackpressureMonitor::new(BackpressureStrategy::Block, 100);
317
318 monitor.update_fill(50);
319 assert!(!monitor.should_apply_backpressure());
320
321 monitor.update_fill(80);
322 assert!(monitor.should_apply_backpressure());
323
324 monitor.update_fill(40);
325 assert!(monitor.has_recovered());
326 }
327
328 #[test]
329 fn test_backpressure_monitor_stats() {
330 let monitor = BackpressureMonitor::new(BackpressureStrategy::DropOldest, 100);
331
332 monitor.record_dropped(5);
333 monitor.record_backpressure();
334 monitor.record_blocked_time(Duration::from_millis(100));
335
336 let stats = monitor.stats();
337 assert_eq!(stats.items_dropped, 5);
338 assert_eq!(stats.backpressure_events, 1);
339 assert!(stats.blocked_time_ms >= 100);
340 }
341
342 #[test]
343 fn test_adaptive_backpressure() {
344 let adaptive = AdaptiveBackpressure::new()
345 .with_target_fill(0.5)
346 .with_delay_bounds(Duration::ZERO, Duration::from_millis(100));
347
348 assert_eq!(adaptive.current_delay(), Duration::ZERO);
349
350 for _ in 0..10 {
352 adaptive.adjust(0.9);
353 std::thread::sleep(Duration::from_millis(110));
354 }
355
356 assert!(adaptive.current_delay() > Duration::ZERO);
358 }
359
360 #[test]
361 fn test_backpressure_aware_producer() {
362 let mut producer = BackpressureAwareProducer::new(BackpressureStrategy::Block, 100);
363
364 producer.update(50);
365 assert_eq!(producer.state(), BackpressureState::Normal);
366
367 producer.update(85);
368 assert_eq!(producer.state(), BackpressureState::SlowingDown);
369
370 producer.update(40);
371 assert_eq!(producer.state(), BackpressureState::Normal);
372 }
373
374 #[test]
375 fn test_backpressure_state_blocked() {
376 let mut producer = BackpressureAwareProducer::new(BackpressureStrategy::Block, 100);
377
378 producer.update(100);
379 assert_eq!(producer.state(), BackpressureState::Blocked);
380 }
381}