near_async/
futures.rs

1use futures::FutureExt;
2pub use futures::future::BoxFuture; // pub for macros
3use near_time::Duration;
4use std::sync::Arc;
5
/// Abstraction for something that can drive futures.
///
/// Rust futures don't run by themselves; each one needs a driver to execute
/// it. The driver can be, for example, a thread, a thread pool, or a
/// TokioRuntimeHandle. This trait abstracts over the execution mechanism.
///
/// We need the abstraction for two reasons: (1) future spawning can be
/// intercepted to add instrumentation, and (2) futures can be driven by
/// TestLoop for testing.
pub trait FutureSpawner: Send + Sync {
    /// Hands the boxed future `f` to the underlying driver. `description` is
    /// a static label for the spawned work; what is done with it is up to the
    /// implementation.
    fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>);
}
18
/// Convenience layer over [`FutureSpawner`]: accepts any `Send + 'static`
/// future with unit output, so callers do not have to box it themselves.
pub trait FutureSpawnerExt {
    /// Spawns `f`, labelled with `description`.
    fn spawn<Fut>(&self, description: &'static str, f: Fut)
    where
        Fut: std::future::Future<Output = ()> + Send + 'static;
}
24
25impl<T: FutureSpawner> FutureSpawnerExt for T {
26    fn spawn<F>(&self, description: &'static str, f: F)
27    where
28        F: futures::Future<Output = ()> + Send + 'static,
29    {
30        self.spawn_boxed(description, f.boxed());
31    }
32}
33
34impl FutureSpawnerExt for dyn FutureSpawner + '_ {
35    fn spawn<F>(&self, description: &'static str, f: F)
36    where
37        F: futures::Future<Output = ()> + Send + 'static,
38    {
39        self.spawn_boxed(description, f.boxed());
40    }
41}
42
43/// Given a future, respawn it as an equivalent future but which does not block the
44/// driver of the future. For example, if the given future directly performs
45/// computation, normally the whoever drives the future (such as a buffered_unordered)
46/// would be blocked by the computation, thereby not allowing computation of other
47/// futures driven by the same driver to proceed. This function respawns the future
48/// onto the FutureSpawner, so the driver of the returned future would not be blocked.
49pub fn respawn_for_parallelism<T: Send + 'static>(
50    future_spawner: &dyn FutureSpawner,
51    name: &'static str,
52    f: impl std::future::Future<Output = T> + Send + 'static,
53) -> impl std::future::Future<Output = T> + Send + 'static {
54    let (sender, receiver) = tokio::sync::oneshot::channel();
55    future_spawner.spawn(name, async move {
56        sender.send(f.await).ok();
57    });
58    async move { receiver.await.unwrap() }
59}
60
61/// A FutureSpawner that gives futures to a tokio Runtime, possibly supporting
62/// multiple threads.
63pub struct TokioRuntimeFutureSpawner(pub Arc<tokio::runtime::Runtime>);
64
65impl FutureSpawner for TokioRuntimeFutureSpawner {
66    fn spawn_boxed(&self, _description: &'static str, f: BoxFuture<'static, ()>) {
67        self.0.spawn(f);
68    }
69}
70
71/// A FutureSpawner that directly spawns using tokio::spawn. Use only for tests.
72/// This is undesirable in production code because it bypasses any tracking functionality.
73pub struct DirectTokioFutureSpawnerForTest;
74
75impl FutureSpawner for DirectTokioFutureSpawnerForTest {
76    fn spawn_boxed(&self, _description: &'static str, f: BoxFuture<'static, ()>) {
77        tokio::spawn(f);
78    }
79}
80
/// Abstraction for something that can schedule an action to run after a
/// delay.
///
/// This isn't the same as just delaying a closure. Rather, it has the
/// additional power of providing the closure a mutable reference to some
/// object. For the tokio actor, the object (of type `T`) would be the actor,
/// and the `DelayedActionRunner<T>` is implemented by `TokioRuntimeHandle<T>`.
pub trait DelayedActionRunner<T> {
    /// Schedules the boxed action `f` under the label `name`, to run after
    /// `dur`. When invoked, `f` is given a mutable reference to the `T` and a
    /// `DelayedActionRunner<T>` trait object.
    fn run_later_boxed(
        &mut self,
        name: &'static str,
        dur: Duration,
        f: Box<dyn FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static>,
    );
}
94
/// Convenience layer over [`DelayedActionRunner`]: takes the action as any
/// suitable closure rather than an already boxed one.
pub trait DelayedActionRunnerExt<T> {
    /// Same contract as `DelayedActionRunner::run_later_boxed`, except that
    /// `f` does not have to be boxed by the caller.
    fn run_later(
        &mut self,
        name: &'static str,
        dur: Duration,
        f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
    );
}
103
104impl<T, Runner> DelayedActionRunnerExt<T> for Runner
105where
106    Runner: DelayedActionRunner<T>,
107{
108    fn run_later(
109        &mut self,
110        name: &'static str,
111        dur: Duration,
112        f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
113    ) {
114        self.run_later_boxed(name, dur, Box::new(f));
115    }
116}
117
118impl<T> DelayedActionRunnerExt<T> for dyn DelayedActionRunner<T> + '_ {
119    fn run_later(
120        &mut self,
121        name: &'static str,
122        dur: Duration,
123        f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
124    ) {
125        self.run_later_boxed(name, dur, Box::new(f));
126    }
127}
128
/// Like `FutureSpawner`, but intended for spawning asynchronous,
/// computation-heavy tasks.
///
/// Rather than taking a future, it takes a function. In production the
/// function shall be run on a separate thread (like `rayon::spawn`); in tests
/// it would be run as a TestLoop event, possibly with an artificial delay.
pub trait AsyncComputationSpawner: Send + Sync {
    /// Hands the boxed function `f` to the spawner, labelled with `name`.
    fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>);
}
136
/// Convenience layer over [`AsyncComputationSpawner`]: takes the computation
/// as any `Send + 'static` closure rather than an already boxed one.
pub trait AsyncComputationSpawnerExt {
    /// Spawns the computation `f`, labelled with `name`.
    fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static);
}
140
141impl<T: AsyncComputationSpawner> AsyncComputationSpawnerExt for T {
142    fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static) {
143        self.spawn_boxed(name, Box::new(f));
144    }
145}
146
147impl AsyncComputationSpawnerExt for dyn AsyncComputationSpawner + '_ {
148    fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static) {
149        self.spawn_boxed(name, Box::new(f));
150    }
151}
152
153pub struct StdThreadAsyncComputationSpawnerForTest;
154
155impl AsyncComputationSpawner for StdThreadAsyncComputationSpawnerForTest {
156    fn spawn_boxed(&self, _name: &str, f: Box<dyn FnOnce() + Send>) {
157        std::thread::spawn(f);
158    }
159}