1use std::sync::Arc;
3
4use tracing::instrument;
5
6use crate::{
7 collections::CollectionManager,
8 config::VectorConfig,
9 embeddings::{EmbeddingClient, EmbeddingProvider},
10 error::VectorResult,
11 search::{AnnSearcher, HybridSearcher},
12 store::VectorStore,
13 types::{
14 Collection, DistanceMetric, EngineStats, HybridQuery, SearchQuery, SearchResponse,
15 VectorRecord,
16 },
17};
18
19pub struct VectorEngine {
21 pub config: VectorConfig,
23 pub collections: Arc<CollectionManager>,
25 pub ann_searcher: Arc<AnnSearcher>,
27 pub hybrid_searcher: Arc<HybridSearcher>,
29 pub embedding_client: Arc<dyn EmbeddingProvider>,
31}
32
33impl VectorEngine {
34 #[instrument]
36 pub async fn new(config: VectorConfig) -> VectorResult<Self> {
37 let embedding_client =
38 Arc::new(EmbeddingClient::new(&config).await?) as Arc<dyn EmbeddingProvider>;
39 Self::with_embedding_provider(config, embedding_client).await
40 }
41
42 #[instrument]
44 pub async fn open_default() -> VectorResult<Self> {
45 Self::new(VectorConfig::default()).await
46 }
47
48 #[instrument(skip(embedding_client))]
50 pub async fn with_embedding_provider(
51 config: VectorConfig,
52 embedding_client: Arc<dyn EmbeddingProvider>,
53 ) -> VectorResult<Self> {
54 let store = Arc::new(VectorStore::new(&config.db_path).await?);
55 let collections =
56 Arc::new(CollectionManager::new(config.clone(), Arc::clone(&store)).await?);
57 let ann_searcher = Arc::new(AnnSearcher::new(Arc::clone(&collections)));
58 let hybrid_searcher = Arc::new(HybridSearcher::new(
59 Arc::clone(&ann_searcher),
60 Arc::clone(&store),
61 ));
62
63 Ok(VectorEngine {
64 config,
65 collections,
66 ann_searcher,
67 hybrid_searcher,
68 embedding_client,
69 })
70 }
71
72 #[instrument(skip(self))]
74 pub async fn create_collection(
75 &self,
76 name: &str,
77 dimensions: usize,
78 distance: DistanceMetric,
79 ) -> VectorResult<Collection> {
80 self.collections
81 .create_collection(&self.config.default_workspace_id, name, dimensions, distance)
82 .await
83 }
84
85 #[instrument(skip(self))]
87 pub async fn create_collection_in_workspace(
88 &self,
89 workspace_id: &str,
90 name: &str,
91 dimensions: usize,
92 distance: DistanceMetric,
93 ) -> VectorResult<Collection> {
94 self.collections
95 .create_collection(workspace_id, name, dimensions, distance)
96 .await
97 }
98
99 #[instrument(skip(self))]
101 pub async fn delete_collection(&self, name: &str) -> VectorResult<()> {
102 self.collections
103 .delete_collection(&self.config.default_workspace_id, name)
104 .await
105 }
106
107 #[instrument(skip(self))]
109 pub async fn delete_collection_in_workspace(
110 &self,
111 workspace_id: &str,
112 name: &str,
113 ) -> VectorResult<()> {
114 self.collections.delete_collection(workspace_id, name).await
115 }
116
117 #[instrument(skip(self))]
119 pub async fn list_collections(&self) -> VectorResult<Vec<Collection>> {
120 self.collections
121 .list_collections(&self.config.default_workspace_id)
122 .await
123 }
124
125 #[instrument(skip(self))]
127 pub async fn list_collections_in_workspace(
128 &self,
129 workspace_id: &str,
130 ) -> VectorResult<Vec<Collection>> {
131 self.collections.list_collections(workspace_id).await
132 }
133
134 #[instrument(skip(self, text, metadata))]
136 pub async fn upsert(
137 &self,
138 collection: &str,
139 text: &str,
140 metadata: serde_json::Value,
141 ) -> VectorResult<uuid::Uuid> {
142 let vector = self.embedding_client.embed_one(text).await?;
143 let record = VectorRecord::new(collection, vector)
144 .with_text(text.to_string())
145 .with_metadata(metadata);
146 self.collections
147 .insert_vector(&self.config.default_workspace_id, record)
148 .await
149 }
150
151 #[instrument(skip(self, text, metadata))]
153 pub async fn upsert_in_workspace(
154 &self,
155 workspace_id: &str,
156 collection: &str,
157 text: &str,
158 metadata: serde_json::Value,
159 ) -> VectorResult<uuid::Uuid> {
160 let vector = self.embedding_client.embed_one(text).await?;
161 let record = VectorRecord::new(collection, vector)
162 .with_text(text.to_string())
163 .with_metadata(metadata);
164 self.collections.insert_vector(workspace_id, record).await
165 }
166
167 #[instrument(skip(self, items))]
169 pub async fn upsert_batch(
170 &self,
171 collection: &str,
172 items: Vec<(String, serde_json::Value)>,
173 ) -> VectorResult<Vec<uuid::Uuid>> {
174 let texts = items
175 .iter()
176 .map(|(text, _)| text.clone())
177 .collect::<Vec<_>>();
178 let embeddings = self.embedding_client.embed(texts).await?;
179 let records = items
180 .into_iter()
181 .zip(embeddings.into_iter())
182 .map(|((text, metadata), vector)| {
183 VectorRecord::new(collection, vector)
184 .with_text(text)
185 .with_metadata(metadata)
186 })
187 .collect::<Vec<_>>();
188 self.collections
189 .insert_batch(&self.config.default_workspace_id, records)
190 .await
191 }
192
193 #[instrument(skip(self, items))]
195 pub async fn upsert_batch_in_workspace(
196 &self,
197 workspace_id: &str,
198 collection: &str,
199 items: Vec<(String, serde_json::Value)>,
200 ) -> VectorResult<Vec<uuid::Uuid>> {
201 let texts = items
202 .iter()
203 .map(|(text, _)| text.clone())
204 .collect::<Vec<_>>();
205 let embeddings = self.embedding_client.embed(texts).await?;
206 let records = items
207 .into_iter()
208 .zip(embeddings.into_iter())
209 .map(|((text, metadata), vector)| {
210 VectorRecord::new(collection, vector)
211 .with_text(text)
212 .with_metadata(metadata)
213 })
214 .collect::<Vec<_>>();
215 self.collections.insert_batch(workspace_id, records).await
216 }
217
218 #[instrument(skip(self, vector, metadata))]
220 pub async fn upsert_vector(
221 &self,
222 collection: &str,
223 vector: Vec<f32>,
224 metadata: serde_json::Value,
225 ) -> VectorResult<uuid::Uuid> {
226 let record = VectorRecord::new(collection, vector).with_metadata(metadata);
227 self.collections
228 .insert_vector(&self.config.default_workspace_id, record)
229 .await
230 }
231
232 #[instrument(skip(self, vector, metadata))]
234 pub async fn upsert_vector_in_workspace(
235 &self,
236 workspace_id: &str,
237 collection: &str,
238 vector: Vec<f32>,
239 metadata: serde_json::Value,
240 ) -> VectorResult<uuid::Uuid> {
241 let record = VectorRecord::new(collection, vector).with_metadata(metadata);
242 self.collections.insert_vector(workspace_id, record).await
243 }
244
245 #[instrument(skip(self, query))]
247 pub async fn search(&self, query: SearchQuery) -> VectorResult<SearchResponse> {
248 self.ann_searcher.search(query).await
249 }
250
251 #[instrument(skip(self, query))]
253 pub async fn search_in_workspace(
254 &self,
255 workspace_id: &str,
256 query: SearchQuery,
257 ) -> VectorResult<SearchResponse> {
258 self.ann_searcher.search_in_workspace(workspace_id, query).await
259 }
260
261 #[instrument(skip(self, text))]
263 pub async fn search_text(
264 &self,
265 collection: &str,
266 text: &str,
267 top_k: usize,
268 ) -> VectorResult<SearchResponse> {
269 let vector = self.embedding_client.embed_one(text).await?;
270 self.ann_searcher
271 .search(SearchQuery {
272 collection: collection.to_string(),
273 vector,
274 top_k,
275 filter: None,
276 include_vectors: false,
277 include_metadata: true,
278 ef_search: None,
279 reranker: None,
280 })
281 .await
282 }
283
284 #[instrument(skip(self, text))]
286 pub async fn search_text_in_workspace(
287 &self,
288 workspace_id: &str,
289 collection: &str,
290 text: &str,
291 top_k: usize,
292 ) -> VectorResult<SearchResponse> {
293 let vector = self.embedding_client.embed_one(text).await?;
294 self.ann_searcher
295 .search_in_workspace(
296 workspace_id,
297 SearchQuery {
298 collection: collection.to_string(),
299 vector,
300 top_k,
301 filter: None,
302 include_vectors: false,
303 include_metadata: true,
304 ef_search: None,
305 reranker: None,
306 },
307 )
308 .await
309 }
310
311 #[instrument(skip(self, query))]
313 pub async fn hybrid_search(&self, query: HybridQuery) -> VectorResult<SearchResponse> {
314 self.hybrid_searcher.search(query).await
315 }
316
317 #[instrument(skip(self))]
319 pub async fn delete(&self, collection: &str, id: uuid::Uuid) -> VectorResult<bool> {
320 self.collections
321 .delete_vector(&self.config.default_workspace_id, collection, id)
322 .await
323 }
324
325 #[instrument(skip(self))]
327 pub async fn delete_in_workspace(
328 &self,
329 workspace_id: &str,
330 collection: &str,
331 id: uuid::Uuid,
332 ) -> VectorResult<bool> {
333 self.collections.delete_vector(workspace_id, collection, id).await
334 }
335
336 #[instrument(skip(self))]
338 pub async fn get(&self, collection: &str, id: uuid::Uuid) -> VectorResult<VectorRecord> {
339 self.collections
340 .get_vector(&self.config.default_workspace_id, collection, id)
341 .await
342 }
343
344 #[instrument(skip(self))]
346 pub async fn get_in_workspace(
347 &self,
348 workspace_id: &str,
349 collection: &str,
350 id: uuid::Uuid,
351 ) -> VectorResult<VectorRecord> {
352 self.collections.get_vector(workspace_id, collection, id).await
353 }
354
355 #[instrument(skip(self))]
357 pub async fn close(&self) -> VectorResult<()> {
358 self.collections.persist_indexes().await?;
359 self.collections.store.close().await;
360 Ok(())
361 }
362
363 #[instrument(skip(self))]
365 pub async fn stats(&self) -> EngineStats {
366 let collections = self
367 .collections
368 .list_collections(&self.config.default_workspace_id)
369 .await
370 .unwrap_or_default();
371 let cache_stats = self
372 .embedding_client
373 .cache_stats()
374 .await
375 .unwrap_or_default();
376
377 EngineStats {
378 collection_count: collections.len(),
379 total_vectors: collections
380 .iter()
381 .map(|collection| collection.vector_count)
382 .sum(),
383 loaded_indexes: self.collections.loaded_index_count().await,
384 loaded_mmap_files: self.collections.loaded_mmap_count().await,
385 embedding_cache_hits: cache_stats.hit_count,
386 embedding_cache_misses: cache_stats.miss_count,
387 }
388 }
389}