entelix_core/transports/retry.rs
//! `RetryLayer` + `RetryService` — `tower::Layer<S>` middleware that
//! retries transient errors with exponential backoff and jitter.
//!
//! Wraps any `Service` whose `Error` is [`crate::error::Error`].
//! Composes uniformly on the `Service<ModelInvocation>` and
//! `Service<ToolInvocation>` paths; the same layer handles both
//! because the retry decision is error-shape-driven, not
//! invocation-shape-driven.
//!
//! ## Cancellation
//!
//! The head of every attempt checks
//! [`ExecutionContext::cancellation`](crate::context::ExecutionContext)
//! and returns `Error::Cancelled` immediately if it has been signalled.
//! The backoff sleep between attempts is raced against the same token.
//! Operator intent always wins over the retry policy. The head of every
//! attempt also checks the context deadline and returns
//! `Error::DeadlineExceeded` once it has passed.
//!
//! ## Layering with other middleware
//!
//! Place `RetryLayer` *outside* observability middleware
//! (`OtelLayer`) so each retry attempt produces its own span — the
//! span tree shows `n` attempts, not `1` opaque envelope. Place it
//! *inside* policy gates (`PolicyLayer`) so the policy decision
//! fires once per logical call, not per attempt.
//!
//! ```text
//! PolicyLayer ↘
//!   RetryLayer ↘
//!     OtelLayer ↘
//!       <inner Service>
//! ```
31
32use std::sync::Arc;
33use std::task::{Context, Poll};
34use std::time::Duration;
35
36use futures::future::BoxFuture;
37use rand::SeedableRng;
38use rand::rngs::SmallRng;
39use tower::{Layer, Service, ServiceExt};
40
41use crate::backoff::{DEFAULT_MAX_ATTEMPTS, ExponentialBackoff};
42use crate::error::{Error, Result};
43
44/// Parse a `Retry-After` HTTP header value into a [`Duration`].
45///
46/// Per RFC 7231, the value is either an integer number of seconds
47/// or an HTTP-date. Vendor APIs in this corner of the ecosystem
48/// (Anthropic, OpenAI, Bedrock) all use the integer form. Returns
49/// `None` for missing / malformed / zero values; `Some(Duration)`
50/// is the vendor's authoritative cooldown that retry classifiers
51/// honour ahead of self-jitter (invariant #17).
52#[must_use]
53pub fn parse_retry_after(header: Option<&http::HeaderValue>) -> Option<Duration> {
54 let header = header?.to_str().ok()?;
55 let secs: u64 = header.trim().parse().ok()?;
56 if secs == 0 {
57 return None;
58 }
59 Some(Duration::from_secs(secs))
60}
61
62/// Trait that classifies whether an error justifies another attempt
63/// and — when the vendor supplies one — the cooldown to wait before
64/// the next try.
65///
66/// The same trait drives `RetryLayer` *and*
67/// `RunnableExt::with_fallbacks`: in both cases the question is
68/// "is this error transient or permanent?". Reusing one trait keeps
69/// users out of policy-selection paralysis and avoids parallel
70/// taxonomies.
71pub trait RetryClassifier: Send + Sync + std::fmt::Debug {
72 /// Whether to attempt again, plus the optional vendor-supplied
73 /// cooldown. `attempt` starts at `0` for the first failed call
74 /// (i.e., before the first *retry*); the next failure passes
75 /// `attempt = 1`, and so on. Implementations can gate by
76 /// attempt count, error variant, kind, or any combination.
77 fn should_retry(&self, error: &Error, attempt: u32) -> RetryDecision;
78}
79
/// Verdict produced by [`RetryClassifier::should_retry`].
///
/// `after` carries the vendor's `Retry-After` hint when one exists.
/// `RetryService` prefers it over its own exponential-backoff schedule
/// but clamps it to the configured backoff maximum. That way a
/// misbehaving vendor cannot stall the loop indefinitely.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RetryDecision {
    /// `true` when another attempt should be made.
    pub retry: bool,
    /// Vendor cooldown hint. `None` defers to the configured backoff.
    pub after: Option<Duration>,
}

impl RetryDecision {
    /// Give up and surface the error to the caller.
    pub const STOP: Self = Self::from_parts(false, None);

    /// Try again, spacing the attempt with the configured backoff.
    pub const RETRY: Self = Self::from_parts(true, None);

