1use parking_lot::RwLock;
17use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tokio::sync::{mpsc, Mutex, Notify, Semaphore};
21
22#[derive(Debug)]
25pub struct TokenBucket {
26 tokens: AtomicU64,
28 capacity: u64,
30 refill_rate_bits: AtomicU64,
32 created: Instant,
34 last_refill: AtomicU64,
36 stats: TokenBucketStats,
38}
39
40impl TokenBucket {
41 pub fn new(capacity: u64, refill_rate: f64) -> Self {
43 let now = Instant::now();
44 Self {
45 tokens: AtomicU64::new(capacity),
46 capacity,
47 refill_rate_bits: AtomicU64::new(refill_rate.to_bits()),
48 created: now,
49 last_refill: AtomicU64::new(0),
50 stats: TokenBucketStats::new(),
51 }
52 }
53
54 pub fn try_acquire(&self, count: u64) -> bool {
56 self.refill();
57
58 loop {
59 let current = self.tokens.load(Ordering::Acquire);
60 if current < count {
61 self.stats.rejected.fetch_add(1, Ordering::Relaxed);
62 return false;
63 }
64
65 if self
66 .tokens
67 .compare_exchange_weak(
68 current,
69 current - count,
70 Ordering::AcqRel,
71 Ordering::Relaxed,
72 )
73 .is_ok()
74 {
75 self.stats.acquired.fetch_add(count, Ordering::Relaxed);
76 return true;
77 }
78 std::hint::spin_loop();
80 }
81 }
82
83 pub async fn acquire(&self, count: u64) {
85 while !self.try_acquire(count) {
86 let tokens_needed = count.saturating_sub(self.tokens.load(Ordering::Relaxed));
88 let rate = self.refill_rate();
89 let wait_secs = tokens_needed as f64 / rate;
90 let wait_duration = Duration::from_secs_f64(wait_secs.max(0.001));
91
92 tokio::time::sleep(wait_duration).await;
93 }
94 }
95
96 fn refill_rate(&self) -> f64 {
98 f64::from_bits(self.refill_rate_bits.load(Ordering::Relaxed))
99 }
100
101 pub fn update_rate(&self, new_rate: f64) {
103 self.refill_rate_bits
104 .store(new_rate.to_bits(), Ordering::Relaxed);
105 }
106
107 fn refill(&self) {
109 let now = self.created.elapsed().as_nanos() as u64;
110 let last = self.last_refill.load(Ordering::Acquire);
111
112 if now <= last {
113 return;
114 }
115
116 let rate = self.refill_rate();
117 let elapsed_secs = (now - last) as f64 / 1_000_000_000.0;
118 let new_tokens = (elapsed_secs * rate) as u64;
119
120 if new_tokens == 0 {
121 return;
122 }
123
124 if self
125 .last_refill
126 .compare_exchange(last, now, Ordering::AcqRel, Ordering::Relaxed)
127 .is_ok()
128 {
129 loop {
130 let current = self.tokens.load(Ordering::Acquire);
131 let new_total = (current + new_tokens).min(self.capacity);
132
133 if self
134 .tokens
135 .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Relaxed)
136 .is_ok()
137 {
138 break;
139 }
140 }
141 }
142 }
143
144 pub fn available(&self) -> u64 {
146 self.refill();
147 self.tokens.load(Ordering::Relaxed)
148 }
149
150 pub fn stats(&self) -> TokenBucketStatsSnapshot {
152 TokenBucketStatsSnapshot {
153 acquired: self.stats.acquired.load(Ordering::Relaxed),
154 rejected: self.stats.rejected.load(Ordering::Relaxed),
155 available: self.available(),
156 capacity: self.capacity,
157 }
158 }
159}
160
/// Internal counters kept by a token bucket.
#[derive(Debug)]
struct TokenBucketStats {
    /// Total tokens handed out by successful acquisitions.
    acquired: AtomicU64,
    /// Number of acquisition attempts that were refused.
    rejected: AtomicU64,
}

