1use crate::infra::error::SyncError;
2use crate::utils::{create_progress_bar, format_file_size};
3use crate::{cli, config};
4use notify::{Event, EventKind, RecursiveMode, Watcher, recommended_watcher};
5use num_format::{Locale, ToFormattedString};
6use std::fs;
7use std::path::{Path, PathBuf};
8use std::time::{Duration, SystemTime};
9use tracing::{debug, error, info, warn};
10use chrono::Local;
11
/// Snapshot of one filesystem entry, used to compare the source tree with the target tree.
#[derive(Clone, Debug)]
pub struct FileInfo {
    /// Location of the entry on disk.
    pub path: PathBuf,
    /// Last-modification time reported by the filesystem metadata.
    pub mtime: SystemTime,
    /// Length in bytes reported by the filesystem metadata.
    pub size: u64,
    /// BLAKE3 digest of the contents, or `None` when no digest was computed.
    pub blake3_hash: Option<[u8; 32]>,
}
28
29impl FileInfo {
30 pub fn from_path(path: &Path, compute_hash: bool) -> std::io::Result<Self> {
32 let metadata = fs::metadata(path)?;
33 let blake3_hash = if compute_hash && metadata.is_file() {
34 Some(compute_blake3_hash(path)?)
35 } else {
36 None
37 };
38 Ok(FileInfo {
39 path: path.to_path_buf(),
40 mtime: metadata.modified()?,
41 size: metadata.len(),
42 blake3_hash,
43 })
44 }
45
46 pub fn is_newer_than(&self, target: &Self) -> bool {
48 self.mtime > target.mtime || self.size != target.size
49 }
50
51 pub fn content_eq(&self, other: &Self) -> bool {
53 self.size == other.size && self.blake3_hash == other.blake3_hash
54 }
55}
56
/// Everything one synchronisation pass needs to know, regardless of whether it
/// came from the command line or from a configured task.
#[derive(Clone, Debug)]
pub struct SyncParameters {
    /// Directory that is read from.
    pub source: PathBuf,
    /// Directory that is written to.
    pub target: PathBuf,
    /// When set, report what would happen without touching the target.
    pub dry_run: bool,
    /// When set, compare files by content digest instead of mtime/size.
    pub checksum: bool,
    /// Patterns for entries that are left out of the scan.
    pub excludes: Vec<String>,
    /// When set, remove target files that have no counterpart in the source.
    pub delete_extra: bool,
    /// Patterns for target files that must survive the extra-file deletion.
    pub delete_excludes: Vec<String>,
    /// When set, the printed report lists individual paths.
    pub detail: bool,
}
77
78impl From<&cli::Command> for SyncParameters {
80 fn from(cmd: &cli::Command) -> Self {
81 match cmd {
82 cli::Command::Sync {
83 source,
84 target,
85 dry_run,
86 checksum,
87 delete,
88 exclude,
89 delete_exclude,
90 detail,
91 } => Self {
92 source: source.clone(),
93 target: target.clone(),
94 dry_run: *dry_run,
95 checksum: *checksum,
96 excludes: exclude.clone(),
97 delete_extra: *delete,
98 delete_excludes: delete_exclude.clone(),
99 detail: *detail,
100 },
101 cli::Command::Run {
102 name: _,
103 config: _,
104 dry_run,
105 checksum,
106 detail,
107 } => {
108 Self {
110 source: PathBuf::new(),
111 target: PathBuf::new(),
112 dry_run: *dry_run,
113 checksum: *checksum,
114 excludes: Vec::new(),
115 delete_extra: false,
116 delete_excludes: Vec::new(),
117 detail: *detail,
118 }
119 }
120 cli::Command::Watch {
121 name: _,
122 config: _,
123 delay: _,
124 checksum,
125 dry_run,
126 detail,
127 } => Self {
128 source: PathBuf::new(),
129 target: PathBuf::new(),
130 dry_run: *dry_run,
131 checksum: *checksum,
132 excludes: Vec::new(),
133 delete_extra: false,
134 delete_excludes: Vec::new(),
135 detail: *detail,
136 }
137 }
138 }
139}
140
141impl From<&config::SyncTask> for SyncParameters {
143 fn from(task: &config::SyncTask) -> Self {
144 Self {
145 source: task.source.clone(),
146 target: task.target.clone(),
147 dry_run: false, checksum: false, excludes: task.exclude.clone(),
150 delete_extra: task.delete_extra,
151 delete_excludes: task.delete_extra_exclude.clone(),
152 detail: false,
153 }
154 }
155}
156
157mod scanner {
163 use super::*;
164 pub fn scan_directory<P: AsRef<Path>>(
176 root: P,
177 exclude_patterns: &[String],
178 compute_hash: bool,
179 ) -> Result<Vec<FileInfo>, SyncError> {
180 let mut files = Vec::new();
181 let root = root.as_ref();
182
183 if !root.exists() {
185 return Err(SyncError::SourceNotFound(root.to_path_buf()));
186 }
187
188 let entries = fs::read_dir(root).map_err(|e| {
190 debug!(
191 error = ?e,
192 path = %root.display(),
193 "Failed to read directory"
194 );
195 SyncError::IoError(e)
196 })?;
197
198 for entry in entries {
200 let entry = match entry {
201 Ok(entry) => entry,
202 Err(e) => {
203 warn!(
204 error = ?e,
205 dir = %root.display(),
206 "Failed to read directory entry"
207 );
208 continue;
209 }
210 };
211
212 let path = entry.path();
213
214 if should_exclude(&path, root, exclude_patterns) {
216 debug!(path = %path.display(), "Skipped (excluded)");
217 continue;
218 }
219
220 if path.is_dir() {
221 match scan_directory(&path, exclude_patterns, compute_hash) {
222 Ok(mut sub_files) => {
223 files.append(&mut sub_files);
224 }
225 Err(e) => {
226 return Err(e);
227 }
228 }
229 } else {
230 match FileInfo::from_path(&path, compute_hash) {
231 Ok(info) => files.push(info),
232 Err(e) => {
233 warn!(
234 error = ?e,
235 path = %path.display(),
236 "Failed to read file metadata"
237 );
238 }
239 }
240 }
241 }
242
243 Ok(files)
244 }
245}
246
247mod filter {
252 use super::*;
253
254 pub fn should_exclude(path: &Path, root: &Path, exclude_patterns: &[String]) -> bool {
269 let relative = match path.strip_prefix(root) {
272 Ok(rel) => rel,
273 Err(_) => return false, };
275
276 let relative_str = relative.to_string_lossy();
278
279 for pattern in exclude_patterns {
281 if pattern.starts_with('/') {
283 if relative_str.starts_with(&pattern[1..]) {
285 return true;
286 }
287 } else if pattern.ends_with('/') {
288 if relative_str.starts_with(&*pattern)
290 || relative_str.contains(&format!("/{}", pattern))
291 {
292 return true;
293 }
294 } else {
295 let regex_pattern = pattern.replace('*', ".*");
297 if let Ok(regex) = regex::Regex::new(&format!("^{}$", regex_pattern)) {
298 if regex.is_match(&relative_str) {
299 return true;
300 }
301 }
302 }
303 }
304
305 if let Some(name) = relative.file_name().and_then(|s| s.to_str()) {
307 matches!(
308 name,
309 ".DS_Store" | ".fseventsd" | ".Trashes" | ".Spotlight-V100" | ".TemporaryItems"
310 ) || name.starts_with("._") } else {
312 false
313 }
314 }
315
316 pub fn should_sync(
328 source_info: &FileInfo,
329 target_info: Option<&FileInfo>,
330 checksum: bool,
331 ) -> bool {
332 match target_info {
333 None => true, Some(target) => {
335 if checksum {
336 !source_info.content_eq(target)
338 } else {
339 source_info.is_newer_than(target)
341 }
342 }
343 }
344 }
345}
346
347mod file_ops {
353 use super::*;
354 pub async fn copy_file(source: &Path, target: &Path, dry_run: bool) -> std::io::Result<()> {
367 if dry_run {
368 return Ok(());
369 }
370
371 if let Some(parent) = target.parent() {
373 tokio::fs::create_dir_all(parent).await?;
374 }
375
376 tokio::fs::copy(source, target).await?;
378 Ok(())
379 }
380
381 pub fn compute_blake3_hash(path: &Path) -> std::io::Result<[u8; 32]> {
382 let mut file = fs::File::open(path)?;
383 let mut hasher = blake3::Hasher::new();
384 std::io::copy(&mut file, &mut hasher)?;
385 Ok(hasher.finalize().into())
386 }
387
388 pub async fn delete_extra_files(
397 source: &PathBuf,
398 target: &PathBuf,
399 dry_run: bool,
400 exclude: &[String],
401 delete_exclude: &[String],
402 ) -> anyhow::Result<(Vec<PathBuf>, Vec<PathBuf>, Vec<(PathBuf, String)>)> {
403 use std::collections::HashSet;
404
405 let source_files: HashSet<String> = scan_directory(source, exclude, false)?
407 .into_iter()
408 .filter_map(|info| {
409 info.path
410 .strip_prefix(source)
411 .ok()
412 .map(|rel| rel.to_string_lossy().to_string())
413 })
414 .collect();
415
416 let mut to_delete = Vec::new();
418 scan_target_for_deletion(
419 target,
420 target,
421 &source,
422 &source_files,
423 exclude,
424 delete_exclude,
425 &mut to_delete,
426 )
427 .await?;
428
429 let mut deleted = Vec::new();
431 let mut would_delete = Vec::new();
432 let mut delete_errors = Vec::new();
433
434 for path in &to_delete {
436 if dry_run {
437 would_delete.push(path.clone());
438 } else {
439 match tokio::fs::remove_file(path).await {
440 Ok(()) => {
441 deleted.push(path.clone());
442 would_delete.push(path.clone());
443 }
444 Err(e) => {
445 delete_errors.push((path.clone(), e.to_string()));
446 }
447 }
448 }
449 }
450
451 Ok((deleted, would_delete, delete_errors))
452 }
453
454 pub async fn scan_target_for_deletion(
465 current: &PathBuf,
466 target_root: &PathBuf,
467 source_root: &PathBuf,
468 source_files: &std::collections::HashSet<String>,
469 exclude: &[String],
470 delete_exclude: &[String],
471 to_delete: &mut Vec<PathBuf>,
472 ) -> std::io::Result<()> {
473 let mut dir = tokio::fs::read_dir(current).await?;
474
475 while let Some(entry) = dir.next_entry().await? {
476 let path = entry.path();
477
478 if path.is_dir() {
479 let future = scan_target_for_deletion(
481 &path,
482 target_root,
483 source_root,
484 source_files,
485 exclude,
486 delete_exclude,
487 to_delete,
488 );
489 Box::pin(future).await?;
490 } else {
491 if let Ok(rel_path) = path.strip_prefix(target_root) {
492 let rel_str = rel_path.to_string_lossy().to_string();
493 if !source_files.contains(&rel_str)
494 && !should_exclude(&path, source_root, exclude)
495 && !should_exclude(&path, target_root, delete_exclude)
496 {
498 to_delete.push(path);
499 }
500 }
501 }
502 }
503
504 Ok(())
505 }
506}
507
508mod sync_logic {
513 use super::*;
514
515 pub struct SyncOptions {
516 pub dry_run: bool,
517 pub excludes: Vec<String>,
518 pub checksum: bool,
519 pub delete_extra: bool,
520 pub delete_excludes: Vec<String>,
521 }
522
523 impl Default for SyncOptions {
524 fn default() -> Self {
525 Self {
526 dry_run: false,
527 excludes: vec![],
528 checksum: false,
529 delete_extra: false,
530 delete_excludes: vec![],
531 }
532 }
533 }
534
535 pub async fn sync_directories(params: &SyncParameters) -> anyhow::Result<SyncReport> {
555 let options = SyncOptions {
556 dry_run: params.dry_run,
557 excludes: params.excludes.clone(),
558 checksum: params.checksum,
559 delete_extra: params.delete_extra,
560 delete_excludes: params.delete_excludes.clone(),
561 };
562
563 let mut report = SyncReport::default(); let source_files = scan_directory(¶ms.source, &options.excludes, options.checksum)
567 .map_err(|e| anyhow::anyhow!("Failed to scan source directory -> {}", e))?;
568
569 let mut sync_queue = Vec::new();
571 let mut total_sync_size: u64 = 0;
572
573 for source_info in &source_files {
574 let relative = source_info
575 .path
576 .strip_prefix(¶ms.source)
577 .expect("File not under source root");
578 let target_path = params.target.join(relative);
579 let target_info = if target_path.exists() {
580 FileInfo::from_path(&target_path, options.checksum).ok()
581 } else {
582 None
583 };
584
585 if should_sync(source_info, target_info.as_ref(), options.checksum) {
587 sync_queue.push((source_info.clone(), target_path));
588 total_sync_size += source_info.size;
589 }
590 }
591
592 if options.delete_extra {
593 let (deleted, would_delete, delete_errors) = delete_extra_files(
594 ¶ms.source,
595 ¶ms.target,
596 options.dry_run,
597 &options.excludes,
598 &options.delete_excludes,
599 )
600 .await?;
601
602 report.deleted = deleted;
603 report.would_delete = would_delete;
604 report.delete_errors = delete_errors;
605 }
606
607 if sync_queue.is_empty()
609 && (!options.delete_extra
610 || report.would_delete.is_empty()
611 || report.deleted.is_empty())
612 {
613 print_report(
615 true,
616 &report,
617 options.dry_run,
618 options.delete_extra,
619 source_files.len(),
620 total_sync_size,
621 params.detail,
622 );
623 return Ok(report);
624 }
625
626 let mut processed_size = 0;
628
629 if options.dry_run {
630 for (source_info, _target_path) in &sync_queue {
632 report.copied.push(source_info.path.clone());
633 }
634 } else {
635 let pb = create_progress_bar(total_sync_size);
637
638 for (source_info, target_path) in &sync_queue {
639 match copy_file(&source_info.path, target_path, options.dry_run).await {
640 Ok(()) => {
641 report.copied.push(source_info.path.clone());
642 processed_size += source_info.size;
643 pb.set_position(processed_size);
644 debug!(
645 source = %source_info.path.display(),
646 target = %target_path.display(),
647 "File copied"
648 );
649 }
650 Err(e) => {
651 warn!(
652 error = ?e,
653 source = %source_info.path.display(),
654 target = %target_path.display(),
655 "Failed to copy file"
656 );
657 report.errors.push((target_path.clone(), e.to_string()));
659 processed_size += source_info.size;
660 pb.set_position(processed_size);
661 }
662 }
663 }
664
665 pb.finish_with_message("File sync completed");
666 }
667
668 if report.errors.len() > 0 {
669 warn!(count = report.errors.len(), "Some files failed to copy");
670 anyhow::bail!("Failed to copy {} files", report.errors.len());
671 }
672
673 print_report(
675 false,
676 &report,
677 options.dry_run,
678 options.delete_extra, source_files.len(),
680 total_sync_size,
681 params.detail,
682 );
683
684 Ok(report)
685 }
686}
687
688mod watcher {
694 use super::*;
695
696 pub async fn watch_task(
708 params: &SyncParameters,
709 delay_ms: u64,
710 ) -> anyhow::Result<SyncReport, SyncError> {
711 let mut total_report = SyncReport::default(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
726
727 let mut watcher =
735 recommended_watcher(move |res: std::result::Result<Event, notify::Error>| {
736 match res {
737 Ok(event) => {
738 match event.kind {
741 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) => {
743 let _ = tx.send(event);
744 }
745 _ => {
746 debug!(event = ?event, "Ignored file system event");
747 }
748 }
749 }
750 Err(error) => {
751 error!("📁 File watch error: {}", error)
753 }
754 }
755 })
756 .map_err(|e| anyhow::anyhow!("Failed to create file watcher: {}", e))?;
757
758 watcher
760 .watch(¶ms.source, RecursiveMode::Recursive)
761 .map_err(|e| {
762 anyhow::anyhow!(
763 "Failed to watch directory '{}': {}",
764 params.source.display(),
765 e
766 )
767 })?;
768
769 info!(
770 "Started watching: {} → {}",
771 params.source.display(),
772 params.target.display()
773 );
774
775 loop {
777 if rx.recv().await.is_none() {
782 info!("Watcher channel closed, exiting...");
783 break; }
785
786 debug!(
787 "Change detected, starting debounce period of {}ms...",
788 delay_ms
789 );
790
791 loop {
794 match tokio::time::timeout(Duration::from_millis(delay_ms), rx.recv()).await {
797 Ok(Some(_)) => {
798 debug!("Another change detected, restarting debounce timer...");
800 continue; }
802 Ok(None) => {
803 info!("Watcher channel closed during debounce.");
805 return Ok(total_report); }
807 Err(_) => {
808 debug!("Debounce period ended with no further changes.");
811 break; }
813 }
814 }
815 debug!("📁 Detected stable changes → syncing...");
819 match sync_directories(¶ms).await {
820 Ok(report) => {
821 debug!("✅ Sync completed successfully");
822 total_report.copied.extend(report.copied);
823 total_report.errors.extend(report.errors);
824 }
825 Err(e) => {
826 error!(
827 error = ?e,
828 source = %params.source.display(),
829 target = %params.target.display(),
830 "Sync failed during watch"
831 );
832 total_report
833 .errors
834 .push((params.source.clone(), e.to_string()));
835 }
836 }
837
838 }
840
841 Ok(total_report)
842 }
843}
844
845mod report {
850 use super::*;
851 use std::fmt::Write;
852
853 #[derive(Debug, Default)]
855 pub struct SyncReport {
856 pub copied: Vec<PathBuf>, pub errors: Vec<(PathBuf, String)>, pub deleted: Vec<PathBuf>, pub would_delete: Vec<PathBuf>, pub delete_errors: Vec<(PathBuf, String)>, }
862
863 pub fn print_report(
865 is_latest: bool,
866 report: &SyncReport,
867 dry_run: bool,
868 delete_extra: bool,
869 total_source_files: usize,
870 total_sync_size: u64,
871 detail: bool,
872 ) {
873 if is_latest {
874 warn!("未发现待同步的文件");
875 return;
876 }
877 let mut output = String::new();
878
879 writeln!(
881 output,
882 "{}\n源文件总数:{},{}同步文件数: {} ({})",
883 if dry_run {
884 "试运行模式"
885 } else {
886 "同步成功!"
887 },
888 total_source_files.to_formatted_string(&Locale::en),
889 if dry_run { "待" } else { "" },
890 report.copied.len().to_formatted_string(&Locale::en),
891 format_file_size(total_sync_size)
892 )
893 .unwrap();
894
895 if detail && !report.copied.is_empty() {
896 writeln!(output, "{}同步的文件:", if dry_run { "待" } else { "" }).unwrap();
897 for path in &report.copied {
898 writeln!(output, " - {}", path.display()).unwrap();
899 }
900 }
901
902 if !dry_run && !report.errors.is_empty() {
904 writeln!(
905 output,
906 "同步错误数: {}",
907 report.errors.len().to_formatted_string(&Locale::en)
908 )
909 .unwrap();
910 }
911
912 if detail && !report.errors.is_empty() {
913 writeln!(output, "同步错误详情:").unwrap();
914 for (path, err) in &report.errors {
915 writeln!(output, " - {}: {}", path.display(), err).unwrap();
916 }
917 }
918
919 if delete_extra {
921 let has_delete_data = if dry_run {
922 !report.would_delete.is_empty() } else {
924 !report.deleted.is_empty() || !report.delete_errors.is_empty() };
926
927 if has_delete_data {
928 if dry_run {
929 writeln!(
930 output,
931 "待删除文件数: {}",
932 report.would_delete.len().to_formatted_string(&Locale::en)
933 )
934 .unwrap();
935 if detail && !report.would_delete.is_empty() {
936 writeln!(output, "待删除的文件:").unwrap();
937 for path in &report.would_delete {
938 writeln!(output, " - {}", path.display()).unwrap();
939 }
940 }
941 } else {
942 writeln!(
943 output,
944 "已删除文件数: {}",
945 report.deleted.len().to_formatted_string(&Locale::en)
946 )
947 .unwrap();
948 if detail && !report.deleted.is_empty() {
949 writeln!(output, "已删除的文件:").unwrap();
950 for path in &report.deleted {
951 writeln!(output, " - {}", path.display()).unwrap();
952 }
953 }
954
955 if !report.delete_errors.is_empty() {
956 writeln!(
957 output,
958 "删除错误数: {}",
959 report.delete_errors.len().to_formatted_string(&Locale::en)
960 )
961 .unwrap();
962 if detail {
963 writeln!(output, "删除错误详情:").unwrap();
964 for (path, err) in &report.delete_errors {
965 writeln!(output, " - {}: {}", path.display(), err).unwrap();
966 }
967 }
968 }
969 }
970 }
971 }
972
973 let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
974 info!("[{}] {}", timestamp, output);
975 }
976}
977
978use crate::sync::file_ops::compute_blake3_hash;
983pub use file_ops::{copy_file, delete_extra_files};
984pub use filter::{should_exclude, should_sync};
985pub use report::{SyncReport, print_report};
986pub use scanner::scan_directory;
987pub use sync_logic::{SyncOptions, sync_directories};
988pub use watcher::watch_task;