1use metrics::{
2 Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SetRecorderError, SharedString,
3 Unit,
4};
5use metrics_util::AtomicBucket;
6use pdatastructs::tdigest::{TDigest, K1};
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, Instant};
11use tokio::sync::oneshot::{self, Receiver, Sender};
12use tracing::error;
13
14mod pattern;
15mod plot;
16
17pub use pattern::{PatternGroup, PlotKind};
18
19const TDIGEST_COMPRESSION_FACTOR: f64 = 100.0;
20const TDIGEST_MAX_BACKLOG_SIZE: usize = 10;
21
22pub struct PlotlyRecorderBuilder {}
23
24impl Default for PlotlyRecorderBuilder {
25 fn default() -> Self {
26 Self::new()
27 }
28}
29
30impl PlotlyRecorderBuilder {
31 pub fn new() -> Self {
32 Self {}
33 }
34
35 pub fn install(self) -> Result<PlotlyRecorderHandle, SetRecorderError<PlotlyRecorder>> {
37 use tokio::runtime;
38 let mut recorder = if let Ok(handle) = runtime::Handle::try_current() {
39 let _g = handle.enter();
40 self.build()
41 } else {
42 let runtime = runtime::Builder::new_current_thread()
43 .enable_all()
44 .build()
45 .unwrap();
46
47 let recorder = {
48 let _g = runtime.enter();
49 self.build()
50 };
51
52 recorder
53 };
54
55 let handle = recorder.get_handle();
56 metrics::set_global_recorder(recorder)?;
57 Ok(handle)
58 }
59
60 pub fn build(&self) -> PlotlyRecorder {
62 PlotlyRecorder::new()
63 }
64}
65
66pub struct PlotlyRecorder {
67 state: Arc<Mutex<State>>,
68 handle: Option<PlotlyRecorderHandle>,
69}
70
71impl PlotlyRecorder {
72 fn new() -> Self {
73 let state = Arc::new(Mutex::new(State::new()));
74 let (tx0, rx0) = oneshot::channel();
75 let (tx1, rx1) = oneshot::channel();
76
77 let state2 = state.clone();
78 tokio::spawn(scraper(state2, (rx0, tx1)));
79
80 let handle = PlotlyRecorderHandle {
81 channel: (tx0, rx1),
82 };
83 Self {
84 state,
85 handle: Some(handle),
86 }
87 }
88
89 fn get_handle(&mut self) -> PlotlyRecorderHandle {
90 self.handle.take().unwrap()
91 }
92}
93
94impl Recorder for PlotlyRecorder {
95 fn describe_counter(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
96 }
98
99 fn describe_gauge(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
100 }
102
103 fn describe_histogram(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
104 }
106
107 fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
108 let atomic = self.state.lock().unwrap().get_counter(key);
109 Counter::from_arc(atomic)
110 }
111
112 fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
113 let atomic = self.state.lock().unwrap().get_gauge(key);
114 Gauge::from_arc(atomic)
115 }
116
117 fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
118 let atomic = self.state.lock().unwrap().get_histogram(key);
119 Histogram::from_arc(atomic)
120 }
121}
122
123struct State {
124 counters: HashMap<Key, Arc<AtomicU64>>,
125 gauges: HashMap<Key, Arc<AtomicU64>>,
126 histograms: HashMap<Key, Arc<AtomicBucket<f64>>>,
127}
128
129impl State {
130 fn new() -> Self {
131 Self {
132 counters: HashMap::new(),
133 gauges: HashMap::new(),
134 histograms: HashMap::new(),
135 }
136 }
137
138 fn get_counter(&mut self, key: &Key) -> Arc<AtomicU64> {
139 if let Some(val) = self.counters.get(key) {
140 val.clone()
141 } else {
142 let val = Arc::new(AtomicU64::new(0));
143 self.counters.insert(key.clone(), val.clone());
144 val
145 }
146 }
147
148 fn get_gauge(&mut self, key: &Key) -> Arc<AtomicU64> {
149 if let Some(val) = self.gauges.get(key) {
150 val.clone()
151 } else {
152 let val = Arc::new(AtomicU64::new(0));
153 self.gauges.insert(key.clone(), val.clone());
154 val
155 }
156 }
157
158 fn get_histogram(&mut self, key: &Key) -> Arc<AtomicBucket<f64>> {
159 if let Some(val) = self.histograms.get(key) {
160 val.clone()
161 } else {
162 let val = Arc::new(AtomicBucket::new());
163 self.histograms.insert(key.clone(), val.clone());
164 val
165 }
166 }
167}
168
169pub struct PlotlyRecorderHandle {
170 channel: (Sender<()>, Receiver<DataCollector>),
171}
172
173impl PlotlyRecorderHandle {
174 pub async fn plot(self, groups: &[PatternGroup]) {
179 self.channel.0.send(()).unwrap();
180 let res = self.channel.1.await;
181
182 if let Ok(data) = res {
183 plot::plot_data(data, groups);
184 } else {
185 error!("Channel broke in Drop impl.");
186 }
187 }
188}
189
190async fn scraper(state: Arc<Mutex<State>>, (rx, tx): (Receiver<()>, Sender<DataCollector>)) {
191 let mut data = DataCollector::new();
192 let scrape_interval = Duration::from_millis(1000);
194
195 tokio::select! {
196 _ = async {
197 loop {
198 tokio::time::sleep(scrape_interval).await;
199 scrape_data(&mut data, &state);
200 }
201 } => {
202 }
203 _ = rx => {
204 scrape_data(&mut data, &state);
205 }
206 }
207
208 let _ = tx.send(data);
209}
210
211fn scrape_data(data: &mut DataCollector, state: &Mutex<State>) {
212 data.log_time();
213 let state = state.lock().unwrap();
214 for (key, counter) in state.counters.iter() {
215 let val = counter.load(Ordering::Relaxed);
216 data.push_counter(key, val);
217 }
218
219 for (key, gauge) in state.gauges.iter() {
220 let val = gauge.load(Ordering::Relaxed);
221 data.push_gauge(key, val);
222 }
223
224 for (key, histogram) in state.histograms.iter() {
225 let scale_function = K1::new(TDIGEST_COMPRESSION_FACTOR);
226 let mut tdigest = TDigest::new(scale_function, TDIGEST_MAX_BACKLOG_SIZE);
227 histogram.clear_with(|data| {
228 for d in data {
229 tdigest.insert(*d);
230 }
231 });
232
233 let vals = (
234 tdigest.quantile(0.5),
235 tdigest.quantile(0.9),
236 tdigest.quantile(0.95),
237 tdigest.quantile(0.99),
238 );
239 data.push_histogram(key, vals);
240 }
241}
242
243#[derive(Debug)]
244struct DataCollector {
245 counters: HashMap<Key, Vec<u64>>,
246 gauges: HashMap<Key, Vec<u64>>,
247 histograms: HashMap<Key, Vec<(f64, f64, f64, f64)>>,
248 start: Instant,
249 timestamps: Vec<f64>,
250}
251
252impl DataCollector {
253 fn new() -> Self {
254 let start = Instant::now();
255 Self {
256 counters: HashMap::new(),
257 gauges: HashMap::new(),
258 histograms: HashMap::new(),
259 start,
260 timestamps: vec![],
261 }
262 }
263
264 fn log_time(&mut self) {
265 self.timestamps.push(self.start.elapsed().as_secs_f64());
266 }
267
268 fn push_counter(&mut self, key: &Key, value: u64) {
269 if let Some(vec) = self.counters.get_mut(key) {
270 vec.push(value);
271 } else {
272 self.counters.insert(key.to_owned(), vec![value]);
273 }
274 }
275
276 fn push_gauge(&mut self, key: &Key, value: u64) {
277 if let Some(vec) = self.gauges.get_mut(key) {
278 vec.push(value);
279 } else {
280 self.gauges.insert(key.to_owned(), vec![value]);
281 }
282 }
283
284 fn push_histogram(&mut self, key: &Key, value: (f64, f64, f64, f64)) {
285 if let Some(vec) = self.histograms.get_mut(key) {
286 vec.push(value);
287 } else {
288 self.histograms.insert(key.to_owned(), vec![value]);
289 }
290 }
291
292 fn metrics(&self) -> Vec<&str> {
293 self.counters
294 .keys()
295 .chain(self.gauges.keys())
296 .chain(self.histograms.keys())
297 .map(|key| key.name())
298 .collect()
299 }
300
301 fn get_metric(&self, name: &str) -> Option<MetricKind> {
302 let key = Key::from_name(name.to_owned());
303
304 if let Some(vals) = self.counters.get(&key) {
305 Some(MetricKind::Single(vals.clone()))
306 } else if let Some(vals) = self.gauges.get(&key) {
307 Some(MetricKind::Single(vals.clone()))
308 } else {
309 self.histograms
310 .get(&key)
311 .map(|vals| MetricKind::Quantile(vals.clone()))
312 }
313 }
314}
315
316pub(crate) enum MetricKind {
317 Single(Vec<u64>),
318 Quantile(Vec<(f64, f64, f64, f64)>),
319}