rustlite_wal/
reader.rs

// WAL reader module - reads log records back for replay
//
// The reader handles:
// 1. Segment discovery - finding all WAL segment files in order
// 2. Record reading - iterating through records in each segment
// 3. CRC validation - verifying data integrity of each record
7
8use crate::record::WalRecord;
9use rustlite_core::{Error, Result};
10use std::fs::File;
11use std::io::{BufReader, Read};
12use std::path::{Path, PathBuf};
13
/// WAL reader for reading records from log segments.
///
/// Walks the `*.log` segment files of a WAL directory in sorted order and
/// decodes one record at a time. It can also be consumed as an [`Iterator`]
/// of `Result<WalRecord>`.
#[derive(Debug)]
pub struct WalReader {
    /// Segment file paths, sorted by path.
    segments: Vec<PathBuf>,
    /// Index into `segments` of the segment currently being read.
    current_segment_index: usize,
    /// Buffered reader over the current segment; `None` when there are no
    /// segments or every segment has been exhausted.
    reader: Option<BufReader<File>>,
    /// Byte offset within the current segment, advanced past each record read.
    current_offset: u64,
}
25
26impl WalReader {
27    /// Create a new WAL reader for the given WAL directory
28    pub fn new(wal_dir: &Path) -> Result<Self> {
29        let segments = Self::discover_segments(wal_dir)?;
30
31        let mut reader = Self {
32            segments,
33            current_segment_index: 0,
34            reader: None,
35            current_offset: 0,
36        };
37
38        // Open first segment if available
39        if !reader.segments.is_empty() {
40            reader.open_segment(0)?;
41        }
42
43        Ok(reader)
44    }
45
46    /// Discover and sort all WAL segment files in the directory
47    fn discover_segments(wal_dir: &Path) -> Result<Vec<PathBuf>> {
48        if !wal_dir.exists() {
49            return Ok(Vec::new());
50        }
51
52        let mut segments: Vec<PathBuf> = std::fs::read_dir(wal_dir)
53            .map_err(|e| Error::Storage(format!("Failed to read WAL directory: {}", e)))?
54            .filter_map(|entry| entry.ok())
55            .map(|entry| entry.path())
56            .filter(|path| path.extension().map(|ext| ext == "log").unwrap_or(false))
57            .collect();
58
59        // Sort segments by filename (which contains sequence number)
60        segments.sort();
61
62        Ok(segments)
63    }
64
65    /// Open a segment file by index
66    fn open_segment(&mut self, index: usize) -> Result<()> {
67        if index >= self.segments.len() {
68            return Err(Error::Storage("Segment index out of bounds".to_string()));
69        }
70
71        let path = &self.segments[index];
72        let file = File::open(path)
73            .map_err(|e| Error::Storage(format!("Failed to open segment {:?}: {}", path, e)))?;
74
75        self.reader = Some(BufReader::new(file));
76        self.current_segment_index = index;
77        self.current_offset = 0;
78
79        Ok(())
80    }
81
82    /// Move to the next segment
83    fn advance_segment(&mut self) -> Result<bool> {
84        let next_index = self.current_segment_index + 1;
85        if next_index >= self.segments.len() {
86            self.reader = None;
87            return Ok(false);
88        }
89
90        self.open_segment(next_index)?;
91        Ok(true)
92    }
93
94    /// Read the next record from the WAL
95    ///
96    /// Returns `Ok(Some(record))` if a record was read successfully,
97    /// `Ok(None)` if we've reached the end of all segments,
98    /// or an error if reading/parsing failed.
99    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
100        loop {
101            let reader = match &mut self.reader {
102                Some(r) => r,
103                None => return Ok(None), // No more segments
104            };
105
106            // Try to read a record from current segment
107            match Self::read_record(reader) {
108                Ok(Some((record, bytes_read))) => {
109                    self.current_offset += bytes_read as u64;
110                    return Ok(Some(record));
111                }
112                Ok(None) => {
113                    // End of current segment, try next
114                    if !self.advance_segment()? {
115                        return Ok(None);
116                    }
117                    // Continue loop to read from new segment
118                }
119                Err(e) => {
120                    // Check if this is an incomplete record at end of file
121                    // (possible crash during write)
122                    if Self::is_truncation_error(&e) {
123                        // Try to advance to next segment
124                        if !self.advance_segment()? {
125                            return Ok(None);
126                        }
127                        // Continue loop to read from new segment
128                    } else {
129                        return Err(e);
130                    }
131                }
132            }
133        }
134    }
135
136    /// Read a single record from a reader
137    ///
138    /// Returns the record and number of bytes consumed
139    fn read_record(reader: &mut BufReader<File>) -> Result<Option<(WalRecord, usize)>> {
140        // Read length field (4 bytes)
141        let mut len_buf = [0u8; 4];
142        match reader.read_exact(&mut len_buf) {
143            Ok(()) => {}
144            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
145                return Ok(None); // End of file
146            }
147            Err(e) => {
148                return Err(Error::Storage(format!(
149                    "Failed to read record length: {}",
150                    e
151                )));
152            }
153        }
154
155        let content_len = u32::from_le_bytes(len_buf) as usize;
156
157        // Sanity check on length (max 16MB per record)
158        if content_len > 16 * 1024 * 1024 {
159            return Err(Error::Storage(format!(
160                "Record length too large: {} bytes",
161                content_len
162            )));
163        }
164
165        // Read content (type + payload) and CRC
166        let total_data_len = content_len + 4; // content + crc
167        let mut data = vec![0u8; total_data_len];
168
169        reader.read_exact(&mut data).map_err(|e| {
170            if e.kind() == std::io::ErrorKind::UnexpectedEof {
171                Error::Serialization("Incomplete record: truncated".to_string())
172            } else {
173                Error::Storage(format!("Failed to read record data: {}", e))
174            }
175        })?;
176
177        // Build full frame for decoding
178        let mut frame = Vec::with_capacity(4 + total_data_len);
179        frame.extend_from_slice(&len_buf);
180        frame.extend_from_slice(&data);
181
182        // Decode record (includes CRC validation)
183        let (record, bytes_consumed) = WalRecord::decode(&frame)?;
184
185        Ok(Some((record, bytes_consumed)))
186    }
187
188    /// Check if an error indicates a truncated/incomplete record
189    fn is_truncation_error(err: &Error) -> bool {
190        match err {
191            Error::Serialization(msg) => msg.contains("Incomplete") || msg.contains("truncated"),
192            _ => false,
193        }
194    }
195
196    /// Get the number of segments discovered
197    pub fn segment_count(&self) -> usize {
198        self.segments.len()
199    }
200
201    /// Get the current segment index being read
202    pub fn current_segment(&self) -> usize {
203        self.current_segment_index
204    }
205
206    /// Reset reader to the beginning
207    pub fn reset(&mut self) -> Result<()> {
208        if !self.segments.is_empty() {
209            self.open_segment(0)?;
210        } else {
211            self.reader = None;
212            self.current_segment_index = 0;
213            self.current_offset = 0;
214        }
215        Ok(())
216    }
217
218    /// Seek to a specific segment
219    pub fn seek_to_segment(&mut self, index: usize) -> Result<()> {
220        if index >= self.segments.len() {
221            return Err(Error::Storage(format!(
222                "Segment index {} out of range (have {} segments)",
223                index,
224                self.segments.len()
225            )));
226        }
227        self.open_segment(index)
228    }
229
230    /// Read all remaining records into a vector
231    pub fn read_all(&mut self) -> Result<Vec<WalRecord>> {
232        let mut records = Vec::new();
233        while let Some(record) = self.next_record()? {
234            records.push(record);
235        }
236        Ok(records)
237    }
238}
239
240/// Iterator implementation for WalReader
241impl Iterator for WalReader {
242    type Item = Result<WalRecord>;
243
244    fn next(&mut self) -> Option<Self::Item> {
245        match self.next_record() {
246            Ok(Some(record)) => Some(Ok(record)),
247            Ok(None) => None,
248            Err(e) => Some(Err(e)),
249        }
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{SyncMode, WalWriter};
    use tempfile::TempDir;

    /// Create a temp dir containing an empty `wal` subdirectory.
    ///
    /// The returned `TempDir` must be kept alive for the duration of the test.
    fn setup_test_wal() -> (TempDir, PathBuf) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let wal_path = temp_dir.path().join("wal");
        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
        (temp_dir, wal_path)
    }

    #[test]
    fn test_empty_wal_reader() {
        let (_temp_dir, wal_path) = setup_test_wal();

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        assert_eq!(reader.segment_count(), 0);
        assert!(reader.next_record().unwrap().is_none());
    }

    #[test]
    fn test_read_single_record() {
        let (_temp_dir, wal_path) = setup_test_wal();

        // Write a record
        {
            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
                .expect("Failed to create writer");
            let record = WalRecord::put(b"key1".to_vec(), b"value1".to_vec());
            writer.append(record).expect("Failed to append");
            writer.sync().expect("Failed to sync");
        }

        // Read it back
        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        assert_eq!(reader.segment_count(), 1);

        let record = reader.next().unwrap().expect("Expected a record");
        match &record.payload {
            crate::record::RecordPayload::Put { key, value } => {
                assert_eq!(key, b"key1");
                assert_eq!(value, b"value1");
            }
            _ => panic!("Expected Put record"),
        }

        assert!(reader.next_record().unwrap().is_none());
    }

    #[test]
    fn test_read_multiple_records() {
        let (_temp_dir, wal_path) = setup_test_wal();

        // Write multiple records
        {
            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
                .expect("Failed to create writer");

            for i in 0..10 {
                let record = WalRecord::put(
                    format!("key{}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                );
                writer.append(record).expect("Failed to append");
            }
            writer.sync().expect("Failed to sync");
        }

        // Read them back
        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        let records = reader.read_all().expect("Failed to read all");

        assert_eq!(records.len(), 10);
    }

    #[test]
    fn test_read_across_segment_rotation() {
        let (_temp_dir, wal_path) = setup_test_wal();

        // Write with small segment size to force rotation
        {
            let mut writer =
                WalWriter::new(&wal_path, 100, SyncMode::Sync).expect("Failed to create writer");

            for i in 0..20 {
                let record = WalRecord::put(
                    format!("key{}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                );
                writer.append(record).expect("Failed to append");
            }
            writer.sync().expect("Failed to sync");
        }

        // Read all records
        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        let records = reader.read_all().expect("Failed to read all");

        assert_eq!(records.len(), 20);
        // Verify multiple segments were created
        assert!(reader.segment_count() > 1, "Expected multiple segments");

        // Records spanning several segments must come back in write order.
        // This is what segment discovery's sort is responsible for.
        for (i, record) in records.iter().enumerate() {
            match &record.payload {
                crate::record::RecordPayload::Put { key, .. } => {
                    let expected = format!("key{}", i);
                    assert_eq!(key, expected.as_bytes(), "record {} out of order", i);
                }
                _ => panic!("Expected Put record"),
            }
        }
    }

    #[test]
    fn test_truncated_tail_record_is_skipped() {
        let (_temp_dir, wal_path) = setup_test_wal();

        {
            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
                .expect("Failed to create writer");

            for i in 0..5 {
                let record = WalRecord::put(
                    format!("key{}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                );
                writer.append(record).expect("Failed to append");
            }
            writer.sync().expect("Failed to sync");
        }

        // Simulate a crash mid-write: chop the last few bytes (part of the
        // final record's CRC trailer) off the newest segment.
        let mut segments: Vec<PathBuf> = std::fs::read_dir(&wal_path)
            .expect("Failed to read WAL dir")
            .map(|entry| entry.expect("Failed to read dir entry").path())
            .filter(|path| path.extension().map(|ext| ext == "log").unwrap_or(false))
            .collect();
        segments.sort();
        let last = segments.last().expect("Expected at least one segment");
        let file = std::fs::OpenOptions::new()
            .write(true)
            .open(last)
            .expect("Failed to open segment");
        let len = file.metadata().expect("Failed to stat segment").len();
        file.set_len(len - 3).expect("Failed to truncate segment");
        drop(file);

        // The torn record is dropped; everything before it is still readable.
        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        let records = reader
            .read_all()
            .expect("A torn tail record should not be an error");
        assert_eq!(records.len(), 4);
    }

    #[test]
    fn test_reader_reset() {
        let (_temp_dir, wal_path) = setup_test_wal();

        // Write some records
        {
            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
                .expect("Failed to create writer");

            for i in 0..5 {
                let record = WalRecord::put(
                    format!("key{}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                );
                writer.append(record).expect("Failed to append");
            }
            writer.sync().expect("Failed to sync");
        }

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");

        // Read all
        let first_read = reader.read_all().expect("Failed to read all");
        assert_eq!(first_read.len(), 5);

        // Reset and read again
        reader.reset().expect("Failed to reset");
        let second_read = reader.read_all().expect("Failed to read all");
        assert_eq!(second_read.len(), 5);
    }

    #[test]
    fn test_reader_with_transaction_markers() {
        let (_temp_dir, wal_path) = setup_test_wal();

        // Write transaction sequence
        {
            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
                .expect("Failed to create writer");

            writer
                .append(WalRecord::begin_tx(1))
                .expect("Failed to append");
            writer
                .append(WalRecord::put(b"key1".to_vec(), b"val1".to_vec()))
                .expect("Failed to append");
            writer
                .append(WalRecord::put(b"key2".to_vec(), b"val2".to_vec()))
                .expect("Failed to append");
            writer
                .append(WalRecord::commit_tx(1))
                .expect("Failed to append");
            writer.sync().expect("Failed to sync");
        }

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        let records = reader.read_all().expect("Failed to read all");

        assert_eq!(records.len(), 4);
        assert_eq!(records[0].record_type, crate::RecordType::BeginTx);
        assert_eq!(records[1].record_type, crate::RecordType::Put);
        assert_eq!(records[2].record_type, crate::RecordType::Put);
        assert_eq!(records[3].record_type, crate::RecordType::CommitTx);
    }
}