use std::str::FromStr;
use std::time::Duration;
use chrono::offset::Utc;
use chrono::DateTime;
use futures::compat::Future01CompatExt;
use futures_stable::Stream;
use http::Uri;
use hyper::client::HttpConnector;
use hyper::{Body, Client, Request};
use hyper_tls::HttpsConnector;
use serde_json;
use url::Url;
use crate::messages::ApiResult;
use crate::{Error, Result};
/// Resolution step width passed to `PromClient::range_query`, which renders
/// it into the `step` request parameter.
///
/// The derives make the type printable, cheaply copyable (both payloads are
/// `Copy`) and comparable, so callers can log, reuse and test step values.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Step {
    /// Step width as a number of seconds; sent using the float's `Display` form.
    Seconds(f64),
    /// Step width as a `std::time::Duration`.
    Duration(Duration),
}
/// Connector type of the clients built by `PromClient::new_https`:
/// `hyper_tls::HttpsConnector` wrapping hyper's `HttpConnector`.
pub type HyperHttpsConnector = HttpsConnector<HttpConnector>;
/// Client for the `/api/v1/...` HTTP endpoints of a Prometheus server.
///
/// `T` is the hyper connector used by the underlying `Client`.
pub struct PromClient<T: hyper::client::connect::Connect + 'static> {
/// hyper client through which every request is issued.
client: Client<T, Body>,
/// Base URL that the API paths are joined onto.
host: Url,
/// When set, sent in whole seconds as the `timeout` request parameter by
/// `instant_query`, `range_query`, `series` and `delete_series`.
query_timeout: Option<Duration>,
}
impl PromClient<HyperHttpsConnector> {
pub fn new_https(
host: &str,
query_timeout: Option<Duration>,
) -> Result<PromClient<HyperHttpsConnector>> {
let host = Url::from_str(host).map_err(|e| Error::new_invalid_host_error(host, e))?;
let https = HttpsConnector::new(4).expect("Cannot build HTTPS connection pool");
Ok(PromClient {
client: Client::builder().keep_alive(true).build(https),
host,
query_timeout,
})
}
}
impl<T: hyper::client::connect::Connect + 'static> PromClient<T> {
/// Runs an instant query through `GET /api/v1/query`.
///
/// * `query` - expression sent as the `query` parameter.
/// * `at` - when present, sent as the `time` parameter in RFC 3339 form;
///   when absent no `time` parameter is sent.
///
/// A configured `query_timeout` is sent as `timeout`, in whole seconds.
///
/// # Errors
///
/// Fails when the assembled URL is not a valid `Uri`, or when the request or
/// the decoding of its response fails.
pub async fn instant_query(
    &mut self,
    query: String,
    at: Option<DateTime<Utc>>,
) -> Result<ApiResult> {
    let mut url = self.api_call_base_url("/api/v1/query");
    {
        // A single serializer collects every pair; the scope ends its borrow
        // of `url` before the URL is read back.
        let mut params = url.query_pairs_mut();
        params.append_pair("query", &query);
        if let Some(instant) = at {
            params.append_pair("time", &instant.to_rfc3339());
        }
        if let Some(limit) = self.query_timeout {
            params.append_pair("timeout", &limit.as_secs().to_string());
        }
    }
    let uri = Uri::from_str(url.as_str())?;
    await!(self.make_http_get_api_call(uri))
}
/// Runs a range query through `GET /api/v1/query_range`.
///
/// * `query` - expression sent as the `query` parameter.
/// * `start`, `end` - bounds of the range, sent in RFC 3339 form.
/// * `step` - resolution step width. `Step::Seconds` is sent as a float
///   number of seconds. `Step::Duration` is sent as `<secs>s` when it is a
///   whole number of seconds, and otherwise as an exact decimal number of
///   seconds (for example `0.500000000`), so sub-second precision is no
///   longer truncated away.
///
/// A configured `query_timeout` is sent as `timeout`, in whole seconds.
///
/// # Errors
///
/// Fails when the assembled URL is not a valid `Uri`, or when the request or
/// the decoding of its response fails.
pub async fn range_query(
    &mut self,
    query: String,
    start: DateTime<Utc>,
    end: DateTime<Utc>,
    step: Step,
) -> Result<ApiResult> {
    let step = match step {
        Step::Seconds(secs) => secs.to_string(),
        // Whole seconds keep the established `<n>s` duration form.
        Step::Duration(width) if width.subsec_nanos() == 0 => format!("{}s", width.as_secs()),
        // Truncating here would turn 500ms into a rejected `0s` step and
        // 1500ms into a silent `1s`; the API also accepts float seconds, so
        // send the exact value instead.
        Step::Duration(width) => format!("{}.{:09}", width.as_secs(), width.subsec_nanos()),
    };

    let mut url = self.api_call_base_url("/api/v1/query_range");
    {
        // A single serializer collects every pair; the scope ends its borrow
        // of `url` before the URL is read back.
        let mut params = url.query_pairs_mut();
        params.append_pair("query", &query);
        params.append_pair("start", &start.to_rfc3339());
        params.append_pair("end", &end.to_rfc3339());
        params.append_pair("step", &step);
        if let Some(limit) = self.query_timeout {
            params.append_pair("timeout", &limit.as_secs().to_string());
        }
    }
    let uri = Uri::from_str(url.as_str())?;
    await!(self.make_http_get_api_call(uri))
}
/// Looks up series through `GET /api/v1/series`.
///
/// * `selectors` - each entry is sent as one `match[]` parameter, in order.
/// * `start`, `end` - time bounds, sent in RFC 3339 form.
///
/// A configured `query_timeout` is sent as `timeout`, in whole seconds.
///
/// # Errors
///
/// Fails when the assembled URL is not a valid `Uri`, or when the request or
/// the decoding of its response fails.
pub async fn series(
    &mut self,
    selectors: Vec<String>,
    start: DateTime<Utc>,
    end: DateTime<Utc>,
) -> Result<ApiResult> {
    let mut url = self.api_call_base_url("/api/v1/series");
    {
        // `start` and `end` are always appended, so the query is never empty
        // and one shared serializer yields the same URL as per-pair calls.
        let mut params = url.query_pairs_mut();
        for selector in selectors {
            params.append_pair("match[]", &selector);
        }
        params.append_pair("start", &start.to_rfc3339());
        params.append_pair("end", &end.to_rfc3339());
        if let Some(limit) = self.query_timeout {
            params.append_pair("timeout", &limit.as_secs().to_string());
        }
    }
    let uri = Uri::from_str(url.as_str())?;
    await!(self.make_http_get_api_call(uri))
}
pub async fn label_names(&mut self) -> Result<ApiResult> {
let u = self.api_call_base_url("/api/v1/labels");
let u = Uri::from_str(u.as_str())?;
await!(self.make_http_get_api_call(u))
}
/// Calls `GET /api/v1/label/<label_name>/values` and returns the decoded
/// response.
///
/// * `label_name` - label whose values are requested; it is inserted into
///   the request path as given.
///
/// # Errors
///
/// Fails when the endpoint URL is not a valid `Uri`, or when the request or
/// the decoding of its response fails.
pub async fn label_values(&mut self, label_name: String) -> Result<ApiResult> {
    // The endpoint lives under `/label/`; without that segment the request
    // hits a path the Prometheus API does not define.
    let endpoint = self.api_call_base_url(&format!("/api/v1/label/{}/values", label_name));
    let uri = Uri::from_str(endpoint.as_str())?;
    await!(self.make_http_get_api_call(uri))
}
pub async fn targets(&mut self) -> Result<ApiResult> {
let u = self.api_call_base_url("/api/v1/targets");
let u = Uri::from_str(u.as_str())?;
await!(self.make_http_get_api_call(u))
}
pub async fn alert_managers(&mut self) -> Result<ApiResult> {
let u = self.api_call_base_url("/api/v1/alertmanagers");
let u = Uri::from_str(u.as_str())?;
await!(self.make_http_get_api_call(u))
}
pub async fn config(&mut self) -> Result<ApiResult> {
let u = self.api_call_base_url("/api/v1/status/config");
let u = Uri::from_str(u.as_str())?;
await!(self.make_http_get_api_call(u))
}
pub async fn flags(&mut self) -> Result<ApiResult> {
let u = self.api_call_base_url("/api/v1/status/flags");
let u = Uri::from_str(u.as_str())?;
await!(self.make_http_get_api_call(u))
}
/// Issues a `GET` for `u`, collects the whole response body and decodes it
/// as JSON into an `ApiResult`.
///
/// The HTTP status code is not inspected; whatever body arrives is handed
/// to the JSON decoder.
///
/// # Errors
///
/// Fails when the request fails, when reading the body fails, or when the
/// body is not a valid JSON `ApiResult`.
async fn make_http_get_api_call(&mut self, u: Uri) -> Result<ApiResult> {
    let response = await!(self.client.get(u).compat())?;
    let payload = await!(response.into_body().concat2().compat())?;
    let decoded = serde_json::from_slice::<ApiResult>(&payload)?;
    Ok(decoded)
}
/// Sends `POST /api/v1/admin/tsdb/delete_series` with an empty body and
/// returns the decoded response.
///
/// * `series` - each entry is sent as one `match[]` parameter, in order.
/// * `start`, `end` - optional time bounds, sent in RFC 3339 form when
///   present and omitted otherwise.
///
/// A configured `query_timeout` is sent as `timeout`, in whole seconds.
/// The HTTP status code is not inspected.
///
/// # Errors
///
/// Fails when the assembled URL is not a valid `Uri`, when the request
/// fails, or when the response body is not a valid JSON `ApiResult`.
pub async fn delete_series(
    &mut self,
    series: Vec<String>,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
) -> Result<ApiResult> {
    let mut url = self.api_call_base_url("/api/v1/admin/tsdb/delete_series");
    // The serializer is requested per pair on purpose: when nothing at all is
    // appended, the URL must stay free of a query component.
    for selector in series {
        url.query_pairs_mut().append_pair("match[]", &selector);
    }
    for &(name, bound) in [("start", start), ("end", end)].iter() {
        if let Some(instant) = bound {
            url.query_pairs_mut()
                .append_pair(name, &instant.to_rfc3339());
        }
    }
    if let Some(limit) = self.query_timeout {
        url.query_pairs_mut()
            .append_pair("timeout", &limit.as_secs().to_string());
    }
    let uri = Uri::from_str(url.as_str())?;
    let request = Request::post(uri)
        .body(Body::empty())
        .expect("Failed to construct 'delete_series' POST with an empty body");
    let response = await!(self.client.request(request).compat())?;
    let payload = await!(response.into_body().concat2().compat())?;
    let decoded = serde_json::from_slice::<ApiResult>(&payload)?;
    Ok(decoded)
}
pub async fn snapshot(&mut self, skip_head: bool) -> Result<ApiResult> {
let mut u = self.api_call_base_url("/api/v1/admin/tsdb/snapshot");
u.query_pairs_mut().append_pair("skip_head", &skip_head.to_string());
let u = Uri::from_str(u.as_str())?;
let post = Request::post(u)
.body(Body::empty())
.expect("Failed to construct 'snapshot' POST with empty body");
let resp = await!(self.client.request(post).compat())?;
let body = await!(resp.into_body().concat2().compat())?;
serde_json::from_slice::<ApiResult>(&body).map_err(From::from)
}
pub async fn clean_tombstones(&mut self) -> Result<ApiResult> {
let u = self.api_call_base_url("/api/v1/admin/tsdb/clean_tombstones");
let u = Uri::from_str(u.as_str())?;
let post = Request::post(u)
.body(Body::empty())
.expect("Failed to construct 'clean_tombstones' POST with empty body");
let resp = await!(self.client.request(post).compat())?;
let body = await!(resp.into_body().concat2().compat())?;
serde_json::from_slice::<ApiResult>(&body).map_err(From::from)
}
/// Resolves `api_path` against the configured host and returns the
/// resulting URL.
///
/// NOTE(review): `Url::join` follows relative-reference resolution, so an
/// `api_path` starting with `/` presumably replaces any path already present
/// on the host URL - confirm if servers behind a path prefix must work.
///
/// # Panics
///
/// Panics when `api_path` cannot be joined onto the host URL.
fn api_call_base_url(&self, api_path: &str) -> Url {
    // `join` only borrows the base, so no clone of the host is needed; the
    // panic message is now built only on the failure path.
    self.host.join(api_path).unwrap_or_else(|err| {
        panic!(
            "Cannot create API url with path '{}': {:?}",
            api_path, err
        )
    })
}
}