zookeeper_client/session/
types.rs

1use derive_where::derive_where;
2use num_enum::{IntoPrimitive, TryFromPrimitive};
3use strum::EnumIter;
4
5use crate::error::Error;
6use crate::proto::AddWatchMode;
7use crate::util;
8
/// Thin wrapper around a ZooKeeper session id.
///
/// Both `Display` and `Debug` render the id as lowercase hexadecimal with a leading `0x`.
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct SessionId(pub i64);

impl std::fmt::Display for SessionId {
    /// Writes the wrapped id as `0x`-prefixed lowercase hex.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let SessionId(raw) = *self;
        f.write_fmt(format_args!("{:#x}", raw))
    }
}

impl std::fmt::Debug for SessionId {
    /// Same output as `Display`, so ids look identical in logs and in debug dumps.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        <Self as std::fmt::Display>::fmt(self, f)
    }
}
24
/// ZooKeeper session info.
///
/// Holds the session id, the session password and the readonly flag. The password field is
/// marked `skip(Debug)`, so it is left out of the `Debug` output.
#[derive(Clone)]
#[derive_where(Debug)]
pub struct SessionInfo {
    /// Session id, exposed through [SessionInfo::id].
    pub(crate) id: SessionId,
    /// Session password. Skipped when formatting with `Debug`.
    #[derive_where(skip(Debug))]
    pub(crate) password: Vec<u8>,
    /// Whether this is a readonly session, exposed through [SessionInfo::is_readonly].
    pub(crate) readonly: bool,
    /// Only set through tests, otherwise 0.
    ///
    /// I thought about carrying [Session::last_zxid] from [Client::session] to [Connector::session].
    /// That way the session reestablishment API would have no major difference from internal reconnection.
    /// But I think it is a ZooKeeper tradition to [Client::sync] after session reestablishment.
    /// We probably should not challenge this.
    pub(crate) last_zxid: i64,
}
41
42impl SessionInfo {
43    pub(crate) fn new(id: SessionId, password: Vec<u8>) -> Self {
44        Self { id, password, readonly: id.0 == 0, last_zxid: 0 }
45    }
46
47    /// Session id.
48    pub fn id(&self) -> SessionId {
49        self.id
50    }
51
52    /// Is this an readonly session ?
53    ///
54    /// Readonly sessions are local to connected server thus not eligible for session reestablishment.
55    pub fn is_readonly(&self) -> bool {
56        self.readonly
57    }
58}
59
/// ZooKeeper session states.
///
/// `AuthFailed`, `Expired` and `Closed` are terminal states; the others are intermediate.
#[derive(Copy, Clone, Debug, PartialEq, Eq, strum::Display)]
pub enum SessionState {
    /// Intermediate state: the client is disconnected from the ZooKeeper cluster.
    ///
    /// If all clients are dropped, this state is not reported. This differs from the Java client,
    /// whose behavior is currently indeterminate. See [ZOOKEEPER-4702][].
    ///
    /// [ZOOKEEPER-4702]: https://issues.apache.org/jira/browse/ZOOKEEPER-4702
    Disconnected,

    /// Intermediate state: the client has recovered from the disconnected state.
    SyncConnected,

    /// Terminal state: the client failed authentication.
    AuthFailed,

    /// Intermediate state: the client is connected to a readonly server.
    ConnectedReadOnly,

    /// Terminal state: the ZooKeeper session has expired.
    Expired,

