use std::collections::HashMap;
use futures::stream::{self, StreamExt};
use tracing::{debug, info, trace, warn};
use super::route::TransferRoute;
use crate::application::error::SyncError;
use crate::domain::graph::RouteGraph;
use crate::domain::location::LocationId;
use crate::domain::plan::Topology;
use crate::domain::retry::TransferErrorKind;
use crate::domain::transfer::{Transfer, TransferKind, TransferState};
/// A transfer that is ready to be executed, paired with the path it applies to.
pub struct PreparedTransfer {
/// Transfer record; its state is advanced (start / complete / fail) during execution.
pub transfer: Transfer,
/// Path handed to the route's transfer / delete operations for this transfer.
pub relative_path: String,
}
/// Result of executing one [`PreparedTransfer`]: the transfer in whatever state
/// execution left it, plus the path it was executed for.
pub struct TransferOutcome {
/// Transfer record after execution; inspect its state to see whether it completed or failed.
pub transfer: Transfer,
/// Path carried over unchanged from the originating [`PreparedTransfer`].
pub relative_path: String,
}
/// Key used to index routes: the `(source, destination)` location pair.
type RouteKey = (LocationId, LocationId);
/// A batch operation that can be run against a route.
///
/// Lets the batch bookkeeping in `TransferEngine::execute_batch_common` be shared
/// between the sync and delete variants.
trait AsyncBatchFn {
/// Runs the operation for `paths` on `route`.
///
/// Returns a map of per-path results; the caller looks entries up by the same
/// path strings it passed in.
fn call(
&self,
route: &TransferRoute,
paths: &[String],
) -> impl std::future::Future<Output = HashMap<String, Result<(), SyncError>>> + Send;
}
/// [`AsyncBatchFn`] that forwards to `TransferRoute::transfer_batch`.
struct BatchSync;
impl AsyncBatchFn for BatchSync {
/// Transfers `paths` over `route` in one batch call and returns its per-path results.
async fn call(
&self,
route: &TransferRoute,
paths: &[String],
) -> HashMap<String, Result<(), SyncError>> {
route.transfer_batch(paths).await
}
}
/// [`AsyncBatchFn`] that forwards to `TransferRoute::delete_batch`.
struct BatchDelete;
impl AsyncBatchFn for BatchDelete {
/// Deletes `paths` via `route` in one batch call and returns its per-path results.
async fn call(
&self,
route: &TransferRoute,
paths: &[String],
) -> HashMap<String, Result<(), SyncError>> {
route.delete_batch(paths).await
}
}
/// Executes prepared transfers over a set of routes and answers topology
/// queries by delegating to a route graph.
pub struct TransferEngine {
/// Graph consulted for edges, destination ordering and optimal trees.
graph: RouteGraph,
/// Available routes, indexed by `(source, destination)`.
routes: HashMap<RouteKey, TransferRoute>,
/// Upper bound on individually executed transfers running at once
/// (`new` substitutes the default when given 0).
concurrency: usize,
}
impl TransferEngine {
/// Concurrency used when `new` is called with a concurrency of 0.
const DEFAULT_CONCURRENCY: usize = 8;
/// Indexes `routes` by their `(source, destination)` pair.
///
/// When several routes share the same pair, the one appearing last in
/// `routes` is the one kept.
fn build_route_map(routes: Vec<TransferRoute>) -> HashMap<RouteKey, TransferRoute> {
    let mut indexed = HashMap::with_capacity(routes.len());
    for route in routes {
        let key: RouteKey = (route.src().clone(), route.dest().clone());
        indexed.insert(key, route);
    }
    indexed
}
/// Creates an engine over `graph` and `routes`.
///
/// `concurrency` bounds how many individually executed transfers run at
/// once; passing `0` selects [`Self::DEFAULT_CONCURRENCY`].
pub fn new(graph: RouteGraph, routes: Vec<TransferRoute>, concurrency: usize) -> Self {
    let concurrency = match concurrency {
        0 => Self::DEFAULT_CONCURRENCY,
        requested => requested,
    };
    let routes = Self::build_route_map(routes);
    Self {
        graph,
        routes,
        concurrency,
    }
}
/// Returns the edge list reported by the underlying route graph.
pub fn all_edges(&self) -> Vec<(LocationId, LocationId)> {
self.graph.all_edges()
}
pub fn find_route(&self, src: &LocationId, dest: &LocationId) -> Option<&TransferRoute> {
self.routes.get(&(src.clone(), dest.clone()))
}
/// Returns a route that has an archive root, if any.
///
/// Routes are scanned in hash-map iteration order, so when several routes
/// qualify the one returned is not specified.
pub fn archive_route(&self) -> Option<&TransferRoute> {
    for candidate in self.routes.values() {
        if candidate.archive_root().is_some() {
            return Some(candidate);
        }
    }
    None
}
/// Destinations in the order the graph reports them when starting from the
/// local location.
fn destinations_ordered(&self) -> Vec<LocationId> {
self.graph.destinations_ordered_from(&LocationId::local())
}
/// Installs `callback` (or clears it, when `None`) on the backend of every route.
///
/// Each backend receives its own clone of the callback.
pub(crate) fn set_progress_callback(
    &self,
    callback: Option<crate::infra::backend::ProgressFn>,
) {
    self.routes.values().for_each(|route| {
        route.backend().set_progress_callback(callback.clone());
    });
}
/// Returns every destination known to the graph, without duplicates.
///
/// Destinations ordered from the local location come first, in that order;
/// any remaining destinations of the graph are appended afterwards.
pub fn all_targets_ordered(&self) -> Vec<LocationId> {
    let mut ordered = self.destinations_ordered();
    for candidate in self.graph.all_destinations() {
        if ordered.contains(&candidate) {
            continue;
        }
        ordered.push(candidate);
    }
    ordered
}
/// Returns the source file root of a route whose source is local, or `None`
/// when no route starts at a local location.
pub fn local_root(&self) -> Option<&std::path::Path> {
    let local_route = self
        .routes
        .values()
        .find(|candidate| candidate.src().is_local())?;
    Some(local_route.src_file_root())
}
/// Executes every prepared transfer and returns one outcome per executed transfer.
///
/// Transfers are grouped by their `(source, destination)` pair and each group
/// is processed in turn:
///
/// * If no route exists for the pair, every transfer of the group is marked as
///   permanently failed and returned without being executed.
/// * Otherwise the group's syncs are executed first, then its deletes. Either
///   half goes through the route's batch operation when the route supports
///   batching and the half holds more than one transfer; otherwise its
///   transfers are executed individually, at most `concurrency` at a time.
///
/// The order of the returned outcomes is unspecified: groups are visited in
/// hash-map order and individual transfers finish in completion order.
pub async fn execute_prepared(&self, prepared: Vec<PreparedTransfer>) -> Vec<TransferOutcome> {
    let mut outcomes: Vec<TransferOutcome> = Vec::with_capacity(prepared.len());
    let mut by_route: HashMap<RouteKey, Vec<PreparedTransfer>> = HashMap::new();
    for p in prepared {
        let key = (p.transfer.src().clone(), p.transfer.dest().clone());
        by_route.entry(key).or_default().push(p);
    }
    for ((src, dest), group) in by_route {
        let group_len = group.len();
        // `outcomes` accumulates across groups; remember where this group starts so
        // the per-group summary below does not also count earlier groups.
        let group_start = outcomes.len();
        info!(src = %src, dest = %dest, count = group_len, "execute_prepared: route group start");
        let Some(route) = self.find_route(&src, &dest) else {
            warn!(src = %src, dest = %dest, count = group_len, "execute_prepared: no route found");
            for mut p in group {
                // Best effort: the transfer is reported as failed whether or not
                // the state transitions themselves are accepted.
                let _ = p.transfer.start();
                let _ = p.transfer.fail(
                    format!("no route: {src} → {dest}"),
                    TransferErrorKind::Permanent,
                );
                outcomes.push(TransferOutcome {
                    transfer: p.transfer,
                    relative_path: p.relative_path,
                });
            }
            continue;
        };
        let (delete_group, sync_group): (Vec<_>, Vec<_>) =
            group.into_iter().partition(|p| p.transfer.is_delete());
        debug!(
            src = %src, dest = %dest,
            sync = sync_group.len(), delete = delete_group.len(),
            batch = route.supports_batch(),
            "execute_prepared: partitioned"
        );
        if route.supports_batch() && sync_group.len() > 1 {
            info!(src = %src, dest = %dest, count = sync_group.len(), "execute_prepared: batch transfer start");
            outcomes.extend(
                Self::execute_batch_common(route, sync_group, BatchSync, "batch_sync").await,
            );
        } else {
            info!(src = %src, dest = %dest, count = sync_group.len(), concurrency = self.concurrency, "execute_prepared: individual transfer start");
            let sync_outcomes: Vec<TransferOutcome> = stream::iter(
                sync_group
                    .into_iter()
                    .map(|p| async { Self::execute_single_pure(route, p).await }),
            )
            .buffer_unordered(self.concurrency)
            .collect()
            .await;
            outcomes.extend(sync_outcomes);
        }
        if !delete_group.is_empty() {
            if route.supports_batch() && delete_group.len() > 1 {
                info!(src = %src, dest = %dest, count = delete_group.len(), "execute_prepared: batch delete start");
                outcomes.extend(
                    Self::execute_batch_common(
                        route,
                        delete_group,
                        BatchDelete,
                        "batch_delete",
                    )
                    .await,
                );
            } else {
                info!(src = %src, dest = %dest, count = delete_group.len(), concurrency = self.concurrency, "execute_prepared: individual delete start");
                let delete_outcomes: Vec<TransferOutcome> = stream::iter(
                    delete_group
                        .into_iter()
                        .map(|p| async { Self::execute_single_pure(route, p).await }),
                )
                .buffer_unordered(self.concurrency)
                .collect()
                .await;
                outcomes.extend(delete_outcomes);
            }
        }
        // Summarise only what this group appended.
        let group_outcomes = &outcomes[group_start..];
        let completed = group_outcomes
            .iter()
            .filter(|o| o.transfer.state() == TransferState::Completed)
            .count();
        let failed = group_outcomes
            .iter()
            .filter(|o| o.transfer.state() == TransferState::Failed)
            .count();
        info!(
            src = %src, dest = %dest,
            completed = completed, failed = failed,
            "execute_prepared: route group done"
        );
    }
    outcomes
}
/// Executes one prepared transfer over `route` and reports how it ended.
///
/// For non-delete transfers on a route that is not a pull route, the source
/// file is checked first; a missing file or a failing check marks the transfer
/// as failed without running it. If the transfer cannot be started it is
/// returned as-is. Otherwise the route's transfer or delete operation runs and
/// the transfer is marked completed or failed accordingly.
async fn execute_single_pure(
    route: &TransferRoute,
    mut prepared: PreparedTransfer,
) -> TransferOutcome {
    /// Converts the (possibly updated) prepared transfer into its outcome.
    fn finish(done: PreparedTransfer) -> TransferOutcome {
        TransferOutcome {
            transfer: done.transfer,
            relative_path: done.relative_path,
        }
    }

    trace!(
        transfer_id = %prepared.transfer.id(),
        path = %prepared.relative_path,
        src = %prepared.transfer.src(),
        dest = %prepared.transfer.dest(),
        kind = ?prepared.transfer.kind(),
        "execute_single_pure: start"
    );

    let needs_source_check = !(route.is_pull() || prepared.transfer.is_delete());
    if needs_source_check {
        // `Some((reason, kind))` means the transfer must be failed before it runs.
        let rejection = match route.src_file_exists(&prepared.relative_path).await {
            Ok(true) => None,
            Ok(false) => Some((
                format!("source file not found on {}", prepared.transfer.src()),
                TransferErrorKind::Permanent,
            )),
            Err(e) => Some((e.to_string(), classify_transfer_error(&e))),
        };
        if let Some((reason, kind)) = rejection {
            // Best effort: state-transition errors are deliberately ignored here.
            let _ = prepared.transfer.start();
            let _ = prepared.transfer.fail(reason, kind);
            return finish(prepared);
        }
    }

    if let Err(e) = prepared.transfer.start() {
        warn!(
            transfer_id = %prepared.transfer.id(),
            error = %e,
            "execute_single_pure: failed to start transfer"
        );
        return finish(prepared);
    }

    let op_result = match prepared.transfer.kind() {
        TransferKind::Sync => route.transfer(&prepared.relative_path).await,
        TransferKind::Delete => route.delete(&prepared.relative_path).await,
    };

    match op_result {
        Err(e) => {
            let kind = classify_transfer_error(&e);
            debug!(
                path = %prepared.relative_path,
                err = %e,
                kind = ?kind,
                "execute_single_pure: failed"
            );
            if let Err(state_err) = prepared.transfer.fail(e.to_string(), kind) {
                warn!(
                    transfer_id = %prepared.transfer.id(),
                    error = %state_err,
                    "execute_single_pure: failed to mark transfer as failed"
                );
            }
        }
        Ok(()) => {
            if let Err(e) = prepared.transfer.complete() {
                warn!(
                    transfer_id = %prepared.transfer.id(),
                    error = %e,
                    "execute_single_pure: failed to complete transfer"
                );
            }
            debug!(
                path = %prepared.relative_path,
                src = %prepared.transfer.src(),
                dest = %prepared.transfer.dest(),
                "execute_single_pure: completed"
            );
        }
    }

    finish(prepared)
}
/// Runs `prepared` through a single batch call on `route` and returns one
/// outcome per prepared transfer.
///
/// * `batch_fn` performs the actual batch operation (sync or delete).
/// * `label` prefixes every log message and appears in the failure reason of
///   transfers the batch did not report on.
///
/// Transfers that cannot be started are returned untouched and are not part
/// of the batch, matching how individually executed transfers are handled.
/// Each started transfer is completed or failed according to the batch result
/// for its path; transfers sharing a path all receive that path's result.
/// Started transfers whose path is missing from the batch result are failed
/// as transient.
async fn execute_batch_common(
    route: &TransferRoute,
    prepared: Vec<PreparedTransfer>,
    batch_fn: impl AsyncBatchFn,
    label: &str,
) -> Vec<TransferOutcome> {
    let mut outcomes = Vec::with_capacity(prepared.len());

    // Only transfers that actually entered the started state take part in the batch.
    let mut started: Vec<PreparedTransfer> = Vec::with_capacity(prepared.len());
    for mut p in prepared {
        if let Err(e) = p.transfer.start() {
            warn!(
                transfer_id = %p.transfer.id(),
                error = %e,
                "{label}: failed to start transfer"
            );
            outcomes.push(TransferOutcome {
                transfer: p.transfer,
                relative_path: p.relative_path,
            });
            continue;
        }
        started.push(p);
    }
    if started.is_empty() {
        return outcomes;
    }

    let relative_paths: Vec<String> =
        started.iter().map(|p| p.relative_path.clone()).collect();
    let batch_start = std::time::Instant::now();
    info!(
        count = relative_paths.len(),
        src = %route.src(),
        dest = %route.dest(),
        "{label}: calling batch"
    );
    let batch_results = batch_fn.call(route, &relative_paths).await;
    let elapsed = batch_start.elapsed();
    info!(
        results = batch_results.len(),
        elapsed_secs = elapsed.as_secs(),
        src = %route.src(),
        dest = %route.dest(),
        "{label}: batch returned"
    );

    // Several transfers may share a path; keep all of them so none loses its outcome.
    let mut pending: HashMap<String, Vec<PreparedTransfer>> =
        HashMap::with_capacity(started.len());
    for p in started {
        pending.entry(p.relative_path.clone()).or_default().push(p);
    }

    for (rel_path, result) in batch_results {
        // Results for paths that were never requested are ignored.
        let Some(same_path) = pending.remove(&rel_path) else {
            continue;
        };
        for mut p in same_path {
            match &result {
                Ok(()) => {
                    if let Err(e) = p.transfer.complete() {
                        warn!(
                            transfer_id = %p.transfer.id(),
                            error = %e,
                            "{label}: failed to complete transfer"
                        );
                    }
                }
                Err(e) => {
                    let kind = classify_transfer_error(e);
                    if let Err(state_err) = p.transfer.fail(e.to_string(), kind) {
                        warn!(
                            transfer_id = %p.transfer.id(),
                            error = %state_err,
                            "{label}: failed to mark transfer as failed"
                        );
                    }
                }
            }
            outcomes.push(TransferOutcome {
                transfer: p.transfer,
                relative_path: p.relative_path,
            });
        }
    }

    // Whatever is left was started but never reported on by the batch.
    for mut p in pending.into_values().flatten() {
        if let Err(state_err) = p.transfer.fail(
            format!("not included in {label} result"),
            TransferErrorKind::Transient,
        ) {
            warn!(
                transfer_id = %p.transfer.id(),
                error = %state_err,
                "{label}: failed to mark transfer as failed"
            );
        }
        outcomes.push(TransferOutcome {
            transfer: p.transfer,
            relative_path: p.relative_path,
        });
    }
    outcomes
}
}
/// Topology queries are answered by the engine's route graph.
impl Topology for TransferEngine {
/// Delegates to the route graph's `optimal_tree` for `origin` and `required_dests`,
/// returning the edge list it produces.
fn optimal_tree(
&self,
origin: &LocationId,
required_dests: &std::collections::HashSet<LocationId>,
) -> Vec<(LocationId, LocationId)> {
self.graph.optimal_tree(origin, required_dests)
}
}
/// Decides whether a failed transfer is worth retrying.
///
/// Infrastructure errors are classified by [`classify_infra_error`]; every
/// other error is permanent. The match is kept exhaustive on purpose so that
/// a new `SyncError` variant forces a classification decision here.
fn classify_transfer_error(e: &SyncError) -> TransferErrorKind {
    match e {
        SyncError::Infra(infra) => classify_infra_error(infra),
        SyncError::Domain(_)
        | SyncError::OutsideSyncRoot { .. }
        | SyncError::NotRegistered(_)
        | SyncError::NoBackend(_)
        | SyncError::NoRouteAvailable { .. }
        | SyncError::Init(_)
        | SyncError::Duplicate { .. } => TransferErrorKind::Permanent,
    }
}
/// Classifies an infrastructure error as transient (retryable) or permanent.
///
/// I/O errors are transient. Transfer errors are transient unless their
/// reason, compared case-insensitively, contains one of a few known markers.
/// All other variants are permanent.
fn classify_infra_error(e: &crate::infra::error::InfraError) -> TransferErrorKind {
    use crate::infra::error::InfraError;

    /// Lower-case fragments of a transfer failure reason that make it permanent.
    const PERMANENT_MARKERS: [&str; 4] = [
        "not valid utf-8",
        "traversal",
        "not supported",
        "starts with '-'",
    ];

    match e {
        InfraError::Transfer { reason } => {
            let lowered = reason.to_lowercase();
            let is_permanent = PERMANENT_MARKERS
                .iter()
                .any(|marker| lowered.contains(*marker));
            if is_permanent {
                TransferErrorKind::Permanent
            } else {
                TransferErrorKind::Transient
            }
        }
        InfraError::Io(_) => TransferErrorKind::Transient,
        InfraError::FileNotFound(_)
        | InfraError::Store { .. }
        | InfraError::Hash { .. }
        | InfraError::Serialization(_)
        | InfraError::Init(_) => TransferErrorKind::Permanent,
    }
}