reliability_toolkit/
retry.rs1use std::future::Future;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::time::sleep;
18
19pub type RetryPredicate<E> = Arc<dyn Fn(&E) -> bool + Send + Sync>;
21
22#[derive(Clone)]
24pub struct RetryConfig<E = std::io::Error> {
25 pub max_attempts: u32,
27 pub base_delay: Duration,
29 pub max_delay: Duration,
31 pub retry_if: Option<RetryPredicate<E>>,
34}
35
36impl<E> Default for RetryConfig<E> {
37 fn default() -> Self {
38 Self {
39 max_attempts: 3,
40 base_delay: Duration::from_millis(100),
41 max_delay: Duration::from_secs(5),
42 retry_if: None,
43 }
44 }
45}
46
47impl<E> std::fmt::Debug for RetryConfig<E> {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("RetryConfig")
50 .field("max_attempts", &self.max_attempts)
51 .field("base_delay", &self.base_delay)
52 .field("max_delay", &self.max_delay)
53 .field("retry_if", &self.retry_if.as_ref().map(|_| "<predicate>"))
54 .finish()
55 }
56}
57
58#[derive(Clone, Debug)]
60pub struct Retry<E = std::io::Error> {
61 cfg: RetryConfig<E>,
62 seed: Arc<AtomicU64>,
63}
64
65impl<E> Retry<E> {
66 pub fn new(cfg: RetryConfig<E>) -> Self {
68 let seed = std::time::SystemTime::now()
70 .duration_since(std::time::UNIX_EPOCH)
71 .map_or(0xdead_beef, |d| d.as_nanos() as u64);
72 Self {
73 cfg,
74 seed: Arc::new(AtomicU64::new(seed.wrapping_add(1))),
75 }
76 }
77
78 pub async fn run<F, Fut, T>(&self, mut make_fut: F) -> Result<T, E>
83 where
84 F: FnMut() -> Fut,
85 Fut: Future<Output = Result<T, E>>,
86 {
87 let mut attempt: u32 = 0;
88 loop {
89 attempt += 1;
90 match make_fut().await {
91 Ok(v) => return Ok(v),
92 Err(e) => {
93 if attempt >= self.cfg.max_attempts {
94 return Err(e);
95 }
96 if let Some(pred) = &self.cfg.retry_if {
97 if !pred(&e) {
98 return Err(e);
99 }
100 }
101 let delay = self.backoff(attempt);
102 sleep(delay).await;
103 }
104 }
105 }
106 }
107
108 fn backoff(&self, attempt: u32) -> Duration {
109 let exp = attempt.saturating_sub(1).min(30);
111 let raw = self.cfg.base_delay.saturating_mul(1u32 << exp);
112 let capped = raw.min(self.cfg.max_delay);
113 let max_ms = capped.as_millis().min(u128::from(u64::MAX)) as u64;
115 let jitter_ms = self.next_rand() % (max_ms + 1);
116 Duration::from_millis(jitter_ms)
117 }
118
119 fn next_rand(&self) -> u64 {
121 let mut x = self.seed.load(Ordering::Relaxed);
122 if x == 0 {
123 x = 0xdead_beef;
124 }
125 x ^= x >> 12;
126 x ^= x << 25;
127 x ^= x >> 27;
128 self.seed.store(x, Ordering::Relaxed);
129 x.wrapping_mul(0x2545_F491_4F6C_DD1D)
130 }
131}