Skip to main content

reddb_server/storage/wal/
reader.rs

1use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
2use std::fs::File;
3use std::io::{self, BufReader, Read};
4use std::path::Path;
5
/// Sequential reader over a Write-Ahead Log file.
///
/// Construct one with [`WalReader::open`], which validates the file header,
/// then call [`WalReader::iter`] to walk the records that follow it.
pub struct WalReader {
    /// Buffered handle on the underlying WAL file.
    reader: BufReader<File>,
    /// Byte offset at which the next record is expected to start; `open`
    /// initialises it to the first byte after the 8-byte header.
    position: u64,
}
11
12impl WalReader {
13    /// Open a WAL file for reading
14    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
15        let file = File::open(path)?;
16        let mut reader = BufReader::new(file);
17
18        // Check header
19        let mut header = [0u8; 8];
20        reader.read_exact(&mut header)?;
21
22        if &header[0..4] != WAL_MAGIC {
23            return Err(io::Error::new(
24                io::ErrorKind::InvalidData,
25                "Invalid WAL magic bytes",
26            ));
27        }
28
29        if header[4] != WAL_VERSION {
30            return Err(io::Error::new(
31                io::ErrorKind::InvalidData,
32                format!("Unsupported WAL version: {}", header[4]),
33            ));
34        }
35
36        Ok(Self {
37            reader,
38            position: 8,
39        })
40    }
41
42    /// Iterate over records
43    /// Returns iterator that yields (LSn, WalRecord)
44    pub fn iter(self) -> WalIterator {
45        WalIterator {
46            reader: self.reader,
47            position: self.position,
48        }
49    }
50
51    /// Scan a WAL file and collect the most recent full-page image for
52    /// each page id observed. Returned map is `page_id → (lsn, data)`.
53    /// Recovery applies these images before redo so torn writes are
54    /// healed without the legacy `-dwb` sidecar (gh-478).
55    pub fn collect_full_page_images<P: AsRef<Path>>(
56        path: P,
57    ) -> io::Result<std::collections::BTreeMap<u32, (u64, Vec<u8>)>> {
58        use std::collections::BTreeMap;
59        let mut out: BTreeMap<u32, (u64, Vec<u8>)> = BTreeMap::new();
60        if !path.as_ref().exists() {
61            return Ok(out);
62        }
63        let reader = Self::open(path)?;
64        for item in reader.iter() {
65            let (lsn, record) = item?;
66            if let WalRecord::FullPageImage { page_id, data, .. } = record {
67                match out.get(&page_id) {
68                    Some((existing_lsn, _)) if *existing_lsn > lsn => {}
69                    _ => {
70                        out.insert(page_id, (lsn, data));
71                    }
72                }
73            }
74        }
75        Ok(out)
76    }
77}
78
/// Iterator over the records of a WAL file, created by `WalReader::iter`.
///
/// Yields `io::Result<(lsn, WalRecord)>` items, where `lsn` is the byte
/// offset at which the record starts.
pub struct WalIterator {
    /// Buffered handle on the WAL file, positioned at the next record.
    reader: BufReader<File>,
    /// Byte offset of the next record; advanced by the number of bytes each
    /// successfully decoded record consumed.
    position: u64,
}
83
84impl Iterator for WalIterator {
85    type Item = io::Result<(u64, WalRecord)>;
86
87    fn next(&mut self) -> Option<Self::Item> {
88        // Need to record start position for LSN
89        // Since BufReader buffers, we can't trust file.seek(Current) directly without accounting for buffer.
90        // But `WalRecord::read` reads sequentially.
91        // The simple way: track position manually based on bytes read.
92        // WalRecord::read consumes bytes.
93
94        // Wait, WalRecord::read takes &mut R. We can wrap the reader to count bytes?
95        // Or just rely on the fact that we read sequentially.
96        // But we need to know the *start* LSN of the record to return it.
97
98        let start_pos = self.position;
99
100        // We need a way to track how many bytes were read by `WalRecord::read`.
101        // Let's create a counting wrapper.
102        struct CountingReader<'a, R> {
103            inner: &'a mut R,
104            count: u64,
105        }
106
107        impl<'a, R: Read> Read for CountingReader<'a, R> {
108            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
109                let n = self.inner.read(buf)?;
110                self.count += n as u64;
111                Ok(n)
112            }
113        }
114
115        let mut counter = CountingReader {
116            inner: &mut self.reader,
117            count: 0,
118        };
119
120        match WalRecord::read(&mut counter) {
121            Ok(Some(record)) => {
122                self.position += counter.count;
123                Some(Ok((start_pos, record)))
124            }
125            Ok(None) => None, // EOF
126            Err(e) => Some(Err(e)),
127        }
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::super::writer::WalWriter;
    use super::*;
    use std::path::PathBuf;

    /// Removes the file at `path` when dropped so each test cleans up after itself.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Builds a per-test, per-process WAL path in the system temp directory,
    /// clears any stale file at that path, and returns a cleanup guard with it.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let file_name = format!("rb_wal_reader_{}_{}.wal", name, std::process::id());
        let path = std::env::temp_dir().join(file_name);
        let _ = std::fs::remove_file(&path);
        let guard = FileGuard { path: path.clone() };
        (guard, path)
    }

    /// Opens the WAL at `path` and collects every item the iterator yields.
    fn read_all(path: &Path) -> Vec<io::Result<(u64, WalRecord)>> {
        WalReader::open(path).unwrap().iter().collect()
    }

    #[test]
    fn test_read_empty_wal() {
        let (_guard, path) = temp_wal("empty");

        // Create an empty WAL by opening a writer and dropping it right away.
        drop(WalWriter::open(&path).unwrap());

        assert!(read_all(&path).is_empty());
    }

    #[test]
    fn test_read_single_record() {
        let (_guard, path) = temp_wal("single");

        // Write exactly one record; the writer is dropped at the end of the scope.
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 42 }).unwrap();
        }

        let mut items = read_all(&path);
        assert_eq!(items.len(), 1);

        // The first record starts right after the 8-byte header.
        let (lsn, record) = items.remove(0).unwrap();
        assert_eq!(lsn, 8);
        assert_eq!(record, WalRecord::Begin { tx_id: 42 });
    }

    #[test]
    fn test_read_multiple_records() {
        let (_guard, path) = temp_wal("multi");

        let written = [
            WalRecord::Begin { tx_id: 1 },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 10,
                data: vec![1, 2, 3],
            },
            WalRecord::Commit { tx_id: 1 },
        ];

        // Append the records in order.
        {
            let mut writer = WalWriter::open(&path).unwrap();
            for record in &written {
                writer.append(record).unwrap();
            }
        }

        let records: Vec<WalRecord> = read_all(&path)
            .into_iter()
            .map(|item| item.unwrap().1)
            .collect();

        assert_eq!(records.len(), 3);

        // Verify each record's variant and payload.
        if let WalRecord::Begin { tx_id } = &records[0] {
            assert_eq!(*tx_id, 1);
        } else {
            panic!("Expected Begin");
        }

        if let WalRecord::PageWrite {
            tx_id,
            page_id,
            data,
        } = &records[1]
        {
            assert_eq!(*tx_id, 1);
            assert_eq!(*page_id, 10);
            assert_eq!(data, &vec![1, 2, 3]);
        } else {
            panic!("Expected PageWrite");
        }

        if let WalRecord::Commit { tx_id } = &records[2] {
            assert_eq!(*tx_id, 1);
        } else {
            panic!("Expected Commit");
        }
    }

    #[test]
    fn test_lsn_tracking() {
        let (_guard, path) = temp_wal("lsn");

        // Remember the LSN the writer reports for each appended record.
        let expected_lsns = {
            let mut writer = WalWriter::open(&path).unwrap();
            let first = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            let second = writer.append(&WalRecord::Checkpoint { lsn: 100 }).unwrap();
            let third = writer.append(&WalRecord::Rollback { tx_id: 1 }).unwrap();
            [first, second, third]
        };

        // The reader must report the same LSNs, in the same order.
        let items = read_all(&path);
        assert_eq!(items.len(), 3);
        for (item, expected) in items.iter().zip(expected_lsns.iter()) {
            assert_eq!(item.as_ref().unwrap().0, *expected);
        }
    }

    #[test]
    fn test_invalid_magic() {
        let (_guard, path) = temp_wal("badmagic");

        // Eight header bytes whose first four are not the WAL magic.
        std::fs::write(&path, b"BAAD0000").unwrap();

        assert!(WalReader::open(&path).is_err());
    }

    #[test]
    fn test_invalid_version() {
        let (_guard, path) = temp_wal("badver");

        // Valid magic followed by an unsupported version byte and three padding bytes.
        let mut header = WAL_MAGIC.to_vec();
        header.push(99); // Wrong version
        header.extend_from_slice(&[0u8; 3]);
        std::fs::write(&path, &header).unwrap();

        assert!(WalReader::open(&path).is_err());
    }

    #[test]
    fn test_read_large_page_write() {
        let (_guard, path) = temp_wal("large");

        // 4096 bytes cycling through 0..=255, i.e. byte i holds i % 256.
        let large_data: Vec<u8> = (0u8..=255).cycle().take(4096).collect();

        // Write one large record.
        {
            let mut writer = WalWriter::open(&path).unwrap();
            let record = WalRecord::PageWrite {
                tx_id: 1,
                page_id: 0,
                data: large_data.clone(),
            };
            writer.append(&record).unwrap();
        }

        // Read it back and compare the payload.
        let items = read_all(&path);
        assert_eq!(items.len(), 1);

        if let WalRecord::PageWrite { data, .. } = &items[0].as_ref().unwrap().1 {
            assert_eq!(*data, large_data);
        } else {
            panic!("Expected PageWrite");
        }
    }
}
316}