zookeeper_zk/recipes/
queue.rs

1/// https://github.com/apache/zookeeper/blob/master/zookeeper-recipes/zookeeper-recipes-queue/src/main/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java
2///
3use crate::{
4    Acl, CreateMode, WatchedEvent, WatchedEventType, ZkError, ZkResult, ZooKeeper, watch::Watcher
5};
6use std::sync::mpsc::{SyncSender, Receiver, sync_channel};
7use std::sync::Arc;
8
9/// The default prefix to use for all children in the znode
10/// should be the same as the example recipe: https://github.com/apache/zookeeper/blob/245ff759b0e9fe0a1815e03433306ac805bf5e95/zookeeper-recipes/zookeeper-recipes-queue/src/main/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java#L48
11const ZK_DISTRIBUTEDQUEUE_PREFIX: &str = "qn-";
12
13pub struct ZkQueue {
14    dir: String,
15    zk: Arc<ZooKeeper>,
16}
17impl ZkQueue {
18    pub fn new(zk: Arc<ZooKeeper>, dir: String) -> ZkResult<Self> {
19        if zk.exists(&dir, false)?.is_none() {
20            let _ = zk.create(&dir, vec![0], Acl::open_unsafe().clone(), CreateMode::Container)?;
21        }
22        Ok(Self {
23            zk,
24            dir
25        })
26    }
27
28    /// Inserts data into the queue
29    pub fn offer(&self, data: Vec<u8>) -> ZkResult<String> {
30        self.zk.create(
31            &format!("{}/{}", self.dir, ZK_DISTRIBUTEDQUEUE_PREFIX),
32            data,
33            Acl::open_unsafe().clone(),
34            CreateMode::PersistentSequential)
35    }
36
37    /// Claim a item from the queue. gets the contents of the znode, and then delete it.
38    ///
39    /// NOTE. There is a small chance that another client could execute getData before this client
40    /// deletes the znode. If this is an issue, a LeaderLatch would need to be implemented
41    fn claim(&self, key: String) -> ZkResult<Vec<u8>> {
42        let data = self.zk.get_data(&key, false)?;
43        self.zk.delete(&key, None)?;
44        Ok(data.0)
45    }
46
47    /// Returns a Vec of the children, in order, of the task znode
48    fn ordered_children<W: Watcher + 'static>(&self, watcher: Option<W>) -> ZkResult<Vec<String>> {
49        let mut children: Vec<(u64, String)> = Vec::new();
50        match watcher {
51            Some(w) => self.zk.get_children_w(&self.dir, w),
52            None => self.zk.get_children(&self.dir, false) // false I think?
53        }?.iter().for_each(|child| {
54            // the child names will be like qn-0000001. chop off the prefix, and try and convert the
55            // rest to a u64. if it fails, let's ignore it and move on
56            if let Ok(index) = child.replace(ZK_DISTRIBUTEDQUEUE_PREFIX, "").parse::<u64>() {
57                children.push((index, child.clone()))
58            } else {
59                warn!("found child with improper name: {}. ignoring", child);
60            }
61        });
62        children.sort_by(|a, b| a.0.cmp(&b.0));
63
64        Ok(children.iter().map(|i| i.1.clone()).collect())
65    }
66
67    /// Removes the head of the queue and returns it, blocking until it succeeds or throws an error
68    pub fn take(&self) -> ZkResult<Vec<u8>> {
69        // create a channel with a capacity of 1 to act as a latch if there are not messages in the
70        // queue.
71        let latch: (SyncSender<bool>, Receiver<bool>) = sync_channel(1);
72        loop {
73            let tx = latch.0.clone();
74            let op = self.ordered_children(Some(move |ev| {
75                handle_znode_change(&tx, ev)
76            }))?;
77
78            // if self.ordered_children returned something, let's try and claim it
79            if !op.is_empty() {
80                return match self.claim(format!("{}/{}", self.dir, op[0])) {
81                    // if the claim fails because the requested znode has been deleted, assume
82                    // someone else claimed it and try again
83                    Err(e) if e == ZkError::NoNode => continue,
84                    // any other error should be passed up
85                    Err(e) => Err(e),
86
87                    Ok(claim) => Ok(claim)
88                };
89            }
90
91            // otherwise, wait until the handler is called and try this again
92            let _ = latch.1.recv().unwrap();
93        }
94    }
95
96    /// Returns the data at the first element of the queue, or Ok(None) if the queue is empty.
97    pub fn peek(&self) -> ZkResult<Option<Vec<u8>>> {
98        let op = self.ordered_children(Some(|_|{}))?;
99        Ok(match op.is_empty() {
100            false => Some(self.zk.get_data(&format!("{}/{}", self.dir, op[0]), false)?.0),
101            true => None
102        })
103    }
104
105    /// Attempts to remove the head of the queue and return it. Returns Ok(None) if the queue is empty.
106    pub fn poll(&self) -> ZkResult<Option<Vec<u8>>> {
107        let op = self.ordered_children(Some(|_|{}))?;
108        if !op.is_empty() {
109            return match self.claim(format!("{}/{}", self.dir, op[0])) {
110                Err(e) if e == ZkError::NoNode => Ok(None),
111                Err(e) => Err(e),
112                Ok(claim) => Ok(Some(claim))
113            };
114        }
115        Ok(None)
116    }
117
118}
119
120fn handle_znode_change(chan: &SyncSender<bool>, ev: WatchedEvent) {
121    if let WatchedEventType::NodeChildrenChanged = ev.event_type {
122        let _ = chan.send(true);
123    }
124}