allsource_core/infrastructure/persistence/
arena_pool.rs1use bumpalo::Bump;
25use std::{
26 cell::RefCell,
27 sync::atomic::{AtomicU64, Ordering},
28};
29
/// Initial capacity, in bytes, of each arena that `get_arena` has to create
/// from scratch (16 MiB).
const DEFAULT_ARENA_SIZE: usize = 16 * 1024 * 1024;

/// Maximum number of idle arenas retained per thread; also used as the initial
/// capacity of the pool vector so pushing never reallocates.
const MAX_THREAD_LOCAL_ARENAS: usize = 4;

/// Process-wide count of arenas that were newly constructed.
static ARENAS_CREATED: AtomicU64 = AtomicU64::new(0);
/// Process-wide count of arenas that were taken from a thread-local pool and reused.
static ARENAS_RECYCLED: AtomicU64 = AtomicU64::new(0);
/// Process-wide running sum of `allocated_bytes()` reported by arenas when
/// their `PooledArena` handle is dropped.
static BYTES_ALLOCATED: AtomicU64 = AtomicU64::new(0);

thread_local! {
    /// Per-thread stack of idle arenas waiting to be reused.
    static ARENA_POOL: RefCell<Vec<Bump>> = RefCell::new(Vec::with_capacity(MAX_THREAD_LOCAL_ARENAS));
}
45
46pub fn get_arena() -> PooledArena {
52 let arena = ARENA_POOL.with(|pool| pool.borrow_mut().pop());
53
54 let arena = match arena {
55 Some(mut arena) => {
56 arena.reset();
57 ARENAS_RECYCLED.fetch_add(1, Ordering::Relaxed);
58 arena
59 }
60 None => {
61 ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
62 Bump::with_capacity(DEFAULT_ARENA_SIZE)
63 }
64 };
65
66 PooledArena { arena: Some(arena) }
67}
68
69pub fn get_arena_with_capacity(capacity: usize) -> PooledArena {
71 ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
72 PooledArena {
73 arena: Some(Bump::with_capacity(capacity)),
74 }
75}
76
/// Owning handle to a `Bump` arena obtained from `get_arena` or
/// `get_arena_with_capacity`.
///
/// When the handle is dropped, the arena is offered back to the current
/// thread's pool (see the `Drop` impl).
pub struct PooledArena {
    /// The wrapped arena. It is `Some` from construction until `drop` takes
    /// it out, so the accessors can unwrap it.
    arena: Option<Bump>,
}
81
82impl PooledArena {
83 #[inline]
88 pub fn alloc_str(&self, s: &str) -> &str {
89 self.arena.as_ref().unwrap().alloc_str(s)
90 }
91
92 #[inline]
94 pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
95 self.arena.as_ref().unwrap().alloc_slice_copy(bytes)
96 }
97
98 #[inline]
100 pub fn alloc<T>(&self, val: T) -> &mut T {
101 self.arena.as_ref().unwrap().alloc(val)
102 }
103
104 #[inline]
106 pub fn alloc_slice_fill_with<T, F>(&self, len: usize, f: F) -> &mut [T]
107 where
108 F: FnMut(usize) -> T,
109 {
110 self.arena.as_ref().unwrap().alloc_slice_fill_with(len, f)
111 }
112
113 pub fn allocated(&self) -> usize {
115 self.arena.as_ref().unwrap().allocated_bytes()
116 }
117
118 pub fn inner(&self) -> &Bump {
120 self.arena.as_ref().unwrap()
121 }
122}
123
124impl Drop for PooledArena {
125 fn drop(&mut self) {
126 if let Some(arena) = self.arena.take() {
127 let allocated = arena.allocated_bytes();
128 BYTES_ALLOCATED.fetch_add(allocated as u64, Ordering::Relaxed);
129
130 ARENA_POOL.with(|pool| {
131 let mut pool = pool.borrow_mut();
132 if pool.len() < MAX_THREAD_LOCAL_ARENAS {
133 pool.push(arena);
134 }
135 });
137 }
138 }
139}
140
/// Snapshot of the process-wide arena counters, as returned by `arena_stats`.
#[derive(Debug, Clone)]
pub struct ArenaPoolStats {
    /// Number of arenas that were newly constructed.
    pub arenas_created: u64,
    /// Number of arenas that were reused from a thread-local pool.
    pub arenas_recycled: u64,
    /// Sum of the `allocated_bytes()` values recorded when arena handles were dropped.
    pub bytes_allocated: u64,
    /// `arenas_recycled / (arenas_created + arenas_recycled)`, or `0.0` when
    /// both counters are zero.
    pub recycle_rate: f64,
}
153
154pub fn arena_stats() -> ArenaPoolStats {
156 let created = ARENAS_CREATED.load(Ordering::Relaxed);
157 let recycled = ARENAS_RECYCLED.load(Ordering::Relaxed);
158 let total = created + recycled;
159
160 ArenaPoolStats {
161 arenas_created: created,
162 arenas_recycled: recycled,
163 bytes_allocated: BYTES_ALLOCATED.load(Ordering::Relaxed),
164 recycle_rate: if total > 0 {
165 recycled as f64 / total as f64
166 } else {
167 0.0
168 },
169 }
170}
171
172pub fn reset_stats() {
174 ARENAS_CREATED.store(0, Ordering::Relaxed);
175 ARENAS_RECYCLED.store(0, Ordering::Relaxed);
176 BYTES_ALLOCATED.store(0, Ordering::Relaxed);
177}
178
/// Wrapper around a [`PooledArena`] that exposes a subset of its allocation
/// methods.
///
/// Dropping a `ScopedArena` drops the wrapped handle, which in turn offers
/// the arena back to the thread-local pool.
pub struct ScopedArena {
    /// The wrapped arena handle; all methods delegate to it.
    arena: PooledArena,
}
196
197impl ScopedArena {
198 pub fn new() -> Self {
200 Self { arena: get_arena() }
201 }
202
203 pub fn with_capacity(capacity: usize) -> Self {
205 Self {
206 arena: get_arena_with_capacity(capacity),
207 }
208 }
209
210 #[inline]
212 pub fn alloc_str(&self, s: &str) -> &str {
213 self.arena.alloc_str(s)
214 }
215
216 #[inline]
218 pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
219 self.arena.alloc_bytes(bytes)
220 }
221
222 #[inline]
224 pub fn alloc<T>(&self, val: T) -> &mut T {
225 self.arena.alloc(val)
226 }
227
228 pub fn allocated(&self) -> usize {
230 self.arena.allocated()
231 }
232}
233
234impl Default for ScopedArena {
235 fn default() -> Self {
236 Self::new()
237 }
238}
239
/// A pool of reusable `Vec<u8>` buffers grouped into three size classes
/// (small, medium, large).
///
/// Invariant maintained by [`SizedBufferPool::put`]: every buffer stored in a
/// class has a capacity of at least that class's size, so any request routed
/// to the small or medium class can be served by whatever buffer is popped.
pub struct SizedBufferPool {
    /// Idle buffers with `small_size <= capacity < medium_size`.
    small: Vec<Vec<u8>>,
    /// Idle buffers with `medium_size <= capacity < large_size`.
    medium: Vec<Vec<u8>>,
    /// Idle buffers with `capacity >= large_size`.
    large: Vec<Vec<u8>>,
    /// Capacity of freshly allocated small buffers (1 KiB).
    small_size: usize,
    /// Capacity of freshly allocated medium buffers (64 KiB).
    medium_size: usize,
    /// Minimum capacity of freshly allocated large buffers (1 MiB).
    large_size: usize,
    /// Maximum number of idle buffers retained per class.
    max_pool: usize,
}

