1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
//! An abstraction for spawning futures
//!
//! [futures-rs](http://github.com/alexcrichton/futures-rs) provides a task
//! abstraction and the ability for custom executors to manage how future
//! execution is scheduled across them.
//!
//! `futures-spawn` provides an abstraction representing the act of spawning a
//! future. This enables writing code that is not hard coded to a specific
//! executor.

#![deny(warnings, missing_docs)]

extern crate futures;

use futures::{Future};

/// Value that can spawn a future
///
/// On spawn, the executor takes ownership of the future and becomes responsible
/// to call `Future::poll()` whenever a readiness notification is raised.
pub trait Spawn<T: Future<Item = (), Error = ()>> {

    /// Spawns a future to run on this `Spawn`.
    ///
    /// This function will return immediately, and schedule the future `f` to
    /// run on `self`. The details of scheduling and execution are left to the
    /// implementations of `Spawn`.
    fn spawn_detached(&self, f: T);
}

#[cfg(feature = "use_std")]
pub use with_std::{NewThread, SpawnHandle, Spawned, SpawnHelper};

#[cfg(feature = "use_std")]
mod with_std {
    use {Spawn};
    use futures::{Future, IntoFuture, Poll, Async};
    use futures::future::{self, CatchUnwind, Lazy};
    use futures::sync::oneshot;

    use std::{thread};
    use std::panic::{self, AssertUnwindSafe};
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering::SeqCst;

    /// The type of future returned from `SpawnHelper::spawn` and
    /// `SpawnHelper::spawn_fn`, which proxies a future that was handed to a
    /// `Spawn`.
    ///
    /// This future will resolve in the same way as the underlying future: its
    /// item or error is forwarded unchanged, and a panic raised by the
    /// underlying future is resumed when this handle is polled.
    ///
    /// Unless `detach` is called, dropping the handle tells the spawned future
    /// that nobody is interested in its result any more.
    #[must_use]
    pub struct SpawnHandle<T, E> {
        // Receiving half of the result channel. The outer `thread::Result`
        // carries a caught panic payload, the inner `Result` is the spawned
        // future's own item or error.
        inner: oneshot::Receiver<thread::Result<Result<T, E>>>,
        // Shared with the matching `Spawned`. Starts out `false`; `detach` sets
        // it to `true` so the spawned future keeps running without a handle.
        keep_running_flag: Arc<AtomicBool>,
    }

    /// Contains a future that was spawned
    ///
    /// This is the wrapper that `SpawnHelper::spawn` passes to
    /// `Spawn::spawn_detached`. It is itself a future with `Item = ()` and
    /// `Error = ()`: when polled it drives the wrapped future and sends the
    /// outcome to the associated `SpawnHandle`.
    pub struct Spawned<F: Future> {
        // The user's future, wrapped in `AssertUnwindSafe` + `catch_unwind` so
        // that a panic is turned into an `Err` value that can be sent to the
        // handle.
        future: CatchUnwind<AssertUnwindSafe<F>>,
        // Sending half of the result channel. `Some` until the outcome has been
        // sent, at which point it is taken and left as `None`.
        tx: Option<oneshot::Sender<thread::Result<Result<F::Item, F::Error>>>>,
        // Shared with the `SpawnHandle`. When `true`, the future keeps running
        // even though the receiving side has gone away.
        keep_running_flag: Arc<AtomicBool>,
    }

    /// Spawn all futures on a new thread
    ///
    /// This is the most basic `Spawn` implementation. Each call to
    /// `spawn_detached` (and therefore each call to `SpawnHelper::spawn` or
    /// `SpawnHelper::spawn_fn`) results in a new thread dedicated to processing
    /// the given future to completion.
    pub struct NewThread;

    /// Additional strategies for spawning a future.
    ///
    /// These helpers build on `Spawn::spawn_detached` and hand back a
    /// `SpawnHandle` through which the spawned future's result can be observed.
    ///
    /// These functions have to be on a separate trait vs. on the `Spawn` trait
    /// in order to make rustc happy.
    pub trait SpawnHelper {
        /// Spawns a future to run on this `Spawn`, returning a future representing
        /// the produced value.
        ///
        /// This function will return immediately, and schedule `future` to run
        /// on `self`. The details of scheduling and execution are left to the
        /// implementation of `Spawn`. The returned `SpawnHandle` serves as a
        /// proxy to the computation that `future` is running.
        ///
        /// To simply run an arbitrary closure and extract the result, you can use
        /// the `future::lazy` combinator (or `spawn_fn`) to defer the work to
        /// wherever `self` executes it.
        ///
        /// # Panics
        ///
        /// A panic inside `future` is caught and does not reach the executor
        /// behind `self`. Instead it is propagated to the returned handle and
        /// resumed from the handle's `poll` method when it is queried.
        ///
        /// # Cancellation
        ///
        /// If the returned handle is dropped then `future` will be canceled, if
        /// possible: the next time the spawned wrapper is polled it completes
        /// without polling `future` again. Call `SpawnHandle::detach` to drop
        /// the handle while letting `future` keep running.
        fn spawn<F>(&self, future: F) -> SpawnHandle<F::Item, F::Error>
            where F: Future,
                  Self: Spawn<Spawned<F>>
        {
            use futures::sync::oneshot;
            use std::panic::AssertUnwindSafe;
            use std::sync::Arc;
            use std::sync::atomic::AtomicBool;

            // `false` means "stop once the handle is gone"; `detach` flips it.
            let keep_running_flag = Arc::new(AtomicBool::new(false));

            // Channel over which the outcome travels back to the handle.
            let (tx, rx) = oneshot::channel();

            // AssertUnwindSafe is used here because `Send + 'static` is basically
            // an alias for an implementation of the `UnwindSafe` trait but we can't
            // express that in the standard library right now.
            let guarded = AssertUnwindSafe(future).catch_unwind();

            // Hand the wrapped future over to the executor.
            self.spawn_detached(Spawned {
                future: guarded,
                tx: Some(tx),
                keep_running_flag: keep_running_flag.clone(),
            });

            SpawnHandle {
                inner: rx,
                keep_running_flag: keep_running_flag,
            }
        }

