near_async/
futures.rs

1use actix::Actor;
2use futures::FutureExt;
3pub use futures::future::BoxFuture; // pub for macros
4use near_time::Duration;
5use std::ops::DerefMut;
6use std::sync::Arc;
7
8/// Abstraction for something that can drive futures.
9///
10/// Rust futures don't run by itself. It needs a driver to execute it. This can
11/// for example be a thread, a thread pool, or Actix. This trait abstracts over
12/// the execution mechanism.
13///
14/// The reason why we need an abstraction is (1) we can intercept the future
15/// spawning to add additional instrumentation (2) we can support driving the
16/// future with TestLoop for testing.
17pub trait FutureSpawner: Send + Sync {
18    fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>);
19}
20
21pub trait FutureSpawnerExt {
22    fn spawn<F>(&self, description: &'static str, f: F)
23    where
24        F: futures::Future<Output = ()> + Send + 'static;
25}
26
27impl<T: FutureSpawner> FutureSpawnerExt for T {
28    fn spawn<F>(&self, description: &'static str, f: F)
29    where
30        F: futures::Future<Output = ()> + Send + 'static,
31    {
32        self.spawn_boxed(description, f.boxed());
33    }
34}
35
36impl FutureSpawnerExt for dyn FutureSpawner + '_ {
37    fn spawn<F>(&self, description: &'static str, f: F)
38    where
39        F: futures::Future<Output = ()> + Send + 'static,
40    {
41        self.spawn_boxed(description, f.boxed());
42    }
43}
44
45/// Given a future, respawn it as an equivalent future but which does not block the
46/// driver of the future. For example, if the given future directly performs
47/// computation, normally the whoever drives the future (such as a buffered_unordered)
48/// would be blocked by the computation, thereby not allowing computation of other
49/// futures driven by the same driver to proceed. This function respawns the future
50/// onto the FutureSpawner, so the driver of the returned future would not be blocked.
51pub fn respawn_for_parallelism<T: Send + 'static>(
52    future_spawner: &dyn FutureSpawner,
53    name: &'static str,
54    f: impl std::future::Future<Output = T> + Send + 'static,
55) -> impl std::future::Future<Output = T> + Send + 'static {
56    let (sender, receiver) = tokio::sync::oneshot::channel();
57    future_spawner.spawn(name, async move {
58        sender.send(f.await).ok();
59    });
60    async move { receiver.await.unwrap() }
61}
62
63/// A FutureSpawner that hands over the future to Actix.
64pub struct ActixFutureSpawner;
65
66impl FutureSpawner for ActixFutureSpawner {
67    fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>) {
68        near_performance_metrics::actix::spawn(description, f);
69    }
70}
71
72/// A FutureSpawner that gives futures to a tokio Runtime, possibly supporting
73/// multiple threads.
74pub struct TokioRuntimeFutureSpawner(pub Arc<tokio::runtime::Runtime>);
75
76impl FutureSpawner for TokioRuntimeFutureSpawner {
77    fn spawn_boxed(&self, _description: &'static str, f: BoxFuture<'static, ()>) {
78        self.0.spawn(f);
79    }
80}
81
82pub struct ActixArbiterHandleFutureSpawner(pub actix::ArbiterHandle);
83
84impl FutureSpawner for ActixArbiterHandleFutureSpawner {
85    fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>) {
86        if !self.0.spawn(f) {
87            near_o11y::tracing::error!(
88                "Failed to spawn future: {}, arbiter has exited",
89                description
90            );
91        }
92    }
93}
94
95/// Abstraction for something that can schedule something to run after.
96/// This isn't the same as just delaying a closure. Rather, it has the
97/// additional power of providing the closure a mutable reference to some
98/// object. With the Actix framework, for example, the object (of type `T`)
99/// would be the actor, and the `DelayedActionRunner<T>`` is implemented by
100/// the actix `Context`.
101pub trait DelayedActionRunner<T> {
102    fn run_later_boxed(
103        &mut self,
104        name: &str,
105        dur: Duration,
106        f: Box<dyn FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static>,
107    );
108}
109
110pub trait DelayedActionRunnerExt<T> {
111    fn run_later(
112        &mut self,
113        name: &str,
114        dur: Duration,
115        f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
116    );
117}
118
119impl<T, Runner> DelayedActionRunnerExt<T> for Runner
120where
121    Runner: DelayedActionRunner<T>,
122{
123    fn run_later(
124        &mut self,
125        name: &str,
126        dur: Duration,
127        f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
128    ) {
129        self.run_later_boxed(name, dur, Box::new(f));
130    }
131}
132
133impl<T> DelayedActionRunnerExt<T> for dyn DelayedActionRunner<T> + '_ {
134    fn run_later(
135        &mut self,
136        name: &str,
137        dur: Duration,
138        f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
139    ) {
140        self.run_later_boxed(name, dur, Box::new(f));
141    }
142}
143
144/// Implementation of `DelayedActionRunner` for Actix. With this, any code
145/// that used to take a `&mut actix::Context` can now take a
146/// `&mut dyn DelayedActionRunner<T>` instead, which isn't actix-specific.
147impl<T, Outer> DelayedActionRunner<T> for actix::Context<Outer>
148where
149    T: 'static,
150    Outer: DerefMut<Target = T>,
151    Outer: Actor<Context = actix::Context<Outer>>,
152{
153    fn run_later_boxed(
154        &mut self,
155        _name: &str,
156        dur: Duration,
157        f: Box<dyn FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static>,
158    ) {
159        near_performance_metrics::actix::run_later(
160            self,
161            dur.max(Duration::ZERO).unsigned_abs(),
162            move |obj, ctx| f(&mut *obj, ctx),
163        );
164    }
165}
166
167/// Like `FutureSpawner`, but intended for spawning asynchronous computation-heavy tasks.
168/// Rather than taking a future, it takes a function. For production, the function shall
169/// be run on a separate thread (like `rayon::spawn`), but for test, it would run the
170/// function as a TestLoop event, possibly with an artificial delay.
171pub trait AsyncComputationSpawner: Send + Sync {
172    fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>);
173}
174
175pub trait AsyncComputationSpawnerExt {
176    fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static);
177}
178
179impl<T: AsyncComputationSpawner> AsyncComputationSpawnerExt for T {
180    fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static) {
181        self.spawn_boxed(name, Box::new(f));
182    }
183}
184
185impl AsyncComputationSpawnerExt for dyn AsyncComputationSpawner + '_ {
186    fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static) {
187        self.spawn_boxed(name, Box::new(f));
188    }
189}
190
191pub struct StdThreadAsyncComputationSpawnerForTest;
192
193impl AsyncComputationSpawner for StdThreadAsyncComputationSpawnerForTest {
194    fn spawn_boxed(&self, _name: &str, f: Box<dyn FnOnce() + Send>) {
195        std::thread::spawn(f);
196    }
197}