futures_spawn/lib.rs
1//! An abstraction for spawning futures
2//!
3//! [futures-rs](http://github.com/alexcrichton/futures-rs) provides a task
4//! abstraction and the ability for custom executors to manage how future
5//! execution is scheduled across them.
6//!
7//! `futures-spawn` provides an abstraction representing the act of spawning a
8//! future. This enables writing code that is not hard coded to a specific
9//! executor.
10
11#![deny(warnings, missing_docs)]
12
13extern crate futures;
14
15use futures::{Future};
16
17/// Value that can spawn a future
18///
19/// On spawn, the executor takes ownership of the future and becomes responsible
20/// to call `Future::poll()` whenever a readiness notification is raised.
21pub trait Spawn<T: Future<Item = (), Error = ()>> {
22
23 /// Spawns a future to run on this `Spawn`.
24 ///
25 /// This function will return immediately, and schedule the future `f` to
26 /// run on `self`. The details of scheduling and execution are left to the
27 /// implementations of `Spawn`.
28 fn spawn_detached(&self, f: T);
29}
30
31#[cfg(feature = "use_std")]
32pub use with_std::{NewThread, SpawnHandle, Spawned, SpawnHelper};
33
34#[cfg(feature = "use_std")]
35mod with_std {
36 use {Spawn};
37 use futures::{Future, IntoFuture, Poll, Async};
38 use futures::future::{self, CatchUnwind, Lazy};
39 use futures::sync::oneshot;
40
41 use std::{thread};
42 use std::panic::{self, AssertUnwindSafe};
43 use std::sync::Arc;
44 use std::sync::atomic::AtomicBool;
45 use std::sync::atomic::Ordering::SeqCst;
46
47 /// The type of future returned from the `Spawn::spawn` function, which
48 /// proxies the futures running on the thread pool.
49 ///
50 /// This future will resolve in the same way as the underlying future, and it
51 /// will propagate panics.
52 #[must_use]
53 pub struct SpawnHandle<T, E> {
54 inner: oneshot::Receiver<thread::Result<Result<T, E>>>,
55 keep_running_flag: Arc<AtomicBool>,
56 }
57
58 /// Contains a future that was spawned
59 pub struct Spawned<F: Future> {
60 future: CatchUnwind<AssertUnwindSafe<F>>,
61 tx: Option<oneshot::Sender<thread::Result<Result<F::Item, F::Error>>>>,
62 keep_running_flag: Arc<AtomicBool>,
63 }
64
65 /// Spawn all futures on a new thread
66 ///
67 /// This is the most basic `Spawn` implementation. Each call to `spawn` results
68 /// in a new thread dedicated to processing the given future to completion.
69 pub struct NewThread;
70
71 /// Additional strategies for spawning a future.
72 ///
73 /// These functions have to be on a separate trait vs. on the `Spawn` trait
74 /// in order to make rustc happy.
75 pub trait SpawnHelper {
76 /// Spawns a future to run on this `Spawn`, returning a future representing
77 /// the produced value.
78 ///
79 ///
80 /// This function will return immediately, and schedule the future `f` to
81 /// run on `self`. The details of scheduling and execution are left to the
82 /// implementations of `Spawn`. The returned future serves as a proxy to the
83 /// computation that `F` is running.
84 ///
85 /// To simply run an arbitrary closure and extract the result, you can use
86 /// the `future::lazy` combinator to defer work to executing on `&self`.
87 ///
88 /// Note that if the future `f` panics it will be caught by default and the
89 /// returned future will propagate the panic. That is, panics will not reach
90 /// `&self` and will be propagated to the returned future's `poll` method if
91 /// queried.
92 ///
93 /// If the returned future is dropped then `f` will be canceled, if
94 /// possible. That is, if the computation is in the middle of working, it
95 /// will be interrupted when possible.
96 fn spawn<F>(&self, future: F) -> SpawnHandle<F::Item, F::Error>
97 where F: Future,
98 Self: Spawn<Spawned<F>>
99 {
100 use futures::sync::oneshot;
101 use std::panic::AssertUnwindSafe;
102 use std::sync::Arc;
103 use std::sync::atomic::AtomicBool;
104
105 let (tx, rx) = oneshot::channel();
106 let keep_running_flag = Arc::new(AtomicBool::new(false));
107
108 // AssertUnwindSafe is used here becuase `Send + 'static` is basically
109 // an alias for an implementation of the `UnwindSafe` trait but we can't
110 // express that in the standard library right now.
111 let sender = Spawned {
112 future: AssertUnwindSafe(future).catch_unwind(),
113 tx: Some(tx),
114 keep_running_flag: keep_running_flag.clone(),
115 };
116
117 // Spawn the future
118 self.spawn_detached(sender);
119
120 SpawnHandle {
121 inner: rx,
122 keep_running_flag: keep_running_flag,
123 }
124 }
125
126 /// Spawns a closure on this `Spawn`
127 ///
128 /// This function is a convenience wrapper around the `spawn` function above
129 /// for running a closure wrapped in `future::lazy`. It will spawn the
130 /// function `f` provided onto the thread pool, and continue to run the
131 /// future returned by `f` on the thread pool as well.
132 fn spawn_fn<F, R>(&self, f: F) -> SpawnHandle<R::Item, R::Error>
133 where F: FnOnce() -> R,
134 R: IntoFuture,
135 Self: Spawn<Spawned<Lazy<F, R>>>,
136 {
137 self.spawn(future::lazy(f))
138 }
139 }
140
141 impl<T> SpawnHelper for T {
142 }
143
144 impl<T, E> SpawnHandle<T, E> {
145 /// Drop this future without canceling the underlying future.
146 ///
147 /// When `SpawnHandle` is dropped, the spawned future will be dropped as
148 /// well. This function can be used when user wants to drop but keep
149 /// executing the underlying future.
150 pub fn detach(self) {
151 self.keep_running_flag.store(true, SeqCst);
152 }
153 }
154
155 impl<T, E> Future for SpawnHandle<T, E> {
156 type Item = T;
157 type Error = E;
158
159 fn poll(&mut self) -> Poll<T, E> {
160 match self.inner.poll().expect("shouldn't be canceled") {
161 Async::Ready(Ok(Ok(e))) => Ok(e.into()),
162 Async::Ready(Ok(Err(e))) => Err(e),
163 Async::Ready(Err(e)) => panic::resume_unwind(e),
164 Async::NotReady => Ok(Async::NotReady),
165 }
166 }
167 }
168
169 impl<F: Future> Future for Spawned<F> {
170 type Item = ();
171 type Error = ();
172
173 fn poll(&mut self) -> Poll<(), ()> {
174 if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() {
175 if !self.keep_running_flag.load(SeqCst) {
176 // Cancelled, bail out
177 return Ok(().into())
178 }
179 }
180
181 let res = match self.future.poll() {
182 Ok(Async::Ready(e)) => Ok(e),
183 Ok(Async::NotReady) => return Ok(Async::NotReady),
184 Err(e) => Err(e),
185 };
186
187 self.tx.take().unwrap().complete(res);
188
189 Ok(Async::Ready(()))
190 }
191 }
192
193 impl<T: Future<Item = (), Error = ()> + Send + 'static> Spawn<T> for NewThread {
194 fn spawn_detached(&self, future: T) {
195 use std::thread;
196
197 thread::spawn(move || {
198 let _ = future.wait();
199 });
200 }
201 }
202
203 #[test]
204 fn test_new_thread() {
205 let new_thread = NewThread;
206 let res = new_thread.spawn_fn(|| Ok::<u32, ()>(1));
207
208 assert_eq!(1, res.wait().unwrap());
209 }
210}
211
212#[cfg(feature = "tokio")]
213mod tokio {
214 extern crate tokio_core;
215
216 use {Spawn};
217 use futures::Future;
218 use self::tokio_core::reactor::{Core, Handle, Remote};
219
220 impl<T: Future<Item = (), Error = ()> + 'static> Spawn<T> for Handle {
221 fn spawn_detached(&self, future: T) {
222 Handle::spawn(self, future);
223 }
224 }
225
226 impl<T: Future<Item = (), Error = ()> + 'static> Spawn<T> for Core {
227 fn spawn_detached(&self, future: T) {
228 self.handle().spawn_detached(future);
229 }
230 }
231
232 impl<T: Future<Item = (), Error = ()> + Send + 'static> Spawn<T> for Remote {
233 fn spawn_detached(&self, future: T) {
234 Remote::spawn(self, move |_| future);
235 }
236 }
237}