// rivven_core/backpressure.rs
//! Backpressure and Flow Control
//!
//! Production-grade flow control mechanisms for high-throughput streaming:
//! - **Token Bucket**: Rate limiting with burst capacity
//! - **Leaky Bucket**: Smooth rate limiting
//! - **Credit-Based Flow Control**: Receiver-driven backpressure
//! - **Adaptive Rate Limiter**: Self-tuning based on latency
//! - **Circuit Breaker**: Fail-fast on downstream issues
//! - **Windowed Rate Tracker**: Sliding window request tracking
//!
//! Based on industry best practices from:
//! - TCP congestion control (AIMD, BBR concepts)
//! - Reactive Streams backpressure
//! - Netflix Hystrix circuit breaker patterns

16use parking_lot::RwLock;
17use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tokio::sync::{mpsc, Mutex, Notify, Semaphore};
21
/// Token bucket rate limiter.
///
/// Allows bursts up to the bucket capacity, then limits throughput to the
/// refill rate. All state is atomic, so a `TokenBucket` can be shared across
/// threads/tasks behind a plain `&` reference.
#[derive(Debug)]
pub struct TokenBucket {
    /// Current number of tokens available for acquisition.
    tokens: AtomicU64,
    /// Maximum tokens the bucket can hold (burst capacity).
    capacity: u64,
    /// Tokens added per second by `refill`.
    refill_rate: f64,
    /// Creation time; all timestamps below are nanoseconds relative to this.
    created: Instant,
    /// Last refill timestamp (nanoseconds since `created`).
    last_refill: AtomicU64,
    /// Acquire/reject counters.
    stats: TokenBucketStats,
}
39
40impl TokenBucket {
41    /// Create a new token bucket
42    pub fn new(capacity: u64, refill_rate: f64) -> Self {
43        let now = Instant::now();
44        Self {
45            tokens: AtomicU64::new(capacity),
46            capacity,
47            refill_rate,
48            created: now,
49            last_refill: AtomicU64::new(0),
50            stats: TokenBucketStats::new(),
51        }
52    }
53
54    /// Try to acquire tokens without blocking
55    pub fn try_acquire(&self, count: u64) -> bool {
56        self.refill();
57
58        loop {
59            let current = self.tokens.load(Ordering::Acquire);
60            if current < count {
61                self.stats.rejected.fetch_add(1, Ordering::Relaxed);
62                return false;
63            }
64
65            if self
66                .tokens
67                .compare_exchange_weak(
68                    current,
69                    current - count,
70                    Ordering::AcqRel,
71                    Ordering::Relaxed,
72                )
73                .is_ok()
74            {
75                self.stats.acquired.fetch_add(count, Ordering::Relaxed);
76                return true;
77            }
78        }
79    }
80
81    /// Acquire tokens, blocking if necessary
82    pub async fn acquire(&self, count: u64) {
83        while !self.try_acquire(count) {
84            // Calculate wait time
85            let tokens_needed = count.saturating_sub(self.tokens.load(Ordering::Relaxed));
86            let wait_secs = tokens_needed as f64 / self.refill_rate;
87            let wait_duration = Duration::from_secs_f64(wait_secs.max(0.001));
88
89            tokio::time::sleep(wait_duration).await;
90        }
91    }
92
93    /// Refill tokens based on elapsed time
94    fn refill(&self) {
95        let now = self.created.elapsed().as_nanos() as u64;
96        let last = self.last_refill.load(Ordering::Acquire);
97
98        if now <= last {
99            return;
100        }
101
102        let elapsed_secs = (now - last) as f64 / 1_000_000_000.0;
103        let new_tokens = (elapsed_secs * self.refill_rate) as u64;
104
105        if new_tokens == 0 {
106            return;
107        }
108
109        if self
110            .last_refill
111            .compare_exchange(last, now, Ordering::AcqRel, Ordering::Relaxed)
112            .is_ok()
113        {
114            loop {
115                let current = self.tokens.load(Ordering::Acquire);
116                let new_total = (current + new_tokens).min(self.capacity);
117
118                if self
119                    .tokens
120                    .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Relaxed)
121                    .is_ok()
122                {
123                    break;
124                }
125            }
126        }
127    }
128
129    /// Get current token count
130    pub fn available(&self) -> u64 {
131        self.refill();
132        self.tokens.load(Ordering::Relaxed)
133    }
134
135    /// Get statistics
136    pub fn stats(&self) -> TokenBucketStatsSnapshot {
137        TokenBucketStatsSnapshot {
138            acquired: self.stats.acquired.load(Ordering::Relaxed),
139            rejected: self.stats.rejected.load(Ordering::Relaxed),
140            available: self.available(),
141            capacity: self.capacity,
142        }
143    }
144}
145
/// Internal acquire/reject counters for `TokenBucket`.
///
/// `AtomicU64::default()` is zero, so `Default` gives correctly
/// initialized counters without repeating `AtomicU64::new(0)`.
#[derive(Debug, Default)]
struct TokenBucketStats {
    /// Total tokens successfully acquired.
    acquired: AtomicU64,
    /// Number of rejected acquisition attempts.
    rejected: AtomicU64,
}

