use std::collections::HashMap;
use std::sync::{Arc, Mutex as StdMutex};
use super::SdkImpl;
use crate::application::error::SyncError;
use crate::application::route::{TransferDirection, TransferRoute};
use crate::application::topology_scanner::TopologyScanner;
use crate::application::topology_store::TopologyStore;
use crate::application::transfer_engine::TransferEngine;
use crate::domain::config::SyncConfig;
use crate::domain::graph::{EdgeCost, RouteGraph};
use crate::domain::location::LocationId;
use crate::infra::backend::StorageBackend;
use crate::infra::location::{Location, LocationKind};
use crate::infra::location_file_store::LocationFileStore;
use crate::infra::location_scanner::LocationScanner;
use crate::infra::shell::RemoteShell;
use crate::infra::topology_file_store::TopologyFileStore;
use crate::infra::transfer_store::TransferStore;
/// A route requested through one of the builder's `connect*` methods, kept
/// as-is until `SdkImplBuilder::build` resolves it against the registered
/// locations.
struct PendingRoute {
/// Id of the location the route starts from.
src: LocationId,
/// Id of the location the route ends at.
dest: LocationId,
/// Storage backend handed to `TransferRoute::new` when the route is built.
backend: Box<dyn StorageBackend>,
/// Optional shell; only `connect_with_shell` sets it, and `build` attaches it
/// to the route via `with_src_shell`.
src_shell: Option<Box<dyn RemoteShell>>,
/// `Push` for `connect` / `connect_with_shell`, `Pull` for `connect_pull`.
direction: TransferDirection,
}
/// Consuming builder that collects stores, locations, routes and settings and
/// assembles them into an `SdkImpl` in `build`.
pub struct SdkImplBuilder {
/// Store passed to `TopologyStore`, `TopologyScanner` and the final `SdkImpl`.
topology_files: Arc<dyn TopologyFileStore>,
/// Store passed to `TopologyStore`, `TopologyScanner` and the final `SdkImpl`.
location_files: Arc<dyn LocationFileStore>,
/// Store passed to `TopologyStore` and the final `SdkImpl`.
transfer_store: Arc<dyn TransferStore>,
/// Registered locations; `location` keeps at most one entry per id.
locations: Vec<Arc<dyn Location>>,
/// Routes requested via the `connect*` methods, resolved in `build`.
pending_routes: Vec<PendingRoute>,
/// Explicit configuration; when `None`, `build` uses `SyncConfig::default()`.
config: Option<SyncConfig>,
/// Compiled glob patterns added through `exclude`.
scan_excludes: Vec<glob::Pattern>,
/// Archive roots keyed by destination location id (see `archive_route_to`).
archive_roots: HashMap<LocationId, std::path::PathBuf>,
}
impl SdkImplBuilder {
    /// Creates a builder around the three persistence stores.
    ///
    /// The builder starts with no locations, no routes, no exclude patterns,
    /// no archive roots and no explicit configuration.
    pub fn new(
        topology_files: Arc<dyn TopologyFileStore>,
        location_files: Arc<dyn LocationFileStore>,
        transfer_store: Arc<dyn TransferStore>,
    ) -> Self {
        Self {
            topology_files,
            location_files,
            transfer_store,
            locations: Vec::new(),
            pending_routes: Vec::new(),
            config: None,
            scan_excludes: Vec::new(),
            archive_roots: HashMap::new(),
        }
    }

    /// Records `archive_root` as the archive root for routes ending at `dest`.
    ///
    /// Calling this again for the same `dest` replaces the previous root.
    /// `build` applies the root to push routes only; pull routes ignore it.
    pub fn archive_route_to(mut self, dest: &LocationId, archive_root: std::path::PathBuf) -> Self {
        self.archive_roots.insert(dest.clone(), archive_root);
        self
    }

