use crate::infra::error::SyncError;
use crate::utils::{create_progress_bar, format_file_size};
use crate::{cli, config};
use notify::{Event, EventKind, RecursiveMode, Watcher, recommended_watcher};
use num_format::{Locale, ToFormattedString};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime};
use tracing::{debug, error, info, warn};
use chrono::Local;
/// Metadata snapshot of one filesystem entry, used to decide whether a file
/// has to be copied from the source tree to the target tree.
#[derive(Debug, Clone)]
pub struct FileInfo {
    /// Path the metadata was read from.
    pub path: PathBuf,
    /// Last-modification time reported by the filesystem.
    pub mtime: SystemTime,
    /// Length in bytes reported by the filesystem.
    pub size: u64,
    /// BLAKE3 digest of the contents; `None` when hashing was not requested
    /// or the entry is not a regular file.
    pub blake3_hash: Option<[u8; 32]>,
}

impl FileInfo {
    /// Reads the metadata of `path` and, when `compute_hash` is set and the
    /// entry is a regular file, hashes its contents.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the metadata cannot be read, the
    /// file cannot be hashed, or the modification time is unavailable.
    pub fn from_path(path: &Path, compute_hash: bool) -> std::io::Result<Self> {
        let meta = fs::metadata(path)?;

        // Hash first, then query the mtime, so errors surface in the same
        // order as the individual steps are attempted.
        let digest = match (compute_hash, meta.is_file()) {
            (true, true) => Some(compute_blake3_hash(path)?),
            _ => None,
        };
        let mtime = meta.modified()?;

        Ok(Self {
            path: path.to_path_buf(),
            mtime,
            size: meta.len(),
            blake3_hash: digest,
        })
    }

    /// Returns `true` when this entry has a different size than `target` or a
    /// strictly later modification time.
    pub fn is_newer_than(&self, target: &Self) -> bool {
        self.size != target.size || self.mtime > target.mtime
    }

    /// Returns `true` when both entries have the same size and the same
    /// (possibly absent) BLAKE3 digest.
    pub fn content_eq(&self, other: &Self) -> bool {
        (self.size, &self.blake3_hash) == (other.size, &other.blake3_hash)
    }
}
/// Complete set of inputs for one synchronisation run.
///
/// Values are built either from a CLI command or from a configured sync task
/// (see the `From` implementations in this file).
#[derive(Debug, Clone)]
pub struct SyncParameters {
/// Root directory files are read from.
pub source: PathBuf,
/// Root directory files are copied into.
pub target: PathBuf,
/// When set, nothing is copied or deleted; the run only reports what would happen.
pub dry_run: bool,
/// When set, files are hashed and compared by content instead of by mtime/size.
pub checksum: bool,
/// Exclude patterns applied while scanning.
pub excludes: Vec<String>,
/// When set, files that exist only in the target are removed.
pub delete_extra: bool,
/// Patterns protecting target files from deletion.
pub delete_excludes: Vec<String>,
/// When set, the printed report lists individual paths.
pub detail: bool,
}
impl From<&cli::Command> for SyncParameters {
    /// Builds sync parameters from a parsed CLI command.
    ///
    /// `Sync` carries every setting itself. `Run` and `Watch` only contribute
    /// the `dry_run`, `checksum` and `detail` flags; their paths and pattern
    /// lists are left empty and deletion is disabled.
    fn from(cmd: &cli::Command) -> Self {
        match cmd {
            cli::Command::Sync {
                source,
                target,
                dry_run,
                checksum,
                delete,
                exclude,
                delete_exclude,
                detail,
            } => Self {
                source: source.clone(),
                target: target.clone(),
                excludes: exclude.clone(),
                delete_excludes: delete_exclude.clone(),
                delete_extra: *delete,
                dry_run: *dry_run,
                checksum: *checksum,
                detail: *detail,
            },
            // Both task-based commands produce the same flag-only parameter set.
            cli::Command::Run {
                dry_run,
                checksum,
                detail,
                ..
            }
            | cli::Command::Watch {
                dry_run,
                checksum,
                detail,
                ..
            } => Self {
                source: PathBuf::new(),
                target: PathBuf::new(),
                excludes: Vec::new(),
                delete_excludes: Vec::new(),
                delete_extra: false,
                dry_run: *dry_run,
                checksum: *checksum,
                detail: *detail,
            },
        }
    }
}
impl From<&config::SyncTask> for SyncParameters {
    /// Builds sync parameters from a configured task.
    ///
    /// Paths, exclude patterns and deletion settings are copied from the task;
    /// `dry_run`, `checksum` and `detail` are all set to `false`.
    fn from(task: &config::SyncTask) -> Self {
        let config::SyncTask {
            source,
            target,
            exclude,
            delete_extra,
            delete_extra_exclude,
            ..
        } = task;

        Self {
            source: source.clone(),
            target: target.clone(),
            excludes: exclude.clone(),
            delete_extra: *delete_extra,
            delete_excludes: delete_extra_exclude.clone(),
            dry_run: false,
            checksum: false,
            detail: false,
        }
    }
}
mod scanner {
    use super::*;

