zookeeper_client/client/
watcher.rs1use asyncs::sync::watch;
2
3use crate::chroot::OwnedChroot;
4use crate::error::Error;
5use crate::session::{OneshotReceiver, PersistentReceiver, SessionState, WatchReceiver, WatchedEvent};
6
7#[derive(Clone, Debug)]
9pub struct StateWatcher {
10 receiver: watch::Receiver<SessionState>,
11}
12
13impl StateWatcher {
14 pub(super) fn new(receiver: watch::Receiver<SessionState>) -> StateWatcher {
15 StateWatcher { receiver }
16 }
17
18 pub fn state(&mut self) -> SessionState {
20 let state = self.receiver.borrow_and_update();
21 *state
22 }
23
24 pub async fn changed(&mut self) -> SessionState {
28 match self.receiver.changed().await {
29 Ok(changed) => *changed,
30 Err(_) => std::future::pending().await,
32 }
33 }
34
35 pub fn peek_state(&self) -> SessionState {
37 let state = self.receiver.borrow();
38 *state
39 }
40}
41
42#[derive(Debug)]
44pub struct OneshotWatcher {
45 chroot: OwnedChroot,
46 receiver: OneshotReceiver,
47}
48
49impl OneshotWatcher {
50 fn new(chroot: OwnedChroot, receiver: OneshotReceiver) -> Self {
51 OneshotWatcher { chroot, receiver }
52 }
53
54 pub async fn changed(self) -> WatchedEvent {
65 let mut event = self.receiver.recv().await;
66 event.drain_root_path(self.chroot.root());
67 event
68 }
69
70 pub async fn remove(self) -> Result<(), Error> {
72 self.receiver.remove().await
73 }
74}
75
76#[derive(Debug)]
78pub struct PersistentWatcher {
79 chroot: OwnedChroot,
80 receiver: PersistentReceiver,
81}
82
83impl PersistentWatcher {
84 fn new(chroot: OwnedChroot, receiver: PersistentReceiver) -> Self {
85 PersistentWatcher { chroot, receiver }
86 }
87
88 pub async fn changed(&mut self) -> WatchedEvent {
101 let mut event = self.receiver.recv().await;
102 event.drain_root_path(self.chroot.root());
103 event
104 }
105
106 pub async fn remove(self) -> Result<(), Error> {
114 self.receiver.remove().await
115 }
116}
117
118impl WatchReceiver {
119 pub fn into_oneshot(self, chroot: &OwnedChroot) -> OneshotWatcher {
120 match self {
121 WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
122 WatchReceiver::Oneshot(receiver) => OneshotWatcher::new(chroot.clone(), receiver),
123 WatchReceiver::Persistent(_) => {
124 unreachable!("expect oneshot watcher, got persistent watcher")
125 },
126 }
127 }
128
129 pub fn into_persistent(self, chroot: &OwnedChroot) -> PersistentWatcher {
130 match self {
131 WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
132 WatchReceiver::Oneshot(_) => {
133 unreachable!("expect oneshot watcher, got oneshot watcher")
134 },
135 WatchReceiver::Persistent(receiver) => PersistentWatcher::new(chroot.clone(), receiver),
136 }
137 }
138}