/// Name of the label under which the metric name of a series is stored.
pub const LABEL_NAME: &str = "__name__";
/// Value sent in the `Content-Type` header by `WriteRequest::build_http_request`.
pub const CONTENT_TYPE: &str = "application/x-protobuf";
/// Name of the HTTP header that carries the remote-write version.
pub const HEADER_NAME_REMOTE_WRITE_VERSION: &str = "X-Prometheus-Remote-Write-Version";
/// Version string sent in the `HEADER_NAME_REMOTE_WRITE_VERSION` header.
pub const REMOTE_WRITE_VERSION_01: &str = "0.1.0";
8
/// Top-level protobuf message holding a batch of time series.
///
/// The protobuf encoding is derived with `prost`.
#[derive(prost::Message, Clone, PartialEq)]
pub struct WriteRequest {
    /// The series contained in this request (repeated message field, tag 1).
    #[prost(message, repeated, tag = "1")]
    pub timeseries: Vec<TimeSeries>,
}
29
30impl WriteRequest {
31 pub fn sort(&mut self) {
36 for series in &mut self.timeseries {
37 series.sort_labels_and_samples();
38 }
39 }
40
41 pub fn sorted(mut self) -> Self {
42 self.sort();
43 self
44 }
45
46 pub fn encode_proto3(self) -> Vec<u8> {
50 prost::Message::encode_to_vec(&self.sorted())
51 }
52
53 #[cfg(feature = "compression")]
55 pub fn encode_compressed(self) -> Result<Vec<u8>, snap::Error> {
56 snap::raw::Encoder::new().compress_vec(&self.encode_proto3())
57 }
58
59 #[cfg(feature = "parse")]
62 pub fn from_text_format(
63 text: String,
64 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
65 fn samples_to_timeseries(
66 samples: Vec<prometheus_parse::Sample>,
67 ) -> Result<Vec<TimeSeries>, Box<dyn std::error::Error + Send + Sync>> {
68 let mut all_series = std::collections::HashMap::<String, TimeSeries>::new();
69
70 for sample in &samples {
71 let mut labels = sample
72 .labels
73 .iter()
74 .map(|(k, v)| (k.as_str(), v.as_str()))
75 .collect::<Vec<_>>();
76
77 labels.push((LABEL_NAME, sample.metric.as_str()));
78
79 labels.sort_by(|a, b| a.0.cmp(b.0));
80
81 let mut ident = sample.metric.clone();
82 ident.push_str("_$$_");
83 for (k, v) in &labels {
84 ident.push_str(k);
85 ident.push('=');
86 ident.push_str(v);
87 }
88
89 let series = all_series.entry(ident).or_insert_with(|| {
90 let labels = labels
91 .iter()
92 .map(|(k, v)| Label {
93 name: k.to_string(),
94 value: v.to_string(),
95 })
96 .collect::<Vec<_>>();
97
98 TimeSeries {
99 labels,
100 samples: vec![],
101 }
102 });
103
104 let value = match sample.value {
105 prometheus_parse::Value::Counter(v) => v,
106 prometheus_parse::Value::Gauge(v) => v,
107 prometheus_parse::Value::Histogram(_) => {
108 Err("histogram not supported yet".to_string())?
109 }
110 prometheus_parse::Value::Summary(_) => {
111 Err("summary not supported yet".to_string())?
112 }
113 prometheus_parse::Value::Untyped(v) => v,
114 };
115
116 series.samples.push(Sample {
117 value,
118 timestamp: sample.timestamp.timestamp_millis(),
119 });
120 }
121
122 Ok(all_series.into_values().collect())
123 }
124
125 let iter = text.trim().lines().map(|x| Ok(x.to_string()));
126 let parsed = prometheus_parse::Scrape::parse(iter)
127 .map_err(|err| format!("could not parse input as Prometheus text format: {err}"))?;
128
129 let mut series = samples_to_timeseries(parsed.samples)?;
130 series.sort_by(|a, b| {
131 let name_a = a.labels.iter().find(|x| x.name == LABEL_NAME).unwrap();
132 let name_b = b.labels.iter().find(|x| x.name == LABEL_NAME).unwrap();
133 name_a.value.cmp(&name_b.value)
134 });
135
136 let s = Self { timeseries: series };
137
138 Ok(s.sorted())
139 }
140
141 #[cfg(feature = "http")]
143 pub fn build_http_request(
144 self,
145 endpoint: &url::Url,
146 user_agent: &str,
147 ) -> Result<http::Request<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
148 let req = http::Request::builder()
149 .method(http::Method::POST)
150 .uri(endpoint.as_str())
151 .header(http::header::CONTENT_TYPE, CONTENT_TYPE)
152 .header(HEADER_NAME_REMOTE_WRITE_VERSION, REMOTE_WRITE_VERSION_01)
153 .header(http::header::CONTENT_ENCODING, "snappy")
154 .header(http::header::USER_AGENT, user_agent)
155 .body(self.encode_compressed()?)?;
156
157 Ok(req)
158 }
159}
160
/// A single series: a set of labels together with its samples.
///
/// The protobuf encoding is derived with `prost`.
#[derive(prost::Message, Clone, PartialEq)]
pub struct TimeSeries {
    /// Labels of the series (repeated message field, tag 1).
    #[prost(message, repeated, tag = "1")]
    pub labels: Vec<Label>,
    /// Samples of the series (repeated message field, tag 2).
    #[prost(message, repeated, tag = "2")]
    pub samples: Vec<Sample>,
}
177
178impl TimeSeries {
179 pub fn sort_labels_and_samples(&mut self) {
183 self.labels.sort_by(|a, b| a.name.cmp(&b.name));
184 self.samples.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
185 }
186}
187
/// A name/value pair attached to a [`TimeSeries`].
#[derive(prost::Message, Clone, Hash, PartialEq, Eq)]
pub struct Label {
    /// Label name (string field, tag 1).
    #[prost(string, tag = "1")]
    pub name: String,
    /// Label value (string field, tag 2).
    #[prost(string, tag = "2")]
    pub value: String,
}
204
/// A single observed value of a [`TimeSeries`] at a point in time.
#[derive(prost::Message, Clone, PartialEq)]
pub struct Sample {
    /// The observed value (double field, tag 1).
    #[prost(double, tag = "1")]
    pub value: f64,
    /// Timestamp of the observation (int64 field, tag 2).
    /// `WriteRequest::from_text_format` fills this with milliseconds.
    #[prost(int64, tag = "2")]
    pub timestamp: i64,
}
221
#[cfg(all(feature = "parse", feature = "compression"))]
#[cfg(test)]
mod tests {
    use pretty_assertions::assert_eq;

