pub mod checksumdb;
pub mod dircache;
pub mod output;
pub mod progress;
pub mod ratelimit;
pub mod resume;
pub mod scale;
pub mod scanner;
pub mod server_mode;
pub mod strategy;
pub mod transfer;
#[cfg(feature = "watch")]
pub mod watch;
use crate::cli::SymlinkMode;
use crate::error::Result;
use crate::filter::FilterEngine;
use crate::integrity::{ChecksumType, IntegrityVerifier};
use crate::perf::{PerformanceMetrics, PerformanceMonitor};
use crate::resource;
use crate::transport::Transport;
use dircache::DirectoryCache;
use futures::{FutureExt, stream::StreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use output::SyncEvent;
use ratelimit::RateLimiter;
use resume::{ResumeState, SyncFlags};
use scale::FileSetBloom;
use scanner::FileEntry;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use strategy::{StrategyPlanner, SyncAction};
use transfer::Transferrer;
#[derive(Debug, Clone)]
pub struct SyncError {
pub path: PathBuf,
pub error: String,
pub action: String,
}
#[derive(Debug, Clone, Default)]
pub struct SyncStats {
pub files_scanned: u64,
pub files_created: u64,
pub files_updated: u64,
pub files_skipped: usize,
pub files_deleted: usize,
pub bytes_transferred: u64,
pub files_delta_synced: usize,
pub delta_bytes_saved: u64,
pub files_compressed: usize,
pub compression_bytes_saved: u64,
pub files_verified: usize,
pub verification_failures: usize,
pub duration: Duration,
pub bytes_would_add: u64,
pub bytes_would_change: u64,
pub bytes_would_delete: u64,
#[allow(dead_code)]
pub dirs_created: u64,
#[allow(dead_code)]
pub symlinks_created: u64,
pub errors: Vec<SyncError>,
}
#[derive(Debug)]
pub struct VerificationResult {
pub files_matched: usize,
pub files_mismatched: Vec<PathBuf>,
pub files_only_in_source: Vec<PathBuf>,
pub files_only_in_dest: Vec<PathBuf>,
pub errors: Vec<SyncError>,
pub duration: Duration,
}
pub struct SyncEngine<T: Transport> {
transport: Arc<T>,
dry_run: bool,
diff_mode: bool,
delete: bool,
delete_threshold: u8,
#[allow(dead_code)] trash: bool,
force_delete: bool,
quiet: bool,
max_concurrent: usize,
max_errors: usize,
min_size: Option<u64>,
max_size: Option<u64>,
filter_engine: FilterEngine,
bwlimit: Option<u64>,
resume: bool,
checkpoint_files: usize,
checkpoint_bytes: u64,
json: bool,
verification_mode: ChecksumType,
verify_on_write: bool,
symlink_mode: SymlinkMode,
preserve_xattrs: bool,
preserve_hardlinks: bool,
preserve_acls: bool,
preserve_flags: bool, per_file_progress: bool, ignore_times: bool,
size_only: bool,
checksum: bool,
update_only: bool,
ignore_existing: bool,
use_cache: bool,
clear_cache: bool,
checksum_db: bool,
clear_checksum_db: bool,
prune_checksum_db: bool,
dest_is_remote: bool,
perf_monitor: Option<Arc<Mutex<PerformanceMonitor>>>,
}
impl<T: Transport + 'static> SyncEngine<T> {
#[allow(clippy::too_many_arguments)]
pub fn new(
transport: T,
dry_run: bool,
diff_mode: bool,
delete: bool,
delete_threshold: u8,
trash: bool,
force_delete: bool,
quiet: bool,
max_concurrent: usize,
max_errors: usize,
min_size: Option<u64>,
max_size: Option<u64>,
filter_engine: FilterEngine,
bwlimit: Option<u64>,
resume: bool,
checkpoint_files: usize,
checkpoint_bytes: u64,
json: bool,
verification_mode: ChecksumType,
verify_on_write: bool,
symlink_mode: SymlinkMode,
preserve_xattrs: bool,
preserve_hardlinks: bool,
preserve_acls: bool,
preserve_flags: bool, per_file_progress: bool,
ignore_times: bool,
size_only: bool,
checksum: bool,
update_only: bool,
ignore_existing: bool,
use_cache: bool,
clear_cache: bool,
checksum_db: bool,
clear_checksum_db: bool,
prune_checksum_db: bool,
dest_is_remote: bool,
perf: bool,
) -> Self {
let perf_monitor = if perf { Some(Arc::new(Mutex::new(PerformanceMonitor::new(bwlimit)))) } else { None };
Self {
transport: Arc::new(transport),
dry_run,
diff_mode,
delete,
delete_threshold,
trash,
force_delete,
quiet,
max_concurrent,
max_errors,
min_size,
max_size,
filter_engine,
bwlimit,
resume,
checkpoint_files,
checkpoint_bytes,
json,
verification_mode,
verify_on_write,
symlink_mode,
preserve_xattrs,
preserve_hardlinks,
preserve_acls,
preserve_flags,
per_file_progress,
ignore_times,
size_only,
checksum,
update_only,
ignore_existing,
use_cache,
clear_cache,
checksum_db,
clear_checksum_db,
prune_checksum_db,
dest_is_remote,
perf_monitor,
}
}
fn should_filter_by_size(&self, file_size: u64) -> bool {
if let Some(min) = self.min_size
&& file_size < min
{
return true;
}
if let Some(max) = self.max_size
&& file_size > max
{
return true;
}
false
}
fn should_exclude(&self, relative_path: &Path, is_dir: bool) -> bool {
self.filter_engine.should_exclude(relative_path, is_dir)
}
pub async fn sync(&self, source: &Path, destination: &Path) -> Result<SyncStats> {
let start_time = std::time::Instant::now();
tracing::info!("Starting sync: {} → {}", source.display(), destination.display());
if let Err(e) = crate::resume::TransferState::clear_stale_states(std::time::Duration::from_secs(7 * 24 * 60 * 60)) {
tracing::warn!("Failed to clean up stale resume states: {}", e);
}
if !self.dry_run {
tracing::debug!("Ensuring destination directory exists: {}", destination.display());
self.transport.create_dir_all(destination).await?;
}
if self.clear_cache && !self.dry_run {
if let Err(e) = DirectoryCache::delete(destination) {
tracing::warn!("Failed to clear directory cache: {}", e);
} else {
tracing::debug!("Cleared directory cache");
}
}
let mut dir_cache = if self.use_cache {
let cache = DirectoryCache::load(destination);
tracing::debug!("Loaded directory cache with {} entries", cache.len());
Some(cache)
} else {
None
};
let checksum_db = if self.checksum && self.checksum_db {
match checksumdb::ChecksumDatabase::open(destination) {
Ok(db) => {
tracing::debug!("Opened checksum database");
if self.clear_checksum_db && !self.dry_run {
if let Err(e) = db.clear() {
tracing::warn!("Failed to clear checksum database: {}", e);
} else {
tracing::info!("Cleared checksum database");
}
}
Some(db)
}
Err(e) => {
tracing::warn!("Failed to open checksum database: {}", e);
None
}
}
} else {
None
};
let can_use_cache = if let Some(ref cache) = dir_cache {
if let Ok(source_meta) = std::fs::metadata(source) {
if let Ok(source_mtime) = source_meta.modified() {
let source_path = PathBuf::from(".");
!cache.needs_rescan(&source_path, source_mtime)
} else {
false
}
} else {
false
}
} else {
false
};
if let Some(ref monitor) = self.perf_monitor {
monitor.lock().unwrap().start_scan();
}
let all_files = if can_use_cache {
if let Some(ref cache) = dir_cache {
if let Some(cached_files) = cache.get_cached_files(&PathBuf::from(".")) {
let file_count = cached_files.len();
tracing::info!("Using cached scan results ({} files) - source unchanged", file_count);
cached_files.iter().map(|cf| cf.to_file_entry(source)).collect()
} else {
tracing::debug!("No cached files found, performing full scan");
self.transport.scan(source).await?
}
} else {
self.transport.scan(source).await?
}
} else {
tracing::debug!("Scanning source directory (cache miss or disabled)...");
self.transport.scan(source).await?
};
let total_scanned = all_files.len();
if can_use_cache {
tracing::info!("Retrieved {} items from cache", total_scanned);
} else {
tracing::info!("Found {} items in source", total_scanned);
}
self.transport.prepare_for_transfer(total_scanned).await?;
if let Some(ref mut cache) = dir_cache {
use crate::sync::dircache::CachedFile;
use std::collections::HashMap;
let mut files_by_dir: HashMap<PathBuf, Vec<CachedFile>> = HashMap::new();
for file in &all_files {
if file.is_dir {
cache.update((*file.relative_path).clone(), file.modified);
}
let dir_path = if file.is_dir {
(*file.relative_path).clone()
} else {
file.relative_path.parent().map(|p| p.to_path_buf()).unwrap_or_else(|| PathBuf::from("."))
};
files_by_dir.entry(dir_path).or_default().push(CachedFile::from_file_entry(file));
}
let total_files: usize = files_by_dir.values().map(|v| v.len()).sum();
for (dir_path, files) in files_by_dir {
cache.cache_files(dir_path, files);
}
tracing::debug!("Updated directory cache with {} directories, {} files", cache.len(), total_files);
}
let mut excluded_dirs: Vec<PathBuf> = Vec::new();
let source_files: Vec<_> = all_files
.into_iter()
.filter(|file| {
for excluded_dir in &excluded_dirs {
if file.relative_path.starts_with(excluded_dir) {
tracing::debug!("Filtering out (parent excluded): {}", file.relative_path.display());
return false;
}
}
if self.should_exclude(&file.relative_path, file.is_dir) {
tracing::debug!("Filtering out (excluded): {}", file.relative_path.display());
if file.is_dir {
excluded_dirs.push((*file.relative_path).clone());
}
return false;
}
if file.is_dir {
return true;
}
if self.should_filter_by_size(file.size) {
tracing::debug!("Filtering out (size): {}", file.relative_path.display());
return false;
}
true
})
.collect();
if source_files.len() < total_scanned {
let filtered_count = total_scanned - source_files.len();
tracing::info!("Filtered out {} files", filtered_count);
}
if let Some(ref monitor) = self.perf_monitor {
monitor.lock().unwrap().end_scan();
}
tracing::debug!("Scan completed, about to check resources");
if !self.dry_run {
let bytes_needed: u64 = source_files.iter().filter(|f| !f.is_dir).map(|f| f.size).sum();
self.transport.check_disk_space(destination, bytes_needed).await?;
resource::check_fd_limits(self.max_concurrent)?;
}
tracing::debug!("Resource checks completed, loading resume state");
let current_flags = SyncFlags {
delete: self.delete,
exclude: vec![], min_size: self.min_size,
max_size: self.max_size,
};
let resume_state = if self.resume {
match ResumeState::load(destination)? {
Some(state) => {
if state.is_compatible_with(¤t_flags) {
let (completed, total) = state.progress();
tracing::info!("Resuming sync: {} of {} files already completed", completed, total);
if !self.quiet {
println!("📋 Resuming previous sync ({}/{} files completed)", completed, total);
}
Some(state)
} else {
tracing::warn!("Resume state incompatible (flags changed), starting fresh");
if !self.quiet {
println!("⚠️ Resume state incompatible, starting fresh sync");
}
ResumeState::delete(destination)?;
Some(ResumeState::new(source.to_path_buf(), destination.to_path_buf(), current_flags, source_files.len()))
}
}
None => {
Some(ResumeState::new(source.to_path_buf(), destination.to_path_buf(), current_flags, source_files.len()))
}
}
} else {
None
};
let completed_paths = resume_state.as_ref().map(|s| s.completed_paths()).unwrap_or_default();
tracing::debug!("Resume state loaded, starting plan timing");
if let Some(ref monitor) = self.perf_monitor {
monitor.lock().unwrap().start_plan();
}
let planner = StrategyPlanner::with_comparison_flags(self.ignore_times, self.size_only, self.checksum, self.update_only, self.ignore_existing);
tracing::debug!("Starting to plan {} tasks", source_files.len());
tracing::debug!("Filtering completed files...");
let files_to_plan: Vec<_> = source_files
.iter()
.filter(|file| {
if !completed_paths.is_empty() && completed_paths.contains(&**file.relative_path) {
tracing::debug!("Skipping completed file: {}", file.relative_path.display());
false
} else {
true
}
})
.collect();
let total_to_plan = files_to_plan.len();
tracing::debug!("Filtered to {} files to plan", total_to_plan);
let plan_pb = if self.quiet {
ProgressBar::hidden()
} else {
let pb = ProgressBar::new_spinner();
pb.set_style(ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap());
pb.enable_steady_tick(std::time::Duration::from_millis(100));
pb
};
plan_pb.set_message(format!("Scanning destination ({} source files)...", total_to_plan));
tracing::debug!("Starting destination scan...");
let dest_files = self.transport.scan(destination).await.unwrap_or_else(|e| {
tracing::debug!("Destination scan failed (may not exist yet): {}", e);
Vec::new()
});
tracing::debug!("Destination scan complete: {} files found", dest_files.len());
let dest_map: std::collections::HashMap<std::path::PathBuf, scanner::FileEntry> =
dest_files.into_iter().map(|f| ((*f.relative_path).clone(), f)).collect();
plan_pb.set_message(format!("Comparing {} source files against {} destination files...", total_to_plan, dest_map.len()));
let mut tasks: Vec<strategy::SyncTask> = Vec::with_capacity(total_to_plan);
for (i, file) in files_to_plan.iter().enumerate() {
let task = planner.plan_file_with_dest_map(file, destination, &dest_map);
tasks.push(task);
if i % 10000 == 0 {
plan_pb.set_message(format!("Comparing: {}/{} files ({:.0}%)", i, total_to_plan, (i as f64 / total_to_plan as f64) * 100.0));
}
}
plan_pb.finish_and_clear();
if self.delete {
let deletions = planner.plan_deletions(&source_files, destination);
if !deletions.is_empty() {
let dest_file_count = scanner::Scanner::new(destination).scan().map(|files| files.len()).unwrap_or(0);
if dest_file_count > 0 && !self.force_delete {
let delete_percentage = (deletions.len() as f64 / dest_file_count as f64) * 100.0;
if delete_percentage > self.delete_threshold as f64 {
tracing::error!(
"Refusing to delete {:.1}% of destination files ({} files). Threshold: {}%. Use --force-delete to override.",
delete_percentage,
deletions.len(),
self.delete_threshold
);
if !self.quiet {
eprintln!(
"⚠️ ERROR: Would delete {:.1}% of files ({}/{}), exceeding threshold of {}%",
delete_percentage,
deletions.len(),
dest_file_count,
self.delete_threshold
);
eprintln!("Use --force-delete to skip safety checks (dangerous!)");
}
return Err(crate::error::SyncError::Io(std::io::Error::other(format!(
"Deletion threshold exceeded: {:.1}% > {}%",
delete_percentage, self.delete_threshold
))));
}
}
const CATASTROPHIC_THRESHOLD: usize = 10000;
if deletions.len() > CATASTROPHIC_THRESHOLD && !self.quiet && !self.json && !self.dry_run {
let warning_msg = if self.force_delete {
format!(
"🚨 CRITICAL WARNING: About to delete {} files with --force-delete!\n\
This will PERMANENTLY DELETE a large amount of data.\n\
Type 'DELETE {}' to confirm (case-sensitive): ",
deletions.len(),
deletions.len()
)
} else {
format!("⚠️ WARNING: About to delete {} files. Continue? [y/N] ", deletions.len())
};
eprintln!("{}", warning_msg);
use std::io::IsTerminal;
if !std::io::stdin().is_terminal() {
return Err(crate::error::SyncError::Io(std::io::Error::other("Cannot prompt for deletion confirmation: stdin is not a terminal")));
}
let mut input = String::new();
std::io::stdin().read_line(&mut input)?;
let confirmed = if self.force_delete {
input.trim() == format!("DELETE {}", deletions.len())
} else {
input.trim().eq_ignore_ascii_case("y")
};
if !confirmed {
tracing::info!("Deletion cancelled by user");
return Err(crate::error::SyncError::Io(std::io::Error::other("Deletion cancelled by user")));
}
} else if deletions.len() > 1000 && !self.force_delete && !self.quiet && !self.json && !self.dry_run {
eprintln!("⚠️ WARNING: About to delete {} files. Continue? [y/N] ", deletions.len());
use std::io::IsTerminal;
if !std::io::stdin().is_terminal() {
return Err(crate::error::SyncError::Io(std::io::Error::other("Cannot prompt for deletion confirmation: stdin is not a terminal")));
}
let mut input = String::new();
std::io::stdin().read_line(&mut input)?;
if !input.trim().eq_ignore_ascii_case("y") {
tracing::info!("Deletion cancelled by user");
return Err(crate::error::SyncError::Io(std::io::Error::other("Deletion cancelled by user")));
}
}
}
tasks.extend(deletions);
}
if let Some(ref monitor) = self.perf_monitor {
monitor.lock().unwrap().end_plan();
}
if self.json {
SyncEvent::Start { source: source.to_path_buf(), destination: destination.to_path_buf(), total_files: tasks.len() }.emit();
}
let resume_state = Arc::new(Mutex::new(resume_state));
let _checkpoint_files = self.checkpoint_files;
let _checkpoint_bytes = self.checkpoint_bytes;
let stats = Arc::new(Mutex::new(SyncStats {
files_scanned: source_files.len() as u64,
files_created: 0,
files_updated: 0,
files_skipped: 0,
files_deleted: 0,
bytes_transferred: 0,
files_delta_synced: 0,
delta_bytes_saved: 0,
files_compressed: 0,
compression_bytes_saved: 0,
files_verified: 0,
verification_failures: 0,
duration: Duration::ZERO,
bytes_would_add: 0,
bytes_would_change: 0,
bytes_would_delete: 0,
dirs_created: 0,
symlinks_created: 0,
errors: Vec::new(),
}));
let total_bytes: u64 = tasks
.iter()
.filter(|t| !matches!(t.action, SyncAction::Skip | SyncAction::Delete))
.map(|t| t.source.as_ref().map(|f| if f.is_dir { 0 } else { f.size }).unwrap_or(0))
.sum();
let pb = if self.quiet {
ProgressBar::hidden()
} else {
let pb = ProgressBar::new(total_bytes);
pb.set_style(
ProgressStyle::default_bar()
.template("{msg}\n{spinner:.green} [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")
.unwrap()
.progress_chars("#>-"),
);
pb.enable_steady_tick(std::time::Duration::from_millis(100));
pb
};
let rate_limiter = self.bwlimit.map(|limit| Arc::new(Mutex::new(RateLimiter::new(limit))));
let hardlink_map = Arc::new(Mutex::new(std::collections::HashMap::new()));
if let Some(ref monitor) = self.perf_monitor {
monitor.lock().unwrap().start_transfer();
}
if !self.dry_run {
let unique_dirs: std::collections::HashSet<_> = tasks
.iter()
.filter(|t| matches!(t.action, SyncAction::Create | SyncAction::Update))
.filter_map(|t| t.dest_path.parent())
.filter(|p| !p.as_os_str().is_empty())
.collect();
if !unique_dirs.is_empty() {
let dir_refs: Vec<&std::path::Path> = unique_dirs.into_iter().collect();
tracing::info!("Pre-creating {} directories in batch", dir_refs.len());
if let Err(e) = self.transport.create_dirs_batch(&dir_refs).await {
tracing::warn!("Batch directory creation failed, will create per-file: {}", e);
}
}
}
let mut files_since_checkpoint = 0;
let mut bytes_since_checkpoint = 0;
let transfer_futures = tasks.into_iter().map(|task| {
let transport = Arc::clone(&self.transport);
let dry_run = self.dry_run;
let diff_mode = self.diff_mode;
let _json = self.json;
let pb = pb.clone();
let rate_limiter = rate_limiter.clone();
let verification_mode = self.verification_mode;
let verify_on_write = self.verify_on_write;
let symlink_mode = self.symlink_mode;
let preserve_xattrs = self.preserve_xattrs;
let preserve_hardlinks = self.preserve_hardlinks;
let preserve_acls = self.preserve_acls;
let preserve_flags = self.preserve_flags;
let per_file_progress = self.per_file_progress && !self.quiet;
let hardlink_map = Arc::clone(&hardlink_map);
let _perf_monitor = self.perf_monitor.clone();
async move {
let transferrer = Transferrer::new(
transport.as_ref(),
dry_run,
diff_mode,
symlink_mode,
preserve_xattrs,
preserve_hardlinks,
preserve_acls,
preserve_flags,
per_file_progress,
hardlink_map,
);
let verifier = IntegrityVerifier::new(verification_mode, verify_on_write);
let filename = task.dest_path.file_name().and_then(|n| n.to_str()).unwrap_or_else(|| task.dest_path.to_str().unwrap_or(""));
let msg = match &task.action {
SyncAction::Create => format!("Creating: {}", filename),
SyncAction::Update => format!("Updating: {}", filename),
SyncAction::Skip => format!("Skipping: {}", filename),
SyncAction::Delete => format!("Deleting: {}", filename),
};
if !matches!(task.action, SyncAction::Skip) {
pb.set_message(msg);
}
struct TaskResult {
task: crate::sync::strategy::SyncTask,
bytes_written: u64,
transfer_result: Option<crate::transport::TransferResult>,
_error: Option<String>,
verified: bool,
}
let result = match task.action {
SyncAction::Create => {
if let Some(source) = &task.source {
match transferrer.create(source, &task.dest_path).await {
Ok(transfer_result) => {
let bytes_written = transfer_result.as_ref().map(|r| r.bytes_written).unwrap_or(0);
if let Some(ref limiter) = rate_limiter
&& bytes_written > 0
{
let sleep_duration = limiter.lock().unwrap().consume(bytes_written);
if sleep_duration > Duration::ZERO {
tokio::time::sleep(sleep_duration).await;
}
}
let mut verified = true;
if verification_mode != ChecksumType::None && !dry_run && !source.is_dir {
let source_path = &source.path;
let dest_path = &task.dest_path;
let verification_result = if !dest_path.exists() {
match transport.compute_checksum(source_path, &verifier).await {
Ok(sc) => match transport.compute_checksum(dest_path, &verifier).await {
Ok(dc) => Ok(sc == dc),
Err(e) => Err(e),
},
Err(e) => Err(e),
}
} else {
verifier.verify_transfer(source_path, dest_path)
};
if let Ok(v) = verification_result {
verified = v;
} else {
verified = false;
}
}
Ok(TaskResult { task: task.clone(), bytes_written, transfer_result, _error: None, verified })
}
Err(e) => Err((task.clone(), e)),
}
} else {
Ok(TaskResult { task: task.clone(), bytes_written: 0, transfer_result: None, _error: None, verified: true })
}
}
SyncAction::Update => {
if let Some(source) = &task.source {
match transferrer.update(source, &task.dest_path).await {
Ok(transfer_result) => {
let bytes_written = transfer_result.as_ref().map(|r| r.bytes_written).unwrap_or(0);
if let Some(ref limiter) = rate_limiter
&& bytes_written > 0
{
let sleep_duration = limiter.lock().unwrap().consume(bytes_written);
if sleep_duration > Duration::ZERO {
tokio::time::sleep(sleep_duration).await;
}
}
let mut verified = true;
if verification_mode != ChecksumType::None && !dry_run && !source.is_dir {
let source_path = &source.path;
let dest_path = &task.dest_path;
let verification_result = if !dest_path.exists() {
match transport.compute_checksum(source_path, &verifier).await {
Ok(sc) => match transport.compute_checksum(dest_path, &verifier).await {
Ok(dc) => Ok(sc == dc),
Err(e) => Err(e),
},
Err(e) => Err(e),
}
} else {
verifier.verify_transfer(source_path, dest_path)
};
if let Ok(v) = verification_result {
verified = v;
} else {
verified = false;
}
}
Ok(TaskResult { task: task.clone(), bytes_written, transfer_result, _error: None, verified })
}
Err(e) => Err((task.clone(), e)),
}
} else {
Ok(TaskResult { task: task.clone(), bytes_written: 0, transfer_result: None, _error: None, verified: true })
}
}
SyncAction::Delete => {
let is_dir = task.dest_path.is_dir();
match transferrer.delete(&task.dest_path, is_dir).await {
Ok(_) => Ok(TaskResult { task: task.clone(), bytes_written: 0, transfer_result: None, _error: None, verified: true }),
Err(e) => Err((task.clone(), e)),
}
}
SyncAction::Skip => Ok(TaskResult { task: task.clone(), bytes_written: 0, transfer_result: None, _error: None, verified: true }),
};
let bytes_for_progress = match &task.action {
SyncAction::Create | SyncAction::Update => task.source.as_ref().map(|f| f.size).unwrap_or(0),
_ => 0,
};
pb.inc(bytes_for_progress);
result
}
});
let mut stream = futures::stream::iter(transfer_futures).buffer_unordered(self.max_concurrent);
while let Some(result) = stream.next().await {
match result {
Ok(res) => {
let mut s = stats.lock().unwrap();
let task = &res.task;
match task.action {
SyncAction::Create => {
s.files_created += 1;
s.bytes_transferred += res.bytes_written;
if self.dry_run
&& let Some(src) = &task.source
&& !src.is_dir
{
s.bytes_would_add += src.size;
}
if let Some(monitor) = &self.perf_monitor {
monitor.lock().unwrap().add_file_created();
monitor.lock().unwrap().add_bytes_transferred(res.bytes_written);
if !task.source.as_ref().map(|s| s.is_dir).unwrap_or(false) {
monitor.lock().unwrap().add_bytes_read(task.source.as_ref().map(|s| s.size).unwrap_or(0));
}
}
if let Some(ref tr) = res.transfer_result
&& tr.compression_used
{
s.files_compressed += 1;
if let Some(transferred) = tr.transferred_bytes {
s.compression_bytes_saved += res.bytes_written.saturating_sub(transferred);
}
}
if self.json {
SyncEvent::Create {
path: task.dest_path.clone(),
size: task.source.as_ref().map(|s| s.size).unwrap_or(0),
bytes_transferred: res.bytes_written,
}
.emit();
}
}
SyncAction::Update => {
s.files_updated += 1;
s.bytes_transferred += res.bytes_written;
if self.dry_run
&& let Some(src) = &task.source
&& !src.is_dir
{
s.bytes_would_change += src.size;
}
if let Some(monitor) = &self.perf_monitor {
monitor.lock().unwrap().add_file_updated();
monitor.lock().unwrap().add_bytes_transferred(res.bytes_written);
if !task.source.as_ref().map(|s| s.is_dir).unwrap_or(false) {
monitor.lock().unwrap().add_bytes_read(task.source.as_ref().map(|s| s.size).unwrap_or(0));
}
}
if let Some(tr) = res.transfer_result.as_ref() {
if crate::transport::TransferResult::used_delta(tr) {
s.files_delta_synced += 1;
if let Some(literal) = tr.literal_bytes {
s.delta_bytes_saved += res.bytes_written.saturating_sub(literal);
}
}
if tr.compression_used {
s.files_compressed += 1;
if let Some(transferred) = tr.transferred_bytes {
s.compression_bytes_saved += res.bytes_written.saturating_sub(transferred);
}
}
}
if self.json {
let delta_used = res.transfer_result.as_ref().map(|r| r.used_delta()).unwrap_or(false);
SyncEvent::Update {
path: task.dest_path.clone(),
size: task.source.as_ref().map(|s| s.size).unwrap_or(0),
bytes_transferred: res.bytes_written,
delta_used,
}
.emit();
}
}
SyncAction::Skip => {
s.files_skipped += 1;
if self.json {
SyncEvent::Skip { path: task.dest_path.clone(), reason: "up_to_date".to_string() }.emit();
}
}
SyncAction::Delete => {
s.files_deleted += 1;
if self.dry_run && !task.dest_path.is_dir() {
}
if let Some(monitor) = &self.perf_monitor {
monitor.lock().unwrap().add_file_deleted();
}
if self.json {
SyncEvent::Delete { path: task.dest_path.clone() }.emit();
}
}
}
if !res.verified {
s.verification_failures += 1;
tracing::warn!("Verification failed for {}", task.dest_path.display());
} else if self.verification_mode != ChecksumType::None
&& !self.dry_run && matches!(task.action, SyncAction::Create | SyncAction::Update)
&& task.source.as_ref().map(|s| !s.is_dir).unwrap_or(false)
{
s.files_verified += 1;
}
if self.resume
&& !self.dry_run && matches!(task.action, SyncAction::Create | SyncAction::Update)
&& let Ok(mut state_guard) = resume_state.lock()
&& let Some(state) = state_guard.as_mut()
{
let rel_path = task.dest_path.strip_prefix(destination).unwrap_or(&task.dest_path).to_path_buf();
state.add_completed_file(
resume::CompletedFile {
relative_path: rel_path,
action: match task.action {
SyncAction::Create => "create".to_string(),
SyncAction::Update => "update".to_string(),
_ => "unknown".to_string(),
},
size: task.source.as_ref().map(|s| s.size).unwrap_or(0),
checksum: "none".to_string(), completed_at: resume::format_timestamp(SystemTime::now()),
},
res.bytes_written,
);
files_since_checkpoint += 1;
bytes_since_checkpoint += res.bytes_written;
if files_since_checkpoint >= self.checkpoint_files || bytes_since_checkpoint >= self.checkpoint_bytes {
tracing::debug!("Checkpointing resume state ({} files, {} bytes)", files_since_checkpoint, bytes_since_checkpoint);
if !self.dest_is_remote
&& let Err(e) = state.save(destination)
{
tracing::warn!("Failed to save checkpoint: {}", e);
}
files_since_checkpoint = 0;
bytes_since_checkpoint = 0;
}
}
}
Err((task, e)) => {
let mut s = stats.lock().unwrap();
s.errors.push(SyncError {
path: task.dest_path.clone(),
error: e.to_string(),
action: match task.action {
SyncAction::Create => "create".to_string(),
SyncAction::Update => "update".to_string(),
SyncAction::Delete => "delete".to_string(),
SyncAction::Skip => "skip".to_string(),
},
});
tracing::error!("Sync error for {}: {}", task.dest_path.display(), e);
if self.max_errors > 0 && s.errors.len() >= self.max_errors {
tracing::error!("Max errors exceeded. Aborting.");
pb.finish_with_message("Aborted due to errors");
return Err(crate::error::SyncError::Io(std::io::Error::other("Max errors exceeded")));
}
}
}
}
if let Some(ref monitor) = self.perf_monitor {
monitor.lock().unwrap().end_transfer();
}
pb.finish_with_message("Sync complete");
let mut final_stats = Arc::try_unwrap(stats).unwrap().into_inner().unwrap();
if !final_stats.errors.is_empty() {
tracing::warn!("Sync completed with {} errors", final_stats.errors.len());
if !self.quiet && !self.json {
use colored::Colorize;
eprintln!("\n{}", "⚠️ Errors occurred during sync:".red().bold());
eprintln!();
for (i, err) in final_stats.errors.iter().enumerate() {
eprintln!(
" {}. {} {}",
(i + 1).to_string().bright_black(),
format!("[{}]", err.action).yellow(),
err.path.display().to_string().white()
);
eprintln!(" {}", err.error.bright_black());
if i < final_stats.errors.len() - 1 {
eprintln!();
}
}
eprintln!();
eprintln!("{}", format!("Total errors: {}", final_stats.errors.len()).red());
eprintln!();
}
}
final_stats.duration = start_time.elapsed();
tracing::info!(
"Sync complete: {} created, {} updated, {} skipped, {} deleted, took {:.2}s",
final_stats.files_created,
final_stats.files_updated,
final_stats.files_skipped,
final_stats.files_deleted,
final_stats.duration.as_secs_f64()
);
if self.json {
SyncEvent::Summary {
files_created: final_stats.files_created as usize,
files_updated: final_stats.files_updated as usize,
files_skipped: final_stats.files_skipped,
files_deleted: final_stats.files_deleted,
bytes_transferred: final_stats.bytes_transferred,
duration_secs: final_stats.duration.as_secs_f64(),
files_verified: final_stats.files_verified,
verification_failures: final_stats.verification_failures,
}
.emit();
if let Some(perf_metrics) = self.get_performance_metrics() {
SyncEvent::Performance {
total_duration_secs: perf_metrics.total_duration.as_secs_f64(),
scan_duration_secs: perf_metrics.scan_duration.as_secs_f64(),
plan_duration_secs: perf_metrics.plan_duration.as_secs_f64(),
transfer_duration_secs: perf_metrics.transfer_duration.as_secs_f64(),
bytes_transferred: perf_metrics.bytes_transferred,
bytes_read: perf_metrics.bytes_read,
files_processed: perf_metrics.files_processed,
files_created: perf_metrics.files_created,
files_updated: perf_metrics.files_updated,
files_deleted: perf_metrics.files_deleted,
directories_created: perf_metrics.directories_created,
avg_transfer_speed: perf_metrics.avg_transfer_speed,
peak_transfer_speed: perf_metrics.peak_transfer_speed,
files_per_second: perf_metrics.files_per_second,
bandwidth_utilization: perf_metrics.bandwidth_utilization,
}
.emit();
}
}
if let Ok(mut state_guard) = resume_state.lock() {
if state_guard.is_some() {
if ResumeState::load(destination)?.is_some() {
tracing::debug!("Cleaning up resume state file");
if let Err(e) = ResumeState::delete(destination) {
tracing::warn!("Failed to delete resume state: {}", e);
}
}
}
*state_guard = None;
}
if self.use_cache
&& !self.dry_run
&& let Some(ref cache) = dir_cache
{
if !self.dest_is_remote {
if let Err(e) = cache.save(destination) {
tracing::warn!("Failed to save directory cache: {}", e);
} else {
tracing::debug!("Saved directory cache with {} entries", cache.len());
}
} else {
tracing::debug!("Skipping cache save - destination directory doesn't exist");
}
}
if let Some(ref db) = checksum_db
&& !self.dry_run
{
let mut stored_count = 0;
let verifier = IntegrityVerifier::new(if self.checksum { ChecksumType::Fast } else { ChecksumType::None }, false);
for file in &source_files {
if file.is_dir {
continue; }
if let Ok(checksum) = verifier.compute_file_checksum(&file.path) {
if let Err(e) = db.store_checksum(&file.path, file.modified, file.size, &checksum) {
tracing::warn!("Failed to store checksum for {}: {}", file.path.display(), e);
} else {
stored_count += 1;
}
}
}
if stored_count > 0 {
tracing::info!("Stored {} checksums in database", stored_count);
}
if self.prune_checksum_db {
use std::collections::HashSet;
let existing_paths: HashSet<_> = source_files.iter().map(|f| (*f.path).clone()).collect();
match db.prune(&existing_paths) {
Ok(pruned) => {
if pruned > 0 {
tracing::info!("Pruned {} stale entries from checksum database", pruned);
}
}
Err(e) => {
tracing::warn!("Failed to prune checksum database: {}", e);
}
}
}
}
Ok(final_stats)
}
pub async fn sync_streaming(&self, source: &Path, destination: &Path) -> Result<SyncStats> {
let start_time = std::time::Instant::now();
tracing::info!("Starting streaming sync: {} → {}", source.display(), destination.display());
if let Err(e) = crate::resume::TransferState::clear_stale_states(std::time::Duration::from_secs(7 * 24 * 60 * 60)) {
tracing::warn!("Failed to clean up stale resume states: {}", e);
}
if !self.dry_run {
self.transport.create_dir_all(destination).await?;
}
if let Some(ref monitor) = self.perf_monitor {
monitor.lock().unwrap().start_scan();
}
let pb = if self.quiet {
ProgressBar::hidden()
} else {
let pb = ProgressBar::new_spinner();
pb.set_style(ProgressStyle::default_spinner().template("{msg}\n{spinner:.green} {pos} files processed").unwrap());
pb.enable_steady_tick(std::time::Duration::from_millis(100));
pb
};
let stats = Arc::new(Mutex::new(SyncStats {
files_scanned: 0,
files_created: 0,
files_updated: 0,
files_skipped: 0,
files_deleted: 0,
bytes_transferred: 0,
files_delta_synced: 0,
delta_bytes_saved: 0,
files_compressed: 0,
compression_bytes_saved: 0,
files_verified: 0,
verification_failures: 0,
duration: Duration::ZERO,
bytes_would_add: 0,
bytes_would_change: 0,
bytes_would_delete: 0,
dirs_created: 0,
symlinks_created: 0,
errors: Vec::new(),
}));
let planner = Arc::new(StrategyPlanner::with_comparison_flags(
self.ignore_times,
self.size_only,
self.checksum,
self.update_only,
self.ignore_existing,
));
let hardlink_map = Arc::new(Mutex::new(std::collections::HashMap::new()));
let rate_limiter = self.bwlimit.map(|limit| Arc::new(Mutex::new(RateLimiter::new(limit))));
let bloom_filter = Arc::new(Mutex::new(FileSetBloom::new(1_000_000)));
let plan_concurrency = self.max_concurrent * 2;
let transfer_concurrency = self.max_concurrent;
let source_stream = self.transport.scan_streaming(source).await?;
source_stream
.map(|entry_result| {
match entry_result {
Ok(file) => {
bloom_filter.lock().unwrap().insert(&file.relative_path);
if self.should_exclude(&file.relative_path, file.is_dir) {
return None;
}
if self.should_filter_by_size(file.size) {
return None;
}
Some(Ok(file))
}
Err(e) => Some(Err(e)),
}
})
.filter_map(|x| async move { x }) .map(|entry_result| {
let planner = planner.clone();
let transport = self.transport.clone();
let destination = destination.to_path_buf();
let stats = stats.clone();
let pb = pb.clone();
let _json = self.json;
async move {
let file = match entry_result {
Ok(f) => f,
Err(e) => return Err(e),
};
{
let mut s = stats.lock().unwrap();
s.files_scanned += 1;
}
pb.set_message(format!("Scanning: {}", file.relative_path.display()));
pb.inc(1);
let task = planner.plan_file_async(&file, &destination, &transport, None).await?;
Ok(task)
}
})
.buffer_unordered(plan_concurrency) .map(|task_result| {
let transport = self.transport.clone();
let stats = stats.clone();
let pb = pb.clone();
let dry_run = self.dry_run;
let diff_mode = self.diff_mode;
let json = self.json;
let verification_mode = self.verification_mode;
let verify_on_write = self.verify_on_write;
let symlink_mode = self.symlink_mode;
let preserve_xattrs = self.preserve_xattrs;
let preserve_hardlinks = self.preserve_hardlinks;
let preserve_acls = self.preserve_acls;
let preserve_flags = self.preserve_flags;
let per_file_progress = self.per_file_progress && !self.quiet;
let hardlink_map = hardlink_map.clone();
let rate_limiter = rate_limiter.clone();
let perf_monitor = self.perf_monitor.clone();
let task = match task_result {
Ok(t) => t,
Err(e) => return futures::future::ready(Err(e)).boxed(),
};
if matches!(task.action, SyncAction::Skip) {
{
let mut s = stats.lock().unwrap();
s.files_skipped += 1;
}
if json {
SyncEvent::Skip { path: task.dest_path.clone(), reason: "up_to_date".to_string() }.emit();
}
return futures::future::ready(Ok(())).boxed();
}
async move {
let transferrer = Transferrer::new(
transport.as_ref(),
dry_run,
diff_mode,
symlink_mode,
preserve_xattrs,
preserve_hardlinks,
preserve_acls,
preserve_flags,
per_file_progress,
hardlink_map,
);
let _verifier = IntegrityVerifier::new(verification_mode, verify_on_write);
let filename = task.dest_path.file_name().and_then(|n| n.to_str()).unwrap_or_else(|| task.dest_path.to_str().unwrap_or(""));
let msg = match &task.action {
SyncAction::Create => format!("Creating: {}", filename),
SyncAction::Update => format!("Updating: {}", filename),
SyncAction::Skip => format!("Skipping: {}", filename),
SyncAction::Delete => format!("Deleting: {}", filename),
};
if !matches!(task.action, SyncAction::Skip) {
pb.set_message(msg);
}
match task.action {
SyncAction::Create => {
if let Some(source) = &task.source {
match transferrer.create(source, &task.dest_path).await {
Ok(transfer_result) => {
let bytes_written = if let Some(ref result) = transfer_result { result.bytes_written } else { 0 };
{
let mut stats = stats.lock().unwrap();
stats.bytes_transferred += bytes_written;
stats.files_created += 1;
if let Some(monitor) = &perf_monitor {
monitor.lock().unwrap().add_file_created();
monitor.lock().unwrap().add_bytes_transferred(bytes_written);
if !source.is_dir {
monitor.lock().unwrap().add_bytes_read(source.size);
}
}
if dry_run && !source.is_dir {
stats.bytes_would_add += source.size;
}
}
if let Some(ref limiter) = rate_limiter
&& bytes_written > 0
{
let sleep_duration = limiter.lock().unwrap().consume(bytes_written);
if sleep_duration > Duration::ZERO {
tokio::time::sleep(sleep_duration).await;
}
}
if json {
SyncEvent::Create { path: task.dest_path.clone(), size: source.size, bytes_transferred: bytes_written }.emit();
}
Ok(())
}
Err(e) => {
let mut stats = stats.lock().unwrap();
stats.errors.push(SyncError { path: task.dest_path.clone(), error: e.to_string(), action: "create".to_string() });
Err(e)
}
}
} else {
Ok(())
}
}
SyncAction::Update => {
if let Some(source) = &task.source {
match transferrer.update(source, &task.dest_path).await {
Ok(transfer_result) => {
let bytes_written = if let Some(ref result) = transfer_result { result.bytes_written } else { 0 };
{
let mut stats = stats.lock().unwrap();
if let Some(ref result) = transfer_result {
stats.bytes_transferred += result.bytes_written;
if result.used_delta() {
stats.files_delta_synced += 1;
if let Some(literal) = result.literal_bytes {
stats.delta_bytes_saved += result.bytes_written.saturating_sub(literal);
}
}
}
stats.files_updated += 1;
if let Some(monitor) = &perf_monitor {
monitor.lock().unwrap().add_file_updated();
monitor.lock().unwrap().add_bytes_transferred(bytes_written);
if !source.is_dir {
monitor.lock().unwrap().add_bytes_read(source.size);
}
}
if dry_run && !source.is_dir {
stats.bytes_would_change += source.size;
}
}
if let Some(ref limiter) = rate_limiter
&& bytes_written > 0
{
let sleep_duration = limiter.lock().unwrap().consume(bytes_written);
if sleep_duration > Duration::ZERO {
tokio::time::sleep(sleep_duration).await;
}
}
if json {
let delta_used = transfer_result.as_ref().map(|r| r.used_delta()).unwrap_or(false);
SyncEvent::Update { path: task.dest_path.clone(), size: source.size, bytes_transferred: bytes_written, delta_used }
.emit();
}
Ok(())
}
Err(e) => {
let mut stats = stats.lock().unwrap();
stats.errors.push(SyncError { path: task.dest_path.clone(), error: e.to_string(), action: "update".to_string() });
Err(e)
}
}
} else {
Ok(())
}
}
_ => Ok(()), }
}
.boxed()
})
.buffer_unordered(transfer_concurrency)
.collect::<Vec<_>>() .await;
if self.delete {
if let Ok(dest_stream) = self.transport.scan_streaming(destination).await {
let bloom = bloom_filter.lock().unwrap().clone();
let transport = self.transport.clone();
let stats = stats.clone();
let pb = pb.clone();
let dry_run = self.dry_run;
let json = self.json;
let _force_delete = self.force_delete;
let _delete_threshold = self.delete_threshold;
dest_stream
.filter_map(|res| async { res.ok() }) .filter(|dest_file| {
futures::future::ready(!bloom.contains(&dest_file.relative_path))
})
.map(|dest_file| {
let transport = transport.clone();
let stats = stats.clone();
let pb = pb.clone();
let path = dest_file.path.clone(); let is_dir = dest_file.is_dir;
async move {
let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
pb.set_message(format!("Deleting: {}", filename));
if dry_run {
{
let mut s = stats.lock().unwrap();
s.bytes_would_delete += dest_file.size;
s.files_deleted += 1;
}
if json {
SyncEvent::Delete { path: (*path).clone() }.emit();
}
return Ok(());
}
match transport.remove(&path, is_dir).await {
Ok(_) => {
{
let mut s = stats.lock().unwrap();
s.files_deleted += 1;
}
if json {
SyncEvent::Delete { path: (*path).clone() }.emit();
}
Ok(())
}
Err(e) => {
{
let mut s = stats.lock().unwrap();
s.errors.push(SyncError { path: (*path).clone(), error: e.to_string(), action: "delete".to_string() });
}
Err(e)
}
}
}
})
.buffer_unordered(self.max_concurrent)
.collect::<Vec<_>>()
.await;
}
}
pb.finish_with_message("Sync complete");
let mut final_stats = Arc::try_unwrap(stats).unwrap().into_inner().unwrap();
final_stats.duration = start_time.elapsed();
Ok(final_stats)
}
pub async fn verify(&self, source: &Path, destination: &Path) -> Result<VerificationResult> {
let start_time = std::time::Instant::now();
tracing::info!("Starting verification: {} ↔ {}", source.display(), destination.display());
if let Some(ref monitor) = self.perf_monitor {
monitor.lock().unwrap().start_scan();
}
let source_files = self.transport.scan(source).await?;
let dest_files = self.transport.scan(destination).await?;
if let Some(ref monitor) = self.perf_monitor {
monitor.lock().unwrap().end_scan();
}
tracing::info!("Found {} files in source, {} files in destination", source_files.len(), dest_files.len());
let mut dest_map = std::collections::HashMap::new();
for file in &dest_files {
let rel_path = file.path.strip_prefix(destination).unwrap_or(&file.path).to_path_buf();
dest_map.insert(rel_path, file);
}
let checksum_type = if self.checksum {
ChecksumType::Fast } else {
self.verification_mode };
let verifier = IntegrityVerifier::new(checksum_type, false);
let mut files_matched = 0;
let mut files_mismatched = Vec::new();
let mut files_only_in_source = Vec::new();
let mut errors = Vec::new();
for source_file in &source_files {
if source_file.is_dir {
continue;
}
if self.should_filter_by_size(source_file.size) {
continue;
}
let rel_path = source_file.path.strip_prefix(source).unwrap_or(&source_file.path).to_path_buf();
if let Some(dest_file) = dest_map.get(&rel_path) {
match self.compare_checksums(&source_file.path, &dest_file.path, &verifier).await {
Ok(true) => {
files_matched += 1;
if !self.quiet {
tracing::debug!("✓ {}", rel_path.display());
}
}
Ok(false) => {
files_mismatched.push(rel_path.clone());
tracing::warn!("✗ Mismatch: {}", rel_path.display());
}
Err(e) => {
errors.push(SyncError { path: rel_path.clone(), error: e.to_string(), action: "verify".to_string() });
tracing::error!("Error verifying {}: {}", rel_path.display(), e);
}
}
} else {
files_only_in_source.push(rel_path.clone());
tracing::info!("→ Only in source: {}", rel_path.display());
}
}
let mut files_only_in_dest = Vec::new();
for dest_file in &dest_files {
if dest_file.is_dir {
continue;
}
let rel_path = dest_file.path.strip_prefix(destination).unwrap_or(&dest_file.path).to_path_buf();
let source_path = source.join(&rel_path);
if !source_path.exists() {
files_only_in_dest.push(rel_path.clone());
tracing::info!("← Only in destination: {}", rel_path.display());
}
}
let duration = start_time.elapsed();
tracing::info!(
"Verification complete: {} matched, {} mismatched, {} only in source, {} only in dest, {} errors",
files_matched,
files_mismatched.len(),
files_only_in_source.len(),
files_only_in_dest.len(),
errors.len()
);
Ok(VerificationResult { files_matched, files_mismatched, files_only_in_source, files_only_in_dest, errors, duration })
}
async fn compare_checksums(&self, source_path: &Path, dest_path: &Path, verifier: &IntegrityVerifier) -> Result<bool> {
let source_checksum = self.transport.compute_checksum(source_path, verifier).await?;
let dest_checksum = self.transport.compute_checksum(dest_path, verifier).await?;
Ok(source_checksum == dest_checksum)
}
pub async fn sync_single_file(&self, source: &Path, destination: &Path) -> Result<SyncStats> {
let start_time = std::time::Instant::now();
tracing::info!("Starting single file sync: {} → {}", source.display(), destination.display());
let mut stats = SyncStats {
files_scanned: 1,
files_created: 0,
files_updated: 0,
files_skipped: 0,
files_deleted: 0,
bytes_transferred: 0,
files_delta_synced: 0,
delta_bytes_saved: 0,
files_compressed: 0,
compression_bytes_saved: 0,
files_verified: 0,
verification_failures: 0,
duration: Duration::ZERO,
bytes_would_add: 0,
bytes_would_change: 0,
bytes_would_delete: 0,
dirs_created: 0,
symlinks_created: 0,
errors: Vec::new(),
};
let dest_exists = self.transport.exists(destination).await?;
let hardlink_map = Arc::new(Mutex::new(std::collections::HashMap::new()));
let per_file_progress = self.per_file_progress && !self.quiet;
let transferrer = Transferrer::new(
self.transport.as_ref(),
self.dry_run,
self.diff_mode,
self.symlink_mode,
self.preserve_xattrs,
self.preserve_hardlinks,
self.preserve_acls,
self.preserve_flags,
per_file_progress,
hardlink_map,
);
if !dest_exists {
tracing::info!("Creating {}", destination.display());
let metadata = source.metadata()?;
let filename = source
.file_name()
.ok_or_else(|| crate::error::SyncError::Io(std::io::Error::other(format!("Invalid source path: {}", source.display()))))?
.to_owned();
if let Some(result) = transferrer
.create(
&FileEntry {
path: Arc::new(source.to_path_buf()),
relative_path: Arc::new(PathBuf::from(filename)),
size: metadata.len(),
modified: metadata.modified()?,
is_dir: false,
is_symlink: false,
symlink_target: None,
is_sparse: false,
allocated_size: metadata.len(),
xattrs: None,
inode: None,
nlink: 1,
acls: None,
bsd_flags: None,
},
destination,
)
.await?
{
stats.bytes_transferred = result.bytes_written;
if result.compression_used {
stats.files_compressed = 1;
if let Some(transferred) = result.transferred_bytes {
stats.compression_bytes_saved = result.bytes_written.saturating_sub(transferred);
}
}
}
stats.files_created = 1;
if self.verification_mode != ChecksumType::None && !self.dry_run {
let verifier = IntegrityVerifier::new(self.verification_mode, self.verify_on_write);
match verifier.verify_transfer(source, destination) {
Ok(verified) => {
if verified {
stats.files_verified = 1;
} else {
stats.verification_failures = 1;
tracing::warn!("Verification failed for {}: checksums do not match", destination.display());
}
}
Err(e) => {
tracing::warn!("Verification error for {}: {}", destination.display(), e);
stats.verification_failures = 1;
}
}
}
} else {
tracing::info!("Updating {}", destination.display());
let metadata = source.metadata()?;
let filename = source
.file_name()
.ok_or_else(|| crate::error::SyncError::Io(std::io::Error::other(format!("Invalid source path: {}", source.display()))))?
.to_owned();
if let Some(result) = transferrer
.update(
&FileEntry {
path: Arc::new(source.to_path_buf()),
relative_path: Arc::new(PathBuf::from(filename)),
size: metadata.len(),
modified: metadata.modified()?,
is_dir: false,
is_symlink: false,
symlink_target: None,
is_sparse: false,
allocated_size: metadata.len(),
xattrs: None,
inode: None,
nlink: 1,
acls: None,
bsd_flags: None,
},
destination,
)
.await?
{
stats.bytes_transferred = result.bytes_written;
if result.used_delta() {
stats.files_delta_synced = 1;
if let Some(literal_bytes) = result.literal_bytes {
stats.delta_bytes_saved = result.bytes_written.saturating_sub(literal_bytes);
}
}
if result.compression_used {
stats.files_compressed = 1;
if let Some(transferred) = result.transferred_bytes {
stats.compression_bytes_saved = result.bytes_written.saturating_sub(transferred);
}
}
}
stats.files_updated = 1;
if self.verification_mode != ChecksumType::None && !self.dry_run {
let verifier = IntegrityVerifier::new(self.verification_mode, self.verify_on_write);
match verifier.verify_transfer(source, destination) {
Ok(verified) => {
if verified {
stats.files_verified = 1;
} else {
stats.verification_failures = 1;
tracing::warn!("Verification failed for {}: checksums do not match", destination.display());
}
}
Err(e) => {
tracing::warn!("Verification error for {}: {}", destination.display(), e);
stats.verification_failures = 1;
}
}
}
}
stats.duration = start_time.elapsed();
Ok(stats)
}
pub fn get_performance_metrics(&self) -> Option<PerformanceMetrics> {
self.perf_monitor.as_ref().map(|monitor| monitor.lock().unwrap().get_metrics())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transport::local::LocalTransport;
use std::fs;
use tempfile::TempDir;
fn create_test_engine() -> SyncEngine<LocalTransport> {
let transport = LocalTransport::new();
SyncEngine::new(
transport,
false, false, false, 50, false, false, true, 4, 100, None, None, FilterEngine::new(), None, false, 0, 0, false, ChecksumType::Fast,
false, SymlinkMode::Preserve,
false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, )
}
#[tokio::test]
async fn test_basic_sync_success() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
fs::write(source_dir.path().join("file1.txt"), "content1").unwrap();
fs::write(source_dir.path().join("file2.txt"), "content2").unwrap();
let engine = create_test_engine();
let stats = engine.sync(source_dir.path(), dest_dir.path()).await.unwrap();
assert_eq!(stats.files_created, 2);
assert!(dest_dir.path().join("file1.txt").exists());
assert!(dest_dir.path().join("file2.txt").exists());
assert_eq!(fs::read_to_string(dest_dir.path().join("file1.txt")).unwrap(), "content1");
}
#[tokio::test]
async fn test_sync_with_subdirectories() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
fs::create_dir(source_dir.path().join("subdir")).unwrap();
fs::write(source_dir.path().join("subdir/file.txt"), "nested").unwrap();
let engine = create_test_engine();
let stats = engine.sync(source_dir.path(), dest_dir.path()).await.unwrap();
assert!(stats.files_created >= 1);
assert!(dest_dir.path().join("subdir/file.txt").exists());
assert_eq!(fs::read_to_string(dest_dir.path().join("subdir/file.txt")).unwrap(), "nested");
}
#[tokio::test]
async fn test_sync_empty_source() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
let engine = create_test_engine();
let stats = engine.sync(source_dir.path(), dest_dir.path()).await.unwrap();
assert_eq!(stats.files_created, 0);
assert_eq!(stats.files_scanned, 0);
}
#[tokio::test]
async fn test_sync_dry_run_no_changes() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
fs::write(source_dir.path().join("file.txt"), "content").unwrap();
let transport = LocalTransport::new();
let engine = SyncEngine::new(
transport,
true, false, false, 50, false, false, true, 4, 100, None, None, FilterEngine::new(), None, false, 0, 0, false, ChecksumType::Fast,
false, SymlinkMode::Preserve,
false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, );
let stats = engine.sync(source_dir.path(), dest_dir.path()).await.unwrap();
assert_eq!(stats.files_scanned, 1);
assert!(!dest_dir.path().join("file.txt").exists());
}
#[tokio::test]
async fn test_toctou_file_deleted_after_scan() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
let file_path = source_dir.path().join("file.txt");
fs::write(&file_path, "content").unwrap();
let scanner = scanner::Scanner::new(source_dir.path());
let source_files = scanner.scan().unwrap();
assert_eq!(source_files.len(), 1);
fs::remove_file(&file_path).unwrap();
let engine = create_test_engine();
let result = engine.sync(source_dir.path(), dest_dir.path()).await;
match result {
Ok(stats) => {
assert_eq!(stats.files_created, 0);
}
Err(_) => {
}
}
}
#[tokio::test]
async fn test_toctou_file_modified_after_scan() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
let file_path = source_dir.path().join("file.txt");
fs::write(&file_path, "initial content").unwrap();
let engine = create_test_engine();
let source = source_dir.path().to_path_buf();
let dest = dest_dir.path().to_path_buf();
fs::write(&file_path, "modified content").unwrap();
let stats = engine.sync(&source, &dest).await.unwrap();
assert_eq!(stats.files_created, 1);
assert!(dest_dir.path().join("file.txt").exists());
}
#[tokio::test]
async fn test_toctou_file_size_changed() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
let file_path = source_dir.path().join("file.txt");
fs::write(&file_path, "small").unwrap();
let initial_size = fs::metadata(&file_path).unwrap().len();
assert_eq!(initial_size, 5);
fs::write(&file_path, "a".repeat(10000)).unwrap();
let engine = create_test_engine();
let result = engine.sync(source_dir.path(), dest_dir.path()).await;
match result {
Ok(stats) => {
assert_eq!(stats.files_created, 1);
assert!(dest_dir.path().join("file.txt").exists());
}
Err(_) => {
}
}
}
#[tokio::test]
async fn test_toctou_directory_deleted_after_scan() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
let subdir = source_dir.path().join("subdir");
fs::create_dir(&subdir).unwrap();
fs::write(subdir.join("file.txt"), "content").unwrap();
let scanner = scanner::Scanner::new(source_dir.path());
let source_files = scanner.scan().unwrap();
assert!(!source_files.is_empty());
fs::remove_dir_all(&subdir).unwrap();
let engine = create_test_engine();
let result = engine.sync(source_dir.path(), dest_dir.path()).await;
match result {
Ok(stats) => {
assert_eq!(stats.files_created, 0);
}
Err(_) => {
}
}
}
#[tokio::test]
async fn test_toctou_new_file_created_during_sync() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
fs::write(source_dir.path().join("file1.txt"), "content1").unwrap();
fs::write(source_dir.path().join("file2.txt"), "content2").unwrap();
let engine = create_test_engine();
let stats = engine.sync(source_dir.path(), dest_dir.path()).await.unwrap();
assert!(stats.files_created >= 1);
}
#[tokio::test]
async fn test_sync_many_small_files() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
for i in 0..100 {
fs::write(source_dir.path().join(format!("file{}.txt", i)), format!("content{}", i)).unwrap();
}
let engine = create_test_engine();
let stats = engine.sync(source_dir.path(), dest_dir.path()).await.unwrap();
assert_eq!(stats.files_created, 100);
for i in 0..100 {
assert!(dest_dir.path().join(format!("file{}.txt", i)).exists());
}
}
#[tokio::test]
async fn test_sync_very_deep_nesting() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
let mut path = source_dir.path().to_path_buf();
for i in 0..100 {
path.push(format!("level{}", i));
}
fs::create_dir_all(&path).unwrap();
fs::write(path.join("deep.txt"), "very deep content").unwrap();
let engine = create_test_engine();
let stats = engine.sync(source_dir.path(), dest_dir.path()).await.unwrap();
assert!(stats.files_created >= 1);
let mut dest_path = dest_dir.path().to_path_buf();
for i in 0..100 {
dest_path.push(format!("level{}", i));
}
dest_path.push("deep.txt");
assert!(dest_path.exists());
assert_eq!(fs::read_to_string(&dest_path).unwrap(), "very deep content");
}
#[tokio::test]
async fn test_sync_large_file() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
let large_content = "x".repeat(10 * 1024 * 1024);
fs::write(source_dir.path().join("large.bin"), &large_content).unwrap();
let engine = create_test_engine();
let stats = engine.sync(source_dir.path(), dest_dir.path()).await.unwrap();
assert_eq!(stats.files_created, 1);
assert!(stats.bytes_transferred >= 10 * 1024 * 1024);
let dest_file = dest_dir.path().join("large.bin");
assert!(dest_file.exists());
assert_eq!(fs::metadata(&dest_file).unwrap().len(), 10 * 1024 * 1024);
}
#[tokio::test]
async fn test_sync_mixed_sizes() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
fs::write(source_dir.path().join("tiny.txt"), "x").unwrap();
fs::write(source_dir.path().join("small.txt"), "x".repeat(1024)).unwrap();
fs::write(source_dir.path().join("medium.txt"), "x".repeat(100 * 1024)).unwrap();
fs::write(source_dir.path().join("large.txt"), "x".repeat(1024 * 1024)).unwrap();
let engine = create_test_engine();
let stats = engine.sync(source_dir.path(), dest_dir.path()).await.unwrap();
assert_eq!(stats.files_created, 4);
assert!(dest_dir.path().join("tiny.txt").exists());
assert!(dest_dir.path().join("small.txt").exists());
assert!(dest_dir.path().join("medium.txt").exists());
assert!(dest_dir.path().join("large.txt").exists());
}
#[tokio::test]
async fn test_sync_idempotent() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
fs::write(source_dir.path().join("file.txt"), "content").unwrap();
let engine = create_test_engine();
let stats1 = engine.sync(source_dir.path(), dest_dir.path()).await.unwrap();
assert_eq!(stats1.files_created, 1);
let stats2 = engine.sync(source_dir.path(), dest_dir.path()).await.unwrap();
assert_eq!(stats2.files_skipped, 1);
assert_eq!(stats2.files_created, 0);
}
#[tokio::test]
async fn test_error_threshold_zero_collects_all_errors() {
use std::os::unix::fs::PermissionsExt;
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
for i in 1..=5 {
fs::write(source_dir.path().join(format!("file{}.txt", i)), "content").unwrap();
}
let perms = fs::Permissions::from_mode(0o444);
fs::set_permissions(dest_dir.path(), perms).unwrap();
let transport = LocalTransport::new();
let engine = SyncEngine::new(
transport,
false, false, false, 50, false, false, true, 1, 0, None, None, FilterEngine::new(), None, false, 0, 0, false, ChecksumType::Fast,
false, SymlinkMode::Preserve,
false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, );
let result = engine.sync(source_dir.path(), dest_dir.path()).await;
match result {
Ok(stats) => {
assert!(!stats.errors.is_empty(), "Should have collected errors");
}
Err(_) => {
}
}
let perms = fs::Permissions::from_mode(0o755);
let _ = fs::set_permissions(dest_dir.path(), perms);
}
#[tokio::test]
async fn test_error_threshold_aborts_when_exceeded() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
for i in 1..=10 {
fs::write(source_dir.path().join(format!("file{}.txt", i)), "content").unwrap();
}
let scanner = scanner::Scanner::new(source_dir.path());
let source_files = scanner.scan().unwrap();
assert_eq!(source_files.len(), 10);
for i in 1..=5 {
fs::remove_file(source_dir.path().join(format!("file{}.txt", i))).unwrap();
}
let transport = LocalTransport::new();
let engine = SyncEngine::new(
transport,
false, false, false, 50, false, false, true, 1, 3, None, None, FilterEngine::new(), None, false, 0, 0, false, ChecksumType::Fast,
false, SymlinkMode::Preserve,
false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, );
let result = engine.sync(source_dir.path(), dest_dir.path()).await;
match result {
Ok(stats) => {
assert!(stats.files_created <= 10);
}
Err(e) => {
let error_msg = e.to_string();
assert!(
error_msg.contains("Error threshold exceeded") || error_msg.contains("errors"),
"Error should mention threshold: {}",
error_msg
);
}
}
}
#[tokio::test]
async fn test_error_collection_below_threshold_continues() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
for i in 1..=10 {
fs::write(source_dir.path().join(format!("file{}.txt", i)), "content").unwrap();
}
let scanner = scanner::Scanner::new(source_dir.path());
let source_files = scanner.scan().unwrap();
assert_eq!(source_files.len(), 10);
fs::remove_file(source_dir.path().join("file1.txt")).unwrap();
fs::remove_file(source_dir.path().join("file2.txt")).unwrap();
let transport = LocalTransport::new();
let engine = SyncEngine::new(
transport,
false, false, false, 50, false, false, true, 1, 5, None, None, FilterEngine::new(), None, false, 0, 0, false, ChecksumType::Fast,
false, SymlinkMode::Preserve,
false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, );
let result = engine.sync(source_dir.path(), dest_dir.path()).await;
match result {
Ok(stats) => {
assert!(stats.files_created <= 8, "Should sync remaining files");
if !stats.errors.is_empty() {
assert!(stats.errors.len() <= 5, "Errors should be below threshold");
}
}
Err(_) => {
}
}
}
#[tokio::test]
async fn test_error_message_includes_count_and_first_error() {
let source_dir = TempDir::new().unwrap();
let dest_dir = TempDir::new().unwrap();
for i in 1..=5 {
fs::write(source_dir.path().join(format!("file{}.txt", i)), "content").unwrap();
}
let scanner = scanner::Scanner::new(source_dir.path());
let _ = scanner.scan().unwrap();
for i in 1..=5 {
fs::remove_file(source_dir.path().join(format!("file{}.txt", i))).unwrap();
}
let transport = LocalTransport::new();
let engine = SyncEngine::new(
transport,
false, false, false, 50, false, false, true, 1, 2, None, None, FilterEngine::new(),
None, false, 0, 0, false, ChecksumType::Fast,
false, SymlinkMode::Preserve,
false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, );
let result = engine.sync(source_dir.path(), dest_dir.path()).await;
if let Err(e) = result {
let error_msg = e.to_string();
assert!(error_msg.contains("error") || error_msg.contains("Error"), "Error message should mention errors: {}", error_msg);
}
}
}