Skip to main content

allsource_core/infrastructure/persistence/
arena_pool.rs

1//! Arena-based memory pooling for high-throughput event processing
2//!
3//! This module provides thread-local arena allocators for zero-allocation
4//! hot paths. Arenas are recycled and reused to minimize memory churn.
5//!
6//! # Performance Characteristics
7//! - ~2-5ns allocations (vs ~50-100ns for standard allocation)
8//! - Zero fragmentation within arena
9//! - Batch deallocation (entire arena at once)
10//! - Thread-local access eliminates contention
11//!
12//! # Usage Pattern
13//! ```ignore
14//! // Get arena from thread-local pool
//! let arena = get_arena();
16//!
17//! // Allocate in arena (very fast)
18//! let s = arena.alloc_str("hello");
19//! let bytes = arena.alloc_bytes(&[1, 2, 3]);
20//!
21//! // Arena is automatically returned to pool when dropped
22//! ```
23
24use bumpalo::Bump;
25use std::cell::RefCell;
26use std::sync::atomic::{AtomicU64, Ordering};
27
/// Initial capacity, in bytes, requested for each freshly created arena (16 MiB).
const DEFAULT_ARENA_SIZE: usize = 16 << 20;

/// Upper bound on how many idle arenas a single thread keeps around for reuse.
const MAX_THREAD_LOCAL_ARENAS: usize = 4;

