1use parking_lot::RwLock;
17use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tokio::sync::{mpsc, Mutex, Notify, Semaphore};
21
22#[derive(Debug)]
25pub struct TokenBucket {
26 tokens: AtomicU64,
28 capacity: u64,
30 refill_rate: f64,
32 created: Instant,
34 last_refill: AtomicU64,
36 stats: TokenBucketStats,
38}
39
40impl TokenBucket {
41 pub fn new(capacity: u64, refill_rate: f64) -> Self {
43 let now = Instant::now();
44 Self {
45 tokens: AtomicU64::new(capacity),
46 capacity,
47 refill_rate,
48 created: now,
49 last_refill: AtomicU64::new(0),
50 stats: TokenBucketStats::new(),
51 }
52 }
53
54 pub fn try_acquire(&self, count: u64) -> bool {
56 self.refill();
57
58 loop {
59 let current = self.tokens.load(Ordering::Acquire);
60 if current < count {
61 self.stats.rejected.fetch_add(1, Ordering::Relaxed);
62 return false;
63 }
64
65 if self
66 .tokens
67 .compare_exchange_weak(
68 current,
69 current - count,
70 Ordering::AcqRel,
71 Ordering::Relaxed,
72 )
73 .is_ok()
74 {
75 self.stats.acquired.fetch_add(count, Ordering::Relaxed);
76 return true;
77 }
78 }
79 }
80
81 pub async fn acquire(&self, count: u64) {
83 while !self.try_acquire(count) {
84 let tokens_needed = count.saturating_sub(self.tokens.load(Ordering::Relaxed));
86 let wait_secs = tokens_needed as f64 / self.refill_rate;
87 let wait_duration = Duration::from_secs_f64(wait_secs.max(0.001));
88
89 tokio::time::sleep(wait_duration).await;
90 }
91 }
92
93 fn refill(&self) {
95 let now = self.created.elapsed().as_nanos() as u64;
96 let last = self.last_refill.load(Ordering::Acquire);
97
98 if now <= last {
99 return;
100 }
101
102 let elapsed_secs = (now - last) as f64 / 1_000_000_000.0;
103 let new_tokens = (elapsed_secs * self.refill_rate) as u64;
104
105 if new_tokens == 0 {
106 return;
107 }
108
109 if self
110 .last_refill
111 .compare_exchange(last, now, Ordering::AcqRel, Ordering::Relaxed)
112 .is_ok()
113 {
114 loop {
115 let current = self.tokens.load(Ordering::Acquire);
116 let new_total = (current + new_tokens).min(self.capacity);
117
118 if self
119 .tokens
120 .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Relaxed)
121 .is_ok()
122 {
123 break;
124 }
125 }
126 }
127 }
128
129 pub fn available(&self) -> u64 {
131 self.refill();
132 self.tokens.load(Ordering::Relaxed)
133 }
134
135 pub fn stats(&self) -> TokenBucketStatsSnapshot {
137 TokenBucketStatsSnapshot {
138 acquired: self.stats.acquired.load(Ordering::Relaxed),
139 rejected: self.stats.rejected.load(Ordering::Relaxed),
140 available: self.available(),
141 capacity: self.capacity,
142 }
143 }
144}
145
/// Running counters kept by a token bucket.
#[derive(Debug)]
struct TokenBucketStats {
    /// Total number of tokens handed out.
    acquired: AtomicU64,
    /// Number of acquisition attempts that were turned down.
    rejected: AtomicU64,
}

