#![deny(warnings, missing_docs)]
extern crate futures;
use futures::{Future};
pub trait Spawn<T: Future<Item = (), Error = ()>> {
fn spawn_detached(&self, f: T);
}
#[cfg(feature = "use_std")]
pub use with_std::{NewThread, SpawnHandle, Spawned, SpawnHelper};
#[cfg(feature = "use_std")]
mod with_std {
use {Spawn};
use futures::{Future, IntoFuture, Poll, Async};
use futures::future::{self, CatchUnwind, Lazy};
use futures::sync::oneshot;
use std::{thread};
use std::panic::{self, AssertUnwindSafe};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
#[must_use]
pub struct SpawnHandle<T, E> {
inner: oneshot::Receiver<thread::Result<Result<T, E>>>,
keep_running_flag: Arc<AtomicBool>,
}
pub struct Spawned<F: Future> {
future: CatchUnwind<AssertUnwindSafe<F>>,
tx: Option<oneshot::Sender<thread::Result<Result<F::Item, F::Error>>>>,
keep_running_flag: Arc<AtomicBool>,
}
pub struct NewThread;
pub trait SpawnHelper {
fn spawn<F>(&self, future: F) -> SpawnHandle<F::Item, F::Error>
where F: Future,
Self: Spawn<Spawned<F>>
{
use futures::sync::oneshot;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
let (tx, rx) = oneshot::channel();
let keep_running_flag = Arc::new(AtomicBool::new(false));
let sender = Spawned {
future: AssertUnwindSafe(future).catch_unwind(),
tx: Some(tx),
keep_running_flag: keep_running_flag.clone(),
};
self.spawn_detached(sender);
SpawnHandle {
inner: rx,
keep_running_flag: keep_running_flag,
}
}
fn spawn_fn<F, R>(&self, f: F) -> SpawnHandle<R::Item, R::Error>
where F: FnOnce() -> R,
R: IntoFuture,
Self: Spawn<Spawned<Lazy<F, R>>>,
{
self.spawn(future::lazy(f))
}
}
impl<T> SpawnHelper for T {
}
impl<T, E> SpawnHandle<T, E> {
pub fn detach(self) {
self.keep_running_flag.store(true, SeqCst);
}
}
impl<T, E> Future for SpawnHandle<T, E> {
type Item = T;
type Error = E;
fn poll(&mut self) -> Poll<T, E> {
match self.inner.poll().expect("shouldn't be canceled") {
Async::Ready(Ok(Ok(e))) => Ok(e.into()),
Async::Ready(Ok(Err(e))) => Err(e),
Async::Ready(Err(e)) => panic::resume_unwind(e),
Async::NotReady => Ok(Async::NotReady),
}
}
}
impl<F: Future> Future for Spawned<F> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() {
if !self.keep_running_flag.load(SeqCst) {
return Ok(().into())
}
}
let res = match self.future.poll() {
Ok(Async::Ready(e)) => Ok(e),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => Err(e),
};
self.tx.take().unwrap().complete(res);
Ok(Async::Ready(()))
}
}
impl<T: Future<Item = (), Error = ()> + Send + 'static> Spawn<T> for NewThread {
fn spawn_detached(&self, future: T) {
use std::thread;
thread::spawn(move || {
let _ = future.wait();
});
}
}
#[test]
fn test_new_thread() {
let new_thread = NewThread;
let res = new_thread.spawn_fn(|| Ok::<u32, ()>(1));
assert_eq!(1, res.wait().unwrap());
}
}
#[cfg(feature = "tokio")]
mod tokio {
extern crate tokio_core;
use {Spawn};
use futures::Future;
use self::tokio_core::reactor::{Core, Handle, Remote};
impl<T: Future<Item = (), Error = ()> + 'static> Spawn<T> for Handle {
fn spawn_detached(&self, future: T) {
Handle::spawn(self, future);
}
}
impl<T: Future<Item = (), Error = ()> + 'static> Spawn<T> for Core {
fn spawn_detached(&self, future: T) {
self.handle().spawn_detached(future);
}
}
impl<T: Future<Item = (), Error = ()> + Send + 'static> Spawn<T> for Remote {
fn spawn_detached(&self, future: T) {
Remote::spawn(self, move |_| future);
}
}
}