Skip to main content

trillium_client_retry/
lib.rs

1//! Automatic retry/backoff middleware for the [trillium](https://trillium.rs) HTTP client.
2//!
3//! [`RetryHandler`] is a [`ClientHandler`] that re-issues a request when it fails in a way that
4//! is worth retrying — a transport-level error (connection refused, reset, timeout) or a
5//! retryable response status (`429`, `503` by default) — spacing attempts out with a configurable
6//! backoff schedule and honoring a server-advertised `Retry-After`.
7//!
8//! ```no_run
9//! use std::time::Duration;
10//! use trillium_client::Client;
11//! use trillium_client_retry::RetryHandler;
12//! use trillium_testing::client_config;
13//!
14//! let client = Client::new(client_config()).with_handler(
15//!     RetryHandler::default()
16//!         .with_exponential_backoff(Duration::from_millis(100))
17//!         .with_max_attempts(5),
18//! );
19//! ```
20//!
21//! # Behavior
22//!
23//! Each attempt runs as a full client-handler cycle (queued via `set_followup`), so other
24//! handlers — loggers, conn-id, metrics — observe every attempt. Place `RetryHandler` as the
25//! outermost handler so those observers see each attempt before the backoff sleep.
26//!
27//! ## What is retried
28//!
29//! By default, retries are limited to idempotent methods (GET, HEAD, PUT, DELETE, OPTIONS,
30//! TRACE). Within that gate, a request is retried when it fails with a transport error or returns
31//! a status in the configured set. Adjust with [`with_all_methods`], [`with_statuses`], and
32//! [`with_transport_errors`], or replace the whole decision with [`retry_when`] /
33//! [`with_decision`].
34//!
35//! ## Request bodies
36//!
37//! A request body is replayed only if it can be cloned (static bodies — `Vec<u8>`, `String`,
38//! `&'static str`, etc.). A streaming (one-shot) body cannot be replayed, so a request carrying
39//! one is **not** retried; its result is surfaced as-is.
40//!
41//! ## Limits
42//!
43//! Retrying stops at whichever comes first: [`with_max_attempts`] total attempts, or the
44//! [`with_max_elapsed`] wall-clock budget. The budget is a hard ceiling — each attempt's timeout
45//! is clamped to the time remaining, so a single slow attempt can't overrun it. (The very first
46//! attempt uses the client's own timeout, since the budget is established once the request is in
47//! flight; keep `max_elapsed` at least as large as the client timeout.)
48//!
49//! ## `Retry-After`
50//!
51//! When [`honor_retry_after`](RetryHandler::with_honor_retry_after) is set (the default) and the
52//! response carries a `Retry-After` header in delta-seconds form, that delay takes precedence
53//! over the computed backoff (capped by [`with_max_retry_after`] if set). If waiting that long
54//! would use up the remaining elapsed budget, no retry is made and the response is surfaced
55//! as-is. `Retry-After` HTTP-date values are not yet parsed and fall back to the computed backoff.
56//!
57//! [`with_all_methods`]: RetryHandler::with_all_methods
58//! [`with_statuses`]: RetryHandler::with_statuses
59//! [`with_transport_errors`]: RetryHandler::with_transport_errors
60//! [`retry_when`]: RetryHandler::retry_when
61//! [`with_decision`]: RetryHandler::with_decision
62//! [`with_max_attempts`]: RetryHandler::with_max_attempts
63//! [`with_max_elapsed`]: RetryHandler::with_max_elapsed
64//! [`with_max_retry_after`]: RetryHandler::with_max_retry_after
65
66#![forbid(unsafe_code)]
67#![deny(
68    clippy::dbg_macro,
69    missing_copy_implementations,
70    rustdoc::missing_crate_level_docs,
71    missing_debug_implementations,
72    missing_docs,
73    nonstandard_style,
74    unused_qualifications
75)]
76
77// Compile the README as a doctest so its examples stay in sync with the crate.
78#[cfg(doctest)]
79#[doc = include_str!("../README.md")]
80mod readme {}
81
82mod backoff;
83use backoff::{Backoff, Kind};
84use std::{
85    borrow::Cow,
86    fmt,
87    sync::Arc,
88    time::{Duration, Instant},
89};
90use trillium_client::{
91    Body, ClientHandler, Conn, ConnExt,
92    KnownHeaderName::{Connection, ContentLength, Expect, Host, RetryAfter, TransferEncoding},
93    Method, Result, Status,
94};
95
96/// Whether `method` is eligible for retry under the idempotent-only gate (GET, HEAD, PUT, DELETE,
97/// OPTIONS, TRACE), per [RFC 9110 §9.2.2](https://www.rfc-editor.org/rfc/rfc9110#section-9.2.2).
98/// Replaying a non-idempotent request (e.g. POST) risks a duplicate side effect.
99fn is_idempotent(method: Method) -> bool {
100    matches!(
101        method,
102        Method::Get | Method::Head | Method::Put | Method::Delete | Method::Options | Method::Trace
103    )
104}
105
/// Shared, thread-safe user predicate installed by `retry_when`: given the conn carrying the
/// response or error, returns whether to retry.
type Predicate = Arc<dyn Fn(&Conn) -> bool + Send + Sync>;
/// Shared, thread-safe user decision installed by `with_decision`: given the conn and the
/// 1-based retry number, returns `Some(delay)` to retry after that delay or `None` to give up.
type Decision = Arc<dyn Fn(&Conn, u32) -> Option<Duration> + Send + Sync>;
108
/// A [`ClientHandler`] that automatically retries failed requests with backoff.
///
/// See the [crate-level documentation][crate] for behavior and configuration.
#[derive(Clone)]
pub struct RetryHandler {
    /// Backoff schedule (curve, optional delay cap, jitter) used to compute the wait before a
    /// retry.
    backoff: Backoff,
    /// Maximum number of attempts, counting the original request.
    max_attempts: u32,
    /// Total wall-clock budget across all attempts.
    max_elapsed: Duration,
    /// Response statuses that trigger a retry.
    statuses: Arc<[Status]>,
    /// When `false`, only idempotent methods are retried.
    all_methods: bool,
    /// Whether a conn that carries an error is retried.
    transport_errors: bool,
    /// Whether a delta-seconds `Retry-After` response header overrides the computed delay.
    honor_retry_after: bool,
    /// Optional upper bound applied to an honored `Retry-After` delay.
    max_retry_after: Option<Duration>,
    /// Optional user predicate that replaces the built-in method/status/error checks.
    predicate: Option<Predicate>,
    /// Optional user decision that replaces both the predicate and the backoff schedule.
    decision: Option<Decision>,
}
125
126impl Default for RetryHandler {
127    fn default() -> Self {
128        Self {
129            backoff: Backoff::default(),
130            max_attempts: 4,
131            max_elapsed: Duration::from_secs(30),
132            statuses: Arc::from([Status::TooManyRequests, Status::ServiceUnavailable].as_slice()),
133            all_methods: false,
134            transport_errors: true,
135            honor_retry_after: true,
136            max_retry_after: None,
137            predicate: None,
138            decision: None,
139        }
140    }
141}
142
143impl RetryHandler {
144    /// Construct a `RetryHandler` with default settings (see the [crate docs][crate]).
145    #[must_use]
146    pub fn new() -> Self {
147        Self::default()
148    }
149
150    /// Wait a fixed `delay` before every retry.
151    ///
152    /// One of four mutually exclusive backoff curves (the others are
153    /// [`with_linear_backoff`](Self::with_linear_backoff),
154    /// [`with_exponential_backoff`](Self::with_exponential_backoff), and
155    /// [`with_custom_backoff`](Self::with_custom_backoff)); the last one set wins. The default
156    /// curve is exponential from 100ms. [`with_max_delay`](Self::with_max_delay) and
157    /// [`without_jitter`](Self::without_jitter) apply on top of whichever curve is chosen.
158    #[must_use]
159    pub fn with_constant_backoff(mut self, delay: Duration) -> Self {
160        self.backoff.kind = Kind::Constant(delay);
161        self
162    }
163
164    /// Grow the delay linearly: `step * retry_number` (the first retry waits `step`). See
165    /// [`with_constant_backoff`](Self::with_constant_backoff) for how the curves combine.
166    #[must_use]
167    pub fn with_linear_backoff(mut self, step: Duration) -> Self {
168        self.backoff.kind = Kind::Linear(step);
169        self
170    }
171
172    /// Double the delay each retry: `base * 2^(retry_number - 1)` (the first retry waits `base`).
173    /// This is the default curve, from 100ms. See
174    /// [`with_constant_backoff`](Self::with_constant_backoff) for how the curves combine.
175    #[must_use]
176    pub fn with_exponential_backoff(mut self, base: Duration) -> Self {
177        self.backoff.kind = Kind::Exponential(base);
178        self
179    }
180
181    /// Compute the delay with a fully custom curve. The closure receives the 1-based retry number
182    /// and the conn carrying the response or error being retried, and returns the base delay
183    /// (before the [`with_max_delay`](Self::with_max_delay) cap and jitter). See
184    /// [`with_constant_backoff`](Self::with_constant_backoff) for how the curves combine.
185    #[must_use]
186    pub fn with_custom_backoff(
187        mut self,
188        f: impl Fn(u32, &Conn) -> Duration + Send + Sync + 'static,
189    ) -> Self {
190        self.backoff.kind = Kind::Custom(Arc::new(f));
191        self
192    }
193
194    /// Cap the computed backoff delay at `max`, applied before jitter. Defaults to uncapped. This
195    /// caps *your* backoff curve; a server-advertised `Retry-After` is capped separately by
196    /// [`with_max_retry_after`](Self::with_max_retry_after).
197    #[must_use]
198    pub fn with_max_delay(mut self, max: Duration) -> Self {
199        self.backoff.max_delay = Some(max);
200        self
201    }
202
203    /// Use the computed backoff delay exactly, with no randomization. By default, full jitter is
204    /// applied — the actual delay is chosen uniformly at random from `0..=computed` — to spread
205    /// retries from many clients across time and avoid a synchronized thundering herd.
206    #[must_use]
207    pub fn without_jitter(mut self) -> Self {
208        self.backoff.jitter = backoff::Jitter::None;
209        self
210    }
211
212    /// Set the maximum number of attempts, *including* the original request. Defaults to 4
213    /// (the original plus up to 3 retries).
214    #[must_use]
215    pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
216        self.max_attempts = max_attempts;
217        self
218    }
219
220    /// Set the total wall-clock budget across all attempts. Defaults to 30 seconds. This is a
221    /// hard ceiling: each retry's timeout is clamped to the time remaining.
222    #[must_use]
223    pub fn with_max_elapsed(mut self, max_elapsed: Duration) -> Self {
224        self.max_elapsed = max_elapsed;
225        self
226    }
227
228    /// Replace the set of response statuses that trigger a retry. Defaults to `429` and `503`.
229    #[must_use]
230    pub fn with_statuses(mut self, statuses: impl IntoIterator<Item = Status>) -> Self {
231        self.statuses = statuses.into_iter().collect();
232        self
233    }
234
235    /// Retry regardless of request method, including POST and other non-idempotent requests. By
236    /// default only idempotent methods (GET, HEAD, PUT, DELETE, OPTIONS, TRACE) are retried, since
237    /// replaying a non-idempotent request risks a duplicate side effect. Enable this only when the
238    /// endpoint is known to be safe to replay (e.g. it is idempotent in practice or guarded by an
239    /// idempotency key).
240    #[must_use]
241    pub fn with_all_methods(mut self) -> Self {
242        self.all_methods = true;
243        self
244    }
245
246    /// Set whether transport-level errors (connection refused, reset, timeout) are retried.
247    /// Defaults to `true`.
248    #[must_use]
249    pub fn with_transport_errors(mut self, retry: bool) -> Self {
250        self.transport_errors = retry;
251        self
252    }
253
254    /// Set whether a server-advertised `Retry-After` overrides the computed backoff. Defaults to
255    /// `true`.
256    #[must_use]
257    pub fn with_honor_retry_after(mut self, honor: bool) -> Self {
258        self.honor_retry_after = honor;
259        self
260    }
261
262    /// Cap how long a `Retry-After` will be honored for. Defaults to uncapped (bounded only by
263    /// the elapsed budget).
264    #[must_use]
265    pub fn with_max_retry_after(mut self, max: Duration) -> Self {
266        self.max_retry_after = Some(max);
267        self
268    }
269
270    /// Replace the built-in retry predicate. The closure decides, from the conn carrying the
271    /// response or transport error, whether to retry — fully replacing the method gate, status
272    /// set, and transport-error toggle. Timing still comes from the configured backoff schedule.
273    #[must_use]
274    pub fn retry_when(mut self, predicate: impl Fn(&Conn) -> bool + Send + Sync + 'static) -> Self {
275        self.predicate = Some(Arc::new(predicate));
276        self
277    }
278
279    /// Replace the entire retry decision — predicate *and* backoff. The closure receives the conn
280    /// and the 1-based retry number and returns `Some(delay)` to retry after that delay, or
281    /// `None` to give up. The attempt and elapsed-budget limits still apply.
282    #[must_use]
283    pub fn with_decision(
284        mut self,
285        decision: impl Fn(&Conn, u32) -> Option<Duration> + Send + Sync + 'static,
286    ) -> Self {
287        self.decision = Some(Arc::new(decision));
288        self
289    }
290
291    fn decide(&self, conn: &Conn, retry_number: u32) -> Option<Duration> {
292        if let Some(decision) = &self.decision {
293            return decision(conn, retry_number);
294        }
295        self.should_retry(conn)
296            .then(|| self.backoff.delay(retry_number, conn))
297    }
298
299    fn should_retry(&self, conn: &Conn) -> bool {
300        if let Some(predicate) = &self.predicate {
301            return predicate(conn);
302        }
303        if !self.all_methods && !is_idempotent(conn.method()) {
304            return false;
305        }
306        if conn.error().is_some() {
307            return self.transport_errors;
308        }
309        conn.status()
310            .is_some_and(|status| self.statuses.contains(&status))
311    }
312
313    fn effective_delay(&self, conn: &Conn, base_delay: Duration) -> Duration {
314        if !self.honor_retry_after {
315            return base_delay;
316        }
317        match retry_after(conn) {
318            Some(advised) => self.max_retry_after.map_or(advised, |cap| advised.min(cap)),
319            None => base_delay,
320        }
321    }
322
323    fn build_followup(&self, conn: &Conn, state: RetryState, remaining: Duration) -> Conn {
324        let mut followup = conn.client().build_conn(conn.method(), conn.url().clone());
325
326        // Strip transport/body-description headers; `finalize_headers` re-derives them for the
327        // replayed request. Same-origin retry, so credential headers are kept.
328        let mut headers = conn.request_headers().clone();
329        headers.remove_all([Host, ContentLength, TransferEncoding, Expect, Connection]);
330        *followup.request_headers_mut() = headers;
331
332        if let Some(BodyReplay::Replayable(body)) = conn.state::<BodyReplay>()
333            && let Some(replayed) = body.try_clone()
334        {
335            followup.set_request_body(replayed);
336        }
337
338        let timeout = conn.timeout().map_or(remaining, |t| t.min(remaining));
339        followup.set_timeout(timeout);
340
341        followup.insert_state(RetryState {
342            attempts: state.attempts + 1,
343            deadline: state.deadline,
344        });
345        followup
346    }
347}
348
349impl ClientHandler for RetryHandler {
350    async fn run(&self, conn: &mut Conn) -> Result<()> {
351        // Anchor the elapsed budget on the first attempt; follow-ups carry it forward.
352        if conn.state::<RetryState>().is_none() {
353            conn.insert_state(RetryState {
354                attempts: 1,
355                deadline: Instant::now() + self.max_elapsed,
356            });
357        }
358
359        // Snapshot the body before the network consumes it, so it can be replayed.
360        let replay = match conn.request_body() {
361            None => BodyReplay::None,
362            Some(body) => match body.try_clone() {
363                Some(clone) => BodyReplay::Replayable(clone),
364                None => BodyReplay::OneShot,
365            },
366        };
367        conn.insert_state(replay);
368        Ok(())
369    }
370
371    async fn after_response(&self, conn: &mut Conn) -> Result<()> {
372        let Some(state) = conn.state::<RetryState>().copied() else {
373            return Ok(());
374        };
375        if state.attempts >= self.max_attempts {
376            return Ok(());
377        }
378        // A one-shot body can't be replayed; surface whatever happened.
379        if matches!(conn.state::<BodyReplay>(), Some(BodyReplay::OneShot)) {
380            return Ok(());
381        }
382
383        let retry_number = state.attempts;
384        let Some(base_delay) = self.decide(conn, retry_number) else {
385            return Ok(());
386        };
387        let delay = self.effective_delay(conn, base_delay);
388
389        // Not enough budget left to both wait and attempt — give up now.
390        if Instant::now() + delay >= state.deadline {
391            return Ok(());
392        }
393
394        conn.client().connector().runtime().delay(delay).await;
395
396        let remaining = state.deadline.saturating_duration_since(Instant::now());
397        if remaining.is_zero() {
398            return Ok(());
399        }
400
401        let followup = self.build_followup(conn, state, remaining);
402        // Clear any transport error so the loop runs the follow-up instead of propagating it.
403        conn.take_error();
404        conn.set_followup(followup);
405        Ok(())
406    }
407
408    fn name(&self) -> Cow<'static, str> {
409        "RetryHandler".into()
410    }
411}
412
413fn retry_after(conn: &Conn) -> Option<Duration> {
414    conn.response_headers()
415        .get_str(RetryAfter)?
416        .trim()
417        .parse::<u64>()
418        .ok()
419        .map(Duration::from_secs)
420}
421
/// Per-conn retry bookkeeping, stashed in conn state and carried across follow-ups.
#[derive(Clone, Copy)]
struct RetryState {
    /// Number of attempts made so far, counting the original request: 1 on the first attempt,
    /// incremented for each follow-up.
    attempts: u32,
    /// Instant at which the elapsed budget runs out; set on the first attempt and copied
    /// unchanged into every follow-up.
    deadline: Instant,
}
428
/// Snapshot of the request body's replayability, taken in `run` before the network consumes it.
enum BodyReplay {
    /// The request has no body.
    None,
    /// The body could be cloned; the clone is kept here so a follow-up can resend it.
    Replayable(Body),
    /// The body could not be cloned, so the request is not retried.
    OneShot,
}
435
436impl fmt::Debug for RetryHandler {
437    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
438        f.debug_struct("RetryHandler")
439            .field("backoff", &self.backoff)
440            .field("max_attempts", &self.max_attempts)
441            .field("max_elapsed", &self.max_elapsed)
442            .field("statuses", &self.statuses)
443            .field("all_methods", &self.all_methods)
444            .field("transport_errors", &self.transport_errors)
445            .field("honor_retry_after", &self.honor_retry_after)
446            .field("max_retry_after", &self.max_retry_after)
447            .field("predicate", &self.predicate.as_ref().map(|_| "<fn>"))
448            .field("decision", &self.decision.as_ref().map(|_| "<fn>"))
449            .finish()
450    }
451}