Skip to main content

oxirs_stream/
backpressure_controller.rs

1//! # Backpressure Controller
2//!
3//! Adaptive backpressure control for streaming pipelines. Supports multiple
4//! strategies: Drop, Block, Throttle (token-bucket), and SpillToDisk.
5//!
6//! ## Strategies
7//!
8//! - **Drop**: Discard incoming items when the queue is above the high watermark.
9//! - **Block**: Signal the caller to block (returns `ThrottleDelay(u64::MAX)`).
10//! - **Throttle**: Token-bucket rate limiter — items accepted only when a token
11//!   is available; delay returned otherwise.
12//! - **SpillToDisk**: Like Drop but caller is expected to write to `path`; this
13//!   module records the event for stats purposes only (I/O is caller's concern).
14
15use serde::{Deserialize, Serialize};
16
17// ─────────────────────────────────────────────
18// Strategy
19// ─────────────────────────────────────────────
20
21/// How the controller responds when the queue exceeds the high-watermark.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub enum BackpressureStrategy {
24    /// Silently discard the incoming item.
25    Drop,
26    /// Tell the caller to block indefinitely.
27    Block,
28    /// Token-bucket throttle: accept at most `rate_hz` items per second.
29    Throttle {
30        /// Target accept rate in items per second.
31        rate_hz: f64,
32    },
33    /// Spill overflowing items to disk at the given path.
34    SpillToDisk {
35        /// Filesystem path for the spill file.
36        path: String,
37    },
38}
39
40// ─────────────────────────────────────────────
41// Config
42// ─────────────────────────────────────────────
43
44/// Configuration for a [`BackpressureController`].
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct BackpressureConfig {
47    /// Strategy applied when the queue exceeds `high_watermark`.
48    pub strategy: BackpressureStrategy,
49    /// Queue depth at which backpressure is activated.
50    pub high_watermark: usize,
51    /// Queue depth at which backpressure is deactivated.
52    pub low_watermark: usize,
53    /// Rolling window length in milliseconds (reserved for future analytics).
54    pub window_ms: u64,
55}
56
57impl Default for BackpressureConfig {
58    fn default() -> Self {
59        Self {
60            strategy: BackpressureStrategy::Drop,
61            high_watermark: 1000,
62            low_watermark: 500,
63            window_ms: 1000,
64        }
65    }
66}
67
68// ─────────────────────────────────────────────
69// Stats
70// ─────────────────────────────────────────────
71
72/// Cumulative statistics tracked by a [`BackpressureController`].
73#[derive(Debug, Clone, Default, Serialize, Deserialize)]
74pub struct BackpressureStats {
75    /// Total items dropped (Drop / SpillToDisk strategy).
76    pub items_dropped: u64,
77    /// Total items for which a throttle delay was returned.
78    pub items_throttled: u64,
79    /// Current number of items in the logical queue.
80    pub current_queue_depth: usize,
81    /// Maximum queue depth observed since the last reset.
82    pub peak_queue_depth: usize,
83    /// Number of times backpressure was triggered.
84    pub backpressure_events: u64,
85}
86
87// ─────────────────────────────────────────────
88// Decision
89// ─────────────────────────────────────────────
90
/// Decision returned by [`BackpressureController::try_accept`].
///
/// This is a small plain-value type, so it is `Copy` and fully comparable
/// (`Eq`); callers can store, compare and pass it around freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureDecision {
    /// The item is accepted; the caller may enqueue it.
    Accept,
    /// The item should be dropped (or spilled by the caller when the
    /// SpillToDisk strategy is configured).
    Drop,
    /// The caller should wait this many milliseconds before retrying.
    /// `u64::MAX` means "block indefinitely": it is returned by the Block
    /// strategy and by a Throttle strategy whose rate is not positive.
    ThrottleDelay(u64),
}
101
102// ─────────────────────────────────────────────
103// Controller
104// ─────────────────────────────────────────────
105
106/// Adaptive backpressure controller implementing multiple mitigation strategies.
107#[derive(Debug)]
108pub struct BackpressureController {
109    config: BackpressureConfig,
110    /// Logical queue depth (incremented on accept, decremented on dequeue).
111    queue_depth: usize,
112    stats: BackpressureStats,
113    /// Token bucket level (only meaningful for Throttle strategy).
114    throttle_tokens: f64,
115    /// Timestamp of the last token replenishment (ms since epoch / monotonic).
116    last_tick_ms: u64,
117    /// Whether we are currently in backpressure state.
118    in_backpressure: bool,
119}
120
121impl BackpressureController {
122    /// Create a new controller with the supplied configuration.
123    pub fn new(config: BackpressureConfig) -> Self {
124        let initial_tokens = if let BackpressureStrategy::Throttle { rate_hz } = config.strategy {
125            rate_hz.max(0.0)
126        } else {
127            0.0
128        };
129        Self {
130            config,
131            queue_depth: 0,
132            stats: BackpressureStats::default(),
133            throttle_tokens: initial_tokens,
134            last_tick_ms: 0,
135            in_backpressure: false,
136        }
137    }
138
139    /// Attempt to accept one item at `now_ms` (monotonic milliseconds).
140    ///
141    /// Updates internal state and returns the decision the caller should act on.
142    pub fn try_accept(&mut self, now_ms: u64) -> BackpressureDecision {
143        // Replenish tokens regardless of watermark level.
144        self.replenish_tokens(now_ms);
145
146        let above_high = self.is_above_high_watermark();
147        let was_in_backpressure = self.in_backpressure;
148
149        if above_high && !was_in_backpressure {
150            self.in_backpressure = true;
151            self.stats.backpressure_events += 1;
152        } else if self.is_below_low_watermark() {
153            self.in_backpressure = false;
154        }
155
156        if !self.in_backpressure {
157            // Normal path — accept unconditionally.
158            self.queue_depth += 1;
159            if self.queue_depth > self.stats.peak_queue_depth {
160                self.stats.peak_queue_depth = self.queue_depth;
161            }
162            self.stats.current_queue_depth = self.queue_depth;
163            return BackpressureDecision::Accept;
164        }
165
166        // Backpressure path — apply strategy.
167        match &self.config.strategy {
168            BackpressureStrategy::Drop => {
169                self.stats.items_dropped += 1;
170                BackpressureDecision::Drop
171            }
172            BackpressureStrategy::Block => {
173                // Signal the caller to block.
174                BackpressureDecision::ThrottleDelay(u64::MAX)
175            }
176            BackpressureStrategy::Throttle { rate_hz } => {
177                if self.throttle_tokens >= 1.0 {
178                    self.throttle_tokens -= 1.0;
179                    self.queue_depth += 1;
180                    if self.queue_depth > self.stats.peak_queue_depth {
181                        self.stats.peak_queue_depth = self.queue_depth;
182                    }
183                    self.stats.current_queue_depth = self.queue_depth;
184                    BackpressureDecision::Accept
185                } else {
186                    // Compute how many ms until the next token is available.
187                    let delay_ms = if *rate_hz > 0.0 {
188                        ((1.0 - self.throttle_tokens) / rate_hz * 1000.0).ceil() as u64
189                    } else {
190                        u64::MAX
191                    };
192                    self.stats.items_throttled += 1;
193                    BackpressureDecision::ThrottleDelay(delay_ms)
194                }
195            }
196            BackpressureStrategy::SpillToDisk { .. } => {
197                // Caller is responsible for actual I/O; we record it as dropped.
198                self.stats.items_dropped += 1;
199                BackpressureDecision::Drop
200            }
201        }
202    }
203
204    /// Record that one item was consumed from the queue.
205    pub fn record_dequeue(&mut self) {
206        self.queue_depth = self.queue_depth.saturating_sub(1);
207        self.stats.current_queue_depth = self.queue_depth;
208    }
209
210    /// Return a reference to the current statistics.
211    pub fn stats(&self) -> &BackpressureStats {
212        &self.stats
213    }
214
215    /// Reset all counters (peak_queue_depth, items_dropped, …) without
216    /// changing the queue depth or backpressure state.
217    pub fn reset_stats(&mut self) {
218        let current = self.queue_depth;
219        self.stats = BackpressureStats {
220            current_queue_depth: current,
221            peak_queue_depth: current,
222            ..Default::default()
223        };
224    }
225
226    /// Return `true` when the current depth is at or above the high watermark.
227    pub fn is_above_high_watermark(&self) -> bool {
228        self.queue_depth >= self.config.high_watermark
229    }
230
231    /// Return `true` when the current depth is at or below the low watermark.
232    pub fn is_below_low_watermark(&self) -> bool {
233        self.queue_depth <= self.config.low_watermark
234    }
235
236    /// Replenish the token bucket based on elapsed time since the last tick.
237    ///
238    /// Only effective when the strategy is `Throttle`.
239    pub fn replenish_tokens(&mut self, now_ms: u64) {
240        if let BackpressureStrategy::Throttle { rate_hz } = self.config.strategy {
241            if self.last_tick_ms == 0 {
242                self.last_tick_ms = now_ms;
243                return;
244            }
245            let elapsed_ms = now_ms.saturating_sub(self.last_tick_ms);
246            let new_tokens = rate_hz * (elapsed_ms as f64 / 1000.0);
247            self.throttle_tokens = (self.throttle_tokens + new_tokens).min(rate_hz);
248            self.last_tick_ms = now_ms;
249        }
250    }
251
252    /// Return the current logical queue depth.
253    pub fn current_depth(&self) -> usize {
254        self.queue_depth
255    }
256}
257
258// ─────────────────────────────────────────────
259// Tests
260// ─────────────────────────────────────────────
261
#[cfg(test)]
mod tests {
    //! Unit tests for the backpressure controller.
    //!
    //! Reminder on timing: `try_accept` evaluates the watermarks *before*
    //! admitting the item, so the accept that brings the depth up to the high
    //! watermark still succeeds; backpressure switches on at the next call.

