1use super::file_ops::{copy_file, delete_extra_files};
2use super::filter::should_sync;
3use super::report::{SyncReport, print_report};
4use super::scanner::scan_directory;
5use super::types::{FileInfo, SyncParameters};
6use crate::utils::create_progress_bar;
7use chrono::Utc;
8use futures::stream::{FuturesUnordered, StreamExt};
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicU64, Ordering};
13use tokio::sync::Semaphore;
14use tracing::{debug, warn};
15
/// Per-run switches consumed by `sync_directories`.
///
/// `Default` yields every flag `false` and every pattern list empty.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncOptions {
    /// When `true`, files that would be copied are only recorded in the report;
    /// no copy is performed.
    pub dry_run: bool,
    /// Exclusion patterns handed to the directory scanner and to the
    /// extra-file deletion pass.
    // NOTE(review): pattern syntax is defined by the scanner/filter helpers — confirm there.
    pub excludes: Vec<String>,
    /// Forwarded to the directory scanner and to `should_sync`.
    // NOTE(review): presumably enables content-checksum comparison — confirm in `should_sync`.
    pub checksum: bool,
    /// When `true`, the extra-file deletion pass (`delete_extra_files`) is run
    /// against the target.
    pub delete_extra: bool,
    /// Additional patterns handed only to the extra-file deletion pass.
    pub delete_excludes: Vec<String>,
}
39
40pub async fn sync_directories(params: &SyncParameters) -> anyhow::Result<SyncReport> {
58 let options = SyncOptions {
59 dry_run: params.dry_run,
60 excludes: params.excludes.clone(),
61 checksum: params.checksum,
62 delete_extra: params.delete_extra,
63 delete_excludes: params.delete_excludes.clone(),
64 };
65
66
67 let mut report = SyncReport::default(); println!("当前时间戳1: {}", Utc::now().timestamp());
69
70 let source_files = scan_directory(¶ms.source, &options.excludes, options.checksum)
72 .map_err(|e| anyhow::anyhow!("Failed to scan source directory -> {}", e))?;
73 println!("当前时间戳2: {}", Utc::now().timestamp());
74
75 let target_cache: HashMap<String, FileInfo> = if params.target.exists() {
77 match scan_directory(¶ms.target, &options.excludes, options.checksum) {
78 Ok(target_files) => target_files
79 .into_iter()
80 .filter_map(|info| {
81 let relative = info
82 .path
83 .strip_prefix(¶ms.target)
84 .map(|p| p.to_string_lossy().to_string())
85 .ok();
86 relative.map(|rel| (rel, info))
87 })
88 .collect(),
89 Err(e) => {
90 warn!(error = ?e, "Failed to scan target directory, proceeding with empty cache");
91 HashMap::new()
92 }
93 }
94 } else {
95 debug!("Target directory does not exist, skipping target scan");
96 HashMap::new()
97 };
98
99 let mut sync_queue = Vec::new();
101 let mut total_sync_size: u64 = 0;
102
103 for source_info in &source_files {
104 let relative = source_info
105 .path
106 .strip_prefix(¶ms.source)
107 .expect("File not under source root");
108
109 let relative_str = relative.to_string_lossy().to_string();
110 let target_path = params.target.join(relative);
111
112 let target_info = target_cache.get(&relative_str);
113
114 if should_sync(source_info, target_info, options.checksum) {
116 sync_queue.push((source_info.clone(), target_path));
117 total_sync_size += source_info.size;
118 }
119 }
120
121 if options.delete_extra {
122 if !params.target.exists() {
123 std::fs::create_dir_all(¶ms.target)
124 .map_err(|e| anyhow::anyhow!("Failed to create target directory for deletion: {}", e))?;
125 }
126
127 let (deleted, would_delete, delete_errors) = delete_extra_files(
128 ¶ms.source,
129 ¶ms.target,
130 options.dry_run,
131 &options.excludes,
132 &options.delete_excludes,
133 )
134 .await?;
135
136 report.deleted = deleted;
137 report.would_delete = would_delete;
138 report.delete_errors = delete_errors;
139 }
140
141 if sync_queue.is_empty()
143 && (!options.delete_extra || report.would_delete.is_empty() || report.deleted.is_empty())
144 {
145 print_report(
147 true,
148 &report,
149 options.dry_run,
150 options.delete_extra,
151 source_files.len(),
152 total_sync_size,
153 params.detail,
154 );
155 return Ok(report);
156 }
157
158 if options.dry_run {
162 for (source_info, _target_path) in &sync_queue {
164 report.copied.push((*source_info.path).to_path_buf());
166 }
167 } else {
168 let pb = create_progress_bar(total_sync_size);
170
171 let processed_bytes = Arc::new(AtomicU64::new(0));
173
174 let semaphore = Arc::new(Semaphore::new(8));
176
177 use tokio::sync::Notify;
179 let notify = Arc::new(Notify::new());
180 let notify_clone = notify.clone();
181
182 let pb_clone_for_refresh = pb.clone();
184 let processed_bytes_for_refresh = processed_bytes.clone();
185 let refresh_handle = tokio::spawn(async move {
186 let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
187 loop {
188 interval.tick().await;
189 let pos = processed_bytes_for_refresh.load(Ordering::Relaxed);
190 if pos >= total_sync_size {
191 break;
192 }
193 pb_clone_for_refresh.set_position(pos);
194 }
195 pb_clone_for_refresh.set_position(total_sync_size); });
197
198 let mut tasks = FuturesUnordered::new();
200
201 for (source_info, target_path) in &sync_queue {
202 let source_path = source_info.path.clone();
203 let target_path_clone = target_path.clone();
204 let size = source_info.size;
205 let pb_clone = pb.clone();
206 let processed_bytes_clone = processed_bytes.clone();
207 let semaphore_clone = semaphore.clone();
208 let source_display = source_path.display().to_string();
209 let target_display = target_path_clone.display().to_string();
210
211 let task = tokio::spawn(async move {
212 let _permit = semaphore_clone.acquire().await.unwrap();
214
215 let progress_cb = |bytes: u64| {
216 let _ = processed_bytes_clone.fetch_add(bytes, Ordering::Relaxed);
217 };
218
219 let result = copy_file(&source_path, &target_path_clone, false, Some(&processed_bytes_clone)).await;
220
221 let current = processed_bytes_clone.fetch_add(size, Ordering::Relaxed) + size;
223 pb_clone.set_position(current);
224
225 (
226 result,
227 source_path,
228 target_path_clone,
229 source_display,
230 target_display,
231 )
232 });
233
234 tasks.push(task);
235 }
236
237 while let Some(result) = tasks.next().await {
239 match result {
240 Ok((Ok(()), source_path, _target_path, source_display, target_display)) => {
241 report.copied.push((source_path).to_path_buf());
242 debug!(
244 source = %source_display,
245 target = %target_display,
246 "File copied"
247 );
248 }
249 Ok((Err(e), _source_path, target_path, source_display, target_display)) => {
250 warn!(
251 error = ?e,
252 source = %source_display,
253 target = %target_display,
254 "Failed to copy file"
255 );
256 report.errors.push((target_path, e.to_string()));
257 }
258 Err(join_error) => {
259 warn!(error = ?join_error, "Copy task panicked");
261 report.errors.push((PathBuf::new(), join_error.to_string()));
262 }
263 }
264 }
265 notify.notify_waiters();
266
267 let _ = refresh_handle.await;
269
270 pb.finish_with_message("File sync completed");
271 }
272
273 if report.errors.len() > 0 {
274 warn!(count = report.errors.len(), "Some files failed to copy");
275 anyhow::bail!("Failed to copy {} files", report.errors.len());
276 }
277
278 print_report(
280 false,
281 &report,
282 options.dry_run,
283 options.delete_extra, source_files.len(),
285 total_sync_size,
286 params.detail,
287 );
288
289 Ok(report)
290}