1use anyhow::{anyhow, Result};
15use chrono::{DateTime, Duration as ChronoDuration, Utc};
16use scirs2_core::metrics::{Counter, Gauge, Histogram};
17use serde::{Deserialize, Serialize};
18use std::collections::VecDeque;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{Mutex, Semaphore};
23use tracing::{debug, info, warn};
24
/// Boxed, sendable future; needed because `apply_degradation_strategy`
/// recurses and async fns cannot be directly recursive.
type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
26
/// Circuit breaker state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum CircuitState {
    /// Normal operation; calls flow through.
    #[default]
    Closed,
    /// Tripped; calls are rejected until a cool-down elapses.
    Open,
    /// Probing; a limited number of trial calls are admitted.
    HalfOpen,
}
38
/// How to shed load once buffer utilization passes the high-water mark.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DegradationStrategy {
    /// Randomly reject `reduction_percent` percent of incoming events.
    ReduceThroughput { reduction_percent: f64 },
    /// Criticality-based shedding; the current evaluator admits everything
    /// (criticality is not modelled here).
    SkipNonCritical,
    /// Tolerate a buffer up to `factor` times the configured capacity.
    ExpandBuffer { factor: f64 },
    /// Keep a random `sample_rate` fraction of events.
    Sampling { sample_rate: f64 },
    /// All listed strategies must admit an event for it to pass (logical AND).
    Combined(Vec<DegradationStrategy>),
}
53
/// Producer-side policy applied when the buffer fills up.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackpressureStrategy {
    /// Evict the oldest buffered event to make room for the new one.
    DropOldest,
    /// Discard the incoming event, keeping the buffer intact.
    DropNewest,
    /// Wait (via semaphore) until capacity frees up.
    Block,
    /// Retry with exponentially growing delays before giving up.
    ExponentialBackoff {
        initial_delay_ms: u64,
        max_delay_ms: u64,
        multiplier: f64,
    },
    /// Throttle producers toward a target throughput (events/second).
    Adaptive {
        target_throughput: f64,
        adjustment_factor: f64,
    },
}
75
/// Advisory signal for producers, derived from buffer utilization by
/// `update_flow_control`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowControlSignal {
    /// Utilization below the low-water mark.
    Proceed,
    /// Utilization at or above the low-water mark.
    SlowDown,
    /// Utilization at or above the high-water mark.
    Stop,
}
86
/// Tuning knobs for the circuit breaker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    /// Master switch; when false every call is admitted.
    pub enabled: bool,
    /// Failures that trip the circuit to Open.
    pub failure_threshold: u32,
    /// Successes in HalfOpen required to close the circuit again.
    pub success_threshold: u32,
    /// Cool-down after which an Open circuit starts probing (HalfOpen).
    pub timeout: Duration,
    /// Maximum probe calls admitted while HalfOpen.
    pub half_open_max_calls: u32,
}
101
102impl Default for CircuitBreakerConfig {
103 fn default() -> Self {
104 Self {
105 enabled: true,
106 failure_threshold: 5,
107 success_threshold: 3,
108 timeout: Duration::from_secs(30),
109 half_open_max_calls: 3,
110 }
111 }
112}
113
/// Configuration for a `BackpressureController`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureConfig {
    /// Hard cap on buffered events (also sizes the blocking semaphore).
    pub max_buffer_size: usize,
    /// Policy applied when the buffer is full.
    pub strategy: BackpressureStrategy,
    /// Utilization fraction at which producers are signalled to Stop and
    /// degradation kicks in.
    pub high_water_mark: f64,
    /// Utilization fraction at which producers are signalled to SlowDown.
    pub low_water_mark: f64,
    // NOTE(review): not read anywhere in this file — presumably consumed by
    // callers elsewhere; confirm before relying on it.
    pub enable_adaptive: bool,
    /// Window over which throughput is measured.
    pub measurement_window: ChronoDuration,
    /// Circuit breaker settings.
    pub circuit_breaker: CircuitBreakerConfig,
    /// Load-shedding strategy used above the high-water mark.
    pub degradation: DegradationStrategy,
}
134
135impl Default for BackpressureConfig {
136 fn default() -> Self {
137 Self {
138 max_buffer_size: 10000,
139 strategy: BackpressureStrategy::Block,
140 high_water_mark: 0.8,
141 low_water_mark: 0.2,
142 enable_adaptive: true,
143 measurement_window: ChronoDuration::seconds(10),
144 circuit_breaker: CircuitBreakerConfig::default(),
145 degradation: DegradationStrategy::ReduceThroughput {
146 reduction_percent: 50.0,
147 },
148 }
149 }
150}
151
/// Running statistics maintained by a `BackpressureController`.
#[derive(Debug, Clone, Default)]
pub struct BackpressureStats {
    // Lifetime counters.
    pub events_received: u64,
    pub events_processed: u64,
    pub events_dropped: u64,
    pub events_blocked: u64,
    // Current buffer occupancy, absolute and as a fraction of capacity.
    pub buffer_size: usize,
    pub buffer_utilization: f64,
    // Events/second over the measurement window; filled in by `stats()`.
    pub current_throughput: u64 as f64,
    // Transitions into SlowDown/Stop flow-control states.
    pub backpressure_events: u64,
    // Exponential moving average of queue latency, in milliseconds.
    pub avg_latency_ms: f64,
    // Circuit-breaker snapshot; filled in by `stats()`.
    pub circuit_state: CircuitState,
    pub circuit_failures: u32,
    pub circuit_successes: u32,
    // True once degradation has shed at least one event.
    pub degradation_active: bool,
}
169
/// Shared FIFO of events paired with their enqueue timestamps.
type TimestampedBuffer<T> = Arc<Mutex<VecDeque<(T, DateTime<Utc>)>>>;
172
/// Shared log of (timestamp, count) samples used to measure throughput.
type ThroughputHistory = Arc<Mutex<VecDeque<(DateTime<Utc>, u64)>>>;
175
/// Point-in-time snapshot of the scirs2 metric instruments
/// (see `BackpressureController::get_metrics`).
#[derive(Debug, Clone)]
pub struct BackpressureMetrics {
    pub events_received: u64,
    pub events_processed: u64,
    pub events_dropped: u64,
    /// Current queue depth as reported by the gauge.
    pub queue_depth: f64,
    /// Distribution of queue latencies, observed in seconds.
    pub latency_stats: scirs2_core::metrics::HistogramStats,
    pub backpressure_events: u64,
    pub circuit_state_changes: u64,
}
187
/// Bounded event buffer with pluggable backpressure strategies, graceful
/// degradation, a circuit breaker, and metrics instrumentation.
pub struct BackpressureController<T> {
    config: BackpressureConfig,
    /// FIFO of pending events with enqueue timestamps.
    buffer: TimestampedBuffer<T>,
    stats: Arc<Mutex<BackpressureStats>>,
    /// Latest advisory signal exposed to producers.
    flow_control: Arc<Mutex<FlowControlSignal>>,
    /// Capacity permits used by the `Block` strategy.
    semaphore: Arc<Semaphore>,
    throughput_history: ThroughputHistory,
    // Circuit breaker state. Each piece has its own mutex; methods that
    // touch several always lock `circuit_state` first.
    circuit_state: Arc<Mutex<CircuitState>>,
    circuit_failures: Arc<Mutex<u32>>,
    circuit_successes: Arc<Mutex<u32>>,
    circuit_last_failure: Arc<Mutex<Option<Instant>>>,
    circuit_half_open_calls: Arc<Mutex<u32>>,
    // scirs2 metric instruments, snapshotted by `get_metrics`.
    metrics_events_received: Arc<Counter>,
    metrics_events_processed: Arc<Counter>,
    metrics_events_dropped: Arc<Counter>,
    metrics_queue_depth: Arc<Gauge>,
    metrics_latency: Arc<Histogram>,
    metrics_backpressure_events: Arc<Counter>,
    metrics_circuit_state_changes: Arc<Counter>,
}
211
212impl<T: Clone + Send> BackpressureController<T> {
213 pub fn new(config: BackpressureConfig) -> Self {
215 let max_permits = config.max_buffer_size;
216
217 let metrics_events_received =
219 Arc::new(Counter::new("backpressure_events_received".to_string()));
220 let metrics_events_processed =
221 Arc::new(Counter::new("backpressure_events_processed".to_string()));
222 let metrics_events_dropped =
223 Arc::new(Counter::new("backpressure_events_dropped".to_string()));
224 let metrics_queue_depth = Arc::new(Gauge::new("backpressure_queue_depth".to_string()));
225 let metrics_latency = Arc::new(Histogram::new("backpressure_latency_seconds".to_string()));
226 let metrics_backpressure_events =
227 Arc::new(Counter::new("backpressure_events_total".to_string()));
228 let metrics_circuit_state_changes = Arc::new(Counter::new(
229 "backpressure_circuit_state_changes".to_string(),
230 ));
231
232 Self {
233 config,
234 buffer: Arc::new(Mutex::new(VecDeque::new())),
235 stats: Arc::new(Mutex::new(BackpressureStats::default())),
236 flow_control: Arc::new(Mutex::new(FlowControlSignal::Proceed)),
237 semaphore: Arc::new(Semaphore::new(max_permits)),
238 throughput_history: Arc::new(Mutex::new(VecDeque::new())),
239 circuit_state: Arc::new(Mutex::new(CircuitState::Closed)),
240 circuit_failures: Arc::new(Mutex::new(0)),
241 circuit_successes: Arc::new(Mutex::new(0)),
242 circuit_last_failure: Arc::new(Mutex::new(None)),
243 circuit_half_open_calls: Arc::new(Mutex::new(0)),
244 metrics_events_received,
245 metrics_events_processed,
246 metrics_events_dropped,
247 metrics_queue_depth,
248 metrics_latency,
249 metrics_backpressure_events,
250 metrics_circuit_state_changes,
251 }
252 }
253
254 pub fn get_metrics(&self) -> BackpressureMetrics {
256 BackpressureMetrics {
257 events_received: self.metrics_events_received.get(),
258 events_processed: self.metrics_events_processed.get(),
259 events_dropped: self.metrics_events_dropped.get(),
260 queue_depth: self.metrics_queue_depth.get(),
261 latency_stats: self.metrics_latency.get_stats(),
262 backpressure_events: self.metrics_backpressure_events.get(),
263 circuit_state_changes: self.metrics_circuit_state_changes.get(),
264 }
265 }
266
    /// Gate deciding whether a call may proceed under the circuit breaker.
    ///
    /// Returns `Ok(true)` when the call is admitted, `Ok(false)` when the
    /// circuit rejects it. A disabled breaker admits everything.
    async fn check_circuit_state(&self) -> Result<bool> {
        if !self.config.circuit_breaker.enabled {
            return Ok(true);
        }

        // Hold the state lock across the whole decision so concurrent
        // callers cannot observe a half-applied transition.
        let mut state = self.circuit_state.lock().await;
        let circuit_config = &self.config.circuit_breaker;

        match *state {
            // Healthy: admit unconditionally.
            CircuitState::Closed => Ok(true),
            CircuitState::Open => {
                let last_failure = self.circuit_last_failure.lock().await;
                if let Some(last_fail_time) = *last_failure {
                    if last_fail_time.elapsed() >= circuit_config.timeout {
                        // Cool-down elapsed: start probing in HalfOpen with a
                        // fresh probe-call budget.
                        *state = CircuitState::HalfOpen;
                        *self.circuit_half_open_calls.lock().await = 0;
                        self.metrics_circuit_state_changes.inc();
                        info!("Circuit breaker transitioned to HalfOpen");
                        Ok(true)
                    } else {
                        Ok(false)
                    }
                } else {
                    // Open with no failure timestamp: reject. In practice
                    // `record_circuit_failure` always records a timestamp
                    // before opening the circuit.
                    Ok(false)
                }
            }
            CircuitState::HalfOpen => {
                // Admit only a bounded number of probe calls while half-open.
                let mut half_open_calls = self.circuit_half_open_calls.lock().await;
                if *half_open_calls < circuit_config.half_open_max_calls {
                    *half_open_calls += 1;
                    Ok(true)
                } else {
                    Ok(false)
                }
            }
        }
    }
306
    /// Feeds a success into the circuit breaker's state machine.
    async fn record_circuit_success(&self) {
        if !self.config.circuit_breaker.enabled {
            return;
        }

        let mut state = self.circuit_state.lock().await;
        let circuit_config = &self.config.circuit_breaker;

        match *state {
            CircuitState::HalfOpen => {
                // Enough probe successes close the circuit again.
                let mut successes = self.circuit_successes.lock().await;
                *successes += 1;
                if *successes >= circuit_config.success_threshold {
                    *state = CircuitState::Closed;
                    *self.circuit_failures.lock().await = 0;
                    *successes = 0;
                    self.metrics_circuit_state_changes.inc();
                    info!("Circuit breaker transitioned to Closed");
                }
            }
            CircuitState::Closed => {
                // A success while closed resets the failure streak.
                *self.circuit_failures.lock().await = 0;
            }
            CircuitState::Open => {
                // A success observed while open closes the circuit
                // immediately and resets the failure streak.
                *state = CircuitState::Closed;
                *self.circuit_failures.lock().await = 0;
                self.metrics_circuit_state_changes.inc();
            }
        }
    }
341
    /// Feeds a failure into the circuit breaker's state machine, opening the
    /// circuit once the failure threshold is crossed.
    async fn record_circuit_failure(&self) {
        if !self.config.circuit_breaker.enabled {
            return;
        }

        // Lock order: state first, then the counters — matching the other
        // circuit-breaker methods.
        let mut state = self.circuit_state.lock().await;
        let circuit_config = &self.config.circuit_breaker;
        let mut failures = self.circuit_failures.lock().await;

        *failures += 1;
        // This timestamp drives the Open -> HalfOpen cool-down check in
        // `check_circuit_state`.
        *self.circuit_last_failure.lock().await = Some(Instant::now());

        if *failures >= circuit_config.failure_threshold && *state != CircuitState::Open {
            *state = CircuitState::Open;
            *self.circuit_successes.lock().await = 0;
            self.metrics_circuit_state_changes.inc();
            warn!(
                "Circuit breaker transitioned to Open after {} failures",
                failures
            );
        }
    }
366
367 async fn apply_degradation(&self, _event: &T) -> Result<bool> {
369 let stats = self.stats.lock().await;
370 let utilization = stats.buffer_utilization;
371 drop(stats);
372
373 if utilization < self.config.high_water_mark {
375 return Ok(true); }
377
378 self.apply_degradation_strategy(&self.config.degradation)
379 .await
380 }
381
    /// Evaluates one degradation strategy and reports whether the event
    /// should be admitted.
    ///
    /// Returns a boxed future because the `Combined` variant recurses, and
    /// async fns cannot be directly recursive.
    fn apply_degradation_strategy<'a>(
        &'a self,
        strategy: &'a DegradationStrategy,
    ) -> BoxFuture<'a, Result<bool>> {
        Box::pin(async move {
            match strategy {
                DegradationStrategy::ReduceThroughput { reduction_percent } => {
                    // Admit with probability (1 - reduction_percent / 100).
                    let threshold = 1.0 - (reduction_percent / 100.0);
                    Ok(fastrand::f64() < threshold)
                }
                DegradationStrategy::SkipNonCritical => {
                    // Criticality is not modelled here, so every event is
                    // treated as critical and admitted.
                    Ok(true)
                }
                DegradationStrategy::ExpandBuffer { factor } => {
                    // Admit while the buffer is below the expanded cap.
                    let expanded_size = (self.config.max_buffer_size as f64 * factor) as usize;
                    let buffer = self.buffer.lock().await;
                    Ok(buffer.len() < expanded_size)
                }
                DegradationStrategy::Sampling { sample_rate } => {
                    // Keep a random `sample_rate` fraction of events.
                    Ok(fastrand::f64() < *sample_rate)
                }
                DegradationStrategy::Combined(strategies) => {
                    // Every sub-strategy must admit the event (logical AND);
                    // the first rejection short-circuits.
                    for strat in strategies {
                        if !self.apply_degradation_strategy(strat).await? {
                            return Ok(false);
                        }
                    }
                    Ok(true)
                }
            }
        })
    }
