mcp_memory/watcher.rs
1//! Background file watcher for automatic re-indexing of code-symbol entities.
2//!
//! Launch per-project watches via [`spawn_watcher`]. Re-indexing calls
//! [`crate::actions::code::index_paths`] directly on the project's
//! `Arc<GraphHandle>`, which the watcher holds for its lifetime so the
//! canonical instance stays open; files that no longer exist are purged.
7//! The watcher uses OS-native filesystem events (`notify` crate) with a 2-second
8//! debounce window to avoid thrashing during bulk edits / git operations.
9
10#![cfg(feature = "code")]
11
12use std::path::Path;
13use std::sync::Arc;
14
15use notify::Watcher as _;
16
17use crate::code::lang;
18use crate::kg::GraphHandle;
19
20/// Spawn a background thread that watches `path` (recursively) for file
21/// modifications and re-indexes changed files under the given `project`.
22///
23/// `kg_arc` is the project's handle; the thread holds it to pin the canonical
24/// instance open. The watcher debounces events for 2 seconds of quiet before
25/// triggering a re-index batch.
26pub fn spawn_watcher(kg_arc: Arc<GraphHandle>, path: String, project: &str) {
27 let _ = std::thread::Builder::new()
28 .name(format!("watcher-{project}"))
29 .spawn(move || {
30 // Held for the thread's lifetime to keep the project DB's canonical
31 // handle alive; this is the same instance the registry hands out, so
32 // re-indexing uses it directly without re-acquiring the registry lock.
33 let kg = kg_arc;
34 let (tx, rx) = std::sync::mpsc::channel::<notify::Event>();
35 let Ok(mut watcher) = notify::recommended_watcher(move |res| {
36 if let Ok(event) = res {
37 let _ = tx.send(event);
38 }
39 }) else {
40 return;
41 };
42 if watcher
43 .watch(Path::new(&path), notify::RecursiveMode::Recursive)
44 .is_err()
45 {
46 return;
47 }
48
49 use std::collections::BTreeSet;
50 use std::path::PathBuf;
51 use std::time::{Duration, Instant};
52
53 const DEBOUNCE_MS: u64 = 2000;
54 let base = crate::actions::code::canonical_base();
55 let mut pending: BTreeSet<PathBuf> = BTreeSet::new();
56 let mut last_event = Instant::now();
57
58 loop {
59 let elapsed = last_event.elapsed().as_millis() as u64;
60 let timeout = Duration::from_millis(DEBOUNCE_MS.saturating_sub(elapsed));
61 match rx.recv_timeout(timeout) {
62 Ok(first) => {
63 // Process the event that woke us, then drain any queued behind it.
64 let mut collect = |event: notify::Event| {
65 for p in &event.paths {
66 if lang::detect(p).is_some() {
67 pending.insert(p.clone());
68 }
69 }
70 };
71 collect(first);
72 while let Ok(event) = rx.try_recv() {
73 collect(event);
74 }
75 last_event = Instant::now();
76 }
77 // Debounce window elapsed — apply the whole change set at once:
78 // re-index surviving files in a single batch (one parse pool,
79 // amortized write transactions) and purge deleted ones.
80 Err(_) if !pending.is_empty() => {
81 let mut to_index: Vec<PathBuf> = Vec::new();
82 for p in std::mem::take(&mut pending) {
83 if p.exists() {
84 to_index.push(p);
85 } else {
86 let name = crate::actions::code::file_entity_name(&p, &base);
87 let _ = kg.code_purge_file(&name);
88 }
89 }
90 if !to_index.is_empty() {
91 let _ = crate::actions::code::index_paths(
92 kg.as_ref(),
93 to_index,
94 &base,
95 false,
96 );
97 }
98 }
99 Err(_) => {}
100 }
101 }
102 });
103}