Skip to main content

syncbox/sync/
sync_logic.rs

1use super::file_ops::{copy_file, delete_extra_files};
2use super::filter::should_sync;
3use super::report::{SyncReport, print_report};
4use super::scanner::scan_directory;
5use super::types::{FileInfo, SyncParameters};
6use crate::utils::create_progress_bar;
7use chrono::Utc;
8use futures::stream::{FuturesUnordered, StreamExt};
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicU64, Ordering};
13use tokio::sync::Semaphore;
14use tracing::{debug, warn};
15
16// ==============================================
17// 模块 4:同步逻辑(SyncLogic)
18// ==============================================
19
20pub struct SyncOptions {
21    pub dry_run: bool,
22    pub excludes: Vec<String>,
23    pub checksum: bool,
24    pub delete_extra: bool,
25    pub delete_excludes: Vec<String>,
26}
27
28impl Default for SyncOptions {
29    fn default() -> Self {
30        Self {
31            dry_run: false,
32            excludes: vec![],
33            checksum: false,
34            delete_extra: false,
35            delete_excludes: vec![],
36        }
37    }
38}
39
40/// 执行一次完整的目录同步操作。
41///
42/// 包括扫描源目录、比对目标文件、复制差异文件、可选删除多余文件。
43///
44/// # 参数
45/// * `params` - 同步参数结构体,包含源/目标路径、dry-run、checksum、排除规则等。
46///
47/// # 返回
48/// * `Ok(SyncReport)` - 同步操作报告,包含成功、失败、删除等统计信息。
49/// * `Err(anyhow::Error)` - 扫描、复制或删除过程中发生致命错误。
50///
51/// # 流程
52/// 1. 扫描源目录。
53/// 2. 构建同步队列(需复制的文件)。
54/// 3.(可选)删除目标端多余文件。
55/// 4. 执行文件复制(带进度条)。
56/// 5. 生成并打印报告。
57pub async fn sync_directories(params: &SyncParameters) -> anyhow::Result<SyncReport> {
58    let options = SyncOptions {
59        dry_run: params.dry_run,
60        excludes: params.excludes.clone(),
61        checksum: params.checksum,
62        delete_extra: params.delete_extra,
63        delete_excludes: params.delete_excludes.clone(),
64    };
65
66
67    let mut report = SyncReport::default(); // 初始化报告
68    println!("当前时间戳1: {}", Utc::now().timestamp());
69
70    // 1. 扫描源目录获取所有文件
71    let source_files = scan_directory(&params.source, &options.excludes, options.checksum)
72        .map_err(|e| anyhow::anyhow!("Failed to scan source directory -> {}", e))?;
73    println!("当前时间戳2: {}", Utc::now().timestamp());
74
75    // 2.预扫描目标目录,构建缓存
76    let target_cache: HashMap<String, FileInfo> = if params.target.exists() {
77        match scan_directory(&params.target, &options.excludes, options.checksum) {
78            Ok(target_files) => target_files
79                .into_iter()
80                .filter_map(|info| {
81                    let relative = info
82                        .path
83                        .strip_prefix(&params.target)
84                        .map(|p| p.to_string_lossy().to_string())
85                        .ok();
86                    relative.map(|rel| (rel, info))
87                })
88                .collect(),
89            Err(e) => {
90                warn!(error = ?e, "Failed to scan target directory, proceeding with empty cache");
91                HashMap::new()
92            }
93        }
94    } else {
95        debug!("Target directory does not exist, skipping target scan");
96        HashMap::new()
97    };
98
99    // 2. 预扫描:筛选出需要同步的文件,并计算总大小
100    let mut sync_queue = Vec::new();
101    let mut total_sync_size: u64 = 0;
102
103    for source_info in &source_files {
104        let relative = source_info
105            .path
106            .strip_prefix(&params.source)
107            .expect("File not under source root");
108
109        let relative_str = relative.to_string_lossy().to_string();
110        let target_path = params.target.join(relative);
111
112        let target_info = target_cache.get(&relative_str);
113
114        // 判断是否需要同步,只将需要同步的文件加入队列
115        if should_sync(source_info, target_info, options.checksum) {
116            sync_queue.push((source_info.clone(), target_path));
117            total_sync_size += source_info.size;
118        }
119    }
120
121    if options.delete_extra {
122        if !params.target.exists() {
123            std::fs::create_dir_all(&params.target)
124                .map_err(|e| anyhow::anyhow!("Failed to create target directory for deletion: {}", e))?;
125        }
126
127        let (deleted, would_delete, delete_errors) = delete_extra_files(
128            &params.source,
129            &params.target,
130            options.dry_run,
131            &options.excludes,
132            &options.delete_excludes,
133        )
134        .await?;
135
136        report.deleted = deleted;
137        report.would_delete = would_delete;
138        report.delete_errors = delete_errors;
139    }
140
141    // 检查是否有需要同步的文件
142    if sync_queue.is_empty()
143        && (!options.delete_extra || report.would_delete.is_empty() || report.deleted.is_empty())
144    {
145        // 没有文件需要同步,直接返回
146        print_report(
147            true,
148            &report,
149            options.dry_run,
150            options.delete_extra,
151            source_files.len(),
152            total_sync_size,
153            params.detail,
154        );
155        return Ok(report);
156    }
157
158    // 4. 处理同步队列
159    // let mut processed_size = 0;
160
161    if options.dry_run {
162        // Dry-run 模式:列出所有将被同步的文件
163        for (source_info, _target_path) in &sync_queue {
164            // report.copied.push(source_info.path.clone());
165            report.copied.push((*source_info.path).to_path_buf());
166        }
167    } else {
168        // 正常模式:初始化进度条
169        let pb = create_progress_bar(total_sync_size);
170
171        // 原子计数器,用于并发更新进度
172        let processed_bytes = Arc::new(AtomicU64::new(0));
173
174        // 控制最大并发数(可根据系统调整)
175        let semaphore = Arc::new(Semaphore::new(8));
176
177        // 👇 新增:用于通知进度刷新任务立即结束
178        use tokio::sync::Notify;
179        let notify = Arc::new(Notify::new());
180        let notify_clone = notify.clone();
181
182        // 👇 新增:启动进度刷新任务
183        let pb_clone_for_refresh = pb.clone();
184        let processed_bytes_for_refresh = processed_bytes.clone();
185        let refresh_handle = tokio::spawn(async move {
186            let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
187            loop {
188                interval.tick().await;
189                let pos = processed_bytes_for_refresh.load(Ordering::Relaxed);
190                if pos >= total_sync_size {
191                    break;
192                }
193                pb_clone_for_refresh.set_position(pos);
194            }
195            pb_clone_for_refresh.set_position(total_sync_size); // 确保最终对齐
196        });
197
198        // 创建异步任务集合
199        let mut tasks = FuturesUnordered::new();
200
201        for (source_info, target_path) in &sync_queue {
202            let source_path = source_info.path.clone();
203            let target_path_clone = target_path.clone();
204            let size = source_info.size;
205            let pb_clone = pb.clone();
206            let processed_bytes_clone = processed_bytes.clone();
207            let semaphore_clone = semaphore.clone();
208            let source_display = source_path.display().to_string();
209            let target_display = target_path_clone.display().to_string();
210
211            let task = tokio::spawn(async move {
212                // 获取信号量许可(控制并发)
213                let _permit = semaphore_clone.acquire().await.unwrap();
214
215                let progress_cb = |bytes: u64| {
216                    let _ = processed_bytes_clone.fetch_add(bytes, Ordering::Relaxed);
217                };
218
219                let result = copy_file(&source_path, &target_path_clone, false, Some(&processed_bytes_clone)).await;
220
221                // 无论成功失败,都更新进度
222                let current = processed_bytes_clone.fetch_add(size, Ordering::Relaxed) + size;
223                pb_clone.set_position(current);
224
225                (
226                    result,
227                    source_path,
228                    target_path_clone,
229                    source_display,
230                    target_display,
231                )
232            });
233
234            tasks.push(task);
235        }
236
237        // 等待所有任务完成
238        while let Some(result) = tasks.next().await {
239            match result {
240                Ok((Ok(()), source_path, _target_path, source_display, target_display)) => {
241                    report.copied.push((source_path).to_path_buf());
242                // report.copied.push(source_path);
243                    debug!(
244                        source = %source_display,
245                        target = %target_display,
246                        "File copied"
247                    );
248                }
249                Ok((Err(e), _source_path, target_path, source_display, target_display)) => {
250                    warn!(
251                        error = ?e,
252                        source = %source_display,
253                        target = %target_display,
254                        "Failed to copy file"
255                    );
256                    report.errors.push((target_path, e.to_string()));
257                }
258                Err(join_error) => {
259                    // 任务 panic(理论上不应发生)
260                    warn!(error = ?join_error, "Copy task panicked");
261                    report.errors.push((PathBuf::new(), join_error.to_string()));
262                }
263            }
264        }
265        notify.notify_waiters();
266
267        // 等待刷新任务退出(现在会立即返回)
268        let _ = refresh_handle.await;
269
270        pb.finish_with_message("File sync completed");
271    }
272
273    if report.errors.len() > 0 {
274        warn!(count = report.errors.len(), "Some files failed to copy");
275        anyhow::bail!("Failed to copy {} files", report.errors.len());
276    }
277
278    // 5. 统一输出整合后的结果
279    print_report(
280        false,
281        &report,
282        options.dry_run,
283        options.delete_extra, // 新增:是否启用删除功能
284        source_files.len(),
285        total_sync_size,
286        params.detail,
287    );
288
289    Ok(report)
290}