1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::{
Acl, CreateMode, WatchedEvent, WatchedEventType, ZkError, ZkResult, ZooKeeper, watch::Watcher
};
use std::sync::mpsc::{SyncSender, Receiver, sync_channel};
use std::sync::Arc;
/// Name prefix of every queue element znode.
///
/// `ZkQueue::offer` creates children named `<dir>/qn-` with
/// `CreateMode::PersistentSequential`, and `ZkQueue::ordered_children` parses
/// whatever follows the prefix as the `u64` index used to order the elements.
const ZK_DISTRIBUTEDQUEUE_PREFIX: &str = "qn-";
/// A queue whose elements are stored as child znodes of a single directory
/// znode and handed out in ascending order of their numeric name suffix.
pub struct ZkQueue {
/// Path of the znode under which the element znodes are created.
dir: String,
/// Shared client handle used for every ZooKeeper call the queue makes.
zk: Arc<ZooKeeper>,
}
impl ZkQueue {
pub fn new(zk: Arc<ZooKeeper>, dir: String) -> ZkResult<Self> {
if zk.exists(&dir, false)?.is_none() {
let _ = zk.create(&dir, vec![0], Acl::open_unsafe().clone(), CreateMode::Container)?;
}
Ok(Self {
zk,
dir
})
}
pub fn offer(&self, data: Vec<u8>) -> ZkResult<String> {
self.zk.create(
&*format!("{}/{}", self.dir, ZK_DISTRIBUTEDQUEUE_PREFIX),
data,
Acl::open_unsafe().clone(),
CreateMode::PersistentSequential)
}
fn claim(&self, key: String) -> ZkResult<Vec<u8>> {
let data = self.zk.get_data(&key, false)?;
self.zk.delete(&key, None)?;
Ok(data.0)
}
fn ordered_children<W: Watcher + 'static>(&self, watcher: Option<W>) -> ZkResult<Vec<String>> {
let mut children: Vec<(u64, String)> = Vec::new();
match watcher {
Some(w) => self.zk.get_children_w(&self.dir, w),
None => self.zk.get_children(&self.dir, false) }?.iter().for_each(|child| {
if let Ok(index) = child.replace(ZK_DISTRIBUTEDQUEUE_PREFIX, "").parse::<u64>() {
children.push((index, child.clone()))
} else {
warn!("found child with improper name: {}. ignoring", child);
}
});
children.sort_by(|a, b| a.0.cmp(&b.0));
Ok(children.iter().map(|i| i.1.clone()).collect())
}
pub fn take(&self) -> ZkResult<Vec<u8>> {
let latch: (SyncSender<bool>, Receiver<bool>) = sync_channel(1);
loop {
let tx = latch.0.clone();
let op = self.ordered_children(Some(move |ev| {
handle_znode_change(&tx, ev)
}))?;
if !op.is_empty() {
return match self.claim(format!("{}/{}", self.dir, op[0])) {
Err(e) if e == ZkError::NoNode => continue,
Err(e) => Err(e),
Ok(claim) => Ok(claim)
};
}
let _ = latch.1.recv().unwrap();
}
}
pub fn peek(&self) -> ZkResult<Option<Vec<u8>>> {
let op = self.ordered_children(Some(|_|{}))?;
Ok(match op.is_empty() {
false => Some(self.zk.get_data(&*format!("{}/{}", self.dir, op[0]), false)?.0),
true => None
})
}
pub fn poll(&self) -> ZkResult<Option<Vec<u8>>> {
let op = self.ordered_children(Some(|_|{}))?;
if !op.is_empty() {
return match self.claim(format!("{}/{}", self.dir, op[0])) {
Err(e) if e == ZkError::NoNode => Ok(None),
Err(e) => Err(e),
Ok(claim) => Ok(Some(claim))
};
}
Ok(None)
}
}
/// Watcher callback used by `ZkQueue::take`: signals `chan` when the children of the
/// watched znode changed.
///
/// The channel only acts as a wake-up latch, so one pending signal is enough. `try_send`
/// is used so that the callback never blocks when a signal is already buffered; a full or
/// disconnected channel is deliberately ignored.
///
/// NOTE(review): events of any other type are dropped without waking the receiver. If the
/// watch can be consumed by such an event (e.g. the watched znode being deleted), a
/// blocked `take` would not be woken by this watcher — confirm against the watch
/// dispatcher.
fn handle_znode_change(chan: &SyncSender<bool>, ev: WatchedEvent) {
    if let WatchedEventType::NodeChildrenChanged = ev.event_type {
        let _ = chan.try_send(true);
    }
}