use anyhow::Result;
use async_trait::async_trait;
use std::sync::Arc;
use std::collections::HashMap;
use tracing::{info, warn};
use crate::db::{VectorStore, EmbeddedQdrantVectorStore, QdrantServerVectorStore};
use crate::db::vector_store::{CollectionInfo, CollectionHealth};
use crate::types::{Document, SearchOptions, SearchResult, SearchFilter};
/// Vector store that pairs an embedded Qdrant store with a Qdrant server store.
///
/// In this file every read (lookups, listing, search, scroll, health) is answered by the
/// embedded store. Writes go to the embedded store first and are then mirrored to the
/// server store for collections that `should_sync_collection` accepts.
pub struct DualVectorStore {
/// Embedded store; receives every write first and serves all reads.
embedded: Arc<EmbeddedQdrantVectorStore>,
/// Server store; receives mirrored writes for collections that are not local-only.
server: Arc<QdrantServerVectorStore>,
/// When `true`, a failed mirror operation in `sync_to_server` is returned to the caller;
/// when `false`, the failure is only logged as a warning.
fail_on_server_error: bool,
}
impl DualVectorStore {
    /// Decides whether writes to `collection_name` should be mirrored to the server store.
    ///
    /// Returns `false` for the collections named exactly `chat_history` or `chat`, and for
    /// any collection whose name ends with `_estate`; returns `true` for everything else.
    fn should_sync_collection(&self, collection_name: &str) -> bool {
        let is_chat_collection = matches!(collection_name, "chat_history" | "chat");
        let is_estate_collection = collection_name.ends_with("_estate");
        !(is_chat_collection || is_estate_collection)
    }
}
impl DualVectorStore {
    /// Builds a dual store from an embedded store and a server store.
    ///
    /// Both stores are wrapped in `Arc`. `fail_on_server_error` controls whether mirror
    /// failures reported by `sync_to_server` are returned to the caller or only logged.
    /// The body performs no fallible work, so this always returns `Ok`.
    pub async fn new(
        embedded: EmbeddedQdrantVectorStore,
        server: QdrantServerVectorStore,
        fail_on_server_error: bool,
    ) -> Result<Self> {
        let embedded = Arc::new(embedded);
        let server = Arc::new(server);
        Ok(Self {
            embedded,
            server,
            fail_on_server_error,
        })
    }

    /// Awaits a server-side operation and applies the store's failure policy to its outcome.
    ///
    /// * `operation` - human-readable description used only in the log lines.
    /// * `func` - the future performing the server call; its success value is discarded.
    ///
    /// On success an info line is logged and `Ok(())` is returned. On failure a warning is
    /// logged; the error is returned only when `fail_on_server_error` is set, otherwise the
    /// failure is swallowed and `Ok(())` is returned.
    async fn sync_to_server<F, T>(&self, operation: &str, func: F) -> Result<()>
    where
        F: std::future::Future<Output = Result<T>>,
    {
        let failure = match func.await {
            Ok(_) => {
                info!("✅ Synced {} to server", operation);
                return Ok(());
            }
            Err(failure) => failure,
        };

        warn!("⚠️ Failed to sync {} to server: {}", operation, failure);
        if !self.fail_on_server_error {
            // Best-effort mode: the embedded write already succeeded, so carry on.
            return Ok(());
        }
        Err(failure)
    }
}
/// `VectorStore` implementation that treats the embedded store as the source of truth and
/// mirrors writes to the server store for collections accepted by `should_sync_collection`.
#[async_trait]
impl VectorStore for DualVectorStore {
    /// Initializes the embedded store, then attempts to initialize the server store.
    ///
    /// An embedded failure is returned; a server failure is only logged as a warning.
    async fn initialize(&self) -> Result<()> {
        info!("🔄 Initializing Dual Vector Store (Embedded + Server)");
        self.embedded.initialize().await?;
        if let Err(e) = self.server.initialize().await {
            warn!("⚠️ Failed to initialize server store: {}. Will continue with embedded only.", e);
        } else {
            info!("✅ Server store initialized successfully");
        }
        Ok(())
    }

    /// Shuts down the embedded store, then the server store.
    ///
    /// An embedded failure is returned; a server failure is only logged as a warning.
    async fn shutdown(&self) -> Result<()> {
        info!("🔄 Shutting down Dual Vector Store");
        self.embedded.shutdown().await?;
        if let Err(e) = self.server.shutdown().await {
            warn!("⚠️ Failed to shutdown server store: {}", e);
        }
        Ok(())
    }

    /// Clears the document cache of the embedded store; the server store is not called.
    async fn clear_document_cache(&self) -> Result<()> {
        info!("🗑️ Clearing document cache in dual store (embedded only)");
        self.embedded.clear_document_cache().await?;
        Ok(())
    }

