zookeeper_client/proto/
connect.rs

1use bytes::BufMut;
2use derive_where::derive_where;
3
4use crate::record::{
5    DeserializableRecord,
6    DeserializeError,
7    DynamicRecord,
8    ReadingBuf,
9    SerializableRecord,
10    StaticRecord,
11    UnsafeBuf,
12};
13
/// Connect request record sent by the client.
///
/// The `SerializableRecord` implementation for this type writes the fields in
/// declaration order: `protocol_version`, `last_zxid_seen`, `timeout`,
/// `session_id`, `password`, `readonly`.
#[derive_where(Debug)]
pub struct ConnectRequest<'a> {
    // NOTE(review): `ConnectResponse::deserialize` only accepts version 0 from the
    // server; presumably the client sends 0 here as well — confirm against callers.
    pub protocol_version: i32,
    // NOTE(review): presumably the highest transaction id this client has observed —
    // confirm against the session code.
    pub last_zxid_seen: i64,
    // NOTE(review): presumably the requested session timeout; unit is not visible
    // in this file — confirm.
    pub timeout: i32,
    // NOTE(review): presumably 0 for a brand-new session and the previous id when
    // re-attaching — confirm against callers.
    pub session_id: i64,
    /// Session password bytes, borrowed for `'a`.
    ///
    /// Marked `skip(Debug)`, so the derived `Debug` output leaves this field out.
    #[derive_where(skip(Debug))]
    pub password: &'a [u8],
    // NOTE(review): presumably asks whether a read-only server is acceptable — confirm.
    pub readonly: bool,
}
24
25impl SerializableRecord for ConnectRequest<'_> {
26    fn serialize(&self, buf: &mut dyn BufMut) {
27        self.protocol_version.serialize(buf);
28        self.last_zxid_seen.serialize(buf);
29        self.timeout.serialize(buf);
30        self.session_id.serialize(buf);
31        self.password.serialize(buf);
32        self.readonly.serialize(buf);
33    }
34}
35
36impl DynamicRecord for ConnectRequest<'_> {
37    fn serialized_len(&self) -> usize {
38        2 * i32::record_len() + 2 * i64::record_len() + self.password.serialized_len() + bool::record_len()
39    }
40}
41
/// Connect response record received from the server.
///
/// The field notes below describe values as produced by this type's
/// `DeserializableRecord::deserialize` implementation.
#[derive_where(Debug)]
pub struct ConnectResponse<'a> {
    /// Protocol version reported by the server; `deserialize` rejects anything
    /// other than 0.
    #[allow(dead_code)]
    pub protocol_version: i32,
    /// Negotiated session timeout; `deserialize` rejects negative values.
    // NOTE(review): unit is not visible in this file — confirm.
    pub session_timeout: i32,
    /// Session id as read from the response; not validated by `deserialize`.
    pub session_id: i64,
    /// Session password bytes, borrowed from the buffer that was deserialized.
    /// `deserialize` never yields an empty password.
    ///
    /// Marked `skip(Debug)`, so the derived `Debug` output leaves this field out.
    #[derive_where(skip(Debug))]
    pub password: &'a [u8],
    /// `true` when the byte following the password is 1; `false` when it is 0 or
    /// when the response ends right after the password.
    pub readonly: bool,
}
52
53impl<'a> DeserializableRecord<'a> for ConnectResponse<'a> {
54    type Error = DeserializeError;
55
56    fn deserialize(buf: &mut ReadingBuf<'a>) -> Result<Self, Self::Error> {
57        let min_record_len = 4 + 4 + 8 + 4;
58        if buf.len() < min_record_len {
59            return Err(DeserializeError::InsufficientBuf);
60        }
61        let protocol_version = unsafe { buf.get_unchecked_i32() };
62        let session_timeout = unsafe { buf.get_unchecked_i32() };
63        let session_id = unsafe { buf.get_unchecked_i64() };
64        if protocol_version != 0 {
65            return Err(DeserializeError::UnmarshalError(format!("unsupported server version {protocol_version}")));
66        } else if session_timeout < 0 {
67            return Err(DeserializeError::UnmarshalError(format!(
68                "invalid negotiated session timeout {session_timeout}"
69            )));
70        }
71        let len = unsafe { buf.get_unchecked_i32() };
72        if len <= 0 || len > buf.len() as i32 {
73            return Err(DeserializeError::UnmarshalError(format!("invalid session password length {len}")));
74        }
75        let len = len as usize;
76        let password = unsafe { buf.get_unchecked(..len) };
77        let readonly = if len == buf.len() { 0 } else { unsafe { *buf.get_unchecked(len) } };
78        if readonly != 0 && readonly != 1 {
79            return Err(DeserializeError::UnmarshalError(format!("invalid session readonly value {readonly}")));
80        }
81        Ok(ConnectResponse { protocol_version, session_timeout, session_id, password, readonly: readonly == 1 })
82    }
83}