// nodedb_wal/lazy_reader.rs

//! Lazy WAL reader: reads headers without payload for selective replay.
//!
//! Unlike `WalReader` which reads every payload into a `Vec<u8>`, this
//! reader reads only the 30-byte header first. The caller inspects the
//! header (record_type, vshard_id, lsn) and decides whether to read or
//! skip the payload.
//!
//! This is critical for startup replay performance: with 100M timeseries
//! records, a vector core can skip TS payloads (potentially GBs) by
//! seeking forward instead of allocating and reading.
//!
//! ## Usage
//!
//! ```text
//! let mut reader = LazyWalReader::open(path)?;
//! while let Some(header) = reader.next_header()? {
//!     if header.record_type == RecordType::VectorPut as u16 {
//!         let payload = reader.read_payload(&header)?;
//!         // process vector record
//!     } else {
//!         reader.skip_payload(&header)?;
//!     }
//! }
//! ```
25
26use std::fs::File;
27use std::io::{Read, Seek, SeekFrom};
28use std::path::Path;
29
30use crate::error::{Result, WalError};
31use crate::record::{HEADER_SIZE, RecordHeader, WalRecord};
32
33/// Lazy WAL reader that separates header reading from payload reading.
34pub struct LazyWalReader {
35    file: File,
36    offset: u64,
37    double_write: Option<crate::double_write::DoubleWriteBuffer>,
38}
39
40impl LazyWalReader {
41    /// Open a WAL file for lazy reading.
42    pub fn open(path: &Path) -> Result<Self> {
43        let file = File::open(path)?;
44        let dwb_path = path.with_extension("dwb");
45        let double_write = if dwb_path.exists() {
46            crate::double_write::DoubleWriteBuffer::open(&dwb_path).ok()
47        } else {
48            None
49        };
50        Ok(Self {
51            file,
52            offset: 0,
53            double_write,
54        })
55    }
56
57    /// Read the next record header (30 bytes) without reading the payload.
58    ///
59    /// Returns `None` at EOF or first corruption. After this call, use
60    /// either `read_payload()` to get the payload or `skip_payload()` to
61    /// seek past it.
62    pub fn next_header(&mut self) -> Result<Option<RecordHeader>> {
63        let mut header_buf = [0u8; HEADER_SIZE];
64        match self.read_exact(&mut header_buf) {
65            Ok(()) => {}
66            Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
67                return Ok(None);
68            }
69            Err(e) => return Err(e),
70        }
71
72        let header = RecordHeader::from_bytes(&header_buf);
73
74        if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
75            return Ok(None);
76        }
77
78        // Check for unknown required record types.
79        let logical_type = header.logical_record_type();
80        if crate::record::RecordType::from_raw(logical_type).is_none()
81            && crate::record::RecordType::is_required(logical_type)
82        {
83            return Err(WalError::UnknownRequiredRecordType {
84                record_type: header.record_type,
85                lsn: header.lsn,
86            });
87        }
88
89        Ok(Some(header))
90    }
91
92    /// Read the payload for a header that was just returned by `next_header()`.
93    ///
94    /// Must be called exactly once after `next_header()` returns `Some`,
95    /// and before calling `next_header()` again (unless `skip_payload()`
96    /// was called instead).
97    pub fn read_payload(&mut self, header: &RecordHeader) -> Result<Vec<u8>> {
98        let mut payload = vec![0u8; header.payload_len as usize];
99        if !payload.is_empty() {
100            match self.read_exact(&mut payload) {
101                Ok(()) => {}
102                Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
103                    return Err(WalError::Io(std::io::Error::new(
104                        std::io::ErrorKind::UnexpectedEof,
105                        "torn write: incomplete payload",
106                    )));
107                }
108                Err(e) => return Err(e),
109            }
110        }
111
112        // Verify checksum.
113        let record = WalRecord {
114            header: *header,
115            payload: payload.clone(),
116        };
117        if record.verify_checksum().is_err() {
118            // Try double-write buffer recovery.
119            if let Some(dwb) = &mut self.double_write
120                && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
121            {
122                return Ok(recovered.payload);
123            }
124            return Err(WalError::Io(std::io::Error::new(
125                std::io::ErrorKind::InvalidData,
126                "checksum mismatch",
127            )));
128        }
129
130        Ok(payload)
131    }
132
133    /// Read the payload and return a full WalRecord.
134    pub fn read_record(&mut self, header: &RecordHeader) -> Result<WalRecord> {
135        let payload = self.read_payload(header)?;
136        Ok(WalRecord {
137            header: *header,
138            payload,
139        })
140    }
141
142    /// Skip the payload for a header, seeking forward without reading.
143    ///
144    /// This is the key optimization: non-matching records skip I/O entirely.
145    pub fn skip_payload(&mut self, header: &RecordHeader) -> Result<()> {
146        let len = header.payload_len as u64;
147        if len > 0 {
148            self.file
149                .seek(SeekFrom::Current(len as i64))
150                .map_err(WalError::Io)?;
151            self.offset += len;
152        }
153        Ok(())
154    }
155
156    /// Current file offset.
157    pub fn offset(&self) -> u64 {
158        self.offset
159    }
160
161    fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
162        self.file.read_exact(buf)?;
163        self.offset += buf.len() as u64;
164        Ok(())
165    }
166}
167
168/// Open a WAL segment for lazy reading and iterate with a callback.
169///
170/// Convenience function for single-pass replay: the callback receives each
171/// header and decides whether to read or skip the payload.
172pub fn replay_segment_lazy<F>(path: &Path, mut handler: F) -> Result<()>
173where
174    F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
175{
176    let mut reader = LazyWalReader::open(path)?;
177    while let Some(header) = reader.next_header()? {
178        handler(&mut reader, &header)?;
179    }
180    Ok(())
181}
182
183/// Replay all WAL segments in a directory with lazy reading.
184///
185/// Segments are read in LSN order. The callback decides per-record whether
186/// to read or skip the payload.
187pub fn replay_all_segments_lazy<F>(wal_dir: &Path, mut handler: F) -> Result<()>
188where
189    F: FnMut(&mut LazyWalReader, &RecordHeader) -> Result<()>,
190{
191    let segments = crate::segment::discover_segments(wal_dir)?;
192    for seg in &segments {
193        replay_segment_lazy(&seg.path, &mut handler)?;
194    }
195    Ok(())
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    /// Open a writer on `path`, let `fill` append records, then sync.
    /// The writer is dropped on return, before any reader opens the file.
    fn write_wal(path: &Path, fill: impl FnOnce(&mut WalWriter)) {
        let mut writer = WalWriter::open_without_direct_io(path).unwrap();
        fill(&mut writer);
        writer.sync().unwrap();
    }

    /// Every payload can be read back, in write order.
    #[test]
    fn lazy_read_all_payloads() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        write_wal(&wal_path, |w| {
            w.append(RecordType::Put as u16, 1, 0, b"hello").unwrap();
            w.append(RecordType::VectorPut as u16, 1, 0, b"vector-data")
                .unwrap();
            w.append(RecordType::Put as u16, 2, 1, b"world").unwrap();
        });

        let mut reader = LazyWalReader::open(&wal_path).unwrap();
        let mut payloads = Vec::new();
        while let Some(header) = reader.next_header().unwrap() {
            payloads.push(reader.read_payload(&header).unwrap());
        }

        assert_eq!(payloads.len(), 3);
        assert_eq!(payloads[0], b"hello");
        assert_eq!(payloads[1], b"vector-data");
        assert_eq!(payloads[2], b"world");
    }

    /// A "vector core" reads only VectorPut and skips TimeseriesBatch.
    #[test]
    fn lazy_skip_non_matching() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        // 3 big TS records, 1 small vector record.
        write_wal(&wal_path, |w| {
            w.append(RecordType::TimeseriesBatch as u16, 1, 0, &[0u8; 10000])
                .unwrap();
            w.append(RecordType::TimeseriesBatch as u16, 1, 0, &[0u8; 10000])
                .unwrap();
            w.append(RecordType::VectorPut as u16, 1, 0, b"small-vec")
                .unwrap();
            w.append(RecordType::TimeseriesBatch as u16, 1, 0, &[0u8; 10000])
                .unwrap();
        });

        let mut reader = LazyWalReader::open(&wal_path).unwrap();
        let mut vector_payloads = Vec::new();
        let mut skipped = 0;

        while let Some(header) = reader.next_header().unwrap() {
            let is_vector_put = matches!(
                RecordType::from_raw(header.record_type),
                Some(RecordType::VectorPut)
            );
            if is_vector_put {
                vector_payloads.push(reader.read_payload(&header).unwrap());
            } else {
                reader.skip_payload(&header).unwrap();
                skipped += 1;
            }
        }

        assert_eq!(vector_payloads.len(), 1);
        assert_eq!(vector_payloads[0], b"small-vec");
        assert_eq!(skipped, 3);
    }

    /// Directory-level replay visits every record of a discovered segment.
    #[test]
    fn replay_all_segments_lazy_works() {
        let tmp = tempfile::tempdir().unwrap();
        // Use proper segment filename pattern: wal-{lsn:020}.seg
        let segment_path = tmp.path().join("wal-00000000000000000001.seg");

        write_wal(&segment_path, |w| {
            w.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
            w.append(RecordType::Put as u16, 1, 0, b"b").unwrap();
        });

        let mut count = 0;
        replay_all_segments_lazy(tmp.path(), |reader, header| {
            reader.skip_payload(header)?;
            count += 1;
            Ok(())
        })
        .unwrap();
        assert_eq!(count, 2);
    }

    /// A WAL with no appended records yields no headers.
    #[test]
    fn empty_wal_no_records() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("empty.wal");

        write_wal(&wal_path, |_| {});

        let mut reader = LazyWalReader::open(&wal_path).unwrap();
        assert!(reader.next_header().unwrap().is_none());
    }
}