    /// Recursively collects a [`FileInfo`] for every non-excluded file below `root`.
    ///
    /// * `root` - directory to read.
    /// * `exclude_patterns` - patterns handed to `should_exclude`.
    /// * `compute_hash` - forwarded to [`FileInfo::from_path`].
    ///
    /// Exclusion is checked relative to the directory currently being read:
    /// the recursive call passes the subdirectory itself as `root`.
    ///
    /// Unreadable directory entries and files whose metadata cannot be read
    /// are logged and skipped.
    ///
    /// # Errors
    ///
    /// Returns `SyncError::SourceNotFound` when `root` does not exist and
    /// `SyncError::IoError` when it cannot be listed. Errors from nested
    /// directories are propagated unchanged.
    pub fn scan_directory<P: AsRef<Path>>(
        root: P,
        exclude_patterns: &[String],
        compute_hash: bool,
    ) -> Result<Vec<FileInfo>, SyncError> {
        let dir = root.as_ref();
        if !dir.exists() {
            return Err(SyncError::SourceNotFound(dir.to_path_buf()));
        }

        let listing = match fs::read_dir(dir) {
            Ok(listing) => listing,
            Err(e) => {
                debug!(
                    error = ?e,
                    path = %dir.display(),
                    "Failed to read directory"
                );
                return Err(SyncError::IoError(e));
            }
        };

        let mut collected = Vec::new();
        for item in listing {
            let path = match item {
                Ok(item) => item.path(),
                Err(e) => {
                    warn!(
                        error = ?e,
                        dir = %dir.display(),
                        "Failed to read directory entry"
                    );
                    continue;
                }
            };

            if should_exclude(&path, dir, exclude_patterns) {
                debug!(path = %path.display(), "Skipped (excluded)");
                continue;
            }

            if path.is_dir() {
                // A failure anywhere below aborts the whole scan.
                collected.extend(scan_directory(&path, exclude_patterns, compute_hash)?);
                continue;
            }

            match FileInfo::from_path(&path, compute_hash) {
                Ok(info) => collected.push(info),
                Err(e) => {
                    warn!(
                        error = ?e,
                        path = %path.display(),
                        "Failed to read file metadata"
                    );
                }
            }
        }

        Ok(collected)
    }
}
mod filter {
use super::*;
pub fn should_exclude(path: &Path, root: &Path, exclude_patterns: &[String]) -> bool {
let relative = match path.strip_prefix(root) {
Ok(rel) => rel,
Err(_) => return false, };
let relative_str = relative.to_string_lossy();
for pattern in exclude_patterns {
if pattern.starts_with('/') {
if relative_str.starts_with(&pattern[1..]) {
return true;
}
} else if pattern.ends_with('/') {
if relative_str.starts_with(&*pattern)
|| relative_str.contains(&format!("/{}", pattern))
{
return true;
}
} else {
let regex_pattern = pattern.replace('*', ".*");
if let Ok(regex) = regex::Regex::new(&format!("^{}$", regex_pattern)) {
if regex.is_match(&relative_str) {
return true;
}
}
}
}
if let Some(name) = relative.file_name().and_then(|s| s.to_str()) {
matches!(
name,
".DS_Store" | ".fseventsd" | ".Trashes" | ".Spotlight-V100" | ".TemporaryItems"
) || name.starts_with("._") } else {
false
}
}
pub fn should_sync(
source_info: &FileInfo,
target_info: Option<&FileInfo>,
checksum: bool,
) -> bool {
match target_info {
None => true, Some(target) => {
if checksum {
!source_info.content_eq(target)
} else {
source_info.is_newer_than(target)
}
}
}
}
}
mod file_ops {
    use super::*;

