use std::path::Path;
use std::sync::{Arc, Mutex as StdMutex};
use async_trait::async_trait;
use tracing::{debug, error, info, trace, warn};
use super::route::{TransferDirection, TransferRoute};
use super::sdk::{PutReport, SyncReport, SyncReportError, SyncStoreSdk};
use super::topology_scanner::TopologyScanner;
use super::topology_store::{TopologyFileView, TopologyStore};
use super::transfer_engine::{PreparedTransfer, TransferEngine, TransferOutcome};
use crate::application::error::SyncError;
use crate::domain::config::SyncConfig;
use crate::domain::file_type::FileType;
use crate::domain::fingerprint::FileFingerprint;
use crate::domain::graph::{EdgeCost, RouteGraph};
use crate::domain::location::{LocationId, SyncSummary};
use crate::domain::transfer::TransferState;
use crate::domain::view::{ErrorEntry, PendingEntry, PresenceState};
use crate::infra::backend::{ProgressFn, StorageBackend};
use crate::infra::location::{Location, LocationKind};
use crate::infra::location_file_store::LocationFileStore;
use crate::infra::location_scanner::LocationScanner;
use crate::infra::shell::RemoteShell;
use crate::infra::topology_file_store::TopologyFileStore;
use crate::infra::transfer_store::TransferStore;
/// Concrete [`SyncStoreSdk`] implementation, assembled by [`SdkImplBuilder::build`].
///
/// Wires together the scan phase (`scanner`), the planning phase (`topology`),
/// and the execution phase (`engine`) over a shared set of stores.
pub struct SdkImpl {
/// Phase-1 scanner; `scan_all` runs over the scanners of every registered location.
scanner: TopologyScanner,
/// Planner and file catalogue: `sync`/`sync_route` planning plus `put`/`delete`/`get`/`list`.
topology: TopologyStore,
/// Executes prepared transfers over the route graph built from the registered routes.
engine: TransferEngine,
/// Topology-file lookup by id; used to resolve a transfer's relative path.
topology_files: Arc<dyn TopologyFileStore>,
/// Per-location file records; upserted/deleted after completed transfers.
location_files: Arc<dyn LocationFileStore>,
/// Persisted transfer queue and state (queued, failed, pending, stats).
transfer_store: Arc<dyn TransferStore>,
/// Registered locations; each one is `ensure()`d at the start of `sync`.
locations: Vec<Arc<dyn Location>>,
/// Runtime configuration; supplies the retry policy used by `status`/`errors`/`pending`.
config: SyncConfig,
/// Glob patterns forwarded to `scan_all` as scan exclusions.
scan_excludes: Vec<glob::Pattern>,
/// Optional progress callback; reported to directly and installed on the
/// engine for the duration of the execute phase.
progress: StdMutex<Option<ProgressFn>>,
}
/// A route requested on [`SdkImplBuilder`] via `connect*`, kept unresolved until
/// `build` matches `src`/`dest` against the registered locations.
struct PendingRoute {
/// Source location id.
src: LocationId,
/// Destination location id.
dest: LocationId,
/// Backend handed to `TransferRoute::new` for this route.
backend: Box<dyn StorageBackend>,
/// Optional shell for the source side; only set by `connect_with_shell`.
src_shell: Option<Box<dyn RemoteShell>>,
/// Push (default) or Pull, as chosen by the `connect*` variant used.
direction: TransferDirection,
}
/// Fluent builder for [`SdkImpl`]: register stores, locations, routes,
/// config, and scan excludes, then call `build`.
pub struct SdkImplBuilder {
/// Store for topology files (shared with the built `SdkImpl`).
topology_files: Arc<dyn TopologyFileStore>,
/// Store for per-location file records.
location_files: Arc<dyn LocationFileStore>,
/// Store for transfers.
transfer_store: Arc<dyn TransferStore>,
/// Registered locations, deduplicated by id in `location`.
locations: Vec<Arc<dyn Location>>,
/// Routes queued by `connect*`, resolved in `build`.
pending_routes: Vec<PendingRoute>,
/// Explicit config; `build` falls back to `SyncConfig::default()` when unset.
config: Option<SyncConfig>,
/// Valid exclude globs; invalid ones are dropped with a warning in `exclude`.
scan_excludes: Vec<glob::Pattern>,
}
impl SdkImplBuilder {
pub fn new(
topology_files: Arc<dyn TopologyFileStore>,
location_files: Arc<dyn LocationFileStore>,
transfer_store: Arc<dyn TransferStore>,
) -> Self {
Self {
topology_files,
location_files,
transfer_store,
locations: Vec::new(),
pending_routes: Vec::new(),
config: None,
scan_excludes: Vec::new(),
}
}
pub fn location(mut self, loc: Arc<dyn Location>) -> Self {
if !self.locations.iter().any(|l| l.id() == loc.id()) {
self.locations.push(loc);
}
self
}
pub fn connect(
mut self,
src: &LocationId,
dest: &LocationId,
backend: Box<dyn StorageBackend>,
) -> Self {
self.pending_routes.push(PendingRoute {
src: src.clone(),
dest: dest.clone(),
backend,
src_shell: None,
direction: TransferDirection::Push,
});
self
}
pub fn connect_with_shell(
mut self,
src: &LocationId,
dest: &LocationId,
backend: Box<dyn StorageBackend>,
src_shell: Box<dyn RemoteShell>,
) -> Self {
self.pending_routes.push(PendingRoute {
src: src.clone(),
dest: dest.clone(),
backend,
src_shell: Some(src_shell),
direction: TransferDirection::Push,
});
self
}
pub fn connect_pull(
mut self,
src: &LocationId,
dest: &LocationId,
backend: Box<dyn StorageBackend>,
) -> Self {
self.pending_routes.push(PendingRoute {
src: src.clone(),
dest: dest.clone(),
backend,
src_shell: None,
direction: TransferDirection::Pull,
});
self
}
pub fn config(mut self, config: SyncConfig) -> Self {
self.config = Some(config);
self
}
pub fn exclude(mut self, pattern: &str) -> Self {
match glob::Pattern::new(pattern) {
Ok(p) => self.scan_excludes.push(p),
Err(e) => {
tracing::warn!(pattern = pattern, error = %e, "invalid exclude glob pattern, skipped");
}
}
self
}
pub fn build(self) -> Result<SdkImpl, SyncError> {
use std::collections::HashMap;
let config = self.config.unwrap_or_default();
let loc_map: HashMap<LocationId, &Arc<dyn Location>> =
self.locations.iter().map(|l| (l.id().clone(), l)).collect();
let scanners: Vec<Arc<dyn LocationScanner>> =
self.locations.iter().map(|loc| loc.scanner()).collect();
let routes: Vec<TransferRoute> = self
.pending_routes
.into_iter()
.filter_map(|pr| {
let src_loc = loc_map.get(&pr.src)?;
let dest_loc = loc_map.get(&pr.dest)?;
let cost = match estimate_route_cost(src_loc.kind(), dest_loc.kind()) {
Ok(c) => c,
Err(e) => {
tracing::warn!(src = ?src_loc.kind(), dest = ?dest_loc.kind(), error = %e, "skipping route: invalid cost");
return None;
}
};
let mut route = TransferRoute::new(
pr.src,
pr.dest,
src_loc.file_root().to_path_buf(),
dest_loc.file_root().to_path_buf(),
pr.backend,
)
.with_cost(cost.time_per_gb, cost.priority);
if pr.direction == TransferDirection::Pull {
route = route.direction(TransferDirection::Pull);
}
if let Some(shell) = pr.src_shell {
route = route.with_src_shell(shell);
}
Some(route)
})
.collect();
let location_ids: Vec<LocationId> =
self.locations.iter().map(|loc| loc.id().clone()).collect();
let mut graph = RouteGraph::new();
for r in &routes {
graph.add_with_cost(
r.src().clone(),
r.dest().clone(),
EdgeCost::new(r.time_per_gb(), r.priority())?,
);
}
let topology = TopologyStore::new(
self.topology_files.clone(),
self.location_files.clone(),
self.transfer_store.clone(),
graph.clone(),
location_ids,
);
let engine = TransferEngine::new(graph, routes, config.concurrency);
let scanner = TopologyScanner::new(
self.topology_files.clone(),
self.location_files.clone(),
scanners,
);
Ok(SdkImpl {
scanner,
topology,
engine,
topology_files: self.topology_files,
location_files: self.location_files,
transfer_store: self.transfer_store,
locations: self.locations,
config,
scan_excludes: self.scan_excludes,
progress: StdMutex::new(None),
})
}
}
/// Heuristic edge cost for a route between two location kinds.
///
/// The table is symmetric: a hop and its reverse get the same
/// `(time_per_gb, priority)` pair. Any pairing not listed (including
/// same-kind hops) falls back to `(1.0, 100)`.
///
/// # Errors
/// Returns whatever `EdgeCost::new` returns for the selected pair.
fn estimate_route_cost(
    src: LocationKind,
    dest: LocationKind,
) -> Result<EdgeCost, crate::domain::error::DomainError> {
    // Each arm covers one unordered pair of kinds.
    let (gb_time, prio) = match (src, dest) {
        (LocationKind::Local, LocationKind::Remote)
        | (LocationKind::Remote, LocationKind::Local) => (1.0, 10),
        (LocationKind::Remote, LocationKind::Cloud)
        | (LocationKind::Cloud, LocationKind::Remote) => (2.0, 50),
        (LocationKind::Local, LocationKind::Cloud)
        | (LocationKind::Cloud, LocationKind::Local) => (5.0, 100),
        _ => (1.0, 100),
    };
    EdgeCost::new(gb_time, prio)
}
impl SdkImpl {
    /// Sends `msg` to the registered progress callback, if one is set.
    ///
    /// The callback is cloned out of `self.progress` and invoked only after
    /// the lock is released. Calling it under the guard would deadlock a
    /// callback that re-enters `set_progress_callback` (std `Mutex` is not
    /// reentrant). It would also poison the mutex if the callback panicked,
    /// which silently disables all later reporting. A lock that is already
    /// poisoned is treated as "no callback" (best-effort reporting).
    fn report_progress(&self, msg: &str) {
        let callback = self.progress.lock().ok().and_then(|guard| guard.clone());
        if let Some(cb) = callback {
            cb(msg);
        }
    }

