// nodedb_mem/overflow.rs
//! mmap-backed overflow region for spilled arena allocations.
//!
//! Each Data Plane core gets its own overflow file. The owning core writes
//! spilled data; other cores may open read-only mmaps for cross-core access.
//! This preserves zero-lock TPC isolation.
//!
//! The region uses a bump allocator within the mmap'd file. When the file fills,
//! it grows via `ftruncate` + `mremap`.

use std::os::fd::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::engine::EngineId;
use crate::error::{MemError, Result};
/// Metadata for one spilled allocation in the overflow region.
///
/// Slots are created by `OverflowRegion::write` and describe a byte range
/// inside the mmap'd file; they never own the bytes themselves.
#[derive(Debug, Clone)]
pub struct OverflowSlot {
    /// Offset within the mmap region.
    pub offset: usize,
    /// Size of the slot in bytes. On free-list reuse this stays at the
    /// original allocation size, so it may exceed the live payload length
    /// (internal fragmentation is accepted to avoid slot splitting).
    pub size: usize,
    /// Engine that owned this allocation.
    pub engine: EngineId,
    /// Whether this slot is occupied (false = freed/compacted).
    pub occupied: bool,
}

/// mmap-backed overflow region owned by a single core.
///
/// Uses a bump allocator within the mmap'd file with a free-list for
/// slot reuse. When a slot is freed, it's added to the free-list so
/// future writes of equal or smaller size can reclaim the space without
/// advancing the bump cursor.
///
/// Not `Send` or `Sync` — it's single-thread owned. (The raw `base`
/// pointer field suppresses both auto traits.)
pub struct OverflowRegion {
    /// Path to the overflow file (used for debugging/diagnostics).
    path: PathBuf,
    /// File descriptor wrapped in Arc for drop safety.
    _fd: Arc<std::fs::File>,
    /// mmap'd region. null if not yet mapped.
    base: *mut u8,
    /// Current capacity of the mmap in bytes.
    capacity: usize,
    /// Bump pointer: next write starts here.
    cursor: usize,
    /// Slot metadata.
    slots: Vec<OverflowSlot>,
    /// Free-list: indices of freed slots, in free order (unsorted).
    /// `try_reuse_slot` performs a linear best-fit scan over this list.
    free_list: Vec<usize>,
    /// Maximum capacity (prevents unbounded growth).
    max_capacity: usize,
}

