prometheus_http_api/lib.rs
1//! Prometheus HTTP API
2//!
3//! This crate provides data structures to interact with the prometheus HTTP API endpoints. The
4//! crate allows constructing prometheus data sources with [`DataSourceBuilder`].
5//!
//! A [`Query`] must be provided to the prometheus data source via
//! [`DataSourceBuilder::with_query()`], which accepts either an
//! [`InstantQuery`] or a [`RangeQuery`]. Both types provide builder-style methods
//! for their optional parameters.
10//!
11//! ## Simple Usage
12//!
13//! To gather the data from `<http://localhost:9090/api/v1/query?query=up>`
14//!
15//! ```
16//! use prometheus_http_api::{
17//! DataSourceBuilder, InstantQuery, Query,
18//! };
19//!
20//! #[tokio::main]
21//! async fn main() {
22//! let query = Query::Instant(InstantQuery::new("up"));
23//! let request = DataSourceBuilder::new("localhost:9090")
24//! .with_query(query)
25//! .build()
26//! .unwrap();
27//! let res_json = request.get().await;
28//! tracing::info!("{:?}", res_json);
29//! }
30//! ```
31
32#![warn(rust_2018_idioms)]
33#![warn(missing_docs)]
34#![warn(rustdoc::missing_doc_code_examples)]
35use hyper::client::connect::HttpConnector;
36use hyper::client::Client;
37use hyper_tls::HttpsConnector;
38use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
39use serde::{Deserialize, Serialize};
40use std::collections::HashMap;
41use std::time::Duration;
42use thiserror::Error;
43
44/// [`MatrixResult`] contains Prometheus Range Vectors
45/// ```
46/// let matrix_raw_response = hyper::body::Bytes::from(r#"
47/// {
48/// "metric": {
49/// "__name__": "node_load1",
50/// "instance": "localhost:9100",
51/// "job": "node_exporter"
52/// },
53/// "values": [
54/// [1558253469,"1.69"],[1558253470,"1.70"],[1558253471,"1.71"]
55/// ]
56/// }"#);
57/// let res_json: Result<prometheus_http_api::MatrixResult, serde_json::Error> = serde_json::from_slice(&matrix_raw_response);
58/// assert!(res_json.is_ok());
59/// ```
60#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
61pub struct MatrixResult {
62 /// A series of labels for the matrix results. This is a HashMap of `{"label_name_1":
63 /// "value_1", ...}`
64 #[serde(rename = "metric")]
65 pub labels: HashMap<String, String>,
66 /// The values over time captured on prometheus, generally `[[<epoch>, "<value>"]]`
67 pub values: Vec<Vec<serde_json::Value>>,
68}
69
70/// `VectorResult` contains Prometheus Instant Vectors
71/// ```
72/// let vector_raw_response = hyper::body::Bytes::from(r#"
73/// {
74/// "metric": {
75/// "__name__": "up",
76/// "instance": "localhost:9090",
77/// "job": "prometheus"
78/// },
79/// "value": [
80/// 1557571137.732,
81/// "1"
82/// ]
83/// }"#);
84/// let res_json: Result<prometheus_http_api::VectorResult, serde_json::Error> = serde_json::from_slice(&vector_raw_response);
85/// assert!(res_json.is_ok());
86/// ```
87#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
88pub struct VectorResult {
89 /// A series of labels for the matrix results. This is a HashMap of `{"label_name_1":
90 /// "value_1", ...}`
91 #[serde(rename = "metric")]
92 pub labels: HashMap<String, String>,
93 /// The values over time captured on prometheus, generally `[[<epoch>, "<value>"]]`
94 pub value: Vec<serde_json::Value>,
95}
96
97/// Available [`ResponseData`] formats documentation:
98/// `https://prometheus.io/docs/prometheus/latest/querying/api/#expression-query-result-formats`
99/// ```
100/// let scalar_result_type = hyper::body::Bytes::from(r#"{
101/// "resultType":"scalar",
102/// "result":[1558283674.829,"1"]
103/// }"#);
104/// let res_json: Result<prometheus_http_api::ResponseData, serde_json::Error> = serde_json::from_slice(&scalar_result_type);
105/// assert!(res_json.is_ok());
106/// ```
107#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
108#[serde(tag = "resultType")]
109pub enum ResponseData {
110 /// Handles a Response of type [`VectorResult`]
111 #[serde(rename = "vector")]
112 Vector {
113 /// The result vector.
114 result: Vec<VectorResult>,
115 },
116 /// Handles a Response of type [`MatrixResult`]
117 #[serde(rename = "matrix")]
118 Matrix {
119 /// The result vector.
120 result: Vec<MatrixResult>,
121 },
122 /// Handles a scalar result, generally numeric
123 #[serde(rename = "scalar")]
124 Scalar {
125 /// The result vector.
126 result: Vec<serde_json::Value>,
127 },
128 /// Handles a String result, for example for label names
129 #[serde(rename = "string")]
130 String {
131 /// The result vector.
132 result: Vec<serde_json::Value>,
133 },
134}
135
136impl Default for ResponseData {
137 fn default() -> Self {
138 Self::Vector {
139 result: vec![VectorResult::default()],
140 }
141 }
142}
143
144/// A Prometheus [`Response`] returned by an HTTP query.
145/// ```
146/// let full_response = hyper::body::Bytes::from(
147/// r#"
148/// { "status":"success",
149/// "data":{
150/// "resultType":"scalar",
151/// "result":[1558283674.829,"1"]
152/// }
153/// }"#,
154/// );
155/// let res_json: Result<prometheus_http_api::Response, serde_json::Error> = serde_json::from_slice(&full_response);
156/// assert!(res_json.is_ok());
157/// ```
158#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
159pub struct Response {
160 /// A Data response, it may be vector, matrix, scalar or String
161 pub data: ResponseData,
162 /// An status string, either `"success"` or `"error"`
163 pub status: String,
164}
165
/// A query evaluated by Prometheus at a single point in time.
#[derive(Debug)]
pub struct InstantQuery {
    /// Expression query string.
    query: String,
    /// Optional evaluation timestamp.
    time: Option<u64>,
    /// Optional evaluation timeout. Defaults to and is capped by the value of the
    /// -query.timeout flag.
    timeout: Option<u64>,
}
176
177impl InstantQuery {
178 /// Initializes an Instant query with optional fields set to None
179 pub fn new(query: &str) -> Self {
180 Self {
181 query: query.to_string(),
182 time: None,
183 timeout: None,
184 }
185 }
186
187 /// Builder method to set the query timeout
188 pub fn with_epoch(mut self, time: u64) -> Self {
189 self.time = Some(time);
190 self
191 }
192
193 /// Builder method to set the query timeout
194 pub fn with_timeout(mut self, timeout: u64) -> Self {
195 self.timeout = Some(timeout);
196 self
197 }
198
199 /// Transforms the typed query into HTTP GET query params, it contains a pre-built `base` that
200 /// may use an HTTP path prefix if configured.
201 pub fn as_query_params(&self, mut base: String) -> String {
202 tracing::trace!("InstantQuery::as_query_params raw query: {}", self.query);
203 let encoded_query = utf8_percent_encode(&self.query, NON_ALPHANUMERIC).to_string();
204 tracing::trace!(
205 "InstantQuery::as_query_params encoded_query: {}",
206 encoded_query
207 );
208 base.push_str(&format!("api/v1/query?query={}", encoded_query));
209 if let Some(time) = self.time {
210 base.push_str(&format!("&time={}", time));
211 }
212 if let Some(timeout) = self.timeout {
213 base.push_str(&format!("&timeout={}", timeout));
214 }
215 base
216 }
217}
218
/// A query evaluated by Prometheus over a range of time.
#[derive(Debug)]
pub struct RangeQuery {
    /// Expression query string.
    pub query: String,
    /// Start timestamp, inclusive.
    pub start: u64,
    /// End timestamp, inclusive.
    pub end: u64,
    /// Query resolution step width, as a float number of seconds.
    pub step: f64,
    /// Optional evaluation timeout. Defaults to and is capped by the value of the
    /// -query.timeout flag.
    pub timeout: Option<u64>,
}
233
234impl RangeQuery {
235 /// Initializes a Range query with optional fields set to None
236 pub fn new(query: &str, start: u64, end: u64, step: f64) -> Self {
237 Self {
238 query: query.to_string(),
239 start,
240 end,
241 step,
242 timeout: None,
243 }
244 }
245
246 /// Builder method to set the query timeout
247 pub fn with_timeout(mut self, timeout: u64) -> Self {
248 self.timeout = Some(timeout);
249 self
250 }
251
252 /// Transforms the typed query into HTTP GET query params, it contains a pre-built `base` that
253 /// may use an HTTP path prefix if configured.
254 pub fn as_query_params(&self, mut base: String) -> String {
255 tracing::trace!("RangeQuery::as_query_params: raw query: {}", self.query);
256 let encoded_query = utf8_percent_encode(&self.query, NON_ALPHANUMERIC).to_string();
257 tracing::trace!(
258 "RangeQuery::as_query_params encoded_query: {}",
259 encoded_query
260 );
261 base.push_str(&format!(
262 "api/v1/query_range?query={}&start={}&end={}&step={}",
263 encoded_query, self.start, self.end, self.step
264 ));
265 if let Some(timeout) = self.timeout {
266 base.push_str(&format!("&timeout={}", timeout));
267 }
268 base
269 }
270}
271
272/// A query to the Prometheus HTTP API
273#[derive(Debug)]
274pub enum Query {
275 /// Represents an instant query at a single point in time
276 Instant(InstantQuery),
277 /// Represents an expression query over a range of time
278 Range(RangeQuery),
279}
280
281impl Query {
282 /// Transforms the typed query into HTTP GET query params
283 pub fn as_query_params(&self, prefix: Option<String>) -> String {
284 let mut base = if let Some(prefix) = prefix {
285 prefix
286 } else {
287 String::from("/")
288 };
289 if !base.ends_with("/") {
290 base.push_str("/");
291 }
292 match self {
293 Self::Instant(query) => query.as_query_params(base),
294 Self::Range(query) => query.as_query_params(base),
295 }
296 }
297
298 /// Returns the timeout of the prometheus query
299 pub fn get_timeout(&self) -> Option<u64> {
300 match self {
301 Self::Instant(query) => query.timeout,
302 Self::Range(query) => query.timeout,
303 }
304 }
305}
306
307/// A simple Error type to understand different errors.
308#[derive(Error, Debug)]
309pub enum DataSourceError {
310 /// The DataSource request may fail due to an http module request, could happen while
311 /// interacting with the HTTP server.
312 #[error("http error: {0}")]
313 Http(#[from] http::Error),
314 /// The DataSource request building may fail due to invalid schemes, authority, etc.
315 #[error("hyper error: {0}")]
316 Hyper(#[from] hyper::Error),
317 /// The DataSource request may fail due to invalid data returned from the server.
318 #[error("Serde Error: {0}")]
319 Serde(#[from] serde_json::Error),
320 /// The DataSource building process may not specific a query, this is a required field.
321 #[error("Missing query type")]
322 MissingQueryParam,
323}
324
325/// Represents a prometheus data source that works over an http(s) host:port endpoint potentially
326/// behind a /prometheus_prefix/
327#[derive(Debug)]
328pub struct DataSource {
329 /// This should contain the scheme://<authority>/ portion of the URL, the params would be
330 /// appended later.
331 pub authority: String,
332
333 /// Optionally specify if http/https is used. By default 'http'
334 pub scheme: String,
335
336 /// The prefix to reach prometheus on the authority, for example, prometheus may share a
337 /// host:port with grafana, etc, and prometheus would be reached by <authority>/prom/
338 pub prefix: Option<String>,
339
340 /// The query to send to prometheus
341 pub query: Query,
342
343 /// Sets the timeout for the HTTP connection to the prometheus server
344 pub http_timeout: Option<Duration>,
345}
346
347/// A Builder struct to create the [`DataSource`]
348#[derive(Debug)]
349pub struct DataSourceBuilder {
350 /// Allows setting the http://<authority>/ portion of the URL, the query param may be a
351 /// host:port or user:password@host:port or dns/fqdn
352 pub authority: String,
353
354 /// Allows setting the <scheme>://authority/ portion of the URL, currently tested with http and
355 /// https by using hyper_tls
356 pub scheme: Option<String>,
357
358 /// Allows setting the scheme://authority/<prefix>/api/v1/ portion of the URL, useful when
359 /// prometheus shares the same `authority` as other components and the api/v1/query should be
360 /// prefixed with a specific route.
361 pub prefix: Option<String>,
362
363 /// Sets the query parameter
364 pub query: Option<Query>,
365
366 /// Sets the timeout for the HTTP connection to the prometheus server
367 pub http_timeout: Option<Duration>,
368}
369
370impl DataSourceBuilder {
371 /// Initializes the builder for the DataSource, required param is the authority, may contain
372 /// `user:password@host:port`, or `host:port`
373 pub fn new(authority: &str) -> Self {
374 Self {
375 authority: authority.to_string(),
376 scheme: None,
377 prefix: None,
378 query: None,
379 http_timeout: None,
380 }
381 }
382
383 /// Sets the prefix that hosts prometheus, useful when prometheus is behind a shared reverse
384 /// proxy
385 pub fn with_prefix(mut self, prefix: String) -> Self {
386 self.prefix = Some(prefix);
387 self
388 }
389
390 /// Sets the prometheus query param.
391 pub fn with_query(mut self, query: Query) -> Self {
392 self.query = Some(query);
393 self
394 }
395
396 /// Sets the URL scheme, be it http or https
397 pub fn with_scheme(mut self, scheme: String) -> Self {
398 self.scheme = Some(scheme);
399 self
400 }
401
402 /// Builds into DataSource after checking and merging fields
403 pub fn build(self) -> Result<DataSource, DataSourceError> {
404 let query = match self.query {
405 Some(query) => query,
406 None => {
407 tracing::error!("Missing query field in builder");
408 return Err(DataSourceError::MissingQueryParam);
409 }
410 };
411 if let Some(http_timeout) = self.http_timeout {
412 if let Some(query_timeout) = query.get_timeout() {
413 if query_timeout > http_timeout.as_secs() {
414 tracing::warn!("Configured query_timeout is longer than http_timeout. Prometheus query will be dropped by the http client if the query exceeds http_timeout");
415 }
416 }
417 }
418 let scheme = match self.scheme {
419 Some(val) => val,
420 None => String::from("http"),
421 };
422 Ok(DataSource {
423 authority: self.authority,
424 scheme,
425 prefix: self.prefix,
426 query,
427 http_timeout: self.http_timeout,
428 })
429 }
430}
431
432impl DataSource {
433 /// `get` is an async operation that returns potentially a Response
434 pub async fn get(&self) -> Result<Response, DataSourceError> {
435 let url = http::uri::Builder::new()
436 .authority(self.authority.clone())
437 .scheme(self.scheme.as_str())
438 .path_and_query(self.query.as_query_params(self.prefix.clone()))
439 .build()?;
440 tracing::debug!("get() init Prometheus URL: {}", url);
441 let mut client = Client::builder();
442 if let Some(timeout) = self.http_timeout {
443 client.pool_idle_timeout(timeout);
444 }
445 let request = if url.scheme() == Some(&hyper::http::uri::Scheme::HTTP) {
446 tracing::info!("get: Prometheus URL: {}", url);
447 client
448 .build::<_, hyper::Body>(HttpConnector::new())
449 .get(url.clone())
450 } else {
451 client
452 .build::<_, hyper::Body>(HttpsConnector::new())
453 .get(url.clone())
454 };
455 let response_body = match request.await {
456 Ok(res) => hyper::body::to_bytes(res.into_body()).await?,
457 Err(err) => {
458 tracing::info!("get: Error loading '{:?}': '{:?}'", url, err);
459 return Err(err.into());
460 }
461 };
462 tracing::debug!("get() done");
463 tracing::trace!("Deserializing: {:?}", response_body);
464 Ok(serde_json::from_slice(&response_body)?)
465 }
466}
467
#[cfg(test)]
mod tests {
    use super::*;

    /// Installs the test logger; an error from repeated initialization is ignored.
    fn init_log() {
        let _ = env_logger::builder().is_test(true).try_init();
    }

    /// Deserializes a raw body into a [`Response`] the same way `DataSource::get` does.
    fn parse(raw: &'static str) -> Result<Response, serde_json::Error> {
        let body = hyper::body::Bytes::from(raw);
        serde_json::from_slice(&body)
    }

    #[test]
    fn it_detects_prometheus_errors() {
        init_log();
        // An error document carries no `data` field, so it must not deserialize.
        let error_document = r#"
            {
              "status": "error",
              "errorType": "bad_data",
              "error": "end timestamp must not be before start time"
            }
            "#;
        assert!(parse(error_document).is_err());
        // A body that is not JSON at all must fail as well.
        assert!(parse("Internal Server Error").is_err());
    }

    #[test]
    fn it_loads_prometheus_scalars() {
        init_log();
        // A json returned by prometheus
        let complete = r#"
            { "status":"success",
              "data":{
                "resultType":"scalar",
                "result":[1558283674.829,"1"]
              }
            }"#;
        assert!(parse(complete).is_ok());
        // This json is missing the value after the epoch
        let without_value = r#"
            { "status":"success",
              "data":{
                "resultType":"scalar",
                "result":[1558283674.829]
              }
            }"#;
        assert!(parse(without_value).is_ok());
    }

    #[test]
    fn it_loads_prometheus_matrix() {
        init_log();
        // A json returned by prometheus
        let complete = r#"
            {
              "status": "success",
              "data": {
                "resultType": "matrix",
                "result": [
                  {
                    "metric": {
                      "__name__": "node_load1",
                      "instance": "localhost:9100",
                      "job": "node_exporter"
                    },
                    "values": [
                      [1558253469,"1.69"],[1558253470,"1.70"],[1558253471,"1.71"],
                      [1558253472,"1.72"],[1558253473,"1.73"],[1558253474,"1.74"],
                      [1558253475,"1.75"],[1558253476,"1.76"],[1558253477,"1.77"],
                      [1558253478,"1.78"],[1558253479,"1.79"]]
                  }
                ]
              }
            }"#;
        assert!(parse(complete).is_ok());
        // This json is missing the value after the epoch
        let without_value = r#"
            {
              "status": "success",
              "data": {
                "resultType": "matrix",
                "result": [
                  {
                    "metric": {
                      "__name__": "node_load1",
                      "instance": "localhost:9100",
                      "job": "node_exporter"
                    },
                    "values": [
                      [1558253478]
                    ]
                  }
                ]
              }
            }"#;
        assert!(parse(without_value).is_ok());
    }

    #[test]
    fn it_loads_prometheus_vector() {
        init_log();
        // A json returned by prometheus
        let complete = r#"
            {
              "status": "success",
              "data": {
                "resultType": "vector",
                "result": [
                  {
                    "metric": {
                      "__name__": "up",
                      "instance": "localhost:9090",
                      "job": "prometheus"
                    },
                    "value": [
                      1557571137.732,
                      "1"
                    ]
                  },
                  {
                    "metric": {
                      "__name__": "up",
                      "instance": "localhost:9100",
                      "job": "node_exporter"
                    },
                    "value": [
                      1557571138.732,
                      "1"
                    ]
                  }
                ]
              }
            }"#;
        assert!(parse(complete).is_ok());
    }

    /// Queries the server at `localhost:9090`; ignored by default.
    #[tokio::test]
    #[ignore]
    async fn it_loads_prometheus() {
        let data_source = DataSourceBuilder::new("localhost:9090")
            .with_query(Query::Instant(InstantQuery::new("up")))
            .build()
            .unwrap();
        let res_json = data_source.get().await;
        tracing::error!("{:?}", res_json);
    }
}