Skip to main content

vdsl_sync/application/
sdk_impl.rs

1//! SdkImpl — SyncStoreSdk の本実装。
2//!
3//! scan→delta→plan→execute の全パイプラインを内部完結させる。
4//! インターフェース層(MCP, Lua)は `Arc<dyn SyncStoreSdk>` 経由でのみ使用する。
5//!
6//! # 構成
7//!
8//! ```text
9//! SdkImpl
10//!   ├── scanner: TopologyScanner  — scan → TopologyDelta[]
11//!   ├── topology: TopologyStore   — Apply → Distribute → Route → Transfer作成
12//!   ├── engine: TransferEngine    — Transfer実行
13//!   ├── transfer_store            — Transfer永続化(execute時に必要)
14//!   ├── topology_files            — TopologyFile参照(execute時に必要)
15//!   ├── config: SyncConfig        — リトライ/並行数
16//!   └── scan_excludes             — globパターン
17//! ```
18
19use std::path::Path;
20use std::sync::{Arc, Mutex as StdMutex};
21
22use async_trait::async_trait;
23use tracing::{debug, error, info, trace, warn};
24
25use super::route::{TransferDirection, TransferRoute};
26use super::sdk::{PutReport, SyncReport, SyncReportError, SyncStoreSdk};
27use super::topology_scanner::TopologyScanner;
28use super::topology_store::{TopologyFileView, TopologyStore};
29use super::transfer_engine::{PreparedTransfer, TransferEngine, TransferOutcome};
30use crate::application::error::SyncError;
31use crate::domain::config::SyncConfig;
32use crate::domain::file_type::FileType;
33use crate::domain::fingerprint::FileFingerprint;
34use crate::domain::graph::{EdgeCost, RouteGraph};
35use crate::domain::location::{LocationId, SyncSummary};
36use crate::domain::transfer::TransferState;
37use crate::domain::view::{ErrorEntry, PendingEntry, PresenceState};
38use crate::infra::backend::{ProgressFn, StorageBackend};
39use crate::infra::location::{Location, LocationKind};
40use crate::infra::location_file_store::LocationFileStore;
41use crate::infra::location_scanner::LocationScanner;
42use crate::infra::shell::RemoteShell;
43use crate::infra::topology_file_store::TopologyFileStore;
44use crate::infra::transfer_store::TransferStore;
45
/// Production implementation of `SyncStoreSdk`.
///
/// Runs the scan → delta → plan → execute pipeline end to end.
/// Interface layers hold it as `Arc<dyn SyncStoreSdk>`.
pub struct SdkImpl {
    /// Scans locations and produces topology deltas.
    scanner: TopologyScanner,
    /// Plans transfers from deltas (apply → distribute → route → create transfers).
    topology: TopologyStore,
    /// Executes prepared transfers over the configured routes.
    engine: TransferEngine,
    /// TopologyFile lookup; used at execute time to resolve relative paths.
    topology_files: Arc<dyn TopologyFileStore>,
    /// LocationFile records; updated after each completed transfer.
    location_files: Arc<dyn LocationFileStore>,
    /// Transfer persistence; read and updated at execute time.
    transfer_store: Arc<dyn TransferStore>,
    /// Registered locations; each one is checked with `ensure()` at the start of `sync`.
    locations: Vec<Arc<dyn Location>>,
    /// Sync configuration (retry / concurrency).
    config: SyncConfig,
    /// Glob patterns excluded from scanning.
    scan_excludes: Vec<glob::Pattern>,
    /// Progress callback for reporting phase/chunk progress.
    progress: StdMutex<Option<ProgressFn>>,
}
63
64// =============================================================================
65// SdkImplBuilder — 外部crateからの構築用
66// =============================================================================
67
/// Intermediate representation of a route connection.
///
/// Registered by the `connect*()` builder methods and converted into a
/// `TransferRoute` in `build()`, using each Location's `file_root()`.
/// The route cost is estimated from the pair of `LocationKind`s.
struct PendingRoute {
    /// Source location of the route.
    src: LocationId,
    /// Destination location of the route.
    dest: LocationId,
    /// Storage backend that performs the transfer.
    backend: Box<dyn StorageBackend>,
    /// Optional shell on the source side (set by `connect_with_shell`).
    src_shell: Option<Box<dyn RemoteShell>>,
    /// Whether the route pushes or pulls.
    direction: TransferDirection,
}
79
/// Builder for `SdkImpl`.
///
/// Register locations with `location()` and declare routes with `connect()`.
/// Scanners are derived from the registered locations, and route costs are
/// estimated from the combination of `LocationKind`s.
///
/// # Example
///
/// ```ignore
/// let sdk = SdkImplBuilder::new(topology_files, location_files, transfer_store)
///     .location(Arc::new(LocalLocation::new(root, hasher)))
///     .location(Arc::new(SshLocation::new(pod_id, pod_root, shell)))
///     .location(Arc::new(CloudLocation::new(cloud_id, cloud_root, backend)))
///     .connect(&local_id, &cloud_id, rclone_backend)
///     .connect_with_shell(&pod_id, &cloud_id, pod_rclone, pod_shell)
///     .connect_pull(&cloud_id, &local_id, rclone_pull)
///     .exclude(".git")
///     .build()?;
/// ```
pub struct SdkImplBuilder {
    /// TopologyFile store handed to the built `SdkImpl`.
    topology_files: Arc<dyn TopologyFileStore>,
    /// LocationFile store handed to the built `SdkImpl`.
    location_files: Arc<dyn LocationFileStore>,
    /// Transfer store handed to the built `SdkImpl`.
    transfer_store: Arc<dyn TransferStore>,
    /// Locations registered so far (unique by `LocationId`).
    locations: Vec<Arc<dyn Location>>,
    /// Routes declared so far; resolved into `TransferRoute`s by `build()`.
    pending_routes: Vec<PendingRoute>,
    /// Optional configuration; `build()` falls back to `SyncConfig::default()`.
    config: Option<SyncConfig>,
    /// Compiled scan-exclusion glob patterns.
    scan_excludes: Vec<glob::Pattern>,
}
108
109impl SdkImplBuilder {
110    /// 必須3ストアでビルダーを作成。
111    pub fn new(
112        topology_files: Arc<dyn TopologyFileStore>,
113        location_files: Arc<dyn LocationFileStore>,
114        transfer_store: Arc<dyn TransferStore>,
115    ) -> Self {
116        Self {
117            topology_files,
118            location_files,
119            transfer_store,
120            locations: Vec::new(),
121            pending_routes: Vec::new(),
122            config: None,
123            scan_excludes: Vec::new(),
124        }
125    }
126
127    /// Location(拠点)追加。
128    ///
129    /// Location trait 実装からスキャナーが自動導出される。
130    /// 同じLocationIdの二重登録は無視される。
131    pub fn location(mut self, loc: Arc<dyn Location>) -> Self {
132        if !self.locations.iter().any(|l| l.id() == loc.id()) {
133            self.locations.push(loc);
134        }
135        self
136    }
137
138    /// Push方向のルート接続を宣言。
139    ///
140    /// `src` → `dest` への転送ルートを登録する。
141    /// `file_root` は `build()` 時に Location から自動解決される。
142    /// コストは `LocationKind` の組み合わせから自動推定される。
143    pub fn connect(
144        mut self,
145        src: &LocationId,
146        dest: &LocationId,
147        backend: Box<dyn StorageBackend>,
148    ) -> Self {
149        self.pending_routes.push(PendingRoute {
150            src: src.clone(),
151            dest: dest.clone(),
152            backend,
153            src_shell: None,
154            direction: TransferDirection::Push,
155        });
156        self
157    }
158
159    /// Push方向 + ソース側Shell付きのルート接続。
160    ///
161    /// リモートホスト(Pod等)がソースの場合、ソース側でのファイル操作
162    /// (存在確認、ハッシュ計算)に `src_shell` を使用する。
163    pub fn connect_with_shell(
164        mut self,
165        src: &LocationId,
166        dest: &LocationId,
167        backend: Box<dyn StorageBackend>,
168        src_shell: Box<dyn RemoteShell>,
169    ) -> Self {
170        self.pending_routes.push(PendingRoute {
171            src: src.clone(),
172            dest: dest.clone(),
173            backend,
174            src_shell: Some(src_shell),
175            direction: TransferDirection::Push,
176        });
177        self
178    }
179
180    /// Pull方向のルート接続。
181    ///
182    /// Cloud → Local, Cloud → Pod のように、リモートからローカル方向への
183    /// 転送ルートを登録する。`backend.pull()` が使用される。
184    pub fn connect_pull(
185        mut self,
186        src: &LocationId,
187        dest: &LocationId,
188        backend: Box<dyn StorageBackend>,
189    ) -> Self {
190        self.pending_routes.push(PendingRoute {
191            src: src.clone(),
192            dest: dest.clone(),
193            backend,
194            src_shell: None,
195            direction: TransferDirection::Pull,
196        });
197        self
198    }
199
200    /// SyncConfig設定。
201    pub fn config(mut self, config: SyncConfig) -> Self {
202        self.config = Some(config);
203        self
204    }
205
206    /// スキャン除外パターン追加。
207    pub fn exclude(mut self, pattern: &str) -> Self {
208        match glob::Pattern::new(pattern) {
209            Ok(p) => self.scan_excludes.push(p),
210            Err(e) => {
211                tracing::warn!(pattern = pattern, error = %e, "invalid exclude glob pattern, skipped");
212            }
213        }
214        self
215    }
216
217    /// SdkImplを構築。
218    ///
219    /// 1. Location から Scanner を自動導出
220    /// 2. PendingRoute → TransferRoute に変換(file_root 自動解決 + コスト自動推定)
221    /// 3. TransferRoute から RouteGraph + TransferEngine を構築
222    /// 4. TopologyStore + TopologyScanner を構築
223    pub fn build(self) -> Result<SdkImpl, SyncError> {
224        use std::collections::HashMap;
225
226        let config = self.config.unwrap_or_default();
227
228        // Location map: LocationId → Arc<dyn Location>
229        let loc_map: HashMap<LocationId, &Arc<dyn Location>> =
230            self.locations.iter().map(|l| (l.id().clone(), l)).collect();
231
232        // Scanner 導出
233        let scanners: Vec<Arc<dyn LocationScanner>> =
234            self.locations.iter().map(|loc| loc.scanner()).collect();
235
236        // PendingRoute → TransferRoute 変換
237        let routes: Vec<TransferRoute> = self
238            .pending_routes
239            .into_iter()
240            .filter_map(|pr| {
241                let src_loc = loc_map.get(&pr.src)?;
242                let dest_loc = loc_map.get(&pr.dest)?;
243
244                let cost = match estimate_route_cost(src_loc.kind(), dest_loc.kind()) {
245                    Ok(c) => c,
246                    Err(e) => {
247                        tracing::warn!(src = ?src_loc.kind(), dest = ?dest_loc.kind(), error = %e, "skipping route: invalid cost");
248                        return None;
249                    }
250                };
251
252                let mut route = TransferRoute::new(
253                    pr.src,
254                    pr.dest,
255                    src_loc.file_root().to_path_buf(),
256                    dest_loc.file_root().to_path_buf(),
257                    pr.backend,
258                )
259                .with_cost(cost.time_per_gb, cost.priority);
260
261                if pr.direction == TransferDirection::Pull {
262                    route = route.direction(TransferDirection::Pull);
263                }
264                if let Some(shell) = pr.src_shell {
265                    route = route.with_src_shell(shell);
266                }
267
268                Some(route)
269            })
270            .collect();
271
272        // Location一覧
273        let location_ids: Vec<LocationId> =
274            self.locations.iter().map(|loc| loc.id().clone()).collect();
275
276        // RouteGraph構築(1回のみ。TopologyStore / TransferEngine で共有)
277        let mut graph = RouteGraph::new();
278        for r in &routes {
279            graph.add_with_cost(
280                r.src().clone(),
281                r.dest().clone(),
282                EdgeCost::new(r.time_per_gb(), r.priority())?,
283            );
284        }
285
286        let topology = TopologyStore::new(
287            self.topology_files.clone(),
288            self.location_files.clone(),
289            self.transfer_store.clone(),
290            graph.clone(),
291            location_ids,
292        );
293
294        let engine = TransferEngine::new(graph, routes, config.concurrency);
295
296        let scanner = TopologyScanner::new(
297            self.topology_files.clone(),
298            self.location_files.clone(),
299            scanners,
300        );
301
302        Ok(SdkImpl {
303            scanner,
304            topology,
305            engine,
306            topology_files: self.topology_files,
307            location_files: self.location_files,
308            transfer_store: self.transfer_store,
309            locations: self.locations,
310            config,
311            scan_excludes: self.scan_excludes,
312            progress: StdMutex::new(None),
313        })
314    }
315}
316
317/// LocationKind の組み合わせからルートコストを自動推定する。
318///
319/// optimal_tree(Dijkstra)がこのコストで最適経路を計算する。
320/// 例: Local→Pod(1.0) + Pod→Cloud(2.0) = 3.0 < Local→Cloud(5.0)
321/// → Pod経由チェーンが自動的に選択される。MCP層での条件分岐は不要。
322fn estimate_route_cost(
323    src: LocationKind,
324    dest: LocationKind,
325) -> Result<EdgeCost, crate::domain::error::DomainError> {
326    let (time_per_gb, priority) = match (src, dest) {
327        // LAN/SSH: 低コスト(ローカルネットワーク、低レイテンシ)
328        (LocationKind::Local, LocationKind::Remote) => (1.0, 10),
329        (LocationKind::Remote, LocationKind::Local) => (1.0, 10),
330
331        // DC帯域: 中コスト(データセンター内 or DC→Cloud)
332        (LocationKind::Remote, LocationKind::Cloud) => (2.0, 50),
333        (LocationKind::Cloud, LocationKind::Remote) => (2.0, 50),
334
335        // WAN: 高コスト(家庭回線アップロード/ダウンロード)
336        (LocationKind::Local, LocationKind::Cloud) => (5.0, 100),
337        (LocationKind::Cloud, LocationKind::Local) => (5.0, 100),
338
339        // 同種間: 中立(通常は発生しないが安全なフォールバック)
340        _ => (1.0, 100),
341    };
342    EdgeCost::new(time_per_gb, priority)
343}
344
345impl SdkImpl {
346    /// Report progress via the stored callback (if set).
347    fn report_progress(&self, msg: &str) {
348        if let Ok(guard) = self.progress.lock() {
349            if let Some(cb) = guard.as_ref() {
350                cb(msg);
351            }
352        }
353    }
354
355    /// BFS順でTransfer実行 + DB永続化。
356    ///
357    /// Engine.execute_prepared()で純粋なroute I/Oを実行し、
358    /// 結果をtransfer_storeに永続化 + unblock_dependentsでチェーン転送を解放する。
359    async fn execute_bfs(
360        &self,
361        skip_locations: &std::collections::HashSet<crate::domain::location::LocationId>,
362    ) -> Result<(usize, usize, Vec<SyncReportError>), SyncError> {
363        let mut total_transferred = 0usize;
364        let mut total_failed = 0usize;
365        let mut all_errors: Vec<SyncReportError> = Vec::new();
366
367        let targets = self.engine.all_targets_ordered();
368        debug!(targets = ?targets, "execute_bfs: BFS target order");
369
370        for target in &targets {
371            if skip_locations.contains(target) {
372                warn!(
373                    target = %target,
374                    "execute_bfs: skipping target (ensure failed)"
375                );
376                continue;
377            }
378            let queued = self.transfer_store.queued_transfers(target).await?;
379            if queued.is_empty() {
380                debug!(target = %target, "execute_bfs: no queued transfers, skip");
381                continue;
382            }
383            info!(target = %target, queued = queued.len(), "execute_bfs: processing target");
384            self.report_progress(&format!("target {target}: {} queued", queued.len()));
385
386            // Prepare: resolve relative_path from topology_files
387            let mut prepared = Vec::with_capacity(queued.len());
388            let mut resolve_miss = 0usize;
389            for transfer in queued {
390                match self.topology_files.get_by_id(transfer.file_id()).await {
391                    Ok(Some(file)) => {
392                        trace!(
393                            file_id = %transfer.file_id(),
394                            path = %file.relative_path(),
395                            src = %transfer.src(),
396                            dest = %transfer.dest(),
397                            "execute_bfs: prepared"
398                        );
399                        prepared.push(PreparedTransfer {
400                            transfer,
401                            relative_path: file.relative_path().to_string(),
402                        });
403                    }
404                    Ok(None) => {
405                        resolve_miss += 1;
406                        error!(
407                            file_id = %transfer.file_id(),
408                            src = %transfer.src(),
409                            dest = %transfer.dest(),
410                            "execute_bfs: topology_file not found — transfer skipped"
411                        );
412                        total_failed += 1;
413                        all_errors.push(SyncReportError {
414                            path: transfer.file_id().to_string(),
415                            error: format!("file {} not found in store", transfer.file_id()),
416                        });
417                    }
418                    Err(e) => {
419                        resolve_miss += 1;
420                        error!(
421                            file_id = %transfer.file_id(),
422                            src = %transfer.src(),
423                            dest = %transfer.dest(),
424                            err = %e,
425                            "execute_bfs: topology_file lookup error — transfer skipped"
426                        );
427                        total_failed += 1;
428                        all_errors.push(SyncReportError {
429                            path: transfer.file_id().to_string(),
430                            error: e.to_string(),
431                        });
432                    }
433                }
434            }
435            // Partition: sync / delete を分離して段階実行
436            // sync完了→DB永続化→delete実行→DB永続化 の2段階。
437            // delete がハング/失敗しても sync 結果がDBに反映される。
438            let (sync_prepared, delete_prepared): (Vec<_>, Vec<_>) =
439                prepared.into_iter().partition(|p| !p.transfer.is_delete());
440
441            debug!(
442                target = %target,
443                sync = sync_prepared.len(),
444                delete = delete_prepared.len(),
445                resolve_miss = resolve_miss,
446                "execute_bfs: preparation done"
447            );
448
449            // Phase A: Sync transfers → execute → DB persist
450            if !sync_prepared.is_empty() {
451                info!(
452                    target = %target,
453                    count = sync_prepared.len(),
454                    "execute_bfs: executing sync transfers"
455                );
456                let sync_outcomes = self.engine.execute_prepared(sync_prepared).await;
457                info!(
458                    target = %target,
459                    outcomes = sync_outcomes.len(),
460                    "execute_bfs: sync execution done, persisting"
461                );
462                self.persist_outcomes(
463                    &sync_outcomes,
464                    &mut total_transferred,
465                    &mut total_failed,
466                    &mut all_errors,
467                )
468                .await?;
469            }
470
471            // Phase B: Delete transfers → execute → DB persist
472            if !delete_prepared.is_empty() {
473                info!(
474                    target = %target,
475                    count = delete_prepared.len(),
476                    "execute_bfs: executing delete transfers"
477                );
478                let delete_outcomes = self.engine.execute_prepared(delete_prepared).await;
479                info!(
480                    target = %target,
481                    outcomes = delete_outcomes.len(),
482                    "execute_bfs: delete execution done, persisting"
483                );
484                self.persist_outcomes(
485                    &delete_outcomes,
486                    &mut total_transferred,
487                    &mut total_failed,
488                    &mut all_errors,
489                )
490                .await?;
491            }
492
493            info!(
494                target = %target,
495                transferred = total_transferred,
496                failed = total_failed,
497                "execute_bfs: target batch done"
498            );
499        }
500
501        Ok((total_transferred, total_failed, all_errors))
502    }
503
504    /// TransferOutcome群をDB永続化する共通ヘルパー。
505    ///
506    /// sync/delete の2段階実行で共通化するために抽出。
507    async fn persist_outcomes(
508        &self,
509        outcomes: &[TransferOutcome],
510        total_transferred: &mut usize,
511        total_failed: &mut usize,
512        all_errors: &mut Vec<SyncReportError>,
513    ) -> Result<(), SyncError> {
514        for outcome in outcomes {
515            let is_completed = outcome.transfer.state() == TransferState::Completed;
516            self.transfer_store
517                .update_transfer(&outcome.transfer)
518                .await?;
519
520            if is_completed {
521                self.transfer_store
522                    .unblock_dependents(outcome.transfer.id())
523                    .await?;
524
525                if outcome.transfer.is_delete() {
526                    // Delete完了 = dest側にファイルが存在しない → LocationFile削除
527                    let deleted = self
528                        .location_files
529                        .delete(outcome.transfer.file_id(), outcome.transfer.dest())
530                        .await?;
531                    trace!(
532                        file_id = %outcome.transfer.file_id(),
533                        dest = %outcome.transfer.dest(),
534                        deleted = deleted,
535                        "execute_bfs: delete transfer → LocationFile removed"
536                    );
537                } else {
538                    // Sync完了 = dest側にファイルが存在 → LocationFile作成
539                    if let Ok(Some(tf)) = self
540                        .topology_files
541                        .get_by_id(outcome.transfer.file_id())
542                        .await
543                    {
544                        let src_lf = self
545                            .location_files
546                            .get(outcome.transfer.file_id(), outcome.transfer.src())
547                            .await?;
548                        if let Some(src_lf) = src_lf {
549                            trace!(
550                                file_id = %outcome.transfer.file_id(),
551                                src = %outcome.transfer.src(),
552                                dest = %outcome.transfer.dest(),
553                                path = %outcome.relative_path,
554                                "persist_outcomes: creating dest LocationFile from src"
555                            );
556                            let dest_lf = tf
557                                .materialize(
558                                    outcome.transfer.dest().clone(),
559                                    outcome.relative_path.clone(),
560                                    src_lf.fingerprint().clone(),
561                                    src_lf.embedded_id().map(|s| s.to_string()),
562                                )
563                                .map_err(SyncError::Domain)?;
564                            self.location_files.upsert(&dest_lf).await?;
565                        } else {
566                            warn!(
567                                file_id = %outcome.transfer.file_id(),
568                                src = %outcome.transfer.src(),
569                                "persist_outcomes: src LocationFile not found, cannot create dest LF"
570                            );
571                        }
572                    } else {
573                        warn!(
574                            file_id = %outcome.transfer.file_id(),
575                            "persist_outcomes: TopologyFile not found for completed transfer"
576                        );
577                    }
578                }
579
580                *total_transferred += 1;
581                info!(
582                    id = %outcome.transfer.id(),
583                    src = %outcome.transfer.src(),
584                    dest = %outcome.transfer.dest(),
585                    path = %outcome.relative_path,
586                    kind = ?outcome.transfer.kind(),
587                    "execute_bfs: transfer completed"
588                );
589            } else {
590                *total_failed += 1;
591                let err_msg = outcome
592                    .transfer
593                    .error()
594                    .map(|e| e.to_string())
595                    .unwrap_or_else(|| "unknown error".to_string());
596                error!(
597                    id = %outcome.transfer.id(),
598                    src = %outcome.transfer.src(),
599                    dest = %outcome.transfer.dest(),
600                    path = %outcome.relative_path,
601                    err = %err_msg,
602                    "execute_bfs: transfer FAILED"
603                );
604                all_errors.push(SyncReportError {
605                    path: outcome.relative_path.clone(),
606                    error: err_msg,
607                });
608            }
609        }
610        Ok(())
611    }
612}
613
614#[async_trait]
615impl SyncStoreSdk for SdkImpl {
616    // =========================================================================
617    // UseCase — 同期操作
618    // =========================================================================
619
620    async fn sync(&self) -> Result<SyncReport, SyncError> {
621        info!("sdk_impl::sync: pipeline start");
622        self.report_progress("ensure: checking locations");
623
624        // Phase 0a: Ensure — 全拠点の到達確認 + 外部ツール確保
625        // 失敗したLocationはスキャン/転送対象から除外し、syncは続行する。
626        let location_ids: Vec<String> = self.locations.iter().map(|l| l.id().to_string()).collect();
627        info!(
628            location_count = self.locations.len(),
629            locations = %location_ids.join(", "),
630            "sdk_impl::sync: ensure start"
631        );
632        let mut failed_locations: std::collections::HashSet<LocationId> =
633            std::collections::HashSet::new();
634        for loc in &self.locations {
635            info!(
636                location = %loc.id(),
637                kind = ?loc.kind(),
638                "sdk_impl::sync: ensure checking"
639            );
640            match loc.ensure().await {
641                Ok(()) => {
642                    info!(location = %loc.id(), "sdk_impl::sync: ensure ok");
643                }
644                Err(e) => {
645                    error!(
646                        location = %loc.id(),
647                        kind = ?loc.kind(),
648                        error = %e,
649                        "sdk_impl::sync: ensure FAILED — this location will be excluded from sync"
650                    );
651                    failed_locations.insert(loc.id().clone());
652                }
653            }
654        }
655        if failed_locations.is_empty() {
656            info!("sdk_impl::sync: ensure done — all locations reachable");
657        } else {
658            let excluded: Vec<String> = failed_locations.iter().map(|l| l.to_string()).collect();
659            warn!(
660                excluded = %excluded.join(", "),
661                "sdk_impl::sync: ensure done — {} location(s) excluded due to ensure failure",
662                failed_locations.len()
663            );
664        }
665
666        // Phase 0b: InFlight孤児の終端化(プロセスクラッシュ復帰)
667        let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
668        if cancelled > 0 {
669            info!(
670                cancelled_count = cancelled,
671                "sdk_impl::sync: cancelled orphaned InFlight transfers"
672            );
673        }
674
675        // Phase 1: Scan → TopologyDelta[]
676        self.report_progress("scan: scanning locations");
677        info!("sdk_impl::sync: phase1 scan start");
678        let progress_cb = self.progress.lock().ok().and_then(|g| g.clone());
679        let scan_result = self
680            .scanner
681            .scan_all(&self.scan_excludes, &failed_locations, progress_cb.as_ref())
682            .await?;
683        info!(
684            scanned = scan_result.scanned,
685            deltas = scan_result.deltas.len(),
686            scan_errors = scan_result.scan_errors.len(),
687            "sdk_impl::sync: phase1 scan done"
688        );
689        // delta詳細をtrace出力
690        for delta in &scan_result.deltas {
691            trace!(delta = ?delta, "sdk_impl::sync: delta");
692        }
693
694        // Phase 2: Plan — Apply → Distribute → Route → Transfer作成
695        self.report_progress(&format!(
696            "plan: {} files scanned, {} deltas",
697            scan_result.scanned,
698            scan_result.deltas.len()
699        ));
700        info!(
701            delta_count = scan_result.deltas.len(),
702            "sdk_impl::sync: phase2 plan start"
703        );
704        let plan_result = self.topology.sync(&scan_result.deltas).await?;
705        info!(
706            transfers_created = plan_result.transfers_created,
707            conflicts = plan_result.conflicts.len(),
708            "sdk_impl::sync: phase2 plan done"
709        );
710
711        // Phase 3: Execute — BFS順でTransfer実行 + DB永続化
712        // Propagate progress callback to all route backends for chunk-level reporting.
713        if let Ok(guard) = self.progress.lock() {
714            self.engine.set_progress_callback(guard.clone());
715        }
716        self.report_progress(&format!(
717            "execute: {} transfers queued",
718            plan_result.transfers_created
719        ));
720        info!("sdk_impl::sync: phase3 execute start");
721        let (transferred, failed, errors) = self.execute_bfs(&failed_locations).await?;
722        // Clear backend callbacks after execution.
723        self.engine.set_progress_callback(None);
724        info!(
725            transferred = transferred,
726            failed = failed,
727            error_count = errors.len(),
728            "sdk_impl::sync: phase3 execute done"
729        );
730
731        Ok(SyncReport {
732            scanned: scan_result.scanned,
733            scan_errors: scan_result
734                .scan_errors
735                .iter()
736                .map(|e| SyncReportError {
737                    path: e.path.clone(),
738                    error: e.error.clone(),
739                })
740                .collect(),
741            transfers_created: plan_result.transfers_created,
742            transferred,
743            failed,
744            errors,
745            conflicts: plan_result
746                .conflicts
747                .iter()
748                .map(super::sdk::SyncReportConflict::from)
749                .collect(),
750        })
751    }
752
753    async fn sync_route(
754        &self,
755        src: &LocationId,
756        dest: &LocationId,
757    ) -> Result<SyncReport, SyncError> {
758        // Phase 0: InFlight孤児の終端化
759        let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
760        if cancelled > 0 {
761            info!(
762                cancelled_count = cancelled,
763                "sync_route: cancelled orphaned InFlight transfers"
764            );
765        }
766
767        // Phase 1: Plan — sync_routeはdelta生成なし、Distribute + Route のみ
768        self.report_progress(&format!("plan: route {src} → {dest}"));
769        let plan_result = self.topology.sync_route(src, dest).await?;
770
771        // Phase 2: Execute — dest宛のQueued Transferをsrcでフィルタして実行
772        // Propagate progress callback to all route backends.
773        if let Ok(guard) = self.progress.lock() {
774            self.engine.set_progress_callback(guard.clone());
775        }
776        let queued = self.transfer_store.queued_transfers(dest).await?;
777        let eligible: Vec<_> = queued.into_iter().filter(|t| t.src() == src).collect();
778
779        let mut prepared = Vec::with_capacity(eligible.len());
780        let mut total_failed = 0usize;
781        let mut all_errors: Vec<SyncReportError> = Vec::new();
782
783        for transfer in eligible {
784            match self.topology_files.get_by_id(transfer.file_id()).await {
785                Ok(Some(file)) => {
786                    prepared.push(PreparedTransfer {
787                        transfer,
788                        relative_path: file.relative_path().to_string(),
789                    });
790                }
791                Ok(None) => {
792                    total_failed += 1;
793                    all_errors.push(SyncReportError {
794                        path: transfer.file_id().to_string(),
795                        error: format!("file {} not found in store", transfer.file_id()),
796                    });
797                }
798                Err(e) => {
799                    total_failed += 1;
800                    all_errors.push(SyncReportError {
801                        path: transfer.file_id().to_string(),
802                        error: e.to_string(),
803                    });
804                }
805            }
806        }
807
808        self.report_progress(&format!(
809            "execute: {} transfers ({src} → {dest})",
810            prepared.len()
811        ));
812        let outcomes = self.engine.execute_prepared(prepared).await;
813        // Clear backend callbacks after execution.
814        self.engine.set_progress_callback(None);
815        let mut total_transferred = 0usize;
816
817        self.persist_outcomes(
818            &outcomes,
819            &mut total_transferred,
820            &mut total_failed,
821            &mut all_errors,
822        )
823        .await?;
824
825        Ok(SyncReport {
826            scanned: 0,
827            scan_errors: Vec::new(),
828            transfers_created: plan_result.transfers_created,
829            transferred: total_transferred,
830            failed: total_failed,
831            errors: all_errors,
832            conflicts: plan_result
833                .conflicts
834                .iter()
835                .map(super::sdk::SyncReportConflict::from)
836                .collect(),
837        })
838    }
839
840    // =========================================================================
841    // Command — ファイル操作
842    // =========================================================================
843
844    async fn put(
845        &self,
846        path: &str,
847        file_type: FileType,
848        fingerprint: FileFingerprint,
849        origin: &LocationId,
850        embedded_id: Option<String>,
851    ) -> Result<PutReport, SyncError> {
852        let result = self
853            .topology
854            .put(path, file_type, fingerprint, origin, embedded_id)
855            .await?;
856        Ok(PutReport {
857            file_id: result.topology_file_id,
858            is_new: result.is_new,
859            transfers_created: result.transfers_created,
860        })
861    }
862
863    async fn delete(&self, path: &str) -> Result<usize, SyncError> {
864        self.topology.delete(path).await
865    }
866
867    // =========================================================================
868    // Query — 読み取り
869    // =========================================================================
870
871    async fn get(&self, path: &str) -> Result<Option<TopologyFileView>, SyncError> {
872        self.topology.get(path).await
873    }
874
875    async fn list(
876        &self,
877        file_type: Option<FileType>,
878        limit: Option<usize>,
879    ) -> Result<Vec<TopologyFileView>, SyncError> {
880        self.topology.list(file_type, limit).await
881    }
882
883    async fn status(&self) -> Result<SyncSummary, SyncError> {
884        use crate::domain::location::LocationSummary;
885        use crate::domain::transfer::TransferState;
886        use std::collections::HashMap;
887
888        let retry_policy = self.config.retry_policy();
889        let total_files = self.topology.file_count().await?;
890        let stats = self.transfer_store.transfer_stats().await?;
891        let present_counts = self.transfer_store.present_counts_by_location().await?;
892        let failed = self.transfer_store.failed_transfers().await?;
893        let pending = self.transfer_store.all_pending_transfers().await?;
894
895        let mut locations: HashMap<LocationId, LocationSummary> = HashMap::new();
896        let mut total_errors = 0usize;
897
898        for (loc, count) in &present_counts {
899            let summary = locations.entry(loc.clone()).or_default();
900            summary.present = *count;
901        }
902
903        for row in &stats {
904            if row.state == TransferState::Completed || row.state == TransferState::Cancelled {
905                continue;
906            }
907            let dest_state = match row.state {
908                TransferState::Blocked | TransferState::Queued => PresenceState::Pending,
909                TransferState::InFlight => PresenceState::Syncing,
910                TransferState::Failed => {
911                    let exhausted = match row.error_kind.as_deref() {
912                        Some("permanent") => true,
913                        _ => row.attempt >= retry_policy.max_attempts(),
914                    };
915                    if exhausted {
916                        PresenceState::Failed
917                    } else {
918                        PresenceState::Pending
919                    }
920                }
921                TransferState::Completed | TransferState::Cancelled => PresenceState::Absent,
922            };
923
924            let dest_summary = locations.entry(row.dest.clone()).or_default();
925            match dest_state {
926                PresenceState::Pending => {
927                    dest_summary.pending = dest_summary.pending.saturating_add(row.file_count);
928                }
929                PresenceState::Syncing => {
930                    dest_summary.syncing = dest_summary.syncing.saturating_add(row.file_count);
931                }
932                PresenceState::Failed => {
933                    dest_summary.failed = dest_summary.failed.saturating_add(row.file_count);
934                    total_errors = total_errors.saturating_add(row.file_count);
935                }
936                PresenceState::Absent => {
937                    dest_summary.absent = dest_summary.absent.saturating_add(row.file_count);
938                }
939                PresenceState::Present => {}
940            }
941        }
942
943        let error_entries: Vec<ErrorEntry> = failed
944            .iter()
945            .filter(|t| {
946                let state = PresenceState::from_transfer(t, &retry_policy);
947                state == PresenceState::Failed
948            })
949            .map(ErrorEntry::from_transfer)
950            .collect();
951
952        let mut pending_entries: Vec<PendingEntry> =
953            pending.iter().map(PendingEntry::from_transfer).collect();
954        for t in &failed {
955            let state = PresenceState::from_transfer(t, &retry_policy);
956            if state == PresenceState::Pending {
957                pending_entries.push(PendingEntry::from_transfer(t));
958            }
959        }
960
961        Ok(SyncSummary {
962            locations,
963            total_entries: total_files,
964            total_errors,
965            error_entries,
966            pending_entries,
967        })
968    }
969
970    async fn errors(&self) -> Result<Vec<ErrorEntry>, SyncError> {
971        let retry_policy = self.config.retry_policy();
972        let failed = self.transfer_store.failed_transfers().await?;
973        Ok(failed
974            .iter()
975            .filter(|t| {
976                let state = PresenceState::from_transfer(t, &retry_policy);
977                state == PresenceState::Failed
978            })
979            .map(ErrorEntry::from_transfer)
980            .collect())
981    }
982
983    async fn pending(&self, dest: &LocationId) -> Result<Vec<PendingEntry>, SyncError> {
984        let retry_policy = self.config.retry_policy();
985
986        // Queued/Blocked/InFlight transfers for the target dest
987        let all_pending = self.transfer_store.all_pending_transfers().await?;
988        let mut entries: Vec<PendingEntry> = all_pending
989            .iter()
990            .filter(|t| t.dest() == dest)
991            .map(PendingEntry::from_transfer)
992            .collect();
993
994        // Failed but retryable transfers also count as pending
995        let failed = self.transfer_store.failed_transfers().await?;
996        for t in &failed {
997            if t.dest() == dest {
998                let state = PresenceState::from_transfer(t, &retry_policy);
999                if state == PresenceState::Pending {
1000                    entries.push(PendingEntry::from_transfer(t));
1001                }
1002            }
1003        }
1004
1005        Ok(entries)
1006    }
1007
1008    // =========================================================================
1009    // Topology — 読み取り専用
1010    // =========================================================================
1011
1012    fn locations(&self) -> Vec<LocationId> {
1013        self.topology.locations().to_vec()
1014    }
1015
1016    fn all_edges(&self) -> Vec<(LocationId, LocationId)> {
1017        self.engine.all_edges()
1018    }
1019
1020    fn local_root(&self) -> Option<&Path> {
1021        self.engine.local_root()
1022    }
1023
1024    fn set_progress_callback(&self, callback: Option<ProgressFn>) {
1025        if let Ok(mut guard) = self.progress.lock() {
1026            *guard = callback;
1027        }
1028    }
1029}