1use crate::effects::time::PhysicalTimeEffects;
19use crate::AuraError;
20use async_trait::async_trait;
21use serde::{Deserialize, Serialize};
22use std::future::Future;
23use std::time::Duration;
24
// Select the monotonic clock type for the current compilation target:
// `web_time::Instant` on `wasm32`, `std::time::Instant` everywhere else.
cfg_if::cfg_if! {
    if #[cfg(target_arch = "wasm32")] {
        /// Monotonic instant type used on `wasm32` targets.
        type MonotonicInstant = web_time::Instant;
    } else {
        /// Monotonic instant type used on all non-`wasm32` targets.
        type MonotonicInstant = std::time::Instant;
    }
}
32
/// Effect interface for running fallible async operations under reliability
/// controls: retry, circuit breaking, timeout, and rate limiting.
///
/// Every method takes an `operation` factory (`Fn() -> Fut`) whose future
/// resolves to `Result<T, AuraError>` and returns `Result<T, ReliabilityError>`.
/// Only signatures are declared here; the behavioral notes on each method are
/// inferred from parameter names and from the [`ReliabilityError`] variants.
/// NOTE(review): confirm the exact semantics against the implementing handlers.
#[async_trait]
pub trait ReliabilityEffects {
    /// Runs `operation`, presumably re-invoking it when it fails.
    ///
    /// * `max_attempts` - attempt budget. NOTE(review): whether the first try
    ///   counts against it is implementation-defined; confirm.
    /// * `base_delay` / `max_delay` - presumably the starting backoff delay and
    ///   the cap applied to it.
    ///
    /// # Errors
    /// Returns a [`ReliabilityError`]; presumably
    /// [`ReliabilityError::RetryExhausted`] once the budget is spent.
    async fn with_retry<T, F, Fut>(
        &self,
        operation: F,
        max_attempts: u32,
        base_delay: Duration,
        max_delay: Duration,
    ) -> Result<T, ReliabilityError>
    where
        F: Fn() -> Fut + Send,
        Fut: std::future::Future<Output = Result<T, AuraError>> + Send,
        T: Send;

    /// Runs `operation` guarded by the circuit breaker named `circuit_id`.
    ///
    /// * `failure_threshold` - presumably the number of failures that opens the
    ///   circuit.
    /// * `timeout` - NOTE(review): presumably how long the circuit stays open
    ///   before another call is let through; confirm.
    ///
    /// # Errors
    /// Returns a [`ReliabilityError`]; presumably
    /// [`ReliabilityError::CircuitBreakerOpen`] while the circuit is open.
    async fn with_circuit_breaker<T, F, Fut>(
        &self,
        operation: F,
        circuit_id: &str,
        failure_threshold: u32,
        timeout: Duration,
    ) -> Result<T, ReliabilityError>
    where
        F: Fn() -> Fut + Send,
        Fut: std::future::Future<Output = Result<T, AuraError>> + Send,
        T: Send;

    /// Runs `operation` with a time budget of `timeout`.
    ///
    /// # Errors
    /// Returns a [`ReliabilityError`]; presumably [`ReliabilityError::Timeout`]
    /// when the operation does not finish in time.
    async fn with_timeout<T, F, Fut>(
        &self,
        operation: F,
        timeout: Duration,
    ) -> Result<T, ReliabilityError>
    where
        F: Fn() -> Fut + Send,
        Fut: std::future::Future<Output = Result<T, AuraError>> + Send,
        T: Send;

    /// Runs `operation` subject to the rate limit named `rate_limit_id`.
    ///
    /// * `max_operations_per_second` - permitted rate for that limit.
    ///
    /// # Errors
    /// Returns a [`ReliabilityError`]; presumably
    /// [`ReliabilityError::RateLimitExceeded`] when the rate is exceeded.
    async fn with_rate_limit<T, F, Fut>(
        &self,
        operation: F,
        rate_limit_id: &str,
        max_operations_per_second: f64,
    ) -> Result<T, ReliabilityError>
    where
        F: Fn() -> Fut + Send,
        Fut: std::future::Future<Output = Result<T, AuraError>> + Send,
        T: Send;
}
132
/// Errors returned by the [`ReliabilityEffects`] operations.
#[derive(Debug, thiserror::Error)]
pub enum ReliabilityError {
    /// A retried operation kept failing until the attempts ran out.
    #[error("Operation failed after {attempts} attempts: {last_error}")]
    RetryExhausted {
        /// Attempt count reported in the message.
        attempts: u32,
        /// Error reported in the message.
        /// NOTE(review): presumably the error from the final attempt.
        last_error: AuraError,
    },

