#![deny(missing_docs)]
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::sync::Arc;
use futures::executor::{self, Notify, Spawn};
use futures::future;
use futures::prelude::*;
use futures::sync::oneshot;
use js_sys::{Function, Promise};
use wasm_bindgen::prelude::*;
/// A Rust `Future` backed by a JavaScript `Promise`.
///
/// Values of this type are created with the `From<Promise>` conversion. The
/// future completes with `Ok(value)` when the promise's fulfillment callback
/// fires and with `Err(value)` when its rejection callback fires.
pub struct JsFuture {
    // Receives the value passed to the promise's fulfillment callback.
    resolved: oneshot::Receiver<JsValue>,
    // Receives the value passed to the promise's rejection callback.
    rejected: oneshot::Receiver<JsValue>,
    // The (fulfillment, rejection) callbacks registered on the promise. They
    // are kept here so they stay alive while the future is pending, and are
    // dropped (set to `None`) as soon as the future produces its result.
    callbacks: Option<(Closure<dyn FnMut(JsValue)>, Closure<dyn FnMut(JsValue)>)>,
}
impl From<Promise> for JsFuture {
fn from(js: Promise) -> JsFuture {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let mut tx1 = Some(tx1);
let resolve = Closure::wrap(Box::new(move |val| {
drop(tx1.take().unwrap().send(val));
}) as Box<FnMut(_)>);
let mut tx2 = Some(tx2);
let reject = Closure::wrap(Box::new(move |val| {
drop(tx2.take().unwrap().send(val));
}) as Box<FnMut(_)>);
js.then2(&resolve, &reject);
JsFuture {
resolved: rx1,
rejected: rx2,
callbacks: Some((resolve, reject)),
}
}
}
impl Future for JsFuture {
    type Item = JsValue;
    type Error = JsValue;

