ibc_telemetry/
encoder.rs

1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3use std::collections::BTreeMap;
4use std::io::{self, Write};
5
6use prometheus::proto::{self, MetricFamily, MetricType};
7use prometheus::{Encoder, Error, Result};
8
9use serde::Serialize;
10
11/// The JSON format of metric family.
12pub const JSON_FORMAT: &str = "application/json";
13
14const POSITIVE_INF: &str = "+Inf";
15const QUANTILE: &str = "quantile";
16
17/// An implementation of an [`Encoder`] that converts a [`MetricFamily`] proto message
18/// into JSON format.
19#[derive(Debug, Default)]
20pub struct JsonEncoder;
21
22#[derive(Default, Serialize)]
23struct JsonFamilies {
24    families: Vec<JsonFamily>,
25}
26
27#[derive(Serialize)]
28struct JsonFamily {
29    name: String,
30    desc: String,
31    r#type: String,
32    metrics: Vec<JsonMetric>,
33}
34
35#[derive(Serialize)]
36#[serde(tag = "type", rename_all = "lowercase")]
37enum JsonMetric {
38    Counter(Sample),
39    Gauge(Sample),
40    Histogram {
41        buckets: Vec<Sample>,
42        sum: Sample,
43        count: Sample,
44    },
45    Summary {
46        samples: Vec<Sample>,
47        count: Sample,
48        sum: Sample,
49    },
50}
51
52#[derive(Serialize)]
53struct Sample {
54    name: String,
55    value: f64,
56    timestamp: Option<i64>,
57    labels: BTreeMap<String, String>,
58}
59
60impl JsonEncoder {
61    /// Create a new text encoder.
62    pub fn new() -> Self {
63        Self
64    }
65
66    fn encode_json(&self, metric_families: &[MetricFamily]) -> Result<JsonFamilies> {
67        let mut families = JsonFamilies::default();
68
69        for mf in metric_families {
70            let name = mf.get_name();
71            let metric_type = mf.get_field_type();
72
73            let mut family = JsonFamily {
74                name: name.to_string(),
75                desc: mf.get_help().to_string(),
76                r#type: format!("{metric_type:?}").to_lowercase(),
77                metrics: Vec::default(),
78            };
79
80            for m in mf.get_metric() {
81                match metric_type {
82                    MetricType::COUNTER => {
83                        let sample = get_sample(name, None, m, None, m.get_counter().get_value())?;
84                        family.metrics.push(JsonMetric::Counter(sample));
85                    }
86                    MetricType::GAUGE => {
87                        let sample = get_sample(name, None, m, None, m.get_gauge().get_value())?;
88                        family.metrics.push(JsonMetric::Gauge(sample));
89                    }
90                    MetricType::HISTOGRAM => {
91                        let h = m.get_histogram();
92
93                        let mut buckets = Vec::new();
94                        let mut inf_seen = false;
95
96                        for b in h.get_bucket() {
97                            let upper_bound = b.get_upper_bound();
98
99                            let bucket = get_sample(
100                                name,
101                                Some("_bucket"),
102                                m,
103                                Some(("le", &upper_bound.to_string())),
104                                b.get_cumulative_count() as f64,
105                            )?;
106
107                            buckets.push(bucket);
108
109                            if upper_bound.is_sign_positive() && upper_bound.is_infinite() {
110                                inf_seen = true;
111                            }
112                        }
113
114                        if !inf_seen {
115                            let bucket = get_sample(
116                                name,
117                                Some("_bucket"),
118                                m,
119                                Some(("le", POSITIVE_INF)),
120                                h.get_sample_count() as f64,
121                            )?;
122
123                            buckets.push(bucket);
124                        }
125
126                        let sum = get_sample(name, Some("_sum"), m, None, h.get_sample_sum())?;
127                        let count =
128                            get_sample(name, Some("_count"), m, None, h.get_sample_count() as f64)?;
129
130                        family.metrics.push(JsonMetric::Histogram {
131                            buckets,
132                            sum,
133                            count,
134                        });
135                    }
136                    MetricType::SUMMARY => {
137                        let s = m.get_summary();
138
139                        let mut samples = Vec::new();
140                        for q in s.get_quantile() {
141                            let sample = get_sample(
142                                name,
143                                None,
144                                m,
145                                Some((QUANTILE, &q.get_quantile().to_string())),
146                                q.get_value(),
147                            )?;
148
149                            samples.push(sample);
150                        }
151
152                        let sum = get_sample(name, Some("_sum"), m, None, s.get_sample_sum())?;
153                        let count =
154                            get_sample(name, Some("_count"), m, None, s.get_sample_count() as f64)?;
155
156                        family.metrics.push(JsonMetric::Summary {
157                            samples,
158                            count,
159                            sum,
160                        });
161                    }
162                    MetricType::UNTYPED => {
163                        unimplemented!();
164                    }
165                }
166            }
167
168            families.families.push(family);
169        }
170
171        Ok(families)
172    }
173}
174
175impl Encoder for JsonEncoder {
176    fn encode<W: Write>(&self, metric_families: &[MetricFamily], writer: &mut W) -> Result<()> {
177        let json = self.encode_json(metric_families)?;
178
179        serde_json::to_writer(writer, &json).map_err(|e| Error::Io(io::Error::other(e)))
180    }
181
182    fn format_type(&self) -> &str {
183        JSON_FORMAT
184    }
185}
186
187/// `write_sample` writes a single sample in text format to `writer`, given the
188/// metric name, an optional metric name postfix, the metric proto message
189/// itself, optionally an additional label name and value (use empty strings if
190/// not required), and the value. The function returns the number of bytes
191/// written and any error encountered.
192fn get_sample(
193    name: &str,
194    name_postfix: Option<&str>,
195    mc: &proto::Metric,
196    additional_label: Option<(&str, &str)>,
197    value: f64,
198) -> Result<Sample> {
199    let mut name = name.to_string();
200    if let Some(postfix) = name_postfix {
201        name.push_str(postfix);
202    }
203
204    let labels = label_pairs_to_text(mc.get_label(), additional_label)?;
205    let timestamp = Some(mc.get_timestamp_ms()).filter(|&ts| ts != 0);
206
207    Ok(Sample {
208        name,
209        labels,
210        value,
211        timestamp,
212    })
213}
214
215fn label_pairs_to_text(
216    pairs: &[proto::LabelPair],
217    additional_label: Option<(&str, &str)>,
218) -> Result<BTreeMap<String, String>> {
219    if pairs.is_empty() && additional_label.is_none() {
220        return Ok(BTreeMap::default());
221    }
222
223    let mut labels = BTreeMap::new();
224    for lp in pairs {
225        labels.insert(lp.get_name().to_string(), lp.get_value().to_string());
226    }
227
228    if let Some((name, value)) = additional_label {
229        labels.insert(name.to_string(), value.to_string());
230    }
231
232    Ok(labels)
233}