allsource_core/infrastructure/persistence/
arena_pool.rs1use bumpalo::Bump;
25use std::cell::RefCell;
26use std::sync::atomic::{AtomicU64, Ordering};
27
/// Capacity, in bytes, passed to `Bump::with_capacity` when `get_arena` has to
/// build a new arena (16 MiB).
const DEFAULT_ARENA_SIZE: usize = 16 * 1024 * 1024;

/// Maximum number of idle arenas retained in each thread's pool; arenas
/// returned beyond this limit are dropped instead of pooled.
const MAX_THREAD_LOCAL_ARENAS: usize = 4;

/// Count of arenas constructed from scratch: incremented by `get_arena` when
/// the pool is empty and by every `get_arena_with_capacity` call.
static ARENAS_CREATED: AtomicU64 = AtomicU64::new(0);
/// Count of `get_arena` calls that were served from a thread-local pool.
static ARENAS_RECYCLED: AtomicU64 = AtomicU64::new(0);
/// Running sum of `Bump::allocated_bytes()` sampled whenever a `PooledArena`
/// is dropped.
static BYTES_ALLOCATED: AtomicU64 = AtomicU64::new(0);

// Per-thread stack of idle arenas. `get_arena` pops from it and dropping a
// `PooledArena` pushes back onto it, so no cross-thread locking is involved.
thread_local! {
    static ARENA_POOL: RefCell<Vec<Bump>> = RefCell::new(Vec::with_capacity(MAX_THREAD_LOCAL_ARENAS));
}
43
44pub fn get_arena() -> PooledArena {
50 let arena = ARENA_POOL.with(|pool| pool.borrow_mut().pop());
51
52 let arena = match arena {
53 Some(mut arena) => {
54 arena.reset();
55 ARENAS_RECYCLED.fetch_add(1, Ordering::Relaxed);
56 arena
57 }
58 None => {
59 ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
60 Bump::with_capacity(DEFAULT_ARENA_SIZE)
61 }
62 };
63
64 PooledArena { arena: Some(arena) }
65}
66
67pub fn get_arena_with_capacity(capacity: usize) -> PooledArena {
69 ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
70 PooledArena {
71 arena: Some(Bump::with_capacity(capacity)),
72 }
73}
74
75pub struct PooledArena {
77 arena: Option<Bump>,
78}
79
80impl PooledArena {
81 #[inline]
86 pub fn alloc_str(&self, s: &str) -> &str {
87 self.arena.as_ref().unwrap().alloc_str(s)
88 }
89
90 #[inline]
92 pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
93 self.arena.as_ref().unwrap().alloc_slice_copy(bytes)
94 }
95
96 #[inline]
98 pub fn alloc<T>(&self, val: T) -> &mut T {
99 self.arena.as_ref().unwrap().alloc(val)
100 }
101
102 #[inline]
104 pub fn alloc_slice_fill_with<T, F>(&self, len: usize, f: F) -> &mut [T]
105 where
106 F: FnMut(usize) -> T,
107 {
108 self.arena.as_ref().unwrap().alloc_slice_fill_with(len, f)
109 }
110
111 pub fn allocated(&self) -> usize {
113 self.arena.as_ref().unwrap().allocated_bytes()
114 }
115
116 pub fn inner(&self) -> &Bump {
118 self.arena.as_ref().unwrap()
119 }
120}
121
122impl Drop for PooledArena {
123 fn drop(&mut self) {
124 if let Some(arena) = self.arena.take() {
125 let allocated = arena.allocated_bytes();
126 BYTES_ALLOCATED.fetch_add(allocated as u64, Ordering::Relaxed);
127
128 ARENA_POOL.with(|pool| {
129 let mut pool = pool.borrow_mut();
130 if pool.len() < MAX_THREAD_LOCAL_ARENAS {
131 pool.push(arena);
132 }
133 });
135 }
136 }
137}
138
/// Point-in-time copy of the global arena-pool counters, as returned by
/// `arena_stats`.
#[derive(Debug, Clone)]
pub struct ArenaPoolStats {
    /// Value of the arenas-created counter when the snapshot was taken.
    pub arenas_created: u64,
    /// Value of the arenas-recycled counter when the snapshot was taken.
    pub arenas_recycled: u64,
    /// Value of the bytes-allocated counter when the snapshot was taken.
    pub bytes_allocated: u64,
    /// `arenas_recycled / (arenas_created + arenas_recycled)`, or `0.0` when
    /// both counters are zero.
    pub recycle_rate: f64,
}
151
152pub fn arena_stats() -> ArenaPoolStats {
154 let created = ARENAS_CREATED.load(Ordering::Relaxed);
155 let recycled = ARENAS_RECYCLED.load(Ordering::Relaxed);
156 let total = created + recycled;
157
158 ArenaPoolStats {
159 arenas_created: created,
160 arenas_recycled: recycled,
161 bytes_allocated: BYTES_ALLOCATED.load(Ordering::Relaxed),
162 recycle_rate: if total > 0 {
163 recycled as f64 / total as f64
164 } else {
165 0.0
166 },
167 }
168}
169
170pub fn reset_stats() {
172 ARENAS_CREATED.store(0, Ordering::Relaxed);
173 ARENAS_RECYCLED.store(0, Ordering::Relaxed);
174 BYTES_ALLOCATED.store(0, Ordering::Relaxed);
175}
176
177pub struct ScopedArena {
192 arena: PooledArena,
193}
194
195impl ScopedArena {
196 pub fn new() -> Self {
198 Self { arena: get_arena() }
199 }
200
201 pub fn with_capacity(capacity: usize) -> Self {
203 Self {
204 arena: get_arena_with_capacity(capacity),
205 }
206 }
207
208 #[inline]
210 pub fn alloc_str(&self, s: &str) -> &str {
211 self.arena.alloc_str(s)
212 }
213
214 #[inline]
216 pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
217 self.arena.alloc_bytes(bytes)
218 }
219
220 #[inline]
222 pub fn alloc<T>(&self, val: T) -> &mut T {
223 self.arena.alloc(val)
224 }
225
226 pub fn allocated(&self) -> usize {
228 self.arena.allocated()
229 }
230}
231
232impl Default for ScopedArena {
233 fn default() -> Self {
234 Self::new()
235 }
236}
237
/// Pool of reusable `Vec<u8>` buffers bucketed into three size classes
/// (small, medium, large).
///
/// Buffers are filed by their capacity on `put` and looked up by the requested
/// minimum size on `get`. Each class holds at most `max_pool` idle buffers.
pub struct SizedBufferPool {
    /// Idle buffers whose capacity is at most `small_size`.
    small: Vec<Vec<u8>>,
    /// Idle buffers whose capacity is above `small_size` and at most `medium_size`.
    medium: Vec<Vec<u8>>,
    /// Idle buffers whose capacity is above `medium_size`.
    large: Vec<Vec<u8>>,
    /// Capacity of freshly allocated small buffers, and upper bound of the small class.
    small_size: usize,
    /// Capacity of freshly allocated medium buffers, and upper bound of the medium class.
    medium_size: usize,
    /// Minimum capacity of freshly allocated large buffers.
    large_size: usize,
    /// Maximum number of idle buffers kept per class.
    max_pool: usize,
}