impl TokenBucketStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        Self {
            acquired: AtomicU64::default(),
            rejected: AtomicU64::default(),
        }
    }
}
160
/// Point-in-time copy of a token bucket's counters.
#[derive(Debug, Clone)]
pub struct TokenBucketStatsSnapshot {
    /// Total tokens acquired so far.
    pub acquired: u64,
    /// Number of rejected acquisition attempts so far.
    pub rejected: u64,
    /// Tokens available when the snapshot was taken.
    pub available: u64,
    /// Maximum number of tokens the bucket can hold.
    pub capacity: u64,
}
168
169#[derive(Debug)]
172pub struct CreditFlowControl {
173 credits: AtomicU64,
175 max_credits: u64,
177 credit_notify: Notify,
179 stats: CreditStats,
181}
182
183impl CreditFlowControl {
184 pub fn new(initial_credits: u64, max_credits: u64) -> Self {
186 Self {
187 credits: AtomicU64::new(initial_credits),
188 max_credits,
189 credit_notify: Notify::new(),
190 stats: CreditStats::new(),
191 }
192 }
193
194 pub fn try_consume(&self, count: u64) -> bool {
196 loop {
197 let current = self.credits.load(Ordering::Acquire);
198 if current < count {
199 self.stats.blocked.fetch_add(1, Ordering::Relaxed);
200 return false;
201 }
202
203 if self
204 .credits
205 .compare_exchange_weak(
206 current,
207 current - count,
208 Ordering::AcqRel,
209 Ordering::Relaxed,
210 )
211 .is_ok()
212 {
213 self.stats.consumed.fetch_add(count, Ordering::Relaxed);
214 return true;
215 }
216 }
217 }
218
219 pub async fn consume(&self, count: u64) {
221 while !self.try_consume(count) {
222 self.credit_notify.notified().await;
223 }
224 }
225
226 pub fn grant(&self, count: u64) {
228 loop {
229 let current = self.credits.load(Ordering::Acquire);
230 let new_total = (current + count).min(self.max_credits);
231
232 if self
233 .credits
234 .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Relaxed)
235 .is_ok()
236 {
237 self.stats.granted.fetch_add(count, Ordering::Relaxed);
238 self.credit_notify.notify_waiters();
239 break;
240 }
241 }
242 }
243
244 pub fn available(&self) -> u64 {
246 self.credits.load(Ordering::Relaxed)
247 }
248
249 pub fn stats(&self) -> CreditStatsSnapshot {
251 CreditStatsSnapshot {
252 consumed: self.stats.consumed.load(Ordering::Relaxed),
253 granted: self.stats.granted.load(Ordering::Relaxed),
254 blocked: self.stats.blocked.load(Ordering::Relaxed),
255 available: self.available(),
256 }
257 }
258}
259
/// Running counters kept by the credit flow controller.
#[derive(Debug)]
struct CreditStats {
    /// Total credits spent.
    consumed: AtomicU64,
    /// Total credits passed to grant calls.
    granted: AtomicU64,
    /// Number of consume attempts that found too few credits.
    blocked: AtomicU64,
}

impl CreditStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        Self {
            consumed: AtomicU64::default(),
            granted: AtomicU64::default(),
            blocked: AtomicU64::default(),
        }
    }
}
276
/// Point-in-time copy of the credit flow controller's counters.
#[derive(Debug, Clone)]
pub struct CreditStatsSnapshot {
    /// Total credits spent so far.
    pub consumed: u64,
    /// Total credits passed to grant calls so far.
    pub granted: u64,
    /// Number of consume attempts that found too few credits.
    pub blocked: u64,
    /// Credit balance when the snapshot was taken.
    pub available: u64,
}
284
285#[derive(Debug)]
288pub struct AdaptiveRateLimiter {
289 rate: AtomicU64,
291 min_rate: u64,
293 max_rate: u64,
295 target_latency_us: u64,
297 additive_increase: u64,
299 multiplicative_decrease: f64,
301 bucket: TokenBucket,
303 latency_samples: RwLock<LatencySamples>,
305 stats: AdaptiveStats,
307}
308
/// Fixed-size ring buffer of latency samples (microseconds).
#[derive(Debug)]
struct LatencySamples {
    /// Backing storage; its length is the window capacity.
    samples: Vec<u64>,
    /// Slot that the next sample will be written to.
    index: usize,
    /// Set once the buffer has wrapped, i.e. every slot holds a real sample.
    filled: bool,
}

impl LatencySamples {
    /// Creates an empty window able to hold `size` samples.
    fn new(size: usize) -> Self {
        Self {
            samples: vec![0; size],
            index: 0,
            filled: false,
        }
    }

    /// Records a sample, overwriting the oldest one once the window is full.
    ///
    /// A zero-capacity window silently ignores samples instead of panicking
    /// on an out-of-bounds write.
    fn add(&mut self, latency_us: u64) {
        let capacity = self.samples.len();
        if capacity == 0 {
            return;
        }

        self.samples[self.index] = latency_us;
        self.index = (self.index + 1) % capacity;
        if self.index == 0 {
            // Wrapped around: from now on every slot is populated.
            self.filled = true;
        }
    }