        /// Spawns a closure on this `Spawn`
        ///
        /// This function is a convenience wrapper around the `spawn` function above
        /// for running a closure wrapped in `future::lazy`. The closure `f` is
        /// spawned onto `self`, and the future it returns is then spawned the
        /// same way, as part of the same spawned task.
        fn spawn_fn<F, R>(&self, f: F) -> SpawnHandle<R::Item, R::Error>
            where F: FnOnce() -> R,
                  R: IntoFuture,
                  Self: Spawn<Spawned<Lazy<F, R>>>,
        {
            let deferred = future::lazy(f);
            self.spawn(deferred)
        }
    }

    // Blanket implementation: every type gets the helper methods. Whether a
    // given method can actually be called is decided by that method's own
    // `Self: Spawn<..>` bound.
    impl<T> SpawnHelper for T {}

    impl<T, E> SpawnHandle<T, E> {
        /// Drop this future without canceling the underlying future.
        ///
        /// Normally, dropping a `SpawnHandle` causes the spawned future to be
        /// dropped as well. Use this function when the handle is no longer
        /// needed but the underlying future should keep executing.
        pub fn detach(self) {
            // Raise the flag first; `self` (and with it the receiving end of
            // the channel) is only dropped when this function returns.
            let keep_running = &self.keep_running_flag;
            keep_running.store(true, SeqCst);
        }
    }

    // Forwards the outcome of the spawned future to whoever polls the handle.
    impl<T, E> Future for SpawnHandle<T, E> {
        type Item = T;
        type Error = E;

        // Panics with "shouldn't be canceled" if the receiver reports an error,
        // and resumes the original panic if the spawned future panicked.
        fn poll(&mut self) -> Poll<T, E> {
            // Nothing has arrived on the channel yet: stay pending.
            let outcome = match self.inner.poll().expect("shouldn't be canceled") {
                Async::NotReady => return Ok(Async::NotReady),
                Async::Ready(outcome) => outcome,
            };

            match outcome {
                // The spawned future finished with an item.
                Ok(Ok(item)) => Ok(Async::Ready(item)),
                // The spawned future finished with its own error.
                Ok(Err(err)) => Err(err),
                // The spawned future panicked; continue unwinding here.
                Err(payload) => panic::resume_unwind(payload),
            }
        }
    }

    // Drives the wrapped future and ships its outcome to the `SpawnHandle`.
    impl<F: Future> Future for Spawned<F> {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            // Has the receiving side (the `SpawnHandle`) gone away?
            let receiver_gone = match self.tx.as_mut().unwrap().poll_cancel() {
                Ok(Async::Ready(_)) => true,
                _ => false,
            };

            // Cancelled and not detached: bail out without polling the future.
            // The flag is only consulted once the receiver is known to be gone.
            if receiver_gone && !self.keep_running_flag.load(SeqCst) {
                return Ok(Async::Ready(()));
            }

            // Either the future's own result or the payload of a caught panic.
            let outcome = match self.future.poll() {
                Ok(Async::NotReady) => return Ok(Async::NotReady),
                Ok(Async::Ready(value)) => Ok(value),
                Err(payload) => Err(payload),
            };

            // The sender is consumed here, leaving `tx` as `None`.
            self.tx.take().unwrap().complete(outcome);

            Ok(Async::Ready(()))
        }
    }

    impl<T: Future<Item = (), Error = ()> + Send + 'static> Spawn<T> for NewThread {
        fn spawn_detached(&self, future: T) {
            use std::thread;

            thread::spawn(move || {
                let _ = future.wait();
            });
        }
    }

    // Spawning a closure on `NewThread` yields its value through the handle.
    #[test]
    fn test_new_thread() {
        let handle = NewThread.spawn_fn(|| Ok::<u32, ()>(1));
        let value = handle.wait().unwrap();

        assert_eq!(1, value);
    }
}

#[cfg(feature = "tokio")]
mod tokio {
    // `Spawn` implementations for the `tokio-core` reactor types.
    extern crate tokio_core;

    use {Spawn};
    use futures::Future;
    use self::tokio_core::reactor::{Core, Handle, Remote};

    // A `Handle` spawns directly through its inherent `spawn` method.
    impl<T> Spawn<T> for Handle
        where T: Future<Item = (), Error = ()> + 'static
    {
        fn spawn_detached(&self, future: T) {
            Handle::spawn(self, future);
        }
    }

    // A `Core` delegates to the `Spawn` implementation of its own `Handle`.
    impl<T> Spawn<T> for Core
        where T: Future<Item = (), Error = ()> + 'static
    {
        fn spawn_detached(&self, future: T) {
            Spawn::spawn_detached(&self.handle(), future);
        }
    }

    // A `Remote` takes a closure producing the future; the `Handle` argument
    // passed to that closure is not needed here. Note the extra `Send` bound
    // compared to the impls above.
    impl<T> Spawn<T> for Remote
        where T: Future<Item = (), Error = ()> + Send + 'static
    {
        fn spawn_detached(&self, future: T) {
            Remote::spawn(self, move |_handle| future);
        }
    }
}