nexus_async_rt/
backoff.rs1use std::sync::atomic::{AtomicU64, Ordering};
27use std::time::{Duration, Instant};
28
/// Process-wide counter that hands out jitter seeds.
///
/// `BackoffBuilder::build` reads it with `fetch_add(1, Ordering::Relaxed)`,
/// so every `Backoff` built in this process receives a different `seed`.
static BACKOFF_COUNTER: AtomicU64 = AtomicU64::new(0);
31
/// Error returned by `Backoff::wait` when no further retry is allowed.
///
/// It wraps the error value the caller passed to `wait` and hands it back
/// unchanged, so the last failure is still available once the retry budget
/// or the deadline has run out.
#[derive(Debug)]
pub struct Exhausted<E>(
    /// The error that was supplied to the `wait` call that gave up.
    pub E,
);
37
38impl<E: std::fmt::Display> std::fmt::Display for Exhausted<E> {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(f, "backoff exhausted: {}", self.0)
41 }
42}
43
44impl<E: std::fmt::Debug + std::fmt::Display> std::error::Error for Exhausted<E> {}
45
/// Exponential backoff state for a retry loop.
///
/// Instances are produced by `BackoffBuilder::build`. The un-jittered delay
/// starts at `initial`, doubles on every `advance`, and is clamped to
/// `max_delay`. Retrying stops once `max_retries` attempts have been counted
/// or `deadline` has been reached, whichever is configured.
pub struct Backoff {
    /// Starting delay; `reset` restores `current` to this value.
    initial: Duration,
    /// Un-jittered delay that the next wait is based on.
    current: Duration,
    /// Upper bound applied to `current` when it is doubled.
    max_delay: Duration,
    /// Retry budget; `None` means unlimited.
    max_retries: Option<u32>,
    /// Instant at or after which the backoff counts as exhausted; `None`
    /// means no deadline.
    deadline: Option<Instant>,
    /// Number of `advance` calls since construction or the last `reset`.
    retries: u32,
    /// Jitter fraction in `0.0..=1.0` (enforced by the builder); `0.0`
    /// disables jitter.
    jitter: f64,
    /// Per-instance value mixed into the jitter computation.
    seed: u64,
}
63
64impl Backoff {
65 #[must_use]
67 pub fn builder() -> BackoffBuilder {
68 BackoffBuilder::new()
69 }
70
71 pub async fn wait<E>(&mut self, err: E) -> Result<(), Exhausted<E>> {
83 if self.is_exhausted() {
84 return Err(Exhausted(err));
85 }
86
87 let delay = self.effective_delay();
88
89 if delay.is_zero() && self.deadline.is_some_and(|d| Instant::now() >= d) {
91 return Err(Exhausted(err));
92 }
93
94 crate::context::sleep(delay).await;
95 self.advance();
96 Ok(())
97 }
98
99 pub fn advance(&mut self) {
103 self.retries += 1;
104 self.current = self
105 .current
106 .checked_mul(2)
107 .map_or(self.max_delay, |next| next.min(self.max_delay));
108 }
109
110 pub fn is_exhausted(&self) -> bool {
114 if self.max_retries.is_some_and(|max| self.retries >= max) {
115 return true;
116 }
117 if self.deadline.is_some_and(|d| Instant::now() >= d) {
118 return true;
119 }
120 false
121 }
122
123 pub fn retries(&self) -> u32 {
125 self.retries
126 }
127
128 pub fn current_delay(&self) -> Duration {
130 self.current
131 }
132
133 pub fn remaining(&self) -> Option<Duration> {
135 self.deadline
136 .map(|d| d.saturating_duration_since(Instant::now()))
137 }
138
139 pub fn reset(&mut self) {
143 self.current = self.initial;
144 self.retries = 0;
145 }
146
147 fn effective_delay(&self) -> Duration {
149 let delay = self.jittered_delay();
150 self.deadline.map_or(delay, |d| {
151 delay.min(d.saturating_duration_since(Instant::now()))
152 })
153 }
154
155 fn jittered_delay(&self) -> Duration {
157 if self.jitter == 0.0 {
158 return self.current;
159 }
160
161 let hash = {
165 let a = self.retries as u64;
166 let b = self.current.as_nanos() as u64;
167 a.wrapping_mul(6_364_136_223_846_793_005)
168 .wrapping_add(b)
169 .wrapping_add(self.seed)
170 };
171 let normalized = (hash as f64 / u64::MAX as f64).mul_add(2.0, -1.0);
173 let factor = self.jitter.mul_add(normalized, 1.0);
174 let jittered_nanos = self.current.as_nanos() as f64 * factor;
175 Duration::from_nanos(jittered_nanos.max(0.0) as u64)
176 }
177}
178
179impl std::fmt::Debug for Backoff {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 f.debug_struct("Backoff")
182 .field("current", &self.current)
183 .field("retries", &self.retries)
184 .field("max_delay", &self.max_delay)
185 .field("max_retries", &self.max_retries)
186 .field("deadline", &self.deadline)
187 .field("jitter", &self.jitter)
188 .finish()
189 }
190}
191
/// Configuration collected before a `Backoff` is built.
///
/// Obtained from `Backoff::builder()` or `BackoffBuilder::default()`; each
/// setter consumes the builder and returns the updated value, and `build`
/// turns it into a `Backoff`.
///
/// `Debug` and `Clone` are derived so a configuration can be logged and
/// reused to build several backoffs.
#[derive(Debug, Clone)]
pub struct BackoffBuilder {
    /// Delay used for the first wait.
    initial: Duration,
    /// Cap for the doubled base delay; `build` requires `initial <= max_delay`.
    max_delay: Duration,
    /// Retry budget; `None` means unlimited.
    max_retries: Option<u32>,
    /// Optional instant after which the backoff counts as exhausted.
    deadline: Option<Instant>,
    /// Jitter fraction in `0.0..=1.0`; `0.0` disables jitter.
    jitter: f64,
}
204
205impl BackoffBuilder {
206 #[must_use]
207 fn new() -> Self {
208 Self {
209 initial: Duration::from_millis(100),
210 max_delay: Duration::from_secs(30),
211 max_retries: None,
212 deadline: None,
213 jitter: 0.0,
214 }
215 }
216
217 #[must_use]
219 pub fn initial(mut self, d: Duration) -> Self {
220 self.initial = d;
221 self
222 }
223
224 #[must_use]
226 pub fn max_delay(mut self, d: Duration) -> Self {
227 self.max_delay = d;
228 self
229 }
230
231 #[must_use]
233 pub fn max_retries(mut self, n: u32) -> Self {
234 self.max_retries = Some(n);
235 self
236 }
237
238 #[must_use]
243 pub fn deadline(mut self, deadline: Instant) -> Self {
244 self.deadline = Some(deadline);
245 self
246 }
247
248 #[must_use]
253 pub fn jitter(mut self, factor: f64) -> Self {
254 assert!(
255 (0.0..=1.0).contains(&factor),
256 "jitter must be between 0.0 and 1.0, got {factor}"
257 );
258 self.jitter = factor;
259 self
260 }
261
262 #[must_use]
268 pub fn build(self) -> Backoff {
269 assert!(
270 self.initial <= self.max_delay,
271 "initial delay ({:?}) must not exceed max_delay ({:?})",
272 self.initial,
273 self.max_delay,
274 );
275 Backoff {
276 initial: self.initial,
277 current: self.initial,
278 max_delay: self.max_delay,
279 max_retries: self.max_retries,
280 deadline: self.deadline,
281 retries: 0,
282 jitter: self.jitter,
283 seed: BACKOFF_COUNTER.fetch_add(1, Ordering::Relaxed),
284 }
285 }
286}
287
288impl Default for BackoffBuilder {
289 fn default() -> Self {
290 Self::new()
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::*;

    /// Each `advance` doubles the base delay while it is below the cap.
    #[test]
    fn doubles_each_step() {
        let mut b = Backoff::builder()
            .initial(Duration::from_millis(10))
            .max_delay(Duration::from_secs(10))
            .build();

        assert_eq!(b.current_delay(), Duration::from_millis(10));
        b.advance();
        assert_eq!(b.current_delay(), Duration::from_millis(20));
        b.advance();
        assert_eq!(b.current_delay(), Duration::from_millis(40));
        b.advance();
        assert_eq!(b.current_delay(), Duration::from_millis(80));
    }

    /// The base delay never grows past `max_delay`.
    #[test]
    fn caps_at_max() {
        let mut b = Backoff::builder()
            .initial(Duration::from_secs(1))
            .max_delay(Duration::from_secs(5))
            .build();

        // 1s -> 2s -> 4s -> 8s, clamped to 5s on the third step.
        b.advance();
        b.advance();
        b.advance();
        assert_eq!(b.current_delay(), Duration::from_secs(5));

        // Further steps stay at the cap.
        b.advance();
        assert_eq!(b.current_delay(), Duration::from_secs(5));
    }

    /// The backoff is exhausted exactly when `retries` reaches `max_retries`.
    #[test]
    fn exhausted_after_max_retries() {
        let mut b = Backoff::builder()
            .initial(Duration::from_millis(1))
            .max_retries(3)
            .build();

        assert!(!b.is_exhausted());
        b.advance();
        assert!(!b.is_exhausted());
        b.advance();
        assert!(!b.is_exhausted());
        b.advance();
        assert!(b.is_exhausted());
    }

    /// Without `max_retries` or a deadline the backoff never exhausts.
    #[test]
    fn unlimited_retries() {
        let mut b = Backoff::builder()
            .initial(Duration::from_millis(1))
            .max_delay(Duration::from_millis(1))
            .build();

        for _ in 0..10_000 {
            b.advance();
        }
        assert!(!b.is_exhausted());
        assert_eq!(b.retries(), 10_000);
    }

    /// `reset` restores the initial delay and clears the retry counter.
    #[test]
    fn reset_restores_initial() {
        let mut b = Backoff::builder()
            .initial(Duration::from_millis(10))
            .max_retries(5)
            .build();

        b.advance();
        b.advance();
        b.advance();
        assert_eq!(b.retries(), 3);
        assert_eq!(b.current_delay(), Duration::from_millis(80));

        b.reset();
        assert_eq!(b.retries(), 0);
        assert_eq!(b.current_delay(), Duration::from_millis(10));
    }

    /// With jitter `0.5` every delay lies within `[0.5, 1.5]` times the base.
    #[test]
    fn jitter_stays_in_range() {
        let mut b = Backoff::builder()
            .initial(Duration::from_millis(100))
            .max_delay(Duration::from_secs(10))
            .jitter(0.5)
            .build();

        for _ in 0..20 {
            let delay = b.jittered_delay();
            let base = b.current_delay().as_nanos();
            let actual = delay.as_nanos();
            let lo = (base as f64 * 0.5) as u128;
            // +1 absorbs float rounding at the upper edge.
            let hi = (base as f64 * 1.5) as u128 + 1;
            assert!(
                actual >= lo && actual <= hi,
                "delay {actual}ns out of range [{lo}, {hi}] for base {base}ns"
            );
            b.advance();
        }
    }

    #[test]
    #[should_panic(expected = "jitter must be between")]
    fn jitter_out_of_range_panics() {
        let _ = Backoff::builder().jitter(1.5).build();
    }

    #[test]
    #[should_panic(expected = "initial delay")]
    fn initial_exceeds_max_delay_panics() {
        // `build` is `#[must_use]`; bind the result explicitly.
        let _ = Backoff::builder()
            .initial(Duration::from_secs(60))
            .max_delay(Duration::from_secs(5))
            .build();
    }

    /// A default-built backoff uses 100 ms / 30 s and has no limits.
    #[test]
    fn default_values() {
        let b = Backoff::builder().build();
        assert_eq!(b.current_delay(), Duration::from_millis(100));
        assert_eq!(b.max_delay, Duration::from_secs(30));
        assert!(!b.is_exhausted());
        assert!(b.remaining().is_none());
    }

    /// A deadline that has already been reached exhausts the backoff.
    #[test]
    fn deadline_exhausts() {
        // Use "now" rather than `now - 1s`: subtracting from an `Instant`
        // panics if the result cannot be represented, and `is_exhausted`
        // treats `now >= deadline` as exhausted anyway.
        let b = Backoff::builder()
            .initial(Duration::from_millis(10))
            .deadline(Instant::now())
            .build();

        assert!(b.is_exhausted());
    }

    /// `remaining` reports the time left until a future deadline.
    #[test]
    fn deadline_remaining() {
        let deadline = Instant::now() + Duration::from_secs(60);
        let b = Backoff::builder()
            .initial(Duration::from_millis(10))
            .deadline(deadline)
            .build();

        let remaining = b.remaining().expect("should have remaining");
        assert!(remaining > Duration::ZERO);
        assert!(remaining <= Duration::from_secs(60));
    }

    /// The delay actually slept is shortened to fit before the deadline.
    #[test]
    fn effective_delay_capped_by_deadline() {
        // The base delay (500 ms) is far larger than the time left (50 ms),
        // so the assertion only holds if the deadline cap is applied.
        let b = Backoff::builder()
            .initial(Duration::from_millis(500))
            .max_delay(Duration::from_secs(10))
            .deadline(Instant::now() + Duration::from_millis(50))
            .build();

        let delay = b.effective_delay();
        assert!(delay <= Duration::from_millis(50));
    }

    /// `reset` leaves the deadline in place.
    #[test]
    fn reset_does_not_clear_deadline() {
        let deadline = Instant::now() + Duration::from_secs(30);
        let mut b = Backoff::builder()
            .initial(Duration::from_millis(10))
            .deadline(deadline)
            .build();

        b.advance();
        b.advance();
        b.reset();

        assert!(b.remaining().is_some());
        assert_eq!(b.retries(), 0);
        assert_eq!(b.current_delay(), Duration::from_millis(10));
    }

    /// Doubling a delay that overflows `Duration` falls back to `max_delay`.
    #[test]
    fn advance_does_not_overflow_large_delay() {
        // `u64::MAX` seconds times two does not fit in a `Duration`, so this
        // exercises the `checked_mul` overflow branch.
        let huge = Duration::from_secs(u64::MAX);
        let mut b = Backoff::builder().initial(huge).max_delay(huge).build();

        b.advance();
        assert_eq!(b.current_delay(), huge);
    }

    /// Two instances built in the same process receive different seeds.
    #[test]
    fn different_instances_different_jitter() {
        let a = Backoff::builder()
            .initial(Duration::from_millis(100))
            .jitter(0.5)
            .build();
        let b = Backoff::builder()
            .initial(Duration::from_millis(100))
            .jitter(0.5)
            .build();

        assert_ne!(a.seed, b.seed);
    }
}