Skip to main content

oxirs_stream/
backpressure.rs

1//! Backpressure and Flow Control
2//!
3//! This module provides sophisticated backpressure handling for stream processing:
4//! - Dynamic rate limiting based on system load
5//! - Buffer management with overflow strategies
6//! - Flow control signals
7//! - Adaptive throttling
8//! - Queue depth monitoring
9//! - Circuit breaker pattern for fault tolerance
10//! - Graceful degradation strategies
11//!
12//! Uses SciRS2 metrics for comprehensive monitoring
13
14use anyhow::{anyhow, Result};
15use chrono::{DateTime, Duration as ChronoDuration, Utc};
16use scirs2_core::metrics::{Counter, Gauge, Histogram};
17use serde::{Deserialize, Serialize};
18use std::collections::VecDeque;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{Mutex, Semaphore};
23use tracing::{debug, info, warn};
24
/// Heap-allocated, pinned, `Send` future borrowing for `'a` and yielding `T`.
///
/// Used to express the recursive async degradation-strategy evaluation,
/// which cannot be written as a plain `async fn`.
type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + 'a + Send>>;
26
27/// Circuit breaker state for backpressure control
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
29pub enum CircuitState {
30    /// Circuit is closed, normal operation
31    #[default]
32    Closed,
33    /// Circuit is open, rejecting requests
34    Open,
35    /// Circuit is half-open, testing recovery
36    HalfOpen,
37}
38
39/// Graceful degradation strategy
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub enum DegradationStrategy {
42    /// Reduce throughput by percentage
43    ReduceThroughput { reduction_percent: f64 },
44    /// Skip non-critical operations
45    SkipNonCritical,
46    /// Increase buffer size temporarily
47    ExpandBuffer { factor: f64 },
48    /// Sample events (keep every Nth event)
49    Sampling { sample_rate: f64 },
50    /// Combined strategies
51    Combined(Vec<DegradationStrategy>),
52}
53
54/// Backpressure strategy
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub enum BackpressureStrategy {
57    /// Drop oldest events when buffer is full
58    DropOldest,
59    /// Drop newest events when buffer is full
60    DropNewest,
61    /// Block until space is available
62    Block,
63    /// Exponential backoff with retries
64    ExponentialBackoff {
65        initial_delay_ms: u64,
66        max_delay_ms: u64,
67        multiplier: f64,
68    },
69    /// Adaptive throttling based on throughput
70    Adaptive {
71        target_throughput: f64,
72        adjustment_factor: f64,
73    },
74}
75
/// Advice published to producers about how hard they may push.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub enum FlowControlSignal {
    /// Utilization is low: send at full speed.
    Proceed,
    /// Utilization is elevated: reduce the send rate.
    SlowDown,
    /// Utilization is at or above the high-water mark: stop sending.
    Stop,
}
86
87/// Circuit breaker configuration
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct CircuitBreakerConfig {
90    /// Enable circuit breaker
91    pub enabled: bool,
92    /// Failure threshold to open circuit
93    pub failure_threshold: u32,
94    /// Success threshold to close circuit
95    pub success_threshold: u32,
96    /// Timeout before transitioning to half-open
97    pub timeout: Duration,
98    /// Maximum calls in half-open state
99    pub half_open_max_calls: u32,
100}
101
102impl Default for CircuitBreakerConfig {
103    fn default() -> Self {
104        Self {
105            enabled: true,
106            failure_threshold: 5,
107            success_threshold: 3,
108            timeout: Duration::from_secs(30),
109            half_open_max_calls: 3,
110        }
111    }
112}
113
114/// Backpressure configuration
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct BackpressureConfig {
117    /// Maximum buffer size
118    pub max_buffer_size: usize,
119    /// Backpressure strategy
120    pub strategy: BackpressureStrategy,
121    /// High water mark (percentage of buffer)
122    pub high_water_mark: f64,
123    /// Low water mark (percentage of buffer)
124    pub low_water_mark: f64,
125    /// Enable adaptive throttling
126    pub enable_adaptive: bool,
127    /// Measurement window for throughput
128    pub measurement_window: ChronoDuration,
129    /// Circuit breaker configuration
130    pub circuit_breaker: CircuitBreakerConfig,
131    /// Degradation strategy when under pressure
132    pub degradation: DegradationStrategy,
133}
134
135impl Default for BackpressureConfig {
136    fn default() -> Self {
137        Self {
138            max_buffer_size: 10000,
139            strategy: BackpressureStrategy::Block,
140            high_water_mark: 0.8,
141            low_water_mark: 0.2,
142            enable_adaptive: true,
143            measurement_window: ChronoDuration::seconds(10),
144            circuit_breaker: CircuitBreakerConfig::default(),
145            degradation: DegradationStrategy::ReduceThroughput {
146                reduction_percent: 50.0,
147            },
148        }
149    }
150}
151
152/// Backpressure controller statistics
153#[derive(Debug, Clone, Default)]
154pub struct BackpressureStats {
155    pub events_received: u64,
156    pub events_processed: u64,
157    pub events_dropped: u64,
158    pub events_blocked: u64,
159    pub buffer_size: usize,
160    pub buffer_utilization: f64,
161    pub current_throughput: f64,
162    pub backpressure_events: u64,
163    pub avg_latency_ms: f64,
164    pub circuit_state: CircuitState,
165    pub circuit_failures: u32,
166    pub circuit_successes: u32,
167    pub degradation_active: bool,
168}
169
170/// Type alias for timestamped buffer elements
171type TimestampedBuffer<T> = Arc<Mutex<VecDeque<(T, DateTime<Utc>)>>>;
172
173/// Type alias for throughput history
174type ThroughputHistory = Arc<Mutex<VecDeque<(DateTime<Utc>, u64)>>>;
175
176/// Exported metrics snapshot
177#[derive(Debug, Clone)]
178pub struct BackpressureMetrics {
179    pub events_received: u64,
180    pub events_processed: u64,
181    pub events_dropped: u64,
182    pub queue_depth: f64,
183    pub latency_stats: scirs2_core::metrics::HistogramStats,
184    pub backpressure_events: u64,
185    pub circuit_state_changes: u64,
186}
187
188/// Backpressure controller
189pub struct BackpressureController<T> {
190    config: BackpressureConfig,
191    buffer: TimestampedBuffer<T>,
192    stats: Arc<Mutex<BackpressureStats>>,
193    flow_control: Arc<Mutex<FlowControlSignal>>,
194    semaphore: Arc<Semaphore>,
195    throughput_history: ThroughputHistory,
196    // Circuit breaker state
197    circuit_state: Arc<Mutex<CircuitState>>,
198    circuit_failures: Arc<Mutex<u32>>,
199    circuit_successes: Arc<Mutex<u32>>,
200    circuit_last_failure: Arc<Mutex<Option<Instant>>>,
201    circuit_half_open_calls: Arc<Mutex<u32>>,
202    // SciRS2 metrics
203    metrics_events_received: Arc<Counter>,
204    metrics_events_processed: Arc<Counter>,
205    metrics_events_dropped: Arc<Counter>,
206    metrics_queue_depth: Arc<Gauge>,
207    metrics_latency: Arc<Histogram>,
208    metrics_backpressure_events: Arc<Counter>,
209    metrics_circuit_state_changes: Arc<Counter>,
210}
211
212impl<T: Clone + Send> BackpressureController<T> {
213    /// Create a new backpressure controller
214    pub fn new(config: BackpressureConfig) -> Self {
215        let max_permits = config.max_buffer_size;
216
217        // Initialize SciRS2 metrics
218        let metrics_events_received =
219            Arc::new(Counter::new("backpressure_events_received".to_string()));
220        let metrics_events_processed =
221            Arc::new(Counter::new("backpressure_events_processed".to_string()));
222        let metrics_events_dropped =
223            Arc::new(Counter::new("backpressure_events_dropped".to_string()));
224        let metrics_queue_depth = Arc::new(Gauge::new("backpressure_queue_depth".to_string()));
225        let metrics_latency = Arc::new(Histogram::new("backpressure_latency_seconds".to_string()));
226        let metrics_backpressure_events =
227            Arc::new(Counter::new("backpressure_events_total".to_string()));
228        let metrics_circuit_state_changes = Arc::new(Counter::new(
229            "backpressure_circuit_state_changes".to_string(),
230        ));
231
232        Self {
233            config,
234            buffer: Arc::new(Mutex::new(VecDeque::new())),
235            stats: Arc::new(Mutex::new(BackpressureStats::default())),
236            flow_control: Arc::new(Mutex::new(FlowControlSignal::Proceed)),
237            semaphore: Arc::new(Semaphore::new(max_permits)),
238            throughput_history: Arc::new(Mutex::new(VecDeque::new())),
239            circuit_state: Arc::new(Mutex::new(CircuitState::Closed)),
240            circuit_failures: Arc::new(Mutex::new(0)),
241            circuit_successes: Arc::new(Mutex::new(0)),
242            circuit_last_failure: Arc::new(Mutex::new(None)),
243            circuit_half_open_calls: Arc::new(Mutex::new(0)),
244            metrics_events_received,
245            metrics_events_processed,
246            metrics_events_dropped,
247            metrics_queue_depth,
248            metrics_latency,
249            metrics_backpressure_events,
250            metrics_circuit_state_changes,
251        }
252    }
253
254    /// Get metrics for export
255    pub fn get_metrics(&self) -> BackpressureMetrics {
256        BackpressureMetrics {
257            events_received: self.metrics_events_received.get(),
258            events_processed: self.metrics_events_processed.get(),
259            events_dropped: self.metrics_events_dropped.get(),
260            queue_depth: self.metrics_queue_depth.get(),
261            latency_stats: self.metrics_latency.get_stats(),
262            backpressure_events: self.metrics_backpressure_events.get(),
263            circuit_state_changes: self.metrics_circuit_state_changes.get(),
264        }
265    }
266
267    /// Check circuit breaker state and handle transitions
268    async fn check_circuit_state(&self) -> Result<bool> {
269        if !self.config.circuit_breaker.enabled {
270            return Ok(true); // Circuit breaker disabled, always allow
271        }
272
273        let mut state = self.circuit_state.lock().await;
274        let circuit_config = &self.config.circuit_breaker;
275
276        match *state {
277            CircuitState::Closed => Ok(true),
278            CircuitState::Open => {
279                let last_failure = self.circuit_last_failure.lock().await;
280                if let Some(last_fail_time) = *last_failure {
281                    if last_fail_time.elapsed() >= circuit_config.timeout {
282                        // Transition to HalfOpen
283                        *state = CircuitState::HalfOpen;
284                        *self.circuit_half_open_calls.lock().await = 0;
285                        self.metrics_circuit_state_changes.inc();
286                        info!("Circuit breaker transitioned to HalfOpen");
287                        Ok(true)
288                    } else {
289                        Ok(false) // Still open, reject request
290                    }
291                } else {
292                    Ok(false)
293                }
294            }
295            CircuitState::HalfOpen => {
296                let mut half_open_calls = self.circuit_half_open_calls.lock().await;
297                if *half_open_calls < circuit_config.half_open_max_calls {
298                    *half_open_calls += 1;
299                    Ok(true)
300                } else {
301                    Ok(false) // Too many calls in half-open state
302                }
303            }
304        }
305    }
306
307    /// Record success for circuit breaker
308    async fn record_circuit_success(&self) {
309        if !self.config.circuit_breaker.enabled {
310            return;
311        }
312
313        let mut state = self.circuit_state.lock().await;
314        let circuit_config = &self.config.circuit_breaker;
315
316        match *state {
317            CircuitState::HalfOpen => {
318                let mut successes = self.circuit_successes.lock().await;
319                *successes += 1;
320                if *successes >= circuit_config.success_threshold {
321                    // Transition to Closed
322                    *state = CircuitState::Closed;
323                    *self.circuit_failures.lock().await = 0;
324                    *successes = 0; // Reset using existing lock instead of acquiring again
325                    self.metrics_circuit_state_changes.inc();
326                    info!("Circuit breaker transitioned to Closed");
327                }
328            }
329            CircuitState::Closed => {
330                // Reset failure count on success
331                *self.circuit_failures.lock().await = 0;
332            }
333            CircuitState::Open => {
334                // Should not happen, but reset if it does
335                *state = CircuitState::Closed;
336                *self.circuit_failures.lock().await = 0;
337                self.metrics_circuit_state_changes.inc();
338            }
339        }
340    }
341
342    /// Record failure for circuit breaker
343    async fn record_circuit_failure(&self) {
344        if !self.config.circuit_breaker.enabled {
345            return;
346        }
347
348        let mut state = self.circuit_state.lock().await;
349        let circuit_config = &self.config.circuit_breaker;
350        let mut failures = self.circuit_failures.lock().await;
351
352        *failures += 1;
353        *self.circuit_last_failure.lock().await = Some(Instant::now());
354
355        if *failures >= circuit_config.failure_threshold && *state != CircuitState::Open {
356            // Transition to Open
357            *state = CircuitState::Open;
358            *self.circuit_successes.lock().await = 0;
359            self.metrics_circuit_state_changes.inc();
360            warn!(
361                "Circuit breaker transitioned to Open after {} failures",
362                failures
363            );
364        }
365    }
366
367    /// Apply graceful degradation strategy
368    async fn apply_degradation(&self, _event: &T) -> Result<bool> {
369        // Block strategy explicitly forbids silent drops: the caller wants
370        // the producer to await available capacity, not have its events
371        // discarded behind its back. Skipping degradation here keeps
372        // `Block` semantics correct end-to-end (no event loss) while
373        // still allowing degradation to throttle drop-tolerant strategies.
374        if matches!(self.config.strategy, BackpressureStrategy::Block) {
375            return Ok(true);
376        }
377
378        let stats = self.stats.lock().await;
379        let utilization = stats.buffer_utilization;
380        drop(stats);
381
382        // Only apply degradation when utilization is high
383        if utilization < self.config.high_water_mark {
384            return Ok(true); // No degradation needed
385        }
386
387        self.apply_degradation_strategy(&self.config.degradation)
388            .await
389    }
390
391    /// Helper method to apply a specific degradation strategy
392    fn apply_degradation_strategy<'a>(
393        &'a self,
394        strategy: &'a DegradationStrategy,
395    ) -> BoxFuture<'a, Result<bool>> {
396        Box::pin(async move {
397            match strategy {
398                DegradationStrategy::ReduceThroughput { reduction_percent } => {
399                    // Randomly drop events based on reduction percentage
400                    let threshold = 1.0 - (reduction_percent / 100.0);
401                    Ok(fastrand::f64() < threshold)
402                }
403                DegradationStrategy::SkipNonCritical => {
404                    // For now, accept all events (would need priority info)
405                    Ok(true)
406                }
407                DegradationStrategy::ExpandBuffer { factor } => {
408                    // Temporarily allow buffer to grow (check against expanded size)
409                    let expanded_size = (self.config.max_buffer_size as f64 * factor) as usize;
410                    let buffer = self.buffer.lock().await;
411                    Ok(buffer.len() < expanded_size)
412                }
413                DegradationStrategy::Sampling { sample_rate } => {
414                    // Keep events based on sample rate
415                    Ok(fastrand::f64() < *sample_rate)
416                }
417                DegradationStrategy::Combined(strategies) => {
418                    // Apply all strategies and accept only if all pass
419                    for strat in strategies {
420                        if !self.apply_degradation_strategy(strat).await? {
421                            return Ok(false);
422                        }
423                    }
424                    Ok(true)
425                }
426            }
427        })
428    }
429
430    /// Offer an event to the controller
431    pub async fn offer(&self, event: T) -> Result<()> {
432        // Update metrics
433        self.metrics_events_received.inc();
434        let mut stats = self.stats.lock().await;
435        stats.events_received += 1;
436        drop(stats);
437
438        // Check circuit breaker
439        if !self.check_circuit_state().await? {
440            self.metrics_events_dropped.inc();
441            return Err(anyhow!("Circuit breaker is open"));
442        }
443
444        // Apply graceful degradation
445        if !self.apply_degradation(&event).await? {
446            self.metrics_events_dropped.inc();
447            let mut stats = self.stats.lock().await;
448            stats.events_dropped += 1;
449            stats.degradation_active = true;
450            return Err(anyhow!("Event dropped due to graceful degradation"));
451        }
452
453        // Process event based on strategy
454        let result = match &self.config.strategy {
455            BackpressureStrategy::DropOldest => self.offer_drop_oldest(event).await,
456            BackpressureStrategy::DropNewest => self.offer_drop_newest(event).await,
457            BackpressureStrategy::Block => self.offer_blocking(event).await,
458            BackpressureStrategy::ExponentialBackoff {
459                initial_delay_ms,
460                max_delay_ms,
461                multiplier,
462            } => {
463                self.offer_with_backoff(event, *initial_delay_ms, *max_delay_ms, *multiplier)
464                    .await
465            }
466            BackpressureStrategy::Adaptive {
467                target_throughput,
468                adjustment_factor,
469            } => {
470                self.offer_adaptive(event, *target_throughput, *adjustment_factor)
471                    .await
472            }
473        };
474
475        // Update circuit breaker state based on result
476        match &result {
477            Ok(_) => self.record_circuit_success().await,
478            Err(_) => self.record_circuit_failure().await,
479        }
480
481        result
482    }
483
484    /// Offer with drop oldest strategy
485    async fn offer_drop_oldest(&self, event: T) -> Result<()> {
486        let mut buffer = self.buffer.lock().await;
487
488        if buffer.len() >= self.config.max_buffer_size {
489            // Drop oldest
490            buffer.pop_front();
491
492            self.metrics_events_dropped.inc();
493            let mut stats = self.stats.lock().await;
494            stats.events_dropped += 1;
495            drop(stats);
496
497            warn!("Buffer full, dropped oldest event");
498        }
499
500        buffer.push_back((event, Utc::now()));
501        let buffer_len = buffer.len();
502        self.metrics_queue_depth.set(buffer_len as f64);
503        drop(buffer);
504
505        self.update_flow_control(buffer_len).await;
506
507        Ok(())
508    }
509
510    /// Offer with drop newest strategy
511    async fn offer_drop_newest(&self, event: T) -> Result<()> {
512        let mut buffer = self.buffer.lock().await;
513
514        if buffer.len() >= self.config.max_buffer_size {
515            self.metrics_events_dropped.inc();
516            let mut stats = self.stats.lock().await;
517            stats.events_dropped += 1;
518            drop(stats);
519
520            warn!("Buffer full, dropped newest event");
521            return Ok(());
522        }
523
524        buffer.push_back((event, Utc::now()));
525        let buffer_len = buffer.len();
526        self.metrics_queue_depth.set(buffer_len as f64);
527        drop(buffer);
528
529        self.update_flow_control(buffer_len).await;
530
531        Ok(())
532    }
533
534    /// Offer with blocking strategy
535    async fn offer_blocking(&self, event: T) -> Result<()> {
536        // Acquire a semaphore permit representing one slot in the buffer.
537        // Ownership of the permit is conceptually transferred to the
538        // buffered event: it is released later by `poll()` via
539        // `Semaphore::add_permits(1)`. We therefore `forget()` the
540        // permit guard so it is NOT auto-released on scope exit — that
541        // would double-release and let the buffer grow without bound,
542        // defeating the whole point of `Block` backpressure.
543        let permit = self
544            .semaphore
545            .acquire()
546            .await
547            .map_err(|e| anyhow!("Failed to acquire semaphore: {}", e))?;
548
549        let mut buffer = self.buffer.lock().await;
550        buffer.push_back((event, Utc::now()));
551
552        let buffer_size = buffer.len();
553        self.metrics_queue_depth.set(buffer_size as f64);
554        drop(buffer);
555
556        // Transfer ownership of the permit to the buffered event.
557        // `poll()` will call `add_permits(1)` to release it on consume.
558        permit.forget();
559
560        self.update_flow_control(buffer_size).await;
561
562        Ok(())
563    }
564
565    /// Offer with exponential backoff
566    async fn offer_with_backoff(
567        &self,
568        event: T,
569        initial_delay_ms: u64,
570        max_delay_ms: u64,
571        multiplier: f64,
572    ) -> Result<()> {
573        let mut delay_ms = initial_delay_ms;
574        let mut retries = 0;
575        const MAX_RETRIES: u32 = 10;
576
577        loop {
578            let buffer = self.buffer.lock().await;
579            let buffer_size = buffer.len();
580            drop(buffer);
581
582            if buffer_size < self.config.max_buffer_size {
583                let mut buffer = self.buffer.lock().await;
584                buffer.push_back((event, Utc::now()));
585                drop(buffer);
586
587                self.update_flow_control(buffer_size + 1).await;
588                return Ok(());
589            }
590
591            if retries >= MAX_RETRIES {
592                let mut stats = self.stats.lock().await;
593                stats.events_dropped += 1;
594                return Err(anyhow!("Max retries exceeded, dropping event"));
595            }
596
597            // Exponential backoff
598            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
599
600            delay_ms = ((delay_ms as f64 * multiplier) as u64).min(max_delay_ms);
601            retries += 1;
602
603            let mut stats = self.stats.lock().await;
604            stats.events_blocked += 1;
605            drop(stats);
606        }
607    }
608
609    /// Offer with adaptive throttling using SciRS2
610    async fn offer_adaptive(
611        &self,
612        event: T,
613        target_throughput: f64,
614        adjustment_factor: f64,
615    ) -> Result<()> {
616        // Measure current throughput
617        let current_throughput = self.measure_throughput().await;
618
619        // Adaptive delay based on throughput
620        if current_throughput > target_throughput {
621            let delay_ms =
622                ((current_throughput / target_throughput - 1.0) * adjustment_factor) as u64;
623            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
624        }
625
626        // Add to buffer
627        let mut buffer = self.buffer.lock().await;
628
629        if buffer.len() >= self.config.max_buffer_size {
630            let mut stats = self.stats.lock().await;
631            stats.events_dropped += 1;
632            drop(stats);
633
634            return Err(anyhow!("Buffer full even with adaptive throttling"));
635        }
636
637        buffer.push_back((event, Utc::now()));
638        let buffer_size = buffer.len();
639        drop(buffer);
640
641        self.update_flow_control(buffer_size).await;
642
643        Ok(())
644    }
645
646    /// Poll an event from the controller
647    pub async fn poll(&self) -> Result<Option<T>> {
648        let mut buffer = self.buffer.lock().await;
649
650        if let Some((event, timestamp)) = buffer.pop_front() {
651            let buffer_size = buffer.len();
652            self.metrics_queue_depth.set(buffer_size as f64);
653            drop(buffer);
654
655            // Release semaphore permit
656            self.semaphore.add_permits(1);
657
658            // Calculate and record latency
659            let latency = (Utc::now() - timestamp).num_milliseconds() as f64;
660            self.metrics_latency.observe(latency / 1000.0); // Convert to seconds
661
662            // Update metrics
663            self.metrics_events_processed.inc();
664
665            // Update stats
666            let mut stats = self.stats.lock().await;
667            stats.events_processed += 1;
668
669            let alpha = 0.1;
670            stats.avg_latency_ms = alpha * latency + (1.0 - alpha) * stats.avg_latency_ms;
671
672            drop(stats);
673
674            self.update_flow_control(buffer_size).await;
675            self.record_throughput().await;
676
677            Ok(Some(event))
678        } else {
679            Ok(None)
680        }
681    }
682
683    /// Update flow control signal
684    async fn update_flow_control(&self, buffer_size: usize) {
685        let utilization = buffer_size as f64 / self.config.max_buffer_size as f64;
686
687        let signal = if utilization >= self.config.high_water_mark {
688            FlowControlSignal::Stop
689        } else if utilization >= self.config.low_water_mark {
690            FlowControlSignal::SlowDown
691        } else {
692            FlowControlSignal::Proceed
693        };
694
695        let mut flow_control = self.flow_control.lock().await;
696        if *flow_control != signal {
697            debug!(
698                "Flow control signal changed: {:?} -> {:?}",
699                *flow_control, signal
700            );
701
702            if signal != FlowControlSignal::Proceed {
703                self.metrics_backpressure_events.inc();
704                let mut stats = self.stats.lock().await;
705                stats.backpressure_events += 1;
706            }
707        }
708        *flow_control = signal;
709
710        // Update stats
711        let mut stats = self.stats.lock().await;
712        stats.buffer_size = buffer_size;
713        stats.buffer_utilization = utilization;
714    }
715
716    /// Record throughput measurement
717    async fn record_throughput(&self) {
718        let now = Utc::now();
719        let mut history = self.throughput_history.lock().await;
720
721        history.push_back((now, 1));
722
723        // Clean old measurements
724        let window_start = now - self.config.measurement_window;
725        while let Some((timestamp, _)) = history.front() {
726            if *timestamp < window_start {
727                history.pop_front();
728            } else {
729                break;
730            }
731        }
732    }
733
734    /// Measure current throughput
735    async fn measure_throughput(&self) -> f64 {
736        let now = Utc::now();
737        let history = self.throughput_history.lock().await;
738
739        if history.is_empty() {
740            return 0.0;
741        }
742
743        let window_start = now - self.config.measurement_window;
744        let count: u64 = history
745            .iter()
746            .filter(|(timestamp, _)| *timestamp >= window_start)
747            .map(|(_, count)| count)
748            .sum();
749
750        let elapsed_seconds = self.config.measurement_window.num_seconds() as f64;
751        count as f64 / elapsed_seconds
752    }
753
754    /// Get current flow control signal
755    pub async fn flow_control_signal(&self) -> FlowControlSignal {
756        *self.flow_control.lock().await
757    }
758
759    /// Get statistics
760    pub async fn stats(&self) -> BackpressureStats {
761        let stats = self.stats.lock().await;
762        let mut result = stats.clone();
763
764        // Update current throughput
765        drop(stats);
766        result.current_throughput = self.measure_throughput().await;
767
768        // Update circuit breaker info
769        result.circuit_state = *self.circuit_state.lock().await;
770        result.circuit_failures = *self.circuit_failures.lock().await;
771        result.circuit_successes = *self.circuit_successes.lock().await;
772
773        result
774    }
775
776    /// Get circuit breaker state
777    pub async fn circuit_state(&self) -> CircuitState {
778        *self.circuit_state.lock().await
779    }
780
781    /// Get buffer size
782    pub async fn buffer_size(&self) -> usize {
783        self.buffer.lock().await.len()
784    }
785
786    /// Clear buffer
787    pub async fn clear(&self) {
788        let mut buffer = self.buffer.lock().await;
789        let cleared_count = buffer.len();
790        buffer.clear();
791
792        // Release all permits
793        self.semaphore.add_permits(cleared_count);
794
795        let mut stats = self.stats.lock().await;
796        stats.buffer_size = 0;
797        stats.buffer_utilization = 0.0;
798    }
799}
800
801/// Rate limiter with token bucket algorithm
802pub struct RateLimiter {
803    tokens: Arc<Mutex<f64>>,
804    max_tokens: f64,
805    refill_rate: f64, // tokens per second
806    last_refill: Arc<Mutex<DateTime<Utc>>>,
807}
808
809impl RateLimiter {
810    /// Create a new rate limiter
811    pub fn new(max_tokens: f64, refill_rate: f64) -> Self {
812        Self {
813            tokens: Arc::new(Mutex::new(max_tokens)),
814            max_tokens,
815            refill_rate,
816            last_refill: Arc::new(Mutex::new(Utc::now())),
817        }
818    }
819
820    /// Try to acquire a token
821    pub async fn try_acquire(&self) -> bool {
822        self.refill_tokens().await;
823
824        let mut tokens = self.tokens.lock().await;
825        if *tokens >= 1.0 {
826            *tokens -= 1.0;
827            true
828        } else {
829            false
830        }
831    }
832
833    /// Acquire a token (blocking)
834    pub async fn acquire(&self) -> Result<()> {
835        loop {
836            if self.try_acquire().await {
837                return Ok(());
838            }
839
840            // Wait for refill
841            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
842        }
843    }
844
845    /// Refill tokens based on elapsed time
846    async fn refill_tokens(&self) {
847        let now = Utc::now();
848        let mut last_refill = self.last_refill.lock().await;
849
850        let elapsed = (now - *last_refill).num_milliseconds() as f64 / 1000.0;
851        let new_tokens = elapsed * self.refill_rate;
852
853        if new_tokens > 0.0 {
854            let mut tokens = self.tokens.lock().await;
855            *tokens = (*tokens + new_tokens).min(self.max_tokens);
856            *last_refill = now;
857        }
858    }
859
860    /// Get current token count
861    pub async fn available_tokens(&self) -> f64 {
862        self.refill_tokens().await;
863        *self.tokens.lock().await
864    }
865}
866
#[cfg(test)]
mod tests {
    //! Tests for the backpressure controller, the token-bucket rate limiter,
    //! the circuit breaker, degradation strategies and metrics collection.

