nodedb_wal/
lazy_reader.rs1use std::fs::File;
27use std::io::{Read, Seek, SeekFrom};
28use std::path::Path;
29
30use crate::error::{Result, WalError};
31use crate::record::{HEADER_SIZE, RecordHeader, WalRecord};
32
33pub struct LazyWalReader {
35 file: File,
36 offset: u64,
37 double_write: Option<crate::double_write::DoubleWriteBuffer>,
38}
39
40impl LazyWalReader {
41 pub fn open(path: &Path) -> Result<Self> {
43 let file = File::open(path)?;
44 let dwb_path = path.with_extension("dwb");
45 let double_write = if dwb_path.exists() {
46 crate::double_write::DoubleWriteBuffer::open(&dwb_path).ok()
47 } else {
48 None
49 };
50 Ok(Self {
51 file,
52 offset: 0,
53 double_write,
54 })
55 }
56
57 pub fn next_header(&mut self) -> Result<Option<RecordHeader>> {
63 let mut header_buf = [0u8; HEADER_SIZE];
64 match self.read_exact(&mut header_buf) {
65 Ok(()) => {}
66 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
67 return Ok(None);
68 }
69 Err(e) => return Err(e),
70 }
71
72 let header = RecordHeader::from_bytes(&header_buf);
73
74 if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
75 return Ok(None);
76 }
77
78 let logical_type = header.logical_record_type();
80 if crate::record::RecordType::from_raw(logical_type).is_none()
81 && crate::record::RecordType::is_required(logical_type)
82 {
83 return Err(WalError::UnknownRequiredRecordType {
84 record_type: header.record_type,
85 lsn: header.lsn,
86 });
87 }
88
89 Ok(Some(header))
90 }
91
92 pub fn read_payload(&mut self, header: &RecordHeader) -> Result<Vec<u8>> {
98 let mut payload = vec![0u8; header.payload_len as usize];
99 if !payload.is_empty() {
100 match self.read_exact(&mut payload) {
101 Ok(()) => {}
102 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
103 return Err(WalError::Io(std::io::Error::new(
104 std::io::ErrorKind::UnexpectedEof,
105 "torn write: incomplete payload",
106 )));
107 }
108 Err(e) => return Err(e),
109 }
110 }
111
112 let record = WalRecord {
114 header: *header,
115 payload: payload.clone(),
116 };
117 if record.verify_checksum().is_err() {
118 if let Some(dwb) = &mut self.double_write
120 && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
121 {
122 return Ok(recovered.payload);
123 }
124 return Err(WalError::Io(std::io::Error::new(
125 std::io::ErrorKind::InvalidData,
126 "checksum mismatch",
127 )));
128 }
129
130 Ok(payload)
131 }
132
133 pub fn read_record(&mut self, header: &RecordHeader) -> Result<WalRecord> {
135 let payload = self.read_payload(header)?;
136 Ok(WalRecord {
137 header: *header,
138 payload,
139 })
140 }
141
142 pub fn skip_payload(&mut self, header: &RecordHeader) -> Result<()> {
146 let len = header.payload_len as u64;
147 if len > 0 {
148 self.file
149 .seek(SeekFrom::Current(len as i64))
150 .map_err(WalError::Io)?;
151 self.offset += len;
152 }
153 Ok(())
154 }
155
156 pub fn offset(&self) -> u64 {
158 self.offset
159 }
160
161 fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
162 self.file.read_exact(buf)?;
163 self.offset += buf.len() as u64;
164 Ok(())
165 }
166}
167
168pub fn replay_segment_lazy<F>(path: &Path, mut handler: F) -> Result<()>
173where
174 F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
175{
176 let mut reader = LazyWalReader::open(path)?;
177 while let Some(header) = reader.next_header()? {
178 handler(&mut reader, &header)?;
179 }
180 Ok(())
181}
182
183pub fn replay_all_segments_lazy<F>(wal_dir: &Path, mut handler: F) -> Result<()>
188where
189 F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
190{
191 let segments = crate::segment::discover_segments(wal_dir)?;
192 for seg in &segments {
193 replay_segment_lazy(&seg.path, &mut handler)?;
194 }
195 Ok(())
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    /// Reading every payload returns the appended bytes in append order.
    #[test]
    fn lazy_read_all_payloads() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        // Scope the writer so it is dropped before the file is read back.
        {
            let mut writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
            writer
                .append(RecordType::Put as u16, 1, 0, b"hello")
                .unwrap();
            writer
                .append(RecordType::VectorPut as u16, 1, 0, b"vector-data")
                .unwrap();
            writer
                .append(RecordType::Put as u16, 2, 1, b"world")
                .unwrap();
            writer.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&wal_path).unwrap();
        let mut payloads: Vec<Vec<u8>> = Vec::new();
        loop {
            let Some(header) = reader.next_header().unwrap() else {
                break;
            };
            payloads.push(reader.read_payload(&header).unwrap());
        }

        assert_eq!(payloads.len(), 3);
        assert_eq!(payloads[0], b"hello");
        assert_eq!(payloads[1], b"vector-data");
        assert_eq!(payloads[2], b"world");
    }

    /// Skipping large records still lets the small matching record be read.
    #[test]
    fn lazy_skip_non_matching() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");
        let batch = [0u8; 10000];

        {
            let mut writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
            writer
                .append(RecordType::TimeseriesBatch as u16, 1, 0, &batch)
                .unwrap();
            writer
                .append(RecordType::TimeseriesBatch as u16, 1, 0, &batch)
                .unwrap();
            writer
                .append(RecordType::VectorPut as u16, 1, 0, b"small-vec")
                .unwrap();
            writer
                .append(RecordType::TimeseriesBatch as u16, 1, 0, &batch)
                .unwrap();
            writer.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&wal_path).unwrap();
        let mut vector_payloads = Vec::new();
        let mut skipped = 0;

        loop {
            let Some(header) = reader.next_header().unwrap() else {
                break;
            };
            let is_vector_put = matches!(
                RecordType::from_raw(header.record_type),
                Some(RecordType::VectorPut)
            );
            if is_vector_put {
                vector_payloads.push(reader.read_payload(&header).unwrap());
            } else {
                reader.skip_payload(&header).unwrap();
                skipped += 1;
            }
        }

        assert_eq!(vector_payloads.len(), 1);
        assert_eq!(vector_payloads[0], b"small-vec");
        assert_eq!(skipped, 3);
    }

    /// Directory-level replay visits every record of a discovered segment.
    #[test]
    fn replay_all_segments_lazy_works() {
        let tmp = tempfile::tempdir().unwrap();
        let segment_path = tmp.path().join("wal-00000000000000000001.seg");

        {
            let mut writer = WalWriter::open_without_direct_io(&segment_path).unwrap();
            writer.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
            writer.append(RecordType::Put as u16, 1, 0, b"b").unwrap();
            writer.sync().unwrap();
        }

        let mut seen = 0;
        let outcome = replay_all_segments_lazy(tmp.path(), |reader, header| {
            reader.skip_payload(header)?;
            seen += 1;
            Ok(())
        });
        outcome.unwrap();
        assert_eq!(seen, 2);
    }

    /// A WAL with no appended records yields no header.
    #[test]
    fn empty_wal_no_records() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("empty.wal");

        {
            let mut writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
            writer.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&wal_path).unwrap();
        let first = reader.next_header().unwrap();
        assert!(first.is_none());
    }
}