futures_util/compat/
executor.rs

1use super::{Compat, Future01CompatExt};
2use crate::{
3    future::{FutureExt, TryFutureExt, UnitError},
4    task::SpawnExt,
5};
6use futures_01::future::{ExecuteError as ExecuteError01, Executor as Executor01};
7use futures_01::Future as Future01;
8use futures_task::{FutureObj, Spawn as Spawn03, SpawnError as SpawnError03};
9
10/// A future that can run on a futures 0.1
11/// [`Executor`](futures_01::future::Executor).
12pub type Executor01Future = Compat<UnitError<FutureObj<'static, ()>>>;
13
14/// Extension trait for futures 0.1 [`Executor`](futures_01::future::Executor).
15pub trait Executor01CompatExt: Executor01<Executor01Future> + Clone + Send + 'static {
16    /// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
17    /// futures 0.3 [`Spawn`](futures_task::Spawn).
18    ///
19    /// ```
20    /// # if cfg!(miri) { return; } // Miri does not support epoll
21    /// use futures::task::SpawnExt;
22    /// use futures::future::{FutureExt, TryFutureExt};
23    /// use futures_util::compat::Executor01CompatExt;
24    /// use tokio::executor::DefaultExecutor;
25    ///
26    /// # let (tx, rx) = futures::channel::oneshot::channel();
27    ///
28    /// let spawner = DefaultExecutor::current().compat();
29    /// let future03 = async move {
30    ///     println!("Running on the pool");
31    ///     spawner.spawn(async {
32    ///         println!("Spawned!");
33    ///         # tx.send(42).unwrap();
34    ///     }).unwrap();
35    /// };
36    ///
37    /// let future01 = future03.unit_error().boxed().compat();
38    ///
39    /// tokio::run(future01);
40    /// # futures::executor::block_on(rx).unwrap();
41    /// ```
42    fn compat(self) -> Executor01As03<Self>
43    where
44        Self: Sized;
45}
46
47impl<Ex> Executor01CompatExt for Ex
48where
49    Ex: Executor01<Executor01Future> + Clone + Send + 'static,
50{
51    fn compat(self) -> Executor01As03<Self> {
52        Executor01As03 { executor01: self }
53    }
54}
55
56/// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
57/// futures 0.3 [`Spawn`](futures_task::Spawn).
58#[derive(Debug, Clone)]
59pub struct Executor01As03<Ex> {
60    executor01: Ex,
61}
62
63impl<Ex> Spawn03 for Executor01As03<Ex>
64where
65    Ex: Executor01<Executor01Future> + Clone + Send + 'static,
66{
67    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError03> {
68        let future = future.unit_error().compat();
69
70        self.executor01.execute(future).map_err(|_| SpawnError03::shutdown())
71    }
72}
73
74#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
75impl<Sp, Fut> Executor01<Fut> for Compat<Sp>
76where
77    for<'a> &'a Sp: Spawn03,
78    Fut: Future01<Item = (), Error = ()> + Send + 'static,
79{
80    fn execute(&self, future: Fut) -> Result<(), ExecuteError01<Fut>> {
81        (&self.inner)
82            .spawn(future.compat().map(|_| ()))
83            .expect("unable to spawn future from Compat executor");
84        Ok(())
85    }
86}