impl SizedBufferPool {
    /// Creates an empty pool with 1 KiB / 64 KiB / 1 MiB classes that keeps at
    /// most 32 idle buffers per class.
    pub fn new() -> Self {
        Self {
            small: Vec::new(),
            medium: Vec::new(),
            large: Vec::new(),
            small_size: 1024,
            medium_size: 64 * 1024,
            large_size: 1024 * 1024,
            max_pool: 32,
        }
    }

    /// Returns an empty buffer whose capacity is at least `min_size`.
    ///
    /// The request is routed to the smallest class whose size covers
    /// `min_size`; anything above the medium size goes to the large class. A
    /// pooled buffer is reused when it is big enough. Otherwise a new buffer
    /// is allocated with the class capacity, or `min_size` if that is larger.
    pub fn get(&mut self, min_size: usize) -> Vec<u8> {
        let (bucket, class_size) = if min_size <= self.small_size {
            (&mut self.small, self.small_size)
        } else if min_size <= self.medium_size {
            (&mut self.medium, self.medium_size)
        } else {
            (&mut self.large, self.large_size)
        };

        match bucket.pop() {
            Some(mut buf) if buf.capacity() >= min_size => {
                buf.clear();
                buf
            }
            // Either the class is empty, or (large class only) the popped
            // buffer is smaller than this oversized request and is released.
            // Always allocate at least the class capacity so the new buffer
            // returns to the same class when it is put back.
            _ => Vec::with_capacity(class_size.max(min_size)),
        }
    }

    /// Returns `buf` to the pool for later reuse; its contents are cleared.
    ///
    /// The buffer is filed under the largest class whose size its capacity
    /// reaches. It is dropped instead of pooled when that class already holds
    /// `max_pool` buffers, or when its capacity is below the small class size
    /// and it could not serve any request. A full class never spills into a
    /// different class, because that would hand out undersized buffers.
    pub fn put(&mut self, mut buf: Vec<u8>) {
        let cap = buf.capacity();

        let bucket = if cap >= self.large_size {
            &mut self.large
        } else if cap >= self.medium_size {
            &mut self.medium
        } else if cap >= self.small_size {
            &mut self.small
        } else {
            return;
        };

        if bucket.len() < self.max_pool {
            buf.clear();
            bucket.push(buf);
        }
    }

    /// Returns the number of idle buffers as `(small, medium, large)`.
    pub fn pool_sizes(&self) -> (usize, usize, usize) {
        (self.small.len(), self.medium.len(), self.large.len())
    }
}