/// Count of arenas built from scratch, shared by all threads.
static ARENAS_CREATED: AtomicU64 = AtomicU64::new(0);
/// Count of arenas handed out again after being taken from a thread-local pool.
static ARENAS_RECYCLED: AtomicU64 = AtomicU64::new(0);
/// Running sum of `allocated_bytes()` sampled from each arena as its handle is dropped.
static BYTES_ALLOCATED: AtomicU64 = AtomicU64::new(0);
38
39// Thread-local arena pool
40thread_local! {
41    static ARENA_POOL: RefCell<Vec<Bump>> = RefCell::new(Vec::with_capacity(MAX_THREAD_LOCAL_ARENAS));
42}
43
44/// Get an arena from the thread-local pool (or create a new one)
45///
46/// # Performance
47/// - ~10-20ns when recycling from pool
48/// - ~100-500ns when creating new arena
49pub fn get_arena() -> PooledArena {
50    let arena = ARENA_POOL.with(|pool| pool.borrow_mut().pop());
51
52    let arena = match arena {
53        Some(mut arena) => {
54            arena.reset();
55            ARENAS_RECYCLED.fetch_add(1, Ordering::Relaxed);
56            arena
57        }
58        None => {
59            ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
60            Bump::with_capacity(DEFAULT_ARENA_SIZE)
61        }
62    };
63
64    PooledArena { arena: Some(arena) }
65}
66
67/// Get an arena with custom capacity
68pub fn get_arena_with_capacity(capacity: usize) -> PooledArena {
69    ARENAS_CREATED.fetch_add(1, Ordering::Relaxed);
70    PooledArena {
71        arena: Some(Bump::with_capacity(capacity)),
72    }
73}
74
75/// A pooled arena that returns to the thread-local pool when dropped
76pub struct PooledArena {
77    arena: Option<Bump>,
78}
79
80impl PooledArena {
81    /// Allocate a string slice in the arena
82    ///
83    /// # Performance
84    /// - ~2-5ns allocation
85    #[inline]
86    pub fn alloc_str(&self, s: &str) -> &str {
87        self.arena.as_ref().unwrap().alloc_str(s)
88    }
89
90    /// Allocate a byte slice in the arena
91    #[inline]
92    pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
93        self.arena.as_ref().unwrap().alloc_slice_copy(bytes)
94    }
95
96    /// Allocate a value in the arena
97    #[inline]
98    pub fn alloc<T>(&self, val: T) -> &mut T {
99        self.arena.as_ref().unwrap().alloc(val)
100    }
101
102    /// Allocate a slice with fill function
103    #[inline]
104    pub fn alloc_slice_fill_with<T, F>(&self, len: usize, f: F) -> &mut [T]
105    where
106        F: FnMut(usize) -> T,
107    {
108        self.arena.as_ref().unwrap().alloc_slice_fill_with(len, f)
109    }
110
111    /// Get current allocation size
112    pub fn allocated(&self) -> usize {
113        self.arena.as_ref().unwrap().allocated_bytes()
114    }
115
116    /// Get a reference to the underlying bump allocator
117    pub fn inner(&self) -> &Bump {
118        self.arena.as_ref().unwrap()
119    }
120}
121
122impl Drop for PooledArena {
123    fn drop(&mut self) {
124        if let Some(arena) = self.arena.take() {
125            let allocated = arena.allocated_bytes();
126            BYTES_ALLOCATED.fetch_add(allocated as u64, Ordering::Relaxed);
127
128            ARENA_POOL.with(|pool| {
129                let mut pool = pool.borrow_mut();
130                if pool.len() < MAX_THREAD_LOCAL_ARENAS {
131                    pool.push(arena);
132                }
133                // Arena dropped if pool is full
134            });
135        }
136    }
137}
138
/// Point-in-time snapshot of the global arena counters.
#[derive(Clone, Debug)]
pub struct ArenaPoolStats {
    /// Number of arenas constructed from scratch, summed over all threads.
    pub arenas_created: u64,
    /// Number of times an arena was taken out of a pool and reused.
    pub arenas_recycled: u64,
    /// Accumulated byte count recorded as arenas were dropped.
    pub bytes_allocated: u64,
    /// Share of hand-outs served by recycling, in the range 0.0 to 1.0.
    pub recycle_rate: f64,
}
151
152/// Get global arena pool statistics
153pub fn arena_stats() -> ArenaPoolStats {
154    let created = ARENAS_CREATED.load(Ordering::Relaxed);
155    let recycled = ARENAS_RECYCLED.load(Ordering::Relaxed);
156    let total = created + recycled;
157
158    ArenaPoolStats {
159        arenas_created: created,
160        arenas_recycled: recycled,
161        bytes_allocated: BYTES_ALLOCATED.load(Ordering::Relaxed),
162        recycle_rate: if total > 0 {
163            recycled as f64 / total as f64
164        } else {
165            0.0
166        },
167    }
168}
169
170/// Reset global statistics (for testing)
171pub fn reset_stats() {
172    ARENAS_CREATED.store(0, Ordering::Relaxed);
173    ARENAS_RECYCLED.store(0, Ordering::Relaxed);
174    BYTES_ALLOCATED.store(0, Ordering::Relaxed);
175}
176
177/// Scoped arena for temporary allocations
178///
179/// Provides a convenient RAII pattern for short-lived allocations.
180/// The arena is automatically returned to the pool when the scope ends.
181///
182/// # Example
183/// ```ignore
184/// {
185///     let arena = ScopedArena::new();
186///     let s1 = arena.alloc_str("temporary");
187///     let s2 = arena.alloc_str("data");
188///     // Use s1, s2...
189/// } // Arena automatically returned to pool
190/// ```
191pub struct ScopedArena {
192    arena: PooledArena,
193}
194
195impl ScopedArena {
196    /// Create a new scoped arena from the pool
197    pub fn new() -> Self {
198        Self { arena: get_arena() }
199    }
200
201    /// Create with custom capacity
202    pub fn with_capacity(capacity: usize) -> Self {
203        Self {
204            arena: get_arena_with_capacity(capacity),
205        }
206    }
207
208    /// Allocate a string
209    #[inline]
210    pub fn alloc_str(&self, s: &str) -> &str {
211        self.arena.alloc_str(s)
212    }
213
214    /// Allocate bytes
215    #[inline]
216    pub fn alloc_bytes(&self, bytes: &[u8]) -> &[u8] {
217        self.arena.alloc_bytes(bytes)
218    }
219
220    /// Allocate a value
221    #[inline]
222    pub fn alloc<T>(&self, val: T) -> &mut T {
223        self.arena.alloc(val)
224    }
225
226    /// Get current allocation size
227    pub fn allocated(&self) -> usize {
228        self.arena.allocated()
229    }
230}
231
232impl Default for ScopedArena {
233    fn default() -> Self {
234        Self::new()
235    }
236}
237
/// Pre-allocated buffer pool for specific sizes
///
/// Useful when you need buffers of predictable sizes and want
/// to avoid arena overhead for very small allocations.
///
/// Idle buffers are kept in three tiers (small / medium / large). A request
/// is served from the tier selected by its `min_size`; a returned buffer is
/// filed into the tier selected by its capacity. Each tier retains at most
/// `max_pool` idle buffers.
pub struct SizedBufferPool {
    /// Idle buffers whose capacity is at most `small_size`.
    small: Vec<Vec<u8>>,
    /// Idle buffers whose capacity is above `small_size` and at most `medium_size`.
    medium: Vec<Vec<u8>>,
    /// Idle buffers whose capacity is above `medium_size`.
    large: Vec<Vec<u8>>,
    /// Capacity of freshly allocated small-tier buffers (1 KiB by default).
    small_size: usize,
    /// Capacity of freshly allocated medium-tier buffers (64 KiB by default).
    medium_size: usize,
    /// Minimum capacity of freshly allocated large-tier buffers (1 MiB by default).
    large_size: usize,
    /// Maximum number of idle buffers retained per tier.
    max_pool: usize,
}

