reddb_server/storage/wal/
reader.rs1use super::record::WalRecord;
2use std::fs::File;
3use std::io::{self, BufReader, Read};
4use std::path::Path;
5
6pub struct WalReader {
8 reader: BufReader<File>,
9 position: u64,
10 format_version: u8,
11}
12
13impl WalReader {
14 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
16 let file = File::open(path)?;
17 let mut reader = BufReader::new(file);
18
19 let mut header = [0u8; reddb_file::WAL_FILE_HEADER_BYTES];
21 reader.read_exact(&mut header)?;
22 let decoded = reddb_file::decode_wal_file_header(&header)?;
23
24 Ok(Self {
25 reader,
26 position: reddb_file::WAL_FILE_HEADER_BYTES as u64,
27 format_version: decoded.version,
28 })
29 }
30
31 pub fn iter(self) -> WalIterator {
34 WalIterator {
35 reader: self.reader,
36 position: self.position,
37 format_version: self.format_version,
38 }
39 }
40
41 pub fn collect_full_page_images<P: AsRef<Path>>(
46 path: P,
47 ) -> io::Result<std::collections::BTreeMap<u32, (u64, Vec<u8>)>> {
48 use std::collections::BTreeMap;
49 let mut out: BTreeMap<u32, (u64, Vec<u8>)> = BTreeMap::new();
50 if !path.as_ref().exists() {
51 return Ok(out);
52 }
53 let reader = Self::open(path)?;
54 for item in reader.iter() {
55 let (lsn, record) = item?;
56 if let WalRecord::FullPageImage { page_id, data, .. } = record {
57 match out.get(&page_id) {
58 Some((existing_lsn, _)) if *existing_lsn > lsn => {}
59 _ => {
60 out.insert(page_id, (lsn, data));
61 }
62 }
63 }
64 }
65 Ok(out)
66 }
67}
68
69pub struct WalIterator {
70 reader: BufReader<File>,
71 position: u64,
72 format_version: u8,
73}
74
75impl Iterator for WalIterator {
76 type Item = io::Result<(u64, WalRecord)>;
77
78 fn next(&mut self) -> Option<Self::Item> {
79 let start_pos = self.position;
90
91 struct CountingReader<'a, R> {
94 inner: &'a mut R,
95 count: u64,
96 }
97
98 impl<'a, R: Read> Read for CountingReader<'a, R> {
99 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
100 let n = self.inner.read(buf)?;
101 self.count += n as u64;
102 Ok(n)
103 }
104 }
105
106 let mut counter = CountingReader {
107 inner: &mut self.reader,
108 count: 0,
109 };
110
111 match WalRecord::read_with_format_version(&mut counter, self.format_version) {
112 Ok(Some((_, record))) => {
113 self.position += counter.count;
114 Some(Ok((start_pos, record)))
115 }
116 Ok(None) => None, Err(e) => Some(Err(e)),
118 }
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use super::super::writer::WalWriter;
125 use super::*;
126 use std::path::PathBuf;
127
128 struct FileGuard {
129 path: PathBuf,
130 }
131
132 impl Drop for FileGuard {
133 fn drop(&mut self) {
134 let _ = std::fs::remove_file(&self.path);
135 }
136 }
137
138 fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
139 let path = reddb_file::layout::wal_component_temp_path(
140 &std::env::temp_dir(),
141 "reader",
142 name,
143 std::process::id(),
144 );
145 let guard = FileGuard { path: path.clone() };
146 let _ = std::fs::remove_file(&path);
147 (guard, path)
148 }
149
150 #[test]
151 fn test_read_empty_wal() {
152 let (_guard, path) = temp_wal("empty");
153
154 {
156 let _writer = WalWriter::open(&path).unwrap();
157 }
158
159 let reader = WalReader::open(&path).unwrap();
161 let records: Vec<_> = reader.iter().collect();
162 assert!(records.is_empty());
163 }
164
165 #[test]
166 fn test_read_single_record() {
167 let (_guard, path) = temp_wal("single");
168
169 {
171 let mut writer = WalWriter::open(&path).unwrap();
172 writer.append(&WalRecord::Begin { tx_id: 42 }).unwrap();
173 }
174
175 let reader = WalReader::open(&path).unwrap();
177 let records: Vec<_> = reader.iter().collect();
178
179 assert_eq!(records.len(), 1);
180 let (lsn, record) = records[0].as_ref().unwrap();
181 assert_eq!(*lsn, 8);
182 assert_eq!(*record, WalRecord::Begin { tx_id: 42 });
183 }
184
185 #[test]
186 fn test_read_multiple_records() {
187 let (_guard, path) = temp_wal("multi");
188
189 {
191 let mut writer = WalWriter::open(&path).unwrap();
192 writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
193 writer
194 .append(&WalRecord::PageWrite {
195 tx_id: 1,
196 page_id: 10,
197 data: vec![1, 2, 3],
198 })
199 .unwrap();
200 writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
201 }
202
203 let reader = WalReader::open(&path).unwrap();
205 let records: Vec<_> = reader.iter().collect();
206
207 assert_eq!(records.len(), 3);
208
209 match &records[0].as_ref().unwrap().1 {
211 WalRecord::Begin { tx_id } => assert_eq!(*tx_id, 1),
212 _ => panic!("Expected Begin"),
213 }
214 match &records[1].as_ref().unwrap().1 {
215 WalRecord::PageWrite {
216 tx_id,
217 page_id,
218 data,
219 } => {
220 assert_eq!(*tx_id, 1);
221 assert_eq!(*page_id, 10);
222 assert_eq!(data, &vec![1, 2, 3]);
223 }
224 _ => panic!("Expected PageWrite"),
225 }
226 match &records[2].as_ref().unwrap().1 {
227 WalRecord::Commit { tx_id } => assert_eq!(*tx_id, 1),
228 _ => panic!("Expected Commit"),
229 }
230 }
231
232 #[test]
233 fn test_lsn_tracking() {
234 let (_guard, path) = temp_wal("lsn");
235
236 let (lsn1, lsn2, lsn3);
238 {
239 let mut writer = WalWriter::open(&path).unwrap();
240 lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
241 lsn2 = writer.append(&WalRecord::Checkpoint { lsn: 100 }).unwrap();
242 lsn3 = writer.append(&WalRecord::Rollback { tx_id: 1 }).unwrap();
243 }
244
245 let reader = WalReader::open(&path).unwrap();
247 let records: Vec<_> = reader.iter().collect();
248
249 assert_eq!(records.len(), 3);
250 assert_eq!(records[0].as_ref().unwrap().0, lsn1);
251 assert_eq!(records[1].as_ref().unwrap().0, lsn2);
252 assert_eq!(records[2].as_ref().unwrap().0, lsn3);
253 }
254
255 #[test]
256 fn test_invalid_magic() {
257 let (_guard, path) = temp_wal("badmagic");
258
259 std::fs::write(&path, b"BAAD0000").unwrap();
261
262 let result = WalReader::open(&path);
263 assert!(result.is_err());
264 }
265
266 #[test]
267 fn test_invalid_version() {
268 let (_guard, path) = temp_wal("badver");
269
270 let mut header = reddb_file::encode_wal_file_header();
272 header[4] = 99; std::fs::write(&path, header).unwrap();
274
275 let result = WalReader::open(&path);
276 assert!(result.is_err());
277 }
278
279 #[test]
280 fn test_read_large_page_write() {
281 let (_guard, path) = temp_wal("large");
282
283 let large_data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
284
285 {
287 let mut writer = WalWriter::open(&path).unwrap();
288 writer
289 .append(&WalRecord::PageWrite {
290 tx_id: 1,
291 page_id: 0,
292 data: large_data.clone(),
293 })
294 .unwrap();
295 }
296
297 let reader = WalReader::open(&path).unwrap();
299 let records: Vec<_> = reader.iter().collect();
300
301 assert_eq!(records.len(), 1);
302 match &records[0].as_ref().unwrap().1 {
303 WalRecord::PageWrite { data, .. } => {
304 assert_eq!(*data, large_data);
305 }
306 _ => panic!("Expected PageWrite"),
307 }
308 }
309}