tokio_zookeeper/lib.rs
//! This crate provides a client for interacting with [Apache
//! ZooKeeper](https://zookeeper.apache.org/), a highly reliable distributed service for
//! maintaining configuration information, naming, providing distributed synchronization, and
//! providing group services.
//!
//! # About ZooKeeper
//!
//! The [ZooKeeper Overview](https://zookeeper.apache.org/doc/current/zookeeperOver.html) provides
//! a thorough introduction to ZooKeeper, but we'll repeat the most important points here. At its
//! [heart](https://zookeeper.apache.org/doc/current/zookeeperOver.html#sc_designGoals), ZooKeeper
//! is a [hierarchical key-value
//! store](https://zookeeper.apache.org/doc/current/zookeeperOver.html#sc_dataModelNameSpace) (that
//! is, keys can have "sub-keys"), with additional mechanisms that guarantee consistent operation
//! across client and server failures. Keys in ZooKeeper look like paths (e.g., `/key/subkey`), and
//! every item along a path is called a
//! "[Znode](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#sc_zkDataModel_znodes)".
//! Each Znode (including those with children) can also have associated data, which can be queried
//! and updated like in other key-value stores. Along with its data and children, each Znode stores
//! meta-information such as [access-control
//! lists](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#sc_ZooKeeperAccessControl),
//! [modification
//! timestamps](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#sc_timeInZk),
//! and a version number
//! that allows clients to avoid stepping on each other's toes when accessing values (more on that
//! later).
//!
//! ## Operations
//!
//! ZooKeeper's API consists of the same basic operations you would expect to find in a
//! file-system: [`create`](struct.ZooKeeper.html#method.create) for creating new Znodes,
//! [`delete`](struct.ZooKeeper.html#method.delete) for removing them,
//! [`exists`](struct.ZooKeeper.html#method.exists) for checking if a node exists,
//! [`get_data`](struct.ZooKeeper.html#method.get_data) and
//! [`set_data`](struct.ZooKeeper.html#method.set_data) for getting and setting a node's associated
//! data respectively, and [`get_children`](struct.ZooKeeper.html#method.get_children) for
//! retrieving the children of a given node (i.e., its subkeys). For all of these operations,
//! ZooKeeper gives [strong
//! guarantees](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#ch_zkGuarantees)
//! about what happens when there are multiple clients interacting with the system, or even what
//! happens in response to system and network failures.
//!
//! ## Ephemeral nodes
//!
//! When you create a Znode, you also specify a [`CreateMode`]. Nodes that are created with
//! [`CreateMode::Persistent`] are the nodes we have discussed thus far. They remain in the server
//! until you delete them. Nodes that are created with [`CreateMode::Ephemeral`], on the other
//! hand, are special. These [ephemeral
//! nodes](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#Ephemeral+Nodes) are
//! automatically deleted by the server when the client that created them disconnects. This can be
//! handy for implementing lease-like mechanisms, and for detecting faults. Since they are
//! automatically deleted, and nodes with children cannot be deleted directly, ephemeral nodes are
//! not allowed to have children.
//!
//! ## Watches
//!
//! In addition to the methods above, [`ZooKeeper::exists`], [`ZooKeeper::get_data`], and
//! [`ZooKeeper::get_children`] also support setting
//! "[watches](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#ch_zkWatches)" on
//! a node. A watch is a one-time trigger that causes a [`WatchedEvent`] to be sent to the client
//! that set the watch when the state for which the watch was set changes. For example, for a
//! watched `get_data`, a one-time notification will be sent the first time the data of the target
//! node changes after the response to the original `get_data` call was processed. You
//! should see the ["Watches" entry in the Programmer's
//! Guide](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#ch_zkWatches) for
//! details.
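//!
//! The one-time nature of a watch can be illustrated without a server. The following is a
//! minimal sketch that models a single node in memory using only the standard library. The
//! `ModelNode` type and its methods are hypothetical; they are not part of this crate and do
//! not reflect how ZooKeeper implements watches internally.
//!
//! ```rust
//! use std::sync::mpsc::{channel, Receiver, Sender};
//!
//! // Hypothetical in-memory stand-in for a single Znode.
//! struct ModelNode {
//!     data: Vec<u8>,
//!     // Watches left by "watched reads"; drained on the next change.
//!     watchers: Vec<Sender<&'static str>>,
//! }
//!
//! impl ModelNode {
//!     fn new(data: &[u8]) -> Self {
//!         ModelNode {
//!             data: data.to_vec(),
//!             watchers: Vec::new(),
//!         }
//!     }
//!
//!     // Read the data and leave a one-time watch on it.
//!     fn get_data_watched(&mut self) -> (Vec<u8>, Receiver<&'static str>) {
//!         let (tx, rx) = channel();
//!         self.watchers.push(tx);
//!         (self.data.clone(), rx)
//!     }
//!
//!     // Change the data; every pending watch fires once and is then removed.
//!     fn set_data(&mut self, data: &[u8]) {
//!         self.data = data.to_vec();
//!         for watcher in self.watchers.drain(..) {
//!             // The receiver may already be gone; that is not an error here.
//!             let _ = watcher.send("NodeDataChanged");
//!         }
//!     }
//! }
//!
//! fn main() {
//!     let mut node = ModelNode::new(b"v1");
//!     let (_data, events) = node.get_data_watched();
//!     node.set_data(b"v2"); // triggers the watch
//!     node.set_data(b"v3"); // no watch is left, so nothing is sent
//!     assert_eq!(events.try_iter().count(), 1);
//! }
//! ```
//!
//! To keep observing a node in this model, a client would have to issue another watched read
//! after each notification.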
//!
//! ## Getting started
//!
//! To get ZooKeeper up and running, follow the official [Getting Started
//! Guide](https://zookeeper.apache.org/doc/current/zookeeperStarted.html). In most Linux
//! environments, the procedure for getting a basic setup working is usually just to install the
//! `zookeeper` package and then run `systemctl start zookeeper`. ZooKeeper will then be running at
//! `127.0.0.1:2181`.
//!
//! # This implementation
//!
//! This library is analogous to the asynchronous API offered by the [official Java
//! implementation](https://zookeeper.apache.org/doc/current/api/org/apache/zookeeper/ZooKeeper.html),
//! and for most operations the Java documentation should apply to the Rust implementation. If this
//! is not the case, it is considered [a bug](https://github.com/jonhoo/tokio-zookeeper/issues),
//! and we'd love a bug report with as much relevant information as you can offer.
//!
//! Note that since this implementation is asynchronous, users of the client must take care to
//! not re-order operations in their own code. There is some discussion of this in the [official
//! documentation of the Java
//! bindings](https://zookeeper.apache.org/doc/r3.4.12/zookeeperProgrammers.html#Java+Binding).
//!
//! For more information on ZooKeeper, see the [ZooKeeper Programmer's
//! Guide](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html) and the [Confluence
//! ZooKeeper wiki](https://cwiki.apache.org/confluence/display/ZOOKEEPER/Index). There is also a
//! basic tutorial (that uses the Java client)
//! [here](https://zookeeper.apache.org/doc/current/zookeeperTutorial.html).
//!
//! ## Interaction with Tokio
//!
//! The futures in this crate expect to be running under a Tokio runtime. In the common case, you
//! `.await` them from an `async` function driven by `#[tokio::main]` (as in the example below),
//! or explicitly create a `tokio::runtime::Runtime` and then use `Runtime::block_on`.
//!
//! # A somewhat silly example
//!
//! ```no_run
//! use tokio_zookeeper::*;
//! use futures::prelude::*;
//!
//! # #[tokio::main(flavor = "current_thread")]
//! # async fn main() {
//! let (zk, default_watcher) = ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
//!     .await
//!     .unwrap();
//!
//! // let's first check if /example exists. the .watch() causes us to be notified
//! // the next time the "exists" status of /example changes after the call.
//! let stat = zk.watch().exists("/example").await.unwrap();
//! // initially, /example does not exist
//! assert_eq!(stat, None);
//! // so let's make it!
//! let path = zk
//!     .create(
//!         "/example",
//!         &b"Hello world"[..],
//!         Acl::open_unsafe(),
//!         CreateMode::Persistent,
//!     )
//!     .await
//!     .unwrap();
//! assert_eq!(path.as_deref(), Ok("/example"));
//!
//! // does it exist now?
//! let stat = zk.watch().exists("/example").await.unwrap();
//! // looks like it!
//! // note that the creation above also triggered our "exists" watch!
//! assert_eq!(stat.unwrap().data_length as usize, b"Hello world".len());
//!
//! // did the data get set correctly?
//! let res = zk.get_data("/example").await.unwrap();
//! let data = b"Hello world";
//! let res = res.unwrap();
//! assert_eq!(res.0, data);
//! assert_eq!(res.1.data_length as usize, data.len());
//!
//! // let's update the data.
//! let stat = zk
//!     .set_data("/example", Some(res.1.version), &b"Bye world"[..])
//!     .await
//!     .unwrap();
//! assert_eq!(stat.unwrap().data_length as usize, "Bye world".len());
//!
//! // create a child of /example
//! let path = zk
//!     .create(
//!         "/example/more",
//!         &b"Hello more"[..],
//!         Acl::open_unsafe(),
//!         CreateMode::Persistent,
//!     )
//!     .await
//!     .unwrap();
//! assert_eq!(path.as_deref(), Ok("/example/more"));
//!
//! // it should be visible as a child of /example
//! let children = zk.get_children("/example").await.unwrap();
//! assert_eq!(children, Some(vec!["more".to_string()]));
//!
//! // it is not legal to delete a node that has children directly
//! let res = zk.delete("/example", None).await.unwrap();
//! assert_eq!(res, Err(error::Delete::NotEmpty));
//! // instead we must delete the children first
//! let res = zk.delete("/example/more", None).await.unwrap();
//! assert_eq!(res, Ok(()));
//! let res = zk.delete("/example", None).await.unwrap();
//! assert_eq!(res, Ok(()));
//! // now /example should no longer exist!
//! let stat = zk.exists("/example").await.unwrap();
//! assert_eq!(stat, None);
//!
//! // now let's check that the .watch().exists we did in the very
//! // beginning actually triggered!
//! let (event, _w) = default_watcher.into_future().await;
//! assert_eq!(
//!     event,
//!     Some(WatchedEvent {
//!         event_type: WatchedEventType::NodeCreated,
//!         keeper_state: KeeperState::SyncConnected,
//!         path: String::from("/example"),
//!     })
//! );
//! # }
//! ```

#![deny(missing_docs)]
#![deny(missing_debug_implementations)]
#![deny(missing_copy_implementations)]

use error::Error;
use futures::{channel::oneshot, Stream};
use snafu::{whatever as bail, ResultExt};
use std::borrow::Cow;
use std::net::SocketAddr;
use std::time;
use tracing::{debug, instrument, trace};

/// Per-operation ZooKeeper error types.
pub mod error;
mod proto;
mod transform;
mod types;

use crate::proto::{Watch, ZkError};
pub use crate::types::{
    Acl, CreateMode, KeeperState, MultiResponse, Permission, Stat, WatchedEvent, WatchedEventType,
};

macro_rules! format_err {
    ($($x:tt)*) => {
        <crate::error::Error as snafu::FromString>::without_source(format!($($x)*))
    };
}
pub(crate) use format_err;

/// A connection to ZooKeeper.
///
/// All interactions with ZooKeeper are performed by calling the methods of a `ZooKeeper` instance.
/// All clones of the same `ZooKeeper` instance use the same underlying connection. Once a
/// connection to a server is established, a session ID is assigned to the client. The client will
/// send heartbeats to the server periodically to keep the session valid.
///
/// The application can call ZooKeeper APIs through a client as long as the session ID of the
/// client remains valid. If, for some reason, the client fails to send heartbeats to the server
/// for a prolonged period of time (exceeding the session timeout value, for instance), the server
/// will expire the session, and the session ID will become invalid. The `ZooKeeper` instance will
/// then no longer be usable, and all futures will resolve with a protocol-level error. To make
/// further ZooKeeper API calls, the application must create a new `ZooKeeper` instance.
///
/// If the ZooKeeper server the client currently connects to fails or otherwise does not respond,
/// the client will automatically try to connect to another server before its session ID expires.
/// If successful, the application can continue to use the client.
///
/// Some successful ZooKeeper API calls can leave watches on the "data nodes" in the ZooKeeper
/// server. Other successful ZooKeeper API calls can trigger those watches. Once a watch is
/// triggered, an event will be delivered to the client that left the watch in the first place.
/// Each watch can be triggered only once. Thus, up to one event will be delivered to a client for
/// every watch it leaves.
#[derive(Debug, Clone)]
pub struct ZooKeeper {
    #[allow(dead_code)]
    connection: proto::Enqueuer,
}

/// Builder that allows customizing options for ZooKeeper connections.
#[derive(Debug, Copy, Clone)]
pub struct ZooKeeperBuilder {
    session_timeout: time::Duration,
}

impl Default for ZooKeeperBuilder {
    fn default() -> Self {
        ZooKeeperBuilder {
            session_timeout: time::Duration::new(0, 0),
        }
    }
}

impl ZooKeeperBuilder {
    /// Connect to a ZooKeeper server instance at the given address.
    ///
    /// A `ZooKeeper` instance is returned, along with a "watcher" that will provide notifications
    /// of any changes in state.
    ///
    /// If the connection to the server fails, the client will automatically try to re-connect.
    /// Only if re-connection fails is an error returned to the client. Requests that are in-flight
    /// during a disconnect may fail and have to be retried.
    pub async fn connect(
        self,
        addr: &SocketAddr,
    ) -> Result<(ZooKeeper, impl Stream<Item = WatchedEvent>), Error> {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let stream = tokio::net::TcpStream::connect(addr)
            .await
            .whatever_context("connect failed")?;
        Ok((self.handshake(*addr, stream, tx).await?, rx))
    }

    /// Set the ZooKeeper [session expiry
    /// timeout](https://zookeeper.apache.org/doc/r3.4.12/zookeeperProgrammers.html#ch_zkSessions).
    ///
    /// The default timeout is dictated by the server.
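    ///
    /// As a rough illustration of how a `Duration` maps to the millisecond value carried in the
    /// connect request, the sketch below repeats the arithmetic this builder applies during the
    /// handshake. The `timeout_millis` helper is hypothetical and exists only for this example.
    ///
    /// ```rust
    /// use std::time::Duration;
    ///
    /// // Whole seconds are scaled to milliseconds and the sub-second part is added on top.
    /// fn timeout_millis(t: Duration) -> i32 {
    ///     (t.as_secs() * 1_000) as i32 + t.subsec_millis() as i32
    /// }
    ///
    /// fn main() {
    ///     assert_eq!(timeout_millis(Duration::from_millis(2_500)), 2_500);
    ///     // The default builder uses a zero duration, leaving the choice to the server.
    ///     assert_eq!(timeout_millis(Duration::new(0, 0)), 0);
    /// }
    /// ```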
    pub fn set_timeout(&mut self, t: time::Duration) {
        self.session_timeout = t;
    }

    async fn handshake(
        self,
        addr: SocketAddr,
        stream: tokio::net::TcpStream,
        default_watcher: futures::channel::mpsc::UnboundedSender<WatchedEvent>,
    ) -> Result<ZooKeeper, Error> {
        let request = proto::Request::Connect {
            protocol_version: 0,
            last_zxid_seen: 0,
            timeout: (self.session_timeout.as_secs() * 1_000) as i32
                + self.session_timeout.subsec_millis() as i32,
            session_id: 0,
            passwd: vec![],
            read_only: false,
        };
        debug!("about to perform handshake");

        let enqueuer = proto::Packetizer::new(addr, stream, default_watcher);
        enqueuer.enqueue(request).await.map(move |response| {
            trace!(?response, "Got response");
            ZooKeeper {
                connection: enqueuer,
            }
        })
    }
}

impl ZooKeeper {
    /// Connect to a ZooKeeper server instance at the given address with default parameters.
    ///
    /// See [`ZooKeeperBuilder::connect`].
    pub async fn connect(
        addr: &SocketAddr,
    ) -> Result<(Self, impl Stream<Item = WatchedEvent>), Error> {
        ZooKeeperBuilder::default().connect(addr).await
    }

    /// Create a node with the given `path` with `data` as its contents.
    ///
    /// The `mode` argument specifies additional options for the newly created node.
    ///
    /// If `mode` is set to [`CreateMode::Ephemeral`] (or [`CreateMode::EphemeralSequential`]), the
    /// node will be removed by ZooKeeper automatically when the session associated with the
    /// creation of the node expires.
    ///
    /// If `mode` is set to [`CreateMode::PersistentSequential`] or
    /// [`CreateMode::EphemeralSequential`], the actual path name of a sequential node will be the
    /// given `path` plus a suffix `i` where `i` is the current sequential number of the node. The
    /// sequence number always has a fixed length of 10 digits, zero-padded. Once such a node is
    /// created, the sequential number will be incremented by one. The newly created node's full
    /// name is returned when the future is resolved.
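    ///
    /// The naming rule above can be sketched with plain string formatting. The `sequential_path`
    /// helper is hypothetical; in practice the server chooses the counter and returns the final
    /// name.
    ///
    /// ```rust
    /// // Build the name a sequential node would receive, assuming the server appends a
    /// // 10-digit, zero-padded counter to the requested path.
    /// fn sequential_path(prefix: &str, counter: u32) -> String {
    ///     format!("{}{:010}", prefix, counter)
    /// }
    ///
    /// fn main() {
    ///     assert_eq!(sequential_path("/locks/lock-", 7), "/locks/lock-0000000007");
    ///     // Zero padding keeps lexical order equal to numeric order.
    ///     assert!(sequential_path("/q/n-", 9) < sequential_path("/q/n-", 10));
    /// }
    /// ```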
    ///
    /// If a node with the same actual path already exists in ZooKeeper, the returned future
    /// resolves with an error of [`error::Create::NodeExists`]. Note that since a different actual
    /// path is used for each invocation of creating sequential nodes with the same `path`
    /// argument, calls with sequential modes will never return `NodeExists`.
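    ///
    /// Judging from the signature, the outer `Result` reports connection-level failures while the
    /// inner one reports operation-level outcomes such as `NodeExists`. The sketch below shows
    /// one way to tell the two apart; `CreateError` and `TransportError` are hypothetical
    /// stand-ins for this crate's error types, used so the example runs without a server.
    ///
    /// ```rust
    /// // Hypothetical stand-ins for the per-operation and connection-level errors.
    /// #[derive(Debug)]
    /// enum CreateError {
    ///     NodeExists,
    /// }
    /// #[derive(Debug)]
    /// struct TransportError;
    ///
    /// type CreateResult = Result<Result<String, CreateError>, TransportError>;
    ///
    /// fn describe(res: CreateResult) -> String {
    ///     match res {
    ///         Ok(Ok(path)) => format!("created {}", path),
    ///         Ok(Err(CreateError::NodeExists)) => "node already exists".to_string(),
    ///         Err(TransportError) => "connection-level failure".to_string(),
    ///     }
    /// }
    ///
    /// fn main() {
    ///     assert_eq!(describe(Ok(Ok("/a".to_string()))), "created /a");
    ///     assert_eq!(describe(Ok(Err(CreateError::NodeExists))), "node already exists");
    ///     assert_eq!(describe(Err(TransportError)), "connection-level failure");
    /// }
    /// ```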
    ///
    /// Ephemeral nodes cannot have children in ZooKeeper. Therefore, if the parent node of the
    /// given `path` is ephemeral, the returned future resolves to
    /// [`error::Create::NoChildrenForEphemerals`].
    ///
    /// If a node is created successfully, the ZooKeeper server will trigger the watches on the
    /// `path` left by `exists` calls, and the watches on the parent of the node by `get_children`
    /// calls.
    ///
    /// The maximum allowable size of the data array is 1 MB (1,048,576 bytes).
    #[instrument(skip(data, acl))]
    pub async fn create<D, A>(
        &self,
        path: &str,
        data: D,
        acl: A,
        mode: CreateMode,
    ) -> Result<Result<String, error::Create>, Error>
    where
        D: Into<Cow<'static, [u8]>>,
        A: Into<Cow<'static, [Acl]>>,
    {
        let data = data.into();
        tracing::Span::current().record("dlen", data.len());
        self.connection
            .enqueue(proto::Request::Create {
                path: path.to_string(),
                data,
                acl: acl.into(),
                mode,
            })
            .await
            .and_then(transform::create)
    }

    /// Set the data for the node at the given `path`.
    ///
    /// The call will succeed if such a node exists, and the given `version` matches the version of
    /// the node (if the given `version` is `None`, it matches any version). On success, the
    /// updated [`Stat`] of the node is returned.
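    ///
    /// The version check acts as a compare-and-set. The following is a minimal in-memory sketch
    /// of that behavior; `ModelNode` and `BadVersion` are hypothetical types for illustration
    /// only, and the rule that each successful write bumps the version by one is an assumption
    /// of the model.
    ///
    /// ```rust
    /// // Hypothetical error returned when the expected version is stale.
    /// #[derive(Debug, PartialEq)]
    /// struct BadVersion {
    ///     expected: i32,
    /// }
    ///
    /// // Hypothetical in-memory model of a versioned node.
    /// struct ModelNode {
    ///     data: Vec<u8>,
    ///     version: i32,
    /// }
    ///
    /// impl ModelNode {
    ///     fn set_data(&mut self, expected: Option<i32>, data: &[u8]) -> Result<i32, BadVersion> {
    ///         // `None` is translated to -1, which matches any version.
    ///         let expected = expected.unwrap_or(-1);
    ///         if expected != -1 && expected != self.version {
    ///             return Err(BadVersion { expected });
    ///         }
    ///         self.data = data.to_vec();
    ///         self.version += 1;
    ///         Ok(self.version)
    ///     }
    /// }
    ///
    /// fn main() {
    ///     let mut node = ModelNode { data: b"a".to_vec(), version: 0 };
    ///     assert_eq!(node.set_data(Some(0), b"b"), Ok(1));
    ///     // A writer still holding the stale version 0 is rejected...
    ///     assert_eq!(node.set_data(Some(0), b"c"), Err(BadVersion { expected: 0 }));
    ///     // ...while `None` overwrites unconditionally.
    ///     assert_eq!(node.set_data(None, b"d"), Ok(2));
    ///     assert_eq!(node.data, b"d");
    /// }
    /// ```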
    ///
    /// This operation, if successful, will trigger all the watches on the node of the given `path`
    /// left by `get_data` calls.
    ///
    /// The maximum allowable size of the data array is 1 MB (1,048,576 bytes).
    #[instrument(skip(data))]
    pub async fn set_data<D>(
        &self,
        path: &str,
        version: Option<i32>,
        data: D,
    ) -> Result<Result<Stat, error::SetData>, Error>
    where
        D: Into<Cow<'static, [u8]>>,
    {
        let data = data.into();
        tracing::Span::current().record("dlen", data.len());
        let version = version.unwrap_or(-1);
        self.connection
            .enqueue(proto::Request::SetData {
                path: path.to_string(),
                version,
                data,
            })
            .await
            .and_then(move |r| transform::set_data(version, r))
    }

    /// Delete the node at the given `path`.
    ///
    /// The call will succeed if such a node exists, and the given `version` matches the node's
    /// version (if the given `version` is `None`, it matches any version).
    ///
    /// This operation, if successful, will trigger all the watches on the node of the given `path`
    /// left by `exists` API calls, and the watches on the parent node left by `get_children` API
    /// calls.
    #[instrument]
    pub async fn delete(
        &self,
        path: &str,
        version: Option<i32>,
    ) -> Result<Result<(), error::Delete>, Error> {
        let version = version.unwrap_or(-1);
        self.connection
            .enqueue(proto::Request::Delete {
                path: path.to_string(),
                version,
            })
            .await
            .and_then(move |r| transform::delete(version, r))
    }

    /// Return the [ACL](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#sc_ZooKeeperAccessControl)
    /// and Stat of the node at the given `path`.
    ///
    /// If no node exists for the given path, the returned future resolves with an error of
    /// [`error::GetAcl::NoNode`].
    #[instrument]
    pub async fn get_acl(
        &self,
        path: &str,
    ) -> Result<Result<(Vec<Acl>, Stat), error::GetAcl>, Error> {
        self.connection
            .enqueue(proto::Request::GetAcl {
                path: path.to_string(),
            })
            .await
            .and_then(transform::get_acl)
    }

    /// Set the [ACL](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#sc_ZooKeeperAccessControl)
    /// for the node of the given `path`.
    ///
    /// The call will succeed if such a node exists and the given `version` matches the ACL version
    /// of the node. On success, the updated [`Stat`] of the node is returned.
    ///
    /// If no node exists for the given path, the returned future resolves with an error of
    /// [`error::SetAcl::NoNode`]. If the given `version` does not match the ACL version, the
    /// returned future resolves with an error of [`error::SetAcl::BadVersion`].
    #[instrument(skip(acl))]
    pub async fn set_acl<A>(
        &self,
        path: &str,
        acl: A,
        version: Option<i32>,
    ) -> Result<Result<Stat, error::SetAcl>, Error>
    where
        A: Into<Cow<'static, [Acl]>>,
    {
        let version = version.unwrap_or(-1);
        self.connection
            .enqueue(proto::Request::SetAcl {
                path: path.to_string(),
                acl: acl.into(),
                version,
            })
            .await
            .and_then(move |r| transform::set_acl(version, r))
    }
}

impl ZooKeeper {
    /// Add a global watch for the next chained operation.
    pub fn watch(&self) -> WatchGlobally {
        WatchGlobally(self)
    }

    /// Add a watch for the next chained operation, and return a future for any received event
    /// along with the operation's (successful) result.
    pub fn with_watcher(&self) -> WithWatcher {
        WithWatcher(self)
    }

    #[instrument(name = "exists")]
    async fn exists_w(&self, path: &str, watch: Watch) -> Result<Option<Stat>, Error> {
        self.connection
            .enqueue(proto::Request::Exists {
                path: path.to_string(),
                watch,
            })
            .await
            .and_then(transform::exists)
    }

    /// Return the [`Stat`] of the node of the given `path`, or `None` if the node does not exist.
    pub async fn exists(&self, path: &str) -> Result<Option<Stat>, Error> {
        self.exists_w(path, Watch::None).await
    }

    #[instrument]
    async fn get_children_w(&self, path: &str, watch: Watch) -> Result<Option<Vec<String>>, Error> {
        self.connection
            .enqueue(proto::Request::GetChildren {
                path: path.to_string(),
                watch,
            })
            .await
            .and_then(transform::get_children)
    }

    /// Return the names of the children of the node at the given `path`, or `None` if the node
    /// does not exist.
    ///
    /// The returned list of children is not sorted and no guarantee is provided as to its natural
    /// or lexical order.
    pub async fn get_children(&self, path: &str) -> Result<Option<Vec<String>>, Error> {
        self.get_children_w(path, Watch::None).await
    }

    #[instrument]
    async fn get_data_w(&self, path: &str, watch: Watch) -> Result<Option<(Vec<u8>, Stat)>, Error> {
        self.connection
            .enqueue(proto::Request::GetData {
                path: path.to_string(),
                watch,
            })
            .await
            .and_then(transform::get_data)
    }

    /// Return the data and the [`Stat`] of the node at the given `path`, or `None` if it does not
    /// exist.
    pub async fn get_data(&self, path: &str) -> Result<Option<(Vec<u8>, Stat)>, Error> {
        self.get_data_w(path, Watch::None).await
    }

    /// Start building a multi request. Multi requests batch several operations
    /// into one atomic unit.
    pub fn multi(&self) -> MultiBuilder {
        MultiBuilder {
            zk: self,
            requests: Vec::new(),
        }
    }
}

/// Proxy for [`ZooKeeper`] that adds watches for initiated operations.
///
/// Triggered watches produce events on the global watcher stream.
#[derive(Debug, Clone)]
pub struct WatchGlobally<'a>(&'a ZooKeeper);

impl<'a> WatchGlobally<'a> {
    /// Return the [`Stat`] of the node of the given `path`, or `None` if the node does not exist.
    ///
    /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
    /// by any successful operation that creates or deletes the node, or sets the node's data. When
    /// the watch triggers, an event is sent to the global watcher stream.
    pub async fn exists(&self, path: &str) -> Result<Option<Stat>, Error> {
        self.0.exists_w(path, Watch::Global).await
    }

    /// Return the names of the children of the node at the given `path`, or `None` if the node
    /// does not exist.
    ///
    /// The returned list of children is not sorted and no guarantee is provided as to its natural
    /// or lexical order.
    ///
    /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
    /// by any successful operation that deletes the node at the given `path`, or creates or
    /// deletes a child of that node. When the watch triggers, an event is sent to the global
    /// watcher stream.
    pub async fn get_children(&self, path: &str) -> Result<Option<Vec<String>>, Error> {
        self.0.get_children_w(path, Watch::Global).await
    }

    /// Return the data and the [`Stat`] of the node at the given `path`, or `None` if it does not
    /// exist.
    ///
    /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
    /// by any successful operation that sets the node's data, or deletes it. When the watch
    /// triggers, an event is sent to the global watcher stream.
    pub async fn get_data(&self, path: &str) -> Result<Option<(Vec<u8>, Stat)>, Error> {
        self.0.get_data_w(path, Watch::Global).await
    }
}

/// Proxy for [`ZooKeeper`] that adds non-global watches for initiated operations.
///
/// Events from triggered watches are yielded through returned `oneshot` channels. All events are
/// also produced on the global watcher stream.
#[derive(Debug, Clone)]
pub struct WithWatcher<'a>(&'a ZooKeeper);

impl<'a> WithWatcher<'a> {
    /// Return the [`Stat`] of the node of the given `path`, or `None` if the node does not exist.
    ///
    /// If no errors occur, a watch will be left on the node at the given `path`. The watch is
    /// triggered by any successful operation that creates or deletes the node, or sets the data on
    /// the node, and in turn causes the included `oneshot::Receiver` to resolve.
    pub async fn exists(
        &self,
        path: &str,
    ) -> Result<(oneshot::Receiver<WatchedEvent>, Option<Stat>), Error> {
        let (tx, rx) = oneshot::channel();
        self.0
            .exists_w(path, Watch::Custom(tx))
            .await
            .map(|r| (rx, r))
    }

    /// Return the names of the children of the node at the given `path`, or `None` if the node
    /// does not exist.
    ///
    /// The returned list of children is not sorted and no guarantee is provided as to its natural
    /// or lexical order.
    ///
    /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
    /// by any successful operation that deletes the node at the given `path`, or creates or
    /// deletes a child of that node, and in turn causes the included `oneshot::Receiver` to
    /// resolve.
    pub async fn get_children(
        &self,
        path: &str,
    ) -> Result<Option<(oneshot::Receiver<WatchedEvent>, Vec<String>)>, Error> {
        let (tx, rx) = oneshot::channel();
        self.0
            .get_children_w(path, Watch::Custom(tx))
            .await
            .map(|r| (r.map(move |c| (rx, c))))
    }

    /// Return the data and the [`Stat`] of the node at the given `path`, or `None` if it does not
    /// exist.
    ///
    /// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
    /// by any successful operation that sets the node's data, or deletes it, and in turn causes
    /// the included `oneshot::Receiver` to resolve.
    pub async fn get_data(
        &self,
        path: &str,
    ) -> Result<Option<(oneshot::Receiver<WatchedEvent>, Vec<u8>, Stat)>, Error> {
        let (tx, rx) = oneshot::channel();
        self.0
            .get_data_w(path, Watch::Custom(tx))
            .await
            .map(|r| (r.map(move |(b, s)| (rx, b, s))))
    }
}

/// Proxy for [`ZooKeeper`] that batches operations into an atomic "multi" request.
#[derive(Debug)]
pub struct MultiBuilder<'a> {
    zk: &'a ZooKeeper,
    requests: Vec<proto::Request>,
}

impl<'a> MultiBuilder<'a> {
    /// Attach a create operation to this multi request.
    ///
    /// See [`ZooKeeper::create`] for details.
    pub fn create<D, A>(mut self, path: &str, data: D, acl: A, mode: CreateMode) -> Self
    where
        D: Into<Cow<'static, [u8]>>,
        A: Into<Cow<'static, [Acl]>>,
    {
        self.requests.push(proto::Request::Create {
            path: path.to_string(),
            data: data.into(),
            acl: acl.into(),
            mode,
        });
        self
    }

    /// Attach a set data operation to this multi request.
    ///
    /// See [`ZooKeeper::set_data`] for details.
    pub fn set_data<D>(mut self, path: &str, version: Option<i32>, data: D) -> Self
    where
        D: Into<Cow<'static, [u8]>>,
    {
        self.requests.push(proto::Request::SetData {
            path: path.to_string(),
            version: version.unwrap_or(-1),
            data: data.into(),
        });
        self
    }

    /// Attach a delete operation to this multi request.
    ///
    /// See [`ZooKeeper::delete`] for details.
    pub fn delete(mut self, path: &str, version: Option<i32>) -> Self {
        self.requests.push(proto::Request::Delete {
            path: path.to_string(),
            version: version.unwrap_or(-1),
        });
        self
    }

    /// Attach a check operation to this multi request.
    ///
    /// There is no equivalent to the check operation outside of a multi
    /// request.
    pub fn check(mut self, path: &str, version: i32) -> Self {
        self.requests.push(proto::Request::Check {
            path: path.to_string(),
            version,
        });
        self
    }

    /// Execute the attached requests as one atomic unit.
    pub async fn run(self) -> Result<Vec<Result<MultiResponse, error::Multi>>, Error> {
        let (zk, requests) = (self.zk, self.requests);
        let reqs_lite: Vec<transform::RequestMarker> = requests.iter().map(|r| r.into()).collect();
        zk.connection
            .enqueue(proto::Request::Multi(requests))
            .await
            .and_then(move |r| match r {
                Ok(proto::Response::Multi(responses)) => reqs_lite
                    .iter()
                    .zip(responses)
                    .map(|(req, res)| transform::multi(req, res))
                    .collect(),
                Ok(r) => bail!("got non-multi response to multi: {:?}", r),
                Err(e) => Err(format_err!("multi call failed: {:?}", e)),
            })
    }
}

750#[cfg(test)]
751mod tests {
752
753 use super::*;
754
755 use futures::StreamExt;
756 use tracing::Level;
757
758 fn init_tracing_subscriber() {
759 let _ = tracing_subscriber::fmt()
760 .with_max_level(Level::DEBUG)
761 .try_init();
762 }
763
764 #[tokio::test]
    async fn it_works() {
        init_tracing_subscriber();
        let builder = ZooKeeperBuilder::default();

        let (zk, w) = builder
            .connect(&"127.0.0.1:2181".parse().unwrap())
            .await
            .unwrap();
        // `with_watcher()` hands back a dedicated watcher for this one call,
        // while `watch()` delivers its event on the default watcher stream `w`.
        let (exists_w, stat) = zk.with_watcher().exists("/foo").await.unwrap();
        assert_eq!(stat, None);
        let stat = zk.watch().exists("/foo").await.unwrap();
        assert_eq!(stat, None);
        let path = zk
            .create(
                "/foo",
                &b"Hello world"[..],
                Acl::open_unsafe(),
                CreateMode::Persistent,
            )
            .await
            .unwrap();
        assert_eq!(path.as_deref(), Ok("/foo"));
        let event = exists_w.await.expect("exists_w failed");
        assert_eq!(
            event,
            WatchedEvent {
                event_type: WatchedEventType::NodeCreated,
                keeper_state: KeeperState::SyncConnected,
                path: String::from("/foo"),
            }
        );
        let stat = zk.watch().exists("/foo").await.unwrap();
        assert_eq!(stat.unwrap().data_length as usize, b"Hello world".len());
        let res = zk.get_acl("/foo").await.unwrap();
        let (acl, _) = res.unwrap();
        assert_eq!(acl, Acl::open_unsafe());
        let res = zk.get_data("/foo").await.unwrap();
        let data = b"Hello world";
        let res = res.unwrap();
        assert_eq!(res.0, data);
        assert_eq!(res.1.data_length as usize, data.len());
        let stat = zk
            .set_data("/foo", Some(res.1.version), &b"Bye world"[..])
            .await
            .unwrap();
        assert_eq!(stat.unwrap().data_length as usize, "Bye world".len());
        let res = zk.get_data("/foo").await.unwrap();
        let data = b"Bye world";
        let res = res.unwrap();
        assert_eq!(res.0, data);
        assert_eq!(res.1.data_length as usize, data.len());
        let path = zk
            .create(
                "/foo/bar",
                &b"Hello bar"[..],
                Acl::open_unsafe(),
                CreateMode::Persistent,
            )
            .await
            .unwrap();
        assert_eq!(path.as_deref(), Ok("/foo/bar"));
        let children = zk.get_children("/foo").await.unwrap();
        assert_eq!(children, Some(vec!["bar".to_string()]));
        let res = zk.get_data("/foo/bar").await.unwrap();
        let data = b"Hello bar";
        let res = res.unwrap();
        assert_eq!(res.0, data);
        assert_eq!(res.1.data_length as usize, data.len());
        // add a new exists watch so we'll get notified of delete
        let _ = zk.watch().exists("/foo").await.unwrap();
        let res = zk.delete("/foo", None).await.unwrap();
        assert_eq!(res, Err(error::Delete::NotEmpty));
        let res = zk.delete("/foo/bar", None).await.unwrap();
        assert_eq!(res, Ok(()));
        let res = zk.delete("/foo", None).await.unwrap();
        assert_eq!(res, Ok(()));
        let stat = zk.watch().exists("/foo").await.unwrap();
        assert_eq!(stat, None);
        // the default watcher should have seen one event per triggered
        // `watch()` call, in order: create, data change, delete.
        let (event, w) = w.into_future().await;
        assert_eq!(
            event,
            Some(WatchedEvent {
                event_type: WatchedEventType::NodeCreated,
                keeper_state: KeeperState::SyncConnected,
                path: String::from("/foo"),
            })
        );
        let (event, w) = w.into_future().await;
        assert_eq!(
            event,
            Some(WatchedEvent {
                event_type: WatchedEventType::NodeDataChanged,
                keeper_state: KeeperState::SyncConnected,
                path: String::from("/foo"),
            })
        );
        let (event, w) = w.into_future().await;
        assert_eq!(
            event,
            Some(WatchedEvent {
                event_type: WatchedEventType::NodeDeleted,
                keeper_state: KeeperState::SyncConnected,
                path: String::from("/foo"),
            })
        );

        drop(zk); // make Packetizer idle
        assert_eq!(w.count().await, 0);
    }

    #[tokio::test]
    async fn example() {
        let (zk, default_watcher) = ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
            .await
            .unwrap();

        // let's first check if /example exists. the .watch() causes us to be notified
        // the next time the "exists" status of /example changes after the call.
        let stat = zk.watch().exists("/example").await.unwrap();
        // initially, /example does not exist
        assert_eq!(stat, None);
        // so let's make it!
        let path = zk
            .create(
                "/example",
                &b"Hello world"[..],
                Acl::open_unsafe(),
                CreateMode::Persistent,
            )
            .await
            .unwrap();
        assert_eq!(path.as_deref(), Ok("/example"));

        // does it exist now?
        let stat = zk.watch().exists("/example").await.unwrap();
        // looks like it!
        // note that the creation above also triggered our "exists" watch!
        assert_eq!(stat.unwrap().data_length as usize, b"Hello world".len());

        // did the data get set correctly?
        let res = zk.get_data("/example").await.unwrap();
        let data = b"Hello world";
        let res = res.unwrap();
        assert_eq!(res.0, data);
        assert_eq!(res.1.data_length as usize, data.len());

        // let's update the data.
        let stat = zk
            .set_data("/example", Some(res.1.version), &b"Bye world"[..])
            .await
            .unwrap();
        assert_eq!(stat.unwrap().data_length as usize, "Bye world".len());

        // create a child of /example
        let path = zk
            .create(
                "/example/more",
                &b"Hello more"[..],
                Acl::open_unsafe(),
                CreateMode::Persistent,
            )
            .await
            .unwrap();
        assert_eq!(path.as_deref(), Ok("/example/more"));

        // it should be visible as a child of /example
        let children = zk.get_children("/example").await.unwrap();
        assert_eq!(children, Some(vec!["more".to_string()]));

        // it is not legal to directly delete a node that has children
        let res = zk.delete("/example", None).await.unwrap();
        assert_eq!(res, Err(error::Delete::NotEmpty));
        // instead we must delete the children first
        let res = zk.delete("/example/more", None).await.unwrap();
        assert_eq!(res, Ok(()));
        let res = zk.delete("/example", None).await.unwrap();
        assert_eq!(res, Ok(()));
        // now /example should no longer exist!
        let stat = zk.exists("/example").await.unwrap();
        assert_eq!(stat, None);

        // now let's check that the .watch().exists we did in the very
        // beginning actually triggered!
        let (event, _w) = default_watcher.into_future().await;
        assert_eq!(
            event,
            Some(WatchedEvent {
                event_type: WatchedEventType::NodeCreated,
                keeper_state: KeeperState::SyncConnected,
                path: String::from("/example"),
            })
        );
    }

    #[tokio::test]
    async fn acl_test() {
        init_tracing_subscriber();
        let builder = ZooKeeperBuilder::default();

        let (zk, _) = builder
            .connect(&"127.0.0.1:2181".parse().unwrap())
            .await
            .unwrap();
        let _ = zk
            .create(
                "/acl_test",
                &b"foo"[..],
                Acl::open_unsafe(),
                CreateMode::Ephemeral,
            )
            .await
            .unwrap();

        let res = zk.get_acl("/acl_test").await.unwrap();
        let res = res.unwrap();
        assert_eq!(res.0, Acl::open_unsafe());

        let res = zk
            .set_acl("/acl_test", Acl::creator_all(), Some(res.1.version))
            .await
            .unwrap();
        // an unauthenticated user cannot set ACLs that use the `auth` scheme.
        assert_eq!(res, Err(error::SetAcl::InvalidAcl));

        let stat = zk
            .set_acl("/acl_test", Acl::read_unsafe(), None)
            .await
            .unwrap();
        // successfully changed the node's ACL to `read_unsafe`
        assert_eq!(stat.unwrap().data_length as usize, b"foo".len());

        let res = zk.get_acl("/acl_test").await.unwrap();
        let res = res.unwrap();
        assert_eq!(res.0, Acl::read_unsafe());

        let res = zk.set_data("/acl_test", None, &b"bar"[..]).await.unwrap();
        // cannot set data on a read-only node
        assert_eq!(res, Err(error::SetData::NoAuth));

        let res = zk
            .set_acl("/acl_test", Acl::open_unsafe(), None)
            .await
            .unwrap();
        // cannot change a read-only node's ACL
        assert_eq!(res, Err(error::SetAcl::NoAuth));

        drop(zk); // make Packetizer idle
    }

    #[tokio::test]
    async fn multi_test() {
        init_tracing_subscriber();
        let builder = ZooKeeperBuilder::default();

        // report, for each path, whether the node currently exists
        async fn check_exists(zk: &ZooKeeper, paths: &[&str]) -> Result<Vec<bool>, Error> {
            let mut res = Vec::new();
            for p in paths {
                let exists = zk.exists(p).await?;
                res.push(exists.is_some());
            }
            Ok(res)
        }

        let (zk, _) = builder
            .connect(&"127.0.0.1:2181".parse().unwrap())
            .await
            .unwrap();

        let res = zk
            .multi()
            .create("/b", &b"a"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .create("/c", &b"b"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .run()
            .await
            .unwrap();
        assert_eq!(
            res,
            [
                Ok(MultiResponse::Create("/b".into())),
                Ok(MultiResponse::Create("/c".into()))
            ]
        );

        let res = check_exists(&zk, &["/a", "/b", "/c", "/d"]).await.unwrap();
        assert_eq!(res, &[false, true, true, false]);

        // creating /b fails because it already exists, so the whole
        // transaction fails and none of the four nodes is created.
        let res = zk
            .multi()
            .create("/a", &b"a"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .create("/b", &b"b"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .create("/c", &b"b"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .create("/d", &b"a"[..], Acl::open_unsafe(), CreateMode::Persistent)
            .run()
            .await
            .unwrap();
        assert_eq!(
            res,
            &[
                Err(error::Multi::RolledBack),
                Err(error::Multi::Create {
                    source: error::Create::NodeExists
                }),
                Err(error::Multi::Skipped),
                Err(error::Multi::Skipped),
            ]
        );

        let res = check_exists(&zk, &["/a", "/b", "/c", "/d"]).await.unwrap();
        assert_eq!(res, &[false, true, true, false]);

        let res = zk
            .multi()
            .set_data("/b", None, &b"garbaggio"[..])
            .run()
            .await
            .unwrap();
        match res[0] {
            Ok(MultiResponse::SetData(stat)) => {
                assert_eq!(stat.data_length as usize, "garbaggio".len())
            }
            _ => panic!("unexpected response: {res:?}"),
        }

        // the set_data above bumped /b's version, so checking for
        // version 0 fails and the delete of /c is skipped.
        let res = zk
            .multi()
            .check("/b", 0)
            .delete("/c", None)
            .run()
            .await
            .unwrap();
        assert_eq!(
            res,
            [
                Err(error::Multi::Check {
                    source: error::Check::BadVersion { expected: 0 }
                }),
                Err(error::Multi::Skipped),
            ]
        );

        let res = check_exists(&zk, &["/a", "/b", "/c", "/d"]).await.unwrap();
        assert_eq!(res, &[false, true, true, false]);
        let res = zk.multi().check("/a", 0).run().await.unwrap();
        assert_eq!(
            res,
            &[Err(error::Multi::Check {
                source: error::Check::NoNode
            }),]
        );

        // with the correct versions, both checks pass and both nodes are deleted
        let res = zk
            .multi()
            .check("/b", 1)
            .delete("/b", None)
            .check("/c", 0)
            .delete("/c", None)
            .run()
            .await
            .unwrap();
        assert_eq!(
            res,
            [
                Ok(MultiResponse::Check),
                Ok(MultiResponse::Delete),
                Ok(MultiResponse::Check),
                Ok(MultiResponse::Delete),
            ]
        );

        let res = check_exists(&zk, &["/a", "/b", "/c", "/d"]).await.unwrap();
        assert_eq!(res, [false, false, false, false]);

        drop(zk); // make Packetizer idle
    }
}