1use futures::FutureExt;
2pub use futures::future::BoxFuture; use near_time::Duration;
4use std::sync::Arc;
5
/// Something that can take ownership of a boxed future and spawn it.
///
/// The trait has a single non-generic method, so it can be used as
/// `dyn FutureSpawner` (as other items in this file do). The `Send + Sync`
/// supertraits mean every implementor can be shared between threads.
pub trait FutureSpawner: Send + Sync {
    /// Hands the type-erased future `f` over to the spawner.
    ///
    /// * `description` - a static label for the future. The implementations
    ///   visible in this file ignore it; presumably it exists for
    ///   instrumentation or debugging (confirm with other implementors).
    /// * `f` - the future to run; it must be `Send + 'static` and yield `()`.
    ///
    /// Nothing is returned, so the caller gets no handle to the spawned work.
    fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>);
}
18
/// Convenience extension over [`FutureSpawner`] that accepts an unboxed future.
///
/// NOTE(review): presumably kept as a separate trait because a generic method
/// would prevent `FutureSpawner` from being used as a trait object.
pub trait FutureSpawnerExt {
    /// Spawns `f`, which may be any `Send + 'static` future yielding `()`.
    ///
    /// * `description` - static label forwarded to the underlying spawner.
    /// * `f` - the future to spawn; the caller does not need to box it.
    fn spawn<F>(&self, description: &'static str, f: F)
    where
        F: futures::Future<Output = ()> + Send + 'static;
}
24
25impl<T: FutureSpawner> FutureSpawnerExt for T {
26 fn spawn<F>(&self, description: &'static str, f: F)
27 where
28 F: futures::Future<Output = ()> + Send + 'static,
29 {
30 self.spawn_boxed(description, f.boxed());
31 }
32}
33
34impl FutureSpawnerExt for dyn FutureSpawner + '_ {
35 fn spawn<F>(&self, description: &'static str, f: F)
36 where
37 F: futures::Future<Output = ()> + Send + 'static,
38 {
39 self.spawn_boxed(description, f.boxed());
40 }
41}
42
43pub fn respawn_for_parallelism<T: Send + 'static>(
50 future_spawner: &dyn FutureSpawner,
51 name: &'static str,
52 f: impl std::future::Future<Output = T> + Send + 'static,
53) -> impl std::future::Future<Output = T> + Send + 'static {
54 let (sender, receiver) = tokio::sync::oneshot::channel();
55 future_spawner.spawn(name, async move {
56 sender.send(f.await).ok();
57 });
58 async move { receiver.await.unwrap() }
59}
60
61pub struct TokioRuntimeFutureSpawner(pub Arc<tokio::runtime::Runtime>);
64
65impl FutureSpawner for TokioRuntimeFutureSpawner {
66 fn spawn_boxed(&self, _description: &'static str, f: BoxFuture<'static, ()>) {
67 self.0.spawn(f);
68 }
69}
70
71pub struct DirectTokioFutureSpawnerForTest;
74
75impl FutureSpawner for DirectTokioFutureSpawnerForTest {
76 fn spawn_boxed(&self, _description: &'static str, f: BoxFuture<'static, ()>) {
77 tokio::spawn(f);
78 }
79}
80
/// Interface for scheduling a one-shot callback to be run later.
///
/// `T` is the type of the state the callback is given mutable access to.
/// NOTE(review): presumably the object that owns or is driven by the runner;
/// confirm with implementors.
pub trait DelayedActionRunner<T> {
    /// Registers the boxed callback `f` to be run after `dur`.
    ///
    /// * `name` - static label for the action.
    /// * `dur` - the delay. Exact timing semantics are up to the implementor.
    /// * `f` - called at most once (`FnOnce`) with `&mut T` and a
    ///   `&mut dyn DelayedActionRunner<T>`, which lets the callback schedule
    ///   further actions. It must be `Send + 'static`.
    fn run_later_boxed(
        &mut self,
        name: &'static str,
        dur: Duration,
        f: Box<dyn FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static>,
    );
}
94
/// Convenience extension over [`DelayedActionRunner`] that accepts an unboxed callback.
pub trait DelayedActionRunnerExt<T> {
    /// Schedules `f` to be run after `dur`.
    ///
    /// Takes the same arguments as `DelayedActionRunner::run_later_boxed`,
    /// except that `f` may be any suitable closure rather than a `Box`.
    fn run_later(
        &mut self,
        name: &'static str,
        dur: Duration,
        f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
    );
}
103
104impl<T, Runner> DelayedActionRunnerExt<T> for Runner
105where
106 Runner: DelayedActionRunner<T>,
107{
108 fn run_later(
109 &mut self,
110 name: &'static str,
111 dur: Duration,
112 f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
113 ) {
114 self.run_later_boxed(name, dur, Box::new(f));
115 }
116}
117
118impl<T> DelayedActionRunnerExt<T> for dyn DelayedActionRunner<T> + '_ {
119 fn run_later(
120 &mut self,
121 name: &'static str,
122 dur: Duration,
123 f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
124 ) {
125 self.run_later_boxed(name, dur, Box::new(f));
126 }
127}
128
/// Counterpart of the future spawner that takes a plain closure instead of a future.
///
/// The single method is non-generic, so the trait can be used as
/// `dyn AsyncComputationSpawner`. The `Send + Sync` supertraits mean every
/// implementor can be shared between threads.
pub trait AsyncComputationSpawner: Send + Sync {
    /// Hands the boxed closure `f` over to the spawner.
    ///
    /// * `name` - label for the computation; unlike the future spawner's
    ///   description it does not have to be `'static`.
    /// * `f` - a `Send` closure that is called at most once (`FnOnce`).
    fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>);
}
136
/// Convenience extension over [`AsyncComputationSpawner`] that accepts an unboxed closure.
pub trait AsyncComputationSpawnerExt {
    /// Spawns `f`, which may be any `Send + 'static` closure taking no
    /// arguments, under the label `name`.
    fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static);
}
140
141impl<T: AsyncComputationSpawner> AsyncComputationSpawnerExt for T {
142 fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static) {
143 self.spawn_boxed(name, Box::new(f));
144 }
145}
146
147impl AsyncComputationSpawnerExt for dyn AsyncComputationSpawner + '_ {
148 fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static) {
149 self.spawn_boxed(name, Box::new(f));
150 }
151}
152
153pub struct StdThreadAsyncComputationSpawnerForTest;
154
155impl AsyncComputationSpawner for StdThreadAsyncComputationSpawnerForTest {
156 fn spawn_boxed(&self, _name: &str, f: Box<dyn FnOnce() + Send>) {
157 std::thread::spawn(f);
158 }
159}