420
    /// Entry point for producers: routes `event` through the circuit breaker
    /// and degradation gates, then applies the configured backpressure
    /// strategy.
    ///
    /// # Errors
    /// Fails when the circuit is open, when the event is shed by graceful
    /// degradation, or when the strategy itself rejects the event.
    pub async fn offer(&self, event: T) -> Result<()> {
        self.metrics_events_received.inc();
        let mut stats = self.stats.lock().await;
        stats.events_received += 1;
        drop(stats);

        // Gate 1: circuit breaker.
        if !self.check_circuit_state().await? {
            self.metrics_events_dropped.inc();
            return Err(anyhow!("Circuit breaker is open"));
        }

        // Gate 2: graceful degradation (only sheds above the high-water mark).
        if !self.apply_degradation(&event).await? {
            self.metrics_events_dropped.inc();
            let mut stats = self.stats.lock().await;
            stats.events_dropped += 1;
            stats.degradation_active = true;
            return Err(anyhow!("Event dropped due to graceful degradation"));
        }

        // Dispatch on the configured backpressure strategy.
        let result = match &self.config.strategy {
            BackpressureStrategy::DropOldest => self.offer_drop_oldest(event).await,
            BackpressureStrategy::DropNewest => self.offer_drop_newest(event).await,
            BackpressureStrategy::Block => self.offer_blocking(event).await,
            BackpressureStrategy::ExponentialBackoff {
                initial_delay_ms,
                max_delay_ms,
                multiplier,
            } => {
                self.offer_with_backoff(event, *initial_delay_ms, *max_delay_ms, *multiplier)
                    .await
            }
            BackpressureStrategy::Adaptive {
                target_throughput,
                adjustment_factor,
            } => {
                self.offer_adaptive(event, *target_throughput, *adjustment_factor)
                    .await
            }
        };

        // The offer outcome doubles as the circuit breaker's
        // success/failure signal.
        match &result {
            Ok(_) => self.record_circuit_success().await,
            Err(_) => self.record_circuit_failure().await,
        }

        result
    }