impl TokenBucketStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            acquired: zero(),
            rejected: zero(),
        }
    }
}
175
/// Plain-data copy of a token bucket's counters at one moment.
#[derive(Debug, Clone)]
pub struct TokenBucketStatsSnapshot {
    /// Total tokens handed out so far.
    pub acquired: u64,
    /// Number of refused acquisition attempts so far.
    pub rejected: u64,
    /// Tokens available when the snapshot was taken.
    pub available: u64,
    /// Maximum number of tokens the bucket can hold.
    pub capacity: u64,
}
183
184#[derive(Debug)]
187pub struct CreditFlowControl {
188 credits: AtomicU64,
190 max_credits: u64,
192 credit_notify: Notify,
194 stats: CreditStats,
196}
197
198impl CreditFlowControl {
199 pub fn new(initial_credits: u64, max_credits: u64) -> Self {
201 Self {
202 credits: AtomicU64::new(initial_credits),
203 max_credits,
204 credit_notify: Notify::new(),
205 stats: CreditStats::new(),
206 }
207 }
208
209 pub fn try_consume(&self, count: u64) -> bool {
211 loop {
212 let current = self.credits.load(Ordering::Acquire);
213 if current < count {
214 self.stats.blocked.fetch_add(1, Ordering::Relaxed);
215 return false;
216 }
217
218 if self
219 .credits
220 .compare_exchange_weak(
221 current,
222 current - count,
223 Ordering::AcqRel,
224 Ordering::Relaxed,
225 )
226 .is_ok()
227 {
228 self.stats.consumed.fetch_add(count, Ordering::Relaxed);
229 return true;
230 }
231 }
232 }
233
234 pub async fn consume(&self, count: u64) {
236 while !self.try_consume(count) {
237 self.credit_notify.notified().await;
238 }
239 }
240
241 pub fn grant(&self, count: u64) {
243 loop {
244 let current = self.credits.load(Ordering::Acquire);
245 let new_total = (current + count).min(self.max_credits);
246
247 if self
248 .credits
249 .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Relaxed)
250 .is_ok()
251 {
252 self.stats.granted.fetch_add(count, Ordering::Relaxed);
253 self.credit_notify.notify_waiters();
254 break;
255 }
256 }
257 }
258
259 pub fn available(&self) -> u64 {
261 self.credits.load(Ordering::Relaxed)
262 }
263
264 pub fn stats(&self) -> CreditStatsSnapshot {
266 CreditStatsSnapshot {
267 consumed: self.stats.consumed.load(Ordering::Relaxed),
268 granted: self.stats.granted.load(Ordering::Relaxed),
269 blocked: self.stats.blocked.load(Ordering::Relaxed),
270 available: self.available(),
271 }
272 }
273}
274
/// Internal counters kept by the credit flow controller.
#[derive(Debug)]
struct CreditStats {
    /// Total credits spent by successful consumes.
    consumed: AtomicU64,
    /// Total credits requested through grants.
    granted: AtomicU64,
    /// Number of consume attempts that found too few credits.
    blocked: AtomicU64,
}

impl CreditStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            consumed: zero(),
            granted: zero(),
            blocked: zero(),
        }
    }
}
291
/// Plain-data copy of the credit controller's counters at one moment.
#[derive(Debug, Clone)]
pub struct CreditStatsSnapshot {
    /// Total credits spent so far.
    pub consumed: u64,
    /// Total credits requested through grants so far.
    pub granted: u64,
    /// Number of consume attempts that found too few credits.
    pub blocked: u64,
    /// Credits available when the snapshot was taken.
    pub available: u64,
}
299
300#[derive(Debug)]
303pub struct AdaptiveRateLimiter {
304 rate: AtomicU64,
306 min_rate: u64,
308 max_rate: u64,
310 target_latency_us: u64,
312 additive_increase: u64,
314 multiplicative_decrease: f64,
316 bucket: TokenBucket,
318 latency_samples: RwLock<LatencySamples>,
320 stats: AdaptiveStats,
322}
323
/// Fixed-size ring buffer of latency samples (microseconds).
#[derive(Debug)]
struct LatencySamples {
    /// Backing storage; its length is the window size.
    samples: Vec<u64>,
    /// Slot the next sample will be written to.
    index: usize,
    /// `true` once the buffer has wrapped at least once.
    filled: bool,
}

impl LatencySamples {
    /// Creates an empty window that retains the `size` most recent samples.
    ///
    /// A `size` of zero yields a window that ignores every sample.
    fn new(size: usize) -> Self {
        Self {
            samples: vec![0; size],
            index: 0,
            filled: false,
        }
    }

