1use consts::{KeeperState, WatchedEventType};
2use consts::{
3 WatchedEventType::{NodeChildrenChanged, NodeCreated, NodeDataChanged, NodeDeleted},
4 WatcherType,
5};
6use proto::ReadFrom;
7use std::collections::HashMap;
8use std::io;
9use std::sync::mpsc::{self, Receiver, Sender};
10use zookeeper::RawResponse;
11
12const PERSISTENT_WATCH_TRIGGERS: [WatchedEventType; 4] = [
13 NodeChildrenChanged,
14 NodeCreated,
15 NodeDataChanged,
16 NodeDeleted,
17];
18
19#[derive(Clone, Debug)]
24pub struct WatchedEvent {
25 pub event_type: WatchedEventType,
27 pub keeper_state: KeeperState,
29 pub path: Option<String>,
31}
32
33pub struct Watch {
35 pub path: String,
37 pub watcher_type: WatcherType,
39 pub watcher: Box<dyn Watcher>,
41}
42
43pub trait Watcher: Send {
45 fn handle(&self, event: WatchedEvent);
47}
48
49impl<F> Watcher for F
50where
51 F: Fn(WatchedEvent) + Send,
52{
53 fn handle(&self, event: WatchedEvent) {
54 self(event)
55 }
56}
57
58pub enum WatchMessage {
59 Event(RawResponse),
60 Watch(Watch),
61 RemoveWatch(String, WatcherType),
62}
63
64pub struct ZkWatch<W: Watcher> {
65 watcher: W,
66 watches: HashMap<String, Vec<Watch>>,
67 persistent_watches: HashMap<String, Vec<Watch>>,
71 chroot: Option<String>,
72 rx: Receiver<WatchMessage>,
73}
74
75impl<W: Watcher> ZkWatch<W> {
76 pub fn new(watcher: W, chroot: Option<String>) -> (Self, Sender<WatchMessage>) {
77 trace!("ZkWatch::new");
78 let (tx, rx) = mpsc::channel();
79
80 let watch = ZkWatch {
81 watches: HashMap::new(),
82 persistent_watches: HashMap::new(),
83 watcher: watcher,
84 chroot: chroot,
85 rx,
86 };
87 (watch, tx)
88 }
89
90 pub fn run(mut self) -> io::Result<()> {
91 while let Ok(msg) = self.rx.recv() {
92 self.process_message(msg);
93 }
94
95 Ok(())
96 }
97
98 fn process_message(&mut self, message: WatchMessage) {
99 match message {
100 WatchMessage::Event(response) => {
101 info!("Event thread got response {:?}", response.header);
102 let mut data = response.data;
103 match response.header.err {
104 0 => match WatchedEvent::read_from(&mut data) {
105 Ok(mut event) => {
106 self.cut_chroot(&mut event);
107 self.dispatch(&event);
108 }
109 Err(e) => error!("Failed to parse WatchedEvent {:?}", e),
110 },
111 e => error!("WatchedEvent.error {:?}", e),
112 }
113 }
114 WatchMessage::Watch(watch) => {
115 let group = if watch.watcher_type.is_persistent() {
116 &mut self.persistent_watches
117 } else {
118 &mut self.watches
119 };
120 group
121 .entry(watch.path.clone())
122 .or_insert_with(|| vec![])
123 .push(watch);
124 }
125 WatchMessage::RemoveWatch(path, watcher_type) => {
126 remove_matching_watches(&path, watcher_type, &mut self.watches);
127 remove_matching_watches(&path, watcher_type, &mut self.persistent_watches);
128 }
129 }
130 }
131
132 fn cut_chroot(&self, event: &mut WatchedEvent) {
133 if let Some(ref chroot) = self.chroot {
134 if event.path.is_some() {
135 event.path = Some(event.path.as_ref().unwrap()[chroot.len()..].to_owned());
136 }
137 }
138 }
139
140 fn dispatch(&mut self, event: &WatchedEvent) {
141 debug!("{:?}", event);
142 if !self.trigger_watches(&event) {
143 self.watcher.handle(event.clone())
144 }
145 }
146
147 fn trigger_watches(&mut self, event: &WatchedEvent) -> bool {
150 if let Some(ref path) = event.path {
151 let triggered_watch = match self.watches.remove(path) {
153 Some(watches) => {
154 let (matching, left): (_, Vec<Watch>) =
155 watches.into_iter().partition(|w| {
156 match (event.event_type, w.watcher_type) {
157 (NodeChildrenChanged, WatcherType::Children) => true,
158 (NodeCreated | NodeDataChanged, WatcherType::Data) => true,
159 (NodeDeleted, _) => true,
160 _ => false,
161 }
162 });
163 if !left.is_empty() {
165 self.watches.insert(path.to_owned(), left);
166 }
167 matching
169 .iter()
170 .for_each(|w| w.watcher.handle(event.clone()));
171 !matching.is_empty()
172 }
173 None => false,
174 };
175
176 let triggered_peristent_watch = if PERSISTENT_WATCH_TRIGGERS.contains(&event.event_type)
177 && !self.persistent_watches.is_empty()
178 {
179 let mut watch_path = String::from("");
180 let mut parts = path.split("/").skip(1);
181 let mut triggered = false;
182 while let Some(part) = parts.next() {
183 watch_path = watch_path + "/" + part;
184 if let Some(watches) = self.persistent_watches.get(&watch_path) {
185 for w in watches {
186 if match w.watcher_type {
187 WatcherType::Persistent => path == &watch_path,
188 WatcherType::PersistentRecursive => true,
189 _ => false,
190 } {
191 w.watcher.handle(event.clone());
192 triggered = true;
193 }
194 }
195 }
196 }
197 triggered
198 } else {
199 false
200 };
201 triggered_watch || triggered_peristent_watch
202 } else {
203 false
204 }
205 }
206}
207
208fn remove_matching_watches(
209 path: &str,
210 watcher_type: WatcherType,
211 watches: &mut HashMap<String, Vec<Watch>>,
212) {
213 let remaining_watches: Option<Vec<_>> = watches.remove(path).map(|watches| {
214 watches
215 .into_iter()
216 .filter(|w| w.watcher_type == watcher_type || watcher_type == WatcherType::Any)
217 .collect()
218 });
219 if let Some(w) = remaining_watches {
220 if !w.is_empty() {
221 watches.insert(path.into(), w);
222 }
223 }
224}