1use std::collections::HashMap;
2use std::fmt::{Debug, Formatter, Result as FmtResult};
3use tokio::sync::mpsc::{channel, Receiver, Sender};
4use tracing::*;
5
6use crate::proto::ReadFrom;
7use crate::zookeeper::RawResponse;
8use crate::{KeeperState, WatchedEventType};
9
10#[derive(Clone, Debug)]
15pub struct WatchedEvent {
16 pub event_type: WatchedEventType,
18 pub keeper_state: KeeperState,
20 pub path: Option<String>,
22}
23
/// The kind of watch registered on a path.
///
/// The watch type decides which event types trigger a [`Watch`]. A `NodeDeleted` event
/// triggers every watch registered on the path, regardless of its type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WatchType {
    /// Triggered by `NodeChildrenChanged`.
    Child,
    /// Triggered by `NodeCreated` and `NodeDataChanged`.
    Data,
    /// Triggered by `NodeCreated` and `NodeDataChanged`.
    Exist,
}
34
35pub struct Watch {
37 pub path: String,
39 pub watch_type: WatchType,
41 pub watcher: Box<dyn FnOnce(WatchedEvent) + Send>,
43}
44
45impl Debug for Watch {
46 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
47 f.debug_struct("Watch")
48 .field("path", &self.path)
49 .field("watch_type", &self.watch_type)
50 .finish()
51 }
52}
53
54pub trait Watcher: Send {
56 fn handle(&self, event: WatchedEvent);
58}
59
60impl<F> Watcher for F
61where
62 F: Fn(WatchedEvent) + Send,
63{
64 fn handle(&self, event: WatchedEvent) {
65 self(event)
66 }
67}
68
69#[derive(Debug)]
70pub enum WatchMessage {
71 Event(RawResponse),
72 Watch(Watch),
73}
74
75pub struct ZkWatch<W: Watcher> {
76 watcher: W,
77 watches: HashMap<String, Vec<Watch>>,
78 chroot: Option<String>,
79 rx: Receiver<WatchMessage>,
80}
81
82impl<W: Watcher> ZkWatch<W> {
83 pub fn new(watcher: W, chroot: Option<String>) -> (Self, Sender<WatchMessage>) {
84 trace!("ZkWatch::new");
85 let (tx, rx) = channel(64);
86
87 let watch = ZkWatch {
88 watches: HashMap::new(),
89 watcher,
90 chroot,
91 rx,
92 };
93 (watch, tx)
94 }
95
96 pub async fn run(mut self) {
97 while let Some(msg) = self.rx.recv().await {
98 self.process_message(msg);
99 }
100 }
101
102 fn process_message(&mut self, message: WatchMessage) {
103 match message {
104 WatchMessage::Event(response) => {
105 info!("Event thread got response {:?}", response.header);
106 let mut data = response.data;
107 match response.header.err {
108 0 => match WatchedEvent::read_from(&mut data) {
109 Ok(mut event) => {
110 self.cut_chroot(&mut event);
111 self.dispatch(&event);
112 }
113 Err(e) => error!("Failed to parse WatchedEvent {:?}", e),
114 },
115 e => error!("WatchedEvent.error {:?}", e),
116 }
117 }
118 WatchMessage::Watch(watch) => {
119 self.watches
120 .entry(watch.path.clone())
121 .or_insert_with(Vec::new)
122 .push(watch);
123 }
124 }
125 }
126
127 fn cut_chroot(&self, event: &mut WatchedEvent) {
128 if let Some(ref chroot) = self.chroot {
129 if event.path.is_some() {
130 event.path = Some(event.path.as_ref().unwrap()[chroot.len()..].to_owned());
131 }
132 }
133 }
134
135 fn dispatch(&mut self, event: &WatchedEvent) {
136 debug!("{:?}", event);
137 if let Some(watches) = self.find_watches(event) {
138 for watch in watches.into_iter() {
139 (watch.watcher)(event.clone())
140 }
141 } else {
142 self.watcher.handle(event.clone())
143 }
144 }
145
146 fn find_watches(&mut self, event: &WatchedEvent) -> Option<Vec<Watch>> {
147 if let Some(ref path) = event.path {
148 match self.watches.remove(path) {
149 Some(watches) => {
150 let (matching, left): (_, Vec<Watch>) =
151 watches.into_iter().partition(|w| match event.event_type {
152 WatchedEventType::NodeChildrenChanged => {
153 w.watch_type == WatchType::Child
154 }
155 WatchedEventType::NodeCreated | WatchedEventType::NodeDataChanged => {
156 w.watch_type == WatchType::Data || w.watch_type == WatchType::Exist
157 }
158 WatchedEventType::NodeDeleted => true,
159 _ => false,
160 });
161
162 if !left.is_empty() {
164 self.watches.insert(path.to_owned(), left);
165 }
166 if matching.is_empty() {
167 None
168 } else {
169 Some(matching)
170 }
171 }
172 None => None,
173 }
174 } else {
175 None
176 }
177 }
178}