// rivven_core/backpressure.rs
1//! Backpressure and Flow Control
2//!
3//! Production-grade flow control mechanisms for high-throughput streaming:
4//! - **Token Bucket**: Rate limiting with burst capacity
5//! - **Leaky Bucket**: Smooth rate limiting
6//! - **Credit-Based Flow Control**: Receiver-driven backpressure
7//! - **Adaptive Rate Limiter**: Self-tuning based on latency
8//! - **Circuit Breaker**: Fail-fast on downstream issues
9//! - **Windowed Rate Tracker**: Sliding window request tracking
10//!
11//! Based on industry best practices from:
12//! - TCP congestion control (AIMD, BBR concepts)
13//! - Reactive Streams backpressure
14//! - Netflix Hystrix circuit breaker patterns
15
16use parking_lot::RwLock;
17use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tokio::sync::{mpsc, Mutex, Notify, Semaphore};
21
/// Token bucket rate limiter.
///
/// Allows bursts up to bucket capacity, then limits throughput to the
/// sustained refill rate. All mutable state lives in atomics updated via
/// CAS loops, so the bucket can be shared across threads behind `&self`.
#[derive(Debug)]
pub struct TokenBucket {
    /// Current number of tokens
    tokens: AtomicU64,
    /// Maximum tokens (burst capacity)
    capacity: u64,
    /// Tokens added per second (stored as f64 bits for atomic updates)
    refill_rate_bits: AtomicU64,
    /// Creation time; serves as the zero point for `last_refill`
    created: Instant,
    /// Last refill timestamp (nanoseconds since `created`)
    last_refill: AtomicU64,
    /// Acquire/reject counters
    stats: TokenBucketStats,
}
39
40impl TokenBucket {
41    /// Create a new token bucket
42    pub fn new(capacity: u64, refill_rate: f64) -> Self {
43        let now = Instant::now();
44        Self {
45            tokens: AtomicU64::new(capacity),
46            capacity,
47            refill_rate_bits: AtomicU64::new(refill_rate.to_bits()),
48            created: now,
49            last_refill: AtomicU64::new(0),
50            stats: TokenBucketStats::new(),
51        }
52    }
53
54    /// Try to acquire tokens without blocking
55    pub fn try_acquire(&self, count: u64) -> bool {
56        self.refill();
57
58        loop {
59            let current = self.tokens.load(Ordering::Acquire);
60            if current < count {
61                self.stats.rejected.fetch_add(1, Ordering::Relaxed);
62                return false;
63            }
64
65            if self
66                .tokens
67                .compare_exchange_weak(
68                    current,
69                    current - count,
70                    Ordering::AcqRel,
71                    Ordering::Relaxed,
72                )
73                .is_ok()
74            {
75                self.stats.acquired.fetch_add(count, Ordering::Relaxed);
76                return true;
77            }
78            // Hint to the CPU that we're in a spin loop
79            std::hint::spin_loop();
80        }
81    }
82
83    /// Acquire tokens, blocking if necessary
84    pub async fn acquire(&self, count: u64) {
85        while !self.try_acquire(count) {
86            // Calculate wait time
87            let tokens_needed = count.saturating_sub(self.tokens.load(Ordering::Relaxed));
88            let rate = self.refill_rate();
89            let wait_secs = tokens_needed as f64 / rate;
90            let wait_duration = Duration::from_secs_f64(wait_secs.max(0.001));
91
92            tokio::time::sleep(wait_duration).await;
93        }
94    }
95
96    /// Get the current refill rate (tokens per second)
97    fn refill_rate(&self) -> f64 {
98        f64::from_bits(self.refill_rate_bits.load(Ordering::Relaxed))
99    }
100
101    /// Update the refill rate atomically (for adaptive rate limiting)
102    pub fn update_rate(&self, new_rate: f64) {
103        self.refill_rate_bits
104            .store(new_rate.to_bits(), Ordering::Relaxed);
105    }
106
107    /// Refill tokens based on elapsed time
108    fn refill(&self) {
109        let now = self.created.elapsed().as_nanos() as u64;
110        let last = self.last_refill.load(Ordering::Acquire);
111
112        if now <= last {
113            return;
114        }
115
116        let rate = self.refill_rate();
117        let elapsed_secs = (now - last) as f64 / 1_000_000_000.0;
118        let new_tokens = (elapsed_secs * rate) as u64;
119
120        if new_tokens == 0 {
121            return;
122        }
123
124        if self
125            .last_refill
126            .compare_exchange(last, now, Ordering::AcqRel, Ordering::Relaxed)
127            .is_ok()
128        {
129            loop {
130                let current = self.tokens.load(Ordering::Acquire);
131                let new_total = (current + new_tokens).min(self.capacity);
132
133                if self
134                    .tokens
135                    .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Relaxed)
136                    .is_ok()
137                {
138                    break;
139                }
140            }
141        }
142    }
143
144    /// Get current token count
145    pub fn available(&self) -> u64 {
146        self.refill();
147        self.tokens.load(Ordering::Relaxed)
148    }
149
150    /// Get statistics
151    pub fn stats(&self) -> TokenBucketStatsSnapshot {
152        TokenBucketStatsSnapshot {
153            acquired: self.stats.acquired.load(Ordering::Relaxed),
154            rejected: self.stats.rejected.load(Ordering::Relaxed),
155            available: self.available(),
156            capacity: self.capacity,
157        }
158    }
159}
160
#[derive(Debug)]
struct TokenBucketStats {
    /// Total tokens successfully acquired.
    acquired: AtomicU64,
    /// Total acquisition attempts rejected for lack of tokens.
    rejected: AtomicU64,
}

