Skip to main content

infigraph_core/watch/
mod.rs

1pub mod batch;
2
3use std::path::{Path, PathBuf};
4use std::sync::mpsc;
5use std::time::Duration;
6
7use anyhow::Result;
8use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
9
10use crate::Infigraph;
11use batch::ChangeBatch;
12
13/// A single file-change event emitted by the watcher.
14#[derive(Debug, Clone)]
15pub struct WatchEvent {
16    pub kind: WatchEventKind,
17    pub path: PathBuf,
18    /// True if this file has cross-file CALLS edges — full reindex needed to re-resolve them.
19    pub has_cross_file_calls: bool,
20}
21
/// The kind of change a [`WatchEvent`] describes.
///
/// The enum carries no data, so it is `Copy` (no `.clone()` needed to reuse a
/// value) and `Hash` (usable as a set or map key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WatchEventKind {
    /// An existing file's contents or metadata changed.
    Modified,
    /// A new file appeared.
    Created,
    /// A file was deleted.
    Removed,
}
28
29impl std::fmt::Display for WatchEvent {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        let kind = match self.kind {
32            WatchEventKind::Modified => "modified",
33            WatchEventKind::Created => "created",
34            WatchEventKind::Removed => "removed",
35        };
36        if self.has_cross_file_calls {
37            write!(
38                f,
39                "{kind}: {} [cross-file calls detected — full reindex recommended]",
40                self.path.display()
41            )
42        } else {
43            write!(f, "{kind}: {}", self.path.display())
44        }
45    }
46}
47
48/// Watch a project directory and auto-reindex on file changes.
49///
50/// After reindexing a changed file, checks if it has cross-file CALLS edges.
51/// If so, sets `WatchEvent.has_cross_file_calls = true` so the caller can
52/// prompt the user to run a full reindex (to re-resolve dangling call targets).
53///
54/// `on_periodic` is called every `periodic_secs` seconds when at least one file
55/// has changed since the last call. It receives the IndexResult from a full
56/// reindex and can run expensive post-processing (e.g., SCIP import).
57///
58/// Blocks until `stop_rx` receives a signal.
59pub fn watch_project(
60    prism: &Infigraph,
61    debounce_ms: u64,
62    stop_rx: mpsc::Receiver<()>,
63    on_event: impl Fn(WatchEvent) + Send + 'static,
64) -> Result<()> {
65    watch_project_with_periodic(
66        prism,
67        debounce_ms,
68        stop_rx,
69        on_event,
70        0,
71        None::<fn(&crate::IndexResult)>,
72    )
73}
74
75pub fn watch_project_with_periodic<F>(
76    prism: &Infigraph,
77    debounce_ms: u64,
78    stop_rx: mpsc::Receiver<()>,
79    on_event: impl Fn(WatchEvent) + Send + 'static,
80    periodic_secs: u64,
81    on_periodic: Option<F>,
82) -> Result<()>
83where
84    F: Fn(&crate::IndexResult) + Send + 'static,
85{
86    let (tx, rx) = mpsc::channel::<notify::Result<Event>>();
87
88    let config = Config::default().with_poll_interval(Duration::from_millis(debounce_ms));
89
90    let mut watcher = RecommendedWatcher::new(tx, config)?;
91
92    let ignore_dirs: &[&str] = &[
93        ".infigraph",
94        ".git",
95        "node_modules",
96        "__pycache__",
97        ".venv",
98        "venv",
99        "target",
100        "build",
101        "dist",
102        ".tox",
103    ];
104
105    // Watch directories selectively instead of RecursiveMode::Recursive
106    // to avoid exhausting file descriptors on repos with node_modules/target
107    register_watch_dirs(&mut watcher, prism.root(), ignore_dirs)?;
108
109    let mut changes_since_periodic: usize = 0;
110    let mut last_periodic = std::time::Instant::now();
111
112    // Batch accumulator: collect file changes over a 1-second window
113    // then index them all at once using the bulk write path.
114    let mut batch = ChangeBatch::new(1000);
115
116    loop {
117        if stop_rx.try_recv().is_ok() {
118            break;
119        }
120
121        // Periodic SCIP refresh: if changes accumulated and enough time passed
122        if periodic_secs > 0
123            && changes_since_periodic > 0
124            && last_periodic.elapsed() >= Duration::from_secs(periodic_secs)
125        {
126            if let Some(ref cb) = on_periodic {
127                match prism.index() {
128                    Ok(result) => {
129                        if !result.extractions.is_empty() {
130                            cb(&result);
131                        }
132                    }
133                    Err(e) => eprintln!("[watch] periodic reindex failed: {e}"),
134                }
135            }
136            changes_since_periodic = 0;
137            last_periodic = std::time::Instant::now();
138        }
139
140        // Flush the batch when the window has closed
141        if !batch.is_empty() && batch.is_ready() {
142            let paths = batch.drain();
143            let count = paths.len();
144            eprintln!("[watch] batch indexing {count} files");
145
146            match prism.index_files(&paths) {
147                Ok(result) => {
148                    changes_since_periodic += result.indexed_files;
149
150                    if let Some(store) = prism.store() {
151                        let changed: Vec<&str> =
152                            result.extractions.iter().map(|e| e.file.as_str()).collect();
153                        if !changed.is_empty() {
154                            if let Err(e) =
155                                crate::embed::update_embeddings(store, prism.root(), &changed)
156                            {
157                                eprintln!("[watch] batch embedding update failed: {e}");
158                            }
159                        }
160                    }
161
162                    for extraction in &result.extractions {
163                        let cross = has_cross_file_calls(prism, &extraction.file);
164                        let abs_path = prism.root().join(&extraction.file);
165                        on_event(WatchEvent {
166                            kind: WatchEventKind::Modified,
167                            path: abs_path,
168                            has_cross_file_calls: cross,
169                        });
170                    }
171                }
172                Err(e) => eprintln!("[watch] batch reindex failed: {e}"),
173            }
174        }
175
176        match rx.recv_timeout(Duration::from_millis(200)) {
177            Ok(Ok(event)) => {
178                let watch_kind = match event.kind {
179                    EventKind::Create(_) => WatchEventKind::Created,
180                    EventKind::Modify(_) => WatchEventKind::Modified,
181                    EventKind::Remove(_) => WatchEventKind::Removed,
182                    _ => continue,
183                };
184
185                for path in event.paths {
186                    if should_ignore(&path, ignore_dirs) {
187                        continue;
188                    }
189
190                    let rel = match path.strip_prefix(prism.root()) {
191                        Ok(r) => r.to_string_lossy().replace('\\', "/"),
192                        Err(_) => continue,
193                    };
194
195                    match watch_kind {
196                        WatchEventKind::Removed => {
197                            let _ = prism.remove_file(&path);
198                            changes_since_periodic += 1;
199                            on_event(WatchEvent {
200                                kind: watch_kind.clone(),
201                                path,
202                                has_cross_file_calls: false,
203                            });
204                        }
205                        WatchEventKind::Created | WatchEventKind::Modified => {
206                            if prism.registry().for_file(&rel).is_some() {
207                                batch.add(path);
208                            }
209                        }
210                    }
211                }
212            }
213            Ok(Err(e)) => eprintln!("watch error: {e}"),
214            Err(mpsc::RecvTimeoutError::Timeout) => {}
215            Err(mpsc::RecvTimeoutError::Disconnected) => break,
216        }
217    }
218
219    Ok(())
220}
221
222/// Like `watch_project` but automatically re-resolves cross-file call edges
223/// when affected by a change, keeping call resolution accurate without user intervention.
224///
225/// Instead of running a full `prism.index()` (re-parsing every file), this collects
226/// the changed file plus its cross-file dependents and uses `prism.index_files()` to
227/// re-index only the affected subset, then runs targeted re-resolution via
228/// `resolve::re_resolve_for_files()`.
229pub fn watch_project_auto_resolve(
230    prism: &Infigraph,
231    debounce_ms: u64,
232    stop_rx: mpsc::Receiver<()>,
233    log_prefix: &str,
234    make_registry: impl Fn() -> anyhow::Result<crate::lang::LanguageRegistry> + Send + 'static,
235) -> Result<()> {
236    let root = prism.root().to_path_buf();
237    watch_project(prism, debounce_ms, stop_rx, {
238        let prefix = log_prefix.to_string();
239        move |evt: WatchEvent| {
240            if evt.has_cross_file_calls {
241                eprintln!("[watch {prefix}] {evt}");
242                if let Ok(reg) = make_registry() {
243                    if let Ok(mut p) = Infigraph::open(&root, reg) {
244                        if p.init().is_ok() {
245                            let changed_rel = evt
246                                .path
247                                .strip_prefix(&root)
248                                .map(|r| r.to_string_lossy().replace('\\', "/"))
249                                .unwrap_or_else(|_| evt.path.to_string_lossy().replace('\\', "/"));
250                            let mut affected_files = vec![evt.path.clone()];
251
252                            if let Some(store) = p.store() {
253                                let deps = get_cross_file_dependents(store, &changed_rel);
254                                for dep_rel in deps {
255                                    let dep_abs = root.join(&dep_rel);
256                                    if dep_abs.exists() {
257                                        affected_files.push(dep_abs);
258                                    }
259                                }
260                            }
261
262                            match p.index_files(&affected_files) {
263                                Ok(r) => {
264                                    eprintln!(
265                                        "[watch {prefix}] targeted reindex: {}/{} affected files",
266                                        r.indexed_files, r.total_files
267                                    );
268
269                                    if let Some(store) = p.store() {
270                                        let file_strs: Vec<String> =
271                                            r.extractions.iter().map(|e| e.file.clone()).collect();
272                                        match crate::resolve::re_resolve_for_files(
273                                            store,
274                                            &file_strs,
275                                            &r.extractions,
276                                            None,
277                                        ) {
278                                            Ok(stats) => {
279                                                eprintln!("[watch {prefix}] re-resolved: {stats}")
280                                            }
281                                            Err(e) => {
282                                                eprintln!("[watch {prefix}] re-resolve failed: {e}")
283                                            }
284                                        }
285
286                                        let changed: Vec<&str> =
287                                            r.extractions.iter().map(|e| e.file.as_str()).collect();
288                                        match crate::embed::update_embeddings(
289                                            store, &root, &changed,
290                                        ) {
291                                            Ok(n) => {
292                                                eprintln!("[watch {prefix}] updated {n} embeddings")
293                                            }
294                                            Err(e) => eprintln!(
295                                                "[watch {prefix}] embedding update failed: {e}"
296                                            ),
297                                        }
298                                    }
299                                }
300                                Err(e) => {
301                                    eprintln!("[watch {prefix}] targeted reindex failed: {e}")
302                                }
303                            }
304                        }
305                    }
306                }
307            } else {
308                eprintln!("[watch {prefix}] {evt}");
309            }
310        }
311    })
312}
313
314/// Returns the relative paths of files that have cross-file CALLS edges to/from the given file.
315fn get_cross_file_dependents(store: &crate::graph::GraphStore, rel_path: &str) -> Vec<String> {
316    let conn = match store.connection() {
317        Ok(c) => c,
318        Err(_) => return Vec::new(),
319    };
320    let escaped = rel_path.replace('\'', "\\'");
321    let mut dependents = std::collections::HashSet::new();
322
323    let q1 = format!(
324        "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE a.file = '{escaped}' AND b.file <> '{escaped}' RETURN DISTINCT b.file"
325    );
326    if let Ok(result) = conn.query(&q1) {
327        for row in result {
328            if let Some(val) = row.first() {
329                dependents.insert(val.to_string());
330            }
331        }
332    }
333
334    let q2 = format!(
335        "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE b.file = '{escaped}' AND a.file <> '{escaped}' RETURN DISTINCT a.file"
336    );
337    if let Ok(result) = conn.query(&q2) {
338        for row in result {
339            if let Some(val) = row.first() {
340                dependents.insert(val.to_string());
341            }
342        }
343    }
344
345    dependents.into_iter().collect()
346}
347
348/// Returns true if the file has any resolved CALLS edges to/from symbols in other files.
349fn has_cross_file_calls(prism: &Infigraph, rel_path: &str) -> bool {
350    let store = match prism.store() {
351        Some(s) => s,
352        None => return false,
353    };
354    let conn = match store.connection() {
355        Ok(c) => c,
356        Err(_) => return false,
357    };
358    let escaped = rel_path.replace('\'', "\\'");
359    let q = format!(
360        "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE a.file = '{escaped}' AND b.file <> '{escaped}' RETURN count(*) LIMIT 1"
361    );
362    if let Ok(mut result) = conn.query(&q) {
363        if let Some(row) = result.next() {
364            if let Some(val) = row.first() {
365                if val.to_string().parse::<u64>().unwrap_or(0) > 0 {
366                    return true;
367                }
368            }
369        }
370    }
371    let q2 = format!(
372        "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE b.file = '{escaped}' AND a.file <> '{escaped}' RETURN count(*) LIMIT 1"
373    );
374    if let Ok(mut result) = conn.query(&q2) {
375        if let Some(row) = result.next() {
376            if let Some(val) = row.first() {
377                return val.to_string().parse::<u64>().unwrap_or(0) > 0;
378            }
379        }
380    }
381    false
382}
383
/// Returns true when `path` should be skipped by the watcher.
///
/// A path is ignored if any of its named components is listed in
/// `ignore_dirs` or is hidden (its name starts with `.`). Only real names are
/// inspected: `.` and `..`, the root, and Windows prefixes are path syntax
/// rather than hidden entries, so `./src/main.rs` is not ignored.
fn should_ignore(path: &Path, ignore_dirs: &[&str]) -> bool {
    path.components().any(|component| match component {
        std::path::Component::Normal(name) => {
            let lossy = name.to_string_lossy();
            let name: &str = &lossy;
            name.starts_with('.') || ignore_dirs.contains(&name)
        }
        _ => false,
    })
}
390
391fn register_watch_dirs(
392    watcher: &mut RecommendedWatcher,
393    root: &Path,
394    ignore_dirs: &[&str],
395) -> Result<()> {
396    watcher.watch(root, RecursiveMode::NonRecursive)?;
397    register_subdirs(watcher, root, ignore_dirs);
398    Ok(())
399}
400
401fn register_subdirs(watcher: &mut RecommendedWatcher, dir: &Path, ignore_dirs: &[&str]) {
402    let entries = match std::fs::read_dir(dir) {
403        Ok(e) => e,
404        Err(_) => return,
405    };
406    for entry in entries.flatten() {
407        let path = entry.path();
408        if !path.is_dir() {
409            continue;
410        }
411        let name = entry.file_name();
412        let name_str = name.to_string_lossy();
413        if ignore_dirs.contains(&name_str.as_ref()) || name_str.starts_with('.') {
414            continue;
415        }
416        let _ = watcher.watch(&path, RecursiveMode::NonRecursive);
417        register_subdirs(watcher, &path, ignore_dirs);
418    }
419}