Skip to main content

allsource_core/infrastructure/persistence/
arena_pool.rs

//! Arena-based memory pooling for high-throughput event processing
//!
//! This module provides thread-local arena allocators for zero-allocation
//! hot paths. Arenas are recycled and reused to minimize memory churn.
//!
//! # Performance Characteristics
//! - ~2-5ns allocations (vs ~50-100ns for standard allocation)
//! - Zero fragmentation within arena
//! - Batch deallocation (entire arena at once)
//! - Thread-local access eliminates contention
//!
//! # Usage Pattern
//! ```ignore
//! // Get an arena from the thread-local pool
//! let arena = get_arena();
//!
//! // Allocate in the arena (very fast)
//! let s = arena.alloc_str("hello");
//! let bytes = arena.alloc_bytes(&[1, 2, 3]);
//!
//! // The arena is automatically returned to the pool when dropped
//! ```
23
24use bumpalo::Bump;
25use std::{
26    cell::RefCell,
27    sync::atomic::{AtomicU64, Ordering},
28};
29
/// Initial capacity, in bytes, of each arena built by `get_arena` (16 MiB).
const DEFAULT_ARENA_SIZE: usize = 16 << 20;

/// Maximum number of idle arenas each thread keeps for reuse; an arena
/// dropped while its thread's pool already holds this many is freed instead.
const MAX_THREAD_LOCAL_ARENAS: usize = 4;