    use super::*;

    /// Config using the Drop strategy with the given watermarks.
    fn drop_config(high: usize, low: usize) -> BackpressureConfig {
        BackpressureConfig {
            strategy: BackpressureStrategy::Drop,
            high_watermark: high,
            low_watermark: low,
            window_ms: 1000,
        }
    }

    /// Config using the Throttle strategy with the given rate and watermarks.
    fn throttle_config(rate_hz: f64, high: usize, low: usize) -> BackpressureConfig {
        BackpressureConfig {
            strategy: BackpressureStrategy::Throttle { rate_hz },
            high_watermark: high,
            low_watermark: low,
            window_ms: 1000,
        }
    }

    // ── construction ────────────────────────────────────────────────────

    #[test]
    fn test_new_default_config() {
        let ctrl = BackpressureController::new(BackpressureConfig::default());
        assert_eq!(ctrl.current_depth(), 0);
        assert_eq!(ctrl.stats().items_dropped, 0);
        assert!(!ctrl.is_above_high_watermark());
        assert!(ctrl.is_below_low_watermark());
    }

    #[test]
    fn test_new_throttle_sets_initial_tokens() {
        let ctrl = BackpressureController::new(throttle_config(100.0, 50, 25));
        assert!((ctrl.throttle_tokens - 100.0).abs() < 1e-9);
    }