    /// The named circuit breaker is open, so the call failed without running.
    #[error("Circuit breaker '{circuit_id}' is open, failing fast")]
    CircuitBreakerOpen { circuit_id: String },

    /// The operation exceeded the given time budget.
    #[error("Operation timed out after {timeout:?}")]
    Timeout { timeout: Duration },

    /// The named rate limit was exceeded.
    #[error("Rate limit '{rate_limit_id}' exceeded: {max_rate} ops/sec")]
    RateLimitExceeded {
        /// Identifier of the rate limit that was hit.
        rate_limit_id: String,
        /// Configured rate, reported as operations per second.
        max_rate: f64,
    },

    /// The wrapped operation itself failed. `#[from]` lets `?` convert an
    /// `AuraError` into this variant.
    #[error("Operation failed: {0}")]
    OperationError(#[from] AuraError),
}
162
/// Shape of the delay curve used between retry attempts.
///
/// `calculate_delay` turns a strategy, an attempt index and an initial delay
/// into a concrete delay, capped at a maximum.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackoffStrategy {
    /// Always wait the initial delay.
    Fixed,
    /// Wait `initial_delay * (attempt + 1)`.
    Linear,
    /// Wait `initial_delay * 2^attempt`.
    Exponential,
    /// Exponential delay plus a deterministic jitter term of up to 10% of the
    /// base delay, derived from the attempt index (no randomness).
    ExponentialWithJitter,
}
182
183impl BackoffStrategy {
184 pub fn calculate_delay(
191 &self,
192 attempt: u32,
193 initial_delay: Duration,
194 max_delay: Duration,
195 ) -> Duration {
196 let delay = match self {
199 BackoffStrategy::Fixed => initial_delay,
200 BackoffStrategy::Linear => initial_delay * (attempt + 1),
201 BackoffStrategy::Exponential => {
202 let multiplier = 2u32.saturating_pow(attempt);
203 initial_delay * multiplier
204 }
205 BackoffStrategy::ExponentialWithJitter => {
206 let base_delay = initial_delay * 2u32.saturating_pow(attempt);
207 let jitter =
213 (base_delay.as_millis() as f64 * 0.1 * (attempt as f64 * 0.1 % 1.0)) as u64;
214 base_delay + Duration::from_millis(jitter)
215 }
216 };
217
218 delay.min(max_delay)
219 }
220}
221
/// Whether a [`RetryPolicy`] adds jitter to the delays it computes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum JitterMode {
    /// No jitter: the policy's configured strategy is used as-is (the default).
    #[default]
    None,
    /// Deterministic jitter: `RetryPolicy::calculate_delay` uses
    /// [`BackoffStrategy::ExponentialWithJitter`] regardless of the configured
    /// strategy.
    Deterministic,
}
234
/// Configuration for retrying a fallible operation with backoff.
///
/// Start from [`RetryPolicy::exponential`], [`RetryPolicy::fixed`] or
/// [`RetryPolicy::linear`] and adjust with the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Number of retries allowed after the first try: the `execute_*` methods
    /// invoke the operation up to `max_attempts + 1` times in total.
    pub max_attempts: u32,
    /// Base delay handed to the backoff strategy.
    pub initial_delay: Duration,
    /// Upper bound applied to every computed delay.
    pub max_delay: Duration,
    /// Backoff curve used when `jitter` is [`JitterMode::None`].
    pub strategy: BackoffStrategy,
    /// Jitter mode; [`JitterMode::Deterministic`] makes `calculate_delay` use
    /// [`BackoffStrategy::ExponentialWithJitter`] instead of `strategy`.
    pub jitter: JitterMode,
    /// Optional timeout value. None of the `RetryPolicy` methods in this file
    /// read it. NOTE(review): presumably enforced by callers; confirm.
    pub timeout: Option<Duration>,
}
254
255impl RetryPolicy {
256 #[must_use]
258 pub fn exponential() -> Self {
259 Self {
260 max_attempts: 3,
261 initial_delay: Duration::from_millis(100),
262 max_delay: Duration::from_secs(30),
263 strategy: BackoffStrategy::Exponential,
264 jitter: JitterMode::None,
265 timeout: None,
266 }
267 }
268
269 #[must_use]
271 pub fn fixed(delay: Duration) -> Self {
272 Self {
273 max_attempts: 3,
274 initial_delay: delay,
275 max_delay: delay,
276 strategy: BackoffStrategy::Fixed,
277 jitter: JitterMode::None,
278 timeout: None,
279 }
280 }
281
282 #[must_use]
284 pub fn linear() -> Self {
285 Self {
286 max_attempts: 3,
287 initial_delay: Duration::from_millis(100),
288 max_delay: Duration::from_secs(30),
289 strategy: BackoffStrategy::Linear,
290 jitter: JitterMode::None,
291 timeout: None,
292 }
293 }
294
295 #[must_use]
297 pub fn with_max_attempts(mut self, attempts: u32) -> Self {
298 self.max_attempts = attempts;
299 self
300 }
301
302 #[must_use]
304 pub fn with_initial_delay(mut self, delay: Duration) -> Self {
305 self.initial_delay = delay;
306 self
307 }
308
309 #[must_use]
311 pub fn with_max_delay(mut self, delay: Duration) -> Self {
312 self.max_delay = delay;
313 self
314 }
315
316 #[must_use]
318 pub fn with_jitter(mut self, mode: JitterMode) -> Self {
319 self.jitter = mode;
320 if matches!(mode, JitterMode::Deterministic) {
321 self.strategy = BackoffStrategy::ExponentialWithJitter;
322 }
323 self
324 }
325
326 #[must_use]
328 pub fn with_timeout(mut self, timeout: Duration) -> Self {
329 self.timeout = Some(timeout);
330 self
331 }
332
333 pub fn calculate_delay(&self, attempt: u32) -> Duration {
335 let strategy = match self.jitter {
336 JitterMode::Deterministic => BackoffStrategy::ExponentialWithJitter,
337 JitterMode::None => self.strategy,
338 };
339
340 strategy.calculate_delay(attempt, self.initial_delay, self.max_delay)
341 }
342
343 pub async fn execute_with_sleep<F, Fut, T, E, S, SFut>(
345 &self,
346 mut operation: F,
347 mut sleep: S,
348 ) -> Result<T, E>
349 where
350 F: FnMut() -> Fut,
351 Fut: Future<Output = Result<T, E>>,
352 S: FnMut(Duration) -> SFut,
353 SFut: Future<Output = ()>,
354 {
355 let mut attempt = 0;
356
357 loop {
358 match operation().await {
359 Ok(result) => return Ok(result),
360 Err(err) => {
361 if attempt >= self.max_attempts {
362 return Err(err);
363 }
364
365 let delay = self.calculate_delay(attempt);
366 sleep(delay).await;
367
368 attempt += 1;
369 }
370 }
371 }
372 }
373
374 pub async fn execute_with_effects<F, Fut, T, E, Eff>(
376 &self,
377 effects: &Eff,
378 operation: F,
379 ) -> Result<T, E>
380 where
381 F: FnMut() -> Fut,
382 Fut: Future<Output = Result<T, E>>,
383 Eff: PhysicalTimeEffects + Send + Sync,
384 {
385 self.execute_with_sleep(operation, |delay| async move {
386 let _ = effects.sleep_ms(delay.as_millis() as u64).await;
387 })
388 .await
389 }
390
391 pub async fn execute_with_sleep_and_context<F, Fut, T, E, S, SFut>(
393 &self,
394 now: MonotonicInstant,
395 mut operation: F,
396 mut sleep: S,
397 ) -> RetryResult<T, E>
398 where
399 F: FnMut() -> Fut,
400 Fut: Future<Output = Result<T, E>>,
401 S: FnMut(Duration) -> SFut,
402 SFut: Future<Output = ()>,
403 {
404 let start = now;
405 let mut attempt = 0;
406 let mut total_delay = Duration::ZERO;
407
408 loop {
409 match operation().await {
410 Ok(result) => {
411 return RetryResult {
412 result: Ok(result),
413 attempts: attempt + 1,
414 total_duration: start.elapsed(),
415 total_retry_delay: total_delay,
416 };
417 }
418 Err(err) => {
419 if attempt >= self.max_attempts {
420 return RetryResult {
421 result: Err(err),
422 attempts: attempt + 1,
423 total_duration: start.elapsed(),
424 total_retry_delay: total_delay,
425 };
426 }
427
428 let delay = self.calculate_delay(attempt);
429 total_delay += delay;
430 sleep(delay).await;
431
432 attempt += 1;
433 }
434 }
435 }
436 }
437}
438
439impl Default for RetryPolicy {
440 fn default() -> Self {
441 Self::exponential()
442 }
443}
444
/// Outcome of a retried operation together with bookkeeping about the run, as
/// produced by `RetryPolicy::execute_with_sleep_and_context`.
#[derive(Debug, Clone)]
pub struct RetryResult<T, E> {
    /// Final result: the first success, or the error of the last attempt.
    pub result: Result<T, E>,
    /// Number of times the operation was invoked.
    pub attempts: u32,
    /// Time elapsed since the start instant handed to the executor, measured
    /// when the result was produced.
    pub total_duration: Duration,
    /// Sum of the delays requested from the sleep function between attempts.
    pub total_retry_delay: Duration,
}
457
458impl<T, E> RetryResult<T, E> {
459 pub fn is_success(&self) -> bool {
461 self.result.is_ok()
462 }
463
464 pub fn had_retries(&self) -> bool {
466 self.attempts > 1
467 }
468
469 pub fn into_result(self) -> Result<T, E> {
471 self.result
472 }
473}
474
475impl<T, E: std::fmt::Debug> RetryResult<T, E> {
476 #[allow(clippy::expect_used)]
478 pub fn unwrap(self) -> T {
479 self.result
480 .expect("RetryResult should contain a success value")
481 }
482}
483
/// Mutable bookkeeping for a retry loop that is driven step by step.
#[derive(Debug, Clone)]
pub struct RetryContext {
    /// Zero-based index of the current attempt; incremented by `next_attempt`.
    pub attempt: u32,
    /// Instant supplied to `new`, used as the start of the run.
    pub started_at: MonotonicInstant,
    /// Sum of the delays passed to `next_attempt` so far.
    pub accumulated_delay: Duration,
    /// `true` once `attempt` has reached the `max_attempts` value passed in.
    pub is_last_attempt: bool,
}
496
497impl RetryContext {
498 pub fn new(now: MonotonicInstant, max_attempts: u32) -> Self {
504 Self {
505 attempt: 0,
506 started_at: now,
507 accumulated_delay: Duration::ZERO,
508 is_last_attempt: max_attempts == 0,
509 }
510 }
511
512 pub fn next_attempt(&mut self, delay: Duration, max_attempts: u32) {
514 self.attempt += 1;
515 self.accumulated_delay += delay;
516 self.is_last_attempt = self.attempt >= max_attempts;
517 }
518
519 pub fn elapsed(&self) -> Duration {
521 self.started_at.elapsed()
522 }
523
524 pub fn total_time(&self) -> Duration {
526 self.elapsed()
527 }
528}
529
/// Rate-limit adaptation mode stored in [`RateLimitConfig`].
///
/// No code in this file reads the value, so the variant descriptions below are
/// inferred from the names. NOTE(review): confirm against the consumers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum AdaptiveMode {
    /// Presumably: limits stay at their configured values.
    Fixed,
    /// Presumably: limits may be adjusted at runtime (the default).
    #[default]
    Adaptive,
}
545
/// Configuration for [`RateLimiter`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimitConfig {
    /// Capacity of the global token bucket (shared by all peers).
    pub global_ops_per_second: u32,

    /// Capacity of each per-peer token bucket.
    pub peer_ops_per_second: u32,

    /// Not read by any code in this file.
    /// NOTE(review): presumably a burst capacity; confirm against consumers.
    pub bucket_capacity: u32,

    /// Tokens added per second to both the global and the per-peer buckets.
    pub refill_rate: u32,

    /// Not read by any code in this file.
    /// NOTE(review): presumably a measurement window; confirm against consumers.
    pub window_size: Duration,

    /// Adaptation mode; not read by any code in this file.
    pub adaptive: AdaptiveMode,
}
570
571impl Default for RateLimitConfig {
572 fn default() -> Self {
573 Self {
574 global_ops_per_second: 1000,
575 peer_ops_per_second: 100,
576 bucket_capacity: 200,
577 refill_rate: 100,
578 window_size: Duration::from_secs(60),
579 adaptive: AdaptiveMode::Adaptive,
580 }
581 }
582}
583
/// A single token bucket: tokens are consumed by operations and refilled over
/// time up to `max_operations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimit {
    /// Bucket capacity; refills never raise `tokens` above this value.
    pub max_operations: u32,

    /// Window supplied at construction. Stored only; the methods of this type
    /// do not read it.
    pub window: Duration,

    /// Tokens currently available.
    pub tokens: u32,

    /// Instant of the most recent refill. Skipped during (de)serialization;
    /// when it is `None`, `check_and_consume` initialises it with the `now`
    /// it receives.
    #[serde(skip)]
    pub last_refill: Option<MonotonicInstant>,
}
602
603impl RateLimit {
604 pub fn new(max_operations: u32, window: Duration, now: MonotonicInstant) -> Self {
611 Self {
612 max_operations,
613 window,
614 tokens: max_operations,
615 last_refill: Some(now),
616 }
617 }
618
619 pub fn check_and_consume(
626 &mut self,
627 cost: u32,
628 refill_rate: u32,
629 now: MonotonicInstant,
630 ) -> bool {
631 let last_refill = self.last_refill.get_or_insert(now);
633
634 let elapsed = now.duration_since(*last_refill);
636 let refill_tokens = (elapsed.as_secs_f64() * refill_rate as f64) as u32;
637
638 if refill_tokens > 0 {
639 self.tokens = (self.tokens + refill_tokens).min(self.max_operations);
640 self.last_refill = Some(now);
641 }
642
643 if self.tokens >= cost {
645 self.tokens -= cost;
646 true
647 } else {
648 false
649 }
650 }
651
652 pub fn available_tokens(&self) -> u32 {
654 self.tokens
655 }
656
657 pub fn time_until_available(&self, cost: u32, refill_rate: u32) -> Option<Duration> {
659 if self.tokens >= cost {
660 return None;
661 }
662
663 let needed = cost - self.tokens;
664 let seconds = needed as f64 / refill_rate as f64;
665
666 Some(Duration::from_secs_f64(seconds))
667 }
668}
669
/// Outcome of a rate-limit check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RateLimitResult {
    /// The operation may proceed.
    Allowed,

    /// The operation was rejected by a rate limit.
    Denied {
        /// Suggested wait before trying again.
        retry_after: Duration,

        /// Human-readable description of which limit was hit.
        reason: String,
    },
}
685
686impl RateLimitResult {
687 pub fn is_allowed(&self) -> bool {
689 matches!(self, RateLimitResult::Allowed)
690 }
691
692 pub fn retry_after(&self) -> Option<Duration> {
694 match self {
695 RateLimitResult::Denied { retry_after, .. } => Some(*retry_after),
696 RateLimitResult::Allowed => None,
697 }
698 }
699
700 pub fn into_result(self) -> Result<(), AuraError> {
702 match self {
703 RateLimitResult::Allowed => Ok(()),
704 RateLimitResult::Denied { reason, .. } => Err(AuraError::invalid(reason)),
705 }
706 }
707}
708
/// Token-bucket rate limiter with one global bucket shared by all peers and
/// one lazily created bucket per peer.
pub struct RateLimiter {
    /// Limits and refill rate applied to the buckets.
    config: RateLimitConfig,

