1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ElectLeadersRequest {
24 pub election_type: i8,
28
29 pub topic_partitions: Option<Vec<TopicPartitions>>,
33
34 pub timeout_ms: i32,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl ElectLeadersRequest {
44 pub fn with_election_type(mut self, value: i8) -> Self {
50 self.election_type = value;
51 self
52 }
53 pub fn with_topic_partitions(mut self, value: Option<Vec<TopicPartitions>>) -> Self {
59 self.topic_partitions = value;
60 self
61 }
62 pub fn with_timeout_ms(mut self, value: i32) -> Self {
68 self.timeout_ms = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
83#[cfg(feature = "client")]
84impl Encodable for ElectLeadersRequest {
85 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
86 if version >= 1 {
87 types::Int8.encode(buf, &self.election_type)?;
88 } else {
89 if self.election_type != 0 {
90 bail!("A field is set that is not available on the selected protocol version");
91 }
92 }
93 if version >= 2 {
94 types::CompactArray(types::Struct { version }).encode(buf, &self.topic_partitions)?;
95 } else {
96 types::Array(types::Struct { version }).encode(buf, &self.topic_partitions)?;
97 }
98 types::Int32.encode(buf, &self.timeout_ms)?;
99 if version >= 2 {
100 let num_tagged_fields = self.unknown_tagged_fields.len();
101 if num_tagged_fields > std::u32::MAX as usize {
102 bail!(
103 "Too many tagged fields to encode ({} fields)",
104 num_tagged_fields
105 );
106 }
107 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
108
109 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
110 }
111 Ok(())
112 }
113 fn compute_size(&self, version: i16) -> Result<usize> {
114 let mut total_size = 0;
115 if version >= 1 {
116 total_size += types::Int8.compute_size(&self.election_type)?;
117 } else {
118 if self.election_type != 0 {
119 bail!("A field is set that is not available on the selected protocol version");
120 }
121 }
122 if version >= 2 {
123 total_size += types::CompactArray(types::Struct { version })
124 .compute_size(&self.topic_partitions)?;
125 } else {
126 total_size +=
127 types::Array(types::Struct { version }).compute_size(&self.topic_partitions)?;
128 }
129 total_size += types::Int32.compute_size(&self.timeout_ms)?;
130 if version >= 2 {
131 let num_tagged_fields = self.unknown_tagged_fields.len();
132 if num_tagged_fields > std::u32::MAX as usize {
133 bail!(
134 "Too many tagged fields to encode ({} fields)",
135 num_tagged_fields
136 );
137 }
138 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
139
140 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
141 }
142 Ok(total_size)
143 }
144}
145
146#[cfg(feature = "broker")]
147impl Decodable for ElectLeadersRequest {
148 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
149 let election_type = if version >= 1 {
150 types::Int8.decode(buf)?
151 } else {
152 0
153 };
154 let topic_partitions = if version >= 2 {
155 types::CompactArray(types::Struct { version }).decode(buf)?
156 } else {
157 types::Array(types::Struct { version }).decode(buf)?
158 };
159 let timeout_ms = types::Int32.decode(buf)?;
160 let mut unknown_tagged_fields = BTreeMap::new();
161 if version >= 2 {
162 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
163 for _ in 0..num_tagged_fields {
164 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
165 let size: u32 = types::UnsignedVarInt.decode(buf)?;
166 let unknown_value = buf.try_get_bytes(size as usize)?;
167 unknown_tagged_fields.insert(tag as i32, unknown_value);
168 }
169 }
170 Ok(Self {
171 election_type,
172 topic_partitions,
173 timeout_ms,
174 unknown_tagged_fields,
175 })
176 }
177}
178
179impl Default for ElectLeadersRequest {
180 fn default() -> Self {
181 Self {
182 election_type: 0,
183 topic_partitions: Some(Default::default()),
184 timeout_ms: 60000,
185 unknown_tagged_fields: BTreeMap::new(),
186 }
187 }
188}
189
190impl Message for ElectLeadersRequest {
191 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
192 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
193}
194
195#[non_exhaustive]
197#[derive(Debug, Clone, PartialEq)]
198pub struct TopicPartitions {
199 pub topic: super::TopicName,
203
204 pub partitions: Vec<i32>,
208
209 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
211}
212
213impl TopicPartitions {
214 pub fn with_topic(mut self, value: super::TopicName) -> Self {
220 self.topic = value;
221 self
222 }
223 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
229 self.partitions = value;
230 self
231 }
232 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
234 self.unknown_tagged_fields = value;
235 self
236 }
237 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
239 self.unknown_tagged_fields.insert(key, value);
240 self
241 }
242}
243
244#[cfg(feature = "client")]
245impl Encodable for TopicPartitions {
246 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
247 if version >= 2 {
248 types::CompactString.encode(buf, &self.topic)?;
249 } else {
250 types::String.encode(buf, &self.topic)?;
251 }
252 if version >= 2 {
253 types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
254 } else {
255 types::Array(types::Int32).encode(buf, &self.partitions)?;
256 }
257 if version >= 2 {
258 let num_tagged_fields = self.unknown_tagged_fields.len();
259 if num_tagged_fields > std::u32::MAX as usize {
260 bail!(
261 "Too many tagged fields to encode ({} fields)",
262 num_tagged_fields
263 );
264 }
265 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
266
267 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
268 }
269 Ok(())
270 }
271 fn compute_size(&self, version: i16) -> Result<usize> {
272 let mut total_size = 0;
273 if version >= 2 {
274 total_size += types::CompactString.compute_size(&self.topic)?;
275 } else {
276 total_size += types::String.compute_size(&self.topic)?;
277 }
278 if version >= 2 {
279 total_size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
280 } else {
281 total_size += types::Array(types::Int32).compute_size(&self.partitions)?;
282 }
283 if version >= 2 {
284 let num_tagged_fields = self.unknown_tagged_fields.len();
285 if num_tagged_fields > std::u32::MAX as usize {
286 bail!(
287 "Too many tagged fields to encode ({} fields)",
288 num_tagged_fields
289 );
290 }
291 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
292
293 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
294 }
295 Ok(total_size)
296 }
297}
298
299#[cfg(feature = "broker")]
300impl Decodable for TopicPartitions {
301 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
302 let topic = if version >= 2 {
303 types::CompactString.decode(buf)?
304 } else {
305 types::String.decode(buf)?
306 };
307 let partitions = if version >= 2 {
308 types::CompactArray(types::Int32).decode(buf)?
309 } else {
310 types::Array(types::Int32).decode(buf)?
311 };
312 let mut unknown_tagged_fields = BTreeMap::new();
313 if version >= 2 {
314 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
315 for _ in 0..num_tagged_fields {
316 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
317 let size: u32 = types::UnsignedVarInt.decode(buf)?;
318 let unknown_value = buf.try_get_bytes(size as usize)?;
319 unknown_tagged_fields.insert(tag as i32, unknown_value);
320 }
321 }
322 Ok(Self {
323 topic,
324 partitions,
325 unknown_tagged_fields,
326 })
327 }
328}
329
330impl Default for TopicPartitions {
331 fn default() -> Self {
332 Self {
333 topic: Default::default(),
334 partitions: Default::default(),
335 unknown_tagged_fields: BTreeMap::new(),
336 }
337 }
338}
339
340impl Message for TopicPartitions {
341 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
342 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
343}
344
345impl HeaderVersion for ElectLeadersRequest {
346 fn header_version(version: i16) -> i16 {
347 if version >= 2 {
348 2
349 } else {
350 1
351 }
352 }
353}