zookeeper_async/recipes/
shared_lock.rs

1use std::sync::{Arc, Mutex, MutexGuard};
2
3use tokio::sync::oneshot;
4use tracing::*;
5use uuid::Uuid;
6
7use crate::{Acl, CreateMode, ZkError, ZkResult, ZooKeeper, ZooKeeperExt};
8
9/// An RAII implementation of a "scoped lock" for a ZooKeeper distributed lock. When this structure
10/// is dropped (falls out of scope), the lock will be unlocked.
11pub struct LockGuard {
12    // we need a sync mutex, because we don't have async drop yet
13    state: Mutex<LockGuardState>,
14}
15
16struct LockGuardState {
17    id: Option<String>,
18    prefix: Option<String>,
19    path: String,
20    zookeeper: Arc<ZooKeeper>,
21    acquired: bool,
22}
23
24impl Drop for LockGuard {
25    fn drop(&mut self) {
26        let mut state = self.state();
27        if !state.acquired {
28            return;
29        }
30
31        if let Some(id) = state.id.take() {
32            let zookeeper = state.zookeeper.clone();
33            let path = state.path.clone();
34            tokio::spawn(async move {
35                debug!("Removing lock: {}/{}", path, id);
36
37                if let Err(error) = zookeeper.delete(&format!("{}/{}", path, id), None).await {
38                    panic!("Couldn't remove lock {}/{}: {}", path, id, error);
39                }
40            });
41        }
42    }
43}
44
45impl LockGuard {
46    fn new(path: String, zookeeper: Arc<ZooKeeper>) -> Self {
47        LockGuard {
48            state: Mutex::new(LockGuardState {
49                id: None,
50                prefix: None,
51                path,
52                zookeeper,
53                acquired: false,
54            }),
55        }
56    }
57
58    fn state(&self) -> MutexGuard<LockGuardState> {
59        self.state.lock().expect("Error acquiring state mutex")
60    }
61
62    async fn try_lock(self: Arc<LockGuard>) -> ZkResult<()> {
63        loop {
64            let no_id = self.state().id.is_none();
65            if no_id {
66                let prefix = Uuid::new_v4().to_string();
67
68                let (path, zookeeper) = {
69                    let state = self.state();
70                    debug!("Creating a lock in {} with prefix {}.", state.path, prefix);
71
72                    (
73                        format!("{}/{}_", state.path, prefix),
74                        state.zookeeper.clone(),
75                    )
76                };
77
78                let id = zookeeper
79                    .create(
80                        &path,
81                        Vec::new(),
82                        Acl::read_unsafe().clone(),
83                        CreateMode::EphemeralSequential,
84                    )
85                    .await?;
86
87                debug!("Resulting path: {}", id);
88
89                let id = &id[id.rfind('/').expect("Missing last path separator!") + 1..];
90
91                let mut state = self.state();
92                state.id = Some(id.into());
93                state.prefix = Some(prefix);
94            }
95
96            let (path, prefix, zookeeper) = {
97                let state = self.state();
98                (
99                    state.path.clone(),
100                    state.prefix.as_ref().cloned().unwrap(),
101                    state.zookeeper.clone(),
102                )
103            };
104
105            let nodes = zookeeper.get_children(&path, false).await?;
106
107            if nodes.is_empty() {
108                warn!("No lock node after creation - recreating.");
109
110                let mut state = self.state();
111                state.id = None;
112                state.prefix = None;
113                continue;
114            }
115
116            let mut nodes = nodes
117                .into_iter()
118                .map(|node| (node.clone(), node.rfind('_')))
119                .filter_map(|node| {
120                    let (left, right) = node.0.split_at(node.1?);
121                    Some((String::from(left), String::from(right)))
122                })
123                .collect::<Vec<_>>();
124
125            if nodes.is_empty() {
126                warn!("Couldn't find lock nodes - recreating.");
127
128                let mut state = self.state();
129                state.id = None;
130                state.prefix = None;
131                continue;
132            }
133
134            nodes.sort_unstable_by(|a, b| a.1.cmp(&b.1));
135
136            if nodes[0].0 == *prefix {
137                break;
138            }
139
140            let id_position = nodes.binary_search_by(|node| node.0.cmp(&prefix)).unwrap();
141            let previous = &nodes[id_position - 1];
142
143            let path = {
144                let state = self.state();
145                debug!(
146                    "Watching previous node to {}: {}{}",
147                    state.id.as_ref().unwrap(),
148                    previous.0,
149                    previous.1
150                );
151
152                state.path.clone()
153            };
154
155            let previous_path = format!("{}/{}{}", path, previous.0, previous.1);
156
157            let (tx, rx) = oneshot::channel();
158            let stat = zookeeper
159                .exists_w(&previous_path, move |_| {
160                    if tx.send(()).is_err() {
161                        panic!("Error sending lock notification!");
162                    }
163                })
164                .await?;
165
166            if stat.is_some() && rx.await.is_err() {
167                return Err(ZkError::ConnectionLoss);
168            }
169        }
170
171        let mut state = self.state();
172        state.acquired = true;
173
174        Ok(())
175    }
176}
177
178/// Fully distributed [locks](https://curator.apache.org/curator-recipes/shared-lock.html) that are
179/// globally synchronous, meaning at any snapshot in time no two clients think they hold the same
180/// lock.
181pub async fn lock(zookeeper: Arc<ZooKeeper>, path: String) -> ZkResult<Arc<LockGuard>> {
182    let path_copy = path.clone();
183
184    zookeeper
185        .ensure_path_with_leaf_mode(&path_copy, CreateMode::Container)
186        .await?;
187
188    let guard = Arc::new(LockGuard::new(path, zookeeper));
189    guard.clone().try_lock().await?;
190
191    Ok(guard)
192}