1use std::{
2 pin::Pin,
3 future::Future,
4};
5use anyhow::*;
6use metrix::{
7 TelemetryTransmitter,
8 instruments::{Panel, Cockpit},
9 processor::{TelemetryProcessor, AggregatesProcessors},
10instruments::Meter, TransmitsTelemetryData, instruments::Histogram, TimeUnit, instruments::Gauge};
11use tokio::{
12 task::JoinHandle,
13 time::{self, *},
14};
15
16pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
17
18pub struct Jackhammer {
19 interval: Interval,
20 actions_per_interval: u32,
21 action_factory: Box<dyn ActionFactory>,
22 timeout: Option<Duration>,
23 metrics: Metrics,
24}
25
26impl Jackhammer {
27 pub fn builder() -> JackhammerBuilder {
28 JackhammerBuilder::new()
29 }
30
31 async fn run(mut self) -> Result<()> {
32 loop {
33 self.interval.tick().await;
34
35 for _ in 0..self.actions_per_interval {
36 let action = self.action_factory.next_action();
37 let action = timeout(self.timeout, action);
38 let metrics = self.metrics.clone();
39
40 tokio::spawn(async move {
41 let start = Instant::now();
42
43 match action.await {
44 Ok(Ok(_)) => metrics.observed_successful_action(start.elapsed()),
45 Ok(Err(_)) => metrics.observed_failed_action(start.elapsed()),
46 Err(_) => metrics.observed_timed_out_action(start.elapsed()),
47 }
48
49 metrics.observed_finished_action(start.elapsed());
50 });
51
52 self.metrics.observed_spawned_action();
53 }
54 }
55 }
56}
57
58pub struct JackhammerBuilder {
59 actions_per_interval: u32,
60 interval: Duration,
61 action_factory: Box<dyn ActionFactory>,
62 metrics: Metrics,
63 timeout: Option<Duration>,
64}
65
66impl JackhammerBuilder {
67 pub fn new() -> Self {
68 Self {
69 actions_per_interval: 1,
70 interval: Duration::from_secs(1),
71 action_factory: Box::new(|| Box::pin(async { Ok(()) })),
72 metrics: Metrics::default(),
73 timeout: None,
74 }
75 }
76
77 pub fn actions_per_interval(mut self, actions_per_interval: u32) -> Self {
78 self.actions_per_interval = actions_per_interval;
79 self
80 }
81
82 pub fn interval(mut self, interval: Duration) -> Self {
83 self.interval = interval;
84 self
85 }
86
87 pub fn action_factory<AF>(mut self, action_factory: AF) -> Self
88 where
89 AF: ActionFactory + Send + Sync,
90 {
91 self.action_factory = Box::new(action_factory);
92 self
93 }
94
95 pub fn timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
96 self.timeout = timeout.into();
97 self
98 }
99
100 pub fn instrumentation<AP>(mut self, aggregator: &mut AP) -> Self
102 where
103 AP: AggregatesProcessors,
104 {
105 self.metrics = Metrics::new(aggregator);
106 self
107 }
108
109 pub fn start(self) -> JackhammerHandle {
110 let jackhammer = Jackhammer {
111 interval: time::interval(self.interval),
112 actions_per_interval: self.actions_per_interval,
113 action_factory: self.action_factory,
114 metrics: self.metrics,
115 timeout: self.timeout,
116 };
117
118 let join_handle = tokio::spawn(jackhammer.run());
119
120 JackhammerHandle {
121 join_handle
122 }
123 }
124}
125
126pub trait ActionFactory: Send + 'static {
127 fn next_action(&mut self) -> BoxFuture<'static, Result<()>>;
128}
129
130impl dyn ActionFactory {
131 pub fn from_fn<F, Fut>(factory_fn: F) -> impl ActionFactory
132 where
133 F: FnMut() -> Fut + Send + Sync + 'static,
134 Fut: Future<Output = Result<()>> + Send + 'static,
135 {
136 factory_fn
137 }
138}
139
140impl<F, Fut> ActionFactory for F
141where
142 F: FnMut() -> Fut + Send + Sync + 'static,
143 Fut: Future<Output = Result<()>> + Send + 'static,
144{
145 fn next_action(&mut self) -> BoxFuture<'static, Result<()>> {
146 Box::pin(self())
147 }
148}
149
150pub struct JackhammerHandle {
151 join_handle: JoinHandle<Result<()>>,
152}
153
154impl JackhammerHandle {
155 pub async fn join(self) -> Result<()> {
156 self.join_handle.await??;
157 Ok(())
158 }
159}
160
161#[derive(Clone, Default)]
162struct Metrics {
163 telemetry_transmitter: Option<TelemetryTransmitter<Metric>>,
164}
165
166impl Metrics {
167 fn new<AP>(aggregator: &mut AP) -> Self
168 where
169 AP: AggregatesProcessors,
170 {
171 let mut cockpit = Cockpit::new("jackhammer");
172
173 let panel = Panel::named(Metric::SuccessfulActions, "successful_actions")
174 .meter(Meter::new_with_defaults("per_second"))
175 .histogram(
176 Histogram::new_with_defaults("latency_us")
177 .display_time_unit(TimeUnit::Microseconds)
178 )
179 .histogram(
180 Histogram::new_with_defaults("latency_ms")
181 .display_time_unit(TimeUnit::Milliseconds)
182 );
183 cockpit.add_panel(panel);
184
185 let panel = Panel::named(Metric::FailedActions, "failed_actions")
186 .meter(Meter::new_with_defaults("per_second"))
187 .histogram(
188 Histogram::new_with_defaults("latency_us")
189 .display_time_unit(TimeUnit::Microseconds)
190 )
191 .histogram(
192 Histogram::new_with_defaults("latency_ms")
193 .display_time_unit(TimeUnit::Milliseconds)
194 );
195 cockpit.add_panel(panel);
196
197 let mut panel = Panel::named(Metric::FinishedActions, "finished_actions")
198 .meter(Meter::new_with_defaults("per_second"))
199 .histogram(
200 Histogram::new_with_defaults("latency_us")
201 .display_time_unit(TimeUnit::Microseconds)
202 )
203 .histogram(
204 Histogram::new_with_defaults("latency_ms")
205 .display_time_unit(TimeUnit::Milliseconds)
206 );
207 panel.set_description("Actions that succeeded, failed or timed out");
208 cockpit.add_panel(panel);
209
210 let panel = Panel::named(Metric::TimedOutActions, "timed_out_actions")
211 .meter(Meter::new_with_defaults("per_second"))
212 .histogram(
213 Histogram::new_with_defaults("latency_us")
214 .display_time_unit(TimeUnit::Microseconds)
215 )
216 .histogram(
217 Histogram::new_with_defaults("latency_ms")
218 .display_time_unit(TimeUnit::Milliseconds)
219 );
220 cockpit.add_panel(panel);
221
222 let panel = Panel::named(Metric::SpawnedActions, "spawned_actions")
223 .gauge(Gauge::new_with_defaults("count"));
224 cockpit.add_panel(panel);
225
226 let (tx, mut rx) = TelemetryProcessor::new_pair_without_name();
227 rx.add_cockpit(cockpit);
228 aggregator.add_processor(rx);
229
230 Self { telemetry_transmitter: Some(tx) }
231 }
232
233 fn observed_successful_action(&self, duration: Duration) {
234 if let Some(tx) = &self.telemetry_transmitter {
235 tx.observed_one_duration_now(Metric::SuccessfulActions, duration);
236 }
237 }
238
239 fn observed_failed_action(&self, duration: Duration) {
240 if let Some(tx) = &self.telemetry_transmitter {
241 tx.observed_one_duration_now(Metric::FailedActions, duration);
242 }
243 }
244
245 fn observed_finished_action(&self, duration: Duration) {
246 let now = std::time::Instant::now();
247
248 if let Some(tx) = &self.telemetry_transmitter {
249 tx.observed_duration(Metric::FinishedActions, duration, now);
250 tx.observed_one_value(Metric::SpawnedActions, metrix::Decrement, now);
251 }
252 }
253
254 fn observed_timed_out_action(&self, duration: Duration) {
255 if let Some(tx) = &self.telemetry_transmitter {
256 tx.observed_one_duration_now(Metric::TimedOutActions, duration);
257 }
258 }
259
260 fn observed_spawned_action(&self) {
261 if let Some(tx) = &self.telemetry_transmitter {
262 tx.observed_one_value_now(Metric::SpawnedActions, metrix::Increment);
263 }
264 }
265}
266
267#[derive(PartialEq, Eq, Copy, Clone)]
268enum Metric {
269 SuccessfulActions,
270 FailedActions,
271 FinishedActions,
272 TimedOutActions,
273 SpawnedActions,
274}
275
276async fn timeout<T>(duration: Option<Duration>, future: impl Future<Output = T>) -> Result<T, time::error::Elapsed> {
277 match duration {
278 Some(duration) => time::timeout(duration, future).await,
279 None => Ok(future.await),
280 }
281}