impl TokenBucketStats {
    /// Create zeroed counters.
    fn new() -> Self {
        Self::default()
    }
}
160
/// Point-in-time view of a `TokenBucket`'s counters, returned by `stats()`.
#[derive(Debug, Clone)]
pub struct TokenBucketStatsSnapshot {
    /// Total tokens successfully acquired.
    pub acquired: u64,
    /// Number of rejected acquisition attempts.
    pub rejected: u64,
    /// Tokens available at snapshot time.
    pub available: u64,
    /// Bucket capacity (burst size).
    pub capacity: u64,
}
168
/// Credit-based flow control for receiver-driven backpressure.
///
/// Receivers grant credits to senders; senders can only send up to the
/// granted credit limit. Blocked senders are woken via `credit_notify`
/// when new credits arrive.
#[derive(Debug)]
pub struct CreditFlowControl {
    /// Currently available credits.
    credits: AtomicU64,
    /// Upper bound enforced by `grant`.
    max_credits: u64,
    /// Wakes tasks blocked in `consume` when credits become available.
    credit_notify: Notify,
    /// Consume/grant/block counters.
    stats: CreditStats,
}
182
183impl CreditFlowControl {
184    /// Create new credit flow control
185    pub fn new(initial_credits: u64, max_credits: u64) -> Self {
186        Self {
187            credits: AtomicU64::new(initial_credits),
188            max_credits,
189            credit_notify: Notify::new(),
190            stats: CreditStats::new(),
191        }
192    }
193
194    /// Try to consume credits without blocking
195    pub fn try_consume(&self, count: u64) -> bool {
196        loop {
197            let current = self.credits.load(Ordering::Acquire);
198            if current < count {
199                self.stats.blocked.fetch_add(1, Ordering::Relaxed);
200                return false;
201            }
202
203            if self
204                .credits
205                .compare_exchange_weak(
206                    current,
207                    current - count,
208                    Ordering::AcqRel,
209                    Ordering::Relaxed,
210                )
211                .is_ok()
212            {
213                self.stats.consumed.fetch_add(count, Ordering::Relaxed);
214                return true;
215            }
216        }
217    }
218
219    /// Consume credits, blocking if necessary
220    pub async fn consume(&self, count: u64) {
221        while !self.try_consume(count) {
222            self.credit_notify.notified().await;
223        }
224    }
225
226    /// Grant credits (called by receiver)
227    pub fn grant(&self, count: u64) {
228        loop {
229            let current = self.credits.load(Ordering::Acquire);
230            let new_total = (current + count).min(self.max_credits);
231
232            if self
233                .credits
234                .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Relaxed)
235                .is_ok()
236            {
237                self.stats.granted.fetch_add(count, Ordering::Relaxed);
238                self.credit_notify.notify_waiters();
239                break;
240            }
241        }
242    }
243
244    /// Get available credits
245    pub fn available(&self) -> u64 {
246        self.credits.load(Ordering::Relaxed)
247    }
248
249    /// Get statistics
250    pub fn stats(&self) -> CreditStatsSnapshot {
251        CreditStatsSnapshot {
252            consumed: self.stats.consumed.load(Ordering::Relaxed),
253            granted: self.stats.granted.load(Ordering::Relaxed),
254            blocked: self.stats.blocked.load(Ordering::Relaxed),
255            available: self.available(),
256        }
257    }
258}
259
/// Internal counters for `CreditFlowControl`.
///
/// Derives `Default` (atomics default to zero) instead of hand-writing
/// zero-initialization.
#[derive(Debug, Default)]
struct CreditStats {
    /// Total credits consumed.
    consumed: AtomicU64,
    /// Total credits granted (before capping).
    granted: AtomicU64,
    /// Number of consume attempts that found insufficient credits.
    blocked: AtomicU64,
}

