Skip to main content

allsource_core/infrastructure/persistence/
arena_pool.rs

1//! Arena-based memory pooling for high-throughput event processing
2//!
3//! This module provides thread-local arena allocators for zero-allocation
4//! hot paths. Arenas are recycled and reused to minimize memory churn.
5//!
6//! # Performance Characteristics
7//! - ~2-5ns allocations (vs ~50-100ns for standard allocation)
8//! - Zero fragmentation within arena
9//! - Batch deallocation (entire arena at once)
10//! - Thread-local access eliminates contention
11//!
12//! # Usage Pattern
13//! ```ignore
14//! // Get arena from thread-local pool
//! let arena = get_arena();
16//!
17//! // Allocate in arena (very fast)
18//! let s = arena.alloc_str("hello");
19//! let bytes = arena.alloc_bytes(&[1, 2, 3]);
20//!
21//! // Arena is automatically returned to pool when dropped
22//! ```
23
24use bumpalo::Bump;
25use std::{
26    cell::RefCell,
27    sync::atomic::{AtomicU64, Ordering},
28};
29
30/// Default arena size (16MB)
31const DEFAULT_ARENA_SIZE: usize = 16 * 1024 * 1024;
32
33/// Maximum arenas to keep in thread-local pool
34const MAX_THREAD_LOCAL_ARENAS: usize = 4;
35
36/// Global statistics for arena pool usage
37static ARENAS_CREATED: AtomicU64 = AtomicU64::new(0);
38static ARENAS_RECYCLED: AtomicU64 = AtomicU64::new(0);
39static BYTES_ALLOCATED: AtomicU64 = AtomicU64::new(0);
40
41// Thread-local arena pool
42thread_local! {
43    static ARENA_POOL: RefCell<Vec<Bump>> = RefCell::new(Vec::with_capacity(MAX_THREAD_LOCAL_ARENAS));
44}
45
46/// Get an arena from the thread-local pool (or create a new one)
47///
48/// # Performance
49/// - ~10-20ns when recycling from pool
50/// - ~100-500ns when creating new arena
51pub fn get_arena() -> PooledArena {
52    let arena = ARENA_POOL.with(|pool| pool.borrow_mut().pop());
53
54    let arena = match arena {
55        Some(mut arena) => {
56            arena.reset();
57            ARENAS_RECYCLED.fetch_add(1, Ordering::Relaxed);
58            arena
59        }
60        None => {
61            ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
62            Bump::with_capacity(DEFAULT_ARENA_SIZE)
63        }
64    };
65
66    PooledArena { arena: Some(arena) }
67}
68
69/// Get an arena with custom capacity
70pub fn get_arena_with_capacity(capacity: usize) -> PooledArena {
71    ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
72    PooledArena {
73        arena: Some(Bump::with_capacity(capacity)),
74    }
75}
76
77/// A pooled arena that returns to the thread-local pool when dropped
78pub struct PooledArena {
79    arena: Option<Bump>,
80}
81
82impl PooledArena {
83    /// Allocate a string slice in the arena
84    ///
85    /// # Performance
86    /// - ~2-5ns allocation
87    #[inline]
88    pub fn alloc_str(&self, s: &str) -> &str {
89        self.arena.as_ref().unwrap().alloc_str(s)
90    }
91
92    /// Allocate a byte slice in the arena
93    #[inline]
94    pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
95        self.arena.as_ref().unwrap().alloc_slice_copy(bytes)
96    }
97
98    /// Allocate a value in the arena
99    #[inline]
100    pub fn alloc<T>(&self, val: T) -> &mut T {
101        self.arena.as_ref().unwrap().alloc(val)
102    }
103
104    /// Allocate a slice with fill function
105    #[inline]
106    pub fn alloc_slice_fill_with<T, F>(&self, len: usize, f: F) -> &mut [T]
107    where
108        F: FnMut(usize) -> T,
109    {
110        self.arena.as_ref().unwrap().alloc_slice_fill_with(len, f)
111    }
112
113    /// Get current allocation size
114    pub fn allocated(&self) -> usize {
115        self.arena.as_ref().unwrap().allocated_bytes()
116    }
117
118    /// Get a reference to the underlying bump allocator
119    pub fn inner(&self) -> &Bump {
120        self.arena.as_ref().unwrap()
121    }
122}
123
124impl Drop for PooledArena {
125    fn drop(&mut self) {
126        if let Some(arena) = self.arena.take() {
127            let allocated = arena.allocated_bytes();
128            BYTES_ALLOCATED.fetch_add(allocated as u64, Ordering::Relaxed);
129
130            ARENA_POOL.with(|pool| {
131                let mut pool = pool.borrow_mut();
132                if pool.len() < MAX_THREAD_LOCAL_ARENAS {
133                    pool.push(arena);
134                }
135                // Arena dropped if pool is full
136            });
137        }
138    }
139}
140
/// Point-in-time snapshot of the global arena counters, as returned by `arena_stats`.
#[derive(Clone, Debug)]
pub struct ArenaPoolStats {
    /// How many arenas had to be constructed from scratch, summed over all threads.
    pub arenas_created: u64,
    /// How many times an idle arena was taken from a pool instead of constructing one.
    pub arenas_recycled: u64,
    /// Running byte total accumulated from arenas as their handles were dropped.
    pub bytes_allocated: u64,
    /// `arenas_recycled / (arenas_created + arenas_recycled)`, between 0.0 and 1.0;
    /// 0.0 when no arena has been handed out yet.
    pub recycle_rate: f64,
}
153
154/// Get global arena pool statistics
155pub fn arena_stats() -> ArenaPoolStats {
156    let created = ARENAS_CREATED.load(Ordering::Relaxed);
157    let recycled = ARENAS_RECYCLED.load(Ordering::Relaxed);
158    let total = created + recycled;
159
160    ArenaPoolStats {
161        arenas_created: created,
162        arenas_recycled: recycled,
163        bytes_allocated: BYTES_ALLOCATED.load(Ordering::Relaxed),
164        recycle_rate: if total > 0 {
165            recycled as f64 / total as f64
166        } else {
167            0.0
168        },
169    }
170}
171
172/// Reset global statistics (for testing)
173pub fn reset_stats() {
174    ARENAS_CREATED.store(0, Ordering::Relaxed);
175    ARENAS_RECYCLED.store(0, Ordering::Relaxed);
176    BYTES_ALLOCATED.store(0, Ordering::Relaxed);
177}
178
179/// Scoped arena for temporary allocations
180///
181/// Provides a convenient RAII pattern for short-lived allocations.
182/// The arena is automatically returned to the pool when the scope ends.
183///
184/// # Example
185/// ```ignore
186/// {
187///     let arena = ScopedArena::new();
188///     let s1 = arena.alloc_str("temporary");
189///     let s2 = arena.alloc_str("data");
190///     // Use s1, s2...
191/// } // Arena automatically returned to pool
192/// ```
193pub struct ScopedArena {
194    arena: PooledArena,
195}
196
197impl ScopedArena {
198    /// Create a new scoped arena from the pool
199    pub fn new() -> Self {
200        Self { arena: get_arena() }
201    }
202
203    /// Create with custom capacity
204    pub fn with_capacity(capacity: usize) -> Self {
205        Self {
206            arena: get_arena_with_capacity(capacity),
207        }
208    }
209
210    /// Allocate a string
211    #[inline]
212    pub fn alloc_str(&self, s: &str) -> &str {
213        self.arena.alloc_str(s)
214    }
215
216    /// Allocate bytes
217    #[inline]
218    pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
219        self.arena.alloc_bytes(bytes)
220    }
221
222    /// Allocate a value
223    #[inline]
224    pub fn alloc<T>(&self, val: T) -> &mut T {
225        self.arena.alloc(val)
226    }
227
228    /// Get current allocation size
229    pub fn allocated(&self) -> usize {
230        self.arena.allocated()
231    }
232}
233
234impl Default for ScopedArena {
235    fn default() -> Self {
236        Self::new()
237    }
238}
239
/// Pool of reusable byte buffers grouped into three size classes.
///
/// Useful when you need buffers of predictable sizes and want
/// to avoid arena overhead for very small allocations.
///
/// Requests are classified by `min_size` (up to 1 KiB, up to 64 KiB, larger). Returned
/// buffers are classified by the largest class whose standard capacity they meet, so
/// every buffer in the `medium` and `large` buckets is at least as big as that class's
/// standard capacity.
pub struct SizedBufferPool {
    small: Vec<Vec<u8>>,  // idle buffers with capacity below `medium_size`
    medium: Vec<Vec<u8>>, // idle buffers with capacity in `medium_size..large_size`
    large: Vec<Vec<u8>>,  // idle buffers with capacity of at least `large_size`
    small_size: usize,    // largest request served by `small`; capacity of a fresh small buffer
    medium_size: usize,   // largest request served by `medium`; capacity of a fresh medium buffer
    large_size: usize,    // minimum capacity of a fresh large buffer
    max_pool: usize,      // maximum number of idle buffers kept per bucket
}

