Skip to main content

reddb_server/storage/wal/
reader.rs

1use super::record::WalRecord;
2use std::fs::File;
3use std::io::{self, BufReader, Read};
4use std::path::Path;
5
/// Sequential reader for a Write-Ahead Log file.
///
/// Construct with [`WalReader::open`], which validates the file header, then
/// call [`WalReader::iter`] to walk the records.
#[derive(Debug)]
pub struct WalReader {
    /// Buffered handle on the WAL file; positioned just past the header once opened.
    reader: BufReader<File>,
    /// Byte offset in the file of the next unread record.
    position: u64,
    /// Format version decoded from the file header; passed to the record decoder.
    format_version: u8,
}
12
13impl WalReader {
14    /// Open a WAL file for reading
15    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
16        let file = File::open(path)?;
17        let mut reader = BufReader::new(file);
18
19        // Check header
20        let mut header = [0u8; reddb_file::WAL_FILE_HEADER_BYTES];
21        reader.read_exact(&mut header)?;
22        let decoded = reddb_file::decode_wal_file_header(&header)?;
23
24        Ok(Self {
25            reader,
26            position: reddb_file::WAL_FILE_HEADER_BYTES as u64,
27            format_version: decoded.version,
28        })
29    }
30
31    /// Iterate over records
32    /// Returns iterator that yields (LSn, WalRecord)
33    pub fn iter(self) -> WalIterator {
34        WalIterator {
35            reader: self.reader,
36            position: self.position,
37            format_version: self.format_version,
38        }
39    }
40
41    /// Scan a WAL file and collect the most recent full-page image for
42    /// each page id observed. Returned map is `page_id → (lsn, data)`.
43    /// Recovery applies these images before redo so torn writes are
44    /// healed without the legacy `-dwb` sidecar (gh-478).
45    pub fn collect_full_page_images<P: AsRef<Path>>(
46        path: P,
47    ) -> io::Result<std::collections::BTreeMap<u32, (u64, Vec<u8>)>> {
48        use std::collections::BTreeMap;
49        let mut out: BTreeMap<u32, (u64, Vec<u8>)> = BTreeMap::new();
50        if !path.as_ref().exists() {
51            return Ok(out);
52        }
53        let reader = Self::open(path)?;
54        for item in reader.iter() {
55            let (lsn, record) = item?;
56            if let WalRecord::FullPageImage { page_id, data, .. } = record {
57                match out.get(&page_id) {
58                    Some((existing_lsn, _)) if *existing_lsn > lsn => {}
59                    _ => {
60                        out.insert(page_id, (lsn, data));
61                    }
62                }
63            }
64        }
65        Ok(out)
66    }
67}
68
/// Iterator over the records of a WAL file, produced by [`WalReader::iter`].
///
/// Yields `io::Result<(lsn, record)>` items, where `lsn` is the byte offset in
/// the file at which the record starts.
#[derive(Debug)]
pub struct WalIterator {
    /// Buffered handle on the WAL file, positioned at the next record.
    reader: BufReader<File>,
    /// Byte offset in the file of the next unread record.
    position: u64,
    /// Format version from the file header; passed to the record decoder.
    format_version: u8,
}
74
75impl Iterator for WalIterator {
76    type Item = io::Result<(u64, WalRecord)>;
77
78    fn next(&mut self) -> Option<Self::Item> {
79        // Need to record start position for LSN
80        // Since BufReader buffers, we can't trust file.seek(Current) directly without accounting for buffer.
81        // But `WalRecord::read` reads sequentially.
82        // The simple way: track position manually based on bytes read.
83        // WalRecord::read consumes bytes.
84
85        // Wait, WalRecord::read takes &mut R. We can wrap the reader to count bytes?
86        // Or just rely on the fact that we read sequentially.
87        // But we need to know the *start* LSN of the record to return it.
88
89        let start_pos = self.position;
90
91        // We need a way to track how many bytes were read by `WalRecord::read`.
92        // Let's create a counting wrapper.
93        struct CountingReader<'a, R> {
94            inner: &'a mut R,
95            count: u64,
96        }
97
98        impl<'a, R: Read> Read for CountingReader<'a, R> {
99            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
100                let n = self.inner.read(buf)?;
101                self.count += n as u64;
102                Ok(n)
103            }
104        }
105
106        let mut counter = CountingReader {
107            inner: &mut self.reader,
108            count: 0,
109        };
110
111        match WalRecord::read_with_format_version(&mut counter, self.format_version) {
112            Ok(Some((_, record))) => {
113                self.position += counter.count;
114                Some(Ok((start_pos, record)))
115            }
116            Ok(None) => None, // EOF
117            Err(e) => Some(Err(e)),
118        }
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::super::writer::WalWriter;
    use super::*;
    use std::path::PathBuf;

    /// Removes the wrapped path when dropped so each test cleans up after itself.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Returns a per-process temp WAL path (removed up front, so it starts
    /// absent) plus a guard that deletes it when the test ends.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let path = reddb_file::layout::wal_component_temp_path(
            &std::env::temp_dir(),
            "reader",
            name,
            std::process::id(),
        );
        let guard = FileGuard { path: path.clone() };
        let _ = std::fs::remove_file(&path);
        (guard, path)
    }

    /// LSN of the first record: records begin immediately after the file header.
    const FIRST_LSN: u64 = reddb_file::WAL_FILE_HEADER_BYTES as u64;

    #[test]
    fn test_read_empty_wal() {
        let (_guard, path) = temp_wal("empty");

        // Create empty WAL
        {
            let _writer = WalWriter::open(&path).unwrap();
        }

        // Read it
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();
        assert!(records.is_empty());
    }

    #[test]
    fn test_read_single_record() {
        let (_guard, path) = temp_wal("single");

        // Write one record
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 42 }).unwrap();
        }

        // Read it back
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 1);
        let (lsn, record) = records[0].as_ref().unwrap();
        assert_eq!(*lsn, FIRST_LSN);
        assert_eq!(*record, WalRecord::Begin { tx_id: 42 });
    }

    #[test]
    fn test_read_multiple_records() {
        let (_guard, path) = temp_wal("multi");

        // Write multiple records
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: 10,
                    data: vec![1, 2, 3],
                })
                .unwrap();
            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        }

        // Read back
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 3);

        // Check each record
        match &records[0].as_ref().unwrap().1 {
            WalRecord::Begin { tx_id } => assert_eq!(*tx_id, 1),
            _ => panic!("Expected Begin"),
        }
        match &records[1].as_ref().unwrap().1 {
            WalRecord::PageWrite {
                tx_id,
                page_id,
                data,
            } => {
                assert_eq!(*tx_id, 1);
                assert_eq!(*page_id, 10);
                assert_eq!(data, &vec![1, 2, 3]);
            }
            _ => panic!("Expected PageWrite"),
        }
        match &records[2].as_ref().unwrap().1 {
            WalRecord::Commit { tx_id } => assert_eq!(*tx_id, 1),
            _ => panic!("Expected Commit"),
        }
    }

    #[test]
    fn test_lsn_tracking() {
        let (_guard, path) = temp_wal("lsn");

        // Write records
        let (lsn1, lsn2, lsn3);
        {
            let mut writer = WalWriter::open(&path).unwrap();
            lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            lsn2 = writer.append(&WalRecord::Checkpoint { lsn: 100 }).unwrap();
            lsn3 = writer.append(&WalRecord::Rollback { tx_id: 1 }).unwrap();
        }

        // Read and verify LSNs
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].as_ref().unwrap().0, lsn1);
        assert_eq!(records[1].as_ref().unwrap().0, lsn2);
        assert_eq!(records[2].as_ref().unwrap().0, lsn3);
    }

    #[test]
    fn test_invalid_magic() {
        let (_guard, path) = temp_wal("badmagic");

        // Write invalid file
        std::fs::write(&path, b"BAAD0000").unwrap();

        let result = WalReader::open(&path);
        assert!(result.is_err());
    }

    #[test]
    fn test_invalid_magic_full_length_header() {
        let (_guard, path) = temp_wal("badmagic_full");

        // Corrupt the first byte of an otherwise well-formed header so the
        // failure comes from header validation rather than from a short read.
        let mut header = reddb_file::encode_wal_file_header();
        header[0] ^= 0xFF;
        std::fs::write(&path, header).unwrap();

        assert!(WalReader::open(&path).is_err());
    }

    #[test]
    fn test_truncated_header() {
        let (_guard, path) = temp_wal("truncated");

        // A file shorter than the header must be rejected, not read as empty.
        let header = reddb_file::encode_wal_file_header();
        std::fs::write(&path, &header[..header.len() - 1]).unwrap();

        assert!(WalReader::open(&path).is_err());
    }

    #[test]
    fn test_invalid_version() {
        let (_guard, path) = temp_wal("badver");

        // Write header with wrong version
        let mut header = reddb_file::encode_wal_file_header();
        header[4] = 99; // Wrong version
        std::fs::write(&path, header).unwrap();

        let result = WalReader::open(&path);
        assert!(result.is_err());
    }

    #[test]
    fn test_read_large_page_write() {
        let (_guard, path) = temp_wal("large");

        let large_data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        // Write large record
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: 0,
                    data: large_data.clone(),
                })
                .unwrap();
        }

        // Read back
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 1);
        match &records[0].as_ref().unwrap().1 {
            WalRecord::PageWrite { data, .. } => {
                assert_eq!(*data, large_data);
            }
            _ => panic!("Expected PageWrite"),
        }
    }

    #[test]
    fn test_collect_full_page_images_missing_file() {
        let (_guard, path) = temp_wal("fpi_missing");

        // A WAL that was never created means there is nothing to heal.
        let images = WalReader::collect_full_page_images(&path).unwrap();
        assert!(images.is_empty());
    }

    #[test]
    fn test_collect_full_page_images_without_images() {
        let (_guard, path) = temp_wal("fpi_none");

        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 7 }).unwrap();
            writer.append(&WalRecord::Commit { tx_id: 7 }).unwrap();
        }

        let images = WalReader::collect_full_page_images(&path).unwrap();
        assert!(images.is_empty());
    }
}