Skip to main content

nodedb_wal/
lazy_reader.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Lazy WAL reader: reads headers without payload for selective replay.
4//!
5//! Unlike `WalReader` which reads every payload into a `Vec<u8>`, this
6//! reader reads only the 30-byte header first. The caller inspects the
7//! header (record_type, vshard_id, lsn) and decides whether to read or
8//! skip the payload.
9//!
10//! This is critical for startup replay performance: with 100M timeseries
11//! records, a vector core can skip TS payloads (potentially GBs) by
12//! seeking forward instead of allocating and reading.
13//!
14//! ## Usage
15//!
16//! ```text
17//! let mut reader = LazyWalReader::open(path)?;
18//! while let Some(header) = reader.next_header()? {
19//!     if header.record_type == RecordType::VectorPut as u32 {
20//!         let payload = reader.read_payload(&header)?;
21//!         // process vector record
22//!     } else {
23//!         reader.skip_payload(&header)?;
24//!     }
25//! }
26//! ```
27
28use std::fs::File;
29use std::io::{Read, Seek, SeekFrom};
30use std::path::Path;
31
32use crate::error::{Result, WalError};
33use crate::record::{HEADER_SIZE, RecordHeader, WalRecord};
34
35/// Lazy WAL reader that separates header reading from payload reading.
36pub struct LazyWalReader {
37    file: File,
38    offset: u64,
39    double_write: Option<crate::double_write::DoubleWriteBuffer>,
40}
41
42impl LazyWalReader {
43    /// Open a WAL file for lazy reading.
44    pub fn open(path: &Path) -> Result<Self> {
45        let file = File::open(path)?;
46        let dwb_path = path.with_extension("dwb");
47        let double_write = if dwb_path.exists() {
48            crate::double_write::DoubleWriteBuffer::open(
49                &dwb_path,
50                crate::double_write::DwbMode::Buffered,
51            )
52            .ok()
53        } else {
54            None
55        };
56        Ok(Self {
57            file,
58            offset: 0,
59            double_write,
60        })
61    }
62
63    /// Read the next record header (30 bytes) without reading the payload.
64    ///
65    /// Returns `None` at EOF or first corruption. After this call, use
66    /// either `read_payload()` to get the payload or `skip_payload()` to
67    /// seek past it.
68    pub fn next_header(&mut self) -> Result<Option<RecordHeader>> {
69        let mut header_buf = [0u8; HEADER_SIZE];
70        match self.read_exact(&mut header_buf) {
71            Ok(()) => {}
72            Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
73                return Ok(None);
74            }
75            Err(e) => return Err(e),
76        }
77
78        let header = RecordHeader::from_bytes(&header_buf);
79
80        if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
81            return Ok(None);
82        }
83
84        // Check for unknown required record types.
85        let logical_type = header.logical_record_type();
86        if crate::record::RecordType::from_raw(logical_type).is_none()
87            && crate::record::RecordType::is_required(logical_type)
88        {
89            return Err(WalError::UnknownRequiredRecordType {
90                record_type: header.record_type,
91                lsn: header.lsn,
92            });
93        }
94
95        Ok(Some(header))
96    }
97
98    /// Read the payload for a header that was just returned by `next_header()`.
99    ///
100    /// Must be called exactly once after `next_header()` returns `Some`,
101    /// and before calling `next_header()` again (unless `skip_payload()`
102    /// was called instead).
103    pub fn read_payload(&mut self, header: &RecordHeader) -> Result<Vec<u8>> {
104        let mut payload = vec![0u8; header.payload_len as usize];
105        if !payload.is_empty() {
106            match self.read_exact(&mut payload) {
107                Ok(()) => {}
108                Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
109                    return Err(WalError::Io(std::io::Error::new(
110                        std::io::ErrorKind::UnexpectedEof,
111                        "torn write: incomplete payload",
112                    )));
113                }
114                Err(e) => return Err(e),
115            }
116        }
117
118        // Verify checksum.
119        let record = WalRecord {
120            header: *header,
121            payload: payload.clone(),
122        };
123        if record.verify_checksum().is_err() {
124            // Try double-write buffer recovery.
125            if let Some(dwb) = &mut self.double_write
126                && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
127            {
128                return Ok(recovered.payload);
129            }
130            return Err(WalError::Io(std::io::Error::new(
131                std::io::ErrorKind::InvalidData,
132                "checksum mismatch",
133            )));
134        }
135
136        Ok(payload)
137    }
138
139    /// Read the payload and return a full WalRecord.
140    pub fn read_record(&mut self, header: &RecordHeader) -> Result<WalRecord> {
141        let payload = self.read_payload(header)?;
142        Ok(WalRecord {
143            header: *header,
144            payload,
145        })
146    }
147
148    /// Skip the payload for a header, seeking forward without reading.
149    ///
150    /// This is the key optimization: non-matching records skip I/O entirely.
151    pub fn skip_payload(&mut self, header: &RecordHeader) -> Result<()> {
152        let len = header.payload_len as u64;
153        if len > 0 {
154            self.file
155                .seek(SeekFrom::Current(len as i64))
156                .map_err(WalError::Io)?;
157            self.offset += len;
158        }
159        Ok(())
160    }
161
162    /// Current file offset.
163    pub fn offset(&self) -> u64 {
164        self.offset
165    }
166
167    fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
168        self.file.read_exact(buf)?;
169        self.offset += buf.len() as u64;
170        Ok(())
171    }
172}
173
174/// Open a WAL segment for lazy reading and iterate with a callback.
175///
176/// Convenience function for single-pass replay: the callback receives each
177/// header and decides whether to read or skip the payload.
178pub fn replay_segment_lazy<F>(path: &Path, mut handler: F) -> Result<()>
179where
180    F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
181{
182    let mut reader = LazyWalReader::open(path)?;
183    while let Some(header) = reader.next_header()? {
184        handler(&mut reader, &header)?;
185    }
186    Ok(())
187}
188
189/// Replay all WAL segments in a directory with lazy reading.
190///
191/// Segments are read in LSN order. The callback decides per-record whether
192/// to read or skip the payload.
193pub fn replay_all_segments_lazy<F>(wal_dir: &Path, mut handler: F) -> Result<()>
194where
195    F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
196{
197    let segments = crate::segment::discover_segments(wal_dir)?;
198    for seg in &segments {
199        replay_segment_lazy(&seg.path, &mut handler)?;
200    }
201    Ok(())
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    /// Create a WAL at `path`, let `fill` append records, then sync it.
    /// The writer is dropped when this returns, before any test reads.
    fn write_wal(path: &std::path::Path, fill: impl FnOnce(&mut WalWriter)) {
        let mut writer = WalWriter::open_without_direct_io(path).unwrap();
        fill(&mut writer);
        writer.sync().unwrap();
    }

    #[test]
    fn lazy_read_all_payloads() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");
        write_wal(&path, |w| {
            w.append(RecordType::Put as u32, 1, 0, 0, b"hello").unwrap();
            w.append(RecordType::VectorPut as u32, 1, 0, 0, b"vector-data")
                .unwrap();
            w.append(RecordType::Put as u32, 2, 1, 0, b"world").unwrap();
        });

        let mut reader = LazyWalReader::open(&path).unwrap();
        let mut payloads = Vec::new();
        while let Some(header) = reader.next_header().unwrap() {
            payloads.push(reader.read_payload(&header).unwrap());
        }

        let expected: [&[u8]; 3] = [b"hello", b"vector-data", b"world"];
        assert_eq!(payloads.len(), expected.len());
        for (got, want) in payloads.iter().zip(expected) {
            assert_eq!(got, want);
        }
    }

    #[test]
    fn lazy_skip_non_matching() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");
        // Three large timeseries batches around one small vector record.
        let big = [0u8; 10000];
        write_wal(&path, |w| {
            let sequence = [
                RecordType::TimeseriesBatch,
                RecordType::TimeseriesBatch,
                RecordType::VectorPut,
                RecordType::TimeseriesBatch,
            ];
            for record_type in sequence {
                let body: &[u8] = if record_type == RecordType::VectorPut {
                    b"small-vec"
                } else {
                    &big
                };
                w.append(record_type as u32, 1, 0, 0, body).unwrap();
            }
        });

        // Behave like a "vector core": read VectorPut, skip everything else.
        let mut reader = LazyWalReader::open(&path).unwrap();
        let mut kept = Vec::new();
        let mut skip_count = 0;
        while let Some(header) = reader.next_header().unwrap() {
            match RecordType::from_raw(header.record_type) {
                Some(RecordType::VectorPut) => {
                    kept.push(reader.read_payload(&header).unwrap());
                }
                _ => {
                    reader.skip_payload(&header).unwrap();
                    skip_count += 1;
                }
            }
        }

        assert_eq!(kept, [b"small-vec".to_vec()]);
        assert_eq!(skip_count, 3);
    }

    #[test]
    fn replay_all_segments_lazy_works() {
        let dir = tempfile::tempdir().unwrap();
        // Segment files must follow the `wal-{lsn:020}.seg` naming pattern.
        let segment = dir.path().join("wal-00000000000000000001.seg");
        write_wal(&segment, |w| {
            for body in [b"a", b"b"] {
                w.append(RecordType::Put as u32, 1, 0, 0, body).unwrap();
            }
        });

        let mut seen = 0;
        replay_all_segments_lazy(dir.path(), |reader, header| {
            reader.skip_payload(header)?;
            seen += 1;
            Ok(())
        })
        .unwrap();
        assert_eq!(seen, 2);
    }

    #[test]
    fn empty_wal_no_records() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");
        write_wal(&path, |_| {});

        let mut reader = LazyWalReader::open(&path).unwrap();
        assert!(reader.next_header().unwrap().is_none());
    }
}