rag-module 0.6.7

Enterprise RAG module with chat context storage, vector search, session management, and model downloading. Rust implementation with Node.js compatibility.
//! Content Store - SQLite-based storage for large encrypted metadata
//!
//! This module stores large payloads (like _encrypted_metadata) separately from Qdrant
//! to prevent data loss during segment merging in high-throughput scenarios.

use anyhow::{Result, anyhow};
use sqlx::{SqlitePool, sqlite::{SqliteConnectOptions, SqlitePoolOptions}};
use std::path::Path;
use std::str::FromStr;
use tracing::{info, debug, error};
use serde_json::Value as JsonValue;

/// SQLite-backed persistent store for large document metadata payloads.
///
/// These payloads are kept here instead of in Qdrant, where they would
/// otherwise cause corruption during bulk inserts.
pub struct ContentStore {
    /// Connection pool through which every query of this store is executed.
    pool: SqlitePool,
}

impl ContentStore {
    /// Open the SQLite database at `db_path`, creating it if it is missing,
    /// and return a store that is ready for use.
    ///
    /// The parent directory of `db_path` is created when absent. Connections
    /// use WAL journaling, `NORMAL` synchronous mode and a 30 second busy
    /// timeout; the pool holds at most 10 connections. The schema is
    /// initialized before the store is returned.
    ///
    /// # Errors
    ///
    /// Fails if the parent directory cannot be created, the connection URL
    /// cannot be parsed, the pool cannot connect, or schema setup fails.
    pub async fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
        use sqlx::sqlite::{SqliteJournalMode, SqliteSynchronous};
        use std::time::Duration;

        let path = db_path.as_ref();

        // Make sure the directory that will hold the database file exists.
        if let Some(dir) = path.parent() {
            tokio::fs::create_dir_all(dir).await?;
        }

        let url = format!("sqlite://{}", path.display());
        info!("🗄️ Initializing ContentStore at: {}", url);

        let connect_options = SqliteConnectOptions::from_str(&url)?
            .create_if_missing(true)
            // Write-ahead logging for better concurrency.
            .journal_mode(SqliteJournalMode::Wal)
            // Balance between safety and performance.
            .synchronous(SqliteSynchronous::Normal)
            // Wait up to 30s for locks before giving up.
            .busy_timeout(Duration::from_secs(30));

        // Up to 10 pooled connections so operations can run concurrently.
        let pool = SqlitePoolOptions::new()
            .max_connections(10)
            .connect_with(connect_options)
            .await?;
        info!("✅ ContentStore SQLite pool created");

        let this = Self { pool };
        this.initialize_schema().await?;

