// reddb_server/storage/wal/reader.rs
use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
2use std::fs::File;
3use std::io::{self, BufReader, Read};
4use std::path::Path;
5
/// Handle on an opened write-ahead-log file whose header has been validated.
///
/// Created by `WalReader::open`; turn it into a record stream with
/// `WalReader::iter`.
#[derive(Debug)]
pub struct WalReader {
    /// Buffered reader over the WAL file, left just past the 8-byte header
    /// once `open` has succeeded.
    reader: BufReader<File>,
    /// Byte offset of the next unread record; `open` initialises it to 8.
    position: u64,
}
11
12impl WalReader {
13 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
15 let file = File::open(path)?;
16 let mut reader = BufReader::new(file);
17
18 let mut header = [0u8; 8];
20 reader.read_exact(&mut header)?;
21
22 if &header[0..4] != WAL_MAGIC {
23 return Err(io::Error::new(
24 io::ErrorKind::InvalidData,
25 "Invalid WAL magic bytes",
26 ));
27 }
28
29 if header[4] != WAL_VERSION {
30 return Err(io::Error::new(
31 io::ErrorKind::InvalidData,
32 format!("Unsupported WAL version: {}", header[4]),
33 ));
34 }
35
36 Ok(Self {
37 reader,
38 position: 8,
39 })
40 }
41
42 pub fn iter(self) -> WalIterator {
45 WalIterator {
46 reader: self.reader,
47 position: self.position,
48 }
49 }
50
51 pub fn collect_full_page_images<P: AsRef<Path>>(
56 path: P,
57 ) -> io::Result<std::collections::BTreeMap<u32, (u64, Vec<u8>)>> {
58 use std::collections::BTreeMap;
59 let mut out: BTreeMap<u32, (u64, Vec<u8>)> = BTreeMap::new();
60 if !path.as_ref().exists() {
61 return Ok(out);
62 }
63 let reader = Self::open(path)?;
64 for item in reader.iter() {
65 let (lsn, record) = item?;
66 if let WalRecord::FullPageImage { page_id, data, .. } = record {
67 match out.get(&page_id) {
68 Some((existing_lsn, _)) if *existing_lsn > lsn => {}
69 _ => {
70 out.insert(page_id, (lsn, data));
71 }
72 }
73 }
74 }
75 Ok(out)
76 }
77}
78
/// Iterator over the records of a WAL file, produced by `WalReader::iter`.
///
/// Each item is `io::Result<(u64, WalRecord)>`, where the `u64` is the byte
/// offset at which the record started.
#[derive(Debug)]
pub struct WalIterator {
    /// Buffered reader over the WAL file, positioned at the next record.
    reader: BufReader<File>,
    /// Byte offset of the next record; advanced by the number of bytes
    /// consumed each time a record is read successfully.
    position: u64,
}
83
84impl Iterator for WalIterator {
85 type Item = io::Result<(u64, WalRecord)>;
86
87 fn next(&mut self) -> Option<Self::Item> {
88 let start_pos = self.position;
99
100 struct CountingReader<'a, R> {
103 inner: &'a mut R,
104 count: u64,
105 }
106
107 impl<'a, R: Read> Read for CountingReader<'a, R> {
108 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
109 let n = self.inner.read(buf)?;
110 self.count += n as u64;
111 Ok(n)
112 }
113 }
114
115 let mut counter = CountingReader {
116 inner: &mut self.reader,
117 count: 0,
118 };
119
120 match WalRecord::read(&mut counter) {
121 Ok(Some(record)) => {
122 self.position += counter.count;
123 Some(Ok((start_pos, record)))
124 }
125 Ok(None) => None, Err(e) => Some(Err(e)),
127 }
128 }
129}
130
#[cfg(test)]
mod tests {
    use super::super::writer::WalWriter;
    use super::*;
    use std::path::{Path, PathBuf};

