Skip to main content

nodedb_wal/
reader.rs

//! WAL reader for crash recovery and replay.
//!
//! Reads records sequentially from a WAL file, validating magic numbers,
//! format versions and checksums. Stops at the first corruption point —
//! everything before that point is the committed prefix. If a companion
//! double-write buffer (`*.dwb`) exists, a record whose checksum fails is
//! first looked up there before the reader gives up.
//!
//! ## Replay invariants
//!
//! - Replay is **deterministic**: the same WAL file always produces the
//!   same sequence of records.
//! - Replay is **idempotent**: replaying the same record twice has the
//!   same effect as replaying it once.
//! - Unknown optional record types (bit 15 clear) are skipped.
//! - Unknown required record types (bit 15 set) cause a replay failure.

use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::path::Path;

use crate::error::{Result, WalError};
use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WalRecord};

23/// Sequential WAL reader.
24pub struct WalReader {
25    file: File,
26    offset: u64,
27    /// Optional double-write buffer for torn write recovery.
28    double_write: Option<crate::double_write::DoubleWriteBuffer>,
29}
30
31impl WalReader {
32    /// Open a WAL file for reading.
33    ///
34    /// Automatically opens the companion double-write buffer file
35    /// (`*.dwb`) if it exists alongside the WAL file.
36    pub fn open(path: &Path) -> Result<Self> {
37        let file = File::open(path)?;
38        let dwb_path = path.with_extension("dwb");
39        let double_write = if dwb_path.exists() {
40            crate::double_write::DoubleWriteBuffer::open(
41                &dwb_path,
42                crate::double_write::DwbMode::Buffered,
43            )
44            .ok()
45        } else {
46            None
47        };
48        Ok(Self {
49            file,
50            offset: 0,
51            double_write,
52        })
53    }
54
55    /// Read the next record from the WAL.
56    ///
57    /// Returns `None` at EOF (clean end) or at the first corruption point.
58    /// Returns `Err` only for I/O errors or unknown required record types.
59    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
60        loop {
61            // Read header.
62            let mut header_buf = [0u8; HEADER_SIZE];
63            match self.read_exact(&mut header_buf) {
64                Ok(()) => {}
65                Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
66                    return Ok(None); // Clean EOF.
67                }
68                Err(e) => return Err(e),
69            }
70
71            let header = RecordHeader::from_bytes(&header_buf);
72
73            // Validate magic and version.
74            if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
75                // Corruption or end of valid data — treat as end of committed prefix.
76                return Ok(None);
77            }
78
79            // Read payload.
80            let mut payload = vec![0u8; header.payload_len as usize];
81            if !payload.is_empty() {
82                match self.read_exact(&mut payload) {
83                    Ok(()) => {}
84                    Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
85                        return Ok(None);
86                    }
87                    Err(e) => return Err(e),
88                }
89            }
90
91            let record = WalRecord { header, payload };
92
93            // Verify checksum.
94            if record.verify_checksum().is_err() {
95                if let Some(dwb) = &mut self.double_write
96                    && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
97                {
98                    tracing::info!(
99                        lsn = header.lsn,
100                        "recovered torn write from double-write buffer"
101                    );
102                    self.offset += recovered.payload.len() as u64;
103                    return Ok(Some(recovered));
104                }
105                return Ok(None);
106            }
107
108            // Check if the record type is known (strip encrypted flag for lookup).
109            let logical_type = record.logical_record_type();
110            if RecordType::from_raw(logical_type).is_none() {
111                if RecordType::is_required(logical_type) {
112                    return Err(WalError::UnknownRequiredRecordType {
113                        record_type: header.record_type,
114                        lsn: header.lsn,
115                    });
116                }
117                // Unknown optional record — skip and continue loop.
118                continue;
119            }
120
121            return Ok(Some(record));
122        }
123    }
124
125    /// Iterator over all valid records in the WAL.
126    pub fn records(self) -> WalRecordIter {
127        WalRecordIter { reader: self }
128    }
129
130    /// Current read offset in the file.
131    pub fn offset(&self) -> u64 {
132        self.offset
133    }
134
135    fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
136        self.file.read_exact(buf)?;
137        self.offset += buf.len() as u64;
138        Ok(())
139    }
140}
141
142/// Iterator over WAL records.
143pub struct WalRecordIter {
144    reader: WalReader,
145}
146
147impl Iterator for WalRecordIter {
148    type Item = Result<WalRecord>;
149
150    fn next(&mut self) -> Option<Self::Item> {
151        match self.reader.next_record() {
152            Ok(Some(record)) => Some(Ok(record)),
153            Ok(None) => None,
154            Err(e) => Some(Err(e)),
155        }
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    #[test]
    fn write_then_read_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        // Write records.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer
                .append(RecordType::Put as u16, 1, 0, b"first")
                .unwrap();
            writer
                .append(RecordType::Put as u16, 2, 1, b"second")
                .unwrap();
            writer
                .append(RecordType::Delete as u16, 1, 0, b"third")
                .unwrap();
            writer.sync().unwrap();
        }

        // Read them back.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].header.lsn, 1);
        assert_eq!(records[0].header.record_type, RecordType::Put as u16);
        assert_eq!(records[0].header.tenant_id, 1);
        assert_eq!(records[0].header.vshard_id, 0);
        assert_eq!(records[0].payload, b"first");

        assert_eq!(records[1].header.lsn, 2);
        assert_eq!(records[1].header.record_type, RecordType::Put as u16);
        assert_eq!(records[1].header.tenant_id, 2);
        assert_eq!(records[1].header.vshard_id, 1);
        assert_eq!(records[1].payload, b"second");

        assert_eq!(records[2].header.lsn, 3);
        assert_eq!(records[2].header.record_type, RecordType::Delete as u16);
        assert_eq!(records[2].header.tenant_id, 1);
        assert_eq!(records[2].payload, b"third");
    }

    #[test]
    fn empty_wal_yields_no_records() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");

        // Create a WAL file without appending any records.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer.sync().unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn truncated_file_stops_at_committed_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");

        // Write records.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer
                .append(RecordType::Put as u16, 1, 0, b"good-record")
                .unwrap();
            writer.sync().unwrap();
        }

        // Append garbage (simulating a torn write).
        {
            use std::io::Write;
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            file.write_all(b"GARBAGE_PARTIAL_RECORD").unwrap();
        }

        // Reader should return only the valid record.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"good-record");
    }

    #[test]
    fn skip_many_unknown_optional_records_is_iterative() {
        // Record type 99 has bit 15 clear (99 & 0x8000 == 0) and is not a
        // known variant, so the reader must skip it as an unknown optional.
        // Skipping must be iterative: a recursive `next_record` call per
        // skipped record would overflow the stack on 50 000 consecutive
        // unknown records. All of them must be skipped without overflow and
        // only the valid record at the end returned.
        const UNKNOWN_OPTIONAL: u16 = 99; // no 0x8000 bit → optional, not in enum
        const SKIP_COUNT: usize = 50_000;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("many_unknown.wal");

        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            for _ in 0..SKIP_COUNT {
                writer.append(UNKNOWN_OPTIONAL, 1, 0, b"skip-me").unwrap();
            }
            writer
                .append(RecordType::Put as u16, 1, 0, b"keep-me")
                .unwrap();
            writer.sync().unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();

        // Only the single known Put record survives; all unknown optional
        // records are silently discarded.
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].header.record_type, RecordType::Put as u16);
        assert_eq!(records[0].payload, b"keep-me");
    }
}