// prometheus_reqwest_remote_write/lib.rs

use std::{collections::HashMap, time::SystemTime};

use prometheus::proto::MetricFamily;
use reqwest::Client;

/// Special label for the name of a metric.
pub const LABEL_NAME: &str = "__name__";
/// `Content-Type` header value sent with a remote-write request.
pub const CONTENT_TYPE: &str = "application/x-protobuf";
/// Name of the HTTP header carrying the remote-write protocol version.
pub const HEADER_NAME_REMOTE_WRITE_VERSION: &str = "X-Prometheus-Remote-Write-Version";
/// Protocol version sent in the [`HEADER_NAME_REMOTE_WRITE_VERSION`] header.
pub const REMOTE_WRITE_VERSION_01: &str = "0.1.0";
/// Suffix appended to a family name for the observation-count series of
/// summaries and histograms.
pub const COUNT_SUFFIX: &str = "_count";
/// Suffix appended to a family name for the observation-sum series of
/// summaries and histograms.
pub const SUM_SUFFIX: &str = "_sum";
/// The `_total` suffix. Not referenced elsewhere in this file.
pub const TOTAL_SUFFIX: &str = "_total";

15/// A label.
16///
17/// .proto:
18/// ```protobuf
19/// message Label {
20///   string name  = 1;
21///   string value = 2;
22/// }
23/// ```
24#[derive(prost::Message, Clone, Hash, PartialEq, Eq)]
25pub struct Label {
26    #[prost(string, tag = "1")]
27    pub name: String,
28    #[prost(string, tag = "2")]
29    pub value: String,
30}
31
32/// A sample.
33///
34/// .proto:
35/// ```protobuf
36/// message Sample {
37///   double value    = 1;
38///   int64 timestamp = 2;
39/// }
40/// ```
41#[derive(prost::Message, Clone, PartialEq)]
42pub struct Sample {
43    #[prost(double, tag = "1")]
44    pub value: f64,
45    #[prost(int64, tag = "2")]
46    pub timestamp: i64,
47}
48
/// An additional label carrying a numeric payload.
///
/// NOTE(review): this enum is not referenced anywhere else in this file. The
/// variant names suggest the histogram `le` and summary `quantile` labels, but
/// that is an assumption to confirm against callers.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ExtraLabel {
    /// Presumably a histogram bucket upper bound (`le`) — TODO confirm.
    LessThan(f64),
    /// Presumably a summary quantile (`quantile`) — TODO confirm.
    Quantile(f64),
}

54/// A time series.
55///
56/// .proto:
57/// ```protobuf
58/// message TimeSeries {
59///   repeated Label labels   = 1;
60///   repeated Sample samples = 2;
61/// }
62/// ```
63#[derive(prost::Message, Clone, PartialEq)]
64pub struct TimeSeries {
65    #[prost(message, repeated, tag = "1")]
66    pub labels: Vec<Label>,
67    #[prost(message, repeated, tag = "2")]
68    pub samples: Vec<Sample>,
69}
70
71impl TimeSeries {
72    /// Sort labels by name, and the samples by timestamp.
73    ///
74    /// Required by the specification.
75    pub fn sort_labels_and_samples(&mut self) {
76        self.labels.sort_by(|a, b| a.name.cmp(&b.name));
77        self.samples.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
78    }
79}
80
81/// A write request.
82///
83/// .proto:
84/// ```protobuf
85/// message WriteRequest {
86///   repeated TimeSeries timeseries = 1;
87///   // Cortex uses this field to determine the source of the write request.
88///   // We reserve it to avoid any compatibility issues.
89///   reserved  2;
90
91///   // Prometheus uses this field to send metadata, but this is
92///   // omitted from v1 of the spec as it is experimental.
93///   reserved  3;
94/// }
95/// ```
96#[derive(prost::Message, Clone, PartialEq)]
97pub struct WriteRequest {
98    #[prost(message, repeated, tag = "1")]
99    pub timeseries: Vec<TimeSeries>,
100}
101
/// Current wall-clock time in milliseconds relative to the Unix epoch.
///
/// A system clock set before 1970 yields a negative value instead of a panic,
/// and a magnitude that does not fit in an `i64` saturates rather than
/// wrapping.
fn get_timestamp() -> i64 {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis().min(i64::MAX as u128) as i64,
        // The error reports how far the clock is *behind* the epoch.
        Err(before_epoch) => {
            -(before_epoch.duration().as_millis().min(i64::MAX as u128) as i64)
        }
    }
}

