use crate::{
telemetry::metrics::{raw, Gauge, Register},
Error, IoBufMut, IoBufs,
};
use commonware_utils::channel::{
mpsc::{self, error::TryRecvError},
oneshot,
};
use io_uring::{
cqueue::Entry as CqueueEntry,
opcode::AsyncCancel,
squeue::SubmissionQueue,
types::{SubmitArgs, Timespec},
IoUring,
};
use request::{ReadAtRequest, RecvRequest, Request, SendRequest, SyncRequest, WriteAtRequest};
use std::{
collections::VecDeque,
fs::File,
os::fd::OwnedFd,
sync::Arc,
time::{Duration, Instant},
};
mod request;
mod timeout;
use timeout::{Tick, TimeoutWheel};
mod waiter;
use waiter::{CompletionOutcome, StageOutcome, WaiterId, Waiters};
mod waker;
use waker::{Waker, HALF_SUBMISSION_SEQUENCE_DOMAIN, SUBMISSION_SEQ_MASK, WAKE_USER_DATA};
mod spinner;
pub use spinner::Config as SpinnerConfig;
use spinner::Spinner;
/// Largest ring size accepted by `IoUringLoop::new` after the configured size has been
/// rounded up to a power of two.
///
/// Defined as half of `HALF_SUBMISSION_SEQUENCE_DOMAIN`.
// NOTE(review): presumably this bound keeps the number of outstanding requests well inside
// the waker's sequence domain so wrapped sequence comparisons stay unambiguous — confirm
// against the `waker` module.
pub const MAX_RING_SIZE: u32 = HALF_SUBMISSION_SEQUENCE_DOMAIN / 2;
/// Alias for `u64`.
// NOTE(review): presumably the type of the io_uring `user_data` tag carried by SQEs/CQEs;
// it is not referenced in the visible part of this file — confirm usage in submodules.
type UserData = u64;
/// Metrics exported by the io_uring event loop.
#[derive(Debug)]
pub struct Metrics {
/// Gauge that the loop sets to the current number of tracked waiters.
pending_operations: Gauge,
}
impl Metrics {
    /// Registers the loop's metrics with `registry` and returns the handles to them.
    pub fn new(registry: &mut impl Register) -> Self {
        let pending_operations = registry.register(
            "pending_operations",
            "Number of active logical requests in the io_uring loop",
            raw::Gauge::default(),
        );
        Self { pending_operations }
    }
}
/// Configuration for an io_uring event loop.
#[derive(Clone, Debug)]
pub struct Config {
/// Requested ring size. `IoUringLoop::new` rounds it up to the next power of two and
/// rejects values above `MAX_RING_SIZE`; the rounded value also sizes the request
/// channel and the waiter table.
pub size: u32,
/// When `true`, the ring is built with `setup_iopoll`.
pub io_poll: bool,
/// When `true`, the ring is built with `setup_single_issuer` and `setup_defer_taskrun`.
pub single_issuer: bool,
/// Passed to the timeout wheel as its maximum timeout. Must be non-zero.
pub max_request_timeout: Duration,
/// Time budget for draining outstanding requests at shutdown. `None` places no budget
/// on the drain.
pub shutdown_timeout: Option<Duration>,
/// Tick duration of the timeout wheel. Must be non-zero.
pub timeout_wheel_tick: Duration,
/// Configuration for the spinner used while the loop has no waiters.
pub idle_spinner: SpinnerConfig,
}
impl Default for Config {
    /// Default configuration: a 128-entry ring without IOPOLL or single-issuer mode,
    /// a 60 second maximum request timeout tracked in 5 millisecond ticks, and no
    /// shutdown budget.
    fn default() -> Self {
        let max_request_timeout = Duration::from_secs(60);
        let timeout_wheel_tick = Duration::from_millis(5);
        let idle_spinner = SpinnerConfig::default();
        Self {
            size: 128,
            io_poll: false,
            single_issuer: false,
            max_request_timeout,
            shutdown_timeout: None,
            timeout_wheel_tick,
            idle_spinner,
        }
    }
}
/// State shared by every clone of a `Handle`.
struct HandleInner {
/// Sending half of the request channel. Wrapped in `Option` so `Drop` can release it
/// before waking the loop.
sender: Option<mpsc::Sender<Request>>,
/// Waker used to notify the loop about newly published requests and about shutdown.
waker: Waker,
}
impl Drop for HandleInner {
    /// Releases the channel sender and then wakes the loop.
    fn drop(&mut self) {
        // Drop the sender before waking so the loop sees the channel state that results
        // from this handle going away.
        self.sender = None;
        self.waker.wake();
    }
}
/// Cloneable handle used to submit requests to an `IoUringLoop`.
///
/// All clones share one `HandleInner`; its `Drop` runs when the last clone goes away.
#[derive(Clone)]
pub struct Handle {
inner: Arc<HandleInner>,
}
impl Handle {
/// Sends `request` over the channel to the loop and then publishes it through the waker.
///
/// On failure the rejected request is handed back inside the `SendError`.
async fn enqueue(&self, request: Request) -> Result<(), mpsc::error::SendError<Request>> {
    let sender = self
        .inner
        .sender
        .as_ref()
        .expect("handle sender is only taken on drop");
    sender.send(request).await?;
    // Publish only after the request is in the channel.
    self.inner.waker.publish();
    Ok(())
}
#[cfg_attr(not(feature = "iouring-network"), allow(dead_code))]
pub async fn send(
&self,
fd: Arc<OwnedFd>,
bufs: IoBufs,
deadline: Instant,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.enqueue(Request::Send(SendRequest {
fd,
write: bufs.into(),
deadline: Some(deadline),
result: None,
sender: tx,
}))
.await
.map_err(|_| Error::SendFailed)?;
rx.await.map_err(|_| Error::SendFailed)?
}
#[allow(clippy::result_large_err)]
#[cfg_attr(not(feature = "iouring-network"), allow(dead_code))]
pub async fn recv(
&self,
fd: Arc<OwnedFd>,
buf: IoBufMut,
offset: usize,
len: usize,
exact: bool,
deadline: Instant,
) -> Result<(IoBufMut, usize), (IoBufMut, Error)> {
assert!(
offset <= len && len <= buf.capacity(),
"recv invariant violated: need offset <= len <= capacity"
);
let (tx, rx) = oneshot::channel();
let request = Request::Recv(RecvRequest {
fd,
buf,
offset,
len,
exact,
deadline: Some(deadline),
result: None,
sender: tx,
});
if let Err(err) = self.enqueue(request).await {
let Request::Recv(request) = err.0 else {
unreachable!("recv enqueue returned wrong request variant");
};
return Err((request.buf, Error::RecvFailed));
}
rx.await.unwrap_or_else(|_| {
Err((IoBufMut::default(), Error::RecvFailed))
})
}
#[allow(clippy::result_large_err)]
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn read_at(
&self,
file: Arc<File>,
offset: u64,
len: usize,
buf: IoBufMut,
) -> Result<IoBufMut, (IoBufMut, Error)> {
assert!(len <= buf.capacity(), "read_at len exceeds buffer capacity");
let (tx, rx) = oneshot::channel();
let request = Request::ReadAt(ReadAtRequest {
file,
offset,
len,
read: 0,
buf,
result: None,
sender: tx,
});
if let Err(err) = self.enqueue(request).await {
let Request::ReadAt(request) = err.0 else {
unreachable!("read_at enqueue returned wrong request variant");
};
return Err((request.buf, Error::ReadFailed));
}
rx.await.unwrap_or_else(|_| {
Err((IoBufMut::default(), Error::ReadFailed))
})
}
/// Submits a write of `bufs` to `file` at `offset` and waits for its result.
///
/// The request is created with `sync: false`.
///
/// # Errors
///
/// Returns [`Error::WriteFailed`] if the request cannot be enqueued or the loop drops the
/// result channel; otherwise returns whatever result the loop reports.
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn write_at(&self, file: Arc<File>, offset: u64, bufs: IoBufs) -> Result<(), Error> {
    self.enqueue_write(file, offset, bufs, false).await
}
/// Same as `write_at`, but the request is created with `sync: true`.
// NOTE(review): presumably the loop then makes the data durable as part of the same
// request — confirm against `WriteAtRequest`.
///
/// # Errors
///
/// Returns [`Error::WriteFailed`] if the request cannot be enqueued or the loop drops the
/// result channel; otherwise returns whatever result the loop reports.
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn write_at_sync(
    &self,
    file: Arc<File>,
    offset: u64,
    bufs: IoBufs,
) -> Result<(), Error> {
    self.enqueue_write(file, offset, bufs, true).await
}
/// Shared implementation of `write_at` and `write_at_sync`: builds the write request
/// with the requested `sync` flag, enqueues it and waits for the loop's answer.
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
async fn enqueue_write(
    &self,
    file: Arc<File>,
    offset: u64,
    bufs: IoBufs,
    sync: bool,
) -> Result<(), Error> {
    let (tx, rx) = oneshot::channel();
    self.enqueue(Request::WriteAt(WriteAtRequest {
        file,
        offset,
        written: 0,
        write: bufs.into(),
        sync,
        result: None,
        sender: tx,
    }))
    .await
    .map_err(|_| Error::WriteFailed)?;
    rx.await.map_err(|_| Error::WriteFailed)?
}
/// Submits a sync request for `file` and waits for its result.
///
/// # Errors
///
/// Returns an `std::io::Error` with the message "failed to send work" if the request
/// cannot be enqueued, "failed to read result" if the loop drops the result channel,
/// or whatever error the loop reports.
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn sync(&self, file: Arc<File>) -> std::io::Result<()> {
    let (tx, rx) = oneshot::channel();
    let request = Request::Sync(SyncRequest {
        file,
        result: None,
        sender: tx,
    });
    if self.enqueue(request).await.is_err() {
        return Err(std::io::Error::other("failed to send work"));
    }
    match rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(std::io::Error::other("failed to read result")),
    }
}
}
/// State of the io_uring event loop. Created by `IoUringLoop::new` and consumed by
/// `IoUringLoop::run`.
pub(crate) struct IoUringLoop {
/// Configuration, with `size` already rounded up to a power of two.
cfg: Config,
/// Metrics updated by the loop.
metrics: Arc<Metrics>,
/// Receiving half of the request channel fed by `Handle`s.
receiver: mpsc::Receiver<Request>,
/// Table of admitted requests, keyed by `WaiterId`.
waiters: Waiters,
/// Waiters whose completion asked to be staged again (`CompletionOutcome::Requeue`).
ready_queue: VecDeque<WaiterId>,
/// Timed-out waiters that were in flight and still need an `AsyncCancel` SQE.
pending_cancels: VecDeque<WaiterId>,
/// Wheel that tracks request deadlines.
timeout_wheel: TimeoutWheel,
/// Spinner consulted when the loop has no waiters, before parking.
idle_spinner: Spinner,
/// Waker shared with the `Handle`s.
waker: Waker,
/// `true` when the wake poll must be (re)installed into the submission queue: set at
/// construction and whenever a wake CQE arrives without the "more" flag.
wake_rearm_needed: bool,
/// Count of requests taken off the channel so far, kept within `SUBMISSION_SEQ_MASK`.
processed_seq: u32,
}
/// Reason `fill_submission_queue` stopped; `run` uses it to decide how to proceed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum FillResult {
/// The request channel is closed and has no buffered requests left.
Disconnected,
/// Neither the waiter table nor the submission queue is full.
Drained,
/// The submission queue is full (or the wake poll could not be reinstalled).
AtSubmissionQueueCapacity,
/// The waiter table is full.
AtWaiterCapacity,
}
impl FillResult {
    /// Classifies the current fill state.
    ///
    /// A full waiter table takes precedence over a full submission queue. With a full
    /// waiter table the receiver is also inspected, and a closed and empty channel is
    /// reported as [`FillResult::Disconnected`].
    #[inline]
    fn from_fill_state(
        waiters: &Waiters,
        submission_queue: &SubmissionQueue<'_>,
        receiver: &mpsc::Receiver<Request>,
    ) -> Self {
        if !waiters.is_full() {
            return if submission_queue.is_full() {
                Self::AtSubmissionQueueCapacity
            } else {
                Self::Drained
            };
        }
        let channel_finished = receiver.is_closed() && receiver.is_empty();
        if channel_finished {
            Self::Disconnected
        } else {
            Self::AtWaiterCapacity
        }
    }
}
impl IoUringLoop {
/// Builds an event loop and the [`Handle`] used to submit requests to it.
///
/// `cfg.size` is rounded up to the next power of two; the rounded value sizes the
/// request channel, the waiter table and the internal queues.
///
/// # Panics
///
/// Panics if `max_request_timeout` or `timeout_wheel_tick` is zero, if the rounded
/// size overflows `u32` or exceeds [`MAX_RING_SIZE`], or if the waker cannot be created.
pub(crate) fn new(mut cfg: Config, registry: &mut impl Register) -> (Handle, Self) {
    let timeout_is_zero = cfg.max_request_timeout.is_zero();
    assert!(
        !timeout_is_zero,
        "max_request_timeout must be non-zero for timeout wheel"
    );
    let tick_is_zero = cfg.timeout_wheel_tick.is_zero();
    assert!(
        !tick_is_zero,
        "timeout_wheel_tick must be non-zero for timeout wheel"
    );

    let rounded = cfg
        .size
        .checked_next_power_of_two()
        .expect("ring size exceeds u32::MAX");
    cfg.size = rounded;
    assert!(
        rounded <= MAX_RING_SIZE,
        "rounded ring size must be at most {MAX_RING_SIZE}"
    );
    let capacity = rounded as usize;

    let metrics = Arc::new(Metrics::new(registry));
    let (sender, receiver) = mpsc::channel(capacity);
    let waker = Waker::new().expect("unable to create wake eventfd");
    let timeout_wheel = TimeoutWheel::new(
        cfg.max_request_timeout,
        cfg.timeout_wheel_tick,
        Instant::now(),
    );
    let idle_spinner = Spinner::new(&cfg.idle_spinner, || waker.pending(0));
    let waiters = Waiters::new(capacity);

    let shared = HandleInner {
        sender: Some(sender),
        waker: waker.clone(),
    };
    let handle = Handle {
        inner: Arc::new(shared),
    };
    let io_loop = Self {
        cfg,
        metrics,
        receiver,
        waiters,
        ready_queue: VecDeque::with_capacity(capacity),
        pending_cancels: VecDeque::with_capacity(capacity),
        timeout_wheel,
        idle_spinner,
        waker,
        // The wake poll has not been installed yet.
        wake_rearm_needed: true,
        processed_seq: 0,
    };
    (handle, io_loop)
}
/// Drives the event loop on the calling thread until shutdown.
///
/// Each iteration reaps available completions, advances the timeout wheel, refills the
/// submission queue and then decides how to proceed from the resulting `FillResult`.
/// When the request channel is reported as disconnected, the loop calls `drain` and
/// returns.
///
/// # Panics
///
/// Panics if the ring cannot be created or if a submission fails with an error that
/// `submit_and_wait` does not treat as transient.
pub(crate) fn run(mut self) {
let mut ring = new_ring(&self.cfg).expect("unable to create io_uring instance");
loop {
// Reap every completion that is already available.
for cqe in ring.completion() {
self.handle_cqe(cqe);
}
// Expire deadlines before staging more work.
self.advance_timeouts();
let fill_result = self.fill_submission_queue(&mut ring);
self.metrics.pending_operations.set(self.waiters.len() as _);
match fill_result {
FillResult::Disconnected => {
// No more requests can arrive: finish outstanding work and exit.
self.drain(&mut ring);
return;
}
FillResult::AtWaiterCapacity => {
// The waiter table is full, so wait for a completion (bounded by the next
// deadline) unless arming reports a latched wake.
// NOTE(review): exact semantics of `arm`/`wake_latched` live in `waker` — confirm.
let arm = self.waker.arm(self.processed_seq);
if !arm.wake_latched() {
self.submit_and_wait(&mut ring, 1, self.timeout_wheel.next_deadline())
.expect("unable to submit to ring");
}
continue;
}
FillResult::AtSubmissionQueueCapacity => {
// Flush the full submission queue without waiting, then go around again.
self.submit(&mut ring).expect("unable to submit to ring");
continue;
}
FillResult::Drained => {
// Nothing is at capacity; fall through to the idle/blocking logic below.
}
}
if self.waiters.is_empty() {
// No requests are tracked: spin first, and park if spinning found nothing.
if self
.idle_spinner
.spin(|| self.waker.pending(self.processed_seq))
{
continue;
}
if let Some(park_duration) = self.waker.park_idle(self.processed_seq) {
self.idle_spinner.on_wake(park_duration);
}
continue;
}
// Requests are outstanding: arm the waker and wait for a completion (bounded by the
// next deadline) only if arming reports the loop as still idle.
// NOTE(review): exact semantics of `still_idle` live in `waker` — confirm.
let arm = self.waker.arm(self.processed_seq);
if arm.still_idle() {
self.submit_and_wait(&mut ring, 1, self.timeout_wheel.next_deadline())
.expect("unable to submit to ring");
}
}
}
/// Registers `request` in the waiter table and, if it has a deadline, schedules it on
/// the timeout wheel.
///
/// Returns `None` when the wheel yields no target tick for the request's deadline; in
/// that case the request is completed through `Request::timeout` and never admitted.
fn admit_request(&mut self, request: Request) -> Option<WaiterId> {
    let deadline = request.deadline();
    let target_tick = match deadline {
        None => None,
        Some(deadline) => {
            let Some(tick) = self.timeout_wheel.target_tick(deadline) else {
                request.timeout();
                return None;
            };
            Some(tick)
        }
    };

    let waiter_id = self.waiters.insert(request, target_tick);
    if let Some(tick) = target_tick {
        self.timeout_wheel.schedule(waiter_id, tick);
    }
    Some(waiter_id)
}
/// Stages the waiter identified by `waiter_id` and acts on the outcome: pushes the
/// produced SQE, completes the request as timed out, or releases the wheel entry of an
/// orphaned waiter.
///
/// # Panics
///
/// Panics if the SQE cannot be pushed; callers only invoke this while the submission
/// queue has room.
fn stage_request(&mut self, waiter_id: WaiterId, submission_queue: &mut SubmissionQueue<'_>) {
    let outcome = self.waiters.stage(waiter_id);
    match outcome {
        StageOutcome::Submit(entry) => {
            // SAFETY: `push` requires everything the SQE refers to to stay valid until
            // the operation completes.
            // NOTE(review): presumably `Waiters` keeps the request (buffers, fd) alive
            // until its CQE is handled — confirm in `waiter`.
            let pushed = unsafe { submission_queue.push(&entry) };
            pushed.expect("unable to push to queue");
        }
        StageOutcome::Timeout(expired) => expired.timeout(),
        StageOutcome::Orphaned {
            target_tick: Some(tick),
        } => {
            self.timeout_wheel.remove(tick);
        }
        StageOutcome::Orphaned { target_tick: None } => {}
    }
}
/// Stages queued-for-retry waiters until the ready queue is empty or the submission
/// queue is full.
///
/// Returns `true` if ready waiters are still queued when the submission queue fills up.
fn stage_ready_requests(&mut self, submission_queue: &mut SubmissionQueue<'_>) -> bool {
    loop {
        if submission_queue.is_full() {
            return !self.ready_queue.is_empty();
        }
        match self.ready_queue.pop_front() {
            Some(waiter_id) => self.stage_request(waiter_id, submission_queue),
            None => return false,
        }
    }
}
/// Fills the submission queue in priority order: wake-poll reinstall, pending
/// cancellations, requeued waiters, then fresh requests from the channel.
///
/// Returns a `FillResult` describing why filling stopped.
///
/// # Panics
///
/// Panics if the waker reports a published request that the channel does not contain,
/// or if aligning an idle timeout wheel unexpectedly yields expired entries.
fn fill_submission_queue(&mut self, ring: &mut IoUring) -> FillResult {
let mut submission_queue = ring.submission();
// A wheel that already tracks a deadline is treated as aligned with the current time.
let mut wheel_aligned = self.timeout_wheel.next_deadline().is_some();
if self.wake_rearm_needed {
// NOTE(review): `reinstall` presumably returns false when the queue has no room — confirm.
if !self.waker.reinstall(&mut submission_queue) {
return FillResult::AtSubmissionQueueCapacity;
}
self.wake_rearm_needed = false;
}
// Cancellations go first; stop early if they could not all be staged.
if self.stage_cancellations(&mut submission_queue) {
return FillResult::from_fill_state(&self.waiters, &submission_queue, &self.receiver);
}
// Then requeued waiters; stop early if they could not all be staged.
if self.stage_ready_requests(&mut submission_queue) {
return FillResult::from_fill_state(&self.waiters, &submission_queue, &self.receiver);
}
// Finally admit fresh requests while both the waiter table and the queue have room.
while !self.waiters.is_full() && !submission_queue.is_full() {
let request = match self.receiver.try_recv() {
Ok(request) => request,
Err(TryRecvError::Disconnected) => return FillResult::Disconnected,
Err(TryRecvError::Empty) => {
// The channel looked empty. If the waker says a request beyond `processed_seq`
// has been published, it must be receivable now, so retry once.
if !self.waker.pending(self.processed_seq) {
break;
}
self.receiver.try_recv().expect(
"published-ahead sequence observed after acquire, but channel had no request",
)
}
};
// Count the request as processed, wrapping within the sequence mask.
self.processed_seq = self.processed_seq.wrapping_add(1) & SUBMISSION_SEQ_MASK;
if !wheel_aligned && request.has_deadline() {
// The wheel had no tracked deadline, so bring it up to the current time before
// scheduling; nothing can be expired in that state.
assert!(self.timeout_wheel.advance(Instant::now()).is_none());
wheel_aligned = true;
}
if let Some(waiter_id) = self.admit_request(request) {
self.stage_request(waiter_id, &mut submission_queue);
}
}
FillResult::from_fill_state(&self.waiters, &submission_queue, &self.receiver)
}
/// Pushes an `AsyncCancel` SQE for each pending cancellation until the list is empty or
/// the submission queue is full. Entries whose waiter is not in flight are dropped
/// without submitting anything.
///
/// Returns `true` if cancellations are still pending when the submission queue fills up.
fn stage_cancellations(&mut self, submission_queue: &mut SubmissionQueue<'_>) -> bool {
    loop {
        if submission_queue.is_full() {
            return !self.pending_cancels.is_empty();
        }
        let Some(waiter_id) = self.pending_cancels.pop_front() else {
            return false;
        };
        if self.waiters.is_in_flight(waiter_id) {
            // Target the waiter's SQE by its user data and tag the cancel itself with
            // the waiter's dedicated cancel user data.
            let cancel_sqe = AsyncCancel::new(waiter_id.user_data())
                .build()
                .user_data(waiter_id.cancel_user_data());
            // SAFETY: the cancel entry is built only from integer user-data values.
            // NOTE(review): confirm it references no caller-owned memory.
            let pushed = unsafe { submission_queue.push(&cancel_sqe) };
            pushed.expect("unable to push cancel to queue");
        }
    }
}
/// Processes one completion queue entry.
///
/// Wake CQEs are acknowledged on the waker, and a re-arm is requested when the entry
/// does not carry the "more" flag. Every other CQE is routed to the waiter table.
///
/// # Panics
///
/// Panics if a wake CQE carries a negative result.
fn handle_cqe(&mut self, cqe: CqueueEntry) {
    if cqe.user_data() == WAKE_USER_DATA {
        assert!(
            cqe.result() >= 0,
            "wake poll CQE failed: requires multishot poll (Linux 5.13+)"
        );
        self.waker.acknowledge();
        let poll_still_armed = io_uring::cqueue::more(cqe.flags());
        if !poll_still_armed {
            self.wake_rearm_needed = true;
        }
        return;
    }

    let outcome = self.waiters.on_completion(cqe.user_data(), cqe.result());
    match outcome {
        CompletionOutcome::Complete {
            request,
            target_tick,
        } => {
            // Release the wheel entry before reporting the result to the caller.
            if let Some(tick) = target_tick {
                self.timeout_wheel.remove(tick);
            }
            request.complete();
        }
        CompletionOutcome::Requeue(waiter_id) => self.ready_queue.push_back(waiter_id),
        CompletionOutcome::Cancel => {}
    }
}
/// Advances the timeout wheel to the current time and cancels every waiter whose entry
/// expired.
///
/// Does nothing when the wheel reports no next deadline. Expired entries that the waiter
/// table refuses to cancel are ignored; cancelled waiters that are in flight are queued
/// for an `AsyncCancel`.
fn advance_timeouts(&mut self) {
    let wheel_active = self.timeout_wheel.next_deadline().is_some();
    if !wheel_active {
        return;
    }
    let Some(expired) = self.timeout_wheel.advance(Instant::now()) else {
        return;
    };
    for entry in expired {
        let waiter_id = entry.waiter_id;
        if !self.waiters.cancel(waiter_id) {
            continue;
        }
        self.timeout_wheel.remove(entry.target_tick);
        if self.waiters.is_in_flight(waiter_id) {
            self.pending_cancels.push_back(waiter_id);
        }
    }
}
/// Finishes outstanding work after the request channel has disconnected.
///
/// Loops until no waiters remain or the optional `shutdown_timeout` budget is used up,
/// then records the number of remaining waiters in the metrics gauge.
///
/// # Panics
///
/// Panics if a submission fails with an error that `submit_and_wait` does not treat as
/// transient.
fn drain(&mut self, ring: &mut IoUring) {
// Remaining shutdown budget; `None` means the drain is not bounded by a budget.
let mut remaining = self.cfg.shutdown_timeout;
loop {
for cqe in ring.completion() {
self.handle_cqe(cqe);
}
if self.waiters.is_empty() {
break;
}
// Give up once the budget is exhausted.
if remaining.is_some_and(|t| t.is_zero()) {
break;
}
self.advance_timeouts();
{
// Only cancellations and requeued waiters are staged; no new requests are admitted.
let mut submission_queue = ring.submission();
self.stage_cancellations(&mut submission_queue);
self.stage_ready_requests(&mut submission_queue);
}
if self.waiters.is_empty() {
break;
}
// Wait no longer than the smaller of the remaining budget and the next deadline.
let timeout = match (remaining, self.timeout_wheel.next_deadline()) {
(Some(remaining), Some(deadline)) => Some(remaining.min(deadline)),
(Some(remaining), None) => Some(remaining),
(None, Some(deadline)) => Some(deadline),
(None, None) => None,
};
let start = Instant::now();
self.submit_and_wait(ring, 1, timeout)
.expect("unable to submit to ring");
// Charge the time spent waiting against the budget.
if let Some(remaining) = remaining.as_mut() {
*remaining = remaining.saturating_sub(start.elapsed());
}
}
self.metrics.pending_operations.set(self.waiters.len() as _);
}
/// Submits staged SQEs and waits for at least `want` completions, optionally bounded by
/// `timeout`.
///
/// Returns `Ok(false)` only when a timeout was supplied and the wait ended with `ETIME`.
/// `EINTR`, `EAGAIN` and `EBUSY` are reported as `Ok(true)` so the caller simply goes
/// around its loop again.
///
/// # Errors
///
/// Returns any other error from the ring unchanged.
fn submit_and_wait(
    &self,
    ring: &mut IoUring,
    want: usize,
    timeout: Option<Duration>,
) -> Result<bool, std::io::Error> {
    let outcome = match timeout {
        None => ring.submit_and_wait(want),
        Some(limit) => {
            let ts = Timespec::new()
                .sec(limit.as_secs())
                .nsec(limit.subsec_nanos());
            let args = SubmitArgs::new().timespec(&ts);
            ring.submitter().submit_with_args(want, &args)
        }
    };
    match outcome {
        Ok(_) => Ok(true),
        Err(err) => match err.raw_os_error() {
            // An expired wait is only expected when a timeout was requested.
            Some(libc::ETIME) if timeout.is_some() => Ok(false),
            Some(libc::EINTR | libc::EAGAIN | libc::EBUSY) => Ok(true),
            _ => Err(err),
        },
    }
}
/// Submits staged SQEs without waiting for any completion.
#[inline]
fn submit(&self, ring: &mut IoUring) -> Result<(), std::io::Error> {
    self.submit_and_wait(ring, 0, None)?;
    Ok(())
}
}
/// Builds an io_uring instance with `cfg.size` entries, applying the setup flags that
/// `cfg` asks for.
///
/// # Errors
///
/// Returns the error reported by the ring builder.
fn new_ring(cfg: &Config) -> Result<IoUring, std::io::Error> {
    let mut builder = IoUring::builder();
    if cfg.io_poll {
        builder.setup_iopoll();
    }
    if cfg.single_issuer {
        // Single-issuer mode is always paired with deferred task running here.
        builder.setup_single_issuer();
        builder.setup_defer_taskrun();
    }
    builder.build(cfg.size)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{telemetry::metrics::Registry, IoBuf, IoBufMut};
use commonware_utils::channel::oneshot::{self, error::RecvError};
use futures::future::{join, join_all};
use request::{RecvRequest, SendRequest, SyncRequest};
use std::{
io::Write,
os::{
fd::{AsRawFd, FromRawFd, IntoRawFd},
unix::net::UnixStream,
},
time::Duration,
};
// Pins the channel behavior the loop relies on: after the only sender is dropped,
// `try_recv` still yields every buffered message before reporting `Disconnected`.
#[test]
fn test_bounded_mpsc_drains_all_buffered_messages_before_disconnected() {
let (sender, mut receiver) = mpsc::channel(2);
sender
.try_send(41)
.expect("first buffered send should succeed");
sender
.try_send(42)
.expect("second buffered send should succeed");
drop(sender);
assert_eq!(receiver.try_recv(), Ok(41));
assert_eq!(receiver.try_recv(), Ok(42));
assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
}
// Two producers enqueue in the order A, B, but B publishes on the waker before A does.
// The consumer must still receive A's message first and only see the second message as
// pending after A publishes as well.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_pending_recheck_after_empty_preserves_fifo_order_across_producers() {
let (sender_a, mut receiver) = mpsc::channel(2);
let sender_b = sender_a.clone();
let waker = Waker::new().expect("eventfd creation should succeed");
assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
let (a_enqueued_tx, a_enqueued_rx) = tokio::sync::oneshot::channel();
let (allow_a_publish_tx, allow_a_publish_rx) = tokio::sync::oneshot::channel();
let waker_a = waker.clone();
// Producer A enqueues immediately but holds back its publish until released.
let producer_a = tokio::spawn(async move {
sender_a
.send(1u8)
.await
.expect("producer A should enqueue before publishing");
a_enqueued_tx
.send(())
.expect("producer A enqueue signal should be received");
allow_a_publish_rx
.await
.expect("producer A publish should be released");
waker_a.publish();
});
let waker_b = waker.clone();
// Producer B enqueues after A and publishes right away.
let producer_b = tokio::spawn(async move {
a_enqueued_rx
.await
.expect("producer B should wait for producer A enqueue");
sender_b
.send(2u8)
.await
.expect("producer B should enqueue after producer A");
waker_b.publish();
});
while !waker.pending(0) {
tokio::task::yield_now().await;
}
assert_eq!(receiver.try_recv(), Ok(1));
assert!(!waker.pending(1));
allow_a_publish_tx
.send(())
.expect("producer A publish should be unblocked");
while !waker.pending(1) {
tokio::task::yield_now().await;
}
assert_eq!(receiver.try_recv(), Ok(2));
producer_a.await.expect("producer A task should finish");
producer_b.await.expect("producer B task should finish");
}
// `IoUringLoop::new` rounds a non-power-of-two size up and leaves a power of two unchanged.
#[test]
fn test_iouring_loop_rounds_ring_size_up_to_power_of_two() {
let mut registry = Registry::default();
let cfg = Config {
size: 1_000,
..Default::default()
};
let (_, iouring) = IoUringLoop::new(cfg, &mut registry);
assert_eq!(iouring.cfg.size, 1_024);
let cfg = Config {
size: 1_024,
..Default::default()
};
let (_, iouring) = IoUringLoop::new(cfg, &mut registry);
assert_eq!(iouring.cfg.size, 1_024);
}
// A size just above `MAX_RING_SIZE` must trip the ring-size assertion in `IoUringLoop::new`.
#[test]
#[should_panic(expected = "rounded ring size must be at most")]
fn test_iouring_loop_rejects_sizes_that_exceed_max_ring_size() {
let mut registry = Registry::default();
let cfg = Config {
size: MAX_RING_SIZE + 1,
..Default::default()
};
let _ = IoUringLoop::new(cfg, &mut registry);
}
// With a timeout supplied, an error other than `ETIME` must surface as an error rather
// than being reported as a timeout.
#[test]
fn test_submit_and_wait_non_etime_error_is_not_misclassified() {
let cfg = Config::default();
let mut registry = Registry::default();
let (_submitter, iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
// Close the ring fd behind the ring's back so the next submit fails.
let close_result = unsafe { libc::close(ring.as_raw_fd()) };
assert_eq!(close_result, 0, "failed to close ring fd in test");
let err = iouring
.submit_and_wait(&mut ring, 1, Some(Duration::from_millis(1)))
.expect_err("submit_and_wait should fail on closed ring fd");
assert_ne!(err.raw_os_error(), Some(libc::ETIME));
// The fd is already closed; skip the ring's destructor.
std::mem::forget(ring);
}
// Exercises the `io_poll` branch of `new_ring`; either outcome is accepted as long as a
// failure carries an OS error code.
#[test]
fn test_new_ring_iopoll_builder_path_is_exercised() {
let cfg = Config {
io_poll: true,
..Default::default()
};
match new_ring(&cfg) {
Ok(_ring) => {}
Err(err) => assert!(err.raw_os_error().is_some()),
}
}
// With the waiter table full and one pending cancel per waiter, the wake-poll reinstall
// plus cancels fill the submission queue; the result must be `AtWaiterCapacity` with
// cancels left over.
#[test]
fn test_fill_submission_queue_returns_waiter_capacity_when_cancel_staging_fills_sq() {
let cfg = Config {
size: 8,
..Default::default()
};
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
for _ in 0..cfg.size as usize {
let (sock_left, _sock_right) =
UnixStream::pair().expect("failed to create unix socket pair");
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, _rx) = oneshot::channel();
let request = Request::Sync(SyncRequest {
file: Arc::new(file),
result: None,
sender: tx,
});
let waiter_id = iouring.waiters.insert(request, None);
assert!(matches!(
iouring.waiters.stage(waiter_id),
StageOutcome::Submit(_)
));
assert!(
iouring.waiters.cancel(waiter_id),
"cancel should transition waiter to cancel-requested"
);
iouring.pending_cancels.push_back(waiter_id);
}
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::AtWaiterCapacity);
assert!(!iouring.pending_cancels.is_empty());
}
// With the waiter table full and every waiter on the ready queue, staging fills the
// submission queue; the result must be `AtWaiterCapacity` with ready entries left over.
#[test]
fn test_fill_submission_queue_returns_waiter_capacity_when_ready_staging_fills_sq() {
let cfg = Config {
size: 8,
..Default::default()
};
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
for _ in 0..cfg.size as usize {
let (sock_left, _sock_right) =
UnixStream::pair().expect("failed to create unix socket pair");
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, _rx) = oneshot::channel();
let request = Request::Sync(SyncRequest {
file: Arc::new(file),
result: None,
sender: tx,
});
let waiter_id = iouring.waiters.insert(request, None);
iouring.ready_queue.push_back(waiter_id);
}
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::AtWaiterCapacity);
assert!(!iouring.ready_queue.is_empty());
}
// When cancel or ready staging returns early (before the receiver is polled) and the
// handle has been dropped, a full waiter table must still be reported as `Disconnected`.
// Runs once for each early-return path.
#[test]
fn test_fill_submission_queue_returns_disconnected_when_early_return_skips_receiver() {
#[derive(Debug)]
enum EarlyReturnPath {
Cancel,
Ready,
}
for path in [EarlyReturnPath::Cancel, EarlyReturnPath::Ready] {
let cfg = Config {
size: 8,
..Default::default()
};
let mut registry = Registry::default();
let (submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
match path {
EarlyReturnPath::Cancel => {
for _ in 0..cfg.size as usize {
let (sock_left, _sock_right) =
UnixStream::pair().expect("failed to create unix socket pair");
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, _rx) = oneshot::channel();
let request = Request::Sync(SyncRequest {
file: Arc::new(file),
result: None,
sender: tx,
});
let waiter_id = iouring.waiters.insert(request, None);
assert!(matches!(
iouring.waiters.stage(waiter_id),
StageOutcome::Submit(_)
));
assert!(
iouring.waiters.cancel(waiter_id),
"cancel should transition waiter to cancel-requested"
);
iouring.pending_cancels.push_back(waiter_id);
}
}
EarlyReturnPath::Ready => {
for _ in 0..cfg.size as usize {
let (sock_left, _sock_right) =
UnixStream::pair().expect("failed to create unix socket pair");
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, _rx) = oneshot::channel();
let request = Request::Sync(SyncRequest {
file: Arc::new(file),
result: None,
sender: tx,
});
let waiter_id = iouring.waiters.insert(request, None);
iouring.ready_queue.push_back(waiter_id);
}
}
}
// Dropping the only handle closes the (empty) request channel.
drop(submitter);
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::Disconnected, "{path:?}");
match path {
EarlyReturnPath::Cancel => {
assert!(!iouring.pending_cancels.is_empty(), "{path:?}");
}
EarlyReturnPath::Ready => {
assert!(!iouring.ready_queue.is_empty(), "{path:?}");
}
}
}
}
// With the wake-poll reinstall taking one slot, admitting fresh requests fills the
// submission queue before the waiter table; the result must be
// `AtSubmissionQueueCapacity`.
#[test]
fn test_fill_submission_queue_returns_submission_queue_capacity_when_fresh_staging_fills_sq() {
let cfg = Config {
size: 8,
..Default::default()
};
let mut registry = Registry::default();
let (submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
futures::executor::block_on(async {
for _ in 0..cfg.size as usize {
let (sock_left, _sock_right) =
UnixStream::pair().expect("failed to create unix socket pair");
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, _rx) = oneshot::channel();
submitter
.enqueue(Request::Sync(SyncRequest {
file: Arc::new(file),
result: None,
sender: tx,
}))
.await
.expect("failed to enqueue request");
}
});
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::AtSubmissionQueueCapacity);
assert!(ring.submission().is_full());
assert!(iouring.waiters.len() < cfg.size as usize);
}
// A waiter table that is already full yields `AtWaiterCapacity` without anything being
// pushed to the submission queue.
#[test]
fn test_fill_submission_queue_returns_waiter_capacity_when_waiters_are_full() {
let cfg = Config {
size: 8,
..Default::default()
};
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
// Skip the wake-poll reinstall so it does not occupy a submission queue slot.
iouring.wake_rearm_needed = false;
for _ in 0..cfg.size as usize {
let (sock_left, _sock_right) =
UnixStream::pair().expect("failed to create unix socket pair");
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, _rx) = oneshot::channel();
let request = Request::Sync(SyncRequest {
file: Arc::new(file),
result: None,
sender: tx,
});
iouring.waiters.insert(request, None);
}
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::AtWaiterCapacity);
assert_eq!(ring.submission().len(), 0);
}
// When fresh requests fill both the waiter table and the submission queue at once,
// waiter capacity takes precedence in the reported result.
#[test]
fn test_fill_submission_queue_returns_waiter_capacity_when_fresh_staging_fills_everything() {
let cfg = Config {
size: 8,
..Default::default()
};
let mut registry = Registry::default();
let (submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
// Skip the wake-poll reinstall so it does not occupy a submission queue slot.
iouring.wake_rearm_needed = false;
futures::executor::block_on(async {
for _ in 0..cfg.size as usize {
let (sock_left, _sock_right) =
UnixStream::pair().expect("failed to create unix socket pair");
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, _rx) = oneshot::channel();
submitter
.enqueue(Request::Sync(SyncRequest {
file: Arc::new(file),
result: None,
sender: tx,
}))
.await
.expect("failed to enqueue request");
}
});
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::AtWaiterCapacity);
assert!(ring.submission().is_full());
assert_eq!(iouring.waiters.len(), cfg.size as usize);
}
// A pending cancel for a waiter that was never staged (so is not in flight) is dropped
// without pushing an `AsyncCancel`; staging that waiter afterwards reports a timeout.
#[test]
fn test_fill_submission_queue_skips_cancel_for_ready_queue_timeout() {
let cfg = Config::default();
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
iouring.wake_rearm_needed = false;
let (sock_left, _sock_right) =
UnixStream::pair().expect("failed to create unix socket pair");
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, _rx) = oneshot::channel();
let request = Request::Sync(SyncRequest {
file: Arc::new(file),
result: None,
sender: tx,
});
let waiter_id = iouring.waiters.insert(request, None);
assert!(iouring.waiters.cancel(waiter_id));
iouring.pending_cancels.push_back(waiter_id);
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::Drained);
assert!(iouring.pending_cancels.is_empty());
assert_eq!(ring.submission().len(), 0);
assert!(matches!(
iouring.waiters.stage(waiter_id),
StageOutcome::Timeout(_)
));
}
// A request whose deadline is already in the past is completed with `Error::Timeout`
// during admission, without becoming a waiter or reaching the submission queue.
#[tokio::test]
async fn test_fill_submission_queue_expired_deadline_completes_immediately() {
let cfg = Config::default();
let mut registry = Registry::default();
let (submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
iouring.wake_rearm_needed = false;
let (tx, rx) = oneshot::channel();
let past_deadline = Instant::now()
.checked_sub(Duration::from_secs(1))
.unwrap_or_else(Instant::now);
submitter
.enqueue(Request::Send(SendRequest {
fd: Arc::new(UnixStream::pair().unwrap().0.into()),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: Some(past_deadline),
result: None,
sender: tx,
}))
.await
.expect("failed to enqueue request");
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::Drained);
assert!(iouring.pending_cancels.is_empty());
assert!(iouring.waiters.is_empty());
assert_eq!(ring.submission().len(), 0);
let result = rx.await.expect("missing timeout completion");
assert!(matches!(result, Err(crate::Error::Timeout)));
}
// `Handle::recv` must panic both when `offset > len` and when `len` exceeds the
// buffer's capacity.
#[test]
fn test_handle_recv_panics_on_invalid_buffer_bounds() {
let cfg = Config::default();
let mut registry = Registry::default();
let (handle, io_loop) = IoUringLoop::new(cfg, &mut registry);
drop(io_loop);
// offset (5) > len (4)
let offset_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let (left, _right) = UnixStream::pair().unwrap();
let _ = futures::executor::block_on(handle.recv(
Arc::new(left.into()),
IoBufMut::with_capacity(4),
5,
4,
true,
Instant::now() + Duration::from_secs(1),
));
}));
assert!(offset_panic.is_err());
// len (5) > capacity (4)
let capacity_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let (left, _right) = UnixStream::pair().unwrap();
let _ = futures::executor::block_on(handle.recv(
Arc::new(left.into()),
IoBufMut::with_capacity(4),
0,
5,
true,
Instant::now() + Duration::from_secs(1),
));
}));
assert!(capacity_panic.is_err());
}
// End-to-end check against a running loop: after a first receive completes, a second
// receive with an 80 ms deadline must time out, and not before at least 50 ms have
// elapsed.
// NOTE(review): the test name implies the second request reuses the first one's waiter
// slot; that reuse is not asserted here.
#[tokio::test]
async fn test_timeout_slot_reuse_does_not_cancel_new_waiter_early() {
let cfg = Config {
size: 8,
max_request_timeout: Duration::from_millis(200),
timeout_wheel_tick: Duration::from_millis(5),
..Default::default()
};
let mut registry = Registry::default();
let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
let handle = std::thread::spawn(move || iouring.run());
let (left1, right1) = UnixStream::pair().unwrap();
(&right1).write_all(&[42]).unwrap();
let (_buf1, read1) = submitter
.recv(
Arc::new(left1.into()),
IoBufMut::with_capacity(1),
0,
1,
false,
Instant::now() + Duration::from_millis(200),
)
.await
.expect("first recv should succeed");
assert!(read1 > 0);
// Nothing is ever written to the second socket, so this receive can only time out.
let (left2, _right2) = UnixStream::pair().expect("failed to create unix stream pair");
let start = Instant::now();
let result2 = submitter
.recv(
Arc::new(left2.into()),
IoBufMut::with_capacity(8),
0,
8,
false,
Instant::now() + Duration::from_millis(80),
)
.await;
let elapsed = start.elapsed();
assert!(matches!(result2, Err((_, crate::Error::Timeout))));
assert!(
elapsed >= Duration::from_millis(50),
"timeout fired too early after slot reuse: {elapsed:?}"
);
// Dropping the handle lets the loop shut down so the thread can be joined.
drop(submitter);
handle.join().expect("io_uring loop thread panicked");
}
// Staging a waiter id whose slot has since been reused by a different waiter must panic
// and must not put the new occupant in flight.
#[test]
fn test_stage_request_panics_on_stale_ready_queue_entry() {
let cfg = Config::default();
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
let (sock_left, _sock_right) = UnixStream::pair().unwrap();
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, _rx) = oneshot::channel();
let stale = iouring.waiters.insert(
Request::Sync(SyncRequest {
file: Arc::new(file),
result: None,
sender: tx,
}),
None,
);
assert!(matches!(
iouring.waiters.stage(stale),
StageOutcome::Submit(_)
));
// Complete the first waiter so its slot becomes free.
match iouring.waiters.on_completion(stale.user_data(), 0) {
CompletionOutcome::Complete { request, .. } => request.complete(),
_ => panic!("sync waiter should complete immediately"),
}
let (sock_left, _sock_right) = UnixStream::pair().unwrap();
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, _rx) = oneshot::channel();
let reused = iouring.waiters.insert(
Request::Sync(SyncRequest {
file: Arc::new(file),
result: None,
sender: tx,
}),
None,
);
// Same slot index, different waiter id.
assert_eq!(reused.index(), stale.index());
assert_ne!(reused, stale);
let stale_ready = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut sq = ring.submission();
iouring.stage_request(stale, &mut sq);
}));
assert!(stale_ready.is_err());
assert!(!iouring.waiters.is_in_flight(reused));
}
// A wheel entry left behind by a completed waiter (tick 1) must not cancel the waiter
// that later reuses the same slot with a later deadline (tick 3); only the new entry's
// own expiry may queue a cancel.
#[test]
fn test_advance_timeouts_ignores_stale_entry_after_slot_reuse() {
let cfg = Config {
max_request_timeout: Duration::from_secs(1),
timeout_wheel_tick: Duration::from_millis(100),
..Default::default()
};
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg, &mut registry);
let (sock_left, _) = UnixStream::pair().unwrap();
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (old_tx, _old_rx) = oneshot::channel();
let old_req = Request::Sync(SyncRequest {
file: Arc::new(file),
result: None,
sender: old_tx,
});
let old_slot = iouring.waiters.insert(old_req, Some(1));
iouring.timeout_wheel.schedule(old_slot, 1);
assert!(matches!(
iouring.waiters.stage(old_slot),
StageOutcome::Submit(_)
));
// Complete the old waiter, mirroring what `handle_cqe` does on completion.
if let CompletionOutcome::Complete {
request,
target_tick: Some(tick),
} = iouring.waiters.on_completion(old_slot.user_data(), 0)
{
iouring.timeout_wheel.remove(tick);
request.complete();
}
let (sock_left2, _) = UnixStream::pair().unwrap();
let file2 = unsafe { std::fs::File::from_raw_fd(sock_left2.into_raw_fd()) };
let (tx, _rx) = oneshot::channel();
let req = Request::Sync(SyncRequest {
file: Arc::new(file2),
result: None,
sender: tx,
});
let slot_index = iouring.waiters.insert(req, Some(3));
assert_eq!(slot_index.index(), old_slot.index());
assert!(matches!(
iouring.waiters.stage(slot_index),
StageOutcome::Submit(_)
));
iouring.timeout_wheel.schedule(slot_index, 3);
// After a bit more than one tick nothing may be cancelled yet.
std::thread::sleep(iouring.cfg.timeout_wheel_tick + Duration::from_millis(10));
iouring.advance_timeouts();
assert!(iouring.pending_cancels.is_empty());
// After a bit more than two further ticks the new waiter's own deadline has expired.
std::thread::sleep((iouring.cfg.timeout_wheel_tick * 2) + Duration::from_millis(10));
iouring.advance_timeouts();
assert_eq!(iouring.pending_cancels.len(), 1);
}
/// Checks that a request which was marked cancelled but whose operation CQE
/// still reports success hands the successful result to the caller, and that
/// the trailing cancel CQE is reported as `CompletionOutcome::Cancel`.
#[test]
fn test_cancel_completion_returns_saved_op_result() {
    let cfg = Config::default();
    let mut registry = Registry::default();
    let (_submitter, mut iouring) = IoUringLoop::new(cfg, &mut registry);

    let (left, right) = UnixStream::pair().unwrap();
    (&right).write_all(b"hello").unwrap();
    let (tx, rx) = oneshot::channel();
    let req = Request::Recv(RecvRequest {
        fd: Arc::new(left.into()),
        buf: IoBufMut::with_capacity(5),
        offset: 0,
        len: 5,
        exact: false,
        deadline: Some(Instant::now() + Duration::from_secs(1)),
        result: None,
        sender: tx,
    });
    let slot_index = iouring.waiters.insert(req, Some(2));
    assert!(matches!(
        iouring.waiters.stage(slot_index),
        StageOutcome::Submit(_)
    ));
    assert!(
        iouring.waiters.cancel(slot_index),
        "cancel should transition active waiter"
    );
    iouring.timeout_wheel.schedule(slot_index, 2);
    iouring.timeout_wheel.remove(2);

    // Synthetic op CQE reporting 5 bytes. Panic when it does not complete the
    // request: silently skipping would leave the sender alive inside the
    // waiter table and make the `block_on(rx)` below wait forever.
    match iouring.waiters.on_completion(slot_index.user_data(), 5) {
        CompletionOutcome::Complete { request, .. } => request.complete(),
        _ => panic!("op CQE should complete the cancelled recv"),
    }

    // The cancel CQE arrives afterwards.
    assert!(matches!(
        iouring
            .waiters
            .on_completion(slot_index.cancel_user_data(), -libc::ECANCELED),
        CompletionOutcome::Cancel
    ));

    let (_, result) = futures::executor::block_on(rx)
        .expect("missing completion")
        .expect("recv should succeed");
    assert_eq!(result, 5);
    assert_eq!(iouring.waiters.len(), 0);
}
/// Covers the ordering "timeout fires, cancel SQE is staged, op CQE arrives,
/// cancel CQE arrives".
///
/// The op CQE must complete the request with no timeout tick left to remove,
/// the caller must see `Error::Timeout`, and the late cancel CQE must be
/// reported as `CompletionOutcome::Cancel`.
#[test]
fn test_staged_cancel_cqe_is_ignored_after_timeout_completion() {
let cfg = Config {
max_request_timeout: Duration::from_millis(100),
timeout_wheel_tick: Duration::from_millis(5),
..Default::default()
};
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
let (left, _right) = UnixStream::pair().unwrap();
let (tx, rx) = oneshot::channel();
let req = Request::Recv(RecvRequest {
fd: Arc::new(left.into()),
buf: IoBufMut::with_capacity(8),
offset: 0,
len: 8,
exact: true,
deadline: Some(Instant::now() + Duration::from_millis(25)),
result: None,
sender: tx,
});
let waiter_id = iouring.waiters.insert(req, Some(1));
assert!(matches!(
iouring.waiters.stage(waiter_id),
StageOutcome::Submit(_)
));
iouring.timeout_wheel.schedule(waiter_id, 1);
// Sleep slightly longer than one wheel tick so the tick-1 entry is due.
std::thread::sleep(iouring.cfg.timeout_wheel_tick + Duration::from_millis(2));
iouring.advance_timeouts();
assert_eq!(iouring.pending_cancels.len(), 1);
{
let mut submission_queue = ring.submission();
// NOTE(review): the `false` return presumably means the queue did not run
// out of room - confirm against `stage_cancellations`.
assert!(!iouring.stage_cancellations(&mut submission_queue));
assert_eq!(submission_queue.len(), 1);
}
assert!(iouring.pending_cancels.is_empty());
// Synthetic op CQE with 4 of the 8 requested bytes. The request must
// complete here, with `target_tick` already cleared.
match iouring.waiters.on_completion(waiter_id.user_data(), 4) {
CompletionOutcome::Complete {
request,
target_tick: None,
} => request.complete(),
_ => panic!("missing timeout completion from op CQE"),
}
// The cancel CQE shows up after the request is already gone.
assert!(matches!(
iouring
.waiters
.on_completion(waiter_id.cancel_user_data(), -libc::ECANCELED),
CompletionOutcome::Cancel
));
assert!(matches!(
futures::executor::block_on(rx).expect("missing completion"),
Err((_, crate::Error::Timeout))
));
assert_eq!(iouring.waiters.len(), 0);
}
/// Checks that enqueueing a request wakes a loop that is already waiting with
/// one unsatisfied recv in flight.
///
/// The first recv has no data available. Once `wait_until_eventfd_armed`
/// returns, a second recv whose data is already written is enqueued and must
/// complete within two seconds. The first recv is then satisfied as well.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_publish_wakes_eventfd_blocked_loop() {
let cfg = Config {
size: 2,
..Default::default()
};
let mut registry = Registry::default();
let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
// Clone the waker before `iouring` is moved into the loop thread.
let eventfd_waker = iouring.waker.clone();
let handle = std::thread::spawn(move || iouring.run());
let (pipe_left1, pipe_right1) = UnixStream::pair().unwrap();
let (tx1, rx1) = oneshot::channel();
// Nothing has been written to `pipe_right1` yet, so this recv stays pending.
submitter
.enqueue(Request::Recv(RecvRequest {
fd: Arc::new(pipe_left1.into()),
buf: IoBufMut::with_capacity(1),
offset: 0,
len: 1,
exact: true,
deadline: None,
result: None,
sender: tx1,
}))
.await
.unwrap();
waker::tests::wait_until_eventfd_armed(&eventfd_waker);
let (pipe_left2, pipe_right2) = UnixStream::pair().unwrap();
// Data is written before the request is enqueued, so the recv can finish
// as soon as the loop picks it up.
(&pipe_right2).write_all(&[9]).unwrap();
let (tx2, rx2) = oneshot::channel();
submitter
.enqueue(Request::Recv(RecvRequest {
fd: Arc::new(pipe_left2.into()),
buf: IoBufMut::with_capacity(1),
offset: 0,
len: 1,
exact: true,
deadline: None,
result: None,
sender: tx2,
}))
.await
.unwrap();
let (_, read2) = tokio::time::timeout(Duration::from_secs(2), rx2)
.await
.expect("published recv timed out")
.expect("missing published recv completion")
.expect("published recv should succeed");
assert_eq!(read2, 1);
// Now release the first recv.
(&pipe_right1).write_all(&[3]).unwrap();
let (_, read1) = tokio::time::timeout(Duration::from_secs(2), rx1)
.await
.expect("blocking recv timed out")
.expect("missing blocking recv completion")
.expect("blocking recv should succeed");
assert_eq!(read1, 1);
drop(pipe_right1);
drop(pipe_right2);
// Dropping the last handle lets `run` return so the join can finish.
drop(submitter);
handle.join().unwrap();
}
/// Two-part test around the `wake_rearm_needed` flag.
///
/// Part one fills a ring's submission queue with no-ops and checks that
/// `fill_submission_queue` reports `AtSubmissionQueueCapacity` and leaves the
/// flag set. Part two starts a real loop with the flag set, waits for
/// `wait_until_futex_armed`, and then checks that a request enqueued behind an
/// unsatisfied recv still completes.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_wake_reinstall_survives_submission_queue_full_and_idle_futex_deferral() {
{
let cfg = Config {
size: 8,
..Default::default()
};
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
iouring.wake_rearm_needed = true;
{
let mut submission_queue = ring.submission();
// Occupy every submission slot.
while !submission_queue.is_full() {
let nop = io_uring::opcode::Nop::new().build().user_data(0);
// SAFETY: a no-op entry carries no pointers or descriptors, so nothing
// has to outlive the push.
unsafe {
submission_queue
.push(&nop)
.expect("unable to fill submission queue");
}
}
}
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::AtSubmissionQueueCapacity);
// The flag must still be set after the call.
assert!(iouring.wake_rearm_needed);
}
let cfg = Config {
size: 2,
..Default::default()
};
let mut registry = Registry::default();
let (submitter, mut iouring) = IoUringLoop::new(cfg, &mut registry);
// Both handles are clones of the same waker; they are taken before
// `iouring` moves into the loop thread.
let idle_waker = iouring.waker.clone();
let eventfd_waker = iouring.waker.clone();
iouring.wake_rearm_needed = true;
let handle = std::thread::spawn(move || iouring.run());
waker::tests::wait_until_futex_armed(&idle_waker);
let (pipe_left1, pipe_right1) = UnixStream::pair().unwrap();
let (tx1, rx1) = oneshot::channel();
// No data is available for this recv yet, so it stays pending.
submitter
.enqueue(Request::Recv(RecvRequest {
fd: Arc::new(pipe_left1.into()),
buf: IoBufMut::with_capacity(1),
offset: 0,
len: 1,
exact: true,
deadline: None,
result: None,
sender: tx1,
}))
.await
.unwrap();
waker::tests::wait_until_eventfd_armed(&eventfd_waker);
let (pipe_left2, pipe_right2) = UnixStream::pair().unwrap();
// Data is written up front so the second recv can complete immediately.
(&pipe_right2).write_all(&[5]).unwrap();
let (tx2, rx2) = oneshot::channel();
submitter
.enqueue(Request::Recv(RecvRequest {
fd: Arc::new(pipe_left2.into()),
buf: IoBufMut::with_capacity(1),
offset: 0,
len: 1,
exact: true,
deadline: None,
result: None,
sender: tx2,
}))
.await
.unwrap();
let (_, read2) = tokio::time::timeout(Duration::from_secs(2), rx2)
.await
.expect("deferred-reinstall recv timed out")
.expect("missing deferred-reinstall recv completion")
.expect("deferred-reinstall recv should succeed");
assert_eq!(read2, 1);
// Release the first recv.
(&pipe_right1).write_all(&[3]).unwrap();
let (_, read1) = tokio::time::timeout(Duration::from_secs(2), rx1)
.await
.expect("blocking recv timed out")
.expect("missing blocking recv completion")
.expect("blocking recv should succeed");
assert_eq!(read1, 1);
drop(pipe_right1);
drop(pipe_right2);
drop(submitter);
handle.join().unwrap();
}
/// Starts an idle loop, waits until `wait_until_futex_armed` returns, and then
/// shuts it down in two ways: by dropping the handle alone, and by publishing
/// one sync request right before the drop. The loop thread must exit cleanly
/// in both cases, and in the second the request must get a completion.
#[test]
fn test_idle_shutdown_wakes_futex_wait_across_publish_then_drop_interleavings() {
    #[derive(Clone, Copy, Debug)]
    enum Scenario {
        DropOnly,
        PublishThenDrop,
    }

    for scenario in [Scenario::DropOnly, Scenario::PublishThenDrop] {
        let mut registry = Registry::default();
        let (submitter, iouring) = IoUringLoop::new(Config::default(), &mut registry);
        let idle_waker = iouring.waker.clone();
        let loop_thread = std::thread::spawn(move || iouring.run());
        waker::tests::wait_until_futex_armed(&idle_waker);

        // Optionally publish one request before the handle goes away. The
        // peer socket is returned too so that it stays open until the
        // completion has been observed.
        let published = match scenario {
            Scenario::DropOnly => None,
            Scenario::PublishThenDrop => {
                let (sock_left, sock_right) = UnixStream::pair().unwrap();
                // SAFETY: `into_raw_fd` gives up ownership of an open
                // descriptor, so the new `File` is its only owner.
                let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
                let (tx, rx) = oneshot::channel();
                let sync = Request::Sync(SyncRequest {
                    file: Arc::new(file),
                    result: None,
                    sender: tx,
                });
                futures::executor::block_on(submitter.enqueue(sync))
                    .expect("idle publish enqueue should succeed");
                Some((rx, sock_right))
            }
        };

        drop(submitter);

        if let Some((rx, _sock_right)) = published {
            // Only the arrival of a completion matters, not its value.
            let _completion = futures::executor::block_on(rx)
                .expect("missing published request completion");
        }

        if loop_thread.join().is_err() {
            panic!("io_uring loop thread panicked: {scenario:?}");
        }
    }
}
#[tokio::test]
async fn test_timeout() {
let cfg = Config {
max_request_timeout: Duration::from_secs(1),
..Default::default()
};
let mut registry = Registry::default();
let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
let handle = std::thread::spawn(move || iouring.run());
let (pipe_left, _pipe_right) = UnixStream::pair().unwrap();
assert!(matches!(
submitter
.recv(
Arc::new(pipe_left.into()),
IoBufMut::with_capacity(8),
0,
8,
false,
Instant::now() + Duration::from_secs(1),
)
.await,
Err((_, crate::Error::Timeout))
));
drop(submitter);
handle.join().unwrap();
}
/// With `shutdown_timeout` unset, a sync request enqueued just before the
/// handle is dropped must still deliver a completion, after which the loop
/// thread exits.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_no_timeout() {
    let mut registry = Registry::default();
    let (submitter, iouring) = IoUringLoop::new(
        Config {
            shutdown_timeout: None,
            ..Default::default()
        },
        &mut registry,
    );
    let loop_thread = std::thread::spawn(move || iouring.run());

    let (sock, _) = UnixStream::pair().unwrap();
    // SAFETY: `into_raw_fd` gives up ownership of an open descriptor, so the
    // new `File` is its only owner.
    let file = unsafe { std::fs::File::from_raw_fd(sock.into_raw_fd()) };
    let (done_tx, done_rx) = oneshot::channel();
    let sync = SyncRequest {
        file: Arc::new(file),
        result: None,
        sender: done_tx,
    };
    submitter.enqueue(Request::Sync(sync)).await.unwrap();
    drop(submitter);

    // Only the arrival of a completion is checked; its value is ignored.
    let _outcome = done_rx.await.unwrap();
    loop_thread.join().unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_no_timeout_continues_deadline_processing() {
let cfg = Config {
max_request_timeout: Duration::from_millis(250),
timeout_wheel_tick: Duration::from_millis(5),
shutdown_timeout: None,
..Default::default()
};
let mut registry = Registry::default();
let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
let handle = std::thread::spawn(move || iouring.run());
let (pipe_left, _pipe_right) = UnixStream::pair().unwrap();
let (tx, rx) = oneshot::channel();
submitter
.enqueue(Request::Recv(RecvRequest {
fd: Arc::new(pipe_left.into()),
buf: IoBufMut::with_capacity(8),
offset: 0,
len: 8,
exact: false,
deadline: Some(Instant::now() + Duration::from_millis(50)),
result: None,
sender: tx,
}))
.await
.unwrap();
drop(submitter);
let result = tokio::time::timeout(Duration::from_secs(2), rx)
.await
.expect("deadline completion timed out");
assert!(matches!(
result.expect("missing deadline completion"),
Err((_, crate::Error::Timeout))
));
handle.join().unwrap();
}
/// With a one-second `shutdown_timeout`, a recv that can never complete must
/// be abandoned after the handle is dropped: the caller's receiver resolves to
/// a `RecvError` instead of a result.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_timeout() {
    let cfg = Config {
        shutdown_timeout: Some(Duration::from_secs(1)),
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
    let handle = std::thread::spawn(move || iouring.run());

    // The peer is kept open and never written to; no deadline is set either.
    let (pipe_left, _pipe_right) = UnixStream::pair().unwrap();
    let (tx, rx) = oneshot::channel();
    submitter
        .enqueue(Request::Recv(RecvRequest {
            fd: Arc::new(pipe_left.into()),
            buf: IoBufMut::with_capacity(8),
            offset: 0,
            len: 8,
            exact: false,
            deadline: None,
            result: None,
            sender: tx,
        }))
        .await
        .unwrap();
    // Give the loop a moment to pick the request up before shutting down.
    tokio::time::sleep(Duration::from_millis(10)).await;
    drop(submitter);

    // Bound the wait, as the sibling shutdown tests do, so a regression fails
    // the test instead of hanging it. Five seconds leaves ample room over the
    // one-second shutdown timeout.
    let err = tokio::time::timeout(Duration::from_secs(5), rx)
        .await
        .expect("shutdown abandonment timed out")
        .unwrap_err();
    assert!(matches!(err, RecvError { .. }));
    handle.join().unwrap();
}
/// With `size: 1` and a 50 ms `shutdown_timeout`, a single never-satisfied
/// recv is enqueued and the handle is dropped once `wait_until_eventfd_armed`
/// returns. The caller's receiver must resolve to a `RecvError` within two
/// seconds and the loop thread must exit.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_timeout_when_waiters_full_and_channel_empty() {
let cfg = Config {
size: 1,
shutdown_timeout: Some(Duration::from_millis(50)),
..Default::default()
};
let mut registry = Registry::default();
let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
// Clone the waker before `iouring` is moved into the loop thread.
let eventfd_waker = iouring.waker.clone();
let handle = std::thread::spawn(move || iouring.run());
let (pipe_left, pipe_right) = UnixStream::pair().unwrap();
let (tx, rx) = oneshot::channel();
// Nothing is ever written to `pipe_right`, and no deadline is set.
submitter
.enqueue(Request::Recv(RecvRequest {
fd: Arc::new(pipe_left.into()),
buf: IoBufMut::with_capacity(8),
offset: 0,
len: 8,
exact: false,
deadline: None,
result: None,
sender: tx,
}))
.await
.unwrap();
waker::tests::wait_until_eventfd_armed(&eventfd_waker);
drop(submitter);
// The request is expected to be dropped without a result.
let err = tokio::time::timeout(Duration::from_secs(2), rx)
.await
.expect("shutdown abandonment timed out")
.unwrap_err();
assert!(matches!(err, RecvError { .. }));
drop(pipe_right);
handle.join().unwrap();
}
/// With `size: 1` and no shutdown timeout, a second request is enqueued while
/// the first recv is still unsatisfied, and the handle is then dropped. Both
/// requests must still complete successfully with one byte each.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_drains_buffered_request_behind_full_waiter_capacity() {
let cfg = Config {
size: 1,
shutdown_timeout: None,
..Default::default()
};
let mut registry = Registry::default();
let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
// Clone the waker before `iouring` is moved into the loop thread.
let eventfd_waker = iouring.waker.clone();
let handle = std::thread::spawn(move || iouring.run());
let (pipe_left1, pipe_right1) = UnixStream::pair().unwrap();
let (tx1, rx1) = oneshot::channel();
// The first recv has no data yet and therefore stays pending.
submitter
.enqueue(Request::Recv(RecvRequest {
fd: Arc::new(pipe_left1.into()),
buf: IoBufMut::with_capacity(1),
offset: 0,
len: 1,
exact: true,
deadline: None,
result: None,
sender: tx1,
}))
.await
.unwrap();
waker::tests::wait_until_eventfd_armed(&eventfd_waker);
let (pipe_left2, pipe_right2) = UnixStream::pair().unwrap();
// The second recv already has its byte waiting.
(&pipe_right2).write_all(&[7]).unwrap();
let (tx2, rx2) = oneshot::channel();
submitter
.enqueue(Request::Recv(RecvRequest {
fd: Arc::new(pipe_left2.into()),
buf: IoBufMut::with_capacity(1),
offset: 0,
len: 1,
exact: true,
deadline: None,
result: None,
sender: tx2,
}))
.await
.unwrap();
// Begin shutdown while both requests are outstanding, then release the
// first recv.
drop(submitter);
(&pipe_right1).write_all(&[3]).unwrap();
let result1 = tokio::time::timeout(Duration::from_secs(2), rx1)
.await
.expect("first recv timed out")
.expect("missing first recv completion");
let result2 = tokio::time::timeout(Duration::from_secs(2), rx2)
.await
.expect("buffered recv timed out")
.expect("missing buffered recv completion");
let (_, read1) = result1.expect("first recv should succeed");
let (_, read2) = result2.expect("buffered recv should succeed");
assert_eq!(read1, 1);
assert_eq!(read2, 1);
drop(pipe_right1);
drop(pipe_right2);
handle.join().unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_timeout_preserves_deadline_result() {
let cfg = Config {
max_request_timeout: Duration::from_millis(250),
timeout_wheel_tick: Duration::from_millis(5),
shutdown_timeout: Some(Duration::from_millis(500)),
..Default::default()
};
let mut registry = Registry::default();
let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
let handle = std::thread::spawn(move || iouring.run());
let (pipe_left, _pipe_right) = UnixStream::pair().unwrap();
let (tx, rx) = oneshot::channel();
submitter
.enqueue(Request::Recv(RecvRequest {
fd: Arc::new(pipe_left.into()),
buf: IoBufMut::with_capacity(8),
offset: 0,
len: 8,
exact: false,
deadline: Some(Instant::now() + Duration::from_millis(50)),
result: None,
sender: tx,
}))
.await
.unwrap();
drop(submitter);
let result = tokio::time::timeout(Duration::from_secs(2), rx)
.await
.expect("deadline completion timed out");
assert!(matches!(
result.expect("missing deadline completion"),
Err((_, crate::Error::Timeout))
));
handle.join().unwrap();
}
/// When the shutdown timeout (50 ms) is shorter than the request deadline
/// (500 ms), a recv pending at shutdown must be abandoned: the caller's
/// receiver resolves to a `RecvError`, not to a timeout result.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_timeout_abandons_timed_op_after_cutoff() {
    let mut registry = Registry::default();
    let (submitter, iouring) = IoUringLoop::new(
        Config {
            max_request_timeout: Duration::from_millis(750),
            timeout_wheel_tick: Duration::from_millis(5),
            shutdown_timeout: Some(Duration::from_millis(50)),
            ..Default::default()
        },
        &mut registry,
    );
    let loop_thread = std::thread::spawn(move || iouring.run());

    // The peer stays open and silent for the whole test.
    let (reader, _silent_peer) = UnixStream::pair().unwrap();
    let (done_tx, done_rx) = oneshot::channel();
    let recv = RecvRequest {
        fd: Arc::new(reader.into()),
        buf: IoBufMut::with_capacity(8),
        offset: 0,
        len: 8,
        exact: false,
        deadline: Some(Instant::now() + Duration::from_millis(500)),
        result: None,
        sender: done_tx,
    };
    submitter.enqueue(Request::Recv(recv)).await.unwrap();

    // Give the loop a moment to pick the request up before shutting down.
    tokio::time::sleep(Duration::from_millis(10)).await;
    drop(submitter);

    let abandoned = tokio::time::timeout(Duration::from_secs(2), done_rx)
        .await
        .expect("shutdown abandonment timed out")
        .unwrap_err();
    assert!(matches!(abandoned, RecvError { .. }));
    loop_thread.join().unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_deadline_timeout_ensure_enough_capacity() {
let cfg = Config {
size: 8,
max_request_timeout: Duration::from_millis(50),
..Default::default()
};
let mut registry = Registry::default();
let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
let handle = std::thread::spawn(move || iouring.run());
let total = 64usize;
let deadline = Instant::now() + Duration::from_millis(50);
let mut peers = Vec::with_capacity(total);
let mut recvs = Vec::with_capacity(total);
for _ in 0..total {
let (left, right) = UnixStream::pair().unwrap();
peers.push(right);
recvs.push({
let submitter = submitter.clone();
async move {
submitter
.recv(
Arc::new(left.into()),
IoBufMut::with_capacity(8),
0,
8,
false,
deadline,
)
.await
}
});
}
for result in tokio::time::timeout(Duration::from_secs(2), join_all(recvs))
.await
.expect("deadline completion timed out")
{
assert!(matches!(result, Err((_, crate::Error::Timeout))));
}
drop(peers);
drop(submitter);
handle.join().unwrap();
}
/// An exact 100-byte recv must keep going across partial reads: the writer
/// sends 40 bytes, pauses for 20 ms, then sends the remaining 60, and the recv
/// must report all 100 bytes.
#[tokio::test]
async fn test_exact_recv_partial_progress() {
    let mut registry = Registry::default();
    let (submitter, iouring) = IoUringLoop::new(Config::default(), &mut registry);
    let loop_thread = std::thread::spawn(move || iouring.run());

    let (reader, peer) = UnixStream::pair().unwrap();
    reader.set_nonblocking(true).unwrap();
    peer.set_nonblocking(true).unwrap();

    let expected_len = 100;
    let recv = submitter.recv(
        Arc::new(reader.into()),
        IoBufMut::with_capacity(expected_len),
        0,
        expected_len,
        true,
        Instant::now() + Duration::from_secs(5),
    );
    // Deliver the payload in two chunks with a pause in between.
    let writer = async {
        (&peer).write_all(&[1u8; 40]).unwrap();
        tokio::time::sleep(Duration::from_millis(20)).await;
        (&peer).write_all(&[2u8; 60]).unwrap();
    };

    let both = join(recv, writer);
    let (recv_result, ()) = tokio::time::timeout(Duration::from_secs(5), both)
        .await
        .expect("recv timed out");
    let (_, received) = recv_result.expect("recv should succeed");
    assert_eq!(received, expected_len);

    drop(submitter);
    loop_thread.join().unwrap();
}
/// With no shutdown timeout, an exact 100-byte recv that has only received 10
/// bytes when the handle is dropped must still finish once the remaining 90
/// bytes arrive during shutdown.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_no_timeout_processes_ready_queue() {
let cfg = Config {
shutdown_timeout: None,
..Default::default()
};
let mut registry = Registry::default();
let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
let handle = std::thread::spawn(move || iouring.run());
let (left, right) = UnixStream::pair().unwrap();
left.set_nonblocking(true).unwrap();
right.set_nonblocking(true).unwrap();
let total = 100usize;
let (tx, rx) = oneshot::channel();
submitter
.enqueue(Request::Recv(RecvRequest {
fd: Arc::new(left.into()),
buf: IoBufMut::with_capacity(total),
offset: 0,
len: total,
exact: true,
deadline: None,
result: None,
sender: tx,
}))
.await
.unwrap();
// First chunk: not enough to satisfy the exact read.
(&right).write_all(&[1u8; 10]).unwrap();
// Pause so the loop can observe the partial read before shutdown starts.
tokio::time::sleep(Duration::from_millis(20)).await;
drop(submitter);
// Pause again so the remaining bytes arrive after shutdown has begun.
tokio::time::sleep(Duration::from_millis(20)).await;
(&right).write_all(&[2u8; 90]).unwrap();
let (_, result) = tokio::time::timeout(Duration::from_secs(2), rx)
.await
.expect("shutdown recv timed out")
.expect("missing shutdown recv completion")
.expect("recv should succeed during shutdown");
assert_eq!(result, total);
handle.join().unwrap();
}
/// An exact 100-byte recv that only ever receives 10 bytes must end with
/// `Error::Timeout` when its 80 ms deadline passes, and must not hang.
#[tokio::test]
async fn test_timeout_fires_while_request_in_ready_queue() {
    let mut registry = Registry::default();
    let (submitter, iouring) = IoUringLoop::new(
        Config {
            max_request_timeout: Duration::from_millis(200),
            timeout_wheel_tick: Duration::from_millis(5),
            ..Default::default()
        },
        &mut registry,
    );
    let loop_thread = std::thread::spawn(move || iouring.run());

    let (reader, peer) = UnixStream::pair().unwrap();
    let recv = submitter.recv(
        Arc::new(reader.into()),
        IoBufMut::with_capacity(100),
        0,
        100,
        true,
        Instant::now() + Duration::from_millis(80),
    );
    // Only a tenth of the requested bytes is ever written.
    let writer = async {
        (&peer).write_all(&[1u8; 10]).unwrap();
    };

    let both = join(recv, writer);
    let (result, ()) = tokio::time::timeout(Duration::from_secs(5), both)
        .await
        .expect("recv should not hang");
    assert!(
        matches!(result, Err((_, crate::Error::Timeout))),
        "expected timeout, got {result:?}"
    );

    drop(submitter);
    loop_thread.join().unwrap();
}
/// Checks that a timeout firing while the request sits in the ready queue
/// (requeued after a partial read) queues no cancellation, and that staging
/// the request afterwards reports `StageOutcome::Timeout`.
#[test]
fn test_ready_queue_timeout_skips_cancel_staging() {
let cfg = Config {
max_request_timeout: Duration::from_millis(100),
timeout_wheel_tick: Duration::from_millis(5),
..Default::default()
};
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg, &mut registry);
let (left, _right) = UnixStream::pair().unwrap();
let (tx, _rx) = oneshot::channel();
let request = Request::Recv(RecvRequest {
fd: Arc::new(left.into()),
buf: IoBufMut::with_capacity(8),
offset: 0,
len: 8,
exact: true,
deadline: Some(Instant::now() + Duration::from_millis(25)),
result: None,
sender: tx,
});
let waiter_id = iouring.waiters.insert(request, Some(1));
assert!(matches!(
iouring.waiters.stage(waiter_id),
StageOutcome::Submit(_)
));
iouring.timeout_wheel.schedule(waiter_id, 1);
// Synthetic CQE with 4 of the 8 exact bytes: the waiter must ask to be
// requeued rather than complete.
let waiter_id = match iouring.waiters.on_completion(waiter_id.user_data(), 4) {
CompletionOutcome::Requeue(waiter_id) => waiter_id,
_ => panic!("missing partial recv completion"),
};
iouring.ready_queue.push_back(waiter_id);
// Sleep slightly longer than one wheel tick so the tick-1 entry is due.
std::thread::sleep(iouring.cfg.timeout_wheel_tick + Duration::from_millis(2));
iouring.advance_timeouts();
// No cancellation may be queued for a request that is only in the ready queue.
assert!(iouring.pending_cancels.is_empty());
assert!(matches!(
iouring.waiters.stage(waiter_id),
StageOutcome::Timeout(_)
));
}
/// Calls `drain` with a single, already-cancelled send waiting in the ready
/// queue. `drain` must return with no waiters left and nothing pushed to the
/// submission queue, and the caller must receive `Error::Timeout`.
#[test]
fn test_drain_breaks_after_local_ready_queue_timeout_finishes_last_waiter() {
    let cfg = Config {
        shutdown_timeout: None,
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
    let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
    iouring.wake_rearm_needed = false;

    let (done_tx, done_rx) = oneshot::channel();
    let send = Request::Send(SendRequest {
        fd: Arc::new(UnixStream::pair().unwrap().0.into()),
        write: IoBufs::from(IoBuf::from(b"hello")).into(),
        deadline: Some(Instant::now() + Duration::from_secs(1)),
        result: None,
        sender: done_tx,
    });
    let waiter_id = iouring.waiters.insert(send, Some(1));

    // Cancel the waiter before it is ever staged, then queue it as ready.
    assert!(iouring.waiters.cancel(waiter_id));
    iouring.ready_queue.push_back(waiter_id);

    iouring.drain(&mut ring);

    assert!(iouring.waiters.is_empty());
    assert_eq!(ring.submission().len(), 0);
    let outcome = futures::executor::block_on(done_rx).expect("missing timeout completion");
    assert!(matches!(outcome, Err(crate::Error::Timeout)));
}
#[tokio::test]
async fn test_fill_submission_queue_completes_cancelled_ready_queue_entry_locally() {
let cfg = Config::default();
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
iouring.wake_rearm_needed = false;
let (tx, rx) = oneshot::channel();
let request = Request::Send(SendRequest {
fd: Arc::new(UnixStream::pair().unwrap().0.into()),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: Some(Instant::now() + Duration::from_secs(1)),
result: None,
sender: tx,
});
let waiter_id = iouring.waiters.insert(request, Some(1));
assert!(iouring.waiters.cancel(waiter_id));
iouring.ready_queue.push_back(waiter_id);
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::Drained);
assert!(iouring.waiters.is_empty());
assert_eq!(ring.submission().len(), 0);
let result = rx.await.expect("missing timeout completion");
assert!(matches!(result, Err(crate::Error::Timeout)));
}
/// Enqueues requests whose result receiver has already been dropped and checks
/// that `fill_submission_queue` discards them without submitting anything.
///
/// This is done once with a deadline-carrying send and once with a `ReadAt`.
/// After each call the waiter table and submission queue must be empty and the
/// timeout wheel must report no next deadline.
#[tokio::test]
async fn test_fill_submission_queue_orphans_closed_request_before_first_submit() {
let cfg = Config::default();
let mut registry = Registry::default();
let (handle, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
iouring.wake_rearm_needed = false;
let (tx, rx) = oneshot::channel();
// Drop the receiver first so nobody is waiting for the result.
drop(rx);
handle
.enqueue(Request::Send(SendRequest {
fd: Arc::new(UnixStream::pair().unwrap().0.into()),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: Some(Instant::now() + Duration::from_secs(1)),
result: None,
sender: tx,
}))
.await
.expect("request should enqueue");
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::Drained);
assert!(iouring.waiters.is_empty());
assert_eq!(ring.submission().len(), 0);
assert_eq!(iouring.timeout_wheel.next_deadline(), None);
let (sock_left, _sock_right) = UnixStream::pair().unwrap();
// SAFETY: `into_raw_fd` gives up ownership of an open descriptor, so the new
// `File` is its only owner.
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, rx) = oneshot::channel();
// Second round: same check for a request type without a deadline field.
drop(rx);
handle
.enqueue(Request::ReadAt(ReadAtRequest {
file: Arc::new(file),
offset: 0,
len: 8,
read: 0,
buf: IoBufMut::with_capacity(8),
result: None,
sender: tx,
}))
.await
.expect("request should enqueue");
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::Drained);
assert!(iouring.waiters.is_empty());
assert_eq!(ring.submission().len(), 0);
assert_eq!(iouring.timeout_wheel.next_deadline(), None);
}
/// Places a recv whose result receiver has already been dropped directly into
/// the waiter table, timeout wheel and ready queue, then checks that
/// `fill_submission_queue` discards it: the result is `Drained`, no waiters or
/// submission entries remain, and the timeout wheel reports no next deadline.
#[tokio::test]
async fn test_fill_submission_queue_orphans_closed_ready_queue_entry_locally() {
let cfg = Config::default();
let mut registry = Registry::default();
let (_handle, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
iouring.wake_rearm_needed = false;
let (tx, rx) = oneshot::channel();
// Drop the receiver first so nobody is waiting for the result.
drop(rx);
// `offset: 4` of `len: 8` sets the request up as a recv that is already
// partway through (presumably mimicking a requeued partial read - confirm).
let waiter_id = iouring.waiters.insert(
Request::Recv(RecvRequest {
fd: Arc::new(UnixStream::pair().unwrap().0.into()),
buf: IoBufMut::with_capacity(8),
offset: 4,
len: 8,
exact: true,
deadline: None,
result: None,
sender: tx,
}),
Some(1),
);
iouring.timeout_wheel.schedule(waiter_id, 1);
iouring.ready_queue.push_back(waiter_id);
let fill_result = iouring.fill_submission_queue(&mut ring);
assert_eq!(fill_result, FillResult::Drained);
assert!(iouring.waiters.is_empty());
assert_eq!(ring.submission().len(), 0);
// The tick-1 wheel entry scheduled above must be gone as well.
assert_eq!(iouring.timeout_wheel.next_deadline(), None);
}
/// Smoke test for `single_issuer: true`: a 5-byte send on one end of a socket
/// pair and an exact 5-byte recv on the other must both succeed within two
/// seconds.
#[tokio::test]
async fn test_single_issuer() {
    let mut registry = Registry::default();
    let (sender, iouring) = IoUringLoop::new(
        Config {
            single_issuer: true,
            ..Default::default()
        },
        &mut registry,
    );
    let uring_thread = std::thread::spawn(move || iouring.run());

    let (reader, writer) = UnixStream::pair().unwrap();
    let recv = sender.recv(
        Arc::new(reader.into()),
        IoBufMut::with_capacity(5),
        0,
        5,
        true,
        Instant::now() + Duration::from_secs(5),
    );
    let send = sender.send(
        Arc::new(writer.into()),
        IoBufs::from(IoBuf::from(b"hello")),
        Instant::now() + Duration::from_secs(5),
    );

    // Drive both directions concurrently under one overall time limit.
    let both = join(recv, send);
    let (recv_result, send_result) = tokio::time::timeout(Duration::from_secs(2), both)
        .await
        .expect("recv/send timed out");
    let (_, received) = recv_result.expect("recv should succeed");
    assert_eq!(received, 5);
    send_result.expect("send should succeed");

    drop(sender);
    uring_thread.join().unwrap();
}
}