        info!("✅ ContentStore initialized successfully");
        Ok(this)
    }

    /// Create the `content_metadata` table and its two indexes.
    ///
    /// Every statement uses `IF NOT EXISTS`, so running this against an
    /// already-initialized database leaves the existing objects in place.
    ///
    /// # Errors
    ///
    /// Returns the first database error; later statements are not attempted.
    async fn initialize_schema(&self) -> Result<()> {
        info!("📋 Initializing ContentStore schema...");

        // Run in order: the table first, then the indexes that refer to it.
        let ddl_statements = [
            // Main table holding one row per document reference.
            r#"
            CREATE TABLE IF NOT EXISTS content_metadata (
                doc_ref TEXT PRIMARY KEY NOT NULL,
                encrypted_metadata TEXT NOT NULL,
                collection_name TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                content_size INTEGER DEFAULT 0
            )
            "#,
            // Index on collection_name for faster per-collection queries.
            r#"
            CREATE INDEX IF NOT EXISTS idx_collection_name
            ON content_metadata(collection_name)
            "#,
            // Index on created_at for time-based queries.
            r#"
            CREATE INDEX IF NOT EXISTS idx_created_at
            ON content_metadata(created_at)
            "#,
        ];

        for ddl in ddl_statements {
            sqlx::query(ddl).execute(&self.pool).await?;
        }

        info!("✅ ContentStore schema initialized");
        Ok(())
    }

    /// Insert or overwrite the encrypted metadata stored under `doc_ref`.
    ///
    /// When a row for `doc_ref` already exists, its payload, collection name
    /// and recorded size are replaced and `updated_at` is refreshed. The
    /// recorded size is the byte length of `encrypted_metadata`.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the database while executing the upsert.
    pub async fn store_metadata(
        &self,
        doc_ref: &str,
        encrypted_metadata: &str,
        collection_name: &str,
    ) -> Result<()> {
        let payload_bytes = encrypted_metadata.len() as i64;
        debug!("💾 Storing metadata for doc_ref: {} (size: {} bytes)", doc_ref, payload_bytes);

        let upsert = sqlx::query(
            r#"
            INSERT INTO content_metadata (doc_ref, encrypted_metadata, collection_name, content_size, updated_at)
            VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT(doc_ref) DO UPDATE SET
                encrypted_metadata = excluded.encrypted_metadata,
                collection_name = excluded.collection_name,
                content_size = excluded.content_size,
                updated_at = CURRENT_TIMESTAMP
            "#,
        )
        .bind(doc_ref)
        .bind(encrypted_metadata)
        .bind(collection_name)
        .bind(payload_bytes);

        upsert.execute(&self.pool).await?;

        debug!("✅ Metadata stored for doc_ref: {}", doc_ref);
        Ok(())
    }

    /// Insert or overwrite many metadata entries inside a single transaction.
    ///
    /// Each entry is a `(doc_ref, encrypted_metadata, collection_name)` tuple.
    /// A row that already exists for a `doc_ref` is overwritten and its
    /// `updated_at` refreshed. An empty `entries` list is a no-op: no
    /// connection is acquired and no transaction is opened, matching the
    /// empty-input behavior of `get_metadata_batch`.
    ///
    /// # Errors
    ///
    /// Returns the first database error encountered. The transaction is not
    /// committed in that case, so none of the entries from this call are
    /// persisted (sqlx rolls back an uncommitted transaction when it is
    /// dropped).
    pub async fn store_metadata_batch(
        &self,
        entries: Vec<(String, String, String)>, // (doc_ref, encrypted_metadata, collection_name)
    ) -> Result<()> {
        // Same statement for every entry, so build the text once.
        const UPSERT_SQL: &str = r#"
                INSERT INTO content_metadata (doc_ref, encrypted_metadata, collection_name, content_size, updated_at)
                VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                ON CONFLICT(doc_ref) DO UPDATE SET
                    encrypted_metadata = excluded.encrypted_metadata,
                    collection_name = excluded.collection_name,
                    content_size = excluded.content_size,
                    updated_at = CURRENT_TIMESTAMP
                "#;

        // Nothing to write: avoid taking a pooled connection and opening a
        // transaction for no work.
        if entries.is_empty() {
            return Ok(());
        }

        let count = entries.len();
        info!("💾 Storing {} metadata entries in batch...", count);

        let mut tx = self.pool.begin().await?;

        for (doc_ref, encrypted_metadata, collection_name) in &entries {
            let content_size = encrypted_metadata.len() as i64;

            sqlx::query(UPSERT_SQL)
                .bind(doc_ref.as_str())
                .bind(encrypted_metadata.as_str())
                .bind(collection_name.as_str())
                .bind(content_size)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;

        info!("✅ Batch stored {} metadata entries", count);
        Ok(())
    }

    /// Look up the encrypted metadata stored under `doc_ref`.
    ///
    /// Returns `Ok(Some(payload))` when a row exists and `Ok(None)` when it
    /// does not.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the database while running the query.
    pub async fn get_metadata(&self, doc_ref: &str) -> Result<Option<String>> {
        debug!("🔍 Fetching metadata for doc_ref: {}", doc_ref);

        let lookup = sqlx::query_scalar::<_, String>(
            r#"
            SELECT encrypted_metadata
            FROM content_metadata
            WHERE doc_ref = ?
            "#,
        )
        .bind(doc_ref);

        let found = lookup.fetch_optional(&self.pool).await?;

        // Log only; the lookup result is returned to the caller unchanged.
        if let Some(payload) = found.as_ref() {
            debug!("✅ Found metadata for doc_ref: {} (size: {} bytes)", doc_ref, payload.len());
        } else {
            debug!("⚠️ No metadata found for doc_ref: {}", doc_ref);
        }

        Ok(found)
    }

    /// Fetch the encrypted metadata for many documents.
    ///
    /// Returns `(doc_ref, encrypted_metadata)` pairs for every requested
    /// reference that exists; missing references are simply absent from the
    /// result. A reference requested more than once is returned once. The
    /// order of the returned pairs is unspecified.
    ///
    /// SQLite limits how many parameters one statement may bind
    /// (`SQLITE_MAX_VARIABLE_NUMBER`, 999 on older builds), so the lookup is
    /// issued in bounded chunks rather than as one arbitrarily long `IN (...)`
    /// list. The chunks are separate queries and are not wrapped in a
    /// transaction, so writes happening concurrently may be visible to some
    /// chunks and not others.
    ///
    /// # Errors
    ///
    /// Returns the first database error encountered; results gathered from
    /// earlier chunks are discarded.
    pub async fn get_metadata_batch(&self, doc_refs: Vec<String>) -> Result<Vec<(String, String)>> {
        // Stays below the smallest default bind-parameter limit of SQLite.
        const MAX_BIND_PARAMS_PER_QUERY: usize = 900;

        if doc_refs.is_empty() {
            return Ok(Vec::new());
        }

        let count = doc_refs.len();
        info!("🔍 Fetching metadata for {} documents in batch...", count);

        // De-duplicate up front: a single IN list yields each row once, and
        // without this a ref repeated across two chunks would come back twice.
        let mut unique_refs = doc_refs;
        unique_refs.sort_unstable();
        unique_refs.dedup();

        let mut results: Vec<(String, String)> = Vec::with_capacity(unique_refs.len());

        for chunk in unique_refs.chunks(MAX_BIND_PARAMS_PER_QUERY) {
            // One "?" placeholder per reference in this chunk.
            let placeholders = vec!["?"; chunk.len()].join(",");
            let query_str = format!(
                "SELECT doc_ref, encrypted_metadata FROM content_metadata WHERE doc_ref IN ({})",
                placeholders
            );

            let mut query = sqlx::query_as::<_, (String, String)>(&query_str);
            for doc_ref in chunk {
                query = query.bind(doc_ref.as_str());
            }

            results.extend(query.fetch_all(&self.pool).await?);
        }

        info!("✅ Found metadata for {}/{} documents", results.len(), count);
        Ok(results)
    }

    /// Remove the metadata row stored under `doc_ref`.
    ///
    /// Returns `Ok(true)` when a row was deleted and `Ok(false)` when no row
    /// matched.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the database while running the delete.
    pub async fn delete_metadata(&self, doc_ref: &str) -> Result<bool> {
        debug!("🗑️ Deleting metadata for doc_ref: {}", doc_ref);

        let outcome = sqlx::query(
            r#"
            DELETE FROM content_metadata
            WHERE doc_ref = ?
            "#,
        )
        .bind(doc_ref)
        .execute(&self.pool)
        .await?;

        // Any affected row means the document had stored metadata.
        let removed = outcome.rows_affected() != 0;

        if !removed {
            debug!("⚠️ No metadata found to delete for doc_ref: {}", doc_ref);
        } else {
            debug!("✅ Deleted metadata for doc_ref: {}", doc_ref);
        }

        Ok(removed)
    }

    /// Remove every metadata row belonging to `collection_name`.
    ///
    /// Returns the number of rows that were deleted, which is `0` when the
    /// collection has no stored metadata.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the database while running the delete.
    pub async fn delete_collection(&self, collection_name: &str) -> Result<usize> {
        info!("🗑️ Deleting all metadata for collection: {}", collection_name);

        let purge = sqlx::query(
            r#"
            DELETE FROM content_metadata
            WHERE collection_name = ?
            "#,
        )
        .bind(collection_name);

        let removed_rows = purge.execute(&self.pool).await?.rows_affected() as usize;

        info!("✅ Deleted {} metadata entries for collection: {}", removed_rows, collection_name);
        Ok(removed_rows)
    }

    /// Count metadata entries in a collection
    pub async fn count_collection(&self, collection_name: &str) -> Result<i64> {
        let count = sqlx::query_scalar::<_, i64>(
            r#"
            SELECT COUNT(*)
            FROM content_metadata
            WHERE collection_name = ?
            "#
        )
        .bind(collection_name)
        .fetch_one(&self.pool)
        .await?;

        Ok(count)
    }

    /// Return the summed `content_size` of all rows in `collection_name`.
    ///
    /// The sum is `NULL` when no row matches, in which case `0` is returned.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the database while running the query.
    pub async fn get_collection_size(&self, collection_name: &str) -> Result<i64> {
        let summed_bytes = sqlx::query_scalar::<_, Option<i64>>(
            r#"
            SELECT SUM(content_size)
            FROM content_metadata
            WHERE collection_name = ?
            "#,
        )
        .bind(collection_name)
        .fetch_one(&self.pool)
        .await?
        .unwrap_or_default();

        Ok(summed_bytes)
    }

    /// Report whether a metadata row is stored under `doc_ref`.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the database while running the query.
    pub async fn exists(&self, doc_ref: &str) -> Result<bool> {
        let row_count_query = sqlx::query_scalar::<_, i64>(
            r#"
            SELECT COUNT(*)
            FROM content_metadata
            WHERE doc_ref = ?
            "#,
        )
        .bind(doc_ref);

        let matching_rows = row_count_query.fetch_one(&self.pool).await?;
        Ok(matching_rows >= 1)
    }

    /// Gather aggregate statistics over the whole `content_metadata` table.
    ///
    /// The row count, the summed `content_size` and the distinct collection
    /// names are read with three separate queries, so the figures may not
    /// describe exactly the same instant if writes happen in between.
    ///
    /// # Errors
    ///
    /// Returns the first database error encountered.
    pub async fn get_stats(&self) -> Result<ContentStoreStats> {
        let entry_total: i64 = sqlx::query_scalar::<_, i64>(
            "SELECT COUNT(*) FROM content_metadata"
        )
        .fetch_one(&self.pool)
        .await?;

        // SUM yields NULL on an empty table; that is treated as zero below.
        let summed_size: Option<i64> = sqlx::query_scalar::<_, Option<i64>>(
            "SELECT SUM(content_size) FROM content_metadata"
        )
        .fetch_one(&self.pool)
        .await?;

        let collection_names: Vec<String> = sqlx::query_scalar::<_, String>(
            "SELECT DISTINCT collection_name FROM content_metadata"
        )
        .fetch_all(&self.pool)
        .await?;

        let collection_count = collection_names.len();

        Ok(ContentStoreStats {
            total_entries: entry_total as usize,
            total_size_bytes: summed_size.unwrap_or(0) as usize,
            collection_count,
            collections: collection_names,
        })
    }
}

/// Aggregate figures describing the contents of a `ContentStore`.
#[derive(Clone, Debug)]
pub struct ContentStoreStats {
    /// Total number of stored metadata rows.
    pub total_entries: usize,
    /// Sum of the recorded `content_size` values over all rows, in bytes.
    pub total_size_bytes: usize,
    /// Number of distinct collection names; equals `collections.len()`.
    pub collection_count: usize,
    /// The distinct collection names that have at least one row.
    pub collections: Vec<String>,
}