Skip to main content

zeph_core/
file_watcher.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::path::PathBuf;
5use std::time::Duration;
6
7use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
8use tokio::sync::mpsc;
9
10#[non_exhaustive]
11#[derive(Debug, thiserror::Error)]
12pub enum FileWatcherError {
13    #[error("no watch paths configured")]
14    NoWatchPaths,
15
16    #[error("filesystem watcher error: {0}")]
17    Notify(#[from] notify::Error),
18}
19
20/// Filesystem change event for a watched path.
21#[derive(Debug, Clone)]
22pub struct FileChangedEvent {
23    pub path: PathBuf,
24}
25
26/// Watches a set of paths and sends `FileChangedEvent` on any change.
27///
28/// Uses `notify-debouncer-mini` to debounce rapid filesystem events.
29/// Paths are resolved once at construction time from the project root.
30///
31/// Call `stop()` on the watcher to shut it down cleanly. The watcher
32/// is also stopped automatically when all senders are dropped.
33pub struct FileChangeWatcher {
34    handle: tokio::task::JoinHandle<()>,
35}
36
37impl std::fmt::Debug for FileChangeWatcher {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        f.debug_struct("FileChangeWatcher").finish_non_exhaustive()
40    }
41}
42
43impl Drop for FileChangeWatcher {
44    fn drop(&mut self) {
45        self.handle.abort();
46    }
47}
48
49impl FileChangeWatcher {
50    /// Start watching the given paths.
51    ///
52    /// `watch_paths` are watched recursively if they are directories.
53    /// Each path in `watch_paths` is watched with `RecursiveMode::Recursive`.
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if no paths are provided or if the watcher cannot be initialized.
58    pub fn start(
59        watch_paths: &[PathBuf],
60        debounce_ms: u64,
61        tx: mpsc::Sender<FileChangedEvent>,
62    ) -> Result<Self, FileWatcherError> {
63        if watch_paths.is_empty() {
64            return Err(FileWatcherError::NoWatchPaths);
65        }
66
67        let (notify_tx, mut notify_rx) = mpsc::channel::<PathBuf>(64);
68
69        let mut debouncer = new_debouncer(
70            Duration::from_millis(debounce_ms),
71            move |events: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
72                let events = match events {
73                    Ok(e) => e,
74                    Err(e) => {
75                        tracing::warn!("file watcher error: {e}");
76                        return;
77                    }
78                };
79                for event in events {
80                    if event.kind == DebouncedEventKind::Any {
81                        let _ = notify_tx.blocking_send(event.path);
82                    }
83                }
84            },
85        )?;
86
87        for path in watch_paths {
88            if let Err(e) = debouncer
89                .watcher()
90                .watch(path, notify::RecursiveMode::Recursive)
91            {
92                tracing::warn!(path = %path.display(), error = %e, "file watcher: failed to watch path");
93            }
94        }
95
96        let handle = tokio::spawn(async move {
97            let _debouncer = debouncer;
98            while let Some(path) = notify_rx.recv().await {
99                if tx.send(FileChangedEvent { path }).await.is_err() {
100                    break;
101                }
102            }
103        });
104
105        Ok(Self { handle })
106    }
107}
108
109#[cfg(test)]
110mod tests {
111    use super::*;
112
113    #[tokio::test]
114    async fn start_with_empty_paths_fails() {
115        let (tx, _rx) = mpsc::channel(16);
116        let result = FileChangeWatcher::start(&[], 500, tx);
117        assert!(result.is_err());
118        assert!(matches!(
119            result.unwrap_err(),
120            FileWatcherError::NoWatchPaths
121        ));
122    }
123
124    #[tokio::test]
125    async fn start_with_valid_dir() {
126        let dir = tempfile::tempdir().unwrap();
127        let (tx, _rx) = mpsc::channel(16);
128        let result = FileChangeWatcher::start(&[dir.path().to_path_buf()], 500, tx);
129        assert!(result.is_ok());
130    }
131
132    #[tokio::test]
133    async fn detects_file_change() {
134        let dir = tempfile::tempdir().unwrap();
135        let file_path = dir.path().join("test.txt");
136        std::fs::write(&file_path, "initial").unwrap();
137
138        let (tx, mut rx) = mpsc::channel(16);
139        let _watcher = FileChangeWatcher::start(&[dir.path().to_path_buf()], 500, tx).unwrap();
140
141        // Wait for watcher to settle before modifying.
142        tokio::time::sleep(Duration::from_millis(100)).await;
143        std::fs::write(&file_path, "updated").unwrap();
144
145        let result = tokio::time::timeout(Duration::from_secs(3), rx.recv()).await;
146        assert!(result.is_ok(), "expected FileChangedEvent within timeout");
147        // Event received — path granularity varies by OS/watcher backend (e.g. macOS FSEvents
148        // may return intermediate temp paths or symlink-resolved paths), so we only verify
149        // that an event arrived from within the watched directory tree.
150        assert!(result.unwrap().is_some());
151    }
152}