1use parking_lot::Mutex;
8use std::collections::VecDeque;
9use std::sync::atomic::{AtomicU64, Ordering};
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
13pub enum BufferSize {
14 Small = 4096,
16 Medium = 65536,
18 Large = 1048576,
20}
21
22impl BufferSize {
23 #[inline]
25 pub fn capacity(self) -> usize {
26 self as usize
27 }
28
29 pub fn for_capacity(required: usize) -> Self {
31 if required <= Self::Small.capacity() {
32 Self::Small
33 } else if required <= Self::Medium.capacity() {
34 Self::Medium
35 } else {
36 Self::Large
37 }
38 }
39}
40
41#[derive(Debug, Default)]
43pub struct PoolStatistics {
44 pub hits: AtomicU64,
46 pub misses: AtomicU64,
48 pub returns: AtomicU64,
50 pub discards: AtomicU64,
52}
53
54impl PoolStatistics {
55 pub fn hit_rate(&self) -> f64 {
57 let hits = self.hits.load(Ordering::Relaxed);
58 let misses = self.misses.load(Ordering::Relaxed);
59 let total = hits + misses;
60
61 if total == 0 {
62 0.0
63 } else {
64 hits as f64 / total as f64
65 }
66 }
67}
68
69#[derive(Debug, Clone)]
71pub struct PoolConfig {
72 pub max_buffers_per_size: usize,
74 pub collect_stats: bool,
76}
77
78impl Default for PoolConfig {
79 fn default() -> Self {
80 Self {
81 max_buffers_per_size: 16, collect_stats: true,
83 }
84 }
85}
86
87#[derive(Debug)]
89struct SizePool {
90 buffers: VecDeque<Vec<u8>>,
91 max_capacity: usize,
92}
93
94impl SizePool {
95 fn new(max_capacity: usize) -> Self {
96 Self {
97 buffers: VecDeque::new(),
98 max_capacity,
99 }
100 }
101
102 fn get_buffer(&mut self) -> Option<Vec<u8>> {
104 self.buffers.pop_front()
105 }
106
107 fn return_buffer(&mut self, mut buffer: Vec<u8>) -> bool {
110 if self.buffers.len() < self.max_capacity {
111 buffer.clear(); self.buffers.push_back(buffer);
113 true
114 } else {
115 false }
117 }
118
119 fn len(&self) -> usize {
121 self.buffers.len()
122 }
123}
124
125#[derive(Debug)]
127pub struct BufferPool {
128 small_pool: Mutex<SizePool>,
129 medium_pool: Mutex<SizePool>,
130 large_pool: Mutex<SizePool>,
131 stats: PoolStatistics,
132 config: PoolConfig,
133}
134
135impl BufferPool {
136 pub fn new() -> Self {
138 Self::with_config(PoolConfig::default())
139 }
140
141 pub fn with_config(config: PoolConfig) -> Self {
143 Self {
144 small_pool: Mutex::new(SizePool::new(config.max_buffers_per_size)),
145 medium_pool: Mutex::new(SizePool::new(config.max_buffers_per_size)),
146 large_pool: Mutex::new(SizePool::new(config.max_buffers_per_size)),
147 stats: PoolStatistics::default(),
148 config,
149 }
150 }
151
152 pub fn get_buffer(&self, size: BufferSize) -> PooledBuffer<'_> {
155 let pool = match size {
156 BufferSize::Small => &self.small_pool,
157 BufferSize::Medium => &self.medium_pool,
158 BufferSize::Large => &self.large_pool,
159 };
160
161 let buffer = {
162 let mut pool_guard = pool.lock();
163 pool_guard.get_buffer()
164 };
165
166 let buffer = if let Some(mut buf) = buffer {
167 if self.config.collect_stats {
169 self.stats.hits.fetch_add(1, Ordering::Relaxed);
170 }
171 buf.reserve(size.capacity());
173 buf
174 } else {
175 if self.config.collect_stats {
177 self.stats.misses.fetch_add(1, Ordering::Relaxed);
178 }
179 Vec::with_capacity(size.capacity())
180 };
181
182 PooledBuffer {
183 buffer: Some(buffer),
184 size,
185 pool: self,
186 }
187 }
188
189 fn return_buffer(&self, buffer: Vec<u8>, size: BufferSize) {
191 if self.config.collect_stats {
192 self.stats.returns.fetch_add(1, Ordering::Relaxed);
193 }
194
195 let pool = match size {
196 BufferSize::Small => &self.small_pool,
197 BufferSize::Medium => &self.medium_pool,
198 BufferSize::Large => &self.large_pool,
199 };
200
201 let accepted = {
202 let mut pool_guard = pool.lock();
203 pool_guard.return_buffer(buffer)
204 };
205
206 if !accepted && self.config.collect_stats {
207 self.stats.discards.fetch_add(1, Ordering::Relaxed);
208 }
209 }
210
211 pub fn statistics(&self) -> &PoolStatistics {
213 &self.stats
214 }
215
216 pub fn config(&self) -> &PoolConfig {
218 &self.config
219 }
220
221 pub fn pool_sizes(&self) -> (usize, usize, usize) {
223 (
224 self.small_pool.lock().len(),
225 self.medium_pool.lock().len(),
226 self.large_pool.lock().len(),
227 )
228 }
229}
230
231impl Default for BufferPool {
232 fn default() -> Self {
233 Self::new()
234 }
235}
236
237#[derive(Debug)]
240pub struct PooledBuffer<'a> {
241 buffer: Option<Vec<u8>>,
242 size: BufferSize,
243 pool: &'a BufferPool,
244}
245
246impl PooledBuffer<'_> {
247 pub fn get_mut(&mut self) -> &mut Vec<u8> {
249 self.buffer.as_mut().expect("PooledBuffer buffer was taken")
250 }
251
252 pub fn get_ref(&self) -> &Vec<u8> {
254 self.buffer.as_ref().expect("PooledBuffer buffer was taken")
255 }
256
257 pub fn size(&self) -> BufferSize {
259 self.size
260 }
261
262 pub fn take(mut self) -> Vec<u8> {
265 self.buffer
266 .take()
267 .expect("PooledBuffer buffer was already taken")
268 }
269}
270
271impl std::ops::Deref for PooledBuffer<'_> {
272 type Target = Vec<u8>;
273
274 fn deref(&self) -> &Self::Target {
275 self.get_ref()
276 }
277}
278
279impl std::ops::DerefMut for PooledBuffer<'_> {
280 fn deref_mut(&mut self) -> &mut Self::Target {
281 self.get_mut()
282 }
283}
284
285impl Drop for PooledBuffer<'_> {
286 fn drop(&mut self) {
287 if let Some(buffer) = self.buffer.take() {
288 self.pool.return_buffer(buffer, self.size);
289 }
290 }
291}
292
293#[cfg(test)]
294mod tests {
295 use super::*;
296 use std::sync::Arc;
297 use std::thread;
298
299 #[test]
300 fn test_buffer_size_for_capacity() {
301 assert_eq!(BufferSize::for_capacity(1024), BufferSize::Small);
302 assert_eq!(BufferSize::for_capacity(4096), BufferSize::Small);
303 assert_eq!(BufferSize::for_capacity(8192), BufferSize::Medium);
304 assert_eq!(BufferSize::for_capacity(65536), BufferSize::Medium);
305 assert_eq!(BufferSize::for_capacity(131072), BufferSize::Large);
306 }
307
308 #[test]
309 fn test_buffer_pool_basic_operation() {
310 let pool = BufferPool::new();
311
312 let mut buffer = pool.get_buffer(BufferSize::Medium);
314 buffer.extend_from_slice(&[1, 2, 3, 4, 5]);
315 assert_eq!(buffer.len(), 5);
316
317 drop(buffer);
319
320 let stats = pool.statistics();
322 assert_eq!(stats.misses.load(Ordering::Relaxed), 1); assert_eq!(stats.returns.load(Ordering::Relaxed), 1); }
325
326 #[test]
327 fn test_buffer_reuse() {
328 let pool = BufferPool::new();
329
330 {
332 let mut buffer = pool.get_buffer(BufferSize::Small);
333 buffer.extend_from_slice(&[1, 2, 3]);
334 }
335
336 {
338 let buffer = pool.get_buffer(BufferSize::Small);
339 assert_eq!(buffer.len(), 0); }
341
342 let stats = pool.statistics();
343 assert_eq!(stats.hits.load(Ordering::Relaxed), 1);
344 assert_eq!(stats.misses.load(Ordering::Relaxed), 1);
345 }
346
347 #[test]
348 fn test_bounded_capacity() {
349 let config = PoolConfig {
350 max_buffers_per_size: 2,
351 collect_stats: true,
352 };
353 let pool = BufferPool::with_config(config);
354
355 {
357 let _buf1 = pool.get_buffer(BufferSize::Small);
358 let _buf2 = pool.get_buffer(BufferSize::Small);
359 } {
363 let _buf3 = pool.get_buffer(BufferSize::Small); let _buf4 = pool.get_buffer(BufferSize::Small); let _buf5 = pool.get_buffer(BufferSize::Small); } let stats = pool.statistics();
369 assert_eq!(stats.discards.load(Ordering::Relaxed), 1);
370 }
371
372 #[test]
373 fn test_thread_safety() {
374 let pool = Arc::new(BufferPool::new());
375 let mut handles = vec![];
376
377 for _ in 0..4 {
379 let pool_clone = Arc::clone(&pool);
380 let handle = thread::spawn(move || {
381 for _ in 0..10 {
382 let mut buffer = pool_clone.get_buffer(BufferSize::Medium);
383 buffer.extend_from_slice(&[42; 100]);
384 }
386 });
387 handles.push(handle);
388 }
389
390 for handle in handles {
392 handle.join().unwrap();
393 }
394
395 let stats = pool.statistics();
397 assert!(stats.hits.load(Ordering::Relaxed) + stats.misses.load(Ordering::Relaxed) == 40);
398 }
399
400 #[test]
401 fn test_hit_rate_calculation() {
402 let pool = BufferPool::new();
403
404 assert_eq!(pool.statistics().hit_rate(), 0.0);
406
407 {
409 let _buf1 = pool.get_buffer(BufferSize::Small); let _buf2 = pool.get_buffer(BufferSize::Small); }
412 {
413 let _buf3 = pool.get_buffer(BufferSize::Small); let _buf4 = pool.get_buffer(BufferSize::Small); }
416
417 assert_eq!(pool.statistics().hit_rate(), 0.5);
419 }
420}