impl CreditStats {
    /// Create zeroed counters.
    fn new() -> Self {
        Self::default()
    }
}
276
/// Point-in-time view of a `CreditFlowControl`'s counters, returned by `stats()`.
#[derive(Debug, Clone)]
pub struct CreditStatsSnapshot {
    /// Total credits consumed.
    pub consumed: u64,
    /// Total credits granted.
    pub granted: u64,
    /// Number of consume attempts blocked for lack of credits.
    pub blocked: u64,
    /// Credits available at snapshot time.
    pub available: u64,
}
284
/// Adaptive rate limiter that adjusts based on latency feedback.
///
/// Uses AIMD (Additive Increase, Multiplicative Decrease): the rate grows
/// linearly while observed latency is low and is cut multiplicatively when
/// the p99 exceeds the target.
#[derive(Debug)]
pub struct AdaptiveRateLimiter {
    /// Current rate limit (requests per second), updated by `record_latency`.
    rate: AtomicU64,
    /// Lower bound for the adaptive rate.
    min_rate: u64,
    /// Upper bound for the adaptive rate.
    max_rate: u64,
    /// Target p99 latency (microseconds) the limiter steers toward.
    target_latency_us: u64,
    /// Requests/second added per additive-increase step.
    additive_increase: u64,
    /// Factor applied on overload (0.5 = halve on overload).
    multiplicative_decrease: f64,
    /// Inner token bucket performing the actual admission control.
    /// NOTE(review): constructed with the initial rate and never re-tuned
    /// when `rate` adapts — confirm whether adaptation should affect
    /// actual admission.
    bucket: TokenBucket,
    /// Ring buffer of recent latency samples used for percentile estimates.
    latency_samples: RwLock<LatencySamples>,
    /// Increase/decrease counters.
    stats: AdaptiveStats,
}
308
/// Fixed-size ring buffer of latency samples (microseconds).
#[derive(Debug)]
struct LatencySamples {
    samples: Vec<u64>,
    index: usize,
    filled: bool,
}

impl LatencySamples {
    /// Allocate a buffer for `size` samples; starts empty (unfilled).
    fn new(size: usize) -> Self {
        Self {
            samples: vec![0; size],
            index: 0,
            filled: false,
        }
    }

    /// Record a sample, overwriting the oldest once the buffer wraps.
    fn add(&mut self, latency_us: u64) {
        let slot = self.index;
        self.samples[slot] = latency_us;
        let next = (slot + 1) % self.samples.len();
        // Wrapping back to slot 0 means every slot has been written.
        self.filled = self.filled || next == 0;
        self.index = next;
    }

