Skip to main content

mockforge_data/rag/
storage.rs

1//! Document storage and vector indexing
2//!
3//! This module provides storage backends for documents and their vector embeddings,
4//! supporting various indexing strategies and similarity search algorithms.
5
6use crate::rag::engine::DocumentChunk;
7use crate::Result;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12
/// Shared, lock-protected list of `(chunk_id, embedding)` pairs that the
/// in-memory backend scans for similarity search.
type VectorStore = Arc<RwLock<Vec<(String, Vec<f32>)>>>;
14
/// Descriptive record for a vector index used in similarity search.
///
/// Holds identity, index type, dimensionality, a vector count and free-form
/// string metadata, plus creation/update timestamps. The count is not derived
/// from stored data; callers keep it current via `update_vector_count`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorIndex {
    /// Index ID
    pub id: String,
    /// Human-readable index name
    pub name: String,
    /// Index type
    pub index_type: IndexType,
    /// Number of components in each indexed vector
    pub dimensions: usize,
    /// Number of vectors indexed (set via `update_vector_count`)
    pub vector_count: usize,
    /// Free-form string metadata (see `add_metadata` / `remove_metadata`)
    pub metadata: HashMap<String, String>,
    /// Creation timestamp (UTC)
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Last updated timestamp (UTC). Bumped by `add_metadata`,
    /// `update_vector_count`, and `remove_metadata` when it removes an entry.
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
35
36impl VectorIndex {
37    /// Create a new vector index
38    pub fn new(id: String, name: String, index_type: IndexType, dimensions: usize) -> Self {
39        let now = chrono::Utc::now();
40        Self {
41            id,
42            name,
43            index_type,
44            dimensions,
45            vector_count: 0,
46            metadata: HashMap::new(),
47            created_at: now,
48            updated_at: now,
49        }
50    }
51
52    /// Add metadata to index
53    pub fn add_metadata(&mut self, key: String, value: String) {
54        self.metadata.insert(key, value);
55        self.updated_at = chrono::Utc::now();
56    }
57
58    /// Get metadata value
59    pub fn get_metadata(&self, key: &str) -> Option<&String> {
60        self.metadata.get(key)
61    }
62
63    /// Remove metadata
64    pub fn remove_metadata(&mut self, key: &str) -> Option<String> {
65        let result = self.metadata.remove(key);
66        if result.is_some() {
67            self.updated_at = chrono::Utc::now();
68        }
69        result
70    }
71
72    /// Update vector count
73    pub fn update_vector_count(&mut self, count: usize) {
74        self.vector_count = count;
75        self.updated_at = chrono::Utc::now();
76    }
77
78    /// Get index size estimate in bytes
79    pub fn estimated_size_bytes(&self) -> u64 {
80        // Rough estimate: each vector takes ~4 bytes per dimension + overhead
81        (self.vector_count * self.dimensions * 4 + 1024) as u64
82    }
83
84    /// Check if index is empty
85    pub fn is_empty(&self) -> bool {
86        self.vector_count == 0
87    }
88
89    /// Get index statistics
90    pub fn stats(&self) -> IndexStats {
91        IndexStats {
92            id: self.id.clone(),
93            name: self.name.clone(),
94            index_type: self.index_type.clone(),
95            dimensions: self.dimensions,
96            vector_count: self.vector_count,
97            estimated_size_bytes: self.estimated_size_bytes(),
98            metadata_count: self.metadata.len(),
99            created_at: self.created_at,
100            updated_at: self.updated_at,
101        }
102    }
103}
104
/// Point-in-time statistics for a [`VectorIndex`], as produced by
/// `VectorIndex::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexStats {
    /// Index ID
    pub id: String,
    /// Index name
    pub name: String,
    /// Index type
    pub index_type: IndexType,
    /// Vector dimensions
    pub dimensions: usize,
    /// Number of vectors (copied from `VectorIndex::vector_count`)
    pub vector_count: usize,
    /// Estimated size in bytes (from `VectorIndex::estimated_size_bytes`)
    pub estimated_size_bytes: u64,
    /// Number of entries in the index's metadata map
    pub metadata_count: usize,
    /// Creation timestamp
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Last updated timestamp
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
127
/// Kind of vector index; `Flat` is the default.
///
/// NOTE(review): within this module this is a descriptive label only. The
/// storage backends here always do a brute-force scan and never branch on it.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub enum IndexType {
    /// Flat index - brute force search
    #[default]
    Flat,
    /// IVF (Inverted File) index - for large datasets
    IVF,
    /// HNSW (Hierarchical Navigable Small World) index - for high performance
    HNSW,
    /// PQ (Product Quantization) index - for memory efficiency
    PQ,
    /// Custom index type, identified by name
    Custom(String),
}
143
/// Parameters for `DocumentStorage::search_with_params`.
///
/// `Default` yields `top_k = 10`, `threshold = 0.7`, cosine similarity,
/// `include_metadata = true`, and no document or metadata filters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchParams {
    /// Maximum number of results to return
    pub top_k: usize,
    /// Minimum similarity a result must reach (compared with `>=`).
    ///
    /// Nominally 0.0 to 1.0. Cosine similarity itself ranges from -1.0 to 1.0,
    /// so a negative threshold is also meaningful.
    pub threshold: f32,
    /// Search method to use.
    ///
    /// NOTE(review): `InMemoryStorage::search_with_params` in this module always
    /// scores with cosine similarity and does not read this field.
    pub search_method: SearchMethod,
    /// Include metadata in results.
    ///
    /// NOTE(review): not read by `InMemoryStorage::search_with_params`. Returned
    /// chunks are full clones either way.
    pub include_metadata: bool,
    /// Only return chunks whose `document_id` equals this value
    pub document_filter: Option<String>,
    /// Only return chunks whose metadata contains every key/value pair given here
    pub metadata_filter: Option<HashMap<String, String>>,
}

