use std::str::FromStr;
use futures::{Future, IntoFuture, Stream};
use hyper::client::connect::Connect;
use hyper::{StatusCode, Uri};
use serde_derive::{Deserialize, Serialize};
use serde_json;
use crate::client::{Client, ClusterInfo, Response};
use crate::error::{ApiError, Error};
use crate::first_ok::first_ok;
/// A single entry of the `members` array that `list` deserializes from the
/// `v2/members` endpoint.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[derive(Deserialize, Serialize)]
pub struct Member {
    /// The `id` field of the JSON representation.
    pub id: String,
    /// The `name` field of the JSON representation.
    pub name: String,
    /// Peer URLs of the member; carried as `peerURLs` in JSON.
    #[serde(rename = "peerURLs")]
    pub peer_urls: Vec<String>,
    /// Client URLs of the member; carried as `clientURLs` in JSON.
    #[serde(rename = "clientURLs")]
    pub client_urls: Vec<String>,
}
/// JSON request body sent by `add` and `update`: an object with a single
/// `peerURLs` array.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[derive(Deserialize, Serialize)]
struct PeerUrls {
    /// The peer URLs to send; carried as `peerURLs` in JSON.
    #[serde(rename = "peerURLs")]
    peer_urls: Vec<String>,
}
/// Shape of the JSON body that `list` deserializes when the server answers
/// with `200 OK`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[derive(Deserialize, Serialize)]
struct ListResponse {
    /// The members reported in the response body.
    members: Vec<Member>,
}
pub fn add<C>(
client: &Client<C>,
peer_urls: Vec<String>,
) -> Box<dyn Future<Item = Response<()>, Error = Vec<Error>>>
where
C: Clone + Connect,
{
let peer_urls = PeerUrls { peer_urls };
let body = match serde_json::to_string(&peer_urls) {
Ok(body) => body,
Err(error) => return Box::new(Err(vec![Error::Serialization(error)]).into_future()),
};
let http_client = client.http_client().clone();
let result = first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, "");
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let body = body.clone();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.post(uri, body).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::CREATED {
Ok(Response {
data: (),
cluster_info,
})
} else {
match serde_json::from_slice::<ApiError>(body) {
Ok(error) => Err(Error::Api(error)),
Err(error) => Err(Error::Serialization(error)),
}
}
})
})
});
Box::new(result)
}
pub fn delete<C>(
client: &Client<C>,
id: String,
) -> impl Future<Item = Response<()>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
let http_client = client.http_client().clone();
first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, &format!("/{}", id));
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::NO_CONTENT {
Ok(Response {
data: (),
cluster_info,
})
} else {
match serde_json::from_slice::<ApiError>(body) {
Ok(error) => Err(Error::Api(error)),
Err(error) => Err(Error::Serialization(error)),
}
}
})
})
})
}
pub fn list<C>(
client: &Client<C>,
) -> impl Future<Item = Response<Vec<Member>>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
let http_client = client.http_client().clone();
first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, "");
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<ListResponse>(body) {
Ok(data) => Ok(Response {
data: data.members,
cluster_info,
}),
Err(error) => Err(Error::Serialization(error)),
}
} else {
match serde_json::from_slice::<ApiError>(body) {
Ok(error) => Err(Error::Api(error)),
Err(error) => Err(Error::Serialization(error)),
}
}
})
})
})
}
pub fn update<C>(
client: &Client<C>,
id: String,
peer_urls: Vec<String>,
) -> Box<dyn Future<Item = Response<()>, Error = Vec<Error>>>
where
C: Clone + Connect,
{
let peer_urls = PeerUrls { peer_urls };
let body = match serde_json::to_string(&peer_urls) {
Ok(body) => body,
Err(error) => return Box::new(Err(vec![Error::Serialization(error)]).into_future()),
};
let http_client = client.http_client().clone();
let result = first_ok(client.endpoints().to_vec(), move |member| {
let url = build_url(member, &format!("/{}", id));
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let body = body.clone();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.put(uri, body).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::NO_CONTENT {
Ok(Response {
data: (),
cluster_info,
})
} else {
match serde_json::from_slice::<ApiError>(body) {
Ok(error) => Err(Error::Api(error)),
Err(error) => Err(Error::Serialization(error)),
}
}
})
})
});
Box::new(result)
}
/// Builds the members URL for an endpoint by formatting the endpoint, the literal
/// `v2/members`, and `path` one after another.
///
/// No separator is inserted between the endpoint and `v2/members`.
/// NOTE(review): this assumes the endpoint's displayed form already ends with `/`;
/// confirm against how endpoints are constructed.
fn build_url(endpoint: &Uri, path: &str) -> String {
    let (base, suffix) = (endpoint, path);
    format!("{}v2/members{}", base, suffix)
}