1use crate::error::Result;
4use crossbeam_channel::Receiver;
5use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher as _};
6use std::collections::HashSet;
7use std::path::{Path, PathBuf};
8use std::sync::mpsc;
9use std::thread;
10use std::time::Duration;
11use llmosafe::llmosafe_kernel::{ReasoningLoop, SiftedSynapse};
12use llmosafe::{ResourceGuard, Synapse, WorkingMemory};
13
14pub struct Watcher {
15 root: PathBuf,
16 watcher: Option<RecommendedWatcher>,
17 thread: Option<thread::JoinHandle<()>>,
18}
19
20impl Watcher {
21 pub fn new(root: &Path) -> Result<Self> {
22 Ok(Self {
23 root: root.to_owned(),
24 watcher: None,
25 thread: None,
26 })
27 }
28
29 pub fn start(&mut self) -> Result<Receiver<Vec<PathBuf>>> {
30 let (tx, rx) = crossbeam_channel::unbounded();
31 let (event_tx, event_rx) = mpsc::channel();
32
33 let mut watcher = RecommendedWatcher::new(event_tx, Config::default())?;
34
35 if let Err(err) = watcher.watch(&self.root, RecursiveMode::Recursive) {
37 eprintln!("ix: warning: recursive watch failed: {}. Falling back to manual walk.", err);
38
39 let walker = ignore::WalkBuilder::new(&self.root)
40 .hidden(false)
41 .git_ignore(true)
42 .require_git(false)
43 .add_custom_ignore_filename(".ixignore")
44 .filter_entry(move |entry| {
45 let path = entry.path();
46 let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
47
48 if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
50 && (name == "lost+found" || name == ".git" || name == "node_modules" ||
51 name == "target" || name == "__pycache__" || name == ".tox" ||
52 name == ".venv" || name == "venv" || name == ".ix")
53 {
54 return false;
55 }
56
57 if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
59 if let Ok(metadata) = entry.metadata()
60 && metadata.len() > 10 * 1024 * 1024
61 {
62 return false;
63 }
64 if name == "Cargo.lock" || name == "package-lock.json" || name == "pnpm-lock.yaml" ||
65 name == "shard.ix" || name == "shard.ix.tmp"
66 {
67 return false;
68 }
69 }
70
71 if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
73 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
74 match ext {
75 "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" |
77 "jpg" | "png" | "gif" | "mp4" | "mp3" | "pdf" |
79 "zip" | "7z" | "rar" |
81 "sqlite" | "db" | "bin" => return false,
83 _ => {}
84 }
85 if name.ends_with(".tar.gz") {
86 return false;
87 }
88 }
89 true
90 })
91 .build();
92
93 let mut loop_guard = ReasoningLoop::<20000>::new();
94 let guard = ResourceGuard::auto(0.5);
95 let mut memory = WorkingMemory::<64>::new(1000);
96
97 for result in walker {
98 let synapse = guard.check().unwrap_or_else(|_| {
100 let mut s = Synapse::new();
101 s.set_raw_entropy(1500); s
103 });
104
105 let sifted = SiftedSynapse::new(synapse);
106 let validated = match memory.update(sifted) {
107 Ok(v) => v,
108 Err(e) => {
109 eprintln!("ix: critical safety halt during watcher walk: {:?}. Directory tree too large or RAM low.", e);
110 break;
111 }
112 };
113
114 if let Err(e) = loop_guard.next_step(validated) {
115 eprintln!("ix: critical reasoning depth exceeded: {:?}.", e);
116 break;
117 }
118
119 match result {
120 Ok(entry) => {
121 if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
122 let path = entry.path();
123 if let Err(e) = watcher.watch(path, RecursiveMode::NonRecursive) {
124 eprintln!("ix: warning: watcher failed for {}: {}", path.display(), e);
125 }
126 }
127 }
128 Err(e) => {
129 eprintln!("ix: warning: watcher skipping path: {}", e);
130 }
131 }
132 }
133 }
134
135 self.watcher = Some(watcher);
136
137 let handle = thread::spawn(move || {
138 let mut changed_paths = HashSet::new();
139 loop {
140 match event_rx.recv() {
142 Ok(Ok(event)) => {
143 Self::collect_paths(&mut changed_paths, event);
144
145 loop {
147 match event_rx.recv_timeout(Duration::from_millis(500)) {
148 Ok(Ok(event)) => {
149 Self::collect_paths(&mut changed_paths, event);
150 }
151 Ok(Err(_)) => continue, Err(mpsc::RecvTimeoutError::Timeout) => {
153 if !changed_paths.is_empty() {
155 let paths: Vec<PathBuf> = changed_paths.drain().collect();
156 if tx.send(paths).is_err() {
157 return; }
159 }
160 break;
161 }
162 Err(mpsc::RecvTimeoutError::Disconnected) => return,
163 }
164 }
165 }
166 Ok(Err(_)) => continue,
167 Err(_) => return, }
169 }
170 });
171
172 self.thread = Some(handle);
173 Ok(rx)
174 }
175
176 pub fn stop(&mut self) {
177 self.watcher.take(); if let Some(handle) = self.thread.take() {
179 let _ = handle.join();
180 }
181 }
182
183 pub fn is_running(&self) -> bool {
184 self.watcher.is_some()
185 }
186
187 fn collect_paths(set: &mut HashSet<PathBuf>, event: Event) {
188 if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() {
189 for path in event.paths {
190 if path.components().any(|c| c.as_os_str() == ".ix") {
192 continue;
193 }
194 set.insert(path);
195 }
196 }
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use super::*;
203 use crate::error::Error;
204 use std::fs::File;
205 use std::io::Write;
206 use tempfile::tempdir;
207
208 #[test]
209 fn test_watcher_basic() -> Result<()> {
210 let dir = tempdir().map_err(Error::Io)?;
211 let mut watcher = Watcher::new(dir.path())?;
212 let rx = watcher.start()?;
213
214 let file_path = dir.path().join("test.txt");
215 {
216 let mut file = File::create(&file_path).map_err(Error::Io)?;
217 file.write_all(b"hello").map_err(Error::Io)?;
218 file.sync_all().map_err(Error::Io)?;
219 }
220
221 let events = rx
222 .recv_timeout(Duration::from_secs(2))
223 .map_err(|_| Error::Config("Timeout waiting for watcher event".into()))?;
224
225 assert!(!events.is_empty());
226 assert!(events.iter().any(|p: &PathBuf| p.ends_with("test.txt")));
227
228 watcher.stop();
229 Ok(())
230 }
231}