// amaters_core/storage/buffer_pool.rs

//! Buffer pool for reusable byte buffers organized by size class.
//!
//! The buffer pool eliminates frequent heap allocations by maintaining free lists
//! of pre-allocated buffers. When a [`PooledBuffer`] is dropped it is automatically
//! returned to the pool for future reuse.
//!
//! # Size classes
//!
//! Buffers are grouped into nine size classes (in bytes):
//! 4 KB, 8 KB, 16 KB, 32 KB, 64 KB, 128 KB, 256 KB, 512 KB, 1 MB.

use parking_lot::Mutex;
use std::array;
use std::ops::{Deref, DerefMut};
use std::sync::{
    Arc,
    atomic::{AtomicU64, Ordering},
};

/// The nine supported size classes (bytes).
///
/// Entries must stay sorted in ascending order: `acquire` selects the first
/// class whose capacity is `>=` the requested size via a linear scan, so an
/// out-of-order entry would silently over- or under-allocate.
const SIZE_CLASSES: [usize; 9] = [
    4_096,     // 4 KB
    8_192,     // 8 KB
    16_384,    // 16 KB
    32_768,    // 32 KB
    65_536,    // 64 KB
    131_072,   // 128 KB
    262_144,   // 256 KB
    524_288,   // 512 KB
    1_048_576, // 1 MB
];

// ---------------------------------------------------------------------------
// BufferPoolStats
// ---------------------------------------------------------------------------

/// Snapshot of [`BufferPool`] counters.
///
/// The counters are read one at a time with relaxed atomic ordering, so under
/// concurrent use the fields may not form a single consistent point-in-time
/// snapshot of the pool.
#[derive(Debug, Clone)]
pub struct BufferPoolStats {
    /// Total number of [`PooledBuffer`] acquisitions.
    pub allocations: u64,
    /// Number of acquisitions satisfied from the free list.
    pub recycles: u64,
    /// Number of acquisitions that required a fresh heap allocation (bypassed
    /// the pool, either because the free list was empty or the requested size
    /// exceeded the largest size class).
    pub misses: u64,
    /// `recycles / allocations`, or `0.0` when `allocations == 0`.
    pub recycle_rate: f64,
}

// ---------------------------------------------------------------------------
// BufferPool
// ---------------------------------------------------------------------------

/// A pool of reusable byte buffers organized by size class.
///
/// Obtain a buffer with [`BufferPool::acquire`]; it is automatically returned
/// when the [`PooledBuffer`] is dropped.
pub struct BufferPool {
    /// One free list per size class, indexed identically to [`SIZE_CLASSES`].
    ///
    /// NOTE(review): the inner `Arc` looks redundant — the pool itself is only
    /// handed out as `Arc<BufferPool>`; confirm nothing clones this field
    /// independently before simplifying to a plain array.
    free_lists: Arc<[Mutex<Vec<Vec<u8>>>; 9]>,
    /// Maximum number of idle buffers to retain per size class; buffers
    /// released beyond this limit are simply dropped.
    capacity_per_class: usize,
    /// Total [`BufferPool::acquire`] calls.
    allocations: AtomicU64,
    /// Acquisitions served from the free list.
    recycles: AtomicU64,
    /// Acquisitions that fell back to fresh allocation.
    misses: AtomicU64,
}

