entelix_runnable/
retrying.rs1use std::marker::PhantomData;
13use std::sync::Arc;
14
15use async_trait::async_trait;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use rand::SeedableRng;
20use rand::rngs::SmallRng;
21
22use entelix_core::ExecutionContext;
23use entelix_core::error::{Error, Result};
24use entelix_core::transports::RetryPolicy;
25
26use crate::runnable::Runnable;
27
28pub struct Retrying<R, I, O>
31where
32 R: Runnable<I, O> + 'static,
33 I: Clone + Send + 'static,
34 O: Send + 'static,
35{
36 inner: Arc<R>,
37 policy: RetryPolicy,
38 _io: PhantomData<fn(I) -> O>,
39}
40
41impl<R, I, O> Retrying<R, I, O>
42where
43 R: Runnable<I, O> + 'static,
44 I: Clone + Send + 'static,
45 O: Send + 'static,
46{
47 pub fn new(inner: R, policy: RetryPolicy) -> Self {
49 Self {
50 inner: Arc::new(inner),
51 policy,
52 _io: PhantomData,
53 }
54 }
55}
56
57impl<R, I, O> std::fmt::Debug for Retrying<R, I, O>
58where
59 R: Runnable<I, O> + 'static,
60 I: Clone + Send + 'static,
61 O: Send + 'static,
62{
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("Retrying")
65 .field("max_attempts", &self.policy.max_attempts())
66 .finish_non_exhaustive()
67 }
68}
69
/// Produces a seed for the per-invocation RNG.
///
/// The seed is the low 64 bits of the wall-clock time in nanoseconds since the
/// Unix epoch, XOR-ed with a process-wide call counter. If the system clock
/// reads earlier than the epoch, the time component is `0` and only the
/// counter contributes.
fn seed_from_time() -> u64 {
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let clock_bits = match SystemTime::now().duration_since(UNIX_EPOCH) {
        // Keeping only the low 64 bits is intentional: this is seed material,
        // not a timestamp.
        #[allow(clippy::cast_possible_truncation)]
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    };
    // Each call takes the next counter value, so two calls that observe the
    // same clock reading still yield different seeds.
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);
    clock_bits ^ sequence
}
87
88#[async_trait]
89impl<R, I, O> Runnable<I, O> for Retrying<R, I, O>
90where
91 R: Runnable<I, O> + 'static,
92 I: Clone + Send + 'static,
93 O: Send + 'static,
94{
95 async fn invoke(&self, input: I, ctx: &ExecutionContext) -> Result<O> {
96 let max_attempts = self.policy.max_attempts().max(1);
97 let mut rng = SmallRng::seed_from_u64(seed_from_time());
98 let mut attempt: u32 = 0;
99 loop {
100 if ctx.is_cancelled() {
101 return Err(Error::Cancelled);
102 }
103 let cloned = input.clone();
104 match self.inner.invoke(cloned, ctx).await {
105 Ok(value) => return Ok(value),
106 Err(err) => {
107 attempt = attempt.saturating_add(1);
108 let exhausted = attempt >= max_attempts;
109 let decision = self.policy.classifier().should_retry(&err, attempt - 1);
110 if exhausted || !decision.retry {
111 return Err(err);
112 }
113 let backoff_delay = self
114 .policy
115 .backoff()
116 .delay_for_attempt(attempt - 1, &mut rng);
117 let delay = decision
118 .after
119 .map_or(backoff_delay, |hint| hint.min(self.policy.backoff().max()));
120 let token = ctx.cancellation();
121 tokio::select! {
122 () = tokio::time::sleep(delay) => {}
123 () = token.cancelled() => return Err(Error::Cancelled),
124 }
125 }
126 }
127 }
128 }
129}