use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub struct RetryConfig {
pub initial_delay_ms: u64,
pub max_delay_ms: u64,
pub multiplier: u64,
pub max_attempts: u32,
pub max_elapsed_secs: u64,
pub jitter: JitterKind,
}
/// Strategy for randomising the base backoff delay computed by `compute_delay`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JitterKind {
    /// No randomisation: the base delay is used as-is.
    None,
    /// Randomised within the upper half of the base delay, starting at `base / 2`
    /// and staying below `base` (a base too small to halve is returned unchanged).
    Half,
    /// Randomised uniformly in `[0, base)`; a zero base yields zero.
    Full,
}
impl RetryConfig {
    /// Every built-in preset doubles the delay on each successive attempt.
    const BACKOFF_MULTIPLIER: u64 = 2;

    /// Preset named for SQLite busy/lock contention.
    ///
    /// Starts at 300 ms and doubles per attempt up to a 4.8 s cap, with an attempt
    /// budget of 5, an elapsed budget of 30 s, and half jitter.
    pub fn sqlite_busy() -> Self {
        Self {
            jitter: JitterKind::Half,
            max_elapsed_secs: 30,
            max_attempts: 5,
            multiplier: Self::BACKOFF_MULTIPLIER,
            max_delay_ms: 4_800,
            initial_delay_ms: 300,
        }
    }

    /// Preset named for LLM rate-limit responses.
    ///
    /// Starts at 60 s and doubles per attempt up to a 15 min cap, with an attempt
    /// budget of 20, an elapsed budget of 1 h, and half jitter.
    pub fn llm_rate_limit() -> Self {
        Self {
            jitter: JitterKind::Half,
            max_elapsed_secs: 3_600,
            max_attempts: 20,
            multiplier: Self::BACKOFF_MULTIPLIER,
            max_delay_ms: 900_000,
            initial_delay_ms: 60_000,
        }
    }

