azur 0.3.1

A no_std Rust crate that implements an executor/reactor and futures using io_uring
Documentation
use core::{
    cell::{
        LazyCell,
        UnsafeCell,
    },
    future::Future,
    mem,
    panic,
    ptr::{
        self,
        NonNull,
    },
    task::{
        Context,
        Poll,
        Waker,
    },
};

use crate::{
    future::pending_state,
    io_uring::{
        self,
        QueueFullError,
    },
    op::Op,
    task::{
        Task,
        TaskHeader,
    },
};

/// Reactor that drives an io_uring instance and keeps count of the tasks spawned on it.
pub(crate) struct Reactor {
    /// The io_uring instance.
    io_uring: io_uring::Instance,

    /// The number of submission queue entries to submit.
    ///
    /// Incremented for every entry pushed by `submit` and reset to zero whenever the count is
    /// handed to `get_events`.
    to_submit: u32,

    /// The number of tasks.
    ///
    /// Incremented when a task is spawned and decremented when polling a task yields
    /// `Poll::Ready`. `run` returns once this is zero and a drain of the completion queue
    /// processed nothing.
    tasks: usize,
}

/// The reactor of the current thread, created lazily on first access.
///
/// # Panics
///
/// The first access panics if `Reactor::new` fails.
#[thread_local]
pub(crate) static REACTOR: LazyCell<UnsafeCell<Reactor>> = LazyCell::new(|| {
    // There is no caller to hand the error to from a lazy initializer, so fail loudly with a
    // message that names what went wrong instead of a bare `unwrap` panic.
    UnsafeCell::new(Reactor::new().expect("failed to initialize the io_uring reactor"))
});

/// The task that is currently being polled on this thread, if any.
///
/// `Reactor::poll_task` stores the task here for the duration of the poll and puts the previous
/// value back afterwards.
#[thread_local]
pub(crate) static TASK: UnsafeCell<Option<NonNull<TaskHeader>>> = UnsafeCell::new(None);

impl Reactor {
    pub(crate) fn new() -> lx::Result<Self> {
        let mut io_uring = io_uring::Instance::new(32768)?;
        unsafe { io_uring.set_sq_mapping(&io_uring::IdentityMapping) };
        Ok(Self {
            io_uring,
            to_submit: 0,
            tasks: 0,
        })
    }

    /// Queues an operation on the reactor's submission queue.
    ///
    /// A submission queue entry is zeroed, tagged with the address of `pending_state` as its
    /// `user_data` and then filled in by `op`. The entry is only counted in `to_submit` here;
    /// that count is passed to `get_events` by `enter` or `cancel`.
    ///
    /// # Safety
    ///
    /// `op` must not be deallocated, moved or dropped and the `pending_state` pointer must remain
    /// valid for reads and writes until either the operation is cancelled or the reactor sets
    /// `pending_state` to the `PendingState::Completed` variant.
    pub(crate) unsafe fn submit<O: Op>(
        this: &UnsafeCell<Self>,
        op: &mut O,
        pending_state: *mut pending_state::Packed,
    ) -> lx::Result<()> {
        let reactor = this.get();

        // Retry until a slot is handed out; `start_push` is called after every failed attempt.
        // NOTE(review): nothing is submitted between attempts, so this presumably relies on
        // `start_push` observing slots the kernel has already consumed. Confirm that a queue
        // full of not-yet-submitted entries cannot make this spin forever.
        let sqe = loop {
            match (*reactor).io_uring.try_push(&io_uring::IdentityMapping) {
                Ok(slot) => break slot,
                Err(QueueFullError) => (*reactor).io_uring.start_push(),
            }
        };

        // SAFETY: The all-zero byte-pattern represents a valid value for `lx::io_uring_sqe`.
        *sqe = unsafe { mem::zeroed() };
        // The completion handler casts `user_data` back into this pointer.
        sqe.user_data = pending_state as u64;
        op.fill_sqe(sqe);

        (*reactor).to_submit += 1;
        Ok(())
    }

