zookeeper_client/proto/
reconfig.rs1use bytes::BufMut;
2
3use crate::record::{DynamicRecord, SerializableRecord};
4
/// Borrowed view over a cloneable iterator of server strings.
///
/// The wrapper stores only a reference to the iterator. Code that needs to walk
/// the entries clones the iterator first, so the borrowed value is never consumed.
struct ServerList<'a, 'b: 'a, T: Iterator<Item = &'b str> + Clone>(&'a T);
6
7impl<'b, T: Iterator<Item = &'b str> + Clone> SerializableRecord for ServerList<'_, 'b, T> {
8 fn serialize(&self, buf: &mut dyn BufMut) {
9 let n = self.serialized_len();
10 if n == 4 {
11 buf.put_i32(-1);
12 return;
13 }
14 buf.put_i32(n as i32 - 4);
15 self.0.clone().filter(|s| !s.is_empty()).enumerate().for_each(|(i, s)| {
16 if i != 0 {
17 buf.put_u8(b',');
18 }
19 buf.put_slice(s.as_bytes());
20 });
21 }
22}
23
24impl<'b, T: Iterator<Item = &'b str> + Clone> DynamicRecord for ServerList<'_, 'b, T> {
25 fn serialized_len(&self) -> usize {
26 let n: usize = self.0.clone().filter(|s| !s.is_empty()).map(|s| s.len() + 1).sum();
27 4 + if n > 0 { n - 1 } else { 0 }
28 }
29}
30
/// Membership change carried by a [`ReconfigRequest`].
///
/// Every field is a cloneable iterator over server strings. When the request is
/// serialized, empty strings are skipped and the rest are joined with commas.
pub enum EnsembleUpdate<'a, T: Iterator<Item = &'a str> + Clone> {
    /// A change expressed as two separate lists.
    Incremental {
        /// First list written to the request.
        /// NOTE(review): presumably the servers joining the ensemble — confirm the expected entry format.
        joinings: T,

        /// Second list written to the request.
        /// NOTE(review): presumably the servers leaving the ensemble — confirm the expected entry format.
        leavings: T,
    },
    /// A change expressed as a single list, written in the third slot of the request.
    New {
        /// NOTE(review): presumably the complete new membership — confirm against the caller.
        ensemble: T,
    },
}
47
48pub struct ReconfigRequest<'a, T: Iterator<Item = &'a str> + Clone> {
49 pub update: EnsembleUpdate<'a, T>,
50 pub version: i64,
51}
52
53impl<'a, T: Iterator<Item = &'a str> + Clone> SerializableRecord for ReconfigRequest<'a, T> {
54 fn serialize(&self, buf: &mut dyn BufMut) {
55 match &self.update {
56 EnsembleUpdate::Incremental { joinings, leavings } => {
57 ServerList(joinings).serialize(buf);
58 ServerList(leavings).serialize(buf);
59 buf.put_i32(-1);
60 },
61 EnsembleUpdate::New { ensemble } => {
62 buf.put_i32(-1);
63 buf.put_i32(-1);
64 ServerList(ensemble).serialize(buf);
65 },
66 };
67 self.version.serialize(buf);
68 }
69}
70
71impl<'a, T: Iterator<Item = &'a str> + Clone> DynamicRecord for ReconfigRequest<'a, T> {
72 fn serialized_len(&self) -> usize {
73 let n = match &self.update {
74 EnsembleUpdate::Incremental { joinings, leavings } => {
75 4 + ServerList(joinings).serialized_len() + ServerList(leavings).serialized_len()
76 },
77 EnsembleUpdate::New { ensemble } => {
78 let servers = ServerList(ensemble);
79 8 + servers.serialized_len()
80 },
81 };
82 n + 8
83 }
84}