1use std::fs::File;
17use std::io::Read;
18use std::path::Path;
19
20use crate::error::{Result, WalError};
21use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WalRecord};
22
23pub struct WalReader {
25 file: File,
26 offset: u64,
27 double_write: Option<crate::double_write::DoubleWriteBuffer>,
29}
30
31impl WalReader {
32 pub fn open(path: &Path) -> Result<Self> {
37 let file = File::open(path)?;
38 let dwb_path = path.with_extension("dwb");
39 let double_write = if dwb_path.exists() {
40 crate::double_write::DoubleWriteBuffer::open(&dwb_path).ok()
41 } else {
42 None
43 };
44 Ok(Self {
45 file,
46 offset: 0,
47 double_write,
48 })
49 }
50
51 pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
56 loop {
57 let mut header_buf = [0u8; HEADER_SIZE];
59 match self.read_exact(&mut header_buf) {
60 Ok(()) => {}
61 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
62 return Ok(None); }
64 Err(e) => return Err(e),
65 }
66
67 let header = RecordHeader::from_bytes(&header_buf);
68
69 if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
71 return Ok(None);
73 }
74
75 let mut payload = vec![0u8; header.payload_len as usize];
77 if !payload.is_empty() {
78 match self.read_exact(&mut payload) {
79 Ok(()) => {}
80 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
81 return Ok(None);
82 }
83 Err(e) => return Err(e),
84 }
85 }
86
87 let record = WalRecord { header, payload };
88
89 if record.verify_checksum().is_err() {
91 if let Some(dwb) = &mut self.double_write
92 && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
93 {
94 tracing::info!(
95 lsn = header.lsn,
96 "recovered torn write from double-write buffer"
97 );
98 self.offset += recovered.payload.len() as u64;
99 return Ok(Some(recovered));
100 }
101 return Ok(None);
102 }
103
104 let logical_type = record.logical_record_type();
106 if RecordType::from_raw(logical_type).is_none() {
107 if RecordType::is_required(logical_type) {
108 return Err(WalError::UnknownRequiredRecordType {
109 record_type: header.record_type,
110 lsn: header.lsn,
111 });
112 }
113 continue;
115 }
116
117 return Ok(Some(record));
118 }
119 }
120
121 pub fn records(self) -> WalRecordIter {
123 WalRecordIter { reader: self }
124 }
125
126 pub fn offset(&self) -> u64 {
128 self.offset
129 }
130
131 fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
132 self.file.read_exact(buf)?;
133 self.offset += buf.len() as u64;
134 Ok(())
135 }
136}
137
138pub struct WalRecordIter {
140 reader: WalReader,
141}
142
143impl Iterator for WalRecordIter {
144 type Item = Result<WalRecord>;
145
146 fn next(&mut self) -> Option<Self::Item> {
147 match self.reader.next_record() {
148 Ok(Some(record)) => Some(Ok(record)),
149 Ok(None) => None,
150 Err(e) => Some(Err(e)),
151 }
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    /// Opens a writer on `path`, lets `fill` append records, then syncs and
    /// drops the writer so the file is complete before it is read back.
    fn with_writer(path: &Path, fill: impl FnOnce(&mut WalWriter)) {
        let mut writer = WalWriter::open_without_direct_io(path).unwrap();
        fill(&mut writer);
        writer.sync().unwrap();
    }

    /// Reads every record from the WAL at `path`, panicking on any error.
    fn read_all(path: &Path) -> Vec<WalRecord> {
        let reader = WalReader::open(path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();
        records
    }

    /// Three appended records come back in order with their header fields
    /// and payloads intact.
    #[test]
    fn write_then_read_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        with_writer(&path, |w| {
            w.append(RecordType::Put as u16, 1, 0, b"first").unwrap();
            w.append(RecordType::Put as u16, 2, 1, b"second").unwrap();
            w.append(RecordType::Delete as u16, 1, 0, b"third").unwrap();
        });

        let records = read_all(&path);
        assert_eq!(records.len(), 3);

        let (first, second, third) = (&records[0], &records[1], &records[2]);

        assert_eq!(first.header.lsn, 1);
        assert_eq!(first.header.tenant_id, 1);
        assert_eq!(first.payload, b"first");

        assert_eq!(second.header.lsn, 2);
        assert_eq!(second.header.tenant_id, 2);
        assert_eq!(second.header.vshard_id, 1);
        assert_eq!(second.payload, b"second");

        assert_eq!(third.header.lsn, 3);
        assert_eq!(third.header.record_type, RecordType::Delete as u16);
        assert_eq!(third.payload, b"third");
    }

    /// A WAL that was opened and synced without any append yields nothing.
    #[test]
    fn empty_wal_yields_no_records() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");

        with_writer(&path, |_| {});

        assert!(read_all(&path).is_empty());
    }

    /// Trailing garbage after a valid record is ignored; only the valid
    /// prefix is returned.
    #[test]
    fn truncated_file_stops_at_committed_prefix() {
        use std::io::Write;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");

        with_writer(&path, |w| {
            w.append(RecordType::Put as u16, 1, 0, b"good-record")
                .unwrap();
        });

        // Simulate a partial write by appending raw bytes after the record.
        let mut raw = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        raw.write_all(b"GARBAGE_PARTIAL_RECORD").unwrap();
        drop(raw);

        let records = read_all(&path);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"good-record");
    }

    /// A long run of unknown optional records is skipped and the single
    /// known record after them is still returned.
    #[test]
    fn skip_many_unknown_optional_records_is_iterative() {
        const UNKNOWN_OPTIONAL: u16 = 99;
        const SKIP_COUNT: usize = 50_000;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("many_unknown.wal");

        with_writer(&path, |w| {
            (0..SKIP_COUNT).for_each(|_| {
                w.append(UNKNOWN_OPTIONAL, 1, 0, b"skip-me").unwrap();
            });
            w.append(RecordType::Put as u16, 1, 0, b"keep-me").unwrap();
        });

        let records = read_all(&path);

        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"keep-me");
    }
}