    /// Returns the sample at fraction `p` (0.0..=1.0) of the sorted window.
    ///
    /// Returns 0 when no samples have been recorded. The rank is
    /// `floor(count * p)`, clamped to the last recorded sample.
    fn percentile(&self, p: f64) -> u64 {
        // Before the first wrap only the first `index` slots are real samples.
        let count = if self.filled {
            self.samples.len()
        } else {
            self.index
        };
        if count == 0 {
            return 0;
        }

        let mut sorted: Vec<u64> = self.samples[..count].to_vec();
        sorted.sort_unstable();

        let idx = ((count as f64 * p) as usize).min(count - 1);
        sorted[idx]
    }
}
350
351impl AdaptiveRateLimiter {
352 pub fn new(config: AdaptiveRateLimiterConfig) -> Self {
354 let bucket = TokenBucket::new(config.initial_rate, config.initial_rate as f64);
355
356 Self {
357 rate: AtomicU64::new(config.initial_rate),
358 min_rate: config.min_rate,
359 max_rate: config.max_rate,
360 target_latency_us: config.target_latency_us,
361 additive_increase: config.additive_increase,
362 multiplicative_decrease: config.multiplicative_decrease,
363 bucket,
364 latency_samples: RwLock::new(LatencySamples::new(100)),
365 stats: AdaptiveStats::new(),
366 }
367 }
368
369 pub fn try_acquire(&self) -> bool {
371 self.bucket.try_acquire(1)
372 }
373
374 pub async fn acquire(&self) {
376 self.bucket.acquire(1).await
377 }
378
379 pub fn record_latency(&self, latency: Duration) {
381 let latency_us = latency.as_micros() as u64;
382
383 {
384 let mut samples = self.latency_samples.write();
385 samples.add(latency_us);
386 }
387
388 let p99 = {
390 let samples = self.latency_samples.read();
391 samples.percentile(0.99)
392 };
393
394 let current_rate = self.rate.load(Ordering::Relaxed);
395
396 if p99 > self.target_latency_us {
397 let new_rate =
399 ((current_rate as f64 * self.multiplicative_decrease) as u64).max(self.min_rate);
400 self.rate.store(new_rate, Ordering::Relaxed);
401 self.stats.decreases.fetch_add(1, Ordering::Relaxed);
402 } else if p99 < self.target_latency_us / 2 {
403 let new_rate = (current_rate + self.additive_increase).min(self.max_rate);
405 self.rate.store(new_rate, Ordering::Relaxed);
406 self.stats.increases.fetch_add(1, Ordering::Relaxed);
407 }
408 }
409
410 pub fn current_rate(&self) -> u64 {
412 self.rate.load(Ordering::Relaxed)
413 }
414
415 pub fn stats(&self) -> AdaptiveStatsSnapshot {
417 let samples = self.latency_samples.read();
418 AdaptiveStatsSnapshot {
419 current_rate: self.current_rate(),
420 increases: self.stats.increases.load(Ordering::Relaxed),
421 decreases: self.stats.decreases.load(Ordering::Relaxed),
422 p50_latency_us: samples.percentile(0.5),
423 p99_latency_us: samples.percentile(0.99),
424 }
425 }
426}
427
/// Tunables for the adaptive rate limiter.
#[derive(Debug, Clone)]
pub struct AdaptiveRateLimiterConfig {
    /// Rate the limiter starts at.
    pub initial_rate: u64,
    /// Lowest rate the limiter may drop to.
    pub min_rate: u64,
    /// Highest rate the limiter may climb to.
    pub max_rate: u64,
    /// Latency target in microseconds.
    pub target_latency_us: u64,
    /// Amount added to the rate on each increase step.
    pub additive_increase: u64,
    /// Factor the rate is multiplied by on each decrease step.
    pub multiplicative_decrease: f64,
}

