Skip to main content

vdsl_sync/application/
topology_store.rs

1//! TopologyStore — Topology中心モデルのapplication-layer facade。
2//!
3//! TopologyFile(inode)+ LocationFile(directory entry)+ RouteGraph による
4//! 分散ファイルストレージの統合API。
5//!
6//! # API カテゴリ
7//!
8//! - **Sync** — トポロジー全体の同期 ([`sync`], [`sync_route`])
9//! - **File CRUD** — ファイル操作 ([`put`], [`get`], [`list`], [`delete`])
//! - **Status** — 監視 ([`file_count`], [`locations`])
11//!
12//! # 3フェーズパイプライン
13//!
14//! ```text
15//! Phase 1: Ingest   — scan → TopologyDelta → TopologyFile/LocationFile更新
16//! Phase 2: Distribute — TopologyFile × LocationFile → DistributeAction[]
17//! Phase 3: Route     — DistributeAction + RouteGraph → PlannedTransfer → Transfer実行
18//! ```
19
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22
23use crate::application::error::SyncError;
24use crate::domain::file_type::FileType;
25use crate::domain::fingerprint::FileFingerprint;
26use crate::domain::graph::RouteGraph;
27use crate::domain::location::LocationId;
28use crate::domain::location_file::{self, LocationFile};
29use crate::domain::plan::{plan_distribution, PlannedTransfer};
30use tracing::{debug, info, trace};
31
32use crate::domain::distribute::distribute_actions;
33use crate::domain::topology_delta::TopologyDelta;
34use crate::domain::topology_file::TopologyFile;
35use crate::domain::transfer::{Transfer, TransferKind};
36use crate::infra::location_file_store::LocationFileStore;
37use crate::infra::topology_file_store::TopologyFileStore;
38use crate::infra::transfer_store::TransferStore;
39
40// =============================================================================
41// Result types
42// =============================================================================
43
44/// sync()の結果。
45#[derive(Debug, Clone, Default, serde::Serialize)]
46pub struct TopologySyncResult {
47    /// スキャンで検出されたファイル数。
48    pub scanned: usize,
49    /// Ingestで生成されたTopologyDelta数。
50    pub ingested: usize,
51    /// Distributeで生成されたDistributeAction数。
52    pub distributed: usize,
53    /// 作成されたTransfer数。
54    pub transfers_created: usize,
55    /// 検出されたコンフリクト。
56    ///
57    /// 複数Locationで同一ファイルが異なる内容に更新された場合に報告される。
58    /// コンフリクトがあるファイルのUpdate転送は抑止される。
59    pub conflicts: Vec<crate::domain::distribute::ConflictEntry>,
60}
61
62/// put()の結果。
63#[derive(Debug, serde::Serialize)]
64pub struct TopologyPutResult {
65    /// 登録/更新されたTopologyFile ID。
66    pub topology_file_id: String,
67    /// 新規登録 = true、更新 = false。
68    pub is_new: bool,
69    /// 作成されたTransfer数。
70    pub transfers_created: usize,
71}
72
73// =============================================================================
74// TopologyStore
75// =============================================================================
76
77/// Topology中心モデルの分散ファイルストレージ。
78///
79/// 3つの永続化トレイトに依存し、ドメインロジックをオーケストレーションする。
80pub struct TopologyStore {
81    topology_files: Arc<dyn TopologyFileStore>,
82    location_files: Arc<dyn LocationFileStore>,
83    transfers: Arc<dyn TransferStore>,
84    graph: RouteGraph,
85    /// 全Locationの一覧(target_locations用)。
86    locations: Vec<LocationId>,
87}
88
89impl TopologyStore {
90    pub fn new(
91        topology_files: Arc<dyn TopologyFileStore>,
92        location_files: Arc<dyn LocationFileStore>,
93        transfers: Arc<dyn TransferStore>,
94        graph: RouteGraph,
95        locations: Vec<LocationId>,
96    ) -> Self {
97        Self {
98            topology_files,
99            location_files,
100            transfers,
101            graph,
102            locations,
103        }
104    }
105
106    // =========================================================================
107    // Sync — 全体同期
108    // =========================================================================
109
110    /// 全体同期: Ingest済みのTopologyDelta群を受け取り、
111    /// Apply → Distribute → Route → Transfer作成 を実行する。
112    ///
113    /// Ingest(スキャン→TopologyDelta生成)は呼び出し元が実行する。
114    /// この関数はその後の3ステップを担当する。
115    ///
116    /// # フロー
117    ///
118    /// 1. Apply: TopologyDelta → TopologyFile/LocationFile更新
119    /// 2. Distribute: 全TopologyFile × 全LocationFile → DistributeAction[]
120    /// 3. Route: DistributeAction + RouteGraph → PlannedTransfer → Transfer作成
121    pub async fn sync(&self, deltas: &[TopologyDelta]) -> Result<TopologySyncResult, SyncError> {
122        info!(delta_count = deltas.len(), "topology_store::sync: start");
123
124        // Phase 1: Apply — TopologyDelta → TopologyFile/LocationFile更新
125        let ingest_origins = self.apply_ingest(deltas).await?;
126        info!(
127            origins = ingest_origins.len(),
128            "topology_store::sync: phase1 apply done"
129        );
130
131        // Phase 2: Distribute
132        let active_tfs = self.topology_files.list_active(None, None).await?;
133        let active_tf_refs: Vec<&TopologyFile> = active_tfs.iter().collect();
134        let file_ids: Vec<&str> = active_tfs.iter().map(|tf| tf.id()).collect();
135        debug!(
136            active_files = active_tfs.len(),
137            "topology_store::sync: loaded active topology_files"
138        );
139
140        let lf_map = self.location_files.list_by_files(&file_ids).await?;
141        let lf_ref_map = to_ref_map(&lf_map);
142        debug!(
143            location_file_groups = lf_map.len(),
144            "topology_store::sync: loaded location_files"
145        );
146
147        let dist_result = distribute_actions(
148            &active_tf_refs,
149            &lf_ref_map,
150            &self.locations,
151            &ingest_origins,
152        );
153        info!(
154            actions = dist_result.actions.len(),
155            conflicts = dist_result.conflicts.len(),
156            "topology_store::sync: phase2 distribute done"
157        );
158
159        // 削除済みファイル → 各LocationFileのdestへDelete Transfer直接発行
160        let deleted_tfs = self.topology_files.list_deleted().await?;
161        trace!(
162            deleted_tfs = deleted_tfs.len(),
163            "topology_store::sync: checking deleted topology_files for delete transfers"
164        );
165        let mut delete_transfers_created = 0;
166        let pending_dests = self.collect_pending_dests().await?;
167        for dtf in &deleted_tfs {
168            let lfs = self.location_files.list_by_file(dtf.id()).await?;
169            let empty = HashSet::new();
170            let pending = pending_dests.get(dtf.id()).unwrap_or(&empty);
171            for lf in &lfs {
172                let dest = lf.location_id().clone();
173                if pending.contains(&dest) {
174                    trace!(
175                        file_id = %dtf.id(),
176                        dest = %dest,
177                        "topology_store::sync: delete transfer skipped (pending)"
178                    );
179                    continue;
180                }
181                let src = self
182                    .locations
183                    .iter()
184                    .find(|l| *l != &dest)
185                    .cloned()
186                    .unwrap_or_else(|| dest.clone());
187                if src == dest {
188                    trace!(
189                        file_id = %dtf.id(),
190                        dest = %dest,
191                        "topology_store::sync: delete transfer skipped (single location)"
192                    );
193                    continue;
194                }
195                trace!(
196                    file_id = %dtf.id(),
197                    src = %src,
198                    dest = %dest,
199                    "topology_store::sync: creating delete transfer"
200                );
201                let transfer = Transfer::new_delete(dtf.id().to_string(), src, dest)?;
202                self.transfers.insert_transfer(&transfer).await?;
203                delete_transfers_created += 1;
204            }
205        }
206        if delete_transfers_created > 0 {
207            debug!(
208                count = delete_transfers_created,
209                "topology_store::sync: delete transfers created"
210            );
211        }
212
213        let distributed = dist_result.actions.len();
214
215        // Phase 3: Route → Transfer作成(Send/Updateのみ)
216        // 既存データ保持locationをMulti-source Dijkstraに渡す
217        // Stale/Missing/Syncing のLocationFileはsource eligible ではないため除外
218        let existing_presences: HashMap<String, HashSet<LocationId>> = lf_map
219            .iter()
220            .map(|(file_id, lfs)| {
221                let locs: HashSet<LocationId> = lfs
222                    .iter()
223                    .filter(|lf| lf.state().is_source_eligible())
224                    .map(|lf| lf.location_id().clone())
225                    .collect();
226                (file_id.clone(), locs)
227            })
228            .collect();
229        let planned = plan_distribution(
230            &dist_result.actions,
231            &self.graph,
232            &pending_dests,
233            &existing_presences,
234        );
235        debug!(
236            planned_count = planned.len(),
237            "topology_store::sync: phase3 route planned"
238        );
239
240        let transfers_created = self.create_transfers(&planned).await? + delete_transfers_created;
241        info!(
242            transfers_created = transfers_created,
243            "topology_store::sync: phase3 route done"
244        );
245
246        Ok(TopologySyncResult {
247            scanned: 0, // 呼び出し元がセット
248            ingested: deltas.len(),
249            distributed,
250            transfers_created,
251            conflicts: dist_result.conflicts,
252        })
253    }
254
255    // =========================================================================
256    // Sync — 単一ルート
257    // =========================================================================
258
259    /// 単一ルート同期: src→dest の経路のみ処理する。
260    ///
261    /// dest側のLocationFileを確認し、不足・古いものだけDistribute + Transfer作成。
262    pub async fn sync_route(
263        &self,
264        src: &LocationId,
265        dest: &LocationId,
266    ) -> Result<TopologySyncResult, SyncError> {
267        let active_tfs = self.topology_files.list_active(None, None).await?;
268        let active_tf_refs: Vec<&TopologyFile> = active_tfs.iter().collect();
269        let file_ids: Vec<&str> = active_tfs.iter().map(|tf| tf.id()).collect();
270        let lf_map = self.location_files.list_by_files(&file_ids).await?;
271        let lf_ref_map = to_ref_map(&lf_map);
272
273        // source=src, target=[dest] でDistribute
274        let mut ingest_origins = HashMap::new();
275        for tf in &active_tfs {
276            // src側のLocationFileがActiveなファイルのみ対象
277            if let Some(lfs) = lf_map.get(tf.id()) {
278                if lfs
279                    .iter()
280                    .any(|lf| lf.location_id() == src && lf.state().is_source_eligible())
281                {
282                    ingest_origins
283                        .entry(tf.id().to_string())
284                        .or_insert_with(HashSet::new)
285                        .insert(src.clone());
286                }
287            }
288        }
289
290        let dist_result = distribute_actions(
291            &active_tf_refs,
292            &lf_ref_map,
293            std::slice::from_ref(dest),
294            &ingest_origins,
295        );
296
297        let distributed = dist_result.actions.len();
298
299        // Route: この場合src→destの直接Transferのみ(optimal_tree不要)
300        let pending_dests = self.collect_pending_dests().await?;
301        let transfers: Vec<PlannedTransfer> = dist_result
302            .actions
303            .iter()
304            .filter_map(|action| {
305                let file_id = action.topology_file_id();
306                let empty = HashSet::new();
307                let pending = pending_dests.get(file_id).unwrap_or(&empty);
308                if pending.contains(dest) {
309                    return None;
310                }
311                Some(PlannedTransfer {
312                    file_id: file_id.to_string(),
313                    src: src.clone(),
314                    dest: dest.clone(),
315                    kind: if action.is_delete() {
316                        TransferKind::Delete
317                    } else {
318                        TransferKind::Sync
319                    },
320                    depends_on_index: None,
321                })
322            })
323            .collect();
324
325        let transfers_created = self.create_transfers(&transfers).await?;
326
327        Ok(TopologySyncResult {
328            scanned: 0,
329            ingested: 0,
330            distributed,
331            transfers_created,
332            conflicts: dist_result.conflicts,
333        })
334    }
335
336    // =========================================================================
337    // File CRUD
338    // =========================================================================
339
340    /// ファイル登録。
341    ///
342    /// TopologyFile + LocationFile(origin) を作成し、
343    /// 全到達可能Locationへの転送を計画する。
344    pub async fn put(
345        &self,
346        relative_path: &str,
347        file_type: FileType,
348        fingerprint: FileFingerprint,
349        origin: &LocationId,
350        embedded_id: Option<String>,
351    ) -> Result<TopologyPutResult, SyncError> {
352        // 既存チェック(path or canonical_hash)
353        let existing = self.topology_files.get_by_path(relative_path).await?;
354
355        let (tf, is_new) = if let Some(mut tf) = existing {
356            // 既存 → canonical_hash昇格 + LocationFile更新
357            tf.promote_canonical_digest(&fingerprint);
358            self.topology_files.upsert(&tf).await?;
359            (tf, false)
360        } else {
361            // 新規作成
362            let mut tf = TopologyFile::new(relative_path.to_string(), file_type)
363                .map_err(SyncError::Domain)?;
364            tf.promote_canonical_digest(&fingerprint);
365            self.topology_files.upsert(&tf).await?;
366            (tf, true)
367        };
368
369        // LocationFile作成/更新
370        let existing_lf = self.location_files.get(tf.id(), origin).await?;
371        match existing_lf {
372            Some(mut lf) => {
373                lf.update_fingerprint(fingerprint.clone(), embedded_id);
374                self.location_files.upsert(&lf).await?;
375            }
376            None => {
377                let lf = tf
378                    .materialize(
379                        origin.clone(),
380                        relative_path.to_string(),
381                        fingerprint.clone(),
382                        embedded_id,
383                    )
384                    .map_err(SyncError::Domain)?;
385                self.location_files.upsert(&lf).await?;
386            }
387        }
388
389        // Transfer計画: origin → 全到達可能Location
390        let mut ingest_origins = HashMap::new();
391        ingest_origins.insert(tf.id().to_string(), HashSet::from([origin.clone()]));
392
393        let lfs = self.location_files.list_by_file(tf.id()).await?;
394        let mut lf_map: HashMap<String, Vec<&LocationFile>> = HashMap::new();
395        lf_map.insert(tf.id().to_string(), lfs.iter().collect());
396
397        let dist_result = distribute_actions(&[&tf], &lf_map, &self.locations, &ingest_origins);
398
399        let pending_dests = self.collect_pending_dests().await?;
400        let sync_actions: Vec<_> = dist_result
401            .actions
402            .iter()
403            .filter(|a| !a.is_delete())
404            .cloned()
405            .collect();
406        let existing_presences: HashMap<String, HashSet<LocationId>> = {
407            let locs: HashSet<LocationId> = lfs.iter().map(|lf| lf.location_id().clone()).collect();
408            let mut m = HashMap::new();
409            m.insert(tf.id().to_string(), locs);
410            m
411        };
412        let planned = plan_distribution(
413            &sync_actions,
414            &self.graph,
415            &pending_dests,
416            &existing_presences,
417        );
418
419        let transfers_created = self.create_transfers(&planned).await?;
420
421        Ok(TopologyPutResult {
422            topology_file_id: tf.id().to_string(),
423            is_new,
424            transfers_created,
425        })
426    }
427
428    /// ファイル取得。
429    pub async fn get(&self, relative_path: &str) -> Result<Option<TopologyFileView>, SyncError> {
430        let tf = match self.topology_files.get_by_path(relative_path).await? {
431            Some(tf) => tf,
432            None => return Ok(None),
433        };
434        let lfs = self.location_files.list_by_file(tf.id()).await?;
435        Ok(Some(TopologyFileView {
436            topology_file: tf,
437            location_files: lfs,
438        }))
439    }
440
441    /// ファイル一覧。
442    pub async fn list(
443        &self,
444        file_type: Option<FileType>,
445        limit: Option<usize>,
446    ) -> Result<Vec<TopologyFileView>, SyncError> {
447        let tfs = self.topology_files.list_active(file_type, limit).await?;
448        let file_ids: Vec<&str> = tfs.iter().map(|tf| tf.id()).collect();
449        let lf_map = self.location_files.list_by_files(&file_ids).await?;
450
451        let views = tfs
452            .into_iter()
453            .map(|tf| {
454                let lfs = lf_map.get(tf.id()).cloned().unwrap_or_default();
455                TopologyFileView {
456                    topology_file: tf,
457                    location_files: lfs,
458                }
459            })
460            .collect();
461        Ok(views)
462    }
463
464    /// ファイル削除。
465    ///
466    /// TopologyFile mark_deleted + LocationFile保持先への Delete Transfer 直接発行。
467    /// Intentベース: distribute/plan を経由せず各destへ直結N件。
468    pub async fn delete(&self, relative_path: &str) -> Result<usize, SyncError> {
469        let mut tf = self
470            .topology_files
471            .get_by_path(relative_path)
472            .await?
473            .ok_or_else(|| SyncError::NotRegistered(relative_path.to_string()))?;
474
475        tf.mark_deleted();
476        self.topology_files.upsert(&tf).await?;
477
478        // LocationFileを持つ各LocationへDelete Transfer直接発行
479        let lfs = self.location_files.list_by_file(tf.id()).await?;
480        let pending_dests = self.collect_pending_dests().await?;
481        let empty = HashSet::new();
482        let pending = pending_dests.get(tf.id()).unwrap_or(&empty);
483
484        let mut created = 0;
485        for lf in &lfs {
486            let dest = lf.location_id().clone();
487            if pending.contains(&dest) {
488                continue;
489            }
490            // src: dest以外の任意Location(Delete実行に物理srcは不要)
491            let src = self
492                .locations
493                .iter()
494                .find(|l| *l != &dest)
495                .cloned()
496                .unwrap_or_else(|| dest.clone());
497            if src == dest {
498                // 単一Location環境 — ローカル削除はTransfer不要
499                continue;
500            }
501            let transfer = Transfer::new_delete(tf.id().to_string(), src, dest)?;
502            self.transfers.insert_transfer(&transfer).await?;
503            created += 1;
504        }
505
506        Ok(created)
507    }
508
509    // =========================================================================
510    // Status
511    // =========================================================================
512
513    /// ファイル数。
514    pub async fn file_count(&self) -> Result<usize, SyncError> {
515        Ok(self.topology_files.count_active().await?)
516    }
517
518    /// Location一覧。
519    pub fn locations(&self) -> &[LocationId] {
520        &self.locations
521    }
522
523    // =========================================================================
524    // Internal: Apply Ingest
525    // =========================================================================
526
527    /// TopologyDelta群をApply: TopologyFile/LocationFile更新。
528    ///
529    /// delta適用順序を正規化する:
530    ///   Renamed(0) → ContentChanged(1) → Discovered(2) → Vanished(3)
531    ///
532    /// Renameが先に処理されることで、Rename先pathとDiscoveredのpath衝突を防ぐ。
533    ///
534    /// 返り値: file_id → ingest origin LocationId集合(Distribute用)。
535    async fn apply_ingest(
536        &self,
537        deltas: &[TopologyDelta],
538    ) -> Result<HashMap<String, HashSet<LocationId>>, SyncError> {
539        let mut ingest_origins: HashMap<String, HashSet<LocationId>> = HashMap::new();
540
541        // delta適用順を正規化: Renamed → ContentChanged → Discovered → Vanished
542        let mut sorted_deltas: Vec<&TopologyDelta> = deltas.iter().collect();
543        sorted_deltas.sort_by_key(|d| match d {
544            TopologyDelta::Renamed(_) => 0,
545            TopologyDelta::ContentChanged(_) => 1,
546            TopologyDelta::Discovered(_) => 2,
547            TopologyDelta::Vanished(_) => 3,
548        });
549
550        let total = sorted_deltas.len();
551        let log_interval = (total / 10).max(1);
552
553        for (i, delta) in sorted_deltas.iter().enumerate() {
554            if i % log_interval == 0 {
555                info!(progress = i, total = total, "apply_ingest: processing");
556            }
557            match delta {
558                TopologyDelta::Discovered(d) => {
559                    // 既存TopologyFileがあれば再利用(複数Locationが同一ファイルを
560                    // Discoveredとして報告するケース)。なければ新規作成。
561                    let existing = self.topology_files.get_by_path(&d.relative_path).await?;
562                    let is_new = existing.is_none();
563                    let mut tf = if let Some(existing) = existing {
564                        trace!(
565                            path = %d.relative_path,
566                            tf_id = %existing.id(),
567                            origin = %d.origin,
568                            "apply_ingest: Discovered — reusing existing TopologyFile"
569                        );
570                        existing
571                    } else {
572                        trace!(
573                            path = %d.relative_path,
574                            origin = %d.origin,
575                            size = d.fingerprint.size,
576                            content_digest = ?d.fingerprint.content_digest,
577                            "apply_ingest: Discovered — creating new TopologyFile"
578                        );
579                        TopologyFile::new(d.relative_path.clone(), d.file_type)
580                            .map_err(SyncError::Domain)?
581                    };
582                    tf.promote_canonical_digest(&d.fingerprint);
583                    self.topology_files.upsert(&tf).await?;
584
585                    let lf = tf
586                        .materialize(
587                            d.origin.clone(),
588                            d.relative_path.clone(),
589                            d.fingerprint.clone(),
590                            d.embedded_id.clone(),
591                        )
592                        .map_err(SyncError::Domain)?;
593                    self.location_files.upsert(&lf).await?;
594
595                    if is_new {
596                        debug!(
597                            path = %d.relative_path,
598                            tf_id = %tf.id(),
599                            origin = %d.origin,
600                            "apply_ingest: NEW file registered"
601                        );
602                    }
603
604                    ingest_origins
605                        .entry(tf.id().to_string())
606                        .or_default()
607                        .insert(d.origin.clone());
608                }
609                TopologyDelta::ContentChanged(c) => {
610                    trace!(
611                        path = %c.relative_path,
612                        tf_id = %c.topology_file_id,
613                        origin = %c.origin,
614                        old_size = c.old_fingerprint.size,
615                        new_size = c.new_fingerprint.size,
616                        "apply_ingest: ContentChanged"
617                    );
618                    // TopologyFile: canonical_hash昇格
619                    if let Some(mut tf) = self.topology_files.get_by_id(&c.topology_file_id).await?
620                    {
621                        tf.promote_canonical_digest(&c.new_fingerprint);
622                        self.topology_files.upsert(&tf).await?;
623                    }
624
625                    // LocationFile: fingerprint更新 or 新規作成
626                    let existing_lf = self
627                        .location_files
628                        .get(&c.topology_file_id, &c.origin)
629                        .await?;
630                    match existing_lf {
631                        Some(mut lf) => {
632                            lf.update_fingerprint(c.new_fingerprint.clone(), c.embedded_id.clone());
633                            self.location_files.upsert(&lf).await?;
634                        }
635                        None => {
636                            if let Some(tf) =
637                                self.topology_files.get_by_id(&c.topology_file_id).await?
638                            {
639                                let lf = tf
640                                    .materialize(
641                                        c.origin.clone(),
642                                        c.relative_path.clone(),
643                                        c.new_fingerprint.clone(),
644                                        c.embedded_id.clone(),
645                                    )
646                                    .map_err(SyncError::Domain)?;
647                                self.location_files.upsert(&lf).await?;
648                            }
649                        }
650                    }
651
652                    // 他LocationのLocationFileをStaleに(cross-location比較で実際に異なるもののみ)
653                    let all_lfs = self
654                        .location_files
655                        .list_by_file(&c.topology_file_id)
656                        .await?;
657                    for stale_lf in
658                        location_file::stale_candidates(&all_lfs, &c.origin, &c.new_fingerprint)
659                    {
660                        let mut lf = stale_lf.clone();
661                        lf.mark_stale();
662                        self.location_files.upsert(&lf).await?;
663                    }
664
665                    ingest_origins
666                        .entry(c.topology_file_id.clone())
667                        .or_default()
668                        .insert(c.origin.clone());
669                }
670                TopologyDelta::Renamed(r) => {
671                    trace!(
672                        tf_id = %r.topology_file_id,
673                        old_path = %r.old_path,
674                        new_path = %r.new_path,
675                        origin = %r.origin,
676                        "apply_ingest: Renamed"
677                    );
678                    if let Some(mut tf) = self.topology_files.get_by_id(&r.topology_file_id).await?
679                    {
680                        tf.update_path(r.new_path.clone());
681                        tf.promote_canonical_digest(&r.fingerprint);
682                        self.topology_files.upsert(&tf).await?;
683                    }
684
685                    // LocationFile: fingerprint更新
686                    let existing_lf = self
687                        .location_files
688                        .get(&r.topology_file_id, &r.origin)
689                        .await?;
690                    match existing_lf {
691                        Some(mut lf) => {
692                            lf.update_fingerprint(r.fingerprint.clone(), r.embedded_id.clone());
693                            self.location_files.upsert(&lf).await?;
694                        }
695                        None => {
696                            if let Some(tf) =
697                                self.topology_files.get_by_id(&r.topology_file_id).await?
698                            {
699                                let lf = tf
700                                    .materialize(
701                                        r.origin.clone(),
702                                        r.new_path.clone(),
703                                        r.fingerprint.clone(),
704                                        r.embedded_id.clone(),
705                                    )
706                                    .map_err(SyncError::Domain)?;
707                                self.location_files.upsert(&lf).await?;
708                            }
709                        }
710                    }
711
712                    ingest_origins
713                        .entry(r.topology_file_id.clone())
714                        .or_default()
715                        .insert(r.origin.clone());
716                }
717                TopologyDelta::Vanished(v) => {
718                    trace!(
719                        path = %v.relative_path,
720                        tf_id = %v.topology_file_id,
721                        origin = %v.origin,
722                        "apply_ingest: Vanished"
723                    );
724                    // LocationFile: mark_missing
725                    let existing_lf = self
726                        .location_files
727                        .get(&v.topology_file_id, &v.origin)
728                        .await?;
729                    if let Some(mut lf) = existing_lf {
730                        lf.mark_missing();
731                        self.location_files.upsert(&lf).await?;
732                    }
733                    // Vanished ではingest_originsに追加しない(消失はsourceにならない)
734                    // NOTE: scan-based delete propagation (Vanished on local → mark_deleted)
735                    // は ByHash誤判定によるpath conflict retire → 大量誤削除の問題があるため
736                    // 撤回。削除は明示 delete() API のみで行う。
737                }
738            }
739        }
740
741        info!(
742            processed = total,
743            origins = ingest_origins.len(),
744            "apply_ingest: done"
745        );
746        Ok(ingest_origins)
747    }
748
749    // =========================================================================
750    // Internal: Transfer作成
751    // =========================================================================
752
753    /// PlannedTransfer群からTransferを作成しDBに書き込む。
754    async fn create_transfers(&self, planned: &[PlannedTransfer]) -> Result<usize, SyncError> {
755        let mut created = 0;
756        // depends_on_index → 実Transfer IDのマッピング
757        let mut transfer_ids: Vec<String> = Vec::with_capacity(planned.len());
758
759        for pt in planned.iter() {
760            trace!(
761                file_id = %pt.file_id,
762                src = %pt.src,
763                dest = %pt.dest,
764                kind = ?pt.kind,
765                depends_on = ?pt.depends_on_index,
766                "create_transfers: creating transfer"
767            );
768            let transfer = if let Some(dep_idx) = pt.depends_on_index {
769                let dep_id = &transfer_ids[dep_idx];
770                Transfer::with_dependency(
771                    pt.file_id.clone(),
772                    pt.src.clone(),
773                    pt.dest.clone(),
774                    pt.kind,
775                    dep_id.clone(),
776                )?
777            } else {
778                Transfer::with_kind(pt.file_id.clone(), pt.src.clone(), pt.dest.clone(), pt.kind)?
779            };
780            self.transfers.insert_transfer(&transfer).await?;
781            transfer_ids.push(transfer.id().to_string());
782            created += 1;
783        }
784
785        trace!(created = created, "create_transfers: done");
786        Ok(created)
787    }
788
789    /// 未完了Transferのdest集合をfile_id別に収集する。
790    async fn collect_pending_dests(
791        &self,
792    ) -> Result<HashMap<String, HashSet<LocationId>>, SyncError> {
793        let pending = self.transfers.all_pending_transfers().await?;
794        let mut map: HashMap<String, HashSet<LocationId>> = HashMap::new();
795        for t in &pending {
796            map.entry(t.file_id().to_string())
797                .or_default()
798                .insert(t.dest().clone());
799        }
800        Ok(map)
801    }
802}
803
804// =============================================================================
805// View types
806// =============================================================================
807
808/// TopologyFileとその全LocationFileを束ねたビュー。
809#[derive(Debug, Clone, serde::Serialize)]
810pub struct TopologyFileView {
811    pub topology_file: TopologyFile,
812    pub location_files: Vec<LocationFile>,
813}
814
815// =============================================================================
816// Internal helpers
817// =============================================================================
818
819/// HashMap<String, Vec<LocationFile>> → HashMap<String, Vec<&LocationFile>>
820fn to_ref_map(map: &HashMap<String, Vec<LocationFile>>) -> HashMap<String, Vec<&LocationFile>> {
821    map.iter()
822        .map(|(k, v)| (k.clone(), v.iter().collect()))
823        .collect()
824}
825
826// =============================================================================
827// Tests — 3フェーズパイプライン設計検証
828// =============================================================================
829
830#[cfg(test)]
831mod tests {
832    use super::*;
833    use async_trait::async_trait;
834    use chrono::{DateTime, Utc};
835    use tokio::sync::Mutex;
836
837    use crate::domain::location_file::LocationFileState;
838    use crate::domain::topology_delta::{ContentChangedFile, DiscoveredFile, VanishedFile};
839    use crate::domain::transfer::TransferState;
840    use crate::infra::error::InfraError;
841    use crate::infra::transfer_store::TransferStatRow;
842
843    // =========================================================================
844    // Mock stores — パイプライン検証に必要な最小実装
845    // =========================================================================
846
847    struct MockTopologyFileStore {
848        files: Mutex<Vec<TopologyFile>>,
849    }
850
851    impl MockTopologyFileStore {
852        fn new() -> Self {
853            Self {
854                files: Mutex::new(Vec::new()),
855            }
856        }
857    }
858
859    #[async_trait]
860    impl TopologyFileStore for MockTopologyFileStore {
861        async fn upsert(&self, file: &TopologyFile) -> Result<(), InfraError> {
862            let mut files = self.files.lock().await;
863            if let Some(pos) = files.iter().position(|f| f.id() == file.id()) {
864                files[pos] = file.clone();
865            } else {
866                files.push(file.clone());
867            }
868            Ok(())
869        }
870
871        async fn get_by_id(&self, id: &str) -> Result<Option<TopologyFile>, InfraError> {
872            Ok(self
873                .files
874                .lock()
875                .await
876                .iter()
877                .find(|f| f.id() == id)
878                .cloned())
879        }
880
881        async fn get_by_path(&self, path: &str) -> Result<Option<TopologyFile>, InfraError> {
882            Ok(self
883                .files
884                .lock()
885                .await
886                .iter()
887                .find(|f| f.relative_path() == path && f.deleted_at().is_none())
888                .cloned())
889        }
890
891        async fn find_by_canonical_hash(
892            &self,
893            hash: &str,
894        ) -> Result<Option<TopologyFile>, InfraError> {
895            Ok(self
896                .files
897                .lock()
898                .await
899                .iter()
900                .find(|f| f.canonical_hash() == Some(hash) && f.deleted_at().is_none())
901                .cloned())
902        }
903
904        async fn list_active(
905            &self,
906            file_type: Option<FileType>,
907            limit: Option<usize>,
908        ) -> Result<Vec<TopologyFile>, InfraError> {
909            let files = self.files.lock().await;
910            let mut result: Vec<_> = files
911                .iter()
912                .filter(|f| f.deleted_at().is_none())
913                .filter(|f| file_type.is_none_or(|ft| f.file_type() == ft))
914                .cloned()
915                .collect();
916            if let Some(n) = limit {
917                result.truncate(n);
918            }
919            Ok(result)
920        }
921
922        async fn list_deleted(&self) -> Result<Vec<TopologyFile>, InfraError> {
923            Ok(self
924                .files
925                .lock()
926                .await
927                .iter()
928                .filter(|f| f.deleted_at().is_some())
929                .cloned()
930                .collect())
931        }
932
933        async fn hard_delete(&self, id: &str) -> Result<bool, InfraError> {
934            let mut files = self.files.lock().await;
935            let len_before = files.len();
936            files.retain(|f| !(f.id() == id && f.deleted_at().is_some()));
937            Ok(files.len() < len_before)
938        }
939
940        async fn count_active(&self) -> Result<usize, InfraError> {
941            Ok(self
942                .files
943                .lock()
944                .await
945                .iter()
946                .filter(|f| f.deleted_at().is_none())
947                .count())
948        }
949
950        async fn list_active_paths(&self) -> Result<Vec<String>, InfraError> {
951            Ok(self
952                .files
953                .lock()
954                .await
955                .iter()
956                .filter(|f| f.deleted_at().is_none())
957                .map(|f| f.relative_path().to_string())
958                .collect())
959        }
960    }
961
962    struct MockLocationFileStore {
963        files: Mutex<Vec<LocationFile>>,
964    }
965
966    impl MockLocationFileStore {
967        fn new() -> Self {
968            Self {
969                files: Mutex::new(Vec::new()),
970            }
971        }
972    }
973
974    #[async_trait]
975    impl LocationFileStore for MockLocationFileStore {
976        async fn upsert(&self, file: &LocationFile) -> Result<(), InfraError> {
977            let mut files = self.files.lock().await;
978            if let Some(pos) = files.iter().position(|f| {
979                f.file_id() == file.file_id() && f.location_id() == file.location_id()
980            }) {
981                files[pos] = file.clone();
982            } else {
983                files.push(file.clone());
984            }
985            Ok(())
986        }
987
988        async fn get(
989            &self,
990            file_id: &str,
991            location_id: &LocationId,
992        ) -> Result<Option<LocationFile>, InfraError> {
993            Ok(self
994                .files
995                .lock()
996                .await
997                .iter()
998                .find(|f| f.file_id() == file_id && f.location_id() == location_id)
999                .cloned())
1000        }
1001
1002        async fn list_by_file(&self, file_id: &str) -> Result<Vec<LocationFile>, InfraError> {
1003            Ok(self
1004                .files
1005                .lock()
1006                .await
1007                .iter()
1008                .filter(|f| f.file_id() == file_id)
1009                .cloned()
1010                .collect())
1011        }
1012
1013        async fn list_by_location(
1014            &self,
1015            location_id: &LocationId,
1016        ) -> Result<Vec<LocationFile>, InfraError> {
1017            Ok(self
1018                .files
1019                .lock()
1020                .await
1021                .iter()
1022                .filter(|f| f.location_id() == location_id)
1023                .cloned()
1024                .collect())
1025        }
1026
1027        async fn list_by_files(
1028            &self,
1029            file_ids: &[&str],
1030        ) -> Result<HashMap<String, Vec<LocationFile>>, InfraError> {
1031            let files = self.files.lock().await;
1032            let mut map: HashMap<String, Vec<LocationFile>> = HashMap::new();
1033            for f in files.iter() {
1034                if file_ids.contains(&f.file_id()) {
1035                    map.entry(f.file_id().to_string())
1036                        .or_default()
1037                        .push(f.clone());
1038                }
1039            }
1040            Ok(map)
1041        }
1042
1043        async fn delete(
1044            &self,
1045            file_id: &str,
1046            location_id: &LocationId,
1047        ) -> Result<bool, InfraError> {
1048            let mut files = self.files.lock().await;
1049            let before = files.len();
1050            files.retain(|f| !(f.file_id() == file_id && f.location_id() == location_id));
1051            Ok(files.len() < before)
1052        }
1053
1054        async fn count_by_location(&self, location_id: &LocationId) -> Result<usize, InfraError> {
1055            Ok(self
1056                .files
1057                .lock()
1058                .await
1059                .iter()
1060                .filter(|f| f.location_id() == location_id)
1061                .count())
1062        }
1063    }
1064
1065    struct MockTransferStore {
1066        transfers: Mutex<Vec<Transfer>>,
1067    }
1068
1069    impl MockTransferStore {
1070        fn new() -> Self {
1071            Self {
1072                transfers: Mutex::new(Vec::new()),
1073            }
1074        }
1075    }
1076
1077    #[async_trait]
1078    impl TransferStore for MockTransferStore {
1079        async fn insert_transfer(&self, transfer: &Transfer) -> Result<(), InfraError> {
1080            self.transfers.lock().await.push(transfer.clone());
1081            Ok(())
1082        }
1083
1084        async fn update_transfer(&self, transfer: &Transfer) -> Result<(), InfraError> {
1085            let mut transfers = self.transfers.lock().await;
1086            if let Some(pos) = transfers.iter().position(|t| t.id() == transfer.id()) {
1087                transfers[pos] = transfer.clone();
1088            }
1089            Ok(())
1090        }
1091
1092        async fn queued_transfers(&self, dest: &LocationId) -> Result<Vec<Transfer>, InfraError> {
1093            Ok(self
1094                .transfers
1095                .lock()
1096                .await
1097                .iter()
1098                .filter(|t| t.dest() == dest && t.state() == TransferState::Queued)
1099                .cloned()
1100                .collect())
1101        }
1102
1103        async fn latest_transfers_by_file(
1104            &self,
1105            file_id: &str,
1106        ) -> Result<Vec<Transfer>, InfraError> {
1107            Ok(self
1108                .transfers
1109                .lock()
1110                .await
1111                .iter()
1112                .filter(|t| t.file_id() == file_id)
1113                .cloned()
1114                .collect())
1115        }
1116
1117        async fn failed_transfers(&self) -> Result<Vec<Transfer>, InfraError> {
1118            Ok(Vec::new())
1119        }
1120
1121        async fn prune_completed(&self, _before: DateTime<Utc>) -> Result<usize, InfraError> {
1122            Ok(0)
1123        }
1124
1125        async fn count_queued(&self) -> Result<usize, InfraError> {
1126            Ok(self
1127                .transfers
1128                .lock()
1129                .await
1130                .iter()
1131                .filter(|t| t.state() == TransferState::Queued)
1132                .count())
1133        }
1134
1135        async fn cancel_orphaned_inflight(&self) -> Result<usize, InfraError> {
1136            Ok(0)
1137        }
1138
1139        async fn unblock_dependents(
1140            &self,
1141            _completed_transfer_id: &str,
1142        ) -> Result<usize, InfraError> {
1143            Ok(0)
1144        }
1145
1146        async fn all_pending_transfers(&self) -> Result<Vec<Transfer>, InfraError> {
1147            Ok(self
1148                .transfers
1149                .lock()
1150                .await
1151                .iter()
1152                .filter(|t| {
1153                    t.state() == TransferState::Queued || t.state() == TransferState::Blocked
1154                })
1155                .cloned()
1156                .collect())
1157        }
1158
1159        async fn transfer_stats(&self) -> Result<Vec<TransferStatRow>, InfraError> {
1160            Ok(Vec::new())
1161        }
1162
1163        async fn present_counts_by_location(
1164            &self,
1165        ) -> Result<HashMap<LocationId, usize>, InfraError> {
1166            Ok(HashMap::new())
1167        }
1168    }
1169
1170    // =========================================================================
1171    // Helpers
1172    // =========================================================================
1173
1174    fn loc(name: &str) -> LocationId {
1175        LocationId::new(name).expect("valid location name")
1176    }
1177
1178    fn fp(hash: &str, size: u64) -> FileFingerprint {
1179        use crate::domain::digest::{ByteDigest, ContentDigest};
1180        FileFingerprint {
1181            byte_digest: Some(ByteDigest::Djb2(hash.to_string())),
1182            content_digest: Some(ContentDigest(hash.to_string())),
1183            meta_digest: None,
1184            size,
1185            modified_at: None,
1186        }
1187    }
1188
1189    /// local ⇄ pod ⇄ cloud の3拠点双方向グラフ。
1190    fn three_loc_setup() -> (RouteGraph, Vec<LocationId>) {
1191        let local = loc("local");
1192        let pod = loc("pod");
1193        let cloud = loc("cloud");
1194        let mut g = RouteGraph::new();
1195        g.add(local.clone(), pod.clone());
1196        g.add(pod.clone(), cloud.clone());
1197        g.add(pod.clone(), local.clone());
1198        g.add(cloud.clone(), pod.clone());
1199        (g, vec![local, pod, cloud])
1200    }
1201
1202    fn make_store(
1203        tf: Arc<MockTopologyFileStore>,
1204        lf: Arc<MockLocationFileStore>,
1205        tr: Arc<MockTransferStore>,
1206    ) -> TopologyStore {
1207        let (graph, locations) = three_loc_setup();
1208        TopologyStore::new(tf, lf, tr, graph, locations)
1209    }
1210
1211    fn discovered(path: &str, hash: &str, origin: &str) -> TopologyDelta {
1212        TopologyDelta::Discovered(DiscoveredFile {
1213            relative_path: path.to_string(),
1214            file_type: FileType::Image,
1215            fingerprint: fp(hash, 1024),
1216            origin: loc(origin),
1217            embedded_id: None,
1218        })
1219    }
1220
1221    // =========================================================================
1222    // Pipeline Test 1: Discovered → Ingest → Distribute → Route
1223    //
1224    // 検証: 新規ファイルがlocalで検出された場合、
1225    //       TopologyFile/LocationFile作成後、pod/cloudへTransferが計画される。
1226    // =========================================================================
1227
1228    #[tokio::test]
1229    async fn pipeline_discovered_creates_topology_and_routes_transfers() {
1230        let tf_s = Arc::new(MockTopologyFileStore::new());
1231        let lf_s = Arc::new(MockLocationFileStore::new());
1232        let tr_s = Arc::new(MockTransferStore::new());
1233        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1234
1235        let result = store
1236            .sync(&[discovered("output/gen-001.png", "abc123", "local")])
1237            .await
1238            .unwrap();
1239
1240        // Phase 1 検証: TopologyFile 1件 + LocationFile(origin=local) 1件
1241        let tfs = tf_s.files.lock().await;
1242        assert_eq!(tfs.len(), 1);
1243        assert_eq!(tfs[0].relative_path(), "output/gen-001.png");
1244        assert!(
1245            tfs[0].canonical_hash().is_some(),
1246            "canonical_hash should be promoted from fingerprint"
1247        );
1248
1249        let lfs = lf_s.files.lock().await;
1250        assert_eq!(lfs.len(), 1);
1251        assert_eq!(lfs[0].location_id(), &loc("local"));
1252        assert_eq!(lfs[0].state(), LocationFileState::Active);
1253
1254        // Phase 2 検証: origin=local以外の2 locations (pod, cloud) へdistribute
1255        assert_eq!(result.ingested, 1);
1256        assert!(result.distributed >= 2, "pod + cloud へのDistributeAction");
1257
1258        // Phase 3 検証: Transfer作成(local→pod→cloudの経路)
1259        let transfers = tr_s.transfers.lock().await;
1260        assert!(
1261            !transfers.is_empty(),
1262            "Transfers should be created for reachable locations"
1263        );
1264        let tf_id = tfs[0].id();
1265        for t in transfers.iter() {
1266            assert_eq!(t.file_id(), tf_id, "All transfers for same file");
1267            assert_ne!(t.src(), t.dest(), "No self-transfer");
1268        }
1269    }
1270
1271    // =========================================================================
1272    // Pipeline Test 2: ContentChanged → Stale化 + 更新Transfer
1273    //
1274    // 検証: podに既存LocationFile(Active)がある状態で、localでコンテンツ変更。
1275    //       pod側がStaleになり、更新Transferが計画される。
1276    // =========================================================================
1277
1278    #[tokio::test]
1279    async fn pipeline_content_changed_stales_others_and_creates_transfers() {
1280        let tf_s = Arc::new(MockTopologyFileStore::new());
1281        let lf_s = Arc::new(MockLocationFileStore::new());
1282        let tr_s = Arc::new(MockTransferStore::new());
1283        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1284
1285        // 初回: localで検出
1286        store
1287            .sync(&[discovered("output/img.png", "v1_hash", "local")])
1288            .await
1289            .unwrap();
1290
1291        let tf_id = tf_s.files.lock().await[0].id().to_string();
1292
1293        // podにLocationFile追加(Transfer完了をシミュレート)
1294        let pod_lf = LocationFile::new(
1295            tf_id.clone(),
1296            loc("pod"),
1297            "output/img.png".to_string(),
1298            fp("v1_hash", 1024),
1299            None,
1300        )
1301        .unwrap();
1302        lf_s.upsert(&pod_lf).await.unwrap();
1303
1304        // 初回Transferをクリア
1305        tr_s.transfers.lock().await.clear();
1306
1307        // localでコンテンツ変更
1308        let delta = TopologyDelta::ContentChanged(ContentChangedFile {
1309            topology_file_id: tf_id.clone(),
1310            relative_path: "output/img.png".to_string(),
1311            file_type: FileType::Image,
1312            old_fingerprint: fp("v1_hash", 1024),
1313            new_fingerprint: fp("v2_hash", 2048),
1314            origin: loc("local"),
1315            embedded_id: None,
1316        });
1317
1318        let result = store.sync(&[delta]).await.unwrap();
1319
1320        // pod側がStale
1321        let pod_lf = lf_s.get(&tf_id, &loc("pod")).await.unwrap().unwrap();
1322        assert_eq!(pod_lf.state(), LocationFileState::Stale);
1323
1324        // local側はActive(更新元)
1325        let local_lf = lf_s.get(&tf_id, &loc("local")).await.unwrap().unwrap();
1326        assert_eq!(local_lf.state(), LocationFileState::Active);
1327
1328        // 更新Transferが作成されている
1329        assert!(result.distributed > 0);
1330        assert!(result.transfers_created > 0);
1331    }
1332
1333    // =========================================================================
1334    // Pipeline Test 3: Vanished → LocationFile Missing化
1335    //
1336    // 検証: localからファイルが消失した場合、LocationFileがMissingに遷移。
1337    //       TopologyFile自体は生存(他Locationに存在し得る)。
1338    // =========================================================================
1339
1340    #[tokio::test]
1341    async fn pipeline_vanished_marks_location_file_missing() {
1342        let tf_s = Arc::new(MockTopologyFileStore::new());
1343        let lf_s = Arc::new(MockLocationFileStore::new());
1344        let tr_s = Arc::new(MockTransferStore::new());
1345        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1346
1347        store
1348            .sync(&[discovered("output/gone.png", "gone_hash", "local")])
1349            .await
1350            .unwrap();
1351
1352        let tf_id = tf_s.files.lock().await[0].id().to_string();
1353
1354        let delta = TopologyDelta::Vanished(VanishedFile {
1355            topology_file_id: tf_id.clone(),
1356            relative_path: "output/gone.png".to_string(),
1357            origin: loc("local"),
1358        });
1359
1360        store.sync(&[delta]).await.unwrap();
1361
1362        // LocationFileがMissing
1363        let lf = lf_s.get(&tf_id, &loc("local")).await.unwrap().unwrap();
1364        assert_eq!(lf.state(), LocationFileState::Missing);
1365
1366        // TopologyFileは削除されていない(他Locationに存在し得る)
1367        let tf = tf_s.files.lock().await;
1368        assert!(tf[0].deleted_at().is_none());
1369    }
1370
1371    // =========================================================================
1372    // Pipeline Test 4: 複数delta一括 → バッチパイプライン
1373    //
1374    // 検証: 3ファイル同時Discovered(origin混在)→ 各ファイルに対するTransfer。
1375    // =========================================================================
1376
1377    #[tokio::test]
1378    async fn pipeline_batch_discovered_multi_origin() {
1379        let tf_s = Arc::new(MockTopologyFileStore::new());
1380        let lf_s = Arc::new(MockLocationFileStore::new());
1381        let tr_s = Arc::new(MockTransferStore::new());
1382        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1383
1384        let deltas = vec![
1385            discovered("a.png", "ha", "local"),
1386            discovered("b.png", "hb", "local"),
1387            discovered("c.png", "hc", "pod"), // pod origin
1388        ];
1389
1390        let result = store.sync(&deltas).await.unwrap();
1391
1392        assert_eq!(result.ingested, 3);
1393        assert_eq!(tf_s.files.lock().await.len(), 3);
1394        assert_eq!(lf_s.files.lock().await.len(), 3);
1395
1396        // 各ファイルにorigin以外のlocationへのDistributeAction
1397        assert!(result.distributed >= 3);
1398        assert!(result.transfers_created >= 3);
1399    }
1400
1401    // =========================================================================
1402    // Pipeline Test 5: put → sync パイプライン整合性
1403    //
1404    // 検証: put()で登録後、sync()の空delta呼び出しが既存データと矛盾しない。
1405    // =========================================================================
1406
1407    #[tokio::test]
1408    async fn pipeline_put_then_empty_sync_is_consistent() {
1409        let tf_s = Arc::new(MockTopologyFileStore::new());
1410        let lf_s = Arc::new(MockLocationFileStore::new());
1411        let tr_s = Arc::new(MockTransferStore::new());
1412        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1413
1414        // put で1ファイル登録
1415        let put_result = store
1416            .put("x.png", FileType::Image, fp("xh", 100), &loc("local"), None)
1417            .await
1418            .unwrap();
1419
1420        assert!(put_result.is_new);
1421        let initial_transfers = tr_s.transfers.lock().await.len();
1422        assert!(initial_transfers > 0);
1423
1424        // 空delta sync — 既にpendingなTransferがあるので重複Transferは作られないはず
1425        let sync_result = store.sync(&[]).await.unwrap();
1426
1427        assert_eq!(sync_result.ingested, 0);
1428        // distribute自体は走るが、pending重複で新規Transferは0(または少数)
1429        // ここではパイプラインがpanicせず完走することが重要
1430    }
1431
1432    // =========================================================================
1433    // Pipeline Test 6: delete → Transfer計画
1434    //
1435    // 検証: ファイル削除後、LocationFileを持つLocationへのDelete Transferが計画される。
1436    // =========================================================================
1437
1438    #[tokio::test]
1439    async fn pipeline_delete_creates_delete_transfers() {
1440        let tf_s = Arc::new(MockTopologyFileStore::new());
1441        let lf_s = Arc::new(MockLocationFileStore::new());
1442        let tr_s = Arc::new(MockTransferStore::new());
1443        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1444
1445        // ファイル登録
1446        store
1447            .put(
1448                "del.png",
1449                FileType::Image,
1450                fp("dh", 100),
1451                &loc("local"),
1452                None,
1453            )
1454            .await
1455            .unwrap();
1456
1457        // podにもLocationFile追加
1458        let tf_id = tf_s.files.lock().await[0].id().to_string();
1459        let pod_lf = LocationFile::new(
1460            tf_id.clone(),
1461            loc("pod"),
1462            "del.png".to_string(),
1463            fp("dh", 100),
1464            None,
1465        )
1466        .unwrap();
1467        lf_s.upsert(&pod_lf).await.unwrap();
1468
1469        // put時のTransferをクリア
1470        tr_s.transfers.lock().await.clear();
1471
1472        // 削除
1473        let delete_count = store.delete("del.png").await.unwrap();
1474
1475        // TopologyFileがdeleted
1476        let tf = tf_s.files.lock().await;
1477        assert!(tf[0].deleted_at().is_some());
1478
1479        // LocationFile保持先(local, pod)へDelete Transfer直接発行
1480        // local: src=pod, dest=local / pod: src=local, dest=pod
1481        assert_eq!(delete_count, 2, "Delete Transfer for local + pod");
1482        let transfers = tr_s.transfers.lock().await;
1483        assert_eq!(transfers.len(), 2);
1484        for t in transfers.iter() {
1485            assert!(t.is_delete(), "All should be Delete kind");
1486            assert_ne!(t.src(), t.dest(), "No self-transfer");
1487        }
1488    }
1489}