zookeeper/
watch.rs

1use consts::{KeeperState, WatchedEventType};
2use consts::{
3    WatchedEventType::{NodeChildrenChanged, NodeCreated, NodeDataChanged, NodeDeleted},
4    WatcherType,
5};
6use proto::ReadFrom;
7use std::collections::HashMap;
8use std::io;
9use std::sync::mpsc::{self, Receiver, Sender};
10use zookeeper::RawResponse;
11
12const PERSISTENT_WATCH_TRIGGERS: [WatchedEventType; 4] = [
13    NodeChildrenChanged,
14    NodeCreated,
15    NodeDataChanged,
16    NodeDeleted,
17];
18
19/// Represents a change on the ZooKeeper that a `Watcher` is able to respond to.
20///
21/// The `WatchedEvent` includes exactly what happened, the current state of the ZooKeeper, and the
22/// path of the znode that was involved in the event.
23#[derive(Clone, Debug)]
24pub struct WatchedEvent {
25    /// The trigger that caused the watch to hit.
26    pub event_type: WatchedEventType,
27    /// The current state of ZooKeeper (and the client's connection to it).
28    pub keeper_state: KeeperState,
29    /// The path of the znode that was involved. This will be `None` for session-related triggers.
30    pub path: Option<String>,
31}
32
33/// An object watching a path for certain changes.
34pub struct Watch {
35    /// The path to the znode this is watching.
36    pub path: String,
37    /// The type of changes this watch is looking for.
38    pub watcher_type: WatcherType,
39    /// The handler for this watch, to call when it is triggered.
40    pub watcher: Box<dyn Watcher>,
41}
42
43/// The interface for handling events when a `Watch` triggers.
44pub trait Watcher: Send {
45    /// Receive the triggered event.
46    fn handle(&self, event: WatchedEvent);
47}
48
49impl<F> Watcher for F
50where
51    F: Fn(WatchedEvent) + Send,
52{
53    fn handle(&self, event: WatchedEvent) {
54        self(event)
55    }
56}
57
58pub enum WatchMessage {
59    Event(RawResponse),
60    Watch(Watch),
61    RemoveWatch(String, WatcherType),
62}
63
64pub struct ZkWatch<W: Watcher> {
65    watcher: W,
66    watches: HashMap<String, Vec<Watch>>,
67    // Storing peristent watches separately since they may require splitting the event path
68    // which will have a performance impact. This replicates how they are stored in Zookeeper as well
69    // and allows us to skip removing them from the hashmap and re-adding them.
70    persistent_watches: HashMap<String, Vec<Watch>>,
71    chroot: Option<String>,
72    rx: Receiver<WatchMessage>,
73}
74
75impl<W: Watcher> ZkWatch<W> {
76    pub fn new(watcher: W, chroot: Option<String>) -> (Self, Sender<WatchMessage>) {
77        trace!("ZkWatch::new");
78        let (tx, rx) = mpsc::channel();
79
80        let watch = ZkWatch {
81            watches: HashMap::new(),
82            persistent_watches: HashMap::new(),
83            watcher: watcher,
84            chroot: chroot,
85            rx,
86        };
87        (watch, tx)
88    }
89
90    pub fn run(mut self) -> io::Result<()> {
91        while let Ok(msg) = self.rx.recv() {
92            self.process_message(msg);
93        }
94
95        Ok(())
96    }
97
98    fn process_message(&mut self, message: WatchMessage) {
99        match message {
100            WatchMessage::Event(response) => {
101                info!("Event thread got response {:?}", response.header);
102                let mut data = response.data;
103                match response.header.err {
104                    0 => match WatchedEvent::read_from(&mut data) {
105                        Ok(mut event) => {
106                            self.cut_chroot(&mut event);
107                            self.dispatch(&event);
108                        }
109                        Err(e) => error!("Failed to parse WatchedEvent {:?}", e),
110                    },
111                    e => error!("WatchedEvent.error {:?}", e),
112                }
113            }
114            WatchMessage::Watch(watch) => {
115                let group = if watch.watcher_type.is_persistent() {
116                    &mut self.persistent_watches
117                } else {
118                    &mut self.watches
119                };
120                group
121                    .entry(watch.path.clone())
122                    .or_insert_with(|| vec![])
123                    .push(watch);
124            }
125            WatchMessage::RemoveWatch(path, watcher_type) => {
126                remove_matching_watches(&path, watcher_type, &mut self.watches);
127                remove_matching_watches(&path, watcher_type, &mut self.persistent_watches);
128            }
129        }
130    }
131
132    fn cut_chroot(&self, event: &mut WatchedEvent) {
133        if let Some(ref chroot) = self.chroot {
134            if event.path.is_some() {
135                event.path = Some(event.path.as_ref().unwrap()[chroot.len()..].to_owned());
136            }
137        }
138    }
139
140    fn dispatch(&mut self, event: &WatchedEvent) {
141        debug!("{:?}", event);
142        if !self.trigger_watches(&event) {
143            self.watcher.handle(event.clone())
144        }
145    }
146
147    /// Triggers all the watches that we have registered, removing the ones that are not persistent.
148    /// Returns whether or not any of the watches fired.
149    fn trigger_watches(&mut self, event: &WatchedEvent) -> bool {
150        if let Some(ref path) = event.path {
151            // We execute this in two steps. Once for the one-off watches, and once for the persistent ones.
152            let triggered_watch = match self.watches.remove(path) {
153                Some(watches) => {
154                    let (matching, left): (_, Vec<Watch>) =
155                        watches.into_iter().partition(|w| {
156                            match (event.event_type, w.watcher_type) {
157                                (NodeChildrenChanged, WatcherType::Children) => true,
158                                (NodeCreated | NodeDataChanged, WatcherType::Data) => true,
159                                (NodeDeleted, _) => true,
160                                _ => false,
161                            }
162                        });
163                    // put back the remaining watches
164                    if !left.is_empty() {
165                        self.watches.insert(path.to_owned(), left);
166                    }
167                    // Trigger all matching watches.
168                    matching
169                        .iter()
170                        .for_each(|w| w.watcher.handle(event.clone()));
171                    !matching.is_empty()
172                }
173                None => false,
174            };
175
176            let triggered_peristent_watch = if PERSISTENT_WATCH_TRIGGERS.contains(&event.event_type)
177                && !self.persistent_watches.is_empty()
178            {
179                let mut watch_path = String::from("");
180                let mut parts = path.split("/").skip(1);
181                let mut triggered = false;
182                while let Some(part) = parts.next() {
183                    watch_path = watch_path + "/" + part;
184                    if let Some(watches) = self.persistent_watches.get(&watch_path) {
185                        for w in watches {
186                            if match w.watcher_type {
187                                WatcherType::Persistent => path == &watch_path,
188                                WatcherType::PersistentRecursive => true,
189                                _ => false,
190                            } {
191                                w.watcher.handle(event.clone());
192                                triggered = true;
193                            }
194                        }
195                    }
196                }
197                triggered
198            } else {
199                false
200            };
201            triggered_watch || triggered_peristent_watch
202        } else {
203            false
204        }
205    }
206}
207
208fn remove_matching_watches(
209    path: &str,
210    watcher_type: WatcherType,
211    watches: &mut HashMap<String, Vec<Watch>>,
212) {
213    let remaining_watches: Option<Vec<_>> = watches.remove(path).map(|watches| {
214        watches
215            .into_iter()
216            .filter(|w| w.watcher_type == watcher_type || watcher_type == WatcherType::Any)
217            .collect()
218    });
219    if let Some(w) = remaining_watches {
220        if !w.is_empty() {
221            watches.insert(path.into(), w);
222        }
223    }
224}