1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
//! Configure a global executor you can reuse everywhere

#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![no_std]
extern crate alloc;

use alloc::boxed::Box;
use async_channel::Receiver;
use core::{
    fmt,
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};
use executor_trait::Executor;
use once_cell::sync::OnceCell;

static EXECUTOR: OnceCell<Box<dyn Executor + Send + Sync>> = OnceCell::new();

pub fn init(executor: impl Executor + Send + Sync + 'static) {
    EXECUTOR.set(Box::new(executor)).map_err(|_| ()).unwrap();
}

pub fn block_on<T: 'static>(future: impl Future<Output = T> + 'static) -> T {
    let (send, recv) = async_channel::bounded(1);
    EXECUTOR.get().unwrap().block_on(Box::pin(async move {
        drop(send.send(future.await).await);
    }));
    recv.try_recv().unwrap()
}

pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
    let (send, recv) = async_channel::bounded(1);
    let inner = EXECUTOR.get().unwrap().spawn(Box::pin(async move {
        drop(send.send(future.await).await);
    }));
    Task {
        inner,
        recv: recv.into(),
    }
}

pub fn spawn_local<T: 'static>(future: impl Future<Output = T> + 'static) -> Task<T> {
    let (send, recv) = async_channel::bounded(1);
    let inner = EXECUTOR.get().unwrap().spawn_local(Box::pin(async move {
        drop(send.send(future.await).await);
    }));
    Task {
        inner,
        recv: recv.into(),
    }
}

pub async fn spawn_blocking<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> T {
    let (send, recv) = async_channel::bounded(1);
    EXECUTOR
        .get()
        .unwrap()
        .spawn_blocking(Box::new(|| {
            let res = f();
            crate::spawn(async move {
                drop(send.send(res).await);
            })
            .detach();
        }))
        .await;
    recv.recv().await.unwrap()
}

pub struct Task<T> {
    inner: Box<dyn executor_trait::Task>,
    recv: ReceiverWrapper<T>,
}

impl<T: 'static> Task<T> {
    pub fn detach(self) {
        self.inner.detach();
    }

    pub async fn cancel(self) -> Option<T> {
        self.inner.cancel().await?;
        Some(self.recv.await)
    }
}

impl<T> fmt::Debug for Task<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Task").finish()
    }
}

impl<T: 'static> Future for Task<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        Pin::new(&mut self.recv).poll(cx)
    }
}

struct ReceiverWrapper<T> {
    recv: Receiver<T>,
    recv_fut: Option<Pin<Box<dyn Future<Output = T>>>>,
}

impl<T: 'static> Future for ReceiverWrapper<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        if self.recv_fut.is_none() {
            let recv = self.recv.clone();
            self.recv_fut = Some(Box::pin(async move { recv.recv().await.unwrap() }));
        }
        match self.recv_fut.as_mut().unwrap().as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(t) => {
                self.recv_fut = None;
                Poll::Ready(t)
            }
        }
    }
}

impl<T> From<Receiver<T>> for ReceiverWrapper<T> {
    fn from(recv: Receiver<T>) -> Self {
        Self {
            recv,
            recv_fut: None,
        }
    }
}