#![cfg_attr(feature = "nightly", feature(async_iterator, io_error_more))]
#![warn(
anonymous_parameters,
bare_trait_objects,
missing_debug_implementations,
missing_docs,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
variant_size_differences
)]
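//! An [io_uring](https://man7.org/linux/man-pages/man7/io_uring.7.html) based
//! asynchronous I/O library.
//!
//! The main entry point is [`Ring`]: its [`SubmissionQueue`] is used to queue
//! asynchronous operations, while [`Ring::poll`] drives their completions and
//! wakes the `Future`s waiting on them.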
use std::cmp::min;
use std::marker::PhantomData;
use std::mem::{needs_drop, replace, size_of, take};
use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd};
use std::sync::atomic::{self, AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{self, Poll};
use std::time::Duration;
use std::{fmt, ptr};
mod bitmap;
mod config;
mod drop_waker;
mod op;
mod sys;
#[rustfmt::skip]
pub mod fd;
pub mod cancel;
pub mod extract;
pub mod fs;
pub mod io;
pub mod mem;
pub mod msg;
pub mod net;
pub mod poll;
pub mod process;
use bitmap::AtomicBitMap;
#[doc(no_inline)]
pub use cancel::Cancel;
use config::munmap;
pub use config::Config;
use drop_waker::{drop_task_waker, DropWake};
#[doc(no_inline)]
pub use extract::Extract;
#[doc(no_inline)]
pub use fd::AsyncFd;
use op::{QueuedOperation, Submission};
use sys as libc;
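/// An io_uring ring: a submission queue paired with a completion queue.
///
/// Operations are queued via the [`SubmissionQueue`] (see
/// [`Ring::submission_queue`]) and their completions are processed by
/// [`Ring::poll`].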
#[derive(Debug)]
pub struct Ring {
cq: CompletionQueue,
sq: SubmissionQueue,
}
impl Ring {
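    /// Configure a `Ring` before creating it.
    ///
    /// Returns a [`Config`] builder; call [`Config::build`] to create the
    /// ring.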
pub const fn config<'r>(entries: u32) -> Config<'r> {
Config::new(entries)
}
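    /// Create a new `Ring` with the default configuration, where `entries`
    /// is the number of submission queue entries.
    ///
    /// A minimal sketch, assuming this crate is named `a10`:
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// // Ring with room for 128 queued operations.
    /// let ring = a10::Ring::new(128)?;
    /// # drop(ring);
    /// # Ok(())
    /// # }
    /// ```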
#[doc(alias = "io_uring_setup")]
pub fn new(entries: u32) -> io::Result<Ring> {
Config::new(entries).build()
}
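    /// Returns the ring's submission queue.
    ///
    /// The submission queue can be cheaply cloned and shared between threads
    /// to queue asynchronous operations.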
pub const fn submission_queue(&self) -> &SubmissionQueue {
&self.sq
}
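    /// Enable the ring, if it was created in a disabled state.
    ///
    /// This registers `IORING_REGISTER_ENABLE_RINGS` with the kernel; it is
    /// only needed for rings created in a disabled state.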
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn enable(&mut self) -> io::Result<()> {
self.sq
.register(libc::IORING_REGISTER_ENABLE_RINGS, ptr::null(), 0)
}
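    /// Poll for completion events, waking the `Future`s of completed
    /// operations.
    ///
    /// With a timeout of `None` this blocks until at least one completion is
    /// available; a timeout of `Some(Duration::ZERO)` only processes already
    /// queued completions.
    ///
    /// A sketch of a simple event loop, assuming this crate is named `a10`:
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// let mut ring = a10::Ring::new(128)?;
    /// loop {
    ///     // Wait for completions and wake the `Future`s waiting on them.
    ///     ring.poll(None)?;
    /// }
    /// # }
    /// ```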
#[doc(alias = "io_uring_enter")]
pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        // Clone the submission queue, as `completions` borrows `self` mutably.
        let sq = self.sq.clone();
        for completion in self.completions(timeout)? {
log::trace!(completion:? = completion; "dequeued completion event");
unsafe { sq.update_op(completion) };
}
self.wake_blocked_futures();
Ok(())
}
fn completions(&mut self, timeout: Option<Duration>) -> io::Result<Completions> {
let head = self.completion_head();
let mut tail = self.completion_tail();
if head == tail && !matches!(timeout, Some(Duration::ZERO)) {
self.enter(timeout)?;
tail = self.completion_tail();
}
Ok(Completions {
entries: self.cq.entries,
local_head: head,
head: self.cq.head,
tail,
ring_mask: self.cq.ring_mask,
_lifetime: PhantomData,
})
}
fn enter(&mut self, timeout: Option<Duration>) -> io::Result<()> {
let mut args = libc::io_uring_getevents_arg {
sigmask: 0,
sigmask_sz: 0,
pad: 0,
ts: 0,
};
let mut timespec = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
if let Some(timeout) = timeout {
timespec.tv_sec = timeout.as_secs().try_into().unwrap_or(i64::MAX);
timespec.tv_nsec = libc::c_longlong::from(timeout.subsec_nanos());
args.ts = ptr::addr_of!(timespec) as u64;
}
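        // With a kernel polling thread we never have to submit entries
        // ourselves. Without one we submit all pending entries and mark this
        // thread as polling, so concurrent submitters know to call
        // `io_uring_enter` themselves (see `maybe_submit_event`).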
        let submissions = if self.sq.shared.kernel_thread {
            0
        } else {
self.sq.shared.is_polling.store(true, Ordering::Release);
self.sq.unsubmitted()
};
let enter_flags = libc::IORING_ENTER_GETEVENTS | libc::IORING_ENTER_EXT_ARG;
log::debug!(submissions = submissions; "waiting for completion events");
let result = libc::syscall!(io_uring_enter2(
self.sq.shared.ring_fd.as_raw_fd(),
submissions,
            1,
            enter_flags,
ptr::addr_of!(args).cast(),
size_of::<libc::io_uring_getevents_arg>(),
));
if !self.sq.shared.kernel_thread {
self.sq.shared.is_polling.store(false, Ordering::Release);
}
match result {
Ok(_) => Ok(()),
Err(ref err) if err.raw_os_error() == Some(libc::ETIME) => Ok(()),
Err(err) => Err(err),
}
}
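    /// Load the head of the completion queue.
    ///
    /// `Relaxed` ordering suffices: this thread (holding `&mut self`) is the
    /// only writer of the head.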
fn completion_head(&mut self) -> u32 {
unsafe { (*self.cq.head).load(Ordering::Relaxed) }
}
fn completion_tail(&self) -> u32 {
unsafe { (*self.cq.tail).load(Ordering::Acquire) }
}
#[allow(clippy::needless_pass_by_ref_mut)]
fn wake_blocked_futures(&mut self) {
let n = self.sq.available_space();
if n == 0 {
return;
}
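        // Take the wakers out of the mutex so we don't hold the lock while
        // calling the (arbitrary) `wake` implementations.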
let mut blocked_futures = {
let blocked_futures = &mut *self.sq.shared.blocked_futures.lock().unwrap();
if blocked_futures.is_empty() {
return;
}
take(blocked_futures)
};
let waking = min(n, blocked_futures.len());
        log::trace!(waking_amount = waking, waiting_futures = blocked_futures.len(); "waking blocked futures");
for waker in blocked_futures.drain(..waking) {
waker.wake();
}
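        // New wakers may have been added while we didn't hold the lock; keep
        // the not-yet-woken wakers in front and append the newly added ones.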
let got = &mut *self.sq.shared.blocked_futures.lock().unwrap();
let mut added = replace(got, blocked_futures);
got.append(&mut added);
}
}
impl AsFd for Ring {
fn as_fd(&self) -> BorrowedFd<'_> {
self.sq.shared.ring_fd.as_fd()
}
}
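/// Queue to submit asynchronous operations to.
///
/// A `SubmissionQueue` can be cheaply cloned and shared between threads; all
/// clones refer to the same underlying queue.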
#[derive(Clone)]
pub struct SubmissionQueue {
shared: Arc<SharedSubmissionQueue>,
}
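/// Submission queue state shared between all clones of a [`SubmissionQueue`],
/// largely mapped into memory shared with the kernel.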
struct SharedSubmissionQueue {
ring_fd: OwnedFd,
ptr: *mut libc::c_void,
size: libc::c_uint,
pending_tail: AtomicU32,
len: u32,
ring_mask: u32,
kernel_thread: bool,
is_polling: AtomicBool,
op_indices: Box<AtomicBitMap>,
queued_ops: Box<[Mutex<Option<QueuedOperation>>]>,
blocked_futures: Mutex<Vec<task::Waker>>,
kernel_read: *const AtomicU32,
flags: *const AtomicU32,
entries: *mut Submission,
array_index: Mutex<u32>,
array: *mut AtomicU32,
array_tail: *mut AtomicU32,
}
impl SubmissionQueue {
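    /// Wake the thread blocked in [`Ring::poll`].
    ///
    /// A sketch, assuming this crate is named `a10`, waking the polling
    /// thread from another thread:
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// let mut ring = a10::Ring::new(64)?;
    /// let sq = ring.submission_queue().clone();
    /// let handle = std::thread::spawn(move || sq.wake());
    /// // Returns once the wake submission completes.
    /// ring.poll(None)?;
    /// handle.join().unwrap();
    /// # Ok(())
    /// # }
    /// ```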
pub fn wake(&self) {
let _: Result<(), QueueFull> = self.add_no_result(|submission| unsafe {
submission.wake(self.shared.ring_fd.as_raw_fd());
});
}
fn register(
&self,
op: libc::c_uint,
arg: *const libc::c_void,
nr_args: libc::c_uint,
) -> io::Result<()> {
libc::syscall!(io_uring_register(
self.shared.ring_fd.as_raw_fd(),
op,
arg,
nr_args
))?;
Ok(())
}
fn add<F>(&self, submit: F) -> Result<OpIndex, QueueFull>
where
F: FnOnce(&mut Submission),
{
self._add(submit, QueuedOperation::new)
}
fn add_multishot<F>(&self, submit: F) -> Result<OpIndex, QueueFull>
where
F: FnOnce(&mut Submission),
{
self._add(submit, QueuedOperation::new_multishot)
}
fn _add<F, O>(&self, submit: F, new_op: O) -> Result<OpIndex, QueueFull>
where
F: FnOnce(&mut Submission),
O: FnOnce() -> QueuedOperation,
{
let shared = &*self.shared;
let Some(op_index) = shared.op_indices.next_available() else {
return Err(QueueFull(()));
};
let queued_op = new_op();
let mut op = shared.queued_ops[op_index].lock().unwrap();
let old_queued_op = replace(&mut *op, Some(queued_op));
debug_assert!(old_queued_op.is_none());
let res = self.add_no_result(|submission| {
submit(submission);
submission.set_user_data(op_index as u64);
});
match res {
Ok(()) => Ok(OpIndex(op_index)),
Err(err) => {
*op = None;
drop(op);
shared.op_indices.make_available(op_index);
Err(err)
}
}
}
fn queue_multishot(&self) -> Result<OpIndex, QueueFull> {
self._queue(QueuedOperation::new_multishot)
}
fn _queue<O>(&self, new_op: O) -> Result<OpIndex, QueueFull>
where
O: FnOnce() -> QueuedOperation,
{
let shared = &*self.shared;
let Some(op_index) = shared.op_indices.next_available() else {
return Err(QueueFull(()));
};
let queued_op = new_op();
let old_queued_op = replace(
&mut *shared.queued_ops[op_index].lock().unwrap(),
Some(queued_op),
);
debug_assert!(old_queued_op.is_none());
Ok(OpIndex(op_index))
}
    #[allow(clippy::mutex_integer)]
    fn add_no_result<F>(&self, submit: F) -> Result<(), QueueFull>
where
F: FnOnce(&mut Submission),
{
let shared = &*self.shared;
let kernel_read = self.kernel_read();
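        // Try to claim the next submission slot. There is space as long as
        // the distance between the tail we want to claim and the kernel's
        // read position is less than the queue length.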
let tail = shared
.pending_tail
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |tail| {
if tail - kernel_read < shared.len {
Some(tail.wrapping_add(1))
} else {
None
}
});
let Ok(tail) = tail else {
self.maybe_wake_kernel_thread();
return Err(QueueFull(()));
};
let submission_index = tail & shared.ring_mask;
let submission = unsafe { &mut *shared.entries.add(submission_index as usize) };
submission.reset();
submission.set_user_data(u64::MAX);
submit(submission);
#[cfg(debug_assertions)]
debug_assert!(!submission.is_unchanged());
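        // Ensure all writes to the submission are visible to the kernel
        // before we publish its index below.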
atomic::fence(Ordering::SeqCst);
log::trace!(submission:? = submission; "queueing submission");
{
let mut array_index = shared.array_index.lock().unwrap();
let idx = (*array_index & shared.ring_mask) as usize;
unsafe { (*shared.array.add(idx)).store(submission_index, Ordering::Release) };
let old_tail = unsafe { (*shared.array_tail).fetch_add(1, Ordering::AcqRel) };
debug_assert!(old_tail == *array_index);
*array_index += 1;
}
self.maybe_wake_kernel_thread();
self.maybe_submit_event();
Ok(())
}
fn wait_for_submission(&self, waker: task::Waker) {
log::trace!(waker:? = waker; "adding blocked future");
self.shared.blocked_futures.lock().unwrap().push(waker);
}
fn available_space(&self) -> usize {
let kernel_read = unsafe { (*self.shared.kernel_read).load(Ordering::Relaxed) };
let pending_tail = self.shared.pending_tail.load(Ordering::Relaxed);
(self.shared.len - (pending_tail - kernel_read)) as usize
}
fn unsubmitted(&self) -> u32 {
let kernel_read = unsafe { (*self.shared.kernel_read).load(Ordering::Relaxed) };
let pending_tail = self.shared.pending_tail.load(Ordering::Relaxed);
pending_tail - kernel_read
}
fn maybe_wake_kernel_thread(&self) {
if self.shared.kernel_thread && (self.flags() & libc::IORING_SQ_NEED_WAKEUP != 0) {
log::debug!("waking submission queue polling kernel thread");
let res = libc::syscall!(io_uring_enter2(
self.shared.ring_fd.as_raw_fd(),
            0,
            0,
            libc::IORING_ENTER_SQ_WAKEUP,
            ptr::null(),
            0,
));
if let Err(err) = res {
log::warn!("failed to wake submission queue polling kernel thread: {err}");
}
}
}
fn maybe_submit_event(&self) {
if !self.shared.kernel_thread && self.shared.is_polling.load(Ordering::Relaxed) {
log::debug!("submitting submission event while another thread is `Ring::poll`ing");
let ring_fd = self.shared.ring_fd.as_raw_fd();
let res = libc::syscall!(io_uring_enter2(ring_fd, 1, 0, 0, ptr::null(), 0));
if let Err(err) = res {
log::warn!("failed to submit event: {err}");
}
}
}
pub(crate) fn poll_op(
&self,
ctx: &mut task::Context<'_>,
op_index: OpIndex,
) -> Poll<io::Result<(u16, i32)>> {
log::trace!(op_index = op_index.0; "polling operation");
if let Some(operation) = self.shared.queued_ops.get(op_index.0) {
let mut operation = operation.lock().unwrap();
if let Some(op) = &mut *operation {
let res = op.poll(ctx);
if res.is_ready() {
*operation = None;
drop(operation);
self.shared.op_indices.make_available(op_index.0);
}
return res;
}
}
panic!("a10::SubmissionQueue::poll called incorrectly");
}
pub(crate) fn poll_multishot_op(
&self,
ctx: &mut task::Context<'_>,
op_index: OpIndex,
) -> Poll<Option<io::Result<(u16, i32)>>> {
log::trace!(op_index = op_index.0; "polling multishot operation");
if let Some(operation) = self.shared.queued_ops.get(op_index.0) {
let mut operation = operation.lock().unwrap();
if let Some(op) = &mut *operation {
return match op.poll(ctx) {
Poll::Ready(res) => Poll::Ready(Some(res)),
Poll::Pending if op.is_done() => {
*operation = None;
drop(operation);
self.shared.op_indices.make_available(op_index.0);
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
};
}
}
panic!("a10::SubmissionQueue::poll_multishot called incorrectly");
}
pub(crate) fn cancel_op<R, D, F>(
&self,
op_index: OpIndex,
create_drop_waker: R,
cancel: F,
) -> Result<(), QueueFull>
where
R: FnOnce() -> D,
D: DropWake,
F: FnOnce(&mut Submission),
{
log::trace!(op_index = op_index.0; "canceling operation");
if let Some(operation) = self.shared.queued_ops.get(op_index.0) {
let mut operation = operation.lock().unwrap();
if let Some(op) = &mut *operation {
if op.no_more_events() {
*operation = None;
drop(operation);
self.shared.op_indices.make_available(op_index.0);
drop(create_drop_waker);
return Ok(());
}
let waker = if needs_drop::<D>() {
Some(unsafe { drop_task_waker(create_drop_waker()) })
} else {
None
};
op.set_dropped(waker);
return self.add_no_result(cancel);
}
}
panic!("a10::SubmissionQueue::cancel_op called incorrectly");
}
unsafe fn update_op(&self, completion: &Completion) {
let op_index = completion.index();
if let Some(operation) = self.shared.queued_ops.get(op_index) {
let mut operation = operation.lock().unwrap();
if let Some(op) = &mut *operation {
log::trace!(op_index = op_index, completion:? = completion; "updating operation");
let is_dropped = op.update(completion);
if is_dropped && op.no_more_events() {
*operation = None;
drop(operation);
self.shared.op_indices.make_available(op_index);
}
} else {
log::trace!(op_index = op_index, completion:? = completion; "operation gone, but got completion event");
}
}
}
fn kernel_read(&self) -> u32 {
unsafe { (*self.shared.kernel_read).load(Ordering::Acquire) }
}
fn flags(&self) -> u32 {
unsafe { (*self.shared.flags).load(Ordering::Acquire) }
}
}
#[allow(clippy::mutex_integer)]
impl fmt::Debug for SubmissionQueue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn load_atomic_u32(ptr: *const AtomicU32) -> u32 {
unsafe { (*ptr).load(Ordering::Relaxed) }
}
let shared = &*self.shared;
let all = f.alternate();
let mut f = f.debug_struct("SubmissionQueue");
f.field("ring_fd", &shared.ring_fd.as_raw_fd())
.field("len", &shared.len)
.field("ring_mask", &shared.ring_mask)
.field("flags", &load_atomic_u32(shared.flags))
.field("pending_tail", &shared.pending_tail)
.field("kernel_read", &load_atomic_u32(shared.kernel_read))
.field(
"array_index",
&shared.array_index.lock().map(|i| *i).unwrap_or(u32::MAX),
)
.field("array_tail", &load_atomic_u32(shared.array_tail));
if all {
f.field("op_indices", &shared.op_indices)
.field("queued_ops", &shared.queued_ops)
.field("blocked_futures", &shared.blocked_futures)
.field("mmap_ptr", &shared.ptr)
.field("mmap_size", &shared.size);
}
f.finish()
}
}
unsafe impl Send for SharedSubmissionQueue {}
unsafe impl Sync for SharedSubmissionQueue {}
impl Drop for SharedSubmissionQueue {
fn drop(&mut self) {
if let Err(err) = munmap(
self.entries.cast(),
self.len as usize * size_of::<Submission>(),
) {
log::warn!("error unmapping a10::SubmissionQueue entries: {err}");
}
if let Err(err) = munmap(self.ptr, self.size as usize) {
log::warn!("error unmapping a10::SubmissionQueue: {err}");
}
}
}
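/// Index into [`SharedSubmissionQueue::queued_ops`].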
#[derive(Copy, Clone)]
#[must_use]
struct OpIndex(usize);
impl fmt::Debug for OpIndex {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
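/// Error returned when the submission queue is full.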
struct QueueFull(());
impl From<QueueFull> for io::Error {
fn from(_: QueueFull) -> io::Error {
#[cfg(not(feature = "nightly"))]
let kind = io::ErrorKind::Other;
#[cfg(feature = "nightly")]
let kind = io::ErrorKind::ResourceBusy;
io::Error::new(kind, "submission queue is full")
}
}
impl fmt::Debug for QueueFull {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("QueueFull").finish()
}
}
impl fmt::Display for QueueFull {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("`a10::Ring` submission queue is full")
}
}
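/// Completion side of the ring, mapped into memory shared with the kernel.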
#[derive(Debug)]
struct CompletionQueue {
ptr: *mut libc::c_void,
size: libc::c_uint,
ring_mask: u32,
head: *mut AtomicU32,
tail: *const AtomicU32,
entries: *const Completion,
}
unsafe impl Send for CompletionQueue {}
unsafe impl Sync for CompletionQueue {}
impl Drop for CompletionQueue {
fn drop(&mut self) {
if let Err(err) = munmap(self.ptr, self.size as usize) {
log::warn!("error unmapping a10::CompletionQueue: {err}");
}
}
}
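/// Iterator over the available completion events.
///
/// On drop it publishes the new head so the kernel can reuse the entries.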
struct Completions<'ring> {
entries: *const Completion,
local_head: u32,
head: *mut AtomicU32,
tail: u32,
ring_mask: u32,
_lifetime: PhantomData<&'ring Ring>,
}
impl<'ring> Iterator for Completions<'ring> {
type Item = &'ring Completion;
fn next(&mut self) -> Option<Self::Item> {
let head = self.local_head;
let tail = self.tail;
if head < tail {
let idx = (head & self.ring_mask) as usize;
let completion = unsafe { &*self.entries.add(idx) };
self.local_head += 1;
Some(completion)
} else {
None
}
}
}
impl<'ring> Drop for Completions<'ring> {
fn drop(&mut self) {
unsafe { (*self.head).store(self.local_head, Ordering::Release) }
}
}
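/// A completion event, wrapping an `io_uring_cqe`.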
#[repr(transparent)]
struct Completion {
inner: libc::io_uring_cqe,
}
impl Completion {
const fn index(&self) -> usize {
self.inner.user_data as usize
}
const fn result(&self) -> i32 {
self.inner.res
}
const fn is_in_progress(&self) -> bool {
self.inner.flags & libc::IORING_CQE_F_MORE != 0
}
const fn is_notification(&self) -> bool {
self.inner.flags & libc::IORING_CQE_F_NOTIF != 0
}
const fn is_buffer_select(&self) -> bool {
self.inner.flags & libc::IORING_CQE_F_BUFFER != 0
}
const fn flags(&self) -> u16 {
(self.inner.flags & ((1 << libc::IORING_CQE_BUFFER_SHIFT) - 1)) as u16
}
const fn operation_flags(&self) -> u16 {
if self.is_buffer_select() {
(self.inner.flags >> libc::IORING_CQE_BUFFER_SHIFT) as u16
} else {
0
}
}
}
impl fmt::Debug for Completion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Completion")
.field("user_data", &self.inner.user_data)
.field("res", &self.inner.res)
.field("flags", &self.flags())
.field("operation_flags", &self.operation_flags())
.finish()
}
}
#[rustfmt::skip]
macro_rules! man_link {
($syscall: tt ( $section: tt ) ) => {
concat!(
"\n\nAdditional documentation can be found in the ",
"[`", stringify!($syscall), "(", stringify!($section), ")`]",
"(https://man7.org/linux/man-pages/man", stringify!($section), "/", stringify!($syscall), ".", stringify!($section), ".html)",
" manual.\n"
)
};
}
use man_link;