impl TokenBucketStats {
    /// Fresh counter set, everything zeroed.
    fn new() -> Self {
        TokenBucketStats {
            acquired: AtomicU64::default(),
            rejected: AtomicU64::default(),
        }
    }
}
175
/// Point-in-time view of a [`TokenBucket`]'s counters.
#[derive(Debug, Clone)]
pub struct TokenBucketStatsSnapshot {
    /// Total tokens successfully acquired.
    pub acquired: u64,
    /// Number of acquisition attempts rejected for lack of tokens.
    pub rejected: u64,
    /// Tokens available at snapshot time.
    pub available: u64,
    /// Maximum (burst) capacity of the bucket.
    pub capacity: u64,
}
183
/// Credit-based flow control for receiver-driven backpressure.
///
/// Receivers grant credits to senders; senders can only send up to the
/// current credit balance, which is capped at `max_credits`.
#[derive(Debug)]
pub struct CreditFlowControl {
    /// Available credits
    credits: AtomicU64,
    /// Maximum credits the balance can accumulate to
    max_credits: u64,
    /// Wakes `consume` waiters when credits become available
    credit_notify: Notify,
    /// Consumed/granted/blocked counters
    stats: CreditStats,
}
197
198impl CreditFlowControl {
199    /// Create new credit flow control
200    pub fn new(initial_credits: u64, max_credits: u64) -> Self {
201        Self {
202            credits: AtomicU64::new(initial_credits),
203            max_credits,
204            credit_notify: Notify::new(),
205            stats: CreditStats::new(),
206        }
207    }
208
209    /// Try to consume credits without blocking
210    pub fn try_consume(&self, count: u64) -> bool {
211        loop {
212            let current = self.credits.load(Ordering::Acquire);
213            if current < count {
214                self.stats.blocked.fetch_add(1, Ordering::Relaxed);
215                return false;
216            }
217
218            if self
219                .credits
220                .compare_exchange_weak(
221                    current,
222                    current - count,
223                    Ordering::AcqRel,
224                    Ordering::Relaxed,
225                )
226                .is_ok()
227            {
228                self.stats.consumed.fetch_add(count, Ordering::Relaxed);
229                return true;
230            }
231        }
232    }
233
234    /// Consume credits, blocking if necessary
235    pub async fn consume(&self, count: u64) {
236        while !self.try_consume(count) {
237            self.credit_notify.notified().await;
238        }
239    }
240
241    /// Grant credits (called by receiver)
242    pub fn grant(&self, count: u64) {
243        loop {
244            let current = self.credits.load(Ordering::Acquire);
245            let new_total = (current + count).min(self.max_credits);
246
247            if self
248                .credits
249                .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Relaxed)
250                .is_ok()
251            {
252                self.stats.granted.fetch_add(count, Ordering::Relaxed);
253                self.credit_notify.notify_waiters();
254                break;
255            }
256        }
257    }
258
259    /// Get available credits
260    pub fn available(&self) -> u64 {
261        self.credits.load(Ordering::Relaxed)
262    }
263
264    /// Get statistics
265    pub fn stats(&self) -> CreditStatsSnapshot {
266        CreditStatsSnapshot {
267            consumed: self.stats.consumed.load(Ordering::Relaxed),
268            granted: self.stats.granted.load(Ordering::Relaxed),
269            blocked: self.stats.blocked.load(Ordering::Relaxed),
270            available: self.available(),
271        }
272    }
273}
274
#[derive(Debug)]
struct CreditStats {
    /// Total credits consumed.
    consumed: AtomicU64,
    /// Total credits granted.
    granted: AtomicU64,
    /// Consume attempts that found insufficient credits.
    blocked: AtomicU64,
}