    /// Value at percentile `p` (0.0..=1.0) over the recorded samples;
    /// returns 0 when no samples have been recorded yet.
    fn percentile(&self, p: f64) -> u64 {
        let len = if self.filled {
            self.samples.len()
        } else {
            self.index
        };
        if len == 0 {
            return 0;
        }

        let mut ordered = self.samples[..len].to_vec();
        ordered.sort_unstable();

        let pick = ((len as f64 * p) as usize).min(len - 1);
        ordered[pick]
    }
}
350
351impl AdaptiveRateLimiter {
352    /// Create new adaptive rate limiter
353    pub fn new(config: AdaptiveRateLimiterConfig) -> Self {
354        let bucket = TokenBucket::new(config.initial_rate, config.initial_rate as f64);
355
356        Self {
357            rate: AtomicU64::new(config.initial_rate),
358            min_rate: config.min_rate,
359            max_rate: config.max_rate,
360            target_latency_us: config.target_latency_us,
361            additive_increase: config.additive_increase,
362            multiplicative_decrease: config.multiplicative_decrease,
363            bucket,
364            latency_samples: RwLock::new(LatencySamples::new(100)),
365            stats: AdaptiveStats::new(),
366        }
367    }
368
369    /// Try to acquire permission
370    pub fn try_acquire(&self) -> bool {
371        self.bucket.try_acquire(1)
372    }
373
374    /// Acquire permission, blocking if necessary
375    pub async fn acquire(&self) {
376        self.bucket.acquire(1).await
377    }
378
379    /// Record latency feedback for adaptation
380    pub fn record_latency(&self, latency: Duration) {
381        let latency_us = latency.as_micros() as u64;
382
383        {
384            let mut samples = self.latency_samples.write();
385            samples.add(latency_us);
386        }
387
388        // Adapt rate based on p99 latency
389        let p99 = {
390            let samples = self.latency_samples.read();
391            samples.percentile(0.99)
392        };
393
394        let current_rate = self.rate.load(Ordering::Relaxed);
395
396        if p99 > self.target_latency_us {
397            // Decrease rate (multiplicative)
398            let new_rate =
399                ((current_rate as f64 * self.multiplicative_decrease) as u64).max(self.min_rate);
400            self.rate.store(new_rate, Ordering::Relaxed);
401            self.stats.decreases.fetch_add(1, Ordering::Relaxed);
402        } else if p99 < self.target_latency_us / 2 {
403            // Increase rate (additive)
404            let new_rate = (current_rate + self.additive_increase).min(self.max_rate);
405            self.rate.store(new_rate, Ordering::Relaxed);
406            self.stats.increases.fetch_add(1, Ordering::Relaxed);
407        }
408    }
409
410    /// Get current rate limit
411    pub fn current_rate(&self) -> u64 {
412        self.rate.load(Ordering::Relaxed)
413    }
414
415    /// Get statistics
416    pub fn stats(&self) -> AdaptiveStatsSnapshot {
417        let samples = self.latency_samples.read();
418        AdaptiveStatsSnapshot {
419            current_rate: self.current_rate(),
420            increases: self.stats.increases.load(Ordering::Relaxed),
421            decreases: self.stats.decreases.load(Ordering::Relaxed),
422            p50_latency_us: samples.percentile(0.5),
423            p99_latency_us: samples.percentile(0.99),
424        }
425    }
426}
427
/// Tuning knobs for the adaptive (AIMD) rate limiter.
#[derive(Debug, Clone)]
pub struct AdaptiveRateLimiterConfig {
    pub initial_rate: u64,
    pub min_rate: u64,
    pub max_rate: u64,
    pub target_latency_us: u64,
    pub additive_increase: u64,
    pub multiplicative_decrease: f64,
}

impl Default for AdaptiveRateLimiterConfig {
    /// Defaults: start at 10k req/s, bounded to [100, 1M], targeting a
    /// 10 ms p99; grow by 100 req/s per step, halve on overload.
    fn default() -> Self {
        Self {
            multiplicative_decrease: 0.5,
            additive_increase: 100,
            target_latency_us: 10000, // 10ms
            max_rate: 1000000,
            min_rate: 100,
            initial_rate: 10000,
        }
    }
}
451
/// Internal AIMD step counters for `AdaptiveRateLimiter`.
///
/// Derives `Default` (atomics default to zero) instead of hand-writing
/// zero-initialization.
#[derive(Debug, Default)]
struct AdaptiveStats {
    /// Number of additive-increase steps taken.
    increases: AtomicU64,
    /// Number of multiplicative-decrease steps taken.
    decreases: AtomicU64,
}

impl AdaptiveStats {
    /// Create zeroed counters.
    fn new() -> Self {
        Self::default()
    }
}
466
/// Point-in-time view of an `AdaptiveRateLimiter`, returned by `stats()`.
#[derive(Debug, Clone)]
pub struct AdaptiveStatsSnapshot {
    /// Current adapted rate (requests per second).
    pub current_rate: u64,
    /// Number of additive-increase steps taken.
    pub increases: u64,
    /// Number of multiplicative-decrease steps taken.
    pub decreases: u64,
    /// Median latency over the sample window (microseconds).
    pub p50_latency_us: u64,
    /// p99 latency over the sample window (microseconds).
    pub p99_latency_us: u64,
}
475
/// Circuit breaker for fail-fast behavior on downstream issues.
///
/// Implements a Closed -> Open -> HalfOpen -> Closed state machine.
/// Transitions are driven by `allow`, `record_success`, and `record_failure`.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Current state of the breaker state machine.
    state: RwLock<CircuitState>,
    /// Thresholds and timeouts governing transitions.
    config: CircuitBreakerConfig,
    /// Allowed/rejected/success/failure/open counters.
    stats: CircuitBreakerStats,
}
486
/// States of the circuit breaker state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Normal operation; all requests allowed.
    Closed,
    /// Failing; all requests rejected until the recovery timeout elapses.
    Open { opened_at: Instant },
    /// Probing: requests allowed while testing if the downstream recovered.
    HalfOpen,
}
496
/// Circuit breaker configuration.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Number of failures before opening the circuit.
    pub failure_threshold: u32,
    /// Time window for counting failures.
    pub failure_window: Duration,
    /// How long the circuit stays open before trying half-open.
    pub recovery_timeout: Duration,
    /// Number of successes in half-open required to close the circuit.
    pub success_threshold: u32,
}

