1use std::fs::File;
17use std::io::Read;
18use std::path::Path;
19
20use crate::error::{Result, WalError};
21use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WalRecord};
22
23pub struct WalReader {
25 file: File,
26 offset: u64,
27 double_write: Option<crate::double_write::DoubleWriteBuffer>,
29}
30
31impl WalReader {
32 pub fn open(path: &Path) -> Result<Self> {
37 let file = File::open(path)?;
38 let dwb_path = path.with_extension("dwb");
39 let double_write = if dwb_path.exists() {
40 crate::double_write::DoubleWriteBuffer::open(
41 &dwb_path,
42 crate::double_write::DwbMode::Buffered,
43 )
44 .ok()
45 } else {
46 None
47 };
48 Ok(Self {
49 file,
50 offset: 0,
51 double_write,
52 })
53 }
54
55 pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
60 loop {
61 let mut header_buf = [0u8; HEADER_SIZE];
63 match self.read_exact(&mut header_buf) {
64 Ok(()) => {}
65 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
66 return Ok(None); }
68 Err(e) => return Err(e),
69 }
70
71 let header = RecordHeader::from_bytes(&header_buf);
72
73 if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
75 return Ok(None);
77 }
78
79 let mut payload = vec![0u8; header.payload_len as usize];
81 if !payload.is_empty() {
82 match self.read_exact(&mut payload) {
83 Ok(()) => {}
84 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
85 return Ok(None);
86 }
87 Err(e) => return Err(e),
88 }
89 }
90
91 let record = WalRecord { header, payload };
92
93 if record.verify_checksum().is_err() {
95 if let Some(dwb) = &mut self.double_write
96 && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
97 {
98 tracing::info!(
99 lsn = header.lsn,
100 "recovered torn write from double-write buffer"
101 );
102 self.offset += recovered.payload.len() as u64;
103 return Ok(Some(recovered));
104 }
105 return Ok(None);
106 }
107
108 let logical_type = record.logical_record_type();
110 if RecordType::from_raw(logical_type).is_none() {
111 if RecordType::is_required(logical_type) {
112 return Err(WalError::UnknownRequiredRecordType {
113 record_type: header.record_type,
114 lsn: header.lsn,
115 });
116 }
117 continue;
119 }
120
121 return Ok(Some(record));
122 }
123 }
124
125 pub fn records(self) -> WalRecordIter {
127 WalRecordIter { reader: self }
128 }
129
130 pub fn offset(&self) -> u64 {
132 self.offset
133 }
134
135 fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
136 self.file.read_exact(buf)?;
137 self.offset += buf.len() as u64;
138 Ok(())
139 }
140}
141
142pub struct WalRecordIter {
144 reader: WalReader,
145}
146
147impl Iterator for WalRecordIter {
148 type Item = Result<WalRecord>;
149
150 fn next(&mut self) -> Option<Self::Item> {
151 match self.reader.next_record() {
152 Ok(Some(record)) => Some(Ok(record)),
153 Ok(None) => None,
154 Err(e) => Some(Err(e)),
155 }
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    /// Opens a fresh reader on `wal_path` and drains every record, panicking
    /// if opening or reading fails.
    fn read_all(wal_path: &std::path::Path) -> Vec<WalRecord> {
        WalReader::open(wal_path)
            .unwrap()
            .records()
            .collect::<Result<Vec<_>>>()
            .unwrap()
    }

    /// Three appended records come back in order with their header fields and
    /// payloads intact.
    #[test]
    fn write_then_read_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        let mut wal = WalWriter::open_without_direct_io(&wal_path).unwrap();
        wal.append(RecordType::Put as u16, 1, 0, b"first").unwrap();
        wal.append(RecordType::Put as u16, 2, 1, b"second").unwrap();
        wal.append(RecordType::Delete as u16, 1, 0, b"third")
            .unwrap();
        wal.sync().unwrap();
        // Release the writer before the file is read back.
        drop(wal);

        let got = read_all(&wal_path);
        assert_eq!(got.len(), 3);

        let first = &got[0];
        assert_eq!(first.header.lsn, 1);
        assert_eq!(first.header.tenant_id, 1);
        assert_eq!(first.payload, b"first");

        let second = &got[1];
        assert_eq!(second.header.lsn, 2);
        assert_eq!(second.header.tenant_id, 2);
        assert_eq!(second.header.vshard_id, 1);
        assert_eq!(second.payload, b"second");

        let third = &got[2];
        assert_eq!(third.header.lsn, 3);
        assert_eq!(third.header.record_type, RecordType::Delete as u16);
        assert_eq!(third.payload, b"third");
    }

    /// A WAL that was opened and synced without any append yields nothing.
    #[test]
    fn empty_wal_yields_no_records() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("empty.wal");

        let mut wal = WalWriter::open_without_direct_io(&wal_path).unwrap();
        wal.sync().unwrap();
        drop(wal);

        assert!(read_all(&wal_path).is_empty());
    }

    /// Trailing bytes that do not form a valid record end the read without an
    /// error; the record written before them is still returned.
    #[test]
    fn truncated_file_stops_at_committed_prefix() {
        use std::io::Write;

        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("truncated.wal");

        let mut wal = WalWriter::open_without_direct_io(&wal_path).unwrap();
        wal.append(RecordType::Put as u16, 1, 0, b"good-record")
            .unwrap();
        wal.sync().unwrap();
        drop(wal);

        // Append raw bytes directly to the file, bypassing the writer.
        let mut raw = std::fs::OpenOptions::new()
            .append(true)
            .open(&wal_path)
            .unwrap();
        raw.write_all(b"GARBAGE_PARTIAL_RECORD").unwrap();
        drop(raw);

        let got = read_all(&wal_path);
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].payload, b"good-record");
    }

    /// A long run of records with an unrecognised type is skipped, and the one
    /// known record written after them is the only one returned.
    #[test]
    fn skip_many_unknown_optional_records_is_iterative() {
        // The test expects this type to be skipped rather than rejected, so it
        // is presumably unknown to `RecordType` and not marked as required.
        const UNKNOWN_OPTIONAL: u16 = 99;
        const SKIP_COUNT: usize = 50_000;

        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("many_unknown.wal");

        let mut wal = WalWriter::open_without_direct_io(&wal_path).unwrap();
        (0..SKIP_COUNT).for_each(|_| {
            wal.append(UNKNOWN_OPTIONAL, 1, 0, b"skip-me").unwrap();
        });
        wal.append(RecordType::Put as u16, 1, 0, b"keep-me")
            .unwrap();
        wal.sync().unwrap();
        drop(wal);

        let got = read_all(&wal_path);
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].payload, b"keep-me");
    }
}