Skip to main content

cartog_watch/
lib.rs

1//! File system watcher with auto-reindexing for cartog.
2//!
3//! Watches a directory for source file changes using debounced filesystem events,
4//! triggers incremental re-indexing, and optionally defers RAG embedding to batch
5//! changed symbols after a configurable quiet period.
6#![doc = ""]
7#![doc = include_str!("../README.md")]
8
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use anyhow::{Context, Result};
16use notify_debouncer_mini::{new_debouncer, DebouncedEventKind};
17use serde::Serialize;
18use tracing::{debug, info, warn};
19
20use cartog_core::detect_language;
21use cartog_db::Database;
22use cartog_indexer as indexer;
23use cartog_indexer::is_ignored_dirname;
24use cartog_rag as rag;
25
26mod stale;
27pub use stale::{StaleSnapshot, StaleState};
28
29/// Configuration for the watch loop.
30pub struct WatchConfig {
31    /// Root directory to watch.
32    pub root: PathBuf,
33    /// Debounce window for filesystem events.
34    pub debounce: Duration,
35    /// Auto-embed override. `Some(true)`/`Some(false)` force on/off; `None`
36    /// auto-detects (embed only if the DB already has embeddings). Resolved once
37    /// at startup against the live DB — see [`run_watch`].
38    pub rag_override: Option<bool>,
39    /// Delay after last index before embedding (only when auto-embed is on).
40    pub rag_delay: Duration,
41    /// RAG provider configuration (embedding + reranker).
42    pub rag_config: rag::EmbeddingProviderConfig,
43    /// Secret-redaction policy applied to each re-index pass.
44    pub redact: indexer::RedactionConfig,
45    /// Emit newline-delimited JSON events on stdout. When false, the loop
46    /// only produces tracing logs on stderr (existing behavior).
47    pub json_events: bool,
48    /// Directory for the watcher's PID file (written on startup, removed on
49    /// graceful exit). `None` disables PID-file tracking. Consulted by
50    /// `cartog self update` to detect a running watcher.
51    pub pid_lock_dir: Option<PathBuf>,
52    /// Slot name used when acquiring the watch PID file. Required when
53    /// `pid_lock_dir` is set — [`run_watch`]/[`spawn_watch`] hard-fail
54    /// if a directory is configured without a slot, to prevent a
55    /// global-slot watcher from silently colliding with DB-scoped peers
56    /// in multi-project setups. `None` is only valid when `pid_lock_dir`
57    /// is also `None` (untracked mode used by tests).
58    ///
59    /// In the cartog binary the slot is derived via
60    /// `cartog::state::slot_for_db("watch", db_path)`. Library
61    /// embedders should follow the same shape: `<prefix>-<16 hex chars>`
62    /// where the hex is a SHA-256 prefix of the canonicalized DB path.
63    pub pid_lock_slot: Option<String>,
64    /// Open the on-disk DB via `Database::open_existing_rw` instead of
65    /// `Database::open`. Used by the Phase 5 promoter to attach without
66    /// re-running schema migrations (the promoter validated the schema
67    /// when it pinned `PinnedAttach`; running them again would re-trigger
68    /// the SQLITE_BUSY race the election prevents).
69    pub skip_migrations: bool,
70    /// Shared staleness state for the MCP server to read. `None` (the default,
71    /// e.g. standalone `cartog watch`) disables staleness publishing.
72    pub stale: Option<Arc<StaleState>>,
73}
74
75impl WatchConfig {
76    /// Build a [`WatchConfig`] rooted at `root` with these defaults:
77    /// `debounce = 5s`, `rag_override = None` (auto-detect), `rag_delay = 30s`,
78    /// `json_events = false`, both `pid_lock_*` = `None` (untracked
79    /// mode), `skip_migrations = false`. Callers wanting PID-lock
80    /// tracking must set BOTH `pid_lock_dir` and `pid_lock_slot` after
81    /// construction — see [`WatchConfig::pid_lock_slot`].
82    pub fn new(root: PathBuf) -> Self {
83        Self {
84            root,
85            // 5s default: long enough to collapse bursts from `git pull` /
86            // `npm install` / branch switches into a single re-index, short
87            // enough to feel live for normal save-on-type editing.
88            debounce: Duration::from_secs(5),
89            rag_override: None,
90            rag_delay: Duration::from_secs(30),
91            rag_config: rag::EmbeddingProviderConfig::default(),
92            redact: indexer::RedactionConfig::default(),
93            json_events: false,
94            pid_lock_dir: None,
95            pid_lock_slot: None,
96            skip_migrations: false,
97            stale: None,
98        }
99    }
100}
101
/// Historical, global watch slot name. It is only meaningful in untracked mode
/// (`pid_lock_dir = None`, `pid_lock_slot = None`). Whenever `pid_lock_dir` is
/// set, callers must supply a DB-scoped slot instead, built with
/// `cartog::state::slot_for_db("watch", db_path)` — see
/// [`WatchConfig::pid_lock_slot`].
pub const WATCH_LOCK_SLOT: &str = "watch";

