zookeeper_async/recipes/
distributed_queue.rs

1use std::sync::Arc;
2use tracing::*;
3
4use crate::{Acl, CreateMode, ZkError, ZkResult, ZooKeeper};
5
/// Name prefix shared by every queue item znode. `put` creates nodes under this prefix and
/// `ordered_children` ignores any child whose name does not start with it.
6const ZK_DISTRIBUTED_QUEUE_PREFIX: &str = "qn-";
7
8/// An implementation of the Distributed Queue ZK recipe. Items put into the queue are guaranteed to
9/// be ordered (by means of ZK's PERSISTENTSEQUENTIAL node). If a single consumer takes items out of
10/// the queue, they will be ordered FIFO. If ordering is important, use a LeaderSelector to nominate
11/// a single consumer.
12pub struct DistributedQueue {
13    path: String,
14    zk: Arc<ZooKeeper>,
15}
16
17impl DistributedQueue {
18    pub fn new(path: String, zk: Arc<ZooKeeper>) -> Self {
19        DistributedQueue { path, zk }
20    }
21
22    /// Inserts data into the queue.
23    pub async fn put(&self, data: Vec<u8>) -> ZkResult<String> {
24        self.zk
25            .create(
26                &format!("{}/{}", self.path, ZK_DISTRIBUTED_QUEUE_PREFIX),
27                data,
28                Acl::open_unsafe().clone(),
29                CreateMode::PersistentSequential,
30            )
31            .await
32    }
33
34    async fn claim(&self, key: &str) -> ZkResult<Vec<u8>> {
35        let data = self.zk.get_data(key, false).await?;
36        self.zk.delete(key, None).await.map(move |()| data.0)
37    }
38
39    async fn ordered_children(&self) -> ZkResult<Vec<String>> {
40        let mut children: Vec<(u64, String)> = Vec::new();
41        self.zk
42            .get_children(&self.path, false)
43            .await?
44            .into_iter()
45            .for_each(|child| {
46                if !child.starts_with(ZK_DISTRIBUTED_QUEUE_PREFIX) {
47                    warn!("Found child with improper name: {}. Ignoring", child);
48                    return;
49                }
50
51                // the child names will be like qn-0000001. chop off the prefix, and try and convert the
52                // rest to a u64. if it fails, let's ignore it and move on
53                if let Ok(index) = child[ZK_DISTRIBUTED_QUEUE_PREFIX.len()..].parse::<u64>() {
54                    children.push((index, child))
55                } else {
56                    warn!("Found child with improper index: {}. Ignoring", child);
57                }
58            });
59        children.sort_by(|a, b| a.0.cmp(&b.0));
60
61        Ok(children.into_iter().map(|i| i.1).collect())
62    }
63
64    /// Try to take the first item, if available.
65    pub async fn try_take(&self) -> ZkResult<Option<Vec<u8>>> {
66        let children = self.ordered_children().await?;
67
68        if let Some(child) = children.get(0) {
69            match self.claim(&format!("{}/{}", self.path, child)).await {
70                // if the claim fails because the requested znode has been deleted, assume
71                // someone else claimed it and try again
72                Err(e) if e == ZkError::NoNode => Ok(None),
73                // any other error should be passed up
74                Err(e) => Err(e),
75                Ok(claim) => Ok(Some(claim)),
76            }
77        } else {
78            Ok(None)
79        }
80    }
81
82    /// Returns the data at the first element of the queue, or Ok(None) if the queue is empty.
83    /// Note: peeking does not claim ownership of the element in the queue.
84    pub async fn peek(&self) -> ZkResult<Option<Vec<u8>>> {
85        let children = self.ordered_children().await?;
86        Ok(match children.get(0) {
87            Some(child) => Some(
88                self.zk
89                    .get_data(&format!("{}/{}", self.path, child), false)
90                    .await?
91                    .0,
92            ),
93            None => None,
94        })
95    }
96}