// foxstash_core/storage/incremental.rs

//! Incremental persistence with Write-Ahead Log (WAL)
//!
//! This module provides efficient incremental persistence for vector indexes:
//!
//! - **Write-Ahead Log (WAL)**: Append-only log for fast writes
//! - **Checkpointing**: Periodic full snapshots for fast recovery
//! - **Recovery**: Replay WAL from last checkpoint
//! - **Compaction**: Merge WAL into checkpoint to reclaim space
//!
//! # Architecture
//!
//! ```text
//! storage/
//! ├── checkpoint_00001.bin   # Full index snapshot
//! ├── checkpoint_00001.meta  # Checkpoint metadata
//! ├── wal_00001.log          # WAL entries since checkpoint
//! └── manifest.json          # Current state pointer
//! ```
//!
//! # Example
//!
//! ```no_run
//! use foxstash_core::storage::incremental::{
//!     IncrementalStorage, IncrementalConfig, IndexMetadata, RecoveryHelper,
//! };
//! use foxstash_core::storage::incremental::WalOperation;
//! use foxstash_core::index::HNSWIndex;
//! use foxstash_core::Document;
//!
//! fn main() -> Result<(), foxstash_core::RagError> {
//!     // Create incremental storage
//!     let config = IncrementalConfig::default()
//!         .with_wal_sync_interval(100)
//!         .with_checkpoint_threshold(10_000);
//!
//!     let mut storage = IncrementalStorage::new("/tmp/index_storage", config)?;
//!     let mut index = HNSWIndex::with_defaults(128);
//!
//!     // Add documents -- log to WAL, then apply to index
//!     let doc = Document {
//!         id: "doc1".into(),
//!         content: "Hello".into(),
//!         embedding: vec![0.1; 128],
//!         metadata: None,
//!     };
//!     storage.log_add(&doc)?;
//!     index.add(doc)?;
//!
//!     // Checkpoint when threshold is reached
//!     if storage.needs_checkpoint() {
//!         let meta = IndexMetadata {
//!             document_count: index.len(),
//!             embedding_dim: 128,
//!             index_type: "hnsw".into(),
//!         };
//!         storage.checkpoint(&index.get_all_documents(), meta)?;
//!     }
//!
//!     // Recovery: load last checkpoint, then replay WAL
//!     let helper = RecoveryHelper::new(&storage);
//!     helper.replay_wal(|op| {
//!         match op {
//!             WalOperation::Add(doc) => { index.add(doc.clone())?; }
//!             WalOperation::Clear => { index.clear(); }
//!             _ => {}
//!         }
//!         Ok(())
//!     })?;
//!     Ok(())
//! }
//! ```

// This module needs a real file system, so it is compiled out on wasm targets.
#![cfg(not(target_arch = "wasm32"))]

use crate::storage::compression::{self, Codec};
use crate::{Document, RagError, Result};
use serde::{Deserialize, Serialize};
use std::fs::{self, File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

// Process-wide counter combined with the PID in `atomic_tmp_path` so that
// concurrent atomic writes never collide on the same temp-file name.
static ATOMIC_WRITE_COUNTER: AtomicU64 = AtomicU64::new(0);

// ============================================================================
// Configuration
// ============================================================================

/// Configuration for incremental storage
///
/// Build with [`Default`] and refine via the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalConfig {
    /// Number of WAL operations before triggering automatic checkpoint
    pub checkpoint_threshold: usize,
    /// Number of WAL operations before syncing to disk
    pub wal_sync_interval: usize,
    /// Maximum WAL file size in bytes before forcing checkpoint
    pub max_wal_size: usize,
    /// Compression codec for checkpoints
    pub checkpoint_codec: Codec,
    /// Whether to fsync after each WAL write (slower but safer)
    pub sync_on_write: bool,
    /// Keep old checkpoints for rollback (0 = delete immediately)
    pub keep_checkpoints: usize,
}
106
107impl Default for IncrementalConfig {
108    fn default() -> Self {
109        Self {
110            checkpoint_threshold: 10_000,
111            wal_sync_interval: 100,
112            max_wal_size: 100 * 1024 * 1024, // 100 MB
113            checkpoint_codec: Codec::Gzip,
114            sync_on_write: false,
115            keep_checkpoints: 2,
116        }
117    }
118}
119
120impl IncrementalConfig {
121    /// Set checkpoint threshold
122    pub fn with_checkpoint_threshold(mut self, threshold: usize) -> Self {
123        self.checkpoint_threshold = threshold;
124        self
125    }
126
127    /// Set WAL sync interval
128    pub fn with_wal_sync_interval(mut self, interval: usize) -> Self {
129        self.wal_sync_interval = interval;
130        self
131    }
132
133    /// Set maximum WAL size
134    pub fn with_max_wal_size(mut self, size: usize) -> Self {
135        self.max_wal_size = size;
136        self
137    }
138
139    /// Set checkpoint compression codec
140    pub fn with_checkpoint_codec(mut self, codec: Codec) -> Self {
141        self.checkpoint_codec = codec;
142        self
143    }
144
145    /// Enable sync on every write (safer but slower)
146    pub fn with_sync_on_write(mut self, sync: bool) -> Self {
147        self.sync_on_write = sync;
148        self
149    }
150
151    /// Set number of old checkpoints to keep
152    pub fn with_keep_checkpoints(mut self, count: usize) -> Self {
153        self.keep_checkpoints = count;
154        self
155    }
156}

// ============================================================================
// WAL Entry Types
// ============================================================================

