use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
use futures::future::{Future, IntoFuture};
use futures::stream::Stream;
use hyper::client::connect::Connect;
use hyper::{StatusCode, Uri};
use serde_derive::{Deserialize, Serialize};
use serde_json;
use tokio::timer::Timeout;
use url::Url;
pub use crate::error::WatchError;
use crate::client::{Client, ClusterInfo, Response};
use crate::error::{ApiError, Error};
use crate::first_ok::first_ok;
use crate::options::{
ComparisonConditions,
DeleteOptions,
GetOptions as InternalGetOptions,
SetOptions,
};
use url::form_urlencoded::Serializer;
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct KeyValueInfo {
pub action: Action,
pub node: Node,
#[serde(rename = "prevNode")]
pub prev_node: Option<Node>,
}
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub enum Action {
#[serde(rename = "compareAndDelete")]
CompareAndDelete,
#[serde(rename = "compareAndSwap")]
CompareAndSwap,
#[serde(rename = "create")]
Create,
#[serde(rename = "delete")]
Delete,
#[serde(rename = "expire")]
Expire,
#[serde(rename = "get")]
Get,
#[serde(rename = "set")]
Set,
#[serde(rename = "update")]
Update,
}
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct Node {
#[serde(rename = "createdIndex")]
pub created_index: Option<u64>,
pub dir: Option<bool>,
pub expiration: Option<String>,
pub key: Option<String>,
#[serde(rename = "modifiedIndex")]
pub modified_index: Option<u64>,
pub nodes: Option<Vec<Node>>,
pub ttl: Option<i64>,
pub value: Option<String>,
}
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct GetOptions {
pub recursive: bool,
pub sort: bool,
pub strong_consistency: bool,
}
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct WatchOptions {
pub index: Option<u64>,
pub recursive: bool,
pub timeout: Option<Duration>,
}
pub fn compare_and_delete<C>(
client: &Client<C>,
key: &str,
current_value: Option<&str>,
current_modified_index: Option<u64>,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_delete(
client,
key,
DeleteOptions {
conditions: Some(ComparisonConditions {
value: current_value,
modified_index: current_modified_index,
}),
..Default::default()
},
)
}
pub fn compare_and_swap<C>(
client: &Client<C>,
key: &str,
value: &str,
ttl: Option<u64>,
current_value: Option<&str>,
current_modified_index: Option<u64>,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_set(
client,
key,
SetOptions {
conditions: Some(ComparisonConditions {
value: current_value,
modified_index: current_modified_index,
}),
ttl: ttl,
value: Some(value),
..Default::default()
},
)
}
pub fn create<C>(
client: &Client<C>,
key: &str,
value: &str,
ttl: Option<u64>,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_set(
client,
key,
SetOptions {
prev_exist: Some(false),
ttl: ttl,
value: Some(value),
..Default::default()
},
)
}
pub fn create_dir<C>(
client: &Client<C>,
key: &str,
ttl: Option<u64>,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_set(
client,
key,
SetOptions {
dir: Some(true),
prev_exist: Some(false),
ttl: ttl,
..Default::default()
},
)
}
pub fn create_in_order<C>(
client: &Client<C>,
key: &str,
value: &str,
ttl: Option<u64>,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_set(
client,
key,
SetOptions {
create_in_order: true,
ttl: ttl,
value: Some(value),
..Default::default()
},
)
}
pub fn delete<C>(
client: &Client<C>,
key: &str,
recursive: bool,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_delete(
client,
key,
DeleteOptions {
recursive: Some(recursive),
..Default::default()
},
)
}
pub fn delete_dir<C>(
client: &Client<C>,
key: &str,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_delete(
client,
key,
DeleteOptions {
dir: Some(true),
..Default::default()
},
)
}
pub fn get<C>(
client: &Client<C>,
key: &str,
options: GetOptions,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_get(
client,
key,
InternalGetOptions {
recursive: options.recursive,
sort: Some(options.sort),
strong_consistency: options.strong_consistency,
..Default::default()
},
)
}
pub fn set<C>(
client: &Client<C>,
key: &str,
value: &str,
ttl: Option<u64>,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_set(
client,
key,
SetOptions {
ttl: ttl,
value: Some(value),
..Default::default()
},
)
}
pub fn set_dir<C>(
client: &Client<C>,
key: &str,
ttl: Option<u64>,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_set(
client,
key,
SetOptions {
dir: Some(true),
ttl: ttl,
..Default::default()
},
)
}
pub fn update<C>(
client: &Client<C>,
key: &str,
value: &str,
ttl: Option<u64>,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_set(
client,
key,
SetOptions {
prev_exist: Some(true),
ttl: ttl,
value: Some(value),
..Default::default()
},
)
}
pub fn update_dir<C>(
client: &Client<C>,
key: &str,
ttl: Option<u64>,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
raw_set(
client,
key,
SetOptions {
dir: Some(true),
prev_exist: Some(true),
ttl: ttl,
..Default::default()
},
)
}
pub fn watch<C>(
client: &Client<C>,
key: &str,
options: WatchOptions,
) -> Box<dyn Future<Item = Response<KeyValueInfo>, Error = WatchError> + Send>
where
C: Clone + Connect,
{
let work = raw_get(
client,
key,
InternalGetOptions {
recursive: options.recursive,
wait_index: options.index,
wait: true,
..Default::default()
},
)
.map_err(|errors| WatchError::Other(errors));
if let Some(duration) = options.timeout {
Box::new(
Timeout::new(work, duration).map_err(|e| match e.into_inner() {
Some(we) => we,
None => WatchError::Timeout,
}),
)
} else {
Box::new(work)
}
}
fn build_url(endpoint: &Uri, path: &str) -> String {
format!("{}v2/keys{}", endpoint, path)
}
fn raw_delete<C>(
client: &Client<C>,
key: &str,
options: DeleteOptions<'_>,
) -> Box<dyn Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send>
where
C: Clone + Connect,
{
let mut query_pairs = HashMap::new();
if options.recursive.is_some() {
query_pairs.insert("recursive", format!("{}", options.recursive.unwrap()));
}
if options.dir.is_some() {
query_pairs.insert("dir", format!("{}", options.dir.unwrap()));
}
if options.conditions.is_some() {
let conditions = options.conditions.unwrap();
if conditions.is_empty() {
return Box::new(Err(vec![Error::InvalidConditions]).into_future());
}
if conditions.modified_index.is_some() {
query_pairs.insert(
"prevIndex",
format!("{}", conditions.modified_index.unwrap()),
);
}
if conditions.value.is_some() {
query_pairs.insert("prevValue", conditions.value.unwrap().to_owned());
}
}
let http_client = client.http_client().clone();
let key = key.to_string();
let result = first_ok(client.endpoints().to_vec(), move |endpoint| {
let url = Url::parse_with_params(&build_url(endpoint, &key), query_pairs.clone())
.map_err(Error::from)
.into_future();
let uri = url.and_then(|url| {
Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future()
});
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from));
response.and_then(move |response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<KeyValueInfo>(body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
} else {
match serde_json::from_slice::<ApiError>(body) {
Ok(error) => Err(Error::Api(error)),
Err(error) => Err(Error::Serialization(error)),
}
}
})
})
});
Box::new(result)
}
fn raw_get<C>(
client: &Client<C>,
key: &str,
options: InternalGetOptions,
) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
where
C: Clone + Connect,
{
let mut query_pairs = HashMap::new();
query_pairs.insert("recursive", format!("{}", options.recursive));
if options.sort.is_some() {
query_pairs.insert("sorted", format!("{}", options.sort.unwrap()));
}
if options.wait {
query_pairs.insert("wait", "true".to_owned());
}
if options.wait_index.is_some() {
query_pairs.insert("waitIndex", format!("{}", options.wait_index.unwrap()));
}
let http_client = client.http_client().clone();
let key = key.to_string();
first_ok(client.endpoints().to_vec(), move |endpoint| {
let url = Url::parse_with_params(&build_url(endpoint, &key), query_pairs.clone())
.map_err(Error::from)
.into_future();
let uri = url.and_then(|url| {
Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future()
});
let http_client = http_client.clone();
let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| {
if status == StatusCode::OK {
match serde_json::from_slice::<KeyValueInfo>(body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
} else {
match serde_json::from_slice::<ApiError>(body) {
Ok(error) => Err(Error::Api(error)),
Err(error) => Err(Error::Serialization(error)),
}
}
})
})
})
}
fn raw_set<C>(
client: &Client<C>,
key: &str,
options: SetOptions<'_>,
) -> Box<dyn Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send>
where
C: Clone + Connect,
{
let mut http_options = vec![];
if let Some(ref value) = options.value {
http_options.push(("value".to_owned(), value.to_string()));
}
if let Some(ref ttl) = options.ttl {
http_options.push(("ttl".to_owned(), ttl.to_string()));
}
if let Some(ref dir) = options.dir {
http_options.push(("dir".to_owned(), dir.to_string()));
}
if let Some(ref prev_exist) = options.prev_exist {
http_options.push(("prevExist".to_owned(), prev_exist.to_string()));
}
if let Some(ref conditions) = options.conditions {
if conditions.is_empty() {
return Box::new(Err(vec![Error::InvalidConditions]).into_future());
}
if let Some(ref modified_index) = conditions.modified_index {
http_options.push(("prevIndex".to_owned(), modified_index.to_string()));
}
if let Some(ref value) = conditions.value {
http_options.push(("prevValue".to_owned(), value.to_string()));
}
}
let http_client = client.http_client().clone();
let key = key.to_string();
let create_in_order = options.create_in_order;
let result = first_ok(client.endpoints().to_vec(), move |endpoint| {
let mut serializer = Serializer::new(String::new());
serializer.extend_pairs(http_options.clone());
let body = serializer.finish();
let url = build_url(endpoint, &key);
let uri = Uri::from_str(url.as_str())
.map_err(Error::from)
.into_future();
let http_client = http_client.clone();
let response = uri.and_then(move |uri| {
if create_in_order {
http_client.post(uri, body).map_err(Error::from)
} else {
http_client.put(uri, body).map_err(Error::from)
}
});
response.and_then(|response| {
let status = response.status();
let cluster_info = ClusterInfo::from(response.headers());
let body = response.into_body().concat2().map_err(Error::from);
body.and_then(move |ref body| match status {
StatusCode::CREATED | StatusCode::OK => {
match serde_json::from_slice::<KeyValueInfo>(body) {
Ok(data) => Ok(Response { data, cluster_info }),
Err(error) => Err(Error::Serialization(error)),
}
}
_ => match serde_json::from_slice::<ApiError>(body) {
Ok(error) => Err(Error::Api(error)),
Err(error) => Err(Error::Serialization(error)),
},
})
})
});
Box::new(result)
}