    /// Try again once the vendor-supplied `after` cooldown has elapsed.
    #[must_use]
    pub const fn retry_after(after: Duration) -> Self {
        Self::from_parts(true, Some(after))
    }

    /// Shared constructor behind the public presets.
    const fn from_parts(retry: bool, after: Option<Duration>) -> Self {
        Self { retry, after }
    }
}
116
117/// Standard classifier — retries on transient HTTP / transport
118/// classes:
119///
120/// - `Provider { status: 0, .. }` — transport / DNS / connect failure.
121/// - `Provider { status: 408 | 425 | 429 | 500..=599, .. }` —
122/// Request Timeout / Too Early / Too Many Requests / 5xx server.
123///
124/// Permanent failures (`InvalidRequest`, `Config`, `Cancelled`,
125/// `DeadlineExceeded`, `Interrupted`, `Serde`, `UsageLimitExceeded`,
126/// and 4xx other than 408 / 425 / 429) are not retried.
127/// `UsageLimitExceeded` is matched explicitly — the wildcard arm
128/// would catch it, but a future variant inserted between
129/// `Provider` and the wildcard could accidentally be classified
130/// retryable; the explicit arm pins budget breach as terminal
131/// regardless of what new variants land.
132#[derive(Clone, Copy, Debug, Default)]
133pub struct DefaultRetryClassifier;
134
135impl RetryClassifier for DefaultRetryClassifier {
136 // The explicit `UsageLimitExceeded` arm is intentional defence
137 //: the wildcard below would absorb the variant
138 // identically today, but a future variant inserted between
139 // `Provider` and the wildcard could accidentally land budget
140 // breach on the retry path. Clippy's `match_same_arms` lint
141 // would have us collapse the arm — that defeats the explicit
142 // future-proofing the ADR pinned.
143 #[allow(clippy::match_same_arms)]
144 fn should_retry(&self, error: &Error, _attempt: u32) -> RetryDecision {
145 // `ToolErrorPolicyLayer` may wrap a still-retryable failure
146 // into `ToolErrorTerminal` (when the operator adds
147 // `Transient` / `RateLimit` to their `terminate_on` set).
148 // Walk the wrap chain to the leaf so the retry decision is
149 // identical regardless of whether the policy layer sits
150 // inside or outside the retry layer — without this unwrap,
151 // a policy layer placed inside the retry layer would
152 // silently disable retries.
153 let mut current = error;
154 while let Error::ToolErrorTerminal { source, .. } = current {
155 current = source;
156 }
157 match current {
158 Error::Provider {
159 kind, retry_after, ..
160 } if is_transient_kind(*kind) => match retry_after {
161 Some(after) => RetryDecision::retry_after(*after),
162 None => RetryDecision::RETRY,
163 },
164 // Budget breach is terminal — re-issuing the same call
165 // after `RunBudget` rejected it produces the same
166 // `UsageLimitExceeded` and burns one more counter slot
167 // on the way (the pre-call CAS still increments before
168 // the cap check). Explicit match defends against future
169 // variants accidentally landing on the retry path.
170 Error::UsageLimitExceeded(_) => RetryDecision::STOP,
171 // Everything else is deterministic, caller-intent, or HITL —
172 // retrying produces the same outcome (or violates intent).
173 _ => RetryDecision::STOP,
174 }
175 }
176}
177
178const fn is_transient_kind(kind: crate::error::ProviderErrorKind) -> bool {
179 use crate::error::ProviderErrorKind;
180 match kind {
181 // Network / TLS / DNS failures are transient by default —
182 // operator routes, peer connections, name resolution all
183 // recover.
184 ProviderErrorKind::Network | ProviderErrorKind::Tls | ProviderErrorKind::Dns => true,
185 // HTTP-class transience is the documented retry set:
186 // 408 Request Timeout, 425 Too Early, 429 Too Many Requests,
187 // 5xx server errors. 4xx other than these are caller-side
188 // bugs and re-issuing produces the same response.
189 ProviderErrorKind::Http(status) => matches!(status, 408 | 425 | 429 | 500..=599),
190 }
191}
192
193/// Retry policy: how many attempts, how to space them, what to retry.
194#[derive(Clone, Debug)]
195pub struct RetryPolicy {
196 /// Maximum total attempts (including the first call). `1`
197 /// disables retry — the first failure surfaces unchanged.
198 max_attempts: u32,
199 /// Backoff sequence between attempts.
200 backoff: ExponentialBackoff,
201 /// Error → "retry?" decision.
202 classifier: Arc<dyn RetryClassifier>,
203}
204
205impl RetryPolicy {
206 /// Build with explicit components.
207 #[must_use]
208 pub fn new(
209 max_attempts: u32,
210 backoff: ExponentialBackoff,
211 classifier: Arc<dyn RetryClassifier>,
212 ) -> Self {
213 Self {
214 max_attempts,
215 backoff,
216 classifier,
217 }
218 }
219
220 /// Default policy: [`DEFAULT_MAX_ATTEMPTS`] attempts,
221 /// `100 ms` → `5 s` backoff with 30% jitter,
222 /// [`DefaultRetryClassifier`].
223 #[must_use]
224 pub fn standard() -> Self {
225 Self::new(
226 DEFAULT_MAX_ATTEMPTS,
227 ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(5)),
228 Arc::new(DefaultRetryClassifier),
229 )
230 }
231
232 /// Override the attempt cap.
233 #[must_use]
234 pub const fn with_max_attempts(mut self, n: u32) -> Self {
235 self.max_attempts = n;
236 self
237 }
238
239 /// Override the backoff sequence.
240 #[must_use]
241 pub const fn with_backoff(mut self, backoff: ExponentialBackoff) -> Self {
242 self.backoff = backoff;
243 self
244 }
245
246 /// Override the classifier.
247 #[must_use]
248 pub fn with_classifier(mut self, classifier: Arc<dyn RetryClassifier>) -> Self {
249 self.classifier = classifier;
250 self
251 }
252
253 /// Borrow the configured attempt cap.
254 #[must_use]
255 pub const fn max_attempts(&self) -> u32 {
256 self.max_attempts
257 }
258
259 /// Borrow the configured backoff.
260 #[must_use]
261 pub const fn backoff(&self) -> ExponentialBackoff {
262 self.backoff
263 }
264
265 /// Borrow the configured classifier.
266 #[must_use]
267 pub fn classifier(&self) -> &Arc<dyn RetryClassifier> {
268 &self.classifier
269 }
270}
271
272/// Layer that adds retry semantics to any `Service<Req>` whose
273/// `Error` is [`Error`].
274///
275/// The wrapped service must be `Clone` (each retry attempt clones a
276/// fresh handle to call). `tower`'s convention is that `Clone`
277/// returns a cheap reference-counted view, not a deep copy — every
278/// `*Layer` in entelix follows that contract.
279#[derive(Clone, Debug)]
280pub struct RetryLayer {
281 policy: RetryPolicy,
282}
283
284impl RetryLayer {
285 /// Patch-version-stable identifier surfaced through
286 /// [`crate::ChatModel::layer_names`] /
287 /// `ToolRegistry::layer_names`. Renaming this constant is a
288 /// breaking change for dashboards keyed off the value.
289 pub const NAME: &'static str = "retry";
290
291 /// Build with a retry policy.
292 #[must_use]
293 pub const fn new(policy: RetryPolicy) -> Self {
294 Self { policy }
295 }
296}
297
298impl<S> Layer<S> for RetryLayer {
299 type Service = RetryService<S>;
300 fn layer(&self, inner: S) -> Self::Service {
301 RetryService {
302 inner,
303 policy: self.policy.clone(),
304 }
305 }
306}
307
308impl crate::NamedLayer for RetryLayer {
309 fn layer_name(&self) -> &'static str {
310 Self::NAME
311 }
312}
313
314/// `Service` produced by [`RetryLayer`]. Generic over the request
315/// type so a single layer drives both `ModelInvocation` and
316/// `ToolInvocation` paths.
317#[derive(Clone, Debug)]
318pub struct RetryService<S> {
319 inner: S,
320 policy: RetryPolicy,
321}
322
323impl<S, Req, Resp> Service<Req> for RetryService<S>
324where
325 S: Service<Req, Response = Resp, Error = Error> + Clone + Send + 'static,
326 S::Future: Send + 'static,
327 Req: Retryable + Send + 'static,
328 Resp: Send + 'static,
329{
330 type Response = Resp;
331 type Error = Error;
332 type Future = BoxFuture<'static, Result<Resp>>;
333
334 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
335 self.inner.poll_ready(cx)
336 }
337
338 fn call(&mut self, request: Req) -> Self::Future {
339 let inner = self.inner.clone();
340 let policy = self.policy.clone();
341 Box::pin(async move { run_with_retry(inner, request, policy).await })
342 }
343}
344
345/// Marker that a request type can be cloned for retries.
346///
347/// `RetryService` clones the request once per attempt (the inner
348/// service is consumed by each call). For `ModelInvocation` and
349/// `ToolInvocation` the clone is cheap (`Arc`-backed `ModelRequest`
350/// and JSON-`Value` body).
351pub trait Retryable: Clone {
352 /// Borrow the [`ExecutionContext`](crate::context::ExecutionContext)
353 /// the retry loop checks for cancellation between attempts.
354 fn ctx(&self) -> &crate::context::ExecutionContext;
355
356 /// Mutable handle so [`RetryService`] can stamp an idempotency
357 /// key on first entry — every clone of the request that follows
358 /// shares the stamped key, so vendor-side dedupe sees one
359 /// logical call across N attempts.
360 fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext;
361}
362
363impl Retryable for crate::service::ModelInvocation {
364 fn ctx(&self) -> &crate::context::ExecutionContext {
365 &self.ctx
366 }
367 fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
368 &mut self.ctx
369 }
370}
371
372impl Retryable for crate::service::ToolInvocation {
373 fn ctx(&self) -> &crate::context::ExecutionContext {
374 &self.ctx
375 }
376 fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
377 &mut self.ctx
378 }
379}
380
381impl Retryable for crate::service::StreamingModelInvocation {
382 fn ctx(&self) -> &crate::context::ExecutionContext {
383 &self.inner.ctx
384 }
385 fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
386 &mut self.inner.ctx
387 }
388}
389
390async fn run_with_retry<S, Req, Resp>(
391 mut inner: S,
392 mut request: Req,
393 policy: RetryPolicy,
394) -> Result<Resp>
395where
396 S: Service<Req, Response = Resp, Error = Error> + Clone + Send,
397 S::Future: Send,
398 Req: Retryable + Send,
399{
400 // Per-call RNG seeded from a system-time component so two
401 // concurrent calls never share a jitter sequence. The RNG lives
402 // for the lifetime of the call only.
403 let seed = seed_from_time();
404 let mut rng = SmallRng::seed_from_u64(seed);
405
406 // Stamp an idempotency key on the request the first time we see
407 // it. Every subsequent clone (one per retry attempt) inherits
408 // the key, so vendor-side dedupe treats N attempts as one
409 // logical call (invariant #17 — no double-charge when a client
410 // timeout races a server-side success).
411 request
412 .ctx_mut()
413 .ensure_idempotency_key(|| uuid::Uuid::new_v4().to_string());
414
415 let max_attempts = policy.max_attempts.max(1);
416 let mut attempt: u32 = 0;
417 loop {
418 // Deadline + cancellation check at the head of every attempt.
419 // A caller with `with_timeout(200ms)` and exponential backoff
420 // would otherwise sleep through the deadline and surface
421 // `Provider` instead of the more specific `DeadlineExceeded`,
422 // making it impossible to distinguish "upstream is sick"
423 // from "we ran out of time".
424 let ctx_token = request.ctx().cancellation();
425 if ctx_token.is_cancelled() {
426 return Err(Error::Cancelled);
427 }
428 if let Some(deadline) = request.ctx().deadline()
429 && tokio::time::Instant::now() >= deadline
430 {
431 return Err(Error::DeadlineExceeded);
432 }
433
434 let cloned = request.clone();
435 let result = inner.ready().await?.call(cloned).await;
436
437 match result {
438 Ok(resp) => return Ok(resp),
439 Err(err) => {
440 attempt = attempt.saturating_add(1);
441 let exhausted = attempt >= max_attempts;
442 let decision = policy.classifier.should_retry(&err, attempt - 1);
443 if exhausted || !decision.retry {
444 return Err(err);
445 }
446 // Vendor `Retry-After` beats self-jitter (invariant
447 // #17 — vendor authoritative signal wins). Cap at the
448 // configured backoff cap so a malicious or stuck
449 // vendor cannot pin the loop.
450 let backoff_delay = policy.backoff.delay_for_attempt(attempt - 1, &mut rng);
451 let delay = match decision.after {
452 Some(hint) => hint.min(policy.backoff.max()),
453 None => backoff_delay,
454 };
455 // If the caller's deadline lands inside this backoff
456 // window, cap the sleep to the remaining budget and
457 // surface `DeadlineExceeded` rather than waking up
458 // past the deadline only to retry-then-deadline.
459 let effective_delay = if let Some(deadline) = request.ctx().deadline() {
460 let now = tokio::time::Instant::now();
461 let remaining = deadline.saturating_duration_since(now);
462 if remaining.is_zero() {
463 return Err(Error::DeadlineExceeded);
464 }
465 delay.min(remaining)
466 } else {
467 delay
468 };
469 let deadline_for_select = request.ctx().deadline();
470 tokio::select! {
471 () = tokio::time::sleep(effective_delay) => {
472 // If we capped the sleep at the deadline,
473 // bail out instead of looping for one more
474 // doomed attempt.
475 if let Some(deadline) = deadline_for_select
476 && tokio::time::Instant::now() >= deadline
477 {
478 return Err(Error::DeadlineExceeded);
479 }
480 }
481 () = ctx_token.cancelled() => return Err(Error::Cancelled),
482 }
483 }
484 }
485 }
486}
487
/// Seed for the per-call jitter RNG.
///
/// The seed is the wall clock's nanoseconds since the Unix epoch,
/// reduced to 64 bits, XORed with a process-wide call counter. Two
/// calls landing in the same nanosecond therefore still receive
/// distinct seeds.
fn seed_from_time() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static CALLS: AtomicU64 = AtomicU64::new(0);

    // Nanoseconds-since-epoch modulo 2^64, computed with wrapping u64
    // arithmetic. This is bit-for-bit what truncating `as_nanos()`'s
    // u128 would give, without a lossy cast. Jitter only needs
    // uncorrelated low-order bits.
    // silent-fallback-ok: jitter seed only. A clock reading before
    // `UNIX_EPOCH` contributes 0, and the counter XORed below still
    // keeps seeds from concurrent calls apart.
    let wall_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            since_epoch
                .as_secs()
                .wrapping_mul(1_000_000_000)
                .wrapping_add(u64::from(since_epoch.subsec_nanos()))
        });
    let ticket = CALLS.fetch_add(1, Ordering::Relaxed);
    wall_nanos ^ ticket
}
511
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    #[test]
    fn default_classifier_retries_transient_http_status_codes() {
        let c = DefaultRetryClassifier;
        for status in [408_u16, 425, 429, 500, 502, 503, 504, 599] {
            let err = Error::provider_http(status, "x");
            assert!(c.should_retry(&err, 0).retry, "status {status} must retry");
        }
    }

    #[test]
    fn default_classifier_retries_transport_class_failures() {
        let c = DefaultRetryClassifier;
        assert!(
            c.should_retry(&Error::provider_network("connect refused"), 0)
                .retry
        );
        assert!(
            c.should_retry(&Error::provider_tls("handshake failed"), 0)
                .retry
        );
        assert!(
            c.should_retry(&Error::provider_dns("no such host"), 0)
                .retry
        );
    }

    #[test]
    fn default_classifier_does_not_retry_permanent_status_codes() {
        let c = DefaultRetryClassifier;
        for status in [400_u16, 401, 403, 404, 410, 422] {
            let err = Error::provider_http(status, "x");
            assert!(
                !c.should_retry(&err, 0).retry,
                "status {status} must NOT retry"
            );
        }
    }

    #[test]
    fn default_classifier_does_not_retry_caller_intent_or_programming_errors() {
        let c = DefaultRetryClassifier;
        assert!(!c.should_retry(&Error::Cancelled, 0).retry);
        assert!(!c.should_retry(&Error::DeadlineExceeded, 0).retry);
        assert!(!c.should_retry(&Error::invalid_request("nope"), 0).retry);
        assert!(!c.should_retry(&Error::config("bad"), 0).retry);
    }

    #[test]
    fn default_classifier_does_not_retry_usage_limit_exceeded() {
        // Budget breach is terminal. Re-issuing burns one more
        // counter slot on each attempt's pre-call CAS without
        // changing the outcome. The explicit arm pins the
        // classification so a future variant inserted between
        // `Provider` and the wildcard cannot accidentally land
        // budget breach on the retry path.
        use crate::run_budget::UsageLimitBreach;
        let c = DefaultRetryClassifier;
        let err = Error::UsageLimitExceeded(UsageLimitBreach::Requests {
            limit: 5,
            observed: 5,
        });
        let decision = c.should_retry(&err, 0);
        assert!(!decision.retry);
        assert_eq!(decision.after, None);
    }

    #[test]
    fn default_classifier_propagates_vendor_retry_after_hint() {
        let c = DefaultRetryClassifier;
        let err =
            Error::provider_http(429, "rate limited").with_retry_after(Duration::from_secs(7));
        let decision = c.should_retry(&err, 0);
        assert!(decision.retry);
        assert_eq!(decision.after, Some(Duration::from_secs(7)));
    }

    #[test]
    fn default_classifier_ignores_the_attempt_index() {
        // The default classifier decides on the error alone; the
        // attempt budget is enforced by `RetryService`, not here.
        let c = DefaultRetryClassifier;
        let transient = Error::provider_http(503, "down");
        let permanent = Error::provider_http(404, "missing");
        for attempt in [0_u32, 1, 7, u32::MAX] {
            assert!(c.should_retry(&transient, attempt).retry);
            assert!(!c.should_retry(&permanent, attempt).retry);
        }
    }

    #[test]
    fn ensure_idempotency_key_stamps_once_and_subsequent_calls_observe_the_same_value() {
        // RetryService relies on this contract. The first call
        // sets a fresh UUID; later calls in the same logical
        // call (one per attempt) see the same value rather than
        // generating new ones.
        use crate::context::ExecutionContext;
        let mut ctx = ExecutionContext::new();
        assert!(ctx.idempotency_key().is_none());
        let mut counter = 0u32;
        let first = ctx
            .ensure_idempotency_key(|| {
                counter += 1;
                "first-uuid".to_owned()
            })
            .to_owned();
        let second = ctx
            .ensure_idempotency_key(|| {
                counter += 1;
                "second-uuid".to_owned()
            })
            .to_owned();
        assert_eq!(first, "first-uuid");
        assert_eq!(second, "first-uuid", "stamp must be stable across calls");
        assert_eq!(counter, 1, "generator must run exactly once");
        // Cloning the ctx propagates the same key, so every retry of
        // a logical call shares the stamp.
        let cloned = ctx.clone();
        assert_eq!(cloned.idempotency_key(), Some("first-uuid"));
    }

    #[test]
    fn default_classifier_does_not_attach_retry_after_when_vendor_does_not_supply_one() {
        let c = DefaultRetryClassifier;
        let err = Error::provider_http(503, "down");
        let decision = c.should_retry(&err, 0);
        assert!(decision.retry);
        assert!(decision.after.is_none());
    }

    #[test]
    fn retry_decision_presets_have_the_documented_shape() {
        assert_eq!(
            RetryDecision::STOP,
            RetryDecision {
                retry: false,
                after: None
            }
        );
        assert_eq!(
            RetryDecision::RETRY,
            RetryDecision {
                retry: true,
                after: None
            }
        );
        let d = RetryDecision::retry_after(Duration::from_secs(3));
        assert!(d.retry);
        assert_eq!(d.after, Some(Duration::from_secs(3)));
    }

    #[test]
    fn parse_retry_after_reads_integer_seconds() {
        let value = http::HeaderValue::from_static("7");
        assert_eq!(
            parse_retry_after(Some(&value)),
            Some(Duration::from_secs(7))
        );
        // Surrounding whitespace is tolerated.
        let padded = http::HeaderValue::from_static(" 12 ");
        assert_eq!(
            parse_retry_after(Some(&padded)),
            Some(Duration::from_secs(12))
        );
    }

    #[test]
    fn parse_retry_after_rejects_missing_zero_and_malformed_values() {
        assert_eq!(parse_retry_after(None), None);
        // `0`, non-numeric, negative, fractional and HTTP-date forms all
        // defer to the configured backoff.
        for raw in ["0", "abc", "-3", "1.5", "Wed, 21 Oct 2015 07:28:00 GMT"] {
            let value = http::HeaderValue::from_static(raw);
            assert_eq!(parse_retry_after(Some(&value)), None, "value {raw:?}");
        }
    }

    #[test]
    fn retry_policy_standard_uses_default_max_attempts() {
        let p = RetryPolicy::standard();
        assert_eq!(p.max_attempts(), DEFAULT_MAX_ATTEMPTS);
    }

    #[test]
    fn retry_policy_overrides_compose() {
        let p = RetryPolicy::standard()
            .with_max_attempts(2)
            .with_backoff(ExponentialBackoff::new(
                Duration::from_millis(1),
                Duration::from_millis(10),
            ));
        assert_eq!(p.max_attempts(), 2);
        assert_eq!(p.backoff().base(), Duration::from_millis(1));
    }
}