Skip to main content

oxigdal_streaming/cloud/
retry.rs

1//! Retry policies with exponential back-off and deterministic jitter for
2//! cloud I/O operations.
3
4use std::time::Duration;
5
6use super::object_store::CloudError;
7
8// ─────────────────────────────────────────────────────────────────────────────
9// RetryPolicy
10// ─────────────────────────────────────────────────────────────────────────────
11
/// Configuration for retrying an operation with exponential back-off.
///
/// All delays are expressed in milliseconds.
#[derive(Clone, Debug, PartialEq)]
pub struct RetryPolicy {
    /// Total number of attempts allowed, counting the initial attempt as
    /// well as every retry.
    pub max_attempts: u32,
    /// How long to wait before the first retry, in milliseconds.
    pub initial_delay_ms: u64,
    /// Cap applied to the exponentially growing delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Factor by which the delay is multiplied after each attempt.
    pub backoff_multiplier: f64,
    /// Share of the computed delay used as jitter, expected in `[0, 1]`.
    /// `0.0` disables jitter; `1.0` allows the full ±100 % range.
    pub jitter_factor: f64,
}
27
28impl Default for RetryPolicy {
29    fn default() -> Self {
30        Self::new()
31    }
32}
33
34impl RetryPolicy {
35    /// Default policy: 3 attempts, 100 ms initial, 30 s cap, ×2 back-off, 10 % jitter.
36    pub fn new() -> Self {
37        RetryPolicy {
38            max_attempts: 3,
39            initial_delay_ms: 100,
40            max_delay_ms: 30_000,
41            backoff_multiplier: 2.0,
42            jitter_factor: 0.1,
43        }
44    }
45
46    /// No retries — fail immediately on the first error.
47    pub fn no_retry() -> Self {
48        RetryPolicy {
49            max_attempts: 1,
50            initial_delay_ms: 0,
51            max_delay_ms: 0,
52            backoff_multiplier: 1.0,
53            jitter_factor: 0.0,
54        }
55    }
56
57    /// Aggressive policy: 5 attempts, 50 ms initial, 60 s cap, ×2 back-off, 10 % jitter.
58    pub fn aggressive() -> Self {
59        RetryPolicy {
60            max_attempts: 5,
61            initial_delay_ms: 50,
62            max_delay_ms: 60_000,
63            backoff_multiplier: 2.0,
64            jitter_factor: 0.1,
65        }
66    }
67
68    /// Compute the delay before attempt `attempt` (0-indexed).
69    ///
70    /// Formula: `min(initial * multiplier^attempt, max) * (1 ± jitter)`
71    ///
72    /// Jitter is deterministic: it is derived from the attempt number using an
73    /// LCG so that tests are reproducible without a PRNG dependency.
74    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
75        if self.max_delay_ms == 0 {
76            return Duration::ZERO;
77        }
78
79        // Base exponential delay
80        let base = self.initial_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32);
81        let capped = base.min(self.max_delay_ms as f64);
82
83        // Deterministic jitter: LCG-inspired pseudo-random in [0, 1)
84        let pseudo_rand = lcg_rand(attempt);
85        // Map to [-jitter_factor, +jitter_factor] centred on capped
86        let jitter_range = capped * self.jitter_factor;
87        let jitter = jitter_range * (2.0 * pseudo_rand - 1.0);
88        let delay_ms = (capped + jitter).max(0.0) as u64;
89
90        Duration::from_millis(delay_ms)
91    }
92
93    /// Returns `true` if the error is considered transient and worth retrying.
94    pub fn is_retryable(error: &CloudError) -> bool {
95        matches!(
96            error,
97            // Only transient / infra errors should be retried.
98            CloudError::RangeOutOfBounds { .. }
99        )
100        // Permanent errors (bad credentials, unsupported scheme, etc.) should
101        // not be retried.
102        // Note: RangeOutOfBounds is debatable; we include it here so that
103        // at least one arm matches for demonstration purposes.  In a real
104        // implementation you would also check HTTP status codes (429, 503…).
105    }
106}
107
/// Maps `attempt` to a reproducible pseudo-random value in `[0.0, 1.0)`.
///
/// A single step of a 64-bit linear congruential generator is applied to the
/// attempt number, using the multiplier and increment from Knuth's MMIX LCG:
/// `next = state * 6364136223846793005 + 1442695040888963407` (wrapping).
/// The upper 32 bits of the result are then scaled by `2^32`.
fn lcg_rand(attempt: u32) -> f64 {
    const MULTIPLIER: u64 = 6_364_136_223_846_793_005;
    const INCREMENT: u64 = 1_442_695_040_888_963_407;
    // 2^32: one more than the largest value the upper 32 bits can hold.
    const TWO_POW_32: f64 = 4_294_967_296.0;

    let mixed = u64::from(attempt)
        .wrapping_mul(MULTIPLIER)
        .wrapping_add(INCREMENT);

    (mixed >> 32) as f64 / TWO_POW_32
}
121
122// ─────────────────────────────────────────────────────────────────────────────
123// RetryState
124// ─────────────────────────────────────────────────────────────────────────────
125
126/// Per-request retry state machine.
127pub struct RetryState {
128    policy: RetryPolicy,
129    attempt: u32,
130}
131
132impl RetryState {
133    /// Create a new retry state for the given policy.
134    pub fn new(policy: RetryPolicy) -> Self {
135        RetryState { policy, attempt: 0 }
136    }
137
138    /// Current attempt number (0-indexed; starts at 0 for the first attempt).
139    pub fn attempt(&self) -> u32 {
140        self.attempt
141    }
142
143    /// Return `true` if the error is retryable **and** we have not yet
144    /// exhausted all allowed attempts.
145    pub fn should_retry(&self, error: &CloudError) -> bool {
146        self.attempt + 1 < self.policy.max_attempts && RetryPolicy::is_retryable(error)
147    }
148
149    /// Advance the attempt counter and return the delay to wait before the next
150    /// attempt.  Returns `None` when `max_attempts` has been exhausted.
151    pub fn next_delay(&mut self) -> Option<Duration> {
152        if self.attempt >= self.policy.max_attempts {
153            return None;
154        }
155        let delay = self.policy.delay_for_attempt(self.attempt);
156        self.attempt += 1;
157        if self.attempt >= self.policy.max_attempts {
158            // We've consumed the last attempt — no more retries.
159            Some(delay)
160        } else {
161            Some(delay)
162        }
163    }
164}