use crate::{Error, IoBufMut, IoBufs};
use commonware_utils::channel::{
mpsc::{self, error::TryRecvError},
oneshot,
};
use io_uring::{
cqueue::Entry as CqueueEntry,
opcode::AsyncCancel,
squeue::SubmissionQueue,
types::{SubmitArgs, Timespec},
IoUring,
};
use prometheus_client::{metrics::gauge::Gauge, registry::Registry};
use request::{ReadAtRequest, RecvRequest, Request, SendRequest, SyncRequest, WriteAtRequest};
use std::{
collections::VecDeque,
fs::File,
os::fd::OwnedFd,
sync::Arc,
time::{Duration, Instant},
};
mod request;
mod timeout;
use timeout::{Tick, TimeoutWheel};
mod waiter;
use waiter::{CompletionOutcome, StageOutcome, WaiterId, Waiters};
mod waker;
use waker::{Waker, SUBMISSION_SEQ_MASK, WAKE_USER_DATA};
type UserData = u64;
/// Prometheus metrics exported by the io_uring event loop.
#[derive(Debug)]
pub struct Metrics {
    /// Number of logical requests currently tracked by the loop.
    pending_operations: Gauge,
}
impl Metrics {
    /// Creates the metric set and registers each gauge on `registry`.
    pub fn new(registry: &mut Registry) -> Self {
        let pending_operations = Gauge::default();
        registry.register(
            "pending_operations",
            "Number of active logical requests in the io_uring loop",
            pending_operations.clone(),
        );
        Self { pending_operations }
    }
}
/// Tuning knobs for the io_uring event loop.
#[derive(Clone, Debug)]
pub struct Config {
    /// Ring size in entries; rounded up to the next power of two at startup.
    pub size: u32,
    /// Enable the ring's IOPOLL setup flag.
    pub io_poll: bool,
    /// Enable the SINGLE_ISSUER (and DEFER_TASKRUN) setup flags.
    pub single_issuer: bool,
    /// Upper bound on any request deadline; also sizes the timeout wheel.
    pub max_request_timeout: Duration,
    /// Budget for draining in-flight work at shutdown; `None` waits
    /// indefinitely.
    pub shutdown_timeout: Option<Duration>,
    /// Granularity (tick duration) of the timeout wheel.
    pub timeout_wheel_tick: Duration,
}
impl Default for Config {
    /// Defaults: a 128-entry ring, both polling modes off, a 60 s request
    /// timeout cap, unbounded shutdown, and a 5 ms timeout-wheel tick.
    fn default() -> Self {
        Self {
            io_poll: false,
            single_issuer: false,
            size: 128,
            shutdown_timeout: None,
            max_request_timeout: Duration::from_secs(60),
            timeout_wheel_tick: Duration::from_millis(5),
        }
    }
}
/// Shared state behind every cloned [`Handle`].
struct HandleInner {
    // `Some` for the whole lifetime of the handle; taken only in `Drop`
    // to disconnect the request channel.
    sender: Option<mpsc::Sender<Request>>,
    waker: Waker,
}
impl Drop for HandleInner {
    /// Disconnects the request channel, then rings the waker so the loop
    /// notices the disconnect promptly instead of on its next natural wakeup.
    fn drop(&mut self) {
        // Order matters: drop the sender first so the loop's subsequent
        // `try_recv` observes `Disconnected` once woken.
        drop(self.sender.take());
        self.waker.ring();
    }
}
/// Cloneable client for submitting requests to the io_uring loop.
///
/// Dropping the last clone disconnects the channel and wakes the loop,
/// which then drains outstanding work and exits.
#[derive(Clone)]
pub struct Handle {
    inner: Arc<HandleInner>,
}
impl Handle {
/// Sends one request to the loop and publishes a wakeup for it.
async fn enqueue(&self, request: Request) -> Result<(), mpsc::error::SendError<Request>> {
    let sender = self
        .inner
        .sender
        .as_ref()
        .expect("handle sender is only taken on drop");
    sender.send(request).await?;
    // Bump the waker's submission sequence so the loop drains the channel.
    self.inner.waker.publish();
    Ok(())
}
#[cfg_attr(not(feature = "iouring-network"), allow(dead_code))]
pub async fn send(
&self,
fd: Arc<OwnedFd>,
bufs: IoBufs,
deadline: Instant,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.enqueue(Request::Send(SendRequest {
fd,
write: bufs.into(),
deadline: Some(deadline),
result: None,
sender: tx,
}))
.await
.map_err(|_| Error::SendFailed)?;
rx.await.map_err(|_| Error::SendFailed)?
}
#[cfg_attr(not(feature = "iouring-network"), allow(dead_code))]
pub async fn recv(
&self,
fd: Arc<OwnedFd>,
buf: IoBufMut,
offset: usize,
len: usize,
exact: bool,
deadline: Instant,
) -> Result<(IoBufMut, usize), (IoBufMut, Error)> {
assert!(
offset <= len && len <= buf.capacity(),
"recv invariant violated: need offset <= len <= capacity"
);
let (tx, rx) = oneshot::channel();
let request = Request::Recv(RecvRequest {
fd,
buf,
offset,
len,
exact,
deadline: Some(deadline),
result: None,
sender: tx,
});
if let Err(err) = self.enqueue(request).await {
let Request::Recv(request) = err.0 else {
unreachable!("recv enqueue returned wrong request variant");
};
return Err((request.buf, Error::RecvFailed));
}
rx.await.unwrap_or_else(|_| {
Err((IoBufMut::default(), Error::RecvFailed))
})
}
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn read_at(
&self,
file: Arc<File>,
offset: u64,
len: usize,
buf: IoBufMut,
) -> Result<IoBufMut, (IoBufMut, Error)> {
assert!(len <= buf.capacity(), "read_at len exceeds buffer capacity");
let (tx, rx) = oneshot::channel();
let request = Request::ReadAt(ReadAtRequest {
file,
offset,
len,
read: 0,
buf,
result: None,
sender: tx,
});
if let Err(err) = self.enqueue(request).await {
let Request::ReadAt(request) = err.0 else {
unreachable!("read_at enqueue returned wrong request variant");
};
return Err((request.buf, Error::ReadFailed));
}
rx.await.unwrap_or_else(|_| {
Err((IoBufMut::default(), Error::ReadFailed))
})
}
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn write_at(&self, file: Arc<File>, offset: u64, bufs: IoBufs) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.enqueue(Request::WriteAt(WriteAtRequest {
file,
offset,
written: 0,
write: bufs.into(),
result: None,
sender: tx,
}))
.await
.map_err(|_| Error::WriteFailed)?;
rx.await.map_err(|_| Error::WriteFailed)?
}
/// Flushes `file` to stable storage via the loop; no deadline is attached.
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn sync(&self, file: Arc<File>) -> std::io::Result<()> {
    let (sender, receiver) = oneshot::channel();
    let request = Request::Sync(SyncRequest {
        file,
        result: None,
        sender,
    });
    self.enqueue(request)
        .await
        .map_err(|_| std::io::Error::other("failed to send work"))?;
    receiver
        .await
        .map_err(|_| std::io::Error::other("failed to read result"))?
}
}
/// The io_uring event loop: owns the waiter table and all bookkeeping for
/// timeouts, cancellations, and wakeups. Driven to completion by [`run`].
pub(crate) struct IoUringLoop {
    cfg: Config,
    metrics: Arc<Metrics>,
    // Incoming requests from `Handle`s.
    receiver: mpsc::Receiver<Request>,
    // Table of admitted requests, addressed by `WaiterId` / CQE user_data.
    waiters: Waiters,
    // Waiters that need (re)submission on the next staging pass.
    ready_queue: VecDeque<WaiterId>,
    // Timed-out in-flight waiters awaiting an ASYNC_CANCEL submission.
    pending_cancels: VecDeque<WaiterId>,
    timeout_wheel: TimeoutWheel,
    waker: Waker,
    // True when the multishot wake poll must be (re)installed on the ring.
    wake_rearm_needed: bool,
    // Count of requests drained from `receiver`, masked to the waker's
    // sequence width; compared against `waker.submitted()` in `run`.
    processed_seq: u64,
}
impl IoUringLoop {
/// Builds the loop and its cloneable [`Handle`].
///
/// Rounds `cfg.size` up to a power of two and sizes the request channel,
/// waiter table, and staging queues to match.
///
/// # Panics
/// If either timeout parameter is zero, the rounded ring size would
/// overflow `u32`, or the wake eventfd cannot be created.
pub(crate) fn new(mut cfg: Config, registry: &mut Registry) -> (Handle, Self) {
    assert!(
        !cfg.max_request_timeout.is_zero(),
        "max_request_timeout must be non-zero for timeout wheel"
    );
    assert!(
        !cfg.timeout_wheel_tick.is_zero(),
        "timeout_wheel_tick must be non-zero for timeout wheel"
    );
    // io_uring rings require a power-of-two entry count.
    cfg.size = cfg
        .size
        .checked_next_power_of_two()
        .expect("ring size exceeds u32::MAX");
    let size = cfg.size as usize;
    let metrics = Arc::new(Metrics::new(registry));
    let (sender, receiver) = mpsc::channel(size);
    let waker = Waker::new().expect("unable to create wake eventfd");
    let timeout_wheel = TimeoutWheel::new(
        cfg.max_request_timeout,
        cfg.timeout_wheel_tick,
        Instant::now(),
    );
    let waiters = Waiters::new(size);
    // The handle and the loop share the waker: handles publish, the loop arms.
    let handle = Handle {
        inner: Arc::new(HandleInner {
            sender: Some(sender),
            waker: waker.clone(),
        }),
    };
    (
        handle,
        Self {
            cfg,
            metrics,
            receiver,
            waiters,
            ready_queue: VecDeque::with_capacity(size),
            pending_cancels: VecDeque::with_capacity(size),
            timeout_wheel,
            waker,
            // Force the wake poll to be installed on the first iteration.
            wake_rearm_needed: true,
            processed_seq: 0,
        },
    )
}
/// Drives the ring until every `Handle` is dropped, then drains and returns.
///
/// Each iteration: reap CQEs, expire timeouts, stage new work, then decide
/// whether to block. Blocking is gated on the waker's submission sequence
/// so a request published between staging and sleeping cannot be missed.
pub(crate) fn run(mut self) {
    let mut ring = new_ring(&self.cfg).expect("unable to create io_uring instance");
    loop {
        // Reap completions first so their waiter slots free up before staging.
        for cqe in ring.completion() {
            self.handle_cqe(cqe);
        }
        self.advance_timeouts();
        // `None` => request channel disconnected: shut down.
        let Some(at_capacity) = self.fill_submission_queue(&mut ring) else {
            self.drain(&mut ring);
            return;
        };
        self.metrics.pending_operations.set(self.waiters.len() as _);
        // Handles have published submissions we have not yet drained: loop
        // again without arming. If the ring is at capacity we must still
        // wait for one completion to make room.
        if self.waker.submitted() != self.processed_seq {
            if at_capacity {
                self.submit_and_wait(&mut ring, 1, self.timeout_wheel.next_deadline())
                    .expect("unable to submit to ring");
            }
            continue;
        }
        // Arm the waker; if no new submission raced in while arming (the
        // sequences still match), block until a completion, a wake, or the
        // next timeout-wheel deadline.
        if at_capacity || self.waker.arm() == self.processed_seq {
            self.submit_and_wait(&mut ring, 1, self.timeout_wheel.next_deadline())
                .expect("unable to submit to ring");
        }
        self.waker.disarm();
    }
}
/// Inserts a request into the waiter table, scheduling it on the timeout
/// wheel when it carries a deadline.
///
/// Returns `None` (after completing the request with a timeout) when the
/// deadline can no longer be mapped to a wheel tick, i.e. it has already
/// expired or exceeds the wheel's horizon.
fn admit_request(&mut self, request: Request) -> Option<WaiterId> {
    let mut target_tick = None;
    if let Some(deadline) = request.deadline() {
        match self.timeout_wheel.target_tick(deadline) {
            Some(tick) => target_tick = Some(tick),
            None => {
                // Unschedulable deadline: fail the request immediately.
                request.timeout();
                return None;
            }
        }
    }
    let waiter_id = self.waiters.insert(request, target_tick);
    if let Some(tick) = target_tick {
        self.timeout_wheel.schedule(waiter_id, tick);
    }
    Some(waiter_id)
}
/// Transitions a waiter to in-flight and pushes its SQE onto the ring.
///
/// `stage` may instead report that the waiter's deadline already expired
/// (complete it with a timeout) or that it was orphaned and only its
/// timeout-wheel slot needs releasing.
fn stage_request(&mut self, waiter_id: WaiterId, submission_queue: &mut SubmissionQueue<'_>) {
    match self.waiters.stage(waiter_id) {
        StageOutcome::Timeout(request) => request.timeout(),
        StageOutcome::Orphaned { target_tick } => {
            if let Some(tick) = target_tick {
                self.timeout_wheel.remove(tick);
            }
        }
        StageOutcome::Submit(sqe) => {
            // SAFETY: callers guarantee SQ room (they check `is_full` before
            // staging). NOTE(review): soundness also relies on the waiter
            // table keeping the SQE's buffers alive until the CQE is reaped —
            // confirm against `Waiters::stage`'s contract.
            unsafe {
                submission_queue
                    .push(&sqe)
                    .expect("unable to push to queue");
            }
        }
    }
}
/// Stages requeued waiters until the queue empties or the SQ fills.
///
/// Returns `true` iff waiters remain in `ready_queue` (SQ was exhausted).
fn stage_ready_requests(&mut self, submission_queue: &mut SubmissionQueue<'_>) -> bool {
    loop {
        if submission_queue.is_full() {
            return !self.ready_queue.is_empty();
        }
        match self.ready_queue.pop_front() {
            Some(waiter_id) => self.stage_request(waiter_id, submission_queue),
            None => return false,
        }
    }
}
/// Stages work into the SQ in priority order: waker rearm, pending
/// cancellations, requeued waiters, then fresh requests from the channel.
///
/// Returns `None` when the request channel has disconnected (all handles
/// dropped), otherwise `Some(at_capacity)` where `at_capacity` signals the
/// SQ or waiter table is full and the loop must reap before staging more.
fn fill_submission_queue(&mut self, ring: &mut IoUring) -> Option<bool> {
    let mut drained = 0u64;
    let mut submission_queue = ring.submission();
    // Whether the wheel currently has a scheduled deadline; if not, it
    // must be advanced to "now" before scheduling a fresh one (see below).
    let mut wheel_aligned = self.timeout_wheel.next_deadline().is_some();
    if std::mem::take(&mut self.wake_rearm_needed) {
        self.waker.reinstall(&mut submission_queue);
    }
    // Cancellations and requeued waiters take precedence over new work.
    if self.stage_cancellations(&mut submission_queue) {
        return Some(true);
    }
    if self.stage_ready_requests(&mut submission_queue) {
        return Some(true);
    }
    while self.waiters.len() < self.cfg.size as usize && !submission_queue.is_full() {
        let request = match self.receiver.try_recv() {
            Ok(request) => request,
            Err(TryRecvError::Disconnected) => return None,
            Err(TryRecvError::Empty) => break,
        };
        drained += 1;
        // First deadline after an idle wheel: realign the wheel to now so
        // the new deadline maps to the correct tick. An idle wheel can
        // have no expired entries, hence the assert.
        if !wheel_aligned && request.has_deadline() {
            assert!(self.timeout_wheel.advance(Instant::now()).is_none());
            wheel_aligned = true;
        }
        if let Some(waiter_id) = self.admit_request(request) {
            self.stage_request(waiter_id, &mut submission_queue);
        }
    }
    // Record how many channel entries were consumed so `run` can compare
    // against the waker's published submission sequence.
    self.processed_seq = self.processed_seq.wrapping_add(drained) & SUBMISSION_SEQ_MASK;
    let at_sq_capacity = submission_queue.is_full();
    let at_waiter_capacity = self.waiters.len() == self.cfg.size as usize;
    Some(at_sq_capacity || at_waiter_capacity)
}
/// Pushes an ASYNC_CANCEL SQE for each pending cancellation that is still
/// in flight, until the queue empties or the SQ fills.
///
/// Returns `true` iff cancellations remain pending (SQ was exhausted).
fn stage_cancellations(&mut self, submission_queue: &mut SubmissionQueue<'_>) -> bool {
    loop {
        if submission_queue.is_full() {
            return !self.pending_cancels.is_empty();
        }
        let Some(waiter_id) = self.pending_cancels.pop_front() else {
            return false;
        };
        // The waiter may have completed (or been recycled) since its
        // timeout fired; skip it rather than cancel an unrelated op.
        if !self.waiters.is_in_flight(waiter_id) {
            continue;
        }
        let cancel = AsyncCancel::new(waiter_id.user_data())
            .build()
            .user_data(waiter_id.cancel_user_data());
        // SAFETY: the SQ was checked to be non-full above, and the cancel
        // SQE references no external buffers.
        unsafe {
            submission_queue
                .push(&cancel)
                .expect("unable to push cancel to queue");
        }
    }
}
/// Routes one CQE to the waker or to the waiter its user_data identifies.
fn handle_cqe(&mut self, cqe: CqueueEntry) {
    let user_data = cqe.user_data();
    if user_data == WAKE_USER_DATA {
        assert!(
            cqe.result() >= 0,
            "wake poll CQE failed: requires multishot poll (Linux 5.13+)"
        );
        self.waker.acknowledge();
        // Without the CQE "more" flag the multishot poll is exhausted and
        // must be reinstalled on the next staging pass.
        if !io_uring::cqueue::more(cqe.flags()) {
            self.wake_rearm_needed = true;
        }
        return;
    }
    match self.waiters.on_completion(user_data, cqe.result()) {
        CompletionOutcome::Cancel => {
            // CQE for an ASYNC_CANCEL (or a stale entry); nothing to do.
        }
        CompletionOutcome::Requeue(waiter_id) => {
            // Waiter needs another SQE (presumably partial I/O progress —
            // see `Waiters::on_completion`); stage it on the next pass.
            self.ready_queue.push_back(waiter_id);
        }
        CompletionOutcome::Complete {
            request,
            target_tick,
        } => {
            // Release the timeout-wheel slot before delivering the result.
            if let Some(tick) = target_tick {
                self.timeout_wheel.remove(tick);
            }
            request.complete();
        }
    }
}
/// Expires due deadlines, queuing an explicit cancel for in-flight waiters.
fn advance_timeouts(&mut self) {
    // Fast path: nothing is scheduled on the wheel.
    if self.timeout_wheel.next_deadline().is_none() {
        return;
    }
    let Some(expired) = self.timeout_wheel.advance(Instant::now()) else {
        return;
    };
    for entry in expired {
        // `cancel` returns false for stale wheel entries whose waiter slot
        // was already completed and reused; those are ignored.
        if self.waiters.cancel(entry.waiter_id) {
            self.timeout_wheel.remove(entry.target_tick);
            // An op already submitted to the kernel needs an ASYNC_CANCEL;
            // one not yet in flight is resolved without touching the ring.
            if self.waiters.is_in_flight(entry.waiter_id) {
                self.pending_cancels.push_back(entry.waiter_id);
            }
        }
    }
}
/// Winds the loop down after the request channel disconnects.
///
/// Keeps reaping completions — and staging cancellations as deadlines
/// fire — until every waiter resolves or the optional `shutdown_timeout`
/// budget is exhausted, at which point remaining waiters are abandoned.
fn drain(&mut self, ring: &mut IoUring) {
    let mut remaining = self.cfg.shutdown_timeout;
    loop {
        for cqe in ring.completion() {
            self.handle_cqe(cqe);
        }
        if self.waiters.is_empty() {
            break;
        }
        // Shutdown budget spent: abandon whatever is still in flight.
        if remaining.is_some_and(|t| t.is_zero()) {
            break;
        }
        self.advance_timeouts();
        {
            // Scope the submission-queue borrow so the ring can be
            // submitted below.
            let mut submission_queue = ring.submission();
            self.stage_cancellations(&mut submission_queue);
            self.stage_ready_requests(&mut submission_queue);
        }
        if self.waiters.is_empty() {
            break;
        }
        // Sleep no longer than both the shutdown budget and the next
        // timeout-wheel deadline allow.
        let timeout = match (remaining, self.timeout_wheel.next_deadline()) {
            (Some(remaining), Some(deadline)) => Some(remaining.min(deadline)),
            (Some(remaining), None) => Some(remaining),
            (None, Some(deadline)) => Some(deadline),
            (None, None) => None,
        };
        let start = Instant::now();
        self.submit_and_wait(ring, 1, timeout)
            .expect("unable to submit to ring");
        // Charge actual elapsed wall time against the budget.
        if let Some(remaining) = remaining.as_mut() {
            *remaining = remaining.saturating_sub(start.elapsed());
        }
    }
    self.metrics.pending_operations.set(self.waiters.len() as _);
}
/// Submits staged SQEs and blocks for up to `want` completions.
///
/// With a `timeout`, waits via `submit_with_args`; ETIME maps to
/// `Ok(false)` ("timed out"). EINTR/EAGAIN/EBUSY are treated as spurious
/// wakeups (`Ok(true)`) so the caller simply retries on its next
/// iteration; every other error propagates.
fn submit_and_wait(
    &self,
    ring: &mut IoUring,
    want: usize,
    timeout: Option<Duration>,
) -> Result<bool, std::io::Error> {
    let result = timeout.map_or_else(
        || ring.submit_and_wait(want).map(|_| true),
        |timeout| {
            // `ts` must outlive the submit call; both live to closure end.
            let ts = Timespec::new()
                .sec(timeout.as_secs())
                .nsec(timeout.subsec_nanos());
            let args = SubmitArgs::new().timespec(&ts);
            match ring.submitter().submit_with_args(want, &args) {
                Ok(_) => Ok(true),
                Err(err) if err.raw_os_error() == Some(libc::ETIME) => Ok(false),
                Err(err) => Err(err),
            }
        },
    );
    match result {
        Ok(v) => Ok(v),
        Err(err) => match err.raw_os_error() {
            Some(libc::EINTR | libc::EAGAIN | libc::EBUSY) => Ok(true),
            _ => Err(err),
        },
    }
}
}
/// Builds an [`IoUring`] instance with the setup flags selected in `cfg`.
fn new_ring(cfg: &Config) -> Result<IoUring, std::io::Error> {
    let mut builder = IoUring::builder();
    if cfg.io_poll {
        builder.setup_iopoll();
    }
    if cfg.single_issuer {
        // DEFER_TASKRUN requires SINGLE_ISSUER, so the two go together.
        builder.setup_single_issuer().setup_defer_taskrun();
    }
    builder.build(cfg.size)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{IoBuf, IoBufMut};
use commonware_utils::channel::oneshot::{self, error::RecvError};
use futures::future::{join, join_all};
use prometheus_client::registry::Registry;
use request::{RecvRequest, SendRequest, SyncRequest};
use std::{
io::Write,
os::{
fd::{AsRawFd, FromRawFd, IntoRawFd},
unix::net::UnixStream,
},
time::Duration,
};
// Ring sizes round up to the next power of two; exact powers pass through.
#[test]
fn test_iouring_loop_rounds_ring_size_up_to_power_of_two() {
    let mut registry = Registry::default();
    let cfg = Config {
        size: 1_000,
        ..Default::default()
    };
    let (_, iouring) = IoUringLoop::new(cfg, &mut registry);
    assert_eq!(iouring.cfg.size, 1_024);
    let cfg = Config {
        size: 1_024,
        ..Default::default()
    };
    let (_, iouring) = IoUringLoop::new(cfg, &mut registry);
    assert_eq!(iouring.cfg.size, 1_024);
}
// A submit failure other than ETIME must surface as Err, not be mistaken
// for a timeout.
#[test]
fn test_submit_and_wait_non_etime_error_is_not_misclassified() {
    let cfg = Config::default();
    let mut registry = Registry::default();
    let (_submitter, iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
    let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
    // Close the ring fd out from under the wrapper to force a kernel error.
    let close_result = unsafe { libc::close(ring.as_raw_fd()) };
    assert_eq!(close_result, 0, "failed to close ring fd in test");
    let err = iouring
        .submit_and_wait(&mut ring, 1, Some(Duration::from_millis(1)))
        .expect_err("submit_and_wait should fail on closed ring fd");
    assert_ne!(err.raw_os_error(), Some(libc::ETIME));
    // The fd is already closed; forget the ring so Drop doesn't touch it.
    std::mem::forget(ring);
}
// Exercises the IOPOLL builder branch. The kernel/filesystem may reject
// IOPOLL, so either outcome is accepted as long as errors carry an errno.
#[test]
fn test_new_ring_iopoll_builder_path_is_exercised() {
    let cfg = Config {
        io_poll: true,
        ..Default::default()
    };
    match new_ring(&cfg) {
        Ok(_ring) => {}
        Err(err) => assert!(err.raw_os_error().is_some()),
    }
}
// When staging queued cancellations alone fills the SQ, fill_submission_queue
// must report at-capacity and keep the overflow in pending_cancels.
#[test]
fn test_fill_submission_queue_returns_true_when_cancel_staging_fills_sq() {
    let cfg = Config {
        size: 8,
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
    let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
    // Fill the waiter table with staged, cancel-requested sync ops.
    for _ in 0..cfg.size as usize {
        let (sock_left, _sock_right) =
            UnixStream::pair().expect("failed to create unix socket pair");
        let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
        let (tx, _rx) = oneshot::channel();
        let request = Request::Sync(SyncRequest {
            file: Arc::new(file),
            result: None,
            sender: tx,
        });
        let waiter_id = iouring.waiters.insert(request, None);
        assert!(matches!(
            iouring.waiters.stage(waiter_id),
            StageOutcome::Submit(_)
        ));
        assert!(
            iouring.waiters.cancel(waiter_id),
            "cancel should transition waiter to cancel-requested"
        );
        iouring.pending_cancels.push_back(waiter_id);
    }
    let at_capacity = iouring
        .fill_submission_queue(&mut ring)
        .expect("channel should remain connected");
    assert!(at_capacity);
    assert!(!iouring.pending_cancels.is_empty());
}
// When staging the ready queue alone fills the SQ, fill_submission_queue
// must report at-capacity and keep the overflow in ready_queue.
#[test]
fn test_fill_submission_queue_returns_true_when_ready_staging_fills_sq() {
    let cfg = Config {
        size: 8,
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
    let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
    // Fill the waiter table and mark every waiter ready for (re)submission.
    for _ in 0..cfg.size as usize {
        let (sock_left, _sock_right) =
            UnixStream::pair().expect("failed to create unix socket pair");
        let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
        let (tx, _rx) = oneshot::channel();
        let request = Request::Sync(SyncRequest {
            file: Arc::new(file),
            result: None,
            sender: tx,
        });
        let waiter_id = iouring.waiters.insert(request, None);
        iouring.ready_queue.push_back(waiter_id);
    }
    let at_capacity = iouring
        .fill_submission_queue(&mut ring)
        .expect("channel should remain connected");
    assert!(at_capacity);
    assert!(!iouring.ready_queue.is_empty());
}
// When fresh channel requests (plus the wake poll SQE) fill the SQ,
// fill_submission_queue must report at-capacity even though the waiter
// table still has room.
#[test]
fn test_fill_submission_queue_returns_true_when_fresh_staging_fills_sq() {
    let cfg = Config {
        size: 8,
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
    let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
    futures::executor::block_on(async {
        for _ in 0..cfg.size as usize {
            let (sock_left, _sock_right) =
                UnixStream::pair().expect("failed to create unix socket pair");
            let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
            let (tx, _rx) = oneshot::channel();
            submitter
                .enqueue(Request::Sync(SyncRequest {
                    file: Arc::new(file),
                    result: None,
                    sender: tx,
                }))
                .await
                .expect("failed to enqueue request");
        }
    });
    let at_capacity = iouring
        .fill_submission_queue(&mut ring)
        .expect("channel should remain connected");
    assert!(at_capacity);
    assert!(ring.submission().is_full());
    // The wake-poll SQE consumed one slot, so not all requests were admitted.
    assert!(iouring.waiters.len() < cfg.size as usize);
}
// A pending cancel for a waiter that is no longer in flight must be
// skipped without pushing an ASYNC_CANCEL SQE.
#[test]
fn test_fill_submission_queue_skips_cancel_for_ready_queue_timeout() {
    let cfg = Config::default();
    let mut registry = Registry::default();
    let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
    let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
    // Suppress the wake-poll SQE so the SQ length assertion below is exact.
    iouring.wake_rearm_needed = false;
    let (sock_left, _sock_right) =
        UnixStream::pair().expect("failed to create unix socket pair");
    let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
    let (tx, _rx) = oneshot::channel();
    let request = Request::Sync(SyncRequest {
        file: Arc::new(file),
        result: None,
        sender: tx,
    });
    // Cancel a waiter that was never staged: it is not in flight.
    let waiter_id = iouring.waiters.insert(request, None);
    assert!(iouring.waiters.cancel(waiter_id));
    iouring.pending_cancels.push_back(waiter_id);
    let at_capacity = iouring
        .fill_submission_queue(&mut ring)
        .expect("channel should remain connected");
    assert!(!at_capacity);
    assert!(iouring.pending_cancels.is_empty());
    assert_eq!(ring.submission().len(), 0);
    // Staging the cancelled waiter later resolves it as a timeout.
    assert!(matches!(
        iouring.waiters.stage(waiter_id),
        StageOutcome::Timeout(_)
    ));
}
// A request whose deadline already passed must be failed with a timeout at
// admission, without ever touching the ring or the waiter table.
#[tokio::test]
async fn test_fill_submission_queue_expired_deadline_completes_immediately() {
    let cfg = Config::default();
    let mut registry = Registry::default();
    let (submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
    let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
    // Suppress the wake-poll SQE so the SQ length assertion below is exact.
    iouring.wake_rearm_needed = false;
    let (tx, rx) = oneshot::channel();
    let past_deadline = Instant::now()
        .checked_sub(Duration::from_secs(1))
        .unwrap_or_else(Instant::now);
    submitter
        .enqueue(Request::Send(SendRequest {
            fd: Arc::new(UnixStream::pair().unwrap().0.into()),
            write: IoBufs::from(IoBuf::from(b"hello")).into(),
            deadline: Some(past_deadline),
            result: None,
            sender: tx,
        }))
        .await
        .expect("failed to enqueue request");
    let at_capacity = iouring
        .fill_submission_queue(&mut ring)
        .expect("channel should remain connected");
    assert!(!at_capacity);
    assert!(iouring.pending_cancels.is_empty());
    assert!(iouring.waiters.is_empty());
    assert_eq!(ring.submission().len(), 0);
    let result = rx.await.expect("missing timeout completion");
    assert!(matches!(result, Err(crate::Error::Timeout)));
}
// Handle::recv asserts offset <= len <= capacity; both violations panic.
#[test]
fn test_handle_recv_panics_on_invalid_buffer_bounds() {
    let cfg = Config::default();
    let mut registry = Registry::default();
    let (handle, io_loop) = IoUringLoop::new(cfg, &mut registry);
    // The loop is never run; the asserts fire before any enqueue.
    drop(io_loop);
    // offset (5) > len (4)
    let offset_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        let (left, _right) = UnixStream::pair().unwrap();
        let _ = futures::executor::block_on(handle.recv(
            Arc::new(left.into()),
            IoBufMut::with_capacity(4),
            5,
            4,
            true,
            Instant::now() + Duration::from_secs(1),
        ));
    }));
    assert!(offset_panic.is_err());
    // len (5) > capacity (4)
    let capacity_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        let (left, _right) = UnixStream::pair().unwrap();
        let _ = futures::executor::block_on(handle.recv(
            Arc::new(left.into()),
            IoBufMut::with_capacity(4),
            0,
            5,
            true,
            Instant::now() + Duration::from_secs(1),
        ));
    }));
    assert!(capacity_panic.is_err());
}
// A stale timeout-wheel entry left by a completed waiter must not cut the
// deadline of a new waiter that reuses the same slot.
#[tokio::test]
async fn test_timeout_slot_reuse_does_not_cancel_new_waiter_early() {
    let cfg = Config {
        size: 8,
        max_request_timeout: Duration::from_millis(200),
        timeout_wheel_tick: Duration::from_millis(5),
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
    let handle = std::thread::spawn(move || iouring.run());
    // First recv completes quickly, freeing its waiter slot while its
    // wheel entry (if any) may linger.
    let (left1, right1) = UnixStream::pair().unwrap();
    (&right1).write_all(&[42]).unwrap();
    let (_buf1, read1) = submitter
        .recv(
            Arc::new(left1.into()),
            IoBufMut::with_capacity(1),
            0,
            1,
            false,
            Instant::now() + Duration::from_millis(200),
        )
        .await
        .expect("first recv should succeed");
    assert!(read1 > 0);
    // Second recv (likely reusing the slot) must time out on ITS deadline.
    let (left2, _right2) = UnixStream::pair().expect("failed to create unix stream pair");
    let start = Instant::now();
    let result2 = submitter
        .recv(
            Arc::new(left2.into()),
            IoBufMut::with_capacity(8),
            0,
            8,
            false,
            Instant::now() + Duration::from_millis(80),
        )
        .await;
    let elapsed = start.elapsed();
    assert!(matches!(result2, Err((_, crate::Error::Timeout))));
    assert!(
        elapsed >= Duration::from_millis(50),
        "timeout fired too early after slot reuse: {elapsed:?}"
    );
    drop(submitter);
    handle.join().expect("io_uring loop thread panicked");
}
// Staging a WaiterId whose slot has been completed and reused must panic
// (generation mismatch) rather than submit the new occupant's SQE.
#[test]
fn test_stage_request_panics_on_stale_ready_queue_entry() {
    let cfg = Config::default();
    let mut registry = Registry::default();
    let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
    let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
    // Admit, stage, and complete a waiter to free its slot.
    let (sock_left, _sock_right) = UnixStream::pair().unwrap();
    let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
    let (tx, _rx) = oneshot::channel();
    let stale = iouring.waiters.insert(
        Request::Sync(SyncRequest {
            file: Arc::new(file),
            result: None,
            sender: tx,
        }),
        None,
    );
    assert!(matches!(
        iouring.waiters.stage(stale),
        StageOutcome::Submit(_)
    ));
    match iouring.waiters.on_completion(stale.user_data(), 0) {
        CompletionOutcome::Complete { request, .. } => request.complete(),
        _ => panic!("sync waiter should complete immediately"),
    }
    // Reuse the freed slot with a new waiter (same index, new generation).
    let (sock_left, _sock_right) = UnixStream::pair().unwrap();
    let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
    let (tx, _rx) = oneshot::channel();
    let reused = iouring.waiters.insert(
        Request::Sync(SyncRequest {
            file: Arc::new(file),
            result: None,
            sender: tx,
        }),
        None,
    );
    assert_eq!(reused.index(), stale.index());
    assert_ne!(reused, stale);
    // Staging the stale id must panic, leaving the new waiter untouched.
    let stale_ready = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        let mut sq = ring.submission();
        iouring.stage_request(stale, &mut sq);
    }));
    assert!(stale_ready.is_err());
    assert!(!iouring.waiters.is_in_flight(reused));
}
// A stale wheel entry (tick 1) left by a completed waiter must not cancel
// the slot's new occupant; only the new entry (tick 3) may do so.
#[test]
fn test_advance_timeouts_ignores_stale_entry_after_slot_reuse() {
    let cfg = Config {
        max_request_timeout: Duration::from_millis(100),
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (_submitter, mut iouring) = IoUringLoop::new(cfg, &mut registry);
    // First waiter: scheduled on tick 1, staged, then completed — but its
    // wheel entry is deliberately left behind.
    let (sock_left, _) = UnixStream::pair().unwrap();
    let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
    let (old_tx, _old_rx) = oneshot::channel();
    let old_req = Request::Sync(SyncRequest {
        file: Arc::new(file),
        result: None,
        sender: old_tx,
    });
    let old_slot = iouring.waiters.insert(old_req, Some(1));
    iouring.timeout_wheel.schedule(old_slot, 1);
    assert!(matches!(
        iouring.waiters.stage(old_slot),
        StageOutcome::Submit(_)
    ));
    if let CompletionOutcome::Complete {
        request,
        target_tick: Some(tick),
    } = iouring.waiters.on_completion(old_slot.user_data(), 0)
    {
        iouring.timeout_wheel.remove(tick);
        request.complete();
    }
    // Second waiter reuses the slot with a later deadline (tick 3).
    let (sock_left2, _) = UnixStream::pair().unwrap();
    let file2 = unsafe { std::fs::File::from_raw_fd(sock_left2.into_raw_fd()) };
    let (tx, _rx) = oneshot::channel();
    let req = Request::Sync(SyncRequest {
        file: Arc::new(file2),
        result: None,
        sender: tx,
    });
    let slot_index = iouring.waiters.insert(req, Some(3));
    assert_eq!(slot_index.index(), old_slot.index());
    assert!(matches!(
        iouring.waiters.stage(slot_index),
        StageOutcome::Submit(_)
    ));
    iouring.timeout_wheel.schedule(slot_index, 3);
    // Advancing past tick 1 must ignore the stale entry...
    std::thread::sleep(iouring.cfg.timeout_wheel_tick + Duration::from_millis(2));
    iouring.advance_timeouts();
    assert!(iouring.pending_cancels.is_empty());
    // ...while advancing past tick 3 cancels the new waiter.
    std::thread::sleep((iouring.cfg.timeout_wheel_tick * 2) + Duration::from_millis(2));
    iouring.advance_timeouts();
    assert_eq!(iouring.pending_cancels.len(), 1);
}
// If the op's CQE arrives before the cancel's CQE, the op result must be
// delivered to the caller and the cancel CQE discarded.
#[test]
fn test_cancel_completion_returns_saved_op_result() {
    let cfg = Config::default();
    let mut registry = Registry::default();
    let (_submitter, mut iouring) = IoUringLoop::new(cfg, &mut registry);
    let (left, right) = UnixStream::pair().unwrap();
    (&right).write_all(b"hello").unwrap();
    let (tx, rx) = oneshot::channel();
    let req = Request::Recv(RecvRequest {
        fd: Arc::new(left.into()),
        buf: IoBufMut::with_capacity(5),
        offset: 0,
        len: 5,
        exact: false,
        deadline: Some(Instant::now() + Duration::from_secs(1)),
        result: None,
        sender: tx,
    });
    let slot_index = iouring.waiters.insert(req, Some(2));
    assert!(matches!(
        iouring.waiters.stage(slot_index),
        StageOutcome::Submit(_)
    ));
    // Request cancellation while the op is in flight.
    assert!(
        iouring.waiters.cancel(slot_index),
        "cancel should transition active waiter"
    );
    iouring.timeout_wheel.schedule(slot_index, 2);
    iouring.timeout_wheel.remove(2);
    // Op CQE (5 bytes) lands first and completes the request...
    if let CompletionOutcome::Complete { request, .. } =
        iouring.waiters.on_completion(slot_index.user_data(), 5)
    {
        request.complete();
    }
    // ...then the cancel CQE must be classified as a no-op.
    assert!(matches!(
        iouring
            .waiters
            .on_completion(slot_index.cancel_user_data(), -libc::ECANCELED),
        CompletionOutcome::Cancel
    ));
    let (_, result) = futures::executor::block_on(rx)
        .expect("missing completion")
        .expect("recv should succeed");
    assert_eq!(result, 5);
    assert_eq!(iouring.waiters.len(), 0);
}
// After a timed-out waiter resolves via its op CQE, the already-staged
// cancel's CQE must be discarded and the caller sees a Timeout error.
#[test]
fn test_staged_cancel_cqe_is_ignored_after_timeout_completion() {
    let cfg = Config {
        max_request_timeout: Duration::from_millis(100),
        timeout_wheel_tick: Duration::from_millis(5),
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
    let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
    let (left, _right) = UnixStream::pair().unwrap();
    let (tx, rx) = oneshot::channel();
    let req = Request::Recv(RecvRequest {
        fd: Arc::new(left.into()),
        buf: IoBufMut::with_capacity(8),
        offset: 0,
        len: 8,
        exact: true,
        deadline: Some(Instant::now() + Duration::from_millis(25)),
        result: None,
        sender: tx,
    });
    let waiter_id = iouring.waiters.insert(req, Some(1));
    assert!(matches!(
        iouring.waiters.stage(waiter_id),
        StageOutcome::Submit(_)
    ));
    iouring.timeout_wheel.schedule(waiter_id, 1);
    // Let the deadline expire so advance_timeouts queues a cancel.
    std::thread::sleep(iouring.cfg.timeout_wheel_tick + Duration::from_millis(2));
    iouring.advance_timeouts();
    assert_eq!(iouring.pending_cancels.len(), 1);
    {
        let mut submission_queue = ring.submission();
        assert!(!iouring.stage_cancellations(&mut submission_queue));
        assert_eq!(submission_queue.len(), 1);
    }
    assert!(iouring.pending_cancels.is_empty());
    // The op's own CQE (partial read of 4) completes the timed-out waiter.
    match iouring.waiters.on_completion(waiter_id.user_data(), 4) {
        CompletionOutcome::Complete {
            request,
            target_tick: None,
        } => request.complete(),
        _ => panic!("missing timeout completion from op CQE"),
    }
    // The cancel's CQE afterwards must be a no-op.
    assert!(matches!(
        iouring
            .waiters
            .on_completion(waiter_id.cancel_user_data(), -libc::ECANCELED),
        CompletionOutcome::Cancel
    ));
    assert!(matches!(
        futures::executor::block_on(rx).expect("missing completion"),
        Err((_, crate::Error::Timeout))
    ));
    assert_eq!(iouring.waiters.len(), 0);
}
// End-to-end: concurrent send+recv over a socket pair must make progress
// through the waker path within the 2 s deadline, in both loop iterations.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_wake_path_progress_scenarios() {
    for should_succeed in [true, false] {
        let cfg = Config::default();
        let mut registry = Registry::default();
        let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
        let handle = std::thread::spawn(move || iouring.run());
        let (left_pipe, right_pipe) = UnixStream::pair().unwrap();
        let recv = submitter.recv(
            Arc::new(left_pipe.into()),
            IoBufMut::with_capacity(5),
            0,
            5,
            false,
            Instant::now() + Duration::from_secs(5),
        );
        let send = submitter.send(
            Arc::new(right_pipe.into()),
            IoBufs::from(IoBuf::from(b"hello")),
            Instant::now() + Duration::from_secs(5),
        );
        let timeout = tokio::time::timeout(Duration::from_secs(2), async {
            let (recv_result, send_result) = join(recv, send).await;
            if should_succeed {
                let (_, read) = recv_result.expect("recv should succeed");
                assert!(read > 0);
                send_result.expect("send should succeed");
            } else {
                // Progress (not success) is what this iteration checks.
                let _ = recv_result;
                let _ = send_result;
            }
        });
        assert!(
            timeout.await.is_ok(),
            "wake path test timed out (should_succeed={should_succeed})"
        );
        drop(submitter);
        handle.join().unwrap();
    }
}
// A recv on a socket that never receives data must resolve with Timeout.
#[tokio::test]
async fn test_timeout() {
    let cfg = Config {
        max_request_timeout: Duration::from_secs(1),
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
    let handle = std::thread::spawn(move || iouring.run());
    let (pipe_left, _pipe_right) = UnixStream::pair().unwrap();
    assert!(matches!(
        submitter
            .recv(
                Arc::new(pipe_left.into()),
                IoBufMut::with_capacity(8),
                0,
                8,
                false,
                Instant::now() + Duration::from_secs(1),
            )
            .await,
        Err((_, crate::Error::Timeout))
    ));
    drop(submitter);
    handle.join().unwrap();
}
// With no shutdown budget the loop drains an in-flight sync to completion
// after the last handle drops, then exits.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_no_timeout() {
    let cfg = Config {
        shutdown_timeout: None,
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
    let handle = std::thread::spawn(move || iouring.run());
    let (sock_left, _) = UnixStream::pair().unwrap();
    let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
    let (tx, rx) = oneshot::channel();
    submitter
        .enqueue(Request::Sync(SyncRequest {
            file: Arc::new(file),
            result: None,
            sender: tx,
        }))
        .await
        .unwrap();
    drop(submitter);
    // The sync on a socket fd may succeed or fail; it must complete.
    let result = rx.await.unwrap();
    let _ = result;
    handle.join().unwrap();
}
// Even while draining with no shutdown budget, deadlines keep firing: a
// pending recv with a 50 ms deadline still resolves as Timeout.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_no_timeout_continues_deadline_processing() {
    let cfg = Config {
        max_request_timeout: Duration::from_millis(250),
        timeout_wheel_tick: Duration::from_millis(5),
        shutdown_timeout: None,
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
    let handle = std::thread::spawn(move || iouring.run());
    let (pipe_left, _pipe_right) = UnixStream::pair().unwrap();
    let (tx, rx) = oneshot::channel();
    submitter
        .enqueue(Request::Recv(RecvRequest {
            fd: Arc::new(pipe_left.into()),
            buf: IoBufMut::with_capacity(8),
            offset: 0,
            len: 8,
            exact: false,
            deadline: Some(Instant::now() + Duration::from_millis(50)),
            result: None,
            sender: tx,
        }))
        .await
        .unwrap();
    // Dropping the handle puts the loop into its drain phase.
    drop(submitter);
    let result = tokio::time::timeout(Duration::from_secs(2), rx)
        .await
        .expect("deadline completion timed out");
    assert!(matches!(
        result.expect("missing deadline completion"),
        Err((_, crate::Error::Timeout))
    ));
    handle.join().unwrap();
}
// With a shutdown budget, a deadline-less in-flight recv is abandoned at
// shutdown: its oneshot sender is dropped, so the receiver errors.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_timeout() {
    let cfg = Config {
        shutdown_timeout: Some(Duration::from_secs(1)),
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
    let handle = std::thread::spawn(move || iouring.run());
    let (pipe_left, _pipe_right) = UnixStream::pair().unwrap();
    let (tx, rx) = oneshot::channel();
    submitter
        .enqueue(Request::Recv(RecvRequest {
            fd: Arc::new(pipe_left.into()),
            buf: IoBufMut::with_capacity(8),
            offset: 0,
            len: 8,
            exact: false,
            deadline: None,
            result: None,
            sender: tx,
        }))
        .await
        .unwrap();
    // Give the loop a moment to submit the recv before shutting down.
    tokio::time::sleep(Duration::from_millis(10)).await;
    drop(submitter);
    let err = rx.await.unwrap_err();
    assert!(matches!(err, RecvError { .. }));
    handle.join().unwrap();
}
// With a shutdown budget longer than the request deadline, the drain phase
// still delivers the Timeout result instead of abandoning the request.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_timeout_preserves_deadline_result() {
    let cfg = Config {
        max_request_timeout: Duration::from_millis(250),
        timeout_wheel_tick: Duration::from_millis(5),
        shutdown_timeout: Some(Duration::from_millis(500)),
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, iouring) = IoUringLoop::new(cfg, &mut registry);
    let handle = std::thread::spawn(move || iouring.run());
    let (pipe_left, _pipe_right) = UnixStream::pair().unwrap();
    let (tx, rx) = oneshot::channel();
    submitter
        .enqueue(Request::Recv(RecvRequest {
            fd: Arc::new(pipe_left.into()),
            buf: IoBufMut::with_capacity(8),
            offset: 0,
            len: 8,
            exact: false,
            deadline: Some(Instant::now() + Duration::from_millis(50)),
            result: None,
            sender: tx,
        }))
        .await
        .unwrap();
    drop(submitter);
    let result = tokio::time::timeout(Duration::from_secs(2), rx)
        .await
        .expect("deadline completion timed out");
    assert!(matches!(
        result.expect("missing deadline completion"),
        Err((_, crate::Error::Timeout))
    ));
    handle.join().unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_timeout_abandons_timed_op_after_cutoff() {
    // The request's deadline (500ms) lies beyond the shutdown cutoff
    // (50ms), so shutdown abandons it: the result sender is dropped and
    // the receiver observes an error rather than a timeout result.
    let config = Config {
        max_request_timeout: Duration::from_millis(750),
        timeout_wheel_tick: Duration::from_millis(5),
        shutdown_timeout: Some(Duration::from_millis(50)),
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, ring_loop) = IoUringLoop::new(config, &mut registry);
    let loop_thread = std::thread::spawn(move || ring_loop.run());

    let (reader, _writer) = UnixStream::pair().unwrap();
    let (result_tx, result_rx) = oneshot::channel();
    submitter
        .enqueue(Request::Recv(RecvRequest {
            fd: Arc::new(reader.into()),
            buf: IoBufMut::with_capacity(8),
            offset: 0,
            len: 8,
            exact: false,
            deadline: Some(Instant::now() + Duration::from_millis(500)),
            result: None,
            sender: result_tx,
        }))
        .await
        .unwrap();

    // Let the loop pick the request up before starting shutdown.
    tokio::time::sleep(Duration::from_millis(10)).await;
    drop(submitter);
    let err = tokio::time::timeout(Duration::from_secs(2), result_rx)
        .await
        .expect("shutdown abandonment timed out")
        .unwrap_err();
    assert!(matches!(err, RecvError { .. }));
    loop_thread.join().unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_deadline_timeout_ensure_enough_capacity() {
    // Queue far more deadline-bearing requests (64) than the ring has
    // submission entries (8); every request must still observe Err(Timeout).
    let config = Config {
        size: 8,
        max_request_timeout: Duration::from_millis(50),
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, ring_loop) = IoUringLoop::new(config, &mut registry);
    let loop_thread = std::thread::spawn(move || ring_loop.run());

    let total = 64usize;
    let deadline = Instant::now() + Duration::from_millis(50);
    // Keep the remote halves alive so no recv completes early.
    let mut peers = Vec::with_capacity(total);
    let mut recv_futures = Vec::with_capacity(total);
    for _ in 0..total {
        let (local, remote) = UnixStream::pair().unwrap();
        peers.push(remote);
        let submitter = submitter.clone();
        recv_futures.push(async move {
            submitter
                .recv(
                    Arc::new(local.into()),
                    IoBufMut::with_capacity(8),
                    0,
                    8,
                    false,
                    deadline,
                )
                .await
        });
    }

    let results = tokio::time::timeout(Duration::from_secs(2), join_all(recv_futures))
        .await
        .expect("deadline completion timed out");
    for result in results {
        assert!(matches!(result, Err((_, crate::Error::Timeout))));
    }
    drop(peers);
    drop(submitter);
    loop_thread.join().unwrap();
}
#[tokio::test]
async fn test_exact_recv_partial_progress() {
    // An exact recv must accumulate partial reads across multiple
    // completions until the full requested length has arrived.
    let config = Config::default();
    let mut registry = Registry::default();
    let (submitter, ring_loop) = IoUringLoop::new(config, &mut registry);
    let loop_thread = std::thread::spawn(move || ring_loop.run());

    let (reader, writer_sock) = UnixStream::pair().unwrap();
    reader.set_nonblocking(true).unwrap();
    writer_sock.set_nonblocking(true).unwrap();

    let total = 100;
    let recv = submitter.recv(
        Arc::new(reader.into()),
        IoBufMut::with_capacity(total),
        0,
        total,
        true,
        Instant::now() + Duration::from_secs(5),
    );
    // Deliver the payload in two chunks with a pause between them so the
    // recv sees partial progress.
    let writer = async {
        (&writer_sock).write_all(&[1u8; 40]).unwrap();
        tokio::time::sleep(Duration::from_millis(20)).await;
        (&writer_sock).write_all(&[2u8; 60]).unwrap();
    };
    let (recv_result, ()) = tokio::time::timeout(Duration::from_secs(5), join(recv, writer))
        .await
        .expect("recv timed out");
    let (_, bytes_read) = recv_result.expect("recv should succeed");
    assert_eq!(bytes_read, total);
    drop(submitter);
    loop_thread.join().unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_no_timeout_processes_ready_queue() {
    // With no shutdown timeout, shutdown drains outstanding work: an
    // exact recv that was only partially satisfied before shutdown must
    // still run to completion when the remaining bytes arrive afterwards.
    let config = Config {
        shutdown_timeout: None,
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, ring_loop) = IoUringLoop::new(config, &mut registry);
    let loop_thread = std::thread::spawn(move || ring_loop.run());

    let (reader, writer_sock) = UnixStream::pair().unwrap();
    reader.set_nonblocking(true).unwrap();
    writer_sock.set_nonblocking(true).unwrap();

    let total = 100usize;
    let (result_tx, result_rx) = oneshot::channel();
    submitter
        .enqueue(Request::Recv(RecvRequest {
            fd: Arc::new(reader.into()),
            buf: IoBufMut::with_capacity(total),
            offset: 0,
            len: total,
            exact: true,
            deadline: None,
            result: None,
            sender: result_tx,
        }))
        .await
        .unwrap();

    // Partially satisfy the recv, start shutdown, then deliver the rest.
    (&writer_sock).write_all(&[1u8; 10]).unwrap();
    tokio::time::sleep(Duration::from_millis(20)).await;
    drop(submitter);
    tokio::time::sleep(Duration::from_millis(20)).await;
    (&writer_sock).write_all(&[2u8; 90]).unwrap();

    let (_, bytes_read) = tokio::time::timeout(Duration::from_secs(2), result_rx)
        .await
        .expect("shutdown recv timed out")
        .expect("missing shutdown recv completion")
        .expect("recv should succeed during shutdown");
    assert_eq!(bytes_read, total);
    loop_thread.join().unwrap();
}
#[tokio::test]
async fn test_timeout_fires_while_request_in_ready_queue() {
    // An exact recv that receives only a fraction of its requested
    // length (10 of 100 bytes) must still observe its deadline instead
    // of hanging while it waits for more data.
    let config = Config {
        max_request_timeout: Duration::from_millis(200),
        timeout_wheel_tick: Duration::from_millis(5),
        ..Default::default()
    };
    let mut registry = Registry::default();
    let (submitter, ring_loop) = IoUringLoop::new(config, &mut registry);
    let loop_thread = std::thread::spawn(move || ring_loop.run());

    let (reader, writer_sock) = UnixStream::pair().unwrap();
    let recv = submitter.recv(
        Arc::new(reader.into()),
        IoBufMut::with_capacity(100),
        0,
        100,
        true,
        Instant::now() + Duration::from_millis(80),
    );
    // Write only part of what the exact recv demands.
    let writer = async {
        (&writer_sock).write_all(&[1u8; 10]).unwrap();
    };
    let (result, ()) = tokio::time::timeout(Duration::from_secs(5), join(recv, writer))
        .await
        .expect("recv should not hang");
    assert!(
        matches!(result, Err((_, crate::Error::Timeout))),
        "expected timeout, got {result:?}"
    );
    drop(submitter);
    loop_thread.join().unwrap();
}
// Verifies that when a waiter's timeout fires while it sits in the ready
// queue (i.e. it has no SQE in flight after a partial completion),
// advance_timeouts() does NOT stage an AsyncCancel for it; the timeout is
// surfaced the next time the waiter is staged.
#[test]
fn test_ready_queue_timeout_skips_cancel_staging() {
let cfg = Config {
max_request_timeout: Duration::from_millis(100),
timeout_wheel_tick: Duration::from_millis(5),
..Default::default()
};
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg, &mut registry);
let (left, _right) = UnixStream::pair().unwrap();
let (tx, _rx) = oneshot::channel();
// Exact recv of 8 bytes; the peer never writes, so it cannot finish.
let request = Request::Recv(RecvRequest {
fd: Arc::new(left.into()),
buf: IoBufMut::with_capacity(8),
offset: 0,
len: 8,
exact: true,
deadline: Some(Instant::now() + Duration::from_millis(25)),
result: None,
sender: tx,
});
// Insert the waiter and stage it: a fresh waiter yields Submit.
let waiter_id = iouring.waiters.insert(request, Some(1));
assert!(matches!(
iouring.waiters.stage(waiter_id),
StageOutcome::Submit(_)
));
iouring.timeout_wheel.schedule(waiter_id, 1);
// Feed a partial completion (4 of the 8 exact bytes): the waiter is
// requeued for resubmission rather than finished.
let waiter_id = match iouring.waiters.on_completion(waiter_id.user_data(), 4) {
CompletionOutcome::Requeue(waiter_id) => waiter_id,
_ => panic!("missing partial recv completion"),
};
iouring.ready_queue.push_back(waiter_id);
// Sleep past one wheel tick so the slot scheduled above becomes due.
std::thread::sleep(iouring.cfg.timeout_wheel_tick + Duration::from_millis(2));
iouring.advance_timeouts();
// No cancel was staged for the ready-queue waiter.
assert!(iouring.pending_cancels.is_empty());
// Staging the waiter now surfaces the timeout instead of a resubmit.
assert!(matches!(
iouring.waiters.stage(waiter_id),
StageOutcome::Timeout(_)
));
}
// A cancelled waiter parked in the ready queue is the loop's only remaining
// work; drain() must finish it locally (delivering Err(Timeout)) and exit
// without queueing anything onto the ring's submission queue.
#[test]
fn test_drain_breaks_after_local_ready_queue_timeout_finishes_last_waiter() {
let cfg = Config {
shutdown_timeout: None,
..Default::default()
};
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
// Suppress the wake re-arm so the test can drive a bare ring directly.
iouring.wake_rearm_needed = false;
let (tx, rx) = oneshot::channel();
// Insert a send request directly into the waiter table (never submitted).
let waiter_id = iouring.waiters.insert(
Request::Send(SendRequest {
fd: Arc::new(UnixStream::pair().unwrap().0.into()),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: Some(Instant::now() + Duration::from_secs(1)),
result: None,
sender: tx,
}),
Some(1),
);
// Mark it cancelled and park it in the ready queue by hand.
assert!(iouring.waiters.cancel(waiter_id));
iouring.ready_queue.push_back(waiter_id);
iouring.drain(&mut ring);
// Drain finished the waiter locally: table empty, nothing staged.
assert!(iouring.waiters.is_empty());
assert_eq!(ring.submission().len(), 0);
// The cancelled waiter resolves as a timeout.
let result = futures::executor::block_on(rx).expect("missing timeout completion");
assert!(matches!(result, Err(crate::Error::Timeout)));
}
// A cancelled waiter sitting in the ready queue must be completed locally
// by fill_submission_queue (delivering Err(Timeout)) without consuming any
// submission-queue entries.
#[tokio::test]
async fn test_fill_submission_queue_completes_cancelled_ready_queue_entry_locally() {
let cfg = Config::default();
let mut registry = Registry::default();
let (_submitter, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
// Suppress the wake re-arm so the test can drive a bare ring directly.
iouring.wake_rearm_needed = false;
let (tx, rx) = oneshot::channel();
let request = Request::Send(SendRequest {
fd: Arc::new(UnixStream::pair().unwrap().0.into()),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: Some(Instant::now() + Duration::from_secs(1)),
result: None,
sender: tx,
});
// Insert, cancel, and park the waiter in the ready queue by hand.
let waiter_id = iouring.waiters.insert(request, Some(1));
assert!(iouring.waiters.cancel(waiter_id));
iouring.ready_queue.push_back(waiter_id);
let at_capacity = iouring
.fill_submission_queue(&mut ring)
.expect("channel should remain connected");
// The cancelled entry was finished locally, not submitted to the ring.
assert!(!at_capacity);
assert!(iouring.waiters.is_empty());
assert_eq!(ring.submission().len(), 0);
// The cancelled waiter resolves as a timeout.
let result = rx.await.expect("missing timeout completion");
assert!(matches!(result, Err(crate::Error::Timeout)));
}
// Requests whose result receiver is already dropped must be discarded
// ("orphaned") by fill_submission_queue before any SQE is created: no
// waiter entry remains, nothing is staged on the ring, and no timeout is
// scheduled. Exercised for both a Send and a ReadAt request.
#[tokio::test]
async fn test_fill_submission_queue_orphans_closed_request_before_first_submit() {
let cfg = Config::default();
let mut registry = Registry::default();
let (handle, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
// Suppress the wake re-arm so the test can drive a bare ring directly.
iouring.wake_rearm_needed = false;
// Case 1: Send request whose receiver is dropped before enqueue.
let (tx, rx) = oneshot::channel();
drop(rx);
handle
.enqueue(Request::Send(SendRequest {
fd: Arc::new(UnixStream::pair().unwrap().0.into()),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: Some(Instant::now() + Duration::from_secs(1)),
result: None,
sender: tx,
}))
.await
.expect("request should enqueue");
let at_capacity = iouring
.fill_submission_queue(&mut ring)
.expect("channel should remain connected");
// Orphaned before submit: nothing tracked, staged, or scheduled.
assert!(!at_capacity);
assert!(iouring.waiters.is_empty());
assert_eq!(ring.submission().len(), 0);
assert_eq!(iouring.timeout_wheel.next_deadline(), None);
// Case 2: ReadAt request with a dropped receiver. A socket fd is wrapped
// in File so the test needs no filesystem access.
let (sock_left, _sock_right) = UnixStream::pair().unwrap();
// SAFETY: into_raw_fd transfers ownership of a valid open fd; File takes
// sole ownership and will close it on drop.
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
let (tx, rx) = oneshot::channel();
drop(rx);
handle
.enqueue(Request::ReadAt(ReadAtRequest {
file: Arc::new(file),
offset: 0,
len: 8,
read: 0,
buf: IoBufMut::with_capacity(8),
result: None,
sender: tx,
}))
.await
.expect("request should enqueue");
let at_capacity = iouring
.fill_submission_queue(&mut ring)
.expect("channel should remain connected");
// Same outcome for the ReadAt request: orphaned before first submit.
assert!(!at_capacity);
assert!(iouring.waiters.is_empty());
assert_eq!(ring.submission().len(), 0);
assert_eq!(iouring.timeout_wheel.next_deadline(), None);
}
// A ready-queue entry whose result receiver has been dropped must be
// orphaned locally by fill_submission_queue: the waiter is removed and its
// timeout-wheel slot is cleared without touching the ring.
#[tokio::test]
async fn test_fill_submission_queue_orphans_closed_ready_queue_entry_locally() {
let cfg = Config::default();
let mut registry = Registry::default();
let (_handle, mut iouring) = IoUringLoop::new(cfg.clone(), &mut registry);
let mut ring = new_ring(&cfg).expect("unable to create io_uring instance");
// Suppress the wake re-arm so the test can drive a bare ring directly.
iouring.wake_rearm_needed = false;
// Drop the receiver up front so the waiter is already orphaned.
let (tx, rx) = oneshot::channel();
drop(rx);
let waiter_id = iouring.waiters.insert(
Request::Recv(RecvRequest {
fd: Arc::new(UnixStream::pair().unwrap().0.into()),
buf: IoBufMut::with_capacity(8),
offset: 4,
len: 8,
exact: true,
deadline: None,
result: None,
sender: tx,
}),
Some(1),
);
// Give it a wheel slot and park it in the ready queue by hand.
iouring.timeout_wheel.schedule(waiter_id, 1);
iouring.ready_queue.push_back(waiter_id);
let at_capacity = iouring
.fill_submission_queue(&mut ring)
.expect("channel should remain connected");
// Orphaned locally: no waiter, no SQE, and no pending deadline remain.
assert!(!at_capacity);
assert!(iouring.waiters.is_empty());
assert_eq!(ring.submission().len(), 0);
assert_eq!(iouring.timeout_wheel.next_deadline(), None);
}
#[tokio::test]
async fn test_single_issuer() {
let cfg = Config {
single_issuer: true,
..Default::default()
};
let mut registry = Registry::default();
let (sender, iouring) = IoUringLoop::new(cfg, &mut registry);
let uring_thread = std::thread::spawn(move || iouring.run());
let (sock_left, sock_right) = UnixStream::pair().unwrap();
let recv = sender.recv(
Arc::new(sock_left.into()),
IoBufMut::with_capacity(5),
0,
5,
true,
Instant::now() + Duration::from_secs(5),
);
let send = sender.send(
Arc::new(sock_right.into()),
IoBufs::from(IoBuf::from(b"hello")),
Instant::now() + Duration::from_secs(5),
);
let (recv_result, send_result) =
tokio::time::timeout(Duration::from_secs(2), join(recv, send))
.await
.expect("recv/send timed out");
let (_, read) = recv_result.expect("recv should succeed");
assert_eq!(read, 5);
send_result.expect("send should succeed");
drop(sender);
uring_thread.join().unwrap();
}
}