impl SizedBufferPool {
    /// Create a new, empty sized buffer pool with the default tier sizes
    /// (1 KiB / 64 KiB / 1 MiB) and at most 32 idle buffers per tier.
    pub fn new() -> Self {
        Self {
            small: Vec::new(),
            medium: Vec::new(),
            large: Vec::new(),
            small_size: 1024,
            medium_size: 64 * 1024,
            large_size: 1024 * 1024,
            max_pool: 32,
        }
    }

    /// Select the tier that serves a request for `min_size` bytes, together
    /// with the capacity a buffer allocated for that request should have.
    fn request_tier(&mut self, min_size: usize) -> (&mut Vec<Vec<u8>>, usize) {
        if min_size <= self.small_size {
            (&mut self.small, self.small_size)
        } else if min_size <= self.medium_size {
            (&mut self.medium, self.medium_size)
        } else {
            (&mut self.large, self.large_size.max(min_size))
        }
    }

    /// Get an empty buffer whose capacity is at least `min_size`.
    ///
    /// A pooled buffer from the matching tier is reused when available. If
    /// that buffer is too small for the request it is grown to the tier's
    /// capacity rather than being thrown away, so the buffer stays useful
    /// once it is returned to the pool. When the tier is empty a new buffer
    /// with the tier's capacity is allocated.
    pub fn get(&mut self, min_size: usize) -> Vec<u8> {
        let (tier, fresh_capacity) = self.request_tier(min_size);

        match tier.pop() {
            Some(mut buf) => {
                buf.clear();
                if buf.capacity() < min_size {
                    // The buffer is empty, so this guarantees
                    // capacity >= fresh_capacity >= min_size.
                    buf.reserve(fresh_capacity);
                }
                buf
            }
            None => Vec::with_capacity(fresh_capacity),
        }
    }

    /// Return a buffer to the pool.
    ///
    /// The buffer is cleared and filed into the tier matching its capacity.
    /// If that tier already holds `max_pool` buffers the buffer is dropped;
    /// it is never filed into a different tier, because a buffer placed in a
    /// larger tier would be too small for the requests that tier serves.
    pub fn put(&mut self, mut buf: Vec<u8>) {
        let cap = buf.capacity();
        buf.clear();

        let max_pool = self.max_pool;
        let tier = if cap <= self.small_size {
            &mut self.small
        } else if cap <= self.medium_size {
            &mut self.medium
        } else {
            &mut self.large
        };

        if tier.len() < max_pool {
            tier.push(buf);
        }
        // Buffer dropped if its tier is full
    }

    /// Number of idle buffers currently held, as `(small, medium, large)`.
    pub fn pool_sizes(&self) -> (usize, usize, usize) {
        (self.small.len(), self.medium.len(), self.large.len())
    }
}

impl Default for SizedBufferPool {
    /// Same as [`SizedBufferPool::new`].
    fn default() -> Self {
        Self::new()
    }
}
324
#[cfg(test)]
mod tests {
    use super::*;

    // The statistic counters are process-global and the test harness may run
    // tests on several threads at once. None of these tests calls
    // `reset_stats()`, so the counters only ever grow and every assertion can
    // be phrased as a before/after delta that concurrent tests cannot break.

    /// Total number of arenas handed out so far (created + recycled).
    fn total_handed_out() -> u64 {
        let stats = arena_stats();
        stats.arenas_created + stats.arenas_recycled
    }

    #[test]
    fn test_get_arena() {
        let total_before = total_handed_out();

        let arena1 = get_arena();
        let s = arena1.alloc_str("hello");
        assert_eq!(s, "hello");
        assert!(arena1.allocated() > 0);

        drop(arena1);

        // Handing out one arena must bump created or recycled by at least 1.
        let total_after = total_handed_out();
        assert!(
            total_after > total_before,
            "Expected total ops {} > {}",
            total_after,
            total_before
        );
    }

