Skip to main content

nodedb_wal/
mmap_reader.rs

1//! Memory-mapped WAL segment reader for Event Plane catchup.
2//!
3//! Unlike the standard `WalReader` (which uses sequential `read_exact`),
4//! this reader maps sealed WAL segments into the process address space via
5//! `mmap`. The kernel manages the page cache — no slab allocator memory is
6//! pinned, and mmap reads from page cache don't contend with the Data Plane's
7//! O_DIRECT WAL append path (O_DIRECT bypasses page cache entirely).
8//!
9//! **Tier progression:**
10//! 1. In-memory Arc slabs (hot, zero-copy from ring buffer)
11//! 2. Mmap WAL segment reads (warm, kernel-managed pages)
12//! 3. Shed consumer + cold WAL replay (last resort)
13//!
14//! This reader is used in tier 2: when the Event Plane enters WAL Catchup
15//! Mode, it mmap's the relevant sealed segments and iterates records.
16
17use std::path::Path;
18
19use memmap2::Mmap;
20
21use crate::error::{Result, WalError};
22use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WAL_MAGIC, WalRecord};
23
24/// Memory-mapped WAL segment reader.
25///
26/// Opens a sealed WAL segment file via mmap and provides zero-copy
27/// iteration over records. The mmap'd region is read-only and the
28/// kernel manages page residency — no application-level memory pinning.
29pub struct MmapWalReader {
30    mmap: Mmap,
31    offset: usize,
32}
33
34impl MmapWalReader {
35    /// Open a WAL segment file for mmap'd reading.
36    pub fn open(path: &Path) -> Result<Self> {
37        let file = std::fs::File::open(path)?;
38        // SAFETY: The file is a sealed WAL segment (not being written to).
39        // The Data Plane writes to the ACTIVE segment via O_DIRECT; sealed
40        // segments are immutable after rollover.
41        let mmap = unsafe { Mmap::map(&file)? };
42        Ok(Self { mmap, offset: 0 })
43    }
44
45    /// Read the next record from the mmap'd region.
46    ///
47    /// Returns `None` at EOF or at the first corruption point.
48    /// Zero-copy: payload bytes reference the mmap'd region directly.
49    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
50        let data = &self.mmap[..];
51
52        loop {
53            // Check if we have enough bytes for a header.
54            if self.offset + HEADER_SIZE > data.len() {
55                return Ok(None);
56            }
57
58            // Parse header.
59            let header_bytes: &[u8; HEADER_SIZE] = data[self.offset..self.offset + HEADER_SIZE]
60                .try_into()
61                .map_err(|_| {
62                    WalError::Io(std::io::Error::new(
63                        std::io::ErrorKind::InvalidData,
64                        "header slice conversion failed",
65                    ))
66                })?;
67            let header = RecordHeader::from_bytes(header_bytes);
68
69            // Validate magic — corruption or end of valid data.
70            if header.magic != WAL_MAGIC {
71                return Ok(None);
72            }
73
74            // Validate version.
75            if header.validate(self.offset as u64).is_err() {
76                return Ok(None);
77            }
78
79            let payload_len = header.payload_len as usize;
80            let record_end = self.offset + HEADER_SIZE + payload_len;
81
82            // Check if payload is fully within the mmap'd region.
83            if record_end > data.len() {
84                return Ok(None); // Torn write at segment end.
85            }
86
87            // Extract payload (copies from mmap to owned Vec).
88            let payload = data[self.offset + HEADER_SIZE..record_end].to_vec();
89            self.offset = record_end;
90
91            let record = WalRecord { header, payload };
92
93            // Verify checksum.
94            if record.verify_checksum().is_err() {
95                return Ok(None); // Corruption — end of committed prefix.
96            }
97
98            // Check record type.
99            let logical_type = record.logical_record_type();
100            if RecordType::from_raw(logical_type).is_none() {
101                if RecordType::is_required(logical_type) {
102                    return Err(WalError::UnknownRequiredRecordType {
103                        record_type: header.record_type,
104                        lsn: header.lsn,
105                    });
106                }
107                // Unknown optional record — skip and continue loop.
108                continue;
109            }
110
111            return Ok(Some(record));
112        }
113    }
114
115    /// Iterator over all valid records in the mmap'd segment.
116    pub fn records(self) -> MmapRecordIter {
117        MmapRecordIter { reader: self }
118    }
119
120    /// Current read offset.
121    pub fn offset(&self) -> usize {
122        self.offset
123    }
124
125    /// Total size of the mmap'd region.
126    pub fn len(&self) -> usize {
127        self.mmap.len()
128    }
129
130    /// Whether the mmap'd region is empty.
131    pub fn is_empty(&self) -> bool {
132        self.mmap.is_empty()
133    }
134}
135
136/// Iterator over records in a mmap'd WAL segment.
137pub struct MmapRecordIter {
138    reader: MmapWalReader,
139}
140
141impl Iterator for MmapRecordIter {
142    type Item = Result<WalRecord>;
143
144    fn next(&mut self) -> Option<Self::Item> {
145        match self.reader.next_record() {
146            Ok(Some(record)) => Some(Ok(record)),
147            Ok(None) => None,
148            Err(e) => Some(Err(e)),
149        }
150    }
151}
152
153/// Minimum number of segments to justify parallel replay overhead.
154const PARALLEL_SEGMENT_THRESHOLD: usize = 4;
155
156/// Replay WAL segments from a directory using mmap, starting from `from_lsn`.
157///
158/// Discovers all sealed segments, mmap's each, and returns records with
159/// LSN >= `from_lsn`. This is the Event Plane's tier-2 catchup path.
160///
161/// When 4+ segments need scanning, uses `std::thread::scope` to read
162/// segments in parallel (one thread per segment). Each thread mmap's its
163/// segment and filters records independently; results are merged in
164/// segment order (already LSN-sorted since segments are monotonic).
165pub fn replay_segments_mmap(wal_dir: &Path, from_lsn: u64) -> Result<Vec<WalRecord>> {
166    let segments = crate::segment::discover_segments(wal_dir)?;
167
168    if segments.len() < PARALLEL_SEGMENT_THRESHOLD {
169        return replay_segments_sequential(&segments, from_lsn);
170    }
171
172    replay_segments_parallel(&segments, from_lsn)
173}
174
175/// Sequential segment replay (used for small segment counts).
176fn replay_segments_sequential(
177    segments: &[crate::segment::SegmentMeta],
178    from_lsn: u64,
179) -> Result<Vec<WalRecord>> {
180    let mut records = Vec::new();
181    for seg in segments {
182        let reader = MmapWalReader::open(&seg.path)?;
183        for record_result in reader.records() {
184            let record = record_result?;
185            if record.header.lsn >= from_lsn {
186                records.push(record);
187            }
188        }
189    }
190    Ok(records)
191}
192
193/// Parallel segment replay using scoped threads.
194///
195/// Each segment is read in its own thread via mmap. Since segments are
196/// monotonically ordered by LSN, concatenating per-segment results in
197/// segment order produces a globally LSN-ordered result.
198fn replay_segments_parallel(
199    segments: &[crate::segment::SegmentMeta],
200    from_lsn: u64,
201) -> Result<Vec<WalRecord>> {
202    // Collect per-segment results. Index corresponds to segment order.
203    let mut per_segment: Vec<Result<Vec<WalRecord>>> = Vec::with_capacity(segments.len());
204
205    std::thread::scope(|scope| {
206        let handles: Vec<_> = segments
207            .iter()
208            .map(|seg| {
209                scope.spawn(move || -> Result<Vec<WalRecord>> {
210                    let reader = MmapWalReader::open(&seg.path)?;
211                    let mut seg_records = Vec::new();
212                    for record_result in reader.records() {
213                        let record = record_result?;
214                        if record.header.lsn >= from_lsn {
215                            seg_records.push(record);
216                        }
217                    }
218                    Ok(seg_records)
219                })
220            })
221            .collect();
222
223        for handle in handles {
224            per_segment.push(handle.join().unwrap_or_else(|_| {
225                Err(WalError::Io(std::io::Error::other(
226                    "segment replay thread panicked",
227                )))
228            }));
229        }
230    });
231
232    // Merge in segment order (preserves LSN ordering).
233    let total_estimate: usize = per_segment
234        .iter()
235        .map(|r| r.as_ref().map(|v| v.len()).unwrap_or(0))
236        .sum();
237    let mut records = Vec::with_capacity(total_estimate);
238    for seg_result in per_segment {
239        records.extend(seg_result?);
240    }
241
242    Ok(records)
243}
244
245/// Paginated mmap replay: reads at most `max_records` from `from_lsn`.
246///
247/// Returns `(records, has_more)` where `has_more` is `true` if the limit
248/// was reached before all segments were exhausted. This bounds memory
249/// usage per catch-up cycle to O(max_records) instead of O(all WAL data).
250///
251/// Always uses sequential reading (no parallel threads) since the bounded
252/// record count makes parallel overhead unnecessary.
253pub fn replay_segments_mmap_limit(
254    wal_dir: &Path,
255    from_lsn: u64,
256    max_records: usize,
257) -> Result<(Vec<WalRecord>, bool)> {
258    let segments = crate::segment::discover_segments(wal_dir)?;
259    let mut records = Vec::with_capacity(max_records.min(4096));
260
261    for seg in &segments {
262        // Skip segments that end before from_lsn. A segment's max LSN
263        // is at least its first_lsn, so if first_lsn of the NEXT segment
264        // is <= from_lsn we can skip this one. Conservative: always read
265        // the last segment since we don't know its max LSN cheaply.
266        let reader = MmapWalReader::open(&seg.path)?;
267        for record_result in reader.records() {
268            let record = record_result?;
269            if record.header.lsn >= from_lsn {
270                records.push(record);
271                if records.len() >= max_records {
272                    return Ok((records, true));
273                }
274            }
275        }
276    }
277
278    Ok((records, false))
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::{WalWriter, WalWriterConfig};

    /// Open a `WalWriter` at `path` with direct I/O turned off.
    fn test_writer(path: &Path) -> WalWriter {
        let config = WalWriterConfig {
            use_direct_io: false, // Tests run without O_DIRECT.
            ..Default::default()
        };
        WalWriter::open(path, config).unwrap()
    }

    /// Mmap the file at `path` and collect every record it yields.
    fn read_all(path: &Path) -> Vec<WalRecord> {
        MmapWalReader::open(path)
            .unwrap()
            .records()
            .collect::<Result<Vec<_>>>()
            .unwrap()
    }

    /// Records appended by the standard writer come back in order.
    #[test]
    fn mmap_reader_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let segment = tmp.path().join("test.wal");

        // Write with the standard writer; the writer is dropped at the end
        // of this scope, before the file is mapped.
        {
            let mut writer = test_writer(&segment);
            for payload in [b"hello", b"world"] {
                writer
                    .append(RecordType::Put as u16, 1, 0, payload)
                    .unwrap();
            }
            writer.sync().unwrap();
        }

        // Read back through the mmap reader.
        let got = read_all(&segment);

        assert_eq!(got.len(), 2);
        assert_eq!(got[0].payload, b"hello");
        assert_eq!(got[1].payload, b"world");
    }

    /// A zero-length file yields no records and no error.
    #[test]
    fn mmap_reader_empty_file() {
        let tmp = tempfile::tempdir().unwrap();
        let segment = tmp.path().join("empty.wal");
        std::fs::write(&segment, b"").unwrap();

        assert!(read_all(&segment).is_empty());
    }

    /// A file shorter than one record header yields no records and no error.
    #[test]
    fn mmap_reader_truncated_header() {
        let tmp = tempfile::tempdir().unwrap();
        let segment = tmp.path().join("truncated.wal");
        // 10 zero bytes — meant to be too short to hold a record header.
        std::fs::write(&segment, [0u8; 10]).unwrap();

        assert!(read_all(&segment).is_empty());
    }

    /// Replay honours the `from_lsn` lower bound.
    #[test]
    fn replay_mmap_from_lsn() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_dir = tmp.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();

        let config = crate::segmented::SegmentedWalConfig::for_testing(wal_dir.clone());
        let mut wal = crate::segmented::SegmentedWal::open(config).unwrap();

        let first = wal.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
        let second = wal.append(RecordType::Put as u16, 1, 0, b"b").unwrap();
        let third = wal.append(RecordType::Put as u16, 1, 0, b"c").unwrap();
        wal.sync().unwrap();

        // Starting at the second LSN returns records b and c.
        let tail = replay_segments_mmap(&wal_dir, second).unwrap();
        assert_eq!(tail.len(), 2);
        assert_eq!(tail[0].header.lsn, second);
        assert_eq!(tail[1].header.lsn, third);

        // Starting at the first LSN returns all three.
        let everything = replay_segments_mmap(&wal_dir, first).unwrap();
        assert_eq!(everything.len(), 3);
    }
}