snapdir_stores/retry.rs
1//! Reusable retry/backoff engine for the network stores.
2//!
3//! This module is the **core** of snapdir's transient-failure retry policy: an
4//! SDK-agnostic loop that re-runs a fallible operation under a *full-jitter*
5//! exponential backoff schedule, honouring a server-supplied `Retry-After`
6//! hint. Like [`crate::adaptive`], it performs **no** real I/O and reads **no**
7//! real clock itself — the two impure dependencies (the jitter source and the
8//! sleep) are *injected* via the [`Jitter`] and [`AsyncSleeper`] /
9//! [`BlockingSleeper`] traits, so the engine and its backoff math are fully
10//! deterministic under test.
11//!
12//! Three pieces:
13//!
14//! - [`RetryPolicy`] — the schedule (attempt budget + base/cap durations) and
15//! the [`backoff`](RetryPolicy::backoff) math (full-jitter exp with a server
16//! hint floor).
17//! - [`Jitter`] — a `[0,1)` source. Production: [`DefaultJitter`], a tiny
18//! hand-rolled **`SplitMix64`** PRNG (no new dependency). Tests inject a fixed
19//! value.
20//! - [`AsyncSleeper`] / [`BlockingSleeper`] — the (async/blocking) sleep, plus
21//! production impls ([`TokioSleeper`] / [`ThreadSleeper`]) and a recording
22//! test fake.
23//!
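//! The engine itself — [`retry_async`] / [`retry_blocking`] — drives an
//! operation that yields `Ok(T)` or `Err(`[`Attempt`]`)`; an [`Attempt`] carries
//! the [`StoreError`], whether it is `transient`, and an optional
//! `retry_after` hint. [`retry_network`] is the per-call seam the network
//! stores use: it runs the same retry loop as [`retry_async`] but acquires one
//! request-rate token before every attempt, and [`parse_retry_after`] turns a
//! delta-seconds `Retry-After` header value into the hint. Call sites populate
//! each `Attempt` from the concrete SDK error (see
//! [`classify_error`](crate::classify_error)); the engine itself never inspects
//! SDK types.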
30
31// The backoff math works in `f64` seconds-space and the jitter PRNG maps a
32// `u64` to a `[0,1)` double; both are *advisory* timing signals (never a
33// correctness path), so the pedantic cast lints are allowed module-wide,
34// matching the `adaptive` controller's convention.
35#![allow(
36 clippy::cast_precision_loss,
37 clippy::cast_possible_truncation,
38 clippy::cast_possible_wrap,
39 clippy::cast_sign_loss
40)]
41
42use std::future::Future;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::time::{Duration, SystemTime, UNIX_EPOCH};
45
46use snapdir_core::StoreError;
47
48use crate::transfer::RateLimiter;
49
50// ---------------------------------------------------------------------------
51// RetryPolicy + backoff math.
52// ---------------------------------------------------------------------------
53
/// The retry schedule: how many attempts to make and the exponential-backoff
/// base/cap bounding each inter-attempt delay.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RetryPolicy {
    /// Total number of attempts, **including the first** (so `max_attempts = 5`
    /// means one try plus up to four retries). Clamped to at least 1 by the
    /// engine.
    pub max_attempts: u32,
    /// The base delay; the un-jittered exponential is `base × 2^n` for the
    /// 0-based attempt index `n`.
    pub base: Duration,
    /// The hard ceiling on the un-jittered exponential (and therefore on the
    /// jittered delay, absent a larger server hint).
    pub cap: Duration,
}

impl Default for RetryPolicy {
    /// Five attempts (one try plus up to four retries), a 250 ms base and a
    /// 30 s cap.
    fn default() -> Self {
        Self {
            max_attempts: 5,
            base: Duration::from_millis(250),
            cap: Duration::from_secs(30),
        }
    }
}

impl RetryPolicy {
    /// Computes the inter-attempt delay before the retry that follows the
    /// 0-based failed-attempt index `n`, under full-jitter exponential backoff.
    ///
    /// 1. `exp = min(cap, base × 2^n)`: the un-jittered envelope, saturated at
    ///    `cap`. The exponent is clamped to 1023, the largest finite power of
    ///    two in `f64`. The smallest non-zero `Duration` is 1 ns, so
    ///    `base × 2^1023` already exceeds every representable cap. Any large
    ///    `n` therefore saturates at `cap` for a non-zero `base` and stays `0`
    ///    for a zero `base`, without ever producing `inf`/`NaN`.
    /// 2. `jittered = jitter01 × exp`: full jitter spreads the delay uniformly
    ///    over `[0, exp)`. `jitter01` is contractually in `[0, 1)` and is
    ///    supplied by the [`Jitter`] source so tests are deterministic.
    ///    Defensively, a NaN, infinite or non-positive sample is treated as `0`
    ///    and a sample above `1` as `1`, so a misbehaving source can never
    ///    leave the envelope.
    /// 3. `delay = max(server_hint.unwrap_or(0), jittered)`: a server
    ///    `Retry-After` acts as a floor. We never retry sooner than the server
    ///    asked, but may wait longer if the jittered exponential is larger.
    ///
    /// Never panics. The seconds → `Duration` conversion is checked, and a
    /// product that would overflow `Duration` saturates at `cap`. This can only
    /// happen with a cap near `Duration::MAX`, whose `f64` form rounds up to
    /// 2^64 s.
    #[must_use]
    fn backoff(&self, n: u32, server_hint: Option<Duration>, jitter01: f64) -> Duration {
        // 2^1023 is the largest power of two that is still finite in f64.
        const MAX_EXPONENT: u32 = 1023;

        let cap_secs = self.cap.as_secs_f64();

        // (1) Envelope. `base_secs >= 0` and `growth` is finite and >= 1, so
        // the product is non-negative and at worst +inf. `f64::min(inf, cap)`
        // yields `cap`, so `exp_secs` always lands in [0, cap_secs].
        let growth = 2f64.powi(n.min(MAX_EXPONENT) as i32);
        let exp_secs = (self.base.as_secs_f64() * growth).min(cap_secs);

        // (2) Full jitter. The `> 0.0` test also maps NaN and -0.0 to +0.0, so
        // the product below is never negative.
        let frac = if jitter01.is_finite() && jitter01 > 0.0 {
            jitter01.min(1.0)
        } else {
            0.0
        };
        // Checked conversion: `from_secs_f64` would panic on overflow.
        // Clamping to `cap` also absorbs f64 round-trip error.
        let jittered = Duration::try_from_secs_f64(exp_secs * frac)
            .map_or(self.cap, |delay| delay.min(self.cap));

        // (3) The server hint is a floor and may push the delay above `cap`.
        server_hint.map_or(jittered, |hint| hint.max(jittered))
    }
}
133
134// ---------------------------------------------------------------------------
135// Jitter — [0,1) source (production SplitMix64 + injectable test stub).
136// ---------------------------------------------------------------------------
137
/// Supplier of uniformly distributed fractions in the half-open interval
/// `[0, 1)`.
///
/// The retry engine scales each backoff envelope by one of these samples
/// ("full jitter"). It is a trait so tests can substitute a fixed value and
/// obtain a reproducible delay sequence.
pub trait Jitter {
    /// Produces the next sample; implementations must stay within `[0, 1)`.
    fn jitter01(&self) -> f64;
}
144
/// Construction ticket shared by every [`DefaultJitter`] in the process. Each
/// new instance takes the next value and folds it into its seed, so two
/// instances created within the same clock tick still start from different
/// states.
static JITTER_SEED_COUNTER: AtomicU64 = AtomicU64::new(0);
148
/// Production [`Jitter`]: a tiny, hand-rolled **`SplitMix64`** PRNG (no external
/// dependency).
///
/// The 64-bit state is seeded once, at construction, from a nanosecond
/// wall-clock sample `XOR`ed with a `SplitMix64`-mixed value of a process-wide
/// counter. Each [`jitter01`](Jitter::jitter01) call advances the state by the
/// `SplitMix64` increment, runs the finalizing mix, and maps the top 53 bits of
/// the result to a `[0, 1)` double.
///
/// `SplitMix64` is the generator the xoshiro/xoroshiro authors recommend for
/// expanding a 64-bit seed into PRNG state. It is more than adequate for
/// *jitter*, which needs decorrelation between callers, not cryptographic
/// strength.
///
/// The state lives in an [`AtomicU64`] so `jitter01` can advance the stream
/// through `&self` (as the trait requires). One instance can therefore be
/// shared across the retry loop.
#[derive(Debug)]
pub struct DefaultJitter {
    /// Current `SplitMix64` state; advanced atomically on every sample.
    state: AtomicU64,
}
166
167impl DefaultJitter {
168 /// Builds a jitter source seeded from the current nanosecond clock `XOR`ed
169 /// with a process-wide counter (so concurrent constructions diverge).
170 #[must_use]
171 pub fn new() -> Self {
172 let nanos = SystemTime::now()
173 .duration_since(UNIX_EPOCH)
174 .map_or(0, |d| d.as_nanos() as u64);
175 let counter = JITTER_SEED_COUNTER.fetch_add(1, Ordering::Relaxed);
176 // Mix the counter in via SplitMix64 so adjacent seeds aren't adjacent
177 // states.
178 let seed = nanos ^ splitmix64_mix(counter.wrapping_add(0x9E37_79B9_7F4A_7C15));
179 Self {
180 state: AtomicU64::new(seed),
181 }
182 }
183}
184
185impl Default for DefaultJitter {
186 fn default() -> Self {
187 Self::new()
188 }
189}
190
191impl Jitter for DefaultJitter {
192 fn jitter01(&self) -> f64 {
193 // Advance the SplitMix64 state and map the result to [0,1).
194 let z = self
195 .state
196 .fetch_add(0x9E37_79B9_7F4A_7C15, Ordering::Relaxed)
197 .wrapping_add(0x9E37_79B9_7F4A_7C15);
198 let bits = splitmix64_mix(z);
199 u64_to_unit_f64(bits)
200 }
201}
202
/// Output function of `SplitMix64`: two xor-shift/multiply rounds and a final
/// xor-shift, which spread every input bit across the whole 64-bit output.
#[inline]
fn splitmix64_mix(z: u64) -> u64 {
    let z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    let z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}
210
/// Converts a uniformly random `u64` into a double in `[0, 1)`.
///
/// Only the 53 most significant bits are kept, exactly the width of an `f64`
/// significand. Every result is thus an exact multiple of `2^-53`, and the
/// largest possible value, `1 - 2^-53`, stays strictly below `1`.
#[inline]
fn u64_to_unit_f64(bits: u64) -> f64 {
    // 1 / 2^53. Scaling by a power of two is exact, so this multiply matches
    // a division by 2^53 bit-for-bit.
    const INV_TWO_POW_53: f64 = 1.0 / 9_007_199_254_740_992.0;
    let significand = bits >> 11;
    significand as f64 * INV_TWO_POW_53
}
218
219/// A deterministic [`Jitter`] for tests: always returns the same fixed
220/// fraction (clamped to `[0, 1)`).
221#[derive(Clone, Copy, Debug)]
222pub struct FixedJitter(pub f64);
223
224impl Jitter for FixedJitter {
225 fn jitter01(&self) -> f64 {
226 // Keep within the [0,1) contract (strictly below 1.0).
227 self.0.clamp(0.0, 1.0 - f64::EPSILON)
228 }
229}
230
231// ---------------------------------------------------------------------------
232// Sleeper — async + blocking sleep (production + recording test fake).
233// ---------------------------------------------------------------------------
234
/// Asynchronous delay used by [`retry_async`] between attempts.
///
/// Abstracted behind a trait so tests can record the requested durations
/// instead of actually waiting. The returned future must be `Send`.
pub trait AsyncSleeper {
    /// Returns a future that completes after (roughly) `dur`.
    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send;
}
241
/// Thread-parking delay used by [`retry_blocking`] between attempts.
///
/// Abstracted so tests can record the requested durations without really
/// blocking. The production counterpart parks the thread with
/// [`std::thread::sleep`], like the blocking transfer path
/// ([`BlockingRateLimiter`](crate::BlockingRateLimiter)).
pub trait BlockingSleeper {
    /// Blocks the calling thread for (roughly) `dur`.
    fn sleep(&self, dur: Duration);
}
250
251/// Production [`AsyncSleeper`] backed by [`tokio::time::sleep`].
252#[derive(Clone, Copy, Debug, Default)]
253pub struct TokioSleeper;
254
255impl AsyncSleeper for TokioSleeper {
256 fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send {
257 tokio::time::sleep(dur)
258 }
259}
260
261/// Production [`BlockingSleeper`] backed by [`std::thread::sleep`].
262#[derive(Clone, Copy, Debug, Default)]
263pub struct ThreadSleeper;
264
265impl BlockingSleeper for ThreadSleeper {
266 fn sleep(&self, dur: Duration) {
267 std::thread::sleep(dur);
268 }
269}
270
271// ---------------------------------------------------------------------------
272// Engine — SDK-agnostic over an attempt outcome.
273// ---------------------------------------------------------------------------
274
275/// The outcome of a single failed attempt, handed back to the retry engine.
276///
277/// The caller (a store operation) decides whether the failure is `transient`
278/// (worth retrying) and threads through any server `retry_after` hint. The
279/// next gate populates these from
280/// [`classify_error`](crate::classify_error); the engine itself is SDK-agnostic.
281#[derive(Debug)]
282pub struct Attempt {
283 /// The error this attempt produced (surfaced to the caller if retries are
284 /// exhausted or the error is non-transient).
285 pub err: StoreError,
286 /// Whether the error is transient (throttle / timeout-class) and therefore
287 /// retryable.
288 pub transient: bool,
289 /// An optional server-supplied minimum delay (`Retry-After`); acts as a
290 /// floor on the next backoff.
291 pub retry_after: Option<Duration>,
292}
293
294/// The most attempts the engine will ever make (defensive clamp on
295/// [`RetryPolicy::max_attempts`]).
296#[inline]
297fn clamped_max_attempts(policy: &RetryPolicy) -> u32 {
298 policy.max_attempts.max(1)
299}
300
301/// Runs `op` under `policy`'s full-jitter backoff, retrying transient failures.
302///
303/// The operation is invoked up to `policy.max_attempts` times. On `Ok(T)` the
304/// value is returned immediately. On `Err(`[`Attempt`]`)`: if the attempt is
305/// `transient` **and** attempts remain, the engine sleeps for
306/// [`RetryPolicy::backoff`] (with the attempt's `retry_after` hint and a fresh
307/// jitter sample) via `sleeper`, then retries; otherwise the attempt's
308/// [`StoreError`] is returned. A non-transient error short-circuits on the
309/// first failure (no sleep).
310///
311/// `sleeper` and `jitter` are injected so tests can record the delay sequence
312/// without real waits.
313///
314/// # Errors
315///
316/// Returns the last [`Attempt`]'s [`StoreError`] when the operation does not
317/// succeed within the attempt budget (or fails non-transiently).
318pub async fn retry_async<T, F, Fut>(
319 policy: &RetryPolicy,
320 sleeper: &impl AsyncSleeper,
321 jitter: &impl Jitter,
322 mut op: F,
323) -> Result<T, StoreError>
324where
325 F: FnMut() -> Fut,
326 Fut: Future<Output = Result<T, Attempt>>,
327{
328 let max = clamped_max_attempts(policy);
329 let mut n: u32 = 0;
330 loop {
331 match op().await {
332 Ok(value) => return Ok(value),
333 Err(attempt) => {
334 let attempts_used = n + 1;
335 if attempt.transient && attempts_used < max {
336 let delay = policy.backoff(n, attempt.retry_after, jitter.jitter01());
337 sleeper.sleep(delay).await;
338 n += 1;
339 } else {
340 return Err(attempt.err);
341 }
342 }
343 }
344 }
345}
346
347/// Blocking sibling of [`retry_async`]: same control flow over a synchronous
348/// `op`, sleeping on the calling thread via a [`BlockingSleeper`]. Used by the
349/// rayon store-to-store sync path.
350///
351/// # Errors
352///
353/// Returns the last [`Attempt`]'s [`StoreError`] when the operation does not
354/// succeed within the attempt budget (or fails non-transiently).
355pub fn retry_blocking<T, F>(
356 policy: &RetryPolicy,
357 sleeper: &impl BlockingSleeper,
358 jitter: &impl Jitter,
359 mut op: F,
360) -> Result<T, StoreError>
361where
362 F: FnMut() -> Result<T, Attempt>,
363{
364 let max = clamped_max_attempts(policy);
365 let mut n: u32 = 0;
366 loop {
367 match op() {
368 Ok(value) => return Ok(value),
369 Err(attempt) => {
370 let attempts_used = n + 1;
371 if attempt.transient && attempts_used < max {
372 let delay = policy.backoff(n, attempt.retry_after, jitter.jitter01());
373 sleeper.sleep(delay);
374 n += 1;
375 } else {
376 return Err(attempt.err);
377 }
378 }
379 }
380 }
381}
382
383// ---------------------------------------------------------------------------
384// retry_network — the per-call boundary the network stores wrap each SDK call
385// in: acquire one request-rate token, then run the operation under the retry
386// engine. This is the single injectable seam the `backoff_wire` tests drive
387// (with a scripted `op` + recording sleeper), so the wiring is exercised with
388// NO live cloud.
389// ---------------------------------------------------------------------------
390
391/// Runs a single network operation through the full retry/limiter wiring.
392///
393/// Before **every** attempt this acquires one token from `req_limiter` (the
394/// per-call request-rate bucket — `acquire(1)`, a no-op when the limiter is
395/// unlimited), then drives `op` under [`retry_async`] with `policy`'s
396/// full-jitter backoff. `op` is the SDK call: it returns `Ok(T)` on success or
397/// `Err(`[`Attempt`]`)` carrying the mapped [`StoreError`], whether the failure
398/// is `transient`, and any server `retry_after` hint extracted from the
399/// concrete SDK error.
400///
401/// The real stores pass an `op` that performs the SDK call and builds the
402/// `Attempt` on error (via the per-SDK `*_attempt_from_err` extractors); the
403/// `backoff_wire` tests pass an `op` that replays a scripted sequence of
404/// `Attempt`s, so the retry/limiter wiring is exercised without any network.
405///
406/// # Errors
407///
408/// Returns the last [`Attempt`]'s [`StoreError`] when `op` does not succeed
409/// within the attempt budget (or fails non-transiently).
410pub async fn retry_network<T, F, Fut>(
411 policy: &RetryPolicy,
412 req_limiter: &RateLimiter,
413 sleeper: &impl AsyncSleeper,
414 jitter: &impl Jitter,
415 mut op: F,
416) -> Result<T, StoreError>
417where
418 F: FnMut() -> Fut,
419 Fut: Future<Output = Result<T, Attempt>>,
420{
421 // This mirrors [`retry_async`]'s control flow exactly, but inlines the
422 // per-attempt request-token acquire so it can call `op` directly each
423 // iteration (the `FnMut() -> Fut` bound on `retry_async` is non-lending, so
424 // an `op` wrapper future that re-borrows `op`/`req_limiter` cannot escape
425 // its closure — driving the loop here sidesteps that).
426 let max = clamped_max_attempts(policy);
427 let mut n: u32 = 0;
428 loop {
429 // Pace each (re)try through the per-call request-rate limiter (a no-op
430 // when unlimited), then run the operation.
431 req_limiter.acquire(1).await;
432 match op().await {
433 Ok(value) => return Ok(value),
434 Err(attempt) => {
435 let attempts_used = n + 1;
436 if attempt.transient && attempts_used < max {
437 let delay = policy.backoff(n, attempt.retry_after, jitter.jitter01());
438 sleeper.sleep(delay).await;
439 n += 1;
440 } else {
441 return Err(attempt.err);
442 }
443 }
444 }
445 }
446}
447
/// Parses an HTTP `Retry-After` header value into a [`Duration`].
///
/// Honours the **delta-seconds** form (`Retry-After: 120`), by far the common
/// case for `429`/`503` throttling from S3/GCS, returning `Some(120s)`.
/// Surrounding whitespace is ignored. RFC 9110 defines delta-seconds as digits
/// only (`1*DIGIT`). Signed (`+5`, `-5`), fractional (`1.5`), empty, or
/// out-of-`u64`-range values therefore yield `None`.
///
/// The HTTP-date form is intentionally NOT parsed; it would need a date crate
/// and a wall-clock read. The backoff engine already provides a sane delay
/// when the hint is absent, so any non-numeric value yields `None`. A value
/// of `0` maps to `Some(Duration::ZERO)` (a valid "retry now" floor).
#[must_use]
pub fn parse_retry_after(value: &str) -> Option<Duration> {
    let digits = value.trim();
    // `u64::from_str` also accepts a leading `+`, which is not valid
    // delta-seconds, so reject anything but ASCII digits up front.
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // Only overflow can fail here; an absurdly large hint is treated as absent.
    digits.parse::<u64>().ok().map(Duration::from_secs)
}
461
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Mutex;

    // ----- helpers ----------------------------------------------------------

    /// A recording fake [`AsyncSleeper`] / [`BlockingSleeper`]: never sleeps for
    /// real, just appends each requested duration so tests can assert the exact
    /// backoff sequence.
    #[derive(Default)]
    struct RecordingSleeper {
        delays: Mutex<Vec<Duration>>,
    }

    impl RecordingSleeper {
        fn recorded(&self) -> Vec<Duration> {
            self.delays.lock().unwrap().clone()
        }
    }

    impl AsyncSleeper for RecordingSleeper {
        fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send {
            self.delays.lock().unwrap().push(dur);
            std::future::ready(())
        }
    }

    impl BlockingSleeper for RecordingSleeper {
        fn sleep(&self, dur: Duration) {
            self.delays.lock().unwrap().push(dur);
        }
    }

    /// A deterministic [`Jitter`] returning a fixed fraction verbatim. Unlike
    /// [`FixedJitter`] it does no clamping, so callers keep it in `[0,1)`.
    struct StubJitter(f64);
    impl Jitter for StubJitter {
        fn jitter01(&self) -> f64 {
            self.0
        }
    }

    fn boom() -> StoreError {
        StoreError::Backend {
            message: "boom".into(),
            source: None,
        }
    }

    fn transient_attempt(retry_after: Option<Duration>) -> Attempt {
        Attempt {
            err: boom(),
            transient: true,
            retry_after,
        }
    }

    fn hard_attempt() -> Attempt {
        Attempt {
            err: boom(),
            transient: false,
            retry_after: None,
        }
    }

    fn runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("build tokio runtime")
    }

    fn small_policy() -> RetryPolicy {
        // 5 attempts (=> up to 4 sleeps), tiny base/cap so jittered delays are
        // easy to reason about.
        RetryPolicy {
            max_attempts: 5,
            base: Duration::from_millis(100),
            cap: Duration::from_secs(10),
        }
    }

    // ----- (1) exact attempt count on persistent transient failure ----------

    #[test]
    fn retry_async_persistent_transient_uses_full_attempt_budget() {
        let rt = runtime();
        rt.block_on(async {
            let policy = small_policy();
            let sleeper = RecordingSleeper::default();
            let jitter = StubJitter(0.5);
            let calls = AtomicUsize::new(0);

            let result: Result<(), StoreError> = retry_async(&policy, &sleeper, &jitter, || {
                calls.fetch_add(1, AtomicOrdering::SeqCst);
                async { Err(transient_attempt(None)) }
            })
            .await;

            assert!(result.is_err(), "persistent transient => surfaces the err");
            assert_eq!(
                calls.load(AtomicOrdering::SeqCst),
                policy.max_attempts as usize,
                "op invoked exactly max_attempts times"
            );
            assert_eq!(
                sleeper.recorded().len(),
                (policy.max_attempts - 1) as usize,
                "max_attempts-1 sleeps recorded (no sleep after the last attempt)"
            );
        });
    }

    // ----- (2) immediate return on hard (non-transient) error ---------------

    #[test]
    fn retry_async_hard_error_returns_immediately_without_sleeping() {
        let rt = runtime();
        rt.block_on(async {
            let policy = small_policy();
            let sleeper = RecordingSleeper::default();
            let jitter = StubJitter(0.5);
            let calls = AtomicUsize::new(0);

            let result: Result<(), StoreError> = retry_async(&policy, &sleeper, &jitter, || {
                calls.fetch_add(1, AtomicOrdering::SeqCst);
                async { Err(hard_attempt()) }
            })
            .await;

            assert!(result.is_err(), "hard error surfaces");
            assert_eq!(
                calls.load(AtomicOrdering::SeqCst),
                1,
                "non-transient error => op called exactly once"
            );
            assert!(
                sleeper.recorded().is_empty(),
                "no sleeps for a non-transient error"
            );
        });
    }

    // ----- (3) success after k transient failures ---------------------------

    #[test]
    fn retry_async_success_after_k_transient_fails() {
        let rt = runtime();
        rt.block_on(async {
            let policy = small_policy();
            let sleeper = RecordingSleeper::default();
            let jitter = StubJitter(0.25);
            let calls = AtomicUsize::new(0);
            let k = 3usize;

            let result: Result<u32, StoreError> = retry_async(&policy, &sleeper, &jitter, || {
                let prev = calls.fetch_add(1, AtomicOrdering::SeqCst);
                async move {
                    if prev < k {
                        Err(transient_attempt(None))
                    } else {
                        Ok(42)
                    }
                }
            })
            .await;

            assert_eq!(result.unwrap(), 42, "succeeds on the (k+1)-th attempt");
            assert_eq!(
                calls.load(AtomicOrdering::SeqCst),
                k + 1,
                "op invoked k+1 times"
            );
            assert_eq!(
                sleeper.recorded().len(),
                k,
                "exactly k sleeps recorded (one before each retry)"
            );
        });
    }

    // ----- (4) Retry-After honoured as a floor ------------------------------

    #[test]
    fn retry_async_retry_after_is_a_floor() {
        let rt = runtime();
        rt.block_on(async {
            let policy = small_policy();
            let sleeper = RecordingSleeper::default();
            // jitter01 = 0 => jittered exp = 0, so the hint dominates entirely.
            let jitter = StubJitter(0.0);
            // A hint far larger than the first jittered exp (base*2^0 = 100ms).
            let hint = Duration::from_secs(5);
            let calls = AtomicUsize::new(0);

            let _result: Result<(), StoreError> = retry_async(&policy, &sleeper, &jitter, || {
                calls.fetch_add(1, AtomicOrdering::SeqCst);
                async move { Err(transient_attempt(Some(hint))) }
            })
            .await;

            let recorded = sleeper.recorded();
            assert!(!recorded.is_empty(), "at least one retry happened");
            for d in &recorded {
                assert!(
                    *d >= hint,
                    "recorded delay {d:?} must be >= the server hint {hint:?}"
                );
            }
        });
    }

    // ----- (5) cap respected (no hint) --------------------------------------

    #[test]
    fn retry_async_cap_respected_for_large_n() {
        let rt = runtime();
        rt.block_on(async {
            // Many attempts so n grows large; base*2^n would blow past cap.
            let policy = RetryPolicy {
                max_attempts: 12,
                base: Duration::from_millis(250),
                cap: Duration::from_secs(2),
            };
            let sleeper = RecordingSleeper::default();
            // jitter01 just below 1 => jittered ~ exp, the worst case for the cap.
            let jitter = StubJitter(0.999_999);

            let _result: Result<(), StoreError> =
                retry_async(&policy, &sleeper, &jitter, || async {
                    Err(transient_attempt(None))
                })
                .await;

            for d in sleeper.recorded() {
                assert!(
                    d <= policy.cap,
                    "delay {d:?} must never exceed cap {:?} (no hint)",
                    policy.cap
                );
            }
        });
    }

    // ----- (6) full-jitter bounds: delay == jitter01 * min(cap, base*2^n) ----

    #[test]
    fn backoff_full_jitter_lands_in_envelope() {
        let policy = RetryPolicy {
            max_attempts: 10,
            base: Duration::from_millis(100),
            cap: Duration::from_secs(30),
        };
        for n in 0..8u32 {
            let frac = 0.37;
            let delay = policy.backoff(n, None, frac);
            let exp = policy
                .base
                .as_secs_f64()
                .mul_add(2f64.powi(n as i32), 0.0)
                .min(policy.cap.as_secs_f64());
            let expected = exp * frac;
            // delay == frac * min(cap, base*2^n) (within float tolerance).
            assert!(
                (delay.as_secs_f64() - expected).abs() < 1e-9,
                "n={n}: delay {delay:?} != jitter01*envelope {expected}"
            );
            // and it lands in [0, min(cap, base*2^n)).
            assert!(
                delay.as_secs_f64() >= 0.0 && delay.as_secs_f64() < exp + 1e-9,
                "n={n}: delay {delay:?} outside [0, {exp})"
            );
        }
    }

    #[test]
    fn backoff_saturates_at_cap_for_huge_n() {
        let policy = RetryPolicy {
            max_attempts: 99,
            base: Duration::from_millis(250),
            cap: Duration::from_secs(30),
        };
        // jitter01 ~ 1 so the jittered delay tracks the (capped) envelope.
        let d = policy.backoff(2000, None, 0.999_999);
        assert!(
            d <= policy.cap,
            "huge n must saturate at the cap, got {d:?}"
        );
        assert!(
            d.as_secs_f64() > 29.0,
            "with jitter ~1 the delay should be near the cap, got {d:?}"
        );
    }

    #[test]
    fn backoff_hint_may_exceed_cap() {
        let policy = small_policy();
        let hint = Duration::from_secs(60);
        assert_eq!(
            policy.backoff(0, Some(hint), 0.5),
            hint,
            "a server hint above the cap still wins"
        );
    }

    #[test]
    fn default_jitter_is_in_unit_interval() {
        let j = DefaultJitter::new();
        for _ in 0..10_000 {
            let x = j.jitter01();
            assert!((0.0..1.0).contains(&x), "jitter {x} outside [0,1)");
        }
        // And two instances produce different streams (seed divergence).
        let a = DefaultJitter::new();
        let b = DefaultJitter::new();
        let sa: Vec<f64> = (0..4).map(|_| a.jitter01()).collect();
        let sb: Vec<f64> = (0..4).map(|_| b.jitter01()).collect();
        assert_ne!(sa, sb, "distinct instances should diverge");
    }

    #[test]
    fn fixed_jitter_stays_in_unit_interval() {
        assert!((FixedJitter(0.25).jitter01() - 0.25).abs() < f64::EPSILON);
        assert!(FixedJitter(-1.0).jitter01().abs() < f64::EPSILON);
        for out_of_range in [1.0, 42.0, f64::INFINITY] {
            let x = FixedJitter(out_of_range).jitter01();
            assert!(
                (0.0..1.0).contains(&x),
                "FixedJitter({out_of_range}) gave {x}, outside [0,1)"
            );
        }
    }

    // ----- (7) blocking engine smoke tests ----------------------------------

    #[test]
    fn retry_blocking_persistent_transient_uses_full_budget() {
        let policy = small_policy();
        let sleeper = RecordingSleeper::default();
        let jitter = StubJitter(0.5);
        let calls = AtomicUsize::new(0);

        let result: Result<(), StoreError> = retry_blocking(&policy, &sleeper, &jitter, || {
            calls.fetch_add(1, AtomicOrdering::SeqCst);
            Err(transient_attempt(None))
        });

        assert!(result.is_err());
        assert_eq!(
            calls.load(AtomicOrdering::SeqCst),
            policy.max_attempts as usize,
            "blocking: op invoked max_attempts times"
        );
        assert_eq!(
            sleeper.recorded().len(),
            (policy.max_attempts - 1) as usize,
            "blocking: max_attempts-1 sleeps"
        );
    }

    #[test]
    fn retry_blocking_success_after_k_transient() {
        let policy = small_policy();
        let sleeper = RecordingSleeper::default();
        let jitter = StubJitter(0.5);
        let calls = AtomicUsize::new(0);
        let k = 2usize;

        let result: Result<&str, StoreError> = retry_blocking(&policy, &sleeper, &jitter, || {
            let prev = calls.fetch_add(1, AtomicOrdering::SeqCst);
            if prev < k {
                Err(transient_attempt(None))
            } else {
                Ok("done")
            }
        });

        assert_eq!(result.unwrap(), "done");
        assert_eq!(calls.load(AtomicOrdering::SeqCst), k + 1);
        assert_eq!(sleeper.recorded().len(), k, "blocking: k sleeps");
    }

    #[test]
    fn retry_blocking_hard_error_returns_immediately() {
        let policy = small_policy();
        let sleeper = RecordingSleeper::default();
        let jitter = StubJitter(0.5);
        let calls = AtomicUsize::new(0);

        let result: Result<(), StoreError> = retry_blocking(&policy, &sleeper, &jitter, || {
            calls.fetch_add(1, AtomicOrdering::SeqCst);
            Err(hard_attempt())
        });

        assert!(result.is_err(), "blocking: hard error surfaces");
        assert_eq!(
            calls.load(AtomicOrdering::SeqCst),
            1,
            "blocking: non-transient error => op called exactly once"
        );
        assert!(
            sleeper.recorded().is_empty(),
            "blocking: no sleeps for a non-transient error"
        );
    }

    #[test]
    fn retry_blocking_zero_budget_still_makes_one_attempt() {
        let policy = RetryPolicy {
            max_attempts: 0,
            ..small_policy()
        };
        let sleeper = RecordingSleeper::default();
        let jitter = StubJitter(0.5);
        let calls = AtomicUsize::new(0);

        let result: Result<(), StoreError> = retry_blocking(&policy, &sleeper, &jitter, || {
            calls.fetch_add(1, AtomicOrdering::SeqCst);
            Err(transient_attempt(None))
        });

        assert!(result.is_err());
        assert_eq!(
            calls.load(AtomicOrdering::SeqCst),
            1,
            "a zero budget is clamped to one attempt"
        );
        assert!(sleeper.recorded().is_empty(), "no retry => no sleep");
    }

    #[test]
    fn retry_blocking_records_doubling_backoff_sequence() {
        let policy = small_policy();
        let sleeper = RecordingSleeper::default();
        let jitter = StubJitter(0.5);

        let _ = retry_blocking(&policy, &sleeper, &jitter, || -> Result<(), Attempt> {
            Err(transient_attempt(None))
        });

        // 0.5 * 100ms * 2^n for n = 0..4 (all well under the 10s cap).
        let expected = [0.05, 0.1, 0.2, 0.4];
        let recorded = sleeper.recorded();
        assert_eq!(recorded.len(), expected.len());
        for (got, want) in recorded.iter().zip(expected) {
            assert!(
                (got.as_secs_f64() - want).abs() < 1e-9,
                "delay {got:?} != expected {want}s"
            );
        }
    }

    #[test]
    fn default_policy_values() {
        let p = RetryPolicy::default();
        assert_eq!(p.max_attempts, 5);
        assert_eq!(p.base, Duration::from_millis(250));
        assert_eq!(p.cap, Duration::from_secs(30));
    }

    // ----- (8) Retry-After header parsing -----------------------------------

    #[test]
    fn parse_retry_after_accepts_delta_seconds() {
        assert_eq!(parse_retry_after("120"), Some(Duration::from_secs(120)));
        assert_eq!(parse_retry_after("  7\t"), Some(Duration::from_secs(7)));
        assert_eq!(parse_retry_after("0"), Some(Duration::ZERO));
    }

    #[test]
    fn parse_retry_after_rejects_non_delta_forms() {
        assert_eq!(parse_retry_after(""), None);
        assert_eq!(parse_retry_after("-1"), None);
        assert_eq!(parse_retry_after("1.5"), None);
        assert_eq!(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT"), None);
        assert_eq!(parse_retry_after("99999999999999999999999"), None);
    }
}