use super::file_ops::{copy_file, delete_extra_files};
use super::filter::should_sync;
use super::report::{SyncReport, print_report};
use super::scanner::scan_directory;
use super::types::{FileInfo, SyncParameters};
use crate::utils::create_progress_bar;
use chrono::Utc;
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Semaphore;
use tracing::{debug, warn};
/// Options that control a single directory synchronisation run.
///
/// The derived [`Default`] turns every flag off and leaves both pattern
/// lists empty, which is exactly what the previous hand-written
/// implementation produced.
#[derive(Debug, Clone, Default)]
pub struct SyncOptions {
    /// When `true`, files that would be copied are only recorded in the
    /// report; nothing is copied. The flag is also forwarded to the
    /// deletion step.
    pub dry_run: bool,
    /// Exclusion patterns forwarded to the directory scanner and to the
    /// deletion step (pattern syntax is defined by those helpers).
    pub excludes: Vec<String>,
    /// Forwarded to the scanner and to the "should this file be synced"
    /// check; presumably enables content-checksum comparison.
    pub checksum: bool,
    /// When `true`, the deletion step for extra files in the target runs.
    pub delete_extra: bool,
    /// Additional patterns forwarded only to the deletion step.
    pub delete_excludes: Vec<String>,
}
pub async fn sync_directories(params: &SyncParameters) -> anyhow::Result<SyncReport> {
let options = SyncOptions {
dry_run: params.dry_run,
excludes: params.excludes.clone(),
checksum: params.checksum,
delete_extra: params.delete_extra,
delete_excludes: params.delete_excludes.clone(),
};
let mut report = SyncReport::default(); println!("当前时间戳1: {}", Utc::now().timestamp());
let source_files = scan_directory(¶ms.source, &options.excludes, options.checksum)
.map_err(|e| anyhow::anyhow!("Failed to scan source directory -> {}", e))?;
println!("当前时间戳2: {}", Utc::now().timestamp());
let target_cache: HashMap<String, FileInfo> = if params.target.exists() {
match scan_directory(¶ms.target, &options.excludes, options.checksum) {
Ok(target_files) => target_files
.into_iter()
.filter_map(|info| {
let relative = info
.path
.strip_prefix(¶ms.target)
.map(|p| p.to_string_lossy().to_string())
.ok();
relative.map(|rel| (rel, info))
})
.collect(),
Err(e) => {
warn!(error = ?e, "Failed to scan target directory, proceeding with empty cache");
HashMap::new()
}
}
} else {
debug!("Target directory does not exist, skipping target scan");
HashMap::new()
};
let mut sync_queue = Vec::new();
let mut total_sync_size: u64 = 0;
for source_info in &source_files {
let relative = source_info
.path
.strip_prefix(¶ms.source)
.expect("File not under source root");
let relative_str = relative.to_string_lossy().to_string();
let target_path = params.target.join(relative);
let target_info = target_cache.get(&relative_str);
if should_sync(source_info, target_info, options.checksum) {
sync_queue.push((source_info.clone(), target_path));
total_sync_size += source_info.size;
}
}
if options.delete_extra {
if !params.target.exists() {
std::fs::create_dir_all(¶ms.target)
.map_err(|e| anyhow::anyhow!("Failed to create target directory for deletion: {}", e))?;
}
let (deleted, would_delete, delete_errors) = delete_extra_files(
¶ms.source,
¶ms.target,
options.dry_run,
&options.excludes,
&options.delete_excludes,
)
.await?;
report.deleted = deleted;
report.would_delete = would_delete;
report.delete_errors = delete_errors;
}
if sync_queue.is_empty()
&& (!options.delete_extra || report.would_delete.is_empty() || report.deleted.is_empty())
{
print_report(
true,
&report,
options.dry_run,
options.delete_extra,
source_files.len(),
total_sync_size,
params.detail,
);
return Ok(report);
}
if options.dry_run {
for (source_info, _target_path) in &sync_queue {
report.copied.push((*source_info.path).to_path_buf());
}
} else {
let pb = create_progress_bar(total_sync_size);
let processed_bytes = Arc::new(AtomicU64::new(0));
let semaphore = Arc::new(Semaphore::new(8));
use tokio::sync::Notify;
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let pb_clone_for_refresh = pb.clone();
let processed_bytes_for_refresh = processed_bytes.clone();
let refresh_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
loop {
interval.tick().await;
let pos = processed_bytes_for_refresh.load(Ordering::Relaxed);
if pos >= total_sync_size {
break;
}
pb_clone_for_refresh.set_position(pos);
}
pb_clone_for_refresh.set_position(total_sync_size); });
let mut tasks = FuturesUnordered::new();
for (source_info, target_path) in &sync_queue {
let source_path = source_info.path.clone();
let target_path_clone = target_path.clone();
let size = source_info.size;
let pb_clone = pb.clone();
let processed_bytes_clone = processed_bytes.clone();
let semaphore_clone = semaphore.clone();
let source_display = source_path.display().to_string();
let target_display = target_path_clone.display().to_string();
let task = tokio::spawn(async move {
let _permit = semaphore_clone.acquire().await.unwrap();
let progress_cb = |bytes: u64| {
let _ = processed_bytes_clone.fetch_add(bytes, Ordering::Relaxed);
};
let result = copy_file(&source_path, &target_path_clone, false, Some(&processed_bytes_clone)).await;
let current = processed_bytes_clone.fetch_add(size, Ordering::Relaxed) + size;
pb_clone.set_position(current);
(
result,
source_path,
target_path_clone,
source_display,
target_display,
)
});
tasks.push(task);
}
while let Some(result) = tasks.next().await {
match result {
Ok((Ok(()), source_path, _target_path, source_display, target_display)) => {
report.copied.push((source_path).to_path_buf());
debug!(
source = %source_display,
target = %target_display,
"File copied"
);
}
Ok((Err(e), _source_path, target_path, source_display, target_display)) => {
warn!(
error = ?e,
source = %source_display,
target = %target_display,
"Failed to copy file"
);
report.errors.push((target_path, e.to_string()));
}
Err(join_error) => {
warn!(error = ?join_error, "Copy task panicked");
report.errors.push((PathBuf::new(), join_error.to_string()));
}
}
}
notify.notify_waiters();
let _ = refresh_handle.await;
pb.finish_with_message("File sync completed");
}
if report.errors.len() > 0 {
warn!(count = report.errors.len(), "Some files failed to copy");
anyhow::bail!("Failed to copy {} files", report.errors.len());
}
print_report(
false,
&report,
options.dry_run,
options.delete_extra, source_files.len(),
total_sync_size,
params.detail,
);
Ok(report)
}