// hexz_core/cache/buffer_pool.rs
//! Reusable buffer pool for decompression output buffers.
//!
//! Eliminates per-block `Vec<u8>` allocations during decompression by pooling
//! and reusing buffers of common sizes. This is especially important for the
//! parallel decompression path, where N threads hitting the global allocator
//! concurrently cause contention.
//!
//! # Design
//!
//! The pool is a simple `Mutex<Vec<Vec<u8>>>` stack. Buffers are checked out
//! for decompression and returned after the decompressed data has been copied
//! or converted to `Bytes`.
//!
//! When a buffer is needed for cache insertion (converting to `Bytes`), the
//! buffer is consumed and not returned to the pool. The pool naturally reaches
//! steady state as the block cache fills up and stops triggering new
//! decompressions.

use std::sync::Mutex;

/// Default maximum number of idle buffers to retain in the pool.
///
/// NOTE(review): not referenced anywhere in this file — presumably callers
/// pass it to `BufferPool::new(DEFAULT_POOL_SIZE)`; verify at the call sites.
pub const DEFAULT_POOL_SIZE: usize = 16;
/// A pool of reusable `Vec<u8>` buffers for decompression.
///
/// Thread-safe via an internal `Mutex`. Buffers are held in an *unordered*
/// stack: `checkin` pushes, and `checkout` linearly scans for the first
/// buffer with sufficient capacity. (No capacity ordering is maintained —
/// the scan is cheap because the pool is bounded by `max_buffers`.)
pub struct BufferPool {
    /// Unordered stack of available buffers. `checkout` removes entries with
    /// `swap_remove`, so insertion order is not preserved.
    buffers: Mutex<Vec<Vec<u8>>>,
    /// Maximum number of idle buffers to retain; excess check-ins are dropped.
    max_buffers: usize,
}
34
35impl std::fmt::Debug for BufferPool {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        let pooled = self.buffers.lock().map_or(0, |b| b.len());
38        f.debug_struct("BufferPool")
39            .field("max_buffers", &self.max_buffers)
40            .field("pooled", &pooled)
41            .finish()
42    }
43}
44
45impl BufferPool {
46    /// Creates a new buffer pool.
47    ///
48    /// # Parameters
49    ///
50    /// - `max_buffers`: Maximum number of idle buffers to retain.
51    ///   Excess buffers returned via `checkin` are dropped.
52    pub fn new(max_buffers: usize) -> Self {
53        Self {
54            buffers: Mutex::new(Vec::with_capacity(max_buffers)),
55            max_buffers,
56        }
57    }
58
59    /// Checks out a buffer with at least `capacity` bytes.
60    ///
61    /// Returns a pooled buffer if one of sufficient size is available,
62    /// otherwise allocates a new one. The returned buffer has length 0
63    /// but capacity >= `capacity`.
64    pub fn checkout(&self, capacity: usize) -> Vec<u8> {
65        if let Ok(mut pool) = self.buffers.lock() {
66            // Find the first buffer with sufficient capacity (linear scan is
67            // fine since max_buffers is small, typically 8-32).
68            if let Some(idx) = pool.iter().position(|b| b.capacity() >= capacity) {
69                let mut buf = pool.swap_remove(idx);
70                buf.clear();
71                return buf;
72            }
73        }
74        Vec::with_capacity(capacity)
75    }
76
77    /// Returns a buffer to the pool for future reuse.
78    ///
79    /// If the pool is full, the buffer is dropped. Buffers are only worth
80    /// pooling if they have meaningful capacity (the caller should not return
81    /// tiny buffers).
82    pub fn checkin(&self, buf: Vec<u8>) {
83        if buf.capacity() == 0 {
84            return;
85        }
86        if let Ok(mut pool) = self.buffers.lock() {
87            if pool.len() < self.max_buffers {
88                pool.push(buf);
89            }
90            // else: drop buf (pool is full)
91        }
92        // else: lock poisoned, just drop the buffer
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn checkout_returns_sufficient_capacity() {
        let pool = BufferPool::new(4);
        let checked_out = pool.checkout(1024);
        assert!(checked_out.capacity() >= 1024);
        assert!(checked_out.is_empty());
    }

    #[test]
    fn checkin_and_reuse() {
        let pool = BufferPool::new(4);

        // Check out a buffer, write into it, and hand it back.
        let mut first = pool.checkout(1024);
        first.extend_from_slice(&[42u8; 512]);
        let original_capacity = first.capacity();
        pool.checkin(first);

        // A follow-up checkout of the same size must yield the pooled
        // buffer: identical capacity, contents cleared.
        let reused = pool.checkout(1024);
        assert_eq!(reused.capacity(), original_capacity);
        assert_eq!(reused.len(), 0);
    }

    #[test]
    fn respects_max_buffers() {
        let pool = BufferPool::new(2);

        // Return one more buffer than the pool retains; the excess one
        // should be dropped.
        for _ in 0..3 {
            pool.checkin(Vec::with_capacity(1024));
        }

        let guard = pool.buffers.lock().unwrap();
        assert_eq!(guard.len(), 2);
    }

    #[test]
    fn checkout_allocates_when_empty() {
        let pool = BufferPool::new(4);
        assert!(pool.checkout(65536).capacity() >= 65536);
    }

    #[test]
    fn checkout_skips_too_small_buffers() {
        let pool = BufferPool::new(4);
        pool.checkin(Vec::with_capacity(512));

        // A request larger than anything pooled must allocate fresh...
        let fresh = pool.checkout(1024);
        assert!(fresh.capacity() >= 1024);

        // ...while the undersized buffer stays in the pool untouched.
        let guard = pool.buffers.lock().unwrap();
        assert_eq!(guard.len(), 1);
    }
}