109impl WriteRequest {
110    /// Prepare the write request for sending.
111    ///
112    /// Ensures that the request conforms to the specification.
113    /// See https://prometheus.io/docs/concepts/remote_write_spec.
114    pub fn sort(&mut self) {
115        for series in &mut self.timeseries {
116            series.sort_labels_and_samples();
117        }
118    }
119
120    pub fn sorted(mut self) -> Self {
121        self.sort();
122        self
123    }
124
125    /// Encode this write request as a protobuf message.
126    ///
127    /// NOTE: The API requires snappy compression, not a raw protobuf message.
128    pub fn encode_proto3(self) -> Vec<u8> {
129        prost::Message::encode_to_vec(&self.sorted())
130    }
131
132    pub fn encode_compressed(self) -> Result<Vec<u8>, snap::Error> {
133        snap::raw::Encoder::new().compress_vec(&self.encode_proto3())
134    }
135
136    /// Encode Prometheus metric families into a WriteRequest
137    pub fn from_metric_families(
138        metric_families: Vec<MetricFamily>,
139        custom_labels: Option<Vec<(String, String)>>,
140    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
141        let mut timeseries = Vec::new();
142        let now = get_timestamp();
143        let custom_labels = custom_labels.unwrap_or_default();
144        metric_families
145            .iter()
146            .for_each(|mf| match mf.get_field_type() {
147                prometheus::proto::MetricType::GAUGE => {
148                    mf.get_metric().iter().for_each(|m| {
149                        let mut labels = m
150                            .get_label()
151                            .iter()
152                            .map(|l| (l.name().to_string(), l.value().to_string()))
153                            .collect::<Vec<_>>();
154                        labels.push((LABEL_NAME.to_string(), mf.name().to_string()));
155                        labels.extend_from_slice(&custom_labels);
156
157                        let samples = vec![Sample {
158                            value: m.get_gauge().value(),
159                            timestamp: now,
160                        }];
161
162                        timeseries.push(TimeSeries {
163                            labels: labels
164                                .iter()
165                                .map(|(k, v)| Label {
166                                    name: k.to_string(),
167                                    value: v.to_string(),
168                                })
169                                .collect::<Vec<_>>(),
170                            samples,
171                        });
172                    });
173                }
174                prometheus::proto::MetricType::COUNTER => {
175                    mf.get_metric().iter().for_each(|m| {
176                        let mut labels = m
177                            .get_label()
178                            .iter()
179                            .map(|l| (l.name().to_string(), l.value().to_string()))
180                            .collect::<Vec<_>>();
181                        labels.push((LABEL_NAME.to_string(), mf.name().to_string()));
182                        labels.extend_from_slice(&custom_labels);
183                        let samples = vec![Sample {
184                            value: m.get_counter().value(),
185                            timestamp: now,
186                        }];
187
188                        timeseries.push(TimeSeries {
189                            labels: labels
190                                .iter()
191                                .map(|(k, v)| Label {
192                                    name: k.to_string(),
193                                    value: v.to_string(),
194                                })
195                                .collect::<Vec<_>>(),
196                            samples,
197                        });
198                    });
199                }
200                prometheus::proto::MetricType::SUMMARY => {
201                    mf.get_metric().iter().for_each(|m| {
202                        let mut labels = m
203                            .get_label()
204                            .iter()
205                            .map(|l| (l.name().to_string(), l.value().to_string()))
206                            .collect::<HashMap<String, String>>();
207                        labels.insert(LABEL_NAME.to_string(), mf.name().to_string());
208                        custom_labels.iter().for_each(|(k, v)| {
209                            labels.insert(k.to_string(), v.to_string());
210                        });
211                        m.get_summary().get_quantile().iter().for_each(|quantile| {
212                            let mut our_labels = labels.clone();
213                            our_labels.insert(
214                                "quantile".to_string(),
215                                quantile.quantile().to_string(),
216                            );
217                            let samples = vec![Sample {
218                                value: quantile.value(),
219                                timestamp: now,
220                            }];
221                            timeseries.push(TimeSeries {
222                                labels: our_labels
223                                    .iter()
224                                    .map(|(k, v)| Label {
225                                        name: k.to_string(),
226                                        value: v.to_string(),
227                                    })
228                                    .collect::<Vec<_>>(),
229                                samples,
230                            });
231                        });
232                        let mut top_level_labels = labels.clone();
233                        top_level_labels.insert(
234                            LABEL_NAME.to_string(),
235                            format!("{}{}", mf.name(), SUM_SUFFIX),
236                        );
237                        timeseries.push(TimeSeries {
238                            samples: vec![Sample {
239                                value: m.get_summary().sample_sum(),
240                                timestamp: now,
241                            }],
242                            labels: top_level_labels
243                                .iter()
244                                .map(|(k, v)| Label {
245                                    name: k.to_string(),
246                                    value: v.to_string(),
247                                })
248                                .collect(),
249                        });
250                        top_level_labels.insert(
251                            LABEL_NAME.to_string(),
252                            format!("{}{}", mf.name(), COUNT_SUFFIX),
253                        );
254                        timeseries.push(TimeSeries {
255                            samples: vec![Sample {
256                                value: m.get_summary().sample_count() as f64,
257                                timestamp: now,
258                            }],
259                            labels: top_level_labels
260                                .iter()
261                                .map(|(k, v)| Label {
262                                    name: k.to_string(),
263                                    value: v.to_string(),
264                                })
265                                .collect(),
266                        });
267                    });
268                }
269                prometheus::proto::MetricType::UNTYPED => {}
270                prometheus::proto::MetricType::HISTOGRAM => {
271                    mf.get_metric().iter().for_each(|m| {
272                        let mut labels = m
273                            .get_label()
274                            .iter()
275                            .map(|l| (l.name().to_string(), l.value().to_string()))
276                            .collect::<HashMap<String, String>>();
277                        labels.insert(LABEL_NAME.to_string(), mf.name().to_string());
278                        custom_labels.iter().for_each(|(k, v)| {
279                            labels.insert(k.to_string(), v.to_string());
280                        });
281                        m.get_histogram().get_bucket().iter().for_each(|bucket| {
282                            let mut our_labels = labels.clone();
283                            our_labels
284                                .insert("le".to_string(), bucket.upper_bound().to_string());
285                            let samples = vec![Sample {
286                                value: bucket.cumulative_count() as f64,
287                                timestamp: now,
288                            }];
289                            timeseries.push(TimeSeries {
290                                labels: our_labels
291                                    .iter()
292                                    .map(|(k, v)| Label {
293                                        name: k.to_string(),
294                                        value: v.to_string(),
295                                    })
296                                    .collect::<Vec<_>>(),
297                                samples,
298                            });
299                        });
300                        let mut top_level_labels = labels.clone();
301                        top_level_labels.insert(
302                            LABEL_NAME.to_string(),
303                            format!("{}{}", mf.name(), SUM_SUFFIX),
304                        );
305                        timeseries.push(TimeSeries {
306                            samples: vec![Sample {
307                                value: m.get_histogram().get_sample_sum(),
308                                timestamp: now,
309                            }],
310                            labels: top_level_labels
311                                .iter()
312                                .map(|(k, v)| Label {
313                                    name: k.to_string(),
314                                    value: v.to_string(),
315                                })
316                                .collect(),
317                        });
318                        top_level_labels.insert(
319                            LABEL_NAME.to_string(),
320                            format!("{}{}", mf.name(), COUNT_SUFFIX),
321                        );
322                        timeseries.push(TimeSeries {
323                            samples: vec![Sample {
324                                value: m.get_histogram().get_sample_count() as f64,
325                                timestamp: now,
326                            }],
327                            labels: top_level_labels
328                                .iter()
329                                .map(|(k, v)| Label {
330                                    name: k.to_string(),
331                                    value: v.to_string(),
332                                })
333                                .collect(),
334                        });
335                        top_level_labels.insert(LABEL_NAME.to_string(), mf.name().to_string());
336                        top_level_labels.insert("le".into(), "+Inf".into());
337                        timeseries.push(TimeSeries {
338                            samples: vec![Sample {
339                                value: m.get_histogram().get_sample_count() as f64,
340                                timestamp: now,
341                            }],
342                            labels: top_level_labels
343                                .iter()
344                                .map(|(k, v)| Label {
345                                    name: k.to_string(),
346                                    value: v.to_string(),
347                                })
348                                .collect(),
349                        });
350                    });
351                }
352            });
353        timeseries.sort_by(|a, b| {
354            let name_a = a.labels.iter().find(|l| l.name == LABEL_NAME).unwrap();
355            let name_b = b.labels.iter().find(|l| l.name == LABEL_NAME).unwrap();
356            name_a.value.cmp(&name_b.value)
357        });
358        let s = Self { timeseries };
359        Ok(s.sorted())
360    }
361
362    pub fn build_http_request(
363        self,
364        client: Client,
365        endpoint: &str,
366        user_agent: &str,
367    ) -> Result<reqwest::Request, reqwest::Error> {
368        client
369            .post(endpoint)
370            .header(reqwest::header::CONTENT_TYPE, CONTENT_TYPE)
371            .header(HEADER_NAME_REMOTE_WRITE_VERSION, REMOTE_WRITE_VERSION_01)
372            .header(reqwest::header::CONTENT_ENCODING, "snappy")
373            .header(reqwest::header::USER_AGENT, user_agent)
374            .body(
375                self.encode_compressed()
376                    .expect("Failed to compress metrics data"),
377            )
378            .build()
379    }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use prometheus::{histogram_opts, Counter, Gauge, Histogram, Registry};

    /// Value of the `__name__` label of `series`.
    fn name_of(series: &TimeSeries) -> &str {
        &series
            .labels
            .iter()
            .find(|l| l.name == LABEL_NAME)
            .unwrap()
            .value
    }

    /// Value of the first sample of `series`.
    fn first_value(series: &TimeSeries) -> f64 {
        series.samples.first().unwrap().value
    }

    /// First sample value of the series whose metric name is `metric`.
    fn value_of(req: &WriteRequest, metric: &str) -> f64 {
        let series = req
            .timeseries
            .iter()
            .find(|ts| name_of(ts) == metric)
            .unwrap();
        first_value(series)
    }

    #[test]
    fn can_encode_counter() {
        let registry = Registry::new();
        let counter_name = "my_counter";
        let help = "an extra description";
        let counter = Counter::new(counter_name, help).unwrap();
        registry.register(Box::new(counter.clone())).unwrap();
        let incremented_by = 5.0;
        counter.inc_by(incremented_by);

        let req = WriteRequest::from_metric_families(registry.gather(), None)
            .expect("Failed to encode counter");

        assert_eq!(req.timeseries.len(), 1);
        let entry = req.timeseries.first().unwrap();
        assert_eq!(entry.labels.len(), 1);
        assert_eq!(name_of(entry), counter_name);
        assert_eq!(first_value(entry), incremented_by);
    }

    #[test]
    fn can_encode_gauge() {
        let registry = Registry::new();
        let gauge_name = "my_gauge";
        let help = "an extra description";
        let gauge = Gauge::new(gauge_name, help).unwrap();
        registry.register(Box::new(gauge.clone())).unwrap();
        let set_to = 5.0;
        gauge.set(set_to);

        let req = WriteRequest::from_metric_families(registry.gather(), None)
            .expect("Failed to encode gauge");

        assert_eq!(req.timeseries.len(), 1);
        let entry = req.timeseries.first().unwrap();
        assert_eq!(entry.labels.len(), 1);
        assert_eq!(name_of(entry), gauge_name);
        assert_eq!(first_value(entry), set_to);
    }

    #[test]
    fn can_encode_histogram() {
        let registry = Registry::new();
        let histogram_name = "my_histogram";
        let help = "an extra description".to_string();
        let opts = histogram_opts!(histogram_name, help, vec![10.0, 1000.0, 10000.0]);
        let histogram = Histogram::with_opts(opts).unwrap();
        registry.register(Box::new(histogram.clone())).unwrap();
        histogram.observe(5.0);
        histogram.observe(500.0);
        histogram.observe(5000.0);
        histogram.observe(50000.0);

        let req = WriteRequest::from_metric_families(registry.gather(), None)
            .expect("Failed to encode histogram");

        // Three explicit buckets, `+Inf`, `_sum` and `_count`.
        assert_eq!(req.timeseries.len(), 6);
        let bucket_names: Vec<String> = req
            .timeseries
            .iter()
            .filter_map(|ts| {
                ts.labels
                    .iter()
                    .find(|l| l.name == "le")
                    .map(|l| l.value.clone())
            })
            .collect();
        assert_eq!(bucket_names, vec!["10", "1000", "10000", "+Inf"]);

        let count_observations =
            value_of(&req, &format!("{}{}", histogram_name, COUNT_SUFFIX));
        assert_eq!(count_observations, 4.0);

        let sum_observation = value_of(&req, &format!("{}{}", histogram_name, SUM_SUFFIX));
        assert_eq!(sum_observation, 55505.0);
    }

    #[test]
    fn can_add_custom_labels() {
        let registry = Registry::new();
        let counter_name = "my_counter";
        let help = "an extra description";
        let counter = Counter::new(counter_name, help).unwrap();
        registry.register(Box::new(counter.clone())).unwrap();
        let incremented_by = 5.0;
        counter.inc_by(incremented_by);

        let req = WriteRequest::from_metric_families(
            registry.gather(),
            Some(vec![("foo".into(), "bar".into())]),
        )
        .expect("Failed to encode counter");

        assert_eq!(req.timeseries.len(), 1);
        let entry = req.timeseries.first().unwrap();
        assert_eq!(entry.labels.len(), 2);
        assert_eq!(name_of(entry), counter_name);
        assert_eq!(
            entry.labels.iter().find(|l| l.name == "foo").unwrap().value,
            "bar"
        );
        assert_eq!(first_value(entry), incremented_by);
    }
}