impl SizedBufferPool {
    /// Creates an empty pool with 1 KiB / 64 KiB / 1 MiB size classes and room
    /// for 32 idle buffers per class.
    pub fn new() -> Self {
        Self {
            small: Vec::new(),
            medium: Vec::new(),
            large: Vec::new(),
            small_size: 1024,
            medium_size: 64 * 1024,
            large_size: 1024 * 1024,
            max_pool: 32,
        }
    }

    /// Returns an empty buffer whose capacity is at least `min_size`.
    ///
    /// A pooled buffer from the matching size class is reused when it is large
    /// enough. Otherwise a new buffer is allocated with the class capacity
    /// (or `min_size` itself when that exceeds `large_size`), so the buffer
    /// stays useful for any later request in the same class once it is
    /// returned via `put`.
    pub fn get(&mut self, min_size: usize) -> Vec<u8> {
        let (class, class_capacity) = if min_size <= self.small_size {
            (&mut self.small, self.small_size)
        } else if min_size <= self.medium_size {
            (&mut self.medium, self.medium_size)
        } else {
            (&mut self.large, self.large_size.max(min_size))
        };

        match class.pop() {
            Some(mut buf) if buf.capacity() >= min_size => {
                buf.clear();
                buf
            }
            // Either the class was empty or the popped buffer was too small
            // for this request; an undersized buffer is simply discarded.
            _ => Vec::with_capacity(class_capacity),
        }
    }

    /// Clears `buf` and returns it to the size class matching its capacity.
    ///
    /// The buffer is dropped when that class already holds `max_pool` idle
    /// buffers. It is never spilled into a larger class, because `get` would
    /// later pop it there, find it too small and throw it away.
    pub fn put(&mut self, mut buf: Vec<u8>) {
        let cap = buf.capacity();
        buf.clear();

        let class = if cap <= self.small_size {
            &mut self.small
        } else if cap <= self.medium_size {
            &mut self.medium
        } else {
            &mut self.large
        };

        if class.len() < self.max_pool {
            class.push(buf);
        }
    }

    /// Returns the number of idle buffers as `(small, medium, large)`.
    pub fn pool_sizes(&self) -> (usize, usize, usize) {
        (self.small.len(), self.medium.len(), self.large.len())
    }
}

