use std::{
ops::{Deref, DerefMut},
sync::{
Arc, LazyLock,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};
use bytes::BytesMut;
use crossbeam_channel::{Receiver, Sender, unbounded};
use tokio::time::sleep;
/// Process-wide buffer pool, created on the first call to [`MemoryPool::acquire`].
static MEMORY_POOL: LazyLock<MemoryPool> = LazyLock::new(MemoryPool::new);
/// Number of [`Buffer`]s currently handed out and not yet dropped.
///
/// `AtomicUsize::new` is a `const fn`, so the counter is initialised statically and needs no
/// lazy wrapper.
static ACQUIRE_BUFFER_NUM: AtomicUsize = AtomicUsize::new(0);
/// A pooled byte buffer.
///
/// Dereferences to the wrapped [`BytesMut`]; when dropped, the buffer is cleared and sent back
/// to the pool through `sender`.
pub struct Buffer {
/// Channel end used by `Drop` to hand the buffer back to the pool.
sender: Arc<Sender<BytesMut>>,
/// The wrapped buffer; it is only taken out (leaving `None`) by `Drop`.
bytes: Option<BytesMut>,
}
impl Deref for Buffer {
    type Target = BytesMut;

    /// Gives shared access to the wrapped [`BytesMut`].
    ///
    /// # Panics
    ///
    /// Panics if the inner buffer has already been taken out, which only `Drop` does.
    fn deref(&self) -> &Self::Target {
        match &self.bytes {
            Some(inner) => inner,
            None => panic!("buffer is already returned to the pool"),
        }
    }
}
impl DerefMut for Buffer {
    /// Gives exclusive access to the wrapped [`BytesMut`].
    ///
    /// # Panics
    ///
    /// Panics if the inner buffer has already been taken out, which only `Drop` does.
    fn deref_mut(&mut self) -> &mut Self::Target {
        match &mut self.bytes {
            Some(inner) => inner,
            None => panic!("buffer is already returned to the pool"),
        }
    }
}
impl Drop for Buffer {
    /// Clears the buffer, hands it back to the pool and decrements the outstanding-buffer
    /// counter.
    fn drop(&mut self) {
        // A single `take()` replaces the previous `as_mut()` + `take().expect(..)` pair.
        // `drop` must not panic: a panic while already unwinding aborts the process, and the
        // counter decrement below would be skipped.
        if let Some(mut bytes) = self.bytes.take() {
            // Only the length is reset; the allocation is kept for reuse.
            // NOTE(review): a buffer whose capacity was changed by the user (grown, or reduced
            // via `split*`) is pooled as-is — confirm this is intended.
            bytes.clear();
            // Best effort: `send` only fails once the receiving side is gone, in which case the
            // buffer is simply freed here.
            let _ = self.sender.send(bytes);
        }
        ACQUIRE_BUFFER_NUM.fetch_sub(1, Ordering::SeqCst);
    }
}
/// Seconds the pool's background monitor sleeps between two observations (one "tick").
const OBSERVE_INTERVAL_SECS: u64 = 10;
/// Number of consecutive low-usage ticks required before the monitor starts shrinking the pool.
const LOW_USAGE_WINDOW_TICKS: u32 = 6;
/// Idle-buffer count that must be exceeded for usage to count as low; only the buffers above
/// this count are considered excess and eligible for reclaiming.
const MIN_IDLE_KEEP: usize = 256;
/// Numerator of the low-usage ratio: usage is low when
/// `acquired * LOW_USAGE_RATIO_DEN <= idle * LOW_USAGE_RATIO_NUM`.
const LOW_USAGE_RATIO_NUM: usize = 1;
/// Denominator of the low-usage ratio (see `LOW_USAGE_RATIO_NUM`).
const LOW_USAGE_RATIO_DEN: usize = 4;
/// Value the shrink cooldown counter is set to after a shrink pass; the counter is decremented
/// once per tick before it is checked, and shrinking only happens while it is zero.
const SHRINK_COOLDOWN_TICKS: u32 = 2;
/// A shrink pass targets `excess / SHRINK_DIVISOR` buffers before clamping.
const SHRINK_DIVISOR: usize = 4;
/// Lower clamp for the number of buffers reclaimed per shrink pass (still capped by the excess).
const SHRINK_MIN_STEP: usize = 32;
/// Upper clamp for the number of buffers reclaimed per shrink pass.
const SHRINK_MAX_STEP: usize = 512;
/// Pool of reusable [`BytesMut`] buffers backed by an unbounded channel: idle buffers sit in
/// the channel, and dropping a [`Buffer`] sends its bytes back into it.
pub struct MemoryPool {
/// Receiving end of the channel; idle buffers are taken from here.
receiver: Arc<Receiver<BytesMut>>,
/// Sending end of the channel; a clone is stored in every [`Buffer`] handed out.
sender: Arc<Sender<BytesMut>>,
}
impl MemoryPool {
pub const MAX_MESSAGE_SIZE: usize = 4096;
pub fn acquire() -> Buffer {
MEMORY_POOL.get_buffer()
}
fn new() -> Self {
let (sender, receiver) = unbounded::<BytesMut>();
let receiver = Arc::new(receiver);
{
let receiver_ = receiver.clone();
tokio::spawn(async move {
let mut low_usage_ticks = 0u32;
let mut shrink_cooldown = 0u32;
loop {
sleep(Duration::from_secs(OBSERVE_INTERVAL_SECS)).await;
let acquire_size = ACQUIRE_BUFFER_NUM.load(Ordering::SeqCst);
let buffer_size = receiver_.len();
let low_usage = buffer_size > MIN_IDLE_KEEP
&& acquire_size.saturating_mul(LOW_USAGE_RATIO_DEN)
<= buffer_size.saturating_mul(LOW_USAGE_RATIO_NUM);
if low_usage {
low_usage_ticks = low_usage_ticks.saturating_add(1);
} else {
low_usage_ticks = 0;
shrink_cooldown = 0;
}
if shrink_cooldown > 0 {
shrink_cooldown -= 1;
}
if low_usage_ticks >= LOW_USAGE_WINDOW_TICKS && shrink_cooldown == 0 {
let excess = buffer_size.saturating_sub(MIN_IDLE_KEEP);
if excess > 0 {
let step = (excess / SHRINK_DIVISOR)
.clamp(SHRINK_MIN_STEP, SHRINK_MAX_STEP)
.min(excess);
let mut reclaimed = 0usize;
for _ in 0..step {
if receiver_.try_recv().is_ok() {
reclaimed += 1;
} else {
break;
}
}
if reclaimed > 0 {
log::debug!(
"memory pool shrink: reclaimed={}, idle_before={}, idle_after={}, acquired={}",
reclaimed,
buffer_size,
buffer_size.saturating_sub(reclaimed),
acquire_size,
);
}
shrink_cooldown = SHRINK_COOLDOWN_TICKS;
}
}
if acquire_size > buffer_size * 2 {
log::debug!(
"memory pool is running low: acquire_size={}, buffer_size={buffer_size}",
acquire_size,
);
}
}
});
}
Self {
sender: Arc::new(sender),
receiver,
}
}
fn get_buffer(&self) -> Buffer {
ACQUIRE_BUFFER_NUM.fetch_add(1, Ordering::SeqCst);
Buffer {
sender: self.sender.clone(),
bytes: Some(
self.receiver
.try_recv()
.ok()
.unwrap_or_else(|| BytesMut::with_capacity(Self::MAX_MESSAGE_SIZE)),
),
}
}
}