    /// Creates the collection in the embedded store and, unless the collection is
    /// local-only, mirrors the creation to the server via `sync_to_server`.
    async fn create_collection(&self, name: &str, dimension: usize) -> Result<()> {
        info!("📝 Creating collection '{}' in dual store", name);
        self.embedded.create_collection(name, dimension).await?;
        if self.should_sync_collection(name) {
            self.sync_to_server(
                &format!("collection '{}'", name),
                self.server.create_collection(name, dimension),
            )
            .await?;
        } else {
            info!("⏭️ Skipping server sync for local-only collection '{}'", name);
        }
        Ok(())
    }

    /// Deletes the collection from the embedded store and, unless the collection is
    /// local-only, mirrors the deletion to the server.
    ///
    /// Returns the embedded store's result; the server's return value is discarded.
    async fn delete_collection(&self, name: &str) -> Result<bool> {
        info!("🗑️ Deleting collection '{}' from dual store", name);
        let result = self.embedded.delete_collection(name).await?;
        if self.should_sync_collection(name) {
            self.sync_to_server(
                &format!("delete collection '{}'", name),
                async {
                    self.server.delete_collection(name).await?;
                    Ok::<(), anyhow::Error>(())
                },
            )
            .await?;
        } else {
            info!("⏭️ Skipping server sync for local-only collection '{}'", name);
        }
        Ok(result)
    }

    /// Reports the initialization state of the embedded store only.
    async fn is_initialized(&self) -> bool {
        self.embedded.is_initialized().await
    }

    /// Lists the collections known to the embedded store.
    async fn list_collections(&self) -> Result<Vec<String>> {
        self.embedded.list_collections().await
    }

    /// Returns the embedded store's info for the collection `name`.
    async fn get_collection_info(&self, name: &str) -> Result<Option<CollectionInfo>> {
        self.embedded.get_collection_info(name).await
    }

    /// Returns the embedded store's per-collection health map.
    async fn get_collections_health(&self) -> Result<HashMap<String, CollectionHealth>> {
        self.embedded.get_collections_health().await
    }

    /// Adds a document to the embedded store and mirrors it to the server unless the
    /// collection is local-only. Returns the id reported by the embedded store.
    ///
    /// The document is cloned only when it actually has to be sent to both stores.
    async fn add_document(&self, collection_name: &str, document: Document) -> Result<String> {
        if !self.should_sync_collection(collection_name) {
            // Local-only collection: move the document, no second copy is ever needed.
            let id = self.embedded.add_document(collection_name, document).await?;
            info!("⏭️ Document '{}' stored locally only (collection '{}')", id, collection_name);
            return Ok(id);
        }

        let id = self.embedded.add_document(collection_name, document.clone()).await?;
        // NOTE(review): the server receives the caller's document, not the id returned by
        // the embedded store — confirm both stores derive the same id.
        self.sync_to_server(
            &format!("document '{}' to collection '{}'", id, collection_name),
            self.server.add_document(collection_name, document),
        )
        .await?;
        Ok(id)
    }

    /// Adds a batch of documents to the embedded store and mirrors the batch to the server
    /// unless the collection is local-only. Returns the ids reported by the embedded store.
    ///
    /// The batch is cloned only when it actually has to be sent to both stores.
    async fn add_documents(&self, collection_name: &str, documents: Vec<Document>) -> Result<Vec<String>> {
        // Captured up front because `documents` is moved into one of the stores below.
        let count = documents.len();

        if !self.should_sync_collection(collection_name) {
            let ids = self.embedded.add_documents(collection_name, documents).await?;
            info!("⏭️ {} documents stored locally only (collection '{}')", count, collection_name);
            return Ok(ids);
        }

        let ids = self.embedded.add_documents(collection_name, documents.clone()).await?;
        self.sync_to_server(
            &format!("{} documents to collection '{}'", count, collection_name),
            self.server.add_documents(collection_name, documents),
        )
        .await?;
        Ok(ids)
    }

    /// Fetches a document by id from the embedded store.
    async fn get_document(&self, collection_name: &str, id: &str) -> Result<Option<Document>> {
        self.embedded.get_document(collection_name, id).await
    }

    /// Updates a document in the embedded store and mirrors the update to the server
    /// unless the collection is local-only.
    ///
    /// The document is cloned only when it actually has to be sent to both stores.
    async fn update_document(&self, collection_name: &str, id: &str, document: Document) -> Result<()> {
        if !self.should_sync_collection(collection_name) {
            self.embedded.update_document(collection_name, id, document).await?;
            return Ok(());
        }

        self.embedded.update_document(collection_name, id, document.clone()).await?;
        self.sync_to_server(
            &format!("update document '{}' in collection '{}'", id, collection_name),
            self.server.update_document(collection_name, id, document),
        )
        .await?;
        Ok(())
    }

    /// Deletes a document from the embedded store and mirrors the deletion to the server
    /// unless the collection is local-only.
    ///
    /// Returns the embedded store's result; the server's return value is discarded.
    async fn delete_document(&self, collection_name: &str, id: &str) -> Result<bool> {
        let result = self.embedded.delete_document(collection_name, id).await?;
        if self.should_sync_collection(collection_name) {
            self.sync_to_server(
                &format!("delete document '{}' from collection '{}'", id, collection_name),
                async {
                    self.server.delete_document(collection_name, id).await?;
                    Ok::<(), anyhow::Error>(())
                },
            )
            .await?;
        }
        Ok(result)
    }

