#[cfg(feature = "buffer-pool")]
use std::sync::atomic::{AtomicUsize, Ordering};
use aligned_box::AlignedBox;
use std::error::Error;
#[cfg(feature = "buffer-pool")]
use std::io::{Error as IoError, ErrorKind, Result as IoResult};
#[cfg(all(
feature = "buffer-pool",
not(feature = "tokio-runtime"),
feature = "async-io-runtime"
))]
use async_lock::Mutex;
#[cfg(all(
feature = "buffer-pool",
not(feature = "async-io-runtime"),
feature = "tokio-runtime"
))]
use tokio::sync::Mutex;
/// Alignment, in bytes, requested for every [`AlignedBuffer`] allocation.
// NOTE(review): 512 presumably matches a sector size required for direct I/O — confirm.
const BUFFER_ALIGNMENT: usize = 512;
/// Idle-buffer limit applied to pools created through `BufferPool::new`.
#[cfg(feature = "buffer-pool")]
const DEFAULT_POOL_CAPACITY: usize = 64;
/// Owned byte buffer backed by an [`AlignedBox`] allocation.
///
/// Instances are created with `AlignedBuffer::try_new`, which requests
/// `BUFFER_ALIGNMENT`-byte alignment for the backing storage.
pub struct AlignedBuffer {
/// Aligned backing storage holding the bytes.
inner: AlignedBox<[u8]>,
/// Size in bytes that was requested when the buffer was created.
capacity: usize,
}
// Debug output reports the recorded capacity and the slice length only;
// the buffer contents are deliberately not printed.
impl std::fmt::Debug for AlignedBuffer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let len = self.inner.len();
        let mut out = f.debug_struct("AlignedBuffer");
        out.field("capacity", &self.capacity);
        out.field("len", &len);
        out.finish()
    }
}
impl AlignedBuffer {
    /// Allocates a buffer of `size` bytes aligned to `BUFFER_ALIGNMENT`.
    ///
    /// The storage comes from `AlignedBox::slice_from_default`, so every byte
    /// starts out as `u8::default()`.
    ///
    /// # Errors
    ///
    /// Returns a boxed error whose message starts with
    /// `aligned buffer allocation failed:` when the allocation is rejected.
    pub fn try_new(size: usize) -> Result<Self, Box<dyn Error + Send + Sync>> {
        match AlignedBox::<[u8]>::slice_from_default(BUFFER_ALIGNMENT, size) {
            Ok(inner) => Ok(Self {
                inner,
                capacity: size,
            }),
            Err(err) => Err(format!("aligned buffer allocation failed: {err:?}").into()),
        }
    }

    /// Returns the size in bytes that was requested at construction time.
    #[cfg(feature = "buffer-pool")]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Hook invoked when a pooled buffer is handed out again.
    ///
    /// Currently a no-op: the bytes written by the previous user are left in
    /// place, so callers must not assume a recycled buffer is zeroed.
    #[cfg(feature = "buffer-pool")]
    pub fn reset(&mut self) {}

