zookeeper_async/
watch.rs

1use std::collections::HashMap;
2use std::fmt::{Debug, Formatter, Result as FmtResult};
3use tokio::sync::mpsc::{channel, Receiver, Sender};
4use tracing::*;
5
6use crate::proto::ReadFrom;
7use crate::zookeeper::RawResponse;
8use crate::{KeeperState, WatchedEventType};
9
10/// Represents a change on the ZooKeeper that a `Watcher` is able to respond to.
11///
12/// The `WatchedEvent` includes exactly what happened, the current state of the ZooKeeper, and the
13/// path of the znode that was involved in the event.
14#[derive(Clone, Debug)]
15pub struct WatchedEvent {
16    /// The trigger that caused the watch to hit.
17    pub event_type: WatchedEventType,
18    /// The current state of ZooKeeper (and the client's connection to it).
19    pub keeper_state: KeeperState,
20    /// The path of the znode that was involved. This will be `None` for session-related triggers.
21    pub path: Option<String>,
22}
23
/// Describes what a `Watch` is looking for.
///
/// The enum is a plain, fieldless tag, so it is `Copy` and fully comparable (`Eq`); callers
/// can pass it by value and compare it without borrowing.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WatchType {
    /// Watching for changes to children.
    Child,
    /// Watching for changes to data.
    Data,
    /// Watching for the creation of a node at the given path.
    Exist,
}
34
35/// An object watching a path for certain changes.
36pub struct Watch {
37    /// The path to the znode this is watching.
38    pub path: String,
39    /// The type of changes this watch is looking for.
40    pub watch_type: WatchType,
41    /// The handler for this watch, to call when it is triggered.
42    pub watcher: Box<dyn FnOnce(WatchedEvent) + Send>,
43}
44
45impl Debug for Watch {
46    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
47        f.debug_struct("Watch")
48            .field("path", &self.path)
49            .field("watch_type", &self.watch_type)
50            .finish()
51    }
52}
53
54/// The interface for handling events when a `Watch` triggers.
55pub trait Watcher: Send {
56    /// Receive the triggered event.
57    fn handle(&self, event: WatchedEvent);
58}
59
60impl<F> Watcher for F
61where
62    F: Fn(WatchedEvent) + Send,
63{
64    fn handle(&self, event: WatchedEvent) {
65        self(event)
66    }
67}
68
69#[derive(Debug)]
70pub enum WatchMessage {
71    Event(RawResponse),
72    Watch(Watch),
73}
74
75pub struct ZkWatch<W: Watcher> {
76    watcher: W,
77    watches: HashMap<String, Vec<Watch>>,
78    chroot: Option<String>,
79    rx: Receiver<WatchMessage>,
80}
81
82impl<W: Watcher> ZkWatch<W> {
83    pub fn new(watcher: W, chroot: Option<String>) -> (Self, Sender<WatchMessage>) {
84        trace!("ZkWatch::new");
85        let (tx, rx) = channel(64);
86
87        let watch = ZkWatch {
88            watches: HashMap::new(),
89            watcher,
90            chroot,
91            rx,
92        };
93        (watch, tx)
94    }
95
96    pub async fn run(mut self) {
97        while let Some(msg) = self.rx.recv().await {
98            self.process_message(msg);
99        }
100    }
101
102    fn process_message(&mut self, message: WatchMessage) {
103        match message {
104            WatchMessage::Event(response) => {
105                info!("Event thread got response {:?}", response.header);
106                let mut data = response.data;
107                match response.header.err {
108                    0 => match WatchedEvent::read_from(&mut data) {
109                        Ok(mut event) => {
110                            self.cut_chroot(&mut event);
111                            self.dispatch(&event);
112                        }
113                        Err(e) => error!("Failed to parse WatchedEvent {:?}", e),
114                    },
115                    e => error!("WatchedEvent.error {:?}", e),
116                }
117            }
118            WatchMessage::Watch(watch) => {
119                self.watches
120                    .entry(watch.path.clone())
121                    .or_insert_with(Vec::new)
122                    .push(watch);
123            }
124        }
125    }
126
127    fn cut_chroot(&self, event: &mut WatchedEvent) {
128        if let Some(ref chroot) = self.chroot {
129            if event.path.is_some() {
130                event.path = Some(event.path.as_ref().unwrap()[chroot.len()..].to_owned());
131            }
132        }
133    }
134
135    fn dispatch(&mut self, event: &WatchedEvent) {
136        debug!("{:?}", event);
137        if let Some(watches) = self.find_watches(event) {
138            for watch in watches.into_iter() {
139                (watch.watcher)(event.clone())
140            }
141        } else {
142            self.watcher.handle(event.clone())
143        }
144    }
145
146    fn find_watches(&mut self, event: &WatchedEvent) -> Option<Vec<Watch>> {
147        if let Some(ref path) = event.path {
148            match self.watches.remove(path) {
149                Some(watches) => {
150                    let (matching, left): (_, Vec<Watch>) =
151                        watches.into_iter().partition(|w| match event.event_type {
152                            WatchedEventType::NodeChildrenChanged => {
153                                w.watch_type == WatchType::Child
154                            }
155                            WatchedEventType::NodeCreated | WatchedEventType::NodeDataChanged => {
156                                w.watch_type == WatchType::Data || w.watch_type == WatchType::Exist
157                            }
158                            WatchedEventType::NodeDeleted => true,
159                            _ => false,
160                        });
161
162                    // put back the remaining watches
163                    if !left.is_empty() {
164                        self.watches.insert(path.to_owned(), left);
165                    }
166                    if matching.is_empty() {
167                        None
168                    } else {
169                        Some(matching)
170                    }
171                }
172                None => None,
173            }
174        } else {
175            None
176        }
177    }
178}