1use crate::error::Result;
4use crossbeam_channel::Receiver;
5use llmosafe::llmosafe_kernel::{ReasoningLoop, SiftedSynapse};
6use llmosafe::{ResourceGuard, Synapse, WorkingMemory};
7use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher as _};
8use std::collections::HashSet;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc;
11use std::thread;
12use std::time::Duration;
13
14pub struct Watcher {
15 root: PathBuf,
16 watcher: Option<RecommendedWatcher>,
17 thread: Option<thread::JoinHandle<()>>,
18}
19
20impl Watcher {
21 pub fn new(root: &Path) -> Result<Self> {
22 Ok(Self {
23 root: root.to_owned(),
24 watcher: None,
25 thread: None,
26 })
27 }
28
29 pub fn start(&mut self) -> Result<Receiver<Vec<PathBuf>>> {
30 let (tx, rx) = crossbeam_channel::unbounded();
31 let (event_tx, event_rx) = mpsc::channel();
32
33 let mut watcher = RecommendedWatcher::new(event_tx, Config::default())?;
34
35 if let Err(err) = watcher.watch(&self.root, RecursiveMode::Recursive) {
37 eprintln!(
38 "ix: warning: recursive watch failed: {}. Falling back to manual walk.",
39 err
40 );
41
42 let walker = ignore::WalkBuilder::new(&self.root)
43 .hidden(false)
44 .git_ignore(true)
45 .require_git(false)
46 .add_custom_ignore_filename(".ixignore")
47 .filter_entry(move |entry| {
48 let path = entry.path();
49 let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
50
51 if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
53 && (name == "lost+found"
54 || name == ".git"
55 || name == "node_modules"
56 || name == "target"
57 || name == "__pycache__"
58 || name == ".tox"
59 || name == ".venv"
60 || name == "venv"
61 || name == ".ix")
62 {
63 return false;
64 }
65
66 if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
68 if let Ok(metadata) = entry.metadata()
69 && metadata.len() > 10 * 1024 * 1024
70 {
71 return false;
72 }
73 if name == "Cargo.lock"
74 || name == "package-lock.json"
75 || name == "pnpm-lock.yaml"
76 || name == "shard.ix"
77 || name == "shard.ix.tmp"
78 {
79 return false;
80 }
81 }
82
83 if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
85 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
86 match ext {
87 "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" |
89 "jpg" | "png" | "gif" | "mp4" | "mp3" | "pdf" |
91 "zip" | "7z" | "rar" |
93 "sqlite" | "db" | "bin" => return false,
95 _ => {}
96 }
97 if name.ends_with(".tar.gz") {
98 return false;
99 }
100 }
101 true
102 })
103 .build();
104
105 let mut loop_guard = ReasoningLoop::<20000>::new();
106 let guard = ResourceGuard::auto(0.5);
107 let mut memory = WorkingMemory::<64>::new(1000);
108
109 for result in walker {
110 let synapse = guard.check().unwrap_or_else(|_| {
112 let mut s = Synapse::new();
113 s.set_raw_entropy(1500); s
115 });
116
117 let sifted = SiftedSynapse::new(synapse);
118 let validated = match memory.update(sifted) {
119 Ok(v) => v,
120 Err(e) => {
121 eprintln!(
122 "ix: critical safety halt during watcher walk: {:?}. Directory tree too large or RAM low.",
123 e
124 );
125 break;
126 }
127 };
128
129 if let Err(e) = loop_guard.next_step(validated) {
130 eprintln!("ix: critical reasoning depth exceeded: {:?}.", e);
131 break;
132 }
133
134 match result {
135 Ok(entry) => {
136 if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
137 let path = entry.path();
138 if let Err(e) = watcher.watch(path, RecursiveMode::NonRecursive) {
139 eprintln!(
140 "ix: warning: watcher failed for {}: {}",
141 path.display(),
142 e
143 );
144 }
145 }
146 }
147 Err(e) => {
148 eprintln!("ix: warning: watcher skipping path: {}", e);
149 }
150 }
151 }
152 }
153
154 self.watcher = Some(watcher);
155
156 let handle = thread::spawn(move || {
157 let mut changed_paths = HashSet::new();
158 loop {
159 match event_rx.recv() {
161 Ok(Ok(event)) => {
162 Self::collect_paths(&mut changed_paths, event);
163
164 loop {
166 match event_rx.recv_timeout(Duration::from_millis(500)) {
167 Ok(Ok(event)) => {
168 Self::collect_paths(&mut changed_paths, event);
169 }
170 Ok(Err(_)) => continue, Err(mpsc::RecvTimeoutError::Timeout) => {
172 if !changed_paths.is_empty() {
174 let paths: Vec<PathBuf> = changed_paths.drain().collect();
175 if tx.send(paths).is_err() {
176 return; }
178 }
179 break;
180 }
181 Err(mpsc::RecvTimeoutError::Disconnected) => return,
182 }
183 }
184 }
185 Ok(Err(_)) => continue,
186 Err(_) => return, }
188 }
189 });
190
191 self.thread = Some(handle);
192 Ok(rx)
193 }
194
195 pub fn stop(&mut self) {
196 self.watcher.take(); if let Some(handle) = self.thread.take() {
198 let _ = handle.join();
199 }
200 }
201
202 pub fn is_running(&self) -> bool {
203 self.watcher.is_some()
204 }
205
206 fn collect_paths(set: &mut HashSet<PathBuf>, event: Event) {
207 if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() {
208 for path in event.paths {
209 if path.components().any(|c| c.as_os_str() == ".ix") {
211 continue;
212 }
213 set.insert(path);
214 }
215 }
216 }
217}
218
219#[cfg(test)]
220mod tests {
221 use super::*;
222 use crate::error::Error;
223 use std::fs::File;
224 use std::io::Write;
225 use tempfile::tempdir;
226
227 #[test]
228 fn test_watcher_basic() -> Result<()> {
229 let dir = tempdir().map_err(Error::Io)?;
230 let mut watcher = Watcher::new(dir.path())?;
231 let rx = watcher.start()?;
232
233 let file_path = dir.path().join("test.txt");
234 {
235 let mut file = File::create(&file_path).map_err(Error::Io)?;
236 file.write_all(b"hello").map_err(Error::Io)?;
237 file.sync_all().map_err(Error::Io)?;
238 }
239
240 let events = rx
241 .recv_timeout(Duration::from_secs(2))
242 .map_err(|_| Error::Config("Timeout waiting for watcher event".into()))?;
243
244 assert!(!events.is_empty());
245 assert!(events.iter().any(|p: &PathBuf| p.ends_with("test.txt")));
246
247 watcher.stop();
248 Ok(())
249 }
250}