    /// Executes all queued transfers, one target location at a time, in the
    /// order returned by `TransferEngine::all_targets_ordered`.
    ///
    /// For each target, queued transfers are first resolved to a relative path
    /// through `topology_files`. Unresolvable ones are counted as failed and
    /// reported, but are not updated in the transfer store. Resolved transfers
    /// are split into non-delete and delete batches. The non-delete batch is
    /// executed and persisted before the delete batch.
    ///
    /// Targets contained in `skip_locations` are skipped entirely.
    /// NOTE(review): only the *target* is checked against `skip_locations`;
    /// transfers whose source was skipped still run here. Confirm this is intended.
    ///
    /// Returns `(transferred, failed, errors)` accumulated across all targets.
    ///
    /// # Errors
    /// Propagates errors from `queued_transfers` and from `persist_outcomes`.
    async fn execute_bfs(
        &self,
        skip_locations: &std::collections::HashSet<crate::domain::location::LocationId>,
    ) -> Result<(usize, usize, Vec<SyncReportError>), SyncError> {
        let mut total_transferred = 0usize;
        let mut total_failed = 0usize;
        let mut all_errors: Vec<SyncReportError> = Vec::new();
        let targets = self.engine.all_targets_ordered();
        debug!(targets = ?targets, "execute_bfs: BFS target order");
        for target in &targets {
            if skip_locations.contains(target) {
                warn!(
                    target = %target,
                    "execute_bfs: skipping target (ensure failed)"
                );
                continue;
            }
            let queued = self.transfer_store.queued_transfers(target).await?;
            if queued.is_empty() {
                debug!(target = %target, "execute_bfs: no queued transfers, skip");
                continue;
            }
            info!(target = %target, queued = queued.len(), "execute_bfs: processing target");
            self.report_progress(&format!("target {target}: {} queued", queued.len()));
            let mut prepared = Vec::with_capacity(queued.len());
            let mut resolve_miss = 0usize;
            for transfer in queued {
                match self.topology_files.get_by_id(transfer.file_id()).await {
                    Ok(Some(file)) => {
                        trace!(
                            file_id = %transfer.file_id(),
                            path = %file.relative_path(),
                            src = %transfer.src(),
                            dest = %transfer.dest(),
                            "execute_bfs: prepared"
                        );
                        prepared.push(PreparedTransfer {
                            transfer,
                            relative_path: file.relative_path().to_string(),
                        });
                    }
                    Ok(None) => {
                        resolve_miss += 1;
                        error!(
                            file_id = %transfer.file_id(),
                            src = %transfer.src(),
                            dest = %transfer.dest(),
                            "execute_bfs: topology_file not found — transfer skipped"
                        );
                        total_failed += 1;
                        all_errors.push(SyncReportError {
                            path: transfer.file_id().to_string(),
                            error: format!("file {} not found in store", transfer.file_id()),
                        });
                    }
                    Err(e) => {
                        resolve_miss += 1;
                        error!(
                            file_id = %transfer.file_id(),
                            src = %transfer.src(),
                            dest = %transfer.dest(),
                            err = %e,
                            "execute_bfs: topology_file lookup error — transfer skipped"
                        );
                        total_failed += 1;
                        all_errors.push(SyncReportError {
                            path: transfer.file_id().to_string(),
                            error: e.to_string(),
                        });
                    }
                }
            }
            // Non-delete transfers first, then deletes, each as its own batch.
            let (sync_prepared, delete_prepared): (Vec<_>, Vec<_>) =
                prepared.into_iter().partition(|p| !p.transfer.is_delete());
            debug!(
                target = %target,
                sync = sync_prepared.len(),
                delete = delete_prepared.len(),
                resolve_miss = resolve_miss,
                "execute_bfs: preparation done"
            );
            if !sync_prepared.is_empty() {
                info!(
                    target = %target,
                    count = sync_prepared.len(),
                    "execute_bfs: executing sync transfers"
                );
                let sync_outcomes = self.engine.execute_prepared(sync_prepared).await;
                info!(
                    target = %target,
                    outcomes = sync_outcomes.len(),
                    "execute_bfs: sync execution done, persisting"
                );
                self.persist_outcomes(
                    &sync_outcomes,
                    &mut total_transferred,
                    &mut total_failed,
                    &mut all_errors,
                )
                .await?;
            }
            if !delete_prepared.is_empty() {
                info!(
                    target = %target,
                    count = delete_prepared.len(),
                    "execute_bfs: executing delete transfers"
                );
                let delete_outcomes = self.engine.execute_prepared(delete_prepared).await;
                info!(
                    target = %target,
                    outcomes = delete_outcomes.len(),
                    "execute_bfs: delete execution done, persisting"
                );
                self.persist_outcomes(
                    &delete_outcomes,
                    &mut total_transferred,
                    &mut total_failed,
                    &mut all_errors,
                )
                .await?;
            }
            // Totals logged here are cumulative across targets processed so far.
            info!(
                target = %target,
                transferred = total_transferred,
                failed = total_failed,
                "execute_bfs: target batch done"
            );
        }
        Ok((total_transferred, total_failed, all_errors))
    }

