jackhammer/
lib.rs

1use std::{
2    pin::Pin,
3    future::Future,
4};
5use anyhow::*;
6use metrix::{
7    TelemetryTransmitter,
8    instruments::{Panel, Cockpit},
9    processor::{TelemetryProcessor, AggregatesProcessors},
10instruments::Meter, TransmitsTelemetryData, instruments::Histogram, TimeUnit, instruments::Gauge};
11use tokio::{
12    task::JoinHandle,
13    time::{self, *},
14};
15
/// A pinned, heap-allocated future that is `Send` and resolves to `T`.
///
/// `'a` bounds any data the boxed future borrows.
pub type BoxFuture<'a, T> =
    Pin<Box<dyn Future<Output = T> + Send + 'a>>;
17
18pub struct Jackhammer {
19    interval: Interval,
20    actions_per_interval: u32,
21    action_factory: Box<dyn ActionFactory>,
22    timeout: Option<Duration>,
23    metrics: Metrics,
24}
25
26impl Jackhammer {
27    pub fn builder() -> JackhammerBuilder {
28        JackhammerBuilder::new()
29    }
30
31    async fn run(mut self) -> Result<()> {
32        loop {
33            self.interval.tick().await;
34
35            for _ in 0..self.actions_per_interval {
36                let action = self.action_factory.next_action();
37                let action = timeout(self.timeout, action);
38                let metrics = self.metrics.clone();
39
40                tokio::spawn(async move {
41                    let start = Instant::now();
42
43                    match action.await {
44                        Ok(Ok(_)) => metrics.observed_successful_action(start.elapsed()),
45                        Ok(Err(_)) =>  metrics.observed_failed_action(start.elapsed()),
46                        Err(_) => metrics.observed_timed_out_action(start.elapsed()),
47                    }
48
49                    metrics.observed_finished_action(start.elapsed());
50                });
51
52                self.metrics.observed_spawned_action();
53            }
54        }
55    }
56}
57
58pub struct JackhammerBuilder {
59    actions_per_interval: u32,
60    interval: Duration,
61    action_factory: Box<dyn ActionFactory>,
62    metrics: Metrics,
63    timeout: Option<Duration>,
64}
65
66impl JackhammerBuilder {
67    pub fn new() -> Self {
68        Self {
69            actions_per_interval: 1,
70            interval: Duration::from_secs(1),
71            action_factory: Box::new(|| Box::pin(async { Ok(()) })),
72            metrics: Metrics::default(),
73            timeout: None,
74        }
75    }
76
77    pub fn actions_per_interval(mut self, actions_per_interval: u32) -> Self {
78        self.actions_per_interval = actions_per_interval;
79        self
80    }
81
82    pub fn interval(mut self, interval: Duration) -> Self {
83        self.interval = interval;
84        self
85    }
86
87    pub fn action_factory<AF>(mut self, action_factory: AF) -> Self
88    where
89        AF: ActionFactory + Send + Sync,
90    {
91        self.action_factory = Box::new(action_factory);
92        self
93    }
94
95    pub fn timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
96        self.timeout = timeout.into();
97        self
98    }
99
100    /// Should be called at most once
101    pub fn instrumentation<AP>(mut self, aggregator: &mut AP) -> Self
102    where
103        AP: AggregatesProcessors,
104    {
105        self.metrics = Metrics::new(aggregator);
106        self
107    }
108
109    pub fn start(self) -> JackhammerHandle {
110        let jackhammer = Jackhammer {
111            interval: time::interval(self.interval),
112            actions_per_interval: self.actions_per_interval,
113            action_factory: self.action_factory,
114            metrics: self.metrics,
115            timeout: self.timeout,
116        };
117
118        let join_handle = tokio::spawn(jackhammer.run());
119
120        JackhammerHandle {
121            join_handle
122        }
123    }
124}
125
126pub trait ActionFactory: Send + 'static {
127    fn next_action(&mut self) -> BoxFuture<'static, Result<()>>;
128}
129
130impl dyn ActionFactory {
131    pub fn from_fn<F, Fut>(factory_fn: F) -> impl ActionFactory
132    where
133        F: FnMut() -> Fut + Send + Sync + 'static,
134        Fut: Future<Output = Result<()>> + Send + 'static,
135    {
136        factory_fn
137    }
138}
139
140impl<F, Fut> ActionFactory for F
141where
142    F: FnMut() -> Fut + Send + Sync + 'static,
143    Fut: Future<Output = Result<()>> + Send + 'static,
144{
145    fn next_action(&mut self) -> BoxFuture<'static, Result<()>> {
146        Box::pin(self())
147    }
148}
149
150pub struct JackhammerHandle {
151    join_handle: JoinHandle<Result<()>>,
152}
153
154impl JackhammerHandle {
155    pub async fn join(self) -> Result<()> {
156        self.join_handle.await??;
157        Ok(())
158    }
159}
160
161#[derive(Clone, Default)]
162struct Metrics {
163    telemetry_transmitter: Option<TelemetryTransmitter<Metric>>,
164}
165
166impl Metrics {
167    fn new<AP>(aggregator: &mut AP) -> Self
168    where
169        AP: AggregatesProcessors,
170    {
171        let mut cockpit = Cockpit::new("jackhammer");
172
173        let panel = Panel::named(Metric::SuccessfulActions, "successful_actions")
174            .meter(Meter::new_with_defaults("per_second"))
175            .histogram(
176                Histogram::new_with_defaults("latency_us")
177                .display_time_unit(TimeUnit::Microseconds)
178            )
179            .histogram(
180                Histogram::new_with_defaults("latency_ms")
181                .display_time_unit(TimeUnit::Milliseconds)
182            );
183        cockpit.add_panel(panel);
184
185        let panel = Panel::named(Metric::FailedActions, "failed_actions")
186            .meter(Meter::new_with_defaults("per_second"))
187            .histogram(
188                Histogram::new_with_defaults("latency_us")
189                .display_time_unit(TimeUnit::Microseconds)
190            )
191            .histogram(
192                Histogram::new_with_defaults("latency_ms")
193                .display_time_unit(TimeUnit::Milliseconds)
194            );
195        cockpit.add_panel(panel);
196
197        let mut panel = Panel::named(Metric::FinishedActions, "finished_actions")
198            .meter(Meter::new_with_defaults("per_second"))
199            .histogram(
200                Histogram::new_with_defaults("latency_us")
201                .display_time_unit(TimeUnit::Microseconds)
202            )
203            .histogram(
204                Histogram::new_with_defaults("latency_ms")
205                .display_time_unit(TimeUnit::Milliseconds)
206            );
207        panel.set_description("Actions that succeeded, failed or timed out");
208        cockpit.add_panel(panel);
209
210        let panel = Panel::named(Metric::TimedOutActions, "timed_out_actions")
211            .meter(Meter::new_with_defaults("per_second"))
212            .histogram(
213                Histogram::new_with_defaults("latency_us")
214                .display_time_unit(TimeUnit::Microseconds)
215            )
216            .histogram(
217                Histogram::new_with_defaults("latency_ms")
218                .display_time_unit(TimeUnit::Milliseconds)
219            );
220        cockpit.add_panel(panel);
221
222        let panel = Panel::named(Metric::SpawnedActions, "spawned_actions")
223            .gauge(Gauge::new_with_defaults("count"));
224        cockpit.add_panel(panel);
225
226        let (tx, mut rx) = TelemetryProcessor::new_pair_without_name();
227        rx.add_cockpit(cockpit);
228        aggregator.add_processor(rx);
229
230        Self { telemetry_transmitter: Some(tx) }
231    }
232
233    fn observed_successful_action(&self, duration: Duration) {
234        if let Some(tx) = &self.telemetry_transmitter {
235            tx.observed_one_duration_now(Metric::SuccessfulActions, duration);
236        }
237    }
238
239    fn observed_failed_action(&self, duration: Duration) {
240        if let Some(tx) = &self.telemetry_transmitter {
241            tx.observed_one_duration_now(Metric::FailedActions, duration);
242        }
243    }
244
245    fn observed_finished_action(&self, duration: Duration) {
246        let now = std::time::Instant::now();
247
248        if let Some(tx) = &self.telemetry_transmitter {
249            tx.observed_duration(Metric::FinishedActions, duration, now);
250            tx.observed_one_value(Metric::SpawnedActions, metrix::Decrement, now);
251        }
252    }
253
254    fn observed_timed_out_action(&self, duration: Duration) {
255        if let Some(tx) = &self.telemetry_transmitter {
256            tx.observed_one_duration_now(Metric::TimedOutActions, duration);
257        }
258    }
259
260    fn observed_spawned_action(&self) {
261        if let Some(tx) = &self.telemetry_transmitter {
262            tx.observed_one_value_now(Metric::SpawnedActions, metrix::Increment);
263        }
264    }
265}
266
/// Labels under which the jackhammer reports its telemetry.
#[derive(Clone, Copy, PartialEq, Eq)]
enum Metric {
    /// An action resolved to `Ok` before its timeout.
    SuccessfulActions,
    /// An action resolved to an error before its timeout.
    FailedActions,
    /// An action ended, whether it succeeded, failed or timed out.
    FinishedActions,
    /// An action was cut off by its timeout.
    TimedOutActions,
    /// Gauge label: incremented per spawned action, decremented per
    /// finished one.
    SpawnedActions,
}
275
276async fn timeout<T>(duration: Option<Duration>, future: impl Future<Output = T>) -> Result<T, time::error::Elapsed> {
277    match duration {
278        Some(duration) => time::timeout(duration, future).await,
279        None => Ok(future.await),
280    }
281}