metrics_exporter_plotly/
lib.rs

1use metrics::{
2    Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SetRecorderError, SharedString,
3    Unit,
4};
5use metrics_util::AtomicBucket;
6use pdatastructs::tdigest::{TDigest, K1};
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, Instant};
11use tokio::sync::oneshot::{self, Receiver, Sender};
12use tracing::error;
13
14mod pattern;
15mod plot;
16
17pub use pattern::{PatternGroup, PlotKind};
18
// Parameter handed to `K1::new` when building the t-digest scale function for histogram scrapes.
const TDIGEST_COMPRESSION_FACTOR: f64 = 100.0;
// Second argument of `TDigest::new` (the digest's maximum backlog size) for histogram scrapes.
const TDIGEST_MAX_BACKLOG_SIZE: usize = 10;
21
/// Builder for a [`PlotlyRecorder`].
///
/// Currently carries no configuration; use [`PlotlyRecorderBuilder::install`] to register the
/// recorder globally and obtain a [`PlotlyRecorderHandle`].
pub struct PlotlyRecorderBuilder {}
23
24impl Default for PlotlyRecorderBuilder {
25    fn default() -> Self {
26        Self::new()
27    }
28}
29
30impl PlotlyRecorderBuilder {
31    pub fn new() -> Self {
32        Self {}
33    }
34
35    /// Install the recorder globally.
36    pub fn install(self) -> Result<PlotlyRecorderHandle, SetRecorderError<PlotlyRecorder>> {
37        use tokio::runtime;
38        let mut recorder = if let Ok(handle) = runtime::Handle::try_current() {
39            let _g = handle.enter();
40            self.build()
41        } else {
42            let runtime = runtime::Builder::new_current_thread()
43                .enable_all()
44                .build()
45                .unwrap();
46
47            let recorder = {
48                let _g = runtime.enter();
49                self.build()
50            };
51
52            recorder
53        };
54
55        let handle = recorder.get_handle();
56        metrics::set_global_recorder(recorder)?;
57        Ok(handle)
58    }
59
60    /// No intended for normal use, only for manually installing the recorder.
61    pub fn build(&self) -> PlotlyRecorder {
62        PlotlyRecorder::new()
63    }
64}
65
/// A `metrics` recorder whose registered metrics are periodically sampled by a background
/// scraper task.
pub struct PlotlyRecorder {
    // Registry of every metric handed out so far; shared with the scraper task.
    state: Arc<Mutex<State>>,
    // Handle created together with the recorder; moved out (leaving `None`) by `get_handle`.
    handle: Option<PlotlyRecorderHandle>,
}
70
71impl PlotlyRecorder {
72    fn new() -> Self {
73        let state = Arc::new(Mutex::new(State::new()));
74        let (tx0, rx0) = oneshot::channel();
75        let (tx1, rx1) = oneshot::channel();
76
77        let state2 = state.clone();
78        tokio::spawn(scraper(state2, (rx0, tx1)));
79
80        let handle = PlotlyRecorderHandle {
81            channel: (tx0, rx1),
82        };
83        Self {
84            state,
85            handle: Some(handle),
86        }
87    }
88
89    fn get_handle(&mut self) -> PlotlyRecorderHandle {
90        self.handle.take().unwrap()
91    }
92}
93
94impl Recorder for PlotlyRecorder {
95    fn describe_counter(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
96        // TODO
97    }
98
99    fn describe_gauge(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
100        // TODO
101    }
102
103    fn describe_histogram(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
104        // TODO
105    }
106
107    fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
108        let atomic = self.state.lock().unwrap().get_counter(key);
109        Counter::from_arc(atomic)
110    }
111
112    fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
113        let atomic = self.state.lock().unwrap().get_gauge(key);
114        Gauge::from_arc(atomic)
115    }
116
117    fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
118        let atomic = self.state.lock().unwrap().get_histogram(key);
119        Histogram::from_arc(atomic)
120    }
121}
122
/// Registry of all metrics handed out by the recorder, keyed by metric key.
///
/// The same `Arc`s are given to the `metrics` handles and read back by the scraper.
struct State {
    // Shared cells backing registered counters.
    counters: HashMap<Key, Arc<AtomicU64>>,
    // Shared cells backing registered gauges.
    gauges: HashMap<Key, Arc<AtomicU64>>,
    // Shared buckets backing registered histograms; emptied on every scrape.
    histograms: HashMap<Key, Arc<AtomicBucket<f64>>>,
}
128
129impl State {
130    fn new() -> Self {
131        Self {
132            counters: HashMap::new(),
133            gauges: HashMap::new(),
134            histograms: HashMap::new(),
135        }
136    }
137
138    fn get_counter(&mut self, key: &Key) -> Arc<AtomicU64> {
139        if let Some(val) = self.counters.get(key) {
140            val.clone()
141        } else {
142            let val = Arc::new(AtomicU64::new(0));
143            self.counters.insert(key.clone(), val.clone());
144            val
145        }
146    }
147
148    fn get_gauge(&mut self, key: &Key) -> Arc<AtomicU64> {
149        if let Some(val) = self.gauges.get(key) {
150            val.clone()
151        } else {
152            let val = Arc::new(AtomicU64::new(0));
153            self.gauges.insert(key.clone(), val.clone());
154            val
155        }
156    }
157
158    fn get_histogram(&mut self, key: &Key) -> Arc<AtomicBucket<f64>> {
159        if let Some(val) = self.histograms.get(key) {
160            val.clone()
161        } else {
162            let val = Arc::new(AtomicBucket::new());
163            self.histograms.insert(key.clone(), val.clone());
164            val
165        }
166    }
167}
168
/// Handle used to stop the scraper and plot everything it collected.
pub struct PlotlyRecorderHandle {
    // `.0` signals the scraper task to do a final scrape and finish;
    // `.1` receives the data the scraper collected.
    channel: (Sender<()>, Receiver<DataCollector>),
}
172
173impl PlotlyRecorderHandle {
174    /// Plot the metrics
175    ///
176    /// Consumes the handle and metrics data for a one-shot plotting of the metrics. Takes a slice
177    /// of `PatternGroup`s which will each be opened in a new browser window.
178    pub async fn plot(self, groups: &[PatternGroup]) {
179        self.channel.0.send(()).unwrap();
180        let res = self.channel.1.await;
181
182        if let Ok(data) = res {
183            plot::plot_data(data, groups);
184        } else {
185            error!("Channel broke in Drop impl.");
186        }
187    }
188}
189
190async fn scraper(state: Arc<Mutex<State>>, (rx, tx): (Receiver<()>, Sender<DataCollector>)) {
191    let mut data = DataCollector::new();
192    // TODO: Configurable scrape time
193    let scrape_interval = Duration::from_millis(1000);
194
195    tokio::select! {
196        _ = async {
197            loop {
198                tokio::time::sleep(scrape_interval).await;
199                scrape_data(&mut data, &state);
200            }
201        } => {
202        }
203        _ = rx => {
204            scrape_data(&mut data, &state);
205        }
206    }
207
208    let _ = tx.send(data);
209}
210
211fn scrape_data(data: &mut DataCollector, state: &Mutex<State>) {
212    data.log_time();
213    let state = state.lock().unwrap();
214    for (key, counter) in state.counters.iter() {
215        let val = counter.load(Ordering::Relaxed);
216        data.push_counter(key, val);
217    }
218
219    for (key, gauge) in state.gauges.iter() {
220        let val = gauge.load(Ordering::Relaxed);
221        data.push_gauge(key, val);
222    }
223
224    for (key, histogram) in state.histograms.iter() {
225        let scale_function = K1::new(TDIGEST_COMPRESSION_FACTOR);
226        let mut tdigest = TDigest::new(scale_function, TDIGEST_MAX_BACKLOG_SIZE);
227        histogram.clear_with(|data| {
228            for d in data {
229                tdigest.insert(*d);
230            }
231        });
232
233        let vals = (
234            tdigest.quantile(0.5),
235            tdigest.quantile(0.9),
236            tdigest.quantile(0.95),
237            tdigest.quantile(0.99),
238        );
239        data.push_histogram(key, vals);
240    }
241}
242
/// Time series accumulated by the scraper, one sample per metric per scrape.
#[derive(Debug)]
struct DataCollector {
    // Sampled counter values per key.
    counters: HashMap<Key, Vec<u64>>,
    // Sampled gauge values per key.
    gauges: HashMap<Key, Vec<u64>>,
    // Sampled histogram quantiles per key, as (0.5, 0.9, 0.95, 0.99) tuples.
    histograms: HashMap<Key, Vec<(f64, f64, f64, f64)>>,
    // Moment the collector was created; timestamps are measured relative to it.
    start: Instant,
    // Seconds elapsed since `start`, one entry per `log_time` call.
    timestamps: Vec<f64>,
}
251
252impl DataCollector {
253    fn new() -> Self {
254        let start = Instant::now();
255        Self {
256            counters: HashMap::new(),
257            gauges: HashMap::new(),
258            histograms: HashMap::new(),
259            start,
260            timestamps: vec![],
261        }
262    }
263
264    fn log_time(&mut self) {
265        self.timestamps.push(self.start.elapsed().as_secs_f64());
266    }
267
268    fn push_counter(&mut self, key: &Key, value: u64) {
269        if let Some(vec) = self.counters.get_mut(key) {
270            vec.push(value);
271        } else {
272            self.counters.insert(key.to_owned(), vec![value]);
273        }
274    }
275
276    fn push_gauge(&mut self, key: &Key, value: u64) {
277        if let Some(vec) = self.gauges.get_mut(key) {
278            vec.push(value);
279        } else {
280            self.gauges.insert(key.to_owned(), vec![value]);
281        }
282    }
283
284    fn push_histogram(&mut self, key: &Key, value: (f64, f64, f64, f64)) {
285        if let Some(vec) = self.histograms.get_mut(key) {
286            vec.push(value);
287        } else {
288            self.histograms.insert(key.to_owned(), vec![value]);
289        }
290    }
291
292    fn metrics(&self) -> Vec<&str> {
293        self.counters
294            .keys()
295            .chain(self.gauges.keys())
296            .chain(self.histograms.keys())
297            .map(|key| key.name())
298            .collect()
299    }
300
301    fn get_metric(&self, name: &str) -> Option<MetricKind> {
302        let key = Key::from_name(name.to_owned());
303
304        if let Some(vals) = self.counters.get(&key) {
305            Some(MetricKind::Single(vals.clone()))
306        } else if let Some(vals) = self.gauges.get(&key) {
307            Some(MetricKind::Single(vals.clone()))
308        } else {
309            self.histograms
310                .get(&key)
311                .map(|vals| MetricKind::Quantile(vals.clone()))
312        }
313    }
314}
315
/// A collected time series, as returned by `DataCollector::get_metric`.
pub(crate) enum MetricKind {
    /// One value per sample; produced for counters and gauges.
    Single(Vec<u64>),
    /// One quantile tuple per sample; produced for histograms.
    Quantile(Vec<(f64, f64, f64, f64)>),
}