rag_plusplus_core/wal/
reader.rs1use crate::error::{Error, Result};
6use crate::wal::entry::WalEntry;
7use std::fs::File;
8use std::io::{BufReader, Read};
9use std::path::Path;
10
11pub struct WalReader {
30 reader: BufReader<File>,
31 entries_read: u64,
32}
33
34impl WalReader {
35 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
41 let file = File::open(path.as_ref()).map_err(|e| Error::WalRecovery {
42 reason: format!("Failed to open WAL file: {e}"),
43 })?;
44
45 Ok(Self {
46 reader: BufReader::new(file),
47 entries_read: 0,
48 })
49 }
50
51 fn read_entry(&mut self) -> Option<Result<WalEntry>> {
53 let mut len_buf = [0u8; 4];
55 match self.reader.read_exact(&mut len_buf) {
56 Ok(()) => {}
57 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
58 return None; }
60 Err(e) => {
61 return Some(Err(Error::WalRecovery {
62 reason: format!("Failed to read length: {e}"),
63 }));
64 }
65 }
66
67 let len = u32::from_le_bytes(len_buf) as usize;
68
69 if len > 10 * 1024 * 1024 {
71 return Some(Err(Error::WalRecovery {
73 reason: format!("Entry size too large: {len}"),
74 }));
75 }
76
77 let mut data = vec![0u8; len];
79 if let Err(e) = self.reader.read_exact(&mut data) {
80 return Some(Err(Error::WalRecovery {
81 reason: format!("Failed to read entry data: {e}"),
82 }));
83 }
84
85 match WalEntry::from_bytes(&data) {
87 Ok(entry) => {
88 self.entries_read += 1;
89 Some(Ok(entry))
90 }
91 Err(e) => Some(Err(Error::WalRecovery {
92 reason: format!("Failed to deserialize entry: {e}"),
93 })),
94 }
95 }
96
97 #[must_use]
99 pub fn entries_read(&self) -> u64 {
100 self.entries_read
101 }
102}
103
104impl Iterator for WalReader {
105 type Item = Result<WalEntry>;
106
107 fn next(&mut self) -> Option<Self::Item> {
108 self.read_entry()
109 }
110}
111
112#[allow(dead_code)]
116pub struct MultiFileReader {
117 files: Vec<std::path::PathBuf>,
118 current_reader: Option<WalReader>,
119 current_index: usize,
120 total_entries: u64,
121}
122
123#[allow(dead_code)]
124impl MultiFileReader {
125 pub fn from_directory(dir: impl AsRef<Path>) -> Result<Self> {
131 let mut files = Vec::new();
132
133 let entries = std::fs::read_dir(dir.as_ref()).map_err(|e| Error::WalRecovery {
134 reason: format!("Failed to read WAL directory: {e}"),
135 })?;
136
137 for entry in entries.flatten() {
138 let path = entry.path();
139 if path.extension().map_or(false, |e| e == "log") {
140 files.push(path);
141 }
142 }
143
144 files.sort();
146
147 Ok(Self {
148 files,
149 current_reader: None,
150 current_index: 0,
151 total_entries: 0,
152 })
153 }
154
155 fn next_file(&mut self) -> Option<Result<()>> {
157 if self.current_index >= self.files.len() {
158 return None;
159 }
160
161 let path = &self.files[self.current_index];
162 self.current_index += 1;
163
164 match WalReader::open(path) {
165 Ok(reader) => {
166 self.current_reader = Some(reader);
167 Some(Ok(()))
168 }
169 Err(e) => Some(Err(e)),
170 }
171 }
172
173 #[must_use]
175 pub fn total_entries(&self) -> u64 {
176 self.total_entries
177 }
178}
179
180impl Iterator for MultiFileReader {
181 type Item = Result<WalEntry>;
182
183 fn next(&mut self) -> Option<Self::Item> {
184 loop {
185 if let Some(reader) = &mut self.current_reader {
187 if let Some(entry) = reader.next() {
188 if entry.is_ok() {
189 self.total_entries += 1;
190 }
191 return Some(entry);
192 }
193 }
194
195 match self.next_file() {
197 Some(Ok(())) => continue,
198 Some(Err(e)) => return Some(Err(e)),
199 None => return None,
200 }
201 }
202 }
203}
204
205#[allow(dead_code)]
218pub fn replay_wal<I, FI, FU, FD>(
219 reader: I,
220 mut on_insert: FI,
221 mut on_update: FU,
222 mut on_delete: FD,
223) -> Result<ReplayStats>
224where
225 I: Iterator<Item = Result<WalEntry>>,
226 FI: FnMut(&WalEntry) -> Result<()>,
227 FU: FnMut(&str, f64) -> Result<()>,
228 FD: FnMut(&str) -> Result<()>,
229{
230 use crate::wal::entry::WalEntryType;
231
232 let mut stats = ReplayStats::default();
233
234 for entry_result in reader {
235 let entry = entry_result?;
236 stats.last_sequence = stats.last_sequence.max(entry.sequence);
237
238 match entry.entry_type {
239 WalEntryType::Insert => {
240 on_insert(&entry)?;
241 stats.inserts += 1;
242 }
243 WalEntryType::UpdateStats => {
244 if let Some(outcome) = entry.outcome {
245 on_update(&entry.record_id, outcome)?;
246 stats.updates += 1;
247 }
248 }
249 WalEntryType::Delete => {
250 on_delete(&entry.record_id)?;
251 stats.deletes += 1;
252 }
253 WalEntryType::Checkpoint => {
254 stats.checkpoints += 1;
255 stats.last_checkpoint_seq = Some(entry.sequence);
256 }
257 }
258
259 stats.total_entries += 1;
260 }
261
262 Ok(stats)
263}
264
/// Counters collected while replaying a WAL.
#[allow(dead_code)]
#[derive(Debug, Clone, Default)]
pub struct ReplayStats {
    /// Entries processed without error, of any type.
    pub total_entries: u64,
    /// Insert entries handled.
    pub inserts: u64,
    /// Stats-update entries handled.
    pub updates: u64,
    /// Delete entries handled.
    pub deletes: u64,
    /// Checkpoint entries seen.
    pub checkpoints: u64,
    /// Highest sequence number seen (0 if nothing was replayed).
    pub last_sequence: u64,
    /// Sequence number of the most recent checkpoint entry, if any.
    pub last_checkpoint_seq: Option<u64>,
}
284
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stats::OutcomeStats;
    use crate::types::{MemoryRecord, RecordStatus};
    use crate::wal::{WalConfig, WalWriter};
    use tempfile::TempDir;

    /// Builds an active record with a fixed embedding and the given id.
    fn create_test_record(id: &str) -> MemoryRecord {
        MemoryRecord {
            id: id.into(),
            embedding: vec![1.0, 2.0, 3.0],
            context: format!("Context for {id}"),
            outcome: 0.5,
            metadata: Default::default(),
            created_at: 1234567890,
            status: RecordStatus::Active,
            stats: OutcomeStats::new(1),
        }
    }

    /// Entries written by `WalWriter` come back from `WalReader` in order.
    #[test]
    fn test_read_written_entries() {
        let temp_dir = TempDir::new().unwrap();

        // The writer lives in its own scope so it is dropped before reading.
        {
            let writer = WalWriter::new(WalConfig::new(temp_dir.path())).unwrap();
            writer.log_insert(&create_test_record("rec-1")).unwrap();
            writer.log_insert(&create_test_record("rec-2")).unwrap();
            writer.log_update_stats(&"rec-1".into(), 0.8).unwrap();
            writer.log_delete(&"rec-2".into()).unwrap();
            writer.flush().unwrap();
        }

        // Locate the first `.log` file in the directory.
        let log_path = std::fs::read_dir(temp_dir.path())
            .unwrap()
            .filter_map(|dir_entry| dir_entry.ok())
            .map(|dir_entry| dir_entry.path())
            .find(|candidate| candidate.extension().map_or(false, |ext| ext == "log"))
            .unwrap();

        let entries: Vec<_> = WalReader::open(log_path)
            .unwrap()
            .filter_map(|item| item.ok())
            .collect();

        assert_eq!(entries.len(), 4);
        assert_eq!(entries[0].record_id, "rec-1");
        assert_eq!(entries[1].record_id, "rec-2");
        assert_eq!(entries[2].record_id, "rec-1");
        assert_eq!(entries[3].record_id, "rec-2");
    }

    /// Entries spread over several files are read back in sequence order.
    #[test]
    fn test_multi_file_reader() {
        let temp_dir = TempDir::new().unwrap();

        {
            // A small size limit so the writer produces more than one file.
            let config = WalConfig::new(temp_dir.path()).with_max_file_size(256);
            let writer = WalWriter::new(config).unwrap();

            for n in 0..20 {
                let record = create_test_record(&format!("rec-{n}"));
                writer.log_insert(&record).unwrap();
            }
            writer.flush().unwrap();

            assert!(writer.list_files().unwrap().len() > 1);
        }

        let entries: Vec<_> = MultiFileReader::from_directory(temp_dir.path())
            .unwrap()
            .filter_map(|item| item.ok())
            .collect();

        assert_eq!(entries.len(), 20);

        // Sequences are expected to run 1, 2, 3, ... in read order.
        for (expected_seq, entry) in (1u64..).zip(entries.iter()) {
            assert_eq!(entry.sequence, expected_seq);
        }
    }

    /// `replay_wal` dispatches each entry type and fills in the stats.
    #[test]
    fn test_replay_wal() {
        let temp_dir = TempDir::new().unwrap();

        {
            let writer = WalWriter::new(WalConfig::new(temp_dir.path())).unwrap();
            writer.log_insert(&create_test_record("rec-1")).unwrap();
            writer.log_insert(&create_test_record("rec-2")).unwrap();
            writer.log_update_stats(&"rec-1".into(), 0.8).unwrap();
            writer.log_checkpoint().unwrap();
            writer.log_delete(&"rec-2".into()).unwrap();
            writer.flush().unwrap();
        }

        let source = MultiFileReader::from_directory(temp_dir.path()).unwrap();

        let mut inserts = Vec::new();
        let mut updates = Vec::new();
        let mut deletes = Vec::new();

        let stats = replay_wal(
            source,
            |inserted| {
                inserts.push(inserted.record_id.clone());
                Ok(())
            },
            |record_id, outcome| {
                updates.push((record_id.to_string(), outcome));
                Ok(())
            },
            |record_id| {
                deletes.push(record_id.to_string());
                Ok(())
            },
        )
        .unwrap();

        assert_eq!(inserts.len(), 2);
        assert_eq!(updates.len(), 1);
        assert_eq!(deletes.len(), 1);
        assert_eq!(stats.checkpoints, 1);
        assert_eq!(stats.last_sequence, 5);
        assert_eq!(stats.last_checkpoint_seq, Some(4));
    }
}