/// Operations that can be logged to WAL
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalOperation {
    /// Add a document
    Add(Document),
    /// Remove a document by ID
    Remove(String),
    /// Clear all documents
    Clear,
    /// Marker for checkpoint completion (carries no index data)
    Checkpoint { checkpoint_id: u64 },
}

/// A single WAL entry with metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalEntry {
    /// Monotonically increasing sequence number
    pub seq: u64,
    /// Unix timestamp in milliseconds
    pub timestamp: u64,
    /// The operation
    pub operation: WalOperation,
    /// CRC32 checksum for integrity (covers seq, timestamp, operation)
    pub checksum: u32,
}
187
188impl WalEntry {
189    /// Create a new WAL entry
190    fn new(seq: u64, operation: WalOperation) -> Self {
191        let timestamp = SystemTime::now()
192            .duration_since(UNIX_EPOCH)
193            .unwrap()
194            .as_millis() as u64;
195
196        let mut entry = Self {
197            seq,
198            timestamp,
199            operation,
200            checksum: 0,
201        };
202
203        entry.checksum = entry.compute_checksum();
204        entry
205    }
206
207    /// Compute CRC32 checksum
208    fn compute_checksum(&self) -> u32 {
209        let data = serde_json::to_vec(&(&self.seq, &self.timestamp, &self.operation)).unwrap();
210        crc32fast::hash(&data)
211    }
212
213    /// Verify entry integrity
214    pub fn verify(&self) -> bool {
215        // compute_checksum() serializes (seq, timestamp, operation) — the checksum
216        // field is already excluded from the hash input, so no clone needed.
217        self.checksum == self.compute_checksum()
218    }
219}

// ============================================================================
// Manifest
// ============================================================================

/// Manifest tracking current storage state
///
/// Persisted as `manifest.json`; only written by `sync()` and `checkpoint()`,
/// so on-disk values may lag the WAL between syncs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Manifest {
    /// Current checkpoint ID
    pub current_checkpoint: Option<u64>,
    /// Current WAL sequence number
    pub wal_seq: u64,
    /// Number of operations since last checkpoint
    pub ops_since_checkpoint: usize,
    /// Total documents in index
    pub total_documents: usize,
    /// Index embedding dimension
    pub embedding_dim: usize,
    /// Index type ("hnsw", "flat", "sq8_hnsw", "binary_hnsw")
    pub index_type: String,
    /// Last modified timestamp
    pub last_modified: u64,
}
243
244impl Default for Manifest {
245    fn default() -> Self {
246        Self {
247            current_checkpoint: None,
248            wal_seq: 0,
249            ops_since_checkpoint: 0,
250            total_documents: 0,
251            embedding_dim: 0,
252            index_type: String::new(),
253            last_modified: 0,
254        }
255    }
256}

// ============================================================================
// Checkpoint Metadata
// ============================================================================

/// Metadata for a checkpoint
///
/// Stored alongside the snapshot as `checkpoint_NNNNN.meta` (pretty JSON).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointMeta {
    /// Checkpoint ID
    pub id: u64,
    /// WAL sequence at checkpoint time (replay starts after this seq)
    pub wal_seq: u64,
    /// Number of documents in checkpoint
    pub document_count: usize,
    /// Embedding dimension
    pub embedding_dim: usize,
    /// Index type
    pub index_type: String,
    /// Creation timestamp
    pub created_at: u64,
    /// Uncompressed size
    pub original_size: usize,
    /// Compressed size
    pub compressed_size: usize,
    /// Compression codec used
    pub codec: Codec,
}

// ============================================================================
// WAL Writer
// ============================================================================

/// Write-Ahead Log writer (buffered, append-only)
struct WalWriter {
    file: BufWriter<File>,
    #[allow(dead_code)]
    path: PathBuf, // Kept for future: WAL rotation, recovery logging
    // Running byte count of the file, including the 4-byte length prefixes.
    current_size: usize,
    // Mirrors IncrementalConfig::sync_on_write: fsync after every append.
    sync_on_write: bool,
}
297
298impl WalWriter {
299    /// Open or create WAL file
300    fn open(path: &Path, sync_on_write: bool) -> Result<Self> {
301        let file = OpenOptions::new()
302            .create(true)
303            .append(true)
304            .open(path)
305            .map_err(|e| RagError::StorageError(format!("Failed to open WAL: {}", e)))?;
306
307        let current_size = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
308
309        Ok(Self {
310            file: BufWriter::new(file),
311            path: path.to_path_buf(),
312            current_size,
313            sync_on_write,
314        })
315    }
316
317    /// Append entry to WAL
318    fn append(&mut self, entry: &WalEntry) -> Result<()> {
319        let data = serde_json::to_vec(entry)
320            .map_err(|e| RagError::StorageError(format!("WAL serialize failed: {}", e)))?;
321        let len = data.len() as u32;
322
323        // Write length prefix + data
324        self.file
325            .write_all(&len.to_le_bytes())
326            .map_err(|e| RagError::StorageError(format!("WAL write failed: {}", e)))?;
327        self.file
328            .write_all(&data)
329            .map_err(|e| RagError::StorageError(format!("WAL write failed: {}", e)))?;
330
331        self.current_size += 4 + data.len();
332
333        if self.sync_on_write {
334            self.sync()?;
335        }
336
337        Ok(())
338    }
339
340    /// Sync WAL to disk
341    fn sync(&mut self) -> Result<()> {
342        self.file
343            .flush()
344            .map_err(|e| RagError::StorageError(format!("WAL sync failed: {}", e)))?;
345        self.file
346            .get_ref()
347            .sync_all()
348            .map_err(|e| RagError::StorageError(format!("WAL sync failed: {}", e)))?;
349        Ok(())
350    }
351
352    /// Get current WAL size
353    fn size(&self) -> usize {
354        self.current_size
355    }
356}

