1use std::sync::Arc;
3
4use tracing::instrument;
5
6use crate::{
7 collections::CollectionManager,
8 config::VectorConfig,
9 embeddings::{EmbeddingClient, EmbeddingProvider},
10 error::VectorResult,
11 search::{AnnSearcher, HybridSearcher},
12 store::VectorStore,
13 types::{
14 Collection, DistanceMetric, EngineStats, HybridQuery, SearchQuery, SearchResponse,
15 VectorRecord,
16 },
17};
18
/// Top-level handle for the vector database.
///
/// Ties together configuration, collection management, ANN and hybrid search,
/// and the embedding provider used to turn text into vectors.
pub struct VectorEngine {
    /// Engine configuration. `default_workspace_id` and `require_workspace_id`
    /// govern the convenience methods that do not take an explicit workspace.
    pub config: VectorConfig,
    /// Collection manager; also exposes the backing `store` that `close` shuts down.
    pub collections: Arc<CollectionManager>,
    /// Approximate-nearest-neighbour searcher constructed over `collections`.
    pub ann_searcher: Arc<AnnSearcher>,
    /// Hybrid searcher constructed from the ANN searcher and the vector store.
    pub hybrid_searcher: Arc<HybridSearcher>,
    /// Provider used to embed text for the text-based upsert/search methods.
    pub embedding_client: Arc<dyn EmbeddingProvider>,
}
32
33impl VectorEngine {
34 fn ensure_default_workspace_allowed(&self, operation: &str) -> VectorResult<()> {
35 if self.config.require_workspace_id {
36 return Err(crate::error::VectorError::Config(format!(
37 "{operation} requires explicit workspace_id"
38 )));
39 }
40 Ok(())
41 }
42
43 #[instrument]
45 pub async fn new(config: VectorConfig) -> VectorResult<Self> {
46 let embedding_client =
47 Arc::new(EmbeddingClient::new(&config).await?) as Arc<dyn EmbeddingProvider>;
48 Self::with_embedding_provider(config, embedding_client).await
49 }
50
51 #[instrument]
53 pub async fn open_default() -> VectorResult<Self> {
54 Self::new(VectorConfig::default()).await
55 }
56
57 #[instrument(skip(embedding_client))]
59 pub async fn with_embedding_provider(
60 config: VectorConfig,
61 embedding_client: Arc<dyn EmbeddingProvider>,
62 ) -> VectorResult<Self> {
63 let store = Arc::new(VectorStore::new(&config.db_path).await?);
64 let collections =
65 Arc::new(CollectionManager::new(config.clone(), Arc::clone(&store)).await?);
66 let ann_searcher = Arc::new(AnnSearcher::new(Arc::clone(&collections)));
67 let hybrid_searcher = Arc::new(HybridSearcher::new(
68 Arc::clone(&ann_searcher),
69 Arc::clone(&store),
70 ));
71
72 Ok(VectorEngine {
73 config,
74 collections,
75 ann_searcher,
76 hybrid_searcher,
77 embedding_client,
78 })
79 }
80
81 #[instrument(skip(self))]
83 pub async fn create_collection(
84 &self,
85 name: &str,
86 dimensions: usize,
87 distance: DistanceMetric,
88 ) -> VectorResult<Collection> {
89 self.ensure_default_workspace_allowed("create_collection")?;
90 self.collections
91 .create_collection(
92 &self.config.default_workspace_id,
93 name,
94 dimensions,
95 distance,
96 )
97 .await
98 }
99
100 #[instrument(skip(self))]
102 pub async fn create_collection_in_workspace(
103 &self,
104 workspace_id: &str,
105 name: &str,
106 dimensions: usize,
107 distance: DistanceMetric,
108 ) -> VectorResult<Collection> {
109 self.collections
110 .create_collection(workspace_id, name, dimensions, distance)
111 .await
112 }
113
114 #[instrument(skip(self))]
116 pub async fn delete_collection(&self, name: &str) -> VectorResult<()> {
117 self.ensure_default_workspace_allowed("delete_collection")?;
118 self.collections
119 .delete_collection(&self.config.default_workspace_id, name)
120 .await
121 }
122
123 #[instrument(skip(self))]
125 pub async fn delete_collection_in_workspace(
126 &self,
127 workspace_id: &str,
128 name: &str,
129 ) -> VectorResult<()> {
130 self.collections.delete_collection(workspace_id, name).await
131 }
132
133 #[instrument(skip(self))]
135 pub async fn list_collections(&self) -> VectorResult<Vec<Collection>> {
136 self.ensure_default_workspace_allowed("list_collections")?;
137 self.collections
138 .list_collections(&self.config.default_workspace_id)
139 .await
140 }
141
142 #[instrument(skip(self))]
144 pub async fn list_collections_in_workspace(
145 &self,
146 workspace_id: &str,
147 ) -> VectorResult<Vec<Collection>> {
148 self.collections.list_collections(workspace_id).await
149 }
150
151 #[instrument(skip(self, text, metadata))]
153 pub async fn upsert(
154 &self,
155 collection: &str,
156 text: &str,
157 metadata: serde_json::Value,
158 ) -> VectorResult<uuid::Uuid> {
159 self.ensure_default_workspace_allowed("upsert")?;
160 let vector = self.embedding_client.embed_one(text).await?;
161 let record = VectorRecord::new(collection, vector)
162 .with_text(text.to_string())
163 .with_metadata(metadata);
164 self.collections
165 .insert_vector(&self.config.default_workspace_id, record)
166 .await
167 }
168
169 #[instrument(skip(self, text, metadata))]
171 pub async fn upsert_in_workspace(
172 &self,
173 workspace_id: &str,
174 collection: &str,
175 text: &str,
176 metadata: serde_json::Value,
177 ) -> VectorResult<uuid::Uuid> {
178 let vector = self.embedding_client.embed_one(text).await?;
179 let record = VectorRecord::new(collection, vector)
180 .with_text(text.to_string())
181 .with_metadata(metadata);
182 self.collections.insert_vector(workspace_id, record).await
183 }
184
185 #[instrument(skip(self, items))]
187 pub async fn upsert_batch(
188 &self,
189 collection: &str,
190 items: Vec<(String, serde_json::Value)>,
191 ) -> VectorResult<Vec<uuid::Uuid>> {
192 self.ensure_default_workspace_allowed("upsert_batch")?;
193 let texts = items
194 .iter()
195 .map(|(text, _)| text.clone())
196 .collect::<Vec<_>>();
197 let embeddings = self.embedding_client.embed(texts).await?;
198 let records = items
199 .into_iter()
200 .zip(embeddings.into_iter())
201 .map(|((text, metadata), vector)| {
202 VectorRecord::new(collection, vector)
203 .with_text(text)
204 .with_metadata(metadata)
205 })
206 .collect::<Vec<_>>();
207 self.collections
208 .insert_batch(&self.config.default_workspace_id, records)
209 .await
210 }
211
212 #[instrument(skip(self, items))]
214 pub async fn upsert_batch_in_workspace(
215 &self,
216 workspace_id: &str,
217 collection: &str,
218 items: Vec<(String, serde_json::Value)>,
219 ) -> VectorResult<Vec<uuid::Uuid>> {
220 let texts = items
221 .iter()
222 .map(|(text, _)| text.clone())
223 .collect::<Vec<_>>();
224 let embeddings = self.embedding_client.embed(texts).await?;
225 let records = items
226 .into_iter()
227 .zip(embeddings.into_iter())
228 .map(|((text, metadata), vector)| {
229 VectorRecord::new(collection, vector)
230 .with_text(text)
231 .with_metadata(metadata)
232 })
233 .collect::<Vec<_>>();
234 self.collections.insert_batch(workspace_id, records).await
235 }
236
237 #[instrument(skip(self, vector, metadata))]
239 pub async fn upsert_vector(
240 &self,
241 collection: &str,
242 vector: Vec<f32>,
243 metadata: serde_json::Value,
244 ) -> VectorResult<uuid::Uuid> {
245 self.ensure_default_workspace_allowed("upsert_vector")?;
246 let record = VectorRecord::new(collection, vector).with_metadata(metadata);
247 self.collections
248 .insert_vector(&self.config.default_workspace_id, record)
249 .await
250 }
251
252 #[instrument(skip(self, vector, metadata))]
254 pub async fn upsert_vector_in_workspace(
255 &self,
256 workspace_id: &str,
257 collection: &str,
258 vector: Vec<f32>,
259 metadata: serde_json::Value,
260 ) -> VectorResult<uuid::Uuid> {
261 let record = VectorRecord::new(collection, vector).with_metadata(metadata);
262 self.collections.insert_vector(workspace_id, record).await
263 }
264
265 #[instrument(skip(self, query))]
267 pub async fn search(&self, query: SearchQuery) -> VectorResult<SearchResponse> {
268 self.ensure_default_workspace_allowed("search")?;
269 self.ann_searcher.search(query).await
270 }
271
272 #[instrument(skip(self, query))]
274 pub async fn search_in_workspace(
275 &self,
276 workspace_id: &str,
277 query: SearchQuery,
278 ) -> VectorResult<SearchResponse> {
279 self.ann_searcher
280 .search_in_workspace(workspace_id, query)
281 .await
282 }
283
284 #[instrument(skip(self, text))]
286 pub async fn search_text(
287 &self,
288 collection: &str,
289 text: &str,
290 top_k: usize,
291 ) -> VectorResult<SearchResponse> {
292 self.ensure_default_workspace_allowed("search_text")?;
293 let vector = self.embedding_client.embed_one(text).await?;
294 self.ann_searcher
295 .search(SearchQuery {
296 collection: collection.to_string(),
297 vector,
298 top_k,
299 filter: None,
300 include_vectors: false,
301 include_metadata: true,
302 ef_search: None,
303 reranker: None,
304 })
305 .await
306 }
307
308 #[instrument(skip(self, text))]
310 pub async fn search_text_in_workspace(
311 &self,
312 workspace_id: &str,
313 collection: &str,
314 text: &str,
315 top_k: usize,
316 ) -> VectorResult<SearchResponse> {
317 let vector = self.embedding_client.embed_one(text).await?;
318 self.ann_searcher
319 .search_in_workspace(
320 workspace_id,
321 SearchQuery {
322 collection: collection.to_string(),
323 vector,
324 top_k,
325 filter: None,
326 include_vectors: false,
327 include_metadata: true,
328 ef_search: None,
329 reranker: None,
330 },
331 )
332 .await
333 }
334
335 #[instrument(skip(self, query))]
337 pub async fn hybrid_search(&self, query: HybridQuery) -> VectorResult<SearchResponse> {
338 self.ensure_default_workspace_allowed("hybrid_search")?;
339 self.hybrid_searcher.search(query).await
340 }
341
342 #[instrument(skip(self))]
344 pub async fn delete(&self, collection: &str, id: uuid::Uuid) -> VectorResult<bool> {
345 self.ensure_default_workspace_allowed("delete")?;
346 self.collections
347 .delete_vector(&self.config.default_workspace_id, collection, id)
348 .await
349 }
350
351 #[instrument(skip(self))]
353 pub async fn delete_in_workspace(
354 &self,
355 workspace_id: &str,
356 collection: &str,
357 id: uuid::Uuid,
358 ) -> VectorResult<bool> {
359 self.collections
360 .delete_vector(workspace_id, collection, id)
361 .await
362 }
363
364 #[instrument(skip(self))]
366 pub async fn get(&self, collection: &str, id: uuid::Uuid) -> VectorResult<VectorRecord> {
367 self.ensure_default_workspace_allowed("get")?;
368 self.collections
369 .get_vector(&self.config.default_workspace_id, collection, id)
370 .await
371 }
372
373 #[instrument(skip(self))]
375 pub async fn get_in_workspace(
376 &self,
377 workspace_id: &str,
378 collection: &str,
379 id: uuid::Uuid,
380 ) -> VectorResult<VectorRecord> {
381 self.collections
382 .get_vector(workspace_id, collection, id)
383 .await
384 }
385
386 #[instrument(skip(self))]
388 pub async fn close(&self) -> VectorResult<()> {
389 self.collections.persist_indexes().await?;
390 self.collections.store.close().await;
391 Ok(())
392 }
393
394 #[instrument(skip(self))]
396 pub async fn stats(&self) -> EngineStats {
397 let collections = self
398 .collections
399 .list_collections(&self.config.default_workspace_id)
400 .await
401 .unwrap_or_default();
402 let cache_stats = self
403 .embedding_client
404 .cache_stats()
405 .await
406 .unwrap_or_default();
407
408 EngineStats {
409 collection_count: collections.len(),
410 total_vectors: collections
411 .iter()
412 .map(|collection| collection.vector_count)
413 .sum(),
414 loaded_indexes: self.collections.loaded_index_count().await,
415 loaded_mmap_files: self.collections.loaded_mmap_count().await,
416 embedding_cache_hits: cache_stats.hit_count,
417 embedding_cache_misses: cache_stats.miss_count,
418 }
419 }
420}