fluxus_core/error_handling/
backpressure.rs

1use std::time::Duration;
2
3/// Backpressure strategy for handling overload
4#[derive(Debug, Clone)]
5pub enum BackpressureStrategy {
6    /// Block when buffer is full
7    Block,
8    /// Drop oldest items when buffer is full
9    DropOldest,
10    /// Drop newest items when buffer is full
11    DropNewest,
12    /// Apply backpressure with custom threshold
13    Throttle {
14        high_watermark: usize,
15        low_watermark: usize,
16        backoff: Duration,
17    },
18}
19
20/// Backpressure controller for managing load
21pub struct BackpressureController {
22    strategy: BackpressureStrategy,
23    current_load: usize,
24}
25
26impl BackpressureController {
27    /// Create a new backpressure controller with the given strategy
28    pub fn new(strategy: BackpressureStrategy) -> Self {
29        Self {
30            strategy,
31            current_load: 0,
32        }
33    }
34
35    /// Check if we should apply backpressure
36    pub fn should_apply_backpressure(&self) -> bool {
37        match &self.strategy {
38            BackpressureStrategy::Block => self.current_load > 0,
39            BackpressureStrategy::DropOldest | BackpressureStrategy::DropNewest => false,
40            BackpressureStrategy::Throttle { high_watermark, .. } => {
41                self.current_load >= *high_watermark
42            }
43        }
44    }
45
46    /// Get the backoff duration if throttling is needed
47    pub fn get_backoff(&self) -> Option<Duration> {
48        match &self.strategy {
49            BackpressureStrategy::Throttle { backoff, .. } => Some(*backoff),
50            _ => None,
51        }
52    }
53
54    /// Update the current load
55    pub fn update_load(&mut self, load: usize) {
56        self.current_load = load;
57    }
58
59    /// Check if we can accept more items based on the strategy
60    pub fn can_accept(&self) -> bool {
61        !self.should_apply_backpressure()
62    }
63}