// allframe_core/resilience/retry.rs
use std::{
    collections::VecDeque,
    future::Future,
    sync::atomic::{AtomicU32, Ordering},
    time::{Duration, Instant},
};

use parking_lot::RwLock;
use rand::Rng;

/// Tuning knobs for exponential backoff with jitter.
///
/// Start from [`Default`] (or `RetryConfig::new`) and refine with the `with_*` builders.
#[derive(Clone, Debug)]
pub struct RetryConfig {
    /// Retries allowed after the first failed call (so at most `max_retries + 1` calls).
    pub max_retries: u32,
    /// Base delay before the first retry; grows by `multiplier` for each later retry.
    pub initial_interval: Duration,
    /// Ceiling applied to the exponential delay before jitter is added.
    pub max_interval: Duration,
    /// Per-retry growth factor of the delay.
    pub multiplier: f64,
    /// Fraction of the capped delay used as a symmetric random jitter window
    /// (`0.0` disables jitter).
    pub randomization_factor: f64,
    /// Wall-clock budget for all attempts of one operation; `None` disables the limit.
    pub max_elapsed_time: Option<Duration>,
}

33impl Default for RetryConfig {
34 fn default() -> Self {
35 Self {
36 max_retries: 3,
37 initial_interval: Duration::from_millis(500),
38 max_interval: Duration::from_secs(30),
39 multiplier: 2.0,
40 randomization_factor: 0.5,
41 max_elapsed_time: Some(Duration::from_secs(60)),
42 }
43 }
44}
45
46impl RetryConfig {
47 pub fn new(max_retries: u32) -> Self {
49 Self {
50 max_retries,
51 ..Default::default()
52 }
53 }
54
55 pub fn with_initial_interval(mut self, interval: Duration) -> Self {
57 self.initial_interval = interval;
58 self
59 }
60
61 pub fn with_max_interval(mut self, interval: Duration) -> Self {
63 self.max_interval = interval;
64 self
65 }
66
67 pub fn with_multiplier(mut self, multiplier: f64) -> Self {
69 self.multiplier = multiplier;
70 self
71 }
72
73 pub fn with_randomization_factor(mut self, factor: f64) -> Self {
75 self.randomization_factor = factor.clamp(0.0, 1.0);
76 self
77 }
78
79 pub fn with_max_elapsed_time(mut self, time: Option<Duration>) -> Self {
81 self.max_elapsed_time = time;
82 self
83 }
84
85 pub fn calculate_interval(&self, attempt: u32) -> Duration {
87 let base = self.initial_interval.as_secs_f64() * self.multiplier.powi(attempt as i32);
88 let capped = base.min(self.max_interval.as_secs_f64());
89
90 let jitter_range = capped * self.randomization_factor;
92 let mut rng = rand::thread_rng();
93 let jitter = rng.gen_range(-jitter_range..=jitter_range);
94 let final_interval = (capped + jitter).max(0.0);
95
96 Duration::from_secs_f64(final_interval)
97 }
98}
99
/// Error returned by `RetryExecutor::execute` once it stops retrying.
///
/// Carries the error from the final attempt, how many attempts were made, and how long the
/// whole sequence took.
#[derive(Debug)]
pub struct RetryError<E> {
    /// Error produced by the last attempt.
    pub last_error: E,
    /// Total number of calls made, including the first one.
    pub attempts: u32,
    /// Wall-clock time from the first attempt until giving up.
    pub elapsed: Duration,
}

impl<E> std::fmt::Display for RetryError<E>
where
    E: std::fmt::Display,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            last_error,
            attempts,
            elapsed,
        } = self;
        write!(
            f,
            "retry exhausted after {} attempts ({:?}): {}",
            attempts, elapsed, last_error
        )
    }
}

impl<E> std::error::Error for RetryError<E>
where
    E: std::error::Error + 'static,
{
    /// Exposes the final attempt's error as the underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = &self.last_error;
        Some(cause)
    }
}

/// Decides whether a failed attempt should be retried.
///
/// The `Send + Sync` supertraits let a policy be shared across threads.
pub trait RetryPolicy: Send + Sync {
    /// Returns `true` if the operation should be attempted again after `error`.
    fn should_retry(&self, error: &dyn std::error::Error) -> bool;
}