impl CreditStats {
    /// Fresh counter set, everything zeroed.
    fn new() -> Self {
        CreditStats {
            consumed: AtomicU64::default(),
            granted: AtomicU64::default(),
            blocked: AtomicU64::default(),
        }
    }
}
291
/// Point-in-time view of [`CreditFlowControl`] counters.
#[derive(Debug, Clone)]
pub struct CreditStatsSnapshot {
    /// Total credits consumed.
    pub consumed: u64,
    /// Total credits granted (before capping at the maximum).
    pub granted: u64,
    /// Number of consume attempts that found insufficient credits.
    pub blocked: u64,
    /// Credits available at snapshot time.
    pub available: u64,
}
299
/// Adaptive rate limiter that adjusts based on latency feedback.
///
/// Uses AIMD (Additive Increase Multiplicative Decrease), the TCP-style
/// congestion control scheme: back off sharply on overload, probe gently
/// when healthy.
#[derive(Debug)]
pub struct AdaptiveRateLimiter {
    /// Current rate limit (requests per second)
    rate: AtomicU64,
    /// Minimum rate the limiter will decrease to
    min_rate: u64,
    /// Maximum rate the limiter will increase to
    max_rate: u64,
    /// Target latency (microseconds) driving the adaptation
    target_latency_us: u64,
    /// Requests/second added per additive-increase step
    additive_increase: u64,
    /// Multiplicative decrease factor (0.5 = halve on overload)
    multiplicative_decrease: f64,
    /// Inner token bucket for actual rate limiting
    bucket: TokenBucket,
    /// Ring buffer of latency samples for adaptation
    latency_samples: RwLock<LatencySamples>,
    /// Increase/decrease counters
    stats: AdaptiveStats,
}
323
/// Fixed-size ring buffer of latency samples (microseconds).
#[derive(Debug)]
struct LatencySamples {
    /// Sample storage; oldest entries are overwritten once full.
    samples: Vec<u64>,
    /// Next write position in the ring.
    index: usize,
    /// True once the ring has wrapped at least once.
    filled: bool,
}

impl LatencySamples {
    /// Create a ring buffer holding `size` samples.
    ///
    /// `size` is clamped to at least 1: a zero-length buffer would make
    /// `add` panic on the `% 0` below (the original allowed this).
    fn new(size: usize) -> Self {
        let size = size.max(1);
        Self {
            samples: vec![0; size],
            index: 0,
            filled: false,
        }
    }

    /// Record one sample, overwriting the oldest once the ring is full.
    fn add(&mut self, latency_us: u64) {
        self.samples[self.index] = latency_us;
        self.index = (self.index + 1) % self.samples.len();
        if self.index == 0 {
            self.filled = true;
        }
    }

