greppy/daemon/
watcher.rs

1//! File system watcher for incremental indexing
2//!
3//! Watches project directories for changes and incrementally updates the index.
4//! - Create/Modify: Re-index the changed file
5//! - Delete: Remove file's chunks from index
6//! - Debounced: Waits for activity to settle before processing
7//!
8//! Design: Non-blocking, runs in background task, doesn't affect search performance.
9
10use crate::core::error::{Error, Result};
11use crate::index::{IndexWriter, TantivyIndex};
12use crate::parse::chunk_file;
13use crate::trace::builder::{remove_file_from_index, update_file_incremental};
14use crate::trace::storage::{load_index, save_index, trace_index_path};
15use crate::trace::{find_dead_symbols, snapshots::create_snapshot, SemanticIndex};
16use notify::{
17    event::{CreateKind, ModifyKind, RemoveKind},
18    Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
19};
20use std::collections::{HashMap, HashSet};
21use std::path::{Path, PathBuf};
22use std::sync::mpsc as std_mpsc;
23use std::time::Duration;
24use tracing::{debug, info, warn};
25
/// Debounce delay - wait this long after last event before processing.
/// Also used as the idle receive timeout in `process_events_sync`: the
/// drain loop returns empty once the channel is quiet for this long.
const DEBOUNCE_MS: u64 = 500;
28
/// Events that trigger re-indexing.
///
/// Produced by `classify_event` from raw notify events and consumed
/// (deduplicated, last-event-wins) by `process_project_events_sync`.
#[derive(Debug, Clone)]
pub enum FileEvent {
    /// File was created or modified - needs (re)indexing
    Changed(PathBuf),
    /// File was deleted - needs removal from index
    Deleted(PathBuf),
}
37
/// Result of processing file events for a project.
///
/// Returned per project by `WatcherManager::process_events_sync`.
#[derive(Debug, Clone, Default)]
pub struct UpdateResult {
    /// Number of files reindexed
    pub files_reindexed: usize,
    /// Number of files deleted
    pub files_deleted: usize,
    /// Paths that were changed
    pub changed_paths: Vec<PathBuf>,
    /// Paths that were deleted
    pub deleted_paths: Vec<PathBuf>,
    /// Processing time in milliseconds (wall clock for the whole batch)
    pub elapsed_ms: f64,
}
52
/// Manages file watchers for multiple projects.
///
/// Holds one `ProjectWatcher` per watched root; every watcher funnels its
/// events into the single shared channel drained by `process_events_sync`.
pub struct WatcherManager {
    /// Active watchers by project path
    watchers: HashMap<PathBuf, ProjectWatcher>,
    /// Channel to receive aggregated events (std::sync for use in blocking context)
    event_tx: std_mpsc::Sender<(PathBuf, FileEvent)>,
    /// Receiving end, drained with a debounce timeout in `process_events_sync`
    event_rx: std_mpsc::Receiver<(PathBuf, FileEvent)>,
}
61
62impl WatcherManager {
63    pub fn new() -> Self {
64        let (event_tx, event_rx) = std_mpsc::channel();
65        Self {
66            watchers: HashMap::new(),
67            event_tx,
68            event_rx,
69        }
70    }
71
72    /// Start watching a project directory
73    pub fn watch(&mut self, project_path: PathBuf) -> Result<()> {
74        if self.watchers.contains_key(&project_path) {
75            debug!(project = %project_path.display(), "Already watching");
76            return Ok(());
77        }
78
79        let watcher = ProjectWatcher::new(project_path.clone(), self.event_tx.clone())?;
80        self.watchers.insert(project_path.clone(), watcher);
81        info!(project = %project_path.display(), "Started watching");
82        Ok(())
83    }
84
85    /// Stop watching a project directory
86    pub fn unwatch(&mut self, project_path: &Path) {
87        if self.watchers.remove(project_path).is_some() {
88            info!(project = %project_path.display(), "Stopped watching");
89        }
90    }
91
92    /// Process pending events synchronously (for use in spawn_blocking)
93    /// Returns list of projects that were updated with their update results
94    pub fn process_events_sync(&mut self) -> Vec<(PathBuf, UpdateResult)> {
95        let mut pending: HashMap<PathBuf, Vec<FileEvent>> = HashMap::new();
96        let debounce = Duration::from_millis(DEBOUNCE_MS);
97
98        // Collect events with timeout (non-blocking drain)
99        loop {
100            match self.event_rx.recv_timeout(debounce) {
101                Ok((project, event)) => {
102                    pending.entry(project).or_default().push(event);
103                }
104                Err(std_mpsc::RecvTimeoutError::Timeout) => {
105                    // Debounce complete, process what we have
106                    if !pending.is_empty() {
107                        break;
108                    }
109                    // Nothing pending, return empty
110                    return Vec::new();
111                }
112                Err(std_mpsc::RecvTimeoutError::Disconnected) => {
113                    break;
114                }
115            }
116        }
117
118        // Process each project's events
119        let mut updated = Vec::new();
120        for (project_path, events) in pending {
121            match process_project_events_sync(&project_path, events) {
122                Ok(result) => {
123                    updated.push((project_path, result));
124                }
125                Err(e) => {
126                    warn!(project = %project_path.display(), error = %e, "Failed to process events");
127                }
128            }
129        }
130
131        updated
132    }
133}
134
135impl Default for WatcherManager {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
/// Watcher for a single project.
///
/// Neither field is read after construction; they are held so the watch
/// stays registered for the lifetime of this struct (notify watchers stop
/// when dropped), hence the `dead_code` allowances.
struct ProjectWatcher {
    // RAII handle: keeps the OS-level watch alive.
    #[allow(dead_code)]
    watcher: RecommendedWatcher,
    // Root path this watcher was registered for (kept for debugging).
    #[allow(dead_code)]
    project_path: PathBuf,
}
148
149impl ProjectWatcher {
150    fn new(
151        project_path: PathBuf,
152        event_tx: std_mpsc::Sender<(PathBuf, FileEvent)>,
153    ) -> Result<Self> {
154        let project_path_clone = project_path.clone();
155
156        let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| {
157            if let Ok(event) = res {
158                if let Some(file_event) = classify_event(&event) {
159                    // Send event - ignore errors if channel is full/closed
160                    let _ = event_tx.send((project_path_clone.clone(), file_event));
161                }
162            }
163        })
164        .map_err(|e| Error::WatchError {
165            message: e.to_string(),
166        })?;
167
168        watcher
169            .watch(&project_path, RecursiveMode::Recursive)
170            .map_err(|e| Error::WatchError {
171                message: e.to_string(),
172            })?;
173
174        Ok(Self {
175            watcher,
176            project_path,
177        })
178    }
179}
180
181/// Classify a notify event into our FileEvent type
182fn classify_event(event: &Event) -> Option<FileEvent> {
183    // Only care about files, not directories
184    let paths: Vec<_> = event
185        .paths
186        .iter()
187        .filter(|p| p.is_file() || !p.exists()) // Include deleted files
188        .filter(|p| is_indexable_file(p))
189        .cloned()
190        .collect();
191
192    if paths.is_empty() {
193        return None;
194    }
195
196    let path = paths.into_iter().next()?;
197
198    match &event.kind {
199        EventKind::Create(CreateKind::File) => Some(FileEvent::Changed(path)),
200        EventKind::Modify(ModifyKind::Data(_)) => Some(FileEvent::Changed(path)),
201        EventKind::Modify(ModifyKind::Name(_)) => Some(FileEvent::Changed(path)),
202        EventKind::Remove(RemoveKind::File) => Some(FileEvent::Deleted(path)),
203        _ => None,
204    }
205}
206
/// Check if a file should be indexed.
///
/// A path is indexable when no *normal* component is hidden (starts with
/// `.`) and its extension (case-insensitive) is a known source extension.
fn is_indexable_file(path: &Path) -> bool {
    use std::path::Component;

    // Skip hidden files and directories. Only inspect `Component::Normal`:
    // the previous check looked at every component, so the `.` (CurDir)
    // and `..` (ParentDir) components made relative paths like
    // "./src/main.rs" get wrongly rejected as hidden.
    if path.components().any(|c| match c {
        Component::Normal(name) => name.to_string_lossy().starts_with('.'),
        _ => false,
    }) {
        return false;
    }

    // Check extension (lowercased so "main.RS" matches too).
    let ext = path
        .extension()
        .and_then(|e| e.to_str())
        .unwrap_or("")
        .to_lowercase();

    matches!(
        ext.as_str(),
        "ts" | "tsx"
            | "js"
            | "jsx"
            | "mjs"
            | "cjs"
            | "py"
            | "pyi"
            | "rs"
            | "go"
            | "java"
            | "kt"
            | "kts"
            | "scala"
            | "rb"
            | "php"
            | "c"
            | "h"
            | "cpp"
            | "cc"
            | "cxx"
            | "hpp"
            | "cs"
            | "swift"
            | "ex"
            | "exs"
            | "erl"
            | "hrl"
            | "hs"
            | "ml"
            | "mli"
            | "lua"
            | "sh"
            | "bash"
            | "zsh"
            | "sql"
            | "vue"
            | "svelte"
    )
}
265
266/// Process accumulated events for a project (synchronous version)
267fn process_project_events_sync(
268    project_path: &Path,
269    events: Vec<FileEvent>,
270) -> Result<UpdateResult> {
271    let start = std::time::Instant::now();
272
273    // Deduplicate events - if a file was changed multiple times, only process once
274    let mut to_reindex: HashSet<PathBuf> = HashSet::new();
275    let mut to_delete: HashSet<PathBuf> = HashSet::new();
276
277    for event in events {
278        match event {
279            FileEvent::Changed(path) => {
280                to_delete.remove(&path); // Changed overrides delete
281                to_reindex.insert(path);
282            }
283            FileEvent::Deleted(path) => {
284                to_reindex.remove(&path); // Delete overrides change
285                to_delete.insert(path);
286            }
287        }
288    }
289
290    if to_reindex.is_empty() && to_delete.is_empty() {
291        return Ok(UpdateResult::default());
292    }
293
294    info!(
295        project = %project_path.display(),
296        reindex = to_reindex.len(),
297        delete = to_delete.len(),
298        "Processing file changes"
299    );
300
301    // Update Tantivy text index
302    update_tantivy_index(project_path, &to_reindex, &to_delete)?;
303
304    // Update trace semantic index
305    update_trace_index(project_path, &to_reindex, &to_delete);
306
307    let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
308
309    let result = UpdateResult {
310        files_reindexed: to_reindex.len(),
311        files_deleted: to_delete.len(),
312        changed_paths: to_reindex.into_iter().collect(),
313        deleted_paths: to_delete.into_iter().collect(),
314        elapsed_ms,
315    };
316
317    info!(
318        project = %project_path.display(),
319        files_reindexed = result.files_reindexed,
320        files_deleted = result.files_deleted,
321        elapsed_ms = result.elapsed_ms,
322        "Incremental index update complete"
323    );
324
325    Ok(result)
326}
327
328/// Update the Tantivy text search index
329fn update_tantivy_index(
330    project_path: &Path,
331    to_reindex: &HashSet<PathBuf>,
332    to_delete: &HashSet<PathBuf>,
333) -> Result<()> {
334    let index = TantivyIndex::open_or_create(project_path)?;
335    let mut writer = IndexWriter::new(&index)?;
336
337    // Delete old chunks for files that changed or were deleted
338    let all_paths: Vec<_> = to_reindex.iter().chain(to_delete.iter()).collect();
339    for path in &all_paths {
340        let path_str = path.to_string_lossy();
341        writer.delete_by_path(&path_str)?;
342    }
343
344    // Re-index changed files
345    for path in to_reindex {
346        if let Ok(content) = std::fs::read_to_string(path) {
347            let chunks = chunk_file(path, &content);
348            for chunk in &chunks {
349                writer.add_chunk(chunk)?;
350            }
351            debug!(path = %path.display(), chunks = chunks.len(), "Re-indexed file (tantivy)");
352        }
353    }
354
355    // Commit changes
356    writer.commit()?;
357
358    Ok(())
359}
360
/// Update the trace semantic index
///
/// This loads the existing trace index (if any), applies incremental updates,
/// and saves the updated index back to disk.
///
/// Best-effort by design: this function is infallible, and every failure
/// (missing index, save error, snapshot error) is logged and swallowed so
/// trace maintenance never blocks the text-index update path.
fn update_trace_index(
    project_path: &Path,
    to_reindex: &HashSet<PathBuf>,
    to_delete: &HashSet<PathBuf>,
) {
    let trace_path = trace_index_path(project_path);

    // Try to load existing trace index
    let mut index = match load_index(&trace_path) {
        Ok(idx) => idx,
        Err(e) => {
            // No trace index exists yet - this is fine, trace may not have been built
            debug!(
                project = %project_path.display(),
                error = %e,
                "No trace index to update (will be created on next full index)"
            );
            return;
        }
    };

    let start = std::time::Instant::now();
    let mut files_updated = 0;
    let mut files_deleted = 0;

    // Process deletions first
    for path in to_delete {
        let removed = remove_file_from_index(&mut index, project_path, path);
        // Only count files that actually contributed symbols.
        if removed > 0 {
            files_deleted += 1;
            debug!(
                path = %path.display(),
                symbols_removed = removed,
                "Removed from trace index"
            );
        }
    }

    // Process updates/additions
    // (unreadable files are skipped silently — they may have vanished
    // between the event and processing)
    for path in to_reindex {
        if let Ok(content) = std::fs::read_to_string(path) {
            let result = update_file_incremental(&mut index, project_path, path, &content);
            files_updated += 1;
            debug!(
                path = %path.display(),
                symbols_added = result.symbols_added,
                elapsed_ms = result.elapsed_ms,
                "Updated in trace index"
            );
        }
    }

    // Save the updated index — only if something actually changed.
    if files_updated > 0 || files_deleted > 0 {
        if let Err(e) = save_index(&index, &trace_path) {
            warn!(
                project = %project_path.display(),
                error = %e,
                "Failed to save trace index"
            );
        } else {
            let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
            info!(
                project = %project_path.display(),
                files_updated = files_updated,
                files_deleted = files_deleted,
                elapsed_ms = elapsed_ms,
                "Trace index updated"
            );

            // Create automatic snapshot after incremental update
            let dead_symbols = find_dead_symbols(&index);
            // Truncating cast; assumes cycle counts fit in u32 — TODO confirm.
            let cycles_count = count_cycles(&index) as u32;
            let project_name = project_path
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("unknown");

            // Snapshot failure is non-fatal: logged at debug only.
            if let Err(e) = create_snapshot(
                &index,
                project_path,
                project_name,
                &dead_symbols.iter().map(|s| s.id).collect(),
                cycles_count,
                None, // Auto-generated, no custom name
            ) {
                debug!(
                    project = %project_path.display(),
                    error = %e,
                    "Failed to create snapshot"
                );
            }
        }
    }
}
460
461/// Count cycles using DFS (simplified version)
462fn count_cycles(index: &SemanticIndex) -> usize {
463    let mut graph: HashMap<u16, HashSet<u16>> = HashMap::new();
464
465    for edge in &index.edges {
466        if let (Some(from_sym), Some(to_sym)) =
467            (index.symbol(edge.from_symbol), index.symbol(edge.to_symbol))
468        {
469            if from_sym.file_id != to_sym.file_id {
470                graph
471                    .entry(from_sym.file_id)
472                    .or_default()
473                    .insert(to_sym.file_id);
474            }
475        }
476    }
477
478    let mut cycles = 0;
479    let mut visited = HashSet::new();
480    let mut rec_stack = HashSet::new();
481
482    for &node in graph.keys() {
483        if !visited.contains(&node) {
484            cycles += count_cycles_dfs(node, &graph, &mut visited, &mut rec_stack);
485        }
486    }
487
488    cycles
489}
490
/// DFS helper for `count_cycles`: returns the number of back edges
/// (edges into the current recursion stack) reachable from `node`.
fn count_cycles_dfs(
    node: u16,
    graph: &HashMap<u16, HashSet<u16>>,
    visited: &mut HashSet<u16>,
    rec_stack: &mut HashSet<u16>,
) -> usize {
    visited.insert(node);
    rec_stack.insert(node);

    let mut found = 0;
    // `rec_stack` is a subset of `visited`, so testing it first is
    // equivalent to the usual "unvisited / on-stack" branch order.
    for &next in graph.get(&node).into_iter().flatten() {
        if rec_stack.contains(&next) {
            // Back edge into the active DFS path: a cycle.
            found += 1;
        } else if !visited.contains(&next) {
            found += count_cycles_dfs(next, graph, visited, rec_stack);
        }
        // Visited but off-stack: cross/forward edge, not a cycle.
    }

    rec_stack.remove(&node);
    found
}
515
#[cfg(test)]
mod tests {
    use super::*;

    // Covers the pure path-filter helper only; the watcher plumbing needs
    // a live file system and is not unit-tested here.
    #[test]
    fn test_is_indexable_file() {
        assert!(is_indexable_file(Path::new("src/main.rs")));
        assert!(is_indexable_file(Path::new("app.tsx")));
        assert!(!is_indexable_file(Path::new(".git/config")));
        assert!(!is_indexable_file(Path::new("image.png")));
    }
}
527}