mod accept;
mod buffers;
mod futex;
mod recv;
mod register;
mod send_zc;
#[cfg(feature = "connect")]
mod connect;
#[cfg(feature = "epoll")]
mod epoll_ctl;
use crate::error::UringBearerError;
use io_uring::IoUring;
use crate::completion::SubmissionRecordStatus;
use crate::fixed::FixedFdRegister;
use crate::slab::BuffersRec;
use crate::slab::FutexRec;
use crate::Completion;
use io_uring_owner::Owner;
use io_uring_opcode::{OpCode, OpCompletion};
use slabbable::Slabbable;
use slabbable_impl_selector::SelectedSlab;
use crate::BearerCapacityKind;
use capacity::Capacity;
use capacity::Setting as CapacitySetting;
pub struct UringBearer<C> {
pub(crate) io_uring: IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry>,
pub(crate) fd_slab: SelectedSlab<Completion<C>>,
pub(crate) fd_register: FixedFdRegister,
pub(crate) bufs: SelectedSlab<BuffersRec>,
pub(crate) futexes: SelectedSlab<FutexRec>,
}
impl<C: core::fmt::Debug + Clone + OpCompletion> UringBearer<C> {
pub fn with_capacity<H: CapacitySetting<BearerCapacityKind>>(
caps: Capacity<H, BearerCapacityKind>,
) -> Result<Self, UringBearerError> {
let iou: IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry> = IoUring::builder()
.build(caps.of_unbounded(&BearerCapacityKind::CoreQueue) as u32)
.map_err(|e| UringBearerError::IoUringCreate(e.to_string()))?;
Self::from_io_uring(iou, caps)
}
pub fn from_io_uring<H: CapacitySetting<BearerCapacityKind>>(
iou: IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry>,
caps: Capacity<H, BearerCapacityKind>,
) -> Result<Self, UringBearerError> {
Ok(Self {
io_uring: iou,
fd_slab: SelectedSlab::<Completion<C>>::with_fixed_capacity(
caps.of_unbounded(&BearerCapacityKind::PendingCompletions),
)
.map_err(UringBearerError::Slabbable)?,
fd_register: FixedFdRegister::with_fixed_capacity(
caps.of_unbounded(&BearerCapacityKind::RegisteredFd) as u32,
),
bufs: SelectedSlab::<BuffersRec>::with_fixed_capacity(
caps.of_unbounded(&BearerCapacityKind::Buffers),
)
.map_err(UringBearerError::Slabbable)?,
futexes: SelectedSlab::<FutexRec>::with_fixed_capacity(
caps.of_unbounded(&BearerCapacityKind::Futexes),
)
.map_err(UringBearerError::Slabbable)?,
})
}
pub fn completions<F, U>(&mut self, user: &mut U, func: F) -> Result<(), UringBearerError>
where
F: Fn(&mut U, &io_uring::cqueue::Entry, &Completion<C>),
{
unsafe {
self.handle_completions(user, |u, e, rec| {
func(u, e, rec);
SubmissionRecordStatus::Retain
})
}
}
pub unsafe fn handle_completions<F, U>(
&mut self,
user: &mut U,
func: F,
) -> Result<(), UringBearerError>
where
F: Fn(&mut U, &io_uring::cqueue::Entry, &Completion<C>) -> SubmissionRecordStatus,
{
let iou = &mut self.io_uring;
let c_queue = iou.completion();
for item in c_queue {
let key = item.user_data();
let a_rec_t = self
.fd_slab
.slot_get_ref(key as usize)
.map_err(UringBearerError::Slabbable)?;
if let Some(completed_rec) = a_rec_t {
let rec_status = func(user, &item, completed_rec);
if rec_status == SubmissionRecordStatus::Forget {
self.fd_slab
.mark_for_reuse(key as usize)
.map_err(UringBearerError::Slabbable)?;
}
}
}
Ok(())
}
pub fn io_uring(&mut self) -> &mut IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry> {
&mut self.io_uring
}
pub fn submit(&self) -> Result<usize, UringBearerError> {
self.io_uring
.submit()
.map_err(|e| UringBearerError::Submission(e.to_string()))
}
pub fn submit_and_wait(&self, want: usize) -> Result<usize, UringBearerError> {
self.io_uring
.submit_and_wait(want)
.map_err(|e| UringBearerError::Submission(e.to_string()))
}
pub fn push_op<Op: OpCode<C>>(&mut self, op: Op) -> Result<usize, UringBearerError> {
let key = self
.fd_slab
.take_next_with(Completion::Op(op.submission()?))
.map_err(UringBearerError::Slabbable)?;
match self._push_to_completion(key) {
Err(e) => Err(e),
Ok(()) => Ok(key),
}
}
pub fn push_op_typed(&mut self, op: Completion<C>) -> Result<usize, UringBearerError> {
let key = self
.fd_slab
.take_next_with(op)
.map_err(UringBearerError::Slabbable)?;
match self._push_to_completion(key) {
Err(e) => Err(e),
Ok(()) => Ok(key),
}
}
#[inline]
pub(crate) fn _push_to_completion(&mut self, idx: usize) -> Result<(), UringBearerError> {
let iou = &mut self.io_uring;
let mut s_queue = iou.submission();
let completion_rec = self
.fd_slab
.slot_get_mut(idx)
.map_err(UringBearerError::Slabbable)?;
let submission = match completion_rec {
Some(completion) => {
if completion.owner() == Owner::Kernel {
return Err(UringBearerError::InvalidOwnership(completion.owner(), idx));
}
completion.force_owner_kernel();
completion.entry().user_data(idx as u64)
}
_ => return Err(UringBearerError::SlabBugSetGet("Submisison not found?")),
};
match unsafe { s_queue.push(&submission) } {
Ok(_) => Ok(()),
Err(_) => Err(UringBearerError::SubmissionPush),
}
}
#[inline]
fn _fixed_fd_validate(&self, try_fixed_fd: u32) -> bool {
if try_fixed_fd > self.fd_register.capacity() - 1 {
return false;
}
match self.fd_register.get(try_fixed_fd) {
Some((_, _itm)) => true,
_ => false,
}
}
}