    /// Return the `p`-quantile (`0.0..=1.0`) of the recorded samples,
    /// or 0 when nothing has been recorded yet.
    fn percentile(&self, p: f64) -> u64 {
        // Only the prefix is valid until the ring has wrapped once.
        let count = if self.filled {
            self.samples.len()
        } else {
            self.index
        };
        if count == 0 {
            return 0;
        }

        let mut sorted: Vec<u64> = self.samples[..count].to_vec();
        sorted.sort_unstable();

        let idx = ((count as f64 * p) as usize).min(count - 1);
        sorted[idx]
    }
}
365
366impl AdaptiveRateLimiter {
367    /// Create new adaptive rate limiter
368    pub fn new(config: AdaptiveRateLimiterConfig) -> Self {
369        let bucket = TokenBucket::new(config.initial_rate, config.initial_rate as f64);
370
371        Self {
372            rate: AtomicU64::new(config.initial_rate),
373            min_rate: config.min_rate,
374            max_rate: config.max_rate,
375            target_latency_us: config.target_latency_us,
376            additive_increase: config.additive_increase,
377            multiplicative_decrease: config.multiplicative_decrease,
378            bucket,
379            latency_samples: RwLock::new(LatencySamples::new(100)),
380            stats: AdaptiveStats::new(),
381        }
382    }
383
384    /// Try to acquire permission
385    pub fn try_acquire(&self) -> bool {
386        self.bucket.try_acquire(1)
387    }
388
389    /// Acquire permission, blocking if necessary
390    pub async fn acquire(&self) {
391        self.bucket.acquire(1).await
392    }
393
394    /// Record latency feedback for adaptation
395    pub fn record_latency(&self, latency: Duration) {
396        let latency_us = latency.as_micros() as u64;
397
398        {
399            let mut samples = self.latency_samples.write();
400            samples.add(latency_us);
401        }
402
403        // Adapt rate based on p99 latency
404        let p99 = {
405            let samples = self.latency_samples.read();
406            samples.percentile(0.99)
407        };
408
409        let current_rate = self.rate.load(Ordering::Relaxed);
410
411        if p99 > self.target_latency_us {
412            // Decrease rate (multiplicative)
413            let new_rate =
414                ((current_rate as f64 * self.multiplicative_decrease) as u64).max(self.min_rate);
415            self.rate.store(new_rate, Ordering::Relaxed);
416            self.bucket.update_rate(new_rate as f64);
417            self.stats.decreases.fetch_add(1, Ordering::Relaxed);
418        } else if p99 < self.target_latency_us / 2 {
419            // Increase rate (additive)
420            let new_rate = (current_rate + self.additive_increase).min(self.max_rate);
421            self.rate.store(new_rate, Ordering::Relaxed);
422            self.bucket.update_rate(new_rate as f64);
423            self.stats.increases.fetch_add(1, Ordering::Relaxed);
424        }
425    }
426
427    /// Get current rate limit
428    pub fn current_rate(&self) -> u64 {
429        self.rate.load(Ordering::Relaxed)
430    }
431
432    /// Get statistics
433    pub fn stats(&self) -> AdaptiveStatsSnapshot {
434        let samples = self.latency_samples.read();
435        AdaptiveStatsSnapshot {
436            current_rate: self.current_rate(),
437            increases: self.stats.increases.load(Ordering::Relaxed),
438            decreases: self.stats.decreases.load(Ordering::Relaxed),
439            p50_latency_us: samples.percentile(0.5),
440            p99_latency_us: samples.percentile(0.99),
441        }
442    }
443}
444
/// Tuning knobs for [`AdaptiveRateLimiter`].
#[derive(Debug, Clone)]
pub struct AdaptiveRateLimiterConfig {
    /// Starting rate in requests per second.
    pub initial_rate: u64,
    /// Floor the rate never drops below.
    pub min_rate: u64,
    /// Ceiling the rate never exceeds.
    pub max_rate: u64,
    /// Latency target driving the AIMD adaptation, in microseconds.
    pub target_latency_us: u64,
    /// Requests-per-second added on each additive-increase step.
    pub additive_increase: u64,
    /// Factor applied on overload (0.5 = halve the rate).
    pub multiplicative_decrease: f64,
}

impl Default for AdaptiveRateLimiterConfig {
    /// Moderate defaults: start at 10k rps, target a 10ms p99,
    /// halve on overload, creep back up 100 rps at a time.
    fn default() -> Self {
        AdaptiveRateLimiterConfig {
            initial_rate: 10_000,
            min_rate: 100,
            max_rate: 1_000_000,
            target_latency_us: 10_000, // 10ms
            additive_increase: 100,
            multiplicative_decrease: 0.5,
        }
    }
}
468
#[derive(Debug)]
struct AdaptiveStats {
    /// Count of additive-increase adjustments.
    increases: AtomicU64,
    /// Count of multiplicative-decrease adjustments.
    decreases: AtomicU64,
}

impl AdaptiveStats {
    /// Fresh counter set, everything zeroed.
    fn new() -> Self {
        AdaptiveStats {
            increases: AtomicU64::default(),
            decreases: AtomicU64::default(),
        }
    }
}
483
/// Point-in-time view of [`AdaptiveRateLimiter`] state.
#[derive(Debug, Clone)]
pub struct AdaptiveStatsSnapshot {
    /// Rate limit at snapshot time (requests per second).
    pub current_rate: u64,
    /// Number of additive-increase adjustments so far.
    pub increases: u64,
    /// Number of multiplicative-decrease adjustments so far.
    pub decreases: u64,
    /// Median latency over the sample window (microseconds).
    pub p50_latency_us: u64,
    /// 99th-percentile latency over the sample window (microseconds).
    pub p99_latency_us: u64,
}
492
/// Circuit breaker for fail-fast on downstream issues.
///
/// Lock-order note: `record_failure` takes the `state` write lock before
/// `window_start`; new code touching both must keep that order to avoid
/// deadlock.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Current state
    state: RwLock<CircuitState>,
    /// Configuration
    config: CircuitBreakerConfig,
    /// Statistics
    stats: CircuitBreakerStats,
    /// Start of the current failure-counting window
    window_start: parking_lot::Mutex<Instant>,
}
505
/// State of a [`CircuitBreaker`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Normal operation; requests flow through
    Closed,
    /// Failing: all requests rejected until the recovery timeout elapses
    Open { opened_at: Instant },
    /// Testing if downstream recovered; probe traffic allowed
    HalfOpen,
}
515
/// Tuning knobs for [`CircuitBreaker`].
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Failures within the window required to open the circuit.
    pub failure_threshold: u32,
    /// Length of the failure-counting window.
    pub failure_window: Duration,
    /// How long to stay open before probing with half-open.
    pub recovery_timeout: Duration,
    /// Consecutive half-open successes required to close again.
    pub success_threshold: u32,
}

