1use std::{
2 cmp,
3 sync::{
4 atomic::{self, AtomicBool, AtomicU8},
5 Arc,
6 },
7};
8
9use futures::future::BoxFuture;
10use futures::FutureExt;
11use tokio::sync::Mutex;
12
13use crate::{
14 paths, Acl, CreateMode, Subscription, WatchedEvent, WatchedEventType, ZkError, ZkResult,
15 ZkState, ZooKeeper, ZooKeeperExt,
16};
17use tracing::*;
18
19const LATCH_PREFIX: &str = "latch";
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22enum State {
23 Latent = 0,
24 Started = 1,
25 Closed = 2,
26}
27
28impl PartialEq<u8> for State {
29 fn eq(&self, other: &u8) -> bool {
30 self == &State::from(*other)
31 }
32}
33
34impl From<u8> for State {
35 fn from(value: u8) -> Self {
36 match value {
37 0 => State::Latent,
38 1 => State::Started,
39 2 => State::Closed,
40 _ => unreachable!(),
41 }
42 }
43}
44
45#[derive(Debug, Clone, PartialEq, Eq)]
46struct ZNode {
47 path: String,
48 seqn: usize,
49}
50
51impl ZNode {
52 pub fn with_parent(parent_path: &str, path: &str) -> Option<Self> {
53 let seqn = path.split('-').last()?.parse().ok()?;
54 Some(Self {
55 path: paths::make_path(parent_path, path),
56 seqn,
57 })
58 }
59
60 pub fn creation_path(parent_path: &str, id: &str) -> String {
61 paths::make_path(parent_path, &format!("{}-{}-", LATCH_PREFIX, id))
62 }
63}
64
65impl Ord for ZNode {
66 fn cmp(&self, other: &Self) -> cmp::Ordering {
67 self.seqn.cmp(&other.seqn)
68 }
69}
70
71impl PartialOrd for ZNode {
72 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
73 Some(self.cmp(other))
74 }
75}
76
77#[derive(Clone)]
78pub struct LeaderLatch {
79 zk: Arc<ZooKeeper>,
80 id: String,
81 parent_path: String,
82 path: Arc<Mutex<Option<String>>>,
83 state: Arc<AtomicU8>,
84 subscription: Arc<Mutex<Option<Subscription>>>,
85 has_leadership: Arc<AtomicBool>,
86}
87
88impl LeaderLatch {
89 pub fn new(zk: Arc<ZooKeeper>, id: String, parent_path: String) -> Self {
90 Self {
91 zk,
92 id,
93 parent_path,
94 path: Arc::default(),
95 state: Arc::new(AtomicU8::new(State::Latent as u8)),
96 subscription: Arc::default(),
97 has_leadership: Arc::default(),
98 }
99 }
100
101 pub async fn start(&self) -> ZkResult<()> {
102 let prev_state = self.set_state(State::Latent, State::Started);
103 if prev_state != State::Latent {
104 panic!("cannot start leader latch in state: {:?}", prev_state);
105 }
106
107 let latch = self.clone();
108 let subscription = self
109 .zk
110 .add_listener(move |x| handle_state_change(&latch, x));
111 *self.subscription.lock().await = Some(subscription);
112 self.reset().await
113 }
114
115 fn reset(&self) -> BoxFuture<'_, ZkResult<()>> {
116 async move {
117 self.set_leadership(false);
118 self.set_path(None).await?;
119
120 let path = create_latch_znode(self, &self.parent_path, &self.id).await?;
121 self.set_path(Some(path)).await?;
122
123 self.check_leadership().await
124 }
125 .boxed()
126 }
127
128 fn check_leadership(&self) -> BoxFuture<'_, ZkResult<()>> {
129 async move {
130 let znodes = get_latch_znodes(&self.zk, &self.parent_path).await?;
131 if let Some(path) = &*self.path.lock().await {
132 match znodes.iter().position(|znode| &znode.path == path) {
133 Some(0) => {
134 self.set_leadership(true);
135 }
136 Some(index) => {
137 let latch = self.clone();
138 self.zk
139 .exists_w(&znodes[index - 1].path, move |ev| {
140 tokio::spawn(async move {
141 handle_znode_change(&latch, ev).await;
142 });
143 })
144 .await?;
145 self.set_leadership(false);
146 }
147 None => {
148 error!("cannot find znode: {:?}", path);
149 self.reset().await?;
150 }
151 }
152 }
153 Ok(())
154 }
155 .boxed()
156 }
157
158 pub async fn stop(&self) -> ZkResult<()> {
159 let prev_state = self.set_state(State::Started, State::Closed);
160 if prev_state != State::Started {
161 panic!("cannot close leader latch in state: {:?}", self.state);
162 }
163
164 self.set_path(None).await?;
165 self.set_leadership(false);
166
167 let subscription = &mut *self.subscription.lock().await;
168 if let Some(sub) = subscription.take() {
169 self.zk.remove_listener(sub);
170 }
171 Ok(())
172 }
173
174 pub fn id(&self) -> &str {
175 &self.id
176 }
177
178 pub async fn path(&self) -> Option<String> {
179 self.path.lock().await.clone()
180 }
181
182 pub fn has_leadership(&self) -> bool {
183 State::Started == self.state.load(atomic::Ordering::SeqCst)
184 && self.has_leadership.load(atomic::Ordering::SeqCst)
185 }
186
187 fn set_leadership(&self, value: bool) {
188 self.has_leadership.store(value, atomic::Ordering::SeqCst);
189 }
190
191 async fn set_path(&self, value: Option<String>) -> ZkResult<()> {
192 let path = &mut *self.path.lock().await;
193 if let Some(old_path) = path {
194 match self.zk.delete(old_path, None).await {
195 Ok(()) | Err(ZkError::NoNode) => Ok(()),
196 Err(e) => Err(e),
197 }?;
198 }
199 *path = value;
200 Ok(())
201 }
202
203 fn set_state(&self, cur: State, new: State) -> State {
204 State::from(
205 self.state
206 .compare_and_swap(cur as u8, new as u8, atomic::Ordering::SeqCst),
207 )
208 }
209}
210
211async fn create_latch_znode(ll: &LeaderLatch, parent_path: &str, id: &str) -> ZkResult<String> {
212 ll.zk
213 .ensure_path_with_leaf_mode(parent_path, CreateMode::Container)
214 .await?;
215
216 let zrsp = ll
217 .zk
218 .create(
219 &ZNode::creation_path(parent_path, id),
220 vec![],
221 Acl::open_unsafe().clone(),
222 CreateMode::EphemeralSequential,
223 )
224 .await?;
225
226 let latch = ll.clone();
228 ll.zk
229 .exists_w(&zrsp, move |ev| {
230 tokio::spawn(async move {
231 handle_znode_change(&latch, ev).await;
232 });
233 })
234 .await?;
235 Ok(zrsp)
236}
237
238async fn get_latch_znodes(zk: &ZooKeeper, parent_path: &str) -> ZkResult<Vec<ZNode>> {
239 let znodes = zk.get_children(parent_path, false).await?;
240 let mut latch_znodes: Vec<_> = znodes
241 .into_iter()
242 .filter_map(|path| ZNode::with_parent(parent_path, &path))
243 .collect();
244 latch_znodes.sort();
245 Ok(latch_znodes)
246}
247
248async fn handle_znode_change(latch: &LeaderLatch, ev: WatchedEvent) {
249 if let WatchedEventType::NodeDeleted = ev.event_type {
250 if let Err(e) = latch.check_leadership().await {
251 error!("failed to check for leadership: {:?}", e);
252 latch.set_leadership(false);
253 }
254 }
255}
256
257fn handle_state_change(latch: &LeaderLatch, zk_state: ZkState) {
258 if let ZkState::Closed = zk_state {
259 latch.set_leadership(false);
260 }
261}