    // ── try_accept under high watermark ─────────────────────────────────

    #[test]
    fn test_accept_below_high_watermark() {
        let mut ctrl = BackpressureController::new(drop_config(10, 5));
        for i in 0..9 {
            let decision = ctrl.try_accept(i as u64 * 10);
            assert_eq!(decision, BackpressureDecision::Accept, "step {i}");
        }
        assert_eq!(ctrl.current_depth(), 9);
    }

    // ── drop strategy ───────────────────────────────────────────────────

    #[test]
    fn test_drop_at_high_watermark() {
        let mut ctrl = BackpressureController::new(drop_config(3, 1));
        ctrl.try_accept(0);
        ctrl.try_accept(1);
        ctrl.try_accept(2); // depth = 3 → at the high watermark
        let decision = ctrl.try_accept(3); // bp switches on here
        assert_eq!(decision, BackpressureDecision::Drop);
        assert_eq!(ctrl.stats().items_dropped, 1);
        assert_eq!(ctrl.current_depth(), 3);
    }

    #[test]
    fn test_drop_increments_counter() {
        let mut ctrl = BackpressureController::new(drop_config(2, 1));
        ctrl.try_accept(0);
        ctrl.try_accept(1); // depth = 2 → at the high watermark
        ctrl.try_accept(2); // dropped
        ctrl.try_accept(3); // dropped
        assert_eq!(ctrl.stats().items_dropped, 2);
    }

