Skip to main content

ff_backend_sqlite/
retry.rs

1//! SERIALIZABLE retry helper for SQLite per RFC-023 §4.3.
2//!
3//! Mirrors `ff-backend-postgres::operator::retry_serializable`: up to
4//! [`MAX_ATTEMPTS`] iterations of the closure, re-running when the
5//! returned error classifies as a transient busy-contention fault
6//! (`SQLITE_BUSY` / `SQLITE_BUSY_TIMEOUT` / `SQLITE_LOCKED`). All
7//! non-retryable kinds (`SQLITE_CORRUPT`, `SQLITE_FULL`, misuse, etc.)
8//! propagate immediately.
9//!
10//! Backoff shape matches the PG reference (`5ms * 2^attempt`) — 5ms,
11//! 10ms between the three attempts. Deliberately short: under the
12//! single-writer envelope §4.1 targets, busy contention resolves in
13//! tens of milliseconds at most, and the caller already absorbed the
14//! SQLite driver's per-statement busy-wait budget before the error
15//! surfaced here.
16//!
17//! Wired into Wave-9 SERIALIZABLE ops in Phase 2a.2 / 2a.3:
18//! `cancel_flow`, `cancel_flow_header`, `ack_cancel_member`,
19//! `change_priority`, `replay_execution`, `complete_attempt`,
20//! `fail_attempt`, `deliver_signal`, plus the scanner-supervisor's
21//! budget-reconcile path.
22
23use std::time::Duration;
24
/// Upper bound on how many times [`retry_serializable`] invokes its
/// closure before giving up and returning the last error.
///
/// Kept in step with `ff-backend-postgres::operator::MAX_ATTEMPTS`
/// and `CANCEL_FLOW_MAX_ATTEMPTS` (RFC-023 §4.3 / RFC §4.2 template).
pub const MAX_ATTEMPTS: u32 = 3;
29
/// Classification hook used by the retry helper: an error type
/// implements this to say whether a failure is transient busy
/// contention (and therefore worth re-running) or not.
///
/// An implementation for `sqlx::Error` ships with this module.
///
/// Op-level wrappers that convert `sqlx::Error` into `EngineError`
/// before the retry loop sees it implement this trait on the
/// converted error, so the retry decision keeps a single source of
/// truth.
pub trait IsRetryableBusy {
    /// Returns `true` when the error should trigger another attempt.
    fn is_retryable_busy(&self) -> bool;
}
40
41impl IsRetryableBusy for sqlx::Error {
42    fn is_retryable_busy(&self) -> bool {
43        crate::errors::is_retryable_sqlite_busy(self)
44    }
45}
46
47/// Runs `f` up to [`MAX_ATTEMPTS`] times, retrying when the error
48/// classifies as transient busy contention. Between retries, sleeps
49/// `5ms * 2^attempt` to match the PG reference shape.
50pub async fn retry_serializable<F, Fut, T, E>(mut f: F) -> Result<T, E>
51where
52    F: FnMut() -> Fut,
53    Fut: std::future::Future<Output = Result<T, E>>,
54    E: IsRetryableBusy,
55{
56    let mut last: Option<E> = None;
57    for attempt in 0..MAX_ATTEMPTS {
58        match f().await {
59            Ok(v) => return Ok(v),
60            Err(err) => {
61                if err.is_retryable_busy() {
62                    if attempt + 1 < MAX_ATTEMPTS {
63                        let ms = 5u64 * (1u64 << attempt);
64                        tokio::time::sleep(Duration::from_millis(ms)).await;
65                    }
66                    last = Some(err);
67                    continue;
68                }
69                return Err(err);
70            }
71        }
72    }
73    // Safe: MAX_ATTEMPTS >= 1 and the loop only falls through via
74    // the retryable branch, which always populates `last`.
75    Err(last.expect("retry loop exited without populating last error"))
76}
77
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::error::{DatabaseError, ErrorKind};
    use std::borrow::Cow;
    use std::error::Error as StdError;
    use std::fmt;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Test double implementing sqlx's public `DatabaseError` trait.
    ///
    /// It carries nothing but a result-code string, which is enough to
    /// build a genuine `sqlx::Error::Database` with any SQLite code we
    /// like. sqlx's own `SqliteError::new` is `pub(crate)` and so not
    /// reachable from here; the trait is the public seam.
    #[derive(Debug)]
    struct MockSqliteError {
        code: String,
    }

    impl fmt::Display for MockSqliteError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_fmt(format_args!("(mock code: {})", self.code))
        }
    }

    impl StdError for MockSqliteError {}

    impl DatabaseError for MockSqliteError {
        fn message(&self) -> &str {
            "mock"
        }

        fn code(&self) -> Option<Cow<'_, str>> {
            let borrowed: &str = &self.code;
            Some(Cow::from(borrowed))
        }

        fn kind(&self) -> ErrorKind {
            ErrorKind::Other
        }

        fn as_error(&self) -> &(dyn StdError + Send + Sync + 'static) {
            self
        }

        fn as_error_mut(&mut self) -> &mut (dyn StdError + Send + Sync + 'static) {
            self
        }

        fn into_error(self: Box<Self>) -> Box<dyn StdError + Send + Sync + 'static> {
            self
        }
    }

    /// Builds a `sqlx::Error::Database` whose reported code is `code`.
    fn db_err(code: &str) -> sqlx::Error {
        let mock = MockSqliteError {
            code: code.to_owned(),
        };
        sqlx::Error::Database(Box::new(mock))
    }

    // ── Classifier tests ─────────────────────────────────────────

    #[test]
    fn classifier_matches_busy() {
        // (code, label shown on failure). Primary codes first, then the
        // extended BUSY codes (low 8 bits == 5), then the extended
        // LOCKED codes (low 8 bits == 6).
        let retryable = [
            ("5", "SQLITE_BUSY (5)"),
            ("6", "SQLITE_LOCKED (6)"),
            ("261", "SQLITE_BUSY_RECOVERY (261 = 5 | 1<<8)"),
            ("517", "SQLITE_BUSY_SNAPSHOT (517 = 5 | 2<<8)"),
            ("773", "SQLITE_BUSY_TIMEOUT (773 = 5 | 3<<8)"),
            ("262", "SQLITE_LOCKED_SHAREDCACHE (262 = 6 | 1<<8)"),
            ("518", "SQLITE_LOCKED_VTAB (518 = 6 | 2<<8)"),
        ];
        for (code, label) in retryable {
            assert!(db_err(code).is_retryable_busy(), "{}", label);
        }
    }

    #[test]
    fn classifier_rejects_corrupt() {
        // 11 is SQLITE_CORRUPT.
        let corrupt = db_err("11");
        assert!(!corrupt.is_retryable_busy());
    }

    #[test]
    fn classifier_rejects_full() {
        // 13 is SQLITE_FULL.
        let full = db_err("13");
        assert!(!full.is_retryable_busy());
    }

    #[test]
    fn classifier_rejects_misuse() {
        // 21 is SQLITE_MISUSE.
        let misuse = db_err("21");
        assert!(!misuse.is_retryable_busy());
    }

    #[test]
    fn classifier_rejects_non_db_error() {
        // A variant that carries no database code at all.
        let not_db = sqlx::Error::RowNotFound;
        assert!(!not_db.is_retryable_busy());
    }

    #[test]
    fn classifier_rejects_extended_non_busy_family() {
        // Extended codes from other families must not match: these
        // tests expect a match only when the low 8 bits are 5 or 6,
        // whatever the high byte holds. Guards against mask bugs.
        //
        // SQLITE_IOERR = 10; SQLITE_IOERR_READ = 10 | 1<<8 = 266.
        let ioerr_read = db_err("266");
        assert!(!ioerr_read.is_retryable_busy());
        // SQLITE_CONSTRAINT = 19; SQLITE_CONSTRAINT_UNIQUE = 19 | 8<<8 = 2067.
        let constraint_unique = db_err("2067");
        assert!(!constraint_unique.is_retryable_busy());
    }

    // ── retry_serializable tests ─────────────────────────────────

    #[tokio::test(start_paused = true)]
    async fn retry_succeeds_on_first_try() {
        let invocations = Arc::new(AtomicU32::new(0));
        let tally = Arc::clone(&invocations);
        let outcome: Result<u32, sqlx::Error> = retry_serializable(|| {
            let tally = Arc::clone(&tally);
            async move {
                tally.fetch_add(1, Ordering::SeqCst);
                Ok(42)
            }
        })
        .await;
        assert_eq!(outcome.unwrap(), 42);
        assert_eq!(invocations.load(Ordering::SeqCst), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn retry_exhausts_after_max_attempts() {
        let invocations = Arc::new(AtomicU32::new(0));
        let tally = Arc::clone(&invocations);
        let outcome: Result<(), sqlx::Error> = retry_serializable(|| {
            let tally = Arc::clone(&tally);
            async move {
                tally.fetch_add(1, Ordering::SeqCst);
                // Always SQLITE_BUSY, so every attempt is retryable.
                Err(db_err("5"))
            }
        })
        .await;
        assert!(outcome.is_err());
        // The surfaced error is the retryable one from the last attempt.
        assert!(outcome.unwrap_err().is_retryable_busy());
        assert_eq!(invocations.load(Ordering::SeqCst), MAX_ATTEMPTS);
    }

    #[tokio::test(start_paused = true)]
    async fn retry_returns_non_retryable_immediately() {
        let invocations = Arc::new(AtomicU32::new(0));
        let tally = Arc::clone(&invocations);
        let outcome: Result<(), sqlx::Error> = retry_serializable(|| {
            let tally = Arc::clone(&tally);
            async move {
                tally.fetch_add(1, Ordering::SeqCst);
                // SQLITE_CORRUPT is not busy contention.
                Err(db_err("11"))
            }
        })
        .await;
        assert!(outcome.is_err());
        assert_eq!(invocations.load(Ordering::SeqCst), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn retry_succeeds_on_second_attempt() {
        let invocations = Arc::new(AtomicU32::new(0));
        let tally = Arc::clone(&invocations);
        let outcome: Result<u32, sqlx::Error> = retry_serializable(|| {
            let tally = Arc::clone(&tally);
            async move {
                // `fetch_add` yields the count before this call: busy on
                // the very first invocation, success afterwards.
                match tally.fetch_add(1, Ordering::SeqCst) {
                    0 => Err(db_err("5")),
                    _ => Ok(7),
                }
            }
        })
        .await;
        assert_eq!(outcome.unwrap(), 7);
        assert_eq!(invocations.load(Ordering::SeqCst), 2);
    }

    #[tokio::test(start_paused = true)]
    async fn retry_backoff_matches_pg_shape() {
        // The clock is paused (`start_paused`), so sleeps only move
        // virtual time, which the runtime auto-advances once every task
        // is parked. An exhausted retry therefore accumulates exactly
        // the backoff sleeps: 5ms after the first attempt plus 10ms
        // after the second, and nothing after the last one.
        let began = tokio::time::Instant::now();
        let _: Result<(), sqlx::Error> = retry_serializable(|| async {
            Err::<(), _>(db_err("5"))
        })
        .await;
        assert_eq!(
            began.elapsed(),
            Duration::from_millis(15),
            "expected 5ms + 10ms between three attempts"
        );
    }
}