hiasync 0.1.2

Supports only a single-threaded asynchronous runtime.
Documentation
use crate::{JoinHandle, LocalWaker, Waker};
use std::any::Any;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

/// One-shot future behind [`spawn`]: on its first poll it passes the wrapped
/// future to `task_new` and completes with the value that call returns.
struct Spawn<T> {
    /// The future waiting to be handed over; left as `None` once it has been taken.
    future: Option<T>,
}

// `Spawn` never polls the wrapped future itself, it only moves it out of the
// `Option`, so no pinning guarantee is ever made about `T`.
impl<T> Unpin for Spawn<T> {}

impl<T: Future + 'static> Future for Spawn<T> {
    type Output = JoinHandle<T>;

    /// Always returns `Poll::Ready` on the first poll.
    ///
    /// # Panics
    ///
    /// Panics if polled again after the wrapped future has been taken.
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let local = LocalWaker::waker_mut(ctx);
        let task = self.future.take().unwrap();
        Poll::Ready(local.task_new(task))
    }
}

/// Creates a new asynchronous task that can be scheduled concurrently with
/// the current task.
///
/// The returned future resolves to the [`JoinHandle`] of the new task.
pub fn spawn<T: Future + 'static>(future: T) -> impl Future<Output = JoinHandle<T>> {
    Spawn {
        future: Some(future),
    }
}

/// Future behind [`sched_yield`]: pending on its first poll, ready afterwards.
struct Yield {
    /// `true` once the first poll has requested a wake-up and returned `Pending`.
    yielded: bool,
}

impl Future for Yield {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        if !self.yielded {
            // Request a wake-up before reporting `Pending` so this future is polled again.
            LocalWaker::waker(ctx).wake_by_ref();
            self.yielded = true;
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

/// Voluntarily gives up the current scheduling turn so other tasks can run.
pub fn sched_yield() -> impl Future<Output = ()> {
    Yield { yielded: false }
}

/// Future behind [`new_event_id`]; completes on its first poll.
struct EventId;

impl Future for EventId {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // The id is produced by the waker taken from the polling context.
        let fresh_id = LocalWaker::waker(ctx).new_event_id();
        Poll::Ready(fresh_id)
    }
}

/// Returns an `event_id` that is not repeated: every call yields a new one.
pub fn new_event_id() -> impl Future<Output = u64> {
    EventId
}

/// Future behind [`current_task_id`]; completes on its first poll.
struct CurrentTask;

impl Future for CurrentTask {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // The id is read from the waker taken from the polling context.
        let task_id = LocalWaker::waker(ctx).current_task_id();
        Poll::Ready(task_id)
    }
}

/// Returns the id of the current asynchronous task.
pub fn current_task_id() -> impl Future<Output = u64> {
    CurrentTask
}

/// Future that waits for the event identified by `event_id` and resolves to
/// its data viewed as a `T`.
struct WaitEvent<'a, T: Any> {
    // Id of the event this future waits for.
    event_id: u64,
    // Waker handle stored after a pending poll so that `Drop` can call
    // `event_unregister`; `None` before the first pending poll and after completion.
    waker: Option<Waker>,
    // Carries `'a` and `T` for the `Option<&'a T>` output without storing a reference.
    mark: PhantomData<&'a T>,
}

impl<'a, T: Any> Future for WaitEvent<'a, T> {
    type Output = Option<&'a T>;
    /// Completes once `event_remove` yields data for `event_id`; otherwise
    /// registers the id via `event_register` and stays pending.
    ///
    /// The output is `None` when the event data is not a `T`.
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let waker = LocalWaker::waker(ctx);
        if let Some(data) = waker.event_remove(self.event_id) {
            // Clearing the stored handle makes `Drop` skip `event_unregister`.
            self.waker = None;
            // `downcast_ref` yields `None` if the payload has a different type.
            // NOTE(review): the `'a` of the returned reference is chosen by the caller of
            // `wait_event`; what keeps the data alive for `'a` is not visible here — confirm.
            return Poll::Ready(data.downcast_ref::<T>());
        }
        // No data yet: (re-)register the id on every pending poll.
        waker.event_register(self.event_id);
        // Keep a handle only once; `Drop` uses it to unregister if this future is
        // dropped before the event arrives.
        if self.waker.is_none() {
            self.waker = Some(waker.clone());
        }
        Poll::Pending
    }
}

impl<T: Any> Drop for WaitEvent<'_, T> {
    /// Unregisters the event if this future is dropped while a waker handle is stored.
    fn drop(&mut self) {
        // A `wait_event` future can end early when it is combined with `or`; in that
        // case `event_unregister` has to be called to avoid leaking resources.
        if let Some(registered) = self.waker.as_ref() {
            registered.event_unregister(self.event_id);
        }
    }
}

/// 等待外部事件通知获取事件数据.
///
/// 调用者保证事件ID不重复,每个事件到来时只会唤醒最后一次调用wait_event的任务.
/// 如果事件数据非期待的类型,返回None.
pub fn wait_event<'a, T: Any>(event_id: u64) -> impl Future<Output = Option<&'a T>> {
    WaitEvent::<T> {
        event_id,
        waker: None,
        mark: PhantomData,
    }
}

