allframe_core/resilience/retry.rs

//! Retry patterns with exponential backoff, jitter, and adaptive behavior.
//!
//! Provides retry mechanisms for transient failures with configurable backoff
//! strategies.
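//!
//! A minimal usage sketch, assuming this module is exposed as
//! `allframe_core::resilience::retry` and a Tokio runtime drives the future
//! (marked `ignore` because that path is an assumption):
//!
//! ```rust,ignore
//! use std::time::Duration;
//!
//! use allframe_core::resilience::retry::{RetryConfig, RetryExecutor};
//!
//! async fn demo() -> Result<&'static str, std::io::Error> {
//!     let config = RetryConfig::new(3)
//!         .with_initial_interval(Duration::from_millis(100))
//!         .with_max_interval(Duration::from_secs(5));
//!     let executor = RetryExecutor::new(config);
//!
//!     executor
//!         .execute("load_config", || async {
//!             // Any fallible async operation; retried on failure.
//!             Ok::<_, std::io::Error>("loaded")
//!         })
//!         .await
//!         .map_err(|retry_err| retry_err.last_error)
//! }
//! ```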

use std::{
    future::Future,
    sync::atomic::{AtomicU32, Ordering},
    time::{Duration, Instant},
};

use parking_lot::RwLock;
use rand::Rng;

/// Configuration for retry behavior.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts (0 = no retries, just the initial
    /// attempt).
    pub max_retries: u32,
    /// Initial interval between retries.
    pub initial_interval: Duration,
    /// Maximum interval between retries.
    pub max_interval: Duration,
    /// Multiplier for exponential backoff.
    pub multiplier: f64,
    /// Randomization factor for jitter (0.0 = no jitter, 0.5 = +/- 50%).
    pub randomization_factor: f64,
    /// Maximum total elapsed time for all retries. None = no limit.
    pub max_elapsed_time: Option<Duration>,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_interval: Duration::from_millis(500),
            max_interval: Duration::from_secs(30),
            multiplier: 2.0,
            randomization_factor: 0.5,
            max_elapsed_time: Some(Duration::from_secs(60)),
        }
    }
}

impl RetryConfig {
    /// Create a new retry config with specified max retries.
    pub fn new(max_retries: u32) -> Self {
        Self {
            max_retries,
            ..Default::default()
        }
    }

    /// Set the initial interval.
    pub fn with_initial_interval(mut self, interval: Duration) -> Self {
        self.initial_interval = interval;
        self
    }

    /// Set the maximum interval.
    pub fn with_max_interval(mut self, interval: Duration) -> Self {
        self.max_interval = interval;
        self
    }

    /// Set the backoff multiplier.
    pub fn with_multiplier(mut self, multiplier: f64) -> Self {
        self.multiplier = multiplier;
        self
    }

    /// Set the randomization factor for jitter.
    pub fn with_randomization_factor(mut self, factor: f64) -> Self {
        self.randomization_factor = factor.clamp(0.0, 1.0);
        self
    }

    /// Set the maximum elapsed time.
    pub fn with_max_elapsed_time(mut self, time: Option<Duration>) -> Self {
        self.max_elapsed_time = time;
        self
    }

    /// Calculate the next backoff interval with jitter.
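    ///
    /// With no jitter the interval is `initial_interval * multiplier^attempt`,
    /// capped at `max_interval`. A sketch (crate path assumed, hence `ignore`):
    ///
    /// ```rust,ignore
    /// use std::time::Duration;
    ///
    /// use allframe_core::resilience::retry::RetryConfig;
    ///
    /// let config = RetryConfig::new(5)
    ///     .with_initial_interval(Duration::from_secs(1))
    ///     .with_multiplier(2.0)
    ///     .with_randomization_factor(0.0); // disable jitter for determinism
    ///
    /// assert_eq!(config.calculate_interval(0), Duration::from_secs(1));
    /// assert_eq!(config.calculate_interval(3), Duration::from_secs(8));
    /// ```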
    pub fn calculate_interval(&self, attempt: u32) -> Duration {
        let base = self.initial_interval.as_secs_f64() * self.multiplier.powi(attempt as i32);
        let capped = base.min(self.max_interval.as_secs_f64());

        // Apply jitter
        let jitter_range = capped * self.randomization_factor;
        let mut rng = rand::thread_rng();
        let jitter = rng.gen_range(-jitter_range..=jitter_range);
        let final_interval = (capped + jitter).max(0.0);

        Duration::from_secs_f64(final_interval)
    }
}

/// Error returned when all retry attempts fail.
#[derive(Debug)]
pub struct RetryError<E> {
    /// The last error encountered.
    pub last_error: E,
    /// Number of attempts made.
    pub attempts: u32,
    /// Total elapsed time.
    pub elapsed: Duration,
}

impl<E: std::fmt::Display> std::fmt::Display for RetryError<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "retry exhausted after {} attempts ({:?}): {}",
            self.attempts, self.elapsed, self.last_error
        )
    }
}

impl<E: std::error::Error + 'static> std::error::Error for RetryError<E> {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&self.last_error)
    }
}

/// Trait for determining if an error should trigger a retry.
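///
/// A sketch of a custom policy (names are illustrative; crate path assumed):
/// retry errors that look transient and give up on everything else.
///
/// ```rust,ignore
/// use allframe_core::resilience::retry::RetryPolicy;
///
/// struct RetryTransientOnly;
///
/// impl RetryPolicy for RetryTransientOnly {
///     fn should_retry(&self, error: &dyn std::error::Error) -> bool {
///         // Crude classification for the sketch: treat timeouts and
///         // connection problems as transient.
///         let msg = error.to_string();
///         msg.contains("timeout") || msg.contains("connection")
///     }
/// }
/// ```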
pub trait RetryPolicy: Send + Sync {
    /// Returns true if the operation should be retried for this error.
    fn should_retry(&self, error: &dyn std::error::Error) -> bool;
}

/// Default retry policy that retries all errors.
#[derive(Debug, Clone, Default)]
pub struct AlwaysRetry;

impl RetryPolicy for AlwaysRetry {
    fn should_retry(&self, _error: &dyn std::error::Error) -> bool {
        true
    }
}

/// Retry policy that never retries.
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct NeverRetry;

impl RetryPolicy for NeverRetry {
    fn should_retry(&self, _error: &dyn std::error::Error) -> bool {
        false
    }
}

/// Executes async operations with exponential backoff and jitter.
pub struct RetryExecutor<P: RetryPolicy = AlwaysRetry> {
    config: RetryConfig,
    policy: P,
}

impl RetryExecutor<AlwaysRetry> {
    /// Create a new retry executor with default policy.
    pub fn new(config: RetryConfig) -> Self {
        Self {
            config,
            policy: AlwaysRetry,
        }
    }
}

impl<P: RetryPolicy> RetryExecutor<P> {
    /// Create a retry executor with a custom policy.
    pub fn with_policy(config: RetryConfig, policy: P) -> Self {
        Self { config, policy }
    }

    /// Execute an async operation with retries.
    ///
    /// The operation will be retried according to the configuration until:
    /// - It succeeds
    /// - Max retries is reached
    /// - Max elapsed time is reached
    /// - The retry policy says not to retry
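    ///
    /// A sketch of handling the failure path (crate path assumed, hence
    /// `ignore`); the returned `RetryError` carries the last error plus
    /// attempt and timing metadata:
    ///
    /// ```rust,ignore
    /// use allframe_core::resilience::retry::{RetryConfig, RetryExecutor};
    ///
    /// async fn demo() {
    ///     let executor = RetryExecutor::new(RetryConfig::new(2));
    ///
    ///     match executor
    ///         .execute("ping_upstream", || async {
    ///             Err::<(), _>(std::io::Error::new(
    ///                 std::io::ErrorKind::Other,
    ///                 "unreachable",
    ///             ))
    ///         })
    ///         .await
    ///     {
    ///         Ok(()) => println!("succeeded"),
    ///         Err(err) => eprintln!(
    ///             "gave up after {} attempts in {:?}: {}",
    ///             err.attempts, err.elapsed, err.last_error
    ///         ),
    ///     }
    /// }
    /// ```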
    pub async fn execute<F, Fut, T, E>(&self, name: &str, mut f: F) -> Result<T, RetryError<E>>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
        E: std::error::Error + 'static,
    {
        let start = Instant::now();
        let mut attempts = 0u32;

        loop {
            attempts += 1;

            match f().await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    // Check if we should retry
                    if !self.policy.should_retry(&e) {
                        return Err(RetryError {
                            last_error: e,
                            attempts,
                            elapsed: start.elapsed(),
                        });
                    }

                    // Check if we've exceeded max retries
                    if attempts > self.config.max_retries {
                        return Err(RetryError {
                            last_error: e,
                            attempts,
                            elapsed: start.elapsed(),
                        });
                    }

                    // Check if we've exceeded max elapsed time
                    if let Some(max_elapsed) = self.config.max_elapsed_time {
                        if start.elapsed() >= max_elapsed {
                            return Err(RetryError {
                                last_error: e,
                                attempts,
                                elapsed: start.elapsed(),
                            });
                        }
                    }

                    // Calculate and wait for backoff interval
                    let interval = self.config.calculate_interval(attempts - 1);

                    // Log retry attempt (could be made configurable)
                    #[cfg(feature = "otel")]
                    tracing::debug!(
                        operation = name,
                        attempt = attempts,
                        next_retry_in = ?interval,
                        "retrying operation"
                    );
                    let _ = name; // Silence unused warning when otel is disabled

                    tokio::time::sleep(interval).await;
                }
            }
        }
    }

    /// Get the configuration.
    pub fn config(&self) -> &RetryConfig {
        &self.config
    }
}

/// Prevents retry storms by limiting total retry capacity.
///
/// A retry budget tracks available retry tokens and prevents excessive
/// retries when the system is under stress.
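///
/// A usage sketch (crate path assumed): spend a token before scheduling each
/// retry so a saturated system fails fast instead of amplifying load.
///
/// ```rust,ignore
/// use allframe_core::resilience::retry::RetryBudget;
///
/// // 100 tokens, recovering 10 per second.
/// let budget = RetryBudget::new(100, 10.0);
///
/// if budget.try_consume(1) {
///     // proceed with the retry
/// } else {
///     // budget exhausted: surface the error instead of retrying
/// }
/// ```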
pub struct RetryBudget {
    /// Maximum tokens available.
    max_tokens: u32,
    /// Current available tokens.
    tokens: AtomicU32,
    /// Tokens recovered per second.
    recovery_rate: f64,
    /// Last recovery time.
    last_recovery: RwLock<Instant>,
}

impl RetryBudget {
    /// Create a new retry budget.
    ///
    /// # Arguments
    /// * `max_tokens` - Maximum retry tokens available
    /// * `recovery_rate` - Tokens recovered per second
    pub fn new(max_tokens: u32, recovery_rate: f64) -> Self {
        Self {
            max_tokens,
            tokens: AtomicU32::new(max_tokens),
            recovery_rate,
            last_recovery: RwLock::new(Instant::now()),
        }
    }

    /// Try to consume retry tokens.
    ///
    /// Returns true if tokens were available and consumed.
    pub fn try_consume(&self, amount: u32) -> bool {
        self.recover_tokens();

        loop {
            let current = self.tokens.load(Ordering::Acquire);
            if current < amount {
                return false;
            }

            if self
                .tokens
                .compare_exchange(
                    current,
                    current - amount,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return true;
            }
        }
    }

    /// Get remaining tokens.
    pub fn remaining(&self) -> u32 {
        self.recover_tokens();
        self.tokens.load(Ordering::Acquire)
    }

    /// Recover tokens based on elapsed time.
    fn recover_tokens(&self) {
        let mut last = self.last_recovery.write();
        let elapsed = last.elapsed();

        // Only recover every 100ms to keep the hot path cheap.
        if elapsed.as_secs_f64() > 0.1 {
            let recovered = (elapsed.as_secs_f64() * self.recovery_rate) as u32;
            if recovered > 0 {
                // fetch_update avoids clobbering a concurrent try_consume that
                // lands between a separate load and store; saturating_add guards
                // against overflow after very long gaps or high recovery rates.
                let _ = self
                    .tokens
                    .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                        Some(current.saturating_add(recovered).min(self.max_tokens))
                    });
                *last = Instant::now();
            }
        }
    }

    /// Reset the budget to full capacity.
    pub fn reset(&self) {
        self.tokens.store(self.max_tokens, Ordering::Release);
        *self.last_recovery.write() = Instant::now();
    }
}

impl Default for RetryBudget {
    fn default() -> Self {
        Self::new(100, 10.0) // 100 tokens, recover 10/second
    }
}

/// Adapts retry behavior based on recent success/failure rates.
///
/// When failures increase, the retry configuration becomes more conservative
/// (longer delays, fewer retries). When success rate improves, it relaxes.
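///
/// A sketch of the feedback loop (crate path assumed): record each outcome and
/// build later executors from the adjusted configuration.
///
/// ```rust,ignore
/// use allframe_core::resilience::retry::{AdaptiveRetry, RetryConfig};
///
/// let adaptive = AdaptiveRetry::new(RetryConfig::default());
///
/// // Feed every operation's outcome back in.
/// adaptive.record_outcome(false);
/// adaptive.record_outcome(true);
///
/// // Subsequent retries use a config scaled by the recent success rate.
/// let executor = adaptive.executor();
/// assert!(executor.config().initial_interval >= RetryConfig::default().initial_interval);
/// ```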
pub struct AdaptiveRetry {
    base_config: RetryConfig,
    /// Recent outcomes (true = success, false = failure).
    outcomes: RwLock<Vec<(Instant, bool)>>,
    /// Window size for calculating success rate.
    window: Duration,
}

impl AdaptiveRetry {
    /// Create a new adaptive retry with base configuration.
    pub fn new(base_config: RetryConfig) -> Self {
        Self {
            base_config,
            outcomes: RwLock::new(Vec::new()),
            window: Duration::from_secs(60),
        }
    }

    /// Create with a custom window size.
    pub fn with_window(mut self, window: Duration) -> Self {
        self.window = window;
        self
    }

    /// Record an operation outcome.
    pub fn record_outcome(&self, success: bool) {
        let mut outcomes = self.outcomes.write();
        outcomes.push((Instant::now(), success));

        // Prune entries older than the window. checked_sub avoids the panic
        // that `Instant - Duration` can raise when the window reaches back
        // before the platform's earliest representable instant.
        if let Some(cutoff) = Instant::now().checked_sub(self.window) {
            outcomes.retain(|(time, _)| *time > cutoff);
        }
    }

    /// Get success rate (0.0 - 1.0).
    pub fn success_rate(&self) -> f64 {
        let outcomes = self.outcomes.read();
        if outcomes.is_empty() {
            return 1.0;
        }

        let successes = outcomes.iter().filter(|(_, s)| *s).count();
        successes as f64 / outcomes.len() as f64
    }

    /// Get adjusted configuration based on success rate.
    ///
    /// Lower success rates result in:
    /// - Longer initial intervals
    /// - Fewer max retries
    /// - Higher multiplier
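    ///
    /// For example, at a 50% success rate the scale factor is
    /// `1.0 + 2.0 * (1.0 - 0.5) = 2.0`, so a 500ms base initial interval grows
    /// to 1s and a 2.0 base multiplier becomes 3.0.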
    pub fn get_adjusted_config(&self) -> RetryConfig {
        let success_rate = self.success_rate();

        // Scale factor: 1.0 at 100% success, up to 3.0 at 0% success
        let scale = 1.0 + (2.0 * (1.0 - success_rate));

        let mut config = self.base_config.clone();

        // Increase initial interval
        config.initial_interval =
            Duration::from_secs_f64(self.base_config.initial_interval.as_secs_f64() * scale);

        // Reduce max retries when success rate is low
        if success_rate < 0.5 {
            config.max_retries = (self.base_config.max_retries / 2).max(1);
        }

        // Increase multiplier when success rate is low
        config.multiplier = self.base_config.multiplier * (1.0 + (1.0 - success_rate));

        config
    }

    /// Create a retry executor with the current adjusted config.
    pub fn executor(&self) -> RetryExecutor<AlwaysRetry> {
        RetryExecutor::new(self.get_adjusted_config())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_interval, Duration::from_millis(500));
        assert_eq!(config.multiplier, 2.0);
    }

    #[test]
    fn test_retry_config_builder() {
        let config = RetryConfig::new(5)
            .with_initial_interval(Duration::from_secs(1))
            .with_max_interval(Duration::from_secs(60))
            .with_multiplier(1.5)
            .with_randomization_factor(0.3);

        assert_eq!(config.max_retries, 5);
        assert_eq!(config.initial_interval, Duration::from_secs(1));
        assert_eq!(config.max_interval, Duration::from_secs(60));
        assert_eq!(config.multiplier, 1.5);
        assert_eq!(config.randomization_factor, 0.3);
    }

    #[test]
    fn test_calculate_interval_exponential() {
        let config = RetryConfig::new(5)
            .with_initial_interval(Duration::from_secs(1))
            .with_randomization_factor(0.0); // No jitter for predictable test

        let interval0 = config.calculate_interval(0);
        let interval1 = config.calculate_interval(1);
        let interval2 = config.calculate_interval(2);

        assert_eq!(interval0, Duration::from_secs(1));
        assert_eq!(interval1, Duration::from_secs(2));
        assert_eq!(interval2, Duration::from_secs(4));
    }

    #[test]
    fn test_calculate_interval_capped() {
        let config = RetryConfig::new(10)
            .with_initial_interval(Duration::from_secs(1))
            .with_max_interval(Duration::from_secs(10))
            .with_randomization_factor(0.0);

        let interval5 = config.calculate_interval(5); // Would be 32s without cap
        assert_eq!(interval5, Duration::from_secs(10));
    }

    #[test]
    fn test_retry_budget_consume() {
        let budget = RetryBudget::new(10, 0.0); // No recovery

        assert!(budget.try_consume(5));
        assert_eq!(budget.remaining(), 5);
        assert!(budget.try_consume(5));
        assert_eq!(budget.remaining(), 0);
        assert!(!budget.try_consume(1)); // No more tokens
    }

    #[test]
    fn test_retry_budget_reset() {
        let budget = RetryBudget::new(10, 0.0);
        budget.try_consume(10);
        assert_eq!(budget.remaining(), 0);

        budget.reset();
        assert_eq!(budget.remaining(), 10);
    }

    #[test]
    fn test_adaptive_retry_success_rate() {
        let adaptive = AdaptiveRetry::new(RetryConfig::default());

        // No outcomes = 100% success rate
        assert_eq!(adaptive.success_rate(), 1.0);

        // Record some outcomes
        adaptive.record_outcome(true);
        adaptive.record_outcome(true);
        adaptive.record_outcome(false);
        adaptive.record_outcome(false);

        assert_eq!(adaptive.success_rate(), 0.5);
    }

    #[test]
    fn test_adaptive_retry_config_adjustment() {
        let base = RetryConfig::new(4)
            .with_initial_interval(Duration::from_secs(1))
            .with_multiplier(2.0);

        let adaptive = AdaptiveRetry::new(base);

        // 25% success rate (below 50% threshold)
        adaptive.record_outcome(true);
        adaptive.record_outcome(false);
        adaptive.record_outcome(false);
        adaptive.record_outcome(false);

        let adjusted = adaptive.get_adjusted_config();

        // Should have reduced max retries (success_rate < 0.5)
        assert_eq!(adjusted.max_retries, 2);
        // Should have increased initial interval
        assert!(adjusted.initial_interval > Duration::from_secs(1));
    }

    #[tokio::test]
    async fn test_retry_executor_success() {
        let executor = RetryExecutor::new(RetryConfig::new(3));
        let result = executor
            .execute("test", || async { Ok::<_, std::io::Error>("success") })
            .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "success");
    }

    #[tokio::test]
    async fn test_retry_executor_failure() {
        let config = RetryConfig::new(2)
            .with_initial_interval(Duration::from_millis(10))
            .with_max_elapsed_time(None);

        let executor = RetryExecutor::new(config);
        let result = executor
            .execute("test", || async {
                Err::<(), _>(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "always fails",
                ))
            })
            .await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.attempts, 3); // Initial + 2 retries
    }

    #[tokio::test]
    async fn test_retry_executor_eventual_success() {
        let config = RetryConfig::new(3).with_initial_interval(Duration::from_millis(10));

        let executor = RetryExecutor::new(config);
        let attempt = Arc::new(AtomicU32::new(0));
        let attempt_clone = attempt.clone();

        let result = executor
            .execute("test", || {
                let attempt = attempt_clone.clone();
                async move {
                    let current = attempt.fetch_add(1, Ordering::SeqCst);
                    if current < 2 {
                        Err(std::io::Error::new(std::io::ErrorKind::Other, "not yet"))
                    } else {
                        Ok("success")
                    }
                }
            })
            .await;

        assert!(result.is_ok());
        assert_eq!(attempt.load(Ordering::SeqCst), 3);
    }

    #[test]
    fn test_always_retry_policy() {
        let policy = AlwaysRetry;
        let error = std::io::Error::new(std::io::ErrorKind::Other, "test");
        assert!(policy.should_retry(&error));
    }

    #[test]
    fn test_never_retry_policy() {
        let policy = NeverRetry;
        let error = std::io::Error::new(std::io::ErrorKind::Other, "test");
        assert!(!policy.should_retry(&error));
    }
}