Skip to main content

nodedb_wal/
reader.rs

//! WAL reader for crash recovery and replay.
//!
//! Reads records sequentially from a WAL file, validating magic numbers,
//! format versions, and checksums. A record whose checksum fails is first
//! looked up in the companion double-write buffer (if one is present); if no
//! intact copy exists there, reading stops at that corruption point —
//! everything before it is the committed prefix.
//!
//! ## Replay invariants
//!
//! - Replay is **deterministic**: the same WAL file always produces the
//!   same sequence of records.
//! - Replay is **idempotent**: replaying the same record twice has the
//!   same effect as replaying it once.
//! - Unknown optional record types (bit 15 clear) are skipped.
//! - Unknown required record types (bit 15 set) cause a replay failure.
15
16use std::fs::File;
17use std::io::Read;
18use std::path::Path;
19
20use crate::error::{Result, WalError};
21use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WalRecord};
22
23/// Sequential WAL reader.
24pub struct WalReader {
25    file: File,
26    offset: u64,
27    /// Optional double-write buffer for torn write recovery.
28    double_write: Option<crate::double_write::DoubleWriteBuffer>,
29}
30
31impl WalReader {
32    /// Open a WAL file for reading.
33    ///
34    /// Automatically opens the companion double-write buffer file
35    /// (`*.dwb`) if it exists alongside the WAL file.
36    pub fn open(path: &Path) -> Result<Self> {
37        let file = File::open(path)?;
38        let dwb_path = path.with_extension("dwb");
39        let double_write = if dwb_path.exists() {
40            crate::double_write::DoubleWriteBuffer::open(&dwb_path).ok()
41        } else {
42            None
43        };
44        Ok(Self {
45            file,
46            offset: 0,
47            double_write,
48        })
49    }
50
51    /// Read the next record from the WAL.
52    ///
53    /// Returns `None` at EOF (clean end) or at the first corruption point.
54    /// Returns `Err` only for I/O errors or unknown required record types.
55    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
56        loop {
57            // Read header.
58            let mut header_buf = [0u8; HEADER_SIZE];
59            match self.read_exact(&mut header_buf) {
60                Ok(()) => {}
61                Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
62                    return Ok(None); // Clean EOF.
63                }
64                Err(e) => return Err(e),
65            }
66
67            let header = RecordHeader::from_bytes(&header_buf);
68
69            // Validate magic and version.
70            if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
71                // Corruption or end of valid data — treat as end of committed prefix.
72                return Ok(None);
73            }
74
75            // Read payload.
76            let mut payload = vec![0u8; header.payload_len as usize];
77            if !payload.is_empty() {
78                match self.read_exact(&mut payload) {
79                    Ok(()) => {}
80                    Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
81                        return Ok(None);
82                    }
83                    Err(e) => return Err(e),
84                }
85            }
86
87            let record = WalRecord { header, payload };
88
89            // Verify checksum.
90            if record.verify_checksum().is_err() {
91                if let Some(dwb) = &mut self.double_write
92                    && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
93                {
94                    tracing::info!(
95                        lsn = header.lsn,
96                        "recovered torn write from double-write buffer"
97                    );
98                    self.offset += recovered.payload.len() as u64;
99                    return Ok(Some(recovered));
100                }
101                return Ok(None);
102            }
103
104            // Check if the record type is known (strip encrypted flag for lookup).
105            let logical_type = record.logical_record_type();
106            if RecordType::from_raw(logical_type).is_none() {
107                if RecordType::is_required(logical_type) {
108                    return Err(WalError::UnknownRequiredRecordType {
109                        record_type: header.record_type,
110                        lsn: header.lsn,
111                    });
112                }
113                // Unknown optional record — skip and continue loop.
114                continue;
115            }
116
117            return Ok(Some(record));
118        }
119    }
120
121    /// Iterator over all valid records in the WAL.
122    pub fn records(self) -> WalRecordIter {
123        WalRecordIter { reader: self }
124    }
125
126    /// Current read offset in the file.
127    pub fn offset(&self) -> u64 {
128        self.offset
129    }
130
131    fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
132        self.file.read_exact(buf)?;
133        self.offset += buf.len() as u64;
134        Ok(())
135    }
136}
137
138/// Iterator over WAL records.
139pub struct WalRecordIter {
140    reader: WalReader,
141}
142
143impl Iterator for WalRecordIter {
144    type Item = Result<WalRecord>;
145
146    fn next(&mut self) -> Option<Self::Item> {
147        match self.reader.next_record() {
148            Ok(Some(record)) => Some(Ok(record)),
149            Ok(None) => None,
150            Err(e) => Some(Err(e)),
151        }
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    #[test]
    fn write_then_read_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        // Write records.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer
                .append(RecordType::Put as u16, 1, 0, b"first")
                .unwrap();
            writer
                .append(RecordType::Put as u16, 2, 1, b"second")
                .unwrap();
            writer
                .append(RecordType::Delete as u16, 1, 0, b"third")
                .unwrap();
            writer.sync().unwrap();
        }

        // Read them back.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].header.lsn, 1);
        assert_eq!(records[0].header.tenant_id, 1);
        assert_eq!(records[0].payload, b"first");

        assert_eq!(records[1].header.lsn, 2);
        assert_eq!(records[1].header.tenant_id, 2);
        assert_eq!(records[1].header.vshard_id, 1);
        assert_eq!(records[1].payload, b"second");

        assert_eq!(records[2].header.lsn, 3);
        assert_eq!(records[2].header.record_type, RecordType::Delete as u16);
        assert_eq!(records[2].payload, b"third");
    }

    #[test]
    fn empty_wal_yields_no_records() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");

        // Create a WAL file without appending any records.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer.sync().unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn truncated_file_stops_at_committed_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");

        // Write records.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer
                .append(RecordType::Put as u16, 1, 0, b"good-record")
                .unwrap();
            writer.sync().unwrap();
        }

        // Append garbage (simulating a torn write).
        {
            use std::io::Write;
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            file.write_all(b"GARBAGE_PARTIAL_RECORD").unwrap();
        }

        // Reader should return only the valid record.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"good-record");
    }

    #[test]
    fn skip_many_unknown_optional_records_is_iterative() {
        // Record type 99 has bit 15 clear (99 & 0x8000 == 0) and is not a
        // known variant, so the reader must skip it as an unknown optional.
        // 50 000 consecutive unknown optional records guard against a
        // regression to a recursive skip (`return self.next_record()`),
        // which would exhaust the stack. The loop-based skip must discard
        // all of them and return the one valid record at the end.
        const UNKNOWN_OPTIONAL: u16 = 99; // no 0x8000 bit → optional, not in enum
        const SKIP_COUNT: usize = 50_000;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("many_unknown.wal");

        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            for _ in 0..SKIP_COUNT {
                writer.append(UNKNOWN_OPTIONAL, 1, 0, b"skip-me").unwrap();
            }
            writer
                .append(RecordType::Put as u16, 1, 0, b"keep-me")
                .unwrap();
            writer.sync().unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();

        // Only the single known Put record survives; all unknown optional
        // records are silently discarded.
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"keep-me");
    }
}