1use crate::{LanceError, Result};
2use std::alloc::{Layout, alloc, dealloc};
3use std::ptr::NonNull;
4
/// Allocation granularity: buffers are sized and aligned to 4 KiB pages
/// (the common page size on x86-64/aarch64 Linux). NOTE(review): not
/// queried at runtime — confirm for targets with larger pages.
const PAGE_SIZE: usize = 4096;
6
/// A heap buffer whose backing storage is page-aligned (4 KiB).
///
/// The page alignment of the *data* is guaranteed by the `Layout` used
/// in `AlignedBuffer::new`, not by this struct's representation. The
/// previous `#[repr(C, align(4096))]` only forced every three-word
/// handle value to occupy and align to 4 KiB (bloating stack frames and
/// `Vec<AlignedBuffer>`) without affecting the data pointer at all, so
/// it has been removed.
pub struct AlignedBuffer {
    /// Page-aligned pointer to the start of the allocation.
    ptr: NonNull<u8>,
    /// Number of valid (logically written) bytes; kept <= `capacity`.
    len: usize,
    /// Total allocated size in bytes; a multiple of `PAGE_SIZE`.
    capacity: usize,
}
13
14impl AlignedBuffer {
15 pub fn new(capacity: usize) -> Result<Self> {
20 let aligned_capacity = (capacity + PAGE_SIZE - 1) & !(PAGE_SIZE - 1);
21 let layout = Layout::from_size_align(aligned_capacity, PAGE_SIZE)
22 .map_err(|_| LanceError::NumaAllocFailed(0))?;
23
24 let ptr = unsafe {
26 let raw = alloc(layout);
27 if raw.is_null() {
28 return Err(LanceError::NumaAllocFailed(0));
29 }
30 NonNull::new_unchecked(raw)
32 };
33
34 Ok(Self {
35 ptr,
36 len: 0,
37 capacity: aligned_capacity,
38 })
39 }
40
41 #[inline]
42 #[must_use]
43 pub fn as_ptr(&self) -> *const u8 {
44 self.ptr.as_ptr()
45 }
46
47 #[inline]
48 pub fn as_mut_ptr(&mut self) -> *mut u8 {
49 self.ptr.as_ptr()
50 }
51
52 #[inline]
53 #[must_use]
54 pub fn as_slice(&self) -> &[u8] {
55 unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
57 }
58
59 #[inline]
60 pub fn as_mut_slice(&mut self) -> &mut [u8] {
61 unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
63 }
64
65 #[inline]
66 #[must_use]
67 pub fn len(&self) -> usize {
68 self.len
69 }
70
71 #[inline]
72 #[must_use]
73 pub fn is_empty(&self) -> bool {
74 self.len == 0
75 }
76
77 #[inline]
78 #[must_use]
79 pub fn capacity(&self) -> usize {
80 self.capacity
81 }
82
83 #[inline]
84 pub fn set_len(&mut self, len: usize) {
85 debug_assert!(len <= self.capacity);
86 self.len = len.min(self.capacity);
87 }
88
89 pub fn write(&mut self, offset: usize, data: &[u8]) -> Result<()> {
94 if offset + data.len() > self.capacity {
95 return Err(LanceError::IndexOutOfBounds(format!(
96 "write at offset {} with len {} exceeds capacity {}",
97 offset,
98 data.len(),
99 self.capacity
100 )));
101 }
102
103 unsafe {
105 std::ptr::copy_nonoverlapping(data.as_ptr(), self.ptr.as_ptr().add(offset), data.len());
106 }
107
108 if offset + data.len() > self.len {
109 self.len = offset + data.len();
110 }
111
112 Ok(())
113 }
114
115 pub fn clear(&mut self) {
116 self.len = 0;
117 }
118
119 #[cfg(target_os = "linux")]
125 pub fn mlock(&self) -> Result<()> {
126 let result =
128 unsafe { libc::mlock(self.ptr.as_ptr() as *const libc::c_void, self.capacity) };
129 if result != 0 {
130 return Err(LanceError::MlockFailed(
131 std::io::Error::last_os_error().to_string(),
132 ));
133 }
134 Ok(())
135 }
136
137 #[cfg(not(target_os = "linux"))]
143 pub fn mlock(&self) -> Result<()> {
144 Ok(())
145 }
146
147 pub fn prefault(&mut self) {
150 let page_count = self.capacity.div_ceil(PAGE_SIZE);
151 unsafe {
153 let ptr = self.ptr.as_ptr();
154 for i in 0..page_count {
155 std::ptr::write_volatile(ptr.add(i * PAGE_SIZE), 0);
157 }
158 }
159 }
160
161 pub fn lock_and_prefault(&mut self) -> Result<()> {
166 self.prefault();
167 self.mlock()
168 }
169}
170
171impl Drop for AlignedBuffer {
172 fn drop(&mut self) {
173 if let Ok(layout) = Layout::from_size_align(self.capacity, PAGE_SIZE) {
175 unsafe {
176 dealloc(self.ptr.as_ptr(), layout);
177 }
178 }
179 }
180}
181
// SAFETY: `AlignedBuffer` uniquely owns its allocation and the raw pointer
// is only dereferenced through `&self`/`&mut self` methods, so moving the
// owner across threads is sound. NOTE(review): `Sync` additionally relies
// on all mutation requiring `&mut self` (no interior mutability) — confirm
// that invariant is kept if the type grows new methods.
unsafe impl Send for AlignedBuffer {}
unsafe impl Sync for AlignedBuffer {}
184
/// A page-aligned buffer whose physical pages are bound to a specific
/// NUMA node on Linux (a plain [`AlignedBuffer`] elsewhere).
pub struct NumaAlignedBuffer {
    // Underlying page-aligned buffer. On Linux the storage comes from
    // mmap + mbind rather than the global allocator (see `Drop` below).
    inner: AlignedBuffer,
    // NUMA node the allocation was requested on.
    numa_node: usize,
}
189
190impl NumaAlignedBuffer {
191 #[cfg(target_os = "linux")]
196 pub fn new(capacity: usize, numa_node: usize) -> Result<Self> {
197 let aligned_capacity = (capacity + PAGE_SIZE - 1) & !(PAGE_SIZE - 1);
198
199 let ptr = unsafe {
202 let raw = libc::mmap(
203 std::ptr::null_mut(),
204 aligned_capacity,
205 libc::PROT_READ | libc::PROT_WRITE,
206 libc::MAP_PRIVATE | libc::MAP_ANONYMOUS,
207 -1,
208 0,
209 );
210
211 if raw == libc::MAP_FAILED {
212 return Err(LanceError::NumaAllocFailed(numa_node));
213 }
214
215 let nodemask: u64 = 1u64 << numa_node;
216 let result = libc::syscall(
217 libc::SYS_mbind,
218 raw,
219 aligned_capacity,
220 1i32,
221 std::ptr::addr_of!(nodemask),
222 64usize,
223 0i32,
224 );
225
226 if result != 0 {
227 libc::munmap(raw, aligned_capacity);
228 return Err(LanceError::NumaAllocFailed(numa_node));
229 }
230
231 NonNull::new(raw.cast::<u8>()).ok_or(LanceError::NumaAllocFailed(numa_node))?
232 };
233
234 Ok(Self {
235 inner: AlignedBuffer {
236 ptr,
237 len: 0,
238 capacity: aligned_capacity,
239 },
240 numa_node,
241 })
242 }
243
244 #[cfg(not(target_os = "linux"))]
249 pub fn new(capacity: usize, numa_node: usize) -> Result<Self> {
250 Ok(Self {
251 inner: AlignedBuffer::new(capacity)?,
252 numa_node,
253 })
254 }
255
256 #[inline]
257 #[must_use]
258 pub fn numa_node(&self) -> usize {
259 self.numa_node
260 }
261
262 #[inline]
263 #[must_use]
264 pub fn as_ptr(&self) -> *const u8 {
265 self.inner.as_ptr()
266 }
267
268 #[inline]
269 pub fn as_mut_ptr(&mut self) -> *mut u8 {
270 self.inner.as_mut_ptr()
271 }
272
273 #[inline]
274 #[must_use]
275 pub fn as_slice(&self) -> &[u8] {
276 self.inner.as_slice()
277 }
278
279 #[inline]
280 pub fn as_mut_slice(&mut self) -> &mut [u8] {
281 self.inner.as_mut_slice()
282 }
283
284 #[inline]
285 #[must_use]
286 pub fn len(&self) -> usize {
287 self.inner.len()
288 }
289
290 #[inline]
291 #[must_use]
292 pub fn is_empty(&self) -> bool {
293 self.inner.is_empty()
294 }
295
296 #[inline]
297 #[must_use]
298 pub fn capacity(&self) -> usize {
299 self.inner.capacity()
300 }
301
302 #[inline]
303 pub fn set_len(&mut self, len: usize) {
304 self.inner.set_len(len);
305 }
306
307 pub fn write(&mut self, offset: usize, data: &[u8]) -> Result<()> {
312 self.inner.write(offset, data)
313 }
314
315 pub fn clear(&mut self) {
316 self.inner.clear();
317 }
318
319 pub fn mlock(&self) -> Result<()> {
324 self.inner.mlock()
325 }
326
327 pub fn prefault(&mut self) {
329 self.inner.prefault();
330 }
331
332 pub fn lock_and_prefault(&mut self) -> Result<()> {
337 self.inner.lock_and_prefault()
338 }
339}
340
341#[cfg(target_os = "linux")]
342impl Drop for NumaAlignedBuffer {
343 fn drop(&mut self) {
344 unsafe {
345 libc::munmap(
346 self.inner.ptr.as_ptr().cast::<libc::c_void>(),
347 self.inner.capacity,
348 );
349 }
350 self.inner.capacity = 0;
351 }
352}
353
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A fresh buffer hands out a page-aligned pointer and at least the
    /// requested capacity.
    #[test]
    fn test_aligned_buffer_creation() {
        let buffer = AlignedBuffer::new(8192).unwrap();
        let addr = buffer.as_ptr() as usize;
        assert_eq!(addr % PAGE_SIZE, 0);
        assert!(buffer.capacity() >= 8192);
    }

    /// Bytes written at offset 0 read back through the slice view.
    #[test]
    fn test_aligned_buffer_write() {
        let payload = b"hello world";
        let mut buffer = AlignedBuffer::new(4096).unwrap();
        buffer.write(0, payload).unwrap();
        assert_eq!(&buffer.as_slice()[..payload.len()], payload);
    }

    /// A write that would run past the capacity is rejected.
    #[test]
    fn test_aligned_buffer_bounds() {
        let mut buffer = AlignedBuffer::new(4096).unwrap();
        assert!(buffer.write(4090, &[0u8; 100]).is_err());
    }

    /// Prefaulting leaves the buffer fully usable for subsequent writes.
    #[test]
    fn test_aligned_buffer_prefault() {
        let expected = b"test data after prefault";
        let mut buffer = AlignedBuffer::new(8192).unwrap();
        buffer.prefault();
        buffer.write(0, expected).unwrap();
        assert_eq!(&buffer.as_slice()[..expected.len()], expected);
    }

    /// lock_and_prefault may fail (e.g. RLIMIT_MEMLOCK) but must not
    /// corrupt the buffer, so its result is deliberately ignored.
    #[test]
    fn test_aligned_buffer_lock_and_prefault() {
        let mut buffer = AlignedBuffer::new(4096).unwrap();
        let _ = buffer.lock_and_prefault();
        buffer.write(0, b"locked data").unwrap();
        assert_eq!(&buffer.as_slice()[..11], b"locked data");
    }
}