tokio_compat/runtime/threadpool/
mod.rs

1mod builder;
2mod task_executor;
3
4#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
5pub use self::task_executor::TaskExecutor;
6#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
7pub use builder::Builder;
8
9use super::{
10    compat::{self, CompatSpawner},
11    idle,
12};
13
14use futures_01::future::Future as Future01;
15use futures_util::{compat::Future01CompatExt, future::FutureExt};
16use std::{
17    fmt,
18    future::Future,
19    io,
20    sync::{Arc, RwLock},
21};
22use tokio_02::{
23    runtime::{self, Handle},
24    task::JoinHandle,
25};
26use tokio_executor_01 as executor_01;
27use tokio_reactor_01 as reactor_01;
28
29use tokio_timer_02 as timer_02;
30
31/// A thread pool runtime that can run tasks that use both `tokio` 0.1 and
32/// `tokio` 0.2 APIs.
33///
34/// This functions similarly to the [`tokio::runtime::Runtime`][rt] struct in the
35/// `tokio` crate. However, unlike that runtime, the `tokio-compat` runtime is
36/// capable of running both `std::future::Future` tasks that use `tokio` 0.2
37/// runtime services. and `futures` 0.1 tasks that use `tokio` 0.1 runtime
38/// services.
39///
40/// [rt]: https://docs.rs/tokio/0.2.4/tokio/runtime/struct.Runtime.html
41#[derive(Debug)]
42#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
43pub struct Runtime {
44    /// The actual runtime. This is in an option so that it can be dropped when
45    /// shutting down.
46    inner: Option<Inner>,
47
48    /// Idleness tracking.
49    idle: idle::Idle,
50    idle_rx: idle::Rx,
51
52    // This should store the only long-living strong ref to the handle,
53    // and once the Runtime is dropped, it should also be deallocated.
54    compat_sender: Arc<RwLock<Option<CompatSpawner<tokio_02::runtime::Handle>>>>,
55}
56
57/// A future that resolves when the Tokio `Runtime` is shut down.
58#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
59pub struct Shutdown {
60    inner: Box<dyn Future01<Item = (), Error = ()> + Send + Sync>,
61}
62
63#[derive(Debug)]
64#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
65struct Inner {
66    runtime: runtime::Runtime,
67
68    /// Compatibility background thread.
69    ///
70    /// This maintains a `tokio` 0.1 timer and reactor to support running
71    /// futures that use older tokio APIs.
72    compat_bg: compat::Background,
73}
74
75// ===== impl Runtime =====
76
77/// Start the Tokio runtime using the supplied `futures` 0.1 future to bootstrap
78/// execution.
79///
80/// This function is used to bootstrap the execution of a Tokio application. It
81/// does the following:
82///
83/// * Start the Tokio runtime using a default configuration.
84/// * Spawn the given future onto the thread pool.
85/// * Block the current thread until the runtime shuts down.
86///
87/// Note that the function will not return immediately once `future` has
88/// completed. Instead it waits for the entire runtime to become idle.
89///
90/// See the [module level][mod] documentation for more details.
91///
92/// # Examples
93///
94/// ```rust
95/// use futures_01::{Future as Future01, Stream as Stream01};
96/// use tokio_01::net::TcpListener;
97///
98/// # fn process<T>(_: T) -> Box<dyn Future01<Item = (), Error = ()> + Send> {
99/// # unimplemented!();
100/// # }
101/// # fn dox() {
102/// # let addr = "127.0.0.1:8080".parse().unwrap();
103/// let listener = TcpListener::bind(&addr).unwrap();
104///
105/// let server = listener.incoming()
106///     .map_err(|e| println!("error = {:?}", e))
107///     .for_each(|socket| {
108///         tokio_01::spawn(process(socket))
109///     });
110///
111/// tokio_compat::run(server);
112/// # }
113/// # pub fn main() {}
114/// ```
115///
116/// # Panics
117///
118/// This function panics if called from the context of an executor.
119///
120/// [mod]: ../index.html
121#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
122pub fn run<F>(future: F)
123where
124    F: Future01<Item = (), Error = ()> + Send + 'static,
125{
126    run_std(future.compat().map(|_| ()))
127}
128
129/// Start the Tokio runtime using the supplied `std::future` future to bootstrap
130/// execution.
131///
132/// This function is used to bootstrap the execution of a Tokio application. It
133/// does the following:
134///
135/// * Start the Tokio runtime using a default configuration.
136/// * Spawn the given future onto the thread pool.
137/// * Block the current thread until the runtime shuts down.
138///
139/// Note that the function will not return immediately once `future` has
140/// completed. Instead it waits for the entire runtime to become idle.
141///
142/// See the [module level][mod] documentation for more details.
143///
144/// # Examples
145///
146/// ```rust
147/// use futures_01::{Future as Future01, Stream as Stream01};
148/// use tokio_01::net::TcpListener;
149///
150/// # fn process<T>(_: T) -> Box<dyn Future01<Item = (), Error = ()> + Send> {
151/// # unimplemented!();
152/// # }
153/// # fn dox() {
154/// # let addr = "127.0.0.1:8080".parse().unwrap();
155/// let listener = TcpListener::bind(&addr).unwrap();
156///
157/// let server = listener.incoming()
158///     .map_err(|e| println!("error = {:?}", e))
159///     .for_each(|socket| {
160///         tokio_01::spawn(process(socket))
161///     });
162///
163/// tokio_compat::run(server);
164/// # }
165/// # pub fn main() {}
166/// ```
167///
168/// # Panics
169///
170/// This function panics if called from the context of an executor.
171///
172/// [mod]: ../index.html
173#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
174pub fn run_std<F>(future: F)
175where
176    F: Future<Output = ()> + Send + 'static,
177{
178    let runtime = Runtime::new().expect("failed to start new Runtime");
179    runtime.spawn_std(future);
180    runtime.shutdown_on_idle().wait().unwrap();
181}
182
183impl Runtime {
184    /// Create a new runtime instance with default configuration values.
185    ///
186    /// This results in a reactor, thread pool, and timer being initialized. The
187    /// thread pool will not spawn any worker threads until it needs to, i.e.
188    /// tasks are scheduled to run.
189    ///
190    /// Most users will not need to call this function directly, instead they
191    /// will use [`tokio_compat::run`](fn.run.html).
192    ///
193    /// See [module level][mod] documentation for more details.
194    ///
195    /// # Examples
196    ///
197    /// Creating a new `Runtime` with default configuration values.
198    ///
199    /// ```
200    /// use tokio_compat::runtime::Runtime;
201    ///
202    /// let rt = Runtime::new()
203    ///     .unwrap();
204    ///
205    /// // Use the runtime...
206    /// ```
207    ///
208    /// [mod]: index.html
209    pub fn new() -> io::Result<Self> {
210        Builder::new().build()
211    }
212
213    /// Return a handle to the runtime's executor.
214    ///
215    /// The returned handle can be used to spawn both `futures` 0.1 and
216    /// `std::future` tasks that run on this runtime.
217    ///
218    /// # Examples
219    ///
220    /// ```
221    /// use tokio_compat::runtime::Runtime;
222    ///
223    /// let rt = Runtime::new()
224    ///     .unwrap();
225    ///
226    /// let executor_handle = rt.executor();
227    ///
228    /// // use `executor_handle`
229    /// ```
230    pub fn executor(&self) -> TaskExecutor {
231        let inner = self.spawner();
232        TaskExecutor { inner }
233    }
234
235    /// Spawn a `futures` 0.1 future onto the Tokio runtime.
236    ///
237    /// This spawns the given future onto the runtime's executor, usually a
238    /// thread pool. The thread pool is then responsible for polling the future
239    /// until it completes.
240    ///
241    /// See [module level][mod] documentation for more details.
242    ///
243    /// [mod]: index.html
244    ///
245    /// # Examples
246    ///
247    /// ```
248    /// use tokio_compat::runtime::Runtime;
249    /// use futures_01::future::Future;
250    ///
251    /// fn main() {
252    ///    // Create the runtime
253    ///    let rt = Runtime::new().unwrap();
254    ///
255    ///    // Spawn a future onto the runtime
256    ///    rt.spawn(futures_01::future::lazy(|| {
257    ///        println!("now running on a worker thread");
258    ///        Ok(())
259    ///    }));
260    ///
261    ///     rt.shutdown_on_idle()
262    ///         .wait()
263    ///         .unwrap();
264    /// }
265    /// ```
266    ///
267    /// # Panics
268    ///
269    /// This function panics if the spawn fails. Failure occurs if the executor
270    /// is currently at capacity and is unable to spawn a new future.
271    pub fn spawn<F>(&self, future: F) -> &Self
272    where
273        F: Future01<Item = (), Error = ()> + Send + 'static,
274    {
275        self.spawn_std(future.compat().map(|_| ()))
276    }
277
278    /// Spawn a `std::future` future onto the Tokio runtime.
279    ///
280    /// This spawns the given future onto the runtime's executor, usually a
281    /// thread pool. The thread pool is then responsible for polling the future
282    /// until it completes.
283    ///
284    /// See [module level][mod] documentation for more details.
285    ///
286    /// [mod]: index.html
287    ///
288    /// # Examples
289    ///
290    /// ```
291    /// use tokio_compat::runtime::Runtime;
292    /// use futures_01::future::Future;
293    ///
294    /// fn main() {
295    ///    // Create the runtime
296    ///    let rt = Runtime::new().unwrap();
297    ///
298    ///    // Spawn a future onto the runtime
299    ///    rt.spawn_std(async {
300    ///        println!("now running on a worker thread");
301    ///    });
302    ///
303    ///     rt.shutdown_on_idle()
304    ///         .wait()
305    ///         .unwrap();
306    /// }
307    /// ```
308    ///
309    /// # Panics
310    ///
311    /// This function panics if the spawn fails. Failure occurs if the executor
312    /// is currently at capacity and is unable to spawn a new future.
313    pub fn spawn_std<F>(&self, future: F) -> &Self
314    where
315        F: Future<Output = ()> + Send + 'static,
316    {
317        let idle = self.idle.reserve();
318        self.inner().runtime.spawn(idle.with(future));
319        self
320    }
321
322    /// Spawn a `futures` 0.1 future onto the Tokio runtime, returning a
323    /// `JoinHandle` that can be used to await its result.
324    ///
325    /// This spawns the given future onto the runtime's executor, usually a
326    /// thread pool. The thread pool is then responsible for polling the future
327    /// until it completes.
328    ///
329    /// **Note** that futures spawned in this manner do not "count" towards
330    /// `shutdown_on_idle`, since they are paired with a `JoinHandle` for
331    /// awaiting their completion. See [here] for details on shutting down
332    /// the compatibility runtime.
333    ///
334    /// See [module level][mod] documentation for more details.
335    ///
336    /// [mod]: index.html
337    /// [here]: index.html#shutting-down
338    ///
339    /// # Examples
340    ///
341    /// ```
342    /// use tokio_compat::runtime::Runtime;
343    /// # fn dox() {
344    /// // Create the runtime
345    /// let rt = Runtime::new().unwrap();
346    /// let executor = rt.executor();
347    ///
348    /// // Spawn a `futures` 0.1 future onto the runtime
349    /// executor.spawn(futures_01::future::lazy(|| {
350    ///     println!("now running on a worker thread");
351    ///     Ok(())
352    /// }));
353    /// # }
354    /// ```
355    ///
356    /// # Panics
357    ///
358    /// This function panics if the spawn fails. Failure occurs if the executor
359    /// is currently at capacity and is unable to spawn a new future.
360    pub fn spawn_handle<F>(&self, future: F) -> JoinHandle<Result<F::Item, F::Error>>
361    where
362        F: Future01 + Send + 'static,
363        F::Item: Send + 'static,
364        F::Error: Send + 'static,
365    {
366        let future = Box::pin(future.compat());
367        self.spawn_handle_std(future)
368    }
369
370    /// Spawn a `std::future` future onto the Tokio runtime, returning a
371    /// `JoinHandle` that can be used to await its result.
372    ///
373    /// This spawns the given future onto the runtime's executor, usually a
374    /// thread pool. The thread pool is then responsible for polling the future
375    /// until it completes.
376    ///
377    /// **Note** that futures spawned in this manner do not "count" towards
378    /// `shutdown_on_idle`, since they are paired with a `JoinHandle` for
379    /// awaiting their completion. See [here] for details on shutting down
380    /// the compatibility runtime.
381    ///
382    /// See [module level][mod] documentation for more details.
383    ///
384    /// [mod]: index.html
385    /// [here]: index.html#shutting-down
386    ///
387    /// # Examples
388    ///
389    /// ```
390    /// use tokio_compat::runtime::Runtime;
391    ///
392    /// # fn dox() {
393    /// // Create the runtime
394    /// let rt = Runtime::new().unwrap();
395    /// let executor = rt.executor();
396    ///
397    /// // Spawn a `std::future` future onto the runtime
398    /// executor.spawn_std(async {
399    ///     println!("now running on a worker thread");
400    /// });
401    /// # }
402    /// ```
403    pub fn spawn_handle_std<F>(&self, future: F) -> JoinHandle<F::Output>
404    where
405        F: Future + Send + 'static,
406        F::Output: Send + 'static,
407    {
408        self.inner().runtime.spawn(future)
409    }
410
411    /// Run a `futures` 0.1 future to completion on the Tokio runtime.
412    ///
413    /// This runs the given future on the runtime, blocking until it is
414    /// complete, and yielding its resolved result. Any tasks or timers which
415    /// the future spawns internally will be executed on the runtime.
416    ///
417    /// This method should not be called from an asynchronous context.
418    ///
419    /// # Panics
420    ///
421    /// This function panics if the executor is at capacity, if the provided
422    /// future panics, or if called within an asynchronous execution context.
423    pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, F::Error>
424    where
425        F: Future01,
426    {
427        self.block_on_std(future.compat())
428    }
429
430    /// Run a `std::future` future to completion on the Tokio runtime.
431    ///
432    /// This runs the given future on the runtime, blocking until it is
433    /// complete, and yielding its resolved result. Any tasks or timers which
434    /// the future spawns internally will be executed on the runtime.
435    ///
436    /// This method should not be called from an asynchronous context.
437    ///
438    /// # Panics
439    ///
440    /// This function panics if the executor is at capacity, if the provided
441    /// future panics, or if called within an asynchronous execution context.
442    pub fn block_on_std<F>(&mut self, future: F) -> F::Output
443    where
444        F: Future,
445    {
446        let idle = self.idle.reserve();
447        let spawner = self.spawner();
448        let inner = self.inner_mut();
449        let compat = &inner.compat_bg;
450        let _timer = timer_02::timer::set_default(compat.timer());
451        let _reactor = reactor_01::set_default(compat.reactor());
452        let _executor = executor_01::set_default(spawner);
453        inner.runtime.block_on(idle.with(future))
454    }
455
456    /// Signals the runtime to shutdown once it becomes idle.
457    ///
458    /// Blocks the current thread until the shutdown operation has completed.
459    /// This function can be used to perform a graceful shutdown of the runtime.
460    ///
461    /// The runtime enters an idle state once **all** of the following occur.
462    ///
463    /// * The thread pool has no tasks to execute, i.e., all tasks that were
464    ///   spawned have completed.
465    /// * The reactor is not managing any I/O resources.
466    ///
467    /// See [module level][mod] documentation for more details.
468    ///
469    /// **Note**: tasks spawned with associated [`JoinHandle`]s do _not_ "count"
470    /// towards `shutdown_on_idle`. Since `shutdown_on_idle` does not exist in
471    /// `tokio` 0.2, this is intended as a _transitional_ API; its use should be
472    /// phased out in favor of waiting on `JoinHandle`s.
473    ///
474    /// # Examples
475    ///
476    /// ```
477    /// use tokio_compat::runtime::Runtime;
478    /// use futures_01::future::Future;
479    ///
480    /// let rt = Runtime::new()
481    ///     .unwrap();
482    ///
483    /// // Use the runtime...
484    /// # rt.spawn_std(async {});
485    ///
486    /// // Shutdown the runtime
487    /// rt.shutdown_on_idle()
488    ///     .wait()
489    ///     .unwrap();
490    /// ```
491    ///
492    /// [mod]: index.html
493    /// [`JoinHandle`]: https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html
494    pub fn shutdown_on_idle(mut self) -> Shutdown {
495        let spawner = self.spawner();
496        let Inner {
497            compat_bg,
498            mut runtime,
499        } = self.inner.take().expect("runtime is only shut down once");
500        let _timer = timer_02::timer::set_default(compat_bg.timer());
501        let _reactor = reactor_01::set_default(compat_bg.reactor());
502        let _executor = executor_01::set_default(spawner);
503        runtime.block_on(self.idle_rx.idle());
504        let f = futures_01::future::lazy(move || Ok(()));
505
506        Shutdown { inner: Box::new(f) }
507    }
508
509    /// Signals the runtime to shutdown immediately.
510    ///
511    /// Blocks the current thread until the shutdown operation has completed.
512    /// This function will forcibly shutdown the runtime, causing any
513    /// in-progress work to become canceled.
514    ///
515    /// The shutdown steps are:
516    ///
517    /// * Drain any scheduled work queues.
518    /// * Drop any futures that have not yet completed.
519    /// * Drop the reactor.
520    ///
521    /// Once the reactor has dropped, any outstanding I/O resources bound to
522    /// that reactor will no longer function. Calling any method on them will
523    /// result in an error.
524    ///
525    /// See [module level][mod] documentation for more details.
526    ///
527    /// # Examples
528    ///
529    /// ```
530    /// use tokio_compat::runtime::Runtime;
531    /// use futures_01::future::Future;
532    ///
533    /// let rt = Runtime::new()
534    ///     .unwrap();
535    ///
536    /// // Use the runtime...
537    ///
538    /// // Shutdown the runtime
539    /// rt.shutdown_now()
540    ///     .wait()
541    ///     .unwrap();
542    /// ```
543    ///
544    /// [mod]: index.html
545    pub fn shutdown_now(mut self) -> Shutdown {
546        drop(self.inner.take().unwrap());
547        let f = futures_01::future::lazy(move || Ok(()));
548
549        Shutdown { inner: Box::new(f) }
550    }
551
552    fn spawner(&self) -> CompatSpawner<Handle> {
553        CompatSpawner {
554            inner: self.inner().runtime.handle().clone(),
555            idle: self.idle.clone(),
556        }
557    }
558
559    fn inner(&self) -> &Inner {
560        self.inner.as_ref().unwrap()
561    }
562
563    fn inner_mut(&mut self) -> &mut Inner {
564        self.inner.as_mut().unwrap()
565    }
566
567    /// Enter the runtime context
568    pub fn enter<F, R>(&self, f: F) -> R
569    where
570        F: FnOnce() -> R,
571    {
572        let spawner = self.spawner();
573        let inner = self.inner();
574        let compat = &inner.compat_bg;
575        let _timer = timer_02::timer::set_default(compat.timer());
576        let _reactor = reactor_01::set_default(compat.reactor());
577        let _executor = executor_01::set_default(spawner);
578        inner.runtime.enter(f)
579    }
580}
581
582impl Drop for Runtime {
583    fn drop(&mut self) {
584        if let Some(inner) = self.inner.take() {
585            drop(inner);
586        }
587    }
588}
589
590impl Future01 for Shutdown {
591    type Item = ();
592    type Error = ();
593    fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
594        self.inner.poll()
595    }
596}
597
598impl fmt::Debug for Shutdown {
599    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
600        f.pad("Shutdown { .. }")
601    }
602}