rustlite_wal/
reader.rs

1// WAL reader module - reads and replays log records
2//
3// The reader handles:
4// 1. Segment discovery - finding all WAL segment files in order
5// 2. Record reading - iterating through records in each segment
6// 3. CRC validation - verifying data integrity of each record
7
8use crate::record::WalRecord;
9use crate::writer::WalHeader;
10use rustlite_core::{Error, Result};
11use std::fs::File;
12use std::io::{BufReader, Read};
13use std::path::{Path, PathBuf};
14use tracing::debug;
15
/// Sequential reader over the segment files of a write-ahead log.
///
/// Holds the ordered list of discovered segment files together with a cursor
/// (segment index, open buffered file handle, byte offset) describing where
/// the next record will be read from.
#[derive(Debug)]
pub struct WalReader {
    /// Segment file paths, kept in sorted order (the order they are read in)
    segments: Vec<PathBuf>,
    /// Index into `segments` of the segment most recently opened
    current_segment_index: usize,
    /// Buffered handle on the current segment; `None` when no segment is open
    /// (the WAL is empty, or every segment has been read to its end)
    reader: Option<BufReader<File>>,
    /// Byte offset within the current segment, advanced as records are read
    current_offset: u64,
}
27
28impl WalReader {
29    /// Create a new WAL reader for the given WAL directory
30    pub fn new(wal_dir: &Path) -> Result<Self> {
31        let segments = Self::discover_segments(wal_dir)?;
32
33        let mut reader = Self {
34            segments,
35            current_segment_index: 0,
36            reader: None,
37            current_offset: 0,
38        };
39
40        // Open first segment if available
41        if !reader.segments.is_empty() {
42            reader.open_segment(0)?;
43        }
44
45        Ok(reader)
46    }
47
48    /// Discover and sort all WAL segment files in the directory
49    fn discover_segments(wal_dir: &Path) -> Result<Vec<PathBuf>> {
50        if !wal_dir.exists() {
51            return Ok(Vec::new());
52        }
53
54        let mut segments: Vec<PathBuf> = std::fs::read_dir(wal_dir)
55            .map_err(|e| Error::Storage(format!("Failed to read WAL directory: {}", e)))?
56            .filter_map(|entry| entry.ok())
57            .map(|entry| entry.path())
58            .filter(|path| path.extension().map(|ext| ext == "log").unwrap_or(false))
59            .collect();
60
61        // Sort segments by filename (which contains sequence number)
62        segments.sort();
63
64        Ok(segments)
65    }
66
67    /// Open a segment file by index
68    fn open_segment(&mut self, index: usize) -> Result<()> {
69        if index >= self.segments.len() {
70            return Err(Error::Storage("Segment index out of bounds".to_string()));
71        }
72
73        let path = &self.segments[index];
74        let file = File::open(path)
75            .map_err(|e| Error::Storage(format!("Failed to open segment {:?}: {}", path, e)))?;
76
77        let mut reader = BufReader::new(file);
78
79        // Try to read header (v1.0+)
80        // If header is missing or invalid, assume legacy format (v0.x)
81        let header_offset = match WalHeader::read_from(&mut reader) {
82            Ok(header) => {
83                debug!(
84                    segment = ?path,
85                    version = header.version,
86                    "Opened WAL segment with header"
87                );
88                WalHeader::SIZE as u64
89            }
90            Err(_) => {
91                // No valid header, must be legacy format - reopen to reset position
92                let file = File::open(path).map_err(|e| {
93                    Error::Storage(format!("Failed to reopen segment {:?}: {}", path, e))
94                })?;
95                reader = BufReader::new(file);
96                debug!(segment = ?path, "Opened legacy WAL segment (pre-v1.0)");
97                0
98            }
99        };
100
101        self.reader = Some(reader);
102        self.current_segment_index = index;
103        self.current_offset = header_offset;
104
105        Ok(())
106    }
107
108    /// Move to the next segment
109    fn advance_segment(&mut self) -> Result<bool> {
110        let next_index = self.current_segment_index + 1;
111        if next_index >= self.segments.len() {
112            self.reader = None;
113            return Ok(false);
114        }
115
116        self.open_segment(next_index)?;
117        Ok(true)
118    }
119
120    /// Read the next record from the WAL
121    ///
122    /// Returns `Ok(Some(record))` if a record was read successfully,
123    /// `Ok(None)` if we've reached the end of all segments,
124    /// or an error if reading/parsing failed.
125    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
126        loop {
127            let reader = match &mut self.reader {
128                Some(r) => r,
129                None => return Ok(None), // No more segments
130            };
131
132            // Try to read a record from current segment
133            match Self::read_record(reader) {
134                Ok(Some((record, bytes_read))) => {
135                    self.current_offset += bytes_read as u64;
136                    return Ok(Some(record));
137                }
138                Ok(None) => {
139                    // End of current segment, try next
140                    if !self.advance_segment()? {
141                        return Ok(None);
142                    }
143                    // Continue loop to read from new segment
144                }
145                Err(e) => {
146                    // Check if this is an incomplete record at end of file
147                    // (possible crash during write)
148                    if Self::is_truncation_error(&e) {
149                        // Try to advance to next segment
150                        if !self.advance_segment()? {
151                            return Ok(None);
152                        }
153                        // Continue loop to read from new segment
154                    } else {
155                        return Err(e);
156                    }
157                }
158            }
159        }
160    }
161
162    /// Read a single record from a reader
163    ///
164    /// Returns the record and number of bytes consumed
165    fn read_record(reader: &mut BufReader<File>) -> Result<Option<(WalRecord, usize)>> {
166        // Read length field (4 bytes)
167        let mut len_buf = [0u8; 4];
168        match reader.read_exact(&mut len_buf) {
169            Ok(()) => {}
170            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
171                return Ok(None); // End of file
172            }
173            Err(e) => {
174                return Err(Error::Storage(format!(
175                    "Failed to read record length: {}",
176                    e
177                )));
178            }
179        }
180
181        let content_len = u32::from_le_bytes(len_buf) as usize;
182
183        // Sanity check on length (max 16MB per record)
184        if content_len > 16 * 1024 * 1024 {
185            return Err(Error::Storage(format!(
186                "Record length too large: {} bytes",
187                content_len
188            )));
189        }
190
191        // Read content (type + payload) and CRC
192        let total_data_len = content_len + 4; // content + crc
193        let mut data = vec![0u8; total_data_len];
194
195        reader.read_exact(&mut data).map_err(|e| {
196            if e.kind() == std::io::ErrorKind::UnexpectedEof {
197                Error::Serialization("Incomplete record: truncated".to_string())
198            } else {
199                Error::Storage(format!("Failed to read record data: {}", e))
200            }
201        })?;
202
203        // Build full frame for decoding
204        let mut frame = Vec::with_capacity(4 + total_data_len);
205        frame.extend_from_slice(&len_buf);
206        frame.extend_from_slice(&data);
207
208        // Decode record (includes CRC validation)
209        let (record, bytes_consumed) = WalRecord::decode(&frame)?;
210
211        Ok(Some((record, bytes_consumed)))
212    }
213
214    /// Check if an error indicates a truncated/incomplete record
215    fn is_truncation_error(err: &Error) -> bool {
216        match err {
217            Error::Serialization(msg) => msg.contains("Incomplete") || msg.contains("truncated"),
218            _ => false,
219        }
220    }
221
222    /// Get the number of segments discovered
223    pub fn segment_count(&self) -> usize {
224        self.segments.len()
225    }
226
227    /// Get the current segment index being read
228    pub fn current_segment(&self) -> usize {
229        self.current_segment_index
230    }
231
232    /// Reset reader to the beginning
233    pub fn reset(&mut self) -> Result<()> {
234        if !self.segments.is_empty() {
235            self.open_segment(0)?;
236        } else {
237            self.reader = None;
238            self.current_segment_index = 0;
239            self.current_offset = 0;
240        }
241        Ok(())
242    }
243
244    /// Seek to a specific segment
245    pub fn seek_to_segment(&mut self, index: usize) -> Result<()> {
246        if index >= self.segments.len() {
247            return Err(Error::Storage(format!(
248                "Segment index {} out of range (have {} segments)",
249                index,
250                self.segments.len()
251            )));
252        }
253        self.open_segment(index)
254    }
255
256    /// Read all remaining records into a vector
257    pub fn read_all(&mut self) -> Result<Vec<WalRecord>> {
258        let mut records = Vec::new();
259        while let Some(record) = self.next_record()? {
260            records.push(record);
261        }
262        Ok(records)
263    }
264}
265
266/// Iterator implementation for WalReader
267impl Iterator for WalReader {
268    type Item = Result<WalRecord>;
269
270    fn next(&mut self) -> Option<Self::Item> {
271        match self.next_record() {
272            Ok(Some(record)) => Some(Ok(record)),
273            Ok(None) => None,
274            Err(e) => Some(Err(e)),
275        }
276    }
277}
278
279#[cfg(test)]
280mod tests {
281    use super::*;
282    use crate::{SyncMode, WalWriter};
283    use tempfile::TempDir;
284
285    fn setup_test_wal() -> (TempDir, PathBuf) {
286        let temp_dir = TempDir::new().expect("Failed to create temp dir");
287        let wal_path = temp_dir.path().join("wal");
288        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
289        (temp_dir, wal_path)
290    }
291
292    #[test]
293    fn test_empty_wal_reader() {
294        let (_temp_dir, wal_path) = setup_test_wal();
295
296        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
297        assert_eq!(reader.segment_count(), 0);
298        assert!(reader.next_record().unwrap().is_none());
299    }
300
301    #[test]
302    fn test_read_single_record() {
303        let (_temp_dir, wal_path) = setup_test_wal();
304
305        // Write a record
306        {
307            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
308                .expect("Failed to create writer");
309            let record = WalRecord::put(b"key1".to_vec(), b"value1".to_vec());
310            writer.append(record).expect("Failed to append");
311            writer.sync().expect("Failed to sync");
312        }
313
314        // Read it back
315        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
316        assert_eq!(reader.segment_count(), 1);
317
318        let record = reader.next().unwrap().expect("Expected a record");
319        match &record.payload {
320            crate::record::RecordPayload::Put { key, value } => {
321                assert_eq!(key, b"key1");
322                assert_eq!(value, b"value1");
323            }
324            _ => panic!("Expected Put record"),
325        }
326
327        assert!(reader.next_record().unwrap().is_none());
328    }
329
330    #[test]
331    fn test_read_multiple_records() {
332        let (_temp_dir, wal_path) = setup_test_wal();
333
334        // Write multiple records
335        {
336            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
337                .expect("Failed to create writer");
338
339            for i in 0..10 {
340                let record = WalRecord::put(
341                    format!("key{}", i).into_bytes(),
342                    format!("value{}", i).into_bytes(),
343                );
344                writer.append(record).expect("Failed to append");
345            }
346            writer.sync().expect("Failed to sync");
347        }
348
349        // Read them back
350        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
351        let records = reader.read_all().expect("Failed to read all");
352
353        assert_eq!(records.len(), 10);
354    }
355
356    #[test]
357    fn test_read_across_segment_rotation() {
358        let (_temp_dir, wal_path) = setup_test_wal();
359
360        // Write with small segment size to force rotation
361        {
362            let mut writer =
363                WalWriter::new(&wal_path, 100, SyncMode::Sync).expect("Failed to create writer");
364
365            for i in 0..20 {
366                let record = WalRecord::put(
367                    format!("key{}", i).into_bytes(),
368                    format!("value{}", i).into_bytes(),
369                );
370                writer.append(record).expect("Failed to append");
371            }
372            writer.sync().expect("Failed to sync");
373        }
374
375        // Read all records
376        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
377        let records = reader.read_all().expect("Failed to read all");
378
379        assert_eq!(records.len(), 20);
380        // Verify multiple segments were created
381        assert!(reader.segment_count() > 1, "Expected multiple segments");
382    }
383
384    #[test]
385    fn test_reader_reset() {
386        let (_temp_dir, wal_path) = setup_test_wal();
387
388        // Write some records
389        {
390            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
391                .expect("Failed to create writer");
392
393            for i in 0..5 {
394                let record = WalRecord::put(
395                    format!("key{}", i).into_bytes(),
396                    format!("value{}", i).into_bytes(),
397                );
398                writer.append(record).expect("Failed to append");
399            }
400            writer.sync().expect("Failed to sync");
401        }
402
403        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
404
405        // Read all
406        let first_read = reader.read_all().expect("Failed to read all");
407        assert_eq!(first_read.len(), 5);
408
409        // Reset and read again
410        reader.reset().expect("Failed to reset");
411        let second_read = reader.read_all().expect("Failed to read all");
412        assert_eq!(second_read.len(), 5);
413    }
414
415    #[test]
416    fn test_reader_with_transaction_markers() {
417        let (_temp_dir, wal_path) = setup_test_wal();
418
419        // Write transaction sequence
420        {
421            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
422                .expect("Failed to create writer");
423
424            writer
425                .append(WalRecord::begin_tx(1))
426                .expect("Failed to append");
427            writer
428                .append(WalRecord::put(b"key1".to_vec(), b"val1".to_vec()))
429                .expect("Failed to append");
430            writer
431                .append(WalRecord::put(b"key2".to_vec(), b"val2".to_vec()))
432                .expect("Failed to append");
433            writer
434                .append(WalRecord::commit_tx(1))
435                .expect("Failed to append");
436            writer.sync().expect("Failed to sync");
437        }
438
439        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
440        let records = reader.read_all().expect("Failed to read all");
441
442        assert_eq!(records.len(), 4);
443        assert_eq!(records[0].record_type, crate::RecordType::BeginTx);
444        assert_eq!(records[1].record_type, crate::RecordType::Put);
445        assert_eq!(records[2].record_type, crate::RecordType::Put);
446        assert_eq!(records[3].record_type, crate::RecordType::CommitTx);
447    }
448}