Skip to main content

nodedb_wal/
reader.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! WAL reader for crash recovery and replay.
4//!
5//! Reads records sequentially from a WAL file, validating checksums and
6//! magic numbers. Stops at the first corruption point — everything before
7//! that point is the committed prefix.
8//!
9//! ## Replay invariants
10//!
11//! - Replay is **deterministic**: the same WAL file always produces the
12//!   same sequence of records.
13//! - Replay is **idempotent**: replaying the same record twice has the
14//!   same effect as replaying it once.
15//! - Unknown optional record types (bit 15 clear) are skipped.
16//! - Unknown required record types (bit 15 set) cause a replay failure.
17
18use std::fs::File;
19use std::io::Read;
20use std::path::Path;
21
22use crate::error::{Result, WalError};
23use crate::preamble::{PREAMBLE_SIZE, SegmentPreamble, WAL_PREAMBLE_MAGIC};
24use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WalRecord};
25
26/// Sequential WAL reader.
27pub struct WalReader {
28    file: File,
29    offset: u64,
30    /// Preamble read from offset 0 of this segment (present when encryption
31    /// is active). The epoch is used as part of the AAD for decryption.
32    segment_preamble: Option<SegmentPreamble>,
33    /// Optional double-write buffer for torn write recovery.
34    double_write: Option<crate::double_write::DoubleWriteBuffer>,
35}
36
37impl WalReader {
38    /// Open a WAL file for reading.
39    ///
40    /// If the file begins with a valid `WALP` preamble (16 bytes), it is
41    /// consumed and stored for use as AAD during decryption. Files without a
42    /// preamble (unencrypted segments) start reading from offset 0 directly.
43    ///
44    /// Automatically opens the companion double-write buffer file
45    /// (`*.dwb`) if it exists alongside the WAL file.
46    pub fn open(path: &Path) -> Result<Self> {
47        let mut file = File::open(path)?;
48        let dwb_path = path.with_extension("dwb");
49        let double_write = if dwb_path.exists() {
50            crate::double_write::DoubleWriteBuffer::open(
51                &dwb_path,
52                crate::double_write::DwbMode::Buffered,
53            )
54            .ok()
55        } else {
56            None
57        };
58
59        // Attempt to read the preamble at offset 0.
60        // If the first 4 bytes match WAL_PREAMBLE_MAGIC, consume the full
61        // 16-byte preamble and validate it. Otherwise rewind to 0.
62        let (segment_preamble, start_offset) = try_read_preamble(&mut file)?;
63
64        Ok(Self {
65            file,
66            offset: start_offset,
67            segment_preamble,
68            double_write,
69        })
70    }
71
72    /// The preamble read from this segment file, if present.
73    ///
74    /// Returns `None` for unencrypted segments (no preamble written).
75    pub fn segment_preamble(&self) -> Option<&SegmentPreamble> {
76        self.segment_preamble.as_ref()
77    }
78
79    /// Read the next record from the WAL.
80    ///
81    /// Returns `None` at EOF (clean end) or at the first corruption point.
82    /// Returns `Err` only for I/O errors or unknown required record types.
83    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
84        loop {
85            // Read header.
86            let mut header_buf = [0u8; HEADER_SIZE];
87            match self.read_exact(&mut header_buf) {
88                Ok(()) => {}
89                Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
90                    return Ok(None); // Clean EOF.
91                }
92                Err(e) => return Err(e),
93            }
94
95            let header = RecordHeader::from_bytes(&header_buf);
96
97            // Validate magic and version.
98            if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
99                // Corruption or end of valid data — treat as end of committed prefix.
100                return Ok(None);
101            }
102
103            // Read payload.
104            let mut payload = vec![0u8; header.payload_len as usize];
105            if !payload.is_empty() {
106                match self.read_exact(&mut payload) {
107                    Ok(()) => {}
108                    Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
109                        return Ok(None);
110                    }
111                    Err(e) => return Err(e),
112                }
113            }
114
115            let record = WalRecord { header, payload };
116
117            // Verify checksum.
118            if record.verify_checksum().is_err() {
119                if let Some(dwb) = &mut self.double_write
120                    && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
121                {
122                    tracing::info!(
123                        lsn = header.lsn,
124                        "recovered torn write from double-write buffer"
125                    );
126                    self.offset += recovered.payload.len() as u64;
127                    return Ok(Some(recovered));
128                }
129                return Ok(None);
130            }
131
132            // Check if the record type is known (strip encrypted flag for lookup).
133            let logical_type = record.logical_record_type();
134            if RecordType::from_raw(logical_type).is_none() {
135                if RecordType::is_required(logical_type) {
136                    return Err(WalError::UnknownRequiredRecordType {
137                        record_type: header.record_type,
138                        lsn: header.lsn,
139                    });
140                }
141                // Unknown optional record — skip and continue loop.
142                continue;
143            }
144
145            return Ok(Some(record));
146        }
147    }
148
149    /// Iterator over all valid records in the WAL.
150    pub fn records(self) -> WalRecordIter {
151        WalRecordIter { reader: self }
152    }
153
154    /// Current read offset in the file.
155    pub fn offset(&self) -> u64 {
156        self.offset
157    }
158
159    fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
160        self.file.read_exact(buf)?;
161        self.offset += buf.len() as u64;
162        Ok(())
163    }
164}
165
166/// Attempt to read a WAL segment preamble from the start of a file.
167///
168/// Returns `(Some(preamble), PREAMBLE_SIZE)` if a valid `WALP` preamble is
169/// found, or `(None, 0)` if the file does not start with the preamble magic
170/// (unencrypted segment — seek back to 0).
171fn try_read_preamble(file: &mut File) -> Result<(Option<SegmentPreamble>, u64)> {
172    use std::io::Seek;
173
174    let mut buf = [0u8; PREAMBLE_SIZE];
175    match file.read_exact(&mut buf) {
176        Ok(()) => {}
177        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
178            // File is too short to hold a preamble — no preamble present.
179            file.seek(std::io::SeekFrom::Start(0))?;
180            return Ok((None, 0));
181        }
182        Err(e) => return Err(WalError::Io(e)),
183    }
184
185    if buf[0..4] == WAL_PREAMBLE_MAGIC {
186        // Parse and validate the preamble. An unsupported version is a hard
187        // error — do not silently fall through to record scanning.
188        let preamble = SegmentPreamble::from_bytes(&buf, &WAL_PREAMBLE_MAGIC)?;
189        Ok((Some(preamble), PREAMBLE_SIZE as u64))
190    } else {
191        // First bytes are not the preamble magic — rewind and read as records.
192        file.seek(std::io::SeekFrom::Start(0))?;
193        Ok((None, 0))
194    }
195}
196
197/// Iterator over WAL records.
198pub struct WalRecordIter {
199    reader: WalReader,
200}
201
202impl Iterator for WalRecordIter {
203    type Item = Result<WalRecord>;
204
205    fn next(&mut self) -> Option<Self::Item> {
206        match self.reader.next_record() {
207            Ok(Some(record)) => Some(Ok(record)),
208            Ok(None) => None,
209            Err(e) => Some(Err(e)),
210        }
211    }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    #[test]
    fn write_then_read_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        // Write records.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer
                .append(RecordType::Put as u32, 1, 0, 0, b"first")
                .unwrap();
            writer
                .append(RecordType::Put as u32, 2, 1, 0, b"second")
                .unwrap();
            writer
                .append(RecordType::Delete as u32, 1, 0, 0, b"third")
                .unwrap();
            writer.sync().unwrap();
        }

        // Read them back.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].header.lsn, 1);
        assert_eq!(records[0].header.tenant_id, 1);
        assert_eq!(records[0].payload, b"first");

        assert_eq!(records[1].header.lsn, 2);
        assert_eq!(records[1].header.tenant_id, 2);
        assert_eq!(records[1].header.vshard_id, 1);
        assert_eq!(records[1].payload, b"second");

        assert_eq!(records[2].header.lsn, 3);
        assert_eq!(records[2].header.record_type, RecordType::Delete as u32);
        assert_eq!(records[2].payload, b"third");
    }

    #[test]
    fn empty_wal_yields_no_records() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");

        // Create an empty file.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer.sync().unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn truncated_file_stops_at_committed_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");

        // Write records.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer
                .append(RecordType::Put as u32, 1, 0, 0, b"good-record")
                .unwrap();
            writer.sync().unwrap();
        }

        // Append garbage (simulating a torn write).
        {
            use std::io::Write;
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            file.write_all(b"GARBAGE_PARTIAL_RECORD").unwrap();
        }

        // Reader should return only the valid record.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"good-record");
    }

    #[test]
    fn skip_many_unknown_optional_records_is_iterative() {
        // Record type 99 has bit 15 clear (99 & 0x8000 == 0) and is not a
        // known variant, so the reader must skip it as an unknown optional.
        // `next_record` performs that skip inside a loop. A recursive skip
        // (`return self.next_record()`) would exhaust the stack on 50 000
        // consecutive unknown optional records. This test guards against
        // regressing to recursion: all 50 000 must be skipped and the one
        // valid record at the end returned.
        const UNKNOWN_OPTIONAL: u32 = 99; // no 0x8000 bit → optional, not in enum
        const SKIP_COUNT: usize = 50_000;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("many_unknown.wal");

        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            for _ in 0..SKIP_COUNT {
                writer
                    .append(UNKNOWN_OPTIONAL, 1, 0, 0, b"skip-me")
                    .unwrap();
            }
            writer
                .append(RecordType::Put as u32, 1, 0, 0, b"keep-me")
                .unwrap();
            writer.sync().unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();

        // Only the single known Put record survives. All unknown optional
        // records are silently discarded.
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"keep-me");
    }

    #[test]
    fn unknown_required_record_type_is_an_error() {
        // Bit 15 set and not a known variant → unknown *required* type, which
        // must abort replay instead of being skipped.
        const UNKNOWN_REQUIRED: u32 = 0x8000 | 99;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("unknown_required.wal");

        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer
                .append(RecordType::Put as u32, 1, 0, 0, b"before")
                .unwrap();
            writer
                .append(UNKNOWN_REQUIRED, 1, 0, 0, b"must-fail")
                .unwrap();
            writer.sync().unwrap();
        }

        let mut reader = WalReader::open(&path).unwrap();
        let first = reader.next_record().unwrap().unwrap();
        assert_eq!(first.payload, b"before");

        match reader.next_record() {
            Err(WalError::UnknownRequiredRecordType { record_type, lsn }) => {
                assert_eq!(record_type, UNKNOWN_REQUIRED);
                assert_eq!(lsn, 2);
            }
            Err(_) => panic!("expected UnknownRequiredRecordType, got a different error"),
            Ok(_) => panic!("expected UnknownRequiredRecordType, got Ok"),
        }
    }
}