use std::{
cell::UnsafeCell,
fmt::Debug,
io,
mem::{self, MaybeUninit},
ops::{Deref, DerefMut},
ptr::{self, NonNull},
rc::{Rc, Weak},
slice,
};
use compio_buf::{IoBuf, IoBufMut, SetLen};
use crate::sys::BufControl;
/// Strategy for allocating and freeing the raw byte buffers handed out by the
/// buffer pool.
///
/// Both functions are associated functions (no `self`), so an implementation
/// is selected purely by type.
pub trait BufferAllocator {
    /// Allocates a buffer of `len` bytes and returns a pointer to its first
    /// byte. The contents of the buffer may be uninitialized.
    fn allocate(len: u32) -> NonNull<MaybeUninit<u8>>;

    /// Frees a buffer that was obtained from [`allocate`](Self::allocate).
    ///
    /// # Safety
    ///
    /// `ptr` is expected to come from `allocate` of the same implementation,
    /// called with the same `len`, and must not be used again afterwards.
    /// [`BoxAllocator`] relies on exactly this to rebuild the original box.
    unsafe fn deallocate(ptr: NonNull<MaybeUninit<u8>>, len: u32);
}

/// [`BufferAllocator`] backed by the global allocator through a boxed slice.
pub struct BoxAllocator;

impl BufferAllocator for BoxAllocator {
    fn allocate(len: u32) -> NonNull<MaybeUninit<u8>> {
        // Leak the boxed slice: ownership is handed to the caller as a raw
        // pointer and reclaimed later in `deallocate`.
        let storage = Box::leak(Box::<[u8]>::new_uninit_slice(len as usize));
        NonNull::from(storage).cast()
    }

    unsafe fn deallocate(ptr: NonNull<MaybeUninit<u8>>, len: u32) {
        let raw = ptr::slice_from_raw_parts_mut(ptr.as_ptr(), len as usize);
        // SAFETY: per this function's contract, `ptr`/`len` describe a slice
        // that `allocate` leaked from a `Box`, so rebuilding the box is sound.
        drop(unsafe { Box::from_raw(raw) });
    }
}
/// Type-erased allocator: the two functions of a [`BufferAllocator`]
/// implementation stored as plain function pointers, so the allocator can be
/// copied around without a generic parameter.
#[derive(Debug, Clone, Copy)]
pub(crate) struct BufferAlloc {
    /// Allocates a buffer of `len` bytes.
    allocate: fn(len: u32) -> NonNull<MaybeUninit<u8>>,
    /// Frees a buffer of `len` bytes previously produced by `allocate`.
    deallocate: unsafe fn(ptr: NonNull<MaybeUninit<u8>>, len: u32),
}