    #[test]
    fn test_arena_recycling() {
        // Clear the thread-local pool first
        ARENA_POOL.with(|pool| pool.borrow_mut().clear());

        // Verify pool is empty
        let pool_empty = ARENA_POOL.with(|pool| pool.borrow().is_empty());
        assert!(pool_empty, "Pool should be empty after clear");

        // Create and drop arena - should return to pool
        let arena1 = get_arena();
        let _ = arena1.alloc_str("test"); // Use the arena
        drop(arena1);

        // Verify arena was returned to pool
        let pool_has_arena = ARENA_POOL.with(|pool| !pool.borrow().is_empty());
        assert!(pool_has_arena, "Pool should have arena after drop");

        // Get another arena (should be recycled from pool)
        let arena2 = get_arena();
        // Verify we can allocate in the recycled arena (it was reset)
        let s = arena2.alloc_str("recycled");
        assert_eq!(s, "recycled");

        // Pool should now be empty (we took the arena)
        let pool_empty_after = ARENA_POOL.with(|pool| pool.borrow().is_empty());
        assert!(pool_empty_after, "Pool should be empty after taking arena");
        drop(arena2);
    }

    #[test]
    fn test_arena_allocations() {
        let arena = get_arena();

        let s1 = arena.alloc_str("hello");
        let s2 = arena.alloc_str("world");
        let bytes = arena.alloc_bytes(&[1, 2, 3, 4, 5]);

        assert_eq!(s1, "hello");
        assert_eq!(s2, "world");
        assert_eq!(bytes, &[1, 2, 3, 4, 5]);

        assert!(arena.allocated() > 0);
    }

    #[test]
    fn test_scoped_arena() {
        let before = arena_stats();

        {
            let arena = ScopedArena::new();
            let s = arena.alloc_str("scoped");
            assert_eq!(s, "scoped");
        } // Arena returned to pool

        // The pool of this thread now holds at least one arena, so the next
        // request must be served by recycling.
        let _ = ScopedArena::new();

        let after = arena_stats();
        assert!(after.arenas_recycled > before.arenas_recycled);
        assert!(
            after.arenas_created + after.arenas_recycled
                >= before.arenas_created + before.arenas_recycled + 2
        );
    }

    #[test]
    fn test_sized_buffer_pool() {
        let mut pool = SizedBufferPool::new();

        // Get small buffer
        let buf1 = pool.get(100);
        assert!(buf1.capacity() >= 100);

        // Get medium buffer
        let buf2 = pool.get(10_000);
        assert!(buf2.capacity() >= 10_000);

        // Return buffers
        pool.put(buf1);
        pool.put(buf2);

        let (small, medium, large) = pool.pool_sizes();
        assert_eq!(small, 1);
        assert_eq!(medium, 1);
        assert_eq!(large, 0);
    }

    #[test]
    fn test_sized_buffer_reuse() {
        let mut pool = SizedBufferPool::new();

        let mut buf1 = pool.get(100);
        buf1.extend_from_slice(b"test data");
        pool.put(buf1);

        // Get buffer again - should be cleared
        let buf2 = pool.get(100);
        assert!(buf2.is_empty());
        assert!(buf2.capacity() >= 100);
    }

    #[test]
    fn test_arena_with_custom_capacity() {
        let arena = get_arena_with_capacity(1024);
        let s = arena.alloc_str("custom");
        assert_eq!(s, "custom");
    }

    #[test]
    fn test_concurrent_arena_access() {
        const THREADS: u64 = 4;
        const ROUNDS: u64 = 100;

        let before = arena_stats();

        std::thread::scope(|s| {
            for _ in 0..THREADS {
                s.spawn(|| {
                    for _ in 0..ROUNDS {
                        let arena = get_arena();
                        let _ = arena.alloc_str("concurrent test");
                        drop(arena);
                    }
                });
            }
        });

        let after = arena_stats();
        // Every spawned thread starts with an empty pool: its first request
        // creates an arena and each later request recycles that arena.
        assert!(after.arenas_created - before.arenas_created >= THREADS);
        assert!(after.arenas_recycled - before.arenas_recycled >= THREADS * (ROUNDS - 1));
    }

    #[test]
    fn test_alloc_slice_fill() {
        let arena = get_arena();
        let slice = arena.alloc_slice_fill_with(5, |i| i * 2);
        assert_eq!(slice, &[0, 2, 4, 6, 8]);
    }
}