use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic;
use std::{io, ptr};

use crate::register::{execute, Probe, Restriction};
use crate::sys;
use crate::types;
use crate::util::{cast_ptr, OwnedFd};
use crate::Parameters;

/// Interface for submitting submission queue events of an io_uring instance to the kernel for
/// execution, and for registering files or buffers with the instance.
///
/// io_uring supports both directly performing I/O on buffers and file descriptors and registering
/// them beforehand. Registering is slow, but it makes performing the actual I/O much faster.
pub struct Submitter<'a> {
    fd: &'a OwnedFd,
    params: &'a Parameters,

    sq_head: *const atomic::AtomicU32,
    sq_tail: *const atomic::AtomicU32,
    sq_flags: *const atomic::AtomicU32,
}

impl<'a> Submitter<'a> {
    #[inline]
    pub(crate) const fn new(
        fd: &'a OwnedFd,
        params: &'a Parameters,
        sq_head: *const atomic::AtomicU32,
        sq_tail: *const atomic::AtomicU32,
        sq_flags: *const atomic::AtomicU32,
    ) -> Submitter<'a> {
        Submitter {
            fd,
            params,
            sq_head,
            sq_tail,
            sq_flags,
        }
    }

    #[inline]
    fn sq_len(&self) -> usize {
        unsafe {
            let head = (*self.sq_head).load(atomic::Ordering::Acquire);
            let tail = (*self.sq_tail).load(atomic::Ordering::Acquire);

            tail.wrapping_sub(head) as usize
        }
    }

    /// Whether the kernel polling thread has gone to sleep because it has been idle for too
    /// long without seeing new submission queue entries.
    #[inline]
    fn sq_need_wakeup(&self) -> bool {
        unsafe {
            (*self.sq_flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// Whether the completion queue ring has overflowed.
    fn sq_cq_overflow(&self) -> bool {
        unsafe {
            (*self.sq_flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
        }
    }

    /// Initiate and/or complete asynchronous I/O. This is a low-level wrapper around
    /// `io_uring_enter` - see `man io_uring_enter` (or [its online
    /// version](https://manpages.debian.org/unstable/liburing-dev/io_uring_enter.2.en.html)) for
    /// more details.
    ///
    /// You will probably want to use a higher-level API such as
    /// [`submit`](Self::submit) or [`submit_and_wait`](Self::submit_and_wait).
    ///
    /// # Safety
    ///
    /// This is a raw interface, so the developer must ensure that the parameters are valid.
    pub unsafe fn enter<T: Sized>(
        &self,
        to_submit: u32,
        min_complete: u32,
        flag: u32,
        arg: Option<&T>,
    ) -> io::Result<usize> {
        let arg = arg
            .map(|arg| cast_ptr(arg) as *const _)
            .unwrap_or_else(ptr::null);
        let size = std::mem::size_of::<T>();
        sys::io_uring_enter(
            self.fd.as_raw_fd(),
            to_submit,
            min_complete,
            flag,
            arg,
            size,
        )
        .map(|res| res as _)
    }

    /// Submit all queued submission queue events to the kernel.
    #[inline]
    pub fn submit(&self) -> io::Result<usize> {
        self.submit_and_wait(0)
    }

    /// Submit all queued submission queue events to the kernel and wait until at least `want`
    /// events have completed.
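    ///
    /// # Examples
    ///
    /// A minimal sketch of pushing one entry and waiting for its completion; marked `no_run`
    /// since it needs a kernel with io_uring support:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut ring = io_uring::IoUring::new(8)?;
    /// let nop = io_uring::opcode::Nop::new().build().user_data(42);
    /// // Safety: a Nop entry borrows no buffers, so nothing can dangle.
    /// unsafe { ring.submission().push(&nop)? };
    /// // Submit the queued entry and block until one completion is available.
    /// ring.submitter().submit_and_wait(1)?;
    /// let cqe = ring.completion().next().expect("completion entry");
    /// assert_eq!(cqe.user_data(), 42);
    /// # Ok(())
    /// # }
    /// ```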
    pub fn submit_and_wait(&self, want: usize) -> io::Result<usize> {
        let len = self.sq_len();
        let mut flags = 0;

        // This logic suffers from the fact that sq_cq_overflow and sq_need_wakeup
        // each perform an atomic load of the same variable, self.sq_flags.
        // In the hottest paths, when a server is running with sqpoll,
        // this load happens twice, when once would be sufficient.

        if want > 0 || self.params.is_setup_iopoll() || self.sq_cq_overflow() {
            flags |= sys::IORING_ENTER_GETEVENTS;
        }

        if self.params.is_setup_sqpoll() {
            if self.sq_need_wakeup() {
                flags |= sys::IORING_ENTER_SQ_WAKEUP;
            } else if want == 0 {
                // The kernel thread is polling and hasn't fallen asleep, so we don't need to tell
                // it to process events or wake it up
                return Ok(len);
            }
        }

        unsafe { self.enter::<libc::sigset_t>(len as _, want as _, flags, None) }
    }

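    /// Submit all queued submission queue events to the kernel and wait until at least `want`
    /// events have completed, passing extended arguments (such as a timeout or a signal mask)
    /// through to `io_uring_enter` via `IORING_ENTER_EXT_ARG`. Requires Linux 5.11 or later.
    ///
    /// # Examples
    ///
    /// A sketch of waiting with a one-second timeout; marked `no_run` since it needs a kernel
    /// with extended-argument support:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use io_uring::types::{SubmitArgs, Timespec};
    ///
    /// let ring = io_uring::IoUring::new(8)?;
    /// let ts = Timespec::new().sec(1);
    /// let args = SubmitArgs::new().timespec(&ts);
    /// match ring.submitter().submit_with_args(1, &args) {
    ///     Ok(_) => { /* at least one event completed */ }
    ///     // The kernel reports an expired timeout as ETIME.
    ///     Err(err) if err.raw_os_error() == Some(libc::ETIME) => { /* timed out */ }
    ///     Err(err) => return Err(err.into()),
    /// }
    /// # Ok(())
    /// # }
    /// ```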
    pub fn submit_with_args(
        &self,
        want: usize,
        args: &types::SubmitArgs<'_, '_>,
    ) -> io::Result<usize> {
        let len = self.sq_len();
        let mut flags = sys::IORING_ENTER_EXT_ARG;

        if want > 0 {
            flags |= sys::IORING_ENTER_GETEVENTS;
        }

        if self.params.is_setup_sqpoll() {
            if self.sq_need_wakeup() {
                flags |= sys::IORING_ENTER_SQ_WAKEUP;
            } else if want == 0 {
                // The kernel thread is polling and hasn't fallen asleep, so we don't need to tell
                // it to process events or wake it up
                return Ok(len);
            }
        }

        unsafe { self.enter(len as _, want as _, flags, Some(&args.args)) }
    }

    /// Wait for the submission queue to have free entries.
    pub fn squeue_wait(&self) -> io::Result<usize> {
        unsafe { self.enter::<libc::sigset_t>(0, 0, sys::IORING_ENTER_SQ_WAIT, None) }
    }

    /// Register in-memory user buffers for I/O with the kernel. You can use these buffers with the
    /// [`ReadFixed`](crate::opcode::ReadFixed) and [`WriteFixed`](crate::opcode::WriteFixed)
    /// operations.
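    ///
    /// # Examples
    ///
    /// A sketch of registering a single buffer; marked `no_run` since it needs a kernel with
    /// io_uring support. The buffer must remain valid until it is unregistered:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    /// let mut buf = vec![0u8; 4096];
    /// let iov = libc::iovec {
    ///     iov_base: buf.as_mut_ptr().cast(),
    ///     iov_len: buf.len(),
    /// };
    /// ring.submitter().register_buffers(&[iov])?;
    /// // `buf` must stay alive (and its allocation must not move) while registered.
    /// # Ok(())
    /// # }
    /// ```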
    pub fn register_buffers(&self, bufs: &[libc::iovec]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_BUFFERS,
            bufs.as_ptr() as *const _,
            bufs.len() as _,
        )
        .map(drop)
    }

    /// Register an empty file table with room for `nr` file descriptors. This sparse variant is
    /// only available on kernels 5.19 and later.
    ///
    /// Registering a file table is a prerequisite for using any request that
    /// uses direct descriptors.
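    ///
    /// # Examples
    ///
    /// A sketch of reserving 16 empty direct-descriptor slots; marked `no_run` since it needs
    /// Linux 5.19 or later:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    /// // All 16 slots start out sparse (empty) and can be filled in later.
    /// ring.submitter().register_files_sparse(16)?;
    /// # Ok(())
    /// # }
    /// ```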
    pub fn register_files_sparse(&self, nr: u32) -> io::Result<()> {
        use std::mem;
        let rr = sys::io_uring_rsrc_register {
            nr,
            flags: sys::IORING_RSRC_REGISTER_SPARSE,
            resv2: 0,
            data: 0,
            tags: 0,
        };
        let rr = cast_ptr::<sys::io_uring_rsrc_register>(&rr);
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_FILES2,
            rr as *const _,
            mem::size_of::<sys::io_uring_rsrc_register>() as _,
        )
        .map(drop)
    }

    /// Register files for I/O. You can use the registered files with
    /// [`Fixed`](crate::types::Fixed).
    ///
    /// Each fd may be -1, in which case it is considered "sparse", and can be filled in later with
    /// [`register_files_update`](Self::register_files_update).
    ///
    /// Note that this will wait for the ring to idle; it will only return once all active requests
    /// are complete. Use [`register_files_update`](Self::register_files_update) to avoid this.
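    ///
    /// # Examples
    ///
    /// A sketch of registering one real file plus one sparse slot; marked `no_run` since it
    /// needs a kernel with io_uring support:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use std::os::unix::io::AsRawFd;
    ///
    /// let ring = io_uring::IoUring::new(8)?;
    /// let file = std::fs::File::open("/dev/zero")?;
    /// // Slot 0 holds `file`; slot 1 is sparse and can be filled in later
    /// // with `register_files_update`.
    /// ring.submitter().register_files(&[file.as_raw_fd(), -1])?;
    /// # Ok(())
    /// # }
    /// ```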
    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_FILES,
            fds.as_ptr() as *const _,
            fds.len() as _,
        )
        .map(drop)
    }

    /// Replace existing files in the registered file set with new ones. This can turn a sparse
    /// entry (one where the fd is -1) into a real one, remove an existing entry (by setting the
    /// new fd to -1), or replace an existing entry with a new fd. The `offset` parameter
    /// specifies the offset into the list of registered files at which to start updating.
    ///
    /// You can also perform this asynchronously with the
    /// [`FilesUpdate`](crate::opcode::FilesUpdate) opcode.
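    ///
    /// # Examples
    ///
    /// A sketch of filling a sparse slot with a real descriptor; marked `no_run` since it
    /// needs a kernel with io_uring support:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use std::os::unix::io::AsRawFd;
    ///
    /// let ring = io_uring::IoUring::new(8)?;
    /// let placeholder = std::fs::File::open("/dev/null")?;
    /// // Slot 0 holds `placeholder`; slot 1 is sparse.
    /// ring.submitter().register_files(&[placeholder.as_raw_fd(), -1])?;
    /// let file = std::fs::File::open("/dev/zero")?;
    /// // Replace the entry at offset 1 with a real descriptor.
    /// let updated = ring.submitter().register_files_update(1, &[file.as_raw_fd()])?;
    /// assert_eq!(updated, 1);
    /// # Ok(())
    /// # }
    /// ```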
    pub fn register_files_update(&self, offset: u32, fds: &[RawFd]) -> io::Result<usize> {
        let fu = sys::io_uring_files_update {
            offset,
            resv: 0,
            fds: fds.as_ptr() as _,
        };
        let fu = cast_ptr::<sys::io_uring_files_update>(&fu);
        let ret = execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_FILES_UPDATE,
            fu as *const _,
            fds.len() as _,
        )?;
        Ok(ret as _)
    }

    /// Register an eventfd created by [`eventfd`](libc::eventfd) with the io_uring instance.
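    ///
    /// # Examples
    ///
    /// A sketch of creating an eventfd and registering it for completion notifications; marked
    /// `no_run` since it needs a kernel with io_uring support:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    /// // eventfd(2) takes an initial counter value and flags; -1 signals failure.
    /// let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC) };
    /// if efd < 0 {
    ///     return Err(std::io::Error::last_os_error().into());
    /// }
    /// ring.submitter().register_eventfd(efd)?;
    /// // The eventfd counter is now bumped whenever a completion is posted.
    /// # Ok(())
    /// # }
    /// ```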
    pub fn register_eventfd(&self, eventfd: RawFd) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_EVENTFD,
            cast_ptr::<RawFd>(&eventfd) as *const _,
            1,
        )
        .map(drop)
    }

    /// This works just like [`register_eventfd`](Self::register_eventfd), except notifications are
    /// only posted for events that complete in an async manner, so requests that complete
    /// immediately will not cause a notification.
    pub fn register_eventfd_async(&self, eventfd: RawFd) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_EVENTFD_ASYNC,
            cast_ptr::<RawFd>(&eventfd) as *const _,
            1,
        )
        .map(drop)
    }

    /// Fill in the given [`Probe`] with information about the opcodes supported by io_uring on the
    /// running kernel.
    ///
    /// # Examples
    ///
    // This is marked no_run as it is only available from Linux 5.6+, however the latest Ubuntu (on
    // which CI runs) only has Linux 5.4.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let io_uring = io_uring::IoUring::new(1)?;
    /// let mut probe = io_uring::Probe::new();
    /// io_uring.submitter().register_probe(&mut probe)?;
    ///
    /// if probe.is_supported(io_uring::opcode::Read::CODE) {
    ///     println!("Reading is supported!");
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn register_probe(&self, probe: &mut Probe) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_PROBE,
            probe.as_mut_ptr() as *const _,
            Probe::COUNT as _,
        )
        .map(drop)
    }

    /// Register the credentials of the running application with io_uring, and get an ID
    /// associated with these credentials. This ID can then be
    /// [passed](crate::squeue::Entry::personality) into submission queue entries to issue the
    /// request with this process' credentials.
    ///
    /// By default, if [`Parameters::is_feature_cur_personality`] is set then requests will use the
    /// credentials of the task that called [`Submitter::enter`], otherwise they will use the
    /// credentials of the task that originally registered the io_uring.
    ///
    /// [`Parameters::is_feature_cur_personality`]: crate::Parameters::is_feature_cur_personality
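    ///
    /// # Examples
    ///
    /// A sketch of tagging a request with registered credentials; marked `no_run` since
    /// personality registration needs Linux 5.6 or later:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    /// let id = ring.submitter().register_personality()?;
    /// // Any entry built with this personality runs with the registered credentials.
    /// let nop = io_uring::opcode::Nop::new().build().personality(id);
    /// # let _ = nop;
    /// # Ok(())
    /// # }
    /// ```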
    pub fn register_personality(&self) -> io::Result<u16> {
        let id = execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_PERSONALITY,
            ptr::null(),
            0,
        )?;
        Ok(id as u16)
    }

    /// Unregister all previously registered buffers.
    ///
    /// You do not need to explicitly call this before dropping the [`IoUring`](crate::IoUring), as
    /// it will be cleaned up by the kernel automatically.
    pub fn unregister_buffers(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_BUFFERS,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Unregister all previously registered files.
    ///
    /// You do not need to explicitly call this before dropping the [`IoUring`](crate::IoUring), as
    /// it will be cleaned up by the kernel automatically.
    pub fn unregister_files(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_FILES,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Unregister an eventfd file descriptor to stop notifications.
    pub fn unregister_eventfd(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_EVENTFD,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Unregister a previously registered personality.
    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_PERSONALITY,
            ptr::null(),
            personality as _,
        )
        .map(drop)
    }

    /// Permanently install a feature allowlist. Once this has been called, attempting to perform
    /// an operation not on the allowlist will fail with `-EACCES`.
    ///
    /// This can only be called once, to prevent untrusted code from removing restrictions.
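    ///
    /// # Examples
    ///
    /// A sketch of restricting a ring to `Nop` submissions only; marked `no_run` since
    /// restrictions need Linux 5.10 or later and the ring must be created disabled. This
    /// assumes `Restriction` is re-exported at the crate root alongside `Probe`:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use io_uring::{opcode, IoUring, Restriction};
    ///
    /// // Restrictions can only be registered while the ring is disabled.
    /// let ring = IoUring::builder().setup_r_disabled().build(8)?;
    /// let mut res = [Restriction::sqe_op(opcode::Nop::CODE)];
    /// ring.submitter().register_restrictions(&mut res)?;
    /// ring.submitter().register_enable_rings()?;
    /// # Ok(())
    /// # }
    /// ```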
    pub fn register_restrictions(&self, res: &mut [Restriction]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_RESTRICTIONS,
            res.as_mut_ptr().cast(),
            res.len() as _,
        )
        .map(drop)
    }

    /// Enable the rings of the io_uring instance if they have been disabled with
    /// [`setup_r_disabled`](crate::Builder::setup_r_disabled).
    pub fn register_enable_rings(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_ENABLE_RINGS,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Get and/or set the limit for the number of io_uring worker threads per NUMA
    /// node. `max[0]` holds the limit for bounded workers, which process I/O
    /// operations expected to complete in bounded time, such as I/O on regular files
    /// or block devices, while `max[1]` holds the limit for unbounded workers, which
    /// carry out I/O operations that may never complete, for instance I/O on sockets.
    /// Passing `0` leaves the corresponding limit unchanged. On success, `max` is
    /// updated with the previous limits.
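    ///
    /// # Examples
    ///
    /// A sketch of capping bounded workers while leaving unbounded workers untouched; marked
    /// `no_run` since it needs Linux 5.15 or later:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    /// // Limit bounded workers to 4; 0 leaves the unbounded limit unchanged.
    /// let mut max = [4u32, 0u32];
    /// ring.submitter().register_iowq_max_workers(&mut max)?;
    /// // `max` now holds the limits that were in effect before the call.
    /// # Ok(())
    /// # }
    /// ```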
    pub fn register_iowq_max_workers(&self, max: &mut [u32; 2]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_IOWQ_MAX_WORKERS,
            max.as_mut_ptr().cast(),
            max.len() as _,
        )
        .map(drop)
    }

    /// Register a buffer ring for provided buffers.
    ///
    /// Details can be found in the io_uring_register_buf_ring(3) man page.
    ///
    /// If the register command is not supported, or the `ring_entries` value exceeds
    /// 32768, an `InvalidInput` error is returned.
    ///
    /// Available since 5.19.
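    ///
    /// # Examples
    ///
    /// A sketch of registering a small buffer ring; marked `no_run` since it needs Linux 5.19
    /// or later. This assumes a 4096-byte page size, and leaks the mapping for brevity; a real
    /// application would keep it alive and munmap it after unregistering:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    /// // The ring memory must be page-aligned, which mmap guarantees.
    /// let ring_addr = unsafe {
    ///     libc::mmap(
    ///         std::ptr::null_mut(),
    ///         4096,
    ///         libc::PROT_READ | libc::PROT_WRITE,
    ///         libc::MAP_ANONYMOUS | libc::MAP_PRIVATE,
    ///         -1,
    ///         0,
    ///     )
    /// };
    /// if ring_addr == libc::MAP_FAILED {
    ///     return Err(std::io::Error::last_os_error().into());
    /// }
    /// // 16 entries (a power of two), buffer group id 0.
    /// ring.submitter().register_buf_ring(ring_addr as u64, 16, 0)?;
    /// # Ok(())
    /// # }
    /// ```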
    pub fn register_buf_ring(
        &self,
        ring_addr: u64,
        ring_entries: u16,
        bgid: u16,
    ) -> io::Result<()> {
        // The interface type for ring_entries is u32 but the same interface only allows a u16 for
        // the tail to be specified, so to try and avoid further confusion, we limit the
        // ring_entries to u16 here too. The value is actually limited to 2^15 (32768) but we can
        // let the kernel enforce that.
        let arg = sys::io_uring_buf_reg {
            ring_addr,
            ring_entries: ring_entries as _,
            bgid,
            pad: 0,
            resv: Default::default(),
        };
        let arg = cast_ptr::<sys::io_uring_buf_reg>(&arg);
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_PBUF_RING,
            arg as *const _,
            1,
        )
        .map(drop)
    }

    /// Unregister a previously registered buffer ring.
    ///
    /// Available since 5.19.
    pub fn unregister_buf_ring(&self, bgid: u16) -> io::Result<()> {
        let arg = sys::io_uring_buf_reg {
            ring_addr: 0,
            ring_entries: 0,
            bgid,
            pad: 0,
            resv: Default::default(),
        };
        let arg = cast_ptr::<sys::io_uring_buf_reg>(&arg);
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_PBUF_RING,
            arg as *const _,
            1,
        )
        .map(drop)
    }
}