impl BufferAlloc {
    /// Captures the allocation functions of the allocator type `A`.
    pub fn new<A: BufferAllocator>() -> Self {
        let allocate: fn(u32) -> NonNull<MaybeUninit<u8>> = A::allocate;
        let deallocate: unsafe fn(NonNull<MaybeUninit<u8>>, u32) = A::deallocate;
        Self {
            allocate,
            deallocate,
        }
    }
}
/// Pointer to the first byte of a pool buffer; the bytes behind it may be uninitialized.
pub(crate) type BufPtr = NonNull<MaybeUninit<u8>>;
/// One entry of the pool's buffer table: `Some` holds a buffer pointer, `None` means the
/// buffer has been taken out of the table.
pub(crate) type Slot = Option<BufPtr>;
// Compile-time check that `Slot` is exactly pointer-sized, i.e. that wrapping the
// `NonNull` in `Option` does not add a separate discriminant.
const _: () = assert!(size_of::<Slot>() == size_of::<usize>());
/// Cloneable handle to a buffer pool.
///
/// The handle only holds a weak reference to the shared pool state, so it does not keep
/// that state alive; operations on it return an error once the state has been dropped.
#[derive(Clone)]
pub struct BufferPool {
// Weak link to the state owned by `BufferPoolRoot`.
shared: Weak<Shared>,
}
/// Owning handle to the shared pool state.
///
/// Holds the strong `Rc`; the `BufferPool` handles produced by `get_pool` and the
/// `BufferRef`s taken from the pool only keep weak references to the same state.
#[repr(transparent)]
#[derive(Debug)]
pub(crate) struct BufferPoolRoot {
// Strong reference to the pool state.
shared: Rc<Shared>,
}
/// A buffer checked out of a pool.
///
/// Dereferences to the first `len` bytes of the buffer. On drop the buffer is handed back
/// to the pool if the pool state is still alive, and freed through `alloc` otherwise.
#[derive(Debug)]
pub struct BufferRef {
// Allocator functions copied from the pool; used to free the buffer when the pool is gone.
alloc: BufferAlloc,
// Number of bytes exposed through `Deref`/`DerefMut`; kept at or below `cap`.
len: u32,
// Current capacity limit (the length of the slice returned by `as_uninit`); kept at or
// below `full_cap`.
cap: u32,
// Buffer size recorded when the buffer was taken from the pool; passed to `deallocate`.
full_cap: u32,
// Weak link back to the pool state the buffer came from.
shared: Weak<Shared>,
// Start of the buffer memory.
ptr: BufPtr,
// Index of this buffer's slot in the pool's buffer table.
buffer_id: u16,
}
/// Pool state shared between the root, the pool handles and the checked-out buffers.
///
/// The state lives in an `UnsafeCell` so it can be mutated through `&self`; all access goes
/// through the unsafe `with` helper.
#[repr(transparent)]
struct Shared {
// Interior-mutable pool state.
inner: UnsafeCell<Inner>,
}
/// The actual pool state behind `Shared`.
struct Inner {
// Allocator functions used to create and free the buffers.
alloc: BufferAlloc,
// Platform-specific buffer control object (constructed in `BufferPoolRoot::new`).
ctrl: BufControl,
// Size in bytes of every buffer in the pool.
size: u32,
// Buffer table indexed by buffer id; a slot is `None` while its buffer is checked out.
bufs: Vec<Slot>,
}
impl BufferPoolRoot {
pub(crate) fn new(
driver: &mut crate::Driver,
alloc: BufferAlloc,
num_of_bufs: u16,
buffer_size: usize,
flags: u16,
) -> io::Result<Self> {
let size: u32 = buffer_size.try_into().map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
"Buffer size too large. Should be able to fit into u32.",
)
})?;
let bufs = (0..num_of_bufs.next_power_of_two())
.map(|_| Some((alloc.allocate)(size)))
.collect::<Vec<_>>();
let ctrl = unsafe { BufControl::new(driver, &bufs, size, flags) }?;
Ok(Self {
shared: Shared {
inner: Inner {
alloc,
ctrl,
size,
bufs,
}
.into(),
}
.into(),
})
}
pub(crate) unsafe fn release(&mut self, driver: &mut crate::Driver) -> io::Result<()> {
unsafe {
self.shared.with(|inner| {
inner.ctrl.release(driver)?;
for buf in mem::take(&mut inner.bufs).into_iter().flatten() {
(inner.alloc.deallocate)(buf, inner.size)
}
io::Result::Ok(())
})
}?;
Ok(())
}
pub(crate) fn get_pool(&self) -> BufferPool {
BufferPool {
shared: Rc::downgrade(&self.shared),
}
}
pub(crate) fn is_unique(&self) -> bool {
Rc::strong_count(&self.shared) == 1
}
}
/// Prints the shared state when it is still alive, or the placeholder
/// `"<dropped>"` when it is gone.
impl Debug for BufferPool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("BufferPool");
        match self.shared.upgrade() {
            Some(shared) => out.field("shared", &shared),
            None => out.field("shared", &"<dropped>"),
        };
        out.finish()
    }
}
/// Prints the control object, the buffer size and the buffer table; each
/// occupied slot is rendered as `Some(Buf<address>)`.
impl Debug for Shared {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Renders one buffer pointer as `Buf<address>`.
        struct PtrView(BufPtr);

        impl Debug for PtrView {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "Buf<{:p}>", self.0)
            }
        }

        /// Renders the whole buffer table as a list of optional `PtrView`s.
        struct SlotsView<'a>(&'a [Slot]);

        impl Debug for SlotsView<'_> {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                let mut list = f.debug_list();
                for slot in self.0 {
                    list.entry(&slot.map(PtrView));
                }
                list.finish()
            }
        }

        // SAFETY: the closure only reads the state and does not call back into
        // `self`. NOTE(review): relies on no other reference to the state being
        // live while formatting.
        unsafe {
            self.with(|inner| {
                let slots = SlotsView(&inner.bufs);
                let mut out = f.debug_struct("Shared");
                out.field("control", &inner.ctrl);
                out.field("size", &inner.size);
                out.field("buffers", &slots);
                out.finish()
            })
        }
    }
}
impl BufferPool {
    /// Asks the control object for a buffer id and checks that buffer out.
    ///
    /// # Errors
    ///
    /// Fails if the shared state has been dropped or if the control object
    /// reports an error.
    ///
    /// # Panics
    ///
    /// Panics if the slot for the returned id is empty.
    pub fn pop(&self) -> io::Result<BufferRef> {
        // SAFETY: the closure does not re-enter the shared state.
        let popped = unsafe { self.with(|inner| inner.ctrl.pop()) }?;
        let buffer_id = popped?;
        let buffer = self.take(buffer_id)?;
        Ok(buffer.expect("Buffer should be available"))
    }