impl Default for AdaptiveRateLimiterConfig {
    /// Starts at 10 000, bounded to 100..=1 000 000, with a 10 ms latency
    /// target, +100 increase steps and halving on decrease.
    fn default() -> Self {
        Self {
            initial_rate: 10_000,
            min_rate: 100,
            max_rate: 1_000_000,
            target_latency_us: 10_000,
            additive_increase: 100,
            multiplicative_decrease: 0.5,
        }
    }
}
451
/// Running counters of rate adjustments.
#[derive(Debug)]
struct AdaptiveStats {
    /// Number of increase steps taken.
    increases: AtomicU64,
    /// Number of decrease steps taken.
    decreases: AtomicU64,
}

impl AdaptiveStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        Self {
            increases: AtomicU64::default(),
            decreases: AtomicU64::default(),
        }
    }
}
466
/// Point-in-time view of the adaptive rate limiter.
#[derive(Debug, Clone)]
pub struct AdaptiveStatsSnapshot {
    /// Target rate when the snapshot was taken.
    pub current_rate: u64,
    /// Number of increase steps taken so far.
    pub increases: u64,
    /// Number of decrease steps taken so far.
    pub decreases: u64,
    /// Median latency of the sample window, in microseconds.
    pub p50_latency_us: u64,
    /// 99th-percentile latency of the sample window, in microseconds.
    pub p99_latency_us: u64,
}
475
476#[derive(Debug)]
478pub struct CircuitBreaker {
479 state: RwLock<CircuitState>,
481 config: CircuitBreakerConfig,
483 stats: CircuitBreakerStats,
485}
486
/// State of a circuit breaker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Normal operation: calls are allowed.
    Closed,
    /// Calls are rejected; `opened_at` records when the circuit opened.
    Open { opened_at: Instant },
    /// Probing: calls are allowed to test whether the dependency recovered.
    HalfOpen,
}
496
/// Tunables for the circuit breaker.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Number of failures that opens the circuit.
    pub failure_threshold: u32,
    /// Time window associated with failure counting.
    /// NOTE(review): no use of this field is visible next to the breaker
    /// logic — confirm that it is actually enforced.
    pub failure_window: Duration,
    /// How long the circuit stays open before probing is permitted.
    pub recovery_timeout: Duration,
    /// Number of successes while half-open that closes the circuit.
    pub success_threshold: u32,
}

