tokio_compat/runtime/threadpool/task_executor.rs
1use tokio_02::runtime::Handle;
2use tokio_02::task::JoinHandle;
3use tokio_executor_01::{self as executor_01, Executor as Executor01};
4
5use futures_01::future::{self as future_01, Future as Future01};
6use futures_util::{compat::Future01CompatExt, future::FutureExt};
7use std::future::Future;
8
9/// Executes futures on the runtime
10///
11/// All futures spawned using this executor will be submitted to the associated
12/// Runtime's executor. This executor is usually a thread pool.
13///
14/// For more details, see the [module level](index.html) documentation.
15#[derive(Debug, Clone)]
16#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
17pub struct TaskExecutor {
18 pub(super) inner: super::compat::CompatSpawner<Handle>,
19}
20
21impl TaskExecutor {
22 /// Spawn a `futures` 0.1 future onto the Tokio runtime.
23 ///
24 /// This spawns the given future onto the runtime's executor, usually a
25 /// thread pool. The thread pool is then responsible for polling the future
26 /// until it completes.
27 ///
28 /// See [module level][mod] documentation for more details.
29 ///
30 /// [mod]: index.html
31 ///
32 /// # Examples
33 ///
34 /// ```
35 /// use tokio_compat::runtime::Runtime;
36 /// # fn dox() {
37 /// // Create the runtime
38 /// let rt = Runtime::new().unwrap();
39 /// let executor = rt.executor();
40 ///
41 /// // Spawn a `futures` 0.1 future onto the runtime
42 /// executor.spawn(futures_01::future::lazy(|| {
43 /// println!("now running on a worker thread");
44 /// Ok(())
45 /// }));
46 /// # }
47 /// ```
48 pub fn spawn<F>(&self, future: F)
49 where
50 F: Future01<Item = (), Error = ()> + Send + 'static,
51 {
52 let future = Box::pin(future.compat().map(|_| ()));
53 self.spawn_std(future)
54 }
55
56 /// Spawn a `std::future` future onto the Tokio runtime.
57 ///
58 /// This spawns the given future onto the runtime's executor, usually a
59 /// thread pool. The thread pool is then responsible for polling the future
60 /// until it completes.
61 ///
62 /// See [module level][mod] documentation for more details.
63 ///
64 /// [mod]: index.html
65 ///
66 /// # Examples
67 ///
68 /// ```
69 /// use tokio_compat::runtime::Runtime;
70 ///
71 /// # fn dox() {
72 /// // Create the runtime
73 /// let rt = Runtime::new().unwrap();
74 /// let executor = rt.executor();
75 ///
76 /// // Spawn a `std::future` future onto the runtime
77 /// executor.spawn_std(async {
78 /// println!("now running on a worker thread");
79 /// });
80 /// # }
81 /// ```
82 pub fn spawn_std<F>(&self, future: F)
83 where
84 F: Future<Output = ()> + Send + 'static,
85 {
86 let idle = self.inner.idle.reserve();
87 let _ = self.inner.inner.spawn(idle.with(future));
88 }
89
90 /// Spawn a `futures` 0.1 future onto the Tokio runtime, returning a
91 /// `JoinHandle` that can be used to await its result.
92 ///
93 /// This spawns the given future onto the runtime's executor, usually a
94 /// thread pool. The thread pool is then responsible for polling the future
95 /// until it completes.
96 ///
97 /// **Note** that futures spawned in this manner do not "count" towards
98 /// keeping the runtime active for [`shutdown_on_idle`], since they are paired
99 /// with a `JoinHandle` for awaiting their completion. See [here] for
100 /// details on shutting down the compatibility runtime.
101 ///
102 /// [mod]: index.html
103 /// [`shutdown_on_idle`]: struct.Runtime.html#method.shutdown_on_idle
104 /// [here]: index.html#shutting-down
105 ///
106 /// # Examples
107 ///
108 /// ```
109 /// use tokio_compat::runtime::Runtime;
110 /// # fn dox() {
111 /// // Create the runtime
112 /// let rt = Runtime::new().unwrap();
113 /// let executor = rt.executor();
114 ///
115 /// // Spawn a `futures` 0.1 future onto the runtime
116 /// executor.spawn(futures_01::future::lazy(|| {
117 /// println!("now running on a worker thread");
118 /// Ok(())
119 /// }));
120 /// # }
121 /// ```
122 pub fn spawn_handle<F>(&self, future: F) -> JoinHandle<Result<F::Item, F::Error>>
123 where
124 F: Future01 + Send + 'static,
125 F::Item: Send + 'static,
126 F::Error: Send + 'static,
127 {
128 let future = Box::pin(future.compat());
129 self.spawn_handle_std(future)
130 }
131
132 /// Spawn a `std::future` future onto the Tokio runtime, returning a
133 /// `JoinHandle` that can be used to await its result.
134 ///
135 /// This spawns the given future onto the runtime's executor, usually a
136 /// thread pool. The thread pool is then responsible for polling the future
137 /// until it completes.
138 ///
139 /// See [module level][mod] documentation for more details.
140 ///
141 /// **Note** that futures spawned in this manner do not "count" towards
142 /// keeping the runtime active for [`shutdown_on_idle`], since they are paired
143 /// with a `JoinHandle` for awaiting their completion. See [here] for
144 /// details on shutting down the compatibility runtime.
145 ///
146 /// [mod]: index.html
147 /// [`shutdown_on_idle`]: struct.Runtime.html#method.shutdown_on_idle
148 /// [here]: index.html#shutting-down
149 ///
150 /// # Examples
151 ///
152 /// ```
153 /// use tokio_compat::runtime::Runtime;
154 ///
155 /// # fn dox() {
156 /// // Create the runtime
157 /// let rt = Runtime::new().unwrap();
158 /// let executor = rt.executor();
159 ///
160 /// // Spawn a `std::future` future onto the runtime
161 /// executor.spawn_std(async {
162 /// println!("now running on a worker thread");
163 /// });
164 /// # }
165 /// ```
166 ///
167 /// # Panics
168 ///
169 /// This function panics if the spawn fails. Failure occurs if the executor
170 /// is currently at capacity and is unable to spawn a new future.
171 pub fn spawn_handle_std<F>(&self, future: F) -> JoinHandle<F::Output>
172 where
173 F: Future + Send + 'static,
174 F::Output: Send + 'static,
175 {
176 self.inner.inner.spawn(future)
177 }
178}
179
180impl<T> future_01::Executor<T> for TaskExecutor
181where
182 T: Future01<Item = (), Error = ()> + Send + 'static,
183{
184 fn execute(&self, future: T) -> Result<(), future_01::ExecuteError<T>> {
185 self.spawn(future);
186 Ok(())
187 }
188}
189
190impl Executor01 for TaskExecutor {
191 fn spawn(
192 &mut self,
193 future: Box<dyn Future01<Item = (), Error = ()> + Send>,
194 ) -> Result<(), executor_01::SpawnError> {
195 Executor01::spawn(&mut self.inner, future)
196 }
197}
198
199impl<T> executor_01::TypedExecutor<T> for TaskExecutor
200where
201 T: Future01<Item = (), Error = ()> + Send + 'static,
202{
203 fn spawn(&mut self, future: T) -> Result<(), executor_01::SpawnError> {
204 executor_01::TypedExecutor::spawn(&mut self.inner, future)
205 }
206}