ff_backend_sqlite/
retry.rs1use std::time::Duration;
24
/// Upper bound on how many times [`retry_serializable`] invokes the
/// operation, counting the initial call (so at most `MAX_ATTEMPTS - 1`
/// retries follow a first failure).
pub const MAX_ATTEMPTS: u32 = 3;
29
/// Classifies an error as a transient "busy" failure that is worth retrying.
///
/// [`retry_serializable`] consults this on every failed attempt: a `true`
/// answer leads to another attempt (while attempts remain), a `false` answer
/// makes the error propagate to the caller immediately.
pub trait IsRetryableBusy {
    /// Returns `true` when the operation that produced this error should be
    /// retried.
    fn is_retryable_busy(&self) -> bool;
}
40
/// Classification for `sqlx::Error`, delegated to the crate-level helper
/// `crate::errors::is_retryable_sqlite_busy`.
impl IsRetryableBusy for sqlx::Error {
    /// Forwards to `crate::errors::is_retryable_sqlite_busy`.
    ///
    /// The unit tests in this file expect SQLite result codes 5 and 6 and
    /// their extended variants to be retryable, and other database codes and
    /// non-database errors such as `RowNotFound` not to be.
    fn is_retryable_busy(&self) -> bool {
        crate::errors::is_retryable_sqlite_busy(self)
    }
}
46
47pub async fn retry_serializable<F, Fut, T, E>(mut f: F) -> Result<T, E>
51where
52 F: FnMut() -> Fut,
53 Fut: std::future::Future<Output = Result<T, E>>,
54 E: IsRetryableBusy,
55{
56 let mut last: Option<E> = None;
57 for attempt in 0..MAX_ATTEMPTS {
58 match f().await {
59 Ok(v) => return Ok(v),
60 Err(err) => {
61 if err.is_retryable_busy() {
62 if attempt + 1 < MAX_ATTEMPTS {
63 let ms = 5u64 * (1u64 << attempt);
64 tokio::time::sleep(Duration::from_millis(ms)).await;
65 }
66 last = Some(err);
67 continue;
68 }
69 return Err(err);
70 }
71 }
72 }
73 Err(last.expect("retry loop exited without populating last error"))
76}
77
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::error::{DatabaseError, ErrorKind};
    use std::borrow::Cow;
    use std::error::Error as StdError;
    use std::fmt;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Stand-in database error whose only payload is the result code string
    /// it reports through [`DatabaseError::code`].
    #[derive(Debug)]
    struct MockSqliteError {
        code: String,
    }

    impl fmt::Display for MockSqliteError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let Self { code } = self;
            write!(f, "(mock code: {})", code)
        }
    }

    impl StdError for MockSqliteError {}

    impl DatabaseError for MockSqliteError {
        fn message(&self) -> &str {
            "mock"
        }

        fn kind(&self) -> ErrorKind {
            ErrorKind::Other
        }

        fn code(&self) -> Option<Cow<'_, str>> {
            Some(Cow::from(self.code.as_str()))
        }

        fn as_error(&self) -> &(dyn StdError + Send + Sync + 'static) {
            self
        }

        fn as_error_mut(&mut self) -> &mut (dyn StdError + Send + Sync + 'static) {
            self
        }

        fn into_error(self: Box<Self>) -> Box<dyn StdError + Send + Sync + 'static> {
            self
        }
    }

    /// Wraps a mock carrying `code` in `sqlx::Error::Database`.
    fn db_err(code: &str) -> sqlx::Error {
        let mock = MockSqliteError {
            code: code.to_owned(),
        };
        sqlx::Error::Database(Box::new(mock))
    }

    /// Primary busy/locked codes and their extended variants must all be
    /// classified as retryable.
    #[test]
    fn classifier_matches_busy() {
        // (result code, assertion message shown on failure)
        let retryable = [
            ("5", "SQLITE_BUSY (5)"),
            ("6", "SQLITE_LOCKED (6)"),
            ("261", "SQLITE_BUSY_RECOVERY (261 = 5 | 1<<8)"),
            ("517", "SQLITE_BUSY_SNAPSHOT (517 = 5 | 2<<8)"),
            ("773", "SQLITE_BUSY_TIMEOUT (773 = 5 | 3<<8)"),
            ("262", "SQLITE_LOCKED_SHAREDCACHE (262 = 6 | 1<<8)"),
            ("518", "SQLITE_LOCKED_VTAB (518 = 6 | 2<<8)"),
        ];
        for (code, label) in retryable {
            assert!(db_err(code).is_retryable_busy(), "{}", label);
        }
    }

    /// Result code 11 must not be retried.
    #[test]
    fn classifier_rejects_corrupt() {
        let corrupt = db_err("11");
        assert!(!corrupt.is_retryable_busy());
    }

    /// Result code 13 must not be retried.
    #[test]
    fn classifier_rejects_full() {
        let full = db_err("13");
        assert!(!full.is_retryable_busy());
    }

    /// Result code 21 must not be retried.
    #[test]
    fn classifier_rejects_misuse() {
        let misuse = db_err("21");
        assert!(!misuse.is_retryable_busy());
    }

    /// Errors that are not `sqlx::Error::Database` are never retryable.
    #[test]
    fn classifier_rejects_non_db_error() {
        let not_found = sqlx::Error::RowNotFound;
        assert!(!not_found.is_retryable_busy());
    }

    /// Extended codes outside the busy/locked families must be rejected.
    #[test]
    fn classifier_rejects_extended_non_busy_family() {
        for code in ["266", "2067"] {
            assert!(!db_err(code).is_retryable_busy());
        }
    }

    // The async tests start with tokio's clock paused (`start_paused = true`),
    // so the backoff sleeps elapse in virtual time rather than wall time.

    /// An immediate success is returned after exactly one call.
    #[tokio::test(start_paused = true)]
    async fn retry_succeeds_on_first_try() {
        let invocations = Arc::new(AtomicU32::new(0));
        let shared = Arc::clone(&invocations);
        let outcome: Result<u32, sqlx::Error> = retry_serializable(|| {
            let invocations = Arc::clone(&shared);
            async move {
                invocations.fetch_add(1, Ordering::SeqCst);
                Ok(42)
            }
        })
        .await;
        assert_eq!(outcome.unwrap(), 42);
        assert_eq!(invocations.load(Ordering::SeqCst), 1);
    }

    /// A persistently busy operation is tried `MAX_ATTEMPTS` times and the
    /// busy error is what the caller receives.
    #[tokio::test(start_paused = true)]
    async fn retry_exhausts_after_max_attempts() {
        let invocations = Arc::new(AtomicU32::new(0));
        let shared = Arc::clone(&invocations);
        let outcome: Result<(), sqlx::Error> = retry_serializable(|| {
            let invocations = Arc::clone(&shared);
            async move {
                invocations.fetch_add(1, Ordering::SeqCst);
                Err(db_err("5"))
            }
        })
        .await;
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().is_retryable_busy());
        assert_eq!(invocations.load(Ordering::SeqCst), MAX_ATTEMPTS);
    }

    /// A non-retryable error ends the loop after the first call.
    #[tokio::test(start_paused = true)]
    async fn retry_returns_non_retryable_immediately() {
        let invocations = Arc::new(AtomicU32::new(0));
        let shared = Arc::clone(&invocations);
        let outcome: Result<(), sqlx::Error> = retry_serializable(|| {
            let invocations = Arc::clone(&shared);
            async move {
                invocations.fetch_add(1, Ordering::SeqCst);
                Err(db_err("11"))
            }
        })
        .await;
        assert!(outcome.is_err());
        assert_eq!(invocations.load(Ordering::SeqCst), 1);
    }

    /// One busy failure followed by a success yields the success after two
    /// calls.
    #[tokio::test(start_paused = true)]
    async fn retry_succeeds_on_second_attempt() {
        let invocations = Arc::new(AtomicU32::new(0));
        let shared = Arc::clone(&invocations);
        let outcome: Result<u32, sqlx::Error> = retry_serializable(|| {
            let invocations = Arc::clone(&shared);
            async move {
                // `fetch_add` returns the previous value, so 0 marks the
                // first call.
                let previous = invocations.fetch_add(1, Ordering::SeqCst);
                if previous == 0 {
                    Err(db_err("5"))
                } else {
                    Ok(7)
                }
            }
        })
        .await;
        assert_eq!(outcome.unwrap(), 7);
        assert_eq!(invocations.load(Ordering::SeqCst), 2);
    }

    /// Three busy attempts sleep 5 ms and then 10 ms, with no sleep after the
    /// last attempt.
    #[tokio::test(start_paused = true)]
    async fn retry_backoff_matches_pg_shape() {
        let started = tokio::time::Instant::now();
        let _: Result<(), sqlx::Error> =
            retry_serializable(|| async { Err::<(), _>(db_err("5")) }).await;
        let waited = started.elapsed();
        assert_eq!(
            waited,
            Duration::from_millis(15),
            "expected 5ms + 10ms between three attempts"
        );
    }
}