// datasynth_core/streaming/backpressure.rs

//! Backpressure handling for streaming generation.
//!
//! Provides strategies and utilities for handling backpressure
//! when consumers cannot keep up with producers.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::{Duration, Instant};

use crate::traits::BackpressureStrategy;

11/// Monitors and reports backpressure conditions.
12#[derive(Debug)]
13pub struct BackpressureMonitor {
14    /// Strategy in use.
15    strategy: BackpressureStrategy,
16    /// Capacity threshold.
17    capacity: usize,
18    /// Current fill level.
19    current_fill: AtomicU64,
20    /// High watermark (when to slow down).
21    high_watermark: f64,
22    /// Low watermark (when to resume full speed).
23    low_watermark: f64,
24    /// Total items dropped.
25    items_dropped: AtomicU64,
26    /// Total time spent blocked (nanoseconds).
27    blocked_time_ns: AtomicU64,
28    /// Number of times backpressure was triggered.
29    backpressure_events: AtomicU64,
30}
31
32impl BackpressureMonitor {
33    /// Creates a new backpressure monitor.
34    pub fn new(strategy: BackpressureStrategy, capacity: usize) -> Self {
35        Self {
36            strategy,
37            capacity,
38            current_fill: AtomicU64::new(0),
39            high_watermark: 0.8,
40            low_watermark: 0.5,
41            items_dropped: AtomicU64::new(0),
42            blocked_time_ns: AtomicU64::new(0),
43            backpressure_events: AtomicU64::new(0),
44        }
45    }
46
47    /// Creates a monitor with custom watermarks.
48    pub fn with_watermarks(mut self, high: f64, low: f64) -> Self {
49        self.high_watermark = high.clamp(0.0, 1.0);
50        self.low_watermark = low.clamp(0.0, self.high_watermark);
51        self
52    }
53
54    /// Updates the current fill level.
55    pub fn update_fill(&self, current: usize) {
56        self.current_fill.store(current as u64, Ordering::Relaxed);
57    }
58
59    /// Returns the current fill ratio (0.0 to 1.0+).
60    pub fn fill_ratio(&self) -> f64 {
61        self.current_fill.load(Ordering::Relaxed) as f64 / self.capacity as f64
62    }
63
64    /// Returns whether backpressure should be applied.
65    pub fn should_apply_backpressure(&self) -> bool {
66        self.fill_ratio() >= self.high_watermark
67    }
68
69    /// Returns whether the system has recovered from backpressure.
70    pub fn has_recovered(&self) -> bool {
71        self.fill_ratio() <= self.low_watermark
72    }
73
74    /// Records a backpressure event.
75    pub fn record_backpressure(&self) {
76        self.backpressure_events.fetch_add(1, Ordering::Relaxed);
77    }
78
79    /// Records dropped items.
80    pub fn record_dropped(&self, count: u64) {
81        self.items_dropped.fetch_add(count, Ordering::Relaxed);
82    }
83
84    /// Records blocked time.
85    pub fn record_blocked_time(&self, duration: Duration) {
86        self.blocked_time_ns
87            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
88    }
89
90    /// Returns backpressure statistics.
91    pub fn stats(&self) -> BackpressureStats {
92        BackpressureStats {
93            strategy: self.strategy,
94            fill_ratio: self.fill_ratio(),
95            items_dropped: self.items_dropped.load(Ordering::Relaxed),
96            blocked_time_ms: self.blocked_time_ns.load(Ordering::Relaxed) / 1_000_000,
97            backpressure_events: self.backpressure_events.load(Ordering::Relaxed),
98            is_under_pressure: self.should_apply_backpressure(),
99        }
100    }
101
102    /// Returns the configured strategy.
103    pub fn strategy(&self) -> BackpressureStrategy {
104        self.strategy
105    }
106}
107
108/// Statistics about backpressure handling.
109#[derive(Debug, Clone)]
110pub struct BackpressureStats {
111    /// Strategy in use.
112    pub strategy: BackpressureStrategy,
113    /// Current fill ratio.
114    pub fill_ratio: f64,
115    /// Total items dropped.
116    pub items_dropped: u64,
117    /// Total time spent blocked (milliseconds).
118    pub blocked_time_ms: u64,
119    /// Number of backpressure events.
120    pub backpressure_events: u64,
121    /// Currently under backpressure.
122    pub is_under_pressure: bool,
123}
124
/// Adaptive backpressure controller that adjusts generation rate.
///
/// The controller keeps a per-item delay and nudges it up or down with a
/// simple proportional rule, based on how far the observed fill ratio is from
/// a target. Adjustments are rate-limited to one per adjustment interval.
#[derive(Debug)]
pub struct AdaptiveBackpressure {
    /// Target fill ratio to maintain.
    target_fill: f64,
    /// Minimum delay between items (nanoseconds).
    min_delay_ns: u64,
    /// Maximum delay between items (nanoseconds).
    max_delay_ns: u64,
    /// Current delay.
    current_delay_ns: AtomicU64,
    /// Last adjustment time.
    last_adjustment: Mutex<Instant>,
    /// Adjustment interval.
    adjustment_interval: Duration,
}

