use super::waiter::{WaiterId, WaiterState};
use crate::{Buf, Error, IoBuf, IoBufMut, IoBufs};
use commonware_utils::channel::oneshot;
use io_uring::{opcode, squeue::Entry as SqueueEntry, types::Fd};
use std::{
fs::File,
os::fd::{AsRawFd, OwnedFd},
sync::Arc,
time::Instant,
};
const IOVEC_BATCH_SIZE: usize = 32;
pub(super) enum WriteBuffers {
Single {
buf: IoBuf,
},
Vectored {
bufs: IoBufs,
iovecs: Box<[libc::iovec]>,
},
}
impl From<IoBufs> for WriteBuffers {
fn from(bufs: IoBufs) -> Self {
match bufs.try_into_single() {
Ok(buf) => Self::Single { buf },
Err(bufs) => {
let max_iovecs = bufs.chunk_count().min(IOVEC_BATCH_SIZE);
let iovecs: Box<[libc::iovec]> = std::iter::repeat_n(
libc::iovec {
iov_base: std::ptr::NonNull::<u8>::dangling().as_ptr().cast(),
iov_len: 0,
},
max_iovecs,
)
.collect();
Self::Vectored { bufs, iovecs }
}
}
}
}
impl WriteBuffers {
fn remaining_len(&self) -> usize {
match self {
Self::Single { buf } => buf.len(),
Self::Vectored { bufs, .. } => bufs.len(),
}
}
fn is_complete(&self) -> bool {
self.remaining_len() == 0
}
fn advance(&mut self, n: usize) {
match self {
Self::Single { buf } => buf.advance(n),
Self::Vectored { bufs, .. } => bufs.advance(n),
}
}
}
unsafe impl Send for Request {}
pub(super) enum Request {
#[cfg_attr(not(feature = "iouring-network"), allow(dead_code))]
Send(SendRequest),
#[cfg_attr(not(feature = "iouring-network"), allow(dead_code))]
Recv(RecvRequest),
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
ReadAt(ReadAtRequest),
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
WriteAt(WriteAtRequest),
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
Sync(SyncRequest),
}
impl Request {
pub const fn deadline(&self) -> Option<Instant> {
match self {
Self::Send(r) => r.deadline,
Self::Recv(r) => r.deadline,
Self::ReadAt(_) | Self::WriteAt(_) | Self::Sync(_) => None,
}
}
pub const fn has_deadline(&self) -> bool {
self.deadline().is_some()
}
pub fn is_orphaned(&self) -> bool {
match self {
Self::Send(s) => s.sender.is_closed(),
Self::Recv(r) => r.sender.is_closed(),
Self::ReadAt(r) => r.sender.is_closed(),
Self::WriteAt(_) | Self::Sync(_) => false,
}
}
pub fn build_sqe(&mut self, waiter_id: WaiterId) -> SqueueEntry {
let sqe = match self {
Self::Send(s) => s.build_sqe(),
Self::Recv(r) => r.build_sqe(),
Self::ReadAt(r) => r.build_sqe(),
Self::WriteAt(w) => w.build_sqe(),
Self::Sync(s) => s.build_sqe(),
};
sqe.user_data(waiter_id.user_data())
}
pub fn on_cqe(&mut self, state: WaiterState, result: i32) -> bool {
match self {
Self::Send(s) => s.on_cqe(state, result),
Self::Recv(r) => r.on_cqe(state, result),
Self::ReadAt(r) => r.on_cqe(state, result),
Self::WriteAt(w) => w.on_cqe(state, result),
Self::Sync(s) => s.on_cqe(state, result),
}
}
pub fn complete(self) {
match self {
Self::Send(s) => {
let _ = s.sender.send(s.result.unwrap_or(Err(Error::SendFailed)));
}
Self::Recv(r) => {
let result = match r.result.unwrap_or(Err(Error::RecvFailed)) {
Ok(read) => Ok((r.buf, read)),
Err(err) => Err((r.buf, err)),
};
let _ = r.sender.send(result);
}
Self::ReadAt(r) => {
let result = match r.result.unwrap_or(Err(Error::ReadFailed)) {
Ok(()) => Ok(r.buf),
Err(err) => Err((r.buf, err)),
};
let _ = r.sender.send(result);
}
Self::WriteAt(w) => {
let _ = w.sender.send(w.result.unwrap_or(Err(Error::WriteFailed)));
}
Self::Sync(s) => {
let _ = s.sender.send(s.result.unwrap_or(Ok(())));
}
}
}
pub fn timeout(self) {
match self {
Self::Send(s) => {
let _ = s.sender.send(Err(Error::Timeout));
}
Self::Recv(r) => {
let _ = r.sender.send(Err((r.buf, Error::Timeout)));
}
Self::ReadAt(r) => {
let _ = r.sender.send(Err((r.buf, Error::Timeout)));
}
Self::WriteAt(w) => {
let _ = w.sender.send(Err(Error::Timeout));
}
Self::Sync(s) => {
let _ = s.sender.send(Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"request timed out",
)));
}
}
}
}
enum CqeResult {
Retry,
Cancelled,
Error(i32),
Zero,
Positive(usize),
}
impl CqeResult {
const fn from_raw(result: i32, state: WaiterState) -> Self {
if result == -libc::EAGAIN || result == -libc::EWOULDBLOCK || result == -libc::EINTR {
Self::Retry
} else if result == -libc::ECANCELED && matches!(state, WaiterState::CancelRequested) {
Self::Cancelled
} else if result < 0 {
Self::Error(result)
} else if result == 0 {
Self::Zero
} else {
Self::Positive(result as usize)
}
}
}
pub(super) struct SendRequest {
pub(super) fd: Arc<OwnedFd>,
pub(super) write: WriteBuffers,
pub(super) deadline: Option<Instant>,
pub(super) result: Option<Result<(), Error>>,
pub(super) sender: oneshot::Sender<Result<(), Error>>,
}
impl SendRequest {
fn build_sqe(&mut self) -> SqueueEntry {
let fd = Fd(self.fd.as_raw_fd());
match &mut self.write {
WriteBuffers::Single { buf } => {
let ptr = buf.as_ptr();
let remaining = buf.remaining();
opcode::Send::new(
fd,
ptr,
remaining
.try_into()
.expect("single-buffer SQE length exceeds u32"),
)
.build()
}
WriteBuffers::Vectored { bufs, iovecs } => {
let max_iovecs = bufs.chunk_count().min(iovecs.len());
let io_slices: &mut [std::io::IoSlice<'_>] = unsafe {
std::slice::from_raw_parts_mut(
iovecs.as_mut_ptr().cast::<std::io::IoSlice<'_>>(),
max_iovecs,
)
};
let iovecs_len = bufs
.chunks_vectored(io_slices)
.try_into()
.expect("iovecs_len exceeds u32");
opcode::Writev::new(fd, iovecs.as_ptr(), iovecs_len).build()
}
}
}
fn on_cqe(&mut self, state: WaiterState, result: i32) -> bool {
match CqeResult::from_raw(result, state) {
CqeResult::Retry if matches!(state, WaiterState::CancelRequested) => {
self.result = Some(Err(Error::Timeout));
true
}
CqeResult::Retry => false,
CqeResult::Cancelled => {
self.result = Some(Err(Error::Timeout));
true
}
CqeResult::Error(_) | CqeResult::Zero => {
self.result = Some(Err(Error::SendFailed));
true
}
CqeResult::Positive(n) => {
self.write.advance(n);
if self.write.is_complete() {
self.result = Some(Ok(()));
true
} else if matches!(state, WaiterState::CancelRequested) {
self.result = Some(Err(Error::Timeout));
true
} else {
false
}
}
}
}
}
pub(super) struct RecvRequest {
pub(super) fd: Arc<OwnedFd>,
pub(super) buf: IoBufMut,
pub(super) offset: usize,
pub(super) len: usize,
pub(super) exact: bool,
pub(super) deadline: Option<Instant>,
pub(super) result: Option<Result<usize, Error>>,
pub(super) sender: oneshot::Sender<Result<(IoBufMut, usize), (IoBufMut, Error)>>,
}
impl RecvRequest {
fn build_sqe(&mut self) -> SqueueEntry {
let fd = Fd(self.fd.as_raw_fd());
assert!(
self.offset <= self.len && self.len <= self.buf.capacity(),
"recv invariant violated: need offset <= len <= capacity"
);
let ptr = unsafe { self.buf.as_mut_ptr().add(self.offset) };
let remaining = self.len - self.offset;
opcode::Recv::new(
fd,
ptr,
remaining
.try_into()
.expect("single-buffer SQE length exceeds u32"),
)
.build()
}
fn on_cqe(&mut self, state: WaiterState, result: i32) -> bool {
match CqeResult::from_raw(result, state) {
CqeResult::Retry if matches!(state, WaiterState::CancelRequested) => {
self.result = Some(Err(Error::Timeout));
true
}
CqeResult::Retry => false,
CqeResult::Cancelled => {
self.result = Some(Err(Error::Timeout));
true
}
CqeResult::Error(_) | CqeResult::Zero => {
self.result = Some(Err(Error::RecvFailed));
true
}
CqeResult::Positive(n) => {
let remaining = self.len - self.offset;
assert!(
n <= remaining,
"recv CQE exceeds requested length: n={n} remaining={remaining}"
);
self.offset += n;
if !self.exact || self.offset >= self.len {
self.result = Some(Ok(self.offset));
true
} else if matches!(state, WaiterState::CancelRequested) {
self.result = Some(Err(Error::Timeout));
true
} else {
false
}
}
}
}
}
pub(super) struct ReadAtRequest {
pub(super) file: Arc<File>,
pub(super) offset: u64,
pub(super) len: usize,
pub(super) read: usize,
pub(super) buf: IoBufMut,
pub(super) result: Option<Result<(), Error>>,
pub(super) sender: oneshot::Sender<Result<IoBufMut, (IoBufMut, Error)>>,
}
impl ReadAtRequest {
fn build_sqe(&mut self) -> SqueueEntry {
let fd = Fd(self.file.as_raw_fd());
assert!(
self.read <= self.len && self.len <= self.buf.capacity(),
"read_at invariant violated: need read <= len <= capacity"
);
let ptr = unsafe { self.buf.as_mut_ptr().add(self.read) };
let remaining = self.len - self.read;
let offset = self.offset + self.read as u64;
opcode::Read::new(
fd,
ptr,
remaining
.try_into()
.expect("single-buffer SQE length exceeds u32"),
)
.offset(offset)
.build()
}
fn on_cqe(&mut self, state: WaiterState, result: i32) -> bool {
match CqeResult::from_raw(result, state) {
CqeResult::Retry => false,
CqeResult::Cancelled | CqeResult::Error(_) => {
self.result = Some(Err(Error::ReadFailed));
true
}
CqeResult::Zero => {
self.result = Some(Err(Error::BlobInsufficientLength));
true
}
CqeResult::Positive(n) => {
let remaining = self.len - self.read;
assert!(
n <= remaining,
"read CQE exceeds requested length: n={n} remaining={remaining}"
);
self.read += n;
if self.read >= self.len {
self.result = Some(Ok(()));
true
} else {
false
}
}
}
}
}
pub(super) struct WriteAtRequest {
pub(super) file: Arc<File>,
pub(super) offset: u64,
pub(super) written: usize,
pub(super) write: WriteBuffers,
pub(super) result: Option<Result<(), Error>>,
pub(super) sender: oneshot::Sender<Result<(), Error>>,
}
impl WriteAtRequest {
fn build_sqe(&mut self) -> SqueueEntry {
let fd = Fd(self.file.as_raw_fd());
let offset = self.offset + self.written as u64;
match &mut self.write {
WriteBuffers::Single { buf } => {
let ptr = buf.as_ptr();
let remaining = buf.remaining();
opcode::Write::new(
fd,
ptr,
remaining
.try_into()
.expect("single-buffer SQE length exceeds u32"),
)
.offset(offset)
.build()
}
WriteBuffers::Vectored { bufs, iovecs } => {
let max_iovecs = bufs.chunk_count().min(iovecs.len());
let io_slices: &mut [std::io::IoSlice<'_>] = unsafe {
std::slice::from_raw_parts_mut(
iovecs.as_mut_ptr().cast::<std::io::IoSlice<'_>>(),
max_iovecs,
)
};
let iovecs_len = bufs
.chunks_vectored(io_slices)
.try_into()
.expect("iovecs_len exceeds u32");
opcode::Writev::new(fd, iovecs.as_ptr(), iovecs_len)
.offset(offset)
.build()
}
}
}
fn on_cqe(&mut self, state: WaiterState, result: i32) -> bool {
match CqeResult::from_raw(result, state) {
CqeResult::Retry => false,
CqeResult::Cancelled | CqeResult::Error(_) | CqeResult::Zero => {
self.result = Some(Err(Error::WriteFailed));
true
}
CqeResult::Positive(n) => {
self.written += n;
self.write.advance(n);
if self.write.is_complete() {
self.result = Some(Ok(()));
true
} else {
false
}
}
}
}
}
pub(super) struct SyncRequest {
pub(super) file: Arc<File>,
pub(super) result: Option<std::io::Result<()>>,
pub(super) sender: oneshot::Sender<std::io::Result<()>>,
}
impl SyncRequest {
fn build_sqe(&self) -> SqueueEntry {
let fd = Fd(self.file.as_raw_fd());
opcode::Fsync::new(fd).build()
}
fn on_cqe(&mut self, state: WaiterState, result: i32) -> bool {
match CqeResult::from_raw(result, state) {
CqeResult::Retry => false,
CqeResult::Cancelled => {
self.result = Some(Err(std::io::Error::from_raw_os_error(libc::ECANCELED)));
true
}
CqeResult::Error(code) => {
self.result = Some(Err(std::io::Error::from_raw_os_error(-code)));
true
}
CqeResult::Zero | CqeResult::Positive(_) => {
self.result = Some(Ok(()));
true
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_utils::channel::oneshot;
use futures::executor::block_on;
use std::{
os::{
fd::{FromRawFd, IntoRawFd},
unix::net::UnixStream,
},
panic::{catch_unwind, AssertUnwindSafe},
};
fn make_socket_fd() -> Arc<OwnedFd> {
let (left, _right) = UnixStream::pair().expect("failed to create unix socket pair");
Arc::new(left.into())
}
fn make_file_fd() -> Arc<File> {
let (left, _right) = UnixStream::pair().expect("failed to create unix socket pair");
let file = unsafe { File::from_raw_fd(left.into_raw_fd()) };
Arc::new(file)
}
#[test]
fn test_cqe_result_from_raw_retryable_codes() {
for code in [-libc::EAGAIN, -libc::EWOULDBLOCK, -libc::EINTR] {
assert!(matches!(
CqeResult::from_raw(code, WaiterState::Active { target_tick: None }),
CqeResult::Retry
));
}
for code in [0, -libc::EINVAL, -libc::ETIMEDOUT] {
assert!(!matches!(
CqeResult::from_raw(code, WaiterState::Active { target_tick: None }),
CqeResult::Retry
));
}
}
#[test]
fn test_request_deadline_helpers_and_invariants() {
let send_deadline = Instant::now();
let send = Request::Send(SendRequest {
fd: make_socket_fd(),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: Some(send_deadline),
result: None,
sender: oneshot::channel().0,
});
assert_eq!(send.deadline(), Some(send_deadline));
assert!(send.has_deadline());
let recv_deadline = Instant::now();
let recv = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(8),
offset: 0,
len: 8,
exact: true,
deadline: Some(recv_deadline),
result: None,
sender: oneshot::channel().0,
});
assert_eq!(recv.deadline(), Some(recv_deadline));
assert!(recv.has_deadline());
let read = Request::ReadAt(ReadAtRequest {
file: make_file_fd(),
offset: 0,
len: 4,
read: 0,
buf: IoBufMut::with_capacity(4),
result: None,
sender: oneshot::channel().0,
});
assert_eq!(read.deadline(), None);
assert!(!read.has_deadline());
let recv_overread = std::panic::catch_unwind(|| {
let mut request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(4),
offset: 5,
len: 4,
exact: true,
deadline: None,
result: None,
sender: oneshot::channel().0,
});
let _ = request.build_sqe(WaiterId::new(0, 0));
});
assert!(recv_overread.is_err());
let recv_oversized = std::panic::catch_unwind(|| {
let mut request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(4),
offset: 0,
len: 5,
exact: true,
deadline: None,
result: None,
sender: oneshot::channel().0,
});
let _ = request.build_sqe(WaiterId::new(0, 0));
});
assert!(recv_oversized.is_err());
let read_oversized = std::panic::catch_unwind(|| {
let mut request = Request::ReadAt(ReadAtRequest {
file: make_file_fd(),
offset: 0,
len: 5,
read: 0,
buf: IoBufMut::with_capacity(4),
result: None,
sender: oneshot::channel().0,
});
let _ = request.build_sqe(WaiterId::new(0, 0));
});
assert!(read_oversized.is_err());
let read_overread = std::panic::catch_unwind(|| {
let mut request = Request::ReadAt(ReadAtRequest {
file: make_file_fd(),
offset: 0,
len: 4,
read: 5,
buf: IoBufMut::with_capacity(8),
result: None,
sender: oneshot::channel().0,
});
let _ = request.build_sqe(WaiterId::new(0, 0));
});
assert!(read_overread.is_err());
}
#[test]
fn test_active_send_paths() {
let (tx, _rx) = oneshot::channel();
let mut request = Request::Send(SendRequest {
fd: make_socket_fd(),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: None,
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EAGAIN));
let (tx, rx) = oneshot::channel();
let mut request = Request::Send(SendRequest {
fd: make_socket_fd(),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: None,
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, 2));
assert!(request.on_cqe(WaiterState::CancelRequested, -libc::EAGAIN));
request.complete();
assert!(matches!(
block_on(rx).expect("missing send result"),
Err(Error::Timeout)
));
let (tx, rx) = oneshot::channel();
let mut request = Request::Send(SendRequest {
fd: make_socket_fd(),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: None,
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, 2));
assert!(request.on_cqe(WaiterState::CancelRequested, 1));
request.complete();
assert!(matches!(
block_on(rx).expect("missing partial-timeout result"),
Err(Error::Timeout)
));
let (tx, rx) = oneshot::channel();
let mut request = Request::Send(SendRequest {
fd: make_socket_fd(),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: None,
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::CancelRequested, -libc::ECANCELED));
request.complete();
assert!(matches!(
block_on(rx).expect("missing timeout-cancel result"),
Err(Error::Timeout)
));
let mut vectored = IoBufs::default();
vectored.append(IoBuf::from(b"abc"));
vectored.append(IoBuf::from(b"de"));
let (tx, rx) = oneshot::channel();
let mut request = Request::Send(SendRequest {
fd: make_socket_fd(),
write: vectored.into(),
deadline: None,
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, 3));
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, 2));
request.complete();
block_on(rx)
.expect("missing send completion")
.expect("send should complete successfully");
let (tx, rx) = oneshot::channel();
let mut request = Request::Send(SendRequest {
fd: make_socket_fd(),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: None,
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, 0));
request.complete();
assert!(matches!(
block_on(rx).expect("missing zero-result completion"),
Err(Error::SendFailed)
));
let (tx, rx) = oneshot::channel();
let mut request = Request::Send(SendRequest {
fd: make_socket_fd(),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: None,
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EIO));
request.complete();
assert!(matches!(
block_on(rx).expect("missing hard-error completion"),
Err(Error::SendFailed)
));
let (tx, rx) = oneshot::channel();
let mut request = Request::Send(SendRequest {
fd: make_socket_fd(),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: None,
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::CancelRequested, 5));
request.complete();
block_on(rx)
.expect("missing send completion")
.expect("send should complete successfully");
}
#[test]
fn test_active_recv_paths() {
let (tx, _rx) = oneshot::channel();
let mut request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(5),
offset: 0,
len: 5,
exact: true,
deadline: None,
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EAGAIN));
let (tx, rx) = oneshot::channel();
let mut request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(5),
offset: 0,
len: 5,
exact: false,
deadline: None,
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, 3));
request.complete();
let (_buf, read) = block_on(rx)
.expect("missing recv completion")
.expect("recv should complete successfully");
assert_eq!(read, 3);
let (tx, rx) = oneshot::channel();
let mut request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(5),
offset: 0,
len: 5,
exact: true,
deadline: None,
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, 3));
assert!(request.on_cqe(WaiterState::CancelRequested, 1));
request.complete();
assert!(matches!(
block_on(rx).expect("missing timeout completion"),
Err((_, Error::Timeout))
));
let (tx, rx) = oneshot::channel();
let mut request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(5),
offset: 0,
len: 5,
exact: true,
deadline: None,
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::CancelRequested, -libc::EINTR));
request.complete();
assert!(matches!(
block_on(rx).expect("missing retryable completion"),
Err((_, Error::Timeout))
));
let (tx, rx) = oneshot::channel();
let mut request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(5),
offset: 0,
len: 5,
exact: true,
deadline: None,
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::CancelRequested, -libc::ECANCELED));
request.complete();
assert!(matches!(
block_on(rx).expect("missing timeout-cancel completion"),
Err((_, Error::Timeout))
));
let (tx, rx) = oneshot::channel();
let mut request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(5),
offset: 0,
len: 5,
exact: true,
deadline: None,
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::CancelRequested, 5));
request.complete();
let (_buf, read) = block_on(rx)
.expect("missing successful completion")
.expect("recv should complete successfully");
assert_eq!(read, 5);
let (tx, _rx) = oneshot::channel();
let mut request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(5),
offset: 0,
len: 5,
exact: true,
deadline: None,
result: None,
sender: tx,
});
let overflow = catch_unwind(AssertUnwindSafe(|| {
let _ = request.on_cqe(WaiterState::Active { target_tick: None }, 6);
}));
assert!(overflow.is_err());
let (tx, rx) = oneshot::channel();
let mut request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(5),
offset: 0,
len: 5,
exact: true,
deadline: None,
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, 0));
request.complete();
assert!(matches!(
block_on(rx).expect("missing zero completion"),
Err((_, Error::RecvFailed))
));
let (tx, rx) = oneshot::channel();
let mut request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(5),
offset: 0,
len: 5,
exact: true,
deadline: None,
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EIO));
request.complete();
assert!(matches!(
block_on(rx).expect("missing error completion"),
Err((_, Error::RecvFailed))
));
}
#[test]
fn test_active_read_at_paths() {
let (tx, _rx) = oneshot::channel();
let mut request = Request::ReadAt(ReadAtRequest {
file: make_file_fd(),
offset: 0,
len: 5,
read: 0,
buf: IoBufMut::with_capacity(5),
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EAGAIN));
let (tx, rx) = oneshot::channel();
let mut request = Request::ReadAt(ReadAtRequest {
file: make_file_fd(),
offset: 0,
len: 5,
read: 0,
buf: IoBufMut::with_capacity(5),
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, 2));
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, 3));
request.complete();
block_on(rx)
.expect("missing read completion")
.expect("read should complete successfully");
let (tx, rx) = oneshot::channel();
let mut request = Request::ReadAt(ReadAtRequest {
file: make_file_fd(),
offset: 0,
len: 5,
read: 0,
buf: IoBufMut::with_capacity(5),
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, 0));
request.complete();
assert!(matches!(
block_on(rx).expect("missing eof completion"),
Err((_, Error::BlobInsufficientLength))
));
let (tx, rx) = oneshot::channel();
let mut request = Request::ReadAt(ReadAtRequest {
file: make_file_fd(),
offset: 0,
len: 5,
read: 0,
buf: IoBufMut::with_capacity(5),
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EIO));
request.complete();
assert!(matches!(
block_on(rx).expect("missing read failure"),
Err((_, Error::ReadFailed))
));
let (tx, rx) = oneshot::channel();
let mut request = Request::ReadAt(ReadAtRequest {
file: make_file_fd(),
offset: 0,
len: 5,
read: 0,
buf: IoBufMut::with_capacity(5),
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::CancelRequested, -libc::ECANCELED));
request.complete();
assert!(matches!(
block_on(rx).expect("missing timeout-cancel failure"),
Err((_, Error::ReadFailed))
));
}
#[test]
fn test_active_write_at_paths() {
let (tx, _rx) = oneshot::channel();
let mut request = Request::WriteAt(WriteAtRequest {
file: make_file_fd(),
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EAGAIN));
let (tx, rx) = oneshot::channel();
let mut request = Request::WriteAt(WriteAtRequest {
file: make_file_fd(),
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, 2));
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, 3));
request.complete();
block_on(rx)
.expect("missing write completion")
.expect("write should complete successfully");
let mut vectored = IoBufs::default();
vectored.append(IoBuf::from(b"abc"));
vectored.append(IoBuf::from(b"de"));
let (tx, rx) = oneshot::channel();
let mut request = Request::WriteAt(WriteAtRequest {
file: make_file_fd(),
offset: 0,
written: 0,
write: vectored.into(),
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, 4));
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, 1));
request.complete();
block_on(rx)
.expect("missing vectored write completion")
.expect("vectored write should complete successfully");
let (tx, rx) = oneshot::channel();
let mut request = Request::WriteAt(WriteAtRequest {
file: make_file_fd(),
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, 0));
request.complete();
assert!(matches!(
block_on(rx).expect("missing zero-result write"),
Err(Error::WriteFailed)
));
let (tx, rx) = oneshot::channel();
let mut request = Request::WriteAt(WriteAtRequest {
file: make_file_fd(),
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EIO));
request.complete();
assert!(matches!(
block_on(rx).expect("missing write failure"),
Err(Error::WriteFailed)
));
let (tx, rx) = oneshot::channel();
let mut request = Request::WriteAt(WriteAtRequest {
file: make_file_fd(),
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::CancelRequested, -libc::ECANCELED));
request.complete();
assert!(matches!(
block_on(rx).expect("missing timeout-cancel write failure"),
Err(Error::WriteFailed)
));
}
#[test]
fn test_active_sync_paths() {
let (tx, _rx) = oneshot::channel();
let mut request = Request::Sync(SyncRequest {
file: make_file_fd(),
result: None,
sender: tx,
});
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EINTR));
let (tx, rx) = oneshot::channel();
let mut request = Request::Sync(SyncRequest {
file: make_file_fd(),
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::CancelRequested, -libc::ECANCELED));
request.complete();
let err = block_on(rx)
.expect("missing timeout cancel result")
.expect_err("expected timeout cancel error");
assert_eq!(err.raw_os_error(), Some(libc::ECANCELED));
let (tx, rx) = oneshot::channel();
let mut request = Request::Sync(SyncRequest {
file: make_file_fd(),
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EIO));
request.complete();
let err = block_on(rx)
.expect("missing hard error result")
.expect_err("expected hard error");
assert_eq!(err.raw_os_error(), Some(libc::EIO));
let (tx, rx) = oneshot::channel();
let mut request = Request::Sync(SyncRequest {
file: make_file_fd(),
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, 0));
request.complete();
block_on(rx)
.expect("missing zero-result completion")
.expect("sync should succeed on zero");
let (tx, rx) = oneshot::channel();
let mut request = Request::Sync(SyncRequest {
file: make_file_fd(),
result: None,
sender: tx,
});
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, 1));
request.complete();
block_on(rx)
.expect("missing positive-result completion")
.expect("sync should succeed on positive");
let (tx, rx) = oneshot::channel();
let request = Request::Sync(SyncRequest {
file: make_file_fd(),
result: None,
sender: tx,
});
request.timeout();
let err = block_on(rx)
.expect("missing timeout result")
.expect_err("expected timeout error");
assert_eq!(err.kind(), std::io::ErrorKind::TimedOut);
}
#[test]
fn test_finish_without_cqe_uses_fallback_results() {
let (tx, rx) = oneshot::channel();
let request = Request::Send(SendRequest {
fd: make_socket_fd(),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: None,
result: None,
sender: tx,
});
request.complete();
assert!(matches!(
block_on(rx).expect("missing send fallback"),
Err(Error::SendFailed)
));
let (tx, rx) = oneshot::channel();
let request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(5),
offset: 0,
len: 5,
exact: true,
deadline: None,
result: None,
sender: tx,
});
request.complete();
assert!(matches!(
block_on(rx).expect("missing recv fallback"),
Err((_, Error::RecvFailed))
));
let (tx, rx) = oneshot::channel();
let request = Request::ReadAt(ReadAtRequest {
file: make_file_fd(),
offset: 0,
len: 5,
read: 0,
buf: IoBufMut::with_capacity(5),
result: None,
sender: tx,
});
request.complete();
assert!(matches!(
block_on(rx).expect("missing read fallback"),
Err((_, Error::ReadFailed))
));
let (tx, rx) = oneshot::channel();
let request = Request::WriteAt(WriteAtRequest {
file: make_file_fd(),
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
result: None,
sender: tx,
});
request.complete();
assert!(matches!(
block_on(rx).expect("missing write fallback"),
Err(Error::WriteFailed)
));
let (tx, rx) = oneshot::channel();
let request = Request::Sync(SyncRequest {
file: make_file_fd(),
result: None,
sender: tx,
});
request.complete();
block_on(rx)
.expect("missing sync fallback")
.expect("sync fallback should be success");
}
#[test]
fn test_finish_timeout_delivers_timeout_results() {
let (tx, rx) = oneshot::channel();
let request = Request::Send(SendRequest {
fd: make_socket_fd(),
write: IoBufs::from(IoBuf::from(b"hello")).into(),
deadline: None,
result: None,
sender: tx,
});
request.timeout();
assert!(matches!(
block_on(rx).expect("missing send timeout"),
Err(Error::Timeout)
));
let (tx, rx) = oneshot::channel();
let request = Request::Recv(RecvRequest {
fd: make_socket_fd(),
buf: IoBufMut::with_capacity(5),
offset: 0,
len: 5,
exact: true,
deadline: None,
result: None,
sender: tx,
});
request.timeout();
assert!(matches!(
block_on(rx).expect("missing recv timeout"),
Err((_, Error::Timeout))
));
let (tx, rx) = oneshot::channel();
let request = Request::ReadAt(ReadAtRequest {
file: make_file_fd(),
offset: 0,
len: 5,
read: 0,
buf: IoBufMut::with_capacity(5),
result: None,
sender: tx,
});
request.timeout();
assert!(matches!(
block_on(rx).expect("missing read timeout"),
Err((_, Error::Timeout))
));
let (tx, rx) = oneshot::channel();
let request = Request::WriteAt(WriteAtRequest {
file: make_file_fd(),
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
result: None,
sender: tx,
});
request.timeout();
assert!(matches!(
block_on(rx).expect("missing write timeout"),
Err(Error::Timeout)
));
let (tx, rx) = oneshot::channel();
let request = Request::Sync(SyncRequest {
file: make_file_fd(),
result: None,
sender: tx,
});
request.timeout();
let err = block_on(rx)
.expect("missing sync timeout")
.expect_err("sync timeout should be an error");
assert_eq!(err.kind(), std::io::ErrorKind::TimedOut);
}
}