Skip to main content

ix/
watcher.rs

1//! File system watcher (notify crate) with debouncing.
2
3use crate::error::Result;
4use crossbeam_channel::Receiver;
5use llmosafe::llmosafe_kernel::{ReasoningLoop, SiftedSynapse};
6use llmosafe::{ResourceGuard, Synapse, WorkingMemory};
7use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher as _};
8use std::collections::HashSet;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc;
11use std::thread;
12use std::time::Duration;
13
14pub struct Watcher {
15    root: PathBuf,
16    watcher: Option<RecommendedWatcher>,
17    thread: Option<thread::JoinHandle<()>>,
18}
19
20impl Watcher {
21    pub fn new(root: &Path) -> Result<Self> {
22        Ok(Self {
23            root: root.to_owned(),
24            watcher: None,
25            thread: None,
26        })
27    }
28
29    pub fn start(&mut self) -> Result<Receiver<Vec<PathBuf>>> {
30        let (tx, rx) = crossbeam_channel::unbounded();
31        let (event_tx, event_rx) = mpsc::channel();
32
33        let mut watcher = RecommendedWatcher::new(event_tx, Config::default())?;
34
35        // Attempt recursive watch on root first (most efficient)
36        if let Err(err) = watcher.watch(&self.root, RecursiveMode::Recursive) {
37            eprintln!(
38                "ix: warning: recursive watch failed: {}. Falling back to manual walk.",
39                err
40            );
41
42            let walker = ignore::WalkBuilder::new(&self.root)
43                .hidden(false)
44                .git_ignore(true)
45                .require_git(false)
46                .add_custom_ignore_filename(".ixignore")
47                .filter_entry(move |entry| {
48                    let path = entry.path();
49                    let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
50
51                    // Built-in directory defaults
52                    if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
53                        && (name == "lost+found"
54                            || name == ".git"
55                            || name == "node_modules"
56                            || name == "target"
57                            || name == "__pycache__"
58                            || name == ".tox"
59                            || name == ".venv"
60                            || name == "venv"
61                            || name == ".ix")
62                    {
63                        return false;
64                    }
65
66                    // Built-in file noise defaults
67                    if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
68                        if let Ok(metadata) = entry.metadata()
69                            && metadata.len() > 10 * 1024 * 1024
70                        {
71                            return false;
72                        }
73                        if name == "Cargo.lock"
74                            || name == "package-lock.json"
75                            || name == "pnpm-lock.yaml"
76                            || name == "shard.ix"
77                            || name == "shard.ix.tmp"
78                        {
79                            return false;
80                        }
81                    }
82
83                    // Built-in file extension defaults
84                    if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
85                        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
86                        match ext {
87                            // Binary extensions
88                            "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" |
89                            // Media
90                            "jpg" | "png" | "gif" | "mp4" | "mp3" | "pdf" |
91                            // Archives
92                            "zip" | "7z" | "rar" |
93                            // Data
94                            "sqlite" | "db" | "bin" => return false,
95                            _ => {}
96                        }
97                        if name.ends_with(".tar.gz") {
98                            return false;
99                        }
100                    }
101                    true
102                })
103                .build();
104
105            let mut loop_guard = ReasoningLoop::<20000>::new();
106            let guard = ResourceGuard::auto(0.5);
107            let mut memory = WorkingMemory::<64>::new(1000);
108
109            for result in walker {
110                // Cognitive Stability check: entropy must be stable (RSS < 50% ceiling for the walk)
111                let synapse = guard.check().unwrap_or_else(|_| {
112                    let mut s = Synapse::new();
113                    s.set_raw_entropy(1500); // 1.0 ratio
114                    s
115                });
116
117                let sifted = SiftedSynapse::new(synapse);
118                let validated = match memory.update(sifted) {
119                    Ok(v) => v,
120                    Err(e) => {
121                        eprintln!(
122                            "ix: critical safety halt during watcher walk: {:?}. Directory tree too large or RAM low.",
123                            e
124                        );
125                        break;
126                    }
127                };
128
129                if let Err(e) = loop_guard.next_step(validated) {
130                    eprintln!("ix: critical reasoning depth exceeded: {:?}.", e);
131                    break;
132                }
133
134                match result {
135                    Ok(entry) => {
136                        if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
137                            let path = entry.path();
138                            if let Err(e) = watcher.watch(path, RecursiveMode::NonRecursive) {
139                                eprintln!(
140                                    "ix: warning: watcher failed for {}: {}",
141                                    path.display(),
142                                    e
143                                );
144                            }
145                        }
146                    }
147                    Err(e) => {
148                        eprintln!("ix: warning: watcher skipping path: {}", e);
149                    }
150                }
151            }
152        }
153
154        self.watcher = Some(watcher);
155
156        let handle = thread::spawn(move || {
157            let mut changed_paths = HashSet::new();
158            loop {
159                // Wait for the first event
160                match event_rx.recv() {
161                    Ok(Ok(event)) => {
162                        Self::collect_paths(&mut changed_paths, event);
163
164                        // Debounce loop: keep collecting for 500ms after the last event
165                        loop {
166                            match event_rx.recv_timeout(Duration::from_millis(500)) {
167                                Ok(Ok(event)) => {
168                                    Self::collect_paths(&mut changed_paths, event);
169                                }
170                                Ok(Err(_)) => continue, // notify error, skip
171                                Err(mpsc::RecvTimeoutError::Timeout) => {
172                                    // Debounce period over
173                                    if !changed_paths.is_empty() {
174                                        let paths: Vec<PathBuf> = changed_paths.drain().collect();
175                                        if tx.send(paths).is_err() {
176                                            return; // Receiver dropped
177                                        }
178                                    }
179                                    break;
180                                }
181                                Err(mpsc::RecvTimeoutError::Disconnected) => return,
182                            }
183                        }
184                    }
185                    Ok(Err(_)) => continue,
186                    Err(_) => return, // Watcher dropped
187                }
188            }
189        });
190
191        self.thread = Some(handle);
192        Ok(rx)
193    }
194
195    pub fn stop(&mut self) {
196        self.watcher.take(); // Dropping the watcher stops events
197        if let Some(handle) = self.thread.take() {
198            let _ = handle.join();
199        }
200    }
201
202    pub fn is_running(&self) -> bool {
203        self.watcher.is_some()
204    }
205
206    fn collect_paths(set: &mut HashSet<PathBuf>, event: Event) {
207        if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() {
208            for path in event.paths {
209                // Ignore the .ix directory changes to avoid loops
210                if path.components().any(|c| c.as_os_str() == ".ix") {
211                    continue;
212                }
213                set.insert(path);
214            }
215        }
216    }
217}
218
219#[cfg(test)]
220mod tests {
221    use super::*;
222    use crate::error::Error;
223    use std::fs::File;
224    use std::io::Write;
225    use tempfile::tempdir;
226
227    #[test]
228    fn test_watcher_basic() -> Result<()> {
229        let dir = tempdir().map_err(Error::Io)?;
230        let mut watcher = Watcher::new(dir.path())?;
231        let rx = watcher.start()?;
232
233        let file_path = dir.path().join("test.txt");
234        {
235            let mut file = File::create(&file_path).map_err(Error::Io)?;
236            file.write_all(b"hello").map_err(Error::Io)?;
237            file.sync_all().map_err(Error::Io)?;
238        }
239
240        let events = rx
241            .recv_timeout(Duration::from_secs(2))
242            .map_err(|_| Error::Config("Timeout waiting for watcher event".into()))?;
243
244        assert!(!events.is_empty());
245        assert!(events.iter().any(|p: &PathBuf| p.ends_with("test.txt")));
246
247        watcher.stop();
248        Ok(())
249    }
250}