1use std::path::Path;
20use std::sync::{Arc, Mutex as StdMutex};
21
22use async_trait::async_trait;
23use tracing::{debug, error, info, trace, warn};
24
25use super::route::{TransferDirection, TransferRoute};
26use super::sdk::{PutReport, SyncReport, SyncReportError, SyncStoreSdk};
27use super::topology_scanner::TopologyScanner;
28use super::topology_store::{TopologyFileView, TopologyStore};
29use super::transfer_engine::{PreparedTransfer, TransferEngine, TransferOutcome};
30use crate::application::error::SyncError;
31use crate::domain::config::SyncConfig;
32use crate::domain::file_type::FileType;
33use crate::domain::fingerprint::FileFingerprint;
34use crate::domain::graph::{EdgeCost, RouteGraph};
35use crate::domain::location::{LocationId, SyncSummary};
36use crate::domain::transfer::TransferState;
37use crate::domain::view::{ErrorEntry, PendingEntry, PresenceState};
38use crate::infra::backend::{ProgressFn, StorageBackend};
39use crate::infra::location::{Location, LocationKind};
40use crate::infra::location_file_store::LocationFileStore;
41use crate::infra::location_scanner::LocationScanner;
42use crate::infra::shell::RemoteShell;
43use crate::infra::topology_file_store::TopologyFileStore;
44use crate::infra::transfer_store::TransferStore;
45
46pub struct SdkImpl {
51 scanner: TopologyScanner,
52 topology: TopologyStore,
53 engine: TransferEngine,
54 topology_files: Arc<dyn TopologyFileStore>,
55 location_files: Arc<dyn LocationFileStore>,
56 transfer_store: Arc<dyn TransferStore>,
57 locations: Vec<Arc<dyn Location>>,
58 config: SyncConfig,
59 scan_excludes: Vec<glob::Pattern>,
60 progress: StdMutex<Option<ProgressFn>>,
62}
63
64struct PendingRoute {
73 src: LocationId,
74 dest: LocationId,
75 backend: Box<dyn StorageBackend>,
76 src_shell: Option<Box<dyn RemoteShell>>,
77 direction: TransferDirection,
78}
79
80pub struct SdkImplBuilder {
100 topology_files: Arc<dyn TopologyFileStore>,
101 location_files: Arc<dyn LocationFileStore>,
102 transfer_store: Arc<dyn TransferStore>,
103 locations: Vec<Arc<dyn Location>>,
104 pending_routes: Vec<PendingRoute>,
105 config: Option<SyncConfig>,
106 scan_excludes: Vec<glob::Pattern>,
107}
108
109impl SdkImplBuilder {
110 pub fn new(
112 topology_files: Arc<dyn TopologyFileStore>,
113 location_files: Arc<dyn LocationFileStore>,
114 transfer_store: Arc<dyn TransferStore>,
115 ) -> Self {
116 Self {
117 topology_files,
118 location_files,
119 transfer_store,
120 locations: Vec::new(),
121 pending_routes: Vec::new(),
122 config: None,
123 scan_excludes: Vec::new(),
124 }
125 }
126
127 pub fn location(mut self, loc: Arc<dyn Location>) -> Self {
132 if !self.locations.iter().any(|l| l.id() == loc.id()) {
133 self.locations.push(loc);
134 }
135 self
136 }
137
138 pub fn connect(
144 mut self,
145 src: &LocationId,
146 dest: &LocationId,
147 backend: Box<dyn StorageBackend>,
148 ) -> Self {
149 self.pending_routes.push(PendingRoute {
150 src: src.clone(),
151 dest: dest.clone(),
152 backend,
153 src_shell: None,
154 direction: TransferDirection::Push,
155 });
156 self
157 }
158
159 pub fn connect_with_shell(
164 mut self,
165 src: &LocationId,
166 dest: &LocationId,
167 backend: Box<dyn StorageBackend>,
168 src_shell: Box<dyn RemoteShell>,
169 ) -> Self {
170 self.pending_routes.push(PendingRoute {
171 src: src.clone(),
172 dest: dest.clone(),
173 backend,
174 src_shell: Some(src_shell),
175 direction: TransferDirection::Push,
176 });
177 self
178 }
179
180 pub fn connect_pull(
185 mut self,
186 src: &LocationId,
187 dest: &LocationId,
188 backend: Box<dyn StorageBackend>,
189 ) -> Self {
190 self.pending_routes.push(PendingRoute {
191 src: src.clone(),
192 dest: dest.clone(),
193 backend,
194 src_shell: None,
195 direction: TransferDirection::Pull,
196 });
197 self
198 }
199
200 pub fn config(mut self, config: SyncConfig) -> Self {
202 self.config = Some(config);
203 self
204 }
205
206 pub fn exclude(mut self, pattern: &str) -> Self {
208 match glob::Pattern::new(pattern) {
209 Ok(p) => self.scan_excludes.push(p),
210 Err(e) => {
211 tracing::warn!(pattern = pattern, error = %e, "invalid exclude glob pattern, skipped");
212 }
213 }
214 self
215 }
216
217 pub fn build(self) -> Result<SdkImpl, SyncError> {
224 use std::collections::HashMap;
225
226 let config = self.config.unwrap_or_default();
227
228 let loc_map: HashMap<LocationId, &Arc<dyn Location>> =
230 self.locations.iter().map(|l| (l.id().clone(), l)).collect();
231
232 let scanners: Vec<Arc<dyn LocationScanner>> =
234 self.locations.iter().map(|loc| loc.scanner()).collect();
235
236 let routes: Vec<TransferRoute> = self
238 .pending_routes
239 .into_iter()
240 .filter_map(|pr| {
241 let src_loc = loc_map.get(&pr.src)?;
242 let dest_loc = loc_map.get(&pr.dest)?;
243
244 let cost = match estimate_route_cost(src_loc.kind(), dest_loc.kind()) {
245 Ok(c) => c,
246 Err(e) => {
247 tracing::warn!(src = ?src_loc.kind(), dest = ?dest_loc.kind(), error = %e, "skipping route: invalid cost");
248 return None;
249 }
250 };
251
252 let mut route = TransferRoute::new(
253 pr.src,
254 pr.dest,
255 src_loc.file_root().to_path_buf(),
256 dest_loc.file_root().to_path_buf(),
257 pr.backend,
258 )
259 .with_cost(cost.time_per_gb, cost.priority);
260
261 if pr.direction == TransferDirection::Pull {
262 route = route.direction(TransferDirection::Pull);
263 }
264 if let Some(shell) = pr.src_shell {
265 route = route.with_src_shell(shell);
266 }
267
268 Some(route)
269 })
270 .collect();
271
272 let location_ids: Vec<LocationId> =
274 self.locations.iter().map(|loc| loc.id().clone()).collect();
275
276 let mut graph = RouteGraph::new();
278 for r in &routes {
279 graph.add_with_cost(
280 r.src().clone(),
281 r.dest().clone(),
282 EdgeCost::new(r.time_per_gb(), r.priority())?,
283 );
284 }
285
286 let topology = TopologyStore::new(
287 self.topology_files.clone(),
288 self.location_files.clone(),
289 self.transfer_store.clone(),
290 graph.clone(),
291 location_ids,
292 );
293
294 let engine = TransferEngine::new(graph, routes, config.concurrency);
295
296 let scanner = TopologyScanner::new(
297 self.topology_files.clone(),
298 self.location_files.clone(),
299 scanners,
300 );
301
302 Ok(SdkImpl {
303 scanner,
304 topology,
305 engine,
306 topology_files: self.topology_files,
307 location_files: self.location_files,
308 transfer_store: self.transfer_store,
309 locations: self.locations,
310 config,
311 scan_excludes: self.scan_excludes,
312 progress: StdMutex::new(None),
313 })
314 }
315}
316
317fn estimate_route_cost(
323 src: LocationKind,
324 dest: LocationKind,
325) -> Result<EdgeCost, crate::domain::error::DomainError> {
326 let (time_per_gb, priority) = match (src, dest) {
327 (LocationKind::Local, LocationKind::Remote) => (1.0, 10),
329 (LocationKind::Remote, LocationKind::Local) => (1.0, 10),
330
331 (LocationKind::Remote, LocationKind::Cloud) => (2.0, 50),
333 (LocationKind::Cloud, LocationKind::Remote) => (2.0, 50),
334
335 (LocationKind::Local, LocationKind::Cloud) => (5.0, 100),
337 (LocationKind::Cloud, LocationKind::Local) => (5.0, 100),
338
339 _ => (1.0, 100),
341 };
342 EdgeCost::new(time_per_gb, priority)
343}
344
345impl SdkImpl {
346 fn report_progress(&self, msg: &str) {
348 if let Ok(guard) = self.progress.lock() {
349 if let Some(cb) = guard.as_ref() {
350 cb(msg);
351 }
352 }
353 }
354
355 async fn execute_bfs(
360 &self,
361 skip_locations: &std::collections::HashSet<crate::domain::location::LocationId>,
362 ) -> Result<(usize, usize, Vec<SyncReportError>), SyncError> {
363 let mut total_transferred = 0usize;
364 let mut total_failed = 0usize;
365 let mut all_errors: Vec<SyncReportError> = Vec::new();
366
367 let targets = self.engine.all_targets_ordered();
368 debug!(targets = ?targets, "execute_bfs: BFS target order");
369
370 for target in &targets {
371 if skip_locations.contains(target) {
372 warn!(
373 target = %target,
374 "execute_bfs: skipping target (ensure failed)"
375 );
376 continue;
377 }
378 let queued = self.transfer_store.queued_transfers(target).await?;
379 if queued.is_empty() {
380 debug!(target = %target, "execute_bfs: no queued transfers, skip");
381 continue;
382 }
383 info!(target = %target, queued = queued.len(), "execute_bfs: processing target");
384 self.report_progress(&format!("target {target}: {} queued", queued.len()));
385
386 let mut prepared = Vec::with_capacity(queued.len());
388 let mut resolve_miss = 0usize;
389 for transfer in queued {
390 match self.topology_files.get_by_id(transfer.file_id()).await {
391 Ok(Some(file)) => {
392 trace!(
393 file_id = %transfer.file_id(),
394 path = %file.relative_path(),
395 src = %transfer.src(),
396 dest = %transfer.dest(),
397 "execute_bfs: prepared"
398 );
399 prepared.push(PreparedTransfer {
400 transfer,
401 relative_path: file.relative_path().to_string(),
402 });
403 }
404 Ok(None) => {
405 resolve_miss += 1;
406 error!(
407 file_id = %transfer.file_id(),
408 src = %transfer.src(),
409 dest = %transfer.dest(),
410 "execute_bfs: topology_file not found — transfer skipped"
411 );
412 total_failed += 1;
413 all_errors.push(SyncReportError {
414 path: transfer.file_id().to_string(),
415 error: format!("file {} not found in store", transfer.file_id()),
416 });
417 }
418 Err(e) => {
419 resolve_miss += 1;
420 error!(
421 file_id = %transfer.file_id(),
422 src = %transfer.src(),
423 dest = %transfer.dest(),
424 err = %e,
425 "execute_bfs: topology_file lookup error — transfer skipped"
426 );
427 total_failed += 1;
428 all_errors.push(SyncReportError {
429 path: transfer.file_id().to_string(),
430 error: e.to_string(),
431 });
432 }
433 }
434 }
435 let (sync_prepared, delete_prepared): (Vec<_>, Vec<_>) =
439 prepared.into_iter().partition(|p| !p.transfer.is_delete());
440
441 debug!(
442 target = %target,
443 sync = sync_prepared.len(),
444 delete = delete_prepared.len(),
445 resolve_miss = resolve_miss,
446 "execute_bfs: preparation done"
447 );
448
449 if !sync_prepared.is_empty() {
451 info!(
452 target = %target,
453 count = sync_prepared.len(),
454 "execute_bfs: executing sync transfers"
455 );
456 let sync_outcomes = self.engine.execute_prepared(sync_prepared).await;
457 info!(
458 target = %target,
459 outcomes = sync_outcomes.len(),
460 "execute_bfs: sync execution done, persisting"
461 );
462 self.persist_outcomes(
463 &sync_outcomes,
464 &mut total_transferred,
465 &mut total_failed,
466 &mut all_errors,
467 )
468 .await?;
469 }
470
471 if !delete_prepared.is_empty() {
473 info!(
474 target = %target,
475 count = delete_prepared.len(),
476 "execute_bfs: executing delete transfers"
477 );
478 let delete_outcomes = self.engine.execute_prepared(delete_prepared).await;
479 info!(
480 target = %target,
481 outcomes = delete_outcomes.len(),
482 "execute_bfs: delete execution done, persisting"
483 );
484 self.persist_outcomes(
485 &delete_outcomes,
486 &mut total_transferred,
487 &mut total_failed,
488 &mut all_errors,
489 )
490 .await?;
491 }
492
493 info!(
494 target = %target,
495 transferred = total_transferred,
496 failed = total_failed,
497 "execute_bfs: target batch done"
498 );
499 }
500
501 Ok((total_transferred, total_failed, all_errors))
502 }
503
504 async fn persist_outcomes(
508 &self,
509 outcomes: &[TransferOutcome],
510 total_transferred: &mut usize,
511 total_failed: &mut usize,
512 all_errors: &mut Vec<SyncReportError>,
513 ) -> Result<(), SyncError> {
514 for outcome in outcomes {
515 let is_completed = outcome.transfer.state() == TransferState::Completed;
516 self.transfer_store
517 .update_transfer(&outcome.transfer)
518 .await?;
519
520 if is_completed {
521 self.transfer_store
522 .unblock_dependents(outcome.transfer.id())
523 .await?;
524
525 if outcome.transfer.is_delete() {
526 let deleted = self
528 .location_files
529 .delete(outcome.transfer.file_id(), outcome.transfer.dest())
530 .await?;
531 trace!(
532 file_id = %outcome.transfer.file_id(),
533 dest = %outcome.transfer.dest(),
534 deleted = deleted,
535 "execute_bfs: delete transfer → LocationFile removed"
536 );
537 } else {
538 if let Ok(Some(tf)) = self
540 .topology_files
541 .get_by_id(outcome.transfer.file_id())
542 .await
543 {
544 let src_lf = self
545 .location_files
546 .get(outcome.transfer.file_id(), outcome.transfer.src())
547 .await?;
548 if let Some(src_lf) = src_lf {
549 trace!(
550 file_id = %outcome.transfer.file_id(),
551 src = %outcome.transfer.src(),
552 dest = %outcome.transfer.dest(),
553 path = %outcome.relative_path,
554 "persist_outcomes: creating dest LocationFile from src"
555 );
556 let dest_lf = tf
557 .materialize(
558 outcome.transfer.dest().clone(),
559 outcome.relative_path.clone(),
560 src_lf.fingerprint().clone(),
561 src_lf.embedded_id().map(|s| s.to_string()),
562 )
563 .map_err(SyncError::Domain)?;
564 self.location_files.upsert(&dest_lf).await?;
565 } else {
566 warn!(
567 file_id = %outcome.transfer.file_id(),
568 src = %outcome.transfer.src(),
569 "persist_outcomes: src LocationFile not found, cannot create dest LF"
570 );
571 }
572 } else {
573 warn!(
574 file_id = %outcome.transfer.file_id(),
575 "persist_outcomes: TopologyFile not found for completed transfer"
576 );
577 }
578 }
579
580 *total_transferred += 1;
581 info!(
582 id = %outcome.transfer.id(),
583 src = %outcome.transfer.src(),
584 dest = %outcome.transfer.dest(),
585 path = %outcome.relative_path,
586 kind = ?outcome.transfer.kind(),
587 "execute_bfs: transfer completed"
588 );
589 } else {
590 *total_failed += 1;
591 let err_msg = outcome
592 .transfer
593 .error()
594 .map(|e| e.to_string())
595 .unwrap_or_else(|| "unknown error".to_string());
596 error!(
597 id = %outcome.transfer.id(),
598 src = %outcome.transfer.src(),
599 dest = %outcome.transfer.dest(),
600 path = %outcome.relative_path,
601 err = %err_msg,
602 "execute_bfs: transfer FAILED"
603 );
604 all_errors.push(SyncReportError {
605 path: outcome.relative_path.clone(),
606 error: err_msg,
607 });
608 }
609 }
610 Ok(())
611 }
612}
613
614#[async_trait]
615impl SyncStoreSdk for SdkImpl {
616 async fn sync(&self) -> Result<SyncReport, SyncError> {
621 info!("sdk_impl::sync: pipeline start");
622 self.report_progress("ensure: checking locations");
623
624 let location_ids: Vec<String> = self.locations.iter().map(|l| l.id().to_string()).collect();
627 info!(
628 location_count = self.locations.len(),
629 locations = %location_ids.join(", "),
630 "sdk_impl::sync: ensure start"
631 );
632 let mut failed_locations: std::collections::HashSet<LocationId> =
633 std::collections::HashSet::new();
634 for loc in &self.locations {
635 info!(
636 location = %loc.id(),
637 kind = ?loc.kind(),
638 "sdk_impl::sync: ensure checking"
639 );
640 match loc.ensure().await {
641 Ok(()) => {
642 info!(location = %loc.id(), "sdk_impl::sync: ensure ok");
643 }
644 Err(e) => {
645 error!(
646 location = %loc.id(),
647 kind = ?loc.kind(),
648 error = %e,
649 "sdk_impl::sync: ensure FAILED — this location will be excluded from sync"
650 );
651 failed_locations.insert(loc.id().clone());
652 }
653 }
654 }
655 if failed_locations.is_empty() {
656 info!("sdk_impl::sync: ensure done — all locations reachable");
657 } else {
658 let excluded: Vec<String> = failed_locations.iter().map(|l| l.to_string()).collect();
659 warn!(
660 excluded = %excluded.join(", "),
661 "sdk_impl::sync: ensure done — {} location(s) excluded due to ensure failure",
662 failed_locations.len()
663 );
664 }
665
666 let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
668 if cancelled > 0 {
669 info!(
670 cancelled_count = cancelled,
671 "sdk_impl::sync: cancelled orphaned InFlight transfers"
672 );
673 }
674
675 self.report_progress("scan: scanning locations");
677 info!("sdk_impl::sync: phase1 scan start");
678 let progress_cb = self.progress.lock().ok().and_then(|g| g.clone());
679 let scan_result = self
680 .scanner
681 .scan_all(&self.scan_excludes, &failed_locations, progress_cb.as_ref())
682 .await?;
683 info!(
684 scanned = scan_result.scanned,
685 deltas = scan_result.deltas.len(),
686 scan_errors = scan_result.scan_errors.len(),
687 "sdk_impl::sync: phase1 scan done"
688 );
689 for delta in &scan_result.deltas {
691 trace!(delta = ?delta, "sdk_impl::sync: delta");
692 }
693
694 self.report_progress(&format!(
696 "plan: {} files scanned, {} deltas",
697 scan_result.scanned,
698 scan_result.deltas.len()
699 ));
700 info!(
701 delta_count = scan_result.deltas.len(),
702 "sdk_impl::sync: phase2 plan start"
703 );
704 let plan_result = self.topology.sync(&scan_result.deltas).await?;
705 info!(
706 transfers_created = plan_result.transfers_created,
707 conflicts = plan_result.conflicts.len(),
708 "sdk_impl::sync: phase2 plan done"
709 );
710
711 if let Ok(guard) = self.progress.lock() {
714 self.engine.set_progress_callback(guard.clone());
715 }
716 self.report_progress(&format!(
717 "execute: {} transfers queued",
718 plan_result.transfers_created
719 ));
720 info!("sdk_impl::sync: phase3 execute start");
721 let (transferred, failed, errors) = self.execute_bfs(&failed_locations).await?;
722 self.engine.set_progress_callback(None);
724 info!(
725 transferred = transferred,
726 failed = failed,
727 error_count = errors.len(),
728 "sdk_impl::sync: phase3 execute done"
729 );
730
731 Ok(SyncReport {
732 scanned: scan_result.scanned,
733 scan_errors: scan_result
734 .scan_errors
735 .iter()
736 .map(|e| SyncReportError {
737 path: e.path.clone(),
738 error: e.error.clone(),
739 })
740 .collect(),
741 transfers_created: plan_result.transfers_created,
742 transferred,
743 failed,
744 errors,
745 conflicts: plan_result
746 .conflicts
747 .iter()
748 .map(super::sdk::SyncReportConflict::from)
749 .collect(),
750 })
751 }
752
753 async fn sync_route(
754 &self,
755 src: &LocationId,
756 dest: &LocationId,
757 ) -> Result<SyncReport, SyncError> {
758 let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
760 if cancelled > 0 {
761 info!(
762 cancelled_count = cancelled,
763 "sync_route: cancelled orphaned InFlight transfers"
764 );
765 }
766
767 self.report_progress(&format!("plan: route {src} → {dest}"));
769 let plan_result = self.topology.sync_route(src, dest).await?;
770
771 if let Ok(guard) = self.progress.lock() {
774 self.engine.set_progress_callback(guard.clone());
775 }
776 let queued = self.transfer_store.queued_transfers(dest).await?;
777 let eligible: Vec<_> = queued.into_iter().filter(|t| t.src() == src).collect();
778
779 let mut prepared = Vec::with_capacity(eligible.len());
780 let mut total_failed = 0usize;
781 let mut all_errors: Vec<SyncReportError> = Vec::new();
782
783 for transfer in eligible {
784 match self.topology_files.get_by_id(transfer.file_id()).await {
785 Ok(Some(file)) => {
786 prepared.push(PreparedTransfer {
787 transfer,
788 relative_path: file.relative_path().to_string(),
789 });
790 }
791 Ok(None) => {
792 total_failed += 1;
793 all_errors.push(SyncReportError {
794 path: transfer.file_id().to_string(),
795 error: format!("file {} not found in store", transfer.file_id()),
796 });
797 }
798 Err(e) => {
799 total_failed += 1;
800 all_errors.push(SyncReportError {
801 path: transfer.file_id().to_string(),
802 error: e.to_string(),
803 });
804 }
805 }
806 }
807
808 self.report_progress(&format!(
809 "execute: {} transfers ({src} → {dest})",
810 prepared.len()
811 ));
812 let outcomes = self.engine.execute_prepared(prepared).await;
813 self.engine.set_progress_callback(None);
815 let mut total_transferred = 0usize;
816
817 self.persist_outcomes(
818 &outcomes,
819 &mut total_transferred,
820 &mut total_failed,
821 &mut all_errors,
822 )
823 .await?;
824
825 Ok(SyncReport {
826 scanned: 0,
827 scan_errors: Vec::new(),
828 transfers_created: plan_result.transfers_created,
829 transferred: total_transferred,
830 failed: total_failed,
831 errors: all_errors,
832 conflicts: plan_result
833 .conflicts
834 .iter()
835 .map(super::sdk::SyncReportConflict::from)
836 .collect(),
837 })
838 }
839
840 async fn put(
845 &self,
846 path: &str,
847 file_type: FileType,
848 fingerprint: FileFingerprint,
849 origin: &LocationId,
850 embedded_id: Option<String>,
851 ) -> Result<PutReport, SyncError> {
852 let result = self
853 .topology
854 .put(path, file_type, fingerprint, origin, embedded_id)
855 .await?;
856 Ok(PutReport {
857 file_id: result.topology_file_id,
858 is_new: result.is_new,
859 transfers_created: result.transfers_created,
860 })
861 }
862
863 async fn delete(&self, path: &str) -> Result<usize, SyncError> {
864 self.topology.delete(path).await
865 }
866
867 async fn get(&self, path: &str) -> Result<Option<TopologyFileView>, SyncError> {
872 self.topology.get(path).await
873 }
874
875 async fn list(
876 &self,
877 file_type: Option<FileType>,
878 limit: Option<usize>,
879 ) -> Result<Vec<TopologyFileView>, SyncError> {
880 self.topology.list(file_type, limit).await
881 }
882
883 async fn status(&self) -> Result<SyncSummary, SyncError> {
884 use crate::domain::location::LocationSummary;
885 use crate::domain::transfer::TransferState;
886 use std::collections::HashMap;
887
888 let retry_policy = self.config.retry_policy();
889 let total_files = self.topology.file_count().await?;
890 let stats = self.transfer_store.transfer_stats().await?;
891 let present_counts = self.transfer_store.present_counts_by_location().await?;
892 let failed = self.transfer_store.failed_transfers().await?;
893 let pending = self.transfer_store.all_pending_transfers().await?;
894
895 let mut locations: HashMap<LocationId, LocationSummary> = HashMap::new();
896 let mut total_errors = 0usize;
897
898 for (loc, count) in &present_counts {
899 let summary = locations.entry(loc.clone()).or_default();
900 summary.present = *count;
901 }
902
903 for row in &stats {
904 if row.state == TransferState::Completed || row.state == TransferState::Cancelled {
905 continue;
906 }
907 let dest_state = match row.state {
908 TransferState::Blocked | TransferState::Queued => PresenceState::Pending,
909 TransferState::InFlight => PresenceState::Syncing,
910 TransferState::Failed => {
911 let exhausted = match row.error_kind.as_deref() {
912 Some("permanent") => true,
913 _ => row.attempt >= retry_policy.max_attempts(),
914 };
915 if exhausted {
916 PresenceState::Failed
917 } else {
918 PresenceState::Pending
919 }
920 }
921 TransferState::Completed | TransferState::Cancelled => PresenceState::Absent,
922 };
923
924 let dest_summary = locations.entry(row.dest.clone()).or_default();
925 match dest_state {
926 PresenceState::Pending => {
927 dest_summary.pending = dest_summary.pending.saturating_add(row.file_count);
928 }
929 PresenceState::Syncing => {
930 dest_summary.syncing = dest_summary.syncing.saturating_add(row.file_count);
931 }
932 PresenceState::Failed => {
933 dest_summary.failed = dest_summary.failed.saturating_add(row.file_count);
934 total_errors = total_errors.saturating_add(row.file_count);
935 }
936 PresenceState::Absent => {
937 dest_summary.absent = dest_summary.absent.saturating_add(row.file_count);
938 }
939 PresenceState::Present => {}
940 }
941 }
942
943 let error_entries: Vec<ErrorEntry> = failed
944 .iter()
945 .filter(|t| {
946 let state = PresenceState::from_transfer(t, &retry_policy);
947 state == PresenceState::Failed
948 })
949 .map(ErrorEntry::from_transfer)
950 .collect();
951
952 let mut pending_entries: Vec<PendingEntry> =
953 pending.iter().map(PendingEntry::from_transfer).collect();
954 for t in &failed {
955 let state = PresenceState::from_transfer(t, &retry_policy);
956 if state == PresenceState::Pending {
957 pending_entries.push(PendingEntry::from_transfer(t));
958 }
959 }
960
961 Ok(SyncSummary {
962 locations,
963 total_entries: total_files,
964 total_errors,
965 error_entries,
966 pending_entries,
967 })
968 }
969
970 async fn errors(&self) -> Result<Vec<ErrorEntry>, SyncError> {
971 let retry_policy = self.config.retry_policy();
972 let failed = self.transfer_store.failed_transfers().await?;
973 Ok(failed
974 .iter()
975 .filter(|t| {
976 let state = PresenceState::from_transfer(t, &retry_policy);
977 state == PresenceState::Failed
978 })
979 .map(ErrorEntry::from_transfer)
980 .collect())
981 }
982
983 async fn pending(&self, dest: &LocationId) -> Result<Vec<PendingEntry>, SyncError> {
984 let retry_policy = self.config.retry_policy();
985
986 let all_pending = self.transfer_store.all_pending_transfers().await?;
988 let mut entries: Vec<PendingEntry> = all_pending
989 .iter()
990 .filter(|t| t.dest() == dest)
991 .map(PendingEntry::from_transfer)
992 .collect();
993
994 let failed = self.transfer_store.failed_transfers().await?;
996 for t in &failed {
997 if t.dest() == dest {
998 let state = PresenceState::from_transfer(t, &retry_policy);
999 if state == PresenceState::Pending {
1000 entries.push(PendingEntry::from_transfer(t));
1001 }
1002 }
1003 }
1004
1005 Ok(entries)
1006 }
1007
1008 fn locations(&self) -> Vec<LocationId> {
1013 self.topology.locations().to_vec()
1014 }
1015
1016 fn all_edges(&self) -> Vec<(LocationId, LocationId)> {
1017 self.engine.all_edges()
1018 }
1019
1020 fn local_root(&self) -> Option<&Path> {
1021 self.engine.local_root()
1022 }
1023
1024 fn set_progress_callback(&self, callback: Option<ProgressFn>) {
1025 if let Ok(mut guard) = self.progress.lock() {
1026 *guard = callback;
1027 }
1028 }
1029}