impl AdaptiveBackpressure {
    /// Creates a new adaptive backpressure controller.
    ///
    /// Defaults: target fill `0.7`, delay bounds `0..=10ms`, no initial delay
    /// and at most one adjustment every 100 ms.
    pub fn new() -> Self {
        Self {
            target_fill: 0.7,
            min_delay_ns: 0,
            max_delay_ns: 10_000_000, // 10ms
            current_delay_ns: AtomicU64::new(0),
            last_adjustment: Mutex::new(Instant::now()),
            adjustment_interval: Duration::from_millis(100),
        }
    }

    /// Sets the target fill ratio.
    ///
    /// The value is clamped to `0.1..=0.9`. A NaN target is ignored, because
    /// it would turn every later error term into NaN.
    pub fn with_target_fill(mut self, target: f64) -> Self {
        if !target.is_nan() {
            self.target_fill = target.clamp(0.1, 0.9);
        }
        self
    }

    /// Sets the delay bounds.
    ///
    /// Bounds given in the wrong order are swapped, so the controller never
    /// holds `min > max` (which would make the clamp in
    /// [`adjust`](Self::adjust) panic). Durations longer than `u64::MAX`
    /// nanoseconds saturate.
    pub fn with_delay_bounds(mut self, min: Duration, max: Duration) -> Self {
        let (lower, upper) = if min <= max { (min, max) } else { (max, min) };
        self.min_delay_ns = Self::saturating_nanos(lower);
        self.max_delay_ns = Self::saturating_nanos(upper);
        self
    }

    /// Adjusts the delay based on current fill ratio.
    ///
    /// Does nothing when `current_fill` is NaN or when the previous
    /// adjustment happened less than one adjustment interval ago. Otherwise
    /// the delay grows while the fill is above the target, shrinks while it
    /// is below, and is finally clamped to the configured bounds.
    pub fn adjust(&self, current_fill: f64) {
        // A NaN reading carries no information; acting on it would collapse
        // the delay to the minimum.
        if current_fill.is_nan() {
            return;
        }

        {
            // The guarded value is a plain timestamp, so a poisoned lock is
            // still safe to reuse.
            let mut last_adj = self
                .last_adjustment
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            if last_adj.elapsed() < self.adjustment_interval {
                return;
            }
            *last_adj = Instant::now();
        }

        let current_delay = self.current_delay_ns.load(Ordering::Relaxed);
        let error = current_fill - self.target_fill;

        // Simple proportional control.
        let new_delay = if error > 0.0 {
            let proposed = if current_delay == 0 {
                // Scaling a zero delay keeps it at zero, so bootstrap from a
                // step of one tenth of the maximum delay (at least 1000 ns).
                let step = (self.max_delay_ns / 10).max(1000);
                (step as f64 * error * 2.0) as u64
            } else {
                (current_delay as f64 * (1.0 + error * 0.5)) as u64
            };
            // Float truncation can leave a tiny delay unchanged (1 ns * 1.1
            // is still 1 ns); always grow by at least one nanosecond while
            // the fill is above target.
            proposed.max(current_delay.saturating_add(1))
        } else {
            (current_delay as f64 * (1.0 + error * 0.5)) as u64
        };

        // Clamp to bounds
        let clamped_delay = new_delay.clamp(self.min_delay_ns, self.max_delay_ns);
        self.current_delay_ns
            .store(clamped_delay, Ordering::Relaxed);
    }

    /// Returns the current delay to apply.
    pub fn current_delay(&self) -> Duration {
        Duration::from_nanos(self.current_delay_ns.load(Ordering::Relaxed))
    }

    /// Resets the delay to minimum.
    pub fn reset(&self) {
        self.current_delay_ns
            .store(self.min_delay_ns, Ordering::Relaxed);
    }

    /// Converts a duration to nanoseconds, saturating at `u64::MAX` instead
    /// of silently truncating the 128-bit count.
    fn saturating_nanos(duration: Duration) -> u64 {
        duration.as_nanos().min(u128::from(u64::MAX)) as u64
    }
}

impl Default for AdaptiveBackpressure {
    /// Equivalent to [`AdaptiveBackpressure::new`].
    fn default() -> Self {
        Self::new()
    }
}

