Skip to main content

sqlite_graphrag/storage/
utils.rs

1//! Storage utility helpers shared across the storage sub-modules.
2
3use crate::constants::{MAX_SQLITE_BUSY_RETRIES, SQLITE_BUSY_BASE_DELAY_MS};
4use crate::errors::AppError;
5use rusqlite::ErrorCode;
6use std::thread;
7use std::time::Duration;
8
9/// Returns `true` when `err` wraps an `SQLITE_BUSY` (or `SQLITE_LOCKED`)
10/// condition reported by rusqlite.
11///
12/// Both `SQLITE_BUSY` (`ErrorCode::DatabaseBusy`) and `SQLITE_LOCKED`
13/// (`ErrorCode::DatabaseLocked`) indicate that the write cannot proceed
14/// immediately due to WAL concurrency.  We treat both as transient and
15/// eligible for retry.
16pub fn is_sqlite_busy(err: &AppError) -> bool {
17    match err {
18        AppError::Database(rusqlite::Error::SqliteFailure(e, _)) => {
19            e.code == ErrorCode::DatabaseBusy || e.code == ErrorCode::DatabaseLocked
20        }
21        _ => false,
22    }
23}
24
25/// Executes `op` up to `MAX_SQLITE_BUSY_RETRIES` times with exponential
26/// backoff whenever the operation fails with `SQLITE_BUSY` / `SQLITE_LOCKED`.
27///
28/// Delay schedule (base = `SQLITE_BUSY_BASE_DELAY_MS`):
29/// - attempt 1 → `base` ms
30/// - attempt 2 → `base * 2` ms
31/// - attempt 3 → `base * 4` ms
32/// - attempt 4 → `base * 8` ms
33/// - attempt 5 → `base * 16` ms
34///
35/// After all retries are exhausted the last `SQLITE_BUSY` error is converted
36/// to [`AppError::DbBusy`] so callers can route on exit-code `15`.
37pub fn with_busy_retry<F>(op: F) -> Result<(), AppError>
38where
39    F: Fn() -> Result<(), AppError>,
40{
41    for attempt in 0..MAX_SQLITE_BUSY_RETRIES {
42        match op() {
43            Ok(()) => return Ok(()),
44            Err(e) if is_sqlite_busy(&e) => {
45                // v1.0.43 (M7): half-jitter to prevent thundering herd when multiple CLIs hit
46                // SQLITE_BUSY simultaneously. Effective delay: [base/2, base).
47                let base_ms = SQLITE_BUSY_BASE_DELAY_MS * (1u64 << attempt);
48                let half = base_ms / 2;
49                let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
50                let delay_ms = half + jitter;
51                thread::sleep(Duration::from_millis(delay_ms));
52            }
53            Err(other) => return Err(other),
54        }
55    }
56
57    // All retries exhausted — convert to DbBusy for stable exit-code 15.
58    Err(AppError::DbBusy(format!(
59        "SQLITE_BUSY after {MAX_SQLITE_BUSY_RETRIES} retries"
60    )))
61}
62
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Builds a fake `AppError::Database` wrapping `SQLITE_BUSY` (result code 5),
    /// so `is_sqlite_busy` can be tested without a live SQLite connection.
    fn make_busy_error() -> AppError {
        // rusqlite::Error::SqliteFailure takes an `ffi::Error` plus an optional
        // message; build the `ffi::Error` directly through its public fields.
        let ffi_err = rusqlite::ffi::Error {
            code: ErrorCode::DatabaseBusy,
            extended_code: 5,
        };
        AppError::Database(rusqlite::Error::SqliteFailure(ffi_err, None))
    }

    /// Builds a fake `AppError::Database` wrapping `SQLITE_LOCKED` (result code 6).
    fn make_locked_error() -> AppError {
        let ffi_err = rusqlite::ffi::Error {
            code: ErrorCode::DatabaseLocked,
            extended_code: 6,
        };
        AppError::Database(rusqlite::Error::SqliteFailure(ffi_err, None))
    }

    /// Builds a fake `AppError::Database` wrapping `SQLITE_CONSTRAINT` (result
    /// code 19). This is an SQLite failure that must NOT be treated as transient.
    fn make_constraint_error() -> AppError {
        let ffi_err = rusqlite::ffi::Error {
            code: ErrorCode::ConstraintViolation,
            extended_code: 19,
        };
        AppError::Database(rusqlite::Error::SqliteFailure(ffi_err, None))
    }

    #[test]
    fn is_sqlite_busy_detects_database_busy() {
        assert!(is_sqlite_busy(&make_busy_error()));
    }

    #[test]
    fn is_sqlite_busy_detects_database_locked() {
        assert!(is_sqlite_busy(&make_locked_error()));
    }

    #[test]
    fn is_sqlite_busy_rejects_other_errors() {
        let err = AppError::Validation("invalid field".into());
        assert!(!is_sqlite_busy(&err));
    }

    #[test]
    fn is_sqlite_busy_rejects_other_sqlite_failures() {
        assert!(!is_sqlite_busy(&make_constraint_error()));
    }

    #[test]
    fn with_busy_retry_propagates_non_busy_error() {
        // The closure only borrows the counter, so no Arc is needed.
        let calls = AtomicU32::new(0);

        let result = with_busy_retry(|| {
            calls.fetch_add(1, Ordering::SeqCst);
            Err(AppError::Validation("campo x".into()))
        });

        // Non-busy errors must propagate immediately without retrying.
        assert_eq!(calls.load(Ordering::SeqCst), 1);
        assert!(matches!(result, Err(AppError::Validation(_))));
    }

    #[test]
    fn with_busy_retry_does_not_retry_non_busy_sqlite_failure() {
        let calls = AtomicU32::new(0);

        let result = with_busy_retry(|| {
            calls.fetch_add(1, Ordering::SeqCst);
            Err(make_constraint_error())
        });

        // A non-transient SQLite failure must be returned as-is on the first call.
        assert_eq!(calls.load(Ordering::SeqCst), 1);
        assert!(matches!(result, Err(AppError::Database(_))));
    }

    #[test]
    fn with_busy_retry_succeeds_on_third_attempt() {
        let calls = AtomicU32::new(0);

        // Fail twice with SQLITE_BUSY, succeed on the third call.
        let result = with_busy_retry(|| {
            let n = calls.fetch_add(1, Ordering::SeqCst);
            if n < 2 {
                Err(make_busy_error())
            } else {
                Ok(())
            }
        });

        assert_eq!(calls.load(Ordering::SeqCst), 3);
        assert!(result.is_ok(), "expected Ok after 3rd attempt");
    }

    #[test]
    fn busy_retry_jitter_in_range() {
        // Re-derives the half-jitter formula used by `with_busy_retry` (this test
        // does not call it). It checks that, for attempt=2, the delay stays within
        // [base/2, base), where base = SQLITE_BUSY_BASE_DELAY_MS * 4.
        let base_ms = SQLITE_BUSY_BASE_DELAY_MS * (1u64 << 2); // attempt=2
        let half = base_ms / 2;
        if half == 0 {
            // `fastrand::u64(0..0)` panics on the empty range. Production code
            // skips jitter in this case, so there is no range to check.
            return;
        }
        for _ in 0..100 {
            let jitter = fastrand::u64(0..half);
            let delay_ms = half + jitter;
            assert!(
                delay_ms >= half && delay_ms < base_ms,
                "delay_ms {delay_ms} out of [{half}, {base_ms})"
            );
        }
    }

    #[test]
    fn with_busy_retry_returns_db_busy_after_all_retries() {
        let calls = AtomicU32::new(0);

        let result = with_busy_retry(|| {
            calls.fetch_add(1, Ordering::SeqCst);
            Err(make_busy_error())
        });

        assert_eq!(
            calls.load(Ordering::SeqCst),
            MAX_SQLITE_BUSY_RETRIES,
            "must attempt exactly MAX_SQLITE_BUSY_RETRIES times"
        );
        assert!(
            matches!(result, Err(AppError::DbBusy(_))),
            "must convert to DbBusy after exhausting retries"
        );
    }
}