Skip to main content

syncbox/sync/
watcher.rs

1use std::time::Duration;
2use notify::{recommended_watcher, Event, EventKind, RecursiveMode, Watcher};
3use tracing::{debug, error, info};
4use crate::infra::error::SyncError;
5use super::sync_logic::sync_directories;
6use super::report::SyncReport;
7use super::types::SyncParameters;
8
9// ==============================================
10// 模块 5:监听器(Watcher)
11// 文件系统监听,实时同步
12// ==============================================
13
14
15/// 启动文件系统监听器,实时同步源目录变更到目标目录。
16///
17/// 使用 `notify` crate 监听文件事件,支持防抖(debounce)机制。
18///
19/// # 参数
20/// * `params` - 同步参数(源/目标路径、排除规则等)。
21/// * `delay_ms` - 防抖延迟时间(毫秒),连续修改后等待此时间再触发同步。
22///
23/// # 返回
24/// * `Ok(SyncReport)` - 累计所有同步操作的报告。
25/// * `Err(SyncError)` - 监听器创建或同步过程中发生错误。
26///
27/// # 注意
28/// - 通常不启用 `dry_run` 和 `checksum`(由调用方决定)。
29/// - 监听 `Create`, `Modify`, `Remove` 事件。
30/// - 使用异步通道与主循环通信。
31pub async fn watch_task(
32    params: &SyncParameters,
33    delay_ms: u64,
34) -> anyhow::Result<SyncReport, SyncError> {
35    // let options = SyncOptions {
36    //     dry_run: false, // watch 模式通常不是 dry_run
37    //     excludes: params.excludes.clone(),
38    //     delete_extra: params.delete_extra,
39    //     checksum: false,
40    //     delete_excludes: params.delete_excludes.clone(),
41    // };
42
43    let mut total_report = SyncReport::default(); // 累计所有同步的报告
44
45    // 3. 创建一个异步 channel,用于从文件监听线程向主异步循环传递事件
46    //    - unbounded_channel:不限制缓冲区大小,避免事件丢失
47    //    - tx: 发送端(在监听回调中使用)
48    //    - rx: 接收端(在主循环中使用)
49    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
50
51    // 4. 创建文件系统监听器(watcher)
52    //    `recommended_watcher` 会根据操作系统自动选择最优后端:
53    //    - macOS: FSEvents
54    //    - Linux: inotify
55    //    - Windows: ReadDirectoryChangesW
56    //
57    //    回调函数会在后台线程中被调用,所以必须是 'static + Send
58    let mut watcher =
59        recommended_watcher(move |res: Result<Event, notify::Error>| {
60            match res {
61                Ok(event) => {
62                    // 只关心三类事件:修改、创建、删除
63                    // 忽略元数据变更(如访问时间)、重命名等,避免过度触发
64                    match event.kind {
65                        // 只处理文件内容修改和创建事件
66                        EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) => {
67                            let _ = tx.send(event);
68                        }
69                        _ => {
70                            debug!(event = ?event, "Ignored file system event");
71                        }
72                    }
73                }
74                Err(error) => {
75                    // 监听过程中发生错误(如权限不足、路径不存在)
76                    error!("📁 File watch error: {}", error)
77                }
78            }
79        })
80            .map_err(|e| anyhow::anyhow!("Failed to create file watcher: {}", e))?;
81
82    // 5. 开始监听源目录(递归监听所有子目录)
83    watcher
84        .watch(&params.source, RecursiveMode::Recursive)
85        .map_err(|e| {
86            anyhow::anyhow!(
87                    "Failed to watch directory '{}': {}",
88                    params.source.display(),
89                    e
90                )
91        })?;
92
93    info!(
94            "Started watching: {} → {}",
95            params.source.display(),
96            params.target.display()
97        );
98
99    // 6. 主事件循环:接收文件变化事件并处理
100    loop {
101        // --- 防抖机制开始 ---
102        // 我们希望:用户连续修改文件时,只在“最后一次修改后 delay_ms 毫秒”才同步一次
103
104        // 6.1 等待第一个文件变化事件
105        if rx.recv().await.is_none() {
106            info!("Watcher channel closed, exiting...");
107            break; // channel 被关闭,退出循环(通常是程序终止)
108        }
109
110        debug!(
111                "Change detected, starting debounce period of {}ms...",
112                delay_ms
113            );
114
115        // 6.2 进入防抖等待状态
116        //     使用一个内层循环,持续检查是否有新事件到来
117        loop {
118            // 尝试在 `delay_ms` 毫秒内接收下一个事件
119            // 如果收到新事件,说明用户还在修改,需要“重置”防抖计时器
120            match tokio::time::timeout(Duration::from_millis(delay_ms), rx.recv()).await {
121                Ok(Some(_)) => {
122                    // 又有新事件!说明文件还在被修改,重新开始等待
123                    debug!("Another change detected, restarting debounce timer...");
124                    continue; // 继续等待
125                }
126                Ok(None) => {
127                    // channel 被关闭(发送端关闭)
128                    info!("Watcher channel closed during debounce.");
129                    return Ok(total_report); // 正常退出
130                }
131                Err(_) => {
132                    // timeout 超时!说明在 delay_ms 毫秒内没有新事件
133                    // 👉 这正是我们想要的:用户已经“停止”修改文件
134                    debug!("Debounce period ended with no further changes.");
135                    break; // 跳出内层循环,准备执行同步
136                }
137            }
138        }
139        // --- 防抖机制结束 ---
140
141        // 7. 执行同步操作
142        debug!("📁 Detected stable changes → syncing...");
143        match sync_directories(&params).await {
144            Ok(report) => {
145                debug!("✅ Sync completed successfully");
146                total_report.copied.extend(report.copied);
147                total_report.errors.extend(report.errors);
148            }
149            Err(e) => {
150                error!(
151                        error = ?e,
152                        source = %params.source.display(),
153                        target = %params.target.display(),
154                        "Sync failed during watch"
155                    );
156                total_report
157                    .errors
158                    .push((params.source.clone(), e.to_string()));
159            }
160        }
161
162        // 8. 同步完成,回到外层循环,继续等待下一次变化
163    }
164
165    Ok(total_report)
166}