Skip to main content

batuta/oracle/rag/
persistence.rs

1//! RAG Index Persistence - Section 9.7 of oracle-mode-spec.md
2//!
3//! Persistent storage for the RAG index at `~/.cache/batuta/rag/`.
4//! Uses JSON format with BLAKE3 checksums for integrity validation.
5//!
6//! # Toyota Production System Principles
7//!
8//! - **Jidoka**: Graceful degradation on corruption (rebuild instead of crash)
9//! - **Poka-Yoke**: Version compatibility prevents format mismatches
10//! - **Heijunka**: Incremental updates via fingerprint-based invalidation
11//! - **Muda**: JSON for debugging, future P2 uses bincode
12
13use super::fingerprint::{blake3_hash, DocumentFingerprint};
14use super::types::{Bm25Config, RrfConfig};
15use super::IndexedDocument;
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::fs;
19use std::io::{self, Write};
20use std::path::{Path, PathBuf};
21
/// On-disk index format version (semver `major.minor.patch`).
///
/// When a cache is loaded only the major component is compared; a different
/// major version is rejected as incompatible.
///
/// History — 1.1.0: added `chunk_contents`, stemming, stop words, TF-IDF dense search.
pub const INDEX_VERSION: &str = "1.1.0";

/// Cache subdirectory, joined onto the base cache directory.
const CACHE_SUBDIR: &str = "batuta/rag";

// ---- File names inside the cache directory ----

