Skip to main content

zeph_core/
file_watcher.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::path::PathBuf;
5use std::time::Duration;
6
7use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
8use tokio::sync::mpsc;
9
10#[derive(Debug, thiserror::Error)]
11pub enum FileWatcherError {
12    #[error("no watch paths configured")]
13    NoWatchPaths,
14
15    #[error("filesystem watcher error: {0}")]
16    Notify(#[from] notify::Error),
17}
18
/// Filesystem change event for a watched path.
///
/// Two events are equal when they refer to the same `path`, so events can be
/// compared directly or deduplicated through a `HashSet`/`HashMap`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FileChangedEvent {
    /// Path reported by the filesystem watcher for this change.
    pub path: PathBuf,
}
24
25/// Watches a set of paths and sends `FileChangedEvent` on any change.
26///
27/// Uses `notify-debouncer-mini` to debounce rapid filesystem events.
28/// Paths are resolved once at construction time from the project root.
29///
30/// Call `stop()` on the watcher to shut it down cleanly. The watcher
31/// is also stopped automatically when all senders are dropped.
32pub struct FileChangeWatcher {
33    handle: tokio::task::JoinHandle<()>,
34}
35
36impl std::fmt::Debug for FileChangeWatcher {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("FileChangeWatcher").finish_non_exhaustive()
39    }
40}
41
42impl Drop for FileChangeWatcher {
43    fn drop(&mut self) {
44        self.handle.abort();
45    }
46}
47
48impl FileChangeWatcher {
49    /// Start watching the given paths.
50    ///
51    /// `watch_paths` are watched recursively if they are directories.
52    /// Each path in `watch_paths` is watched with `RecursiveMode::Recursive`.
53    ///
54    /// # Errors
55    ///
56    /// Returns an error if no paths are provided or if the watcher cannot be initialized.
57    pub fn start(
58        watch_paths: &[PathBuf],
59        debounce_ms: u64,
60        tx: mpsc::Sender<FileChangedEvent>,
61    ) -> Result<Self, FileWatcherError> {
62        if watch_paths.is_empty() {
63            return Err(FileWatcherError::NoWatchPaths);
64        }
65
66        let (notify_tx, mut notify_rx) = mpsc::channel::<PathBuf>(64);
67
68        let mut debouncer = new_debouncer(
69            Duration::from_millis(debounce_ms),
70            move |events: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
71                let events = match events {
72                    Ok(e) => e,
73                    Err(e) => {
74                        tracing::warn!("file watcher error: {e}");
75                        return;
76                    }
77                };
78                for event in events {
79                    if event.kind == DebouncedEventKind::Any {
80                        let _ = notify_tx.blocking_send(event.path);
81                    }
82                }
83            },
84        )?;
85
86        for path in watch_paths {
87            if let Err(e) = debouncer
88                .watcher()
89                .watch(path, notify::RecursiveMode::Recursive)
90            {
91                tracing::warn!(path = %path.display(), error = %e, "file watcher: failed to watch path");
92            }
93        }
94
95        let handle = tokio::spawn(async move {
96            let _debouncer = debouncer;
97            while let Some(path) = notify_rx.recv().await {
98                if tx.send(FileChangedEvent { path }).await.is_err() {
99                    break;
100                }
101            }
102        });
103
104        Ok(Self { handle })
105    }
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty path list is rejected with `NoWatchPaths`.
    #[tokio::test]
    async fn start_with_empty_paths_fails() {
        let (tx, _rx) = mpsc::channel(16);
        let outcome = FileChangeWatcher::start(&[], 500, tx);
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FileWatcherError::NoWatchPaths)));
    }

    /// Starting on an existing temporary directory succeeds.
    #[tokio::test]
    async fn start_with_valid_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let (tx, _rx) = mpsc::channel(16);
        let roots = [tmp.path().to_path_buf()];
        let outcome = FileChangeWatcher::start(&roots, 500, tx);
        assert!(outcome.is_ok());
    }

    /// Rewriting a file inside a watched directory produces an event.
    #[tokio::test]
    async fn detects_file_change() {
        let tmp = tempfile::tempdir().unwrap();
        let target = tmp.path().join("test.txt");
        std::fs::write(&target, "initial").unwrap();

        let (tx, mut rx) = mpsc::channel(16);
        let roots = [tmp.path().to_path_buf()];
        let _watcher = FileChangeWatcher::start(&roots, 500, tx).unwrap();

        // Give the watcher a moment to settle before touching the file.
        tokio::time::sleep(Duration::from_millis(100)).await;
        std::fs::write(&target, "updated").unwrap();

        let received = tokio::time::timeout(Duration::from_secs(3), rx.recv()).await;
        assert!(received.is_ok(), "expected FileChangedEvent within timeout");
        // Path granularity varies by OS/watcher backend (e.g. macOS FSEvents may
        // return intermediate temp paths or symlink-resolved paths), so only the
        // arrival of an event is checked, not its exact path.
        let event = received.unwrap();
        assert!(event.is_some());
    }
}