impl Default for CircuitBreakerConfig {
    /// Opens after 5 failures (60 s window), probes after 30 s, and closes
    /// after 3 successful probes.
    fn default() -> Self {
        let failure_window = Duration::from_secs(60);
        let recovery_timeout = Duration::from_secs(30);

        Self {
            failure_threshold: 5,
            failure_window,
            recovery_timeout,
            success_threshold: 3,
        }
    }
}
520
521impl CircuitBreaker {
522 pub fn new(config: CircuitBreakerConfig) -> Self {
524 Self {
525 state: RwLock::new(CircuitState::Closed),
526 config,
527 stats: CircuitBreakerStats::new(),
528 }
529 }
530
531 pub fn allow(&self) -> bool {
533 let state = *self.state.read();
534
535 match state {
536 CircuitState::Closed => {
537 self.stats.allowed.fetch_add(1, Ordering::Relaxed);
538 true
539 }
540 CircuitState::Open { opened_at } => {
541 if opened_at.elapsed() > self.config.recovery_timeout {
542 *self.state.write() = CircuitState::HalfOpen;
544 self.stats.allowed.fetch_add(1, Ordering::Relaxed);
545 true
546 } else {
547 self.stats.rejected.fetch_add(1, Ordering::Relaxed);
548 false
549 }
550 }
551 CircuitState::HalfOpen => {
552 self.stats.allowed.fetch_add(1, Ordering::Relaxed);
554 true
555 }
556 }
557 }
558
559 pub fn record_success(&self) {
561 let mut state = self.state.write();
562 self.stats.successes.fetch_add(1, Ordering::Relaxed);
563
564 match *state {
565 CircuitState::HalfOpen => {
566 let successes = self
567 .stats
568 .half_open_successes
569 .fetch_add(1, Ordering::Relaxed)
570 + 1;
571 if successes >= self.config.success_threshold as u64 {
572 *state = CircuitState::Closed;
573 self.stats.half_open_successes.store(0, Ordering::Relaxed);
574 self.stats.failures_in_window.store(0, Ordering::Relaxed);
575 }
576 }
577 CircuitState::Closed => {
578 self.stats.failures_in_window.store(0, Ordering::Relaxed);
580 }
581 _ => {}
582 }
583 }
584
585 pub fn record_failure(&self) {
587 let mut state = self.state.write();
588 self.stats.failures.fetch_add(1, Ordering::Relaxed);
589
590 match *state {
591 CircuitState::Closed => {
592 let failures = self
593 .stats
594 .failures_in_window
595 .fetch_add(1, Ordering::Relaxed)
596 + 1;
597 if failures >= self.config.failure_threshold as u64 {
598 *state = CircuitState::Open {
599 opened_at: Instant::now(),
600 };
601 self.stats.opens.fetch_add(1, Ordering::Relaxed);
602 }
603 }
604 CircuitState::HalfOpen => {
605 *state = CircuitState::Open {
607 opened_at: Instant::now(),
608 };
609 self.stats.half_open_successes.store(0, Ordering::Relaxed);
610 self.stats.opens.fetch_add(1, Ordering::Relaxed);
611 }
612 _ => {}
613 }
614 }
615
616 pub fn state(&self) -> CircuitState {
618 *self.state.read()
619 }
620
621 pub fn stats(&self) -> CircuitBreakerStatsSnapshot {
623 CircuitBreakerStatsSnapshot {
624 state: self.state(),
625 allowed: self.stats.allowed.load(Ordering::Relaxed),
626 rejected: self.stats.rejected.load(Ordering::Relaxed),
627 successes: self.stats.successes.load(Ordering::Relaxed),
628 failures: self.stats.failures.load(Ordering::Relaxed),
629 opens: self.stats.opens.load(Ordering::Relaxed),
630 }
631 }
632}
633
/// Running counters kept by the circuit breaker.
#[derive(Debug)]
struct CircuitBreakerStats {
    /// Calls that were allowed through.
    allowed: AtomicU64,
    /// Calls that were rejected.
    rejected: AtomicU64,
    /// Total successes recorded.
    successes: AtomicU64,
    /// Total failures recorded.
    failures: AtomicU64,
    /// Failures counted towards opening the circuit.
    failures_in_window: AtomicU64,
    /// Successes counted towards closing the circuit while half-open.
    half_open_successes: AtomicU64,
    /// Number of transitions into the open state.
    opens: AtomicU64,
}

impl CircuitBreakerStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        Self {
            allowed: AtomicU64::default(),
            rejected: AtomicU64::default(),
            successes: AtomicU64::default(),
            failures: AtomicU64::default(),
            failures_in_window: AtomicU64::default(),
            half_open_successes: AtomicU64::default(),
            opens: AtomicU64::default(),
        }
    }
}
658
659#[derive(Debug, Clone)]
660pub struct CircuitBreakerStatsSnapshot {
661 pub state: CircuitState,
662 pub allowed: u64,
663 pub rejected: u64,
664 pub successes: u64,
665 pub failures: u64,
666 pub opens: u64,
667}
668
669pub struct BackpressureChannel<T> {
672 tx: mpsc::Sender<T>,
674 rx: Mutex<mpsc::Receiver<T>>,
676 permits: Arc<Semaphore>,
678 capacity: usize,
680 stats: ChannelStats,
682}
683
684impl<T> BackpressureChannel<T> {
685 pub fn new(capacity: usize) -> Self {
687 let (tx, rx) = mpsc::channel(capacity);
689
690 Self {
691 tx,
692 rx: Mutex::new(rx),
693 permits: Arc::new(Semaphore::new(0)), capacity,
695 stats: ChannelStats::new(),
696 }
697 }
698
699 pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<T>> {
701 self.stats.sent.fetch_add(1, Ordering::Relaxed);
702 let result = self.tx.send(value).await;
703 if result.is_ok() {
704 self.permits.add_permits(1);
706 }
707 result
708 }
709
710 pub fn try_send(&self, value: T) -> Result<(), mpsc::error::TrySendError<T>> {
712 let result = self.tx.try_send(value);
713 match &result {
714 Ok(()) => {
715 self.stats.sent.fetch_add(1, Ordering::Relaxed);
716 self.permits.add_permits(1);
717 }
718 Err(mpsc::error::TrySendError::Full(_)) => {
719 self.stats.blocked.fetch_add(1, Ordering::Relaxed);
720 }
721 _ => {}
722 }
723 result
724 }
725
726 pub async fn recv(&self) -> Option<T> {
728 let permit = self.permits.acquire().await.ok()?;
730 permit.forget(); let mut rx = self.rx.lock().await;
733 let result = rx.recv().await;
734
735 if result.is_some() {
736 self.stats.received.fetch_add(1, Ordering::Relaxed);
737 }
738
739 result
740 }
741
742 pub fn len(&self) -> usize {
744 self.permits.available_permits()
745 }
746
747 pub fn is_empty(&self) -> bool {
749 self.len() == 0
750 }
751
752 pub fn stats(&self) -> ChannelStatsSnapshot {
754 ChannelStatsSnapshot {
755 sent: self.stats.sent.load(Ordering::Relaxed),
756 received: self.stats.received.load(Ordering::Relaxed),
757 blocked: self.stats.blocked.load(Ordering::Relaxed),
758 current_len: self.len(),
759 capacity: self.capacity,
760 }
761 }
762}
763
/// Running counters kept by the backpressure channel.
struct ChannelStats {
    /// Value of the send counter.
    sent: AtomicU64,
    /// Number of items received.
    received: AtomicU64,
    /// Number of non-blocking sends that found the channel full.
    blocked: AtomicU64,
}

