nodedb_wal/mmap_reader.rs

// SPDX-License-Identifier: BUSL-1.1

//! Memory-mapped WAL segment reader for Event Plane catchup.
//!
//! Unlike the standard `WalReader` (which uses sequential `read_exact`),
//! this reader maps sealed WAL segments into the process address space via
//! `mmap`. The kernel manages the page cache — no slab allocator memory is
//! pinned, and mmap reads from page cache don't contend with the Data Plane's
//! O_DIRECT WAL append path (O_DIRECT bypasses page cache entirely).
//!
//! **Tier progression:**
//! 1. In-memory Arc slabs (hot, zero-copy from ring buffer)
//! 2. Mmap WAL segment reads (warm, kernel-managed pages)
//! 3. Shed consumer + cold WAL replay (last resort)
//!
//! This reader is used in tier 2: when the Event Plane enters WAL Catchup
//! Mode, it mmap's the relevant sealed segments and iterates records.
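//!
//! Illustrative tier-2 flow (a sketch; `segment_path` and `apply` are
//! placeholders for the caller's segment discovery and consumer, not APIs
//! defined in this crate):
//!
//! ```ignore
//! let mut reader = MmapWalReader::open(segment_path)?;
//! while let Some(record) = reader.next_record()? {
//!     apply(record);
//! }
//! // Segment fully consumed; let the kernel reclaim its pages.
//! reader.release_pages();
//! ```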

use std::os::fd::AsRawFd;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};

use memmap2::Mmap;

use crate::error::{Result, WalError};
use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WAL_MAGIC, WalRecord};

/// Module-scoped atomic counters for observing mmap and fadvise behaviour in
/// production. These counters are incremented by the live code paths (open,
/// madvise, fadvise) and may be read from tests or from a metrics scrape.
pub mod observability {
    use super::{AtomicU64, Ordering};
    pub(super) static SEGMENTS_OPENED: AtomicU64 = AtomicU64::new(0);
    pub(super) static FADV_DONTNEED_COUNT: AtomicU64 = AtomicU64::new(0);
    pub(super) static MADV_SEQUENTIAL_COUNT: AtomicU64 = AtomicU64::new(0);

    pub fn segments_opened() -> u64 {
        SEGMENTS_OPENED.load(Ordering::Relaxed)
    }
    pub fn fadv_dontneed_count() -> u64 {
        FADV_DONTNEED_COUNT.load(Ordering::Relaxed)
    }
    pub fn madv_sequential_count() -> u64 {
        MADV_SEQUENTIAL_COUNT.load(Ordering::Relaxed)
    }
}

/// Call `posix_fadvise(POSIX_FADV_DONTNEED)` on an open WAL segment fd.
///
/// Once a segment has been iterated end-to-end during catchup, we don't
/// need its pages in cache any longer. Release them back to the kernel so
/// replay doesn't pin GiBs of page cache.
fn fadv_dontneed(fd: &std::fs::File, len: usize, path: &Path) {
    if len == 0 {
        return;
    }
    let rc = unsafe {
        libc::posix_fadvise(
            fd.as_raw_fd(),
            0,
            len as libc::off_t,
            libc::POSIX_FADV_DONTNEED,
        )
    };
    // posix_fadvise returns the error number directly (it does not set errno).
    if rc == 0 {
        observability::FADV_DONTNEED_COUNT.fetch_add(1, Ordering::Relaxed);
    } else {
        tracing::warn!(
            path = %path.display(),
            errno = rc,
            "posix_fadvise(DONTNEED) failed on exhausted WAL segment",
        );
    }
}

/// Memory-mapped WAL segment reader.
///
/// Opens a sealed WAL segment file via mmap and iterates records directly
/// from the mapped region. Headers are parsed in place; record payloads are
/// copied into owned buffers (`WalRecord` owns its bytes). The mapping is
/// read-only and the kernel manages page residency — no application-level
/// memory pinning.
pub struct MmapWalReader {
    mmap: Mmap,
    offset: usize,
    file: std::fs::File,
    path: std::path::PathBuf,
    madvise_state: Option<libc::c_int>,
}

impl MmapWalReader {
    /// Open a sealed WAL segment file for mmap'd reading. The caller must
    /// ensure the segment is no longer being appended to.
    pub fn open(path: &Path) -> Result<Self> {
        observability::SEGMENTS_OPENED.fetch_add(1, Ordering::Relaxed);
        let file = std::fs::File::open(path)?;
        // SAFETY: The file is a sealed WAL segment (not being written to).
        // The Data Plane writes to the ACTIVE segment via O_DIRECT; sealed
        // segments are immutable after rollover.
        let mmap = unsafe { Mmap::map(&file)? };

        // Catchup iterates forward through a segment. MADV_SEQUENTIAL
        // increases readahead and lets the kernel free already-consumed
        // pages sooner, so replay doesn't grow buff/cache by the full WAL
        // size.
        let mut madvise_state = None;
        if !mmap.is_empty() {
            let rc = unsafe {
                libc::madvise(
                    mmap.as_ptr() as *mut libc::c_void,
                    mmap.len(),
                    libc::MADV_SEQUENTIAL,
                )
            };
            if rc == 0 {
                madvise_state = Some(libc::MADV_SEQUENTIAL);
                observability::MADV_SEQUENTIAL_COUNT.fetch_add(1, Ordering::Relaxed);
            } else {
                // madvise returns -1 and sets errno on failure.
                tracing::warn!(
                    path = %path.display(),
                    errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0),
                    "madvise(MADV_SEQUENTIAL) failed on WAL segment; continuing",
                );
            }
        }

        Ok(Self {
            mmap,
            offset: 0,
            file,
            path: path.to_path_buf(),
            madvise_state,
        })
    }

    /// The madvise hint applied to the mapped segment (if any).
    pub fn madvise_state(&self) -> Option<libc::c_int> {
        self.madvise_state
    }

    /// Hint to the kernel that pages for this segment can be dropped from
    /// cache. Call this after a segment has been iterated end-to-end.
    pub fn release_pages(&self) {
        fadv_dontneed(&self.file, self.mmap.len(), &self.path);
    }

    /// Read the next record from the mmap'd region.
    ///
    /// Returns `None` at EOF or at the first corruption point. Headers are
    /// parsed directly from the mapped bytes; the payload is copied into an
    /// owned `Vec` so the returned record outlives the mapping.
    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
        let data = &self.mmap[..];

        loop {
            // Check if we have enough bytes for a header.
            if self.offset + HEADER_SIZE > data.len() {
                return Ok(None);
            }

            // Parse header.
            let header_bytes: &[u8; HEADER_SIZE] = data[self.offset..self.offset + HEADER_SIZE]
                .try_into()
                .map_err(|_| {
                    WalError::Io(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        "header slice conversion failed",
                    ))
                })?;
            let header = RecordHeader::from_bytes(header_bytes);

            // Validate magic — corruption or end of valid data.
            if header.magic != WAL_MAGIC {
                return Ok(None);
            }

            // Validate the remaining header fields against this offset.
            if header.validate(self.offset as u64).is_err() {
                return Ok(None);
            }

            let payload_len = header.payload_len as usize;
            let record_end = self.offset + HEADER_SIZE + payload_len;

            // Check if payload is fully within the mmap'd region.
            if record_end > data.len() {
                return Ok(None); // Torn write at segment end.
            }

            // Extract payload (copies from mmap to owned Vec).
            let payload = data[self.offset + HEADER_SIZE..record_end].to_vec();
            self.offset = record_end;

            let record = WalRecord { header, payload };

            // Verify checksum.
            if record.verify_checksum().is_err() {
                return Ok(None); // Corruption — end of committed prefix.
            }

            // Check record type.
            let logical_type = record.logical_record_type();
            if RecordType::from_raw(logical_type).is_none() {
                if RecordType::is_required(logical_type) {
                    return Err(WalError::UnknownRequiredRecordType {
                        record_type: header.record_type,
                        lsn: header.lsn,
                    });
                }
                // Unknown optional record — skip and continue loop.
                continue;
            }

            return Ok(Some(record));
        }
    }

    /// Iterator over all valid records in the mmap'd segment.
    pub fn records(self) -> MmapRecordIter {
        MmapRecordIter { reader: self }
    }

    /// Current read offset.
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Total size of the mmap'd region.
    pub fn len(&self) -> usize {
        self.mmap.len()
    }

    /// Whether the mmap'd region is empty.
    pub fn is_empty(&self) -> bool {
        self.mmap.is_empty()
    }
}

/// Iterator over records in a mmap'd WAL segment.
pub struct MmapRecordIter {
    reader: MmapWalReader,
}

impl Iterator for MmapRecordIter {
    type Item = Result<WalRecord>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.reader.next_record() {
            Ok(Some(record)) => Some(Ok(record)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}

/// Minimum number of segments to justify parallel replay overhead.
const PARALLEL_SEGMENT_THRESHOLD: usize = 4;

/// Replay WAL segments from a directory using mmap, starting from `from_lsn`.
///
/// Discovers all sealed segments, mmap's each, and returns records with
/// LSN >= `from_lsn`. This is the Event Plane's tier-2 catchup path.
///
/// When `PARALLEL_SEGMENT_THRESHOLD` (currently 4) or more segments need
/// scanning, uses `std::thread::scope` to read segments in parallel (one
/// thread per segment). Each thread mmap's its segment and filters records
/// independently; results are merged in segment order (already LSN-sorted
/// since segments are monotonic).
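///
/// Illustrative call (a sketch; `wal_dir` and `last_applied_lsn` are the
/// caller's state, not values defined here):
///
/// ```ignore
/// let records = replay_segments_mmap(&wal_dir, last_applied_lsn + 1)?;
/// for record in records {
///     // Hand each record to the Event Plane consumer in LSN order.
/// }
/// ```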
pub fn replay_segments_mmap(wal_dir: &Path, from_lsn: u64) -> Result<Vec<WalRecord>> {
    let segments = crate::segment::discover_segments(wal_dir)?;
    let live = filter_segments_by_lsn(&segments, from_lsn);

    if live.len() < PARALLEL_SEGMENT_THRESHOLD {
        return replay_segments_sequential(live, from_lsn);
    }

    replay_segments_parallel(live, from_lsn)
}

/// Return the slice of `segments` whose LSN range may contain records with
/// lsn >= `from_lsn`. A segment at index `i` is skippable iff the next
/// segment's `first_lsn` is `<= from_lsn` — meaning segment `i`'s entire
/// range is strictly below the cutoff. The last segment is never skipped
/// on this criterion because its upper bound is unknown.
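///
/// Worked example (illustrative numbers): with segment first LSNs
/// `[0, 100, 200]` and `from_lsn = 150`, segment 0 covers `[0, 100)` and is
/// skipped because its successor's `first_lsn` (100) is `<= 150`; segments
/// 1 and 2 remain live.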
fn filter_segments_by_lsn(
    segments: &[crate::segment::SegmentMeta],
    from_lsn: u64,
) -> &[crate::segment::SegmentMeta] {
    // Find the first segment whose next-segment first_lsn > from_lsn.
    // Since segments are LSN-sorted, everything before that index lies
    // strictly below the cutoff and can be skipped.
    let mut start = 0;
    for i in 0..segments.len() {
        // Segment i covers [first_lsn_i, first_lsn_{i+1}).
        let upper = segments.get(i + 1).map(|s| s.first_lsn).unwrap_or(u64::MAX);
        if upper > from_lsn {
            start = i;
            break;
        }
        start = i + 1;
    }
    if start >= segments.len() {
        // No segments were discovered (or every segment lies below the
        // cutoff); nothing to replay.
        return &[];
    }
    &segments[start..]
}

/// Sequential segment replay (used for small segment counts).
fn replay_segments_sequential(
    segments: &[crate::segment::SegmentMeta],
    from_lsn: u64,
) -> Result<Vec<WalRecord>> {
    let mut records = Vec::new();
    for seg in segments {
        let mut reader = MmapWalReader::open(&seg.path)?;
        while let Some(record) = reader.next_record()? {
            if record.header.lsn >= from_lsn {
                records.push(record);
            }
        }
        reader.release_pages();
    }
    Ok(records)
}

/// Parallel segment replay using scoped threads.
///
/// Each segment is read in its own thread via mmap. Since segments are
/// monotonically ordered by LSN, concatenating per-segment results in
/// segment order produces a globally LSN-ordered result.
fn replay_segments_parallel(
    segments: &[crate::segment::SegmentMeta],
    from_lsn: u64,
) -> Result<Vec<WalRecord>> {
    // Collect per-segment results. Index corresponds to segment order.
    let mut per_segment: Vec<Result<Vec<WalRecord>>> = Vec::with_capacity(segments.len());

    std::thread::scope(|scope| {
        let handles: Vec<_> = segments
            .iter()
            .map(|seg| {
                scope.spawn(move || -> Result<Vec<WalRecord>> {
                    let mut reader = MmapWalReader::open(&seg.path)?;
                    let mut seg_records = Vec::new();
                    while let Some(record) = reader.next_record()? {
                        if record.header.lsn >= from_lsn {
                            seg_records.push(record);
                        }
                    }
                    reader.release_pages();
                    Ok(seg_records)
                })
            })
            .collect();

        for handle in handles {
            per_segment.push(handle.join().unwrap_or_else(|_| {
                Err(WalError::Io(std::io::Error::other(
                    "segment replay thread panicked",
                )))
            }));
        }
    });

    // Merge in segment order (preserves LSN ordering).
    let total_estimate: usize = per_segment
        .iter()
        .map(|r| r.as_ref().map(|v| v.len()).unwrap_or(0))
        .sum();
    let mut records = Vec::with_capacity(total_estimate);
    for seg_result in per_segment {
        records.extend(seg_result?);
    }

    Ok(records)
}

/// Paginated mmap replay: reads at most `max_records` from `from_lsn`.
///
/// Returns `(records, has_more)` where `has_more` is `true` if the limit
/// was reached before all segments were exhausted. This bounds memory
/// usage per catch-up cycle to O(max_records) instead of O(all WAL data).
///
/// Always uses sequential reading (no parallel threads) since the bounded
/// record count makes parallel overhead unnecessary.
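///
/// Illustrative catchup loop (a sketch; `apply`, `wal_dir`, and the batch
/// size are placeholders supplied by the caller):
///
/// ```ignore
/// let mut cursor = last_applied_lsn + 1;
/// loop {
///     let (batch, has_more) = replay_segments_mmap_limit(&wal_dir, cursor, 10_000)?;
///     for record in &batch {
///         apply(record);
///     }
///     if !has_more {
///         break;
///     }
///     // `has_more` implies a non-empty batch, so resuming one past its
///     // last LSN always makes progress.
///     cursor = batch.last().unwrap().header.lsn + 1;
/// }
/// ```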
pub fn replay_segments_mmap_limit(
    wal_dir: &Path,
    from_lsn: u64,
    max_records: usize,
) -> Result<(Vec<WalRecord>, bool)> {
    let segments = crate::segment::discover_segments(wal_dir)?;
    let live = filter_segments_by_lsn(&segments, from_lsn);
    let mut records = Vec::with_capacity(max_records.min(4096));

    for seg in live {
        let mut reader = MmapWalReader::open(&seg.path)?;
        while let Some(record) = reader.next_record()? {
            if record.header.lsn >= from_lsn {
                records.push(record);
                if records.len() >= max_records {
                    // Partial scan — don't release pages for a segment
                    // we'll likely re-open on the next catchup cycle.
                    return Ok((records, true));
                }
            }
        }
        reader.release_pages();
    }

    Ok((records, false))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::{WalWriter, WalWriterConfig};

    fn test_writer(path: &Path) -> WalWriter {
        let config = WalWriterConfig {
            use_direct_io: false, // Tests run without O_DIRECT.
            ..Default::default()
        };
        WalWriter::open(path, config).unwrap()
    }

    #[test]
    fn mmap_reader_basic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        // Write some records with the standard writer.
        {
            let mut writer = test_writer(&path);
            writer
                .append(RecordType::Put as u32, 1, 0, 0, b"hello")
                .unwrap();
            writer
                .append(RecordType::Put as u32, 1, 0, 0, b"world")
                .unwrap();
            writer.sync().unwrap();
        }

        // Read back with mmap reader.
        let reader = MmapWalReader::open(&path).unwrap();
        let records: Vec<WalRecord> = reader.records().collect::<Result<Vec<_>>>().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].payload, b"hello");
        assert_eq!(records[1].payload, b"world");
    }

    #[test]
    fn mmap_reader_empty_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");
        std::fs::write(&path, []).unwrap();

        let reader = MmapWalReader::open(&path).unwrap();
        let records: Vec<WalRecord> = reader.records().collect::<Result<Vec<_>>>().unwrap();
        assert!(records.is_empty());
    }
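
    // Sketch test for the observability counters. SEGMENTS_OPENED is
    // process-global, so this asserts a monotonic increase rather than an
    // exact value (other tests in the binary may open segments too).
    #[test]
    fn observability_counters_increment() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("counted.wal");
        std::fs::write(&path, []).unwrap();

        let before = observability::segments_opened();
        let _reader = MmapWalReader::open(&path).unwrap();
        assert!(observability::segments_opened() >= before + 1);
    }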

    #[test]
    fn mmap_reader_truncated_header() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");
        // Write 10 bytes — not enough for a header (30 bytes).
        std::fs::write(&path, [0u8; 10]).unwrap();

        let reader = MmapWalReader::open(&path).unwrap();
        let records: Vec<WalRecord> = reader.records().collect::<Result<Vec<_>>>().unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn replay_mmap_from_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();

        let config = crate::segmented::SegmentedWalConfig::for_testing(wal_dir.clone());
        let mut wal = crate::segmented::SegmentedWal::open(config).unwrap();

        let lsn1 = wal.append(RecordType::Put as u32, 1, 0, 0, b"a").unwrap();
        let lsn2 = wal.append(RecordType::Put as u32, 1, 0, 0, b"b").unwrap();
        let lsn3 = wal.append(RecordType::Put as u32, 1, 0, 0, b"c").unwrap();
        wal.sync().unwrap();

        // Replay from lsn2 — should get records b and c.
        let records = replay_segments_mmap(&wal_dir, lsn2).unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].header.lsn, lsn2);
        assert_eq!(records[1].header.lsn, lsn3);

        // Replay from lsn1 — all 3.
        let all = replay_segments_mmap(&wal_dir, lsn1).unwrap();
        assert_eq!(all.len(), 3);
    }
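
    // Sketch test for the paginated replay path. Assumes (as in
    // `replay_mmap_from_lsn`) that `append` returns monotonically
    // increasing LSNs and all three records land in discoverable segments.
    #[test]
    fn replay_mmap_limit_pagination() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();

        let config = crate::segmented::SegmentedWalConfig::for_testing(wal_dir.clone());
        let mut wal = crate::segmented::SegmentedWal::open(config).unwrap();

        let lsn1 = wal.append(RecordType::Put as u32, 1, 0, 0, b"a").unwrap();
        wal.append(RecordType::Put as u32, 1, 0, 0, b"b").unwrap();
        wal.append(RecordType::Put as u32, 1, 0, 0, b"c").unwrap();
        wal.sync().unwrap();

        // First page: a limit of 2 stops before the third record.
        let (page, has_more) = replay_segments_mmap_limit(&wal_dir, lsn1, 2).unwrap();
        assert_eq!(page.len(), 2);
        assert!(has_more);

        // Resume one past the last LSN returned; the final record arrives
        // and the scan reports completion.
        let cursor = page.last().unwrap().header.lsn + 1;
        let (rest, has_more) = replay_segments_mmap_limit(&wal_dir, cursor, 2).unwrap();
        assert_eq!(rest.len(), 1);
        assert!(!has_more);
    }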
}