    /// Copies `source` to `target`, creating missing parent directories first.
    ///
    /// Does nothing and returns `Ok(())` when `dry_run` is set.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from creating the parent directory or from the copy.
    pub async fn copy_file(source: &Path, target: &Path, dry_run: bool) -> std::io::Result<()> {
        if dry_run {
            return Ok(());
        }
        if let Some(parent) = target.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        tokio::fs::copy(source, target).await?;
        Ok(())
    }

    /// Streams the file at `path` through a BLAKE3 hasher and returns the digest.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from opening or reading the file.
    pub fn compute_blake3_hash(path: &Path) -> std::io::Result<[u8; 32]> {
        let mut file = fs::File::open(path)?;
        let mut hasher = blake3::Hasher::new();
        std::io::copy(&mut file, &mut hasher)?;
        Ok(hasher.finalize().into())
    }

    /// Removes files from `target` that have no counterpart in `source`.
    ///
    /// * `exclude` - patterns used when scanning `source`; target files
    ///   matching them are kept.
    /// * `delete_exclude` - additional patterns protecting target files.
    /// * `dry_run` - when set, nothing is removed and candidates are only
    ///   listed.
    ///
    /// Returns `(deleted, would_delete, delete_errors)`. In a dry run only
    /// `would_delete` is filled. Otherwise every successfully removed path is
    /// recorded in both `deleted` and `would_delete`, and failures are
    /// collected in `delete_errors` together with the error text.
    ///
    /// # Errors
    ///
    /// Fails when the source cannot be scanned or the target tree cannot be read.
    pub async fn delete_extra_files(
        source: &PathBuf,
        target: &PathBuf,
        dry_run: bool,
        exclude: &[String],
        delete_exclude: &[String],
    ) -> anyhow::Result<(Vec<PathBuf>, Vec<PathBuf>, Vec<(PathBuf, String)>)> {
        use std::collections::HashSet;

        // Relative paths of everything that exists (and is not excluded) in the source.
        let source_files: HashSet<String> = scan_directory(source, exclude, false)?
            .into_iter()
            .filter_map(|info| {
                info.path
                    .strip_prefix(source)
                    .ok()
                    .map(|rel| rel.to_string_lossy().to_string())
            })
            .collect();

        let mut to_delete = Vec::new();
        scan_target_for_deletion(
            target,
            target,
            source,
            &source_files,
            exclude,
            delete_exclude,
            &mut to_delete,
        )
        .await?;

        let mut deleted = Vec::new();
        let mut would_delete = Vec::new();
        let mut delete_errors = Vec::new();
        for path in to_delete {
            if dry_run {
                would_delete.push(path);
                continue;
            }
            match tokio::fs::remove_file(&path).await {
                Ok(()) => {
                    // Recorded in both lists; `would_delete` thus always names
                    // every path that was, or in a dry run would be, removed.
                    deleted.push(path.clone());
                    would_delete.push(path);
                }
                Err(e) => delete_errors.push((path, e.to_string())),
            }
        }
        Ok((deleted, would_delete, delete_errors))
    }

    /// Recursively walks `current` and appends every file that should be
    /// removed from the target tree to `to_delete`.
    ///
    /// A file is a deletion candidate when its path relative to `target_root`
    /// is not in `source_files` and it matches neither `exclude` nor
    /// `delete_exclude`.
    ///
    /// Both pattern lists are evaluated relative to `target_root`, because
    /// every visited path lives under the target. Matching against the source
    /// root can never succeed for such paths, so `source_root` is accepted
    /// only to keep the signature stable.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from reading any directory in the tree.
    pub async fn scan_target_for_deletion(
        current: &PathBuf,
        target_root: &PathBuf,
        source_root: &PathBuf,
        source_files: &std::collections::HashSet<String>,
        exclude: &[String],
        delete_exclude: &[String],
        to_delete: &mut Vec<PathBuf>,
    ) -> std::io::Result<()> {
        // Kept for signature compatibility; see the function documentation.
        let _ = source_root;

        let mut dir = tokio::fs::read_dir(current).await?;
        while let Some(entry) = dir.next_entry().await? {
            let path = entry.path();

            if path.is_dir() {
                // Async recursion needs a boxed future.
                let future = scan_target_for_deletion(
                    &path,
                    target_root,
                    source_root,
                    source_files,
                    exclude,
                    delete_exclude,
                    to_delete,
                );
                Box::pin(future).await?;
                continue;
            }

            let Ok(rel_path) = path.strip_prefix(target_root) else {
                continue;
            };
            let rel_str = rel_path.to_string_lossy().to_string();

            let protected = should_exclude(&path, target_root, exclude)
                || should_exclude(&path, target_root, delete_exclude);
            if !source_files.contains(&rel_str) && !protected {
                to_delete.push(path);
            }
        }
        Ok(())
    }
}
mod sync_logic {
    use super::*;

