1#![cfg(not(target_arch = "wasm32"))]
74
75use crate::storage::compression::{self, Codec};
76use crate::{Document, RagError, Result};
77use serde::{Deserialize, Serialize};
78use std::fs::{self, File, OpenOptions};
79use std::io::{BufReader, BufWriter, Read, Write};
80use std::path::{Path, PathBuf};
81use std::sync::atomic::{AtomicU64, Ordering};
82use std::time::{SystemTime, UNIX_EPOCH};
83
/// Process-wide counter used by `atomic_tmp_path` so that every temporary
/// file name created by this process gets a distinct numeric suffix.
static ATOMIC_WRITE_COUNTER: AtomicU64 = AtomicU64::new(0);
85
/// Tuning knobs for [`IncrementalStorage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalConfig {
    /// Number of logged operations since the last checkpoint at which
    /// `needs_checkpoint` starts returning `true`.
    pub checkpoint_threshold: usize,
    /// Number of logged operations after which the WAL is flushed and synced.
    pub wal_sync_interval: usize,
    /// WAL size in bytes at which `needs_checkpoint` starts returning `true`.
    pub max_wal_size: usize,
    /// Codec handed to `compression::compress_with` when writing a checkpoint.
    pub checkpoint_codec: Codec,
    /// When `true`, the WAL writer syncs after every appended entry.
    pub sync_on_write: bool,
    /// How many of the most recent checkpoints survive cleanup. `0` behaves
    /// like `1`: only the current checkpoint is kept.
    pub keep_checkpoints: usize,
}
106
107impl Default for IncrementalConfig {
108 fn default() -> Self {
109 Self {
110 checkpoint_threshold: 10_000,
111 wal_sync_interval: 100,
112 max_wal_size: 100 * 1024 * 1024, checkpoint_codec: Codec::Gzip,
114 sync_on_write: false,
115 keep_checkpoints: 2,
116 }
117 }
118}
119
120impl IncrementalConfig {
121 pub fn with_checkpoint_threshold(mut self, threshold: usize) -> Self {
123 self.checkpoint_threshold = threshold;
124 self
125 }
126
127 pub fn with_wal_sync_interval(mut self, interval: usize) -> Self {
129 self.wal_sync_interval = interval;
130 self
131 }
132
133 pub fn with_max_wal_size(mut self, size: usize) -> Self {
135 self.max_wal_size = size;
136 self
137 }
138
139 pub fn with_checkpoint_codec(mut self, codec: Codec) -> Self {
141 self.checkpoint_codec = codec;
142 self
143 }
144
145 pub fn with_sync_on_write(mut self, sync: bool) -> Self {
147 self.sync_on_write = sync;
148 self
149 }
150
151 pub fn with_keep_checkpoints(mut self, count: usize) -> Self {
153 self.keep_checkpoints = count;
154 self
155 }
156}
157
/// A single mutation recorded in the write-ahead log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalOperation {
    /// A document was added; the whole document is stored in the log.
    Add(Document),
    /// The document with this id was removed.
    Remove(String),
    /// Logged by `log_clear`.
    Clear,
    /// Marker appended by `checkpoint` after the checkpoint files are written.
    /// `RecoveryHelper::replay_wal` skips it.
    Checkpoint { checkpoint_id: u64 },
}
174
/// One framed record of the write-ahead log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalEntry {
    /// Sequence number assigned from the manifest's `wal_seq` counter.
    pub seq: u64,
    /// Creation time in milliseconds since the Unix epoch.
    pub timestamp: u64,
    /// The logged mutation.
    pub operation: WalOperation,
    /// CRC32 of the JSON serialization of `(seq, timestamp, operation)`.
    pub checksum: u32,
}
187
188impl WalEntry {
189 fn new(seq: u64, operation: WalOperation) -> Self {
191 let timestamp = SystemTime::now()
192 .duration_since(UNIX_EPOCH)
193 .unwrap()
194 .as_millis() as u64;
195
196 let mut entry = Self {
197 seq,
198 timestamp,
199 operation,
200 checksum: 0,
201 };
202
203 entry.checksum = entry.compute_checksum();
204 entry
205 }
206
207 fn compute_checksum(&self) -> u32 {
209 let data = serde_json::to_vec(&(&self.seq, &self.timestamp, &self.operation)).unwrap();
210 crc32fast::hash(&data)
211 }
212
213 pub fn verify(&self) -> bool {
215 self.checksum == self.compute_checksum()
218 }
219}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct Manifest {
228 pub current_checkpoint: Option<u64>,
230 pub wal_seq: u64,
232 pub ops_since_checkpoint: usize,
234 pub total_documents: usize,
236 pub embedding_dim: usize,
238 pub index_type: String,
240 pub last_modified: u64,
242}
243
244impl Default for Manifest {
245 fn default() -> Self {
246 Self {
247 current_checkpoint: None,
248 wal_seq: 0,
249 ops_since_checkpoint: 0,
250 total_documents: 0,
251 embedding_dim: 0,
252 index_type: String::new(),
253 last_modified: 0,
254 }
255 }
256}
257
/// Sidecar metadata written next to every checkpoint as
/// `checkpoint_<id>.meta`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointMeta {
    /// Checkpoint id; also used in the checkpoint file names.
    pub id: u64,
    /// WAL sequence number at the time the checkpoint was taken. Only WAL
    /// entries with a greater sequence number are returned for replay.
    pub wal_seq: u64,
    /// Document count supplied by the caller.
    pub document_count: usize,
    /// Embedding dimension supplied by the caller.
    pub embedding_dim: usize,
    /// Index type supplied by the caller.
    pub index_type: String,
    /// Creation time in seconds since the Unix epoch.
    pub created_at: u64,
    /// Size in bytes of the JSON-serialized index before compression.
    pub original_size: usize,
    /// Size in bytes of the compressed checkpoint payload.
    pub compressed_size: usize,
    /// Codec the payload was compressed with.
    pub codec: Codec,
}
284
/// Append-only writer for a single WAL file.
struct WalWriter {
    /// Buffered handle to the WAL file, opened in append mode.
    file: BufWriter<File>,
    // Stored at open time but not read anywhere in this file, hence the allow.
    #[allow(dead_code)]
    path: PathBuf,
    /// Bytes in the WAL: the file length at open plus everything appended
    /// since (including data that is still buffered).
    current_size: usize,
    /// When `true`, every `append` is followed by a flush and `sync_all`.
    sync_on_write: bool,
}
297
298impl WalWriter {
299 fn open(path: &Path, sync_on_write: bool) -> Result<Self> {
301 let file = OpenOptions::new()
302 .create(true)
303 .append(true)
304 .open(path)
305 .map_err(|e| RagError::StorageError(format!("Failed to open WAL: {}", e)))?;
306
307 let current_size = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
308
309 Ok(Self {
310 file: BufWriter::new(file),
311 path: path.to_path_buf(),
312 current_size,
313 sync_on_write,
314 })
315 }
316
317 fn append(&mut self, entry: &WalEntry) -> Result<()> {
319 let data = serde_json::to_vec(entry)
320 .map_err(|e| RagError::StorageError(format!("WAL serialize failed: {}", e)))?;
321 let len = data.len() as u32;
322
323 self.file
325 .write_all(&len.to_le_bytes())
326 .map_err(|e| RagError::StorageError(format!("WAL write failed: {}", e)))?;
327 self.file
328 .write_all(&data)
329 .map_err(|e| RagError::StorageError(format!("WAL write failed: {}", e)))?;
330
331 self.current_size += 4 + data.len();
332
333 if self.sync_on_write {
334 self.sync()?;
335 }
336
337 Ok(())
338 }
339
340 fn sync(&mut self) -> Result<()> {
342 self.file
343 .flush()
344 .map_err(|e| RagError::StorageError(format!("WAL sync failed: {}", e)))?;
345 self.file
346 .get_ref()
347 .sync_all()
348 .map_err(|e| RagError::StorageError(format!("WAL sync failed: {}", e)))?;
349 Ok(())
350 }
351
352 fn size(&self) -> usize {
354 self.current_size
355 }
356}
357
/// Sequential reader over a single WAL file.
struct WalReader {
    /// Buffered read handle to the WAL file.
    file: BufReader<File>,
}
366
367impl WalReader {
368 fn open(path: &Path) -> Result<Self> {
370 let file = File::open(path)
371 .map_err(|e| RagError::StorageError(format!("Failed to open WAL: {}", e)))?;
372 Ok(Self {
373 file: BufReader::new(file),
374 })
375 }
376
377 fn read_from(&mut self, from_seq: u64) -> Result<Vec<WalEntry>> {
379 let mut entries = Vec::new();
380 let mut len_buf = [0u8; 4];
381
382 loop {
383 match self.file.read_exact(&mut len_buf) {
385 Ok(()) => {}
386 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
387 Err(e) => return Err(RagError::StorageError(format!("WAL read failed: {}", e))),
388 }
389
390 let len = u32::from_le_bytes(len_buf) as usize;
391 let mut data = vec![0u8; len];
392 match self.file.read_exact(&mut data) {
393 Ok(()) => {}
394 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
395 Err(e) => return Err(RagError::StorageError(format!("WAL read failed: {}", e))),
396 }
397
398 let entry: WalEntry = serde_json::from_slice(&data)
399 .map_err(|e| RagError::StorageError(format!("WAL deserialize failed: {}", e)))?;
400
401 if !entry.verify() {
403 return Err(RagError::StorageError(format!(
404 "WAL entry {} failed integrity check",
405 entry.seq
406 )));
407 }
408
409 if entry.seq > from_seq {
411 entries.push(entry);
412 }
413 }
414
415 Ok(entries)
416 }
417}
418
/// Directory-backed storage that combines compressed checkpoints with a
/// write-ahead log of the operations performed since the last checkpoint.
pub struct IncrementalStorage {
    /// Directory holding the manifest, WAL and checkpoint files.
    base_path: PathBuf,
    /// Behaviour settings supplied at construction.
    config: IncrementalConfig,
    /// In-memory copy of `manifest.json`; written back by `save_manifest`.
    manifest: Manifest,
    /// Writer for the active WAL file.
    wal_writer: Option<WalWriter>,
    /// Operations appended since the WAL was last synced by the interval logic.
    ops_since_sync: usize,
}
433
434impl IncrementalStorage {
435 pub fn new<P: AsRef<Path>>(base_path: P, config: IncrementalConfig) -> Result<Self> {
437 let base_path = base_path.as_ref().to_path_buf();
438
439 fs::create_dir_all(&base_path)
441 .map_err(|e| RagError::StorageError(format!("Failed to create storage dir: {}", e)))?;
442
443 let manifest_path = base_path.join("manifest.json");
445 let manifest = if manifest_path.exists() {
446 let data = fs::read_to_string(&manifest_path)
447 .map_err(|e| RagError::StorageError(format!("Failed to read manifest: {}", e)))?;
448 serde_json::from_str(&data)
449 .map_err(|e| RagError::StorageError(format!("Failed to parse manifest: {}", e)))?
450 } else {
451 Manifest::default()
452 };
453
454 let wal_path = base_path.join(format!(
456 "wal_{:05}.log",
457 manifest.current_checkpoint.unwrap_or(0)
458 ));
459 let wal_writer = WalWriter::open(&wal_path, config.sync_on_write)?;
460
461 Ok(Self {
462 base_path,
463 config,
464 manifest,
465 wal_writer: Some(wal_writer),
466 ops_since_sync: 0,
467 })
468 }
469
470 pub fn log_add(&mut self, doc: &Document) -> Result<()> {
472 self.log_operation(WalOperation::Add(doc.clone()))
473 }
474
475 pub fn log_remove(&mut self, id: &str) -> Result<()> {
477 self.log_operation(WalOperation::Remove(id.to_string()))
478 }
479
480 pub fn log_clear(&mut self) -> Result<()> {
482 self.log_operation(WalOperation::Clear)
483 }
484
485 fn log_operation(&mut self, operation: WalOperation) -> Result<()> {
487 self.manifest.wal_seq += 1;
488 self.manifest.ops_since_checkpoint += 1;
489
490 let entry = WalEntry::new(self.manifest.wal_seq, operation);
491
492 if let Some(ref mut writer) = self.wal_writer {
493 writer.append(&entry)?;
494 self.ops_since_sync += 1;
495
496 if self.ops_since_sync >= self.config.wal_sync_interval {
498 writer.sync()?;
499 self.ops_since_sync = 0;
500 }
501 }
502
503 self.manifest.last_modified = SystemTime::now()
505 .duration_since(UNIX_EPOCH)
506 .unwrap()
507 .as_secs();
508
509 Ok(())
510 }
511
512 pub fn needs_checkpoint(&self) -> bool {
514 self.manifest.ops_since_checkpoint >= self.config.checkpoint_threshold
515 || self.wal_writer.as_ref().map(|w| w.size()).unwrap_or(0) >= self.config.max_wal_size
516 }
517
    /// Writes a new checkpoint of `index` and makes it the current one.
    ///
    /// Steps, in order: sync the WAL, serialize `index` to JSON and compress
    /// it, write `checkpoint_<id>.bin` and `checkpoint_<id>.meta` atomically,
    /// append a `Checkpoint` marker to the WAL, update and save the manifest,
    /// rotate to a fresh WAL and prune old checkpoint files.
    ///
    /// Returns the metadata that was written to the `.meta` file.
    ///
    /// # Errors
    ///
    /// Returns `RagError::StorageError` when serialization, any file write,
    /// the WAL append/sync, the manifest save, WAL rotation or the directory
    /// scan during cleanup fails; compression errors are propagated as-is.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the Unix epoch.
    pub fn checkpoint<T: Serialize>(
        &mut self,
        index: &T,
        meta: IndexMetadata,
    ) -> Result<CheckpointMeta> {
        // Make everything logged so far durable before snapshotting.
        if let Some(ref mut writer) = self.wal_writer {
            writer.sync()?;
        }

        // Ids start at 1 and increase by one per checkpoint.
        let checkpoint_id = self.manifest.current_checkpoint.map(|c| c + 1).unwrap_or(1);

        let data = serde_json::to_vec(index)
            .map_err(|e| RagError::StorageError(format!("Checkpoint serialize failed: {}", e)))?;
        let original_size = data.len();

        let (compressed, _stats) = compression::compress_with(&data, self.config.checkpoint_codec)?;
        let compressed_size = compressed.len();

        let checkpoint_path = self
            .base_path
            .join(format!("checkpoint_{:05}.bin", checkpoint_id));
        Self::write_atomic_file(&checkpoint_path, &compressed)
            .map_err(|e| RagError::StorageError(format!("Failed to write checkpoint: {}", e)))?;

        // `wal_seq` is captured before the marker entry below is numbered, so
        // the marker's sequence number is greater than the recorded value.
        let checkpoint_meta = CheckpointMeta {
            id: checkpoint_id,
            wal_seq: self.manifest.wal_seq,
            document_count: meta.document_count,
            embedding_dim: meta.embedding_dim,
            index_type: meta.index_type.clone(),
            created_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            original_size,
            compressed_size,
            codec: self.config.checkpoint_codec,
        };

        let meta_path = self
            .base_path
            .join(format!("checkpoint_{:05}.meta", checkpoint_id));
        let meta_json = serde_json::to_string_pretty(&checkpoint_meta)
            .map_err(|e| RagError::StorageError(format!("Failed to serialize meta: {}", e)))?;
        Self::write_atomic_file(&meta_path, meta_json.as_bytes()).map_err(|e| {
            RagError::StorageError(format!("Failed to write checkpoint meta: {}", e))
        })?;

        // Record the checkpoint in the (still old) WAL.
        self.manifest.wal_seq += 1;
        let entry = WalEntry::new(
            self.manifest.wal_seq,
            WalOperation::Checkpoint { checkpoint_id },
        );
        if let Some(ref mut writer) = self.wal_writer {
            writer.append(&entry)?;
            writer.sync()?;
        }

        // The manifest is switched to the new checkpoint only after both
        // checkpoint files and the marker have been written.
        self.manifest.current_checkpoint = Some(checkpoint_id);
        self.manifest.ops_since_checkpoint = 0;
        self.manifest.total_documents = meta.document_count;
        self.manifest.embedding_dim = meta.embedding_dim;
        self.manifest.index_type = meta.index_type;
        self.save_manifest()?;

        // Start a fresh WAL named after the new checkpoint id.
        self.rotate_wal(checkpoint_id)?;

        self.cleanup_old_checkpoints(checkpoint_id)?;

        Ok(checkpoint_meta)
    }
600
601 pub fn load_checkpoint<T: for<'de> Deserialize<'de>>(
603 &self,
604 ) -> Result<Option<(T, CheckpointMeta)>> {
605 let checkpoint_id = match self.manifest.current_checkpoint {
606 Some(id) => id,
607 None => return Ok(None),
608 };
609
610 let meta_path = self
612 .base_path
613 .join(format!("checkpoint_{:05}.meta", checkpoint_id));
614 let meta_json = fs::read_to_string(&meta_path).map_err(|e| {
615 RagError::StorageError(format!("Failed to read checkpoint meta: {}", e))
616 })?;
617 let meta: CheckpointMeta = serde_json::from_str(&meta_json).map_err(|e| {
618 RagError::StorageError(format!("Failed to parse checkpoint meta: {}", e))
619 })?;
620
621 let checkpoint_path = self
623 .base_path
624 .join(format!("checkpoint_{:05}.bin", checkpoint_id));
625 let compressed = fs::read(&checkpoint_path)
626 .map_err(|e| RagError::StorageError(format!("Failed to read checkpoint: {}", e)))?;
627 let data = compression::decompress(&compressed)?;
628
629 let index: T = serde_json::from_slice(&data)
631 .map_err(|e| RagError::StorageError(format!("Checkpoint deserialize failed: {}", e)))?;
632
633 Ok(Some((index, meta)))
634 }
635
636 pub fn get_wal_entries(&self) -> Result<Vec<WalEntry>> {
638 let checkpoint_seq = if let Some(cp_id) = self.manifest.current_checkpoint {
639 let meta_path = self.base_path.join(format!("checkpoint_{:05}.meta", cp_id));
641 if meta_path.exists() {
642 let meta_json = fs::read_to_string(&meta_path)
643 .map_err(|e| RagError::StorageError(format!("Failed to read meta: {}", e)))?;
644 let meta: CheckpointMeta = serde_json::from_str(&meta_json)
645 .map_err(|e| RagError::StorageError(format!("Failed to parse meta: {}", e)))?;
646 meta.wal_seq
647 } else {
648 0
649 }
650 } else {
651 0
652 };
653
654 let wal_path = self.base_path.join(format!(
655 "wal_{:05}.log",
656 self.manifest.current_checkpoint.unwrap_or(0)
657 ));
658
659 if !wal_path.exists() {
660 return Ok(Vec::new());
661 }
662
663 let mut reader = WalReader::open(&wal_path)?;
664 reader.read_from(checkpoint_seq)
665 }
666
    /// Returns the in-memory manifest.
    pub fn manifest(&self) -> &Manifest {
        &self.manifest
    }
671
672 pub fn stats(&self) -> StorageStats {
674 let wal_size = self.wal_writer.as_ref().map(|w| w.size()).unwrap_or(0);
675
676 let checkpoint_size = self
677 .manifest
678 .current_checkpoint
679 .map(|id| {
680 let path = self.base_path.join(format!("checkpoint_{:05}.bin", id));
681 fs::metadata(&path).map(|m| m.len() as usize).unwrap_or(0)
682 })
683 .unwrap_or(0);
684
685 StorageStats {
686 checkpoint_id: self.manifest.current_checkpoint,
687 wal_size,
688 checkpoint_size,
689 total_size: wal_size + checkpoint_size,
690 ops_since_checkpoint: self.manifest.ops_since_checkpoint,
691 total_documents: self.manifest.total_documents,
692 }
693 }
694
695 pub fn sync(&mut self) -> Result<()> {
697 if let Some(ref mut writer) = self.wal_writer {
698 writer.sync()?;
699 }
700 self.save_manifest()?;
701 Ok(())
702 }
703
704 fn save_manifest(&self) -> Result<()> {
705 let manifest_path = self.base_path.join("manifest.json");
706 let json = serde_json::to_string_pretty(&self.manifest)
707 .map_err(|e| RagError::StorageError(format!("Failed to serialize manifest: {}", e)))?;
708 Self::write_atomic_file(&manifest_path, json.as_bytes())
709 .map_err(|e| RagError::StorageError(format!("Failed to write manifest: {}", e)))?;
710 Ok(())
711 }
712
713 fn atomic_tmp_path(path: &Path) -> PathBuf {
714 let file_name = path.file_name().and_then(|f| f.to_str()).unwrap_or("file");
715 let counter = ATOMIC_WRITE_COUNTER.fetch_add(1, Ordering::Relaxed);
716 path.with_file_name(format!(
717 "{}.{}.{}.tmp",
718 file_name,
719 std::process::id(),
720 counter
721 ))
722 }
723
724 fn write_atomic_file(path: &Path, data: &[u8]) -> std::io::Result<()> {
725 let tmp_path = Self::atomic_tmp_path(path);
726 {
727 let mut file = File::create(&tmp_path)?;
728 file.write_all(data)?;
729 file.sync_all()?;
730 }
731 fs::rename(&tmp_path, path).inspect_err(|_| {
732 let _ = fs::remove_file(&tmp_path);
733 })?;
734 Ok(())
735 }
736
737 fn rotate_wal(&mut self, checkpoint_id: u64) -> Result<()> {
738 if let Some(ref mut writer) = self.wal_writer {
740 writer.sync()?;
741 }
742
743 let new_wal_path = self.base_path.join(format!("wal_{:05}.log", checkpoint_id));
746 let new_writer = WalWriter::open(&new_wal_path, self.config.sync_on_write)?;
747 self.wal_writer = Some(new_writer);
748
749 let old_wal = self
751 .base_path
752 .join(format!("wal_{:05}.log", checkpoint_id.saturating_sub(1)));
753 if old_wal.exists() && old_wal != new_wal_path {
754 let _ = fs::remove_file(&old_wal);
755 }
756
757 Ok(())
758 }
759
760 fn cleanup_old_checkpoints(&self, current_id: u64) -> Result<()> {
761 let cutoff = if self.config.keep_checkpoints == 0 {
762 current_id.saturating_sub(1)
763 } else {
764 current_id.saturating_sub(self.config.keep_checkpoints as u64)
765 };
766
767 for entry in fs::read_dir(&self.base_path)
768 .map_err(|e| RagError::StorageError(format!("Failed to read dir: {}", e)))?
769 {
770 let entry =
771 entry.map_err(|e| RagError::StorageError(format!("Dir entry error: {}", e)))?;
772 let name = entry.file_name().to_string_lossy().to_string();
773
774 if name.starts_with("checkpoint_") {
775 if let Some(id_str) = name
777 .strip_prefix("checkpoint_")
778 .and_then(|s| s.split('.').next())
779 {
780 if let Ok(id) = id_str.parse::<u64>() {
781 if id <= cutoff {
782 let _ = fs::remove_file(entry.path());
783 }
784 }
785 }
786 }
787 }
788
789 Ok(())
790 }
791}
792
/// Caller-supplied description of the index being checkpointed; its values
/// are copied into the checkpoint metadata and the manifest.
#[derive(Debug, Clone)]
pub struct IndexMetadata {
    /// Number of documents in the index.
    pub document_count: usize,
    /// Dimension of the embeddings in the index.
    pub embedding_dim: usize,
    /// Free-form index type label.
    pub index_type: String,
}
800
/// Snapshot of storage size and counters, as produced by
/// `IncrementalStorage::stats`.
#[derive(Debug, Clone)]
pub struct StorageStats {
    /// Id of the current checkpoint, if any.
    pub checkpoint_id: Option<u64>,
    /// Size of the active WAL in bytes, as tracked by the writer.
    pub wal_size: usize,
    /// On-disk size in bytes of the current checkpoint payload (0 if none).
    pub checkpoint_size: usize,
    /// `wal_size + checkpoint_size`.
    pub total_size: usize,
    /// Operations logged since the last checkpoint.
    pub ops_since_checkpoint: usize,
    /// Document count recorded in the manifest.
    pub total_documents: usize,
}
817
/// Replays the write-ahead log of a borrowed [`IncrementalStorage`].
pub struct RecoveryHelper<'a> {
    /// Storage whose WAL entries are replayed.
    storage: &'a IncrementalStorage,
}
826
827impl<'a> RecoveryHelper<'a> {
828 pub fn new(storage: &'a IncrementalStorage) -> Self {
829 Self { storage }
830 }
831
832 pub fn replay_wal<F>(&self, mut apply_op: F) -> Result<usize>
834 where
835 F: FnMut(&WalOperation) -> Result<()>,
836 {
837 let entries = self.storage.get_wal_entries()?;
838 let count = entries.len();
839
840 for entry in entries {
841 match &entry.operation {
842 WalOperation::Checkpoint { .. } => {
843 continue;
845 }
846 op => apply_op(op)?,
847 }
848 }
849
850 Ok(count)
851 }
852}
853
854#[cfg(test)]
859mod tests {
860 use super::*;
861 use tempfile::TempDir;
862
863 fn create_test_document(id: &str, dim: usize) -> Document {
864 Document {
865 id: id.to_string(),
866 content: format!("Content for {}", id),
867 embedding: vec![0.1; dim],
868 metadata: None,
869 }
870 }
871
872 #[test]
873 fn test_config_builder() {
874 let config = IncrementalConfig::default()
875 .with_checkpoint_threshold(5000)
876 .with_wal_sync_interval(50)
877 .with_max_wal_size(50 * 1024 * 1024)
878 .with_sync_on_write(true)
879 .with_keep_checkpoints(3);
880
881 assert_eq!(config.checkpoint_threshold, 5000);
882 assert_eq!(config.wal_sync_interval, 50);
883 assert_eq!(config.max_wal_size, 50 * 1024 * 1024);
884 assert!(config.sync_on_write);
885 assert_eq!(config.keep_checkpoints, 3);
886 }
887
888 #[test]
889 fn test_wal_entry_integrity() {
890 let entry = WalEntry::new(1, WalOperation::Add(create_test_document("doc1", 128)));
891 assert!(entry.verify());
892
893 let mut tampered = entry.clone();
895 tampered.seq = 999;
896 assert!(!tampered.verify());
897 }
898
899 #[test]
900 fn test_storage_creation() {
901 let dir = TempDir::new().unwrap();
902 let storage = IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
903
904 assert!(storage.manifest().current_checkpoint.is_none());
905 assert_eq!(storage.manifest().wal_seq, 0);
906 }
907
908 #[test]
909 fn test_wal_logging() {
910 let dir = TempDir::new().unwrap();
911 let mut storage =
912 IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
913
914 storage.log_add(&create_test_document("doc1", 128)).unwrap();
916 storage.log_add(&create_test_document("doc2", 128)).unwrap();
917 storage.log_remove("doc1").unwrap();
918
919 assert_eq!(storage.manifest().wal_seq, 3);
920 assert_eq!(storage.manifest().ops_since_checkpoint, 3);
921
922 storage.sync().unwrap();
924
925 let entries = storage.get_wal_entries().unwrap();
927 assert_eq!(entries.len(), 3);
928
929 match &entries[0].operation {
930 WalOperation::Add(doc) => assert_eq!(doc.id, "doc1"),
931 _ => panic!("Expected Add operation"),
932 }
933
934 match &entries[2].operation {
935 WalOperation::Remove(id) => assert_eq!(id, "doc1"),
936 _ => panic!("Expected Remove operation"),
937 }
938 }
939
940 #[test]
941 fn test_checkpoint_and_recovery() {
942 let dir = TempDir::new().unwrap();
943
944 let mut storage = IncrementalStorage::new(
946 dir.path(),
947 IncrementalConfig::default().with_checkpoint_threshold(100),
948 )
949 .unwrap();
950
951 let test_data: Vec<String> =
953 vec!["doc1".to_string(), "doc2".to_string(), "doc3".to_string()];
954 for id in &test_data {
955 storage.log_add(&create_test_document(id, 128)).unwrap();
956 }
957
958 let meta = storage
960 .checkpoint(
961 &test_data,
962 IndexMetadata {
963 document_count: 3,
964 embedding_dim: 128,
965 index_type: "test".to_string(),
966 },
967 )
968 .unwrap();
969
970 assert_eq!(meta.id, 1);
971 assert_eq!(meta.document_count, 3);
972
973 storage.log_add(&create_test_document("doc4", 128)).unwrap();
975 storage.sync().unwrap();
976
977 let (loaded_data, loaded_meta): (Vec<String>, CheckpointMeta) =
979 storage.load_checkpoint().unwrap().unwrap();
980 assert_eq!(loaded_data, test_data);
981 assert_eq!(loaded_meta.id, 1);
982
983 let entries = storage.get_wal_entries().unwrap();
985 assert_eq!(entries.len(), 1);
986 match &entries[0].operation {
987 WalOperation::Add(doc) => assert_eq!(doc.id, "doc4"),
988 _ => panic!("Expected Add operation"),
989 }
990 }
991
992 #[test]
993 fn test_needs_checkpoint() {
994 let dir = TempDir::new().unwrap();
995 let mut storage = IncrementalStorage::new(
996 dir.path(),
997 IncrementalConfig::default().with_checkpoint_threshold(5),
998 )
999 .unwrap();
1000
1001 for i in 0..4 {
1002 storage
1003 .log_add(&create_test_document(&format!("doc{}", i), 128))
1004 .unwrap();
1005 }
1006 assert!(!storage.needs_checkpoint());
1007
1008 storage.log_add(&create_test_document("doc5", 128)).unwrap();
1009 assert!(storage.needs_checkpoint());
1010 }
1011
1012 #[test]
1013 fn test_storage_stats() {
1014 let dir = TempDir::new().unwrap();
1015 let mut storage =
1016 IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1017
1018 for i in 0..10 {
1019 storage
1020 .log_add(&create_test_document(&format!("doc{}", i), 128))
1021 .unwrap();
1022 }
1023 storage.sync().unwrap();
1024
1025 let stats = storage.stats();
1026 assert!(stats.wal_size > 0);
1027 assert_eq!(stats.ops_since_checkpoint, 10);
1028 assert!(stats.checkpoint_id.is_none());
1029 }
1030
1031 #[test]
1032 fn test_recovery_helper() {
1033 let dir = TempDir::new().unwrap();
1034 let mut storage =
1035 IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1036
1037 storage.log_add(&create_test_document("doc1", 128)).unwrap();
1039 storage.log_add(&create_test_document("doc2", 128)).unwrap();
1040 storage.log_remove("doc1").unwrap();
1041 storage.sync().unwrap();
1042
1043 let helper = RecoveryHelper::new(&storage);
1045 let mut adds = 0;
1046 let mut removes = 0;
1047
1048 helper
1049 .replay_wal(|op| {
1050 match op {
1051 WalOperation::Add(_) => adds += 1,
1052 WalOperation::Remove(_) => removes += 1,
1053 _ => {}
1054 }
1055 Ok(())
1056 })
1057 .unwrap();
1058
1059 assert_eq!(adds, 2);
1060 assert_eq!(removes, 1);
1061 }
1062
1063 #[test]
1064 fn test_persistence_across_reopens() {
1065 let dir = TempDir::new().unwrap();
1066
1067 {
1069 let mut storage =
1070 IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1071 storage.log_add(&create_test_document("doc1", 128)).unwrap();
1072 storage.log_add(&create_test_document("doc2", 128)).unwrap();
1073 storage.sync().unwrap();
1074 }
1075
1076 {
1078 let storage =
1079 IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1080 assert_eq!(storage.manifest().wal_seq, 2);
1081
1082 let entries = storage.get_wal_entries().unwrap();
1083 assert_eq!(entries.len(), 2);
1084 }
1085 }
1086
1087 #[test]
1088 fn test_keep_checkpoints_zero_prunes_old_checkpoints() {
1089 let dir = TempDir::new().unwrap();
1090 let mut storage = IncrementalStorage::new(
1091 dir.path(),
1092 IncrementalConfig::default().with_keep_checkpoints(0),
1093 )
1094 .unwrap();
1095
1096 let data = vec!["doc".to_string()];
1097 for checkpoint_no in 0..3 {
1098 storage
1099 .checkpoint(
1100 &data,
1101 IndexMetadata {
1102 document_count: 1,
1103 embedding_dim: 128,
1104 index_type: format!("test_{}", checkpoint_no),
1105 },
1106 )
1107 .unwrap();
1108 }
1109
1110 let checkpoint_bins: Vec<_> = fs::read_dir(dir.path())
1111 .unwrap()
1112 .filter_map(|e| e.ok())
1113 .map(|e| e.file_name().to_string_lossy().to_string())
1114 .filter(|name| name.starts_with("checkpoint_") && name.ends_with(".bin"))
1115 .collect();
1116
1117 assert_eq!(
1118 checkpoint_bins.len(),
1119 1,
1120 "keep_checkpoints=0 should keep only current checkpoint"
1121 );
1122 }
1123
1124 #[test]
1125 fn test_keep_checkpoints_exact_retention_count() {
1126 let dir = TempDir::new().unwrap();
1127 let mut storage = IncrementalStorage::new(
1128 dir.path(),
1129 IncrementalConfig::default().with_keep_checkpoints(2),
1130 )
1131 .unwrap();
1132
1133 let data = vec!["doc".to_string()];
1134 for checkpoint_no in 0..5 {
1135 storage
1136 .checkpoint(
1137 &data,
1138 IndexMetadata {
1139 document_count: 1,
1140 embedding_dim: 128,
1141 index_type: format!("test_{}", checkpoint_no),
1142 },
1143 )
1144 .unwrap();
1145 }
1146
1147 let checkpoint_bins: Vec<_> = fs::read_dir(dir.path())
1148 .unwrap()
1149 .filter_map(|e| e.ok())
1150 .map(|e| e.file_name().to_string_lossy().to_string())
1151 .filter(|name| name.starts_with("checkpoint_") && name.ends_with(".bin"))
1152 .collect();
1153
1154 assert_eq!(
1155 checkpoint_bins.len(),
1156 2,
1157 "retention should keep exactly keep_checkpoints checkpoint files"
1158 );
1159 }
1160
1161 #[test]
1162 fn test_wal_roundtrip_with_metadata() {
1163 let dir = TempDir::new().unwrap();
1164 let mut storage =
1165 IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1166
1167 let doc = Document {
1168 id: "meta-doc".to_string(),
1169 content: "has metadata".to_string(),
1170 embedding: vec![0.1; 4],
1171 metadata: Some(serde_json::json!({
1172 "scope": "workspace",
1173 "tags": ["rust", "ai"],
1174 "priority": 5
1175 })),
1176 };
1177
1178 storage.log_add(&doc).unwrap();
1179 storage.sync().unwrap();
1180
1181 let entries = storage.get_wal_entries().unwrap();
1183 assert_eq!(entries.len(), 1);
1184 match &entries[0].operation {
1185 WalOperation::Add(recovered) => {
1186 assert_eq!(recovered.id, "meta-doc");
1187 let meta = recovered.metadata.as_ref().unwrap();
1188 assert_eq!(meta["scope"], "workspace");
1189 assert_eq!(meta["priority"], 5);
1190 assert_eq!(meta["tags"][0], "rust");
1191 }
1192 _ => panic!("Expected Add operation"),
1193 }
1194 }
1195
1196 #[test]
1197 fn test_checkpoint_roundtrip_with_metadata() {
1198 let dir = TempDir::new().unwrap();
1199 let mut storage =
1200 IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1201
1202 let docs = vec![
1203 Document {
1204 id: "d1".to_string(),
1205 content: "first".to_string(),
1206 embedding: vec![1.0, 0.0],
1207 metadata: Some(serde_json::json!({"lang": "rust"})),
1208 },
1209 Document {
1210 id: "d2".to_string(),
1211 content: "second".to_string(),
1212 embedding: vec![0.0, 1.0],
1213 metadata: None,
1214 },
1215 ];
1216
1217 storage
1218 .checkpoint(
1219 &docs,
1220 IndexMetadata {
1221 document_count: 2,
1222 embedding_dim: 2,
1223 index_type: "hnsw".to_string(),
1224 },
1225 )
1226 .unwrap();
1227
1228 let (loaded, meta): (Vec<Document>, CheckpointMeta) =
1229 storage.load_checkpoint().unwrap().unwrap();
1230 assert_eq!(meta.document_count, 2);
1231 assert_eq!(loaded.len(), 2);
1232
1233 assert_eq!(loaded[0].id, "d1");
1234 assert_eq!(loaded[0].metadata.as_ref().unwrap()["lang"], "rust");
1235
1236 assert_eq!(loaded[1].id, "d2");
1237 assert!(loaded[1].metadata.is_none());
1238 }
1239
1240 #[test]
1241 fn test_recovery_ignores_truncated_tail_entry() {
1242 let dir = TempDir::new().unwrap();
1243 {
1244 let mut storage =
1245 IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1246 storage.log_add(&create_test_document("doc1", 4)).unwrap();
1247 storage.sync().unwrap();
1248 }
1249
1250 let wal_path = dir.path().join("wal_00000.log");
1251 let torn_entry = WalEntry::new(2, WalOperation::Add(create_test_document("doc2", 4)));
1252 let torn_payload = serde_json::to_vec(&torn_entry).unwrap();
1253 let torn_len = torn_payload.len() as u32;
1254
1255 let mut file = OpenOptions::new().append(true).open(&wal_path).unwrap();
1256 file.write_all(&torn_len.to_le_bytes()).unwrap();
1257 file.write_all(&torn_payload[..torn_payload.len() / 2])
1258 .unwrap();
1259 file.sync_all().unwrap();
1260
1261 let storage = IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1262 let entries = storage.get_wal_entries().unwrap();
1263 assert_eq!(entries.len(), 1);
1264 match &entries[0].operation {
1265 WalOperation::Add(doc) => assert_eq!(doc.id, "doc1"),
1266 _ => panic!("expected Add operation"),
1267 }
1268 }
1269
1270 #[test]
1271 fn test_atomic_writes_leave_no_tmp_files_after_repeated_checkpoints() {
1272 let dir = TempDir::new().unwrap();
1273 let mut storage =
1274 IncrementalStorage::new(dir.path(), IncrementalConfig::default()).unwrap();
1275
1276 for i in 0..8 {
1277 let payload = vec![format!("doc-{i}")];
1278 storage
1279 .checkpoint(
1280 &payload,
1281 IndexMetadata {
1282 document_count: 1,
1283 embedding_dim: 128,
1284 index_type: "hnsw".to_string(),
1285 },
1286 )
1287 .unwrap();
1288 }
1289
1290 let has_tmp = fs::read_dir(dir.path())
1291 .unwrap()
1292 .filter_map(|e| e.ok())
1293 .map(|e| e.file_name().to_string_lossy().to_string())
1294 .any(|name| name.ends_with(".tmp"));
1295 assert!(!has_tmp);
1296 }
1297}