    /// Borrows the whole buffer as a mutable byte slice.
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        &mut *self.inner
    }

    /// Borrows the whole buffer as an immutable byte slice.
    pub fn as_slice(&self) -> &[u8] {
        &*self.inner
    }
}
// Lets an `AlignedBuffer` be used wherever a `&[u8]` is expected.
impl std::ops::Deref for AlignedBuffer {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        // Same view that `as_slice` exposes: the entire backing storage.
        &self.inner[..]
    }
}
// Lets an `AlignedBuffer` be used wherever a `&mut [u8]` is expected.
impl std::ops::DerefMut for AlignedBuffer {
    fn deref_mut(&mut self) -> &mut [u8] {
        // Same view that `as_mut_slice` exposes: the entire backing storage.
        &mut self.inner[..]
    }
}
/// Pool of reusable [`AlignedBuffer`]s that all share one size.
///
/// Idle buffers are kept behind an async mutex; two atomic counters track how
/// many acquisitions were made and how many of them were served from the pool.
#[cfg(feature = "buffer-pool")]
#[derive(Debug)]
pub struct BufferPool {
/// Idle buffers waiting to be handed out again.
pool: Mutex<Vec<AlignedBuffer>>,
/// Size in bytes of every buffer the pool allocates; released buffers with a
/// different capacity are not retained.
buffer_size: usize,
/// Upper bound on the number of idle buffers kept in `pool`.
max_capacity: usize,
/// Total number of acquire attempts.
acquisitions: AtomicUsize,
/// Number of acquire attempts that were satisfied by a pooled buffer.
hits: AtomicUsize,
}
#[cfg(feature = "buffer-pool")]
impl BufferPool {
    /// Creates a pool of `buffer_size`-byte buffers that retains at most
    /// `DEFAULT_POOL_CAPACITY` idle buffers.
    pub fn new(buffer_size: usize) -> Self {
        Self::with_capacity(buffer_size, DEFAULT_POOL_CAPACITY)
    }

    /// Creates a pool of `buffer_size`-byte buffers that retains at most
    /// `max_capacity` idle buffers.
    ///
    /// No buffers are allocated up front; the pool fills as buffers are
    /// released back into it.
    pub fn with_capacity(buffer_size: usize, max_capacity: usize) -> Self {
        // Only pre-size the idle list up to the default limit. Reserving
        // `max_capacity` slots eagerly wastes memory for large limits and
        // panics with a capacity overflow for values such as `usize::MAX`.
        let reserved = max_capacity.min(DEFAULT_POOL_CAPACITY);
        Self {
            pool: Mutex::new(Vec::with_capacity(reserved)),
            buffer_size,
            max_capacity,
            acquisitions: AtomicUsize::new(0),
            hits: AtomicUsize::new(0),
        }
    }

    /// Hands out a buffer, reusing an idle one when available and allocating
    /// a fresh one otherwise.
    ///
    /// A reused buffer still holds whatever its previous user wrote, because
    /// `AlignedBuffer::reset` does not clear it.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` I/O error when a fresh allocation fails.
    pub async fn acquire(&self) -> IoResult<AlignedBuffer> {
        self.acquisitions.fetch_add(1, Ordering::Relaxed);
        // The guard is a temporary of this statement, so the lock is already
        // released before any bookkeeping or allocation happens.
        let recycled = self.pool.lock().await.pop();
        match recycled {
            Some(buf) => Ok(self.reuse(buf)),
            None => self.allocate(),
        }
    }

    /// Non-waiting variant of [`BufferPool::acquire`].
    ///
    /// If the pool lock is contended or the pool is empty, a fresh buffer is
    /// allocated instead of waiting. The result is therefore always `Some`;
    /// the `Option` wrapper is kept for interface compatibility.
    #[cfg(all(not(feature = "async-io-runtime"), feature = "tokio-runtime"))]
    pub fn try_acquire(&self) -> Option<IoResult<AlignedBuffer>> {
        self.acquisitions.fetch_add(1, Ordering::Relaxed);
        let recycled = self
            .pool
            .try_lock()
            .ok()
            .and_then(|mut idle| idle.pop());
        Some(match recycled {
            Some(buf) => Ok(self.reuse(buf)),
            None => self.allocate(),
        })
    }

    /// Returns `buf` to the pool.
    ///
    /// The buffer is dropped instead when its capacity differs from the
    /// pool's buffer size or when the pool already holds `max_capacity`
    /// idle buffers.
    pub async fn release(&self, buf: AlignedBuffer) {
        if buf.capacity() != self.buffer_size {
            return;
        }
        let mut idle = self.pool.lock().await;
        if idle.len() < self.max_capacity {
            idle.push(buf);
        }
    }

