//! rag-module 0.6.7
//!
//! Enterprise RAG module with chat context storage, vector search, session management, and model downloading. Rust implementation with Node.js compatibility.
//!
//! Dual Vector Store - Uses embedded Qdrant for fast local operations
//! and syncs writes to a remote Qdrant server for backup/persistence

use anyhow::Result;
use async_trait::async_trait;
use std::sync::Arc;
use std::collections::HashMap;
use tracing::{info, warn};

use crate::db::{VectorStore, EmbeddedQdrantVectorStore, QdrantServerVectorStore};
use crate::db::vector_store::{CollectionInfo, CollectionHealth};
use crate::types::{Document, SearchOptions, SearchResult, SearchFilter};

/// Dual Vector Store that uses embedded for reads and writes to both embedded and server
///
/// Every `VectorStore` read method in this file delegates to the embedded store only.
/// Write methods hit the embedded store first and are then mirrored to the server,
/// unless `should_sync_collection` classifies the collection as local-only.
pub struct DualVectorStore {
    /// Primary store - used for all reads and writes (fast, local)
    embedded: Arc<EmbeddedQdrantVectorStore>,
    /// Secondary store - used for writes only (backup to server)
    ///
    /// The `VectorStore` read methods never consult it; it also receives the
    /// optimizer enable/disable calls.
    server: Arc<QdrantServerVectorStore>,
    /// Whether to fail writes if server sync fails
    ///
    /// Consulted by `sync_to_server`: when `true` the server error is returned to the
    /// caller, when `false` it is only logged. `initialize`, `shutdown` and
    /// `set_dimensions` do not consult this flag.
    fail_on_server_error: bool,
}

impl DualVectorStore {
    /// Decide whether writes to `collection_name` are mirrored to the server.
    ///
    /// Returns `false` for the local-only collections and `true` for everything
    /// else (playbooks, libraries, etc.):
    /// * `"chat_history"` / `"chat"` - user conversations stay local for privacy.
    /// * any name ending in `"_estate"` - sensitive infrastructure data stays local.
    ///
    /// The match is exact and case-sensitive; only the `_estate` rule is a suffix test.
    fn should_sync_collection(&self, collection_name: &str) -> bool {
        let holds_chat = matches!(collection_name, "chat_history" | "chat");
        let holds_estate = collection_name.ends_with("_estate");

        !(holds_chat || holds_estate)
    }
}

impl DualVectorStore {
    /// Build a dual store from an embedded store and a server store.
    ///
    /// Both stores are moved into `Arc`s. `fail_on_server_error` selects whether a
    /// failed server sync is reported to the caller (`true`) or only logged (`false`).
    /// The current body performs no fallible or asynchronous work, so it always
    /// resolves to `Ok`.
    pub async fn new(
        embedded: EmbeddedQdrantVectorStore,
        server: QdrantServerVectorStore,
        fail_on_server_error: bool,
    ) -> Result<Self> {
        let store = Self {
            embedded: Arc::new(embedded),
            server: Arc::new(server),
            fail_on_server_error,
        };
        Ok(store)
    }

    /// Await a server-side operation and apply the server-error policy to its outcome.
    ///
    /// * `operation` - human-readable label used only in the log lines.
    /// * `func` - the pending server call; its success value is discarded.
    ///
    /// On success an info line is logged. On failure a warning is logged and the
    /// error is returned only when `fail_on_server_error` is set; otherwise the
    /// failure is swallowed and `Ok(())` is returned.
    async fn sync_to_server<F, T>(&self, operation: &str, func: F) -> Result<()>
    where
        F: std::future::Future<Output = Result<T>>,
    {
        if let Err(err) = func.await {
            warn!("⚠️ Failed to sync {} to server: {}", operation, err);
            // Best-effort mode: the failure has been logged, report success.
            return if self.fail_on_server_error {
                Err(err)
            } else {
                Ok(())
            };
        }

        info!("✅ Synced {} to server", operation);
        Ok(())
    }
}

#[async_trait]
impl VectorStore for DualVectorStore {
    /// Initialize the embedded store, then try to initialize the server store.
    ///
    /// # Errors
    /// Only an embedded-store failure is returned. A server failure is logged and
    /// ignored (regardless of `fail_on_server_error`) so the store keeps working
    /// with the embedded side alone.
    async fn initialize(&self) -> Result<()> {
        info!("🔄 Initializing Dual Vector Store (Embedded + Server)");

        // Initialize embedded (primary)
        self.embedded.initialize().await?;

        // Try to initialize server (secondary), but don't fail if it's down
        if let Err(e) = self.server.initialize().await {
            warn!("⚠️ Failed to initialize server store: {}. Will continue with embedded only.", e);
        } else {
            info!("✅ Server store initialized successfully");
        }

        Ok(())
    }

