use std::fmt;
use std::pin::Pin;
use std::cell::{Cell, RefCell};
use std::sync::Arc;
use std::future::Future;
use std::task::{Poll, Context};
use std::collections::VecDeque;
use futures_util::task::ArcWake;
use futures_util::future::FutureExt;
use futures_channel::oneshot;
use lazy_static::lazy_static;
use js_sys::Promise;
use wasm_bindgen::prelude::*;
/// A Rust `Future` whose outcome is delivered by a JavaScript `Promise`.
///
/// Instances are created through `From<Promise>`. Polling yields `Ok(value)`
/// when the resolve callback fired and `Err(value)` when the reject callback
/// fired.
pub struct JsFuture {
    /// Receives the value passed to the promise's resolve callback.
    resolved: oneshot::Receiver<JsValue>,
    /// Receives the value passed to the promise's reject callback.
    rejected: oneshot::Receiver<JsValue>,
    /// Resolve callback handed to the promise; never read, only stored so the
    /// closure lives as long as this future does.
    _cb_resolve: Closure<dyn FnMut(JsValue)>,
    /// Reject callback handed to the promise; never read, only stored so the
    /// closure lives as long as this future does.
    _cb_reject: Closure<dyn FnMut(JsValue)>,
}
/// Opaque `Debug` output: the channel and closure fields are not printed.
impl fmt::Debug for JsFuture {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        // Fixed text only; nothing is read from `self`.
        formatter.write_str("JsFuture { ... }")
    }
}
impl From<Promise> for JsFuture {
fn from(js: Promise) -> JsFuture {
let (tx1, rx1) = oneshot::channel();
let cb_resolve = Closure::once(move |val| {
tx1.send(val).unwrap_throw();
});
let (tx2, rx2) = oneshot::channel();
let cb_reject = Closure::once(move |val| {
tx2.send(val).unwrap_throw();
});
js.then2(&cb_resolve, &cb_reject);
JsFuture {
resolved: rx1,
rejected: rx2,
_cb_resolve: cb_resolve,
_cb_reject: cb_reject,
}
}
}
impl Future for JsFuture {
    type Output = Result<JsValue, JsValue>;

