reddb_server/storage/wal/
reader.rs1use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION, WAL_VERSION_V2};
2use std::fs::File;
3use std::io::{self, BufReader, Read};
4use std::path::Path;
5
/// Sequential reader over a write-ahead-log file.
///
/// Built by [`WalReader::open`], which checks the file header before handing
/// out a reader. Call [`WalReader::iter`] to walk the records that follow it.
///
/// `Debug` is derived so that `io::Result<WalReader>` supports `unwrap_err` /
/// `expect_err` and so the reader can appear in diagnostic output.
#[derive(Debug)]
pub struct WalReader {
    /// Buffered handle on the WAL file, positioned just past the header.
    reader: BufReader<File>,
    /// Byte offset of the next unread byte (8 right after `open`).
    position: u64,
    /// Version byte taken from the file header (header byte 4).
    format_version: u8,
}
12
13impl WalReader {
14 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
16 let file = File::open(path)?;
17 let mut reader = BufReader::new(file);
18
19 let mut header = [0u8; 8];
21 reader.read_exact(&mut header)?;
22
23 if &header[0..4] != WAL_MAGIC {
24 return Err(io::Error::new(
25 io::ErrorKind::InvalidData,
26 "Invalid WAL magic bytes",
27 ));
28 }
29
30 if header[4] != WAL_VERSION && header[4] != WAL_VERSION_V2 {
31 return Err(io::Error::new(
32 io::ErrorKind::InvalidData,
33 format!("Unsupported WAL version: {}", header[4]),
34 ));
35 }
36
37 Ok(Self {
38 reader,
39 position: 8,
40 format_version: header[4],
41 })
42 }
43
44 pub fn iter(self) -> WalIterator {
47 WalIterator {
48 reader: self.reader,
49 position: self.position,
50 format_version: self.format_version,
51 }
52 }
53
54 pub fn collect_full_page_images<P: AsRef<Path>>(
59 path: P,
60 ) -> io::Result<std::collections::BTreeMap<u32, (u64, Vec<u8>)>> {
61 use std::collections::BTreeMap;
62 let mut out: BTreeMap<u32, (u64, Vec<u8>)> = BTreeMap::new();
63 if !path.as_ref().exists() {
64 return Ok(out);
65 }
66 let reader = Self::open(path)?;
67 for item in reader.iter() {
68 let (lsn, record) = item?;
69 if let WalRecord::FullPageImage { page_id, data, .. } = record {
70 match out.get(&page_id) {
71 Some((existing_lsn, _)) if *existing_lsn > lsn => {}
72 _ => {
73 out.insert(page_id, (lsn, data));
74 }
75 }
76 }
77 }
78 Ok(out)
79 }
80}
81
/// Iterator over the records of a WAL file, created by `WalReader::iter`.
///
/// Yields `io::Result<(u64, WalRecord)>`; the `u64` is the byte offset at
/// which the record starts.
///
/// `Debug` is derived so the iterator can be inspected in diagnostics like
/// other public types.
#[derive(Debug)]
pub struct WalIterator {
    /// Buffered handle on the WAL file, positioned at the next record.
    reader: BufReader<File>,
    /// Byte offset of the next unread byte; reported as the next record's LSN.
    position: u64,
    /// Header version byte, forwarded to the record decoder.
    format_version: u8,
}
87
88impl Iterator for WalIterator {
89 type Item = io::Result<(u64, WalRecord)>;
90
91 fn next(&mut self) -> Option<Self::Item> {
92 let start_pos = self.position;
103
104 struct CountingReader<'a, R> {
107 inner: &'a mut R,
108 count: u64,
109 }
110
111 impl<'a, R: Read> Read for CountingReader<'a, R> {
112 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
113 let n = self.inner.read(buf)?;
114 self.count += n as u64;
115 Ok(n)
116 }
117 }
118
119 let mut counter = CountingReader {
120 inner: &mut self.reader,
121 count: 0,
122 };
123
124 match WalRecord::read_with_format_version(&mut counter, self.format_version) {
125 Ok(Some((_, record))) => {
126 self.position += counter.count;
127 Some(Ok((start_pos, record)))
128 }
129 Ok(None) => None, Err(e) => Some(Err(e)),
131 }
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::super::writer::WalWriter;
    use super::*;
    use std::path::PathBuf;

    /// Removes the file at `path` when it goes out of scope, so every test
    /// cleans up the WAL it created even if an assertion fails.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            // Best effort: the file may already be gone.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Builds a per-process, per-test WAL path in the system temp directory,
    /// clears any stale file left there, and returns the cleanup guard along
    /// with the path.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let file_name = format!("rb_wal_reader_{}_{}.wal", name, std::process::id());
        let path = std::env::temp_dir().join(file_name);
        let guard = FileGuard { path: path.clone() };
        let _ = std::fs::remove_file(&path);
        (guard, path)
    }

    /// Opens the WAL at `path` and drains its iterator into a vector.
    fn read_all(path: &std::path::Path) -> Vec<std::io::Result<(u64, WalRecord)>> {
        WalReader::open(path).unwrap().iter().collect()
    }

    #[test]
    fn test_read_empty_wal() {
        let (_guard, path) = temp_wal("empty");

        // Creating and immediately dropping a writer leaves a WAL with no records.
        drop(WalWriter::open(&path).unwrap());

        let records = read_all(&path);
        assert!(records.is_empty());
    }

    #[test]
    fn test_read_single_record() {
        let (_guard, path) = temp_wal("single");

        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 42 }).unwrap();
        drop(writer);

        let records = read_all(&path);

        assert_eq!(records.len(), 1);
        let (lsn, record) = records[0].as_ref().unwrap();
        // The first record starts right after the 8-byte header.
        assert_eq!(*lsn, 8);
        assert_eq!(*record, WalRecord::Begin { tx_id: 42 });
    }

    #[test]
    fn test_read_multiple_records() {
        let (_guard, path) = temp_wal("multi");

        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer
            .append(&WalRecord::PageWrite {
                tx_id: 1,
                page_id: 10,
                data: vec![1, 2, 3],
            })
            .unwrap();
        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        drop(writer);

        let records = read_all(&path);

        assert_eq!(records.len(), 3);

        let first = &records[0].as_ref().unwrap().1;
        if let WalRecord::Begin { tx_id } = first {
            assert_eq!(*tx_id, 1);
        } else {
            panic!("Expected Begin");
        }

        let second = &records[1].as_ref().unwrap().1;
        if let WalRecord::PageWrite {
            tx_id,
            page_id,
            data,
        } = second
        {
            assert_eq!(*tx_id, 1);
            assert_eq!(*page_id, 10);
            assert_eq!(data, &vec![1, 2, 3]);
        } else {
            panic!("Expected PageWrite");
        }

        let third = &records[2].as_ref().unwrap().1;
        if let WalRecord::Commit { tx_id } = third {
            assert_eq!(*tx_id, 1);
        } else {
            panic!("Expected Commit");
        }
    }

    #[test]
    fn test_lsn_tracking() {
        let (_guard, path) = temp_wal("lsn");

        // LSNs handed back by the writer, in append order.
        let mut writer = WalWriter::open(&path).unwrap();
        let expected = [
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap(),
            writer.append(&WalRecord::Checkpoint { lsn: 100 }).unwrap(),
            writer.append(&WalRecord::Rollback { tx_id: 1 }).unwrap(),
        ];
        drop(writer);

        let records = read_all(&path);

        assert_eq!(records.len(), 3);
        // The reader must report the same LSNs the writer assigned.
        for (entry, want) in records.iter().zip(expected.iter()) {
            assert_eq!(entry.as_ref().unwrap().0, *want);
        }
    }

    #[test]
    fn test_invalid_magic() {
        let (_guard, path) = temp_wal("badmagic");

        // Eight bytes, but the first four are not the WAL magic.
        std::fs::write(&path, b"BAAD0000").unwrap();

        assert!(WalReader::open(&path).is_err());
    }

    #[test]
    fn test_invalid_version() {
        let (_guard, path) = temp_wal("badver");

        // Valid magic, unknown version byte 99, three padding bytes.
        let mut header = WAL_MAGIC.to_vec();
        header.extend_from_slice(&[99, 0, 0, 0]);
        std::fs::write(&path, &header).unwrap();

        assert!(WalReader::open(&path).is_err());
    }

    #[test]
    fn test_read_large_page_write() {
        let (_guard, path) = temp_wal("large");

        // 4096 bytes cycling through 0..=255.
        let large_data: Vec<u8> = (0u8..=255).cycle().take(4096).collect();

        let mut writer = WalWriter::open(&path).unwrap();
        writer
            .append(&WalRecord::PageWrite {
                tx_id: 1,
                page_id: 0,
                data: large_data.clone(),
            })
            .unwrap();
        drop(writer);

        let records = read_all(&path);

        assert_eq!(records.len(), 1);
        if let WalRecord::PageWrite { data, .. } = &records[0].as_ref().unwrap().1 {
            assert_eq!(*data, large_data);
        } else {
            panic!("Expected PageWrite");
        }
    }
}