    /// Preset named for cold-start retries.
    ///
    /// Starts at 2 s and doubles per attempt up to a 4 s cap, with an attempt budget
    /// of 2, an elapsed budget of 30 s, and no jitter (fully deterministic delays).
    pub fn cold_start() -> Self {
        Self {
            jitter: JitterKind::None,
            max_elapsed_secs: 30,
            max_attempts: 2,
            multiplier: Self::BACKOFF_MULTIPLIER,
            max_delay_ms: 4_000,
            initial_delay_ms: 2_000,
        }
    }
}
/// Computes how long to wait before retry number `attempt` (0-based).
///
/// The un-jittered base is `initial_delay_ms * multiplier^attempt`, computed with
/// saturating arithmetic and then capped at `max_delay_ms`. Jitter is applied on top:
///
/// * [`JitterKind::None`] — exactly the base.
/// * [`JitterKind::Half`] — uniformly in `[base / 2, base)`; when `base / 2` is zero
///   (a base of 0 or 1 ms) the base is returned unchanged.
/// * [`JitterKind::Full`] — uniformly in `[0, base)`, or 0 when the base is 0.
pub fn compute_delay(config: &RetryConfig, attempt: u32) -> Duration {
    let base = config
        .initial_delay_ms
        .saturating_mul(config.multiplier.saturating_pow(attempt))
        .min(config.max_delay_ms);
    let delay_ms = match config.jitter {
        JitterKind::None => base,
        JitterKind::Half => {
            let half = base / 2;
            if half == 0 {
                base
            } else {
                // Draw from the remaining `base - half` span so odd bases still cover
                // the whole upper half. Drawing from `0..half` would cap an odd base at
                // `base - 2` and, for base = 3, collapse the jitter to a constant.
                // `base - half >= half > 0`, so the range is never empty.
                half + fastrand::u64(0..base - half)
            }
        }
        JitterKind::Full => {
            // Avoid drawing from the empty range `0..0`.
            if base == 0 {
                0
            } else {
                fastrand::u64(0..base)
            }
        }
    };
    Duration::from_millis(delay_ms)
}
/// Reports whether retries are globally disabled through the environment.
///
/// Only the exact value `"1"` in `SQLITE_GRAPHRAG_DISABLE_RETRY` activates the switch;
/// an unset variable, a non-UTF-8 value, or any other string leaves retries enabled.
pub fn is_kill_switch_active() -> bool {
    match std::env::var("SQLITE_GRAPHRAG_DISABLE_RETRY") {
        Ok(value) => value == "1",
        Err(_) => false,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Recomputes the un-jittered, capped base delay the same way `compute_delay` does.
    fn expected_base_ms(cfg: &RetryConfig, attempt: u32) -> u64 {
        let growth = cfg.multiplier.saturating_pow(attempt);
        cfg.initial_delay_ms
            .saturating_mul(growth)
            .min(cfg.max_delay_ms)
    }

    #[test]
    fn compute_delay_half_jitter_in_bounds() {
        let cfg = RetryConfig::llm_rate_limit();
        for attempt in 0..5 {
            let base = u128::from(expected_base_ms(&cfg, attempt));
            let floor = base / 2;
            for _ in 0..100 {
                let millis = compute_delay(&cfg, attempt).as_millis();
                assert!(millis >= floor);
                assert!(millis < base);
            }
        }
    }

    #[test]
    fn compute_delay_no_jitter_is_deterministic() {
        let cfg = RetryConfig::cold_start();
        let first = compute_delay(&cfg, 0);
        let second = compute_delay(&cfg, 0);
        assert_eq!(first, second);
        assert_eq!(first, Duration::from_millis(2000));
    }

    #[test]
    fn kill_switch_inactive_by_default() {
        let var = "SQLITE_GRAPHRAG_DISABLE_RETRY";
        std::env::remove_var(var);
        assert!(!is_kill_switch_active());
    }

    #[test]
    fn sqlite_busy_config_matches_constants() {
        let RetryConfig {
            initial_delay_ms,
            max_attempts,
            max_elapsed_secs,
            ..
        } = RetryConfig::sqlite_busy();
        assert_eq!(initial_delay_ms, 300);
        assert_eq!(max_attempts, 5);
        assert_eq!(max_elapsed_secs, 30);
    }

    #[test]
    fn llm_rate_limit_has_deadline() {
        let RetryConfig {
            max_elapsed_secs,
            max_delay_ms,
            ..
        } = RetryConfig::llm_rate_limit();
        assert_eq!(max_elapsed_secs, 3600);
        assert_eq!(max_delay_ms, 900_000);
    }

    #[test]
    fn full_jitter_stays_below_base() {
        let cfg = RetryConfig {
            jitter: JitterKind::Full,
            max_elapsed_secs: 60,
            max_attempts: 5,
            multiplier: 2,
            max_delay_ms: 10_000,
            initial_delay_ms: 1000,
        };
        for attempt in 0..4 {
            let base = u128::from(expected_base_ms(&cfg, attempt));
            for _ in 0..100 {
                assert!(compute_delay(&cfg, attempt).as_millis() < base);
            }
        }
    }
}
/// Classification of a single attempt's result, fed to `CircuitBreaker::record`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AttemptOutcome {
    /// Failure the caller considers transient (presumably retryable); it resets the
    /// breaker's consecutive hard-failure count rather than advancing it.
    Transient,
    /// Failure that advances the breaker's consecutive hard-failure count.
    HardFailure,
    /// Successful attempt; resets the consecutive hard-failure count.
    Success,
}
/// Trips after a run of consecutive hard failures and then reports open for a cooldown.
#[derive(Debug, Clone)]
pub struct CircuitBreaker {
    /// Consecutive hard failures required to open the breaker; `record` treats 0 as 1.
    threshold: u32,
    /// How long the breaker reports open after tripping.
    cooldown: Duration,
    /// Hard failures seen since the last `Success`, `Transient`, or `reset`.
    consecutive_failures: u32,
    /// Deadline until which `is_open` returns `true`; `None` before the first trip and
    /// after `reset`.
    open_until: Option<Instant>,
}
impl CircuitBreaker {
    /// Creates a closed breaker.
    ///
    /// The breaker opens after `threshold` consecutive [`AttemptOutcome::HardFailure`]s
    /// (a `threshold` of 0 behaves like 1) and then reports open for `cooldown`. Any
    /// `cooldown` is accepted, including `Duration::MAX`.
    pub fn new(threshold: u32, cooldown: Duration) -> Self {
        Self {
            threshold,
            cooldown,
            consecutive_failures: 0,
            open_until: None,
        }
    }