    /// Records a sample, overwriting the oldest one once the window is full.
    fn add(&mut self, latency_us: u64) {
        // A zero-sized window has no slot to write to; without this guard the
        // index and the modulo below would both panic.
        if self.samples.is_empty() {
            return;
        }

        self.samples[self.index] = latency_us;
        self.index = (self.index + 1) % self.samples.len();
        if self.index == 0 {
            self.filled = true;
        }
    }

    /// Returns the sample at fraction `p` (0.0 ..= 1.0) of the sorted window,
    /// or 0 when no samples have been recorded.
    ///
    /// The rank is `floor(count * p)`, clamped to the last recorded sample.
    fn percentile(&self, p: f64) -> u64 {
        let count = if self.filled {
            self.samples.len()
        } else {
            self.index
        };
        if count == 0 {
            return 0;
        }

        let mut sorted: Vec<u64> = self.samples[..count].to_vec();
        sorted.sort_unstable();

        let rank = ((count as f64 * p) as usize).min(count - 1);
        sorted[rank]
    }
}
365
366impl AdaptiveRateLimiter {
367 pub fn new(config: AdaptiveRateLimiterConfig) -> Self {
369 let bucket = TokenBucket::new(config.initial_rate, config.initial_rate as f64);
370
371 Self {
372 rate: AtomicU64::new(config.initial_rate),
373 min_rate: config.min_rate,
374 max_rate: config.max_rate,
375 target_latency_us: config.target_latency_us,
376 additive_increase: config.additive_increase,
377 multiplicative_decrease: config.multiplicative_decrease,
378 bucket,
379 latency_samples: RwLock::new(LatencySamples::new(100)),
380 stats: AdaptiveStats::new(),
381 }
382 }
383
384 pub fn try_acquire(&self) -> bool {
386 self.bucket.try_acquire(1)
387 }
388
389 pub async fn acquire(&self) {
391 self.bucket.acquire(1).await
392 }
393
394 pub fn record_latency(&self, latency: Duration) {
396 let latency_us = latency.as_micros() as u64;
397
398 {
399 let mut samples = self.latency_samples.write();
400 samples.add(latency_us);
401 }
402
403 let p99 = {
405 let samples = self.latency_samples.read();
406 samples.percentile(0.99)
407 };
408
409 let current_rate = self.rate.load(Ordering::Relaxed);
410
411 if p99 > self.target_latency_us {
412 let new_rate =
414 ((current_rate as f64 * self.multiplicative_decrease) as u64).max(self.min_rate);
415 self.rate.store(new_rate, Ordering::Relaxed);
416 self.bucket.update_rate(new_rate as f64);
417 self.stats.decreases.fetch_add(1, Ordering::Relaxed);
418 } else if p99 < self.target_latency_us / 2 {
419 let new_rate = (current_rate + self.additive_increase).min(self.max_rate);
421 self.rate.store(new_rate, Ordering::Relaxed);
422 self.bucket.update_rate(new_rate as f64);
423 self.stats.increases.fetch_add(1, Ordering::Relaxed);
424 }
425 }
426
427 pub fn current_rate(&self) -> u64 {
429 self.rate.load(Ordering::Relaxed)
430 }
431
432 pub fn stats(&self) -> AdaptiveStatsSnapshot {
434 let samples = self.latency_samples.read();
435 AdaptiveStatsSnapshot {
436 current_rate: self.current_rate(),
437 increases: self.stats.increases.load(Ordering::Relaxed),
438 decreases: self.stats.decreases.load(Ordering::Relaxed),
439 p50_latency_us: samples.percentile(0.5),
440 p99_latency_us: samples.percentile(0.99),
441 }
442 }
443}
444
/// Tuning parameters for the adaptive rate limiter.
#[derive(Debug, Clone)]
pub struct AdaptiveRateLimiterConfig {
    /// Rate the limiter starts with.
    pub initial_rate: u64,
    /// Lowest rate a decrease may reach.
    pub min_rate: u64,
    /// Highest rate an increase may reach.
    pub max_rate: u64,
    /// Latency target in microseconds.
    pub target_latency_us: u64,
    /// Amount added to the rate on each increase step.
    pub additive_increase: u64,
    /// Factor the rate is multiplied by on each decrease step.
    pub multiplicative_decrease: f64,
}