impl Default for CircuitBreakerConfig {
    /// Defaults: open after 5 failures (60 s window), probe after 30 s,
    /// close again after 3 half-open successes.
    fn default() -> Self {
        Self {
            success_threshold: 3,
            recovery_timeout: Duration::from_secs(30),
            failure_window: Duration::from_secs(60),
            failure_threshold: 5,
        }
    }
}
520
521impl CircuitBreaker {
522    /// Create new circuit breaker
523    pub fn new(config: CircuitBreakerConfig) -> Self {
524        Self {
525            state: RwLock::new(CircuitState::Closed),
526            config,
527            stats: CircuitBreakerStats::new(),
528        }
529    }
530
531    /// Check if request is allowed
532    pub fn allow(&self) -> bool {
533        let state = *self.state.read();
534
535        match state {
536            CircuitState::Closed => {
537                self.stats.allowed.fetch_add(1, Ordering::Relaxed);
538                true
539            }
540            CircuitState::Open { opened_at } => {
541                if opened_at.elapsed() > self.config.recovery_timeout {
542                    // Transition to half-open
543                    *self.state.write() = CircuitState::HalfOpen;
544                    self.stats.allowed.fetch_add(1, Ordering::Relaxed);
545                    true
546                } else {
547                    self.stats.rejected.fetch_add(1, Ordering::Relaxed);
548                    false
549                }
550            }
551            CircuitState::HalfOpen => {
552                // Allow limited requests in half-open
553                self.stats.allowed.fetch_add(1, Ordering::Relaxed);
554                true
555            }
556        }
557    }
558
559    /// Record a successful operation
560    pub fn record_success(&self) {
561        let mut state = self.state.write();
562        self.stats.successes.fetch_add(1, Ordering::Relaxed);
563
564        match *state {
565            CircuitState::HalfOpen => {
566                let successes = self
567                    .stats
568                    .half_open_successes
569                    .fetch_add(1, Ordering::Relaxed)
570                    + 1;
571                if successes >= self.config.success_threshold as u64 {
572                    *state = CircuitState::Closed;
573                    self.stats.half_open_successes.store(0, Ordering::Relaxed);
574                    self.stats.failures_in_window.store(0, Ordering::Relaxed);
575                }
576            }
577            CircuitState::Closed => {
578                // Reset failure count on success
579                self.stats.failures_in_window.store(0, Ordering::Relaxed);
580            }
581            _ => {}
582        }
583    }
584
585    /// Record a failed operation
586    pub fn record_failure(&self) {
587        let mut state = self.state.write();
588        self.stats.failures.fetch_add(1, Ordering::Relaxed);
589
590        match *state {
591            CircuitState::Closed => {
592                let failures = self
593                    .stats
594                    .failures_in_window
595                    .fetch_add(1, Ordering::Relaxed)
596                    + 1;
597                if failures >= self.config.failure_threshold as u64 {
598                    *state = CircuitState::Open {
599                        opened_at: Instant::now(),
600                    };
601                    self.stats.opens.fetch_add(1, Ordering::Relaxed);
602                }
603            }
604            CircuitState::HalfOpen => {
605                // Any failure in half-open reopens circuit
606                *state = CircuitState::Open {
607                    opened_at: Instant::now(),
608                };
609                self.stats.half_open_successes.store(0, Ordering::Relaxed);
610                self.stats.opens.fetch_add(1, Ordering::Relaxed);
611            }
612            _ => {}
613        }
614    }
615
616    /// Get current state
617    pub fn state(&self) -> CircuitState {
618        *self.state.read()
619    }
620
621    /// Get statistics
622    pub fn stats(&self) -> CircuitBreakerStatsSnapshot {
623        CircuitBreakerStatsSnapshot {
624            state: self.state(),
625            allowed: self.stats.allowed.load(Ordering::Relaxed),
626            rejected: self.stats.rejected.load(Ordering::Relaxed),
627            successes: self.stats.successes.load(Ordering::Relaxed),
628            failures: self.stats.failures.load(Ordering::Relaxed),
629            opens: self.stats.opens.load(Ordering::Relaxed),
630        }
631    }
632}
633
/// Internal counters for `CircuitBreaker`.
///
/// Derives `Default` (atomics default to zero) instead of hand-writing
/// zero-initialization for all seven fields.
#[derive(Debug, Default)]
struct CircuitBreakerStats {
    /// Requests allowed through.
    allowed: AtomicU64,
    /// Requests rejected while open.
    rejected: AtomicU64,
    /// Total successes recorded.
    successes: AtomicU64,
    /// Total failures recorded.
    failures: AtomicU64,
    /// Failures counted toward the open threshold (reset on success/close).
    failures_in_window: AtomicU64,
    /// Consecutive successes while half-open (reset on open/close).
    half_open_successes: AtomicU64,
    /// Number of times the circuit has opened.
    opens: AtomicU64,
}

