zookeeper_async/zookeeper.rs
1use std::convert::From;
2use std::fmt::{Debug, Formatter, Result as FmtResult};
3use std::net::{SocketAddr, ToSocketAddrs};
4use std::result;
5use std::string::ToString;
6use std::sync::atomic::{AtomicIsize, Ordering};
7use std::time::Duration;
8use tokio::sync::mpsc::Sender;
9use tokio::sync::oneshot::{channel, Sender as OneshotSender};
10use tokio::sync::Mutex;
11use tracing::*;
12
13use crate::io::ZkIo;
14use crate::listeners::ListenerSet;
15use crate::proto::{
16 to_len_prefixed_buf, AuthRequest, ByteBuf, Create2Response, CreateRequest, CreateResponse,
17 CreateTTLRequest, DeleteRequest, EmptyRequest, EmptyResponse, ExistsRequest, ExistsResponse,
18 GetAclRequest, GetAclResponse, GetChildrenRequest, GetChildrenResponse, GetDataRequest,
19 GetDataResponse, OpCode, ReadFrom, ReplyHeader, RequestHeader, SetAclRequest, SetAclResponse,
20 SetDataRequest, SetDataResponse, WriteTo,
21};
22use crate::watch::ZkWatch;
23use crate::{
24 Acl, CreateMode, Stat, Subscription, Watch, WatchType, WatchedEvent, Watcher, ZkError, ZkState,
25};
26
27/// Value returned from potentially-error operations.
28pub type ZkResult<T> = result::Result<T, ZkError>;
29
30pub struct RawRequest {
31 pub opcode: OpCode,
32 pub data: ByteBuf,
33 pub listener: Option<OneshotSender<RawResponse>>,
34 pub watch: Option<Watch>,
35}
36
37impl Debug for RawRequest {
38 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
39 f.debug_struct("RawRequest")
40 .field("opcode", &self.opcode)
41 .field("data", &self.data)
42 .finish()
43 }
44}
45
46#[derive(Debug)]
47pub struct RawResponse {
48 pub header: ReplyHeader,
49 pub data: ByteBuf,
50}
51
52/// The client interface for interacting with a ZooKeeper cluster.
53pub struct ZooKeeper {
54 chroot: Option<String>,
55 xid: AtomicIsize,
56 io: Mutex<Sender<RawRequest>>,
57 listeners: ListenerSet<ZkState>,
58}
59
60impl ZooKeeper {
61 /// Connect to a ZooKeeper cluster.
62 ///
63 /// - `connect_string`: comma separated host:port pairs, each corresponding to a zk server,
64 /// e.g. `"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"` If the optional chroot suffix is
65 /// used the example would look like: `"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"`
66 /// where the client would be rooted at `"/app/a"` and all paths would be relative to this
67 /// root -- ie getting/setting/etc...`"/foo/bar"` would result in operations being run on
68 /// `"/app/a/foo/bar"` (from the server perspective).
69 /// - `timeout`: session timeout -- how long should a client go without receiving communication
70 /// from a server before considering it connection loss?
71 /// - `watcher`: a watcher object to be notified of connection state changes.
72 /// - `retry_time`: when the connection is lost, reconnect for a longer time to avoid
73 /// reconnecting too quickly
74 pub async fn connect_with_retry_time<W>(
75 connect_string: &str,
76 timeout: Duration,
77 watcher: W,
78 retry_time: Duration,
79 ) -> ZkResult<ZooKeeper>
80 where
81 W: Watcher + 'static,
82 {
83 let (addrs, chroot) = Self::parse_connect_string(connect_string)?;
84
85 debug!("Initiating connection to {}", connect_string);
86
87 let (watch, watch_sender) = ZkWatch::new(watcher, chroot.clone());
88 let listeners = ListenerSet::<ZkState>::new();
89 let listeners1 = listeners.clone();
90 let io = ZkIo::new(addrs.clone(), timeout, retry_time, watch_sender, listeners1).await;
91 let sender = io.sender();
92
93 tokio::spawn(watch.run());
94
95 tokio::spawn(io.run());
96
97 trace!("Returning a ZooKeeper");
98
99 Ok(ZooKeeper {
100 chroot,
101 xid: AtomicIsize::new(1),
102 io: Mutex::new(sender),
103 listeners,
104 })
105 }
106
107 pub async fn connect<W>(
108 connect_string: &str,
109 timeout: Duration,
110 watcher: W,
111 ) -> ZkResult<ZooKeeper>
112 where
113 W: Watcher + 'static,
114 {
115 Self::connect_with_retry_time(connect_string, timeout, watcher, Duration::from_secs(0))
116 .await
117 }
118
119 fn parse_connect_string(connect_string: &str) -> ZkResult<(Vec<SocketAddr>, Option<String>)> {
120 let (chroot, end) = match connect_string.find('/') {
121 Some(start) => match &connect_string[start..connect_string.len()] {
122 "" | "/" => (None, start),
123 chroot => (Some(Self::validate_path(chroot)?.to_owned()), start),
124 },
125 None => (None, connect_string.len()),
126 };
127
128 let mut addrs = Vec::new();
129 for addr_str in connect_string[..end].split(',') {
130 let addr = match addr_str.trim().to_socket_addrs() {
131 Ok(mut addrs) => match addrs.next() {
132 Some(addr) => addr,
133 None => return Err(ZkError::BadArguments),
134 },
135 Err(_) => return Err(ZkError::BadArguments),
136 };
137 addrs.push(addr);
138 }
139
140 Ok((addrs, chroot))
141 }
142
143 fn xid(&self) -> i32 {
144 self.xid.fetch_add(1, Ordering::Relaxed) as i32
145 }
146
147 async fn request<Req: WriteTo, Resp: ReadFrom>(
148 &self,
149 opcode: OpCode,
150 xid: i32,
151 req: Req,
152 watch: Option<Watch>,
153 ) -> ZkResult<Resp> {
154 trace!("request opcode={:?} xid={:?}", opcode, xid);
155 let rh = RequestHeader { xid, opcode };
156 let buf = to_len_prefixed_buf(rh, req).map_err(|_| ZkError::MarshallingError)?;
157
158 let (resp_tx, resp_rx) = channel();
159 let request = RawRequest {
160 opcode,
161 data: buf,
162 listener: Some(resp_tx),
163 watch,
164 };
165
166 self.io.lock().await.send(request).await.map_err(|_| {
167 warn!("error sending request");
168 ZkError::ConnectionLoss
169 })?;
170
171 let mut response = resp_rx.await.map_err(|err| {
172 warn!("error receiving response: {:?}", err);
173 ZkError::ConnectionLoss
174 })?;
175
176 match response.header.err {
177 0 => Ok(ReadFrom::read_from(&mut response.data).map_err(|_| ZkError::MarshallingError)?),
178 e => Err(ZkError::from(e)),
179 }
180 }
181
182 fn validate_path(path: &str) -> ZkResult<&str> {
183 match path {
184 "" => Err(ZkError::BadArguments),
185 path => {
186 if path.len() > 1 && path.ends_with('/') {
187 Err(ZkError::BadArguments)
188 } else {
189 Ok(path)
190 }
191 }
192 }
193 }
194
195 fn path(&self, path: &str) -> ZkResult<String> {
196 match self.chroot {
197 Some(ref chroot) => match path {
198 "/" => Ok(chroot.clone()),
199 path => Ok(chroot.clone() + Self::validate_path(path)?),
200 },
201 None => Ok(Self::validate_path(path)?.to_owned()),
202 }
203 }
204
205 fn cut_chroot(&self, path: String) -> String {
206 if let Some(ref chroot) = self.chroot {
207 path[chroot.len()..].to_owned()
208 } else {
209 path
210 }
211 }
212
213 /// Add the specified `scheme`:`auth` information to this connection.
214 ///
215 /// See `Acl` for more information.
216 pub async fn add_auth<S: ToString>(&self, scheme: S, auth: Vec<u8>) -> ZkResult<()> {
217 trace!("ZooKeeper::add_auth");
218 let req = AuthRequest {
219 typ: 0,
220 scheme: scheme.to_string(),
221 auth,
222 };
223
224 let _: EmptyResponse = self.request(OpCode::Auth, -4, req, None).await?;
225
226 Ok(())
227 }
228
229 /// Create a node with the given `path`. The node data will be the given `data`, and node ACL
230 /// will be the given `acl`. The `mode` argument specifies the behavior of the created node (see
231 /// `CreateMode` for more information).
232 ///
233 /// This operation, if successful, will trigger all the watches left on the node of the given
234 /// path by `exists` and `get_data` API calls, and the watches left on the parent node by
235 /// `get_children` API calls.
236 ///
237 /// # Errors
238 /// If a node with the same actual path already exists in the ZooKeeper, the result will have
239 /// `Err(ZkError::NodeExists)`. Note that since a different actual path is used for each
240 /// invocation of creating sequential node with the same path argument, the call should never
241 /// error in this manner.
242 ///
243 /// If the parent node does not exist in the ZooKeeper, `Err(ZkError::NoNode)` will be returned.
244 ///
245 /// An ephemeral node cannot have children. If the parent node of the given path is ephemeral,
246 /// `Err(ZkError::NoChildrenForEphemerals)` will be returned.
247 ///
248 /// If the `acl` is invalid or empty, `Err(ZkError::InvalidACL)` is returned.
249 ///
250 /// The maximum allowable size of the data array is 1 MiB (1,048,576 bytes). Arrays larger than
251 /// this will return `Err(ZkError::BadArguments)`.
252 pub async fn create(
253 &self,
254 path: &str,
255 data: Vec<u8>,
256 acl: Vec<Acl>,
257 mode: CreateMode,
258 ) -> ZkResult<String> {
259 trace!("ZooKeeper::create");
260 let req = CreateRequest {
261 path: self.path(path)?,
262 data,
263 acl,
264 flags: mode as i32,
265 };
266
267 let response: CreateResponse = self.request(OpCode::Create, self.xid(), req, None).await?;
268
269 Ok(self.cut_chroot(response.path))
270 }
271
272 /// Create a node with the given `path`. The node data will be the given `data`, and node ACL
273 /// will be the given `acl`. The `mode` argument specifies the behavior of the created node (see
274 /// `CreateMode` for more information).
275 /// The `ttl` argument specifies the time to live of the created node.
276 ///
277 /// This operation, if successful, will trigger all the watches left on the node of the given
278 /// path by `exists` and `get_data` API calls, and the watches left on the parent node by
279 /// `get_children` API calls.
280 ///
281 /// # Errors
282 /// If a node with the same actual path already exists in the ZooKeeper, the result will have
283 /// `Err(ZkError::NodeExists)`. Note that since a different actual path is used for each
284 /// invocation of creating sequential node with the same path argument, the call should never
285 /// error in this manner.
286 ///
287 /// If the parent node does not exist in the ZooKeeper, `Err(ZkError::NoNode)` will be returned.
288 ///
289 /// An ephemeral node cannot have children. If the parent node of the given path is ephemeral,
290 /// `Err(ZkError::NoChildrenForEphemerals)` will be returned.
291 ///
292 /// If the `acl` is invalid or empty, `Err(ZkError::InvalidACL)` is returned.
293 ///
294 /// If the `CreateTtl` opcode is not supported by the server, `Err(ZkError::Unimplemented)` is returned.
295 ///
296 /// The maximum allowable size of the data array is 1 MiB (1,048,576 bytes). Arrays larger than
297 /// this will return `Err(ZkError::BadArguments)`.
298 pub async fn create_ttl(
299 &self,
300 path: &str,
301 data: Vec<u8>,
302 acl: Vec<Acl>,
303 mode: CreateMode,
304 ttl: Duration,
305 ) -> ZkResult<String> {
306 trace!("ZooKeeper::create_ttl");
307 let req = CreateTTLRequest {
308 path: self.path(path)?,
309 data,
310 acl,
311 flags: mode as i32,
312 ttl: ttl.as_millis() as i64,
313 };
314
315 let response: CreateResponse = self
316 .request(OpCode::CreateTtl, self.xid(), req, None)
317 .await?;
318
319 Ok(self.cut_chroot(response.path))
320 }
321
322 /// Create a node with the given `path`. The node data will be the given `data`, and node ACL
323 /// will be the given `acl`. The `mode` argument specifies the behavior of the created node (see
324 /// `CreateMode` for more information).
325 ///
326 /// This operation, if successful, will trigger all the watches left on the node of the given
327 /// path by `exists` and `get_data` API calls, and the watches left on the parent node by
328 /// `get_children` API calls.
329 ///
330 /// # Errors
331 /// If a node with the same actual path already exists in the ZooKeeper, the result will have
332 /// `Err(ZkError::NodeExists)`. Note that since a different actual path is used for each
333 /// invocation of creating sequential node with the same path argument, the call should never
334 /// error in this manner.
335 ///
336 /// If the parent node does not exist in the ZooKeeper, `Err(ZkError::NoNode)` will be returned.
337 ///
338 /// An ephemeral node cannot have children. If the parent node of the given path is ephemeral,
339 /// `Err(ZkError::NoChildrenForEphemerals)` will be returned.
340 ///
341 /// If the `acl` is invalid or empty, `Err(ZkError::InvalidACL)` is returned.
342 ///
343 /// If the `Create2` opcode is not supported by the server, `Err(ZkError::Unimplemented)` is returned.
344 ///
345 /// The maximum allowable size of the data array is 1 MiB (1,048,576 bytes). Arrays larger than
346 /// this will return `Err(ZkError::BadArguments)`.
347 pub async fn create2(
348 &self,
349 path: &str,
350 data: Vec<u8>,
351 acl: Vec<Acl>,
352 mode: CreateMode,
353 ) -> ZkResult<(String, Stat)> {
354 trace!("ZooKeeper::create2");
355 let req = CreateRequest {
356 path: self.path(path)?,
357 data,
358 acl,
359 flags: mode as i32,
360 };
361 let response: Create2Response =
362 self.request(OpCode::Create2, self.xid(), req, None).await?;
363
364 Ok((self.cut_chroot(response.path), response.stat))
365 }
366
367 /// Create a node with the given `path`. The node data will be the given `data`, and node ACL
368 /// will be the given `acl`. The `mode` argument specifies the behavior of the created node (see
369 /// `CreateMode` for more information).
370 /// The `ttl` argument specifies the time to live of the created node.
371 ///
372 /// This operation, if successful, will trigger all the watches left on the node of the given
373 /// path by `exists` and `get_data` API calls, and the watches left on the parent node by
374 /// `get_children` API calls.
375 ///
376 /// # Errors
377 /// If a node with the same actual path already exists in the ZooKeeper, the result will have
378 /// `Err(ZkError::NodeExists)`. Note that since a different actual path is used for each
379 /// invocation of creating sequential node with the same path argument, the call should never
380 /// error in this manner.
381 ///
382 /// If the parent node does not exist in the ZooKeeper, `Err(ZkError::NoNode)` will be returned.
383 ///
384 /// An ephemeral node cannot have children. If the parent node of the given path is ephemeral,
385 /// `Err(ZkError::NoChildrenForEphemerals)` will be returned.
386 ///
387 /// If the `acl` is invalid or empty, `Err(ZkError::InvalidACL)` is returned.
388 ///
389 /// If the `CreateTtl` opcode is not supported by the server, `Err(ZkError::Unimplemented)` is returned.
390 ///
391 /// The maximum allowable size of the data array is 1 MiB (1,048,576 bytes). Arrays larger than
392 /// this will return `Err(ZkError::BadArguments)`.
393 pub async fn create2_ttl(
394 &self,
395 path: &str,
396 data: Vec<u8>,
397 acl: Vec<Acl>,
398 mode: CreateMode,
399 ttl: Duration,
400 ) -> ZkResult<(String, Stat)> {
401 trace!("ZooKeeper::create2_ttl");
402 let req = CreateTTLRequest {
403 path: self.path(path)?,
404 data,
405 acl,
406 flags: mode as i32,
407 ttl: ttl.as_millis() as i64,
408 };
409
410 let response: Create2Response = self
411 .request(OpCode::CreateTtl, self.xid(), req, None)
412 .await?;
413
414 Ok((self.cut_chroot(response.path), response.stat))
415 }
416
417 /// Delete the node with the given `path`. The call will succeed if such a node exists, and the
418 /// given `version` matches the node's version (if the given version is `None`, it matches any
419 /// node's versions).
420 ///
421 /// This operation, if successful, will trigger all the watches on the node of the given path
422 /// left by `exists` API calls, watches left by `get_data` API calls, and the watches on the
423 /// parent node left by `get_children` API calls.
424 ///
425 /// # Errors
426 /// If the nodes does not exist, `Err(ZkError::NoNode)` will be returned.
427 ///
428 /// If the given `version` does not match the node's version, `Err(ZkError::BadVersion)` will be
429 /// returned.
430 ///
431 /// If the node has children, `Err(ZkError::NotEmpty)` will be returned.
432 pub async fn delete(&self, path: &str, version: Option<i32>) -> ZkResult<()> {
433 trace!("ZooKeeper::delete");
434 let req = DeleteRequest {
435 path: self.path(path)?,
436 version: version.unwrap_or(-1),
437 };
438
439 let _: EmptyResponse = self.request(OpCode::Delete, self.xid(), req, None).await?;
440
441 Ok(())
442 }
443
444 /// Return the `Stat` of the node of the given `path` or `None` if no such node exists.
445 ///
446 /// If the `watch` is `true` and the call is successful (no error is returned), a watch will be
447 /// left on the node with the given path. The watch will be triggered by a successful operation
448 /// that creates/delete the node or sets the data on the node.
449 pub async fn exists(&self, path: &str, watch: bool) -> ZkResult<Option<Stat>> {
450 trace!("ZooKeeper::exists");
451 let req = ExistsRequest {
452 path: self.path(path)?,
453 watch,
454 };
455
456 match self
457 .request::<ExistsRequest, ExistsResponse>(OpCode::Exists, self.xid(), req, None)
458 .await
459 {
460 Ok(response) => Ok(Some(response.stat)),
461 Err(ZkError::NoNode) => Ok(None),
462 Err(e) => Err(e),
463 }
464 }
465
466 /// Return the `Stat` of the node of the given `path` or `None` if no such node exists.
467 ///
468 /// Similar to `exists`, but sets an explicit watcher instead of relying on the client's base
469 /// `Watcher`.
470 pub async fn exists_w<W: FnOnce(WatchedEvent) + Send + 'static>(
471 &self,
472 path: &str,
473 watcher: W,
474 ) -> ZkResult<Option<Stat>> {
475 trace!("ZooKeeper::exists_w");
476 let req = ExistsRequest {
477 path: self.path(path)?,
478 watch: true,
479 };
480
481 let watch = Watch {
482 path: path.to_owned(),
483 watch_type: WatchType::Exist,
484 watcher: Box::new(watcher),
485 };
486
487 match self
488 .request::<ExistsRequest, ExistsResponse>(OpCode::Exists, self.xid(), req, Some(watch))
489 .await
490 {
491 Ok(response) => Ok(Some(response.stat)),
492 Err(ZkError::NoNode) => Ok(None),
493 Err(e) => Err(e),
494 }
495 }
496
497 /// Return the ACL and `Stat` of the node of the given path.
498 ///
499 /// # Errors
500 /// If no node with the given path exists, `Err(ZkError::NoNode)` will be returned.
501 pub async fn get_acl(&self, path: &str) -> ZkResult<(Vec<Acl>, Stat)> {
502 trace!("ZooKeeper::get_acl");
503 let req = GetAclRequest {
504 path: self.path(path)?,
505 };
506
507 let response: GetAclResponse = self.request(OpCode::GetAcl, self.xid(), req, None).await?;
508
509 Ok(response.acl_stat)
510 }
511
512 /// Set the ACL for the node of the given path if such a node exists and the given version
513 /// matches the version of the node. Return the `Stat` of the node.
514 ///
515 /// # Errors
516 /// If no node with the given path exists, `Err(ZkError::NoNode)` will be returned.
517 ///
518 /// If the given version does not match the node's version, `Err(ZkError::BadVersion)` will be
519 /// returned.
520 pub async fn set_acl(&self, path: &str, acl: Vec<Acl>, version: Option<i32>) -> ZkResult<Stat> {
521 trace!("ZooKeeper::set_acl");
522 let req = SetAclRequest {
523 path: self.path(path)?,
524 acl,
525 version: version.unwrap_or(-1),
526 };
527
528 let response: SetAclResponse = self.request(OpCode::SetAcl, self.xid(), req, None).await?;
529
530 Ok(response.stat)
531 }
532
533 /// Return the list of the children of the node of the given `path`. The returned values are not
534 /// prefixed with the provided `path`; i.e. if the database contains `/path/a` and `/path/b`,
535 /// the result of `get_children` for `"/path"` will be `["a", "b"]`.
536 ///
537 /// If the `watch` is `true` and the call is successful (no error is returned), a watch will be
538 /// left on the node with the given path. The watch will be triggered by a successful operation
539 /// that deletes the node of the given path or creates/delete a child under the node.
540 ///
541 /// The list of children returned is not sorted and no guarantee is provided as to its natural
542 /// or lexical order.
543 ///
544 /// # Errors
545 /// If no node with the given path exists, `Err(ZkError::NoNode)` will be returned.
546 pub async fn get_children(&self, path: &str, watch: bool) -> ZkResult<Vec<String>> {
547 trace!("ZooKeeper::get_children");
548 let req = GetChildrenRequest {
549 path: self.path(path)?,
550 watch,
551 };
552
553 let response: GetChildrenResponse = self
554 .request(OpCode::GetChildren, self.xid(), req, None)
555 .await?;
556
557 Ok(response.children)
558 }
559
560 /// Return the list of the children of the node of the given `path`.
561 ///
562 /// Similar to `get_children`, but sets an explicit watcher instead of relying on the client's
563 /// base `Watcher`.
564 pub async fn get_children_w<W: FnOnce(WatchedEvent) + Send + 'static>(
565 &self,
566 path: &str,
567 watcher: W,
568 ) -> ZkResult<Vec<String>> {
569 trace!("ZooKeeper::get_children_w");
570 let req = GetChildrenRequest {
571 path: self.path(path)?,
572 watch: true,
573 };
574
575 let watch = Watch {
576 path: path.to_owned(),
577 watch_type: WatchType::Child,
578 watcher: Box::new(watcher),
579 };
580
581 let response: GetChildrenResponse = self
582 .request(OpCode::GetChildren, self.xid(), req, Some(watch))
583 .await?;
584
585 Ok(response.children)
586 }
587
588 /// Return the data and the `Stat` of the node of the given path.
589 ///
590 /// If `watch` is `true` and the call is successful (no error is returned), a watch will be left
591 /// on the node with the given path. The watch will be triggered by a successful operation that
592 /// sets data on the node, or deletes the node.
593 ///
594 /// # Errors
595 /// If no node with the given path exists, `Err(ZkError::NoNode)` will be returned.
596 pub async fn get_data(&self, path: &str, watch: bool) -> ZkResult<(Vec<u8>, Stat)> {
597 trace!("ZooKeeper::get_data");
598 let req = GetDataRequest {
599 path: self.path(path)?,
600 watch,
601 };
602
603 let response: GetDataResponse =
604 self.request(OpCode::GetData, self.xid(), req, None).await?;
605
606 Ok(response.data_stat)
607 }
608
609 /// Return the data and the `Stat` of the node of the given path.
610 ///
611 /// Similar to `get_data`, but sets an explicit watcher instead of relying on the client's
612 /// base `Watcher`.
613 pub async fn get_data_w<W: FnOnce(WatchedEvent) + Send + 'static>(
614 &self,
615 path: &str,
616 watcher: W,
617 ) -> ZkResult<(Vec<u8>, Stat)> {
618 trace!("ZooKeeper::get_data_w");
619 let req = GetDataRequest {
620 path: self.path(path)?,
621 watch: true,
622 };
623
624 let watch = Watch {
625 path: path.to_owned(),
626 watch_type: WatchType::Data,
627 watcher: Box::new(watcher),
628 };
629
630 let response: GetDataResponse = self
631 .request(OpCode::GetData, self.xid(), req, Some(watch))
632 .await?;
633
634 Ok(response.data_stat)
635 }
636
637 /// Set the data for the node of the given `path` if such a node exists and the given version
638 /// matches the version of the node (if the given version is `None`, it matches any node's
639 /// versions). Return the `Stat` of the node.
640 ///
641 /// This operation, if successful, will trigger all the watches on the node of the given `path`
642 /// left by `get_data` calls.
643 ///
644 /// # Errors
645 /// If no node with the given `path` exists, `Err(ZkError::NoNode)` will be returned.
646 ///
647 /// If the given version does not match the node's version, `Err(ZkError::BadVersion)` will be
648 /// returned.
649 ///
650 /// The maximum allowable size of the `data` array is 1 MiB (1,048,576 bytes). Arrays larger
651 /// than this will return `Err(ZkError::BadArguments)`.
652 pub async fn set_data(
653 &self,
654 path: &str,
655 data: Vec<u8>,
656 version: Option<i32>,
657 ) -> ZkResult<Stat> {
658 trace!("ZooKeeper::set_data");
659 let req = SetDataRequest {
660 path: self.path(path)?,
661 data,
662 version: version.unwrap_or(-1),
663 };
664
665 let response: SetDataResponse =
666 self.request(OpCode::SetData, self.xid(), req, None).await?;
667
668 Ok(response.stat)
669 }
670
671 /// Adds a state change `Listener`, which will be notified of changes to the client's `ZkState`.
672 /// A unique identifier is returned, which is used in `remove_listener` to un-subscribe.
673 pub fn add_listener<Listener: Fn(ZkState) + Send + 'static>(
674 &self,
675 listener: Listener,
676 ) -> Subscription {
677 trace!("ZooKeeper::add_listener");
678 self.listeners.subscribe(listener)
679 }
680
681 /// Removes a state change `Listener` and closes the channel.
682 pub fn remove_listener(&self, sub: Subscription) {
683 trace!("ZooKeeper::remove_listener");
684 self.listeners.unsubscribe(sub);
685 }
686
687 /// Close this client object. Once the client is closed, its session becomes invalid. All the
688 /// ephemeral nodes in the ZooKeeper server associated with the session will be removed. The
689 /// watches left on those nodes (and on their parents) will be triggered.
690 ///
691 /// **NOTE: Due to missing support for async drop at the moment, dropping self will not call
692 /// close.**
693 pub async fn close(&self) -> ZkResult<()> {
694 trace!("ZooKeeper::close");
695 let _: EmptyResponse = self
696 .request(OpCode::CloseSession, 0, EmptyRequest, None)
697 .await?;
698
699 Ok(())
700 }
701}
702
703#[cfg(test)]
704mod tests {
705 use super::ZooKeeper;
706
707 #[test]
708 fn parse_connect_string() {
709 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
710
711 let (addrs, chroot) =
712 ZooKeeper::parse_connect_string("127.0.0.1:2181,::1:2181/mesos").expect("Parse 1");
713 assert_eq!(
714 addrs,
715 vec![
716 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 2181)),
717 SocketAddr::V6(SocketAddrV6::new(
718 Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1),
719 2181,
720 0,
721 0
722 ))
723 ]
724 );
725 assert_eq!(chroot, Some("/mesos".to_owned()));
726
727 let (addrs, chroot) = ZooKeeper::parse_connect_string("::1:2181").expect("Parse 2");
728 assert_eq!(
729 addrs,
730 vec![SocketAddr::V6(SocketAddrV6::new(
731 Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1),
732 2181,
733 0,
734 0
735 ))]
736 );
737 assert_eq!(chroot, None);
738
739 let (addrs, chroot) = ZooKeeper::parse_connect_string("::1:2181/").expect("Parse 3");
740 assert_eq!(
741 addrs,
742 vec![SocketAddr::V6(SocketAddrV6::new(
743 Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1),
744 2181,
745 0,
746 0
747 ))]
748 );
749 assert_eq!(chroot, None);
750 }
751
752 #[test]
753 #[should_panic(expected = "BadArguments")]
754 fn parse_connect_string_fails() {
755 // This fails with ZooKeeper.java: Path must not end with / character
756 ZooKeeper::parse_connect_string("127.0.0.1:2181/mesos/").unwrap();
757 }
758}