sqlite_graphrag/storage/
utils.rs1use crate::constants::{MAX_SQLITE_BUSY_RETRIES, SQLITE_BUSY_BASE_DELAY_MS};
4use crate::errors::AppError;
5use rusqlite::ErrorCode;
6use std::thread;
7use std::time::Duration;
8
9pub fn is_sqlite_busy(err: &AppError) -> bool {
17 match err {
18 AppError::Database(rusqlite::Error::SqliteFailure(e, _)) => {
19 e.code == ErrorCode::DatabaseBusy || e.code == ErrorCode::DatabaseLocked
20 }
21 _ => false,
22 }
23}
24
25pub fn with_busy_retry<F>(op: F) -> Result<(), AppError>
38where
39 F: Fn() -> Result<(), AppError>,
40{
41 for attempt in 0..MAX_SQLITE_BUSY_RETRIES {
42 match op() {
43 Ok(()) => return Ok(()),
44 Err(e) if is_sqlite_busy(&e) => {
45 let base_ms = SQLITE_BUSY_BASE_DELAY_MS * (1u64 << attempt);
48 let half = base_ms / 2;
49 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
50 let delay_ms = half + jitter;
51 thread::sleep(Duration::from_millis(delay_ms));
52 }
53 Err(other) => return Err(other),
54 }
55 }
56
57 Err(AppError::DbBusy(format!(
59 "SQLITE_BUSY after {MAX_SQLITE_BUSY_RETRIES} retries"
60 )))
61}
62
#[cfg(test)]
mod tests {
    use super::*;
    use std::os::raw::c_int;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Builds an `AppError::Database` wrapping a raw SQLite failure with the
    /// given primary and extended codes and no message.
    fn sqlite_failure(code: ErrorCode, extended_code: c_int) -> AppError {
        let raw = rusqlite::ffi::Error {
            code,
            extended_code,
        };
        AppError::Database(rusqlite::Error::SqliteFailure(raw, None))
    }

    /// Failure carrying `ErrorCode::DatabaseBusy` (extended code 5).
    fn make_busy_error() -> AppError {
        sqlite_failure(ErrorCode::DatabaseBusy, 5)
    }

    /// Failure carrying `ErrorCode::DatabaseLocked` (extended code 6).
    fn make_locked_error() -> AppError {
        sqlite_failure(ErrorCode::DatabaseLocked, 6)
    }

    #[test]
    fn is_sqlite_busy_detects_database_busy() {
        let busy = make_busy_error();
        assert!(is_sqlite_busy(&busy));
    }

    #[test]
    fn is_sqlite_busy_detects_database_locked() {
        let locked = make_locked_error();
        assert!(is_sqlite_busy(&locked));
    }

    #[test]
    fn is_sqlite_busy_rejects_other_errors() {
        let unrelated = AppError::Validation("invalid field".into());
        assert!(!is_sqlite_busy(&unrelated));
    }

    #[test]
    fn with_busy_retry_propagates_non_busy_error() {
        let attempts = AtomicU32::new(0);

        let outcome = with_busy_retry(|| {
            attempts.fetch_add(1, Ordering::SeqCst);
            Err(AppError::Validation("campo x".into()))
        });

        // A non-busy error must stop the loop after the very first call.
        assert_eq!(attempts.load(Ordering::SeqCst), 1);
        assert!(matches!(outcome, Err(AppError::Validation(_))));
    }

    #[test]
    fn with_busy_retry_succeeds_on_third_attempt() {
        let attempts = AtomicU32::new(0);

        // `fetch_add` yields the previous count: fail on calls 0 and 1, then succeed.
        let outcome = with_busy_retry(|| match attempts.fetch_add(1, Ordering::SeqCst) {
            0 | 1 => Err(make_busy_error()),
            _ => Ok(()),
        });

        assert_eq!(attempts.load(Ordering::SeqCst), 3);
        assert!(outcome.is_ok(), "expected Ok after 3rd attempt");
    }

    #[test]
    fn busy_retry_jitter_in_range() {
        // Reproduces the delay formula for attempt index 2 (factor 2^2).
        let base_ms = SQLITE_BUSY_BASE_DELAY_MS * 4;
        let half = base_ms / 2;

        for _ in 0..100 {
            let delay_ms = half + fastrand::u64(0..half);
            assert!(
                (half..base_ms).contains(&delay_ms),
                "delay_ms {delay_ms} out of [{half}, {base_ms})"
            );
        }
    }

    #[test]
    fn with_busy_retry_returns_db_busy_after_all_retries() {
        let attempts = AtomicU32::new(0);

        let outcome = with_busy_retry(|| {
            attempts.fetch_add(1, Ordering::SeqCst);
            Err(make_busy_error())
        });

        assert_eq!(
            attempts.load(Ordering::SeqCst),
            MAX_SQLITE_BUSY_RETRIES,
            "must attempt exactly MAX_SQLITE_BUSY_RETRIES times"
        );
        assert!(
            matches!(outcome, Err(AppError::DbBusy(_))),
            "must convert to DbBusy after exhausting retries"
        );
    }
}