Skip to main content

vdsl_sync/infra/sqlite/
mod.rs

1//! SQLite implementation of file/transfer/remote stores.
2//!
3//! Uses normalized schema: `tracked_files` + `transfers` + `sync_remotes`.
4//! Designed for single-writer (sync engine), concurrent readers OK.
5//!
6//! Uses `tokio-rusqlite` for non-blocking async access — each connection
7//! runs on a dedicated background thread with mpsc channel dispatch.
8
9mod location_file_store_impl;
10mod mapping;
11mod schema;
12mod task_store_impl;
13mod topology_file_store_impl;
14mod transfer_store_impl;
15
16use std::path::Path;
17
18use crate::infra::error::InfraError;
19
20/// SQLite-backed sync store.
21///
22/// Uses `tokio_rusqlite::Connection` — a handle that dispatches closures
23/// to a dedicated background thread via mpsc channel. Does not block
24/// the async runtime.
25pub struct SqliteSyncStore {
26    conn: tokio_rusqlite::Connection,
27}
28
29impl SqliteSyncStore {
30    /// Open (or create) a sync database at the given path.
31    pub async fn open(path: &Path) -> Result<Self, InfraError> {
32        let path = path.to_path_buf();
33        let conn =
34            tokio_rusqlite::Connection::open(&path)
35                .await
36                .map_err(|e| InfraError::Store {
37                    op: "sqlite",
38                    reason: format!("open failed: {e}"),
39                })?;
40        conn.call(schema::init_connection)
41            .await
42            .map_err(map_call_err)?;
43        Ok(Self { conn })
44    }
45
46    /// Open an in-memory database (for testing).
47    pub async fn open_in_memory() -> Result<Self, InfraError> {
48        let conn = tokio_rusqlite::Connection::open_in_memory()
49            .await
50            .map_err(|e| InfraError::Store {
51                op: "sqlite",
52                reason: format!("open_in_memory failed: {e}"),
53            })?;
54        conn.call(schema::init_connection)
55            .await
56            .map_err(map_call_err)?;
57        Ok(Self { conn })
58    }
59}
60
61// =============================================================================
62// Error mapping
63// =============================================================================
64
65/// Convert `tokio_rusqlite::Error<InfraError>` → `InfraError`.
66fn map_call_err(e: tokio_rusqlite::Error<InfraError>) -> InfraError {
67    match e {
68        tokio_rusqlite::Error::Error(infra_err) => infra_err,
69        tokio_rusqlite::Error::ConnectionClosed => InfraError::Store {
70            op: "sqlite",
71            reason: "sqlite connection closed".into(),
72        },
73        tokio_rusqlite::Error::Close((_, e)) => InfraError::Store {
74            op: "sqlite",
75            reason: format!("sqlite close error: {e}"),
76        },
77        other => InfraError::Store {
78            op: "sqlite",
79            reason: format!("tokio-rusqlite: {other:?}"),
80        },
81    }
82}
83
84// =============================================================================
85// Tests
86// =============================================================================
87
88#[cfg(test)]
89mod tests {
90    use super::*;
91
92    use rusqlite::params;
93
94    use crate::domain::file_type::FileType;
95    use crate::domain::location::LocationId;
96    use crate::domain::topology_file::TopologyFile;
97    use crate::domain::transfer::Transfer;
98    use crate::infra::topology_file_store::TopologyFileStore;
99    use crate::infra::transfer_store::TransferStore;
100
101    fn loc(s: &str) -> LocationId {
102        LocationId::new(s).expect("valid test location")
103    }
104
105    /// Create a test TopologyFile and insert it into the store.
106    /// Returns the TopologyFile (for use as FK target in transfers).
107    async fn insert_test_topology_file(store: &SqliteSyncStore, path: &str) -> TopologyFile {
108        let tf =
109            TopologyFile::new(path.to_string(), FileType::Image).expect("valid test topology file");
110        TopologyFileStore::upsert(&*store, &tf)
111            .await
112            .expect("insert topology file");
113        tf
114    }
115
116    // =========================================================================
117    // TransferStore tests
118    // =========================================================================
119
120    #[tokio::test]
121    async fn insert_and_query_transfer() {
122        let store = SqliteSyncStore::open_in_memory().await.expect("open");
123        let file = insert_test_topology_file(&store, "output/t.png").await;
124
125        let transfer =
126            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
127        store
128            .insert_transfer(&transfer)
129            .await
130            .expect("insert transfer");
131
132        let queued = store.queued_transfers(&loc("cloud")).await.expect("queued");
133        assert_eq!(queued.len(), 1);
134        assert_eq!(queued[0].file_id(), file.id());
135        assert_eq!(queued[0].dest(), &loc("cloud"));
136    }
137
138    #[tokio::test]
139    async fn update_transfer_state() {
140        let store = SqliteSyncStore::open_in_memory().await.expect("open");
141        let file = insert_test_topology_file(&store, "output/s.png").await;
142
143        let mut transfer =
144            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
145        store
146            .insert_transfer(&transfer)
147            .await
148            .expect("insert transfer");
149
150        transfer.start().expect("start");
151        store
152            .update_transfer(&transfer)
153            .await
154            .expect("update transfer");
155
156        let queued = store.queued_transfers(&loc("cloud")).await.expect("queued");
157        assert_eq!(queued.len(), 0);
158
159        transfer.complete().expect("complete");
160        store
161            .update_transfer(&transfer)
162            .await
163            .expect("update transfer");
164
165        let latest = store
166            .latest_transfers_by_file(file.id())
167            .await
168            .expect("latest");
169        assert_eq!(latest.len(), 1);
170        assert_eq!(
171            latest[0].state(),
172            crate::domain::transfer::TransferState::Completed
173        );
174    }
175
176    #[tokio::test]
177    async fn failed_transfers_query() {
178        let store = SqliteSyncStore::open_in_memory().await.expect("open");
179        let file = insert_test_topology_file(&store, "output/f.png").await;
180
181        let mut transfer =
182            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
183        transfer.start().expect("start");
184        transfer
185            .fail(
186                "timeout".into(),
187                crate::domain::retry::TransferErrorKind::Transient,
188            )
189            .expect("fail");
190        store
191            .insert_transfer(&transfer)
192            .await
193            .expect("insert transfer");
194
195        let failed = store.failed_transfers().await.expect("failed");
196        assert_eq!(failed.len(), 1);
197        assert_eq!(failed[0].error(), Some("timeout"));
198        assert_eq!(
199            failed[0].error_kind(),
200            Some(crate::domain::retry::TransferErrorKind::Transient)
201        );
202    }
203
204    #[tokio::test]
205    async fn failed_transfers_excludes_retried() {
206        let store = SqliteSyncStore::open_in_memory().await.expect("open");
207        let file = insert_test_topology_file(&store, "output/retry.png").await;
208
209        // T1: Failed (attempt=1)
210        let mut t1 =
211            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
212        t1.start().expect("start");
213        t1.fail(
214            "net error".into(),
215            crate::domain::retry::TransferErrorKind::Transient,
216        )
217        .expect("fail");
218        store.insert_transfer(&t1).await.expect("insert t1");
219
220        // T2: retry of T1 → Queued (attempt=2), then fails again
221        let mut t2 = t1.retry().expect("retry");
222        t2.start().expect("start");
223        t2.fail(
224            "net error again".into(),
225            crate::domain::retry::TransferErrorKind::Transient,
226        )
227        .expect("fail");
228        store.insert_transfer(&t2).await.expect("insert t2");
229
230        // failed_transfers should return only T2 (latest), not T1
231        let failed = store.failed_transfers().await.expect("failed");
232        assert_eq!(
233            failed.len(),
234            1,
235            "should return only the latest failed transfer"
236        );
237        assert_eq!(failed[0].error(), Some("net error again"));
238        assert_eq!(failed[0].attempt(), 2);
239    }
240
241    #[tokio::test]
242    async fn latest_transfers_by_file_returns_latest_per_dest() {
243        let store = SqliteSyncStore::open_in_memory().await.expect("open");
244        let file = insert_test_topology_file(&store, "output/r.png").await;
245
246        let mut t1 =
247            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
248        t1.start().expect("start");
249        t1.fail(
250            "err".into(),
251            crate::domain::retry::TransferErrorKind::Transient,
252        )
253        .expect("fail");
254        store.insert_transfer(&t1).await.expect("insert t1");
255
256        let t2 = t1.retry().expect("retry");
257        store.insert_transfer(&t2).await.expect("insert t2");
258
259        let mut t3 = Transfer::new(file.id().to_string(), loc("local"), loc("pod")).expect("valid");
260        t3.start().expect("start");
261        t3.complete().expect("complete");
262        store.insert_transfer(&t3).await.expect("insert t3");
263
264        let latest = store
265            .latest_transfers_by_file(file.id())
266            .await
267            .expect("latest");
268        assert_eq!(latest.len(), 2);
269
270        let cloud = latest
271            .iter()
272            .find(|t| t.dest() == &loc("cloud"))
273            .expect("cloud");
274        assert_eq!(
275            cloud.state(),
276            crate::domain::transfer::TransferState::Queued
277        );
278        assert_eq!(cloud.attempt(), 2);
279
280        let pod = latest
281            .iter()
282            .find(|t| t.dest() == &loc("pod"))
283            .expect("pod");
284        assert_eq!(
285            pod.state(),
286            crate::domain::transfer::TransferState::Completed
287        );
288    }
289
290    #[tokio::test]
291    async fn queued_returns_only_latest_per_file_dest() {
292        let store = SqliteSyncStore::open_in_memory().await.expect("open");
293        let file = insert_test_topology_file(&store, "output/q.png").await;
294
295        let mut t1 =
296            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
297        t1.start().expect("start");
298        t1.fail(
299            "err".into(),
300            crate::domain::retry::TransferErrorKind::Transient,
301        )
302        .expect("fail");
303        store.insert_transfer(&t1).await.expect("insert t1");
304
305        let t2 = t1.retry().expect("retry");
306        store.insert_transfer(&t2).await.expect("insert t2");
307
308        let queued = store.queued_transfers(&loc("cloud")).await.expect("queued");
309        assert_eq!(queued.len(), 1);
310        assert_eq!(queued[0].attempt(), 2);
311    }
312
313    // =========================================================================
314    // unblock_dependents tests
315    // =========================================================================
316
317    #[tokio::test]
318    async fn unblock_dependents_transitions_blocked_to_queued() {
319        use crate::domain::transfer::TransferKind;
320
321        let store = SqliteSyncStore::open_in_memory().await.expect("open");
322        let file = insert_test_topology_file(&store, "output/chain.png").await;
323
324        // T1: local→cloud (Queued — 先行transfer)
325        let t1 =
326            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
327        let t1_id = t1.id().to_string();
328        store.insert_transfer(&t1).await.expect("insert t1");
329
330        // T2: cloud→pod (Blocked, depends_on=T1)
331        let t2 = Transfer::with_dependency(
332            file.id().to_string(),
333            loc("cloud"),
334            loc("pod"),
335            TransferKind::Sync,
336            t1_id.clone(),
337        )
338        .expect("valid t2");
339        let t2_id = t2.id().to_string();
340        store.insert_transfer(&t2).await.expect("insert t2");
341
342        // Before unblock: T2 should NOT appear in queued_transfers
343        let queued_before = store.queued_transfers(&loc("pod")).await.expect("queued");
344        assert_eq!(
345            queued_before.len(),
346            0,
347            "blocked transfer must not appear in queued"
348        );
349
350        // Simulate T1 completion → unblock dependents
351        let unblocked = store.unblock_dependents(&t1_id).await.expect("unblock");
352        assert_eq!(unblocked, 1, "exactly one transfer should be unblocked");
353
354        // After unblock: T2 should appear in queued_transfers
355        let queued_after = store.queued_transfers(&loc("pod")).await.expect("queued");
356        assert_eq!(
357            queued_after.len(),
358            1,
359            "unblocked transfer must appear in queued"
360        );
361        assert_eq!(queued_after[0].id(), t2_id);
362    }
363
364    #[tokio::test]
365    async fn unblock_dependents_ignores_non_blocked_state() {
366        use crate::domain::transfer::TransferKind;
367
368        let store = SqliteSyncStore::open_in_memory().await.expect("open");
369        let file = insert_test_topology_file(&store, "output/nonblock.png").await;
370
371        // T1: local→cloud (Queued)
372        let t1 =
373            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
374        let t1_id = t1.id().to_string();
375        store.insert_transfer(&t1).await.expect("insert t1");
376
377        // T2: depends on T1, but manually set to in_flight (not blocked)
378        let t2 = Transfer::with_dependency(
379            file.id().to_string(),
380            loc("cloud"),
381            loc("pod"),
382            TransferKind::Sync,
383            t1_id.clone(),
384        )
385        .expect("valid t2");
386        // with_dependency creates Blocked. Insert as-is, then manually
387        // update via SQL to simulate a non-blocked state (race condition).
388        store.insert_transfer(&t2).await.expect("insert t2");
389
390        // Manually update T2 to in_flight via SQL (simulating a race)
391        let t2_id_clone = t2.id().to_string();
392        store
393            .conn
394            .call(move |conn| {
395                conn.execute(
396                    "UPDATE transfers SET state = 'in_flight' WHERE id = ?",
397                    params![t2_id_clone],
398                )
399                .map_err(|e| InfraError::Store {
400                    op: "sqlite",
401                    reason: format!("{e}"),
402                })
403            })
404            .await
405            .expect("manual update");
406
407        // unblock should NOT touch in_flight transfers
408        let unblocked = store.unblock_dependents(&t1_id).await.expect("unblock");
409        assert_eq!(unblocked, 0, "in_flight transfer must not be unblocked");
410    }
411
412    #[tokio::test]
413    async fn unblock_dependents_multiple_dependents() {
414        use crate::domain::transfer::TransferKind;
415
416        let store = SqliteSyncStore::open_in_memory().await.expect("open");
417        let file_a = insert_test_topology_file(&store, "output/multi_a.png").await;
418        let file_b = insert_test_topology_file(&store, "output/multi_b.png").await;
419
420        // T1: local→cloud (shared dependency)
421        let t1 =
422            Transfer::new(file_a.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
423        let t1_id = t1.id().to_string();
424        store.insert_transfer(&t1).await.expect("insert t1");
425
426        // T2: cloud→pod for file_a (Blocked, depends_on=T1)
427        let t2 = Transfer::with_dependency(
428            file_a.id().to_string(),
429            loc("cloud"),
430            loc("pod"),
431            TransferKind::Sync,
432            t1_id.clone(),
433        )
434        .expect("valid t2");
435        store.insert_transfer(&t2).await.expect("insert t2");
436
437        // T3: cloud→nas for file_b (Blocked, depends_on=T1)
438        let t3 = Transfer::with_dependency(
439            file_b.id().to_string(),
440            loc("cloud"),
441            loc("nas"),
442            TransferKind::Sync,
443            t1_id.clone(),
444        )
445        .expect("valid t3");
446        store.insert_transfer(&t3).await.expect("insert t3");
447
448        // Unblock both at once
449        let unblocked = store.unblock_dependents(&t1_id).await.expect("unblock");
450        assert_eq!(unblocked, 2, "both blocked transfers should be unblocked");
451
452        // Verify both are now queued
453        let pod_queued = store.queued_transfers(&loc("pod")).await.expect("pod");
454        assert_eq!(pod_queued.len(), 1);
455        let nas_queued = store.queued_transfers(&loc("nas")).await.expect("nas");
456        assert_eq!(nas_queued.len(), 1);
457    }
458
459    #[tokio::test]
460    async fn unblock_dependents_no_dependents_returns_zero() {
461        let store = SqliteSyncStore::open_in_memory().await.expect("open");
462
463        // No transfers at all — should return 0 without error
464        let unblocked = store
465            .unblock_dependents("nonexistent-id")
466            .await
467            .expect("unblock");
468        assert_eq!(unblocked, 0);
469    }
470}