/// Decide whether the watcher should auto-embed.
///
/// An explicit `override_` always wins. Without one, embedding is enabled only
/// when the repo already holds at least one embedding, i.e. it has opted into RAG.
fn resolve_watch_rag(override_: Option<bool>, embedding_count: u32) -> bool {
    let repo_has_embeddings = embedding_count > 0;
    override_.unwrap_or(repo_has_embeddings)
}
114
115/// A single event emitted by the watch loop when `json_events` is enabled.
116///
117/// Serialized to stdout as one compact JSON object per line (NDJSON) so
118/// downstream tooling can parse events as they arrive.
119#[derive(Debug, Serialize)]
120#[serde(tag = "event", rename_all = "snake_case")]
121enum WatchEvent<'a> {
122    /// Emitted once when the watcher begins observing the tree.
123    Started {
124        root: &'a str,
125        debounce_ms: u128,
126        rag: bool,
127        rag_delay_s: u64,
128    },
129    /// A debounced re-index pass completed successfully.
130    Reindex {
131        files_indexed: u32,
132        files_skipped: u32,
133        files_removed: u32,
134        symbols_added: u32,
135        edges_added: u32,
136        edges_resolved: u32,
137        duration_ms: u128,
138    },
139    /// A debounced re-index pass failed; the loop keeps running.
140    ReindexFailed { error: String },
141    /// RAG deferred-embedding pass completed.
142    RagEmbedded {
143        symbols_embedded: u32,
144        symbols_skipped: u32,
145        total_content_symbols: u32,
146        duration_ms: u128,
147    },
148    /// RAG deferred-embedding pass failed; the loop keeps running.
149    RagFailed { error: String },
150    /// Emitted once when the watcher has stopped (Ctrl+C, drop, etc.).
151    Shutdown,
152}
153
154/// Write one NDJSON line to stdout, flushing immediately so consumers see
155/// events in real time rather than when the pipe buffer flushes.
156///
157/// Deliberately fire-and-forget: the only realistic failure modes are a
158/// closed stdout pipe (the consumer went away — nothing to report to) or a
159/// serde error on an entirely statically-typed struct (impossible in
160/// practice). Propagating would force every call site to decide whether to
161/// abort the watch loop over a transient stdout hiccup, which is worse
162/// behavior than missing one event line.
163fn emit_event(event: &WatchEvent<'_>) {
164    if let Ok(line) = serde_json::to_string(event) {
165        let mut out = std::io::stdout().lock();
166        let _ = writeln!(out, "{line}");
167        let _ = out.flush();
168    }
169}
170
171/// Handle returned by `spawn_watch`. Drop or call `stop()` to shut down the watcher.
172pub struct WatchHandle {
173    shutdown: Arc<AtomicBool>,
174    thread: Option<std::thread::JoinHandle<()>>,
175}
176
177impl WatchHandle {
178    /// Signal the watch loop to stop and wait for it to finish.
179    pub fn stop(mut self) {
180        self.shutdown.store(true, Ordering::SeqCst);
181        if let Some(handle) = self.thread.take() {
182            let _ = handle.join();
183        }
184    }
185}
186
187impl Drop for WatchHandle {
188    fn drop(&mut self) {
189        self.shutdown.store(true, Ordering::SeqCst);
190        // Best-effort join with a bounded deadline. Without this, the
191        // watcher thread keeps holding the PID `ProcessLock` until its
192        // next `recv_timeout` (~1s for idle, up to `config.debounce`
193        // worst case), so a fresh `cartog watch` started right after Drop
194        // would observe AcquireError::Held — confusing the user who saw
195        // the previous process exit. We try briefly (~1.5s) then return:
196        // we'd rather leak the thread than block shutdown for several
197        // seconds on a hung debouncer. Callers wanting deterministic
198        // cleanup should call `stop()` explicitly (it joins unbounded).
199        if let Some(handle) = self.thread.take() {
200            let deadline = std::time::Instant::now() + Duration::from_millis(1500);
201            while !handle.is_finished() && std::time::Instant::now() < deadline {
202                std::thread::sleep(Duration::from_millis(25));
203            }
204            if handle.is_finished() {
205                let _ = handle.join();
206            }
207            // else: leak the JoinHandle (process is shutting down anyway).
208        }
209    }
210}
211
212/// Validate that the PID-lock configuration on a [`WatchConfig`] is
213/// internally consistent. The two dangerous half-configured states are
214/// `(Some(dir), None)` — a global slot would collide with DB-scoped
215/// peers — and `(None, Some(slot))` — the slot is silently ignored and
216/// the caller's intent is dropped. Called synchronously by both
217/// [`spawn_watch`] and [`run_watch`] so a misconfigured embedder never
218/// gets an `Ok(WatchHandle)` for a watcher that died on the spot.
219fn validate_pid_lock_config(config: &WatchConfig) -> Result<()> {
220    match (
221        config.pid_lock_dir.is_some(),
222        config.pid_lock_slot.is_some(),
223    ) {
224        (true, false) => anyhow::bail!(
225            "WatchConfig::pid_lock_dir is set but pid_lock_slot is None; \
226             refusing to claim the global watch slot — pass a DB-scoped slot \
227             (e.g. `cartog::state::slot_for_db(\"watch\", db_path)`)"
228        ),
229        (false, true) => anyhow::bail!(
230            "WatchConfig::pid_lock_slot is set but pid_lock_dir is None; \
231             a slot without a directory is silently ignored — either set \
232             both fields or clear both to run in untracked mode"
233        ),
234        _ => Ok(()),
235    }
236}
237
238/// Spawn the watch loop on a background thread.
239///
240/// Returns a `WatchHandle` that can be used to stop the watcher. The
241/// watcher opens its own `Database` connection (SQLite WAL allows
242/// concurrent readers).
243///
244/// Static misconfiguration of [`WatchConfig`] (e.g. `pid_lock_dir` set
245/// without `pid_lock_slot`, or vice versa) is checked synchronously
246/// before the thread is spawned and surfaced as `Err`, so a
247/// misconfigured embedder never gets a `WatchHandle` whose thread is
248/// already dead. Errors that emerge later (PID-lock contention with a
249/// live peer, filesystem I/O failures) are logged inside the thread;
250/// the caller still receives `Ok(WatchHandle)`. Use [`run_watch`] when
251/// ALL failures must propagate synchronously.
252pub fn spawn_watch(config: WatchConfig, db_path: &str) -> Result<WatchHandle> {
253    let root = config
254        .root
255        .canonicalize()
256        .context("cannot resolve watch root")?;
257
258    if !root.is_dir() {
259        anyhow::bail!("watch target is not a directory: {}", root.display());
260    }
261    validate_pid_lock_config(&config)?;
262
263    let db_path = db_path.to_string();
264    let shutdown = Arc::new(AtomicBool::new(false));
265    let shutdown_clone = Arc::clone(&shutdown);
266
267    let thread = std::thread::Builder::new()
268        .name("cartog-watch".into())
269        .spawn(move || {
270            if let Err(e) = watch_loop(config, &root, &db_path, &shutdown_clone) {
271                warn!(error = %e, "watch loop exited with error");
272            }
273        })
274        .context("failed to spawn watch thread")?;
275
276    Ok(WatchHandle {
277        shutdown,
278        thread: Some(thread),
279    })
280}
281
282/// Run the watch loop in the foreground (blocking).
283///
284/// Used by `cartog watch` CLI command.
285pub fn run_watch(config: WatchConfig, db_path: &str) -> Result<()> {
286    validate_pid_lock_config(&config)?;
287    let root = config
288        .root
289        .canonicalize()
290        .context("cannot resolve watch root")?;
291
292    if !root.is_dir() {
293        anyhow::bail!("watch target is not a directory: {}", root.display());
294    }
295
296    let shutdown = Arc::new(AtomicBool::new(false));
297    let shutdown_clone = Arc::clone(&shutdown);
298
299    // Install Ctrl+C handler for graceful shutdown
300    install_ctrlc_handler(&shutdown_clone);
301
302    watch_loop(config, &root, db_path, &shutdown)
303}
304
305/// Install a Ctrl+C handler that sets the shutdown flag.
306fn install_ctrlc_handler(flag: &Arc<AtomicBool>) {
307    let flag = Arc::clone(flag);
308    let _ = ctrlc::set_handler(move || {
309        flag.store(true, Ordering::SeqCst);
310    });
311}
312
313/// Core watch loop. Runs until `shutdown` is set.
314fn watch_loop(
315    config: WatchConfig,
316    root: &Path,
317    db_path: &str,
318    shutdown: &AtomicBool,
319) -> Result<()> {
320    // Acquire first so an election loss aborts before opening DB / watcher.
321    // Unlike `cartog serve`, the watcher does NOT attach read-only — if
322    // another writer already owns the slot, we refuse to start and let the
323    // user stop the running process. This applies uniformly: the Phase 5
324    // promoter also acquires `watch.pid` so a separately-running
325    // `cartog watch` from a terminal correctly sees Held and aborts. Two
326    // watchers writing to the same DB would re-index the same files in
327    // parallel; the watch slot is the only thing preventing that.
328    //
329    // The (Some(dir), None) hard-fail is enforced synchronously by
330    // `validate_pid_lock_config` in spawn_watch/run_watch BEFORE this
331    // function runs; reaching this point with a misconfigured pair
332    // means a caller bypassed those entry points.
333    validate_pid_lock_config(&config)?;
334    let watch_slot: Option<&str> = config.pid_lock_slot.as_deref();
335    let _lock: Option<cartog_process_lock::ProcessLock> =
336        match (config.pid_lock_dir.as_deref(), watch_slot) {
337            (Some(dir), Some(slot)) => match cartog_process_lock::ProcessLock::acquire(dir, slot) {
338                Ok(lock) => Some(lock),
339                Err(cartog_process_lock::AcquireError::Held(held)) => {
340                    anyhow::bail!(
341                        "another cartog process holds the watch lock at {} (slot {}, PID {}); \
342                         stop it before running `cartog watch`",
343                        dir.display(),
344                        held.slot,
345                        held.pid,
346                    );
347                }
348                Err(cartog_process_lock::AcquireError::Io(e)) => {
349                    return Err(e).with_context(|| {
350                        format!("failed to acquire watch PID lock at {}", dir.display())
351                    });
352                }
353            },
354            _ => None,
355        };
356
357    let db = if config.skip_migrations {
358        Database::open_existing_rw(db_path)
359            .context("failed to open database for watcher (existing-rw)")?
360    } else {
361        Database::open(db_path, config.rag_config.resolved_dimension())
362            .context("failed to open database for watcher")?
363    };
364
365    // Re-resolve auto-embed on every consultation, not once at startup: an
366    // explicit override wins, else auto-detect against the LIVE embedding count.
367    // This way a repo that runs its first `rag index` AFTER the watcher started
368    // (the common MCP flow) begins auto-embedding without a restart.
369    let rag_override = config.rag_override;
370    let rag_enabled = |db: &Database| -> bool {
371        resolve_watch_rag(
372            rag_override,
373            db.embedding_count().unwrap_or_else(|e| {
374                warn!(error = %e, "failed to read embedding count; auto-embed off");
375                0
376            }),
377        )
378    };
379
380    info!(
381        path = %root.display(),
382        debounce_ms = config.debounce.as_millis(),
383        rag = rag_enabled(&db),
384        rag_delay_s = config.rag_delay.as_secs(),
385        "starting watch"
386    );
387    if config.json_events {
388        emit_event(&WatchEvent::Started {
389            root: &root.to_string_lossy(),
390            debounce_ms: config.debounce.as_millis(),
391            rag: rag_enabled(&db),
392            rag_delay_s: config.rag_delay.as_secs(),
393        });
394    }
395
396    // Initial incremental index to ensure DB is current. Symbols left needing
397    // embedding here arm the RAG timer + staleness state below, so the initial
398    // batch is embedded on the same deferred schedule as change-driven reindexes
399    // (and shows the staleness banner meanwhile) rather than only on shutdown.
400    let mut initial_pending = 0u32;
401    let initial_start = Instant::now();
402    // Watch never runs the LSP pass (lsp = false), so the override map is inert.
403    match indexer::index_directory(
404        &db,
405        root,
406        false,
407        false,
408        None,
409        None,
410        config.redact,
411        &std::collections::HashMap::new(),
412    ) {
413        Ok(r) => {
414            info!(
415                files = r.files_indexed,
416                skipped = r.files_skipped,
417                removed = r.files_removed,
418                symbols = r.symbols_added,
419                "initial index complete"
420            );
421            if config.json_events {
422                emit_event(&WatchEvent::Reindex {
423                    files_indexed: r.files_indexed,
424                    files_skipped: r.files_skipped,
425                    files_removed: r.files_removed,
426                    symbols_added: r.symbols_added,
427                    edges_added: r.edges_added,
428                    edges_resolved: r.edges_resolved,
429                    duration_ms: initial_start.elapsed().as_millis(),
430                });
431            }
432            if rag_enabled(&db) {
433                match db.symbols_needing_embeddings() {
434                    Ok(needing) => initial_pending = needing.len() as u32,
435                    Err(e) => warn!(error = %e, "failed to check embedding status"),
436                }
437                // A pending format upgrade re-embeds all symbols; count it so the
438                // staleness banner reflects the queued re-embed.
439                if initial_pending == 0
440                    && rag::indexer::embedding_format_upgrade_pending(&db).unwrap_or(false)
441                {
442                    initial_pending = db.symbol_content_count().unwrap_or(1).max(1);
443                }
444            }
445            // Publish the post-initial-index state (no changes observed yet, so
446            // change_seq is 0; caught up to 0).
447            if let Some(s) = &config.stale {
448                s.note_reindex(s.change_seq(), initial_pending);
449            }
450        }
451        Err(e) => {
452            warn!(error = %e, "initial index failed");
453            if config.json_events {
454                emit_event(&WatchEvent::ReindexFailed {
455                    error: e.to_string(),
456                });
457            }
458        }
459    }
460
461    // Set up the debounced file watcher
462    let (tx, rx) = std::sync::mpsc::channel();
463    let mut debouncer =
464        new_debouncer(config.debounce, tx).context("failed to create file watcher")?;
465
466    debouncer
467        .watcher()
468        .watch(root, notify::RecursiveMode::Recursive)
469        .context("failed to start watching directory")?;
470
471    info!("watching for changes (Ctrl+C to stop)");
472
473    // Create the embedding provider once (lazy, on first RAG use). On
474    // first creation we also reconcile the on-disk fingerprint so a
475    // provider/model swap (even at the same dimension) clears the now-stale
476    // vector index instead of returning garbage similarity scores.
477    let mut rag_provider: Option<Box<dyn rag::provider::EmbeddingProvider>> = None;
478    let ensure_provider =
479        |provider: &mut Option<Box<dyn rag::provider::EmbeddingProvider>>| -> bool {
480            if provider.is_none() {
481                match rag::create_embedding_provider(&config.rag_config) {
482                    Ok(p) => {
483                        if let Err(e) =
484                            db.reconcile_embedding_fingerprint(&rag::fingerprint_of(p.as_ref()))
485                        {
486                            warn!(error = %e, "failed to reconcile embedding fingerprint");
487                            return false;
488                        }
489                        *provider = Some(p);
490                        true
491                    }
492                    Err(e) => {
493                        warn!(error = %e, "failed to create embedding provider");
494                        false
495                    }
496                }
497            } else {
498                true
499            }
500        };
501
502    // RAG timer seed. `initial_pending` already folds in a pending format upgrade.
503    let mut rag_pending = initial_pending > 0;
504    let mut last_index_time: Option<Instant> = rag_pending.then(Instant::now);
505
506    loop {
507        if shutdown.load(Ordering::SeqCst) {
508            break;
509        }
510
511        // Wait for events with a timeout so we can check shutdown + RAG timer
512        let poll_timeout = if rag_pending {
513            Duration::from_millis(500) // Poll frequently to check RAG timer
514        } else {
515            Duration::from_secs(1) // Idle poll for shutdown check
516        };
517
518        match rx.recv_timeout(poll_timeout) {
519            Ok(Ok(events)) => {
520                // Filter events to only supported source files in non-ignored dirs
521                let relevant = events.iter().any(|event| {
522                    event.kind == DebouncedEventKind::Any && is_relevant_path(&event.path, root)
523                });
524
525                if relevant {
526                    debug!(
527                        count = events.len(),
528                        "file change events received, re-indexing"
529                    );
530                    // Capture the change count BEFORE reindexing; a change that
531                    // arrives while the reindex runs bumps the seq past this and
532                    // stays flagged stale.
533                    let caught_up_to = config.stale.as_ref().map(|s| {
534                        s.note_change();
535                        s.change_seq()
536                    });
537                    let reindex_start = Instant::now();
538                    match indexer::index_directory(
539                        &db,
540                        root,
541                        false,
542                        false,
543                        None,
544                        None,
545                        config.redact,
546                        &std::collections::HashMap::new(),
547                    ) {
548                        Ok(r) => {
549                            if r.files_indexed > 0 || r.files_removed > 0 {
550                                info!(
551                                    files = r.files_indexed,
552                                    skipped = r.files_skipped,
553                                    removed = r.files_removed,
554                                    symbols = r.symbols_added,
555                                    "re-indexed"
556                                );
557                            }
558                            if config.json_events && (r.files_indexed > 0 || r.files_removed > 0) {
559                                emit_event(&WatchEvent::Reindex {
560                                    files_indexed: r.files_indexed,
561                                    files_skipped: r.files_skipped,
562                                    files_removed: r.files_removed,
563                                    symbols_added: r.symbols_added,
564                                    edges_added: r.edges_added,
565                                    edges_resolved: r.edges_resolved,
566                                    duration_ms: reindex_start.elapsed().as_millis(),
567                                });
568                            }
569                            // Check if RAG embedding is needed
570                            let mut pending_count = 0u32;
571                            if rag_enabled(&db) {
572                                match db.symbols_needing_embeddings() {
573                                    Ok(needing) if !needing.is_empty() => {
574                                        debug!(
575                                            pending = needing.len(),
576                                            "symbols need embedding, starting RAG timer"
577                                        );
578                                        pending_count = needing.len() as u32;
579                                        rag_pending = true;
580                                        last_index_time = Some(Instant::now());
581                                    }
582                                    Ok(_) => {
583                                        // No symbols need embedding
584                                        rag_pending = false;
585                                    }
586                                    Err(e) => {
587                                        warn!(error = %e, "failed to check embedding status");
588                                    }
589                                }
590                            }
591                            if let (Some(s), Some(seq)) = (&config.stale, caught_up_to) {
592                                s.note_reindex(seq, pending_count);
593                            }
594                        }
595                        Err(e) => {
596                            warn!(error = %e, "re-index failed");
597                            if config.json_events {
598                                emit_event(&WatchEvent::ReindexFailed {
599                                    error: e.to_string(),
600                                });
601                            }
602                        }
603                    }
604                }
605            }
606            Ok(Err(error)) => {
607                warn!(error = %error, "file watcher error");
608            }
609            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
610                // Check RAG timer (rag_pending is only set when auto-embed is on)
611                if rag_pending {
612                    if let Some(last) = last_index_time {
613                        if last.elapsed() >= config.rag_delay {
614                            info!("RAG delay elapsed, embedding pending symbols");
615                            if !ensure_provider(&mut rag_provider) {
616                                rag_pending = false;
617                                last_index_time = None;
618                                continue;
619                            }
620                            if let Some(ref mut provider) = rag_provider {
621                                let embed_start = Instant::now();
622                                match rag::indexer::index_embeddings(
623                                    &db,
624                                    provider.as_mut(),
625                                    false,
626                                    None,
627                                    None,
628                                ) {
629                                    Ok(r) => {
630                                        info!(
631                                            embedded = r.symbols_embedded,
632                                            skipped = r.symbols_skipped,
633                                            "RAG embedding complete"
634                                        );
635                                        if config.json_events {
636                                            emit_event(&WatchEvent::RagEmbedded {
637                                                symbols_embedded: r.symbols_embedded,
638                                                symbols_skipped: r.symbols_skipped,
639                                                total_content_symbols: r.total_content_symbols,
640                                                duration_ms: embed_start.elapsed().as_millis(),
641                                            });
642                                        }
643                                        // Embeddings are current — clear the stale signal.
644                                        if let Some(s) = &config.stale {
645                                            s.clear_rag_pending();
646                                        }
647                                    }
648                                    Err(e) => {
649                                        warn!(error = %e, "RAG embedding failed");
650                                        if config.json_events {
651                                            emit_event(&WatchEvent::RagFailed {
652                                                error: e.to_string(),
653                                            });
654                                        }
655                                        // Leave the stale signal set: embeddings did NOT
656                                        // catch up, so callers must still be warned.
657                                    }
658                                }
659                            }
660                            // Disarm the local retry timer regardless (avoid a tight
661                            // re-embed loop); a later file change re-arms it.
662                            rag_pending = false;
663                            last_index_time = None;
664                        }
665                    }
666                }
667            }
668            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
669                warn!("file watcher channel disconnected");
670                break;
671            }
672        }
673    }
674
675    // Flush pending RAG embeddings on shutdown (rag_pending implies enabled)
676    if rag_pending {
677        info!("flushing pending RAG embeddings before shutdown");
678        ensure_provider(&mut rag_provider);
679        if let Some(ref mut provider) = rag_provider {
680            let embed_start = Instant::now();
681            match rag::indexer::index_embeddings(&db, provider.as_mut(), false, None, None) {
682                Ok(r) => {
683                    info!(embedded = r.symbols_embedded, "final RAG flush complete");
684                    if config.json_events {
685                        emit_event(&WatchEvent::RagEmbedded {
686                            symbols_embedded: r.symbols_embedded,
687                            symbols_skipped: r.symbols_skipped,
688                            total_content_symbols: r.total_content_symbols,
689                            duration_ms: embed_start.elapsed().as_millis(),
690                        });
691                    }
692                }
693                Err(e) => {
694                    warn!(error = %e, "final RAG flush failed");
695                    if config.json_events {
696                        emit_event(&WatchEvent::RagFailed {
697                            error: e.to_string(),
698                        });
699                    }
700                }
701            }
702        }
703    }
704
705    info!("watch stopped");
706    if config.json_events {
707        emit_event(&WatchEvent::Shutdown);
708    }
709    Ok(())
710}
711
712/// Check if a path is relevant for indexing: supported language + not in ignored directory.
713///
714/// Returns `false` for:
715/// - Files with unsupported extensions (no tree-sitter extractor)
716/// - Files outside the watched root (e.g., symlink escapes)
717/// - Files under an ignored directory (`.git`, `node_modules`, etc.)
718fn is_relevant_path(path: &Path, root: &Path) -> bool {
719    // Must be a supported source file
720    if detect_language(path).is_none() {
721        return false;
722    }
723
724    // Must be under the watched root
725    let relative = match path.strip_prefix(root) {
726        Ok(rel) => rel,
727        Err(_) => return false,
728    };
729
730    // Check that no ancestor directory is ignored
731    if let Some(parent) = relative.parent() {
732        for component in parent.components() {
733            if let std::path::Component::Normal(name) = component {
734                if let Some(name_str) = name.to_str() {
735                    if is_ignored_dirname(name_str) {
736                        return false;
737                    }
738                }
739            }
740        }
741    }
742
743    true
744}
745
#[cfg(test)]
mod tests {
    // Unit tests for path filtering, configuration, NDJSON wire format,
    // PID-lock validation and `WatchHandle` shutdown semantics.
    use super::*;
    use std::path::PathBuf;

