nodedb_wal/
lazy_reader.rs1use std::fs::File;
29use std::io::{Read, Seek, SeekFrom};
30use std::path::Path;
31
32use crate::error::{Result, WalError};
33use crate::record::{HEADER_SIZE, RecordHeader, WalRecord};
34
35pub struct LazyWalReader {
37 file: File,
38 offset: u64,
39 double_write: Option<crate::double_write::DoubleWriteBuffer>,
40}
41
42impl LazyWalReader {
43 pub fn open(path: &Path) -> Result<Self> {
45 let file = File::open(path)?;
46 let dwb_path = path.with_extension("dwb");
47 let double_write = if dwb_path.exists() {
48 crate::double_write::DoubleWriteBuffer::open(
49 &dwb_path,
50 crate::double_write::DwbMode::Buffered,
51 )
52 .ok()
53 } else {
54 None
55 };
56 Ok(Self {
57 file,
58 offset: 0,
59 double_write,
60 })
61 }
62
63 pub fn next_header(&mut self) -> Result<Option<RecordHeader>> {
69 let mut header_buf = [0u8; HEADER_SIZE];
70 match self.read_exact(&mut header_buf) {
71 Ok(()) => {}
72 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
73 return Ok(None);
74 }
75 Err(e) => return Err(e),
76 }
77
78 let header = RecordHeader::from_bytes(&header_buf);
79
80 if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
81 return Ok(None);
82 }
83
84 let logical_type = header.logical_record_type();
86 if crate::record::RecordType::from_raw(logical_type).is_none()
87 && crate::record::RecordType::is_required(logical_type)
88 {
89 return Err(WalError::UnknownRequiredRecordType {
90 record_type: header.record_type,
91 lsn: header.lsn,
92 });
93 }
94
95 Ok(Some(header))
96 }
97
98 pub fn read_payload(&mut self, header: &RecordHeader) -> Result<Vec<u8>> {
104 let mut payload = vec![0u8; header.payload_len as usize];
105 if !payload.is_empty() {
106 match self.read_exact(&mut payload) {
107 Ok(()) => {}
108 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
109 return Err(WalError::Io(std::io::Error::new(
110 std::io::ErrorKind::UnexpectedEof,
111 "torn write: incomplete payload",
112 )));
113 }
114 Err(e) => return Err(e),
115 }
116 }
117
118 let record = WalRecord {
120 header: *header,
121 payload: payload.clone(),
122 };
123 if record.verify_checksum().is_err() {
124 if let Some(dwb) = &mut self.double_write
126 && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
127 {
128 return Ok(recovered.payload);
129 }
130 return Err(WalError::Io(std::io::Error::new(
131 std::io::ErrorKind::InvalidData,
132 "checksum mismatch",
133 )));
134 }
135
136 Ok(payload)
137 }
138
139 pub fn read_record(&mut self, header: &RecordHeader) -> Result<WalRecord> {
141 let payload = self.read_payload(header)?;
142 Ok(WalRecord {
143 header: *header,
144 payload,
145 })
146 }
147
148 pub fn skip_payload(&mut self, header: &RecordHeader) -> Result<()> {
152 let len = header.payload_len as u64;
153 if len > 0 {
154 self.file
155 .seek(SeekFrom::Current(len as i64))
156 .map_err(WalError::Io)?;
157 self.offset += len;
158 }
159 Ok(())
160 }
161
162 pub fn offset(&self) -> u64 {
164 self.offset
165 }
166
167 fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
168 self.file.read_exact(buf)?;
169 self.offset += buf.len() as u64;
170 Ok(())
171 }
172}
173
174pub fn replay_segment_lazy<F>(path: &Path, mut handler: F) -> Result<()>
179where
180 F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
181{
182 let mut reader = LazyWalReader::open(path)?;
183 while let Some(header) = reader.next_header()? {
184 handler(&mut reader, &header)?;
185 }
186 Ok(())
187}
188
189pub fn replay_all_segments_lazy<F>(wal_dir: &Path, mut handler: F) -> Result<()>
194where
195 F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
196{
197 let segments = crate::segment::discover_segments(wal_dir)?;
198 for seg in &segments {
199 replay_segment_lazy(&seg.path, &mut handler)?;
200 }
201 Ok(())
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    /// Every payload is returned intact and in write order.
    #[test]
    fn lazy_read_all_payloads() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        {
            let mut w = WalWriter::open_without_direct_io(&path).unwrap();
            w.append(RecordType::Put as u32, 1, 0, 0, b"hello").unwrap();
            w.append(RecordType::VectorPut as u32, 1, 0, 0, b"vector-data")
                .unwrap();
            w.append(RecordType::Put as u32, 2, 1, 0, b"world").unwrap();
            w.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&path).unwrap();
        let mut records = Vec::new();
        while let Some(header) = reader.next_header().unwrap() {
            let payload = reader.read_payload(&header).unwrap();
            records.push((header, payload));
        }
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].1, b"hello");
        assert_eq!(records[1].1, b"vector-data");
        assert_eq!(records[2].1, b"world");
    }

    /// Large non-matching payloads can be skipped while matching ones are read.
    #[test]
    fn lazy_skip_non_matching() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        {
            let mut w = WalWriter::open_without_direct_io(&path).unwrap();
            w.append(RecordType::TimeseriesBatch as u32, 1, 0, 0, &[0u8; 10000])
                .unwrap();
            w.append(RecordType::TimeseriesBatch as u32, 1, 0, 0, &[0u8; 10000])
                .unwrap();
            w.append(RecordType::VectorPut as u32, 1, 0, 0, b"small-vec")
                .unwrap();
            w.append(RecordType::TimeseriesBatch as u32, 1, 0, 0, &[0u8; 10000])
                .unwrap();
            w.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&path).unwrap();
        let mut vector_payloads = Vec::new();
        let mut skipped = 0;

        while let Some(header) = reader.next_header().unwrap() {
            let rt = RecordType::from_raw(header.record_type);
            if rt == Some(RecordType::VectorPut) {
                let payload = reader.read_payload(&header).unwrap();
                vector_payloads.push(payload);
            } else {
                reader.skip_payload(&header).unwrap();
                skipped += 1;
            }
        }

        assert_eq!(vector_payloads.len(), 1);
        assert_eq!(vector_payloads[0], b"small-vec");
        assert_eq!(skipped, 3);
    }

    /// Directory-level replay discovers the segment and visits every record.
    #[test]
    fn replay_all_segments_lazy_works() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-00000000000000000001.seg");

        {
            let mut w = WalWriter::open_without_direct_io(&path).unwrap();
            w.append(RecordType::Put as u32, 1, 0, 0, b"a").unwrap();
            w.append(RecordType::Put as u32, 1, 0, 0, b"b").unwrap();
            w.sync().unwrap();
        }

        let mut count = 0;
        replay_all_segments_lazy(dir.path(), |reader, header| {
            reader.skip_payload(header)?;
            count += 1;
            Ok(())
        })
        .unwrap();
        assert_eq!(count, 2);
    }

    /// A WAL with no appended records yields no headers.
    #[test]
    fn empty_wal_no_records() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");

        {
            let mut w = WalWriter::open_without_direct_io(&path).unwrap();
            w.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&path).unwrap();
        assert!(reader.next_header().unwrap().is_none());
    }

    /// `read_record` pairs the header with its verified payload.
    #[test]
    fn read_record_returns_header_and_payload() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        {
            let mut w = WalWriter::open_without_direct_io(&path).unwrap();
            w.append(RecordType::Put as u32, 1, 0, 0, b"hello").unwrap();
            w.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&path).unwrap();
        let header = reader.next_header().unwrap().unwrap();
        let record = reader.read_record(&header).unwrap();
        assert!(RecordType::from_raw(record.header.record_type) == Some(RecordType::Put));
        assert_eq!(record.payload, b"hello");
        assert!(reader.next_header().unwrap().is_none());
    }

    /// `offset()` advances by exactly the header and payload bytes consumed,
    /// whether the payload is read or skipped.
    #[test]
    fn offset_tracks_consumed_bytes() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        {
            let mut w = WalWriter::open_without_direct_io(&path).unwrap();
            w.append(RecordType::Put as u32, 1, 0, 0, b"hello").unwrap();
            w.append(RecordType::Put as u32, 1, 0, 0, b"world!").unwrap();
            w.sync().unwrap();
        }

        let header_len = HEADER_SIZE as u64;
        let mut reader = LazyWalReader::open(&path).unwrap();
        assert_eq!(reader.offset(), 0);

        let first = reader.next_header().unwrap().unwrap();
        assert_eq!(reader.offset(), header_len);
        reader.skip_payload(&first).unwrap();
        assert_eq!(reader.offset(), header_len + 5);

        let second = reader.next_header().unwrap().unwrap();
        assert_eq!(reader.offset(), 2 * header_len + 5);
        assert_eq!(reader.read_payload(&second).unwrap(), b"world!");
        assert_eq!(reader.offset(), 2 * header_len + 11);
    }
}