// oxigdal_streaming/core/backpressure.rs

//! Backpressure handling for stream processing.

use crate::error::{Result, StreamingError};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

/// Strategy for handling backpressure when the buffer reaches capacity.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackpressureStrategy {
    /// Block the producer until capacity is available.
    Block,

    /// Drop the oldest buffered elements to make room for new ones.
    DropOldest,

    /// Drop the newest (incoming) elements while the buffer is full.
    DropNewest,

    /// Fail the operation with an error when the buffer is full.
    Fail,

    /// Adaptive strategy based on the current load.
    Adaptive,
}

/// Configuration for backpressure management.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureConfig {
    /// Strategy to use when the buffer is saturated.
    pub strategy: BackpressureStrategy,

    /// High watermark as a fraction of capacity (0.0..=1.0); at or above
    /// this utilization, backpressure should be applied.
    pub high_watermark: f64,

    /// Low watermark as a fraction of capacity (0.0..=1.0); at or below
    /// this utilization, backpressure can be released.
    pub low_watermark: f64,

    /// Maximum acceptable average processing latency; used by the
    /// adaptive capacity adjustment as a grow/no-grow gate.
    pub max_latency: Duration,

    /// Minimum interval between metric snapshot refreshes.
    pub sample_window: Duration,

    /// Enable adaptive backpressure behavior.
    pub adaptive: bool,
}

51impl Default for BackpressureConfig {
52    fn default() -> Self {
53        Self {
54            strategy: BackpressureStrategy::Block,
55            high_watermark: 0.8,
56            low_watermark: 0.2,
57            max_latency: Duration::from_secs(1),
58            sample_window: Duration::from_secs(10),
59            adaptive: true,
60        }
61    }
62}
63
/// Point-in-time metrics snapshot for load monitoring.
#[derive(Debug, Clone)]
pub struct LoadMetrics {
    /// Buffer utilization at the time of the last snapshot (0.0 to 1.0).
    pub buffer_utilization: f64,

    /// Exponential moving average of processing latency.
    pub avg_latency: Duration,

    /// Largest latency observed so far.
    pub peak_latency: Duration,

    /// Throughput in elements per second, averaged since manager creation.
    pub throughput: f64,

    /// Cumulative number of dropped elements.
    pub dropped_elements: u64,

    /// Cumulative number of backpressure events (arrivals at a full buffer).
    pub backpressure_events: u64,
}

86impl Default for LoadMetrics {
87    fn default() -> Self {
88        Self {
89            buffer_utilization: 0.0,
90            avg_latency: Duration::ZERO,
91            peak_latency: Duration::ZERO,
92            throughput: 0.0,
93            dropped_elements: 0,
94            backpressure_events: 0,
95        }
96    }
97}
98
/// Manages backpressure for a stream.
pub struct BackpressureManager {
    // Static policy settings; read-only after construction.
    config: BackpressureConfig,
    // Cached metrics snapshot, refreshed at most once per sample window.
    metrics: Arc<RwLock<LoadMetrics>>,
    // Current capacity; atomic so adaptive adjustment can change it live.
    buffer_capacity: AtomicUsize,
    // Number of elements currently considered in-flight in the buffer.
    buffer_size: AtomicUsize,
    // Lifetime counter of completed elements (drives throughput).
    elements_processed: AtomicU64,
    // Lifetime counter of dropped elements.
    elements_dropped: AtomicU64,
    // Lifetime counter of full-buffer arrival events.
    backpressure_events: AtomicU64,
    // Instant of the last metrics snapshot refresh.
    last_sample: Arc<RwLock<Instant>>,
    // Construction time; baseline for lifetime throughput.
    sample_start: Instant,
}