474
475 async fn offer_drop_oldest(&self, event: T) -> Result<()> {
477 let mut buffer = self.buffer.lock().await;
478
479 if buffer.len() >= self.config.max_buffer_size {
480 buffer.pop_front();
482
483 self.metrics_events_dropped.inc();
484 let mut stats = self.stats.lock().await;
485 stats.events_dropped += 1;
486 drop(stats);
487
488 warn!("Buffer full, dropped oldest event");
489 }
490
491 buffer.push_back((event, Utc::now()));
492 let buffer_len = buffer.len();
493 self.metrics_queue_depth.set(buffer_len as f64);
494 drop(buffer);
495
496 self.update_flow_control(buffer_len).await;
497
498 Ok(())
499 }
500
501 async fn offer_drop_newest(&self, event: T) -> Result<()> {
503 let mut buffer = self.buffer.lock().await;
504
505 if buffer.len() >= self.config.max_buffer_size {
506 self.metrics_events_dropped.inc();
507 let mut stats = self.stats.lock().await;
508 stats.events_dropped += 1;
509 drop(stats);
510
511 warn!("Buffer full, dropped newest event");
512 return Ok(());
513 }
514
515 buffer.push_back((event, Utc::now()));
516 let buffer_len = buffer.len();
517 self.metrics_queue_depth.set(buffer_len as f64);
518 drop(buffer);
519
520 self.update_flow_control(buffer_len).await;
521
522 Ok(())
523 }
524
525 async fn offer_blocking(&self, event: T) -> Result<()> {
527 let _permit = self
529 .semaphore
530 .acquire()
531 .await
532 .map_err(|e| anyhow!("Failed to acquire semaphore: {}", e))?;
533
534 let mut buffer = self.buffer.lock().await;
535 buffer.push_back((event, Utc::now()));
536
537 let buffer_size = buffer.len();
538 drop(buffer);
539
540 self.update_flow_control(buffer_size).await;
541
542 Ok(())
543 }
544
545 async fn offer_with_backoff(
547 &self,
548 event: T,
549 initial_delay_ms: u64,
550 max_delay_ms: u64,
551 multiplier: f64,
552 ) -> Result<()> {
553 let mut delay_ms = initial_delay_ms;
554 let mut retries = 0;
555 const MAX_RETRIES: u32 = 10;
556
557 loop {
558 let buffer = self.buffer.lock().await;
559 let buffer_size = buffer.len();
560 drop(buffer);
561
562 if buffer_size < self.config.max_buffer_size {
563 let mut buffer = self.buffer.lock().await;
564 buffer.push_back((event, Utc::now()));
565 drop(buffer);
566
567 self.update_flow_control(buffer_size + 1).await;
568 return Ok(());
569 }
570
571 if retries >= MAX_RETRIES {
572 let mut stats = self.stats.lock().await;
573 stats.events_dropped += 1;
574 return Err(anyhow!("Max retries exceeded, dropping event"));
575 }
576
577 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
579
580 delay_ms = ((delay_ms as f64 * multiplier) as u64).min(max_delay_ms);
581 retries += 1;
582
583 let mut stats = self.stats.lock().await;
584 stats.events_blocked += 1;
585 drop(stats);
586 }
587 }
588
    /// Inserts `event` after throttling to steer measured throughput toward
    /// `target_throughput`; fails if the buffer is still full afterwards.
    async fn offer_adaptive(
        &self,
        event: T,
        target_throughput: f64,
        adjustment_factor: f64,
    ) -> Result<()> {
        let current_throughput = self.measure_throughput().await;

        // Above target: sleep proportionally to the relative overshoot,
        // scaled by `adjustment_factor` (result interpreted as milliseconds).
        if current_throughput > target_throughput {
            let delay_ms =
                ((current_throughput / target_throughput - 1.0) * adjustment_factor) as u64;
            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
        }

        let mut buffer = self.buffer.lock().await;

        if buffer.len() >= self.config.max_buffer_size {
            let mut stats = self.stats.lock().await;
            stats.events_dropped += 1;
            drop(stats);

            return Err(anyhow!("Buffer full even with adaptive throttling"));
        }

        buffer.push_back((event, Utc::now()));
        let buffer_size = buffer.len();
        drop(buffer);

        self.update_flow_control(buffer_size).await;

        Ok(())
    }
