futures_executor/
spawn.rs

1use futures_core::{Future, Async, Poll};
2use futures_core::never::Never;
3use futures_core::task::{self, Context};
4use futures_channel::oneshot::{channel, Sender, Receiver};
5use futures_util::FutureExt;
6
7use std::thread;
8use std::sync::Arc;
9use std::sync::atomic::Ordering;
10use std::panic::{self, AssertUnwindSafe};
11use std::sync::atomic::AtomicBool;
12
13/// A future representing the completion of task spawning.
14///
15/// See [`spawn`](spawn()) for details.
16#[derive(Debug)]
17pub struct Spawn<F>(Option<F>);
18
19/// Spawn a task onto the default executor.
20///
21/// This function returns a future that will spawn the given future as a task
22/// onto the default executor. It does *not* provide any way to wait on task
23/// completion or extract a value from the task. That can either be done through
24/// a channel, or by using [`spawn_with_handle`](::spawn_with_handle).
25pub fn spawn<F>(f: F) -> Spawn<F>
26    where F: Future<Item = (), Error = Never> + 'static + Send
27{
28    Spawn(Some(f))
29}
30
31impl<F: Future<Item = (), Error = Never> + Send + 'static> Future for Spawn<F> {
32    type Item = ();
33    type Error = Never;
34    fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> {
35        cx.spawn(self.0.take().unwrap());
36        Ok(Async::Ready(()))
37    }
38}
39
40/// A future representing the completion of task spawning, yielding a
41/// [`JoinHandle`](::JoinHandle) to the spawned task.
42///
43/// See [`spawn_with_handle`](::spawn_with_handle) for details.
44#[derive(Debug)]
45pub struct SpawnWithHandle<F>(Option<F>);
46
47/// Spawn a task onto the default executor, yielding a
48/// [`JoinHandle`](::JoinHandle) to the spawned task.
49///
50/// This function returns a future that will spawn the given future as a task
51/// onto the default executor. On completion, that future will yield a
52/// [`JoinHandle`](::JoinHandle) that can itself be used as a future
53/// representing the completion of the spawned task.
54///
55/// # Examples
56///
57/// ```
58/// # extern crate futures;
59/// #
60/// use futures::prelude::*;
61/// use futures::future;
62/// use futures::executor::{block_on, spawn_with_handle};
63///
64/// # fn main() {
65/// # fn inner() -> Result<(), Never> {
66/// # Ok({
67/// let future = future::ok::<u32, Never>(1);
68/// let join_handle = block_on(spawn_with_handle(future))?;
69/// let result = block_on(join_handle);
70/// assert_eq!(result, Ok(1));
71/// # })
72/// # }
73/// # inner().unwrap();
74/// # }
75/// ```
76///
77/// ```
78/// # extern crate futures;
79/// #
80/// use futures::prelude::*;
81/// use futures::future;
82/// use futures::executor::{block_on, spawn_with_handle};
83///
84/// # fn main() {
85/// # fn inner() -> Result<(), Never> {
86/// # Ok({
87/// let future = future::err::<Never, &str>("boom");
88/// let join_handle = block_on(spawn_with_handle(future))?;
89/// let result = block_on(join_handle);
90/// assert_eq!(result, Err("boom"));
91/// # })
92/// # }
93/// # inner().unwrap();
94/// # }
95/// ```
96pub fn spawn_with_handle<F>(f: F) -> SpawnWithHandle<F>
97    where F: Future + 'static + Send, F::Item: Send, F::Error: Send
98{
99    SpawnWithHandle(Some(f))
100}
101
102impl<F> Future for SpawnWithHandle<F>
103    where F: Future + Send + 'static,
104          F::Item: Send,
105          F::Error: Send,
106{
107    type Item = JoinHandle<F::Item, F::Error>;
108    type Error = Never;
109    fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Never> {
110        let (tx, rx) = channel();
111        let keep_running_flag = Arc::new(AtomicBool::new(false));
112        // AssertUnwindSafe is used here because `Send + 'static` is basically
113        // an alias for an implementation of the `UnwindSafe` trait but we can't
114        // express that in the standard library right now.
115        let sender = MySender {
116            fut: AssertUnwindSafe(self.0.take().unwrap()).catch_unwind(),
117            tx: Some(tx),
118            keep_running_flag: keep_running_flag.clone(),
119        };
120
121        cx.spawn(sender);
122        Ok(Async::Ready(JoinHandle {
123            inner: rx ,
124            keep_running_flag: keep_running_flag.clone()
125        }))
126    }
127}
128
129struct MySender<F, T> {
130    fut: F,
131    tx: Option<Sender<T>>,
132    keep_running_flag: Arc<AtomicBool>,
133}
134
135/// The type of future returned from the `ThreadPool::spawn` function, which
136/// proxies the futures running on the thread pool.
137///
138/// This future will resolve in the same way as the underlying future, and it
139/// will propagate panics.
140#[must_use]
141#[derive(Debug)]
142pub struct JoinHandle<T, E> {
143    inner: Receiver<thread::Result<Result<T, E>>>,
144    keep_running_flag: Arc<AtomicBool>,
145}
146
147impl<T, E> JoinHandle<T, E> {
148    /// Drop this handle *without* canceling the underlying future.
149    ///
150    /// When `JoinHandle` is dropped, `ThreadPool` will try to abort the associated
151    /// task. This function can be used when you want to drop the handle but keep
152    /// executing the task.
153    pub fn forget(self) {
154        self.keep_running_flag.store(true, Ordering::SeqCst);
155    }
156}
157
158impl<T: Send + 'static, E: Send + 'static> Future for JoinHandle<T, E> {
159    type Item = T;
160    type Error = E;
161
162    fn poll(&mut self, cx: &mut task::Context) -> Poll<T, E> {
163        match self.inner.poll(cx).expect("cannot poll JoinHandle twice") {
164            Async::Ready(Ok(Ok(e))) => Ok(e.into()),
165            Async::Ready(Ok(Err(e))) => Err(e),
166            Async::Ready(Err(e)) => panic::resume_unwind(e),
167            Async::Pending => Ok(Async::Pending),
168        }
169    }
170}
171
172impl<F: Future> Future for MySender<F, Result<F::Item, F::Error>> {
173    type Item = ();
174    type Error = Never;
175
176    fn poll(&mut self, cx: &mut task::Context) -> Poll<(), Never> {
177        if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel(cx) {
178            if !self.keep_running_flag.load(Ordering::SeqCst) {
179                // Cancelled, bail out
180                return Ok(().into())
181            }
182        }
183
184        let res = match self.fut.poll(cx) {
185            Ok(Async::Ready(e)) => Ok(e),
186            Ok(Async::Pending) => return Ok(Async::Pending),
187            Err(e) => Err(e),
188        };
189
190        // if the receiving end has gone away then that's ok, we just ignore the
191        // send error here.
192        drop(self.tx.take().unwrap().send(res));
193        Ok(Async::Ready(()))
194    }
195}