codeprism_core/watcher/
mod.rs

1//! File system watcher for detecting changes
2
3use crate::error::{Error, Result};
4use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8use std::time::{Duration, Instant};
9use tokio::sync::mpsc;
10use tokio::time::sleep;
11
/// The category of change observed for a file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ChangeKind {
    /// A file was created.
    Created,
    /// A file was modified.
    Modified,
    /// A file was deleted.
    Deleted,
    /// A file was moved from one path to another.
    Renamed {
        /// Path the file had before the rename
        old: PathBuf,
        /// Path the file has after the rename
        new: PathBuf,
    },
}
29
30/// File change event
31#[derive(Debug, Clone)]
32pub struct ChangeEvent {
33    /// Repository root
34    pub repo_root: PathBuf,
35    /// Changed file path
36    pub path: PathBuf,
37    /// Type of change
38    pub kind: ChangeKind,
39    /// Timestamp of the event
40    pub timestamp: Instant,
41}
42
43impl ChangeEvent {
44    /// Create a new change event
45    pub fn new(repo_root: PathBuf, path: PathBuf, kind: ChangeKind) -> Self {
46        Self {
47            repo_root,
48            path,
49            kind,
50            timestamp: Instant::now(),
51        }
52    }
53}
54
55/// Debouncer for file events
56struct Debouncer {
57    pending: Arc<Mutex<HashMap<PathBuf, (ChangeKind, Instant)>>>,
58    tx: mpsc::UnboundedSender<ChangeEvent>,
59    debounce_duration: Duration,
60}
61
62impl Debouncer {
63    /// Create a new debouncer
64    fn new(tx: mpsc::UnboundedSender<ChangeEvent>, debounce_duration: Duration) -> Self {
65        Self {
66            pending: Arc::new(Mutex::new(HashMap::new())),
67            tx,
68            debounce_duration,
69        }
70    }
71
72    /// Add an event to be debounced
73    fn add_event(&self, event: ChangeEvent) {
74        let mut pending = self.pending.lock().unwrap();
75        pending.insert(event.path.clone(), (event.kind.clone(), event.timestamp));
76
77        // Schedule flush
78        let pending_clone = Arc::clone(&self.pending);
79        let tx = self.tx.clone();
80        let path = event.path.clone();
81        let repo_root = event.repo_root.clone();
82        let duration = self.debounce_duration;
83
84        tokio::spawn(async move {
85            sleep(duration).await;
86
87            let mut pending = pending_clone.lock().unwrap();
88            if let Some((kind, timestamp)) = pending.remove(&path) {
89                // Check if enough time has passed
90                if timestamp.elapsed() >= duration {
91                    let event = ChangeEvent {
92                        repo_root,
93                        path,
94                        kind,
95                        timestamp,
96                    };
97                    let _ = tx.send(event);
98                }
99            }
100        });
101    }
102}
103
104/// File system watcher
105pub struct FileWatcher {
106    watcher: RecommendedWatcher,
107    #[allow(dead_code)] // Used indirectly through Arc
108    debouncer: Arc<Debouncer>,
109    change_rx: mpsc::UnboundedReceiver<ChangeEvent>,
110    watched_paths: Arc<Mutex<Vec<PathBuf>>>,
111}
112
113impl FileWatcher {
114    /// Create a new file watcher with default 50ms debounce
115    pub fn new() -> Result<Self> {
116        Self::with_debounce(Duration::from_millis(50))
117    }
118
119    /// Create a new file watcher with custom debounce duration
120    pub fn with_debounce(debounce_duration: Duration) -> Result<Self> {
121        let (change_tx, change_rx) = mpsc::unbounded_channel();
122        let debouncer = Arc::new(Debouncer::new(change_tx.clone(), debounce_duration));
123
124        // Create a channel for notify events
125        let (notify_tx, mut notify_rx) = mpsc::unbounded_channel();
126
127        let watcher = RecommendedWatcher::new(
128            move |res: notify::Result<Event>| {
129                if let Ok(event) = res {
130                    let _ = notify_tx.send(event);
131                }
132            },
133            Config::default(),
134        )
135        .map_err(|e| Error::watcher(format!("Failed to create watcher: {}", e)))?;
136
137        // Start event processor
138        let debouncer_clone = Arc::clone(&debouncer);
139        tokio::spawn(async move {
140            while let Some(event) = notify_rx.recv().await {
141                // For now, we'll use a placeholder repo_root
142                // In real usage, this would be tracked per watched path
143                if let Some(path) = event.paths.first() {
144                    let repo_root = path.clone();
145                    if let Some(change_event) = Self::convert_event(event, repo_root) {
146                        debouncer_clone.add_event(change_event);
147                    }
148                }
149            }
150        });
151
152        Ok(Self {
153            watcher,
154            debouncer,
155            change_rx,
156            watched_paths: Arc::new(Mutex::new(Vec::new())),
157        })
158    }
159
160    /// Watch a directory recursively
161    pub fn watch_dir(&mut self, path: &Path, _repo_root: PathBuf) -> Result<()> {
162        self.watcher
163            .watch(path, RecursiveMode::Recursive)
164            .map_err(|e| Error::watcher(format!("Failed to watch {}: {}", path.display(), e)))?;
165
166        let mut paths = self.watched_paths.lock().unwrap();
167        paths.push(path.to_path_buf());
168
169        Ok(())
170    }
171
172    /// Stop watching a directory
173    pub fn unwatch(&mut self, path: &Path) -> Result<()> {
174        self.watcher
175            .unwatch(path)
176            .map_err(|e| Error::watcher(format!("Failed to unwatch {}: {}", path.display(), e)))?;
177
178        let mut paths = self.watched_paths.lock().unwrap();
179        paths.retain(|p| p != path);
180
181        Ok(())
182    }
183
184    /// Get the next change event
185    pub async fn next_change(&mut self) -> Option<ChangeEvent> {
186        self.change_rx.recv().await
187    }
188
189    /// Convert notify event to our ChangeEvent
190    fn convert_event(event: Event, repo_root: PathBuf) -> Option<ChangeEvent> {
191        let path = event.paths.first()?.clone();
192
193        let kind = match event.kind {
194            EventKind::Create(_) => ChangeKind::Created,
195            EventKind::Modify(_) => ChangeKind::Modified,
196            EventKind::Remove(_) => ChangeKind::Deleted,
197            EventKind::Any => ChangeKind::Modified,
198            _ => return None,
199        };
200
201        Some(ChangeEvent::new(repo_root, path, kind))
202    }
203}
204
205impl Default for FileWatcher {
206    fn default() -> Self {
207        Self::new().expect("Failed to create file watcher")
208    }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use notify::event::{AccessKind, CreateKind, RemoveKind};
    use std::fs;
    use tempfile::TempDir;

    /// A watcher can be constructed inside a Tokio runtime.
    #[tokio::test]
    async fn test_file_watcher_creation() {
        let watcher = FileWatcher::new();
        assert!(watcher.is_ok());
    }

    /// Raw notify events map onto the expected change kinds.
    #[test]
    fn test_convert_event_maps_basic_kinds() {
        let root = PathBuf::from("/repo");
        let file = PathBuf::from("/repo/file.txt");

        let created = FileWatcher::convert_event(
            Event::new(EventKind::Create(CreateKind::File)).add_path(file.clone()),
            root.clone(),
        )
        .expect("create events must be converted");
        assert_eq!(created.kind, ChangeKind::Created);
        assert_eq!(created.path, file);
        assert_eq!(created.repo_root, root);

        let removed = FileWatcher::convert_event(
            Event::new(EventKind::Remove(RemoveKind::File)).add_path(file.clone()),
            root.clone(),
        )
        .expect("remove events must be converted");
        assert_eq!(removed.kind, ChangeKind::Deleted);

        // Access events carry no change and are ignored.
        let accessed = FileWatcher::convert_event(
            Event::new(EventKind::Access(AccessKind::Any)).add_path(file),
            root.clone(),
        );
        assert!(accessed.is_none());

        // An event without any path cannot be attributed to a file.
        assert!(FileWatcher::convert_event(Event::new(EventKind::Any), root).is_none());
    }

    /// A single event is delivered after the debounce interval.
    #[tokio::test]
    async fn test_debouncer() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let debouncer = Debouncer::new(tx, Duration::from_millis(50));

        let event = ChangeEvent::new(
            PathBuf::from("/repo"),
            PathBuf::from("/repo/file.txt"),
            ChangeKind::Modified,
        );

        debouncer.add_event(event);

        // Bound the wait: the debouncer keeps the sender alive, so a plain
        // `recv().await` would hang forever if the event were never sent.
        let received = tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("debounced event was not delivered in time");

        let received = received.expect("channel closed without delivering the event");
        assert_eq!(received.path, PathBuf::from("/repo/file.txt"));
        assert_eq!(received.repo_root, PathBuf::from("/repo"));
        assert_eq!(received.kind, ChangeKind::Modified);
    }

    /// Smoke test: a directory can be watched and unwatched while files change.
    #[tokio::test]
    async fn test_watch_directory() {
        let temp_dir = TempDir::new().unwrap();
        let mut watcher = FileWatcher::new().unwrap();

        let result = watcher.watch_dir(temp_dir.path(), temp_dir.path().to_path_buf());
        assert!(result.is_ok());

        // Create a file
        let file_path = temp_dir.path().join("test.txt");
        fs::write(&file_path, "test content").unwrap();

        // Give the backend time to report the change. Delivery is not
        // asserted here because event timing differs between platforms.
        sleep(Duration::from_millis(100)).await;

        assert!(watcher.unwatch(temp_dir.path()).is_ok());
    }
}