ruvector_graph/optimization/
memory_pool.rs

1//! Custom memory allocators for graph query execution
2//!
3//! This module provides specialized allocators:
4//! - Arena allocation for query-scoped memory
5//! - Object pooling for frequent allocations
6//! - NUMA-aware allocation for distributed systems
7
8use parking_lot::Mutex;
9use std::alloc::{alloc, dealloc, Layout};
10use std::cell::Cell;
11use std::ptr::{self, NonNull};
12use std::sync::Arc;
13
14/// Arena allocator for query execution
15/// All allocations are freed together when the arena is dropped
16pub struct ArenaAllocator {
17    /// Current chunk
18    current: Cell<Option<NonNull<Chunk>>>,
19    /// All chunks (for cleanup)
20    chunks: Mutex<Vec<NonNull<Chunk>>>,
21    /// Default chunk size
22    chunk_size: usize,
23}
24
25struct Chunk {
26    /// Data buffer
27    data: NonNull<u8>,
28    /// Current offset in buffer
29    offset: Cell<usize>,
30    /// Total capacity
31    capacity: usize,
32    /// Next chunk in linked list
33    next: Cell<Option<NonNull<Chunk>>>,
34}
35
36impl ArenaAllocator {
37    /// Create a new arena with default chunk size (1MB)
38    pub fn new() -> Self {
39        Self::with_chunk_size(1024 * 1024)
40    }
41
42    /// Create arena with specific chunk size
43    pub fn with_chunk_size(chunk_size: usize) -> Self {
44        Self {
45            current: Cell::new(None),
46            chunks: Mutex::new(Vec::new()),
47            chunk_size,
48        }
49    }
50
51    /// Allocate memory from the arena
52    pub fn alloc<T>(&self) -> NonNull<T> {
53        let layout = Layout::new::<T>();
54        let ptr = self.alloc_layout(layout);
55        ptr.cast()
56    }
57
58    /// Allocate with specific layout
59    pub fn alloc_layout(&self, layout: Layout) -> NonNull<u8> {
60        let size = layout.size();
61        let align = layout.align();
62
63        // SECURITY: Validate layout parameters
64        assert!(size > 0, "Cannot allocate zero bytes");
65        assert!(align > 0 && align.is_power_of_two(), "Alignment must be a power of 2");
66        assert!(size <= isize::MAX as usize, "Allocation size too large");
67
68        // Get current chunk or allocate new one
69        let chunk = match self.current.get() {
70            Some(chunk) => chunk,
71            None => {
72                let chunk = self.allocate_chunk();
73                self.current.set(Some(chunk));
74                chunk
75            }
76        };
77
78        unsafe {
79            let chunk_ref = chunk.as_ref();
80            let offset = chunk_ref.offset.get();
81
82            // Align offset
83            let aligned_offset = (offset + align - 1) & !(align - 1);
84
85            // SECURITY: Check for overflow in alignment calculation
86            if aligned_offset < offset {
87                panic!("Alignment calculation overflow");
88            }
89
90            let new_offset = aligned_offset.checked_add(size)
91                .expect("Arena allocation overflow");
92
93            if new_offset > chunk_ref.capacity {
94                // Need a new chunk
95                let new_chunk = self.allocate_chunk();
96                chunk_ref.next.set(Some(new_chunk));
97                self.current.set(Some(new_chunk));
98
99                // Retry allocation with new chunk
100                return self.alloc_layout(layout);
101            }
102
103            chunk_ref.offset.set(new_offset);
104
105            // SECURITY: Verify pointer arithmetic is safe
106            let result_ptr = chunk_ref.data.as_ptr().add(aligned_offset);
107            debug_assert!(
108                result_ptr as usize >= chunk_ref.data.as_ptr() as usize,
109                "Pointer arithmetic underflow"
110            );
111            debug_assert!(
112                result_ptr as usize <= chunk_ref.data.as_ptr().add(chunk_ref.capacity) as usize,
113                "Pointer arithmetic overflow"
114            );
115
116            NonNull::new_unchecked(result_ptr)
117        }
118    }
119
120    /// Allocate a new chunk
121    fn allocate_chunk(&self) -> NonNull<Chunk> {
122        unsafe {
123            let layout = Layout::from_size_align_unchecked(self.chunk_size, 64);
124            let data = NonNull::new_unchecked(alloc(layout));
125
126            let chunk_layout = Layout::new::<Chunk>();
127            let chunk_ptr = alloc(chunk_layout) as *mut Chunk;
128
129            ptr::write(
130                chunk_ptr,
131                Chunk {
132                    data,
133                    offset: Cell::new(0),
134                    capacity: self.chunk_size,
135                    next: Cell::new(None),
136                },
137            );
138
139            let chunk = NonNull::new_unchecked(chunk_ptr);
140            self.chunks.lock().push(chunk);
141            chunk
142        }
143    }
144
145    /// Reset arena (reuse existing chunks)
146    pub fn reset(&self) {
147        let chunks = self.chunks.lock();
148        for &chunk in chunks.iter() {
149            unsafe {
150                chunk.as_ref().offset.set(0);
151                chunk.as_ref().next.set(None);
152            }
153        }
154
155        if let Some(first_chunk) = chunks.first() {
156            self.current.set(Some(*first_chunk));
157        }
158    }
159
160    /// Get total allocated bytes across all chunks
161    pub fn total_allocated(&self) -> usize {
162        self.chunks.lock().len() * self.chunk_size
163    }
164}
165
166impl Default for ArenaAllocator {
167    fn default() -> Self {
168        Self::new()
169    }
170}
171
172impl Drop for ArenaAllocator {
173    fn drop(&mut self) {
174        let chunks = self.chunks.lock();
175        for &chunk in chunks.iter() {
176            unsafe {
177                let chunk_ref = chunk.as_ref();
178
179                // Deallocate data buffer
180                let data_layout = Layout::from_size_align_unchecked(chunk_ref.capacity, 64);
181                dealloc(chunk_ref.data.as_ptr(), data_layout);
182
183                // Deallocate chunk itself
184                let chunk_layout = Layout::new::<Chunk>();
185                dealloc(chunk.as_ptr() as *mut u8, chunk_layout);
186            }
187        }
188    }
189}
190
191unsafe impl Send for ArenaAllocator {}
192unsafe impl Sync for ArenaAllocator {}
193
194/// Query-scoped arena that resets after each query
195pub struct QueryArena {
196    arena: Arc<ArenaAllocator>,
197}
198
199impl QueryArena {
200    pub fn new() -> Self {
201        Self {
202            arena: Arc::new(ArenaAllocator::new()),
203        }
204    }
205
206    pub fn execute_query<F, R>(&self, f: F) -> R
207    where
208        F: FnOnce(&ArenaAllocator) -> R,
209    {
210        let result = f(&self.arena);
211        self.arena.reset();
212        result
213    }
214
215    pub fn arena(&self) -> &ArenaAllocator {
216        &self.arena
217    }
218}
219
220impl Default for QueryArena {
221    fn default() -> Self {
222        Self::new()
223    }
224}
225
226/// NUMA-aware allocator for multi-socket systems
227pub struct NumaAllocator {
228    /// Allocators per NUMA node
229    node_allocators: Vec<Arc<ArenaAllocator>>,
230    /// Current thread's preferred NUMA node
231    preferred_node: Cell<usize>,
232}
233
234impl NumaAllocator {
235    /// Create NUMA-aware allocator
236    pub fn new() -> Self {
237        let num_nodes = Self::detect_numa_nodes();
238        let node_allocators = (0..num_nodes)
239            .map(|_| Arc::new(ArenaAllocator::new()))
240            .collect();
241
242        Self {
243            node_allocators,
244            preferred_node: Cell::new(0),
245        }
246    }
247
248    /// Detect number of NUMA nodes (simplified)
249    fn detect_numa_nodes() -> usize {
250        // In a real implementation, this would use platform-specific APIs
251        // For now, assume 1 node per 8 CPUs
252        let cpus = num_cpus::get();
253        ((cpus + 7) / 8).max(1)
254    }
255
256    /// Allocate from preferred NUMA node
257    pub fn alloc<T>(&self) -> NonNull<T> {
258        let node = self.preferred_node.get();
259        self.node_allocators[node].alloc()
260    }
261
262    /// Set preferred NUMA node for current thread
263    pub fn set_preferred_node(&self, node: usize) {
264        if node < self.node_allocators.len() {
265            self.preferred_node.set(node);
266        }
267    }
268
269    /// Bind current thread to NUMA node
270    pub fn bind_to_node(&self, node: usize) {
271        self.set_preferred_node(node);
272
273        // In a real implementation, this would use platform-specific APIs
274        // to bind the thread to CPUs on the specified NUMA node
275        #[cfg(target_os = "linux")]
276        {
277            // Would use libnuma or similar
278        }
279    }
280}
281
282impl Default for NumaAllocator {
283    fn default() -> Self {
284        Self::new()
285    }
286}
287
288/// Object pool for reducing allocation overhead
289pub struct ObjectPool<T> {
290    /// Pool of available objects
291    available: Arc<crossbeam::queue::SegQueue<T>>,
292    /// Factory function
293    factory: Arc<dyn Fn() -> T + Send + Sync>,
294    /// Maximum pool size
295    max_size: usize,
296}
297
298impl<T> ObjectPool<T> {
299    pub fn new<F>(max_size: usize, factory: F) -> Self
300    where
301        F: Fn() -> T + Send + Sync + 'static,
302    {
303        Self {
304            available: Arc::new(crossbeam::queue::SegQueue::new()),
305            factory: Arc::new(factory),
306            max_size,
307        }
308    }
309
310    pub fn acquire(&self) -> PooledObject<T> {
311        let object = self.available.pop().unwrap_or_else(|| (self.factory)());
312
313        PooledObject {
314            object: Some(object),
315            pool: Arc::clone(&self.available),
316        }
317    }
318
319    pub fn len(&self) -> usize {
320        self.available.len()
321    }
322
323    pub fn is_empty(&self) -> bool {
324        self.available.is_empty()
325    }
326}
327
328/// RAII wrapper for pooled objects
329pub struct PooledObject<T> {
330    object: Option<T>,
331    pool: Arc<crossbeam::queue::SegQueue<T>>,
332}
333
334impl<T> PooledObject<T> {
335    pub fn get(&self) -> &T {
336        self.object.as_ref().unwrap()
337    }
338
339    pub fn get_mut(&mut self) -> &mut T {
340        self.object.as_mut().unwrap()
341    }
342}
343
344impl<T> Drop for PooledObject<T> {
345    fn drop(&mut self) {
346        if let Some(object) = self.object.take() {
347            let _ = self.pool.push(object);
348        }
349    }
350}
351
352impl<T> std::ops::Deref for PooledObject<T> {
353    type Target = T;
354    fn deref(&self) -> &Self::Target {
355        self.object.as_ref().unwrap()
356    }
357}
358
359impl<T> std::ops::DerefMut for PooledObject<T> {
360    fn deref_mut(&mut self) -> &mut Self::Target {
361        self.object.as_mut().unwrap()
362    }
363}
364
365#[cfg(test)]
366mod tests {
367    use super::*;
368
369    #[test]
370    fn test_arena_allocator() {
371        let arena = ArenaAllocator::new();
372
373        let ptr1 = arena.alloc::<u64>();
374        let ptr2 = arena.alloc::<u64>();
375
376        unsafe {
377            ptr1.as_ptr().write(42);
378            ptr2.as_ptr().write(84);
379
380            assert_eq!(ptr1.as_ptr().read(), 42);
381            assert_eq!(ptr2.as_ptr().read(), 84);
382        }
383    }
384
385    #[test]
386    fn test_arena_reset() {
387        let arena = ArenaAllocator::new();
388
389        for _ in 0..100 {
390            arena.alloc::<u64>();
391        }
392
393        let allocated_before = arena.total_allocated();
394        arena.reset();
395        let allocated_after = arena.total_allocated();
396
397        assert_eq!(allocated_before, allocated_after);
398    }
399
400    #[test]
401    fn test_query_arena() {
402        let query_arena = QueryArena::new();
403
404        let result = query_arena.execute_query(|arena| {
405            let ptr = arena.alloc::<u64>();
406            unsafe {
407                ptr.as_ptr().write(123);
408                ptr.as_ptr().read()
409            }
410        });
411
412        assert_eq!(result, 123);
413    }
414
415    #[test]
416    fn test_object_pool() {
417        let pool = ObjectPool::new(10, || Vec::<u8>::with_capacity(1024));
418
419        let mut obj = pool.acquire();
420        obj.push(42);
421        assert_eq!(obj[0], 42);
422
423        drop(obj);
424
425        let obj2 = pool.acquire();
426        assert!(obj2.capacity() >= 1024);
427    }
428}