rustlite_wal/
reader.rs

1// WAL reader module - reads and replays log records
2//
3// The reader handles:
4// 1. Segment discovery - finding all WAL segment files in order
5// 2. Record reading - iterating through records in each segment
6// 3. CRC validation - verifying data integrity of each record
7
8use crate::record::WalRecord;
9use rustlite_core::{Error, Result};
10use std::fs::File;
11use std::io::{BufReader, Read};
12use std::path::{Path, PathBuf};
13
/// Sequential reader over the WAL segment files (`*.log`) found in one directory.
///
/// Segments are visited in sorted path order and each segment is read front to back.
#[derive(Debug)]
pub struct WalReader {
    /// Segment file paths, sorted.
    segments: Vec<PathBuf>,
    /// Index into `segments` of the segment currently (or most recently) being read.
    current_segment_index: usize,
    /// Buffered handle on the open segment; `None` once every segment has been
    /// exhausted, or when no segments were found.
    reader: Option<BufReader<File>>,
    /// Byte offset within the current segment, just past the last record returned.
    current_offset: u64,
}
25
26impl WalReader {
27    /// Create a new WAL reader for the given WAL directory
28    pub fn new(wal_dir: &Path) -> Result<Self> {
29        let segments = Self::discover_segments(wal_dir)?;
30
31        let mut reader = Self {
32            segments,
33            current_segment_index: 0,
34            reader: None,
35            current_offset: 0,
36        };
37
38        // Open first segment if available
39        if !reader.segments.is_empty() {
40            reader.open_segment(0)?;
41        }
42
43        Ok(reader)
44    }
45
46    /// Discover and sort all WAL segment files in the directory
47    fn discover_segments(wal_dir: &Path) -> Result<Vec<PathBuf>> {
48        if !wal_dir.exists() {
49            return Ok(Vec::new());
50        }
51
52        let mut segments: Vec<PathBuf> = std::fs::read_dir(wal_dir)
53            .map_err(|e| Error::Storage(format!("Failed to read WAL directory: {}", e)))?
54            .filter_map(|entry| entry.ok())
55            .map(|entry| entry.path())
56            .filter(|path| {
57                path.extension()
58                    .map(|ext| ext == "log")
59                    .unwrap_or(false)
60            })
61            .collect();
62
63        // Sort segments by filename (which contains sequence number)
64        segments.sort();
65
66        Ok(segments)
67    }
68
69    /// Open a segment file by index
70    fn open_segment(&mut self, index: usize) -> Result<()> {
71        if index >= self.segments.len() {
72            return Err(Error::Storage("Segment index out of bounds".to_string()));
73        }
74
75        let path = &self.segments[index];
76        let file = File::open(path)
77            .map_err(|e| Error::Storage(format!("Failed to open segment {:?}: {}", path, e)))?;
78
79        self.reader = Some(BufReader::new(file));
80        self.current_segment_index = index;
81        self.current_offset = 0;
82
83        Ok(())
84    }
85
86    /// Move to the next segment
87    fn advance_segment(&mut self) -> Result<bool> {
88        let next_index = self.current_segment_index + 1;
89        if next_index >= self.segments.len() {
90            self.reader = None;
91            return Ok(false);
92        }
93
94        self.open_segment(next_index)?;
95        Ok(true)
96    }
97
98    /// Read the next record from the WAL
99    ///
100    /// Returns `Ok(Some(record))` if a record was read successfully,
101    /// `Ok(None)` if we've reached the end of all segments,
102    /// or an error if reading/parsing failed.
103    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
104        loop {
105            let reader = match &mut self.reader {
106                Some(r) => r,
107                None => return Ok(None), // No more segments
108            };
109
110            // Try to read a record from current segment
111            match Self::read_record(reader) {
112                Ok(Some((record, bytes_read))) => {
113                    self.current_offset += bytes_read as u64;
114                    return Ok(Some(record));
115                }
116                Ok(None) => {
117                    // End of current segment, try next
118                    if !self.advance_segment()? {
119                        return Ok(None);
120                    }
121                    // Continue loop to read from new segment
122                }
123                Err(e) => {
124                    // Check if this is an incomplete record at end of file
125                    // (possible crash during write)
126                    if Self::is_truncation_error(&e) {
127                        // Try to advance to next segment
128                        if !self.advance_segment()? {
129                            return Ok(None);
130                        }
131                        // Continue loop to read from new segment
132                    } else {
133                        return Err(e);
134                    }
135                }
136            }
137        }
138    }
139
140    /// Read a single record from a reader
141    ///
142    /// Returns the record and number of bytes consumed
143    fn read_record(reader: &mut BufReader<File>) -> Result<Option<(WalRecord, usize)>> {
144        // Read length field (4 bytes)
145        let mut len_buf = [0u8; 4];
146        match reader.read_exact(&mut len_buf) {
147            Ok(()) => {}
148            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
149                return Ok(None); // End of file
150            }
151            Err(e) => {
152                return Err(Error::Storage(format!("Failed to read record length: {}", e)));
153            }
154        }
155
156        let content_len = u32::from_le_bytes(len_buf) as usize;
157
158        // Sanity check on length (max 16MB per record)
159        if content_len > 16 * 1024 * 1024 {
160            return Err(Error::Storage(format!(
161                "Record length too large: {} bytes",
162                content_len
163            )));
164        }
165
166        // Read content (type + payload) and CRC
167        let total_data_len = content_len + 4; // content + crc
168        let mut data = vec![0u8; total_data_len];
169
170        reader.read_exact(&mut data).map_err(|e| {
171            if e.kind() == std::io::ErrorKind::UnexpectedEof {
172                Error::Serialization("Incomplete record: truncated".to_string())
173            } else {
174                Error::Storage(format!("Failed to read record data: {}", e))
175            }
176        })?;
177
178        // Build full frame for decoding
179        let mut frame = Vec::with_capacity(4 + total_data_len);
180        frame.extend_from_slice(&len_buf);
181        frame.extend_from_slice(&data);
182
183        // Decode record (includes CRC validation)
184        let (record, bytes_consumed) = WalRecord::decode(&frame)?;
185
186        Ok(Some((record, bytes_consumed)))
187    }
188
189    /// Check if an error indicates a truncated/incomplete record
190    fn is_truncation_error(err: &Error) -> bool {
191        match err {
192            Error::Serialization(msg) => msg.contains("Incomplete") || msg.contains("truncated"),
193            _ => false,
194        }
195    }
196
197    /// Get the number of segments discovered
198    pub fn segment_count(&self) -> usize {
199        self.segments.len()
200    }
201
202    /// Get the current segment index being read
203    pub fn current_segment(&self) -> usize {
204        self.current_segment_index
205    }
206
207    /// Reset reader to the beginning
208    pub fn reset(&mut self) -> Result<()> {
209        if !self.segments.is_empty() {
210            self.open_segment(0)?;
211        } else {
212            self.reader = None;
213            self.current_segment_index = 0;
214            self.current_offset = 0;
215        }
216        Ok(())
217    }
218
219    /// Seek to a specific segment
220    pub fn seek_to_segment(&mut self, index: usize) -> Result<()> {
221        if index >= self.segments.len() {
222            return Err(Error::Storage(format!(
223                "Segment index {} out of range (have {} segments)",
224                index,
225                self.segments.len()
226            )));
227        }
228        self.open_segment(index)
229    }
230
231    /// Read all remaining records into a vector
232    pub fn read_all(&mut self) -> Result<Vec<WalRecord>> {
233        let mut records = Vec::new();
234        while let Some(record) = self.next_record()? {
235            records.push(record);
236        }
237        Ok(records)
238    }
239}
240
241/// Iterator implementation for WalReader
242impl Iterator for WalReader {
243    type Item = Result<WalRecord>;
244
245    fn next(&mut self) -> Option<Self::Item> {
246        match self.next_record() {
247            Ok(Some(record)) => Some(Ok(record)),
248            Ok(None) => None,
249            Err(e) => Some(Err(e)),
250        }
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{SyncMode, WalWriter};
    use tempfile::TempDir;

    /// Creates a temp directory containing an empty `wal/` subdirectory.
    ///
    /// The `TempDir` guard is returned so the directory outlives the test body.
    fn setup_test_wal() -> (TempDir, PathBuf) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let wal_path = temp_dir.path().join("wal");
        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
        (temp_dir, wal_path)
    }

    /// Put records `key0`/`value0`, `key1`/`value1`, ... for `i` in `0..count`.
    fn numbered_puts(count: usize) -> Vec<WalRecord> {
        (0..count)
            .map(|i| {
                WalRecord::put(
                    format!("key{}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                )
            })
            .collect()
    }

    /// Appends `records` via a synchronous writer with a 64 MiB segment size (no rotation),
    /// syncs, and drops the writer before returning.
    fn write_records(wal_path: &Path, records: Vec<WalRecord>) {
        let mut writer = WalWriter::new(wal_path, 64 * 1024 * 1024, SyncMode::Sync)
            .expect("Failed to create writer");
        for record in records {
            writer.append(record).expect("Failed to append");
        }
        writer.sync().expect("Failed to sync");
    }

    #[test]
    fn test_empty_wal_reader() {
        let (_temp_dir, wal_path) = setup_test_wal();
        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");

        assert_eq!(reader.segment_count(), 0);
        assert!(reader.next_record().unwrap().is_none());
    }

    #[test]
    fn test_read_single_record() {
        let (_temp_dir, wal_path) = setup_test_wal();
        write_records(
            &wal_path,
            vec![WalRecord::put(b"key1".to_vec(), b"value1".to_vec())],
        );

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        assert_eq!(reader.segment_count(), 1);

        let record = reader.next().unwrap().expect("Expected a record");
        if let crate::record::RecordPayload::Put { key, value } = &record.payload {
            assert_eq!(key, b"key1");
            assert_eq!(value, b"value1");
        } else {
            panic!("Expected Put record");
        }

        assert!(reader.next_record().unwrap().is_none());
    }

    #[test]
    fn test_read_multiple_records() {
        let (_temp_dir, wal_path) = setup_test_wal();
        write_records(&wal_path, numbered_puts(10));

        let records = WalReader::new(&wal_path)
            .expect("Failed to create reader")
            .read_all()
            .expect("Failed to read all");

        assert_eq!(records.len(), 10);
    }

    #[test]
    fn test_read_across_segment_rotation() {
        let (_temp_dir, wal_path) = setup_test_wal();

        // A tiny segment size forces the writer to rotate several times.
        {
            let mut writer =
                WalWriter::new(&wal_path, 100, SyncMode::Sync).expect("Failed to create writer");
            for record in numbered_puts(20) {
                writer.append(record).expect("Failed to append");
            }
            writer.sync().expect("Failed to sync");
        }

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        let records = reader.read_all().expect("Failed to read all");

        assert_eq!(records.len(), 20);
        assert!(reader.segment_count() > 1, "Expected multiple segments");
    }

    #[test]
    fn test_reader_reset() {
        let (_temp_dir, wal_path) = setup_test_wal();
        write_records(&wal_path, numbered_puts(5));

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");

        let first_pass = reader.read_all().expect("Failed to read all");
        assert_eq!(first_pass.len(), 5);

        // After a reset the same records must be readable again.
        reader.reset().expect("Failed to reset");
        let second_pass = reader.read_all().expect("Failed to read all");
        assert_eq!(second_pass.len(), 5);
    }

    #[test]
    fn test_reader_with_transaction_markers() {
        let (_temp_dir, wal_path) = setup_test_wal();
        write_records(
            &wal_path,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
                WalRecord::commit_tx(1),
            ],
        );

        let records = WalReader::new(&wal_path)
            .expect("Failed to create reader")
            .read_all()
            .expect("Failed to read all");

        let expected = [
            crate::RecordType::BeginTx,
            crate::RecordType::Put,
            crate::RecordType::Put,
            crate::RecordType::CommitTx,
        ];
        assert_eq!(records.len(), expected.len());
        for (record, want) in records.iter().zip(expected.iter()) {
            assert_eq!(&record.record_type, want);
        }
    }
}