1mod location_file_store_impl;
11mod mapping;
12mod schema;
13mod task_store_impl;
14mod topology_file_store_impl;
15mod transfer_store_impl;
16
17use std::path::Path;
18
19use crate::infra::error::InfraError;
20
21pub struct SqliteSyncStore {
27 conn: tokio_rusqlite::Connection,
28}
29
30impl SqliteSyncStore {
31 pub async fn open(path: &Path) -> Result<Self, InfraError> {
33 let path = path.to_path_buf();
34 let conn =
35 tokio_rusqlite::Connection::open(&path)
36 .await
37 .map_err(|e| InfraError::Store {
38 op: "sqlite",
39 reason: format!("open failed: {e}"),
40 })?;
41 conn.call(schema::init_connection)
42 .await
43 .map_err(map_call_err)?;
44 Ok(Self { conn })
45 }
46
47 pub async fn open_in_memory() -> Result<Self, InfraError> {
49 let conn = tokio_rusqlite::Connection::open_in_memory()
50 .await
51 .map_err(|e| InfraError::Store {
52 op: "sqlite",
53 reason: format!("open_in_memory failed: {e}"),
54 })?;
55 conn.call(schema::init_connection)
56 .await
57 .map_err(map_call_err)?;
58 Ok(Self { conn })
59 }
60}
61
62fn map_call_err(e: tokio_rusqlite::Error<InfraError>) -> InfraError {
68 match e {
69 tokio_rusqlite::Error::Error(infra_err) => infra_err,
70 tokio_rusqlite::Error::ConnectionClosed => InfraError::Store {
71 op: "sqlite",
72 reason: "sqlite connection closed".into(),
73 },
74 tokio_rusqlite::Error::Close((_, e)) => InfraError::Store {
75 op: "sqlite",
76 reason: format!("sqlite close error: {e}"),
77 },
78 other => InfraError::Store {
79 op: "sqlite",
80 reason: format!("tokio-rusqlite: {other:?}"),
81 },
82 }
83}
84
85#[cfg(test)]
90mod tests {
91 use super::*;
92
93 use rusqlite::params;
94
95 use crate::domain::file_type::FileType;
96 use crate::domain::location::LocationId;
97 use crate::domain::topology_file::TopologyFile;
98 use crate::domain::transfer::Transfer;
99 use crate::infra::topology_file_store::TopologyFileStore;
100 use crate::infra::transfer_store::TransferStore;
101
102 fn loc(s: &str) -> LocationId {
103 LocationId::new(s).expect("valid test location")
104 }
105
106 async fn insert_test_topology_file(store: &SqliteSyncStore, path: &str) -> TopologyFile {
109 let tf =
110 TopologyFile::new(path.to_string(), FileType::Image).expect("valid test topology file");
111 TopologyFileStore::upsert(store, &tf)
112 .await
113 .expect("insert topology file");
114 tf
115 }
116
117 #[tokio::test]
122 async fn insert_and_query_transfer() {
123 let store = SqliteSyncStore::open_in_memory().await.expect("open");
124 let file = insert_test_topology_file(&store, "output/t.png").await;
125
126 let transfer =
127 Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
128 store
129 .insert_transfer(&transfer)
130 .await
131 .expect("insert transfer");
132
133 let queued = store.queued_transfers(&loc("cloud")).await.expect("queued");
134 assert_eq!(queued.len(), 1);
135 assert_eq!(queued[0].file_id(), file.id());
136 assert_eq!(queued[0].dest(), &loc("cloud"));
137 }
138
139 #[tokio::test]
140 async fn update_transfer_state() {
141 let store = SqliteSyncStore::open_in_memory().await.expect("open");
142 let file = insert_test_topology_file(&store, "output/s.png").await;
143
144 let mut transfer =
145 Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
146 store
147 .insert_transfer(&transfer)
148 .await
149 .expect("insert transfer");
150
151 transfer.start().expect("start");
152 store
153 .update_transfer(&transfer)
154 .await
155 .expect("update transfer");
156
157 let queued = store.queued_transfers(&loc("cloud")).await.expect("queued");
158 assert_eq!(queued.len(), 0);
159
160 transfer.complete().expect("complete");
161 store
162 .update_transfer(&transfer)
163 .await
164 .expect("update transfer");
165
166 let latest = store
167 .latest_transfers_by_file(file.id())
168 .await
169 .expect("latest");
170 assert_eq!(latest.len(), 1);
171 assert_eq!(
172 latest[0].state(),
173 crate::domain::transfer::TransferState::Completed
174 );
175 }
176
177 #[tokio::test]
178 async fn failed_transfers_query() {
179 let store = SqliteSyncStore::open_in_memory().await.expect("open");
180 let file = insert_test_topology_file(&store, "output/f.png").await;
181
182 let mut transfer =
183 Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
184 transfer.start().expect("start");
185 transfer
186 .fail(
187 "timeout".into(),
188 crate::domain::retry::TransferErrorKind::Transient,
189 )
190 .expect("fail");
191 store
192 .insert_transfer(&transfer)
193 .await
194 .expect("insert transfer");
195
196 let failed = store.failed_transfers().await.expect("failed");
197 assert_eq!(failed.len(), 1);
198 assert_eq!(failed[0].error(), Some("timeout"));
199 assert_eq!(
200 failed[0].error_kind(),
201 Some(crate::domain::retry::TransferErrorKind::Transient)
202 );
203 }
204
205 #[tokio::test]
206 async fn failed_transfers_excludes_retried() {
207 let store = SqliteSyncStore::open_in_memory().await.expect("open");
208 let file = insert_test_topology_file(&store, "output/retry.png").await;
209
210 let mut t1 =
212 Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
213 t1.start().expect("start");
214 t1.fail(
215 "net error".into(),
216 crate::domain::retry::TransferErrorKind::Transient,
217 )
218 .expect("fail");
219 store.insert_transfer(&t1).await.expect("insert t1");
220
221 let mut t2 = t1.retry().expect("retry");
223 t2.start().expect("start");
224 t2.fail(
225 "net error again".into(),
226 crate::domain::retry::TransferErrorKind::Transient,
227 )
228 .expect("fail");
229 store.insert_transfer(&t2).await.expect("insert t2");
230
231 let failed = store.failed_transfers().await.expect("failed");
233 assert_eq!(
234 failed.len(),
235 1,
236 "should return only the latest failed transfer"
237 );
238 assert_eq!(failed[0].error(), Some("net error again"));
239 assert_eq!(failed[0].attempt(), 2);
240 }
241
242 #[tokio::test]
243 async fn latest_transfers_by_file_returns_latest_per_dest() {
244 let store = SqliteSyncStore::open_in_memory().await.expect("open");
245 let file = insert_test_topology_file(&store, "output/r.png").await;
246
247 let mut t1 =
248 Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
249 t1.start().expect("start");
250 t1.fail(
251 "err".into(),
252 crate::domain::retry::TransferErrorKind::Transient,
253 )
254 .expect("fail");
255 store.insert_transfer(&t1).await.expect("insert t1");
256
257 let t2 = t1.retry().expect("retry");
258 store.insert_transfer(&t2).await.expect("insert t2");
259
260 let mut t3 = Transfer::new(file.id().to_string(), loc("local"), loc("pod")).expect("valid");
261 t3.start().expect("start");
262 t3.complete().expect("complete");
263 store.insert_transfer(&t3).await.expect("insert t3");
264
265 let latest = store
266 .latest_transfers_by_file(file.id())
267 .await
268 .expect("latest");
269 assert_eq!(latest.len(), 2);
270
271 let cloud = latest
272 .iter()
273 .find(|t| t.dest() == &loc("cloud"))
274 .expect("cloud");
275 assert_eq!(
276 cloud.state(),
277 crate::domain::transfer::TransferState::Queued
278 );
279 assert_eq!(cloud.attempt(), 2);
280
281 let pod = latest
282 .iter()
283 .find(|t| t.dest() == &loc("pod"))
284 .expect("pod");
285 assert_eq!(
286 pod.state(),
287 crate::domain::transfer::TransferState::Completed
288 );
289 }
290
291 #[tokio::test]
292 async fn queued_returns_only_latest_per_file_dest() {
293 let store = SqliteSyncStore::open_in_memory().await.expect("open");
294 let file = insert_test_topology_file(&store, "output/q.png").await;
295
296 let mut t1 =
297 Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
298 t1.start().expect("start");
299 t1.fail(
300 "err".into(),
301 crate::domain::retry::TransferErrorKind::Transient,
302 )
303 .expect("fail");
304 store.insert_transfer(&t1).await.expect("insert t1");
305
306 let t2 = t1.retry().expect("retry");
307 store.insert_transfer(&t2).await.expect("insert t2");
308
309 let queued = store.queued_transfers(&loc("cloud")).await.expect("queued");
310 assert_eq!(queued.len(), 1);
311 assert_eq!(queued[0].attempt(), 2);
312 }
313
314 #[tokio::test]
319 async fn unblock_dependents_transitions_blocked_to_queued() {
320 use crate::domain::transfer::TransferKind;
321
322 let store = SqliteSyncStore::open_in_memory().await.expect("open");
323 let file = insert_test_topology_file(&store, "output/chain.png").await;
324
325 let t1 =
327 Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
328 let t1_id = t1.id().to_string();
329 store.insert_transfer(&t1).await.expect("insert t1");
330
331 let t2 = Transfer::with_dependency(
333 file.id().to_string(),
334 loc("cloud"),
335 loc("pod"),
336 TransferKind::Sync,
337 t1_id.clone(),
338 )
339 .expect("valid t2");
340 let t2_id = t2.id().to_string();
341 store.insert_transfer(&t2).await.expect("insert t2");
342
343 let queued_before = store.queued_transfers(&loc("pod")).await.expect("queued");
345 assert_eq!(
346 queued_before.len(),
347 0,
348 "blocked transfer must not appear in queued"
349 );
350
351 let unblocked = store.unblock_dependents(&t1_id).await.expect("unblock");
353 assert_eq!(unblocked, 1, "exactly one transfer should be unblocked");
354
355 let queued_after = store.queued_transfers(&loc("pod")).await.expect("queued");
357 assert_eq!(
358 queued_after.len(),
359 1,
360 "unblocked transfer must appear in queued"
361 );
362 assert_eq!(queued_after[0].id(), t2_id);
363 }
364
365 #[tokio::test]
366 async fn unblock_dependents_ignores_non_blocked_state() {
367 use crate::domain::transfer::TransferKind;
368
369 let store = SqliteSyncStore::open_in_memory().await.expect("open");
370 let file = insert_test_topology_file(&store, "output/nonblock.png").await;
371
372 let t1 =
374 Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
375 let t1_id = t1.id().to_string();
376 store.insert_transfer(&t1).await.expect("insert t1");
377
378 let t2 = Transfer::with_dependency(
380 file.id().to_string(),
381 loc("cloud"),
382 loc("pod"),
383 TransferKind::Sync,
384 t1_id.clone(),
385 )
386 .expect("valid t2");
387 store.insert_transfer(&t2).await.expect("insert t2");
390
391 let t2_id_clone = t2.id().to_string();
393 store
394 .conn
395 .call(move |conn| {
396 conn.execute(
397 "UPDATE transfers SET state = 'in_flight' WHERE id = ?",
398 params![t2_id_clone],
399 )
400 .map_err(|e| InfraError::Store {
401 op: "sqlite",
402 reason: format!("{e}"),
403 })
404 })
405 .await
406 .expect("manual update");
407
408 let unblocked = store.unblock_dependents(&t1_id).await.expect("unblock");
410 assert_eq!(unblocked, 0, "in_flight transfer must not be unblocked");
411 }
412
413 #[tokio::test]
414 async fn unblock_dependents_multiple_dependents() {
415 use crate::domain::transfer::TransferKind;
416
417 let store = SqliteSyncStore::open_in_memory().await.expect("open");
418 let file_a = insert_test_topology_file(&store, "output/multi_a.png").await;
419 let file_b = insert_test_topology_file(&store, "output/multi_b.png").await;
420
421 let t1 =
423 Transfer::new(file_a.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
424 let t1_id = t1.id().to_string();
425 store.insert_transfer(&t1).await.expect("insert t1");
426
427 let t2 = Transfer::with_dependency(
429 file_a.id().to_string(),
430 loc("cloud"),
431 loc("pod"),
432 TransferKind::Sync,
433 t1_id.clone(),
434 )
435 .expect("valid t2");
436 store.insert_transfer(&t2).await.expect("insert t2");
437
438 let t3 = Transfer::with_dependency(
440 file_b.id().to_string(),
441 loc("cloud"),
442 loc("nas"),
443 TransferKind::Sync,
444 t1_id.clone(),
445 )
446 .expect("valid t3");
447 store.insert_transfer(&t3).await.expect("insert t3");
448
449 let unblocked = store.unblock_dependents(&t1_id).await.expect("unblock");
451 assert_eq!(unblocked, 2, "both blocked transfers should be unblocked");
452
453 let pod_queued = store.queued_transfers(&loc("pod")).await.expect("pod");
455 assert_eq!(pod_queued.len(), 1);
456 let nas_queued = store.queued_transfers(&loc("nas")).await.expect("nas");
457 assert_eq!(nas_queued.len(), 1);
458 }
459
460 #[tokio::test]
461 async fn unblock_dependents_no_dependents_returns_zero() {
462 let store = SqliteSyncStore::open_in_memory().await.expect("open");
463
464 let unblocked = store
466 .unblock_dependents("nonexistent-id")
467 .await
468 .expect("unblock");
469 assert_eq!(unblocked, 0);
470 }
471
472 #[tokio::test]
481 async fn two_syncs_should_not_create_deleted_topology_files() {
482 use crate::application::topology_store::TopologyStore;
483 use crate::domain::digest::{ByteDigest, ContentDigest};
484 use crate::domain::fingerprint::FileFingerprint;
485 use crate::domain::graph::RouteGraph;
486 use crate::domain::topology_delta::{DiscoveredFile, TopologyDelta};
487 use crate::infra::location_file_store::LocationFileStore;
488 use crate::infra::topology_file_store::TopologyFileStore;
489
490 let store = SqliteSyncStore::open_in_memory().await.expect("open");
491 let store = std::sync::Arc::new(store);
492
493 let local = loc("local");
494 let cloud = loc("cloud");
495 let mut graph = RouteGraph::new();
496 graph.add(local.clone(), cloud.clone());
497 graph.add(cloud.clone(), local.clone());
498 let locations = vec![local.clone(), cloud.clone()];
499
500 let topo = TopologyStore::new(
501 store.clone() as std::sync::Arc<dyn TopologyFileStore>,
502 store.clone() as std::sync::Arc<dyn LocationFileStore>,
503 store.clone() as std::sync::Arc<dyn crate::infra::transfer_store::TransferStore>,
504 graph,
505 locations,
506 );
507
508 let make_deltas = |origin: &LocationId| -> Vec<TopologyDelta> {
510 (0..10)
511 .map(|i| {
512 TopologyDelta::Discovered(DiscoveredFile {
513 relative_path: format!("output/img-{i:04}.png"),
514 file_type: FileType::Image,
515 fingerprint: FileFingerprint {
516 byte_digest: Some(ByteDigest::Djb2(format!("hash-{i}"))),
517 content_digest: Some(ContentDigest(format!("hash-{i}"))),
518 meta_digest: None,
519 size: 1024,
520 modified_at: None,
521 },
522 origin: origin.clone(),
523 embedded_id: None,
524 })
525 })
526 .collect()
527 };
528
529 let deltas1 = make_deltas(&local);
531 let result1 = topo.sync(&deltas1).await.expect("sync1");
532 assert_eq!(result1.ingested, 10, "sync1: 10 deltas ingested");
533
534 let deleted_after_sync1 = TopologyFileStore::list_deleted(&*store)
535 .await
536 .expect("list_deleted");
537 assert_eq!(
538 deleted_after_sync1.len(),
539 0,
540 "sync1: no deleted TFs expected"
541 );
542
543 let deltas2 = make_deltas(&local);
550 let _result2 = topo.sync(&deltas2).await.expect("sync2");
551
552 let deleted_after_sync2 = TopologyFileStore::list_deleted(&*store)
553 .await
554 .expect("list_deleted");
555 assert_eq!(
556 deleted_after_sync2.len(),
557 0,
558 "sync2: no deleted TFs expected, but got {} (path conflict retire?)",
559 deleted_after_sync2.len()
560 );
561
562 let active = TopologyFileStore::count_active(&*store)
564 .await
565 .expect("count_active");
566 assert_eq!(active, 10, "active TFs should remain 10");
567 }
568
569 #[tokio::test]
571 async fn discovered_vanished_rediscovered_no_stale_deleted() {
572 use crate::application::topology_store::TopologyStore;
573 use crate::domain::digest::{ByteDigest, ContentDigest};
574 use crate::domain::fingerprint::FileFingerprint;
575 use crate::domain::graph::RouteGraph;
576 use crate::domain::topology_delta::{DiscoveredFile, TopologyDelta, VanishedFile};
577 use crate::infra::location_file_store::LocationFileStore;
578 use crate::infra::topology_file_store::TopologyFileStore;
579
580 let store = std::sync::Arc::new(SqliteSyncStore::open_in_memory().await.expect("open"));
581
582 let local = loc("local");
583 let cloud = loc("cloud");
584 let mut graph = RouteGraph::new();
585 graph.add(local.clone(), cloud.clone());
586 graph.add(cloud.clone(), local.clone());
587
588 let topo = TopologyStore::new(
589 store.clone() as std::sync::Arc<dyn TopologyFileStore>,
590 store.clone() as std::sync::Arc<dyn LocationFileStore>,
591 store.clone() as std::sync::Arc<dyn crate::infra::transfer_store::TransferStore>,
592 graph,
593 vec![local.clone(), cloud.clone()],
594 );
595
596 let fp = FileFingerprint {
597 byte_digest: Some(ByteDigest::Djb2("abc".into())),
598 content_digest: Some(ContentDigest("abc".into())),
599 meta_digest: None,
600 size: 1024,
601 modified_at: None,
602 };
603
604 let d1 = vec![TopologyDelta::Discovered(DiscoveredFile {
606 relative_path: "output/test.png".into(),
607 file_type: FileType::Image,
608 fingerprint: fp.clone(),
609 origin: local.clone(),
610 embedded_id: None,
611 })];
612 topo.sync(&d1).await.expect("sync1");
613
614 let tf_id = {
615 let tfs = TopologyFileStore::list_active(&*store, None, None)
616 .await
617 .expect("list");
618 assert_eq!(tfs.len(), 1);
619 tfs[0].id().to_string()
620 };
621
622 let d2 = vec![TopologyDelta::Vanished(VanishedFile {
624 topology_file_id: tf_id.clone(),
625 relative_path: "output/test.png".into(),
626 origin: local.clone(),
627 })];
628 topo.sync(&d2).await.expect("sync2");
629
630 let deleted_after_vanish = TopologyFileStore::list_deleted(&*store)
632 .await
633 .expect("list_deleted");
634 assert_eq!(
635 deleted_after_vanish.len(),
636 0,
637 "Vanished should not mark_deleted TF"
638 );
639
640 let d3 = vec![TopologyDelta::Discovered(DiscoveredFile {
642 relative_path: "output/test.png".into(),
643 file_type: FileType::Image,
644 fingerprint: fp.clone(),
645 origin: local.clone(),
646 embedded_id: None,
647 })];
648 topo.sync(&d3).await.expect("sync3");
649
650 let deleted_after_rediscovery = TopologyFileStore::list_deleted(&*store)
651 .await
652 .expect("list_deleted");
653 assert_eq!(
654 deleted_after_rediscovery.len(),
655 0,
656 "Re-Discovered should reuse existing TF, not create path conflict. Got {} deleted.",
657 deleted_after_rediscovery.len()
658 );
659 }
660
661 #[tokio::test]
663 async fn two_syncs_multi_origin_no_deleted() {
664 use crate::application::topology_store::TopologyStore;
665 use crate::domain::digest::{ByteDigest, ContentDigest};
666 use crate::domain::fingerprint::FileFingerprint;
667 use crate::domain::graph::RouteGraph;
668 use crate::domain::topology_delta::{DiscoveredFile, TopologyDelta};
669 use crate::infra::location_file_store::LocationFileStore;
670 use crate::infra::topology_file_store::TopologyFileStore;
671
672 let store = std::sync::Arc::new(SqliteSyncStore::open_in_memory().await.expect("open"));
673
674 let local = loc("local");
675 let cloud = loc("cloud");
676 let mut graph = RouteGraph::new();
677 graph.add(local.clone(), cloud.clone());
678 graph.add(cloud.clone(), local.clone());
679
680 let topo = TopologyStore::new(
681 store.clone() as std::sync::Arc<dyn TopologyFileStore>,
682 store.clone() as std::sync::Arc<dyn LocationFileStore>,
683 store.clone() as std::sync::Arc<dyn crate::infra::transfer_store::TransferStore>,
684 graph,
685 vec![local.clone(), cloud.clone()],
686 );
687
688 let fp = |i: usize| FileFingerprint {
689 byte_digest: Some(ByteDigest::Djb2(format!("h-{i}"))),
690 content_digest: Some(ContentDigest(format!("h-{i}"))),
691 meta_digest: None,
692 size: 2048,
693 modified_at: None,
694 };
695
696 let mut deltas1 = Vec::new();
698 for i in 0..5 {
699 for origin in [&local, &cloud] {
700 deltas1.push(TopologyDelta::Discovered(DiscoveredFile {
701 relative_path: format!("output/multi-{i:04}.png"),
702 file_type: FileType::Image,
703 fingerprint: fp(i),
704 origin: origin.clone(),
705 embedded_id: None,
706 }));
707 }
708 }
709 topo.sync(&deltas1).await.expect("sync1");
710
711 let deleted1 = TopologyFileStore::list_deleted(&*store)
712 .await
713 .expect("list_deleted");
714 assert_eq!(deleted1.len(), 0, "sync1: no deleted TFs");
715
716 topo.sync(&deltas1).await.expect("sync2");
718
719 let deleted2 = TopologyFileStore::list_deleted(&*store)
720 .await
721 .expect("list_deleted");
722 assert_eq!(
723 deleted2.len(),
724 0,
725 "sync2: no deleted TFs, got {} — paths: {:?}",
726 deleted2.len(),
727 deleted2
728 .iter()
729 .map(|t| t.relative_path())
730 .collect::<Vec<_>>()
731 );
732 }
733}