1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct PartitionData {
24 pub partition_index: i32,
28
29 pub candidate_epoch: i32,
33
34 pub candidate_id: super::BrokerId,
38
39 pub candidate_directory_id: Uuid,
43
44 pub voter_directory_id: Uuid,
48
49 pub last_offset_epoch: i32,
53
54 pub last_offset: i64,
58
59 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
61}
62
63impl PartitionData {
64 pub fn with_partition_index(mut self, value: i32) -> Self {
70 self.partition_index = value;
71 self
72 }
73 pub fn with_candidate_epoch(mut self, value: i32) -> Self {
79 self.candidate_epoch = value;
80 self
81 }
82 pub fn with_candidate_id(mut self, value: super::BrokerId) -> Self {
88 self.candidate_id = value;
89 self
90 }
91 pub fn with_candidate_directory_id(mut self, value: Uuid) -> Self {
97 self.candidate_directory_id = value;
98 self
99 }
100 pub fn with_voter_directory_id(mut self, value: Uuid) -> Self {
106 self.voter_directory_id = value;
107 self
108 }
109 pub fn with_last_offset_epoch(mut self, value: i32) -> Self {
115 self.last_offset_epoch = value;
116 self
117 }
118 pub fn with_last_offset(mut self, value: i64) -> Self {
124 self.last_offset = value;
125 self
126 }
127 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
129 self.unknown_tagged_fields = value;
130 self
131 }
132 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
134 self.unknown_tagged_fields.insert(key, value);
135 self
136 }
137}
138
139#[cfg(feature = "client")]
140impl Encodable for PartitionData {
141 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
142 if version < 0 || version > 1 {
143 bail!("specified version not supported by this message type");
144 }
145 types::Int32.encode(buf, &self.partition_index)?;
146 types::Int32.encode(buf, &self.candidate_epoch)?;
147 types::Int32.encode(buf, &self.candidate_id)?;
148 if version >= 1 {
149 types::Uuid.encode(buf, &self.candidate_directory_id)?;
150 }
151 if version >= 1 {
152 types::Uuid.encode(buf, &self.voter_directory_id)?;
153 }
154 types::Int32.encode(buf, &self.last_offset_epoch)?;
155 types::Int64.encode(buf, &self.last_offset)?;
156 let num_tagged_fields = self.unknown_tagged_fields.len();
157 if num_tagged_fields > std::u32::MAX as usize {
158 bail!(
159 "Too many tagged fields to encode ({} fields)",
160 num_tagged_fields
161 );
162 }
163 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
164
165 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
166 Ok(())
167 }
168 fn compute_size(&self, version: i16) -> Result<usize> {
169 let mut total_size = 0;
170 total_size += types::Int32.compute_size(&self.partition_index)?;
171 total_size += types::Int32.compute_size(&self.candidate_epoch)?;
172 total_size += types::Int32.compute_size(&self.candidate_id)?;
173 if version >= 1 {
174 total_size += types::Uuid.compute_size(&self.candidate_directory_id)?;
175 }
176 if version >= 1 {
177 total_size += types::Uuid.compute_size(&self.voter_directory_id)?;
178 }
179 total_size += types::Int32.compute_size(&self.last_offset_epoch)?;
180 total_size += types::Int64.compute_size(&self.last_offset)?;
181 let num_tagged_fields = self.unknown_tagged_fields.len();
182 if num_tagged_fields > std::u32::MAX as usize {
183 bail!(
184 "Too many tagged fields to encode ({} fields)",
185 num_tagged_fields
186 );
187 }
188 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
189
190 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
191 Ok(total_size)
192 }
193}
194
195#[cfg(feature = "broker")]
196impl Decodable for PartitionData {
197 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
198 if version < 0 || version > 1 {
199 bail!("specified version not supported by this message type");
200 }
201 let partition_index = types::Int32.decode(buf)?;
202 let candidate_epoch = types::Int32.decode(buf)?;
203 let candidate_id = types::Int32.decode(buf)?;
204 let candidate_directory_id = if version >= 1 {
205 types::Uuid.decode(buf)?
206 } else {
207 Uuid::nil()
208 };
209 let voter_directory_id = if version >= 1 {
210 types::Uuid.decode(buf)?
211 } else {
212 Uuid::nil()
213 };
214 let last_offset_epoch = types::Int32.decode(buf)?;
215 let last_offset = types::Int64.decode(buf)?;
216 let mut unknown_tagged_fields = BTreeMap::new();
217 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
218 for _ in 0..num_tagged_fields {
219 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
220 let size: u32 = types::UnsignedVarInt.decode(buf)?;
221 let unknown_value = buf.try_get_bytes(size as usize)?;
222 unknown_tagged_fields.insert(tag as i32, unknown_value);
223 }
224 Ok(Self {
225 partition_index,
226 candidate_epoch,
227 candidate_id,
228 candidate_directory_id,
229 voter_directory_id,
230 last_offset_epoch,
231 last_offset,
232 unknown_tagged_fields,
233 })
234 }
235}
236
237impl Default for PartitionData {
238 fn default() -> Self {
239 Self {
240 partition_index: 0,
241 candidate_epoch: 0,
242 candidate_id: (0).into(),
243 candidate_directory_id: Uuid::nil(),
244 voter_directory_id: Uuid::nil(),
245 last_offset_epoch: 0,
246 last_offset: 0,
247 unknown_tagged_fields: BTreeMap::new(),
248 }
249 }
250}
251
252impl Message for PartitionData {
253 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
254 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
255}
256
257#[non_exhaustive]
259#[derive(Debug, Clone, PartialEq)]
260pub struct TopicData {
261 pub topic_name: super::TopicName,
265
266 pub partitions: Vec<PartitionData>,
270
271 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
273}
274
275impl TopicData {
276 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
282 self.topic_name = value;
283 self
284 }
285 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
291 self.partitions = value;
292 self
293 }
294 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
296 self.unknown_tagged_fields = value;
297 self
298 }
299 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
301 self.unknown_tagged_fields.insert(key, value);
302 self
303 }
304}
305
306#[cfg(feature = "client")]
307impl Encodable for TopicData {
308 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
309 if version < 0 || version > 1 {
310 bail!("specified version not supported by this message type");
311 }
312 types::CompactString.encode(buf, &self.topic_name)?;
313 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
314 let num_tagged_fields = self.unknown_tagged_fields.len();
315 if num_tagged_fields > std::u32::MAX as usize {
316 bail!(
317 "Too many tagged fields to encode ({} fields)",
318 num_tagged_fields
319 );
320 }
321 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
322
323 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
324 Ok(())
325 }
326 fn compute_size(&self, version: i16) -> Result<usize> {
327 let mut total_size = 0;
328 total_size += types::CompactString.compute_size(&self.topic_name)?;
329 total_size +=
330 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
331 let num_tagged_fields = self.unknown_tagged_fields.len();
332 if num_tagged_fields > std::u32::MAX as usize {
333 bail!(
334 "Too many tagged fields to encode ({} fields)",
335 num_tagged_fields
336 );
337 }
338 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
339
340 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
341 Ok(total_size)
342 }
343}
344
345#[cfg(feature = "broker")]
346impl Decodable for TopicData {
347 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
348 if version < 0 || version > 1 {
349 bail!("specified version not supported by this message type");
350 }
351 let topic_name = types::CompactString.decode(buf)?;
352 let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
353 let mut unknown_tagged_fields = BTreeMap::new();
354 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
355 for _ in 0..num_tagged_fields {
356 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
357 let size: u32 = types::UnsignedVarInt.decode(buf)?;
358 let unknown_value = buf.try_get_bytes(size as usize)?;
359 unknown_tagged_fields.insert(tag as i32, unknown_value);
360 }
361 Ok(Self {
362 topic_name,
363 partitions,
364 unknown_tagged_fields,
365 })
366 }
367}
368
369impl Default for TopicData {
370 fn default() -> Self {
371 Self {
372 topic_name: Default::default(),
373 partitions: Default::default(),
374 unknown_tagged_fields: BTreeMap::new(),
375 }
376 }
377}
378
379impl Message for TopicData {
380 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
381 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
382}
383
384#[non_exhaustive]
386#[derive(Debug, Clone, PartialEq)]
387pub struct VoteRequest {
388 pub cluster_id: Option<StrBytes>,
392
393 pub voter_id: super::BrokerId,
397
398 pub topics: Vec<TopicData>,
402
403 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
405}
406
407impl VoteRequest {
408 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
414 self.cluster_id = value;
415 self
416 }
417 pub fn with_voter_id(mut self, value: super::BrokerId) -> Self {
423 self.voter_id = value;
424 self
425 }
426 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
432 self.topics = value;
433 self
434 }
435 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
437 self.unknown_tagged_fields = value;
438 self
439 }
440 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
442 self.unknown_tagged_fields.insert(key, value);
443 self
444 }
445}
446
447#[cfg(feature = "client")]
448impl Encodable for VoteRequest {
449 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
450 if version < 0 || version > 1 {
451 bail!("specified version not supported by this message type");
452 }
453 types::CompactString.encode(buf, &self.cluster_id)?;
454 if version >= 1 {
455 types::Int32.encode(buf, &self.voter_id)?;
456 }
457 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
458 let num_tagged_fields = self.unknown_tagged_fields.len();
459 if num_tagged_fields > std::u32::MAX as usize {
460 bail!(
461 "Too many tagged fields to encode ({} fields)",
462 num_tagged_fields
463 );
464 }
465 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
466
467 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
468 Ok(())
469 }
470 fn compute_size(&self, version: i16) -> Result<usize> {
471 let mut total_size = 0;
472 total_size += types::CompactString.compute_size(&self.cluster_id)?;
473 if version >= 1 {
474 total_size += types::Int32.compute_size(&self.voter_id)?;
475 }
476 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
477 let num_tagged_fields = self.unknown_tagged_fields.len();
478 if num_tagged_fields > std::u32::MAX as usize {
479 bail!(
480 "Too many tagged fields to encode ({} fields)",
481 num_tagged_fields
482 );
483 }
484 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
485
486 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
487 Ok(total_size)
488 }
489}
490
491#[cfg(feature = "broker")]
492impl Decodable for VoteRequest {
493 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
494 if version < 0 || version > 1 {
495 bail!("specified version not supported by this message type");
496 }
497 let cluster_id = types::CompactString.decode(buf)?;
498 let voter_id = if version >= 1 {
499 types::Int32.decode(buf)?
500 } else {
501 (-1).into()
502 };
503 let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
504 let mut unknown_tagged_fields = BTreeMap::new();
505 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
506 for _ in 0..num_tagged_fields {
507 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
508 let size: u32 = types::UnsignedVarInt.decode(buf)?;
509 let unknown_value = buf.try_get_bytes(size as usize)?;
510 unknown_tagged_fields.insert(tag as i32, unknown_value);
511 }
512 Ok(Self {
513 cluster_id,
514 voter_id,
515 topics,
516 unknown_tagged_fields,
517 })
518 }
519}
520
521impl Default for VoteRequest {
522 fn default() -> Self {
523 Self {
524 cluster_id: None,
525 voter_id: (-1).into(),
526 topics: Default::default(),
527 unknown_tagged_fields: BTreeMap::new(),
528 }
529 }
530}
531
532impl Message for VoteRequest {
533 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
534 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
535}
536
537impl HeaderVersion for VoteRequest {
538 fn header_version(version: i16) -> i16 {
539 2
540 }
541}