Skip to main content

reddb_server/storage/wal/
reader.rs

1use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION, WAL_VERSION_V2};
2use std::fs::File;
3use std::io::{self, BufReader, Read};
4use std::path::Path;
5
/// Sequential reader over a Write-Ahead Log file.
///
/// Construct it with [`WalReader::open`], which validates the file header,
/// then call [`WalReader::iter`] to walk the records in file order.
pub struct WalReader {
    /// Version byte taken from the file header; forwarded to the record decoder.
    format_version: u8,
    /// Byte offset in the file at which the next record starts.
    position: u64,
    /// Buffered handle to the underlying WAL file.
    reader: BufReader<File>,
}
12
13impl WalReader {
14    /// Open a WAL file for reading
15    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
16        let file = File::open(path)?;
17        let mut reader = BufReader::new(file);
18
19        // Check header
20        let mut header = [0u8; 8];
21        reader.read_exact(&mut header)?;
22
23        if &header[0..4] != WAL_MAGIC {
24            return Err(io::Error::new(
25                io::ErrorKind::InvalidData,
26                "Invalid WAL magic bytes",
27            ));
28        }
29
30        if header[4] != WAL_VERSION && header[4] != WAL_VERSION_V2 {
31            return Err(io::Error::new(
32                io::ErrorKind::InvalidData,
33                format!("Unsupported WAL version: {}", header[4]),
34            ));
35        }
36
37        Ok(Self {
38            reader,
39            position: 8,
40            format_version: header[4],
41        })
42    }
43
44    /// Iterate over records
45    /// Returns iterator that yields (LSn, WalRecord)
46    pub fn iter(self) -> WalIterator {
47        WalIterator {
48            reader: self.reader,
49            position: self.position,
50            format_version: self.format_version,
51        }
52    }
53
54    /// Scan a WAL file and collect the most recent full-page image for
55    /// each page id observed. Returned map is `page_id → (lsn, data)`.
56    /// Recovery applies these images before redo so torn writes are
57    /// healed without the legacy `-dwb` sidecar (gh-478).
58    pub fn collect_full_page_images<P: AsRef<Path>>(
59        path: P,
60    ) -> io::Result<std::collections::BTreeMap<u32, (u64, Vec<u8>)>> {
61        use std::collections::BTreeMap;
62        let mut out: BTreeMap<u32, (u64, Vec<u8>)> = BTreeMap::new();
63        if !path.as_ref().exists() {
64            return Ok(out);
65        }
66        let reader = Self::open(path)?;
67        for item in reader.iter() {
68            let (lsn, record) = item?;
69            if let WalRecord::FullPageImage { page_id, data, .. } = record {
70                match out.get(&page_id) {
71                    Some((existing_lsn, _)) if *existing_lsn > lsn => {}
72                    _ => {
73                        out.insert(page_id, (lsn, data));
74                    }
75                }
76            }
77        }
78        Ok(out)
79    }
80}
81
82pub struct WalIterator {
83    reader: BufReader<File>,
84    position: u64,
85    format_version: u8,
86}
87
88impl Iterator for WalIterator {
89    type Item = io::Result<(u64, WalRecord)>;
90
91    fn next(&mut self) -> Option<Self::Item> {
92        // Need to record start position for LSN
93        // Since BufReader buffers, we can't trust file.seek(Current) directly without accounting for buffer.
94        // But `WalRecord::read` reads sequentially.
95        // The simple way: track position manually based on bytes read.
96        // WalRecord::read consumes bytes.
97
98        // Wait, WalRecord::read takes &mut R. We can wrap the reader to count bytes?
99        // Or just rely on the fact that we read sequentially.
100        // But we need to know the *start* LSN of the record to return it.
101
102        let start_pos = self.position;
103
104        // We need a way to track how many bytes were read by `WalRecord::read`.
105        // Let's create a counting wrapper.
106        struct CountingReader<'a, R> {
107            inner: &'a mut R,
108            count: u64,
109        }
110
111        impl<'a, R: Read> Read for CountingReader<'a, R> {
112            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
113                let n = self.inner.read(buf)?;
114                self.count += n as u64;
115                Ok(n)
116            }
117        }
118
119        let mut counter = CountingReader {
120            inner: &mut self.reader,
121            count: 0,
122        };
123
124        match WalRecord::read_with_format_version(&mut counter, self.format_version) {
125            Ok(Some((_, record))) => {
126                self.position += counter.count;
127                Some(Ok((start_pos, record)))
128            }
129            Ok(None) => None, // EOF
130            Err(e) => Some(Err(e)),
131        }
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::super::writer::WalWriter;
    use super::*;
    use std::path::PathBuf;

    /// Deletes the wrapped path on drop, so each test cleans up after itself
    /// even when it panics.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Returns a per-test, per-process temp path plus a guard that removes
    /// it. Any leftover file at that path is deleted first.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let path =
            std::env::temp_dir().join(format!("rb_wal_reader_{}_{}.wal", name, std::process::id()));
        let guard = FileGuard { path: path.clone() };
        let _ = std::fs::remove_file(&path);
        (guard, path)
    }

    /// Asserts that opening `path` fails, and fails with `expected`.
    ///
    /// Checking the kind, not just `is_err()`, keeps the test from passing
    /// for an unrelated reason such as a missing file.
    fn assert_open_fails(path: &std::path::Path, expected: std::io::ErrorKind) {
        let err = WalReader::open(path)
            .err()
            .expect("opening the WAL should have failed");
        assert_eq!(err.kind(), expected);
    }

    #[test]
    fn test_read_empty_wal() {
        let (_guard, path) = temp_wal("empty");

        // Create empty WAL
        {
            let _writer = WalWriter::open(&path).unwrap();
        }

        // Read it
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();
        assert!(records.is_empty());
    }

    #[test]
    fn test_read_single_record() {
        let (_guard, path) = temp_wal("single");

        // Write one record
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 42 }).unwrap();
        }

        // Read it back; the first record starts right after the 8-byte header.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 1);
        let (lsn, record) = records[0].as_ref().unwrap();
        assert_eq!(*lsn, 8);
        assert_eq!(*record, WalRecord::Begin { tx_id: 42 });
    }

    #[test]
    fn test_read_multiple_records() {
        let (_guard, path) = temp_wal("multi");

        // Write multiple records
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: 10,
                    data: vec![1, 2, 3],
                })
                .unwrap();
            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        }

        // Read back
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 3);

        // Check each record
        match &records[0].as_ref().unwrap().1 {
            WalRecord::Begin { tx_id } => assert_eq!(*tx_id, 1),
            _ => panic!("Expected Begin"),
        }
        match &records[1].as_ref().unwrap().1 {
            WalRecord::PageWrite {
                tx_id,
                page_id,
                data,
            } => {
                assert_eq!(*tx_id, 1);
                assert_eq!(*page_id, 10);
                assert_eq!(data, &vec![1, 2, 3]);
            }
            _ => panic!("Expected PageWrite"),
        }
        match &records[2].as_ref().unwrap().1 {
            WalRecord::Commit { tx_id } => assert_eq!(*tx_id, 1),
            _ => panic!("Expected Commit"),
        }
    }

    #[test]
    fn test_lsn_tracking() {
        let (_guard, path) = temp_wal("lsn");

        // Write records, remembering the LSN the writer assigned to each.
        let (lsn1, lsn2, lsn3);
        {
            let mut writer = WalWriter::open(&path).unwrap();
            lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            lsn2 = writer.append(&WalRecord::Checkpoint { lsn: 100 }).unwrap();
            lsn3 = writer.append(&WalRecord::Rollback { tx_id: 1 }).unwrap();
        }

        // The reader must report the same LSNs.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].as_ref().unwrap().0, lsn1);
        assert_eq!(records[1].as_ref().unwrap().0, lsn2);
        assert_eq!(records[2].as_ref().unwrap().0, lsn3);
    }

    #[test]
    fn test_invalid_magic() {
        let (_guard, path) = temp_wal("badmagic");

        // Write invalid file
        std::fs::write(&path, b"BAAD0000").unwrap();

        assert_open_fails(&path, std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_invalid_version() {
        let (_guard, path) = temp_wal("badver");

        // Write header with wrong version
        let mut header = Vec::new();
        header.extend_from_slice(WAL_MAGIC);
        header.push(99); // Wrong version
        header.extend_from_slice(&[0u8; 3]);
        std::fs::write(&path, &header).unwrap();

        assert_open_fails(&path, std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_truncated_header() {
        let (_guard, path) = temp_wal("shorthdr");

        // Fewer than the 8 header bytes: `read_exact` must hit EOF.
        std::fs::write(&path, b"RD").unwrap();

        assert_open_fails(&path, std::io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_read_large_page_write() {
        let (_guard, path) = temp_wal("large");

        let large_data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        // Write large record
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: 0,
                    data: large_data.clone(),
                })
                .unwrap();
        }

        // Read back
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 1);
        match &records[0].as_ref().unwrap().1 {
            WalRecord::PageWrite { data, .. } => {
                assert_eq!(*data, large_data);
            }
            _ => panic!("Expected PageWrite"),
        }
    }

    #[test]
    fn test_collect_full_page_images_missing_file() {
        let (_guard, path) = temp_wal("fpi_missing");

        let images = WalReader::collect_full_page_images(&path).unwrap();
        assert!(images.is_empty());
    }

    #[test]
    fn test_collect_full_page_images_ignores_other_records() {
        let (_guard, path) = temp_wal("fpi_none");

        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 7 }).unwrap();
            writer
                .append(&WalRecord::PageWrite {
                    tx_id: 7,
                    page_id: 3,
                    data: vec![9; 16],
                })
                .unwrap();
            writer.append(&WalRecord::Commit { tx_id: 7 }).unwrap();
        }

        let images = WalReader::collect_full_page_images(&path).unwrap();
        assert!(images.is_empty());
    }
}