use std::io;
use std::mem;
use std::ffi::CStr;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::RawFd;
use std::ptr;
use std::slice;
use crate::registrar::{UringFd, UringReadBuf, UringWriteBuf};
pub use nix::fcntl::{OFlag, FallocateFlags, PosixFadviseAdvice};
pub use nix::poll::PollFlags;
pub use nix::sys::epoll::{EpollOp, EpollEvent};
pub use nix::sys::mman::MmapAdvise;
pub use nix::sys::stat::Mode;
pub use nix::sys::socket::{SockAddr, SockFlag, MsgFlags};
use crate::Personality;
/// A handle to a single submission queue entry (SQE), borrowed from the
/// ring's submission queue for lifetime `'a`. The `prep_*` methods write
/// an operation into the entry in place.
pub struct SQE<'a> {
// The raw liburing SQE this handle prepares in place.
sqe: &'a mut uring_sys::io_uring_sqe,
}
impl<'a> SQE<'a> {
    /// Wrap a raw liburing SQE.
    pub(crate) fn new(sqe: &'a mut uring_sys::io_uring_sqe) -> SQE<'a> {
        SQE { sqe }
    }

    /// The user data currently stored on this SQE. The kernel copies this
    /// value verbatim into the matching CQE on completion.
    #[inline]
    pub fn user_data(&self) -> u64 {
        self.sqe.user_data as u64
    }

    /// Set the user data that will be echoed back on the matching CQE.
    ///
    /// # Safety
    /// Unsafe because higher layers may interpret this value (e.g. as a
    /// pointer) when the completion arrives — NOTE(review): confirm against
    /// the crate's completion handling.
    pub unsafe fn set_user_data(&mut self, user_data: u64) {
        self.sqe.user_data = user_data as _;
    }

    /// The submission flags currently set on this SQE.
    #[inline]
    pub fn flags(&self) -> SubmissionFlags {
        // SAFETY: the flags field is only ever written through
        // `SubmissionFlags` bits in this module; unknown bits would come
        // from a caller using `raw_mut` — TODO confirm that invariant.
        unsafe { SubmissionFlags::from_bits_unchecked(self.sqe.flags as _) }
    }

    /// Replace all submission flags (unlike `set_flags`, which ORs them in).
    pub fn overwrite_flags(&mut self, flags: SubmissionFlags) {
        self.sqe.flags = flags.bits() as _;
    }

    // Mark this SQE as targeting a pre-registered (fixed) file descriptor.
    #[inline]
    pub(crate) fn set_fixed_file(&mut self) {
        self.set_flags(SubmissionFlags::FIXED_FILE);
    }

    /// OR additional submission flags into this SQE.
    #[inline]
    pub fn set_flags(&mut self, flags: SubmissionFlags) {
        self.sqe.flags |= flags.bits();
    }

    /// Attach a registered personality (credentials) to this SQE.
    #[inline]
    pub fn set_personality(&mut self, personality: Personality) {
        self.sqe.buf_index.buf_index.personality = personality.id;
    }

    /// Prepare a read from `fd` into `buf` at `offset`; the buffer type
    /// decides the concrete opcode (see `UringReadBuf`).
    #[inline]
    pub unsafe fn prep_read(
        &mut self,
        fd: impl UringFd,
        buf: impl UringReadBuf,
        offset: u64,
    ) {
        buf.prep_read(fd, self, offset);
    }

    /// Prepare a vectored read (`readv`) from `fd` at `offset`.
    #[inline]
    pub unsafe fn prep_read_vectored(
        &mut self,
        fd: impl UringFd,
        bufs: &mut [io::IoSliceMut<'_>],
        offset: u64,
    ) {
        let len = bufs.len();
        let addr = bufs.as_mut_ptr();
        uring_sys::io_uring_prep_readv(self.sqe, fd.as_raw_fd(), addr as _, len as _, offset as _);
        fd.update_sqe(self);
    }

    /// Prepare a read into a pre-registered buffer identified by `buf_index`.
    #[inline]
    pub unsafe fn prep_read_fixed(
        &mut self,
        fd: impl UringFd,
        buf: &mut [u8],
        offset: u64,
        buf_index: u32,
    ) {
        let len = buf.len();
        let addr = buf.as_mut_ptr();
        uring_sys::io_uring_prep_read_fixed(self.sqe,
                                            fd.as_raw_fd(),
                                            addr as _,
                                            len as _,
                                            offset as _,
                                            buf_index as _);
        fd.update_sqe(self);
    }

    /// Prepare a write to `fd` from `buf` at `offset`; the buffer type
    /// decides the concrete opcode (see `UringWriteBuf`).
    #[inline]
    pub unsafe fn prep_write(
        &mut self,
        fd: impl UringFd,
        buf: impl UringWriteBuf,
        offset: u64,
    ) {
        buf.prep_write(fd, self, offset)
    }

    /// Prepare a vectored write (`writev`) to `fd` at `offset`.
    #[inline]
    pub unsafe fn prep_write_vectored(
        &mut self,
        fd: impl UringFd,
        bufs: &[io::IoSlice<'_>],
        offset: u64,
    ) {
        let len = bufs.len();
        let addr = bufs.as_ptr();
        uring_sys::io_uring_prep_writev(self.sqe,
                                        fd.as_raw_fd(),
                                        addr as _,
                                        len as _,
                                        offset as _);
        fd.update_sqe(self);
    }

    /// Prepare a write from a pre-registered buffer identified by `buf_index`.
    #[inline]
    pub unsafe fn prep_write_fixed(
        &mut self,
        fd: impl UringFd,
        buf: &[u8],
        offset: u64,
        buf_index: usize,
    ) {
        let len = buf.len();
        let addr = buf.as_ptr();
        uring_sys::io_uring_prep_write_fixed(self.sqe,
                                             fd.as_raw_fd(),
                                             addr as _,
                                             len as _,
                                             offset as _,
                                             buf_index as _);
        fd.update_sqe(self);
    }

    /// Prepare an fsync on `fd` (`FSYNC_DATASYNC` maps to fdatasync behavior).
    #[inline]
    pub unsafe fn prep_fsync(&mut self, fd: impl UringFd, flags: FsyncFlags) {
        uring_sys::io_uring_prep_fsync(self.sqe, fd.as_raw_fd(), flags.bits() as _);
        fd.update_sqe(self);
    }

    /// Prepare a `splice(2)` moving `count` bytes from `fd_in` to `fd_out`.
    /// An offset of -1 conventionally means "use the file's current offset"
    /// — TODO confirm against the splice man page for these raw fds.
    #[inline]
    pub unsafe fn prep_splice(
        &mut self,
        fd_in: RawFd,
        off_in: i64,
        fd_out: RawFd,
        off_out: i64,
        count: u32,
        flags: SpliceFlags,
    ) {
        uring_sys::io_uring_prep_splice(self.sqe, fd_in, off_in, fd_out, off_out, count, flags.bits());
    }

    /// Prepare a `recv(2)` on `fd` into `buf`.
    ///
    /// Fix: this previously called `io_uring_prep_send` with the receive
    /// buffer, which would have *transmitted* the (uninitialized) buffer
    /// contents instead of receiving into it.
    #[inline]
    pub unsafe fn prep_recv(&mut self, fd: impl UringFd, buf: &mut [u8], flags: MsgFlags) {
        let data = buf.as_mut_ptr() as *mut libc::c_void;
        let len = buf.len();
        uring_sys::io_uring_prep_recv(self.sqe, fd.as_raw_fd(), data, len, flags.bits());
        fd.update_sqe(self);
    }

    /// Prepare a `send(2)` on `fd` from `buf`.
    #[inline]
    pub unsafe fn prep_send(&mut self, fd: impl UringFd, buf: &[u8], flags: MsgFlags) {
        // Cast away const: the binding takes *mut, but send does not write
        // through the pointer.
        let data = buf.as_ptr() as *const libc::c_void as *mut libc::c_void;
        let len = buf.len();
        uring_sys::io_uring_prep_send(self.sqe, fd.as_raw_fd(), data, len, flags.bits());
        fd.update_sqe(self);
    }

    /// Prepare a `recvmsg(2)`; `msg` must stay valid until completion.
    pub unsafe fn prep_recvmsg(&mut self, fd: impl UringFd, msg: *mut libc::msghdr, flags: MsgFlags) {
        uring_sys::io_uring_prep_recvmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _);
        fd.update_sqe(self);
    }

    /// Prepare a `sendmsg(2)`; `msg` must stay valid until completion.
    pub unsafe fn prep_sendmsg(&mut self, fd: impl UringFd, msg: *mut libc::msghdr, flags: MsgFlags) {
        uring_sys::io_uring_prep_sendmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _);
        fd.update_sqe(self);
    }

    /// Prepare an `fallocate(2)` on `fd` for `size` bytes at `offset`.
    #[inline]
    pub unsafe fn prep_fallocate(&mut self, fd: impl UringFd,
                                 offset: u64, size: u64,
                                 flags: FallocateFlags) {
        // liburing's argument order is (fd, mode, offset, len).
        uring_sys::io_uring_prep_fallocate(self.sqe, fd.as_raw_fd(),
                                           flags.bits() as _,
                                           offset as _,
                                           size as _);
        fd.update_sqe(self);
    }

    /// Prepare a `statx(2)` for `path` relative to `dirfd`, filling `buf`.
    /// `buf` must stay valid until completion.
    // NOTE(review): unlike the other fd-taking preps, this does not call
    // `dirfd.update_sqe(self)` — confirm whether fixed dirfds are supported.
    #[inline]
    pub unsafe fn prep_statx(
        &mut self,
        dirfd: impl UringFd,
        path: &CStr,
        flags: StatxFlags,
        mask: StatxMode,
        buf: &mut libc::statx,
    ) {
        uring_sys::io_uring_prep_statx(self.sqe, dirfd.as_raw_fd(), path.as_ptr() as _,
                                       flags.bits() as _, mask.bits() as _,
                                       buf as _);
    }

    /// Prepare an `openat(2)` of `path` relative to `fd`.
    /// `path` must stay valid until completion.
    #[inline]
    pub unsafe fn prep_openat(
        &mut self,
        fd: impl UringFd,
        path: &CStr,
        flags: OFlag,
        mode: Mode,
    ) {
        uring_sys::io_uring_prep_openat(self.sqe, fd.as_raw_fd(), path.as_ptr() as _, flags.bits(), mode.bits());
    }

    /// Prepare a `close(2)` of `fd`.
    #[inline]
    pub unsafe fn prep_close(&mut self, fd: impl UringFd) {
        uring_sys::io_uring_prep_close(self.sqe, fd.as_raw_fd());
    }

    /// Prepare a timeout that fires after `ts` or once `events` completions
    /// have been posted, whichever comes first. `ts` must stay valid until
    /// completion.
    #[inline]
    pub unsafe fn prep_timeout(&mut self, ts: &uring_sys::__kernel_timespec, events: u32, flags: TimeoutFlags) {
        uring_sys::io_uring_prep_timeout(self.sqe,
                                         ts as *const _ as *mut _,
                                         events as _,
                                         flags.bits() as _);
    }

    /// Prepare removal of a pending timeout identified by its `user_data`.
    #[inline]
    pub unsafe fn prep_timeout_remove(&mut self, user_data: u64) {
        uring_sys::io_uring_prep_timeout_remove(self.sqe, user_data as _, 0);
    }

    /// Prepare a timeout linked to the previous SQE in a link chain.
    /// `ts` must stay valid until completion.
    #[inline]
    pub unsafe fn prep_link_timeout(&mut self, ts: &uring_sys::__kernel_timespec) {
        uring_sys::io_uring_prep_link_timeout(self.sqe, ts as *const _ as *mut _, 0);
    }

    /// Prepare a one-shot poll on `fd` for `poll_flags`.
    #[inline]
    pub unsafe fn prep_poll_add(&mut self, fd: impl UringFd, poll_flags: PollFlags) {
        uring_sys::io_uring_prep_poll_add(self.sqe, fd.as_raw_fd(), poll_flags.bits());
        fd.update_sqe(self);
    }

    /// Prepare removal of a pending poll identified by its `user_data`.
    #[inline]
    pub unsafe fn prep_poll_remove(&mut self, user_data: u64) {
        uring_sys::io_uring_prep_poll_remove(self.sqe, user_data as _)
    }

    /// Prepare a `connect(2)` of `fd` to `socket_addr`.
    #[inline]
    pub unsafe fn prep_connect(&mut self, fd: impl UringFd, socket_addr: &SockAddr) {
        let (addr, len) = socket_addr.as_ffi_pair();
        uring_sys::io_uring_prep_connect(self.sqe, fd.as_raw_fd(), addr as *const _ as *mut _, len);
        fd.update_sqe(self);
    }

    /// Prepare an `accept4(2)` on `fd`. If `accept` is provided, the kernel
    /// fills it with the peer address; otherwise null pointers are passed
    /// and no address is returned.
    #[inline]
    pub unsafe fn prep_accept(&mut self, fd: impl UringFd, accept: Option<&mut SockAddrStorage>, flags: SockFlag) {
        let (addr, len) = match accept {
            Some(accept) => (accept.storage.as_mut_ptr() as *mut _, &mut accept.len as *mut _ as *mut _),
            None => (std::ptr::null_mut(), std::ptr::null_mut())
        };
        uring_sys::io_uring_prep_accept(self.sqe, fd.as_raw_fd(), addr, len, flags.bits());
        fd.update_sqe(self);
    }

    /// Prepare a `posix_fadvise(2)` on `fd` for the byte range `[off, off+len)`.
    #[inline]
    pub unsafe fn prep_fadvise(&mut self, fd: impl UringFd, off: u64, len: u64, advice: PosixFadviseAdvice) {
        use PosixFadviseAdvice::*;
        // Translate the nix enum to the raw libc constant the kernel expects.
        let advice = match advice {
            POSIX_FADV_NORMAL => libc::POSIX_FADV_NORMAL,
            POSIX_FADV_SEQUENTIAL => libc::POSIX_FADV_SEQUENTIAL,
            POSIX_FADV_RANDOM => libc::POSIX_FADV_RANDOM,
            POSIX_FADV_NOREUSE => libc::POSIX_FADV_NOREUSE,
            POSIX_FADV_WILLNEED => libc::POSIX_FADV_WILLNEED,
            POSIX_FADV_DONTNEED => libc::POSIX_FADV_DONTNEED,
        };
        uring_sys::io_uring_prep_fadvise(self.sqe, fd.as_raw_fd(), off as _, len as _, advice);
        fd.update_sqe(self);
    }

    /// Prepare a `madvise(2)` over the memory range covered by `data`.
    #[inline]
    pub unsafe fn prep_madvise(&mut self, data: &mut [u8], advice: MmapAdvise) {
        use MmapAdvise::*;
        // Translate the nix enum to the raw libc constant the kernel expects.
        let advice = match advice {
            MADV_NORMAL => libc::MADV_NORMAL,
            MADV_RANDOM => libc::MADV_RANDOM,
            MADV_SEQUENTIAL => libc::MADV_SEQUENTIAL,
            MADV_WILLNEED => libc::MADV_WILLNEED,
            MADV_DONTNEED => libc::MADV_DONTNEED,
            MADV_REMOVE => libc::MADV_REMOVE,
            MADV_DONTFORK => libc::MADV_DONTFORK,
            MADV_DOFORK => libc::MADV_DOFORK,
            MADV_HWPOISON => libc::MADV_HWPOISON,
            MADV_MERGEABLE => libc::MADV_MERGEABLE,
            MADV_UNMERGEABLE => libc::MADV_UNMERGEABLE,
            MADV_SOFT_OFFLINE => libc::MADV_SOFT_OFFLINE,
            MADV_HUGEPAGE => libc::MADV_HUGEPAGE,
            MADV_NOHUGEPAGE => libc::MADV_NOHUGEPAGE,
            MADV_DONTDUMP => libc::MADV_DONTDUMP,
            MADV_DODUMP => libc::MADV_DODUMP,
            MADV_FREE => libc::MADV_FREE,
        };
        uring_sys::io_uring_prep_madvise(self.sqe, data.as_mut_ptr() as *mut _, data.len() as _, advice);
    }

    /// Prepare an `epoll_ctl(2)` on `epoll_fd` for `fd`. `event` may be
    /// `None` only for `EpollCtlDel`; it must stay valid until completion.
    #[inline]
    pub unsafe fn prep_epoll_ctl(&mut self, epoll_fd: RawFd, op: EpollOp, fd: RawFd, event: Option<&mut EpollEvent>) {
        let op = match op {
            EpollOp::EpollCtlAdd => libc::EPOLL_CTL_ADD,
            EpollOp::EpollCtlDel => libc::EPOLL_CTL_DEL,
            EpollOp::EpollCtlMod => libc::EPOLL_CTL_MOD,
        };
        let event = event.map_or(ptr::null_mut(), |event| event as *mut EpollEvent as *mut _);
        uring_sys::io_uring_prep_epoll_ctl(self.sqe, epoll_fd, fd, op, event);
    }

    /// Prepare an update of the registered-files table, replacing entries
    /// starting at `offset` with `files`. `files` must stay valid until
    /// completion.
    #[inline]
    pub unsafe fn prep_files_update(&mut self, files: &[RawFd], offset: u32) {
        let addr = files.as_ptr() as *mut RawFd;
        let len = files.len() as u32;
        uring_sys::io_uring_prep_files_update(self.sqe, addr, len, offset as _);
    }

    /// Provide `count` equally-sized buffers (carved out of `buffers`) to
    /// the kernel under buffer group `group`, starting at buffer id `index`.
    /// `buffers.len()` should be a multiple of `count`; the division below
    /// truncates otherwise.
    pub unsafe fn prep_provide_buffers(&mut self,
                                       buffers: &mut [u8],
                                       count: u32,
                                       group: BufferGroupId,
                                       index: u32,
    ) {
        let addr = buffers.as_mut_ptr() as *mut libc::c_void;
        let len = buffers.len() as u32 / count;
        uring_sys::io_uring_prep_provide_buffers(self.sqe, addr, len as _, count as _, group.id as _, index as _);
    }

    /// Remove `count` previously-provided buffers from buffer group `id`.
    pub unsafe fn prep_remove_buffers(&mut self, count: u32, id: BufferGroupId) {
        uring_sys::io_uring_prep_remove_buffers(self.sqe, count as _, id.id as _);
    }

    /// Prepare cancellation of an in-flight request identified by `user_data`.
    #[inline]
    pub unsafe fn prep_cancel(&mut self, user_data: u64, flags: i32) {
        uring_sys::io_uring_prep_cancel(self.sqe, user_data as _, flags);
    }

    /// Prepare a no-op (completes immediately; useful for testing and links).
    #[inline]
    pub unsafe fn prep_nop(&mut self) {
        uring_sys::io_uring_prep_nop(self.sqe);
    }

    /// Zero the entire SQE, discarding any previously-prepared operation.
    pub fn clear(&mut self) {
        // SAFETY: io_uring_sqe is a plain C struct for which all-zeroes is
        // a valid (no-op) bit pattern.
        *self.sqe = unsafe { mem::zeroed() };
    }

    /// Borrow the underlying raw SQE.
    pub fn raw(&self) -> &uring_sys::io_uring_sqe {
        &self.sqe
    }

    /// Mutably borrow the underlying raw SQE.
    ///
    /// # Safety
    /// Writing arbitrary fields can violate the invariants the safe methods
    /// of this type maintain (e.g. flag bits, pointers that must outlive
    /// the submission).
    pub unsafe fn raw_mut(&mut self) -> &mut uring_sys::io_uring_sqe {
        &mut self.sqe
    }
}
// SAFETY(review): SQE wraps a unique `&mut` to an SQE slot, so moving it to
// another thread moves the exclusive borrow with it. The `Sync` claim is
// broader — confirm that nothing reachable through `&SQE` permits mutation
// of the shared ring.
unsafe impl<'a> Send for SQE<'a> { }
unsafe impl<'a> Sync for SQE<'a> { }
/// Caller-provided storage for a peer socket address, filled in by the
/// kernel on completion of an accept prepared with `SQE::prep_accept`.
pub struct SockAddrStorage {
// Possibly-uninitialized storage large enough for any address family.
storage: mem::MaybeUninit<nix::sys::socket::sockaddr_storage>,
// Address length in bytes; starts at the full size of sockaddr_storage
// and is overwritten by the kernel with the actual length.
len: usize,
}
impl SockAddrStorage {
    /// Create uninitialized storage sized to hold any socket address family.
    pub fn uninit() -> Self {
        SockAddrStorage {
            storage: mem::MaybeUninit::uninit(),
            len: mem::size_of::<nix::sys::socket::sockaddr_storage>(),
        }
    }