    /// Records the outcome of one attempt and returns `true` if this call (re)opened the
    /// breaker.
    ///
    /// * `Success` / `Transient` reset the consecutive-failure count and return `false`.
    ///   They do not close a breaker that is already open; it stays open until its
    ///   deadline passes or [`reset`](Self::reset) is called.
    /// * `HardFailure` increments the count. Once the count reaches the threshold, the
    ///   breaker opens for `cooldown` measured from now, an error is logged, and `true` is
    ///   returned. The count is not cleared on opening, so every further `HardFailure`
    ///   (including the first one after the cooldown expires) re-opens it.
    pub fn record(&mut self, outcome: AttemptOutcome) -> bool {
        match outcome {
            AttemptOutcome::Success | AttemptOutcome::Transient => {
                self.consecutive_failures = 0;
                false
            }
            AttemptOutcome::HardFailure => {
                self.consecutive_failures = self.consecutive_failures.saturating_add(1);
                if self.consecutive_failures >= self.threshold.max(1) {
                    // `Instant + Duration` panics on overflow, so a huge cooldown (e.g.
                    // `Duration::MAX` meaning "until reset") must saturate instead.
                    self.open_until = Some(Self::deadline_after(Instant::now(), self.cooldown));
                    tracing::error!(
                        target: "circuit_breaker",
                        consecutive_failures = self.consecutive_failures,
                        threshold = self.threshold,
                        cooldown_secs = self.cooldown.as_secs(),
                        "circuit breaker opened — aborting job"
                    );
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Returns `true` while the deadline set by the most recent opening is in the future.
    pub fn is_open(&self) -> bool {
        self.open_until
            .is_some_and(|deadline| Instant::now() < deadline)
    }

    /// Closes the breaker and clears the consecutive-failure count.
    pub fn reset(&mut self) {
        self.consecutive_failures = 0;
        self.open_until = None;
    }

    /// Returns `now + span`, clamped to the latest `Instant` reachable from `now` instead
    /// of panicking when the addition overflows.
    fn deadline_after(now: Instant, span: Duration) -> Instant {
        if let Some(deadline) = now.checked_add(span) {
            return deadline;
        }
        // Overflow path only: greedily add the largest step that still fits, halving the
        // step whenever it would overflow. The loop ends once the step halves to zero.
        let mut deadline = now;
        let mut step = span;
        while !step.is_zero() {
            match deadline.checked_add(step) {
                Some(next) => deadline = next,
                None => step /= 2,
            }
        }
        deadline
    }
}
#[cfg(test)]
mod circuit_breaker_tests {
    use super::*;

    /// Cooldown shared by every test; long enough that it never expires mid-test.
    const MINUTE: Duration = Duration::from_secs(60);

    #[test]
    fn opens_after_threshold_consecutive_hard_failures() {
        let mut cb = CircuitBreaker::new(3, MINUTE);
        let opened: Vec<bool> = (0..3)
            .map(|_| cb.record(AttemptOutcome::HardFailure))
            .collect();
        assert_eq!(opened, [false, false, true]);
        assert!(cb.is_open());
    }

    #[test]
    fn ignores_transient_errors() {
        let mut cb = CircuitBreaker::new(2, MINUTE);
        let never_opened = (0..10).all(|_| !cb.record(AttemptOutcome::Transient));
        assert!(never_opened);
        assert!(!cb.is_open());
    }

    #[test]
    fn success_resets_consecutive_failures() {
        let mut cb = CircuitBreaker::new(3, MINUTE);
        for outcome in [
            AttemptOutcome::HardFailure,
            AttemptOutcome::HardFailure,
            AttemptOutcome::Success,
        ] {
            cb.record(outcome);
        }
        assert!(!cb.record(AttemptOutcome::HardFailure));
        assert!(!cb.is_open());
    }
}