entelix_core/transports/retry.rs
1//! `RetryLayer` + `RetryService` — `tower::Layer<S>` middleware that
2//! retries transient errors with exponential backoff and jitter.
3//!
4//! Wraps any `Service` whose `Error` is [`crate::error::Error`].
//! Composes uniformly on the `Service<ModelInvocation>`,
//! `Service<StreamingModelInvocation>`, and `Service<ToolInvocation>`
//! paths; the same layer handles all of them because the retry decision
//! is error-shape-driven, not invocation-shape-driven.
9//!
10//! ## Cancellation
11//!
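//! Every iteration head checks
//! [`ExecutionContext::cancellation`](crate::context::ExecutionContext)
//! and the context deadline, returning `Error::Cancelled` or
//! `Error::DeadlineExceeded` immediately when either has fired — operator
//! intent always wins over the retry policy. The backoff sleep between
//! attempts is also raced against cancellation.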
16//!
17//! ## Layering with other middleware
18//!
19//! Place `RetryLayer` *outside* observability middleware
20//! (`OtelLayer`) so each retry attempt produces its own span — the
21//! span tree shows `n` attempts, not `1` opaque envelope. Place it
22//! *inside* policy gates (`PolicyLayer`) so the policy decision
23//! fires once per logical call, not per attempt.
24//!
25//! ```text
26//! PolicyLayer ↘
27//! RetryLayer ↘
28//! OtelLayer ↘
29//! <inner Service>
30//! ```
31
32use std::sync::Arc;
33use std::task::{Context, Poll};
34use std::time::Duration;
35
36use futures::future::BoxFuture;
37use rand::SeedableRng;
38use rand::rngs::SmallRng;
39use tower::{Layer, Service, ServiceExt};
40
41use crate::backoff::{DEFAULT_MAX_ATTEMPTS, ExponentialBackoff};
42use crate::error::{Error, Result};
43
44/// Parse a `Retry-After` HTTP header value into a [`Duration`].
45///
46/// Per RFC 7231, the value is either an integer number of seconds
47/// or an HTTP-date. Vendor APIs in this corner of the ecosystem
48/// (Anthropic, OpenAI, Bedrock) all use the integer form. Returns
49/// `None` for missing / malformed / zero values; `Some(Duration)`
50/// is the vendor's authoritative cooldown that retry classifiers
51/// honour ahead of self-jitter (invariant #17).
52#[must_use]
53pub fn parse_retry_after(header: Option<&http::HeaderValue>) -> Option<Duration> {
54 let header = header?.to_str().ok()?;
55 let secs: u64 = header.trim().parse().ok()?;
56 if secs == 0 {
57 return None;
58 }
59 Some(Duration::from_secs(secs))
60}
61
62/// Trait that classifies whether an error justifies another attempt
63/// and — when the vendor supplies one — the cooldown to wait before
64/// the next try.
65///
66/// The same trait drives `RetryLayer` *and*
67/// `RunnableExt::with_fallbacks`: in both cases the question is
68/// "is this error transient or permanent?". Reusing one trait keeps
69/// users out of policy-selection paralysis and avoids parallel
70/// taxonomies.
71pub trait RetryClassifier: Send + Sync + std::fmt::Debug {
72 /// Whether to attempt again, plus the optional vendor-supplied
73 /// cooldown. `attempt` starts at `0` for the first failed call
74 /// (i.e., before the first *retry*); the next failure passes
75 /// `attempt = 1`, and so on. Implementations can gate by
76 /// attempt count, error variant, kind, or any combination.
77 fn should_retry(&self, error: &Error, attempt: u32) -> RetryDecision;
78}
79
/// Verdict produced by [`RetryClassifier::should_retry`].
///
/// `after` carries the vendor's `Retry-After` hint when one exists.
/// `RetryService` prefers that hint over its own exponential-backoff
/// delay, but clamps it to the configured backoff maximum. This keeps a
/// misbehaving vendor from stalling the loop indefinitely.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RetryDecision {
    /// `true` to make another attempt.
    pub retry: bool,
    /// Vendor-supplied cooldown, if any. `None` defers to the configured
    /// backoff.
    pub after: Option<Duration>,
}

impl RetryDecision {
    /// Give up: no further attempt.
    pub const STOP: Self = RetryDecision {
        after: None,
        retry: false,
    };

    /// Try again after the configured backoff delay.
    pub const RETRY: Self = RetryDecision {
        after: None,
        retry: true,
    };

    /// Try again once the vendor-supplied cooldown `after` has elapsed.
    #[must_use]
    pub const fn retry_after(after: Duration) -> Self {
        RetryDecision {
            after: Some(after),
            retry: true,
        }
    }
}
116
/// Standard classifier. It retries on transient transport / HTTP failure
/// classes, judged by the error's `ProviderErrorKind`:
///
/// - `Provider` errors of kind `Network`, `Tls`, or `Dns`: transport,
///   handshake, or name-resolution failure.
/// - `Provider` errors of kind `Http(408 | 425 | 429 | 500..=599)`:
///   Request Timeout / Too Early / Too Many Requests / 5xx server.
///
/// When a retryable `Provider` error carries a vendor `retry_after`
/// hint, the hint is forwarded through [`RetryDecision::retry_after`].
/// Otherwise the decision is [`RetryDecision::RETRY`], which defers to
/// the configured backoff. The `attempt` argument is ignored; the attempt
/// cap lives in `RetryPolicy`.
///
/// Everything else is permanent and is not retried. That includes:
/// - `InvalidRequest`, `Config`, `Cancelled`, `DeadlineExceeded`,
///   `Interrupted`, `Serde`, and `UsageLimitExceeded`;
/// - `Provider` errors with an HTTP status outside the set above.
///
/// `UsageLimitExceeded` is matched explicitly even though the wildcard arm
/// would catch it. That pins budget breach as terminal even if a future
/// variant-specific arm is inserted between `Provider` and the wildcard.
#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultRetryClassifier;
134
135impl RetryClassifier for DefaultRetryClassifier {
136 // The explicit `UsageLimitExceeded` arm is intentional defence
137 //: the wildcard below would absorb the variant
138 // identically today, but a future variant inserted between
139 // `Provider` and the wildcard could accidentally land budget
140 // breach on the retry path. Clippy's `match_same_arms` lint
141 // would have us collapse the arm — that defeats the explicit
142 // future-proofing the ADR pinned.
143 #[allow(clippy::match_same_arms)]
144 fn should_retry(&self, error: &Error, _attempt: u32) -> RetryDecision {
145 match error {
146 Error::Provider {
147 kind, retry_after, ..
148 } if is_transient_kind(*kind) => match retry_after {
149 Some(after) => RetryDecision::retry_after(*after),
150 None => RetryDecision::RETRY,
151 },
152 // Budget breach is terminal — re-issuing the same call
153 // after `RunBudget` rejected it produces the same
154 // `UsageLimitExceeded` and burns one more counter slot
155 // on the way (the pre-call CAS still increments before
156 // the cap check). Explicit match defends against future
157 // variants accidentally landing on the retry path.
158 Error::UsageLimitExceeded(_) => RetryDecision::STOP,
159 // Everything else is deterministic, caller-intent, or HITL —
160 // retrying produces the same outcome (or violates intent).
161 _ => RetryDecision::STOP,
162 }
163 }
164}
165
166const fn is_transient_kind(kind: crate::error::ProviderErrorKind) -> bool {
167 use crate::error::ProviderErrorKind;
168 match kind {
169 // Network / TLS / DNS failures are transient by default —
170 // operator routes, peer connections, name resolution all
171 // recover.
172 ProviderErrorKind::Network | ProviderErrorKind::Tls | ProviderErrorKind::Dns => true,
173 // HTTP-class transience is the documented retry set:
174 // 408 Request Timeout, 425 Too Early, 429 Too Many Requests,
175 // 5xx server errors. 4xx other than these are caller-side
176 // bugs and re-issuing produces the same response.
177 ProviderErrorKind::Http(status) => matches!(status, 408 | 425 | 429 | 500..=599),
178 }
179}
180
181/// Retry policy: how many attempts, how to space them, what to retry.
182#[derive(Clone, Debug)]
183pub struct RetryPolicy {
184 /// Maximum total attempts (including the first call). `1`
185 /// disables retry — the first failure surfaces unchanged.
186 max_attempts: u32,
187 /// Backoff sequence between attempts.
188 backoff: ExponentialBackoff,
189 /// Error → "retry?" decision.
190 classifier: Arc<dyn RetryClassifier>,
191}
192
193impl RetryPolicy {
194 /// Build with explicit components.
195 #[must_use]
196 pub fn new(
197 max_attempts: u32,
198 backoff: ExponentialBackoff,
199 classifier: Arc<dyn RetryClassifier>,
200 ) -> Self {
201 Self {
202 max_attempts,
203 backoff,
204 classifier,
205 }
206 }
207
208 /// Default policy: [`DEFAULT_MAX_ATTEMPTS`] attempts,
209 /// `100 ms` → `5 s` backoff with 30% jitter,
210 /// [`DefaultRetryClassifier`].
211 #[must_use]
212 pub fn standard() -> Self {
213 Self::new(
214 DEFAULT_MAX_ATTEMPTS,
215 ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(5)),
216 Arc::new(DefaultRetryClassifier),
217 )
218 }
219
220 /// Override the attempt cap.
221 #[must_use]
222 pub const fn with_max_attempts(mut self, n: u32) -> Self {
223 self.max_attempts = n;
224 self
225 }
226
227 /// Override the backoff sequence.
228 #[must_use]
229 pub const fn with_backoff(mut self, backoff: ExponentialBackoff) -> Self {
230 self.backoff = backoff;
231 self
232 }
233
234 /// Override the classifier.
235 #[must_use]
236 pub fn with_classifier(mut self, classifier: Arc<dyn RetryClassifier>) -> Self {
237 self.classifier = classifier;
238 self
239 }
240
241 /// Borrow the configured attempt cap.
242 #[must_use]
243 pub const fn max_attempts(&self) -> u32 {
244 self.max_attempts
245 }
246
247 /// Borrow the configured backoff.
248 #[must_use]
249 pub const fn backoff(&self) -> ExponentialBackoff {
250 self.backoff
251 }
252
253 /// Borrow the configured classifier.
254 #[must_use]
255 pub fn classifier(&self) -> &Arc<dyn RetryClassifier> {
256 &self.classifier
257 }
258}
259
260/// Layer that adds retry semantics to any `Service<Req>` whose
261/// `Error` is [`Error`].
262///
263/// The wrapped service must be `Clone` (each retry attempt clones a
264/// fresh handle to call). `tower`'s convention is that `Clone`
265/// returns a cheap reference-counted view, not a deep copy — every
266/// `*Layer` in entelix follows that contract.
267#[derive(Clone, Debug)]
268pub struct RetryLayer {
269 policy: RetryPolicy,
270}
271
272impl RetryLayer {
273 /// Patch-version-stable identifier surfaced through
274 /// [`crate::ChatModel::layer_names`] /
275 /// `ToolRegistry::layer_names`. Renaming this constant is a
276 /// breaking change for dashboards keyed off the value.
277 pub const NAME: &'static str = "retry";
278
279 /// Build with a retry policy.
280 #[must_use]
281 pub const fn new(policy: RetryPolicy) -> Self {
282 Self { policy }
283 }
284}
285
286impl<S> Layer<S> for RetryLayer {
287 type Service = RetryService<S>;
288 fn layer(&self, inner: S) -> Self::Service {
289 RetryService {
290 inner,
291 policy: self.policy.clone(),
292 }
293 }
294}
295
296impl crate::NamedLayer for RetryLayer {
297 fn layer_name(&self) -> &'static str {
298 Self::NAME
299 }
300}
301
302/// `Service` produced by [`RetryLayer`]. Generic over the request
303/// type so a single layer drives both `ModelInvocation` and
304/// `ToolInvocation` paths.
305#[derive(Clone, Debug)]
306pub struct RetryService<S> {
307 inner: S,
308 policy: RetryPolicy,
309}
310
311impl<S, Req, Resp> Service<Req> for RetryService<S>
312where
313 S: Service<Req, Response = Resp, Error = Error> + Clone + Send + 'static,
314 S::Future: Send + 'static,
315 Req: Retryable + Send + 'static,
316 Resp: Send + 'static,
317{
318 type Response = Resp;
319 type Error = Error;
320 type Future = BoxFuture<'static, Result<Resp>>;
321
322 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
323 self.inner.poll_ready(cx)
324 }
325
326 fn call(&mut self, request: Req) -> Self::Future {
327 let inner = self.inner.clone();
328 let policy = self.policy.clone();
329 Box::pin(async move { run_with_retry(inner, request, policy).await })
330 }
331}
332
333/// Marker that a request type can be cloned for retries.
334///
335/// `RetryService` clones the request once per attempt (the inner
336/// service is consumed by each call). For `ModelInvocation` and
337/// `ToolInvocation` the clone is cheap (`Arc`-backed `ModelRequest`
338/// and JSON-`Value` body).
339pub trait Retryable: Clone {
340 /// Borrow the [`ExecutionContext`](crate::context::ExecutionContext)
341 /// the retry loop checks for cancellation between attempts.
342 fn ctx(&self) -> &crate::context::ExecutionContext;
343
344 /// Mutable handle so [`RetryService`] can stamp an idempotency
345 /// key on first entry — every clone of the request that follows
346 /// shares the stamped key, so vendor-side dedupe sees one
347 /// logical call across N attempts.
348 fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext;
349}
350
351impl Retryable for crate::service::ModelInvocation {
352 fn ctx(&self) -> &crate::context::ExecutionContext {
353 &self.ctx
354 }
355 fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
356 &mut self.ctx
357 }
358}
359
360impl Retryable for crate::service::ToolInvocation {
361 fn ctx(&self) -> &crate::context::ExecutionContext {
362 &self.ctx
363 }
364 fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
365 &mut self.ctx
366 }
367}
368
369impl Retryable for crate::service::StreamingModelInvocation {
370 fn ctx(&self) -> &crate::context::ExecutionContext {
371 &self.inner.ctx
372 }
373 fn ctx_mut(&mut self) -> &mut crate::context::ExecutionContext {
374 &mut self.inner.ctx
375 }
376}
377
378async fn run_with_retry<S, Req, Resp>(
379 mut inner: S,
380 mut request: Req,
381 policy: RetryPolicy,
382) -> Result<Resp>
383where
384 S: Service<Req, Response = Resp, Error = Error> + Clone + Send,
385 S::Future: Send,
386 Req: Retryable + Send,
387{
388 // Per-call RNG seeded from a system-time component so two
389 // concurrent calls never share a jitter sequence. The RNG lives
390 // for the lifetime of the call only.
391 let seed = seed_from_time();
392 let mut rng = SmallRng::seed_from_u64(seed);
393
394 // Stamp an idempotency key on the request the first time we see
395 // it. Every subsequent clone (one per retry attempt) inherits
396 // the key, so vendor-side dedupe treats N attempts as one
397 // logical call (invariant #17 — no double-charge when a client
398 // timeout races a server-side success).
399 request
400 .ctx_mut()
401 .ensure_idempotency_key(|| uuid::Uuid::new_v4().to_string());
402
403 let max_attempts = policy.max_attempts.max(1);
404 let mut attempt: u32 = 0;
405 loop {
406 // Deadline + cancellation check at the head of every attempt.
407 // A caller with `with_timeout(200ms)` and exponential backoff
408 // would otherwise sleep through the deadline and surface
409 // `Provider` instead of the more specific `DeadlineExceeded`,
410 // making it impossible to distinguish "upstream is sick"
411 // from "we ran out of time".
412 let ctx_token = request.ctx().cancellation();
413 if ctx_token.is_cancelled() {
414 return Err(Error::Cancelled);
415 }
416 if let Some(deadline) = request.ctx().deadline()
417 && tokio::time::Instant::now() >= deadline
418 {
419 return Err(Error::DeadlineExceeded);
420 }
421
422 let cloned = request.clone();
423 let result = inner.ready().await?.call(cloned).await;
424
425 match result {
426 Ok(resp) => return Ok(resp),
427 Err(err) => {
428 attempt = attempt.saturating_add(1);
429 let exhausted = attempt >= max_attempts;
430 let decision = policy.classifier.should_retry(&err, attempt - 1);
431 if exhausted || !decision.retry {
432 return Err(err);
433 }
434 // Vendor `Retry-After` beats self-jitter (invariant
435 // #17 — vendor authoritative signal wins). Cap at the
436 // configured backoff cap so a malicious or stuck
437 // vendor cannot pin the loop.
438 let backoff_delay = policy.backoff.delay_for_attempt(attempt - 1, &mut rng);
439 let delay = match decision.after {
440 Some(hint) => hint.min(policy.backoff.max()),
441 None => backoff_delay,
442 };
443 // If the caller's deadline lands inside this backoff
444 // window, cap the sleep to the remaining budget and
445 // surface `DeadlineExceeded` rather than waking up
446 // past the deadline only to retry-then-deadline.
447 let effective_delay = if let Some(deadline) = request.ctx().deadline() {
448 let now = tokio::time::Instant::now();
449 let remaining = deadline.saturating_duration_since(now);
450 if remaining.is_zero() {
451 return Err(Error::DeadlineExceeded);
452 }
453 delay.min(remaining)
454 } else {
455 delay
456 };
457 let deadline_for_select = request.ctx().deadline();
458 tokio::select! {
459 () = tokio::time::sleep(effective_delay) => {
460 // If we capped the sleep at the deadline,
461 // bail out instead of looping for one more
462 // doomed attempt.
463 if let Some(deadline) = deadline_for_select
464 && tokio::time::Instant::now() >= deadline
465 {
466 return Err(Error::DeadlineExceeded);
467 }
468 }
469 () = ctx_token.cancelled() => return Err(Error::Cancelled),
470 }
471 }
472 }
473 }
474}
475
/// Seed for the per-call jitter RNG.
///
/// The seed is the low 64 bits of the wall-clock nanoseconds since the
/// Unix epoch, XORed with a process-wide call counter. The counter
/// ensures two calls that read the same clock value still get different
/// seeds.
fn seed_from_time() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static CALLS: AtomicU64 = AtomicU64::new(0);

    // Wrapping u64 arithmetic on (secs, subsec_nanos) yields exactly the
    // low 64 bits of `as_nanos()`, the same value a truncating cast gives,
    // without needing the cast. Jitter only needs uncorrelated low-order
    // bits, so the ~584-year wrap is irrelevant.
    let clock_bits = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch
            .as_secs()
            .wrapping_mul(1_000_000_000)
            .wrapping_add(u64::from(since_epoch.subsec_nanos())),
        // silent-fallback-ok: jitter seed only. `now() < UNIX_EPOCH`
        // cannot happen on a sane clock, and the counter XORed below still
        // varies the seed when this contributes 0.
        Err(_) => 0,
    };
    clock_bits ^ CALLS.fetch_add(1, Ordering::Relaxed)
}
499
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    #[test]
    fn default_classifier_retries_transient_http_status_codes() {
        let c = DefaultRetryClassifier;
        for status in [408_u16, 425, 429, 500, 502, 503, 504, 599] {
            let err = Error::provider_http(status, "x");
            assert!(c.should_retry(&err, 0).retry, "status {status} must retry");
        }
    }

    #[test]
    fn default_classifier_retries_transport_class_failures() {
        let c = DefaultRetryClassifier;
        assert!(
            c.should_retry(&Error::provider_network("connect refused"), 0)
                .retry
        );
        assert!(
            c.should_retry(&Error::provider_tls("handshake failed"), 0)
                .retry
        );
        assert!(
            c.should_retry(&Error::provider_dns("no such host"), 0)
                .retry
        );
    }

    #[test]
    fn default_classifier_does_not_retry_permanent_status_codes() {
        let c = DefaultRetryClassifier;
        for status in [400_u16, 401, 403, 404, 410, 422] {
            let err = Error::provider_http(status, "x");
            assert!(
                !c.should_retry(&err, 0).retry,
                "status {status} must NOT retry"
            );
        }
    }

    #[test]
    fn default_classifier_does_not_retry_caller_intent_or_programming_errors() {
        let c = DefaultRetryClassifier;
        assert!(!c.should_retry(&Error::Cancelled, 0).retry);
        assert!(!c.should_retry(&Error::DeadlineExceeded, 0).retry);
        assert!(!c.should_retry(&Error::invalid_request("nope"), 0).retry);
        assert!(!c.should_retry(&Error::config("bad"), 0).retry);
    }

    #[test]
    fn default_classifier_does_not_retry_usage_limit_exceeded() {
        // Budget breach is terminal. Re-issuing burns one more counter
        // slot on each attempt's pre-call CAS without changing the
        // outcome. The explicit arm pins the classification so a future
        // variant inserted between `Provider` and the wildcard cannot
        // accidentally land budget breach on the retry path.
        use crate::run_budget::UsageLimitBreach;
        let c = DefaultRetryClassifier;
        let err = Error::UsageLimitExceeded(UsageLimitBreach::Requests {
            limit: 5,
            observed: 5,
        });
        let decision = c.should_retry(&err, 0);
        assert!(!decision.retry);
        assert_eq!(decision.after, None);
    }

    #[test]
    fn default_classifier_propagates_vendor_retry_after_hint() {
        let c = DefaultRetryClassifier;
        let err =
            Error::provider_http(429, "rate limited").with_retry_after(Duration::from_secs(7));
        let decision = c.should_retry(&err, 0);
        assert!(decision.retry);
        assert_eq!(decision.after, Some(Duration::from_secs(7)));
    }

    #[test]
    fn ensure_idempotency_key_stamps_once_and_subsequent_calls_observe_the_same_value() {
        // RetryService relies on this contract. The first call sets a
        // fresh UUID; later calls in the same logical call (one per
        // attempt) see the same value rather than generating new ones.
        use crate::context::ExecutionContext;
        let mut ctx = ExecutionContext::new();
        assert!(ctx.idempotency_key().is_none());
        let mut counter = 0u32;
        let first = ctx
            .ensure_idempotency_key(|| {
                counter += 1;
                "first-uuid".to_owned()
            })
            .to_owned();
        let second = ctx
            .ensure_idempotency_key(|| {
                counter += 1;
                "second-uuid".to_owned()
            })
            .to_owned();
        assert_eq!(first, "first-uuid");
        assert_eq!(second, "first-uuid", "stamp must be stable across calls");
        assert_eq!(counter, 1, "generator must run exactly once");
        // Cloning the ctx propagates the same key, so every retry of a
        // logical call shares the stamp.
        let cloned = ctx.clone();
        assert_eq!(cloned.idempotency_key(), Some("first-uuid"));
    }

    #[test]
    fn default_classifier_does_not_attach_retry_after_when_vendor_does_not_supply_one() {
        let c = DefaultRetryClassifier;
        let err = Error::provider_http(503, "down");
        let decision = c.should_retry(&err, 0);
        assert!(decision.retry);
        assert!(decision.after.is_none());
    }

    #[test]
    fn retry_policy_standard_uses_default_max_attempts() {
        let p = RetryPolicy::standard();
        assert_eq!(p.max_attempts(), DEFAULT_MAX_ATTEMPTS);
    }

    #[test]
    fn retry_policy_overrides_compose() {
        let p = RetryPolicy::standard()
            .with_max_attempts(2)
            .with_backoff(ExponentialBackoff::new(
                Duration::from_millis(1),
                Duration::from_millis(10),
            ));
        assert_eq!(p.max_attempts(), 2);
        assert_eq!(p.backoff().base(), Duration::from_millis(1));
    }

    #[test]
    fn parse_retry_after_accepts_positive_integer_seconds() {
        let value = http::HeaderValue::from_static("7");
        assert_eq!(parse_retry_after(Some(&value)), Some(Duration::from_secs(7)));
        // Surrounding whitespace is trimmed before parsing.
        let padded = http::HeaderValue::from_static(" 12 ");
        assert_eq!(
            parse_retry_after(Some(&padded)),
            Some(Duration::from_secs(12))
        );
    }

    #[test]
    fn parse_retry_after_rejects_absent_zero_and_non_integer_values() {
        assert_eq!(parse_retry_after(None), None);
        // Zero, negatives, fractions, free text, and the HTTP-date form
        // all defer to the configured backoff.
        for raw in ["0", "-5", "1.5", "soon", "Wed, 21 Oct 2015 07:28:00 GMT"] {
            let value = http::HeaderValue::from_static(raw);
            assert_eq!(
                parse_retry_after(Some(&value)),
                None,
                "{raw:?} must be rejected"
            );
        }
    }

    #[test]
    fn retry_decision_constructors_have_documented_shape() {
        assert!(!RetryDecision::STOP.retry);
        assert_eq!(RetryDecision::STOP.after, None);
        assert!(RetryDecision::RETRY.retry);
        assert_eq!(RetryDecision::RETRY.after, None);
        let hinted = RetryDecision::retry_after(Duration::from_secs(3));
        assert!(hinted.retry);
        assert_eq!(hinted.after, Some(Duration::from_secs(3)));
    }

    #[test]
    fn retry_layer_reports_stable_name() {
        let layer = RetryLayer::new(RetryPolicy::standard());
        assert_eq!(RetryLayer::NAME, "retry");
        assert_eq!(crate::NamedLayer::layer_name(&layer), RetryLayer::NAME);
    }
}