use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::buffer::PoolBuffer;
use crate::config::PoolConfig;
use crate::error::{Error, Result, MAX_REQUEST_BYTES};
use crate::size_class::SizeClassPool;
use crate::stats::PoolStats;
use crate::tls::take_tls_buffer;
pub(crate) const FOUR_KIB: usize = 4 * 1024;
pub(crate) const SIXTY_FOUR_KIB: usize = 64 * 1024;
pub(crate) const TWO_FIFTY_SIX_KIB: usize = 256 * 1024;
pub(crate) const ONE_MIB: usize = 1024 * 1024;
#[derive(Debug)]
pub struct BufferPool {
pub(crate) four_kib: Arc<SizeClassPool>,
pub(crate) sixty_four_kib: Arc<SizeClassPool>,
pub(crate) two_fifty_six_kib: Arc<SizeClassPool>,
pub(crate) one_mib: Arc<SizeClassPool>,
pub(crate) numa_node: Option<u32>,
pub(crate) stats: Arc<PoolStats>,
}
impl BufferPool {
#[must_use]
pub fn new(config: PoolConfig) -> Self {
Self {
four_kib: Arc::new(SizeClassPool::pooled(
FOUR_KIB,
config.four_kib_count,
config.numa_node,
)),
sixty_four_kib: Arc::new(SizeClassPool::pooled(
SIXTY_FOUR_KIB,
config.sixty_four_kib_count,
config.numa_node,
)),
two_fifty_six_kib: Arc::new(SizeClassPool::pooled(
TWO_FIFTY_SIX_KIB,
config.two_fifty_six_kib_count,
config.numa_node,
)),
one_mib: Arc::new(SizeClassPool::pooled(
ONE_MIB,
config.one_mib_count,
config.numa_node,
)),
numa_node: config.numa_node,
stats: Arc::new(PoolStats::default()),
}
}
#[must_use]
pub fn stats(&self) -> &PoolStats {
&self.stats
}
pub fn checkout(&self, min_bytes: usize) -> Result<PoolBuffer> {
if min_bytes > MAX_REQUEST_BYTES {
return Err(Error::RequestedLengthTooLarge {
requested: min_bytes,
});
}
if let Some((capacity, ptr, owner)) = take_tls_buffer(min_bytes) {
let out = self.stats.checked_out.fetch_add(1, Ordering::Relaxed) + 1;
self.stats
.peak_checked_out
.fetch_max(out, Ordering::Relaxed);
let bytes_out = self
.stats
.bytes_checked_out
.fetch_add(min_bytes, Ordering::Relaxed)
+ min_bytes;
self.stats
.peak_bytes_checked_out
.fetch_max(bytes_out, Ordering::Relaxed);
return Ok(PoolBuffer {
ptr,
len: min_bytes,
capacity,
owner,
stats: Some(Arc::clone(&self.stats)),
});
}
let selected = self.select_pool(min_bytes);
let fallback_capacity = selected.map_or(min_bytes, |pool| pool.capacity);
if let Some(pool) = selected {
if let Some(allocation) = pool.pop() {
self.stats.hits.fetch_add(1, Ordering::Relaxed);
let out = self.stats.checked_out.fetch_add(1, Ordering::Relaxed) + 1;
self.stats
.peak_checked_out
.fetch_max(out, Ordering::Relaxed);
let bytes_out = self
.stats
.bytes_checked_out
.fetch_add(min_bytes, Ordering::Relaxed)
+ min_bytes;
self.stats
.peak_bytes_checked_out
.fetch_max(bytes_out, Ordering::Relaxed);
return Ok(PoolBuffer {
ptr: allocation.ptr,
len: min_bytes,
capacity: pool.capacity,
owner: Arc::clone(pool),
stats: Some(Arc::clone(&self.stats)),
});
}
}
self.stats.misses.fetch_add(1, Ordering::Relaxed);
let out = self.stats.checked_out.fetch_add(1, Ordering::Relaxed) + 1;
self.stats
.peak_checked_out
.fetch_max(out, Ordering::Relaxed);
let bytes_out = self
.stats
.bytes_checked_out
.fetch_add(min_bytes, Ordering::Relaxed)
+ min_bytes;
self.stats
.peak_bytes_checked_out
.fetch_max(bytes_out, Ordering::Relaxed);
let owner = selected.map_or_else(
|| Arc::new(SizeClassPool::pooled(fallback_capacity, 0, self.numa_node)),
Arc::clone,
);
let allocation = owner.allocate_fallback();
Ok(PoolBuffer {
ptr: allocation.ptr,
len: min_bytes,
capacity: fallback_capacity,
owner,
stats: Some(Arc::clone(&self.stats)),
})
}
pub fn checkout_zeroed(&self, min_bytes: usize) -> Result<PoolBuffer> {
let mut buffer = self.checkout(min_bytes)?;
buffer.fill(0);
Ok(buffer)
}
pub fn checkout_aligned(&self, min_bytes: usize, alignment: usize) -> Result<PoolBuffer> {
if !alignment.is_power_of_two() || alignment == 0 {
return Err(Error::InvalidAlignment { alignment });
}
self.checkout(min_bytes)
}
#[doc(hidden)]
pub fn available_for_test(&self, class: usize) -> usize {
let pool = match class {
FOUR_KIB => &self.four_kib,
SIXTY_FOUR_KIB => &self.sixty_four_kib,
TWO_FIFTY_SIX_KIB => &self.two_fifty_six_kib,
ONE_MIB => &self.one_mib,
_ => return 0,
};
pool.queue
.as_ref()
.map_or(0, crossbeam_queue::ArrayQueue::len)
}
fn select_pool(&self, min_bytes: usize) -> Option<&Arc<SizeClassPool>> {
if min_bytes <= FOUR_KIB {
Some(&self.four_kib)
} else if min_bytes <= SIXTY_FOUR_KIB {
Some(&self.sixty_four_kib)
} else if min_bytes <= TWO_FIFTY_SIX_KIB {
Some(&self.two_fifty_six_kib)
} else if min_bytes <= ONE_MIB {
Some(&self.one_mib)
} else {
None
}
}
}