Skip to main content

vdsl_sync/application/sdk_impl/
sync_ops.rs

1//! `SyncStoreSdk` trait の `SdkImpl` 実装。
2//!
3//! 公開API面(sync / sync_route / put / delete / restore / get / list /
4//! status / errors / pending / locations / all_edges / local_root /
5//! set_progress_callback)を集約する。
6
7use std::path::Path;
8
9use async_trait::async_trait;
10use tracing::{error, info, trace, warn};
11
12use super::SdkImpl;
13use crate::application::error::SyncError;
14use crate::application::sdk::{PutReport, SyncReport, SyncReportError, SyncStoreSdk};
15use crate::application::topology_store::TopologyFileView;
16use crate::application::transfer_engine::PreparedTransfer;
17use crate::domain::file_type::FileType;
18use crate::domain::fingerprint::FileFingerprint;
19use crate::domain::location::{LocationId, SyncSummary};
20use crate::domain::view::{ErrorEntry, PendingEntry, PresenceState};
21use crate::infra::backend::ProgressFn;
22
23#[async_trait]
24impl SyncStoreSdk for SdkImpl {
25    // =========================================================================
26    // UseCase — 同期操作
27    // =========================================================================
28
29    async fn sync(&self) -> Result<SyncReport, SyncError> {
30        info!("sdk_impl::sync: pipeline start");
31        self.report_progress("ensure: checking locations");
32
33        // Phase 0a: Ensure — 全拠点の到達確認 + 外部ツール確保
34        // 失敗したLocationはスキャン/転送対象から除外し、syncは続行する。
35        let location_ids: Vec<String> = self.locations.iter().map(|l| l.id().to_string()).collect();
36        info!(
37            location_count = self.locations.len(),
38            locations = %location_ids.join(", "),
39            "sdk_impl::sync: ensure start"
40        );
41        let mut failed_locations: std::collections::HashSet<LocationId> =
42            std::collections::HashSet::new();
43        for loc in &self.locations {
44            info!(
45                location = %loc.id(),
46                kind = ?loc.kind(),
47                "sdk_impl::sync: ensure checking"
48            );
49            match loc.ensure().await {
50                Ok(()) => {
51                    info!(location = %loc.id(), "sdk_impl::sync: ensure ok");
52                }
53                Err(e) => {
54                    error!(
55                        location = %loc.id(),
56                        kind = ?loc.kind(),
57                        error = %e,
58                        "sdk_impl::sync: ensure FAILED — this location will be excluded from sync"
59                    );
60                    failed_locations.insert(loc.id().clone());
61                }
62            }
63        }
64        if failed_locations.is_empty() {
65            info!("sdk_impl::sync: ensure done — all locations reachable");
66        } else {
67            let excluded: Vec<String> = failed_locations.iter().map(|l| l.to_string()).collect();
68            warn!(
69                excluded = %excluded.join(", "),
70                "sdk_impl::sync: ensure done — {} location(s) excluded due to ensure failure",
71                failed_locations.len()
72            );
73        }
74
75        // Phase 0b: InFlight孤児の終端化(プロセスクラッシュ復帰)
76        let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
77        if cancelled > 0 {
78            info!(
79                cancelled_count = cancelled,
80                "sdk_impl::sync: cancelled orphaned InFlight transfers"
81            );
82        }
83
84        // Phase 1: Scan → TopologyDelta[]
85        self.report_progress("scan: scanning locations");
86        info!("sdk_impl::sync: phase1 scan start");
87        let progress_cb = self.progress.lock().ok().and_then(|g| g.clone());
88        let scan_result = self
89            .scanner
90            .scan_all(&self.scan_excludes, &failed_locations, progress_cb.as_ref())
91            .await?;
92        info!(
93            scanned = scan_result.scanned,
94            deltas = scan_result.deltas.len(),
95            scan_errors = scan_result.scan_errors.len(),
96            "sdk_impl::sync: phase1 scan done"
97        );
98        // delta詳細をtrace出力
99        for delta in &scan_result.deltas {
100            trace!(delta = ?delta, "sdk_impl::sync: delta");
101        }
102
103        // Phase 2: Plan — Apply → Distribute → Route → Transfer作成
104        self.report_progress(&format!(
105            "plan: {} files scanned, {} deltas",
106            scan_result.scanned,
107            scan_result.deltas.len()
108        ));
109        info!(
110            delta_count = scan_result.deltas.len(),
111            "sdk_impl::sync: phase2 plan start"
112        );
113        let plan_result = self.topology.sync(&scan_result.deltas).await?;
114        info!(
115            transfers_created = plan_result.transfers_created,
116            conflicts = plan_result.conflicts.len(),
117            "sdk_impl::sync: phase2 plan done"
118        );
119
120        // Phase 3: Execute — BFS順でTransfer実行 + DB永続化
121        // Propagate progress callback to all route backends for chunk-level reporting.
122        if let Ok(guard) = self.progress.lock() {
123            self.engine.set_progress_callback(guard.clone());
124        }
125        self.report_progress(&format!(
126            "execute: {} transfers queued",
127            plan_result.transfers_created
128        ));
129        info!("sdk_impl::sync: phase3 execute start");
130        let (transferred, failed, errors) = self.execute_bfs(&failed_locations).await?;
131        // Clear backend callbacks after execution.
132        self.engine.set_progress_callback(None);
133        info!(
134            transferred = transferred,
135            failed = failed,
136            error_count = errors.len(),
137            "sdk_impl::sync: phase3 execute done"
138        );
139
140        Ok(SyncReport {
141            scanned: scan_result.scanned,
142            scan_errors: scan_result
143                .scan_errors
144                .iter()
145                .map(|e| SyncReportError {
146                    path: e.path.clone(),
147                    error: e.error.clone(),
148                })
149                .collect(),
150            transfers_created: plan_result.transfers_created,
151            transferred,
152            failed,
153            errors,
154            conflicts: plan_result
155                .conflicts
156                .iter()
157                .map(crate::application::sdk::SyncReportConflict::from)
158                .collect(),
159        })
160    }
161
162    async fn sync_route(
163        &self,
164        src: &LocationId,
165        dest: &LocationId,
166    ) -> Result<SyncReport, SyncError> {
167        // Phase 0: InFlight孤児の終端化
168        let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
169        if cancelled > 0 {
170            info!(
171                cancelled_count = cancelled,
172                "sync_route: cancelled orphaned InFlight transfers"
173            );
174        }
175
176        // Phase 1: Plan — sync_routeはdelta生成なし、Distribute + Route のみ
177        self.report_progress(&format!("plan: route {src} → {dest}"));
178        let plan_result = self.topology.sync_route(src, dest).await?;
179
180        // Phase 2: Execute — dest宛のQueued Transferをsrcでフィルタして実行
181        // Propagate progress callback to all route backends.
182        if let Ok(guard) = self.progress.lock() {
183            self.engine.set_progress_callback(guard.clone());
184        }
185        let queued = self.transfer_store.queued_transfers(dest).await?;
186        let eligible: Vec<_> = queued.into_iter().filter(|t| t.src() == src).collect();
187
188        let mut prepared = Vec::with_capacity(eligible.len());
189        let mut total_failed = 0usize;
190        let mut all_errors: Vec<SyncReportError> = Vec::new();
191
192        for transfer in eligible {
193            match self.topology_files.get_by_id(transfer.file_id()).await {
194                Ok(Some(file)) => {
195                    prepared.push(PreparedTransfer {
196                        transfer,
197                        relative_path: file.relative_path().to_string(),
198                    });
199                }
200                Ok(None) => {
201                    total_failed += 1;
202                    all_errors.push(SyncReportError {
203                        path: transfer.file_id().to_string(),
204                        error: format!("file {} not found in store", transfer.file_id()),
205                    });
206                }
207                Err(e) => {
208                    total_failed += 1;
209                    all_errors.push(SyncReportError {
210                        path: transfer.file_id().to_string(),
211                        error: e.to_string(),
212                    });
213                }
214            }
215        }
216
217        self.report_progress(&format!(
218            "execute: {} transfers ({src} → {dest})",
219            prepared.len()
220        ));
221        let outcomes = self.engine.execute_prepared(prepared).await;
222        // Clear backend callbacks after execution.
223        self.engine.set_progress_callback(None);
224        let mut total_transferred = 0usize;
225
226        self.persist_outcomes(
227            &outcomes,
228            &mut total_transferred,
229            &mut total_failed,
230            &mut all_errors,
231        )
232        .await?;
233
234        Ok(SyncReport {
235            scanned: 0,
236            scan_errors: Vec::new(),
237            transfers_created: plan_result.transfers_created,
238            transferred: total_transferred,
239            failed: total_failed,
240            errors: all_errors,
241            conflicts: plan_result
242                .conflicts
243                .iter()
244                .map(crate::application::sdk::SyncReportConflict::from)
245                .collect(),
246        })
247    }
248
249    // =========================================================================
250    // Command — ファイル操作
251    // =========================================================================
252
253    async fn put(
254        &self,
255        path: &str,
256        file_type: FileType,
257        fingerprint: FileFingerprint,
258        origin: &LocationId,
259        embedded_id: Option<String>,
260    ) -> Result<PutReport, SyncError> {
261        let result = self
262            .topology
263            .put(path, file_type, fingerprint, origin, embedded_id)
264            .await?;
265        Ok(PutReport {
266            file_id: result.topology_file_id,
267            is_new: result.is_new,
268            transfers_created: result.transfers_created,
269        })
270    }
271
272    async fn delete(&self, path: &str) -> Result<usize, SyncError> {
273        self.topology.delete(path).await
274    }
275
276    async fn restore(&self, path: &str, revision: &str) -> Result<(), SyncError> {
277        info!(path = %path, revision = %revision, "sdk_impl::restore: start");
278
279        // 1. archive_root を持つルート(cloud宛)を engine から1件取得
280        let route = self.engine.archive_route().ok_or_else(|| -> SyncError {
281            crate::infra::error::InfraError::Transfer {
282                reason: "restore: no route with archive_root configured".into(),
283            }
284            .into()
285        })?;
286
287        // 2. 物理復元: cloud archive → cloud original
288        route.restore_from_archive(path, revision).await?;
289        info!(path = %path, "sdk_impl::restore: physical restore done");
290
291        // 3. 削除済みTopologyFileを取得して unmark
292        //    delete transfers 完走後は TF が hard-delete されている (commit c8213ce)
293        //    ため見つからないケースがある。物理 restore は既に完了しているので
294        //    次回 full sync で cloud から再発見させれば整合が取れる。
295        let deleted_tfs = self.topology_files.list_deleted().await?;
296        match deleted_tfs.into_iter().find(|t| t.relative_path() == path) {
297            Some(mut tf) => {
298                tf.unmark_deleted();
299                self.topology_files.upsert(&tf).await?;
300                info!(path = %path, file_id = %tf.id(), "sdk_impl::restore: TopologyFile unmarked");
301            }
302            None => {
303                warn!(
304                    path = %path,
305                    "sdk_impl::restore: TopologyFile not in deleted list (likely hard-deleted after delete transfers). Physical restore succeeded — next full sync will re-register."
306                );
307            }
308        }
309
310        Ok(())
311    }
312
313    // =========================================================================
314    // Query — 読み取り
315    // =========================================================================
316
317    async fn get(&self, path: &str) -> Result<Option<TopologyFileView>, SyncError> {
318        self.topology.get(path).await
319    }
320
321    async fn list(
322        &self,
323        file_type: Option<FileType>,
324        limit: Option<usize>,
325    ) -> Result<Vec<TopologyFileView>, SyncError> {
326        self.topology.list(file_type, limit).await
327    }
328
329    async fn status(&self) -> Result<SyncSummary, SyncError> {
330        use crate::domain::location::LocationSummary;
331        use crate::domain::transfer::TransferState;
332        use std::collections::HashMap;
333
334        let retry_policy = self.config.retry_policy();
335        let total_files = self.topology.file_count().await?;
336        let stats = self.transfer_store.transfer_stats().await?;
337        let present_counts = self.transfer_store.present_counts_by_location().await?;
338        let failed = self.transfer_store.failed_transfers().await?;
339        let pending = self.transfer_store.all_pending_transfers().await?;
340
341        let mut locations: HashMap<LocationId, LocationSummary> = HashMap::new();
342        let mut total_errors = 0usize;
343
344        for (loc, count) in &present_counts {
345            let summary = locations.entry(loc.clone()).or_default();
346            summary.present = *count;
347        }
348
349        for row in &stats {
350            if row.state == TransferState::Completed || row.state == TransferState::Cancelled {
351                continue;
352            }
353            let dest_state = match row.state {
354                TransferState::Blocked | TransferState::Queued => PresenceState::Pending,
355                TransferState::InFlight => PresenceState::Syncing,
356                TransferState::Failed => {
357                    let exhausted = match row.error_kind.as_deref() {
358                        Some("permanent") => true,
359                        _ => row.attempt >= retry_policy.max_attempts(),
360                    };
361                    if exhausted {
362                        PresenceState::Failed
363                    } else {
364                        PresenceState::Pending
365                    }
366                }
367                TransferState::Completed | TransferState::Cancelled => PresenceState::Absent,
368            };
369
370            let dest_summary = locations.entry(row.dest.clone()).or_default();
371            match dest_state {
372                PresenceState::Pending => {
373                    dest_summary.pending = dest_summary.pending.saturating_add(row.file_count);
374                }
375                PresenceState::Syncing => {
376                    dest_summary.syncing = dest_summary.syncing.saturating_add(row.file_count);
377                }
378                PresenceState::Failed => {
379                    dest_summary.failed = dest_summary.failed.saturating_add(row.file_count);
380                    total_errors = total_errors.saturating_add(row.file_count);
381                }
382                PresenceState::Absent => {
383                    dest_summary.absent = dest_summary.absent.saturating_add(row.file_count);
384                }
385                PresenceState::Present => {}
386            }
387        }
388
389        let error_entries: Vec<ErrorEntry> = failed
390            .iter()
391            .filter(|t| {
392                let state = PresenceState::from_transfer(t, &retry_policy);
393                state == PresenceState::Failed
394            })
395            .map(ErrorEntry::from_transfer)
396            .collect();
397
398        let mut pending_entries: Vec<PendingEntry> =
399            pending.iter().map(PendingEntry::from_transfer).collect();
400        for t in &failed {
401            let state = PresenceState::from_transfer(t, &retry_policy);
402            if state == PresenceState::Pending {
403                pending_entries.push(PendingEntry::from_transfer(t));
404            }
405        }
406
407        Ok(SyncSummary {
408            locations,
409            total_entries: total_files,
410            total_errors,
411            error_entries,
412            pending_entries,
413        })
414    }
415
416    async fn errors(&self) -> Result<Vec<ErrorEntry>, SyncError> {
417        let retry_policy = self.config.retry_policy();
418        let failed = self.transfer_store.failed_transfers().await?;
419        Ok(failed
420            .iter()
421            .filter(|t| {
422                let state = PresenceState::from_transfer(t, &retry_policy);
423                state == PresenceState::Failed
424            })
425            .map(ErrorEntry::from_transfer)
426            .collect())
427    }
428
429    async fn pending(&self, dest: &LocationId) -> Result<Vec<PendingEntry>, SyncError> {
430        let retry_policy = self.config.retry_policy();
431
432        // Queued/Blocked/InFlight transfers for the target dest
433        let all_pending = self.transfer_store.all_pending_transfers().await?;
434        let mut entries: Vec<PendingEntry> = all_pending
435            .iter()
436            .filter(|t| t.dest() == dest)
437            .map(PendingEntry::from_transfer)
438            .collect();
439
440        // Failed but retryable transfers also count as pending
441        let failed = self.transfer_store.failed_transfers().await?;
442        for t in &failed {
443            if t.dest() == dest {
444                let state = PresenceState::from_transfer(t, &retry_policy);
445                if state == PresenceState::Pending {
446                    entries.push(PendingEntry::from_transfer(t));
447                }
448            }
449        }
450
451        Ok(entries)
452    }
453
454    // =========================================================================
455    // Topology — 読み取り専用
456    // =========================================================================
457
458    fn locations(&self) -> Vec<LocationId> {
459        self.topology.locations().to_vec()
460    }
461
462    fn all_edges(&self) -> Vec<(LocationId, LocationId)> {
463        self.engine.all_edges()
464    }
465
466    fn local_root(&self) -> Option<&Path> {
467        self.engine.local_root()
468    }
469
470    fn set_progress_callback(&self, callback: Option<ProgressFn>) {
471        if let Ok(mut guard) = self.progress.lock() {
472            *guard = callback;
473        }
474    }
475}