    use super::*;

    // DropOldest: offering 5 events into a 3-slot buffer keeps only the newest 3.
    #[tokio::test]
    async fn test_backpressure_drop_oldest() {
        let config = BackpressureConfig {
            max_buffer_size: 3,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 1.5, // Disable degradation
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Offer more events than the buffer can hold
        for i in 0..5 {
            controller.offer(i).await.unwrap();
        }

        // Should have dropped 2 oldest (0, 1)
        assert_eq!(controller.buffer_size().await, 3);

        // Poll should return 2 (oldest after drops)
        let event = controller.poll().await.unwrap().unwrap();
        assert_eq!(event, 2);
    }

    // DropNewest: offering 5 events into a 3-slot buffer keeps only the first 3.
    #[tokio::test]
    async fn test_backpressure_drop_newest() {
        let config = BackpressureConfig {
            max_buffer_size: 3,
            strategy: BackpressureStrategy::DropNewest,
            high_water_mark: 1.5, // Disable degradation
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Offer more events than the buffer can hold
        for i in 0..5 {
            controller.offer(i).await.unwrap();
        }

        // Should have kept first 3, dropped 3 and 4
        assert_eq!(controller.buffer_size().await, 3);

        let event = controller.poll().await.unwrap().unwrap();
        assert_eq!(event, 0);
    }

    // The flow-control signal moves Proceed -> SlowDown -> Stop as the buffer
    // fills from empty to 30% to 85%.
    #[tokio::test]
    async fn test_flow_control_signals() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            high_water_mark: 0.8,
            low_water_mark: 0.2,
            degradation: DegradationStrategy::ReduceThroughput {
                reduction_percent: 0.0, // Disable degradation
            },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Low utilization
        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::Proceed
        );

        // Fill to medium utilization
        for i in 0..30 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::SlowDown
        );