// ============================================================================
// WAL Reader
// ============================================================================

/// Read entries from WAL (buffered sequential scan)
struct WalReader {
    file: BufReader<File>,
}

impl WalReader {
    /// Open WAL for reading
    fn open(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| RagError::StorageError(format!("Failed to open WAL: {}", e)))?;
        Ok(Self {
            file: BufReader::new(file),
        })
    }

    /// Read all entries with a sequence number strictly greater than `from_seq`.
    ///
    /// Record format mirrors `WalWriter::append`: a 4-byte little-endian
    /// length prefix followed by a JSON-encoded `WalEntry`. EOF — clean or
    /// mid-record (a torn final write) — ends the scan without error; a
    /// checksum mismatch is a hard error.
    fn read_from(&mut self, from_seq: u64) -> Result<Vec<WalEntry>> {
        let mut entries = Vec::new();
        let mut len_buf = [0u8; 4];

        loop {
            // Read length prefix; EOF here means the log ended cleanly.
            match self.file.read_exact(&mut len_buf) {
                Ok(()) => {}
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(RagError::StorageError(format!("WAL read failed: {}", e))),
            }

            // NOTE(review): `len` comes straight from disk and is not bounded
            // before allocation — a corrupted prefix could request up to ~4 GiB.
            // Consider capping it (e.g. against config.max_wal_size) — TODO confirm.
            let len = u32::from_le_bytes(len_buf) as usize;
            let mut data = vec![0u8; len];
            match self.file.read_exact(&mut data) {
                Ok(()) => {}
                // Torn tail write: treat the partial record as end-of-log.
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(RagError::StorageError(format!("WAL read failed: {}", e))),
            }

            let entry: WalEntry = serde_json::from_slice(&data)
                .map_err(|e| RagError::StorageError(format!("WAL deserialize failed: {}", e)))?;

            // Verify integrity (CRC32 recomputation via WalEntry::verify)
            if !entry.verify() {
                return Err(RagError::StorageError(format!(
                    "WAL entry {} failed integrity check",
                    entry.seq
                )));
            }

            // Only include entries after the requested sequence
            if entry.seq > from_seq {
                entries.push(entry);
            }
        }

        Ok(entries)
    }
}

// ============================================================================
// Incremental Storage
// ============================================================================