impl Default for SizedBufferPool {
    /// Same as `SizedBufferPool::new()`.
    fn default() -> Self {
        Self::new()
    }
}
324
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests that reset or compare the process-wide counters.
    ///
    /// The test harness runs tests on parallel threads, so without this a
    /// `reset_stats()` in one test could land between another test's work and
    /// its assertions and make it fail spuriously. Tests that merely acquire
    /// arenas only ever increase the counters and do not need the lock.
    static STATS_LOCK: Mutex<()> = Mutex::new(());

    /// Takes `STATS_LOCK`, ignoring poisoning so that one failed test does not
    /// cascade into unrelated failures.
    fn lock_stats() -> MutexGuard<'static, ()> {
        STATS_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn test_get_arena() {
        let _stats_guard = lock_stats();
        let stats_before = arena_stats();

        let arena1 = get_arena();
        let s = arena1.alloc_str("hello");
        assert_eq!(s, "hello");
        assert!(arena1.allocated() > 0);

        drop(arena1);

        let stats_after = arena_stats();
        let total_before = stats_before.arenas_created + stats_before.arenas_recycled;
        let total_after = stats_after.arenas_created + stats_after.arenas_recycled;
        assert!(
            total_after >= total_before,
            "Expected total ops {} >= {}",
            total_after,
            total_before
        );
    }

    #[test]
    fn test_arena_recycling() {
        // The pool is thread-local, so clearing it only affects this test.
        ARENA_POOL.with(|pool| pool.borrow_mut().clear());

        let pool_empty = ARENA_POOL.with(|pool| pool.borrow().is_empty());
        assert!(pool_empty, "Pool should be empty after clear");

        let arena1 = get_arena();
        let _ = arena1.alloc_str("test");
        drop(arena1);

        let pool_has_arena = ARENA_POOL.with(|pool| !pool.borrow().is_empty());
        assert!(pool_has_arena, "Pool should have arena after drop");

        let arena2 = get_arena();
        let s = arena2.alloc_str("recycled");
        assert_eq!(s, "recycled");

        let pool_empty_after = ARENA_POOL.with(|pool| pool.borrow().is_empty());
        assert!(pool_empty_after, "Pool should be empty after taking arena");
        drop(arena2);
    }

    #[test]
    fn test_arena_allocations() {
        let arena = get_arena();

        let s1 = arena.alloc_str("hello");
        let s2 = arena.alloc_str("world");
        let bytes = arena.alloc_bytes(&[1, 2, 3, 4, 5]);

        assert_eq!(s1, "hello");
        assert_eq!(s2, "world");
        assert_eq!(bytes, &[1, 2, 3, 4, 5]);

        assert!(arena.allocated() > 0);
    }

    #[test]
    fn test_scoped_arena() {
        let _stats_guard = lock_stats();
        reset_stats();

        {
            let arena = ScopedArena::new();
            let s = arena.alloc_str("scoped");
            assert_eq!(s, "scoped");
        }
        let _ = ScopedArena::new();

        let stats = arena_stats();
        assert!(stats.arenas_recycled > 0 || stats.arenas_created > 0);
    }

    #[test]
    fn test_sized_buffer_pool() {
        let mut pool = SizedBufferPool::new();

        let buf1 = pool.get(100);
        assert!(buf1.capacity() >= 100);

        let buf2 = pool.get(10_000);
        assert!(buf2.capacity() >= 10_000);

        pool.put(buf1);
        pool.put(buf2);

        let (small, medium, large) = pool.pool_sizes();
        assert_eq!(small, 1);
        assert_eq!(medium, 1);
        assert_eq!(large, 0);
    }

    #[test]
    fn test_sized_buffer_reuse() {
        let mut pool = SizedBufferPool::new();

        let mut buf1 = pool.get(100);
        buf1.extend_from_slice(b"test data");
        pool.put(buf1);

        let buf2 = pool.get(100);
        assert!(buf2.is_empty());
        assert!(buf2.capacity() >= 100);
    }

    #[test]
    fn test_arena_with_custom_capacity() {
        let arena = get_arena_with_capacity(1024);
        let s = arena.alloc_str("custom");
        assert_eq!(s, "custom");
    }

    #[test]
    fn test_concurrent_arena_access() {
        let _stats_guard = lock_stats();
        reset_stats();

        std::thread::scope(|s| {
            for _ in 0..4 {
                s.spawn(|| {
                    for _ in 0..100 {
                        let arena = get_arena();
                        let _ = arena.alloc_str("concurrent test");
                        drop(arena);
                    }
                });
            }
        });

        let stats = arena_stats();
        assert!(stats.arenas_created > 0);
        assert!(stats.arenas_recycled > 0);
    }

    #[test]
    fn test_alloc_slice_fill() {
        let arena = get_arena();
        let slice = arena.alloc_slice_fill_with(5, |i| i * 2);
        assert_eq!(slice, &[0, 2, 4, 6, 8]);
    }
}