use std::{
ops::{Deref, DerefMut},
sync::{
Arc, LazyLock,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};
use bytes::BytesMut;
use crossbeam_queue::ArrayQueue;
use tokio::time::interval;
/// Process-wide buffer pool, constructed on first access.
static MEMORY_POOL: LazyLock<MemoryPool> = LazyLock::new(MemoryPool::new);

/// Number of buffers currently handed out by the pool and not yet returned.
///
/// `AtomicUsize::new` is a `const fn`, so the counter needs no lazy
/// initialisation wrapper.
static ACQUIRE_BUFFER_NUM: AtomicUsize = AtomicUsize::new(0);
/// Bounded, lock-free pool of reusable [`BytesMut`] buffers.
///
/// The queue sits behind an `Arc` so that it can be shared with the background
/// task that periodically releases surplus idle buffers.
struct MemoryPool(Arc<ArrayQueue<BytesMut>>);

impl MemoryPool {
    /// Upper bound applied to the capacity a caller may request.
    const MAX_MESSAGE_SIZE: usize = 4096;
    /// Maximum number of idle buffers the pool retains.
    const MAX_QUEUE_SIZE: usize = 4096;
    /// Period between two samples taken by the trimming task.
    const TRIM_TICK: Duration = Duration::from_secs(10);
    /// Number of consecutive samples that form one trimming window.
    const TRIM_WINDOW_TICKS: u32 = 6;
    /// The pool counts as oversized while it holds at least this many idle
    /// buffers per buffer currently in use.
    const IDLE_RATIO: usize = 3;

    /// Creates the pool and, when possible, starts its trimming task.
    ///
    /// `tokio::spawn` panics when no runtime is active, and a panic inside a
    /// `LazyLock` initialiser poisons the lock for good. The trimming task is
    /// therefore only spawned when a runtime handle is available; otherwise
    /// the pool still works and simply keeps up to `MAX_QUEUE_SIZE` idle
    /// buffers.
    fn new() -> Self {
        let queue: Arc<ArrayQueue<BytesMut>> = Arc::new(ArrayQueue::new(Self::MAX_QUEUE_SIZE));
        if let Ok(runtime) = tokio::runtime::Handle::try_current() {
            // The task is detached on purpose: it lives as long as the runtime.
            runtime.spawn(Self::trim_periodically(Arc::clone(&queue)));
        }
        Self(queue)
    }

    /// Background loop that shrinks the pool after a full window of low demand.
    ///
    /// Every `TRIM_TICK` the number of idle buffers is compared with the number
    /// of buffers in use. If the pool was oversized on every sample of a
    /// `TRIM_WINDOW_TICKS`-long window, part of the idle buffers is dropped.
    async fn trim_periodically(queue: Arc<ArrayQueue<BytesMut>>) {
        let mut ticker = interval(Self::TRIM_TICK);
        let mut oversized_all_window = false;
        let mut tick_in_window = 0u32;
        loop {
            ticker.tick().await;
            let in_use = ACQUIRE_BUFFER_NUM.load(Ordering::Relaxed);
            let idle = queue.len();
            // Saturating so that an extreme counter value cannot overflow.
            let oversized = idle >= in_use.saturating_mul(Self::IDLE_RATIO);
            if !oversized {
                // One busy sample disqualifies the whole window.
                oversized_all_window = false;
            } else if tick_in_window == 0 {
                // Only the first sample of a window can arm the flag.
                oversized_all_window = true;
            }

            tick_in_window += 1;
            if tick_in_window < Self::TRIM_WINDOW_TICKS {
                continue;
            }
            tick_in_window = 0;

            if oversized_all_window {
                oversized_all_window = false;
                // Tiny pools are emptied completely; larger ones lose a third.
                let surplus = if idle <= 3 { idle } else { idle / 3 };
                for _ in 0..surplus {
                    // Concurrent `get_buffer` calls may already have drained the queue.
                    if queue.pop().is_none() {
                        break;
                    }
                }
            }
        }
    }

    /// Hands out a buffer, recycling an idle one when available.
    ///
    /// `capacity` defaults to `MAX_MESSAGE_SIZE` and is clamped to it. The
    /// returned buffer is empty and has room for at least the clamped
    /// capacity, whether it is fresh or recycled.
    fn get_buffer(&self, capacity: Option<usize>) -> Buffer {
        ACQUIRE_BUFFER_NUM.fetch_add(1, Ordering::Relaxed);
        let capacity = capacity.map_or(Self::MAX_MESSAGE_SIZE, |wanted| {
            wanted.min(Self::MAX_MESSAGE_SIZE)
        });
        let bytes = match self.0.pop() {
            Some(mut recycled) => {
                // A recycled buffer may have been created for a smaller
                // request; top it up so the capacity contract holds.
                recycled.reserve(capacity);
                recycled
            }
            None => BytesMut::with_capacity(capacity),
        };
        Buffer(Some(bytes))
    }

    /// Takes the bytes out of `buffer` and puts them back into the pool.
    ///
    /// Does nothing when the buffer has already been returned. The bytes are
    /// cleared first; if the queue is full they are simply dropped.
    fn return_buffer(&self, buffer: &mut Buffer) {
        let Some(mut bytes) = buffer.0.take() else {
            return;
        };
        ACQUIRE_BUFFER_NUM.fetch_sub(1, Ordering::Relaxed);
        bytes.clear();
        // A full queue rejects the push; dropping the buffer is the intended fallback.
        let _ = self.0.push(bytes);
    }
}
pub struct Buffer(Option<BytesMut>);
impl Buffer {
pub const MAX_MESSAGE_SIZE: usize = MemoryPool::MAX_MESSAGE_SIZE;
pub fn new() -> Self {
MEMORY_POOL.get_buffer(None)
}
pub fn with_capacity(capacity: usize) -> Self {
MEMORY_POOL.get_buffer(Some(capacity))
}
}
/// Read-only access to the wrapped bytes.
impl Deref for Buffer {
    type Target = BytesMut;

    /// Borrows the inner buffer.
    ///
    /// # Panics
    ///
    /// Panics if the bytes have already been returned to the pool.
    fn deref(&self) -> &Self::Target {
        let inner: Option<&BytesMut> = self.0.as_ref();
        inner.expect("buffer is already returned to the pool")
    }
}
impl DerefMut for Buffer {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0
.as_mut()
.expect("buffer is already returned to the pool")
}
}
impl Drop for Buffer {
fn drop(&mut self) {
MEMORY_POOL.return_buffer(self)
}
}