625
    /// Removes and returns the oldest buffered event, or `None` when empty.
    ///
    /// On success this also releases one semaphore permit (consumed by the
    /// `Block` strategy), records queue latency, and refreshes flow-control
    /// and throughput state.
    pub async fn poll(&self) -> Result<Option<T>> {
        let mut buffer = self.buffer.lock().await;

        if let Some((event, timestamp)) = buffer.pop_front() {
            let buffer_size = buffer.len();
            self.metrics_queue_depth.set(buffer_size as f64);
            drop(buffer);

            // Hand the freed capacity slot back to blocking producers.
            self.semaphore.add_permits(1);

            // Time spent queued: enqueue timestamp to now, in milliseconds.
            let latency = (Utc::now() - timestamp).num_milliseconds() as f64;
            self.metrics_latency.observe(latency / 1000.0);
            self.metrics_events_processed.inc();

            let mut stats = self.stats.lock().await;
            stats.events_processed += 1;

            // Exponential moving average of latency (alpha = 0.1).
            let alpha = 0.1;
            stats.avg_latency_ms = alpha * latency + (1.0 - alpha) * stats.avg_latency_ms;

            drop(stats);

            self.update_flow_control(buffer_size).await;
            self.record_throughput().await;

            Ok(Some(event))
        } else {
            Ok(None)
        }
    }