impl Default for SearchParams {
    fn default() -> Self {
        Self {
            top_k: 10,
            threshold: 0.7,
            search_method: SearchMethod::Cosine,
            include_metadata: true,
            document_filter: None,
            metadata_filter: None,
        }
    }
}
173
/// Similarity / distance function requested for a search; `Cosine` is the default.
///
/// NOTE(review): the in-memory backend in this module only implements cosine
/// similarity. The other variants are accepted but not acted on there.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub enum SearchMethod {
    /// Cosine similarity
    #[default]
    Cosine,
    /// Euclidean distance
    Euclidean,
    /// Dot product
    DotProduct,
    /// Manhattan distance
    Manhattan,
}
187
/// Storage backend trait for document chunks and their vector embeddings.
///
/// Implemented in this module by `InMemoryStorage` (ephemeral) and
/// `PersistentFileStorage` (JSON snapshot on disk). The `Send + Sync` bound
/// lets a backend be shared across tasks, e.g. behind the
/// `Box<dyn DocumentStorage>` values returned by `StorageFactory`.
#[async_trait::async_trait]
pub trait DocumentStorage: Send + Sync {
    /// Store document chunks. Each chunk carries its own id, document id and
    /// embedding.
    async fn store_chunks(&self, chunks: Vec<DocumentChunk>) -> Result<()>;

    /// Return up to `top_k` chunks ranked by similarity to `query_embedding`,
    /// most similar first.
    async fn search_similar(
        &self,
        query_embedding: &[f32],
        top_k: usize,
    ) -> Result<Vec<DocumentChunk>>;

    /// Like `search_similar`, but applies the threshold and the document and
    /// metadata filters in `params`.
    async fn search_with_params(
        &self,
        query_embedding: &[f32],
        params: SearchParams,
    ) -> Result<Vec<DocumentChunk>>;

    /// Get a chunk by ID; `Ok(None)` when no chunk has that id.
    async fn get_chunk(&self, chunk_id: &str) -> Result<Option<DocumentChunk>>;

    /// Delete a chunk by ID; the `bool` reports whether a chunk was removed.
    async fn delete_chunk(&self, chunk_id: &str) -> Result<bool>;

    /// Get all chunks belonging to a document.
    async fn get_chunks_by_document(&self, document_id: &str) -> Result<Vec<DocumentChunk>>;

    /// Delete all chunks for a document and return how many were removed.
    async fn delete_document(&self, document_id: &str) -> Result<usize>;

    /// Get storage statistics.
    async fn get_stats(&self) -> Result<StorageStats>;

    /// List the distinct document IDs that have at least one stored chunk.
    async fn list_documents(&self) -> Result<Vec<String>>;

    /// Get the total number of stored chunks.
    async fn get_total_chunks(&self) -> Result<usize>;

    /// Remove all chunks and vectors.
    async fn clear(&self) -> Result<()>;

    /// Optimize storage (rebuild indexes, compact data). A no-op for
    /// `InMemoryStorage`.
    async fn optimize(&self) -> Result<()>;

    /// Write a backup of the stored chunks and vectors to `path`.
    async fn create_backup(&self, path: &str) -> Result<()>;

    /// Replace the current contents with the backup stored at `path`.
    async fn restore_backup(&self, path: &str) -> Result<()>;

