prometheus_http_api/
lib.rs

1//! Prometheus HTTP API
2//!
3//! This crate provides data structures to interact with the prometheus HTTP API endpoints. The
4//! crate allows constructing prometheus data sources with [`DataSourceBuilder`].
5//!
6//! A [`Query`] must be provided to the prometheus data source via the
7//! [`DataSourceBuilder::with_query()`] that acceps either a
8//! [`InstantQuery`] or a [`RangeQuery`], for these types, build-like methods
9//! are provided for the optional parameters.
10//!
11//! ## Simple Usage
12//!
13//! To gather the data from `<http://localhost:9090/api/v1/query?query=up>`
14//!
15//! ```
16//! use prometheus_http_api::{
17//!     DataSourceBuilder, InstantQuery, Query,
18//! };
19//!
20//! #[tokio::main]
21//! async fn main() {
22//!     let query = Query::Instant(InstantQuery::new("up"));
23//!     let request = DataSourceBuilder::new("localhost:9090")
24//!         .with_query(query)
25//!         .build()
26//!         .unwrap();
27//!     let res_json = request.get().await;
28//!     tracing::info!("{:?}", res_json);
29//! }
30//! ```
31
32#![warn(rust_2018_idioms)]
33#![warn(missing_docs)]
34#![warn(rustdoc::missing_doc_code_examples)]
35use hyper::client::connect::HttpConnector;
36use hyper::client::Client;
37use hyper_tls::HttpsConnector;
38use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
39use serde::{Deserialize, Serialize};
40use std::collections::HashMap;
41use std::time::Duration;
42use thiserror::Error;
43
44/// [`MatrixResult`] contains Prometheus Range Vectors
45/// ```
46/// let matrix_raw_response = hyper::body::Bytes::from(r#"
47///   {
48///     "metric": {
49///       "__name__": "node_load1",
50///       "instance": "localhost:9100",
51///       "job": "node_exporter"
52///     },
53///     "values": [
54///       [1558253469,"1.69"],[1558253470,"1.70"],[1558253471,"1.71"]
55///     ]
56///   }"#);
57/// let res_json: Result<prometheus_http_api::MatrixResult, serde_json::Error> = serde_json::from_slice(&matrix_raw_response);
58/// assert!(res_json.is_ok());
59/// ```
60#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
61pub struct MatrixResult {
62    /// A series of labels for the matrix results. This is a HashMap of `{"label_name_1":
63    /// "value_1", ...}`
64    #[serde(rename = "metric")]
65    pub labels: HashMap<String, String>,
66    /// The values over time captured on prometheus, generally `[[<epoch>, "<value>"]]`
67    pub values: Vec<Vec<serde_json::Value>>,
68}
69
70/// `VectorResult` contains Prometheus Instant Vectors
71/// ```
72/// let vector_raw_response = hyper::body::Bytes::from(r#"
73///  {
74///    "metric": {
75///      "__name__": "up",
76///      "instance": "localhost:9090",
77///      "job": "prometheus"
78///    },
79///    "value": [
80///      1557571137.732,
81///      "1"
82///    ]
83///   }"#);
84/// let res_json: Result<prometheus_http_api::VectorResult, serde_json::Error> = serde_json::from_slice(&vector_raw_response);
85/// assert!(res_json.is_ok());
86/// ```
87#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
88pub struct VectorResult {
89    /// A series of labels for the matrix results. This is a HashMap of `{"label_name_1":
90    /// "value_1", ...}`
91    #[serde(rename = "metric")]
92    pub labels: HashMap<String, String>,
93    /// The values over time captured on prometheus, generally `[[<epoch>, "<value>"]]`
94    pub value: Vec<serde_json::Value>,
95}
96
97/// Available [`ResponseData`] formats documentation:
98/// `https://prometheus.io/docs/prometheus/latest/querying/api/#expression-query-result-formats`
99/// ```
100/// let scalar_result_type = hyper::body::Bytes::from(r#"{
101///   "resultType":"scalar",
102///   "result":[1558283674.829,"1"]
103///  }"#);
104/// let res_json: Result<prometheus_http_api::ResponseData, serde_json::Error> = serde_json::from_slice(&scalar_result_type);
105/// assert!(res_json.is_ok());
106/// ```
107#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
108#[serde(tag = "resultType")]
109pub enum ResponseData {
110    /// Handles a Response of type [`VectorResult`]
111    #[serde(rename = "vector")]
112    Vector {
113        /// The result vector.
114        result: Vec<VectorResult>,
115    },
116    /// Handles a Response of type [`MatrixResult`]
117    #[serde(rename = "matrix")]
118    Matrix {
119        /// The result vector.
120        result: Vec<MatrixResult>,
121    },
122    /// Handles a scalar result, generally numeric
123    #[serde(rename = "scalar")]
124    Scalar {
125        /// The result vector.
126        result: Vec<serde_json::Value>,
127    },
128    /// Handles a String result, for example for label names
129    #[serde(rename = "string")]
130    String {
131        /// The result vector.
132        result: Vec<serde_json::Value>,
133    },
134}
135
136impl Default for ResponseData {
137    fn default() -> Self {
138        Self::Vector {
139            result: vec![VectorResult::default()],
140        }
141    }
142}
143
144/// A Prometheus [`Response`] returned by an HTTP query.
145/// ```
146/// let full_response = hyper::body::Bytes::from(
147///    r#"
148///    { "status":"success",
149///      "data":{
150///        "resultType":"scalar",
151///        "result":[1558283674.829,"1"]
152///      }
153///    }"#,
154///  );
155/// let res_json: Result<prometheus_http_api::Response, serde_json::Error> = serde_json::from_slice(&full_response);
156/// assert!(res_json.is_ok());
157/// ```
158#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
159pub struct Response {
160    /// A Data response, it may be vector, matrix, scalar or String
161    pub data: ResponseData,
162    /// An status string, either `"success"` or `"error"`
163    pub status: String,
164}
165
166/// An instant query to send to Prometheus
167#[derive(Debug)]
168pub struct InstantQuery {
169    ///  expression query string.
170    query: String,
171    /// Evaluation timestamp. Optional.
172    time: Option<u64>,
173    /// Evaluation timeout. Optional. Defaults to and is capped by the value of the -query.timeout flag.
174    timeout: Option<u64>,
175}
176
177impl InstantQuery {
178    /// Initializes an Instant query with optional fields set to None
179    pub fn new(query: &str) -> Self {
180        Self {
181            query: query.to_string(),
182            time: None,
183            timeout: None,
184        }
185    }
186
187    /// Builder method to set the query timeout
188    pub fn with_epoch(mut self, time: u64) -> Self {
189        self.time = Some(time);
190        self
191    }
192
193    /// Builder method to set the query timeout
194    pub fn with_timeout(mut self, timeout: u64) -> Self {
195        self.timeout = Some(timeout);
196        self
197    }
198
199    /// Transforms the typed query into HTTP GET query params, it contains a pre-built `base` that
200    /// may use an HTTP path  prefix if configured.
201    pub fn as_query_params(&self, mut base: String) -> String {
202        tracing::trace!("InstantQuery::as_query_params raw query: {}", self.query);
203        let encoded_query = utf8_percent_encode(&self.query, NON_ALPHANUMERIC).to_string();
204        tracing::trace!(
205            "InstantQuery::as_query_params encoded_query: {}",
206            encoded_query
207        );
208        base.push_str(&format!("api/v1/query?query={}", encoded_query));
209        if let Some(time) = self.time {
210            base.push_str(&format!("&time={}", time));
211        }
212        if let Some(timeout) = self.timeout {
213            base.push_str(&format!("&timeout={}", timeout));
214        }
215        base
216    }
217}
218
219/// A range query to send to Prometheus
220#[derive(Debug)]
221pub struct RangeQuery {
222    ///  expression query string.
223    pub query: String,
224    /// Start timestamp, inclusive.
225    pub start: u64,
226    /// End timestamp, inclusive.
227    pub end: u64,
228    /// Query resolution step width in duration format or float number of seconds.
229    pub step: f64,
230    /// Evaluation timeout. Optional. Defaults to and is capped by the value of the -query.timeout flag.1
231    pub timeout: Option<u64>,
232}
233
234impl RangeQuery {
235    /// Initializes a Range query with optional fields set to None
236    pub fn new(query: &str, start: u64, end: u64, step: f64) -> Self {
237        Self {
238            query: query.to_string(),
239            start,
240            end,
241            step,
242            timeout: None,
243        }
244    }
245
246    /// Builder method to set the query timeout
247    pub fn with_timeout(mut self, timeout: u64) -> Self {
248        self.timeout = Some(timeout);
249        self
250    }
251
252    /// Transforms the typed query into HTTP GET query params, it contains a pre-built `base` that
253    /// may use an HTTP path  prefix if configured.
254    pub fn as_query_params(&self, mut base: String) -> String {
255        tracing::trace!("RangeQuery::as_query_params: raw query: {}", self.query);
256        let encoded_query = utf8_percent_encode(&self.query, NON_ALPHANUMERIC).to_string();
257        tracing::trace!(
258            "RangeQuery::as_query_params encoded_query: {}",
259            encoded_query
260        );
261        base.push_str(&format!(
262            "api/v1/query_range?query={}&start={}&end={}&step={}",
263            encoded_query, self.start, self.end, self.step
264        ));
265        if let Some(timeout) = self.timeout {
266            base.push_str(&format!("&timeout={}", timeout));
267        }
268        base
269    }
270}
271
272/// A query to the Prometheus HTTP API
273#[derive(Debug)]
274pub enum Query {
275    /// Represents an instant query at a single point in time
276    Instant(InstantQuery),
277    /// Represents an expression query over a range of time
278    Range(RangeQuery),
279}
280
281impl Query {
282    /// Transforms the typed query into HTTP GET query params
283    pub fn as_query_params(&self, prefix: Option<String>) -> String {
284        let mut base = if let Some(prefix) = prefix {
285            prefix
286        } else {
287            String::from("/")
288        };
289        if !base.ends_with("/") {
290            base.push_str("/");
291        }
292        match self {
293            Self::Instant(query) => query.as_query_params(base),
294            Self::Range(query) => query.as_query_params(base),
295        }
296    }
297
298    /// Returns the timeout of the prometheus query
299    pub fn get_timeout(&self) -> Option<u64> {
300        match self {
301            Self::Instant(query) => query.timeout,
302            Self::Range(query) => query.timeout,
303        }
304    }
305}
306
307/// A simple Error type to understand different errors.
308#[derive(Error, Debug)]
309pub enum DataSourceError {
310    /// The DataSource request may fail due to an http module request, could happen while
311    /// interacting with the HTTP server.
312    #[error("http error: {0}")]
313    Http(#[from] http::Error),
314    /// The DataSource request building may fail due to invalid schemes, authority, etc.
315    #[error("hyper error: {0}")]
316    Hyper(#[from] hyper::Error),
317    /// The DataSource request may fail due to invalid data returned from the server.
318    #[error("Serde Error: {0}")]
319    Serde(#[from] serde_json::Error),
320    /// The DataSource building process may not specific a query, this is a required field.
321    #[error("Missing query type")]
322    MissingQueryParam,
323}
324
325/// Represents a prometheus data source that works over an http(s) host:port endpoint potentially
326/// behind a /prometheus_prefix/
327#[derive(Debug)]
328pub struct DataSource {
329    /// This should contain the scheme://<authority>/ portion of the URL, the params would be
330    /// appended later.
331    pub authority: String,
332
333    /// Optionally specify if http/https is used. By default 'http'
334    pub scheme: String,
335
336    /// The prefix to reach prometheus on the authority, for example, prometheus may share a
337    /// host:port with grafana, etc, and prometheus would be reached by <authority>/prom/
338    pub prefix: Option<String>,
339
340    /// The query to send to prometheus
341    pub query: Query,
342
343    /// Sets the timeout for the HTTP connection to the prometheus server
344    pub http_timeout: Option<Duration>,
345}
346
347/// A Builder struct to create the [`DataSource`]
348#[derive(Debug)]
349pub struct DataSourceBuilder {
350    /// Allows setting the http://<authority>/ portion of the URL, the query param may be a
351    /// host:port or user:password@host:port or dns/fqdn
352    pub authority: String,
353
354    /// Allows setting the <scheme>://authority/ portion of the URL, currently tested with http and
355    /// https by using hyper_tls
356    pub scheme: Option<String>,
357
358    /// Allows setting the scheme://authority/<prefix>/api/v1/ portion of the URL, useful when
359    /// prometheus shares the same `authority` as other components and the api/v1/query should be
360    /// prefixed with a specific route.
361    pub prefix: Option<String>,
362
363    /// Sets the query parameter
364    pub query: Option<Query>,
365
366    /// Sets the timeout for the HTTP connection to the prometheus server
367    pub http_timeout: Option<Duration>,
368}
369
370impl DataSourceBuilder {
371    /// Initializes the builder for the DataSource, required param is the authority, may contain
372    /// `user:password@host:port`, or `host:port`
373    pub fn new(authority: &str) -> Self {
374        Self {
375            authority: authority.to_string(),
376            scheme: None,
377            prefix: None,
378            query: None,
379            http_timeout: None,
380        }
381    }
382
383    /// Sets the prefix that hosts prometheus, useful when prometheus is behind a shared reverse
384    /// proxy
385    pub fn with_prefix(mut self, prefix: String) -> Self {
386        self.prefix = Some(prefix);
387        self
388    }
389
390    /// Sets the prometheus query param.
391    pub fn with_query(mut self, query: Query) -> Self {
392        self.query = Some(query);
393        self
394    }
395
396    /// Sets the URL scheme, be it http or https
397    pub fn with_scheme(mut self, scheme: String) -> Self {
398        self.scheme = Some(scheme);
399        self
400    }
401
402    /// Builds into DataSource after checking and merging fields
403    pub fn build(self) -> Result<DataSource, DataSourceError> {
404        let query = match self.query {
405            Some(query) => query,
406            None => {
407                tracing::error!("Missing query field in builder");
408                return Err(DataSourceError::MissingQueryParam);
409            }
410        };
411        if let Some(http_timeout) = self.http_timeout {
412            if let Some(query_timeout) = query.get_timeout() {
413                if query_timeout > http_timeout.as_secs() {
414                    tracing::warn!("Configured query_timeout is longer than http_timeout. Prometheus query will be dropped by the http client if the query exceeds http_timeout");
415                }
416            }
417        }
418        let scheme = match self.scheme {
419            Some(val) => val,
420            None => String::from("http"),
421        };
422        Ok(DataSource {
423            authority: self.authority,
424            scheme,
425            prefix: self.prefix,
426            query,
427            http_timeout: self.http_timeout,
428        })
429    }
430}
431
432impl DataSource {
433    /// `get` is an async operation that returns potentially a Response
434    pub async fn get(&self) -> Result<Response, DataSourceError> {
435        let url = http::uri::Builder::new()
436            .authority(self.authority.clone())
437            .scheme(self.scheme.as_str())
438            .path_and_query(self.query.as_query_params(self.prefix.clone()))
439            .build()?;
440        tracing::debug!("get() init Prometheus URL: {}", url);
441        let mut client = Client::builder();
442        if let Some(timeout) = self.http_timeout {
443            client.pool_idle_timeout(timeout);
444        }
445        let request = if url.scheme() == Some(&hyper::http::uri::Scheme::HTTP) {
446            tracing::info!("get: Prometheus URL: {}", url);
447            client
448                .build::<_, hyper::Body>(HttpConnector::new())
449                .get(url.clone())
450        } else {
451            client
452                .build::<_, hyper::Body>(HttpsConnector::new())
453                .get(url.clone())
454        };
455        let response_body = match request.await {
456            Ok(res) => hyper::body::to_bytes(res.into_body()).await?,
457            Err(err) => {
458                tracing::info!("get: Error loading '{:?}': '{:?}'", url, err);
459                return Err(err.into());
460            }
461        };
462        tracing::debug!("get() done");
463        tracing::trace!("Deserializing: {:?}", response_body);
464        Ok(serde_json::from_slice(&response_body)?)
465    }
466}
467
468#[cfg(test)]
469mod tests {
470    use super::*;
471    fn init_log() {
472        let _ = env_logger::builder().is_test(true).try_init();
473    }
474
475    #[test]
476    fn it_detects_prometheus_errors() {
477        init_log();
478        let test0_json = hyper::body::Bytes::from(
479            r#"
480            {
481              "status": "error",
482              "errorType": "bad_data",
483              "error": "end timestamp must not be before start time"
484            }
485            "#,
486        );
487        let res0_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test0_json);
488        assert!(res0_json.is_err());
489        let test1_json = hyper::body::Bytes::from("Internal Server Error");
490        let res1_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test1_json);
491        assert!(res1_json.is_err());
492    }
493
494    #[test]
495    fn it_loads_prometheus_scalars() {
496        init_log();
497        // A json returned by prometheus
498        let test0_json = hyper::body::Bytes::from(
499            r#"
500            { "status":"success",
501              "data":{
502                "resultType":"scalar",
503                "result":[1558283674.829,"1"]
504              }
505            }"#,
506        );
507        let res_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test0_json);
508        assert!(res_json.is_ok());
509        // This json is missing the value after the epoch
510        let test1_json = hyper::body::Bytes::from(
511            r#"
512            { "status":"success",
513              "data":{
514                "resultType":"scalar",
515                "result":[1558283674.829]
516              }
517            }"#,
518        );
519        let res_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test1_json);
520        assert!(res_json.is_ok());
521    }
522
523    #[test]
524    fn it_loads_prometheus_matrix() {
525        init_log();
526        // A json returned by prometheus
527        let test0_json = hyper::body::Bytes::from(
528            r#"
529            {
530              "status": "success",
531              "data": {
532                "resultType": "matrix",
533                "result": [
534                  {
535                    "metric": {
536                      "__name__": "node_load1",
537                      "instance": "localhost:9100",
538                      "job": "node_exporter"
539                    },
540                    "values": [
541                        [1558253469,"1.69"],[1558253470,"1.70"],[1558253471,"1.71"],
542                        [1558253472,"1.72"],[1558253473,"1.73"],[1558253474,"1.74"],
543                        [1558253475,"1.75"],[1558253476,"1.76"],[1558253477,"1.77"],
544                        [1558253478,"1.78"],[1558253479,"1.79"]]
545                  }
546                ]
547              }
548            }"#,
549        );
550        let res_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test0_json);
551        assert!(res_json.is_ok());
552        // This json is missing the value after the epoch
553        let test2_json = hyper::body::Bytes::from(
554            r#"
555            {
556              "status": "success",
557              "data": {
558                "resultType": "matrix",
559                "result": [
560                  {
561                    "metric": {
562                      "__name__": "node_load1",
563                      "instance": "localhost:9100",
564                      "job": "node_exporter"
565                    },
566                    "values": [
567                        [1558253478]
568                    ]
569                  }
570                ]
571              }
572            }"#,
573        );
574        let res_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test2_json);
575        assert!(res_json.is_ok());
576    }
577
578    #[test]
579    fn it_loads_prometheus_vector() {
580        init_log();
581        // A json returned by prometheus
582        let test0_json = hyper::body::Bytes::from(
583            r#"
584            {
585              "status": "success",
586              "data": {
587                "resultType": "vector",
588                "result": [
589                  {
590                    "metric": {
591                      "__name__": "up",
592                      "instance": "localhost:9090",
593                      "job": "prometheus"
594                    },
595                    "value": [
596                      1557571137.732,
597                      "1"
598                    ]
599                  },
600                  {
601                    "metric": {
602                      "__name__": "up",
603                      "instance": "localhost:9100",
604                      "job": "node_exporter"
605                    },
606                    "value": [
607                      1557571138.732,
608                      "1"
609                    ]
610                  }
611                ]
612              }
613            }"#,
614        );
615        let res_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test0_json);
616        assert!(res_json.is_ok());
617    }
618
619    #[tokio::test]
620    #[ignore]
621    async fn it_loads_prometheus() {
622        let query = Query::Instant(InstantQuery::new("up"));
623        let request = DataSourceBuilder::new("localhost:9090")
624            .with_query(query)
625            .build()
626            .unwrap();
627        let res_json = request.get().await;
628        tracing::error!("{:?}", res_json);
629    }
630}