Skip to main content

nodedb_wal/
lazy_reader.rs

//! Lazy WAL reader: reads headers without payload for selective replay.
//!
//! Unlike `WalReader`, which reads every payload into a `Vec<u8>`, this
//! reader reads only the 30-byte header first. The caller inspects the
//! header (record_type, vshard_id, lsn) and decides whether to read or
//! skip the payload.
//!
//! This is critical for startup replay performance: with 100M timeseries
//! records, a vector core can skip TS payloads (potentially GBs) by
//! seeking forward instead of allocating and reading them.
//!
//! ## Usage
//!
//! ```text
//! let mut reader = LazyWalReader::open(path)?;
//! while let Some(header) = reader.next_header()? {
//!     if header.record_type == RecordType::VectorPut as u16 {
//!         let payload = reader.read_payload(&header)?;
//!         // process vector record
//!     } else {
//!         reader.skip_payload(&header)?;
//!     }
//! }
//! ```

26use std::fs::File;
27use std::io::{Read, Seek, SeekFrom};
28use std::path::Path;
29
30use crate::error::{Result, WalError};
31use crate::record::{HEADER_SIZE, RecordHeader, WalRecord};
32
33/// Lazy WAL reader that separates header reading from payload reading.
34pub struct LazyWalReader {
35    file: File,
36    offset: u64,
37    double_write: Option<crate::double_write::DoubleWriteBuffer>,
38}
39
40impl LazyWalReader {
41    /// Open a WAL file for lazy reading.
42    pub fn open(path: &Path) -> Result<Self> {
43        let file = File::open(path)?;
44        let dwb_path = path.with_extension("dwb");
45        let double_write = if dwb_path.exists() {
46            crate::double_write::DoubleWriteBuffer::open(
47                &dwb_path,
48                crate::double_write::DwbMode::Buffered,
49            )
50            .ok()
51        } else {
52            None
53        };
54        Ok(Self {
55            file,
56            offset: 0,
57            double_write,
58        })
59    }
60
61    /// Read the next record header (30 bytes) without reading the payload.
62    ///
63    /// Returns `None` at EOF or first corruption. After this call, use
64    /// either `read_payload()` to get the payload or `skip_payload()` to
65    /// seek past it.
66    pub fn next_header(&mut self) -> Result<Option<RecordHeader>> {
67        let mut header_buf = [0u8; HEADER_SIZE];
68        match self.read_exact(&mut header_buf) {
69            Ok(()) => {}
70            Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
71                return Ok(None);
72            }
73            Err(e) => return Err(e),
74        }
75
76        let header = RecordHeader::from_bytes(&header_buf);
77
78        if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
79            return Ok(None);
80        }
81
82        // Check for unknown required record types.
83        let logical_type = header.logical_record_type();
84        if crate::record::RecordType::from_raw(logical_type).is_none()
85            && crate::record::RecordType::is_required(logical_type)
86        {
87            return Err(WalError::UnknownRequiredRecordType {
88                record_type: header.record_type,
89                lsn: header.lsn,
90            });
91        }
92
93        Ok(Some(header))
94    }
95
96    /// Read the payload for a header that was just returned by `next_header()`.
97    ///
98    /// Must be called exactly once after `next_header()` returns `Some`,
99    /// and before calling `next_header()` again (unless `skip_payload()`
100    /// was called instead).
101    pub fn read_payload(&mut self, header: &RecordHeader) -> Result<Vec<u8>> {
102        let mut payload = vec![0u8; header.payload_len as usize];
103        if !payload.is_empty() {
104            match self.read_exact(&mut payload) {
105                Ok(()) => {}
106                Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
107                    return Err(WalError::Io(std::io::Error::new(
108                        std::io::ErrorKind::UnexpectedEof,
109                        "torn write: incomplete payload",
110                    )));
111                }
112                Err(e) => return Err(e),
113            }
114        }
115
116        // Verify checksum.
117        let record = WalRecord {
118            header: *header,
119            payload: payload.clone(),
120        };
121        if record.verify_checksum().is_err() {
122            // Try double-write buffer recovery.
123            if let Some(dwb) = &mut self.double_write
124                && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
125            {
126                return Ok(recovered.payload);
127            }
128            return Err(WalError::Io(std::io::Error::new(
129                std::io::ErrorKind::InvalidData,
130                "checksum mismatch",
131            )));
132        }
133
134        Ok(payload)
135    }
136
137    /// Read the payload and return a full WalRecord.
138    pub fn read_record(&mut self, header: &RecordHeader) -> Result<WalRecord> {
139        let payload = self.read_payload(header)?;
140        Ok(WalRecord {
141            header: *header,
142            payload,
143        })
144    }
145
146    /// Skip the payload for a header, seeking forward without reading.
147    ///
148    /// This is the key optimization: non-matching records skip I/O entirely.
149    pub fn skip_payload(&mut self, header: &RecordHeader) -> Result<()> {
150        let len = header.payload_len as u64;
151        if len > 0 {
152            self.file
153                .seek(SeekFrom::Current(len as i64))
154                .map_err(WalError::Io)?;
155            self.offset += len;
156        }
157        Ok(())
158    }
159
160    /// Current file offset.
161    pub fn offset(&self) -> u64 {
162        self.offset
163    }
164
165    fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
166        self.file.read_exact(buf)?;
167        self.offset += buf.len() as u64;
168        Ok(())
169    }
170}
171
172/// Open a WAL segment for lazy reading and iterate with a callback.
173///
174/// Convenience function for single-pass replay: the callback receives each
175/// header and decides whether to read or skip the payload.
176pub fn replay_segment_lazy<F>(path: &Path, mut handler: F) -> Result<()>
177where
178    F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
179{
180    let mut reader = LazyWalReader::open(path)?;
181    while let Some(header) = reader.next_header()? {
182        handler(&mut reader, &header)?;
183    }
184    Ok(())
185}
186
187/// Replay all WAL segments in a directory with lazy reading.
188///
189/// Segments are read in LSN order. The callback decides per-record whether
190/// to read or skip the payload.
191pub fn replay_all_segments_lazy<F>(wal_dir: &Path, mut handler: F) -> Result<()>
192where
193    F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
194{
195    let segments = crate::segment::discover_segments(wal_dir)?;
196    for seg in &segments {
197        replay_segment_lazy(&seg.path, &mut handler)?;
198    }
199    Ok(())
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    /// Every payload read back in order matches what was written.
    #[test]
    fn lazy_read_all_payloads() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        {
            let mut writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
            writer.append(RecordType::Put as u16, 1, 0, b"hello").unwrap();
            writer
                .append(RecordType::VectorPut as u16, 1, 0, b"vector-data")
                .unwrap();
            writer.append(RecordType::Put as u16, 2, 1, b"world").unwrap();
            writer.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&wal_path).unwrap();
        let mut payloads: Vec<Vec<u8>> = Vec::new();
        while let Some(hdr) = reader.next_header().unwrap() {
            payloads.push(reader.read_payload(&hdr).unwrap());
        }

        let expected: [&[u8]; 3] = [b"hello", b"vector-data", b"world"];
        assert_eq!(payloads.len(), expected.len());
        for (got, want) in payloads.iter().zip(expected) {
            assert_eq!(got.as_slice(), want);
        }
    }

    /// A reader interested only in VectorPut skips all timeseries payloads.
    #[test]
    fn lazy_skip_non_matching() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");
        let big_ts = [0u8; 10000];

        {
            let mut writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
            // Three large TS records surrounding one small vector record.
            writer
                .append(RecordType::TimeseriesBatch as u16, 1, 0, &big_ts)
                .unwrap();
            writer
                .append(RecordType::TimeseriesBatch as u16, 1, 0, &big_ts)
                .unwrap();
            writer
                .append(RecordType::VectorPut as u16, 1, 0, b"small-vec")
                .unwrap();
            writer
                .append(RecordType::TimeseriesBatch as u16, 1, 0, &big_ts)
                .unwrap();
            writer.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&wal_path).unwrap();
        let mut kept: Vec<Vec<u8>> = Vec::new();
        let mut skip_count = 0usize;

        while let Some(hdr) = reader.next_header().unwrap() {
            match RecordType::from_raw(hdr.record_type) {
                Some(RecordType::VectorPut) => kept.push(reader.read_payload(&hdr).unwrap()),
                _ => {
                    reader.skip_payload(&hdr).unwrap();
                    skip_count += 1;
                }
            }
        }

        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0], b"small-vec");
        assert_eq!(skip_count, 3);
    }

    /// Directory-level replay visits every record of a discovered segment.
    #[test]
    fn replay_all_segments_lazy_works() {
        let tmp = tempfile::tempdir().unwrap();
        // Segment files must follow the `wal-{lsn:020}.seg` naming pattern.
        let seg_path = tmp.path().join("wal-00000000000000000001.seg");

        {
            let mut writer = WalWriter::open_without_direct_io(&seg_path).unwrap();
            writer.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
            writer.append(RecordType::Put as u16, 1, 0, b"b").unwrap();
            writer.sync().unwrap();
        }

        let mut visited = 0;
        let outcome = replay_all_segments_lazy(tmp.path(), |reader, hdr| {
            reader.skip_payload(hdr)?;
            visited += 1;
            Ok(())
        });
        outcome.unwrap();
        assert_eq!(visited, 2);
    }

    /// A WAL with no appended records yields no headers.
    #[test]
    fn empty_wal_no_records() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("empty.wal");

        {
            let mut writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
            writer.sync().unwrap();
        }

        let mut reader = LazyWalReader::open(&wal_path).unwrap();
        let first = reader.next_header().unwrap();
        assert!(first.is_none());
    }
}