/// Incremental storage manager with WAL and checkpointing
///
/// Provides efficient incremental persistence with fast recovery.
pub struct IncrementalStorage {
    // Directory holding manifest, checkpoints, and WAL files.
    base_path: PathBuf,
    config: IncrementalConfig,
    // In-memory state; persisted to manifest.json on sync()/checkpoint().
    manifest: Manifest,
    // Option so the writer can be swapped out during WAL rotation.
    wal_writer: Option<WalWriter>,
    // Appends since the last fsync; compared against wal_sync_interval.
    ops_since_sync: usize,
}
433
434impl IncrementalStorage {
435    /// Create or open incremental storage
436    pub fn new<P: AsRef<Path>>(base_path: P, config: IncrementalConfig) -> Result<Self> {
437        let base_path = base_path.as_ref().to_path_buf();
438
439        // Create directory if needed
440        fs::create_dir_all(&base_path)
441            .map_err(|e| RagError::StorageError(format!("Failed to create storage dir: {}", e)))?;
442
443        // Load or create manifest
444        let manifest_path = base_path.join("manifest.json");
445        let manifest = if manifest_path.exists() {
446            let data = fs::read_to_string(&manifest_path)
447                .map_err(|e| RagError::StorageError(format!("Failed to read manifest: {}", e)))?;
448            serde_json::from_str(&data)
449                .map_err(|e| RagError::StorageError(format!("Failed to parse manifest: {}", e)))?
450        } else {
451            Manifest::default()
452        };
453
454        // Open WAL writer
455        let wal_path = base_path.join(format!(
456            "wal_{:05}.log",
457            manifest.current_checkpoint.unwrap_or(0)
458        ));
459        let wal_writer = WalWriter::open(&wal_path, config.sync_on_write)?;
460
461        Ok(Self {
462            base_path,
463            config,
464            manifest,
465            wal_writer: Some(wal_writer),
466            ops_since_sync: 0,
467        })
468    }
469
470    /// Log an add operation to WAL
471    pub fn log_add(&mut self, doc: &Document) -> Result<()> {
472        self.log_operation(WalOperation::Add(doc.clone()))
473    }
474
475    /// Log a remove operation to WAL
476    pub fn log_remove(&mut self, id: &str) -> Result<()> {
477        self.log_operation(WalOperation::Remove(id.to_string()))
478    }
479
480    /// Log a clear operation to WAL
481    pub fn log_clear(&mut self) -> Result<()> {
482        self.log_operation(WalOperation::Clear)
483    }
484
485    /// Log an operation to WAL
486    fn log_operation(&mut self, operation: WalOperation) -> Result<()> {
487        self.manifest.wal_seq += 1;
488        self.manifest.ops_since_checkpoint += 1;
489
490        let entry = WalEntry::new(self.manifest.wal_seq, operation);
491
492        if let Some(ref mut writer) = self.wal_writer {
493            writer.append(&entry)?;
494            self.ops_since_sync += 1;
495
496            // Periodic sync
497            if self.ops_since_sync >= self.config.wal_sync_interval {
498                writer.sync()?;
499                self.ops_since_sync = 0;
500            }
501        }
502
503        // Update manifest
504        self.manifest.last_modified = SystemTime::now()
505            .duration_since(UNIX_EPOCH)
506            .unwrap()
507            .as_secs();
508
509        Ok(())
510    }
511
512    /// Check if checkpoint is needed
513    pub fn needs_checkpoint(&self) -> bool {
514        self.manifest.ops_since_checkpoint >= self.config.checkpoint_threshold
515            || self.wal_writer.as_ref().map(|w| w.size()).unwrap_or(0) >= self.config.max_wal_size
516    }
517
518    /// Create a checkpoint from serializable index data
519    pub fn checkpoint<T: Serialize>(
520        &mut self,
521        index: &T,
522        meta: IndexMetadata,
523    ) -> Result<CheckpointMeta> {
524        // Sync WAL first
525        if let Some(ref mut writer) = self.wal_writer {
526            writer.sync()?;
527        }
528
529        let checkpoint_id = self.manifest.current_checkpoint.map(|c| c + 1).unwrap_or(1);
530
531        // Serialize index
532        let data = serde_json::to_vec(index)
533            .map_err(|e| RagError::StorageError(format!("Checkpoint serialize failed: {}", e)))?;
534        let original_size = data.len();
535
536        // Compress
537        let (compressed, _stats) = compression::compress_with(&data, self.config.checkpoint_codec)?;
538        let compressed_size = compressed.len();
539
540        // Write checkpoint file
541        let checkpoint_path = self
542            .base_path
543            .join(format!("checkpoint_{:05}.bin", checkpoint_id));
544        Self::write_atomic_file(&checkpoint_path, &compressed)
545            .map_err(|e| RagError::StorageError(format!("Failed to write checkpoint: {}", e)))?;
546
547        // Create checkpoint metadata
548        let checkpoint_meta = CheckpointMeta {
549            id: checkpoint_id,
550            wal_seq: self.manifest.wal_seq,
551            document_count: meta.document_count,
552            embedding_dim: meta.embedding_dim,
553            index_type: meta.index_type.clone(),
554            created_at: SystemTime::now()
555                .duration_since(UNIX_EPOCH)
556                .unwrap()
557                .as_secs(),
558            original_size,
559            compressed_size,
560            codec: self.config.checkpoint_codec,
561        };
562
563        // Write checkpoint metadata
564        let meta_path = self
565            .base_path
566            .join(format!("checkpoint_{:05}.meta", checkpoint_id));
567        let meta_json = serde_json::to_string_pretty(&checkpoint_meta)
568            .map_err(|e| RagError::StorageError(format!("Failed to serialize meta: {}", e)))?;
569        Self::write_atomic_file(&meta_path, meta_json.as_bytes()).map_err(|e| {
570            RagError::StorageError(format!("Failed to write checkpoint meta: {}", e))
571        })?;
572
573        // Log checkpoint marker to WAL
574        self.manifest.wal_seq += 1;
575        let entry = WalEntry::new(
576            self.manifest.wal_seq,
577            WalOperation::Checkpoint { checkpoint_id },
578        );
579        if let Some(ref mut writer) = self.wal_writer {
580            writer.append(&entry)?;
581            writer.sync()?;
582        }
583
584        // Update manifest
585        self.manifest.current_checkpoint = Some(checkpoint_id);
586        self.manifest.ops_since_checkpoint = 0;
587        self.manifest.total_documents = meta.document_count;
588        self.manifest.embedding_dim = meta.embedding_dim;
589        self.manifest.index_type = meta.index_type;
590        self.save_manifest()?;
591
592        // Rotate WAL
593        self.rotate_wal(checkpoint_id)?;
594
595        // Clean old checkpoints
596        self.cleanup_old_checkpoints(checkpoint_id)?;
597
598        Ok(checkpoint_meta)
599    }
600
601    /// Load checkpoint and return deserialized data
602    pub fn load_checkpoint<T: for<'de> Deserialize<'de>>(
603        &self,
604    ) -> Result<Option<(T, CheckpointMeta)>> {
605        let checkpoint_id = match self.manifest.current_checkpoint {
606            Some(id) => id,
607            None => return Ok(None),
608        };
609
610        // Load metadata
611        let meta_path = self
612            .base_path
613            .join(format!("checkpoint_{:05}.meta", checkpoint_id));
614        let meta_json = fs::read_to_string(&meta_path).map_err(|e| {
615            RagError::StorageError(format!("Failed to read checkpoint meta: {}", e))
616        })?;
617        let meta: CheckpointMeta = serde_json::from_str(&meta_json).map_err(|e| {
618            RagError::StorageError(format!("Failed to parse checkpoint meta: {}", e))
619        })?;
620
621        // Load and decompress checkpoint
622        let checkpoint_path = self
623            .base_path
624            .join(format!("checkpoint_{:05}.bin", checkpoint_id));
625        let compressed = fs::read(&checkpoint_path)
626            .map_err(|e| RagError::StorageError(format!("Failed to read checkpoint: {}", e)))?;
627        let data = compression::decompress(&compressed)?;
628
629        // Deserialize
630        let index: T = serde_json::from_slice(&data)
631            .map_err(|e| RagError::StorageError(format!("Checkpoint deserialize failed: {}", e)))?;
632
633        Ok(Some((index, meta)))
634    }
635
636    /// Get WAL entries since last checkpoint
637    pub fn get_wal_entries(&self) -> Result<Vec<WalEntry>> {
638        let checkpoint_seq = if let Some(cp_id) = self.manifest.current_checkpoint {
639            // Find the checkpoint marker seq
640            let meta_path = self.base_path.join(format!("checkpoint_{:05}.meta", cp_id));
641            if meta_path.exists() {
642                let meta_json = fs::read_to_string(&meta_path)
643                    .map_err(|e| RagError::StorageError(format!("Failed to read meta: {}", e)))?;
644                let meta: CheckpointMeta = serde_json::from_str(&meta_json)
645                    .map_err(|e| RagError::StorageError(format!("Failed to parse meta: {}", e)))?;
646                meta.wal_seq
647            } else {
648                0
649            }
650        } else {
651            0
652        };
653
654        let wal_path = self.base_path.join(format!(
655            "wal_{:05}.log",
656            self.manifest.current_checkpoint.unwrap_or(0)
657        ));
658
659        if !wal_path.exists() {
660            return Ok(Vec::new());
661        }
662
663        let mut reader = WalReader::open(&wal_path)?;
664        reader.read_from(checkpoint_seq)
665    }
666
667    /// Get current manifest state
668    pub fn manifest(&self) -> &Manifest {
669        &self.manifest
670    }
671
672    /// Get storage statistics
673    pub fn stats(&self) -> StorageStats {
674        let wal_size = self.wal_writer.as_ref().map(|w| w.size()).unwrap_or(0);
675
676        let checkpoint_size = self
677            .manifest
678            .current_checkpoint
679            .map(|id| {
680                let path = self.base_path.join(format!("checkpoint_{:05}.bin", id));
681                fs::metadata(&path).map(|m| m.len() as usize).unwrap_or(0)
682            })
683            .unwrap_or(0);
684
685        StorageStats {
686            checkpoint_id: self.manifest.current_checkpoint,
687            wal_size,
688            checkpoint_size,
689            total_size: wal_size + checkpoint_size,
690            ops_since_checkpoint: self.manifest.ops_since_checkpoint,
691            total_documents: self.manifest.total_documents,
692        }
693    }
694
695    /// Force sync WAL to disk
696    pub fn sync(&mut self) -> Result<()> {
697        if let Some(ref mut writer) = self.wal_writer {
698            writer.sync()?;
699        }
700        self.save_manifest()?;
701        Ok(())
702    }
703
704    fn save_manifest(&self) -> Result<()> {
705        let manifest_path = self.base_path.join("manifest.json");
706        let json = serde_json::to_string_pretty(&self.manifest)
707            .map_err(|e| RagError::StorageError(format!("Failed to serialize manifest: {}", e)))?;
708        Self::write_atomic_file(&manifest_path, json.as_bytes())
709            .map_err(|e| RagError::StorageError(format!("Failed to write manifest: {}", e)))?;
710        Ok(())
711    }
712
713    fn atomic_tmp_path(path: &Path) -> PathBuf {
714        let file_name = path.file_name().and_then(|f| f.to_str()).unwrap_or("file");
715        let counter = ATOMIC_WRITE_COUNTER.fetch_add(1, Ordering::Relaxed);
716        path.with_file_name(format!(
717            "{}.{}.{}.tmp",
718            file_name,
719            std::process::id(),
720            counter
721        ))
722    }
723
724    fn write_atomic_file(path: &Path, data: &[u8]) -> std::io::Result<()> {
725        let tmp_path = Self::atomic_tmp_path(path);
726        {
727            let mut file = File::create(&tmp_path)?;
728            file.write_all(data)?;
729            file.sync_all()?;
730        }
731        fs::rename(&tmp_path, path).inspect_err(|_| {
732            let _ = fs::remove_file(&tmp_path);
733        })?;
734        Ok(())
735    }
736
737    fn rotate_wal(&mut self, checkpoint_id: u64) -> Result<()> {
738        // Close current WAL
739        if let Some(ref mut writer) = self.wal_writer {
740            writer.sync()?;
741        }
742
743        // Open new WAL FIRST (before deleting old).
744        // This ensures we always have a valid WAL even if deletion fails.
745        let new_wal_path = self.base_path.join(format!("wal_{:05}.log", checkpoint_id));
746        let new_writer = WalWriter::open(&new_wal_path, self.config.sync_on_write)?;
747        self.wal_writer = Some(new_writer);
748
749        // Now safe to delete old WAL.
750        let old_wal = self
751            .base_path
752            .join(format!("wal_{:05}.log", checkpoint_id.saturating_sub(1)));
753        if old_wal.exists() && old_wal != new_wal_path {
754            let _ = fs::remove_file(&old_wal);
755        }
756
757        Ok(())
758    }
759
760    fn cleanup_old_checkpoints(&self, current_id: u64) -> Result<()> {
761        let cutoff = if self.config.keep_checkpoints == 0 {
762            current_id.saturating_sub(1)
763        } else {
764            current_id.saturating_sub(self.config.keep_checkpoints as u64)
765        };
766
767        for entry in fs::read_dir(&self.base_path)
768            .map_err(|e| RagError::StorageError(format!("Failed to read dir: {}", e)))?
769        {
770            let entry =
771                entry.map_err(|e| RagError::StorageError(format!("Dir entry error: {}", e)))?;
772            let name = entry.file_name().to_string_lossy().to_string();
773
774            if name.starts_with("checkpoint_") {
775                // Extract checkpoint ID
776                if let Some(id_str) = name
777                    .strip_prefix("checkpoint_")
778                    .and_then(|s| s.split('.').next())
779                {
780                    if let Ok(id) = id_str.parse::<u64>() {
781                        if id <= cutoff {
782                            let _ = fs::remove_file(entry.path());
783                        }
784                    }
785                }
786            }
787        }
788
789        Ok(())
790    }
791}

