1use std::fs::File;
19use std::io::Read;
20use std::path::Path;
21
22use crate::error::{Result, WalError};
23use crate::preamble::{PREAMBLE_SIZE, SegmentPreamble, WAL_PREAMBLE_MAGIC};
24use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WalRecord};
25
26pub struct WalReader {
28 file: File,
29 offset: u64,
30 segment_preamble: Option<SegmentPreamble>,
33 double_write: Option<crate::double_write::DoubleWriteBuffer>,
35}
36
37impl WalReader {
38 pub fn open(path: &Path) -> Result<Self> {
47 let mut file = File::open(path)?;
48 let dwb_path = path.with_extension("dwb");
49 let double_write = if dwb_path.exists() {
50 crate::double_write::DoubleWriteBuffer::open(
51 &dwb_path,
52 crate::double_write::DwbMode::Buffered,
53 )
54 .ok()
55 } else {
56 None
57 };
58
59 let (segment_preamble, start_offset) = try_read_preamble(&mut file)?;
63
64 Ok(Self {
65 file,
66 offset: start_offset,
67 segment_preamble,
68 double_write,
69 })
70 }
71
72 pub fn segment_preamble(&self) -> Option<&SegmentPreamble> {
76 self.segment_preamble.as_ref()
77 }
78
79 pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
84 loop {
85 let mut header_buf = [0u8; HEADER_SIZE];
87 match self.read_exact(&mut header_buf) {
88 Ok(()) => {}
89 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
90 return Ok(None); }
92 Err(e) => return Err(e),
93 }
94
95 let header = RecordHeader::from_bytes(&header_buf);
96
97 if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
99 return Ok(None);
101 }
102
103 let mut payload = vec![0u8; header.payload_len as usize];
105 if !payload.is_empty() {
106 match self.read_exact(&mut payload) {
107 Ok(()) => {}
108 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
109 return Ok(None);
110 }
111 Err(e) => return Err(e),
112 }
113 }
114
115 let record = WalRecord { header, payload };
116
117 if record.verify_checksum().is_err() {
119 if let Some(dwb) = &mut self.double_write
120 && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
121 {
122 tracing::info!(
123 lsn = header.lsn,
124 "recovered torn write from double-write buffer"
125 );
126 self.offset += recovered.payload.len() as u64;
127 return Ok(Some(recovered));
128 }
129 return Ok(None);
130 }
131
132 let logical_type = record.logical_record_type();
134 if RecordType::from_raw(logical_type).is_none() {
135 if RecordType::is_required(logical_type) {
136 return Err(WalError::UnknownRequiredRecordType {
137 record_type: header.record_type,
138 lsn: header.lsn,
139 });
140 }
141 continue;
143 }
144
145 return Ok(Some(record));
146 }
147 }
148
149 pub fn records(self) -> WalRecordIter {
151 WalRecordIter { reader: self }
152 }
153
154 pub fn offset(&self) -> u64 {
156 self.offset
157 }
158
159 fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
160 self.file.read_exact(buf)?;
161 self.offset += buf.len() as u64;
162 Ok(())
163 }
164}
165
166fn try_read_preamble(file: &mut File) -> Result<(Option<SegmentPreamble>, u64)> {
172 use std::io::Seek;
173
174 let mut buf = [0u8; PREAMBLE_SIZE];
175 match file.read_exact(&mut buf) {
176 Ok(()) => {}
177 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
178 file.seek(std::io::SeekFrom::Start(0))?;
180 return Ok((None, 0));
181 }
182 Err(e) => return Err(WalError::Io(e)),
183 }
184
185 if buf[0..4] == WAL_PREAMBLE_MAGIC {
186 let preamble = SegmentPreamble::from_bytes(&buf, &WAL_PREAMBLE_MAGIC)?;
189 Ok((Some(preamble), PREAMBLE_SIZE as u64))
190 } else {
191 file.seek(std::io::SeekFrom::Start(0))?;
193 Ok((None, 0))
194 }
195}
196
197pub struct WalRecordIter {
199 reader: WalReader,
200}
201
202impl Iterator for WalRecordIter {
203 type Item = Result<WalRecord>;
204
205 fn next(&mut self) -> Option<Self::Item> {
206 match self.reader.next_record() {
207 Ok(Some(record)) => Some(Ok(record)),
208 Ok(None) => None,
209 Err(e) => Some(Err(e)),
210 }
211 }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    /// Records written by `WalWriter` come back in order with their header
    /// fields and payloads intact.
    #[test]
    fn write_then_read_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        // Write three records inside a scope so the writer is dropped before
        // the reader opens the segment.
        {
            let mut writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
            writer.append(RecordType::Put as u32, 1, 0, 0, b"first").unwrap();
            writer.append(RecordType::Put as u32, 2, 1, 0, b"second").unwrap();
            writer.append(RecordType::Delete as u32, 1, 0, 0, b"third").unwrap();
            writer.sync().unwrap();
        }

        let got = WalReader::open(&wal_path)
            .unwrap()
            .records()
            .collect::<Result<Vec<_>>>()
            .unwrap();
        assert_eq!(got.len(), 3);

        // The test expects the writer to assign LSNs 1, 2, 3 in append order.
        let (first, second, third) = (&got[0], &got[1], &got[2]);

        assert_eq!(first.header.lsn, 1);
        assert_eq!(first.header.tenant_id, 1);
        assert_eq!(first.payload, b"first");

        assert_eq!(second.header.lsn, 2);
        assert_eq!(second.header.tenant_id, 2);
        assert_eq!(second.header.vshard_id, 1);
        assert_eq!(second.payload, b"second");

        assert_eq!(third.header.lsn, 3);
        assert_eq!(third.header.record_type, RecordType::Delete as u32);
        assert_eq!(third.payload, b"third");
    }

    /// A segment that was opened and synced without any appends yields
    /// nothing.
    #[test]
    fn empty_wal_yields_no_records() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("empty.wal");

        {
            let mut writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
            writer.sync().unwrap();
        }

        let got = WalReader::open(&wal_path)
            .unwrap()
            .records()
            .collect::<Result<Vec<_>>>()
            .unwrap();
        assert!(got.is_empty());
    }

    /// Trailing bytes that do not form a valid record end iteration cleanly.
    /// Only the records before them are returned.
    #[test]
    fn truncated_file_stops_at_committed_prefix() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("truncated.wal");

        {
            let mut writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
            writer.append(RecordType::Put as u32, 1, 0, 0, b"good-record").unwrap();
            writer.sync().unwrap();
        }

        // Simulate a torn tail by appending raw garbage after the good record.
        {
            use std::io::Write;
            let mut tail = std::fs::OpenOptions::new()
                .append(true)
                .open(&wal_path)
                .unwrap();
            tail.write_all(b"GARBAGE_PARTIAL_RECORD").unwrap();
        }

        let got = WalReader::open(&wal_path)
            .unwrap()
            .records()
            .collect::<Result<Vec<_>>>()
            .unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].payload, b"good-record");
    }

    /// Skipping a long run of unknown optional records must not recurse.
    /// The reader should loop past them and return the one known record.
    #[test]
    fn skip_many_unknown_optional_records_is_iterative() {
        // Raw type 99 is assumed to be neither a known `RecordType` nor marked
        // required by `RecordType::is_required`, so the reader skips it.
        const UNKNOWN_OPTIONAL: u32 = 99;
        // Intended to be large enough that a recursive skip would risk
        // exhausting the stack.
        const SKIP_COUNT: usize = 50_000;

        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("many_unknown.wal");

        {
            let mut writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
            for _ in 0..SKIP_COUNT {
                writer.append(UNKNOWN_OPTIONAL, 1, 0, 0, b"skip-me").unwrap();
            }
            writer.append(RecordType::Put as u32, 1, 0, 0, b"keep-me").unwrap();
            writer.sync().unwrap();
        }

        let got = WalReader::open(&wal_path)
            .unwrap()
            .records()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert_eq!(got.len(), 1);
        assert_eq!(got[0].payload, b"keep-me");
    }
}