1//! This crate provides a client for interacting with [Apache
2//! ZooKeeper](https://zookeeper.apache.org/), a highly reliable distributed service for
3//! maintaining configuration information, naming, providing distributed synchronization, and
4//! providing group services.
5//!
6//! # About ZooKeeper
7//!
8//! The [ZooKeeper Overview](https://zookeeper.apache.org/doc/current/zookeeperOver.html) provides
9//! a thorough introduction to ZooKeeper, but we'll repeat the most important points here. At its
10//! [heart](https://zookeeper.apache.org/doc/current/zookeeperOver.html#sc_designGoals), ZooKeeper
11//! is a [hierarchical key-value
12//! store](https://zookeeper.apache.org/doc/current/zookeeperOver.html#sc_dataModelNameSpace) (that
//! is, keys can have "sub-keys"), with additional mechanisms that guarantee consistent operation
14//! across client and server failures. Keys in ZooKeeper look like paths (e.g., `/key/subkey`), and
15//! every item along a path is called a
16//! "[Znode](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#sc_zkDataModel_znodes)".
17//! Each Znode (including those with children) can also have associated data, which can be queried
18//! and updated like in other key-value stores. Along with its data and children, each Znode stores
19//! meta-information such as [access-control
20//! lists](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#sc_ZooKeeperAccessControl),
21//! [modification
22//! timestamps](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#sc_timeInZk),
23//! and a version number
24//! that allows clients to avoid stepping on each other's toes when accessing values (more on that
25//! later).
26//!
27//! ## Operations
28//!
29//! ZooKeeper's API consists of the same basic operations you would expect to find in a
30//! file-system: [`create`](struct.ZooKeeper.html#method.create) for creating new Znodes,
31//! [`delete`](struct.ZooKeeper.html#method.delete) for removing them,
32//! [`exists`](struct.ZooKeeper.html#method.exists) for checking if a node exists,
33//! [`get_data`](struct.ZooKeeper.html#method.get_data) and
34//! [`set_data`](struct.ZooKeeper.html#method.set_data) for getting and setting a node's associated
35//! data respectively, and [`get_children`](struct.ZooKeeper.html#method.get_children) for
36//! retrieving the children of a given node (i.e., its subkeys). For all of these operations,
37//! ZooKeeper gives [strong
38//! guarantees](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#ch_zkGuarantees)
39//! about what happens when there are multiple clients interacting with the system, or even what
40//! happens in response to system and network failures.
41//!
42//! ## Ephemeral nodes
43//!
44//! When you create a Znode, you also specify a [`CreateMode`]. Nodes that are created with
45//! [`CreateMode::Persistent`] are the nodes we have discussed thus far. They remain in the server
//! until you delete them. Nodes that are created with [`CreateMode::Ephemeral`], on the other hand,
47//! are special. These [ephemeral
48//! nodes](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#Ephemeral+Nodes) are
49//! automatically deleted by the server when the client that created them disconnects. This can be
50//! handy for implementing lease-like mechanisms, and for detecting faults. Since they are
51//! automatically deleted, and nodes with children cannot be deleted directly, ephemeral nodes are
52//! not allowed to have children.
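//!
//! As a minimal sketch of the lease-like pattern described above, a client could announce its
//! presence with an ephemeral node. The path `/workers-demo` and its payload are made up for
//! illustration, and a server is assumed to be listening on `127.0.0.1:2181`:
//!
//! ```no_run
//! use tokio_zookeeper::*;
//!
//! # #[tokio::main(flavor = "current_thread")]
//! # async fn main() {
//! let (zk, _default_watcher) = ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
//!     .await
//!     .unwrap();
//!
//! // hypothetical path: the node lives only as long as this client's session
//! let created = zk
//!     .create(
//!         "/workers-demo",
//!         &b"alive"[..],
//!         Acl::open_unsafe(),
//!         CreateMode::Ephemeral,
//!     )
//!     .await
//!     .unwrap();
//! println!("registered: {:?}", created);
//!
//! // other clients can check for the node to see whether this client is still connected
//! let stat = zk.exists("/workers-demo").await.unwrap();
//! println!("present: {}", stat.is_some());
//! # }
//! ```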
53//!
54//! ## Watches
55//!
56//! In addition to the methods above, [`ZooKeeper::exists`], [`ZooKeeper::get_data`], and
57//! [`ZooKeeper::get_children`] also support setting
58//! "[watches](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#ch_zkWatches)" on
//! a node. A watch is a one-time trigger that causes a [`WatchedEvent`] to be sent to the client
//! that set the watch when the state for which the watch was set changes. For example, for a
//! watched `get_data`, a one-time notification will be sent the first time the data of the target
//! node changes after the response to the original `get_data` call has been processed. You
63//! should see the ["Watches" entry in the Programmer's
64//! Guide](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#ch_zkWatches) for
65//! details.
66//!
67//! ## Getting started
68//!
69//! To get ZooKeeper up and running, follow the official [Getting Started
70//! Guide](https://zookeeper.apache.org/doc/current/zookeeperStarted.html). In most Linux
71//! environments, the procedure for getting a basic setup working is usually just to install the
72//! `zookeeper` package and then run `systemctl start zookeeper`. ZooKeeper will then be running at
73//! `127.0.0.1:2181`.
74//!
75//! # This implementation
76//!
77//! This library is analogous to the asynchronous API offered by the [official Java
78//! implementation](https://zookeeper.apache.org/doc/current/api/org/apache/zookeeper/ZooKeeper.html),
79//! and for most operations the Java documentation should apply to the Rust implementation. If this
80//! is not the case, it is considered [a bug](https://github.com/jonhoo/tokio-zookeeper/issues),
81//! and we'd love a bug report with as much relevant information as you can offer.
82//!
83//! Note that since this implementation is asynchronous, users of the client must take care to
84//! not re-order operations in their own code. There is some discussion of this in the [official
85//! documentation of the Java
86//! bindings](https://zookeeper.apache.org/doc/r3.4.12/zookeeperProgrammers.html#Java+Binding).
87//!
88//! For more information on ZooKeeper, see the [ZooKeeper Programmer's
89//! Guide](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html) and the [Confluence
90//! ZooKeeper wiki](https://cwiki.apache.org/confluence/display/ZOOKEEPER/Index). There is also a
91//! basic tutorial (that uses the Java client)
92//! [here](https://zookeeper.apache.org/doc/current/zookeeperTutorial.html).
93//!
94//! ## Interaction with Tokio
95//!
//! The futures in this crate expect to be running under a Tokio runtime. In the common case,
//! you should `.await` them from within `#[tokio::main]`, or explicitly create a
//! `tokio::runtime::Runtime` and then use `Runtime::block_on`.
99//!
100//! # A somewhat silly example
101//!
102//! ```no_run
103//! use tokio_zookeeper::*;
104//! use futures::prelude::*;
105//!
106//! # #[tokio::main(flavor = "current_thread")]
107//! # async fn main() {
108//! let (zk, default_watcher) = ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
109//!     .await
110//!     .unwrap();
111//!
112//! // let's first check if /example exists. the .watch() causes us to be notified
113//! // the next time the "exists" status of /example changes after the call.
114//! let stat = zk.watch().exists("/example").await.unwrap();
115//! // initially, /example does not exist
116//! assert_eq!(stat, None);
117//! // so let's make it!
118//! let path = zk
119//!     .create(
120//!         "/example",
121//!         &b"Hello world"[..],
122//!         Acl::open_unsafe(),
123//!         CreateMode::Persistent,
124//!     )
125//!     .await
126//!     .unwrap();
127//! assert_eq!(path.as_deref(), Ok("/example"));
128//!
129//! // does it exist now?
130//! let stat = zk.watch().exists("/example").await.unwrap();
131//! // looks like it!
132//! // note that the creation above also triggered our "exists" watch!
133//! assert_eq!(stat.unwrap().data_length as usize, b"Hello world".len());
134//!
135//! // did the data get set correctly?
136//! let res = zk.get_data("/example").await.unwrap();
137//! let data = b"Hello world";
138//! let res = res.unwrap();
139//! assert_eq!(res.0, data);
140//! assert_eq!(res.1.data_length as usize, data.len());
141//!
142//! // let's update the data.
143//! let stat = zk
144//!     .set_data("/example", Some(res.1.version), &b"Bye world"[..])
145//!     .await
146//!     .unwrap();
147//! assert_eq!(stat.unwrap().data_length as usize, "Bye world".len());
148//!
149//! // create a child of /example
150//! let path = zk
151//!     .create(
152//!         "/example/more",
153//!         &b"Hello more"[..],
154//!         Acl::open_unsafe(),
155//!         CreateMode::Persistent,
156//!     )
157//!     .await
158//!     .unwrap();
159//! assert_eq!(path.as_deref(), Ok("/example/more"));
160//!
161//! // it should be visible as a child of /example
162//! let children = zk.get_children("/example").await.unwrap();
163//! assert_eq!(children, Some(vec!["more".to_string()]));
164//!
165//! // it is not legal to delete a node that has children directly
166//! let res = zk.delete("/example", None).await.unwrap();
167//! assert_eq!(res, Err(error::Delete::NotEmpty));
168//! // instead we must delete the children first
169//! let res = zk.delete("/example/more", None).await.unwrap();
170//! assert_eq!(res, Ok(()));
171//! let res = zk.delete("/example", None).await.unwrap();
172//! assert_eq!(res, Ok(()));
//! // now /example should no longer exist!
174//! let stat = zk.exists("/example").await.unwrap();
175//! assert_eq!(stat, None);
176//!
177//! // now let's check that the .watch().exists we did in the very
178//! // beginning actually triggered!
179//! let (event, _w) = default_watcher.into_future().await;
180//! assert_eq!(
181//!     event,
182//!     Some(WatchedEvent {
183//!         event_type: WatchedEventType::NodeCreated,
184//!         keeper_state: KeeperState::SyncConnected,
185//!         path: String::from("/example"),
186//!     })
187//! );
188//! # }
189//! ```
190
191#![deny(missing_docs)]
192#![deny(missing_debug_implementations)]
193#![deny(missing_copy_implementations)]
194
195use error::Error;
196use futures::{channel::oneshot, Stream};
197use snafu::{whatever as bail, ResultExt};
198use std::borrow::Cow;
199use std::net::SocketAddr;
200use std::time;
201use tracing::{debug, instrument, trace};
202
203/// Per-operation ZooKeeper error types.
204pub mod error;
205mod proto;
206mod transform;
207mod types;
208
209use crate::proto::{Watch, ZkError};
210pub use crate::types::{
211    Acl, CreateMode, KeeperState, MultiResponse, Permission, Stat, WatchedEvent, WatchedEventType,
212};
213
214macro_rules! format_err {
215    ($($x:tt)*) => {
216        <crate::error::Error as snafu::FromString>::without_source(format!($($x)*))
217    };
218}
219pub(crate) use format_err;
220
221/// A connection to ZooKeeper.
222///
223/// All interactions with ZooKeeper are performed by calling the methods of a `ZooKeeper` instance.
224/// All clones of the same `ZooKeeper` instance use the same underlying connection. Once a
225/// connection to a server is established, a session ID is assigned to the client. The client will
/// send heartbeats to the server periodically to keep the session valid.
///
/// The application can call ZooKeeper APIs through a client as long as the session ID of the
/// client remains valid. If for some reason, the client fails to send heartbeats to the server
230/// for a prolonged period of time (exceeding the session timeout value, for instance), the server
231/// will expire the session, and the session ID will become invalid. The `ZooKeeper` instance will
232/// then no longer be usable, and all futures will resolve with a protocol-level error. To make
233/// further ZooKeeper API calls, the application must create a new `ZooKeeper` instance.
234///
235/// If the ZooKeeper server the client currently connects to fails or otherwise does not respond,
236/// the client will automatically try to connect to another server before its session ID expires.
237/// If successful, the application can continue to use the client.
238///
239/// Some successful ZooKeeper API calls can leave watches on the "data nodes" in the ZooKeeper
240/// server. Other successful ZooKeeper API calls can trigger those watches. Once a watch is
/// triggered, an event will be delivered to the client that left the watch in the first place.
242/// Each watch can be triggered only once. Thus, up to one event will be delivered to a client for
243/// every watch it leaves.
244#[derive(Debug, Clone)]
245pub struct ZooKeeper {
246    #[allow(dead_code)]
247    connection: proto::Enqueuer,
248}
249
250/// Builder that allows customizing options for ZooKeeper connections.
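///
/// A minimal sketch of connecting with a custom session timeout. The 10-second value is an
/// arbitrary choice for illustration, and a server is assumed at `127.0.0.1:2181`:
///
/// ```no_run
/// use std::time::Duration;
/// use tokio_zookeeper::ZooKeeperBuilder;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let mut builder = ZooKeeperBuilder::default();
/// // request a 10 second session timeout instead of the server-dictated default
/// builder.set_timeout(Duration::from_secs(10));
/// let (zk, _default_watcher) = builder
///     .connect(&"127.0.0.1:2181".parse().unwrap())
///     .await
///     .unwrap();
/// # drop(zk);
/// # }
/// ```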
251#[derive(Debug, Copy, Clone)]
252pub struct ZooKeeperBuilder {
253    session_timeout: time::Duration,
254}
255
256impl Default for ZooKeeperBuilder {
257    fn default() -> Self {
258        ZooKeeperBuilder {
259            session_timeout: time::Duration::new(0, 0),
260        }
261    }
262}
263
264impl ZooKeeperBuilder {
265    /// Connect to a ZooKeeper server instance at the given address.
266    ///
267    /// A `ZooKeeper` instance is returned, along with a "watcher" that will provide notifications
268    /// of any changes in state.
269    ///
270    /// If the connection to the server fails, the client will automatically try to re-connect.
271    /// Only if re-connection fails is an error returned to the client. Requests that are in-flight
272    /// during a disconnect may fail and have to be retried.
273    pub async fn connect(
274        self,
275        addr: &SocketAddr,
276    ) -> Result<(ZooKeeper, impl Stream<Item = WatchedEvent>), Error> {
277        let (tx, rx) = futures::channel::mpsc::unbounded();
278        let stream = tokio::net::TcpStream::connect(addr)
279            .await
280            .whatever_context("connect failed")?;
281        Ok((self.handshake(*addr, stream, tx).await?, rx))
282    }
283
284    /// Set the ZooKeeper [session expiry
285    /// timeout](https://zookeeper.apache.org/doc/r3.4.12/zookeeperProgrammers.html#ch_zkSessions).
286    ///
287    /// The default timeout is dictated by the server.
288    pub fn set_timeout(&mut self, t: time::Duration) {
289        self.session_timeout = t;
290    }
291
292    async fn handshake(
293        self,
294        addr: SocketAddr,
295        stream: tokio::net::TcpStream,
296        default_watcher: futures::channel::mpsc::UnboundedSender<WatchedEvent>,
297    ) -> Result<ZooKeeper, Error> {
298        let request = proto::Request::Connect {
299            protocol_version: 0,
300            last_zxid_seen: 0,
            // Clamp to `i32::MAX` milliseconds so that very long timeouts do not wrap around.
            timeout: self.session_timeout.as_millis().min(i32::MAX as u128) as i32,
303            session_id: 0,
304            passwd: vec![],
305            read_only: false,
306        };
307        debug!("about to perform handshake");
308
309        let enqueuer = proto::Packetizer::new(addr, stream, default_watcher);
310        enqueuer.enqueue(request).await.map(move |response| {
311            trace!(?response, "Got response");
312            ZooKeeper {
313                connection: enqueuer,
314            }
315        })
316    }
317}
318
319impl ZooKeeper {
320    /// Connect to a ZooKeeper server instance at the given address with default parameters.
321    ///
322    /// See [`ZooKeeperBuilder::connect`].
323    pub async fn connect(
324        addr: &SocketAddr,
325    ) -> Result<(Self, impl Stream<Item = WatchedEvent>), Error> {
326        ZooKeeperBuilder::default().connect(addr).await
327    }
328
329    /// Create a node with the given `path` with `data` as its contents.
330    ///
331    /// The `mode` argument specifies additional options for the newly created node.
332    ///
333    /// If `mode` is set to [`CreateMode::Ephemeral`] (or [`CreateMode::EphemeralSequential`]), the
    /// node will be removed by ZooKeeper automatically when the session associated with the
335    /// creation of the node expires.
336    ///
337    /// If `mode` is set to [`CreateMode::PersistentSequential`] or
338    /// [`CreateMode::EphemeralSequential`], the actual path name of a sequential node will be the
339    /// given `path` plus a suffix `i` where `i` is the current sequential number of the node. The
    /// sequence number is always 10 digits long, zero-padded. Once such a node is created,
341    /// the sequential number will be incremented by one. The newly created node's full name is
342    /// returned when the future is resolved.
343    ///
    /// If a node with the same actual path already exists in ZooKeeper, the returned future
345    /// resolves with an error of [`error::Create::NodeExists`]. Note that since a different actual
346    /// path is used for each invocation of creating sequential nodes with the same `path`
347    /// argument, calls with sequential modes will never return `NodeExists`.
348    ///
349    /// Ephemeral nodes cannot have children in ZooKeeper. Therefore, if the parent node of the
    /// given `path` is ephemeral, the returned future resolves to
351    /// [`error::Create::NoChildrenForEphemerals`].
352    ///
353    /// If a node is created successfully, the ZooKeeper server will trigger the watches on the
354    /// `path` left by `exists` calls, and the watches on the parent of the node by `get_children`
355    /// calls.
356    ///
357    /// The maximum allowable size of the data array is 1 MB (1,048,576 bytes).
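    // Declare `dlen` up front; `Span::record` ignores fields the span was not created with.
    #[instrument(skip(data, acl), fields(dlen = tracing::field::Empty))]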
359    pub async fn create<D, A>(
360        &self,
361        path: &str,
362        data: D,
363        acl: A,
364        mode: CreateMode,
365    ) -> Result<Result<String, error::Create>, Error>
366    where
367        D: Into<Cow<'static, [u8]>>,
368        A: Into<Cow<'static, [Acl]>>,
369    {
370        let data = data.into();
371        tracing::Span::current().record("dlen", data.len());
372        self.connection
373            .enqueue(proto::Request::Create {
374                path: path.to_string(),
375                data,
376                acl: acl.into(),
377                mode,
378            })
379            .await
380            .and_then(transform::create)
381    }
382
383    /// Set the data for the node at the given `path`.
384    ///
385    /// The call will succeed if such a node exists, and the given `version` matches the version of
386    /// the node (if the given `version` is `None`, it matches any version). On success, the
387    /// updated [`Stat`] of the node is returned.
388    ///
389    /// This operation, if successful, will trigger all the watches on the node of the given `path`
390    /// left by `get_data` calls.
391    ///
392    /// The maximum allowable size of the data array is 1 MB (1,048,576 bytes).
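    ///
    /// A minimal sketch of a versioned read-modify-write, which makes the update fail rather than
    /// overwrite a concurrent change. The path `/counter-demo` is made up for illustration and is
    /// assumed to already exist; a server is assumed at `127.0.0.1:2181`:
    ///
    /// ```no_run
    /// use tokio_zookeeper::*;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let (zk, _default_watcher) = ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
    ///     .await
    ///     .unwrap();
    ///
    /// if let Some((mut data, stat)) = zk.get_data("/counter-demo").await.unwrap() {
    ///     data.extend_from_slice(b"!");
    ///     // passing the version we read means the write only applies if nobody else wrote first
    ///     match zk.set_data("/counter-demo", Some(stat.version), data).await.unwrap() {
    ///         Ok(new_stat) => println!("updated to version {}", new_stat.version),
    ///         Err(e) => println!("update rejected: {:?}", e),
    ///     }
    /// }
    /// # }
    /// ```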
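    // Declare `dlen` up front; `Span::record` ignores fields the span was not created with.
    #[instrument(skip(data), fields(dlen = tracing::field::Empty))]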
394    pub async fn set_data<D>(
395        &self,
396        path: &str,
397        version: Option<i32>,
398        data: D,
399    ) -> Result<Result<Stat, error::SetData>, Error>
400    where
401        D: Into<Cow<'static, [u8]>>,
402    {
403        let data = data.into();
404        tracing::Span::current().record("dlen", data.len());
405        let version = version.unwrap_or(-1);
406        self.connection
407            .enqueue(proto::Request::SetData {
408                path: path.to_string(),
409                version,
410                data,
411            })
412            .await
413            .and_then(move |r| transform::set_data(version, r))
414    }
415
416    /// Delete the node at the given `path`.
417    ///
418    /// The call will succeed if such a node exists, and the given `version` matches the node's
    /// version (if the given `version` is `None`, it matches any version).
420    ///
421    /// This operation, if successful, will trigger all the watches on the node of the given `path`
422    /// left by `exists` API calls, and the watches on the parent node left by `get_children` API
423    /// calls.
424    #[instrument]
425    pub async fn delete(
426        &self,
427        path: &str,
428        version: Option<i32>,
429    ) -> Result<Result<(), error::Delete>, Error> {
430        let version = version.unwrap_or(-1);
431        self.connection
432            .enqueue(proto::Request::Delete {
433                path: path.to_string(),
434                version,
435            })
436            .await
437            .and_then(move |r| transform::delete(version, r))
438    }
439
440    /// Return the [ACL](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#sc_ZooKeeperAccessControl)
441    /// and Stat of the node at the given `path`.
442    ///
443    /// If no node exists for the given path, the returned future resolves with an error of
444    /// [`error::GetAcl::NoNode`].
445    #[instrument]
446    pub async fn get_acl(
447        &self,
448        path: &str,
449    ) -> Result<Result<(Vec<Acl>, Stat), error::GetAcl>, Error> {
450        self.connection
451            .enqueue(proto::Request::GetAcl {
452                path: path.to_string(),
453            })
454            .await
455            .and_then(transform::get_acl)
456    }
457
458    /// Set the [ACL](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#sc_ZooKeeperAccessControl)
459    /// for the node of the given `path`.
460    ///
461    /// The call will succeed if such a node exists and the given `version` matches the ACL version
462    /// of the node. On success, the updated [`Stat`] of the node is returned.
463    ///
464    /// If no node exists for the given path, the returned future resolves with an error of
465    /// [`error::SetAcl::NoNode`]. If the given `version` does not match the ACL version, the
466    /// returned future resolves with an error of [`error::SetAcl::BadVersion`].
467    #[instrument(skip(acl))]
468    pub async fn set_acl<A>(
469        &self,
470        path: &str,
471        acl: A,
472        version: Option<i32>,
473    ) -> Result<Result<Stat, error::SetAcl>, Error>
474    where
475        A: Into<Cow<'static, [Acl]>>,
476    {
477        let version = version.unwrap_or(-1);
478        self.connection
479            .enqueue(proto::Request::SetAcl {
480                path: path.to_string(),
481                acl: acl.into(),
482                version,
483            })
484            .await
485            .and_then(move |r| transform::set_acl(version, r))
486    }
487}
488
489impl ZooKeeper {
490    /// Add a global watch for the next chained operation.
491    pub fn watch(&self) -> WatchGlobally {
492        WatchGlobally(self)
493    }
494
495    /// Add a watch for the next chained operation, and return a future for any received event
496    /// along with the operation's (successful) result.
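    ///
    /// A minimal sketch of waiting for a watch to fire; the path `/ready-demo` is made up for
    /// illustration, and a server is assumed at `127.0.0.1:2181`:
    ///
    /// ```no_run
    /// use tokio_zookeeper::*;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let (zk, _default_watcher) = ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
    ///     .await
    ///     .unwrap();
    ///
    /// // the receiver resolves once the watch set by this call triggers
    /// let (event_rx, stat) = zk.with_watcher().exists("/ready-demo").await.unwrap();
    /// if stat.is_none() {
    ///     // waits until the watch triggers (e.g. the node is created) or the sender is dropped
    ///     match event_rx.await {
    ///         Ok(event) => println!("watch triggered: {:?}", event),
    ///         Err(_) => println!("watch was cancelled before it triggered"),
    ///     }
    /// }
    /// # }
    /// ```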
497    pub fn with_watcher(&self) -> WithWatcher {
498        WithWatcher(self)
499    }
500
501    #[instrument(name = "exists")]
502    async fn exists_w(&self, path: &str, watch: Watch) -> Result<Option<Stat>, Error> {
503        self.connection
504            .enqueue(proto::Request::Exists {
505                path: path.to_string(),
506                watch,
507            })
508            .await
509            .and_then(transform::exists)
510    }
511
512    /// Return the [`Stat`] of the node of the given `path`, or `None` if the node does not exist.
513    pub async fn exists(&self, path: &str) -> Result<Option<Stat>, Error> {
514        self.exists_w(path, Watch::None).await
515    }
516
517    #[instrument]
518    async fn get_children_w(&self, path: &str, watch: Watch) -> Result<Option<Vec<String>>, Error> {
519        self.connection
520            .enqueue(proto::Request::GetChildren {
521                path: path.to_string(),
522                watch,
523            })
524            .await
525            .and_then(transform::get_children)
526    }
527
528    /// Return the names of the children of the node at the given `path`, or `None` if the node
529    /// does not exist.
530    ///
531    /// The returned list of children is not sorted and no guarantee is provided as to its natural
532    /// or lexical order.
533    pub async fn get_children(&self, path: &str) -> Result<Option<Vec<String>>, Error> {
534        self.get_children_w(path, Watch::None).await
535    }
536
537    #[instrument]
538    async fn get_data_w(&self, path: &str, watch: Watch) -> Result<Option<(Vec<u8>, Stat)>, Error> {
539        self.connection
540            .enqueue(proto::Request::GetData {
541                path: path.to_string(),
542                watch,
543            })
544            .await
545            .and_then(transform::get_data)
546    }
547
548    /// Return the data and the [`Stat`] of the node at the given `path`, or `None` if it does not
549    /// exist.
550    pub async fn get_data(&self, path: &str) -> Result<Option<(Vec<u8>, Stat)>, Error> {
551        self.get_data_w(path, Watch::None).await
552    }
553
554    /// Start building a multi request. Multi requests batch several operations
555    /// into one atomic unit.
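    ///
    /// A minimal sketch of an atomic batch. The paths are made up for illustration, `/app-demo`
    /// is assumed to already exist, and a server is assumed at `127.0.0.1:2181`:
    ///
    /// ```no_run
    /// use tokio_zookeeper::*;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let (zk, _default_watcher) = ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
    ///     .await
    ///     .unwrap();
    ///
    /// if let Some(stat) = zk.exists("/app-demo").await.unwrap() {
    ///     let results = zk
    ///         .multi()
    ///         // only proceed if /app-demo is still at the version we just observed
    ///         .check("/app-demo", stat.version)
    ///         .create(
    ///             "/app-demo/config",
    ///             &b"v1"[..],
    ///             Acl::open_unsafe(),
    ///             CreateMode::Persistent,
    ///         )
    ///         .set_data("/app-demo", Some(stat.version), &b"configured"[..])
    ///         .run()
    ///         .await
    ///         .unwrap();
    ///     // results line up with the attached operations, in the order they were attached
    ///     for res in results {
    ///         println!("{:?}", res);
    ///     }
    /// }
    /// # }
    /// ```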
556    pub fn multi(&self) -> MultiBuilder {
557        MultiBuilder {
558            zk: self,
559            requests: Vec::new(),
560        }
561    }
562}
563
564/// Proxy for [`ZooKeeper`] that adds watches for initiated operations.
565///
566/// Triggered watches produce events on the global watcher stream.
567#[derive(Debug, Clone)]
568pub struct WatchGlobally<'a>(&'a ZooKeeper);
569
570impl<'a> WatchGlobally<'a> {
571    /// Return the [`Stat`] of the node of the given `path`, or `None` if the node does not exist.
572    ///
573    /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
574    /// by any successful operation that creates or deletes the node, or sets the node's data. When
575    /// the watch triggers, an event is sent to the global watcher stream.
576    pub async fn exists(&self, path: &str) -> Result<Option<Stat>, Error> {
577        self.0.exists_w(path, Watch::Global).await
578    }
579
580    /// Return the names of the children of the node at the given `path`, or `None` if the node
581    /// does not exist.
582    ///
583    /// The returned list of children is not sorted and no guarantee is provided as to its natural
584    /// or lexical order.
585    ///
586    /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
587    /// by any successful operation that deletes the node at the given `path`, or creates or
588    /// deletes a child of that node. When the watch triggers, an event is sent to the global
589    /// watcher stream.
590    pub async fn get_children(&self, path: &str) -> Result<Option<Vec<String>>, Error> {
591        self.0.get_children_w(path, Watch::Global).await
592    }
593
594    /// Return the data and the [`Stat`] of the node at the given `path`, or `None` if it does not
595    /// exist.
596    ///
597    /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
598    /// by any successful operation that sets the node's data, or deletes it. When the watch
599    /// triggers, an event is sent to the global watcher stream.
600    pub async fn get_data(&self, path: &str) -> Result<Option<(Vec<u8>, Stat)>, Error> {
601        self.0.get_data_w(path, Watch::Global).await
602    }
603}
604
605/// Proxy for [`ZooKeeper`] that adds non-global watches for initiated operations.
606///
607/// Events from triggered watches are yielded through returned `oneshot` channels. All events are
608/// also produced on the global watcher stream.
609#[derive(Debug, Clone)]
610pub struct WithWatcher<'a>(&'a ZooKeeper);
611
612impl<'a> WithWatcher<'a> {
613    /// Return the [`Stat`] of the node of the given `path`, or `None` if the node does not exist.
614    ///
615    /// If no errors occur, a watch will be left on the node at the given `path`. The watch is
616    /// triggered by any successful operation that creates or deletes the node, or sets the data on
617    /// the node, and in turn causes the included `oneshot::Receiver` to resolve.
618    pub async fn exists(
619        &self,
620        path: &str,
621    ) -> Result<(oneshot::Receiver<WatchedEvent>, Option<Stat>), Error> {
622        let (tx, rx) = oneshot::channel();
623        self.0
624            .exists_w(path, Watch::Custom(tx))
625            .await
626            .map(|r| (rx, r))
627    }
628
629    /// Return the names of the children of the node at the given `path`, or `None` if the node
630    /// does not exist.
631    ///
632    /// The returned list of children is not sorted and no guarantee is provided as to its natural
633    /// or lexical order.
634    ///
635    /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
636    /// by any successful operation that deletes the node at the given `path`, or creates or
637    /// deletes a child of that node, and in turn causes the included `oneshot::Receiver` to
638    /// resolve.
639    pub async fn get_children(
640        &self,
641        path: &str,
642    ) -> Result<Option<(oneshot::Receiver<WatchedEvent>, Vec<String>)>, Error> {
643        let (tx, rx) = oneshot::channel();
644        self.0
645            .get_children_w(path, Watch::Custom(tx))
646            .await
            .map(|r| r.map(move |c| (rx, c)))
648    }
649
650    /// Return the data and the [`Stat`] of the node at the given `path`, or `None` if it does not
651    /// exist.
652    ///
653    /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
654    /// by any successful operation that sets the node's data, or deletes it, and in turn causes
655    /// the included `oneshot::Receiver` to resolve.
656    pub async fn get_data(
657        &self,
658        path: &str,
659    ) -> Result<Option<(oneshot::Receiver<WatchedEvent>, Vec<u8>, Stat)>, Error> {
660        let (tx, rx) = oneshot::channel();
661        self.0
662            .get_data_w(path, Watch::Custom(tx))
663            .await
            .map(|r| r.map(move |(b, s)| (rx, b, s)))
665    }
666}
667
668/// Proxy for [`ZooKeeper`] that batches operations into an atomic "multi" request.
669#[derive(Debug)]
670pub struct MultiBuilder<'a> {
671    zk: &'a ZooKeeper,
672    requests: Vec<proto::Request>,
673}
674
675impl<'a> MultiBuilder<'a> {
676    /// Attach a create operation to this multi request.
677    ///
678    /// See [`ZooKeeper::create`] for details.
679    pub fn create<D, A>(mut self, path: &str, data: D, acl: A, mode: CreateMode) -> Self
680    where
681        D: Into<Cow<'static, [u8]>>,
682        A: Into<Cow<'static, [Acl]>>,
683    {
684        self.requests.push(proto::Request::Create {
685            path: path.to_string(),
686            data: data.into(),
687            acl: acl.into(),
688            mode,
689        });
690        self
691    }
692
693    /// Attach a set data operation to this multi request.
694    ///
695    /// See [`ZooKeeper::set_data`] for details.
696    pub fn set_data<D>(mut self, path: &str, version: Option<i32>, data: D) -> Self
697    where
698        D: Into<Cow<'static, [u8]>>,
699    {
700        self.requests.push(proto::Request::SetData {
701            path: path.to_string(),
702            version: version.unwrap_or(-1),
703            data: data.into(),
704        });
705        self
706    }
707
708    /// Attach a delete operation to this multi request.
709    ///
710    /// See [`ZooKeeper::delete`] for details.
711    pub fn delete(mut self, path: &str, version: Option<i32>) -> Self {
712        self.requests.push(proto::Request::Delete {
713            path: path.to_string(),
714            version: version.unwrap_or(-1),
715        });
716        self
717    }
718
    /// Attach a check operation to this multi request.
    ///
    /// There is no equivalent to the check operation outside of a multi
    /// request.
    pub fn check(mut self, path: &str, version: i32) -> Self {
        self.requests.push(proto::Request::Check {
            path: path.to_string(),
            version,
        });
        self
    }

    /// Execute the attached requests as one atomic unit.
    ///
    /// On success, the returned vector holds one `Ok` response per request, in the order the
    /// requests were attached. If an operation fails, its entry carries the specific error,
    /// operations attached before it report [`error::Multi::RolledBack`], and operations
    /// attached after it report [`error::Multi::Skipped`].
    pub async fn run(self) -> Result<Vec<Result<MultiResponse, error::Multi>>, Error> {
        let (zk, requests) = (self.zk, self.requests);
        let reqs_lite: Vec<transform::RequestMarker> = requests.iter().map(|r| r.into()).collect();
        zk.connection
            .enqueue(proto::Request::Multi(requests))
            .await
            .and_then(move |r| match r {
                Ok(proto::Response::Multi(responses)) => reqs_lite
                    .iter()
                    .zip(responses)
                    .map(|(req, res)| transform::multi(req, res))
                    .collect(),
                Ok(r) => bail!("got non-multi response to multi: {:?}", r),
                Err(e) => Err(format_err!("multi call failed: {:?}", e)),
            })
    }
}

#[cfg(test)]
mod tests {

    use super::*;

    use futures::StreamExt;
    use tracing::Level;

    fn init_tracing_subscriber() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(Level::DEBUG)
            .try_init();
    }

    #[tokio::test]
    async fn it_works() {
        init_tracing_subscriber();
        let builder = ZooKeeperBuilder::default();

        let (zk, w) = builder
            .connect(&"127.0.0.1:2181".parse().unwrap())
            .await
            .unwrap();
        let (exists_w, stat) = zk.with_watcher().exists("/foo").await.unwrap();
        assert_eq!(stat, None);
        let stat = zk.watch().exists("/foo").await.unwrap();
        assert_eq!(stat, None);
        let path = zk
            .create(
                "/foo",
                &b"Hello world"[..],
                Acl::open_unsafe(),
                CreateMode::Persistent,
            )
            .await
            .unwrap();
        assert_eq!(path.as_deref(), Ok("/foo"));
        let event = exists_w.await.expect("exists_w failed");
        assert_eq!(
            event,
            WatchedEvent {
                event_type: WatchedEventType::NodeCreated,
                keeper_state: KeeperState::SyncConnected,
                path: String::from("/foo"),
            }
        );
        let stat = zk.watch().exists("/foo").await.unwrap();
        assert_eq!(stat.unwrap().data_length as usize, b"Hello world".len());
        let res = zk.get_acl("/foo").await.unwrap();
        let (acl, _) = res.unwrap();
        assert_eq!(acl, Acl::open_unsafe());
        let res = zk.get_data("/foo").await.unwrap();
        let data = b"Hello world";
        let res = res.unwrap();
        assert_eq!(res.0, data);
        assert_eq!(res.1.data_length as usize, data.len());
        let stat = zk
            .set_data("/foo", Some(res.1.version), &b"Bye world"[..])
            .await
            .unwrap();
        assert_eq!(stat.unwrap().data_length as usize, "Bye world".len());
        let res = zk.get_data("/foo").await.unwrap();
        let data = b"Bye world";
        let res = res.unwrap();
        assert_eq!(res.0, data);
        assert_eq!(res.1.data_length as usize, data.len());
        let path = zk
            .create(
                "/foo/bar",
                &b"Hello bar"[..],
                Acl::open_unsafe(),
                CreateMode::Persistent,
            )
            .await
            .unwrap();
        assert_eq!(path.as_deref(), Ok("/foo/bar"));
        let children = zk.get_children("/foo").await.unwrap();
        assert_eq!(children, Some(vec!["bar".to_string()]));
        let res = zk.get_data("/foo/bar").await.unwrap();
        let data = b"Hello bar";
        let res = res.unwrap();
        assert_eq!(res.0, data);
        assert_eq!(res.1.data_length as usize, data.len());
        // add a new exists watch so we'll get notified of delete
        let _ = zk.watch().exists("/foo").await.unwrap();
        let res = zk.delete("/foo", None).await.unwrap();
        assert_eq!(res, Err(error::Delete::NotEmpty));
        let res = zk.delete("/foo/bar", None).await.unwrap();
        assert_eq!(res, Ok(()));
        let res = zk.delete("/foo", None).await.unwrap();
        assert_eq!(res, Ok(()));
        let stat = zk.watch().exists("/foo").await.unwrap();
        assert_eq!(stat, None);
        let (event, w) = w.into_future().await;
        assert_eq!(
            event,
            Some(WatchedEvent {
                event_type: WatchedEventType::NodeCreated,
                keeper_state: KeeperState::SyncConnected,
                path: String::from("/foo"),
            })
        );
        let (event, w) = w.into_future().await;
        assert_eq!(
            event,
            Some(WatchedEvent {
                event_type: WatchedEventType::NodeDataChanged,
                keeper_state: KeeperState::SyncConnected,
                path: String::from("/foo"),
            })
        );
        let (event, w) = w.into_future().await;
        assert_eq!(
            event,
            Some(WatchedEvent {
                event_type: WatchedEventType::NodeDeleted,
                keeper_state: KeeperState::SyncConnected,
                path: String::from("/foo"),
            })
        );

        drop(zk); // make Packetizer idle
        assert_eq!(w.count().await, 0);
    }

    #[tokio::test]
    async fn example() {
        let (zk, default_watcher) = ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
            .await
            .unwrap();

        // let's first check if /example exists. the .watch() causes us to be notified
        // the next time the "exists" status of /example changes after the call.
        let stat = zk.watch().exists("/example").await.unwrap();
        // initially, /example does not exist
        assert_eq!(stat, None);
        // so let's make it!
        let path = zk
            .create(
                "/example",
                &b"Hello world"[..],
                Acl::open_unsafe(),
                CreateMode::Persistent,
            )
            .await
            .unwrap();
        assert_eq!(path.as_deref(), Ok("/example"));

        // does it exist now?
        let stat = zk.watch().exists("/example").await.unwrap();
        // looks like it!
        // note that the creation above also triggered our "exists" watch!
        assert_eq!(stat.unwrap().data_length as usize, b"Hello world".len());

        // did the data get set correctly?
        let res = zk.get_data("/example").await.unwrap();
        let data = b"Hello world";
        let res = res.unwrap();
        assert_eq!(res.0, data);
        assert_eq!(res.1.data_length as usize, data.len());

        // let's update the data.
        let stat = zk
            .set_data("/example", Some(res.1.version), &b"Bye world"[..])
            .await
            .unwrap();
        assert_eq!(stat.unwrap().data_length as usize, "Bye world".len());

        // create a child of /example
        let path = zk
            .create(
                "/example/more",
                &b"Hello more"[..],
                Acl::open_unsafe(),
                CreateMode::Persistent,
            )
            .await
            .unwrap();
        assert_eq!(path.as_deref(), Ok("/example/more"));

        // it should be visible as a child of /example
        let children = zk.get_children("/example").await.unwrap();
        assert_eq!(children, Some(vec!["more".to_string()]));

        // it is not legal to directly delete a node that has children
        let res = zk.delete("/example", None).await.unwrap();
        assert_eq!(res, Err(error::Delete::NotEmpty));
        // instead we must delete the children first
        let res = zk.delete("/example/more", None).await.unwrap();
        assert_eq!(res, Ok(()));
        let res = zk.delete("/example", None).await.unwrap();
        assert_eq!(res, Ok(()));
        // now /example should no longer exist!
        let stat = zk.exists("/example").await.unwrap();
        assert_eq!(stat, None);

        // now let's check that the .watch().exists we did in the very
        // beginning actually triggered!
        let (event, _w) = default_watcher.into_future().await;
        assert_eq!(
            event,
            Some(WatchedEvent {
                event_type: WatchedEventType::NodeCreated,
                keeper_state: KeeperState::SyncConnected,
                path: String::from("/example"),
            })
        );
    }

    #[tokio::test]
    async fn acl_test() {
        init_tracing_subscriber();
        let builder = ZooKeeperBuilder::default();

        let (zk, _) = builder
            .connect(&"127.0.0.1:2181".parse().unwrap())
            .await
            .unwrap();
        let _ = zk
            .create(
                "/acl_test",
                &b"foo"[..],
                Acl::open_unsafe(),
                CreateMode::Ephemeral,
            )
            .await
            .unwrap();

        let res = zk.get_acl("/acl_test").await.unwrap();
        let res = res.unwrap();
        assert_eq!(res.0, Acl::open_unsafe());

        let res = zk
            .set_acl("/acl_test", Acl::creator_all(), Some(res.1.version))
            .await
            .unwrap();
        // an unauthenticated user cannot set `auth`-scheme ACLs
        assert_eq!(res, Err(error::SetAcl::InvalidAcl));

        let stat = zk
            .set_acl("/acl_test", Acl::read_unsafe(), None)
            .await
            .unwrap();
        // successfully change the node's ACL to `read_unsafe`
        assert_eq!(stat.unwrap().data_length as usize, b"foo".len());

        let res = zk.get_acl("/acl_test").await.unwrap();
        let res = res.unwrap();
        assert_eq!(res.0, Acl::read_unsafe());

        let res = zk.set_data("/acl_test", None, &b"bar"[..]).await.unwrap();
        // cannot set data on a read-only node
        assert_eq!(res, Err(error::SetData::NoAuth));

        let res = zk
            .set_acl("/acl_test", Acl::open_unsafe(), None)
            .await
            .unwrap();
        // cannot change a read-only node's ACL
        assert_eq!(res, Err(error::SetAcl::NoAuth));

        drop(zk); // make Packetizer idle
    }

    #[tokio::test]
    async fn multi_test() {
        init_tracing_subscriber();
        let builder = ZooKeeperBuilder::default();

        async fn check_exists(zk: &ZooKeeper, paths: &[&str]) -> Result<Vec<bool>, Error> {
            let mut res = Vec::new();
            for p in paths {
                let exists = zk.exists(p).await?;
                res.push(exists.is_some());
            }
            Ok(res)
        }

        let (zk, _) = builder
            .connect(&"127.0.0.1:2181".parse().unwrap())
            .await
            .unwrap();

        let res = zk
            .multi()
            .create("/b", &b"a"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .create("/c", &b"b"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .run()
            .await
            .unwrap();
        assert_eq!(
            res,
            [
                Ok(MultiResponse::Create("/b".into())),
                Ok(MultiResponse::Create("/c".into()))
            ]
        );

        let res = check_exists(&zk, &["/a", "/b", "/c", "/d"]).await.unwrap();
        assert_eq!(res, &[false, true, true, false]);

        let res = zk
            .multi()
            .create("/a", &b"a"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .create("/b", &b"b"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .create("/c", &b"b"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .create("/d", &b"a"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .run()
            .await
            .unwrap();
        assert_eq!(
            res,
            &[
                Err(error::Multi::RolledBack),
                Err(error::Multi::Create {
                    source: error::Create::NodeExists
                }),
                Err(error::Multi::Skipped),
                Err(error::Multi::Skipped),
            ]
        );

        let res = check_exists(&zk, &["/a", "/b", "/c", "/d"]).await.unwrap();
        assert_eq!(res, &[false, true, true, false]);

        let res = zk
            .multi()
            .set_data("/b", None, &b"garbaggio"[..])
            .run()
            .await
            .unwrap();
        match res[0] {
            Ok(MultiResponse::SetData(stat)) => {
                assert_eq!(stat.data_length as usize, "garbaggio".len())
            }
            _ => panic!("unexpected response: {res:?}"),
        }

        let res = zk
            .multi()
            .check("/b", 0)
            .delete("/c", None)
            .run()
            .await
            .unwrap();
        assert_eq!(
            res,
            [
                Err(error::Multi::Check {
                    source: error::Check::BadVersion { expected: 0 }
                }),
                Err(error::Multi::Skipped),
            ]
        );

        let res = check_exists(&zk, &["/a", "/b", "/c", "/d"]).await.unwrap();
        assert_eq!(res, &[false, true, true, false]);
        let res = zk.multi().check("/a", 0).run().await.unwrap();
        assert_eq!(
            res,
            &[Err(error::Multi::Check {
                source: error::Check::NoNode
            }),]
        );

        let res = zk
            .multi()
            .check("/b", 1)
            .delete("/b", None)
            .check("/c", 0)
            .delete("/c", None)
            .run()
            .await
            .unwrap();
        assert_eq!(
            res,
            [
                Ok(MultiResponse::Check),
                Ok(MultiResponse::Delete),
                Ok(MultiResponse::Check),
                Ok(MultiResponse::Delete),
            ]
        );

        let res = check_exists(&zk, &["/a", "/b", "/c", "/d"]).await.unwrap();
        assert_eq!(res, [false, false, false, false]);

        drop(zk); // make Packetizer idle
    }
}