Skip to main content

sqlite_graphrag/storage/
utils.rs

1//! Storage utility helpers shared across the storage sub-modules.
2
3use crate::constants::{MAX_SQLITE_BUSY_RETRIES, SQLITE_BUSY_BASE_DELAY_MS};
4use crate::errors::AppError;
5use rusqlite::ErrorCode;
6use std::thread;
7use std::time::Duration;
8
9/// Returns `true` when `err` wraps an `SQLITE_BUSY` (or `SQLITE_LOCKED`)
10/// condition reported by rusqlite.
11///
12/// Both `SQLITE_BUSY` (`ErrorCode::DatabaseBusy`) and `SQLITE_LOCKED`
13/// (`ErrorCode::DatabaseLocked`) indicate that the write cannot proceed
14/// immediately due to WAL concurrency.  We treat both as transient and
15/// eligible for retry.
16pub fn is_sqlite_busy(err: &AppError) -> bool {
17    match err {
18        AppError::Database(rusqlite::Error::SqliteFailure(e, _)) => {
19            e.code == ErrorCode::DatabaseBusy || e.code == ErrorCode::DatabaseLocked
20        }
21        _ => false,
22    }
23}
24
25/// Executes `op` up to `MAX_SQLITE_BUSY_RETRIES` times with exponential
26/// backoff whenever the operation fails with `SQLITE_BUSY` / `SQLITE_LOCKED`.
27///
28/// Delay schedule (base = `SQLITE_BUSY_BASE_DELAY_MS`):
29/// - attempt 1 → `base` ms
30/// - attempt 2 → `base * 2` ms
31/// - attempt 3 → `base * 4` ms
32/// - attempt 4 → `base * 8` ms
33/// - attempt 5 → `base * 16` ms
34///
35/// After all retries are exhausted the last `SQLITE_BUSY` error is converted
36/// to [`AppError::DbBusy`] so callers can route on exit-code `15`.
37pub fn with_busy_retry<F>(op: F) -> Result<(), AppError>
38where
39    F: Fn() -> Result<(), AppError>,
40{
41    for attempt in 0..MAX_SQLITE_BUSY_RETRIES {
42        match op() {
43            Ok(()) => return Ok(()),
44            Err(e) if is_sqlite_busy(&e) => {
45                if crate::retry::is_kill_switch_active() {
46                    tracing::warn!(target: "storage", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, propagating SQLITE_BUSY immediately");
47                    return Err(e);
48                }
49                let base_ms = SQLITE_BUSY_BASE_DELAY_MS * (1u64 << attempt);
50                let half = base_ms / 2;
51                let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
52                let delay_ms = half + jitter;
53                tracing::debug!(
54                    target: "storage",
55                    attempt = attempt + 1,
56                    attempt_max = MAX_SQLITE_BUSY_RETRIES,
57                    delay_ms,
58                    "SQLITE_BUSY retry with half-jitter"
59                );
60                thread::sleep(Duration::from_millis(delay_ms));
61            }
62            Err(other) => return Err(other),
63        }
64    }
65
66    tracing::error!(
67        target: "storage",
68        retries = MAX_SQLITE_BUSY_RETRIES,
69        "SQLITE_BUSY exhausted all retries"
70    );
71    Err(AppError::DbBusy(format!(
72        "SQLITE_BUSY after {MAX_SQLITE_BUSY_RETRIES} retries"
73    )))
74}
75
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Builds a synthetic `AppError::Database` wrapping `SQLITE_BUSY`
    /// (primary result code 5), so `is_sqlite_busy` can be tested without a
    /// live SQLite connection.
    fn make_busy_error() -> AppError {
        // `rusqlite::Error::SqliteFailure` takes an `ffi::Error` plus an
        // optional message; `ffi::Error` is built here from its public fields.
        let ffi_err = rusqlite::ffi::Error {
            code: ErrorCode::DatabaseBusy,
            extended_code: 5,
        };
        AppError::Database(rusqlite::Error::SqliteFailure(ffi_err, None))
    }

    /// Like `make_busy_error`, but for `SQLITE_LOCKED` (primary result code 6).
    fn make_locked_error() -> AppError {
        let ffi_err = rusqlite::ffi::Error {
            code: ErrorCode::DatabaseLocked,
            extended_code: 6,
        };
        AppError::Database(rusqlite::Error::SqliteFailure(ffi_err, None))
    }

    #[test]
    fn is_sqlite_busy_detects_database_busy() {
        assert!(is_sqlite_busy(&make_busy_error()));
    }

    #[test]
    fn is_sqlite_busy_detects_database_locked() {
        assert!(is_sqlite_busy(&make_locked_error()));
    }

    #[test]
    fn is_sqlite_busy_rejects_other_errors() {
        let err = AppError::Validation("invalid field".into());
        assert!(!is_sqlite_busy(&err));
    }

    // NOTE(review): the retry tests below assume the retry kill switch
    // (`crate::retry::is_kill_switch_active`) is off in the test environment.
    // When it is on, `with_busy_retry` returns the first busy error unchanged.

    #[test]
    fn with_busy_retry_propagates_non_busy_error() {
        // The closure only borrows the counter, so no `Arc` is needed.
        let calls = AtomicU32::new(0);

        let result = with_busy_retry(|| {
            calls.fetch_add(1, Ordering::SeqCst);
            Err(AppError::Validation("campo x".into()))
        });

        // Non-busy errors must propagate immediately without retrying.
        assert_eq!(calls.load(Ordering::SeqCst), 1);
        assert!(matches!(result, Err(AppError::Validation(_))));
    }

    #[test]
    fn with_busy_retry_succeeds_on_third_attempt() {
        let calls = AtomicU32::new(0);

        // Fail twice with SQLITE_BUSY, succeed on the third call.
        let result = with_busy_retry(|| {
            let n = calls.fetch_add(1, Ordering::SeqCst);
            if n < 2 {
                Err(make_busy_error())
            } else {
                Ok(())
            }
        });

        assert_eq!(calls.load(Ordering::SeqCst), 3);
        assert!(result.is_ok(), "expected Ok after 3rd attempt");
    }

    #[test]
    fn busy_retry_jitter_in_range() {
        // This re-derives the half-jitter formula used inside
        // `with_busy_retry`. The production delay itself is not observable
        // from here, so keep the two in sync. For every attempt index the
        // delay must lie in `[base_ms / 2, base_ms)`, or be exactly 0 when the
        // half-width rounds down to zero.
        for attempt in 0..MAX_SQLITE_BUSY_RETRIES {
            let base_ms = SQLITE_BUSY_BASE_DELAY_MS * (1u64 << attempt);
            let half = base_ms / 2;
            for _ in 0..100 {
                // Mirror the production guard: `fastrand::u64(0..0)` panics.
                let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
                let delay_ms = half + jitter;
                assert!(
                    delay_ms >= half && (delay_ms < base_ms || half == 0),
                    "delay_ms {delay_ms} out of [{half}, {base_ms})"
                );
            }
        }
    }

    #[test]
    fn with_busy_retry_returns_db_busy_after_all_retries() {
        let calls = AtomicU32::new(0);

        let result = with_busy_retry(|| {
            calls.fetch_add(1, Ordering::SeqCst);
            Err(make_busy_error())
        });

        assert_eq!(
            calls.load(Ordering::SeqCst),
            MAX_SQLITE_BUSY_RETRIES,
            "must attempt exactly MAX_SQLITE_BUSY_RETRIES times"
        );
        assert!(
            matches!(result, Err(AppError::DbBusy(_))),
            "must convert to DbBusy after exhausting retries"
        );
    }
}