    #[test]
    fn test_backpressure_event_counted() {
        let mut ctrl = BackpressureController::new(drop_config(2, 1));
        ctrl.try_accept(0);
        ctrl.try_accept(1);
        ctrl.try_accept(2); // triggers bp event
        ctrl.try_accept(3); // still in bp, no new event
        assert_eq!(ctrl.stats().backpressure_events, 1);
    }

    // ── record_dequeue and recovery ──────────────────────────────────────

    #[test]
    fn test_dequeue_decrements_depth() {
        let mut ctrl = BackpressureController::new(drop_config(10, 2));
        ctrl.try_accept(0);
        ctrl.try_accept(1);
        ctrl.try_accept(2);
        ctrl.record_dequeue();
        assert_eq!(ctrl.current_depth(), 2);
        assert_eq!(ctrl.stats().current_queue_depth, 2);
    }

    #[test]
    fn test_dequeue_saturates_at_zero() {
        let mut ctrl = BackpressureController::new(drop_config(10, 2));
        ctrl.record_dequeue(); // depth is already 0
        assert_eq!(ctrl.current_depth(), 0);
    }

    #[test]
    fn test_recovery_after_dequeue() {
        let mut ctrl = BackpressureController::new(drop_config(3, 1));
        // Fill to the high watermark, then actually enter backpressure.
        ctrl.try_accept(0);
        ctrl.try_accept(1);
        ctrl.try_accept(2); // depth = 3
        assert_eq!(ctrl.try_accept(3), BackpressureDecision::Drop);

        // Between the watermarks backpressure must stay on (hysteresis).
        ctrl.record_dequeue(); // 2
        assert_eq!(ctrl.try_accept(50), BackpressureDecision::Drop);

        // At the low watermark backpressure is released.
        ctrl.record_dequeue(); // 1
        let decision = ctrl.try_accept(100);
        assert_eq!(decision, BackpressureDecision::Accept);
        assert_eq!(ctrl.current_depth(), 2);
    }

    // ── peak tracking ────────────────────────────────────────────────────

    #[test]
    fn test_peak_depth_tracked() {
        let mut ctrl = BackpressureController::new(drop_config(20, 5));
        for i in 0..10u64 {
            ctrl.try_accept(i);
        }
        for _ in 0..5 {
            ctrl.record_dequeue();
        }
        assert_eq!(ctrl.stats().peak_queue_depth, 10);
        assert_eq!(ctrl.current_depth(), 5);
    }

    // ── reset_stats ─────────────────────────────────────────────────────

    #[test]
    fn test_reset_stats_clears_counters() {
        let mut ctrl = BackpressureController::new(drop_config(2, 1));
        ctrl.try_accept(0);
        ctrl.try_accept(1);
        ctrl.try_accept(2); // drops start
        ctrl.try_accept(3);
        ctrl.reset_stats();
        assert_eq!(ctrl.stats().items_dropped, 0);
        assert_eq!(ctrl.stats().backpressure_events, 0);
        assert_eq!(ctrl.stats().items_throttled, 0);
        // Depth gauges restart from the live depth.
        assert_eq!(ctrl.stats().current_queue_depth, ctrl.current_depth());
        assert_eq!(ctrl.stats().peak_queue_depth, ctrl.current_depth());
    }

    #[test]
    fn test_reset_stats_preserves_depth() {
        let mut ctrl = BackpressureController::new(drop_config(10, 2));
        ctrl.try_accept(0);
        ctrl.try_accept(1);
        ctrl.try_accept(2);
        ctrl.reset_stats();
        assert_eq!(ctrl.current_depth(), 3);
    }

    // ── watermark predicates ─────────────────────────────────────────────

    #[test]
    fn test_above_high_watermark() {
        let mut ctrl = BackpressureController::new(drop_config(5, 2));
        for i in 0..5u64 {
            ctrl.try_accept(i);
        }
        assert!(ctrl.is_above_high_watermark());
    }