    /// Runs a vector search against the embedded store.
    async fn search(
        &self,
        collection_name: &str,
        query_vector: Vec<f32>,
        options: SearchOptions,
    ) -> Result<Vec<SearchResult>> {
        self.embedded.search(collection_name, query_vector, options).await
    }

    /// Lists documents from the embedded store, forwarding `limit` and `filter` unchanged.
    async fn list_documents(
        &self,
        collection_name: &str,
        limit: Option<usize>,
        filter: Option<SearchFilter>,
    ) -> Result<Vec<Document>> {
        self.embedded.list_documents(collection_name, limit, filter).await
    }

    /// Scrolls a collection in the embedded store, forwarding `filter` and `limit` unchanged.
    async fn scroll_collection(
        &self,
        collection_name: &str,
        filter: Option<SearchFilter>,
        limit: Option<usize>,
    ) -> Result<Vec<SearchResult>> {
        self.embedded.scroll_collection(collection_name, filter, limit).await
    }

    /// Sets the dimensions on the embedded store, then on the server store.
    ///
    /// An embedded failure is returned; a server failure is only logged as a warning.
    async fn set_dimensions(&self, dimensions: usize) -> Result<()> {
        self.embedded.set_dimensions(dimensions).await?;
        if let Err(e) = self.server.set_dimensions(dimensions).await {
            warn!("⚠️ Failed to set dimensions on server: {}", e);
        }
        Ok(())
    }

    /// Exposes `self` as `&dyn Any` so callers can downcast to `DualVectorStore`.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Forwards to the server store only.
    ///
    /// NOTE(review): unlike the write paths, this bypasses `should_sync_collection` and
    /// `fail_on_server_error`, so a server error is always returned — confirm this is intended.
    async fn disable_optimizer(&self, collection_name: &str) -> Result<()> {
        self.server.disable_optimizer(collection_name).await
    }

    /// Forwards to the server store only.
    ///
    /// NOTE(review): unlike the write paths, this bypasses `should_sync_collection` and
    /// `fail_on_server_error`, so a server error is always returned — confirm this is intended.
    async fn enable_optimizer(&self, collection_name: &str) -> Result<()> {
        self.server.enable_optimizer(collection_name).await
    }
}
impl DualVectorStore {
    /// Sets the user context on the embedded store and then on the server store.
    pub async fn set_user_context(&self, user_id: &str) {
        self.embedded.set_user_context(user_id).await;
        self.server.set_user_context(user_id).await;
    }

    /// Returns the shared handle to the embedded store.
    pub fn get_embedded(&self) -> &Arc<EmbeddedQdrantVectorStore> {
        &self.embedded
    }

    /// Returns the shared handle to the server store.
    pub fn get_server(&self) -> &Arc<QdrantServerVectorStore> {
        &self.server
    }

    /// Copies every document of `collection_name` from the embedded store to the server.
    ///
    /// Returns the number of documents sent. Returns `Ok(0)` without touching the server
    /// when the collection is local-only or holds no documents.
    ///
    /// If the server does not have the collection yet, it is created first. Its dimension
    /// comes from the first document that carries an embedding, falling back to 1024 when
    /// no document has one.
    ///
    /// # Errors
    ///
    /// Any failure from the embedded listing or from the server calls is returned;
    /// `fail_on_server_error` is not consulted here.
    pub async fn force_sync_collection(&self, collection_name: &str) -> Result<usize> {
        /// Dimension used when no document in the collection carries an embedding.
        const FALLBACK_DIMENSION: usize = 1024;

        if !self.should_sync_collection(collection_name) {
            info!("⏭️ Skipping force sync for local-only collection '{}'", collection_name);
            return Ok(0);
        }

        info!("🔄 Force syncing collection '{}' to server", collection_name);
        let documents = self.embedded.list_documents(collection_name, None, None).await?;
        let count = documents.len();
        if documents.is_empty() {
            info!("No documents to sync in collection '{}'", collection_name);
            return Ok(0);
        }

        let exists_on_server = self
            .server
            .get_collection_info(collection_name)
            .await?
            .is_some();
        if !exists_on_server {
            // Scan for the first document with an embedding rather than trusting only the
            // first document: a leading document without one must not force the fallback
            // size onto a collection whose other documents do carry vectors.
            let dimension = documents
                .iter()
                .find_map(|doc| doc.embedding.as_ref().map(|e| e.len()))
                .unwrap_or(FALLBACK_DIMENSION);
            self.server.create_collection(collection_name, dimension).await?;
        }

        self.server.add_documents(collection_name, documents).await?;
        info!("✅ Synced {} documents to server", count);
        Ok(count)
    }
}