impl SizedBufferPool {
    /// Create an empty pool with 1 KiB / 64 KiB / 1 MiB classes and at most 32 idle
    /// buffers per class.
    pub fn new() -> Self {
        Self {
            small: Vec::new(),
            medium: Vec::new(),
            large: Vec::new(),
            small_size: 1024,
            medium_size: 64 * 1024,
            large_size: 1024 * 1024,
            max_pool: 32,
        }
    }

    /// Get an empty buffer whose capacity is at least `min_size`.
    ///
    /// A pooled buffer from the matching class is reused when it is big enough.
    /// Otherwise a new buffer is allocated with the class's standard capacity (or
    /// `min_size` itself for requests above the large class size), so that the buffer is
    /// fully reusable for its class once it is returned via [`put`](Self::put).
    pub fn get(&mut self, min_size: usize) -> Vec<u8> {
        let (bucket, fresh_capacity) = if min_size <= self.small_size {
            (&mut self.small, self.small_size)
        } else if min_size <= self.medium_size {
            (&mut self.medium, self.medium_size)
        } else {
            (&mut self.large, self.large_size.max(min_size))
        };

        match bucket.pop() {
            Some(mut buf) if buf.capacity() >= min_size => {
                buf.clear();
                buf
            }
            // Either the bucket was empty or the pooled buffer was too small for this
            // request (it is dropped here). Allocate at the class's standard capacity
            // rather than exactly `min_size`, so the replacement is not undersized too.
            _ => Vec::with_capacity(fresh_capacity),
        }
    }

