azur 0.3.1

A no_std Rust crate that implements an executor/reactor and futures using io_uring
//! A wrapper around the raw `io_uring` interface.

use core::{
    mem,
    ptr,
    sync::atomic::{
        AtomicU32,
        Ordering,
    },
};

use lx::AsRawFd;

/// The `io_uring` submission queue array maps indices in the submission queue ring to
/// corresponding indices in the actual submission queue. This level of indirection allows
/// reserving and then submitting entries out of order.
pub(crate) trait SubmissionQueueMapping {
    fn map(&self, ring_idx: u32) -> u32;
}

/// A `SubmissionQueueMapping` implementation that uses the same indices in the submission queue
/// ring as in the actual submission queue.
pub(crate) struct IdentityMapping;

impl SubmissionQueueMapping for IdentityMapping {
    fn map(&self, ring_idx: u32) -> u32 {
        ring_idx
    }
}
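
// For illustration only (not part of the crate): a custom mapping that fills the submission
// queue from the end instead of the beginning. Any mapping just has to return an in-range index
// for every ring index, and the same mapping must be both installed with `set_sq_mapping` and
// passed to `try_push`.
#[allow(dead_code)]
pub(crate) struct ReverseMapping {
    /// The submission queue capacity (assumed to be the ring's power-of-two size).
    pub(crate) capacity: u32,
}

impl SubmissionQueueMapping for ReverseMapping {
    fn map(&self, ring_idx: u32) -> u32 {
        self.capacity - 1 - ring_idx
    }
}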

#[derive(Debug)]
pub(crate) struct QueueFullError;

/// A safer interface to an io_uring instance. Multi-threading is not supported.
pub(crate) struct Instance {
    /// The io_uring file descriptor.
    fd: lx::OwnedFd,

    /// The memory mapping that contains both the completion queue and fields shared between the
    /// kernel and the application.
    _shared_mmap: lx::Mmap,

    /// The memory mapping that contains the submission queue entries.
    sq_mmap: lx::Mmap,

    sq_head: *const AtomicU32,
    sq_head_cache: u32,
    sq_tail: *const AtomicU32,
    sq_tail_cache: u32,
    sq_mask: u32,
    sq_capacity: u32,
    sq_array: *mut u32,

    cq_head: *const AtomicU32,
    cq_head_cache: u32,
    cq_tail: *const AtomicU32,
    cq_tail_cache: u32,
    cq_mask: u32,
    cq_capacity: u32,
    cq_entries: *const lx::io_uring_cqe,
}

impl Instance {
    /// Creates a new io_uring instance with the default parameters and a capacity of at least
    /// `entries` entries for both the submission and completion queues.
    pub(crate) fn new(entries: u32) -> lx::Result<Self> {
        let mut params: lx::io_uring_params = unsafe { mem::zeroed() };
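        // Restrict submissions to a single thread and defer task work until `io_uring_enter` is
        // called with `IORING_ENTER_GETEVENTS`; this matches the single-threaded design of this
        // wrapper.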
        params.flags = lx::IORING_SETUP_SINGLE_ISSUER | lx::IORING_SETUP_DEFER_TASKRUN;

        let fd = lx::io_uring_setup(entries, &mut params)?;

        // We rely on this feature so that a single memory mapping (and a single base address)
        // covers both the submission and completion queue rings.
        if params.features & lx::IORING_FEAT_SINGLE_MMAP == 0 {
            return Err(lx::Error::from_code(lx::EINVAL));
        }

        let cq_size = usize::try_from(params.cq_off.cqes).unwrap()
            + mem::size_of::<lx::io_uring_cqe>() * usize::try_from(params.cq_entries).unwrap();
        let sq_size = usize::try_from(params.sq_off.array).unwrap()
            + mem::size_of::<u32>() * usize::try_from(params.sq_entries).unwrap();
        let size = cq_size.max(sq_size);
        let shared_mmap = unsafe {
            lx::mmap_file(
                ptr::null_mut(),
                size,
                lx::PROT_READ | lx::PROT_WRITE,
                lx::MAP_SHARED | lx::MAP_POPULATE,
                &fd,
                lx::IORING_OFF_SQ_RING,
            )
        }?;

        let sq_head = unsafe {
            shared_mmap
                .as_ref()
                .as_ptr()
                .offset(isize::try_from(params.sq_off.head).unwrap())
                as *const AtomicU32
        };
        let sq_tail = unsafe {
            shared_mmap
                .as_ref()
                .as_ptr()
                .offset(isize::try_from(params.sq_off.tail).unwrap())
                as *const AtomicU32
        };
        let sq_array = unsafe {
            shared_mmap
                .as_ref()
                .as_ptr()
                .offset(isize::try_from(params.sq_off.array).unwrap()) as *mut u32
        };

        let cq_head = unsafe {
            shared_mmap
                .as_ref()
                .as_ptr()
                .offset(isize::try_from(params.cq_off.head).unwrap())
                as *const AtomicU32
        };
        let cq_tail = unsafe {
            shared_mmap
                .as_ref()
                .as_ptr()
                .offset(isize::try_from(params.cq_off.tail).unwrap())
                as *const AtomicU32
        };
        let cq_entries = unsafe {
            shared_mmap
                .as_ref()
                .as_ptr()
                .offset(isize::try_from(params.cq_off.cqes).unwrap())
                as *const lx::io_uring_cqe
        };

        let sq_size =
            mem::size_of::<lx::io_uring_sqe>() * usize::try_from(params.sq_entries).unwrap();
        let sq_mmap = unsafe {
            lx::mmap_file(
                ptr::null_mut(),
                sq_size,
                lx::PROT_READ | lx::PROT_WRITE,
                lx::MAP_SHARED | lx::MAP_POPULATE,
                &fd,
                lx::IORING_OFF_SQES,
            )?
        };

        Ok(Self {
            fd,

            _shared_mmap: shared_mmap,
            sq_mmap,

            sq_head,
            sq_head_cache: 0,
            sq_tail,
            sq_tail_cache: 0,
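            // The kernel rounds the requested sizes up to powers of two, so `entries - 1` is a
            // valid index mask for both rings.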
            sq_mask: params.sq_entries - 1,
            sq_capacity: params.sq_entries,
            sq_array,

            cq_head,
            cq_head_cache: 0,
            cq_tail,
            cq_tail_cache: 0,
            cq_mask: params.cq_entries - 1,
            cq_capacity: params.cq_entries,
            cq_entries,
        })
    }

    #[inline]
    pub(crate) fn start_push(&mut self) {
        self.sq_head_cache = unsafe { (*self.sq_head).load(Ordering::Acquire) };
    }

    /// Tries to push a new submission. If the queue is full, `Err(QueueFullError)` is returned.
    ///
    /// # Safety
    ///
    /// Any resources referenced through the returned entry must remain valid for the entire
    /// duration of the operation. For example, a buffer passed to the kernel must live until the
    /// operation completes.
    pub(crate) unsafe fn try_push(
        &mut self,
        mapping: &impl SubmissionQueueMapping,
    ) -> Result<&mut lx::io_uring_sqe, QueueFullError> {
        if self.sq_tail_cache == self.sq_head_cache + self.sq_capacity {
            return Err(QueueFullError);
        }

        let mask = self.sq_mask;
        let index = usize::try_from(mapping.map(self.sq_tail_cache & mask)).unwrap();
        let entry = unsafe { &mut *self.sq_entries_mut().get_unchecked_mut(index) };

        self.sq_tail_cache += 1;

        Ok(entry)
    }

    #[inline]
    pub(crate) fn end_push(&mut self) {
        unsafe { (*self.sq_tail).store(self.sq_tail_cache, Ordering::Release) };
    }
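
    // A minimal usage sketch of the push protocol (illustrative only). The SQE field names and
    // `IORING_OP_NOP` follow the kernel's `io_uring_sqe` ABI and are assumed to be mirrored by
    // `lx`.
    #[allow(dead_code)]
    fn push_nop_example(&mut self) -> Result<(), QueueFullError> {
        // Refresh the cached head so entries already consumed by the kernel become reusable.
        self.start_push();
        // SAFETY: a NOP references no external resources, so nothing has to outlive the
        // operation.
        let sqe = unsafe { self.try_push(&IdentityMapping)? };
        *sqe = unsafe { mem::zeroed() };
        sqe.opcode = lx::IORING_OP_NOP as u8;
        sqe.user_data = 0x42;
        // Publish the new tail so the kernel can observe the entry.
        self.end_push();
        Ok(())
    }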

    /// Installs `mapping` into the submission queue indirection array.
    ///
    /// # Safety
    ///
    /// `mapping` must return an index below the submission queue capacity for every ring index.
    pub(crate) unsafe fn set_sq_mapping(&mut self, mapping: &impl SubmissionQueueMapping) {
        for (ring_idx, actual_idx) in (*self.sq_array_mut()).iter_mut().enumerate() {
            *actual_idx = mapping.map(u32::try_from(ring_idx).unwrap());
        }
    }

    /// Returns a mutable pointer to the submission queue array.
    #[inline]
    fn sq_array_mut(&mut self) -> *mut [u32] {
        ptr::slice_from_raw_parts_mut(self.sq_array, usize::try_from(self.sq_capacity).unwrap())
    }

    /// Returns a mutable pointer to the submission queue entries.
    #[inline]
    unsafe fn sq_entries_mut(&mut self) -> *mut [lx::io_uring_sqe] {
        ptr::slice_from_raw_parts_mut(
            self.sq_mmap.as_mut().as_mut_ptr() as *mut lx::io_uring_sqe,
            usize::try_from(self.sq_capacity).unwrap(),
        )
    }

    #[inline]
    pub(crate) fn get_events(&mut self, to_submit: u32, min_complete: u32) -> lx::Result<i32> {
        lx::io_uring_enter_getevents(self.fd.as_raw_fd() as u32, to_submit, min_complete)
    }

    #[inline]
    pub(crate) fn start_pop(&mut self) {
        self.cq_tail_cache = unsafe { (*self.cq_tail).load(Ordering::Acquire) };
    }

    pub(crate) fn pop(&mut self) -> Option<&lx::io_uring_cqe> {
        if self.cq_head_cache == self.cq_tail_cache {
            return None;
        }

        let index = usize::try_from(self.cq_head_cache & self.cq_mask).unwrap();
        let entry = unsafe { &*self.cq_entries().get_unchecked(index) };

        self.cq_head_cache += 1;

        Some(entry)
    }

    #[inline]
    pub(crate) fn end_pop(&mut self) {
        unsafe { (*self.cq_head).store(self.cq_head_cache, Ordering::Release) };
    }
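
    // A minimal sketch of draining completions (illustrative only). The `res` and `user_data`
    // fields follow the kernel's `io_uring_cqe` layout and are assumed to be mirrored by `lx`.
    #[allow(dead_code)]
    fn drain_completions_example(&mut self) {
        // Refresh the cached tail so newly posted completions become visible.
        self.start_pop();
        while let Some(cqe) = self.pop() {
            let _result: i32 = cqe.res;
            let _token: u64 = cqe.user_data;
            // Dispatch to the operation identified by `user_data` here.
        }
        // Publish the new head so the kernel can reuse the consumed slots.
        self.end_pop();
    }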

    /// Returns a constant pointer to the completion queue entries.
    #[inline]
    fn cq_entries(&self) -> *const [lx::io_uring_cqe] {
        ptr::slice_from_raw_parts(self.cq_entries, usize::try_from(self.cq_capacity).unwrap())
    }
}

impl lx::AsRawFd for Instance {
    #[inline]
    fn as_raw_fd(&self) -> lx::RawFd {
        self.fd.as_raw_fd()
    }
}
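
// End-to-end sketch (illustrative only): create a ring, submit a NOP and wait for its
// completion. Error handling is elided for brevity.
#[allow(dead_code)]
fn nop_roundtrip_example() -> lx::Result<()> {
    let mut ring = Instance::new(8)?;
    // SAFETY: the identity mapping only returns in-range indices.
    unsafe { ring.set_sq_mapping(&IdentityMapping) };
    ring.push_nop_example()
        .expect("a freshly created queue cannot be full");
    // Submit one SQE and block until at least one CQE has been posted.
    ring.get_events(1, 1)?;
    ring.drain_completions_example();
    Ok(())
}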