Skip to main content

ix/
watcher.rs

1//! File system watcher (notify crate) with debouncing.
2
3use crate::error::Result;
4use crossbeam_channel::Receiver;
5use llmosafe::llmosafe_kernel::{ReasoningLoop, SiftedSynapse};
6use llmosafe::{ResourceGuard, Synapse, WorkingMemory};
7use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher as _};
8use std::collections::HashSet;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc;
11use std::thread;
12use std::time::Duration;
13
14/// A file-system watcher that detects changes in a directory tree and
15/// batches them into debounced event batches.
16pub struct Watcher {
17    root: PathBuf,
18    inner: Option<RecommendedWatcher>,
19    join_handle: Option<thread::JoinHandle<()>>,
20}
21
22impl Watcher {
23    /// Create a new watcher for the given root directory.
24    ///
25    /// The watcher is not started yet; call [`Watcher::start`] to begin
26    /// receiving events.
27    #[must_use]
28    pub fn new(root: &Path) -> Self {
29        Self {
30            root: root.to_owned(),
31            inner: None,
32            join_handle: None,
33        }
34    }
35
36    /// Start watching the file system for changes.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if the filesystem watcher cannot be created.
41    #[allow(clippy::too_many_lines)]
42    pub fn start(&mut self) -> Result<Receiver<Vec<PathBuf>>> {
43        let (tx, rx) = crossbeam_channel::unbounded();
44        let (event_tx, event_rx) = mpsc::channel();
45
46        let mut watcher = RecommendedWatcher::new(event_tx, Config::default())?;
47
48        // Attempt recursive watch on root first (most efficient)
49        if let Err(err) = watcher.watch(&self.root, RecursiveMode::Recursive) {
50            eprintln!("ix: warning: recursive watch failed: {err}. Falling back to manual walk.");
51
52            let walker = ignore::WalkBuilder::new(&self.root)
53                .hidden(false)
54                .git_ignore(true)
55                .require_git(false)
56                .add_custom_ignore_filename(".ixignore")
57                .filter_entry(move |entry| {
58                    let path = entry.path();
59                    let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
60
61                    // Built-in directory defaults
62                    if entry.file_type().is_some_and(|t| t.is_dir())
63                        && (name == "lost+found"
64                            || name == ".git"
65                            || name == "node_modules"
66                            || name == "target"
67                            || name == "__pycache__"
68                            || name == ".tox"
69                            || name == ".venv"
70                            || name == "venv"
71                            || name == ".ix")
72                    {
73                        return false;
74                    }
75
76                    // Built-in file noise defaults
77                    if entry.file_type().is_some_and(|t| t.is_file()) {
78                        if let Ok(metadata) = entry.metadata()
79                            && metadata.len() > 10 * 1024 * 1024
80                        {
81                            return false;
82                        }
83                        if name == "Cargo.lock"
84                            || name == "package-lock.json"
85                            || name == "pnpm-lock.yaml"
86                            || name == "shard.ix"
87                            || name == "shard.ix.tmp"
88                        {
89                            return false;
90                        }
91                    }
92
93                    // Built-in file extension defaults
94                    if entry.file_type().is_some_and(|t| t.is_file()) {
95                        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
96                        match ext {
97                            // Binary extensions
98                            "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" |
99                            // Media
100                            "jpg" | "png" | "gif" | "mp4" | "mp3" | "pdf" |
101                            // Archives
102                            "zip" | "7z" | "rar" |
103                            // Data
104                            "sqlite" | "db" | "bin" => return false,
105                            _ => {}
106                        }
107                        if name.ends_with(".tar.gz") {
108                            return false;
109                        }
110                    }
111                    true
112                })
113                .build();
114
115            let mut loop_guard = ReasoningLoop::<20000>::new();
116            let guard = ResourceGuard::auto(0.5);
117            let mut memory = WorkingMemory::<64>::new(1000);
118
119            for result in walker {
120                // Cognitive Stability check: entropy must be stable (RSS < 50% ceiling for the walk)
121                let synapse = guard.check().unwrap_or_else(|_| {
122                    let mut s = Synapse::new();
123                    s.set_raw_entropy(1500); // 1.0 ratio
124                    s
125                });
126
127                let sifted = SiftedSynapse::new(synapse);
128                let validated = match memory.update(sifted) {
129                    Ok(v) => v,
130                    Err(e) => {
131                        eprintln!(
132                            "ix: critical safety halt during watcher walk: {e:?}. Directory tree too large or RAM low."
133                        );
134                        break;
135                    }
136                };
137
138                if let Err(e) = loop_guard.next_step(validated) {
139                    eprintln!("ix: critical reasoning depth exceeded: {e:?}.");
140                    break;
141                }
142
143                match result {
144                    Ok(entry) => {
145                        if entry.file_type().is_some_and(|t| t.is_dir()) {
146                            let path = entry.path();
147                            if let Err(e) = watcher.watch(path, RecursiveMode::NonRecursive) {
148                                eprintln!(
149                                    "ix: warning: watcher failed for {}: {}",
150                                    path.display(),
151                                    e
152                                );
153                            }
154                        }
155                    }
156                    Err(e) => {
157                        eprintln!("ix: warning: watcher skipping path: {e}");
158                    }
159                }
160            }
161        }
162
163        self.inner = Some(watcher);
164
165        let handle = thread::spawn(move || {
166            let mut changed_paths = HashSet::new();
167            loop {
168                // Wait for the first event
169                match event_rx.recv() {
170                    Ok(Ok(event)) => {
171                        Self::collect_paths(&mut changed_paths, event);
172
173                        // Debounce loop: keep collecting for 500ms after the last event
174                        loop {
175                            match event_rx.recv_timeout(Duration::from_millis(500)) {
176                                Ok(Ok(event)) => {
177                                    Self::collect_paths(&mut changed_paths, event);
178                                }
179                                Ok(Err(_)) => {} // notify error, skip
180                                Err(mpsc::RecvTimeoutError::Timeout) => {
181                                    // Debounce period over
182                                    if !changed_paths.is_empty() {
183                                        let paths: Vec<PathBuf> = changed_paths.drain().collect();
184                                        if tx.send(paths).is_err() {
185                                            return; // Receiver dropped
186                                        }
187                                    }
188                                    break;
189                                }
190                                Err(mpsc::RecvTimeoutError::Disconnected) => return,
191                            }
192                        }
193                    }
194                    Ok(Err(_)) => {}
195                    Err(_) => return, // Watcher dropped
196                }
197            }
198        });
199
200        self.join_handle = Some(handle);
201        Ok(rx)
202    }
203
204    /// Stop watching and join the background event-loop thread.
205    pub fn stop(&mut self) {
206        self.inner.take(); // Dropping the watcher stops events
207        if let Some(handle) = self.join_handle.take() {
208            let _ = handle.join();
209        }
210    }
211
212    /// Returns `true` while the underlying notify watcher is active.
213    #[must_use]
214    pub const fn is_running(&self) -> bool {
215        self.inner.is_some()
216    }
217
218    fn collect_paths(set: &mut HashSet<PathBuf>, event: Event) {
219        if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() {
220            for path in event.paths {
221                // Ignore the .ix directory changes to avoid loops
222                if path.components().any(|c| c.as_os_str() == ".ix") {
223                    continue;
224                }
225                set.insert(path);
226            }
227        }
228    }
229}
230
#[cfg(test)]
#[allow(clippy::as_conversions, clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use crate::error::Error;
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    /// Creating a file under the watched root must produce a batch that
    /// names that file within two seconds.
    #[test]
    fn test_watcher_basic() -> Result<()> {
        let tmp = tempdir().map_err(Error::Io)?;
        let mut watcher = Watcher::new(tmp.path());
        let batches = watcher.start()?;

        // Create, write and sync the file; the handle is closed before we
        // start waiting for the batch.
        let target = tmp.path().join("test.txt");
        File::create(&target)
            .and_then(|mut file| {
                file.write_all(b"hello")?;
                file.sync_all()
            })
            .map_err(Error::Io)?;

        let Ok(changed) = batches.recv_timeout(Duration::from_secs(2)) else {
            return Err(Error::Config("Timeout waiting for watcher event".into()));
        };

        if changed.is_empty() {
            return Err(Error::Config("No watcher events received".into()));
        }

        let saw_target = changed.iter().any(|p| p.ends_with("test.txt"));
        if !saw_target {
            return Err(Error::Config("test.txt not found in watcher events".into()));
        }

        watcher.stop();
        Ok(())
    }
}