zookeeper_async/
zookeeper_ext.rs1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::iter::once;
4use tracing::*;
5
6use crate::{Acl, CreateMode, ZkError, ZkResult, ZooKeeper};
7
8#[async_trait]
10pub trait ZooKeeperExt {
11 async fn ensure_path(&self, path: &str) -> ZkResult<()>;
14
15 async fn ensure_path_with_leaf_mode(&self, path: &str, mode: CreateMode) -> ZkResult<()>;
17
18 async fn get_children_recursive(&self, path: &str) -> ZkResult<Vec<String>>;
22
23 async fn delete_recursive(&self, path: &str) -> ZkResult<()>;
26}
27
28#[async_trait]
29impl ZooKeeperExt for ZooKeeper {
30 async fn ensure_path(&self, path: &str) -> ZkResult<()> {
31 trace!("ensure_path {}", path);
32 for (i, _) in path
33 .chars()
34 .chain(once('/'))
35 .enumerate()
36 .skip(1)
37 .filter(|c| c.1 == '/')
38 {
39 match self
40 .create(
41 &path[..i],
42 vec![],
43 Acl::open_unsafe().clone(),
44 CreateMode::Persistent,
45 )
46 .await
47 {
48 Ok(_) | Err(ZkError::NodeExists) => {}
49 Err(e) => return Err(e),
50 }
51 }
52
53 Ok(())
54 }
55
56 async fn ensure_path_with_leaf_mode(&self, path: &str, mode: CreateMode) -> ZkResult<()> {
57 trace!("ensure_path_with_leaf_mode {}", path);
58 let path_len = path.len();
59 for (i, _) in path
60 .chars()
61 .chain(once('/'))
62 .enumerate()
63 .skip(1)
64 .filter(|c| c.1 == '/')
65 {
66 match self
67 .create(
68 &path[..i],
69 vec![],
70 Acl::open_unsafe().clone(),
71 if i == path_len {
72 mode
73 } else {
74 CreateMode::Persistent
75 },
76 )
77 .await
78 {
79 Ok(_) | Err(ZkError::NodeExists) => {}
80 Err(e) => return Err(e),
81 }
82 }
83
84 Ok(())
85 }
86
87 async fn get_children_recursive(&self, path: &str) -> ZkResult<Vec<String>> {
88 trace!("get_children_recursive {}", path);
89 let mut queue: VecDeque<String> = VecDeque::new();
90 let mut result = vec![path.to_string()];
91 queue.push_front(path.to_string());
92
93 while let Some(current) = queue.pop_front() {
94 let children = self.get_children(¤t, false).await?;
95 children
96 .into_iter()
97 .map(|child| format!("{}/{}", current, child))
98 .for_each(|full_path| {
99 result.push(full_path.clone());
100 queue.push_back(full_path);
101 });
102 }
103
104 Ok(result)
105 }
106
107 async fn delete_recursive(&self, path: &str) -> ZkResult<()> {
108 trace!("delete_recursive {}", path);
109 let children = self.get_children_recursive(path).await?;
110 for child in children.iter().rev() {
111 self.delete(child, None).await?;
112 }
113
114 Ok(())
115 }
116}