impl CircuitBreakerStats {
    /// Create zeroed counters.
    fn new() -> Self {
        Self::default()
    }
}
658
/// Point-in-time view of a `CircuitBreaker`, returned by `stats()`.
#[derive(Debug, Clone)]
pub struct CircuitBreakerStatsSnapshot {
    /// State at snapshot time.
    pub state: CircuitState,
    /// Requests allowed through.
    pub allowed: u64,
    /// Requests rejected while open.
    pub rejected: u64,
    /// Total successes recorded.
    pub successes: u64,
    /// Total failures recorded.
    pub failures: u64,
    /// Number of times the circuit has opened.
    pub opens: u64,
}
668
/// Bounded channel with backpressure.
///
/// Wraps a Tokio mpsc channel; async `send` blocks when the buffer is full.
/// A parallel semaphore mirrors the number of queued items so `len()` can
/// be answered without locking the receiver.
pub struct BackpressureChannel<T> {
    /// Producer side of the inner channel.
    tx: mpsc::Sender<T>,
    /// Consumer side, behind an async mutex so `recv` takes `&self`.
    rx: Mutex<mpsc::Receiver<T>>,
    /// One permit per queued item (added on send, forgotten on recv).
    permits: Arc<Semaphore>,
    /// Buffer capacity passed to the inner channel.
    capacity: usize,
    /// Sent/received/blocked counters.
    stats: ChannelStats,
}
683
684impl<T> BackpressureChannel<T> {
685    /// Create a new backpressure channel
686    pub fn new(capacity: usize) -> Self {
687        // Use semaphore to track capacity, but start at 0 (no items in channel)
688        let (tx, rx) = mpsc::channel(capacity);
689
690        Self {
691            tx,
692            rx: Mutex::new(rx),
693            permits: Arc::new(Semaphore::new(0)), // Start at 0, add permits when items are sent
694            capacity,
695            stats: ChannelStats::new(),
696        }
697    }
698
699    /// Send a value, blocking if buffer is full
700    pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<T>> {
701        self.stats.sent.fetch_add(1, Ordering::Relaxed);
702        let result = self.tx.send(value).await;
703        if result.is_ok() {
704            // Add permit to signal item is in channel
705            self.permits.add_permits(1);
706        }
707        result
708    }
709
710    /// Try to send without blocking
711    pub fn try_send(&self, value: T) -> Result<(), mpsc::error::TrySendError<T>> {
712        let result = self.tx.try_send(value);
713        match &result {
714            Ok(()) => {
715                self.stats.sent.fetch_add(1, Ordering::Relaxed);
716                self.permits.add_permits(1);
717            }
718            Err(mpsc::error::TrySendError::Full(_)) => {
719                self.stats.blocked.fetch_add(1, Ordering::Relaxed);
720            }
721            _ => {}
722        }
723        result
724    }
725
726    /// Receive a value
727    pub async fn recv(&self) -> Option<T> {
728        // Acquire permit (blocks until item available)
729        let permit = self.permits.acquire().await.ok()?;
730        permit.forget(); // Don't release, we're consuming the item
731
732        let mut rx = self.rx.lock().await;
733        let result = rx.recv().await;
734
735        if result.is_some() {
736            self.stats.received.fetch_add(1, Ordering::Relaxed);
737        }
738
739        result
740    }
741
742    /// Get current queue length
743    pub fn len(&self) -> usize {
744        self.permits.available_permits()
745    }
746
747    /// Check if channel is empty
748    pub fn is_empty(&self) -> bool {
749        self.len() == 0
750    }
751
752    /// Get statistics
753    pub fn stats(&self) -> ChannelStatsSnapshot {
754        ChannelStatsSnapshot {
755            sent: self.stats.sent.load(Ordering::Relaxed),
756            received: self.stats.received.load(Ordering::Relaxed),
757            blocked: self.stats.blocked.load(Ordering::Relaxed),
758            current_len: self.len(),
759            capacity: self.capacity,
760        }
761    }
762}
763
/// Internal counters for `BackpressureChannel`.
///
/// Derives `Debug` (for diagnostics) and `Default` (atomics default to
/// zero) instead of hand-writing zero-initialization.
#[derive(Debug, Default)]
struct ChannelStats {
    /// Items successfully sent.
    sent: AtomicU64,
    /// Items successfully received.
    received: AtomicU64,
    /// `try_send` attempts rejected because the buffer was full.
    blocked: AtomicU64,
}

