use crate::sync::Waiter;
use crate::sync::lock::Mutex;
use alloc::collections::VecDeque;
use alloc::vec::Vec;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicU16, AtomicUsize, Ordering};
pub type BufferGroupId = u16;
pub type BufferId = u16;
pub struct Buffer {
pub ptr: NonNull<u8>,
pub len: usize,
pub group_id: BufferGroupId,
pub buffer_id: BufferId,
}
unsafe impl Send for Buffer {}
pub struct BufferPoolConfig {
pub buffer_size: usize,
pub buffers_per_group: usize,
pub max_groups: usize,
}
impl Default for BufferPoolConfig {
fn default() -> Self {
Self {
buffer_size: 65536, buffers_per_group: 256,
max_groups: 16,
}
}
}
struct BufferGroup {
buffers: Vec<Buffer>,
free_list: VecDeque<BufferId>,
allocated: AtomicUsize,
}
pub struct BufferPool {
config: BufferPoolConfig,
groups: Mutex<Vec<Option<BufferGroup>>>,
next_group_id: AtomicU16,
}
impl BufferPool {
pub fn new(config: BufferPoolConfig) -> Self {
let mut groups = Vec::with_capacity(config.max_groups);
for _ in 0..config.max_groups {
groups.push(None);
}
Self {
config,
groups: Mutex::new(groups),
next_group_id: AtomicU16::new(0),
}
}
pub async fn allocate_group(&self) -> Option<BufferGroupId> {
let mut groups = self.groups.lock(Waiter::default()).await;
for (idx, slot) in groups.iter_mut().enumerate() {
if slot.is_none() {
let group_id = idx as BufferGroupId;
let mut buffers = Vec::with_capacity(self.config.buffers_per_group);
let mut free_list = VecDeque::with_capacity(self.config.buffers_per_group);
for buffer_id in 0..self.config.buffers_per_group {
let layout = alloc::alloc::Layout::from_size_align(
self.config.buffer_size,
64, )
.unwrap();
let ptr = unsafe { alloc::alloc::alloc(layout) };
if ptr.is_null() {
return None;
}
buffers.push(Buffer {
ptr: NonNull::new(ptr).unwrap(),
len: self.config.buffer_size,
group_id,
buffer_id: buffer_id as BufferId,
});
free_list.push_back(buffer_id as BufferId);
}
*slot = Some(BufferGroup {
buffers,
free_list,
allocated: AtomicUsize::new(0),
});
return Some(group_id);
}
}
None
}
pub async fn release_group(&self, group_id: BufferGroupId) {
let mut groups = self.groups.lock(Waiter::default()).await;
if let Some(Some(group)) = groups.get_mut(group_id as usize) {
let layout =
alloc::alloc::Layout::from_size_align(self.config.buffer_size, 64).unwrap();
for buffer in &group.buffers {
unsafe {
alloc::alloc::dealloc(buffer.ptr.as_ptr(), layout);
}
}
groups[group_id as usize] = None;
}
}
pub async fn with_buffer<F, R>(&self, group_id: BufferGroupId, f: F) -> Option<R>
where
F: FnOnce(&Buffer) -> R,
{
let groups = self.groups.lock(Waiter::default()).await;
if let Some(Some(group)) = groups.get(group_id as usize) {
if let Some(&buffer_id) = group.free_list.front() {
return group.buffers.get(buffer_id as usize).map(f);
}
}
None
}
pub async fn mark_used(&self, group_id: BufferGroupId, buffer_id: BufferId) {
let mut groups = self.groups.lock(Waiter::default()).await;
if let Some(Some(group)) = groups.get_mut(group_id as usize) {
group.free_list.retain(|&id| id != buffer_id);
group.allocated.fetch_add(1, Ordering::Relaxed);
}
}
pub async fn return_buffer(&self, group_id: BufferGroupId, buffer_id: BufferId) {
let mut groups = self.groups.lock(Waiter::default()).await;
if let Some(Some(group)) = groups.get_mut(group_id as usize) {
if !group.free_list.contains(&buffer_id) {
group.free_list.push_back(buffer_id);
group.allocated.fetch_sub(1, Ordering::Relaxed);
}
}
}
pub async fn get_group_buffers(
&self,
group_id: BufferGroupId,
) -> Option<Vec<(NonNull<u8>, usize)>> {
let groups = self.groups.lock(Waiter::default()).await;
if let Some(Some(group)) = groups.get(group_id as usize) {
Some(group.buffers.iter().map(|b| (b.ptr, b.len)).collect())
} else {
None
}
}
}
pub struct SocketBufferAllocation {
pool: Arc<BufferPool>,
group_id: BufferGroupId,
in_use: Mutex<Vec<BufferId>>,
}
use alloc::sync::Arc;
impl SocketBufferAllocation {
pub async fn new(pool: Arc<BufferPool>) -> Option<Self> {
let group_id = pool.allocate_group().await?;
Some(Self {
pool,
group_id,
in_use: Mutex::new(Vec::new()),
})
}
pub fn group_id(&self) -> BufferGroupId {
self.group_id
}
pub async fn acquire_buffer(&self) -> Option<BufferId> {
None
}
pub async fn release_buffer(&self, buffer_id: BufferId) {
self.pool.return_buffer(self.group_id, buffer_id).await;
let mut in_use = self.in_use.lock(Waiter::default()).await;
in_use.retain(|&id| id != buffer_id);
}
}
impl Drop for SocketBufferAllocation {
fn drop(&mut self) {
let pool = self.pool.clone();
let group_id = self.group_id;
crate::runtime::spawn_local(async move {
pool.release_group(group_id).await;
});
}
}