Skip to main content

ix/
watcher.rs

1//! File system watcher (notify crate) with debouncing.
2
3use crate::error::Result;
4use crossbeam_channel::Receiver;
5use llmosafe::llmosafe_kernel::{ReasoningLoop, SiftedSynapse};
6use llmosafe::{ResourceGuard, Synapse, WorkingMemory};
7use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher as _};
8use std::collections::HashSet;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc;
11use std::thread;
12use std::time::Duration;
13
14/// A file-system watcher that detects changes in a directory tree and
15/// batches them into debounced event batches.
16pub struct Watcher {
17    root: PathBuf,
18    inner: Option<RecommendedWatcher>,
19    join_handle: Option<thread::JoinHandle<()>>,
20}
21
22impl Watcher {
23/// Create a new watcher for the given root directory.
24    ///
25    /// The watcher is not started yet; call [`Watcher::start`] to begin
26    /// receiving events.
27    #[must_use]
28    pub fn new(root: &Path) -> Self {
29        Self {
30            root: root.to_owned(),
31            inner: None,
32            join_handle: None,
33        }
34    }
35
36    /// Start watching the file system for changes.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if the filesystem watcher cannot be created.
41    #[allow(clippy::too_many_lines)]
42    pub fn start(&mut self) -> Result<Receiver<Vec<PathBuf>>> {
43        let (tx, rx) = crossbeam_channel::unbounded();
44        let (event_tx, event_rx) = mpsc::channel();
45
46        let mut watcher = RecommendedWatcher::new(event_tx, Config::default())?;
47
48        // Attempt recursive watch on root first (most efficient)
49        if let Err(err) = watcher.watch(&self.root, RecursiveMode::Recursive) {
50            eprintln!(
51                "ix: warning: recursive watch failed: {err}. Falling back to manual walk."
52            );
53
54            let walker = ignore::WalkBuilder::new(&self.root)
55                .hidden(false)
56                .git_ignore(true)
57                .require_git(false)
58                .add_custom_ignore_filename(".ixignore")
59                .filter_entry(move |entry| {
60                    let path = entry.path();
61                    let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
62
63                    // Built-in directory defaults
64                    if entry.file_type().is_some_and(|t| t.is_dir())
65                        && (name == "lost+found"
66                            || name == ".git"
67                            || name == "node_modules"
68                            || name == "target"
69                            || name == "__pycache__"
70                            || name == ".tox"
71                            || name == ".venv"
72                            || name == "venv"
73                            || name == ".ix")
74                    {
75                        return false;
76                    }
77
78                    // Built-in file noise defaults
79                    if entry.file_type().is_some_and(|t| t.is_file()) {
80                        if let Ok(metadata) = entry.metadata()
81                            && metadata.len() > 10 * 1024 * 1024
82                        {
83                            return false;
84                        }
85                        if name == "Cargo.lock"
86                            || name == "package-lock.json"
87                            || name == "pnpm-lock.yaml"
88                            || name == "shard.ix"
89                            || name == "shard.ix.tmp"
90                        {
91                            return false;
92                        }
93                    }
94
95                    // Built-in file extension defaults
96                    if entry.file_type().is_some_and(|t| t.is_file()) {
97                        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
98                        match ext {
99                            // Binary extensions
100                            "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" |
101                            // Media
102                            "jpg" | "png" | "gif" | "mp4" | "mp3" | "pdf" |
103                            // Archives
104                            "zip" | "7z" | "rar" |
105                            // Data
106                            "sqlite" | "db" | "bin" => return false,
107                            _ => {}
108                        }
109                        if name.ends_with(".tar.gz") {
110                            return false;
111                        }
112                    }
113                    true
114                })
115                .build();
116
117            let mut loop_guard = ReasoningLoop::<20000>::new();
118            let guard = ResourceGuard::auto(0.5);
119            let mut memory = WorkingMemory::<64>::new(1000);
120
121            for result in walker {
122                // Cognitive Stability check: entropy must be stable (RSS < 50% ceiling for the walk)
123                let synapse = guard.check().unwrap_or_else(|_| {
124                    let mut s = Synapse::new();
125                    s.set_raw_entropy(1500); // 1.0 ratio
126                    s
127                });
128
129                let sifted = SiftedSynapse::new(synapse);
130                let validated = match memory.update(sifted) {
131                    Ok(v) => v,
132                    Err(e) => {
133                        eprintln!(
134                            "ix: critical safety halt during watcher walk: {e:?}. Directory tree too large or RAM low."
135                        );
136                        break;
137                    }
138                };
139
140                if let Err(e) = loop_guard.next_step(validated) {
141                    eprintln!("ix: critical reasoning depth exceeded: {e:?}.");
142                    break;
143                }
144
145                match result {
146                    Ok(entry) => {
147                        if entry.file_type().is_some_and(|t| t.is_dir()) {
148                            let path = entry.path();
149                            if let Err(e) = watcher.watch(path, RecursiveMode::NonRecursive) {
150                                eprintln!(
151                                    "ix: warning: watcher failed for {}: {}",
152                                    path.display(),
153                                    e
154                                );
155                            }
156                        }
157                    }
158                    Err(e) => {
159                        eprintln!("ix: warning: watcher skipping path: {e}");
160                    }
161                }
162            }
163        }
164
165        self.inner = Some(watcher);
166
167        let handle = thread::spawn(move || {
168            let mut changed_paths = HashSet::new();
169            loop {
170                // Wait for the first event
171                match event_rx.recv() {
172                    Ok(Ok(event)) => {
173                        Self::collect_paths(&mut changed_paths, event);
174
175                        // Debounce loop: keep collecting for 500ms after the last event
176                        loop {
177                            match event_rx.recv_timeout(Duration::from_millis(500)) {
178                                Ok(Ok(event)) => {
179                                    Self::collect_paths(&mut changed_paths, event);
180                                }
181                                Ok(Err(_)) => {} // notify error, skip
182                                Err(mpsc::RecvTimeoutError::Timeout) => {
183                                    // Debounce period over
184                                    if !changed_paths.is_empty() {
185                                        let paths: Vec<PathBuf> = changed_paths.drain().collect();
186                                        if tx.send(paths).is_err() {
187                                            return; // Receiver dropped
188                                        }
189                                    }
190                                    break;
191                                }
192                                Err(mpsc::RecvTimeoutError::Disconnected) => return,
193                            }
194                        }
195                    }
196                    Ok(Err(_)) => {},
197                    Err(_) => return, // Watcher dropped
198                }
199            }
200        });
201
202        self.join_handle = Some(handle);
203        Ok(rx)
204    }
205
206    /// Stop watching and join the background event-loop thread.
207    pub fn stop(&mut self) {
208        self.inner.take(); // Dropping the watcher stops events
209        if let Some(handle) = self.join_handle.take() {
210            let _ = handle.join();
211        }
212    }
213
214/// Returns `true` while the underlying notify watcher is active.
215    #[must_use]
216    pub const fn is_running(&self) -> bool {
217        self.inner.is_some()
218    }
219
220    fn collect_paths(set: &mut HashSet<PathBuf>, event: Event) {
221        if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() {
222            for path in event.paths {
223                // Ignore the .ix directory changes to avoid loops
224                if path.components().any(|c| c.as_os_str() == ".ix") {
225                    continue;
226                }
227                set.insert(path);
228            }
229        }
230    }
231}
232
233#[cfg(test)]
234#[allow(clippy::as_conversions, clippy::unwrap_used, clippy::indexing_slicing)]
235mod tests {
236    use super::*;
237    use crate::error::Error;
238    use std::fs::File;
239    use std::io::Write;
240    use tempfile::tempdir;
241
242    #[test]
243    fn test_watcher_basic() -> Result<()> {
244        let dir = tempdir().map_err(Error::Io)?;
245        let mut watcher = Watcher::new(dir.path());
246        let rx = watcher.start()?;
247
248        let file_path = dir.path().join("test.txt");
249        {
250            let mut file = File::create(&file_path).map_err(Error::Io)?;
251            file.write_all(b"hello").map_err(Error::Io)?;
252            file.sync_all().map_err(Error::Io)?;
253        }
254
255        let events = rx
256            .recv_timeout(Duration::from_secs(2))
257            .map_err(|_| Error::Config("Timeout waiting for watcher event".into()))?;
258
259        if events.is_empty() {
260            return Err(Error::Config("No watcher events received".into()));
261        }
262        if !events.iter().any(|p: &PathBuf| p.ends_with("test.txt")) {
263            return Err(Error::Config("test.txt not found in watcher events".into()));
264        }
265
266        watcher.stop();
267        Ok(())
268    }
269}