/// Process-wide count of arenas built from scratch (pool miss or explicit capacity).
static ARENAS_CREATED: AtomicU64 = AtomicU64::new(0);
/// Process-wide count of arenas handed out again from a thread-local pool.
static ARENAS_RECYCLED: AtomicU64 = AtomicU64::new(0);
/// Process-wide running total of `allocated_bytes()` sampled when pooled arenas are dropped.
static BYTES_ALLOCATED: AtomicU64 = AtomicU64::new(0);
40
41// Thread-local arena pool
42thread_local! {
43    static ARENA_POOL: RefCell<Vec<Bump>> = RefCell::new(Vec::with_capacity(MAX_THREAD_LOCAL_ARENAS));
44}
45
46/// Get an arena from the thread-local pool (or create a new one)
47///
48/// # Performance
49/// - ~10-20ns when recycling from pool
50/// - ~100-500ns when creating new arena
51pub fn get_arena() -> PooledArena {
52    let arena = ARENA_POOL.with(|pool| pool.borrow_mut().pop());
53
54    let arena = if let Some(mut arena) = arena {
55        arena.reset();
56        ARENAS_RECYCLED.fetch_add(1, Ordering::Relaxed);
57        arena
58    } else {
59        ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
60        Bump::with_capacity(DEFAULT_ARENA_SIZE)
61    };
62
63    PooledArena { arena: Some(arena) }
64}
65
66/// Get an arena with custom capacity
67pub fn get_arena_with_capacity(capacity: usize) -> PooledArena {
68    ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
69    PooledArena {
70        arena: Some(Bump::with_capacity(capacity)),
71    }
72}
73
74/// A pooled arena that returns to the thread-local pool when dropped
75pub struct PooledArena {
76    arena: Option<Bump>,
77}
78
79impl PooledArena {
80    /// Allocate a string slice in the arena
81    ///
82    /// # Performance
83    /// - ~2-5ns allocation
84    #[inline]
85    pub fn alloc_str(&self, s: &str) -> &str {
86        self.arena.as_ref().unwrap().alloc_str(s)
87    }
88
89    /// Allocate a byte slice in the arena
90    #[inline]
91    pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
92        self.arena.as_ref().unwrap().alloc_slice_copy(bytes)
93    }
94
95    /// Allocate a value in the arena
96    #[inline]
97    pub fn alloc<T>(&self, val: T) -> &mut T {
98        self.arena.as_ref().unwrap().alloc(val)
99    }
100
101    /// Allocate a slice with fill function
102    #[inline]
103    pub fn alloc_slice_fill_with<T, F>(&self, len: usize, f: F) -> &mut [T]
104    where
105        F: FnMut(usize) -> T,
106    {
107        self.arena.as_ref().unwrap().alloc_slice_fill_with(len, f)
108    }
109
110    /// Get current allocation size
111    pub fn allocated(&self) -> usize {
112        self.arena.as_ref().unwrap().allocated_bytes()
113    }
114
115    /// Get a reference to the underlying bump allocator
116    pub fn inner(&self) -> &Bump {
117        self.arena.as_ref().unwrap()
118    }
119}
120
121impl Drop for PooledArena {
122    fn drop(&mut self) {
123        if let Some(arena) = self.arena.take() {
124            let allocated = arena.allocated_bytes();
125            BYTES_ALLOCATED.fetch_add(allocated as u64, Ordering::Relaxed);
126
127            ARENA_POOL.with(|pool| {
128                let mut pool = pool.borrow_mut();
129                if pool.len() < MAX_THREAD_LOCAL_ARENAS {
130                    pool.push(arena);
131                }
132                // Arena dropped if pool is full
133            });
134        }
135    }
136}
137
/// Snapshot of the process-wide arena pool counters, as produced by `arena_stats`.
#[derive(Debug, Clone)]
pub struct ArenaPoolStats {
    /// Arenas built from scratch (pool miss or explicit capacity), summed over all threads.
    pub arenas_created: u64,
    /// Arenas handed out again from a thread-local pool, summed over all threads.
    pub arenas_recycled: u64,
    /// Running total of each pooled arena's `allocated_bytes()` at drop time.
    pub bytes_allocated: u64,
    /// `arenas_recycled / (arenas_created + arenas_recycled)`, or `0.0` before any arena was requested.
    pub recycle_rate: f64,
}
150
151/// Get global arena pool statistics
152pub fn arena_stats() -> ArenaPoolStats {
153    let created = ARENAS_CREATED.load(Ordering::Relaxed);
154    let recycled = ARENAS_RECYCLED.load(Ordering::Relaxed);
155    let total = created + recycled;
156
157    ArenaPoolStats {
158        arenas_created: created,
159        arenas_recycled: recycled,
160        bytes_allocated: BYTES_ALLOCATED.load(Ordering::Relaxed),
161        recycle_rate: if total > 0 {
162            recycled as f64 / total as f64
163        } else {
164            0.0
165        },
166    }
167}
168
169/// Reset global statistics (for testing)
170pub fn reset_stats() {
171    ARENAS_CREATED.store(0, Ordering::Relaxed);
172    ARENAS_RECYCLED.store(0, Ordering::Relaxed);
173    BYTES_ALLOCATED.store(0, Ordering::Relaxed);
174}
175
176/// Scoped arena for temporary allocations
177///
178/// Provides a convenient RAII pattern for short-lived allocations.
179/// The arena is automatically returned to the pool when the scope ends.
180///
181/// # Example
182/// ```ignore
183/// {
184///     let arena = ScopedArena::new();
185///     let s1 = arena.alloc_str("temporary");
186///     let s2 = arena.alloc_str("data");
187///     // Use s1, s2...
188/// } // Arena automatically returned to pool
189/// ```
190pub struct ScopedArena {
191    arena: PooledArena,
192}
193
194impl ScopedArena {
195    /// Create a new scoped arena from the pool
196    pub fn new() -> Self {
197        Self { arena: get_arena() }
198    }
199
200    /// Create with custom capacity
201    pub fn with_capacity(capacity: usize) -> Self {
202        Self {
203            arena: get_arena_with_capacity(capacity),
204        }
205    }
206
207    /// Allocate a string
208    #[inline]
209    pub fn alloc_str(&self, s: &str) -> &str {
210        self.arena.alloc_str(s)
211    }
212
213    /// Allocate bytes
214    #[inline]
215    pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
216        self.arena.alloc_bytes(bytes)
217    }
218
219    /// Allocate a value
220    #[inline]
221    pub fn alloc<T>(&self, val: T) -> &mut T {
222        self.arena.alloc(val)
223    }
224
225    /// Get current allocation size
226    pub fn allocated(&self) -> usize {
227        self.arena.allocated()
228    }
229}
230
231impl Default for ScopedArena {
232    fn default() -> Self {
233        Self::new()
234    }
235}
236
/// Pre-allocated buffer pool for specific sizes
///
/// Useful when you need buffers of predictable sizes and want
/// to avoid arena overhead for very small allocations.
///
/// Idle buffers are kept in three size classes keyed by capacity: small
/// (`<= small_size`), medium (`<= medium_size`) and large (anything bigger).
/// Each class holds at most `max_pool` idle buffers. A buffer returned to a
/// full class is freed rather than parked in a different class.
pub struct SizedBufferPool {
    small: Vec<Vec<u8>>,  // capacity <= small_size
    medium: Vec<Vec<u8>>, // small_size < capacity <= medium_size
    large: Vec<Vec<u8>>,  // capacity > medium_size
    /// Upper capacity bound of the small class; capacity of fresh small buffers.
    small_size: usize,
    /// Upper capacity bound of the medium class; capacity of fresh medium buffers.
    medium_size: usize,
    /// Minimum capacity of fresh large buffers.
    large_size: usize,
    /// Maximum number of idle buffers kept per size class.
    max_pool: usize,
}

