Skip to main content

entelix_core/transports/
retry.rs

1//! `RetryLayer` + `RetryService` — `tower::Layer<S>` middleware that
2//! retries transient errors with exponential backoff and jitter.
3//!
4//! Wraps any `Service` whose `Error` is [`crate::error::Error`].
5//! Composes uniformly on the `Service<ModelInvocation>` and
6//! `Service<ToolInvocation>` paths; the same layer handles both
7//! because the retry decision is error-shape-driven, not invocation-
8//! shape-driven.
9//!
10//! ## Cancellation
11//!
12//! Every iteration head checks
13//! [`ExecutionContext::cancellation`](crate::context::ExecutionContext)
14//! and returns `Error::Cancelled` immediately if signalled — operator
15//! intent always wins over the retry policy.
16//!
17//! ## Layering with other middleware
18//!
19//! Place `RetryLayer` *outside* observability middleware
20//! (`OtelLayer`) so each retry attempt produces its own span — the
21//! span tree shows `n` attempts, not `1` opaque envelope. Place it
22//! *inside* policy gates (`PolicyLayer`) so the policy decision
23//! fires once per logical call, not per attempt.
24//!
25//! ```text
26//!   PolicyLayer  ↘
27//!     RetryLayer  ↘
28//!       OtelLayer  ↘
29//!         <inner Service>
30//! ```
31
32use std::sync::Arc;
33use std::task::{Context, Poll};
34use std::time::Duration;
35
36use futures::future::BoxFuture;
37use rand::SeedableRng;
38use rand::rngs::SmallRng;
39use tower::{Layer, Service, ServiceExt};
40
41use crate::backoff::{DEFAULT_MAX_ATTEMPTS, ExponentialBackoff};
42use crate::error::{Error, Result};
43
44/// Parse a `Retry-After` HTTP header value into a [`Duration`].
45///
46/// Per RFC 7231, the value is either an integer number of seconds
47/// or an HTTP-date. Vendor APIs in this corner of the ecosystem
48/// (Anthropic, OpenAI, Bedrock) all use the integer form. Returns
49/// `None` for missing / malformed / zero values; `Some(Duration)`
50/// is the vendor's authoritative cooldown that retry classifiers
51/// honour ahead of self-jitter (invariant #17).
52#[must_use]
53pub fn parse_retry_after(header: Option<&http::HeaderValue>) -> Option<Duration> {
54    let header = header?.to_str().ok()?;
55    let secs: u64 = header.trim().parse().ok()?;
56    if secs == 0 {
57        return None;
58    }
59    Some(Duration::from_secs(secs))
60}
61
62/// Trait that classifies whether an error justifies another attempt
63/// and — when the vendor supplies one — the cooldown to wait before
64/// the next try.
65///
66/// The same trait drives `RetryLayer` *and*
67/// `RunnableExt::with_fallbacks`: in both cases the question is
68/// "is this error transient or permanent?". Reusing one trait keeps
69/// users out of policy-selection paralysis and avoids parallel
70/// taxonomies.
71pub trait RetryClassifier: Send + Sync + std::fmt::Debug {
72    /// Whether to attempt again, plus the optional vendor-supplied
73    /// cooldown. `attempt` starts at `0` for the first failed call
74    /// (i.e., before the first *retry*); the next failure passes
75    /// `attempt = 1`, and so on. Implementations can gate by
76    /// attempt count, error variant, kind, or any combination.
77    fn should_retry(&self, error: &Error, attempt: u32) -> RetryDecision;
78}
79
/// Outcome of [`RetryClassifier::should_retry`].
///
/// When the vendor sent a `Retry-After` hint, `after` carries it.
/// `RetryService` prefers that hint over its own exponential-backoff
/// schedule, but clamps it to the configured backoff maximum. A misbehaving
/// vendor therefore cannot stall the retry loop indefinitely.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RetryDecision {
    /// `true` to make another attempt, `false` to surface the error.
    pub retry: bool,
    /// Vendor-supplied cooldown. `None` defers to the configured backoff
    /// schedule.
    pub after: Option<Duration>,
}

impl RetryDecision {
    /// Give up: surface the error without another attempt.
    pub const STOP: Self = Self::from_parts(false, None);

    /// Try again after the configured backoff delay.
    pub const RETRY: Self = Self::from_parts(true, None);

    /// Try again once the vendor-supplied `after` cooldown has elapsed.
    #[must_use]
    pub const fn retry_after(after: Duration) -> Self {
        Self::from_parts(true, Some(after))
    }

