syncbox/sync/watcher.rs
1use std::time::Duration;
2use notify::{recommended_watcher, Event, EventKind, RecursiveMode, Watcher};
3use tracing::{debug, error, info};
4use crate::infra::error::SyncError;
5use super::sync_logic::sync_directories;
6use super::report::SyncReport;
7use super::types::SyncParameters;
8
9// ==============================================
10// 模块 5:监听器(Watcher)
11// 文件系统监听,实时同步
12// ==============================================
13
14
15/// 启动文件系统监听器,实时同步源目录变更到目标目录。
16///
17/// 使用 `notify` crate 监听文件事件,支持防抖(debounce)机制。
18///
19/// # 参数
20/// * `params` - 同步参数(源/目标路径、排除规则等)。
21/// * `delay_ms` - 防抖延迟时间(毫秒),连续修改后等待此时间再触发同步。
22///
23/// # 返回
24/// * `Ok(SyncReport)` - 累计所有同步操作的报告。
25/// * `Err(SyncError)` - 监听器创建或同步过程中发生错误。
26///
27/// # 注意
28/// - 通常不启用 `dry_run` 和 `checksum`(由调用方决定)。
29/// - 监听 `Create`, `Modify`, `Remove` 事件。
30/// - 使用异步通道与主循环通信。
31pub async fn watch_task(
32 params: &SyncParameters,
33 delay_ms: u64,
34) -> anyhow::Result<SyncReport, SyncError> {
35 // let options = SyncOptions {
36 // dry_run: false, // watch 模式通常不是 dry_run
37 // excludes: params.excludes.clone(),
38 // delete_extra: params.delete_extra,
39 // checksum: false,
40 // delete_excludes: params.delete_excludes.clone(),
41 // };
42
43 let mut total_report = SyncReport::default(); // 累计所有同步的报告
44
45 // 3. 创建一个异步 channel,用于从文件监听线程向主异步循环传递事件
46 // - unbounded_channel:不限制缓冲区大小,避免事件丢失
47 // - tx: 发送端(在监听回调中使用)
48 // - rx: 接收端(在主循环中使用)
49 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
50
51 // 4. 创建文件系统监听器(watcher)
52 // `recommended_watcher` 会根据操作系统自动选择最优后端:
53 // - macOS: FSEvents
54 // - Linux: inotify
55 // - Windows: ReadDirectoryChangesW
56 //
57 // 回调函数会在后台线程中被调用,所以必须是 'static + Send
58 let mut watcher =
59 recommended_watcher(move |res: Result<Event, notify::Error>| {
60 match res {
61 Ok(event) => {
62 // 只关心三类事件:修改、创建、删除
63 // 忽略元数据变更(如访问时间)、重命名等,避免过度触发
64 match event.kind {
65 // 只处理文件内容修改和创建事件
66 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) => {
67 let _ = tx.send(event);
68 }
69 _ => {
70 debug!(event = ?event, "Ignored file system event");
71 }
72 }
73 }
74 Err(error) => {
75 // 监听过程中发生错误(如权限不足、路径不存在)
76 error!("📁 File watch error: {}", error)
77 }
78 }
79 })
80 .map_err(|e| anyhow::anyhow!("Failed to create file watcher: {}", e))?;
81
82 // 5. 开始监听源目录(递归监听所有子目录)
83 watcher
84 .watch(¶ms.source, RecursiveMode::Recursive)
85 .map_err(|e| {
86 anyhow::anyhow!(
87 "Failed to watch directory '{}': {}",
88 params.source.display(),
89 e
90 )
91 })?;
92
93 info!(
94 "Started watching: {} → {}",
95 params.source.display(),
96 params.target.display()
97 );
98
99 // 6. 主事件循环:接收文件变化事件并处理
100 loop {
101 // --- 防抖机制开始 ---
102 // 我们希望:用户连续修改文件时,只在“最后一次修改后 delay_ms 毫秒”才同步一次
103
104 // 6.1 等待第一个文件变化事件
105 if rx.recv().await.is_none() {
106 info!("Watcher channel closed, exiting...");
107 break; // channel 被关闭,退出循环(通常是程序终止)
108 }
109
110 debug!(
111 "Change detected, starting debounce period of {}ms...",
112 delay_ms
113 );
114
115 // 6.2 进入防抖等待状态
116 // 使用一个内层循环,持续检查是否有新事件到来
117 loop {
118 // 尝试在 `delay_ms` 毫秒内接收下一个事件
119 // 如果收到新事件,说明用户还在修改,需要“重置”防抖计时器
120 match tokio::time::timeout(Duration::from_millis(delay_ms), rx.recv()).await {
121 Ok(Some(_)) => {
122 // 又有新事件!说明文件还在被修改,重新开始等待
123 debug!("Another change detected, restarting debounce timer...");
124 continue; // 继续等待
125 }
126 Ok(None) => {
127 // channel 被关闭(发送端关闭)
128 info!("Watcher channel closed during debounce.");
129 return Ok(total_report); // 正常退出
130 }
131 Err(_) => {
132 // timeout 超时!说明在 delay_ms 毫秒内没有新事件
133 // 👉 这正是我们想要的:用户已经“停止”修改文件
134 debug!("Debounce period ended with no further changes.");
135 break; // 跳出内层循环,准备执行同步
136 }
137 }
138 }
139 // --- 防抖机制结束 ---
140
141 // 7. 执行同步操作
142 debug!("📁 Detected stable changes → syncing...");
143 match sync_directories(¶ms).await {
144 Ok(report) => {
145 debug!("✅ Sync completed successfully");
146 total_report.copied.extend(report.copied);
147 total_report.errors.extend(report.errors);
148 }
149 Err(e) => {
150 error!(
151 error = ?e,
152 source = %params.source.display(),
153 target = %params.target.display(),
154 "Sync failed during watch"
155 );
156 total_report
157 .errors
158 .push((params.source.clone(), e.to_string()));
159 }
160 }
161
162 // 8. 同步完成,回到外层循环,继续等待下一次变化
163 }
164
165 Ok(total_report)
166}