impl Default for SizedBufferPool {
    /// Same as [`SizedBufferPool::new`].
    fn default() -> Self {
        Self::new()
    }
}
326
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests that call `reset_stats()` and then assert on the
    /// process-wide counters. The test harness runs tests in parallel, so
    /// without this one test could zero the counters between another test's
    /// work and its assertions.
    static STATS_LOCK: Mutex<()> = Mutex::new(());

    /// Takes `STATS_LOCK`, ignoring poisoning so that one failed test does not
    /// cascade into the others.
    fn lock_stats() -> MutexGuard<'static, ()> {
        STATS_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Allocations from one arena stay valid while later allocations are made.
    #[test]
    fn test_get_arena() {
        let arena1 = get_arena();
        let s = arena1.alloc_str("hello");
        assert_eq!(s, "hello");
        assert!(arena1.allocated() > 0);

        let s2 = arena1.alloc_str("world");
        assert_eq!(s2, "world");

        assert_eq!(s, "hello");
        assert_eq!(s2, "world");
    }

    /// A dropped arena lands in this thread's pool and is taken out again by
    /// the next `get_arena()`.
    #[test]
    fn test_arena_recycling() {
        // Start from a known state for this thread's pool.
        ARENA_POOL.with(|pool| pool.borrow_mut().clear());

        let pool_empty = ARENA_POOL.with(|pool| pool.borrow().is_empty());
        assert!(pool_empty, "Pool should be empty after clear");

        let arena1 = get_arena();
        let _ = arena1.alloc_str("test");
        drop(arena1);

        let pool_has_arena = ARENA_POOL.with(|pool| !pool.borrow().is_empty());
        assert!(pool_has_arena, "Pool should have arena after drop");

        let arena2 = get_arena();
        let s = arena2.alloc_str("recycled");
        assert_eq!(s, "recycled");

        let pool_empty_after = ARENA_POOL.with(|pool| pool.borrow().is_empty());
        assert!(pool_empty_after, "Pool should be empty after taking arena");
        drop(arena2);
    }

    /// String and byte allocations round-trip their contents.
    #[test]
    fn test_arena_allocations() {
        let arena = get_arena();

        let s1 = arena.alloc_str("hello");
        let s2 = arena.alloc_str("world");
        let bytes = arena.alloc_bytes(&[1, 2, 3, 4, 5]);

        assert_eq!(s1, "hello");
        assert_eq!(s2, "world");
        assert_eq!(bytes, &[1, 2, 3, 4, 5]);

        assert!(arena.allocated() > 0);
    }

    /// Using `ScopedArena` is reflected in the global counters.
    #[test]
    fn test_scoped_arena() {
        let _guard = lock_stats();
        reset_stats();

        {
            let arena = ScopedArena::new();
            let s = arena.alloc_str("scoped");
            assert_eq!(s, "scoped");
        }
        let _ = ScopedArena::new();

        let stats = arena_stats();
        assert!(stats.arenas_recycled > 0 || stats.arenas_created > 0);
    }

    /// Buffers go back to the class they were handed out from.
    #[test]
    fn test_sized_buffer_pool() {
        let mut pool = SizedBufferPool::new();

        let buf1 = pool.get(100);
        assert!(buf1.capacity() >= 100);

        let buf2 = pool.get(10_000);
        assert!(buf2.capacity() >= 10_000);

        pool.put(buf1);
        pool.put(buf2);

        let (small, medium, large) = pool.pool_sizes();
        assert_eq!(small, 1);
        assert_eq!(medium, 1);
        assert_eq!(large, 0);
    }

    /// A reused buffer comes back empty with its capacity intact.
    #[test]
    fn test_sized_buffer_reuse() {
        let mut pool = SizedBufferPool::new();

        let mut buf1 = pool.get(100);
        buf1.extend_from_slice(b"test data");
        pool.put(buf1);

        let buf2 = pool.get(100);
        assert!(buf2.is_empty());
        assert!(buf2.capacity() >= 100);
    }

    /// Arenas built with an explicit capacity are usable.
    #[test]
    fn test_arena_with_custom_capacity() {
        let arena = get_arena_with_capacity(1024);
        let s = arena.alloc_str("custom");
        assert_eq!(s, "custom");
    }

    /// Several threads cycling arenas each create at least one arena and then
    /// recycle it.
    #[test]
    fn test_concurrent_arena_access() {
        let _guard = lock_stats();
        reset_stats();

        std::thread::scope(|s| {
            for _ in 0..4 {
                s.spawn(|| {
                    for _ in 0..100 {
                        let arena = get_arena();
                        let _ = arena.alloc_str("concurrent test");
                        drop(arena);
                    }
                });
            }
        });

        let stats = arena_stats();
        assert!(stats.arenas_created > 0);
        assert!(stats.arenas_recycled > 0);
    }

    /// `alloc_slice_fill_with` calls the closure with each index in order.
    #[test]
    fn test_alloc_slice_fill() {
        let arena = get_arena();
        let slice = arena.alloc_slice_fill_with(5, |i| i * 2);
        assert_eq!(slice, &[0, 2, 4, 6, 8]);
    }
}