1use anyhow::{anyhow, Result};
15use chrono::{DateTime, Duration as ChronoDuration, Utc};
16use scirs2_core::metrics::{Counter, Gauge, Histogram};
17use serde::{Deserialize, Serialize};
18use std::collections::VecDeque;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{Mutex, Semaphore};
23use tracing::{debug, info, warn};
24
25type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
29pub enum CircuitState {
30 #[default]
32 Closed,
33 Open,
35 HalfOpen,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
41pub enum DegradationStrategy {
42 ReduceThroughput { reduction_percent: f64 },
44 SkipNonCritical,
46 ExpandBuffer { factor: f64 },
48 Sampling { sample_rate: f64 },
50 Combined(Vec<DegradationStrategy>),
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
56pub enum BackpressureStrategy {
57 DropOldest,
59 DropNewest,
61 Block,
63 ExponentialBackoff {
65 initial_delay_ms: u64,
66 max_delay_ms: u64,
67 multiplier: f64,
68 },
69 Adaptive {
71 target_throughput: f64,
72 adjustment_factor: f64,
73 },
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub enum FlowControlSignal {
79 Proceed,
81 SlowDown,
83 Stop,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct CircuitBreakerConfig {
90 pub enabled: bool,
92 pub failure_threshold: u32,
94 pub success_threshold: u32,
96 pub timeout: Duration,
98 pub half_open_max_calls: u32,
100}
101
102impl Default for CircuitBreakerConfig {
103 fn default() -> Self {
104 Self {
105 enabled: true,
106 failure_threshold: 5,
107 success_threshold: 3,
108 timeout: Duration::from_secs(30),
109 half_open_max_calls: 3,
110 }
111 }
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct BackpressureConfig {
117 pub max_buffer_size: usize,
119 pub strategy: BackpressureStrategy,
121 pub high_water_mark: f64,
123 pub low_water_mark: f64,
125 pub enable_adaptive: bool,
127 pub measurement_window: ChronoDuration,
129 pub circuit_breaker: CircuitBreakerConfig,
131 pub degradation: DegradationStrategy,
133}
134
135impl Default for BackpressureConfig {
136 fn default() -> Self {
137 Self {
138 max_buffer_size: 10000,
139 strategy: BackpressureStrategy::Block,
140 high_water_mark: 0.8,
141 low_water_mark: 0.2,
142 enable_adaptive: true,
143 measurement_window: ChronoDuration::seconds(10),
144 circuit_breaker: CircuitBreakerConfig::default(),
145 degradation: DegradationStrategy::ReduceThroughput {
146 reduction_percent: 50.0,
147 },
148 }
149 }
150}
151
152#[derive(Debug, Clone, Default)]
154pub struct BackpressureStats {
155 pub events_received: u64,
156 pub events_processed: u64,
157 pub events_dropped: u64,
158 pub events_blocked: u64,
159 pub buffer_size: usize,
160 pub buffer_utilization: f64,
161 pub current_throughput: f64,
162 pub backpressure_events: u64,
163 pub avg_latency_ms: f64,
164 pub circuit_state: CircuitState,
165 pub circuit_failures: u32,
166 pub circuit_successes: u32,
167 pub degradation_active: bool,
168}
169
170type TimestampedBuffer<T> = Arc<Mutex<VecDeque<(T, DateTime<Utc>)>>>;
172
173type ThroughputHistory = Arc<Mutex<VecDeque<(DateTime<Utc>, u64)>>>;
175
176#[derive(Debug, Clone)]
178pub struct BackpressureMetrics {
179 pub events_received: u64,
180 pub events_processed: u64,
181 pub events_dropped: u64,
182 pub queue_depth: f64,
183 pub latency_stats: scirs2_core::metrics::HistogramStats,
184 pub backpressure_events: u64,
185 pub circuit_state_changes: u64,
186}
187
188pub struct BackpressureController<T> {
190 config: BackpressureConfig,
191 buffer: TimestampedBuffer<T>,
192 stats: Arc<Mutex<BackpressureStats>>,
193 flow_control: Arc<Mutex<FlowControlSignal>>,
194 semaphore: Arc<Semaphore>,
195 throughput_history: ThroughputHistory,
196 circuit_state: Arc<Mutex<CircuitState>>,
198 circuit_failures: Arc<Mutex<u32>>,
199 circuit_successes: Arc<Mutex<u32>>,
200 circuit_last_failure: Arc<Mutex<Option<Instant>>>,
201 circuit_half_open_calls: Arc<Mutex<u32>>,
202 metrics_events_received: Arc<Counter>,
204 metrics_events_processed: Arc<Counter>,
205 metrics_events_dropped: Arc<Counter>,
206 metrics_queue_depth: Arc<Gauge>,
207 metrics_latency: Arc<Histogram>,
208 metrics_backpressure_events: Arc<Counter>,
209 metrics_circuit_state_changes: Arc<Counter>,
210}
211
212impl<T: Clone + Send> BackpressureController<T> {
213 pub fn new(config: BackpressureConfig) -> Self {
215 let max_permits = config.max_buffer_size;
216
217 let metrics_events_received =
219 Arc::new(Counter::new("backpressure_events_received".to_string()));
220 let metrics_events_processed =
221 Arc::new(Counter::new("backpressure_events_processed".to_string()));
222 let metrics_events_dropped =
223 Arc::new(Counter::new("backpressure_events_dropped".to_string()));
224 let metrics_queue_depth = Arc::new(Gauge::new("backpressure_queue_depth".to_string()));
225 let metrics_latency = Arc::new(Histogram::new("backpressure_latency_seconds".to_string()));
226 let metrics_backpressure_events =
227 Arc::new(Counter::new("backpressure_events_total".to_string()));
228 let metrics_circuit_state_changes = Arc::new(Counter::new(
229 "backpressure_circuit_state_changes".to_string(),
230 ));
231
232 Self {
233 config,
234 buffer: Arc::new(Mutex::new(VecDeque::new())),
235 stats: Arc::new(Mutex::new(BackpressureStats::default())),
236 flow_control: Arc::new(Mutex::new(FlowControlSignal::Proceed)),
237 semaphore: Arc::new(Semaphore::new(max_permits)),
238 throughput_history: Arc::new(Mutex::new(VecDeque::new())),
239 circuit_state: Arc::new(Mutex::new(CircuitState::Closed)),
240 circuit_failures: Arc::new(Mutex::new(0)),
241 circuit_successes: Arc::new(Mutex::new(0)),
242 circuit_last_failure: Arc::new(Mutex::new(None)),
243 circuit_half_open_calls: Arc::new(Mutex::new(0)),
244 metrics_events_received,
245 metrics_events_processed,
246 metrics_events_dropped,
247 metrics_queue_depth,
248 metrics_latency,
249 metrics_backpressure_events,
250 metrics_circuit_state_changes,
251 }
252 }
253
254 pub fn get_metrics(&self) -> BackpressureMetrics {
256 BackpressureMetrics {
257 events_received: self.metrics_events_received.get(),
258 events_processed: self.metrics_events_processed.get(),
259 events_dropped: self.metrics_events_dropped.get(),
260 queue_depth: self.metrics_queue_depth.get(),
261 latency_stats: self.metrics_latency.get_stats(),
262 backpressure_events: self.metrics_backpressure_events.get(),
263 circuit_state_changes: self.metrics_circuit_state_changes.get(),
264 }
265 }
266
267 async fn check_circuit_state(&self) -> Result<bool> {
269 if !self.config.circuit_breaker.enabled {
270 return Ok(true); }
272
273 let mut state = self.circuit_state.lock().await;
274 let circuit_config = &self.config.circuit_breaker;
275
276 match *state {
277 CircuitState::Closed => Ok(true),
278 CircuitState::Open => {
279 let last_failure = self.circuit_last_failure.lock().await;
280 if let Some(last_fail_time) = *last_failure {
281 if last_fail_time.elapsed() >= circuit_config.timeout {
282 *state = CircuitState::HalfOpen;
284 *self.circuit_half_open_calls.lock().await = 0;
285 self.metrics_circuit_state_changes.inc();
286 info!("Circuit breaker transitioned to HalfOpen");
287 Ok(true)
288 } else {
289 Ok(false) }
291 } else {
292 Ok(false)
293 }
294 }
295 CircuitState::HalfOpen => {
296 let mut half_open_calls = self.circuit_half_open_calls.lock().await;
297 if *half_open_calls < circuit_config.half_open_max_calls {
298 *half_open_calls += 1;
299 Ok(true)
300 } else {
301 Ok(false) }
303 }
304 }
305 }
306
307 async fn record_circuit_success(&self) {
309 if !self.config.circuit_breaker.enabled {
310 return;
311 }
312
313 let mut state = self.circuit_state.lock().await;
314 let circuit_config = &self.config.circuit_breaker;
315
316 match *state {
317 CircuitState::HalfOpen => {
318 let mut successes = self.circuit_successes.lock().await;
319 *successes += 1;
320 if *successes >= circuit_config.success_threshold {
321 *state = CircuitState::Closed;
323 *self.circuit_failures.lock().await = 0;
324 *successes = 0; self.metrics_circuit_state_changes.inc();
326 info!("Circuit breaker transitioned to Closed");
327 }
328 }
329 CircuitState::Closed => {
330 *self.circuit_failures.lock().await = 0;
332 }
333 CircuitState::Open => {
334 *state = CircuitState::Closed;
336 *self.circuit_failures.lock().await = 0;
337 self.metrics_circuit_state_changes.inc();
338 }
339 }
340 }
341
342 async fn record_circuit_failure(&self) {
344 if !self.config.circuit_breaker.enabled {
345 return;
346 }
347
348 let mut state = self.circuit_state.lock().await;
349 let circuit_config = &self.config.circuit_breaker;
350 let mut failures = self.circuit_failures.lock().await;
351
352 *failures += 1;
353 *self.circuit_last_failure.lock().await = Some(Instant::now());
354
355 if *failures >= circuit_config.failure_threshold && *state != CircuitState::Open {
356 *state = CircuitState::Open;
358 *self.circuit_successes.lock().await = 0;
359 self.metrics_circuit_state_changes.inc();
360 warn!(
361 "Circuit breaker transitioned to Open after {} failures",
362 failures
363 );
364 }
365 }
366
367 async fn apply_degradation(&self, _event: &T) -> Result<bool> {
369 if matches!(self.config.strategy, BackpressureStrategy::Block) {
375 return Ok(true);
376 }
377
378 let stats = self.stats.lock().await;
379 let utilization = stats.buffer_utilization;
380 drop(stats);
381
382 if utilization < self.config.high_water_mark {
384 return Ok(true); }
386
387 self.apply_degradation_strategy(&self.config.degradation)
388 .await
389 }
390
391 fn apply_degradation_strategy<'a>(
393 &'a self,
394 strategy: &'a DegradationStrategy,
395 ) -> BoxFuture<'a, Result<bool>> {
396 Box::pin(async move {
397 match strategy {
398 DegradationStrategy::ReduceThroughput { reduction_percent } => {
399 let threshold = 1.0 - (reduction_percent / 100.0);
401 Ok(fastrand::f64() < threshold)
402 }
403 DegradationStrategy::SkipNonCritical => {
404 Ok(true)
406 }
407 DegradationStrategy::ExpandBuffer { factor } => {
408 let expanded_size = (self.config.max_buffer_size as f64 * factor) as usize;
410 let buffer = self.buffer.lock().await;
411 Ok(buffer.len() < expanded_size)
412 }
413 DegradationStrategy::Sampling { sample_rate } => {
414 Ok(fastrand::f64() < *sample_rate)
416 }
417 DegradationStrategy::Combined(strategies) => {
418 for strat in strategies {
420 if !self.apply_degradation_strategy(strat).await? {
421 return Ok(false);
422 }
423 }
424 Ok(true)
425 }
426 }
427 })
428 }
429
430 pub async fn offer(&self, event: T) -> Result<()> {
432 self.metrics_events_received.inc();
434 let mut stats = self.stats.lock().await;
435 stats.events_received += 1;
436 drop(stats);
437
438 if !self.check_circuit_state().await? {
440 self.metrics_events_dropped.inc();
441 return Err(anyhow!("Circuit breaker is open"));
442 }
443
444 if !self.apply_degradation(&event).await? {
446 self.metrics_events_dropped.inc();
447 let mut stats = self.stats.lock().await;
448 stats.events_dropped += 1;
449 stats.degradation_active = true;
450 return Err(anyhow!("Event dropped due to graceful degradation"));
451 }
452
453 let result = match &self.config.strategy {
455 BackpressureStrategy::DropOldest => self.offer_drop_oldest(event).await,
456 BackpressureStrategy::DropNewest => self.offer_drop_newest(event).await,
457 BackpressureStrategy::Block => self.offer_blocking(event).await,
458 BackpressureStrategy::ExponentialBackoff {
459 initial_delay_ms,
460 max_delay_ms,
461 multiplier,
462 } => {
463 self.offer_with_backoff(event, *initial_delay_ms, *max_delay_ms, *multiplier)
464 .await
465 }
466 BackpressureStrategy::Adaptive {
467 target_throughput,
468 adjustment_factor,
469 } => {
470 self.offer_adaptive(event, *target_throughput, *adjustment_factor)
471 .await
472 }
473 };
474
475 match &result {
477 Ok(_) => self.record_circuit_success().await,
478 Err(_) => self.record_circuit_failure().await,
479 }
480
481 result
482 }
483
484 async fn offer_drop_oldest(&self, event: T) -> Result<()> {
486 let mut buffer = self.buffer.lock().await;
487
488 if buffer.len() >= self.config.max_buffer_size {
489 buffer.pop_front();
491
492 self.metrics_events_dropped.inc();
493 let mut stats = self.stats.lock().await;
494 stats.events_dropped += 1;
495 drop(stats);
496
497 warn!("Buffer full, dropped oldest event");
498 }
499
500 buffer.push_back((event, Utc::now()));
501 let buffer_len = buffer.len();
502 self.metrics_queue_depth.set(buffer_len as f64);
503 drop(buffer);
504
505 self.update_flow_control(buffer_len).await;
506
507 Ok(())
508 }
509
510 async fn offer_drop_newest(&self, event: T) -> Result<()> {
512 let mut buffer = self.buffer.lock().await;
513
514 if buffer.len() >= self.config.max_buffer_size {
515 self.metrics_events_dropped.inc();
516 let mut stats = self.stats.lock().await;
517 stats.events_dropped += 1;
518 drop(stats);
519
520 warn!("Buffer full, dropped newest event");
521 return Ok(());
522 }
523
524 buffer.push_back((event, Utc::now()));
525 let buffer_len = buffer.len();
526 self.metrics_queue_depth.set(buffer_len as f64);
527 drop(buffer);
528
529 self.update_flow_control(buffer_len).await;
530
531 Ok(())
532 }
533
534 async fn offer_blocking(&self, event: T) -> Result<()> {
536 let permit = self
544 .semaphore
545 .acquire()
546 .await
547 .map_err(|e| anyhow!("Failed to acquire semaphore: {}", e))?;
548
549 let mut buffer = self.buffer.lock().await;
550 buffer.push_back((event, Utc::now()));
551
552 let buffer_size = buffer.len();
553 self.metrics_queue_depth.set(buffer_size as f64);
554 drop(buffer);
555
556 permit.forget();
559
560 self.update_flow_control(buffer_size).await;
561
562 Ok(())
563 }
564
565 async fn offer_with_backoff(
567 &self,
568 event: T,
569 initial_delay_ms: u64,
570 max_delay_ms: u64,
571 multiplier: f64,
572 ) -> Result<()> {
573 let mut delay_ms = initial_delay_ms;
574 let mut retries = 0;
575 const MAX_RETRIES: u32 = 10;
576
577 loop {
578 let buffer = self.buffer.lock().await;
579 let buffer_size = buffer.len();
580 drop(buffer);
581
582 if buffer_size < self.config.max_buffer_size {
583 let mut buffer = self.buffer.lock().await;
584 buffer.push_back((event, Utc::now()));
585 drop(buffer);
586
587 self.update_flow_control(buffer_size + 1).await;
588 return Ok(());
589 }
590
591 if retries >= MAX_RETRIES {
592 let mut stats = self.stats.lock().await;
593 stats.events_dropped += 1;
594 return Err(anyhow!("Max retries exceeded, dropping event"));
595 }
596
597 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
599
600 delay_ms = ((delay_ms as f64 * multiplier) as u64).min(max_delay_ms);
601 retries += 1;
602
603 let mut stats = self.stats.lock().await;
604 stats.events_blocked += 1;
605 drop(stats);
606 }
607 }
608
609 async fn offer_adaptive(
611 &self,
612 event: T,
613 target_throughput: f64,
614 adjustment_factor: f64,
615 ) -> Result<()> {
616 let current_throughput = self.measure_throughput().await;
618
619 if current_throughput > target_throughput {
621 let delay_ms =
622 ((current_throughput / target_throughput - 1.0) * adjustment_factor) as u64;
623 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
624 }
625
626 let mut buffer = self.buffer.lock().await;
628
629 if buffer.len() >= self.config.max_buffer_size {
630 let mut stats = self.stats.lock().await;
631 stats.events_dropped += 1;
632 drop(stats);
633
634 return Err(anyhow!("Buffer full even with adaptive throttling"));
635 }
636
637 buffer.push_back((event, Utc::now()));
638 let buffer_size = buffer.len();
639 drop(buffer);
640
641 self.update_flow_control(buffer_size).await;
642
643 Ok(())
644 }
645
646 pub async fn poll(&self) -> Result<Option<T>> {
648 let mut buffer = self.buffer.lock().await;
649
650 if let Some((event, timestamp)) = buffer.pop_front() {
651 let buffer_size = buffer.len();
652 self.metrics_queue_depth.set(buffer_size as f64);
653 drop(buffer);
654
655 self.semaphore.add_permits(1);
657
658 let latency = (Utc::now() - timestamp).num_milliseconds() as f64;
660 self.metrics_latency.observe(latency / 1000.0); self.metrics_events_processed.inc();
664
665 let mut stats = self.stats.lock().await;
667 stats.events_processed += 1;
668
669 let alpha = 0.1;
670 stats.avg_latency_ms = alpha * latency + (1.0 - alpha) * stats.avg_latency_ms;
671
672 drop(stats);
673
674 self.update_flow_control(buffer_size).await;
675 self.record_throughput().await;
676
677 Ok(Some(event))
678 } else {
679 Ok(None)
680 }
681 }
682
683 async fn update_flow_control(&self, buffer_size: usize) {
685 let utilization = buffer_size as f64 / self.config.max_buffer_size as f64;
686
687 let signal = if utilization >= self.config.high_water_mark {
688 FlowControlSignal::Stop
689 } else if utilization >= self.config.low_water_mark {
690 FlowControlSignal::SlowDown
691 } else {
692 FlowControlSignal::Proceed
693 };
694
695 let mut flow_control = self.flow_control.lock().await;
696 if *flow_control != signal {
697 debug!(
698 "Flow control signal changed: {:?} -> {:?}",
699 *flow_control, signal
700 );
701
702 if signal != FlowControlSignal::Proceed {
703 self.metrics_backpressure_events.inc();
704 let mut stats = self.stats.lock().await;
705 stats.backpressure_events += 1;
706 }
707 }
708 *flow_control = signal;
709
710 let mut stats = self.stats.lock().await;
712 stats.buffer_size = buffer_size;
713 stats.buffer_utilization = utilization;
714 }
715
716 async fn record_throughput(&self) {
718 let now = Utc::now();
719 let mut history = self.throughput_history.lock().await;
720
721 history.push_back((now, 1));
722
723 let window_start = now - self.config.measurement_window;
725 while let Some((timestamp, _)) = history.front() {
726 if *timestamp < window_start {
727 history.pop_front();
728 } else {
729 break;
730 }
731 }
732 }
733
734 async fn measure_throughput(&self) -> f64 {
736 let now = Utc::now();
737 let history = self.throughput_history.lock().await;
738
739 if history.is_empty() {
740 return 0.0;
741 }
742
743 let window_start = now - self.config.measurement_window;
744 let count: u64 = history
745 .iter()
746 .filter(|(timestamp, _)| *timestamp >= window_start)
747 .map(|(_, count)| count)
748 .sum();
749
750 let elapsed_seconds = self.config.measurement_window.num_seconds() as f64;
751 count as f64 / elapsed_seconds
752 }
753
754 pub async fn flow_control_signal(&self) -> FlowControlSignal {
756 *self.flow_control.lock().await
757 }
758
759 pub async fn stats(&self) -> BackpressureStats {
761 let stats = self.stats.lock().await;
762 let mut result = stats.clone();
763
764 drop(stats);
766 result.current_throughput = self.measure_throughput().await;
767
768 result.circuit_state = *self.circuit_state.lock().await;
770 result.circuit_failures = *self.circuit_failures.lock().await;
771 result.circuit_successes = *self.circuit_successes.lock().await;
772
773 result
774 }
775
776 pub async fn circuit_state(&self) -> CircuitState {
778 *self.circuit_state.lock().await
779 }
780
781 pub async fn buffer_size(&self) -> usize {
783 self.buffer.lock().await.len()
784 }
785
786 pub async fn clear(&self) {
788 let mut buffer = self.buffer.lock().await;
789 let cleared_count = buffer.len();
790 buffer.clear();
791
792 self.semaphore.add_permits(cleared_count);
794
795 let mut stats = self.stats.lock().await;
796 stats.buffer_size = 0;
797 stats.buffer_utilization = 0.0;
798 }
799}
800
801pub struct RateLimiter {
803 tokens: Arc<Mutex<f64>>,
804 max_tokens: f64,
805 refill_rate: f64, last_refill: Arc<Mutex<DateTime<Utc>>>,
807}
808
809impl RateLimiter {
810 pub fn new(max_tokens: f64, refill_rate: f64) -> Self {
812 Self {
813 tokens: Arc::new(Mutex::new(max_tokens)),
814 max_tokens,
815 refill_rate,
816 last_refill: Arc::new(Mutex::new(Utc::now())),
817 }
818 }
819
820 pub async fn try_acquire(&self) -> bool {
822 self.refill_tokens().await;
823
824 let mut tokens = self.tokens.lock().await;
825 if *tokens >= 1.0 {
826 *tokens -= 1.0;
827 true
828 } else {
829 false
830 }
831 }
832
833 pub async fn acquire(&self) -> Result<()> {
835 loop {
836 if self.try_acquire().await {
837 return Ok(());
838 }
839
840 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
842 }
843 }
844
845 async fn refill_tokens(&self) {
847 let now = Utc::now();
848 let mut last_refill = self.last_refill.lock().await;
849
850 let elapsed = (now - *last_refill).num_milliseconds() as f64 / 1000.0;
851 let new_tokens = elapsed * self.refill_rate;
852
853 if new_tokens > 0.0 {
854 let mut tokens = self.tokens.lock().await;
855 *tokens = (*tokens + new_tokens).min(self.max_tokens);
856 *last_refill = now;
857 }
858 }
859
860 pub async fn available_tokens(&self) -> f64 {
862 self.refill_tokens().await;
863 *self.tokens.lock().await
864 }
865}
866
867#[cfg(test)]
868mod tests {
869 use super::*;
870
871 #[tokio::test]
872 async fn test_backpressure_drop_oldest() {
873 let config = BackpressureConfig {
874 max_buffer_size: 3,
875 strategy: BackpressureStrategy::DropOldest,
876 high_water_mark: 1.5, ..Default::default()
878 };
879
880 let controller = BackpressureController::new(config);
881
882 for i in 0..5 {
884 controller.offer(i).await.unwrap();
885 }
886
887 assert_eq!(controller.buffer_size().await, 3);
889
890 let event = controller.poll().await.unwrap().unwrap();
892 assert_eq!(event, 2);
893 }
894
895 #[tokio::test]
896 async fn test_backpressure_drop_newest() {
897 let config = BackpressureConfig {
898 max_buffer_size: 3,
899 strategy: BackpressureStrategy::DropNewest,
900 high_water_mark: 1.5, ..Default::default()
902 };
903
904 let controller = BackpressureController::new(config);
905
906 for i in 0..5 {
908 controller.offer(i).await.unwrap();
909 }
910
911 assert_eq!(controller.buffer_size().await, 3);
913
914 let event = controller.poll().await.unwrap().unwrap();
915 assert_eq!(event, 0);
916 }
917
918 #[tokio::test]
919 async fn test_flow_control_signals() {
920 let config = BackpressureConfig {
921 max_buffer_size: 100,
922 high_water_mark: 0.8,
923 low_water_mark: 0.2,
924 degradation: DegradationStrategy::ReduceThroughput {
925 reduction_percent: 0.0, },
927 ..Default::default()
928 };
929
930 let controller = BackpressureController::new(config);
931
932 assert_eq!(
934 controller.flow_control_signal().await,
935 FlowControlSignal::Proceed
936 );
937
938 for i in 0..30 {
940 controller.offer(i).await.unwrap();
941 }
942
943 assert_eq!(
944 controller.flow_control_signal().await,
945 FlowControlSignal::SlowDown
946 );
947
948 for i in 30..85 {
950 controller.offer(i).await.unwrap();
951 }
952
953 assert_eq!(
954 controller.flow_control_signal().await,
955 FlowControlSignal::Stop
956 );
957 }
958
959 #[tokio::test]
960 async fn test_rate_limiter() {
961 let limiter = RateLimiter::new(10.0, 10.0); for _ in 0..10 {
965 assert!(limiter.try_acquire().await);
966 }
967
968 assert!(!limiter.try_acquire().await);
970
971 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
973
974 assert!(limiter.try_acquire().await);
976 }
977
978 #[tokio::test]
980 async fn test_circuit_breaker_closed_to_open() {
981 let config = BackpressureConfig {
982 max_buffer_size: 100,
983 strategy: BackpressureStrategy::Block,
984 circuit_breaker: CircuitBreakerConfig {
985 enabled: true,
986 failure_threshold: 3,
987 success_threshold: 2,
988 timeout: Duration::from_millis(100),
989 half_open_max_calls: 2,
990 },
991 ..Default::default()
992 };
993
994 let controller = BackpressureController::<i32>::new(config);
995
996 assert_eq!(controller.circuit_state().await, CircuitState::Closed);
998
999 for _ in 0..3 {
1001 controller.record_circuit_failure().await;
1002 }
1003
1004 assert_eq!(controller.circuit_state().await, CircuitState::Open);
1006 }
1007
1008 #[tokio::test]
1009 async fn test_circuit_breaker_open_to_half_open() {
1010 let config = BackpressureConfig {
1011 max_buffer_size: 100,
1012 strategy: BackpressureStrategy::Block,
1013 circuit_breaker: CircuitBreakerConfig {
1014 enabled: true,
1015 failure_threshold: 3,
1016 success_threshold: 2,
1017 timeout: Duration::from_millis(50),
1018 half_open_max_calls: 2,
1019 },
1020 ..Default::default()
1021 };
1022
1023 let controller = BackpressureController::<i32>::new(config);
1024
1025 for _ in 0..3 {
1027 controller.record_circuit_failure().await;
1028 }
1029
1030 assert_eq!(controller.circuit_state().await, CircuitState::Open);
1031
1032 tokio::time::sleep(Duration::from_millis(100)).await;
1034
1035 let _ = controller.check_circuit_state().await;
1037 assert_eq!(controller.circuit_state().await, CircuitState::HalfOpen);
1038 }
1039
1040 #[tokio::test]
1041 async fn test_circuit_breaker_half_open_to_closed() {
1042 let config = BackpressureConfig {
1043 max_buffer_size: 100, strategy: BackpressureStrategy::Block,
1045 circuit_breaker: CircuitBreakerConfig {
1046 enabled: true,
1047 failure_threshold: 2,
1048 success_threshold: 2,
1049 timeout: Duration::from_millis(50),
1050 half_open_max_calls: 5,
1051 },
1052 ..Default::default()
1053 };
1054
1055 let controller = BackpressureController::<i32>::new(config);
1056
1057 *controller.circuit_state.lock().await = CircuitState::HalfOpen;
1059
1060 for _ in 0..2 {
1062 controller.record_circuit_success().await;
1063 }
1064
1065 assert_eq!(controller.circuit_state().await, CircuitState::Closed);
1067 }
1068
1069 #[tokio::test]
1071 async fn test_stress_high_load() {
1072 let config = BackpressureConfig {
1073 max_buffer_size: 1000,
1074 strategy: BackpressureStrategy::DropOldest,
1075 ..Default::default()
1076 };
1077
1078 let controller = Arc::new(BackpressureController::new(config));
1079
1080 let mut handles = vec![];
1082 for producer_id in 0..10 {
1083 let controller_clone = controller.clone();
1084 let handle = tokio::spawn(async move {
1085 for i in 0..1000 {
1086 let value = producer_id * 1000 + i;
1087 let _ = controller_clone.offer(value).await;
1088 }
1089 });
1090 handles.push(handle);
1091 }
1092
1093 for handle in handles {
1095 handle.await.unwrap();
1096 }
1097
1098 let stats = controller.stats().await;
1100 assert_eq!(stats.events_received, 10000);
1101 assert!(stats.buffer_size <= 1000);
1102 }
1103
1104 #[tokio::test]
1105 async fn test_stress_concurrent_offer_and_poll() {
1106 let config = BackpressureConfig {
1107 max_buffer_size: 500,
1108 strategy: BackpressureStrategy::Block,
1109 ..Default::default()
1110 };
1111
1112 let controller = Arc::new(BackpressureController::new(config));
1113
1114 let producer_controller = controller.clone();
1116 let producer = tokio::spawn(async move {
1117 for i in 0..5000 {
1118 let _ = producer_controller.offer(i).await;
1119 }
1120 });
1121
1122 let consumer_controller = controller.clone();
1124 let consumer = tokio::spawn(async move {
1125 let mut count = 0;
1126 let timeout_duration = Duration::from_secs(10);
1127 let start_time = Instant::now();
1128 let mut consecutive_empty_polls = 0;
1129
1130 loop {
1131 if start_time.elapsed() > timeout_duration {
1133 panic!(
1134 "Consumer timeout after 10 seconds, consumed {} events",
1135 count
1136 );
1137 }
1138
1139 match consumer_controller.poll().await {
1140 Ok(Some(_)) => {
1141 count += 1;
1142 consecutive_empty_polls = 0;
1143 if count >= 5000 {
1144 break;
1145 }
1146 }
1148 Ok(None) => {
1149 consecutive_empty_polls += 1;
1151 let sleep_duration = if consecutive_empty_polls < 5 {
1152 Duration::from_micros(100) } else if consecutive_empty_polls < 20 {
1154 Duration::from_micros(500) } else {
1156 Duration::from_millis(1) };
1158 tokio::time::sleep(sleep_duration).await;
1159 }
1160 Err(_) => {
1161 tokio::time::sleep(Duration::from_micros(500)).await;
1163 }
1164 }
1165 }
1166 count
1167 });
1168
1169 let producer_result = tokio::time::timeout(Duration::from_secs(10), producer).await;
1171 assert!(producer_result.is_ok(), "Producer timeout");
1172 producer_result.unwrap().unwrap();
1173
1174 let consumer_result = tokio::time::timeout(Duration::from_secs(10), consumer).await;
1175 assert!(consumer_result.is_ok(), "Consumer timeout");
1176 let consumed = consumer_result.unwrap().unwrap();
1177
1178 assert_eq!(consumed, 5000);
1179
1180 let stats = controller.stats().await;
1182 assert_eq!(stats.events_received, 5000);
1183 assert_eq!(stats.events_processed, 5000);
1184 }
1185
1186 #[tokio::test]
1188 async fn test_degradation_reduce_throughput() {
1189 let config = BackpressureConfig {
1190 max_buffer_size: 10,
1191 strategy: BackpressureStrategy::DropOldest,
1192 high_water_mark: 0.5, degradation: DegradationStrategy::ReduceThroughput {
1194 reduction_percent: 50.0,
1195 },
1196 ..Default::default()
1197 };
1198
1199 let controller = BackpressureController::new(config);
1200
1201 for i in 0..20 {
1203 let _ = controller.offer(i).await;
1204 }
1205
1206 let stats = controller.stats().await;
1207 assert!(stats.events_dropped > 0);
1209 }
1210
1211 #[tokio::test]
1212 async fn test_degradation_sampling() {
1213 let config = BackpressureConfig {
1214 max_buffer_size: 10,
1215 strategy: BackpressureStrategy::DropOldest,
1216 high_water_mark: 0.5,
1217 degradation: DegradationStrategy::Sampling { sample_rate: 0.5 },
1218 ..Default::default()
1219 };
1220
1221 let controller = BackpressureController::new(config);
1222
1223 for i in 0..20 {
1225 let _ = controller.offer(i).await;
1226 }
1227
1228 let stats = controller.stats().await;
1229 assert_eq!(stats.events_received, 20);
1231 assert!(stats.events_dropped > 0); assert!(stats.buffer_size < 20); }
1234
1235 #[tokio::test]
1237 async fn test_metrics_collection() {
1238 let config = BackpressureConfig {
1239 max_buffer_size: 100,
1240 strategy: BackpressureStrategy::Block,
1241 ..Default::default()
1242 };
1243
1244 let controller = BackpressureController::new(config);
1245
1246 assert_eq!(controller.metrics_events_received.get(), 0);
1248 assert_eq!(controller.metrics_events_processed.get(), 0);
1249
1250 for i in 0..10 {
1252 controller.offer(i).await.unwrap();
1253 }
1254
1255 assert_eq!(controller.metrics_events_received.get(), 10);
1256
1257 for _ in 0..5 {
1258 controller.poll().await.unwrap();
1259 }
1260
1261 assert_eq!(controller.metrics_events_processed.get(), 5);
1262 assert_eq!(controller.metrics_queue_depth.get(), 5.0);
1263 }
1264
1265 #[tokio::test]
1266 async fn test_metrics_latency() {
1267 let config = BackpressureConfig {
1268 max_buffer_size: 100,
1269 strategy: BackpressureStrategy::Block,
1270 ..Default::default()
1271 };
1272
1273 let controller = BackpressureController::new(config);
1274
1275 for i in 0..10 {
1277 controller.offer(i).await.unwrap();
1278 }
1279
1280 tokio::time::sleep(Duration::from_millis(10)).await;
1282
1283 for _ in 0..10 {
1285 controller.poll().await.unwrap();
1286 }
1287
1288 let stats = controller.metrics_latency.get_stats();
1290 assert!(stats.count == 10);
1291 assert!(stats.mean > 0.0);
1292 }
1293
1294 #[tokio::test]
1295 async fn test_metrics_backpressure_events() {
1296 let config = BackpressureConfig {
1297 max_buffer_size: 100,
1298 strategy: BackpressureStrategy::DropOldest,
1299 high_water_mark: 0.5,
1300 degradation: DegradationStrategy::ReduceThroughput {
1301 reduction_percent: 0.0, },
1303 ..Default::default()
1304 };
1305
1306 let controller = BackpressureController::new(config);
1307
1308 for i in 0..60 {
1310 controller.offer(i).await.unwrap();
1311 }
1312
1313 assert!(controller.metrics_backpressure_events.get() > 0);
1315 }
1316}