hexz_core/cache/buffer_pool.rs
1//! Reusable buffer pool for decompression output buffers.
2//!
3//! Eliminates per-block `Vec<u8>` allocations during decompression by pooling
4//! and reusing buffers of common sizes. This is especially important for the
5//! parallel decompression path where N threads hitting the global allocator
6//! concurrently causes contention.
7//!
8//! # Design
9//!
//! The pool is a simple `Mutex`-guarded `Vec<Vec<u8>>` of idle buffers (not a
//! strict LIFO stack: checkout scans for a buffer with enough capacity).
//! Buffers are checked out for decompression and returned once the
//! decompressed data has been copied out of them.
13//!
14//! When a buffer is needed for cache insertion (converting to `Bytes`), the
15//! buffer is consumed and not returned to the pool. The pool naturally reaches
16//! steady state as the block cache fills up and stops triggering new
17//! decompressions.
18
19use std::sync::Mutex;
20
/// Default maximum number of idle buffers to retain in the pool.
///
/// NOTE(review): not referenced in the visible part of this module; it is
/// presumably meant to be passed as `max_buffers` to `BufferPool::new` by
/// callers elsewhere — confirm at the call sites.
pub const DEFAULT_POOL_SIZE: usize = 16;
23
/// A pool of reusable `Vec<u8>` buffers for decompression.
///
/// Thread-safe via an internal `Mutex`. Idle buffers are kept in no particular
/// order; [`BufferPool::checkout`] scans them and hands out the smallest one
/// that satisfies the request (best fit), so a large buffer is not spent on a
/// small request while a better-matching buffer sits idle.
pub struct BufferPool {
    /// Idle buffers available for checkout, in no particular order.
    buffers: Mutex<Vec<Vec<u8>>>,
    /// Maximum number of idle buffers to retain in the pool.
    max_buffers: usize,
}

impl std::fmt::Debug for BufferPool {
    /// Formats the pool as `BufferPool { max_buffers, pooled }`.
    ///
    /// `pooled` is the number of idle buffers currently held. Buffer contents
    /// are never printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A poisoned lock is reported as an empty pool instead of panicking
        // inside a formatting routine.
        let pooled = self.buffers.lock().map_or(0, |idle| idle.len());
        f.debug_struct("BufferPool")
            .field("max_buffers", &self.max_buffers)
            .field("pooled", &pooled)
            .finish()
    }
}

impl BufferPool {
    /// Upper bound on the number of slots reserved up front by [`BufferPool::new`].
    ///
    /// Keeps a very large `max_buffers` (for example `usize::MAX` used to mean
    /// "unbounded") from turning into a huge or overflowing allocation; the
    /// backing vector simply grows on demand beyond this.
    const MAX_PREALLOCATED_SLOTS: usize = 64;

    /// Creates a new, empty buffer pool.
    ///
    /// # Parameters
    ///
    /// - `max_buffers`: Maximum number of idle buffers to retain.
    ///   Excess buffers returned via `checkin` are dropped. With `0` the pool
    ///   never retains anything and every checkout allocates.
    #[must_use]
    pub fn new(max_buffers: usize) -> Self {
        Self {
            // Only the slot table is reserved here, never the buffers themselves.
            buffers: Mutex::new(Vec::with_capacity(
                max_buffers.min(Self::MAX_PREALLOCATED_SLOTS),
            )),
            max_buffers,
        }
    }

    /// Checks out a buffer with at least `capacity` bytes.
    ///
    /// Returns the pooled buffer with the smallest sufficient capacity if one
    /// is available, otherwise allocates a new one. The returned buffer has
    /// length 0 but capacity >= `capacity`.
    ///
    /// If the internal lock is poisoned the pool is bypassed and a fresh
    /// buffer is allocated.
    #[must_use]
    pub fn checkout(&self, capacity: usize) -> Vec<u8> {
        if let Ok(mut pool) = self.buffers.lock() {
            // Best fit: among buffers that are large enough, take the smallest
            // so bigger buffers stay available for bigger requests. A linear
            // scan is fine since max_buffers is small, typically 8-32.
            let best_fit = pool
                .iter()
                .enumerate()
                .filter(|(_, buf)| buf.capacity() >= capacity)
                .min_by_key(|(_, buf)| buf.capacity())
                .map(|(idx, _)| idx);
            if let Some(idx) = best_fit {
                // Order is irrelevant, so the O(1) swap_remove is safe to use.
                let mut buf = pool.swap_remove(idx);
                buf.clear();
                return buf;
            }
        }
        Vec::with_capacity(capacity)
    }

    /// Returns a buffer to the pool for future reuse.
    ///
    /// Buffers with zero capacity are discarded, as is any buffer returned
    /// while the pool already holds `max_buffers` idle buffers or while the
    /// internal lock is poisoned. Buffers are only worth pooling if they have
    /// meaningful capacity (the caller should not return tiny buffers).
    ///
    /// The buffer's contents are left untouched here; they are cleared when
    /// the buffer is handed out again by `checkout`.
    pub fn checkin(&self, buf: Vec<u8>) {
        if buf.capacity() == 0 {
            return;
        }
        if let Ok(mut pool) = self.buffers.lock() {
            if pool.len() < self.max_buffers {
                pool.push(buf);
            }
            // else: drop buf (pool is full)
        }
        // else: lock poisoned, just drop the buffer
    }
}
95
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh pool hands out an empty buffer at least as large as requested.
    #[test]
    fn checkout_returns_sufficient_capacity() {
        let fresh = BufferPool::new(4).checkout(1024);
        assert!(fresh.capacity() >= 1024);
        assert!(fresh.is_empty());
    }

    /// A buffer that was filled and checked in comes back cleared, with the
    /// same capacity.
    #[test]
    fn checkin_and_reuse() {
        let pool = BufferPool::new(4);

        // Fill a buffer, remember its capacity, then hand it back.
        let recycled_capacity = {
            let mut scratch = pool.checkout(1024);
            scratch.extend_from_slice(&[42u8; 512]);
            let capacity = scratch.capacity();
            pool.checkin(scratch);
            capacity
        };

        let reused = pool.checkout(1024);
        assert_eq!(reused.capacity(), recycled_capacity);
        assert!(reused.is_empty());
    }

    /// Check-ins beyond `max_buffers` are dropped instead of retained.
    #[test]
    fn respects_max_buffers() {
        let pool = BufferPool::new(2);

        // The third check-in exceeds the limit and must be discarded.
        for _ in 0..3 {
            pool.checkin(Vec::with_capacity(1024));
        }

        let retained = pool.buffers.lock().unwrap().len();
        assert_eq!(retained, 2);
    }

    /// With nothing pooled, checkout falls back to a fresh allocation.
    #[test]
    fn checkout_allocates_when_empty() {
        let allocated = BufferPool::new(4).checkout(65_536);
        assert!(allocated.capacity() >= 65_536);
    }

    /// A pooled buffer that is too small is left in place and a new buffer is
    /// allocated for the larger request.
    #[test]
    fn checkout_skips_too_small_buffers() {
        let pool = BufferPool::new(4);
        pool.checkin(Vec::with_capacity(512));

        let fresh = pool.checkout(1024);
        assert!(fresh.capacity() >= 1024);

        // The undersized buffer must still be sitting in the pool.
        assert_eq!(pool.buffers.lock().unwrap().len(), 1);
    }
}
156}