    /// Shut down both stores.
    ///
    /// The server shutdown is always attempted, even when the embedded shutdown
    /// fails, so that a local error does not leave the server store running.
    ///
    /// # Errors
    /// Returns the embedded store's shutdown error, if any. A server shutdown
    /// failure is only logged.
    async fn shutdown(&self) -> Result<()> {
        info!("🔄 Shutting down Dual Vector Store");

        // Capture the primary result instead of returning early with `?`;
        // the secondary store must still get its shutdown call.
        let embedded_result = self.embedded.shutdown().await;

        if let Err(e) = self.server.shutdown().await {
            warn!("⚠️ Failed to shutdown server store: {}", e);
        }

        embedded_result
    }

    /// Clear the document cache of the embedded store; the server store is not touched.
    async fn clear_document_cache(&self) -> Result<()> {
        info!("🗑️  Clearing document cache in dual store (embedded only)");
        // Only embedded store has document cache
        self.embedded.clear_document_cache().await?;
        Ok(())
    }

    /// Create collection `name` with the given vector `dimension`.
    ///
    /// The collection is created in the embedded store first and then mirrored to
    /// the server unless it is local-only.
    ///
    /// # Errors
    /// Embedded failures are always returned; server failures only when
    /// `fail_on_server_error` is set.
    async fn create_collection(&self, name: &str, dimension: usize) -> Result<()> {
        info!("📝 Creating collection '{}' in dual store", name);

        // Create in embedded (primary)
        self.embedded.create_collection(name, dimension).await?;

        // Sync to server (secondary) only if not a local-only collection
        if self.should_sync_collection(name) {
            self.sync_to_server(
                &format!("collection '{}'", name),
                self.server.create_collection(name, dimension),
            )
            .await?;
        } else {
            info!("⏭️ Skipping server sync for local-only collection '{}'", name);
        }

        Ok(())
    }

    /// Delete collection `name` from the embedded store and mirror the delete to the server.
    ///
    /// Returns the embedded store's result; the server's result value is discarded.
    ///
    /// # Errors
    /// Embedded failures are always returned; server failures only when
    /// `fail_on_server_error` is set.
    async fn delete_collection(&self, name: &str) -> Result<bool> {
        info!("🗑️ Deleting collection '{}' from dual store", name);

        // Delete from embedded (primary)
        let result = self.embedded.delete_collection(name).await?;

        // Sync to server (secondary) only if not a local-only collection
        if self.should_sync_collection(name) {
            self.sync_to_server(
                &format!("delete collection '{}'", name),
                async {
                    self.server.delete_collection(name).await?;
                    Ok::<(), anyhow::Error>(())
                },
            )
            .await?;
        } else {
            info!("⏭️ Skipping server sync for local-only collection '{}'", name);
        }

        Ok(result)
    }

    /// Report whether the embedded store is initialized; the server is not consulted.
    async fn is_initialized(&self) -> bool {
        self.embedded.is_initialized().await
    }

    /// List collection names as known to the embedded store.
    async fn list_collections(&self) -> Result<Vec<String>> {
        // List from embedded only (fast)
        self.embedded.list_collections().await
    }

    /// Fetch collection info for `name` from the embedded store.
    async fn get_collection_info(&self, name: &str) -> Result<Option<CollectionInfo>> {
        // Get info from embedded only (fast)
        self.embedded.get_collection_info(name).await
    }

    /// Fetch per-collection health from the embedded store.
    async fn get_collections_health(&self) -> Result<HashMap<String, CollectionHealth>> {
        // Get health from embedded only (fast)
        self.embedded.get_collections_health().await
    }

    /// Add one document to `collection_name` and return the id reported by the embedded store.
    ///
    /// For local-only collections the document is moved straight into the embedded
    /// store; it is cloned only when a server copy is actually going to be written.
    ///
    /// # Errors
    /// Embedded failures are always returned; server failures only when
    /// `fail_on_server_error` is set.
    async fn add_document(&self, collection_name: &str, document: Document) -> Result<String> {
        if !self.should_sync_collection(collection_name) {
            // Local-only: no second consumer, so no clone is needed.
            let id = self.embedded.add_document(collection_name, document).await?;
            info!("⏭️ Document '{}' stored locally only (collection '{}')", id, collection_name);
            return Ok(id);
        }

        // Add to embedded (primary); keep the original for the server write.
        let id = self.embedded.add_document(collection_name, document.clone()).await?;

        // NOTE(review): the server is handed the caller's document, not the id returned
        // by the embedded store - confirm both stores derive the same id for it.
        self.sync_to_server(
            &format!("document '{}' to collection '{}'", id, collection_name),
            self.server.add_document(collection_name, document),
        )
        .await?;

        Ok(id)
    }

