Skip to main content

entelix_runnable/
retrying.rs

//! `Retrying<R, I, O>` — `Runnable<I, O>` adapter that retries the inner
//! runnable using a [`RetryPolicy`].
3//!
4//! Mirrors the semantics of
5//! [`RetryLayer`](entelix_core::transports::RetryLayer) at the
6//! `Runnable` layer rather than the `tower::Service` layer — pick
7//! whichever boundary matches the unit you want to retry.
8//! `with_retry` on a `Runnable` is the right choice for a whole
9//! chain (`prompt.pipe(model).pipe(parser)`); a `RetryLayer` is the
10//! right choice for a single model-or-tool service call.
11
12use std::marker::PhantomData;
13use std::sync::Arc;
14
15use async_trait::async_trait;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use rand::SeedableRng;
20use rand::rngs::SmallRng;
21
22use entelix_core::ExecutionContext;
23use entelix_core::error::{Error, Result};
24use entelix_core::transports::RetryPolicy;
25
26use crate::runnable::Runnable;
27
28/// `Runnable<I, O>` adapter applying a [`RetryPolicy`] to the inner
29/// runnable on every invocation.
30pub struct Retrying<R, I, O>
31where
32    R: Runnable<I, O> + 'static,
33    I: Clone + Send + 'static,
34    O: Send + 'static,
35{
36    inner: Arc<R>,
37    policy: RetryPolicy,
38    _io: PhantomData<fn(I) -> O>,
39}
40
41impl<R, I, O> Retrying<R, I, O>
42where
43    R: Runnable<I, O> + 'static,
44    I: Clone + Send + 'static,
45    O: Send + 'static,
46{
47    /// Build with the inner runnable and a retry policy.
48    pub fn new(inner: R, policy: RetryPolicy) -> Self {
49        Self {
50            inner: Arc::new(inner),
51            policy,
52            _io: PhantomData,
53        }
54    }
55}
56
57impl<R, I, O> std::fmt::Debug for Retrying<R, I, O>
58where
59    R: Runnable<I, O> + 'static,
60    I: Clone + Send + 'static,
61    O: Send + 'static,
62{
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.debug_struct("Retrying")
65            .field("max_attempts", &self.policy.max_attempts())
66            .finish_non_exhaustive()
67    }
68}
69
/// Per-call RNG seed for backoff jitter.
///
/// Mixes the wall-clock time (nanoseconds since the Unix epoch, reduced
/// modulo 2^64) with a process-local call counter. The counter is
/// multiplied by an odd 64-bit constant before mixing, so consecutive
/// counter values differ in their high bits. A plain XOR of two small,
/// closely spaced values (clock nanos and counter) can cancel out. For
/// example, nanos 2 with counter 0 and nanos 3 with counter 1 both yield
/// seed 2, which would give two concurrent calls the same jitter sequence.
///
/// A clock reading before the Unix epoch contributes `0`, leaving the
/// scrambled counter as the only source of variation.
fn seed_from_time() -> u64 {
    // Odd multiplier (2^64 / golden ratio). Multiplying by an odd constant
    // is a bijection on `u64`, so distinct counter values stay distinct.
    const SPREAD: u64 = 0x9E37_79B9_7F4A_7C15;
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    // Low 64 bits of the epoch-relative nanosecond count, computed with
    // wrapping arithmetic rather than a truncating `u128 as u64` cast.
    // Wrap-around is harmless: only uncorrelated low-order bits matter.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |d| {
            d.as_secs()
                .wrapping_mul(1_000_000_000)
                .wrapping_add(u64::from(d.subsec_nanos()))
        });
    // `Relaxed` suffices: `fetch_add` is an atomic read-modify-write, so
    // every caller receives a unique counter value regardless of ordering.
    let bump = COUNTER.fetch_add(1, Ordering::Relaxed);
    nanos ^ bump.wrapping_mul(SPREAD)
}
87
88#[async_trait]
89impl<R, I, O> Runnable<I, O> for Retrying<R, I, O>
90where
91    R: Runnable<I, O> + 'static,
92    I: Clone + Send + 'static,
93    O: Send + 'static,
94{
95    async fn invoke(&self, input: I, ctx: &ExecutionContext) -> Result<O> {
96        let max_attempts = self.policy.max_attempts().max(1);
97        let mut rng = SmallRng::seed_from_u64(seed_from_time());
98        let mut attempt: u32 = 0;
99        loop {
100            if ctx.is_cancelled() {
101                return Err(Error::Cancelled);
102            }
103            let cloned = input.clone();
104            match self.inner.invoke(cloned, ctx).await {
105                Ok(value) => return Ok(value),
106                Err(err) => {
107                    attempt = attempt.saturating_add(1);
108                    let exhausted = attempt >= max_attempts;
109                    let decision = self.policy.classifier().should_retry(&err, attempt - 1);
110                    if exhausted || !decision.retry {
111                        return Err(err);
112                    }
113                    let backoff_delay = self
114                        .policy
115                        .backoff()
116                        .delay_for_attempt(attempt - 1, &mut rng);
117                    let delay = decision
118                        .after
119                        .map_or(backoff_delay, |hint| hint.min(self.policy.backoff().max()));
120                    let token = ctx.cancellation();
121                    tokio::select! {
122                        () = tokio::time::sleep(delay) => {}
123                        () = token.cancelled() => return Err(Error::Cancelled),
124                    }
125                }
126            }
127        }
128    }
129}