sqlite_graphrag/storage/
utils.rs1use crate::constants::{MAX_SQLITE_BUSY_RETRIES, SQLITE_BUSY_BASE_DELAY_MS};
4use crate::errors::AppError;
5use rusqlite::ErrorCode;
6use std::thread;
7use std::time::Duration;
8
9pub fn is_sqlite_busy(err: &AppError) -> bool {
17 match err {
18 AppError::Database(rusqlite::Error::SqliteFailure(e, _)) => {
19 e.code == ErrorCode::DatabaseBusy || e.code == ErrorCode::DatabaseLocked
20 }
21 _ => false,
22 }
23}
24
25pub fn with_busy_retry<F>(op: F) -> Result<(), AppError>
38where
39 F: Fn() -> Result<(), AppError>,
40{
41 for attempt in 0..MAX_SQLITE_BUSY_RETRIES {
42 match op() {
43 Ok(()) => return Ok(()),
44 Err(e) if is_sqlite_busy(&e) => {
45 if crate::retry::is_kill_switch_active() {
46 tracing::warn!(target: "storage", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, propagating SQLITE_BUSY immediately");
47 return Err(e);
48 }
49 let base_ms = SQLITE_BUSY_BASE_DELAY_MS * (1u64 << attempt);
50 let half = base_ms / 2;
51 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
52 let delay_ms = half + jitter;
53 tracing::debug!(
54 target: "storage",
55 attempt = attempt + 1,
56 attempt_max = MAX_SQLITE_BUSY_RETRIES,
57 delay_ms,
58 "SQLITE_BUSY retry with half-jitter"
59 );
60 thread::sleep(Duration::from_millis(delay_ms));
61 }
62 Err(other) => return Err(other),
63 }
64 }
65
66 tracing::error!(
67 target: "storage",
68 retries = MAX_SQLITE_BUSY_RETRIES,
69 "SQLITE_BUSY exhausted all retries"
70 );
71 Err(AppError::DbBusy(format!(
72 "SQLITE_BUSY after {MAX_SQLITE_BUSY_RETRIES} retries"
73 )))
74}
75
76#[cfg(test)]
77mod tests {
78 use super::*;
79 use std::sync::atomic::{AtomicU32, Ordering};
80 use std::sync::Arc;
81
82 fn make_busy_error() -> AppError {
86 let ffi_err = rusqlite::ffi::Error {
89 code: ErrorCode::DatabaseBusy,
90 extended_code: 5,
91 };
92 AppError::Database(rusqlite::Error::SqliteFailure(ffi_err, None))
93 }
94
95 fn make_locked_error() -> AppError {
96 let ffi_err = rusqlite::ffi::Error {
97 code: ErrorCode::DatabaseLocked,
98 extended_code: 6,
99 };
100 AppError::Database(rusqlite::Error::SqliteFailure(ffi_err, None))
101 }
102
103 #[test]
104 fn is_sqlite_busy_detects_database_busy() {
105 assert!(is_sqlite_busy(&make_busy_error()));
106 }
107
108 #[test]
109 fn is_sqlite_busy_detects_database_locked() {
110 assert!(is_sqlite_busy(&make_locked_error()));
111 }
112
113 #[test]
114 fn is_sqlite_busy_rejects_other_errors() {
115 let err = AppError::Validation("invalid field".into());
116 assert!(!is_sqlite_busy(&err));
117 }
118
119 #[test]
120 fn with_busy_retry_propagates_non_busy_error() {
121 let calls = Arc::new(AtomicU32::new(0));
122 let calls_clone = Arc::clone(&calls);
123
124 let result = with_busy_retry(|| {
125 calls_clone.fetch_add(1, Ordering::SeqCst);
126 Err(AppError::Validation("campo x".into()))
127 });
128
129 assert_eq!(calls.load(Ordering::SeqCst), 1);
131 assert!(matches!(result, Err(AppError::Validation(_))));
132 }
133
134 #[test]
135 fn with_busy_retry_succeeds_on_third_attempt() {
136 let calls = Arc::new(AtomicU32::new(0));
137 let calls_clone = Arc::clone(&calls);
138
139 let result = with_busy_retry(|| {
141 let n = calls_clone.fetch_add(1, Ordering::SeqCst);
142 if n < 2 {
143 Err(make_busy_error())
144 } else {
145 Ok(())
146 }
147 });
148
149 assert_eq!(calls.load(Ordering::SeqCst), 3);
150 assert!(result.is_ok(), "expected Ok after 3rd attempt");
151 }
152
153 #[test]
154 fn busy_retry_jitter_in_range() {
155 let base_ms = SQLITE_BUSY_BASE_DELAY_MS * (1u64 << 2); let half = base_ms / 2;
161 for _ in 0..100 {
162 let jitter = fastrand::u64(0..half);
163 let delay_ms = half + jitter;
164 assert!(
165 delay_ms >= half && delay_ms < base_ms,
166 "delay_ms {delay_ms} out of [{half}, {base_ms})"
167 );
168 }
169 }
170
171 #[test]
172 fn with_busy_retry_returns_db_busy_after_all_retries() {
173 let calls = Arc::new(AtomicU32::new(0));
174 let calls_clone = Arc::clone(&calls);
175
176 let result = with_busy_retry(|| {
177 calls_clone.fetch_add(1, Ordering::SeqCst);
178 Err(make_busy_error())
179 });
180
181 assert_eq!(
182 calls.load(Ordering::SeqCst),
183 MAX_SQLITE_BUSY_RETRIES,
184 "must attempt exactly MAX_SQLITE_BUSY_RETRIES times"
185 );
186 assert!(
187 matches!(result, Err(AppError::DbBusy(_))),
188 "must convert to DbBusy after exhausting retries"
189 );
190 }
191}