57impl OverflowRegion {
58    /// Default initial mmap size.
59    ///
60    /// Corresponds to `MemoryTuning::overflow_initial_bytes`.
61    pub const DEFAULT_INITIAL_CAPACITY: usize = 64 * 1024 * 1024; // 64 MiB
62
63    /// Default maximum capacity (prevents unbounded growth).
64    ///
65    /// Corresponds to `MemoryTuning::overflow_max_bytes`.
66    pub const DEFAULT_MAX_CAPACITY: usize = 1024 * 1024 * 1024; // 1 GiB
67
68    /// Open or create an overflow region at the given path.
69    ///
70    /// If the file doesn't exist, it's created with the initial capacity.
71    /// If the file exists, it's mapped at current size.
72    pub fn open(path: &Path) -> Result<Self> {
73        Self::open_with_capacity(path, Self::DEFAULT_INITIAL_CAPACITY)
74    }
75
76    /// Open or create an overflow region with explicit initial and maximum capacity.
77    ///
78    /// Use this when applying runtime config from `MemoryTuning`.
79    pub fn open_with_config(
80        path: &Path,
81        initial_capacity: usize,
82        max_capacity: usize,
83    ) -> Result<Self> {
84        let mut region = Self::open_with_capacity(path, initial_capacity)?;
85        region.max_capacity = max_capacity;
86        Ok(region)
87    }
88
89    /// Open or create an overflow region with a specific initial capacity.
90    pub fn open_with_capacity(path: &Path, initial_capacity: usize) -> Result<Self> {
91        let fd = std::fs::OpenOptions::new()
92            .read(true)
93            .write(true)
94            .create(true)
95            .truncate(false)
96            .open(path)
97            .map_err(|e| MemError::Overflow(format!("failed to open overflow file: {e}")))?;
98
99        let current_size = fd
100            .metadata()
101            .map_err(|e| MemError::Overflow(format!("failed to get file metadata: {e}")))?
102            .len() as usize;
103
104        let capacity = if current_size == 0 {
105            // New file — truncate to initial capacity.
106            fd.set_len(initial_capacity as u64)
107                .map_err(|e| MemError::Overflow(format!("failed to truncate file: {e}")))?;
108            initial_capacity
109        } else {
110            current_size
111        };
112
113        // SAFETY: We pass null to let the kernel choose the mapping address.
114        // `capacity` is non-zero (either from file metadata or `initial_capacity`).
115        // `fd` is a valid, open file descriptor with read/write permissions.
116        // MAP_SHARED is correct for file-backed overflow that may be read by other
117        // cores via separate read-only mappings. We check for MAP_FAILED below.
118        let base = unsafe {
119            libc::mmap(
120                std::ptr::null_mut(),
121                capacity,
122                libc::PROT_READ | libc::PROT_WRITE,
123                libc::MAP_SHARED,
124                fd.as_raw_fd(),
125                0,
126            )
127        };
128
129        if base == libc::MAP_FAILED {
130            return Err(MemError::Overflow(
131                "failed to mmap overflow region".to_string(),
132            ));
133        }
134
135        Ok(Self {
136            path: path.to_path_buf(),
137            _fd: Arc::new(fd),
138            base: base as *mut u8,
139            capacity,
140            cursor: 0,
141            slots: Vec::new(),
142            free_list: Vec::new(),
143            max_capacity: Self::DEFAULT_MAX_CAPACITY,
144        })
145    }
146
147    /// Write data to the overflow region and return the slot index.
148    ///
149    /// First attempts to reuse a freed slot from the free-list (best-fit).
150    /// Falls back to bump allocation if no suitable free slot exists.
151    pub fn write(&mut self, data: &[u8], engine: EngineId) -> Result<usize> {
152        // Try to reuse a freed slot that fits this data.
153        if let Some(reused) = self.try_reuse_slot(data, engine) {
154            return Ok(reused);
155        }
156
157        // Bump allocation path.
158        let required = self.cursor + data.len();
159
160        // Check if we need to grow.
161        if required > self.capacity {
162            self.grow(required)?;
163        }
164
165        // SAFETY: `self.base` is non-null (checked at construction, and after every
166        // mremap in `grow`). `self.cursor + data.len() <= self.capacity` is guaranteed
167        // because we called `grow(required)` above when `required > capacity`.
168        // The source (`data`) and destination (`base + cursor`) cannot overlap because
169        // `data` is a caller-owned slice and `base` is an mmap'd region.
170        unsafe {
171            std::ptr::copy_nonoverlapping(data.as_ptr(), self.base.add(self.cursor), data.len());
172        }
173
174        // Record slot.
175        let slot_index = self.slots.len();
176        self.slots.push(OverflowSlot {
177            offset: self.cursor,
178            size: data.len(),
179            engine,
180            occupied: true,
181        });
182
183        self.cursor += data.len();
184
185        Ok(slot_index)
186    }
187
188    /// Try to reuse a freed slot from the free-list.
189    ///
190    /// Uses best-fit: finds the smallest free slot that can hold `data`.
191    /// This minimizes internal fragmentation.
192    fn try_reuse_slot(&mut self, data: &[u8], engine: EngineId) -> Option<usize> {
193        if self.free_list.is_empty() {
194            return None;
195        }
196
197        // Find best-fit: smallest free slot >= data.len().
198        let mut best_idx = None;
199        let mut best_waste = usize::MAX;
200
201        for (fl_idx, &slot_idx) in self.free_list.iter().enumerate() {
202            let slot_size = self.slots[slot_idx].size;
203            if slot_size >= data.len() {
204                let waste = slot_size - data.len();
205                if waste < best_waste {
206                    best_waste = waste;
207                    best_idx = Some(fl_idx);
208                }
209            }
210        }
211
212        let fl_idx = best_idx?;
213        let slot_index = self.free_list.swap_remove(fl_idx);
214        let slot = &mut self.slots[slot_index];
215
216        // SAFETY: The slot's offset and size were validated when originally
217        // written. The slot is marked unoccupied (checked by free_list membership).
218        // data.len() <= slot.size (checked above). base is non-null.
219        unsafe {
220            std::ptr::copy_nonoverlapping(data.as_ptr(), self.base.add(slot.offset), data.len());
221        }
222
223        slot.occupied = true;
224        slot.engine = engine;
225        // Keep original slot.size — the allocated region doesn't shrink.
226        // Internal fragmentation (slot.size - data.len()) is acceptable
227        // to avoid splitting complexity.
228
229        Some(slot_index)
230    }
231
232    /// Read data from a slot (returns a slice into the mmap).
233    pub fn read(&self, slot_index: usize) -> Result<&[u8]> {
234        let slot = self
235            .slots
236            .get(slot_index)
237            .ok_or_else(|| MemError::Overflow(format!("invalid slot index: {slot_index}")))?;
238
239        if !slot.occupied {
240            return Err(MemError::Overflow(format!(
241                "slot {slot_index} is not occupied"
242            )));
243        }
244
245        // SAFETY: `self.base` is non-null. `slot.offset + slot.size` is within
246        // `self.capacity` because slots are only created by `write()` which enforces
247        // this via the grow check. After `mremap` (MREMAP_MAYMOVE), `self.base` is
248        // updated to the new address, so all prior slot offsets remain valid within
249        // the (potentially relocated) region. The slot is confirmed `occupied` above.
250        let slice = unsafe { std::slice::from_raw_parts(self.base.add(slot.offset), slot.size) };
251
252        Ok(slice)
253    }
254
255    /// Mark a slot as freed and add it to the free-list for reuse.
256    pub fn free(&mut self, slot_index: usize) -> Result<()> {
257        let slot = self
258            .slots
259            .get_mut(slot_index)
260            .ok_or_else(|| MemError::Overflow(format!("invalid slot index: {slot_index}")))?;
261
262        if !slot.occupied {
263            return Err(MemError::Overflow(format!(
264                "slot {slot_index} is already freed"
265            )));
266        }
267
268        slot.occupied = false;
269        self.free_list.push(slot_index);
270        Ok(())
271    }
272
273    /// Current utilization of the mmap region in bytes.
274    pub fn used_bytes(&self) -> usize {
275        self.cursor
276    }
277
278    /// Total capacity of the mmap region in bytes.
279    pub fn capacity(&self) -> usize {
280        self.capacity
281    }
282
283    /// Path to the backing overflow file.
284    pub fn path(&self) -> &Path {
285        &self.path
286    }
287
288    /// Number of slots (occupied or freed).
289    pub fn slot_count(&self) -> usize {
290        self.slots.len()
291    }
292
293    /// Grow the mmap region to accommodate at least `required` bytes.
294    fn grow(&mut self, required: usize) -> Result<()> {
295        let new_capacity = (self.capacity * 2).max(required);
296
297        if new_capacity > self.max_capacity {
298            return Err(MemError::Overflow(format!(
299                "overflow region would exceed max capacity: {} > {}",
300                new_capacity, self.max_capacity
301            )));
302        }
303
304        // SAFETY: `self._fd` is a valid open file descriptor (kept alive by Arc).
305        // `new_capacity` has been validated to be <= `self.max_capacity`.
306        unsafe {
307            if libc::ftruncate(self._fd.as_raw_fd(), new_capacity as libc::off_t) != 0 {
308                return Err(MemError::Overflow(
309                    "failed to truncate file for growth".to_string(),
310                ));
311            }
312        }
313
314        // SAFETY: `self.base` is a valid mmap'd pointer with size `self.capacity`
315        // (established at construction or the last successful mremap). `new_capacity`
316        // is the ftruncate'd file size. MREMAP_MAYMOVE allows the kernel to relocate
317        // the mapping; we update `self.base` below so all subsequent accesses use the
318        // new address. No other thread accesses this region (!Send + !Sync).
319        let new_base = unsafe {
320            libc::mremap(
321                self.base as *mut libc::c_void,
322                self.capacity,
323                new_capacity,
324                libc::MREMAP_MAYMOVE,
325            )
326        };
327
328        if new_base == libc::MAP_FAILED {
329            return Err(MemError::Overflow(
330                "failed to remap overflow region".to_string(),
331            ));
332        }
333
334        self.base = new_base as *mut u8;
335        self.capacity = new_capacity;
336
337        Ok(())
338    }
339}
340
341impl Drop for OverflowRegion {
342    fn drop(&mut self) {
343        // SAFETY: `self.base` was obtained from a successful `mmap` or `mremap` call,
344        // and `self.capacity` matches the current mapping size. The null check guards
345        // against double-unmap (though this should never happen in normal operation).
346        unsafe {
347            if !self.base.is_null() {
348                let _ = libc::munmap(self.base as *mut libc::c_void, self.capacity);
349            }
350        }
351    }
352}
353
// NOTE: OverflowRegion is intentionally !Send and !Sync. It holds a raw
// mutable pointer to an mmap'd region, which suppresses both auto traits;
// no `unsafe impl` is (or should be) provided to restore them. The pointer
// is only safe to access from the single thread that owns the region (the
// core that created it). This enforces the TPC invariant: no cross-core
// sharing of mutable state.

