1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use tokio::sync::watch;

use crate::chroot::OwnedChroot;
use crate::error::Error;
use crate::session::{OneshotReceiver, PersistentReceiver, SessionState, WatchReceiver, WatchedEvent};

/// StateWatcher tracks session state updates.
#[derive(Clone, Debug)]
pub struct StateWatcher {
    receiver: watch::Receiver<SessionState>,
}

impl StateWatcher {
    pub(super) fn new(receiver: watch::Receiver<SessionState>) -> StateWatcher {
        StateWatcher { receiver }
    }

    /// Returns and consumes most recently state.
    pub fn state(&mut self) -> SessionState {
        let state = self.receiver.borrow_and_update();
        *state
    }

    /// Waits until state changed and returns consumed state.
    ///
    /// This method will block indefinitely after one of terminal states consumed.
    pub async fn changed(&mut self) -> SessionState {
        if self.receiver.changed().await.is_err() {
            // Terminal state must be deliveried.
            std::future::pending().await
        }
        self.state()
    }

    /// Returns but not consumes most recently state.
    pub fn peek_state(&self) -> SessionState {
        let state = self.receiver.borrow();
        *state
    }
}

/// Watcher for stat, data and child event.
#[derive(Debug)]
pub struct OneshotWatcher {
    chroot: OwnedChroot,
    receiver: OneshotReceiver,
}

impl OneshotWatcher {
    fn new(chroot: OwnedChroot, receiver: OneshotReceiver) -> Self {
        OneshotWatcher { chroot, receiver }
    }

    /// Waits for node event or session broken.
    ///
    /// # API
    /// No [SessionState::Disconnected] event as server will deliver latest data events in auto
    /// watch reset on reconnection.
    ///
    /// # Notable issues
    /// [ZOOKEEPER-43][]: Server side of the auto reset watches patch
    ///
    /// [ZOOKEEPER-43]: https://issues.apache.org/jira/browse/ZOOKEEPER-43
    pub async fn changed(self) -> WatchedEvent {
        let mut event = self.receiver.recv().await;
        event.drain_root_path(self.chroot.root());
        event
    }

    /// Removes this watcher.
    pub async fn remove(self) -> Result<(), Error> {
        self.receiver.remove().await
    }
}

/// Watcher for persistent and recursive watch.
#[derive(Debug)]
pub struct PersistentWatcher {
    chroot: OwnedChroot,
    receiver: PersistentReceiver,
}

impl PersistentWatcher {
    fn new(chroot: OwnedChroot, receiver: PersistentReceiver) -> Self {
        PersistentWatcher { chroot, receiver }
    }

    /// Waits for next event which could be node event or session activities.
    ///
    /// # Panics
    /// Panic after terminal session event received.
    ///
    /// # BUG
    /// Events during reconnection could be lost due to [ZOOKEEPER-4698], as events during
    /// connection loss are not delivered in auto watch reset. So, callers should rebuild their
    /// knowledge to avoid data inconsistency after [SessionState::Disconnected] and following
    /// [SessionState::SyncConnected]. See also [OneshotWatcher::changed].
    ///
    /// [ZOOKEEPER-4698]: https://issues.apache.org/jira/browse/ZOOKEEPER-4698
    pub async fn changed(&mut self) -> WatchedEvent {
        let mut event = self.receiver.recv().await;
        event.drain_root_path(self.chroot.root());
        event
    }

    /// Removes this watcher.
    ///
    /// # Cautions
    /// It is a best effect as ZooKeper ([ZOOKEEPER-4472][]) does not support persistent watch
    /// removing individually.
    ///
    /// [ZOOKEEPER-4472]: https://issues.apache.org/jira/browse/ZOOKEEPER-4472
    pub async fn remove(self) -> Result<(), Error> {
        self.receiver.remove().await
    }
}

impl WatchReceiver {
    pub fn into_oneshot(self, chroot: &OwnedChroot) -> OneshotWatcher {
        match self {
            WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
            WatchReceiver::Oneshot(receiver) => OneshotWatcher::new(chroot.clone(), receiver),
            WatchReceiver::Persistent(_) => {
                unreachable!("expect oneshot watcher, got persistent watcher")
            },
        }
    }

    pub fn into_persistent(self, chroot: &OwnedChroot) -> PersistentWatcher {
        match self {
            WatchReceiver::None => unreachable!("expect oneshot watcher, got none watcher"),
            WatchReceiver::Oneshot(_) => {
                unreachable!("expect oneshot watcher, got oneshot watcher")
            },
            WatchReceiver::Persistent(receiver) => PersistentWatcher::new(chroot.clone(), receiver),
        }
    }
}