1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct LeaderChangeMessage {
24 pub version: i16,
28
29 pub leader_id: super::BrokerId,
33
34 pub voters: Vec<Voter>,
38
39 pub granting_voters: Vec<Voter>,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl LeaderChangeMessage {
49 pub fn with_version(mut self, value: i16) -> Self {
55 self.version = value;
56 self
57 }
58 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
64 self.leader_id = value;
65 self
66 }
67 pub fn with_voters(mut self, value: Vec<Voter>) -> Self {
73 self.voters = value;
74 self
75 }
76 pub fn with_granting_voters(mut self, value: Vec<Voter>) -> Self {
82 self.granting_voters = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
97impl Encodable for LeaderChangeMessage {
98 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
99 if version < 0 || version > 1 {
100 bail!("specified version not supported by this message type");
101 }
102 types::Int16.encode(buf, &self.version)?;
103 types::Int32.encode(buf, &self.leader_id)?;
104 types::CompactArray(types::Struct { version }).encode(buf, &self.voters)?;
105 types::CompactArray(types::Struct { version }).encode(buf, &self.granting_voters)?;
106 let num_tagged_fields = self.unknown_tagged_fields.len();
107 if num_tagged_fields > std::u32::MAX as usize {
108 bail!(
109 "Too many tagged fields to encode ({} fields)",
110 num_tagged_fields
111 );
112 }
113 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
114
115 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
116 Ok(())
117 }
118 fn compute_size(&self, version: i16) -> Result<usize> {
119 let mut total_size = 0;
120 total_size += types::Int16.compute_size(&self.version)?;
121 total_size += types::Int32.compute_size(&self.leader_id)?;
122 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.voters)?;
123 total_size +=
124 types::CompactArray(types::Struct { version }).compute_size(&self.granting_voters)?;
125 let num_tagged_fields = self.unknown_tagged_fields.len();
126 if num_tagged_fields > std::u32::MAX as usize {
127 bail!(
128 "Too many tagged fields to encode ({} fields)",
129 num_tagged_fields
130 );
131 }
132 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
133
134 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
135 Ok(total_size)
136 }
137}
138
139impl Decodable for LeaderChangeMessage {
140 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
141 if version < 0 || version > 1 {
142 bail!("specified version not supported by this message type");
143 }
144 let version = types::Int16.decode(buf)?;
145 let leader_id = types::Int32.decode(buf)?;
146 let voters = types::CompactArray(types::Struct { version }).decode(buf)?;
147 let granting_voters = types::CompactArray(types::Struct { version }).decode(buf)?;
148 let mut unknown_tagged_fields = BTreeMap::new();
149 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
150 for _ in 0..num_tagged_fields {
151 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
152 let size: u32 = types::UnsignedVarInt.decode(buf)?;
153 let unknown_value = buf.try_get_bytes(size as usize)?;
154 unknown_tagged_fields.insert(tag as i32, unknown_value);
155 }
156 Ok(Self {
157 version,
158 leader_id,
159 voters,
160 granting_voters,
161 unknown_tagged_fields,
162 })
163 }
164}
165
166impl Default for LeaderChangeMessage {
167 fn default() -> Self {
168 Self {
169 version: 0,
170 leader_id: (0).into(),
171 voters: Default::default(),
172 granting_voters: Default::default(),
173 unknown_tagged_fields: BTreeMap::new(),
174 }
175 }
176}
177
178impl Message for LeaderChangeMessage {
179 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
180 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
181}
182
183#[non_exhaustive]
185#[derive(Debug, Clone, PartialEq)]
186pub struct Voter {
187 pub voter_id: i32,
191
192 pub voter_directory_id: Uuid,
196
197 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
199}
200
201impl Voter {
202 pub fn with_voter_id(mut self, value: i32) -> Self {
208 self.voter_id = value;
209 self
210 }
211 pub fn with_voter_directory_id(mut self, value: Uuid) -> Self {
217 self.voter_directory_id = value;
218 self
219 }
220 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
222 self.unknown_tagged_fields = value;
223 self
224 }
225 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
227 self.unknown_tagged_fields.insert(key, value);
228 self
229 }
230}
231
232impl Encodable for Voter {
233 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
234 if version < 0 || version > 1 {
235 bail!("specified version not supported by this message type");
236 }
237 types::Int32.encode(buf, &self.voter_id)?;
238 if version >= 1 {
239 types::Uuid.encode(buf, &self.voter_directory_id)?;
240 } else {
241 if &self.voter_directory_id != &Uuid::nil() {
242 bail!("A field is set that is not available on the selected protocol version");
243 }
244 }
245 let num_tagged_fields = self.unknown_tagged_fields.len();
246 if num_tagged_fields > std::u32::MAX as usize {
247 bail!(
248 "Too many tagged fields to encode ({} fields)",
249 num_tagged_fields
250 );
251 }
252 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
253
254 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
255 Ok(())
256 }
257 fn compute_size(&self, version: i16) -> Result<usize> {
258 let mut total_size = 0;
259 total_size += types::Int32.compute_size(&self.voter_id)?;
260 if version >= 1 {
261 total_size += types::Uuid.compute_size(&self.voter_directory_id)?;
262 } else {
263 if &self.voter_directory_id != &Uuid::nil() {
264 bail!("A field is set that is not available on the selected protocol version");
265 }
266 }
267 let num_tagged_fields = self.unknown_tagged_fields.len();
268 if num_tagged_fields > std::u32::MAX as usize {
269 bail!(
270 "Too many tagged fields to encode ({} fields)",
271 num_tagged_fields
272 );
273 }
274 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
275
276 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
277 Ok(total_size)
278 }
279}
280
281impl Decodable for Voter {
282 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
283 if version < 0 || version > 1 {
284 bail!("specified version not supported by this message type");
285 }
286 let voter_id = types::Int32.decode(buf)?;
287 let voter_directory_id = if version >= 1 {
288 types::Uuid.decode(buf)?
289 } else {
290 Uuid::nil()
291 };
292 let mut unknown_tagged_fields = BTreeMap::new();
293 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
294 for _ in 0..num_tagged_fields {
295 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
296 let size: u32 = types::UnsignedVarInt.decode(buf)?;
297 let unknown_value = buf.try_get_bytes(size as usize)?;
298 unknown_tagged_fields.insert(tag as i32, unknown_value);
299 }
300 Ok(Self {
301 voter_id,
302 voter_directory_id,
303 unknown_tagged_fields,
304 })
305 }
306}
307
308impl Default for Voter {
309 fn default() -> Self {
310 Self {
311 voter_id: 0,
312 voter_directory_id: Uuid::nil(),
313 unknown_tagged_fields: BTreeMap::new(),
314 }
315 }
316}
317
318impl Message for Voter {
319 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
320 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
321}