    /// Registers a location.
    ///
    /// If a location with the same id is already registered, the new one is
    /// ignored and the first registration is kept.
    pub fn location(mut self, loc: Arc<dyn Location>) -> Self {
        if !self.locations.iter().any(|l| l.id() == loc.id()) {
            self.locations.push(loc);
        }
        self
    }

    /// Requests a push route from `src` to `dest` using `backend`.
    ///
    /// The route is only materialised in `build`, and only if both ids refer
    /// to registered locations.
    pub fn connect(
        self,
        src: &LocationId,
        dest: &LocationId,
        backend: Box<dyn StorageBackend>,
    ) -> Self {
        self.push_route(src, dest, backend, None, TransferDirection::Push)
    }

    /// Same as `connect`, but additionally carries `src_shell`, which `build`
    /// attaches to the resulting route through `with_src_shell`.
    pub fn connect_with_shell(
        self,
        src: &LocationId,
        dest: &LocationId,
        backend: Box<dyn StorageBackend>,
        src_shell: Box<dyn RemoteShell>,
    ) -> Self {
        self.push_route(src, dest, backend, Some(src_shell), TransferDirection::Push)
    }

    /// Requests a pull route from `src` to `dest` using `backend`.
    ///
    /// Pull routes never receive an archive root, even when one was
    /// registered for `dest`.
    pub fn connect_pull(
        self,
        src: &LocationId,
        dest: &LocationId,
        backend: Box<dyn StorageBackend>,
    ) -> Self {
        self.push_route(src, dest, backend, None, TransferDirection::Pull)
    }

    /// Sets the configuration, replacing any previously set one.
    pub fn config(mut self, config: SyncConfig) -> Self {
        self.config = Some(config);
        self
    }

    /// Adds a glob pattern to the scan exclude list.
    ///
    /// Best-effort: a pattern that fails to compile is logged at warn level
    /// and skipped; the builder itself never fails because of it.
    pub fn exclude(mut self, pattern: &str) -> Self {
        match glob::Pattern::new(pattern) {
            Ok(p) => self.scan_excludes.push(p),
            Err(e) => {
                tracing::warn!(pattern = pattern, error = %e, "invalid exclude glob pattern, skipped");
            }
        }
        self
    }

    /// Assembles the `SdkImpl`.
    ///
    /// Pending routes are resolved against the registered locations; a route
    /// is skipped (with a warning) when one of its endpoints is not
    /// registered or when its estimated cost is rejected. The surviving
    /// routes feed both the route graph and the transfer engine.
    ///
    /// # Errors
    ///
    /// Returns an error when `EdgeCost::new` rejects the cost of a resolved
    /// route while the graph is being populated.
    pub fn build(self) -> Result<SdkImpl, SyncError> {
        let config = self.config.unwrap_or_default();
        let loc_map: HashMap<LocationId, &Arc<dyn Location>> =
            self.locations.iter().map(|l| (l.id().clone(), l)).collect();
        let scanners: Vec<Arc<dyn LocationScanner>> =
            self.locations.iter().map(|loc| loc.scanner()).collect();
        let archive_roots = self.archive_roots;

        let routes: Vec<TransferRoute> = self
            .pending_routes
            .into_iter()
            .filter_map(|pr| Self::resolve_route(pr, &loc_map, &archive_roots))
            .collect();

        let location_ids: Vec<LocationId> =
            self.locations.iter().map(|loc| loc.id().clone()).collect();

        let mut graph = RouteGraph::new();
        for r in &routes {
            graph.add_with_cost(
                r.src().clone(),
                r.dest().clone(),
                EdgeCost::new(r.time_per_gb(), r.priority())?,
            );
        }

        // The topology store gets its own copy of the graph; the engine takes
        // the original together with the routes.
        let topology = TopologyStore::new(
            self.topology_files.clone(),
            self.location_files.clone(),
            self.transfer_store.clone(),
            graph.clone(),
            location_ids,
        );
        let engine = TransferEngine::new(graph, routes, config.concurrency);
        let scanner = TopologyScanner::new(
            self.topology_files.clone(),
            self.location_files.clone(),
            scanners,
        );

        Ok(SdkImpl {
            scanner,
            topology,
            engine,
            topology_files: self.topology_files,
            location_files: self.location_files,
            transfer_store: self.transfer_store,
            locations: self.locations,
            config,
            scan_excludes: self.scan_excludes,
            progress: StdMutex::new(None),
        })
    }

