use crate::uring_sys;
pub mod cqe;
pub mod sqe;
mod completion_queue;
mod submission_queue;
mod probe;
pub mod registrar;
use std::{
fmt,
io,
mem::{self, MaybeUninit},
os::unix::io::RawFd,
ptr::{self, NonNull},
time::Duration,
};
#[doc(inline)]
pub use cqe::{CQEs, CQEsBlocking, CQE};
#[doc(inline)]
pub use sqe::{SQEs, SQE};
pub use completion_queue::CompletionQueue;
pub use submission_queue::SubmissionQueue;
pub use probe::Probe;
#[doc(inline)]
pub use registrar::{Personality, Registrar};
bitflags::bitflags! {
    /// Setup flags for a ring.
    ///
    /// `IoUring::new_with_flags` copies the raw bits of this set into
    /// `io_uring_params.flags` before initializing the ring.
    ///
    /// NOTE(review): names and bit positions appear to mirror the kernel's
    /// `IORING_SETUP_*` constants — confirm against `io_uring_setup(2)`.
    pub struct SetupFlags: u32 {
        /// Bit 0 (presumably `IORING_SETUP_IOPOLL`).
        const IOPOLL = 0x01;
        /// Bit 1 (presumably `IORING_SETUP_SQPOLL`).
        const SQPOLL = 0x02;
        /// Bit 2 (presumably `IORING_SETUP_SQ_AFF`).
        const SQ_AFF = 0x04;
        /// Bit 3 (presumably `IORING_SETUP_CQSIZE`).
        const CQSIZE = 0x08;
        /// Bit 4 (presumably `IORING_SETUP_CLAMP`).
        const CLAMP = 0x10;
        /// Bit 5 (presumably `IORING_SETUP_ATTACH_WQ`).
        const ATTACH_WQ = 0x20;
    }
}
bitflags::bitflags! {
    /// Feature bits for a ring.
    ///
    /// `IoUring::new_with_flags` copies the raw bits of this set into
    /// `io_uring_params.features` before initializing the ring.
    ///
    /// NOTE(review): names and bit positions appear to mirror the kernel's
    /// `IORING_FEAT_*` constants — confirm against `io_uring_setup(2)`.
    pub struct SetupFeatures: u32 {
        /// Bit 0 (presumably `IORING_FEAT_SINGLE_MMAP`).
        const SINGLE_MMAP = 0x01;
        /// Bit 1 (presumably `IORING_FEAT_NODROP`).
        const NODROP = 0x02;
        /// Bit 2 (presumably `IORING_FEAT_SUBMIT_STABLE`).
        const SUBMIT_STABLE = 0x04;
        /// Bit 3 (presumably `IORING_FEAT_RW_CUR_POS`).
        const RW_CUR_POS = 0x08;
        /// Bit 4 (presumably `IORING_FEAT_CUR_PERSONALITY`).
        const CUR_PERSONALITY = 0x10;
        /// Bit 5 (presumably `IORING_FEAT_FAST_POLL`).
        const FAST_POLL = 0x20;
        /// Bit 6 (presumably `IORING_FEAT_POLL_32BITS`).
        const POLL_32BITS = 0x40;
    }
}
/// Owning wrapper around a raw `uring_sys::io_uring`.
///
/// The ring is initialized by `IoUring::new_with_flags` and handed to
/// `io_uring_queue_exit` when the value is dropped.
pub struct IoUring {
// Raw liburing ring state; passed by pointer to the `uring_sys` calls.
ring: uring_sys::io_uring,
}
impl IoUring {
    /// Initializes a ring for `entries` with empty setup flags and empty
    /// setup features.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`IoUring::new_with_flags`].
    pub fn new(entries: u32) -> io::Result<IoUring> {
        Self::new_with_flags(entries, SetupFlags::empty(), SetupFeatures::empty())
    }

    /// Initializes a ring through `io_uring_queue_init_params`.
    ///
    /// The raw bits of `flags` and `features` are written into a zeroed
    /// `io_uring_params`, which is then passed to the C call together with
    /// `entries`.
    ///
    /// # Errors
    ///
    /// A negative return code from the C call is negated and reported as an
    /// OS error.
    pub fn new_with_flags(
        entries: u32,
        flags: SetupFlags,
        features: SetupFeatures,
    ) -> io::Result<IoUring> {
        // SAFETY: assumes `io_uring_params` is plain C data for which the
        // all-zero bit pattern is a valid value.
        let mut params: uring_sys::io_uring_params = unsafe { mem::zeroed() };
        params.flags = flags.bits();
        params.features = features.bits();

        let mut ring = MaybeUninit::uninit();
        // SAFETY: both pointers refer to live locals for the duration of the
        // call.
        let rc = unsafe {
            uring_sys::io_uring_queue_init_params(entries as _, ring.as_mut_ptr(), &mut params)
        };
        resultify(rc)?;

        // SAFETY: reached only when the init call reported success, at which
        // point liburing is expected to have filled in `ring`.
        let ring = unsafe { ring.assume_init() };
        Ok(IoUring { ring })
    }

