// claw_vector/engine.rs — VectorEngine: public entry point that unifies all subsystems.
use std::sync::Arc;

use tracing::{instrument, warn};

use crate::{
    collections::CollectionManager,
    config::VectorConfig,
    embeddings::{EmbeddingClient, EmbeddingProvider},
    error::VectorResult,
    search::{AnnSearcher, HybridSearcher},
    store::VectorStore,
    types::{
        Collection, DistanceMetric, EngineStats, HybridQuery, SearchQuery, SearchResponse,
        VectorRecord,
    },
};

19/// High-level engine that manages collections, search, and embeddings.
20pub struct VectorEngine {
21    /// Runtime configuration.
22    pub config: VectorConfig,
23    /// Collection lifecycle and persistence manager.
24    pub collections: Arc<CollectionManager>,
25    /// ANN search service.
26    pub ann_searcher: Arc<AnnSearcher>,
27    /// Hybrid vector + keyword search service.
28    pub hybrid_searcher: Arc<HybridSearcher>,
29    /// Embedding provider used for text ingestion and search.
30    pub embedding_client: Arc<dyn EmbeddingProvider>,
31}
32
33impl VectorEngine {
34    /// Create a new engine using the configured gRPC embedding service.
35    #[instrument]
36    pub async fn new(config: VectorConfig) -> VectorResult<Self> {
37        let embedding_client =
38            Arc::new(EmbeddingClient::new(&config).await?) as Arc<dyn EmbeddingProvider>;
39        Self::with_embedding_provider(config, embedding_client).await
40    }
41
42    /// Open an engine using the default configuration.
43    #[instrument]
44    pub async fn open_default() -> VectorResult<Self> {
45        Self::new(VectorConfig::default()).await
46    }
47
48    /// Create a new engine with a caller-supplied embedding provider.
49    #[instrument(skip(embedding_client))]
50    pub async fn with_embedding_provider(
51        config: VectorConfig,
52        embedding_client: Arc<dyn EmbeddingProvider>,
53    ) -> VectorResult<Self> {
54        let store = Arc::new(VectorStore::new(&config.db_path).await?);
55        let collections =
56            Arc::new(CollectionManager::new(config.clone(), Arc::clone(&store)).await?);
57        let ann_searcher = Arc::new(AnnSearcher::new(Arc::clone(&collections)));
58        let hybrid_searcher = Arc::new(HybridSearcher::new(
59            Arc::clone(&ann_searcher),
60            Arc::clone(&store),
61        ));
62
63        Ok(VectorEngine {
64            config,
65            collections,
66            ann_searcher,
67            hybrid_searcher,
68            embedding_client,
69        })
70    }
71
72    /// Create a collection with the provided dimensions and distance metric.
73    #[instrument(skip(self))]
74    pub async fn create_collection(
75        &self,
76        name: &str,
77        dimensions: usize,
78        distance: DistanceMetric,
79    ) -> VectorResult<Collection> {
80        self.collections
81            .create_collection(&self.config.default_workspace_id, name, dimensions, distance)
82            .await
83    }
84
85    /// Create a collection inside a specific workspace.
86    #[instrument(skip(self))]
87    pub async fn create_collection_in_workspace(
88        &self,
89        workspace_id: &str,
90        name: &str,
91        dimensions: usize,
92        distance: DistanceMetric,
93    ) -> VectorResult<Collection> {
94        self.collections
95            .create_collection(workspace_id, name, dimensions, distance)
96            .await
97    }
98
99    /// Delete a collection and all of its persisted state.
100    #[instrument(skip(self))]
101    pub async fn delete_collection(&self, name: &str) -> VectorResult<()> {
102        self.collections
103            .delete_collection(&self.config.default_workspace_id, name)
104            .await
105    }
106
107    /// Delete a collection inside a specific workspace.
108    #[instrument(skip(self))]
109    pub async fn delete_collection_in_workspace(
110        &self,
111        workspace_id: &str,
112        name: &str,
113    ) -> VectorResult<()> {
114        self.collections.delete_collection(workspace_id, name).await
115    }
116
117    /// List all collections.
118    #[instrument(skip(self))]
119    pub async fn list_collections(&self) -> VectorResult<Vec<Collection>> {
120        self.collections
121            .list_collections(&self.config.default_workspace_id)
122            .await
123    }
124
125    /// List collections scoped to a workspace.
126    #[instrument(skip(self))]
127    pub async fn list_collections_in_workspace(
128        &self,
129        workspace_id: &str,
130    ) -> VectorResult<Vec<Collection>> {
131        self.collections.list_collections(workspace_id).await
132    }
133
134    /// Embed text, persist the record, and return its UUID.
135    #[instrument(skip(self, text, metadata))]
136    pub async fn upsert(
137        &self,
138        collection: &str,
139        text: &str,
140        metadata: serde_json::Value,
141    ) -> VectorResult<uuid::Uuid> {
142        let vector = self.embedding_client.embed_one(text).await?;
143        let record = VectorRecord::new(collection, vector)
144            .with_text(text.to_string())
145            .with_metadata(metadata);
146        self.collections
147            .insert_vector(&self.config.default_workspace_id, record)
148            .await
149    }
150
151    /// Embed and insert a record in a workspace-scoped collection.
152    #[instrument(skip(self, text, metadata))]
153    pub async fn upsert_in_workspace(
154        &self,
155        workspace_id: &str,
156        collection: &str,
157        text: &str,
158        metadata: serde_json::Value,
159    ) -> VectorResult<uuid::Uuid> {
160        let vector = self.embedding_client.embed_one(text).await?;
161        let record = VectorRecord::new(collection, vector)
162            .with_text(text.to_string())
163            .with_metadata(metadata);
164        self.collections.insert_vector(workspace_id, record).await
165    }
166
167    /// Embed and insert multiple text records.
168    #[instrument(skip(self, items))]
169    pub async fn upsert_batch(
170        &self,
171        collection: &str,
172        items: Vec<(String, serde_json::Value)>,
173    ) -> VectorResult<Vec<uuid::Uuid>> {
174        let texts = items
175            .iter()
176            .map(|(text, _)| text.clone())
177            .collect::<Vec<_>>();
178        let embeddings = self.embedding_client.embed(texts).await?;
179        let records = items
180            .into_iter()
181            .zip(embeddings.into_iter())
182            .map(|((text, metadata), vector)| {
183                VectorRecord::new(collection, vector)
184                    .with_text(text)
185                    .with_metadata(metadata)
186            })
187            .collect::<Vec<_>>();
188        self.collections
189            .insert_batch(&self.config.default_workspace_id, records)
190            .await
191    }
192
193    /// Embed and insert multiple records in a workspace-scoped collection.
194    #[instrument(skip(self, items))]
195    pub async fn upsert_batch_in_workspace(
196        &self,
197        workspace_id: &str,
198        collection: &str,
199        items: Vec<(String, serde_json::Value)>,
200    ) -> VectorResult<Vec<uuid::Uuid>> {
201        let texts = items
202            .iter()
203            .map(|(text, _)| text.clone())
204            .collect::<Vec<_>>();
205        let embeddings = self.embedding_client.embed(texts).await?;
206        let records = items
207            .into_iter()
208            .zip(embeddings.into_iter())
209            .map(|((text, metadata), vector)| {
210                VectorRecord::new(collection, vector)
211                    .with_text(text)
212                    .with_metadata(metadata)
213            })
214            .collect::<Vec<_>>();
215        self.collections.insert_batch(workspace_id, records).await
216    }
217
218    /// Insert a raw vector directly.
219    #[instrument(skip(self, vector, metadata))]
220    pub async fn upsert_vector(
221        &self,
222        collection: &str,
223        vector: Vec<f32>,
224        metadata: serde_json::Value,
225    ) -> VectorResult<uuid::Uuid> {
226        let record = VectorRecord::new(collection, vector).with_metadata(metadata);
227        self.collections
228            .insert_vector(&self.config.default_workspace_id, record)
229            .await
230    }
231
232    /// Insert a raw vector directly into a workspace-scoped collection.
233    #[instrument(skip(self, vector, metadata))]
234    pub async fn upsert_vector_in_workspace(
235        &self,
236        workspace_id: &str,
237        collection: &str,
238        vector: Vec<f32>,
239        metadata: serde_json::Value,
240    ) -> VectorResult<uuid::Uuid> {
241        let record = VectorRecord::new(collection, vector).with_metadata(metadata);
242        self.collections.insert_vector(workspace_id, record).await
243    }
244
245    /// Execute ANN search.
246    #[instrument(skip(self, query))]
247    pub async fn search(&self, query: SearchQuery) -> VectorResult<SearchResponse> {
248        self.ann_searcher.search(query).await
249    }
250
251    /// Execute ANN search scoped to a workspace.
252    #[instrument(skip(self, query))]
253    pub async fn search_in_workspace(
254        &self,
255        workspace_id: &str,
256        query: SearchQuery,
257    ) -> VectorResult<SearchResponse> {
258        self.ann_searcher.search_in_workspace(workspace_id, query).await
259    }
260
261    /// Execute ANN search from raw text.
262    #[instrument(skip(self, text))]
263    pub async fn search_text(
264        &self,
265        collection: &str,
266        text: &str,
267        top_k: usize,
268    ) -> VectorResult<SearchResponse> {
269        let vector = self.embedding_client.embed_one(text).await?;
270        self.ann_searcher
271            .search(SearchQuery {
272                collection: collection.to_string(),
273                vector,
274                top_k,
275                filter: None,
276                include_vectors: false,
277                include_metadata: true,
278                ef_search: None,
279                reranker: None,
280            })
281            .await
282    }
283
284    /// Execute ANN search from raw text scoped to a workspace.
285    #[instrument(skip(self, text))]
286    pub async fn search_text_in_workspace(
287        &self,
288        workspace_id: &str,
289        collection: &str,
290        text: &str,
291        top_k: usize,
292    ) -> VectorResult<SearchResponse> {
293        let vector = self.embedding_client.embed_one(text).await?;
294        self.ann_searcher
295            .search_in_workspace(
296                workspace_id,
297                SearchQuery {
298                    collection: collection.to_string(),
299                    vector,
300                    top_k,
301                    filter: None,
302                    include_vectors: false,
303                    include_metadata: true,
304                    ef_search: None,
305                    reranker: None,
306                },
307            )
308            .await
309    }
310
311    /// Execute hybrid search.
312    #[instrument(skip(self, query))]
313    pub async fn hybrid_search(&self, query: HybridQuery) -> VectorResult<SearchResponse> {
314        self.hybrid_searcher.search(query).await
315    }
316
317    /// Delete a vector record by UUID.
318    #[instrument(skip(self))]
319    pub async fn delete(&self, collection: &str, id: uuid::Uuid) -> VectorResult<bool> {
320        self.collections
321            .delete_vector(&self.config.default_workspace_id, collection, id)
322            .await
323    }
324
325    /// Delete a vector by UUID from a workspace-scoped collection.
326    #[instrument(skip(self))]
327    pub async fn delete_in_workspace(
328        &self,
329        workspace_id: &str,
330        collection: &str,
331        id: uuid::Uuid,
332    ) -> VectorResult<bool> {
333        self.collections.delete_vector(workspace_id, collection, id).await
334    }
335
336    /// Fetch a vector record by UUID.
337    #[instrument(skip(self))]
338    pub async fn get(&self, collection: &str, id: uuid::Uuid) -> VectorResult<VectorRecord> {
339        self.collections
340            .get_vector(&self.config.default_workspace_id, collection, id)
341            .await
342    }
343
344    /// Fetch a vector by UUID from a workspace-scoped collection.
345    #[instrument(skip(self))]
346    pub async fn get_in_workspace(
347        &self,
348        workspace_id: &str,
349        collection: &str,
350        id: uuid::Uuid,
351    ) -> VectorResult<VectorRecord> {
352        self.collections.get_vector(workspace_id, collection, id).await
353    }
354
355    /// Persist indexes and close the underlying store.
356    #[instrument(skip(self))]
357    pub async fn close(&self) -> VectorResult<()> {
358        self.collections.persist_indexes().await?;
359        self.collections.store.close().await;
360        Ok(())
361    }
362
363    /// Return runtime statistics for the engine.
364    #[instrument(skip(self))]
365    pub async fn stats(&self) -> EngineStats {
366        let collections = self
367            .collections
368            .list_collections(&self.config.default_workspace_id)
369            .await
370            .unwrap_or_default();
371        let cache_stats = self
372            .embedding_client
373            .cache_stats()
374            .await
375            .unwrap_or_default();
376
377        EngineStats {
378            collection_count: collections.len(),
379            total_vectors: collections
380                .iter()
381                .map(|collection| collection.vector_count)
382                .sum(),
383            loaded_indexes: self.collections.loaded_index_count().await,
384            loaded_mmap_files: self.collections.loaded_mmap_count().await,
385            embedding_cache_hits: cache_stats.hit_count,
386            embedding_cache_misses: cache_stats.miss_count,
387        }
388    }
389}