    /// Behavioural switches for a single sync run.
    ///
    /// The default value has every flag off and both pattern lists empty.
    #[derive(Debug, Clone, Default)]
    pub struct SyncOptions {
        /// Report what would happen without touching the target.
        pub dry_run: bool,
        /// Patterns excluded from scanning.
        pub excludes: Vec<String>,
        /// Compare by content hash instead of mtime/size.
        pub checksum: bool,
        /// Remove target files that do not exist in the source.
        pub delete_extra: bool,
        /// Patterns protecting target files from deletion.
        pub delete_excludes: Vec<String>,
    }

    /// Synchronises `params.source` into `params.target` and prints a report.
    ///
    /// Steps:
    /// 1. scan the source tree;
    /// 2. queue every file that `should_sync` selects;
    /// 3. if `delete_extra` is set, remove (or, in a dry run, list) target
    ///    files without a source counterpart;
    /// 4. copy the queued files (or, in a dry run, just list them);
    /// 5. print the report.
    ///
    /// When there is nothing to copy and the deletion step produced no
    /// result at all, the "nothing to sync" report is printed instead.
    ///
    /// # Errors
    ///
    /// Fails when the source cannot be scanned, when the deletion step fails,
    /// or when at least one file could not be copied.
    ///
    /// # Panics
    ///
    /// Panics if a scanned file is not located under `params.source`.
    pub async fn sync_directories(params: &SyncParameters) -> anyhow::Result<SyncReport> {
        let options = SyncOptions {
            dry_run: params.dry_run,
            excludes: params.excludes.clone(),
            checksum: params.checksum,
            delete_extra: params.delete_extra,
            delete_excludes: params.delete_excludes.clone(),
        };
        let mut report = SyncReport::default();

        let source_files = scan_directory(&params.source, &options.excludes, options.checksum)
            .map_err(|e| anyhow::anyhow!("Failed to scan source directory -> {}", e))?;

        // Work out which files need copying and how many bytes that is.
        let mut sync_queue = Vec::new();
        let mut total_sync_size: u64 = 0;
        for source_info in &source_files {
            let relative = source_info
                .path
                .strip_prefix(&params.source)
                .expect("File not under source root");
            let target_path = params.target.join(relative);
            let target_info = if target_path.exists() {
                FileInfo::from_path(&target_path, options.checksum).ok()
            } else {
                None
            };
            if should_sync(source_info, target_info.as_ref(), options.checksum) {
                sync_queue.push((source_info.clone(), target_path));
                total_sync_size += source_info.size;
            }
        }

        if options.delete_extra {
            let (deleted, would_delete, delete_errors) = delete_extra_files(
                &params.source,
                &params.target,
                options.dry_run,
                &options.excludes,
                &options.delete_excludes,
            )
            .await?;
            report.deleted = deleted;
            report.would_delete = would_delete;
            report.delete_errors = delete_errors;
        }

        // "Nothing to do" only holds when there is nothing to copy AND the
        // deletion step neither removed, planned, nor failed on anything.
        // Pending dry-run deletions and failed deletions must reach the
        // full report below.
        let no_deletion_activity = report.deleted.is_empty()
            && report.would_delete.is_empty()
            && report.delete_errors.is_empty();
        if sync_queue.is_empty() && no_deletion_activity {
            print_report(
                true,
                &report,
                options.dry_run,
                options.delete_extra,
                source_files.len(),
                total_sync_size,
                params.detail,
            );
            return Ok(report);
        }

        if options.dry_run {
            report
                .copied
                .extend(sync_queue.iter().map(|(source_info, _)| source_info.path.clone()));
        } else {
            let pb = create_progress_bar(total_sync_size);
            let mut processed_size: u64 = 0;
            for (source_info, target_path) in &sync_queue {
                match copy_file(&source_info.path, target_path, options.dry_run).await {
                    Ok(()) => {
                        report.copied.push(source_info.path.clone());
                        debug!(
                            source = %source_info.path.display(),
                            target = %target_path.display(),
                            "File copied"
                        );
                    }
                    Err(e) => {
                        warn!(
                            error = ?e,
                            source = %source_info.path.display(),
                            target = %target_path.display(),
                            "Failed to copy file"
                        );
                        report.errors.push((target_path.clone(), e.to_string()));
                    }
                }
                // Progress advances for failed files too, so the bar still completes.
                processed_size += source_info.size;
                pb.set_position(processed_size);
            }
            pb.finish_with_message("File sync completed");
        }

        if !report.errors.is_empty() {
            warn!(count = report.errors.len(), "Some files failed to copy");
            anyhow::bail!("Failed to copy {} files", report.errors.len());
        }

        print_report(
            false,
            &report,
            options.dry_run,
            options.delete_extra,
            source_files.len(),
            total_sync_size,
            params.detail,
        );
        Ok(report)
    }
}
mod watcher {
    use super::*;

