use std::{
ops::{Deref, DerefMut},
sync::{
Arc, LazyLock,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};
use bytes::BytesMut;
use crossbeam_channel::{Receiver, Sender, unbounded};
use tokio::time::interval;
/// Process-wide buffer pool, constructed the first time it is dereferenced.
static MEMORY_POOL: LazyLock<MemoryPool> = LazyLock::new(MemoryPool::new);

/// Number of `Buffer`s currently handed out.
///
/// Incremented when a buffer is acquired and decremented when it is dropped.
/// `AtomicUsize::new` is a `const fn`, so the counter can be a plain static
/// and does not need a lazy wrapper.
static ACQUIRE_BUFFER_NUM: AtomicUsize = AtomicUsize::new(0);
/// A byte buffer on loan from the pool; it dereferences to its `BytesMut`.
///
/// When dropped, the bytes are cleared and sent back over `sender` so the
/// allocation can be handed out again.
pub struct Buffer {
/// Channel handle the bytes are sent back through when this buffer is dropped.
sender: Arc<Sender<BytesMut>>,
/// The underlying storage; `Drop` takes it out, leaving `None` behind.
bytes: Option<BytesMut>,
}
/// Read-only access to the pooled bytes.
impl Deref for Buffer {
    type Target = BytesMut;

    /// Borrows the wrapped `BytesMut`.
    ///
    /// # Panics
    ///
    /// Panics if the bytes have already been taken out of this buffer.
    fn deref(&self) -> &Self::Target {
        let slot: Option<&BytesMut> = self.bytes.as_ref();
        slot.expect("buffer is already returned to the pool")
    }
}
impl DerefMut for Buffer {
fn deref_mut(&mut self) -> &mut Self::Target {
self.bytes
.as_mut()
.expect("buffer is already returned to the pool")
}
}
/// Returns the storage to the pool when the buffer goes out of scope.
impl Drop for Buffer {
    /// Clears the bytes, sends them back through the pool channel and
    /// decrements the count of buffers on loan.
    ///
    /// This never panics: a destructor that panics while another panic is
    /// unwinding aborts the process, so an already-empty slot is simply
    /// skipped instead of being treated as a fatal error.
    fn drop(&mut self) {
        if let Some(mut bytes) = self.bytes.take() {
            bytes.clear();
            // Best effort: if the receiving side is gone the allocation is
            // just freed here instead of being recycled.
            let _ = self.sender.send(bytes);
        }
        // Each buffer was counted once when it was handed out, so it is
        // uncounted exactly once here, whether or not it still held bytes.
        ACQUIRE_BUFFER_NUM.fetch_sub(1, Ordering::SeqCst);
    }
}
/// Pool of reusable `BytesMut` buffers kept in an unbounded crossbeam channel.
pub struct MemoryPool {
/// Receiving end of the channel; idle buffers are popped from here with `try_recv`.
receiver: Arc<Receiver<BytesMut>>,
/// Sending end of the channel; every handed-out `Buffer` gets a clone of this `Arc`.
sender: Arc<Sender<BytesMut>>,
}
impl MemoryPool {
pub const MAX_MESSAGE_SIZE: usize = 4096;
pub fn acquire() -> Buffer {
MEMORY_POOL.get_buffer()
}
fn new() -> Self {
let (sender, receiver) = unbounded::<BytesMut>();
let receiver = Arc::new(receiver);
{
let receiver_ = receiver.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(10));
let mut continuous_decline = false;
let mut tick_steps = 0;
loop {
interval.tick().await;
let acquire_size = ACQUIRE_BUFFER_NUM.load(Ordering::SeqCst);
let buffer_size = receiver_.len();
if buffer_size >= acquire_size * 3 {
if tick_steps == 0 {
continuous_decline = true;
}
} else {
continuous_decline = false;
}
tick_steps += 1;
if tick_steps >= 6 {
tick_steps = 0;
if continuous_decline {
continuous_decline = false;
for _ in 0..if buffer_size <= 3 {
buffer_size
} else {
buffer_size / 3
} {
if receiver_.try_recv().is_err() {
break;
}
}
}
}
}
});
}
Self {
sender: Arc::new(sender),
receiver,
}
}
fn get_buffer(&self) -> Buffer {
ACQUIRE_BUFFER_NUM.fetch_add(1, Ordering::SeqCst);
Buffer {
sender: self.sender.clone(),
bytes: Some(
self.receiver
.try_recv()
.ok()
.unwrap_or_else(|| BytesMut::with_capacity(Self::MAX_MESSAGE_SIZE)),
),
}
}
}