1use crate::error::Result;
4use crossbeam_channel::Receiver;
5use llmosafe::llmosafe_kernel::{ReasoningLoop, SiftedSynapse};
6use llmosafe::{ResourceGuard, Synapse, WorkingMemory};
7use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher as _};
8use std::collections::HashSet;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc;
11use std::thread;
12use std::time::Duration;
13
14pub struct Watcher {
17 root: PathBuf,
18 inner: Option<RecommendedWatcher>,
19 join_handle: Option<thread::JoinHandle<()>>,
20}
21
22impl Watcher {
23 #[must_use]
28 pub fn new(root: &Path) -> Self {
29 Self {
30 root: root.to_owned(),
31 inner: None,
32 join_handle: None,
33 }
34 }
35
36 #[allow(clippy::too_many_lines)]
42 pub fn start(&mut self) -> Result<Receiver<Vec<PathBuf>>> {
43 let (tx, rx) = crossbeam_channel::unbounded();
44 let (event_tx, event_rx) = mpsc::channel();
45
46 let mut watcher = RecommendedWatcher::new(event_tx, Config::default())?;
47
48 if let Err(err) = watcher.watch(&self.root, RecursiveMode::Recursive) {
50 eprintln!("ix: warning: recursive watch failed: {err}. Falling back to manual walk.");
51
52 let walker = ignore::WalkBuilder::new(&self.root)
53 .hidden(false)
54 .git_ignore(true)
55 .require_git(false)
56 .add_custom_ignore_filename(".ixignore")
57 .filter_entry(move |entry| {
58 let path = entry.path();
59 let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
60
61 if entry.file_type().is_some_and(|t| t.is_dir())
63 && (name == "lost+found"
64 || name == ".git"
65 || name == "node_modules"
66 || name == "target"
67 || name == "__pycache__"
68 || name == ".tox"
69 || name == ".venv"
70 || name == "venv"
71 || name == ".ix")
72 {
73 return false;
74 }
75
76 if entry.file_type().is_some_and(|t| t.is_file()) {
78 if let Ok(metadata) = entry.metadata()
79 && metadata.len() > 10 * 1024 * 1024
80 {
81 return false;
82 }
83 if name == "Cargo.lock"
84 || name == "package-lock.json"
85 || name == "pnpm-lock.yaml"
86 || name == "shard.ix"
87 || name == "shard.ix.tmp"
88 {
89 return false;
90 }
91 }
92
93 if entry.file_type().is_some_and(|t| t.is_file()) {
95 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
96 match ext {
97 "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" |
99 "jpg" | "png" | "gif" | "mp4" | "mp3" | "pdf" |
101 "zip" | "7z" | "rar" |
103 "sqlite" | "db" | "bin" => return false,
105 _ => {}
106 }
107 if name.ends_with(".tar.gz") {
108 return false;
109 }
110 }
111 true
112 })
113 .build();
114
115 let mut loop_guard = ReasoningLoop::<20000>::new();
116 let guard = ResourceGuard::auto(0.5);
117 let mut memory = WorkingMemory::<64>::new(1000);
118
119 for result in walker {
120 let synapse = guard.check().unwrap_or_else(|_| {
122 let mut s = Synapse::new();
123 s.set_raw_entropy(1500); s
125 });
126
127 let sifted = SiftedSynapse::new(synapse);
128 let validated = match memory.update(sifted) {
129 Ok(v) => v,
130 Err(e) => {
131 eprintln!(
132 "ix: critical safety halt during watcher walk: {e:?}. Directory tree too large or RAM low."
133 );
134 break;
135 }
136 };
137
138 if let Err(e) = loop_guard.next_step(validated) {
139 eprintln!("ix: critical reasoning depth exceeded: {e:?}.");
140 break;
141 }
142
143 match result {
144 Ok(entry) => {
145 if entry.file_type().is_some_and(|t| t.is_dir()) {
146 let path = entry.path();
147 if let Err(e) = watcher.watch(path, RecursiveMode::NonRecursive) {
148 eprintln!(
149 "ix: warning: watcher failed for {}: {}",
150 path.display(),
151 e
152 );
153 }
154 }
155 }
156 Err(e) => {
157 eprintln!("ix: warning: watcher skipping path: {e}");
158 }
159 }
160 }
161 }
162
163 self.inner = Some(watcher);
164
165 let handle = thread::spawn(move || {
166 let mut changed_paths = HashSet::new();
167 loop {
168 match event_rx.recv() {
170 Ok(Ok(event)) => {
171 Self::collect_paths(&mut changed_paths, event);
172
173 loop {
175 match event_rx.recv_timeout(Duration::from_millis(500)) {
176 Ok(Ok(event)) => {
177 Self::collect_paths(&mut changed_paths, event);
178 }
179 Ok(Err(_)) => {} Err(mpsc::RecvTimeoutError::Timeout) => {
181 if !changed_paths.is_empty() {
183 let paths: Vec<PathBuf> = changed_paths.drain().collect();
184 if tx.send(paths).is_err() {
185 return; }
187 }
188 break;
189 }
190 Err(mpsc::RecvTimeoutError::Disconnected) => return,
191 }
192 }
193 }
194 Ok(Err(_)) => {}
195 Err(_) => return, }
197 }
198 });
199
200 self.join_handle = Some(handle);
201 Ok(rx)
202 }
203
204 pub fn stop(&mut self) {
206 self.inner.take(); if let Some(handle) = self.join_handle.take() {
208 let _ = handle.join();
209 }
210 }
211
212 #[must_use]
214 pub const fn is_running(&self) -> bool {
215 self.inner.is_some()
216 }
217
218 fn collect_paths(set: &mut HashSet<PathBuf>, event: Event) {
219 if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() {
220 for path in event.paths {
221 if path.components().any(|c| c.as_os_str() == ".ix") {
223 continue;
224 }
225 set.insert(path);
226 }
227 }
228 }
229}
230
231#[cfg(test)]
232#[allow(clippy::as_conversions, clippy::unwrap_used, clippy::indexing_slicing)]
233mod tests {
234 use super::*;
235 use crate::error::Error;
236 use std::fs::File;
237 use std::io::Write;
238 use tempfile::tempdir;
239
240 #[test]
241 fn test_watcher_basic() -> Result<()> {
242 let dir = tempdir().map_err(Error::Io)?;
243 let mut watcher = Watcher::new(dir.path());
244 let rx = watcher.start()?;
245
246 let file_path = dir.path().join("test.txt");
247 {
248 let mut file = File::create(&file_path).map_err(Error::Io)?;
249 file.write_all(b"hello").map_err(Error::Io)?;
250 file.sync_all().map_err(Error::Io)?;
251 }
252
253 let events = rx
254 .recv_timeout(Duration::from_secs(2))
255 .map_err(|_| Error::Config("Timeout waiting for watcher event".into()))?;
256
257 if events.is_empty() {
258 return Err(Error::Config("No watcher events received".into()));
259 }
260 if !events.iter().any(|p: &PathBuf| p.ends_with("test.txt")) {
261 return Err(Error::Config("test.txt not found in watcher events".into()));
262 }
263
264 watcher.stop();
265 Ok(())
266 }
267}