    /// Queues a route request; shared by all public `connect*` methods.
    fn push_route(
        mut self,
        src: &LocationId,
        dest: &LocationId,
        backend: Box<dyn StorageBackend>,
        src_shell: Option<Box<dyn RemoteShell>>,
        direction: TransferDirection,
    ) -> Self {
        self.pending_routes.push(PendingRoute {
            src: src.clone(),
            dest: dest.clone(),
            backend,
            src_shell,
            direction,
        });
        self
    }

    /// Turns one pending route into a `TransferRoute`, or returns `None`
    /// (after logging a warning) when the route cannot be built.
    fn resolve_route(
        pr: PendingRoute,
        loc_map: &HashMap<LocationId, &Arc<dyn Location>>,
        archive_roots: &HashMap<LocationId, std::path::PathBuf>,
    ) -> Option<TransferRoute> {
        let src_loc = match loc_map.get(&pr.src) {
            Some(loc) => loc,
            None => {
                tracing::warn!("skipping route: source location is not registered");
                return None;
            }
        };
        let dest_loc = match loc_map.get(&pr.dest) {
            Some(loc) => loc,
            None => {
                tracing::warn!("skipping route: destination location is not registered");
                return None;
            }
        };
        let cost = match estimate_route_cost(src_loc.kind(), dest_loc.kind()) {
            Ok(c) => c,
            Err(e) => {
                tracing::warn!(src = ?src_loc.kind(), dest = ?dest_loc.kind(), error = %e, "skipping route: invalid cost");
                return None;
            }
        };

        // Only push routes use an archive root, so only they pay for the
        // lookup and clone. This must happen before `pr.dest` is moved below.
        let archive_root = if pr.direction == TransferDirection::Push {
            archive_roots.get(&pr.dest).cloned()
        } else {
            None
        };

        let mut route = TransferRoute::new(
            pr.src,
            pr.dest,
            src_loc.file_root().to_path_buf(),
            dest_loc.file_root().to_path_buf(),
            pr.backend,
        )
        .with_cost(cost.time_per_gb, cost.priority);

        if let Some(archive_root) = archive_root {
            route = route.with_archive_root(archive_root);
        }
        if pr.direction == TransferDirection::Pull {
            route = route.direction(TransferDirection::Pull);
        }
        if let Some(shell) = pr.src_shell {
            route = route.with_src_shell(shell);
        }
        Some(route)
    }
}
/// Picks a default edge cost for a route from the kinds of its endpoints.
///
/// The table is symmetric in `src` / `dest`:
/// local <-> remote is `(1.0, 10)`, remote <-> cloud is `(2.0, 50)`,
/// local <-> cloud is `(5.0, 100)`; every other combination falls back to
/// `(1.0, 100)`. The pair is `(time_per_gb, priority)` as passed to
/// `EdgeCost::new`, whose validation result is returned unchanged.
fn estimate_route_cost(
    src: LocationKind,
    dest: LocationKind,
) -> Result<EdgeCost, crate::domain::error::DomainError> {
    match (src, dest) {
        (LocationKind::Local, LocationKind::Remote)
        | (LocationKind::Remote, LocationKind::Local) => EdgeCost::new(1.0, 10),
        (LocationKind::Remote, LocationKind::Cloud)
        | (LocationKind::Cloud, LocationKind::Remote) => EdgeCost::new(2.0, 50),
        (LocationKind::Local, LocationKind::Cloud)
        | (LocationKind::Cloud, LocationKind::Local) => EdgeCost::new(5.0, 100),
        // Same-kind pairs and any kind not listed above.
        _ => EdgeCost::new(1.0, 100),
    }
}