1use crate::error::Result;
4use crossbeam_channel::Receiver;
5use llmosafe::llmosafe_kernel::{ReasoningLoop, SiftedSynapse};
6use llmosafe::{ResourceGuard, Synapse, WorkingMemory};
7use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher as _};
8use std::collections::HashSet;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc;
11use std::thread;
12use std::time::Duration;
13
14pub struct Watcher {
17 root: PathBuf,
18 inner: Option<RecommendedWatcher>,
19 join_handle: Option<thread::JoinHandle<()>>,
20}
21
22impl Watcher {
23#[must_use]
28 pub fn new(root: &Path) -> Self {
29 Self {
30 root: root.to_owned(),
31 inner: None,
32 join_handle: None,
33 }
34 }
35
36 #[allow(clippy::too_many_lines)]
42 pub fn start(&mut self) -> Result<Receiver<Vec<PathBuf>>> {
43 let (tx, rx) = crossbeam_channel::unbounded();
44 let (event_tx, event_rx) = mpsc::channel();
45
46 let mut watcher = RecommendedWatcher::new(event_tx, Config::default())?;
47
48 if let Err(err) = watcher.watch(&self.root, RecursiveMode::Recursive) {
50 eprintln!(
51 "ix: warning: recursive watch failed: {err}. Falling back to manual walk."
52 );
53
54 let walker = ignore::WalkBuilder::new(&self.root)
55 .hidden(false)
56 .git_ignore(true)
57 .require_git(false)
58 .add_custom_ignore_filename(".ixignore")
59 .filter_entry(move |entry| {
60 let path = entry.path();
61 let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
62
63 if entry.file_type().is_some_and(|t| t.is_dir())
65 && (name == "lost+found"
66 || name == ".git"
67 || name == "node_modules"
68 || name == "target"
69 || name == "__pycache__"
70 || name == ".tox"
71 || name == ".venv"
72 || name == "venv"
73 || name == ".ix")
74 {
75 return false;
76 }
77
78 if entry.file_type().is_some_and(|t| t.is_file()) {
80 if let Ok(metadata) = entry.metadata()
81 && metadata.len() > 10 * 1024 * 1024
82 {
83 return false;
84 }
85 if name == "Cargo.lock"
86 || name == "package-lock.json"
87 || name == "pnpm-lock.yaml"
88 || name == "shard.ix"
89 || name == "shard.ix.tmp"
90 {
91 return false;
92 }
93 }
94
95 if entry.file_type().is_some_and(|t| t.is_file()) {
97 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
98 match ext {
99 "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" |
101 "jpg" | "png" | "gif" | "mp4" | "mp3" | "pdf" |
103 "zip" | "7z" | "rar" |
105 "sqlite" | "db" | "bin" => return false,
107 _ => {}
108 }
109 if name.ends_with(".tar.gz") {
110 return false;
111 }
112 }
113 true
114 })
115 .build();
116
117 let mut loop_guard = ReasoningLoop::<20000>::new();
118 let guard = ResourceGuard::auto(0.5);
119 let mut memory = WorkingMemory::<64>::new(1000);
120
121 for result in walker {
122 let synapse = guard.check().unwrap_or_else(|_| {
124 let mut s = Synapse::new();
125 s.set_raw_entropy(1500); s
127 });
128
129 let sifted = SiftedSynapse::new(synapse);
130 let validated = match memory.update(sifted) {
131 Ok(v) => v,
132 Err(e) => {
133 eprintln!(
134 "ix: critical safety halt during watcher walk: {e:?}. Directory tree too large or RAM low."
135 );
136 break;
137 }
138 };
139
140 if let Err(e) = loop_guard.next_step(validated) {
141 eprintln!("ix: critical reasoning depth exceeded: {e:?}.");
142 break;
143 }
144
145 match result {
146 Ok(entry) => {
147 if entry.file_type().is_some_and(|t| t.is_dir()) {
148 let path = entry.path();
149 if let Err(e) = watcher.watch(path, RecursiveMode::NonRecursive) {
150 eprintln!(
151 "ix: warning: watcher failed for {}: {}",
152 path.display(),
153 e
154 );
155 }
156 }
157 }
158 Err(e) => {
159 eprintln!("ix: warning: watcher skipping path: {e}");
160 }
161 }
162 }
163 }
164
165 self.inner = Some(watcher);
166
167 let handle = thread::spawn(move || {
168 let mut changed_paths = HashSet::new();
169 loop {
170 match event_rx.recv() {
172 Ok(Ok(event)) => {
173 Self::collect_paths(&mut changed_paths, event);
174
175 loop {
177 match event_rx.recv_timeout(Duration::from_millis(500)) {
178 Ok(Ok(event)) => {
179 Self::collect_paths(&mut changed_paths, event);
180 }
181 Ok(Err(_)) => {} Err(mpsc::RecvTimeoutError::Timeout) => {
183 if !changed_paths.is_empty() {
185 let paths: Vec<PathBuf> = changed_paths.drain().collect();
186 if tx.send(paths).is_err() {
187 return; }
189 }
190 break;
191 }
192 Err(mpsc::RecvTimeoutError::Disconnected) => return,
193 }
194 }
195 }
196 Ok(Err(_)) => {},
197 Err(_) => return, }
199 }
200 });
201
202 self.join_handle = Some(handle);
203 Ok(rx)
204 }
205
206 pub fn stop(&mut self) {
208 self.inner.take(); if let Some(handle) = self.join_handle.take() {
210 let _ = handle.join();
211 }
212 }
213
214#[must_use]
216 pub const fn is_running(&self) -> bool {
217 self.inner.is_some()
218 }
219
220 fn collect_paths(set: &mut HashSet<PathBuf>, event: Event) {
221 if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() {
222 for path in event.paths {
223 if path.components().any(|c| c.as_os_str() == ".ix") {
225 continue;
226 }
227 set.insert(path);
228 }
229 }
230 }
231}
232
#[cfg(test)]
#[allow(clippy::as_conversions, clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use crate::error::Error;
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    /// Creating a file inside a watched temporary directory must produce a
    /// batch containing that file within two seconds.
    #[test]
    fn test_watcher_basic() -> Result<()> {
        let scratch = tempdir().map_err(Error::Io)?;
        let mut watcher = Watcher::new(scratch.path());
        let batches = watcher.start()?;

        // Create, write and sync the file; it is closed when the closure ends.
        let target = scratch.path().join("test.txt");
        File::create(&target)
            .and_then(|mut file| {
                file.write_all(b"hello")?;
                file.sync_all()
            })
            .map_err(Error::Io)?;

        let Ok(events) = batches.recv_timeout(Duration::from_secs(2)) else {
            return Err(Error::Config("Timeout waiting for watcher event".into()));
        };

        if events.is_empty() {
            return Err(Error::Config("No watcher events received".into()));
        }
        let saw_target = events.iter().any(|path| path.ends_with("test.txt"));
        if !saw_target {
            return Err(Error::Config("test.txt not found in watcher events".into()));
        }

        watcher.stop();
        Ok(())
    }
}