215/// A rate-aware producer that respects backpressure.
216pub struct BackpressureAwareProducer {
217    /// Backpressure monitor.
218    monitor: BackpressureMonitor,
219    /// Adaptive controller.
220    adaptive: Option<AdaptiveBackpressure>,
221    /// Current state.
222    state: BackpressureState,
223}
224
225/// State of the backpressure system.
226#[derive(Debug, Clone, Copy, PartialEq, Eq)]
227pub enum BackpressureState {
228    /// Normal operation.
229    Normal,
230    /// Slowing down due to high fill.
231    SlowingDown,
232    /// Fully blocked waiting for space.
233    Blocked,
234    /// Recovering from backpressure.
235    Recovering,
236}
237
238impl BackpressureAwareProducer {
239    /// Creates a new backpressure-aware producer.
240    pub fn new(strategy: BackpressureStrategy, capacity: usize) -> Self {
241        Self {
242            monitor: BackpressureMonitor::new(strategy, capacity),
243            adaptive: None,
244            state: BackpressureState::Normal,
245        }
246    }
247
248    /// Enables adaptive rate control.
249    pub fn with_adaptive(mut self) -> Self {
250        self.adaptive = Some(AdaptiveBackpressure::new());
251        self
252    }
253
254    /// Updates the fill level and adjusts state.
255    pub fn update(&mut self, fill_level: usize) {
256        self.monitor.update_fill(fill_level);
257        let ratio = self.monitor.fill_ratio();
258
259        // Update adaptive controller
260        if let Some(ref adaptive) = self.adaptive {
261            adaptive.adjust(ratio);
262        }
263
264        // Update state
265        self.state = if ratio >= 1.0 {
266            BackpressureState::Blocked
267        } else if self.monitor.should_apply_backpressure() {
268            if self.state == BackpressureState::Normal {
269                self.monitor.record_backpressure();
270            }
271            BackpressureState::SlowingDown
272        } else if self.monitor.has_recovered() {
273            BackpressureState::Normal
274        } else if self.state == BackpressureState::SlowingDown {
275            BackpressureState::Recovering
276        } else {
277            self.state
278        };
279    }
280
281    /// Returns the current state.
282    pub fn state(&self) -> BackpressureState {
283        self.state
284    }
285
286    /// Returns the recommended delay before the next send.
287    pub fn recommended_delay(&self) -> Duration {
288        match self.state {
289            BackpressureState::Normal => Duration::ZERO,
290            BackpressureState::SlowingDown | BackpressureState::Recovering => self
291                .adaptive
292                .as_ref()
293                .map(|a| a.current_delay())
294                .unwrap_or(Duration::from_micros(100)),
295            BackpressureState::Blocked => Duration::from_millis(1),
296        }
297    }
298
299    /// Records dropped items.
300    pub fn record_dropped(&self, count: u64) {
301        self.monitor.record_dropped(count);
302    }
303
304    /// Returns statistics.
305    pub fn stats(&self) -> BackpressureStats {
306        self.monitor.stats()
307    }
308}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// The monitor reports pressure at the high watermark and recovery at or
    /// below the low watermark (defaults: 0.8 and 0.5).
    #[test]
    fn test_backpressure_monitor() {
        let monitor = BackpressureMonitor::new(BackpressureStrategy::Block, 100);

        monitor.update_fill(50);
        assert!(!monitor.should_apply_backpressure());

        monitor.update_fill(80);
        assert!(monitor.should_apply_backpressure());

        monitor.update_fill(40);
        assert!(monitor.has_recovered());
    }

    /// Recorded counters show up in the statistics snapshot.
    #[test]
    fn test_backpressure_monitor_stats() {
        let monitor = BackpressureMonitor::new(BackpressureStrategy::DropOldest, 100);

        monitor.record_dropped(5);
        monitor.record_backpressure();
        monitor.record_blocked_time(Duration::from_millis(100));

        let stats = monitor.stats();
        assert_eq!(stats.items_dropped, 5);
        assert_eq!(stats.backpressure_events, 1);
        assert!(stats.blocked_time_ms >= 100);
    }

    /// A sustained fill above the target raises the adaptive delay.
    #[test]
    fn test_adaptive_backpressure() {
        let adaptive = AdaptiveBackpressure::new()
            .with_target_fill(0.5)
            .with_delay_bounds(Duration::ZERO, Duration::from_millis(100));

        assert_eq!(adaptive.current_delay(), Duration::ZERO);

        // Simulate high fill - should increase delay.
        // Each sleep is longer than the 100 ms adjustment interval so that
        // the following `adjust` call is not rate-limited away.
        for _ in 0..10 {
            adaptive.adjust(0.9);
            std::thread::sleep(Duration::from_millis(110));
        }

        // Delay should have increased
        assert!(adaptive.current_delay() > Duration::ZERO);
    }

    /// The producer moves Normal -> SlowingDown -> Normal as the fill level
    /// crosses the watermarks.
    #[test]
    fn test_backpressure_aware_producer() {
        let mut producer = BackpressureAwareProducer::new(BackpressureStrategy::Block, 100);

        producer.update(50);
        assert_eq!(producer.state(), BackpressureState::Normal);

        producer.update(85);
        assert_eq!(producer.state(), BackpressureState::SlowingDown);

        producer.update(40);
        assert_eq!(producer.state(), BackpressureState::Normal);
    }

    /// A completely full buffer puts the producer into the Blocked state.
    #[test]
    fn test_backpressure_state_blocked() {
        let mut producer = BackpressureAwareProducer::new(BackpressureStrategy::Block, 100);

        producer.update(100);
        assert_eq!(producer.state(), BackpressureState::Blocked);
    }
}