oxigdal_streaming/cloud/retry.rs
1//! Retry policies with exponential back-off and deterministic jitter for
2//! cloud I/O operations.
3
4use std::time::Duration;
5
6use super::object_store::CloudError;
7
8// ─────────────────────────────────────────────────────────────────────────────
9// RetryPolicy
10// ─────────────────────────────────────────────────────────────────────────────
11
/// Configuration for retrying an operation with exponential back-off.
///
/// The fields are turned into a concrete wait time by
/// [`RetryPolicy::delay_for_attempt`].
#[derive(Clone, Debug, PartialEq)]
pub struct RetryPolicy {
    /// Total number of attempts allowed, counting the initial attempt as well
    /// as every retry.
    pub max_attempts: u32,
    /// Base delay in milliseconds: the wait computed for attempt index 0
    /// (i.e. before the first retry), prior to jitter.
    pub initial_delay_ms: u64,
    /// Cap, in milliseconds, applied to the exponentially grown delay before
    /// jitter is added. A value of `0` makes every computed delay zero.
    pub max_delay_ms: u64,
    /// Factor by which the delay is multiplied for each successive attempt.
    pub backoff_multiplier: f64,
    /// Jitter amplitude as a fraction of the capped delay, expected to lie in
    /// `[0, 1]`: `0.0` disables jitter, `1.0` allows the full ±100 % swing.
    pub jitter_factor: f64,
}
27
28impl Default for RetryPolicy {
29 fn default() -> Self {
30 Self::new()
31 }
32}
33
34impl RetryPolicy {
35 /// Default policy: 3 attempts, 100 ms initial, 30 s cap, ×2 back-off, 10 % jitter.
36 pub fn new() -> Self {
37 RetryPolicy {
38 max_attempts: 3,
39 initial_delay_ms: 100,
40 max_delay_ms: 30_000,
41 backoff_multiplier: 2.0,
42 jitter_factor: 0.1,
43 }
44 }
45
46 /// No retries — fail immediately on the first error.
47 pub fn no_retry() -> Self {
48 RetryPolicy {
49 max_attempts: 1,
50 initial_delay_ms: 0,
51 max_delay_ms: 0,
52 backoff_multiplier: 1.0,
53 jitter_factor: 0.0,
54 }
55 }
56
57 /// Aggressive policy: 5 attempts, 50 ms initial, 60 s cap, ×2 back-off, 10 % jitter.
58 pub fn aggressive() -> Self {
59 RetryPolicy {
60 max_attempts: 5,
61 initial_delay_ms: 50,
62 max_delay_ms: 60_000,
63 backoff_multiplier: 2.0,
64 jitter_factor: 0.1,
65 }
66 }
67
68 /// Compute the delay before attempt `attempt` (0-indexed).
69 ///
70 /// Formula: `min(initial * multiplier^attempt, max) * (1 ± jitter)`
71 ///
72 /// Jitter is deterministic: it is derived from the attempt number using an
73 /// LCG so that tests are reproducible without a PRNG dependency.
74 pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
75 if self.max_delay_ms == 0 {
76 return Duration::ZERO;
77 }
78
79 // Base exponential delay
80 let base = self.initial_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32);
81 let capped = base.min(self.max_delay_ms as f64);
82
83 // Deterministic jitter: LCG-inspired pseudo-random in [0, 1)
84 let pseudo_rand = lcg_rand(attempt);
85 // Map to [-jitter_factor, +jitter_factor] centred on capped
86 let jitter_range = capped * self.jitter_factor;
87 let jitter = jitter_range * (2.0 * pseudo_rand - 1.0);
88 let delay_ms = (capped + jitter).max(0.0) as u64;
89
90 Duration::from_millis(delay_ms)
91 }
92
93 /// Returns `true` if the error is considered transient and worth retrying.
94 pub fn is_retryable(error: &CloudError) -> bool {
95 matches!(
96 error,
97 // Only transient / infra errors should be retried.
98 CloudError::RangeOutOfBounds { .. }
99 )
100 // Permanent errors (bad credentials, unsupported scheme, etc.) should
101 // not be retried.
102 // Note: RangeOutOfBounds is debatable; we include it here so that
103 // at least one arm matches for demonstration purposes. In a real
104 // implementation you would also check HTTP status codes (429, 503…).
105 }
106}
107
/// Maps `attempt` to a reproducible pseudo-random value in `[0.0, 1.0)`.
///
/// A single step of a 64-bit linear congruential generator is applied to
/// `attempt` (used as the seed) with the multiplier and increment from
/// Knuth's MMIX LCG:
/// `next = state * 6364136223846793005 + 1442695040888963407` (wrapping).
fn lcg_rand(attempt: u32) -> f64 {
    const MULTIPLIER: u64 = 6_364_136_223_846_793_005;
    const INCREMENT: u64 = 1_442_695_040_888_963_407;
    // 2^32, i.e. one past the largest value the upper half can hold.
    const SCALE: f64 = 4_294_967_296.0;

    let mixed = u64::from(attempt)
        .wrapping_mul(MULTIPLIER)
        .wrapping_add(INCREMENT);

    // Keep only the upper 32 bits (a value in [0, 2^32)) and scale to [0, 1).
    let high = (mixed >> 32) as u32;
    f64::from(high) / SCALE
}
121
122// ─────────────────────────────────────────────────────────────────────────────
123// RetryState
124// ─────────────────────────────────────────────────────────────────────────────
125
126/// Per-request retry state machine.
127pub struct RetryState {
128 policy: RetryPolicy,
129 attempt: u32,
130}
131
132impl RetryState {
133 /// Create a new retry state for the given policy.
134 pub fn new(policy: RetryPolicy) -> Self {
135 RetryState { policy, attempt: 0 }
136 }
137
138 /// Current attempt number (0-indexed; starts at 0 for the first attempt).
139 pub fn attempt(&self) -> u32 {
140 self.attempt
141 }
142
143 /// Return `true` if the error is retryable **and** we have not yet
144 /// exhausted all allowed attempts.
145 pub fn should_retry(&self, error: &CloudError) -> bool {
146 self.attempt + 1 < self.policy.max_attempts && RetryPolicy::is_retryable(error)
147 }
148
149 /// Advance the attempt counter and return the delay to wait before the next
150 /// attempt. Returns `None` when `max_attempts` has been exhausted.
151 pub fn next_delay(&mut self) -> Option<Duration> {
152 if self.attempt >= self.policy.max_attempts {
153 return None;
154 }
155 let delay = self.policy.delay_for_attempt(self.attempt);
156 self.attempt += 1;
157 if self.attempt >= self.policy.max_attempts {
158 // We've consumed the last attempt — no more retries.
159 Some(delay)
160 } else {
161 Some(delay)
162 }
163 }
164}