662
    /// Recomputes the flow-control signal from buffer utilization and
    /// refreshes the cached occupancy stats.
    ///
    /// Stop at/above the high-water mark, SlowDown at/above the low-water
    /// mark, Proceed below it.
    async fn update_flow_control(&self, buffer_size: usize) {
        let utilization = buffer_size as f64 / self.config.max_buffer_size as f64;

        let signal = if utilization >= self.config.high_water_mark {
            FlowControlSignal::Stop
        } else if utilization >= self.config.low_water_mark {
            FlowControlSignal::SlowDown
        } else {
            FlowControlSignal::Proceed
        };

        let mut flow_control = self.flow_control.lock().await;
        if *flow_control != signal {
            debug!(
                "Flow control signal changed: {:?} -> {:?}",
                *flow_control, signal
            );

            // Count only transitions *into* a throttled state.
            if signal != FlowControlSignal::Proceed {
                self.metrics_backpressure_events.inc();
                let mut stats = self.stats.lock().await;
                stats.backpressure_events += 1;
            }
        }
        *flow_control = signal;

        let mut stats = self.stats.lock().await;
        stats.buffer_size = buffer_size;
        stats.buffer_utilization = utilization;
    }
695
696 async fn record_throughput(&self) {
698 let now = Utc::now();
699 let mut history = self.throughput_history.lock().await;
700
701 history.push_back((now, 1));
702
703 let window_start = now - self.config.measurement_window;
705 while let Some((timestamp, _)) = history.front() {
706 if *timestamp < window_start {
707 history.pop_front();
708 } else {
709 break;
710 }
711 }
712 }
713
714 async fn measure_throughput(&self) -> f64 {
716 let now = Utc::now();
717 let history = self.throughput_history.lock().await;
718
719 if history.is_empty() {
720 return 0.0;
721 }
722
723 let window_start = now - self.config.measurement_window;
724 let count: u64 = history
725 .iter()
726 .filter(|(timestamp, _)| *timestamp >= window_start)
727 .map(|(_, count)| count)
728 .sum();
729
730 let elapsed_seconds = self.config.measurement_window.num_seconds() as f64;
731 count as f64 / elapsed_seconds
732 }
733
    /// Current flow-control signal producers should honor.
    pub async fn flow_control_signal(&self) -> FlowControlSignal {
        *self.flow_control.lock().await
    }
