use std::mem::drop as unlock;
use std::os::fd::RawFd;
use std::sync::Arc;
use std::sync::atomic::{self, Ordering};
use std::time::Duration;
use std::{fmt, io, ptr, task};
use crate::io_uring::{Shared, cq, libc, load_kernel_shared};
use crate::{asan, lock};
/// Handle used to queue submissions on a ring.
///
/// Holds the ring's [`Shared`] state behind an [`Arc`], so cloning the handle
/// gives another reference to the same state.
#[derive(Clone, Debug)]
pub(crate) struct Submissions {
    /// State shared between all clones of this handle.
    shared: Arc<Shared>,
}

impl Submissions {
    /// Creates a new handle, moving `shared` into an [`Arc`].
    pub(crate) fn new(shared: Shared) -> Submissions {
        Submissions {
            shared: Arc::new(shared),
        }
    }

    /// Queues a single submission, using `fill_submission` to populate it.
    ///
    /// The entry passed to `fill_submission` is zeroed first. In debug builds
    /// it is asserted that the closure changed the opcode away from
    /// `IORING_OP_NOP`.
    ///
    /// # Errors
    ///
    /// Returns [`QueueFull`] when every slot of the submission queue holds an
    /// entry that has not been consumed yet. `fill_submission` is not called
    /// in that case.
    pub(crate) fn add<F>(&self, fill_submission: F) -> Result<(), QueueFull>
    where
        F: FnOnce(&mut Submission),
    {
        let shared = &*self.shared;
        let len = shared.submissions_len;

        // Cheap early-out before taking the lock.
        if shared.unsubmitted_submissions() >= len {
            return Err(QueueFull);
        }

        let submissions_guard = lock(&shared.submissions_lock);
        // Re-read the indices while holding the lock: another thread may have
        // queued an entry between the check above and acquiring the lock.
        let head = load_kernel_shared(shared.submissions_head);
        let tail = load_kernel_shared(shared.submissions_tail);
        // `tail - head` is the number of entries still in the queue. Once it
        // reaches `len` the slot at `tail` is the same slot as `head`, so
        // writing there would clobber an unconsumed entry. The subtraction
        // wraps because `tail` is advanced with `wrapping_add` below.
        if tail.wrapping_sub(head) >= len {
            unlock(submissions_guard);
            return Err(QueueFull);
        }

        // The mask maps the ever-increasing tail onto a slot; this relies on
        // `len` being a power of two.
        let index = (tail & (len - 1)) as usize;
        // SAFETY: `index` is masked to be below `len`. This assumes
        // `submissions` points to `len` entries and that holding
        // `submissions_lock` gives exclusive access to the slot.
        // NOTE(review): confirm both against the definition of `Shared`.
        let submission = unsafe { &mut *shared.submissions.add(index).as_ptr() };
        asan::unpoison(submission);
        submission.reset();
        fill_submission(submission);
        #[cfg(debug_assertions)]
        debug_assert!(!submission.is_unchanged());

        // Make sure the writes to the entry are done before the new tail is
        // published.
        atomic::fence(Ordering::SeqCst);
        let new_tail = tail.wrapping_add(1);
        // SAFETY: NOTE(review): assumes `submissions_tail` points to a live
        // atomic for as long as `shared` is alive; confirm against `Shared`.
        unsafe {
            (*shared.submissions_tail.as_ptr()).store(new_tail, Ordering::Release);
        }
        log::trace!(submission:?, index, tail, new_tail; "queueing submission");
        asan::poison(submission);
        unlock(submissions_guard);
        Ok(())
    }

    /// Queues an `IORING_OP_ASYNC_CANCEL` submission targeting `user_data`.
    ///
    /// The submission is marked with [`Submission::no_success_event`].
    ///
    /// # Errors
    ///
    /// Returns [`QueueFull`] if the submission could not be queued.
    pub(super) fn cancel(&self, user_data: u64) -> Result<(), QueueFull> {
        self.add(|submission| {
            submission.0.opcode = libc::IORING_OP_ASYNC_CANCEL as u8;
            submission.0.__bindgen_anon_2 = libc::io_uring_sqe__bindgen_ty_2 { addr: user_data };
            submission.no_success_event();
        })
    }

