zookeeper_client/proto/
reconfig.rs

1use bytes::BufMut;
2
3use crate::record::{DynamicRecord, SerializableRecord};
4
/// Borrowed view over a cloneable iterator of server strings.
///
/// The iterator is held by reference and cloned each time it has to be walked.
struct ServerList<'a, 'b, T>(&'a T)
where
    'b: 'a,
    T: Iterator<Item = &'b str> + Clone;
6
7impl<'b, T: Iterator<Item = &'b str> + Clone> SerializableRecord for ServerList<'_, 'b, T> {
8    fn serialize(&self, buf: &mut dyn BufMut) {
9        let n = self.serialized_len();
10        if n == 4 {
11            buf.put_i32(-1);
12            return;
13        }
14        buf.put_i32(n as i32 - 4);
15        self.0.clone().filter(|s| !s.is_empty()).enumerate().for_each(|(i, s)| {
16            if i != 0 {
17                buf.put_u8(b',');
18            }
19            buf.put_slice(s.as_bytes());
20        });
21    }
22}
23
24impl<'b, T: Iterator<Item = &'b str> + Clone> DynamicRecord for ServerList<'_, 'b, T> {
25    fn serialized_len(&self) -> usize {
26        let n: usize = self.0.clone().filter(|s| !s.is_empty()).map(|s| s.len() + 1).sum();
27        4 + if n > 0 { n - 1 } else { 0 }
28    }
29}
30
/// Describes a change to ZooKeeper ensemble membership.
///
/// Each item yielded by the iterators may be a single server or a comma
/// separated list of servers.
pub enum EnsembleUpdate<'a, T>
where
    T: Iterator<Item = &'a str> + Clone,
{
    /// Change membership by naming servers that join and servers that leave.
    Incremental {
        /// Servers joining the ensemble.
        joinings: T,

        /// Servers leaving the ensemble.
        leavings: T,
    },
    /// Specify the new ensemble as a whole.
    New {
        /// Servers of the new ensemble.
        ensemble: T,
    },
}
47
48pub struct ReconfigRequest<'a, T: Iterator<Item = &'a str> + Clone> {
49    pub update: EnsembleUpdate<'a, T>,
50    pub version: i64,
51}
52
53impl<'a, T: Iterator<Item = &'a str> + Clone> SerializableRecord for ReconfigRequest<'a, T> {
54    fn serialize(&self, buf: &mut dyn BufMut) {
55        match &self.update {
56            EnsembleUpdate::Incremental { joinings, leavings } => {
57                ServerList(joinings).serialize(buf);
58                ServerList(leavings).serialize(buf);
59                buf.put_i32(-1);
60            },
61            EnsembleUpdate::New { ensemble } => {
62                buf.put_i32(-1);
63                buf.put_i32(-1);
64                ServerList(ensemble).serialize(buf);
65            },
66        };
67        self.version.serialize(buf);
68    }
69}
70
71impl<'a, T: Iterator<Item = &'a str> + Clone> DynamicRecord for ReconfigRequest<'a, T> {
72    fn serialized_len(&self) -> usize {
73        let n = match &self.update {
74            EnsembleUpdate::Incremental { joinings, leavings } => {
75                4 + ServerList(joinings).serialized_len() + ServerList(leavings).serialized_len()
76            },
77            EnsembleUpdate::New { ensemble } => {
78                let servers = ServerList(ensemble);
79                8 + servers.serialized_len()
80            },
81        };
82        n + 8
83    }
84}