use std::sync::Arc;
use tracing::instrument;
use crate::{
collections::CollectionManager,
config::VectorConfig,
embeddings::{EmbeddingClient, EmbeddingProvider},
error::VectorResult,
search::{AnnSearcher, HybridSearcher},
store::VectorStore,
types::{
Collection, DistanceMetric, EngineStats, HybridQuery, SearchQuery, SearchResponse,
VectorRecord,
},
};
/// Facade that bundles the collection manager, the searchers, and the
/// embedding provider behind a single API.
///
/// Most operations come in two flavours: one that targets
/// `config.default_workspace_id` (rejected when `config.require_workspace_id`
/// is set) and an `*_in_workspace` variant that takes the workspace explicitly.
pub struct VectorEngine {
/// Engine configuration; this file reads `db_path`, `default_workspace_id`
/// and `require_workspace_id` from it.
pub config: VectorConfig,
/// Shared collection manager used for collection CRUD, vector
/// insert/get/delete, and index persistence.
pub collections: Arc<CollectionManager>,
/// Searcher used by the `search*` methods.
pub ann_searcher: Arc<AnnSearcher>,
/// Searcher used by `hybrid_search`.
pub hybrid_searcher: Arc<HybridSearcher>,
/// Provider that turns text into vectors (`embed_one` / `embed`) and
/// exposes cache statistics.
pub embedding_client: Arc<dyn EmbeddingProvider>,
}
impl VectorEngine {
fn ensure_default_workspace_allowed(&self, operation: &str) -> VectorResult<()> {
if self.config.require_workspace_id {
return Err(crate::error::VectorError::Config(format!(
"{operation} requires explicit workspace_id"
)));
}
Ok(())
}
/// Builds an engine from `config`, using an [`EmbeddingClient`] constructed
/// from that same configuration as the embedding provider.
///
/// # Errors
///
/// Propagates failures from `EmbeddingClient::new` and from
/// [`Self::with_embedding_provider`].
#[instrument]
pub async fn new(config: VectorConfig) -> VectorResult<Self> {
    let client = EmbeddingClient::new(&config).await?;
    let provider: Arc<dyn EmbeddingProvider> = Arc::new(client);
    Self::with_embedding_provider(config, provider).await
}
/// Builds an engine from `VectorConfig::default()`.
///
/// # Errors
///
/// Propagates any failure from [`Self::new`].
#[instrument]
pub async fn open_default() -> VectorResult<Self> {
    let defaults = VectorConfig::default();
    Self::new(defaults).await
}
#[instrument(skip(embedding_client))]
pub async fn with_embedding_provider(
config: VectorConfig,
embedding_client: Arc<dyn EmbeddingProvider>,
) -> VectorResult<Self> {
let store = Arc::new(VectorStore::new(&config.db_path).await?);
let collections =
Arc::new(CollectionManager::new(config.clone(), Arc::clone(&store)).await?);
let ann_searcher = Arc::new(AnnSearcher::new(Arc::clone(&collections)));
let hybrid_searcher = Arc::new(HybridSearcher::new(
Arc::clone(&ann_searcher),
Arc::clone(&store),
));
Ok(VectorEngine {
config,
collections,
ann_searcher,
hybrid_searcher,
embedding_client,
})
}
/// Creates a collection in the configured default workspace.
///
/// `name`, `dimensions` and `distance` are forwarded unchanged to
/// `CollectionManager::create_collection`.
///
/// # Errors
///
/// Fails with a config error when `config.require_workspace_id` is set, and
/// propagates any error from the collection manager.
#[instrument(skip(self))]
pub async fn create_collection(
    &self,
    name: &str,
    dimensions: usize,
    distance: DistanceMetric,
) -> VectorResult<Collection> {
    self.ensure_default_workspace_allowed("create_collection")?;
    let workspace_id = &self.config.default_workspace_id;
    let manager = &self.collections;
    manager
        .create_collection(workspace_id, name, dimensions, distance)
        .await
}
/// Creates a collection in the explicitly named workspace.
///
/// All arguments are forwarded unchanged to
/// `CollectionManager::create_collection`.
///
/// # Errors
///
/// Propagates any error from the collection manager.
#[instrument(skip(self))]
pub async fn create_collection_in_workspace(
    &self,
    workspace_id: &str,
    name: &str,
    dimensions: usize,
    distance: DistanceMetric,
) -> VectorResult<Collection> {
    let manager = &self.collections;
    manager
        .create_collection(workspace_id, name, dimensions, distance)
        .await
}
/// Deletes the collection `name` from the configured default workspace.
///
/// # Errors
///
/// Fails with a config error when `config.require_workspace_id` is set, and
/// propagates any error from `CollectionManager::delete_collection`.
#[instrument(skip(self))]
pub async fn delete_collection(&self, name: &str) -> VectorResult<()> {
    self.ensure_default_workspace_allowed("delete_collection")?;
    let workspace_id = &self.config.default_workspace_id;
    let manager = &self.collections;
    manager.delete_collection(workspace_id, name).await
}
/// Deletes the collection `name` from the explicitly named workspace.
///
/// # Errors
///
/// Propagates any error from `CollectionManager::delete_collection`.
#[instrument(skip(self))]
pub async fn delete_collection_in_workspace(
    &self,
    workspace_id: &str,
    name: &str,
) -> VectorResult<()> {
    let manager = &self.collections;
    manager.delete_collection(workspace_id, name).await
}
/// Lists the collections of the configured default workspace.
///
/// # Errors
///
/// Fails with a config error when `config.require_workspace_id` is set, and
/// propagates any error from `CollectionManager::list_collections`.
#[instrument(skip(self))]
pub async fn list_collections(&self) -> VectorResult<Vec<Collection>> {
    self.ensure_default_workspace_allowed("list_collections")?;
    let workspace_id = &self.config.default_workspace_id;
    let manager = &self.collections;
    manager.list_collections(workspace_id).await
}
/// Lists the collections of the explicitly named workspace.
///
/// # Errors
///
/// Propagates any error from `CollectionManager::list_collections`.
#[instrument(skip(self))]
pub async fn list_collections_in_workspace(
    &self,
    workspace_id: &str,
) -> VectorResult<Vec<Collection>> {
    let manager = &self.collections;
    manager.list_collections(workspace_id).await
}
/// Embeds `text` and stores it, with `metadata`, in `collection` of the
/// configured default workspace.
///
/// Returns the `Uuid` produced by `CollectionManager::insert_vector`
/// (presumably the stored record's id; confirm against the manager).
///
/// # Errors
///
/// Fails with a config error when `config.require_workspace_id` is set, and
/// propagates errors from the embedding provider and the collection manager.
#[instrument(skip(self, text, metadata))]
pub async fn upsert(
    &self,
    collection: &str,
    text: &str,
    metadata: serde_json::Value,
) -> VectorResult<uuid::Uuid> {
    // Check the workspace policy first so a rejected call never hits the provider.
    self.ensure_default_workspace_allowed("upsert")?;

    let embedding = self.embedding_client.embed_one(text).await?;
    let record = VectorRecord::new(collection, embedding)
        .with_text(text.to_owned())
        .with_metadata(metadata);

    let workspace_id = &self.config.default_workspace_id;
    self.collections.insert_vector(workspace_id, record).await
}
/// Embeds `text` and stores it, with `metadata`, in `collection` of the
/// explicitly named workspace.
///
/// Returns the `Uuid` produced by `CollectionManager::insert_vector`
/// (presumably the stored record's id; confirm against the manager).
///
/// # Errors
///
/// Propagates errors from the embedding provider and the collection manager.
#[instrument(skip(self, text, metadata))]
pub async fn upsert_in_workspace(
    &self,
    workspace_id: &str,
    collection: &str,
    text: &str,
    metadata: serde_json::Value,
) -> VectorResult<uuid::Uuid> {
    let embedding = self.embedding_client.embed_one(text).await?;
    let record = VectorRecord::new(collection, embedding)
        .with_text(text.to_owned())
        .with_metadata(metadata);

    let manager = &self.collections;
    manager.insert_vector(workspace_id, record).await
}
#[instrument(skip(self, items))]
pub async fn upsert_batch(
&self,
collection: &str,
items: Vec<(String, serde_json::Value)>,
) -> VectorResult<Vec<uuid::Uuid>> {
self.ensure_default_workspace_allowed("upsert_batch")?;
let texts = items
.iter()
.map(|(text, _)| text.clone())
.collect::<Vec<_>>();
let embeddings = self.embedding_client.embed(texts).await?;
let records = items
.into_iter()
.zip(embeddings.into_iter())
.map(|((text, metadata), vector)| {
VectorRecord::new(collection, vector)
.with_text(text)
.with_metadata(metadata)
})
.collect::<Vec<_>>();
self.collections
.insert_batch(&self.config.default_workspace_id, records)
.await
}
#[instrument(skip(self, items))]
pub async fn upsert_batch_in_workspace(
&self,
workspace_id: &str,
collection: &str,
items: Vec<(String, serde_json::Value)>,
) -> VectorResult<Vec<uuid::Uuid>> {
let texts = items
.iter()
.map(|(text, _)| text.clone())
.collect::<Vec<_>>();
let embeddings = self.embedding_client.embed(texts).await?;
let records = items
.into_iter()
.zip(embeddings.into_iter())
.map(|((text, metadata), vector)| {
VectorRecord::new(collection, vector)
.with_text(text)
.with_metadata(metadata)
})
.collect::<Vec<_>>();
self.collections.insert_batch(workspace_id, records).await
}
/// Stores a caller-supplied `vector`, with `metadata`, in `collection` of the
/// configured default workspace. No embedding call is made and no text is
/// attached to the record.
///
/// Returns the `Uuid` produced by `CollectionManager::insert_vector`.
///
/// # Errors
///
/// Fails with a config error when `config.require_workspace_id` is set, and
/// propagates any error from the collection manager.
#[instrument(skip(self, vector, metadata))]
pub async fn upsert_vector(
    &self,
    collection: &str,
    vector: Vec<f32>,
    metadata: serde_json::Value,
) -> VectorResult<uuid::Uuid> {
    self.ensure_default_workspace_allowed("upsert_vector")?;

    let bare_record = VectorRecord::new(collection, vector);
    let record = bare_record.with_metadata(metadata);

    let workspace_id = &self.config.default_workspace_id;
    self.collections.insert_vector(workspace_id, record).await
}
/// Stores a caller-supplied `vector`, with `metadata`, in `collection` of the
/// explicitly named workspace. No embedding call is made and no text is
/// attached to the record.
///
/// Returns the `Uuid` produced by `CollectionManager::insert_vector`.
///
/// # Errors
///
/// Propagates any error from the collection manager.
#[instrument(skip(self, vector, metadata))]
pub async fn upsert_vector_in_workspace(
    &self,
    workspace_id: &str,
    collection: &str,
    vector: Vec<f32>,
    metadata: serde_json::Value,
) -> VectorResult<uuid::Uuid> {
    let bare_record = VectorRecord::new(collection, vector);
    let record = bare_record.with_metadata(metadata);

    let manager = &self.collections;
    manager.insert_vector(workspace_id, record).await
}
/// Runs `query` through `AnnSearcher::search` once use of the default
/// workspace has been confirmed as allowed.
///
/// NOTE(review): no workspace id is passed here, so the searcher presumably
/// resolves the default workspace itself; confirm in `AnnSearcher`.
///
/// # Errors
///
/// Fails with a config error when `config.require_workspace_id` is set, and
/// propagates any error from the searcher.
#[instrument(skip(self, query))]
pub async fn search(&self, query: SearchQuery) -> VectorResult<SearchResponse> {
    self.ensure_default_workspace_allowed("search")?;
    let searcher = &self.ann_searcher;
    searcher.search(query).await
}
/// Runs `query` through `AnnSearcher::search_in_workspace` for the
/// explicitly named workspace.
///
/// # Errors
///
/// Propagates any error from the searcher.
#[instrument(skip(self, query))]
pub async fn search_in_workspace(
    &self,
    workspace_id: &str,
    query: SearchQuery,
) -> VectorResult<SearchResponse> {
    let searcher = &self.ann_searcher;
    searcher.search_in_workspace(workspace_id, query).await
}
/// Embeds `text` and searches `collection` for the `top_k` nearest records
/// via `AnnSearcher::search`.
///
/// The query asks for metadata but not vectors, and sets no filter,
/// `ef_search` override, or reranker.
///
/// # Errors
///
/// Fails with a config error when `config.require_workspace_id` is set, and
/// propagates errors from the embedding provider and the searcher.
#[instrument(skip(self, text))]
pub async fn search_text(
    &self,
    collection: &str,
    text: &str,
    top_k: usize,
) -> VectorResult<SearchResponse> {
    self.ensure_default_workspace_allowed("search_text")?;

    let embedding = self.embedding_client.embed_one(text).await?;
    let query = SearchQuery {
        collection: collection.to_owned(),
        vector: embedding,
        top_k,
        filter: None,
        include_vectors: false,
        include_metadata: true,
        ef_search: None,
        reranker: None,
    };

    self.ann_searcher.search(query).await
}
/// Embeds `text` and searches `collection` in the explicitly named workspace
/// for the `top_k` nearest records via `AnnSearcher::search_in_workspace`.
///
/// The query asks for metadata but not vectors, and sets no filter,
/// `ef_search` override, or reranker.
///
/// # Errors
///
/// Propagates errors from the embedding provider and the searcher.
#[instrument(skip(self, text))]
pub async fn search_text_in_workspace(
    &self,
    workspace_id: &str,
    collection: &str,
    text: &str,
    top_k: usize,
) -> VectorResult<SearchResponse> {
    let embedding = self.embedding_client.embed_one(text).await?;
    let query = SearchQuery {
        collection: collection.to_owned(),
        vector: embedding,
        top_k,
        filter: None,
        include_vectors: false,
        include_metadata: true,
        ef_search: None,
        reranker: None,
    };

    self.ann_searcher
        .search_in_workspace(workspace_id, query)
        .await
}
/// Runs `query` through `HybridSearcher::search` once use of the default
/// workspace has been confirmed as allowed.
///
/// # Errors
///
/// Fails with a config error when `config.require_workspace_id` is set, and
/// propagates any error from the hybrid searcher.
#[instrument(skip(self, query))]
pub async fn hybrid_search(&self, query: HybridQuery) -> VectorResult<SearchResponse> {
    self.ensure_default_workspace_allowed("hybrid_search")?;
    let searcher = &self.hybrid_searcher;
    searcher.search(query).await
}
/// Deletes the record `id` from `collection` in the configured default
/// workspace.
///
/// Returns the boolean reported by `CollectionManager::delete_vector`
/// (presumably whether a record was removed; confirm against the manager).
///
/// # Errors
///
/// Fails with a config error when `config.require_workspace_id` is set, and
/// propagates any error from the collection manager.
#[instrument(skip(self))]
pub async fn delete(&self, collection: &str, id: uuid::Uuid) -> VectorResult<bool> {
    self.ensure_default_workspace_allowed("delete")?;
    let workspace_id = &self.config.default_workspace_id;
    let manager = &self.collections;
    manager.delete_vector(workspace_id, collection, id).await
}
/// Deletes the record `id` from `collection` in the explicitly named
/// workspace.
///
/// Returns the boolean reported by `CollectionManager::delete_vector`
/// (presumably whether a record was removed; confirm against the manager).
///
/// # Errors
///
/// Propagates any error from the collection manager.
#[instrument(skip(self))]
pub async fn delete_in_workspace(
    &self,
    workspace_id: &str,
    collection: &str,
    id: uuid::Uuid,
) -> VectorResult<bool> {
    let manager = &self.collections;
    manager.delete_vector(workspace_id, collection, id).await
}
/// Fetches the record `id` from `collection` in the configured default
/// workspace.
///
/// # Errors
///
/// Fails with a config error when `config.require_workspace_id` is set, and
/// propagates any error from `CollectionManager::get_vector`.
#[instrument(skip(self))]
pub async fn get(&self, collection: &str, id: uuid::Uuid) -> VectorResult<VectorRecord> {
    self.ensure_default_workspace_allowed("get")?;
    let workspace_id = &self.config.default_workspace_id;
    let manager = &self.collections;
    manager.get_vector(workspace_id, collection, id).await
}
/// Fetches the record `id` from `collection` in the explicitly named
/// workspace.
///
/// # Errors
///
/// Propagates any error from `CollectionManager::get_vector`.
#[instrument(skip(self))]
pub async fn get_in_workspace(
    &self,
    workspace_id: &str,
    collection: &str,
    id: uuid::Uuid,
) -> VectorResult<VectorRecord> {
    let manager = &self.collections;
    manager.get_vector(workspace_id, collection, id).await
}
/// Persists the collection indexes, then closes the underlying store.
///
/// The store is closed only after index persistence succeeds.
///
/// # Errors
///
/// Propagates any error from `CollectionManager::persist_indexes`.
#[instrument(skip(self))]
pub async fn close(&self) -> VectorResult<()> {
    let manager = &self.collections;
    manager.persist_indexes().await?;
    manager.store.close().await;
    Ok(())
}
#[instrument(skip(self))]
pub async fn stats(&self) -> EngineStats {
let collections = self
.collections
.list_collections(&self.config.default_workspace_id)
.await
.unwrap_or_default();
let cache_stats = self
.embedding_client
.cache_stats()
.await
.unwrap_or_default();
EngineStats {
collection_count: collections.len(),
total_vectors: collections
.iter()
.map(|collection| collection.vector_count)
.sum(),
loaded_indexes: self.collections.loaded_index_count().await,
loaded_mmap_files: self.collections.loaded_mmap_count().await,
embedding_cache_hits: cache_stats.hit_count,
embedding_cache_misses: cache_stats.miss_count,
}
}
}