impl ChannelStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        Self {
            sent: AtomicU64::default(),
            received: AtomicU64::default(),
            blocked: AtomicU64::default(),
        }
    }
}
779
/// Point-in-time view of the backpressure channel.
#[derive(Debug, Clone)]
pub struct ChannelStatsSnapshot {
    /// Value of the send counter.
    pub sent: u64,
    /// Number of items received so far.
    pub received: u64,
    /// Number of non-blocking sends that found the channel full.
    pub blocked: u64,
    /// Number of items queued when the snapshot was taken.
    pub current_len: usize,
    /// Capacity the channel was created with.
    pub capacity: usize,
}
788
789pub struct WindowedRateTracker {
791 buckets: RwLock<Vec<AtomicU64>>,
793 bucket_duration: Duration,
795 num_buckets: usize,
797 current_bucket: AtomicUsize,
799 last_rotation: RwLock<Instant>,
801}
802
803impl WindowedRateTracker {
804 pub fn new(window_duration: Duration, num_buckets: usize) -> Self {
806 let buckets: Vec<AtomicU64> = (0..num_buckets).map(|_| AtomicU64::new(0)).collect();
807
808 Self {
809 buckets: RwLock::new(buckets),
810 bucket_duration: window_duration / num_buckets as u32,
811 num_buckets,
812 current_bucket: AtomicUsize::new(0),
813 last_rotation: RwLock::new(Instant::now()),
814 }
815 }
816
817 pub fn record(&self, count: u64) {
819 self.maybe_rotate();
820
821 let buckets = self.buckets.read();
822 let idx = self.current_bucket.load(Ordering::Relaxed);
823 buckets[idx].fetch_add(count, Ordering::Relaxed);
824 }
825
826 pub fn rate(&self) -> f64 {
828 self.maybe_rotate();
829
830 let buckets = self.buckets.read();
831 let total: u64 = buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum();
832
833 let window_secs = self.bucket_duration.as_secs_f64() * self.num_buckets as f64;
834 total as f64 / window_secs
835 }
836
837 pub fn total(&self) -> u64 {
839 self.maybe_rotate();
840
841 let buckets = self.buckets.read();
842 buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum()
843 }
844
845 fn maybe_rotate(&self) {
847 let now = Instant::now();
848 let elapsed = {
849 let last = self.last_rotation.read();
850 now.duration_since(*last)
851 };
852
853 if elapsed < self.bucket_duration {
854 return;
855 }
856
857 let buckets_to_rotate =
858 (elapsed.as_secs_f64() / self.bucket_duration.as_secs_f64()) as usize;
859 if buckets_to_rotate == 0 {
860 return;
861 }
862
863 {
865 let mut last = self.last_rotation.write();
866 *last = now;
867 }
868
869 let buckets = self.buckets.read();
870
871 for _ in 0..buckets_to_rotate.min(self.num_buckets) {
873 let next = (self.current_bucket.load(Ordering::Relaxed) + 1) % self.num_buckets;
874 buckets[next].store(0, Ordering::Relaxed);
875 self.current_bucket.store(next, Ordering::Relaxed);
876 }
877 }
878}
879
#[cfg(test)]
mod tests {
    use super::*;