112impl BackpressureManager {
113    /// Create a new backpressure manager.
114    pub fn new(config: BackpressureConfig, buffer_capacity: usize) -> Self {
115        Self {
116            config,
117            metrics: Arc::new(RwLock::new(LoadMetrics::default())),
118            buffer_capacity: AtomicUsize::new(buffer_capacity),
119            buffer_size: AtomicUsize::new(0),
120            elements_processed: AtomicU64::new(0),
121            elements_dropped: AtomicU64::new(0),
122            backpressure_events: AtomicU64::new(0),
123            last_sample: Arc::new(RwLock::new(Instant::now())),
124            sample_start: Instant::now(),
125        }
126    }
127
128    /// Check if backpressure should be applied.
129    pub async fn should_apply_backpressure(&self) -> bool {
130        let utilization = self.buffer_utilization();
131        utilization >= self.config.high_watermark
132    }
133
134    /// Check if backpressure can be released.
135    pub async fn can_release_backpressure(&self) -> bool {
136        let utilization = self.buffer_utilization();
137        utilization <= self.config.low_watermark
138    }
139
140    /// Handle a new element arrival.
141    pub async fn handle_element_arrival(&self) -> Result<bool> {
142        let current_size = self.buffer_size.load(Ordering::Relaxed);
143        let capacity = self.buffer_capacity.load(Ordering::Relaxed);
144
145        if current_size >= capacity {
146            self.backpressure_events.fetch_add(1, Ordering::Relaxed);
147
148            match self.config.strategy {
149                BackpressureStrategy::Block => {
150                    return Ok(false);
151                }
152                BackpressureStrategy::DropOldest | BackpressureStrategy::DropNewest => {
153                    self.elements_dropped.fetch_add(1, Ordering::Relaxed);
154                    return Ok(true);
155                }
156                BackpressureStrategy::Fail => {
157                    return Err(StreamingError::BufferFull);
158                }
159                BackpressureStrategy::Adaptive => {
160                    if self.should_apply_backpressure().await {
161                        return Ok(false);
162                    }
163                }
164            }
165        }
166
167        self.buffer_size.fetch_add(1, Ordering::Relaxed);
168        Ok(true)
169    }
170
171    /// Handle element processing completion.
172    pub async fn handle_element_processed(&self, latency: Duration) {
173        self.buffer_size.fetch_sub(1, Ordering::Relaxed);
174        self.elements_processed.fetch_add(1, Ordering::Relaxed);
175
176        // Update metrics
177        self.update_metrics(latency).await;
178    }
179
180    /// Update metrics based on current state.
181    async fn update_metrics(&self, latency: Duration) {
182        let now = Instant::now();
183        let last_sample = *self.last_sample.read().await;
184
185        if now.duration_since(last_sample) >= self.config.sample_window {
186            let mut metrics = self.metrics.write().await;
187            let mut last = self.last_sample.write().await;
188
189            metrics.buffer_utilization = self.buffer_utilization();
190            metrics.dropped_elements = self.elements_dropped.load(Ordering::Relaxed);
191            metrics.backpressure_events = self.backpressure_events.load(Ordering::Relaxed);
192
193            let elapsed = now.duration_since(self.sample_start).as_secs_f64();
194            let processed = self.elements_processed.load(Ordering::Relaxed);
195            metrics.throughput = processed as f64 / elapsed;
196
197            if latency > metrics.peak_latency {
198                metrics.peak_latency = latency;
199            }
200
201            // Simple moving average for latency
202            let alpha = 0.1;
203            let new_latency_secs = latency.as_secs_f64();
204            let old_latency_secs = metrics.avg_latency.as_secs_f64();
205            let avg_latency_secs = alpha * new_latency_secs + (1.0 - alpha) * old_latency_secs;
206            metrics.avg_latency = Duration::from_secs_f64(avg_latency_secs);
207
208            *last = now;
209        }
210    }
211
212    /// Get current buffer utilization.
213    fn buffer_utilization(&self) -> f64 {
214        let size = self.buffer_size.load(Ordering::Relaxed);
215        let capacity = self.buffer_capacity.load(Ordering::Relaxed);
216
217        if capacity == 0 {
218            0.0
219        } else {
220            size as f64 / capacity as f64
221        }
222    }
223
224    /// Get current metrics.
225    pub async fn metrics(&self) -> LoadMetrics {
226        self.metrics.read().await.clone()
227    }
228
229    /// Set buffer capacity.
230    pub fn set_capacity(&self, capacity: usize) {
231        self.buffer_capacity.store(capacity, Ordering::Relaxed);
232    }
233
234    /// Get buffer capacity.
235    pub fn capacity(&self) -> usize {
236        self.buffer_capacity.load(Ordering::Relaxed)
237    }
238
239    /// Get current buffer size.
240    pub fn size(&self) -> usize {
241        self.buffer_size.load(Ordering::Relaxed)
242    }
243
244    /// Reset metrics.
245    pub async fn reset_metrics(&self) {
246        let mut metrics = self.metrics.write().await;
247        *metrics = LoadMetrics::default();
248
249        self.elements_processed.store(0, Ordering::Relaxed);
250        self.elements_dropped.store(0, Ordering::Relaxed);
251        self.backpressure_events.store(0, Ordering::Relaxed);
252    }
253
254    /// Adaptive capacity adjustment based on load.
255    pub async fn adjust_capacity_adaptive(&self) {
256        // Use real-time buffer utilization instead of cached metrics
257        let utilization = self.buffer_utilization();
258        let metrics = self.metrics().await;
259
260        if utilization > self.config.high_watermark && metrics.avg_latency < self.config.max_latency
261        {
262            let current = self.buffer_capacity.load(Ordering::Relaxed);
263            let new_capacity = (current as f64 * 1.2) as usize;
264            self.buffer_capacity.store(new_capacity, Ordering::Relaxed);
265        } else if utilization < self.config.low_watermark {
266            let current = self.buffer_capacity.load(Ordering::Relaxed);
267            let new_capacity = ((current as f64 * 0.8) as usize).max(64);
268            self.buffer_capacity.store(new_capacity, Ordering::Relaxed);
269        }
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    /// Pushes `count` elements into `mgr`, failing the test on any error.
    async fn push_elements(mgr: &BackpressureManager, count: usize) {
        for _ in 0..count {
            mgr.handle_element_arrival()
                .await
                .expect("backpressure element arrival should succeed");
        }
    }

    #[tokio::test]
    async fn test_backpressure_manager_creation() {
        let mgr = BackpressureManager::new(BackpressureConfig::default(), 1000);

        assert_eq!(mgr.capacity(), 1000);
        assert_eq!(mgr.size(), 0);
    }

    #[tokio::test]
    async fn test_buffer_utilization() {
        let mgr = BackpressureManager::new(BackpressureConfig::default(), 100);

        assert_eq!(mgr.buffer_utilization(), 0.0);

        push_elements(&mgr, 50).await;

        // Half-full buffer reads back as ~0.5 utilization.
        assert!((mgr.buffer_utilization() - 0.5).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_backpressure_application() {
        let cfg = BackpressureConfig {
            high_watermark: 0.5,
            ..Default::default()
        };
        let mgr = BackpressureManager::new(cfg, 100);

        // 55/100 puts utilization above the 0.5 high watermark.
        push_elements(&mgr, 55).await;

        assert!(mgr.should_apply_backpressure().await);
    }

    #[tokio::test]
    async fn test_adaptive_capacity_adjustment() {
        let mgr = BackpressureManager::new(BackpressureConfig::default(), 100);
        let initial_capacity = mgr.capacity();

        // 95/100 exceeds the default 0.8 high watermark, so the adaptive
        // adjustment should grow the capacity.
        push_elements(&mgr, 95).await;

        mgr.adjust_capacity_adaptive().await;

        assert!(mgr.capacity() > initial_capacity);
    }
}