etcd/
kv.rs

//! etcd's key-value API.
//!
//! The term "node" in the documentation for this module refers to a key-value pair or a directory
//! of key-value pairs. For example, "/foo" is a key if it has a value, but it is a directory if
//! there are other key-value pairs "underneath" it, such as "/foo/bar".
6
7use std::collections::HashMap;
8use std::str::FromStr;
9use std::time::Duration;
10
11use futures::future::{Future, IntoFuture};
12use futures::stream::Stream;
13use hyper::client::connect::Connect;
14use hyper::{StatusCode, Uri};
15use serde_derive::{Deserialize, Serialize};
16use serde_json;
17use tokio::timer::Timeout;
18use url::Url;
19
20pub use crate::error::WatchError;
21
22use crate::client::{Client, ClusterInfo, Response};
23use crate::error::{ApiError, Error};
24use crate::first_ok::first_ok;
25use crate::options::{
26    ComparisonConditions,
27    DeleteOptions,
28    GetOptions as InternalGetOptions,
29    SetOptions,
30};
31use url::form_urlencoded::Serializer;
32
33/// Information about the result of a successful key-value API operation.
34#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
35pub struct KeyValueInfo {
36    /// The action that was taken, e.g. `get`, `set`.
37    pub action: Action,
38    /// The etcd `Node` that was operated upon.
39    pub node: Node,
40    /// The previous state of the target node.
41    #[serde(rename = "prevNode")]
42    pub prev_node: Option<Node>,
43}
44
45/// The type of action that was taken in response to a key value API request.
46///
47/// "Node" refers to the key or directory being acted upon.
48#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
49pub enum Action {
50    /// Atomic deletion of a node based on previous state.
51    #[serde(rename = "compareAndDelete")]
52    CompareAndDelete,
53    /// Atomtic update of a node based on previous state.
54    #[serde(rename = "compareAndSwap")]
55    CompareAndSwap,
56    /// Creation of a node that didn't previously exist.
57    #[serde(rename = "create")]
58    Create,
59    /// Deletion of a node.
60    #[serde(rename = "delete")]
61    Delete,
62    /// Expiration of a node.
63    #[serde(rename = "expire")]
64    Expire,
65    /// Retrieval of a node.
66    #[serde(rename = "get")]
67    Get,
68    /// Assignment of a node, which may have previously existed.
69    #[serde(rename = "set")]
70    Set,
71    /// Update of an existing node.
72    #[serde(rename = "update")]
73    Update,
74}
75
76/// An etcd key or directory.
77#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
78pub struct Node {
79    /// The new value of the etcd creation index.
80    #[serde(rename = "createdIndex")]
81    pub created_index: Option<u64>,
82    /// Whether or not the node is a directory.
83    pub dir: Option<bool>,
84    /// An ISO 8601 timestamp for when the key will expire.
85    pub expiration: Option<String>,
86    /// The name of the key.
87    pub key: Option<String>,
88    /// The new value of the etcd modification index.
89    #[serde(rename = "modifiedIndex")]
90    pub modified_index: Option<u64>,
91    /// Child nodes of a directory.
92    pub nodes: Option<Vec<Node>>,
93    /// The key's time to live in seconds.
94    pub ttl: Option<i64>,
95    /// The value of the key.
96    pub value: Option<String>,
97}
98
/// Caller-facing switches that tune `kv::get`.
///
/// All flags default to `false`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GetOptions {
    /// When set and the node is a directory, its child nodes are returned too.
    pub recursive: bool,
    /// When set and the node is a directory, returned child nodes are sorted alphabetically.
    pub sort: bool,
    /// When set, the etcd node serving the response synchronizes with the quorum before
    /// returning the value.
    ///
    /// Slower, but prevents possibly stale data from being returned.
    pub strong_consistency: bool,
}
113
/// Caller-facing switches that tune `kv::watch`.
///
/// The default has no index, no timeout, and is non-recursive.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WatchOptions {
    /// When given, the watch returns the first change at this index or greater, which makes it
    /// possible to observe changes that already happened.
    pub index: Option<u64>,
    /// Whether child keys are watched as well.
    pub recursive: bool,
    /// When given, the watch gives up if it is still waiting after this duration.
    pub timeout: Option<Duration>,
}
125
126/// Deletes a node only if the given current value and/or current modified index match.
127///
128/// # Parameters
129///
130/// * client: A `Client` to use to make the API call.
131/// * key: The name of the node to delete.
132/// * current_value: If given, the node must currently have this value for the operation to
133/// succeed.
134/// * current_modified_index: If given, the node must currently be at this modified index for the
135/// operation to succeed.
136///
137/// # Errors
138///
139/// Fails if the conditions didn't match or if no conditions were given.
140pub fn compare_and_delete<C>(
141    client: &Client<C>,
142    key: &str,
143    current_value: Option<&str>,
144    current_modified_index: Option<u64>,
145) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
146where
147    C: Clone + Connect,
148{
149    raw_delete(
150        client,
151        key,
152        DeleteOptions {
153            conditions: Some(ComparisonConditions {
154                value: current_value,
155                modified_index: current_modified_index,
156            }),
157            ..Default::default()
158        },
159    )
160}
161
162/// Updates a node only if the given current value and/or current modified index
163/// match.
164///
165/// # Parameters
166///
167/// * client: A `Client` to use to make the API call.
168/// * key: The name of the node to update.
169/// * value: The new value for the node.
170/// * ttl: If given, the node will expire after this many seconds.
171/// * current_value: If given, the node must currently have this value for the operation to
172/// succeed.
173/// * current_modified_index: If given, the node must currently be at this modified index for the
174/// operation to succeed.
175///
176/// # Errors
177///
178/// Fails if the conditions didn't match or if no conditions were given.
179pub fn compare_and_swap<C>(
180    client: &Client<C>,
181    key: &str,
182    value: &str,
183    ttl: Option<u64>,
184    current_value: Option<&str>,
185    current_modified_index: Option<u64>,
186) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
187where
188    C: Clone + Connect,
189{
190    raw_set(
191        client,
192        key,
193        SetOptions {
194            conditions: Some(ComparisonConditions {
195                value: current_value,
196                modified_index: current_modified_index,
197            }),
198            ttl: ttl,
199            value: Some(value),
200            ..Default::default()
201        },
202    )
203}
204
205/// Creates a new key-value pair.
206///
207/// # Parameters
208///
209/// * client: A `Client` to use to make the API call.
210/// * key: The name of the key-value pair to create.
211/// * value: The new value for the node.
212/// * ttl: If given, the node will expire after this many seconds.
213///
214/// # Errors
215///
216/// Fails if the key already exists.
217pub fn create<C>(
218    client: &Client<C>,
219    key: &str,
220    value: &str,
221    ttl: Option<u64>,
222) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
223where
224    C: Clone + Connect,
225{
226    raw_set(
227        client,
228        key,
229        SetOptions {
230            prev_exist: Some(false),
231            ttl: ttl,
232            value: Some(value),
233            ..Default::default()
234        },
235    )
236}
237
238/// Creates a new empty directory.
239///
240/// # Parameters
241///
242/// * client: A `Client` to use to make the API call.
243/// * key: The name of the directory to create.
244/// * ttl: If given, the node will expire after this many seconds.
245///
246/// # Errors
247///
248/// Fails if the key already exists.
249pub fn create_dir<C>(
250    client: &Client<C>,
251    key: &str,
252    ttl: Option<u64>,
253) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
254where
255    C: Clone + Connect,
256{
257    raw_set(
258        client,
259        key,
260        SetOptions {
261            dir: Some(true),
262            prev_exist: Some(false),
263            ttl: ttl,
264            ..Default::default()
265        },
266    )
267}
268
269/// Creates a new key-value pair in a directory with a numeric key name larger than any of its
270/// sibling key-value pairs.
271///
272/// For example, the first value created with this function under the directory "/foo" will have a
273/// key name like "00000000000000000001" automatically generated. The second value created with
274/// this function under the same directory will have a key name like "00000000000000000002".
275///
276/// This behavior is guaranteed by the server.
277///
278/// # Parameters
279///
280/// * client: A `Client` to use to make the API call.
281/// * key: The name of the directory to create a key-value pair in.
282/// * value: The new value for the key-value pair.
283/// * ttl: If given, the node will expire after this many seconds.
284///
285/// # Errors
286///
287/// Fails if the key already exists and is not a directory.
288pub fn create_in_order<C>(
289    client: &Client<C>,
290    key: &str,
291    value: &str,
292    ttl: Option<u64>,
293) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
294where
295    C: Clone + Connect,
296{
297    raw_set(
298        client,
299        key,
300        SetOptions {
301            create_in_order: true,
302            ttl: ttl,
303            value: Some(value),
304            ..Default::default()
305        },
306    )
307}
308
309/// Deletes a node.
310///
311/// # Parameters
312///
313/// * client: A `Client` to use to make the API call.
314/// * key: The name of the node to delete.
315/// * recursive: If true, and the key is a directory, the directory and all child key-value
316/// pairs and directories will be deleted as well.
317///
318/// # Errors
319///
320/// Fails if the key is a directory and `recursive` is `false`.
321pub fn delete<C>(
322    client: &Client<C>,
323    key: &str,
324    recursive: bool,
325) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
326where
327    C: Clone + Connect,
328{
329    raw_delete(
330        client,
331        key,
332        DeleteOptions {
333            recursive: Some(recursive),
334            ..Default::default()
335        },
336    )
337}
338
339/// Deletes an empty directory or a key-value pair at the given key.
340///
341/// # Parameters
342///
343/// * client: A `Client` to use to make the API call.
344/// * key: The name of the node to delete.
345///
346/// # Errors
347///
348/// Fails if the directory is not empty.
349pub fn delete_dir<C>(
350    client: &Client<C>,
351    key: &str,
352) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
353where
354    C: Clone + Connect,
355{
356    raw_delete(
357        client,
358        key,
359        DeleteOptions {
360            dir: Some(true),
361            ..Default::default()
362        },
363    )
364}
365
366/// Gets the value of a node.
367///
368/// # Parameters
369///
370/// * client: A `Client` to use to make the API call.
371/// * key: The name of the node to retrieve.
372/// * options: Options to customize the behavior of the operation.
373///
374/// # Errors
375///
376/// Fails if the key doesn't exist.
377pub fn get<C>(
378    client: &Client<C>,
379    key: &str,
380    options: GetOptions,
381) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
382where
383    C: Clone + Connect,
384{
385    raw_get(
386        client,
387        key,
388        InternalGetOptions {
389            recursive: options.recursive,
390            sort: Some(options.sort),
391            strong_consistency: options.strong_consistency,
392            ..Default::default()
393        },
394    )
395}
396
397/// Sets the value of a key-value pair.
398///
399/// Any previous value and TTL will be replaced.
400///
401/// # Parameters
402///
403/// * client: A `Client` to use to make the API call.
404/// * key: The name of the key-value pair to set.
405/// * value: The new value for the key-value pair.
406/// * ttl: If given, the node will expire after this many seconds.
407///
408/// # Errors
409///
410/// Fails if the node is a directory.
411pub fn set<C>(
412    client: &Client<C>,
413    key: &str,
414    value: &str,
415    ttl: Option<u64>,
416) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
417where
418    C: Clone + Connect,
419{
420    raw_set(
421        client,
422        key,
423        SetOptions {
424            ttl: ttl,
425            value: Some(value),
426            ..Default::default()
427        },
428    )
429}
430
431/// Sets the key to an empty directory.
432///
433/// An existing key-value pair will be replaced, but an existing directory will not.
434///
435/// # Parameters
436///
437/// * client: A `Client` to use to make the API call.
438/// * key: The name of the directory to set.
439/// * ttl: If given, the node will expire after this many seconds.
440///
441/// # Errors
442///
443/// Fails if the node is an existing directory.
444pub fn set_dir<C>(
445    client: &Client<C>,
446    key: &str,
447    ttl: Option<u64>,
448) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
449where
450    C: Clone + Connect,
451{
452    raw_set(
453        client,
454        key,
455        SetOptions {
456            dir: Some(true),
457            ttl: ttl,
458            ..Default::default()
459        },
460    )
461}
462
463/// Updates an existing key-value pair.
464///
465/// # Parameters
466///
467/// * client: A `Client` to use to make the API call.
468/// * key: The name of the key-value pair to update.
469/// * value: The new value for the key-value pair.
470/// * ttl: If given, the node will expire after this many seconds.
471///
472/// # Errors
473///
474/// Fails if the key does not exist.
475pub fn update<C>(
476    client: &Client<C>,
477    key: &str,
478    value: &str,
479    ttl: Option<u64>,
480) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
481where
482    C: Clone + Connect,
483{
484    raw_set(
485        client,
486        key,
487        SetOptions {
488            prev_exist: Some(true),
489            ttl: ttl,
490            value: Some(value),
491            ..Default::default()
492        },
493    )
494}
495
496/// Updates a directory.
497///
498/// If the directory already existed, only the TTL is updated. If the key was a key-value pair, its
499/// value is removed and its TTL is updated.
500///
501/// # Parameters
502///
503/// * client: A `Client` to use to make the API call.
504/// * key: The name of the node to update.
505/// * ttl: If given, the node will expire after this many seconds.
506///
507/// # Errors
508///
509/// Fails if the node does not exist.
510pub fn update_dir<C>(
511    client: &Client<C>,
512    key: &str,
513    ttl: Option<u64>,
514) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
515where
516    C: Clone + Connect,
517{
518    raw_set(
519        client,
520        key,
521        SetOptions {
522            dir: Some(true),
523            prev_exist: Some(true),
524            ttl: ttl,
525            ..Default::default()
526        },
527    )
528}
529
530/// Watches a node for changes and returns the new value as soon as a change takes place.
531///
532/// # Parameters
533///
534/// * client: A `Client` to use to make the API call.
535/// * key: The name of the node to watch.
536/// * options: Options to customize the behavior of the operation.
537///
538/// # Errors
539///
540/// Fails if `options.index` is too old and has been flushed out of etcd's internal store of the
541/// most recent change events. In this case, the key should be queried for its latest
542/// "modified index" value and that should be used as the new `options.index` on a subsequent
543/// `watch`.
544///
545/// Fails if a timeout is specified and the duration lapses without a response from the etcd
546/// cluster.
547pub fn watch<C>(
548    client: &Client<C>,
549    key: &str,
550    options: WatchOptions,
551) -> Box<dyn Future<Item = Response<KeyValueInfo>, Error = WatchError> + Send>
552where
553    C: Clone + Connect,
554{
555    let work = raw_get(
556        client,
557        key,
558        InternalGetOptions {
559            recursive: options.recursive,
560            wait_index: options.index,
561            wait: true,
562            ..Default::default()
563        },
564    )
565    .map_err(|errors| WatchError::Other(errors));
566
567    if let Some(duration) = options.timeout {
568        Box::new(
569            Timeout::new(work, duration).map_err(|e| match e.into_inner() {
570                Some(we) => we,
571                None => WatchError::Timeout,
572            }),
573        )
574    } else {
575        Box::new(work)
576    }
577}
578
579/// Constructs the full URL for an API call.
580fn build_url(endpoint: &Uri, path: &str) -> String {
581    format!("{}v2/keys{}", endpoint, path)
582}
583
584/// Handles all delete operations.
585fn raw_delete<C>(
586    client: &Client<C>,
587    key: &str,
588    options: DeleteOptions<'_>,
589) -> Box<dyn Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send>
590where
591    C: Clone + Connect,
592{
593    let mut query_pairs = HashMap::new();
594
595    if options.recursive.is_some() {
596        query_pairs.insert("recursive", format!("{}", options.recursive.unwrap()));
597    }
598
599    if options.dir.is_some() {
600        query_pairs.insert("dir", format!("{}", options.dir.unwrap()));
601    }
602
603    if options.conditions.is_some() {
604        let conditions = options.conditions.unwrap();
605
606        if conditions.is_empty() {
607            return Box::new(Err(vec![Error::InvalidConditions]).into_future());
608        }
609
610        if conditions.modified_index.is_some() {
611            query_pairs.insert(
612                "prevIndex",
613                format!("{}", conditions.modified_index.unwrap()),
614            );
615        }
616
617        if conditions.value.is_some() {
618            query_pairs.insert("prevValue", conditions.value.unwrap().to_owned());
619        }
620    }
621
622    let http_client = client.http_client().clone();
623    let key = key.to_string();
624
625    let result = first_ok(client.endpoints().to_vec(), move |endpoint| {
626        let url = Url::parse_with_params(&build_url(endpoint, &key), query_pairs.clone())
627            .map_err(Error::from)
628            .into_future();
629
630        let uri = url.and_then(|url| {
631            Uri::from_str(url.as_str())
632                .map_err(Error::from)
633                .into_future()
634        });
635
636        let http_client = http_client.clone();
637
638        let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from));
639
640        response.and_then(move |response| {
641            let status = response.status();
642            let cluster_info = ClusterInfo::from(response.headers());
643            let body = response.into_body().concat2().map_err(Error::from);
644
645            body.and_then(move |ref body| {
646                if status == StatusCode::OK {
647                    match serde_json::from_slice::<KeyValueInfo>(body) {
648                        Ok(data) => Ok(Response { data, cluster_info }),
649                        Err(error) => Err(Error::Serialization(error)),
650                    }
651                } else {
652                    match serde_json::from_slice::<ApiError>(body) {
653                        Ok(error) => Err(Error::Api(error)),
654                        Err(error) => Err(Error::Serialization(error)),
655                    }
656                }
657            })
658        })
659    });
660
661    Box::new(result)
662}
663
664/// Handles all get operations.
665fn raw_get<C>(
666    client: &Client<C>,
667    key: &str,
668    options: InternalGetOptions,
669) -> impl Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send
670where
671    C: Clone + Connect,
672{
673    let mut query_pairs = HashMap::new();
674
675    query_pairs.insert("recursive", format!("{}", options.recursive));
676
677    if options.sort.is_some() {
678        query_pairs.insert("sorted", format!("{}", options.sort.unwrap()));
679    }
680
681    if options.wait {
682        query_pairs.insert("wait", "true".to_owned());
683    }
684
685    if options.wait_index.is_some() {
686        query_pairs.insert("waitIndex", format!("{}", options.wait_index.unwrap()));
687    }
688
689    let http_client = client.http_client().clone();
690    let key = key.to_string();
691
692    first_ok(client.endpoints().to_vec(), move |endpoint| {
693        let url = Url::parse_with_params(&build_url(endpoint, &key), query_pairs.clone())
694            .map_err(Error::from)
695            .into_future();
696
697        let uri = url.and_then(|url| {
698            Uri::from_str(url.as_str())
699                .map_err(Error::from)
700                .into_future()
701        });
702
703        let http_client = http_client.clone();
704
705        let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from));
706
707        response.and_then(|response| {
708            let status = response.status();
709            let cluster_info = ClusterInfo::from(response.headers());
710            let body = response.into_body().concat2().map_err(Error::from);
711
712            body.and_then(move |ref body| {
713                if status == StatusCode::OK {
714                    match serde_json::from_slice::<KeyValueInfo>(body) {
715                        Ok(data) => Ok(Response { data, cluster_info }),
716                        Err(error) => Err(Error::Serialization(error)),
717                    }
718                } else {
719                    match serde_json::from_slice::<ApiError>(body) {
720                        Ok(error) => Err(Error::Api(error)),
721                        Err(error) => Err(Error::Serialization(error)),
722                    }
723                }
724            })
725        })
726    })
727}
728
729/// Handles all set operations.
730fn raw_set<C>(
731    client: &Client<C>,
732    key: &str,
733    options: SetOptions<'_>,
734) -> Box<dyn Future<Item = Response<KeyValueInfo>, Error = Vec<Error>> + Send>
735where
736    C: Clone + Connect,
737{
738    let mut http_options = vec![];
739
740    if let Some(ref value) = options.value {
741        http_options.push(("value".to_owned(), value.to_string()));
742    }
743
744    if let Some(ref ttl) = options.ttl {
745        http_options.push(("ttl".to_owned(), ttl.to_string()));
746    }
747
748    if let Some(ref dir) = options.dir {
749        http_options.push(("dir".to_owned(), dir.to_string()));
750    }
751
752    if let Some(ref prev_exist) = options.prev_exist {
753        http_options.push(("prevExist".to_owned(), prev_exist.to_string()));
754    }
755
756    if let Some(ref conditions) = options.conditions {
757        if conditions.is_empty() {
758            return Box::new(Err(vec![Error::InvalidConditions]).into_future());
759        }
760
761        if let Some(ref modified_index) = conditions.modified_index {
762            http_options.push(("prevIndex".to_owned(), modified_index.to_string()));
763        }
764
765        if let Some(ref value) = conditions.value {
766            http_options.push(("prevValue".to_owned(), value.to_string()));
767        }
768    }
769
770    let http_client = client.http_client().clone();
771    let key = key.to_string();
772    let create_in_order = options.create_in_order;
773
774    let result = first_ok(client.endpoints().to_vec(), move |endpoint| {
775        let mut serializer = Serializer::new(String::new());
776        serializer.extend_pairs(http_options.clone());
777        let body = serializer.finish();
778
779        let url = build_url(endpoint, &key);
780        let uri = Uri::from_str(url.as_str())
781            .map_err(Error::from)
782            .into_future();
783
784        let http_client = http_client.clone();
785
786        let response = uri.and_then(move |uri| {
787            if create_in_order {
788                http_client.post(uri, body).map_err(Error::from)
789            } else {
790                http_client.put(uri, body).map_err(Error::from)
791            }
792        });
793
794        response.and_then(|response| {
795            let status = response.status();
796            let cluster_info = ClusterInfo::from(response.headers());
797            let body = response.into_body().concat2().map_err(Error::from);
798
799            body.and_then(move |ref body| match status {
800                StatusCode::CREATED | StatusCode::OK => {
801                    match serde_json::from_slice::<KeyValueInfo>(body) {
802                        Ok(data) => Ok(Response { data, cluster_info }),
803                        Err(error) => Err(Error::Serialization(error)),
804                    }
805                }
806                _ => match serde_json::from_slice::<ApiError>(body) {
807                    Ok(error) => Err(Error::Api(error)),
808                    Err(error) => Err(Error::Serialization(error)),
809                },
810            })
811        })
812    });
813
814    Box::new(result)
815}