Skip to main content

rag_rat_core/
watch.rs

//! Background file watcher: keeps the active index (and dirty-worktree overlay) fresh as files
//! change, so graph/symbol queries reflect uncommitted edits without waiting for a commit.
//!
//! - **One watcher per worktree** via the election lock; **one writer at a time per DB** via the
//!   write lock (see [`crate::locks`]).
//! - Watches the configured target *directories* recursively (so **new files** are seen), classifies
//!   events through the target globs to decide whether to fire, and debounces bursts with a
//!   max-latency cap so sustained writes can't starve a pass.
//! - Each pass runs the existing pipeline: discover → reconcile → gc (only every Nth pass) →
//!   `memory_validate`. Discover handles additions, edits and deletions; the pass is idempotent.
11
12use std::{
13    path::{Path, PathBuf},
14    sync::{
15        Arc,
16        atomic::{AtomicBool, Ordering},
17        mpsc::RecvTimeoutError,
18    },
19    thread::JoinHandle,
20    time::{Duration, Instant},
21};
22
23use notify::{Event, RecursiveMode, Watcher as _, recommended_watcher};
24
25use crate::{
26    config::Config,
27    fleet,
28    index::{IndexDatabase, ai::ReconcileOptions, target_for_path},
29    locks::{self, FileLock},
30};
31
/// How often, in watcher passes, `gc` runs. Discover already reconciles deletions, and gc shells
/// out to `git worktree list` plus a liveness scan, so running it on every keystroke burst would be
/// wasted work.
const GC_EVERY_PASSES: u64 = 20;
/// Upper bound, in seconds, on one pass's reconcile, so a pass never holds the write lock
/// indefinitely.
const PASS_RECONCILE_MAX_SECONDS: u64 = 60;
/// How long shutdown / interactive callers wait for a lock before skipping instead of blocking.
const SKIP_TIMEOUT: Duration = Duration::from_millis(3_000);
/// Quiet window after the installed binary changes, before the fleet is signaled to hot-upgrade.
/// `cargo install` writes a temp file and then renames it; the window lets the rename settle.
const FLEET_DEBOUNCE: Duration = Duration::from_millis(500);
/// Max-latency cap for the fleet trigger, so sustained churn on the binary still fires.
const FLEET_MAX_LATENCY: Duration = Duration::from_secs(2);
44
/// Debounce state with a hard max-latency cap. Pure (clock injected) so it is unit-testable without
/// real filesystem events.
///
/// Events arm the timer. It fires once events have been quiet for `debounce`, or `max_latency`
/// after the first event since the last [`Debounce::reset`], whichever comes first.
#[derive(Debug)]
struct Debounce {
    /// Quiet window required after the most recent event.
    debounce: Duration,
    /// Longest the first event of a burst may wait before the timer fires.
    max_latency: Duration,
    /// First event since the last reset (`None` while idle).
    first: Option<Instant>,
    /// Most recent event since the last reset (`None` while idle).
    last: Option<Instant>,
}

impl Debounce {
    fn new(debounce: Duration, max_latency: Duration) -> Self {
        Self { debounce, max_latency, first: None, last: None }
    }

    /// Record an event at `now`; the first event after a reset starts the max-latency clock.
    fn on_event(&mut self, now: Instant) {
        self.first.get_or_insert(now);
        self.last = Some(now);
    }

    /// Return to the idle state (no pending deadline).
    fn reset(&mut self) {
        self.first = None;
        self.last = None;
    }

    /// When a pass should fire: the earlier of "quiet window since the last event" and "max latency
    /// since the first event". The cap is what guarantees progress under sustained writes.
    ///
    /// Both durations may come from user configuration, and `Instant + Duration` panics when the
    /// sum is unrepresentable. Deadlines are therefore computed with `checked_add`. A deadline that
    /// overflows is treated as never arriving; if both overflow, there is no deadline (`None`).
    fn fire_at(&self) -> Option<Instant> {
        let (first, last) = (self.first?, self.last?);
        let quiet = last.checked_add(self.debounce);
        let capped = first.checked_add(self.max_latency);
        match (quiet, capped) {
            (Some(quiet), Some(capped)) => Some(quiet.min(capped)),
            (quiet, capped) => quiet.or(capped),
        }
    }