    /// Cancels the operation whose completion state lives at `pending_state`.
    ///
    /// An `IORING_OP_ASYNC_CANCEL` entry targeting `pending_state` is pushed with `user_data` set
    /// to zero and submitted together with every entry that was still waiting to be submitted.
    /// The completion queue is then drained: completions of other operations are dispatched
    /// through `handle_cqe`, while the completion of the cancelled operation itself is consumed
    /// without touching `pending_state`.
    ///
    /// If the cancellation returns `0` or `-ENOENT` the function returns right away. If it
    /// returns `-EALREADY`, the function keeps entering the kernel until the completion of the
    /// target operation has been observed.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `get_events`.
    ///
    /// # Panics
    ///
    /// Panics if the cancellation produces no completion queue entry, more than one entry, or a
    /// return value other than `0`, `-ENOENT` or `-EALREADY`.
    pub(crate) fn cancel(
        this: &UnsafeCell<Reactor>,
        pending_state: *mut pending_state::Packed,
    ) -> lx::Result<()> {
        // Completions of the target operation carry this value as their `user_data`.
        let target = pending_state as u64;

        loop {
            let sqe = unsafe { (*this.get()).io_uring.try_push(&io_uring::IdentityMapping) };
            match sqe {
                Ok(sqe) => {
                    // SAFETY: The all-zero byte-pattern represents a valid value for
                    // `lx::io_uring_sqe`.
                    *sqe = unsafe { mem::zeroed() };
                    sqe.opcode = lx::IORING_OP_ASYNC_CANCEL;
                    // A zero `user_data` marks the completion of the cancellation itself.
                    sqe.user_data = 0;
                    sqe.addr = target;
                    break;
                }
                Err(QueueFullError) => unsafe { (*this.get()).io_uring.start_push() },
            }
        }

        unsafe {
            let this = &mut (*this.get());

            this.io_uring.end_push();

            // Submit everything that was pending plus the cancellation entry pushed above,
            // without waiting for any completion.
            let to_submit = mem::replace(&mut this.to_submit, 0) + 1;
            this.io_uring.get_events(to_submit, 0)?;

            this.io_uring.start_pop();
        }

        let mut cancelled = None;
        // Whether the completion of the target operation has already been consumed.
        let mut done = false;

        while let Some(cqe) = unsafe { (*this.get()).io_uring.pop() } {
            if cqe.user_data == 0 {
                assert!(
                    cancelled.is_none(),
                    "multiple completion queue entries for cancellation operation"
                );

                let ret = cqe.ret;
                if ret == 0 || ret == -i32::from(lx::ENOENT) {
                    cancelled = Some(true);
                } else if ret == -i32::from(lx::EALREADY) {
                    cancelled = Some(false);
                } else {
                    panic!("invalid return value for cancellation operation");
                }
            } else if cqe.user_data == target {
                // The target completed within this batch. Its state is deliberately left
                // untouched, but the fact must be remembered: this entry will never show up
                // again, so waiting for it below would block forever.
                done = true;
            } else {
                Reactor::handle_cqe(this, cqe);
            }
        }

        unsafe {
            (*this.get()).io_uring.end_pop();
        };

        match cancelled {
            Some(true) => return Ok(()),
            Some(false) => {}
            None => panic!("missing completion queue entry for cancellation operation"),
        }

        // The operation was already executing: wait until its completion has been seen, while
        // still serving the completions of every other operation.
        while !done {
            unsafe {
                let this = &mut (*this.get());

                let to_submit = mem::replace(&mut this.to_submit, 0);
                this.io_uring.get_events(to_submit, 1)?;

                this.io_uring.start_pop();
            }

            while let Some(cqe) = unsafe { (*this.get()).io_uring.pop() } {
                if cqe.user_data == target {
                    done = true;
                } else {
                    Reactor::handle_cqe(this, cqe);
                }
            }

            unsafe {
                (*this.get()).io_uring.end_pop();
            };
        }

        Ok(())
    }

