Skip to main content

reddb_server/storage/wal/
reader.rs

1use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
2use std::fs::File;
3use std::io::{self, BufReader, Read};
4use std::path::Path;
5
/// Reader for the Write-Ahead Log.
///
/// Created with `WalReader::open`, which validates the file header, and then
/// turned into a record iterator with `WalReader::iter`.
#[derive(Debug)]
pub struct WalReader {
    /// Buffered handle on the WAL file, positioned just past the header once opened.
    reader: BufReader<File>,
    /// Byte offset into the file of the next unread byte.
    position: u64,
}
11
12impl WalReader {
13    /// Open a WAL file for reading
14    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
15        let file = File::open(path)?;
16        let mut reader = BufReader::new(file);
17
18        // Check header
19        let mut header = [0u8; 8];
20        reader.read_exact(&mut header)?;
21
22        if &header[0..4] != WAL_MAGIC {
23            return Err(io::Error::new(
24                io::ErrorKind::InvalidData,
25                "Invalid WAL magic bytes",
26            ));
27        }
28
29        if header[4] != WAL_VERSION {
30            return Err(io::Error::new(
31                io::ErrorKind::InvalidData,
32                format!("Unsupported WAL version: {}", header[4]),
33            ));
34        }
35
36        Ok(Self {
37            reader,
38            position: 8,
39        })
40    }
41
42    /// Iterate over records
43    /// Returns iterator that yields (LSn, WalRecord)
44    pub fn iter(self) -> WalIterator {
45        WalIterator {
46            reader: self.reader,
47            position: self.position,
48        }
49    }
50}
51
/// Sequential iterator over the records of a WAL file.
///
/// Yields `io::Result<(u64, WalRecord)>` items, where the `u64` is the byte
/// offset (LSN) at which the record starts.
#[derive(Debug)]
pub struct WalIterator {
    /// Buffered handle on the WAL file; records are decoded from it in order.
    reader: BufReader<File>,
    /// Byte offset into the file of the next unread byte.
    position: u64,
}
56
57impl Iterator for WalIterator {
58    type Item = io::Result<(u64, WalRecord)>;
59
60    fn next(&mut self) -> Option<Self::Item> {
61        // Need to record start position for LSN
62        // Since BufReader buffers, we can't trust file.seek(Current) directly without accounting for buffer.
63        // But `WalRecord::read` reads sequentially.
64        // The simple way: track position manually based on bytes read.
65        // WalRecord::read consumes bytes.
66
67        // Wait, WalRecord::read takes &mut R. We can wrap the reader to count bytes?
68        // Or just rely on the fact that we read sequentially.
69        // But we need to know the *start* LSN of the record to return it.
70
71        let start_pos = self.position;
72
73        // We need a way to track how many bytes were read by `WalRecord::read`.
74        // Let's create a counting wrapper.
75        struct CountingReader<'a, R> {
76            inner: &'a mut R,
77            count: u64,
78        }
79
80        impl<'a, R: Read> Read for CountingReader<'a, R> {
81            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
82                let n = self.inner.read(buf)?;
83                self.count += n as u64;
84                Ok(n)
85            }
86        }
87
88        let mut counter = CountingReader {
89            inner: &mut self.reader,
90            count: 0,
91        };
92
93        match WalRecord::read(&mut counter) {
94            Ok(Some(record)) => {
95                self.position += counter.count;
96                Some(Ok((start_pos, record)))
97            }
98            Ok(None) => None, // EOF
99            Err(e) => Some(Err(e)),
100        }
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::super::writer::WalWriter;
    use super::*;
    use std::path::{Path, PathBuf};

    /// Removes the wrapped file on drop so tests clean up even when they fail.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Builds a per-test, per-process WAL path in the temp directory, clears
    /// any stale file at that path, and returns a cleanup guard with the path.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let file_name = format!("rb_wal_reader_{}_{}.wal", name, std::process::id());
        let path = std::env::temp_dir().join(file_name);
        let guard = FileGuard { path: path.clone() };
        let _ = std::fs::remove_file(&path);
        (guard, path)
    }

    /// Opens a writer on `path`, appends `records` in order, and drops the
    /// writer before returning.
    fn write_records(path: &Path, records: &[WalRecord]) {
        let mut writer = WalWriter::open(path).unwrap();
        for record in records {
            writer.append(record).unwrap();
        }
    }

    /// Opens `path` with `WalReader` and collects every item the iterator yields.
    fn read_all(path: &Path) -> Vec<std::io::Result<(u64, WalRecord)>> {
        WalReader::open(path).unwrap().iter().collect()
    }

    #[test]
    fn test_read_empty_wal() {
        let (_guard, path) = temp_wal("empty");

        // A freshly created WAL with no appended records.
        write_records(&path, &[]);

        assert!(read_all(&path).is_empty());
    }

    #[test]
    fn test_read_single_record() {
        let (_guard, path) = temp_wal("single");

        write_records(&path, &[WalRecord::Begin { tx_id: 42 }]);

        let records = read_all(&path);
        assert_eq!(records.len(), 1);

        let (lsn, record) = records.into_iter().next().unwrap().unwrap();
        assert_eq!(lsn, 8);
        assert_eq!(record, WalRecord::Begin { tx_id: 42 });
    }

    #[test]
    fn test_read_multiple_records() {
        let (_guard, path) = temp_wal("multi");

        write_records(
            &path,
            &[
                WalRecord::Begin { tx_id: 1 },
                WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: 10,
                    data: vec![1, 2, 3],
                },
                WalRecord::Commit { tx_id: 1 },
            ],
        );

        let records = read_all(&path);
        assert_eq!(records.len(), 3);

        // Strip the LSNs; only the decoded payloads are checked here.
        let decoded: Vec<WalRecord> = records.into_iter().map(|item| item.unwrap().1).collect();

        match &decoded[0] {
            WalRecord::Begin { tx_id } => assert_eq!(*tx_id, 1),
            _ => panic!("Expected Begin"),
        }
        match &decoded[1] {
            WalRecord::PageWrite {
                tx_id,
                page_id,
                data,
            } => {
                assert_eq!(*tx_id, 1);
                assert_eq!(*page_id, 10);
                assert_eq!(data, &vec![1, 2, 3]);
            }
            _ => panic!("Expected PageWrite"),
        }
        match &decoded[2] {
            WalRecord::Commit { tx_id } => assert_eq!(*tx_id, 1),
            _ => panic!("Expected Commit"),
        }
    }

    #[test]
    fn test_lsn_tracking() {
        let (_guard, path) = temp_wal("lsn");

        // Capture the LSN the writer reports for each append.
        let written_lsns = {
            let mut writer = WalWriter::open(&path).unwrap();
            [
                writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap(),
                writer.append(&WalRecord::Checkpoint { lsn: 100 }).unwrap(),
                writer.append(&WalRecord::Rollback { tx_id: 1 }).unwrap(),
            ]
        };

        // The reader must report the same LSNs, in the same order.
        let read_lsns: Vec<u64> = read_all(&path)
            .into_iter()
            .map(|item| item.unwrap().0)
            .collect();

        assert_eq!(read_lsns, written_lsns);
    }

    #[test]
    fn test_invalid_magic() {
        let (_guard, path) = temp_wal("badmagic");

        // Eight bytes, but not starting with the WAL magic.
        std::fs::write(&path, b"BAAD0000").unwrap();

        assert!(WalReader::open(&path).is_err());
    }

    #[test]
    fn test_invalid_version() {
        let (_guard, path) = temp_wal("badver");

        // Valid magic, wrong version byte, then three padding bytes.
        let mut header = WAL_MAGIC.to_vec();
        header.push(99);
        header.extend_from_slice(&[0u8; 3]);
        std::fs::write(&path, &header).unwrap();

        assert!(WalReader::open(&path).is_err());
    }

    #[test]
    fn test_read_large_page_write() {
        let (_guard, path) = temp_wal("large");

        let large_data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        write_records(
            &path,
            &[WalRecord::PageWrite {
                tx_id: 1,
                page_id: 0,
                data: large_data.clone(),
            }],
        );

        let records = read_all(&path);
        assert_eq!(records.len(), 1);

        match &records[0].as_ref().unwrap().1 {
            WalRecord::PageWrite { data, .. } => assert_eq!(*data, large_data),
            _ => panic!("Expected PageWrite"),
        }
    }
}