1use std::{collections::HashMap, time::SystemTime};
2
3use prometheus::proto::MetricFamily;
4use reqwest::Client;
5
6pub const LABEL_NAME: &str = "__name__";
8pub const CONTENT_TYPE: &str = "application/x-protobuf";
9pub const HEADER_NAME_REMOTE_WRITE_VERSION: &str = "X-Prometheus-Remote-Write-Version";
10pub const REMOTE_WRITE_VERSION_01: &str = "0.1.0";
11pub const COUNT_SUFFIX: &str = "_count";
12pub const SUM_SUFFIX: &str = "_sum";
13pub const TOTAL_SUFFIX: &str = "_total";
14
15#[derive(prost::Message, Clone, Hash, PartialEq, Eq)]
25pub struct Label {
26 #[prost(string, tag = "1")]
27 pub name: String,
28 #[prost(string, tag = "2")]
29 pub value: String,
30}
31
32#[derive(prost::Message, Clone, PartialEq)]
42pub struct Sample {
43 #[prost(double, tag = "1")]
44 pub value: f64,
45 #[prost(int64, tag = "2")]
46 pub timestamp: i64,
47}
48
49pub enum ExtraLabel {
50 LessThan(f64),
51 Quantile(f64),
52}
53
54#[derive(prost::Message, Clone, PartialEq)]
64pub struct TimeSeries {
65 #[prost(message, repeated, tag = "1")]
66 pub labels: Vec<Label>,
67 #[prost(message, repeated, tag = "2")]
68 pub samples: Vec<Sample>,
69}
70
71impl TimeSeries {
72 pub fn sort_labels_and_samples(&mut self) {
76 self.labels.sort_by(|a, b| a.name.cmp(&b.name));
77 self.samples.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
78 }
79}
80
81#[derive(prost::Message, Clone, PartialEq)]
97pub struct WriteRequest {
98 #[prost(message, repeated, tag = "1")]
99 pub timeseries: Vec<TimeSeries>,
100}
101
102fn get_timestamp() -> i64 {
103 SystemTime::now()
104 .duration_since(SystemTime::UNIX_EPOCH)
105 .unwrap()
106 .as_millis() as i64
107}
108
109impl WriteRequest {
110 pub fn sort(&mut self) {
115 for series in &mut self.timeseries {
116 series.sort_labels_and_samples();
117 }
118 }
119
120 pub fn sorted(mut self) -> Self {
121 self.sort();
122 self
123 }
124
125 pub fn encode_proto3(self) -> Vec<u8> {
129 prost::Message::encode_to_vec(&self.sorted())
130 }
131
132 pub fn encode_compressed(self) -> Result<Vec<u8>, snap::Error> {
133 snap::raw::Encoder::new().compress_vec(&self.encode_proto3())
134 }
135
136 pub fn from_metric_families(
138 metric_families: Vec<MetricFamily>,
139 custom_labels: Option<Vec<(String, String)>>,
140 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
141 let mut timeseries = Vec::new();
142 let now = get_timestamp();
143 let custom_labels = custom_labels.unwrap_or_default();
144 metric_families
145 .iter()
146 .for_each(|mf| match mf.get_field_type() {
147 prometheus::proto::MetricType::GAUGE => {
148 mf.get_metric().iter().for_each(|m| {
149 let mut labels = m
150 .get_label()
151 .iter()
152 .map(|l| (l.name().to_string(), l.value().to_string()))
153 .collect::<Vec<_>>();
154 labels.push((LABEL_NAME.to_string(), mf.name().to_string()));
155 labels.extend_from_slice(&custom_labels);
156
157 let samples = vec![Sample {
158 value: m.get_gauge().value(),
159 timestamp: now,
160 }];
161
162 timeseries.push(TimeSeries {
163 labels: labels
164 .iter()
165 .map(|(k, v)| Label {
166 name: k.to_string(),
167 value: v.to_string(),
168 })
169 .collect::<Vec<_>>(),
170 samples,
171 });
172 });
173 }
174 prometheus::proto::MetricType::COUNTER => {
175 mf.get_metric().iter().for_each(|m| {
176 let mut labels = m
177 .get_label()
178 .iter()
179 .map(|l| (l.name().to_string(), l.value().to_string()))
180 .collect::<Vec<_>>();
181 labels.push((LABEL_NAME.to_string(), mf.name().to_string()));
182 labels.extend_from_slice(&custom_labels);
183 let samples = vec![Sample {
184 value: m.get_counter().value(),
185 timestamp: now,
186 }];
187
188 timeseries.push(TimeSeries {
189 labels: labels
190 .iter()
191 .map(|(k, v)| Label {
192 name: k.to_string(),
193 value: v.to_string(),
194 })
195 .collect::<Vec<_>>(),
196 samples,
197 });
198 });
199 }
200 prometheus::proto::MetricType::SUMMARY => {
201 mf.get_metric().iter().for_each(|m| {
202 let mut labels = m
203 .get_label()
204 .iter()
205 .map(|l| (l.name().to_string(), l.value().to_string()))
206 .collect::<HashMap<String, String>>();
207 labels.insert(LABEL_NAME.to_string(), mf.name().to_string());
208 custom_labels.iter().for_each(|(k, v)| {
209 labels.insert(k.to_string(), v.to_string());
210 });
211 m.get_summary().get_quantile().iter().for_each(|quantile| {
212 let mut our_labels = labels.clone();
213 our_labels.insert(
214 "quantile".to_string(),
215 quantile.quantile().to_string(),
216 );
217 let samples = vec![Sample {
218 value: quantile.value(),
219 timestamp: now,
220 }];
221 timeseries.push(TimeSeries {
222 labels: our_labels
223 .iter()
224 .map(|(k, v)| Label {
225 name: k.to_string(),
226 value: v.to_string(),
227 })
228 .collect::<Vec<_>>(),
229 samples,
230 });
231 });
232 let mut top_level_labels = labels.clone();
233 top_level_labels.insert(
234 LABEL_NAME.to_string(),
235 format!("{}{}", mf.name(), SUM_SUFFIX),
236 );
237 timeseries.push(TimeSeries {
238 samples: vec![Sample {
239 value: m.get_summary().sample_sum(),
240 timestamp: now,
241 }],
242 labels: top_level_labels
243 .iter()
244 .map(|(k, v)| Label {
245 name: k.to_string(),
246 value: v.to_string(),
247 })
248 .collect(),
249 });
250 top_level_labels.insert(
251 LABEL_NAME.to_string(),
252 format!("{}{}", mf.name(), COUNT_SUFFIX),
253 );
254 timeseries.push(TimeSeries {
255 samples: vec![Sample {
256 value: m.get_summary().sample_count() as f64,
257 timestamp: now,
258 }],
259 labels: top_level_labels
260 .iter()
261 .map(|(k, v)| Label {
262 name: k.to_string(),
263 value: v.to_string(),
264 })
265 .collect(),
266 });
267 });
268 }
269 prometheus::proto::MetricType::UNTYPED => {}
270 prometheus::proto::MetricType::HISTOGRAM => {
271 mf.get_metric().iter().for_each(|m| {
272 let mut labels = m
273 .get_label()
274 .iter()
275 .map(|l| (l.name().to_string(), l.value().to_string()))
276 .collect::<HashMap<String, String>>();
277 labels.insert(LABEL_NAME.to_string(), mf.name().to_string());
278 custom_labels.iter().for_each(|(k, v)| {
279 labels.insert(k.to_string(), v.to_string());
280 });
281 m.get_histogram().get_bucket().iter().for_each(|bucket| {
282 let mut our_labels = labels.clone();
283 our_labels
284 .insert("le".to_string(), bucket.upper_bound().to_string());
285 let samples = vec![Sample {
286 value: bucket.cumulative_count() as f64,
287 timestamp: now,
288 }];
289 timeseries.push(TimeSeries {
290 labels: our_labels
291 .iter()
292 .map(|(k, v)| Label {
293 name: k.to_string(),
294 value: v.to_string(),
295 })
296 .collect::<Vec<_>>(),
297 samples,
298 });
299 });
300 let mut top_level_labels = labels.clone();
301 top_level_labels.insert(
302 LABEL_NAME.to_string(),
303 format!("{}{}", mf.name(), SUM_SUFFIX),
304 );
305 timeseries.push(TimeSeries {
306 samples: vec![Sample {
307 value: m.get_histogram().get_sample_sum(),
308 timestamp: now,
309 }],
310 labels: top_level_labels
311 .iter()
312 .map(|(k, v)| Label {
313 name: k.to_string(),
314 value: v.to_string(),
315 })
316 .collect(),
317 });
318 top_level_labels.insert(
319 LABEL_NAME.to_string(),
320 format!("{}{}", mf.name(), COUNT_SUFFIX),
321 );
322 timeseries.push(TimeSeries {
323 samples: vec![Sample {
324 value: m.get_histogram().get_sample_count() as f64,
325 timestamp: now,
326 }],
327 labels: top_level_labels
328 .iter()
329 .map(|(k, v)| Label {
330 name: k.to_string(),
331 value: v.to_string(),
332 })
333 .collect(),
334 });
335 top_level_labels.insert(LABEL_NAME.to_string(), mf.name().to_string());
336 top_level_labels.insert("le".into(), "+Inf".into());
337 timeseries.push(TimeSeries {
338 samples: vec![Sample {
339 value: m.get_histogram().get_sample_count() as f64,
340 timestamp: now,
341 }],
342 labels: top_level_labels
343 .iter()
344 .map(|(k, v)| Label {
345 name: k.to_string(),
346 value: v.to_string(),
347 })
348 .collect(),
349 });
350 });
351 }
352 });
353 timeseries.sort_by(|a, b| {
354 let name_a = a.labels.iter().find(|l| l.name == LABEL_NAME).unwrap();
355 let name_b = b.labels.iter().find(|l| l.name == LABEL_NAME).unwrap();
356 name_a.value.cmp(&name_b.value)
357 });
358 let s = Self { timeseries };
359 Ok(s.sorted())
360 }
361
362 pub fn build_http_request(
363 self,
364 client: Client,
365 endpoint: &str,
366 user_agent: &str,
367 ) -> Result<reqwest::Request, reqwest::Error> {
368 client
369 .post(endpoint)
370 .header(reqwest::header::CONTENT_TYPE, CONTENT_TYPE)
371 .header(HEADER_NAME_REMOTE_WRITE_VERSION, REMOTE_WRITE_VERSION_01)
372 .header(reqwest::header::CONTENT_ENCODING, "snappy")
373 .header(reqwest::header::USER_AGENT, user_agent)
374 .body(
375 self.encode_compressed()
376 .expect("Failed to compress metrics data"),
377 )
378 .build()
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use super::*;
385 use pretty_assertions::assert_eq;
386 use prometheus::{histogram_opts, Counter, Gauge, Histogram, Registry};
387
388 #[test]
389 pub fn can_encode_counter() {
390 let registry = Registry::new();
391 let counter_name = "my_counter";
392 let help = "an extra description";
393 let counter = Counter::new(counter_name, help).unwrap();
394 registry.register(Box::new(counter.clone())).unwrap();
395 let incremented_by = 5.0;
396 counter.inc_by(incremented_by);
397 let req = WriteRequest::from_metric_families(registry.gather(), None)
398 .expect("Failed to encode counter");
399 assert_eq!(req.timeseries.len(), 1);
400 let entry = req.timeseries.first().unwrap();
401 assert_eq!(entry.labels.len(), 1);
402 assert_eq!(
403 entry
404 .labels
405 .iter()
406 .find(|l| l.name == LABEL_NAME)
407 .unwrap()
408 .value,
409 counter_name
410 );
411 assert_eq!(entry.samples.first().unwrap().value, incremented_by);
412 }
413 #[test]
414 pub fn can_encode_gauge() {
415 let registry = Registry::new();
416 let gauge_name = "my_gauge";
417 let help = "an extra description";
418 let counter = Gauge::new(gauge_name, help).unwrap();
419 registry.register(Box::new(counter.clone())).unwrap();
420 let incremented_by = 5.0;
421 counter.set(incremented_by);
422 let req = WriteRequest::from_metric_families(registry.gather(), None)
423 .expect("Failed to encode gauge");
424 assert_eq!(req.timeseries.len(), 1);
425 let entry = req.timeseries.first().unwrap();
426 assert_eq!(entry.labels.len(), 1);
427 assert_eq!(
428 entry
429 .labels
430 .iter()
431 .find(|l| l.name == LABEL_NAME)
432 .unwrap()
433 .value,
434 gauge_name
435 );
436 assert_eq!(entry.samples.first().unwrap().value, incremented_by);
437 }
438 #[test]
439 pub fn can_encode_histogram() {
440 let registry = Registry::new();
441 let histogram_name = "my_histogram";
442 let help = "an extra description".to_string();
443 let opts = histogram_opts!(histogram_name, help, vec![10.0, 1000.0, 10000.0]);
444 let histogram = Histogram::with_opts(opts).unwrap();
445 registry.register(Box::new(histogram.clone())).unwrap();
446 histogram.observe(5.0);
447 histogram.observe(500.0);
448 histogram.observe(5000.0);
449 histogram.observe(50000.0);
450 let req = WriteRequest::from_metric_families(registry.gather(), None)
451 .expect("Failed to encode histogram");
452 assert_eq!(req.timeseries.len(), 6);
453 let bucket_names: Vec<String> = req
454 .timeseries
455 .clone()
456 .into_iter()
457 .filter_map(|ts| {
458 ts.labels
459 .iter()
460 .find(|l| l.name == "le")
461 .map(|l| l.value.clone())
462 })
463 .collect();
464 assert_eq!(bucket_names, vec!["10", "1000", "10000", "+Inf"]);
465
466 let count_observations = req
467 .timeseries
468 .clone()
469 .iter()
470 .find(|l| {
471 l.labels.iter().any(|label| {
472 label.name == LABEL_NAME
473 && label.value == format!("{}{}", histogram_name, COUNT_SUFFIX)
474 })
475 })
476 .map(|ts| ts.samples.first().unwrap().value)
477 .unwrap();
478 assert_eq!(count_observations, 4.0);
479 let sum_observation = req
480 .timeseries
481 .iter()
482 .find(|l| {
483 l.labels.iter().any(|label| {
484 label.name == LABEL_NAME
485 && label.value == format!("{}{}", histogram_name, SUM_SUFFIX)
486 })
487 })
488 .map(|ts| ts.samples.first().unwrap().value)
489 .unwrap();
490 assert_eq!(sum_observation, 55505.0)
491 }
492 #[test]
493 pub fn can_add_custom_labels() {
494 let registry = Registry::new();
495 let counter_name = "my_counter";
496 let help = "an extra description";
497 let counter = Counter::new(counter_name, help).unwrap();
498 registry.register(Box::new(counter.clone())).unwrap();
499 let incremented_by = 5.0;
500 counter.inc_by(incremented_by);
501 let req = WriteRequest::from_metric_families(
502 registry.gather(),
503 Some(vec![("foo".into(), "bar".into())]),
504 )
505 .expect("Failed to encode counter");
506 assert_eq!(req.timeseries.len(), 1);
507 let entry = req.timeseries.first().unwrap();
508 assert_eq!(entry.labels.len(), 2);
509 assert_eq!(
510 entry
511 .labels
512 .iter()
513 .find(|l| l.name == LABEL_NAME)
514 .unwrap()
515 .value,
516 counter_name
517 );
518 assert_eq!(
519 entry.labels.iter().find(|l| l.name == "foo").unwrap().value,
520 "bar"
521 );
522 assert_eq!(entry.samples.first().unwrap().value, incremented_by);
523 }
524}