nodedb_wal/
lazy_reader.rs1use std::fs::File;
27use std::io::{Read, Seek, SeekFrom};
28use std::path::Path;
29
30use crate::error::{Result, WalError};
31use crate::record::{HEADER_SIZE, RecordHeader, WalRecord};
32
33pub struct LazyWalReader {
35 file: File,
36 offset: u64,
37 double_write: Option<crate::double_write::DoubleWriteBuffer>,
38}
39
40impl LazyWalReader {
41 pub fn open(path: &Path) -> Result<Self> {
43 let file = File::open(path)?;
44 let dwb_path = path.with_extension("dwb");
45 let double_write = if dwb_path.exists() {
46 crate::double_write::DoubleWriteBuffer::open(
47 &dwb_path,
48 crate::double_write::DwbMode::Buffered,
49 )
50 .ok()
51 } else {
52 None
53 };
54 Ok(Self {
55 file,
56 offset: 0,
57 double_write,
58 })
59 }
60
61 pub fn next_header(&mut self) -> Result<Option<RecordHeader>> {
67 let mut header_buf = [0u8; HEADER_SIZE];
68 match self.read_exact(&mut header_buf) {
69 Ok(()) => {}
70 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
71 return Ok(None);
72 }
73 Err(e) => return Err(e),
74 }
75
76 let header = RecordHeader::from_bytes(&header_buf);
77
78 if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
79 return Ok(None);
80 }
81
82 let logical_type = header.logical_record_type();
84 if crate::record::RecordType::from_raw(logical_type).is_none()
85 && crate::record::RecordType::is_required(logical_type)
86 {
87 return Err(WalError::UnknownRequiredRecordType {
88 record_type: header.record_type,
89 lsn: header.lsn,
90 });
91 }
92
93 Ok(Some(header))
94 }
95
96 pub fn read_payload(&mut self, header: &RecordHeader) -> Result<Vec<u8>> {
102 let mut payload = vec![0u8; header.payload_len as usize];
103 if !payload.is_empty() {
104 match self.read_exact(&mut payload) {
105 Ok(()) => {}
106 Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
107 return Err(WalError::Io(std::io::Error::new(
108 std::io::ErrorKind::UnexpectedEof,
109 "torn write: incomplete payload",
110 )));
111 }
112 Err(e) => return Err(e),
113 }
114 }
115
116 let record = WalRecord {
118 header: *header,
119 payload: payload.clone(),
120 };
121 if record.verify_checksum().is_err() {
122 if let Some(dwb) = &mut self.double_write
124 && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
125 {
126 return Ok(recovered.payload);
127 }
128 return Err(WalError::Io(std::io::Error::new(
129 std::io::ErrorKind::InvalidData,
130 "checksum mismatch",
131 )));
132 }
133
134 Ok(payload)
135 }
136
137 pub fn read_record(&mut self, header: &RecordHeader) -> Result<WalRecord> {
139 let payload = self.read_payload(header)?;
140 Ok(WalRecord {
141 header: *header,
142 payload,
143 })
144 }
145
146 pub fn skip_payload(&mut self, header: &RecordHeader) -> Result<()> {
150 let len = header.payload_len as u64;
151 if len > 0 {
152 self.file
153 .seek(SeekFrom::Current(len as i64))
154 .map_err(WalError::Io)?;
155 self.offset += len;
156 }
157 Ok(())
158 }
159
160 pub fn offset(&self) -> u64 {
162 self.offset
163 }
164
165 fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
166 self.file.read_exact(buf)?;
167 self.offset += buf.len() as u64;
168 Ok(())
169 }
170}
171
172pub fn replay_segment_lazy<F>(path: &Path, mut handler: F) -> Result<()>
177where
178 F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
179{
180 let mut reader = LazyWalReader::open(path)?;
181 while let Some(header) = reader.next_header()? {
182 handler(&mut reader, &header)?;
183 }
184 Ok(())
185}
186
187pub fn replay_all_segments_lazy<F>(wal_dir: &Path, mut handler: F) -> Result<()>
192where
193 F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
194{
195 let segments = crate::segment::discover_segments(wal_dir)?;
196 for seg in &segments {
197 replay_segment_lazy(&seg.path, &mut handler)?;
198 }
199 Ok(())
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    /// Every payload written is read back, in order, with `read_payload`.
    #[test]
    fn lazy_read_all_payloads() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        {
            let mut w = WalWriter::open_without_direct_io(&path).unwrap();
            w.append(RecordType::Put as u16, 1, 0, b"hello").unwrap();
            w.append(RecordType::VectorPut as u16, 1, 0, b"vector-data")
                .unwrap();
            w.append(RecordType::Put as u16, 2, 1, b"world").unwrap();
            w.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&path).unwrap();
        let mut records = Vec::new();
        while let Some(header) = reader.next_header().unwrap() {
            let payload = reader.read_payload(&header).unwrap();
            records.push((header, payload));
        }
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].1, b"hello");
        assert_eq!(records[1].1, b"vector-data");
        assert_eq!(records[2].1, b"world");
    }

    /// Large payloads can be skipped while only the records of interest are read.
    #[test]
    fn lazy_skip_non_matching() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        {
            let mut w = WalWriter::open_without_direct_io(&path).unwrap();
            w.append(RecordType::TimeseriesBatch as u16, 1, 0, &[0u8; 10000])
                .unwrap();
            w.append(RecordType::TimeseriesBatch as u16, 1, 0, &[0u8; 10000])
                .unwrap();
            w.append(RecordType::VectorPut as u16, 1, 0, b"small-vec")
                .unwrap();
            w.append(RecordType::TimeseriesBatch as u16, 1, 0, &[0u8; 10000])
                .unwrap();
            w.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&path).unwrap();
        let mut vector_payloads = Vec::new();
        let mut skipped = 0;

        while let Some(header) = reader.next_header().unwrap() {
            let rt = RecordType::from_raw(header.record_type);
            if rt == Some(RecordType::VectorPut) {
                let payload = reader.read_payload(&header).unwrap();
                vector_payloads.push(payload);
            } else {
                reader.skip_payload(&header).unwrap();
                skipped += 1;
            }
        }

        assert_eq!(vector_payloads.len(), 1);
        assert_eq!(vector_payloads[0], b"small-vec");
        assert_eq!(skipped, 3);
    }

    /// `replay_all_segments_lazy` visits every record of a discovered segment.
    #[test]
    fn replay_all_segments_lazy_works() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-00000000000000000001.seg");

        {
            let mut w = WalWriter::open_without_direct_io(&path).unwrap();
            w.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
            w.append(RecordType::Put as u16, 1, 0, b"b").unwrap();
            w.sync().unwrap();
        }

        let mut count = 0;
        replay_all_segments_lazy(dir.path(), |reader, header| {
            reader.skip_payload(header)?;
            count += 1;
            Ok(())
        })
        .unwrap();
        assert_eq!(count, 2);
    }

    /// A WAL with no records yields no headers.
    #[test]
    fn empty_wal_no_records() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");

        {
            let mut w = WalWriter::open_without_direct_io(&path).unwrap();
            w.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&path).unwrap();
        assert!(reader.next_header().unwrap().is_none());
    }

    /// A file that ends partway through a payload is reported as an
    /// `UnexpectedEof` I/O error by `read_payload`, not as a short payload.
    #[test]
    fn torn_payload_is_reported() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("torn.wal");

        {
            let mut w = WalWriter::open_without_direct_io(&path).unwrap();
            w.append(RecordType::Put as u16, 1, 0, b"hello").unwrap();
            w.sync().unwrap();
        }

        // Keep the header intact but cut the 5-byte payload after 2 bytes.
        let torn_len = (crate::record::HEADER_SIZE + 2) as u64;
        std::fs::OpenOptions::new()
            .write(true)
            .open(&path)
            .unwrap()
            .set_len(torn_len)
            .unwrap();

        let mut reader = LazyWalReader::open(&path).unwrap();
        let header = reader
            .next_header()
            .unwrap()
            .expect("header bytes are intact");
        let err = reader.read_payload(&header).unwrap_err();
        assert!(matches!(
            err,
            crate::error::WalError::Io(ref e) if e.kind() == std::io::ErrorKind::UnexpectedEof
        ));
    }
}