reddb_server/storage/wal/
reader.rs1use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
2use std::fs::File;
3use std::io::{self, BufReader, Read};
4use std::path::Path;
5
6pub struct WalReader {
8 reader: BufReader<File>,
9 position: u64,
10}
11
12impl WalReader {
13 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
15 let file = File::open(path)?;
16 let mut reader = BufReader::new(file);
17
18 let mut header = [0u8; 8];
20 reader.read_exact(&mut header)?;
21
22 if &header[0..4] != WAL_MAGIC {
23 return Err(io::Error::new(
24 io::ErrorKind::InvalidData,
25 "Invalid WAL magic bytes",
26 ));
27 }
28
29 if header[4] != WAL_VERSION {
30 return Err(io::Error::new(
31 io::ErrorKind::InvalidData,
32 format!("Unsupported WAL version: {}", header[4]),
33 ));
34 }
35
36 Ok(Self {
37 reader,
38 position: 8,
39 })
40 }
41
42 pub fn iter(self) -> WalIterator {
45 WalIterator {
46 reader: self.reader,
47 position: self.position,
48 }
49 }
50
51 pub fn collect_full_page_images<P: AsRef<Path>>(
56 path: P,
57 ) -> io::Result<std::collections::BTreeMap<u32, (u64, Vec<u8>)>> {
58 use std::collections::BTreeMap;
59 let mut out: BTreeMap<u32, (u64, Vec<u8>)> = BTreeMap::new();
60 if !path.as_ref().exists() {
61 return Ok(out);
62 }
63 let reader = Self::open(path)?;
64 for item in reader.iter() {
65 let (lsn, record) = item?;
66 if let WalRecord::FullPageImage {
67 page_id, data, ..
68 } = record
69 {
70 match out.get(&page_id) {
71 Some((existing_lsn, _)) if *existing_lsn > lsn => {}
72 _ => {
73 out.insert(page_id, (lsn, data));
74 }
75 }
76 }
77 }
78 Ok(out)
79 }
80}
81
82pub struct WalIterator {
83 reader: BufReader<File>,
84 position: u64,
85}
86
87impl Iterator for WalIterator {
88 type Item = io::Result<(u64, WalRecord)>;
89
90 fn next(&mut self) -> Option<Self::Item> {
91 let start_pos = self.position;
102
103 struct CountingReader<'a, R> {
106 inner: &'a mut R,
107 count: u64,
108 }
109
110 impl<'a, R: Read> Read for CountingReader<'a, R> {
111 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
112 let n = self.inner.read(buf)?;
113 self.count += n as u64;
114 Ok(n)
115 }
116 }
117
118 let mut counter = CountingReader {
119 inner: &mut self.reader,
120 count: 0,
121 };
122
123 match WalRecord::read(&mut counter) {
124 Ok(Some(record)) => {
125 self.position += counter.count;
126 Some(Ok((start_pos, record)))
127 }
128 Ok(None) => None, Err(e) => Some(Err(e)),
130 }
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use super::super::writer::WalWriter;
137 use super::*;
138 use std::path::PathBuf;
139
140 struct FileGuard {
141 path: PathBuf,
142 }
143
144 impl Drop for FileGuard {
145 fn drop(&mut self) {
146 let _ = std::fs::remove_file(&self.path);
147 }
148 }
149
150 fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
151 let path =
152 std::env::temp_dir().join(format!("rb_wal_reader_{}_{}.wal", name, std::process::id()));
153 let guard = FileGuard { path: path.clone() };
154 let _ = std::fs::remove_file(&path);
155 (guard, path)
156 }
157
158 #[test]
159 fn test_read_empty_wal() {
160 let (_guard, path) = temp_wal("empty");
161
162 {
164 let _writer = WalWriter::open(&path).unwrap();
165 }
166
167 let reader = WalReader::open(&path).unwrap();
169 let records: Vec<_> = reader.iter().collect();
170 assert!(records.is_empty());
171 }
172
173 #[test]
174 fn test_read_single_record() {
175 let (_guard, path) = temp_wal("single");
176
177 {
179 let mut writer = WalWriter::open(&path).unwrap();
180 writer.append(&WalRecord::Begin { tx_id: 42 }).unwrap();
181 }
182
183 let reader = WalReader::open(&path).unwrap();
185 let records: Vec<_> = reader.iter().collect();
186
187 assert_eq!(records.len(), 1);
188 let (lsn, record) = records[0].as_ref().unwrap();
189 assert_eq!(*lsn, 8);
190 assert_eq!(*record, WalRecord::Begin { tx_id: 42 });
191 }
192
193 #[test]
194 fn test_read_multiple_records() {
195 let (_guard, path) = temp_wal("multi");
196
197 {
199 let mut writer = WalWriter::open(&path).unwrap();
200 writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
201 writer
202 .append(&WalRecord::PageWrite {
203 tx_id: 1,
204 page_id: 10,
205 data: vec![1, 2, 3],
206 })
207 .unwrap();
208 writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
209 }
210
211 let reader = WalReader::open(&path).unwrap();
213 let records: Vec<_> = reader.iter().collect();
214
215 assert_eq!(records.len(), 3);
216
217 match &records[0].as_ref().unwrap().1 {
219 WalRecord::Begin { tx_id } => assert_eq!(*tx_id, 1),
220 _ => panic!("Expected Begin"),
221 }
222 match &records[1].as_ref().unwrap().1 {
223 WalRecord::PageWrite {
224 tx_id,
225 page_id,
226 data,
227 } => {
228 assert_eq!(*tx_id, 1);
229 assert_eq!(*page_id, 10);
230 assert_eq!(data, &vec![1, 2, 3]);
231 }
232 _ => panic!("Expected PageWrite"),
233 }
234 match &records[2].as_ref().unwrap().1 {
235 WalRecord::Commit { tx_id } => assert_eq!(*tx_id, 1),
236 _ => panic!("Expected Commit"),
237 }
238 }
239
240 #[test]
241 fn test_lsn_tracking() {
242 let (_guard, path) = temp_wal("lsn");
243
244 let (lsn1, lsn2, lsn3);
246 {
247 let mut writer = WalWriter::open(&path).unwrap();
248 lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
249 lsn2 = writer.append(&WalRecord::Checkpoint { lsn: 100 }).unwrap();
250 lsn3 = writer.append(&WalRecord::Rollback { tx_id: 1 }).unwrap();
251 }
252
253 let reader = WalReader::open(&path).unwrap();
255 let records: Vec<_> = reader.iter().collect();
256
257 assert_eq!(records.len(), 3);
258 assert_eq!(records[0].as_ref().unwrap().0, lsn1);
259 assert_eq!(records[1].as_ref().unwrap().0, lsn2);
260 assert_eq!(records[2].as_ref().unwrap().0, lsn3);
261 }
262
263 #[test]
264 fn test_invalid_magic() {
265 let (_guard, path) = temp_wal("badmagic");
266
267 std::fs::write(&path, b"BAAD0000").unwrap();
269
270 let result = WalReader::open(&path);
271 assert!(result.is_err());
272 }
273
274 #[test]
275 fn test_invalid_version() {
276 let (_guard, path) = temp_wal("badver");
277
278 let mut header = Vec::new();
280 header.extend_from_slice(WAL_MAGIC);
281 header.push(99); header.extend_from_slice(&[0u8; 3]);
283 std::fs::write(&path, &header).unwrap();
284
285 let result = WalReader::open(&path);
286 assert!(result.is_err());
287 }
288
289 #[test]
290 fn test_read_large_page_write() {
291 let (_guard, path) = temp_wal("large");
292
293 let large_data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
294
295 {
297 let mut writer = WalWriter::open(&path).unwrap();
298 writer
299 .append(&WalRecord::PageWrite {
300 tx_id: 1,
301 page_id: 0,
302 data: large_data.clone(),
303 })
304 .unwrap();
305 }
306
307 let reader = WalReader::open(&path).unwrap();
309 let records: Vec<_> = reader.iter().collect();
310
311 assert_eq!(records.len(), 1);
312 match &records[0].as_ref().unwrap().1 {
313 WalRecord::PageWrite { data, .. } => {
314 assert_eq!(*data, large_data);
315 }
316 _ => panic!("Expected PageWrite"),
317 }
318 }
319}