738
    /// Returns a point-in-time snapshot of the statistics, augmented with
    /// freshly computed throughput and circuit-breaker values.
    pub async fn stats(&self) -> BackpressureStats {
        let stats = self.stats.lock().await;
        let mut result = stats.clone();
        // Release the stats lock before awaiting the other locks below.
        drop(stats);
        result.current_throughput = self.measure_throughput().await;

        result.circuit_state = *self.circuit_state.lock().await;
        result.circuit_failures = *self.circuit_failures.lock().await;
        result.circuit_successes = *self.circuit_successes.lock().await;

        result
    }
755
    /// Current circuit breaker state.
    pub async fn circuit_state(&self) -> CircuitState {
        *self.circuit_state.lock().await
    }
760
    /// Number of events currently buffered.
    pub async fn buffer_size(&self) -> usize {
        self.buffer.lock().await.len()
    }
765
    /// Empties the buffer and returns the freed capacity to the semaphore so
    /// blocked producers can make progress.
    pub async fn clear(&self) {
        let mut buffer = self.buffer.lock().await;
        let cleared_count = buffer.len();
        buffer.clear();

        // One permit per cleared event, matching the Block strategy's
        // permit-per-slot accounting.
        self.semaphore.add_permits(cleared_count);

        let mut stats = self.stats.lock().await;
        stats.buffer_size = 0;
        stats.buffer_utilization = 0.0;
    }
779}
780
781pub struct RateLimiter {
783 tokens: Arc<Mutex<f64>>,
784 max_tokens: f64,
785 refill_rate: f64, last_refill: Arc<Mutex<DateTime<Utc>>>,
787}
788
789impl RateLimiter {
790 pub fn new(max_tokens: f64, refill_rate: f64) -> Self {
792 Self {
793 tokens: Arc::new(Mutex::new(max_tokens)),
794 max_tokens,
795 refill_rate,
796 last_refill: Arc::new(Mutex::new(Utc::now())),
797 }
798 }
799
800 pub async fn try_acquire(&self) -> bool {
802 self.refill_tokens().await;
803
804 let mut tokens = self.tokens.lock().await;
805 if *tokens >= 1.0 {
806 *tokens -= 1.0;
807 true
808 } else {
809 false
810 }
811 }
812
813 pub async fn acquire(&self) -> Result<()> {
815 loop {
816 if self.try_acquire().await {
817 return Ok(());
818 }
819
820 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
822 }
823 }
824
825 async fn refill_tokens(&self) {
827 let now = Utc::now();
828 let mut last_refill = self.last_refill.lock().await;
829
830 let elapsed = (now - *last_refill).num_milliseconds() as f64 / 1000.0;
831 let new_tokens = elapsed * self.refill_rate;
832
833 if new_tokens > 0.0 {
834 let mut tokens = self.tokens.lock().await;
835 *tokens = (*tokens + new_tokens).min(self.max_tokens);
836 *last_refill = now;
837 }
838 }
839
840 pub async fn available_tokens(&self) -> f64 {
842 self.refill_tokens().await;
843 *self.tokens.lock().await
844 }
845}
846
847#[cfg(test)]
848mod tests {
849 use super::*;
850
    /// DropOldest with capacity 3: offering 0..5 must leave [2, 3, 4].
    /// `high_water_mark: 1.5` keeps degradation from ever triggering.
    #[tokio::test]
    async fn test_backpressure_drop_oldest() {
        let config = BackpressureConfig {
            max_buffer_size: 3,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 1.5,
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        for i in 0..5 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(controller.buffer_size().await, 3);

        // Oldest survivor is 2 (0 and 1 were evicted).
        let event = controller.poll().await.unwrap().unwrap();
        assert_eq!(event, 2);
    }
874
    /// DropNewest with capacity 3: offering 0..5 must keep [0, 1, 2] and
    /// discard the late arrivals. Degradation is disabled via the 1.5 mark.
    #[tokio::test]
    async fn test_backpressure_drop_newest() {
        let config = BackpressureConfig {
            max_buffer_size: 3,
            strategy: BackpressureStrategy::DropNewest,
            high_water_mark: 1.5,
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        for i in 0..5 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(controller.buffer_size().await, 3);

        // The earliest event is still at the front.
        let event = controller.poll().await.unwrap().unwrap();
        assert_eq!(event, 0);
    }
897
    /// Watermarks at 20%/80% of 100 slots: 30 buffered events -> SlowDown,
    /// 85 -> Stop. Degradation is neutralized with `reduction_percent: 0.0`.
    #[tokio::test]
    async fn test_flow_control_signals() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            high_water_mark: 0.8,
            low_water_mark: 0.2,
            degradation: DegradationStrategy::ReduceThroughput {
                reduction_percent: 0.0,
            },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::Proceed
        );

        for i in 0..30 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::SlowDown
        );

        for i in 30..85 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::Stop
        );
    }