72impl BufferPool {
73 /// Create a new pool wrapping at most `capacity_per_class` idle buffers per
74 /// size class.
75 pub fn new(capacity_per_class: usize) -> Arc<Self> {
76 Arc::new(Self {
77 free_lists: Arc::new([
78 Mutex::new(Vec::new()),
79 Mutex::new(Vec::new()),
80 Mutex::new(Vec::new()),
81 Mutex::new(Vec::new()),
82 Mutex::new(Vec::new()),
83 Mutex::new(Vec::new()),
84 Mutex::new(Vec::new()),
85 Mutex::new(Vec::new()),
86 Mutex::new(Vec::new()),
87 ]),
88 capacity_per_class,
89 allocations: AtomicU64::new(0),
90 recycles: AtomicU64::new(0),
91 misses: AtomicU64::new(0),
92 })
93 }
94
95 /// Acquire a buffer large enough to hold `min_size` bytes.
96 ///
97 /// The returned [`PooledBuffer`] tracks its size class and will return
98 /// itself to the pool when dropped.
99 ///
100 /// If `min_size` exceeds the largest size class (1 MB), a fresh allocation
101 /// is issued on every call and the buffer is **not** pooled on release. In
102 /// this case `misses` is incremented.
103 pub fn acquire(self: &Arc<Self>, min_size: usize) -> PooledBuffer {
104 self.allocations.fetch_add(1, Ordering::Relaxed);
105
106 // Find the smallest size class that fits.
107 let class_idx = SIZE_CLASSES.iter().position(|&cap| cap >= min_size);
108
109 match class_idx {
110 Some(idx) => {
111 let class_size = SIZE_CLASSES[idx];
112 // Try to pop an existing buffer from the free list.
113 let maybe_buf = self.free_lists[idx].lock().pop();
114 match maybe_buf {
115 Some(mut buf) => {
116 // Reuse: clear content, keep allocation.
117 buf.clear();
118 buf.resize(class_size, 0u8);
119 self.recycles.fetch_add(1, Ordering::Relaxed);
120 PooledBuffer {
121 data: Some(buf),
122 pool: Arc::clone(self),
123 size_class: idx,
124 }
125 }
126 None => {
127 // Free list is empty — allocate fresh.
128 self.misses.fetch_add(1, Ordering::Relaxed);
129 let buf = vec![0u8; class_size];
130 PooledBuffer {
131 data: Some(buf),
132 pool: Arc::clone(self),
133 size_class: idx,
134 }
135 }
136 }
137 }
138 None => {
139 // Requested size exceeds every size class — allocate without
140 // pooling (size_class == usize::MAX signals "no pool slot").
141 self.misses.fetch_add(1, Ordering::Relaxed);
142 let buf = vec![0u8; min_size];
143 PooledBuffer {
144 data: Some(buf),
145 pool: Arc::clone(self),
146 size_class: usize::MAX,
147 }
148 }
149 }
150 }
151
152 /// Return a buffer to the pool.
153 ///
154 /// This is called automatically by [`PooledBuffer::drop`].
155 pub fn release(&self, buffer: Vec<u8>, size_class: usize) {
156 // Oversized buffers (sentinel class) are simply dropped.
157 if size_class >= SIZE_CLASSES.len() {
158 return;
159 }
160 let mut list = self.free_lists[size_class].lock();
161 if list.len() < self.capacity_per_class {
162 list.push(buffer);
163 }
164 // If the list is full the buffer is simply dropped (list lock released).
165 }
166
167 /// Total number of `acquire` calls.
168 pub fn allocations(&self) -> u64 {
169 self.allocations.load(Ordering::Relaxed)
170 }
171
172 /// Number of acquisitions served from the free list.
173 pub fn recycles(&self) -> u64 {
174 self.recycles.load(Ordering::Relaxed)
175 }
176
177 /// Number of acquisitions that required a fresh heap allocation.
178 pub fn misses(&self) -> u64 {
179 self.misses.load(Ordering::Relaxed)
180 }
181
182 /// Return a snapshot of all counters.
183 pub fn stats(&self) -> BufferPoolStats {
184 let allocations = self.allocations();
185 let recycles = self.recycles();
186 let misses = self.misses();
187 let recycle_rate = if allocations == 0 {
188 0.0
189 } else {
190 recycles as f64 / allocations as f64
191 };
192 BufferPoolStats {
193 allocations,
194 recycles,
195 misses,
196 recycle_rate,
197 }
198 }
199}

// ---------------------------------------------------------------------------
// PooledBuffer
// ---------------------------------------------------------------------------

/// A byte buffer borrowed from a [`BufferPool`].
///
/// The buffer is automatically returned to the pool when this value is dropped.
pub struct PooledBuffer {
    /// The owned bytes; `None` only transiently while `Drop` hands the
    /// buffer back to the pool.
    data: Option<Vec<u8>>,
    /// Owning pool; keeps the pool alive at least as long as this buffer.
    pool: Arc<BufferPool>,
    /// Index into [`SIZE_CLASSES`], or `usize::MAX` for oversized allocations
    /// that bypass pooling.
    size_class: usize,
}

