futures_spawn/
lib.rs

1//! An abstraction for spawning futures
2//!
3//! [futures-rs](http://github.com/alexcrichton/futures-rs) provides a task
4//! abstraction and the ability for custom executors to manage how future
5//! execution is scheduled across them.
6//!
7//! `futures-spawn` provides an abstraction representing the act of spawning a
8//! future. This enables writing code that is not hard coded to a specific
9//! executor.
10
11#![deny(warnings, missing_docs)]
12
13extern crate futures;
14
15use futures::{Future};
16
17/// Value that can spawn a future
18///
19/// On spawn, the executor takes ownership of the future and becomes responsible
20/// to call `Future::poll()` whenever a readiness notification is raised.
21pub trait Spawn<T: Future<Item = (), Error = ()>> {
22
23    /// Spawns a future to run on this `Spawn`.
24    ///
25    /// This function will return immediately, and schedule the future `f` to
26    /// run on `self`. The details of scheduling and execution are left to the
27    /// implementations of `Spawn`.
28    fn spawn_detached(&self, f: T);
29}
30
31#[cfg(feature = "use_std")]
32pub use with_std::{NewThread, SpawnHandle, Spawned, SpawnHelper};
33
34#[cfg(feature = "use_std")]
35mod with_std {
36    use {Spawn};
37    use futures::{Future, IntoFuture, Poll, Async};
38    use futures::future::{self, CatchUnwind, Lazy};
39    use futures::sync::oneshot;
40
41    use std::{thread};
42    use std::panic::{self, AssertUnwindSafe};
43    use std::sync::Arc;
44    use std::sync::atomic::AtomicBool;
45    use std::sync::atomic::Ordering::SeqCst;
46
47    /// The type of future returned from the `Spawn::spawn` function, which
48    /// proxies the futures running on the thread pool.
49    ///
50    /// This future will resolve in the same way as the underlying future, and it
51    /// will propagate panics.
52    #[must_use]
53    pub struct SpawnHandle<T, E> {
54        inner: oneshot::Receiver<thread::Result<Result<T, E>>>,
55        keep_running_flag: Arc<AtomicBool>,
56    }
57
58    /// Contains a future that was spawned
59    pub struct Spawned<F: Future> {
60        future: CatchUnwind<AssertUnwindSafe<F>>,
61        tx: Option<oneshot::Sender<thread::Result<Result<F::Item, F::Error>>>>,
62        keep_running_flag: Arc<AtomicBool>,
63    }
64
65    /// Spawn all futures on a new thread
66    ///
67    /// This is the most basic `Spawn` implementation. Each call to `spawn` results
68    /// in a new thread dedicated to processing the given future to completion.
69    pub struct NewThread;
70
71    /// Additional strategies for spawning a future.
72    ///
73    /// These functions have to be on a separate trait vs. on the `Spawn` trait
74    /// in order to make rustc happy.
75    pub trait SpawnHelper {
76        /// Spawns a future to run on this `Spawn`, returning a future representing
77        /// the produced value.
78        ///
79        ///
80        /// This function will return immediately, and schedule the future `f` to
81        /// run on `self`. The details of scheduling and execution are left to the
82        /// implementations of `Spawn`. The returned future serves as a proxy to the
83        /// computation that `F` is running.
84        ///
85        /// To simply run an arbitrary closure and extract the result, you can use
86        /// the `future::lazy` combinator to defer work to executing on `&self`.
87        ///
88        /// Note that if the future `f` panics it will be caught by default and the
89        /// returned future will propagate the panic. That is, panics will not reach
90        /// `&self` and will be propagated to the returned future's `poll` method if
91        /// queried.
92        ///
93        /// If the returned future is dropped then `f` will be canceled, if
94        /// possible. That is, if the computation is in the middle of working, it
95        /// will be interrupted when possible.
96        fn spawn<F>(&self, future: F) -> SpawnHandle<F::Item, F::Error>
97            where F: Future,
98                  Self: Spawn<Spawned<F>>
99        {
100            use futures::sync::oneshot;
101            use std::panic::AssertUnwindSafe;
102            use std::sync::Arc;
103            use std::sync::atomic::AtomicBool;
104
105            let (tx, rx) = oneshot::channel();
106            let keep_running_flag = Arc::new(AtomicBool::new(false));
107
108            // AssertUnwindSafe is used here becuase `Send + 'static` is basically
109            // an alias for an implementation of the `UnwindSafe` trait but we can't
110            // express that in the standard library right now.
111            let sender = Spawned {
112                future: AssertUnwindSafe(future).catch_unwind(),
113                tx: Some(tx),
114                keep_running_flag: keep_running_flag.clone(),
115            };
116
117            // Spawn the future
118            self.spawn_detached(sender);
119
120            SpawnHandle {
121                inner: rx,
122                keep_running_flag: keep_running_flag,
123            }
124        }
125
126        /// Spawns a closure on this `Spawn`
127        ///
128        /// This function is a convenience wrapper around the `spawn` function above
129        /// for running a closure wrapped in `future::lazy`. It will spawn the
130        /// function `f` provided onto the thread pool, and continue to run the
131        /// future returned by `f` on the thread pool as well.
132        fn spawn_fn<F, R>(&self, f: F) -> SpawnHandle<R::Item, R::Error>
133            where F: FnOnce() -> R,
134                  R: IntoFuture,
135                  Self: Spawn<Spawned<Lazy<F, R>>>,
136        {
137            self.spawn(future::lazy(f))
138        }
139    }
140
141    impl<T> SpawnHelper for T {
142    }
143
144    impl<T, E> SpawnHandle<T, E> {
145        /// Drop this future without canceling the underlying future.
146        ///
147        /// When `SpawnHandle` is dropped, the spawned future will be dropped as
148        /// well. This function can be used when user wants to drop but keep
149        /// executing the underlying future.
150        pub fn detach(self) {
151            self.keep_running_flag.store(true, SeqCst);
152        }
153    }
154
155    impl<T, E> Future for SpawnHandle<T, E> {
156        type Item = T;
157        type Error = E;
158
159        fn poll(&mut self) -> Poll<T, E> {
160            match self.inner.poll().expect("shouldn't be canceled") {
161                Async::Ready(Ok(Ok(e))) => Ok(e.into()),
162                Async::Ready(Ok(Err(e))) => Err(e),
163                Async::Ready(Err(e)) => panic::resume_unwind(e),
164                Async::NotReady => Ok(Async::NotReady),
165            }
166        }
167    }
168
169    impl<F: Future> Future for Spawned<F> {
170        type Item = ();
171        type Error = ();
172
173        fn poll(&mut self) -> Poll<(), ()> {
174            if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() {
175                if !self.keep_running_flag.load(SeqCst) {
176                    // Cancelled, bail out
177                    return Ok(().into())
178                }
179            }
180
181            let res = match self.future.poll() {
182                Ok(Async::Ready(e)) => Ok(e),
183                Ok(Async::NotReady) => return Ok(Async::NotReady),
184                Err(e) => Err(e),
185            };
186
187            self.tx.take().unwrap().complete(res);
188
189            Ok(Async::Ready(()))
190        }
191    }
192
193    impl<T: Future<Item = (), Error = ()> + Send + 'static> Spawn<T> for NewThread {
194        fn spawn_detached(&self, future: T) {
195            use std::thread;
196
197            thread::spawn(move || {
198                let _ = future.wait();
199            });
200        }
201    }
202
203    #[test]
204    fn test_new_thread() {
205        let new_thread = NewThread;
206        let res = new_thread.spawn_fn(|| Ok::<u32, ()>(1));
207
208        assert_eq!(1, res.wait().unwrap());
209    }
210}
211
212#[cfg(feature = "tokio")]
213mod tokio {
214    extern crate tokio_core;
215
216    use {Spawn};
217    use futures::Future;
218    use self::tokio_core::reactor::{Core, Handle, Remote};
219
220    impl<T: Future<Item = (), Error = ()> + 'static> Spawn<T> for Handle {
221        fn spawn_detached(&self, future: T) {
222            Handle::spawn(self, future);
223        }
224    }
225
226    impl<T: Future<Item = (), Error = ()> + 'static> Spawn<T> for Core {
227        fn spawn_detached(&self, future: T) {
228            self.handle().spawn_detached(future);
229        }
230    }
231
232    impl<T: Future<Item = (), Error = ()> + Send + 'static> Spawn<T> for Remote {
233        fn spawn_detached(&self, future: T) {
234            Remote::spawn(self, move |_| future);
235        }
236    }
237}