trillium_client_retry/lib.rs
1//! Automatic retry/backoff middleware for the [trillium](https://trillium.rs) HTTP client.
2//!
3//! [`RetryHandler`] is a [`ClientHandler`] that re-issues a request when it fails in a way that
4//! is worth retrying — a transport-level error (connection refused, reset, timeout) or a
5//! retryable response status (`429`, `503` by default) — spacing attempts out with a configurable
6//! backoff schedule and honoring a server-advertised `Retry-After`.
7//!
8//! ```no_run
9//! use std::time::Duration;
10//! use trillium_client::Client;
11//! use trillium_client_retry::RetryHandler;
12//! use trillium_testing::client_config;
13//!
14//! let client = Client::new(client_config()).with_handler(
15//! RetryHandler::default()
16//! .with_exponential_backoff(Duration::from_millis(100))
17//! .with_max_attempts(5),
18//! );
19//! ```
20//!
21//! # Behavior
22//!
23//! Each attempt runs as a full client-handler cycle (queued via `set_followup`), so other
24//! handlers — loggers, conn-id, metrics — observe every attempt. Place `RetryHandler` as the
25//! outermost handler so those observers see each attempt before the backoff sleep.
26//!
27//! ## What is retried
28//!
29//! By default, retries are limited to idempotent methods (GET, HEAD, PUT, DELETE, OPTIONS,
30//! TRACE). Within that gate, a request is retried when it fails with a transport error or returns
31//! a status in the configured set. Adjust with [`with_all_methods`], [`with_statuses`], and
32//! [`with_transport_errors`], or replace the whole decision with [`retry_when`] /
33//! [`with_decision`].
34//!
35//! ## Request bodies
36//!
37//! A request body is replayed only if it can be cloned (static bodies — `Vec<u8>`, `String`,
38//! `&'static str`, etc.). A streaming (one-shot) body cannot be replayed, so a request carrying
39//! one is **not** retried; its result is surfaced as-is.
40//!
41//! ## Limits
42//!
43//! Retrying stops at whichever comes first: [`with_max_attempts`] total attempts, or the
44//! [`with_max_elapsed`] wall-clock budget. The budget is a hard ceiling — each attempt's timeout
45//! is clamped to the time remaining, so a single slow attempt can't overrun it. (The very first
46//! attempt uses the client's own timeout, since the budget is established once the request is in
47//! flight; keep `max_elapsed` at least as large as the client timeout.)
48//!
49//! ## `Retry-After`
50//!
51//! When [`honor_retry_after`](RetryHandler::with_honor_retry_after) is set (the default) and the
52//! response carries a `Retry-After` header in delta-seconds form, that delay takes precedence
53//! over the computed backoff (clamped by [`with_max_retry_after`] if set, and always by the
54//! elapsed budget). `Retry-After` HTTP-date values are not yet parsed and fall back to the
55//! computed backoff.
56//!
57//! [`with_all_methods`]: RetryHandler::with_all_methods
58//! [`with_statuses`]: RetryHandler::with_statuses
59//! [`with_transport_errors`]: RetryHandler::with_transport_errors
60//! [`retry_when`]: RetryHandler::retry_when
61//! [`with_decision`]: RetryHandler::with_decision
62//! [`with_max_attempts`]: RetryHandler::with_max_attempts
63//! [`with_max_elapsed`]: RetryHandler::with_max_elapsed
64//! [`with_max_retry_after`]: RetryHandler::with_max_retry_after
65
66#![forbid(unsafe_code)]
67#![deny(
68 clippy::dbg_macro,
69 missing_copy_implementations,
70 rustdoc::missing_crate_level_docs,
71 missing_debug_implementations,
72 missing_docs,
73 nonstandard_style,
74 unused_qualifications
75)]
76
77// Compile the README as a doctest so its examples stay in sync with the crate.
78#[cfg(doctest)]
79#[doc = include_str!("../README.md")]
80mod readme {}
81
82mod backoff;
83use backoff::{Backoff, Kind};
84use std::{
85 borrow::Cow,
86 fmt,
87 sync::Arc,
88 time::{Duration, Instant},
89};
90use trillium_client::{
91 Body, ClientHandler, Conn, ConnExt,
92 KnownHeaderName::{Connection, ContentLength, Expect, Host, RetryAfter, TransferEncoding},
93 Method, Result, Status,
94};
95
96/// Whether `method` is eligible for retry under the idempotent-only gate (GET, HEAD, PUT, DELETE,
97/// OPTIONS, TRACE), per [RFC 9110 §9.2.2](https://www.rfc-editor.org/rfc/rfc9110#section-9.2.2).
98/// Replaying a non-idempotent request (e.g. POST) risks a duplicate side effect.
99fn is_idempotent(method: Method) -> bool {
100 matches!(
101 method,
102 Method::Get | Method::Head | Method::Put | Method::Delete | Method::Options | Method::Trace
103 )
104}
105
// Shared, thread-safe callback that inspects a conn and reports whether it should be retried.
type Predicate = Arc<dyn Fn(&Conn) -> bool + Send + Sync>;
// Shared, thread-safe callback that receives a conn and the retry number and returns the delay
// to wait before retrying, or `None` to give up.
type Decision = Arc<dyn Fn(&Conn, u32) -> Option<Duration> + Send + Sync>;
108
/// A [`ClientHandler`] that automatically retries failed requests with backoff.
///
/// See the [crate-level documentation][crate] for behavior and configuration.
#[derive(Clone)]
pub struct RetryHandler {
    // Schedule that computes the wait before each retry (curve, delay cap, jitter).
    backoff: Backoff,
    // Total attempts allowed, counting the original request.
    max_attempts: u32,
    // Wall-clock budget across all attempts; turned into a deadline on the first attempt.
    max_elapsed: Duration,
    // Response statuses that trigger a retry under the built-in predicate.
    statuses: Arc<[Status]>,
    // When false, the built-in predicate only retries idempotent methods.
    all_methods: bool,
    // Whether the built-in predicate retries a conn that carries a transport error.
    transport_errors: bool,
    // Whether a parsed `Retry-After` response header replaces the computed delay.
    honor_retry_after: bool,
    // Optional upper bound applied to an honored `Retry-After` delay.
    max_retry_after: Option<Duration>,
    // Custom replacement for the built-in retry predicate, if one was supplied.
    predicate: Option<Predicate>,
    // Custom replacement for both the predicate and the backoff, if one was supplied.
    decision: Option<Decision>,
}
125
126impl Default for RetryHandler {
127 fn default() -> Self {
128 Self {
129 backoff: Backoff::default(),
130 max_attempts: 4,
131 max_elapsed: Duration::from_secs(30),
132 statuses: Arc::from([Status::TooManyRequests, Status::ServiceUnavailable].as_slice()),
133 all_methods: false,
134 transport_errors: true,
135 honor_retry_after: true,
136 max_retry_after: None,
137 predicate: None,
138 decision: None,
139 }
140 }
141}
142
143impl RetryHandler {
144 /// Construct a `RetryHandler` with default settings (see the [crate docs][crate]).
145 #[must_use]
146 pub fn new() -> Self {
147 Self::default()
148 }
149
150 /// Wait a fixed `delay` before every retry.
151 ///
152 /// One of four mutually exclusive backoff curves (the others are
153 /// [`with_linear_backoff`](Self::with_linear_backoff),
154 /// [`with_exponential_backoff`](Self::with_exponential_backoff), and
155 /// [`with_custom_backoff`](Self::with_custom_backoff)); the last one set wins. The default
156 /// curve is exponential from 100ms. [`with_max_delay`](Self::with_max_delay) and
157 /// [`without_jitter`](Self::without_jitter) apply on top of whichever curve is chosen.
158 #[must_use]
159 pub fn with_constant_backoff(mut self, delay: Duration) -> Self {
160 self.backoff.kind = Kind::Constant(delay);
161 self
162 }
163
164 /// Grow the delay linearly: `step * retry_number` (the first retry waits `step`). See
165 /// [`with_constant_backoff`](Self::with_constant_backoff) for how the curves combine.
166 #[must_use]
167 pub fn with_linear_backoff(mut self, step: Duration) -> Self {
168 self.backoff.kind = Kind::Linear(step);
169 self
170 }
171
172 /// Double the delay each retry: `base * 2^(retry_number - 1)` (the first retry waits `base`).
173 /// This is the default curve, from 100ms. See
174 /// [`with_constant_backoff`](Self::with_constant_backoff) for how the curves combine.
175 #[must_use]
176 pub fn with_exponential_backoff(mut self, base: Duration) -> Self {
177 self.backoff.kind = Kind::Exponential(base);
178 self
179 }
180
181 /// Compute the delay with a fully custom curve. The closure receives the 1-based retry number
182 /// and the conn carrying the response or error being retried, and returns the base delay
183 /// (before the [`with_max_delay`](Self::with_max_delay) cap and jitter). See
184 /// [`with_constant_backoff`](Self::with_constant_backoff) for how the curves combine.
185 #[must_use]
186 pub fn with_custom_backoff(
187 mut self,
188 f: impl Fn(u32, &Conn) -> Duration + Send + Sync + 'static,
189 ) -> Self {
190 self.backoff.kind = Kind::Custom(Arc::new(f));
191 self
192 }
193
194 /// Cap the computed backoff delay at `max`, applied before jitter. Defaults to uncapped. This
195 /// caps *your* backoff curve; a server-advertised `Retry-After` is capped separately by
196 /// [`with_max_retry_after`](Self::with_max_retry_after).
197 #[must_use]
198 pub fn with_max_delay(mut self, max: Duration) -> Self {
199 self.backoff.max_delay = Some(max);
200 self
201 }
202
203 /// Use the computed backoff delay exactly, with no randomization. By default, full jitter is
204 /// applied — the actual delay is chosen uniformly at random from `0..=computed` — to spread
205 /// retries from many clients across time and avoid a synchronized thundering herd.
206 #[must_use]
207 pub fn without_jitter(mut self) -> Self {
208 self.backoff.jitter = backoff::Jitter::None;
209 self
210 }
211
212 /// Set the maximum number of attempts, *including* the original request. Defaults to 4
213 /// (the original plus up to 3 retries).
214 #[must_use]
215 pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
216 self.max_attempts = max_attempts;
217 self
218 }
219
220 /// Set the total wall-clock budget across all attempts. Defaults to 30 seconds. This is a
221 /// hard ceiling: each retry's timeout is clamped to the time remaining.
222 #[must_use]
223 pub fn with_max_elapsed(mut self, max_elapsed: Duration) -> Self {
224 self.max_elapsed = max_elapsed;
225 self
226 }
227
228 /// Replace the set of response statuses that trigger a retry. Defaults to `429` and `503`.
229 #[must_use]
230 pub fn with_statuses(mut self, statuses: impl IntoIterator<Item = Status>) -> Self {
231 self.statuses = statuses.into_iter().collect();
232 self
233 }
234
235 /// Retry regardless of request method, including POST and other non-idempotent requests. By
236 /// default only idempotent methods (GET, HEAD, PUT, DELETE, OPTIONS, TRACE) are retried, since
237 /// replaying a non-idempotent request risks a duplicate side effect. Enable this only when the
238 /// endpoint is known to be safe to replay (e.g. it is idempotent in practice or guarded by an
239 /// idempotency key).
240 #[must_use]
241 pub fn with_all_methods(mut self) -> Self {
242 self.all_methods = true;
243 self
244 }
245
246 /// Set whether transport-level errors (connection refused, reset, timeout) are retried.
247 /// Defaults to `true`.
248 #[must_use]
249 pub fn with_transport_errors(mut self, retry: bool) -> Self {
250 self.transport_errors = retry;
251 self
252 }
253
254 /// Set whether a server-advertised `Retry-After` overrides the computed backoff. Defaults to
255 /// `true`.
256 #[must_use]
257 pub fn with_honor_retry_after(mut self, honor: bool) -> Self {
258 self.honor_retry_after = honor;
259 self
260 }
261
262 /// Cap how long a `Retry-After` will be honored for. Defaults to uncapped (bounded only by
263 /// the elapsed budget).
264 #[must_use]
265 pub fn with_max_retry_after(mut self, max: Duration) -> Self {
266 self.max_retry_after = Some(max);
267 self
268 }
269
270 /// Replace the built-in retry predicate. The closure decides, from the conn carrying the
271 /// response or transport error, whether to retry — fully replacing the method gate, status
272 /// set, and transport-error toggle. Timing still comes from the configured backoff schedule.
273 #[must_use]
274 pub fn retry_when(mut self, predicate: impl Fn(&Conn) -> bool + Send + Sync + 'static) -> Self {
275 self.predicate = Some(Arc::new(predicate));
276 self
277 }
278
279 /// Replace the entire retry decision — predicate *and* backoff. The closure receives the conn
280 /// and the 1-based retry number and returns `Some(delay)` to retry after that delay, or
281 /// `None` to give up. The attempt and elapsed-budget limits still apply.
282 #[must_use]
283 pub fn with_decision(
284 mut self,
285 decision: impl Fn(&Conn, u32) -> Option<Duration> + Send + Sync + 'static,
286 ) -> Self {
287 self.decision = Some(Arc::new(decision));
288 self
289 }
290
291 fn decide(&self, conn: &Conn, retry_number: u32) -> Option<Duration> {
292 if let Some(decision) = &self.decision {
293 return decision(conn, retry_number);
294 }
295 self.should_retry(conn)
296 .then(|| self.backoff.delay(retry_number, conn))
297 }
298
299 fn should_retry(&self, conn: &Conn) -> bool {
300 if let Some(predicate) = &self.predicate {
301 return predicate(conn);
302 }
303 if !self.all_methods && !is_idempotent(conn.method()) {
304 return false;
305 }
306 if conn.error().is_some() {
307 return self.transport_errors;
308 }
309 conn.status()
310 .is_some_and(|status| self.statuses.contains(&status))
311 }
312
313 fn effective_delay(&self, conn: &Conn, base_delay: Duration) -> Duration {
314 if !self.honor_retry_after {
315 return base_delay;
316 }
317 match retry_after(conn) {
318 Some(advised) => self.max_retry_after.map_or(advised, |cap| advised.min(cap)),
319 None => base_delay,
320 }
321 }
322
323 fn build_followup(&self, conn: &Conn, state: RetryState, remaining: Duration) -> Conn {
324 let mut followup = conn.client().build_conn(conn.method(), conn.url().clone());
325
326 // Strip transport/body-description headers; `finalize_headers` re-derives them for the
327 // replayed request. Same-origin retry, so credential headers are kept.
328 let mut headers = conn.request_headers().clone();
329 headers.remove_all([Host, ContentLength, TransferEncoding, Expect, Connection]);
330 *followup.request_headers_mut() = headers;
331
332 if let Some(BodyReplay::Replayable(body)) = conn.state::<BodyReplay>()
333 && let Some(replayed) = body.try_clone()
334 {
335 followup.set_request_body(replayed);
336 }
337
338 let timeout = conn.timeout().map_or(remaining, |t| t.min(remaining));
339 followup.set_timeout(timeout);
340
341 followup.insert_state(RetryState {
342 attempts: state.attempts + 1,
343 deadline: state.deadline,
344 });
345 followup
346 }
347}
348
349impl ClientHandler for RetryHandler {
350 async fn run(&self, conn: &mut Conn) -> Result<()> {
351 // Anchor the elapsed budget on the first attempt; follow-ups carry it forward.
352 if conn.state::<RetryState>().is_none() {
353 conn.insert_state(RetryState {
354 attempts: 1,
355 deadline: Instant::now() + self.max_elapsed,
356 });
357 }
358
359 // Snapshot the body before the network consumes it, so it can be replayed.
360 let replay = match conn.request_body() {
361 None => BodyReplay::None,
362 Some(body) => match body.try_clone() {
363 Some(clone) => BodyReplay::Replayable(clone),
364 None => BodyReplay::OneShot,
365 },
366 };
367 conn.insert_state(replay);
368 Ok(())
369 }
370
371 async fn after_response(&self, conn: &mut Conn) -> Result<()> {
372 let Some(state) = conn.state::<RetryState>().copied() else {
373 return Ok(());
374 };
375 if state.attempts >= self.max_attempts {
376 return Ok(());
377 }
378 // A one-shot body can't be replayed; surface whatever happened.
379 if matches!(conn.state::<BodyReplay>(), Some(BodyReplay::OneShot)) {
380 return Ok(());
381 }
382
383 let retry_number = state.attempts;
384 let Some(base_delay) = self.decide(conn, retry_number) else {
385 return Ok(());
386 };
387 let delay = self.effective_delay(conn, base_delay);
388
389 // Not enough budget left to both wait and attempt — give up now.
390 if Instant::now() + delay >= state.deadline {
391 return Ok(());
392 }
393
394 conn.client().connector().runtime().delay(delay).await;
395
396 let remaining = state.deadline.saturating_duration_since(Instant::now());
397 if remaining.is_zero() {
398 return Ok(());
399 }
400
401 let followup = self.build_followup(conn, state, remaining);
402 // Clear any transport error so the loop runs the follow-up instead of propagating it.
403 conn.take_error();
404 conn.set_followup(followup);
405 Ok(())
406 }
407
408 fn name(&self) -> Cow<'static, str> {
409 "RetryHandler".into()
410 }
411}
412
413fn retry_after(conn: &Conn) -> Option<Duration> {
414 conn.response_headers()
415 .get_str(RetryAfter)?
416 .trim()
417 .parse::<u64>()
418 .ok()
419 .map(Duration::from_secs)
420}
421
/// Per-conn retry bookkeeping, stashed in conn state and carried across follow-ups.
#[derive(Clone, Copy)]
struct RetryState {
    // Attempts made so far, counting the one this conn represents (the first conn holds 1).
    attempts: u32,
    // Point in time at which the elapsed budget runs out; set once on the first attempt.
    deadline: Instant,
}
428
/// Snapshot of the request body's replayability, taken in `run` before the network consumes it.
enum BodyReplay {
    // The request had no body, so there is nothing to replay.
    None,
    // The body could be cloned; holds the clone kept for building a follow-up request.
    Replayable(Body),
    // The body could not be cloned; a request in this state is never retried.
    OneShot,
}
435
436impl fmt::Debug for RetryHandler {
437 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
438 f.debug_struct("RetryHandler")
439 .field("backoff", &self.backoff)
440 .field("max_attempts", &self.max_attempts)
441 .field("max_elapsed", &self.max_elapsed)
442 .field("statuses", &self.statuses)
443 .field("all_methods", &self.all_methods)
444 .field("transport_errors", &self.transport_errors)
445 .field("honor_retry_after", &self.honor_retry_after)
446 .field("max_retry_after", &self.max_retry_after)
447 .field("predicate", &self.predicate.as_ref().map(|_| "<fn>"))
448 .field("decision", &self.decision.as_ref().map(|_| "<fn>"))
449 .finish()
450 }
451}