zookeeper_client/client/
watcher.rs1use asyncs::sync::watch;
2
3use crate::chroot::OwnedChroot;
4use crate::error::Error;
5use crate::session::{OneshotReceiver, PersistentReceiver, SessionState, WatchReceiver, WatchedEvent};
6
7#[derive(Clone, Debug)]
9pub struct StateWatcher {
10 receiver: watch::Receiver<SessionState>,
11}
12
13impl StateWatcher {
14 pub(super) fn new(receiver: watch::Receiver<SessionState>) -> StateWatcher {
15 StateWatcher { receiver }
16 }
17
18 pub fn state(&mut self) -> SessionState {
20 let state = self.receiver.borrow_and_update();
21 *state
22 }
23
24 pub async fn changed(&mut self) -> SessionState {
28 match self.receiver.changed().await {
29 Ok(changed) => *changed,
30 Err(_) => std::future::pending().await,
32 }
33 }
34
35 pub fn peek_state(&self) -> SessionState {
37 let state = self.receiver.borrow();
38 *state
39 }
40}
41
42#[derive(Debug)]
44pub struct OneshotWatcher {
45 chroot: OwnedChroot,
46 receiver: OneshotReceiver,
47}
48
49impl OneshotWatcher {
50 fn new(chroot: OwnedChroot, receiver: OneshotReceiver) -> Self {
51 OneshotWatcher { chroot, receiver }
52 }
53
54 pub async fn changed(self) -> WatchedEvent {
65 let mut event = self.receiver.recv().await;
66 event.drain_root_path(self.chroot.root());
67 event
68 }
69
70 #[deprecated(since = "0.10.0", note = "Rust has Drop")]
72 pub async fn remove(self) -> Result<(), Error> {
73 self.receiver.remove().await
74 }
75}
76
77#[derive(Debug)]
79pub struct PersistentWatcher {
80 chroot: OwnedChroot,
81 receiver: PersistentReceiver,
82}
83
84impl PersistentWatcher {
85 fn new(chroot: OwnedChroot, receiver: PersistentReceiver) -> Self {
86 PersistentWatcher { chroot, receiver }
87 }
88
89 pub async fn changed(&mut self) -> WatchedEvent {
102 let mut event = self.receiver.recv().await;
103 event.drain_root_path(self.chroot.root());
104 event
105 }
106
107 #[deprecated(since = "0.10.0", note = "Rust has Drop")]
115 pub async fn remove(self) -> Result<(), Error> {
116 self.receiver.remove().await
117 }
118}
119
120impl WatchReceiver {
121 pub fn into_oneshot(self, chroot: &OwnedChroot) -> OneshotWatcher {
122 match self {
123 WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
124 WatchReceiver::Oneshot(receiver) => OneshotWatcher::new(chroot.clone(), receiver),
125 WatchReceiver::Persistent(_) => {
126 unreachable!("expect oneshot watcher, got persistent watcher")
127 },
128 }
129 }
130
131 pub fn into_persistent(self, chroot: &OwnedChroot) -> PersistentWatcher {
132 match self {
133 WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
134 WatchReceiver::Oneshot(_) => {
135 unreachable!("expect oneshot watcher, got oneshot watcher")
136 },
137 WatchReceiver::Persistent(receiver) => PersistentWatcher::new(chroot.clone(), receiver),
138 }
139 }
140}