zookeeper_async/recipes/
leader_latch.rs

1use std::{
2    cmp,
3    sync::{
4        atomic::{self, AtomicBool, AtomicU8},
5        Arc,
6    },
7};
8
9use futures::future::BoxFuture;
10use futures::FutureExt;
11use tokio::sync::Mutex;
12
13use crate::{
14    paths, Acl, CreateMode, Subscription, WatchedEvent, WatchedEventType, ZkError, ZkResult,
15    ZkState, ZooKeeper, ZooKeeperExt,
16};
17use tracing::*;
18
/// Leading component of every latch znode name; `ZNode::creation_path` joins it
/// with the participant id as `"<prefix>-<id>-"`.
const LATCH_PREFIX: &str = "latch";
20
/// Lifecycle of a `LeaderLatch`.
///
/// The explicit discriminants are the values stored in the latch's `AtomicU8`
/// and decoded again by `From<u8> for State`, so they must stay in sync with
/// that conversion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Constructed but not started yet.
    Latent = 0,
    /// Started and taking part in the election.
    Started = 1,
    /// Stopped.
    Closed = 2,
}
27
28impl PartialEq<u8> for State {
29    fn eq(&self, other: &u8) -> bool {
30        self == &State::from(*other)
31    }
32}
33
34impl From<u8> for State {
35    fn from(value: u8) -> Self {
36        match value {
37            0 => State::Latent,
38            1 => State::Started,
39            2 => State::Closed,
40            _ => unreachable!(),
41        }
42    }
43}
44
/// A child znode of the latch parent, paired with the number parsed from its name.
///
/// Note that the derived equality compares both fields, whereas the hand-written
/// ordering for this type looks at `seqn` only.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ZNode {
    /// Path of the znode as produced by `paths::make_path(parent, child)`.
    path: String,
    /// Number parsed from the text after the last `-` in the child name.
    seqn: usize,
}
50
51impl ZNode {
52    pub fn with_parent(parent_path: &str, path: &str) -> Option<Self> {
53        let seqn = path.split('-').last()?.parse().ok()?;
54        Some(Self {
55            path: paths::make_path(parent_path, path),
56            seqn,
57        })
58    }
59
60    pub fn creation_path(parent_path: &str, id: &str) -> String {
61        paths::make_path(parent_path, &format!("{}-{}-", LATCH_PREFIX, id))
62    }
63}
64
65impl Ord for ZNode {
66    fn cmp(&self, other: &Self) -> cmp::Ordering {
67        self.seqn.cmp(&other.seqn)
68    }
69}
70
71impl PartialOrd for ZNode {
72    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
73        Some(self.cmp(other))
74    }
75}
76
77#[derive(Clone)]
78pub struct LeaderLatch {
79    zk: Arc<ZooKeeper>,
80    id: String,
81    parent_path: String,
82    path: Arc<Mutex<Option<String>>>,
83    state: Arc<AtomicU8>,
84    subscription: Arc<Mutex<Option<Subscription>>>,
85    has_leadership: Arc<AtomicBool>,
86}
87
88impl LeaderLatch {
89    pub fn new(zk: Arc<ZooKeeper>, id: String, parent_path: String) -> Self {
90        Self {
91            zk,
92            id,
93            parent_path,
94            path: Arc::default(),
95            state: Arc::new(AtomicU8::new(State::Latent as u8)),
96            subscription: Arc::default(),
97            has_leadership: Arc::default(),
98        }
99    }
100
101    pub async fn start(&self) -> ZkResult<()> {
102        let prev_state = self.set_state(State::Latent, State::Started);
103        if prev_state != State::Latent {
104            panic!("cannot start leader latch in state: {:?}", prev_state);
105        }
106
107        let latch = self.clone();
108        let subscription = self
109            .zk
110            .add_listener(move |x| handle_state_change(&latch, x));
111        *self.subscription.lock().await = Some(subscription);
112        self.reset().await
113    }
114
115    fn reset(&self) -> BoxFuture<'_, ZkResult<()>> {
116        async move {
117            self.set_leadership(false);
118            self.set_path(None).await?;
119
120            let path = create_latch_znode(self, &self.parent_path, &self.id).await?;
121            self.set_path(Some(path)).await?;
122
123            self.check_leadership().await
124        }
125        .boxed()
126    }
127
128    fn check_leadership(&self) -> BoxFuture<'_, ZkResult<()>> {
129        async move {
130            let znodes = get_latch_znodes(&self.zk, &self.parent_path).await?;
131            if let Some(path) = &*self.path.lock().await {
132                match znodes.iter().position(|znode| &znode.path == path) {
133                    Some(0) => {
134                        self.set_leadership(true);
135                    }
136                    Some(index) => {
137                        let latch = self.clone();
138                        self.zk
139                            .exists_w(&znodes[index - 1].path, move |ev| {
140                                tokio::spawn(async move {
141                                    handle_znode_change(&latch, ev).await;
142                                });
143                            })
144                            .await?;
145                        self.set_leadership(false);
146                    }
147                    None => {
148                        error!("cannot find znode: {:?}", path);
149                        self.reset().await?;
150                    }
151                }
152            }
153            Ok(())
154        }
155        .boxed()
156    }
157
158    pub async fn stop(&self) -> ZkResult<()> {
159        let prev_state = self.set_state(State::Started, State::Closed);
160        if prev_state != State::Started {
161            panic!("cannot close leader latch in state: {:?}", self.state);
162        }
163
164        self.set_path(None).await?;
165        self.set_leadership(false);
166
167        let subscription = &mut *self.subscription.lock().await;
168        if let Some(sub) = subscription.take() {
169            self.zk.remove_listener(sub);
170        }
171        Ok(())
172    }
173
174    pub fn id(&self) -> &str {
175        &self.id
176    }
177
178    pub async fn path(&self) -> Option<String> {
179        self.path.lock().await.clone()
180    }
181
182    pub fn has_leadership(&self) -> bool {
183        State::Started == self.state.load(atomic::Ordering::SeqCst)
184            && self.has_leadership.load(atomic::Ordering::SeqCst)
185    }
186
187    fn set_leadership(&self, value: bool) {
188        self.has_leadership.store(value, atomic::Ordering::SeqCst);
189    }
190
191    async fn set_path(&self, value: Option<String>) -> ZkResult<()> {
192        let path = &mut *self.path.lock().await;
193        if let Some(old_path) = path {
194            match self.zk.delete(old_path, None).await {
195                Ok(()) | Err(ZkError::NoNode) => Ok(()),
196                Err(e) => Err(e),
197            }?;
198        }
199        *path = value;
200        Ok(())
201    }
202
203    fn set_state(&self, cur: State, new: State) -> State {
204        State::from(
205            self.state
206                .compare_and_swap(cur as u8, new as u8, atomic::Ordering::SeqCst),
207        )
208    }
209}
210
211async fn create_latch_znode(ll: &LeaderLatch, parent_path: &str, id: &str) -> ZkResult<String> {
212    ll.zk
213        .ensure_path_with_leaf_mode(parent_path, CreateMode::Container)
214        .await?;
215
216    let zrsp = ll
217        .zk
218        .create(
219            &ZNode::creation_path(parent_path, id),
220            vec![],
221            Acl::open_unsafe().clone(),
222            CreateMode::EphemeralSequential,
223        )
224        .await?;
225
226    // add the handle_znode_change to the freshly created znode
227    let latch = ll.clone();
228    ll.zk
229        .exists_w(&zrsp, move |ev| {
230            tokio::spawn(async move {
231                handle_znode_change(&latch, ev).await;
232            });
233        })
234        .await?;
235    Ok(zrsp)
236}
237
238async fn get_latch_znodes(zk: &ZooKeeper, parent_path: &str) -> ZkResult<Vec<ZNode>> {
239    let znodes = zk.get_children(parent_path, false).await?;
240    let mut latch_znodes: Vec<_> = znodes
241        .into_iter()
242        .filter_map(|path| ZNode::with_parent(parent_path, &path))
243        .collect();
244    latch_znodes.sort();
245    Ok(latch_znodes)
246}
247
248async fn handle_znode_change(latch: &LeaderLatch, ev: WatchedEvent) {
249    if let WatchedEventType::NodeDeleted = ev.event_type {
250        if let Err(e) = latch.check_leadership().await {
251            error!("failed to check for leadership: {:?}", e);
252            latch.set_leadership(false);
253        }
254    }
255}
256
257fn handle_state_change(latch: &LeaderLatch, zk_state: ZkState) {
258    if let ZkState::Closed = zk_state {
259        latch.set_leadership(false);
260    }
261}