    /// Interpret the storage as a concrete socket address.
    ///
    /// # Safety
    /// The storage must have been filled in (e.g. by the kernel completing
    /// an accept) before this is called.
    pub unsafe fn as_socket_addr(&self) -> io::Result<SockAddr> {
        let storage = &*self.storage.as_ptr();
        nix::sys::socket::sockaddr_storage_to_addr(storage, self.len).map_err(|e| {
            // Map a nix errno to a std::io error; fall back to an opaque
            // error when no errno is available.
            match e.as_errno() {
                Some(errno) => io::Error::from_raw_os_error(errno as _),
                None => io::Error::new(io::ErrorKind::Other, "Unknown error"),
            }
        })
    }
}
/// Identifier for a group of buffers registered with the kernel via
/// `SQE::prep_provide_buffers` and released via `SQE::prep_remove_buffers`.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct BufferGroupId {
pub id: u32,
}
// Flag sets passed through to the kernel. The bit values presumably mirror
// the corresponding kernel constants (IOSQE_*, IORING_FSYNC_*, AT_*,
// STATX_*, IORING_TIMEOUT_*, SPLICE_F_*) — confirm against liburing headers
// before adding variants.
bitflags::bitflags! {
// Per-SQE submission behavior flags (kernel IOSQE_* bits).
pub struct SubmissionFlags: u8 {
const FIXED_FILE = 1 << 0;
const IO_DRAIN = 1 << 1;
const IO_LINK = 1 << 2;
const IO_HARDLINK = 1 << 3;
const ASYNC = 1 << 4;
const BUFFER_SELECT = 1 << 5;
}
}
bitflags::bitflags! {
pub struct FsyncFlags: u32 {
// Sync data only (fdatasync-style), skipping non-essential metadata.
const FSYNC_DATASYNC = 1 << 0;
}
}
bitflags::bitflags! {
// Path-resolution / sync behavior flags for statx (AT_* constants).
pub struct StatxFlags: i32 {
const AT_STATX_SYNC_AS_STAT = 0;
const AT_SYMLINK_NOFOLLOW = 1 << 10;
const AT_NO_AUTOMOUNT = 1 << 11;
const AT_EMPTY_PATH = 1 << 12;
const AT_STATX_FORCE_SYNC = 1 << 13;
const AT_STATX_DONT_SYNC = 1 << 14;
}
}
bitflags::bitflags! {
// Mask selecting which statx fields the caller wants filled in.
pub struct StatxMode: i32 {
const STATX_TYPE = 1 << 0;
const STATX_MODE = 1 << 1;
const STATX_NLINK = 1 << 2;
const STATX_UID = 1 << 3;
const STATX_GID = 1 << 4;
const STATX_ATIME = 1 << 5;
const STATX_MTIME = 1 << 6;
const STATX_CTIME = 1 << 7;
const STATX_INO = 1 << 8;
const STATX_SIZE = 1 << 9;
const STATX_BLOCKS = 1 << 10;
const STATX_BTIME = 1 << 11;
}
}
bitflags::bitflags! {
pub struct TimeoutFlags: u32 {
// Treat the timespec as an absolute time rather than a relative delay.
const TIMEOUT_ABS = 1 << 0;
}
}
bitflags::bitflags! {
pub struct SpliceFlags: u32 {
// Interpret fd_in as an index into the registered-files table.
const F_FD_IN_FIXED = 1 << 31;
}
}
/// An iterator over a contiguous run of submission queue entries handed
/// out by the ring for one submission round.
pub struct SQEs<'ring> {
// Remaining raw entries; each is prepped as a no-op before being yielded.
sqes: slice::IterMut<'ring, uring_sys::io_uring_sqe>,
}
impl<'ring> SQEs<'ring> {
    /// Wrap a mutable slice of raw SQEs taken from the submission queue.
    pub(crate) fn new(slice: &'ring mut [uring_sys::io_uring_sqe]) -> SQEs<'ring> {
        let sqes = slice.iter_mut();
        SQEs { sqes }
    }

    /// Consume every remaining entry and return only the final one.
    /// Entries skipped over have already been prepped as no-ops by
    /// `consume`, so they are harmless to submit.
    pub fn single(&mut self) -> Option<SQE<'ring>> {
        let mut last = None;
        loop {
            match self.consume() {
                Some(sqe) => last = Some(sqe),
                None => return last,
            }
        }
    }

