zookeeper_async/
zookeeper_ext.rs

1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::iter::once;
4use tracing::*;
5
6use crate::{Acl, CreateMode, ZkError, ZkResult, ZooKeeper};
7
8/// Extended ZooKeeper operations that are not needed for the "core."
9#[async_trait]
10pub trait ZooKeeperExt {
11    /// Ensure that `path` exists and create all potential paths leading up to it if it does not.
12    /// This operates in a manner similar to `mkdir -p`.
13    async fn ensure_path(&self, path: &str) -> ZkResult<()>;
14
15    /// Ensures path like `ensure_path`, but with the last node created with `mode`.
16    async fn ensure_path_with_leaf_mode(&self, path: &str, mode: CreateMode) -> ZkResult<()>;
17
18    /// Performs a breadth-first tree traversal of the tree starting at `path`,
19    /// returning a list of fully prefixed child nodes.
20    /// *NOTE*: This is not an atomic operation.
21    async fn get_children_recursive(&self, path: &str) -> ZkResult<Vec<String>>;
22
23    /// Deletes the node at `path` and all its children.
24    /// *NOTE*: This is not an atomic operation.
25    async fn delete_recursive(&self, path: &str) -> ZkResult<()>;
26}
27
28#[async_trait]
29impl ZooKeeperExt for ZooKeeper {
30    async fn ensure_path(&self, path: &str) -> ZkResult<()> {
31        trace!("ensure_path {}", path);
32        for (i, _) in path
33            .chars()
34            .chain(once('/'))
35            .enumerate()
36            .skip(1)
37            .filter(|c| c.1 == '/')
38        {
39            match self
40                .create(
41                    &path[..i],
42                    vec![],
43                    Acl::open_unsafe().clone(),
44                    CreateMode::Persistent,
45                )
46                .await
47            {
48                Ok(_) | Err(ZkError::NodeExists) => {}
49                Err(e) => return Err(e),
50            }
51        }
52
53        Ok(())
54    }
55
56    async fn ensure_path_with_leaf_mode(&self, path: &str, mode: CreateMode) -> ZkResult<()> {
57        trace!("ensure_path_with_leaf_mode {}", path);
58        let path_len = path.len();
59        for (i, _) in path
60            .chars()
61            .chain(once('/'))
62            .enumerate()
63            .skip(1)
64            .filter(|c| c.1 == '/')
65        {
66            match self
67                .create(
68                    &path[..i],
69                    vec![],
70                    Acl::open_unsafe().clone(),
71                    if i == path_len {
72                        mode
73                    } else {
74                        CreateMode::Persistent
75                    },
76                )
77                .await
78            {
79                Ok(_) | Err(ZkError::NodeExists) => {}
80                Err(e) => return Err(e),
81            }
82        }
83
84        Ok(())
85    }
86
87    async fn get_children_recursive(&self, path: &str) -> ZkResult<Vec<String>> {
88        trace!("get_children_recursive {}", path);
89        let mut queue: VecDeque<String> = VecDeque::new();
90        let mut result = vec![path.to_string()];
91        queue.push_front(path.to_string());
92
93        while let Some(current) = queue.pop_front() {
94            let children = self.get_children(&current, false).await?;
95            children
96                .into_iter()
97                .map(|child| format!("{}/{}", current, child))
98                .for_each(|full_path| {
99                    result.push(full_path.clone());
100                    queue.push_back(full_path);
101                });
102        }
103
104        Ok(result)
105    }
106
107    async fn delete_recursive(&self, path: &str) -> ZkResult<()> {
108        trace!("delete_recursive {}", path);
109        let children = self.get_children_recursive(path).await?;
110        for child in children.iter().rev() {
111            self.delete(child, None).await?;
112        }
113
114        Ok(())
115    }
116}