    /// Watches `params.source` recursively and re-runs the sync after changes.
    ///
    /// Modify, create and remove events start a debounce window of `delay_ms`
    /// milliseconds. Each further event restarts the window, and once it
    /// elapses without new events `sync_directories` is run. The result of
    /// every run (copies, copy errors and all deletion results) is
    /// accumulated into the returned report. A failed run is recorded as an
    /// error against `params.source` and watching continues.
    ///
    /// Returns the accumulated report once the event channel is closed.
    ///
    /// # Errors
    ///
    /// Fails when the watcher cannot be created or the source directory
    /// cannot be watched.
    pub async fn watch_task(
        params: &SyncParameters,
        delay_ms: u64,
    ) -> anyhow::Result<SyncReport, SyncError> {
        let mut total_report = SyncReport::default();
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

        let mut watcher =
            recommended_watcher(move |res: std::result::Result<Event, notify::Error>| {
                match res {
                    Ok(event) => match event.kind {
                        EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) => {
                            // A send error means the receiver is gone; nothing to do then.
                            let _ = tx.send(event);
                        }
                        _ => {
                            debug!(event = ?event, "Ignored file system event");
                        }
                    },
                    Err(error) => {
                        error!("📁 File watch error: {}", error)
                    }
                }
            })
            .map_err(|e| anyhow::anyhow!("Failed to create file watcher: {}", e))?;

        watcher
            .watch(&params.source, RecursiveMode::Recursive)
            .map_err(|e| {
                anyhow::anyhow!(
                    "Failed to watch directory '{}': {}",
                    params.source.display(),
                    e
                )
            })?;

        info!(
            "Started watching: {} → {}",
            params.source.display(),
            params.target.display()
        );

        // Each iteration waits for a first event, debounces, then syncs once.
        while rx.recv().await.is_some() {
            debug!(
                "Change detected, starting debounce period of {}ms...",
                delay_ms
            );

            loop {
                match tokio::time::timeout(Duration::from_millis(delay_ms), rx.recv()).await {
                    Ok(Some(_)) => {
                        debug!("Another change detected, restarting debounce timer...");
                    }
                    Ok(None) => {
                        info!("Watcher channel closed during debounce.");
                        return Ok(total_report);
                    }
                    Err(_) => {
                        debug!("Debounce period ended with no further changes.");
                        break;
                    }
                }
            }

            debug!("📁 Detected stable changes → syncing...");
            match sync_directories(params).await {
                Ok(report) => {
                    debug!("✅ Sync completed successfully");
                    total_report.copied.extend(report.copied);
                    total_report.errors.extend(report.errors);
                    // Deletion results are part of the run as well.
                    total_report.deleted.extend(report.deleted);
                    total_report.would_delete.extend(report.would_delete);
                    total_report.delete_errors.extend(report.delete_errors);
                }
                Err(e) => {
                    error!(
                        error = ?e,
                        source = %params.source.display(),
                        target = %params.target.display(),
                        "Sync failed during watch"
                    );
                    total_report
                        .errors
                        .push((params.source.clone(), e.to_string()));
                }
            }
        }

        info!("Watcher channel closed, exiting...");
        Ok(total_report)
    }
}
mod report {
    use super::*;
    use std::fmt::Write;

