// hiasync 0.1.4
//
// Supports only a single-threaded asynchronous runtime.
// Documentation
use crate::{JoinHandle, LocalWaker, Waker};
use std::any::Any;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Future returned by `spawn`.
///
/// When polled it moves the wrapped future into `LocalWaker::task_new` and resolves
/// immediately to the `JoinHandle` that call returns.
struct Spawn<T> {
    /// The future waiting to be submitted; `None` once it has been taken by `poll`.
    future: Option<T>,
}

// `Spawn` never polls the wrapped future in place; it only moves it out by value,
// so it does not depend on its own address staying stable.
impl<T> Unpin for Spawn<T> {}

impl<T> Future for Spawn<T>
where
    T: Future + 'static,
{
    type Output = JoinHandle<T>;

    /// Submits the wrapped future as a new task and returns its handle.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`, because the
    /// wrapped future has been taken by then.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let local = LocalWaker::waker_mut(ctx);
        let submitted = this.future.take().unwrap();
        Poll::Ready(local.task_new(submitted))
    }
}

/// Creates a new asynchronous task that can be scheduled concurrently with the current one.
///
/// The task is only created when the returned future is polled; awaiting it yields the
/// `JoinHandle` of the new task.
pub fn spawn<T>(future: T) -> impl Future<Output = JoinHandle<T>>
where
    T: Future + 'static,
    T::Output: 'static,
{
    let slot = Some(future);
    Spawn { future: slot }
}

/// Future that returns `Poll::Pending` exactly once and is ready on the next poll.
///
/// The flag records whether the single yield has already happened.
struct Yield(bool);

impl Future for Yield {
    type Output = ();

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let Yield(yielded) = self.get_mut();
        if *yielded {
            Poll::Ready(())
        } else {
            // Request a wake-up before suspending so that this future gets polled again.
            LocalWaker::waker(ctx).wake_by_ref();
            *yielded = true;
            Poll::Pending
        }
    }
}

/// Voluntarily gives up the current scheduling turn so that other tasks can run.
///
/// The returned future is pending on its first poll (after requesting a wake-up) and
/// ready on the next one.
pub fn sched_yield() -> impl Future<Output = ()> {
    Yield(false)
}

/// Zero-sized future that resolves on its first poll to the value returned by
/// `new_event_id()` on the `LocalWaker` of the polling context.
struct EventId;

impl Future for EventId {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<u64> {
        let id = LocalWaker::waker(ctx).new_event_id();
        Poll::Ready(id)
    }
}

/// Returns a future that resolves to a fresh event id.
///
/// Every call is documented to yield an id that has not been handed out before; that
/// guarantee comes from `LocalWaker::new_event_id`, which is not shown here.
pub fn new_event_id() -> impl Future<Output = u64> {
    EventId
}

/// Zero-sized future that resolves on its first poll to the value returned by
/// `task_id()` on the `LocalWaker` of the polling context.
struct CurrentTask;

impl Future for CurrentTask {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<u64> {
        let id = LocalWaker::waker(ctx).task_id();
        Poll::Ready(id)
    }
}

/// Returns a future that resolves to the id of the current asynchronous task.
pub fn current_task_id() -> impl Future<Output = u64> {
    CurrentTask
}

/// Future returned by `wait_event`: waits until data for `event_id` can be removed from
/// the `LocalWaker` and yields it downcast to `T`.
struct WaitEvent<'a, T: Any> {
    /// Identifier of the event being waited for.
    event_id: u64,
    /// Waker captured on the first pending poll; kept so `Drop` can unregister the event.
    /// Cleared once the event data has been received.
    waker: Option<Waker>,
    /// Ties the output reference's lifetime and type to this future without storing a `T`.
    mark: PhantomData<&'a T>,
}

impl<'a, T: Any> Future for WaitEvent<'a, T> {
    type Output = Option<&'a T>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let local = LocalWaker::waker(ctx);

        // Event data already available: consume it and finish.
        if let Some(payload) = local.event_remove(this.event_id) {
            // Nothing is left registered, so `Drop` must not unregister.
            this.waker = None;
            // A payload of a different type than `T` yields `None`.
            return Poll::Ready(payload.downcast_ref::<T>());
        }

        // No data yet: (re-)register interest and remember the waker for cleanup.
        local.event_register(this.event_id);
        if this.waker.is_none() {
            this.waker = Some(local.clone());
        }
        Poll::Pending
    }
}

impl<T: Any> Drop for WaitEvent<'_, T> {
    fn drop(&mut self) {
        // A `wait_event` future may be dropped early when combined with `or`; in that case
        // `event_unregister` has to be called to avoid leaking the registration.
        if let Some(saved) = self.waker.as_ref() {
            saved.event_unregister(self.event_id);
        }
    }
}

/// Waits for an external event notification and yields the event data.
///
/// The caller must ensure that event ids are not duplicated; when an event arrives, only
/// the task that most recently called `wait_event` for it is woken.
/// Returns `None` if the event data is not of the expected type `T`.
pub fn wait_event<'a, T: Any>(event_id: u64) -> impl Future<Output = Option<&'a T>> {
    WaitEvent::<'a, T> {
        mark: PhantomData,
        waker: None,
        event_id,
    }
}