    /// Check storage health.
    async fn health_check(&self) -> Result<StorageHealth>;
}
244
/// Aggregate statistics reported by `DocumentStorage::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageStats {
    /// Total number of distinct documents
    pub total_documents: usize,
    /// Total number of chunks
    pub total_chunks: usize,
    /// Estimated index size in bytes (a rough estimate in the in-memory backend)
    pub index_size_bytes: u64,
    /// When these stats were last updated
    pub last_updated: chrono::DateTime<chrono::Utc>,
    /// Backend label, e.g. "memory", "file" or "database"
    pub backend_type: String,
    /// Available space in bytes; the in-memory backend reports `u64::MAX`
    pub available_space_bytes: u64,
    /// Used space in bytes; the in-memory backend mirrors `index_size_bytes`
    pub used_space_bytes: u64,
}
263
/// Result of `DocumentStorage::health_check`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageHealth {
    /// Overall health status
    pub status: HealthStatus,
    /// When the check ran
    pub checked_at: chrono::DateTime<chrono::Utc>,
    /// Free-form diagnostic key/value pairs (e.g. "chunk_count", "vector_count")
    pub details: HashMap<String, String>,
    /// Performance metrics, if the backend collects any (`None` in the in-memory backend)
    pub metrics: Option<StorageMetrics>,
}
276
/// Coarse health level reported in [`StorageHealth`].
///
/// The in-memory backend in this module only reports `Healthy`, or
/// `Unhealthy` when its chunk and vector counts disagree.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum HealthStatus {
    /// Storage is healthy
    Healthy,
    /// Storage has warnings
    Warning,
    /// Storage is unhealthy
    Unhealthy,
    /// Storage is unavailable
    Unavailable,
}
289
/// Optional performance metrics attached to a [`StorageHealth`] report.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageMetrics {
    /// Average search time in milliseconds
    pub average_search_time_ms: f64,
    /// Average insert time in milliseconds
    pub average_insert_time_ms: f64,
    /// Index fragmentation as a ratio (0.0 to 1.0), despite the "percentage" wording elsewhere
    pub fragmentation_ratio: f32,
    /// Cache hit rate (0.0 to 1.0)
    pub cache_hit_rate: f32,
    /// Memory usage in bytes
    pub memory_usage_bytes: u64,
    /// Disk usage in bytes
    pub disk_usage_bytes: u64,
}
306
/// In-memory storage implementation for development and testing.
///
/// State lives in three independently locked containers. The mutating trait
/// methods take their write locks in the order `chunks` -> `vectors` -> `stats`.
/// NOTE(review): any code that holds two of these at once should use the
/// same order, or it risks a lock-order deadlock against those writers.
pub struct InMemoryStorage {
    // pub(super) makes these visible to the parent `rag` module as well.
    // `PersistentFileStorage` (same module) rehydrates them directly from
    // disk without going through the async trait surface.
    /// Chunks keyed by chunk id
    pub(super) chunks: Arc<RwLock<HashMap<String, DocumentChunk>>>,
    /// `(chunk_id, embedding)` pairs scanned by similarity search
    pub(super) vectors: VectorStore,
    /// Cached counters returned by `get_stats`
    pub(super) stats: Arc<RwLock<StorageStats>>,
}
315
316impl InMemoryStorage {
317    /// Create a new in-memory storage
318    pub fn new() -> Self {
319        Self::new_with_backend_type("memory")
320    }
321
322    /// Create in-memory storage with a specific backend label.
323    /// This is used when a persistent backend is configured but running with an in-memory fallback.
324    pub fn new_with_backend_type(backend_type: &str) -> Self {
325        let now = chrono::Utc::now();
326        Self {
327            chunks: Arc::new(RwLock::new(HashMap::new())),
328            vectors: Arc::new(RwLock::new(Vec::new())),
329            stats: Arc::new(RwLock::new(StorageStats {
330                total_documents: 0,
331                total_chunks: 0,
332                index_size_bytes: 0,
333                last_updated: now,
334                backend_type: backend_type.to_string(),
335                available_space_bytes: u64::MAX,
336                used_space_bytes: 0,
337            })),
338        }
339    }
340
341    /// Calculate cosine similarity
342    fn cosine_similarity(&self, a: &[f32], b: &[f32]) -> f32 {
343        if a.len() != b.len() {
344            return 0.0;
345        }
346
347        let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
348        let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
349        let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
350
351        if norm_a == 0.0 || norm_b == 0.0 {
352            0.0
353        } else {
354            dot_product / (norm_a * norm_b)
355        }
356    }
357}
358
359impl Default for InMemoryStorage {
360    fn default() -> Self {
361        Self::new()
362    }
363}
364
365#[async_trait::async_trait]
366impl DocumentStorage for InMemoryStorage {
367    async fn store_chunks(&self, chunks: Vec<DocumentChunk>) -> Result<()> {
368        let mut chunks_map = self.chunks.write().await;
369        let mut vectors = self.vectors.write().await;
370        let mut stats = self.stats.write().await;
371
372        for chunk in chunks {
373            chunks_map.insert(chunk.id.clone(), chunk.clone());
374
375            // Store vector for similarity search
376            vectors.push((chunk.id.clone(), chunk.embedding.clone()));
377
378            stats.total_chunks += 1;
379        }
380
381        stats.last_updated = chrono::Utc::now();
382        stats.index_size_bytes = (stats.total_chunks * 1536 * 4) as u64; // Rough estimate
383        stats.used_space_bytes = stats.index_size_bytes;
384
385        Ok(())
386    }
387
388    async fn search_similar(
389        &self,
390        query_embedding: &[f32],
391        top_k: usize,
392    ) -> Result<Vec<DocumentChunk>> {
393        let vectors = self.vectors.read().await;
394        let chunks = self.chunks.read().await;
395
396        let mut similarities: Vec<(String, f32)> = vectors
397            .iter()
398            .map(|(chunk_id, embedding)| {
399                let similarity = self.cosine_similarity(query_embedding, embedding);
400                (chunk_id.clone(), similarity)
401            })
402            .collect();
403
404        // Sort by similarity (descending)
405        similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
406
407        // Take top-k results
408        let mut results = Vec::new();
409        for (chunk_id, _) in similarities.iter().take(top_k) {
410            if let Some(chunk) = chunks.get(chunk_id) {
411                results.push(chunk.clone());
412            }
413        }
414
415        Ok(results)
416    }
417
418    async fn search_with_params(
419        &self,
420        query_embedding: &[f32],
421        params: SearchParams,
422    ) -> Result<Vec<DocumentChunk>> {
423        let mut results = self.search_similar(query_embedding, params.top_k * 2).await?; // Get more candidates
424
425        // Apply filters
426        if let Some(document_filter) = &params.document_filter {
427            results.retain(|chunk| chunk.document_id == *document_filter);
428        }
429
430        if let Some(metadata_filter) = &params.metadata_filter {
431            results.retain(|chunk| {
432                metadata_filter.iter().all(|(key, value)| {
433                    chunk.get_metadata(key).map(|v| v == value).unwrap_or(false)
434                })
435            });
436        }
437
438        // Apply threshold filter
439        results.retain(|chunk| {
440            let similarity = self.cosine_similarity(query_embedding, &chunk.embedding);
441            similarity >= params.threshold
442        });
443
444        // Sort by similarity
445        results.sort_by(|a, b| {
446            let sim_a = self.cosine_similarity(query_embedding, &a.embedding);
447            let sim_b = self.cosine_similarity(query_embedding, &b.embedding);
448            sim_b.partial_cmp(&sim_a).unwrap_or(std::cmp::Ordering::Equal)
449        });
450
451        // Take top-k
452        results.truncate(params.top_k);
453
454        Ok(results)
455    }
456
457    async fn get_chunk(&self, chunk_id: &str) -> Result<Option<DocumentChunk>> {
458        let chunks = self.chunks.read().await;
459        Ok(chunks.get(chunk_id).cloned())
460    }
461
462    async fn delete_chunk(&self, chunk_id: &str) -> Result<bool> {
463        let mut chunks = self.chunks.write().await;
464        let mut vectors = self.vectors.write().await;
465        let mut stats = self.stats.write().await;
466
467        let chunk_removed = chunks.remove(chunk_id).is_some();
468        let _vector_removed = vectors.retain(|(id, _)| id != chunk_id);
469
470        if chunk_removed {
471            stats.total_chunks = stats.total_chunks.saturating_sub(1);
472            stats.last_updated = chrono::Utc::now();
473            stats.index_size_bytes = (stats.total_chunks * 1536 * 4) as u64;
474            stats.used_space_bytes = stats.index_size_bytes;
475        }
476
477        Ok(chunk_removed)
478    }
479
480    async fn get_chunks_by_document(&self, document_id: &str) -> Result<Vec<DocumentChunk>> {
481        let chunks = self.chunks.read().await;
482        let results = chunks
483            .values()
484            .filter(|chunk| chunk.document_id == document_id)
485            .cloned()
486            .collect();
487        Ok(results)
488    }
489
490    async fn delete_document(&self, document_id: &str) -> Result<usize> {
491        let mut chunks = self.chunks.write().await;
492        let mut vectors = self.vectors.write().await;
493        let mut stats = self.stats.write().await;
494
495        let initial_count = chunks.len();
496        chunks.retain(|_, chunk| chunk.document_id != document_id);
497        vectors.retain(|(id, _)| {
498            // Remove vectors for chunks that were removed
499            chunks.contains_key(id)
500        });
501
502        let removed_count = initial_count - chunks.len();
503        if removed_count > 0 {
504            stats.total_chunks = stats.total_chunks.saturating_sub(removed_count);
505            stats.last_updated = chrono::Utc::now();
506            stats.index_size_bytes = (stats.total_chunks * 1536 * 4) as u64;
507            stats.used_space_bytes = stats.index_size_bytes;
508        }
509
510        Ok(removed_count)
511    }
512
513    async fn get_stats(&self) -> Result<StorageStats> {
514        let stats = self.stats.read().await;
515        Ok(stats.clone())
516    }
517
518    async fn list_documents(&self) -> Result<Vec<String>> {
519        let chunks = self.chunks.read().await;
520        let documents: std::collections::HashSet<String> =
521            chunks.values().map(|chunk| chunk.document_id.clone()).collect();
522        Ok(documents.into_iter().collect())
523    }
524
525    async fn get_total_chunks(&self) -> Result<usize> {
526        let stats = self.stats.read().await;
527        Ok(stats.total_chunks)
528    }
529
530    async fn clear(&self) -> Result<()> {
531        let mut chunks = self.chunks.write().await;
532        let mut vectors = self.vectors.write().await;
533        let mut stats = self.stats.write().await;
534
535        chunks.clear();
536        vectors.clear();
537
538        stats.total_documents = 0;
539        stats.total_chunks = 0;
540        stats.index_size_bytes = 0;
541        stats.used_space_bytes = 0;
542        stats.last_updated = chrono::Utc::now();
543
544        Ok(())
545    }
546
547    async fn optimize(&self) -> Result<()> {
548        // In-memory storage doesn't need optimization
549        Ok(())
550    }
551
552    async fn create_backup(&self, path: &str) -> Result<()> {
553        let chunks = self.chunks.read().await;
554        let vectors = self.vectors.read().await;
555
556        let backup_data = serde_json::json!({
557            "version": 1,
558            "created_at": chrono::Utc::now().to_rfc3339(),
559            "chunks": chunks.values().collect::<Vec<_>>(),
560            "vectors": vectors.iter().collect::<Vec<_>>(),
561        });
562
563        let json_bytes = serde_json::to_vec_pretty(&backup_data)?;
564        std::fs::write(path, json_bytes)?;
565
566        Ok(())
567    }
568
569    async fn restore_backup(&self, path: &str) -> Result<()> {
570        let json_bytes = std::fs::read(path)?;
571        let backup_data: serde_json::Value = serde_json::from_slice(&json_bytes)?;
572
573        // Clear current data
574        self.clear().await?;
575
576        let mut chunks_map = self.chunks.write().await;
577        let mut vectors = self.vectors.write().await;
578        let mut stats = self.stats.write().await;
579
580        // Restore chunks
581        if let Some(chunks_arr) = backup_data.get("chunks").and_then(|v| v.as_array()) {
582            for chunk_val in chunks_arr {
583                if let Ok(chunk) = serde_json::from_value::<DocumentChunk>(chunk_val.clone()) {
584                    chunks_map.insert(chunk.id.clone(), chunk);
585                }
586            }
587        }
588
589        // Restore vectors
590        if let Some(vectors_arr) = backup_data.get("vectors").and_then(|v| v.as_array()) {
591            for vector_val in vectors_arr {
592                if let Ok(vector) = serde_json::from_value::<(String, Vec<f32>)>(vector_val.clone())
593                {
594                    vectors.push(vector);
595                }
596            }
597        }
598
599        // Update stats
600        let doc_ids: std::collections::HashSet<String> =
601            chunks_map.values().map(|c| c.document_id.clone()).collect();
602        stats.total_documents = doc_ids.len();
603        stats.total_chunks = chunks_map.len();
604        stats.index_size_bytes = (stats.total_chunks * 1536 * 4) as u64;
605        stats.used_space_bytes = stats.index_size_bytes;
606        stats.last_updated = chrono::Utc::now();
607
608        Ok(())
609    }
610
611    async fn health_check(&self) -> Result<StorageHealth> {
612        let chunks = self.chunks.read().await;
613        let vectors = self.vectors.read().await;
614
615        let mut details = HashMap::new();
616        details.insert("chunk_count".to_string(), chunks.len().to_string());
617        details.insert("vector_count".to_string(), vectors.len().to_string());
618        details.insert("memory_usage".to_string(), "unknown".to_string());
619
620        let status = if chunks.len() == vectors.len() {
621            HealthStatus::Healthy
622        } else {
623            details.insert("error".to_string(), "Chunk/vector count mismatch".to_string());
624            HealthStatus::Unhealthy
625        };
626
627        Ok(StorageHealth {
628            status,
629            checked_at: chrono::Utc::now(),
630            details,
631            metrics: None,
632        })
633    }
634}
635
636/// Storage factory for creating different storage backends
637pub struct StorageFactory;
638
639impl StorageFactory {
640    /// Create in-memory storage. Ephemeral — chunks + vectors live only
641    /// for the process lifetime. Intended for tests and OSS quick-start.
642    pub fn create_memory() -> Box<dyn DocumentStorage> {
643        Box::new(InMemoryStorage::new())
644    }
645
646    /// Create file-backed storage that persists chunks + vectors to a JSON
647    /// snapshot inside `path/`. Closes the "RAG forgets everything on
648    /// restart" half of #669 — embeddings survive process restarts now.
649    ///
650    /// Layout:
651    ///   `<path>/storage.json`  — single-file snapshot (atomic write via tmp)
652    ///
653    /// Tradeoff: rewrites the whole snapshot on every `store_chunks` call.
654    /// Fine for thousand-chunk-scale corpora that fit in RAM (the
655    /// embedded RAG use case); a real vector database is a better fit
656    /// for million-chunk catalogs — see `create_vector_db` below.
657    pub fn create_file(path: &str) -> Result<Box<dyn DocumentStorage>> {
658        if path.trim().is_empty() {
659            return Err(crate::Error::generic("File storage path cannot be empty"));
660        }
661
662        std::fs::create_dir_all(path)?;
663        let dir = std::path::PathBuf::from(path);
664        Ok(Box::new(PersistentFileStorage::new(dir)?))
665    }
666
667    /// Create database storage. Not yet implemented — the connection
668    /// string would route to a sqlx-backed implementation. Returns a
669    /// labelled in-memory store for now and warns rather than failing
670    /// silently. Tracked in #669 follow-up.
671    pub fn create_database(connection_string: &str) -> Result<Box<dyn DocumentStorage>> {
672        if connection_string.trim().is_empty() {
673            return Err(crate::Error::generic("Database connection string cannot be empty"));
674        }
675        tracing::warn!(
676            "create_database falls back to in-memory storage; \
677             sqlx-backed backend is tracked in #669 follow-up"
678        );
679        Ok(Box::new(InMemoryStorage::new_with_backend_type("database")))
680    }
681
682    /// Create vector-database storage. Real vector-DB integrations
683    /// (Qdrant, LanceDB, pgvector) belong behind crate feature flags so
684    /// the heavy client/transitive deps don't land in every consumer.
685    /// Until one of those features is enabled, this returns a clear
686    /// error rather than silently falling back to ephemeral memory —
687    /// the silent fallback was exactly what the audit (#669) flagged.
688    pub fn create_vector_db(config: HashMap<String, String>) -> Result<Box<dyn DocumentStorage>> {
689        if config.is_empty() {
690            return Err(crate::Error::generic("Vector database configuration cannot be empty"));
691        }
692
693        let provider = config.get("provider").map(|s| s.as_str()).unwrap_or("<unspecified>");
694
695        Err(crate::Error::generic(format!(
696            "vector-db backend '{provider}' not compiled in. \
697             Enable the `qdrant` or `lancedb` feature on mockforge-data, \
698             or use `create_file()` for persistent local storage."
699        )))
700    }
701}
702
703/// File-backed `DocumentStorage` that snapshots an in-memory store to a
704/// JSON file on every write. Construction reads any prior snapshot back.
705///
706/// Implementation note: delegates all read paths to the wrapped
707/// `InMemoryStorage` so cosine-similarity search etc. stays identical.
708/// Only `store_chunks` / `delete_documents` / `clear_all` rewrite the
709/// snapshot — read-only ops stay zero-IO.
710pub struct PersistentFileStorage {
711    inner: InMemoryStorage,
712    snapshot_path: std::path::PathBuf,
713}
714
715#[derive(Debug, Serialize, Deserialize)]
716struct StorageSnapshot {
717    /// Schema version; bump if the on-disk shape changes.
718    version: u32,
719    chunks: HashMap<String, DocumentChunk>,
720    vectors: Vec<(String, Vec<f32>)>,
721}
722
723impl PersistentFileStorage {
724    /// Create a persistent file storage at `<dir>/storage.json`. Reads
725    /// any existing snapshot at construction time.
726    pub fn new(dir: std::path::PathBuf) -> Result<Self> {
727        std::fs::create_dir_all(&dir)?;
728        let snapshot_path = dir.join("storage.json");
729        let inner = InMemoryStorage::new_with_backend_type("file");
730
731        if snapshot_path.exists() {
732            let raw = std::fs::read_to_string(&snapshot_path).map_err(crate::Error::from)?;
733            let snapshot: StorageSnapshot = serde_json::from_str(&raw)
734                .map_err(|e| crate::Error::generic(format!("malformed snapshot: {e}")))?;
735
736            // We just created `inner` and haven't shared its Arcs yet, so
737            // `try_write` must succeed. Using `try_write` (not
738            // `blocking_write`) keeps construction safe to call from
739            // inside a tokio runtime — `blocking_write` panics from
740            // worker threads of the current runtime.
741            inner
742                .chunks
743                .try_write()
744                .map(|mut g| *g = snapshot.chunks)
745                .map_err(|_| crate::Error::generic("snapshot load: chunks lock contended"))?;
746            inner
747                .vectors
748                .try_write()
749                .map(|mut g| *g = snapshot.vectors)
750                .map_err(|_| crate::Error::generic("snapshot load: vectors lock contended"))?;
751            if let Ok(mut stats) = inner.stats.try_write() {
752                stats.last_updated = chrono::Utc::now();
753            }
754            tracing::info!(
755                path = %snapshot_path.display(),
756                "loaded RAG storage snapshot"
757            );
758        }
759
760        Ok(Self {
761            inner,
762            snapshot_path,
763        })
764    }
765
766    async fn persist(&self) -> Result<()> {
767        let snapshot = StorageSnapshot {
768            version: 1,
769            chunks: self.inner.chunks.read().await.clone(),
770            vectors: self.inner.vectors.read().await.clone(),
771        };
772        let json = serde_json::to_string_pretty(&snapshot)
773            .map_err(|e| crate::Error::generic(format!("serialise snapshot: {e}")))?;
774
775        // Atomic rename — a crash mid-write leaves the previous snapshot
776        // intact rather than truncating to a half-file.
777        let tmp = self.snapshot_path.with_extension("tmp");
778        std::fs::write(&tmp, json).map_err(crate::Error::from)?;
779        std::fs::rename(&tmp, &self.snapshot_path).map_err(crate::Error::from)?;
780        Ok(())
781    }
782}
783
784#[async_trait::async_trait]
785impl DocumentStorage for PersistentFileStorage {
786    async fn store_chunks(&self, chunks: Vec<DocumentChunk>) -> Result<()> {
787        self.inner.store_chunks(chunks).await?;
788        self.persist().await?;
789        Ok(())
790    }
791
792    async fn search_similar(
793        &self,
794        query_embedding: &[f32],
795        top_k: usize,
796    ) -> Result<Vec<DocumentChunk>> {
797        self.inner.search_similar(query_embedding, top_k).await
798    }
799
800    async fn search_with_params(
801        &self,
802        query_embedding: &[f32],
803        params: SearchParams,
804    ) -> Result<Vec<DocumentChunk>> {
805        self.inner.search_with_params(query_embedding, params).await
806    }
807
808    async fn get_chunk(&self, chunk_id: &str) -> Result<Option<DocumentChunk>> {
809        self.inner.get_chunk(chunk_id).await
810    }
811
812    async fn delete_chunk(&self, chunk_id: &str) -> Result<bool> {
813        let res = self.inner.delete_chunk(chunk_id).await?;
814        if res {
815            self.persist().await?;
816        }
817        Ok(res)
818    }
819
820    async fn get_chunks_by_document(&self, document_id: &str) -> Result<Vec<DocumentChunk>> {
821        self.inner.get_chunks_by_document(document_id).await
822    }
823
824    async fn delete_document(&self, document_id: &str) -> Result<usize> {
825        let n = self.inner.delete_document(document_id).await?;
826        if n > 0 {
827            self.persist().await?;
828        }
829        Ok(n)
830    }
831
832    async fn get_stats(&self) -> Result<StorageStats> {
833        let mut stats = self.inner.get_stats().await?;
834        // Override the backend label so health/admin surfaces show
835        // "file" rather than the inner store's "file" label (which we
836        // already set, but make it explicit).
837        stats.backend_type = "file".to_string();
838        Ok(stats)
839    }
840
841    async fn list_documents(&self) -> Result<Vec<String>> {
842        self.inner.list_documents().await
843    }
844
845    async fn get_total_chunks(&self) -> Result<usize> {
846        self.inner.get_total_chunks().await
847    }
848
849    async fn clear(&self) -> Result<()> {
850        self.inner.clear().await?;
851        self.persist().await?;
852        Ok(())
853    }
854
855    async fn optimize(&self) -> Result<()> {
856        self.inner.optimize().await
857    }
858
859    async fn create_backup(&self, path: &str) -> Result<()> {
860        self.inner.create_backup(path).await
861    }
862
863    async fn restore_backup(&self, path: &str) -> Result<()> {
864        self.inner.restore_backup(path).await?;
865        self.persist().await?;
866        Ok(())
867    }
868
869    async fn health_check(&self) -> Result<StorageHealth> {
870        self.inner.health_check().await
871    }
872}
873
#[cfg(test)]
mod tests {
    use super::StorageFactory;
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Unique scratch path under the system temp dir that is removed on drop.
    ///
    /// Cleanup lives in `Drop` so the directory is deleted even when an
    /// assertion panics partway through a test, not only on success.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        /// Builds `<temp>/<prefix>-<pid>-<nanos>` and removes anything already
        /// at that path. The directory itself is not created here; that is
        /// left to the storage under test.
        fn new(prefix: &str) -> Self {
            let dir = std::env::temp_dir().join(format!(
                "{prefix}-{}-{}",
                std::process::id(),
                chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
            ));
            let _ = std::fs::remove_dir_all(&dir);
            Self(dir)
        }

