oxirs_stream/
backpressure.rs

1//! Backpressure and Flow Control
2//!
3//! This module provides sophisticated backpressure handling for stream processing:
4//! - Dynamic rate limiting based on system load
5//! - Buffer management with overflow strategies
6//! - Flow control signals
7//! - Adaptive throttling
8//! - Queue depth monitoring
9//!
10//! Uses SciRS2 for adaptive algorithm tuning
11
12use anyhow::{anyhow, Result};
13use chrono::{DateTime, Duration as ChronoDuration, Utc};
14use serde::{Deserialize, Serialize};
15use std::collections::VecDeque;
16use std::sync::Arc;
17use tokio::sync::{Mutex, Semaphore};
18use tracing::{debug, warn};
19
20// Use scirs2-core for adaptive algorithms (reserved for future use)
21
/// Backpressure strategy
///
/// Selects how `BackpressureController::offer` behaves when the internal
/// buffer has reached `max_buffer_size`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackpressureStrategy {
    /// Drop oldest events when buffer is full
    DropOldest,
    /// Drop newest events when buffer is full
    DropNewest,
    /// Block until space is available
    Block,
    /// Exponential backoff with retries
    ExponentialBackoff {
        /// Delay before the first retry, in milliseconds
        initial_delay_ms: u64,
        /// Upper bound on the retry delay, in milliseconds
        max_delay_ms: u64,
        /// Factor the delay is multiplied by after each failed retry
        multiplier: f64,
    },
    /// Adaptive throttling based on throughput
    Adaptive {
        /// Desired throughput in events per second
        target_throughput: f64,
        /// Scales the throttle delay applied when throughput exceeds the target
        adjustment_factor: f64,
    },
}
43
/// Flow control signal
///
/// Derived from buffer utilization relative to the configured water marks;
/// producers are expected to poll `BackpressureController::flow_control_signal`
/// and adjust their send rate accordingly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowControlSignal {
    /// System is healthy, proceed normally
    Proceed,
    /// System is under pressure, slow down
    SlowDown,
    /// System is overloaded, stop sending
    Stop,
}
54
/// Backpressure configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureConfig {
    /// Maximum buffer size, in events
    pub max_buffer_size: usize,
    /// Backpressure strategy applied when the buffer is full
    pub strategy: BackpressureStrategy,
    /// High water mark (fraction of buffer, 0.0..=1.0); at or above this
    /// utilization the flow control signal becomes `Stop`
    pub high_water_mark: f64,
    /// Low water mark (fraction of buffer, 0.0..=1.0); at or above this
    /// utilization (but below the high water mark) the signal is `SlowDown`
    pub low_water_mark: f64,
    /// Enable adaptive throttling
    /// NOTE(review): not read by the code visible in this module — the
    /// `Adaptive` strategy is selected via `strategy` instead; confirm intent
    pub enable_adaptive: bool,
    /// Measurement window used for throughput estimation
    pub measurement_window: ChronoDuration,
}
71
72impl Default for BackpressureConfig {
73    fn default() -> Self {
74        Self {
75            max_buffer_size: 10000,
76            strategy: BackpressureStrategy::Block,
77            high_water_mark: 0.8,
78            low_water_mark: 0.2,
79            enable_adaptive: true,
80            measurement_window: ChronoDuration::seconds(10),
81        }
82    }
83}
84
/// Backpressure controller statistics
#[derive(Debug, Clone, Default)]
pub struct BackpressureStats {
    /// Total events offered to the controller
    pub events_received: u64,
    /// Events successfully handed to a consumer via `poll`
    pub events_processed: u64,
    /// Events discarded (buffer full, or backoff retries exhausted)
    pub events_dropped: u64,
    /// Backoff waits taken while the buffer was full
    pub events_blocked: u64,
    /// Current number of buffered events
    pub buffer_size: usize,
    /// `buffer_size / max_buffer_size`
    pub buffer_utilization: f64,
    /// Events per second over the measurement window
    pub current_throughput: f64,
    /// Number of flow-control transitions into a non-`Proceed` signal
    pub backpressure_events: u64,
    /// Exponential moving average (alpha = 0.1) of enqueue-to-poll latency
    pub avg_latency_ms: f64,
}
98
/// Type alias for timestamped buffer elements
///
/// Each entry pairs an event with its enqueue time, used to compute latency.
type TimestampedBuffer<T> = Arc<Mutex<VecDeque<(T, DateTime<Utc>)>>>;