impl Default for CircuitBreakerConfig {
    /// Conservative defaults: open after 5 failures within a minute,
    /// probe after 30s, close after 3 successful probes.
    fn default() -> Self {
        CircuitBreakerConfig {
            failure_threshold: 5,
            failure_window: Duration::from_secs(60),
            recovery_timeout: Duration::from_secs(30),
            success_threshold: 3,
        }
    }
}
539
540impl CircuitBreaker {
541    /// Create new circuit breaker
542    pub fn new(config: CircuitBreakerConfig) -> Self {
543        Self {
544            state: RwLock::new(CircuitState::Closed),
545            config,
546            stats: CircuitBreakerStats::new(),
547            window_start: parking_lot::Mutex::new(Instant::now()),
548        }
549    }
550
551    /// Check if request is allowed
552    pub fn allow(&self) -> bool {
553        let state = *self.state.read();
554
555        match state {
556            CircuitState::Closed => {
557                self.stats.allowed.fetch_add(1, Ordering::Relaxed);
558                true
559            }
560            CircuitState::Open { opened_at } => {
561                if opened_at.elapsed() > self.config.recovery_timeout {
562                    // Transition to half-open
563                    *self.state.write() = CircuitState::HalfOpen;
564                    self.stats.allowed.fetch_add(1, Ordering::Relaxed);
565                    true
566                } else {
567                    self.stats.rejected.fetch_add(1, Ordering::Relaxed);
568                    false
569                }
570            }
571            CircuitState::HalfOpen => {
572                // Allow limited requests in half-open
573                self.stats.allowed.fetch_add(1, Ordering::Relaxed);
574                true
575            }
576        }
577    }
578
579    /// Record a successful operation
580    pub fn record_success(&self) {
581        let mut state = self.state.write();
582        self.stats.successes.fetch_add(1, Ordering::Relaxed);
583
584        match *state {
585            CircuitState::HalfOpen => {
586                let successes = self
587                    .stats
588                    .half_open_successes
589                    .fetch_add(1, Ordering::Relaxed)
590                    + 1;
591                if successes >= self.config.success_threshold as u64 {
592                    *state = CircuitState::Closed;
593                    self.stats.half_open_successes.store(0, Ordering::Relaxed);
594                    self.stats.failures_in_window.store(0, Ordering::Relaxed);
595                }
596            }
597            CircuitState::Closed => {
598                // Reset failure count on success
599                self.stats.failures_in_window.store(0, Ordering::Relaxed);
600            }
601            _ => {}
602        }
603    }
604
605    /// Record a failed operation
606    pub fn record_failure(&self) {
607        let mut state = self.state.write();
608        self.stats.failures.fetch_add(1, Ordering::Relaxed);
609
610        match *state {
611            CircuitState::Closed => {
612                // Check if failure window has expired; if so, reset counter
613                let mut ws = self.window_start.lock();
614                if ws.elapsed() > self.config.failure_window {
615                    self.stats.failures_in_window.store(1, Ordering::Relaxed);
616                    *ws = Instant::now();
617                } else {
618                    let failures = self
619                        .stats
620                        .failures_in_window
621                        .fetch_add(1, Ordering::Relaxed)
622                        + 1;
623                    if failures >= self.config.failure_threshold as u64 {
624                        *state = CircuitState::Open {
625                            opened_at: Instant::now(),
626                        };
627                        self.stats.opens.fetch_add(1, Ordering::Relaxed);
628                    }
629                }
630            }
631            CircuitState::HalfOpen => {
632                // Any failure in half-open reopens circuit
633                *state = CircuitState::Open {
634                    opened_at: Instant::now(),
635                };
636                self.stats.half_open_successes.store(0, Ordering::Relaxed);
637                self.stats.opens.fetch_add(1, Ordering::Relaxed);
638            }
639            _ => {}
640        }
641    }
642
643    /// Get current state
644    pub fn state(&self) -> CircuitState {
645        *self.state.read()
646    }
647
648    /// Get statistics
649    pub fn stats(&self) -> CircuitBreakerStatsSnapshot {
650        CircuitBreakerStatsSnapshot {
651            state: self.state(),
652            allowed: self.stats.allowed.load(Ordering::Relaxed),
653            rejected: self.stats.rejected.load(Ordering::Relaxed),
654            successes: self.stats.successes.load(Ordering::Relaxed),
655            failures: self.stats.failures.load(Ordering::Relaxed),
656            opens: self.stats.opens.load(Ordering::Relaxed),
657        }
658    }
659}
660
#[derive(Debug)]
struct CircuitBreakerStats {
    /// Requests let through.
    allowed: AtomicU64,
    /// Requests rejected while open.
    rejected: AtomicU64,
    /// Total successes recorded.
    successes: AtomicU64,
    /// Total failures recorded.
    failures: AtomicU64,
    /// Failures within the current window.
    failures_in_window: AtomicU64,
    /// Consecutive successes while half-open.
    half_open_successes: AtomicU64,
    /// Times the circuit has opened.
    opens: AtomicU64,
}