938
    /// Bucket of 10 refilling at 10/s: 10 immediate acquires succeed, the
    /// 11th fails, and after ~200ms at least one token has been refilled.
    #[tokio::test]
    async fn test_rate_limiter() {
        let limiter = RateLimiter::new(10.0, 10.0);
        for _ in 0..10 {
            assert!(limiter.try_acquire().await);
        }

        assert!(!limiter.try_acquire().await);

        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        assert!(limiter.try_acquire().await);
    }
957
    /// Recording `failure_threshold` (3) failures trips the breaker from
    /// Closed to Open.
    #[tokio::test]
    async fn test_circuit_breaker_closed_to_open() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            circuit_breaker: CircuitBreakerConfig {
                enabled: true,
                failure_threshold: 3,
                success_threshold: 2,
                timeout: Duration::from_millis(100),
                half_open_max_calls: 2,
            },
            ..Default::default()
        };

        let controller = BackpressureController::<i32>::new(config);

        assert_eq!(controller.circuit_state().await, CircuitState::Closed);

        for _ in 0..3 {
            controller.record_circuit_failure().await;
        }

        assert_eq!(controller.circuit_state().await, CircuitState::Open);
    }
987
    /// After the 50ms cool-down elapses, the next admission check moves the
    /// breaker from Open to HalfOpen.
    #[tokio::test]
    async fn test_circuit_breaker_open_to_half_open() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            circuit_breaker: CircuitBreakerConfig {
                enabled: true,
                failure_threshold: 3,
                success_threshold: 2,
                timeout: Duration::from_millis(50),
                half_open_max_calls: 2,
            },
            ..Default::default()
        };

        let controller = BackpressureController::<i32>::new(config);

        for _ in 0..3 {
            controller.record_circuit_failure().await;
        }

        assert_eq!(controller.circuit_state().await, CircuitState::Open);

        // Wait out the cool-down (100ms > 50ms timeout).
        tokio::time::sleep(Duration::from_millis(100)).await;

        // The transition happens lazily, inside the admission check.
        let _ = controller.check_circuit_state().await;
        assert_eq!(controller.circuit_state().await, CircuitState::HalfOpen);
    }
1019
    /// From a forced HalfOpen state, `success_threshold` (2) successes
    /// re-close the circuit.
    #[tokio::test]
    async fn test_circuit_breaker_half_open_to_closed() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            circuit_breaker: CircuitBreakerConfig {
                enabled: true,
                failure_threshold: 2,
                success_threshold: 2,
                timeout: Duration::from_millis(50),
                half_open_max_calls: 5,
            },
            ..Default::default()
        };

        let controller = BackpressureController::<i32>::new(config);

        // Force HalfOpen directly instead of waiting out the Open cool-down.
        *controller.circuit_state.lock().await = CircuitState::HalfOpen;

        for _ in 0..2 {
            controller.record_circuit_success().await;
        }

        assert_eq!(controller.circuit_state().await, CircuitState::Closed);
    }
