Skip to main content

amaters_core/storage/
buffer_pool.rs

1//! Buffer pool for reusable byte buffers organized by size class.
2//!
//! The buffer pool reduces heap allocations by keeping free lists of
//! previously allocated buffers. When a [`PooledBuffer`] is dropped it is
//! automatically returned to the pool for future reuse.
6//!
7//! # Size classes
8//!
9//! Buffers are grouped into nine size classes (in bytes):
10//! 4 KB, 8 KB, 16 KB, 32 KB, 64 KB, 128 KB, 256 KB, 512 KB, 1 MB.
11
12use parking_lot::Mutex;
13use std::ops::{Deref, DerefMut};
14use std::sync::{
15    Arc,
16    atomic::{AtomicU64, Ordering},
17};
18
/// Buffer lengths, in bytes, of the nine pool size classes.
///
/// The table is sorted ascending and doubles at every step, running from
/// 4 KB (`1 << 12`) up to 1 MB (`1 << 20`).
const SIZE_CLASSES: [usize; 9] = [
    1 << 12, // 4 KB
    1 << 13, // 8 KB
    1 << 14, // 16 KB
    1 << 15, // 32 KB
    1 << 16, // 64 KB
    1 << 17, // 128 KB
    1 << 18, // 256 KB
    1 << 19, // 512 KB
    1 << 20, // 1 MB
];
31
32// ---------------------------------------------------------------------------
33// BufferPoolStats
34// ---------------------------------------------------------------------------
35
/// Point-in-time copy of the counters kept by a [`BufferPool`].
///
/// The struct is plain data (three integers and one float), so it is cheap to
/// copy and can be compared with `==`. [`Default`] yields the all-zero
/// snapshot, whose `recycle_rate` is `0.0`.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct BufferPoolStats {
    /// Total number of [`PooledBuffer`] acquisitions.
    pub allocations: u64,
    /// Number of acquisitions satisfied from the free list.
    pub recycles: u64,
    /// Number of acquisitions that required a fresh heap allocation (bypassed
    /// the pool, either because the free list was empty or the requested size
    /// exceeded the largest size class).
    pub misses: u64,
    /// `recycles / allocations`, or `0.0` when `allocations == 0`.
    pub recycle_rate: f64,
}
50
51// ---------------------------------------------------------------------------
52// BufferPool
53// ---------------------------------------------------------------------------
54
/// A pool of reusable byte buffers organized by size class.
///
/// Obtain a buffer with [`BufferPool::acquire`]; it is automatically returned
/// when the [`PooledBuffer`] is dropped.
///
/// [`BufferPool::new`] hands the pool out inside an [`Arc`], and every
/// [`PooledBuffer`] stores its own `Arc` handle to the pool it came from.
pub struct BufferPool {
    /// One free list per size class, indexed in step with `SIZE_CLASSES`.
    /// Each list sits behind its own mutex.
    free_lists: Arc<[Mutex<Vec<Vec<u8>>>; 9]>,
    /// Maximum number of idle buffers to retain per size class; a returned
    /// buffer is dropped instead of stored once a list has reached this length.
    capacity_per_class: usize,
    /// Total [`BufferPool::acquire`] calls.
    allocations: AtomicU64,
    /// Acquisitions served from the free list.
    recycles: AtomicU64,
    /// Acquisitions that fell back to fresh allocation.
    misses: AtomicU64,
}
71
72impl BufferPool {
73    /// Create a new pool wrapping at most `capacity_per_class` idle buffers per
74    /// size class.
75    pub fn new(capacity_per_class: usize) -> Arc<Self> {
76        Arc::new(Self {
77            free_lists: Arc::new([
78                Mutex::new(Vec::new()),
79                Mutex::new(Vec::new()),
80                Mutex::new(Vec::new()),
81                Mutex::new(Vec::new()),
82                Mutex::new(Vec::new()),
83                Mutex::new(Vec::new()),
84                Mutex::new(Vec::new()),
85                Mutex::new(Vec::new()),
86                Mutex::new(Vec::new()),
87            ]),
88            capacity_per_class,
89            allocations: AtomicU64::new(0),
90            recycles: AtomicU64::new(0),
91            misses: AtomicU64::new(0),
92        })
93    }
94
95    /// Acquire a buffer large enough to hold `min_size` bytes.
96    ///
97    /// The returned [`PooledBuffer`] tracks its size class and will return
98    /// itself to the pool when dropped.
99    ///
100    /// If `min_size` exceeds the largest size class (1 MB), a fresh allocation
101    /// is issued on every call and the buffer is **not** pooled on release.  In
102    /// this case `misses` is incremented.
103    pub fn acquire(self: &Arc<Self>, min_size: usize) -> PooledBuffer {
104        self.allocations.fetch_add(1, Ordering::Relaxed);
105
106        // Find the smallest size class that fits.
107        let class_idx = SIZE_CLASSES.iter().position(|&cap| cap >= min_size);
108
109        match class_idx {
110            Some(idx) => {
111                let class_size = SIZE_CLASSES[idx];
112                // Try to pop an existing buffer from the free list.
113                let maybe_buf = self.free_lists[idx].lock().pop();
114                match maybe_buf {
115                    Some(mut buf) => {
116                        // Reuse: clear content, keep allocation.
117                        buf.clear();
118                        buf.resize(class_size, 0u8);
119                        self.recycles.fetch_add(1, Ordering::Relaxed);
120                        PooledBuffer {
121                            data: Some(buf),
122                            pool: Arc::clone(self),
123                            size_class: idx,
124                        }
125                    }
126                    None => {
127                        // Free list is empty — allocate fresh.
128                        self.misses.fetch_add(1, Ordering::Relaxed);
129                        let buf = vec![0u8; class_size];
130                        PooledBuffer {
131                            data: Some(buf),
132                            pool: Arc::clone(self),
133                            size_class: idx,
134                        }
135                    }
136                }
137            }
138            None => {
139                // Requested size exceeds every size class — allocate without
140                // pooling (size_class == usize::MAX signals "no pool slot").
141                self.misses.fetch_add(1, Ordering::Relaxed);
142                let buf = vec![0u8; min_size];
143                PooledBuffer {
144                    data: Some(buf),
145                    pool: Arc::clone(self),
146                    size_class: usize::MAX,
147                }
148            }
149        }
150    }
151
152    /// Return a buffer to the pool.
153    ///
154    /// This is called automatically by [`PooledBuffer::drop`].
155    pub fn release(&self, buffer: Vec<u8>, size_class: usize) {
156        // Oversized buffers (sentinel class) are simply dropped.
157        if size_class >= SIZE_CLASSES.len() {
158            return;
159        }
160        let mut list = self.free_lists[size_class].lock();
161        if list.len() < self.capacity_per_class {
162            list.push(buffer);
163        }
164        // If the list is full the buffer is simply dropped (list lock released).
165    }
166
167    /// Total number of `acquire` calls.
168    pub fn allocations(&self) -> u64 {
169        self.allocations.load(Ordering::Relaxed)
170    }
171
172    /// Number of acquisitions served from the free list.
173    pub fn recycles(&self) -> u64 {
174        self.recycles.load(Ordering::Relaxed)
175    }
176
177    /// Number of acquisitions that required a fresh heap allocation.
178    pub fn misses(&self) -> u64 {
179        self.misses.load(Ordering::Relaxed)
180    }
181
182    /// Return a snapshot of all counters.
183    pub fn stats(&self) -> BufferPoolStats {
184        let allocations = self.allocations();
185        let recycles = self.recycles();
186        let misses = self.misses();
187        let recycle_rate = if allocations == 0 {
188            0.0
189        } else {
190            recycles as f64 / allocations as f64
191        };
192        BufferPoolStats {
193            allocations,
194            recycles,
195            misses,
196            recycle_rate,
197        }
198    }
199}
200
201// ---------------------------------------------------------------------------
202// PooledBuffer
203// ---------------------------------------------------------------------------
204
/// A byte buffer borrowed from a [`BufferPool`].
///
/// The buffer is automatically returned to the pool when this value is dropped.
/// Values are created by [`BufferPool::acquire`]; all fields are private.
pub struct PooledBuffer {
    /// The owned bytes. `None` only transiently during `Drop`, which takes the
    /// `Vec` out so it can be handed back to the pool.
    data: Option<Vec<u8>>,
    /// Shared handle to the pool this buffer is released to on drop.
    pool: Arc<BufferPool>,
    /// Index into [`SIZE_CLASSES`], or `usize::MAX` for oversized allocations.
    size_class: usize,
}
215
216impl Drop for PooledBuffer {
217    fn drop(&mut self) {
218        let buf = self.data.take().unwrap_or_default();
219        self.pool.release(buf, self.size_class);
220    }
221}
222
223impl Deref for PooledBuffer {
224    type Target = [u8];
225
226    fn deref(&self) -> &[u8] {
227        self.data.as_deref().unwrap_or(&[])
228    }
229}
230
231impl DerefMut for PooledBuffer {
232    fn deref_mut(&mut self) -> &mut [u8] {
233        self.data.as_deref_mut().unwrap_or(&mut [])
234    }
235}
236
237impl AsRef<[u8]> for PooledBuffer {
238    fn as_ref(&self) -> &[u8] {
239        self.deref()
240    }
241}
242
243impl AsMut<[u8]> for PooledBuffer {
244    fn as_mut(&mut self) -> &mut [u8] {
245        self.deref_mut()
246    }
247}
248
249impl PooledBuffer {
250    /// Length of the buffer in bytes.
251    pub fn len(&self) -> usize {
252        self.data.as_ref().map_or(0, |d| d.len())
253    }
254
255    /// `true` if the buffer has zero length.
256    pub fn is_empty(&self) -> bool {
257        self.len() == 0
258    }
259
260    /// Allocated capacity of the inner `Vec`.
261    pub fn capacity(&self) -> usize {
262        self.data.as_ref().map_or(0, |d| d.capacity())
263    }
264
265    /// Immutable byte slice view.
266    pub fn as_slice(&self) -> &[u8] {
267        self.deref()
268    }
269
270    /// Mutable byte slice view.
271    pub fn as_mut_slice(&mut self) -> &mut [u8] {
272        self.deref_mut()
273    }
274
275    /// Resize the inner `Vec`, filling new elements with `value`.
276    pub fn resize(&mut self, new_len: usize, value: u8) {
277        if let Some(ref mut v) = self.data {
278            v.resize(new_len, value);
279        }
280    }
281}
282
283// ---------------------------------------------------------------------------
284// Tests
285// ---------------------------------------------------------------------------
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Verify that a buffer acquired for `min_size` bytes has at least that
    /// many bytes available.
    #[test]
    fn test_buffer_pool_basic_acquire() {
        let pool = BufferPool::new(4);
        let buf = pool.acquire(1_000);
        assert!(
            buf.len() >= 1_000,
            "buffer must be at least the requested size"
        );
    }

    /// Drop a buffer, re-acquire one, and verify the recycle counter climbs.
    #[test]
    fn test_buffer_pool_recycle() {
        let pool = BufferPool::new(4);
        let buf = pool.acquire(4_096);
        drop(buf);
        let _buf2 = pool.acquire(4_096);
        assert!(pool.recycles() >= 1, "second acquire should be a recycle");
    }

    /// A recycled buffer must come back zero-filled and at full class length,
    /// regardless of what the previous holder wrote into it.
    #[test]
    fn test_recycled_buffer_is_zeroed() {
        let pool = BufferPool::new(4);

        let mut buf = pool.acquire(4_096);
        buf.as_mut_slice().fill(0xAA);
        drop(buf);

        let recycled = pool.acquire(4_096);
        assert_eq!(pool.recycles(), 1, "second acquire should be a recycle");
        assert_eq!(recycled.len(), SIZE_CLASSES[0]);
        assert!(
            recycled.iter().all(|&byte| byte == 0),
            "recycled buffer must not leak previous contents"
        );
    }

    /// Verify that buffers are rounded up to the correct size-class capacity.
    ///
    /// Requested sizes and expected capacities:
    /// - 1 byte  → 4 KB class  (4 096)
    /// - 4 096   → 4 KB class  (4 096)
    /// - 8 193   → 16 KB class (16 384)
    /// - 65 537  → 128 KB class (131 072)
    #[test]
    fn test_buffer_pool_size_classes() {
        let pool = BufferPool::new(4);

        let cases: &[(usize, usize)] = &[
            (1, SIZE_CLASSES[0]),      // 1 → 4 KB
            (4_096, SIZE_CLASSES[0]),  // exactly 4 KB → 4 KB
            (8_193, SIZE_CLASSES[2]),  // just over 8 KB → 16 KB
            (65_537, SIZE_CLASSES[5]), // just over 64 KB → 128 KB
        ];

        for &(req, expected_cap) in cases {
            let buf = pool.acquire(req);
            assert_eq!(
                buf.capacity(),
                expected_cap,
                "request {} → expected size-class capacity {}",
                req,
                expected_cap
            );
        }
    }

    /// Requests above the largest size class get exactly the requested length,
    /// always count as misses, and are never stored in any free list.
    #[test]
    fn test_buffer_pool_oversized_not_pooled() {
        let pool = BufferPool::new(4);
        let oversized = SIZE_CLASSES[SIZE_CLASSES.len() - 1] + 1;

        let buf = pool.acquire(oversized);
        assert_eq!(buf.len(), oversized);
        drop(buf);

        for list in pool.free_lists.iter() {
            assert!(
                list.lock().is_empty(),
                "oversized buffers must not be pooled"
            );
        }

        let _again = pool.acquire(oversized);
        assert_eq!(pool.allocations(), 2);
        assert_eq!(pool.misses(), 2);
        assert_eq!(pool.recycles(), 0);
    }

    /// The free list must fill up to, but not grow beyond,
    /// `capacity_per_class`.
    #[test]
    fn test_buffer_pool_capacity_limit() {
        let capacity = 3usize;
        let pool = BufferPool::new(capacity);

        // Acquire capacity + 1 buffers, all in the same size class.
        let buffers: Vec<_> = (0..capacity + 1).map(|_| pool.acquire(4_096)).collect();
        // Drop them all — the pool keeps exactly `capacity` and discards the
        // surplus one.
        drop(buffers);

        let free_count = pool.free_lists[0].lock().len();
        assert_eq!(
            free_count, capacity,
            "free list must hold exactly `capacity` buffers (got {})",
            free_count
        );
    }

    /// Write via `DerefMut`, read via `Deref`.
    #[test]
    fn test_pooled_buffer_deref() {
        let pool = BufferPool::new(4);
        let mut buf = pool.acquire(4_096);

        buf[0] = 0xAA;
        buf[1] = 0xBB;

        assert_eq!(buf[0], 0xAA);
        assert_eq!(buf[1], 0xBB);
    }

    /// Counters must reflect acquire / recycle / miss semantics.
    #[test]
    fn test_buffer_pool_stats() {
        let pool = BufferPool::new(4);

        // First acquire: miss (free list is empty).
        let b1 = pool.acquire(4_096);
        assert_eq!(pool.allocations(), 1);
        assert_eq!(pool.misses(), 1);
        assert_eq!(pool.recycles(), 0);

        // Release b1 back to the pool.
        drop(b1);

        // Second acquire: recycle.
        let _b2 = pool.acquire(4_096);
        assert_eq!(pool.allocations(), 2);
        assert_eq!(pool.misses(), 1);
        assert_eq!(pool.recycles(), 1);

        let stats = pool.stats();
        assert_eq!(stats.allocations, 2);
        assert_eq!(stats.recycles, 1);
        assert!((stats.recycle_rate - 0.5).abs() < f64::EPSILON);
    }
}