    /// Time remaining until [`Debounce::fire_at`] (zero once overdue), or `None` with no deadline.
    fn due_in(&self, now: Instant) -> Option<Duration> {
        self.fire_at().map(|at| at.saturating_duration_since(now))
    }

    /// Whether a deadline exists and `now` has reached it.
    fn should_fire(&self, now: Instant) -> bool {
        self.fire_at().is_some_and(|at| now >= at)
    }
}
85
86/// Run one maintenance pass, blocking on the per-DB write lock (watcher-to-watcher serializes).
87pub fn maintenance_pass(config: &Config, run_gc: bool) -> anyhow::Result<()> {
88    let lock_path = locks::write_lock_path(&config.database);
89    let _lock = FileLock::acquire_blocking(&lock_path)?;
90    run_pass(config, run_gc)
91}
92
93/// Run one maintenance pass only if the write lock is free within `SKIP_TIMEOUT`; returns whether
94/// it ran. Used by interactive / hook / shutdown callers so a held lock can't hang them.
95pub fn maintenance_pass_or_skip(config: &Config, run_gc: bool) -> anyhow::Result<bool> {
96    let lock_path = locks::write_lock_path(&config.database);
97    match FileLock::acquire_timeout(&lock_path, SKIP_TIMEOUT)? {
98        Some(_lock) => {
99            run_pass(config, run_gc)?;
100            Ok(true)
101        },
102        None => Ok(false),
103    }
104}
105
106fn run_pass(config: &Config, run_gc: bool) -> anyhow::Result<()> {
107    let db = IndexDatabase::index_discover(config)?;
108    let runtime = &config.local_ai.embedding.runtime;
109    let options = ReconcileOptions {
110        batch_size: Some(runtime.batch_size),
111        changed_first: true,
112        max_seconds: Some(PASS_RECONCILE_MAX_SECONDS),
113        max_embedding_chars: runtime.max_embedding_chars,
114        intra_threads: runtime.ort_threads.map(|n| n as usize),
115        ..ReconcileOptions::default()
116    };
117    db.reconcile_with_options_progress(options, |_| {})?;
118    if run_gc {
119        let _ = db.gc();
120    }
121    let _ = db.memory_validate();
122    Ok(())
123}
124
125/// A relevant event is a rescan/overflow notice, or one whose path matches a configured target —
126/// classification only decides *whether to fire a pass*, not what to index (the discover sweep does
127/// that). Ignored paths (`.rag-rat/`, `target/`, …) never match a target, so they never fire.
128fn event_is_relevant(config: &Config, event: &Event) -> bool {
129    if event.need_rescan() {
130        return true;
131    }
132    event.paths.iter().any(|path| {
133        path.strip_prefix(&config.root)
134            .ok()
135            .is_some_and(|relative| target_for_path(config, relative).is_some())
136    })
137}
138
139/// Whether `event` touches the installed binary path — the fleet hot-upgrade trigger. Matches by
140/// full path (`cargo install` renames its temp file to exactly this path) so unrelated churn in
141/// the same directory is ignored.
142fn event_targets_binary(fleet_bin: Option<&Path>, event: &Event) -> bool {
143    let Some(bin) = fleet_bin else {
144        return false;
145    };
146    event.paths.iter().any(|path| path == bin)
147}
148
149/// A running watcher. Dropping it signals the thread to stop and joins it.
150#[derive(Debug)]
151pub struct Watcher {
152    stop: Arc<AtomicBool>,
153    handle: Option<JoinHandle<()>>,
154}
155
156impl Watcher {
157    /// Start the watcher unless disabled by config or `RAG_RAT_NO_WATCH`. The returned watcher must
158    /// be kept alive; dropping it stops the thread. Returns `None` when watching is disabled.
159    pub fn spawn(config: Config) -> Option<Watcher> {
160        Self::spawn_with_fleet(config, None)
161    }
162
163    /// Like [`Watcher::spawn`], but when `fleet_bin` is the installed-binary path, the elected
164    /// watcher also watches that file's directory and signals the hot-upgrade fleet (see
165    /// [`crate::fleet`]) when a new binary lands. Only the MCP server — which has a `SIGUSR1`
166    /// handler — passes `Some`.
167    pub fn spawn_with_fleet(config: Config, fleet_bin: Option<PathBuf>) -> Option<Watcher> {
168        if !config.watch.enabled || std::env::var_os("RAG_RAT_NO_WATCH").is_some() {
169            return None;
170        }
171        let stop = Arc::new(AtomicBool::new(false));
172        let handle = std::thread::Builder::new()
173            .name("rag-rat-watch".to_string())
174            .spawn({
175                let stop = Arc::clone(&stop);
176                move || watcher_main(config, fleet_bin, &stop)
177            })
178            .ok()?;
179        Some(Watcher { stop, handle: Some(handle) })
180    }
181}
182
183impl Drop for Watcher {
184    fn drop(&mut self) {
185        self.stop.store(true, Ordering::Relaxed);
186        if let Some(handle) = self.handle.take() {
187            let _ = handle.join();
188        }
189    }
190}
191
/// Sleep for roughly `total`, in slices of at most 200 ms, returning early as soon as `stop` is
/// observed set between slices.
fn sleep_checking_stop(total: Duration, stop: &AtomicBool) {
    const SLICE: Duration = Duration::from_millis(200);
    let mut elapsed = Duration::ZERO;
    while elapsed < total && !stop.load(Ordering::Relaxed) {
        let remaining = total - elapsed;
        std::thread::sleep(if remaining < SLICE { remaining } else { SLICE });
        elapsed += SLICE;
    }
}
203
204fn watcher_main(config: Config, fleet_bin: Option<PathBuf>, stop: &AtomicBool) {
205    let base_dir =
206        config.database.parent().map(Path::to_path_buf).unwrap_or_else(|| config.root.clone());
207    let election_path = locks::election_lock_path(&base_dir, &config.root);
208
209    // Win election (one watcher per worktree); retry so a new watcher takes over if a holder dies.
210    let _election = loop {
211        if stop.load(Ordering::Relaxed) {
212            return;
213        }
214        match FileLock::try_acquire(&election_path) {
215            Ok(Some(lock)) => break lock,
216            _ => sleep_checking_stop(Duration::from_secs(5), stop),
217        }
218    };
219
220    // Catch-up pass: covers edits made while no watcher was running (startup / election gap).
221    let _ = maintenance_pass(&config, true);
222
223    let (tx, rx) = std::sync::mpsc::channel();
224    let Ok(mut notify_watcher) = recommended_watcher(move |res| {
225        let _ = tx.send(res);
226    }) else {
227        return;
228    };
229    for target in &config.targets {
230        for dir in &target.directories {
231            let _ = notify_watcher.watch(&config.root.join(dir), RecursiveMode::Recursive);
232        }
233    }
234    // Fleet hot-upgrade: also watch the installed binary's directory so a new `cargo install`
235    // rename triggers a fleet-wide upgrade. Watch the directory (not the file) so the atomic
236    // rename — which replaces the inode — is still observed.
237    let fleet_dir = fleet_bin.as_ref().and_then(|bin| bin.parent());
238    if let Some(dir) = fleet_dir {
239        let _ = notify_watcher.watch(dir, RecursiveMode::NonRecursive);
240    }
241
242    let mut debounce = Debounce::new(
243        Duration::from_millis(config.watch.debounce_ms),
244        Duration::from_millis(config.watch.max_latency_ms),
245    );
246    let mut fleet_debounce = Debounce::new(FLEET_DEBOUNCE, FLEET_MAX_LATENCY);
247    // Periodic backstop (covers event-blind filesystems + missed events). `None` disables it.
248    let periodic = (config.watch.periodic_sweep_secs > 0)
249        .then(|| Duration::from_secs(config.watch.periodic_sweep_secs));
250    let mut passes: u64 = 0;
251    let mut last_pass = Instant::now(); // the catch-up pass just ran
252    loop {
253        if stop.load(Ordering::Relaxed) {
254            break;
255        }
256        let now = Instant::now();
257        let periodic_wait = periodic.map(|p| (last_pass + p).saturating_duration_since(now));
258        let wait = [debounce.due_in(now), fleet_debounce.due_in(now), periodic_wait]
259            .into_iter()
260            .flatten()
261            .min()
262            .unwrap_or(Duration::from_millis(500));
263        match rx.recv_timeout(wait) {
264            Ok(Ok(event)) => {
265                let now = Instant::now();
266                if event_is_relevant(&config, &event) {
267                    debounce.on_event(now);
268                }
269                if event_targets_binary(fleet_bin.as_deref(), &event) {
270                    fleet_debounce.on_event(now);
271                }
272            },
273            Ok(_) => {},
274            Err(RecvTimeoutError::Timeout) => {},
275            Err(RecvTimeoutError::Disconnected) => break,
276        }
277        let now = Instant::now();
278        let periodic_due = periodic.is_some_and(|p| now >= last_pass + p);
279        if debounce.should_fire(now) || periodic_due {
280            passes += 1;
281            let _ = maintenance_pass(&config, passes.is_multiple_of(GC_EVERY_PASSES));
282            debounce.reset();
283            last_pass = Instant::now();
284        }
285        if fleet_debounce.should_fire(now)
286            && let Some(bin) = fleet_bin.as_deref()
287        {
288            // Signal the fleet (this process last) to hot-upgrade to the freshly installed binary.
289            fleet::trigger(bin);
290            fleet_debounce.reset();
291        }
292    }
293
294    // Final pass for edits in the last debounce window — discover only (no embedding), timeout-and-
295    // skip. The host may SIGKILL shortly after stdin EOF, so shutdown must be bounded; discover is
296    // fast and keeps structure fresh, and the next session's startup catch-up does the embedding.
297    if debounce.fire_at().is_some() {
298        let _ = shutdown_discover(&config);
299    }
300}
301
302/// A bounded shutdown refresh: take the write lock only if free, run discover (no reconcile/embed).
303fn shutdown_discover(config: &Config) -> anyhow::Result<bool> {
304    let lock_path = locks::write_lock_path(&config.database);
305    match FileLock::acquire_timeout(&lock_path, SKIP_TIMEOUT)? {
306        Some(_lock) => {
307            IndexDatabase::index_discover(config)?;
308            Ok(true)
309        },
310        None => Ok(false),
311    }
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn debounce_fires_after_quiet_window() {
        let mut d = Debounce::new(Duration::from_millis(400), Duration::from_millis(2500));
        let t0 = Instant::now();
        d.on_event(t0);
        assert!(!d.should_fire(t0 + Duration::from_millis(399)), "fires before quiet window");
        assert!(d.should_fire(t0 + Duration::from_millis(400)), "fires at quiet window");
    }

    #[test]
    fn debounce_max_latency_cap_beats_sustained_events() {
        let debounce = Duration::from_millis(400);
        let max = Duration::from_millis(2500);
        let mut d = Debounce::new(debounce, max);
        let t0 = Instant::now();
        d.on_event(t0);
        // A steady stream of events every 200ms keeps the quiet window from ever elapsing...
        let mut now = t0;
        for _ in 0..100 {
            now += Duration::from_millis(200);
            d.on_event(now);
            if now >= t0 + max {
                break;
            }
            assert!(!d.should_fire(now), "should not fire mid-stream before the cap");
        }
        // ...but the max-latency cap forces a fire at first + max_latency regardless.
        assert!(
            d.should_fire(t0 + max),
            "max-latency cap must force a pass under sustained writes"
        );
    }

    #[test]
    fn debounce_idle_has_no_deadline() {
        let d = Debounce::new(Duration::from_millis(400), Duration::from_millis(2500));
        assert!(d.due_in(Instant::now()).is_none());
        assert!(!d.should_fire(Instant::now()));
    }

    #[test]
    fn debounce_reset_clears_pending_deadline() {
        let mut d = Debounce::new(Duration::from_millis(400), Duration::from_millis(2500));
        let t0 = Instant::now();
        d.on_event(t0);
        assert!(d.fire_at().is_some());
        d.reset();
        assert!(d.fire_at().is_none(), "reset must return to idle");
        assert!(!d.should_fire(t0 + Duration::from_secs(10)));
        // The next burst starts a fresh max-latency clock rather than reusing the old `first`.
        let t1 = t0 + Duration::from_secs(5);
        d.on_event(t1);
        assert_eq!(d.fire_at(), Some(t1 + Duration::from_millis(400)));
    }

    #[test]
    fn debounce_due_in_saturates_once_overdue() {
        let mut d = Debounce::new(Duration::from_millis(400), Duration::from_millis(2500));
        let t0 = Instant::now();
        d.on_event(t0);
        assert_eq!(d.due_in(t0), Some(Duration::from_millis(400)));
        assert_eq!(
            d.due_in(t0 + Duration::from_secs(1)),
            Some(Duration::ZERO),
            "an overdue deadline must report zero wait, not underflow"
        );
    }
}