impl Default for AdaptiveRateLimiterConfig {
    /// Starts at 10 000 with bounds of 100 and 1 000 000, a 10 000 µs latency
    /// target, +100 per increase and ×0.5 per decrease.
    fn default() -> Self {
        Self {
            initial_rate: 10_000,
            min_rate: 100,
            max_rate: 1_000_000,
            target_latency_us: 10_000,
            additive_increase: 100,
            multiplicative_decrease: 0.5,
        }
    }
}
468
/// Internal counters kept by the adaptive rate limiter.
#[derive(Debug)]
struct AdaptiveStats {
    /// Number of increase steps taken.
    increases: AtomicU64,
    /// Number of decrease steps taken.
    decreases: AtomicU64,
}

impl AdaptiveStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            increases: zero(),
            decreases: zero(),
        }
    }
}
483
/// Plain-data copy of the adaptive limiter's state at one moment.
#[derive(Debug, Clone)]
pub struct AdaptiveStatsSnapshot {
    /// Rate in force when the snapshot was taken.
    pub current_rate: u64,
    /// Number of increase steps taken so far.
    pub increases: u64,
    /// Number of decrease steps taken so far.
    pub decreases: u64,
    /// Median latency of the sample window, in microseconds.
    pub p50_latency_us: u64,
    /// 99th-percentile latency of the sample window, in microseconds.
    pub p99_latency_us: u64,
}
492
493#[derive(Debug)]
495pub struct CircuitBreaker {
496 state: RwLock<CircuitState>,
498 config: CircuitBreakerConfig,
500 stats: CircuitBreakerStats,
502 window_start: parking_lot::Mutex<Instant>,
504}
505
/// State of a circuit breaker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Requests are allowed and failures are being counted.
    Closed,
    /// Requests are rejected until the recovery timeout has passed.
    Open {
        /// When the breaker entered the open state.
        opened_at: Instant,
    },
    /// Requests are allowed again while recovery is being probed.
    HalfOpen,
}
515
/// Thresholds and timeouts for a circuit breaker.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Failures within one window that open the breaker.
    pub failure_threshold: u32,
    /// Length of the window in which failures are counted.
    pub failure_window: Duration,
    /// How long the breaker stays open before requests are allowed again.
    pub recovery_timeout: Duration,
    /// Successes required in the half-open state to close the breaker.
    pub success_threshold: u32,
}

