#![warn(rust_2018_idioms)]
#![warn(missing_docs)]
#![warn(rustdoc::missing_doc_code_examples)]
use hyper::client::connect::HttpConnector;
use hyper::client::Client;
use hyper_tls::HttpsConnector;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use thiserror::Error;
/// One time series in a range-query response (`resultType: "matrix"`).
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
pub struct MatrixResult {
    /// Label set identifying the series (serialized as the JSON field `metric`).
    #[serde(rename = "metric")]
    pub labels: HashMap<String, String>,
    /// Samples for the series; each inner vec is a `[timestamp, value]` pair
    /// kept as raw JSON values (Prometheus encodes the value as a string).
    pub values: Vec<Vec<serde_json::Value>>,
}
/// One sample in an instant-query response (`resultType: "vector"`).
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
pub struct VectorResult {
    /// Label set identifying the series (serialized as the JSON field `metric`).
    #[serde(rename = "metric")]
    pub labels: HashMap<String, String>,
    /// Single `[timestamp, value]` pair kept as raw JSON values.
    pub value: Vec<serde_json::Value>,
}
/// The `data` object of a Prometheus API response, discriminated by the
/// JSON `resultType` field via serde's internal tagging.
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(tag = "resultType")]
pub enum ResponseData {
    /// Instant-query result: one current sample per matching series.
    #[serde(rename = "vector")]
    Vector { result: Vec<VectorResult> },
    /// Range-query result: a series of samples per matching series.
    #[serde(rename = "matrix")]
    Matrix { result: Vec<MatrixResult> },
    /// Scalar result, kept as a raw `[timestamp, value]` JSON pair.
    #[serde(rename = "scalar")]
    Scalar { result: Vec<serde_json::Value> },
    /// String result, kept as a raw `[timestamp, value]` JSON pair.
    #[serde(rename = "string")]
    String { result: Vec<serde_json::Value> },
}
impl Default for ResponseData {
fn default() -> Self {
Self::Vector {
result: vec![VectorResult::default()],
}
}
}
/// Top-level envelope of a Prometheus API response.
///
/// Note: only `"status": "success"` payloads deserialize; error payloads
/// lack the tagged `data` object and fail deserialization (see tests).
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
pub struct Response {
    /// The tagged result payload.
    pub data: ResponseData,
    /// Prometheus status string, e.g. `"success"`.
    pub status: String,
}
/// An instant query (`/api/v1/query`): evaluates a PromQL expression at a
/// single point in time.
#[derive(Debug)]
pub struct InstantQuery {
    // PromQL expression, percent-encoded when building the URL.
    query: String,
    // Optional evaluation timestamp (Unix epoch seconds).
    time: Option<u64>,
    // Optional server-side evaluation timeout in seconds.
    timeout: Option<u64>,
}
impl InstantQuery {
pub fn new(query: &str) -> Self {
Self {
query: query.to_string(),
time: None,
timeout: None,
}
}
pub fn with_epoch(mut self, time: u64) -> Self {
self.time = Some(time);
self
}
pub fn with_timeout(mut self, timeout: u64) -> Self {
self.timeout = Some(timeout);
self
}
pub fn as_query_params(&self, mut base: String) -> String {
tracing::trace!("InstantQuery::as_query_params raw query: {}", self.query);
let encoded_query = utf8_percent_encode(&self.query, NON_ALPHANUMERIC).to_string();
tracing::trace!(
"InstantQuery::as_query_params encoded_query: {}",
encoded_query
);
base.push_str(&format!("api/v1/query?query={}", encoded_query));
if let Some(time) = self.time {
base.push_str(&format!("&time={}", time));
}
if let Some(timeout) = self.timeout {
base.push_str(&format!("&timeout={}", timeout));
}
base
}
}
/// A range query (`/api/v1/query_range`): evaluates a PromQL expression
/// over a time window at a fixed resolution.
#[derive(Debug)]
pub struct RangeQuery {
    /// PromQL expression, percent-encoded when building the URL.
    pub query: String,
    /// Window start (Unix epoch seconds).
    pub start: u64,
    /// Window end (Unix epoch seconds).
    pub end: u64,
    /// Resolution step in seconds.
    pub step: f64,
    /// Optional server-side evaluation timeout in seconds.
    pub timeout: Option<u64>,
}
impl RangeQuery {
pub fn new(query: &str, start: u64, end: u64, step: f64) -> Self {
Self {
query: query.to_string(),
start,
end,
step,
timeout: None,
}
}
pub fn with_timeout(mut self, timeout: u64) -> Self {
self.timeout = Some(timeout);
self
}
pub fn as_query_params(&self, mut base: String) -> String {
tracing::trace!("RangeQuery::as_query_params: raw query: {}", self.query);
let encoded_query = utf8_percent_encode(&self.query, NON_ALPHANUMERIC).to_string();
tracing::trace!(
"RangeQuery::as_query_params encoded_query: {}",
encoded_query
);
base.push_str(&format!(
"api/v1/query_range?query={}&start={}&end={}&step={}",
encoded_query, self.start, self.end, self.step
));
if let Some(timeout) = self.timeout {
base.push_str(&format!("&timeout={}", timeout));
}
base
}
}
/// Either kind of Prometheus query.
#[derive(Debug)]
pub enum Query {
    /// Single-point evaluation (`/api/v1/query`).
    Instant(InstantQuery),
    /// Windowed evaluation (`/api/v1/query_range`).
    Range(RangeQuery),
}
impl Query {
pub fn as_query_params(&self, prefix: Option<String>) -> String {
let mut base = if let Some(prefix) = prefix {
prefix
} else {
String::from("/")
};
if !base.ends_with("/") {
base.push_str("/");
}
match self {
Self::Instant(query) => query.as_query_params(base),
Self::Range(query) => query.as_query_params(base),
}
}
pub fn get_timeout(&self) -> Option<u64> {
match self {
Self::Instant(query) => query.timeout,
Self::Range(query) => query.timeout,
}
}
}
/// Errors produced while building or executing a Prometheus request.
#[derive(Error, Debug)]
pub enum DataSourceError {
    /// URL construction failed.
    #[error("http error: {0}")]
    Http(#[from] http::Error),
    /// The HTTP request itself failed.
    #[error("hyper error: {0}")]
    Hyper(#[from] hyper::Error),
    /// The response body was not a valid [`Response`] JSON payload.
    #[error("Serde Error: {0}")]
    Serde(#[from] serde_json::Error),
    /// The builder was finalized without a query.
    #[error("Missing query type")]
    MissingQueryParam,
}
/// A fully-configured Prometheus endpoint plus the query to run against it.
#[derive(Debug)]
pub struct DataSource {
    /// Host and port, e.g. `"localhost:9090"`.
    pub authority: String,
    /// URL scheme, `"http"` or `"https"`.
    pub scheme: String,
    /// Optional path prefix inserted before `api/v1/...`.
    pub prefix: Option<String>,
    /// The query to execute.
    pub query: Query,
    /// Optional HTTP client timeout (see [`DataSource::get`] for how it
    /// is applied).
    pub http_timeout: Option<Duration>,
}
/// Builder for [`DataSource`]; only `authority` and a query are required.
#[derive(Debug)]
pub struct DataSourceBuilder {
    /// Host and port, e.g. `"localhost:9090"`.
    pub authority: String,
    /// URL scheme; defaults to `"http"` when unset.
    pub scheme: Option<String>,
    /// Optional path prefix inserted before `api/v1/...`.
    pub prefix: Option<String>,
    /// The query to execute; required by `build()`.
    pub query: Option<Query>,
    /// Optional HTTP client timeout.
    pub http_timeout: Option<Duration>,
}
impl DataSourceBuilder {
pub fn new(authority: &str) -> Self {
Self {
authority: authority.to_string(),
scheme: None,
prefix: None,
query: None,
http_timeout: None,
}
}
pub fn with_prefix(mut self, prefix: String) -> Self {
self.prefix = Some(prefix);
self
}
pub fn with_query(mut self, query: Query) -> Self {
self.query = Some(query);
self
}
pub fn with_scheme(mut self, scheme: String) -> Self {
self.scheme = Some(scheme);
self
}
pub fn build(self) -> Result<DataSource, DataSourceError> {
let query = match self.query {
Some(query) => query,
None => {
tracing::error!("Missing query field in builder");
return Err(DataSourceError::MissingQueryParam);
}
};
if let Some(http_timeout) = self.http_timeout {
if let Some(query_timeout) = query.get_timeout() {
if query_timeout > http_timeout.as_secs() {
tracing::warn!("Configured query_timeout is longer than http_timeout. Prometheus query will be dropped by the http client if the query exceeds http_timeout");
}
}
}
let scheme = match self.scheme {
Some(val) => val,
None => String::from("http"),
};
Ok(DataSource {
authority: self.authority,
scheme,
prefix: self.prefix,
query,
http_timeout: self.http_timeout,
})
}
}
impl DataSource {
    /// Executes the configured query against the Prometheus HTTP API and
    /// deserializes the JSON body into a [`Response`].
    ///
    /// # Errors
    /// Returns a [`DataSourceError`] when the URL cannot be built, the
    /// HTTP request fails, or the body does not deserialize as [`Response`].
    pub async fn get(&self) -> Result<Response, DataSourceError> {
        let url = http::uri::Builder::new()
            .authority(self.authority.clone())
            .scheme(self.scheme.as_str())
            .path_and_query(self.query.as_query_params(self.prefix.clone()))
            .build()?;
        tracing::debug!("get() init Prometheus URL: {}", url);
        let mut client = Client::builder();
        if let Some(timeout) = self.http_timeout {
            // NOTE(review): this configures hyper's connection-pool *idle*
            // timeout, not a per-request deadline — confirm this matches the
            // intent of `http_timeout` (the builder warns as if it bounded
            // the in-flight request).
            client.pool_idle_timeout(timeout);
        }
        // Choose a plain or TLS connector based on the URL scheme; anything
        // other than "http" takes the HTTPS path.
        let request = if url.scheme() == Some(&hyper::http::uri::Scheme::HTTP) {
            tracing::info!("get: Prometheus URL: {}", url);
            client
                .build::<_, hyper::Body>(HttpConnector::new())
                .get(url.clone())
        } else {
            client
                .build::<_, hyper::Body>(HttpsConnector::new())
                .get(url.clone())
        };
        // Buffer the entire body. The HTTP status is not inspected here:
        // non-success payloads are caught below when deserialization fails.
        let response_body = match request.await {
            Ok(res) => hyper::body::to_bytes(res.into_body()).await?,
            Err(err) => {
                tracing::info!("get: Error loading '{:?}': '{:?}'", url, err);
                return Err(err.into());
            }
        };
        tracing::debug!("get() done");
        tracing::trace!("Deserializing: {:?}", response_body);
        Ok(serde_json::from_slice(&response_body)?)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // NOTE(review): this initializes env_logger, but the crate emits
    // `tracing` events — confirm a tracing/log bridge is configured, or
    // these test logs will be silent.
    fn init_log() {
        let _ = env_logger::builder().is_test(true).try_init();
    }
    // Error payloads lack the tagged `data` object, so deserializing into
    // `Response` must fail — callers rely on this to detect errors.
    #[test]
    fn it_detects_prometheus_errors() {
        init_log();
        let test0_json = hyper::body::Bytes::from(
            r#"
        {
            "status": "error",
            "errorType": "bad_data",
            "error": "end timestamp must not be before start time"
        }
        "#,
        );
        let res0_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test0_json);
        assert!(res0_json.is_err());
        // Non-JSON bodies (e.g. proxy error pages) must also fail.
        let test1_json = hyper::body::Bytes::from("Internal Server Error");
        let res1_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test1_json);
        assert!(res1_json.is_err());
    }
    // Scalar results deserialize, including a degenerate one-element pair.
    #[test]
    fn it_loads_prometheus_scalars() {
        init_log();
        let test0_json = hyper::body::Bytes::from(
            r#"
        { "status":"success",
          "data":{
            "resultType":"scalar",
            "result":[1558283674.829,"1"]
          }
        }"#,
        );
        let res_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test0_json);
        assert!(res_json.is_ok());
        let test1_json = hyper::body::Bytes::from(
            r#"
        { "status":"success",
          "data":{
            "resultType":"scalar",
            "result":[1558283674.829]
          }
        }"#,
        );
        let res_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test1_json);
        assert!(res_json.is_ok());
    }
    // Matrix (range-query) results deserialize, including a sample pair
    // missing its value component.
    #[test]
    fn it_loads_prometheus_matrix() {
        init_log();
        let test0_json = hyper::body::Bytes::from(
            r#"
        {
          "status": "success",
          "data": {
            "resultType": "matrix",
            "result": [
              {
                "metric": {
                  "__name__": "node_load1",
                  "instance": "localhost:9100",
                  "job": "node_exporter"
                },
                "values": [
                  [1558253469,"1.69"],[1558253470,"1.70"],[1558253471,"1.71"],
                  [1558253472,"1.72"],[1558253473,"1.73"],[1558253474,"1.74"],
                  [1558253475,"1.75"],[1558253476,"1.76"],[1558253477,"1.77"],
                  [1558253478,"1.78"],[1558253479,"1.79"]]
              }
            ]
          }
        }"#,
        );
        let res_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test0_json);
        assert!(res_json.is_ok());
        let test2_json = hyper::body::Bytes::from(
            r#"
        {
          "status": "success",
          "data": {
            "resultType": "matrix",
            "result": [
              {
                "metric": {
                  "__name__": "node_load1",
                  "instance": "localhost:9100",
                  "job": "node_exporter"
                },
                "values": [
                  [1558253478]
                ]
              }
            ]
          }
        }"#,
        );
        let res_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test2_json);
        assert!(res_json.is_ok());
    }
    // Vector (instant-query) results deserialize with multiple series.
    #[test]
    fn it_loads_prometheus_vector() {
        init_log();
        let test0_json = hyper::body::Bytes::from(
            r#"
        {
          "status": "success",
          "data": {
            "resultType": "vector",
            "result": [
              {
                "metric": {
                  "__name__": "up",
                  "instance": "localhost:9090",
                  "job": "prometheus"
                },
                "value": [
                  1557571137.732,
                  "1"
                ]
              },
              {
                "metric": {
                  "__name__": "up",
                  "instance": "localhost:9100",
                  "job": "node_exporter"
                },
                "value": [
                  1557571138.732,
                  "1"
                ]
              }
            ]
          }
        }"#,
        );
        let res_json: Result<Response, serde_json::Error> = serde_json::from_slice(&test0_json);
        assert!(res_json.is_ok());
    }
    // End-to-end query against a live local Prometheus; ignored by default
    // because it needs a server listening on localhost:9090.
    #[tokio::test]
    #[ignore]
    async fn it_loads_prometheus() {
        let query = Query::Instant(InstantQuery::new("up"));
        let request = DataSourceBuilder::new("localhost:9090")
            .with_query(query)
            .build()
            .unwrap();
        let res_json = request.get().await;
        tracing::error!("{:?}", res_json);
    }
}
}