1use std::sync::Arc;
3
4use tracing::instrument;
5
6use crate::{
7 collections::CollectionManager,
8 config::VectorConfig,
9 embeddings::{EmbeddingClient, EmbeddingProvider},
10 error::VectorResult,
11 search::{AnnSearcher, HybridSearcher},
12 store::VectorStore,
13 types::{
14 Collection, DistanceMetric, EngineStats, HybridQuery, SearchQuery, SearchResponse,
15 VectorRecord,
16 },
17};
18
19pub struct VectorEngine {
21 pub config: VectorConfig,
23 pub collections: Arc<CollectionManager>,
25 pub ann_searcher: Arc<AnnSearcher>,
27 pub hybrid_searcher: Arc<HybridSearcher>,
29 pub embedding_client: Arc<dyn EmbeddingProvider>,
31}
32
33impl VectorEngine {
34 #[instrument]
36 pub async fn new(config: VectorConfig) -> VectorResult<Self> {
37 let embedding_client =
38 Arc::new(EmbeddingClient::new(&config).await?) as Arc<dyn EmbeddingProvider>;
39 Self::with_embedding_provider(config, embedding_client).await
40 }
41
42 #[instrument]
44 pub async fn open_default() -> VectorResult<Self> {
45 Self::new(VectorConfig::default()).await
46 }
47
48 #[instrument(skip(embedding_client))]
50 pub async fn with_embedding_provider(
51 config: VectorConfig,
52 embedding_client: Arc<dyn EmbeddingProvider>,
53 ) -> VectorResult<Self> {
54 let store = Arc::new(VectorStore::new(&config.db_path).await?);
55 let collections =
56 Arc::new(CollectionManager::new(config.clone(), Arc::clone(&store)).await?);
57 let ann_searcher = Arc::new(AnnSearcher::new(Arc::clone(&collections)));
58 let hybrid_searcher = Arc::new(HybridSearcher::new(
59 Arc::clone(&ann_searcher),
60 Arc::clone(&store),
61 ));
62
63 Ok(VectorEngine {
64 config,
65 collections,
66 ann_searcher,
67 hybrid_searcher,
68 embedding_client,
69 })
70 }
71
72 #[instrument(skip(self))]
74 pub async fn create_collection(
75 &self,
76 name: &str,
77 dimensions: usize,
78 distance: DistanceMetric,
79 ) -> VectorResult<Collection> {
80 self.collections
81 .create_collection(
82 &self.config.default_workspace_id,
83 name,
84 dimensions,
85 distance,
86 )
87 .await
88 }
89
90 #[instrument(skip(self))]
92 pub async fn create_collection_in_workspace(
93 &self,
94 workspace_id: &str,
95 name: &str,
96 dimensions: usize,
97 distance: DistanceMetric,
98 ) -> VectorResult<Collection> {
99 self.collections
100 .create_collection(workspace_id, name, dimensions, distance)
101 .await
102 }
103
104 #[instrument(skip(self))]
106 pub async fn delete_collection(&self, name: &str) -> VectorResult<()> {
107 self.collections
108 .delete_collection(&self.config.default_workspace_id, name)
109 .await
110 }
111
112 #[instrument(skip(self))]
114 pub async fn delete_collection_in_workspace(
115 &self,
116 workspace_id: &str,
117 name: &str,
118 ) -> VectorResult<()> {
119 self.collections.delete_collection(workspace_id, name).await
120 }
121
122 #[instrument(skip(self))]
124 pub async fn list_collections(&self) -> VectorResult<Vec<Collection>> {
125 self.collections
126 .list_collections(&self.config.default_workspace_id)
127 .await
128 }
129
130 #[instrument(skip(self))]
132 pub async fn list_collections_in_workspace(
133 &self,
134 workspace_id: &str,
135 ) -> VectorResult<Vec<Collection>> {
136 self.collections.list_collections(workspace_id).await
137 }
138
139 #[instrument(skip(self, text, metadata))]
141 pub async fn upsert(
142 &self,
143 collection: &str,
144 text: &str,
145 metadata: serde_json::Value,
146 ) -> VectorResult<uuid::Uuid> {
147 let vector = self.embedding_client.embed_one(text).await?;
148 let record = VectorRecord::new(collection, vector)
149 .with_text(text.to_string())
150 .with_metadata(metadata);
151 self.collections
152 .insert_vector(&self.config.default_workspace_id, record)
153 .await
154 }
155
156 #[instrument(skip(self, text, metadata))]
158 pub async fn upsert_in_workspace(
159 &self,
160 workspace_id: &str,
161 collection: &str,
162 text: &str,
163 metadata: serde_json::Value,
164 ) -> VectorResult<uuid::Uuid> {
165 let vector = self.embedding_client.embed_one(text).await?;
166 let record = VectorRecord::new(collection, vector)
167 .with_text(text.to_string())
168 .with_metadata(metadata);
169 self.collections.insert_vector(workspace_id, record).await
170 }
171
172 #[instrument(skip(self, items))]
174 pub async fn upsert_batch(
175 &self,
176 collection: &str,
177 items: Vec<(String, serde_json::Value)>,
178 ) -> VectorResult<Vec<uuid::Uuid>> {
179 let texts = items
180 .iter()
181 .map(|(text, _)| text.clone())
182 .collect::<Vec<_>>();
183 let embeddings = self.embedding_client.embed(texts).await?;
184 let records = items
185 .into_iter()
186 .zip(embeddings.into_iter())
187 .map(|((text, metadata), vector)| {
188 VectorRecord::new(collection, vector)
189 .with_text(text)
190 .with_metadata(metadata)
191 })
192 .collect::<Vec<_>>();
193 self.collections
194 .insert_batch(&self.config.default_workspace_id, records)
195 .await
196 }
197
198 #[instrument(skip(self, items))]
200 pub async fn upsert_batch_in_workspace(
201 &self,
202 workspace_id: &str,
203 collection: &str,
204 items: Vec<(String, serde_json::Value)>,
205 ) -> VectorResult<Vec<uuid::Uuid>> {
206 let texts = items
207 .iter()
208 .map(|(text, _)| text.clone())
209 .collect::<Vec<_>>();
210 let embeddings = self.embedding_client.embed(texts).await?;
211 let records = items
212 .into_iter()
213 .zip(embeddings.into_iter())
214 .map(|((text, metadata), vector)| {
215 VectorRecord::new(collection, vector)
216 .with_text(text)
217 .with_metadata(metadata)
218 })
219 .collect::<Vec<_>>();
220 self.collections.insert_batch(workspace_id, records).await
221 }
222
223 #[instrument(skip(self, vector, metadata))]
225 pub async fn upsert_vector(
226 &self,
227 collection: &str,
228 vector: Vec<f32>,
229 metadata: serde_json::Value,
230 ) -> VectorResult<uuid::Uuid> {
231 let record = VectorRecord::new(collection, vector).with_metadata(metadata);
232 self.collections
233 .insert_vector(&self.config.default_workspace_id, record)
234 .await
235 }
236
237 #[instrument(skip(self, vector, metadata))]
239 pub async fn upsert_vector_in_workspace(
240 &self,
241 workspace_id: &str,
242 collection: &str,
243 vector: Vec<f32>,
244 metadata: serde_json::Value,
245 ) -> VectorResult<uuid::Uuid> {
246 let record = VectorRecord::new(collection, vector).with_metadata(metadata);
247 self.collections.insert_vector(workspace_id, record).await
248 }
249
250 #[instrument(skip(self, query))]
252 pub async fn search(&self, query: SearchQuery) -> VectorResult<SearchResponse> {
253 self.ann_searcher.search(query).await
254 }
255
256 #[instrument(skip(self, query))]
258 pub async fn search_in_workspace(
259 &self,
260 workspace_id: &str,
261 query: SearchQuery,
262 ) -> VectorResult<SearchResponse> {
263 self.ann_searcher
264 .search_in_workspace(workspace_id, query)
265 .await
266 }
267
268 #[instrument(skip(self, text))]
270 pub async fn search_text(
271 &self,
272 collection: &str,
273 text: &str,
274 top_k: usize,
275 ) -> VectorResult<SearchResponse> {
276 let vector = self.embedding_client.embed_one(text).await?;
277 self.ann_searcher
278 .search(SearchQuery {
279 collection: collection.to_string(),
280 vector,
281 top_k,
282 filter: None,
283 include_vectors: false,
284 include_metadata: true,
285 ef_search: None,
286 reranker: None,
287 })
288 .await
289 }
290
291 #[instrument(skip(self, text))]
293 pub async fn search_text_in_workspace(
294 &self,
295 workspace_id: &str,
296 collection: &str,
297 text: &str,
298 top_k: usize,
299 ) -> VectorResult<SearchResponse> {
300 let vector = self.embedding_client.embed_one(text).await?;
301 self.ann_searcher
302 .search_in_workspace(
303 workspace_id,
304 SearchQuery {
305 collection: collection.to_string(),
306 vector,
307 top_k,
308 filter: None,
309 include_vectors: false,
310 include_metadata: true,
311 ef_search: None,
312 reranker: None,
313 },
314 )
315 .await
316 }
317
318 #[instrument(skip(self, query))]
320 pub async fn hybrid_search(&self, query: HybridQuery) -> VectorResult<SearchResponse> {
321 self.hybrid_searcher.search(query).await
322 }
323
324 #[instrument(skip(self))]
326 pub async fn delete(&self, collection: &str, id: uuid::Uuid) -> VectorResult<bool> {
327 self.collections
328 .delete_vector(&self.config.default_workspace_id, collection, id)
329 .await
330 }
331
332 #[instrument(skip(self))]
334 pub async fn delete_in_workspace(
335 &self,
336 workspace_id: &str,
337 collection: &str,
338 id: uuid::Uuid,
339 ) -> VectorResult<bool> {
340 self.collections
341 .delete_vector(workspace_id, collection, id)
342 .await
343 }
344
345 #[instrument(skip(self))]
347 pub async fn get(&self, collection: &str, id: uuid::Uuid) -> VectorResult<VectorRecord> {
348 self.collections
349 .get_vector(&self.config.default_workspace_id, collection, id)
350 .await
351 }
352
353 #[instrument(skip(self))]
355 pub async fn get_in_workspace(
356 &self,
357 workspace_id: &str,
358 collection: &str,
359 id: uuid::Uuid,
360 ) -> VectorResult<VectorRecord> {
361 self.collections
362 .get_vector(workspace_id, collection, id)
363 .await
364 }
365
366 #[instrument(skip(self))]
368 pub async fn close(&self) -> VectorResult<()> {
369 self.collections.persist_indexes().await?;
370 self.collections.store.close().await;
371 Ok(())
372 }
373
374 #[instrument(skip(self))]
376 pub async fn stats(&self) -> EngineStats {
377 let collections = self
378 .collections
379 .list_collections(&self.config.default_workspace_id)
380 .await
381 .unwrap_or_default();
382 let cache_stats = self
383 .embedding_client
384 .cache_stats()
385 .await
386 .unwrap_or_default();
387
388 EngineStats {
389 collection_count: collections.len(),
390 total_vectors: collections
391 .iter()
392 .map(|collection| collection.vector_count)
393 .sum(),
394 loaded_indexes: self.collections.loaded_index_count().await,
395 loaded_mmap_files: self.collections.loaded_mmap_count().await,
396 embedding_cache_hits: cache_stats.hit_count,
397 embedding_cache_misses: cache_stats.miss_count,
398 }
399 }
400}