Skip to main content

aura_core/effects/
reliability.rs

1//! Reliability Effects
2//!
3//! Provides reliability patterns for fault-tolerant operation in distributed systems.
4//! These effects enable retry logic, circuit breaking, and graceful degradation
5//! while maintaining the stateless, composable nature of the effect system.
6//! Includes BackoffStrategy, RetryPolicy, and helper types for retry patterns.
7//!
8//! # Effect Classification
9//!
10//! - **Category**: Infrastructure Effect
11//! - **Implementation**: `aura-effects` (Layer 3)
12//! - **Usage**: Generic reliability patterns (retry, circuit breakers, rate limiting)
13//!
14//! This is an infrastructure effect providing generic reliability patterns used
15//! across distributed systems. No Aura-specific semantics. Implementations should
16//! be stateless and work through explicit dependency injection.
17
18use crate::effects::time::PhysicalTimeEffects;
19use crate::AuraError;
20use async_trait::async_trait;
21use serde::{Deserialize, Serialize};
22use std::future::Future;
23use std::time::Duration;
24
25cfg_if::cfg_if! {
26    if #[cfg(target_arch = "wasm32")] {
27        type MonotonicInstant = web_time::Instant;
28    } else {
29        type MonotonicInstant = std::time::Instant;
30    }
31}
32
33/// Reliability operations for fault tolerance and graceful degradation
34///
35/// This trait provides pure reliability primitives that can be composed
36/// to create resilient distributed systems. All operations are coordination
37/// patterns that work with other effects through explicit composition.
38#[async_trait]
39pub trait ReliabilityEffects {
40    /// Execute an operation with retry logic and exponential backoff
41    ///
42    /// # Arguments
43    /// * `operation` - The async operation to execute
44    /// * `max_attempts` - Maximum number of retry attempts
45    /// * `base_delay` - Base delay before first retry
46    /// * `max_delay` - Maximum delay between retries
47    ///
48    /// # Returns
49    /// The result of the first successful attempt, or the final error
50    async fn with_retry<T, F, Fut>(
51        &self,
52        operation: F,
53        max_attempts: u32,
54        base_delay: Duration,
55        max_delay: Duration,
56    ) -> Result<T, ReliabilityError>
57    where
58        F: Fn() -> Fut + Send,
59        Fut: std::future::Future<Output = Result<T, AuraError>> + Send,
60        T: Send;
61
62    /// Execute an operation with circuit breaker protection
63    ///
64    /// Circuit breaker prevents cascading failures by failing fast when
65    /// error rate exceeds threshold. Automatically recovers when operation
66    /// starts succeeding again.
67    ///
68    /// # Arguments
69    /// * `operation` - The async operation to execute
70    /// * `circuit_id` - Unique identifier for this circuit
71    /// * `failure_threshold` - Number of failures before opening circuit
72    /// * `timeout` - How long to keep circuit open before trying again
73    ///
74    /// # Returns
75    /// The operation result, or circuit breaker error if circuit is open
76    async fn with_circuit_breaker<T, F, Fut>(
77        &self,
78        operation: F,
79        circuit_id: &str,
80        failure_threshold: u32,
81        timeout: Duration,
82    ) -> Result<T, ReliabilityError>
83    where
84        F: Fn() -> Fut + Send,
85        Fut: std::future::Future<Output = Result<T, AuraError>> + Send,
86        T: Send;
87
88    /// Execute an operation with timeout protection
89    ///
90    /// Ensures that operations don't hang indefinitely by cancelling
91    /// them after a specified timeout period.
92    ///
93    /// # Arguments
94    /// * `operation` - The async operation to execute
95    /// * `timeout` - Maximum time to wait for completion
96    ///
97    /// # Returns
98    /// The operation result, or timeout error if operation takes too long
99    async fn with_timeout<T, F, Fut>(
100        &self,
101        operation: F,
102        timeout: Duration,
103    ) -> Result<T, ReliabilityError>
104    where
105        F: Fn() -> Fut + Send,
106        Fut: std::future::Future<Output = Result<T, AuraError>> + Send,
107        T: Send;
108
109    /// Execute an operation with rate limiting
110    ///
111    /// Prevents resource exhaustion by limiting the rate at which
112    /// operations can be executed.
113    ///
114    /// # Arguments
115    /// * `operation` - The async operation to execute
116    /// * `rate_limit_id` - Unique identifier for this rate limit
117    /// * `max_operations_per_second` - Maximum operations per second
118    ///
119    /// # Returns
120    /// The operation result, or rate limit error if limit is exceeded
121    async fn with_rate_limit<T, F, Fut>(
122        &self,
123        operation: F,
124        rate_limit_id: &str,
125        max_operations_per_second: f64,
126    ) -> Result<T, ReliabilityError>
127    where
128        F: Fn() -> Fut + Send,
129        Fut: std::future::Future<Output = Result<T, AuraError>> + Send,
130        T: Send;
131}
132
133/// Errors that can occur during reliability operations
134#[derive(Debug, thiserror::Error)]
135pub enum ReliabilityError {
136    /// Maximum retry attempts exceeded
137    #[error("Operation failed after {attempts} attempts: {last_error}")]
138    RetryExhausted {
139        attempts: u32,
140        last_error: AuraError,
141    },
142
143    /// Circuit breaker is open
144    #[error("Circuit breaker '{circuit_id}' is open, failing fast")]
145    CircuitBreakerOpen { circuit_id: String },
146
147    /// Operation timed out
148    #[error("Operation timed out after {timeout:?}")]
149    Timeout { timeout: Duration },
150
151    /// Rate limit exceeded
152    #[error("Rate limit '{rate_limit_id}' exceeded: {max_rate} ops/sec")]
153    RateLimitExceeded {
154        rate_limit_id: String,
155        max_rate: f64,
156    },
157
158    /// Underlying operation error
159    #[error("Operation failed: {0}")]
160    OperationError(#[from] AuraError),
161}
162
163// =============================================================================
164// Unified Retry Implementation
165// =============================================================================
166
167/// Backoff strategy for retry delays
168///
169/// This enum replaces duplicate backoff strategies in aura-sync, aura-agent,
170/// and provides a single source of truth for retry delays.
171#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
172pub enum BackoffStrategy {
173    /// Fixed delay between retries
174    Fixed,
175    /// Linear increase: delay * attempt
176    Linear,
177    /// Exponential increase: delay * 2^attempt
178    Exponential,
179    /// Exponential with jitter to prevent thundering herd
180    ExponentialWithJitter,
181}
182
183impl BackoffStrategy {
184    /// Calculate delay for a given attempt number
185    ///
186    /// # Arguments
187    /// - `attempt`: Zero-based attempt number (0 = first retry)
188    /// - `initial_delay`: Base delay duration
189    /// - `max_delay`: Maximum delay duration
190    pub fn calculate_delay(
191        &self,
192        attempt: u32,
193        initial_delay: Duration,
194        max_delay: Duration,
195    ) -> Duration {
196        // Removed rand import since we now use deterministic jitter
197
198        let delay = match self {
199            BackoffStrategy::Fixed => initial_delay,
200            BackoffStrategy::Linear => initial_delay * (attempt + 1),
201            BackoffStrategy::Exponential => {
202                let multiplier = 2u32.saturating_pow(attempt);
203                initial_delay * multiplier
204            }
205            BackoffStrategy::ExponentialWithJitter => {
206                let base_delay = initial_delay * 2u32.saturating_pow(attempt);
207                // Deterministic jitter using attempt count as pseudo-entropy source.
208                // Formula: 10% of base_delay × (attempt × 0.1 mod 1.0)
209                // WHY: Avoids ambient RNG (pure function), provides ~0-10% variance,
210                // and the modulo ensures bounded output regardless of attempt count.
211                // This decorrelates retry bursts without external randomness.
212                let jitter =
213                    (base_delay.as_millis() as f64 * 0.1 * (attempt as f64 * 0.1 % 1.0)) as u64;
214                base_delay + Duration::from_millis(jitter)
215            }
216        };
217
218        delay.min(max_delay)
219    }
220}
221
222/// Jitter mode for retry delays
223///
224/// Controls whether and how jitter is applied to retry delays
225/// to prevent thundering herd effects.
226#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
227pub enum JitterMode {
228    /// No jitter applied to delays
229    #[default]
230    None,
231    /// Apply deterministic jitter based on attempt number
232    Deterministic,
233}
234
235/// Retry policy configuration
236///
237/// This struct replaces duplicate retry policies across crates,
238/// providing a unified builder pattern for configuring retry behavior.
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct RetryPolicy {
241    /// Maximum number of retry attempts (0 = no retries)
242    pub max_attempts: u32,
243    /// Initial delay before first retry
244    pub initial_delay: Duration,
245    /// Maximum delay between retries
246    pub max_delay: Duration,
247    /// Backoff strategy to use
248    pub strategy: BackoffStrategy,
249    /// Jitter mode for delays
250    pub jitter: JitterMode,
251    /// Timeout for individual retry attempts
252    pub timeout: Option<Duration>,
253}
254
255impl RetryPolicy {
256    /// Create a new retry policy with exponential backoff
257    #[must_use]
258    pub fn exponential() -> Self {
259        Self {
260            max_attempts: 3,
261            initial_delay: Duration::from_millis(100),
262            max_delay: Duration::from_secs(30),
263            strategy: BackoffStrategy::Exponential,
264            jitter: JitterMode::None,
265            timeout: None,
266        }
267    }
268
269    /// Create a retry policy with fixed delay
270    #[must_use]
271    pub fn fixed(delay: Duration) -> Self {
272        Self {
273            max_attempts: 3,
274            initial_delay: delay,
275            max_delay: delay,
276            strategy: BackoffStrategy::Fixed,
277            jitter: JitterMode::None,
278            timeout: None,
279        }
280    }
281
282    /// Create a retry policy with linear backoff
283    #[must_use]
284    pub fn linear() -> Self {
285        Self {
286            max_attempts: 3,
287            initial_delay: Duration::from_millis(100),
288            max_delay: Duration::from_secs(30),
289            strategy: BackoffStrategy::Linear,
290            jitter: JitterMode::None,
291            timeout: None,
292        }
293    }
294
295    /// Set maximum retry attempts
296    #[must_use]
297    pub fn with_max_attempts(mut self, attempts: u32) -> Self {
298        self.max_attempts = attempts;
299        self
300    }
301
302    /// Set initial delay
303    #[must_use]
304    pub fn with_initial_delay(mut self, delay: Duration) -> Self {
305        self.initial_delay = delay;
306        self
307    }
308
309    /// Set maximum delay
310    #[must_use]
311    pub fn with_max_delay(mut self, delay: Duration) -> Self {
312        self.max_delay = delay;
313        self
314    }
315
316    /// Set jitter mode for delay calculations
317    #[must_use]
318    pub fn with_jitter(mut self, mode: JitterMode) -> Self {
319        self.jitter = mode;
320        if matches!(mode, JitterMode::Deterministic) {
321            self.strategy = BackoffStrategy::ExponentialWithJitter;
322        }
323        self
324    }
325
326    /// Set timeout for individual attempts
327    #[must_use]
328    pub fn with_timeout(mut self, timeout: Duration) -> Self {
329        self.timeout = Some(timeout);
330        self
331    }
332
333    /// Calculate delay for a specific attempt
334    pub fn calculate_delay(&self, attempt: u32) -> Duration {
335        let strategy = match self.jitter {
336            JitterMode::Deterministic => BackoffStrategy::ExponentialWithJitter,
337            JitterMode::None => self.strategy,
338        };
339
340        strategy.calculate_delay(attempt, self.initial_delay, self.max_delay)
341    }
342
343    /// Execute an async operation with retry logic using a caller-provided sleep function.
344    pub async fn execute_with_sleep<F, Fut, T, E, S, SFut>(
345        &self,
346        mut operation: F,
347        mut sleep: S,
348    ) -> Result<T, E>
349    where
350        F: FnMut() -> Fut,
351        Fut: Future<Output = Result<T, E>>,
352        S: FnMut(Duration) -> SFut,
353        SFut: Future<Output = ()>,
354    {
355        let mut attempt = 0;
356
357        loop {
358            match operation().await {
359                Ok(result) => return Ok(result),
360                Err(err) => {
361                    if attempt >= self.max_attempts {
362                        return Err(err);
363                    }
364
365                    let delay = self.calculate_delay(attempt);
366                    sleep(delay).await;
367
368                    attempt += 1;
369                }
370            }
371        }
372    }
373
374    /// Execute with retry logic using an injected time provider.
375    pub async fn execute_with_effects<F, Fut, T, E, Eff>(
376        &self,
377        effects: &Eff,
378        operation: F,
379    ) -> Result<T, E>
380    where
381        F: FnMut() -> Fut,
382        Fut: Future<Output = Result<T, E>>,
383        Eff: PhysicalTimeEffects + Send + Sync,
384    {
385        self.execute_with_sleep(operation, |delay| async move {
386            let _ = effects.sleep_ms(delay.as_millis() as u64).await;
387        })
388        .await
389    }
390
391    /// Execute with caller-provided sleep and timing context for deterministic metrics.
392    pub async fn execute_with_sleep_and_context<F, Fut, T, E, S, SFut>(
393        &self,
394        now: MonotonicInstant,
395        mut operation: F,
396        mut sleep: S,
397    ) -> RetryResult<T, E>
398    where
399        F: FnMut() -> Fut,
400        Fut: Future<Output = Result<T, E>>,
401        S: FnMut(Duration) -> SFut,
402        SFut: Future<Output = ()>,
403    {
404        let start = now;
405        let mut attempt = 0;
406        let mut total_delay = Duration::ZERO;
407
408        loop {
409            match operation().await {
410                Ok(result) => {
411                    return RetryResult {
412                        result: Ok(result),
413                        attempts: attempt + 1,
414                        total_duration: start.elapsed(),
415                        total_retry_delay: total_delay,
416                    };
417                }
418                Err(err) => {
419                    if attempt >= self.max_attempts {
420                        return RetryResult {
421                            result: Err(err),
422                            attempts: attempt + 1,
423                            total_duration: start.elapsed(),
424                            total_retry_delay: total_delay,
425                        };
426                    }
427
428                    let delay = self.calculate_delay(attempt);
429                    total_delay += delay;
430                    sleep(delay).await;
431
432                    attempt += 1;
433                }
434            }
435        }
436    }
437}
438
439impl Default for RetryPolicy {
440    fn default() -> Self {
441        Self::exponential()
442    }
443}
444
/// Outcome of a retried operation together with retry statistics.
#[derive(Debug, Clone)]
pub struct RetryResult<T, E> {
    /// Final result (success or failure)
    pub result: Result<T, E>,
    /// Number of attempts made
    pub attempts: u32,
    /// Total duration including retries
    pub total_duration: Duration,
    /// Total time spent waiting between retries
    pub total_retry_delay: Duration,
}

impl<T, E> RetryResult<T, E> {
    /// Returns `true` when the final result is `Ok`.
    pub fn is_success(&self) -> bool {
        matches!(self.result, Ok(_))
    }

    /// Returns `true` when more than one attempt was made.
    pub fn had_retries(&self) -> bool {
        self.attempts > 1
    }

    /// Discards the statistics and returns the final result.
    pub fn into_result(self) -> Result<T, E> {
        let Self { result, .. } = self;
        result
    }
}

impl<T, E: std::fmt::Debug> RetryResult<T, E> {
    /// Returns the success value.
    ///
    /// # Panics
    /// Panics with a descriptive message if the final result is an error.
    #[allow(clippy::expect_used)]
    pub fn unwrap(self) -> T {
        let outcome = self.into_result();
        outcome.expect("RetryResult should contain a success value")
    }
}
483
484/// Context for tracking retry state
485#[derive(Debug, Clone)]
486pub struct RetryContext {
487    /// Current attempt number (0-based)
488    pub attempt: u32,
489    /// Time of first attempt
490    pub started_at: MonotonicInstant,
491    /// Total delay accumulated
492    pub accumulated_delay: Duration,
493    /// Whether this is the last attempt
494    pub is_last_attempt: bool,
495}
496
497impl RetryContext {
498    /// Create a new retry context
499    ///
500    /// # Arguments
501    /// - `now`: Current time instant (obtain from TimeEffects in production)
502    /// - `max_attempts`: Maximum number of retry attempts
503    pub fn new(now: MonotonicInstant, max_attempts: u32) -> Self {
504        Self {
505            attempt: 0,
506            started_at: now,
507            accumulated_delay: Duration::ZERO,
508            is_last_attempt: max_attempts == 0,
509        }
510    }
511
512    /// Advance to next attempt
513    pub fn next_attempt(&mut self, delay: Duration, max_attempts: u32) {
514        self.attempt += 1;
515        self.accumulated_delay += delay;
516        self.is_last_attempt = self.attempt >= max_attempts;
517    }
518
519    /// Get elapsed time since first attempt
520    pub fn elapsed(&self) -> Duration {
521        self.started_at.elapsed()
522    }
523
524    /// Get total time including delays
525    pub fn total_time(&self) -> Duration {
526        self.elapsed()
527    }
528}
529
530// =============================================================================
531// Unified Rate Limiting Implementation
532// =============================================================================
533
534/// Adaptive rate limiting mode
535///
536/// Controls whether rate limits adjust dynamically based on system load.
537#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
538pub enum AdaptiveMode {
539    /// Fixed rate limits, no adaptation
540    Fixed,
541    /// Rate limits adapt based on current load
542    #[default]
543    Adaptive,
544}
545
546/// Rate limiter configuration
547///
548/// This struct replaces duplicate rate limiting configuration in
549/// aura-sync, providing a unified configuration system for all crates.
550#[derive(Debug, Clone, Serialize, Deserialize)]
551pub struct RateLimitConfig {
552    /// Global rate limit (operations per second)
553    pub global_ops_per_second: u32,
554
555    /// Per-peer rate limit (operations per second)
556    pub peer_ops_per_second: u32,
557
558    /// Bucket capacity (maximum burst size)
559    pub bucket_capacity: u32,
560
561    /// Refill rate (tokens per second)
562    pub refill_rate: u32,
563
564    /// Window size for sliding window algorithm
565    pub window_size: Duration,
566
567    /// Adaptive rate limiting mode
568    pub adaptive: AdaptiveMode,
569}
570
571impl Default for RateLimitConfig {
572    fn default() -> Self {
573        Self {
574            global_ops_per_second: 1000,
575            peer_ops_per_second: 100,
576            bucket_capacity: 200,
577            refill_rate: 100,
578            window_size: Duration::from_secs(60),
579            adaptive: AdaptiveMode::Adaptive,
580        }
581    }
582}
583
584/// Rate limit for a specific context
585///
586/// Implements token bucket algorithm for rate limiting with automatic refill.
587#[derive(Debug, Clone, Serialize, Deserialize)]
588pub struct RateLimit {
589    /// Maximum operations per window
590    pub max_operations: u32,
591
592    /// Window duration
593    pub window: Duration,
594
595    /// Current token count (for token bucket)
596    pub tokens: u32,
597
598    /// Last refill time (skipped during serialization, must be set after deserialization)
599    #[serde(skip)]
600    pub last_refill: Option<MonotonicInstant>,
601}
602
603impl RateLimit {
604    /// Create a new rate limit
605    ///
606    /// # Arguments
607    /// - `max_operations`: Maximum operations per window
608    /// - `window`: Window duration
609    /// - `now`: Current time instant (obtain from TimeEffects in production)
610    pub fn new(max_operations: u32, window: Duration, now: MonotonicInstant) -> Self {
611        Self {
612            max_operations,
613            window,
614            tokens: max_operations,
615            last_refill: Some(now),
616        }
617    }
618
619    /// Check if operation is allowed and consume tokens
620    ///
621    /// # Arguments
622    /// - `cost`: Token cost of the operation
623    /// - `refill_rate`: Tokens per second refill rate
624    /// - `now`: Current time instant (obtain from TimeEffects in production)
625    pub fn check_and_consume(
626        &mut self,
627        cost: u32,
628        refill_rate: u32,
629        now: MonotonicInstant,
630    ) -> bool {
631        // Initialize last_refill if not set (after deserialization)
632        let last_refill = self.last_refill.get_or_insert(now);
633
634        // Refill tokens based on elapsed time
635        let elapsed = now.duration_since(*last_refill);
636        let refill_tokens = (elapsed.as_secs_f64() * refill_rate as f64) as u32;
637
638        if refill_tokens > 0 {
639            self.tokens = (self.tokens + refill_tokens).min(self.max_operations);
640            self.last_refill = Some(now);
641        }
642
643        // Check if we have enough tokens
644        if self.tokens >= cost {
645            self.tokens -= cost;
646            true
647        } else {
648            false
649        }
650    }
651
652    /// Get current token count
653    pub fn available_tokens(&self) -> u32 {
654        self.tokens
655    }
656
657    /// Calculate time until tokens are available
658    pub fn time_until_available(&self, cost: u32, refill_rate: u32) -> Option<Duration> {
659        if self.tokens >= cost {
660            return None;
661        }
662
663        let needed = cost - self.tokens;
664        let seconds = needed as f64 / refill_rate as f64;
665
666        Some(Duration::from_secs_f64(seconds))
667    }
668}
669
670/// Result of a rate limit check
671#[derive(Debug, Clone, PartialEq, Eq)]
672pub enum RateLimitResult {
673    /// Operation allowed
674    Allowed,
675
676    /// Operation denied - rate limit exceeded
677    Denied {
678        /// Time to wait before retry
679        retry_after: Duration,
680
681        /// Reason for denial
682        reason: String,
683    },
684}
685
686impl RateLimitResult {
687    /// Check if operation is allowed
688    pub fn is_allowed(&self) -> bool {
689        matches!(self, RateLimitResult::Allowed)
690    }
691
692    /// Get retry-after duration if denied
693    pub fn retry_after(&self) -> Option<Duration> {
694        match self {
695            RateLimitResult::Denied { retry_after, .. } => Some(*retry_after),
696            RateLimitResult::Allowed => None,
697        }
698    }
699
700    /// Convert to Result type with AuraError
701    pub fn into_result(self) -> Result<(), AuraError> {
702        match self {
703            RateLimitResult::Allowed => Ok(()),
704            RateLimitResult::Denied { reason, .. } => Err(AuraError::invalid(reason)),
705        }
706    }
707}
708
709/// Rate limiter for operations
710///
711/// Provides token bucket-based rate limiting with per-peer and
712/// global limits. Provides unified rate limiting for all crates.
713pub struct RateLimiter {
714    /// Configuration
715    config: RateLimitConfig,
716
717    /// Global rate limit
718    global_limit: RateLimit,
719
720    /// Per-peer rate limits (using DeviceId as key)
721    peer_limits: std::collections::HashMap<crate::types::identifiers::DeviceId, RateLimit>,
722
723    /// Statistics
724    stats: RateLimiterStatistics,
725}
726
727impl RateLimiter {
728    /// Create a new rate limiter
729    ///
730    /// # Arguments
731    /// - `config`: Rate limiter configuration
732    /// - `now`: Current time instant (obtain from TimeEffects in production)
733    pub fn new(config: RateLimitConfig, now: MonotonicInstant) -> Self {
734        let global_limit =
735            RateLimit::new(config.global_ops_per_second, Duration::from_secs(1), now);
736
737        Self {
738            config,
739            global_limit,
740            peer_limits: std::collections::HashMap::new(),
741            stats: RateLimiterStatistics::default(),
742        }
743    }
744
745    /// Check rate limit for a peer operation
746    ///
747    /// # Arguments
748    /// - `peer_id`: Peer device ID
749    /// - `cost`: Operation cost in tokens
750    /// - `now`: Current time instant (obtain from TimeEffects in production)
751    ///
752    /// # Returns
753    /// - `RateLimitResult::Allowed` if operation can proceed
754    /// - `RateLimitResult::Denied` if rate limit exceeded
755    pub fn check_rate_limit(
756        &mut self,
757        peer_id: crate::types::identifiers::DeviceId,
758        cost: u32,
759        now: MonotonicInstant,
760    ) -> RateLimitResult {
761        // Check global limit first
762        if !self
763            .global_limit
764            .check_and_consume(cost, self.config.refill_rate, now)
765        {
766            self.stats.global_limit_hits += 1;
767
768            let retry_after = self
769                .global_limit
770                .time_until_available(cost, self.config.refill_rate)
771                .unwrap_or(Duration::from_secs(1));
772
773            return RateLimitResult::Denied {
774                retry_after,
775                reason: "Global rate limit exceeded".to_string(),
776            };
777        }
778
779        // Check per-peer limit
780        let peer_limit = self.peer_limits.entry(peer_id).or_insert_with(|| {
781            RateLimit::new(self.config.peer_ops_per_second, Duration::from_secs(1), now)
782        });
783
784        if !peer_limit.check_and_consume(cost, self.config.refill_rate, now) {
785            self.stats.peer_limit_hits += 1;
786
787            // Return tokens to global limit since peer limit blocked
788            self.global_limit.tokens =
789                (self.global_limit.tokens + cost).min(self.config.global_ops_per_second);
790
791            let retry_after = peer_limit
792                .time_until_available(cost, self.config.refill_rate)
793                .unwrap_or(Duration::from_secs(1));
794
795            return RateLimitResult::Denied {
796                retry_after,
797                reason: format!("Peer rate limit exceeded for {peer_id:?}"),
798            };
799        }
800
801        self.stats.operations_allowed += 1;
802        RateLimitResult::Allowed
803    }
804
805    /// Check if operation would exceed rate limit without consuming tokens
806    pub fn would_exceed_limit(
807        &self,
808        peer_id: &crate::types::identifiers::DeviceId,
809        cost: u32,
810    ) -> bool {
811        // Check global limit
812        if self.global_limit.available_tokens() < cost {
813            return true;
814        }
815
816        // Check peer limit
817        if let Some(peer_limit) = self.peer_limits.get(peer_id) {
818            if peer_limit.available_tokens() < cost {
819                return true;
820            }
821        }
822
823        false
824    }
825
826    /// Get available tokens for a peer
827    pub fn available_tokens(&self, peer_id: &crate::types::identifiers::DeviceId) -> u32 {
828        let global_tokens = self.global_limit.available_tokens();
829
830        let peer_tokens = self
831            .peer_limits
832            .get(peer_id)
833            .map(|l| l.available_tokens())
834            .unwrap_or(self.config.peer_ops_per_second);
835
836        global_tokens.min(peer_tokens)
837    }
838
839    /// Get statistics
840    pub fn statistics(&self) -> &RateLimiterStatistics {
841        &self.stats
842    }
843
844    /// Reset rate limiter state
845    ///
846    /// # Arguments
847    /// - `now`: Current time instant (obtain from TimeEffects in production)
848    pub fn reset(&mut self, now: MonotonicInstant) {
849        self.global_limit = RateLimit::new(
850            self.config.global_ops_per_second,
851            Duration::from_secs(1),
852            now,
853        );
854        self.peer_limits.clear();
855        self.stats = RateLimiterStatistics::default();
856    }
857
858    /// Remove rate limit for a peer
859    pub fn remove_peer(&mut self, peer_id: &crate::types::identifiers::DeviceId) {
860        self.peer_limits.remove(peer_id);
861    }
862
863    /// Get number of tracked peers
864    pub fn tracked_peers(&self) -> usize {
865        self.peer_limits.len()
866    }
867}
868
869/// Rate limiter statistics
870#[derive(Debug, Clone, Default, Serialize, Deserialize)]
871pub struct RateLimiterStatistics {
872    /// Total operations allowed
873    pub operations_allowed: u64,
874
875    /// Number of times global limit was hit
876    pub global_limit_hits: u64,
877
878    /// Number of times per-peer limit was hit
879    pub peer_limit_hits: u64,
880}