    /// Terminal state: the ZooKeeper client has been closed.
    Closed,
}
86
87impl SessionState {
88    pub(crate) fn from_server(state: i32) -> Result<SessionState, Error> {
89        let session_state = match state {
90            3 => SessionState::SyncConnected,
91            _ => return Err(Error::UnexpectedError(format!("keeper state value should not be {}", state))),
92        };
93        Ok(session_state)
94    }
95
96    pub fn is_terminated(self) -> bool {
97        use SessionState::*;
98        matches!(self, AuthFailed | Expired | Closed)
99    }
100
101    pub(crate) fn is_connected(self) -> bool {
102        self == SessionState::SyncConnected || self == SessionState::ConnectedReadOnly
103    }
104
105    pub(crate) fn to_error(self) -> Error {
106        match self {
107            SessionState::Disconnected => Error::ConnectionLoss,
108            SessionState::AuthFailed => Error::AuthFailed,
109            SessionState::Expired => Error::SessionExpired,
110            SessionState::Closed => Error::ClientClosed,
111            _ => Error::UnexpectedError(format!("expect error state, got {:?}", self)),
112        }
113    }
114}
115
/// WatchedEvent represents an update to a watched node or to the session.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct WatchedEvent {
    /// Kind of event: [EventType::Session] for session state events, a node event type otherwise.
    pub event_type: EventType,

    /// Updated state for state event. No meaning in node events.
    ///
    /// # Caution
    /// There will be no more updates after a terminal session state.
    pub session_state: SessionState,

    /// Node path from chroot. Empty if this is a state event.
    pub path: String,

    /// `zxid` of the transaction that triggered this node event, [WatchedEvent::NO_ZXID] for session event.
    ///
    /// # Notable behaviors
    /// * This feature was shipped in 3.9.0, `zxid` will be [WatchedEvent::NO_ZXID] for earlier versions. See [ZOOKEEPER-4655].
    /// * It is possible to receive multiple events with same `zxid` and even same `path` due to [crate::MultiWriter]. See [ZOOKEEPER-4695].
    ///
    /// [ZOOKEEPER-4655]: https://issues.apache.org/jira/browse/ZOOKEEPER-4655
    /// [ZOOKEEPER-4695]: https://issues.apache.org/jira/browse/ZOOKEEPER-4695
    pub zxid: i64,
}
141
142impl WatchedEvent {
143    pub const NO_ZXID: i64 = -1;
144
145    pub fn new(event: EventType, path: impl Into<String>) -> Self {
146        debug_assert_ne!(event, EventType::Session);
147        Self { event_type: event, session_state: SessionState::SyncConnected, path: path.into(), zxid: Self::NO_ZXID }
148    }
149
150    pub fn with_zxid(self, zxid: i64) -> Self {
151        debug_assert_ne!(self.event_type, EventType::Session);
152        Self { zxid, ..self }
153    }
154
155    pub fn new_session(state: SessionState) -> Self {
156        Self { event_type: EventType::Session, session_state: state, path: Default::default(), zxid: Self::NO_ZXID }
157    }
158
159    pub(crate) fn drain_root_path(&mut self, root: &str) {
160        if self.event_type != EventType::Session && !root.is_empty() {
161            util::drain_root_path(&mut self.path, root).unwrap();
162        }
163    }
164}
165
/// Event type for watch notifications.
///
/// `Session` marks session state events; the remaining variants are node events.
#[derive(Copy, Clone, Debug, PartialEq, Eq, strum::Display)]
pub enum EventType {
    /// Session state event, as built by [WatchedEvent::new_session].
    Session,
    /// Node event decoded from server value `1`.
    NodeCreated,
    /// Node event decoded from server value `2`.
    NodeDeleted,
    /// Node event decoded from server value `3`.
    NodeDataChanged,
    /// Node event decoded from server value `4`.
    NodeChildrenChanged,
}
175
176impl EventType {
177    pub(crate) fn from_server(i: i32) -> Result<EventType, Error> {
178        let event_type = match i {
179            1 => EventType::NodeCreated,
180            2 => EventType::NodeDeleted,
181            3 => EventType::NodeDataChanged,
182            4 => EventType::NodeChildrenChanged,
183            _ => return Err(Error::UnexpectedError(format!("event type should not be {}", i))),
184        };
185        Ok(event_type)
186    }
187}
188
/// Watch mode with explicit `i32` discriminants (`#[repr(i32)]`).
///
/// NOTE(review): the `IntoPrimitive`/`TryFromPrimitive` derives presumably provide the conversions
/// to and from `i32`, and `EnumIter` iteration over all variants — confirm against `num_enum` and
/// `strum` docs. The discriminant values look like they mirror ZooKeeper's watcher type codes;
/// verify before changing any of them.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoPrimitive, TryFromPrimitive, EnumIter)]
#[repr(i32)]
pub enum WatchMode {
    /// Discriminant `1`.
    Child = 1,
    /// Discriminant `2`.
    Data = 2,
    /// Discriminant `3`.
    Any = 3,
    /// Discriminant `4`; target of [AddWatchMode::Persistent] in the `From<AddWatchMode>` conversion.
    PersistentNode = 4,
    /// Discriminant `5`; target of [AddWatchMode::PersistentRecursive] in the `From<AddWatchMode>` conversion.
    PersistentRecursive = 5,
}
198
199impl From<AddWatchMode> for WatchMode {
200    fn from(mode: AddWatchMode) -> Self {
201        match mode {
202            AddWatchMode::Persistent => WatchMode::PersistentNode,
203            AddWatchMode::PersistentRecursive => WatchMode::PersistentRecursive,
204        }
205    }
206}