impl SizedBufferPool {
    /// Create a new pool with 1 KiB / 64 KiB / 1 MiB size classes and room for
    /// 32 idle buffers per class.
    pub fn new() -> Self {
        Self {
            small: Vec::new(),
            medium: Vec::new(),
            large: Vec::new(),
            small_size: 1024,
            medium_size: 64 * 1024,
            large_size: 1024 * 1024,
            max_pool: 32,
        }
    }

    /// Get an empty buffer whose capacity is at least `min_size`.
    ///
    /// Reuses an idle buffer from the matching size class when it is large
    /// enough. Otherwise it allocates a fresh buffer sized to the class (or to
    /// `min_size` when that exceeds the large class size).
    pub fn get(&mut self, min_size: usize) -> Vec<u8> {
        let pooled = self.class_mut(min_size).pop();
        match pooled {
            Some(mut buf) if buf.capacity() >= min_size => {
                buf.clear();
                buf
            }
            // Nothing pooled, or the pooled buffer is too small. Drop it and
            // allocate a class-sized buffer, exactly as the empty-pool path does,
            // so the replacement can serve any later request in this class.
            _ => Vec::with_capacity(self.class_capacity(min_size)),
        }
    }

    /// Return a buffer to the pool.
    ///
    /// The buffer is cleared and parked in the size class matching its
    /// capacity. It is dropped instead if that class already holds `max_pool`
    /// buffers, or if the buffer owns no allocation (capacity 0).
    pub fn put(&mut self, mut buf: Vec<u8>) {
        let cap = buf.capacity();
        if cap == 0 {
            // Nothing to reuse; pooling it would only waste a slot.
            return;
        }
        buf.clear();

        let max_pool = self.max_pool;
        let class = self.class_mut(cap);
        // Only the buffer's own class is considered. Spilling e.g. a small
        // buffer into the medium list would make `get` pop it for a request it
        // cannot satisfy and throw it away.
        if class.len() < max_pool {
            class.push(buf);
        }
    }

    /// Number of idle buffers held in the (small, medium, large) classes.
    pub fn pool_sizes(&self) -> (usize, usize, usize) {
        (self.small.len(), self.medium.len(), self.large.len())
    }

    /// Idle list of the class containing `size` (a requested minimum size or a
    /// buffer's capacity; both use the same thresholds).
    fn class_mut(&mut self, size: usize) -> &mut Vec<Vec<u8>> {
        if size <= self.small_size {
            &mut self.small
        } else if size <= self.medium_size {
            &mut self.medium
        } else {
            &mut self.large
        }
    }

    /// Capacity of a freshly allocated buffer serving a request of `min_size`.
    fn class_capacity(&self, min_size: usize) -> usize {
        if min_size <= self.small_size {
            self.small_size
        } else if min_size <= self.medium_size {
            self.medium_size
        } else {
            self.large_size.max(min_size)
        }
    }
}

