1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::mpsc;
4use std::time::{Duration, Instant};
5
6use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
7
8use crate::watcher::{
9 ConnectionState, PathWatcher, WatchError, WatchEvent, WatchEventKind, WatchOptions,
10};
11
/// Watches a directory on the local filesystem and reports debounced changes.
///
/// Raw `notify` events arrive over an in-process channel and are folded into
/// `pending` when the watcher is polled; a path is only reported once it has
/// been quiet for at least `debounce`.
pub struct LocalWatcher {
    /// Never read after construction; held so the watcher lives as long as
    /// this struct does.
    _watcher: RecommendedWatcher,
    /// Receiving end of the channel the watcher callback sends results into.
    rx: mpsc::Receiver<notify::Result<Event>>,
    /// Minimum time since the last observed event before a path is reported.
    debounce: Duration,
    /// Paths with unreported changes, mapped to when their latest event was seen.
    pending: HashMap<PathBuf, Instant>,
    /// The watched directory; relative paths passed to `read` are joined onto it.
    base_dir: PathBuf,
}
19
20impl LocalWatcher {
21 pub fn new(dir: &Path, options: &WatchOptions) -> Result<Self, WatchError> {
22 let (tx, rx) = mpsc::channel();
23
24 let mut watcher = RecommendedWatcher::new(
25 move |result| {
26 let _ = tx.send(result);
27 },
28 notify::Config::default(),
29 )
30 .map_err(|e| WatchError::Notify(e.to_string()))?;
31
32 watcher
33 .watch(dir, RecursiveMode::Recursive)
34 .map_err(|e| WatchError::Notify(e.to_string()))?;
35
36 Ok(Self {
37 _watcher: watcher,
38 rx,
39 debounce: options.debounce,
40 pending: HashMap::new(),
41 base_dir: dir.to_path_buf(),
42 })
43 }
44}
45
46impl PathWatcher for LocalWatcher {
47 fn poll(&mut self) -> Result<Vec<WatchEvent>, WatchError> {
48 while let Ok(event_result) = self.rx.try_recv() {
49 if let Ok(event) = event_result {
50 let dominated_by_kind =
51 matches!(event.kind, EventKind::Create(_) | EventKind::Modify(_));
52 if !dominated_by_kind {
53 continue;
54 }
55 for path in event.paths {
56 if path.is_file() {
57 self.pending.insert(path, Instant::now());
58 }
59 }
60 }
61 }
62
63 let now = Instant::now();
64 let mut ready = Vec::new();
65
66 self.pending.retain(|path, last_seen| {
67 if now.duration_since(*last_seen) >= self.debounce {
68 if !path.exists() {
69 return false;
70 }
71 let kind = WatchEventKind::Modified;
72 ready.push(WatchEvent {
73 path: path.to_string_lossy().to_string(),
74 kind,
75 });
76 false
77 } else {
78 true
79 }
80 });
81
82 Ok(ready)
83 }
84
85 fn read(&mut self, path: &str) -> Result<Vec<u8>, WatchError> {
86 let full_path = if Path::new(path).is_absolute() {
87 PathBuf::from(path)
88 } else {
89 self.base_dir.join(path)
90 };
91 std::fs::read(&full_path).map_err(WatchError::Io)
92 }
93
94 fn has_pending(&self) -> bool {
95 !self.pending.is_empty()
96 }
97
98 fn connection_state(&self) -> ConnectionState {
99 ConnectionState::Connected
100 }
101}
102
#[cfg(test)]
mod tests {
    use super::*;

    /// `read` returns the bytes of a file addressed by its absolute path.
    #[test]
    fn local_watcher_reads_file() {
        let scratch = tempfile::tempdir().unwrap();
        let target = scratch.path().join("test.txt");
        std::fs::write(&target, b"hello").unwrap();

        let mut watcher = LocalWatcher::new(scratch.path(), &WatchOptions::default()).unwrap();

        let target_text = target.to_string_lossy();
        let contents = watcher.read(&target_text).unwrap();
        assert_eq!(contents, b"hello");
    }

    /// A freshly created local watcher reports itself as connected.
    #[test]
    fn local_watcher_always_connected() {
        let scratch = tempfile::tempdir().unwrap();
        let watcher = LocalWatcher::new(scratch.path(), &WatchOptions::default()).unwrap();

        assert_eq!(watcher.connection_state(), ConnectionState::Connected);
    }
}