Skip to main content

hard_sync_core/
watcher.rs

1use std::path::PathBuf;
2use std::sync::mpsc;
3use std::thread;
4use std::time::{Duration, Instant};
5
6use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
7
8use crate::config::{PairConfig, SourceSide};
9use crate::drive::find_mounted_drive;
10use crate::sync_engine::{sync_pair, SyncOptions, SyncReport};
11
12// ── Timing constants ───────────────────────────────────────────────────────────
13
14/// How often the drive poll thread checks for the target drive.
15const DRIVE_POLL_INTERVAL: Duration = Duration::from_secs(3);
16
17/// Wait this long after the last file-change event before triggering a sync.
18/// Prevents thrashing when an editor writes many small events on save.
19const DEBOUNCE_DELAY: Duration = Duration::from_millis(500);
20
21// ── Public types ──────────────────────────────────────────────────────────────
22
23/// Events emitted by the watch loop — consumed by the caller (CLI / UI).
24pub enum WatchEvent {
25    /// Drive-aware pair: target drive was just detected at this mount point.
26    DriveDetected { mount_point: PathBuf },
27    /// Drive-aware pair: target drive was unplugged / no longer found.
28    DriveRemoved,
29    /// A sync is about to start.
30    SyncStarted,
31    /// A sync completed successfully.
32    SyncCompleted(SyncReport),
33    /// A sync failed with this error string.
34    SyncError(String),
35    /// Emitted once at startup when the watcher is ready.
36    Watching,
37}
38
39/// Handle returned to the caller. Keeps the watch loop alive.
40/// Drop it or call `stop()` to shut down the watcher.
41pub struct WatchHandle {
42    stop_tx: mpsc::Sender<()>,
43    thread: Option<thread::JoinHandle<()>>,
44}
45
46impl WatchHandle {
47    /// Block the current thread until the watch loop exits.
48    pub fn wait(mut self) {
49        if let Some(t) = self.thread.take() {
50            let _ = t.join();
51        }
52    }
53
54    /// Signal the watch loop to stop (non-blocking).
55    pub fn stop(&self) {
56        let _ = self.stop_tx.send(());
57    }
58}
59
60// ── Internal channel messages ─────────────────────────────────────────────────
61
62enum Msg {
63    FileChanged,
64    DriveCheck,
65    Stop,
66}
67
68// ── Public entry point ────────────────────────────────────────────────────────
69
70/// Start watching a named pair. Returns a `WatchHandle` immediately.
71/// The `on_event` callback is called from the watch thread for each event.
72pub fn watch_pair(
73    name: &str,
74    on_event: impl Fn(WatchEvent) + Send + 'static,
75) -> Result<WatchHandle, String> {
76    let pair = crate::config::get_pair(name)?;
77    let (stop_tx, stop_rx) = mpsc::channel::<()>();
78    let name = name.to_string();
79
80    let thread = thread::spawn(move || {
81        run_watch_loop(name, pair, on_event, stop_rx);
82    });
83
84    Ok(WatchHandle {
85        stop_tx,
86        thread: Some(thread),
87    })
88}
89
90// ── Watch loop ────────────────────────────────────────────────────────────────
91
92fn run_watch_loop(
93    name: String,
94    pair: PairConfig,
95    on_event: impl Fn(WatchEvent),
96    stop_rx: mpsc::Receiver<()>,
97) {
98    let (msg_tx, msg_rx) = mpsc::channel::<Msg>();
99
100    // Resolve the source path (always local — always present)
101    let source_path = match pair.source {
102        SourceSide::Base => pair.base.clone(),
103        SourceSide::Target => pair.target.clone(),
104    };
105
106    // Set up the file watcher on source
107    let file_tx = msg_tx.clone();
108    let mut watcher: RecommendedWatcher =
109        match notify::recommended_watcher(move |res: notify::Result<Event>| {
110            if res.is_ok() {
111                let _ = file_tx.send(Msg::FileChanged);
112            }
113        }) {
114            Ok(w) => w,
115            Err(e) => {
116                on_event(WatchEvent::SyncError(format!("Failed to start watcher: {}", e)));
117                return;
118            }
119        };
120
121    if let Err(e) = watcher.watch(&source_path, RecursiveMode::Recursive) {
122        on_event(WatchEvent::SyncError(format!("Failed to watch path: {}", e)));
123        return;
124    }
125
126    // For cross-drive pairs, spawn a drive poll thread
127    let is_cross_drive = pair.drive_id.is_some();
128    if is_cross_drive {
129        let poll_tx = msg_tx.clone();
130        thread::spawn(move || loop {
131            thread::sleep(DRIVE_POLL_INTERVAL);
132            if poll_tx.send(Msg::DriveCheck).is_err() {
133                break;
134            }
135        });
136    }
137
138    // Forward stop signal into the message channel
139    {
140        let stop_tx = msg_tx.clone();
141        thread::spawn(move || {
142            if stop_rx.recv().is_ok() {
143                let _ = stop_tx.send(Msg::Stop);
144            }
145        });
146    }
147
148    on_event(WatchEvent::Watching);
149
150    // State
151    let mut drive_mounted = if is_cross_drive {
152        // Check once immediately on startup
153        pair.drive_id
154            .as_ref()
155            .and_then(|id| find_mounted_drive(id))
156            .is_some()
157    } else {
158        true // same-drive pair — target always accessible
159    };
160
161    let mut last_change: Option<Instant> = None;
162    let mut pending_sync = false;
163
164    // If drive is already mounted at startup, sync immediately
165    if drive_mounted && is_cross_drive {
166        if let Some(mount) = pair.drive_id.as_ref().and_then(|id| find_mounted_drive(id)) {
167            on_event(WatchEvent::DriveDetected { mount_point: mount });
168            do_sync(&name, &on_event);
169        }
170    }
171
172    loop {
173        // Timeout so we can check debounce even if no messages arrive
174        let timeout = if pending_sync {
175            DEBOUNCE_DELAY
176        } else {
177            Duration::from_secs(60)
178        };
179
180        let msg = msg_rx.recv_timeout(timeout);
181
182        match msg {
183            Ok(Msg::Stop) | Err(mpsc::RecvTimeoutError::Disconnected) => break,
184
185            Ok(Msg::FileChanged) => {
186                if drive_mounted {
187                    last_change = Some(Instant::now());
188                    pending_sync = true;
189                }
190            }
191
192            Ok(Msg::DriveCheck) => {
193                let now_mounted = pair
194                    .drive_id
195                    .as_ref()
196                    .and_then(|id| find_mounted_drive(id))
197                    .is_some();
198
199                match (drive_mounted, now_mounted) {
200                    (false, true) => {
201                        // Drive just appeared
202                        drive_mounted = true;
203                        if let Some(mount) =
204                            pair.drive_id.as_ref().and_then(|id| find_mounted_drive(id))
205                        {
206                            on_event(WatchEvent::DriveDetected { mount_point: mount });
207                        }
208                        do_sync(&name, &on_event);
209                    }
210                    (true, false) => {
211                        // Drive just disappeared
212                        drive_mounted = false;
213                        pending_sync = false;
214                        on_event(WatchEvent::DriveRemoved);
215                    }
216                    _ => {}
217                }
218            }
219
220            Err(mpsc::RecvTimeoutError::Timeout) => {
221                // Debounce: fire sync if enough time has passed since last change
222                if pending_sync {
223                    if let Some(last) = last_change {
224                        if last.elapsed() >= DEBOUNCE_DELAY {
225                            pending_sync = false;
226                            last_change = None;
227                            do_sync(&name, &on_event);
228                        }
229                    }
230                }
231            }
232        }
233    }
234}
235
236// ── Sync helper ───────────────────────────────────────────────────────────────
237
238fn do_sync(name: &str, on_event: &impl Fn(WatchEvent)) {
239    on_event(WatchEvent::SyncStarted);
240    match sync_pair(name, SyncOptions { dry_run: false, verify: false }) {
241        Ok(report) => on_event(WatchEvent::SyncCompleted(report)),
242        Err(e) => on_event(WatchEvent::SyncError(e)),
243    }
244}