Skip to main content

claw_vector/
engine.rs

// engine.rs — VectorEngine: public entry point that unifies all subsystems.
2use std::sync::Arc;
3
4use tracing::instrument;
5
6use crate::{
7    collections::CollectionManager,
8    config::VectorConfig,
9    embeddings::{EmbeddingClient, EmbeddingProvider},
10    error::VectorResult,
11    search::{AnnSearcher, HybridSearcher},
12    store::VectorStore,
13    types::{
14        Collection, DistanceMetric, EngineStats, HybridQuery, SearchQuery, SearchResponse,
15        VectorRecord,
16    },
17};
18
19/// High-level engine that manages collections, search, and embeddings.
20pub struct VectorEngine {
21    /// Runtime configuration.
22    pub config: VectorConfig,
23    /// Collection lifecycle and persistence manager.
24    pub collections: Arc<CollectionManager>,
25    /// ANN search service.
26    pub ann_searcher: Arc<AnnSearcher>,
27    /// Hybrid vector + keyword search service.
28    pub hybrid_searcher: Arc<HybridSearcher>,
29    /// Embedding provider used for text ingestion and search.
30    pub embedding_client: Arc<dyn EmbeddingProvider>,
31}
32
33impl VectorEngine {
34    /// Create a new engine using the configured gRPC embedding service.
35    #[instrument]
36    pub async fn new(config: VectorConfig) -> VectorResult<Self> {
37        let embedding_client =
38            Arc::new(EmbeddingClient::new(&config).await?) as Arc<dyn EmbeddingProvider>;
39        Self::with_embedding_provider(config, embedding_client).await
40    }
41
42    /// Open an engine using the default configuration.
43    #[instrument]
44    pub async fn open_default() -> VectorResult<Self> {
45        Self::new(VectorConfig::default()).await
46    }
47
48    /// Create a new engine with a caller-supplied embedding provider.
49    #[instrument(skip(embedding_client))]
50    pub async fn with_embedding_provider(
51        config: VectorConfig,
52        embedding_client: Arc<dyn EmbeddingProvider>,
53    ) -> VectorResult<Self> {
54        let store = Arc::new(VectorStore::new(&config.db_path).await?);
55        let collections =
56            Arc::new(CollectionManager::new(config.clone(), Arc::clone(&store)).await?);
57        let ann_searcher = Arc::new(AnnSearcher::new(Arc::clone(&collections)));
58        let hybrid_searcher = Arc::new(HybridSearcher::new(
59            Arc::clone(&ann_searcher),
60            Arc::clone(&store),
61        ));
62
63        Ok(VectorEngine {
64            config,
65            collections,
66            ann_searcher,
67            hybrid_searcher,
68            embedding_client,
69        })
70    }
71
72    /// Create a collection with the provided dimensions and distance metric.
73    #[instrument(skip(self))]
74    pub async fn create_collection(
75        &self,
76        name: &str,
77        dimensions: usize,
78        distance: DistanceMetric,
79    ) -> VectorResult<Collection> {
80        self.collections
81            .create_collection(
82                &self.config.default_workspace_id,
83                name,
84                dimensions,
85                distance,
86            )
87            .await
88    }
89
90    /// Create a collection inside a specific workspace.
91    #[instrument(skip(self))]
92    pub async fn create_collection_in_workspace(
93        &self,
94        workspace_id: &str,
95        name: &str,
96        dimensions: usize,
97        distance: DistanceMetric,
98    ) -> VectorResult<Collection> {
99        self.collections
100            .create_collection(workspace_id, name, dimensions, distance)
101            .await
102    }
103
104    /// Delete a collection and all of its persisted state.
105    #[instrument(skip(self))]
106    pub async fn delete_collection(&self, name: &str) -> VectorResult<()> {
107        self.collections
108            .delete_collection(&self.config.default_workspace_id, name)
109            .await
110    }
111
112    /// Delete a collection inside a specific workspace.
113    #[instrument(skip(self))]
114    pub async fn delete_collection_in_workspace(
115        &self,
116        workspace_id: &str,
117        name: &str,
118    ) -> VectorResult<()> {
119        self.collections.delete_collection(workspace_id, name).await
120    }
121
122    /// List all collections.
123    #[instrument(skip(self))]
124    pub async fn list_collections(&self) -> VectorResult<Vec<Collection>> {
125        self.collections
126            .list_collections(&self.config.default_workspace_id)
127            .await
128    }
129
130    /// List collections scoped to a workspace.
131    #[instrument(skip(self))]
132    pub async fn list_collections_in_workspace(
133        &self,
134        workspace_id: &str,
135    ) -> VectorResult<Vec<Collection>> {
136        self.collections.list_collections(workspace_id).await
137    }
138
139    /// Embed text, persist the record, and return its UUID.
140    #[instrument(skip(self, text, metadata))]
141    pub async fn upsert(
142        &self,
143        collection: &str,
144        text: &str,
145        metadata: serde_json::Value,
146    ) -> VectorResult<uuid::Uuid> {
147        let vector = self.embedding_client.embed_one(text).await?;
148        let record = VectorRecord::new(collection, vector)
149            .with_text(text.to_string())
150            .with_metadata(metadata);
151        self.collections
152            .insert_vector(&self.config.default_workspace_id, record)
153            .await
154    }
155
156    /// Embed and insert a record in a workspace-scoped collection.
157    #[instrument(skip(self, text, metadata))]
158    pub async fn upsert_in_workspace(
159        &self,
160        workspace_id: &str,
161        collection: &str,
162        text: &str,
163        metadata: serde_json::Value,
164    ) -> VectorResult<uuid::Uuid> {
165        let vector = self.embedding_client.embed_one(text).await?;
166        let record = VectorRecord::new(collection, vector)
167            .with_text(text.to_string())
168            .with_metadata(metadata);
169        self.collections.insert_vector(workspace_id, record).await
170    }
171
172    /// Embed and insert multiple text records.
173    #[instrument(skip(self, items))]
174    pub async fn upsert_batch(
175        &self,
176        collection: &str,
177        items: Vec<(String, serde_json::Value)>,
178    ) -> VectorResult<Vec<uuid::Uuid>> {
179        let texts = items
180            .iter()
181            .map(|(text, _)| text.clone())
182            .collect::<Vec<_>>();
183        let embeddings = self.embedding_client.embed(texts).await?;
184        let records = items
185            .into_iter()
186            .zip(embeddings.into_iter())
187            .map(|((text, metadata), vector)| {
188                VectorRecord::new(collection, vector)
189                    .with_text(text)
190                    .with_metadata(metadata)
191            })
192            .collect::<Vec<_>>();
193        self.collections
194            .insert_batch(&self.config.default_workspace_id, records)
195            .await
196    }
197
198    /// Embed and insert multiple records in a workspace-scoped collection.
199    #[instrument(skip(self, items))]
200    pub async fn upsert_batch_in_workspace(
201        &self,
202        workspace_id: &str,
203        collection: &str,
204        items: Vec<(String, serde_json::Value)>,
205    ) -> VectorResult<Vec<uuid::Uuid>> {
206        let texts = items
207            .iter()
208            .map(|(text, _)| text.clone())
209            .collect::<Vec<_>>();
210        let embeddings = self.embedding_client.embed(texts).await?;
211        let records = items
212            .into_iter()
213            .zip(embeddings.into_iter())
214            .map(|((text, metadata), vector)| {
215                VectorRecord::new(collection, vector)
216                    .with_text(text)
217                    .with_metadata(metadata)
218            })
219            .collect::<Vec<_>>();
220        self.collections.insert_batch(workspace_id, records).await
221    }
222
223    /// Insert a raw vector directly.
224    #[instrument(skip(self, vector, metadata))]
225    pub async fn upsert_vector(
226        &self,
227        collection: &str,
228        vector: Vec<f32>,
229        metadata: serde_json::Value,
230    ) -> VectorResult<uuid::Uuid> {
231        let record = VectorRecord::new(collection, vector).with_metadata(metadata);
232        self.collections
233            .insert_vector(&self.config.default_workspace_id, record)
234            .await
235    }
236
237    /// Insert a raw vector directly into a workspace-scoped collection.
238    #[instrument(skip(self, vector, metadata))]
239    pub async fn upsert_vector_in_workspace(
240        &self,
241        workspace_id: &str,
242        collection: &str,
243        vector: Vec<f32>,
244        metadata: serde_json::Value,
245    ) -> VectorResult<uuid::Uuid> {
246        let record = VectorRecord::new(collection, vector).with_metadata(metadata);
247        self.collections.insert_vector(workspace_id, record).await
248    }
249
250    /// Execute ANN search.
251    #[instrument(skip(self, query))]
252    pub async fn search(&self, query: SearchQuery) -> VectorResult<SearchResponse> {
253        self.ann_searcher.search(query).await
254    }
255
256    /// Execute ANN search scoped to a workspace.
257    #[instrument(skip(self, query))]
258    pub async fn search_in_workspace(
259        &self,
260        workspace_id: &str,
261        query: SearchQuery,
262    ) -> VectorResult<SearchResponse> {
263        self.ann_searcher
264            .search_in_workspace(workspace_id, query)
265            .await
266    }
267
268    /// Execute ANN search from raw text.
269    #[instrument(skip(self, text))]
270    pub async fn search_text(
271        &self,
272        collection: &str,
273        text: &str,
274        top_k: usize,
275    ) -> VectorResult<SearchResponse> {
276        let vector = self.embedding_client.embed_one(text).await?;
277        self.ann_searcher
278            .search(SearchQuery {
279                collection: collection.to_string(),
280                vector,
281                top_k,
282                filter: None,
283                include_vectors: false,
284                include_metadata: true,
285                ef_search: None,
286                reranker: None,
287            })
288            .await
289    }
290
291    /// Execute ANN search from raw text scoped to a workspace.
292    #[instrument(skip(self, text))]
293    pub async fn search_text_in_workspace(
294        &self,
295        workspace_id: &str,
296        collection: &str,
297        text: &str,
298        top_k: usize,
299    ) -> VectorResult<SearchResponse> {
300        let vector = self.embedding_client.embed_one(text).await?;
301        self.ann_searcher
302            .search_in_workspace(
303                workspace_id,
304                SearchQuery {
305                    collection: collection.to_string(),
306                    vector,
307                    top_k,
308                    filter: None,
309                    include_vectors: false,
310                    include_metadata: true,
311                    ef_search: None,
312                    reranker: None,
313                },
314            )
315            .await
316    }
317
318    /// Execute hybrid search.
319    #[instrument(skip(self, query))]
320    pub async fn hybrid_search(&self, query: HybridQuery) -> VectorResult<SearchResponse> {
321        self.hybrid_searcher.search(query).await
322    }
323
324    /// Delete a vector record by UUID.
325    #[instrument(skip(self))]
326    pub async fn delete(&self, collection: &str, id: uuid::Uuid) -> VectorResult<bool> {
327        self.collections
328            .delete_vector(&self.config.default_workspace_id, collection, id)
329            .await
330    }
331
332    /// Delete a vector by UUID from a workspace-scoped collection.
333    #[instrument(skip(self))]
334    pub async fn delete_in_workspace(
335        &self,
336        workspace_id: &str,
337        collection: &str,
338        id: uuid::Uuid,
339    ) -> VectorResult<bool> {
340        self.collections
341            .delete_vector(workspace_id, collection, id)
342            .await
343    }
344
345    /// Fetch a vector record by UUID.
346    #[instrument(skip(self))]
347    pub async fn get(&self, collection: &str, id: uuid::Uuid) -> VectorResult<VectorRecord> {
348        self.collections
349            .get_vector(&self.config.default_workspace_id, collection, id)
350            .await
351    }
352
353    /// Fetch a vector by UUID from a workspace-scoped collection.
354    #[instrument(skip(self))]
355    pub async fn get_in_workspace(
356        &self,
357        workspace_id: &str,
358        collection: &str,
359        id: uuid::Uuid,
360    ) -> VectorResult<VectorRecord> {
361        self.collections
362            .get_vector(workspace_id, collection, id)
363            .await
364    }
365
366    /// Persist indexes and close the underlying store.
367    #[instrument(skip(self))]
368    pub async fn close(&self) -> VectorResult<()> {
369        self.collections.persist_indexes().await?;
370        self.collections.store.close().await;
371        Ok(())
372    }
373
374    /// Return runtime statistics for the engine.
375    #[instrument(skip(self))]
376    pub async fn stats(&self) -> EngineStats {
377        let collections = self
378            .collections
379            .list_collections(&self.config.default_workspace_id)
380            .await
381            .unwrap_or_default();
382        let cache_stats = self
383            .embedding_client
384            .cache_stats()
385            .await
386            .unwrap_or_default();
387
388        EngineStats {
389            collection_count: collections.len(),
390            total_vectors: collections
391                .iter()
392                .map(|collection| collection.vector_count)
393                .sum(),
394            loaded_indexes: self.collections.loaded_index_count().await,
395            loaded_mmap_files: self.collections.loaded_mmap_count().await,
396            embedding_cache_hits: cache_stats.hit_count,
397            embedding_cache_misses: cache_stats.miss_count,
398        }
399    }
400}