1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
use actix::Actor;
pub use futures::future::BoxFuture; // pub for macros
use futures::FutureExt;
use near_time::Duration;
use std::ops::DerefMut;

/// Abstraction for something that can drive futures.
///
/// Rust futures don't run by itself. It needs a driver to execute it. This can
/// for example be a thread, a thread pool, or Actix. This trait abstracts over
/// the execution mechanism.
///
/// The reason why we need an abstraction is (1) we can intercept the future
/// spawning to add additional instrumentation (2) we can support driving the
/// future with TestLoop for testing.
pub trait FutureSpawner {
    fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>);
}

pub trait FutureSpawnerExt {
    fn spawn<F>(&self, description: &'static str, f: F)
    where
        F: futures::Future<Output = ()> + Send + 'static;
}

impl<T: FutureSpawner> FutureSpawnerExt for T {
    fn spawn<F>(&self, description: &'static str, f: F)
    where
        F: futures::Future<Output = ()> + Send + 'static,
    {
        self.spawn_boxed(description, f.boxed());
    }
}

impl FutureSpawnerExt for dyn FutureSpawner + '_ {
    fn spawn<F>(&self, description: &'static str, f: F)
    where
        F: futures::Future<Output = ()> + Send + 'static,
    {
        self.spawn_boxed(description, f.boxed());
    }
}

/// A FutureSpawner that hands over the future to Actix.
pub struct ActixFutureSpawner;

impl FutureSpawner for ActixFutureSpawner {
    fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>) {
        near_performance_metrics::actix::spawn(description, f);
    }
}

pub struct ActixArbiterHandleFutureSpawner(pub actix::ArbiterHandle);

impl FutureSpawner for ActixArbiterHandleFutureSpawner {
    fn spawn_boxed(&self, description: &'static str, f: BoxFuture<'static, ()>) {
        if !self.0.spawn(f) {
            near_o11y::tracing::error!(
                "Failed to spawn future: {}, arbiter has exited",
                description
            );
        }
    }
}

/// Abstraction for something that can schedule something to run after.
/// This isn't the same as just delaying a closure. Rather, it has the
/// additional power of providing the closure a mutable reference to some
/// object. With the Actix framework, for example, the object (of type `T`)
/// would be the actor, and the `DelayedActionRunner<T>`` is implemented by
/// the actix `Context`.
pub trait DelayedActionRunner<T> {
    fn run_later_boxed(
        &mut self,
        name: &str,
        dur: Duration,
        f: Box<dyn FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static>,
    );
}

pub trait DelayedActionRunnerExt<T> {
    fn run_later(
        &mut self,
        name: &str,
        dur: Duration,
        f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
    );
}

impl<T, Runner> DelayedActionRunnerExt<T> for Runner
where
    Runner: DelayedActionRunner<T>,
{
    fn run_later(
        &mut self,
        name: &str,
        dur: Duration,
        f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
    ) {
        self.run_later_boxed(name, dur, Box::new(f));
    }
}

impl<T> DelayedActionRunnerExt<T> for dyn DelayedActionRunner<T> + '_ {
    fn run_later(
        &mut self,
        name: &str,
        dur: Duration,
        f: impl FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static,
    ) {
        self.run_later_boxed(name, dur, Box::new(f));
    }
}

/// Implementation of `DelayedActionRunner` for Actix. With this, any code
/// that used to take a `&mut actix::Context` can now take a
/// `&mut dyn DelayedActionRunner<T>` instead, which isn't actix-specific.
impl<T, Outer> DelayedActionRunner<T> for actix::Context<Outer>
where
    T: 'static,
    Outer: DerefMut<Target = T>,
    Outer: Actor<Context = actix::Context<Outer>>,
{
    fn run_later_boxed(
        &mut self,
        _name: &str,
        dur: Duration,
        f: Box<dyn FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static>,
    ) {
        near_performance_metrics::actix::run_later(
            self,
            dur.max(Duration::ZERO).unsigned_abs(),
            move |obj, ctx| f(obj.deref_mut(), ctx),
        );
    }
}

/// Like `FutureSpawner`, but intended for spawning asynchronous computation-heavy tasks.
/// Rather than taking a future, it takes a function. For production, the function shall
/// be run on a separate thread (like `rayon::spawn`), but for test, it would run the
/// function as a TestLoop event, possibly with an artificial delay.
pub trait AsyncComputationSpawner: Send + Sync {
    fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>);
}

pub trait AsyncComputationSpawnerExt {
    fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static);
}

impl<T: AsyncComputationSpawner> AsyncComputationSpawnerExt for T {
    fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static) {
        self.spawn_boxed(name, Box::new(f));
    }
}

impl AsyncComputationSpawnerExt for dyn AsyncComputationSpawner + '_ {
    fn spawn(&self, name: &str, f: impl FnOnce() + Send + 'static) {
        self.spawn_boxed(name, Box::new(f));
    }
}

pub struct StdThreadAsyncComputationSpawnerForTest;

impl AsyncComputationSpawner for StdThreadAsyncComputationSpawnerForTest {
    fn spawn_boxed(&self, _name: &str, f: Box<dyn FnOnce() + Send>) {
        std::thread::spawn(f);
    }
}