/// Metadata about the index for checkpointing
///
/// Supplied by the caller to `IncrementalStorage::checkpoint`.
#[derive(Debug, Clone)]
pub struct IndexMetadata {
    /// Number of documents in the index at checkpoint time
    pub document_count: usize,
    /// Embedding dimension of the index
    pub embedding_dim: usize,
    /// Index type label (e.g. "hnsw")
    pub index_type: String,
}

/// Storage statistics
///
/// Snapshot returned by `IncrementalStorage::stats`.
#[derive(Debug, Clone)]
pub struct StorageStats {
    /// Current checkpoint ID
    pub checkpoint_id: Option<u64>,
    /// WAL file size in bytes
    pub wal_size: usize,
    /// Checkpoint file size in bytes
    pub checkpoint_size: usize,
    /// Total storage size
    pub total_size: usize,
    /// Operations since last checkpoint
    pub ops_since_checkpoint: usize,
    /// Total documents in index
    pub total_documents: usize,
}

// ============================================================================
// Recovery Helper
// ============================================================================

/// Helper for recovering an index from storage
pub struct RecoveryHelper<'a> {
    // Borrowed storage to read WAL entries from.
    storage: &'a IncrementalStorage,
}
826
827impl<'a> RecoveryHelper<'a> {
828    pub fn new(storage: &'a IncrementalStorage) -> Self {
829        Self { storage }
830    }
831
832    /// Replay WAL entries on an index
833    pub fn replay_wal<F>(&self, mut apply_op: F) -> Result<usize>
834    where
835        F: FnMut(&WalOperation) -> Result<()>,
836    {
837        let entries = self.storage.get_wal_entries()?;
838        let count = entries.len();
839
840        for entry in entries {
841            match &entry.operation {
842                WalOperation::Checkpoint { .. } => {
843                    // Skip checkpoint markers
844                    continue;
845                }
846                op => apply_op(op)?,
847            }
848        }
849
850        Ok(count)
851    }
852}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a test document with a constant embedding of length `dim`.
    fn create_test_document(id: &str, dim: usize) -> Document {
        Document {
            id: id.to_string(),
            content: format!("Content for {}", id),
            embedding: vec![0.1; dim],
            metadata: None,
        }
    }

    // Builder methods must set exactly the field they name.
    #[test]
    fn test_config_builder() {
        let config = IncrementalConfig::default()
            .with_checkpoint_threshold(5000)
            .with_wal_sync_interval(50)
            .with_max_wal_size(50 * 1024 * 1024)
            .with_sync_on_write(true)
            .with_keep_checkpoints(3);

        assert_eq!(config.checkpoint_threshold, 5000);
        assert_eq!(config.wal_sync_interval, 50);
        assert_eq!(config.max_wal_size, 50 * 1024 * 1024);
        assert!(config.sync_on_write);
        assert_eq!(config.keep_checkpoints, 3);
    }

    // A fresh entry verifies; mutating a hashed field must break verification.
    #[test]
    fn test_wal_entry_integrity() {
        let entry = WalEntry::new(1, WalOperation::Add(create_test_document("doc1", 128)));
        assert!(entry.verify());

        // Tamper with entry
        let mut tampered = entry.clone();
        tampered.seq = 999;
        assert!(!tampered.verify());
    }

    // A brand-new storage directory starts with an empty default manifest.
    #[test]
    fn test_storage_creation() {
        let dir = TempDir::new().unwrap();
        let storage = IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();

        assert!(storage.manifest().current_checkpoint.is_none());
        assert_eq!(storage.manifest().wal_seq, 0);
    }
