ruvector_graph/optimization/memory_pool.rs

use parking_lot::Mutex;
use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};
use std::cell::Cell;
use std::ptr::{self, NonNull};
use std::sync::Arc;

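/// Arena (bump) allocator: hands out pointers from large fixed-size chunks and
/// reclaims all memory at once via `reset` or when the arena is dropped.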
pub struct ArenaAllocator {
    current: Cell<Option<NonNull<Chunk>>>,
    chunks: Mutex<Vec<NonNull<Chunk>>>,
    chunk_size: usize,
}

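/// A single fixed-size block of memory that is bump-allocated from front to back.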
struct Chunk {
    data: NonNull<u8>,
    offset: Cell<usize>,
    capacity: usize,
    next: Cell<Option<NonNull<Chunk>>>,
}

impl ArenaAllocator {
    pub fn new() -> Self {
        Self::with_chunk_size(1024 * 1024)
    }

    pub fn with_chunk_size(chunk_size: usize) -> Self {
        Self {
            current: Cell::new(None),
            chunks: Mutex::new(Vec::new()),
            chunk_size,
        }
    }

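    /// Allocates uninitialized storage for one `T` in the arena. The caller must
    /// initialize the value before reading it; the memory stays valid until
    /// `reset` is called or the arena is dropped.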
    pub fn alloc<T>(&self) -> NonNull<T> {
        let layout = Layout::new::<T>();
        let ptr = self.alloc_layout(layout);
        ptr.cast()
    }

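    /// Bump-allocates `layout.size()` bytes aligned to `layout.align()` (at most 64),
    /// moving on to a fresh chunk when the current one cannot satisfy the request.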
    pub fn alloc_layout(&self, layout: Layout) -> NonNull<u8> {
        let size = layout.size();
        let align = layout.align();

        assert!(size > 0, "Cannot allocate zero bytes");
        assert!(align > 0 && align.is_power_of_two(), "Alignment must be a power of 2");
        assert!(align <= 64, "Alignment exceeds the 64-byte chunk alignment");
        assert!(size <= isize::MAX as usize, "Allocation size too large");
        assert!(size <= self.chunk_size, "Allocation larger than a single chunk");

        let chunk = match self.current.get() {
            Some(chunk) => chunk,
            None => {
                let chunk = self.allocate_chunk();
                self.current.set(Some(chunk));
                chunk
            }
        };

        unsafe {
            let chunk_ref = chunk.as_ref();
            let offset = chunk_ref.offset.get();

            let aligned_offset = (offset + align - 1) & !(align - 1);

            if aligned_offset < offset {
                panic!("Alignment calculation overflow");
            }

            let new_offset = aligned_offset
                .checked_add(size)
                .expect("Arena allocation overflow");

            if new_offset > chunk_ref.capacity {
                let new_chunk = self.allocate_chunk();
                chunk_ref.next.set(Some(new_chunk));
                self.current.set(Some(new_chunk));

                return self.alloc_layout(layout);
            }

            chunk_ref.offset.set(new_offset);

            let result_ptr = chunk_ref.data.as_ptr().add(aligned_offset);
            debug_assert!(
                result_ptr as usize >= chunk_ref.data.as_ptr() as usize,
                "Pointer arithmetic underflow"
            );
            debug_assert!(
                result_ptr as usize <= chunk_ref.data.as_ptr().add(chunk_ref.capacity) as usize,
                "Pointer arithmetic overflow"
            );

            NonNull::new_unchecked(result_ptr)
        }
    }

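    /// Allocates a new 64-byte-aligned chunk of `chunk_size` bytes and records it
    /// so it can be freed when the arena is dropped.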
    fn allocate_chunk(&self) -> NonNull<Chunk> {
        unsafe {
            let layout = Layout::from_size_align(self.chunk_size, 64)
                .expect("invalid chunk layout");
            let raw = alloc(layout);
            if raw.is_null() {
                handle_alloc_error(layout);
            }
            let data = NonNull::new_unchecked(raw);

            let chunk_layout = Layout::new::<Chunk>();
            let chunk_ptr = alloc(chunk_layout) as *mut Chunk;
            if chunk_ptr.is_null() {
                handle_alloc_error(chunk_layout);
            }

            ptr::write(
                chunk_ptr,
                Chunk {
                    data,
                    offset: Cell::new(0),
                    capacity: self.chunk_size,
                    next: Cell::new(None),
                },
            );

            let chunk = NonNull::new_unchecked(chunk_ptr);
            self.chunks.lock().push(chunk);
            chunk
        }
    }

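    /// Rewinds every chunk so its memory can be handed out again. The chunks stay
    /// allocated, but anything previously returned by `alloc` may be overwritten
    /// by later allocations and must be treated as invalid.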
    pub fn reset(&self) {
        let chunks = self.chunks.lock();
        for &chunk in chunks.iter() {
            unsafe {
                chunk.as_ref().offset.set(0);
                chunk.as_ref().next.set(None);
            }
        }

        if let Some(first_chunk) = chunks.first() {
            self.current.set(Some(*first_chunk));
        }
    }

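    /// Total bytes reserved from the system across all chunks (not the number of
    /// bytes currently handed out).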
    pub fn total_allocated(&self) -> usize {
        self.chunks.lock().len() * self.chunk_size
    }
}

impl Default for ArenaAllocator {
    fn default() -> Self {
        Self::new()
    }
}

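// Frees each chunk's data buffer and then the chunk headers themselves.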
impl Drop for ArenaAllocator {
    fn drop(&mut self) {
        let chunks = self.chunks.lock();
        for &chunk in chunks.iter() {
            unsafe {
                let chunk_ref = chunk.as_ref();

                let data_layout = Layout::from_size_align_unchecked(chunk_ref.capacity, 64);
                dealloc(chunk_ref.data.as_ptr(), data_layout);

                let chunk_layout = Layout::new::<Chunk>();
                dealloc(chunk.as_ptr() as *mut u8, chunk_layout);
            }
        }
    }
}

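// SAFETY: interior mutability goes through `Cell`, which is unsynchronized. These
// impls assume callers never allocate from the same arena concurrently (e.g. one
// arena per query or per thread); only the chunk list itself is mutex-protected.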
unsafe impl Send for ArenaAllocator {}
unsafe impl Sync for ArenaAllocator {}

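/// Arena scoped to a single query: everything allocated inside `execute_query`
/// is reclaimed by resetting the arena as soon as the closure returns.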
pub struct QueryArena {
    arena: Arc<ArenaAllocator>,
}

impl QueryArena {
    pub fn new() -> Self {
        Self {
            arena: Arc::new(ArenaAllocator::new()),
        }
    }

    pub fn execute_query<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&ArenaAllocator) -> R,
    {
        let result = f(&self.arena);
        self.arena.reset();
        result
    }

    pub fn arena(&self) -> &ArenaAllocator {
        &self.arena
    }
}

impl Default for QueryArena {
    fn default() -> Self {
        Self::new()
    }
}

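/// NUMA-aware allocator that keeps one arena per (estimated) NUMA node and serves
/// allocations from the currently preferred node.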
pub struct NumaAllocator {
    node_allocators: Vec<Arc<ArenaAllocator>>,
    preferred_node: Cell<usize>,
}

impl NumaAllocator {
    pub fn new() -> Self {
        let num_nodes = Self::detect_numa_nodes();
        let node_allocators = (0..num_nodes)
            .map(|_| Arc::new(ArenaAllocator::new()))
            .collect();

        Self {
            node_allocators,
            preferred_node: Cell::new(0),
        }
    }

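    /// Estimates the number of NUMA nodes. This is a heuristic (roughly one node
    /// per 8 logical CPUs); it does not query the real hardware topology.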
    fn detect_numa_nodes() -> usize {
        let cpus = num_cpus::get();
        ((cpus + 7) / 8).max(1)
    }

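    /// Allocates a `T` from the arena associated with the preferred node.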
    pub fn alloc<T>(&self) -> NonNull<T> {
        let node = self.preferred_node.get();
        self.node_allocators[node].alloc()
    }

    pub fn set_preferred_node(&self, node: usize) {
        if node < self.node_allocators.len() {
            self.preferred_node.set(node);
        }
    }

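    /// Records `node` as the preferred node for subsequent allocations.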
    pub fn bind_to_node(&self, node: usize) {
        self.set_preferred_node(node);

        #[cfg(target_os = "linux")]
        {
            // Placeholder: OS-level NUMA binding (e.g. via libnuma) would go here;
            // for now only the preferred node is recorded.
        }
    }
}

impl Default for NumaAllocator {
    fn default() -> Self {
        Self::new()
    }
}

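/// Lock-free pool of reusable objects backed by a `SegQueue`. Objects are created
/// on demand by the factory and returned to the pool when their guard is dropped.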
pub struct ObjectPool<T> {
    available: Arc<crossbeam::queue::SegQueue<T>>,
    factory: Arc<dyn Fn() -> T + Send + Sync>,
    max_size: usize,
}

impl<T> ObjectPool<T> {
    pub fn new<F>(max_size: usize, factory: F) -> Self
    where
        F: Fn() -> T + Send + Sync + 'static,
    {
        Self {
            available: Arc::new(crossbeam::queue::SegQueue::new()),
            factory: Arc::new(factory),
            max_size,
        }
    }

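    /// Takes an object from the pool, or builds a new one with the factory if the
    /// pool is empty. The returned guard puts the object back when dropped.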
    pub fn acquire(&self) -> PooledObject<T> {
        let object = self.available.pop().unwrap_or_else(|| (self.factory)());

        PooledObject {
            object: Some(object),
            pool: Arc::clone(&self.available),
            max_size: self.max_size,
        }
    }

    pub fn len(&self) -> usize {
        self.available.len()
    }

    pub fn is_empty(&self) -> bool {
        self.available.is_empty()
    }
}

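/// RAII guard around a pooled object; dereferences to `T` and hands the object
/// back to the pool when dropped.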
pub struct PooledObject<T> {
    object: Option<T>,
    pool: Arc<crossbeam::queue::SegQueue<T>>,
    max_size: usize,
}

impl<T> PooledObject<T> {
    pub fn get(&self) -> &T {
        self.object.as_ref().unwrap()
    }

    pub fn get_mut(&mut self) -> &mut T {
        self.object.as_mut().unwrap()
    }
}

impl<T> Drop for PooledObject<T> {
    fn drop(&mut self) {
        if let Some(object) = self.object.take() {
            // Return the object to the pool only while the pool is below its cap;
            // otherwise let it drop.
            if self.pool.len() < self.max_size {
                self.pool.push(object);
            }
        }
    }
}

impl<T> std::ops::Deref for PooledObject<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        self.object.as_ref().unwrap()
    }
}

impl<T> std::ops::DerefMut for PooledObject<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.object.as_mut().unwrap()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_arena_allocator() {
        let arena = ArenaAllocator::new();

        let ptr1 = arena.alloc::<u64>();
        let ptr2 = arena.alloc::<u64>();

        unsafe {
            ptr1.as_ptr().write(42);
            ptr2.as_ptr().write(84);

            assert_eq!(ptr1.as_ptr().read(), 42);
            assert_eq!(ptr2.as_ptr().read(), 84);
        }
    }

    #[test]
    fn test_arena_reset() {
        let arena = ArenaAllocator::new();

        for _ in 0..100 {
            arena.alloc::<u64>();
        }

        let allocated_before = arena.total_allocated();
        arena.reset();
        let allocated_after = arena.total_allocated();

        assert_eq!(allocated_before, allocated_after);
    }

    #[test]
    fn test_query_arena() {
        let query_arena = QueryArena::new();

        let result = query_arena.execute_query(|arena| {
            let ptr = arena.alloc::<u64>();
            unsafe {
                ptr.as_ptr().write(123);
                ptr.as_ptr().read()
            }
        });

        assert_eq!(result, 123);
    }

    #[test]
    fn test_object_pool() {
        let pool = ObjectPool::new(10, || Vec::<u8>::with_capacity(1024));

        let mut obj = pool.acquire();
        obj.push(42);
        assert_eq!(obj[0], 42);

        drop(obj);

        let obj2 = pool.acquire();
        assert!(obj2.capacity() >= 1024);
    }
}