    use super::*;

    /// Shorthand for building a [`Label`].
    fn label(name: &str, value: &str) -> Label {
        Label {
            name: name.to_string(),
            value: value.to_string(),
        }
    }

    /// Shorthand for building a [`Sample`].
    fn sample(value: f64, timestamp: i64) -> Sample {
        Sample { value, timestamp }
    }

    /// Parses a small scrape, checks grouping and ordering, and verifies that
    /// both encoders succeed and round-trip.
    #[test]
    fn from_text_format_groups_sorts_and_encodes() {
        let input = r#"
# TYPE mycounter counter
# TYPE mygauge gauge

mygauge 100 100
http_requests_total{method="post",code="200"} 1027 1395066363000
mycounter 100 100
alpha 10 1000
http_requests_total{method="post",code="200"} 50 1000
        "#;

        let req = WriteRequest::from_text_format(input.to_string()).unwrap();

        assert_eq!(
            req,
            WriteRequest {
                timeseries: vec![
                    TimeSeries {
                        labels: vec![label(LABEL_NAME, "alpha")],
                        samples: vec![sample(10.0, 1000)],
                    },
                    TimeSeries {
                        labels: vec![
                            label(LABEL_NAME, "http_requests_total"),
                            label("code", "200"),
                            label("method", "post"),
                        ],
                        samples: vec![sample(50.0, 1000), sample(1027.0, 1395066363000)],
                    },
                    TimeSeries {
                        labels: vec![label(LABEL_NAME, "mycounter")],
                        samples: vec![sample(100.0, 100)],
                    },
                    TimeSeries {
                        labels: vec![label(LABEL_NAME, "mygauge")],
                        samples: vec![sample(100.0, 100)],
                    },
                ]
            }
        );

        // The protobuf bytes must decode back to the same request.
        let proto = req.clone().encode_proto3();
        let decoded = <WriteRequest as prost::Message>::decode(proto.as_slice()).unwrap();
        assert_eq!(decoded, req);

        // The compressed form must decompress to exactly the protobuf bytes.
        let compressed = req.encode_compressed().unwrap();
        let decompressed = snap::raw::Decoder::new()
            .decompress_vec(&compressed)
            .unwrap();
        assert_eq!(decompressed, proto);
    }
}
310}