    /// Outcome of a sync run.
    #[derive(Debug, Default)]
    pub struct SyncReport {
        /// Source paths that were copied (or, in a dry run, would be copied).
        pub copied: Vec<PathBuf>,
        /// Copy failures as `(path, error text)`.
        pub errors: Vec<(PathBuf, String)>,
        /// Target paths that were removed.
        pub deleted: Vec<PathBuf>,
        /// Target paths selected for removal.
        pub would_delete: Vec<PathBuf>,
        /// Removal failures as `(path, error text)`.
        pub delete_errors: Vec<(PathBuf, String)>,
    }

    /// Appends one bullet line per path.
    fn push_paths(out: &mut String, paths: &[PathBuf]) {
        for path in paths {
            writeln!(out, " - {}", path.display()).unwrap();
        }
    }

    /// Appends one bullet line per `(path, error)` pair.
    fn push_failures(out: &mut String, failures: &[(PathBuf, String)]) {
        for (path, err) in failures {
            writeln!(out, " - {}: {}", path.display(), err).unwrap();
        }
    }

    /// Renders `report` and emits it through the logger.
    ///
    /// * `is_latest` - when set, only a "nothing to sync" warning is logged.
    /// * `dry_run` - selects the dry-run wording and the `would_delete` list.
    /// * `delete_extra` - whether the deletion section is rendered at all.
    /// * `total_source_files` / `total_sync_size` - figures for the summary line.
    /// * `detail` - when set, individual paths and error messages are listed.
    pub fn print_report(
        is_latest: bool,
        report: &SyncReport,
        dry_run: bool,
        delete_extra: bool,
        total_source_files: usize,
        total_sync_size: u64,
        detail: bool,
    ) {
        if is_latest {
            warn!("未发现待同步的文件");
            return;
        }

        let (banner, pending) = if dry_run {
            ("试运行模式", "待")
        } else {
            ("同步成功!", "")
        };

        let mut output = String::new();

        // Summary line.
        writeln!(
            output,
            "{}\n源文件总数:{},{}同步文件数: {} ({})",
            banner,
            total_source_files.to_formatted_string(&Locale::en),
            pending,
            report.copied.len().to_formatted_string(&Locale::en),
            format_file_size(total_sync_size)
        )
        .unwrap();

        // Copied files.
        if detail && !report.copied.is_empty() {
            writeln!(output, "{}同步的文件:", pending).unwrap();
            push_paths(&mut output, &report.copied);
        }

        // Copy errors: the count is shown for real runs only, details whenever requested.
        let has_copy_errors = !report.errors.is_empty();
        if has_copy_errors && !dry_run {
            writeln!(
                output,
                "同步错误数: {}",
                report.errors.len().to_formatted_string(&Locale::en)
            )
            .unwrap();
        }
        if has_copy_errors && detail {
            writeln!(output, "同步错误详情:").unwrap();
            push_failures(&mut output, &report.errors);
        }

        // Deletion section.
        if delete_extra {
            if dry_run {
                if !report.would_delete.is_empty() {
                    writeln!(
                        output,
                        "待删除文件数: {}",
                        report.would_delete.len().to_formatted_string(&Locale::en)
                    )
                    .unwrap();
                    if detail {
                        writeln!(output, "待删除的文件:").unwrap();
                        push_paths(&mut output, &report.would_delete);
                    }
                }
            } else if !(report.deleted.is_empty() && report.delete_errors.is_empty()) {
                writeln!(
                    output,
                    "已删除文件数: {}",
                    report.deleted.len().to_formatted_string(&Locale::en)
                )
                .unwrap();
                if detail && !report.deleted.is_empty() {
                    writeln!(output, "已删除的文件:").unwrap();
                    push_paths(&mut output, &report.deleted);
                }
                if !report.delete_errors.is_empty() {
                    writeln!(
                        output,
                        "删除错误数: {}",
                        report.delete_errors.len().to_formatted_string(&Locale::en)
                    )
                    .unwrap();
                    if detail {
                        writeln!(output, "删除错误详情:").unwrap();
                        push_failures(&mut output, &report.delete_errors);
                    }
                }
            }
        }

        let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
        info!("[{}] {}", timestamp, output);
    }
}
use crate::sync::file_ops::compute_blake3_hash;
pub use file_ops::{copy_file, delete_extra_files};
pub use filter::{should_exclude, should_sync};
pub use report::{SyncReport, print_report};
pub use scanner::scan_directory;
pub use sync_logic::{SyncOptions, sync_directories};
pub use watcher::watch_task;