ruvector_graph/optimization/
memory_pool.rs

1//! Custom memory allocators for graph query execution
2//!
3//! This module provides specialized allocators:
4//! - Arena allocation for query-scoped memory
5//! - Object pooling for frequent allocations
6//! - NUMA-aware allocation for distributed systems
7
8use parking_lot::Mutex;
9use std::alloc::{alloc, dealloc, Layout};
10use std::cell::Cell;
11use std::ptr::{self, NonNull};
12use std::sync::Arc;
13
14/// Arena allocator for query execution
15/// All allocations are freed together when the arena is dropped
16pub struct ArenaAllocator {
17    /// Current chunk
18    current: Cell<Option<NonNull<Chunk>>>,
19    /// All chunks (for cleanup)
20    chunks: Mutex<Vec<NonNull<Chunk>>>,
21    /// Default chunk size
22    chunk_size: usize,
23}
24
25struct Chunk {
26    /// Data buffer
27    data: NonNull<u8>,
28    /// Current offset in buffer
29    offset: Cell<usize>,
30    /// Total capacity
31    capacity: usize,
32    /// Next chunk in linked list
33    next: Cell<Option<NonNull<Chunk>>>,
34}
35
36impl ArenaAllocator {
37    /// Create a new arena with default chunk size (1MB)
38    pub fn new() -> Self {
39        Self::with_chunk_size(1024 * 1024)
40    }
41
42    /// Create arena with specific chunk size
43    pub fn with_chunk_size(chunk_size: usize) -> Self {
44        Self {
45            current: Cell::new(None),
46            chunks: Mutex::new(Vec::new()),
47            chunk_size,
48        }
49    }
50
51    /// Allocate memory from the arena
52    pub fn alloc<T>(&self) -> NonNull<T> {
53        let layout = Layout::new::<T>();
54        let ptr = self.alloc_layout(layout);
55        ptr.cast()
56    }
57
58    /// Allocate with specific layout
59    pub fn alloc_layout(&self, layout: Layout) -> NonNull<u8> {
60        let size = layout.size();
61        let align = layout.align();
62
63        // Get current chunk or allocate new one
64        let chunk = match self.current.get() {
65            Some(chunk) => chunk,
66            None => {
67                let chunk = self.allocate_chunk();
68                self.current.set(Some(chunk));
69                chunk
70            }
71        };
72
73        unsafe {
74            let chunk_ref = chunk.as_ref();
75            let offset = chunk_ref.offset.get();
76
77            // Align offset
78            let aligned_offset = (offset + align - 1) & !(align - 1);
79            let new_offset = aligned_offset + size;
80
81            if new_offset > chunk_ref.capacity {
82                // Need a new chunk
83                let new_chunk = self.allocate_chunk();
84                chunk_ref.next.set(Some(new_chunk));
85                self.current.set(Some(new_chunk));
86
87                // Retry allocation with new chunk
88                return self.alloc_layout(layout);
89            }
90
91            chunk_ref.offset.set(new_offset);
92            NonNull::new_unchecked(chunk_ref.data.as_ptr().add(aligned_offset))
93        }
94    }
95
96    /// Allocate a new chunk
97    fn allocate_chunk(&self) -> NonNull<Chunk> {
98        unsafe {
99            let layout = Layout::from_size_align_unchecked(self.chunk_size, 64);
100            let data = NonNull::new_unchecked(alloc(layout));
101
102            let chunk_layout = Layout::new::<Chunk>();
103            let chunk_ptr = alloc(chunk_layout) as *mut Chunk;
104
105            ptr::write(
106                chunk_ptr,
107                Chunk {
108                    data,
109                    offset: Cell::new(0),
110                    capacity: self.chunk_size,
111                    next: Cell::new(None),
112                },
113            );
114
115            let chunk = NonNull::new_unchecked(chunk_ptr);
116            self.chunks.lock().push(chunk);
117            chunk
118        }
119    }
120
121    /// Reset arena (reuse existing chunks)
122    pub fn reset(&self) {
123        let chunks = self.chunks.lock();
124        for &chunk in chunks.iter() {
125            unsafe {
126                chunk.as_ref().offset.set(0);
127                chunk.as_ref().next.set(None);
128            }
129        }
130
131        if let Some(first_chunk) = chunks.first() {
132            self.current.set(Some(*first_chunk));
133        }
134    }
135
136    /// Get total allocated bytes across all chunks
137    pub fn total_allocated(&self) -> usize {
138        self.chunks.lock().len() * self.chunk_size
139    }
140}
141
142impl Default for ArenaAllocator {
143    fn default() -> Self {
144        Self::new()
145    }
146}
147
148impl Drop for ArenaAllocator {
149    fn drop(&mut self) {
150        let chunks = self.chunks.lock();
151        for &chunk in chunks.iter() {
152            unsafe {
153                let chunk_ref = chunk.as_ref();
154
155                // Deallocate data buffer
156                let data_layout = Layout::from_size_align_unchecked(chunk_ref.capacity, 64);
157                dealloc(chunk_ref.data.as_ptr(), data_layout);
158
159                // Deallocate chunk itself
160                let chunk_layout = Layout::new::<Chunk>();
161                dealloc(chunk.as_ptr() as *mut u8, chunk_layout);
162            }
163        }
164    }
165}
166
167unsafe impl Send for ArenaAllocator {}
168unsafe impl Sync for ArenaAllocator {}
169
170/// Query-scoped arena that resets after each query
171pub struct QueryArena {
172    arena: Arc<ArenaAllocator>,
173}
174
175impl QueryArena {
176    pub fn new() -> Self {
177        Self {
178            arena: Arc::new(ArenaAllocator::new()),
179        }
180    }
181
182    pub fn execute_query<F, R>(&self, f: F) -> R
183    where
184        F: FnOnce(&ArenaAllocator) -> R,
185    {
186        let result = f(&self.arena);
187        self.arena.reset();
188        result
189    }
190
191    pub fn arena(&self) -> &ArenaAllocator {
192        &self.arena
193    }
194}
195
196impl Default for QueryArena {
197    fn default() -> Self {
198        Self::new()
199    }
200}
201
202/// NUMA-aware allocator for multi-socket systems
203pub struct NumaAllocator {
204    /// Allocators per NUMA node
205    node_allocators: Vec<Arc<ArenaAllocator>>,
206    /// Current thread's preferred NUMA node
207    preferred_node: Cell<usize>,
208}
209
210impl NumaAllocator {
211    /// Create NUMA-aware allocator
212    pub fn new() -> Self {
213        let num_nodes = Self::detect_numa_nodes();
214        let node_allocators = (0..num_nodes)
215            .map(|_| Arc::new(ArenaAllocator::new()))
216            .collect();
217
218        Self {
219            node_allocators,
220            preferred_node: Cell::new(0),
221        }
222    }
223
224    /// Detect number of NUMA nodes (simplified)
225    fn detect_numa_nodes() -> usize {
226        // In a real implementation, this would use platform-specific APIs
227        // For now, assume 1 node per 8 CPUs
228        let cpus = num_cpus::get();
229        ((cpus + 7) / 8).max(1)
230    }
231
232    /// Allocate from preferred NUMA node
233    pub fn alloc<T>(&self) -> NonNull<T> {
234        let node = self.preferred_node.get();
235        self.node_allocators[node].alloc()
236    }
237
238    /// Set preferred NUMA node for current thread
239    pub fn set_preferred_node(&self, node: usize) {
240        if node < self.node_allocators.len() {
241            self.preferred_node.set(node);
242        }
243    }
244
245    /// Bind current thread to NUMA node
246    pub fn bind_to_node(&self, node: usize) {
247        self.set_preferred_node(node);
248
249        // In a real implementation, this would use platform-specific APIs
250        // to bind the thread to CPUs on the specified NUMA node
251        #[cfg(target_os = "linux")]
252        {
253            // Would use libnuma or similar
254        }
255    }
256}
257
258impl Default for NumaAllocator {
259    fn default() -> Self {
260        Self::new()
261    }
262}
263
264/// Object pool for reducing allocation overhead
265pub struct ObjectPool<T> {
266    /// Pool of available objects
267    available: Arc<crossbeam::queue::SegQueue<T>>,
268    /// Factory function
269    factory: Arc<dyn Fn() -> T + Send + Sync>,
270    /// Maximum pool size
271    max_size: usize,
272}
273
274impl<T> ObjectPool<T> {
275    pub fn new<F>(max_size: usize, factory: F) -> Self
276    where
277        F: Fn() -> T + Send + Sync + 'static,
278    {
279        Self {
280            available: Arc::new(crossbeam::queue::SegQueue::new()),
281            factory: Arc::new(factory),
282            max_size,
283        }
284    }
285
286    pub fn acquire(&self) -> PooledObject<T> {
287        let object = self.available.pop().unwrap_or_else(|| (self.factory)());
288
289        PooledObject {
290            object: Some(object),
291            pool: Arc::clone(&self.available),
292        }
293    }
294
295    pub fn len(&self) -> usize {
296        self.available.len()
297    }
298
299    pub fn is_empty(&self) -> bool {
300        self.available.is_empty()
301    }
302}
303
304/// RAII wrapper for pooled objects
305pub struct PooledObject<T> {
306    object: Option<T>,
307    pool: Arc<crossbeam::queue::SegQueue<T>>,
308}
309
310impl<T> PooledObject<T> {
311    pub fn get(&self) -> &T {
312        self.object.as_ref().unwrap()
313    }
314
315    pub fn get_mut(&mut self) -> &mut T {
316        self.object.as_mut().unwrap()
317    }
318}
319
320impl<T> Drop for PooledObject<T> {
321    fn drop(&mut self) {
322        if let Some(object) = self.object.take() {
323            let _ = self.pool.push(object);
324        }
325    }
326}
327
328impl<T> std::ops::Deref for PooledObject<T> {
329    type Target = T;
330    fn deref(&self) -> &Self::Target {
331        self.object.as_ref().unwrap()
332    }
333}
334
335impl<T> std::ops::DerefMut for PooledObject<T> {
336    fn deref_mut(&mut self) -> &mut Self::Target {
337        self.object.as_mut().unwrap()
338    }
339}
340
341#[cfg(test)]
342mod tests {
343    use super::*;
344
345    #[test]
346    fn test_arena_allocator() {
347        let arena = ArenaAllocator::new();
348
349        let ptr1 = arena.alloc::<u64>();
350        let ptr2 = arena.alloc::<u64>();
351
352        unsafe {
353            ptr1.as_ptr().write(42);
354            ptr2.as_ptr().write(84);
355
356            assert_eq!(ptr1.as_ptr().read(), 42);
357            assert_eq!(ptr2.as_ptr().read(), 84);
358        }
359    }
360
361    #[test]
362    fn test_arena_reset() {
363        let arena = ArenaAllocator::new();
364
365        for _ in 0..100 {
366            arena.alloc::<u64>();
367        }
368
369        let allocated_before = arena.total_allocated();
370        arena.reset();
371        let allocated_after = arena.total_allocated();
372
373        assert_eq!(allocated_before, allocated_after);
374    }
375
376    #[test]
377    fn test_query_arena() {
378        let query_arena = QueryArena::new();
379
380        let result = query_arena.execute_query(|arena| {
381            let ptr = arena.alloc::<u64>();
382            unsafe {
383                ptr.as_ptr().write(123);
384                ptr.as_ptr().read()
385            }
386        });
387
388        assert_eq!(result, 123);
389    }
390
391    #[test]
392    fn test_object_pool() {
393        let pool = ObjectPool::new(10, || Vec::<u8>::with_capacity(1024));
394
395        let mut obj = pool.acquire();
396        obj.push(42);
397        assert_eq!(obj[0], 42);
398
399        drop(obj);
400
401        let obj2 = pool.acquire();
402        assert!(obj2.capacity() >= 1024);
403    }
404}