tokio_compat/runtime/threadpool/mod.rs
1mod builder;
2mod task_executor;
3
4#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
5pub use self::task_executor::TaskExecutor;
6#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
7pub use builder::Builder;
8
9use super::{
10 compat::{self, CompatSpawner},
11 idle,
12};
13
14use futures_01::future::Future as Future01;
15use futures_util::{compat::Future01CompatExt, future::FutureExt};
16use std::{
17 fmt,
18 future::Future,
19 io,
20 sync::{Arc, RwLock},
21};
22use tokio_02::{
23 runtime::{self, Handle},
24 task::JoinHandle,
25};
26use tokio_executor_01 as executor_01;
27use tokio_reactor_01 as reactor_01;
28
29use tokio_timer_02 as timer_02;
30
/// A thread pool runtime that can run tasks that use both `tokio` 0.1 and
/// `tokio` 0.2 APIs.
///
/// This functions similarly to the [`tokio::runtime::Runtime`][rt] struct in the
/// `tokio` crate. However, unlike that runtime, the `tokio-compat` runtime is
/// capable of running both `std::future::Future` tasks that use `tokio` 0.2
/// runtime services, and `futures` 0.1 tasks that use `tokio` 0.1 runtime
/// services.
///
/// [rt]: https://docs.rs/tokio/0.2.4/tokio/runtime/struct.Runtime.html
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
pub struct Runtime {
    /// The actual runtime. This is in an option so that it can be dropped when
    /// shutting down.
    inner: Option<Inner>,

    /// Idleness tracking. A reservation is taken from this for every future
    /// passed to `spawn_std` and `block_on_std`, and a clone is handed to each
    /// spawner created by this runtime.
    idle: idle::Idle,
    /// Receiving side of the idleness tracking; `shutdown_on_idle` blocks on
    /// it before returning.
    idle_rx: idle::Rx,

    // This should store the only long-living strong ref to the handle,
    // and once the Runtime is dropped, it should also be deallocated.
    compat_sender: Arc<RwLock<Option<CompatSpawner<tokio_02::runtime::Handle>>>>,
}
56
/// A future that resolves when the Tokio `Runtime` is shut down.
///
/// Values of this type are returned by `Runtime::shutdown_on_idle` and
/// `Runtime::shutdown_now`.
#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
pub struct Shutdown {
    /// Type-erased `futures` 0.1 future that `poll` forwards to.
    inner: Box<dyn Future01<Item = (), Error = ()> + Send + Sync>,
}
62
/// The parts of a `Runtime` that are taken out of it and dropped when the
/// runtime shuts down.
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
struct Inner {
    /// The underlying `tokio` 0.2 runtime that futures are spawned onto and
    /// blocked on.
    runtime: runtime::Runtime,