/// Policy that treats every error as retryable.
#[derive(Debug, Clone, Default)]
pub struct AlwaysRetry;

impl RetryPolicy for AlwaysRetry {
    fn should_retry(&self, _: &dyn std::error::Error) -> bool {
        true
    }
}

/// Policy that never retries, so the first error is returned immediately.
#[derive(Debug, Clone, Default)]
// NOTE(review): presumably silences a warning when this module is not publicly reachable;
// confirm the allow is still needed.
#[allow(dead_code)]
pub struct NeverRetry;

impl RetryPolicy for NeverRetry {
    fn should_retry(&self, _: &dyn std::error::Error) -> bool {
        false
    }
}

154pub struct RetryExecutor<P: RetryPolicy = AlwaysRetry> {
156 config: RetryConfig,
157 policy: P,
158}
159
160impl RetryExecutor<AlwaysRetry> {
161 pub fn new(config: RetryConfig) -> Self {
163 Self {
164 config,
165 policy: AlwaysRetry,
166 }
167 }
168}
169
170impl<P: RetryPolicy> RetryExecutor<P> {
171 pub fn with_policy(config: RetryConfig, policy: P) -> Self {
173 Self { config, policy }
174 }
175
176 pub async fn execute<F, Fut, T, E>(&self, name: &str, mut f: F) -> Result<T, RetryError<E>>
184 where
185 F: FnMut() -> Fut,
186 Fut: Future<Output = Result<T, E>>,
187 E: std::error::Error + 'static,
188 {
189 let start = Instant::now();
190 let mut attempts = 0u32;
191
192 loop {
193 attempts += 1;
194
195 match f().await {
196 Ok(result) => return Ok(result),
197 Err(e) => {
198 if !self.policy.should_retry(&e) {
200 return Err(RetryError {
201 last_error: e,
202 attempts,
203 elapsed: start.elapsed(),
204 });
205 }
206
207 if attempts > self.config.max_retries {
209 return Err(RetryError {
210 last_error: e,
211 attempts,
212 elapsed: start.elapsed(),
213 });
214 }
215
216 if let Some(max_elapsed) = self.config.max_elapsed_time {
218 if start.elapsed() >= max_elapsed {
219 return Err(RetryError {
220 last_error: e,
221 attempts,
222 elapsed: start.elapsed(),
223 });
224 }
225 }
226
227 let interval = self.config.calculate_interval(attempts - 1);
229
230 #[cfg(feature = "otel")]
232 tracing::debug!(
233 operation = name,
234 attempt = attempts,
235 next_retry_in = ?interval,
236 "retrying operation"
237 );
238 let _ = name; tokio::time::sleep(interval).await;
241 }
242 }
243 }
244 }
245
246 pub fn config(&self) -> &RetryConfig {
248 &self.config
249 }
250}
251
252pub struct RetryBudget {
257 max_tokens: u32,
259 tokens: AtomicU32,
261 recovery_rate: f64,
263 last_recovery: RwLock<Instant>,
265}
266
267impl RetryBudget {
268 pub fn new(max_tokens: u32, recovery_rate: f64) -> Self {
274 Self {
275 max_tokens,
276 tokens: AtomicU32::new(max_tokens),
277 recovery_rate,
278 last_recovery: RwLock::new(Instant::now()),
279 }
280 }
281
282 pub fn try_consume(&self, amount: u32) -> bool {
286 self.recover_tokens();
287
288 loop {
289 let current = self.tokens.load(Ordering::Acquire);
290 if current < amount {
291 return false;
292 }
293
294 if self
295 .tokens
296 .compare_exchange(
297 current,
298 current - amount,
299 Ordering::AcqRel,
300 Ordering::Relaxed,
301 )
302 .is_ok()
303 {
304 return true;
305 }
306 }
307 }
308
309 pub fn remaining(&self) -> u32 {
311 self.recover_tokens();
312 self.tokens.load(Ordering::Acquire)
313 }
314
315 fn recover_tokens(&self) {
317 let mut last = self.last_recovery.write();
318 let elapsed = last.elapsed();
319
320 if elapsed.as_secs_f64() > 0.1 {
321 let recovered = (elapsed.as_secs_f64() * self.recovery_rate) as u32;
323 if recovered > 0 {
324 let current = self.tokens.load(Ordering::Acquire);
325 let new_value = (current + recovered).min(self.max_tokens);
326 self.tokens.store(new_value, Ordering::Release);
327 *last = Instant::now();
328 }
329 }
330 }
331
332 pub fn reset(&self) {
334 self.tokens.store(self.max_tokens, Ordering::Release);
335 *self.last_recovery.write() = Instant::now();
336 }
337}
338
339impl Default for RetryBudget {
340 fn default() -> Self {
341 Self::new(100, 10.0) }
343}
344
345pub struct AdaptiveRetry {
350 base_config: RetryConfig,
351 outcomes: RwLock<Vec<(Instant, bool)>>,
353 window: Duration,
355}
356
357impl AdaptiveRetry {
358 pub fn new(base_config: RetryConfig) -> Self {
360 Self {
361 base_config,
362 outcomes: RwLock::new(Vec::new()),
363 window: Duration::from_secs(60),
364 }
365 }
366
367 pub fn with_window(mut self, window: Duration) -> Self {
369 self.window = window;
370 self
371 }
372
373 pub fn record_outcome(&self, success: bool) {
375 let mut outcomes = self.outcomes.write();
376 outcomes.push((Instant::now(), success));
377
378 let cutoff = Instant::now() - self.window;
380 outcomes.retain(|(time, _)| *time > cutoff);
381 }
382
383 pub fn success_rate(&self) -> f64 {
385 let outcomes = self.outcomes.read();
386 if outcomes.is_empty() {
387 return 1.0;
388 }
389
390 let successes = outcomes.iter().filter(|(_, s)| *s).count();
391 successes as f64 / outcomes.len() as f64
392 }
393
394 pub fn get_adjusted_config(&self) -> RetryConfig {
401 let success_rate = self.success_rate();
402
403 let scale = 1.0 + (2.0 * (1.0 - success_rate));
405
406 let mut config = self.base_config.clone();
407
408 config.initial_interval =
410 Duration::from_secs_f64(self.base_config.initial_interval.as_secs_f64() * scale);
411
412 if success_rate < 0.5 {
414 config.max_retries = (self.base_config.max_retries / 2).max(1);
415 }
416
417 config.multiplier = self.base_config.multiplier * (1.0 + (1.0 - success_rate));
419
420 config
421 }
422
423 pub fn executor(&self) -> RetryExecutor<AlwaysRetry> {
425 RetryExecutor::new(self.get_adjusted_config())
426 }
427}
428
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    /// Builds the generic I/O error used as the failing result throughout these tests.
    fn io_error(message: &str) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::Other, message)
    }

    #[test]
    fn test_retry_config_default() {
        let RetryConfig {
            max_retries,
            initial_interval,
            multiplier,
            ..
        } = RetryConfig::default();
        assert_eq!(max_retries, 3);
        assert_eq!(initial_interval, Duration::from_millis(500));
        assert_eq!(multiplier, 2.0);
    }

    #[test]
    fn test_retry_config_builder() {
        let config = RetryConfig::new(5)
            .with_initial_interval(Duration::from_secs(1))
            .with_max_interval(Duration::from_secs(60))
            .with_multiplier(1.5)
            .with_randomization_factor(0.3);

        assert_eq!(config.max_retries, 5);
        assert_eq!(
            (config.initial_interval, config.max_interval),
            (Duration::from_secs(1), Duration::from_secs(60))
        );
        assert_eq!((config.multiplier, config.randomization_factor), (1.5, 0.3));
    }

    #[test]
    fn test_calculate_interval_exponential() {
        let config = RetryConfig::new(5)
            .with_initial_interval(Duration::from_secs(1))
            .with_randomization_factor(0.0);

        // Without jitter the delay doubles on every attempt: 1s, 2s, 4s.
        for &(attempt, secs) in [(0u32, 1u64), (1, 2), (2, 4)].iter() {
            assert_eq!(config.calculate_interval(attempt), Duration::from_secs(secs));
        }
    }

    #[test]
    fn test_calculate_interval_capped() {
        let config = RetryConfig::new(10)
            .with_initial_interval(Duration::from_secs(1))
            .with_max_interval(Duration::from_secs(10))
            .with_randomization_factor(0.0);

        // 1s * 2^5 = 32s, which must be clamped to the 10s ceiling.
        assert_eq!(config.calculate_interval(5), Duration::from_secs(10));
    }

    #[test]
    fn test_retry_budget_consume() {
        let budget = RetryBudget::new(10, 0.0);

        for expected_left in [5u32, 0] {
            assert!(budget.try_consume(5));
            assert_eq!(budget.remaining(), expected_left);
        }
        assert!(!budget.try_consume(1), "an empty budget must refuse further spending");
    }

    #[test]
    fn test_retry_budget_reset() {
        let budget = RetryBudget::new(10, 0.0);
        budget.try_consume(10);
        assert_eq!(budget.remaining(), 0);

        budget.reset();
        assert_eq!(budget.remaining(), 10);
    }

    #[test]
    fn test_adaptive_retry_success_rate() {
        let adaptive = AdaptiveRetry::new(RetryConfig::default());
        // With no history the rate defaults to a perfect score.
        assert_eq!(adaptive.success_rate(), 1.0);

        for outcome in [true, true, false, false] {
            adaptive.record_outcome(outcome);
        }
        assert_eq!(adaptive.success_rate(), 0.5);
    }

    #[test]
    fn test_adaptive_retry_config_adjustment() {
        let base = RetryConfig::new(4)
            .with_initial_interval(Duration::from_secs(1))
            .with_multiplier(2.0);
        let adaptive = AdaptiveRetry::new(base);

        // One success out of four gives a 25% success rate.
        for outcome in [true, false, false, false] {
            adaptive.record_outcome(outcome);
        }

        let adjusted = adaptive.get_adjusted_config();
        // Below 50% success the retry count is halved...
        assert_eq!(adjusted.max_retries, 2);
        // ...and the first backoff grows beyond the base interval.
        assert!(adjusted.initial_interval > Duration::from_secs(1));
    }

    #[tokio::test]
    async fn test_retry_executor_success() {
        let executor = RetryExecutor::new(RetryConfig::new(3));
        let outcome = executor
            .execute("test", || async { Ok::<_, std::io::Error>("success") })
            .await;

        let value = outcome.expect("operation should succeed on the first try");
        assert_eq!(value, "success");
    }

    #[tokio::test]
    async fn test_retry_executor_failure() {
        let config = RetryConfig::new(2)
            .with_initial_interval(Duration::from_millis(10))
            .with_max_elapsed_time(None);
        let executor = RetryExecutor::new(config);

        let outcome = executor
            .execute("test", || async { Err::<(), _>(io_error("always fails")) })
            .await;

        // One initial attempt plus two retries.
        let err = outcome.expect_err("an always-failing operation must not succeed");
        assert_eq!(err.attempts, 3);
    }

    #[tokio::test]
    async fn test_retry_executor_eventual_success() {
        let config = RetryConfig::new(3).with_initial_interval(Duration::from_millis(10));
        let executor = RetryExecutor::new(config);
        let calls = Arc::new(AtomicU32::new(0));

        let outcome = executor
            .execute("test", || {
                let calls = Arc::clone(&calls);
                async move {
                    // Fail on the first two calls and succeed on the third.
                    match calls.fetch_add(1, Ordering::SeqCst) {
                        0 | 1 => Err(io_error("not yet")),
                        _ => Ok("success"),
                    }
                }
            })
            .await;

        assert!(outcome.is_ok());
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    #[test]
    fn test_always_retry_policy() {
        assert!(AlwaysRetry.should_retry(&io_error("test")));
    }

    #[test]
    fn test_never_retry_policy() {
        assert!(!NeverRetry.should_retry(&io_error("test")));
    }
}