Skip to main content

vdsl_sync/application/sdk_impl/
sync_ops.rs

1//! `SyncStoreSdk` trait の `SdkImpl` 実装。
2//!
3//! 公開API面(sync / sync_route / put / delete / restore / get / list /
4//! status / errors / pending / locations / all_edges / local_root /
5//! set_progress_callback)を集約する。
6
7use std::path::Path;
8
9use async_trait::async_trait;
10use tracing::{error, info, trace, warn};
11
12use super::SdkImpl;
13use crate::application::error::SyncError;
14use crate::application::sdk::{PutReport, SyncReport, SyncReportError, SyncStoreSdk};
15use crate::application::topology_store::TopologyFileView;
16use crate::application::transfer_engine::PreparedTransfer;
17use crate::domain::file_type::FileType;
18use crate::domain::fingerprint::FileFingerprint;
19use crate::domain::location::{LocationId, SyncSummary};
20use crate::domain::view::{ErrorEntry, PendingEntry, PresenceState};
21use crate::infra::backend::ProgressFn;
22
23#[async_trait]
24impl SyncStoreSdk for SdkImpl {
25    // =========================================================================
26    // UseCase — 同期操作
27    // =========================================================================
28
29    async fn sync(&self) -> Result<SyncReport, SyncError> {
30        info!("sdk_impl::sync: pipeline start");
31        self.report_progress("ensure: checking locations");
32
33        // Phase 0a: Ensure — 全拠点の到達確認 + 外部ツール確保
34        // 失敗したLocationはスキャン/転送対象から除外し、syncは続行する。
35        let location_ids: Vec<String> =
36            self.locations.iter().map(|l| l.id().to_string()).collect();
37        info!(
38            location_count = self.locations.len(),
39            locations = %location_ids.join(", "),
40            "sdk_impl::sync: ensure start"
41        );
42        let mut failed_locations: std::collections::HashSet<LocationId> =
43            std::collections::HashSet::new();
44        for loc in &self.locations {
45            info!(
46                location = %loc.id(),
47                kind = ?loc.kind(),
48                "sdk_impl::sync: ensure checking"
49            );
50            match loc.ensure().await {
51                Ok(()) => {
52                    info!(location = %loc.id(), "sdk_impl::sync: ensure ok");
53                }
54                Err(e) => {
55                    error!(
56                        location = %loc.id(),
57                        kind = ?loc.kind(),
58                        error = %e,
59                        "sdk_impl::sync: ensure FAILED — this location will be excluded from sync"
60                    );
61                    failed_locations.insert(loc.id().clone());
62                }
63            }
64        }
65        if failed_locations.is_empty() {
66            info!("sdk_impl::sync: ensure done — all locations reachable");
67        } else {
68            let excluded: Vec<String> =
69                failed_locations.iter().map(|l| l.to_string()).collect();
70            warn!(
71                excluded = %excluded.join(", "),
72                "sdk_impl::sync: ensure done — {} location(s) excluded due to ensure failure",
73                failed_locations.len()
74            );
75        }
76
77        // Phase 0b: InFlight孤児の終端化(プロセスクラッシュ復帰)
78        let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
79        if cancelled > 0 {
80            info!(
81                cancelled_count = cancelled,
82                "sdk_impl::sync: cancelled orphaned InFlight transfers"
83            );
84        }
85
86        // Phase 1: Scan → TopologyDelta[]
87        self.report_progress("scan: scanning locations");
88        info!("sdk_impl::sync: phase1 scan start");
89        let progress_cb = self.progress.lock().ok().and_then(|g| g.clone());
90        let scan_result = self
91            .scanner
92            .scan_all(&self.scan_excludes, &failed_locations, progress_cb.as_ref())
93            .await?;
94        info!(
95            scanned = scan_result.scanned,
96            deltas = scan_result.deltas.len(),
97            scan_errors = scan_result.scan_errors.len(),
98            "sdk_impl::sync: phase1 scan done"
99        );
100        // delta詳細をtrace出力
101        for delta in &scan_result.deltas {
102            trace!(delta = ?delta, "sdk_impl::sync: delta");
103        }
104
105        // Phase 2: Plan — Apply → Distribute → Route → Transfer作成
106        self.report_progress(&format!(
107            "plan: {} files scanned, {} deltas",
108            scan_result.scanned,
109            scan_result.deltas.len()
110        ));
111        info!(
112            delta_count = scan_result.deltas.len(),
113            "sdk_impl::sync: phase2 plan start"
114        );
115        let plan_result = self.topology.sync(&scan_result.deltas).await?;
116        info!(
117            transfers_created = plan_result.transfers_created,
118            conflicts = plan_result.conflicts.len(),
119            "sdk_impl::sync: phase2 plan done"
120        );
121
122        // Phase 3: Execute — BFS順でTransfer実行 + DB永続化
123        // Propagate progress callback to all route backends for chunk-level reporting.
124        if let Ok(guard) = self.progress.lock() {
125            self.engine.set_progress_callback(guard.clone());
126        }
127        self.report_progress(&format!(
128            "execute: {} transfers queued",
129            plan_result.transfers_created
130        ));
131        info!("sdk_impl::sync: phase3 execute start");
132        let (transferred, failed, errors) = self.execute_bfs(&failed_locations).await?;
133        // Clear backend callbacks after execution.
134        self.engine.set_progress_callback(None);
135        info!(
136            transferred = transferred,
137            failed = failed,
138            error_count = errors.len(),
139            "sdk_impl::sync: phase3 execute done"
140        );
141
142        Ok(SyncReport {
143            scanned: scan_result.scanned,
144            scan_errors: scan_result
145                .scan_errors
146                .iter()
147                .map(|e| SyncReportError {
148                    path: e.path.clone(),
149                    error: e.error.clone(),
150                })
151                .collect(),
152            transfers_created: plan_result.transfers_created,
153            transferred,
154            failed,
155            errors,
156            conflicts: plan_result
157                .conflicts
158                .iter()
159                .map(crate::application::sdk::SyncReportConflict::from)
160                .collect(),
161        })
162    }
163
164    async fn sync_route(
165        &self,
166        src: &LocationId,
167        dest: &LocationId,
168    ) -> Result<SyncReport, SyncError> {
169        // Phase 0: InFlight孤児の終端化
170        let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
171        if cancelled > 0 {
172            info!(
173                cancelled_count = cancelled,
174                "sync_route: cancelled orphaned InFlight transfers"
175            );
176        }
177
178        // Phase 1: Plan — sync_routeはdelta生成なし、Distribute + Route のみ
179        self.report_progress(&format!("plan: route {src} → {dest}"));
180        let plan_result = self.topology.sync_route(src, dest).await?;
181
182        // Phase 2: Execute — dest宛のQueued Transferをsrcでフィルタして実行
183        // Propagate progress callback to all route backends.
184        if let Ok(guard) = self.progress.lock() {
185            self.engine.set_progress_callback(guard.clone());
186        }
187        let queued = self.transfer_store.queued_transfers(dest).await?;
188        let eligible: Vec<_> = queued.into_iter().filter(|t| t.src() == src).collect();
189
190        let mut prepared = Vec::with_capacity(eligible.len());
191        let mut total_failed = 0usize;
192        let mut all_errors: Vec<SyncReportError> = Vec::new();
193
194        for transfer in eligible {
195            match self.topology_files.get_by_id(transfer.file_id()).await {
196                Ok(Some(file)) => {
197                    prepared.push(PreparedTransfer {
198                        transfer,
199                        relative_path: file.relative_path().to_string(),
200                    });
201                }
202                Ok(None) => {
203                    total_failed += 1;
204                    all_errors.push(SyncReportError {
205                        path: transfer.file_id().to_string(),
206                        error: format!("file {} not found in store", transfer.file_id()),
207                    });
208                }
209                Err(e) => {
210                    total_failed += 1;
211                    all_errors.push(SyncReportError {
212                        path: transfer.file_id().to_string(),
213                        error: e.to_string(),
214                    });
215                }
216            }
217        }
218
219        self.report_progress(&format!(
220            "execute: {} transfers ({src} → {dest})",
221            prepared.len()
222        ));
223        let outcomes = self.engine.execute_prepared(prepared).await;
224        // Clear backend callbacks after execution.
225        self.engine.set_progress_callback(None);
226        let mut total_transferred = 0usize;
227
228        self.persist_outcomes(
229            &outcomes,
230            &mut total_transferred,
231            &mut total_failed,
232            &mut all_errors,
233        )
234        .await?;
235
236        Ok(SyncReport {
237            scanned: 0,
238            scan_errors: Vec::new(),
239            transfers_created: plan_result.transfers_created,
240            transferred: total_transferred,
241            failed: total_failed,
242            errors: all_errors,
243            conflicts: plan_result
244                .conflicts
245                .iter()
246                .map(crate::application::sdk::SyncReportConflict::from)
247                .collect(),
248        })
249    }
250
251    // =========================================================================
252    // Command — ファイル操作
253    // =========================================================================
254
255    async fn put(
256        &self,
257        path: &str,
258        file_type: FileType,
259        fingerprint: FileFingerprint,
260        origin: &LocationId,
261        embedded_id: Option<String>,
262    ) -> Result<PutReport, SyncError> {
263        let result = self
264            .topology
265            .put(path, file_type, fingerprint, origin, embedded_id)
266            .await?;
267        Ok(PutReport {
268            file_id: result.topology_file_id,
269            is_new: result.is_new,
270            transfers_created: result.transfers_created,
271        })
272    }
273
274    async fn delete(&self, path: &str) -> Result<usize, SyncError> {
275        self.topology.delete(path).await
276    }
277
278    async fn restore(&self, path: &str, revision: &str) -> Result<(), SyncError> {
279        info!(path = %path, revision = %revision, "sdk_impl::restore: start");
280
281        // 1. archive_root を持つルート(cloud宛)を engine から1件取得
282        let route = self.engine.archive_route().ok_or_else(|| -> SyncError {
283            crate::infra::error::InfraError::Transfer {
284                reason: "restore: no route with archive_root configured".into(),
285            }
286            .into()
287        })?;
288
289        // 2. 物理復元: cloud archive → cloud original
290        route.restore_from_archive(path, revision).await?;
291        info!(path = %path, "sdk_impl::restore: physical restore done");
292
293        // 3. 削除済みTopologyFileを取得して unmark
294        //    delete transfers 完走後は TF が hard-delete されている (commit c8213ce)
295        //    ため見つからないケースがある。物理 restore は既に完了しているので
296        //    次回 full sync で cloud から再発見させれば整合が取れる。
297        let deleted_tfs = self.topology_files.list_deleted().await?;
298        match deleted_tfs.into_iter().find(|t| t.relative_path() == path) {
299            Some(mut tf) => {
300                tf.unmark_deleted();
301                self.topology_files.upsert(&tf).await?;
302                info!(path = %path, file_id = %tf.id(), "sdk_impl::restore: TopologyFile unmarked");
303            }
304            None => {
305                warn!(
306                    path = %path,
307                    "sdk_impl::restore: TopologyFile not in deleted list (likely hard-deleted after delete transfers). Physical restore succeeded — next full sync will re-register."
308                );
309            }
310        }
311
312        Ok(())
313    }
314
315    // =========================================================================
316    // Query — 読み取り
317    // =========================================================================
318
319    async fn get(&self, path: &str) -> Result<Option<TopologyFileView>, SyncError> {
320        self.topology.get(path).await
321    }
322
323    async fn list(
324        &self,
325        file_type: Option<FileType>,
326        limit: Option<usize>,
327    ) -> Result<Vec<TopologyFileView>, SyncError> {
328        self.topology.list(file_type, limit).await
329    }
330
331    async fn status(&self) -> Result<SyncSummary, SyncError> {
332        use crate::domain::location::LocationSummary;
333        use crate::domain::transfer::TransferState;
334        use std::collections::HashMap;
335
336        let retry_policy = self.config.retry_policy();
337        let total_files = self.topology.file_count().await?;
338        let stats = self.transfer_store.transfer_stats().await?;
339        let present_counts = self.transfer_store.present_counts_by_location().await?;
340        let failed = self.transfer_store.failed_transfers().await?;
341        let pending = self.transfer_store.all_pending_transfers().await?;
342
343        let mut locations: HashMap<LocationId, LocationSummary> = HashMap::new();
344        let mut total_errors = 0usize;
345
346        for (loc, count) in &present_counts {
347            let summary = locations.entry(loc.clone()).or_default();
348            summary.present = *count;
349        }
350
351        for row in &stats {
352            if row.state == TransferState::Completed || row.state == TransferState::Cancelled {
353                continue;
354            }
355            let dest_state = match row.state {
356                TransferState::Blocked | TransferState::Queued => PresenceState::Pending,
357                TransferState::InFlight => PresenceState::Syncing,
358                TransferState::Failed => {
359                    let exhausted = match row.error_kind.as_deref() {
360                        Some("permanent") => true,
361                        _ => row.attempt >= retry_policy.max_attempts(),
362                    };
363                    if exhausted {
364                        PresenceState::Failed
365                    } else {
366                        PresenceState::Pending
367                    }
368                }
369                TransferState::Completed | TransferState::Cancelled => PresenceState::Absent,
370            };
371
372            let dest_summary = locations.entry(row.dest.clone()).or_default();
373            match dest_state {
374                PresenceState::Pending => {
375                    dest_summary.pending = dest_summary.pending.saturating_add(row.file_count);
376                }
377                PresenceState::Syncing => {
378                    dest_summary.syncing = dest_summary.syncing.saturating_add(row.file_count);
379                }
380                PresenceState::Failed => {
381                    dest_summary.failed = dest_summary.failed.saturating_add(row.file_count);
382                    total_errors = total_errors.saturating_add(row.file_count);
383                }
384                PresenceState::Absent => {
385                    dest_summary.absent = dest_summary.absent.saturating_add(row.file_count);
386                }
387                PresenceState::Present => {}
388            }
389        }
390
391        let error_entries: Vec<ErrorEntry> = failed
392            .iter()
393            .filter(|t| {
394                let state = PresenceState::from_transfer(t, &retry_policy);
395                state == PresenceState::Failed
396            })
397            .map(ErrorEntry::from_transfer)
398            .collect();
399
400        let mut pending_entries: Vec<PendingEntry> =
401            pending.iter().map(PendingEntry::from_transfer).collect();
402        for t in &failed {
403            let state = PresenceState::from_transfer(t, &retry_policy);
404            if state == PresenceState::Pending {
405                pending_entries.push(PendingEntry::from_transfer(t));
406            }
407        }
408
409        Ok(SyncSummary {
410            locations,
411            total_entries: total_files,
412            total_errors,
413            error_entries,
414            pending_entries,
415        })
416    }
417
418    async fn errors(&self) -> Result<Vec<ErrorEntry>, SyncError> {
419        let retry_policy = self.config.retry_policy();
420        let failed = self.transfer_store.failed_transfers().await?;
421        Ok(failed
422            .iter()
423            .filter(|t| {
424                let state = PresenceState::from_transfer(t, &retry_policy);
425                state == PresenceState::Failed
426            })
427            .map(ErrorEntry::from_transfer)
428            .collect())
429    }
430
431    async fn pending(&self, dest: &LocationId) -> Result<Vec<PendingEntry>, SyncError> {
432        let retry_policy = self.config.retry_policy();
433
434        // Queued/Blocked/InFlight transfers for the target dest
435        let all_pending = self.transfer_store.all_pending_transfers().await?;
436        let mut entries: Vec<PendingEntry> = all_pending
437            .iter()
438            .filter(|t| t.dest() == dest)
439            .map(PendingEntry::from_transfer)
440            .collect();
441
442        // Failed but retryable transfers also count as pending
443        let failed = self.transfer_store.failed_transfers().await?;
444        for t in &failed {
445            if t.dest() == dest {
446                let state = PresenceState::from_transfer(t, &retry_policy);
447                if state == PresenceState::Pending {
448                    entries.push(PendingEntry::from_transfer(t));
449                }
450            }
451        }
452
453        Ok(entries)
454    }
455
456    // =========================================================================
457    // Topology — 読み取り専用
458    // =========================================================================
459
460    fn locations(&self) -> Vec<LocationId> {
461        self.topology.locations().to_vec()
462    }
463
464    fn all_edges(&self) -> Vec<(LocationId, LocationId)> {
465        self.engine.all_edges()
466    }
467
468    fn local_root(&self) -> Option<&Path> {
469        self.engine.local_root()
470    }
471
472    fn set_progress_callback(&self, callback: Option<ProgressFn>) {
473        if let Ok(mut guard) = self.progress.lock() {
474            *guard = callback;
475        }
476    }
477}