    /// View the remaining entries as a hard-linked chain.
    pub fn hard_linked(&mut self) -> HardLinked<'ring, '_> {
        HardLinked { sqes: self }
    }

    /// View the remaining entries as a soft-linked chain.
    pub fn soft_linked(&mut self) -> SoftLinked<'ring, '_> {
        SoftLinked { sqes: self }
    }

    /// How many entries have not yet been handed out.
    pub fn remaining(&self) -> u32 {
        self.sqes.len() as u32
    }

    /// Take the next raw entry, prep it as a no-op so it is always valid
    /// to submit, and wrap it.
    fn consume(&mut self) -> Option<SQE<'ring>> {
        let sqe = self.sqes.next()?;
        unsafe { uring_sys::io_uring_prep_nop(sqe) }
        Some(SQE { sqe })
    }
}
// Iterating yields each remaining entry in queue order; `consume` preps
// each one as a no-op before handing it out.
impl<'ring> Iterator for SQEs<'ring> {
type Item = SQE<'ring>;
fn next(&mut self) -> Option<SQE<'ring>> {
self.consume()
}
}
/// Adapter over `SQEs` whose yielded entries set `IO_HARDLINK` on every
/// SQE except the last, chaining them into one hard-linked sequence.
pub struct HardLinked<'ring, 'a> {
sqes: &'a mut SQEs<'ring>,
}
impl<'ring> HardLinked<'ring, '_> {
/// Take one more SQE *without* the hard-link flag, ending the chain early.
pub fn terminate(self) -> Option<SQE<'ring>> {
self.sqes.consume()
}
}
impl<'ring> Iterator for HardLinked<'ring, '_> {
    type Item = HardLinkedSQE<'ring>;

    fn next(&mut self) -> Option<Self::Item> {
        let sqe = self.sqes.consume()?;
        // If nothing is left after this one, it is the final entry and its
        // Drop impl must not set the hard-link flag.
        let is_final = self.sqes.remaining() == 0;
        Some(HardLinkedSQE { sqe, is_final })
    }
}
/// An SQE yielded from a hard-linked chain; derefs to `SQE` and applies
/// the `IO_HARDLINK` flag when dropped (unless it is the chain's last).
pub struct HardLinkedSQE<'ring> {
sqe: SQE<'ring>,
// True for the last SQE of the chain, which must not carry the link flag.
is_final: bool,
}
impl<'ring> Deref for HardLinkedSQE<'ring> {
type Target = SQE<'ring>;
fn deref(&self) -> &SQE<'ring> {
&self.sqe
}
}
impl<'ring> DerefMut for HardLinkedSQE<'ring> {
fn deref_mut(&mut self) -> &mut SQE<'ring> {
&mut self.sqe
}
}
// The flag is set on drop (after the user's preps) so it ORs on top of
// whatever flags the prep calls set, rather than being overwritten.
impl<'ring> Drop for HardLinkedSQE<'ring> {
fn drop(&mut self) {
if !self.is_final {
self.sqe.set_flags(SubmissionFlags::IO_HARDLINK);
}
}
}
/// Adapter over `SQEs` whose yielded entries set `IO_LINK` on every SQE
/// except the last, chaining them into one soft-linked sequence.
pub struct SoftLinked<'ring, 'a> {
sqes: &'a mut SQEs<'ring>,
}
impl<'ring> SoftLinked<'ring, '_> {
/// Take one more SQE *without* the link flag, ending the chain early.
pub fn terminate(self) -> Option<SQE<'ring>> {
self.sqes.consume()
}
}
impl<'ring> Iterator for SoftLinked<'ring, '_> {
    type Item = SoftLinkedSQE<'ring>;

    fn next(&mut self) -> Option<Self::Item> {
        let sqe = self.sqes.consume()?;
        // If nothing is left after this one, it is the final entry and its
        // Drop impl must not set the link flag.
        let is_final = self.sqes.remaining() == 0;
        Some(SoftLinkedSQE { sqe, is_final })
    }
}
/// An SQE yielded from a soft-linked chain; derefs to `SQE` and applies
/// the `IO_LINK` flag when dropped (unless it is the chain's last).
pub struct SoftLinkedSQE<'ring> {
sqe: SQE<'ring>,
// True for the last SQE of the chain, which must not carry the link flag.
is_final: bool,
}
impl<'ring> Deref for SoftLinkedSQE<'ring> {
type Target = SQE<'ring>;
fn deref(&self) -> &SQE<'ring> {
&self.sqe
}
}
impl<'ring> DerefMut for SoftLinkedSQE<'ring> {
fn deref_mut(&mut self) -> &mut SQE<'ring> {
&mut self.sqe
}
}
// The flag is set on drop (after the user's preps) so it ORs on top of
// whatever flags the prep calls set, rather than being overwritten.
impl<'ring> Drop for SoftLinkedSQE<'ring> {
fn drop(&mut self) {
if !self.is_final {
self.sqe.set_flags(SubmissionFlags::IO_LINK);
}
}
}