zookeeper_async/recipes/
distributed_queue.rs1use std::sync::Arc;
2use tracing::*;
3
4use crate::{Acl, CreateMode, ZkError, ZkResult, ZooKeeper};
5
6const ZK_DISTRIBUTED_QUEUE_PREFIX: &str = "qn-";
7
8pub struct DistributedQueue {
13 path: String,
14 zk: Arc<ZooKeeper>,
15}
16
17impl DistributedQueue {
18 pub fn new(path: String, zk: Arc<ZooKeeper>) -> Self {
19 DistributedQueue { path, zk }
20 }
21
22 pub async fn put(&self, data: Vec<u8>) -> ZkResult<String> {
24 self.zk
25 .create(
26 &format!("{}/{}", self.path, ZK_DISTRIBUTED_QUEUE_PREFIX),
27 data,
28 Acl::open_unsafe().clone(),
29 CreateMode::PersistentSequential,
30 )
31 .await
32 }
33
34 async fn claim(&self, key: &str) -> ZkResult<Vec<u8>> {
35 let data = self.zk.get_data(key, false).await?;
36 self.zk.delete(key, None).await.map(move |()| data.0)
37 }
38
39 async fn ordered_children(&self) -> ZkResult<Vec<String>> {
40 let mut children: Vec<(u64, String)> = Vec::new();
41 self.zk
42 .get_children(&self.path, false)
43 .await?
44 .into_iter()
45 .for_each(|child| {
46 if !child.starts_with(ZK_DISTRIBUTED_QUEUE_PREFIX) {
47 warn!("Found child with improper name: {}. Ignoring", child);
48 return;
49 }
50
51 if let Ok(index) = child[ZK_DISTRIBUTED_QUEUE_PREFIX.len()..].parse::<u64>() {
54 children.push((index, child))
55 } else {
56 warn!("Found child with improper index: {}. Ignoring", child);
57 }
58 });
59 children.sort_by(|a, b| a.0.cmp(&b.0));
60
61 Ok(children.into_iter().map(|i| i.1).collect())
62 }
63
64 pub async fn try_take(&self) -> ZkResult<Option<Vec<u8>>> {
66 let children = self.ordered_children().await?;
67
68 if let Some(child) = children.get(0) {
69 match self.claim(&format!("{}/{}", self.path, child)).await {
70 Err(e) if e == ZkError::NoNode => Ok(None),
73 Err(e) => Err(e),
75 Ok(claim) => Ok(Some(claim)),
76 }
77 } else {
78 Ok(None)
79 }
80 }
81
82 pub async fn peek(&self) -> ZkResult<Option<Vec<u8>>> {
85 let children = self.ordered_children().await?;
86 Ok(match children.get(0) {
87 Some(child) => Some(
88 self.zk
89 .get_data(&format!("{}/{}", self.path, child), false)
90 .await?
91 .0,
92 ),
93 None => None,
94 })
95 }
96}