/// Slot holding either a future that is still running or the output it produced.
///
/// The output is wrapped in an `Option` so it can be moved out with `take()` when the
/// combined future completes.
enum FutureOrOutput<T: Future> {
    /// The future has not completed yet. It is polled in place and must not be moved
    /// once polling has started.
    Future(T),
    /// The future completed; holds its output until it is handed to the caller.
    Output(Option<T::Output>),
}

/// Future behind `and`: resolves once both wrapped futures have completed.
struct And<T1: Future, T2: Future> {
    f1: FutureOrOutput<T1>,
    f2: FutureOrOutput<T2>,
}

// Deliberately no blanket `impl Unpin for And`: the wrapped futures are polled in place via
// `Pin::new_unchecked`, so `And` may only be `Unpin` when its fields are. An unconditional
// `Unpin` would let safe code move `And` (and the futures inside it) between polls, which
// violates the pinning guarantee of self-referential futures such as `async` blocks.

impl<T1: Future, T2: Future> Future for And<T1, T2> {
    type Output = (T1::Output, T2::Output);

    /// Polls every still-running future once and completes when both outputs are stored.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `this` is only used to poll the wrapped futures in place, to overwrite a
        // finished future (which drops it in place) and to take already-produced outputs.
        // A value in a `Future` variant is never moved.
        let this = unsafe { self.get_unchecked_mut() };

        if let FutureOrOutput::Future(f) = &mut this.f1 {
            // SAFETY: `f` lives inside the pinned `Self` and is never moved (see above).
            let pin: Pin<&mut T1> = unsafe { Pin::new_unchecked(f) };
            if let Poll::Ready(r) = pin.poll(ctx) {
                this.f1 = FutureOrOutput::Output(Some(r));
            }
        }
        if let FutureOrOutput::Future(f) = &mut this.f2 {
            // SAFETY: `f` lives inside the pinned `Self` and is never moved (see above).
            let pin: Pin<&mut T2> = unsafe { Pin::new_unchecked(f) };
            if let Poll::Ready(r) = pin.poll(ctx) {
                this.f2 = FutureOrOutput::Output(Some(r));
            }
        }

        if let (FutureOrOutput::Output(r1), FutureOrOutput::Output(r2)) =
            (&mut this.f1, &mut this.f2)
        {
            return Poll::Ready((
                r1.take().expect("`And` polled after completion"),
                r2.take().expect("`And` polled after completion"),
            ));
        }
        Poll::Pending
    }
}

/// Waits until both `f1` and `f2` have finished, then returns both outputs as a tuple.
pub fn and<T1, T2>(f1: T1, f2: T2) -> impl Future<Output = (T1::Output, T2::Output)>
where
    T1: Future,
    T2: Future,
{
    let first = FutureOrOutput::Future(f1);
    let second = FutureOrOutput::Future(f2);
    And {
        f1: first,
        f2: second,
    }
}

/// Future behind `or`: resolves as soon as either wrapped future is ready.
///
/// `f1` is polled before `f2` on every poll, so `f1` wins when both are ready.
struct Or<T1: Future, T2: Future> {
    f1: T1,
    f2: T2,
}

// Deliberately no blanket `impl Unpin for Or`: the wrapped futures are polled in place via
// `Pin::new_unchecked`, so `Or` may only be `Unpin` when both of them are. An unconditional
// `Unpin` would let safe code move `Or` (and the futures inside it) between polls, which
// violates the pinning guarantee of self-referential futures such as `async` blocks.

impl<T1: Future, T2: Future> Future for Or<T1, T2> {
    type Output = (Option<T1::Output>, Option<T2::Output>);

    /// Returns `(Some(_), None)` if `f1` is ready, otherwise `(None, Some(_))` if `f2` is
    /// ready, otherwise `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `this` is only used to re-pin the two fields in place; neither field is
        // ever moved out of or replaced.
        let this = unsafe { self.get_unchecked_mut() };

        // SAFETY: `f1` lives inside the pinned `Self` and is never moved (see above).
        let first = unsafe { Pin::new_unchecked(&mut this.f1) };
        if let Poll::Ready(r) = first.poll(ctx) {
            return Poll::Ready((Some(r), None));
        }

        // SAFETY: `f2` lives inside the pinned `Self` and is never moved (see above).
        let second = unsafe { Pin::new_unchecked(&mut this.f2) };
        if let Poll::Ready(r) = second.poll(ctx) {
            return Poll::Ready((None, Some(r)));
        }
        Poll::Pending
    }
}

/// Returns as soon as either `f1` or `f2` finishes.
///
/// Be aware that the future which did not finish may already have executed part of its
/// work; dropping it early can affect how the application manages the lifetime of its
/// resources.
pub fn or<T1, T2>(f1: T1, f2: T2) -> impl Future<Output = (Option<T1::Output>, Option<T2::Output>)>
where
    T1: Future,
    T2: Future,
{
    Or { f1, f2 }
}

