Skip to main content

entelix_core/transports/
retry.rs

1//! `RetryLayer` + `RetryService` — `tower::Layer<S>` middleware that
2//! retries transient errors with exponential backoff and jitter.
3//!
4//! Wraps any `Service` whose `Error` is [`crate::error::Error`].
5//! Composes uniformly on the `Service<ModelInvocation>` and
6//! `Service<ToolInvocation>` paths; the same layer handles both
7//! because the retry decision is error-shape-driven, not invocation-
8//! shape-driven.
9//!
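//! ## Cancellation and deadlines
//!
//! The head of every attempt checks
//! [`ExecutionContext::cancellation`](crate::context::ExecutionContext)
//! and returns `Error::Cancelled` immediately if it has been signalled;
//! the backoff sleep between attempts races the same token. Operator
//! intent always wins over the retry policy. The caller's deadline is
//! checked at the same points and surfaces as `Error::DeadlineExceeded`;
//! the backoff sleep is capped so it never runs past the deadline.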
16//!
17//! ## Layering with other middleware
18//!
19//! Place `RetryLayer` *outside* observability middleware
20//! (`OtelLayer`) so each retry attempt produces its own span — the
21//! span tree shows `n` attempts, not `1` opaque envelope. Place it
22//! *inside* policy gates (`PolicyLayer`) so the policy decision
23//! fires once per logical call, not per attempt.
24//!
25//! ```text
26//!   PolicyLayer  ↘
27//!     RetryLayer  ↘
28//!       OtelLayer  ↘
29//!         <inner Service>
30//! ```
31
32use std::sync::Arc;
33use std::task::{Context, Poll};
34use std::time::Duration;
35
36use futures::future::BoxFuture;
37use rand::SeedableRng;
38use rand::rngs::SmallRng;
39use tower::{Layer, Service, ServiceExt};
40
41use crate::backoff::{DEFAULT_MAX_ATTEMPTS, ExponentialBackoff};
42use crate::error::{Error, Result};
43
44/// Parse a `Retry-After` HTTP header value into a [`Duration`].
45///
46/// Per RFC 7231, the value is either an integer number of seconds
47/// or an HTTP-date. Vendor APIs in this corner of the ecosystem
48/// (Anthropic, OpenAI, Bedrock) all use the integer form. Returns
49/// `None` for missing / malformed / zero values; `Some(Duration)`
50/// is the vendor's authoritative cooldown that retry classifiers
51/// honour ahead of self-jitter (invariant #17).
52#[must_use]
53pub fn parse_retry_after(header: Option<&http::HeaderValue>) -> Option<Duration> {
54    let header = header?.to_str().ok()?;
55    let secs: u64 = header.trim().parse().ok()?;
56    if secs == 0 {
57        return None;
58    }
59    Some(Duration::from_secs(secs))
60}
61
62/// Trait that classifies whether an error justifies another attempt
63/// and — when the vendor supplies one — the cooldown to wait before
64/// the next try.
65///
66/// The same trait drives `RetryLayer` *and*
67/// `RunnableExt::with_fallbacks`: in both cases the question is
68/// "is this error transient or permanent?". Reusing one trait keeps
69/// users out of policy-selection paralysis and avoids parallel
70/// taxonomies.
71pub trait RetryClassifier: Send + Sync + std::fmt::Debug {
72    /// Whether to attempt again, plus the optional vendor-supplied
73    /// cooldown. `attempt` starts at `0` for the first failed call
74    /// (i.e., before the first *retry*); the next failure passes
75    /// `attempt = 1`, and so on. Implementations can gate by
76    /// attempt count, error variant, kind, or any combination.
77    fn should_retry(&self, error: &Error, attempt: u32) -> RetryDecision;
78}
79
/// Outcome of [`RetryClassifier::should_retry`].
///
/// When the vendor sent a `Retry-After` hint, `after` carries it and
/// `RetryService` waits that long instead of following its own
/// exponential-backoff plan. The wait is clamped to the configured
/// backoff cap, so a misbehaving vendor cannot stall the loop forever.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryDecision {
    /// `true` when another attempt should be made.
    pub retry: bool,
    /// Vendor-supplied cooldown, if any. `None` leaves the delay to the
    /// configured backoff.
    pub after: Option<Duration>,
}

impl RetryDecision {
    /// Give up: no further attempt is made.
    pub const STOP: Self = Self {
        retry: false,
        after: None,
    };

    /// Try again after the configured backoff delay.
    pub const RETRY: Self = Self {
        retry: true,
        after: None,
    };

    /// Try again once the vendor-supplied cooldown `after` has elapsed.
    #[must_use]
    pub const fn retry_after(after: Duration) -> Self {
        Self {
            after: Some(after),
            ..Self::RETRY
        }
    }
}
116
117/// Standard classifier — retries on transient HTTP / transport
118/// classes:
119///
120/// - `Provider { status: 0, .. }` — transport / DNS / connect failure.
121/// - `Provider { status: 408 | 425 | 429 | 500..=599, .. }` —
122///   Request Timeout / Too Early / Too Many Requests / 5xx server.
123///
124/// Permanent failures (`InvalidRequest`, `Config`, `Cancelled`,
125/// `DeadlineExceeded`, `Interrupted`, `Serde`, `UsageLimitExceeded`,
126/// and 4xx other than 408 / 425 / 429) are not retried.
127/// `UsageLimitExceeded` is matched explicitly — the wildcard arm
128/// would catch it, but a future variant inserted between
129/// `Provider` and the wildcard could accidentally be classified
130/// retryable; the explicit arm pins budget breach as terminal
131/// regardless of what new variants land.
132#[derive(Clone, Copy, Debug, Default)]
133pub struct DefaultRetryClassifier;
134
135impl RetryClassifier for DefaultRetryClassifier {
136    // The explicit `UsageLimitExceeded` arm is intentional defence
137    //: the wildcard below would absorb the variant
138    // identically today, but a future variant inserted between
139    // `Provider` and the wildcard could accidentally land budget
140    // breach on the retry path. Clippy's `match_same_arms` lint
141    // would have us collapse the arm — that defeats the explicit
142    // future-proofing the ADR pinned.
143    #[allow(clippy::match_same_arms)]
144    fn should_retry(&self, error: &Error, _attempt: u32) -> RetryDecision {
145        match error {
146            Error::Provider {
147                kind, retry_after, ..
148            } if is_transient_kind(*kind) => match retry_after {
149                Some(after) => RetryDecision::retry_after(*after),
150                None => RetryDecision::RETRY,
151            },
152            // Budget breach is terminal — re-issuing the same call
153            // after `RunBudget` rejected it produces the same
154            // `UsageLimitExceeded` and burns one more counter slot
155            // on the way (the pre-call CAS still increments before
156            // the cap check). Explicit match defends against future
157            // variants accidentally landing on the retry path.
158            Error::UsageLimitExceeded(_) => RetryDecision::STOP,
159            // Everything else is deterministic, caller-intent, or HITL —
160            // retrying produces the same outcome (or violates intent).
161            _ => RetryDecision::STOP,
162        }
163    }
164}
165
166const fn is_transient_kind(kind: crate::error::ProviderErrorKind) -> bool {
167    use crate::error::ProviderErrorKind;
168    match kind {
169        // Network / TLS / DNS failures are transient by default —
170        // operator routes, peer connections, name resolution all
171        // recover.
172        ProviderErrorKind::Network | ProviderErrorKind::Tls | ProviderErrorKind::Dns => true,
173        // HTTP-class transience is the documented retry set:
174        // 408 Request Timeout, 425 Too Early, 429 Too Many Requests,
175        // 5xx server errors. 4xx other than these are caller-side
176        // bugs and re-issuing produces the same response.
177        ProviderErrorKind::Http(status) => matches!(status, 408 | 425 | 429 | 500..=599),
178    }
179}
180
181/// Retry policy: how many attempts, how to space them, what to retry.
182#[derive(Clone, Debug)]
183pub struct RetryPolicy {
184    /// Maximum total attempts (including the first call). `1`
185    /// disables retry — the first failure surfaces unchanged.
186    max_attempts: u32,
187    /// Backoff sequence between attempts.
188    backoff: ExponentialBackoff,
189    /// Error → "retry?" decision.
190    classifier: Arc<dyn RetryClassifier>,
191}
192
193impl RetryPolicy {
194    /// Build with explicit components.
195    #[must_use]
196    pub fn new(
197        max_attempts: u32,
198        backoff: ExponentialBackoff,
199        classifier: Arc<dyn RetryClassifier>,
200    ) -> Self {
201        Self {
202            max_attempts,
203            backoff,
204            classifier,
205        }
206    }
207
208    /// Default policy: [`DEFAULT_MAX_ATTEMPTS`] attempts,
209    /// `100 ms` → `5 s` backoff with 30% jitter,
210    /// [`DefaultRetryClassifier`].
211    #[must_use]
212    pub fn standard() -> Self {
213        Self::new(
214            DEFAULT_MAX_ATTEMPTS,
215            ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(5)),
216            Arc::new(DefaultRetryClassifier),
217        )
218    }
219
220    /// Override the attempt cap.
221    #[must_use]
222    pub const fn with_max_attempts(mut self, n: u32) -> Self {
223        self.max_attempts = n;
224        self
225    }
226
227    /// Override the backoff sequence.
228    #[must_use]
229    pub const fn with_backoff(mut self, backoff: ExponentialBackoff) -> Self {
230        self.backoff = backoff;
231        self
232    }
233
234    /// Override the classifier.
235    #[must_use]
236    pub fn with_classifier(mut self, classifier: Arc<dyn RetryClassifier>) -> Self {
237        self.classifier = classifier;
238        self
239    }
240
241    /// Borrow the configured attempt cap.
242    #[must_use]
243    pub const fn max_attempts(&self) -> u32 {
244        self.max_attempts
245    }
246
247    /// Borrow the configured backoff.
248    #[must_use]
249    pub const fn backoff(&self) -> ExponentialBackoff {
250        self.backoff
251    }
252
253    /// Borrow the configured classifier.
254    #[must_use]
255    pub fn classifier(&self) -> &Arc<dyn RetryClassifier> {
256        &self.classifier
257    }
258}
259
260/// Layer that adds retry semantics to any `Service<Req>` whose
261/// `Error` is [`Error`].
262///
263/// The wrapped service must be `Clone` (each retry attempt clones a
264/// fresh handle to call). `tower`'s convention is that `Clone`
265/// returns a cheap reference-counted view, not a deep copy — every
266/// `*Layer` in entelix follows that contract.
267#[derive(Clone, Debug)]
268pub struct RetryLayer {
269    policy: RetryPolicy,
270}
271
272impl RetryLayer {
273    /// Patch-version-stable identifier surfaced through
274    /// [`crate::ChatModel::layer_names`] /
275    /// `ToolRegistry::layer_names`. Renaming this constant is a
276    /// breaking change for dashboards keyed off the value.
277    pub const NAME: &'static str = "retry";
278
279    /// Build with a retry policy.
280    #[must_use]
281    pub const fn new(policy: RetryPolicy) -> Self {
282        Self { policy }
283    }
284}
285
286impl<S> Layer<S> for RetryLayer {
287    type Service = RetryService<S>;
288    fn layer(&self, inner: S) -> Self::Service {
289        RetryService {
290            inner,
291            policy: self.policy.clone(),
292        }
293    }
294}
295
296impl crate::NamedLayer for RetryLayer {
297    fn layer_name(&self) -> &'static str {
298        Self::NAME
299    }
300}
301
302/// `Service` produced by [`RetryLayer`]. Generic over the request
303/// type so a single layer drives both `ModelInvocation` and
304/// `ToolInvocation` paths.
305#[derive(Clone, Debug)]
306pub struct RetryService<S> {
307    inner: S,
308    policy: RetryPolicy,
309}
310
311impl<S, Req, Resp> Service<Req> for RetryService<S>
312where
313    S: Service<Req, Response = Resp, Error = Error> + Clone + Send + 'static,
314    S::Future: Send + 'static,
315    Req: Retryable + Send + 'static,
316    Resp: Send + 'static,
317{
318    type Response = Resp;
319    type Error = Error;
320    type Future = BoxFuture<'static, Result<Resp>>;
321
322    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
323        self.inner.poll_ready(cx)
324    }
325
326    fn call(&mut self, request: Req) -> Self::Future {
327        let inner = self.inner.clone();
328        let policy = self.policy.clone();
329        Box::pin(async move { run_with_retry(inner, request, policy).await })
330    }
331}
332
333/// Marker that a request type can be cloned for retries.
334///
335/// `RetryService` clones the request once per attempt (the inner
336/// service is consumed by each call). For `ModelInvocation` and
337/// `ToolInvocation` the clone is cheap (`Arc`-backed `ModelRequest`
338/// and JSON-`Value` body).
339pub trait Retryable: Clone {
340    /// Borrow the [`ExecutionContext`](crate::context::ExecutionContext)
341    /// the retry loop checks for cancellation between attempts.
342    fn ctx(&self) -> &crate::context::ExecutionContext;
343
344    /// Mutable handle so [`RetryService`] can stamp an idempotency
345    /// key on first entry — every clone of the request that follows
346    /// shares the stamped key, so vendor-side dedupe sees one
347    /// logical call across N attempts.
348    fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext;
349}
350
351impl Retryable for crate::service::ModelInvocation {
352    fn ctx(&self) -> &crate::context::ExecutionContext {
353        &self.ctx
354    }
355    fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
356        &mut self.ctx
357    }
358}
359
360impl Retryable for crate::service::ToolInvocation {
361    fn ctx(&self) -> &crate::context::ExecutionContext {
362        &self.ctx
363    }
364    fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
365        &mut self.ctx
366    }
367}
368
369impl Retryable for crate::service::StreamingModelInvocation {
370    fn ctx(&self) -> &crate::context::ExecutionContext {
371        &self.inner.ctx
372    }
373    fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
374        &mut self.inner.ctx
375    }
376}
377
378async fn run_with_retry<S, Req, Resp>(
379    mut inner: S,
380    mut request: Req,
381    policy: RetryPolicy,
382) -> Result<Resp>
383where
384    S: Service<Req, Response = Resp, Error = Error> + Clone + Send,
385    S::Future: Send,
386    Req: Retryable + Send,
387{
388    // Per-call RNG seeded from a system-time component so two
389    // concurrent calls never share a jitter sequence. The RNG lives
390    // for the lifetime of the call only.
391    let seed = seed_from_time();
392    let mut rng = SmallRng::seed_from_u64(seed);
393
394    // Stamp an idempotency key on the request the first time we see
395    // it. Every subsequent clone (one per retry attempt) inherits
396    // the key, so vendor-side dedupe treats N attempts as one
397    // logical call (invariant #17 — no double-charge when a client
398    // timeout races a server-side success).
399    request
400        .ctx_mut()
401        .ensure_idempotency_key(|| uuid::Uuid::new_v4().to_string());
402
403    let max_attempts = policy.max_attempts.max(1);
404    let mut attempt: u32 = 0;
405    loop {
406        // Deadline + cancellation check at the head of every attempt.
407        // A caller with `with_timeout(200ms)` and exponential backoff
408        // would otherwise sleep through the deadline and surface
409        // `Provider` instead of the more specific `DeadlineExceeded`,
410        // making it impossible to distinguish "upstream is sick"
411        // from "we ran out of time".
412        let ctx_token = request.ctx().cancellation();
413        if ctx_token.is_cancelled() {
414            return Err(Error::Cancelled);
415        }
416        if let Some(deadline) = request.ctx().deadline()
417            && tokio::time::Instant::now() >= deadline
418        {
419            return Err(Error::DeadlineExceeded);
420        }
421
422        let cloned = request.clone();
423        let result = inner.ready().await?.call(cloned).await;
424
425        match result {
426            Ok(resp) => return Ok(resp),
427            Err(err) => {
428                attempt = attempt.saturating_add(1);
429                let exhausted = attempt >= max_attempts;
430                let decision = policy.classifier.should_retry(&err, attempt - 1);
431                if exhausted || !decision.retry {
432                    return Err(err);
433                }
434                // Vendor `Retry-After` beats self-jitter (invariant
435                // #17 — vendor authoritative signal wins). Cap at the
436                // configured backoff cap so a malicious or stuck
437                // vendor cannot pin the loop.
438                let backoff_delay = policy.backoff.delay_for_attempt(attempt - 1, &mut rng);
439                let delay = match decision.after {
440                    Some(hint) => hint.min(policy.backoff.max()),
441                    None => backoff_delay,
442                };
443                // If the caller's deadline lands inside this backoff
444                // window, cap the sleep to the remaining budget and
445                // surface `DeadlineExceeded` rather than waking up
446                // past the deadline only to retry-then-deadline.
447                let effective_delay = if let Some(deadline) = request.ctx().deadline() {
448                    let now = tokio::time::Instant::now();
449                    let remaining = deadline.saturating_duration_since(now);
450                    if remaining.is_zero() {
451                        return Err(Error::DeadlineExceeded);
452                    }
453                    delay.min(remaining)
454                } else {
455                    delay
456                };
457                let deadline_for_select = request.ctx().deadline();
458                tokio::select! {
459                    () = tokio::time::sleep(effective_delay) => {
460                        // If we capped the sleep at the deadline,
461                        // bail out instead of looping for one more
462                        // doomed attempt.
463                        if let Some(deadline) = deadline_for_select
464                            && tokio::time::Instant::now() >= deadline
465                        {
466                            return Err(Error::DeadlineExceeded);
467                        }
468                    }
469                    () = ctx_token.cancelled() => return Err(Error::Cancelled),
470                }
471            }
472        }
473    }
474}
475
476/// Seed the per-call RNG from the system clock's nanosecond
477/// component XORed with a process-local counter so two calls
478/// arriving in the same nanosecond still get distinct sequences.
479fn seed_from_time() -> u64 {
480    use std::sync::atomic::{AtomicU64, Ordering};
481    use std::time::{SystemTime, UNIX_EPOCH};
482    static COUNTER: AtomicU64 = AtomicU64::new(0);
483    // u128 nanoseconds wraps once every ~584 years at u64; truncating
484    // is fine for jitter — we only need uncorrelated low-order bits.
485    // silent-fallback-ok: jitter seed only — `now() < UNIX_EPOCH`
486    // cannot happen on a sane clock, and the per-process atomic
487    // counter XORed below already breaks ties so a 0 nanos
488    // contribution still yields uncorrelated low-order bits.
489    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map_or(0, |d| {
490        let n = d.as_nanos();
491        #[allow(clippy::cast_possible_truncation)]
492        {
493            n as u64
494        }
495    });
496    let bump = COUNTER.fetch_add(1, Ordering::Relaxed);
497    nanos ^ bump
498}
499
500#[cfg(test)]
501#[allow(clippy::unwrap_used, clippy::expect_used)]
502mod tests {
503    use super::*;
504
505    #[test]
506    fn default_classifier_retries_transient_http_status_codes() {
507        let c = DefaultRetryClassifier;
508        for status in [408_u16, 425, 429, 500, 502, 503, 504, 599] {
509            let err = Error::provider_http(status, "x");
510            assert!(c.should_retry(&err, 0).retry, "status {status} must retry");
511        }
512    }
513
514    #[test]
515    fn default_classifier_retries_transport_class_failures() {
516        let c = DefaultRetryClassifier;
517        assert!(
518            c.should_retry(&Error::provider_network("connect refused"), 0)
519                .retry
520        );
521        assert!(
522            c.should_retry(&Error::provider_tls("handshake failed"), 0)
523                .retry
524        );
525        assert!(
526            c.should_retry(&Error::provider_dns("no such host"), 0)
527                .retry
528        );
529    }
530
531    #[test]
532    fn default_classifier_does_not_retry_permanent_status_codes() {
533        let c = DefaultRetryClassifier;
534        for status in [400_u16, 401, 403, 404, 410, 422] {
535            let err = Error::provider_http(status, "x");
536            assert!(
537                !c.should_retry(&err, 0).retry,
538                "status {status} must NOT retry"
539            );
540        }
541    }
542
543    #[test]
544    fn default_classifier_does_not_retry_caller_intent_or_programming_errors() {
545        let c = DefaultRetryClassifier;
546        assert!(!c.should_retry(&Error::Cancelled, 0).retry);
547        assert!(!c.should_retry(&Error::DeadlineExceeded, 0).retry);
548        assert!(!c.should_retry(&Error::invalid_request("nope"), 0).retry);
549        assert!(!c.should_retry(&Error::config("bad"), 0).retry);
550    }
551
552    #[test]
553    fn default_classifier_does_not_retry_usage_limit_exceeded() {
554        // Budget breach is terminal — re-issuing burns one more
555        // counter slot on each attempt's pre-call CAS without
556        // changing the outcome. Explicit arm pins the
557        // classification so a future variant inserted between
558        // `Provider` and the wildcard cannot accidentally land
559        // budget breach on the retry path.
560        use crate::run_budget::UsageLimitBreach;
561        let c = DefaultRetryClassifier;
562        let err = Error::UsageLimitExceeded(UsageLimitBreach::Requests {
563            limit: 5,
564            observed: 5,
565        });
566        let decision = c.should_retry(&err, 0);
567        assert!(!decision.retry);
568        assert_eq!(decision.after, None);
569    }
570
571    #[test]
572    fn default_classifier_propagates_vendor_retry_after_hint() {
573        let c = DefaultRetryClassifier;
574        let err =
575            Error::provider_http(429, "rate limited").with_retry_after(Duration::from_secs(7));
576        let decision = c.should_retry(&err, 0);
577        assert!(decision.retry);
578        assert_eq!(decision.after, Some(Duration::from_secs(7)));
579    }
580
581    #[test]
582    fn ensure_idempotency_key_stamps_once_and_subsequent_calls_observe_the_same_value() {
583        // RetryService relies on this contract — the first call
584        // sets a fresh UUID; later calls in the same logical
585        // call (one per attempt) see the same value rather than
586        // generating new ones.
587        use crate::context::ExecutionContext;
588        let mut ctx = ExecutionContext::new();
589        assert!(ctx.idempotency_key().is_none());
590        let mut counter = 0u32;
591        let first = ctx
592            .ensure_idempotency_key(|| {
593                counter += 1;
594                "first-uuid".to_owned()
595            })
596            .to_owned();
597        let second = ctx
598            .ensure_idempotency_key(|| {
599                counter += 1;
600                "second-uuid".to_owned()
601            })
602            .to_owned();
603        assert_eq!(first, "first-uuid");
604        assert_eq!(second, "first-uuid", "stamp must be stable across calls");
605        assert_eq!(counter, 1, "generator must run exactly once");
606        // Cloning the ctx propagates the same key — every retry of
607        // a logical call shares the stamp.
608        let cloned = ctx.clone();
609        assert_eq!(cloned.idempotency_key(), Some("first-uuid"));
610    }
611
612    #[test]
613    fn default_classifier_does_not_attach_retry_after_when_vendor_does_not_supply_one() {
614        let c = DefaultRetryClassifier;
615        let err = Error::provider_http(503, "down");
616        let decision = c.should_retry(&err, 0);
617        assert!(decision.retry);
618        assert!(decision.after.is_none());
619    }
620
621    #[test]
622    fn retry_policy_standard_uses_default_max_attempts() {
623        let p = RetryPolicy::standard();
624        assert_eq!(p.max_attempts(), DEFAULT_MAX_ATTEMPTS);
625    }
626
627    #[test]
628    fn retry_policy_overrides_compose() {
629        let p = RetryPolicy::standard()
630            .with_max_attempts(2)
631            .with_backoff(ExponentialBackoff::new(
632                Duration::from_millis(1),
633                Duration::from_millis(10),
634            ));
635        assert_eq!(p.max_attempts(), 2);
636        assert_eq!(p.backoff().base(), Duration::from_millis(1));
637    }
638}