Skip to main content

synwire_index/
index.rs

1//! [`SemanticIndex`] — the primary entry point for semantic indexing.
2
3use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use tokio::sync::{RwLock, mpsc};
7use uuid::Uuid;
8
9use synwire_core::embeddings::Embeddings;
10use synwire_core::rerankers::Reranker;
11use synwire_core::vectorstores::VectorStore;
12use synwire_core::vfs::{
13    IndexEvent, IndexHandle, IndexOptions, IndexResult, IndexStatus, SemanticSearchOptions,
14    SemanticSearchResult, VfsError,
15};
16
17use crate::cache;
18use crate::config::IndexConfig;
19use crate::hashes;
20use crate::pipeline;
21use crate::watcher::WatcherHandle;
22
23/// Internal state of a single indexing job.
24struct IndexJob {
25    path: PathBuf,
26    status: IndexStatus,
27    watcher: Option<WatcherHandle>,
28}
29
30/// Factory closure that creates a [`VectorStore`] for a given cache directory.
31///
32/// Receives the path to the per-index cache directory.  The factory is
33/// responsible for opening or creating the store at that location.
34pub type StoreFactory = Box<
35    dyn Fn(&Path) -> Result<Arc<dyn VectorStore>, Box<dyn std::error::Error + Send + Sync>>
36        + Send
37        + Sync,
38>;
39
40/// The semantic indexing pipeline.
41///
42/// Orchestrates directory walking, AST-aware chunking, embedding, and
43/// vector storage.  VFS providers hold an instance of this struct and
44/// delegate `index`, `status`, and `search` calls to it.
45///
46/// # Thread safety
47///
48/// `SemanticIndex` is `Send + Sync` and may be shared across async tasks via
49/// `Arc<SemanticIndex>`.
50pub struct SemanticIndex {
51    embeddings: Arc<dyn Embeddings>,
52    reranker: Option<Arc<dyn Reranker>>,
53    store_factory: StoreFactory,
54    config: IndexConfig,
55    jobs: Arc<RwLock<HashMap<String, IndexJob>>>,
56    event_tx: Option<mpsc::Sender<IndexEvent>>,
57}
58
59impl SemanticIndex {
60    /// Create a new `SemanticIndex` with the given dependencies.
61    ///
62    /// - `embeddings` — embedding model used to vectorise chunks.
63    /// - `reranker` — optional cross-encoder for result reranking.
64    /// - `store_factory` — factory that produces a [`VectorStore`] for a cache path.
65    /// - `config` — pipeline configuration (chunk sizes, cache directory).
66    /// - `event_tx` — optional channel for streaming [`IndexEvent`]s to a caller.
67    pub fn new(
68        embeddings: Arc<dyn Embeddings>,
69        reranker: Option<Arc<dyn Reranker>>,
70        store_factory: StoreFactory,
71        config: IndexConfig,
72        event_tx: Option<mpsc::Sender<IndexEvent>>,
73    ) -> Self {
74        Self {
75            embeddings,
76            reranker,
77            store_factory,
78            config,
79            jobs: Arc::new(RwLock::new(HashMap::new())),
80            event_tx,
81        }
82    }
83
84    /// Start indexing `path` asynchronously.  Returns immediately with an [`IndexHandle`].
85    ///
86    /// If `opts.force` is `false` and a valid cache exists for this path, the
87    /// index is considered fresh and no work is performed.
88    ///
89    /// # Errors
90    ///
91    /// - [`VfsError::IndexDenied`] — `path` resolves to the filesystem root.
92    /// - [`VfsError::Io`] — `path` cannot be canonicalised or the store factory fails.
93    #[allow(clippy::too_many_lines)]
94    pub async fn index(&self, path: &Path, opts: &IndexOptions) -> Result<IndexHandle, VfsError> {
95        let canonical = std::fs::canonicalize(path).map_err(VfsError::Io)?;
96
97        if canonical == Path::new("/") {
98            return Err(VfsError::IndexDenied {
99                reason: "Indexing the root filesystem is not permitted.".into(),
100            });
101        }
102
103        let index_id = Uuid::new_v4().to_string();
104        let handle = IndexHandle {
105            index_id: index_id.clone(),
106            path: canonical.to_string_lossy().to_string(),
107        };
108
109        {
110            let mut jobs = self.jobs.write().await;
111            let _ = jobs.insert(
112                index_id.clone(),
113                IndexJob {
114                    path: canonical.clone(),
115                    status: IndexStatus::Pending,
116                    watcher: None,
117                },
118            );
119        }
120
121        let cache_dir = cache::cache_dir(&self.config, &canonical);
122        let store = (self.store_factory)(&cache_dir)
123            .map_err(|e| VfsError::Io(std::io::Error::other(e.to_string())))?;
124
125        // Use cached result if fresh and force=false.
126        if !opts.force
127            && let Some(meta) = cache::read_meta(&cache_dir)
128        {
129            let result = IndexResult {
130                path: canonical.to_string_lossy().to_string(),
131                files_indexed: meta.files_indexed,
132                chunks_produced: meta.chunks_produced,
133                was_cached: true,
134            };
135            {
136                let mut jobs = self.jobs.write().await;
137                if let Some(job) = jobs.get_mut(&index_id) {
138                    job.status = IndexStatus::Ready(result.clone());
139                }
140            }
141            if let Some(tx) = &self.event_tx {
142                let _ = tx
143                    .send(IndexEvent::Complete {
144                        index_id: index_id.clone(),
145                        result,
146                    })
147                    .await;
148            }
149            return Ok(handle);
150        }
151
152        // Spawn the background indexing task.
153        let jobs_ref = Arc::clone(&self.jobs);
154        let embeddings = Arc::clone(&self.embeddings);
155        let store_for_watch = Arc::clone(&store);
156        let event_tx = self.event_tx.clone();
157        let opts_clone = opts.clone();
158        let config = self.config.clone();
159        let id_clone = index_id.clone();
160        let canonical_clone = canonical.clone();
161
162        let _task = tokio::spawn(async move {
163            // Transition to Indexing.
164            {
165                let mut jobs = jobs_ref.write().await;
166                if let Some(job) = jobs.get_mut(&id_clone) {
167                    job.status = IndexStatus::Indexing { progress: 0.0 };
168                }
169            }
170            if let Some(tx) = &event_tx {
171                let _ = tx
172                    .send(IndexEvent::Progress {
173                        index_id: id_clone.clone(),
174                        progress: 0.0,
175                    })
176                    .await;
177            }
178
179            let idx_cache_dir = cache::cache_dir(&config, &canonical_clone);
180            let mut hash_registry = hashes::read_hashes(&idx_cache_dir);
181
182            match pipeline::run(
183                &canonical_clone,
184                &opts_clone,
185                &embeddings,
186                &store,
187                config.chunk_size,
188                config.chunk_overlap,
189                &mut hash_registry,
190            )
191            .await
192            {
193                Ok((files_indexed, chunks_produced)) => {
194                    // Persist the updated content hashes alongside the meta.
195                    if let Err(e) = hashes::write_hashes(&idx_cache_dir, &hash_registry) {
196                        tracing::warn!("Failed to write hash registry: {e}");
197                    }
198
199                    let meta = cache::IndexMeta {
200                        path: canonical_clone.to_string_lossy().to_string(),
201                        indexed_at: chrono::Utc::now().to_rfc3339(),
202                        files_indexed,
203                        chunks_produced,
204                        version: 1,
205                    };
206                    if let Err(e) = cache::write_meta(&idx_cache_dir, &meta) {
207                        tracing::warn!("Failed to write index meta: {e}");
208                    }
209
210                    let result = IndexResult {
211                        path: canonical_clone.to_string_lossy().to_string(),
212                        files_indexed,
213                        chunks_produced,
214                        was_cached: false,
215                    };
216
217                    let watcher = crate::watcher::start(
218                        canonical_clone,
219                        embeddings,
220                        store_for_watch,
221                        config.chunk_size,
222                        config.chunk_overlap,
223                        hash_registry.files,
224                    );
225
226                    {
227                        let mut jobs = jobs_ref.write().await;
228                        if let Some(job) = jobs.get_mut(&id_clone) {
229                            job.status = IndexStatus::Ready(result.clone());
230                            job.watcher = Some(watcher);
231                        }
232                    }
233                    if let Some(tx) = &event_tx {
234                        let _ = tx
235                            .send(IndexEvent::Complete {
236                                index_id: id_clone,
237                                result,
238                            })
239                            .await;
240                    }
241                }
242                Err(e) => {
243                    let err_str = e.to_string();
244                    {
245                        let mut jobs = jobs_ref.write().await;
246                        if let Some(job) = jobs.get_mut(&id_clone) {
247                            job.status = IndexStatus::Failed(err_str.clone());
248                        }
249                    }
250                    if let Some(tx) = &event_tx {
251                        let _ = tx
252                            .send(IndexEvent::Failed {
253                                index_id: id_clone,
254                                error: err_str,
255                            })
256                            .await;
257                    }
258                }
259            }
260        });
261
262        Ok(handle)
263    }
264
265    /// Check the status of an indexing operation by its `index_id`.
266    ///
267    /// # Errors
268    ///
269    /// Returns [`VfsError::NotFound`] if `index_id` is unknown.
270    pub async fn status(&self, index_id: &str) -> Result<IndexStatus, VfsError> {
271        let jobs = self.jobs.read().await;
272        jobs.get(index_id).map_or_else(
273            || Err(VfsError::NotFound(format!("No index with id {index_id}"))),
274            |job| Ok(job.status.clone()),
275        )
276    }
277
278    /// Semantic search across indexed content for `path`.
279    ///
280    /// # Errors
281    ///
282    /// - [`VfsError::IndexNotReady`] — the index is still building or has never been started.
283    /// - [`VfsError::Io`] — store factory or similarity search fails.
284    #[allow(clippy::too_many_lines)]
285    pub async fn search(
286        &self,
287        path: &Path,
288        query: &str,
289        opts: &SemanticSearchOptions,
290    ) -> Result<Vec<SemanticSearchResult>, VfsError> {
291        let canonical = std::fs::canonicalize(path).map_err(VfsError::Io)?;
292        let cache_dir = cache::cache_dir(&self.config, &canonical);
293
294        {
295            let jobs = self.jobs.read().await;
296            let any_indexing = jobs.values().filter(|j| j.path == canonical).any(|j| {
297                matches!(
298                    j.status,
299                    IndexStatus::Indexing { .. } | IndexStatus::Pending
300                )
301            });
302            let any_ready = jobs
303                .values()
304                .filter(|j| j.path == canonical)
305                .any(|j| matches!(j.status, IndexStatus::Ready(_)));
306            drop(jobs);
307
308            if any_indexing && !any_ready {
309                return Err(VfsError::IndexNotReady(
310                    canonical.to_string_lossy().to_string(),
311                ));
312            }
313            if !any_ready && cache::read_meta(&cache_dir).is_none() {
314                return Err(VfsError::IndexNotReady(
315                    canonical.to_string_lossy().to_string(),
316                ));
317            }
318        }
319
320        let vector_store = (self.store_factory)(&cache_dir)
321            .map_err(|e| VfsError::Io(std::io::Error::other(e.to_string())))?;
322
323        let top_k = opts.top_k.unwrap_or(10);
324        let search_results = vector_store
325            .similarity_search_with_score(query, top_k, self.embeddings.as_ref())
326            .await
327            .map_err(|e| VfsError::Io(std::io::Error::other(e.to_string())))?;
328
329        let min_score = opts.min_score.unwrap_or(0.0);
330        let use_reranker = opts.rerank.unwrap_or(true);
331
332        // Optionally rerank the candidate documents.
333        let docs_for_rerank: Vec<_> = search_results.iter().map(|(d, _)| d.clone()).collect();
334        let reranked = if use_reranker {
335            if let Some(reranker) = &self.reranker {
336                reranker
337                    .rerank(query, &docs_for_rerank, top_k)
338                    .await
339                    .map_err(|e| VfsError::Io(std::io::Error::other(e.to_string())))?
340            } else {
341                docs_for_rerank
342            }
343        } else {
344            docs_for_rerank
345        };
346
347        // Pair reranked docs with scores from the original similarity search by
348        // position.  Reranking may reorder results so scores are approximate.
349        let raw_scores: Vec<f32> = search_results.iter().map(|(_, s)| *s).collect();
350
351        let file_filter_globs: Vec<globset::GlobMatcher> = opts
352            .file_filter
353            .iter()
354            .filter_map(|pat| {
355                globset::Glob::new(pat)
356                    .map_err(|e| tracing::warn!("Invalid file_filter glob {pat:?}: {e}"))
357                    .ok()
358                    .map(|g| g.compile_matcher())
359            })
360            .collect();
361
362        let mut output = Vec::new();
363        for (i, doc) in reranked.into_iter().enumerate() {
364            let hit_score = raw_scores.get(i).copied().unwrap_or(0.0);
365            if hit_score < min_score {
366                continue;
367            }
368
369            let file = doc
370                .metadata
371                .get("file")
372                .and_then(serde_json::Value::as_str)
373                .unwrap_or("")
374                .to_string();
375
376            if !file_filter_globs.is_empty() && !file_filter_globs.iter().any(|m| m.is_match(&file))
377            {
378                continue;
379            }
380
381            #[allow(clippy::cast_possible_truncation)]
382            let line_start = doc
383                .metadata
384                .get("line_start")
385                .and_then(serde_json::Value::as_u64)
386                .unwrap_or(1) as usize;
387
388            #[allow(clippy::cast_possible_truncation)]
389            let line_end = doc
390                .metadata
391                .get("line_end")
392                .and_then(serde_json::Value::as_u64)
393                .unwrap_or(1) as usize;
394
395            output.push(SemanticSearchResult {
396                file,
397                line_start,
398                line_end,
399                content: doc.page_content.clone(),
400                score: hit_score,
401                symbol: doc
402                    .metadata
403                    .get("symbol")
404                    .and_then(serde_json::Value::as_str)
405                    .map(str::to_owned),
406                language: doc
407                    .metadata
408                    .get("language")
409                    .and_then(serde_json::Value::as_str)
410                    .map(str::to_owned),
411            });
412        }
413
414        Ok(output)
415    }
416
417    /// Stop the background watcher for `path`.
418    ///
419    /// Does nothing if `path` cannot be canonicalised or has no active watcher.
420    pub async fn unwatch(&self, path: &Path) {
421        let Ok(canonical) = std::fs::canonicalize(path) else {
422            return;
423        };
424        let mut jobs = self.jobs.write().await;
425        for job in jobs.values_mut().filter(|j| j.path == canonical) {
426            if let Some(w) = job.watcher.take() {
427                w.stop();
428            }
429        }
430    }
431}