    /// Deletes the file at `path` when dropped so every test cleans up.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Builds a per-process temp path for `name`, removes any stale file at
    /// that path, and returns a cleanup guard together with the path.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let file_name = format!("rb_wal_reader_{}_{}.wal", name, std::process::id());
        let path = std::env::temp_dir().join(file_name);
        let _ = std::fs::remove_file(&path);
        (FileGuard { path: path.clone() }, path)
    }

    /// Opens the WAL at `path` and drains every item the iterator yields.
    fn read_all(path: &Path) -> Vec<std::io::Result<(u64, WalRecord)>> {
        WalReader::open(path).unwrap().iter().collect()
    }

    #[test]
    fn test_read_empty_wal() {
        let (_guard, path) = temp_wal("empty");

        // Open a writer and drop it straight away, appending nothing.
        drop(WalWriter::open(&path).unwrap());

        assert!(read_all(&path).is_empty());
    }

    #[test]
    fn test_read_single_record() {
        let (_guard, path) = temp_wal("single");

        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 42 }).unwrap();
        drop(writer);

        let records = read_all(&path);
        assert_eq!(records.len(), 1);

        let entry = records[0].as_ref().unwrap();
        // The first record starts right after the 8-byte header.
        assert_eq!(entry.0, 8);
        assert_eq!(entry.1, WalRecord::Begin { tx_id: 42 });
    }

    #[test]
    fn test_read_multiple_records() {
        let (_guard, path) = temp_wal("multi");

        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer
            .append(&WalRecord::PageWrite {
                tx_id: 1,
                page_id: 10,
                data: vec![1, 2, 3],
            })
            .unwrap();
        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        drop(writer);

        let records = read_all(&path);
        assert_eq!(records.len(), 3);

        if let WalRecord::Begin { tx_id } = &records[0].as_ref().unwrap().1 {
            assert_eq!(*tx_id, 1);
        } else {
            panic!("Expected Begin");
        }

        if let WalRecord::PageWrite {
            tx_id,
            page_id,
            data,
        } = &records[1].as_ref().unwrap().1
        {
            assert_eq!(*tx_id, 1);
            assert_eq!(*page_id, 10);
            assert_eq!(data, &vec![1, 2, 3]);
        } else {
            panic!("Expected PageWrite");
        }

        if let WalRecord::Commit { tx_id } = &records[2].as_ref().unwrap().1 {
            assert_eq!(*tx_id, 1);
        } else {
            panic!("Expected Commit");
        }
    }

    #[test]
    fn test_lsn_tracking() {
        let (_guard, path) = temp_wal("lsn");

        let mut writer = WalWriter::open(&path).unwrap();
        // Array elements are evaluated in order, so these are the LSNs of the
        // first, second and third appended record.
        let expected = [
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap(),
            writer.append(&WalRecord::Checkpoint { lsn: 100 }).unwrap(),
            writer.append(&WalRecord::Rollback { tx_id: 1 }).unwrap(),
        ];
        drop(writer);

        let records = read_all(&path);
        assert_eq!(records.len(), 3);

        for (entry, want) in records.iter().zip(expected.iter()) {
            assert_eq!(entry.as_ref().unwrap().0, *want);
        }
    }

    #[test]
    fn test_invalid_magic() {
        let (_guard, path) = temp_wal("badmagic");

        std::fs::write(&path, b"BAAD0000").unwrap();

        assert!(WalReader::open(&path).is_err());
    }

    #[test]
    fn test_invalid_version() {
        let (_guard, path) = temp_wal("badver");

        // Valid magic, then version byte 99 and three zero bytes.
        let mut header = WAL_MAGIC.to_vec();
        header.extend_from_slice(&[99, 0, 0, 0]);
        std::fs::write(&path, &header).unwrap();

        assert!(WalReader::open(&path).is_err());
    }

    #[test]
    fn test_read_large_page_write() {
        let (_guard, path) = temp_wal("large");

        let large_data: Vec<u8> = (0..4096u32).map(|n| (n % 256) as u8).collect();

        let mut writer = WalWriter::open(&path).unwrap();
        writer
            .append(&WalRecord::PageWrite {
                tx_id: 1,
                page_id: 0,
                data: large_data.clone(),
            })
            .unwrap();
        drop(writer);

        let records = read_all(&path);
        assert_eq!(records.len(), 1);

        if let WalRecord::PageWrite { data, .. } = &records[0].as_ref().unwrap().1 {
            assert_eq!(*data, large_data);
        } else {
            panic!("Expected PageWrite");
        }
    }
}