    /// Bucket that every operation must pass, regardless of peer.
    global_limit: RateLimit,

    /// Per-peer buckets, created the first time a peer passes the global check.
    peer_limits: std::collections::HashMap<crate::types::identifiers::DeviceId, RateLimit>,

    /// Counters of allowed operations and limit hits.
    stats: RateLimiterStatistics,
}
726
727impl RateLimiter {
728 pub fn new(config: RateLimitConfig, now: MonotonicInstant) -> Self {
734 let global_limit =
735 RateLimit::new(config.global_ops_per_second, Duration::from_secs(1), now);
736
737 Self {
738 config,
739 global_limit,
740 peer_limits: std::collections::HashMap::new(),
741 stats: RateLimiterStatistics::default(),
742 }
743 }
744
745 pub fn check_rate_limit(
756 &mut self,
757 peer_id: crate::types::identifiers::DeviceId,
758 cost: u32,
759 now: MonotonicInstant,
760 ) -> RateLimitResult {
761 if !self
763 .global_limit
764 .check_and_consume(cost, self.config.refill_rate, now)
765 {
766 self.stats.global_limit_hits += 1;
767
768 let retry_after = self
769 .global_limit
770 .time_until_available(cost, self.config.refill_rate)
771 .unwrap_or(Duration::from_secs(1));
772
773 return RateLimitResult::Denied {
774 retry_after,
775 reason: "Global rate limit exceeded".to_string(),
776 };
777 }
778
779 let peer_limit = self.peer_limits.entry(peer_id).or_insert_with(|| {
781 RateLimit::new(self.config.peer_ops_per_second, Duration::from_secs(1), now)
782 });
783
784 if !peer_limit.check_and_consume(cost, self.config.refill_rate, now) {
785 self.stats.peer_limit_hits += 1;
786
787 self.global_limit.tokens =
789 (self.global_limit.tokens + cost).min(self.config.global_ops_per_second);
790
791 let retry_after = peer_limit
792 .time_until_available(cost, self.config.refill_rate)
793 .unwrap_or(Duration::from_secs(1));
794
795 return RateLimitResult::Denied {
796 retry_after,
797 reason: format!("Peer rate limit exceeded for {peer_id:?}"),
798 };
799 }
800
801 self.stats.operations_allowed += 1;
802 RateLimitResult::Allowed
803 }
804
805 pub fn would_exceed_limit(
807 &self,
808 peer_id: &crate::types::identifiers::DeviceId,
809 cost: u32,
810 ) -> bool {
811 if self.global_limit.available_tokens() < cost {
813 return true;
814 }
815
816 if let Some(peer_limit) = self.peer_limits.get(peer_id) {
818 if peer_limit.available_tokens() < cost {
819 return true;
820 }
821 }
822
823 false
824 }
825
826 pub fn available_tokens(&self, peer_id: &crate::types::identifiers::DeviceId) -> u32 {
828 let global_tokens = self.global_limit.available_tokens();
829
830 let peer_tokens = self
831 .peer_limits
832 .get(peer_id)
833 .map(|l| l.available_tokens())
834 .unwrap_or(self.config.peer_ops_per_second);
835
836 global_tokens.min(peer_tokens)
837 }
838
839 pub fn statistics(&self) -> &RateLimiterStatistics {
841 &self.stats
842 }
843
844 pub fn reset(&mut self, now: MonotonicInstant) {
849 self.global_limit = RateLimit::new(
850 self.config.global_ops_per_second,
851 Duration::from_secs(1),
852 now,
853 );
854 self.peer_limits.clear();
855 self.stats = RateLimiterStatistics::default();
856 }
857
858 pub fn remove_peer(&mut self, peer_id: &crate::types::identifiers::DeviceId) {
860 self.peer_limits.remove(peer_id);
861 }
862
863 pub fn tracked_peers(&self) -> usize {
865 self.peer_limits.len()
866 }
867}
868
/// Counters maintained by [`RateLimiter`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RateLimiterStatistics {
    /// Operations that passed both the global and the peer check.
    pub operations_allowed: u64,

    /// Operations rejected by the global bucket.
    pub global_limit_hits: u64,

    /// Operations rejected by a per-peer bucket.
    pub peer_limit_hits: u64,
}