zookeeper_async/recipes/
shared_lock.rs1use std::sync::{Arc, Mutex, MutexGuard};
2
3use tokio::sync::oneshot;
4use tracing::*;
5use uuid::Uuid;
6
7use crate::{Acl, CreateMode, ZkError, ZkResult, ZooKeeper, ZooKeeperExt};
8
9pub struct LockGuard {
12 state: Mutex<LockGuardState>,
14}
15
16struct LockGuardState {
17 id: Option<String>,
18 prefix: Option<String>,
19 path: String,
20 zookeeper: Arc<ZooKeeper>,
21 acquired: bool,
22}
23
24impl Drop for LockGuard {
25 fn drop(&mut self) {
26 let mut state = self.state();
27 if !state.acquired {
28 return;
29 }
30
31 if let Some(id) = state.id.take() {
32 let zookeeper = state.zookeeper.clone();
33 let path = state.path.clone();
34 tokio::spawn(async move {
35 debug!("Removing lock: {}/{}", path, id);
36
37 if let Err(error) = zookeeper.delete(&format!("{}/{}", path, id), None).await {
38 panic!("Couldn't remove lock {}/{}: {}", path, id, error);
39 }
40 });
41 }
42 }
43}
44
45impl LockGuard {
46 fn new(path: String, zookeeper: Arc<ZooKeeper>) -> Self {
47 LockGuard {
48 state: Mutex::new(LockGuardState {
49 id: None,
50 prefix: None,
51 path,
52 zookeeper,
53 acquired: false,
54 }),
55 }
56 }
57
58 fn state(&self) -> MutexGuard<LockGuardState> {
59 self.state.lock().expect("Error acquiring state mutex")
60 }
61
62 async fn try_lock(self: Arc<LockGuard>) -> ZkResult<()> {
63 loop {
64 let no_id = self.state().id.is_none();
65 if no_id {
66 let prefix = Uuid::new_v4().to_string();
67
68 let (path, zookeeper) = {
69 let state = self.state();
70 debug!("Creating a lock in {} with prefix {}.", state.path, prefix);
71
72 (
73 format!("{}/{}_", state.path, prefix),
74 state.zookeeper.clone(),
75 )
76 };
77
78 let id = zookeeper
79 .create(
80 &path,
81 Vec::new(),
82 Acl::read_unsafe().clone(),
83 CreateMode::EphemeralSequential,
84 )
85 .await?;
86
87 debug!("Resulting path: {}", id);
88
89 let id = &id[id.rfind('/').expect("Missing last path separator!") + 1..];
90
91 let mut state = self.state();
92 state.id = Some(id.into());
93 state.prefix = Some(prefix);
94 }
95
96 let (path, prefix, zookeeper) = {
97 let state = self.state();
98 (
99 state.path.clone(),
100 state.prefix.as_ref().cloned().unwrap(),
101 state.zookeeper.clone(),
102 )
103 };
104
105 let nodes = zookeeper.get_children(&path, false).await?;
106
107 if nodes.is_empty() {
108 warn!("No lock node after creation - recreating.");
109
110 let mut state = self.state();
111 state.id = None;
112 state.prefix = None;
113 continue;
114 }
115
116 let mut nodes = nodes
117 .into_iter()
118 .map(|node| (node.clone(), node.rfind('_')))
119 .filter_map(|node| {
120 let (left, right) = node.0.split_at(node.1?);
121 Some((String::from(left), String::from(right)))
122 })
123 .collect::<Vec<_>>();
124
125 if nodes.is_empty() {
126 warn!("Couldn't find lock nodes - recreating.");
127
128 let mut state = self.state();
129 state.id = None;
130 state.prefix = None;
131 continue;
132 }
133
134 nodes.sort_unstable_by(|a, b| a.1.cmp(&b.1));
135
136 if nodes[0].0 == *prefix {
137 break;
138 }
139
140 let id_position = nodes.binary_search_by(|node| node.0.cmp(&prefix)).unwrap();
141 let previous = &nodes[id_position - 1];
142
143 let path = {
144 let state = self.state();
145 debug!(
146 "Watching previous node to {}: {}{}",
147 state.id.as_ref().unwrap(),
148 previous.0,
149 previous.1
150 );
151
152 state.path.clone()
153 };
154
155 let previous_path = format!("{}/{}{}", path, previous.0, previous.1);
156
157 let (tx, rx) = oneshot::channel();
158 let stat = zookeeper
159 .exists_w(&previous_path, move |_| {
160 if tx.send(()).is_err() {
161 panic!("Error sending lock notification!");
162 }
163 })
164 .await?;
165
166 if stat.is_some() && rx.await.is_err() {
167 return Err(ZkError::ConnectionLoss);
168 }
169 }
170
171 let mut state = self.state();
172 state.acquired = true;
173
174 Ok(())
175 }
176}
177
178pub async fn lock(zookeeper: Arc<ZooKeeper>, path: String) -> ZkResult<Arc<LockGuard>> {
182 let path_copy = path.clone();
183
184 zookeeper
185 .ensure_path_with_leaf_mode(&path_copy, CreateMode::Container)
186 .await?;
187
188 let guard = Arc::new(LockGuard::new(path, zookeeper));
189 guard.clone().try_lock().await?;
190
191 Ok(guard)
192}