Skip to main content

sqlite_graphrag/
retry.rs

1//! Centralized retry infrastructure with exponential backoff and half-jitter.
2//!
3//! Provides [`RetryConfig`](crate::retry::RetryConfig) with named constructors for each failure domain
4//! (SQLite BUSY, LLM rate-limit, cold-start) and a [`compute_delay`](crate::retry::compute_delay) function
5//! that applies the configured jitter strategy.
6
7use std::time::{Duration, Instant};
8
/// Retry policy for a single failure domain.
///
/// Prefer the pre-tuned presets ([`Self::sqlite_busy`], [`Self::llm_rate_limit`],
/// [`Self::cold_start`]). Every timing field is expressed in milliseconds, except
/// `max_elapsed_secs`, which is expressed in seconds.
#[derive(Clone, Debug)]
pub struct RetryConfig {
    /// Base delay used for the first retry attempt (ms).
    pub initial_delay_ms: u64,
    /// Ceiling applied to every individual delay (ms).
    pub max_delay_ms: u64,
    /// Growth factor applied once per attempt.
    pub multiplier: u64,
    /// Hard cap on total attempts (0 = unlimited, use deadline).
    pub max_attempts: u32,
    /// Wall-clock budget across all attempts before giving up (seconds).
    pub max_elapsed_secs: u64,
    /// Randomization strategy applied to each computed delay.
    pub jitter: JitterKind,
}

/// How a computed retry delay is randomized.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JitterKind {
    /// Deterministic: the computed delay is used unchanged.
    None,
    /// Half-jitter: delay drawn from `[base/2, base)`, so a minimum wait is kept.
    Half,
    /// Full-jitter: delay drawn from `[0, base)` for the widest spread.
    Full,
}

impl RetryConfig {
    /// SQLite BUSY preset: 5 attempts from a 300 ms base (each delay capped at
    /// 4.8 s), doubling per attempt, half-jitter, 30 s overall deadline.
    pub fn sqlite_busy() -> Self {
        Self {
            jitter: JitterKind::Half,
            multiplier: 2,
            initial_delay_ms: 300,
            max_delay_ms: 300 * 16,
            max_attempts: 5,
            max_elapsed_secs: 30,
        }
    }

    /// LLM rate-limit preset: 60 s base, each delay capped at 15 min, doubling
    /// per attempt, up to 20 attempts, half-jitter, 1 h overall deadline.
    pub fn llm_rate_limit() -> Self {
        Self {
            jitter: JitterKind::Half,
            multiplier: 2,
            initial_delay_ms: 60 * 1_000,
            max_delay_ms: 15 * 60 * 1_000,
            max_attempts: 20,
            max_elapsed_secs: 60 * 60,
        }
    }

    /// Cold-start preset: 2 s base (capped at 4 s), 2 attempts, no jitter,
    /// 30 s overall deadline.
    pub fn cold_start() -> Self {
        Self {
            jitter: JitterKind::None,
            multiplier: 2,
            initial_delay_ms: 2 * 1_000,
            max_delay_ms: 2 * 2_000,
            max_attempts: 2,
            max_elapsed_secs: 30,
        }
    }
}
78
79/// Computes the delay for a given attempt using the config's jitter strategy.
80///
81/// # Formula
82///
83/// ```text
84/// base = min(initial_delay_ms * multiplier^attempt, max_delay_ms)
85/// delay = apply_jitter(base, jitter_kind)
86/// ```
87pub fn compute_delay(config: &RetryConfig, attempt: u32) -> Duration {
88    let base = config
89        .initial_delay_ms
90        .saturating_mul(config.multiplier.saturating_pow(attempt))
91        .min(config.max_delay_ms);
92
93    let delay_ms = match config.jitter {
94        JitterKind::None => base,
95        JitterKind::Half => {
96            let half = base / 2;
97            if half == 0 {
98                base
99            } else {
100                half + fastrand::u64(0..half)
101            }
102        }
103        JitterKind::Full => {
104            if base == 0 {
105                0
106            } else {
107                fastrand::u64(0..base)
108            }
109        }
110    };
111
112    Duration::from_millis(delay_ms)
113}
114
/// Reports whether the `SQLITE_GRAPHRAG_DISABLE_RETRY` kill switch is engaged.
///
/// Only the exact value `1` engages it. Retries stay enabled when the variable
/// is unset, set to any other value (including `true` or ` 1`), or not valid
/// Unicode.
///
/// While engaged, retry loops should propagate the error immediately instead
/// of sleeping. This is intended for use during incidents to stop retry storms.
pub fn is_kill_switch_active() -> bool {
    matches!(
        std::env::var("SQLITE_GRAPHRAG_DISABLE_RETRY").as_deref(),
        Ok("1")
    )
}
122
#[cfg(test)]
mod tests {
    use super::*;