/// Type alias for throughput history
///
/// Each entry is a (sample time, event count) pair within the window.
type ThroughputHistory = Arc<Mutex<VecDeque<(DateTime<Utc>, u64)>>>;
104
/// Backpressure controller
///
/// Generic over the buffered event type `T`. Producers call `offer`, a
/// consumer calls `poll`, and either side can observe `flow_control_signal`.
pub struct BackpressureController<T> {
    /// Static configuration (strategy, buffer size, water marks)
    config: BackpressureConfig,
    /// FIFO of (event, enqueue time)
    buffer: TimestampedBuffer<T>,
    /// Shared counters, updated on offer/poll
    stats: Arc<Mutex<BackpressureStats>>,
    /// Latest signal derived from buffer utilization
    flow_control: Arc<Mutex<FlowControlSignal>>,
    /// Gates producers under the `Block` strategy; sized to `max_buffer_size`
    semaphore: Arc<Semaphore>,
    /// Recent (timestamp, count) samples used to estimate throughput
    throughput_history: ThroughputHistory,
}
114
115impl<T: Clone + Send> BackpressureController<T> {
116    /// Create a new backpressure controller
117    pub fn new(config: BackpressureConfig) -> Self {
118        let max_permits = config.max_buffer_size;
119
120        Self {
121            config,
122            buffer: Arc::new(Mutex::new(VecDeque::new())),
123            stats: Arc::new(Mutex::new(BackpressureStats::default())),
124            flow_control: Arc::new(Mutex::new(FlowControlSignal::Proceed)),
125            semaphore: Arc::new(Semaphore::new(max_permits)),
126            throughput_history: Arc::new(Mutex::new(VecDeque::new())),
127        }
128    }
129
130    /// Offer an event to the controller
131    pub async fn offer(&self, event: T) -> Result<()> {
132        let mut stats = self.stats.lock().await;
133        stats.events_received += 1;
134        drop(stats);
135
136        match &self.config.strategy {
137            BackpressureStrategy::DropOldest => self.offer_drop_oldest(event).await,
138            BackpressureStrategy::DropNewest => self.offer_drop_newest(event).await,
139            BackpressureStrategy::Block => self.offer_blocking(event).await,
140            BackpressureStrategy::ExponentialBackoff {
141                initial_delay_ms,
142                max_delay_ms,
143                multiplier,
144            } => {
145                self.offer_with_backoff(event, *initial_delay_ms, *max_delay_ms, *multiplier)
146                    .await
147            }
148            BackpressureStrategy::Adaptive {
149                target_throughput,
150                adjustment_factor,
151            } => {
152                self.offer_adaptive(event, *target_throughput, *adjustment_factor)
153                    .await
154            }
155        }
156    }
157
158    /// Offer with drop oldest strategy
159    async fn offer_drop_oldest(&self, event: T) -> Result<()> {
160        let mut buffer = self.buffer.lock().await;
161
162        if buffer.len() >= self.config.max_buffer_size {
163            // Drop oldest
164            buffer.pop_front();
165
166            let mut stats = self.stats.lock().await;
167            stats.events_dropped += 1;
168            drop(stats);
169
170            warn!("Buffer full, dropped oldest event");
171        }
172
173        buffer.push_back((event, Utc::now()));
174        self.update_flow_control(buffer.len()).await;
175
176        Ok(())
177    }
178
179    /// Offer with drop newest strategy
180    async fn offer_drop_newest(&self, event: T) -> Result<()> {
181        let mut buffer = self.buffer.lock().await;
182
183        if buffer.len() >= self.config.max_buffer_size {
184            let mut stats = self.stats.lock().await;
185            stats.events_dropped += 1;
186            drop(stats);
187
188            warn!("Buffer full, dropped newest event");
189            return Ok(());
190        }
191
192        buffer.push_back((event, Utc::now()));
193        self.update_flow_control(buffer.len()).await;
194
195        Ok(())
196    }
197
198    /// Offer with blocking strategy
199    async fn offer_blocking(&self, event: T) -> Result<()> {
200        // Acquire semaphore permit
201        let _permit = self
202            .semaphore
203            .acquire()
204            .await
205            .map_err(|e| anyhow!("Failed to acquire semaphore: {}", e))?;
206
207        let mut buffer = self.buffer.lock().await;
208        buffer.push_back((event, Utc::now()));
209
210        let buffer_size = buffer.len();
211        drop(buffer);
212
213        self.update_flow_control(buffer_size).await;
214
215        Ok(())
216    }
217
218    /// Offer with exponential backoff
219    async fn offer_with_backoff(
220        &self,
221        event: T,
222        initial_delay_ms: u64,
223        max_delay_ms: u64,
224        multiplier: f64,
225    ) -> Result<()> {
226        let mut delay_ms = initial_delay_ms;
227        let mut retries = 0;
228        const MAX_RETRIES: u32 = 10;
229
230        loop {
231            let buffer = self.buffer.lock().await;
232            let buffer_size = buffer.len();
233            drop(buffer);
234
235            if buffer_size < self.config.max_buffer_size {
236                let mut buffer = self.buffer.lock().await;
237                buffer.push_back((event, Utc::now()));
238                drop(buffer);
239
240                self.update_flow_control(buffer_size + 1).await;
241                return Ok(());
242            }
243
244            if retries >= MAX_RETRIES {
245                let mut stats = self.stats.lock().await;
246                stats.events_dropped += 1;
247                return Err(anyhow!("Max retries exceeded, dropping event"));
248            }
249
250            // Exponential backoff
251            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
252
253            delay_ms = ((delay_ms as f64 * multiplier) as u64).min(max_delay_ms);
254            retries += 1;
255
256            let mut stats = self.stats.lock().await;
257            stats.events_blocked += 1;
258            drop(stats);
259        }
260    }
261
262    /// Offer with adaptive throttling using SciRS2
263    async fn offer_adaptive(
264        &self,
265        event: T,
266        target_throughput: f64,
267        adjustment_factor: f64,
268    ) -> Result<()> {
269        // Measure current throughput
270        let current_throughput = self.measure_throughput().await;
271
272        // Adaptive delay based on throughput
273        if current_throughput > target_throughput {
274            let delay_ms =
275                ((current_throughput / target_throughput - 1.0) * adjustment_factor) as u64;
276            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
277        }
278
279        // Add to buffer
280        let mut buffer = self.buffer.lock().await;
281
282        if buffer.len() >= self.config.max_buffer_size {
283            let mut stats = self.stats.lock().await;
284            stats.events_dropped += 1;
285            drop(stats);
286
287            return Err(anyhow!("Buffer full even with adaptive throttling"));
288        }
289
290        buffer.push_back((event, Utc::now()));
291        let buffer_size = buffer.len();
292        drop(buffer);
293
294        self.update_flow_control(buffer_size).await;
295
296        Ok(())
297    }
298
299    /// Poll an event from the controller
300    pub async fn poll(&self) -> Result<Option<T>> {
301        let mut buffer = self.buffer.lock().await;
302
303        if let Some((event, timestamp)) = buffer.pop_front() {
304            let buffer_size = buffer.len();
305            drop(buffer);
306
307            // Release semaphore permit
308            self.semaphore.add_permits(1);
309
310            // Update stats
311            let mut stats = self.stats.lock().await;
312            stats.events_processed += 1;
313
314            let latency = (Utc::now() - timestamp).num_milliseconds() as f64;
315            let alpha = 0.1;
316            stats.avg_latency_ms = alpha * latency + (1.0 - alpha) * stats.avg_latency_ms;
317
318            drop(stats);
319
320            self.update_flow_control(buffer_size).await;
321            self.record_throughput().await;
322
323            Ok(Some(event))
324        } else {
325            Ok(None)
326        }
327    }
328
329    /// Update flow control signal
330    async fn update_flow_control(&self, buffer_size: usize) {
331        let utilization = buffer_size as f64 / self.config.max_buffer_size as f64;
332
333        let signal = if utilization >= self.config.high_water_mark {
334            FlowControlSignal::Stop
335        } else if utilization >= self.config.low_water_mark {
336            FlowControlSignal::SlowDown
337        } else {
338            FlowControlSignal::Proceed
339        };
340
341        let mut flow_control = self.flow_control.lock().await;
342        if *flow_control != signal {
343            debug!(
344                "Flow control signal changed: {:?} -> {:?}",
345                *flow_control, signal
346            );
347
348            if signal != FlowControlSignal::Proceed {
349                let mut stats = self.stats.lock().await;
350                stats.backpressure_events += 1;
351            }
352        }
353        *flow_control = signal;
354
355        // Update stats
356        let mut stats = self.stats.lock().await;
357        stats.buffer_size = buffer_size;
358        stats.buffer_utilization = utilization;
359    }
360
361    /// Record throughput measurement
362    async fn record_throughput(&self) {
363        let now = Utc::now();
364        let mut history = self.throughput_history.lock().await;
365
366        history.push_back((now, 1));
367
368        // Clean old measurements
369        let window_start = now - self.config.measurement_window;
370        while let Some((timestamp, _)) = history.front() {
371            if *timestamp < window_start {
372                history.pop_front();
373            } else {
374                break;
375            }
376        }
377    }
378
379    /// Measure current throughput
380    async fn measure_throughput(&self) -> f64 {
381        let now = Utc::now();
382        let history = self.throughput_history.lock().await;
383
384        if history.is_empty() {
385            return 0.0;
386        }
387
388        let window_start = now - self.config.measurement_window;
389        let count: u64 = history
390            .iter()
391            .filter(|(timestamp, _)| *timestamp >= window_start)
392            .map(|(_, count)| count)
393            .sum();
394
395        let elapsed_seconds = self.config.measurement_window.num_seconds() as f64;
396        count as f64 / elapsed_seconds
397    }
398
399    /// Get current flow control signal
400    pub async fn flow_control_signal(&self) -> FlowControlSignal {
401        *self.flow_control.lock().await
402    }
403
404    /// Get statistics
405    pub async fn stats(&self) -> BackpressureStats {
406        let stats = self.stats.lock().await;
407        let mut result = stats.clone();
408
409        // Update current throughput
410        drop(stats);
411        result.current_throughput = self.measure_throughput().await;
412
413        result
414    }
415
416    /// Get buffer size
417    pub async fn buffer_size(&self) -> usize {
418        self.buffer.lock().await.len()
419    }
420
421    /// Clear buffer
422    pub async fn clear(&self) {
423        let mut buffer = self.buffer.lock().await;
424        let cleared_count = buffer.len();
425        buffer.clear();
426
427        // Release all permits
428        self.semaphore.add_permits(cleared_count);
429
430        let mut stats = self.stats.lock().await;
431        stats.buffer_size = 0;
432        stats.buffer_utilization = 0.0;
433    }
434}
435
/// Rate limiter with token bucket algorithm
pub struct RateLimiter {
    /// Current token balance (fractional tokens accumulate between refills)
    tokens: Arc<Mutex<f64>>,
    /// Bucket capacity; the balance is capped at this value
    max_tokens: f64,
    /// Refill rate, in tokens per second
    refill_rate: f64,
    /// Time of the last refill, used to compute newly earned tokens
    last_refill: Arc<Mutex<DateTime<Utc>>>,
}
443
444impl RateLimiter {
445    /// Create a new rate limiter
446    pub fn new(max_tokens: f64, refill_rate: f64) -> Self {
447        Self {
448            tokens: Arc::new(Mutex::new(max_tokens)),
449            max_tokens,
450            refill_rate,
451            last_refill: Arc::new(Mutex::new(Utc::now())),
452        }
453    }
454
455    /// Try to acquire a token
456    pub async fn try_acquire(&self) -> bool {
457        self.refill_tokens().await;
458
459        let mut tokens = self.tokens.lock().await;
460        if *tokens >= 1.0 {
461            *tokens -= 1.0;
462            true
463        } else {
464            false
465        }
466    }
467
468    /// Acquire a token (blocking)
469    pub async fn acquire(&self) -> Result<()> {
470        loop {
471            if self.try_acquire().await {
472                return Ok(());
473            }
474
475            // Wait for refill
476            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
477        }
478    }
479
480    /// Refill tokens based on elapsed time
481    async fn refill_tokens(&self) {
482        let now = Utc::now();
483        let mut last_refill = self.last_refill.lock().await;
484
485        let elapsed = (now - *last_refill).num_milliseconds() as f64 / 1000.0;
486        let new_tokens = elapsed * self.refill_rate;
487
488        if new_tokens > 0.0 {
489            let mut tokens = self.tokens.lock().await;
490            *tokens = (*tokens + new_tokens).min(self.max_tokens);
491            *last_refill = now;
492        }
493    }
494
495    /// Get current token count
496    pub async fn available_tokens(&self) -> f64 {
497        self.refill_tokens().await;
498        *self.tokens.lock().await
499    }
500}
501
#[cfg(test)]
mod tests {
    use super::*;

