1use actix::Actor;
2use futures::FutureExt;
3pub use futures::future::BoxFuture; use near_time::Duration;
5use std::ops::DerefMut;
6use std::sync::Arc;
7
8pub trait FutureSpawner: Send + Sync {
18 fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>);
19}
20
21pub trait FutureSpawnerExt {
22 fn spawn<F>(&self, description: &'static str, f: F)
23 where
24 F: futures::Future<Output = ()> + Send + 'static;
25}
26
27impl<T: FutureSpawner> FutureSpawnerExt for T {
28 fn spawn<F>(&self, description: &'static str, f: F)
29 where
30 F: futures::Future<Output = ()> + Send + 'static,
31 {
32 self.spawn_boxed(description, f.boxed());
33 }
34}
35
36impl FutureSpawnerExt for dyn FutureSpawner + '_ {
37 fn spawn<F>(&self, description: &'static str, f: F)
38 where
39 F: futures::Future<Output = ()> + Send + 'static,
40 {
41 self.spawn_boxed(description, f.boxed());
42 }
43}
44
45pub fn respawn_for_parallelism<T: Send + 'static>(
52 future_spawner: &dyn FutureSpawner,
53 name: &'static str,
54 f: impl std::future::Future<Output = T> + Send + 'static,
55) -> impl std::future::Future<Output = T> + Send + 'static {
56 let (sender, receiver) = tokio::sync::oneshot::channel();
57 future_spawner.spawn(name, async move {
58 sender.send(f.await).ok();
59 });
60 async move { receiver.await.unwrap() }
61}
62
63pub struct ActixFutureSpawner;
65
66impl FutureSpawner for ActixFutureSpawner {
67 fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>) {
68 near_performance_metrics::actix::spawn(description, f);
69 }
70}
71
72pub struct TokioRuntimeFutureSpawner(pub Arc<tokio::runtime::Runtime>);
75
76impl FutureSpawner for TokioRuntimeFutureSpawner {
77 fn spawn_boxed(&self, _description: &'static str, f: BoxFuture<'static, ()>) {
78 self.0.spawn(f);
79 }
80}
81
82pub struct ActixArbiterHandleFutureSpawner(pub actix::ArbiterHandle);
83
84impl FutureSpawner for ActixArbiterHandleFutureSpawner {
85 fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>) {
86 if !self.0.spawn(f) {
87 near_o11y::tracing::error!(
88 "Failed to spawn future: {}, arbiter has exited",
89 description
90 );
91 }
92 }
93}
94
95pub trait DelayedActionRunner<T> {
102 fn run_later_boxed(
103 &mut self,
104 name: &str,
105 dur: Duration,
106 f: Box<dyn FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static>,
107 );
108}
109
110pub trait DelayedActionRunnerExt<T> {
111 fn run_later(
112 &mut self,
113 name: &str,
114 dur: Duration,
115 f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
116 );
117}
118
119impl<T, Runner> DelayedActionRunnerExt<T> for Runner
120where
121 Runner: DelayedActionRunner<T>,
122{
123 fn run_later(
124 &mut self,
125 name: &str,
126 dur: Duration,
127 f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
128 ) {
129 self.run_later_boxed(name, dur, Box::new(f));
130 }
131}
132
133impl<T> DelayedActionRunnerExt<T> for dyn DelayedActionRunner<T> + '_ {
134 fn run_later(
135 &mut self,
136 name: &str,
137 dur: Duration,
138 f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
139 ) {
140 self.run_later_boxed(name, dur, Box::new(f));
141 }
142}
143
144impl<T, Outer> DelayedActionRunner<T> for actix::Context<Outer>
148where
149 T: 'static,
150 Outer: DerefMut<Target = T>,
151 Outer: Actor<Context = actix::Context<Outer>>,
152{
153 fn run_later_boxed(
154 &mut self,
155 _name: &str,
156 dur: Duration,
157 f: Box<dyn FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static>,
158 ) {
159 near_performance_metrics::actix::run_later(
160 self,
161 dur.max(Duration::ZERO).unsigned_abs(),
162 move |obj, ctx| f(&mut *obj, ctx),
163 );
164 }
165}
166
/// Something that can take a boxed `FnOnce() + Send` closure and arrange for
/// it to be run.
///
/// Unlike [`FutureSpawner`], the unit of work is a plain closure rather than
/// a future. Implementors must be `Send + Sync`. The method is non-generic so
/// the trait can be used as `dyn AsyncComputationSpawner`; the generic
/// convenience wrapper is [`AsyncComputationSpawnerExt::spawn`].
pub trait AsyncComputationSpawner: Send + Sync {
    /// Hands the boxed closure `f` over to this spawner.
    ///
    /// `name` is a label for the task. How it is used is up to the
    /// implementation.
    fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>);
}
174
/// Generic convenience layer on top of [`AsyncComputationSpawner`].
///
/// Accepts any `FnOnce() + Send + 'static` closure directly, so callers do
/// not have to box it.
pub trait AsyncComputationSpawnerExt {
    /// Spawns the closure `f`, labelled with `name`.
    fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static);
}
178
179impl<T: AsyncComputationSpawner> AsyncComputationSpawnerExt for T {
180 fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static) {
181 self.spawn_boxed(name, Box::new(f));
182 }
183}
184
185impl AsyncComputationSpawnerExt for dyn AsyncComputationSpawner + '_ {
186 fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static) {
187 self.spawn_boxed(name, Box::new(f));
188 }
189}
190
191pub struct StdThreadAsyncComputationSpawnerForTest;
192
193impl AsyncComputationSpawner for StdThreadAsyncComputationSpawnerForTest {
194 fn spawn_boxed(&self, _name: &str, f: Box<dyn FnOnce() + Send>) {
195 std::thread::spawn(f);
196 }
197}