1use std::fs::File;
17use std::io::Read;
18use std::path::Path;
19
20use crate::error::{Result, WalError};
21use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WalRecord};
22
/// Sequential reader over a write-ahead-log file.
///
/// Records are pulled one at a time with [`WalReader::next_record`] or through the
/// iterator returned by [`WalReader::records`].
pub struct WalReader {
    /// Open handle to the WAL file being read.
    file: File,
    /// Running byte offset into the WAL; starts at 0 and advances as headers and payloads
    /// are read.
    offset: u64,
    /// Double-write buffer loaded from the sibling `.dwb` file, if one existed and could be
    /// opened. Consulted when a record fails checksum verification.
    double_write: Option<crate::double_write::DoubleWriteBuffer>,
}
30
31impl WalReader {
32 pub fn open(path: &Path) -> Result<Self> {
37 let file = File::open(path)?;
38 let dwb_path = path.with_extension("dwb");
39 let double_write = if dwb_path.exists() {
40 crate::double_write::DoubleWriteBuffer::open(&dwb_path).ok()
41 } else {
42 None
43 };
44 Ok(Self {
45 file,
46 offset: 0,
47 double_write,
48 })
49 }
50
51 pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
56 let mut header_buf = [0u8; HEADER_SIZE];
58 match self.read_exact(&mut header_buf) {
59 Ok(()) => {}
60 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
61 return Ok(None); }
63 Err(e) => return Err(e),
64 }
65
66 let header = RecordHeader::from_bytes(&header_buf);
67
68 if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
70 return Ok(None);
72 }
73
74 let mut payload = vec![0u8; header.payload_len as usize];
76 if !payload.is_empty() {
77 match self.read_exact(&mut payload) {
78 Ok(()) => {}
79 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
80 return Ok(None);
82 }
83 Err(e) => return Err(e),
84 }
85 }
86
87 let record = WalRecord { header, payload };
88
89 if record.verify_checksum().is_err() {
91 if let Some(dwb) = &mut self.double_write
94 && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
95 {
96 tracing::info!(
97 lsn = header.lsn,
98 "recovered torn write from double-write buffer"
99 );
100 self.offset += recovered.payload.len() as u64;
101 return Ok(Some(recovered));
102 }
103 return Ok(None);
105 }
106
107 let logical_type = record.logical_record_type();
109 if RecordType::from_raw(logical_type).is_none() {
110 if RecordType::is_required(logical_type) {
111 return Err(WalError::UnknownRequiredRecordType {
112 record_type: header.record_type,
113 lsn: header.lsn,
114 });
115 }
116 return self.next_record();
119 }
120
121 Ok(Some(record))
122 }
123
124 pub fn records(self) -> WalRecordIter {
126 WalRecordIter { reader: self }
127 }
128
129 pub fn offset(&self) -> u64 {
131 self.offset
132 }
133
134 fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
135 self.file.read_exact(buf)?;
136 self.offset += buf.len() as u64;
137 Ok(())
138 }
139}
140
/// Owning iterator over the records of a [`WalReader`], created by [`WalReader::records`].
pub struct WalRecordIter {
    /// The reader that each call to `next` pulls a record from.
    reader: WalReader,
}
145
146impl Iterator for WalRecordIter {
147 type Item = Result<WalRecord>;
148
149 fn next(&mut self) -> Option<Self::Item> {
150 match self.reader.next_record() {
151 Ok(Some(record)) => Some(Ok(record)),
152 Ok(None) => None,
153 Err(e) => Some(Err(e)),
154 }
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    /// Opens the WAL at `path` and collects every record, panicking on any error.
    fn read_all(path: &Path) -> Vec<WalRecord> {
        let reader = WalReader::open(path).unwrap();
        reader.records().collect::<Result<Vec<_>>>().unwrap()
    }

    /// Three appended records are read back in order with their header fields and payloads.
    #[test]
    fn write_then_read_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        let put = RecordType::Put as u16;
        let delete = RecordType::Delete as u16;

        let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
        writer.append(put, 1, 0, b"first").unwrap();
        writer.append(put, 2, 1, b"second").unwrap();
        writer.append(delete, 1, 0, b"third").unwrap();
        writer.sync().unwrap();
        // Release the writer before the file is opened for reading.
        drop(writer);

        let records = read_all(&path);
        assert_eq!(records.len(), 3);
        let (first, second, third) = (&records[0], &records[1], &records[2]);

        assert_eq!(first.header.lsn, 1);
        assert_eq!(first.header.tenant_id, 1);
        assert_eq!(first.payload, b"first");

        assert_eq!(second.header.lsn, 2);
        assert_eq!(second.header.tenant_id, 2);
        assert_eq!(second.header.vshard_id, 1);
        assert_eq!(second.payload, b"second");

        assert_eq!(third.header.lsn, 3);
        assert_eq!(third.header.record_type, RecordType::Delete as u16);
        assert_eq!(third.payload, b"third");
    }

    /// A WAL that was opened and synced without any append yields nothing.
    #[test]
    fn empty_wal_yields_no_records() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");

        let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
        writer.sync().unwrap();
        drop(writer);

        assert!(read_all(&path).is_empty());
    }

    /// Garbage appended after a good record ends iteration without an error; only the
    /// good record is returned.
    #[test]
    fn truncated_file_stops_at_committed_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");

        let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
        writer
            .append(RecordType::Put as u16, 1, 0, b"good-record")
            .unwrap();
        writer.sync().unwrap();
        drop(writer);

        // Simulate a partially written tail by appending bytes that are not a valid record.
        let mut tail = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        std::io::Write::write_all(&mut tail, b"GARBAGE_PARTIAL_RECORD").unwrap();
        drop(tail);

        let records = read_all(&path);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"good-record");
    }
}