// watch_path/local.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::mpsc;
4use std::time::{Duration, Instant};
5
6use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
7
8use crate::watcher::{
9    ConnectionState, PathWatcher, WatchError, WatchEvent, WatchEventKind, WatchOptions,
10};
11
12pub struct LocalWatcher {
13    _watcher: RecommendedWatcher,
14    rx: mpsc::Receiver<notify::Result<Event>>,
15    debounce: Duration,
16    pending: HashMap<PathBuf, Instant>,
17    base_dir: PathBuf,
18}
19
20impl LocalWatcher {
21    pub fn new(dir: &Path, options: &WatchOptions) -> Result<Self, WatchError> {
22        let (tx, rx) = mpsc::channel();
23
24        let mut watcher = RecommendedWatcher::new(
25            move |result| {
26                let _ = tx.send(result);
27            },
28            notify::Config::default(),
29        )
30        .map_err(|e| WatchError::Notify(e.to_string()))?;
31
32        watcher
33            .watch(dir, RecursiveMode::Recursive)
34            .map_err(|e| WatchError::Notify(e.to_string()))?;
35
36        Ok(Self {
37            _watcher: watcher,
38            rx,
39            debounce: options.debounce,
40            pending: HashMap::new(),
41            base_dir: dir.to_path_buf(),
42        })
43    }
44}
45
46impl PathWatcher for LocalWatcher {
47    fn poll(&mut self) -> Result<Vec<WatchEvent>, WatchError> {
48        while let Ok(event_result) = self.rx.try_recv() {
49            if let Ok(event) = event_result {
50                let dominated_by_kind =
51                    matches!(event.kind, EventKind::Create(_) | EventKind::Modify(_));
52                if !dominated_by_kind {
53                    continue;
54                }
55                for path in event.paths {
56                    if path.is_file() {
57                        self.pending.insert(path, Instant::now());
58                    }
59                }
60            }
61        }
62
63        let now = Instant::now();
64        let mut ready = Vec::new();
65
66        self.pending.retain(|path, last_seen| {
67            if now.duration_since(*last_seen) >= self.debounce {
68                if !path.exists() {
69                    return false;
70                }
71                let kind = WatchEventKind::Modified;
72                ready.push(WatchEvent {
73                    path: path.to_string_lossy().to_string(),
74                    kind,
75                });
76                false
77            } else {
78                true
79            }
80        });
81
82        Ok(ready)
83    }
84
85    fn read(&mut self, path: &str) -> Result<Vec<u8>, WatchError> {
86        let full_path = if Path::new(path).is_absolute() {
87            PathBuf::from(path)
88        } else {
89            self.base_dir.join(path)
90        };
91        std::fs::read(&full_path).map_err(WatchError::Io)
92    }
93
94    fn has_pending(&self) -> bool {
95        !self.pending.is_empty()
96    }
97
98    fn connection_state(&self) -> ConnectionState {
99        ConnectionState::Connected
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a watcher over `root` using the default options.
    fn watcher_for(root: &Path) -> LocalWatcher {
        LocalWatcher::new(root, &WatchOptions::default()).unwrap()
    }

    /// `read` returns the bytes of a file addressed by its absolute path.
    #[test]
    fn local_watcher_reads_file() {
        let scratch = tempfile::tempdir().unwrap();
        let target = scratch.path().join("test.txt");
        std::fs::write(&target, b"hello").unwrap();

        let mut watcher = watcher_for(scratch.path());
        let contents = watcher.read(&target.to_string_lossy()).unwrap();

        assert_eq!(contents, b"hello");
    }

    /// A freshly created local watcher reports itself as connected.
    #[test]
    fn local_watcher_always_connected() {
        let scratch = tempfile::tempdir().unwrap();
        let watcher = watcher_for(scratch.path());

        assert_eq!(watcher.connection_state(), ConnectionState::Connected);
    }
}