216impl Drop for PooledBuffer {
217 fn drop(&mut self) {
218 let buf = self.data.take().unwrap_or_default();
219 self.pool.release(buf, self.size_class);
220 }
221}
222
223impl Deref for PooledBuffer {
224 type Target = [u8];
225
226 fn deref(&self) -> &[u8] {
227 self.data.as_deref().unwrap_or(&[])
228 }
229}
230
231impl DerefMut for PooledBuffer {
232 fn deref_mut(&mut self) -> &mut [u8] {
233 self.data.as_deref_mut().unwrap_or(&mut [])
234 }
235}
236
237impl AsRef<[u8]> for PooledBuffer {
238 fn as_ref(&self) -> &[u8] {
239 self.deref()
240 }
241}
242
243impl AsMut<[u8]> for PooledBuffer {
244 fn as_mut(&mut self) -> &mut [u8] {
245 self.deref_mut()
246 }
247}
248
249impl PooledBuffer {
250 /// Length of the buffer in bytes.
251 pub fn len(&self) -> usize {
252 self.data.as_ref().map_or(0, |d| d.len())
253 }
254
255 /// `true` if the buffer has zero length.
256 pub fn is_empty(&self) -> bool {
257 self.len() == 0
258 }
259
260 /// Allocated capacity of the inner `Vec`.
261 pub fn capacity(&self) -> usize {
262 self.data.as_ref().map_or(0, |d| d.capacity())
263 }
264
265 /// Immutable byte slice view.
266 pub fn as_slice(&self) -> &[u8] {
267 self.deref()
268 }
269
270 /// Mutable byte slice view.
271 pub fn as_mut_slice(&mut self) -> &mut [u8] {
272 self.deref_mut()
273 }
274
275 /// Resize the inner `Vec`, filling new elements with `value`.
276 pub fn resize(&mut self, new_len: usize, value: u8) {
277 if let Some(ref mut v) = self.data {
278 v.resize(new_len, value);
279 }
280 }
281}
282

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;

    /// An acquired buffer must expose at least the requested number of bytes.
    #[test]
    fn test_buffer_pool_basic_acquire() {
        let pool = BufferPool::new(4);
        let buffer = pool.acquire(1_000);
        assert!(
            buffer.len() >= 1_000,
            "buffer must be at least the requested size"
        );
    }

    /// Dropping a buffer and acquiring again should hit the free list.
    #[test]
    fn test_buffer_pool_recycle() {
        let pool = BufferPool::new(4);
        drop(pool.acquire(4_096));
        let _reused = pool.acquire(4_096);
        assert!(pool.recycles() >= 1, "second acquire should be a recycle");
    }

    /// Requests are rounded up to the capacity of the matching size class.
    ///
    /// Requested sizes and expected capacities:
    /// - 1 byte → 4 KB class (4 096)
    /// - 4 096 → 4 KB class (4 096)
    /// - 8 193 → 16 KB class (16 384)
    /// - 65 537 → 128 KB class (131 072)
    #[test]
    fn test_buffer_pool_size_classes() {
        let pool = BufferPool::new(4);

        let cases: &[(usize, usize)] = &[
            (1, SIZE_CLASSES[0]),      // minimum request → smallest class
            (4_096, SIZE_CLASSES[0]),  // exact class boundary stays put
            (8_193, SIZE_CLASSES[2]),  // one past 8 KB rounds up to 16 KB
            (65_537, SIZE_CLASSES[5]), // one past 64 KB rounds up to 128 KB
        ];

        for &(requested, expected_capacity) in cases {
            let buffer = pool.acquire(requested);
            assert_eq!(
                buffer.capacity(),
                expected_capacity,
                "request {} → expected size-class capacity {}",
                requested,
                expected_capacity
            );
        }
    }

    /// Releasing more buffers than `capacity_per_class` must not grow the
    /// free list past the limit.
    #[test]
    fn test_buffer_pool_capacity_limit() {
        let capacity = 3usize;
        let pool = BufferPool::new(capacity);

        // One more buffer than the pool may retain, all in the same class.
        let held: Vec<_> = (0..capacity + 1).map(|_| pool.acquire(4_096)).collect();
        // Drop them all — the pool keeps at most `capacity` of them.
        drop(held);

        let idle = pool.free_lists[0].lock().len();
        assert!(
            idle <= capacity,
            "free list must not exceed capacity (got {})",
            idle
        );
    }

    /// Bytes written through `DerefMut` are readable through `Deref`.
    #[test]
    fn test_pooled_buffer_deref() {
        let pool = BufferPool::new(4);
        let mut buffer = pool.acquire(4_096);

        buffer[0] = 0xAA;
        buffer[1] = 0xBB;

        assert_eq!(buffer[0], 0xAA);
        assert_eq!(buffer[1], 0xBB);
    }

    /// Counters must track acquire / recycle / miss semantics exactly.
    #[test]
    fn test_buffer_pool_stats() {
        let pool = BufferPool::new(4);

        // First acquire misses: the free list starts empty.
        let first = pool.acquire(4_096);
        assert_eq!(pool.allocations(), 1);
        assert_eq!(pool.misses(), 1);
        assert_eq!(pool.recycles(), 0);

        // Returning the buffer makes the next acquire a recycle.
        drop(first);
        let _second = pool.acquire(4_096);
        assert_eq!(pool.allocations(), 2);
        assert_eq!(pool.misses(), 1);
        assert_eq!(pool.recycles(), 1);

        let stats = pool.stats();
        assert_eq!(stats.allocations, 2);
        assert_eq!(stats.recycles, 1);
        assert!((stats.recycle_rate - 0.5).abs() < f64::EPSILON);
    }
}