ruvector_graph/optimization/
memory_pool.rs

1//! Custom memory allocators for graph query execution
2//!
3//! This module provides specialized allocators:
4//! - Arena allocation for query-scoped memory
5//! - Object pooling for frequent allocations
6//! - NUMA-aware allocation for multi-socket systems
7
8use parking_lot::Mutex;
9use std::alloc::{alloc, dealloc, Layout};
10use std::cell::Cell;
11use std::ptr::{self, NonNull};
12use std::sync::Arc;
13
14/// Arena allocator for query execution
15/// All allocations are freed together when the arena is dropped
16pub struct ArenaAllocator {
17    /// Current chunk
18    current: Cell<Option<NonNull<Chunk>>>,
19    /// All chunks (for cleanup)
20    chunks: Mutex<Vec<NonNull<Chunk>>>,
21    /// Default chunk size
22    chunk_size: usize,
23}
24
25struct Chunk {
26    /// Data buffer
27    data: NonNull<u8>,
28    /// Current offset in buffer
29    offset: Cell<usize>,
30    /// Total capacity
31    capacity: usize,
32    /// Next chunk in linked list
33    next: Cell<Option<NonNull<Chunk>>>,
34}
35
36impl ArenaAllocator {
37    /// Create a new arena with default chunk size (1MB)
38    pub fn new() -> Self {
39        Self::with_chunk_size(1024 * 1024)
40    }
41
42    /// Create arena with specific chunk size
43    pub fn with_chunk_size(chunk_size: usize) -> Self {
44        Self {
45            current: Cell::new(None),
46            chunks: Mutex::new(Vec::new()),
47            chunk_size,
48        }
49    }
50
51    /// Allocate memory from the arena
52    pub fn alloc<T>(&self) -> NonNull<T> {
53        let layout = Layout::new::<T>();
54        let ptr = self.alloc_layout(layout);
55        ptr.cast()
56    }
57
58    /// Allocate with specific layout
59    pub fn alloc_layout(&self, layout: Layout) -> NonNull<u8> {
60        let size = layout.size();
61        let align = layout.align();
62
63        // SECURITY: Validate layout parameters
64        assert!(size > 0, "Cannot allocate zero bytes");
65        assert!(
66            align > 0 && align.is_power_of_two(),
67            "Alignment must be a power of 2"
68        );
69        assert!(size <= isize::MAX as usize, "Allocation size too large");
70
71        // Get current chunk or allocate new one
72        let chunk = match self.current.get() {
73            Some(chunk) => chunk,
74            None => {
75                let chunk = self.allocate_chunk();
76                self.current.set(Some(chunk));
77                chunk
78            }
79        };
80
81        unsafe {
82            let chunk_ref = chunk.as_ref();
83            let offset = chunk_ref.offset.get();
84
85            // Align offset
86            let aligned_offset = (offset + align - 1) & !(align - 1);
87
88            // SECURITY: Check for overflow in alignment calculation
89            if aligned_offset < offset {
90                panic!("Alignment calculation overflow");
91            }
92
93            let new_offset = aligned_offset
94                .checked_add(size)
95                .expect("Arena allocation overflow");
96
97            if new_offset > chunk_ref.capacity {
98                // Need a new chunk
99                let new_chunk = self.allocate_chunk();
100                chunk_ref.next.set(Some(new_chunk));
101                self.current.set(Some(new_chunk));
102
103                // Retry allocation with new chunk
104                return self.alloc_layout(layout);
105            }
106
107            chunk_ref.offset.set(new_offset);
108
109            // SECURITY: Verify pointer arithmetic is safe
110            let result_ptr = chunk_ref.data.as_ptr().add(aligned_offset);
111            debug_assert!(
112                result_ptr as usize >= chunk_ref.data.as_ptr() as usize,
113                "Pointer arithmetic underflow"
114            );
115            debug_assert!(
116                result_ptr as usize <= chunk_ref.data.as_ptr().add(chunk_ref.capacity) as usize,
117                "Pointer arithmetic overflow"
118            );
119
120            NonNull::new_unchecked(result_ptr)
121        }
122    }
123
124    /// Allocate a new chunk
125    fn allocate_chunk(&self) -> NonNull<Chunk> {
126        unsafe {
127            let layout = Layout::from_size_align_unchecked(self.chunk_size, 64);
128            let data = NonNull::new_unchecked(alloc(layout));
129
130            let chunk_layout = Layout::new::<Chunk>();
131            let chunk_ptr = alloc(chunk_layout) as *mut Chunk;
132
133            ptr::write(
134                chunk_ptr,
135                Chunk {
136                    data,
137                    offset: Cell::new(0),
138                    capacity: self.chunk_size,
139                    next: Cell::new(None),
140                },
141            );
142
143            let chunk = NonNull::new_unchecked(chunk_ptr);
144            self.chunks.lock().push(chunk);
145            chunk
146        }
147    }
148
149    /// Reset arena (reuse existing chunks)
150    pub fn reset(&self) {
151        let chunks = self.chunks.lock();
152        for &chunk in chunks.iter() {
153            unsafe {
154                chunk.as_ref().offset.set(0);
155                chunk.as_ref().next.set(None);
156            }
157        }
158
159        if let Some(first_chunk) = chunks.first() {
160            self.current.set(Some(*first_chunk));
161        }
162    }
163
164    /// Get total allocated bytes across all chunks
165    pub fn total_allocated(&self) -> usize {
166        self.chunks.lock().len() * self.chunk_size
167    }
168}
169
170impl Default for ArenaAllocator {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176impl Drop for ArenaAllocator {
177    fn drop(&mut self) {
178        let chunks = self.chunks.lock();
179        for &chunk in chunks.iter() {
180            unsafe {
181                let chunk_ref = chunk.as_ref();
182
183                // Deallocate data buffer
184                let data_layout = Layout::from_size_align_unchecked(chunk_ref.capacity, 64);
185                dealloc(chunk_ref.data.as_ptr(), data_layout);
186
187                // Deallocate chunk itself
188                let chunk_layout = Layout::new::<Chunk>();
189                dealloc(chunk.as_ptr() as *mut u8, chunk_layout);
190            }
191        }
192    }
193}
194
195unsafe impl Send for ArenaAllocator {}
196unsafe impl Sync for ArenaAllocator {}
197
198/// Query-scoped arena that resets after each query
199pub struct QueryArena {
200    arena: Arc<ArenaAllocator>,
201}
202
203impl QueryArena {
204    pub fn new() -> Self {
205        Self {
206            arena: Arc::new(ArenaAllocator::new()),
207        }
208    }
209
210    pub fn execute_query<F, R>(&self, f: F) -> R
211    where
212        F: FnOnce(&ArenaAllocator) -> R,
213    {
214        let result = f(&self.arena);
215        self.arena.reset();
216        result
217    }
218
219    pub fn arena(&self) -> &ArenaAllocator {
220        &self.arena
221    }
222}
223
224impl Default for QueryArena {
225    fn default() -> Self {
226        Self::new()
227    }
228}
229
230/// NUMA-aware allocator for multi-socket systems
231pub struct NumaAllocator {
232    /// Allocators per NUMA node
233    node_allocators: Vec<Arc<ArenaAllocator>>,
234    /// Current thread's preferred NUMA node
235    preferred_node: Cell<usize>,
236}
237
238impl NumaAllocator {
239    /// Create NUMA-aware allocator
240    pub fn new() -> Self {
241        let num_nodes = Self::detect_numa_nodes();
242        let node_allocators = (0..num_nodes)
243            .map(|_| Arc::new(ArenaAllocator::new()))
244            .collect();
245
246        Self {
247            node_allocators,
248            preferred_node: Cell::new(0),
249        }
250    }
251
252    /// Detect number of NUMA nodes (simplified)
253    fn detect_numa_nodes() -> usize {
254        // In a real implementation, this would use platform-specific APIs
255        // For now, assume 1 node per 8 CPUs
256        let cpus = num_cpus::get();
257        ((cpus + 7) / 8).max(1)
258    }
259
260    /// Allocate from preferred NUMA node
261    pub fn alloc<T>(&self) -> NonNull<T> {
262        let node = self.preferred_node.get();
263        self.node_allocators[node].alloc()
264    }
265
266    /// Set preferred NUMA node for current thread
267    pub fn set_preferred_node(&self, node: usize) {
268        if node < self.node_allocators.len() {
269            self.preferred_node.set(node);
270        }
271    }
272
273    /// Bind current thread to NUMA node
274    pub fn bind_to_node(&self, node: usize) {
275        self.set_preferred_node(node);
276
277        // In a real implementation, this would use platform-specific APIs
278        // to bind the thread to CPUs on the specified NUMA node
279        #[cfg(target_os = "linux")]
280        {
281            // Would use libnuma or similar
282        }
283    }
284}
285
286impl Default for NumaAllocator {
287    fn default() -> Self {
288        Self::new()
289    }
290}
291
292/// Object pool for reducing allocation overhead
293pub struct ObjectPool<T> {
294    /// Pool of available objects
295    available: Arc<crossbeam::queue::SegQueue<T>>,
296    /// Factory function
297    factory: Arc<dyn Fn() -> T + Send + Sync>,
298    /// Maximum pool size
299    max_size: usize,
300}
301
302impl<T> ObjectPool<T> {
303    pub fn new<F>(max_size: usize, factory: F) -> Self
304    where
305        F: Fn() -> T + Send + Sync + 'static,
306    {
307        Self {
308            available: Arc::new(crossbeam::queue::SegQueue::new()),
309            factory: Arc::new(factory),
310            max_size,
311        }
312    }
313
314    pub fn acquire(&self) -> PooledObject<T> {
315        let object = self.available.pop().unwrap_or_else(|| (self.factory)());
316
317        PooledObject {
318            object: Some(object),
319            pool: Arc::clone(&self.available),
320        }
321    }
322
323    pub fn len(&self) -> usize {
324        self.available.len()
325    }
326
327    pub fn is_empty(&self) -> bool {
328        self.available.is_empty()
329    }
330}
331
332/// RAII wrapper for pooled objects
333pub struct PooledObject<T> {
334    object: Option<T>,
335    pool: Arc<crossbeam::queue::SegQueue<T>>,
336}
337
338impl<T> PooledObject<T> {
339    pub fn get(&self) -> &T {
340        self.object.as_ref().unwrap()
341    }
342
343    pub fn get_mut(&mut self) -> &mut T {
344        self.object.as_mut().unwrap()
345    }
346}
347
348impl<T> Drop for PooledObject<T> {
349    fn drop(&mut self) {
350        if let Some(object) = self.object.take() {
351            let _ = self.pool.push(object);
352        }
353    }
354}
355
356impl<T> std::ops::Deref for PooledObject<T> {
357    type Target = T;
358    fn deref(&self) -> &Self::Target {
359        self.object.as_ref().unwrap()
360    }
361}
362
363impl<T> std::ops::DerefMut for PooledObject<T> {
364    fn deref_mut(&mut self) -> &mut Self::Target {
365        self.object.as_mut().unwrap()
366    }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// Two slots taken from the same arena can be written and read back
    /// independently.
    #[test]
    fn test_arena_allocator() {
        let arena = ArenaAllocator::new();
        let (first, second) = (arena.alloc::<u64>(), arena.alloc::<u64>());

        // SAFETY: both pointers were just handed out by the arena for a `u64`
        // each and the arena is still alive.
        unsafe {
            first.as_ptr().write(42);
            second.as_ptr().write(84);

            assert_eq!(first.as_ptr().read(), 42);
            assert_eq!(second.as_ptr().read(), 84);
        }
    }

    /// Resetting keeps the reserved capacity unchanged.
    #[test]
    fn test_arena_reset() {
        let arena = ArenaAllocator::new();
        (0..100).for_each(|_| {
            arena.alloc::<u64>();
        });

        let allocated_before = arena.total_allocated();
        arena.reset();

        assert_eq!(allocated_before, arena.total_allocated());
    }

    /// A value computed inside a query survives the automatic reset.
    #[test]
    fn test_query_arena() {
        let query_arena = QueryArena::new();

        let result = query_arena.execute_query(|arena| {
            let slot = arena.alloc::<u64>();
            // SAFETY: `slot` points at arena memory reserved for one `u64`
            // and is only used before the closure returns.
            unsafe {
                slot.as_ptr().write(123);
                slot.as_ptr().read()
            }
        });

        assert_eq!(result, 123);
    }

    /// An object handed back to the pool is served again on the next acquire.
    #[test]
    fn test_object_pool() {
        let pool = ObjectPool::new(10, || Vec::<u8>::with_capacity(1024));

        {
            let mut obj = pool.acquire();
            obj.push(42);
            assert_eq!(obj[0], 42);
        }

        let obj2 = pool.acquire();
        assert!(obj2.capacity() >= 1024);
    }
}