impl Default for CircuitBreakerConfig {
    /// Five failures per 60-second window, 30-second recovery timeout, three
    /// successes to close.
    fn default() -> Self {
        let failure_window = Duration::from_secs(60);
        let recovery_timeout = Duration::from_secs(30);
        Self {
            failure_threshold: 5,
            failure_window,
            recovery_timeout,
            success_threshold: 3,
        }
    }
}
539
540impl CircuitBreaker {
541 pub fn new(config: CircuitBreakerConfig) -> Self {
543 Self {
544 state: RwLock::new(CircuitState::Closed),
545 config,
546 stats: CircuitBreakerStats::new(),
547 window_start: parking_lot::Mutex::new(Instant::now()),
548 }
549 }
550
551 pub fn allow(&self) -> bool {
553 let state = *self.state.read();
554
555 match state {
556 CircuitState::Closed => {
557 self.stats.allowed.fetch_add(1, Ordering::Relaxed);
558 true
559 }
560 CircuitState::Open { opened_at } => {
561 if opened_at.elapsed() > self.config.recovery_timeout {
562 *self.state.write() = CircuitState::HalfOpen;
564 self.stats.allowed.fetch_add(1, Ordering::Relaxed);
565 true
566 } else {
567 self.stats.rejected.fetch_add(1, Ordering::Relaxed);
568 false
569 }
570 }
571 CircuitState::HalfOpen => {
572 self.stats.allowed.fetch_add(1, Ordering::Relaxed);
574 true
575 }
576 }
577 }
578
579 pub fn record_success(&self) {
581 let mut state = self.state.write();
582 self.stats.successes.fetch_add(1, Ordering::Relaxed);
583
584 match *state {
585 CircuitState::HalfOpen => {
586 let successes = self
587 .stats
588 .half_open_successes
589 .fetch_add(1, Ordering::Relaxed)
590 + 1;
591 if successes >= self.config.success_threshold as u64 {
592 *state = CircuitState::Closed;
593 self.stats.half_open_successes.store(0, Ordering::Relaxed);
594 self.stats.failures_in_window.store(0, Ordering::Relaxed);
595 }
596 }
597 CircuitState::Closed => {
598 self.stats.failures_in_window.store(0, Ordering::Relaxed);
600 }
601 _ => {}
602 }
603 }
604
605 pub fn record_failure(&self) {
607 let mut state = self.state.write();
608 self.stats.failures.fetch_add(1, Ordering::Relaxed);
609
610 match *state {
611 CircuitState::Closed => {
612 let mut ws = self.window_start.lock();
614 if ws.elapsed() > self.config.failure_window {
615 self.stats.failures_in_window.store(1, Ordering::Relaxed);
616 *ws = Instant::now();
617 } else {
618 let failures = self
619 .stats
620 .failures_in_window
621 .fetch_add(1, Ordering::Relaxed)
622 + 1;
623 if failures >= self.config.failure_threshold as u64 {
624 *state = CircuitState::Open {
625 opened_at: Instant::now(),
626 };
627 self.stats.opens.fetch_add(1, Ordering::Relaxed);
628 }
629 }
630 }
631 CircuitState::HalfOpen => {
632 *state = CircuitState::Open {
634 opened_at: Instant::now(),
635 };
636 self.stats.half_open_successes.store(0, Ordering::Relaxed);
637 self.stats.opens.fetch_add(1, Ordering::Relaxed);
638 }
639 _ => {}
640 }
641 }
642
643 pub fn state(&self) -> CircuitState {
645 *self.state.read()
646 }
647
648 pub fn stats(&self) -> CircuitBreakerStatsSnapshot {
650 CircuitBreakerStatsSnapshot {
651 state: self.state(),
652 allowed: self.stats.allowed.load(Ordering::Relaxed),
653 rejected: self.stats.rejected.load(Ordering::Relaxed),
654 successes: self.stats.successes.load(Ordering::Relaxed),
655 failures: self.stats.failures.load(Ordering::Relaxed),
656 opens: self.stats.opens.load(Ordering::Relaxed),
657 }
658 }
659}
660
/// Internal counters kept by the circuit breaker.
#[derive(Debug)]
struct CircuitBreakerStats {
    /// Requests that were allowed through.
    allowed: AtomicU64,
    /// Requests that were rejected.
    rejected: AtomicU64,
    /// Successes reported.
    successes: AtomicU64,
    /// Failures reported.
    failures: AtomicU64,
    /// Failures counted in the current failure window.
    failures_in_window: AtomicU64,
    /// Successes counted since the breaker became half-open.
    half_open_successes: AtomicU64,
    /// Number of times the breaker has opened.
    opens: AtomicU64,
}

