use futures::stream::futures_unordered;
use futures::{Future, IntoFuture, Stream};
use http::header::{HeaderMap, HeaderValue};
use hyper::client::connect::{Connect, HttpConnector};
use hyper::{Client as Hyper, StatusCode, Uri};
#[cfg(feature = "tls")]
use hyper_tls::HttpsConnector;
use log::error;
use serde::de::DeserializeOwned;
use serde_derive::{Deserialize, Serialize};
use serde_json;
use crate::error::{ApiError, Error};
use crate::http::HttpClient;
use crate::version::VersionInfo;
const XETCD_CLUSTER_ID: &str = "X-Etcd-Cluster-Id";
const XETCD_INDEX: &str = "X-Etcd-Index";
const XRAFT_INDEX: &str = "X-Raft-Index";
const XRAFT_TERM: &str = "X-Raft-Term";
#[derive(Clone, Debug)]
pub struct Client<C>
where
C: Clone + Connect + Sync + 'static,
{
endpoints: Vec<Uri>,
http_client: HttpClient<C>,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct BasicAuth {
pub username: String,
pub password: String,
}
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct Health {
pub health: String,
}
impl Client<HttpConnector> {
pub fn new(
endpoints: &[&str],
basic_auth: Option<BasicAuth>,
) -> Result<Client<HttpConnector>, Error> {
let hyper = Hyper::builder().keep_alive(true).build_http();
Client::custom(hyper, endpoints, basic_auth)
}
}
#[cfg(feature = "tls")]
impl Client<HttpsConnector<HttpConnector>> {
pub fn https(
endpoints: &[&str],
basic_auth: Option<BasicAuth>,
) -> Result<Client<HttpsConnector<HttpConnector>>, Error> {
let connector = HttpsConnector::new(4)?;
let hyper = Hyper::builder().keep_alive(true).build(connector);
Client::custom(hyper, endpoints, basic_auth)
}
}
impl<C> Client<C>
where
C: Clone + Connect + Sync + 'static,
{
pub fn custom(
hyper: Hyper<C>,
endpoints: &[&str],
basic_auth: Option<BasicAuth>,
) -> Result<Client<C>, Error> {
if endpoints.len() < 1 {
return Err(Error::NoEndpoints);
}
let mut uri_endpoints = Vec::with_capacity(endpoints.len());
for endpoint in endpoints {
uri_endpoints.push(endpoint.parse()?);
}
Ok(Client {
endpoints: uri_endpoints,
http_client: HttpClient::new(hyper, basic_auth),
})
}
pub(crate) fn http_client(&self) -> &HttpClient<C> {
&self.http_client
}
pub(crate) fn endpoints(&self) -> &[Uri] {
&self.endpoints
}
pub fn health(&self) -> impl Stream<Item = Response<Health>, Error = Error> + Send {
let futures = self.endpoints.iter().map(|endpoint| {
let url = build_url(&endpoint, "health");
let uri = url.parse().map_err(Error::from).into_future();
let cloned_client = self.http_client.clone();
let response = uri.and_then(move |uri| cloned_client.get(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<Health>(body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
} else {
match serde_json::from_slice::<ApiError>(body) {
Ok(error) => Err(Error::Api(error)),
Err(error) => Err(Error::Serialization(error)),
}
}
})
})
});
futures_unordered(futures)
}
pub fn versions(&self) -> impl Stream<Item = Response<VersionInfo>, Error = Error> + Send {
let futures = self.endpoints.iter().map(|endpoint| {
let url = build_url(&endpoint, "version");
let uri = url.parse().map_err(Error::from).into_future();
let cloned_client = self.http_client.clone();
let response = uri.and_then(move |uri| cloned_client.get(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<VersionInfo>(body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
} else {
match serde_json::from_slice::<ApiError>(body) {
Ok(error) => Err(Error::Api(error)),
Err(error) => Err(Error::Serialization(error)),
}
}
})
})
});
futures_unordered(futures)
}
pub(crate) fn request<U, T>(
&self,
uri: U,
) -> impl Future<Item = Response<T>, Error = Error> + Send
where
U: Future<Item = Uri, Error = Error> + Send,
T: DeserializeOwned + Send + 'static,
{
let http_client = self.http_client.clone();
let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |body| {
if status == StatusCode::OK {
match serde_json::from_slice::<T>(&body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
} else {
match serde_json::from_slice::<ApiError>(&body) {
Ok(error) => Err(Error::Api(error)),
Err(error) => Err(Error::Serialization(error)),
}
}
})
})
}
}
#[derive(Clone, Debug)]
pub struct Response<T> {
pub cluster_info: ClusterInfo,
pub data: T,
}
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct ClusterInfo {
pub cluster_id: Option<String>,
pub etcd_index: Option<u64>,
pub raft_index: Option<u64>,
pub raft_term: Option<u64>,
}
impl<'a> From<&'a HeaderMap<HeaderValue>> for ClusterInfo {
fn from(headers: &'a HeaderMap<HeaderValue>) -> Self {
let cluster_id = headers.get(XETCD_CLUSTER_ID).and_then(|v| {
match String::from_utf8(v.as_bytes().to_vec()) {
Ok(s) => Some(s),
Err(e) => {
error!("{} header decode error: {:?}", XETCD_CLUSTER_ID, e);
None
}
}
});
let etcd_index = headers.get(XETCD_INDEX).and_then(|v| {
match String::from_utf8(v.as_bytes().to_vec())
.map_err(|e| format!("{:?}", e))
.and_then(|s| s.parse().map_err(|e| format!("{:?}", e)))
{
Ok(i) => Some(i),
Err(e) => {
error!("{} header decode error: {}", XETCD_INDEX, e);
None
}
}
});
let raft_index = headers.get(XRAFT_INDEX).and_then(|v| {
match String::from_utf8(v.as_bytes().to_vec())
.map_err(|e| format!("{:?}", e))
.and_then(|s| s.parse().map_err(|e| format!("{:?}", e)))
{
Ok(i) => Some(i),
Err(e) => {
error!("{} header decode error: {}", XRAFT_INDEX, e);
None
}
}
});
let raft_term = headers.get(XRAFT_TERM).and_then(|v| {
match String::from_utf8(v.as_bytes().to_vec())
.map_err(|e| format!("{:?}", e))
.and_then(|s| s.parse().map_err(|e| format!("{:?}", e)))
{
Ok(i) => Some(i),
Err(e) => {
error!("{} header decode error: {}", XRAFT_TERM, e);
None
}
}
});
ClusterInfo {
cluster_id: cluster_id,
etcd_index: etcd_index,
raft_index: raft_index,
raft_term: raft_term,
}
}
}
fn build_url(endpoint: &Uri, path: &str) -> String {
format!("{}{}", endpoint, path)
}