    /// Writes each transfer outcome back to the stores and updates the
    /// caller's running totals.
    ///
    /// Every outcome's transfer row is written via `update_transfer`. For
    /// completed transfers, dependents are also unblocked. Then the
    /// destination `LocationFile` is either removed (delete transfers) or
    /// materialized from the source `LocationFile` (all others). Missing
    /// source or topology records, and topology lookup errors, are logged
    /// and skipped rather than aborting. Non-completed transfers are counted
    /// as failed, and their error message is appended to `all_errors`.
    ///
    /// # Errors
    /// Stops at the first error from `update_transfer`, `unblock_dependents`,
    /// `location_files.delete/get/upsert`, or `materialize`. Outcomes after
    /// that point are not persisted.
    async fn persist_outcomes(
        &self,
        outcomes: &[TransferOutcome],
        total_transferred: &mut usize,
        total_failed: &mut usize,
        all_errors: &mut Vec<SyncReportError>,
    ) -> Result<(), SyncError> {
        for outcome in outcomes {
            let is_completed = outcome.transfer.state() == TransferState::Completed;
            self.transfer_store
                .update_transfer(&outcome.transfer)
                .await?;
            if is_completed {
                self.transfer_store
                    .unblock_dependents(outcome.transfer.id())
                    .await?;
                if outcome.transfer.is_delete() {
                    let deleted = self
                        .location_files
                        .delete(outcome.transfer.file_id(), outcome.transfer.dest())
                        .await?;
                    trace!(
                        file_id = %outcome.transfer.file_id(),
                        dest = %outcome.transfer.dest(),
                        deleted = deleted,
                        "execute_bfs: delete transfer → LocationFile removed"
                    );
                } else {
                    match self
                        .topology_files
                        .get_by_id(outcome.transfer.file_id())
                        .await
                    {
                        Ok(Some(tf)) => {
                            let src_lf = self
                                .location_files
                                .get(outcome.transfer.file_id(), outcome.transfer.src())
                                .await?;
                            if let Some(src_lf) = src_lf {
                                trace!(
                                    file_id = %outcome.transfer.file_id(),
                                    src = %outcome.transfer.src(),
                                    dest = %outcome.transfer.dest(),
                                    path = %outcome.relative_path,
                                    "persist_outcomes: creating dest LocationFile from src"
                                );
                                let dest_lf = tf
                                    .materialize(
                                        outcome.transfer.dest().clone(),
                                        outcome.relative_path.clone(),
                                        src_lf.fingerprint().clone(),
                                        src_lf.embedded_id().map(|s| s.to_string()),
                                    )
                                    .map_err(SyncError::Domain)?;
                                self.location_files.upsert(&dest_lf).await?;
                            } else {
                                warn!(
                                    file_id = %outcome.transfer.file_id(),
                                    src = %outcome.transfer.src(),
                                    "persist_outcomes: src LocationFile not found, cannot create dest LF"
                                );
                            }
                        }
                        Ok(None) => {
                            warn!(
                                file_id = %outcome.transfer.file_id(),
                                "persist_outcomes: TopologyFile not found for completed transfer"
                            );
                        }
                        // Previously folded into the "not found" branch; keep it
                        // non-fatal, but report the actual store error.
                        Err(e) => {
                            warn!(
                                file_id = %outcome.transfer.file_id(),
                                err = %e,
                                "persist_outcomes: TopologyFile lookup failed for completed transfer"
                            );
                        }
                    }
                }
                *total_transferred += 1;
                info!(
                    id = %outcome.transfer.id(),
                    src = %outcome.transfer.src(),
                    dest = %outcome.transfer.dest(),
                    path = %outcome.relative_path,
                    kind = ?outcome.transfer.kind(),
                    "execute_bfs: transfer completed"
                );
            } else {
                *total_failed += 1;
                let err_msg = outcome
                    .transfer
                    .error()
                    .map(|e| e.to_string())
                    .unwrap_or_else(|| "unknown error".to_string());
                error!(
                    id = %outcome.transfer.id(),
                    src = %outcome.transfer.src(),
                    dest = %outcome.transfer.dest(),
                    path = %outcome.relative_path,
                    err = %err_msg,
                    "execute_bfs: transfer FAILED"
                );
                all_errors.push(SyncReportError {
                    path: outcome.relative_path.clone(),
                    error: err_msg,
                });
            }
        }
        Ok(())
    }
}
#[async_trait]
impl SyncStoreSdk for SdkImpl {
async fn sync(&self) -> Result<SyncReport, SyncError> {
info!("sdk_impl::sync: pipeline start");
self.report_progress("ensure: checking locations");
let location_ids: Vec<String> = self.locations.iter().map(|l| l.id().to_string()).collect();
info!(
location_count = self.locations.len(),
locations = %location_ids.join(", "),
"sdk_impl::sync: ensure start"
);
let mut failed_locations: std::collections::HashSet<LocationId> =
std::collections::HashSet::new();
for loc in &self.locations {
info!(
location = %loc.id(),
kind = ?loc.kind(),
"sdk_impl::sync: ensure checking"
);
match loc.ensure().await {
Ok(()) => {
info!(location = %loc.id(), "sdk_impl::sync: ensure ok");
}
Err(e) => {
error!(
location = %loc.id(),
kind = ?loc.kind(),
error = %e,
"sdk_impl::sync: ensure FAILED — this location will be excluded from sync"
);
failed_locations.insert(loc.id().clone());
}
}
}
if failed_locations.is_empty() {
info!("sdk_impl::sync: ensure done — all locations reachable");
} else {
let excluded: Vec<String> = failed_locations.iter().map(|l| l.to_string()).collect();
warn!(
excluded = %excluded.join(", "),
"sdk_impl::sync: ensure done — {} location(s) excluded due to ensure failure",
failed_locations.len()
);
}
let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
if cancelled > 0 {
info!(
cancelled_count = cancelled,
"sdk_impl::sync: cancelled orphaned InFlight transfers"
);
}
self.report_progress("scan: scanning locations");
info!("sdk_impl::sync: phase1 scan start");
let progress_cb = self.progress.lock().ok().and_then(|g| g.clone());
let scan_result = self
.scanner
.scan_all(&self.scan_excludes, &failed_locations, progress_cb.as_ref())
.await?;
info!(
scanned = scan_result.scanned,
deltas = scan_result.deltas.len(),
scan_errors = scan_result.scan_errors.len(),
"sdk_impl::sync: phase1 scan done"
);
for delta in &scan_result.deltas {
trace!(delta = ?delta, "sdk_impl::sync: delta");
}
self.report_progress(&format!(
"plan: {} files scanned, {} deltas",
scan_result.scanned,
scan_result.deltas.len()
));
info!(
delta_count = scan_result.deltas.len(),
"sdk_impl::sync: phase2 plan start"
);
let plan_result = self.topology.sync(&scan_result.deltas).await?;
info!(
transfers_created = plan_result.transfers_created,
conflicts = plan_result.conflicts.len(),
"sdk_impl::sync: phase2 plan done"
);
if let Ok(guard) = self.progress.lock() {
self.engine.set_progress_callback(guard.clone());
}
self.report_progress(&format!(
"execute: {} transfers queued",
plan_result.transfers_created
));
info!("sdk_impl::sync: phase3 execute start");
let (transferred, failed, errors) = self.execute_bfs(&failed_locations).await?;
self.engine.set_progress_callback(None);
info!(
transferred = transferred,
failed = failed,
error_count = errors.len(),
"sdk_impl::sync: phase3 execute done"
);
Ok(SyncReport {
scanned: scan_result.scanned,
scan_errors: scan_result
.scan_errors
.iter()
.map(|e| SyncReportError {
path: e.path.clone(),
error: e.error.clone(),
})
.collect(),
transfers_created: plan_result.transfers_created,
transferred,
failed,
errors,
conflicts: plan_result
.conflicts
.iter()
.map(super::sdk::SyncReportConflict::from)
.collect(),
})
}
async fn sync_route(
&self,
src: &LocationId,
dest: &LocationId,
) -> Result<SyncReport, SyncError> {
let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
if cancelled > 0 {
info!(
cancelled_count = cancelled,
"sync_route: cancelled orphaned InFlight transfers"
);
}
self.report_progress(&format!("plan: route {src} → {dest}"));
let plan_result = self.topology.sync_route(src, dest).await?;
if let Ok(guard) = self.progress.lock() {
self.engine.set_progress_callback(guard.clone());
}
let queued = self.transfer_store.queued_transfers(dest).await?;
let eligible: Vec<_> = queued.into_iter().filter(|t| t.src() == src).collect();
let mut prepared = Vec::with_capacity(eligible.len());
let mut total_failed = 0usize;
let mut all_errors: Vec<SyncReportError> = Vec::new();
for transfer in eligible {
match self.topology_files.get_by_id(transfer.file_id()).await {
Ok(Some(file)) => {
prepared.push(PreparedTransfer {
transfer,
relative_path: file.relative_path().to_string(),
});
}
Ok(None) => {
total_failed += 1;
all_errors.push(SyncReportError {
path: transfer.file_id().to_string(),
error: format!("file {} not found in store", transfer.file_id()),
});
}
Err(e) => {
total_failed += 1;
all_errors.push(SyncReportError {
path: transfer.file_id().to_string(),
error: e.to_string(),
});
}
}
}
self.report_progress(&format!(
"execute: {} transfers ({src} → {dest})",
prepared.len()
));
let outcomes = self.engine.execute_prepared(prepared).await;
self.engine.set_progress_callback(None);
let mut total_transferred = 0usize;
self.persist_outcomes(
&outcomes,
&mut total_transferred,
&mut total_failed,
&mut all_errors,
)
.await?;
Ok(SyncReport {
scanned: 0,
scan_errors: Vec::new(),
transfers_created: plan_result.transfers_created,
transferred: total_transferred,
failed: total_failed,
errors: all_errors,
conflicts: plan_result
.conflicts
.iter()
.map(super::sdk::SyncReportConflict::from)
.collect(),
})
}
async fn put(
&self,
path: &str,
file_type: FileType,
fingerprint: FileFingerprint,
origin: &LocationId,
embedded_id: Option<String>,
) -> Result<PutReport, SyncError> {
let result = self
.topology
.put(path, file_type, fingerprint, origin, embedded_id)
.await?;
Ok(PutReport {
file_id: result.topology_file_id,
is_new: result.is_new,
transfers_created: result.transfers_created,
})
}
async fn delete(&self, path: &str) -> Result<usize, SyncError> {
self.topology.delete(path).await
}
async fn get(&self, path: &str) -> Result<Option<TopologyFileView>, SyncError> {
self.topology.get(path).await
}
async fn list(
&self,
file_type: Option<FileType>,
limit: Option<usize>,
) -> Result<Vec<TopologyFileView>, SyncError> {
self.topology.list(file_type, limit).await
}
async fn status(&self) -> Result<SyncSummary, SyncError> {
use crate::domain::location::LocationSummary;
use crate::domain::transfer::TransferState;
use std::collections::HashMap;
let retry_policy = self.config.retry_policy();
let total_files = self.topology.file_count().await?;
let stats = self.transfer_store.transfer_stats().await?;
let present_counts = self.transfer_store.present_counts_by_location().await?;
let failed = self.transfer_store.failed_transfers().await?;
let pending = self.transfer_store.all_pending_transfers().await?;
let mut locations: HashMap<LocationId, LocationSummary> = HashMap::new();
let mut total_errors = 0usize;
for (loc, count) in &present_counts {
let summary = locations.entry(loc.clone()).or_default();
summary.present = *count;
}
for row in &stats {
if row.state == TransferState::Completed || row.state == TransferState::Cancelled {
continue;
}
let dest_state = match row.state {
TransferState::Blocked | TransferState::Queued => PresenceState::Pending,
TransferState::InFlight => PresenceState::Syncing,
TransferState::Failed => {
let exhausted = match row.error_kind.as_deref() {
Some("permanent") => true,
_ => row.attempt >= retry_policy.max_attempts(),
};
if exhausted {
PresenceState::Failed
} else {
PresenceState::Pending
}
}
TransferState::Completed | TransferState::Cancelled => PresenceState::Absent,
};
let dest_summary = locations.entry(row.dest.clone()).or_default();
match dest_state {
PresenceState::Pending => {
dest_summary.pending = dest_summary.pending.saturating_add(row.file_count);
}
PresenceState::Syncing => {
dest_summary.syncing = dest_summary.syncing.saturating_add(row.file_count);
}
PresenceState::Failed => {
dest_summary.failed = dest_summary.failed.saturating_add(row.file_count);
total_errors = total_errors.saturating_add(row.file_count);
}
PresenceState::Absent => {
dest_summary.absent = dest_summary.absent.saturating_add(row.file_count);
}
PresenceState::Present => {}
}
}
let error_entries: Vec<ErrorEntry> = failed
.iter()
.filter(|t| {
let state = PresenceState::from_transfer(t, &retry_policy);
state == PresenceState::Failed
})
.map(ErrorEntry::from_transfer)
.collect();
let mut pending_entries: Vec<PendingEntry> =
pending.iter().map(PendingEntry::from_transfer).collect();
for t in &failed {
let state = PresenceState::from_transfer(t, &retry_policy);
if state == PresenceState::Pending {
pending_entries.push(PendingEntry::from_transfer(t));
}
}
Ok(SyncSummary {
locations,
total_entries: total_files,
total_errors,
error_entries,
pending_entries,
})
}
async fn errors(&self) -> Result<Vec<ErrorEntry>, SyncError> {
let retry_policy = self.config.retry_policy();
let failed = self.transfer_store.failed_transfers().await?;
Ok(failed
.iter()
.filter(|t| {
let state = PresenceState::from_transfer(t, &retry_policy);
state == PresenceState::Failed
})
.map(ErrorEntry::from_transfer)
.collect())
}
async fn pending(&self, dest: &LocationId) -> Result<Vec<PendingEntry>, SyncError> {
let retry_policy = self.config.retry_policy();
let all_pending = self.transfer_store.all_pending_transfers().await?;
let mut entries: Vec<PendingEntry> = all_pending
.iter()
.filter(|t| t.dest() == dest)
.map(PendingEntry::from_transfer)
.collect();
let failed = self.transfer_store.failed_transfers().await?;
for t in &failed {
if t.dest() == dest {
let state = PresenceState::from_transfer(t, &retry_policy);
if state == PresenceState::Pending {
entries.push(PendingEntry::from_transfer(t));
}
}
}
Ok(entries)
}
fn locations(&self) -> Vec<LocationId> {
self.topology.locations().to_vec()
}
fn all_edges(&self) -> Vec<(LocationId, LocationId)> {
self.engine.all_edges()
}
fn local_root(&self) -> Option<&Path> {
self.engine.local_root()
}
fn set_progress_callback(&self, callback: Option<ProgressFn>) {
if let Ok(mut guard) = self.progress.lock() {
*guard = callback;
}
}
}