impl CircuitBreakerStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            allowed: zero(),
            rejected: zero(),
            successes: zero(),
            failures: zero(),
            failures_in_window: zero(),
            half_open_successes: zero(),
            opens: zero(),
        }
    }
}
685
686#[derive(Debug, Clone)]
687pub struct CircuitBreakerStatsSnapshot {
688 pub state: CircuitState,
689 pub allowed: u64,
690 pub rejected: u64,
691 pub successes: u64,
692 pub failures: u64,
693 pub opens: u64,
694}
695
696pub struct BackpressureChannel<T> {
699 tx: mpsc::Sender<T>,
701 rx: Mutex<mpsc::Receiver<T>>,
703 permits: Arc<Semaphore>,
705 capacity: usize,
707 stats: ChannelStats,
709}
710
711impl<T> BackpressureChannel<T> {
712 pub fn new(capacity: usize) -> Self {
714 let (tx, rx) = mpsc::channel(capacity);
716
717 Self {
718 tx,
719 rx: Mutex::new(rx),
720 permits: Arc::new(Semaphore::new(0)), capacity,
722 stats: ChannelStats::new(),
723 }
724 }
725
726 pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<T>> {
728 self.stats.sent.fetch_add(1, Ordering::Relaxed);
729 let result = self.tx.send(value).await;
730 if result.is_ok() {
731 self.permits.add_permits(1);
733 }
734 result
735 }
736
737 pub fn try_send(&self, value: T) -> Result<(), mpsc::error::TrySendError<T>> {
739 let result = self.tx.try_send(value);
740 match &result {
741 Ok(()) => {
742 self.stats.sent.fetch_add(1, Ordering::Relaxed);
743 self.permits.add_permits(1);
744 }
745 Err(mpsc::error::TrySendError::Full(_)) => {
746 self.stats.blocked.fetch_add(1, Ordering::Relaxed);
747 }
748 _ => {}
749 }
750 result
751 }
752
753 pub async fn recv(&self) -> Option<T> {
755 let permit = self.permits.acquire().await.ok()?;
757 permit.forget(); let mut rx = self.rx.lock().await;
760 let result = rx.recv().await;
761
762 if result.is_some() {
763 self.stats.received.fetch_add(1, Ordering::Relaxed);
764 }
765
766 result
767 }
768
769 pub fn len(&self) -> usize {
771 self.permits.available_permits()
772 }
773
774 pub fn is_empty(&self) -> bool {
776 self.len() == 0
777 }
778
779 pub fn stats(&self) -> ChannelStatsSnapshot {
781 ChannelStatsSnapshot {
782 sent: self.stats.sent.load(Ordering::Relaxed),
783 received: self.stats.received.load(Ordering::Relaxed),
784 blocked: self.stats.blocked.load(Ordering::Relaxed),
785 current_len: self.len(),
786 capacity: self.capacity,
787 }
788 }
789}
790
/// Internal counters kept by the backpressure channel.
struct ChannelStats {
    /// Values sent.
    sent: AtomicU64,
    /// Values received.
    received: AtomicU64,
    /// Non-blocking sends that found the channel full.
    blocked: AtomicU64,
}

impl ChannelStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            sent: zero(),
            received: zero(),
            blocked: zero(),
        }
    }
}
806
/// Plain-data copy of a backpressure channel's counters at one moment.
#[derive(Debug, Clone)]
pub struct ChannelStatsSnapshot {
    /// Values sent so far.
    pub sent: u64,
    /// Values received so far.
    pub received: u64,
    /// Non-blocking sends that found the channel full.
    pub blocked: u64,
    /// Queue length when the snapshot was taken.
    pub current_len: usize,
    /// Capacity the channel was created with.
    pub capacity: usize,
}
815
816pub struct WindowedRateTracker {
818 buckets: RwLock<Vec<AtomicU64>>,
820 bucket_duration: Duration,
822 num_buckets: usize,
824 current_bucket: AtomicUsize,
826 last_rotation: RwLock<Instant>,
828}
829
830impl WindowedRateTracker {
831 pub fn new(window_duration: Duration, num_buckets: usize) -> Self {
833 let buckets: Vec<AtomicU64> = (0..num_buckets).map(|_| AtomicU64::new(0)).collect();
834
835 Self {
836 buckets: RwLock::new(buckets),
837 bucket_duration: window_duration / num_buckets as u32,
838 num_buckets,
839 current_bucket: AtomicUsize::new(0),
840 last_rotation: RwLock::new(Instant::now()),
841 }
842 }
843
844 pub fn record(&self, count: u64) {
846 self.maybe_rotate();
847
848 let buckets = self.buckets.read();
849 let idx = self.current_bucket.load(Ordering::Relaxed);
850 buckets[idx].fetch_add(count, Ordering::Relaxed);
851 }
852
853 pub fn rate(&self) -> f64 {
855 self.maybe_rotate();
856
857 let buckets = self.buckets.read();
858 let total: u64 = buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum();
859
860 let window_secs = self.bucket_duration.as_secs_f64() * self.num_buckets as f64;
861 total as f64 / window_secs
862 }
863
864 pub fn total(&self) -> u64 {
866 self.maybe_rotate();
867
868 let buckets = self.buckets.read();
869 buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum()
870 }
871
872 fn maybe_rotate(&self) {
874 let now = Instant::now();
875 let elapsed = {
876 let last = self.last_rotation.read();
877 now.duration_since(*last)
878 };
879
880 if elapsed < self.bucket_duration {
881 return;
882 }
883
884 let buckets_to_rotate =
885 (elapsed.as_secs_f64() / self.bucket_duration.as_secs_f64()) as usize;
886 if buckets_to_rotate == 0 {
887 return;
888 }
889
890 {
892 let mut last = self.last_rotation.write();
893 *last = now;
894 }
895
896 let buckets = self.buckets.read();
897
898 for _ in 0..buckets_to_rotate.min(self.num_buckets) {
900 let next = (self.current_bucket.load(Ordering::Relaxed) + 1) % self.num_buckets;
901 buckets[next].store(0, Ordering::Relaxed);
902 self.current_bucket.store(next, Ordering::Relaxed);
903 }
904 }
905}
906
#[cfg(test)]
mod tests {
    use super::*;

