pub mod batch;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::time::Duration;
use anyhow::Result;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use crate::Infigraph;
use batch::ChangeBatch;
/// A single change notification handed to the `on_event` callback of the watch functions.
#[derive(Debug, Clone)]
pub struct WatchEvent {
/// What happened to the file.
pub kind: WatchEventKind,
/// Path of the affected file: the project root joined with the indexed relative path for
/// re-indexed files, or the path reported by `notify` for removals.
pub path: PathBuf,
/// `true` when the graph holds `CALLS` edges between symbols of this file and symbols of
/// another file. The watch loop always sets this to `false` for removals.
pub has_cross_file_calls: bool,
}
/// Coarse classification of a file-system change, mapped from `notify`'s `EventKind`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchEventKind {
/// Mapped from `EventKind::Modify`. Also the kind the watch loop reports for every file it
/// re-indexes in a batch, including files that were queued by a create event.
Modified,
/// Mapped from `EventKind::Create`. The watch loop queues such files for batch indexing.
Created,
/// Mapped from `EventKind::Remove`. Reported immediately, without batching.
Removed,
}
/// Renders the event as `<kind>: <path>`, followed by a reindex hint when the file takes part
/// in cross-file calls.
impl std::fmt::Display for WatchEvent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self.kind {
            WatchEventKind::Removed => "removed",
            WatchEventKind::Created => "created",
            WatchEventKind::Modified => "modified",
        };
        // The `<kind>: <path>` prefix is common to both variants of the message.
        write!(f, "{label}: {}", self.path.display())?;
        if self.has_cross_file_calls {
            f.write_str(" [cross-file calls detected — full reindex recommended]")?;
        }
        Ok(())
    }
}
/// Watches the project with periodic re-indexing disabled.
///
/// Forwards to [`watch_project_with_periodic`] with a period of `0` seconds and no periodic
/// callback; every other argument is passed through unchanged.
pub fn watch_project(
    prism: &Infigraph,
    debounce_ms: u64,
    stop_rx: mpsc::Receiver<()>,
    on_event: impl Fn(WatchEvent) + Send + 'static,
) -> Result<()> {
    // A plain fn-pointer type pins the callee's otherwise unconstrained generic parameter.
    let no_periodic_callback: Option<fn(&crate::IndexResult)> = None;
    watch_project_with_periodic(prism, debounce_ms, stop_rx, on_event, 0, no_periodic_callback)
}
/// Watches the project tree and incrementally re-indexes changed files until stopped.
///
/// The loop runs on the calling thread. Each iteration it:
/// 1. checks `stop_rx` and exits if a message arrived;
/// 2. runs a full `prism.index()` when `periodic_secs` has elapsed and at least one change was
///    recorded since the previous periodic run;
/// 3. flushes the pending change batch once the batch reports it is ready, re-indexing the
///    files, refreshing their embeddings and emitting one `Modified` event per extraction;
/// 4. waits up to 200 ms for the next file-system event. Removals are applied and reported
///    immediately; creations and modifications of files known to the language registry are
///    queued in the batch.
///
/// # Parameters
/// - `prism`: the index to update; its root is the directory tree being watched.
/// - `debounce_ms`: passed to `notify` as the watcher's poll interval.
/// - `stop_rx`: any message received here ends the loop.
/// - `on_event`: invoked for every removal and for every re-indexed file.
/// - `periodic_secs`: period of the full re-index; `0` disables it.
/// - `on_periodic`: invoked with the result of a periodic re-index when it produced
///   extractions.
///
/// # Errors
/// Returns an error if the watcher cannot be created or the project root cannot be watched.
/// Failures while indexing are logged to stderr and do not end the loop.
pub fn watch_project_with_periodic<F>(
    prism: &Infigraph,
    debounce_ms: u64,
    stop_rx: mpsc::Receiver<()>,
    on_event: impl Fn(WatchEvent) + Send + 'static,
    periodic_secs: u64,
    on_periodic: Option<F>,
) -> Result<()>
where
    F: Fn(&crate::IndexResult) + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<notify::Result<Event>>();
    let config = Config::default().with_poll_interval(Duration::from_millis(debounce_ms));
    let mut watcher = RecommendedWatcher::new(tx, config)?;
    let ignore_dirs: &[&str] = &[
        ".infigraph",
        ".git",
        "node_modules",
        "__pycache__",
        ".venv",
        "venv",
        "target",
        "build",
        "dist",
        ".tox",
    ];
    register_watch_dirs(&mut watcher, prism.root(), ignore_dirs)?;

    let mut changes_since_periodic: usize = 0;
    let mut last_periodic = std::time::Instant::now();
    // NOTE(review): the meaning of `1000` is defined by `batch::ChangeBatch` — presumably the
    // batching window; confirm there.
    let mut batch = ChangeBatch::new(1000);

    loop {
        if stop_rx.try_recv().is_ok() {
            break;
        }

        // Periodic full re-index, only when something changed since the last one.
        if periodic_secs > 0
            && changes_since_periodic > 0
            && last_periodic.elapsed() >= Duration::from_secs(periodic_secs)
        {
            if let Some(ref cb) = on_periodic {
                match prism.index() {
                    Ok(result) => {
                        if !result.extractions.is_empty() {
                            cb(&result);
                        }
                    }
                    Err(e) => eprintln!("[watch] periodic reindex failed: {e}"),
                }
            }
            changes_since_periodic = 0;
            last_periodic = std::time::Instant::now();
        }

        // Flush the pending batch once it reports readiness.
        if !batch.is_empty() && batch.is_ready() {
            let paths = batch.drain();
            let count = paths.len();
            eprintln!("[watch] batch indexing {count} files");
            match prism.index_files(&paths) {
                Ok(result) => {
                    changes_since_periodic += result.indexed_files;
                    if let Some(store) = prism.store() {
                        let changed: Vec<&str> =
                            result.extractions.iter().map(|e| e.file.as_str()).collect();
                        if !changed.is_empty() {
                            if let Err(e) =
                                crate::embed::update_embeddings(store, prism.root(), &changed)
                            {
                                eprintln!("[watch] batch embedding update failed: {e}");
                            }
                        }
                    }
                    for extraction in &result.extractions {
                        let cross = has_cross_file_calls(prism, &extraction.file);
                        let abs_path = prism.root().join(&extraction.file);
                        on_event(WatchEvent {
                            kind: WatchEventKind::Modified,
                            path: abs_path,
                            has_cross_file_calls: cross,
                        });
                    }
                }
                Err(e) => eprintln!("[watch] batch reindex failed: {e}"),
            }
        }

        // The short timeout keeps the stop / periodic / batch checks above responsive.
        match rx.recv_timeout(Duration::from_millis(200)) {
            Ok(Ok(event)) => {
                let watch_kind = match event.kind {
                    EventKind::Create(_) => WatchEventKind::Created,
                    EventKind::Modify(_) => WatchEventKind::Modified,
                    EventKind::Remove(_) => WatchEventKind::Removed,
                    _ => continue,
                };
                for path in event.paths {
                    // Apply the ignore filter to the root-relative path only. Filtering the
                    // absolute path would drop every event whenever the project itself lives
                    // under a hidden or ignore-listed directory (e.g. `~/.local/src/proj` or
                    // `~/build/proj`), even though those directories were watched at startup.
                    let rel = match path.strip_prefix(prism.root()) {
                        Ok(r) if !should_ignore(r, ignore_dirs) => {
                            r.to_string_lossy().replace('\\', "/")
                        }
                        _ => continue,
                    };

                    // Watches are non-recursive, so a directory created after startup has to
                    // be registered explicitly or changes inside it would never be seen.
                    if watch_kind == WatchEventKind::Created && path.is_dir() {
                        if let Err(e) = register_watch_dirs(&mut watcher, &path, ignore_dirs) {
                            eprintln!(
                                "[watch] failed to watch new directory {}: {e}",
                                path.display()
                            );
                        }
                    }

                    match watch_kind {
                        WatchEventKind::Removed => {
                            // Best effort: the removal is reported even if the index update
                            // fails.
                            let _ = prism.remove_file(&path);
                            changes_since_periodic += 1;
                            on_event(WatchEvent {
                                kind: watch_kind.clone(),
                                path,
                                has_cross_file_calls: false,
                            });
                        }
                        WatchEventKind::Created | WatchEventKind::Modified => {
                            // Only files the language registry recognises are worth indexing.
                            if prism.registry().for_file(&rel).is_some() {
                                batch.add(path);
                            }
                        }
                    }
                }
            }
            Ok(Err(e)) => eprintln!("watch error: {e}"),
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            // The watcher dropped its sender; no further events can arrive.
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }
    Ok(())
}
/// Watches the project and, for every changed file with cross-file calls, re-indexes that file
/// together with the files it is connected to and re-resolves their references.
///
/// Every event is logged to stderr as `[watch <log_prefix>] <event>`. For events flagged with
/// `has_cross_file_calls`, the callback opens a fresh [`Infigraph`] on the same root (using a
/// registry from `make_registry`), re-indexes the changed file plus its cross-file neighbours
/// that still exist on disk, re-resolves references for the extracted files and refreshes
/// their embeddings. Each failure along the way is logged and aborts only the handling of
/// that one event.
///
/// # Parameters
/// - `prism`, `debounce_ms`, `stop_rx`: forwarded to [`watch_project`].
/// - `log_prefix`: label inserted into every log line.
/// - `make_registry`: factory for the language registry used by the per-event index handle.
///
/// # Errors
/// Returns whatever [`watch_project`] returns.
pub fn watch_project_auto_resolve(
    prism: &Infigraph,
    debounce_ms: u64,
    stop_rx: mpsc::Receiver<()>,
    log_prefix: &str,
    make_registry: impl Fn() -> anyhow::Result<crate::lang::LanguageRegistry> + Send + 'static,
) -> Result<()> {
    let root = prism.root().to_path_buf();
    let prefix = log_prefix.to_string();
    watch_project(prism, debounce_ms, stop_rx, move |evt: WatchEvent| {
        eprintln!("[watch {prefix}] {evt}");
        if !evt.has_cross_file_calls {
            return;
        }

        // The callback must be `'static`, so it cannot borrow `prism`; open a separate handle.
        let reg = match make_registry() {
            Ok(reg) => reg,
            Err(e) => {
                eprintln!("[watch {prefix}] targeted reindex skipped: registry unavailable: {e}");
                return;
            }
        };
        let mut p = match Infigraph::open(&root, reg) {
            Ok(p) => p,
            Err(e) => {
                eprintln!("[watch {prefix}] targeted reindex skipped: open failed: {e}");
                return;
            }
        };
        if let Err(e) = p.init() {
            eprintln!("[watch {prefix}] targeted reindex skipped: init failed: {e}");
            return;
        }

        // Graph queries use root-relative, forward-slash paths.
        let changed_rel = evt
            .path
            .strip_prefix(&root)
            .map(|r| r.to_string_lossy().replace('\\', "/"))
            .unwrap_or_else(|_| evt.path.to_string_lossy().replace('\\', "/"));

        // The changed file itself, plus every connected file that still exists on disk.
        let mut affected_files = vec![evt.path];
        if let Some(store) = p.store() {
            for dep_rel in get_cross_file_dependents(store, &changed_rel) {
                let dep_abs = root.join(&dep_rel);
                if dep_abs.exists() {
                    affected_files.push(dep_abs);
                }
            }
        }

        let r = match p.index_files(&affected_files) {
            Ok(r) => r,
            Err(e) => {
                eprintln!("[watch {prefix}] targeted reindex failed: {e}");
                return;
            }
        };
        eprintln!(
            "[watch {prefix}] targeted reindex: {}/{} affected files",
            r.indexed_files, r.total_files
        );

        if let Some(store) = p.store() {
            let file_strs: Vec<String> = r.extractions.iter().map(|e| e.file.clone()).collect();
            match crate::resolve::re_resolve_for_files(store, &file_strs, &r.extractions, None) {
                Ok(stats) => eprintln!("[watch {prefix}] re-resolved: {stats}"),
                Err(e) => eprintln!("[watch {prefix}] re-resolve failed: {e}"),
            }

            let changed: Vec<&str> = r.extractions.iter().map(|e| e.file.as_str()).collect();
            match crate::embed::update_embeddings(store, &root, &changed) {
                Ok(n) => eprintln!("[watch {prefix}] updated {n} embeddings"),
                Err(e) => eprintln!("[watch {prefix}] embedding update failed: {e}"),
            }
        }
    })
}
/// Returns the files connected to `rel_path` through cross-file `CALLS` edges.
///
/// Both directions are collected: files containing symbols that `rel_path` calls, and files
/// containing symbols that call into `rel_path`. The result is de-duplicated and in no
/// particular order. A failed connection or query contributes nothing, so the result is
/// empty when the store cannot be reached.
fn get_cross_file_dependents(store: &crate::graph::GraphStore, rel_path: &str) -> Vec<String> {
    let conn = match store.connection() {
        Ok(c) => c,
        Err(_) => return Vec::new(),
    };

    // Escape backslashes before quotes: otherwise a path ending in `\` would turn the closing
    // quote of the literal into an escaped quote and corrupt the query.
    let escaped = rel_path.replace('\\', "\\\\").replace('\'', "\\'");
    let queries = [
        // Files this file calls into.
        format!(
            "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE a.file = '{escaped}' AND b.file <> '{escaped}' RETURN DISTINCT b.file"
        ),
        // Files that call into this file.
        format!(
            "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE b.file = '{escaped}' AND a.file <> '{escaped}' RETURN DISTINCT a.file"
        ),
    ];

    let mut dependents = std::collections::HashSet::new();
    for query in &queries {
        if let Ok(result) = conn.query(query) {
            for row in result {
                if let Some(val) = row.first() {
                    dependents.insert(val.to_string());
                }
            }
        }
    }
    dependents.into_iter().collect()
}
/// Reports whether any `CALLS` edge links a symbol in `rel_path` with a symbol in another file,
/// in either direction.
///
/// Returns `false` when the index has no store, the connection cannot be obtained, a query
/// fails, or a count cannot be parsed.
fn has_cross_file_calls(prism: &Infigraph, rel_path: &str) -> bool {
    let store = match prism.store() {
        Some(s) => s,
        None => return false,
    };
    let conn = match store.connection() {
        Ok(c) => c,
        Err(_) => return false,
    };

    // Escape backslashes before quotes: otherwise a path ending in `\` would turn the closing
    // quote of the literal into an escaped quote and corrupt the query.
    let escaped = rel_path.replace('\\', "\\\\").replace('\'', "\\'");
    let queries = [
        // Outgoing: this file calls into another file.
        format!(
            "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE a.file = '{escaped}' AND b.file <> '{escaped}' RETURN count(*) LIMIT 1"
        ),
        // Incoming: another file calls into this file.
        format!(
            "MATCH (a:Symbol)-[:CALLS]->(b:Symbol) WHERE b.file = '{escaped}' AND a.file <> '{escaped}' RETURN count(*) LIMIT 1"
        ),
    ];

    // The incoming query only runs when the outgoing one found nothing.
    for query in &queries {
        if let Ok(mut result) = conn.query(query) {
            if let Some(row) = result.next() {
                if let Some(val) = row.first() {
                    if val.to_string().parse::<u64>().unwrap_or(0) > 0 {
                        return true;
                    }
                }
            }
        }
    }
    false
}
/// Returns `true` when any named component of `path` is hidden (starts with `.`) or is listed
/// in `ignore_dirs`.
///
/// Only ordinary name components are inspected. The special components `.` and `..`, root
/// markers and Windows prefixes are skipped, so a path such as `./src/lib.rs` is not mistaken
/// for one inside a hidden directory.
fn should_ignore(path: &Path, ignore_dirs: &[&str]) -> bool {
    path.components().any(|component| match component {
        std::path::Component::Normal(name) => {
            let name = name.to_string_lossy();
            name.starts_with('.') || ignore_dirs.contains(&name.as_ref())
        }
        _ => false,
    })
}
/// Starts watching `root` and every non-ignored directory beneath it.
///
/// Each directory gets its own non-recursive watch so that ignored subtrees are never
/// observed. Only a failure to watch `root` itself is returned as an error.
fn register_watch_dirs(
    watcher: &mut RecommendedWatcher,
    root: &Path,
    ignore_dirs: &[&str],
) -> Result<()> {
    match watcher.watch(root, RecursiveMode::NonRecursive) {
        Ok(()) => {
            register_subdirs(watcher, root, ignore_dirs);
            Ok(())
        }
        Err(err) => Err(err.into()),
    }
}
/// Recursively registers a non-recursive watch on every subdirectory of `dir`, skipping hidden
/// directories and those named in `ignore_dirs` (together with everything beneath them).
///
/// `dir` itself is not registered; the caller is expected to have done that. A directory that
/// cannot be listed is skipped silently. A directory that cannot be watched is logged to
/// stderr and its children are still visited.
fn register_subdirs(watcher: &mut RecommendedWatcher, dir: &Path, ignore_dirs: &[&str]) {
    let entries = match std::fs::read_dir(dir) {
        Ok(e) => e,
        // Unreadable or already-vanished directory: nothing to register.
        Err(_) => return,
    };
    for entry in entries.flatten() {
        let path = entry.path();
        if !path.is_dir() {
            continue;
        }
        let name = entry.file_name();
        let name_str = name.to_string_lossy();
        if ignore_dirs.contains(&name_str.as_ref()) || name_str.starts_with('.') {
            continue;
        }
        // Surface failures instead of discarding them: otherwise a directory that could not
        // be watched simply never produces events, with no indication why.
        if let Err(e) = watcher.watch(&path, RecursiveMode::NonRecursive) {
            eprintln!("[watch] failed to watch {}: {e}", path.display());
        }
        register_subdirs(watcher, &path, ignore_dirs);
    }
}