    /// Wakes up the ring if it is currently polling.
    ///
    /// When the `is_polling` flag is not set nothing is queued. Otherwise an
    /// `IORING_OP_MSG_RING` submission addressed to the ring's own file
    /// descriptor and carrying [`cq::WAKE_USER_DATA`] is queued. After every
    /// attempt `Shared::enter` is called, and the attempt is repeated for as
    /// long as the queue was full.
    ///
    /// # Errors
    ///
    /// Returns the error from `Shared::enter`, if any.
    pub(crate) fn wake(&self) -> io::Result<()> {
        log::trace!("waking up ring");
        if !self.shared.is_polling.load(Ordering::Acquire) {
            log::trace!("skipping ring message as it's not polling");
            return Ok(());
        }

        loop {
            let submitted = self.add(|submission| {
                submission.0.opcode = libc::IORING_OP_MSG_RING as u8;
                submission.0.fd = self.ring_fd();
                submission.0.__bindgen_anon_2 = libc::io_uring_sqe__bindgen_ty_2 {
                    addr: u64::from(libc::IORING_MSG_DATA),
                };
                submission.0.__bindgen_anon_1 = libc::io_uring_sqe__bindgen_ty_1 {
                    off: cq::WAKE_USER_DATA,
                };
            });
            // Called whether or not queueing succeeded.
            // NOTE(review): presumably this flushes queued entries and so
            // makes room for the retry — confirm against `Shared::enter`.
            self.shared.enter(0, 0, Some(Duration::ZERO))?;
            if submitted.is_ok() {
                return Ok(());
            }
        }
    }

    /// Registers `waker` in the list of futures blocked on a free submission
    /// slot.
    pub(crate) fn wait_for_submission(&self, waker: task::Waker) {
        log::trace!(waker:?; "adding future waiting on submission slot");
        let shared = &*self.shared;
        lock(&shared.blocked_futures).push(waker);
    }

    /// Returns the shared ring state.
    pub(crate) fn shared(&self) -> &Shared {
        &self.shared
    }

    /// Returns the ring's file descriptor, as reported by the shared state.
    pub(super) fn ring_fd(&self) -> RawFd {
        self.shared.ring_fd()
    }
}
/// Error returned when the submission queue has no room for another entry.
pub(crate) struct QueueFull;

impl From<QueueFull> for io::Error {
    /// Converts the error into an [`io::Error`] of kind
    /// [`io::ErrorKind::ResourceBusy`].
    fn from(_: QueueFull) -> io::Error {
        let kind = io::ErrorKind::ResourceBusy;
        io::Error::new(kind, "submission queue is full")
    }
}

impl fmt::Debug for QueueFull {
    /// Writes the type name; the type has no fields to show.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("QueueFull")
    }
}

impl fmt::Display for QueueFull {
    /// Writes a fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const MESSAGE: &str = "`a10::Ring` submission queue is full";
        f.write_str(MESSAGE)
    }
}
#[repr(transparent)]
pub(crate) struct Submission(pub(crate) libc::io_uring_sqe);
impl Submission {
#[allow(clippy::assertions_on_constants)]
fn reset(&mut self) {
debug_assert!(libc::IORING_OP_NOP == 0);
unsafe { ptr::from_mut(&mut self.0).write_bytes(0, 1) };
}
#[cfg(debug_assertions)]
const fn is_unchanged(&self) -> bool {
self.0.opcode == libc::IORING_OP_NOP as u8
}
pub(super) fn set_async(&mut self) {
self.0.flags |= libc::IOSQE_ASYNC;
}
pub(super) fn no_success_event(&mut self) {
self.0.flags |= libc::IOSQE_CQE_SKIP_SUCCESS;
}
}
impl fmt::Debug for Submission {
    /// Formats the entry as an `io_uring::Submission` struct.
    ///
    /// The fields shown depend on the opcode: known opcodes get their
    /// arguments under operation-specific names, unknown opcodes fall back to
    /// a handful of raw fields. `flags` and `user_data` are always appended
    /// last.
    #[allow(clippy::too_many_lines)]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        /// Fields of the read/write style operations.
        fn rw_fields(dbg: &mut fmt::DebugStruct<'_, '_>, sqe: &libc::io_uring_sqe, opcode: &str) {
            dbg.field("opcode", &opcode)
                .field("fd", &sqe.fd)
                .field("offset", unsafe { &sqe.__bindgen_anon_1.off })
                .field("addr", unsafe { &sqe.__bindgen_anon_2.addr })
                .field("len", &sqe.len);
        }

        /// Fields of the send/receive style operations.
        fn net_fields(dbg: &mut fmt::DebugStruct<'_, '_>, sqe: &libc::io_uring_sqe, opcode: &str) {
            // Read into a local first, then borrowed for formatting.
            let buf_group = unsafe { sqe.__bindgen_anon_4.buf_group };
            dbg.field("opcode", &opcode)
                .field("fd", &sqe.fd)
                .field("addr", unsafe { &sqe.__bindgen_anon_2.addr })
                .field("len", &sqe.len)
                .field("msg_flags", unsafe { &sqe.__bindgen_anon_3.msg_flags })
                .field("ioprio", &sqe.ioprio)
                .field("buf_group", &buf_group);
        }

