ruvector_graph/optimization/memory_pool.rs

//! Memory-pool primitives for query execution: a bump-pointer arena
//! allocator, a per-query arena wrapper, a NUMA-aware allocation front end,
//! and a lock-free object pool.

use parking_lot::Mutex;
use std::alloc::{alloc, dealloc, Layout};
use std::cell::Cell;
use std::ptr::{self, NonNull};
use std::sync::Arc;

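/// A bump-pointer arena: allocations are carved sequentially out of large
/// chunks and are only reclaimed in bulk by `reset()` or when the arena is
/// dropped.
///
/// A minimal usage sketch (the doctest is marked `ignore` because the crate
/// path is assumed, not guaranteed):
///
/// ```ignore
/// let arena = ArenaAllocator::new();
/// let slot = arena.alloc::<u64>();
/// unsafe {
///     slot.as_ptr().write(42);
///     assert_eq!(slot.as_ptr().read(), 42);
/// }
/// ```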
pub struct ArenaAllocator {
    /// Chunk currently being bumped into, if any.
    current: Cell<Option<NonNull<Chunk>>>,
    /// Every chunk ever allocated, so they can be reset and freed.
    chunks: Mutex<Vec<NonNull<Chunk>>>,
    /// Size in bytes of each chunk's data block.
    chunk_size: usize,
}

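/// A single contiguous block of arena memory plus its bump offset. Chunks are
/// chained through `next` and owned by the `chunks` list of the
/// `ArenaAllocator` that created them.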
struct Chunk {
    /// Start of the chunk's data block (64-byte aligned).
    data: NonNull<u8>,
    /// Number of bytes already handed out from this chunk.
    offset: Cell<usize>,
    /// Total size of the data block in bytes.
    capacity: usize,
    /// Next chunk in the chain, if one has been linked.
    next: Cell<Option<NonNull<Chunk>>>,
}

impl ArenaAllocator {
    /// Creates an arena with the default chunk size of 1 MiB.
    pub fn new() -> Self {
        Self::with_chunk_size(1024 * 1024)
    }

    /// Creates an arena whose chunks each hold `chunk_size` bytes.
    pub fn with_chunk_size(chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "Chunk size must be non-zero");
        Self {
            current: Cell::new(None),
            chunks: Mutex::new(Vec::new()),
            chunk_size,
        }
    }

    /// Allocates uninitialized storage for a `T`. The pointer remains valid
    /// until the arena is reset or dropped.
    pub fn alloc<T>(&self) -> NonNull<T> {
        let layout = Layout::new::<T>();
        let ptr = self.alloc_layout(layout);
        ptr.cast()
    }

    /// Allocates `layout.size()` bytes at `layout.align()` alignment from the
    /// current chunk, moving to (or allocating) another chunk when it is full.
    pub fn alloc_layout(&self, layout: Layout) -> NonNull<u8> {
        let size = layout.size();
        let align = layout.align();

        assert!(size > 0, "Cannot allocate zero bytes");
        assert!(
            align > 0 && align.is_power_of_two(),
            "Alignment must be a power of 2"
        );
        // Chunk data is only 64-byte aligned, so larger alignments cannot be
        // honoured, and an allocation larger than a chunk would never fit.
        assert!(align <= 64, "Alignment exceeds chunk alignment (64 bytes)");
        assert!(size <= self.chunk_size, "Allocation larger than chunk size");
        assert!(size <= isize::MAX as usize, "Allocation size too large");

        let chunk = match self.current.get() {
            Some(chunk) => chunk,
            None => {
                let chunk = self.allocate_chunk();
                self.current.set(Some(chunk));
                chunk
            }
        };

        unsafe {
            let chunk_ref = chunk.as_ref();
            let offset = chunk_ref.offset.get();

            // Round the bump offset up to the requested alignment.
            let aligned_offset = offset
                .checked_add(align - 1)
                .expect("Alignment calculation overflow")
                & !(align - 1);

            let new_offset = aligned_offset
                .checked_add(size)
                .expect("Arena allocation overflow");

            if new_offset > chunk_ref.capacity {
                // The current chunk is full: move to the next chunk, reusing
                // one left over from a previous reset() when available.
                let next_chunk = match chunk_ref.next.get() {
                    Some(next) => next,
                    None => {
                        let new_chunk = self.allocate_chunk();
                        chunk_ref.next.set(Some(new_chunk));
                        new_chunk
                    }
                };
                self.current.set(Some(next_chunk));

                return self.alloc_layout(layout);
            }

            chunk_ref.offset.set(new_offset);

            let result_ptr = chunk_ref.data.as_ptr().add(aligned_offset);
            debug_assert!(
                result_ptr as usize >= chunk_ref.data.as_ptr() as usize,
                "Pointer arithmetic underflow"
            );
            debug_assert!(
                result_ptr as usize <= chunk_ref.data.as_ptr().add(chunk_ref.capacity) as usize,
                "Pointer arithmetic overflow"
            );

            NonNull::new_unchecked(result_ptr)
        }
    }

    /// Allocates a fresh chunk (data block plus header) and registers it so
    /// `Drop` can free it later.
    fn allocate_chunk(&self) -> NonNull<Chunk> {
        unsafe {
            // Chunk data is 64-byte (cache-line) aligned; alloc_layout relies
            // on this when honouring caller alignment.
            let layout =
                Layout::from_size_align(self.chunk_size, 64).expect("Invalid chunk layout");
            let data = NonNull::new(alloc(layout)).expect("Chunk data allocation failed");

            let chunk_layout = Layout::new::<Chunk>();
            let chunk_ptr = alloc(chunk_layout) as *mut Chunk;
            assert!(!chunk_ptr.is_null(), "Chunk header allocation failed");

            ptr::write(
                chunk_ptr,
                Chunk {
                    data,
                    offset: Cell::new(0),
                    capacity: self.chunk_size,
                    next: Cell::new(None),
                },
            );

            let chunk = NonNull::new_unchecked(chunk_ptr);
            self.chunks.lock().push(chunk);
            chunk
        }
    }

    /// Rewinds every chunk so the arena's memory can be reused. Pointers
    /// handed out before the reset must not be dereferenced afterwards.
    pub fn reset(&self) {
        let chunks = self.chunks.lock();
        for (i, &chunk) in chunks.iter().enumerate() {
            unsafe {
                let chunk_ref = chunk.as_ref();
                chunk_ref.offset.set(0);
                // Relink the chunks in allocation order so alloc_layout reuses
                // them before requesting fresh memory.
                chunk_ref.next.set(chunks.get(i + 1).copied());
            }
        }

        if let Some(first_chunk) = chunks.first() {
            self.current.set(Some(*first_chunk));
        }
    }

    /// Total bytes reserved by the arena (chunk capacity, not bytes in use).
    pub fn total_allocated(&self) -> usize {
        self.chunks.lock().len() * self.chunk_size
    }
}

impl Default for ArenaAllocator {
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for ArenaAllocator {
    fn drop(&mut self) {
        let chunks = self.chunks.lock();
        for &chunk in chunks.iter() {
            unsafe {
                let chunk_ref = chunk.as_ref();

                // SAFETY: this layout matches the one used in allocate_chunk.
                let data_layout = Layout::from_size_align_unchecked(chunk_ref.capacity, 64);
                dealloc(chunk_ref.data.as_ptr(), data_layout);

                let chunk_layout = Layout::new::<Chunk>();
                dealloc(chunk.as_ptr() as *mut u8, chunk_layout);
            }
        }
    }
}

// The chunk list is guarded by a Mutex and chunks are only freed in Drop.
// Note that `current` and the per-chunk `offset` are plain `Cell`s, so the
// allocation fast path itself is not synchronized: the assumption here is one
// arena per query/thread, with allocation calls externally serialized.
unsafe impl Send for ArenaAllocator {}
unsafe impl Sync for ArenaAllocator {}

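/// Scoped arena for a single query: `execute_query` lends the arena to a
/// closure and resets it as soon as the closure returns, so per-query
/// temporaries are reclaimed in one step.
///
/// A minimal sketch of the intended round trip (marked `ignore` because the
/// crate path is assumed):
///
/// ```ignore
/// let scratch = QueryArena::new();
/// let answer = scratch.execute_query(|arena| {
///     let slot = arena.alloc::<u32>();
///     unsafe {
///         slot.as_ptr().write(7);
///         slot.as_ptr().read()
///     }
/// });
/// assert_eq!(answer, 7);
/// ```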
pub struct QueryArena {
    arena: Arc<ArenaAllocator>,
}

impl QueryArena {
    pub fn new() -> Self {
        Self {
            arena: Arc::new(ArenaAllocator::new()),
        }
    }

    /// Runs `f` with access to the arena, then resets the arena. The result
    /// must not contain pointers into arena memory, since that memory is
    /// recycled as soon as the closure returns.
    pub fn execute_query<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&ArenaAllocator) -> R,
    {
        let result = f(&self.arena);
        self.arena.reset();
        result
    }

    pub fn arena(&self) -> &ArenaAllocator {
        &self.arena
    }
}

impl Default for QueryArena {
    fn default() -> Self {
        Self::new()
    }
}

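/// NUMA-aware allocation front end: keeps one arena per (estimated) NUMA node
/// and routes allocations to the currently preferred node. Node detection is
/// heuristic, and `bind_to_node` does not currently change OS memory policy.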
pub struct NumaAllocator {
    /// One arena per detected NUMA node.
    node_allocators: Vec<Arc<ArenaAllocator>>,
    /// Node whose arena serves subsequent allocations.
    preferred_node: Cell<usize>,
}

impl NumaAllocator {
    pub fn new() -> Self {
        let num_nodes = Self::detect_numa_nodes();
        let node_allocators = (0..num_nodes)
            .map(|_| Arc::new(ArenaAllocator::new()))
            .collect();

        Self {
            node_allocators,
            preferred_node: Cell::new(0),
        }
    }

    /// Estimates the NUMA node count. This is a heuristic (roughly one node
    /// per eight logical CPUs), not a query of the actual OS topology.
    fn detect_numa_nodes() -> usize {
        let cpus = num_cpus::get();
        ((cpus + 7) / 8).max(1)
    }

    pub fn alloc<T>(&self) -> NonNull<T> {
        let node = self.preferred_node.get();
        self.node_allocators[node].alloc()
    }

    pub fn set_preferred_node(&self, node: usize) {
        if node < self.node_allocators.len() {
            self.preferred_node.set(node);
        }
    }

    pub fn bind_to_node(&self, node: usize) {
        self.set_preferred_node(node);

        #[cfg(target_os = "linux")]
        {
            // Binding the calling thread's memory policy to `node` (for
            // example via libnuma / set_mempolicy) would go here; it is a
            // deliberate no-op in this implementation.
        }
    }
}

impl Default for NumaAllocator {
    fn default() -> Self {
        Self::new()
    }
}

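/// Lock-free pool of reusable objects backed by a `crossbeam` `SegQueue`.
/// `acquire` hands out a `PooledObject` guard; dropping the guard returns the
/// object to the pool, retaining at most `max_size` idle objects.
///
/// A minimal sketch of the intended round trip (marked `ignore` because the
/// crate path is assumed):
///
/// ```ignore
/// let pool = ObjectPool::new(4, || Vec::<u8>::with_capacity(1024));
/// {
///     let mut buf = pool.acquire();
///     buf.push(1);
/// } // guard drops here and the Vec goes back into the pool
/// assert_eq!(pool.len(), 1);
/// ```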
pub struct ObjectPool<T> {
    /// Idle objects ready for reuse.
    available: Arc<crossbeam::queue::SegQueue<T>>,
    /// Creates a new object when the pool is empty.
    factory: Arc<dyn Fn() -> T + Send + Sync>,
    /// Upper bound on how many idle objects are retained.
    max_size: usize,
}

impl<T> ObjectPool<T> {
    pub fn new<F>(max_size: usize, factory: F) -> Self
    where
        F: Fn() -> T + Send + Sync + 'static,
    {
        Self {
            available: Arc::new(crossbeam::queue::SegQueue::new()),
            factory: Arc::new(factory),
            max_size,
        }
    }

    /// Takes an object from the pool, creating one with the factory if none
    /// is available. The object returns to the pool when the guard is dropped.
    pub fn acquire(&self) -> PooledObject<T> {
        let object = self.available.pop().unwrap_or_else(|| (self.factory)());

        PooledObject {
            object: Some(object),
            pool: Arc::clone(&self.available),
            max_size: self.max_size,
        }
    }

    pub fn len(&self) -> usize {
        self.available.len()
    }

    pub fn is_empty(&self) -> bool {
        self.available.is_empty()
    }
}

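/// RAII guard over a pooled object. Dereferences to the inner value and, on
/// drop, pushes the value back into the owning pool if the pool still has
/// room.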
pub struct PooledObject<T> {
    /// The pooled value; `None` only after `Drop` has taken it.
    object: Option<T>,
    /// Queue this object is returned to when the guard drops.
    pool: Arc<crossbeam::queue::SegQueue<T>>,
    /// Retention cap copied from the owning `ObjectPool`.
    max_size: usize,
}

impl<T> PooledObject<T> {
    pub fn get(&self) -> &T {
        self.object.as_ref().unwrap()
    }

    pub fn get_mut(&mut self) -> &mut T {
        self.object.as_mut().unwrap()
    }
}

impl<T> Drop for PooledObject<T> {
    fn drop(&mut self) {
        if let Some(object) = self.object.take() {
            // Only retain the object if the pool is below its size cap;
            // otherwise let it drop here and release its resources.
            if self.pool.len() < self.max_size {
                self.pool.push(object);
            }
        }
    }
}

impl<T> std::ops::Deref for PooledObject<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        self.object.as_ref().unwrap()
    }
}

impl<T> std::ops::DerefMut for PooledObject<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.object.as_mut().unwrap()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_arena_allocator() {
        let arena = ArenaAllocator::new();

        let ptr1 = arena.alloc::<u64>();
        let ptr2 = arena.alloc::<u64>();

        unsafe {
            ptr1.as_ptr().write(42);
            ptr2.as_ptr().write(84);

            assert_eq!(ptr1.as_ptr().read(), 42);
            assert_eq!(ptr2.as_ptr().read(), 84);
        }
    }

    #[test]
    fn test_arena_reset() {
        let arena = ArenaAllocator::new();

        for _ in 0..100 {
            arena.alloc::<u64>();
        }

        let allocated_before = arena.total_allocated();
        arena.reset();
        let allocated_after = arena.total_allocated();

        assert_eq!(allocated_before, allocated_after);
    }

    #[test]
    fn test_query_arena() {
        let query_arena = QueryArena::new();

        let result = query_arena.execute_query(|arena| {
            let ptr = arena.alloc::<u64>();
            unsafe {
                ptr.as_ptr().write(123);
                ptr.as_ptr().read()
            }
        });

        assert_eq!(result, 123);
    }

    #[test]
    fn test_object_pool() {
        let pool = ObjectPool::new(10, || Vec::<u8>::with_capacity(1024));

        let mut obj = pool.acquire();
        obj.push(42);
        assert_eq!(obj[0], 42);

        drop(obj);

        let obj2 = pool.acquire();
        assert!(obj2.capacity() >= 1024);
    }
}