// ix/watcher.rs

//! File system watcher (notify crate) with debouncing.

use crate::error::Result;
use crossbeam_channel::Receiver;
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher as _};
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

12pub struct Watcher {
13    root: PathBuf,
14    watcher: Option<RecommendedWatcher>,
15    thread: Option<thread::JoinHandle<()>>,
16}
17
18impl Watcher {
19    pub fn new(root: &Path) -> Result<Self> {
20        Ok(Self {
21            root: root.to_owned(),
22            watcher: None,
23            thread: None,
24        })
25    }
26
27    pub fn start(&mut self) -> Result<Receiver<Vec<PathBuf>>> {
28        let (tx, rx) = crossbeam_channel::unbounded();
29        let (event_tx, event_rx) = mpsc::channel();
30
31        let mut watcher = RecommendedWatcher::new(event_tx, Config::default())?;
32        
33        // Instead of recursive watch which crashes on permission denied, 
34        // we walk and add non-recursive watches to each directory.
35        let walker = ignore::WalkBuilder::new(&self.root)
36            .hidden(false)
37            .git_ignore(true)
38            .require_git(false)
39            .add_custom_ignore_filename(".ixignore")
40            .filter_entry(move |entry| {
41                let path = entry.path();
42                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
43                
44                // Built-in directory defaults
45                if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
46                    && (name == "lost+found" || name == ".git" || name == "node_modules" || 
47                       name == "target" || name == "__pycache__" || name == ".tox" || 
48                       name == ".venv" || name == "venv" || name == ".ix") 
49                {
50                    return false;
51                }
52
53                // Built-in file extension defaults
54                if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
55                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
56                    match ext {
57                        // Binary extensions
58                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" |
59                        // Media
60                        "jpg" | "png" | "gif" | "mp4" | "mp3" | "pdf" |
61                        // Archives
62                        "zip" | "7z" | "rar" |
63                        // Data
64                        "sqlite" | "db" | "bin" => return false,
65                        _ => {}
66                    }
67                    if name.ends_with(".tar.gz") {
68                        return false;
69                    }
70                }
71                true
72            })
73            .build();
74
75        for result in walker {
76            match result {
77                Ok(entry) => {
78                    if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
79                        let path = entry.path();
80                        if let Err(e) = watcher.watch(path, RecursiveMode::NonRecursive) {
81                            eprintln!("ix: warning: watcher failed for {}: {}", path.display(), e);
82                        }
83                    }
84                }
85                Err(e) => {
86                    eprintln!("ix: warning: watcher skipping path: {}", e);
87                }
88            }
89        }
90
91        self.watcher = Some(watcher);
92
93        let handle = thread::spawn(move || {
94            let mut changed_paths = HashSet::new();
95            loop {
96                // Wait for the first event
97                match event_rx.recv() {
98                    Ok(Ok(event)) => {
99                        Self::collect_paths(&mut changed_paths, event);
100
101                        // Debounce loop: keep collecting for 500ms after the last event
102                        loop {
103                            match event_rx.recv_timeout(Duration::from_millis(500)) {
104                                Ok(Ok(event)) => {
105                                    Self::collect_paths(&mut changed_paths, event);
106                                }
107                                Ok(Err(_)) => continue, // notify error, skip
108                                Err(mpsc::RecvTimeoutError::Timeout) => {
109                                    // Debounce period over
110                                    if !changed_paths.is_empty() {
111                                        let paths: Vec<PathBuf> = changed_paths.drain().collect();
112                                        if tx.send(paths).is_err() {
113                                            return; // Receiver dropped
114                                        }
115                                    }
116                                    break;
117                                }
118                                Err(mpsc::RecvTimeoutError::Disconnected) => return,
119                            }
120                        }
121                    }
122                    Ok(Err(_)) => continue,
123                    Err(_) => return, // Watcher dropped
124                }
125            }
126        });
127
128        self.thread = Some(handle);
129        Ok(rx)
130    }
131
132    pub fn stop(&mut self) {
133        self.watcher.take(); // Dropping the watcher stops events
134        if let Some(handle) = self.thread.take() {
135            let _ = handle.join();
136        }
137    }
138
139    pub fn is_running(&self) -> bool {
140        self.watcher.is_some()
141    }
142
143    fn collect_paths(set: &mut HashSet<PathBuf>, event: Event) {
144        if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() {
145            for path in event.paths {
146                // Ignore the .ix directory changes to avoid loops
147                if path.components().any(|c| c.as_os_str() == ".ix") {
148                    continue;
149                }
150                set.insert(path);
151            }
152        }
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;
    use notify::event::CreateKind;
    use notify::EventKind;
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    /// End-to-end check: creating a file under the watched root yields a batch
    /// that contains that file's path.
    #[test]
    fn test_watcher_basic() -> Result<()> {
        let dir = tempdir().map_err(Error::Io)?;
        let mut watcher = Watcher::new(dir.path())?;
        let rx = watcher.start()?;

        let file_path = dir.path().join("test.txt");
        {
            let mut file = File::create(&file_path).map_err(Error::Io)?;
            file.write_all(b"hello").map_err(Error::Io)?;
            file.sync_all().map_err(Error::Io)?;
        }

        // The debounce window is 500ms, so 2s leaves headroom for slow CI.
        let events = rx
            .recv_timeout(Duration::from_secs(2))
            .map_err(|_| Error::Config("Timeout waiting for watcher event".into()))?;

        assert!(!events.is_empty());
        assert!(events.iter().any(|p: &PathBuf| p.ends_with("test.txt")));

        watcher.stop();
        Ok(())
    }

    /// `collect_paths` keeps create/modify/remove paths, drops anything under
    /// a `.ix` component, and ignores other event kinds entirely.
    #[test]
    fn collect_paths_filters_events() {
        let mut set = HashSet::new();

        let created = Event::new(EventKind::Create(CreateKind::File))
            .add_path(PathBuf::from("/repo/src/main.rs"))
            .add_path(PathBuf::from("/repo/.ix/index.bin"));
        Watcher::collect_paths(&mut set, created);
        assert_eq!(set.len(), 1);
        assert!(set.contains(Path::new("/repo/src/main.rs")));

        let unrelated = Event::new(EventKind::Any).add_path(PathBuf::from("/repo/src/lib.rs"));
        Watcher::collect_paths(&mut set, unrelated);
        assert_eq!(set.len(), 1);
    }
}