    /// DropOldest: overflowing the buffer evicts entries from the front.
    #[tokio::test]
    async fn test_backpressure_drop_oldest() {
        let config = BackpressureConfig {
            max_buffer_size: 3,
            strategy: BackpressureStrategy::DropOldest,
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Fill buffer past capacity (5 offers into a 3-slot buffer)
        for i in 0..5 {
            controller.offer(i).await.unwrap();
        }

        // Should have dropped 2 oldest (0, 1)
        assert_eq!(controller.buffer_size().await, 3);

        // Poll should return 2 (oldest after drops)
        let event = controller.poll().await.unwrap().unwrap();
        assert_eq!(event, 2);
    }

    /// DropNewest: overflowing the buffer discards the incoming events.
    #[tokio::test]
    async fn test_backpressure_drop_newest() {
        let config = BackpressureConfig {
            max_buffer_size: 3,
            strategy: BackpressureStrategy::DropNewest,
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Fill buffer past capacity
        for i in 0..5 {
            controller.offer(i).await.unwrap();
        }

        // Should have kept first 3, dropped 3 and 4
        assert_eq!(controller.buffer_size().await, 3);

        let event = controller.poll().await.unwrap().unwrap();
        assert_eq!(event, 0);
    }

    /// Flow control transitions Proceed -> SlowDown -> Stop as utilization
    /// crosses the 0.2 and 0.8 water marks.
    #[tokio::test]
    async fn test_flow_control_signals() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            high_water_mark: 0.8,
            low_water_mark: 0.2,
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Empty buffer: low utilization, signal stays Proceed
        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::Proceed
        );

        // Fill to 30% utilization (above the 20% low water mark)
        for i in 0..30 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::SlowDown
        );

        // Fill to 85% utilization (above the 80% high water mark)
        for i in 30..85 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::Stop
        );
    }

    /// Token bucket: capacity is exhausted after max_tokens acquisitions and
    /// replenished by waiting. NOTE: timing-sensitive — 200 ms at 10 tokens/s
    /// earns ~2 tokens, comfortably above the 1 needed.
    #[tokio::test]
    async fn test_rate_limiter() {
        let limiter = RateLimiter::new(10.0, 10.0); // 10 tokens, 10/second refill

        // Should be able to acquire 10 tokens
        for _ in 0..10 {
            assert!(limiter.try_acquire().await);
        }

        // 11th should fail
        assert!(!limiter.try_acquire().await);

        // Wait for refill
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        // Should be able to acquire again
        assert!(limiter.try_acquire().await);
    }
}