zookeeper_async/
zookeeper.rs

1use std::convert::From;
2use std::fmt::{Debug, Formatter, Result as FmtResult};
3use std::net::{SocketAddr, ToSocketAddrs};
4use std::result;
5use std::string::ToString;
6use std::sync::atomic::{AtomicIsize, Ordering};
7use std::time::Duration;
8use tokio::sync::mpsc::Sender;
9use tokio::sync::oneshot::{channel, Sender as OneshotSender};
10use tokio::sync::Mutex;
11use tracing::*;
12
13use crate::io::ZkIo;
14use crate::listeners::ListenerSet;
15use crate::proto::{
16    to_len_prefixed_buf, AuthRequest, ByteBuf, Create2Response, CreateRequest, CreateResponse,
17    CreateTTLRequest, DeleteRequest, EmptyRequest, EmptyResponse, ExistsRequest, ExistsResponse,
18    GetAclRequest, GetAclResponse, GetChildrenRequest, GetChildrenResponse, GetDataRequest,
19    GetDataResponse, OpCode, ReadFrom, ReplyHeader, RequestHeader, SetAclRequest, SetAclResponse,
20    SetDataRequest, SetDataResponse, WriteTo,
21};
22use crate::watch::ZkWatch;
23use crate::{
24    Acl, CreateMode, Stat, Subscription, Watch, WatchType, WatchedEvent, Watcher, ZkError, ZkState,
25};
26
27/// Value returned from potentially-error operations.
28pub type ZkResult<T> = result::Result<T, ZkError>;
29
30pub struct RawRequest {
31    pub opcode: OpCode,
32    pub data: ByteBuf,
33    pub listener: Option<OneshotSender<RawResponse>>,
34    pub watch: Option<Watch>,
35}
36
37impl Debug for RawRequest {
38    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
39        f.debug_struct("RawRequest")
40            .field("opcode", &self.opcode)
41            .field("data", &self.data)
42            .finish()
43    }
44}
45
46#[derive(Debug)]
47pub struct RawResponse {
48    pub header: ReplyHeader,
49    pub data: ByteBuf,
50}
51
52/// The client interface for interacting with a ZooKeeper cluster.
53pub struct ZooKeeper {
54    chroot: Option<String>,
55    xid: AtomicIsize,
56    io: Mutex<Sender<RawRequest>>,
57    listeners: ListenerSet<ZkState>,
58}
59
60impl ZooKeeper {
61    /// Connect to a ZooKeeper cluster.
62    ///
63    /// - `connect_string`: comma separated host:port pairs, each corresponding to a zk server,
64    ///   e.g. `"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"` If the optional chroot suffix is
65    ///   used the example would look like: `"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"`
66    ///   where the client would be rooted at `"/app/a"` and all paths would be relative to this
67    ///   root -- ie getting/setting/etc...`"/foo/bar"` would result in operations being run on
68    ///   `"/app/a/foo/bar"` (from the server perspective).
69    /// - `timeout`: session timeout -- how long should a client go without receiving communication
70    ///   from a server before considering it connection loss?
71    /// - `watcher`: a watcher object to be notified of connection state changes.
72    /// - `retry_time`: when the connection is lost, reconnect for a longer time to avoid
73    ///   reconnecting too quickly
74    pub async fn connect_with_retry_time<W>(
75        connect_string: &str,
76        timeout: Duration,
77        watcher: W,
78        retry_time: Duration,
79    ) -> ZkResult<ZooKeeper>
80    where
81        W: Watcher + 'static,
82    {
83        let (addrs, chroot) = Self::parse_connect_string(connect_string)?;
84
85        debug!("Initiating connection to {}", connect_string);
86
87        let (watch, watch_sender) = ZkWatch::new(watcher, chroot.clone());
88        let listeners = ListenerSet::<ZkState>::new();
89        let listeners1 = listeners.clone();
90        let io = ZkIo::new(addrs.clone(), timeout, retry_time, watch_sender, listeners1).await;
91        let sender = io.sender();
92
93        tokio::spawn(watch.run());
94
95        tokio::spawn(io.run());
96
97        trace!("Returning a ZooKeeper");
98
99        Ok(ZooKeeper {
100            chroot,
101            xid: AtomicIsize::new(1),
102            io: Mutex::new(sender),
103            listeners,
104        })
105    }
106
107    pub async fn connect<W>(
108        connect_string: &str,
109        timeout: Duration,
110        watcher: W,
111    ) -> ZkResult<ZooKeeper>
112    where
113        W: Watcher + 'static,
114    {
115        Self::connect_with_retry_time(connect_string, timeout, watcher, Duration::from_secs(0))
116            .await
117    }
118
119    fn parse_connect_string(connect_string: &str) -> ZkResult<(Vec<SocketAddr>, Option<String>)> {
120        let (chroot, end) = match connect_string.find('/') {
121            Some(start) => match &connect_string[start..connect_string.len()] {
122                "" | "/" => (None, start),
123                chroot => (Some(Self::validate_path(chroot)?.to_owned()), start),
124            },
125            None => (None, connect_string.len()),
126        };
127
128        let mut addrs = Vec::new();
129        for addr_str in connect_string[..end].split(',') {
130            let addr = match addr_str.trim().to_socket_addrs() {
131                Ok(mut addrs) => match addrs.next() {
132                    Some(addr) => addr,
133                    None => return Err(ZkError::BadArguments),
134                },
135                Err(_) => return Err(ZkError::BadArguments),
136            };
137            addrs.push(addr);
138        }
139
140        Ok((addrs, chroot))
141    }
142
143    fn xid(&self) -> i32 {
144        self.xid.fetch_add(1, Ordering::Relaxed) as i32
145    }
146
147    async fn request<Req: WriteTo, Resp: ReadFrom>(
148        &self,
149        opcode: OpCode,
150        xid: i32,
151        req: Req,
152        watch: Option<Watch>,
153    ) -> ZkResult<Resp> {
154        trace!("request opcode={:?} xid={:?}", opcode, xid);
155        let rh = RequestHeader { xid, opcode };
156        let buf = to_len_prefixed_buf(rh, req).map_err(|_| ZkError::MarshallingError)?;
157
158        let (resp_tx, resp_rx) = channel();
159        let request = RawRequest {
160            opcode,
161            data: buf,
162            listener: Some(resp_tx),
163            watch,
164        };
165
166        self.io.lock().await.send(request).await.map_err(|_| {
167            warn!("error sending request");
168            ZkError::ConnectionLoss
169        })?;
170
171        let mut response = resp_rx.await.map_err(|err| {
172            warn!("error receiving response: {:?}", err);
173            ZkError::ConnectionLoss
174        })?;
175
176        match response.header.err {
177            0 => Ok(ReadFrom::read_from(&mut response.data).map_err(|_| ZkError::MarshallingError)?),
178            e => Err(ZkError::from(e)),
179        }
180    }
181
182    fn validate_path(path: &str) -> ZkResult<&str> {
183        match path {
184            "" => Err(ZkError::BadArguments),
185            path => {
186                if path.len() > 1 && path.ends_with('/') {
187                    Err(ZkError::BadArguments)
188                } else {
189                    Ok(path)
190                }
191            }
192        }
193    }
194
195    fn path(&self, path: &str) -> ZkResult<String> {
196        match self.chroot {
197            Some(ref chroot) => match path {
198                "/" => Ok(chroot.clone()),
199                path => Ok(chroot.clone() + Self::validate_path(path)?),
200            },
201            None => Ok(Self::validate_path(path)?.to_owned()),
202        }
203    }
204
205    fn cut_chroot(&self, path: String) -> String {
206        if let Some(ref chroot) = self.chroot {
207            path[chroot.len()..].to_owned()
208        } else {
209            path
210        }
211    }
212
213    /// Add the specified `scheme`:`auth` information to this connection.
214    ///
215    /// See `Acl` for more information.
216    pub async fn add_auth<S: ToString>(&self, scheme: S, auth: Vec<u8>) -> ZkResult<()> {
217        trace!("ZooKeeper::add_auth");
218        let req = AuthRequest {
219            typ: 0,
220            scheme: scheme.to_string(),
221            auth,
222        };
223
224        let _: EmptyResponse = self.request(OpCode::Auth, -4, req, None).await?;
225
226        Ok(())
227    }
228
229    /// Create a node with the given `path`. The node data will be the given `data`, and node ACL
230    /// will be the given `acl`. The `mode` argument specifies the behavior of the created node (see
231    /// `CreateMode` for more information).
232    ///
233    /// This operation, if successful, will trigger all the watches left on the node of the given
234    /// path by `exists` and `get_data` API calls, and the watches left on the parent node by
235    /// `get_children` API calls.
236    ///
237    /// # Errors
238    /// If a node with the same actual path already exists in the ZooKeeper, the result will have
239    /// `Err(ZkError::NodeExists)`. Note that since a different actual path is used for each
240    /// invocation of creating sequential node with the same path argument, the call should never
241    /// error in this manner.
242    ///
243    /// If the parent node does not exist in the ZooKeeper, `Err(ZkError::NoNode)` will be returned.
244    ///
245    /// An ephemeral node cannot have children. If the parent node of the given path is ephemeral,
246    /// `Err(ZkError::NoChildrenForEphemerals)` will be returned.
247    ///
248    /// If the `acl` is invalid or empty, `Err(ZkError::InvalidACL)` is returned.
249    ///
250    /// The maximum allowable size of the data array is 1 MiB (1,048,576 bytes). Arrays larger than
251    /// this will return `Err(ZkError::BadArguments)`.
252    pub async fn create(
253        &self,
254        path: &str,
255        data: Vec<u8>,
256        acl: Vec<Acl>,
257        mode: CreateMode,
258    ) -> ZkResult<String> {
259        trace!("ZooKeeper::create");
260        let req = CreateRequest {
261            path: self.path(path)?,
262            data,
263            acl,
264            flags: mode as i32,
265        };
266
267        let response: CreateResponse = self.request(OpCode::Create, self.xid(), req, None).await?;
268
269        Ok(self.cut_chroot(response.path))
270    }
271
272    /// Create a node with the given `path`. The node data will be the given `data`, and node ACL
273    /// will be the given `acl`. The `mode` argument specifies the behavior of the created node (see
274    /// `CreateMode` for more information).
275    /// The `ttl` argument specifies the time to live of the created node.
276    ///
277    /// This operation, if successful, will trigger all the watches left on the node of the given
278    /// path by `exists` and `get_data` API calls, and the watches left on the parent node by
279    /// `get_children` API calls.
280    ///
281    /// # Errors
282    /// If a node with the same actual path already exists in the ZooKeeper, the result will have
283    /// `Err(ZkError::NodeExists)`. Note that since a different actual path is used for each
284    /// invocation of creating sequential node with the same path argument, the call should never
285    /// error in this manner.
286    ///
287    /// If the parent node does not exist in the ZooKeeper, `Err(ZkError::NoNode)` will be returned.
288    ///
289    /// An ephemeral node cannot have children. If the parent node of the given path is ephemeral,
290    /// `Err(ZkError::NoChildrenForEphemerals)` will be returned.
291    ///
292    /// If the `acl` is invalid or empty, `Err(ZkError::InvalidACL)` is returned.
293    ///
294    /// If the `CreateTtl` opcode is not supported by the server, `Err(ZkError::Unimplemented)` is returned.
295    ///
296    /// The maximum allowable size of the data array is 1 MiB (1,048,576 bytes). Arrays larger than
297    /// this will return `Err(ZkError::BadArguments)`.
298    pub async fn create_ttl(
299        &self,
300        path: &str,
301        data: Vec<u8>,
302        acl: Vec<Acl>,
303        mode: CreateMode,
304        ttl: Duration,
305    ) -> ZkResult<String> {
306        trace!("ZooKeeper::create_ttl");
307        let req = CreateTTLRequest {
308            path: self.path(path)?,
309            data,
310            acl,
311            flags: mode as i32,
312            ttl: ttl.as_millis() as i64,
313        };
314
315        let response: CreateResponse = self
316            .request(OpCode::CreateTtl, self.xid(), req, None)
317            .await?;
318
319        Ok(self.cut_chroot(response.path))
320    }
321
322    /// Create a node with the given `path`. The node data will be the given `data`, and node ACL
323    /// will be the given `acl`. The `mode` argument specifies the behavior of the created node (see
324    /// `CreateMode` for more information).
325    ///
326    /// This operation, if successful, will trigger all the watches left on the node of the given
327    /// path by `exists` and `get_data` API calls, and the watches left on the parent node by
328    /// `get_children` API calls.
329    ///
330    /// # Errors
331    /// If a node with the same actual path already exists in the ZooKeeper, the result will have
332    /// `Err(ZkError::NodeExists)`. Note that since a different actual path is used for each
333    /// invocation of creating sequential node with the same path argument, the call should never
334    /// error in this manner.
335    ///
336    /// If the parent node does not exist in the ZooKeeper, `Err(ZkError::NoNode)` will be returned.
337    ///
338    /// An ephemeral node cannot have children. If the parent node of the given path is ephemeral,
339    /// `Err(ZkError::NoChildrenForEphemerals)` will be returned.
340    ///
341    /// If the `acl` is invalid or empty, `Err(ZkError::InvalidACL)` is returned.
342    ///
343    /// If the `Create2` opcode is not supported by the server, `Err(ZkError::Unimplemented)` is returned.
344    ///
345    /// The maximum allowable size of the data array is 1 MiB (1,048,576 bytes). Arrays larger than
346    /// this will return `Err(ZkError::BadArguments)`.
347    pub async fn create2(
348        &self,
349        path: &str,
350        data: Vec<u8>,
351        acl: Vec<Acl>,
352        mode: CreateMode,
353    ) -> ZkResult<(String, Stat)> {
354        trace!("ZooKeeper::create2");
355        let req = CreateRequest {
356            path: self.path(path)?,
357            data,
358            acl,
359            flags: mode as i32,
360        };
361        let response: Create2Response =
362            self.request(OpCode::Create2, self.xid(), req, None).await?;
363
364        Ok((self.cut_chroot(response.path), response.stat))
365    }
366
367    /// Create a node with the given `path`. The node data will be the given `data`, and node ACL
368    /// will be the given `acl`. The `mode` argument specifies the behavior of the created node (see
369    /// `CreateMode` for more information).
370    /// The `ttl` argument specifies the time to live of the created node.
371    ///
372    /// This operation, if successful, will trigger all the watches left on the node of the given
373    /// path by `exists` and `get_data` API calls, and the watches left on the parent node by
374    /// `get_children` API calls.
375    ///
376    /// # Errors
377    /// If a node with the same actual path already exists in the ZooKeeper, the result will have
378    /// `Err(ZkError::NodeExists)`. Note that since a different actual path is used for each
379    /// invocation of creating sequential node with the same path argument, the call should never
380    /// error in this manner.
381    ///
382    /// If the parent node does not exist in the ZooKeeper, `Err(ZkError::NoNode)` will be returned.
383    ///
384    /// An ephemeral node cannot have children. If the parent node of the given path is ephemeral,
385    /// `Err(ZkError::NoChildrenForEphemerals)` will be returned.
386    ///
387    /// If the `acl` is invalid or empty, `Err(ZkError::InvalidACL)` is returned.
388    ///
389    /// If the `CreateTtl` opcode is not supported by the server, `Err(ZkError::Unimplemented)` is returned.
390    ///
391    /// The maximum allowable size of the data array is 1 MiB (1,048,576 bytes). Arrays larger than
392    /// this will return `Err(ZkError::BadArguments)`.
393    pub async fn create2_ttl(
394        &self,
395        path: &str,
396        data: Vec<u8>,
397        acl: Vec<Acl>,
398        mode: CreateMode,
399        ttl: Duration,
400    ) -> ZkResult<(String, Stat)> {
401        trace!("ZooKeeper::create2_ttl");
402        let req = CreateTTLRequest {
403            path: self.path(path)?,
404            data,
405            acl,
406            flags: mode as i32,
407            ttl: ttl.as_millis() as i64,
408        };
409
410        let response: Create2Response = self
411            .request(OpCode::CreateTtl, self.xid(), req, None)
412            .await?;
413
414        Ok((self.cut_chroot(response.path), response.stat))
415    }
416
417    /// Delete the node with the given `path`. The call will succeed if such a node exists, and the
418    /// given `version` matches the node's version (if the given version is `None`, it matches any
419    /// node's versions).
420    ///
421    /// This operation, if successful, will trigger all the watches on the node of the given path
422    /// left by `exists` API calls, watches left by `get_data` API calls, and the watches on the
423    /// parent node left by `get_children` API calls.
424    ///
425    /// # Errors
426    /// If the nodes does not exist, `Err(ZkError::NoNode)` will be returned.
427    ///
428    /// If the given `version` does not match the node's version, `Err(ZkError::BadVersion)` will be
429    /// returned.
430    ///
431    /// If the node has children, `Err(ZkError::NotEmpty)` will be returned.
432    pub async fn delete(&self, path: &str, version: Option<i32>) -> ZkResult<()> {
433        trace!("ZooKeeper::delete");
434        let req = DeleteRequest {
435            path: self.path(path)?,
436            version: version.unwrap_or(-1),
437        };
438
439        let _: EmptyResponse = self.request(OpCode::Delete, self.xid(), req, None).await?;
440
441        Ok(())
442    }
443
444    /// Return the `Stat` of the node of the given `path` or `None` if no such node exists.
445    ///
446    /// If the `watch` is `true` and the call is successful (no error is returned), a watch will be
447    /// left on the node with the given path. The watch will be triggered by a successful operation
448    /// that creates/delete the node or sets the data on the node.
449    pub async fn exists(&self, path: &str, watch: bool) -> ZkResult<Option<Stat>> {
450        trace!("ZooKeeper::exists");
451        let req = ExistsRequest {
452            path: self.path(path)?,
453            watch,
454        };
455
456        match self
457            .request::<ExistsRequest, ExistsResponse>(OpCode::Exists, self.xid(), req, None)
458            .await
459        {
460            Ok(response) => Ok(Some(response.stat)),
461            Err(ZkError::NoNode) => Ok(None),
462            Err(e) => Err(e),
463        }
464    }
465
466    /// Return the `Stat` of the node of the given `path` or `None` if no such node exists.
467    ///
468    /// Similar to `exists`, but sets an explicit watcher instead of relying on the client's base
469    /// `Watcher`.
470    pub async fn exists_w<W: FnOnce(WatchedEvent) + Send + 'static>(
471        &self,
472        path: &str,
473        watcher: W,
474    ) -> ZkResult<Option<Stat>> {
475        trace!("ZooKeeper::exists_w");
476        let req = ExistsRequest {
477            path: self.path(path)?,
478            watch: true,
479        };
480
481        let watch = Watch {
482            path: path.to_owned(),
483            watch_type: WatchType::Exist,
484            watcher: Box::new(watcher),
485        };
486
487        match self
488            .request::<ExistsRequest, ExistsResponse>(OpCode::Exists, self.xid(), req, Some(watch))
489            .await
490        {
491            Ok(response) => Ok(Some(response.stat)),
492            Err(ZkError::NoNode) => Ok(None),
493            Err(e) => Err(e),
494        }
495    }
496
497    /// Return the ACL and `Stat` of the node of the given path.
498    ///
499    /// # Errors
500    /// If no node with the given path exists, `Err(ZkError::NoNode)` will be returned.
501    pub async fn get_acl(&self, path: &str) -> ZkResult<(Vec<Acl>, Stat)> {
502        trace!("ZooKeeper::get_acl");
503        let req = GetAclRequest {
504            path: self.path(path)?,
505        };
506
507        let response: GetAclResponse = self.request(OpCode::GetAcl, self.xid(), req, None).await?;
508
509        Ok(response.acl_stat)
510    }
511
512    /// Set the ACL for the node of the given path if such a node exists and the given version
513    /// matches the version of the node. Return the `Stat` of the node.
514    ///
515    /// # Errors
516    /// If no node with the given path exists, `Err(ZkError::NoNode)` will be returned.
517    ///
518    /// If the given version does not match the node's version, `Err(ZkError::BadVersion)` will be
519    /// returned.
520    pub async fn set_acl(&self, path: &str, acl: Vec<Acl>, version: Option<i32>) -> ZkResult<Stat> {
521        trace!("ZooKeeper::set_acl");
522        let req = SetAclRequest {
523            path: self.path(path)?,
524            acl,
525            version: version.unwrap_or(-1),
526        };
527
528        let response: SetAclResponse = self.request(OpCode::SetAcl, self.xid(), req, None).await?;
529
530        Ok(response.stat)
531    }
532
533    /// Return the list of the children of the node of the given `path`. The returned values are not
534    /// prefixed with the provided `path`; i.e. if the database contains `/path/a` and `/path/b`,
535    /// the result of `get_children` for `"/path"` will be `["a", "b"]`.
536    ///
537    /// If the `watch` is `true` and the call is successful (no error is returned), a watch will be
538    /// left on the node with the given path. The watch will be triggered by a successful operation
539    /// that deletes the node of the given path or creates/delete a child under the node.
540    ///
541    /// The list of children returned is not sorted and no guarantee is provided as to its natural
542    /// or lexical order.
543    ///
544    /// # Errors
545    /// If no node with the given path exists, `Err(ZkError::NoNode)` will be returned.
546    pub async fn get_children(&self, path: &str, watch: bool) -> ZkResult<Vec<String>> {
547        trace!("ZooKeeper::get_children");
548        let req = GetChildrenRequest {
549            path: self.path(path)?,
550            watch,
551        };
552
553        let response: GetChildrenResponse = self
554            .request(OpCode::GetChildren, self.xid(), req, None)
555            .await?;
556
557        Ok(response.children)
558    }
559
560    /// Return the list of the children of the node of the given `path`.
561    ///
562    /// Similar to `get_children`, but sets an explicit watcher instead of relying on the client's
563    /// base `Watcher`.
564    pub async fn get_children_w<W: FnOnce(WatchedEvent) + Send + 'static>(
565        &self,
566        path: &str,
567        watcher: W,
568    ) -> ZkResult<Vec<String>> {
569        trace!("ZooKeeper::get_children_w");
570        let req = GetChildrenRequest {
571            path: self.path(path)?,
572            watch: true,
573        };
574
575        let watch = Watch {
576            path: path.to_owned(),
577            watch_type: WatchType::Child,
578            watcher: Box::new(watcher),
579        };
580
581        let response: GetChildrenResponse = self
582            .request(OpCode::GetChildren, self.xid(), req, Some(watch))
583            .await?;
584
585        Ok(response.children)
586    }
587
588    /// Return the data and the `Stat` of the node of the given path.
589    ///
590    /// If `watch` is `true` and the call is successful (no error is returned), a watch will be left
591    /// on the node with the given path. The watch will be triggered by a successful operation that
592    /// sets data on the node, or deletes the node.
593    ///
594    /// # Errors
595    /// If no node with the given path exists, `Err(ZkError::NoNode)` will be returned.
596    pub async fn get_data(&self, path: &str, watch: bool) -> ZkResult<(Vec<u8>, Stat)> {
597        trace!("ZooKeeper::get_data");
598        let req = GetDataRequest {
599            path: self.path(path)?,
600            watch,
601        };
602
603        let response: GetDataResponse =
604            self.request(OpCode::GetData, self.xid(), req, None).await?;
605
606        Ok(response.data_stat)
607    }
608
609    /// Return the data and the `Stat` of the node of the given path.
610    ///
611    /// Similar to `get_data`, but sets an explicit watcher instead of relying on the client's
612    /// base `Watcher`.
613    pub async fn get_data_w<W: FnOnce(WatchedEvent) + Send + 'static>(
614        &self,
615        path: &str,
616        watcher: W,
617    ) -> ZkResult<(Vec<u8>, Stat)> {
618        trace!("ZooKeeper::get_data_w");
619        let req = GetDataRequest {
620            path: self.path(path)?,
621            watch: true,
622        };
623
624        let watch = Watch {
625            path: path.to_owned(),
626            watch_type: WatchType::Data,
627            watcher: Box::new(watcher),
628        };
629
630        let response: GetDataResponse = self
631            .request(OpCode::GetData, self.xid(), req, Some(watch))
632            .await?;
633
634        Ok(response.data_stat)
635    }
636
637    /// Set the data for the node of the given `path` if such a node exists and the given version
638    /// matches the version of the node (if the given version is `None`, it matches any node's
639    /// versions). Return the `Stat` of the node.
640    ///
641    /// This operation, if successful, will trigger all the watches on the node of the given `path`
642    /// left by `get_data` calls.
643    ///
644    /// # Errors
645    /// If no node with the given `path` exists, `Err(ZkError::NoNode)` will be returned.
646    ///
647    /// If the given version does not match the node's version, `Err(ZkError::BadVersion)` will be
648    /// returned.
649    ///
650    /// The maximum allowable size of the `data` array is 1 MiB (1,048,576 bytes). Arrays larger
651    /// than this will return `Err(ZkError::BadArguments)`.
652    pub async fn set_data(
653        &self,
654        path: &str,
655        data: Vec<u8>,
656        version: Option<i32>,
657    ) -> ZkResult<Stat> {
658        trace!("ZooKeeper::set_data");
659        let req = SetDataRequest {
660            path: self.path(path)?,
661            data,
662            version: version.unwrap_or(-1),
663        };
664
665        let response: SetDataResponse =
666            self.request(OpCode::SetData, self.xid(), req, None).await?;
667
668        Ok(response.stat)
669    }
670
671    /// Adds a state change `Listener`, which will be notified of changes to the client's `ZkState`.
672    /// A unique identifier is returned, which is used in `remove_listener` to un-subscribe.
673    pub fn add_listener<Listener: Fn(ZkState) + Send + 'static>(
674        &self,
675        listener: Listener,
676    ) -> Subscription {
677        trace!("ZooKeeper::add_listener");
678        self.listeners.subscribe(listener)
679    }
680
681    /// Removes a state change `Listener` and closes the channel.
682    pub fn remove_listener(&self, sub: Subscription) {
683        trace!("ZooKeeper::remove_listener");
684        self.listeners.unsubscribe(sub);
685    }
686
687    /// Close this client object. Once the client is closed, its session becomes invalid. All the
688    /// ephemeral nodes in the ZooKeeper server associated with the session will be removed. The
689    /// watches left on those nodes (and on their parents) will be triggered.
690    ///
691    /// **NOTE: Due to missing support for async drop at the moment, dropping self will not call
692    /// close.**
693    pub async fn close(&self) -> ZkResult<()> {
694        trace!("ZooKeeper::close");
695        let _: EmptyResponse = self
696            .request(OpCode::CloseSession, 0, EmptyRequest, None)
697            .await?;
698
699        Ok(())
700    }
701}
702
703#[cfg(test)]
704mod tests {
705    use super::ZooKeeper;
706
707    #[test]
708    fn parse_connect_string() {
709        use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
710
711        let (addrs, chroot) =
712            ZooKeeper::parse_connect_string("127.0.0.1:2181,::1:2181/mesos").expect("Parse 1");
713        assert_eq!(
714            addrs,
715            vec![
716                SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 2181)),
717                SocketAddr::V6(SocketAddrV6::new(
718                    Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1),
719                    2181,
720                    0,
721                    0
722                ))
723            ]
724        );
725        assert_eq!(chroot, Some("/mesos".to_owned()));
726
727        let (addrs, chroot) = ZooKeeper::parse_connect_string("::1:2181").expect("Parse 2");
728        assert_eq!(
729            addrs,
730            vec![SocketAddr::V6(SocketAddrV6::new(
731                Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1),
732                2181,
733                0,
734                0
735            ))]
736        );
737        assert_eq!(chroot, None);
738
739        let (addrs, chroot) = ZooKeeper::parse_connect_string("::1:2181/").expect("Parse 3");
740        assert_eq!(
741            addrs,
742            vec![SocketAddr::V6(SocketAddrV6::new(
743                Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1),
744                2181,
745                0,
746                0
747            ))]
748        );
749        assert_eq!(chroot, None);
750    }
751
752    #[test]
753    #[should_panic(expected = "BadArguments")]
754    fn parse_connect_string_fails() {
755        // This fails with ZooKeeper.java: Path must not end with / character
756        ZooKeeper::parse_connect_string("127.0.0.1:2181/mesos/").unwrap();
757    }
758}