eventsrc_client/replayable/retry.rs

//! Retry policy types for reconnecting event sources.
//!
//! The reqwest integration keeps retry concerns split into two layers:
//!
//! - [`crate::replayable::EventSource`] classifies reconnectable transitions
//! - [`RetryPolicy`] maps that retry context to an optional delay
//!
//! This keeps transport and protocol semantics in the event source state machine
//! while still allowing callers to customize reconnect timing.
//!
//! Built-in policies:
//!
//! - [`ConstantBackoff`] for a fixed reconnect delay
//! - [`ExponentialBackoff`] for failure-sensitive backoff
//! - [`NeverRetry`] to stop reconnecting entirely
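//!
//! Callers are not limited to the built-in policies. The sketch below is a
//! hypothetical custom policy (not part of this crate) that reconnects
//! immediately unless the server supplied a `retry:` hint; the import path
//! assumes this module is exported as `eventsrc_client::replayable::retry`:
//!
//! ```ignore
//! use std::time::Duration;
//!
//! // Hypothetical path; adjust to wherever this module is re-exported.
//! use eventsrc_client::replayable::retry::{RetryCause, RetryContext, RetryPolicy};
//!
//! /// Hypothetical policy: reconnect immediately, deferring to server hints.
//! #[derive(Debug)]
//! struct EagerRetry;
//!
//! impl RetryPolicy for EagerRetry {
//!     fn next_delay(&self, context: RetryContext) -> Option<Duration> {
//!         match context.cause {
//!             // Normal end-of-stream: reconnect right away.
//!             RetryCause::Disconnect => Some(Duration::ZERO),
//!             // Failures: honor the server's `retry:` hint when present.
//!             _ => Some(context.server_retry.unwrap_or(Duration::ZERO)),
//!         }
//!     }
//! }
//! ```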

use std::{fmt::Debug, time::Duration};

use backon::{BackoffBuilder, ConstantBuilder, ExponentialBuilder};

/// Computes reconnect delays for retryable event source transitions.
pub trait RetryPolicy: Debug + Send + Sync + 'static {
    /// Returns the delay before the next reconnect attempt, or `None` to stop reconnecting.
    fn next_delay(&self, context: RetryContext) -> Option<Duration>;
}

/// Why the event source is reconnecting.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RetryCause {
    /// The stream reached EOF after a valid SSE response.
    Disconnect,
    /// Establishing the HTTP connection failed.
    ConnectError,
    /// Reading the SSE response body failed.
    StreamError,
}

/// Context passed to [`RetryPolicy`] when computing the next delay.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RetryContext {
    /// Why the reconnect is being scheduled.
    pub cause: RetryCause,
    /// Current consecutive failure count.
    ///
    /// This is `0` for [`RetryCause::Disconnect`], `1` for the first failure,
    /// `2` for the second consecutive failure, and so on.
    pub failure_streak: usize,
    /// Latest `retry:` directive observed from the server, if any.
    pub server_retry: Option<Duration>,
}

/// A fixed-delay reconnect policy.
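///
/// # Examples
///
/// A typical configuration (a sketch; the import path assumes this module is
/// exported as `eventsrc_client::replayable::retry`):
///
/// ```ignore
/// use std::time::Duration;
///
/// // Hypothetical path; adjust to wherever this module is re-exported.
/// use eventsrc_client::replayable::retry::ConstantBackoff;
///
/// // Retry failures every 5 seconds, at most 10 times in a row, with jitter.
/// let policy = ConstantBackoff::new(Duration::from_secs(5))
///     .with_max_retries(10)
///     .with_jitter();
/// ```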
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConstantBackoff {
    delay: Duration,
    max_delay: Option<Duration>,
    max_retries: Option<usize>,
    jitter: bool,
}

impl ConstantBackoff {
    /// Creates a fixed-delay reconnect policy.
    pub fn new(delay: Duration) -> Self {
        Self { delay, max_delay: None, max_retries: None, jitter: false }
    }

    /// Clamps both configured and server-provided delays to `max_delay`.
    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
        self.max_delay = Some(max_delay);
        self
    }

    /// Limits how many consecutive failures may be retried.
    ///
    /// This budget is consumed only by failure causes and not by normal
    /// disconnects after a valid SSE response.
    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
        self.max_retries = Some(max_retries);
        self
    }

    /// Enables jitter on computed failure delays.
    pub fn with_jitter(mut self) -> Self {
        self.jitter = true;
        self
    }

    /// Resolves the base delay, preferring the server `retry:` hint and
    /// clamping to `max_delay` when one is configured.
    fn effective_delay(&self, server_retry: Option<Duration>) -> Duration {
        let delay = server_retry.unwrap_or(self.delay);

        match self.max_delay {
            Some(max_delay) => delay.min(max_delay),
            None => delay,
        }
    }

    /// Translates this policy's settings into a `backon` constant builder.
    fn builder(&self, delay: Duration) -> ConstantBuilder {
        let mut builder = ConstantBuilder::default().with_delay(delay);

        builder = match self.max_retries {
            Some(max_retries) => builder.with_max_times(max_retries),
            None => builder.without_max_times(),
        };

        if self.jitter { builder.with_jitter() } else { builder }
    }
}

impl Default for ConstantBackoff {
    /// Returns the default reconnect policy used by [`crate::replayable::EventSource`].
    ///
    /// The default is a constant 3-second delay.
    fn default() -> Self {
        Self::new(Duration::from_secs(3))
    }
}

impl RetryPolicy for ConstantBackoff {
    fn next_delay(&self, context: RetryContext) -> Option<Duration> {
        let delay = self.effective_delay(context.server_retry);

        match context.cause {
            // Normal disconnects always reconnect and never consume the retry budget.
            RetryCause::Disconnect => Some(delay),
            RetryCause::ConnectError | RetryCause::StreamError => {
                // `failure_streak` is 1-based while `nth` is 0-based; a streak
                // beyond `max_retries` runs off the end of the iterator as `None`.
                let mut backoff = self.builder(delay).build();
                backoff.nth(context.failure_streak.saturating_sub(1))
            },
        }
    }
}

/// An exponential reconnect policy.
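///
/// # Examples
///
/// With the settings below and `backon`'s default growth factor (doubling,
/// which the tests in this module rely on), consecutive failures wait 100ms,
/// 200ms, 400ms, then stay clamped at 500ms. A sketch; the import path assumes
/// this module is exported as `eventsrc_client::replayable::retry`:
///
/// ```ignore
/// use std::time::Duration;
///
/// // Hypothetical path; adjust to wherever this module is re-exported.
/// use eventsrc_client::replayable::retry::ExponentialBackoff;
///
/// // Start at 100ms, growing per failure but never exceeding 500ms.
/// let policy = ExponentialBackoff::new(Duration::from_millis(100))
///     .with_max_delay(Duration::from_millis(500));
/// ```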
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExponentialBackoff {
    initial_delay: Duration,
    max_delay: Option<Duration>,
    max_retries: Option<usize>,
    jitter: bool,
}

impl ExponentialBackoff {
    /// Creates an exponential reconnect policy using `initial_delay` as the base.
    pub fn new(initial_delay: Duration) -> Self {
        Self { initial_delay, max_delay: None, max_retries: None, jitter: false }
    }

    /// Clamps both configured and server-provided delays to `max_delay`.
    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
        self.max_delay = Some(max_delay);
        self
    }

    /// Limits how many consecutive failures may be retried.
    ///
    /// This budget is consumed only by failure causes and not by normal
    /// disconnects after a valid SSE response.
    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
        self.max_retries = Some(max_retries);
        self
    }

    /// Enables jitter on computed failure delays.
    pub fn with_jitter(mut self) -> Self {
        self.jitter = true;
        self
    }

    /// Resolves the base delay, preferring the server `retry:` hint and
    /// clamping to `max_delay` when one is configured.
    fn effective_initial_delay(&self, server_retry: Option<Duration>) -> Duration {
        let delay = server_retry.unwrap_or(self.initial_delay);

        match self.max_delay {
            Some(max_delay) => delay.min(max_delay),
            None => delay,
        }
    }

    /// Translates this policy's settings into a `backon` exponential builder.
    fn builder(&self, initial_delay: Duration) -> ExponentialBuilder {
        let mut builder = ExponentialBuilder::default().with_min_delay(initial_delay);

        builder = match self.max_delay {
            Some(max_delay) => builder.with_max_delay(max_delay),
            None => builder.without_max_delay(),
        };

        builder = match self.max_retries {
            Some(max_retries) => builder.with_max_times(max_retries),
            None => builder.without_max_times(),
        };

        if self.jitter { builder.with_jitter() } else { builder }
    }
}

impl RetryPolicy for ExponentialBackoff {
    fn next_delay(&self, context: RetryContext) -> Option<Duration> {
        let initial_delay = self.effective_initial_delay(context.server_retry);

        match context.cause {
            // Normal disconnects reuse the base delay without growing it.
            RetryCause::Disconnect => Some(initial_delay),
            RetryCause::ConnectError | RetryCause::StreamError => {
                // The nth backoff delay corresponds to the nth consecutive failure.
                let mut backoff = self.builder(initial_delay).build();
                backoff.nth(context.failure_streak.saturating_sub(1))
            },
        }
    }
}

/// A reconnect policy that never retries.
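///
/// Useful when the caller prefers to drive reconnection itself. A sketch;
/// the import path assumes this module is exported as
/// `eventsrc_client::replayable::retry`:
///
/// ```ignore
/// // Hypothetical path; adjust to wherever this module is re-exported.
/// use eventsrc_client::replayable::retry::NeverRetry;
///
/// // The event source stops after the first disconnect or failure.
/// let policy = NeverRetry;
/// ```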
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct NeverRetry;

impl RetryPolicy for NeverRetry {
    fn next_delay(&self, _context: RetryContext) -> Option<Duration> {
        None
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn constant_backoff_reuses_the_same_delay_until_exhausted() {
        let policy = ConstantBackoff::new(Duration::from_millis(10)).with_max_retries(2);

        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::ConnectError,
                failure_streak: 1,
                server_retry: None,
            }),
            Some(Duration::from_millis(10)),
        );
        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::ConnectError,
                failure_streak: 2,
                server_retry: None,
            }),
            Some(Duration::from_millis(10)),
        );
        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::ConnectError,
                failure_streak: 3,
                server_retry: None,
            }),
            None,
        );
    }

    #[test]
    fn disconnect_does_not_consume_retry_budget() {
        let policy = ConstantBackoff::new(Duration::from_millis(10)).with_max_retries(1);

        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::Disconnect,
                failure_streak: 0,
                server_retry: None,
            }),
            Some(Duration::from_millis(10)),
        );
        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::ConnectError,
                failure_streak: 1,
                server_retry: None,
            }),
            Some(Duration::from_millis(10)),
        );
        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::ConnectError,
                failure_streak: 2,
                server_retry: None,
            }),
            None,
        );
    }

    #[test]
    fn server_retry_replaces_the_policy_base_delay() {
        let policy = ConstantBackoff::new(Duration::from_secs(1));

        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::Disconnect,
                failure_streak: 0,
                server_retry: Some(Duration::from_millis(250)),
            }),
            Some(Duration::from_millis(250)),
        );
    }

    #[test]
    fn max_delay_clamps_server_retry_delay_for_constant_backoff() {
        let policy = ConstantBackoff::new(Duration::from_millis(10))
            .with_max_delay(Duration::from_millis(25));

        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::Disconnect,
                failure_streak: 0,
                server_retry: Some(Duration::from_millis(40)),
            }),
            Some(Duration::from_millis(25)),
        );
    }

    #[test]
    fn exponential_backoff_increases_delay_and_respects_max_delay() {
        let policy = ExponentialBackoff::new(Duration::from_millis(10))
            .with_max_delay(Duration::from_millis(25))
            .with_max_retries(4);

        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::ConnectError,
                failure_streak: 1,
                server_retry: None,
            }),
            Some(Duration::from_millis(10)),
        );
        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::ConnectError,
                failure_streak: 2,
                server_retry: None,
            }),
            Some(Duration::from_millis(20)),
        );
        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::ConnectError,
                failure_streak: 3,
                server_retry: None,
            }),
            Some(Duration::from_millis(25)),
        );
    }

    #[test]
    fn never_retry_always_stops_reconnecting() {
        let policy = NeverRetry;

        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::Disconnect,
                failure_streak: 0,
                server_retry: None,
            }),
            None,
        );
        assert_eq!(
            policy.next_delay(RetryContext {
                cause: RetryCause::ConnectError,
                failure_streak: 1,
                server_retry: Some(Duration::from_secs(1)),
            }),
            None,
        );
    }
}