hard_sync_core/
watcher.rs1use std::path::PathBuf;
2use std::sync::mpsc;
3use std::thread;
4use std::time::{Duration, Instant};
5
6use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
7
8use crate::config::{PairConfig, SourceSide};
9use crate::drive::find_mounted_drive;
10use crate::sync_engine::{sync_pair, SyncOptions, SyncReport};
11
/// Pause between two consecutive `DriveCheck` ticks sent by the drive-poll
/// thread of a cross-drive pair.
const DRIVE_POLL_INTERVAL: Duration = Duration::from_secs(3);

/// Quiet period that must follow the most recent file change before a sync
/// is started, so that a burst of changes results in a single sync.
const DEBOUNCE_DELAY: Duration = Duration::from_millis(500);
20
21pub enum WatchEvent {
25 DriveDetected { mount_point: PathBuf },
27 DriveRemoved,
29 SyncStarted,
31 SyncCompleted(SyncReport),
33 SyncError(String),
35 Watching,
37}
38
/// Handle to a running watch thread.
pub struct WatchHandle {
    /// Sending `()` on this channel asks the watch thread to shut down.
    stop_tx: mpsc::Sender<()>,
    /// Join handle of the watch thread; taken out by [`WatchHandle::wait`].
    thread: Option<thread::JoinHandle<()>>,
}

impl WatchHandle {
    /// Blocks the caller until the watch thread has exited.
    ///
    /// This only joins the thread; it does not send a stop request itself.
    /// A panic inside the watch thread is ignored.
    pub fn wait(mut self) {
        let _ = self.thread.take().map(thread::JoinHandle::join);
    }

    /// Requests the watch thread to stop.
    ///
    /// Best effort: if nobody is listening on the other end of the channel
    /// any more, the send error is discarded.
    pub fn stop(&self) {
        self.stop_tx.send(()).ok();
    }
}
59
/// Internal messages multiplexed onto the single channel the watch loop reads.
enum Msg {
    /// The filesystem watcher reported an event under the watched source path.
    FileChanged,
    /// Periodic tick asking the loop to re-check whether the drive is mounted.
    DriveCheck,
    /// A stop request arrived on the handle's stop channel.
    Stop,
}
67
68pub fn watch_pair(
73 name: &str,
74 on_event: impl Fn(WatchEvent) + Send + 'static,
75) -> Result<WatchHandle, String> {
76 let pair = crate::config::get_pair(name)?;
77 let (stop_tx, stop_rx) = mpsc::channel::<()>();
78 let name = name.to_string();
79
80 let thread = thread::spawn(move || {
81 run_watch_loop(name, pair, on_event, stop_rx);
82 });
83
84 Ok(WatchHandle {
85 stop_tx,
86 thread: Some(thread),
87 })
88}
89
90fn run_watch_loop(
93 name: String,
94 pair: PairConfig,
95 on_event: impl Fn(WatchEvent),
96 stop_rx: mpsc::Receiver<()>,
97) {
98 let (msg_tx, msg_rx) = mpsc::channel::<Msg>();
99
100 let source_path = match pair.source {
102 SourceSide::Base => pair.base.clone(),
103 SourceSide::Target => pair.target.clone(),
104 };
105
106 let file_tx = msg_tx.clone();
108 let mut watcher: RecommendedWatcher =
109 match notify::recommended_watcher(move |res: notify::Result<Event>| {
110 if res.is_ok() {
111 let _ = file_tx.send(Msg::FileChanged);
112 }
113 }) {
114 Ok(w) => w,
115 Err(e) => {
116 on_event(WatchEvent::SyncError(format!("Failed to start watcher: {}", e)));
117 return;
118 }
119 };
120
121 if let Err(e) = watcher.watch(&source_path, RecursiveMode::Recursive) {
122 on_event(WatchEvent::SyncError(format!("Failed to watch path: {}", e)));
123 return;
124 }
125
126 let is_cross_drive = pair.drive_id.is_some();
128 if is_cross_drive {
129 let poll_tx = msg_tx.clone();
130 thread::spawn(move || loop {
131 thread::sleep(DRIVE_POLL_INTERVAL);
132 if poll_tx.send(Msg::DriveCheck).is_err() {
133 break;
134 }
135 });
136 }
137
138 {
140 let stop_tx = msg_tx.clone();
141 thread::spawn(move || {
142 if stop_rx.recv().is_ok() {
143 let _ = stop_tx.send(Msg::Stop);
144 }
145 });
146 }
147
148 on_event(WatchEvent::Watching);
149
150 let mut drive_mounted = if is_cross_drive {
152 pair.drive_id
154 .as_ref()
155 .and_then(|id| find_mounted_drive(id))
156 .is_some()
157 } else {
158 true };
160
161 let mut last_change: Option<Instant> = None;
162 let mut pending_sync = false;
163
164 if drive_mounted && is_cross_drive {
166 if let Some(mount) = pair.drive_id.as_ref().and_then(|id| find_mounted_drive(id)) {
167 on_event(WatchEvent::DriveDetected { mount_point: mount });
168 do_sync(&name, &on_event);
169 }
170 }
171
172 loop {
173 let timeout = if pending_sync {
175 DEBOUNCE_DELAY
176 } else {
177 Duration::from_secs(60)
178 };
179
180 let msg = msg_rx.recv_timeout(timeout);
181
182 match msg {
183 Ok(Msg::Stop) | Err(mpsc::RecvTimeoutError::Disconnected) => break,
184
185 Ok(Msg::FileChanged) => {
186 if drive_mounted {
187 last_change = Some(Instant::now());
188 pending_sync = true;
189 }
190 }
191
192 Ok(Msg::DriveCheck) => {
193 let now_mounted = pair
194 .drive_id
195 .as_ref()
196 .and_then(|id| find_mounted_drive(id))
197 .is_some();
198
199 match (drive_mounted, now_mounted) {
200 (false, true) => {
201 drive_mounted = true;
203 if let Some(mount) =
204 pair.drive_id.as_ref().and_then(|id| find_mounted_drive(id))
205 {
206 on_event(WatchEvent::DriveDetected { mount_point: mount });
207 }
208 do_sync(&name, &on_event);
209 }
210 (true, false) => {
211 drive_mounted = false;
213 pending_sync = false;
214 on_event(WatchEvent::DriveRemoved);
215 }
216 _ => {}
217 }
218 }
219
220 Err(mpsc::RecvTimeoutError::Timeout) => {
221 if pending_sync {
223 if let Some(last) = last_change {
224 if last.elapsed() >= DEBOUNCE_DELAY {
225 pending_sync = false;
226 last_change = None;
227 do_sync(&name, &on_event);
228 }
229 }
230 }
231 }
232 }
233 }
234}
235
236fn do_sync(name: &str, on_event: &impl Fn(WatchEvent)) {
239 on_event(WatchEvent::SyncStarted);
240 match sync_pair(name, SyncOptions { dry_run: false, verify: false }) {
241 Ok(report) => on_event(WatchEvent::SyncCompleted(report)),
242 Err(e) => on_event(WatchEvent::SyncError(e)),
243 }
244}