// oxirs_stream/backpressure.rs

//! Backpressure and Flow Control
//!
//! This module provides sophisticated backpressure handling for stream processing:
//! - Dynamic rate limiting based on system load
//! - Buffer management with overflow strategies
//! - Flow control signals
//! - Adaptive throttling
//! - Queue depth monitoring
//! - Circuit breaker pattern for fault tolerance
//! - Graceful degradation strategies
//!
//! Uses SciRS2 metrics for comprehensive monitoring
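//!
//! # Example
//!
//! A minimal usage sketch. The `oxirs_stream::backpressure` import path is
//! assumed from this file's location, and the `String` event type is only for
//! illustration:
//!
//! ```ignore
//! use oxirs_stream::backpressure::{BackpressureConfig, BackpressureController};
//!
//! # async fn demo() -> anyhow::Result<()> {
//! // Default configuration: Block strategy, 10_000-event buffer.
//! let controller = BackpressureController::new(BackpressureConfig::default());
//!
//! // Producer side: offer events; errors signal backpressure decisions.
//! controller.offer("event-1".to_string()).await?;
//!
//! // Consumer side: poll events off the internal buffer.
//! if let Some(event) = controller.poll().await? {
//!     println!("processed {event}");
//! }
//! # Ok(())
//! # }
//! ```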

use anyhow::{anyhow, Result};
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use scirs2_core::metrics::{Counter, Gauge, Histogram};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, Semaphore};
use tracing::{debug, info, warn};

type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;

/// Circuit breaker state for backpressure control
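///
/// State transitions as implemented below: `Closed` moves to `Open` after
/// `failure_threshold` consecutive failures; `Open` moves to `HalfOpen` once
/// `timeout` has elapsed since the last failure; `HalfOpen` moves back to
/// `Closed` after `success_threshold` successes, or re-opens on a further
/// failure.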
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum CircuitState {
    /// Circuit is closed, normal operation
    #[default]
    Closed,
    /// Circuit is open, rejecting requests
    Open,
    /// Circuit is half-open, testing recovery
    HalfOpen,
}

/// Graceful degradation strategy
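///
/// Strategies can be nested via `Combined`; a small sketch with illustrative
/// values:
///
/// ```ignore
/// let strategy = DegradationStrategy::Combined(vec![
///     DegradationStrategy::ReduceThroughput { reduction_percent: 25.0 },
///     DegradationStrategy::Sampling { sample_rate: 0.5 },
/// ]);
/// ```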
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DegradationStrategy {
    /// Reduce throughput by percentage
    ReduceThroughput { reduction_percent: f64 },
    /// Skip non-critical operations
    SkipNonCritical,
    /// Increase buffer size temporarily
    ExpandBuffer { factor: f64 },
    /// Sample events probabilistically (keep roughly a `sample_rate` fraction)
    Sampling { sample_rate: f64 },
    /// Combined strategies
    Combined(Vec<DegradationStrategy>),
}

/// Backpressure strategy
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackpressureStrategy {
    /// Drop oldest events when buffer is full
    DropOldest,
    /// Drop newest events when buffer is full
    DropNewest,
    /// Block until space is available
    Block,
    /// Exponential backoff with retries
    ExponentialBackoff {
        initial_delay_ms: u64,
        max_delay_ms: u64,
        multiplier: f64,
    },
    /// Adaptive throttling based on throughput
    Adaptive {
        target_throughput: f64,
        adjustment_factor: f64,
    },
}

/// Flow control signal
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowControlSignal {
    /// System is healthy, proceed normally
    Proceed,
    /// System is under pressure, slow down
    SlowDown,
    /// System is overloaded, stop sending
    Stop,
}

/// Circuit breaker configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    /// Enable circuit breaker
    pub enabled: bool,
    /// Failure threshold to open circuit
    pub failure_threshold: u32,
    /// Success threshold to close circuit
    pub success_threshold: u32,
    /// Timeout before transitioning to half-open
    pub timeout: Duration,
    /// Maximum calls in half-open state
    pub half_open_max_calls: u32,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            failure_threshold: 5,
            success_threshold: 3,
            timeout: Duration::from_secs(30),
            half_open_max_calls: 3,
        }
    }
}

/// Backpressure configuration
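///
/// Construction sketch using struct-update syntax over the defaults below
/// (field values are illustrative, not recommendations):
///
/// ```ignore
/// let config = BackpressureConfig {
///     max_buffer_size: 5_000,
///     strategy: BackpressureStrategy::DropOldest,
///     high_water_mark: 0.8,
///     low_water_mark: 0.2,
///     ..Default::default()
/// };
/// ```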
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureConfig {
    /// Maximum buffer size
    pub max_buffer_size: usize,
    /// Backpressure strategy
    pub strategy: BackpressureStrategy,
    /// High water mark (percentage of buffer)
    pub high_water_mark: f64,
    /// Low water mark (percentage of buffer)
    pub low_water_mark: f64,
    /// Enable adaptive throttling
    pub enable_adaptive: bool,
    /// Measurement window for throughput
    pub measurement_window: ChronoDuration,
    /// Circuit breaker configuration
    pub circuit_breaker: CircuitBreakerConfig,
    /// Degradation strategy when under pressure
    pub degradation: DegradationStrategy,
}

impl Default for BackpressureConfig {
    fn default() -> Self {
        Self {
            max_buffer_size: 10000,
            strategy: BackpressureStrategy::Block,
            high_water_mark: 0.8,
            low_water_mark: 0.2,
            enable_adaptive: true,
            measurement_window: ChronoDuration::seconds(10),
            circuit_breaker: CircuitBreakerConfig::default(),
            degradation: DegradationStrategy::ReduceThroughput {
                reduction_percent: 50.0,
            },
        }
    }
}

/// Backpressure controller statistics
#[derive(Debug, Clone, Default)]
pub struct BackpressureStats {
    pub events_received: u64,
    pub events_processed: u64,
    pub events_dropped: u64,
    pub events_blocked: u64,
    pub buffer_size: usize,
    pub buffer_utilization: f64,
    pub current_throughput: f64,
    pub backpressure_events: u64,
    pub avg_latency_ms: f64,
    pub circuit_state: CircuitState,
    pub circuit_failures: u32,
    pub circuit_successes: u32,
    pub degradation_active: bool,
}

/// Type alias for timestamped buffer elements
type TimestampedBuffer<T> = Arc<Mutex<VecDeque<(T, DateTime<Utc>)>>>;

/// Type alias for throughput history
type ThroughputHistory = Arc<Mutex<VecDeque<(DateTime<Utc>, u64)>>>;

/// Exported metrics snapshot
#[derive(Debug, Clone)]
pub struct BackpressureMetrics {
    pub events_received: u64,
    pub events_processed: u64,
    pub events_dropped: u64,
    pub queue_depth: f64,
    pub latency_stats: scirs2_core::metrics::HistogramStats,
    pub backpressure_events: u64,
    pub circuit_state_changes: u64,
}

/// Backpressure controller
pub struct BackpressureController<T> {
    config: BackpressureConfig,
    buffer: TimestampedBuffer<T>,
    stats: Arc<Mutex<BackpressureStats>>,
    flow_control: Arc<Mutex<FlowControlSignal>>,
    semaphore: Arc<Semaphore>,
    throughput_history: ThroughputHistory,
    // Circuit breaker state
    circuit_state: Arc<Mutex<CircuitState>>,
    circuit_failures: Arc<Mutex<u32>>,
    circuit_successes: Arc<Mutex<u32>>,
    circuit_last_failure: Arc<Mutex<Option<Instant>>>,
    circuit_half_open_calls: Arc<Mutex<u32>>,
    // SciRS2 metrics
    metrics_events_received: Arc<Counter>,
    metrics_events_processed: Arc<Counter>,
    metrics_events_dropped: Arc<Counter>,
    metrics_queue_depth: Arc<Gauge>,
    metrics_latency: Arc<Histogram>,
    metrics_backpressure_events: Arc<Counter>,
    metrics_circuit_state_changes: Arc<Counter>,
}

impl<T: Clone + Send> BackpressureController<T> {
    /// Create a new backpressure controller
    pub fn new(config: BackpressureConfig) -> Self {
        let max_permits = config.max_buffer_size;

        // Initialize SciRS2 metrics
        let metrics_events_received =
            Arc::new(Counter::new("backpressure_events_received".to_string()));
        let metrics_events_processed =
            Arc::new(Counter::new("backpressure_events_processed".to_string()));
        let metrics_events_dropped =
            Arc::new(Counter::new("backpressure_events_dropped".to_string()));
        let metrics_queue_depth = Arc::new(Gauge::new("backpressure_queue_depth".to_string()));
        let metrics_latency = Arc::new(Histogram::new("backpressure_latency_seconds".to_string()));
        let metrics_backpressure_events =
            Arc::new(Counter::new("backpressure_events_total".to_string()));
        let metrics_circuit_state_changes = Arc::new(Counter::new(
            "backpressure_circuit_state_changes".to_string(),
        ));

        Self {
            config,
            buffer: Arc::new(Mutex::new(VecDeque::new())),
            stats: Arc::new(Mutex::new(BackpressureStats::default())),
            flow_control: Arc::new(Mutex::new(FlowControlSignal::Proceed)),
            semaphore: Arc::new(Semaphore::new(max_permits)),
            throughput_history: Arc::new(Mutex::new(VecDeque::new())),
            circuit_state: Arc::new(Mutex::new(CircuitState::Closed)),
            circuit_failures: Arc::new(Mutex::new(0)),
            circuit_successes: Arc::new(Mutex::new(0)),
            circuit_last_failure: Arc::new(Mutex::new(None)),
            circuit_half_open_calls: Arc::new(Mutex::new(0)),
            metrics_events_received,
            metrics_events_processed,
            metrics_events_dropped,
            metrics_queue_depth,
            metrics_latency,
            metrics_backpressure_events,
            metrics_circuit_state_changes,
        }
    }

    /// Get metrics for export
    pub fn get_metrics(&self) -> BackpressureMetrics {
        BackpressureMetrics {
            events_received: self.metrics_events_received.get(),
            events_processed: self.metrics_events_processed.get(),
            events_dropped: self.metrics_events_dropped.get(),
            queue_depth: self.metrics_queue_depth.get(),
            latency_stats: self.metrics_latency.get_stats(),
            backpressure_events: self.metrics_backpressure_events.get(),
            circuit_state_changes: self.metrics_circuit_state_changes.get(),
        }
    }

    /// Check circuit breaker state and handle transitions
    async fn check_circuit_state(&self) -> Result<bool> {
        if !self.config.circuit_breaker.enabled {
            return Ok(true); // Circuit breaker disabled, always allow
        }

        let mut state = self.circuit_state.lock().await;
        let circuit_config = &self.config.circuit_breaker;

        match *state {
            CircuitState::Closed => Ok(true),
            CircuitState::Open => {
                let last_failure = self.circuit_last_failure.lock().await;
                if let Some(last_fail_time) = *last_failure {
                    if last_fail_time.elapsed() >= circuit_config.timeout {
                        // Transition to HalfOpen
                        *state = CircuitState::HalfOpen;
                        *self.circuit_half_open_calls.lock().await = 0;
                        self.metrics_circuit_state_changes.inc();
                        info!("Circuit breaker transitioned to HalfOpen");
                        Ok(true)
                    } else {
                        Ok(false) // Still open, reject request
                    }
                } else {
                    Ok(false)
                }
            }
            CircuitState::HalfOpen => {
                let mut half_open_calls = self.circuit_half_open_calls.lock().await;
                if *half_open_calls < circuit_config.half_open_max_calls {
                    *half_open_calls += 1;
                    Ok(true)
                } else {
                    Ok(false) // Too many calls in half-open state
                }
            }
        }
    }

    /// Record success for circuit breaker
    async fn record_circuit_success(&self) {
        if !self.config.circuit_breaker.enabled {
            return;
        }

        let mut state = self.circuit_state.lock().await;
        let circuit_config = &self.config.circuit_breaker;

        match *state {
            CircuitState::HalfOpen => {
                let mut successes = self.circuit_successes.lock().await;
                *successes += 1;
                if *successes >= circuit_config.success_threshold {
                    // Transition to Closed
                    *state = CircuitState::Closed;
                    *self.circuit_failures.lock().await = 0;
                    *successes = 0; // Reset using existing lock instead of acquiring again
                    self.metrics_circuit_state_changes.inc();
                    info!("Circuit breaker transitioned to Closed");
                }
            }
            CircuitState::Closed => {
                // Reset failure count on success
                *self.circuit_failures.lock().await = 0;
            }
            CircuitState::Open => {
                // Should not happen, but reset if it does
                *state = CircuitState::Closed;
                *self.circuit_failures.lock().await = 0;
                self.metrics_circuit_state_changes.inc();
            }
        }
    }

    /// Record failure for circuit breaker
    async fn record_circuit_failure(&self) {
        if !self.config.circuit_breaker.enabled {
            return;
        }

        let mut state = self.circuit_state.lock().await;
        let circuit_config = &self.config.circuit_breaker;
        let mut failures = self.circuit_failures.lock().await;

        *failures += 1;
        *self.circuit_last_failure.lock().await = Some(Instant::now());

        if *failures >= circuit_config.failure_threshold && *state != CircuitState::Open {
            // Transition to Open
            *state = CircuitState::Open;
            *self.circuit_successes.lock().await = 0;
            self.metrics_circuit_state_changes.inc();
            warn!(
                "Circuit breaker transitioned to Open after {} failures",
                failures
            );
        }
    }

    /// Apply graceful degradation strategy
    async fn apply_degradation(&self, _event: &T) -> Result<bool> {
        let stats = self.stats.lock().await;
        let utilization = stats.buffer_utilization;
        drop(stats);

        // Only apply degradation when utilization is high
        if utilization < self.config.high_water_mark {
            return Ok(true); // No degradation needed
        }

        self.apply_degradation_strategy(&self.config.degradation)
            .await
    }

    /// Helper method to apply a specific degradation strategy
    fn apply_degradation_strategy<'a>(
        &'a self,
        strategy: &'a DegradationStrategy,
    ) -> BoxFuture<'a, Result<bool>> {
        Box::pin(async move {
            match strategy {
                DegradationStrategy::ReduceThroughput { reduction_percent } => {
                    // Randomly drop events based on reduction percentage
                    let threshold = 1.0 - (reduction_percent / 100.0);
                    Ok(fastrand::f64() < threshold)
                }
                DegradationStrategy::SkipNonCritical => {
                    // For now, accept all events (would need priority info)
                    Ok(true)
                }
                DegradationStrategy::ExpandBuffer { factor } => {
                    // Temporarily allow buffer to grow (check against expanded size)
                    let expanded_size = (self.config.max_buffer_size as f64 * factor) as usize;
                    let buffer = self.buffer.lock().await;
                    Ok(buffer.len() < expanded_size)
                }
                DegradationStrategy::Sampling { sample_rate } => {
                    // Keep events based on sample rate
                    Ok(fastrand::f64() < *sample_rate)
                }
                DegradationStrategy::Combined(strategies) => {
                    // Apply all strategies and accept only if all pass
                    for strat in strategies {
                        if !self.apply_degradation_strategy(strat).await? {
                            return Ok(false);
                        }
                    }
                    Ok(true)
                }
            }
        })
    }

    /// Offer an event to the controller
    pub async fn offer(&self, event: T) -> Result<()> {
        // Update metrics
        self.metrics_events_received.inc();
        let mut stats = self.stats.lock().await;
        stats.events_received += 1;
        drop(stats);

        // Check circuit breaker
        if !self.check_circuit_state().await? {
            self.metrics_events_dropped.inc();
            return Err(anyhow!("Circuit breaker is open"));
        }

        // Apply graceful degradation
        if !self.apply_degradation(&event).await? {
            self.metrics_events_dropped.inc();
            let mut stats = self.stats.lock().await;
            stats.events_dropped += 1;
            stats.degradation_active = true;
            return Err(anyhow!("Event dropped due to graceful degradation"));
        }

        // Process event based on strategy
        let result = match &self.config.strategy {
            BackpressureStrategy::DropOldest => self.offer_drop_oldest(event).await,
            BackpressureStrategy::DropNewest => self.offer_drop_newest(event).await,
            BackpressureStrategy::Block => self.offer_blocking(event).await,
            BackpressureStrategy::ExponentialBackoff {
                initial_delay_ms,
                max_delay_ms,
                multiplier,
            } => {
                self.offer_with_backoff(event, *initial_delay_ms, *max_delay_ms, *multiplier)
                    .await
            }
            BackpressureStrategy::Adaptive {
                target_throughput,
                adjustment_factor,
            } => {
                self.offer_adaptive(event, *target_throughput, *adjustment_factor)
                    .await
            }
        };

        // Update circuit breaker state based on result
        match &result {
            Ok(_) => self.record_circuit_success().await,
            Err(_) => self.record_circuit_failure().await,
        }

        result
    }

    /// Offer with drop oldest strategy
    async fn offer_drop_oldest(&self, event: T) -> Result<()> {
        let mut buffer = self.buffer.lock().await;

        if buffer.len() >= self.config.max_buffer_size {
            // Drop oldest
            buffer.pop_front();

            self.metrics_events_dropped.inc();
            let mut stats = self.stats.lock().await;
            stats.events_dropped += 1;
            drop(stats);

            warn!("Buffer full, dropped oldest event");
        }

        buffer.push_back((event, Utc::now()));
        let buffer_len = buffer.len();
        self.metrics_queue_depth.set(buffer_len as f64);
        drop(buffer);

        self.update_flow_control(buffer_len).await;

        Ok(())
    }

    /// Offer with drop newest strategy
    async fn offer_drop_newest(&self, event: T) -> Result<()> {
        let mut buffer = self.buffer.lock().await;

        if buffer.len() >= self.config.max_buffer_size {
            self.metrics_events_dropped.inc();
            let mut stats = self.stats.lock().await;
            stats.events_dropped += 1;
            drop(stats);

            warn!("Buffer full, dropped newest event");
            return Ok(());
        }

        buffer.push_back((event, Utc::now()));
        let buffer_len = buffer.len();
        self.metrics_queue_depth.set(buffer_len as f64);
        drop(buffer);

        self.update_flow_control(buffer_len).await;

        Ok(())
    }

    /// Offer with blocking strategy
    async fn offer_blocking(&self, event: T) -> Result<()> {
        // Acquire a semaphore permit and forget it so buffer capacity is only
        // restored when the event is polled (see `poll`); otherwise the permit
        // would be released as soon as this function returned and the Block
        // strategy would never actually block.
        let permit = self
            .semaphore
            .acquire()
            .await
            .map_err(|e| anyhow!("Failed to acquire semaphore: {}", e))?;
        permit.forget();

        let mut buffer = self.buffer.lock().await;
        buffer.push_back((event, Utc::now()));

        let buffer_size = buffer.len();
        drop(buffer);

        self.update_flow_control(buffer_size).await;

        Ok(())
    }

    /// Offer with exponential backoff
    async fn offer_with_backoff(
        &self,
        event: T,
        initial_delay_ms: u64,
        max_delay_ms: u64,
        multiplier: f64,
    ) -> Result<()> {
        let mut delay_ms = initial_delay_ms;
        let mut retries = 0;
        const MAX_RETRIES: u32 = 10;

        loop {
            let buffer = self.buffer.lock().await;
            let buffer_size = buffer.len();
            drop(buffer);

            if buffer_size < self.config.max_buffer_size {
                let mut buffer = self.buffer.lock().await;
                buffer.push_back((event, Utc::now()));
                drop(buffer);

                self.update_flow_control(buffer_size + 1).await;
                return Ok(());
            }

            if retries >= MAX_RETRIES {
                let mut stats = self.stats.lock().await;
                stats.events_dropped += 1;
                return Err(anyhow!("Max retries exceeded, dropping event"));
            }

            // Exponential backoff
            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;

            delay_ms = ((delay_ms as f64 * multiplier) as u64).min(max_delay_ms);
            retries += 1;

            let mut stats = self.stats.lock().await;
            stats.events_blocked += 1;
            drop(stats);
        }
    }

    /// Offer with adaptive throttling using SciRS2
    async fn offer_adaptive(
        &self,
        event: T,
        target_throughput: f64,
        adjustment_factor: f64,
    ) -> Result<()> {
        // Measure current throughput
        let current_throughput = self.measure_throughput().await;

        // Adaptive delay based on throughput
        if current_throughput > target_throughput {
            let delay_ms =
                ((current_throughput / target_throughput - 1.0) * adjustment_factor) as u64;
            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
        }

        // Add to buffer
        let mut buffer = self.buffer.lock().await;

        if buffer.len() >= self.config.max_buffer_size {
            let mut stats = self.stats.lock().await;
            stats.events_dropped += 1;
            drop(stats);

            return Err(anyhow!("Buffer full even with adaptive throttling"));
        }

        buffer.push_back((event, Utc::now()));
        let buffer_size = buffer.len();
        drop(buffer);

        self.update_flow_control(buffer_size).await;

        Ok(())
    }

    /// Poll an event from the controller
    pub async fn poll(&self) -> Result<Option<T>> {
        let mut buffer = self.buffer.lock().await;

        if let Some((event, timestamp)) = buffer.pop_front() {
            let buffer_size = buffer.len();
            self.metrics_queue_depth.set(buffer_size as f64);
            drop(buffer);

            // Release semaphore permit
            self.semaphore.add_permits(1);

            // Calculate and record latency
            let latency = (Utc::now() - timestamp).num_milliseconds() as f64;
            self.metrics_latency.observe(latency / 1000.0); // Convert to seconds

            // Update metrics
            self.metrics_events_processed.inc();

            // Update stats
            let mut stats = self.stats.lock().await;
            stats.events_processed += 1;

            let alpha = 0.1;
            stats.avg_latency_ms = alpha * latency + (1.0 - alpha) * stats.avg_latency_ms;

            drop(stats);

            self.update_flow_control(buffer_size).await;
            self.record_throughput().await;

            Ok(Some(event))
        } else {
            Ok(None)
        }
    }

    /// Update flow control signal
    async fn update_flow_control(&self, buffer_size: usize) {
        let utilization = buffer_size as f64 / self.config.max_buffer_size as f64;

        let signal = if utilization >= self.config.high_water_mark {
            FlowControlSignal::Stop
        } else if utilization >= self.config.low_water_mark {
            FlowControlSignal::SlowDown
        } else {
            FlowControlSignal::Proceed
        };

        let mut flow_control = self.flow_control.lock().await;
        if *flow_control != signal {
            debug!(
                "Flow control signal changed: {:?} -> {:?}",
                *flow_control, signal
            );

            if signal != FlowControlSignal::Proceed {
                self.metrics_backpressure_events.inc();
                let mut stats = self.stats.lock().await;
                stats.backpressure_events += 1;
            }
        }
        *flow_control = signal;

        // Update stats
        let mut stats = self.stats.lock().await;
        stats.buffer_size = buffer_size;
        stats.buffer_utilization = utilization;
    }

    /// Record throughput measurement
    async fn record_throughput(&self) {
        let now = Utc::now();
        let mut history = self.throughput_history.lock().await;

        history.push_back((now, 1));

        // Clean old measurements
        let window_start = now - self.config.measurement_window;
        while let Some((timestamp, _)) = history.front() {
            if *timestamp < window_start {
                history.pop_front();
            } else {
                break;
            }
        }
    }

    /// Measure current throughput
    async fn measure_throughput(&self) -> f64 {
        let now = Utc::now();
        let history = self.throughput_history.lock().await;

        if history.is_empty() {
            return 0.0;
        }

        let window_start = now - self.config.measurement_window;
        let count: u64 = history
            .iter()
            .filter(|(timestamp, _)| *timestamp >= window_start)
            .map(|(_, count)| count)
            .sum();

        let elapsed_seconds = self.config.measurement_window.num_seconds() as f64;
        count as f64 / elapsed_seconds
    }

    /// Get current flow control signal
    pub async fn flow_control_signal(&self) -> FlowControlSignal {
        *self.flow_control.lock().await
    }

    /// Get statistics
    pub async fn stats(&self) -> BackpressureStats {
        let stats = self.stats.lock().await;
        let mut result = stats.clone();

        // Update current throughput
        drop(stats);
        result.current_throughput = self.measure_throughput().await;

        // Update circuit breaker info
        result.circuit_state = *self.circuit_state.lock().await;
        result.circuit_failures = *self.circuit_failures.lock().await;
        result.circuit_successes = *self.circuit_successes.lock().await;

        result
    }

    /// Get circuit breaker state
    pub async fn circuit_state(&self) -> CircuitState {
        *self.circuit_state.lock().await
    }

    /// Get buffer size
    pub async fn buffer_size(&self) -> usize {
        self.buffer.lock().await.len()
    }

    /// Clear buffer
    pub async fn clear(&self) {
        let mut buffer = self.buffer.lock().await;
        let cleared_count = buffer.len();
        buffer.clear();

        // Release all permits
        self.semaphore.add_permits(cleared_count);

        let mut stats = self.stats.lock().await;
        stats.buffer_size = 0;
        stats.buffer_utilization = 0.0;
    }
}

/// Rate limiter with token bucket algorithm
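///
/// Usage sketch (token counts are illustrative; the tests below exercise the
/// same calls):
///
/// ```ignore
/// # async fn demo() -> anyhow::Result<()> {
/// // A bucket of 100 tokens refilled at 50 tokens per second.
/// let limiter = RateLimiter::new(100.0, 50.0);
///
/// if limiter.try_acquire().await {
///     // fast path: a token was available
/// }
///
/// // Or wait until a token becomes available.
/// limiter.acquire().await?;
/// # Ok(())
/// # }
/// ```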
pub struct RateLimiter {
    tokens: Arc<Mutex<f64>>,
    max_tokens: f64,
    refill_rate: f64, // tokens per second
    last_refill: Arc<Mutex<DateTime<Utc>>>,
}

impl RateLimiter {
    /// Create a new rate limiter
    pub fn new(max_tokens: f64, refill_rate: f64) -> Self {
        Self {
            tokens: Arc::new(Mutex::new(max_tokens)),
            max_tokens,
            refill_rate,
            last_refill: Arc::new(Mutex::new(Utc::now())),
        }
    }

    /// Try to acquire a token
    pub async fn try_acquire(&self) -> bool {
        self.refill_tokens().await;

        let mut tokens = self.tokens.lock().await;
        if *tokens >= 1.0 {
            *tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Acquire a token (blocking)
    pub async fn acquire(&self) -> Result<()> {
        loop {
            if self.try_acquire().await {
                return Ok(());
            }

            // Wait for refill
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }
    }

    /// Refill tokens based on elapsed time
    async fn refill_tokens(&self) {
        let now = Utc::now();
        let mut last_refill = self.last_refill.lock().await;

        let elapsed = (now - *last_refill).num_milliseconds() as f64 / 1000.0;
        let new_tokens = elapsed * self.refill_rate;

        if new_tokens > 0.0 {
            let mut tokens = self.tokens.lock().await;
            *tokens = (*tokens + new_tokens).min(self.max_tokens);
            *last_refill = now;
        }
    }

    /// Get current token count
    pub async fn available_tokens(&self) -> f64 {
        self.refill_tokens().await;
        *self.tokens.lock().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_backpressure_drop_oldest() {
        let config = BackpressureConfig {
            max_buffer_size: 3,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 1.5, // Disable degradation
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Fill buffer
        for i in 0..5 {
            controller.offer(i).await.unwrap();
        }

        // Should have dropped 2 oldest (0, 1)
        assert_eq!(controller.buffer_size().await, 3);

        // Poll should return 2 (oldest after drops)
        let event = controller.poll().await.unwrap().unwrap();
        assert_eq!(event, 2);
    }

    #[tokio::test]
    async fn test_backpressure_drop_newest() {
        let config = BackpressureConfig {
            max_buffer_size: 3,
            strategy: BackpressureStrategy::DropNewest,
            high_water_mark: 1.5, // Disable degradation
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Fill buffer
        for i in 0..5 {
            controller.offer(i).await.unwrap();
        }

        // Should have kept first 3, dropped 3 and 4
        assert_eq!(controller.buffer_size().await, 3);

        let event = controller.poll().await.unwrap().unwrap();
        assert_eq!(event, 0);
    }

    #[tokio::test]
    async fn test_flow_control_signals() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            high_water_mark: 0.8,
            low_water_mark: 0.2,
            degradation: DegradationStrategy::ReduceThroughput {
                reduction_percent: 0.0, // Disable degradation
            },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Low utilization
        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::Proceed
        );

        // Fill to medium utilization
        for i in 0..30 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::SlowDown
        );

        // Fill to high utilization
        for i in 30..85 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::Stop
        );
    }

    #[tokio::test]
    async fn test_rate_limiter() {
        let limiter = RateLimiter::new(10.0, 10.0); // 10 tokens, 10/second refill

        // Should be able to acquire 10 tokens
        for _ in 0..10 {
            assert!(limiter.try_acquire().await);
        }

        // 11th should fail
        assert!(!limiter.try_acquire().await);

        // Wait for refill
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        // Should be able to acquire again
        assert!(limiter.try_acquire().await);
    }

    // Circuit Breaker Tests
    #[tokio::test]
    async fn test_circuit_breaker_closed_to_open() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            circuit_breaker: CircuitBreakerConfig {
                enabled: true,
                failure_threshold: 3,
                success_threshold: 2,
                timeout: Duration::from_millis(100),
                half_open_max_calls: 2,
            },
            ..Default::default()
        };

        let controller = BackpressureController::<i32>::new(config);

        // Initial state should be Closed
        assert_eq!(controller.circuit_state().await, CircuitState::Closed);

        // Manually trigger failures by recording them directly
        for _ in 0..3 {
            controller.record_circuit_failure().await;
        }

        // After enough failures, circuit should open
        assert_eq!(controller.circuit_state().await, CircuitState::Open);
    }

    #[tokio::test]
    async fn test_circuit_breaker_open_to_half_open() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            circuit_breaker: CircuitBreakerConfig {
                enabled: true,
                failure_threshold: 3,
                success_threshold: 2,
                timeout: Duration::from_millis(50),
                half_open_max_calls: 2,
            },
            ..Default::default()
        };

        let controller = BackpressureController::<i32>::new(config);

        // Manually cause circuit to open by recording failures
        for _ in 0..3 {
            controller.record_circuit_failure().await;
        }

        assert_eq!(controller.circuit_state().await, CircuitState::Open);

        // Wait for timeout
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Try to check circuit state - this should transition to HalfOpen
        let _ = controller.check_circuit_state().await;
        assert_eq!(controller.circuit_state().await, CircuitState::HalfOpen);
    }

    #[tokio::test]
    async fn test_circuit_breaker_half_open_to_closed() {
        let config = BackpressureConfig {
            max_buffer_size: 100, // Large enough to allow successes
            strategy: BackpressureStrategy::Block,
            circuit_breaker: CircuitBreakerConfig {
                enabled: true,
                failure_threshold: 2,
                success_threshold: 2,
                timeout: Duration::from_millis(50),
                half_open_max_calls: 5,
            },
            ..Default::default()
        };

        let controller = BackpressureController::<i32>::new(config);

        // Manually set state to HalfOpen for testing
        *controller.circuit_state.lock().await = CircuitState::HalfOpen;

        // Record successes
        for _ in 0..2 {
            controller.record_circuit_success().await;
        }

        // Should transition to Closed
        assert_eq!(controller.circuit_state().await, CircuitState::Closed);
    }

    // Stress Tests
    #[tokio::test]
    async fn test_stress_high_load() {
        let config = BackpressureConfig {
            max_buffer_size: 1000,
            strategy: BackpressureStrategy::DropOldest,
            ..Default::default()
        };

        let controller = Arc::new(BackpressureController::new(config));

        // Spawn multiple producers
        let mut handles = vec![];
        for producer_id in 0..10 {
            let controller_clone = controller.clone();
            let handle = tokio::spawn(async move {
                for i in 0..1000 {
                    let value = producer_id * 1000 + i;
                    let _ = controller_clone.offer(value).await;
                }
            });
            handles.push(handle);
        }

        // Wait for all producers
        for handle in handles {
            handle.await.unwrap();
        }

        // Verify stats
        let stats = controller.stats().await;
        assert_eq!(stats.events_received, 10000);
        assert!(stats.buffer_size <= 1000);
    }

    #[tokio::test]
    async fn test_stress_concurrent_offer_and_poll() {
        let config = BackpressureConfig {
            max_buffer_size: 500,
            strategy: BackpressureStrategy::Block,
            ..Default::default()
        };

        let controller = Arc::new(BackpressureController::new(config));

        // Spawn producer
        let producer_controller = controller.clone();
        let producer = tokio::spawn(async move {
            for i in 0..5000 {
                let _ = producer_controller.offer(i).await;
            }
        });

        // Spawn consumer with optimized polling strategy
        let consumer_controller = controller.clone();
        let consumer = tokio::spawn(async move {
            let mut count = 0;
            let timeout_duration = Duration::from_secs(10);
            let start_time = Instant::now();
            let mut consecutive_empty_polls = 0;

            loop {
                // Add timeout to prevent infinite loops
                if start_time.elapsed() > timeout_duration {
                    panic!(
                        "Consumer timeout after 10 seconds, consumed {} events",
                        count
                    );
                }

                match consumer_controller.poll().await {
                    Ok(Some(_)) => {
                        count += 1;
                        consecutive_empty_polls = 0;
                        if count >= 5000 {
                            break;
                        }
                        // Don't sleep when successfully consuming - keep polling
                    }
                    Ok(None) => {
                        // Adaptive backoff: sleep longer after consecutive empty polls
                        consecutive_empty_polls += 1;
                        let sleep_duration = if consecutive_empty_polls < 5 {
                            Duration::from_micros(100) // Initial backoff
                        } else if consecutive_empty_polls < 20 {
                            Duration::from_micros(500) // Medium backoff
                        } else {
                            Duration::from_millis(1) // Maximum backoff
                        };
                        tokio::time::sleep(sleep_duration).await;
                    }
                    Err(_) => {
                        // Error case - use medium backoff
                        tokio::time::sleep(Duration::from_micros(500)).await;
                    }
                }
            }
            count
        });

        // Wait for both with timeout
        let producer_result = tokio::time::timeout(Duration::from_secs(10), producer).await;
        assert!(producer_result.is_ok(), "Producer timeout");
        producer_result.unwrap().unwrap();

        let consumer_result = tokio::time::timeout(Duration::from_secs(10), consumer).await;
        assert!(consumer_result.is_ok(), "Consumer timeout");
        let consumed = consumer_result.unwrap().unwrap();

        assert_eq!(consumed, 5000);

        // Verify stats
        let stats = controller.stats().await;
        assert_eq!(stats.events_received, 5000);
        assert_eq!(stats.events_processed, 5000);
    }

    // Degradation Strategy Tests
    #[tokio::test]
    async fn test_degradation_reduce_throughput() {
        let config = BackpressureConfig {
            max_buffer_size: 10,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 0.5, // Trigger degradation early
            degradation: DegradationStrategy::ReduceThroughput {
                reduction_percent: 50.0,
            },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Fill buffer to trigger degradation
        for i in 0..20 {
            let _ = controller.offer(i).await;
        }

        let stats = controller.stats().await;
        // Some events should be dropped due to degradation
        assert!(stats.events_dropped > 0);
    }

    #[tokio::test]
    async fn test_degradation_sampling() {
        let config = BackpressureConfig {
            max_buffer_size: 10,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 0.5,
            degradation: DegradationStrategy::Sampling { sample_rate: 0.5 },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Fill buffer
        for i in 0..20 {
            let _ = controller.offer(i).await;
        }

        let stats = controller.stats().await;
        // All events are received, but roughly half should be dropped due to sampling
        assert_eq!(stats.events_received, 20);
        assert!(stats.events_dropped > 0); // Some should be dropped
        assert!(stats.buffer_size < 20); // Not all made it to buffer
    }

    // Metrics Tests
    #[tokio::test]
    async fn test_metrics_collection() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Verify initial metrics
        assert_eq!(controller.metrics_events_received.get(), 0);
        assert_eq!(controller.metrics_events_processed.get(), 0);

        // Offer and poll events
        for i in 0..10 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(controller.metrics_events_received.get(), 10);

        for _ in 0..5 {
            controller.poll().await.unwrap();
        }

        assert_eq!(controller.metrics_events_processed.get(), 5);
        assert_eq!(controller.metrics_queue_depth.get(), 5.0);
    }

    #[tokio::test]
    async fn test_metrics_latency() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Offer events
        for i in 0..10 {
            controller.offer(i).await.unwrap();
        }

        // Wait a bit to create measurable latency
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Poll events
        for _ in 0..10 {
            controller.poll().await.unwrap();
        }

        // Check latency histogram
        let stats = controller.metrics_latency.get_stats();
        assert!(stats.count == 10);
        assert!(stats.mean > 0.0);
    }

    #[tokio::test]
    async fn test_metrics_backpressure_events() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 0.5,
            degradation: DegradationStrategy::ReduceThroughput {
                reduction_percent: 0.0, // Disable degradation
            },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Fill buffer to trigger backpressure
        for i in 0..60 {
            controller.offer(i).await.unwrap();
        }

        // Should have triggered backpressure events
        assert!(controller.metrics_backpressure_events.get() > 0);
    }
}