Skip to main content

nodedb_wal/
mmap_reader.rs

1//! Memory-mapped WAL segment reader for Event Plane catchup.
2//!
3//! Unlike the standard `WalReader` (which uses sequential `read_exact`),
4//! this reader maps sealed WAL segments into the process address space via
5//! `mmap`. The kernel manages the page cache — no slab allocator memory is
6//! pinned, and mmap reads from page cache don't contend with the Data Plane's
7//! O_DIRECT WAL append path (O_DIRECT bypasses page cache entirely).
8//!
9//! **Tier progression:**
10//! 1. In-memory Arc slabs (hot, zero-copy from ring buffer)
11//! 2. Mmap WAL segment reads (warm, kernel-managed pages)
12//! 3. Shed consumer + cold WAL replay (last resort)
13//!
14//! This reader is used in tier 2: when the Event Plane enters WAL Catchup
15//! Mode, it mmap's the relevant sealed segments and iterates records.
16
17use std::os::fd::AsRawFd;
18use std::path::Path;
19use std::sync::atomic::{AtomicU64, Ordering};
20
21use memmap2::Mmap;
22
23use crate::error::{Result, WalError};
24use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WAL_MAGIC, WalRecord};
25
/// Process-wide counters describing what the mmap reader's open path and
/// page-cache hints actually did, so tests can observe that behaviour.
///
/// The counters are only ever incremented and are read with relaxed
/// ordering: they are diagnostics, not a synchronization mechanism.
pub mod test_hooks {
    use super::{AtomicU64, Ordering};

    /// Bumped at the start of every `MmapWalReader::open` call.
    pub(super) static SEGMENTS_OPENED: AtomicU64 = AtomicU64::new(0);
    /// Bumped after each successful `posix_fadvise(POSIX_FADV_DONTNEED)`.
    pub(super) static FADV_DONTNEED_COUNT: AtomicU64 = AtomicU64::new(0);
    /// Bumped after each successful `madvise(MADV_SEQUENTIAL)`.
    pub(super) static MADV_SEQUENTIAL_COUNT: AtomicU64 = AtomicU64::new(0);

    /// Take a relaxed snapshot of one counter.
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Number of `MmapWalReader::open` calls so far, including calls whose
    /// file open or mapping subsequently failed.
    pub fn segments_opened() -> u64 {
        snapshot(&SEGMENTS_OPENED)
    }

    /// Number of successful `POSIX_FADV_DONTNEED` hints issued so far.
    pub fn fadv_dontneed_count() -> u64 {
        snapshot(&FADV_DONTNEED_COUNT)
    }

