1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ElectLeadersRequest {
24 pub election_type: i8,
28
29 pub topic_partitions: Option<Vec<TopicPartitions>>,
33
34 pub timeout_ms: i32,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl ElectLeadersRequest {
44 pub fn with_election_type(mut self, value: i8) -> Self {
50 self.election_type = value;
51 self
52 }
53 pub fn with_topic_partitions(mut self, value: Option<Vec<TopicPartitions>>) -> Self {
59 self.topic_partitions = value;
60 self
61 }
62 pub fn with_timeout_ms(mut self, value: i32) -> Self {
68 self.timeout_ms = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
83#[cfg(feature = "client")]
84impl Encodable for ElectLeadersRequest {
85 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
86 if version < 0 || version > 2 {
87 bail!("specified version not supported by this message type");
88 }
89 if version >= 1 {
90 types::Int8.encode(buf, &self.election_type)?;
91 } else {
92 if self.election_type != 0 {
93 bail!("A field is set that is not available on the selected protocol version");
94 }
95 }
96 if version >= 2 {
97 types::CompactArray(types::Struct { version }).encode(buf, &self.topic_partitions)?;
98 } else {
99 types::Array(types::Struct { version }).encode(buf, &self.topic_partitions)?;
100 }
101 types::Int32.encode(buf, &self.timeout_ms)?;
102 if version >= 2 {
103 let num_tagged_fields = self.unknown_tagged_fields.len();
104 if num_tagged_fields > std::u32::MAX as usize {
105 bail!(
106 "Too many tagged fields to encode ({} fields)",
107 num_tagged_fields
108 );
109 }
110 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
111
112 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
113 }
114 Ok(())
115 }
116 fn compute_size(&self, version: i16) -> Result<usize> {
117 let mut total_size = 0;
118 if version >= 1 {
119 total_size += types::Int8.compute_size(&self.election_type)?;
120 } else {
121 if self.election_type != 0 {
122 bail!("A field is set that is not available on the selected protocol version");
123 }
124 }
125 if version >= 2 {
126 total_size += types::CompactArray(types::Struct { version })
127 .compute_size(&self.topic_partitions)?;
128 } else {
129 total_size +=
130 types::Array(types::Struct { version }).compute_size(&self.topic_partitions)?;
131 }
132 total_size += types::Int32.compute_size(&self.timeout_ms)?;
133 if version >= 2 {
134 let num_tagged_fields = self.unknown_tagged_fields.len();
135 if num_tagged_fields > std::u32::MAX as usize {
136 bail!(
137 "Too many tagged fields to encode ({} fields)",
138 num_tagged_fields
139 );
140 }
141 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
142
143 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
144 }
145 Ok(total_size)
146 }
147}
148
149#[cfg(feature = "broker")]
150impl Decodable for ElectLeadersRequest {
151 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
152 if version < 0 || version > 2 {
153 bail!("specified version not supported by this message type");
154 }
155 let election_type = if version >= 1 {
156 types::Int8.decode(buf)?
157 } else {
158 0
159 };
160 let topic_partitions = if version >= 2 {
161 types::CompactArray(types::Struct { version }).decode(buf)?
162 } else {
163 types::Array(types::Struct { version }).decode(buf)?
164 };
165 let timeout_ms = types::Int32.decode(buf)?;
166 let mut unknown_tagged_fields = BTreeMap::new();
167 if version >= 2 {
168 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
169 for _ in 0..num_tagged_fields {
170 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
171 let size: u32 = types::UnsignedVarInt.decode(buf)?;
172 let unknown_value = buf.try_get_bytes(size as usize)?;
173 unknown_tagged_fields.insert(tag as i32, unknown_value);
174 }
175 }
176 Ok(Self {
177 election_type,
178 topic_partitions,
179 timeout_ms,
180 unknown_tagged_fields,
181 })
182 }
183}
184
185impl Default for ElectLeadersRequest {
186 fn default() -> Self {
187 Self {
188 election_type: 0,
189 topic_partitions: Some(Default::default()),
190 timeout_ms: 60000,
191 unknown_tagged_fields: BTreeMap::new(),
192 }
193 }
194}
195
196impl Message for ElectLeadersRequest {
197 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
198 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
199}
200
201#[non_exhaustive]
203#[derive(Debug, Clone, PartialEq)]
204pub struct TopicPartitions {
205 pub topic: super::TopicName,
209
210 pub partitions: Vec<i32>,
214
215 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
217}
218
219impl TopicPartitions {
220 pub fn with_topic(mut self, value: super::TopicName) -> Self {
226 self.topic = value;
227 self
228 }
229 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
235 self.partitions = value;
236 self
237 }
238 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
240 self.unknown_tagged_fields = value;
241 self
242 }
243 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
245 self.unknown_tagged_fields.insert(key, value);
246 self
247 }
248}
249
250#[cfg(feature = "client")]
251impl Encodable for TopicPartitions {
252 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
253 if version < 0 || version > 2 {
254 bail!("specified version not supported by this message type");
255 }
256 if version >= 2 {
257 types::CompactString.encode(buf, &self.topic)?;
258 } else {
259 types::String.encode(buf, &self.topic)?;
260 }
261 if version >= 2 {
262 types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
263 } else {
264 types::Array(types::Int32).encode(buf, &self.partitions)?;
265 }
266 if version >= 2 {
267 let num_tagged_fields = self.unknown_tagged_fields.len();
268 if num_tagged_fields > std::u32::MAX as usize {
269 bail!(
270 "Too many tagged fields to encode ({} fields)",
271 num_tagged_fields
272 );
273 }
274 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
275
276 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
277 }
278 Ok(())
279 }
280 fn compute_size(&self, version: i16) -> Result<usize> {
281 let mut total_size = 0;
282 if version >= 2 {
283 total_size += types::CompactString.compute_size(&self.topic)?;
284 } else {
285 total_size += types::String.compute_size(&self.topic)?;
286 }
287 if version >= 2 {
288 total_size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
289 } else {
290 total_size += types::Array(types::Int32).compute_size(&self.partitions)?;
291 }
292 if version >= 2 {
293 let num_tagged_fields = self.unknown_tagged_fields.len();
294 if num_tagged_fields > std::u32::MAX as usize {
295 bail!(
296 "Too many tagged fields to encode ({} fields)",
297 num_tagged_fields
298 );
299 }
300 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
301
302 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
303 }
304 Ok(total_size)
305 }
306}
307
308#[cfg(feature = "broker")]
309impl Decodable for TopicPartitions {
310 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
311 if version < 0 || version > 2 {
312 bail!("specified version not supported by this message type");
313 }
314 let topic = if version >= 2 {
315 types::CompactString.decode(buf)?
316 } else {
317 types::String.decode(buf)?
318 };
319 let partitions = if version >= 2 {
320 types::CompactArray(types::Int32).decode(buf)?
321 } else {
322 types::Array(types::Int32).decode(buf)?
323 };
324 let mut unknown_tagged_fields = BTreeMap::new();
325 if version >= 2 {
326 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
327 for _ in 0..num_tagged_fields {
328 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
329 let size: u32 = types::UnsignedVarInt.decode(buf)?;
330 let unknown_value = buf.try_get_bytes(size as usize)?;
331 unknown_tagged_fields.insert(tag as i32, unknown_value);
332 }
333 }
334 Ok(Self {
335 topic,
336 partitions,
337 unknown_tagged_fields,
338 })
339 }
340}
341
342impl Default for TopicPartitions {
343 fn default() -> Self {
344 Self {
345 topic: Default::default(),
346 partitions: Default::default(),
347 unknown_tagged_fields: BTreeMap::new(),
348 }
349 }
350}
351
352impl Message for TopicPartitions {
353 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
354 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
355}
356
357impl HeaderVersion for ElectLeadersRequest {
358 fn header_version(version: i16) -> i16 {
359 if version >= 2 {
360 2
361 } else {
362 1
363 }
364 }
365}