// seekr_code/server/daemon.rs

//! Watch daemon for real-time incremental indexing.
//!
//! Monitors file system changes via the async watcher and triggers
//! incremental index updates with debounce to batch rapid changes.

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::RwLock;

use crate::config::SeekrConfig;
use crate::embedder::traits::Embedder;
use crate::index::incremental::IncrementalState;
use crate::index::store::SeekrIndex;
use crate::scanner::watcher::{FileEvent, dedup_events, start_async_watcher};
17
/// Default debounce interval, in milliseconds, applied when the caller of
/// `run_watch_daemon` does not supply an explicit `debounce_ms`.
const DEFAULT_DEBOUNCE_MS: u64 = 500;
20
21/// Run the watch daemon that monitors file changes and updates the index.
22///
23/// This function spawns an async task that:
24/// 1. Listens for file system events via the async watcher
25/// 2. Debounces rapid changes (batches events within a time window)
26/// 3. Triggers incremental index updates for changed files
27/// 4. Removes deleted files from the index
28///
29/// The `index` is shared with the HTTP server via `Arc<RwLock<>>`.
30pub async fn run_watch_daemon(
31    watch_path: &Path,
32    config: &SeekrConfig,
33    index: Arc<RwLock<SeekrIndex>>,
34    debounce_ms: Option<u64>,
35) -> Result<(), crate::error::ServerError> {
36    let debounce = Duration::from_millis(debounce_ms.unwrap_or(DEFAULT_DEBOUNCE_MS));
37    let watch_path = watch_path.to_path_buf();
38
39    // Start the async file watcher
40    let (_watcher, mut rx) = start_async_watcher(&watch_path)
41        .map_err(|e| crate::error::ServerError::Internal(format!("Watch error: {}", e)))?;
42
43    tracing::info!(
44        path = %watch_path.display(),
45        debounce_ms = debounce.as_millis() as u64,
46        "Watch daemon started — monitoring for file changes"
47    );
48
49    // Load incremental state
50    let index_dir = config.index_dir.join(
51        watch_path
52            .file_name()
53            .unwrap_or_default()
54            .to_string_lossy()
55            .as_ref(),
56    );
57    let state_path = index_dir.join("incremental_state.json");
58    let mut inc_state = IncrementalState::load(&state_path).unwrap_or_default();
59
60    // Main event loop
61    let mut pending_events: Vec<FileEvent> = Vec::new();
62
63    loop {
64        // Wait for the first event or timeout to process pending events
65        tokio::select! {
66            event = rx.recv() => {
67                match event {
68                    Some(fe) => {
69                        pending_events.push(fe);
70                        // Drain any additional events that arrived
71                        while let Ok(more) = rx.try_recv() {
72                            pending_events.push(more);
73                        }
74                    }
75                    None => {
76                        tracing::warn!("File watcher channel closed, stopping daemon");
77                        break;
78                    }
79                }
80
81                // Start debounce timer — collect more events within the window
82                tokio::time::sleep(debounce).await;
83
84                // Drain any events that arrived during debounce
85                while let Ok(more) = rx.try_recv() {
86                    pending_events.push(more);
87                }
88
89                // Process the batch
90                if !pending_events.is_empty() {
91                    let events = std::mem::take(&mut pending_events);
92                    let deduped = dedup_events(events);
93
94                    match process_events(&deduped, &index, &mut inc_state, config).await {
95                        Ok((added, removed)) => {
96                            if added > 0 || removed > 0 {
97                                tracing::info!(
98                                    added = added,
99                                    removed = removed,
100                                    "Incremental index updated"
101                                );
102
103                                // Save incremental state
104                                if let Err(e) = inc_state.save(&state_path) {
105                                    tracing::warn!("Failed to save incremental state: {}", e);
106                                }
107
108                                // Save the index
109                                let idx = index.read().await;
110                                if let Err(e) = idx.save(&index_dir) {
111                                    tracing::warn!("Failed to save index: {}", e);
112                                }
113                            }
114                        }
115                        Err(e) => {
116                            tracing::error!("Error processing file events: {}", e);
117                            // Continue running despite errors
118                        }
119                    }
120                }
121            }
122        }
123    }
124
125    Ok(())
126}
127
128/// Process a batch of deduplicated file events.
129///
130/// Returns (chunks_added, chunks_removed).
131async fn process_events(
132    events: &[FileEvent],
133    index: &Arc<RwLock<SeekrIndex>>,
134    inc_state: &mut IncrementalState,
135    config: &SeekrConfig,
136) -> Result<(usize, usize), String> {
137    let mut changed_files: Vec<PathBuf> = Vec::new();
138    let mut deleted_files: Vec<PathBuf> = Vec::new();
139
140    for event in events {
141        match event {
142            FileEvent::Changed(path) => {
143                // Filter by supported file extensions
144                if is_supported_file(path) {
145                    changed_files.push(path.clone());
146                }
147            }
148            FileEvent::Deleted(path) => {
149                deleted_files.push(path.clone());
150            }
151        }
152    }
153
154    let mut total_added = 0;
155    let mut total_removed = 0;
156
157    // Handle deletions
158    if !deleted_files.is_empty() {
159        let chunk_ids_to_remove = inc_state.chunk_ids_to_remove(&deleted_files);
160        if !chunk_ids_to_remove.is_empty() {
161            let mut idx = index.write().await;
162            idx.remove_chunks(&chunk_ids_to_remove);
163            total_removed = chunk_ids_to_remove.len();
164        }
165        inc_state.apply_deletions(&deleted_files);
166
167        tracing::debug!(
168            count = deleted_files.len(),
169            chunks = total_removed,
170            "Removed deleted files from index"
171        );
172    }
173
174    // Handle changed/new files
175    if !changed_files.is_empty() {
176        // Remove old chunks for changed files first
177        for file in &changed_files {
178            let old_ids = inc_state.chunk_ids_for_file(file);
179            if !old_ids.is_empty() {
180                let mut idx = index.write().await;
181                idx.remove_chunks(&old_ids);
182                total_removed += old_ids.len();
183            }
184        }
185
186        // Parse and re-index changed files
187        let embedder = create_embedder(config)?;
188
189        for file in &changed_files {
190            match process_single_file(file, &*embedder, index, inc_state).await {
191                Ok(count) => {
192                    total_added += count;
193                    tracing::debug!(file = %file.display(), chunks = count, "Re-indexed file");
194                }
195                Err(e) => {
196                    tracing::warn!(file = %file.display(), error = %e, "Failed to index file");
197                }
198            }
199        }
200    }
201
202    Ok((total_added, total_removed))
203}
204
205/// Parse, embed, and add a single file to the index.
206///
207/// Returns the number of chunks added.
208async fn process_single_file(
209    file: &Path,
210    embedder: &dyn Embedder,
211    index: &Arc<RwLock<SeekrIndex>>,
212    inc_state: &mut IncrementalState,
213) -> Result<usize, String> {
214    // Read file content for hashing
215    let content = std::fs::read(file).map_err(|e| e.to_string())?;
216
217    // Parse into chunks using the existing chunker
218    let parse_result =
219        crate::parser::chunker::chunk_file_from_path(file).map_err(|e| e.to_string())?;
220
221    let chunks = match parse_result {
222        Some(result) => result.chunks,
223        None => {
224            // Unsupported language or binary file — still track in state
225            inc_state.update_file(file.to_path_buf(), &content, Vec::new());
226            return Ok(0);
227        }
228    };
229
230    if chunks.is_empty() {
231        inc_state.update_file(file.to_path_buf(), &content, Vec::new());
232        return Ok(0);
233    }
234
235    // Generate embeddings
236    let texts: Vec<&str> = chunks.iter().map(|c| c.body.as_str()).collect();
237    let embeddings = embedder.embed_batch(&texts).map_err(|e| e.to_string())?;
238
239    // Add to index
240    let mut chunk_ids = Vec::new();
241    {
242        let mut idx = index.write().await;
243        for (chunk, embedding) in chunks.iter().zip(embeddings.iter()) {
244            let text_tokens = crate::index::store::tokenize_for_index_pub(&chunk.body);
245            let entry = crate::index::IndexEntry {
246                chunk_id: chunk.id,
247                embedding: embedding.clone(),
248                text_tokens,
249            };
250            idx.add_entry(entry, chunk.clone());
251            chunk_ids.push(chunk.id);
252        }
253    }
254
255    // Update incremental state
256    inc_state.update_file(file.to_path_buf(), &content, chunk_ids);
257
258    Ok(chunks.len())
259}
260
261/// Check if a file has a supported extension for indexing.
/// Check if a file has a supported extension for indexing.
///
/// Matching is exact (case-sensitive) against a fixed list of source and
/// config extensions; files without a UTF-8 extension are rejected.
fn is_supported_file(path: &Path) -> bool {
    const SUPPORTED_EXTENSIONS: &[&str] = &[
        "rs", "py", "js", "jsx", "ts", "tsx", "go", "java", "c", "cpp", "h", "hpp", "rb", "sh",
        "bash", "json", "toml", "yaml", "yml", "html", "css",
    ];

    match path.extension().and_then(|e| e.to_str()) {
        Some(ext) => SUPPORTED_EXTENSIONS.iter().any(|&s| s == ext),
        None => false,
    }
}
273
274/// Create an embedder for the daemon.
275fn create_embedder(config: &SeekrConfig) -> Result<Box<dyn Embedder>, String> {
276    match crate::embedder::onnx::OnnxEmbedder::new(&config.model_dir) {
277        Ok(embedder) => Ok(Box::new(embedder)),
278        Err(e) => Err(format!("Failed to create embedder: {}", e)),
279    }
280}