ruvector_graph/optimization/memory_pool.rs

use parking_lot::Mutex;
use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};
use std::cell::Cell;
use std::ptr::{self, NonNull};
use std::sync::Arc;

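/// Bump-pointer arena: allocations are carved out of large chunks and are
/// only reclaimed in bulk via `reset` or when the arena is dropped.
///
/// A minimal usage sketch (returned pointers refer to uninitialized memory,
/// so the caller writes before reading):
///
/// ```ignore
/// let arena = ArenaAllocator::new();
/// let ptr = arena.alloc::<u64>();
/// unsafe {
///     ptr.as_ptr().write(7);
///     assert_eq!(ptr.as_ptr().read(), 7);
/// }
/// ```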
pub struct ArenaAllocator {
    /// Chunk currently being bump-allocated from.
    current: Cell<Option<NonNull<Chunk>>>,
    /// Every chunk ever allocated, so they can be reset and freed.
    chunks: Mutex<Vec<NonNull<Chunk>>>,
    /// Size in bytes of each chunk's data buffer.
    chunk_size: usize,
}

struct Chunk {
    /// Start of the chunk's data buffer (64-byte aligned).
    data: NonNull<u8>,
    /// Bump offset of the next free byte within `data`.
    offset: Cell<usize>,
    /// Total size of `data` in bytes.
    capacity: usize,
    /// Next chunk in the allocation chain, if any.
    next: Cell<Option<NonNull<Chunk>>>,
}
35
36impl ArenaAllocator {
37 pub fn new() -> Self {
39 Self::with_chunk_size(1024 * 1024)
40 }
41
42 pub fn with_chunk_size(chunk_size: usize) -> Self {
44 Self {
45 current: Cell::new(None),
46 chunks: Mutex::new(Vec::new()),
47 chunk_size,
48 }
49 }
50
51 pub fn alloc<T>(&self) -> NonNull<T> {
53 let layout = Layout::new::<T>();
54 let ptr = self.alloc_layout(layout);
55 ptr.cast()
56 }
57
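    /// Bump-allocates `layout.size()` bytes at `layout.align()` alignment.
    ///
    /// The offset is rounded up with the usual power-of-two trick
    /// `(offset + align - 1) & !(align - 1)`; for example, offset 13 with
    /// align 8 rounds up to 16. Chunk buffers are 64-byte aligned, so this
    /// arena only guarantees alignments up to 64 bytes.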
    pub fn alloc_layout(&self, layout: Layout) -> NonNull<u8> {
        let size = layout.size();
        let align = layout.align();

        // A request that cannot fit in a fresh chunk (or needs more than the
        // 64-byte chunk alignment) would otherwise recurse forever below.
        assert!(
            size <= self.chunk_size && align <= 64,
            "allocation does not fit in a single chunk"
        );

        let chunk = match self.current.get() {
            Some(chunk) => chunk,
            None => {
                let chunk = self.allocate_chunk();
                self.current.set(Some(chunk));
                chunk
            }
        };

        unsafe {
            let chunk_ref = chunk.as_ref();
            let offset = chunk_ref.offset.get();

            // Round the offset up to the requested alignment.
            let aligned_offset = (offset + align - 1) & !(align - 1);
            let new_offset = aligned_offset + size;

            if new_offset > chunk_ref.capacity {
                // Move to the next chunk, reusing one retained by `reset`
                // before allocating a fresh chunk.
                let new_chunk = match chunk_ref.next.get() {
                    Some(next) => next,
                    None => {
                        let next = self.allocate_chunk();
                        chunk_ref.next.set(Some(next));
                        next
                    }
                };
                self.current.set(Some(new_chunk));

                return self.alloc_layout(layout);
            }

            chunk_ref.offset.set(new_offset);
            NonNull::new_unchecked(chunk_ref.data.as_ptr().add(aligned_offset))
        }
    }

    fn allocate_chunk(&self) -> NonNull<Chunk> {
        unsafe {
            let layout = Layout::from_size_align_unchecked(self.chunk_size, 64);
            // Abort on allocation failure instead of building a null NonNull.
            let data =
                NonNull::new(alloc(layout)).unwrap_or_else(|| handle_alloc_error(layout));

            let chunk_layout = Layout::new::<Chunk>();
            let chunk_ptr = alloc(chunk_layout) as *mut Chunk;
            if chunk_ptr.is_null() {
                handle_alloc_error(chunk_layout);
            }

            ptr::write(
                chunk_ptr,
                Chunk {
                    data,
                    offset: Cell::new(0),
                    capacity: self.chunk_size,
                    next: Cell::new(None),
                },
            );

            let chunk = NonNull::new_unchecked(chunk_ptr);
            self.chunks.lock().push(chunk);
            chunk
        }
    }

    /// Resets every chunk's bump offset to zero. The chunks themselves (and
    /// their `next` links) are retained, so subsequent allocations reuse the
    /// existing memory instead of growing the arena.
    pub fn reset(&self) {
        let chunks = self.chunks.lock();
        for &chunk in chunks.iter() {
            unsafe {
                chunk.as_ref().offset.set(0);
            }
        }

        if let Some(first_chunk) = chunks.first() {
            self.current.set(Some(*first_chunk));
        }
    }

    /// Total bytes of chunk storage currently owned by the arena.
    pub fn total_allocated(&self) -> usize {
        self.chunks.lock().len() * self.chunk_size
    }
}

impl Default for ArenaAllocator {
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for ArenaAllocator {
    fn drop(&mut self) {
        let chunks = self.chunks.lock();
        for &chunk in chunks.iter() {
            unsafe {
                let chunk_ref = chunk.as_ref();

                // Free the chunk's data buffer, then the chunk header itself.
                let data_layout = Layout::from_size_align_unchecked(chunk_ref.capacity, 64);
                dealloc(chunk_ref.data.as_ptr(), data_layout);

                let chunk_layout = Layout::new::<Chunk>();
                dealloc(chunk.as_ptr() as *mut u8, chunk_layout);
            }
        }
    }
}

// SAFETY: the raw chunk pointers are owned exclusively by this allocator.
// Note that `current` and the per-chunk offsets are `Cell`s, so callers that
// share an arena across threads must serialize `alloc*` and `reset` calls
// externally (e.g. by confining each query to a single thread at a time).
unsafe impl Send for ArenaAllocator {}
unsafe impl Sync for ArenaAllocator {}

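/// Arena scoped to a single query: `execute_query` hands the closure an
/// arena, then resets it once the closure returns, so the result must not
/// hold pointers into arena memory.
///
/// A minimal usage sketch:
///
/// ```ignore
/// let query_arena = QueryArena::new();
/// let value = query_arena.execute_query(|arena| {
///     let ptr = arena.alloc::<u64>();
///     unsafe {
///         ptr.as_ptr().write(123);
///         ptr.as_ptr().read()
///     }
/// });
/// assert_eq!(value, 123);
/// ```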
pub struct QueryArena {
    arena: Arc<ArenaAllocator>,
}

impl QueryArena {
    pub fn new() -> Self {
        Self {
            arena: Arc::new(ArenaAllocator::new()),
        }
    }

    /// Runs `f` with the arena and resets the arena afterwards.
    pub fn execute_query<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&ArenaAllocator) -> R,
    {
        let result = f(&self.arena);
        self.arena.reset();
        result
    }

    pub fn arena(&self) -> &ArenaAllocator {
        &self.arena
    }
}

impl Default for QueryArena {
    fn default() -> Self {
        Self::new()
    }
}

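/// NUMA-aware allocation front end: one arena per (estimated) NUMA node,
/// with allocations directed at a preferred node. Node detection below is a
/// CPU-count heuristic, and no OS-level memory binding is performed.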
pub struct NumaAllocator {
    node_allocators: Vec<Arc<ArenaAllocator>>,
    preferred_node: Cell<usize>,
}

impl NumaAllocator {
    pub fn new() -> Self {
        let num_nodes = Self::detect_numa_nodes();
        let node_allocators = (0..num_nodes)
            .map(|_| Arc::new(ArenaAllocator::new()))
            .collect();

        Self {
            node_allocators,
            preferred_node: Cell::new(0),
        }
    }

    /// Rough heuristic: assume roughly one NUMA node per 8 logical CPUs.
    fn detect_numa_nodes() -> usize {
        let cpus = num_cpus::get();
        ((cpus + 7) / 8).max(1)
    }

    pub fn alloc<T>(&self) -> NonNull<T> {
        let node = self.preferred_node.get();
        self.node_allocators[node].alloc()
    }

    pub fn set_preferred_node(&self, node: usize) {
        if node < self.node_allocators.len() {
            self.preferred_node.set(node);
        }
    }

    pub fn bind_to_node(&self, node: usize) {
        self.set_preferred_node(node);

        #[cfg(target_os = "linux")]
        {
            // Placeholder: OS-level binding (e.g. via libnuma/mbind) is not
            // implemented; only the preferred node index is recorded.
        }
    }
}

impl Default for NumaAllocator {
    fn default() -> Self {
        Self::new()
    }
}

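/// Lock-free pool of reusable objects. `acquire` pops a pooled object (or
/// builds one with the factory), and dropping the returned guard pushes it
/// back, up to `max_size` retained objects.
///
/// A minimal usage sketch:
///
/// ```ignore
/// let pool = ObjectPool::new(10, || Vec::<u8>::with_capacity(1024));
/// let mut buf = pool.acquire();
/// buf.push(42);
/// drop(buf); // returned to the pool for reuse
/// ```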
pub struct ObjectPool<T> {
    /// Objects currently available for reuse.
    available: Arc<crossbeam::queue::SegQueue<T>>,
    /// Constructor used when the pool is empty.
    factory: Arc<dyn Fn() -> T + Send + Sync>,
    /// Upper bound on how many idle objects are retained.
    max_size: usize,
}

impl<T> ObjectPool<T> {
    pub fn new<F>(max_size: usize, factory: F) -> Self
    where
        F: Fn() -> T + Send + Sync + 'static,
    {
        Self {
            available: Arc::new(crossbeam::queue::SegQueue::new()),
            factory: Arc::new(factory),
            max_size,
        }
    }

    pub fn acquire(&self) -> PooledObject<T> {
        let object = self.available.pop().unwrap_or_else(|| (self.factory)());

        PooledObject {
            object: Some(object),
            pool: Arc::clone(&self.available),
            max_size: self.max_size,
        }
    }

    pub fn len(&self) -> usize {
        self.available.len()
    }

    pub fn is_empty(&self) -> bool {
        self.available.is_empty()
    }
}

pub struct PooledObject<T> {
    object: Option<T>,
    pool: Arc<crossbeam::queue::SegQueue<T>>,
    max_size: usize,
}

impl<T> PooledObject<T> {
    pub fn get(&self) -> &T {
        self.object.as_ref().unwrap()
    }

    pub fn get_mut(&mut self) -> &mut T {
        self.object.as_mut().unwrap()
    }
}

impl<T> Drop for PooledObject<T> {
    fn drop(&mut self) {
        if let Some(object) = self.object.take() {
            // Return the object to the pool unless the pool is already at its
            // retention limit, in which case the object is simply dropped.
            if self.pool.len() < self.max_size {
                self.pool.push(object);
            }
        }
    }
}

impl<T> std::ops::Deref for PooledObject<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        self.object.as_ref().unwrap()
    }
}

impl<T> std::ops::DerefMut for PooledObject<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.object.as_mut().unwrap()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_arena_allocator() {
        let arena = ArenaAllocator::new();

        let ptr1 = arena.alloc::<u64>();
        let ptr2 = arena.alloc::<u64>();

        unsafe {
            ptr1.as_ptr().write(42);
            ptr2.as_ptr().write(84);

            assert_eq!(ptr1.as_ptr().read(), 42);
            assert_eq!(ptr2.as_ptr().read(), 84);
        }
    }

    #[test]
    fn test_arena_reset() {
        let arena = ArenaAllocator::new();

        for _ in 0..100 {
            arena.alloc::<u64>();
        }

        let allocated_before = arena.total_allocated();
        arena.reset();
        let allocated_after = arena.total_allocated();

        assert_eq!(allocated_before, allocated_after);
    }

    #[test]
    fn test_query_arena() {
        let query_arena = QueryArena::new();

        let result = query_arena.execute_query(|arena| {
            let ptr = arena.alloc::<u64>();
            unsafe {
                ptr.as_ptr().write(123);
                ptr.as_ptr().read()
            }
        });

        assert_eq!(result, 123);
    }

    #[test]
    fn test_object_pool() {
        let pool = ObjectPool::new(10, || Vec::<u8>::with_capacity(1024));

        let mut obj = pool.acquire();
        obj.push(42);
        assert_eq!(obj[0], 42);

        drop(obj);

        let obj2 = pool.acquire();
        assert!(obj2.capacity() >= 1024);
    }
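
    // Sanity check mirroring the arena tests above: NumaAllocator routes
    // allocations through the preferred node's arena, and written values
    // round-trip through the returned pointer.
    #[test]
    fn test_numa_allocator() {
        let numa = NumaAllocator::new();
        numa.set_preferred_node(0);

        let ptr = numa.alloc::<u32>();
        unsafe {
            ptr.as_ptr().write(7);
            assert_eq!(ptr.as_ptr().read(), 7);
        }
    }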
}