    /// Return a buffer to the pool for later reuse.
    ///
    /// The buffer is cleared and filed under the largest class whose standard capacity
    /// it meets; anything smaller than the medium class goes to the small bucket. If
    /// that bucket already holds `max_pool` buffers, the buffer is dropped. It is never
    /// moved to a different bucket, because a buffer filed under a larger class than it
    /// can serve would only occupy a slot and be discarded on the next `get`.
    pub fn put(&mut self, mut buf: Vec<u8>) {
        buf.clear();
        let cap = buf.capacity();

        let bucket = if cap >= self.large_size {
            &mut self.large
        } else if cap >= self.medium_size {
            &mut self.medium
        } else {
            &mut self.small
        };

        if bucket.len() < self.max_pool {
            bucket.push(buf);
        }
        // Buffer dropped if its bucket is full
    }

    /// Number of idle buffers currently held, as `(small, medium, large)`.
    pub fn pool_sizes(&self) -> (usize, usize, usize) {
        (self.small.len(), self.medium.len(), self.large.len())
    }
}

impl Default for SizedBufferPool {
    /// Same as [`SizedBufferPool::new`].
    fn default() -> Self {
        Self::new()
    }
}
326
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests that call `reset_stats()` and then assert on the global
    /// counters. The test harness runs tests in parallel, so without this one test's
    /// reset could zero the counters between another test's work and its assertions.
    /// Tests that never reset only ever increase the counters, so they need no lock.
    static STATS_LOCK: Mutex<()> = Mutex::new(());

    /// Take `STATS_LOCK`, ignoring poisoning so that one failed test does not cascade
    /// into unrelated failures.
    fn lock_stats() -> MutexGuard<'static, ()> {
        STATS_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn test_get_arena() {
        // Test that arena allocation and string storage works correctly
        // Note: We don't check global stats here because they can be affected
        // by other tests running in parallel (e.g., reset_stats() calls)
        let arena1 = get_arena();
        let s = arena1.alloc_str("hello");
        assert_eq!(s, "hello");
        assert!(arena1.allocated() > 0);

        // Test that we can allocate multiple items
        let s2 = arena1.alloc_str("world");
        assert_eq!(s2, "world");

        // Test that allocations persist
        assert_eq!(s, "hello");
        assert_eq!(s2, "world");
    }

    #[test]
    fn test_arena_recycling() {
        // Clear the thread-local pool first
        ARENA_POOL.with(|pool| pool.borrow_mut().clear());

        // Verify pool is empty
        let pool_empty = ARENA_POOL.with(|pool| pool.borrow().is_empty());
        assert!(pool_empty, "Pool should be empty after clear");

        // Create and drop arena - should return to pool
        let arena1 = get_arena();
        let _ = arena1.alloc_str("test"); // Use the arena
        drop(arena1);

        // Verify arena was returned to pool
        let pool_has_arena = ARENA_POOL.with(|pool| !pool.borrow().is_empty());
        assert!(pool_has_arena, "Pool should have arena after drop");

        // Get another arena (should be recycled from pool)
        let arena2 = get_arena();
        // Verify we can allocate in the recycled arena (it was reset)
        let s = arena2.alloc_str("recycled");
        assert_eq!(s, "recycled");

        // Pool should now be empty (we took the arena)
        let pool_empty_after = ARENA_POOL.with(|pool| pool.borrow().is_empty());
        assert!(pool_empty_after, "Pool should be empty after taking arena");
        drop(arena2);
    }

    #[test]
    fn test_arena_allocations() {
        let arena = get_arena();

        let s1 = arena.alloc_str("hello");
        let s2 = arena.alloc_str("world");
        let bytes = arena.alloc_bytes(&[1, 2, 3, 4, 5]);

        assert_eq!(s1, "hello");
        assert_eq!(s2, "world");
        assert_eq!(bytes, &[1, 2, 3, 4, 5]);

        assert!(arena.allocated() > 0);
    }

    #[test]
    fn test_scoped_arena() {
        let _stats_guard = lock_stats();
        reset_stats();

        {
            let arena = ScopedArena::new();
            let s = arena.alloc_str("scoped");
            assert_eq!(s, "scoped");
        } // Arena returned to pool

        // The pool on this thread now holds at least one arena, so this one is recycled
        let _ = ScopedArena::new();

        let stats = arena_stats();
        assert!(
            stats.arenas_recycled > 0,
            "second arena should come from the pool"
        );
    }

    #[test]
    fn test_sized_buffer_pool() {
        let mut pool = SizedBufferPool::new();

        // Get small buffer
        let buf1 = pool.get(100);
        assert!(buf1.capacity() >= 100);

        // Get medium buffer
        let buf2 = pool.get(10_000);
        assert!(buf2.capacity() >= 10_000);

        // Return buffers
        pool.put(buf1);
        pool.put(buf2);

        let (small, medium, large) = pool.pool_sizes();
        assert_eq!(small, 1);
        assert_eq!(medium, 1);
        assert_eq!(large, 0);
    }

    #[test]
    fn test_sized_buffer_reuse() {
        let mut pool = SizedBufferPool::new();

        let mut buf1 = pool.get(100);
        buf1.extend_from_slice(b"test data");
        pool.put(buf1);

        // Get buffer again - should be cleared
        let buf2 = pool.get(100);
        assert!(buf2.is_empty());
        assert!(buf2.capacity() >= 100);
    }

    #[test]
    fn test_arena_with_custom_capacity() {
        let arena = get_arena_with_capacity(1024);
        let s = arena.alloc_str("custom");
        assert_eq!(s, "custom");
    }

    #[test]
    fn test_concurrent_arena_access() {
        const THREADS: u64 = 4;
        const ITERATIONS: u64 = 100;

        let _stats_guard = lock_stats();
        reset_stats();

        std::thread::scope(|s| {
            for _ in 0..THREADS {
                s.spawn(|| {
                    for _ in 0..ITERATIONS {
                        let arena = get_arena();
                        let _ = arena.alloc_str("concurrent test");
                        drop(arena);
                    }
                });
            }
        });

        let stats = arena_stats();
        // Each spawned thread starts with an empty pool: its first arena is created and
        // every later one is recycled. Other tests running in parallel can only add to
        // the counters, hence lower bounds rather than equalities.
        assert!(stats.arenas_created >= THREADS);
        assert!(stats.arenas_recycled >= THREADS * (ITERATIONS - 1));
    }

    #[test]
    fn test_alloc_slice_fill() {
        let arena = get_arena();
        let slice = arena.alloc_slice_fill_with(5, |i| i * 2);
        assert_eq!(slice, &[0, 2, 4, 6, 8]);
    }
}