    /// Add a batch of documents to `collection_name` and return the ids reported by the
    /// embedded store.
    ///
    /// For local-only collections the batch is moved straight into the embedded
    /// store; it is cloned only when a server copy is actually going to be written.
    ///
    /// # Errors
    /// Embedded failures are always returned; server failures only when
    /// `fail_on_server_error` is set.
    async fn add_documents(&self, collection_name: &str, documents: Vec<Document>) -> Result<Vec<String>> {
        if !self.should_sync_collection(collection_name) {
            // Local-only: record the count before the batch is moved.
            let count = documents.len();
            let ids = self.embedded.add_documents(collection_name, documents).await?;
            info!("⏭️ {} documents stored locally only (collection '{}')", count, collection_name);
            return Ok(ids);
        }

        // Add to embedded (primary); keep the original batch for the server write.
        let ids = self.embedded.add_documents(collection_name, documents.clone()).await?;

        self.sync_to_server(
            &format!("{} documents to collection '{}'", documents.len(), collection_name),
            self.server.add_documents(collection_name, documents),
        )
        .await?;

        Ok(ids)
    }

    /// Fetch document `id` from `collection_name` in the embedded store.
    async fn get_document(&self, collection_name: &str, id: &str) -> Result<Option<Document>> {
        // Get from embedded only (fast)
        self.embedded.get_document(collection_name, id).await
    }

    /// Update document `id` in `collection_name`, mirroring the update to the server
    /// unless the collection is local-only.
    ///
    /// The document is cloned only when a server copy is actually going to be written.
    ///
    /// # Errors
    /// Embedded failures are always returned; server failures only when
    /// `fail_on_server_error` is set.
    async fn update_document(&self, collection_name: &str, id: &str, document: Document) -> Result<()> {
        if !self.should_sync_collection(collection_name) {
            // Local-only: no second consumer, so no clone is needed.
            self.embedded.update_document(collection_name, id, document).await?;
            return Ok(());
        }

        // Update in embedded (primary); keep the original for the server write.
        self.embedded.update_document(collection_name, id, document.clone()).await?;

        self.sync_to_server(
            &format!("update document '{}' in collection '{}'", id, collection_name),
            self.server.update_document(collection_name, id, document),
        )
        .await?;

        Ok(())
    }

    /// Delete document `id` from `collection_name`, mirroring the delete to the server
    /// unless the collection is local-only.
    ///
    /// Returns the embedded store's result; the server's result value is discarded.
    ///
    /// # Errors
    /// Embedded failures are always returned; server failures only when
    /// `fail_on_server_error` is set.
    async fn delete_document(&self, collection_name: &str, id: &str) -> Result<bool> {
        // Delete from embedded (primary)
        let result = self.embedded.delete_document(collection_name, id).await?;

        // Sync to server (secondary) only if not a local-only collection
        if self.should_sync_collection(collection_name) {
            self.sync_to_server(
                &format!("delete document '{}' from collection '{}'", id, collection_name),
                async {
                    self.server.delete_document(collection_name, id).await?;
                    Ok::<(), anyhow::Error>(())
                },
            )
            .await?;
        }

        Ok(result)
    }

    /// Run a vector search against the embedded store only.
    async fn search(
        &self,
        collection_name: &str,
        query_vector: Vec<f32>,
        options: SearchOptions,
    ) -> Result<Vec<SearchResult>> {
        // Search embedded only (fast, local)
        self.embedded.search(collection_name, query_vector, options).await
    }

    /// List documents from the embedded store, forwarding `limit` and `filter` unchanged.
    async fn list_documents(
        &self,
        collection_name: &str,
        limit: Option<usize>,
        filter: Option<SearchFilter>,
    ) -> Result<Vec<Document>> {
        // List from embedded only (fast)
        self.embedded.list_documents(collection_name, limit, filter).await
    }

    /// Scroll a collection in the embedded store, forwarding `filter` and `limit` unchanged.
    async fn scroll_collection(
        &self,
        collection_name: &str,
        filter: Option<SearchFilter>,
        limit: Option<usize>,
    ) -> Result<Vec<SearchResult>> {
        // Scroll from embedded only (fast)
        self.embedded.scroll_collection(collection_name, filter, limit).await
    }

