// nodedb_mem/overflow.rs
//! mmap-backed overflow region for spilled arena allocations.
//!
//! Each Data Plane core gets its own overflow file. The owning core writes
//! spilled data; other cores may open read-only mmaps for cross-core access.
//! This preserves zero-lock TPC isolation.
//!
//! The region uses a bump allocator within the mmap'd file. When the file fills,
//! it grows via `ftruncate` + `mremap`.
10use std::os::fd::AsRawFd;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU64, Ordering};
14
15use crate::engine::EngineId;
16use crate::error::{MemError, Result};
17
/// Module-scoped counters for observing madvise behaviour.
pub mod test_hooks {
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Bumped once per successful `madvise(MADV_RANDOM)` call on an
    /// overflow mapping (initial map and every regrowth).
    pub(super) static MADV_RANDOM_COUNT: AtomicU64 = AtomicU64::new(0);

    /// Snapshot of the MADV_RANDOM success counter.
    pub fn madv_random_count() -> u64 {
        MADV_RANDOM_COUNT.load(Ordering::Relaxed)
    }
}
27
28/// Advise MADV_RANDOM on a freshly-mapped (or freshly-remapped) overflow
29/// region. Returns the hint on success, `None` on failure or zero-length.
30fn advise_random(base: *mut u8, len: usize, path: &Path) -> Option<libc::c_int> {
31    if len == 0 {
32        return None;
33    }
34    let rc = unsafe { libc::madvise(base as *mut libc::c_void, len, libc::MADV_RANDOM) };
35    if rc == 0 {
36        test_hooks::MADV_RANDOM_COUNT.fetch_add(1, Ordering::Relaxed);
37        Some(libc::MADV_RANDOM)
38    } else {
39        tracing::warn!(
40            path = %path.display(),
41            errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0),
42            "madvise(MADV_RANDOM) failed on overflow region; continuing with kernel default",
43        );
44        None
45    }
46}
47
/// Metadata for one spilled allocation in the overflow region.
///
/// Returned indices from `OverflowRegion::write` index into the region's
/// slot table; a slot's `offset`/`size` stay valid across `mremap` growth
/// because offsets are relative to the (possibly relocated) base.
#[derive(Debug, Clone)]
pub struct OverflowSlot {
    /// Offset within the mmap region.
    pub offset: usize,
    /// Size of the spilled data. Note: on free-list reuse the slot keeps its
    /// original size, so this can exceed the length of the last write.
    pub size: usize,
    /// Engine that owned this allocation.
    pub engine: EngineId,
    /// Whether this slot is occupied (false = freed/compacted).
    pub occupied: bool,
}
60
/// mmap-backed overflow region owned by a single core.
///
/// Uses a bump allocator within the mmap'd file with a free-list for
/// slot reuse. When a slot is freed, it's added to the free-list so
/// future writes of equal or smaller size can reclaim the space without
/// advancing the bump cursor.
///
/// Not `Send` or `Sync` — the raw `base` pointer field suppresses both
/// auto-traits, keeping the region single-thread owned.
pub struct OverflowRegion {
    /// Path to the overflow file (used for debugging/diagnostics).
    path: PathBuf,
    /// File descriptor wrapped in Arc for drop safety; held for the lifetime
    /// of the mapping so the fd closes only after munmap in `Drop`.
    _fd: Arc<std::fs::File>,
    /// mmap'd region. null if not yet mapped.
    base: *mut u8,
    /// Current capacity of the mmap in bytes (kept equal to the file length).
    capacity: usize,
    /// Bump pointer: next write starts here.
    cursor: usize,
    /// Slot metadata, indexed by the slot id returned from `write()`.
    slots: Vec<OverflowSlot>,
    /// Free-list: indices of freed slots. Unordered — `try_reuse_slot` does a
    /// linear best-fit scan (smallest slot that fits), it is NOT kept sorted.
    free_list: Vec<usize>,
    /// Maximum capacity (prevents unbounded growth).
    max_capacity: usize,
    /// Last madvise hint applied to `base` (for observability).
    madvise_state: Option<libc::c_int>,
}
89
90impl OverflowRegion {
91    /// Default initial mmap size.
92    ///
93    /// Corresponds to `MemoryTuning::overflow_initial_bytes`.
94    pub const DEFAULT_INITIAL_CAPACITY: usize = 64 * 1024 * 1024; // 64 MiB
95
96    /// Default maximum capacity (prevents unbounded growth).
97    ///
98    /// Corresponds to `MemoryTuning::overflow_max_bytes`.
99    pub const DEFAULT_MAX_CAPACITY: usize = 1024 * 1024 * 1024; // 1 GiB
100
101    /// Open or create an overflow region at the given path.
102    ///
103    /// If the file doesn't exist, it's created with the initial capacity.
104    /// If the file exists, it's mapped at current size.
105    pub fn open(path: &Path) -> Result<Self> {
106        Self::open_with_capacity(path, Self::DEFAULT_INITIAL_CAPACITY)
107    }
108
109    /// Open or create an overflow region with explicit initial and maximum capacity.
110    ///
111    /// Use this when applying runtime config from `MemoryTuning`.
112    pub fn open_with_config(
113        path: &Path,
114        initial_capacity: usize,
115        max_capacity: usize,
116    ) -> Result<Self> {
117        let mut region = Self::open_with_capacity(path, initial_capacity)?;
118        region.max_capacity = max_capacity;
119        Ok(region)
120    }
121
122    /// Open or create an overflow region with a specific initial capacity.
123    pub fn open_with_capacity(path: &Path, initial_capacity: usize) -> Result<Self> {
124        let fd = std::fs::OpenOptions::new()
125            .read(true)
126            .write(true)
127            .create(true)
128            .truncate(false)
129            .open(path)
130            .map_err(|e| MemError::Overflow(format!("failed to open overflow file: {e}")))?;
131
132        let current_size = fd
133            .metadata()
134            .map_err(|e| MemError::Overflow(format!("failed to get file metadata: {e}")))?
135            .len() as usize;
136
137        let capacity = if current_size == 0 {
138            // New file — truncate to initial capacity.
139            fd.set_len(initial_capacity as u64)
140                .map_err(|e| MemError::Overflow(format!("failed to truncate file: {e}")))?;
141            initial_capacity
142        } else {
143            current_size
144        };
145
146        // SAFETY: We pass null to let the kernel choose the mapping address.
147        // `capacity` is non-zero (either from file metadata or `initial_capacity`).
148        // `fd` is a valid, open file descriptor with read/write permissions.
149        // MAP_SHARED is correct for file-backed overflow that may be read by other
150        // cores via separate read-only mappings. We check for MAP_FAILED below.
151        let base = unsafe {
152            libc::mmap(
153                std::ptr::null_mut(),
154                capacity,
155                libc::PROT_READ | libc::PROT_WRITE,
156                libc::MAP_SHARED,
157                fd.as_raw_fd(),
158                0,
159            )
160        };
161
162        if base == libc::MAP_FAILED {
163            return Err(MemError::Overflow(
164                "failed to mmap overflow region".to_string(),
165            ));
166        }
167
168        // Overflow spill is written in per-slot bursts and read from
169        // potentially any core. Access is scattered, so MADV_NORMAL readahead
170        // wastes page cache on adjacent slots unlikely to be read together.
171        let madvise_state = advise_random(base as *mut u8, capacity, path);
172
173        Ok(Self {
174            path: path.to_path_buf(),
175            _fd: Arc::new(fd),
176            base: base as *mut u8,
177            capacity,
178            cursor: 0,
179            slots: Vec::new(),
180            free_list: Vec::new(),
181            max_capacity: Self::DEFAULT_MAX_CAPACITY,
182            madvise_state,
183        })
184    }
185
186    /// The madvise hint currently in effect on the mapped region.
187    pub fn madvise_state(&self) -> Option<libc::c_int> {
188        self.madvise_state
189    }
190
191    /// Write data to the overflow region and return the slot index.
192    ///
193    /// First attempts to reuse a freed slot from the free-list (best-fit).
194    /// Falls back to bump allocation if no suitable free slot exists.
195    pub fn write(&mut self, data: &[u8], engine: EngineId) -> Result<usize> {
196        // Try to reuse a freed slot that fits this data.
197        if let Some(reused) = self.try_reuse_slot(data, engine) {
198            return Ok(reused);
199        }
200
201        // Bump allocation path.
202        let required = self.cursor + data.len();
203
204        // Check if we need to grow.
205        if required > self.capacity {
206            self.grow(required)?;
207        }
208
209        // SAFETY: `self.base` is non-null (checked at construction, and after every
210        // mremap in `grow`). `self.cursor + data.len() <= self.capacity` is guaranteed
211        // because we called `grow(required)` above when `required > capacity`.
212        // The source (`data`) and destination (`base + cursor`) cannot overlap because
213        // `data` is a caller-owned slice and `base` is an mmap'd region.
214        unsafe {
215            std::ptr::copy_nonoverlapping(data.as_ptr(), self.base.add(self.cursor), data.len());
216        }
217
218        // Record slot.
219        let slot_index = self.slots.len();
220        self.slots.push(OverflowSlot {
221            offset: self.cursor,
222            size: data.len(),
223            engine,
224            occupied: true,
225        });
226
227        self.cursor += data.len();
228
229        Ok(slot_index)
230    }
231
232    /// Try to reuse a freed slot from the free-list.
233    ///
234    /// Uses best-fit: finds the smallest free slot that can hold `data`.
235    /// This minimizes internal fragmentation.
236    fn try_reuse_slot(&mut self, data: &[u8], engine: EngineId) -> Option<usize> {
237        if self.free_list.is_empty() {
238            return None;
239        }
240
241        // Find best-fit: smallest free slot >= data.len().
242        let mut best_idx = None;
243        let mut best_waste = usize::MAX;
244
245        for (fl_idx, &slot_idx) in self.free_list.iter().enumerate() {
246            let slot_size = self.slots[slot_idx].size;
247            if slot_size >= data.len() {
248                let waste = slot_size - data.len();
249                if waste < best_waste {
250                    best_waste = waste;
251                    best_idx = Some(fl_idx);
252                }
253            }
254        }
255
256        let fl_idx = best_idx?;
257        let slot_index = self.free_list.swap_remove(fl_idx);
258        let slot = &mut self.slots[slot_index];
259
260        // SAFETY: The slot's offset and size were validated when originally
261        // written. The slot is marked unoccupied (checked by free_list membership).
262        // data.len() <= slot.size (checked above). base is non-null.
263        unsafe {
264            std::ptr::copy_nonoverlapping(data.as_ptr(), self.base.add(slot.offset), data.len());
265        }
266
267        slot.occupied = true;
268        slot.engine = engine;
269        // Keep original slot.size — the allocated region doesn't shrink.
270        // Internal fragmentation (slot.size - data.len()) is acceptable
271        // to avoid splitting complexity.
272
273        Some(slot_index)
274    }
275
276    /// Read data from a slot (returns a slice into the mmap).
277    pub fn read(&self, slot_index: usize) -> Result<&[u8]> {
278        let slot = self
279            .slots
280            .get(slot_index)
281            .ok_or_else(|| MemError::Overflow(format!("invalid slot index: {slot_index}")))?;
282
283        if !slot.occupied {
284            return Err(MemError::Overflow(format!(
285                "slot {slot_index} is not occupied"
286            )));
287        }
288
289        // SAFETY: `self.base` is non-null. `slot.offset + slot.size` is within
290        // `self.capacity` because slots are only created by `write()` which enforces
291        // this via the grow check. After `mremap` (MREMAP_MAYMOVE), `self.base` is
292        // updated to the new address, so all prior slot offsets remain valid within
293        // the (potentially relocated) region. The slot is confirmed `occupied` above.
294        let slice = unsafe { std::slice::from_raw_parts(self.base.add(slot.offset), slot.size) };
295
296        Ok(slice)
297    }
298
299    /// Mark a slot as freed and add it to the free-list for reuse.
300    pub fn free(&mut self, slot_index: usize) -> Result<()> {
301        let slot = self
302            .slots
303            .get_mut(slot_index)
304            .ok_or_else(|| MemError::Overflow(format!("invalid slot index: {slot_index}")))?;
305
306        if !slot.occupied {
307            return Err(MemError::Overflow(format!(
308                "slot {slot_index} is already freed"
309            )));
310        }
311
312        slot.occupied = false;
313        self.free_list.push(slot_index);
314        Ok(())
315    }
316
317    /// Current utilization of the mmap region in bytes.
318    pub fn used_bytes(&self) -> usize {
319        self.cursor
320    }
321
322    /// Total capacity of the mmap region in bytes.
323    pub fn capacity(&self) -> usize {
324        self.capacity
325    }
326
327    /// Path to the backing overflow file.
328    pub fn path(&self) -> &Path {
329        &self.path
330    }
331
332    /// Number of slots (occupied or freed).
333    pub fn slot_count(&self) -> usize {
334        self.slots.len()
335    }
336
337    /// Grow the mmap region to accommodate at least `required` bytes.
338    fn grow(&mut self, required: usize) -> Result<()> {
339        let new_capacity = (self.capacity * 2).max(required);
340
341        if new_capacity > self.max_capacity {
342            return Err(MemError::Overflow(format!(
343                "overflow region would exceed max capacity: {} > {}",
344                new_capacity, self.max_capacity
345            )));
346        }
347
348        // SAFETY: `self._fd` is a valid open file descriptor (kept alive by Arc).
349        // `new_capacity` has been validated to be <= `self.max_capacity`.
350        unsafe {
351            if libc::ftruncate(self._fd.as_raw_fd(), new_capacity as libc::off_t) != 0 {
352                return Err(MemError::Overflow(
353                    "failed to truncate file for growth".to_string(),
354                ));
355            }
356        }
357
358        // SAFETY: `self.base` is a valid mmap'd pointer with size `self.capacity`
359        // (established at construction or the last successful mremap). `new_capacity`
360        // is the ftruncate'd file size. MREMAP_MAYMOVE allows the kernel to relocate
361        // the mapping; we update `self.base` below so all subsequent accesses use the
362        // new address. No other thread accesses this region (!Send + !Sync).
363        let new_base = unsafe {
364            libc::mremap(
365                self.base as *mut libc::c_void,
366                self.capacity,
367                new_capacity,
368                libc::MREMAP_MAYMOVE,
369            )
370        };
371
372        if new_base == libc::MAP_FAILED {
373            return Err(MemError::Overflow(
374                "failed to remap overflow region".to_string(),
375            ));
376        }
377
378        self.base = new_base as *mut u8;
379        self.capacity = new_capacity;
380
381        // mremap produces a fresh mapping that inherits no advice — re-advise
382        // so growth doesn't silently regress to MADV_NORMAL.
383        self.madvise_state = advise_random(self.base, self.capacity, &self.path);
384
385        Ok(())
386    }
387}
388
389impl Drop for OverflowRegion {
390    fn drop(&mut self) {
391        // SAFETY: `self.base` was obtained from a successful `mmap` or `mremap` call,
392        // and `self.capacity` matches the current mapping size. The null check guards
393        // against double-unmap (though this should never happen in normal operation).
394        unsafe {
395            if !self.base.is_null() {
396                let _ = libc::munmap(self.base as *mut libc::c_void, self.capacity);
397            }
398        }
399    }
400}
401
// NOTE: OverflowRegion is intentionally !Send and !Sync. No `unsafe impl`
// is needed (or present): the raw `*mut u8` field suppresses both auto-traits.
// The pointer is only safe to access from the single thread that owns the
// region (the core that created it). This enforces the TPC invariant: no
// cross-core sharing of mutable state.
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic smoke test: a fresh region is empty and a single write
    /// advances the bump cursor and allocates slot 0.
    #[test]
    fn create_and_write() {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let path = dir.path().join("overflow.mmap");

        let mut region = OverflowRegion::open(&path).expect("failed to open region");
        assert_eq!(region.used_bytes(), 0);
        assert!(region.capacity() > 0);

        let data = b"hello, world!";
        let slot_idx = region
            .write(data, EngineId::Vector)
            .expect("failed to write");

        assert_eq!(region.used_bytes(), data.len());
        assert_eq!(slot_idx, 0);
        assert_eq!(region.slot_count(), 1);
    }

    #[test]
    fn regrowth_re_advises_random_after_mremap() {
        // Regression coverage: mremap-grown spill regions
        // inherited no advice, silently regressing to MADV_NORMAL. Private
        // to this file because it drives the internal bump allocator until
        // `grow()` fires — no public API exposes the growth path directly.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("regrow.mmap");

        let mut region = OverflowRegion::open_with_capacity(&path, 4096).unwrap();
        // Snapshot the global counter first: tests run in parallel, so only
        // the before/after delta is meaningful, not the absolute value.
        let before = test_hooks::madv_random_count();
        // open() already advised once.
        assert_eq!(region.madvise_state(), Some(libc::MADV_RANDOM));

        // Write 4096+ bytes to force grow().
        // (The first write fills the 4096-byte region exactly; the second
        // overflows it and triggers the mremap growth path.)
        let blob = vec![0xABu8; 4096];
        region.write(&blob, EngineId::Vector).unwrap();
        region.write(&blob, EngineId::Vector).unwrap();

        assert!(
            region.capacity() > 4096,
            "test must have triggered at least one regrowth"
        );
        let after = test_hooks::madv_random_count();
        assert!(
            after > before,
            "mremap-grown region must re-advise MADV_RANDOM (before={before}, after={after})"
        );
        assert_eq!(region.madvise_state(), Some(libc::MADV_RANDOM));
    }

    /// Two writes land in distinct slots and read back byte-identical.
    #[test]
    fn write_and_read_roundtrip() {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let path = dir.path().join("overflow.mmap");

        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        let data1 = b"first";
        let data2 = b"second";

        let slot1 = region
            .write(data1, EngineId::Vector)
            .expect("failed to write slot 1");
        let slot2 = region
            .write(data2, EngineId::Sparse)
            .expect("failed to write slot 2");

        assert_eq!(slot1, 0);
        assert_eq!(slot2, 1);

        let read1 = region.read(slot1).expect("failed to read slot 1");
        let read2 = region.read(slot2).expect("failed to read slot 2");

        assert_eq!(read1, data1);
        assert_eq!(read2, data2);
    }

    /// Freeing a slot makes subsequent reads of it fail.
    #[test]
    fn free_slot() {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let path = dir.path().join("overflow.mmap");

        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        let slot = region
            .write(b"data", EngineId::Vector)
            .expect("failed to write");

        // Should read successfully before free.
        assert!(region.read(slot).is_ok());

        // Free the slot.
        region.free(slot).expect("failed to free slot");

        // Should fail after free.
        assert!(region.read(slot).is_err());
    }

    /// A write larger than the initial capacity triggers growth and the
    /// data survives the (possibly relocating) mremap.
    #[test]
    fn grow_region() {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let path = dir.path().join("overflow.mmap");

        let initial = 1024; // 1 KiB for testing
        let mut region =
            OverflowRegion::open_with_capacity(&path, initial).expect("failed to open region");

        assert_eq!(region.capacity(), initial);

        // Write data larger than initial capacity.
        let large_data = vec![0u8; initial * 2];
        let slot = region
            .write(&large_data, EngineId::Vector)
            .expect("failed to write large data");

        // Region should have grown.
        assert!(region.capacity() > initial);

        // Should still be readable.
        let read_back = region.read(slot).expect("failed to read after growth");
        assert_eq!(read_back.len(), large_data.len());
        assert_eq!(read_back, &large_data[..]);
    }

    /// Out-of-range slot indices are rejected by both read and free.
    #[test]
    fn invalid_slot_index() {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let path = dir.path().join("overflow.mmap");

        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        // Try to read non-existent slot.
        assert!(region.read(999).is_err());

        // Try to free non-existent slot.
        assert!(region.free(999).is_err());
    }

    /// A write that fits in a freed slot reuses it (best-fit) instead of
    /// advancing the bump cursor.
    #[test]
    fn free_list_reuse() {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let path = dir.path().join("overflow.mmap");

        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        // Write three slots.
        let s0 = region.write(b"aaaa", EngineId::Vector).expect("write s0");
        let s1 = region.write(b"bbbb", EngineId::Sparse).expect("write s1");
        let _s2 = region.write(b"cccc", EngineId::Vector).expect("write s2");

        let cursor_before = region.used_bytes();

        // Free s0 and s1.
        region.free(s0).expect("free s0");
        region.free(s1).expect("free s1");

        // Write new data that fits in a freed slot.
        let s3 = region.write(b"dd", EngineId::Sparse).expect("write s3");

        // Should have reused a freed slot, not advanced the cursor.
        assert_eq!(region.used_bytes(), cursor_before);
        // s3 should reuse s0 or s1 (best-fit: both are 4 bytes, either works).
        assert!(s3 == s0 || s3 == s1);

        // Data should be readable. Only the first 2 bytes are asserted
        // because the reused slot keeps its original 4-byte size.
        let data = region.read(s3).expect("read s3");
        assert_eq!(&data[..2], b"dd");
    }

    /// Freeing the same slot twice is an error, not a silent no-op.
    #[test]
    fn double_free_is_error() {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let path = dir.path().join("overflow.mmap");

        let mut region = OverflowRegion::open(&path).expect("failed to open region");
        let slot = region.write(b"data", EngineId::Vector).expect("write");
        region.free(slot).expect("first free");
        assert!(region.free(slot).is_err());
    }

    /// Slot metadata (size, engine, occupancy) reflects each write and free.
    #[test]
    fn slot_metadata() {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let path = dir.path().join("overflow.mmap");

        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        let slot1 = region
            .write(b"abc", EngineId::Vector)
            .expect("failed to write");
        let slot2 = region
            .write(b"defgh", EngineId::Sparse)
            .expect("failed to write");

        let s1 = &region.slots[slot1];
        let s2 = &region.slots[slot2];

        assert_eq!(s1.size, 3);
        assert_eq!(s1.engine, EngineId::Vector);
        assert!(s1.occupied);

        assert_eq!(s2.size, 5);
        assert_eq!(s2.engine, EngineId::Sparse);
        assert!(s2.occupied);

        region.free(slot1).expect("failed to free");
        assert!(!region.slots[slot1].occupied);
    }
}