impl Default for SizedBufferPool {
    fn default() -> Self {
        Self::new()
    }
}
320
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests that call `reset_stats()`.
    ///
    /// The counters are process-wide and libtest runs tests in parallel. Without
    /// this lock, one test's `reset_stats()` can zero the counters between
    /// another test's work and its `arena_stats()` check, making `> 0`
    /// assertions flaky. Tests that only increment counters cannot invalidate
    /// those lower-bound checks, so they don't need the lock.
    static STATS_LOCK: Mutex<()> = Mutex::new(());

    /// Acquire `STATS_LOCK`, ignoring poisoning so one failing test does not
    /// cascade into unrelated failures.
    fn stats_guard() -> MutexGuard<'static, ()> {
        STATS_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn test_get_arena() {
        // Test that arena allocation and string storage works correctly
        // Note: We don't check global stats here because they can be affected
        // by other tests running in parallel (e.g., reset_stats() calls)
        let arena1 = get_arena();
        let s = arena1.alloc_str("hello");
        assert_eq!(s, "hello");
        assert!(arena1.allocated() > 0);

        // Test that we can allocate multiple items
        let s2 = arena1.alloc_str("world");
        assert_eq!(s2, "world");

        // Test that allocations persist
        assert_eq!(s, "hello");
        assert_eq!(s2, "world");
    }

    #[test]
    fn test_arena_recycling() {
        // Clear the thread-local pool first
        ARENA_POOL.with(|pool| pool.borrow_mut().clear());

        // Verify pool is empty
        let pool_empty = ARENA_POOL.with(|pool| pool.borrow().is_empty());
        assert!(pool_empty, "Pool should be empty after clear");

        // Create and drop arena - should return to pool
        let arena1 = get_arena();
        let _ = arena1.alloc_str("test"); // Use the arena
        drop(arena1);

        // Verify arena was returned to pool
        let pool_has_arena = ARENA_POOL.with(|pool| !pool.borrow().is_empty());
        assert!(pool_has_arena, "Pool should have arena after drop");

        // Get another arena (should be recycled from pool)
        let arena2 = get_arena();
        // Verify we can allocate in the recycled arena (it was reset)
        let s = arena2.alloc_str("recycled");
        assert_eq!(s, "recycled");

        // Pool should now be empty (we took the arena)
        let pool_empty_after = ARENA_POOL.with(|pool| pool.borrow().is_empty());
        assert!(pool_empty_after, "Pool should be empty after taking arena");
        drop(arena2);
    }

    #[test]
    fn test_arena_allocations() {
        let arena = get_arena();

        let s1 = arena.alloc_str("hello");
        let s2 = arena.alloc_str("world");
        let bytes = arena.alloc_bytes(&[1, 2, 3, 4, 5]);

        assert_eq!(s1, "hello");
        assert_eq!(s2, "world");
        assert_eq!(bytes, &[1, 2, 3, 4, 5]);

        assert!(arena.allocated() > 0);
    }

    #[test]
    fn test_scoped_arena() {
        let _guard = stats_guard();
        reset_stats();

        {
            let arena = ScopedArena::new();
            let s = arena.alloc_str("scoped");
            assert_eq!(s, "scoped");
        } // Arena returned to this thread's pool

        // The first arena is now pooled on this thread, so this one is recycled
        let _ = ScopedArena::new();

        let stats = arena_stats();
        assert!(stats.arenas_recycled > 0);
    }

    #[test]
    fn test_sized_buffer_pool() {
        let mut pool = SizedBufferPool::new();

        // Get small buffer
        let buf1 = pool.get(100);
        assert!(buf1.capacity() >= 100);

        // Get medium buffer
        let buf2 = pool.get(10_000);
        assert!(buf2.capacity() >= 10_000);

        // Return buffers
        pool.put(buf1);
        pool.put(buf2);

        let (small, medium, large) = pool.pool_sizes();
        assert_eq!(small, 1);
        assert_eq!(medium, 1);
        assert_eq!(large, 0);
    }

    #[test]
    fn test_sized_buffer_reuse() {
        let mut pool = SizedBufferPool::new();

        let mut buf1 = pool.get(100);
        buf1.extend_from_slice(b"test data");
        pool.put(buf1);

        // Get buffer again - should be cleared
        let buf2 = pool.get(100);
        assert!(buf2.is_empty());
        assert!(buf2.capacity() >= 100);
    }

    #[test]
    fn test_arena_with_custom_capacity() {
        let arena = get_arena_with_capacity(1024);
        let s = arena.alloc_str("custom");
        assert_eq!(s, "custom");
    }

    #[test]
    fn test_concurrent_arena_access() {
        let _guard = stats_guard();
        reset_stats();

        std::thread::scope(|s| {
            for _ in 0..4 {
                s.spawn(|| {
                    for _ in 0..100 {
                        let arena = get_arena();
                        let _ = arena.alloc_str("concurrent test");
                        drop(arena);
                    }
                });
            }
        });

        let stats = arena_stats();
        // Each thread has its own pool, so we should see both creates and recycles
        assert!(stats.arenas_created > 0);
        assert!(stats.arenas_recycled > 0);
    }

    #[test]
    fn test_alloc_slice_fill() {
        let arena = get_arena();
        let slice = arena.alloc_slice_fill_with(5, |i| i * 2);
        assert_eq!(slice, &[0, 2, 4, 6, 8]);
    }
}