    /// Un-jittered delay for `attempt`: saturating exponential growth from
    /// `initial_delay_ms`, clamped to `max_delay_ms`.
    fn expected_base(cfg: &RetryConfig, attempt: u32) -> u64 {
        let growth = cfg.multiplier.saturating_pow(attempt);
        cfg.initial_delay_ms
            .saturating_mul(growth)
            .min(cfg.max_delay_ms)
    }

    #[test]
    fn compute_delay_half_jitter_in_bounds() {
        let cfg = RetryConfig::llm_rate_limit();
        for attempt in 0..5 {
            let base = u128::from(expected_base(&cfg, attempt));
            let floor = base / 2;
            for _ in 0..100 {
                let ms = compute_delay(&cfg, attempt).as_millis();
                assert!(ms >= floor, "attempt {attempt}: {ms}ms below floor {floor}ms");
                assert!(ms < base, "attempt {attempt}: {ms}ms not below base {base}ms");
            }
        }
    }

    #[test]
    fn compute_delay_no_jitter_is_deterministic() {
        let cfg = RetryConfig::cold_start();
        let first = compute_delay(&cfg, 0);
        let second = compute_delay(&cfg, 0);
        assert_eq!(first, second);
        assert_eq!(first, Duration::from_millis(2000));
    }

    #[test]
    fn kill_switch_inactive_by_default() {
        // Mutates process-wide environment state.
        std::env::remove_var("SQLITE_GRAPHRAG_DISABLE_RETRY");
        assert!(!is_kill_switch_active());
    }

    #[test]
    fn sqlite_busy_config_matches_constants() {
        let RetryConfig {
            initial_delay_ms,
            max_attempts,
            max_elapsed_secs,
            ..
        } = RetryConfig::sqlite_busy();
        assert_eq!(initial_delay_ms, 300);
        assert_eq!(max_attempts, 5);
        assert_eq!(max_elapsed_secs, 30);
    }

    #[test]
    fn llm_rate_limit_has_deadline() {
        let cfg = RetryConfig::llm_rate_limit();
        assert_eq!((cfg.max_elapsed_secs, cfg.max_delay_ms), (3600, 900_000));
    }