    /// Checks out the buffer stored in slot `buffer_id`.
    ///
    /// Returns `Ok(None)` if the id is out of range or the slot is empty. The
    /// returned buffer starts with length 0 and a capacity equal to the pool's
    /// buffer size.
    ///
    /// # Errors
    ///
    /// Fails if the shared state has been dropped.
    pub fn take(&self, buffer_id: u16) -> io::Result<Option<BufferRef>> {
        let shared = self.shared()?;
        let buffer = shared.take(buffer_id).map(|ptr| {
            let cap = shared.len();
            BufferRef {
                alloc: shared.alloc(),
                len: 0,
                cap,
                full_cap: cap,
                shared: Rc::downgrade(&shared),
                ptr,
                buffer_id,
            }
        });
        Ok(buffer)
    }

    /// Takes the buffer in slot `buffer_id` out and immediately puts it back
    /// through `Shared::reset`.
    ///
    /// Returns `Ok(false)` if the id is out of range or the slot is empty, and
    /// `Ok(true)` otherwise.
    ///
    /// # Errors
    ///
    /// Fails if the shared state has been dropped.
    pub fn reset(&self, buffer_id: u16) -> io::Result<bool> {
        let shared = self.shared()?;
        match shared.take(buffer_id) {
            Some(buf) => {
                shared.reset(buffer_id, buf);
                Ok(true)
            }
            None => Ok(false),
        }
    }

    /// Upgrades the weak handle, failing if the shared state is gone.
    fn shared(&self) -> io::Result<Rc<Shared>> {
        match self.shared.upgrade() {
            Some(shared) => Ok(shared),
            None => Err(io::Error::other("The driver has been dropped")),
        }
    }

    /// Runs `f` with mutable access to the pool state.
    ///
    /// # Safety
    ///
    /// Same requirement as `Shared::with`: no other reference to the state may
    /// be live while `f` runs.
    unsafe fn with<F, R>(&self, f: F) -> io::Result<R>
    where
        F: FnOnce(&mut Inner) -> R,
    {
        let shared = self.shared()?;
        // SAFETY: forwarded to the caller.
        Ok(unsafe { shared.with(f) })
    }

    /// Returns the buffer group reported by the control object.
    #[cfg(io_uring)]
    pub(crate) fn buffer_group(&self) -> io::Result<u16> {
        // SAFETY: the closure does not re-enter the shared state.
        unsafe { self.with(|i| i.ctrl.buffer_group()) }
    }

    /// Returns what the control object reports for `is_io_uring`.
    #[cfg(fusion)]
    pub fn is_io_uring(&self) -> io::Result<bool> {
        // SAFETY: the closure does not re-enter the shared state.
        unsafe { self.with(|inner| inner.ctrl.is_io_uring()) }
    }
}
impl Shared {
    /// Runs `f` with a mutable reference to the pool state.
    ///
    /// # Safety
    ///
    /// The reference is created straight from the `UnsafeCell`, so no other
    /// reference to the state may be live while `f` runs; in particular `f`
    /// must not call back into this method.
    #[inline(always)]
    unsafe fn with<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Inner) -> R,
    {
        // SAFETY: exclusivity is guaranteed by the caller.
        let inner = unsafe { &mut *self.inner.get() };
        f(inner)
    }

    /// Returns a copy of the pool's allocator functions.
    fn alloc(&self) -> BufferAlloc {
        // SAFETY: the closure only copies a field.
        unsafe { self.with(|inner| inner.alloc) }
    }

    /// Removes and returns the pointer stored in slot `buffer_id`, leaving the
    /// slot empty. Returns `None` if the id is out of range or the slot is
    /// already empty.
    fn take(&self, buffer_id: u16) -> Option<BufPtr> {
        let index = buffer_id as usize;
        // SAFETY: the closure only touches the buffer table.
        unsafe {
            self.with(|inner| {
                let slot = inner.bufs.get_mut(index)?;
                slot.take()
            })
        }
    }

    /// Stores `ptr` back into slot `buffer_id` and passes it to the control
    /// object's `reset`.
    ///
    /// Panics if `buffer_id` is outside the buffer table.
    fn reset(&self, buffer_id: u16, ptr: BufPtr) {
        let index = buffer_id as usize;
        // SAFETY: the closure does not re-enter `with`.
        unsafe {
            self.with(|inner| {
                inner.bufs[index] = Some(ptr);
                inner.ctrl.reset(buffer_id, ptr, inner.size);
            })
        }
    }

    /// Returns the size in bytes of each buffer in the pool.
    fn len(&self) -> u32 {
        // SAFETY: the closure only copies a field.
        unsafe { self.with(|inner| inner.size) }
    }
}
impl BufferRef {
    /// Builder-style variant of [`set_capacity`](Self::set_capacity): applies
    /// the capacity limit and returns the buffer.
    pub fn with_capacity(mut self, cap: usize) -> Self {
        self.set_capacity(cap);
        self
    }

