Skip to main content

nodedb_wal/
mmap_reader.rs

1//! Memory-mapped WAL segment reader for Event Plane catchup.
2//!
3//! Unlike the standard `WalReader` (which uses sequential `read_exact`),
4//! this reader maps sealed WAL segments into the process address space via
5//! `mmap`. The kernel manages the page cache — no slab allocator memory is
6//! pinned, and mmap reads from page cache don't contend with the Data Plane's
7//! O_DIRECT WAL append path (O_DIRECT bypasses page cache entirely).
8//!
9//! **Tier progression:**
10//! 1. In-memory Arc slabs (hot, zero-copy from ring buffer)
11//! 2. Mmap WAL segment reads (warm, kernel-managed pages)
12//! 3. Shed consumer + cold WAL replay (last resort)
13//!
14//! This reader is used in tier 2: when the Event Plane enters WAL Catchup
15//! Mode, it mmap's the relevant sealed segments and iterates records.
16
17use std::path::Path;
18
19use memmap2::Mmap;
20
21use crate::error::{Result, WalError};
22use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WAL_MAGIC, WalRecord};
23
24/// Memory-mapped WAL segment reader.
25///
26/// Opens a sealed WAL segment file via mmap and provides zero-copy
27/// iteration over records. The mmap'd region is read-only and the
28/// kernel manages page residency — no application-level memory pinning.
29pub struct MmapWalReader {
30    mmap: Mmap,
31    offset: usize,
32}
33
34impl MmapWalReader {
35    /// Open a WAL segment file for mmap'd reading.
36    pub fn open(path: &Path) -> Result<Self> {
37        let file = std::fs::File::open(path)?;
38        // SAFETY: The file is a sealed WAL segment (not being written to).
39        // The Data Plane writes to the ACTIVE segment via O_DIRECT; sealed
40        // segments are immutable after rollover.
41        let mmap = unsafe { Mmap::map(&file)? };
42        Ok(Self { mmap, offset: 0 })
43    }
44
45    /// Read the next record from the mmap'd region.
46    ///
47    /// Returns `None` at EOF or at the first corruption point.
48    /// Zero-copy: payload bytes reference the mmap'd region directly.
49    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
50        let data = &self.mmap[..];
51
52        // Check if we have enough bytes for a header.
53        if self.offset + HEADER_SIZE > data.len() {
54            return Ok(None);
55        }
56
57        // Parse header.
58        let header_bytes: &[u8; HEADER_SIZE] = data[self.offset..self.offset + HEADER_SIZE]
59            .try_into()
60            .map_err(|_| {
61                WalError::Io(std::io::Error::new(
62                    std::io::ErrorKind::InvalidData,
63                    "header slice conversion failed",
64                ))
65            })?;
66        let header = RecordHeader::from_bytes(header_bytes);
67
68        // Validate magic — corruption or end of valid data.
69        if header.magic != WAL_MAGIC {
70            return Ok(None);
71        }
72
73        // Validate version.
74        if header.validate(self.offset as u64).is_err() {
75            return Ok(None);
76        }
77
78        let payload_len = header.payload_len as usize;
79        let record_end = self.offset + HEADER_SIZE + payload_len;
80
81        // Check if payload is fully within the mmap'd region.
82        if record_end > data.len() {
83            return Ok(None); // Torn write at segment end.
84        }
85
86        // Extract payload (copies from mmap to owned Vec).
87        let payload = data[self.offset + HEADER_SIZE..record_end].to_vec();
88        self.offset = record_end;
89
90        let record = WalRecord { header, payload };
91
92        // Verify checksum.
93        if record.verify_checksum().is_err() {
94            return Ok(None); // Corruption — end of committed prefix.
95        }
96
97        // Check record type.
98        let logical_type = record.logical_record_type();
99        if RecordType::from_raw(logical_type).is_none() {
100            if RecordType::is_required(logical_type) {
101                return Err(WalError::UnknownRequiredRecordType {
102                    record_type: header.record_type,
103                    lsn: header.lsn,
104                });
105            }
106            // Unknown optional record — skip and continue.
107            return self.next_record();
108        }
109
110        Ok(Some(record))
111    }
112
113    /// Iterator over all valid records in the mmap'd segment.
114    pub fn records(self) -> MmapRecordIter {
115        MmapRecordIter { reader: self }
116    }
117
118    /// Current read offset.
119    pub fn offset(&self) -> usize {
120        self.offset
121    }
122
123    /// Total size of the mmap'd region.
124    pub fn len(&self) -> usize {
125        self.mmap.len()
126    }
127
128    /// Whether the mmap'd region is empty.
129    pub fn is_empty(&self) -> bool {
130        self.mmap.is_empty()
131    }
132}
133
134/// Iterator over records in a mmap'd WAL segment.
135pub struct MmapRecordIter {
136    reader: MmapWalReader,
137}
138
139impl Iterator for MmapRecordIter {
140    type Item = Result<WalRecord>;
141
142    fn next(&mut self) -> Option<Self::Item> {
143        match self.reader.next_record() {
144            Ok(Some(record)) => Some(Ok(record)),
145            Ok(None) => None,
146            Err(e) => Some(Err(e)),
147        }
148    }
149}
150
151/// Minimum number of segments to justify parallel replay overhead.
152const PARALLEL_SEGMENT_THRESHOLD: usize = 4;
153
154/// Replay WAL segments from a directory using mmap, starting from `from_lsn`.
155///
156/// Discovers all sealed segments, mmap's each, and returns records with
157/// LSN >= `from_lsn`. This is the Event Plane's tier-2 catchup path.
158///
159/// When 4+ segments need scanning, uses `std::thread::scope` to read
160/// segments in parallel (one thread per segment). Each thread mmap's its
161/// segment and filters records independently; results are merged in
162/// segment order (already LSN-sorted since segments are monotonic).
163pub fn replay_segments_mmap(wal_dir: &Path, from_lsn: u64) -> Result<Vec<WalRecord>> {
164    let segments = crate::segment::discover_segments(wal_dir)?;
165
166    if segments.len() < PARALLEL_SEGMENT_THRESHOLD {
167        return replay_segments_sequential(&segments, from_lsn);
168    }
169
170    replay_segments_parallel(&segments, from_lsn)
171}
172
173/// Sequential segment replay (used for small segment counts).
174fn replay_segments_sequential(
175    segments: &[crate::segment::SegmentMeta],
176    from_lsn: u64,
177) -> Result<Vec<WalRecord>> {
178    let mut records = Vec::new();
179    for seg in segments {
180        let reader = MmapWalReader::open(&seg.path)?;
181        for record_result in reader.records() {
182            let record = record_result?;
183            if record.header.lsn >= from_lsn {
184                records.push(record);
185            }
186        }
187    }
188    Ok(records)
189}
190
191/// Parallel segment replay using scoped threads.
192///
193/// Each segment is read in its own thread via mmap. Since segments are
194/// monotonically ordered by LSN, concatenating per-segment results in
195/// segment order produces a globally LSN-ordered result.
196fn replay_segments_parallel(
197    segments: &[crate::segment::SegmentMeta],
198    from_lsn: u64,
199) -> Result<Vec<WalRecord>> {
200    // Collect per-segment results. Index corresponds to segment order.
201    let mut per_segment: Vec<Result<Vec<WalRecord>>> = Vec::with_capacity(segments.len());
202
203    std::thread::scope(|scope| {
204        let handles: Vec<_> = segments
205            .iter()
206            .map(|seg| {
207                scope.spawn(move || -> Result<Vec<WalRecord>> {
208                    let reader = MmapWalReader::open(&seg.path)?;
209                    let mut seg_records = Vec::new();
210                    for record_result in reader.records() {
211                        let record = record_result?;
212                        if record.header.lsn >= from_lsn {
213                            seg_records.push(record);
214                        }
215                    }
216                    Ok(seg_records)
217                })
218            })
219            .collect();
220
221        for handle in handles {
222            per_segment.push(handle.join().unwrap_or_else(|_| {
223                Err(WalError::Io(std::io::Error::other(
224                    "segment replay thread panicked",
225                )))
226            }));
227        }
228    });
229
230    // Merge in segment order (preserves LSN ordering).
231    let total_estimate: usize = per_segment
232        .iter()
233        .map(|r| r.as_ref().map(|v| v.len()).unwrap_or(0))
234        .sum();
235    let mut records = Vec::with_capacity(total_estimate);
236    for seg_result in per_segment {
237        records.extend(seg_result?);
238    }
239
240    Ok(records)
241}
242
243/// Paginated mmap replay: reads at most `max_records` from `from_lsn`.
244///
245/// Returns `(records, has_more)` where `has_more` is `true` if the limit
246/// was reached before all segments were exhausted. This bounds memory
247/// usage per catch-up cycle to O(max_records) instead of O(all WAL data).
248///
249/// Always uses sequential reading (no parallel threads) since the bounded
250/// record count makes parallel overhead unnecessary.
251pub fn replay_segments_mmap_limit(
252    wal_dir: &Path,
253    from_lsn: u64,
254    max_records: usize,
255) -> Result<(Vec<WalRecord>, bool)> {
256    let segments = crate::segment::discover_segments(wal_dir)?;
257    let mut records = Vec::with_capacity(max_records.min(4096));
258
259    for seg in &segments {
260        // Skip segments that end before from_lsn. A segment's max LSN
261        // is at least its first_lsn, so if first_lsn of the NEXT segment
262        // is <= from_lsn we can skip this one. Conservative: always read
263        // the last segment since we don't know its max LSN cheaply.
264        let reader = MmapWalReader::open(&seg.path)?;
265        for record_result in reader.records() {
266            let record = record_result?;
267            if record.header.lsn >= from_lsn {
268                records.push(record);
269                if records.len() >= max_records {
270                    return Ok((records, true));
271                }
272            }
273        }
274    }
275
276    Ok((records, false))
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::{WalWriter, WalWriterConfig};

    /// Opens a `WalWriter` at `path` with direct I/O switched off, since the
    /// tests run without O_DIRECT.
    fn test_writer(path: &Path) -> WalWriter {
        let config = WalWriterConfig {
            use_direct_io: false,
            ..Default::default()
        };
        WalWriter::open(path, config).unwrap()
    }

    /// Maps `path` and collects every record the reader yields, panicking on
    /// any open or read error.
    fn read_all(path: &Path) -> Vec<WalRecord> {
        MmapWalReader::open(path)
            .unwrap()
            .records()
            .collect::<Result<Vec<_>>>()
            .unwrap()
    }

    /// Records appended by the standard writer are read back in order.
    #[test]
    fn mmap_reader_basic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        // The writer lives only inside this scope and is dropped before the
        // file is mapped.
        {
            let mut writer = test_writer(&path);
            for payload in [b"hello", b"world"] {
                writer
                    .append(RecordType::Put as u16, 1, 0, payload)
                    .unwrap();
            }
            writer.sync().unwrap();
        }

        let records = read_all(&path);

        assert_eq!(records.len(), 2);
        let payloads: Vec<&[u8]> = records.iter().map(|r| r.payload.as_slice()).collect();
        assert_eq!(payloads, [&b"hello"[..], &b"world"[..]]);
    }

    /// A zero-length file yields no records and no error.
    #[test]
    fn mmap_reader_empty_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");
        std::fs::write(&path, []).unwrap();

        assert!(read_all(&path).is_empty());
    }

    /// A file too short to hold a record header yields no records.
    #[test]
    fn mmap_reader_truncated_header() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");
        // Only 10 bytes on disk.
        std::fs::write(&path, [0u8; 10]).unwrap();

        assert!(read_all(&path).is_empty());
    }

    /// Replay honours the starting LSN.
    #[test]
    fn replay_mmap_from_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();

        let config = crate::segmented::SegmentedWalConfig::for_testing(wal_dir.clone());
        let mut wal = crate::segmented::SegmentedWal::open(config).unwrap();

        let lsn1 = wal.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
        let lsn2 = wal.append(RecordType::Put as u16, 1, 0, b"b").unwrap();
        let lsn3 = wal.append(RecordType::Put as u16, 1, 0, b"c").unwrap();
        wal.sync().unwrap();

        // Starting at the second LSN returns exactly the last two records.
        let tail = replay_segments_mmap(&wal_dir, lsn2).unwrap();
        let tail_lsns: Vec<u64> = tail.iter().map(|r| r.header.lsn).collect();
        assert_eq!(tail_lsns, [lsn2, lsn3]);

        // Starting at the first LSN returns all three.
        let everything = replay_segments_mmap(&wal_dir, lsn1).unwrap();
        assert_eq!(everything.len(), 3);
    }
}