    /// Set the vector dimensions on both stores.
    ///
    /// # Errors
    /// Only an embedded-store failure is returned; a server failure is logged and
    /// ignored regardless of `fail_on_server_error`.
    async fn set_dimensions(&self, dimensions: usize) -> Result<()> {
        // Set dimensions on both stores
        self.embedded.set_dimensions(dimensions).await?;
        if let Err(e) = self.server.set_dimensions(dimensions).await {
            warn!("⚠️ Failed to set dimensions on server: {}", e);
        }
        Ok(())
    }

    /// Expose this store as `&dyn Any` so callers can downcast to `DualVectorStore`.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Disable the optimizer for `collection_name` on the server store.
    ///
    /// Only the server is addressed (the original note states the embedded store
    /// has no optimizer). Local-only collections are never created on the server
    /// by this type, so the call is skipped for them.
    ///
    /// # Errors
    /// Follows the same policy as every other mirrored operation: a server failure
    /// is returned only when `fail_on_server_error` is set, otherwise it is logged.
    async fn disable_optimizer(&self, collection_name: &str) -> Result<()> {
        if !self.should_sync_collection(collection_name) {
            info!("⏭️ Skipping optimizer change for local-only collection '{}'", collection_name);
            return Ok(());
        }

        self.sync_to_server(
            &format!("disable optimizer for collection '{}'", collection_name),
            self.server.disable_optimizer(collection_name),
        )
        .await
    }

    /// Re-enable the optimizer for `collection_name` on the server store.
    ///
    /// Only the server is addressed (the original note states the embedded store
    /// has no optimizer). Local-only collections are never created on the server
    /// by this type, so the call is skipped for them.
    ///
    /// # Errors
    /// Follows the same policy as every other mirrored operation: a server failure
    /// is returned only when `fail_on_server_error` is set, otherwise it is logged.
    async fn enable_optimizer(&self, collection_name: &str) -> Result<()> {
        if !self.should_sync_collection(collection_name) {
            info!("⏭️ Skipping optimizer change for local-only collection '{}'", collection_name);
            return Ok(());
        }

        self.sync_to_server(
            &format!("enable optimizer for collection '{}'", collection_name),
            self.server.enable_optimizer(collection_name),
        )
        .await
    }
}

impl DualVectorStore {
    /// Set user context on both stores (embedded first, then server).
    pub async fn set_user_context(&self, user_id: &str) {
        self.embedded.set_user_context(user_id).await;
        self.server.set_user_context(user_id).await;
    }

    /// Get the embedded store (for direct access if needed)
    pub fn get_embedded(&self) -> &Arc<EmbeddedQdrantVectorStore> {
        &self.embedded
    }

    /// Get the server store (for direct access if needed)
    pub fn get_server(&self) -> &Arc<QdrantServerVectorStore> {
        &self.server
    }

    /// Force a full sync from embedded to server for a collection.
    ///
    /// Skips chat_history and estate collections (local-only for privacy).
    /// All documents are read from the embedded store, the collection is created
    /// on the server if `get_collection_info` reports it missing, and the
    /// documents are uploaded in one batch.
    ///
    /// # Returns
    /// The number of documents pushed; `0` when the collection is local-only or
    /// the embedded store returned no documents.
    ///
    /// # Errors
    /// Every failure (embedded read or any server call) is returned to the caller;
    /// `fail_on_server_error` is not consulted here.
    pub async fn force_sync_collection(&self, collection_name: &str) -> Result<usize> {
        // Check if collection should be synced
        if !self.should_sync_collection(collection_name) {
            info!("⏭️ Skipping force sync for local-only collection '{}'", collection_name);
            return Ok(0);
        }

        info!("🔄 Force syncing collection '{}' to server", collection_name);

        // Get all documents from embedded
        // NOTE(review): `limit` is passed as `None`; presumably that means "no cap" -
        // confirm the embedded store does not apply a default page size.
        let documents = self.embedded.list_documents(collection_name, None, None).await?;
        let count = documents.len();

        if count == 0 {
            info!("No documents to sync in collection '{}'", collection_name);
            return Ok(0);
        }

        // Check if collection exists on server, create if not
        let exists_on_server = self
            .server
            .get_collection_info(collection_name)
            .await?
            .is_some();
        if !exists_on_server {
            // Take the dimension from the first document that actually carries an
            // embedding; looking only at the first document would fall back to the
            // default whenever that one document happens to lack a vector.
            let dimension = documents
                .iter()
                .find_map(|doc| doc.embedding.as_ref().map(|e| e.len()))
                .unwrap_or(1024);
            self.server.create_collection(collection_name, dimension).await?;
        }

        // Batch upload to server
        self.server.add_documents(collection_name, documents).await?;

        info!("✅ Synced {} documents to server", count);
        Ok(count)
    }
}