zookeeper_client/client/
watcher.rs

1use asyncs::sync::watch;
2
3use crate::chroot::OwnedChroot;
4use crate::error::Error;
5use crate::session::{OneshotReceiver, PersistentReceiver, SessionState, WatchReceiver, WatchedEvent};
6
7/// StateWatcher tracks session state updates.
8#[derive(Clone, Debug)]
9pub struct StateWatcher {
10    receiver: watch::Receiver<SessionState>,
11}
12
13impl StateWatcher {
14    pub(super) fn new(receiver: watch::Receiver<SessionState>) -> StateWatcher {
15        StateWatcher { receiver }
16    }
17
18    /// Returns and consumes most recently state.
19    pub fn state(&mut self) -> SessionState {
20        let state = self.receiver.borrow_and_update();
21        *state
22    }
23
24    /// Waits until state changed and returns consumed state.
25    ///
26    /// This method will block indefinitely after one of terminal states consumed.
27    pub async fn changed(&mut self) -> SessionState {
28        match self.receiver.changed().await {
29            Ok(changed) => *changed,
30            // Terminal state must be delivered.
31            Err(_) => std::future::pending().await,
32        }
33    }
34
35    /// Returns but not consumes most recently state.
36    pub fn peek_state(&self) -> SessionState {
37        let state = self.receiver.borrow();
38        *state
39    }
40}
41
42/// Watcher for stat, data and child event.
43#[derive(Debug)]
44pub struct OneshotWatcher {
45    chroot: OwnedChroot,
46    receiver: OneshotReceiver,
47}
48
49impl OneshotWatcher {
50    fn new(chroot: OwnedChroot, receiver: OneshotReceiver) -> Self {
51        OneshotWatcher { chroot, receiver }
52    }
53
54    /// Waits for node event or session broken.
55    ///
56    /// # API
57    /// No [SessionState::Disconnected] event as server will deliver latest data events in auto
58    /// watch reset on reconnection.
59    ///
60    /// # Notable issues
61    /// [ZOOKEEPER-43][]: Server side of the auto reset watches patch
62    ///
63    /// [ZOOKEEPER-43]: https://issues.apache.org/jira/browse/ZOOKEEPER-43
64    pub async fn changed(self) -> WatchedEvent {
65        let mut event = self.receiver.recv().await;
66        event.drain_root_path(self.chroot.root());
67        event
68    }
69
70    /// Removes this watcher.
71    pub async fn remove(self) -> Result<(), Error> {
72        self.receiver.remove().await
73    }
74}
75
76/// Watcher for persistent and recursive watch.
77#[derive(Debug)]
78pub struct PersistentWatcher {
79    chroot: OwnedChroot,
80    receiver: PersistentReceiver,
81}
82
83impl PersistentWatcher {
84    fn new(chroot: OwnedChroot, receiver: PersistentReceiver) -> Self {
85        PersistentWatcher { chroot, receiver }
86    }
87
88    /// Waits for next event which could be node event or session activities.
89    ///
90    /// # Panics
91    /// Panic after terminal session event received.
92    ///
93    /// # BUG
94    /// Events during reconnection could be lost due to [ZOOKEEPER-4698], as events during
95    /// connection loss are not delivered in auto watch reset. So, callers should rebuild their
96    /// knowledge to avoid data inconsistency after [SessionState::Disconnected] and following
97    /// [SessionState::SyncConnected]. See also [OneshotWatcher::changed].
98    ///
99    /// [ZOOKEEPER-4698]: https://issues.apache.org/jira/browse/ZOOKEEPER-4698
100    pub async fn changed(&mut self) -> WatchedEvent {
101        let mut event = self.receiver.recv().await;
102        event.drain_root_path(self.chroot.root());
103        event
104    }
105
106    /// Removes this watcher.
107    ///
108    /// # Cautions
109    /// It is a best effect as ZooKeeper ([ZOOKEEPER-4472][]) does not support persistent watch
110    /// removing individually.
111    ///
112    /// [ZOOKEEPER-4472]: https://issues.apache.org/jira/browse/ZOOKEEPER-4472
113    pub async fn remove(self) -> Result<(), Error> {
114        self.receiver.remove().await
115    }
116}
117
118impl WatchReceiver {
119    pub fn into_oneshot(self, chroot: &OwnedChroot) -> OneshotWatcher {
120        match self {
121            WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
122            WatchReceiver::Oneshot(receiver) => OneshotWatcher::new(chroot.clone(), receiver),
123            WatchReceiver::Persistent(_) => {
124                unreachable!("expect oneshot watcher, got persistent watcher")
125            },
126        }
127    }
128
129    pub fn into_persistent(self, chroot: &OwnedChroot) -> PersistentWatcher {
130        match self {
131            WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
132            WatchReceiver::Oneshot(_) => {
133                unreachable!("expect oneshot watcher, got oneshot watcher")
134            },
135            WatchReceiver::Persistent(receiver) => PersistentWatcher::new(chroot.clone(), receiver),
136        }
137    }
138}