    /// Polls the resolve channel first, then the reject channel.
    ///
    /// Returns `Ready(Ok(_))` for a resolved value, `Ready(Err(_))` for a
    /// rejected one, and `Pending` when neither channel has a value yet.
    /// A channel that reports cancellation makes `unwrap_throw` throw.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match self.resolved.poll_unpin(cx) {
            Poll::Ready(value) => return Poll::Ready(Ok(value.unwrap_throw())),
            Poll::Pending => {}
        }

        match self.rejected.poll_unpin(cx) {
            Poll::Ready(reason) => Poll::Ready(Err(reason.unwrap_throw())),
            Poll::Pending => Poll::Pending,
        }
    }
}
pub fn future_to_promise<F>(future: F) -> Promise
where
F: Future<Output = Result<JsValue, JsValue>> + 'static,
{
let mut future = Some(future);
Promise::new(&mut |resolve, reject| {
spawn_local(future.take().unwrap_throw().map(move |val| {
match val {
Ok(val) => {
resolve.call1(&JsValue::undefined(), &val).unwrap_throw();
},
Err(val) => {
reject.call1(&JsValue::undefined(), &val).unwrap_throw();
},
}
}));
})
}
/// Spawns `future` onto the file-local executor and returns immediately.
///
/// The future is boxed into a `Task`, pushed onto a global FIFO queue, and a
/// `then` callback on an already-resolved `Promise` is scheduled to drain that
/// queue. The future is therefore not polled synchronously inside this call;
/// it is first polled when that callback runs.
///
/// Wake-ups that arrive after the future has already completed are ignored.
pub fn spawn_local<F>(future: F)
where
    F: Future<Output = ()> + 'static,
{
    /// A spawned future plus the bookkeeping needed to queue it.
    struct Task {
        /// The future being driven; replaced by `None` once it has completed.
        future: RefCell<Option<Pin<Box<dyn Future<Output = ()> + 'static>>>>,
        /// `true` while the task sits in the executor queue, so that repeated
        /// wake-ups enqueue it only once.
        is_queued: Cell<bool>,
    }

    impl Task {
        #[inline]
        fn new<F>(future: F) -> Arc<Self>
        where
            F: Future<Output = ()> + 'static,
        {
            Arc::new(Self {
                future: RefCell::new(Some(Box::pin(future))),
                is_queued: Cell::new(false),
            })
        }
    }

    impl ArcWake for Task {
        /// Enqueues the task (unless it is already queued) and makes sure a
        /// drain of the queue is scheduled.
        fn wake_by_ref(arc_self: &Arc<Self>) {
            if arc_self.is_queued.replace(true) {
                return;
            }

            let mut lock = EXECUTOR.tasks.borrow_mut();
            lock.push_back(arc_self.clone());

            EXECUTOR.next_tick.schedule();
        }
    }

    /// Schedules a callback through `Promise::then`, at most once at a time.
    struct NextTick {
        /// `true` from `schedule` until the callback reports `done`.
        is_spinning: Cell<bool>,
        /// An already-resolved promise used purely to schedule `closure`.
        promise: Promise,
        /// The callback passed to `promise.then`.
        closure: Closure<dyn FnMut(JsValue)>,
    }

    impl NextTick {
        #[inline]
        fn new<F>(mut f: F) -> Self
        where
            F: FnMut() + 'static,
        {
            Self {
                is_spinning: Cell::new(false),
                promise: Promise::resolve(&JsValue::null()),
                closure: Closure::wrap(Box::new(move |_| {
                    f();
                })),
            }
        }

        /// Registers the callback unless one is already pending or running.
        fn schedule(&self) {
            if self.is_spinning.replace(true) {
                return;
            }

            self.promise.then(&self.closure);
        }

        /// Marks the current run as finished so `schedule` can register again.
        fn done(&self) {
            self.is_spinning.set(false);
        }
    }

    /// Global run queue together with its scheduling hook.
    struct Executor {
        tasks: RefCell<VecDeque<Arc<Task>>>,
        next_tick: NextTick,
    }

    // NOTE(review): `Executor` holds `RefCell`/`Cell` state and JS handles, so
    // these impls are only sound if it is never touched from more than one
    // thread (presumably the single-threaded wasm target) -- confirm.
    // They exist because `lazy_static!` requires the static to be `Sync`.
    unsafe impl Send for Executor {}
    unsafe impl Sync for Executor {}

    lazy_static! {
        static ref EXECUTOR: Executor = Executor {
            tasks: RefCell::new(VecDeque::new()),

            // Drains the queue, polling each task once per time it was queued.
            next_tick: NextTick::new(|| {
                let tasks = &EXECUTOR.tasks;

                loop {
                    let mut lock = tasks.borrow_mut();

                    match lock.pop_front() {
                        Some(task) => {
                            // Clear the flag and release the queue before
                            // polling, so that wake-ups issued during `poll`
                            // (including self-wakes) can enqueue tasks.
                            task.is_queued.set(false);
                            drop(lock);

                            let mut slot = task.future.borrow_mut();

                            let poll = match slot.as_mut() {
                                Some(future) => {
                                    let waker = ArcWake::into_waker(task.clone());
                                    let cx = &mut Context::from_waker(&waker);
                                    future.as_mut().poll(cx)
                                }
                                // A waker fired after the future finished and
                                // was dropped; there is nothing left to poll.
                                None => continue,
                            };

                            if poll.is_ready() {
                                // Drop the finished future right away.
                                *slot = None;
                            }
                        }
                        None => {
                            // Queue is empty: allow the next wake-up to
                            // schedule a fresh drain.
                            EXECUTOR.next_tick.done();
                            break;
                        }
                    }
                }
            }),
        };
    }

    // Waking a fresh task enqueues it and schedules the first poll.
    ArcWake::wake_by_ref(&Task::new(future));
}