Skip to main content

claw_vector/
engine.rs

1// engine.rs — VectorEngine: public entry point that unifies all subsystems.
2use std::sync::Arc;
3
4use tracing::instrument;
5
6use crate::{
7    collections::CollectionManager,
8    config::VectorConfig,
9    embeddings::{EmbeddingClient, EmbeddingProvider},
10    error::VectorResult,
11    search::{AnnSearcher, HybridSearcher},
12    store::VectorStore,
13    types::{
14        Collection, DistanceMetric, EngineStats, HybridQuery, SearchQuery, SearchResponse,
15        VectorRecord,
16    },
17};
18
19/// High-level engine that manages collections, search, and embeddings.
20pub struct VectorEngine {
21    /// Runtime configuration.
22    pub config: VectorConfig,
23    /// Collection lifecycle and persistence manager.
24    pub collections: Arc<CollectionManager>,
25    /// ANN search service.
26    pub ann_searcher: Arc<AnnSearcher>,
27    /// Hybrid vector + keyword search service.
28    pub hybrid_searcher: Arc<HybridSearcher>,
29    /// Embedding provider used for text ingestion and search.
30    pub embedding_client: Arc<dyn EmbeddingProvider>,
31}
32
33impl VectorEngine {
34    fn ensure_default_workspace_allowed(&self, operation: &str) -> VectorResult<()> {
35        if self.config.require_workspace_id {
36            return Err(crate::error::VectorError::Config(format!(
37                "{operation} requires explicit workspace_id"
38            )));
39        }
40        Ok(())
41    }
42
43    /// Create a new engine using the configured gRPC embedding service.
44    #[instrument]
45    pub async fn new(config: VectorConfig) -> VectorResult<Self> {
46        let embedding_client =
47            Arc::new(EmbeddingClient::new(&config).await?) as Arc<dyn EmbeddingProvider>;
48        Self::with_embedding_provider(config, embedding_client).await
49    }
50
51    /// Open an engine using the default configuration.
52    #[instrument]
53    pub async fn open_default() -> VectorResult<Self> {
54        Self::new(VectorConfig::default()).await
55    }
56
57    /// Create a new engine with a caller-supplied embedding provider.
58    #[instrument(skip(embedding_client))]
59    pub async fn with_embedding_provider(
60        config: VectorConfig,
61        embedding_client: Arc<dyn EmbeddingProvider>,
62    ) -> VectorResult<Self> {
63        let store = Arc::new(VectorStore::new(&config.db_path).await?);
64        let collections =
65            Arc::new(CollectionManager::new(config.clone(), Arc::clone(&store)).await?);
66        let ann_searcher = Arc::new(AnnSearcher::new(Arc::clone(&collections)));
67        let hybrid_searcher = Arc::new(HybridSearcher::new(
68            Arc::clone(&ann_searcher),
69            Arc::clone(&store),
70        ));
71
72        Ok(VectorEngine {
73            config,
74            collections,
75            ann_searcher,
76            hybrid_searcher,
77            embedding_client,
78        })
79    }
80
81    /// Create a collection with the provided dimensions and distance metric.
82    #[instrument(skip(self))]
83    pub async fn create_collection(
84        &self,
85        name: &str,
86        dimensions: usize,
87        distance: DistanceMetric,
88    ) -> VectorResult<Collection> {
89        self.ensure_default_workspace_allowed("create_collection")?;
90        self.collections
91            .create_collection(
92                &self.config.default_workspace_id,
93                name,
94                dimensions,
95                distance,
96            )
97            .await
98    }
99
100    /// Create a collection inside a specific workspace.
101    #[instrument(skip(self))]
102    pub async fn create_collection_in_workspace(
103        &self,
104        workspace_id: &str,
105        name: &str,
106        dimensions: usize,
107        distance: DistanceMetric,
108    ) -> VectorResult<Collection> {
109        self.collections
110            .create_collection(workspace_id, name, dimensions, distance)
111            .await
112    }
113
114    /// Delete a collection and all of its persisted state.
115    #[instrument(skip(self))]
116    pub async fn delete_collection(&self, name: &str) -> VectorResult<()> {
117        self.ensure_default_workspace_allowed("delete_collection")?;
118        self.collections
119            .delete_collection(&self.config.default_workspace_id, name)
120            .await
121    }
122
123    /// Delete a collection inside a specific workspace.
124    #[instrument(skip(self))]
125    pub async fn delete_collection_in_workspace(
126        &self,
127        workspace_id: &str,
128        name: &str,
129    ) -> VectorResult<()> {
130        self.collections.delete_collection(workspace_id, name).await
131    }
132
133    /// List all collections.
134    #[instrument(skip(self))]
135    pub async fn list_collections(&self) -> VectorResult<Vec<Collection>> {
136        self.ensure_default_workspace_allowed("list_collections")?;
137        self.collections
138            .list_collections(&self.config.default_workspace_id)
139            .await
140    }
141
142    /// List collections scoped to a workspace.
143    #[instrument(skip(self))]
144    pub async fn list_collections_in_workspace(
145        &self,
146        workspace_id: &str,
147    ) -> VectorResult<Vec<Collection>> {
148        self.collections.list_collections(workspace_id).await
149    }
150
151    /// Embed text, persist the record, and return its UUID.
152    #[instrument(skip(self, text, metadata))]
153    pub async fn upsert(
154        &self,
155        collection: &str,
156        text: &str,
157        metadata: serde_json::Value,
158    ) -> VectorResult<uuid::Uuid> {
159        self.ensure_default_workspace_allowed("upsert")?;
160        let vector = self.embedding_client.embed_one(text).await?;
161        let record = VectorRecord::new(collection, vector)
162            .with_text(text.to_string())
163            .with_metadata(metadata);
164        self.collections
165            .insert_vector(&self.config.default_workspace_id, record)
166            .await
167    }
168
169    /// Embed and insert a record in a workspace-scoped collection.
170    #[instrument(skip(self, text, metadata))]
171    pub async fn upsert_in_workspace(
172        &self,
173        workspace_id: &str,
174        collection: &str,
175        text: &str,
176        metadata: serde_json::Value,
177    ) -> VectorResult<uuid::Uuid> {
178        let vector = self.embedding_client.embed_one(text).await?;
179        let record = VectorRecord::new(collection, vector)
180            .with_text(text.to_string())
181            .with_metadata(metadata);
182        self.collections.insert_vector(workspace_id, record).await
183    }
184
185    /// Embed and insert multiple text records.
186    #[instrument(skip(self, items))]
187    pub async fn upsert_batch(
188        &self,
189        collection: &str,
190        items: Vec<(String, serde_json::Value)>,
191    ) -> VectorResult<Vec<uuid::Uuid>> {
192        self.ensure_default_workspace_allowed("upsert_batch")?;
193        let texts = items
194            .iter()
195            .map(|(text, _)| text.clone())
196            .collect::<Vec<_>>();
197        let embeddings = self.embedding_client.embed(texts).await?;
198        let records = items
199            .into_iter()
200            .zip(embeddings.into_iter())
201            .map(|((text, metadata), vector)| {
202                VectorRecord::new(collection, vector)
203                    .with_text(text)
204                    .with_metadata(metadata)
205            })
206            .collect::<Vec<_>>();
207        self.collections
208            .insert_batch(&self.config.default_workspace_id, records)
209            .await
210    }
211
212    /// Embed and insert multiple records in a workspace-scoped collection.
213    #[instrument(skip(self, items))]
214    pub async fn upsert_batch_in_workspace(
215        &self,
216        workspace_id: &str,
217        collection: &str,
218        items: Vec<(String, serde_json::Value)>,
219    ) -> VectorResult<Vec<uuid::Uuid>> {
220        let texts = items
221            .iter()
222            .map(|(text, _)| text.clone())
223            .collect::<Vec<_>>();
224        let embeddings = self.embedding_client.embed(texts).await?;
225        let records = items
226            .into_iter()
227            .zip(embeddings.into_iter())
228            .map(|((text, metadata), vector)| {
229                VectorRecord::new(collection, vector)
230                    .with_text(text)
231                    .with_metadata(metadata)
232            })
233            .collect::<Vec<_>>();
234        self.collections.insert_batch(workspace_id, records).await
235    }
236
237    /// Insert a raw vector directly.
238    #[instrument(skip(self, vector, metadata))]
239    pub async fn upsert_vector(
240        &self,
241        collection: &str,
242        vector: Vec<f32>,
243        metadata: serde_json::Value,
244    ) -> VectorResult<uuid::Uuid> {
245        self.ensure_default_workspace_allowed("upsert_vector")?;
246        let record = VectorRecord::new(collection, vector).with_metadata(metadata);
247        self.collections
248            .insert_vector(&self.config.default_workspace_id, record)
249            .await
250    }
251
252    /// Insert a raw vector directly into a workspace-scoped collection.
253    #[instrument(skip(self, vector, metadata))]
254    pub async fn upsert_vector_in_workspace(
255        &self,
256        workspace_id: &str,
257        collection: &str,
258        vector: Vec<f32>,
259        metadata: serde_json::Value,
260    ) -> VectorResult<uuid::Uuid> {
261        let record = VectorRecord::new(collection, vector).with_metadata(metadata);
262        self.collections.insert_vector(workspace_id, record).await
263    }
264
265    /// Execute ANN search.
266    #[instrument(skip(self, query))]
267    pub async fn search(&self, query: SearchQuery) -> VectorResult<SearchResponse> {
268        self.ensure_default_workspace_allowed("search")?;
269        self.ann_searcher.search(query).await
270    }
271
272    /// Execute ANN search scoped to a workspace.
273    #[instrument(skip(self, query))]
274    pub async fn search_in_workspace(
275        &self,
276        workspace_id: &str,
277        query: SearchQuery,
278    ) -> VectorResult<SearchResponse> {
279        self.ann_searcher
280            .search_in_workspace(workspace_id, query)
281            .await
282    }
283
284    /// Execute ANN search from raw text.
285    #[instrument(skip(self, text))]
286    pub async fn search_text(
287        &self,
288        collection: &str,
289        text: &str,
290        top_k: usize,
291    ) -> VectorResult<SearchResponse> {
292        self.ensure_default_workspace_allowed("search_text")?;
293        let vector = self.embedding_client.embed_one(text).await?;
294        self.ann_searcher
295            .search(SearchQuery {
296                collection: collection.to_string(),
297                vector,
298                top_k,
299                filter: None,
300                include_vectors: false,
301                include_metadata: true,
302                ef_search: None,
303                reranker: None,
304            })
305            .await
306    }
307
308    /// Execute ANN search from raw text scoped to a workspace.
309    #[instrument(skip(self, text))]
310    pub async fn search_text_in_workspace(
311        &self,
312        workspace_id: &str,
313        collection: &str,
314        text: &str,
315        top_k: usize,
316    ) -> VectorResult<SearchResponse> {
317        let vector = self.embedding_client.embed_one(text).await?;
318        self.ann_searcher
319            .search_in_workspace(
320                workspace_id,
321                SearchQuery {
322                    collection: collection.to_string(),
323                    vector,
324                    top_k,
325                    filter: None,
326                    include_vectors: false,
327                    include_metadata: true,
328                    ef_search: None,
329                    reranker: None,
330                },
331            )
332            .await
333    }
334
335    /// Execute hybrid search.
336    #[instrument(skip(self, query))]
337    pub async fn hybrid_search(&self, query: HybridQuery) -> VectorResult<SearchResponse> {
338        self.ensure_default_workspace_allowed("hybrid_search")?;
339        self.hybrid_searcher.search(query).await
340    }
341
342    /// Delete a vector record by UUID.
343    #[instrument(skip(self))]
344    pub async fn delete(&self, collection: &str, id: uuid::Uuid) -> VectorResult<bool> {
345        self.ensure_default_workspace_allowed("delete")?;
346        self.collections
347            .delete_vector(&self.config.default_workspace_id, collection, id)
348            .await
349    }
350
351    /// Delete a vector by UUID from a workspace-scoped collection.
352    #[instrument(skip(self))]
353    pub async fn delete_in_workspace(
354        &self,
355        workspace_id: &str,
356        collection: &str,
357        id: uuid::Uuid,
358    ) -> VectorResult<bool> {
359        self.collections
360            .delete_vector(workspace_id, collection, id)
361            .await
362    }
363
364    /// Fetch a vector record by UUID.
365    #[instrument(skip(self))]
366    pub async fn get(&self, collection: &str, id: uuid::Uuid) -> VectorResult<VectorRecord> {
367        self.ensure_default_workspace_allowed("get")?;
368        self.collections
369            .get_vector(&self.config.default_workspace_id, collection, id)
370            .await
371    }
372
373    /// Fetch a vector by UUID from a workspace-scoped collection.
374    #[instrument(skip(self))]
375    pub async fn get_in_workspace(
376        &self,
377        workspace_id: &str,
378        collection: &str,
379        id: uuid::Uuid,
380    ) -> VectorResult<VectorRecord> {
381        self.collections
382            .get_vector(workspace_id, collection, id)
383            .await
384    }
385
386    /// Persist indexes and close the underlying store.
387    #[instrument(skip(self))]
388    pub async fn close(&self) -> VectorResult<()> {
389        self.collections.persist_indexes().await?;
390        self.collections.store.close().await;
391        Ok(())
392    }
393
394    /// Return runtime statistics for the engine.
395    #[instrument(skip(self))]
396    pub async fn stats(&self) -> EngineStats {
397        let collections = self
398            .collections
399            .list_collections(&self.config.default_workspace_id)
400            .await
401            .unwrap_or_default();
402        let cache_stats = self
403            .embedding_client
404            .cache_stats()
405            .await
406            .unwrap_or_default();
407
408        EngineStats {
409            collection_count: collections.len(),
410            total_vectors: collections
411                .iter()
412                .map(|collection| collection.vector_count)
413                .sum(),
414            loaded_indexes: self.collections.loaded_index_count().await,
415            loaded_mmap_files: self.collections.loaded_mmap_count().await,
416            embedding_cache_hits: cache_stats.hit_count,
417            embedding_cache_misses: cache_stats.miss_count,
418        }
419    }
420}