zookeeper_client/proto/
connect.rs1use bytes::BufMut;
2use derive_where::derive_where;
3
4use crate::record::{
5 DeserializableRecord,
6 DeserializeError,
7 DynamicRecord,
8 ReadingBuf,
9 SerializableRecord,
10 StaticRecord,
11 UnsafeBuf,
12};
13
14#[derive_where(Debug)]
15pub struct ConnectRequest<'a> {
16 pub protocol_version: i32,
17 pub last_zxid_seen: i64,
18 pub timeout: i32,
19 pub session_id: i64,
20 #[derive_where(skip(Debug))]
21 pub password: &'a [u8],
22 pub readonly: bool,
23}
24
25impl SerializableRecord for ConnectRequest<'_> {
26 fn serialize(&self, buf: &mut dyn BufMut) {
27 self.protocol_version.serialize(buf);
28 self.last_zxid_seen.serialize(buf);
29 self.timeout.serialize(buf);
30 self.session_id.serialize(buf);
31 self.password.serialize(buf);
32 self.readonly.serialize(buf);
33 }
34}
35
36impl DynamicRecord for ConnectRequest<'_> {
37 fn serialized_len(&self) -> usize {
38 2 * i32::record_len() + 2 * i64::record_len() + self.password.serialized_len() + bool::record_len()
39 }
40}
41
42#[derive_where(Debug)]
43pub struct ConnectResponse<'a> {
44 #[allow(dead_code)]
45 pub protocol_version: i32,
46 pub session_timeout: i32,
47 pub session_id: i64,
48 #[derive_where(skip(Debug))]
49 pub password: &'a [u8],
50 pub readonly: bool,
51}
52
53impl<'a> DeserializableRecord<'a> for ConnectResponse<'a> {
54 type Error = DeserializeError;
55
56 fn deserialize(buf: &mut ReadingBuf<'a>) -> Result<Self, Self::Error> {
57 let min_record_len = 4 + 4 + 8 + 4;
58 if buf.len() < min_record_len {
59 return Err(DeserializeError::InsufficientBuf);
60 }
61 let protocol_version = unsafe { buf.get_unchecked_i32() };
62 let session_timeout = unsafe { buf.get_unchecked_i32() };
63 let session_id = unsafe { buf.get_unchecked_i64() };
64 if protocol_version != 0 {
65 return Err(DeserializeError::UnmarshalError(format!("unsupported server version {protocol_version}")));
66 } else if session_timeout < 0 {
67 return Err(DeserializeError::UnmarshalError(format!(
68 "invalid negotiated session timeout {session_timeout}"
69 )));
70 }
71 let len = unsafe { buf.get_unchecked_i32() };
72 if len <= 0 || len > buf.len() as i32 {
73 return Err(DeserializeError::UnmarshalError(format!("invalid session password length {len}")));
74 }
75 let len = len as usize;
76 let password = unsafe { buf.get_unchecked(..len) };
77 let readonly = if len == buf.len() { 0 } else { unsafe { *buf.get_unchecked(len) } };
78 if readonly != 0 && readonly != 1 {
79 return Err(DeserializeError::UnmarshalError(format!("invalid session readonly value {readonly}")));
80 }
81 Ok(ConnectResponse { protocol_version, session_timeout, session_id, password, readonly: readonly == 1 })
82 }
83}