    #[test]
    fn test_below_low_watermark() {
        let ctrl = BackpressureController::new(drop_config(5, 2));
        assert!(ctrl.is_below_low_watermark());
    }

    #[test]
    fn test_between_watermarks() {
        let mut ctrl = BackpressureController::new(drop_config(10, 2));
        for i in 0..5u64 {
            ctrl.try_accept(i);
        }
        assert!(!ctrl.is_above_high_watermark());
        assert!(!ctrl.is_below_low_watermark());
    }

    // ── throttle strategy ────────────────────────────────────────────────

    #[test]
    fn test_throttle_accept_when_tokens_available() {
        let mut ctrl = BackpressureController::new(throttle_config(10.0, 2, 1));
        // Below the watermark the item is accepted without touching the bucket.
        let d = ctrl.try_accept(0);
        assert_eq!(d, BackpressureDecision::Accept);
        assert!((ctrl.throttle_tokens - 10.0).abs() < 1e-9);
    }

    #[test]
    fn test_throttle_delay_when_no_tokens() {
        let mut ctrl = BackpressureController::new(throttle_config(1.0, 2, 1));
        // Fill to the high watermark: two plain accepts (depth 1, 2).
        ctrl.try_accept(0);
        ctrl.try_accept(0);

        // Backpressure switches on; this accept spends the only token.
        assert_eq!(ctrl.try_accept(0), BackpressureDecision::Accept);
        // The bucket is empty — a full token takes 1000 ms at 1 Hz.
        let decision = ctrl.try_accept(0);
        assert_eq!(decision, BackpressureDecision::ThrottleDelay(1000));
        assert_eq!(ctrl.stats().items_throttled, 1);
    }

    #[test]
    fn test_throttle_replenish_over_time() {
        // Use a non-zero base timestamp so the result does not depend on how
        // the controller treats t = 0.
        let t0 = 1;
        let mut ctrl = BackpressureController::new(throttle_config(2.0, 2, 1));
        ctrl.try_accept(t0); // depth = 1
        ctrl.try_accept(t0); // depth = 2 → at the high watermark

        // Backpressure: the two initial tokens are spent one by one.
        assert_eq!(ctrl.try_accept(t0), BackpressureDecision::Accept);
        assert_eq!(ctrl.try_accept(t0), BackpressureDecision::Accept);
        // Empty bucket: the next token is due in 500 ms at 2 Hz.
        assert_eq!(ctrl.try_accept(t0), BackpressureDecision::ThrottleDelay(500));

        // After exactly that delay one token has been earned.
        let decision = ctrl.try_accept(t0 + 500);
        assert_eq!(decision, BackpressureDecision::Accept);
        assert_eq!(ctrl.current_depth(), 5);
    }

    #[test]
    fn test_replenish_tokens_noop_without_throttle() {
        let mut ctrl = BackpressureController::new(drop_config(10, 2));
        let tokens_before = ctrl.throttle_tokens;
        ctrl.replenish_tokens(5000);
        assert!((ctrl.throttle_tokens - tokens_before).abs() < 1e-9);
    }

    #[test]
    fn test_replenish_tokens_first_tick() {
        let mut ctrl = BackpressureController::new(throttle_config(10.0, 10, 5));
        ctrl.replenish_tokens(1000); // records the tick, must not overfill
        assert_eq!(ctrl.last_tick_ms, 1000);
        assert!(ctrl.throttle_tokens <= 10.0 + 1e-9);
    }

    #[test]
    fn test_replenish_tokens_capped_at_rate_hz() {
        let mut ctrl = BackpressureController::new(throttle_config(5.0, 10, 5));
        ctrl.last_tick_ms = 1;
        ctrl.replenish_tokens(100_000); // huge elapsed time
        assert!(ctrl.throttle_tokens <= 5.0 + 1e-9);
    }

    // ── block strategy ───────────────────────────────────────────────────

    #[test]
    fn test_block_strategy_returns_max_delay() {
        let config = BackpressureConfig {
            strategy: BackpressureStrategy::Block,
            high_watermark: 2,
            low_watermark: 1,
            window_ms: 100,
        };
        let mut ctrl = BackpressureController::new(config);
        ctrl.try_accept(0); // depth = 1
        ctrl.try_accept(0); // depth = 2 → at the high watermark
        let d = ctrl.try_accept(0); // bp switches on
        assert_eq!(d, BackpressureDecision::ThrottleDelay(u64::MAX));
    }