/// Slot holding either a future that is still running or the output it produced.
enum FutureOrOutput<T: Future> {
    /// The future has not completed yet.
    Future(T),
    /// The future has completed; the output stays here until `And` hands it out.
    Output(Option<T::Output>),
}

/// Future behind [`and`]: drives two futures and completes once both are done.
struct And<T1: Future, T2: Future> {
    f1: FutureOrOutput<T1>,
    f2: FutureOrOutput<T2>,
}

// `And` polls its inner futures in place, i.e. it pins them structurally, so it
// may only be `Unpin` when both of them are. Declaring it `Unpin` for every
// `T1`/`T2` would let safe code move an already-polled `!Unpin` future.
// The stored outputs are never pinned and therefore need no bound.
impl<T1: Future + Unpin, T2: Future + Unpin> Unpin for And<T1, T2> {}

impl<T1: Future, T2: Future> Future for And<T1, T2> {
    type Output = (T1::Output, T2::Output);

    /// Polls every future that is still running and returns both outputs as
    /// soon as the second one has finished.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `this` is only used to poll the inner futures in place (they are
        // re-pinned below) and to overwrite a finished future, which drops it in
        // place. Neither future is ever moved out of `self`.
        let this = unsafe { self.get_unchecked_mut() };

        if let FutureOrOutput::Future(f) = &mut this.f1 {
            // SAFETY: `f` lives inside the pinned `self` and is never moved.
            let pin: Pin<&mut T1> = unsafe { Pin::new_unchecked(f) };
            if let Poll::Ready(r) = pin.poll(ctx) {
                this.f1 = FutureOrOutput::Output(Some(r));
            }
        }
        if let FutureOrOutput::Future(f) = &mut this.f2 {
            // SAFETY: `f` lives inside the pinned `self` and is never moved.
            let pin: Pin<&mut T2> = unsafe { Pin::new_unchecked(f) };
            if let Poll::Ready(r) = pin.poll(ctx) {
                this.f2 = FutureOrOutput::Output(Some(r));
            }
        }

        match (&mut this.f1, &mut this.f2) {
            (FutureOrOutput::Output(r1), FutureOrOutput::Output(r2)) => Poll::Ready((
                r1.take().expect("`And` polled after completion"),
                r2.take().expect("`And` polled after completion"),
            )),
            _ => Poll::Pending,
        }
    }
}

/// Waits until both `f1` and `f2` have finished, then returns both outputs.
pub fn and<T1: Future, T2: Future>(f1: T1, f2: T2) -> impl Future<Output = (T1::Output, T2::Output)> {
    And { f1: FutureOrOutput::Future(f1), f2: FutureOrOutput::Future(f2) }
}

/// Future behind [`or`]: drives two futures and completes with the first one
/// that finishes.
///
/// There is deliberately no blanket `Unpin` impl: `Or` polls its inner futures
/// in place, so it is `Unpin` only when both of them are (the auto trait).
/// Declaring it `Unpin` for every `T1`/`T2` would let safe code move an
/// already-polled `!Unpin` future.
struct Or<T1: Future, T2: Future> {
    f1: T1,
    f2: T2,
}

impl<T1: Future, T2: Future> Future for Or<T1, T2> {
    type Output = (Option<T1::Output>, Option<T2::Output>);

    /// Polls `f1` first and `f2` second, returning the output of the first one
    /// found ready; `f1` wins when both are ready in the same poll.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `this` is only used to poll the inner futures in place (they are
        // re-pinned below); neither future is ever moved out of `self`.
        let this = unsafe { self.get_unchecked_mut() };

        // SAFETY: `f1` lives inside the pinned `self` and is never moved.
        let first = unsafe { Pin::new_unchecked(&mut this.f1) };
        if let Poll::Ready(r) = first.poll(ctx) {
            return Poll::Ready((Some(r), None));
        }
        // SAFETY: `f2` lives inside the pinned `self` and is never moved.
        let second = unsafe { Pin::new_unchecked(&mut this.f2) };
        if let Poll::Ready(r) = second.poll(ctx) {
            return Poll::Ready((None, Some(r)));
        }
        Poll::Pending
    }
}

/// Returns as soon as either `f1` or `f2` has finished.
///
/// Note that the other, unfinished future may already have run partially;
/// consider how its forced early termination affects the lifecycle
/// management of business resources.
pub fn or<T1: Future, T2: Future>(f1: T1, f2: T2) -> impl Future<Output = (Option<T1::Output>, Option<T2::Output>)> {
    Or { f1, f2 }
}

/// Provides flexible combination of futures through method syntax.
pub trait Extension {
    /// Same as `hiasync::and`: returns only after both `self` and `other`
    /// have finished.
    fn and<T>(self, other: T) -> impl Future<Output = (Self::Output, T::Output)>
    where
        Self: Future + Sized,
        T: Future + Sized,
    {
        and(self, other)
    }

    /// Same as `hiasync::or`: returns as soon as either `self` or `other`
    /// has finished.
    ///
    /// Note that the other, unfinished future may already have run partially;
    /// consider how its forced early termination affects the lifecycle
    /// management of business resources.
    fn or<T>(self, other: T) -> impl Future<Output = (Option<Self::Output>, Option<T::Output>)>
    where
        Self: Future + Sized,
        T: Future + Sized,
    {
        or(self, other)
    }
}

// Blanket implementation: every sized future gets the combinator methods.
impl<T: Future + Sized> Extension for T {}