    /// Non-waiting variant of [`BufferPool::release`].
    ///
    /// Best effort: the buffer is also dropped when the pool lock cannot be
    /// taken immediately.
    #[cfg(all(not(feature = "async-io-runtime"), feature = "tokio-runtime"))]
    pub fn try_release(&self, buf: AlignedBuffer) {
        if buf.capacity() != self.buffer_size {
            return;
        }
        if let Ok(mut idle) = self.pool.try_lock() {
            if idle.len() < self.max_capacity {
                idle.push(buf);
            }
        }
    }

    /// Returns the number of idle buffers currently held by the pool.
    pub async fn available(&self) -> usize {
        self.pool.lock().await.len()
    }

    /// Returns a snapshot of the acquisition counters.
    ///
    /// `hit_rate` is `hits / acquisitions`, or `0.0` before the first
    /// acquisition. The snapshot always satisfies `hits <= acquisitions`,
    /// even while other tasks are acquiring buffers concurrently.
    pub fn stats(&self) -> BufferPoolStats {
        // Read `hits` first. Every hit is recorded (with `Release`) after its
        // acquisition was counted, so this `Acquire` load guarantees the
        // following load observes at least as many acquisitions as hits.
        // Reading in the opposite order could report a hit rate above 1.0.
        let hits = self.hits.load(Ordering::Acquire);
        let acquisitions = self.acquisitions.load(Ordering::Relaxed);
        BufferPoolStats {
            acquisitions,
            hits,
            hit_rate: if acquisitions > 0 {
                hits as f64 / acquisitions as f64
            } else {
                0.0
            },
        }
    }

    /// Records a pool hit and prepares the recycled buffer for its next user.
    fn reuse(&self, mut buf: AlignedBuffer) -> AlignedBuffer {
        // `Release` pairs with the `Acquire` load in `stats`.
        self.hits.fetch_add(1, Ordering::Release);
        buf.reset();
        buf
    }

    /// Allocates a fresh buffer of the pool's size, mapping failures to an I/O error.
    fn allocate(&self) -> IoResult<AlignedBuffer> {
        AlignedBuffer::try_new(self.buffer_size).map_err(|err| IoError::new(ErrorKind::Other, err))
    }
}
/// Snapshot of a buffer pool's acquisition counters.
#[cfg(feature = "buffer-pool")]
#[derive(Debug, Clone, Copy)]
pub struct BufferPoolStats {
/// Total number of acquire attempts counted so far.
pub acquisitions: usize,
/// Number of acquire attempts that were served by a pooled buffer.
pub hits: usize,
/// `hits / acquisitions`, or `0.0` when there have been no acquisitions.
pub hit_rate: f64,
}
#[cfg(all(test, feature = "buffer-pool"))]
mod tests {
    use super::*;

    // A released buffer is handed out again by the next acquire and is
    // counted as a pool hit.
    #[cfg(all(not(feature = "async-io-runtime"), feature = "tokio-runtime"))]
    #[tokio::test]
    async fn test_buffer_pool_basic() {
        let pool = BufferPool::new(4096);

        let first = pool.acquire().await.expect("buffer allocation failed");
        assert_eq!(first.capacity(), 4096);
        pool.release(first).await;

        let second = pool.acquire().await.expect("buffer allocation failed");
        assert_eq!(second.capacity(), 4096);

        let BufferPoolStats {
            acquisitions, hits, ..
        } = pool.stats();
        assert_eq!(acquisitions, 2);
        assert_eq!(hits, 1);
    }

    // Releasing more buffers than the configured limit keeps only `limit`
    // of them idle in the pool.
    #[cfg(all(not(feature = "async-io-runtime"), feature = "tokio-runtime"))]
    #[tokio::test]
    async fn test_buffer_pool_capacity() {
        let pool = BufferPool::with_capacity(1024, 2);

        let mut held = Vec::new();
        for _ in 0..3 {
            held.push(pool.acquire().await.expect("buffer allocation failed"));
        }
        for buf in held {
            pool.release(buf).await;
        }

        assert_eq!(pool.available().await, 2);
    }
}