use std::sync::Arc;
use std::time::{Duration, Instant};
use reqwest::{header, Client, RequestBuilder, Response};
use tracing::{debug, error};
use crate::config::Config;
use crate::error::{ConsulError, Result};
use crate::types::{QueryMeta, QueryOptions, WriteMeta, WriteOptions};
/// Thin HTTP transport for the Consul API.
///
/// Wraps a `reqwest::Client` together with the configuration used to build
/// request URLs and default headers.
pub struct HttpClient {
/// Client configuration, held behind an `Arc` so clones share it.
config: Arc<Config>,
/// Underlying `reqwest` client that actually sends the requests.
client: Client,
}
impl HttpClient {
/// Builds an [`HttpClient`] from `config`.
///
/// The configuration is validated first. The underlying `reqwest` client is
/// created with the configured timeout and at most 10 idle pooled
/// connections per host. Certificate verification is disabled only when a
/// TLS configuration is present and its `insecure_skip_verify` flag is set.
///
/// # Errors
///
/// Returns the validation error from `Config::validate`, or
/// `ConsulError::HttpError` if the `reqwest` client cannot be built.
pub fn new(config: Config) -> Result<Self> {
    config.validate()?;

    let skip_certificate_checks = config
        .tls_config
        .as_ref()
        .map_or(false, |tls| tls.insecure_skip_verify);

    let mut http = Client::builder()
        .timeout(config.timeout)
        .pool_max_idle_per_host(10);
    if skip_certificate_checks {
        http = http.danger_accept_invalid_certs(true);
    }

    let client = http.build().map_err(ConsulError::HttpError)?;
    let config = Arc::new(config);
    Ok(Self { config, client })
}
/// Returns the configuration this client was created with.
pub fn config(&self) -> &Config {
    &*self.config
}
/// Returns the full request URL for `path`.
///
/// `path` is appended verbatim to the configured base URL; no separator is
/// inserted or removed.
pub fn url(&self, path: &str) -> String {
    let base = self.config.base_url();
    format!("{}{}", base, path)
}
/// Starts a `GET` request for `path` with the client's default
/// authentication applied.
pub fn get(&self, path: &str) -> RequestBuilder {
    let target = self.url(path);
    debug!("GET {}", target);
    let request = self.client.get(&target);
    self.apply_defaults(request)
}
/// Starts a `PUT` request for `path` with the client's default
/// authentication applied.
pub fn put(&self, path: &str) -> RequestBuilder {
    let target = self.url(path);
    debug!("PUT {}", target);
    let request = self.client.put(&target);
    self.apply_defaults(request)
}
/// Starts a `POST` request for `path` with the client's default
/// authentication applied.
pub fn post(&self, path: &str) -> RequestBuilder {
    let target = self.url(path);
    debug!("POST {}", target);
    let request = self.client.post(&target);
    self.apply_defaults(request)
}
/// Starts a `DELETE` request for `path` with the client's default
/// authentication applied.
pub fn delete(&self, path: &str) -> RequestBuilder {
    let target = self.url(path);
    debug!("DELETE {}", target);
    let request = self.client.delete(&target);
    self.apply_defaults(request)
}
fn apply_defaults(&self, mut builder: RequestBuilder) -> RequestBuilder {
if let Some(ref token) = self.config.token {
builder = builder.header("X-Consul-Token", token);
}
if let Some(ref auth) = self.config.http_auth {
builder = builder.basic_auth(&auth.username, Some(&auth.password));
}
builder
}
/// Applies per-request read options to `builder`.
///
/// * `dc`, `ns` and `partition` use the value from `opts` when set and fall
///   back to the client configuration otherwise.
/// * A token in `opts` replaces the `X-Consul-Token` header that
///   `apply_defaults` may already have set from the configuration.
/// * `stale` / `consistent` are sent as valueless flags; `index` is sent only
///   when `wait_index` is non-zero.
/// * `wait` is sent in milliseconds so sub-second waits are not truncated
///   to zero.
/// * `near`, `filter` and one `node-meta=key:value` pair per entry are
///   forwarded as query parameters.
/// * When `use_cache` is set (and `require_consistent` is not), the `cached`
///   flag is sent together with a single `Cache-Control` header carrying the
///   `max-age` / `stale-if-error` directives that are configured.
pub fn apply_query_options(&self, mut builder: RequestBuilder, opts: &QueryOptions) -> RequestBuilder {
    let mut params: Vec<(&str, String)> = Vec::new();

    if let Some(dc) = opts.datacenter.as_ref().or(self.config.datacenter.as_ref()) {
        params.push(("dc", dc.clone()));
    }

    if let Some(ref token) = opts.token {
        // `RequestBuilder::header` appends, which would send a second
        // `X-Consul-Token` next to the configured default instead of
        // overriding it. `RequestBuilder::headers` replaces existing values
        // for the same header name.
        builder = match header::HeaderValue::from_str(token) {
            Ok(value) => {
                let mut overrides = header::HeaderMap::new();
                overrides.insert(header::HeaderName::from_static("x-consul-token"), value);
                builder.headers(overrides)
            }
            // Hand an unparsable value to reqwest unchanged so the failure is
            // reported by the builder exactly as it was before.
            Err(_) => builder.header("X-Consul-Token", token),
        };
    }

    if let Some(ns) = opts.namespace.as_ref().or(self.config.namespace.as_ref()) {
        params.push(("ns", ns.clone()));
    }
    if let Some(partition) = opts.partition.as_ref().or(self.config.partition.as_ref()) {
        params.push(("partition", partition.clone()));
    }

    if opts.allow_stale {
        params.push(("stale", String::new()));
    }
    if opts.require_consistent {
        params.push(("consistent", String::new()));
    }
    if opts.wait_index > 0 {
        params.push(("index", opts.wait_index.to_string()));
    }
    if let Some(wait_time) = opts.wait_time {
        // Whole seconds would turn e.g. 500ms into "0s". Use milliseconds and
        // round any non-zero sub-millisecond wait up to 1ms.
        let mut millis = wait_time.as_millis();
        if millis == 0 && !wait_time.is_zero() {
            millis = 1;
        }
        params.push(("wait", format!("{}ms", millis)));
    }
    if let Some(ref near) = opts.near {
        params.push(("near", near.clone()));
    }
    for (key, value) in &opts.node_meta {
        params.push(("node-meta", format!("{}:{}", key, value)));
    }
    if let Some(ref filter) = opts.filter {
        params.push(("filter", filter.clone()));
    }

    // Caching is requested through the `cached` flag. It is skipped when a
    // consistent read is requested, since the two modes conflict.
    if opts.use_cache && !opts.require_consistent {
        params.push(("cached", String::new()));

        let mut directives: Vec<String> = Vec::new();
        if let Some(max_age) = opts.max_age {
            directives.push(format!("max-age={}", max_age.as_secs()));
        }
        if let Some(stale_if_error) = opts.stale_if_error {
            directives.push(format!("stale-if-error={}", stale_if_error.as_secs()));
        }
        // One combined header instead of an empty one plus one per directive.
        if !directives.is_empty() {
            builder = builder.header(header::CACHE_CONTROL, directives.join(", "));
        }
    }

    if !params.is_empty() {
        builder = builder.query(&params);
    }
    builder
}
pub fn apply_write_options(&self, mut builder: RequestBuilder, opts: &WriteOptions) -> RequestBuilder {
let mut params: Vec<(&str, String)> = Vec::new();
if let Some(ref dc) = opts.datacenter {
params.push(("dc", dc.clone()));
} else if let Some(ref dc) = self.config.datacenter {
params.push(("dc", dc.clone()));
}
if let Some(ref token) = opts.token {
builder = builder.header("X-Consul-Token", token);
}
if let Some(ref ns) = opts.namespace {
params.push(("ns", ns.clone()));
} else if let Some(ref ns) = self.config.namespace {
params.push(("ns", ns.clone()));
}
if let Some(ref partition) = opts.partition {
params.push(("partition", partition.clone()));
} else if let Some(ref partition) = self.config.partition {
params.push(("partition", partition.clone()));
}
if !params.is_empty() {
builder = builder.query(¶ms);
}
builder
}
/// Sends the request and returns the raw response.
///
/// The HTTP status is not inspected here.
///
/// # Errors
///
/// Returns `ConsulError::HttpError` when sending the request fails.
pub async fn execute(&self, builder: RequestBuilder) -> Result<Response> {
    builder.send().await.map_err(ConsulError::HttpError)
}
/// Sends the request and decodes a successful response body as JSON.
///
/// # Errors
///
/// Returns `ConsulError::HttpError` when sending or decoding fails, and an
/// API error carrying the status code and body for non-success responses.
pub async fn execute_json<T>(&self, builder: RequestBuilder) -> Result<T>
where
    T: serde::de::DeserializeOwned,
{
    let response = self.execute(builder).await?;
    self.handle_response_json::<T>(response).await
}
/// Runs a read request and returns the decoded JSON body together with the
/// query metadata parsed from the response headers.
///
/// The reported request time covers sending the request up to receiving the
/// response; it is taken before the body is decoded.
///
/// # Errors
///
/// Same as [`HttpClient::execute_json`].
pub async fn query<T>(&self, builder: RequestBuilder) -> Result<(T, QueryMeta)>
where
    T: serde::de::DeserializeOwned,
{
    let started = Instant::now();
    let response = self.execute(builder).await?;
    let elapsed = started.elapsed();
    // Headers must be read before the response is consumed for its body.
    let meta = self.parse_query_meta(&response, elapsed);
    self.handle_response_json(response)
        .await
        .map(|data| (data, meta))
}
/// Runs a write request and returns the decoded JSON body together with the
/// time the request took.
///
/// The request time is taken once the response arrives, before the body is
/// decoded.
///
/// # Errors
///
/// Same as [`HttpClient::execute_json`].
pub async fn write<T>(&self, builder: RequestBuilder) -> Result<(T, WriteMeta)>
where
    T: serde::de::DeserializeOwned,
{
    let started = Instant::now();
    let response = self.execute(builder).await?;
    let request_time = started.elapsed();
    let meta = WriteMeta { request_time };
    self.handle_response_json(response)
        .await
        .map(|data| (data, meta))
}
/// Runs a write request whose body is a bare boolean.
///
/// On success the result is `true` only when the trimmed response body is
/// exactly `true`; any other body yields `false`.
///
/// # Errors
///
/// Returns `ConsulError::HttpError` when sending the request or reading the
/// body fails, and an API error carrying the status code and body for
/// non-success responses.
pub async fn write_bool(&self, builder: RequestBuilder) -> Result<(bool, WriteMeta)> {
    let started = Instant::now();
    let response = self.execute(builder).await?;
    let meta = WriteMeta {
        request_time: started.elapsed(),
    };

    let status = response.status();
    if !status.is_success() {
        // Best effort: an unreadable error body becomes an empty message.
        let body = response.text().await.unwrap_or_default();
        return Err(ConsulError::api_error(status.as_u16(), body));
    }

    let body = response.text().await.map_err(ConsulError::HttpError)?;
    Ok((body.trim() == "true", meta))
}
/// Runs a write request whose response body is ignored.
///
/// # Errors
///
/// Returns `ConsulError::HttpError` when sending the request fails, and an
/// API error carrying the status code and body for non-success responses.
pub async fn write_empty(&self, builder: RequestBuilder) -> Result<WriteMeta> {
    let started = Instant::now();
    let response = self.execute(builder).await?;

    let status = response.status();
    if !status.is_success() {
        // Best effort: an unreadable error body becomes an empty message.
        let body = response.text().await.unwrap_or_default();
        return Err(ConsulError::api_error(status.as_u16(), body));
    }

    Ok(WriteMeta {
        request_time: started.elapsed(),
    })
}
async fn handle_response_json<T: serde::de::DeserializeOwned>(
&self,
response: Response,
) -> Result<T> {
let status = response.status();
if status.is_success() {
response.json::<T>().await.map_err(ConsulError::HttpError)
} else {
let text = response.text().await.unwrap_or_default();
error!("API error: {} - {}", status, text);
Err(ConsulError::api_error(status.as_u16(), text))
}
}
/// Builds a [`QueryMeta`] from the response headers.
///
/// Starts from `QueryMeta::default()` with `request_time` filled in, then
/// overrides a field only when its header is present and readable as text:
///
/// * `X-Consul-Index` sets `last_index` (`0` if it does not parse).
/// * `X-Consul-KnownLeader` sets `known_leader` (exactly `true`).
/// * `X-Consul-LastContact` sets `last_contact`, in milliseconds.
/// * `X-Consul-Translate-Addresses` sets `address_translation_enabled`
///   (exactly `true`).
/// * `X-Cache` sets `cache_hit` (exactly `HIT`).
/// * `Age` sets `cache_age`, in seconds.
fn parse_query_meta(&self, response: &Response, request_time: Duration) -> QueryMeta {
    /// Returns the header value as text, or `None` if missing or not
    /// representable as a string.
    fn header_text<'r>(response: &'r Response, name: &str) -> Option<&'r str> {
        response.headers().get(name).and_then(|raw| raw.to_str().ok())
    }

    let mut meta = QueryMeta {
        request_time,
        ..Default::default()
    };

    if let Some(text) = header_text(response, "X-Consul-Index") {
        meta.last_index = text.parse().unwrap_or(0);
    }
    if let Some(text) = header_text(response, "X-Consul-KnownLeader") {
        meta.known_leader = text == "true";
    }
    if let Some(millis) =
        header_text(response, "X-Consul-LastContact").and_then(|text| text.parse::<u64>().ok())
    {
        meta.last_contact = Duration::from_millis(millis);
    }
    if let Some(text) = header_text(response, "X-Consul-Translate-Addresses") {
        meta.address_translation_enabled = text == "true";
    }
    if let Some(text) = header_text(response, "X-Cache") {
        meta.cache_hit = text == "HIT";
    }
    if let Some(secs) = header_text(response, "Age").and_then(|text| text.parse::<u64>().ok()) {
        meta.cache_age = Some(Duration::from_secs(secs));
    }

    meta
}
/// Checks that the server is reachable by requesting `/v1/status/leader`.
///
/// # Errors
///
/// Returns `ConsulError::HttpError` when the request cannot be sent, and an
/// API error with the response status code for non-success responses.
pub async fn ping(&self) -> Result<()> {
    let response = self
        .get("/v1/status/leader")
        .send()
        .await
        .map_err(ConsulError::HttpError)?;

    let status = response.status();
    if status.is_success() {
        return Ok(());
    }
    Err(ConsulError::api_error(
        status.as_u16(),
        "failed to connect to Consul",
    ))
}
}
impl Clone for HttpClient {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
client: self.client.clone(),
}
}
}