    /// A full bucket hands out its capacity and then refuses further tokens.
    #[test]
    fn test_token_bucket_basic() {
        let bucket = TokenBucket::new(10, 10.0);

        // Two halves drain the bucket.
        assert!(bucket.try_acquire(5));
        assert!(bucket.try_acquire(5));

        // Nothing is left for a third request.
        assert!(!bucket.try_acquire(1));

        let snapshot = bucket.stats();
        assert_eq!(snapshot.acquired, 10);
        assert_eq!(snapshot.rejected, 1);
    }

    /// A drained bucket gains tokens again as time passes.
    #[tokio::test]
    async fn test_token_bucket_refill() {
        let bucket = TokenBucket::new(1000, 100.0);

        bucket.try_acquire(1000);
        assert!(!bucket.try_acquire(1));

        tokio::time::sleep(Duration::from_millis(50)).await;

        let available = bucket.available();
        assert!(
            available >= 4,
            "Expected at least 4 tokens, got {}",
            available
        );
    }

    /// Credits can be spent down to zero and replenished with a grant.
    #[test]
    fn test_credit_flow_control() {
        let flow = CreditFlowControl::new(10, 100);

        assert!(flow.try_consume(5));
        assert!(flow.try_consume(5));
        assert!(!flow.try_consume(1));

        assert_eq!(flow.available(), 0);

        flow.grant(5);
        assert_eq!(flow.available(), 5);
        assert!(flow.try_consume(5));
    }

    /// Latencies above the target push the rate below its starting value.
    #[test]
    fn test_adaptive_rate_limiter() {
        let limiter = AdaptiveRateLimiter::new(AdaptiveRateLimiterConfig {
            initial_rate: 1000,
            min_rate: 100,
            max_rate: 10000,
            target_latency_us: 10000,
            additive_increase: 100,
            multiplicative_decrease: 0.5,
        });

        assert_eq!(limiter.current_rate(), 1000);

        // 20 ms is twice the 10 ms target.
        let slow = Duration::from_millis(20);
        (0..100).for_each(|_| limiter.record_latency(slow));

        assert!(limiter.current_rate() < 1000);
    }

    /// A success leaves a closed breaker closed.
    #[test]
    fn test_circuit_breaker_closed() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        });

        assert!(breaker.allow());
        breaker.record_success();

        assert_eq!(breaker.state(), CircuitState::Closed);
    }

    /// Reaching the failure threshold opens the breaker and rejects requests.
    #[test]
    fn test_circuit_breaker_opens() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        });

        for _ in 0..3 {
            assert!(breaker.allow());
            breaker.record_failure();
        }

        assert!(
            matches!(breaker.state(), CircuitState::Open { .. }),
            "Expected open state"
        );

        assert!(!breaker.allow());
    }

    /// The channel reports its length, refuses sends when full and frees a
    /// slot on receive.
    #[tokio::test]
    async fn test_backpressure_channel() {
        let channel = BackpressureChannel::new(3);

        for value in 1..=3 {
            channel.send(value).await.unwrap();
        }

        assert_eq!(channel.len(), 3);

        // Full: a non-blocking send must fail.
        assert!(channel.try_send(4).is_err());

        assert_eq!(channel.recv().await, Some(1));
        assert_eq!(channel.len(), 2);

        // A slot is free again.
        channel.send(4).await.unwrap();
    }

    /// Recorded events show up in both the total and the rate.
    #[test]
    fn test_windowed_rate_tracker() {
        let tracker = WindowedRateTracker::new(Duration::from_secs(1), 10);

        tracker.record(100);
        tracker.record(100);

        assert_eq!(tracker.total(), 200);
        assert!(tracker.rate() > 0.0);
    }
}