    #[test]
    fn full_jitter_stays_below_base() {
        let cfg = RetryConfig {
            jitter: JitterKind::Full,
            initial_delay_ms: 1000,
            max_delay_ms: 10_000,
            multiplier: 2,
            max_attempts: 5,
            max_elapsed_secs: 60,
        };
        for attempt in 0..4 {
            let base = u128::from(expected_base(&cfg, attempt));
            for _ in 0..100 {
                assert!(compute_delay(&cfg, attempt).as_millis() < base);
            }
        }
    }
}
196
197// ---------------------------------------------------------------------------
198// Circuit Breaker (G28-D, v1.0.68)
199// ---------------------------------------------------------------------------
200
/// Classification of one retry attempt, fed into a [`CircuitBreaker`].
///
/// The taxonomy is deliberately small:
/// - transient errors (rate limits, timeouts) never push the breaker toward opening;
/// - successes clear the failure streak;
/// - every other error is a hard failure that counts toward the breaker's threshold.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AttemptOutcome {
    /// Transient error, treated like a successful iteration: never trips the breaker.
    /// Examples: `AppError::RateLimited`, `AppError::Timeout`, `AppError::DbBusy`.
    Transient,
    /// Hard failure, counted toward the breaker's failure threshold.
    /// Examples: `AppError::Validation`, `AppError::Conflict`,
    /// `AppError::Embedding`, `AppError::Internal`.
    HardFailure,
    /// Successful iteration; clears the consecutive-failure counter.
    Success,
}
218
219/// Counts consecutive hard failures and trips open after a threshold.
220///
221/// G28-D (v1.0.68): caps `enrich --retry-failed` and `ingest --retry-failed`
222/// loops so persistent failures (e.g., LLM provider returning the same
223/// 4xx for hours) cannot run unbounded.  After `threshold` consecutive
224/// [`AttemptOutcome::HardFailure`] outcomes, `record` returns `true` and
225/// the caller is expected to abort with `AppError::CircuitBreakerOpen`.
226///
227/// Rate-limited / transient errors are explicitly NOT counted, so a
228/// provider that throttles but eventually recovers will not trip the
229/// breaker.
230#[derive(Debug, Clone)]
231pub struct CircuitBreaker {
232    threshold: u32,
233    cooldown: Duration,
234    consecutive_failures: u32,
235    open_until: Option<Instant>,
236}
237
238impl CircuitBreaker {
239    /// Creates a breaker that opens after `threshold` consecutive hard
240    /// failures and stays open for `cooldown` after the last failure.
241    pub fn new(threshold: u32, cooldown: Duration) -> Self {
242        Self {
243            threshold,
244            cooldown,
245            consecutive_failures: 0,
246            open_until: None,
247        }
248    }
249
250    /// Records one attempt outcome.
251    ///
252    /// Returns `true` when the breaker is now open and the caller must
253    /// abort the job.  Returns `false` when the attempt should continue.
254    pub fn record(&mut self, outcome: AttemptOutcome) -> bool {
255        match outcome {
256            AttemptOutcome::Success | AttemptOutcome::Transient => {
257                self.consecutive_failures = 0;
258                false
259            }
260            AttemptOutcome::HardFailure => {
261                self.consecutive_failures = self.consecutive_failures.saturating_add(1);
262                if self.consecutive_failures >= self.threshold.max(1) {
263                    self.open_until = Some(Instant::now() + self.cooldown);
264                    tracing::error!(
265                        target: "circuit_breaker",
266                        consecutive_failures = self.consecutive_failures,
267                        threshold = self.threshold,
268                        cooldown_secs = self.cooldown.as_secs(),
269                        "circuit breaker opened — aborting job"
270                    );
271                    true
272                } else {
273                    false
274                }
275            }
276        }
277    }
278
279    /// `true` when the breaker is currently open (and not yet cooled down).
280    pub fn is_open(&self) -> bool {
281        self.open_until
282            .map(|deadline| Instant::now() < deadline)
283            .unwrap_or(false)
284    }
285
286    /// Resets the breaker to closed state.
287    pub fn reset(&mut self) {
288        self.consecutive_failures = 0;
289        self.open_until = None;
290    }
291}
292
#[cfg(test)]
mod circuit_breaker_tests {
    use super::*;

    /// Cooldown long enough that no test observes the breaker closing on its own.
    const COOLDOWN: Duration = Duration::from_secs(60);

    #[test]
    fn opens_after_threshold_consecutive_hard_failures() {
        let mut breaker = CircuitBreaker::new(3, COOLDOWN);
        let tripped: Vec<bool> = (0..3)
            .map(|_| breaker.record(AttemptOutcome::HardFailure))
            .collect();
        assert_eq!(tripped, [false, false, true]);
        assert!(breaker.is_open());
    }

    #[test]
    fn ignores_transient_errors() {
        let mut breaker = CircuitBreaker::new(2, COOLDOWN);
        // A long run of transient outcomes must never open the breaker.
        for round in 0..10 {
            let opened = breaker.record(AttemptOutcome::Transient);
            assert!(!opened, "transient outcome #{round} opened the breaker");
        }
        assert!(!breaker.is_open());
    }

    #[test]
    fn success_resets_consecutive_failures() {
        let mut breaker = CircuitBreaker::new(3, COOLDOWN);
        let warmup = [
            AttemptOutcome::HardFailure,
            AttemptOutcome::HardFailure,
            AttemptOutcome::Success,
        ];
        for outcome in warmup {
            breaker.record(outcome);
        }
        assert!(!breaker.record(AttemptOutcome::HardFailure));
        assert!(!breaker.is_open());
    }
}