1use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use tokio::sync::{RwLock, mpsc};
7use uuid::Uuid;
8
9use synwire_core::embeddings::Embeddings;
10use synwire_core::rerankers::Reranker;
11use synwire_core::vectorstores::VectorStore;
12use synwire_core::vfs::{
13 IndexEvent, IndexHandle, IndexOptions, IndexResult, IndexStatus, SemanticSearchOptions,
14 SemanticSearchResult, VfsError,
15};
16
17use crate::cache;
18use crate::config::IndexConfig;
19use crate::hashes;
20use crate::pipeline;
21use crate::watcher::WatcherHandle;
22
23struct IndexJob {
25 path: PathBuf,
26 status: IndexStatus,
27 watcher: Option<WatcherHandle>,
28}
29
30pub type StoreFactory = Box<
35 dyn Fn(&Path) -> Result<Arc<dyn VectorStore>, Box<dyn std::error::Error + Send + Sync>>
36 + Send
37 + Sync,
38>;
39
40pub struct SemanticIndex {
51 embeddings: Arc<dyn Embeddings>,
52 reranker: Option<Arc<dyn Reranker>>,
53 store_factory: StoreFactory,
54 config: IndexConfig,
55 jobs: Arc<RwLock<HashMap<String, IndexJob>>>,
56 event_tx: Option<mpsc::Sender<IndexEvent>>,
57}
58
59impl SemanticIndex {
60 pub fn new(
68 embeddings: Arc<dyn Embeddings>,
69 reranker: Option<Arc<dyn Reranker>>,
70 store_factory: StoreFactory,
71 config: IndexConfig,
72 event_tx: Option<mpsc::Sender<IndexEvent>>,
73 ) -> Self {
74 Self {
75 embeddings,
76 reranker,
77 store_factory,
78 config,
79 jobs: Arc::new(RwLock::new(HashMap::new())),
80 event_tx,
81 }
82 }
83
84 #[allow(clippy::too_many_lines)]
94 pub async fn index(&self, path: &Path, opts: &IndexOptions) -> Result<IndexHandle, VfsError> {
95 let canonical = std::fs::canonicalize(path).map_err(VfsError::Io)?;
96
97 if canonical == Path::new("/") {
98 return Err(VfsError::IndexDenied {
99 reason: "Indexing the root filesystem is not permitted.".into(),
100 });
101 }
102
103 let index_id = Uuid::new_v4().to_string();
104 let handle = IndexHandle {
105 index_id: index_id.clone(),
106 path: canonical.to_string_lossy().to_string(),
107 };
108
109 {
110 let mut jobs = self.jobs.write().await;
111 let _ = jobs.insert(
112 index_id.clone(),
113 IndexJob {
114 path: canonical.clone(),
115 status: IndexStatus::Pending,
116 watcher: None,
117 },
118 );
119 }
120
121 let cache_dir = cache::cache_dir(&self.config, &canonical);
122 let store = (self.store_factory)(&cache_dir)
123 .map_err(|e| VfsError::Io(std::io::Error::other(e.to_string())))?;
124
125 if !opts.force
127 && let Some(meta) = cache::read_meta(&cache_dir)
128 {
129 let result = IndexResult {
130 path: canonical.to_string_lossy().to_string(),
131 files_indexed: meta.files_indexed,
132 chunks_produced: meta.chunks_produced,
133 was_cached: true,
134 };
135 {
136 let mut jobs = self.jobs.write().await;
137 if let Some(job) = jobs.get_mut(&index_id) {
138 job.status = IndexStatus::Ready(result.clone());
139 }
140 }
141 if let Some(tx) = &self.event_tx {
142 let _ = tx
143 .send(IndexEvent::Complete {
144 index_id: index_id.clone(),
145 result,
146 })
147 .await;
148 }
149 return Ok(handle);
150 }
151
152 let jobs_ref = Arc::clone(&self.jobs);
154 let embeddings = Arc::clone(&self.embeddings);
155 let store_for_watch = Arc::clone(&store);
156 let event_tx = self.event_tx.clone();
157 let opts_clone = opts.clone();
158 let config = self.config.clone();
159 let id_clone = index_id.clone();
160 let canonical_clone = canonical.clone();
161
162 let _task = tokio::spawn(async move {
163 {
165 let mut jobs = jobs_ref.write().await;
166 if let Some(job) = jobs.get_mut(&id_clone) {
167 job.status = IndexStatus::Indexing { progress: 0.0 };
168 }
169 }
170 if let Some(tx) = &event_tx {
171 let _ = tx
172 .send(IndexEvent::Progress {
173 index_id: id_clone.clone(),
174 progress: 0.0,
175 })
176 .await;
177 }
178
179 let idx_cache_dir = cache::cache_dir(&config, &canonical_clone);
180 let mut hash_registry = hashes::read_hashes(&idx_cache_dir);
181
182 match pipeline::run(
183 &canonical_clone,
184 &opts_clone,
185 &embeddings,
186 &store,
187 config.chunk_size,
188 config.chunk_overlap,
189 &mut hash_registry,
190 )
191 .await
192 {
193 Ok((files_indexed, chunks_produced)) => {
194 if let Err(e) = hashes::write_hashes(&idx_cache_dir, &hash_registry) {
196 tracing::warn!("Failed to write hash registry: {e}");
197 }
198
199 let meta = cache::IndexMeta {
200 path: canonical_clone.to_string_lossy().to_string(),
201 indexed_at: chrono::Utc::now().to_rfc3339(),
202 files_indexed,
203 chunks_produced,
204 version: 1,
205 };
206 if let Err(e) = cache::write_meta(&idx_cache_dir, &meta) {
207 tracing::warn!("Failed to write index meta: {e}");
208 }
209
210 let result = IndexResult {
211 path: canonical_clone.to_string_lossy().to_string(),
212 files_indexed,
213 chunks_produced,
214 was_cached: false,
215 };
216
217 let watcher = crate::watcher::start(
218 canonical_clone,
219 embeddings,
220 store_for_watch,
221 config.chunk_size,
222 config.chunk_overlap,
223 hash_registry.files,
224 );
225
226 {
227 let mut jobs = jobs_ref.write().await;
228 if let Some(job) = jobs.get_mut(&id_clone) {
229 job.status = IndexStatus::Ready(result.clone());
230 job.watcher = Some(watcher);
231 }
232 }
233 if let Some(tx) = &event_tx {
234 let _ = tx
235 .send(IndexEvent::Complete {
236 index_id: id_clone,
237 result,
238 })
239 .await;
240 }
241 }
242 Err(e) => {
243 let err_str = e.to_string();
244 {
245 let mut jobs = jobs_ref.write().await;
246 if let Some(job) = jobs.get_mut(&id_clone) {
247 job.status = IndexStatus::Failed(err_str.clone());
248 }
249 }
250 if let Some(tx) = &event_tx {
251 let _ = tx
252 .send(IndexEvent::Failed {
253 index_id: id_clone,
254 error: err_str,
255 })
256 .await;
257 }
258 }
259 }
260 });
261
262 Ok(handle)
263 }
264
265 pub async fn status(&self, index_id: &str) -> Result<IndexStatus, VfsError> {
271 let jobs = self.jobs.read().await;
272 jobs.get(index_id).map_or_else(
273 || Err(VfsError::NotFound(format!("No index with id {index_id}"))),
274 |job| Ok(job.status.clone()),
275 )
276 }
277
278 #[allow(clippy::too_many_lines)]
285 pub async fn search(
286 &self,
287 path: &Path,
288 query: &str,
289 opts: &SemanticSearchOptions,
290 ) -> Result<Vec<SemanticSearchResult>, VfsError> {
291 let canonical = std::fs::canonicalize(path).map_err(VfsError::Io)?;
292 let cache_dir = cache::cache_dir(&self.config, &canonical);
293
294 {
295 let jobs = self.jobs.read().await;
296 let any_indexing = jobs.values().filter(|j| j.path == canonical).any(|j| {
297 matches!(
298 j.status,
299 IndexStatus::Indexing { .. } | IndexStatus::Pending
300 )
301 });
302 let any_ready = jobs
303 .values()
304 .filter(|j| j.path == canonical)
305 .any(|j| matches!(j.status, IndexStatus::Ready(_)));
306 drop(jobs);
307
308 if any_indexing && !any_ready {
309 return Err(VfsError::IndexNotReady(
310 canonical.to_string_lossy().to_string(),
311 ));
312 }
313 if !any_ready && cache::read_meta(&cache_dir).is_none() {
314 return Err(VfsError::IndexNotReady(
315 canonical.to_string_lossy().to_string(),
316 ));
317 }
318 }
319
320 let vector_store = (self.store_factory)(&cache_dir)
321 .map_err(|e| VfsError::Io(std::io::Error::other(e.to_string())))?;
322
323 let top_k = opts.top_k.unwrap_or(10);
324 let search_results = vector_store
325 .similarity_search_with_score(query, top_k, self.embeddings.as_ref())
326 .await
327 .map_err(|e| VfsError::Io(std::io::Error::other(e.to_string())))?;
328
329 let min_score = opts.min_score.unwrap_or(0.0);
330 let use_reranker = opts.rerank.unwrap_or(true);
331
332 let docs_for_rerank: Vec<_> = search_results.iter().map(|(d, _)| d.clone()).collect();
334 let reranked = if use_reranker {
335 if let Some(reranker) = &self.reranker {
336 reranker
337 .rerank(query, &docs_for_rerank, top_k)
338 .await
339 .map_err(|e| VfsError::Io(std::io::Error::other(e.to_string())))?
340 } else {
341 docs_for_rerank
342 }
343 } else {
344 docs_for_rerank
345 };
346
347 let raw_scores: Vec<f32> = search_results.iter().map(|(_, s)| *s).collect();
350
351 let file_filter_globs: Vec<globset::GlobMatcher> = opts
352 .file_filter
353 .iter()
354 .filter_map(|pat| {
355 globset::Glob::new(pat)
356 .map_err(|e| tracing::warn!("Invalid file_filter glob {pat:?}: {e}"))
357 .ok()
358 .map(|g| g.compile_matcher())
359 })
360 .collect();
361
362 let mut output = Vec::new();
363 for (i, doc) in reranked.into_iter().enumerate() {
364 let hit_score = raw_scores.get(i).copied().unwrap_or(0.0);
365 if hit_score < min_score {
366 continue;
367 }
368
369 let file = doc
370 .metadata
371 .get("file")
372 .and_then(serde_json::Value::as_str)
373 .unwrap_or("")
374 .to_string();
375
376 if !file_filter_globs.is_empty() && !file_filter_globs.iter().any(|m| m.is_match(&file))
377 {
378 continue;
379 }
380
381 #[allow(clippy::cast_possible_truncation)]
382 let line_start = doc
383 .metadata
384 .get("line_start")
385 .and_then(serde_json::Value::as_u64)
386 .unwrap_or(1) as usize;
387
388 #[allow(clippy::cast_possible_truncation)]
389 let line_end = doc
390 .metadata
391 .get("line_end")
392 .and_then(serde_json::Value::as_u64)
393 .unwrap_or(1) as usize;
394
395 output.push(SemanticSearchResult {
396 file,
397 line_start,
398 line_end,
399 content: doc.page_content.clone(),
400 score: hit_score,
401 symbol: doc
402 .metadata
403 .get("symbol")
404 .and_then(serde_json::Value::as_str)
405 .map(str::to_owned),
406 language: doc
407 .metadata
408 .get("language")
409 .and_then(serde_json::Value::as_str)
410 .map(str::to_owned),
411 });
412 }
413
414 Ok(output)
415 }
416
417 pub async fn unwatch(&self, path: &Path) {
421 let Ok(canonical) = std::fs::canonicalize(path) else {
422 return;
423 };
424 let mut jobs = self.jobs.write().await;
425 for job in jobs.values_mut().filter(|j| j.path == canonical) {
426 if let Some(w) = job.watcher.take() {
427 w.stop();
428 }
429 }
430 }
431}