1pub mod batch;
2
3use std::path::{Path, PathBuf};
4use std::sync::mpsc;
5use std::time::Duration;
6
7use anyhow::Result;
8use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
9
10use crate::Infigraph;
11use batch::ChangeBatch;
12
13#[derive(Debug, Clone)]
15pub struct WatchEvent {
16 pub kind: WatchEventKind,
17 pub path: PathBuf,
18 pub has_cross_file_calls: bool,
20}
21
/// The kind of filesystem change carried by a [`WatchEvent`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WatchEventKind {
    /// An existing file changed.
    Modified,
    /// A new file appeared.
    Created,
    /// A file was deleted.
    Removed,
}
28
29impl std::fmt::Display for WatchEvent {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 let kind = match self.kind {
32 WatchEventKind::Modified => "modified",
33 WatchEventKind::Created => "created",
34 WatchEventKind::Removed => "removed",
35 };
36 if self.has_cross_file_calls {
37 write!(
38 f,
39 "{kind}: {} [cross-file calls detected — full reindex recommended]",
40 self.path.display()
41 )
42 } else {
43 write!(f, "{kind}: {}", self.path.display())
44 }
45 }
46}
47
48pub fn watch_project(
60 prism: &Infigraph,
61 debounce_ms: u64,
62 stop_rx: mpsc::Receiver<()>,
63 on_event: impl Fn(WatchEvent) + Send + 'static,
64) -> Result<()> {
65 watch_project_with_periodic(
66 prism,
67 debounce_ms,
68 stop_rx,
69 on_event,
70 0,
71 None::<fn(&crate::IndexResult)>,
72 )
73}
74
75pub fn watch_project_with_periodic<F>(
76 prism: &Infigraph,
77 debounce_ms: u64,
78 stop_rx: mpsc::Receiver<()>,
79 on_event: impl Fn(WatchEvent) + Send + 'static,
80 periodic_secs: u64,
81 on_periodic: Option<F>,
82) -> Result<()>
83where
84 F: Fn(&crate::IndexResult) + Send + 'static,
85{
86 let (tx, rx) = mpsc::channel::<notify::Result<Event>>();
87
88 let config = Config::default().with_poll_interval(Duration::from_millis(debounce_ms));
89
90 let mut watcher = RecommendedWatcher::new(tx, config)?;
91
92 let ignore_dirs: &[&str] = &[
93 ".infigraph",
94 ".git",
95 "node_modules",
96 "__pycache__",
97 ".venv",
98 "venv",
99 "target",
100 "build",
101 "dist",
102 ".tox",
103 ];
104
105 register_watch_dirs(&mut watcher, prism.root(), ignore_dirs)?;
108
109 let mut changes_since_periodic: usize = 0;
110 let mut last_periodic = std::time::Instant::now();
111
112 let mut batch = ChangeBatch::new(1000);
115
116 loop {
117 if stop_rx.try_recv().is_ok() {
118 break;
119 }
120
121 if periodic_secs > 0
123 && changes_since_periodic > 0
124 && last_periodic.elapsed() >= Duration::from_secs(periodic_secs)
125 {
126 if let Some(ref cb) = on_periodic {
127 match prism.index() {
128 Ok(result) => {
129 if !result.extractions.is_empty() {
130 cb(&result);
131 }
132 }
133 Err(e) => eprintln!("[watch] periodic reindex failed: {e}"),
134 }
135 }
136 changes_since_periodic = 0;
137 last_periodic = std::time::Instant::now();
138 }
139
140 if !batch.is_empty() && batch.is_ready() {
142 let paths = batch.drain();
143 let count = paths.len();
144 eprintln!("[watch] batch indexing {count} files");
145
146 match prism.index_files(&paths) {
147 Ok(result) => {
148 changes_since_periodic += result.indexed_files;
149
150 if let Some(store) = prism.store() {
151 let changed: Vec<&str> =
152 result.extractions.iter().map(|e| e.file.as_str()).collect();
153 if !changed.is_empty() {
154 if let Err(e) =
155 crate::embed::update_embeddings(store, prism.root(), &changed)
156 {
157 eprintln!("[watch] batch embedding update failed: {e}");
158 }
159 }
160 }
161
162 for extraction in &result.extractions {
163 let cross = has_cross_file_calls(prism, &extraction.file);
164 let abs_path = prism.root().join(&extraction.file);
165 on_event(WatchEvent {
166 kind: WatchEventKind::Modified,
167 path: abs_path,
168 has_cross_file_calls: cross,
169 });
170 }
171 }
172 Err(e) => eprintln!("[watch] batch reindex failed: {e}"),
173 }
174 }
175
176 match rx.recv_timeout(Duration::from_millis(200)) {
177 Ok(Ok(event)) => {
178 let watch_kind = match event.kind {
179 EventKind::Create(_) => WatchEventKind::Created,
180 EventKind::Modify(_) => WatchEventKind::Modified,
181 EventKind::Remove(_) => WatchEventKind::Removed,
182 _ => continue,
183 };
184
185 for path in event.paths {
186 if should_ignore(&path, ignore_dirs) {
187 continue;
188 }
189
190 let rel = match path.strip_prefix(prism.root()) {
191 Ok(r) => r.to_string_lossy().replace('\\', "/"),
192 Err(_) => continue,
193 };
194
195 match watch_kind {
196 WatchEventKind::Removed => {
197 let _ = prism.remove_file(&path);
198 changes_since_periodic += 1;
199 on_event(WatchEvent {
200 kind: watch_kind.clone(),
201 path,
202 has_cross_file_calls: false,
203 });
204 }
205 WatchEventKind::Created | WatchEventKind::Modified => {
206 if prism.registry().for_file(&rel).is_some() {
207 batch.add(path);
208 }
209 }
210 }
211 }
212 }
213 Ok(Err(e)) => eprintln!("watch error: {e}"),
214 Err(mpsc::RecvTimeoutError::Timeout) => {}
215 Err(mpsc::RecvTimeoutError::Disconnected) => break,
216 }
217 }
218
219 Ok(())
220}
221
222pub fn watch_project_auto_resolve(
230 prism: &Infigraph,
231 debounce_ms: u64,
232 stop_rx: mpsc::Receiver<()>,
233 log_prefix: &str,
234 make_registry: impl Fn() -> anyhow::Result<crate::lang::LanguageRegistry> + Send + 'static,
235) -> Result<()> {
236 let root = prism.root().to_path_buf();
237 watch_project(prism, debounce_ms, stop_rx, {
238 let prefix = log_prefix.to_string();
239 move |evt: WatchEvent| {
240 if evt.has_cross_file_calls {
241 eprintln!("[watch {prefix}] {evt}");
242 if let Ok(reg) = make_registry() {
243 if let Ok(mut p) = Infigraph::open(&root, reg) {
244 if p.init().is_ok() {
245 let changed_rel = evt
246 .path
247 .strip_prefix(&root)
248 .map(|r| r.to_string_lossy().replace('\\', "/"))
249 .unwrap_or_else(|_| evt.path.to_string_lossy().replace('\\', "/"));
250 let mut affected_files = vec![evt.path.clone()];
251
252 if let Some(store) = p.store() {
253 let deps = get_cross_file_dependents(store, &changed_rel);
254 for dep_rel in deps {
255 let dep_abs = root.join(&dep_rel);
256 if dep_abs.exists() {
257 affected_files.push(dep_abs);
258 }
259 }
260 }
261
262 match p.index_files(&affected_files) {
263 Ok(r) => {
264 eprintln!(
265 "[watch {prefix}] targeted reindex: {}/{} affected files",
266 r.indexed_files, r.total_files
267 );
268
269 if let Some(store) = p.store() {
270 let file_strs: Vec<String> =
271 r.extractions.iter().map(|e| e.file.clone()).collect();
272 match crate::resolve::re_resolve_for_files(
273 store,
274 &file_strs,
275 &r.extractions,
276 None,
277 ) {
278 Ok(stats) => {
279 eprintln!("[watch {prefix}] re-resolved: {stats}")
280 }
281 Err(e) => {
282 eprintln!("[watch {prefix}] re-resolve failed: {e}")
283 }
284 }
285
286 let changed: Vec<&str> =
287 r.extractions.iter().map(|e| e.file.as_str()).collect();
288 match crate::embed::update_embeddings(
289 store, &root, &changed,
290 ) {
291 Ok(n) => {
292 eprintln!("[watch {prefix}] updated {n} embeddings")
293 }
294 Err(e) => eprintln!(
295 "[watch {prefix}] embedding update failed: {e}"
296 ),
297 }
298 }
299 }
300 Err(e) => {
301 eprintln!("[watch {prefix}] targeted reindex failed: {e}")
302 }
303 }
304 }
305 }
306 }
307 } else {
308 eprintln!("[watch {prefix}] {evt}");
309 }
310 }
311 })
312}
313
314fn get_cross_file_dependents(store: &crate::graph::GraphStore, rel_path: &str) -> Vec<String> {
316 let conn = match store.connection() {
317 Ok(c) => c,
318 Err(_) => return Vec::new(),
319 };
320 let escaped = rel_path.replace('\'', "\\'");
321 let mut dependents = std::collections::HashSet::new();
322
323 let q1 = format!(
324 "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE a.file = '{escaped}' AND b.file <> '{escaped}' RETURN DISTINCT b.file"
325 );
326 if let Ok(result) = conn.query(&q1) {
327 for row in result {
328 if let Some(val) = row.first() {
329 dependents.insert(val.to_string());
330 }
331 }
332 }
333
334 let q2 = format!(
335 "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE b.file = '{escaped}' AND a.file <> '{escaped}' RETURN DISTINCT a.file"
336 );
337 if let Ok(result) = conn.query(&q2) {
338 for row in result {
339 if let Some(val) = row.first() {
340 dependents.insert(val.to_string());
341 }
342 }
343 }
344
345 dependents.into_iter().collect()
346}
347
348fn has_cross_file_calls(prism: &Infigraph, rel_path: &str) -> bool {
350 let store = match prism.store() {
351 Some(s) => s,
352 None => return false,
353 };
354 let conn = match store.connection() {
355 Ok(c) => c,
356 Err(_) => return false,
357 };
358 let escaped = rel_path.replace('\'', "\\'");
359 let q = format!(
360 "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE a.file = '{escaped}' AND b.file <> '{escaped}' RETURN count(*) LIMIT 1"
361 );
362 if let Ok(mut result) = conn.query(&q) {
363 if let Some(row) = result.next() {
364 if let Some(val) = row.first() {
365 if val.to_string().parse::<u64>().unwrap_or(0) > 0 {
366 return true;
367 }
368 }
369 }
370 }
371 let q2 = format!(
372 "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE b.file = '{escaped}' AND a.file <> '{escaped}' RETURN count(*) LIMIT 1"
373 );
374 if let Ok(mut result) = conn.query(&q2) {
375 if let Some(row) = result.next() {
376 if let Some(val) = row.first() {
377 return val.to_string().parse::<u64>().unwrap_or(0) > 0;
378 }
379 }
380 }
381 false
382}
383
/// Returns `true` when any named component of `path` is hidden (starts with `.`)
/// or is listed in `ignore_dirs`.
///
/// Only real file/directory names are inspected. The special components `.` and
/// `..`, the root, and platform prefixes are skipped, so a path such as
/// `./src/main.rs` is not mistaken for a hidden entry.
fn should_ignore(path: &Path, ignore_dirs: &[&str]) -> bool {
    path.components().any(|component| match component {
        std::path::Component::Normal(os_name) => {
            let name = os_name.to_string_lossy();
            let name: &str = &name;
            name.starts_with('.') || ignore_dirs.contains(&name)
        }
        // `.`/`..`/root/prefix carry no file or directory name to test.
        _ => false,
    })
}
390
391fn register_watch_dirs(
392 watcher: &mut RecommendedWatcher,
393 root: &Path,
394 ignore_dirs: &[&str],
395) -> Result<()> {
396 watcher.watch(root, RecursiveMode::NonRecursive)?;
397 register_subdirs(watcher, root, ignore_dirs);
398 Ok(())
399}
400
401fn register_subdirs(watcher: &mut RecommendedWatcher, dir: &Path, ignore_dirs: &[&str]) {
402 let entries = match std::fs::read_dir(dir) {
403 Ok(e) => e,
404 Err(_) => return,
405 };
406 for entry in entries.flatten() {
407 let path = entry.path();
408 if !path.is_dir() {
409 continue;
410 }
411 let name = entry.file_name();
412 let name_str = name.to_string_lossy();
413 if ignore_dirs.contains(&name_str.as_ref()) || name_str.starts_with('.') {
414 continue;
415 }
416 let _ = watcher.watch(&path, RecursiveMode::NonRecursive);
417 register_subdirs(watcher, &path, ignore_dirs);
418 }
419}