// codeprism_core/watcher/mod.rs
use crate::error::{Error, Result};
4use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8use std::time::{Duration, Instant};
9use tokio::sync::mpsc;
10use tokio::time::sleep;
11
/// The kind of change reported for a path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChangeKind {
    /// The path was created.
    Created,
    /// The path's contents or metadata changed.
    Modified,
    /// The path was removed.
    Deleted,
    /// The path was renamed from `old` to `new`.
    Renamed { old: PathBuf, new: PathBuf },
}
29
30#[derive(Debug, Clone)]
32pub struct ChangeEvent {
33 pub repo_root: PathBuf,
35 pub path: PathBuf,
37 pub kind: ChangeKind,
39 pub timestamp: Instant,
41}
42
43impl ChangeEvent {
44 pub fn new(repo_root: PathBuf, path: PathBuf, kind: ChangeKind) -> Self {
46 Self {
47 repo_root,
48 path,
49 kind,
50 timestamp: Instant::now(),
51 }
52 }
53}
54
55struct Debouncer {
57 pending: Arc<Mutex<HashMap<PathBuf, (ChangeKind, Instant)>>>,
58 tx: mpsc::UnboundedSender<ChangeEvent>,
59 debounce_duration: Duration,
60}
61
62impl Debouncer {
63 fn new(tx: mpsc::UnboundedSender<ChangeEvent>, debounce_duration: Duration) -> Self {
65 Self {
66 pending: Arc::new(Mutex::new(HashMap::new())),
67 tx,
68 debounce_duration,
69 }
70 }
71
72 fn add_event(&self, event: ChangeEvent) {
74 let mut pending = self.pending.lock().unwrap();
75 pending.insert(event.path.clone(), (event.kind.clone(), event.timestamp));
76
77 let pending_clone = Arc::clone(&self.pending);
79 let tx = self.tx.clone();
80 let path = event.path.clone();
81 let repo_root = event.repo_root.clone();
82 let duration = self.debounce_duration;
83
84 tokio::spawn(async move {
85 sleep(duration).await;
86
87 let mut pending = pending_clone.lock().unwrap();
88 if let Some((kind, timestamp)) = pending.remove(&path) {
89 if timestamp.elapsed() >= duration {
91 let event = ChangeEvent {
92 repo_root,
93 path,
94 kind,
95 timestamp,
96 };
97 let _ = tx.send(event);
98 }
99 }
100 });
101 }
102}
103
104pub struct FileWatcher {
106 watcher: RecommendedWatcher,
107 #[allow(dead_code)] debouncer: Arc<Debouncer>,
109 change_rx: mpsc::UnboundedReceiver<ChangeEvent>,
110 watched_paths: Arc<Mutex<Vec<PathBuf>>>,
111}
112
113impl FileWatcher {
114 pub fn new() -> Result<Self> {
116 Self::with_debounce(Duration::from_millis(50))
117 }
118
119 pub fn with_debounce(debounce_duration: Duration) -> Result<Self> {
121 let (change_tx, change_rx) = mpsc::unbounded_channel();
122 let debouncer = Arc::new(Debouncer::new(change_tx.clone(), debounce_duration));
123
124 let (notify_tx, mut notify_rx) = mpsc::unbounded_channel();
126
127 let watcher = RecommendedWatcher::new(
128 move |res: notify::Result<Event>| {
129 if let Ok(event) = res {
130 let _ = notify_tx.send(event);
131 }
132 },
133 Config::default(),
134 )
135 .map_err(|e| Error::watcher(format!("Failed to create watcher: {}", e)))?;
136
137 let debouncer_clone = Arc::clone(&debouncer);
139 tokio::spawn(async move {
140 while let Some(event) = notify_rx.recv().await {
141 if let Some(path) = event.paths.first() {
144 let repo_root = path.clone();
145 if let Some(change_event) = Self::convert_event(event, repo_root) {
146 debouncer_clone.add_event(change_event);
147 }
148 }
149 }
150 });
151
152 Ok(Self {
153 watcher,
154 debouncer,
155 change_rx,
156 watched_paths: Arc::new(Mutex::new(Vec::new())),
157 })
158 }
159
160 pub fn watch_dir(&mut self, path: &Path, _repo_root: PathBuf) -> Result<()> {
162 self.watcher
163 .watch(path, RecursiveMode::Recursive)
164 .map_err(|e| Error::watcher(format!("Failed to watch {}: {}", path.display(), e)))?;
165
166 let mut paths = self.watched_paths.lock().unwrap();
167 paths.push(path.to_path_buf());
168
169 Ok(())
170 }
171
172 pub fn unwatch(&mut self, path: &Path) -> Result<()> {
174 self.watcher
175 .unwatch(path)
176 .map_err(|e| Error::watcher(format!("Failed to unwatch {}: {}", path.display(), e)))?;
177
178 let mut paths = self.watched_paths.lock().unwrap();
179 paths.retain(|p| p != path);
180
181 Ok(())
182 }
183
184 pub async fn next_change(&mut self) -> Option<ChangeEvent> {
186 self.change_rx.recv().await
187 }
188
189 fn convert_event(event: Event, repo_root: PathBuf) -> Option<ChangeEvent> {
191 let path = event.paths.first()?.clone();
192
193 let kind = match event.kind {
194 EventKind::Create(_) => ChangeKind::Created,
195 EventKind::Modify(_) => ChangeKind::Modified,
196 EventKind::Remove(_) => ChangeKind::Deleted,
197 EventKind::Any => ChangeKind::Modified,
198 _ => return None,
199 };
200
201 Some(ChangeEvent::new(repo_root, path, kind))
202 }
203}
204
205impl Default for FileWatcher {
206 fn default() -> Self {
207 Self::new().expect("Failed to create file watcher")
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// A watcher with the default debounce window can be constructed.
    #[tokio::test]
    async fn test_file_watcher_creation() {
        assert!(FileWatcher::new().is_ok());
    }

    /// A single event is forwarded after the debounce window has passed.
    #[tokio::test]
    async fn test_debouncer() {
        let (sender, mut receiver) = mpsc::unbounded_channel();
        let debouncer = Debouncer::new(sender, Duration::from_millis(50));

        debouncer.add_event(ChangeEvent::new(
            PathBuf::from("/repo"),
            PathBuf::from("/repo/file.txt"),
            ChangeKind::Modified,
        ));

        // Wait past the 50 ms window so the timer task has had time to fire.
        sleep(Duration::from_millis(100)).await;

        assert!(receiver.recv().await.is_some());
    }

    /// Watching a temporary directory succeeds and a write beneath it does
    /// not make the watcher misbehave; no event contents are asserted.
    #[tokio::test]
    async fn test_watch_directory() {
        let temp_dir = TempDir::new().unwrap();
        let mut watcher = FileWatcher::new().unwrap();

        let root = temp_dir.path();
        assert!(watcher.watch_dir(root, root.to_path_buf()).is_ok());

        fs::write(root.join("test.txt"), "test content").unwrap();

        // Give the backend a moment to deliver the raw event.
        sleep(Duration::from_millis(100)).await;
    }
}