1use crate::types::BufferSize;
2use crossbeam::queue::SegQueue;
3use std::ops::Deref;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use tracing::info;
7
8pub struct PooledBuffer {
22 buffer: Vec<u8>,
23 initialized: usize,
24 pool: Arc<SegQueue<Vec<u8>>>,
25 pool_size: Arc<AtomicUsize>,
26 max_pool_size: usize,
27}
28
29impl PooledBuffer {
30 #[must_use]
32 #[inline]
33 pub fn capacity(&self) -> usize {
34 self.buffer.len()
35 }
36
37 #[must_use]
39 #[inline]
40 pub fn initialized(&self) -> usize {
41 self.initialized
42 }
43
44 pub async fn read_from<R>(&mut self, reader: &mut R) -> std::io::Result<usize>
46 where
47 R: tokio::io::AsyncReadExt + Unpin,
48 {
49 let n = reader.read(&mut self.buffer[..]).await?;
50 self.initialized = n;
51 Ok(n)
52 }
53
54 #[inline]
59 pub fn copy_from_slice(&mut self, data: &[u8]) {
60 assert!(
61 data.len() <= self.buffer.len(),
62 "data exceeds buffer capacity"
63 );
64 self.buffer[..data.len()].copy_from_slice(data);
65 self.initialized = data.len();
66 }
67
68 #[must_use]
86 #[inline]
87 pub fn as_mut_slice(&mut self) -> &mut [u8] {
88 &mut self.buffer[..]
89 }
90}
91
92impl Deref for PooledBuffer {
93 type Target = [u8];
94
95 #[inline]
96 fn deref(&self) -> &Self::Target {
97 &self.buffer[..self.initialized]
99 }
100}
101
102impl AsRef<[u8]> for PooledBuffer {
105 #[inline]
106 fn as_ref(&self) -> &[u8] {
107 &self.buffer[..self.initialized]
109 }
110}
111
112impl Drop for PooledBuffer {
113 fn drop(&mut self) {
114 let mut current_size = self.pool_size.load(Ordering::Relaxed);
116 while current_size < self.max_pool_size {
117 match self.pool_size.compare_exchange_weak(
118 current_size,
119 current_size + 1,
120 Ordering::Relaxed,
121 Ordering::Relaxed,
122 ) {
123 Ok(_) => {
124 let buffer = std::mem::take(&mut self.buffer);
125 self.pool.push(buffer);
126 return;
127 }
128 Err(new_size) => {
129 current_size = new_size;
130 }
131 }
132 }
133 }
135}
136
137#[derive(Debug, Clone)]
140pub struct BufferPool {
141 pool: Arc<SegQueue<Vec<u8>>>,
142 buffer_size: BufferSize,
143 max_pool_size: usize,
144 pool_size: Arc<AtomicUsize>,
145}
146
147impl BufferPool {
148 #[allow(clippy::uninit_vec)]
167 fn create_aligned_buffer(size: usize) -> Vec<u8> {
168 let page_size = 4096;
170 let aligned_size = size.div_ceil(page_size) * page_size;
171
172 let mut buffer = Vec::with_capacity(aligned_size);
174 unsafe {
181 buffer.set_len(size);
182 }
183 buffer
184 }
185
186 #[must_use]
208 pub fn new(buffer_size: BufferSize, max_pool_size: usize) -> Self {
209 let pool = Arc::new(SegQueue::new());
210 let pool_size = Arc::new(AtomicUsize::new(0));
211
212 info!(
214 "Pre-allocating {} buffers of {}KB each ({}MB total)",
215 max_pool_size,
216 buffer_size.get() / 1024,
217 (max_pool_size * buffer_size.get()) / (1024 * 1024)
218 );
219
220 for _ in 0..max_pool_size {
221 let buffer = Self::create_aligned_buffer(buffer_size.get());
222 pool.push(buffer);
223 pool_size.fetch_add(1, Ordering::Relaxed);
224 }
225
226 info!("Buffer pool pre-allocation complete");
227
228 Self {
229 pool,
230 buffer_size,
231 max_pool_size,
232 pool_size,
233 }
234 }
235
236 #[cfg(test)]
241 #[must_use]
242 pub fn for_tests() -> Self {
243 Self::new(BufferSize::try_new(8192).expect("valid size"), 4)
244 }
245
246 pub async fn acquire(&self) -> PooledBuffer {
266 let buffer = if let Some(buffer) = self.pool.pop() {
267 self.pool_size.fetch_sub(1, Ordering::Relaxed);
268 debug_assert_eq!(buffer.len(), self.buffer_size.get());
270 buffer
271 } else {
272 Self::create_aligned_buffer(self.buffer_size.get())
274 };
275
276 PooledBuffer {
277 buffer,
278 initialized: 0, pool: Arc::clone(&self.pool),
280 pool_size: Arc::clone(&self.pool_size),
281 max_pool_size: self.max_pool_size,
282 }
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly acquired buffer has the configured size and no valid bytes.
    #[tokio::test]
    async fn test_buffer_pool_creation() {
        let size = BufferSize::try_new(8192).unwrap();
        let pool = BufferPool::new(size, 10);

        let first = pool.acquire().await;
        assert_eq!(first.capacity(), 8192);
        assert_eq!(first.initialized(), 0);
    }

    // Dropping a buffer and acquiring again yields a buffer of the same size.
    #[tokio::test]
    async fn test_buffer_pool_get_and_return() {
        let size = BufferSize::try_new(4096).unwrap();
        let pool = BufferPool::new(size, 5);

        let first = pool.acquire().await;
        assert_eq!(first.capacity(), 4096);
        assert_eq!(first.initialized(), 0);

        drop(first);

        let second = pool.acquire().await;
        assert_eq!(second.capacity(), 4096);
    }

    // Acquiring more buffers than the pool holds still succeeds.
    #[tokio::test]
    async fn test_buffer_pool_exhaustion() {
        let size = BufferSize::try_new(1024).unwrap();
        let pool = BufferPool::new(size, 2);

        let first = pool.acquire().await;
        let second = pool.acquire().await;

        let overflow = pool.acquire().await;
        assert_eq!(overflow.capacity(), 1024);

        drop(first);
        drop(second);
        drop(overflow);
    }

    // Twenty tasks share a ten-slot pool, each holding a buffer across a sleep.
    #[tokio::test]
    async fn test_buffer_pool_concurrent_access() {
        let size = BufferSize::try_new(2048).unwrap();
        let pool = BufferPool::new(size, 10);

        let tasks: Vec<_> = (0..20)
            .map(|_| {
                let shared = pool.clone();
                tokio::spawn(async move {
                    let held = shared.acquire().await;
                    assert_eq!(held.capacity(), 2048);
                    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }
    }

    // The reported size of an 8192-byte buffer is a multiple of the page size.
    #[tokio::test]
    async fn test_buffer_alignment() {
        let size = BufferSize::try_new(8192).unwrap();
        let pool = BufferPool::new(size, 1);
        let held = pool.acquire().await;

        assert!(held.capacity() >= 8192);
        assert_eq!(held.capacity() % 4096, 0);
    }

    // A buffer that was written to and returned comes back at full size.
    #[tokio::test]
    async fn test_buffer_clear_and_resize() {
        let size = BufferSize::try_new(1024).unwrap();
        let pool = BufferPool::new(size, 2);

        let mut first = pool.acquire().await;

        let payload = [42u8; 101];
        first.copy_from_slice(&payload);
        assert_eq!(first.initialized(), 101);

        drop(first);

        let second = pool.acquire().await;
        assert_eq!(second.capacity(), 1024);
    }

    // Returning more buffers than the pool limit must not panic.
    #[tokio::test]
    async fn test_buffer_pool_max_size_enforcement() {
        let size = BufferSize::try_new(512).unwrap();
        let pool = BufferPool::new(size, 3);

        let first = pool.acquire().await;
        let second = pool.acquire().await;
        let third = pool.acquire().await;

        let extra = pool.acquire().await;

        drop(first);
        drop(second);
        drop(third);
        drop(extra);
    }

    // Acquire-and-drop of a single correctly sized buffer.
    #[tokio::test]
    async fn test_buffer_wrong_size_not_returned() {
        let size = BufferSize::try_new(1024).unwrap();
        let pool = BufferPool::new(size, 2);

        let held = pool.acquire().await;
        assert_eq!(held.capacity(), 1024);

        drop(held);
    }

    // Repeated acquire/write/drop cycles keep working.
    #[tokio::test]
    async fn test_buffer_pool_multiple_get_return_cycles() {
        let size = BufferSize::try_new(4096).unwrap();
        let pool = BufferPool::new(size, 5);

        for round in 0..20u8 {
            let mut held = pool.acquire().await;
            assert_eq!(held.capacity(), 4096);

            let payload = [round];
            held.copy_from_slice(&payload);
            assert_eq!(held.initialized(), 1);
        }
    }

    // Cloning a pool handle must not panic.
    #[test]
    fn test_buffer_pool_clone() {
        let size = BufferSize::try_new(1024).unwrap();
        let original = BufferPool::new(size, 5);
        let _copy = original.clone();
    }

    // Pools configured with different sizes hand out matching buffers.
    #[tokio::test]
    async fn test_different_buffer_sizes() {
        let small_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 5);
        let medium_pool = BufferPool::new(BufferSize::try_new(8192).unwrap(), 5);
        let large_pool = BufferPool::new(BufferSize::try_new(65536).unwrap(), 5);

        let small = small_pool.acquire().await;
        let medium = medium_pool.acquire().await;
        let large = large_pool.acquire().await;

        assert_eq!(small.capacity(), 1024);
        assert_eq!(medium.capacity(), 8192);
        assert_eq!(large.capacity(), 65536);
    }

    // Writing through `as_mut_slice` does not touch the initialized length.
    #[tokio::test]
    async fn test_as_mut_slice() {
        let size = BufferSize::try_new(1024).unwrap();
        let pool = BufferPool::new(size, 5);
        let mut held = pool.acquire().await;

        let view = held.as_mut_slice();
        assert_eq!(view.len(), 1024);

        view[..3].copy_from_slice(b"Hi!");

        for (idx, slot) in view.iter_mut().enumerate() {
            *slot = (idx % 256) as u8;
        }

        assert_eq!(held.initialized(), 0);
    }

    // One hundred tasks each cycle ten buffers through a ten-slot pool.
    #[tokio::test]
    async fn test_buffer_pool_stress() {
        let size = BufferSize::try_new(4096).unwrap();
        let pool = BufferPool::new(size, 10);

        let tasks: Vec<_> = (0..100)
            .map(|_| {
                let shared = pool.clone();
                tokio::spawn(async move {
                    for _ in 0..10 {
                        let held = shared.acquire().await;
                        assert_eq!(held.capacity(), 4096);
                    }
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }
    }

    // Dereferencing exposes only the bytes written so far.
    #[tokio::test]
    async fn test_pooled_buffer_deref() {
        let size = BufferSize::try_new(1024).unwrap();
        let pool = BufferPool::new(size, 5);
        let mut held = pool.acquire().await;

        assert_eq!(held.len(), 0);

        held.copy_from_slice(b"Hello");

        assert_eq!(held.len(), 5);
        assert_eq!(&*held, b"Hello");
    }

    // `as_ref` exposes the same bytes as dereferencing.
    #[tokio::test]
    async fn test_pooled_buffer_as_ref() {
        let size = BufferSize::try_new(512).unwrap();
        let pool = BufferPool::new(size, 5);
        let mut held = pool.acquire().await;

        held.copy_from_slice(b"Test data");

        let bytes: &[u8] = held.as_ref();
        assert_eq!(bytes, b"Test data");
        assert_eq!(bytes.len(), 9);
    }

    // Each `copy_from_slice` call replaces the initialized length.
    #[tokio::test]
    async fn test_copy_from_slice_updates_initialized() {
        let size = BufferSize::try_new(1024).unwrap();
        let pool = BufferPool::new(size, 5);
        let mut held = pool.acquire().await;

        assert_eq!(held.initialized(), 0);

        held.copy_from_slice(b"abc");
        assert_eq!(held.initialized(), 3);

        held.copy_from_slice(b"longer text");
        assert_eq!(held.initialized(), 11);
    }

    // Copying more data than fits must panic with the documented message.
    #[tokio::test]
    #[should_panic(expected = "data exceeds buffer capacity")]
    async fn test_copy_from_slice_panic_on_overflow() {
        let size = BufferSize::try_new(10).unwrap();
        let pool = BufferPool::new(size, 5);
        let mut held = pool.acquire().await;

        let oversized = [0u8; 20];
        held.copy_from_slice(&oversized);
    }

    // The derived `Debug` output names the type.
    #[tokio::test]
    async fn test_buffer_pool_debug() {
        let size = BufferSize::try_new(2048).unwrap();
        let pool = BufferPool::new(size, 5);
        let rendered = format!("{pool:?}");
        assert!(rendered.contains("BufferPool"));
    }

    // A second write fully replaces what the buffer exposes.
    #[tokio::test]
    async fn test_buffer_initialized_tracking() {
        let size = BufferSize::try_new(1024).unwrap();
        let pool = BufferPool::new(size, 5);
        let mut held = pool.acquire().await;

        held.copy_from_slice(b"First");
        assert_eq!(held.initialized(), 5);
        assert_eq!(&*held, b"First");

        held.copy_from_slice(b"Second write");
        assert_eq!(held.initialized(), 12);
        assert_eq!(&*held, b"Second write");
    }

    // Capacity is fixed while the initialized length follows the writes.
    #[tokio::test]
    async fn test_buffer_capacity_vs_initialized() {
        let size = BufferSize::try_new(8192).unwrap();
        let pool = BufferPool::new(size, 5);
        let mut held = pool.acquire().await;

        assert_eq!(held.capacity(), 8192);

        assert_eq!(held.initialized(), 0);

        held.copy_from_slice(b"Small");
        assert_eq!(held.capacity(), 8192);
        assert_eq!(held.initialized(), 5);
    }

    // `as_mut_slice` spans the whole buffer.
    #[tokio::test]
    async fn test_as_mut_slice_capacity() {
        let size = BufferSize::try_new(1024).unwrap();
        let pool = BufferPool::new(size, 5);
        let mut held = pool.acquire().await;

        let view = held.as_mut_slice();
        assert_eq!(view.len(), 1024);
    }

    // Copying an empty slice leaves the buffer empty.
    #[tokio::test]
    async fn test_empty_slice_copy() {
        let size = BufferSize::try_new(512).unwrap();
        let pool = BufferPool::new(size, 5);
        let mut held = pool.acquire().await;

        held.copy_from_slice(&[]);
        assert_eq!(held.initialized(), 0);
        assert_eq!(&*held, b"");
    }

    // A buffer acquired after another was returned still has the full size.
    #[tokio::test]
    async fn test_buffer_reuse_preserves_capacity() {
        let size = BufferSize::try_new(2048).unwrap();
        let pool = BufferPool::new(size, 5);

        {
            let mut first = pool.acquire().await;
            first.copy_from_slice(b"test");
            assert_eq!(first.capacity(), 2048);
        }

        let second = pool.acquire().await;
        assert_eq!(second.capacity(), 2048);
    }

    // Length matches the request while the Vec capacity is page-rounded.
    #[test]
    fn test_buffer_size_alignment() {
        let odd = BufferPool::create_aligned_buffer(1000);
        assert_eq!(odd.len(), 1000);
        assert_eq!(odd.capacity() % 4096, 0);

        let exact = BufferPool::create_aligned_buffer(8192);
        assert_eq!(exact.len(), 8192);
        assert_eq!(exact.capacity() % 4096, 0);
    }
}