// zookeeper_client/client/watcher.rs
use asyncs::sync::watch;

use crate::chroot::OwnedChroot;
use crate::error::Error;
use crate::session::{OneshotReceiver, PersistentReceiver, SessionState, WatchReceiver, WatchedEvent};
6
7#[derive(Clone, Debug)]
9pub struct StateWatcher {
10    receiver: watch::Receiver<SessionState>,
11}
12
13impl StateWatcher {
14    pub(super) fn new(receiver: watch::Receiver<SessionState>) -> StateWatcher {
15        StateWatcher { receiver }
16    }
17
18    pub fn state(&mut self) -> SessionState {
20        let state = self.receiver.borrow_and_update();
21        *state
22    }
23
24    pub async fn changed(&mut self) -> SessionState {
28        match self.receiver.changed().await {
29            Ok(changed) => *changed,
30            Err(_) => std::future::pending().await,
32        }
33    }
34
35    pub fn peek_state(&self) -> SessionState {
37        let state = self.receiver.borrow();
38        *state
39    }
40}
41
42#[derive(Debug)]
44pub struct OneshotWatcher {
45    chroot: OwnedChroot,
46    receiver: OneshotReceiver,
47}
48
49impl OneshotWatcher {
50    fn new(chroot: OwnedChroot, receiver: OneshotReceiver) -> Self {
51        OneshotWatcher { chroot, receiver }
52    }
53
54    pub async fn changed(self) -> WatchedEvent {
65        let mut event = self.receiver.recv().await;
66        event.drain_root_path(self.chroot.root());
67        event
68    }
69
70    pub async fn remove(self) -> Result<(), Error> {
72        self.receiver.remove().await
73    }
74}
75
76#[derive(Debug)]
78pub struct PersistentWatcher {
79    chroot: OwnedChroot,
80    receiver: PersistentReceiver,
81}
82
83impl PersistentWatcher {
84    fn new(chroot: OwnedChroot, receiver: PersistentReceiver) -> Self {
85        PersistentWatcher { chroot, receiver }
86    }
87
88    pub async fn changed(&mut self) -> WatchedEvent {
101        let mut event = self.receiver.recv().await;
102        event.drain_root_path(self.chroot.root());
103        event
104    }
105
106    pub async fn remove(self) -> Result<(), Error> {
114        self.receiver.remove().await
115    }
116}
117
118impl WatchReceiver {
119    pub fn into_oneshot(self, chroot: &OwnedChroot) -> OneshotWatcher {
120        match self {
121            WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
122            WatchReceiver::Oneshot(receiver) => OneshotWatcher::new(chroot.clone(), receiver),
123            WatchReceiver::Persistent(_) => {
124                unreachable!("expect oneshot watcher, got persistent watcher")
125            },
126        }
127    }
128
129    pub fn into_persistent(self, chroot: &OwnedChroot) -> PersistentWatcher {
130        match self {
131            WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
132            WatchReceiver::Oneshot(_) => {
133                unreachable!("expect oneshot watcher, got oneshot watcher")
134            },
135            WatchReceiver::Persistent(receiver) => PersistentWatcher::new(chroot.clone(), receiver),
136        }
137    }
138}