907
908    #[test]
909    fn test_wal_logging() {
910        let dir = TempDir::new().unwrap();
911        let mut storage =
912            IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
913
914        // Log some operations
915        storage.log_add(&create_test_document("doc1", 128)).unwrap();
916        storage.log_add(&create_test_document("doc2", 128)).unwrap();
917        storage.log_remove("doc1").unwrap();
918
919        assert_eq!(storage.manifest().wal_seq, 3);
920        assert_eq!(storage.manifest().ops_since_checkpoint, 3);
921
922        // Force sync
923        storage.sync().unwrap();
924
925        // Read back WAL
926        let entries = storage.get_wal_entries().unwrap();
927        assert_eq!(entries.len(), 3);
928
929        match &entries[0].operation {
930            WalOperation::Add(doc) => assert_eq!(doc.id, "doc1"),
931            _ => panic!("Expected Add operation"),
932        }
933
934        match &entries[2].operation {
935            WalOperation::Remove(id) => assert_eq!(id, "doc1"),
936            _ => panic!("Expected Remove operation"),
937        }
938    }
939
940    #[test]
941    fn test_checkpoint_and_recovery() {
942        let dir = TempDir::new().unwrap();
943
944        // Create storage and log some operations
945        let mut storage = IncrementalStorage::new(
946            dir.path(),
947            IncrementalConfig::default().with_checkpoint_threshold(100),
948        )
949        .unwrap();
950
951        // Simulate index data (use String for serialization)
952        let test_data: Vec<String> =
953            vec!["doc1".to_string(), "doc2".to_string(), "doc3".to_string()];
954        for id in &test_data {
955            storage.log_add(&create_test_document(id, 128)).unwrap();
956        }
957
958        // Create checkpoint
959        let meta = storage
960            .checkpoint(
961                &test_data,
962                IndexMetadata {
963                    document_count: 3,
964                    embedding_dim: 128,
965                    index_type: "test".to_string(),
966                },
967            )
968            .unwrap();
969
970        assert_eq!(meta.id, 1);
971        assert_eq!(meta.document_count, 3);
972
973        // Log more operations after checkpoint
974        storage.log_add(&create_test_document("doc4", 128)).unwrap();
975        storage.sync().unwrap();
976
977        // Verify we can load checkpoint
978        let (loaded_data, loaded_meta): (Vec<String>, CheckpointMeta) =
979            storage.load_checkpoint().unwrap().unwrap();
980        assert_eq!(loaded_data, test_data);
981        assert_eq!(loaded_meta.id, 1);
982
983        // Verify WAL has the post-checkpoint entry
984        let entries = storage.get_wal_entries().unwrap();
985        assert_eq!(entries.len(), 1);
986        match &entries[0].operation {
987            WalOperation::Add(doc) => assert_eq!(doc.id, "doc4"),
988            _ => panic!("Expected Add operation"),
989        }
990    }
991
992    #[test]
993    fn test_needs_checkpoint() {
994        let dir = TempDir::new().unwrap();
995        let mut storage = IncrementalStorage::new(
996            dir.path(),
997            IncrementalConfig::default().with_checkpoint_threshold(5),
998        )
999        .unwrap();
1000
1001        for i in 0..4 {
1002            storage
1003                .log_add(&create_test_document(&format!("doc{}", i), 128))
1004                .unwrap();
1005        }
1006        assert!(!storage.needs_checkpoint());
1007
1008        storage.log_add(&create_test_document("doc5", 128)).unwrap();
1009        assert!(storage.needs_checkpoint());
1010    }
1011
1012    #[test]
1013    fn test_storage_stats() {
1014        let dir = TempDir::new().unwrap();
1015        let mut storage =
1016            IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1017
1018        for i in 0..10 {
1019            storage
1020                .log_add(&create_test_document(&format!("doc{}", i), 128))
1021                .unwrap();
1022        }
1023        storage.sync().unwrap();
1024
1025        let stats = storage.stats();
1026        assert!(stats.wal_size > 0);
1027        assert_eq!(stats.ops_since_checkpoint, 10);
1028        assert!(stats.checkpoint_id.is_none());
1029    }
1030
1031    #[test]
1032    fn test_recovery_helper() {
1033        let dir = TempDir::new().unwrap();
1034        let mut storage =
1035            IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1036
1037        // Log operations
1038        storage.log_add(&create_test_document("doc1", 128)).unwrap();
1039        storage.log_add(&create_test_document("doc2", 128)).unwrap();
1040        storage.log_remove("doc1").unwrap();
1041        storage.sync().unwrap();
1042
1043        // Use recovery helper
1044        let helper = RecoveryHelper::new(&storage);
1045        let mut adds = 0;
1046        let mut removes = 0;
1047
1048        helper
1049            .replay_wal(|op| {
1050                match op {
1051                    WalOperation::Add(_) => adds += 1,
1052                    WalOperation::Remove(_) => removes += 1,
1053                    _ => {}
1054                }
1055                Ok(())
1056            })
1057            .unwrap();
1058
1059        assert_eq!(adds, 2);
1060        assert_eq!(removes, 1);
1061    }
1062
1063    #[test]
1064    fn test_persistence_across_reopens() {
1065        let dir = TempDir::new().unwrap();
1066
1067        // First session
1068        {
1069            let mut storage =
1070                IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1071            storage.log_add(&create_test_document("doc1", 128)).unwrap();
1072            storage.log_add(&create_test_document("doc2", 128)).unwrap();
1073            storage.sync().unwrap();
1074        }
1075
1076        // Reopen
1077        {
1078            let storage =
1079                IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1080            assert_eq!(storage.manifest().wal_seq, 2);
1081
1082            let entries = storage.get_wal_entries().unwrap();
1083            assert_eq!(entries.len(), 2);
1084        }
1085    }
1086
1087    #[test]
1088    fn test_keep_checkpoints_zero_prunes_old_checkpoints() {
1089        let dir = TempDir::new().unwrap();
1090        let mut storage = IncrementalStorage::new(
1091            dir.path(),
1092            IncrementalConfig::default().with_keep_checkpoints(0),
1093        )
1094        .unwrap();
1095
1096        let data = vec!["doc".to_string()];
1097        for checkpoint_no in 0..3 {
1098            storage
1099                .checkpoint(
1100                    &data,
1101                    IndexMetadata {
1102                        document_count: 1,
1103                        embedding_dim: 128,
1104                        index_type: format!("test_{}", checkpoint_no),
1105                    },
1106                )
1107                .unwrap();
1108        }
1109
1110        let checkpoint_bins: Vec<_> = fs::read_dir(dir.path())
1111            .unwrap()
1112            .filter_map(|e| e.ok())
1113            .map(|e| e.file_name().to_string_lossy().to_string())
1114            .filter(|name| name.starts_with("checkpoint_") && name.ends_with(".bin"))
1115            .collect();
1116
1117        assert_eq!(
1118            checkpoint_bins.len(),
1119            1,
1120            "keep_checkpoints=0 should keep only current checkpoint"
1121        );
1122    }
1123
1124    #[test]
1125    fn test_keep_checkpoints_exact_retention_count() {
1126        let dir = TempDir::new().unwrap();
1127        let mut storage = IncrementalStorage::new(
1128            dir.path(),
1129            IncrementalConfig::default().with_keep_checkpoints(2),
1130        )
1131        .unwrap();
1132
1133        let data = vec!["doc".to_string()];
1134        for checkpoint_no in 0..5 {
1135            storage
1136                .checkpoint(
1137                    &data,
1138                    IndexMetadata {
1139                        document_count: 1,
1140                        embedding_dim: 128,
1141                        index_type: format!("test_{}", checkpoint_no),
1142                    },
1143                )
1144                .unwrap();
1145        }
1146
1147        let checkpoint_bins: Vec<_> = fs::read_dir(dir.path())
1148            .unwrap()
1149            .filter_map(|e| e.ok())
1150            .map(|e| e.file_name().to_string_lossy().to_string())
1151            .filter(|name| name.starts_with("checkpoint_") && name.ends_with(".bin"))
1152            .collect();
1153
1154        assert_eq!(
1155            checkpoint_bins.len(),
1156            2,
1157            "retention should keep exactly keep_checkpoints checkpoint files"
1158        );
1159    }
1160
1161    #[test]
1162    fn test_wal_roundtrip_with_metadata() {
1163        let dir = TempDir::new().unwrap();
1164        let mut storage =
1165            IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1166
1167        let doc = Document {
1168            id: "meta-doc".to_string(),
1169            content: "has metadata".to_string(),
1170            embedding: vec![0.1; 4],
1171            metadata: Some(serde_json::json!({
1172                "scope": "workspace",
1173                "tags": ["rust", "ai"],
1174                "priority": 5
1175            })),
1176        };
1177
1178        storage.log_add(&doc).unwrap();
1179        storage.sync().unwrap();
1180
1181        // Read back and verify metadata survived the roundtrip.
1182        let entries = storage.get_wal_entries().unwrap();
1183        assert_eq!(entries.len(), 1);
1184        match &entries[0].operation {
1185            WalOperation::Add(recovered) => {
1186                assert_eq!(recovered.id, "meta-doc");
1187                let meta = recovered.metadata.as_ref().unwrap();
1188                assert_eq!(meta["scope"], "workspace");
1189                assert_eq!(meta["priority"], 5);
1190                assert_eq!(meta["tags"][0], "rust");
1191            }
1192            _ => panic!("Expected Add operation"),
1193        }
1194    }
1195
1196    #[test]
1197    fn test_checkpoint_roundtrip_with_metadata() {
1198        let dir = TempDir::new().unwrap();
1199        let mut storage =
1200            IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1201
1202        let docs = vec![
1203            Document {
1204                id: "d1".to_string(),
1205                content: "first".to_string(),
1206                embedding: vec![1.0, 0.0],
1207                metadata: Some(serde_json::json!({"lang": "rust"})),
1208            },
1209            Document {
1210                id: "d2".to_string(),
1211                content: "second".to_string(),
1212                embedding: vec![0.0, 1.0],
1213                metadata: None,
1214            },
1215        ];
1216
1217        storage
1218            .checkpoint(
1219                &docs,
1220                IndexMetadata {
1221                    document_count: 2,
1222                    embedding_dim: 2,
1223                    index_type: "hnsw".to_string(),
1224                },
1225            )
1226            .unwrap();
1227
1228        let (loaded, meta): (Vec<Document>, CheckpointMeta) =
1229            storage.load_checkpoint().unwrap().unwrap();
1230        assert_eq!(meta.document_count, 2);
1231        assert_eq!(loaded.len(), 2);
1232
1233        assert_eq!(loaded[0].id, "d1");
1234        assert_eq!(loaded[0].metadata.as_ref().unwrap()["lang"], "rust");
1235
1236        assert_eq!(loaded[1].id, "d2");
1237        assert!(loaded[1].metadata.is_none());
1238    }
1239
1240    #[test]
1241    fn test_recovery_ignores_truncated_tail_entry() {
1242        let dir = TempDir::new().unwrap();
1243        {
1244            let mut storage =
1245                IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1246            storage.log_add(&create_test_document("doc1", 4)).unwrap();
1247            storage.sync().unwrap();
1248        }
1249
1250        let wal_path = dir.path().join("wal_00000.log");
1251        let torn_entry = WalEntry::new(2, WalOperation::Add(create_test_document("doc2", 4)));
1252        let torn_payload = serde_json::to_vec(&torn_entry).unwrap();
1253        let torn_len = torn_payload.len() as u32;
1254
1255        let mut file = OpenOptions::new().append(true).open(&wal_path).unwrap();
1256        file.write_all(&torn_len.to_le_bytes()).unwrap();
1257        file.write_all(&torn_payload[..torn_payload.len() / 2])
1258            .unwrap();
1259        file.sync_all().unwrap();
1260
1261        let storage = IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1262        let entries = storage.get_wal_entries().unwrap();
1263        assert_eq!(entries.len(), 1);
1264        match &entries[0].operation {
1265            WalOperation::Add(doc) => assert_eq!(doc.id, "doc1"),
1266            _ => panic!("expected Add operation"),
1267        }
1268    }
1269
1270    #[test]
1271    fn test_atomic_writes_leave_no_tmp_files_after_repeated_checkpoints() {
1272        let dir = TempDir::new().unwrap();
1273        let mut storage =
1274            IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1275
1276        for i in 0..8 {
1277            let payload = vec![format!("doc-{i}")];
1278            storage
1279                .checkpoint(
1280                    &payload,
1281                    IndexMetadata {
1282                        document_count: 1,
1283                        embedding_dim: 128,
1284                        index_type: "hnsw".to_string(),
1285                    },
1286                )
1287                .unwrap();
1288        }
1289
1290        let has_tmp = fs::read_dir(dir.path())
1291            .unwrap()
1292            .filter_map(|e| e.ok())
1293            .map(|e| e.file_name().to_string_lossy().to_string())
1294            .any(|name| name.ends_with(".tmp"));
1295        assert!(!has_tmp);
1296    }
1297}