    /// Number of successful `MADV_SEQUENTIAL` hints issued so far.
    pub fn madv_sequential_count() -> u64 {
        snapshot(&MADV_SEQUENTIAL_COUNT)
    }
}
43
44/// Call `posix_fadvise(POSIX_FADV_DONTNEED)` on an open WAL segment fd.
45///
46/// Once a segment has been iterated end-to-end during catchup, we don't
47/// need its pages in cache any longer. Release them back to the kernel so
48/// replay doesn't pin GiBs of page cache.
49fn fadv_dontneed(fd: &std::fs::File, len: usize, path: &Path) {
50    if len == 0 {
51        return;
52    }
53    let rc = unsafe {
54        libc::posix_fadvise(
55            fd.as_raw_fd(),
56            0,
57            len as libc::off_t,
58            libc::POSIX_FADV_DONTNEED,
59        )
60    };
61    if rc == 0 {
62        test_hooks::FADV_DONTNEED_COUNT.fetch_add(1, Ordering::Relaxed);
63    } else {
64        tracing::warn!(
65            path = %path.display(),
66            errno = rc,
67            "posix_fadvise(DONTNEED) failed on exhausted WAL segment",
68        );
69    }
70}
71
72/// Memory-mapped WAL segment reader.
73///
74/// Opens a sealed WAL segment file via mmap and provides zero-copy
75/// iteration over records. The mmap'd region is read-only and the
76/// kernel manages page residency — no application-level memory pinning.
77pub struct MmapWalReader {
78    mmap: Mmap,
79    offset: usize,
80    file: std::fs::File,
81    path: std::path::PathBuf,
82    madvise_state: Option<libc::c_int>,
83}
84
85impl MmapWalReader {
86    /// Open a WAL segment file for mmap'd reading.
87    pub fn open(path: &Path) -> Result<Self> {
88        test_hooks::SEGMENTS_OPENED.fetch_add(1, Ordering::Relaxed);
89        let file = std::fs::File::open(path)?;
90        // SAFETY: The file is a sealed WAL segment (not being written to).
91        // The Data Plane writes to the ACTIVE segment via O_DIRECT; sealed
92        // segments are immutable after rollover.
93        let mmap = unsafe { Mmap::map(&file)? };
94
95        // Catchup iterates forward through a segment. MADV_SEQUENTIAL
96        // doubles readahead and drops already-consumed pages eagerly so
97        // replay doesn't grow buff/cache by the full WAL size.
98        let mut madvise_state = None;
99        if !mmap.is_empty() {
100            let rc = unsafe {
101                libc::madvise(
102                    mmap.as_ptr() as *mut libc::c_void,
103                    mmap.len(),
104                    libc::MADV_SEQUENTIAL,
105                )
106            };
107            if rc == 0 {
108                madvise_state = Some(libc::MADV_SEQUENTIAL);
109                test_hooks::MADV_SEQUENTIAL_COUNT.fetch_add(1, Ordering::Relaxed);
110            } else {
111                tracing::warn!(
112                    path = %path.display(),
113                    errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0),
114                    "madvise(MADV_SEQUENTIAL) failed on WAL segment; continuing",
115                );
116            }
117        }
118
119        Ok(Self {
120            mmap,
121            offset: 0,
122            file,
123            path: path.to_path_buf(),
124            madvise_state,
125        })
126    }
127
128    /// The madvise hint applied to the mapped segment (if any).
129    pub fn madvise_state(&self) -> Option<libc::c_int> {
130        self.madvise_state
131    }
132
133    /// Hint to the kernel that pages for this segment can be dropped from
134    /// cache. Call this after a segment has been iterated end-to-end.
135    pub fn release_pages(&self) {
136        fadv_dontneed(&self.file, self.mmap.len(), &self.path);
137    }
138
139    /// Read the next record from the mmap'd region.
140    ///
141    /// Returns `None` at EOF or at the first corruption point.
142    /// Zero-copy: payload bytes reference the mmap'd region directly.
143    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
144        let data = &self.mmap[..];
145
146        loop {
147            // Check if we have enough bytes for a header.
148            if self.offset + HEADER_SIZE > data.len() {
149                return Ok(None);
150            }
151
152            // Parse header.
153            let header_bytes: &[u8; HEADER_SIZE] = data[self.offset..self.offset + HEADER_SIZE]
154                .try_into()
155                .map_err(|_| {
156                    WalError::Io(std::io::Error::new(
157                        std::io::ErrorKind::InvalidData,
158                        "header slice conversion failed",
159                    ))
160                })?;
161            let header = RecordHeader::from_bytes(header_bytes);
162
163            // Validate magic — corruption or end of valid data.
164            if header.magic != WAL_MAGIC {
165                return Ok(None);
166            }
167
168            // Validate version.
169            if header.validate(self.offset as u64).is_err() {
170                return Ok(None);
171            }
172
173            let payload_len = header.payload_len as usize;
174            let record_end = self.offset + HEADER_SIZE + payload_len;
175
176            // Check if payload is fully within the mmap'd region.
177            if record_end > data.len() {
178                return Ok(None); // Torn write at segment end.
179            }
180
181            // Extract payload (copies from mmap to owned Vec).
182            let payload = data[self.offset + HEADER_SIZE..record_end].to_vec();
183            self.offset = record_end;
184
185            let record = WalRecord { header, payload };
186
187            // Verify checksum.
188            if record.verify_checksum().is_err() {
189                return Ok(None); // Corruption — end of committed prefix.
190            }
191
192            // Check record type.
193            let logical_type = record.logical_record_type();
194            if RecordType::from_raw(logical_type).is_none() {
195                if RecordType::is_required(logical_type) {
196                    return Err(WalError::UnknownRequiredRecordType {
197                        record_type: header.record_type,
198                        lsn: header.lsn,
199                    });
200                }
201                // Unknown optional record — skip and continue loop.
202                continue;
203            }
204
205            return Ok(Some(record));
206        }
207    }
208
209    /// Iterator over all valid records in the mmap'd segment.
210    pub fn records(self) -> MmapRecordIter {
211        MmapRecordIter { reader: self }
212    }
213
214    /// Current read offset.
215    pub fn offset(&self) -> usize {
216        self.offset
217    }
218
219    /// Total size of the mmap'd region.
220    pub fn len(&self) -> usize {
221        self.mmap.len()
222    }
223
224    /// Whether the mmap'd region is empty.
225    pub fn is_empty(&self) -> bool {
226        self.mmap.is_empty()
227    }
228}
229
230/// Iterator over records in a mmap'd WAL segment.
231pub struct MmapRecordIter {
232    reader: MmapWalReader,
233}
234
235impl Iterator for MmapRecordIter {
236    type Item = Result<WalRecord>;
237
238    fn next(&mut self) -> Option<Self::Item> {
239        match self.reader.next_record() {
240            Ok(Some(record)) => Some(Ok(record)),
241            Ok(None) => None,
242            Err(e) => Some(Err(e)),
243        }
244    }
245}
246
247/// Minimum number of segments to justify parallel replay overhead.
248const PARALLEL_SEGMENT_THRESHOLD: usize = 4;
249
250/// Replay WAL segments from a directory using mmap, starting from `from_lsn`.
251///
252/// Discovers all sealed segments, mmap's each, and returns records with
253/// LSN >= `from_lsn`. This is the Event Plane's tier-2 catchup path.
254///
255/// When 4+ segments need scanning, uses `std::thread::scope` to read
256/// segments in parallel (one thread per segment). Each thread mmap's its
257/// segment and filters records independently; results are merged in
258/// segment order (already LSN-sorted since segments are monotonic).
259pub fn replay_segments_mmap(wal_dir: &Path, from_lsn: u64) -> Result<Vec<WalRecord>> {
260    let segments = crate::segment::discover_segments(wal_dir)?;
261    let live = filter_segments_by_lsn(&segments, from_lsn);
262
263    if live.len() < PARALLEL_SEGMENT_THRESHOLD {
264        return replay_segments_sequential(live, from_lsn);
265    }
266
267    replay_segments_parallel(live, from_lsn)
268}
269
270/// Return the slice of `segments` whose LSN range may contain records with
271/// lsn >= `from_lsn`. A segment at index `i` is skippable iff the next
272/// segment's `first_lsn` is `<= from_lsn` — meaning segment `i`'s entire
273/// range is strictly below the cutoff. The last segment is never skipped
274/// on this criterion because its upper bound is unknown.
275fn filter_segments_by_lsn(
276    segments: &[crate::segment::SegmentMeta],
277    from_lsn: u64,
278) -> &[crate::segment::SegmentMeta] {
279    // Find the first segment whose next-segment first_lsn > from_lsn, OR
280    // the last segment (always live). Since segments are LSN-sorted, the
281    // live tail starts at the largest i such that segments[i].first_lsn
282    // <= from_lsn.
283    let mut start = 0;
284    for i in 0..segments.len() {
285        // Segment i covers [first_lsn_i, first_lsn_{i+1}).
286        let upper = segments.get(i + 1).map(|s| s.first_lsn).unwrap_or(u64::MAX);
287        if upper > from_lsn {
288            start = i;
289            break;
290        }
291        start = i + 1;
292    }
293    if start >= segments.len() {
294        // All segments strictly below from_lsn; nothing to replay.
295        return &[];
296    }
297    &segments[start..]
298}
299
300/// Sequential segment replay (used for small segment counts).
301fn replay_segments_sequential(
302    segments: &[crate::segment::SegmentMeta],
303    from_lsn: u64,
304) -> Result<Vec<WalRecord>> {
305    let mut records = Vec::new();
306    for seg in segments {
307        let mut reader = MmapWalReader::open(&seg.path)?;
308        while let Some(record) = reader.next_record()? {
309            if record.header.lsn >= from_lsn {
310                records.push(record);
311            }
312        }
313        reader.release_pages();
314    }
315    Ok(records)
316}
317
318/// Parallel segment replay using scoped threads.
319///
320/// Each segment is read in its own thread via mmap. Since segments are
321/// monotonically ordered by LSN, concatenating per-segment results in
322/// segment order produces a globally LSN-ordered result.
323fn replay_segments_parallel(
324    segments: &[crate::segment::SegmentMeta],
325    from_lsn: u64,
326) -> Result<Vec<WalRecord>> {
327    // Collect per-segment results. Index corresponds to segment order.
328    let mut per_segment: Vec<Result<Vec<WalRecord>>> = Vec::with_capacity(segments.len());
329
330    std::thread::scope(|scope| {
331        let handles: Vec<_> = segments
332            .iter()
333            .map(|seg| {
334                scope.spawn(move || -> Result<Vec<WalRecord>> {
335                    let mut reader = MmapWalReader::open(&seg.path)?;
336                    let mut seg_records = Vec::new();
337                    while let Some(record) = reader.next_record()? {
338                        if record.header.lsn >= from_lsn {
339                            seg_records.push(record);
340                        }
341                    }
342                    reader.release_pages();
343                    Ok(seg_records)
344                })
345            })
346            .collect();
347
348        for handle in handles {
349            per_segment.push(handle.join().unwrap_or_else(|_| {
350                Err(WalError::Io(std::io::Error::other(
351                    "segment replay thread panicked",
352                )))
353            }));
354        }
355    });
356
357    // Merge in segment order (preserves LSN ordering).
358    let total_estimate: usize = per_segment
359        .iter()
360        .map(|r| r.as_ref().map(|v| v.len()).unwrap_or(0))
361        .sum();
362    let mut records = Vec::with_capacity(total_estimate);
363    for seg_result in per_segment {
364        records.extend(seg_result?);
365    }
366
367    Ok(records)
368}
369
370/// Paginated mmap replay: reads at most `max_records` from `from_lsn`.
371///
372/// Returns `(records, has_more)` where `has_more` is `true` if the limit
373/// was reached before all segments were exhausted. This bounds memory
374/// usage per catch-up cycle to O(max_records) instead of O(all WAL data).
375///
376/// Always uses sequential reading (no parallel threads) since the bounded
377/// record count makes parallel overhead unnecessary.
378pub fn replay_segments_mmap_limit(
379    wal_dir: &Path,
380    from_lsn: u64,
381    max_records: usize,
382) -> Result<(Vec<WalRecord>, bool)> {
383    let segments = crate::segment::discover_segments(wal_dir)?;
384    let live = filter_segments_by_lsn(&segments, from_lsn);
385    let mut records = Vec::with_capacity(max_records.min(4096));
386
387    for seg in live {
388        let mut reader = MmapWalReader::open(&seg.path)?;
389        while let Some(record) = reader.next_record()? {
390            if record.header.lsn >= from_lsn {
391                records.push(record);
392                if records.len() >= max_records {
393                    // Partial scan — don't release pages for a segment
394                    // we'll likely re-open on the next catchup cycle.
395                    return Ok((records, true));
396                }
397            }
398        }
399        reader.release_pages();
400    }
401
402    Ok((records, false))
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::{WalWriter, WalWriterConfig};

    /// Standalone WAL writer for a single file, without O_DIRECT.
    fn test_writer(path: &Path) -> WalWriter {
        let config = WalWriterConfig {
            use_direct_io: false, // Tests run without O_DIRECT.
            ..Default::default()
        };
        WalWriter::open(path, config).unwrap()
    }

    #[test]
    fn mmap_reader_basic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        // Write some records with the standard writer.
        {
            let mut writer = test_writer(&path);
            writer
                .append(RecordType::Put as u16, 1, 0, b"hello")
                .unwrap();
            writer
                .append(RecordType::Put as u16, 1, 0, b"world")
                .unwrap();
            writer.sync().unwrap();
        }

        // Read back with mmap reader.
        let reader = MmapWalReader::open(&path).unwrap();
        let records: Vec<WalRecord> = reader.records().collect::<Result<Vec<_>>>().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].payload, b"hello");
        assert_eq!(records[1].payload, b"world");
    }

    #[test]
    fn mmap_reader_empty_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");
        std::fs::write(&path, []).unwrap();

        let reader = MmapWalReader::open(&path).unwrap();
        // An empty mapping skips the MADV_SEQUENTIAL hint entirely.
        assert!(reader.is_empty());
        assert_eq!(reader.madvise_state(), None);

        let records: Vec<WalRecord> = reader.records().collect::<Result<Vec<_>>>().unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn mmap_reader_truncated_header() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");
        // Write 10 bytes — not enough for a header (30 bytes).
        std::fs::write(&path, [0u8; 10]).unwrap();

        let reader = MmapWalReader::open(&path).unwrap();
        let records: Vec<WalRecord> = reader.records().collect::<Result<Vec<_>>>().unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn replay_mmap_from_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();

        let config = crate::segmented::SegmentedWalConfig::for_testing(wal_dir.clone());
        let mut wal = crate::segmented::SegmentedWal::open(config).unwrap();

        let lsn1 = wal.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
        let lsn2 = wal.append(RecordType::Put as u16, 1, 0, b"b").unwrap();
        let lsn3 = wal.append(RecordType::Put as u16, 1, 0, b"c").unwrap();
        wal.sync().unwrap();

        // Replay from lsn2 — should get records b and c.
        let records = replay_segments_mmap(&wal_dir, lsn2).unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].header.lsn, lsn2);
        assert_eq!(records[1].header.lsn, lsn3);

        // Replay from lsn1 — all 3.
        let all = replay_segments_mmap(&wal_dir, lsn1).unwrap();
        assert_eq!(all.len(), 3);
    }

    #[test]
    fn replay_mmap_limit_paginates() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();

        let config = crate::segmented::SegmentedWalConfig::for_testing(wal_dir.clone());
        let mut wal = crate::segmented::SegmentedWal::open(config).unwrap();

        let lsn1 = wal.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
        let lsn2 = wal.append(RecordType::Put as u16, 1, 0, b"b").unwrap();
        let lsn3 = wal.append(RecordType::Put as u16, 1, 0, b"c").unwrap();
        wal.sync().unwrap();

        // First page stops at the limit and reports more data.
        let (page, has_more) = replay_segments_mmap_limit(&wal_dir, lsn1, 2).unwrap();
        assert!(has_more);
        assert_eq!(page.len(), 2);
        assert_eq!(page[0].header.lsn, lsn1);
        assert_eq!(page[1].header.lsn, lsn2);

        // Resuming just after the last returned LSN drains the remainder.
        let resume_from = page[1].header.lsn + 1;
        let (rest, has_more) = replay_segments_mmap_limit(&wal_dir, resume_from, 2).unwrap();
        assert!(!has_more);
        assert_eq!(rest.len(), 1);
        assert_eq!(rest[0].header.lsn, lsn3);
    }
}