reddb_server/storage/wal/
reader.rs1use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
2use std::fs::File;
3use std::io::{self, BufReader, Read};
4use std::path::Path;
5
/// Handle on an opened write-ahead log file.
///
/// Built by `WalReader::open`, which reads the file header through `reader`
/// before returning, and turned into a record iterator by `WalReader::iter`.
///
/// `Debug` is derived so that values such as `io::Result<WalReader>` can be
/// used with `unwrap_err()` and `{:?}` formatting.
#[derive(Debug)]
pub struct WalReader {
    /// Buffered handle on the underlying WAL file.
    reader: BufReader<File>,
    /// Byte offset in the file at which the next read will start.
    position: u64,
}
11
12impl WalReader {
13 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
15 let file = File::open(path)?;
16 let mut reader = BufReader::new(file);
17
18 let mut header = [0u8; 8];
20 reader.read_exact(&mut header)?;
21
22 if &header[0..4] != WAL_MAGIC {
23 return Err(io::Error::new(
24 io::ErrorKind::InvalidData,
25 "Invalid WAL magic bytes",
26 ));
27 }
28
29 if header[4] != WAL_VERSION {
30 return Err(io::Error::new(
31 io::ErrorKind::InvalidData,
32 format!("Unsupported WAL version: {}", header[4]),
33 ));
34 }
35
36 Ok(Self {
37 reader,
38 position: 8,
39 })
40 }
41
42 pub fn iter(self) -> WalIterator {
45 WalIterator {
46 reader: self.reader,
47 position: self.position,
48 }
49 }
50}
51
/// Iterator over the records of a WAL file, produced by `WalReader::iter`.
///
/// Each successfully decoded record is yielded together with the byte offset
/// at which it started.
///
/// `Debug` is derived so the iterator can be inspected and embedded in other
/// `Debug` types.
#[derive(Debug)]
pub struct WalIterator {
    /// Buffered handle on the underlying WAL file.
    reader: BufReader<File>,
    /// Byte offset in the file at which the next record read will start.
    position: u64,
}
56
57impl Iterator for WalIterator {
58 type Item = io::Result<(u64, WalRecord)>;
59
60 fn next(&mut self) -> Option<Self::Item> {
61 let start_pos = self.position;
72
73 struct CountingReader<'a, R> {
76 inner: &'a mut R,
77 count: u64,
78 }
79
80 impl<'a, R: Read> Read for CountingReader<'a, R> {
81 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
82 let n = self.inner.read(buf)?;
83 self.count += n as u64;
84 Ok(n)
85 }
86 }
87
88 let mut counter = CountingReader {
89 inner: &mut self.reader,
90 count: 0,
91 };
92
93 match WalRecord::read(&mut counter) {
94 Ok(Some(record)) => {
95 self.position += counter.count;
96 Some(Ok((start_pos, record)))
97 }
98 Ok(None) => None, Err(e) => Some(Err(e)),
100 }
101 }
102}
103
#[cfg(test)]
mod tests {
    use super::super::writer::WalWriter;
    use super::*;
    use std::path::PathBuf;

    /// Removes the file at `path` when dropped, so a test cleans up after
    /// itself even when an assertion panics.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Returns a per-test, per-process path in the system temp directory
    /// together with a guard that deletes the file on drop. Any leftover file
    /// at that path is removed first.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let path =
            std::env::temp_dir().join(format!("rb_wal_reader_{}_{}.wal", name, std::process::id()));
        let guard = FileGuard { path: path.clone() };
        let _ = std::fs::remove_file(&path);
        (guard, path)
    }

    #[test]
    fn test_read_empty_wal() {
        let (_guard, path) = temp_wal("empty");

        {
            // Opening the writer and dropping it without appending anything.
            let _writer = WalWriter::open(&path).unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();
        assert!(records.is_empty());
    }

    #[test]
    fn test_read_single_record() {
        let (_guard, path) = temp_wal("single");

        {
            // The writer is scoped so it is dropped before the file is read.
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 42 }).unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 1);
        let (lsn, record) = records[0].as_ref().unwrap();
        // The first record starts right after the 8-byte header.
        assert_eq!(*lsn, 8);
        assert_eq!(*record, WalRecord::Begin { tx_id: 42 });
    }

    #[test]
    fn test_read_multiple_records() {
        let (_guard, path) = temp_wal("multi");

        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: 10,
                    data: vec![1, 2, 3],
                })
                .unwrap();
            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 3);

        // Records must come back in the order they were appended.
        match &records[0].as_ref().unwrap().1 {
            WalRecord::Begin { tx_id } => assert_eq!(*tx_id, 1),
            _ => panic!("Expected Begin"),
        }
        match &records[1].as_ref().unwrap().1 {
            WalRecord::PageWrite {
                tx_id,
                page_id,
                data,
            } => {
                assert_eq!(*tx_id, 1);
                assert_eq!(*page_id, 10);
                assert_eq!(data, &vec![1, 2, 3]);
            }
            _ => panic!("Expected PageWrite"),
        }
        match &records[2].as_ref().unwrap().1 {
            WalRecord::Commit { tx_id } => assert_eq!(*tx_id, 1),
            _ => panic!("Expected Commit"),
        }
    }

    #[test]
    fn test_lsn_tracking() {
        let (_guard, path) = temp_wal("lsn");

        // LSNs handed out by the writer, captured for comparison with the reader.
        let (lsn1, lsn2, lsn3);
        {
            let mut writer = WalWriter::open(&path).unwrap();
            lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            lsn2 = writer.append(&WalRecord::Checkpoint { lsn: 100 }).unwrap();
            lsn3 = writer.append(&WalRecord::Rollback { tx_id: 1 }).unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].as_ref().unwrap().0, lsn1);
        assert_eq!(records[1].as_ref().unwrap().0, lsn2);
        assert_eq!(records[2].as_ref().unwrap().0, lsn3);
    }

    #[test]
    fn test_invalid_magic() {
        let (_guard, path) = temp_wal("badmagic");

        // A full-length header whose first four bytes are not the WAL magic.
        std::fs::write(&path, b"BAAD0000").unwrap();

        // Pin the error kind so the test cannot pass on an unrelated I/O failure.
        let err = WalReader::open(&path)
            .err()
            .expect("opening a WAL with bad magic bytes must fail");
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_invalid_version() {
        let (_guard, path) = temp_wal("badver");

        // Valid magic, followed by a version byte of 99 and three padding bytes.
        let mut header = Vec::new();
        header.extend_from_slice(WAL_MAGIC);
        header.push(99);
        header.extend_from_slice(&[0u8; 3]);
        std::fs::write(&path, &header).unwrap();

        // Pin the error kind so the test cannot pass on an unrelated I/O failure.
        let err = WalReader::open(&path)
            .err()
            .expect("opening a WAL with an unsupported version must fail");
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_read_large_page_write() {
        let (_guard, path) = temp_wal("large");

        // 4096 bytes cycling through 0..=255.
        let large_data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: 0,
                    data: large_data.clone(),
                })
                .unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.iter().collect();

        assert_eq!(records.len(), 1);
        match &records[0].as_ref().unwrap().1 {
            WalRecord::PageWrite { data, .. } => {
                assert_eq!(*data, large_data);
            }
            _ => panic!("Expected PageWrite"),
        }
    }
}