azur 0.3.1

A no_std Rust crate that implements an executor/reactor and futures using io_uring
Documentation
pub(crate) mod pending_state;

use core::{
    future::Future as CoreFuture,
    marker::PhantomPinned,
    pin::Pin,
    task::{
        Context,
        Poll,
    },
};

use futures_core::future::FusedFuture;
use tracing::error;

use crate::{
    op::Op,
    reactor::{
        Reactor,
        REACTOR,
        TASK,
    },
    task::TaskHeader,
};

struct Pending<O: Op> {
    state: pending_state::Packed,
    op: O,
}

pub struct Future<O: Op> {
    pending: Option<Pending<O>>,
    _marker: PhantomPinned,
}

impl<O: Op> Future<O> {
    pub(crate) fn new(op: O) -> Self {
        Self {
            pending: Some(Pending {
                state: pending_state::Packed::from(pending_state::Unpacked::New),
                op,
            }),
            _marker: PhantomPinned,
        }
    }
}

impl<O: Op> CoreFuture for Future<O> {
    type Output = O::Output;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: The below code does not move `this`.
        let this = unsafe { self.get_unchecked_mut() };

        let pending = this
            .pending
            .as_mut()
            .expect("polling on a terminated future");
        match pending_state::Unpacked::from(pending.state) {
            pending_state::Unpacked::New => {
                unsafe {
                    Reactor::submit(
                        &REACTOR,
                        &mut pending.op,
                        &mut pending.state as *mut pending_state::Packed,
                    )
                    .unwrap()
                };
                let mut task = unsafe { (*TASK.get()).unwrap() };
                unsafe { task.as_mut().increment_ref_count() };
                pending.state =
                    pending_state::Packed::from(pending_state::Unpacked::Submitted { task });
                Poll::Pending
            }
            pending_state::Unpacked::Submitted { .. } => Poll::Pending,
            pending_state::Unpacked::Completed { ret } => {
                let pending = this.pending.take().unwrap();
                Poll::Ready(pending.op.complete(ret))
            }
        }
    }
}

impl<O: Op> FusedFuture for Future<O> {
    fn is_terminated(&self) -> bool {
        self.pending.is_none()
    }
}

#[cold]
fn cancel(pending_state: &mut pending_state::Packed) {
    match pending_state::Unpacked::from(*pending_state) {
        pending_state::Unpacked::Submitted { task } => {
            unsafe { TaskHeader::decrement_ref_count(task) };
        }
        _ => return,
    };
    if let Err(err) = Reactor::cancel(&REACTOR, pending_state as *mut pending_state::Packed) {
        error!("failed to cancel operation: {err}");
        crate::freeze();
    }
}

impl<O: Op> Drop for Future<O> {
    fn drop(&mut self) {
        if let Some(pending) = &mut self.pending {
            cancel(&mut pending.state);
        }
    }
}