//! Page-aligned and NUMA-aware buffer types (`lnc_core/buffer.rs`).
1use crate::{LanceError, Result};
2use std::alloc::{Layout, alloc, dealloc};
3use std::ptr::NonNull;
4
5const PAGE_SIZE: usize = 4096;
6
/// A heap buffer whose backing allocation is aligned to `PAGE_SIZE`.
///
/// Alignment of the *data* is guaranteed by the `Layout` used in
/// [`AlignedBuffer::new`]; the handle struct itself needs no
/// `align(4096)` repr (which would pad every 24-byte handle out to a
/// full page, including the one embedded in `NumaAlignedBuffer`).
pub struct AlignedBuffer {
    /// Page-aligned pointer to the start of the allocation.
    ptr: NonNull<u8>,
    /// Number of valid bytes; always `<= capacity`.
    len: usize,
    /// Total allocated size in bytes; a multiple of `PAGE_SIZE`.
    capacity: usize,
}

14impl AlignedBuffer {
15    /// Create a new aligned buffer with the given capacity.
16    ///
17    /// # Errors
18    /// Returns an error if memory allocation fails.
19    pub fn new(capacity: usize) -> Result<Self> {
20        let aligned_capacity = (capacity + PAGE_SIZE - 1) & !(PAGE_SIZE - 1);
21        let layout = Layout::from_size_align(aligned_capacity, PAGE_SIZE)
22            .map_err(|_| LanceError::NumaAllocFailed(0))?;
23
24        // SAFETY: Layout is valid (checked above), alloc returns valid ptr or null
25        let ptr = unsafe {
26            let raw = alloc(layout);
27            if raw.is_null() {
28                return Err(LanceError::NumaAllocFailed(0));
29            }
30            // SAFETY: We just verified raw is not null
31            NonNull::new_unchecked(raw)
32        };
33
34        Ok(Self {
35            ptr,
36            len: 0,
37            capacity: aligned_capacity,
38        })
39    }
40
41    #[inline]
42    #[must_use]
43    pub fn as_ptr(&self) -> *const u8 {
44        self.ptr.as_ptr()
45    }
46
47    #[inline]
48    pub fn as_mut_ptr(&mut self) -> *mut u8 {
49        self.ptr.as_ptr()
50    }
51
52    #[inline]
53    #[must_use]
54    pub fn as_slice(&self) -> &[u8] {
55        // SAFETY: ptr is valid, len <= capacity, and memory is initialized
56        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
57    }
58
59    #[inline]
60    pub fn as_mut_slice(&mut self) -> &mut [u8] {
61        // SAFETY: ptr is valid, len <= capacity, and we have exclusive access
62        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
63    }
64
65    #[inline]
66    #[must_use]
67    pub fn len(&self) -> usize {
68        self.len
69    }
70
71    #[inline]
72    #[must_use]
73    pub fn is_empty(&self) -> bool {
74        self.len == 0
75    }
76
77    #[inline]
78    #[must_use]
79    pub fn capacity(&self) -> usize {
80        self.capacity
81    }
82
83    #[inline]
84    pub fn set_len(&mut self, len: usize) {
85        debug_assert!(len <= self.capacity);
86        self.len = len.min(self.capacity);
87    }
88
89    /// Write data at the specified offset.
90    ///
91    /// # Errors
92    /// Returns an error if write exceeds buffer capacity.
93    pub fn write(&mut self, offset: usize, data: &[u8]) -> Result<()> {
94        if offset + data.len() > self.capacity {
95            return Err(LanceError::IndexOutOfBounds(format!(
96                "write at offset {} with len {} exceeds capacity {}",
97                offset,
98                data.len(),
99                self.capacity
100            )));
101        }
102
103        // SAFETY: Bounds checked above, ptr is valid, regions don't overlap
104        unsafe {
105            std::ptr::copy_nonoverlapping(data.as_ptr(), self.ptr.as_ptr().add(offset), data.len());
106        }
107
108        if offset + data.len() > self.len {
109            self.len = offset + data.len();
110        }
111
112        Ok(())
113    }
114
115    pub fn clear(&mut self) {
116        self.len = 0;
117    }
118
119    /// Lock the buffer in memory using mlock to prevent page faults.
120    /// This ensures the buffer pages are resident in RAM and won't be swapped.
121    ///
122    /// # Errors
123    /// Returns an error if mlock fails (e.g., insufficient privileges or limits).
124    #[cfg(target_os = "linux")]
125    pub fn mlock(&self) -> Result<()> {
126        // SAFETY: ptr is valid and capacity is the correct size
127        let result =
128            unsafe { libc::mlock(self.ptr.as_ptr() as *const libc::c_void, self.capacity) };
129        if result != 0 {
130            return Err(LanceError::MlockFailed(
131                std::io::Error::last_os_error().to_string(),
132            ));
133        }
134        Ok(())
135    }
136
137    /// Lock the buffer in memory (no-op on non-Linux platforms).
138    ///
139    /// # Errors
140    ///
141    /// Returns `Ok(())` on non-Linux platforms (no-op).
142    #[cfg(not(target_os = "linux"))]
143    pub fn mlock(&self) -> Result<()> {
144        Ok(())
145    }
146
147    /// Prefault all pages in the buffer by touching each page.
148    /// This ensures pages are allocated and mapped before use on the hot path.
149    pub fn prefault(&mut self) {
150        let page_count = self.capacity.div_ceil(PAGE_SIZE);
151        // SAFETY: We touch each page within our allocated capacity
152        unsafe {
153            let ptr = self.ptr.as_ptr();
154            for i in 0..page_count {
155                // Write a zero byte to each page to fault it in
156                std::ptr::write_volatile(ptr.add(i * PAGE_SIZE), 0);
157            }
158        }
159    }
160
161    /// Lock and prefault the buffer for zero-latency access on hot path.
162    ///
163    /// # Errors
164    /// Returns an error if mlock fails.
165    pub fn lock_and_prefault(&mut self) -> Result<()> {
166        self.prefault();
167        self.mlock()
168    }
169}
170
171impl Drop for AlignedBuffer {
172    fn drop(&mut self) {
173        // SAFETY: Layout was valid during allocation, so it must be valid here
174        if let Ok(layout) = Layout::from_size_align(self.capacity, PAGE_SIZE) {
175            unsafe {
176                dealloc(self.ptr.as_ptr(), layout);
177            }
178        }
179    }
180}
181
182unsafe impl Send for AlignedBuffer {}
183unsafe impl Sync for AlignedBuffer {}
184
185pub struct NumaAlignedBuffer {
186    inner: AlignedBuffer,
187    numa_node: usize,
188}
189
190impl NumaAlignedBuffer {
191    /// Create a new NUMA-aligned buffer.
192    ///
193    /// # Errors
194    /// Returns an error if NUMA allocation fails.
195    #[cfg(target_os = "linux")]
196    pub fn new(capacity: usize, numa_node: usize) -> Result<Self> {
197        let aligned_capacity = (capacity + PAGE_SIZE - 1) & !(PAGE_SIZE - 1);
198
199        // SAFETY: mmap with MAP_ANONYMOUS creates new private mapping,
200        // mbind binds pages to NUMA node, munmap cleans up on failure
201        let ptr = unsafe {
202            let raw = libc::mmap(
203                std::ptr::null_mut(),
204                aligned_capacity,
205                libc::PROT_READ | libc::PROT_WRITE,
206                libc::MAP_PRIVATE | libc::MAP_ANONYMOUS,
207                -1,
208                0,
209            );
210
211            if raw == libc::MAP_FAILED {
212                return Err(LanceError::NumaAllocFailed(numa_node));
213            }
214
215            let nodemask: u64 = 1u64 << numa_node;
216            let result = libc::syscall(
217                libc::SYS_mbind,
218                raw,
219                aligned_capacity,
220                1i32,
221                std::ptr::addr_of!(nodemask),
222                64usize,
223                0i32,
224            );
225
226            if result != 0 {
227                libc::munmap(raw, aligned_capacity);
228                return Err(LanceError::NumaAllocFailed(numa_node));
229            }
230
231            NonNull::new(raw.cast::<u8>()).ok_or(LanceError::NumaAllocFailed(numa_node))?
232        };
233
234        Ok(Self {
235            inner: AlignedBuffer {
236                ptr,
237                len: 0,
238                capacity: aligned_capacity,
239            },
240            numa_node,
241        })
242    }
243
244    /// Create a new NUMA-aligned buffer.
245    ///
246    /// # Errors
247    /// Returns an error if NUMA allocation fails.
248    #[cfg(not(target_os = "linux"))]
249    pub fn new(capacity: usize, numa_node: usize) -> Result<Self> {
250        Ok(Self {
251            inner: AlignedBuffer::new(capacity)?,
252            numa_node,
253        })
254    }
255
256    #[inline]
257    #[must_use]
258    pub fn numa_node(&self) -> usize {
259        self.numa_node
260    }
261
262    #[inline]
263    #[must_use]
264    pub fn as_ptr(&self) -> *const u8 {
265        self.inner.as_ptr()
266    }
267
268    #[inline]
269    pub fn as_mut_ptr(&mut self) -> *mut u8 {
270        self.inner.as_mut_ptr()
271    }
272
273    #[inline]
274    #[must_use]
275    pub fn as_slice(&self) -> &[u8] {
276        self.inner.as_slice()
277    }
278
279    #[inline]
280    pub fn as_mut_slice(&mut self) -> &mut [u8] {
281        self.inner.as_mut_slice()
282    }
283
284    #[inline]
285    #[must_use]
286    pub fn len(&self) -> usize {
287        self.inner.len()
288    }
289
290    #[inline]
291    #[must_use]
292    pub fn is_empty(&self) -> bool {
293        self.inner.is_empty()
294    }
295
296    #[inline]
297    #[must_use]
298    pub fn capacity(&self) -> usize {
299        self.inner.capacity()
300    }
301
302    #[inline]
303    pub fn set_len(&mut self, len: usize) {
304        self.inner.set_len(len);
305    }
306
307    /// Write data at the specified offset.
308    ///
309    /// # Errors
310    /// Returns an error if write exceeds buffer capacity.
311    pub fn write(&mut self, offset: usize, data: &[u8]) -> Result<()> {
312        self.inner.write(offset, data)
313    }
314
315    pub fn clear(&mut self) {
316        self.inner.clear();
317    }
318
319    /// Lock the buffer in memory using mlock to prevent page faults.
320    ///
321    /// # Errors
322    /// Returns an error if mlock fails.
323    pub fn mlock(&self) -> Result<()> {
324        self.inner.mlock()
325    }
326
327    /// Prefault all pages in the buffer by touching each page.
328    pub fn prefault(&mut self) {
329        self.inner.prefault();
330    }
331
332    /// Lock and prefault the buffer for zero-latency access on hot path.
333    ///
334    /// # Errors
335    /// Returns an error if mlock fails.
336    pub fn lock_and_prefault(&mut self) -> Result<()> {
337        self.inner.lock_and_prefault()
338    }
339}
340
341#[cfg(target_os = "linux")]
342impl Drop for NumaAlignedBuffer {
343    fn drop(&mut self) {
344        unsafe {
345            libc::munmap(
346                self.inner.ptr.as_ptr().cast::<libc::c_void>(),
347                self.inner.capacity,
348            );
349        }
350        self.inner.capacity = 0;
351    }
352}
353
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_aligned_buffer_creation() {
        let buffer = AlignedBuffer::new(8192).unwrap();
        // The allocation itself must start on a page boundary.
        assert!(buffer.as_ptr() as usize % PAGE_SIZE == 0);
        assert!(buffer.capacity() >= 8192);
    }

    #[test]
    fn test_aligned_buffer_write() {
        let mut buffer = AlignedBuffer::new(4096).unwrap();
        buffer.write(0, b"hello world").unwrap();
        assert_eq!(&buffer.as_slice()[..11], b"hello world");
        // `write` extends `len` to cover the written range.
        assert_eq!(buffer.len(), 11);
    }

    #[test]
    fn test_aligned_buffer_bounds() {
        let mut buffer = AlignedBuffer::new(4096).unwrap();
        // Spans past the end of the (single-page) buffer.
        let result = buffer.write(4090, &[0u8; 100]);
        assert!(result.is_err());
    }

    #[test]
    fn test_aligned_buffer_prefault() {
        let mut buffer = AlignedBuffer::new(8192).unwrap();
        // Prefault should complete without panic.
        buffer.prefault();
        // Verify buffer is still usable.
        buffer.write(0, b"test data after prefault").unwrap();
        assert_eq!(&buffer.as_slice()[..24], b"test data after prefault");
    }

    #[test]
    fn test_aligned_buffer_lock_and_prefault() {
        let mut buffer = AlignedBuffer::new(4096).unwrap();
        // On non-Linux or without privileges, mlock may fail but prefault should work.
        let _ = buffer.lock_and_prefault();
        // Buffer should still be usable regardless.
        buffer.write(0, b"locked data").unwrap();
        assert_eq!(&buffer.as_slice()[..11], b"locked data");
    }
}
399}