seekr_code/server/
daemon.rs1use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::RwLock;
11
12use crate::config::SeekrConfig;
13use crate::embedder::traits::Embedder;
14use crate::index::incremental::IncrementalState;
15use crate::index::store::SeekrIndex;
16use crate::scanner::watcher::{FileEvent, dedup_events, start_async_watcher};
17
18const DEFAULT_DEBOUNCE_MS: u64 = 500;
20
21pub async fn run_watch_daemon(
31 watch_path: &Path,
32 config: &SeekrConfig,
33 index: Arc<RwLock<SeekrIndex>>,
34 debounce_ms: Option<u64>,
35) -> Result<(), crate::error::ServerError> {
36 let debounce = Duration::from_millis(debounce_ms.unwrap_or(DEFAULT_DEBOUNCE_MS));
37 let watch_path = watch_path.to_path_buf();
38
39 let (_watcher, mut rx) = start_async_watcher(&watch_path)
41 .map_err(|e| crate::error::ServerError::Internal(format!("Watch error: {}", e)))?;
42
43 tracing::info!(
44 path = %watch_path.display(),
45 debounce_ms = debounce.as_millis() as u64,
46 "Watch daemon started — monitoring for file changes"
47 );
48
49 let index_dir = config.index_dir.join(
51 watch_path
52 .file_name()
53 .unwrap_or_default()
54 .to_string_lossy()
55 .as_ref(),
56 );
57 let state_path = index_dir.join("incremental_state.json");
58 let mut inc_state = IncrementalState::load(&state_path).unwrap_or_default();
59
60 let mut pending_events: Vec<FileEvent> = Vec::new();
62
63 loop {
64 tokio::select! {
66 event = rx.recv() => {
67 match event {
68 Some(fe) => {
69 pending_events.push(fe);
70 while let Ok(more) = rx.try_recv() {
72 pending_events.push(more);
73 }
74 }
75 None => {
76 tracing::warn!("File watcher channel closed, stopping daemon");
77 break;
78 }
79 }
80
81 tokio::time::sleep(debounce).await;
83
84 while let Ok(more) = rx.try_recv() {
86 pending_events.push(more);
87 }
88
89 if !pending_events.is_empty() {
91 let events = std::mem::take(&mut pending_events);
92 let deduped = dedup_events(events);
93
94 match process_events(&deduped, &index, &mut inc_state, config).await {
95 Ok((added, removed)) => {
96 if added > 0 || removed > 0 {
97 tracing::info!(
98 added = added,
99 removed = removed,
100 "Incremental index updated"
101 );
102
103 if let Err(e) = inc_state.save(&state_path) {
105 tracing::warn!("Failed to save incremental state: {}", e);
106 }
107
108 let idx = index.read().await;
110 if let Err(e) = idx.save(&index_dir) {
111 tracing::warn!("Failed to save index: {}", e);
112 }
113 }
114 }
115 Err(e) => {
116 tracing::error!("Error processing file events: {}", e);
117 }
119 }
120 }
121 }
122 }
123 }
124
125 Ok(())
126}
127
128async fn process_events(
132 events: &[FileEvent],
133 index: &Arc<RwLock<SeekrIndex>>,
134 inc_state: &mut IncrementalState,
135 config: &SeekrConfig,
136) -> Result<(usize, usize), String> {
137 let mut changed_files: Vec<PathBuf> = Vec::new();
138 let mut deleted_files: Vec<PathBuf> = Vec::new();
139
140 for event in events {
141 match event {
142 FileEvent::Changed(path) => {
143 if is_supported_file(path) {
145 changed_files.push(path.clone());
146 }
147 }
148 FileEvent::Deleted(path) => {
149 deleted_files.push(path.clone());
150 }
151 }
152 }
153
154 let mut total_added = 0;
155 let mut total_removed = 0;
156
157 if !deleted_files.is_empty() {
159 let chunk_ids_to_remove = inc_state.chunk_ids_to_remove(&deleted_files);
160 if !chunk_ids_to_remove.is_empty() {
161 let mut idx = index.write().await;
162 idx.remove_chunks(&chunk_ids_to_remove);
163 total_removed = chunk_ids_to_remove.len();
164 }
165 inc_state.apply_deletions(&deleted_files);
166
167 tracing::debug!(
168 count = deleted_files.len(),
169 chunks = total_removed,
170 "Removed deleted files from index"
171 );
172 }
173
174 if !changed_files.is_empty() {
176 for file in &changed_files {
178 let old_ids = inc_state.chunk_ids_for_file(file);
179 if !old_ids.is_empty() {
180 let mut idx = index.write().await;
181 idx.remove_chunks(&old_ids);
182 total_removed += old_ids.len();
183 }
184 }
185
186 let embedder = create_embedder(config)?;
188
189 for file in &changed_files {
190 match process_single_file(file, &*embedder, index, inc_state).await {
191 Ok(count) => {
192 total_added += count;
193 tracing::debug!(file = %file.display(), chunks = count, "Re-indexed file");
194 }
195 Err(e) => {
196 tracing::warn!(file = %file.display(), error = %e, "Failed to index file");
197 }
198 }
199 }
200 }
201
202 Ok((total_added, total_removed))
203}
204
205async fn process_single_file(
209 file: &Path,
210 embedder: &dyn Embedder,
211 index: &Arc<RwLock<SeekrIndex>>,
212 inc_state: &mut IncrementalState,
213) -> Result<usize, String> {
214 let content = std::fs::read(file).map_err(|e| e.to_string())?;
216
217 let parse_result =
219 crate::parser::chunker::chunk_file_from_path(file).map_err(|e| e.to_string())?;
220
221 let chunks = match parse_result {
222 Some(result) => result.chunks,
223 None => {
224 inc_state.update_file(file.to_path_buf(), &content, Vec::new());
226 return Ok(0);
227 }
228 };
229
230 if chunks.is_empty() {
231 inc_state.update_file(file.to_path_buf(), &content, Vec::new());
232 return Ok(0);
233 }
234
235 let texts: Vec<&str> = chunks.iter().map(|c| c.body.as_str()).collect();
237 let embeddings = embedder.embed_batch(&texts).map_err(|e| e.to_string())?;
238
239 let mut chunk_ids = Vec::new();
241 {
242 let mut idx = index.write().await;
243 for (chunk, embedding) in chunks.iter().zip(embeddings.iter()) {
244 let text_tokens = crate::index::store::tokenize_for_index_pub(&chunk.body);
245 let entry = crate::index::IndexEntry {
246 chunk_id: chunk.id,
247 embedding: embedding.clone(),
248 text_tokens,
249 };
250 idx.add_entry(entry, chunk.clone());
251 chunk_ids.push(chunk.id);
252 }
253 }
254
255 inc_state.update_file(file.to_path_buf(), &content, chunk_ids);
257
258 Ok(chunks.len())
259}
260
261fn is_supported_file(path: &Path) -> bool {
263 let supported = [
264 "rs", "py", "js", "jsx", "ts", "tsx", "go", "java", "c", "cpp", "h", "hpp", "rb", "sh",
265 "bash", "json", "toml", "yaml", "yml", "html", "css",
266 ];
267
268 path.extension()
269 .and_then(|e| e.to_str())
270 .map(|ext| supported.contains(&ext))
271 .unwrap_or(false)
272}
273
274fn create_embedder(config: &SeekrConfig) -> Result<Box<dyn Embedder>, String> {
276 match crate::embedder::onnx::OnnxEmbedder::new(&config.model_dir) {
277 Ok(embedder) => Ok(Box::new(embedder)),
278 Err(e) => Err(format!("Failed to create embedder: {}", e)),
279 }
280}