    /// Shared const constructor behind the public conveniences.
    const fn from_parts(retry: bool, after: Option<Duration>) -> Self {
        Self { retry, after }
    }
}
116
117/// Standard classifier — retries on transient HTTP / transport
118/// classes:
119///
120/// - `Provider { status: 0, .. }` — transport / DNS / connect failure.
121/// - `Provider { status: 408 | 425 | 429 | 500..=599, .. }` —
122///   Request Timeout / Too Early / Too Many Requests / 5xx server.
123///
124/// Permanent failures (`InvalidRequest`, `Config`, `Cancelled`,
125/// `DeadlineExceeded`, `Interrupted`, `Serde`, `UsageLimitExceeded`,
126/// and 4xx other than 408 / 425 / 429) are not retried.
127/// `UsageLimitExceeded` is matched explicitly — the wildcard arm
128/// would catch it, but a future variant inserted between
129/// `Provider` and the wildcard could accidentally be classified
130/// retryable; the explicit arm pins budget breach as terminal
131/// regardless of what new variants land.
132#[derive(Clone, Copy, Debug, Default)]
133pub struct DefaultRetryClassifier;
134
135impl RetryClassifier for DefaultRetryClassifier {
136    // The explicit `UsageLimitExceeded` arm is intentional defence
137    //: the wildcard below would absorb the variant
138    // identically today, but a future variant inserted between
139    // `Provider` and the wildcard could accidentally land budget
140    // breach on the retry path. Clippy's `match_same_arms` lint
141    // would have us collapse the arm — that defeats the explicit
142    // future-proofing the ADR pinned.
143    #[allow(clippy::match_same_arms)]
144    fn should_retry(&self, error: &Error, _attempt: u32) -> RetryDecision {
145        // `ToolErrorPolicyLayer` may wrap a still-retryable failure
146        // into `ToolErrorTerminal` (when the operator adds
147        // `Transient` / `RateLimit` to their `terminate_on` set).
148        // Walk the wrap chain to the leaf so the retry decision is
149        // identical regardless of whether the policy layer sits
150        // inside or outside the retry layer — without this unwrap,
151        // a policy layer placed inside the retry layer would
152        // silently disable retries.
153        let mut current = error;
154        while let Error::ToolErrorTerminal { source, .. } = current {
155            current = source;
156        }
157        match current {
158            Error::Provider {
159                kind, retry_after, ..
160            } if is_transient_kind(*kind) => match retry_after {
161                Some(after) => RetryDecision::retry_after(*after),
162                None => RetryDecision::RETRY,
163            },
164            // Budget breach is terminal — re-issuing the same call
165            // after `RunBudget` rejected it produces the same
166            // `UsageLimitExceeded` and burns one more counter slot
167            // on the way (the pre-call CAS still increments before
168            // the cap check). Explicit match defends against future
169            // variants accidentally landing on the retry path.
170            Error::UsageLimitExceeded(_) => RetryDecision::STOP,
171            // Everything else is deterministic, caller-intent, or HITL —
172            // retrying produces the same outcome (or violates intent).
173            _ => RetryDecision::STOP,
174        }
175    }
176}
177
178const fn is_transient_kind(kind: crate::error::ProviderErrorKind) -> bool {
179    use crate::error::ProviderErrorKind;
180    match kind {
181        // Network / TLS / DNS failures are transient by default —
182        // operator routes, peer connections, name resolution all
183        // recover.
184        ProviderErrorKind::Network | ProviderErrorKind::Tls | ProviderErrorKind::Dns => true,
185        // HTTP-class transience is the documented retry set:
186        // 408 Request Timeout, 425 Too Early, 429 Too Many Requests,
187        // 5xx server errors. 4xx other than these are caller-side
188        // bugs and re-issuing produces the same response.
189        ProviderErrorKind::Http(status) => matches!(status, 408 | 425 | 429 | 500..=599),
190    }
191}
192
193/// Retry policy: how many attempts, how to space them, what to retry.
194#[derive(Clone, Debug)]
195pub struct RetryPolicy {
196    /// Maximum total attempts (including the first call). `1`
197    /// disables retry — the first failure surfaces unchanged.
198    max_attempts: u32,
199    /// Backoff sequence between attempts.
200    backoff: ExponentialBackoff,
201    /// Error → "retry?" decision.
202    classifier: Arc<dyn RetryClassifier>,
203}
204
205impl RetryPolicy {
206    /// Build with explicit components.
207    #[must_use]
208    pub fn new(
209        max_attempts: u32,
210        backoff: ExponentialBackoff,
211        classifier: Arc<dyn RetryClassifier>,
212    ) -> Self {
213        Self {
214            max_attempts,
215            backoff,
216            classifier,
217        }
218    }
219
220    /// Default policy: [`DEFAULT_MAX_ATTEMPTS`] attempts,
221    /// `100 ms` → `5 s` backoff with 30% jitter,
222    /// [`DefaultRetryClassifier`].
223    #[must_use]
224    pub fn standard() -> Self {
225        Self::new(
226            DEFAULT_MAX_ATTEMPTS,
227            ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(5)),
228            Arc::new(DefaultRetryClassifier),
229        )
230    }
231
232    /// Override the attempt cap.
233    #[must_use]
234    pub const fn with_max_attempts(mut self, n: u32) -> Self {
235        self.max_attempts = n;
236        self
237    }
238
239    /// Override the backoff sequence.
240    #[must_use]
241    pub const fn with_backoff(mut self, backoff: ExponentialBackoff) -> Self {
242        self.backoff = backoff;
243        self
244    }
245
246    /// Override the classifier.
247    #[must_use]
248    pub fn with_classifier(mut self, classifier: Arc<dyn RetryClassifier>) -> Self {
249        self.classifier = classifier;
250        self
251    }
252
253    /// Borrow the configured attempt cap.
254    #[must_use]
255    pub const fn max_attempts(&self) -> u32 {
256        self.max_attempts
257    }
258
259    /// Borrow the configured backoff.
260    #[must_use]
261    pub const fn backoff(&self) -> ExponentialBackoff {
262        self.backoff
263    }
264
265    /// Borrow the configured classifier.
266    #[must_use]
267    pub fn classifier(&self) -> &Arc<dyn RetryClassifier> {
268        &self.classifier
269    }
270}
271
272/// Layer that adds retry semantics to any `Service<Req>` whose
273/// `Error` is [`Error`].
274///
275/// The wrapped service must be `Clone` (each retry attempt clones a
276/// fresh handle to call). `tower`'s convention is that `Clone`
277/// returns a cheap reference-counted view, not a deep copy — every
278/// `*Layer` in entelix follows that contract.
279#[derive(Clone, Debug)]
280pub struct RetryLayer {
281    policy: RetryPolicy,
282}
283
284impl RetryLayer {
285    /// Patch-version-stable identifier surfaced through
286    /// [`crate::ChatModel::layer_names`] /
287    /// `ToolRegistry::layer_names`. Renaming this constant is a
288    /// breaking change for dashboards keyed off the value.
289    pub const NAME: &'static str = "retry";
290
291    /// Build with a retry policy.
292    #[must_use]
293    pub const fn new(policy: RetryPolicy) -> Self {
294        Self { policy }
295    }
296}
297
298impl<S> Layer<S> for RetryLayer {
299    type Service = RetryService<S>;
300    fn layer(&self, inner: S) -> Self::Service {
301        RetryService {
302            inner,
303            policy: self.policy.clone(),
304        }
305    }
306}
307
308impl crate::NamedLayer for RetryLayer {
309    fn layer_name(&self) -> &'static str {
310        Self::NAME
311    }
312}
313
314/// `Service` produced by [`RetryLayer`]. Generic over the request
315/// type so a single layer drives both `ModelInvocation` and
316/// `ToolInvocation` paths.
317#[derive(Clone, Debug)]
318pub struct RetryService<S> {
319    inner: S,
320    policy: RetryPolicy,
321}
322
323impl<S, Req, Resp> Service<Req> for RetryService<S>
324where
325    S: Service<Req, Response = Resp, Error = Error> + Clone + Send + 'static,
326    S::Future: Send + 'static,
327    Req: Retryable + Send + 'static,
328    Resp: Send + 'static,
329{
330    type Response = Resp;
331    type Error = Error;
332    type Future = BoxFuture<'static, Result<Resp>>;
333
334    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
335        self.inner.poll_ready(cx)
336    }
337
338    fn call(&mut self, request: Req) -> Self::Future {
339        let inner = self.inner.clone();
340        let policy = self.policy.clone();
341        Box::pin(async move { run_with_retry(inner, request, policy).await })
342    }
343}
344
345/// Marker that a request type can be cloned for retries.
346///
347/// `RetryService` clones the request once per attempt (the inner
348/// service is consumed by each call). For `ModelInvocation` and
349/// `ToolInvocation` the clone is cheap (`Arc`-backed `ModelRequest`
350/// and JSON-`Value` body).
351pub trait Retryable: Clone {
352    /// Borrow the [`ExecutionContext`](crate::context::ExecutionContext)
353    /// the retry loop checks for cancellation between attempts.
354    fn ctx(&self) -> &crate::context::ExecutionContext;
355
356    /// Mutable handle so [`RetryService`] can stamp an idempotency
357    /// key on first entry — every clone of the request that follows
358    /// shares the stamped key, so vendor-side dedupe sees one
359    /// logical call across N attempts.
360    fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext;
361}
362
363impl Retryable for crate::service::ModelInvocation {
364    fn ctx(&self) -> &crate::context::ExecutionContext {
365        &self.ctx
366    }
367    fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
368        &mut self.ctx
369    }
370}
371
372impl Retryable for crate::service::ToolInvocation {
373    fn ctx(&self) -> &crate::context::ExecutionContext {
374        &self.ctx
375    }
376    fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
377        &mut self.ctx
378    }
379}
380
381impl Retryable for crate::service::StreamingModelInvocation {
382    fn ctx(&self) -> &crate::context::ExecutionContext {
383        &self.inner.ctx
384    }
385    fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
386        &mut self.inner.ctx
387    }
388}
389
390async fn run_with_retry<S, Req, Resp>(
391    mut inner: S,
392    mut request: Req,
393    policy: RetryPolicy,
394) -> Result<Resp>
395where
396    S: Service<Req, Response = Resp, Error = Error> + Clone + Send,
397    S::Future: Send,
398    Req: Retryable + Send,
399{
400    // Per-call RNG seeded from a system-time component so two
401    // concurrent calls never share a jitter sequence. The RNG lives
402    // for the lifetime of the call only.
403    let seed = seed_from_time();
404    let mut rng = SmallRng::seed_from_u64(seed);
405
406    // Stamp an idempotency key on the request the first time we see
407    // it. Every subsequent clone (one per retry attempt) inherits
408    // the key, so vendor-side dedupe treats N attempts as one
409    // logical call (invariant #17 — no double-charge when a client
410    // timeout races a server-side success).
411    request
412        .ctx_mut()
413        .ensure_idempotency_key(|| uuid::Uuid::new_v4().to_string());
414
415    let max_attempts = policy.max_attempts.max(1);
416    let mut attempt: u32 = 0;
417    loop {
418        // Deadline + cancellation check at the head of every attempt.
419        // A caller with `with_timeout(200ms)` and exponential backoff
420        // would otherwise sleep through the deadline and surface
421        // `Provider` instead of the more specific `DeadlineExceeded`,
422        // making it impossible to distinguish "upstream is sick"
423        // from "we ran out of time".
424        let ctx_token = request.ctx().cancellation();
425        if ctx_token.is_cancelled() {
426            return Err(Error::Cancelled);
427        }
428        if let Some(deadline) = request.ctx().deadline()
429            && tokio::time::Instant::now() >= deadline
430        {
431            return Err(Error::DeadlineExceeded);
432        }
433
434        let cloned = request.clone();
435        let result = inner.ready().await?.call(cloned).await;
436
437        match result {
438            Ok(resp) => return Ok(resp),
439            Err(err) => {
440                attempt = attempt.saturating_add(1);
441                let exhausted = attempt >= max_attempts;
442                let decision = policy.classifier.should_retry(&err, attempt - 1);
443                if exhausted || !decision.retry {
444                    return Err(err);
445                }
446                // Vendor `Retry-After` beats self-jitter (invariant
447                // #17 — vendor authoritative signal wins). Cap at the
448                // configured backoff cap so a malicious or stuck
449                // vendor cannot pin the loop.
450                let backoff_delay = policy.backoff.delay_for_attempt(attempt - 1, &mut rng);
451                let delay = match decision.after {
452                    Some(hint) => hint.min(policy.backoff.max()),
453                    None => backoff_delay,
454                };
455                // If the caller's deadline lands inside this backoff
456                // window, cap the sleep to the remaining budget and
457                // surface `DeadlineExceeded` rather than waking up
458                // past the deadline only to retry-then-deadline.
459                let effective_delay = if let Some(deadline) = request.ctx().deadline() {
460                    let now = tokio::time::Instant::now();
461                    let remaining = deadline.saturating_duration_since(now);
462                    if remaining.is_zero() {
463                        return Err(Error::DeadlineExceeded);
464                    }
465                    delay.min(remaining)
466                } else {
467                    delay
468                };
469                let deadline_for_select = request.ctx().deadline();
470                tokio::select! {
471                    () = tokio::time::sleep(effective_delay) => {
472                        // If we capped the sleep at the deadline,
473                        // bail out instead of looping for one more
474                        // doomed attempt.
475                        if let Some(deadline) = deadline_for_select
476                            && tokio::time::Instant::now() >= deadline
477                        {
478                            return Err(Error::DeadlineExceeded);
479                        }
480                    }
481                    () = ctx_token.cancelled() => return Err(Error::Cancelled),
482                }
483            }
484        }
485    }
486}
487
/// Seed for a per-call jitter RNG.
///
/// Combines wall-clock nanoseconds since the Unix epoch with a
/// process-wide call counter. The counter is first multiplied by an odd
/// 64-bit constant. Multiplication by an odd number is a bijection on
/// `u64`, so distinct calls still get distinct counter contributions, and
/// consecutive counter values land far apart, differing in their high bits.
///
/// A plain `nanos ^ counter` perturbs only the lowest bits. Two calls whose
/// timestamps differ in exactly those bits then collide and share a jitter
/// sequence: counters 0 and 1 with timestamps `t` and `t ^ 1` both yield
/// `t`. Calls within the same nanosecond always get distinct seeds.
fn seed_from_time() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // floor(2^64 / golden ratio), the SplitMix64 increment. It is odd, so
    // multiplying by it permutes `u64`.
    const GOLDEN_GAMMA: u64 = 0x9E37_79B9_7F4A_7C15;
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    // u128 nanoseconds overflow u64 only after ~584 years of epoch time.
    // Truncating is fine for jitter, which only needs uncorrelated
    // low-order bits.
    // silent-fallback-ok: jitter seed only. `now() < UNIX_EPOCH` cannot
    // happen on a sane clock, and the mixed counter below still yields
    // distinct seeds when the time contribution is 0.
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map_or(0, |d| {
        let n = d.as_nanos();
        #[allow(clippy::cast_possible_truncation)]
        {
            n as u64
        }
    });
    let bump = COUNTER.fetch_add(1, Ordering::Relaxed);
    nanos ^ bump.wrapping_mul(GOLDEN_GAMMA)
}
511
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    #[test]
    fn parse_retry_after_reads_integer_seconds() {
        let value = http::HeaderValue::from_static("120");
        assert_eq!(
            parse_retry_after(Some(&value)),
            Some(Duration::from_secs(120))
        );
        // Surrounding whitespace is tolerated.
        let padded = http::HeaderValue::from_static(" 7 ");
        assert_eq!(
            parse_retry_after(Some(&padded)),
            Some(Duration::from_secs(7))
        );
    }

    #[test]
    fn parse_retry_after_rejects_missing_zero_and_malformed_values() {
        assert_eq!(parse_retry_after(None), None);
        // The HTTP-date form is intentionally unsupported (documented).
        for raw in [
            "0",
            "-1",
            "1.5",
            "soon",
            "",
            "Wed, 21 Oct 2015 07:28:00 GMT",
        ] {
            let value = http::HeaderValue::from_static(raw);
            assert_eq!(
                parse_retry_after(Some(&value)),
                None,
                "{raw:?} must not parse"
            );
        }
        // Valid header bytes that are not visible ASCII fail `to_str`.
        let opaque = http::HeaderValue::from_bytes(&[0xFF]).unwrap();
        assert_eq!(parse_retry_after(Some(&opaque)), None);
    }

    #[test]
    fn retry_decision_constructors_set_expected_fields() {
        assert!(!RetryDecision::STOP.retry);
        assert_eq!(RetryDecision::STOP.after, None);
        assert!(RetryDecision::RETRY.retry);
        assert_eq!(RetryDecision::RETRY.after, None);
        let d = RetryDecision::retry_after(Duration::from_secs(3));
        assert!(d.retry);
        assert_eq!(d.after, Some(Duration::from_secs(3)));
    }

    #[test]
    fn default_classifier_retries_transient_http_status_codes() {
        let c = DefaultRetryClassifier;
        for status in [408_u16, 425, 429, 500, 502, 503, 504, 599] {
            let err = Error::provider_http(status, "x");
            assert!(c.should_retry(&err, 0).retry, "status {status} must retry");
        }
    }

    #[test]
    fn default_classifier_retries_transport_class_failures() {
        let c = DefaultRetryClassifier;
        assert!(
            c.should_retry(&Error::provider_network("connect refused"), 0)
                .retry
        );
        assert!(
            c.should_retry(&Error::provider_tls("handshake failed"), 0)
                .retry
        );
        assert!(
            c.should_retry(&Error::provider_dns("no such host"), 0)
                .retry
        );
    }

    #[test]
    fn default_classifier_does_not_retry_permanent_status_codes() {
        let c = DefaultRetryClassifier;
        for status in [400_u16, 401, 403, 404, 410, 422] {
            let err = Error::provider_http(status, "x");
            assert!(
                !c.should_retry(&err, 0).retry,
                "status {status} must NOT retry"
            );
        }
    }

    #[test]
    fn default_classifier_does_not_retry_caller_intent_or_programming_errors() {
        let c = DefaultRetryClassifier;
        assert!(!c.should_retry(&Error::Cancelled, 0).retry);
        assert!(!c.should_retry(&Error::DeadlineExceeded, 0).retry);
        assert!(!c.should_retry(&Error::invalid_request("nope"), 0).retry);
        assert!(!c.should_retry(&Error::config("bad"), 0).retry);
    }

    #[test]
    fn default_classifier_does_not_retry_usage_limit_exceeded() {
        // Budget breach is terminal. Re-issuing burns one more counter
        // slot on each attempt's pre-call CAS without changing the
        // outcome. The explicit arm pins the classification, so a future
        // variant inserted between `Provider` and the wildcard cannot
        // accidentally land budget breach on the retry path.
        use crate::run_budget::UsageLimitBreach;
        let c = DefaultRetryClassifier;
        let err = Error::UsageLimitExceeded(UsageLimitBreach::Requests {
            limit: 5,
            observed: 5,
        });
        let decision = c.should_retry(&err, 0);
        assert!(!decision.retry);
        assert_eq!(decision.after, None);
    }

    #[test]
    fn default_classifier_propagates_vendor_retry_after_hint() {
        let c = DefaultRetryClassifier;
        let err =
            Error::provider_http(429, "rate limited").with_retry_after(Duration::from_secs(7));
        let decision = c.should_retry(&err, 0);
        assert!(decision.retry);
        assert_eq!(decision.after, Some(Duration::from_secs(7)));
    }

    #[test]
    fn ensure_idempotency_key_stamps_once_and_subsequent_calls_observe_the_same_value() {
        // RetryService relies on this contract. The first call sets a
        // fresh UUID; later calls within the same logical call (one per
        // attempt) see the same value rather than generating new ones.
        use crate::context::ExecutionContext;
        let mut ctx = ExecutionContext::new();
        assert!(ctx.idempotency_key().is_none());
        let mut counter = 0u32;
        let first = ctx
            .ensure_idempotency_key(|| {
                counter += 1;
                "first-uuid".to_owned()
            })
            .to_owned();
        let second = ctx
            .ensure_idempotency_key(|| {
                counter += 1;
                "second-uuid".to_owned()
            })
            .to_owned();
        assert_eq!(first, "first-uuid");
        assert_eq!(second, "first-uuid", "stamp must be stable across calls");
        assert_eq!(counter, 1, "generator must run exactly once");
        // Cloning the ctx propagates the same key, so every retry of a
        // logical call shares the stamp.
        let cloned = ctx.clone();
        assert_eq!(cloned.idempotency_key(), Some("first-uuid"));
    }

    #[test]
    fn default_classifier_does_not_attach_retry_after_when_vendor_does_not_supply_one() {
        let c = DefaultRetryClassifier;
        let err = Error::provider_http(503, "down");
        let decision = c.should_retry(&err, 0);
        assert!(decision.retry);
        assert!(decision.after.is_none());
    }

    #[test]
    fn retry_policy_standard_uses_default_max_attempts() {
        let p = RetryPolicy::standard();
        assert_eq!(p.max_attempts(), DEFAULT_MAX_ATTEMPTS);
    }

    #[test]
    fn retry_policy_overrides_compose() {
        let p = RetryPolicy::standard()
            .with_max_attempts(2)
            .with_backoff(ExponentialBackoff::new(
                Duration::from_millis(1),
                Duration::from_millis(10),
            ));
        assert_eq!(p.max_attempts(), 2);
        assert_eq!(p.backoff().base(), Duration::from_millis(1));
    }
}