    /// Limits the usable capacity of the buffer to `cap` bytes.
    ///
    /// * A `cap` of zero is ignored and leaves the buffer unchanged.
    /// * A `cap` larger than the underlying buffer is clamped to the full
    ///   buffer size.
    /// * If the current length exceeds the new capacity, the length is reduced
    ///   to match.
    pub fn set_capacity(&mut self, cap: usize) {
        if cap == 0 {
            return;
        }
        // Clamp in `usize` space before narrowing. Casting first (`cap as u32`)
        // would wrap for values above `u32::MAX` -- e.g. `1 << 32` would become
        // a capacity of 0 -- instead of saturating at the full buffer size.
        let limit = self.full_cap as usize;
        // The clamped value is at most `full_cap`, so the narrowing cast is lossless.
        self.cap = cap.min(limit) as u32;
        self.len = self.len.min(self.cap);
    }
}
/// Exposes the first `len` bytes of the buffer as a byte slice.
impl Deref for BufferRef {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        let data: *const u8 = self.ptr.as_ptr().cast();
        let len = self.len as usize;
        // SAFETY: `len` is kept at or below the buffer's capacity by
        // `set_len`/`set_capacity`; that the first `len` bytes are initialized
        // is the contract of the unsafe `set_len`.
        unsafe { slice::from_raw_parts(data, len) }
    }
}
/// Exposes the first `len` bytes of the buffer as a mutable byte slice.
impl DerefMut for BufferRef {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let data: *mut u8 = self.ptr.as_ptr().cast();
        let len = self.len as usize;
        // SAFETY: `len` is kept at or below the buffer's capacity by
        // `set_len`/`set_capacity`; that the first `len` bytes are initialized
        // is the contract of the unsafe `set_len`. `&mut self` gives exclusive
        // access to this handle.
        unsafe { slice::from_raw_parts_mut(data, len) }
    }
}
impl IoBuf for BufferRef {
fn as_init(&self) -> &[u8] {
self
}
}
impl SetLen for BufferRef {
    /// Sets the length of the buffer, clamped to the current capacity.
    ///
    /// # Safety
    ///
    /// The length set here determines how many bytes `Deref`/`DerefMut` expose
    /// as `u8`, so the first `len` bytes (after clamping) must be initialized.
    unsafe fn set_len(&mut self, len: usize) {
        debug_assert!(len <= u32::MAX as usize);
        // Clamp in `usize` space before narrowing so that an oversized `len`
        // saturates at `cap` in release builds instead of wrapping around in
        // the cast. The clamped value is at most `cap`, so the cast is lossless.
        self.len = len.min(self.cap as usize) as u32;
    }
}
impl IoBufMut for BufferRef {
    /// Returns the buffer's storage up to the current capacity limit as
    /// possibly-uninitialized bytes.
    fn as_uninit(&mut self) -> &mut [MaybeUninit<u8>] {
        let start = self.ptr.as_ptr();
        let cap = self.cap as usize;
        // SAFETY: `cap` is kept at or below `full_cap`, the buffer size recorded
        // when this buffer was taken from the pool, and `&mut self` gives
        // exclusive access to this handle.
        unsafe { slice::from_raw_parts_mut(start, cap) }
    }
}
/// Hands the buffer back to its pool, or frees it if the pool state is gone.
impl Drop for BufferRef {
    fn drop(&mut self) {
        match self.shared.upgrade() {
            // The pool state is still alive: put the buffer back into its slot.
            Some(pool) => pool.reset(self.buffer_id, self.ptr),
            // The pool state is gone, so this handle is the last owner of the
            // memory.
            // SAFETY: `full_cap` is the buffer size recorded when the buffer
            // was taken from the pool, and `alloc` is the pool's allocator.
            None => unsafe { (self.alloc.deallocate)(self.ptr, self.full_cap) },
        }
    }
}