#![deny(warnings)]
use arc_swap::*;
use dummy_waker::*;
use futures::*;
use futures::channel::mpsc::*;
use gloo_timers::callback::*;
use once_cell::sync::*;
use std::collections::*;
use std::future::*;
use std::pin::*;
use std::sync::*;
use std::task::*;
use thiserror::*;
use web_sys::*;
/// Errors that can be returned while setting up the main executor.
#[derive(Clone, Debug, Error, PartialEq)]
pub enum ExecutorInitializationError {
/// The global task queue had already been set by an earlier initialization.
#[error("The executor was already started.")]
AlreadyStarted(),
/// `window()` returned `None` when the executor was being created.
#[error("The executor was not started on the main thread.")]
IncorrectThread(),
}
/// A `Send` closure that, once called, produces the boxed future to run.
/// Only the closure is required to be `Send`; the future it returns is not.
type SendableFuture = dyn FnOnce() -> Pin<Box<dyn Future<Output = ()>>> + Send;
/// Sending half of the executor's task queue. It is set once by `MainExecutor::new`
/// and read by `spawn` to hand new tasks over to the executor.
static TASK_QUEUE: OnceCell<UnboundedSender<Box<SendableFuture>>> = OnceCell::new();
/// Creates the main executor and schedules it to be polled from a repeating timer.
///
/// # Errors
/// Returns whatever [`ExecutorInitializationError`] `MainExecutor::new` reports;
/// in that case no timer is registered.
pub fn initialize() -> Result<(), ExecutorInitializationError> {
    MainExecutor::new().map(|mut executor| {
        // The interval handle is forgotten rather than stored, so nothing in
        // this file ever cancels the polling callback.
        Interval::new(0, move || executor.poll()).forget();
    })
}
pub fn spawn<F: 'static + IntoFuture + Send>(f: F) -> impl Future<Output = F::Output> + Send + Sync where F::Output: Send {
let sync = FutureSynchronization::default();
let result_ref = sync.sender.clone();
let waker_ref = sync.waker.clone();
TASK_QUEUE.get().expect("Main executor was not initialized.").unbounded_send(Box::new(move || Box::pin(async move {
drop(result_ref.unbounded_send(f.into_future().await));
if let Some(waker) = &*waker_ref.load() {
waker.wake_by_ref();
}
}))).expect("Could not spawn new task.");
sync
}
/// Executor state: the futures currently being driven plus the queue of newly spawned tasks.
struct MainExecutor {
/// Futures that have been started and have not yet completed.
futures: VecDeque<Pin<Box<dyn Future<Output = ()>>>>,
/// Receiving half of the task queue whose sender is stored in `TASK_QUEUE`.
receiver: UnboundedReceiver<Box<SendableFuture>>
}
impl MainExecutor {
    /// Creates the executor and publishes the sending half of its task queue in `TASK_QUEUE`.
    ///
    /// # Errors
    /// * `IncorrectThread` when `window()` returns `None`.
    /// * `AlreadyStarted` when `TASK_QUEUE` has already been set.
    pub fn new() -> Result<Self, ExecutorInitializationError> {
        if window().is_none() {
            return Err(ExecutorInitializationError::IncorrectThread());
        }

        let (task_tx, receiver) = unbounded();
        TASK_QUEUE
            .set(task_tx)
            .map_err(|_| ExecutorInitializationError::AlreadyStarted())?;

        Ok(Self { futures: VecDeque::new(), receiver })
    }

    /// Starts any newly queued tasks, then polls every held future once.
    ///
    /// Futures that are still pending are put back at the end of the queue;
    /// completed ones are dropped.
    pub fn poll(&mut self) {
        self.add_new_futures();

        let waker = dummy_waker();
        let mut context = Context::from_waker(&waker);

        // The range captures the length up front, so each future present at
        // this point is polled exactly once even though pending ones are re-queued.
        for _ in 0..self.futures.len() {
            let mut task = self.futures.pop_front().expect("Could not take future from futures queue.");
            if task.poll_unpin(&mut context).is_pending() {
                self.futures.push_back(task);
            }
        }
    }

    /// Moves every task currently available on the queue into `futures`,
    /// calling each closure to build its future.
    fn add_new_futures(&mut self) {
        // Stops as soon as `try_next` reports an error or `None`.
        while let Some(make_future) = self.receiver.try_next().ok().flatten() {
            self.futures.push_back(make_future());
        }
    }
}
/// State that connects a spawned task to the future returned by `spawn`.
struct FutureSynchronization<T> {
/// Sending half of the result channel; `spawn` clones it into the task, which sends its output through it.
pub sender: UnboundedSender<T>,
/// Receiving half of the result channel, checked each time this value is polled.
pub receiver: UnboundedReceiver<T>,
/// Waker recorded when this value is polled; the spawned task loads and wakes it after sending its output.
pub waker: Arc<ArcSwapOption<Waker>>
}
impl<T> Default for FutureSynchronization<T> {
fn default() -> Self {
let (sender, receiver) = unbounded();
Self { sender, receiver, waker: Default::default() }
}
}
impl<T> Future for FutureSynchronization<T> {
    type Output = T;

    /// Resolves with the spawned task's output once it has been sent through the result channel.
    ///
    /// The current waker is recorded before the channel is checked, so a task
    /// that sends its output after the check still finds a waker to wake.
    /// Returns `Poll::Pending` whenever no output is available yet.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Only replace the stored waker when it would not wake the current
        // task. This avoids allocating a new `Arc<Waker>` and swapping it in on
        // every poll, which matters because the executor re-polls pending
        // futures on each tick.
        let already_registered = matches!(
            &*self.waker.load(),
            Some(current) if current.will_wake(cx.waker())
        );
        if !already_registered {
            self.waker.store(Some(Arc::new(cx.waker().clone())));
        }

        match self.receiver.try_next() {
            Ok(Some(res)) => Poll::Ready(res),
            // Nothing has been sent yet: stay pending until woken.
            _ => Poll::Pending,
        }
    }
}