prometheus_remote_write/
lib.rs

1//! Types and utilities for calling Prometheus remote write API endpoints.
2
3/// Special label for the name of a metric.
4pub const LABEL_NAME: &str = "__name__";
5pub const CONTENT_TYPE: &str = "application/x-protobuf";
6pub const HEADER_NAME_REMOTE_WRITE_VERSION: &str = "X-Prometheus-Remote-Write-Version";
7pub const REMOTE_WRITE_VERSION_01: &str = "0.1.0";
8
9/// A write request.
10///
11/// .proto:
12/// ```protobuf
13/// message WriteRequest {
14///   repeated TimeSeries timeseries = 1;
15///   // Cortex uses this field to determine the source of the write request.
16///   // We reserve it to avoid any compatibility issues.
17///   reserved  2;
18
19///   // Prometheus uses this field to send metadata, but this is
20///   // omitted from v1 of the spec as it is experimental.
21///   reserved  3;
22/// }
23/// ```
24#[derive(prost::Message, Clone, PartialEq)]
25pub struct WriteRequest {
26    #[prost(message, repeated, tag = "1")]
27    pub timeseries: Vec<TimeSeries>,
28}
29
30impl WriteRequest {
31    /// Prepare the write request for sending.
32    ///
33    /// Ensures that the request conforms to the specification.
34    /// See https://prometheus.io/docs/concepts/remote_write_spec.
35    pub fn sort(&mut self) {
36        for series in &mut self.timeseries {
37            series.sort_labels_and_samples();
38        }
39    }
40
41    pub fn sorted(mut self) -> Self {
42        self.sort();
43        self
44    }
45
46    /// Encode this write request as a protobuf message.
47    ///
48    /// NOTE: The API requires snappy compression, not a raw protobuf message.
49    pub fn encode_proto3(self) -> Vec<u8> {
50        prost::Message::encode_to_vec(&self.sorted())
51    }
52
53    /// Encode this write request as a snappy-compressed protobuf message.
54    #[cfg(feature = "compression")]
55    pub fn encode_compressed(self) -> Result<Vec<u8>, snap::Error> {
56        snap::raw::Encoder::new().compress_vec(&self.encode_proto3())
57    }
58
59    /// Parse metrics from the Prometheus text format, and convert them into a
60    /// [`WriteRequest`].
61    #[cfg(feature = "parse")]
62    pub fn from_text_format(
63        text: String,
64    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
65        fn samples_to_timeseries(
66            samples: Vec<prometheus_parse::Sample>,
67        ) -> Result<Vec<TimeSeries>, Box<dyn std::error::Error + Send + Sync>> {
68            let mut all_series = std::collections::HashMap::<String, TimeSeries>::new();
69
70            for sample in &samples {
71                let mut labels = sample
72                    .labels
73                    .iter()
74                    .map(|(k, v)| (k.as_str(), v.as_str()))
75                    .collect::<Vec<_>>();
76
77                labels.push((LABEL_NAME, sample.metric.as_str()));
78
79                labels.sort_by(|a, b| a.0.cmp(b.0));
80
81                let mut ident = sample.metric.clone();
82                ident.push_str("_$$_");
83                for (k, v) in &labels {
84                    ident.push_str(k);
85                    ident.push('=');
86                    ident.push_str(v);
87                }
88
89                let series = all_series.entry(ident).or_insert_with(|| {
90                    let labels = labels
91                        .iter()
92                        .map(|(k, v)| Label {
93                            name: k.to_string(),
94                            value: v.to_string(),
95                        })
96                        .collect::<Vec<_>>();
97
98                    TimeSeries {
99                        labels,
100                        samples: vec![],
101                    }
102                });
103
104                let value = match sample.value {
105                    prometheus_parse::Value::Counter(v) => v,
106                    prometheus_parse::Value::Gauge(v) => v,
107                    prometheus_parse::Value::Histogram(_) => {
108                        Err("histogram not supported yet".to_string())?
109                    }
110                    prometheus_parse::Value::Summary(_) => {
111                        Err("summary not supported yet".to_string())?
112                    }
113                    prometheus_parse::Value::Untyped(v) => v,
114                };
115
116                series.samples.push(Sample {
117                    value,
118                    timestamp: sample.timestamp.timestamp_millis(),
119                });
120            }
121
122            Ok(all_series.into_values().collect())
123        }
124
125        let iter = text.trim().lines().map(|x| Ok(x.to_string()));
126        let parsed = prometheus_parse::Scrape::parse(iter)
127            .map_err(|err| format!("could not parse input as Prometheus text format: {err}"))?;
128
129        let mut series = samples_to_timeseries(parsed.samples)?;
130        series.sort_by(|a, b| {
131            let name_a = a.labels.iter().find(|x| x.name == LABEL_NAME).unwrap();
132            let name_b = b.labels.iter().find(|x| x.name == LABEL_NAME).unwrap();
133            name_a.value.cmp(&name_b.value)
134        });
135
136        let s = Self { timeseries: series };
137
138        Ok(s.sorted())
139    }
140
141    /// Build a fully prepared HTTP request that an be sent to a remote write endpoint.
142    #[cfg(feature = "http")]
143    pub fn build_http_request(
144        self,
145        endpoint: &url::Url,
146        user_agent: &str,
147    ) -> Result<http::Request<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
148        let req = http::Request::builder()
149            .method(http::Method::POST)
150            .uri(endpoint.as_str())
151            .header(http::header::CONTENT_TYPE, CONTENT_TYPE)
152            .header(HEADER_NAME_REMOTE_WRITE_VERSION, REMOTE_WRITE_VERSION_01)
153            .header(http::header::CONTENT_ENCODING, "snappy")
154            .header(http::header::USER_AGENT, user_agent)
155            .body(self.encode_compressed()?)?;
156
157        Ok(req)
158    }
159}
160
161/// A time series.
162///
163/// .proto:
164/// ```protobuf
165/// message TimeSeries {
166///   repeated Label labels   = 1;
167///   repeated Sample samples = 2;
168/// }
169/// ```
170#[derive(prost::Message, Clone, PartialEq)]
171pub struct TimeSeries {
172    #[prost(message, repeated, tag = "1")]
173    pub labels: Vec<Label>,
174    #[prost(message, repeated, tag = "2")]
175    pub samples: Vec<Sample>,
176}
177
178impl TimeSeries {
179    /// Sort labels by name, and the samples by timestamp.
180    ///
181    /// Required by the specification.
182    pub fn sort_labels_and_samples(&mut self) {
183        self.labels.sort_by(|a, b| a.name.cmp(&b.name));
184        self.samples.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
185    }
186}
187
188/// A label.
189///
190/// .proto:
191/// ```protobuf
192/// message Label {
193///   string name  = 1;
194///   string value = 2;
195/// }
196/// ```
197#[derive(prost::Message, Clone, Hash, PartialEq, Eq)]
198pub struct Label {
199    #[prost(string, tag = "1")]
200    pub name: String,
201    #[prost(string, tag = "2")]
202    pub value: String,
203}
204
205/// A sample.
206///
207/// .proto:
208/// ```protobuf
209/// message Sample {
210///   double value    = 1;
211///   int64 timestamp = 2;
212/// }
213/// ```
214#[derive(prost::Message, Clone, PartialEq)]
215pub struct Sample {
216    #[prost(double, tag = "1")]
217    pub value: f64,
218    #[prost(int64, tag = "2")]
219    pub timestamp: i64,
220}
221
222#[cfg(all(feature = "parse", feature = "compression"))]
223#[cfg(test)]
224mod tests {
225    use pretty_assertions::assert_eq;
226
227    use super::*;
228    #[test]
229    fn test_name() {
230        let input = r#"
231# TYPE mycounter counter
232# TYPE mygauge gauge
233
234mygauge 100 100
235http_requests_total{method="post",code="200"} 1027 1395066363000
236mycounter 100 100
237alpha 10 1000
238http_requests_total{method="post",code="200"} 50 1000
239    "#;
240
241        let req = WriteRequest::from_text_format(input.to_string()).unwrap();
242
243        assert_eq!(
244            req,
245            WriteRequest {
246                timeseries: vec![
247                    TimeSeries {
248                        labels: vec![Label {
249                            name: LABEL_NAME.to_string(),
250                            value: "alpha".to_string()
251                        },],
252                        samples: vec![Sample {
253                            value: 10.0,
254                            timestamp: 1000,
255                        },]
256                    },
257                    TimeSeries {
258                        labels: vec![
259                            Label {
260                                name: LABEL_NAME.to_string(),
261                                value: "http_requests_total".to_string()
262                            },
263                            Label {
264                                name: "code".to_string(),
265                                value: "200".to_string()
266                            },
267                            Label {
268                                name: "method".to_string(),
269                                value: "post".to_string()
270                            },
271                        ],
272                        samples: vec![
273                            Sample {
274                                value: 50.0,
275                                timestamp: 1000,
276                            },
277                            Sample {
278                                value: 1027.0,
279                                timestamp: 1395066363000
280                            },
281                        ]
282                    },
283                    TimeSeries {
284                        labels: vec![Label {
285                            name: LABEL_NAME.to_string(),
286                            value: "mycounter".to_string()
287                        },],
288                        samples: vec![Sample {
289                            value: 100.0,
290                            timestamp: 100,
291                        }],
292                    },
293                    TimeSeries {
294                        labels: vec![Label {
295                            name: LABEL_NAME.to_string(),
296                            value: "mygauge".to_string()
297                        },],
298                        samples: vec![Sample {
299                            value: 100.0,
300                            timestamp: 100,
301                        }],
302                    },
303                ]
304            }
305        );
306
307        let _x = req.clone().encode_proto3();
308        let _y = req.encode_compressed();
309    }
310}