hiasync 0.2.2

Supports only single-threaded asynchronous runtime
Documentation
use crate::{JoinHandle, LocalWaker, Waker, Event};
use core::any::Any;
use core::future::Future;
use core::marker::PhantomData;
use core::pin::Pin;
use core::task::{Context, Poll};

struct WakerFuture;

impl Future for WakerFuture {
    type Output = Waker;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let waker = LocalWaker::waker(ctx);
        Poll::Ready(waker.clone())
    }
}

async fn local_waker() -> Waker {
    WakerFuture.await
}

/// 新建异步任务,可实现和当前异步任务的并发调度.
pub async fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future + 'static,
    T::Output: 'static,
{
    local_waker().await.task_new(future)
}

struct Yield(bool);

impl Future for Yield {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.0 {
            return Poll::Ready(());
        }
        let waker = LocalWaker::waker(ctx);
        waker.wake_by_ref();
        self.0 = true;
        Poll::Pending
    }
}

/// 主动放弃本次调度切换到其他任务.
pub async fn sched_yield() {
    Yield(false).await
}

struct Suspend(bool);

impl Future for Suspend {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        Poll::Pending
    }
}

struct Abort(u64);

impl Future for Abort {
    type Output = ();
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let waker = LocalWaker::waker(ctx);
        waker.task_abort(self.0);
        Poll::Ready(())
    }
}

/// 主动挂起当前异步任务,直到外部调用Runtime::task_resume后才能唤醒.
///
/// `task_suspend/Runtime::task_resume`的关系类似`wait_event/Runtime::notify_events`.
/// 只是`task_suspend`需要结合到`task_self`使用.
///
/// ```text
/// async fn foo(ctx: Rc<RefCell<TaskContext>>) {
///     let msg_key = ...;
///     non_send_msg(msg_key, ...); // 需要异步等待消息响应
///     let task_id = task_self().await;
///     ctx.save_key(msg_key, task_id);
///     task_suspend().await; // 直到外部调用Runtime::wake(task_id)才返回.
///     let resp = ctx.borrow_mut().get_msg_response(msg_key); // 获取响应
///     ...
/// }
///
/// fn main() {
///     let mut rt = Runtime::new();
///     ...
///     loop {
///         let msg_key = recv_msg();
///         if let Some(task_id) = ctx.get_task_id(msg_key) {
///             // 唤醒等待msg_key响应的异步任务.
///             rt.task_resume(task_id);
///         }
///         rt.sched();
///     }
/// }
/// ```
pub async fn task_suspend() {
    Suspend(false).await
}

/// 强制终止task. 可能导致资源泄露,参考`Runtime::task_force_abort`.
pub async fn task_force_abort(task_id: u64) {
    Abort(task_id).await
}

/// 每次调用返回不重复的event_id
pub async fn new_event_id() -> u64 {
    local_waker().await.new_event_id()
}

/// 获取当前异步任务的id. 同task_self.
pub async fn current_task_id() -> u64 {
    local_waker().await.task_id()
}

/// 获取当前异步任务的id
pub async fn task_self() -> u64 {
    local_waker().await.task_id()
}

struct WaitEvent<'a, T: Any> {
    event_id: u64,
    waker: Option<Waker>,
    mark: PhantomData<&'a T>,
}

impl<'a, T: Any> Future for WaitEvent<'a, T> {
    type Output = Option<&'a T>;
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let waker = LocalWaker::waker(ctx);
        if let Some(data) = waker.event_remove(self.event_id) {
            self.waker = None;
            return Poll::Ready(data.downcast_ref::<T>());
        }
        waker.event_register(self.event_id);
        if self.waker.is_none() {
            self.waker = Some(waker.clone());
        }
        Poll::Pending
    }
}

impl<T: Any> Drop for WaitEvent<'_, T> {
    fn drop(&mut self) {
        // wait_event可能在or组合使用中提前结束,此时需要调用event_unregister避免资源泄露.
        if let Some(waker) = &self.waker {
            waker.event_unregister(self.event_id);
        }
    }
}

/// 等待外部事件通知获取事件数据.
///
/// 调用者保证事件ID不重复,每个事件到来时只会唤醒最后一次调用wait_event的任务.
/// 如果事件数据非期待的类型,返回None.
pub async fn wait_event<'a, T: Any>(event_id: u64) -> Option<&'a T> {
    WaitEvent::<T> {
        event_id,
        waker: None,
        mark: PhantomData,
    }.await
}

/// 发送事件的异步接口.
pub async fn notify_events(events: &[Event<'static>]) {
    local_waker().await.notify_events(events);
}

/// 唤醒异步任务的异步接口.
pub async fn task_resume(task_id: u64) {
    local_waker().await.task_resume(task_id);
}

enum FutureOrOutput<T: Future> {
    Future(T),
    Output(Option<T::Output>),
}

pub struct And<T1: Future, T2: Future> {
    f1: FutureOrOutput<T1>,
    f2: FutureOrOutput<T2>,
}

impl<T1: Future, T2: Future> Unpin for And<T1, T2> {}

impl<T1: Future, T2: Future> Future for And<T1, T2> {
    type Output = (T1::Output, T2::Output);
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        if let FutureOrOutput::Future(ref mut f) = &mut self.f1 {
            let pin: Pin<&mut T1> = unsafe { Pin::new_unchecked(f) };
            if let Poll::Ready(r) = pin.poll(ctx) {
                self.f1 = FutureOrOutput::Output(Some(r));
            }
        }
        if let FutureOrOutput::Future(ref mut f) = &mut self.f2 {
            let pin: Pin<&mut T2> = unsafe { Pin::new_unchecked(f) };
            if let Poll::Ready(r) = pin.poll(ctx) {
                self.f2 = FutureOrOutput::Output(Some(r));
            }
        }
        let this: &mut Self = &mut self;
        if let (FutureOrOutput::Output(r1), FutureOrOutput::Output(r2)) =
            (&mut this.f1, &mut this.f2)
        {
            return Poll::Ready((r1.take().unwrap(), r2.take().unwrap()));
        }
        Poll::Pending
    }
}