    // ── spill-to-disk strategy ───────────────────────────────────────────

    #[test]
    fn test_spill_to_disk_records_as_dropped() {
        let config = BackpressureConfig {
            strategy: BackpressureStrategy::SpillToDisk {
                path: "/tmp/spill.bin".to_string(),
            },
            high_watermark: 2,
            low_watermark: 1,
            window_ms: 100,
        };
        let mut ctrl = BackpressureController::new(config);
        ctrl.try_accept(0);
        ctrl.try_accept(0); // depth = 2 → at the high watermark
        let d = ctrl.try_accept(0); // bp switches on
        assert_eq!(d, BackpressureDecision::Drop);
        assert_eq!(ctrl.stats().items_dropped, 1);
    }

    // ── current_depth ────────────────────────────────────────────────────

    #[test]
    fn test_current_depth_tracks_accept_and_dequeue() {
        let mut ctrl = BackpressureController::new(drop_config(100, 50));
        assert_eq!(ctrl.current_depth(), 0);
        ctrl.try_accept(0);
        ctrl.try_accept(1);
        assert_eq!(ctrl.current_depth(), 2);
        ctrl.record_dequeue();
        assert_eq!(ctrl.current_depth(), 1);
    }

    // ── edge cases ───────────────────────────────────────────────────────

    #[test]
    fn test_zero_rate_throttle_returns_max_delay() {
        let mut ctrl = BackpressureController::new(throttle_config(0.0, 2, 1));
        ctrl.try_accept(0); // depth = 1
        ctrl.try_accept(0); // depth = 2 → at the high watermark

        // A zero rate never earns a token, so every call under backpressure
        // is told to wait forever.
        let first = ctrl.try_accept(0);
        assert_eq!(first, BackpressureDecision::ThrottleDelay(u64::MAX));
        let d = ctrl.try_accept(0);
        assert_eq!(d, BackpressureDecision::ThrottleDelay(u64::MAX));
        assert_eq!(ctrl.stats().items_throttled, 2);
    }

    #[test]
    fn test_high_watermark_equals_one() {
        let mut ctrl = BackpressureController::new(drop_config(1, 0));
        let d = ctrl.try_accept(0); // depth becomes 1 → at high watermark
        assert_eq!(d, BackpressureDecision::Accept);
        // bp switches on at this call because the depth is already at the limit
        let d2 = ctrl.try_accept(0);
        assert_eq!(d2, BackpressureDecision::Drop);
    }

    #[test]
    fn test_stats_ref_is_consistent() {
        let mut ctrl = BackpressureController::new(drop_config(5, 2));
        ctrl.try_accept(0);
        ctrl.try_accept(1);
        let s = ctrl.stats();
        assert_eq!(s.current_queue_depth, 2);
        assert_eq!(s.peak_queue_depth, 2);
        assert_eq!(s.items_dropped, 0);
    }

    #[test]
    fn test_multiple_backpressure_cycles() {
        // high_watermark=3: bp activates when depth is already 3 (on the 4th call)
        let mut ctrl = BackpressureController::new(drop_config(3, 1));
        // First cycle: fill to high watermark then trigger bp
        ctrl.try_accept(0); // depth=1
        ctrl.try_accept(0); // depth=2
        ctrl.try_accept(0); // depth=3 (normal accept, no bp yet)
        ctrl.try_accept(0); // depth stays 3 → bp on, event_count=1 → Drop

        // Drain down to the low watermark (1)
        ctrl.record_dequeue(); // 2
        ctrl.record_dequeue(); // 1 — bp is released at the next try_accept

        // Second cycle
        ctrl.try_accept(0); // depth=2
        ctrl.try_accept(0); // depth=3 (normal accept)
        ctrl.try_accept(0); // at the limit → bp on again, event_count=2 → Drop
        assert_eq!(ctrl.stats().backpressure_events, 2);
        assert_eq!(ctrl.stats().items_dropped, 2);
    }
}
618}