//! The [A10] io_uring library.
//!
//! This library is meant as a low-level library safely exposing the io_uring
//! API. For simplicity it only has two main types and a number of helper
//! types:
//! * [`Ring`] is a wrapper around io_uring used to poll for completion events.
//! * [`AsyncFd`] is a wrapper around a file descriptor that provides a safe
//! API to schedule operations.
//!
//! Some modules provide ways to create an `AsyncFd`, e.g. [`OpenOptions`];
//! others are simply a place to expose the [`Future`]s supporting the
//! scheduled operations. The modules try to follow the same structure as the
//! standard library.
//!
//! [A10]: https://en.wikipedia.org/wiki/A10_motorway_(Netherlands)
//! [`OpenOptions`]: fs::OpenOptions
//! [`Future`]: std::future::Future
//!
//! # Notes
//!
//! Most I/O operations need ownership of the data, e.g. a buffer, so that
//! deallocation can be delayed if needed, for example when a `Future` is
//! dropped before being polled to completion. This data can be retrieved
//! again by using the [`Extract`] trait.
//!
//! ## Examples
//!
//! The example below implements the `cat(1)` program that concatenates files
//! and prints them to standard out.
//!
//! ```
//! use std::path::PathBuf;
//! use std::future::Future;
//! use std::io;
//!
//! use a10::{Extract, Ring, SubmissionQueue};
//!
//! # fn main() -> io::Result<()> {
//! // Create a new io_uring supporting 8 submission entries.
//! let mut ring = Ring::new(8)?;
//!
//! // Get access to the submission queue, used to... well queue submissions.
//! let sq = ring.submission_queue().clone();
//! // A10 makes use of `Future`s to represent the asynchronous nature of
//! // io_uring.
//! let future = cat(sq, "./src/lib.rs");
//!
//! // This `block_on` function would normally be implemented by a `Future`
//! // runtime, but we show a simple example implementation below.
//! block_on(&mut ring, future)?;
//! # Ok(()) }
//!
//! /// A "cat" like function, which reads from `filename` and writes it to
//! /// standard out.
//! async fn cat(sq: SubmissionQueue, filename: &str) -> io::Result<()> {
//! // Because io_uring uses asynchronous operations it needs access to the
//! // path for as long as the operation is active. To prevent use-after-free
//! // and similar issues we need ownership of the arguments. In the case of
//! // opening a file that means we need ownership of the file name.
//! let filename = PathBuf::from(filename);
//! // Open a file for reading.
//! let file = a10::fs::OpenOptions::new().open(sq.clone(), filename).await?;
//!
//! // Next we'll read from the file.
//! // Here we need ownership of the buffer, same reason as discussed above.
//! let buf = file.read(Vec::with_capacity(32 * 1024)).await?;
//!
//! // Let's write what we read from the file to standard out.
//! let stdout = a10::io::stdout(sq);
//! // For writing we also need ownership of the buffer, so we move the
//! // buffer into the function call. By default we won't get it back, to
//! // match the API of the standard library.
//! // But using a buffer just once is a bit wasteful, so we can get it back
//! // using the `Extract` trait (the call to `extract`). It changes the
//! // return values (and `Future` type) to return the buffer and the number
//! // of bytes written.
//! let (buf, n) = stdout.write(buf).extract().await?;
//!
//! // All done.
//! Ok(())
//! }
//!
//! /// Block on the `future`, expecting polling `ring` to drive it forward.
//! fn block_on<Fut, T>(ring: &mut Ring, future: Fut) -> Fut::Output
//! where
//! Fut: Future<Output = io::Result<T>>
//! {
//! use std::task::{self, RawWaker, RawWakerVTable, Poll};
//! use std::ptr;
//!
//! // Pin the future to the stack so we don't move it around.
//! let mut future = std::pin::pin!(future);
//!
//! // Create a task context to poll the future.
//! let waker = unsafe { task::Waker::from_raw(RawWaker::new(ptr::null(), &WAKER_VTABLE)) };
//! let mut ctx = task::Context::from_waker(&waker);
//!
//! loop {
//! match future.as_mut().poll(&mut ctx) {
//! Poll::Ready(result) => return result,
//! Poll::Pending => {
//! // Poll the `Ring` to get an update on the operation(s).
//! //
//! // In practice you would first yield to another future, but
//! // in this example we don't have one, so we'll always poll
//! // the `Ring`.
//! ring.poll(None)?;
//! }
//! }
//! }
//!
//! // A waker implementation that does nothing.
//! static WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
//! |_| RawWaker::new(ptr::null(), &WAKER_VTABLE),
//! |_| {},
//! |_| {},
//! |_| {},
//! );
//! }
//! ```
#![cfg_attr(feature = "nightly", feature(async_iterator, io_error_more))]
#![warn(
anonymous_parameters,
bare_trait_objects,
missing_debug_implementations,
missing_docs,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
variant_size_differences
)]
use std::cmp::min;
use std::marker::PhantomData;
use std::mem::{needs_drop, replace, size_of, take, ManuallyDrop};
use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd, RawFd};
use std::sync::atomic::{self, AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{self, Poll};
use std::time::Duration;
use std::{fmt, ptr};
mod bitmap;
pub mod cancel;
mod config;
pub mod extract;
pub mod fs;
pub mod io;
pub mod mem;
pub mod msg;
pub mod net;
mod op;
pub mod poll;
pub mod process;
pub mod signals;
// TODO: replace this with definitions from the `libc` crate once available.
mod sys;
use sys as libc;
use bitmap::AtomicBitMap;
use config::munmap;
pub use config::Config;
#[doc(no_inline)]
pub use extract::Extract;
use msg::{MsgListener, MsgToken, SendMsg};
use op::{QueuedOperation, Submission};
use poll::{MultishotPoll, OneshotPoll};
/// This type represents the user space side of an io_uring.
///
/// An io_uring is split into two queues: the submission queue and the
/// completion queue. The [`SubmissionQueue`] is public, but doesn't provide
/// many methods. It is used by the I/O types in the crate to schedule
/// asynchronous operations.
///
/// The completion queue is not exposed by the crate and is only used
/// internally. Instead it will wake the [`Future`]s exposed by the various
/// I/O types, such as [`AsyncFd::write`]'s [`Write`] `Future`.
///
/// [`Future`]: std::future::Future
/// [`AsyncFd::write`]: AsyncFd::write
/// [`Write`]: io::Write
#[derive(Debug)]
pub struct Ring {
/// # Notes
///
/// `CompletionQueue` must be dropped before the `SubmissionQueue` because
/// the `ring_fd` in `SubmissionQueue` is used in the memory mappings
/// backing `CompletionQueue`.
cq: CompletionQueue,
/// Shared between this `Ring` and all types that queue any operations.
///
/// Because it depends on memory mapping from the file descriptor of the
/// ring the file descriptor is stored in the `SubmissionQueue` itself.
sq: SubmissionQueue,
}
impl Ring {
/// Configure a `Ring`.
///
/// `entries` must be a power of two and in the range 1..=4096.
///
/// # Notes
///
/// A10 always uses `IORING_SETUP_SQPOLL`, which requires Linux kernel 5.11
/// to work correctly. Furthermore, before Linux 5.13 the user needs the
/// `CAP_SYS_NICE` capability when running as non-root.
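///
/// # Examples
///
/// A minimal sketch of configuring and building a `Ring` (see [`Config`]
/// for the available options):
///
/// ```no_run
/// use a10::Ring;
///
/// # fn main() -> std::io::Result<()> {
/// // Build a ring with 64 submission entries using the default options.
/// let ring = Ring::config(64).build()?;
/// # drop(ring); Ok(()) }
/// ```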
pub const fn config<'r>(entries: u32) -> Config<'r> {
Config::new(entries)
}
/// Create a new `Ring` with the default configuration.
///
/// For more configuration options see [`Config`].
#[doc(alias = "io_uring_setup")]
pub fn new(entries: u32) -> io::Result<Ring> {
Config::new(entries).build()
}
/// Returns the `SubmissionQueue` used by this ring.
///
/// The `SubmissionQueue` can be used to queue asynchronous I/O operations.
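///
/// # Examples
///
/// A minimal sketch showing how the queue is typically obtained and shared:
///
/// ```no_run
/// use a10::Ring;
///
/// # fn main() -> std::io::Result<()> {
/// let ring = Ring::new(8)?;
/// // Clone the queue so it can be moved into I/O types or other threads;
/// // cloning is cheap as it only clones an `Arc`.
/// let sq = ring.submission_queue().clone();
/// # drop(sq); Ok(()) }
/// ```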
pub const fn submission_queue(&self) -> &SubmissionQueue {
&self.sq
}
/// Enable the ring.
///
/// This is only required when starting the ring in disabled mode, see
/// [`Config::disable`].
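///
/// # Examples
///
/// A minimal sketch, assuming [`Config::disable`] follows the same builder
/// pattern as the other [`Config`] methods:
///
/// ```ignore
/// use a10::Ring;
///
/// # fn main() -> std::io::Result<()> {
/// // Start the ring in disabled mode (assumes `disable` chains like the
/// // other builder methods)...
/// let mut ring = Ring::config(8).disable().build()?;
/// // ...and enable it once we're ready to queue operations.
/// ring.enable()?;
/// # Ok(()) }
/// ```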
pub fn enable(&mut self) -> io::Result<()> {
libc::syscall!(io_uring_register(
self.sq.shared.ring_fd.as_raw_fd(),
libc::IORING_REGISTER_ENABLE_RINGS,
ptr::null(),
0,
))?;
Ok(())
}
/// Poll the ring for completions.
///
/// This will wake all completed [`Future`]s with the result of their
/// operations.
///
/// If a zero-duration timeout (i.e. `Some(Duration::ZERO)`) is passed this
/// function will only wake already completed operations. It is then
/// guaranteed not to make a system call, but that also means it doesn't
/// guarantee at least one completion was processed.
///
/// [`Future`]: std::future::Future
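///
/// # Examples
///
/// A minimal sketch of the two ways to poll:
///
/// ```no_run
/// use std::time::Duration;
///
/// use a10::Ring;
///
/// # fn main() -> std::io::Result<()> {
/// let mut ring = Ring::new(8)?;
/// // Wake already completed operations without making a system call.
/// ring.poll(Some(Duration::ZERO))?;
/// // Block until at least one operation completes.
/// ring.poll(None)?;
/// # Ok(()) }
/// ```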
#[doc(alias = "io_uring_enter")]
pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
let sq = self.sq.clone(); // TODO: remove clone.
for completion in self.completions(timeout)? {
log::trace!(completion:? = completion; "dequeued completion event");
// SAFETY: we're calling this based on information from the kernel.
unsafe { sq.update_op(completion) };
}
self.wake_blocked_futures();
Ok(())
}
/// Returns an iterator over all completion events, making a system call if
/// no completions are queued.
fn completions(&mut self, timeout: Option<Duration>) -> io::Result<Completions> {
let head = self.completion_head();
let mut tail = self.completion_tail();
if head == tail && !matches!(timeout, Some(Duration::ZERO)) {
// If we have no completions and the timeout is either absent or
// non-zero we make a system call to wait for completion events.
self.enter(timeout)?;
// NOTE: we're the only ones writing to the completion `head` so we
// don't need to read it again.
tail = self.completion_tail();
}
Ok(Completions {
entries: self.cq.entries,
local_head: head,
head: self.cq.head,
tail,
ring_mask: self.cq.ring_mask,
_lifetime: PhantomData,
})
}
/// Make the `io_uring_enter` system call.
fn enter(&mut self, timeout: Option<Duration>) -> io::Result<()> {
let mut args = libc::io_uring_getevents_arg {
sigmask: 0,
sigmask_sz: 0,
pad: 0,
ts: 0,
};
let mut timespec = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
if let Some(timeout) = timeout {
timespec.tv_sec = timeout.as_secs().try_into().unwrap_or(i64::MAX);
timespec.tv_nsec = libc::c_longlong::from(timeout.subsec_nanos());
args.ts = ptr::addr_of!(timespec) as u64;
}
let submissions = if self.sq.shared.kernel_thread {
0 // Kernel thread handles the submissions.
} else {
self.sq.shared.is_polling.store(true, Ordering::Release);
self.sq.unsubmitted()
};
// If there are no completions we'll wait for at least one.
let enter_flags = libc::IORING_ENTER_GETEVENTS // Wait for a completion.
| libc::IORING_ENTER_EXT_ARG; // Passing of `args`.
log::debug!(submissions = submissions; "waiting for completion events");
let result = libc::syscall!(io_uring_enter2(
self.sq.shared.ring_fd.as_raw_fd(),
submissions,
1, // Wait for at least one completion.
enter_flags,
ptr::addr_of!(args).cast(),
size_of::<libc::io_uring_getevents_arg>(),
));
if !self.sq.shared.kernel_thread {
self.sq.shared.is_polling.store(false, Ordering::Release);
}
match result {
Ok(_) => Ok(()),
// Hit timeout, we can ignore it.
Err(ref err) if err.raw_os_error() == Some(libc::ETIME) => Ok(()),
Err(err) => Err(err),
}
}
/// Returns `CompletionQueue.head`.
fn completion_head(&mut self) -> u32 {
// SAFETY: we're the only ones writing to it so `Relaxed` is fine. The
// pointer itself is valid as long as `Ring.fd` is alive.
unsafe { (*self.cq.head).load(Ordering::Relaxed) }
}
/// Returns `CompletionQueue.tail`.
fn completion_tail(&self) -> u32 {
// SAFETY: this is written to by the kernel so we need to use `Acquire`
// ordering. The pointer itself is valid as long as `Ring.fd` is alive.
unsafe { (*self.cq.tail).load(Ordering::Acquire) }
}
/// Wake [`SharedSubmissionQueue::blocked_futures`].
fn wake_blocked_futures(&mut self) {
// This is not particularly efficient, but with a large enough number
// of entries, `IORING_SETUP_SQPOLL` and sufficient calls to
// [`Ring::poll`] this shouldn't be needed at all.
let n = self.sq.available_space();
if n == 0 {
return;
}
let mut blocked_futures = {
let blocked_futures = &mut *self.sq.shared.blocked_futures.lock().unwrap();
if blocked_futures.is_empty() {
return;
}
take(blocked_futures)
};
// Do the waking outside of the lock.
let waking = min(n, blocked_futures.len());
log::trace!(waking_amount = waking, waiting_futures = blocked_futures.len(); "waking blocked futures");
for waker in blocked_futures.drain(..waking) {
waker.wake();
}
// Put the remaining wakers back (keeping the allocation, even if the
// list is empty) and append any wakers added while we didn't hold the
// lock.
let got = &mut *self.sq.shared.blocked_futures.lock().unwrap();
let mut added = replace(got, blocked_futures);
got.append(&mut added);
}
}
impl AsFd for Ring {
fn as_fd(&self) -> BorrowedFd<'_> {
self.sq.shared.ring_fd.as_fd()
}
}
/// Queue to submit asynchronous operations to.
///
/// This type doesn't have many public methods, but is used by all I/O types,
/// such as [`OpenOptions`], to queue asynchronous operations. The queue can be
/// acquired by using [`Ring::submission_queue`].
///
/// The submission queue can be shared by cloning it; cloning is a cheap
/// operation.
///
/// [`OpenOptions`]: fs::OpenOptions
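///
/// # Examples
///
/// A minimal sketch of cloning the queue and handing it to an I/O type,
/// following the `open` signature shown in the module example:
///
/// ```no_run
/// use std::path::PathBuf;
///
/// use a10::Ring;
///
/// # fn main() -> std::io::Result<()> {
/// let ring = Ring::new(8)?;
/// // Cloning the queue is cheap: it only clones an `Arc`.
/// let sq = ring.submission_queue().clone();
/// // I/O types use the queue to schedule operations, e.g. opening a file.
/// let open_future = a10::fs::OpenOptions::new().open(sq, PathBuf::from("./src/lib.rs"));
/// # drop(open_future); Ok(()) }
/// ```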
#[derive(Clone)]
pub struct SubmissionQueue {
shared: Arc<SharedSubmissionQueue>,
}
/// Shared internals of [`SubmissionQueue`].
struct SharedSubmissionQueue {
/// File descriptor of the io_uring.
ring_fd: OwnedFd,
/// Mmap-ed pointer.
ptr: *mut libc::c_void,
/// Mmap-ed size in bytes.
size: libc::c_uint,
/// Local version of `tail`.
/// Increased when queueing a submission to give the caller mutable access
/// to a [`Submission`] in `entries`.
/// NOTE: this does not mean that `pending_tail` submissions are ready;
/// that is determined by `tail`.
pending_tail: AtomicU32,
// NOTE: the following two fields are constant. We read them once from the
// mmap area and copy them here to avoid the need for atomic access.
/// Number of entries in the queue.
len: u32,
/// Mask used to index into the `sqes` queue.
ring_mask: u32,
/// True if we're using a kernel thread to do submission polling, i.e. if
/// `IORING_SETUP_SQPOLL` is enabled.
kernel_thread: bool,
/// Boolean indicating a thread is [`Ring::poll`]ing. Only used when
/// `kernel_thread` is false.
is_polling: AtomicBool,
/// Bitmap which can be used to create an index into `queued_ops`.
op_indices: Box<AtomicBitMap>,
/// State of queued operations, holds the (would be) result and
/// `task::Waker`. It's used when adding new operations and when marking
/// operations as complete (by the kernel).
queued_ops: Box<[Mutex<Option<QueuedOperation>>]>,
/// Futures that are waiting for a slot in `queued_ops`.
blocked_futures: Mutex<Vec<task::Waker>>,
// NOTE: the following fields reference mmaped pages shared with the kernel,
// thus all need atomic access.
/// Head of the queue, i.e. the submissions read by the kernel. Incremented
/// by the kernel when submissions have successfully been processed.
kernel_read: *const AtomicU32,
/// Flags set by the kernel to communicate state information.
flags: *const AtomicU32,
/// Array of `len` submission entries shared with the kernel. We're the only
/// ones modifying the structures, but the kernel can read from them.
///
/// This pointer is also used in the `unmmap` call.
entries: *mut Submission,
/// Variable used to get an index into `array`. The lock must be held while
/// writing into `array` to prevent race conditions with other threads.
array_index: Mutex<u32>,
/// Array of `len` indices (into `entries`) shared with the kernel. We're
/// the only ones modifying it, but the kernel can read from it.
///
/// This is protected by `array_index`.
array: *mut AtomicU32,
/// Incremented by us when submitting new submissions.
array_tail: *mut AtomicU32,
}
impl SubmissionQueue {
/// Wake the connected [`Ring`].
///
/// All this does is interrupt a call to [`Ring::poll`].
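///
/// # Examples
///
/// A minimal sketch of waking a [`Ring::poll`] call from another thread:
///
/// ```no_run
/// use a10::Ring;
///
/// # fn main() -> std::io::Result<()> {
/// let mut ring = Ring::new(8)?;
/// let sq = ring.submission_queue().clone();
/// let handle = std::thread::spawn(move || {
///     // Interrupt the `poll` call below.
///     sq.wake();
/// });
/// // Without the wake up this would block until an operation completes.
/// ring.poll(None)?;
/// # handle.join().unwrap(); Ok(()) }
/// ```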
pub fn wake(&self) {
// We ignore the queue-full error as it is *very* unlikely that the
// `Ring` is currently being polled if the submission queue is full.
// More likely the `Ring` hasn't been polled in a while.
let _: Result<(), QueueFull> = self.add_no_result(|submission| unsafe {
submission.wake(self.shared.ring_fd.as_raw_fd());
});
}
/// Setup a listener for user space messages.
///
/// The returned [`MsgListener`] iterator will return all messages sent
/// using [`SubmissionQueue::try_send_msg`] and
/// [`SubmissionQueue::send_msg`] with the returned `MsgToken`.
///
/// # Notes
///
/// This will return an error if too many operations are already queued,
/// this is usually resolved by calling [`Ring::poll`].
///
/// The returned `MsgToken` has an implicit lifetime linked to the
/// `MsgListener`. If the `MsgListener` is dropped the `MsgToken` will
/// become invalid.
///
/// Due to the limitations mentioned above it's advised to consider the
/// usefulness of the type severely limited. The returned `MsgListener`
/// iterator should live for the entire lifetime of the `Ring`, to ensure we
/// don't use a `MsgToken` after it has become invalid. Furthermore, to
/// ensure its creation succeeds, it should be created early in the lifetime
/// of the `Ring`.
pub fn msg_listener(self) -> io::Result<(MsgListener, MsgToken)> {
MsgListener::new(self)
}
/// Try to send a message to the iterator listening for messages using the
/// `MsgToken`.
///
/// This will use the io_uring submission queue to share `data` with the
/// receiving end. This means that it will wake up the thread if it's
/// currently [polling].
///
/// This will fail if the submission queue is currently full. See
/// [`SubmissionQueue::send_msg`] for a version that tries again when the
/// submission queue is full.
///
/// See [`SubmissionQueue::msg_listener`] for examples.
///
/// [polling]: Ring::poll
pub fn try_send_msg(&self, token: MsgToken, data: u32) -> io::Result<()> {
self.add_no_result(|submission| unsafe {
submission.msg(self.shared.ring_fd.as_raw_fd(), (token.0).0 as u64, data, 0);
submission.no_completion_event();
})?;
Ok(())
}
/// Send a message to the iterator listening for messages using the `MsgToken`.
pub const fn send_msg<'a>(&'a self, token: MsgToken, data: u32) -> SendMsg<'a> {
SendMsg::new(self, token, data)
}
/// Wait for an event specified in `mask` on the file descriptor `fd`.
///
/// This is similar to calling `poll(2)` on the file descriptor.
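///
/// # Examples
///
/// A minimal sketch waiting for standard input to become readable; the raw
/// `POLLIN` value (`0x001`) is written out to avoid assuming a dependency
/// on the `libc` crate:
///
/// ```no_run
/// use std::os::fd::AsFd;
///
/// use a10::Ring;
///
/// # fn main() -> std::io::Result<()> {
/// let ring = Ring::new(8)?;
/// let stdin = std::io::stdin();
/// // Returns a `Future` that completes once standard input is readable.
/// let poll = ring.submission_queue().oneshot_poll(stdin.as_fd(), 0x001 /* POLLIN */);
/// # drop(poll); Ok(()) }
/// ```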
#[doc(alias = "poll")]
#[doc(alias = "epoll")]
#[doc(alias = "select")]
#[allow(clippy::cast_sign_loss)]
pub fn oneshot_poll<'a>(&'a self, fd: BorrowedFd, mask: libc::c_int) -> OneshotPoll<'a> {
OneshotPoll::new(self, fd.as_raw_fd(), mask as u32)
}
/// Returns an [`AsyncIterator`] that returns multiple events as specified
/// in `mask` on the file descriptor `fd`.
///
/// This is not the same as calling [`SubmissionQueue::oneshot_poll`] in a
/// loop as this uses a multishot operation, which means only a single
/// operation is created kernel side, making this more efficient.
///
/// [`AsyncIterator`]: std::async_iter::AsyncIterator
#[allow(clippy::cast_sign_loss)]
pub fn multishot_poll<'a>(&'a self, fd: BorrowedFd, mask: libc::c_int) -> MultishotPoll<'a> {
MultishotPoll::new(self, fd.as_raw_fd(), mask as u32)
}
/// Add a submission to the queue.
///
/// Returns an index into `queued_ops` which can be used to check the
/// progress of the operation. Once the operation is completed and the
/// result read, the index should be made available again in `op_indices`
/// and the value set to `None`.
///
/// Returns an error if the submission queue is full. To fix this call
/// [`Ring::poll`] (and handle the completed operations) and try queueing
/// again.
fn add<F>(&self, submit: F) -> Result<OpIndex, QueueFull>
where
F: FnOnce(&mut Submission),
{
self._add(submit, QueuedOperation::new)
}
/// Same as [`add`] but uses a multishot `QueuedOperation`.
fn add_multishot<F>(&self, submit: F) -> Result<OpIndex, QueueFull>
where
F: FnOnce(&mut Submission),
{
self._add(submit, QueuedOperation::new_multishot)
}
/// See [`add`] or [`add_multishot`].
fn _add<F, O>(&self, submit: F, new_op: O) -> Result<OpIndex, QueueFull>
where
F: FnOnce(&mut Submission),
O: FnOnce() -> QueuedOperation,
{
// Get an index into the queued operation queue.
let shared = &*self.shared;
let Some(op_index) = shared.op_indices.next_available() else {
return Err(QueueFull(()));
};
let queued_op = new_op();
// SAFETY: the `AtomicBitMap` always returns valid indices for
// `queued_ops` (it's the whole point of it).
let mut op = shared.queued_ops[op_index].lock().unwrap();
let old_queued_op = replace(&mut *op, Some(queued_op));
debug_assert!(old_queued_op.is_none());
let res = self.add_no_result(|submission| {
submit(submission);
submission.set_user_data(op_index as u64);
});
match res {
Ok(()) => Ok(OpIndex(op_index)),
Err(err) => {
// Make the index available, we're not going to use it.
*op = None;
drop(op);
shared.op_indices.make_available(op_index);
Err(err)
}
}
}
/// Queue a new operation without making a submission.
fn queue_multishot(&self) -> Result<OpIndex, QueueFull> {
self._queue(QueuedOperation::new_multishot)
}
/// See [`queue_multishot`].
fn _queue<O>(&self, new_op: O) -> Result<OpIndex, QueueFull>
where
O: FnOnce() -> QueuedOperation,
{
// Get an index into the queued operation queue.
let shared = &*self.shared;
let Some(op_index) = shared.op_indices.next_available() else {
return Err(QueueFull(()));
};
let queued_op = new_op();
// SAFETY: the `AtomicBitMap` always returns valid indices for
// `queued_ops` (it's the whole point of it).
let old_queued_op = replace(
&mut *shared.queued_ops[op_index].lock().unwrap(),
Some(queued_op),
);
debug_assert!(old_queued_op.is_none());
Ok(OpIndex(op_index))
}
/// Same as [`SubmissionQueue::add`], but ignores the result.
#[allow(clippy::mutex_integer)] // For `array_index`; we need the lock for more than the integer.
fn add_no_result<F>(&self, submit: F) -> Result<(), QueueFull>
where
F: FnOnce(&mut Submission),
{
let shared = &*self.shared;
// First we need to acquire mutable access to a `Submission` entry in
// the `entries` array.
//
// We do this by increasing `pending_tail` by 1, reserving
// `entries[pending_tail]` for ourselves, while ensuring we don't go
// beyond what the kernel has processed by checking `tail - kernel_read`
// is less than the length of the submission queue.
let kernel_read = self.kernel_read();
let tail = shared
.pending_tail
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |tail| {
if tail - kernel_read < shared.len {
// Still an entry available.
Some(tail + 1) // TODO: handle overflows.
} else {
None
}
});
let Ok(tail) = tail else {
// If the kernel thread is not awake we'll need to wake it to make
// space in the submission queue.
self.maybe_wake_kernel_thread();
return Err(QueueFull(()));
};
// SAFETY: the `ring_mask` ensures we can never get an index larger
// than the size of the queue. Above we've already ensured that
// we're the only thread with mutable access to the entry.
let submission_index = tail & shared.ring_mask;
let submission = unsafe { &mut *shared.entries.add(submission_index as usize) };
// Let the caller fill the `submission`.
submission.reset();
submission.set_user_data(u64::MAX);
submit(submission);
#[cfg(debug_assertions)]
debug_assert!(!submission.is_unchanged());
// Ensure that all writes to the `submission` are done.
atomic::fence(Ordering::SeqCst);
// Now that we've written our submission we need to add it to the
// `array` so that the kernel can process it.
log::trace!(submission:? = submission; "queueing submission");
{
// Now that the submission is filled we need to add it to the
// `shared.array` so that the kernel can read from it.
//
// We do this with a lock to avoid a race condition between two
// threads incrementing `shared.tail` concurrently. Consider the
// following execution:
//
// Thread A | Thread B
// ... | ...
// ... | Got `array_index` 0.
// Got `array_index` 1. |
// Writes index to `shared.array[1]`. |
// `shared.tail.fetch_add` to 1. |
// At this point the kernel will/can read `shared.array[0]`, but
// thread B hasn't filled it yet. So the kernel will read an invalid
// index!
// | Writes index to `shared.array[0]`.
// | `shared.tail.fetch_add` to 2.
let mut array_index = shared.array_index.lock().unwrap();
let idx = (*array_index & shared.ring_mask) as usize;
// SAFETY: `idx` is masked above to be within the correct bounds.
unsafe { (*shared.array.add(idx)).store(submission_index, Ordering::Release) };
// SAFETY: we filled the array above.
let old_tail = unsafe { (*shared.array_tail).fetch_add(1, Ordering::AcqRel) };
debug_assert!(old_tail == *array_index);
*array_index += 1;
}
// If the kernel thread is not awake we'll need to wake it for it to
// process our submission.
self.maybe_wake_kernel_thread();
// When we're not using the kernel polling thread we might have to
// submit the event ourselves to ensure we can make progress while the
// (user space) polling thread is calling `Ring::poll`.
self.maybe_submit_event();
Ok(())
}
/// Wait for a submission slot, waking `waker` once one is available.
fn wait_for_submission(&self, waker: task::Waker) {
log::trace!(waker:? = waker; "adding blocked future");
self.shared.blocked_futures.lock().unwrap().push(waker);
}
/// Returns the number of slots available.
///
/// # Notes
///
/// The returned value can be outdated the nanosecond it is returned; don't
/// make safety decisions based on it.
fn available_space(&self) -> usize {
// SAFETY: the `kernel_read` pointer itself is valid as long as
// `Ring.fd` is alive.
// We use Relaxed here because the caller knows the value will be
// outdated.
let kernel_read = unsafe { (*self.shared.kernel_read).load(Ordering::Relaxed) };
let pending_tail = self.shared.pending_tail.load(Ordering::Relaxed);
(self.shared.len - (pending_tail - kernel_read)) as usize
}
/// Returns the number of unsubmitted submission queue entries.
fn unsubmitted(&self) -> u32 {
// SAFETY: the `kernel_read` pointer itself is valid as long as
// `Ring.fd` is alive.
// We use Relaxed here because it can already be outdated the moment we
// return it, the caller has to deal with that.
let kernel_read = unsafe { (*self.shared.kernel_read).load(Ordering::Relaxed) };
let pending_tail = self.shared.pending_tail.load(Ordering::Relaxed);
pending_tail - kernel_read
}
/// Wake up the kernel thread polling for submission events, if the kernel
/// thread needs a wakeup.
fn maybe_wake_kernel_thread(&self) {
if self.shared.kernel_thread && (self.flags() & libc::IORING_SQ_NEED_WAKEUP != 0) {
log::debug!("waking submission queue polling kernel thread");
let res = libc::syscall!(io_uring_enter2(
self.shared.ring_fd.as_raw_fd(),
0, // We've already queued our submissions.
0, // Don't wait for any completion events.
libc::IORING_ENTER_SQ_WAKEUP, // Wake up the kernel.
ptr::null(), // We don't pass any additional arguments.
0,
));
if let Err(err) = res {
log::warn!("failed to wake submission queue polling kernel thread: {err}");
}
}
}
/// Submit the event to the kernel when not using a kernel polling thread
/// and another thread is currently [`Ring::poll`]ing.
fn maybe_submit_event(&self) {
if !self.shared.kernel_thread && self.shared.is_polling.load(Ordering::Relaxed) {
log::debug!("submitting submission event while another thread is `Ring::poll`ing");
let ring_fd = self.shared.ring_fd.as_raw_fd();
let res = libc::syscall!(io_uring_enter2(ring_fd, 1, 0, 0, ptr::null(), 0));
if let Err(err) = res {
log::warn!("failed to submit event: {err}");
}
}
}
/// Poll a queued operation with `op_index` to check if it's ready.
///
/// # Notes
///
/// If this returns [`Poll::Ready`] it marks the `op_index` slot as
/// available.
pub(crate) fn poll_op(
&self,
ctx: &mut task::Context<'_>,
op_index: OpIndex,
) -> Poll<io::Result<(u16, i32)>> {
log::trace!(op_index = op_index.0; "polling operation");
if let Some(operation) = self.shared.queued_ops.get(op_index.0) {
let mut operation = operation.lock().unwrap();
if let Some(op) = &mut *operation {
let res = op.poll(ctx);
if res.is_ready() {
*operation = None;
drop(operation);
self.shared.op_indices.make_available(op_index.0);
}
return res;
}
}
panic!("a10::SubmissionQueue::poll called incorrectly");
}
/// Poll a queued multishot operation with `op_index` to check if it's
/// ready.
///
/// # Notes
///
/// If this returns `Poll::Ready(None)` it marks the `op_index` slot as
/// available.
pub(crate) fn poll_multishot_op(
&self,
ctx: &mut task::Context<'_>,
op_index: OpIndex,
) -> Poll<Option<io::Result<(u16, i32)>>> {
log::trace!(op_index = op_index.0; "polling multishot operation");
if let Some(operation) = self.shared.queued_ops.get(op_index.0) {
let mut operation = operation.lock().unwrap();
if let Some(op) = &mut *operation {
return match op.poll(ctx) {
Poll::Ready(res) => Poll::Ready(Some(res)),
Poll::Pending if op.is_done() => {
*operation = None;
drop(operation);
self.shared.op_indices.make_available(op_index.0);
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
};
}
}
panic!("a10::SubmissionQueue::poll_multishot called incorrectly");
}
/// Mark the operation with `op_index` as dropped, attempting to cancel it.
///
/// Because the kernel still has access to the `resources`, we might have to
/// do some trickery to delay both the deallocation of `resources` and
/// making the queued operation slot available again.
///
/// When the operation is still in progress we attempt to cancel it using
/// the submission created by `cancel`. If the operation has completed it
/// will just drop `resources` and make the slot available again.
///
/// # Notes
///
/// `cancel` should most likely use [`Submission::no_completion_event`].
pub(crate) fn cancel_op<T, F>(
&self,
op_index: OpIndex,
resources: T,
cancel: F,
) -> Result<(), QueueFull>
where
F: FnOnce(&mut Submission),
{
log::trace!(op_index = op_index.0; "canceling operation");
if let Some(operation) = self.shared.queued_ops.get(op_index.0) {
let mut operation = operation.lock().unwrap();
if let Some(op) = &mut *operation {
if op.no_more_events() {
// Easy path, the operation has already been completed.
*operation = None;
// Unlock before dropping `resources`, which might take a
// while.
drop(operation);
self.shared.op_indices.make_available(op_index.0);
// We can safely drop the resources.
drop(resources);
return Ok(());
}
// Hard path, the operation is not done, but the Future holding
// the resource is about to be dropped, so we need to apply some
// trickery here.
//
// We need to do two things:
// 1. Delay the dropping of `resources` until the kernel is done
// with the operation.
// 2. Delay the available making of the queued operation slot
// until the kernel is done with the operation.
//
// We achieve 1 by creating a special waker that just drops the
// resources in `resources`.
let waker = if needs_drop::<T>() {
// SAFETY: we're not going to clone the `waker`.
Some(unsafe { drop_task_waker(Box::from(resources)) })
} else {
// Of course if we don't need to drop `T`, then we don't
// have to use a special waker. But we still don't want to
// wake up the `Future` as it's no longer used.
None
};
// We achieve 2 by setting the operation state to dropped, so
// that `QueuedOperation::set_result` returns true, which makes
// `update_op` below make the queued operation slot available
// again.
op.set_dropped(waker);
// Cancel the operation.
return self.add_no_result(cancel);
}
}
panic!("a10::SubmissionQueue::cancel_op called incorrectly");
}
/// Update an operation based on `completion`.
///
/// # Safety
///
/// This may only be called based on information from the kernel.
unsafe fn update_op(&self, completion: &Completion) {
let op_index = completion.index();
if let Some(operation) = self.shared.queued_ops.get(op_index) {
let mut operation = operation.lock().unwrap();
if let Some(op) = &mut *operation {
log::trace!(op_index = op_index, completion:? = completion; "updating operation");
let is_dropped = op.update(completion);
if is_dropped && op.no_more_events() {
// The Future was previously dropped so no one is waiting on
// the result. We can make the slot available again.
*operation = None;
drop(operation);
self.shared.op_indices.make_available(op_index);
}
} else {
log::trace!(op_index = op_index, completion:? = completion; "operation gone, but got completion event");
}
}
}
/// Returns `self.kernel_read`.
fn kernel_read(&self) -> u32 {
// SAFETY: this is written to by the kernel so we need to use `Acquire`
// ordering. The pointer itself is valid as long as `Ring.fd` is alive.
unsafe { (*self.shared.kernel_read).load(Ordering::Acquire) }
}
/// Returns `self.flags`.
fn flags(&self) -> u32 {
// SAFETY: this is written to by the kernel so we need to use `Acquire`
// ordering. The pointer itself is valid as long as `Ring.fd` is alive.
unsafe { (*self.shared.flags).load(Ordering::Acquire) }
}
}
#[allow(clippy::mutex_integer)] // For `array_index`; we need the lock for more than the integer.
impl fmt::Debug for SubmissionQueue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
/// Load a `u32` using relaxed ordering from `ptr`.
fn load_atomic_u32(ptr: *const AtomicU32) -> u32 {
unsafe { (*ptr).load(Ordering::Relaxed) }
}
let shared = &*self.shared;
let all = f.alternate();
let mut f = f.debug_struct("SubmissionQueue");
f.field("ring_fd", &shared.ring_fd.as_raw_fd())
.field("len", &shared.len)
.field("ring_mask", &shared.ring_mask)
.field("flags", &load_atomic_u32(shared.flags))
.field("pending_tail", &shared.pending_tail)
.field("kernel_read", &load_atomic_u32(shared.kernel_read))
.field(
"array_index",
&shared.array_index.lock().map(|i| *i).unwrap_or(u32::MAX),
)
.field("array_tail", &load_atomic_u32(shared.array_tail));
if all {
f.field("op_indices", &shared.op_indices)
.field("queued_ops", &shared.queued_ops)
.field("blocked_futures", &shared.blocked_futures)
.field("mmap_ptr", &shared.ptr)
.field("mmap_size", &shared.size);
}
f.finish()
}
}
unsafe impl Send for SharedSubmissionQueue {}
unsafe impl Sync for SharedSubmissionQueue {}
impl Drop for SharedSubmissionQueue {
fn drop(&mut self) {
if let Err(err) = munmap(
self.entries.cast(),
self.len as usize * size_of::<Submission>(),
) {
log::warn!("error unmapping a10::SubmissionQueue entries: {err}");
}
if let Err(err) = munmap(self.ptr, self.size as usize) {
log::warn!("error unmapping a10::SubmissionQueue: {err}");
}
}
}
/// Index into [`SharedSubmissionQueue::op_indices`].
///
/// Returned by [`SubmissionQueue::add`] and used by
/// [`SubmissionQueue::poll_op`] to check for a result.
#[derive(Copy, Clone)]
#[must_use]
struct OpIndex(usize);
impl fmt::Debug for OpIndex {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
/// Error returned when the submission queue is full.
///
/// To resolve this issue call [`Ring::poll`].
///
/// Can be converted into an [`io::Error`].
struct QueueFull(());
impl From<QueueFull> for io::Error {
fn from(_: QueueFull) -> io::Error {
#[cfg(not(feature = "nightly"))]
let kind = io::ErrorKind::Other;
#[cfg(feature = "nightly")]
let kind = io::ErrorKind::ResourceBusy;
io::Error::new(kind, "submission queue is full")
}
}
impl fmt::Debug for QueueFull {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("QueueFull").finish()
}
}
impl fmt::Display for QueueFull {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("`a10::Ring` submission queue is full")
}
}
/// Create a [`task::Waker`] that will drop `to_drop` when the waker is
/// dropped or used to wake.
///
/// # Safety
///
/// The returned `task::Waker` cannot be cloned; attempting to do so will panic.
unsafe fn drop_task_waker<T: DropWaker>(to_drop: T) -> task::Waker {
unsafe fn drop_by_ptr<T: DropWaker>(data: *const ()) {
T::drop_from_waker_data(data);
}
// SAFETY: we meet the `task::Waker` and `task::RawWaker` requirements.
unsafe {
task::Waker::from_raw(task::RawWaker::new(
to_drop.into_waker_data(),
&task::RawWakerVTable::new(
|_| panic!("attempted to clone `a10::drop_task_waker`"),
// SAFETY: `wake` takes ownership, so dropping is safe.
drop_by_ptr::<T>,
|_| { /* `wake_by_ref` is a no-op. */ },
drop_by_ptr::<T>,
),
))
}
}
/// Trait used by [`drop_task_waker`].
trait DropWaker {
/// Return itself as waker data.
fn into_waker_data(self) -> *const ();
/// Drop the waker `data` created by `into_waker_data`.
unsafe fn drop_from_waker_data(data: *const ());
}
impl<T> DropWaker for Box<T> {
fn into_waker_data(self) -> *const () {
Box::into_raw(self).cast()
}
unsafe fn drop_from_waker_data(data: *const ()) {
drop(Box::<T>::from_raw(data.cast_mut().cast()));
}
}
impl<T> DropWaker for Arc<T> {
fn into_waker_data(self) -> *const () {
Arc::into_raw(self).cast()
}
unsafe fn drop_from_waker_data(data: *const ()) {
drop(Arc::<T>::from_raw(data.cast_mut().cast()));
}
}
/// Queue of completion events.
#[derive(Debug)]
struct CompletionQueue {
/// Mmap-ed pointer to the completion queue.
ptr: *mut libc::c_void,
/// Mmap-ed size in bytes.
size: libc::c_uint,
// NOTE: the following field is constant. We read it once from the mmap
// area and copy it here to avoid the need for atomic access.
/// Mask used to index into the completion `entries`.
ring_mask: u32,
// NOTE: the following fields reference mmaped pages shared with the kernel,
// thus all need atomic access.
/// Incremented by us when completions have been read.
head: *mut AtomicU32,
/// Incremented by the kernel when adding completions.
tail: *const AtomicU32,
/// Array of `len` completion entries shared with the kernel. The kernel
/// modifies this array, we're only reading from it.
entries: *const Completion,
}
unsafe impl Send for CompletionQueue {}
unsafe impl Sync for CompletionQueue {}
impl Drop for CompletionQueue {
fn drop(&mut self) {
if let Err(err) = munmap(self.ptr, self.size as usize) {
log::warn!("error unmapping a10::CompletionQueue: {err}");
}
}
}
/// Iterator of completed operations.
struct Completions<'ring> {
/// Same as [`CompletionQueue.entries`].
entries: *const Completion,
/// Local version of `head`. Used to update `head` once `Completions` is
/// dropped.
local_head: u32,
/// Same as [`CompletionQueue.head`], used to let the kernel know we've read
/// the completions once we're dropped.
head: *mut AtomicU32,
/// Tail of `entries`, i.e. number of completions the kernel wrote.
tail: u32,
/// Same as [`CompletionQueue.ring_mask`].
ring_mask: u32,
/// We depend on the lifetime of the [`Ring`].
_lifetime: PhantomData<&'ring Ring>,
}
impl<'ring> Iterator for Completions<'ring> {
type Item = &'ring Completion;
fn next(&mut self) -> Option<Self::Item> {
let head = self.local_head;
let tail = self.tail;
if head < tail {
// SAFETY: the `ring_mask` ensures we can never get an `idx` larger
// than the size of the queue. We checked above that the kernel has
// written the struct (and isn't writing to it now) so we can safely
// read from it.
let idx = (head & self.ring_mask) as usize;
let completion = unsafe { &*self.entries.add(idx) };
self.local_head += 1;
Some(completion)
} else {
None
}
}
}
impl<'ring> Drop for Completions<'ring> {
fn drop(&mut self) {
// Let the kernel know we've read the completions.
// SAFETY: the kernel needs to read the value so we need `Release`. The
// pointer itself is valid as long as `Ring.fd` is alive.
unsafe { (*self.head).store(self.local_head, Ordering::Release) }
}
}
/// Event that represents a completed operation.
#[repr(transparent)]
struct Completion {
inner: libc::io_uring_cqe,
}
impl Completion {
/// Returns the operation index.
const fn index(&self) -> usize {
self.inner.user_data as usize
}
/// Returns the result of the operation.
const fn result(&self) -> i32 {
self.inner.res
}
/// Return `true` if `IORING_CQE_F_MORE` is set.
const fn is_in_progress(&self) -> bool {
self.inner.flags & libc::IORING_CQE_F_MORE != 0
}
/// Return `true` if `IORING_CQE_F_NOTIF` is set.
const fn is_notification(&self) -> bool {
self.inner.flags & libc::IORING_CQE_F_NOTIF != 0
}
/// Return `true` if `IORING_CQE_F_BUFFER` is set.
const fn is_buffer_select(&self) -> bool {
self.inner.flags & libc::IORING_CQE_F_BUFFER != 0
}
/// Returns the completion flags (the bits below `IORING_CQE_BUFFER_SHIFT`).
const fn flags(&self) -> u16 {
(self.inner.flags & ((1 << libc::IORING_CQE_BUFFER_SHIFT) - 1)) as u16
}
/// Returns the operation flags that need to be passed to
/// [`QueuedOperation`].
const fn operation_flags(&self) -> u16 {
if self.is_buffer_select() {
(self.inner.flags >> libc::IORING_CQE_BUFFER_SHIFT) as u16
} else {
0
}
}
}
impl fmt::Debug for Completion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Completion")
.field("user_data", &self.inner.user_data)
// NOTE: this isn't always an errno, so we can't use
// `io::Error::from_raw_os_error` without being misleading.
.field("res", &self.inner.res)
.field("flags", &self.flags())
.field("operation_flags", &self.operation_flags())
.finish()
}
}
/// An open file descriptor.
///
/// All functions on `AsyncFd` are asynchronous and return a [`Future`].
///
/// [`Future`]: std::future::Future
pub struct AsyncFd {
/// # Notes
///
/// We use `ManuallyDrop` because we drop the fd using io_uring, not a
/// blocking `close(2)` system call.
fd: ManuallyDrop<OwnedFd>,
sq: SubmissionQueue,
}
// NOTE: the implementations are split over the modules to give the `Future`
// implementation types a reasonable place in the docs.
impl AsyncFd {
/// Create a new `AsyncFd`.
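///
/// # Examples
///
/// A minimal sketch wrapping a file opened via the standard library:
///
/// ```no_run
/// use std::fs::File;
/// use std::os::fd::OwnedFd;
///
/// use a10::{AsyncFd, Ring};
///
/// # fn main() -> std::io::Result<()> {
/// let ring = Ring::new(8)?;
/// let file = File::open("./src/lib.rs")?;
/// // Transfer ownership of the file descriptor to the `AsyncFd`.
/// let fd = AsyncFd::new(OwnedFd::from(file), ring.submission_queue().clone());
/// # drop(fd); Ok(()) }
/// ```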
pub const fn new(fd: OwnedFd, sq: SubmissionQueue) -> AsyncFd {
AsyncFd {
fd: ManuallyDrop::new(fd),
sq,
}
}
/// Create a new `AsyncFd` from a `RawFd`.
///
/// # Safety
///
/// The caller must ensure that `fd` is valid and that it's no longer used
/// by anything other than the returned `AsyncFd`.
pub unsafe fn from_raw_fd(fd: RawFd, sq: SubmissionQueue) -> AsyncFd {
AsyncFd::new(OwnedFd::from_raw_fd(fd), sq)
}
/// Creates a new independently owned `AsyncFd` that shares the same
/// underlying file descriptor as the existing `AsyncFd`.
#[doc(alias = "dup")]
#[doc(alias = "dup2")]
#[doc(alias = "F_DUPFD")]
#[doc(alias = "F_DUPFD_CLOEXEC")]
pub fn try_clone(&self) -> io::Result<AsyncFd> {
let fd = self.fd.try_clone()?;
Ok(AsyncFd::new(fd, self.sq.clone()))
}
/// Returns the `RawFd` of this `AsyncFd`.
fn fd(&self) -> RawFd {
self.fd.as_raw_fd()
}
}
impl AsFd for AsyncFd {
fn as_fd(&self) -> BorrowedFd<'_> {
self.fd.as_fd()
}
}
impl fmt::Debug for AsyncFd {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
struct AsyncFdSubmissionQueue<'a>(&'a SubmissionQueue);
impl fmt::Debug for AsyncFdSubmissionQueue<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SubmissionQueue")
.field("ring_fd", &self.0.shared.ring_fd.as_raw_fd())
.finish()
}
}
f.debug_struct("AsyncFd")
.field("fd", &self.fd.as_raw_fd())
.field("sq", &AsyncFdSubmissionQueue(&self.sq))
.finish()
}
}
impl Drop for AsyncFd {
fn drop(&mut self) {
let result = self.sq.add_no_result(|submission| unsafe {
submission.close(self.fd());
submission.no_completion_event();
});
if let Err(err) = result {
log::error!("error submitting close operation for a10::AsyncFd: {err}");
}
}
}