        /// Fields of the get/set socket option commands; the option name is
        /// wrapped in a type chosen by the option's level.
        fn sockopt_fields(
            dbg: &mut fmt::DebugStruct<'_, '_>,
            sqe: &libc::io_uring_sqe,
            cmd_name: &str,
        ) {
            let level = crate::net::Level(unsafe { sqe.__bindgen_anon_2.__bindgen_anon_1.level });
            let opt = unsafe { sqe.__bindgen_anon_2.__bindgen_anon_1.optname };
            dbg.field("cmd_op", &cmd_name).field("level", &level);
            match level {
                crate::net::Level::SOCKET => dbg.field("name", &crate::net::SocketOpt(opt)),
                crate::net::Level::IPV4 => dbg.field("name", &crate::net::IPv4Opt(opt)),
                crate::net::Level::IPV6 => dbg.field("name", &crate::net::IPv6Opt(opt)),
                crate::net::Level::TCP => dbg.field("name", &crate::net::TcpOpt(opt)),
                crate::net::Level::UDP => dbg.field("name", &crate::net::UdpOpt(opt)),
                _ => dbg.field("name", &crate::net::Opt(opt)),
            };
            dbg.field("len", unsafe { &sqe.__bindgen_anon_5.optlen });
        }

        let sqe = &self.0;
        let mut dbg = f.debug_struct("io_uring::Submission");
        match u32::from(sqe.opcode) {
            libc::IORING_OP_NOP => {
                dbg.field("opcode", &"IORING_OP_NOP");
            }
            libc::IORING_OP_FSYNC => {
                dbg.field("opcode", &"IORING_OP_FSYNC")
                    .field("fd", &sqe.fd)
                    .field("fsync_flags", unsafe { &sqe.__bindgen_anon_3.fsync_flags });
            }
            libc::IORING_OP_READ => rw_fields(&mut dbg, sqe, "IORING_OP_READ"),
            libc::IORING_OP_READV => rw_fields(&mut dbg, sqe, "IORING_OP_READV"),
            libc::IORING_OP_READ_MULTISHOT => {
                let buf_group = unsafe { sqe.__bindgen_anon_4.buf_group };
                dbg.field("opcode", &"IORING_OP_READ_MULTISHOT")
                    .field("fd", &sqe.fd)
                    .field("flags", &sqe.flags)
                    .field("buf_group", &buf_group);
            }
            libc::IORING_OP_WRITE => rw_fields(&mut dbg, sqe, "IORING_OP_WRITE"),
            libc::IORING_OP_WRITEV => rw_fields(&mut dbg, sqe, "IORING_OP_WRITEV"),
            libc::IORING_OP_RENAMEAT => {
                dbg.field("opcode", &"IORING_OP_RENAMEAT")
                    .field("old_fd", &sqe.fd)
                    .field("old_path", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("new_fd", &sqe.len)
                    .field("new_path", unsafe { &sqe.__bindgen_anon_1.off })
                    .field("rename_flags", unsafe { &sqe.__bindgen_anon_3.rename_flags });
            }
            libc::IORING_OP_SOCKET => {
                let domain = crate::net::Domain(sqe.fd);
                let socket_type = unsafe { crate::net::Type(sqe.__bindgen_anon_1.off as u32) };
                let protocol = crate::net::Protocol(sqe.len);
                dbg.field("opcode", &"IORING_OP_SOCKET")
                    .field("domain", &domain)
                    .field("type", &socket_type)
                    .field("file_index", unsafe { &sqe.__bindgen_anon_5.file_index })
                    .field("protocol", &protocol)
                    .field("rw_flags", unsafe { &sqe.__bindgen_anon_3.rw_flags });
            }
            libc::IORING_OP_CONNECT => {
                dbg.field("opcode", &"IORING_OP_CONNECT")
                    .field("fd", &sqe.fd)
                    .field("addr", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("addr_size", unsafe { &sqe.__bindgen_anon_1.off });
            }
            libc::IORING_OP_SEND => net_fields(&mut dbg, sqe, "IORING_OP_SEND"),
            libc::IORING_OP_SEND_ZC => net_fields(&mut dbg, sqe, "IORING_OP_SEND_ZC"),
            libc::IORING_OP_SENDMSG => net_fields(&mut dbg, sqe, "IORING_OP_SENDMSG"),
            libc::IORING_OP_SENDMSG_ZC => net_fields(&mut dbg, sqe, "IORING_OP_SENDMSG_ZC"),
            libc::IORING_OP_RECV => net_fields(&mut dbg, sqe, "IORING_OP_RECV"),
            libc::IORING_OP_RECVMSG => net_fields(&mut dbg, sqe, "IORING_OP_RECVMSG"),
            libc::IORING_OP_SHUTDOWN => {
                dbg.field("opcode", &"IORING_OP_SHUTDOWN")
                    .field("fd", &sqe.fd)
                    .field("how", &sqe.len);
            }
            libc::IORING_OP_BIND => {
                dbg.field("opcode", &"IORING_OP_BIND")
                    .field("fd", &sqe.fd)
                    .field("addr", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("addr_size", unsafe { &sqe.__bindgen_anon_1.addr2 });
            }
            libc::IORING_OP_LISTEN => {
                dbg.field("opcode", &"IORING_OP_LISTEN")
                    .field("fd", &sqe.fd)
                    .field("backlog", &sqe.len);
            }
            libc::IORING_OP_ACCEPT => {
                dbg.field("opcode", &"IORING_OP_ACCEPT")
                    .field("fd", &sqe.fd)
                    .field("addr", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("addr_size", unsafe { &sqe.__bindgen_anon_1.off })
                    .field("accept_flags", unsafe { &sqe.__bindgen_anon_3.accept_flags })
                    .field("file_index", unsafe { &sqe.__bindgen_anon_5.file_index })
                    .field("ioprio", &sqe.ioprio);
            }
            libc::IORING_OP_URING_CMD => {
                dbg.field("opcode", &"IORING_OP_URING_CMD")
                    .field("fd", &sqe.fd);
                match unsafe { sqe.__bindgen_anon_1.__bindgen_anon_1.cmd_op } {
                    libc::SOCKET_URING_OP_GETSOCKOPT => {
                        sockopt_fields(&mut dbg, sqe, "SOCKET_URING_OP_GETSOCKOPT");
                    }
                    libc::SOCKET_URING_OP_SETSOCKOPT => {
                        sockopt_fields(&mut dbg, sqe, "SOCKET_URING_OP_SETSOCKOPT");
                        dbg.field("value", unsafe { &*sqe.__bindgen_anon_6.optval });
                    }
                    other => {
                        dbg.field("cmd_op", &other);
                    }
                }
            }
            libc::IORING_OP_ASYNC_CANCEL => {
                dbg.field("opcode", &"IORING_OP_ASYNC_CANCEL");
                let cancel_flags = unsafe { sqe.__bindgen_anon_3.cancel_flags };
                // With `IORING_ASYNC_CANCEL_FD` set the fd and flags are
                // shown, otherwise the `addr` field.
                let by_fd = (cancel_flags & libc::IORING_ASYNC_CANCEL_FD) != 0;
                if by_fd {
                    dbg.field("fd", &sqe.fd)
                        .field("cancel_flags", &cancel_flags);
                } else {
                    dbg.field("addr", unsafe { &sqe.__bindgen_anon_2.addr });
                }
            }
            libc::IORING_OP_OPENAT => {
                dbg.field("opcode", &"IORING_OP_OPENAT")
                    .field("dirfd", &sqe.fd)
                    .field("pathname", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("mode", &sqe.len)
                    .field("open_flags", unsafe { &sqe.__bindgen_anon_3.open_flags })
                    .field("file_index", unsafe { &sqe.__bindgen_anon_5.file_index });
            }
            libc::IORING_OP_SPLICE => {
                dbg.field("opcode", &"IORING_OP_SPLICE")
                    .field("fd_in", unsafe { &sqe.__bindgen_anon_5.splice_fd_in })
                    .field("off_in", unsafe { &sqe.__bindgen_anon_2.splice_off_in })
                    .field("fd_out", &sqe.fd)
                    .field("off_out", unsafe { &sqe.__bindgen_anon_1.off })
                    .field("len", &sqe.len)
                    .field("splice_flags", unsafe { &sqe.__bindgen_anon_3.splice_flags });
            }
            libc::IORING_OP_CLOSE => {
                dbg.field("opcode", &"IORING_OP_CLOSE")
                    .field("fd", &sqe.fd)
                    .field("file_index", unsafe { &sqe.__bindgen_anon_5.file_index });
            }
            libc::IORING_OP_FILES_UPDATE => {
                dbg.field("opcode", &"IORING_OP_FILES_UPDATE")
                    .field("fd", &sqe.fd)
                    .field("offset", unsafe { &sqe.__bindgen_anon_1.off })
                    .field("fds", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("len", &sqe.len);
            }
            libc::IORING_OP_STATX => {
                dbg.field("opcode", &"IORING_OP_STATX")
                    .field("fd", &sqe.fd)
                    .field("pathname", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("statx_flags", unsafe { &sqe.__bindgen_anon_3.statx_flags })
                    .field("mask", &sqe.len)
                    .field("statx", unsafe { &sqe.__bindgen_anon_1.off });
            }
            libc::IORING_OP_FADVISE => {
                dbg.field("opcode", &"IORING_OP_FADVISE")
                    .field("fd", &sqe.fd)
                    .field("offset", unsafe { &sqe.__bindgen_anon_1.off })
                    .field("len", &sqe.len)
                    .field("advise", unsafe { &sqe.__bindgen_anon_3.fadvise_advice });
            }
            libc::IORING_OP_FALLOCATE => {
                dbg.field("opcode", &"IORING_OP_FALLOCATE")
                    .field("fd", &sqe.fd)
                    .field("offset", unsafe { &sqe.__bindgen_anon_1.off })
                    .field("len", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("mode", &sqe.len);
            }
            libc::IORING_OP_FTRUNCATE => {
                dbg.field("opcode", &"IORING_OP_FTRUNCATE")
                    .field("fd", &sqe.fd)
                    .field("len", unsafe { &sqe.__bindgen_anon_1.off });
            }
            libc::IORING_OP_UNLINKAT => {
                dbg.field("opcode", &"IORING_OP_UNLINKAT")
                    .field("dirfd", &sqe.fd)
                    .field("path", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("unlink_flags", unsafe { &sqe.__bindgen_anon_3.unlink_flags });
            }
            libc::IORING_OP_MKDIRAT => {
                dbg.field("opcode", &"IORING_OP_MKDIRAT")
                    .field("dirfd", &sqe.fd)
                    .field("path", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("mode", &sqe.len);
            }
            libc::IORING_OP_POLL_ADD => {
                let multishot = sqe.len == libc::IORING_POLL_ADD_MULTI;
                dbg.field("opcode", &"IORING_OP_POLL_ADD")
                    .field("fd", &sqe.fd)
                    .field("poll_events", unsafe { &sqe.__bindgen_anon_3.poll32_events })
                    .field("multishot", &multishot);
            }
            libc::IORING_OP_POLL_REMOVE => {
                dbg.field("opcode", &"IORING_OP_POLL_REMOVE")
                    .field("target_user_data", unsafe { &sqe.__bindgen_anon_2.addr });
            }
            libc::IORING_OP_MADVISE => {
                dbg.field("opcode", &"IORING_OP_MADVISE")
                    .field("address", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("len", &sqe.len)
                    .field("advise", unsafe { &sqe.__bindgen_anon_3.fadvise_advice });
            }
            libc::IORING_OP_MSG_RING => {
                dbg.field("opcode", &"IORING_OP_MSG_RING")
                    .field("ringfd", &sqe.fd)
                    .field("msg1", &sqe.len)
                    .field("msg2", unsafe { &sqe.__bindgen_anon_1.off });
            }
            libc::IORING_OP_WAITID => {
                dbg.field("opcode", &"IORING_OP_WAITID")
                    .field("id", &sqe.fd)
                    .field("id_type", &sqe.len)
                    .field("options", unsafe { &sqe.__bindgen_anon_5.file_index })
                    .field("info", unsafe { &sqe.__bindgen_anon_1.addr2 });
            }
            libc::IORING_OP_FIXED_FD_INSTALL => {
                dbg.field("opcode", &"IORING_OP_FIXED_FD_INSTALL")
                    .field("fd", &sqe.fd)
                    .field("install_fd_flags", unsafe {
                        &sqe.__bindgen_anon_3.install_fd_flags
                    });
            }
            libc::IORING_OP_PIPE => {
                dbg.field("opcode", &"IORING_OP_PIPE")
                    .field("fds", unsafe { &sqe.__bindgen_anon_2.addr })
                    .field("pipe_flags", unsafe { &sqe.__bindgen_anon_3.pipe_flags });
            }
            // Opcode without a dedicated arm: show the raw fields.
            _ => {
                dbg.field("opcode", &sqe.opcode)
                    .field("ioprio", &sqe.ioprio)
                    .field("fd", &sqe.fd)
                    .field("len", &sqe.len)
                    .field("personality", &sqe.personality);
            }
        }

        // Cast so that `user_data` is formatted as a pointer.
        let user_data = sqe.user_data as *const ();
        dbg.field("flags", &sqe.flags)
            .field("user_data", &user_data)
            .finish()
    }
}