    // ── Language coverage: all supported extensions ──

    #[test]
    fn test_relevant_python_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(Path::new("/project/src/main.py"), &root));
    }

    #[test]
    fn test_relevant_python_stub() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(Path::new("/project/src/types.pyi"), &root));
    }

    #[test]
    fn test_relevant_typescript_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(Path::new("/project/src/app.ts"), &root));
    }

    #[test]
    fn test_relevant_tsx_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(Path::new("/project/src/App.tsx"), &root));
    }

    #[test]
    fn test_relevant_javascript_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(Path::new("/project/src/index.js"), &root));
    }

    #[test]
    fn test_relevant_jsx_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(Path::new("/project/src/App.jsx"), &root));
    }

    #[test]
    fn test_relevant_mjs_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(Path::new("/project/src/utils.mjs"), &root));
    }

    #[test]
    fn test_relevant_cjs_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(
            Path::new("/project/src/config.cjs"),
            &root
        ));
    }

    #[test]
    fn test_relevant_rust_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(Path::new("/project/src/lib.rs"), &root));
    }

    #[test]
    fn test_relevant_go_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(Path::new("/project/cmd/main.go"), &root));
    }

    #[test]
    fn test_relevant_ruby_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(
            Path::new("/project/lib/service.rb"),
            &root
        ));
    }

    #[test]
    fn test_relevant_java_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(
            Path::new("/project/src/UserService.java"),
            &root
        ));
    }

    #[test]
    fn test_relevant_markdown_file() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(Path::new("/project/README.md"), &root));
        assert!(is_relevant_path(
            Path::new("/project/docs/design.md"),
            &root
        ));
    }

    // ── Irrelevant file types ──

    #[test]
    fn test_irrelevant_json_file() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(Path::new("/project/package.json"), &root));
    }

    #[test]
    fn test_irrelevant_toml_file() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(Path::new("/project/Cargo.toml"), &root));
    }

    #[test]
    fn test_irrelevant_yaml_file() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/.github/ci.yml"),
            &root
        ));
    }

    #[test]
    fn test_irrelevant_no_extension() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(Path::new("/project/Makefile"), &root));
    }

    // ── Ignored directories (all entries from is_ignored_dirname) ──

    #[test]
    fn test_ignored_node_modules() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/node_modules/pkg/index.js"),
            &root
        ));
    }

    #[test]
    fn test_ignored_git_dir() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/.git/hooks/pre-commit.py"),
            &root
        ));
    }

    #[test]
    fn test_ignored_target_dir() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/target/debug/build.rs"),
            &root
        ));
    }

    #[test]
    fn test_ignored_pycache() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/src/__pycache__/mod.py"),
            &root
        ));
    }

    #[test]
    fn test_ignored_nested_vendor() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/lib/vendor/gem/lib.rb"),
            &root
        ));
    }

    #[test]
    fn test_ignored_venv() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/.venv/lib/site.py"),
            &root
        ));
        assert!(!is_relevant_path(
            Path::new("/project/venv/lib/site.py"),
            &root
        ));
    }

    #[test]
    fn test_ignored_env() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/.env/lib/site.py"),
            &root
        ));
        assert!(!is_relevant_path(
            Path::new("/project/env/lib/site.py"),
            &root
        ));
    }

    #[test]
    fn test_ignored_dist_build() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/dist/bundle.js"),
            &root
        ));
        assert!(!is_relevant_path(
            Path::new("/project/build/output.js"),
            &root
        ));
    }

    #[test]
    fn test_ignored_next_nuxt() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/.next/server/app.js"),
            &root
        ));
        assert!(!is_relevant_path(
            Path::new("/project/.nuxt/dist/app.js"),
            &root
        ));
    }

    #[test]
    fn test_ignored_mypy_pytest_tox() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/.mypy_cache/3.11/mod.py"),
            &root
        ));
        assert!(!is_relevant_path(
            Path::new("/project/.pytest_cache/v/test.py"),
            &root
        ));
        assert!(!is_relevant_path(
            Path::new("/project/.tox/py311/lib.py"),
            &root
        ));
    }

    #[test]
    fn test_ignored_hg_svn() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/.hg/store/data.py"),
            &root
        ));
        assert!(!is_relevant_path(
            Path::new("/project/.svn/entries.py"),
            &root
        ));
    }

    // ── Path boundary conditions ──

    #[test]
    fn test_hidden_dir_ignored() {
        let root = PathBuf::from("/project");
        assert!(!is_relevant_path(
            Path::new("/project/.hidden/script.py"),
            &root
        ));
    }

    #[test]
    fn test_root_level_file_allowed() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(Path::new("/project/setup.py"), &root));
    }

    #[test]
    fn test_deeply_nested_file_allowed() {
        let root = PathBuf::from("/project");
        assert!(is_relevant_path(
            Path::new("/project/src/auth/tokens/validate.py"),
            &root
        ));
    }

    #[test]
    fn test_path_outside_root_rejected() {
        let root = PathBuf::from("/project");
        assert!(
            !is_relevant_path(Path::new("/other/project/main.py"), &root),
            "files outside root should be rejected"
        );
    }

    #[test]
    fn test_path_sibling_of_root_rejected() {
        let root = PathBuf::from("/workspace/project-a");
        assert!(
            !is_relevant_path(Path::new("/workspace/project-b/main.py"), &root),
            "files in sibling directory should be rejected"
        );
    }

    #[test]
    fn test_path_partial_prefix_rejected() {
        let root = PathBuf::from("/project");
        // "/project-b/main.py" starts with "/project" as a string but is not under /project/
        assert!(
            !is_relevant_path(Path::new("/project-b/main.py"), &root),
            "partial prefix match should be rejected (strip_prefix handles this correctly)"
        );
    }

    // ── WatchConfig ──

    #[test]
    fn test_config_defaults() {
        let config = WatchConfig::new(PathBuf::from("."));
        assert_eq!(config.debounce, Duration::from_secs(5));
        assert_eq!(config.rag_override, None);
        assert_eq!(config.rag_delay, Duration::from_secs(30));
        assert!(!config.json_events);
    }

    #[test]
    fn auto_detect_embeds_only_when_repo_has_embeddings() {
        assert!(resolve_watch_rag(None, 5));
        assert!(!resolve_watch_rag(None, 0));
    }

    #[test]
    fn explicit_override_beats_embedding_count() {
        assert!(!resolve_watch_rag(Some(false), 100));
        assert!(resolve_watch_rag(Some(true), 0));
    }

    #[test]
    fn test_config_custom_values() {
        // Every value differs from the defaults checked in
        // `test_config_defaults`, so each assertion proves the override stuck.
        let mut config = WatchConfig::new(PathBuf::from("/my/project"));
        config.debounce = Duration::from_secs(10);
        config.rag_override = Some(true);
        config.rag_delay = Duration::from_secs(60);
        assert_eq!(config.root, PathBuf::from("/my/project"));
        assert_eq!(config.debounce, Duration::from_secs(10));
        assert_eq!(config.rag_override, Some(true));
        assert_eq!(config.rag_delay, Duration::from_secs(60));
    }

    // ── NDJSON event serialization ──
    //
    // Lock in the wire format of the events `cartog watch --json` produces:
    // downstream tools parse these, so a field rename would be a breaking
    // change that should show up in a diff review.

    #[test]
    fn test_watch_event_started_shape() {
        let e = WatchEvent::Started {
            root: "/proj",
            debounce_ms: 5000,
            rag: true,
            rag_delay_s: 30,
        };
        let s = serde_json::to_string(&e).unwrap();
        assert!(s.contains("\"event\":\"started\""));
        assert!(s.contains("\"root\":\"/proj\""));
        assert!(s.contains("\"debounce_ms\":5000"));
        assert!(s.contains("\"rag\":true"));
        assert!(s.contains("\"rag_delay_s\":30"));
    }

    #[test]
    fn test_watch_event_reindex_shape() {
        let e = WatchEvent::Reindex {
            files_indexed: 1,
            files_skipped: 2,
            files_removed: 0,
            symbols_added: 10,
            edges_added: 4,
            edges_resolved: 3,
            duration_ms: 42,
        };
        let s = serde_json::to_string(&e).unwrap();
        assert!(s.contains("\"event\":\"reindex\""));
        assert!(s.contains("\"files_indexed\":1"));
        assert!(s.contains("\"duration_ms\":42"));
    }

    #[test]
    fn test_watch_event_shutdown_shape() {
        let s = serde_json::to_string(&WatchEvent::Shutdown).unwrap();
        assert_eq!(s, "{\"event\":\"shutdown\"}");
    }

    // ── spawn_watch error paths ──

    #[test]
    fn test_spawn_watch_nonexistent_dir() {
        let config = WatchConfig::new(PathBuf::from("/nonexistent/path/xyz"));
        let result = spawn_watch(config, ":memory:");
        assert!(result.is_err(), "should fail for nonexistent directory");
    }

    #[test]
    fn test_spawn_watch_file_not_dir() {
        // Use Cargo.toml as a file that exists but is not a directory
        let manifest = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("Cargo.toml");
        let config = WatchConfig::new(manifest);
        let result = spawn_watch(config, ":memory:");
        assert!(
            result.is_err(),
            "should fail when target is a file, not dir"
        );
    }

    // ── validate_pid_lock_config branches ──

    #[test]
    fn validate_pid_lock_accepts_both_none() {
        let config = WatchConfig::new(PathBuf::from("."));
        assert!(
            validate_pid_lock_config(&config).is_ok(),
            "untracked mode (both None) is valid"
        );
    }

    #[test]
    fn validate_pid_lock_accepts_both_set() {
        let mut config = WatchConfig::new(PathBuf::from("."));
        config.pid_lock_dir = Some(PathBuf::from("/tmp/cartog-locks"));
        config.pid_lock_slot = Some("watch-0123456789abcdef".to_string());
        assert!(
            validate_pid_lock_config(&config).is_ok(),
            "both fields set is valid"
        );
    }

    #[test]
    fn validate_pid_lock_rejects_dir_without_slot() {
        let mut config = WatchConfig::new(PathBuf::from("."));
        config.pid_lock_dir = Some(PathBuf::from("/tmp/cartog-locks"));
        let err = validate_pid_lock_config(&config).expect_err("dir without slot must fail");
        assert!(
            err.to_string().contains("pid_lock_slot is None"),
            "error names the missing slot: {err}"
        );
    }

    #[test]
    fn validate_pid_lock_rejects_slot_without_dir() {
        let mut config = WatchConfig::new(PathBuf::from("."));
        config.pid_lock_slot = Some("watch-0123456789abcdef".to_string());
        let err = validate_pid_lock_config(&config).expect_err("slot without dir must fail");
        assert!(
            err.to_string().contains("pid_lock_dir is None"),
            "error names the missing directory: {err}"
        );
    }

    // ── is_ignored_dirname direct tests ──

    #[test]
    fn test_is_ignored_dirname_known_dirs() {
        let ignored = [
            ".git",
            ".hg",
            ".svn",
            "node_modules",
            "__pycache__",
            ".mypy_cache",
            ".pytest_cache",
            ".tox",
            ".venv",
            "venv",
            ".env",
            "env",
            "target",
            "dist",
            "build",
            ".next",
            ".nuxt",
            "vendor",
        ];
        for name in &ignored {
            assert!(is_ignored_dirname(name), "{name} should be ignored");
        }
    }

    #[test]
    fn test_is_ignored_dirname_hidden_dirs() {
        assert!(is_ignored_dirname(".hidden"));
        assert!(is_ignored_dirname(".cache"));
        assert!(is_ignored_dirname(".config"));
    }

    #[test]
    fn test_is_ignored_dirname_allowed_dirs() {
        let allowed = [
            "src", "lib", "tests", "docs", "app", "cmd", "internal", "pkg",
        ];
        for name in &allowed {
            assert!(!is_ignored_dirname(name), "{name} should NOT be ignored");
        }
    }

    #[test]
    fn test_is_ignored_dirname_case_sensitive() {
        // "Target" != "target" — should NOT be ignored (case-sensitive match)
        assert!(!is_ignored_dirname("Target"));
        assert!(!is_ignored_dirname("NODE_MODULES"));
        assert!(!is_ignored_dirname("Vendor"));
    }

    // ── WatchHandle shutdown ──

    #[test]
    fn test_watch_handle_drop_signals_shutdown() {
        let shutdown = Arc::new(AtomicBool::new(false));
        let shutdown_clone = Arc::clone(&shutdown);
        let handle = WatchHandle {
            shutdown: shutdown_clone,
            thread: None,
        };
        assert!(!shutdown.load(Ordering::SeqCst));
        drop(handle);
        assert!(
            shutdown.load(Ordering::SeqCst),
            "drop should set shutdown flag"
        );
    }

    #[test]
    fn test_watch_handle_stop_signals_and_joins() {
        let shutdown = Arc::new(AtomicBool::new(false));
        let exited = Arc::new(AtomicBool::new(false));
        let shutdown_for_thread = Arc::clone(&shutdown);
        let exited_for_thread = Arc::clone(&exited);

        let thread = std::thread::spawn(move || {
            // Simulate a work loop that polls the shutdown flag.
            while !shutdown_for_thread.load(Ordering::SeqCst) {
                std::thread::sleep(Duration::from_millis(10));
            }
            exited_for_thread.store(true, Ordering::SeqCst);
        });

        let handle = WatchHandle {
            shutdown: Arc::clone(&shutdown),
            thread: Some(thread),
        };
        handle.stop(); // Must set the flag AND join the thread.
        assert!(shutdown.load(Ordering::SeqCst), "stop should set shutdown flag");
        // Reliably true here only if `stop` waited for the worker to finish.
        assert!(
            exited.load(Ordering::SeqCst),
            "stop should join the worker thread before returning"
        );
    }
}