/// 等待f1, f2都结束后才返回.
pub fn and<T1: Future, T2: Future>(
    f1: T1,
    f2: T2,
) -> And<T1, T2> {
    And {
        f1: FutureOrOutput::Future(f1),
        f2: FutureOrOutput::Future(f2),
    }
}

pub struct Or<T1: Future, T2: Future> {
    f1: T1,
    f2: T2,
}

impl<T1: Future, T2: Future> Unpin for Or<T1, T2> {}

impl<T1: Future, T2: Future> Future for Or<T1, T2> {
    type Output = (Option<T1::Output>, Option<T2::Output>);
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let pin = unsafe { Pin::new_unchecked(&mut self.f1) };
        if let Poll::Ready(r) = pin.poll(ctx) {
            return Poll::Ready((Some(r), None));
        }
        let pin = unsafe { Pin::new_unchecked(&mut self.f2) };
        if let Poll::Ready(r) = pin.poll(ctx) {
            return Poll::Ready((None, Some(r)));
        }
        Poll::Pending
    }
}

/// f1和f2中任何一个结束就返回.
///
/// 这里要注意另一个未结束的future已执行了部分功能,提前被强制终止对业务资源的生命周期管理的影响.
pub fn or<T1: Future, T2: Future>(
    f1: T1,
    f2: T2,
) -> Or<T1, T2> {
    Or { f1, f2 }
}

pub struct PartialOr<T1: Future, T2: Future> {
    is_first: bool,
    f1: FutureOrOutput<T1>,
    f2: FutureOrOutput<T2>,
}

impl<T1: Future, T2: Future> Unpin for PartialOr<T1, T2> {}

impl<T1: Future, T2: Future> Future for PartialOr<T1, T2> {
    type Output = (T1::Output, Option<T2::Output>);
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        if let FutureOrOutput::Future(ref mut f) = &mut self.f1 {
            let pin: Pin<&mut T1> = unsafe { Pin::new_unchecked(f) };
            if let Poll::Ready(r) = pin.poll(ctx) {
                if self.is_first {
                    return Poll::Ready((r, None));
                }
                self.f1 = FutureOrOutput::Output(Some(r));
            }
        }

        if self.is_first {
            self.is_first = false;
        }

        if let FutureOrOutput::Future(ref mut f) = &mut self.f2 {
            let pin: Pin<&mut T2> = unsafe { Pin::new_unchecked(f) };
            if let Poll::Ready(r) = pin.poll(ctx) {
                self.f2 = FutureOrOutput::Output(Some(r));
            }
        }

        let this: &mut Self = &mut self;
        if let (FutureOrOutput::Output(r1), FutureOrOutput::Output(r2)) =
            (&mut this.f1, &mut this.f2)
        {
            return Poll::Ready((r1.take().unwrap(), r2.take()));
        }
        Poll::Pending
    }
}

/// f2可能不被执行就返回.
///
/// 如果f1首次调度返回Poll::Ready, 则不会调度f2直接返回(T1::Output, None).
/// 否则一定在f1和f2都返回Poll::Ready后才结束, 返回(T1::Output, Some(T2::Output)).
/// 和or的区别在于,f1和f2要么从未被调度过,要么一定等待调度结束再返回,完全消除异步任务可能被强制终止的场景.
pub fn partial_or<T1: Future, T2: Future>(
    f1: T1,
    f2: T2,
) -> PartialOr<T1, T2> {
    PartialOr {
        is_first: true,
        f1: FutureOrOutput::Future(f1),
        f2: FutureOrOutput::Future(f2),
    }
}

/// 提供future的灵活组合能力.
pub trait Extension {
    /// 等同于hiasync::and.
    ///
    /// 等待self, other都结束后才返回.
    fn and<T>(self, other: T) -> And<Self, T>
    where
        Self: Future + Sized,
        T: Future + Sized,
    {
        and(self, other)
    }

    /// 等同于hiasync::or.
    ///
    /// self和other中任何一个结束就返回.
    /// 这里要注意另一个未结束的future已执行了部分功能,提前被强制终止对业务资源的生命周期管理的影响.
    fn or<T>(self, other: T) -> Or<Self, T>
    where
        Self: Future + Sized,
        T: Future + Sized,
    {
        or(self, other)
    }

    /// 等同于hiasync::partial_or.
    ///
    /// 如果self首次调度返回Poll::Ready, 则不会调度other直接返回(Self::Output, None).
    /// 否则一定在self和other都返回Poll::Ready后才结束返回(Self::Output, Some(T::Output)).
    /// 和or的区别在于,self和other要么从未被调度过,要么一定等待调度结束再返回,完全消除异步任务可能被强制终止的场景.
    fn partial_or<T>(self, other: T) -> PartialOr<Self, T>
    where
        Self: Future + Sized,
        T: Future + Sized,
    {
        partial_or(self, other)
    }
}

impl<T: Future + Sized> Extension for T {}