    /// A full bucket can be drained exactly; the next request is rejected.
    #[test]
    fn test_token_bucket_basic() {
        let bucket = TokenBucket::new(10, 10.0);

        // Two requests of 5 use up the initial 10 tokens.
        assert!(bucket.try_acquire(5));
        assert!(bucket.try_acquire(5));

        // Nothing is left for a third request.
        assert!(!bucket.try_acquire(1));

        let snapshot = bucket.stats();
        assert_eq!(snapshot.acquired, 10);
        assert_eq!(snapshot.rejected, 1);
    }

    /// Tokens come back over time at the configured rate.
    #[tokio::test]
    async fn test_token_bucket_refill() {
        let bucket = TokenBucket::new(1000, 100.0);

        // Drain the bucket completely.
        bucket.try_acquire(1000);
        assert!(!bucket.try_acquire(1));

        // 50 ms at 100 tokens/s should yield about 5 tokens.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let available = bucket.available();
        assert!(
            available >= 4,
            "Expected at least 4 tokens, got {}",
            available
        );
    }

    /// Credits can be spent down to zero and replenished by a grant.
    #[test]
    fn test_credit_flow_control() {
        let flow = CreditFlowControl::new(10, 100);

        assert!(flow.try_consume(5));
        assert!(flow.try_consume(5));
        assert!(!flow.try_consume(1));

        assert_eq!(flow.available(), 0);

        flow.grant(5);
        assert_eq!(flow.available(), 5);
        assert!(flow.try_consume(5));
    }

    /// Sustained latency above the target lowers the rate.
    #[test]
    fn test_adaptive_rate_limiter() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimiterConfig {
            initial_rate: 1000,
            min_rate: 100,
            max_rate: 10000,
            target_latency_us: 10000,
            additive_increase: 100,
            multiplicative_decrease: 0.5,
        });

        assert_eq!(limiter.current_rate(), 1000);

        // 20 ms samples are twice the 10 ms target.
        let slow = Duration::from_millis(20);
        for _ in 0..100 {
            limiter.record_latency(slow);
        }
        assert!(limiter.current_rate() < 1000);
    }

    /// A success leaves a fresh breaker closed.
    #[test]
    fn test_circuit_breaker_closed() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        });

        assert!(breaker.allow());
        breaker.record_success();

        assert_eq!(breaker.state(), CircuitState::Closed);
    }

    /// Reaching the failure threshold opens the breaker and rejects calls.
    #[test]
    fn test_circuit_breaker_opens() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        });

        for _ in 0..3 {
            assert!(breaker.allow());
            breaker.record_failure();
        }

        if !matches!(breaker.state(), CircuitState::Open { .. }) {
            panic!("Expected open state");
        }

        assert!(!breaker.allow());
    }

    /// A full channel refuses `try_send` until an item has been received.
    #[tokio::test]
    async fn test_backpressure_channel() {
        let channel = BackpressureChannel::new(3);

        // Fill the channel to capacity.
        for item in 1..=3 {
            channel.send(item).await.unwrap();
        }

        assert_eq!(channel.len(), 3);

        // No room left for a non-blocking send.
        assert!(channel.try_send(4).is_err());

        // Receiving frees one slot.
        assert_eq!(channel.recv().await, Some(1));
        assert_eq!(channel.len(), 2);

        channel.send(4).await.unwrap();
    }

    /// Recorded events show up in both the total and the rate.
    #[test]
    fn test_windowed_rate_tracker() {
        let tracker = WindowedRateTracker::new(Duration::from_secs(1), 10);

        tracker.record(100);
        tracker.record(100);

        assert_eq!(tracker.total(), 200);
        assert!(tracker.rate() > 0.0);
    }
}