impl ChannelStats {
    /// Create zeroed counters.
    fn new() -> Self {
        Self::default()
    }
}
779
/// Point-in-time view of a `BackpressureChannel`, returned by `stats()`.
#[derive(Debug, Clone)]
pub struct ChannelStatsSnapshot {
    /// Items successfully sent.
    pub sent: u64,
    /// Items successfully received.
    pub received: u64,
    /// `try_send` attempts rejected for a full buffer.
    pub blocked: u64,
    /// Items queued at snapshot time.
    pub current_len: usize,
    /// Channel buffer capacity.
    pub capacity: usize,
}
788
789/// Windowed rate tracker for monitoring request rates
790pub struct WindowedRateTracker {
791    /// Window buckets
792    buckets: RwLock<Vec<AtomicU64>>,
793    /// Bucket duration
794    bucket_duration: Duration,
795    /// Number of buckets
796    num_buckets: usize,
797    /// Current bucket index
798    current_bucket: AtomicUsize,
799    /// Last bucket rotation time
800    last_rotation: RwLock<Instant>,
801}
802
803impl WindowedRateTracker {
804    /// Create a new windowed rate tracker
805    pub fn new(window_duration: Duration, num_buckets: usize) -> Self {
806        let buckets: Vec<AtomicU64> = (0..num_buckets).map(|_| AtomicU64::new(0)).collect();
807
808        Self {
809            buckets: RwLock::new(buckets),
810            bucket_duration: window_duration / num_buckets as u32,
811            num_buckets,
812            current_bucket: AtomicUsize::new(0),
813            last_rotation: RwLock::new(Instant::now()),
814        }
815    }
816
817    /// Record a request
818    pub fn record(&self, count: u64) {
819        self.maybe_rotate();
820
821        let buckets = self.buckets.read();
822        let idx = self.current_bucket.load(Ordering::Relaxed);
823        buckets[idx].fetch_add(count, Ordering::Relaxed);
824    }
825
826    /// Get rate over the window
827    pub fn rate(&self) -> f64 {
828        self.maybe_rotate();
829
830        let buckets = self.buckets.read();
831        let total: u64 = buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum();
832
833        let window_secs = self.bucket_duration.as_secs_f64() * self.num_buckets as f64;
834        total as f64 / window_secs
835    }
836
837    /// Get total count in window
838    pub fn total(&self) -> u64 {
839        self.maybe_rotate();
840
841        let buckets = self.buckets.read();
842        buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum()
843    }
844
845    /// Maybe rotate to next bucket
846    fn maybe_rotate(&self) {
847        let now = Instant::now();
848        let elapsed = {
849            let last = self.last_rotation.read();
850            now.duration_since(*last)
851        };
852
853        if elapsed < self.bucket_duration {
854            return;
855        }
856
857        let buckets_to_rotate =
858            (elapsed.as_secs_f64() / self.bucket_duration.as_secs_f64()) as usize;
859        if buckets_to_rotate == 0 {
860            return;
861        }
862
863        // Update last rotation
864        {
865            let mut last = self.last_rotation.write();
866            *last = now;
867        }
868
869        let buckets = self.buckets.read();
870
871        // Rotate buckets
872        for _ in 0..buckets_to_rotate.min(self.num_buckets) {
873            let next = (self.current_bucket.load(Ordering::Relaxed) + 1) % self.num_buckets;
874            buckets[next].store(0, Ordering::Relaxed);
875            self.current_bucket.store(next, Ordering::Relaxed);
876        }
877    }
878}
879
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE: several tests below depend on wall-clock timing (refill rates,
    // sleeps); thresholds are chosen loosely to tolerate scheduler jitter.

    /// Bucket hands out up to `capacity` tokens, then rejects further requests.
    #[test]
    fn test_token_bucket_basic() {
        let bucket = TokenBucket::new(10, 10.0);

        // Should be able to acquire up to capacity
        assert!(bucket.try_acquire(5));
        assert!(bucket.try_acquire(5));

        // Should fail when empty
        assert!(!bucket.try_acquire(1));

        let stats = bucket.stats();
        assert_eq!(stats.acquired, 10);
        assert_eq!(stats.rejected, 1);
    }

    /// Drained bucket regains tokens over time at the configured rate.
    #[tokio::test]
    async fn test_token_bucket_refill() {
        // Use larger capacity so refill doesn't cap out
        let bucket = TokenBucket::new(1000, 100.0); // 1000 capacity, 100 tokens/sec

        // Drain bucket
        bucket.try_acquire(1000);
        assert!(!bucket.try_acquire(1));

        // Wait for refill (50ms should give us ~5 tokens)
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Should have some tokens now (at least 4 with 100/sec rate over 50ms)
        let available = bucket.available();
        assert!(
            available >= 4,
            "Expected at least 4 tokens, got {}",
            available
        );
    }

    /// Credits are consumed down to zero, then replenished via `grant`.
    #[test]
    fn test_credit_flow_control() {
        let flow = CreditFlowControl::new(10, 100);

        // Consume credits
        assert!(flow.try_consume(5));
        assert!(flow.try_consume(5));
        assert!(!flow.try_consume(1));

        assert_eq!(flow.available(), 0);

        // Grant credits
        flow.grant(5);
        assert_eq!(flow.available(), 5);
        assert!(flow.try_consume(5));
    }

    /// Sustained latency above target drives the AIMD rate downward.
    #[test]
    fn test_adaptive_rate_limiter() {
        let config = AdaptiveRateLimiterConfig {
            initial_rate: 1000,
            min_rate: 100,
            max_rate: 10000,
            target_latency_us: 10000,
            additive_increase: 100,
            multiplicative_decrease: 0.5,
        };

        let limiter = AdaptiveRateLimiter::new(config);

        // Initial rate
        assert_eq!(limiter.current_rate(), 1000);

        // Record high latency -> rate should decrease
        for _ in 0..100 {
            limiter.record_latency(Duration::from_millis(20));
        }
        assert!(limiter.current_rate() < 1000);
    }

    /// Successes keep the breaker closed.
    #[test]
    fn test_circuit_breaker_closed() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        });

        // Normal operation
        assert!(breaker.allow());
        breaker.record_success();

        assert_eq!(breaker.state(), CircuitState::Closed);
    }

    /// Reaching the failure threshold opens the breaker and rejects requests.
    #[test]
    fn test_circuit_breaker_opens() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        });

        // Trigger failures
        for _ in 0..3 {
            assert!(breaker.allow());
            breaker.record_failure();
        }

        // Circuit should be open
        match breaker.state() {
            CircuitState::Open { .. } => {}
            _ => panic!("Expected open state"),
        }

        // New requests should be rejected
        assert!(!breaker.allow());
    }

    /// Full channel rejects try_send; receiving frees capacity for new sends.
    #[tokio::test]
    async fn test_backpressure_channel() {
        let channel = BackpressureChannel::new(3);

        // Fill channel
        channel.send(1).await.unwrap();
        channel.send(2).await.unwrap();
        channel.send(3).await.unwrap();

        assert_eq!(channel.len(), 3);

        // Try send should fail
        assert!(channel.try_send(4).is_err());

        // Receive should work
        assert_eq!(channel.recv().await, Some(1));
        assert_eq!(channel.len(), 2);

        // Now we can send again
        channel.send(4).await.unwrap();
    }

    /// Counts recorded within one window are fully visible in total and rate.
    #[test]
    fn test_windowed_rate_tracker() {
        let tracker = WindowedRateTracker::new(Duration::from_secs(1), 10);

        // Record some requests
        tracker.record(100);
        tracker.record(100);

        assert_eq!(tracker.total(), 200);
        assert!(tracker.rate() > 0.0);
    }
}