    /// Drains the completion queue, passing every entry to `handle_cqe`.
    ///
    /// Returns the number of completion queue entries that were processed.
    pub(crate) fn react(this: &UnsafeCell<Reactor>) -> usize {
        let reactor = this.get();
        let mut handled = 0;

        unsafe {
            (*reactor).io_uring.start_pop();
        }

        // The reactor is re-borrowed for every pop because `handle_cqe` polls a task, which is
        // handed the same cell.
        while let Some(entry) = unsafe { (*reactor).io_uring.pop() } {
            Reactor::handle_cqe(this, entry);
            handled += 1;
        }

        unsafe {
            (*reactor).io_uring.end_pop();
        }

        handled
    }

    /// Finishes the current push phase and calls `get_events`.
    ///
    /// The number of entries waiting to be submitted is taken (leaving the counter at zero) and
    /// passed to `get_events` together with `min_complete`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `get_events`.
    pub(crate) fn enter(this: &UnsafeCell<Reactor>, min_complete: u32) -> lx::Result<()> {
        unsafe {
            let reactor = &mut *this.get();

            reactor.io_uring.end_push();

            let pending = mem::take(&mut reactor.to_submit);
            reactor.io_uring.get_events(pending, min_complete)?;
        }

        Ok(())
    }

    /// Drives the reactor until no tasks are left.
    ///
    /// Each round drains the completion queue until a pass processes nothing, then either
    /// returns (when the task count is zero) or enters the kernel asking for at least one
    /// completion.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `enter`.
    pub(crate) fn run(this: &UnsafeCell<Reactor>) -> lx::Result<()> {
        loop {
            // Handling a completion polls a task, so keep draining until a pass finds nothing.
            while Reactor::react(this) != 0 {}

            let remaining = unsafe { (*this.get()).tasks };
            if remaining == 0 {
                return Ok(());
            }

            Reactor::enter(this, 1)?;
        }
    }

    /// Records the result of a completed operation and polls the task that submitted it.
    ///
    /// The entry's `user_data` is interpreted as a pointer to the operation's packed pending
    /// state. That state is replaced by `Completed { ret }` and the task stored in the previous
    /// `Submitted` state is polled.
    ///
    /// # Panics
    ///
    /// Panics if the previous state was not `Submitted`.
    fn handle_cqe(this: &UnsafeCell<Reactor>, cqe: &lx::io_uring_cqe) {
        let slot = cqe.user_data as *mut pending_state::Packed;
        let completed =
            pending_state::Packed::from(pending_state::Unpacked::Completed { ret: cqe.ret });

        // NOTE(review): relies on `user_data` still pointing at a live pending state, as
        // required by the safety contract of `submit`.
        let previous = unsafe { ptr::replace(slot, completed) };

        let pending_state::Unpacked::Submitted { task, .. } =
            pending_state::Unpacked::from(previous)
        else {
            panic!("unexpected operation state");
        };

        unsafe {
            Reactor::poll_task(this, task);
        }
    }

    unsafe fn poll_task(this: &UnsafeCell<Reactor>, task: NonNull<TaskHeader>) {
        let old = mem::replace(unsafe { &mut *TASK.get() }, Some(task));

        let waker = Waker::noop();
        let mut cx = Context::from_waker(&waker);
        let poll = TaskHeader::poll(task, &mut cx);

        unsafe {
            *TASK.get() = old;
        }

        match poll {
            Poll::Ready(()) => {
                (*this.get()).tasks -= 1;
            }
            Poll::Pending => {}
        }

        TaskHeader::decrement_ref_count(task);
    }

    pub(crate) fn spawn<F: Future<Output = ()> + 'static>(this: &UnsafeCell<Reactor>, future: F) {
        unsafe { Self::spawn_unchecked(this, future) }
    }

    /// Spawns a future as a task on the reactor without requiring it to be `'static`.
    ///
    /// The task count is incremented, a task is created from `future`, and the task is polled
    /// once before this function returns.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in this file. Presumably everything borrowed
    /// by `future` must outlive the task. Confirm.
    pub(crate) unsafe fn spawn_unchecked<F>(this: &UnsafeCell<Reactor>, future: F)
    where
        F: Future<Output = ()>,
    {
        unsafe {
            // Count the task before its first poll: `poll_task` decrements the count when the
            // future finishes immediately.
            (*this.get()).tasks += 1;

            let header = Task::header(Task::new(future));
            Reactor::poll_task(this, header);
        }
    }
}