/// Future behind `partial_or`.
///
/// If `f1` is ready on the very first poll, `f2` is never polled and the result is
/// `(output1, None)`. Otherwise both futures are driven to completion and the result is
/// `(output1, Some(output2))`.
struct PartialOr<T1: Future, T2: Future> {
    /// `true` until the first poll of `f1` has been performed.
    is_first: bool,
    f1: FutureOrOutput<T1>,
    f2: FutureOrOutput<T2>,
}

// Deliberately no blanket `impl Unpin for PartialOr`: the wrapped futures are polled in
// place via `Pin::new_unchecked`, so this type may only be `Unpin` when its fields are. An
// unconditional `Unpin` would let safe code move it (and the futures inside it) between
// polls, which violates the pinning guarantee of self-referential futures such as `async`
// blocks.

impl<T1: Future, T2: Future> Future for PartialOr<T1, T2> {
    type Output = (T1::Output, Option<T2::Output>);

    /// Polls `f1`, then (unless `f1` finished on the first poll) `f2`, and completes when
    /// both outputs are stored.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has returned `Poll::Ready` with both outputs.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `this` is only used to poll the wrapped futures in place, to overwrite a
        // finished future (which drops it in place) and to take already-produced outputs.
        // A value in a `Future` variant is never moved.
        let this = unsafe { self.get_unchecked_mut() };

        if let FutureOrOutput::Future(f) = &mut this.f1 {
            // SAFETY: `f` lives inside the pinned `Self` and is never moved (see above).
            let pin: Pin<&mut T1> = unsafe { Pin::new_unchecked(f) };
            if let Poll::Ready(r) = pin.poll(ctx) {
                if this.is_first {
                    // `f1` finished before `f2` was ever polled: skip `f2` entirely.
                    return Poll::Ready((r, None));
                }
                this.f1 = FutureOrOutput::Output(Some(r));
            }
        }

        // From here on `f2` gets polled, so it must be driven to completion.
        this.is_first = false;

        if let FutureOrOutput::Future(f) = &mut this.f2 {
            // SAFETY: `f` lives inside the pinned `Self` and is never moved (see above).
            let pin: Pin<&mut T2> = unsafe { Pin::new_unchecked(f) };
            if let Poll::Ready(r) = pin.poll(ctx) {
                this.f2 = FutureOrOutput::Output(Some(r));
            }
        }

        if let (FutureOrOutput::Output(r1), FutureOrOutput::Output(r2)) =
            (&mut this.f1, &mut this.f2)
        {
            return Poll::Ready((
                r1.take().expect("`PartialOr` polled after completion"),
                r2.take(),
            ));
        }
        Poll::Pending
    }
}

/// Like `or`, except that `f2` may never be run at all.
///
/// If `f1` returns `Poll::Ready` the first time it is polled, `f2` is not polled and the
/// result is `(T1::Output, None)`.
/// Otherwise the returned future completes only after both `f1` and `f2` have returned
/// `Poll::Ready`, yielding `(T1::Output, Some(T2::Output))`.
/// The difference from `or` is that `f1` and `f2` are either never polled or always driven
/// to completion, which removes the case of an asynchronous task being forcibly terminated
/// halfway.
pub fn partial_or<T1, T2>(f1: T1, f2: T2) -> impl Future<Output = (T1::Output, Option<T2::Output>)>
where
    T1: Future,
    T2: Future,
{
    let first = FutureOrOutput::Future(f1);
    let second = FutureOrOutput::Future(f2);
    PartialOr {
        is_first: true,
        f1: first,
        f2: second,
    }
}

/// Method-style combinators for composing futures flexibly.
pub trait Extension {
    /// Equivalent to `hiasync::and`.
    ///
    /// Completes only after both `self` and `other` have finished.
    fn and<F>(self, other: F) -> impl Future<Output = (Self::Output, F::Output)>
    where
        Self: Sized + Future,
        F: Future,
    {
        and(self, other)
    }

    /// Equivalent to `hiasync::or`.
    ///
    /// Completes as soon as either `self` or `other` finishes.
    /// Be aware that the future which did not finish may already have executed part of its
    /// work; dropping it early can affect how the application manages the lifetime of its
    /// resources.
    fn or<F>(self, other: F) -> impl Future<Output = (Option<Self::Output>, Option<F::Output>)>
    where
        Self: Sized + Future,
        F: Future,
    {
        or(self, other)
    }

    /// Equivalent to `hiasync::partial_or`.
    ///
    /// If `self` returns `Poll::Ready` the first time it is polled, `other` is not polled
    /// and the result is `(Self::Output, None)`.
    /// Otherwise the returned future completes only after both `self` and `other` have
    /// returned `Poll::Ready`, yielding `(Self::Output, Some(F::Output))`.
    /// The difference from `or` is that `self` and `other` are either never polled or
    /// always driven to completion, which removes the case of an asynchronous task being
    /// forcibly terminated halfway.
    fn partial_or<F>(self, other: F) -> impl Future<Output = (Self::Output, Option<F::Output>)>
    where
        Self: Sized + Future,
        F: Future,
    {
        partial_or(self, other)
    }
}

// Blanket implementation: every sized future gets the `Extension` combinator methods.
impl<F> Extension for F where F: Future {}