Skip to main content

snapdir_stores/
retry.rs

1//! Reusable retry/backoff engine for the network stores.
2//!
3//! This module is the **core** of snapdir's transient-failure retry policy: an
4//! SDK-agnostic loop that re-runs a fallible operation under a *full-jitter*
5//! exponential backoff schedule, honouring a server-supplied `Retry-After`
6//! hint. Like [`crate::adaptive`], it performs **no** real I/O and reads **no**
7//! real clock itself — the two impure dependencies (the jitter source and the
8//! sleep) are *injected* via the [`Jitter`] and [`AsyncSleeper`] /
9//! [`BlockingSleeper`] traits, so the engine and its backoff math are fully
10//! deterministic under test.
11//!
12//! Three pieces:
13//!
14//! - [`RetryPolicy`] — the schedule (attempt budget + base/cap durations) and
15//!   the [`backoff`](RetryPolicy::backoff) math (full-jitter exp with a server
16//!   hint floor).
17//! - [`Jitter`] — a `[0,1)` source. Production: [`DefaultJitter`], a tiny
18//!   hand-rolled **`SplitMix64`** PRNG (no new dependency). Tests inject a fixed
19//!   value.
20//! - [`AsyncSleeper`] / [`BlockingSleeper`] — the (async/blocking) sleep, plus
21//!   production impls ([`TokioSleeper`] / [`ThreadSleeper`]) and a recording
22//!   test fake.
23//!
24//! The engine itself — [`retry_async`] / [`retry_blocking`] — drives an
25//! operation that yields `Ok(T)` or `Err(`[`Attempt`]`)`; an [`Attempt`] carries
26//! the [`StoreError`], whether it is `transient`, and an optional
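//! `retry_after`. Call sites fill those in from the concrete SDK error (see
//! [`classify_error`](crate::classify_error)); the network stores go through
//! [`retry_network`], which additionally paces every attempt through a
//! request-rate [`RateLimiter`], and [`parse_retry_after`] decodes the
//! delta-seconds form of a `Retry-After` header into the hint.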
30
31// The backoff math works in `f64` seconds-space and the jitter PRNG maps a
32// `u64` to a `[0,1)` double; both are *advisory* timing signals (never a
33// correctness path), so the pedantic cast lints are allowed module-wide,
34// matching the `adaptive` controller's convention.
35#![allow(
36    clippy::cast_precision_loss,
37    clippy::cast_possible_truncation,
38    clippy::cast_possible_wrap,
39    clippy::cast_sign_loss
40)]
41
42use std::future::Future;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::time::{Duration, SystemTime, UNIX_EPOCH};
45
46use snapdir_core::StoreError;
47
48use crate::transfer::RateLimiter;
49
50// ---------------------------------------------------------------------------
51// RetryPolicy + backoff math.
52// ---------------------------------------------------------------------------
53
/// The retry schedule: how many attempts to make and the exponential-backoff
/// base/cap bounding each inter-attempt delay.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RetryPolicy {
    /// Total number of attempts, **including the first** (so `max_attempts = 5`
    /// means one try plus up to four retries). Clamped to at least 1 by the
    /// engine.
    pub max_attempts: u32,
    /// The base delay; the un-jittered exponential is `base × 2^n` for the
    /// 0-based attempt index `n`. A zero base disables the exponential part
    /// entirely (every delay is then just the server hint, if any).
    pub base: Duration,
    /// The hard ceiling on the un-jittered exponential (and therefore on the
    /// jittered delay, absent a larger server hint). Any value is accepted,
    /// including [`Duration::MAX`].
    pub cap: Duration,
}

impl Default for RetryPolicy {
    /// Five attempts with a 250 ms base and a 30 s cap.
    fn default() -> Self {
        Self {
            max_attempts: 5,
            base: Duration::from_millis(250),
            cap: Duration::from_secs(30),
        }
    }
}

impl RetryPolicy {
    /// Computes the inter-attempt delay before the retry that follows the
    /// 0-based failed-attempt index `n`, under full-jitter exponential backoff.
    ///
    /// 1. `exp = min(cap, base × 2^n)` — the un-jittered envelope, saturated at
    ///    `cap`. The `2^n` growth is computed in `f64` and clamped, so it can
    ///    never overflow; a zero `base` gives `exp = 0` for every `n`.
    /// 2. `jittered = jitter01 × exp` — full jitter spreads the delay uniformly
    ///    over `[0, exp)` (`jitter01 ∈ [0, 1)` is supplied by the [`Jitter`]
    ///    source so tests are deterministic). Out-of-range samples are clamped
    ///    to `[0, 1]` and non-finite ones are treated as `0`. The result is
    ///    converted back to a [`Duration`] saturating at `cap`, so it never
    ///    exceeds `cap` and never panics (even for `cap == Duration::MAX`).
    /// 3. `delay = max(server_hint.unwrap_or(0), jittered)` — a server
    ///    `Retry-After` acts as a floor: we never retry sooner than the server
    ///    asked, but may wait longer if the jittered exponential is larger.
    ///    The hint is allowed to push the delay above `cap`.
    #[must_use]
    pub fn backoff(&self, n: u32, server_hint: Option<Duration>, jitter01: f64) -> Duration {
        let cap_secs = self.cap.as_secs_f64();

        // Step 1: the un-jittered envelope, in f64 seconds.
        let exp_secs = if self.base.is_zero() {
            // base × 2^n is exactly 0 for every n. Handle it up front so the
            // large-n short-circuit below cannot turn it into the cap.
            0.0
        } else if n >= 1024 {
            // 2^1024 overflows f64 to +inf, so any positive base saturates.
            cap_secs
        } else {
            // n < 1024, so the cast to i32 is lossless. A product that
            // overflows to +inf (huge base) is clamped by `min`.
            (self.base.as_secs_f64() * 2f64.powi(n as i32)).min(cap_secs)
        };
        // Defensive clamp: never negative, never above the cap.
        let exp_secs = if exp_secs.is_finite() {
            exp_secs.clamp(0.0, cap_secs)
        } else {
            cap_secs
        };

        // Step 2: full jitter over [0, exp). jitter01 is contractually in
        // [0, 1); clamp defensively so a misbehaving source can never exceed
        // the envelope.
        let frac = if jitter01.is_finite() {
            jitter01.clamp(0.0, 1.0)
        } else {
            0.0
        };
        // `as_secs_f64` rounds, so for caps near `Duration::MAX` the product
        // can reach 2^64 s, where `Duration::from_secs_f64` would panic.
        // Convert fallibly and saturate at the cap; the `min` also absorbs any
        // f64 -> nanosecond rounding that would nudge the delay past the cap.
        let jittered = Duration::try_from_secs_f64(exp_secs * frac)
            .map_or(self.cap, |d| d.min(self.cap));

        // Step 3: the server hint is a floor ("wait at least this long").
        server_hint.map_or(jittered, |hint| hint.max(jittered))
    }
}
133
134// ---------------------------------------------------------------------------
135// Jitter — [0,1) source (production SplitMix64 + injectable test stub).
136// ---------------------------------------------------------------------------
137
/// A source of uniform `[0, 1)` fractions used to spread retry delays
/// (full jitter). Injected so the backoff sequence is deterministic in tests.
pub trait Jitter {
    /// Returns the next jitter fraction in `[0, 1)`.
    fn jitter01(&self) -> f64;
}

/// The `SplitMix64` state increment (the 64-bit golden-ratio constant); every
/// draw advances the generator state by this amount.
const SPLITMIX64_GAMMA: u64 = 0x9E37_79B9_7F4A_7C15;

/// Process-wide seed counter mixed into each [`DefaultJitter`] so two instances
/// constructed in the same nanosecond still diverge.
static JITTER_SEED_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Production [`Jitter`]: a tiny, hand-rolled **`SplitMix64`** PRNG (no external
/// dependency).
///
/// Seeded once at construction from a one-time nanosecond clock sample `XOR`ed
/// with a mixed, monotonically advancing process counter; each
/// [`jitter01`](Jitter::jitter01) advances the 64-bit state and maps the top 53
/// bits to a `[0, 1)` double. `SplitMix64` is a well-known seeding PRNG (the
/// xoshiro authors recommend it for seeding their generators) and is more than
/// adequate for *jitter* — this is decorrelation, not cryptography.
///
/// Interior mutability ([`AtomicU64`]) lets `jitter01` take `&self` (matching
/// the trait) while still advancing the stream, so a single instance can be
/// shared across the retry loop.
#[derive(Debug)]
pub struct DefaultJitter {
    state: AtomicU64,
}

impl DefaultJitter {
    /// Builds a jitter source seeded from the low 64 bits of the current
    /// nanosecond clock `XOR`ed with a mixed process-wide counter (so
    /// concurrent constructions diverge).
    #[must_use]
    pub fn new() -> Self {
        // A clock before the epoch only weakens the seed; fall back to 0
        // rather than fail.
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| d.as_nanos() as u64);
        let counter = JITTER_SEED_COUNTER.fetch_add(1, Ordering::Relaxed);
        // Mix the counter in via SplitMix64 so adjacent seeds aren't adjacent
        // states.
        let seed = nanos ^ splitmix64_mix(counter.wrapping_add(SPLITMIX64_GAMMA));
        Self {
            state: AtomicU64::new(seed),
        }
    }
}

impl Default for DefaultJitter {
    fn default() -> Self {
        Self::new()
    }
}

impl Jitter for DefaultJitter {
    fn jitter01(&self) -> f64 {
        // `fetch_add` returns the *previous* state; adding gamma once more
        // yields the state this draw owns (state += gamma), so every call —
        // even concurrent ones — mixes a distinct value.
        let z = self
            .state
            .fetch_add(SPLITMIX64_GAMMA, Ordering::Relaxed)
            .wrapping_add(SPLITMIX64_GAMMA);
        u64_to_unit_f64(splitmix64_mix(z))
    }
}

/// The `SplitMix64` finalizing mix (the avalanche step of the `SplitMix64` PRNG).
#[inline]
fn splitmix64_mix(mut z: u64) -> u64 {
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Maps a uniformly random `u64` to a `[0, 1)` double using its top 53 bits
/// (the mantissa width), the standard unbiased construction.
#[inline]
fn u64_to_unit_f64(bits: u64) -> f64 {
    // 53 high bits / 2^53 lands in [0, 1).
    ((bits >> 11) as f64) / (1u64 << 53) as f64
}

/// A deterministic [`Jitter`] for tests: always returns the same fixed
/// fraction, clamped to `[0, 1)`. A `NaN` fraction maps to `0.0`.
#[derive(Clone, Copy, Debug)]
pub struct FixedJitter(pub f64);

impl Jitter for FixedJitter {
    fn jitter01(&self) -> f64 {
        // `f64::clamp` propagates NaN, which would break the [0,1) contract;
        // treat it as "no jitter" instead.
        if self.0.is_nan() {
            0.0
        } else {
            // Keep strictly below 1.0.
            self.0.clamp(0.0, 1.0 - f64::EPSILON)
        }
    }
}
230
231// ---------------------------------------------------------------------------
232// Sleeper — async + blocking sleep (production + recording test fake).
233// ---------------------------------------------------------------------------
234
/// Asynchronous sleep abstraction consumed by [`retry_async`]. Injecting it
/// lets tests observe every requested delay without waiting in real time.
pub trait AsyncSleeper {
    /// Returns a future that resolves once the (real or simulated) wait of
    /// `dur` is over.
    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send;
}

/// Thread-blocking sleep abstraction consumed by [`retry_blocking`]. Injecting
/// it lets tests observe every requested delay without waiting in real time.
/// It follows the same convention as the blocking transfer path
/// ([`BlockingRateLimiter`](crate::BlockingRateLimiter)), which waits with
/// [`std::thread::sleep`].
pub trait BlockingSleeper {
    /// Blocks the calling thread for `dur`.
    fn sleep(&self, dur: Duration);
}
250
251/// Production [`AsyncSleeper`] backed by [`tokio::time::sleep`].
252#[derive(Clone, Copy, Debug, Default)]
253pub struct TokioSleeper;
254
255impl AsyncSleeper for TokioSleeper {
256    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send {
257        tokio::time::sleep(dur)
258    }
259}
260
261/// Production [`BlockingSleeper`] backed by [`std::thread::sleep`].
262#[derive(Clone, Copy, Debug, Default)]
263pub struct ThreadSleeper;
264
265impl BlockingSleeper for ThreadSleeper {
266    fn sleep(&self, dur: Duration) {
267        std::thread::sleep(dur);
268    }
269}
270
271// ---------------------------------------------------------------------------
272// Engine — SDK-agnostic over an attempt outcome.
273// ---------------------------------------------------------------------------
274
275/// The outcome of a single failed attempt, handed back to the retry engine.
276///
277/// The caller (a store operation) decides whether the failure is `transient`
278/// (worth retrying) and threads through any server `retry_after` hint. The
279/// next gate populates these from
280/// [`classify_error`](crate::classify_error); the engine itself is SDK-agnostic.
281#[derive(Debug)]
282pub struct Attempt {
283    /// The error this attempt produced (surfaced to the caller if retries are
284    /// exhausted or the error is non-transient).
285    pub err: StoreError,
286    /// Whether the error is transient (throttle / timeout-class) and therefore
287    /// retryable.
288    pub transient: bool,
289    /// An optional server-supplied minimum delay (`Retry-After`); acts as a
290    /// floor on the next backoff.
291    pub retry_after: Option<Duration>,
292}
293
294/// The most attempts the engine will ever make (defensive clamp on
295/// [`RetryPolicy::max_attempts`]).
296#[inline]
297fn clamped_max_attempts(policy: &RetryPolicy) -> u32 {
298    policy.max_attempts.max(1)
299}
300
301/// Runs `op` under `policy`'s full-jitter backoff, retrying transient failures.
302///
303/// The operation is invoked up to `policy.max_attempts` times. On `Ok(T)` the
304/// value is returned immediately. On `Err(`[`Attempt`]`)`: if the attempt is
305/// `transient` **and** attempts remain, the engine sleeps for
306/// [`RetryPolicy::backoff`] (with the attempt's `retry_after` hint and a fresh
307/// jitter sample) via `sleeper`, then retries; otherwise the attempt's
308/// [`StoreError`] is returned. A non-transient error short-circuits on the
309/// first failure (no sleep).
310///
311/// `sleeper` and `jitter` are injected so tests can record the delay sequence
312/// without real waits.
313///
314/// # Errors
315///
316/// Returns the last [`Attempt`]'s [`StoreError`] when the operation does not
317/// succeed within the attempt budget (or fails non-transiently).
318pub async fn retry_async<T, F, Fut>(
319    policy: &RetryPolicy,
320    sleeper: &impl AsyncSleeper,
321    jitter: &impl Jitter,
322    mut op: F,
323) -> Result<T, StoreError>
324where
325    F: FnMut() -> Fut,
326    Fut: Future<Output = Result<T, Attempt>>,
327{
328    let max = clamped_max_attempts(policy);
329    let mut n: u32 = 0;
330    loop {
331        match op().await {
332            Ok(value) => return Ok(value),
333            Err(attempt) => {
334                let attempts_used = n + 1;
335                if attempt.transient && attempts_used < max {
336                    let delay = policy.backoff(n, attempt.retry_after, jitter.jitter01());
337                    sleeper.sleep(delay).await;
338                    n += 1;
339                } else {
340                    return Err(attempt.err);
341                }
342            }
343        }
344    }
345}
346
347/// Blocking sibling of [`retry_async`]: same control flow over a synchronous
348/// `op`, sleeping on the calling thread via a [`BlockingSleeper`]. Used by the
349/// rayon store-to-store sync path.
350///
351/// # Errors
352///
353/// Returns the last [`Attempt`]'s [`StoreError`] when the operation does not
354/// succeed within the attempt budget (or fails non-transiently).
355pub fn retry_blocking<T, F>(
356    policy: &RetryPolicy,
357    sleeper: &impl BlockingSleeper,
358    jitter: &impl Jitter,
359    mut op: F,
360) -> Result<T, StoreError>
361where
362    F: FnMut() -> Result<T, Attempt>,
363{
364    let max = clamped_max_attempts(policy);
365    let mut n: u32 = 0;
366    loop {
367        match op() {
368            Ok(value) => return Ok(value),
369            Err(attempt) => {
370                let attempts_used = n + 1;
371                if attempt.transient && attempts_used < max {
372                    let delay = policy.backoff(n, attempt.retry_after, jitter.jitter01());
373                    sleeper.sleep(delay);
374                    n += 1;
375                } else {
376                    return Err(attempt.err);
377                }
378            }
379        }
380    }
381}
382
383// ---------------------------------------------------------------------------
384// retry_network — the per-call boundary the network stores wrap each SDK call
385// in: acquire one request-rate token, then run the operation under the retry
386// engine. This is the single injectable seam the `backoff_wire` tests drive
387// (with a scripted `op` + recording sleeper), so the wiring is exercised with
388// NO live cloud.
389// ---------------------------------------------------------------------------
390
391/// Runs a single network operation through the full retry/limiter wiring.
392///
393/// Before **every** attempt this acquires one token from `req_limiter` (the
394/// per-call request-rate bucket — `acquire(1)`, a no-op when the limiter is
395/// unlimited), then drives `op` under [`retry_async`] with `policy`'s
396/// full-jitter backoff. `op` is the SDK call: it returns `Ok(T)` on success or
397/// `Err(`[`Attempt`]`)` carrying the mapped [`StoreError`], whether the failure
398/// is `transient`, and any server `retry_after` hint extracted from the
399/// concrete SDK error.
400///
401/// The real stores pass an `op` that performs the SDK call and builds the
402/// `Attempt` on error (via the per-SDK `*_attempt_from_err` extractors); the
403/// `backoff_wire` tests pass an `op` that replays a scripted sequence of
404/// `Attempt`s, so the retry/limiter wiring is exercised without any network.
405///
406/// # Errors
407///
408/// Returns the last [`Attempt`]'s [`StoreError`] when `op` does not succeed
409/// within the attempt budget (or fails non-transiently).
410pub async fn retry_network<T, F, Fut>(
411    policy: &RetryPolicy,
412    req_limiter: &RateLimiter,
413    sleeper: &impl AsyncSleeper,
414    jitter: &impl Jitter,
415    mut op: F,
416) -> Result<T, StoreError>
417where
418    F: FnMut() -> Fut,
419    Fut: Future<Output = Result<T, Attempt>>,
420{
421    // This mirrors [`retry_async`]'s control flow exactly, but inlines the
422    // per-attempt request-token acquire so it can call `op` directly each
423    // iteration (the `FnMut() -> Fut` bound on `retry_async` is non-lending, so
424    // an `op` wrapper future that re-borrows `op`/`req_limiter` cannot escape
425    // its closure — driving the loop here sidesteps that).
426    let max = clamped_max_attempts(policy);
427    let mut n: u32 = 0;
428    loop {
429        // Pace each (re)try through the per-call request-rate limiter (a no-op
430        // when unlimited), then run the operation.
431        req_limiter.acquire(1).await;
432        match op().await {
433            Ok(value) => return Ok(value),
434            Err(attempt) => {
435                let attempts_used = n + 1;
436                if attempt.transient && attempts_used < max {
437                    let delay = policy.backoff(n, attempt.retry_after, jitter.jitter01());
438                    sleeper.sleep(delay).await;
439                    n += 1;
440                } else {
441                    return Err(attempt.err);
442                }
443            }
444        }
445    }
446}
447
/// Decodes an HTTP `Retry-After` header value into a [`Duration`].
///
/// Only the **delta-seconds** form is understood (`Retry-After: 120` →
/// `Some(120s)`). That form covers the usual `429`/`503` throttling responses
/// from S3/GCS. Surrounding whitespace is ignored, and `0` yields
/// `Some(Duration::ZERO)`, a valid "retry now" floor.
///
/// The HTTP-date form is deliberately unsupported: it would need a date crate
/// and a wall-clock read, and the backoff engine already picks a sane delay
/// without a hint. A date, like any other non-numeric, negative, or
/// out-of-range value, therefore yields `None`.
#[must_use]
pub fn parse_retry_after(value: &str) -> Option<Duration> {
    let seconds: u64 = value.trim().parse().ok()?;
    Some(Duration::from_secs(seconds))
}
461
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Mutex;

    // ----- helpers ----------------------------------------------------------

    /// A recording fake [`AsyncSleeper`] / [`BlockingSleeper`]: never sleeps for
    /// real, just appends each requested duration so tests can assert the exact
    /// backoff sequence.
    #[derive(Default)]
    struct RecordingSleeper {
        delays: Mutex<Vec<Duration>>,
    }

    impl RecordingSleeper {
        fn recorded(&self) -> Vec<Duration> {
            self.delays.lock().unwrap().clone()
        }
    }

    impl AsyncSleeper for RecordingSleeper {
        fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send {
            self.delays.lock().unwrap().push(dur);
            std::future::ready(())
        }
    }

    impl BlockingSleeper for RecordingSleeper {
        fn sleep(&self, dur: Duration) {
            self.delays.lock().unwrap().push(dur);
        }
    }

    /// A deterministic [`Jitter`] returning a fixed fraction verbatim (no
    /// clamping, so callers must keep it in `[0,1)` themselves).
    struct StubJitter(f64);
    impl Jitter for StubJitter {
        fn jitter01(&self) -> f64 {
            self.0
        }
    }

    fn boom() -> StoreError {
        StoreError::Backend {
            message: "boom".into(),
            source: None,
        }
    }

    fn transient_attempt(retry_after: Option<Duration>) -> Attempt {
        Attempt {
            err: boom(),
            transient: true,
            retry_after,
        }
    }

    fn hard_attempt() -> Attempt {
        Attempt {
            err: boom(),
            transient: false,
            retry_after: None,
        }
    }

    fn runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("build tokio runtime")
    }

    fn small_policy() -> RetryPolicy {
        // 5 attempts (=> up to 4 sleeps), tiny base/cap so jittered delays are
        // easy to reason about.
        RetryPolicy {
            max_attempts: 5,
            base: Duration::from_millis(100),
            cap: Duration::from_secs(10),
        }
    }

    // ----- (1) exact attempt count on persistent transient failure ----------

    #[test]
    fn retry_async_persistent_transient_uses_full_attempt_budget() {
        let rt = runtime();
        rt.block_on(async {
            let policy = small_policy();
            let sleeper = RecordingSleeper::default();
            let jitter = StubJitter(0.5);
            let calls = AtomicUsize::new(0);

            let result: Result<(), StoreError> = retry_async(&policy, &sleeper, &jitter, || {
                calls.fetch_add(1, AtomicOrdering::SeqCst);
                async { Err(transient_attempt(None)) }
            })
            .await;

            assert!(result.is_err(), "persistent transient => surfaces the err");
            assert_eq!(
                calls.load(AtomicOrdering::SeqCst),
                policy.max_attempts as usize,
                "op invoked exactly max_attempts times"
            );
            assert_eq!(
                sleeper.recorded().len(),
                (policy.max_attempts - 1) as usize,
                "max_attempts-1 sleeps recorded (no sleep after the last attempt)"
            );
        });
    }

    // ----- (2) immediate return on hard (non-transient) error ---------------

    #[test]
    fn retry_async_hard_error_returns_immediately_without_sleeping() {
        let rt = runtime();
        rt.block_on(async {
            let policy = small_policy();
            let sleeper = RecordingSleeper::default();
            let jitter = StubJitter(0.5);
            let calls = AtomicUsize::new(0);

            let result: Result<(), StoreError> = retry_async(&policy, &sleeper, &jitter, || {
                calls.fetch_add(1, AtomicOrdering::SeqCst);
                async { Err(hard_attempt()) }
            })
            .await;

            assert!(result.is_err(), "hard error surfaces");
            assert_eq!(
                calls.load(AtomicOrdering::SeqCst),
                1,
                "non-transient error => op called exactly once"
            );
            assert!(
                sleeper.recorded().is_empty(),
                "no sleeps for a non-transient error"
            );
        });
    }

    // ----- (3) success after k transient failures ---------------------------

    #[test]
    fn retry_async_success_after_k_transient_fails() {
        let rt = runtime();
        rt.block_on(async {
            let policy = small_policy();
            let sleeper = RecordingSleeper::default();
            let jitter = StubJitter(0.25);
            let calls = AtomicUsize::new(0);
            let k = 3usize;

            let result: Result<u32, StoreError> = retry_async(&policy, &sleeper, &jitter, || {
                let prev = calls.fetch_add(1, AtomicOrdering::SeqCst);
                async move {
                    if prev < k {
                        Err(transient_attempt(None))
                    } else {
                        Ok(42)
                    }
                }
            })
            .await;

            assert_eq!(result.unwrap(), 42, "succeeds on the (k+1)-th attempt");
            assert_eq!(
                calls.load(AtomicOrdering::SeqCst),
                k + 1,
                "op invoked k+1 times"
            );
            assert_eq!(
                sleeper.recorded().len(),
                k,
                "exactly k sleeps recorded (one before each retry)"
            );
        });
    }

    // ----- (4) Retry-After honoured as a floor ------------------------------

    #[test]
    fn retry_async_retry_after_is_a_floor() {
        let rt = runtime();
        rt.block_on(async {
            let policy = small_policy();
            let sleeper = RecordingSleeper::default();
            // jitter01 = 0 => jittered exp = 0, so the hint dominates entirely.
            let jitter = StubJitter(0.0);
            // A hint far larger than the first jittered exp (base*2^0 = 100ms).
            let hint = Duration::from_secs(5);
            let calls = AtomicUsize::new(0);

            let _result: Result<(), StoreError> = retry_async(&policy, &sleeper, &jitter, || {
                calls.fetch_add(1, AtomicOrdering::SeqCst);
                async move { Err(transient_attempt(Some(hint))) }
            })
            .await;

            assert_eq!(
                calls.load(AtomicOrdering::SeqCst),
                policy.max_attempts as usize,
                "a hinted transient error still consumes the whole budget"
            );
            let recorded = sleeper.recorded();
            assert!(!recorded.is_empty(), "at least one retry happened");
            for d in &recorded {
                assert!(
                    *d >= hint,
                    "recorded delay {d:?} must be >= the server hint {hint:?}"
                );
            }
        });
    }

    // ----- (5) cap respected (no hint) --------------------------------------

    #[test]
    fn retry_async_cap_respected_for_large_n() {
        let rt = runtime();
        rt.block_on(async {
            // Many attempts so n grows large; base*2^n would blow past cap.
            let policy = RetryPolicy {
                max_attempts: 12,
                base: Duration::from_millis(250),
                cap: Duration::from_secs(2),
            };
            let sleeper = RecordingSleeper::default();
            // jitter01 just below 1 => jittered ~ exp, the worst case for the cap.
            let jitter = StubJitter(0.999_999);

            let _result: Result<(), StoreError> =
                retry_async(&policy, &sleeper, &jitter, || async {
                    Err(transient_attempt(None))
                })
                .await;

            for d in sleeper.recorded() {
                assert!(
                    d <= policy.cap,
                    "delay {d:?} must never exceed cap {:?} (no hint)",
                    policy.cap
                );
            }
        });
    }

    // ----- (6) full-jitter bounds: delay == jitter01 * min(cap, base*2^n) ----

    #[test]
    fn backoff_full_jitter_lands_in_envelope() {
        let policy = RetryPolicy {
            max_attempts: 10,
            base: Duration::from_millis(100),
            cap: Duration::from_secs(30),
        };
        let frac = 0.37;
        for n in 0..8u32 {
            let delay = policy.backoff(n, None, frac);
            let exp = (policy.base.as_secs_f64() * 2f64.powi(n as i32))
                .min(policy.cap.as_secs_f64());
            let expected = exp * frac;
            // delay == frac * min(cap, base*2^n) (within float tolerance).
            assert!(
                (delay.as_secs_f64() - expected).abs() < 1e-9,
                "n={n}: delay {delay:?} != jitter01*envelope {expected}"
            );
            // and it lands in [0, min(cap, base*2^n)).
            assert!(
                delay.as_secs_f64() >= 0.0 && delay.as_secs_f64() < exp + 1e-9,
                "n={n}: delay {delay:?} outside [0, {exp})"
            );
        }
    }

    #[test]
    fn backoff_saturates_at_cap_for_huge_n() {
        let policy = RetryPolicy {
            max_attempts: 99,
            base: Duration::from_millis(250),
            cap: Duration::from_secs(30),
        };
        // jitter01 ~ 1 so the jittered delay tracks the (capped) envelope.
        let d = policy.backoff(2000, None, 0.999_999);
        assert!(
            d <= policy.cap,
            "huge n must saturate at the cap, got {d:?}"
        );
        assert!(
            d.as_secs_f64() > 29.0,
            "with jitter ~1 the delay should be near the cap, got {d:?}"
        );
    }

    #[test]
    fn backoff_hint_is_a_floor_not_an_override() {
        let policy = small_policy();
        // n = 3: envelope = 100ms * 2^3 = 800ms; half jitter => 400ms, which
        // beats a 1ms hint, so the jittered delay wins.
        let d = policy.backoff(3, Some(Duration::from_millis(1)), 0.5);
        assert_eq!(d, Duration::from_millis(400));
    }

    #[test]
    fn default_jitter_is_in_unit_interval() {
        let j = DefaultJitter::new();
        for _ in 0..10_000 {
            let x = j.jitter01();
            assert!((0.0..1.0).contains(&x), "jitter {x} outside [0,1)");
        }
        // And two instances produce different streams (seed divergence).
        let a = DefaultJitter::new();
        let b = DefaultJitter::new();
        let sa: Vec<f64> = (0..4).map(|_| a.jitter01()).collect();
        let sb: Vec<f64> = (0..4).map(|_| b.jitter01()).collect();
        assert_ne!(sa, sb, "distinct instances should diverge");
    }

    #[test]
    fn fixed_jitter_clamps_into_unit_interval() {
        assert!((FixedJitter(0.25).jitter01() - 0.25).abs() < 1e-12);
        assert!(FixedJitter(-3.0).jitter01().abs() < 1e-12);
        let hi = FixedJitter(2.0).jitter01();
        assert!((0.0..1.0).contains(&hi), "{hi} outside [0,1)");
        let inf = FixedJitter(f64::INFINITY).jitter01();
        assert!((0.0..1.0).contains(&inf), "{inf} outside [0,1)");
    }

    // ----- (7) blocking engine smoke tests ----------------------------------

    #[test]
    fn retry_blocking_persistent_transient_uses_full_budget() {
        let policy = small_policy();
        let sleeper = RecordingSleeper::default();
        let jitter = StubJitter(0.5);
        let calls = AtomicUsize::new(0);

        let result: Result<(), StoreError> = retry_blocking(&policy, &sleeper, &jitter, || {
            calls.fetch_add(1, AtomicOrdering::SeqCst);
            Err(transient_attempt(None))
        });

        assert!(result.is_err());
        assert_eq!(
            calls.load(AtomicOrdering::SeqCst),
            policy.max_attempts as usize,
            "blocking: op invoked max_attempts times"
        );
        assert_eq!(
            sleeper.recorded().len(),
            (policy.max_attempts - 1) as usize,
            "blocking: max_attempts-1 sleeps"
        );
    }

    #[test]
    fn retry_blocking_success_after_k_transient() {
        let policy = small_policy();
        let sleeper = RecordingSleeper::default();
        let jitter = StubJitter(0.5);
        let calls = AtomicUsize::new(0);
        let k = 2usize;

        let result: Result<&str, StoreError> = retry_blocking(&policy, &sleeper, &jitter, || {
            let prev = calls.fetch_add(1, AtomicOrdering::SeqCst);
            if prev < k {
                Err(transient_attempt(None))
            } else {
                Ok("done")
            }
        });

        assert_eq!(result.unwrap(), "done");
        assert_eq!(calls.load(AtomicOrdering::SeqCst), k + 1);
        assert_eq!(sleeper.recorded().len(), k, "blocking: k sleeps");
    }

    #[test]
    fn retry_blocking_hard_error_returns_immediately() {
        let policy = small_policy();
        let sleeper = RecordingSleeper::default();
        let jitter = StubJitter(0.5);
        let calls = AtomicUsize::new(0);

        let result: Result<(), StoreError> = retry_blocking(&policy, &sleeper, &jitter, || {
            calls.fetch_add(1, AtomicOrdering::SeqCst);
            Err(hard_attempt())
        });

        assert!(result.is_err());
        assert_eq!(calls.load(AtomicOrdering::SeqCst), 1, "blocking: one call");
        assert!(sleeper.recorded().is_empty(), "blocking: no sleeps");
    }

    // ----- (8) budget clamp, defaults, Retry-After parsing ------------------

    #[test]
    fn retry_async_zero_max_attempts_still_runs_once() {
        let rt = runtime();
        rt.block_on(async {
            let policy = RetryPolicy {
                max_attempts: 0,
                ..small_policy()
            };
            let sleeper = RecordingSleeper::default();
            let jitter = StubJitter(0.5);
            let calls = AtomicUsize::new(0);

            let result: Result<(), StoreError> = retry_async(&policy, &sleeper, &jitter, || {
                calls.fetch_add(1, AtomicOrdering::SeqCst);
                async { Err(transient_attempt(None)) }
            })
            .await;

            assert!(result.is_err());
            assert_eq!(
                calls.load(AtomicOrdering::SeqCst),
                1,
                "max_attempts = 0 is clamped to a single attempt"
            );
            assert!(
                sleeper.recorded().is_empty(),
                "no sleep after the only attempt"
            );
        });
    }

    #[test]
    fn default_policy_values() {
        let p = RetryPolicy::default();
        assert_eq!(p.max_attempts, 5);
        assert_eq!(p.base, Duration::from_millis(250));
        assert_eq!(p.cap, Duration::from_secs(30));
    }

    #[test]
    fn parse_retry_after_delta_seconds() {
        assert_eq!(parse_retry_after("120"), Some(Duration::from_secs(120)));
        assert_eq!(parse_retry_after("  7\r\n"), Some(Duration::from_secs(7)));
        assert_eq!(parse_retry_after("0"), Some(Duration::ZERO));
    }

    #[test]
    fn parse_retry_after_rejects_other_forms() {
        assert_eq!(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT"), None);
        assert_eq!(parse_retry_after(""), None);
        assert_eq!(parse_retry_after("-1"), None);
        assert_eq!(parse_retry_after("1.5"), None);
    }
}