eventsrc_client/replayable/
retry.rs1use std::{fmt::Debug, time::Duration};
18
19use backon::{BackoffBuilder, ConstantBuilder, ExponentialBuilder};
20
21pub trait RetryPolicy: Debug + Send + Sync + 'static {
23 fn next_delay(&self, context: RetryContext) -> Option<Duration>;
25}
26
27#[non_exhaustive]
29#[derive(Clone, Copy, Debug, PartialEq, Eq)]
30pub enum RetryCause {
31 Disconnect,
33 ConnectError,
35 StreamError,
37}
38
39#[non_exhaustive]
41#[derive(Clone, Copy, Debug, PartialEq, Eq)]
42pub struct RetryContext {
43 pub cause: RetryCause,
45 pub failure_streak: usize,
50 pub server_retry: Option<Duration>,
52}
53
54#[derive(Clone, Debug, PartialEq, Eq)]
56pub struct ConstantBackoff {
57 delay: Duration,
58 max_delay: Option<Duration>,
59 max_retries: Option<usize>,
60 jitter: bool,
61}
62
63impl ConstantBackoff {
64 pub fn new(delay: Duration) -> Self {
66 Self { delay, max_delay: None, max_retries: None, jitter: false }
67 }
68
69 pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
71 self.max_delay = Some(max_delay);
72 self
73 }
74
75 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
80 self.max_retries = Some(max_retries);
81 self
82 }
83
84 pub fn with_jitter(mut self) -> Self {
86 self.jitter = true;
87 self
88 }
89
90 fn effective_delay(&self, server_retry: Option<Duration>) -> Duration {
91 let delay = server_retry.unwrap_or(self.delay);
92
93 match self.max_delay {
94 Some(max_delay) => delay.min(max_delay),
95 None => delay,
96 }
97 }
98
99 fn builder(&self, delay: Duration) -> ConstantBuilder {
100 let mut builder = ConstantBuilder::default().with_delay(delay);
101
102 builder = match self.max_retries {
103 Some(max_retries) => builder.with_max_times(max_retries),
104 None => builder.without_max_times(),
105 };
106
107 if self.jitter { builder.with_jitter() } else { builder }
108 }
109}
110
111impl Default for ConstantBackoff {
112 fn default() -> Self {
116 Self::new(Duration::from_secs(3))
117 }
118}
119
120impl RetryPolicy for ConstantBackoff {
121 fn next_delay(&self, context: RetryContext) -> Option<Duration> {
122 let delay = self.effective_delay(context.server_retry);
123
124 match context.cause {
125 RetryCause::Disconnect => Some(delay),
126 RetryCause::ConnectError | RetryCause::StreamError => {
127 let mut backoff = self.builder(delay).build();
128 backoff.nth(context.failure_streak.saturating_sub(1))
129 },
130 }
131 }
132}
133
134#[derive(Clone, Debug, PartialEq, Eq)]
136pub struct ExponentialBackoff {
137 initial_delay: Duration,
138 max_delay: Option<Duration>,
139 max_retries: Option<usize>,
140 jitter: bool,
141}
142
143impl ExponentialBackoff {
144 pub fn new(initial_delay: Duration) -> Self {
146 Self { initial_delay, max_delay: None, max_retries: None, jitter: false }
147 }
148
149 pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
151 self.max_delay = Some(max_delay);
152 self
153 }
154
155 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
157 self.max_retries = Some(max_retries);
158 self
159 }
160
161 pub fn with_jitter(mut self) -> Self {
163 self.jitter = true;
164 self
165 }
166
167 fn effective_initial_delay(&self, server_retry: Option<Duration>) -> Duration {
168 let delay = server_retry.unwrap_or(self.initial_delay);
169
170 match self.max_delay {
171 Some(max_delay) => delay.min(max_delay),
172 None => delay,
173 }
174 }
175
176 fn builder(&self, initial_delay: Duration) -> ExponentialBuilder {
177 let mut builder = ExponentialBuilder::default().with_min_delay(initial_delay);
178
179 builder = match self.max_delay {
180 Some(max_delay) => builder.with_max_delay(max_delay),
181 None => builder.without_max_delay(),
182 };
183
184 builder = match self.max_retries {
185 Some(max_retries) => builder.with_max_times(max_retries),
186 None => builder.without_max_times(),
187 };
188
189 if self.jitter { builder.with_jitter() } else { builder }
190 }
191}
192
193impl RetryPolicy for ExponentialBackoff {
194 fn next_delay(&self, context: RetryContext) -> Option<Duration> {
195 let initial_delay = self.effective_initial_delay(context.server_retry);
196
197 match context.cause {
198 RetryCause::Disconnect => Some(initial_delay),
199 RetryCause::ConnectError | RetryCause::StreamError => {
200 let mut backoff = self.builder(initial_delay).build();
201 backoff.nth(context.failure_streak.saturating_sub(1))
202 },
203 }
204 }
205}
206
207#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
209pub struct NeverRetry;
210
211impl RetryPolicy for NeverRetry {
212 fn next_delay(&self, _context: RetryContext) -> Option<Duration> {
213 None
214 }
215}
216
217#[cfg(test)]
218mod tests {
219 use super::*;
220
221 #[test]
222 fn constant_backoff_reuses_the_same_delay_until_exhausted() {
223 let policy = ConstantBackoff::new(Duration::from_millis(10)).with_max_retries(2);
224
225 assert_eq!(
226 policy.next_delay(RetryContext {
227 cause: RetryCause::ConnectError,
228 failure_streak: 1,
229 server_retry: None,
230 }),
231 Some(Duration::from_millis(10)),
232 );
233 assert_eq!(
234 policy.next_delay(RetryContext {
235 cause: RetryCause::ConnectError,
236 failure_streak: 2,
237 server_retry: None,
238 }),
239 Some(Duration::from_millis(10)),
240 );
241 assert_eq!(
242 policy.next_delay(RetryContext {
243 cause: RetryCause::ConnectError,
244 failure_streak: 3,
245 server_retry: None,
246 }),
247 None,
248 );
249 }
250
251 #[test]
252 fn disconnect_does_not_consume_retry_budget() {
253 let policy = ConstantBackoff::new(Duration::from_millis(10)).with_max_retries(1);
254
255 assert_eq!(
256 policy.next_delay(RetryContext {
257 cause: RetryCause::Disconnect,
258 failure_streak: 0,
259 server_retry: None,
260 }),
261 Some(Duration::from_millis(10)),
262 );
263 assert_eq!(
264 policy.next_delay(RetryContext {
265 cause: RetryCause::ConnectError,
266 failure_streak: 1,
267 server_retry: None,
268 }),
269 Some(Duration::from_millis(10)),
270 );
271 assert_eq!(
272 policy.next_delay(RetryContext {
273 cause: RetryCause::ConnectError,
274 failure_streak: 2,
275 server_retry: None,
276 }),
277 None,
278 );
279 }
280
281 #[test]
282 fn server_retry_replaces_the_policy_base_delay() {
283 let policy = ConstantBackoff::new(Duration::from_secs(1));
284
285 assert_eq!(
286 policy.next_delay(RetryContext {
287 cause: RetryCause::Disconnect,
288 failure_streak: 0,
289 server_retry: Some(Duration::from_millis(250)),
290 }),
291 Some(Duration::from_millis(250)),
292 );
293 }
294
295 #[test]
296 fn max_delay_clamps_server_retry_delay_for_constant_backoff() {
297 let policy = ConstantBackoff::new(Duration::from_millis(10))
298 .with_max_delay(Duration::from_millis(25));
299
300 assert_eq!(
301 policy.next_delay(RetryContext {
302 cause: RetryCause::Disconnect,
303 failure_streak: 0,
304 server_retry: Some(Duration::from_millis(40)),
305 }),
306 Some(Duration::from_millis(25)),
307 );
308 }
309
310 #[test]
311 fn exponential_backoff_increases_delay_and_respects_max_delay() {
312 let policy = ExponentialBackoff::new(Duration::from_millis(10))
313 .with_max_delay(Duration::from_millis(25))
314 .with_max_retries(4);
315
316 assert_eq!(
317 policy.next_delay(RetryContext {
318 cause: RetryCause::ConnectError,
319 failure_streak: 1,
320 server_retry: None,
321 }),
322 Some(Duration::from_millis(10)),
323 );
324 assert_eq!(
325 policy.next_delay(RetryContext {
326 cause: RetryCause::ConnectError,
327 failure_streak: 2,
328 server_retry: None,
329 }),
330 Some(Duration::from_millis(20)),
331 );
332 assert_eq!(
333 policy.next_delay(RetryContext {
334 cause: RetryCause::ConnectError,
335 failure_streak: 3,
336 server_retry: None,
337 }),
338 Some(Duration::from_millis(25)),
339 );
340 }
341
342 #[test]
343 fn never_retry_always_stops_reconnecting() {
344 let policy = NeverRetry;
345
346 assert_eq!(
347 policy.next_delay(RetryContext {
348 cause: RetryCause::Disconnect,
349 failure_streak: 0,
350 server_retry: None,
351 }),
352 None,
353 );
354 assert_eq!(
355 policy.next_delay(RetryContext {
356 cause: RetryCause::ConnectError,
357 failure_streak: 1,
358 server_retry: Some(Duration::from_secs(1)),
359 }),
360 None,
361 );
362 }
363}