// vdsl_sync/infra/sqlite/mod.rs
1//! SQLite implementation of file/transfer/task stores.
2//!
3//! Uses normalized schema: `topology_files` + `location_files` + `transfers` + `sync_tasks`.
4//! Schema versioning via `PRAGMA user_version` (see [`schema`]).
5//! Designed for single-writer (sync engine), concurrent readers OK.
6//!
7//! Uses `tokio-rusqlite` for non-blocking async access — each connection
8//! runs on a dedicated background thread with mpsc channel dispatch.
9
10mod location_file_store_impl;
11mod mapping;
12mod schema;
13mod task_store_impl;
14mod topology_file_store_impl;
15mod transfer_store_impl;
16
17use std::path::Path;
18
19use crate::infra::error::InfraError;
20
/// SQLite-backed sync store.
///
/// A single value serves as the backend for the file/transfer/task store
/// traits implemented in the sibling `*_store_impl` modules.
///
/// Uses `tokio_rusqlite::Connection` — a handle that dispatches closures
/// to a dedicated background thread via mpsc channel. Does not block
/// the async runtime.
pub struct SqliteSyncStore {
    // Async handle to the worker thread that owns the actual rusqlite
    // connection (see tokio-rusqlite's dispatch model in the module docs).
    conn: tokio_rusqlite::Connection,
}
29
30impl SqliteSyncStore {
31    /// Open (or create) a sync database at the given path.
32    pub async fn open(path: &Path) -> Result<Self, InfraError> {
33        let path = path.to_path_buf();
34        let conn =
35            tokio_rusqlite::Connection::open(&path)
36                .await
37                .map_err(|e| InfraError::Store {
38                    op: "sqlite",
39                    reason: format!("open failed: {e}"),
40                })?;
41        conn.call(schema::init_connection)
42            .await
43            .map_err(map_call_err)?;
44        Ok(Self { conn })
45    }
46
47    /// Open an in-memory database (for testing).
48    pub async fn open_in_memory() -> Result<Self, InfraError> {
49        let conn = tokio_rusqlite::Connection::open_in_memory()
50            .await
51            .map_err(|e| InfraError::Store {
52                op: "sqlite",
53                reason: format!("open_in_memory failed: {e}"),
54            })?;
55        conn.call(schema::init_connection)
56            .await
57            .map_err(map_call_err)?;
58        Ok(Self { conn })
59    }
60}
61
62// =============================================================================
63// Error mapping
64// =============================================================================
65
66/// Convert `tokio_rusqlite::Error<InfraError>` → `InfraError`.
67fn map_call_err(e: tokio_rusqlite::Error<InfraError>) -> InfraError {
68    match e {
69        tokio_rusqlite::Error::Error(infra_err) => infra_err,
70        tokio_rusqlite::Error::ConnectionClosed => InfraError::Store {
71            op: "sqlite",
72            reason: "sqlite connection closed".into(),
73        },
74        tokio_rusqlite::Error::Close((_, e)) => InfraError::Store {
75            op: "sqlite",
76            reason: format!("sqlite close error: {e}"),
77        },
78        other => InfraError::Store {
79            op: "sqlite",
80            reason: format!("tokio-rusqlite: {other:?}"),
81        },
82    }
83}
84
85// =============================================================================
86// Tests
87// =============================================================================
88
89#[cfg(test)]
90mod tests {
91    use super::*;
92
93    use rusqlite::params;
94
95    use crate::domain::file_type::FileType;
96    use crate::domain::location::LocationId;
97    use crate::domain::topology_file::TopologyFile;
98    use crate::domain::transfer::Transfer;
99    use crate::infra::topology_file_store::TopologyFileStore;
100    use crate::infra::transfer_store::TransferStore;
101
    /// Test shorthand: build a `LocationId` from a literal, panicking on
    /// invalid input (acceptable in tests — all call sites use valid ids).
    fn loc(s: &str) -> LocationId {
        LocationId::new(s).expect("valid test location")
    }
105
    /// Create a test TopologyFile and insert it into the store.
    /// Returns the TopologyFile (for use as FK target in transfers).
    async fn insert_test_topology_file(store: &SqliteSyncStore, path: &str) -> TopologyFile {
        let tf =
            TopologyFile::new(path.to_string(), FileType::Image).expect("valid test topology file");
        // Fully-qualified call — presumably to disambiguate between the
        // multiple *Store traits implemented on SqliteSyncStore.
        TopologyFileStore::upsert(store, &tf)
            .await
            .expect("insert topology file");
        tf
    }
116
117    // =========================================================================
118    // TransferStore tests
119    // =========================================================================
120
    /// A freshly inserted transfer is visible via `queued_transfers` for
    /// its destination, with file id and dest round-tripped intact.
    #[tokio::test]
    async fn insert_and_query_transfer() {
        let store = SqliteSyncStore::open_in_memory().await.expect("open");
        // FK target: transfers reference a topology file row.
        let file = insert_test_topology_file(&store, "output/t.png").await;

        let transfer =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        store
            .insert_transfer(&transfer)
            .await
            .expect("insert transfer");

        let queued = store.queued_transfers(&loc("cloud")).await.expect("queued");
        assert_eq!(queued.len(), 1);
        assert_eq!(queued[0].file_id(), file.id());
        assert_eq!(queued[0].dest(), &loc("cloud"));
    }
138
    /// State transitions persisted via `update_transfer` are reflected in
    /// queries: starting removes the row from `queued_transfers`, and
    /// completing shows up in `latest_transfers_by_file`.
    #[tokio::test]
    async fn update_transfer_state() {
        let store = SqliteSyncStore::open_in_memory().await.expect("open");
        let file = insert_test_topology_file(&store, "output/s.png").await;

        let mut transfer =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        store
            .insert_transfer(&transfer)
            .await
            .expect("insert transfer");

        // Queued → InFlight: must disappear from the queued view.
        transfer.start().expect("start");
        store
            .update_transfer(&transfer)
            .await
            .expect("update transfer");

        let queued = store.queued_transfers(&loc("cloud")).await.expect("queued");
        assert_eq!(queued.len(), 0);

        // InFlight → Completed: visible as the latest transfer for the file.
        transfer.complete().expect("complete");
        store
            .update_transfer(&transfer)
            .await
            .expect("update transfer");

        let latest = store
            .latest_transfers_by_file(file.id())
            .await
            .expect("latest");
        assert_eq!(latest.len(), 1);
        assert_eq!(
            latest[0].state(),
            crate::domain::transfer::TransferState::Completed
        );
    }
176
    /// `failed_transfers` returns failed rows with error message and
    /// error kind round-tripped through the database.
    #[tokio::test]
    async fn failed_transfers_query() {
        let store = SqliteSyncStore::open_in_memory().await.expect("open");
        let file = insert_test_topology_file(&store, "output/f.png").await;

        // Drive the transfer to Failed before inserting, so the row lands
        // in the failed state directly.
        let mut transfer =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        transfer.start().expect("start");
        transfer
            .fail(
                "timeout".into(),
                crate::domain::retry::TransferErrorKind::Transient,
            )
            .expect("fail");
        store
            .insert_transfer(&transfer)
            .await
            .expect("insert transfer");

        let failed = store.failed_transfers().await.expect("failed");
        assert_eq!(failed.len(), 1);
        assert_eq!(failed[0].error(), Some("timeout"));
        assert_eq!(
            failed[0].error_kind(),
            Some(crate::domain::retry::TransferErrorKind::Transient)
        );
    }
204
    /// When a failed transfer has been retried, `failed_transfers` must
    /// return only the latest attempt, not the superseded one.
    #[tokio::test]
    async fn failed_transfers_excludes_retried() {
        let store = SqliteSyncStore::open_in_memory().await.expect("open");
        let file = insert_test_topology_file(&store, "output/retry.png").await;

        // T1: Failed (attempt=1)
        let mut t1 =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        t1.start().expect("start");
        t1.fail(
            "net error".into(),
            crate::domain::retry::TransferErrorKind::Transient,
        )
        .expect("fail");
        store.insert_transfer(&t1).await.expect("insert t1");

        // T2: retry of T1 → Queued (attempt=2), then fails again
        let mut t2 = t1.retry().expect("retry");
        t2.start().expect("start");
        t2.fail(
            "net error again".into(),
            crate::domain::retry::TransferErrorKind::Transient,
        )
        .expect("fail");
        store.insert_transfer(&t2).await.expect("insert t2");

        // failed_transfers should return only T2 (latest), not T1
        let failed = store.failed_transfers().await.expect("failed");
        assert_eq!(
            failed.len(),
            1,
            "should return only the latest failed transfer"
        );
        assert_eq!(failed[0].error(), Some("net error again"));
        assert_eq!(failed[0].attempt(), 2);
    }
241
    /// `latest_transfers_by_file` collapses the transfer history to one
    /// row per destination: the most recent attempt for each dest.
    #[tokio::test]
    async fn latest_transfers_by_file_returns_latest_per_dest() {
        let store = SqliteSyncStore::open_in_memory().await.expect("open");
        let file = insert_test_topology_file(&store, "output/r.png").await;

        // cloud dest: attempt 1 fails...
        let mut t1 =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        t1.start().expect("start");
        t1.fail(
            "err".into(),
            crate::domain::retry::TransferErrorKind::Transient,
        )
        .expect("fail");
        store.insert_transfer(&t1).await.expect("insert t1");

        // ...then attempt 2 is queued (this is the latest for cloud).
        let t2 = t1.retry().expect("retry");
        store.insert_transfer(&t2).await.expect("insert t2");

        // pod dest: a single completed transfer.
        let mut t3 = Transfer::new(file.id().to_string(), loc("local"), loc("pod")).expect("valid");
        t3.start().expect("start");
        t3.complete().expect("complete");
        store.insert_transfer(&t3).await.expect("insert t3");

        // Expect exactly one row per dest: cloud (attempt 2) and pod.
        let latest = store
            .latest_transfers_by_file(file.id())
            .await
            .expect("latest");
        assert_eq!(latest.len(), 2);

        let cloud = latest
            .iter()
            .find(|t| t.dest() == &loc("cloud"))
            .expect("cloud");
        assert_eq!(
            cloud.state(),
            crate::domain::transfer::TransferState::Queued
        );
        assert_eq!(cloud.attempt(), 2);

        let pod = latest
            .iter()
            .find(|t| t.dest() == &loc("pod"))
            .expect("pod");
        assert_eq!(
            pod.state(),
            crate::domain::transfer::TransferState::Completed
        );
    }
290
    /// `queued_transfers` must report only the latest attempt per
    /// (file, dest) pair — a retried row supersedes its failed original.
    #[tokio::test]
    async fn queued_returns_only_latest_per_file_dest() {
        let store = SqliteSyncStore::open_in_memory().await.expect("open");
        let file = insert_test_topology_file(&store, "output/q.png").await;

        // Attempt 1 fails...
        let mut t1 =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        t1.start().expect("start");
        t1.fail(
            "err".into(),
            crate::domain::retry::TransferErrorKind::Transient,
        )
        .expect("fail");
        store.insert_transfer(&t1).await.expect("insert t1");

        // ...attempt 2 re-queues the same (file, dest).
        let t2 = t1.retry().expect("retry");
        store.insert_transfer(&t2).await.expect("insert t2");

        let queued = store.queued_transfers(&loc("cloud")).await.expect("queued");
        assert_eq!(queued.len(), 1);
        assert_eq!(queued[0].attempt(), 2);
    }
313
314    // =========================================================================
315    // unblock_dependents tests
316    // =========================================================================
317
    /// A Blocked transfer stays invisible to `queued_transfers` until
    /// `unblock_dependents` is called for the transfer it depends on.
    #[tokio::test]
    async fn unblock_dependents_transitions_blocked_to_queued() {
        use crate::domain::transfer::TransferKind;

        let store = SqliteSyncStore::open_in_memory().await.expect("open");
        let file = insert_test_topology_file(&store, "output/chain.png").await;

        // T1: local→cloud (Queued — the prerequisite transfer)
        let t1 =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
        let t1_id = t1.id().to_string();
        store.insert_transfer(&t1).await.expect("insert t1");

        // T2: cloud→pod (Blocked, depends_on=T1)
        let t2 = Transfer::with_dependency(
            file.id().to_string(),
            loc("cloud"),
            loc("pod"),
            TransferKind::Sync,
            t1_id.clone(),
        )
        .expect("valid t2");
        let t2_id = t2.id().to_string();
        store.insert_transfer(&t2).await.expect("insert t2");

        // Before unblock: T2 should NOT appear in queued_transfers
        let queued_before = store.queued_transfers(&loc("pod")).await.expect("queued");
        assert_eq!(
            queued_before.len(),
            0,
            "blocked transfer must not appear in queued"
        );

        // Simulate T1 completion → unblock dependents
        let unblocked = store.unblock_dependents(&t1_id).await.expect("unblock");
        assert_eq!(unblocked, 1, "exactly one transfer should be unblocked");

        // After unblock: T2 should appear in queued_transfers
        let queued_after = store.queued_transfers(&loc("pod")).await.expect("queued");
        assert_eq!(
            queued_after.len(),
            1,
            "unblocked transfer must appear in queued"
        );
        assert_eq!(queued_after[0].id(), t2_id);
    }
364
    /// `unblock_dependents` must only touch rows in the Blocked state —
    /// a dependent that has already moved to in_flight is left alone.
    #[tokio::test]
    async fn unblock_dependents_ignores_non_blocked_state() {
        use crate::domain::transfer::TransferKind;

        let store = SqliteSyncStore::open_in_memory().await.expect("open");
        let file = insert_test_topology_file(&store, "output/nonblock.png").await;

        // T1: local→cloud (Queued)
        let t1 =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
        let t1_id = t1.id().to_string();
        store.insert_transfer(&t1).await.expect("insert t1");

        // T2: depends on T1, but manually set to in_flight (not blocked)
        let t2 = Transfer::with_dependency(
            file.id().to_string(),
            loc("cloud"),
            loc("pod"),
            TransferKind::Sync,
            t1_id.clone(),
        )
        .expect("valid t2");
        // with_dependency creates Blocked. Insert as-is, then manually
        // update via SQL to simulate a non-blocked state (race condition).
        store.insert_transfer(&t2).await.expect("insert t2");

        // Manually update T2 to in_flight via SQL (simulating a race)
        let t2_id_clone = t2.id().to_string();
        store
            .conn
            .call(move |conn| {
                conn.execute(
                    "UPDATE transfers SET state = 'in_flight' WHERE id = ?",
                    params![t2_id_clone],
                )
                .map_err(|e| InfraError::Store {
                    op: "sqlite",
                    reason: format!("{e}"),
                })
            })
            .await
            .expect("manual update");

        // unblock should NOT touch in_flight transfers
        let unblocked = store.unblock_dependents(&t1_id).await.expect("unblock");
        assert_eq!(unblocked, 0, "in_flight transfer must not be unblocked");
    }
412
    /// One completed transfer can unblock several dependents at once,
    /// even when they belong to different files and destinations.
    #[tokio::test]
    async fn unblock_dependents_multiple_dependents() {
        use crate::domain::transfer::TransferKind;

        let store = SqliteSyncStore::open_in_memory().await.expect("open");
        let file_a = insert_test_topology_file(&store, "output/multi_a.png").await;
        let file_b = insert_test_topology_file(&store, "output/multi_b.png").await;

        // T1: local→cloud (shared dependency)
        let t1 =
            Transfer::new(file_a.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
        let t1_id = t1.id().to_string();
        store.insert_transfer(&t1).await.expect("insert t1");

        // T2: cloud→pod for file_a (Blocked, depends_on=T1)
        let t2 = Transfer::with_dependency(
            file_a.id().to_string(),
            loc("cloud"),
            loc("pod"),
            TransferKind::Sync,
            t1_id.clone(),
        )
        .expect("valid t2");
        store.insert_transfer(&t2).await.expect("insert t2");

        // T3: cloud→nas for file_b (Blocked, depends_on=T1)
        let t3 = Transfer::with_dependency(
            file_b.id().to_string(),
            loc("cloud"),
            loc("nas"),
            TransferKind::Sync,
            t1_id.clone(),
        )
        .expect("valid t3");
        store.insert_transfer(&t3).await.expect("insert t3");

        // Unblock both at once
        let unblocked = store.unblock_dependents(&t1_id).await.expect("unblock");
        assert_eq!(unblocked, 2, "both blocked transfers should be unblocked");

        // Verify both are now queued
        let pod_queued = store.queued_transfers(&loc("pod")).await.expect("pod");
        assert_eq!(pod_queued.len(), 1);
        let nas_queued = store.queued_transfers(&loc("nas")).await.expect("nas");
        assert_eq!(nas_queued.len(), 1);
    }
459
    /// Unblocking an id with no dependents is a harmless no-op: the
    /// affected-row count is 0 and no error is raised.
    #[tokio::test]
    async fn unblock_dependents_no_dependents_returns_zero() {
        let store = SqliteSyncStore::open_in_memory().await.expect("open");

        // No transfers at all — should return 0 without error
        let unblocked = store
            .unblock_dependents("nonexistent-id")
            .await
            .expect("unblock");
        assert_eq!(unblocked, 0);
    }
471
    // =========================================================================
    // E2E: two syncs → verify that list_deleted returns zero rows
    //
    // Reproduces: the "3514 deletes" problem
    // Hypothesis: the second sync regenerates Discovered with fresh IDs, and
    //             the upsert path-conflict retire marks existing TFs deleted
    // =========================================================================
479
    /// E2E regression test: syncing the same Discovered deltas twice must
    /// not retire any existing topology files (see the section banner).
    #[tokio::test]
    async fn two_syncs_should_not_create_deleted_topology_files() {
        use crate::application::topology_store::TopologyStore;
        use crate::domain::digest::{ByteDigest, ContentDigest};
        use crate::domain::fingerprint::FileFingerprint;
        use crate::domain::graph::RouteGraph;
        use crate::domain::topology_delta::{DiscoveredFile, TopologyDelta};
        use crate::infra::location_file_store::LocationFileStore;
        use crate::infra::topology_file_store::TopologyFileStore;

        let store = SqliteSyncStore::open_in_memory().await.expect("open");
        // Arc so the same store can back all three trait objects below.
        let store = std::sync::Arc::new(store);

        // Bidirectional local↔cloud route graph.
        let local = loc("local");
        let cloud = loc("cloud");
        let mut graph = RouteGraph::new();
        graph.add(local.clone(), cloud.clone());
        graph.add(cloud.clone(), local.clone());
        let locations = vec![local.clone(), cloud.clone()];

        let topo = TopologyStore::new(
            store.clone() as std::sync::Arc<dyn TopologyFileStore>,
            store.clone() as std::sync::Arc<dyn LocationFileStore>,
            store.clone() as std::sync::Arc<dyn crate::infra::transfer_store::TransferStore>,
            graph,
            locations,
        );

        // Discovered deltas for 10 files (deterministic paths/fingerprints,
        // so the second batch is byte-identical to the first).
        let make_deltas = |origin: &LocationId| -> Vec<TopologyDelta> {
            (0..10)
                .map(|i| {
                    TopologyDelta::Discovered(DiscoveredFile {
                        relative_path: format!("output/img-{i:04}.png"),
                        file_type: FileType::Image,
                        fingerprint: FileFingerprint {
                            byte_digest: Some(ByteDigest::Djb2(format!("hash-{i}"))),
                            content_digest: Some(ContentDigest(format!("hash-{i}"))),
                            meta_digest: None,
                            size: 1024,
                            modified_at: None,
                        },
                        origin: origin.clone(),
                        embedded_id: None,
                    })
                })
                .collect()
        };

        // First sync
        let deltas1 = make_deltas(&local);
        let result1 = topo.sync(&deltas1).await.expect("sync1");
        assert_eq!(result1.ingested, 10, "sync1: 10 deltas ingested");

        let deleted_after_sync1 = TopologyFileStore::list_deleted(&*store)
            .await
            .expect("list_deleted");
        assert_eq!(
            deleted_after_sync1.len(),
            0,
            "sync1: no deleted TFs expected"
        );

        // Second sync — same files, same fingerprints.
        // Through match_and_classify this would be ByPath → fingerprint
        // unchanged → Skip, i.e. zero deltas would reach sync. In reality
        // deltas flow through compute_topology_deltas, so here we build the
        // Discovered deltas handed to apply_ingest directly, to exercise the
        // scenario where Discovered is regenerated on the second sync.
        let deltas2 = make_deltas(&local);
        let _result2 = topo.sync(&deltas2).await.expect("sync2");

        let deleted_after_sync2 = TopologyFileStore::list_deleted(&*store)
            .await
            .expect("list_deleted");
        assert_eq!(
            deleted_after_sync2.len(),
            0,
            "sync2: no deleted TFs expected, but got {} (path conflict retire?)",
            deleted_after_sync2.len()
        );

        // Active count should remain at 10.
        let active = TopologyFileStore::count_active(&*store)
            .await
            .expect("count_active");
        assert_eq!(active, 10, "active TFs should remain 10");
    }
568
    /// Verifies that Discovered → Vanished → re-Discovered does not
    /// accumulate deleted topology files.
    #[tokio::test]
    async fn discovered_vanished_rediscovered_no_stale_deleted() {
        use crate::application::topology_store::TopologyStore;
        use crate::domain::digest::{ByteDigest, ContentDigest};
        use crate::domain::fingerprint::FileFingerprint;
        use crate::domain::graph::RouteGraph;
        use crate::domain::topology_delta::{DiscoveredFile, TopologyDelta, VanishedFile};
        use crate::infra::location_file_store::LocationFileStore;
        use crate::infra.topology_file_store::TopologyFileStore;

        let store = std::sync::Arc::new(SqliteSyncStore::open_in_memory().await.expect("open"));

        // Bidirectional local↔cloud route graph.
        let local = loc("local");
        let cloud = loc("cloud");
        let mut graph = RouteGraph::new();
        graph.add(local.clone(), cloud.clone());
        graph.add(cloud.clone(), local.clone());

        let topo = TopologyStore::new(
            store.clone() as std::sync::Arc<dyn TopologyFileStore>,
            store.clone() as std::sync::Arc<dyn LocationFileStore>,
            store.clone() as std::sync::Arc<dyn crate::infra::transfer_store::TransferStore>,
            graph,
            vec![local.clone(), cloud.clone()],
        );

        // Identical fingerprint for all three syncs — the file's content
        // never changes, only its presence.
        let fp = FileFingerprint {
            byte_digest: Some(ByteDigest::Djb2("abc".into())),
            content_digest: Some(ContentDigest("abc".into())),
            meta_digest: None,
            size: 1024,
            modified_at: None,
        };

        // sync1: Discovered
        let d1 = vec![TopologyDelta::Discovered(DiscoveredFile {
            relative_path: "output/test.png".into(),
            file_type: FileType::Image,
            fingerprint: fp.clone(),
            origin: local.clone(),
            embedded_id: None,
        })];
        topo.sync(&d1).await.expect("sync1");

        // Capture the TF id assigned by sync1; Vanished needs it.
        let tf_id = {
            let tfs = TopologyFileStore::list_active(&*store, None, None)
                .await
                .expect("list");
            assert_eq!(tfs.len(), 1);
            tfs[0].id().to_string()
        };

        // sync2: Vanished (the file disappeared)
        let d2 = vec![TopologyDelta::Vanished(VanishedFile {
            topology_file_id: tf_id.clone(),
            relative_path: "output/test.png".into(),
            origin: local.clone(),
        })];
        topo.sync(&d2).await.expect("sync2");

        // After Vanished: the TF itself must not become deleted
        // (the mark_deleted behavior was retracted).
        let deleted_after_vanish = TopologyFileStore::list_deleted(&*store)
            .await
            .expect("list_deleted");
        assert_eq!(
            deleted_after_vanish.len(),
            0,
            "Vanished should not mark_deleted TF"
        );

        // sync3: re-Discovered (the same file came back)
        let d3 = vec![TopologyDelta::Discovered(DiscoveredFile {
            relative_path: "output/test.png".into(),
            file_type: FileType::Image,
            fingerprint: fp.clone(),
            origin: local.clone(),
            embedded_id: None,
        })];
        topo.sync(&d3).await.expect("sync3");

        let deleted_after_rediscovery = TopologyFileStore::list_deleted(&*store)
            .await
            .expect("list_deleted");
        assert_eq!(
            deleted_after_rediscovery.len(),
            0,
            "Re-Discovered should reuse existing TF, not create path conflict. Got {} deleted.",
            deleted_after_rediscovery.len()
        );
    }
660
    /// The same files are reported as Discovered from multiple locations
    /// (local + cloud); two consecutive syncs must leave no deleted TFs.
    #[tokio::test]
    async fn two_syncs_multi_origin_no_deleted() {
        use crate::application::topology_store::TopologyStore;
        use crate::domain::digest::{ByteDigest, ContentDigest};
        use crate::domain::fingerprint::FileFingerprint;
        use crate::domain::graph::RouteGraph;
        use crate::domain::topology_delta::{DiscoveredFile, TopologyDelta};
        use crate::infra::location_file_store::LocationFileStore;
        use crate::infra::topology_file_store::TopologyFileStore;

        let store = std::sync::Arc::new(SqliteSyncStore::open_in_memory().await.expect("open"));

        // Bidirectional local↔cloud route graph.
        let local = loc("local");
        let cloud = loc("cloud");
        let mut graph = RouteGraph::new();
        graph.add(local.clone(), cloud.clone());
        graph.add(cloud.clone(), local.clone());

        let topo = TopologyStore::new(
            store.clone() as std::sync::Arc<dyn TopologyFileStore>,
            store.clone() as std::sync::Arc<dyn LocationFileStore>,
            store.clone() as std::sync::Arc<dyn crate::infra::transfer_store::TransferStore>,
            graph,
            vec![local.clone(), cloud.clone()],
        );

        // Deterministic fingerprint per file index — identical across
        // origins, so both locations report the same content.
        let fp = |i: usize| FileFingerprint {
            byte_digest: Some(ByteDigest::Djb2(format!("h-{i}"))),
            content_digest: Some(ContentDigest(format!("h-{i}"))),
            meta_digest: None,
            size: 2048,
            modified_at: None,
        };

        // sync1: the same 5 files Discovered from both local and cloud
        let mut deltas1 = Vec::new();
        for i in 0..5 {
            for origin in [&local, &cloud] {
                deltas1.push(TopologyDelta::Discovered(DiscoveredFile {
                    relative_path: format!("output/multi-{i:04}.png"),
                    file_type: FileType::Image,
                    fingerprint: fp(i),
                    origin: origin.clone(),
                    embedded_id: None,
                }));
            }
        }
        topo.sync(&deltas1).await.expect("sync1");

        let deleted1 = TopologyFileStore::list_deleted(&*store)
            .await
            .expect("list_deleted");
        assert_eq!(deleted1.len(), 0, "sync1: no deleted TFs");

        // sync2: same deltas again
        topo.sync(&deltas1).await.expect("sync2");

        let deleted2 = TopologyFileStore::list_deleted(&*store)
            .await
            .expect("list_deleted");
        assert_eq!(
            deleted2.len(),
            0,
            "sync2: no deleted TFs, got {} — paths: {:?}",
            deleted2.len(),
            deleted2
                .iter()
                .map(|t| t.relative_path())
                .collect::<Vec<_>>()
        );
    }
733}