tokio_util/
context.rs

1//! Tokio context aware futures utilities.
2//!
3//! This module includes utilities around integrating tokio with other runtimes
4//! by allowing the context to be attached to futures. This allows spawning
5//! futures on other executors while still using tokio to drive them. This
6//! can be useful if you need to use a tokio based library in an executor/runtime
7//! that does not provide a tokio context.
8
9use pin_project_lite::pin_project;
10use std::{
11    future::Future,
12    pin::Pin,
13    task::{Context, Poll},
14};
15use tokio::runtime::{Handle, Runtime};
16
17pin_project! {
18    /// `TokioContext` allows running futures that must be inside Tokio's
19    /// context on a non-Tokio runtime.
20    ///
21    /// It contains a [`Handle`] to the runtime. A handle to the runtime can be
22    /// obtain by calling the [`Runtime::handle()`] method.
23    ///
24    /// Note that the `TokioContext` wrapper only works if the `Runtime` it is
25    /// connected to has not yet been destroyed. You must keep the `Runtime`
26    /// alive until the future has finished executing.
27    ///
28    /// **Warning:** If `TokioContext` is used together with a [current thread]
29    /// runtime, that runtime must be inside a call to `block_on` for the
30    /// wrapped future to work. For this reason, it is recommended to use a
31    /// [multi thread] runtime, even if you configure it to only spawn one
32    /// worker thread.
33    ///
34    /// # Examples
35    ///
36    /// This example creates two runtimes, but only [enables time] on one of
37    /// them. It then uses the context of the runtime with the timer enabled to
38    /// execute a [`sleep`] future on the runtime with timing disabled.
39    /// ```
40    /// # #[cfg(not(target_family = "wasm"))]
41    /// # {
42    /// use tokio::time::{sleep, Duration};
43    /// use tokio_util::context::RuntimeExt;
44    ///
45    /// // This runtime has timers enabled.
46    /// let rt = tokio::runtime::Builder::new_multi_thread()
47    ///     .enable_all()
48    ///     .build()
49    ///     .unwrap();
50    ///
51    /// // This runtime has timers disabled.
52    /// let rt2 = tokio::runtime::Builder::new_multi_thread()
53    ///     .build()
54    ///     .unwrap();
55    ///
56    /// // Wrap the sleep future in the context of rt.
57    /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await });
58    ///
59    /// // Execute the future on rt2.
60    /// rt2.block_on(fut);
61    /// # }
62    /// ```
63    ///
64    /// [`Handle`]: struct@tokio::runtime::Handle
65    /// [`Runtime::handle()`]: fn@tokio::runtime::Runtime::handle
66    /// [`RuntimeExt`]: trait@crate::context::RuntimeExt
67    /// [`new_static`]: fn@Self::new_static
68    /// [`sleep`]: fn@tokio::time::sleep
69    /// [current thread]: fn@tokio::runtime::Builder::new_current_thread
70    /// [enables time]: fn@tokio::runtime::Builder::enable_time
71    /// [multi thread]: fn@tokio::runtime::Builder::new_multi_thread
72    pub struct TokioContext<F> {
73        #[pin]
74        inner: F,
75        handle: Handle,
76    }
77}
78
79impl<F> TokioContext<F> {
80    /// Associate the provided future with the context of the runtime behind
81    /// the provided `Handle`.
82    ///
83    /// This constructor uses a `'static` lifetime to opt-out of checking that
84    /// the runtime still exists.
85    ///
86    /// # Examples
87    ///
88    /// This is the same as the example above, but uses the `new` constructor
89    /// rather than [`RuntimeExt::wrap`].
90    ///
91    /// [`RuntimeExt::wrap`]: fn@RuntimeExt::wrap
92    ///
93    /// ```
94    /// # #[cfg(not(target_family = "wasm"))]
95    /// # {
96    /// use tokio::time::{sleep, Duration};
97    /// use tokio_util::context::TokioContext;
98    ///
99    /// // This runtime has timers enabled.
100    /// let rt = tokio::runtime::Builder::new_multi_thread()
101    ///     .enable_all()
102    ///     .build()
103    ///     .unwrap();
104    ///
105    /// // This runtime has timers disabled.
106    /// let rt2 = tokio::runtime::Builder::new_multi_thread()
107    ///     .build()
108    ///     .unwrap();
109    ///
110    /// let fut = TokioContext::new(
111    ///     async { sleep(Duration::from_millis(2)).await },
112    ///     rt.handle().clone(),
113    /// );
114    ///
115    /// // Execute the future on rt2.
116    /// rt2.block_on(fut);
117    /// # }
118    /// ```
119    pub fn new(future: F, handle: Handle) -> TokioContext<F> {
120        TokioContext {
121            inner: future,
122            handle,
123        }
124    }
125
126    /// Obtain a reference to the handle inside this `TokioContext`.
127    pub fn handle(&self) -> &Handle {
128        &self.handle
129    }
130
131    /// Remove the association between the Tokio runtime and the wrapped future.
132    pub fn into_inner(self) -> F {
133        self.inner
134    }
135}
136
137impl<F: Future> Future for TokioContext<F> {
138    type Output = F::Output;
139
140    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
141        let me = self.project();
142        let handle = me.handle;
143        let fut = me.inner;
144
145        let _enter = handle.enter();
146        fut.poll(cx)
147    }
148}
149
150/// Extension trait that simplifies bundling a `Handle` with a `Future`.
151pub trait RuntimeExt {
152    /// Create a [`TokioContext`] that wraps the provided future and runs it in
153    /// this runtime's context.
154    ///
155    /// # Examples
156    ///
157    /// This example creates two runtimes, but only [enables time] on one of
158    /// them. It then uses the context of the runtime with the timer enabled to
159    /// execute a [`sleep`] future on the runtime with timing disabled.
160    ///
161    /// ```
162    /// # #[cfg(not(target_family = "wasm"))]
163    /// # {
164    /// use tokio::time::{sleep, Duration};
165    /// use tokio_util::context::RuntimeExt;
166    ///
167    /// // This runtime has timers enabled.
168    /// let rt = tokio::runtime::Builder::new_multi_thread()
169    ///     .enable_all()
170    ///     .build()
171    ///     .unwrap();
172    ///
173    /// // This runtime has timers disabled.
174    /// let rt2 = tokio::runtime::Builder::new_multi_thread()
175    ///     .build()
176    ///     .unwrap();
177    ///
178    /// // Wrap the sleep future in the context of rt.
179    /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await });
180    ///
181    /// // Execute the future on rt2.
182    /// rt2.block_on(fut);
183    /// # }
184    /// ```
185    ///
186    /// [`TokioContext`]: struct@crate::context::TokioContext
187    /// [`sleep`]: fn@tokio::time::sleep
188    /// [enables time]: fn@tokio::runtime::Builder::enable_time
189    fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
190}
191
192impl RuntimeExt for Runtime {
193    fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
194        TokioContext {
195            inner: fut,
196            handle: self.handle().clone(),
197        }
198    }
199}