rag_plusplus_core/wal/
reader.rs

//! WAL Reader
//!
//! Reads and replays WAL entries for recovery.
4
5use crate::error::{Error, Result};
6use crate::wal::entry::WalEntry;
7use std::fs::File;
8use std::io::{BufReader, Read};
9use std::path::Path;
10
/// Sequential reader over a single WAL file.
///
/// The file is wrapped in a buffered reader and its entries are handed out
/// one at a time through the [`Iterator`] implementation, which is how
/// recovery code walks the log.
///
/// # Example
///
/// ```ignore
/// use rag_plusplus_core::wal::WalReader;
///
/// let reader = WalReader::open("./wal/wal_00000001.log")?;
///
/// for entry in reader {
///     match entry {
///         Ok(e) => println!("Seq {}: {:?}", e.sequence, e.entry_type),
///         Err(e) => eprintln!("Error: {}", e),
///     }
/// }
/// ```
pub struct WalReader {
    /// Buffered handle to the WAL file being read.
    reader: BufReader<File>,
    /// How many entries have been read and deserialized successfully so far.
    entries_read: u64,
}
33
34impl WalReader {
35    /// Open a WAL file for reading.
36    ///
37    /// # Errors
38    ///
39    /// Returns error if file cannot be opened.
40    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
41        let file = File::open(path.as_ref()).map_err(|e| Error::WalRecovery {
42            reason: format!("Failed to open WAL file: {e}"),
43        })?;
44
45        Ok(Self {
46            reader: BufReader::new(file),
47            entries_read: 0,
48        })
49    }
50
51    /// Read the next entry from the WAL.
52    fn read_entry(&mut self) -> Option<Result<WalEntry>> {
53        // Read length prefix (4 bytes)
54        let mut len_buf = [0u8; 4];
55        match self.reader.read_exact(&mut len_buf) {
56            Ok(()) => {}
57            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
58                return None; // End of file
59            }
60            Err(e) => {
61                return Some(Err(Error::WalRecovery {
62                    reason: format!("Failed to read length: {e}"),
63                }));
64            }
65        }
66
67        let len = u32::from_le_bytes(len_buf) as usize;
68
69        // Sanity check length
70        if len > 10 * 1024 * 1024 {
71            // 10 MB max entry size
72            return Some(Err(Error::WalRecovery {
73                reason: format!("Entry size too large: {len}"),
74            }));
75        }
76
77        // Read entry data
78        let mut data = vec![0u8; len];
79        if let Err(e) = self.reader.read_exact(&mut data) {
80            return Some(Err(Error::WalRecovery {
81                reason: format!("Failed to read entry data: {e}"),
82            }));
83        }
84
85        // Deserialize
86        match WalEntry::from_bytes(&data) {
87            Ok(entry) => {
88                self.entries_read += 1;
89                Some(Ok(entry))
90            }
91            Err(e) => Some(Err(Error::WalRecovery {
92                reason: format!("Failed to deserialize entry: {e}"),
93            })),
94        }
95    }
96
97    /// Number of entries successfully read.
98    #[must_use]
99    pub fn entries_read(&self) -> u64 {
100        self.entries_read
101    }
102}
103
104impl Iterator for WalReader {
105    type Item = Result<WalEntry>;
106
107    fn next(&mut self) -> Option<Self::Item> {
108        self.read_entry()
109    }
110}
111
112/// Read all entries from multiple WAL files.
113///
114/// Files are read in sorted order (by filename).
115#[allow(dead_code)]
116pub struct MultiFileReader {
117    files: Vec<std::path::PathBuf>,
118    current_reader: Option<WalReader>,
119    current_index: usize,
120    total_entries: u64,
121}
122
123#[allow(dead_code)]
124impl MultiFileReader {
125    /// Create reader from a directory of WAL files.
126    ///
127    /// # Errors
128    ///
129    /// Returns error if directory cannot be read.
130    pub fn from_directory(dir: impl AsRef<Path>) -> Result<Self> {
131        let mut files = Vec::new();
132
133        let entries = std::fs::read_dir(dir.as_ref()).map_err(|e| Error::WalRecovery {
134            reason: format!("Failed to read WAL directory: {e}"),
135        })?;
136
137        for entry in entries.flatten() {
138            let path = entry.path();
139            if path.extension().map_or(false, |e| e == "log") {
140                files.push(path);
141            }
142        }
143
144        // Sort by filename for proper ordering
145        files.sort();
146
147        Ok(Self {
148            files,
149            current_reader: None,
150            current_index: 0,
151            total_entries: 0,
152        })
153    }
154
155    /// Move to the next file.
156    fn next_file(&mut self) -> Option<Result<()>> {
157        if self.current_index >= self.files.len() {
158            return None;
159        }
160
161        let path = &self.files[self.current_index];
162        self.current_index += 1;
163
164        match WalReader::open(path) {
165            Ok(reader) => {
166                self.current_reader = Some(reader);
167                Some(Ok(()))
168            }
169            Err(e) => Some(Err(e)),
170        }
171    }
172
173    /// Get total entries read across all files.
174    #[must_use]
175    pub fn total_entries(&self) -> u64 {
176        self.total_entries
177    }
178}
179
180impl Iterator for MultiFileReader {
181    type Item = Result<WalEntry>;
182
183    fn next(&mut self) -> Option<Self::Item> {
184        loop {
185            // Try current reader first
186            if let Some(reader) = &mut self.current_reader {
187                if let Some(entry) = reader.next() {
188                    if entry.is_ok() {
189                        self.total_entries += 1;
190                    }
191                    return Some(entry);
192                }
193            }
194
195            // Move to next file
196            match self.next_file() {
197                Some(Ok(())) => continue,
198                Some(Err(e)) => return Some(Err(e)),
199                None => return None,
200            }
201        }
202    }
203}
204
205/// Replay WAL entries to recover state.
206///
207/// # Arguments
208///
209/// * `reader` - Source of WAL entries
210/// * `on_insert` - Called for Insert entries
211/// * `on_update` - Called for UpdateStats entries
212/// * `on_delete` - Called for Delete entries
213///
214/// # Returns
215///
216/// The last sequence number seen, or 0 if no entries.
217#[allow(dead_code)]
218pub fn replay_wal<I, FI, FU, FD>(
219    reader: I,
220    mut on_insert: FI,
221    mut on_update: FU,
222    mut on_delete: FD,
223) -> Result<ReplayStats>
224where
225    I: Iterator<Item = Result<WalEntry>>,
226    FI: FnMut(&WalEntry) -> Result<()>,
227    FU: FnMut(&str, f64) -> Result<()>,
228    FD: FnMut(&str) -> Result<()>,
229{
230    use crate::wal::entry::WalEntryType;
231
232    let mut stats = ReplayStats::default();
233
234    for entry_result in reader {
235        let entry = entry_result?;
236        stats.last_sequence = stats.last_sequence.max(entry.sequence);
237
238        match entry.entry_type {
239            WalEntryType::Insert => {
240                on_insert(&entry)?;
241                stats.inserts += 1;
242            }
243            WalEntryType::UpdateStats => {
244                if let Some(outcome) = entry.outcome {
245                    on_update(&entry.record_id, outcome)?;
246                    stats.updates += 1;
247                }
248            }
249            WalEntryType::Delete => {
250                on_delete(&entry.record_id)?;
251                stats.deletes += 1;
252            }
253            WalEntryType::Checkpoint => {
254                stats.checkpoints += 1;
255                stats.last_checkpoint_seq = Some(entry.sequence);
256            }
257        }
258
259        stats.total_entries += 1;
260    }
261
262    Ok(stats)
263}
264
/// Summary counters produced by a WAL replay.
///
/// The default value has every counter at zero and no checkpoint recorded.
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct ReplayStats {
    /// Number of entries processed, of any type.
    pub total_entries: u64,
    /// Number of insert operations replayed.
    pub inserts: u64,
    /// Number of update operations replayed.
    pub updates: u64,
    /// Number of delete operations replayed.
    pub deletes: u64,
    /// Number of checkpoint markers encountered.
    pub checkpoints: u64,
    /// Highest sequence number seen (0 when nothing was replayed).
    pub last_sequence: u64,
    /// Sequence number of the most recent checkpoint, if one was seen.
    pub last_checkpoint_seq: Option<u64>,
}
284
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stats::OutcomeStats;
    use crate::types::{MemoryRecord, RecordStatus};
    use crate::wal::{WalConfig, WalWriter};
    use tempfile::TempDir;

    /// Builds an active record with a fixed embedding and outcome for `id`.
    fn create_test_record(id: &str) -> MemoryRecord {
        MemoryRecord {
            id: id.into(),
            embedding: vec![1.0, 2.0, 3.0],
            context: format!("Context for {id}"),
            outcome: 0.5,
            metadata: Default::default(),
            created_at: 1234567890,
            status: RecordStatus::Active,
            stats: OutcomeStats::new(1),
        }
    }

    /// Entries written to one file come back in the order they were logged.
    #[test]
    fn test_read_written_entries() {
        let temp_dir = TempDir::new().unwrap();

        // Log four operations, then drop the writer before reading.
        {
            let writer = WalWriter::new(WalConfig::new(temp_dir.path())).unwrap();
            writer.log_insert(&create_test_record("rec-1")).unwrap();
            writer.log_insert(&create_test_record("rec-2")).unwrap();
            writer.log_update_stats(&"rec-1".into(), 0.8).unwrap();
            writer.log_delete(&"rec-2".into()).unwrap();
            writer.flush().unwrap();
        }

        // Locate the first `.log` file the writer produced.
        let log_path = std::fs::read_dir(temp_dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.path())
            .find(|p| p.extension().map_or(false, |e| e == "log"))
            .unwrap();

        let entries: Vec<_> = WalReader::open(log_path)
            .unwrap()
            .filter_map(|e| e.ok())
            .collect();

        assert_eq!(entries.len(), 4);
        let expected_ids = ["rec-1", "rec-2", "rec-1", "rec-2"];
        for (entry, expected_id) in entries.iter().zip(expected_ids) {
            assert_eq!(entry.record_id, expected_id);
        }
    }

    /// Entries spread over rotated files are read back as one continuous run.
    #[test]
    fn test_multi_file_reader() {
        let temp_dir = TempDir::new().unwrap();

        // A tiny max file size forces the writer to rotate files.
        {
            let config = WalConfig::new(temp_dir.path()).with_max_file_size(256);
            let writer = WalWriter::new(config).unwrap();

            for i in 0..20 {
                let record = create_test_record(&format!("rec-{i}"));
                writer.log_insert(&record).unwrap();
            }
            writer.flush().unwrap();

            let files = writer.list_files().unwrap();
            assert!(files.len() > 1);
        }

        let entries: Vec<_> = MultiFileReader::from_directory(temp_dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .collect();

        assert_eq!(entries.len(), 20);

        // Sequence numbers must run 1, 2, 3, ... without gaps.
        for (entry, expected_seq) in entries.iter().zip(1u64..) {
            assert_eq!(entry.sequence, expected_seq);
        }
    }

    /// Replay routes each entry type to its callback and fills in the stats.
    #[test]
    fn test_replay_wal() {
        let temp_dir = TempDir::new().unwrap();

        // Two inserts, one update, a checkpoint, then a delete.
        {
            let writer = WalWriter::new(WalConfig::new(temp_dir.path())).unwrap();
            writer.log_insert(&create_test_record("rec-1")).unwrap();
            writer.log_insert(&create_test_record("rec-2")).unwrap();
            writer.log_update_stats(&"rec-1".into(), 0.8).unwrap();
            writer.log_checkpoint().unwrap();
            writer.log_delete(&"rec-2".into()).unwrap();
            writer.flush().unwrap();
        }

        let source = MultiFileReader::from_directory(temp_dir.path()).unwrap();

        let mut inserted = Vec::new();
        let mut updated = Vec::new();
        let mut deleted = Vec::new();

        let stats = replay_wal(
            source,
            |entry| {
                inserted.push(entry.record_id.clone());
                Ok(())
            },
            |id, outcome| {
                updated.push((id.to_string(), outcome));
                Ok(())
            },
            |id| {
                deleted.push(id.to_string());
                Ok(())
            },
        )
        .unwrap();

        assert_eq!(inserted.len(), 2);
        assert_eq!(updated.len(), 1);
        assert_eq!(deleted.len(), 1);
        assert_eq!(stats.checkpoints, 1);
        assert_eq!(stats.last_sequence, 5);
        assert_eq!(stats.last_checkpoint_seq, Some(4));
    }
}