impl CircuitBreakerStats {
    /// Fresh counter set, everything zeroed.
    fn new() -> Self {
        CircuitBreakerStats {
            allowed: AtomicU64::default(),
            rejected: AtomicU64::default(),
            successes: AtomicU64::default(),
            failures: AtomicU64::default(),
            failures_in_window: AtomicU64::default(),
            half_open_successes: AtomicU64::default(),
            opens: AtomicU64::default(),
        }
    }
}
685
/// Point-in-time view of [`CircuitBreaker`] state and counters.
#[derive(Debug, Clone)]
pub struct CircuitBreakerStatsSnapshot {
    /// Breaker state at snapshot time.
    pub state: CircuitState,
    /// Requests allowed through.
    pub allowed: u64,
    /// Requests rejected while the circuit was open.
    pub rejected: u64,
    /// Successful operations recorded.
    pub successes: u64,
    /// Failed operations recorded.
    pub failures: u64,
    /// Number of times the circuit transitioned to open.
    pub opens: u64,
}
695
/// Bounded channel with backpressure.
///
/// Provides an async send that blocks when the buffer is full. The
/// semaphore mirrors the number of buffered items, letting `recv` wait
/// without holding the receiver lock and `len` answer cheaply.
pub struct BackpressureChannel<T> {
    /// Inner sender
    tx: mpsc::Sender<T>,
    /// Inner receiver (behind an async Mutex so `recv` takes `&self`)
    rx: Mutex<mpsc::Receiver<T>>,
    /// One permit per buffered item
    permits: Arc<Semaphore>,
    /// Buffer capacity
    capacity: usize,
    /// Send/receive/blocked counters
    stats: ChannelStats,
}
710
711impl<T> BackpressureChannel<T> {
712    /// Create a new backpressure channel
713    pub fn new(capacity: usize) -> Self {
714        // Use semaphore to track capacity, but start at 0 (no items in channel)
715        let (tx, rx) = mpsc::channel(capacity);
716
717        Self {
718            tx,
719            rx: Mutex::new(rx),
720            permits: Arc::new(Semaphore::new(0)), // Start at 0, add permits when items are sent
721            capacity,
722            stats: ChannelStats::new(),
723        }
724    }
725
726    /// Send a value, blocking if buffer is full
727    pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<T>> {
728        self.stats.sent.fetch_add(1, Ordering::Relaxed);
729        let result = self.tx.send(value).await;
730        if result.is_ok() {
731            // Add permit to signal item is in channel
732            self.permits.add_permits(1);
733        }
734        result
735    }
736
737    /// Try to send without blocking
738    pub fn try_send(&self, value: T) -> Result<(), mpsc::error::TrySendError<T>> {
739        let result = self.tx.try_send(value);
740        match &result {
741            Ok(()) => {
742                self.stats.sent.fetch_add(1, Ordering::Relaxed);
743                self.permits.add_permits(1);
744            }
745            Err(mpsc::error::TrySendError::Full(_)) => {
746                self.stats.blocked.fetch_add(1, Ordering::Relaxed);
747            }
748            _ => {}
749        }
750        result
751    }
752
753    /// Receive a value
754    pub async fn recv(&self) -> Option<T> {
755        // Acquire permit (blocks until item available)
756        let permit = self.permits.acquire().await.ok()?;
757        permit.forget(); // Don't release, we're consuming the item
758
759        let mut rx = self.rx.lock().await;
760        let result = rx.recv().await;
761
762        if result.is_some() {
763            self.stats.received.fetch_add(1, Ordering::Relaxed);
764        }
765
766        result
767    }
768
769    /// Get current queue length
770    pub fn len(&self) -> usize {
771        self.permits.available_permits()
772    }
773
774    /// Check if channel is empty
775    pub fn is_empty(&self) -> bool {
776        self.len() == 0
777    }
778
779    /// Get statistics
780    pub fn stats(&self) -> ChannelStatsSnapshot {
781        ChannelStatsSnapshot {
782            sent: self.stats.sent.load(Ordering::Relaxed),
783            received: self.stats.received.load(Ordering::Relaxed),
784            blocked: self.stats.blocked.load(Ordering::Relaxed),
785            current_len: self.len(),
786            capacity: self.capacity,
787        }
788    }
789}
790
/// Counters for backpressure-channel operations.
// `#[derive(Debug)]` added for consistency with every other stats struct
// in this module (the original omitted it here only).
#[derive(Debug)]
struct ChannelStats {
    /// Values successfully sent.
    sent: AtomicU64,
    /// Values received.
    received: AtomicU64,
    /// `try_send` attempts rejected because the buffer was full.
    blocked: AtomicU64,
}