        // Fill to high utilization
        for i in 30..85 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::Stop
        );
    }

    // A full bucket grants exactly `max_tokens` acquisitions, then refills over time.
    #[tokio::test]
    async fn test_rate_limiter() {
        let limiter = RateLimiter::new(10.0, 10.0); // 10 tokens, 10/second refill

        // Should be able to acquire 10 tokens
        for _ in 0..10 {
            assert!(limiter.try_acquire().await);
        }

        // 11th should fail
        assert!(!limiter.try_acquire().await);

        // Wait for refill (200 ms at 10 tokens/s is about 2 tokens)
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Should be able to acquire again
        assert!(limiter.try_acquire().await);
    }

    // Circuit Breaker Tests

    // Recording `failure_threshold` failures opens a closed circuit.
    #[tokio::test]
    async fn test_circuit_breaker_closed_to_open() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            circuit_breaker: CircuitBreakerConfig {
                enabled: true,
                failure_threshold: 3,
                success_threshold: 2,
                timeout: Duration::from_millis(100),
                half_open_max_calls: 2,
            },
            ..Default::default()
        };

        let controller = BackpressureController::<i32>::new(config);

        // Initial state should be Closed
        assert_eq!(controller.circuit_state().await, CircuitState::Closed);

        // Manually trigger failures by recording them directly
        for _ in 0..3 {
            controller.record_circuit_failure().await;
        }

        // After enough failures, circuit should open
        assert_eq!(controller.circuit_state().await, CircuitState::Open);
    }

    // Once the open-state timeout elapses, checking the circuit moves it to HalfOpen.
    #[tokio::test]
    async fn test_circuit_breaker_open_to_half_open() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            circuit_breaker: CircuitBreakerConfig {
                enabled: true,
                failure_threshold: 3,
                success_threshold: 2,
                timeout: Duration::from_millis(50),
                half_open_max_calls: 2,
            },
            ..Default::default()
        };

        let controller = BackpressureController::<i32>::new(config);

        // Manually cause circuit to open by recording failures
        for _ in 0..3 {
            controller.record_circuit_failure().await;
        }

        assert_eq!(controller.circuit_state().await, CircuitState::Open);

        // Wait for timeout
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Try to check circuit state - this should transition to HalfOpen
        let _ = controller.check_circuit_state().await;
        assert_eq!(controller.circuit_state().await, CircuitState::HalfOpen);
    }

    // From HalfOpen, `success_threshold` recorded successes close the circuit.
    // The HalfOpen state is forced by writing the private `circuit_state` field.
    #[tokio::test]
    async fn test_circuit_breaker_half_open_to_closed() {
        let config = BackpressureConfig {
            max_buffer_size: 100, // Large enough to allow successes
            strategy: BackpressureStrategy::Block,
            circuit_breaker: CircuitBreakerConfig {
                enabled: true,
                failure_threshold: 2,
                success_threshold: 2,
                timeout: Duration::from_millis(50),
                half_open_max_calls: 5,
            },
            ..Default::default()
        };

        let controller = BackpressureController::<i32>::new(config);

        // Manually set state to HalfOpen for testing
        *controller.circuit_state.lock().await = CircuitState::HalfOpen;

        // Record successes
        for _ in 0..2 {
            controller.record_circuit_success().await;
        }

        // Should transition to Closed
        assert_eq!(controller.circuit_state().await, CircuitState::Closed);
    }

    // Stress Tests

    // Ten concurrent producers offer 10,000 events in total. Every offer is
    // counted as received and the buffer never exceeds its capacity.
    #[tokio::test]
    async fn test_stress_high_load() {
        let config = BackpressureConfig {
            max_buffer_size: 1000,
            strategy: BackpressureStrategy::DropOldest,
            ..Default::default()
        };

        let controller = Arc::new(BackpressureController::new(config));

        // Spawn multiple producers
        let mut handles = vec![];
        for producer_id in 0..10 {
            let controller_clone = controller.clone();
            let handle = tokio::spawn(async move {
                for i in 0..1000 {
                    let value = producer_id * 1000 + i;
                    let _ = controller_clone.offer(value).await;
                }
            });
            handles.push(handle);
        }

        // Wait for all producers
        for handle in handles {
            handle.await.unwrap();
        }

        // Verify stats
        let stats = controller.stats().await;
        assert_eq!(stats.events_received, 10000);
        assert!(stats.buffer_size <= 1000);
    }

    // One producer and one consumer run concurrently against a Block-strategy
    // buffer. All 5,000 events must be received and processed.
    #[tokio::test]
    async fn test_stress_concurrent_offer_and_poll() {
        let config = BackpressureConfig {
            max_buffer_size: 500,
            strategy: BackpressureStrategy::Block,
            ..Default::default()
        };

        let controller = Arc::new(BackpressureController::new(config));

        // Spawn producer
        let producer_controller = controller.clone();
        let producer = tokio::spawn(async move {
            for i in 0..5000 {
                let _ = producer_controller.offer(i).await;
            }
        });

        // Spawn consumer with optimized polling strategy
        let consumer_controller = controller.clone();
        let consumer = tokio::spawn(async move {
            let mut count = 0;
            let timeout_duration = Duration::from_secs(10);
            let start_time = Instant::now();
            let mut consecutive_empty_polls = 0;

            loop {
                // Add timeout to prevent infinite loops
                if start_time.elapsed() > timeout_duration {
                    panic!(
                        "Consumer timeout after 10 seconds, consumed {} events",
                        count
                    );
                }

                match consumer_controller.poll().await {
                    Ok(Some(_)) => {
                        count += 1;
                        consecutive_empty_polls = 0;
                        if count >= 5000 {
                            break;
                        }
                        // Don't sleep when successfully consuming - keep polling
                    }
                    Ok(None) => {
                        // Adaptive backoff: sleep longer after consecutive empty polls
                        consecutive_empty_polls += 1;
                        let sleep_duration = if consecutive_empty_polls < 5 {
                            Duration::from_micros(100) // Initial backoff
                        } else if consecutive_empty_polls < 20 {
                            Duration::from_micros(500) // Medium backoff
                        } else {
                            Duration::from_millis(1) // Maximum backoff
                        };
                        tokio::time::sleep(sleep_duration).await;
                    }
                    Err(_) => {
                        // Error case - use medium backoff
                        tokio::time::sleep(Duration::from_micros(500)).await;
                    }
                }
            }
            count
        });

        // Wait for both with timeout
        let producer_result = tokio::time::timeout(Duration::from_secs(10), producer).await;
        assert!(producer_result.is_ok(), "Producer timeout");
        producer_result.unwrap().unwrap();

        let consumer_result = tokio::time::timeout(Duration::from_secs(10), consumer).await;
        assert!(consumer_result.is_ok(), "Consumer timeout");
        let consumed = consumer_result.unwrap().unwrap();

        assert_eq!(consumed, 5000);

        // Verify stats
        let stats = controller.stats().await;
        assert_eq!(stats.events_received, 5000);
        assert_eq!(stats.events_processed, 5000);
    }

    // Degradation Strategy Tests

    // With a low high-water mark, ReduceThroughput degradation drops some events.
    #[tokio::test]
    async fn test_degradation_reduce_throughput() {
        let config = BackpressureConfig {
            max_buffer_size: 10,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 0.5, // Trigger degradation early
            degradation: DegradationStrategy::ReduceThroughput {
                reduction_percent: 50.0,
            },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Fill buffer to trigger degradation
        for i in 0..20 {
            let _ = controller.offer(i).await;
        }

        let stats = controller.stats().await;
        // Some events should be dropped due to degradation
        assert!(stats.events_dropped > 0);
    }

    // Sampling degradation counts every offer as received but keeps only part of them.
    #[tokio::test]
    async fn test_degradation_sampling() {
        let config = BackpressureConfig {
            max_buffer_size: 10,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 0.5,
            degradation: DegradationStrategy::Sampling { sample_rate: 0.5 },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Fill buffer
        for i in 0..20 {
            let _ = controller.offer(i).await;
        }

        let stats = controller.stats().await;
        // All events are received, but roughly half should be dropped due to sampling
        assert_eq!(stats.events_received, 20);
        assert!(stats.events_dropped > 0); // Some should be dropped
        assert!(stats.buffer_size < 20); // Not all made it to buffer
    }

    // Metrics Tests

    // The received/processed counters and the queue-depth gauge track offers and polls.
    #[tokio::test]
    async fn test_metrics_collection() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Verify initial metrics
        assert_eq!(controller.metrics_events_received.get(), 0);
        assert_eq!(controller.metrics_events_processed.get(), 0);

        // Offer and poll events
        for i in 0..10 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(controller.metrics_events_received.get(), 10);

        for _ in 0..5 {
            controller.poll().await.unwrap();
        }

        assert_eq!(controller.metrics_events_processed.get(), 5);
        assert_eq!(controller.metrics_queue_depth.get(), 5.0);
    }

    // Each polled event records one positive latency sample in the histogram.
    #[tokio::test]
    async fn test_metrics_latency() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Offer events
        for i in 0..10 {
            controller.offer(i).await.unwrap();
        }

        // Wait a bit to create measurable latency
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Poll events
        for _ in 0..10 {
            controller.poll().await.unwrap();
        }

        // Check latency histogram
        let stats = controller.metrics_latency.get_stats();
        assert_eq!(stats.count, 10);
        assert!(stats.mean > 0.0);
    }

    // Crossing the high-water mark increments the backpressure-events counter.
    #[tokio::test]
    async fn test_metrics_backpressure_events() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 0.5,
            degradation: DegradationStrategy::ReduceThroughput {
                reduction_percent: 0.0, // Disable degradation
            },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        // Fill buffer to trigger backpressure
        for i in 0..60 {
            controller.offer(i).await.unwrap();
        }

        // Should have triggered backpressure events
        assert!(controller.metrics_backpressure_events.get() > 0);
    }
}