/// Manifest: format version, data-file checksums, and corpus sources.
const MANIFEST_FILE: &str = "manifest.json";
/// Serialized inverted index.
const INDEX_FILE: &str = "index.json";
/// Serialized document metadata.
const DOCUMENTS_FILE: &str = "documents.json";
/// Fingerprints only — a lightweight file read for fast `is_index_current` checks.
const FINGERPRINTS_FILE: &str = "fingerprints.json";
41
42/// Persisted RAG index manifest
43///
44/// Contains metadata and checksums for integrity validation.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct RagManifest {
47    /// Index format version (semver)
48    pub version: String,
49    /// BLAKE3 checksum of index.json
50    pub index_checksum: [u8; 32],
51    /// BLAKE3 checksum of documents.json
52    pub docs_checksum: [u8; 32],
53    /// Indexed corpus sources
54    pub sources: Vec<CorpusSource>,
55    /// Unix timestamp when indexed (milliseconds)
56    pub indexed_at: u64,
57    /// Batuta version that created this index
58    pub batuta_version: String,
59}
60
61/// Source corpus information
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct CorpusSource {
64    /// Corpus identifier (e.g., "trueno", "hf-ground-truth-corpus")
65    pub id: String,
66    /// Git commit hash at index time (if available)
67    pub commit: Option<String>,
68    /// Number of documents indexed from this source
69    pub doc_count: usize,
70    /// Number of chunks indexed from this source
71    pub chunk_count: usize,
72}
73
74/// Serializable inverted index state
75#[derive(Debug, Clone, Default, Serialize, Deserialize)]
76pub struct PersistedIndex {
77    /// Inverted index: term -> (doc_id -> term_frequency)
78    pub inverted_index: HashMap<String, HashMap<String, usize>>,
79    /// Document lengths for BM25
80    pub doc_lengths: HashMap<String, usize>,
81    /// BM25 configuration
82    pub bm25_config: Bm25Config,
83    /// RRF configuration
84    pub rrf_config: RrfConfig,
85    /// Average document length
86    pub avg_doc_length: f64,
87}
88
89/// Serializable document metadata
90#[derive(Debug, Clone, Default, Serialize, Deserialize)]
91pub struct PersistedDocuments {
92    /// Documents by ID
93    pub documents: HashMap<String, IndexedDocument>,
94    /// Fingerprints for change detection
95    pub fingerprints: HashMap<String, DocumentFingerprint>,
96    /// Total chunks indexed
97    pub total_chunks: usize,
98    /// Chunk content snippets (first 200 chars) for result display
99    #[serde(default)]
100    pub chunk_contents: HashMap<String, String>,
101}
102
103/// Persistence errors
104#[derive(Debug, thiserror::Error)]
105pub enum PersistenceError {
106    /// I/O error
107    #[error("I/O error: {0}")]
108    Io(#[from] io::Error),
109
110    /// JSON serialization error
111    #[error("JSON error: {0}")]
112    Json(#[from] serde_json::Error),
113
114    /// Checksum mismatch (Jidoka halt)
115    #[error("Checksum mismatch for {file}: expected {expected:x?}, got {actual:x?}")]
116    ChecksumMismatch { file: String, expected: [u8; 32], actual: [u8; 32] },
117
118    /// Version mismatch
119    #[error("Version mismatch: index version {index_version}, expected {expected_version}")]
120    VersionMismatch { index_version: String, expected_version: String },
121
122    /// Cache directory not found
123    #[error("Cache directory not found")]
124    CacheDirNotFound,
125
126    /// Manifest not found (no cached index)
127    #[error("No cached index found")]
128    NoCachedIndex,
129}
130
/// Saves and loads the RAG index under a single cache directory.
#[derive(Debug)]
pub struct RagPersistence {
    /// Directory holding the manifest, index, documents, and fingerprints files.
    cache_path: PathBuf,
}
139
140impl RagPersistence {
141    /// Create persistence manager with default cache path
142    ///
143    /// Default path: `~/.cache/batuta/rag/`
144    pub fn new() -> Self {
145        Self { cache_path: Self::default_cache_path() }
146    }
147
148    /// Create persistence manager with custom cache path
149    pub fn with_path(path: PathBuf) -> Self {
150        Self { cache_path: path }
151    }
152
153    /// Get default cache path
154    ///
155    /// Uses `dirs::cache_dir()` for platform-specific cache location.
156    fn default_cache_path() -> PathBuf {
157        #[cfg(feature = "native")]
158        {
159            dirs::cache_dir().unwrap_or_else(|| PathBuf::from(".cache")).join(CACHE_SUBDIR)
160        }
161        #[cfg(not(feature = "native"))]
162        {
163            PathBuf::from(".cache").join(CACHE_SUBDIR)
164        }
165    }
166
167    /// Get the cache path
168    pub fn cache_path(&self) -> &Path {
169        &self.cache_path
170    }
171
172    /// Save index to disk using two-phase commit
173    ///
174    /// Writes three files with crash safety:
175    /// - **Prepare phase**: Write all `.tmp` files (crash here = old cache intact)
176    /// - **Commit phase**: Rename all 3, manifest LAST (crash before manifest
177    ///   rename = old manifest still valid or checksum mismatch triggers rebuild)
178    ///
179    /// Files written:
180    /// - `manifest.json`: Version and checksums
181    /// - `index.json`: Inverted index data
182    /// - `documents.json`: Document metadata
183    pub fn save(
184        &self,
185        index: &PersistedIndex,
186        docs: &PersistedDocuments,
187        sources: Vec<CorpusSource>,
188    ) -> Result<(), PersistenceError> {
189        // Ensure cache directory exists
190        fs::create_dir_all(&self.cache_path)?;
191
192        // Clean up any orphaned .tmp files from a previous crashed save
193        self.cleanup_tmp_files();
194
195        // Serialize index and documents
196        let index_json = serde_json::to_string_pretty(index)?;
197        let docs_json = serde_json::to_string_pretty(docs)?;
198
199        // Serialize fingerprints separately for fast is_index_current checks
200        let fingerprints_json = serde_json::to_string_pretty(&docs.fingerprints)?;
201
202        // Compute checksums
203        let index_checksum = blake3_hash(index_json.as_bytes());
204        let docs_checksum = blake3_hash(docs_json.as_bytes());
205
206        // Create manifest
207        let manifest = RagManifest {
208            version: INDEX_VERSION.to_string(),
209            index_checksum,
210            docs_checksum,
211            sources,
212            indexed_at: current_timestamp_ms(),
213            batuta_version: env!("CARGO_PKG_VERSION").to_string(),
214        };
215        let manifest_json = serde_json::to_string_pretty(&manifest)?;
216
217        // Phase 1: Prepare — write all .tmp files (crash here = old cache intact)
218        self.prepare_write(INDEX_FILE, index_json.as_bytes())?;
219        self.prepare_write(DOCUMENTS_FILE, docs_json.as_bytes())?;
220        self.prepare_write(FINGERPRINTS_FILE, fingerprints_json.as_bytes())?;
221        self.prepare_write(MANIFEST_FILE, manifest_json.as_bytes())?;
222
223        // Phase 2: Commit — rename all, manifest LAST
224        // Crash before manifest rename = old manifest checksums won't match new
225        // data files, which triggers graceful rebuild on next load().
226        self.commit_rename(INDEX_FILE)?;
227        self.commit_rename(DOCUMENTS_FILE)?;
228        self.commit_rename(FINGERPRINTS_FILE)?;
229        self.commit_rename(MANIFEST_FILE)?;
230
231        Ok(())
232    }
233
234    /// Load index from disk
235    ///
236    /// Returns `None` if no cached index exists or if the cache is corrupted
237    /// (IO error, checksum mismatch, invalid JSON). Corruption triggers a
238    /// warning to stderr so the caller can rebuild gracefully.
239    ///
240    /// Returns `Err` only for `VersionMismatch` (incompatible format requires
241    /// a code update, not just a re-index).
242    pub fn load(
243        &self,
244    ) -> Result<Option<(PersistedIndex, PersistedDocuments, RagManifest)>, PersistenceError> {
245        let manifest_path = self.cache_path.join(MANIFEST_FILE);
246
247        // Check if manifest exists
248        if !manifest_path.exists() {
249            return Ok(None);
250        }
251
252        // Load manifest — graceful on IO/JSON errors
253        let manifest_json = match fs::read_to_string(&manifest_path) {
254            Ok(s) => s,
255            Err(e) => {
256                eprintln!("Warning: failed to read RAG manifest, will rebuild: {e}");
257                return Ok(None);
258            }
259        };
260        let manifest: RagManifest = match serde_json::from_str(&manifest_json) {
261            Ok(m) => m,
262            Err(e) => {
263                eprintln!("Warning: corrupt RAG manifest JSON, will rebuild: {e}");
264                return Ok(None);
265            }
266        };
267
268        // Validate version (Poka-Yoke) — hard error, needs code update
269        self.validate_version(&manifest)?;
270
271        // Load and validate index — graceful on IO/JSON/checksum errors
272        let index_json = match fs::read_to_string(self.cache_path.join(INDEX_FILE)) {
273            Ok(s) => s,
274            Err(e) => {
275                eprintln!("Warning: failed to read RAG index file, will rebuild: {e}");
276                return Ok(None);
277            }
278        };
279        if let Err(e) = self.validate_checksum(&index_json, manifest.index_checksum, "index.json") {
280            eprintln!("Warning: {e}, will rebuild");
281            return Ok(None);
282        }
283        let index: PersistedIndex = match serde_json::from_str(&index_json) {
284            Ok(i) => i,
285            Err(e) => {
286                eprintln!("Warning: corrupt RAG index JSON, will rebuild: {e}");
287                return Ok(None);
288            }
289        };
290
291        // Load and validate documents — graceful on IO/JSON/checksum errors
292        let docs_json = match fs::read_to_string(self.cache_path.join(DOCUMENTS_FILE)) {
293            Ok(s) => s,
294            Err(e) => {
295                eprintln!("Warning: failed to read RAG documents file, will rebuild: {e}");
296                return Ok(None);
297            }
298        };
299        if let Err(e) = self.validate_checksum(&docs_json, manifest.docs_checksum, "documents.json")
300        {
301            eprintln!("Warning: {e}, will rebuild");
302            return Ok(None);
303        }
304        let docs: PersistedDocuments = match serde_json::from_str(&docs_json) {
305            Ok(d) => d,
306            Err(e) => {
307                eprintln!("Warning: corrupt RAG documents JSON, will rebuild: {e}");
308                return Ok(None);
309            }
310        };
311
312        Ok(Some((index, docs, manifest)))
313    }
314
315    /// Load only fingerprints for fast `is_index_current` checks.
316    ///
317    /// Reads ~KB fingerprints.json instead of ~600MB (index.json + documents.json).
318    /// Falls back to full `load()` if fingerprints.json doesn't exist (pre-upgrade cache).
319    pub fn load_fingerprints_only(
320        &self,
321    ) -> Result<Option<HashMap<String, DocumentFingerprint>>, PersistenceError> {
322        let fp_path = self.cache_path.join(FINGERPRINTS_FILE);
323
324        if fp_path.exists() {
325            let fp_json = match fs::read_to_string(&fp_path) {
326                Ok(s) => s,
327                Err(_) => return self.load_fingerprints_fallback(),
328            };
329            match serde_json::from_str(&fp_json) {
330                Ok(fps) => return Ok(Some(fps)),
331                Err(_) => return self.load_fingerprints_fallback(),
332            }
333        }
334
335        // Fallback: fingerprints.json doesn't exist (pre-upgrade cache)
336        self.load_fingerprints_fallback()
337    }
338
339    /// Fallback: extract fingerprints from full documents.json load
340    fn load_fingerprints_fallback(
341        &self,
342    ) -> Result<Option<HashMap<String, DocumentFingerprint>>, PersistenceError> {
343        self.load().map(|opt| opt.map(|(_, docs, _)| docs.fingerprints))
344    }
345
346    /// Save only fingerprints.json for fast `is_index_current` checks.
347    ///
348    /// Used by the SQLite indexing path to persist fingerprints without
349    /// writing the full 600MB JSON index/documents files.
350    pub fn save_fingerprints_only(
351        &self,
352        fingerprints: &HashMap<String, DocumentFingerprint>,
353    ) -> Result<(), PersistenceError> {
354        fs::create_dir_all(&self.cache_path)?;
355        let fingerprints_json = serde_json::to_string_pretty(fingerprints)?;
356        self.prepare_write(FINGERPRINTS_FILE, fingerprints_json.as_bytes())?;
357        self.commit_rename(FINGERPRINTS_FILE)?;
358        Ok(())
359    }
360
361    /// Clear cached index
362    pub fn clear(&self) -> Result<(), PersistenceError> {
363        if self.cache_path.exists() {
364            // Remove individual files
365            let _ = fs::remove_file(self.cache_path.join(MANIFEST_FILE));
366            let _ = fs::remove_file(self.cache_path.join(INDEX_FILE));
367            let _ = fs::remove_file(self.cache_path.join(DOCUMENTS_FILE));
368            let _ = fs::remove_file(self.cache_path.join(FINGERPRINTS_FILE));
369
370            // Try to remove directory if empty
371            let _ = fs::remove_dir(&self.cache_path);
372        }
373        Ok(())
374    }
375
376    /// Get index statistics without full load
377    pub fn stats(&self) -> Result<Option<RagManifest>, PersistenceError> {
378        let manifest_path = self.cache_path.join(MANIFEST_FILE);
379
380        if !manifest_path.exists() {
381            return Ok(None);
382        }
383
384        let manifest_json = fs::read_to_string(&manifest_path)?;
385        let manifest: RagManifest = serde_json::from_str(&manifest_json)?;
386
387        Ok(Some(manifest))
388    }
389
390    /// Phase 1: Write data to a `.tmp` file (prepare)
391    fn prepare_write(&self, filename: &str, data: &[u8]) -> Result<(), io::Error> {
392        let tmp_path = self.cache_path.join(format!("{}.tmp", filename));
393
394        let mut file = fs::File::create(&tmp_path)?;
395        file.write_all(data)?;
396        file.sync_all()?;
397
398        Ok(())
399    }
400
401    /// Phase 2: Rename `.tmp` file to final path (commit)
402    fn commit_rename(&self, filename: &str) -> Result<(), io::Error> {
403        let tmp_path = self.cache_path.join(format!("{}.tmp", filename));
404        let final_path = self.cache_path.join(filename);
405
406        fs::rename(&tmp_path, &final_path)?;
407
408        Ok(())
409    }
410
411    /// Remove orphaned `.tmp` files from a previous crashed save
412    fn cleanup_tmp_files(&self) {
413        for filename in &[MANIFEST_FILE, INDEX_FILE, DOCUMENTS_FILE] {
414            let tmp_path = self.cache_path.join(format!("{}.tmp", filename));
415            let _ = fs::remove_file(tmp_path);
416        }
417    }
418
419    /// Validate version compatibility (Poka-Yoke)
420    fn validate_version(&self, manifest: &RagManifest) -> Result<(), PersistenceError> {
421        // Parse versions
422        let index_parts: Vec<&str> = manifest.version.split('.').collect();
423        let expected_parts: Vec<&str> = INDEX_VERSION.split('.').collect();
424
425        // Major version must match for compatibility
426        if index_parts.first() != expected_parts.first() {
427            return Err(PersistenceError::VersionMismatch {
428                index_version: manifest.version.clone(),
429                expected_version: INDEX_VERSION.to_string(),
430            });
431        }
432
433        Ok(())
434    }
435
436    /// Validate checksum (Jidoka)
437    fn validate_checksum(
438        &self,
439        data: &str,
440        expected: [u8; 32],
441        filename: &str,
442    ) -> Result<(), PersistenceError> {
443        let actual = blake3_hash(data.as_bytes());
444
445        if actual != expected {
446            return Err(PersistenceError::ChecksumMismatch {
447                file: filename.to_string(),
448                expected,
449                actual,
450            });
451        }
452
453        Ok(())
454    }
455}
456
457impl Default for RagPersistence {
458    fn default() -> Self {
459        Self::new()
460    }
461}
462
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock is set before the epoch. Saturates at
/// `u64::MAX` instead of silently truncating the `u128` millisecond count.
fn current_timestamp_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // `as_millis` yields u128; clamp first so the final cast is lossless.
        .map(|elapsed| elapsed.as_millis().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
470
// Unit tests live in a sibling file so this module stays focused on the
// persistence logic; compiled only for test builds.
#[cfg(test)]
#[path = "persistence_tests.rs"]
mod tests;