impl ChannelStats {
    /// Fresh counter set, everything zeroed.
    fn new() -> Self {
        Self {
            sent: AtomicU64::new(0),
            received: AtomicU64::new(0),
            blocked: AtomicU64::new(0),
        }
    }
}
806
/// Point-in-time view of [`BackpressureChannel`] counters.
#[derive(Debug, Clone)]
pub struct ChannelStatsSnapshot {
    /// Values successfully sent.
    pub sent: u64,
    /// Values received.
    pub received: u64,
    /// `try_send` attempts rejected because the buffer was full.
    pub blocked: u64,
    /// Items buffered at snapshot time.
    pub current_len: usize,
    /// Maximum buffer size.
    pub capacity: usize,
}
815
/// Windowed rate tracker for monitoring request rates.
///
/// Counts are kept in a ring of `num_buckets` buckets, each covering
/// `bucket_duration`; the window total is their sum.
pub struct WindowedRateTracker {
    /// Window buckets (ring buffer of counters)
    buckets: RwLock<Vec<AtomicU64>>,
    /// Duration covered by each bucket
    bucket_duration: Duration,
    /// Number of buckets in the ring
    num_buckets: usize,
    /// Index of the bucket currently receiving counts
    current_bucket: AtomicUsize,
    /// Last bucket rotation time
    last_rotation: RwLock<Instant>,
}
829
830impl WindowedRateTracker {
831    /// Create a new windowed rate tracker
832    pub fn new(window_duration: Duration, num_buckets: usize) -> Self {
833        let buckets: Vec<AtomicU64> = (0..num_buckets).map(|_| AtomicU64::new(0)).collect();
834
835        Self {
836            buckets: RwLock::new(buckets),
837            bucket_duration: window_duration / num_buckets as u32,
838            num_buckets,
839            current_bucket: AtomicUsize::new(0),
840            last_rotation: RwLock::new(Instant::now()),
841        }
842    }
843
844    /// Record a request
845    pub fn record(&self, count: u64) {
846        self.maybe_rotate();
847
848        let buckets = self.buckets.read();
849        let idx = self.current_bucket.load(Ordering::Relaxed);
850        buckets[idx].fetch_add(count, Ordering::Relaxed);
851    }
852
853    /// Get rate over the window
854    pub fn rate(&self) -> f64 {
855        self.maybe_rotate();
856
857        let buckets = self.buckets.read();
858        let total: u64 = buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum();
859
860        let window_secs = self.bucket_duration.as_secs_f64() * self.num_buckets as f64;
861        total as f64 / window_secs
862    }
863
864    /// Get total count in window
865    pub fn total(&self) -> u64 {
866        self.maybe_rotate();
867
868        let buckets = self.buckets.read();
869        buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum()
870    }
871
872    /// Maybe rotate to next bucket
873    fn maybe_rotate(&self) {
874        let now = Instant::now();
875        let elapsed = {
876            let last = self.last_rotation.read();
877            now.duration_since(*last)
878        };
879
880        if elapsed < self.bucket_duration {
881            return;
882        }
883
884        let buckets_to_rotate =
885            (elapsed.as_secs_f64() / self.bucket_duration.as_secs_f64()) as usize;
886        if buckets_to_rotate == 0 {
887            return;
888        }
889
890        // Update last rotation
891        {
892            let mut last = self.last_rotation.write();
893            *last = now;
894        }
895
896        let buckets = self.buckets.read();
897
898        // Rotate buckets
899        for _ in 0..buckets_to_rotate.min(self.num_buckets) {
900            let next = (self.current_bucket.load(Ordering::Relaxed) + 1) % self.num_buckets;
901            buckets[next].store(0, Ordering::Relaxed);
902            self.current_bucket.store(next, Ordering::Relaxed);
903        }
904    }
905}
906
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic acquire/reject accounting on a small bucket.
    #[test]
    fn test_token_bucket_basic() {
        let bucket = TokenBucket::new(10, 10.0);

        // Should be able to acquire up to capacity
        assert!(bucket.try_acquire(5));
        assert!(bucket.try_acquire(5));

        // Should fail when empty
        assert!(!bucket.try_acquire(1));

        let stats = bucket.stats();
        assert_eq!(stats.acquired, 10);
        assert_eq!(stats.rejected, 1);
    }

    /// Tokens come back over time at roughly the configured rate.
    /// NOTE(review): timing-based — may flake on a heavily loaded CI box.
    #[tokio::test]
    async fn test_token_bucket_refill() {
        // Use larger capacity so refill doesn't cap out
        let bucket = TokenBucket::new(1000, 100.0); // 1000 capacity, 100 tokens/sec

        // Drain bucket
        bucket.try_acquire(1000);
        assert!(!bucket.try_acquire(1));

        // Wait for refill (50ms should give us ~5 tokens)
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Should have some tokens now (at least 4 with 100/sec rate over 50ms)
        let available = bucket.available();
        assert!(
            available >= 4,
            "Expected at least 4 tokens, got {}",
            available
        );
    }

    /// Credits drain to zero, block further consumption, and come back
    /// when granted.
    #[test]
    fn test_credit_flow_control() {
        let flow = CreditFlowControl::new(10, 100);

        // Consume credits
        assert!(flow.try_consume(5));
        assert!(flow.try_consume(5));
        assert!(!flow.try_consume(1));

        assert_eq!(flow.available(), 0);

        // Grant credits
        flow.grant(5);
        assert_eq!(flow.available(), 5);
        assert!(flow.try_consume(5));
    }

    /// Sustained above-target latency must drive the AIMD rate downward.
    #[test]
    fn test_adaptive_rate_limiter() {
        let config = AdaptiveRateLimiterConfig {
            initial_rate: 1000,
            min_rate: 100,
            max_rate: 10000,
            target_latency_us: 10000,
            additive_increase: 100,
            multiplicative_decrease: 0.5,
        };

        let limiter = AdaptiveRateLimiter::new(config);

        // Initial rate
        assert_eq!(limiter.current_rate(), 1000);

        // Record high latency -> rate should decrease
        for _ in 0..100 {
            limiter.record_latency(Duration::from_millis(20));
        }
        assert!(limiter.current_rate() < 1000);
    }

    /// A healthy breaker stays closed.
    #[test]
    fn test_circuit_breaker_closed() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        });

        // Normal operation
        assert!(breaker.allow());
        breaker.record_success();

        assert_eq!(breaker.state(), CircuitState::Closed);
    }

    /// Reaching the failure threshold opens the circuit and rejects traffic.
    #[test]
    fn test_circuit_breaker_opens() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        });

        // Trigger failures
        for _ in 0..3 {
            assert!(breaker.allow());
            breaker.record_failure();
        }

        // Circuit should be open
        match breaker.state() {
            CircuitState::Open { .. } => {}
            _ => panic!("Expected open state"),
        }

        // New requests should be rejected
        assert!(!breaker.allow());
    }

    /// Send/recv round trip with capacity accounting.
    #[tokio::test]
    async fn test_backpressure_channel() {
        let channel = BackpressureChannel::new(3);

        // Fill channel
        channel.send(1).await.unwrap();
        channel.send(2).await.unwrap();
        channel.send(3).await.unwrap();

        assert_eq!(channel.len(), 3);

        // Try send should fail
        assert!(channel.try_send(4).is_err());

        // Receive should work
        assert_eq!(channel.recv().await, Some(1));
        assert_eq!(channel.len(), 2);

        // Now we can send again
        channel.send(4).await.unwrap();
    }

    /// Counts recorded within one window sum up and yield a positive rate.
    #[test]
    fn test_windowed_rate_tracker() {
        let tracker = WindowedRateTracker::new(Duration::from_secs(1), 10);

        // Record some requests
        tracker.record(100);
        tracker.record(100);

        assert_eq!(tracker.total(), 200);
        assert!(tracker.rate() > 0.0);
    }
}