zookeeper_zk/recipes/
queue.rs1use crate::{
4 Acl, CreateMode, WatchedEvent, WatchedEventType, ZkError, ZkResult, ZooKeeper, watch::Watcher
5};
6use std::sync::mpsc::{SyncSender, Receiver, sync_channel};
7use std::sync::Arc;
8
9const ZK_DISTRIBUTEDQUEUE_PREFIX: &str = "qn-";
12
13pub struct ZkQueue {
14 dir: String,
15 zk: Arc<ZooKeeper>,
16}
17impl ZkQueue {
18 pub fn new(zk: Arc<ZooKeeper>, dir: String) -> ZkResult<Self> {
19 if zk.exists(&dir, false)?.is_none() {
20 let _ = zk.create(&dir, vec![0], Acl::open_unsafe().clone(), CreateMode::Container)?;
21 }
22 Ok(Self {
23 zk,
24 dir
25 })
26 }
27
28 pub fn offer(&self, data: Vec<u8>) -> ZkResult<String> {
30 self.zk.create(
31 &format!("{}/{}", self.dir, ZK_DISTRIBUTEDQUEUE_PREFIX),
32 data,
33 Acl::open_unsafe().clone(),
34 CreateMode::PersistentSequential)
35 }
36
37 fn claim(&self, key: String) -> ZkResult<Vec<u8>> {
42 let data = self.zk.get_data(&key, false)?;
43 self.zk.delete(&key, None)?;
44 Ok(data.0)
45 }
46
47 fn ordered_children<W: Watcher + 'static>(&self, watcher: Option<W>) -> ZkResult<Vec<String>> {
49 let mut children: Vec<(u64, String)> = Vec::new();
50 match watcher {
51 Some(w) => self.zk.get_children_w(&self.dir, w),
52 None => self.zk.get_children(&self.dir, false) }?.iter().for_each(|child| {
54 if let Ok(index) = child.replace(ZK_DISTRIBUTEDQUEUE_PREFIX, "").parse::<u64>() {
57 children.push((index, child.clone()))
58 } else {
59 warn!("found child with improper name: {}. ignoring", child);
60 }
61 });
62 children.sort_by(|a, b| a.0.cmp(&b.0));
63
64 Ok(children.iter().map(|i| i.1.clone()).collect())
65 }
66
67 pub fn take(&self) -> ZkResult<Vec<u8>> {
69 let latch: (SyncSender<bool>, Receiver<bool>) = sync_channel(1);
72 loop {
73 let tx = latch.0.clone();
74 let op = self.ordered_children(Some(move |ev| {
75 handle_znode_change(&tx, ev)
76 }))?;
77
78 if !op.is_empty() {
80 return match self.claim(format!("{}/{}", self.dir, op[0])) {
81 Err(e) if e == ZkError::NoNode => continue,
84 Err(e) => Err(e),
86
87 Ok(claim) => Ok(claim)
88 };
89 }
90
91 let _ = latch.1.recv().unwrap();
93 }
94 }
95
96 pub fn peek(&self) -> ZkResult<Option<Vec<u8>>> {
98 let op = self.ordered_children(Some(|_|{}))?;
99 Ok(match op.is_empty() {
100 false => Some(self.zk.get_data(&format!("{}/{}", self.dir, op[0]), false)?.0),
101 true => None
102 })
103 }
104
105 pub fn poll(&self) -> ZkResult<Option<Vec<u8>>> {
107 let op = self.ordered_children(Some(|_|{}))?;
108 if !op.is_empty() {
109 return match self.claim(format!("{}/{}", self.dir, op[0])) {
110 Err(e) if e == ZkError::NoNode => Ok(None),
111 Err(e) => Err(e),
112 Ok(claim) => Ok(Some(claim))
113 };
114 }
115 Ok(None)
116 }
117
118}
119
120fn handle_znode_change(chan: &SyncSender<bool>, ev: WatchedEvent) {
121 if let WatchedEventType::NodeChildrenChanged = ev.event_type {
122 let _ = chan.send(true);
123 }
124}