    /// Checks whether the underlying promise has settled.
    ///
    /// The fulfillment channel is polled first and the rejection channel
    /// second. Anything other than a ready value from a channel (not ready,
    /// or a channel error) is treated as "nothing yet". Once a value arrives,
    /// the stored callbacks are dropped before the result is returned.
    fn poll(&mut self) -> Poll<JsValue, JsValue> {
        let outcome = match self.resolved.poll() {
            Ok(Async::Ready(value)) => Ok(Async::Ready(value)),
            _ => match self.rejected.poll() {
                Ok(Async::Ready(reason)) => Err(reason),
                // Neither side has delivered a value.
                _ => return Ok(Async::NotReady),
            },
        };

        // The promise has settled, so the JS callbacks are no longer needed.
        self.callbacks = None;
        outcome
    }
}
/// Converts a Rust `Future` into a JavaScript `Promise`.
///
/// The future is polled for the first time while the promise is being
/// constructed. When it completes with `Ok(value)` the promise's `resolve`
/// function is called with `value`; when it completes with `Err(value)` the
/// promise's `reject` function is called with `value`.
///
/// The future must be `'static` because it is boxed and stored until it
/// finishes.
pub fn future_to_promise<F>(future: F) -> Promise
where
    F: Future<Item = JsValue, Error = JsValue> + 'static,
{
    // Box the future so the actual driver is a single non-generic function.
    _future_to_promise(Box::new(future))
}
/// Non-generic driver behind `future_to_promise`.
///
/// Builds a `Promise` whose executor closure wraps the boxed future in a
/// `Package` and polls it immediately. The package resolves or rejects the
/// promise once the future completes, and re-polls itself from a promise
/// callback whenever the future's task is notified.
fn _future_to_promise(future: Box<Future<Item = JsValue, Error = JsValue>>) -> Promise {
// Wrap the future in an executor `Spawn`. It sits in an `Option` so the
// closure below can move it out by value with `take()`.
let mut future = Some(executor::spawn(future));
// The item definitions after this `return` are declarations only; they are
// still in scope for the closure.
return Promise::new(&mut |resolve, reject| {
// `Package::poll` keeps the package alive past this call by stashing a clone
// of the `Arc` in `State::Waiting` when the future is not ready yet.
Package::poll(&Arc::new(Package {
// Panics if this closure were ever invoked a second time.
spawn: RefCell::new(future.take().unwrap()),
resolve,
reject,
// Start as `Notified` so the very first `Package::poll` polls the future.
notified: Cell::new(State::Notified),
}));
});
/// Everything needed to drive one future and settle its promise.
struct Package {
/// The spawned future being driven.
spawn: RefCell<Spawn<Box<Future<Item = JsValue, Error = JsValue>>>>,
/// Current scheduling state; see `State`.
notified: Cell<State>,
/// Called with the future's `Item` on success.
resolve: Function,
/// Called with the future's `Error` on failure.
reject: Function,
}
/// Scheduling state of a `Package`.
enum State {
/// `Package::poll` is currently running (or has finished for good) and no
/// notification has arrived since it last checked.
Polling,
/// A poll is due: either this is the initial state, or `notify` was called.
Notified,
/// The future returned `NotReady` and nothing has notified it yet. Holds a
/// strong reference to the package so `notify` can schedule the next poll.
Waiting(Arc<Package>),
}
// `Package` contains interior-mutable cells and JS function handles, so these
// impls are asserted manually rather than derived.
// NOTE(review): presumably required because the notify handle passed to
// `poll_future_notify` must be `Send + Sync`, and presumably justified by the
// wasm target being single-threaded — confirm before relying on this with
// threads.
unsafe impl Send for Package {}
unsafe impl Sync for Package {}
impl Package {
/// Polls the future until it completes or until it is pending with no
/// notification outstanding.
fn poll(me: &Arc<Package>) {
loop {
// Mark ourselves as polling and look at what the state was before.
match me.notified.replace(State::Polling) {
// A poll was requested: fall through and poll the future.
State::Notified => {}
// We looped back after `NotReady` and nobody called `notify` in the
// meantime: park a reference to ourselves for `notify` to pick up, and stop.
State::Polling => {
me.notified.set(State::Waiting(me.clone()));
break;
}
State::Waiting(_) => panic!("shouldn't see waiting state!"),
}
// Poll with this package as the notify handle (the id `0` is ignored by
// `notify` below).
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) {
Ok(Async::Ready(value)) => (value, &me.resolve),
Err(value) => (value, &me.reject),
// Not done: go round again to see whether `notify` fired during the poll.
Ok(Async::NotReady) => continue,
};
// Settle the promise, with `undefined` as `this`; the call's result is
// deliberately discarded.
drop(f.call1(&JsValue::undefined(), &val));
break;
}
}
}
impl Notify for Package {
/// Records that the future wants to be polled again and, if the package
/// was parked in `State::Waiting`, schedules that poll.
fn notify(&self, _id: usize) {
let me = match self.notified.replace(State::Notified) {
// Parked: take the stored reference and schedule a poll below.
State::Waiting(me) => me,
// Already notified: nothing more to do.
State::Notified => return,
// `Package::poll` was marked as running: its loop will observe the
// `Notified` state we just stored and poll again.
State::Polling => return,
};
// Defer the re-poll to a callback on an already-resolved promise instead of
// polling synchronously from inside `notify`.
let promise = Promise::resolve(&JsValue::undefined());
// The closure must outlive this function, so it is stored in a shared slot
// that the closure itself also holds; it empties the slot when it runs.
let slot = Rc::new(RefCell::new(None));
let slot2 = slot.clone();
let closure = Closure::wrap(Box::new(move |_| {
// Take ownership of ourselves out of the slot; `myself` is held until the
// end of this callback.
let myself = slot2.borrow_mut().take();
debug_assert!(myself.is_some());
Package::poll(&me);
}) as Box<FnMut(JsValue)>);
promise.then(&closure);
*slot.borrow_mut() = Some(closure);
}
}
}
pub fn spawn_local<F>(future: F)
where
F: Future<Item = (), Error = ()> + 'static,
{
future_to_promise(
future
.map(|()| JsValue::undefined())
.or_else(|()| future::ok::<JsValue, JsValue>(JsValue::undefined())),
);
}