1048
    /// 10 concurrent producers x 1000 events against a 1000-slot DropOldest
    /// buffer: every receipt is counted and capacity is never exceeded.
    #[tokio::test]
    async fn test_stress_high_load() {
        let config = BackpressureConfig {
            max_buffer_size: 1000,
            strategy: BackpressureStrategy::DropOldest,
            ..Default::default()
        };

        let controller = Arc::new(BackpressureController::new(config));

        let mut handles = vec![];
        for producer_id in 0..10 {
            let controller_clone = controller.clone();
            let handle = tokio::spawn(async move {
                for i in 0..1000 {
                    let value = producer_id * 1000 + i;
                    // Offers may legitimately fail (degradation); ignore.
                    let _ = controller_clone.offer(value).await;
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        let stats = controller.stats().await;
        assert_eq!(stats.events_received, 10000);
        assert!(stats.buffer_size <= 1000);
    }
1083
    /// One producer pushing 5000 events through a 500-slot Block buffer while
    /// one consumer polls with adaptive backoff; all 5000 must flow through.
    #[tokio::test]
    async fn test_stress_concurrent_offer_and_poll() {
        let config = BackpressureConfig {
            max_buffer_size: 500,
            strategy: BackpressureStrategy::Block,
            ..Default::default()
        };

        let controller = Arc::new(BackpressureController::new(config));

        let producer_controller = controller.clone();
        let producer = tokio::spawn(async move {
            for i in 0..5000 {
                let _ = producer_controller.offer(i).await;
            }
        });

        let consumer_controller = controller.clone();
        let consumer = tokio::spawn(async move {
            let mut count = 0;
            let timeout_duration = Duration::from_secs(10);
            let start_time = Instant::now();
            let mut consecutive_empty_polls = 0;

            loop {
                // Hard safety net: fail loudly rather than hang the test.
                if start_time.elapsed() > timeout_duration {
                    panic!(
                        "Consumer timeout after 10 seconds, consumed {} events",
                        count
                    );
                }

                match consumer_controller.poll().await {
                    Ok(Some(_)) => {
                        count += 1;
                        consecutive_empty_polls = 0;
                        if count >= 5000 {
                            break;
                        }
                    }
                    Ok(None) => {
                        // Back off progressively the longer the buffer
                        // stays empty.
                        consecutive_empty_polls += 1;
                        let sleep_duration = if consecutive_empty_polls < 5 {
                            Duration::from_micros(100)
                        } else if consecutive_empty_polls < 20 {
                            Duration::from_micros(500)
                        } else {
                            Duration::from_millis(1)
                        };
                        tokio::time::sleep(sleep_duration).await;
                    }
                    Err(_) => {
                        tokio::time::sleep(Duration::from_micros(500)).await;
                    }
                }
            }
            count
        });

        let producer_result = tokio::time::timeout(Duration::from_secs(10), producer).await;
        assert!(producer_result.is_ok(), "Producer timeout");
        producer_result.unwrap().unwrap();

        let consumer_result = tokio::time::timeout(Duration::from_secs(10), consumer).await;
        assert!(consumer_result.is_ok(), "Consumer timeout");
        let consumed = consumer_result.unwrap().unwrap();

        assert_eq!(consumed, 5000);

        let stats = controller.stats().await;
        assert_eq!(stats.events_received, 5000);
        assert_eq!(stats.events_processed, 5000);
    }
1165
    /// With a 50% shed rate above a 0.5 high-water mark, 20 offers into a
    /// 10-slot DropOldest buffer must drop at least one event (either by
    /// degradation shedding or by oldest-eviction).
    #[tokio::test]
    async fn test_degradation_reduce_throughput() {
        let config = BackpressureConfig {
            max_buffer_size: 10,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 0.5,
            degradation: DegradationStrategy::ReduceThroughput {
                reduction_percent: 50.0,
            },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        for i in 0..20 {
            // Degradation rejections surface as Err; ignore them here.
            let _ = controller.offer(i).await;
        }

        let stats = controller.stats().await;
        assert!(stats.events_dropped > 0);
    }
1190
    /// Sampling at 50% above a 0.5 high-water mark: all 20 offers are
    /// received, some are dropped, and the buffer stays below 20.
    #[tokio::test]
    async fn test_degradation_sampling() {
        let config = BackpressureConfig {
            max_buffer_size: 10,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 0.5,
            degradation: DegradationStrategy::Sampling { sample_rate: 0.5 },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        for i in 0..20 {
            let _ = controller.offer(i).await;
        }

        let stats = controller.stats().await;
        assert_eq!(stats.events_received, 20);
        assert!(stats.events_dropped > 0);
        assert!(stats.buffer_size < 20);
    }
1214
    /// Counters and the queue-depth gauge track offers and polls exactly.
    #[tokio::test]
    async fn test_metrics_collection() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        assert_eq!(controller.metrics_events_received.get(), 0);
        assert_eq!(controller.metrics_events_processed.get(), 0);

        for i in 0..10 {
            controller.offer(i).await.unwrap();
        }

        assert_eq!(controller.metrics_events_received.get(), 10);

        for _ in 0..5 {
            controller.poll().await.unwrap();
        }

        assert_eq!(controller.metrics_events_processed.get(), 5);
        assert_eq!(controller.metrics_queue_depth.get(), 5.0);
    }
1244
    /// The latency histogram records one observation per polled event, with
    /// a positive mean after events sit queued for ~10ms.
    #[tokio::test]
    async fn test_metrics_latency() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::Block,
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        for i in 0..10 {
            controller.offer(i).await.unwrap();
        }

        // Let the queued events age so measured latency is non-zero.
        tokio::time::sleep(Duration::from_millis(10)).await;

        for _ in 0..10 {
            controller.poll().await.unwrap();
        }

        let stats = controller.metrics_latency.get_stats();
        assert!(stats.count == 10);
        assert!(stats.mean > 0.0);
    }
1273
    /// Filling past the 0.5 high-water mark must record at least one
    /// backpressure (flow-control transition) event. Degradation is
    /// neutralized with `reduction_percent: 0.0` so all offers succeed.
    #[tokio::test]
    async fn test_metrics_backpressure_events() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            strategy: BackpressureStrategy::DropOldest,
            high_water_mark: 0.5,
            degradation: DegradationStrategy::ReduceThroughput {
                reduction_percent: 0.0,
            },
            ..Default::default()
        };

        let controller = BackpressureController::new(config);

        for i in 0..60 {
            controller.offer(i).await.unwrap();
        }

        assert!(controller.metrics_backpressure_events.get() > 0);
    }
1296}