// reddb_server/storage/wal/reader.rs

use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
use std::fs::File;
use std::io::{self, BufReader, Read};
use std::iter::FusedIterator;
use std::path::Path;

/// Sequential reader for a Write-Ahead Log file.
///
/// Created with [`WalReader::open`], which validates the file header; the
/// records are then consumed through [`WalReader::iter`].
#[derive(Debug)]
pub struct WalReader {
    /// Buffered handle to the underlying WAL file.
    reader: BufReader<File>,
    /// Byte offset of the next unread byte (8, just past the header, after `open`).
    position: u64,
}

12impl WalReader {
13    /// Open a WAL file for reading
14    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
15        let file = File::open(path)?;
16        let mut reader = BufReader::new(file);
17
18        // Check header
19        let mut header = [0u8; 8];
20        reader.read_exact(&mut header)?;
21
22        if &header[0..4] != WAL_MAGIC {
23            return Err(io::Error::new(
24                io::ErrorKind::InvalidData,
25                "Invalid WAL magic bytes",
26            ));
27        }
28
29        if header[4] != WAL_VERSION {
30            return Err(io::Error::new(
31                io::ErrorKind::InvalidData,
32                format!("Unsupported WAL version: {}", header[4]),
33            ));
34        }
35
36        Ok(Self {
37            reader,
38            position: 8,
39        })
40    }
41
42    /// Iterate over records
43    /// Returns iterator that yields (LSn, WalRecord)
44    pub fn iter(self) -> WalIterator {
45        WalIterator {
46            reader: self.reader,
47            position: self.position,
48        }
49    }
50
51    /// Scan a WAL file and collect the most recent full-page image for
52    /// each page id observed. Returned map is `page_id → (lsn, data)`.
53    /// Recovery applies these images before redo so torn writes are
54    /// healed without the legacy `-dwb` sidecar (gh-478).
55    pub fn collect_full_page_images<P: AsRef<Path>>(
56        path: P,
57    ) -> io::Result<std::collections::BTreeMap<u32, (u64, Vec<u8>)>> {
58        use std::collections::BTreeMap;
59        let mut out: BTreeMap<u32, (u64, Vec<u8>)> = BTreeMap::new();
60        if !path.as_ref().exists() {
61            return Ok(out);
62        }
63        let reader = Self::open(path)?;
64        for item in reader.iter() {
65            let (lsn, record) = item?;
66            if let WalRecord::FullPageImage {
67                page_id, data, ..
68            } = record
69            {
70                match out.get(&page_id) {
71                    Some((existing_lsn, _)) if *existing_lsn > lsn => {}
72                    _ => {
73                        out.insert(page_id, (lsn, data));
74                    }
75                }
76            }
77        }
78        Ok(out)
79    }
80}
81
82pub struct WalIterator {
83    reader: BufReader<File>,
84    position: u64,
85}
86
87impl Iterator for WalIterator {
88    type Item = io::Result<(u64, WalRecord)>;
89
90    fn next(&mut self) -> Option<Self::Item> {
91        // Need to record start position for LSN
92        // Since BufReader buffers, we can't trust file.seek(Current) directly without accounting for buffer.
93        // But `WalRecord::read` reads sequentially.
94        // The simple way: track position manually based on bytes read.
95        // WalRecord::read consumes bytes.
96
97        // Wait, WalRecord::read takes &mut R. We can wrap the reader to count bytes?
98        // Or just rely on the fact that we read sequentially.
99        // But we need to know the *start* LSN of the record to return it.
100
101        let start_pos = self.position;
102
103        // We need a way to track how many bytes were read by `WalRecord::read`.
104        // Let's create a counting wrapper.
105        struct CountingReader<'a, R> {
106            inner: &'a mut R,
107            count: u64,
108        }
109
110        impl<'a, R: Read> Read for CountingReader<'a, R> {
111            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
112                let n = self.inner.read(buf)?;
113                self.count += n as u64;
114                Ok(n)
115            }
116        }
117
118        let mut counter = CountingReader {
119            inner: &mut self.reader,
120            count: 0,
121        };
122
123        match WalRecord::read(&mut counter) {
124            Ok(Some(record)) => {
125                self.position += counter.count;
126                Some(Ok((start_pos, record)))
127            }
128            Ok(None) => None, // EOF
129            Err(e) => Some(Err(e)),
130        }
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::super::writer::WalWriter;
    use super::*;
    use std::path::PathBuf;

    /// Deletes the wrapped path when dropped so each test cleans up after itself.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Returns a cleanup guard and a temp path unique to `name` and this process.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let path =
            std::env::temp_dir().join(format!("rb_wal_reader_{}_{}.wal", name, std::process::id()));
        let guard = FileGuard { path: path.clone() };
        // Remove leftovers from an earlier aborted run.
        let _ = std::fs::remove_file(&path);
        (guard, path)
    }

    #[test]
    fn test_read_empty_wal() {
        let (_guard, path) = temp_wal("empty");

        // Create empty WAL
        {
            let _writer = WalWriter::open(&path).unwrap();
        }

        // Read it
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();
        assert!(records.is_empty());
    }

    #[test]
    fn test_read_single_record() {
        let (_guard, path) = temp_wal("single");

        // Write one record
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 42 }).unwrap();
        }

        // Read it back
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 1);
        let (lsn, record) = records[0].as_ref().unwrap();
        // The first record starts right after the 8-byte header.
        assert_eq!(*lsn, 8);
        assert_eq!(*record, WalRecord::Begin { tx_id: 42 });
    }

    #[test]
    fn test_read_multiple_records() {
        let (_guard, path) = temp_wal("multi");

        // Write multiple records
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: 10,
                    data: vec![1, 2, 3],
                })
                .unwrap();
            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        }

        // Read back
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 3);

        // Check each record
        match &records[0].as_ref().unwrap().1 {
            WalRecord::Begin { tx_id } => assert_eq!(*tx_id, 1),
            _ => panic!("Expected Begin"),
        }
        match &records[1].as_ref().unwrap().1 {
            WalRecord::PageWrite {
                tx_id,
                page_id,
                data,
            } => {
                assert_eq!(*tx_id, 1);
                assert_eq!(*page_id, 10);
                assert_eq!(data, &vec![1, 2, 3]);
            }
            _ => panic!("Expected PageWrite"),
        }
        match &records[2].as_ref().unwrap().1 {
            WalRecord::Commit { tx_id } => assert_eq!(*tx_id, 1),
            _ => panic!("Expected Commit"),
        }
    }

    #[test]
    fn test_lsn_tracking() {
        let (_guard, path) = temp_wal("lsn");

        // Write records
        let (lsn1, lsn2, lsn3);
        {
            let mut writer = WalWriter::open(&path).unwrap();
            lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            lsn2 = writer.append(&WalRecord::Checkpoint { lsn: 100 }).unwrap();
            lsn3 = writer.append(&WalRecord::Rollback { tx_id: 1 }).unwrap();
        }

        // Read and verify LSNs
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].as_ref().unwrap().0, lsn1);
        assert_eq!(records[1].as_ref().unwrap().0, lsn2);
        assert_eq!(records[2].as_ref().unwrap().0, lsn3);
    }

    #[test]
    fn test_invalid_magic() {
        let (_guard, path) = temp_wal("badmagic");

        // Write invalid file
        std::fs::write(&path, b"BAAD0000").unwrap();

        // Assert the error kind, not just `is_err()`, so an unrelated failure
        // cannot make this test pass.
        let kind = WalReader::open(&path).err().map(|e| e.kind());
        assert_eq!(kind, Some(io::ErrorKind::InvalidData));
    }

    #[test]
    fn test_invalid_version() {
        let (_guard, path) = temp_wal("badver");

        // Write header with wrong version
        let mut header = Vec::new();
        header.extend_from_slice(WAL_MAGIC);
        header.push(99); // Wrong version
        header.extend_from_slice(&[0u8; 3]);
        std::fs::write(&path, &header).unwrap();

        let kind = WalReader::open(&path).err().map(|e| e.kind());
        assert_eq!(kind, Some(io::ErrorKind::InvalidData));
    }

    #[test]
    fn test_truncated_header() {
        let (_guard, path) = temp_wal("shorthdr");

        // Only the magic bytes: shorter than the 8-byte header.
        std::fs::write(&path, WAL_MAGIC).unwrap();

        let kind = WalReader::open(&path).err().map(|e| e.kind());
        assert_eq!(kind, Some(io::ErrorKind::UnexpectedEof));
    }

    #[test]
    fn test_read_large_page_write() {
        let (_guard, path) = temp_wal("large");

        let large_data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        // Write large record
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: 0,
                    data: large_data.clone(),
                })
                .unwrap();
        }

        // Read back
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 1);
        match &records[0].as_ref().unwrap().1 {
            WalRecord::PageWrite { data, .. } => {
                assert_eq!(*data, large_data);
            }
            _ => panic!("Expected PageWrite"),
        }
    }

    #[test]
    fn test_collect_full_page_images_missing_file() {
        let (_guard, path) = temp_wal("fpi_missing");

        // `temp_wal` removed any leftover, so there is no WAL file at `path`.
        let images = WalReader::collect_full_page_images(&path).unwrap();
        assert!(images.is_empty());
    }

    #[test]
    fn test_collect_full_page_images_without_images() {
        let (_guard, path) = temp_wal("fpi_none");

        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 7 }).unwrap();
            writer.append(&WalRecord::Commit { tx_id: 7 }).unwrap();
        }

        let images = WalReader::collect_full_page_images(&path).unwrap();
        assert!(images.is_empty());
    }
}