Skip to main content

nodedb_wal/
reader.rs

//! WAL reader for crash recovery and replay.
//!
//! Reads records sequentially from a WAL file, validating checksums and
//! magic numbers. Stops at the first corruption point — everything before
//! that point is the committed prefix. If a companion double-write buffer
//! (`*.dwb`) is present, a record that fails its checksum is replaced with
//! the buffered copy instead of ending replay.
//!
//! ## Replay invariants
//!
//! - Replay is **deterministic**: the same WAL file always produces the
//!   same sequence of records.
//! - Replay is **idempotent**: replaying the same record twice has the
//!   same effect as replaying it once.
//! - Unknown optional record types (bit 15 clear) are skipped.
//! - Unknown required record types (bit 15 set) cause a replay failure.

16use std::fs::File;
17use std::io::Read;
18use std::path::Path;
19
20use crate::error::{Result, WalError};
21use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WalRecord};
22
23/// Sequential WAL reader.
24pub struct WalReader {
25    file: File,
26    offset: u64,
27    /// Optional double-write buffer for torn write recovery.
28    double_write: Option<crate::double_write::DoubleWriteBuffer>,
29}
30
31impl WalReader {
32    /// Open a WAL file for reading.
33    ///
34    /// Automatically opens the companion double-write buffer file
35    /// (`*.dwb`) if it exists alongside the WAL file.
36    pub fn open(path: &Path) -> Result<Self> {
37        let file = File::open(path)?;
38        let dwb_path = path.with_extension("dwb");
39        let double_write = if dwb_path.exists() {
40            crate::double_write::DoubleWriteBuffer::open(&dwb_path).ok()
41        } else {
42            None
43        };
44        Ok(Self {
45            file,
46            offset: 0,
47            double_write,
48        })
49    }
50
51    /// Read the next record from the WAL.
52    ///
53    /// Returns `None` at EOF (clean end) or at the first corruption point.
54    /// Returns `Err` only for I/O errors or unknown required record types.
55    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
56        // Read header.
57        let mut header_buf = [0u8; HEADER_SIZE];
58        match self.read_exact(&mut header_buf) {
59            Ok(()) => {}
60            Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
61                return Ok(None); // Clean EOF.
62            }
63            Err(e) => return Err(e),
64        }
65
66        let header = RecordHeader::from_bytes(&header_buf);
67
68        // Validate magic and version.
69        if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
70            // Corruption or end of valid data — treat as end of committed prefix.
71            return Ok(None);
72        }
73
74        // Read payload.
75        let mut payload = vec![0u8; header.payload_len as usize];
76        if !payload.is_empty() {
77            match self.read_exact(&mut payload) {
78                Ok(()) => {}
79                Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
80                    // Torn write — record is incomplete. This is the end of committed prefix.
81                    return Ok(None);
82                }
83                Err(e) => return Err(e),
84            }
85        }
86
87        let record = WalRecord { header, payload };
88
89        // Verify checksum.
90        if record.verify_checksum().is_err() {
91            // Checksum mismatch — torn write or corruption.
92            // Try to recover from double-write buffer if available.
93            if let Some(dwb) = &mut self.double_write
94                && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
95            {
96                tracing::info!(
97                    lsn = header.lsn,
98                    "recovered torn write from double-write buffer"
99                );
100                self.offset += recovered.payload.len() as u64;
101                return Ok(Some(recovered));
102            }
103            // No DWB recovery possible — end of committed prefix.
104            return Ok(None);
105        }
106
107        // Check if the record type is known (strip encrypted flag for lookup).
108        let logical_type = record.logical_record_type();
109        if RecordType::from_raw(logical_type).is_none() {
110            if RecordType::is_required(logical_type) {
111                return Err(WalError::UnknownRequiredRecordType {
112                    record_type: header.record_type,
113                    lsn: header.lsn,
114                });
115            }
116            // Unknown optional record — skip it and continue.
117            // (The record is already consumed, so just recurse.)
118            return self.next_record();
119        }
120
121        Ok(Some(record))
122    }
123
124    /// Iterator over all valid records in the WAL.
125    pub fn records(self) -> WalRecordIter {
126        WalRecordIter { reader: self }
127    }
128
129    /// Current read offset in the file.
130    pub fn offset(&self) -> u64 {
131        self.offset
132    }
133
134    fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
135        self.file.read_exact(buf)?;
136        self.offset += buf.len() as u64;
137        Ok(())
138    }
139}
140
141/// Iterator over WAL records.
142pub struct WalRecordIter {
143    reader: WalReader,
144}
145
146impl Iterator for WalRecordIter {
147    type Item = Result<WalRecord>;
148
149    fn next(&mut self) -> Option<Self::Item> {
150        match self.reader.next_record() {
151            Ok(Some(record)) => Some(Ok(record)),
152            Ok(None) => None,
153            Err(e) => Some(Err(e)),
154        }
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    #[test]
    fn write_then_read_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        // Write records.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer
                .append(RecordType::Put as u16, 1, 0, b"first")
                .unwrap();
            writer
                .append(RecordType::Put as u16, 2, 1, b"second")
                .unwrap();
            writer
                .append(RecordType::Delete as u16, 1, 0, b"third")
                .unwrap();
            writer.sync().unwrap();
        }

        // Read them back.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].header.lsn, 1);
        assert_eq!(records[0].header.tenant_id, 1);
        assert_eq!(records[0].payload, b"first");

        assert_eq!(records[1].header.lsn, 2);
        assert_eq!(records[1].header.tenant_id, 2);
        assert_eq!(records[1].header.vshard_id, 1);
        assert_eq!(records[1].payload, b"second");

        assert_eq!(records[2].header.lsn, 3);
        assert_eq!(records[2].header.record_type, RecordType::Delete as u16);
        assert_eq!(records[2].payload, b"third");
    }

    #[test]
    fn empty_wal_yields_no_records() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");

        // Create an empty file.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer.sync().unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn truncated_file_stops_at_committed_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");

        // Write records.
        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer
                .append(RecordType::Put as u16, 1, 0, b"good-record")
                .unwrap();
            writer.sync().unwrap();
        }

        // Append garbage (simulating a torn write).
        {
            use std::io::Write;
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            file.write_all(b"GARBAGE_PARTIAL_RECORD").unwrap();
        }

        // Reader should return only the valid record.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"good-record");
    }

    #[test]
    fn torn_payload_stops_at_committed_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("torn.wal");

        {
            let mut writer = WalWriter::open_without_direct_io(&path).unwrap();
            writer
                .append(RecordType::Put as u16, 1, 0, b"kept")
                .unwrap();
            writer
                .append(RecordType::Put as u16, 1, 0, b"torn-away")
                .unwrap();
            writer.sync().unwrap();
        }

        // Locate the end of the second record with the reader itself, so the
        // test does not depend on the on-disk header size.
        let second_end = {
            let mut reader = WalReader::open(&path).unwrap();
            assert!(reader.next_record().unwrap().is_some());
            assert!(reader.next_record().unwrap().is_some());
            reader.offset()
        };

        // Chop the last payload byte off the second record (a torn write).
        {
            let file = std::fs::OpenOptions::new()
                .write(true)
                .open(&path)
                .unwrap();
            file.set_len(second_end - 1).unwrap();
        }

        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader.records().collect::<Result<_>>().unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"kept");
    }
}