allsource_core/infrastructure/persistence/
arena_pool.rs1use bumpalo::Bump;
25use std::{
26 cell::RefCell,
27 sync::atomic::{AtomicU64, Ordering},
28};
29
30const DEFAULT_ARENA_SIZE: usize = 16 * 1024 * 1024;
32
33const MAX_THREAD_LOCAL_ARENAS: usize = 4;
35
36static ARENAS_CREATED: AtomicU64 = AtomicU64::new(0);
38static ARENAS_RECYCLED: AtomicU64 = AtomicU64::new(0);
39static BYTES_ALLOCATED: AtomicU64 = AtomicU64::new(0);
40
41thread_local! {
43 static ARENA_POOL: RefCell<Vec<Bump>> = RefCell::new(Vec::with_capacity(MAX_THREAD_LOCAL_ARENAS));
44}
45
46pub fn get_arena() -> PooledArena {
52 let arena = ARENA_POOL.with(|pool| pool.borrow_mut().pop());
53
54 let arena = if let Some(mut arena) = arena {
55 arena.reset();
56 ARENAS_RECYCLED.fetch_add(1, Ordering::Relaxed);
57 arena
58 } else {
59 ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
60 Bump::with_capacity(DEFAULT_ARENA_SIZE)
61 };
62
63 PooledArena { arena: Some(arena) }
64}
65
66pub fn get_arena_with_capacity(capacity: usize) -> PooledArena {
68 ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
69 PooledArena {
70 arena: Some(Bump::with_capacity(capacity)),
71 }
72}
73
74pub struct PooledArena {
76 arena: Option<Bump>,
77}
78
79impl PooledArena {
80 #[inline]
85 pub fn alloc_str(&self, s: &str) -> &str {
86 self.arena.as_ref().unwrap().alloc_str(s)
87 }
88
89 #[inline]
91 pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
92 self.arena.as_ref().unwrap().alloc_slice_copy(bytes)
93 }
94
95 #[inline]
97 pub fn alloc<T>(&self, val: T) -> &mut T {
98 self.arena.as_ref().unwrap().alloc(val)
99 }
100
101 #[inline]
103 pub fn alloc_slice_fill_with<T, F>(&self, len: usize, f: F) -> &mut [T]
104 where
105 F: FnMut(usize) -> T,
106 {
107 self.arena.as_ref().unwrap().alloc_slice_fill_with(len, f)
108 }
109
110 pub fn allocated(&self) -> usize {
112 self.arena.as_ref().unwrap().allocated_bytes()
113 }
114
115 pub fn inner(&self) -> &Bump {
117 self.arena.as_ref().unwrap()
118 }
119}
120
121impl Drop for PooledArena {
122 fn drop(&mut self) {
123 if let Some(arena) = self.arena.take() {
124 let allocated = arena.allocated_bytes();
125 BYTES_ALLOCATED.fetch_add(allocated as u64, Ordering::Relaxed);
126
127 ARENA_POOL.with(|pool| {
128 let mut pool = pool.borrow_mut();
129 if pool.len() < MAX_THREAD_LOCAL_ARENAS {
130 pool.push(arena);
131 }
132 });
134 }
135 }
136}
137
138#[derive(Debug, Clone)]
140pub struct ArenaPoolStats {
141 pub arenas_created: u64,
143 pub arenas_recycled: u64,
145 pub bytes_allocated: u64,
147 pub recycle_rate: f64,
149}
150
151pub fn arena_stats() -> ArenaPoolStats {
153 let created = ARENAS_CREATED.load(Ordering::Relaxed);
154 let recycled = ARENAS_RECYCLED.load(Ordering::Relaxed);
155 let total = created + recycled;
156
157 ArenaPoolStats {
158 arenas_created: created,
159 arenas_recycled: recycled,
160 bytes_allocated: BYTES_ALLOCATED.load(Ordering::Relaxed),
161 recycle_rate: if total > 0 {
162 recycled as f64 / total as f64
163 } else {
164 0.0
165 },
166 }
167}
168
169pub fn reset_stats() {
171 ARENAS_CREATED.store(0, Ordering::Relaxed);
172 ARENAS_RECYCLED.store(0, Ordering::Relaxed);
173 BYTES_ALLOCATED.store(0, Ordering::Relaxed);
174}
175
176pub struct ScopedArena {
191 arena: PooledArena,
192}
193
194impl ScopedArena {
195 pub fn new() -> Self {
197 Self { arena: get_arena() }
198 }
199
200 pub fn with_capacity(capacity: usize) -> Self {
202 Self {
203 arena: get_arena_with_capacity(capacity),
204 }
205 }
206
207 #[inline]
209 pub fn alloc_str(&self, s: &str) -> &str {
210 self.arena.alloc_str(s)
211 }
212
213 #[inline]
215 pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
216 self.arena.alloc_bytes(bytes)
217 }
218
219 #[inline]
221 pub fn alloc<T>(&self, val: T) -> &mut T {
222 self.arena.alloc(val)
223 }
224
225 pub fn allocated(&self) -> usize {
227 self.arena.allocated()
228 }
229}
230
231impl Default for ScopedArena {
232 fn default() -> Self {
233 Self::new()
234 }
235}
236
237pub struct SizedBufferPool {
242 small: Vec<Vec<u8>>, medium: Vec<Vec<u8>>, large: Vec<Vec<u8>>, small_size: usize,
246 medium_size: usize,
247 large_size: usize,
248 max_pool: usize,
249}
250
251impl SizedBufferPool {
252 pub fn new() -> Self {
254 Self {
255 small: Vec::new(),
256 medium: Vec::new(),
257 large: Vec::new(),
258 small_size: 1024,
259 medium_size: 64 * 1024,
260 large_size: 1024 * 1024,
261 max_pool: 32,
262 }
263 }
264
265 pub fn get(&mut self, min_size: usize) -> Vec<u8> {
267 let buf = if min_size <= self.small_size {
268 self.small.pop()
269 } else if min_size <= self.medium_size {
270 self.medium.pop()
271 } else {
272 self.large.pop()
273 };
274
275 if let Some(mut b) = buf {
276 b.clear();
277 if b.capacity() >= min_size {
278 b
279 } else {
280 Vec::with_capacity(min_size)
281 }
282 } else {
283 let capacity = if min_size <= self.small_size {
284 self.small_size
285 } else if min_size <= self.medium_size {
286 self.medium_size
287 } else {
288 self.large_size.max(min_size)
289 };
290 Vec::with_capacity(capacity)
291 }
292 }
293
294 pub fn put(&mut self, mut buf: Vec<u8>) {
296 let cap = buf.capacity();
297 buf.clear();
298
299 if cap <= self.small_size && self.small.len() < self.max_pool {
300 self.small.push(buf);
301 } else if cap <= self.medium_size && self.medium.len() < self.max_pool {
302 self.medium.push(buf);
303 } else if self.large.len() < self.max_pool {
304 self.large.push(buf);
305 }
306 }
308
309 pub fn pool_sizes(&self) -> (usize, usize, usize) {
311 (self.small.len(), self.medium.len(), self.large.len())
312 }
313}
314
315impl Default for SizedBufferPool {
316 fn default() -> Self {
317 Self::new()
318 }
319}
320
321#[cfg(test)]
322mod tests {
323 use super::*;
324
325 #[test]
326 fn test_get_arena() {
327 let arena1 = get_arena();
331 let s = arena1.alloc_str("hello");
332 assert_eq!(s, "hello");
333 assert!(arena1.allocated() > 0);
334
335 let s2 = arena1.alloc_str("world");
337 assert_eq!(s2, "world");
338
339 assert_eq!(s, "hello");
341 assert_eq!(s2, "world");
342 }
343
344 #[test]
345 fn test_arena_recycling() {
346 ARENA_POOL.with(|pool| pool.borrow_mut().clear());
348
349 let pool_empty = ARENA_POOL.with(|pool| pool.borrow().is_empty());
351 assert!(pool_empty, "Pool should be empty after clear");
352
353 let arena1 = get_arena();
355 let _ = arena1.alloc_str("test"); drop(arena1);
357
358 let pool_has_arena = ARENA_POOL.with(|pool| !pool.borrow().is_empty());
360 assert!(pool_has_arena, "Pool should have arena after drop");
361
362 let arena2 = get_arena();
364 let s = arena2.alloc_str("recycled");
366 assert_eq!(s, "recycled");
367
368 let pool_empty_after = ARENA_POOL.with(|pool| pool.borrow().is_empty());
370 assert!(pool_empty_after, "Pool should be empty after taking arena");
371 drop(arena2);
372 }
373
374 #[test]
375 fn test_arena_allocations() {
376 let arena = get_arena();
377
378 let s1 = arena.alloc_str("hello");
379 let s2 = arena.alloc_str("world");
380 let bytes = arena.alloc_bytes(&[1, 2, 3, 4, 5]);
381
382 assert_eq!(s1, "hello");
383 assert_eq!(s2, "world");
384 assert_eq!(bytes, &[1, 2, 3, 4, 5]);
385
386 assert!(arena.allocated() > 0);
387 }
388
389 #[test]
390 fn test_scoped_arena() {
391 reset_stats();
392
393 {
394 let arena = ScopedArena::new();
395 let s = arena.alloc_str("scoped");
396 assert_eq!(s, "scoped");
397 } let _ = ScopedArena::new();
401
402 let stats = arena_stats();
403 assert!(stats.arenas_recycled > 0 || stats.arenas_created > 0);
404 }
405
406 #[test]
407 fn test_sized_buffer_pool() {
408 let mut pool = SizedBufferPool::new();
409
410 let buf1 = pool.get(100);
412 assert!(buf1.capacity() >= 100);
413
414 let buf2 = pool.get(10_000);
416 assert!(buf2.capacity() >= 10_000);
417
418 pool.put(buf1);
420 pool.put(buf2);
421
422 let (small, medium, large) = pool.pool_sizes();
423 assert_eq!(small, 1);
424 assert_eq!(medium, 1);
425 assert_eq!(large, 0);
426 }
427
428 #[test]
429 fn test_sized_buffer_reuse() {
430 let mut pool = SizedBufferPool::new();
431
432 let mut buf1 = pool.get(100);
433 buf1.extend_from_slice(b"test data");
434 pool.put(buf1);
435
436 let buf2 = pool.get(100);
438 assert!(buf2.is_empty());
439 assert!(buf2.capacity() >= 100);
440 }
441
442 #[test]
443 fn test_arena_with_custom_capacity() {
444 let arena = get_arena_with_capacity(1024);
445 let s = arena.alloc_str("custom");
446 assert_eq!(s, "custom");
447 }
448
449 #[test]
450 fn test_concurrent_arena_access() {
451 reset_stats();
452
453 std::thread::scope(|s| {
454 for _ in 0..4 {
455 s.spawn(|| {
456 for _ in 0..100 {
457 let arena = get_arena();
458 let _ = arena.alloc_str("concurrent test");
459 drop(arena);
460 }
461 });
462 }
463 });
464
465 let stats = arena_stats();
466 assert!(stats.arenas_created > 0);
468 assert!(stats.arenas_recycled > 0);
469 }
470
471 #[test]
472 fn test_alloc_slice_fill() {
473 let arena = get_arena();
474 let slice = arena.alloc_slice_fill_with(5, |i| i * 2);
475 assert_eq!(slice, &[0, 2, 4, 6, 8]);
476 }
477}