1use std::collections::HashMap;
8use std::str::FromStr;
9use std::time::Duration;
10
11use futures::future::{Future, IntoFuture};
12use futures::stream::Stream;
13use hyper::client::connect::Connect;
14use hyper::{StatusCode, Uri};
15use serde_derive::{Deserialize, Serialize};
16use serde_json;
17use tokio::timer::Timeout;
18use url::Url;
19
20pub use crate::error::WatchError;
21
22use crate::client::{Client, ClusterInfo, Response};
23use crate::error::{ApiError, Error};
24use crate::first_ok::first_ok;
25use crate::options::{
26 ComparisonConditions,
27 DeleteOptions,
28 GetOptions as InternalGetOptions,
29 SetOptions,
30};
31use url::form_urlencoded::Serializer;
32
33#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
35pub struct KeyValueInfo {
36 pub action: Action,
38 pub node: Node,
40 #[serde(rename = "prevNode")]
42 pub prev_node: Option<Node>,
43}
44
45#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
49pub enum Action {
50 #[serde(rename = "compareAndDelete")]
52 CompareAndDelete,
53 #[serde(rename = "compareAndSwap")]
55 CompareAndSwap,
56 #[serde(rename = "create")]
58 Create,
59 #[serde(rename = "delete")]
61 Delete,
62 #[serde(rename = "expire")]
64 Expire,
65 #[serde(rename = "get")]
67 Get,
68 #[serde(rename = "set")]
70 Set,
71 #[serde(rename = "update")]
73 Update,
74}
75
76#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
78pub struct Node {
79 #[serde(rename = "createdIndex")]
81 pub created_index: Option<u64>,
82 pub dir: Option<bool>,
84 pub expiration: Option<String>,
86 pub key: Option<String>,
88 #[serde(rename = "modifiedIndex")]
90 pub modified_index: Option<u64>,
91 pub nodes: Option<Vec<Node>>,
93 pub ttl: Option<i64>,
95 pub value: Option<String>,
97}
98
99#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
101pub struct GetOptions {
102 pub recursive: bool,
104 pub sort: bool,
107 pub strong_consistency: bool,
112}
113
114#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
116pub struct WatchOptions {
117 pub index: Option<u64>,
120 pub recursive: bool,
122 pub timeout: Option<Duration>,
124}
125
126pub fn compare_and_delete<C>(
141 client: &Client<C>,
142 key: &str,
143 current_value: Option<&str>,
144 current_modified_index: Option<u64>,
145) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
146where
147 C: Clone + Connect,
148{
149 raw_delete(
150 client,
151 key,
152 DeleteOptions {
153 conditions: Some(ComparisonConditions {
154 value: current_value,
155 modified_index: current_modified_index,
156 }),
157 ..Default::default()
158 },
159 )
160}
161
162pub fn compare_and_swap<C>(
180 client: &Client<C>,
181 key: &str,
182 value: &str,
183 ttl: Option<u64>,
184 current_value: Option<&str>,
185 current_modified_index: Option<u64>,
186) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
187where
188 C: Clone + Connect,
189{
190 raw_set(
191 client,
192 key,
193 SetOptions {
194 conditions: Some(ComparisonConditions {
195 value: current_value,
196 modified_index: current_modified_index,
197 }),
198 ttl: ttl,
199 value: Some(value),
200 ..Default::default()
201 },
202 )
203}
204
205pub fn create<C>(
218 client: &Client<C>,
219 key: &str,
220 value: &str,
221 ttl: Option<u64>,
222) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
223where
224 C: Clone + Connect,
225{
226 raw_set(
227 client,
228 key,
229 SetOptions {
230 prev_exist: Some(false),
231 ttl: ttl,
232 value: Some(value),
233 ..Default::default()
234 },
235 )
236}
237
238pub fn create_dir<C>(
250 client: &Client<C>,
251 key: &str,
252 ttl: Option<u64>,
253) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
254where
255 C: Clone + Connect,
256{
257 raw_set(
258 client,
259 key,
260 SetOptions {
261 dir: Some(true),
262 prev_exist: Some(false),
263 ttl: ttl,
264 ..Default::default()
265 },
266 )
267}
268
269pub fn create_in_order<C>(
289 client: &Client<C>,
290 key: &str,
291 value: &str,
292 ttl: Option<u64>,
293) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
294where
295 C: Clone + Connect,
296{
297 raw_set(
298 client,
299 key,
300 SetOptions {
301 create_in_order: true,
302 ttl: ttl,
303 value: Some(value),
304 ..Default::default()
305 },
306 )
307}
308
309pub fn delete<C>(
322 client: &Client<C>,
323 key: &str,
324 recursive: bool,
325) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
326where
327 C: Clone + Connect,
328{
329 raw_delete(
330 client,
331 key,
332 DeleteOptions {
333 recursive: Some(recursive),
334 ..Default::default()
335 },
336 )
337}
338
339pub fn delete_dir<C>(
350 client: &Client<C>,
351 key: &str,
352) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
353where
354 C: Clone + Connect,
355{
356 raw_delete(
357 client,
358 key,
359 DeleteOptions {
360 dir: Some(true),
361 ..Default::default()
362 },
363 )
364}
365
366pub fn get<C>(
378 client: &Client<C>,
379 key: &str,
380 options: GetOptions,
381) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
382where
383 C: Clone + Connect,
384{
385 raw_get(
386 client,
387 key,
388 InternalGetOptions {
389 recursive: options.recursive,
390 sort: Some(options.sort),
391 strong_consistency: options.strong_consistency,
392 ..Default::default()
393 },
394 )
395}
396
397pub fn set<C>(
412 client: &Client<C>,
413 key: &str,
414 value: &str,
415 ttl: Option<u64>,
416) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
417where
418 C: Clone + Connect,
419{
420 raw_set(
421 client,
422 key,
423 SetOptions {
424 ttl: ttl,
425 value: Some(value),
426 ..Default::default()
427 },
428 )
429}
430
431pub fn set_dir<C>(
445 client: &Client<C>,
446 key: &str,
447 ttl: Option<u64>,
448) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
449where
450 C: Clone + Connect,
451{
452 raw_set(
453 client,
454 key,
455 SetOptions {
456 dir: Some(true),
457 ttl: ttl,
458 ..Default::default()
459 },
460 )
461}
462
463pub fn update<C>(
476 client: &Client<C>,
477 key: &str,
478 value: &str,
479 ttl: Option<u64>,
480) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
481where
482 C: Clone + Connect,
483{
484 raw_set(
485 client,
486 key,
487 SetOptions {
488 prev_exist: Some(true),
489 ttl: ttl,
490 value: Some(value),
491 ..Default::default()
492 },
493 )
494}
495
496pub fn update_dir<C>(
511 client: &Client<C>,
512 key: &str,
513 ttl: Option<u64>,
514) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
515where
516 C: Clone + Connect,
517{
518 raw_set(
519 client,
520 key,
521 SetOptions {
522 dir: Some(true),
523 prev_exist: Some(true),
524 ttl: ttl,
525 ..Default::default()
526 },
527 )
528}
529
530pub fn watch<C>(
548 client: &Client<C>,
549 key: &str,
550 options: WatchOptions,
551) -> Box<dyn Future<Item = Response<KeyValueInfo>, Error = WatchError> + Send>
552where
553 C: Clone + Connect,
554{
555 let work = raw_get(
556 client,
557 key,
558 InternalGetOptions {
559 recursive: options.recursive,
560 wait_index: options.index,
561 wait: true,
562 ..Default::default()
563 },
564 )
565 .map_err(|errors| WatchError::Other(errors));
566
567 if let Some(duration) = options.timeout {
568 Box::new(
569 Timeout::new(work, duration).map_err(|e| match e.into_inner() {
570 Some(we) => we,
571 None => WatchError::Timeout,
572 }),
573 )
574 } else {
575 Box::new(work)
576 }
577}
578
579fn build_url(endpoint: &Uri, path: &str) -> String {
581 format!("{}v2/keys{}", endpoint, path)
582}
583
584fn raw_delete<C>(
586 client: &Client<C>,
587 key: &str,
588 options: DeleteOptions<'_>,
589) -> Box<dyn Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send>
590where
591 C: Clone + Connect,
592{
593 let mut query_pairs = HashMap::new();
594
595 if options.recursive.is_some() {
596 query_pairs.insert("recursive", format!("{}", options.recursive.unwrap()));
597 }
598
599 if options.dir.is_some() {
600 query_pairs.insert("dir", format!("{}", options.dir.unwrap()));
601 }
602
603 if options.conditions.is_some() {
604 let conditions = options.conditions.unwrap();
605
606 if conditions.is_empty() {
607 return Box::new(Err(vec![Error::InvalidConditions]).into_future());
608 }
609
610 if conditions.modified_index.is_some() {
611 query_pairs.insert(
612 "prevIndex",
613 format!("{}", conditions.modified_index.unwrap()),
614 );
615 }
616
617 if conditions.value.is_some() {
618 query_pairs.insert("prevValue", conditions.value.unwrap().to_owned());
619 }
620 }
621
622 let http_client = client.http_client().clone();
623 let key = key.to_string();
624
625 let result = first_ok(client.endpoints().to_vec(), move |endpoint| {
626 let url = Url::parse_with_params(&build_url(endpoint, &key), query_pairs.clone())
627 .map_err(Error::from)
628 .into_future();
629
630 let uri = url.and_then(|url| {
631 Uri::from_str(url.as_str())
632 .map_err(Error::from)
633 .into_future()
634 });
635
636 let http_client = http_client.clone();
637
638 let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from));
639
640 response.and_then(move |response| {
641 let status = response.status();
642 let cluster_info = ClusterInfo::from(response.headers());
643 let body = response.into_body().concat2().map_err(Error::from);
644
645 body.and_then(move |ref body| {
646 if status == StatusCode::OK {
647 match serde_json::from_slice::<KeyValueInfo>(body) {
648 Ok(data) => Ok(Response { data, cluster_info }),
649 Err(error) => Err(Error::Serialization(error)),
650 }
651 } else {
652 match serde_json::from_slice::<ApiError>(body) {
653 Ok(error) => Err(Error::Api(error)),
654 Err(error) => Err(Error::Serialization(error)),
655 }
656 }
657 })
658 })
659 });
660
661 Box::new(result)
662}
663
664fn raw_get<C>(
666 client: &Client<C>,
667 key: &str,
668 options: InternalGetOptions,
669) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
670where
671 C: Clone + Connect,
672{
673 let mut query_pairs = HashMap::new();
674
675 query_pairs.insert("recursive", format!("{}", options.recursive));
676
677 if options.sort.is_some() {
678 query_pairs.insert("sorted", format!("{}", options.sort.unwrap()));
679 }
680
681 if options.wait {
682 query_pairs.insert("wait", "true".to_owned());
683 }
684
685 if options.wait_index.is_some() {
686 query_pairs.insert("waitIndex", format!("{}", options.wait_index.unwrap()));
687 }
688
689 let http_client = client.http_client().clone();
690 let key = key.to_string();
691
692 first_ok(client.endpoints().to_vec(), move |endpoint| {
693 let url = Url::parse_with_params(&build_url(endpoint, &key), query_pairs.clone())
694 .map_err(Error::from)
695 .into_future();
696
697 let uri = url.and_then(|url| {
698 Uri::from_str(url.as_str())
699 .map_err(Error::from)
700 .into_future()
701 });
702
703 let http_client = http_client.clone();
704
705 let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
706
707 response.and_then(|response| {
708 let status = response.status();
709 let cluster_info = ClusterInfo::from(response.headers());
710 let body = response.into_body().concat2().map_err(Error::from);
711
712 body.and_then(move |ref body| {
713 if status == StatusCode::OK {
714 match serde_json::from_slice::<KeyValueInfo>(body) {
715 Ok(data) => Ok(Response { data, cluster_info }),
716 Err(error) => Err(Error::Serialization(error)),
717 }
718 } else {
719 match serde_json::from_slice::<ApiError>(body) {
720 Ok(error) => Err(Error::Api(error)),
721 Err(error) => Err(Error::Serialization(error)),
722 }
723 }
724 })
725 })
726 })
727}
728
729fn raw_set<C>(
731 client: &Client<C>,
732 key: &str,
733 options: SetOptions<'_>,
734) -> Box<dyn Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send>
735where
736 C: Clone + Connect,
737{
738 let mut http_options = vec![];
739
740 if let Some(ref value) = options.value {
741 http_options.push(("value".to_owned(), value.to_string()));
742 }
743
744 if let Some(ref ttl) = options.ttl {
745 http_options.push(("ttl".to_owned(), ttl.to_string()));
746 }
747
748 if let Some(ref dir) = options.dir {
749 http_options.push(("dir".to_owned(), dir.to_string()));
750 }
751
752 if let Some(ref prev_exist) = options.prev_exist {
753 http_options.push(("prevExist".to_owned(), prev_exist.to_string()));
754 }
755
756 if let Some(ref conditions) = options.conditions {
757 if conditions.is_empty() {
758 return Box::new(Err(vec![Error::InvalidConditions]).into_future());
759 }
760
761 if let Some(ref modified_index) = conditions.modified_index {
762 http_options.push(("prevIndex".to_owned(), modified_index.to_string()));
763 }
764
765 if let Some(ref value) = conditions.value {
766 http_options.push(("prevValue".to_owned(), value.to_string()));
767 }
768 }
769
770 let http_client = client.http_client().clone();
771 let key = key.to_string();
772 let create_in_order = options.create_in_order;
773
774 let result = first_ok(client.endpoints().to_vec(), move |endpoint| {
775 let mut serializer = Serializer::new(String::new());
776 serializer.extend_pairs(http_options.clone());
777 let body = serializer.finish();
778
779 let url = build_url(endpoint, &key);
780 let uri = Uri::from_str(url.as_str())
781 .map_err(Error::from)
782 .into_future();
783
784 let http_client = http_client.clone();
785
786 let response = uri.and_then(move |uri| {
787 if create_in_order {
788 http_client.post(uri, body).map_err(Error::from)
789 } else {
790 http_client.put(uri, body).map_err(Error::from)
791 }
792 });
793
794 response.and_then(|response| {
795 let status = response.status();
796 let cluster_info = ClusterInfo::from(response.headers());
797 let body = response.into_body().concat2().map_err(Error::from);
798
799 body.and_then(move |ref body| match status {
800 StatusCode::CREATED | StatusCode::OK => {
801 match serde_json::from_slice::<KeyValueInfo>(body) {
802 Ok(data) => Ok(Response { data, cluster_info }),
803 Err(error) => Err(Error::Serialization(error)),
804 }
805 }
806 _ => match serde_json::from_slice::<ApiError>(body) {
807 Ok(error) => Err(Error::Api(error)),
808 Err(error) => Err(Error::Serialization(error)),
809 },
810 })
811 })
812 });
813
814 Box::new(result)
815}