zookeeper_client/client/
watcher.rs

1use asyncs::sync::watch;
2
3use crate::chroot::OwnedChroot;
4use crate::error::Error;
5use crate::session::{OneshotReceiver, PersistentReceiver, SessionState, WatchReceiver, WatchedEvent};
6
7/// StateWatcher tracks session state updates.
8#[derive(Clone, Debug)]
9pub struct StateWatcher {
10    receiver: watch::Receiver<SessionState>,
11}
12
13impl StateWatcher {
14    pub(super) fn new(receiver: watch::Receiver<SessionState>) -> StateWatcher {
15        StateWatcher { receiver }
16    }
17
18    /// Returns and consumes most recently state.
19    pub fn state(&mut self) -> SessionState {
20        let state = self.receiver.borrow_and_update();
21        *state
22    }
23
24    /// Waits until state changed and returns consumed state.
25    ///
26    /// This method will block indefinitely after one of terminal states consumed.
27    pub async fn changed(&mut self) -> SessionState {
28        match self.receiver.changed().await {
29            Ok(changed) => *changed,
30            // Terminal state must be delivered.
31            Err(_) => std::future::pending().await,
32        }
33    }
34
35    /// Returns but not consumes most recently state.
36    pub fn peek_state(&self) -> SessionState {
37        let state = self.receiver.borrow();
38        *state
39    }
40}
41
42/// Watcher for stat, data and child event.
43#[derive(Debug)]
44pub struct OneshotWatcher {
45    chroot: OwnedChroot,
46    receiver: OneshotReceiver,
47}
48
49impl OneshotWatcher {
50    fn new(chroot: OwnedChroot, receiver: OneshotReceiver) -> Self {
51        OneshotWatcher { chroot, receiver }
52    }
53
54    /// Waits for node event or session broken.
55    ///
56    /// # API
57    /// No [SessionState::Disconnected] event as server will deliver latest data events in auto
58    /// watch reset on reconnection.
59    ///
60    /// # Notable issues
61    /// [ZOOKEEPER-43][]: Server side of the auto reset watches patch
62    ///
63    /// [ZOOKEEPER-43]: https://issues.apache.org/jira/browse/ZOOKEEPER-43
64    pub async fn changed(self) -> WatchedEvent {
65        let mut event = self.receiver.recv().await;
66        event.drain_root_path(self.chroot.root());
67        event
68    }
69
70    /// Removes this watcher.
71    #[deprecated(since = "0.10.0", note = "Rust has Drop")]
72    pub async fn remove(self) -> Result<(), Error> {
73        self.receiver.remove().await
74    }
75}
76
77/// Watcher for persistent and recursive watch.
78#[derive(Debug)]
79pub struct PersistentWatcher {
80    chroot: OwnedChroot,
81    receiver: PersistentReceiver,
82}
83
84impl PersistentWatcher {
85    fn new(chroot: OwnedChroot, receiver: PersistentReceiver) -> Self {
86        PersistentWatcher { chroot, receiver }
87    }
88
89    /// Waits for next event which could be node event or session activities.
90    ///
91    /// # Panics
92    /// Panic after terminal session event received.
93    ///
94    /// # BUG
95    /// Events during reconnection could be lost due to [ZOOKEEPER-4698], as events during
96    /// connection loss are not delivered in auto watch reset. So, callers should rebuild their
97    /// knowledge to avoid data inconsistency after [SessionState::Disconnected] and following
98    /// [SessionState::SyncConnected]. See also [OneshotWatcher::changed].
99    ///
100    /// [ZOOKEEPER-4698]: https://issues.apache.org/jira/browse/ZOOKEEPER-4698
101    pub async fn changed(&mut self) -> WatchedEvent {
102        let mut event = self.receiver.recv().await;
103        event.drain_root_path(self.chroot.root());
104        event
105    }
106
107    /// Removes this watcher.
108    ///
109    /// # Cautions
110    /// It is a best effect as ZooKeeper ([ZOOKEEPER-4472][]) does not support persistent watch
111    /// removing individually.
112    ///
113    /// [ZOOKEEPER-4472]: https://issues.apache.org/jira/browse/ZOOKEEPER-4472
114    #[deprecated(since = "0.10.0", note = "Rust has Drop")]
115    pub async fn remove(self) -> Result<(), Error> {
116        self.receiver.remove().await
117    }
118}
119
120impl WatchReceiver {
121    pub fn into_oneshot(self, chroot: &OwnedChroot) -> OneshotWatcher {
122        match self {
123            WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
124            WatchReceiver::Oneshot(receiver) => OneshotWatcher::new(chroot.clone(), receiver),
125            WatchReceiver::Persistent(_) => {
126                unreachable!("expect oneshot watcher, got persistent watcher")
127            },
128        }
129    }
130
131    pub fn into_persistent(self, chroot: &OwnedChroot) -> PersistentWatcher {
132        match self {
133            WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
134            WatchReceiver::Oneshot(_) => {
135                unreachable!("expect oneshot watcher, got oneshot watcher")
136            },
137            WatchReceiver::Persistent(receiver) => PersistentWatcher::new(chroot.clone(), receiver),
138        }
139    }
140}