    /// Returns a [`SubmissionQueue`] handle borrowing this ring.
    ///
    /// NOTE(review): this takes `&self` while the `submit_*` helpers take
    /// `&mut self`, and `IoUring` is declared `Sync`; confirm that handing out
    /// several submission-queue handles from a shared reference is intended.
    pub fn sq(&self) -> SubmissionQueue<'_> {
        SubmissionQueue::new(self)
    }

    /// Returns a [`CompletionQueue`] handle borrowing this ring.
    ///
    /// NOTE(review): same `&self` receiver concern as [`IoUring::sq`].
    pub fn cq(&self) -> CompletionQueue<'_> {
        CompletionQueue::new(self)
    }

    /// Returns a [`Registrar`] handle borrowing this ring.
    pub fn registrar(&self) -> Registrar<'_> {
        Registrar::new(self)
    }

    /// Returns the submission queue, completion queue and registrar handles
    /// together, all borrowing this ring for the same lifetime.
    pub fn queues(&mut self) -> (SubmissionQueue<'_>, CompletionQueue<'_>, Registrar<'_>) {
        // One shared reborrow is enough; the three handles all copy it.
        let ring: &IoUring = &*self;
        (
            SubmissionQueue::new(ring),
            CompletionQueue::new(ring),
            Registrar::new(ring),
        )
    }

    /// Builds a [`Probe`] for this ring via `Probe::for_ring`.
    pub fn probe(&mut self) -> io::Result<Probe> {
        Probe::for_ring(&mut self.ring)
    }

    /// Forwards to `submit` on the submission queue.
    pub fn submit_sqes(&mut self) -> io::Result<u32> {
        self.sq().submit()
    }

    /// Forwards to `submit_and_wait` on the submission queue, passing
    /// `wait_for` through.
    pub fn submit_sqes_and_wait(&mut self, wait_for: u32) -> io::Result<u32> {
        self.sq().submit_and_wait(wait_for)
    }

    /// Forwards to `submit_and_wait_with_timeout` on the submission queue,
    /// passing `wait_for` and `duration` through.
    pub fn submit_sqes_and_wait_with_timeout(
        &mut self,
        wait_for: u32,
        duration: Duration,
    ) -> io::Result<u32> {
        self.sq().submit_and_wait_with_timeout(wait_for, duration)
    }

    /// Forwards to `peek_for_cqe` on the completion queue.
    pub fn peek_for_cqe(&mut self) -> Option<CQE> {
        self.cq().peek_for_cqe()
    }

    /// Waits for one completion (no timeout) and wraps it in a [`CQE`].
    ///
    /// # Errors
    ///
    /// Returns the OS error reported by the underlying wait call.
    pub fn wait_for_cqe(&mut self) -> io::Result<CQE> {
        // Capture the ring pointer before `self` is mutably borrowed below.
        let ring = NonNull::from(&self.ring);
        let cqe = self.inner_wait_for_cqes(1, ptr::null())?;
        Ok(CQE::new(ring, cqe))
    }

    /// Waits for one completion, passing `duration` to the wait call as a
    /// `__kernel_timespec`, and wraps the result in a [`CQE`].
    ///
    /// # Errors
    ///
    /// Returns the OS error reported by the underlying wait call.
    pub fn wait_for_cqe_with_timeout(&mut self, duration: Duration) -> io::Result<CQE> {
        // Capture the ring pointer before `self` is mutably borrowed below.
        let ring = NonNull::from(&self.ring);
        let ts = uring_sys::__kernel_timespec {
            tv_sec: duration.as_secs() as _,
            tv_nsec: duration.subsec_nanos() as _,
        };
        let cqe = self.inner_wait_for_cqes(1, &ts)?;
        Ok(CQE::new(ring, cqe))
    }

    /// Returns a [`CQEs`] built from a pointer to this ring.
    pub fn cqes(&mut self) -> CQEs<'_> {
        CQEs::new(NonNull::from(&mut self.ring))
    }

    /// Returns a [`CQEsBlocking`] built from a pointer to this ring and
    /// `count`.
    pub fn cqes_blocking(&mut self, count: u32) -> CQEsBlocking<'_> {
        CQEsBlocking::new(NonNull::from(&mut self.ring), count)
    }

    /// Waits for `count` completions (no timeout) and discards the returned
    /// completion reference.
    ///
    /// # Errors
    ///
    /// Returns the OS error reported by the underlying wait call.
    pub fn wait_for_cqes(&mut self, count: u32) -> io::Result<()> {
        self.inner_wait_for_cqes(count, ptr::null())?;
        Ok(())
    }

    /// Shared implementation of the `wait_for_*` methods.
    ///
    /// Calls `io_uring_wait_cqes` with `count`, the timeout pointer `ts`
    /// (which may be null) and a null final argument, then returns the
    /// completion entry the call wrote out.
    fn inner_wait_for_cqes(
        &mut self,
        count: u32,
        ts: *const uring_sys::__kernel_timespec,
    ) -> io::Result<&mut uring_sys::io_uring_cqe> {
        let mut cqe = MaybeUninit::uninit();
        // SAFETY: `self.ring` and `cqe` are live for the whole call; `ts` is
        // either null or supplied by a caller in this impl that keeps the
        // timespec alive across the call.
        let rc = unsafe {
            uring_sys::io_uring_wait_cqes(&mut self.ring, cqe.as_mut_ptr(), count, ts, ptr::null())
        };
        resultify(rc)?;
        // SAFETY: reached only on success; assumes liburing then stored a
        // valid, non-null CQE pointer — TODO confirm for every `count`.
        Ok(unsafe { &mut *cqe.assume_init() })
    }

    /// Shared access to the underlying `uring_sys::io_uring`.
    pub fn raw(&self) -> &uring_sys::io_uring {
        &self.ring
    }

    /// Mutable access to the underlying `uring_sys::io_uring`.
    ///
    /// # Safety
    ///
    /// The caller can modify the ring state directly and must leave it in a
    /// condition the rest of this API can keep using.
    /// NOTE(review): the exact invariants are not spelled out in this file.
    pub unsafe fn raw_mut(&mut self) -> &mut uring_sys::io_uring {
        &mut self.ring
    }

    /// Forwards to `ready` on the completion queue.
    pub fn cq_ready(&mut self) -> u32 {
        self.cq().ready()
    }

    /// Forwards to `ready` on the submission queue.
    pub fn sq_ready(&mut self) -> u32 {
        self.sq().ready()
    }

    /// Forwards to `space_left` on the submission queue.
    pub fn sq_space_left(&mut self) -> u32 {
        self.sq().space_left()
    }

    /// Forwards to `eventfd_enabled` on the completion queue.
    pub fn cq_eventfd_enabled(&mut self) -> bool {
        self.cq().eventfd_enabled()
    }

    /// Forwards to `eventfd_toggle` on the completion queue, passing
    /// `enabled` through.
    pub fn cq_eventfd_toggle(&mut self, enabled: bool) -> io::Result<()> {
        self.cq().eventfd_toggle(enabled)
    }

    /// Returns the `ring_fd` field of the underlying ring.
    pub fn raw_fd(&self) -> RawFd {
        self.ring.ring_fd
    }
}
/// Debug output is a struct named after the full type name of `IoUring`
/// with a single `fd` field holding the ring's file descriptor.
impl fmt::Debug for IoUring {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let type_name = std::any::type_name::<Self>();
        let mut builder = f.debug_struct(type_name);
        builder.field("fd", &self.ring.ring_fd);
        builder.finish()
    }
}
/// Tears the ring down with `io_uring_queue_exit` when the wrapper is dropped.
impl Drop for IoUring {
    fn drop(&mut self) {
        let ring = &mut self.ring;
        // SAFETY: an `IoUring` is only constructed after a successful init
        // call, and `drop` runs at most once, so the ring is exited exactly
        // once.
        unsafe {
            uring_sys::io_uring_queue_exit(ring);
        }
    }
}
// NOTE(review): asserts that the raw `io_uring` state may be moved to another
// thread; nothing in this file enforces that — confirm against liburing.
unsafe impl Send for IoUring {}
// NOTE(review): asserts that `&IoUring` may be shared across threads. `sq()`
// and `cq()` hand out queue handles from `&self`, so confirm those handles
// cannot mutate the ring concurrently.
unsafe impl Sync for IoUring {}
/// Converts a C-style return code into an `io::Result`.
///
/// Non-negative values are returned unchanged as `Ok`. Negative values are
/// negated and wrapped as a raw OS error code.
fn resultify(x: i32) -> io::Result<u32> {
    if x >= 0 {
        Ok(x as u32)
    } else {
        // `-x` overflows for `i32::MIN` and panics when overflow checks are
        // on; `wrapping_neg` gives the release-build result in every profile.
        Err(io::Error::from_raw_os_error(x.wrapping_neg()))
    }
}
#[cfg(test)]
mod tests {
    use super::resultify;

    /// Checks the `Ok`/`Err` mapping of `resultify` and that the argument
    /// expression is evaluated exactly once per call.
    #[test]
    fn test_resultify() {
        // Returns `i` and bumps the counter so evaluation can be counted.
        let side_effect = |i, effect: &mut _| -> i32 {
            *effect += 1;
            i
        };

        // `matches!` only yields a bool, so each check is wrapped in
        // `assert!`; a bare `matches!(..);` statement verifies nothing.
        let mut calls = 0;
        let ret = resultify(side_effect(0, &mut calls));
        assert!(matches!(ret, Ok(0)));
        assert_eq!(calls, 1);

        calls = 0;
        let ret = resultify(side_effect(1, &mut calls));
        assert!(matches!(ret, Ok(1)));
        assert_eq!(calls, 1);

        calls = 0;
        let ret = resultify(side_effect(-1, &mut calls));
        assert!(matches!(ret, Err(e) if e.raw_os_error() == Some(1)));
        assert_eq!(calls, 1);
    }
}