tokio_compat/runtime/threadpool/
task_executor.rs

1use tokio_02::runtime::Handle;
2use tokio_02::task::JoinHandle;
3use tokio_executor_01::{self as executor_01, Executor as Executor01};
4
5use futures_01::future::{self as future_01, Future as Future01};
6use futures_util::{compat::Future01CompatExt, future::FutureExt};
7use std::future::Future;
8
9/// Executes futures on the runtime
10///
11/// All futures spawned using this executor will be submitted to the associated
12/// Runtime's executor. This executor is usually a thread pool.
13///
14/// For more details, see the [module level](index.html) documentation.
15#[derive(Debug, Clone)]
16#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
17pub struct TaskExecutor {
18    pub(super) inner: super::compat::CompatSpawner<Handle>,
19}
20
21impl TaskExecutor {
22    /// Spawn a `futures` 0.1 future onto the Tokio runtime.
23    ///
24    /// This spawns the given future onto the runtime's executor, usually a
25    /// thread pool. The thread pool is then responsible for polling the future
26    /// until it completes.
27    ///
28    /// See [module level][mod] documentation for more details.
29    ///
30    /// [mod]: index.html
31    ///
32    /// # Examples
33    ///
34    /// ```
35    /// use tokio_compat::runtime::Runtime;
36    /// # fn dox() {
37    /// // Create the runtime
38    /// let rt = Runtime::new().unwrap();
39    /// let executor = rt.executor();
40    ///
41    /// // Spawn a `futures` 0.1 future onto the runtime
42    /// executor.spawn(futures_01::future::lazy(|| {
43    ///     println!("now running on a worker thread");
44    ///     Ok(())
45    /// }));
46    /// # }
47    /// ```
48    pub fn spawn<F>(&self, future: F)
49    where
50        F: Future01<Item = (), Error = ()> + Send + 'static,
51    {
52        let future = Box::pin(future.compat().map(|_| ()));
53        self.spawn_std(future)
54    }
55
56    /// Spawn a `std::future` future onto the Tokio runtime.
57    ///
58    /// This spawns the given future onto the runtime's executor, usually a
59    /// thread pool. The thread pool is then responsible for polling the future
60    /// until it completes.
61    ///
62    /// See [module level][mod] documentation for more details.
63    ///
64    /// [mod]: index.html
65    ///
66    /// # Examples
67    ///
68    /// ```
69    /// use tokio_compat::runtime::Runtime;
70    ///
71    /// # fn dox() {
72    /// // Create the runtime
73    /// let rt = Runtime::new().unwrap();
74    /// let executor = rt.executor();
75    ///
76    /// // Spawn a `std::future` future onto the runtime
77    /// executor.spawn_std(async {
78    ///     println!("now running on a worker thread");
79    /// });
80    /// # }
81    /// ```
82    pub fn spawn_std<F>(&self, future: F)
83    where
84        F: Future<Output = ()> + Send + 'static,
85    {
86        let idle = self.inner.idle.reserve();
87        let _ = self.inner.inner.spawn(idle.with(future));
88    }
89
90    /// Spawn a `futures` 0.1 future onto the Tokio runtime, returning a
91    /// `JoinHandle` that can be used to await its result.
92    ///
93    /// This spawns the given future onto the runtime's executor, usually a
94    /// thread pool. The thread pool is then responsible for polling the future
95    /// until it completes.
96    ///
97    /// **Note** that futures spawned in this manner do not "count" towards
98    /// keeping the runtime active for [`shutdown_on_idle`], since they are paired
99    /// with a `JoinHandle` for  awaiting their completion. See [here] for
100    /// details on shutting down the compatibility runtime.
101    ///
102    /// [mod]: index.html
103    /// [`shutdown_on_idle`]: struct.Runtime.html#method.shutdown_on_idle
104    /// [here]: index.html#shutting-down
105    ///
106    /// # Examples
107    ///
108    /// ```
109    /// use tokio_compat::runtime::Runtime;
110    /// # fn dox() {
111    /// // Create the runtime
112    /// let rt = Runtime::new().unwrap();
113    /// let executor = rt.executor();
114    ///
115    /// // Spawn a `futures` 0.1 future onto the runtime
116    /// executor.spawn(futures_01::future::lazy(|| {
117    ///     println!("now running on a worker thread");
118    ///     Ok(())
119    /// }));
120    /// # }
121    /// ```
122    pub fn spawn_handle<F>(&self, future: F) -> JoinHandle<Result<F::Item, F::Error>>
123    where
124        F: Future01 + Send + 'static,
125        F::Item: Send + 'static,
126        F::Error: Send + 'static,
127    {
128        let future = Box::pin(future.compat());
129        self.spawn_handle_std(future)
130    }
131
132    /// Spawn a `std::future` future onto the Tokio runtime, returning a
133    /// `JoinHandle` that can be used to await its result.
134    ///
135    /// This spawns the given future onto the runtime's executor, usually a
136    /// thread pool. The thread pool is then responsible for polling the future
137    /// until it completes.
138    ///
139    /// See [module level][mod] documentation for more details.
140    ///
141    /// **Note** that futures spawned in this manner do not "count" towards
142    /// keeping the runtime active for [`shutdown_on_idle`], since they are paired
143    /// with a `JoinHandle` for  awaiting their completion. See [here] for
144    /// details on shutting down the compatibility runtime.
145    ///
146    /// [mod]: index.html
147    /// [`shutdown_on_idle`]: struct.Runtime.html#method.shutdown_on_idle
148    /// [here]: index.html#shutting-down
149    ///
150    /// # Examples
151    ///
152    /// ```
153    /// use tokio_compat::runtime::Runtime;
154    ///
155    /// # fn dox() {
156    /// // Create the runtime
157    /// let rt = Runtime::new().unwrap();
158    /// let executor = rt.executor();
159    ///
160    /// // Spawn a `std::future` future onto the runtime
161    /// executor.spawn_std(async {
162    ///     println!("now running on a worker thread");
163    /// });
164    /// # }
165    /// ```
166    ///
167    /// # Panics
168    ///
169    /// This function panics if the spawn fails. Failure occurs if the executor
170    /// is currently at capacity and is unable to spawn a new future.
171    pub fn spawn_handle_std<F>(&self, future: F) -> JoinHandle<F::Output>
172    where
173        F: Future + Send + 'static,
174        F::Output: Send + 'static,
175    {
176        self.inner.inner.spawn(future)
177    }
178}
179
180impl<T> future_01::Executor<T> for TaskExecutor
181where
182    T: Future01<Item = (), Error = ()> + Send + 'static,
183{
184    fn execute(&self, future: T) -> Result<(), future_01::ExecuteError<T>> {
185        self.spawn(future);
186        Ok(())
187    }
188}
189
190impl Executor01 for TaskExecutor {
191    fn spawn(
192        &mut self,
193        future: Box<dyn Future01<Item = (), Error = ()> + Send>,
194    ) -> Result<(), executor_01::SpawnError> {
195        Executor01::spawn(&mut self.inner, future)
196    }
197}
198
199impl<T> executor_01::TypedExecutor<T> for TaskExecutor
200where
201    T: Future01<Item = (), Error = ()> + Send + 'static,
202{
203    fn spawn(&mut self, future: T) -> Result<(), executor_01::SpawnError> {
204        executor_01::TypedExecutor::spawn(&mut self.inner, future)
205    }
206}