    /// Compatibility background thread.
    ///
    /// This maintains a `tokio` 0.1 timer and reactor to support running
    /// futures that use older tokio APIs.
    compat_bg: compat::Background,
}
74
75// ===== impl Runtime =====
76
77/// Start the Tokio runtime using the supplied `futures` 0.1 future to bootstrap
78/// execution.
79///
80/// This function is used to bootstrap the execution of a Tokio application. It
81/// does the following:
82///
83/// * Start the Tokio runtime using a default configuration.
84/// * Spawn the given future onto the thread pool.
85/// * Block the current thread until the runtime shuts down.
86///
87/// Note that the function will not return immediately once `future` has
88/// completed. Instead it waits for the entire runtime to become idle.
89///
90/// See the [module level][mod] documentation for more details.
91///
92/// # Examples
93///
94/// ```rust
95/// use futures_01::{Future as Future01, Stream as Stream01};
96/// use tokio_01::net::TcpListener;
97///
98/// # fn process<T>(_: T) -> Box<dyn Future01<Item = (), Error = ()> + Send> {
99/// # unimplemented!();
100/// # }
101/// # fn dox() {
102/// # let addr = "127.0.0.1:8080".parse().unwrap();
103/// let listener = TcpListener::bind(&addr).unwrap();
104///
105/// let server = listener.incoming()
106/// .map_err(|e| println!("error = {:?}", e))
107/// .for_each(|socket| {
108/// tokio_01::spawn(process(socket))
109/// });
110///
111/// tokio_compat::run(server);
112/// # }
113/// # pub fn main() {}
114/// ```
115///
116/// # Panics
117///
118/// This function panics if called from the context of an executor.
119///
120/// [mod]: ../index.html
121#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
122pub fn run<F>(future: F)
123where
124 F: Future01<Item = (), Error = ()> + Send + 'static,
125{
126 run_std(future.compat().map(|_| ()))
127}
128
129/// Start the Tokio runtime using the supplied `std::future` future to bootstrap
130/// execution.
131///
132/// This function is used to bootstrap the execution of a Tokio application. It
133/// does the following:
134///
135/// * Start the Tokio runtime using a default configuration.
136/// * Spawn the given future onto the thread pool.
137/// * Block the current thread until the runtime shuts down.
138///
139/// Note that the function will not return immediately once `future` has
140/// completed. Instead it waits for the entire runtime to become idle.
141///
142/// See the [module level][mod] documentation for more details.
143///
144/// # Examples
145///
146/// ```rust
147/// use futures_01::{Future as Future01, Stream as Stream01};
148/// use tokio_01::net::TcpListener;
149///
150/// # fn process<T>(_: T) -> Box<dyn Future01<Item = (), Error = ()> + Send> {
151/// # unimplemented!();
152/// # }
153/// # fn dox() {
154/// # let addr = "127.0.0.1:8080".parse().unwrap();
155/// let listener = TcpListener::bind(&addr).unwrap();
156///
157/// let server = listener.incoming()
158/// .map_err(|e| println!("error = {:?}", e))
159/// .for_each(|socket| {
160/// tokio_01::spawn(process(socket))
161/// });
162///
163/// tokio_compat::run(server);
164/// # }
165/// # pub fn main() {}
166/// ```
167///
168/// # Panics
169///
170/// This function panics if called from the context of an executor.
171///
172/// [mod]: ../index.html
173#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
174pub fn run_std<F>(future: F)
175where
176 F: Future<Output = ()> + Send + 'static,
177{
178 let runtime = Runtime::new().expect("failed to start new Runtime");
179 runtime.spawn_std(future);
180 runtime.shutdown_on_idle().wait().unwrap();
181}
182
183impl Runtime {
184 /// Create a new runtime instance with default configuration values.
185 ///
186 /// This results in a reactor, thread pool, and timer being initialized. The
187 /// thread pool will not spawn any worker threads until it needs to, i.e.
188 /// tasks are scheduled to run.
189 ///
190 /// Most users will not need to call this function directly, instead they
191 /// will use [`tokio_compat::run`](fn.run.html).
192 ///
193 /// See [module level][mod] documentation for more details.
194 ///
195 /// # Examples
196 ///
197 /// Creating a new `Runtime` with default configuration values.
198 ///
199 /// ```
200 /// use tokio_compat::runtime::Runtime;
201 ///
202 /// let rt = Runtime::new()
203 /// .unwrap();
204 ///
205 /// // Use the runtime...
206 /// ```
207 ///
208 /// [mod]: index.html
    pub fn new() -> io::Result<Self> {
        // Delegate to a `Builder` left at its defaults; whatever `build`
        // returns (including an I/O error) is passed straight to the caller.
        Builder::new().build()
    }
212
213 /// Return a handle to the runtime's executor.
214 ///
215 /// The returned handle can be used to spawn both `futures` 0.1 and
216 /// `std::future` tasks that run on this runtime.
217 ///
218 /// # Examples
219 ///
220 /// ```
221 /// use tokio_compat::runtime::Runtime;
222 ///
223 /// let rt = Runtime::new()
224 /// .unwrap();
225 ///
226 /// let executor_handle = rt.executor();
227 ///
228 /// // use `executor_handle`
229 /// ```
230 pub fn executor(&self) -> TaskExecutor {
231 let inner = self.spawner();
232 TaskExecutor { inner }
233 }
234
235 /// Spawn a `futures` 0.1 future onto the Tokio runtime.
236 ///
237 /// This spawns the given future onto the runtime's executor, usually a
238 /// thread pool. The thread pool is then responsible for polling the future
239 /// until it completes.
240 ///
241 /// See [module level][mod] documentation for more details.
242 ///
243 /// [mod]: index.html
244 ///
245 /// # Examples
246 ///
247 /// ```
248 /// use tokio_compat::runtime::Runtime;
249 /// use futures_01::future::Future;
250 ///
251 /// fn main() {
252 /// // Create the runtime
253 /// let rt = Runtime::new().unwrap();
254 ///
255 /// // Spawn a future onto the runtime
256 /// rt.spawn(futures_01::future::lazy(|| {
257 /// println!("now running on a worker thread");
258 /// Ok(())
259 /// }));
260 ///
261 /// rt.shutdown_on_idle()
262 /// .wait()
263 /// .unwrap();
264 /// }
265 /// ```
266 ///
267 /// # Panics
268 ///
269 /// This function panics if the spawn fails. Failure occurs if the executor
270 /// is currently at capacity and is unable to spawn a new future.
271 pub fn spawn<F>(&self, future: F) -> &Self
272 where
273 F: Future01<Item = (), Error = ()> + Send + 'static,
274 {
275 self.spawn_std(future.compat().map(|_| ()))
276 }
277
278 /// Spawn a `std::future` future onto the Tokio runtime.
279 ///
280 /// This spawns the given future onto the runtime's executor, usually a
281 /// thread pool. The thread pool is then responsible for polling the future
282 /// until it completes.
283 ///
284 /// See [module level][mod] documentation for more details.
285 ///
286 /// [mod]: index.html
287 ///
288 /// # Examples
289 ///
290 /// ```
291 /// use tokio_compat::runtime::Runtime;
292 /// use futures_01::future::Future;
293 ///
294 /// fn main() {
295 /// // Create the runtime
296 /// let rt = Runtime::new().unwrap();
297 ///
298 /// // Spawn a future onto the runtime
299 /// rt.spawn_std(async {
300 /// println!("now running on a worker thread");
301 /// });
302 ///
303 /// rt.shutdown_on_idle()
304 /// .wait()
305 /// .unwrap();
306 /// }
307 /// ```
308 ///
309 /// # Panics
310 ///
311 /// This function panics if the spawn fails. Failure occurs if the executor
312 /// is currently at capacity and is unable to spawn a new future.
313 pub fn spawn_std<F>(&self, future: F) -> &Self
314 where
315 F: Future<Output = ()> + Send + 'static,
316 {
317 let idle = self.idle.reserve();
318 self.inner().runtime.spawn(idle.with(future));
319 self
320 }
321
322 /// Spawn a `futures` 0.1 future onto the Tokio runtime, returning a
323 /// `JoinHandle` that can be used to await its result.
324 ///
325 /// This spawns the given future onto the runtime's executor, usually a
326 /// thread pool. The thread pool is then responsible for polling the future
327 /// until it completes.
328 ///
329 /// **Note** that futures spawned in this manner do not "count" towards
330 /// `shutdown_on_idle`, since they are paired with a `JoinHandle` for
331 /// awaiting their completion. See [here] for details on shutting down
332 /// the compatibility runtime.
333 ///
334 /// See [module level][mod] documentation for more details.
335 ///
336 /// [mod]: index.html
337 /// [here]: index.html#shutting-down
338 ///
339 /// # Examples
340 ///
341 /// ```
342 /// use tokio_compat::runtime::Runtime;
343 /// # fn dox() {
344 /// // Create the runtime
345 /// let rt = Runtime::new().unwrap();
346 /// let executor = rt.executor();
347 ///
348 /// // Spawn a `futures` 0.1 future onto the runtime
349 /// executor.spawn(futures_01::future::lazy(|| {
350 /// println!("now running on a worker thread");
351 /// Ok(())
352 /// }));
353 /// # }
354 /// ```
355 ///
356 /// # Panics
357 ///
358 /// This function panics if the spawn fails. Failure occurs if the executor
359 /// is currently at capacity and is unable to spawn a new future.
360 pub fn spawn_handle<F>(&self, future: F) -> JoinHandle<Result<F::Item, F::Error>>
361 where
362 F: Future01 + Send + 'static,
363 F::Item: Send + 'static,
364 F::Error: Send + 'static,
365 {
366 let future = Box::pin(future.compat());
367 self.spawn_handle_std(future)
368 }
369
370 /// Spawn a `std::future` future onto the Tokio runtime, returning a
371 /// `JoinHandle` that can be used to await its result.
372 ///
373 /// This spawns the given future onto the runtime's executor, usually a
374 /// thread pool. The thread pool is then responsible for polling the future
375 /// until it completes.
376 ///
377 /// **Note** that futures spawned in this manner do not "count" towards
378 /// `shutdown_on_idle`, since they are paired with a `JoinHandle` for
379 /// awaiting their completion. See [here] for details on shutting down
380 /// the compatibility runtime.
381 ///
382 /// See [module level][mod] documentation for more details.
383 ///
384 /// [mod]: index.html
385 /// [here]: index.html#shutting-down
386 ///
387 /// # Examples
388 ///
389 /// ```
390 /// use tokio_compat::runtime::Runtime;
391 ///
392 /// # fn dox() {
393 /// // Create the runtime
394 /// let rt = Runtime::new().unwrap();
    ///
    /// // Spawn a `std::future` future onto the runtime, keeping its `JoinHandle`
    /// let _handle = rt.spawn_handle_std(async {
    ///     println!("now running on a worker thread");
    /// });
401 /// # }
402 /// ```
403 pub fn spawn_handle_std<F>(&self, future: F) -> JoinHandle<F::Output>
404 where
405 F: Future + Send + 'static,
406 F::Output: Send + 'static,
407 {
408 self.inner().runtime.spawn(future)
409 }
410
411 /// Run a `futures` 0.1 future to completion on the Tokio runtime.
412 ///
413 /// This runs the given future on the runtime, blocking until it is
414 /// complete, and yielding its resolved result. Any tasks or timers which
415 /// the future spawns internally will be executed on the runtime.
416 ///
417 /// This method should not be called from an asynchronous context.
418 ///
419 /// # Panics
420 ///
421 /// This function panics if the executor is at capacity, if the provided
422 /// future panics, or if called within an asynchronous execution context.
423 pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, F::Error>
424 where
425 F: Future01,
426 {
427 self.block_on_std(future.compat())
428 }
429
430 /// Run a `std::future` future to completion on the Tokio runtime.
431 ///
432 /// This runs the given future on the runtime, blocking until it is
433 /// complete, and yielding its resolved result. Any tasks or timers which
434 /// the future spawns internally will be executed on the runtime.
435 ///
436 /// This method should not be called from an asynchronous context.
437 ///
438 /// # Panics
439 ///
440 /// This function panics if the executor is at capacity, if the provided
441 /// future panics, or if called within an asynchronous execution context.
442 pub fn block_on_std<F>(&mut self, future: F) -> F::Output
443 where
444 F: Future,
445 {
446 let idle = self.idle.reserve();
447 let spawner = self.spawner();
448 let inner = self.inner_mut();
449 let compat = &inner.compat_bg;
450 let _timer = timer_02::timer::set_default(compat.timer());
451 let _reactor = reactor_01::set_default(compat.reactor());
452 let _executor = executor_01::set_default(spawner);
453 inner.runtime.block_on(idle.with(future))
454 }
455
456 /// Signals the runtime to shutdown once it becomes idle.
457 ///
458 /// Blocks the current thread until the shutdown operation has completed.
459 /// This function can be used to perform a graceful shutdown of the runtime.
460 ///
461 /// The runtime enters an idle state once **all** of the following occur.
462 ///
463 /// * The thread pool has no tasks to execute, i.e., all tasks that were
464 /// spawned have completed.
465 /// * The reactor is not managing any I/O resources.
466 ///
467 /// See [module level][mod] documentation for more details.
468 ///
469 /// **Note**: tasks spawned with associated [`JoinHandle`]s do _not_ "count"
470 /// towards `shutdown_on_idle`. Since `shutdown_on_idle` does not exist in
471 /// `tokio` 0.2, this is intended as a _transitional_ API; its use should be
472 /// phased out in favor of waiting on `JoinHandle`s.
473 ///
474 /// # Examples
475 ///
476 /// ```
477 /// use tokio_compat::runtime::Runtime;
478 /// use futures_01::future::Future;
479 ///
480 /// let rt = Runtime::new()
481 /// .unwrap();
482 ///
483 /// // Use the runtime...
484 /// # rt.spawn_std(async {});
485 ///
486 /// // Shutdown the runtime
487 /// rt.shutdown_on_idle()
488 /// .wait()
489 /// .unwrap();
490 /// ```
491 ///
492 /// [mod]: index.html
493 /// [`JoinHandle`]: https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html
494 pub fn shutdown_on_idle(mut self) -> Shutdown {
495 let spawner = self.spawner();
496 let Inner {
497 compat_bg,
498 mut runtime,
499 } = self.inner.take().expect("runtime is only shut down once");
500 let _timer = timer_02::timer::set_default(compat_bg.timer());
501 let _reactor = reactor_01::set_default(compat_bg.reactor());
502 let _executor = executor_01::set_default(spawner);
503 runtime.block_on(self.idle_rx.idle());
504 let f = futures_01::future::lazy(move || Ok(()));
505
506 Shutdown { inner: Box::new(f) }
507 }
508
509 /// Signals the runtime to shutdown immediately.
510 ///
511 /// Blocks the current thread until the shutdown operation has completed.
512 /// This function will forcibly shutdown the runtime, causing any
513 /// in-progress work to become canceled.
514 ///
515 /// The shutdown steps are:
516 ///
517 /// * Drain any scheduled work queues.
518 /// * Drop any futures that have not yet completed.
519 /// * Drop the reactor.
520 ///
521 /// Once the reactor has dropped, any outstanding I/O resources bound to
522 /// that reactor will no longer function. Calling any method on them will
523 /// result in an error.
524 ///
525 /// See [module level][mod] documentation for more details.
526 ///
527 /// # Examples
528 ///
529 /// ```
530 /// use tokio_compat::runtime::Runtime;
531 /// use futures_01::future::Future;
532 ///
533 /// let rt = Runtime::new()
534 /// .unwrap();
535 ///
536 /// // Use the runtime...
537 ///
538 /// // Shutdown the runtime
539 /// rt.shutdown_now()
540 /// .wait()
541 /// .unwrap();
542 /// ```
543 ///
544 /// [mod]: index.html
545 pub fn shutdown_now(mut self) -> Shutdown {
546 drop(self.inner.take().unwrap());
547 let f = futures_01::future::lazy(move || Ok(()));
548
549 Shutdown { inner: Box::new(f) }
550 }
551
552 fn spawner(&self) -> CompatSpawner<Handle> {
553 CompatSpawner {
554 inner: self.inner().runtime.handle().clone(),
555 idle: self.idle.clone(),
556 }
557 }
558
559 fn inner(&self) -> &Inner {
560 self.inner.as_ref().unwrap()
561 }
562
563 fn inner_mut(&mut self) -> &mut Inner {
564 self.inner.as_mut().unwrap()
565 }
566
567 /// Enter the runtime context
568 pub fn enter<F, R>(&self, f: F) -> R
569 where
570 F: FnOnce() -> R,
571 {
572 let spawner = self.spawner();
573 let inner = self.inner();
574 let compat = &inner.compat_bg;
575 let _timer = timer_02::timer::set_default(compat.timer());
576 let _reactor = reactor_01::set_default(compat.reactor());
577 let _executor = executor_01::set_default(spawner);
578 inner.runtime.enter(f)
579 }
580}
581
582impl Drop for Runtime {
583 fn drop(&mut self) {
584 if let Some(inner) = self.inner.take() {
585 drop(inner);
586 }
587 }
588}
589
590impl Future01 for Shutdown {
591 type Item = ();
592 type Error = ();
593 fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
594 self.inner.poll()
595 }
596}
597
598impl fmt::Debug for Shutdown {
599 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
600 f.pad("Shutdown { .. }")
601 }
602}