Skip to main content

courier/
retry.rs

1use std::path::PathBuf;
2use std::time::Duration;
3
4use serde::Deserialize;
5
6/// Configures retry behavior for components that support it.
7///
8/// On each failure the component logs a warning, waits for the computed
9/// delay, then tries again. When all attempts are exhausted, `on_exhausted`
10/// determines what happens next (see [`ExhaustedPolicy`]).
11///
12/// The delay before the Nth retry (0-indexed) is:
13/// ```text
14/// delay = min(initial_delay_ms * backoff_multiplier ^ N, max_delay_ms)
15/// ```
16///
17/// Set `max_attempts = 1` to disable retries (first failure immediately
18/// triggers `on_exhausted`).
19#[derive(Debug, Clone, PartialEq, Deserialize)]
20pub struct RetryPolicy {
21    /// Total number of attempts, including the first. Must be ≥ 1.
22    pub max_attempts: u32,
23    /// Delay before the second attempt, in milliseconds.
24    pub initial_delay_ms: u64,
25    /// Multiplier applied to the delay after each failed attempt.
26    /// `1.0` = constant delay; `2.0` = exponential doubling.
27    pub backoff_multiplier: f64,
28    /// Upper bound on any single inter-attempt delay, in milliseconds.
29    pub max_delay_ms: u64,
30    /// What to do after all attempts are exhausted.
31    #[serde(default)]
32    pub on_exhausted: ExhaustedPolicy,
33}
34
35impl RetryPolicy {
36    /// Delay for the Nth retry (0-indexed: 0 = delay before 2nd attempt).
37    pub fn delay_for(&self, retry: u32) -> Duration {
38        let ms = (self.initial_delay_ms as f64 * self.backoff_multiplier.powi(retry as i32))
39            .min(self.max_delay_ms as f64) as u64;
40        Duration::from_millis(ms)
41    }
42}
43
44impl Default for RetryPolicy {
45    fn default() -> Self {
46        Self {
47            max_attempts: 3,
48            initial_delay_ms: 100,
49            backoff_multiplier: 2.0,
50            max_delay_ms: 10_000,
51            on_exhausted: ExhaustedPolicy::default(),
52        }
53    }
54}
55
56/// What to do when a [`RetryPolicy`] exhausts all attempts.
57///
58/// - [`Propagate`]: return the last error to the caller. The surrounding
59///   `ManagedSink` then applies its own `ErrorPolicy` (drop or fail pipeline).
60///
61/// - [`DeadLetter`]: append the failed envelope as a JSON line to `path` and
62///   return `Ok(())` so the pipeline keeps running. If the file write itself
63///   fails, the original error is returned instead.
64///
65/// [`Propagate`]: ExhaustedPolicy::Propagate
66/// [`DeadLetter`]: ExhaustedPolicy::DeadLetter
67#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
68#[serde(tag = "kind", rename_all = "snake_case")]
69pub enum ExhaustedPolicy {
70    /// Return the last error to the caller.
71    #[default]
72    Propagate,
73    /// Append the failed envelope as a JSON line to the given file.
74    DeadLetter { path: PathBuf },
75}