Skip to main content

ix/
watcher.rs

1//! File system watcher (notify crate) with debouncing.
2
3use crate::error::Result;
4use crossbeam_channel::Receiver;
5use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher as _};
6use std::collections::HashSet;
7use std::path::{Path, PathBuf};
8use std::sync::mpsc;
9use std::thread;
10use std::time::Duration;
11use llmosafe::llmosafe_kernel::{ReasoningLoop, ValidatedSynapse, SiftedSynapse};
12use llmosafe::{ResourceGuard, Synapse, WorkingMemory};
13
14pub struct Watcher {
15    root: PathBuf,
16    watcher: Option<RecommendedWatcher>,
17    thread: Option<thread::JoinHandle<()>>,
18}
19
20impl Watcher {
21    pub fn new(root: &Path) -> Result<Self> {
22        Ok(Self {
23            root: root.to_owned(),
24            watcher: None,
25            thread: None,
26        })
27    }
28
29    pub fn start(&mut self) -> Result<Receiver<Vec<PathBuf>>> {
30        let (tx, rx) = crossbeam_channel::unbounded();
31        let (event_tx, event_rx) = mpsc::channel();
32
33        let mut watcher = RecommendedWatcher::new(event_tx, Config::default())?;
34        
35        // Attempt recursive watch on root first (most efficient)
36        if let Err(err) = watcher.watch(&self.root, RecursiveMode::Recursive) {
37            eprintln!("ix: warning: recursive watch failed: {}. Falling back to manual walk.", err);
38            
39            let walker = ignore::WalkBuilder::new(&self.root)
40                .hidden(false)
41                .git_ignore(true)
42                .require_git(false)
43                .add_custom_ignore_filename(".ixignore")
44                .filter_entry(move |entry| {
45                    let path = entry.path();
46                    let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
47                    
48                    // Built-in directory defaults
49                    if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
50                        && (name == "lost+found" || name == ".git" || name == "node_modules" || 
51                           name == "target" || name == "__pycache__" || name == ".tox" || 
52                           name == ".venv" || name == "venv" || name == ".ix") 
53                    {
54                        return false;
55                    }
56
57                    // Built-in file noise defaults
58                    if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
59                        if let Ok(metadata) = entry.metadata()
60                            && metadata.len() > 10 * 1024 * 1024
61                        {
62                            return false;
63                        }
64                        if name == "Cargo.lock" || name == "package-lock.json" || name == "pnpm-lock.yaml" || 
65                           name == "shard.ix" || name == "shard.ix.tmp"
66                        {
67                            return false;
68                        }
69                    }
70
71                    // Built-in file extension defaults
72                    if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
73                        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
74                        match ext {
75                            // Binary extensions
76                            "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" |
77                            // Media
78                            "jpg" | "png" | "gif" | "mp4" | "mp3" | "pdf" |
79                            // Archives
80                            "zip" | "7z" | "rar" |
81                            // Data
82                            "sqlite" | "db" | "bin" => return false,
83                            _ => {}
84                        }
85                        if name.ends_with(".tar.gz") {
86                            return false;
87                        }
88                    }
89                    true
90                })
91                .build();
92            
93            let mut loop_guard = ReasoningLoop::<20000>::new();
94            let guard = ResourceGuard::auto(0.5);
95            let mut memory = WorkingMemory::<64>::new(1000);
96
97            for result in walker {
98                // Cognitive Stability check: entropy must be stable (RSS < 50% ceiling for the walk)
99                let synapse = guard.check().unwrap_or_else(|_| {
100                    let mut s = Synapse::new();
101                    s.set_raw_entropy(1500); // 1.0 ratio
102                    s
103                });
104
105                let sifted = SiftedSynapse::new(synapse);
106                let validated = match memory.update(sifted) {
107                    Ok(v) => v,
108                    Err(e) => {
109                        eprintln!("ix: critical safety halt during watcher walk: {:?}. Directory tree too large or RAM low.", e);
110                        break;
111                    }
112                };
113
114                if let Err(e) = loop_guard.next_step(validated) {
115                    eprintln!("ix: critical reasoning depth exceeded: {:?}.", e);
116                    break;
117                }
118
119                match result {
120                    Ok(entry) => {
121                        if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
122                            let path = entry.path();
123                            if let Err(e) = watcher.watch(path, RecursiveMode::NonRecursive) {
124                                eprintln!("ix: warning: watcher failed for {}: {}", path.display(), e);
125                            }
126                        }
127                    }
128                    Err(e) => {
129                        eprintln!("ix: warning: watcher skipping path: {}", e);
130                    }
131                }
132            }
133        }
134
135        self.watcher = Some(watcher);
136
137        let handle = thread::spawn(move || {
138            let mut changed_paths = HashSet::new();
139            loop {
140                // Wait for the first event
141                match event_rx.recv() {
142                    Ok(Ok(event)) => {
143                        Self::collect_paths(&mut changed_paths, event);
144
145                        // Debounce loop: keep collecting for 500ms after the last event
146                        loop {
147                            match event_rx.recv_timeout(Duration::from_millis(500)) {
148                                Ok(Ok(event)) => {
149                                    Self::collect_paths(&mut changed_paths, event);
150                                }
151                                Ok(Err(_)) => continue, // notify error, skip
152                                Err(mpsc::RecvTimeoutError::Timeout) => {
153                                    // Debounce period over
154                                    if !changed_paths.is_empty() {
155                                        let paths: Vec<PathBuf> = changed_paths.drain().collect();
156                                        if tx.send(paths).is_err() {
157                                            return; // Receiver dropped
158                                        }
159                                    }
160                                    break;
161                                }
162                                Err(mpsc::RecvTimeoutError::Disconnected) => return,
163                            }
164                        }
165                    }
166                    Ok(Err(_)) => continue,
167                    Err(_) => return, // Watcher dropped
168                }
169            }
170        });
171
172        self.thread = Some(handle);
173        Ok(rx)
174    }
175
176    pub fn stop(&mut self) {
177        self.watcher.take(); // Dropping the watcher stops events
178        if let Some(handle) = self.thread.take() {
179            let _ = handle.join();
180        }
181    }
182
183    pub fn is_running(&self) -> bool {
184        self.watcher.is_some()
185    }
186
187    fn collect_paths(set: &mut HashSet<PathBuf>, event: Event) {
188        if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() {
189            for path in event.paths {
190                // Ignore the .ix directory changes to avoid loops
191                if path.components().any(|c| c.as_os_str() == ".ix") {
192                    continue;
193                }
194                set.insert(path);
195            }
196        }
197    }
198}
199
200#[cfg(test)]
201mod tests {
202    use super::*;
203    use crate::error::Error;
204    use std::fs::File;
205    use std::io::Write;
206    use tempfile::tempdir;
207
208    #[test]
209    fn test_watcher_basic() -> Result<()> {
210        let dir = tempdir().map_err(Error::Io)?;
211        let mut watcher = Watcher::new(dir.path())?;
212        let rx = watcher.start()?;
213
214        let file_path = dir.path().join("test.txt");
215        {
216            let mut file = File::create(&file_path).map_err(Error::Io)?;
217            file.write_all(b"hello").map_err(Error::Io)?;
218            file.sync_all().map_err(Error::Io)?;
219        }
220
221        let events = rx
222            .recv_timeout(Duration::from_secs(2))
223            .map_err(|_| Error::Config("Timeout waiting for watcher event".into()))?;
224
225        assert!(!events.is_empty());
226        assert!(events.iter().any(|p: &PathBuf| p.ends_with("test.txt")));
227
228        watcher.stop();
229        Ok(())
230    }
231}