#[cfg(test)]
mod tests {
    use super::*;

    /// Test fixture: a fresh temp dir plus the overflow file path inside it.
    /// The `TempDir` handle is returned so the directory outlives the test body.
    fn scratch() -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let file = dir.path().join("overflow.mmap");
        (dir, file)
    }

    #[test]
    fn create_and_write() {
        let (_dir, path) = scratch();
        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        // Fresh region: empty but already mapped.
        assert_eq!(region.used_bytes(), 0);
        assert!(region.capacity() > 0);

        let payload = b"hello, world!";
        let idx = region
            .write(payload, EngineId::Vector)
            .expect("failed to write");

        assert_eq!(idx, 0);
        assert_eq!(region.used_bytes(), payload.len());
        assert_eq!(region.slot_count(), 1);
    }

    #[test]
    fn write_and_read_roundtrip() {
        let (_dir, path) = scratch();
        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        let first = b"first";
        let second = b"second";

        let a = region
            .write(first, EngineId::Vector)
            .expect("failed to write slot 1");
        let b = region
            .write(second, EngineId::Sparse)
            .expect("failed to write slot 2");

        assert_eq!((a, b), (0, 1));

        assert_eq!(region.read(a).expect("failed to read slot 1"), first);
        assert_eq!(region.read(b).expect("failed to read slot 2"), second);
    }

    #[test]
    fn free_slot() {
        let (_dir, path) = scratch();
        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        let slot = region
            .write(b"data", EngineId::Vector)
            .expect("failed to write");

        // Readable while occupied…
        assert!(region.read(slot).is_ok());

        // …and an error once freed.
        region.free(slot).expect("failed to free slot");
        assert!(region.read(slot).is_err());
    }

    #[test]
    fn grow_region() {
        let (_dir, path) = scratch();

        let initial = 1024; // deliberately tiny so one write forces growth
        let mut region =
            OverflowRegion::open_with_capacity(&path, initial).expect("failed to open region");
        assert_eq!(region.capacity(), initial);

        // A payload twice the initial capacity must trigger growth.
        let payload = vec![0u8; initial * 2];
        let slot = region
            .write(&payload, EngineId::Vector)
            .expect("failed to write large data");

        assert!(region.capacity() > initial);

        let got = region.read(slot).expect("failed to read after growth");
        assert_eq!(got.len(), payload.len());
        assert_eq!(got, &payload[..]);
    }

    #[test]
    fn invalid_slot_index() {
        let (_dir, path) = scratch();
        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        // Out-of-range indices are rejected by both read and free.
        assert!(region.read(999).is_err());
        assert!(region.free(999).is_err());
    }

    #[test]
    fn free_list_reuse() {
        let (_dir, path) = scratch();
        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        let s0 = region.write(b"aaaa", EngineId::Vector).expect("write s0");
        let s1 = region.write(b"bbbb", EngineId::Sparse).expect("write s1");
        let _s2 = region.write(b"cccc", EngineId::Vector).expect("write s2");

        let cursor_before = region.used_bytes();

        region.free(s0).expect("free s0");
        region.free(s1).expect("free s1");

        // A smaller write should land in a freed slot, not bump the cursor.
        let s3 = region.write(b"dd", EngineId::Sparse).expect("write s3");
        assert_eq!(region.used_bytes(), cursor_before);

        // Both freed slots are 4 bytes, so best-fit may legitimately pick either.
        assert!(s3 == s0 || s3 == s1);

        let data = region.read(s3).expect("read s3");
        assert_eq!(&data[..2], b"dd");
    }

    #[test]
    fn double_free_is_error() {
        let (_dir, path) = scratch();
        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        let slot = region.write(b"data", EngineId::Vector).expect("write");
        region.free(slot).expect("first free");
        assert!(region.free(slot).is_err());
    }

    #[test]
    fn slot_metadata() {
        let (_dir, path) = scratch();
        let mut region = OverflowRegion::open(&path).expect("failed to open region");

        let slot1 = region
            .write(b"abc", EngineId::Vector)
            .expect("failed to write");
        let slot2 = region
            .write(b"defgh", EngineId::Sparse)
            .expect("failed to write");

        {
            let meta = &region.slots[slot1];
            assert_eq!(meta.size, 3);
            assert_eq!(meta.engine, EngineId::Vector);
            assert!(meta.occupied);
        }
        {
            let meta = &region.slots[slot2];
            assert_eq!(meta.size, 5);
            assert_eq!(meta.engine, EngineId::Sparse);
            assert!(meta.occupied);
        }

        region.free(slot1).expect("failed to free");
        assert!(!region.slots[slot1].occupied);
    }
}