        /// The scratch path as `&str`, the form `StorageFactory::create_file` takes.
        fn path_str(&self) -> &str {
            self.0.to_str().expect("temp dir path should be valid UTF-8")
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort: the path may never have been created.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn test_module_compiles() {
        // Basic compilation test
    }

    #[tokio::test]
    async fn test_create_file_storage_fallback_backend_type() {
        // Declared before `storage` so it is dropped (and cleaned up) last.
        let scratch = ScratchDir::new("mockforge-data-storage");
        let storage = StorageFactory::create_file(scratch.path_str()).expect("create");
        let stats = storage.get_stats().await.expect("stats");
        assert_eq!(stats.backend_type, "file");
    }

    #[tokio::test]
    async fn test_create_database_storage_fallback_backend_type() {
        let storage =
            StorageFactory::create_database("postgres://user:pass@localhost/db").expect("create");
        let stats = storage.get_stats().await.expect("stats");
        assert_eq!(stats.backend_type, "database");
    }

    #[tokio::test]
    async fn test_create_vector_storage_errors_without_real_backend() {
        // After #669: vector-db requested without `qdrant`/`lancedb` feature
        // is now an error rather than a silent in-memory fallback.
        let mut cfg = HashMap::new();
        cfg.insert("provider".to_string(), "qdrant".to_string());
        let result = StorageFactory::create_vector_db(cfg);
        assert!(result.is_err(), "expected error, got Ok");
        let msg = result.err().unwrap().to_string();
        assert!(
            msg.contains("not compiled in") || msg.contains("qdrant"),
            "expected helpful error mentioning compile-in or qdrant, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_persistent_file_storage_round_trips_across_restart() {
        use crate::rag::engine::DocumentChunk;

        let scratch = ScratchDir::new("mockforge-rag-persist");
        let path_str = scratch.path_str();

        // First "process": write a chunk + embedding.
        {
            let storage = StorageFactory::create_file(path_str).expect("create");
            let chunk = DocumentChunk {
                id: "chunk-1".to_string(),
                document_id: "doc-1".to_string(),
                content: "hello rag".to_string(),
                embedding: vec![0.1, 0.2, 0.3],
                metadata: HashMap::new(),
                position: 0,
                length: 9,
            };
            storage.store_chunks(vec![chunk]).await.expect("store");
        }

        // Second "process": read the same path. Inner state should
        // rehydrate from disk; the chunk should still be queryable.
        {
            let storage = StorageFactory::create_file(path_str).expect("reopen");
            let stats = storage.get_stats().await.expect("stats");
            assert_eq!(stats.backend_type, "file");
            let chunk = storage.get_chunk("chunk-1").await.expect("query");
            assert!(chunk.is_some(), "persisted chunk should survive restart");
            let chunk = chunk.unwrap();
            assert_eq!(chunk.content, "hello rag");
            assert_eq!(chunk.embedding, vec![0.1, 0.2, 0.3]);
        }
    }

    #[tokio::test]
    async fn test_persistent_file_storage_search_survives_restart() {
        use crate::rag::engine::DocumentChunk;

        let scratch = ScratchDir::new("mockforge-rag-search");
        let path_str = scratch.path_str();

        {
            let storage = StorageFactory::create_file(path_str).expect("create");
            let chunks = vec![
                DocumentChunk {
                    id: "a".to_string(),
                    document_id: "doc".to_string(),
                    content: "apple".to_string(),
                    embedding: vec![1.0, 0.0, 0.0],
                    metadata: HashMap::new(),
                    position: 0,
                    length: 5,
                },
                DocumentChunk {
                    id: "b".to_string(),
                    document_id: "doc".to_string(),
                    content: "banana".to_string(),
                    embedding: vec![0.0, 1.0, 0.0],
                    metadata: HashMap::new(),
                    position: 5,
                    length: 6,
                },
            ];
            storage.store_chunks(chunks).await.expect("store");
        }

        // Reopen — cosine-similarity search should still return
        // the right chunk for a vector pointing at it. `storage` is
        // declared after `scratch`, so it is dropped before cleanup runs.
        let storage = StorageFactory::create_file(path_str).expect("reopen");
        let hits = storage.search_similar(&[1.0, 0.0, 0.0], 1).await.expect("search");
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].id, "a");
    }
}