1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct BeginQuorumEpochRequest {
24 pub cluster_id: Option<StrBytes>,
28
29 pub voter_id: super::BrokerId,
33
34 pub topics: Vec<TopicData>,
38
39 pub leader_endpoints: Vec<LeaderEndpoint>,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl BeginQuorumEpochRequest {
49 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
55 self.cluster_id = value;
56 self
57 }
58 pub fn with_voter_id(mut self, value: super::BrokerId) -> Self {
64 self.voter_id = value;
65 self
66 }
67 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
73 self.topics = value;
74 self
75 }
76 pub fn with_leader_endpoints(mut self, value: Vec<LeaderEndpoint>) -> Self {
82 self.leader_endpoints = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
/// Wire serialisation of the request; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for BeginQuorumEpochRequest {
    /// Writes the request to `buf` using the layout of `version`.
    ///
    /// Version 0 writes `cluster_id` and `topics` in their classic encodings.
    /// Version 1 writes `cluster_id`, `voter_id`, `topics`, `leader_endpoints`
    /// in compact encodings, followed by the tagged-field section.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1`, when more than `u32::MAX`
    /// tagged fields are present, or when any field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version < 1 {
            // Version 0 layout: no voter id, no endpoints, no tagged fields.
            types::String.encode(buf, &self.cluster_id)?;
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.cluster_id)?;
        types::Int32.encode(buf, &self.voter_id)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.leader_endpoints)?;

        // The tagged-field count is written as an unsigned varint of a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// No range check is applied here: any `version >= 1` is sized with the
    /// version 1 layout and anything lower with the version 0 layout.
    ///
    /// # Errors
    /// Fails when more than `u32::MAX` tagged fields are present or when a
    /// field's size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            let cluster_id_size = types::String.compute_size(&self.cluster_id)?;
            let topics_size =
                types::Array(types::Struct { version }).compute_size(&self.topics)?;
            return Ok(cluster_id_size + topics_size);
        }

        let mut size = types::CompactString.compute_size(&self.cluster_id)?;
        size += types::Int32.compute_size(&self.voter_id)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        size +=
            types::CompactArray(types::Struct { version }).compute_size(&self.leader_endpoints)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
168
/// Wire deserialisation of the request; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for BeginQuorumEpochRequest {
    /// Reads a request from `buf` using the layout of `version`.
    ///
    /// For version 0 only `cluster_id` and `topics` are read; `voter_id`
    /// becomes `-1`, `leader_endpoints` is empty and no tagged fields are read.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1` or when the buffer cannot be
    /// decoded.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version == 0 {
            let cluster_id = types::String.decode(buf)?;
            let topics = types::Array(types::Struct { version }).decode(buf)?;
            return Ok(Self {
                cluster_id,
                voter_id: (-1).into(),
                topics,
                leader_endpoints: Default::default(),
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let cluster_id = types::CompactString.decode(buf)?;
        let voter_id = types::Int32.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
        let leader_endpoints = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Every tagged field is unknown to this message, so each raw payload
        // is kept under its tag.
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            cluster_id,
            voter_id,
            topics,
            leader_endpoints,
            unknown_tagged_fields,
        })
    }
}
214
215impl Default for BeginQuorumEpochRequest {
216 fn default() -> Self {
217 Self {
218 cluster_id: None,
219 voter_id: (-1).into(),
220 topics: Default::default(),
221 leader_endpoints: Default::default(),
222 unknown_tagged_fields: BTreeMap::new(),
223 }
224 }
225}
226
227impl Message for BeginQuorumEpochRequest {
228 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
229 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
230}
231
232#[non_exhaustive]
234#[derive(Debug, Clone, PartialEq)]
235pub struct LeaderEndpoint {
236 pub name: StrBytes,
240
241 pub host: StrBytes,
245
246 pub port: u16,
250
251 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
253}
254
255impl LeaderEndpoint {
256 pub fn with_name(mut self, value: StrBytes) -> Self {
262 self.name = value;
263 self
264 }
265 pub fn with_host(mut self, value: StrBytes) -> Self {
271 self.host = value;
272 self
273 }
274 pub fn with_port(mut self, value: u16) -> Self {
280 self.port = value;
281 self
282 }
283 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
285 self.unknown_tagged_fields = value;
286 self
287 }
288 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
290 self.unknown_tagged_fields.insert(key, value);
291 self
292 }
293}
294
/// Wire serialisation of a leader endpoint; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Encodable for LeaderEndpoint {
    /// Writes the endpoint to `buf` using the layout of `version`.
    ///
    /// Version 0 has no representation for this struct: nothing is written,
    /// and a non-empty `name`/`host` or non-zero `port` is rejected. Version 1
    /// writes `name`, `host`, `port` and then the tagged-field section.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1`, when a field is set that
    /// version 0 cannot carry, when more than `u32::MAX` tagged fields are
    /// present, or when any field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version < 1 {
            // All three checks report the same error, so they are folded into
            // one condition evaluated in the original order.
            if !self.name.is_empty() || !self.host.is_empty() || self.port != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        types::CompactString.encode(buf, &self.name)?;
        types::CompactString.encode(buf, &self.host)?;
        types::UInt16.encode(buf, &self.port)?;

        // The tagged-field count is written as an unsigned varint of a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// No range check is applied here: any `version >= 1` is sized with the
    /// version 1 layout; anything lower yields `0` after the same "field not
    /// available" validation that `encode` performs.
    ///
    /// # Errors
    /// Fails when a field is set that the selected version cannot carry, when
    /// more than `u32::MAX` tagged fields are present, or when a field's size
    /// computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            if !self.name.is_empty() || !self.host.is_empty() || self.port != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }

        let mut size = types::CompactString.compute_size(&self.name)?;
        size += types::CompactString.compute_size(&self.host)?;
        size += types::UInt16.compute_size(&self.port)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
374
/// Wire deserialisation of a leader endpoint; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Decodable for LeaderEndpoint {
    /// Reads an endpoint from `buf` using the layout of `version`.
    ///
    /// For version 0 nothing is consumed from `buf` and every field takes its
    /// fallback value (empty strings, port `0`, no tagged fields).
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1` or when the buffer cannot be
    /// decoded.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version == 0 {
            return Ok(Self {
                name: Default::default(),
                host: Default::default(),
                port: 0,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let name = types::CompactString.decode(buf)?;
        let host = types::CompactString.decode(buf)?;
        let port = types::UInt16.decode(buf)?;

        // Every tagged field is unknown to this struct, so each raw payload
        // is kept under its tag.
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            name,
            host,
            port,
            unknown_tagged_fields,
        })
    }
}
414
415impl Default for LeaderEndpoint {
416 fn default() -> Self {
417 Self {
418 name: Default::default(),
419 host: Default::default(),
420 port: 0,
421 unknown_tagged_fields: BTreeMap::new(),
422 }
423 }
424}
425
426impl Message for LeaderEndpoint {
427 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
428 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
429}
430
431#[non_exhaustive]
433#[derive(Debug, Clone, PartialEq)]
434pub struct PartitionData {
435 pub partition_index: i32,
439
440 pub voter_directory_id: Uuid,
444
445 pub leader_id: super::BrokerId,
449
450 pub leader_epoch: i32,
454
455 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
457}
458
459impl PartitionData {
460 pub fn with_partition_index(mut self, value: i32) -> Self {
466 self.partition_index = value;
467 self
468 }
469 pub fn with_voter_directory_id(mut self, value: Uuid) -> Self {
475 self.voter_directory_id = value;
476 self
477 }
478 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
484 self.leader_id = value;
485 self
486 }
487 pub fn with_leader_epoch(mut self, value: i32) -> Self {
493 self.leader_epoch = value;
494 self
495 }
496 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
498 self.unknown_tagged_fields = value;
499 self
500 }
501 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
503 self.unknown_tagged_fields.insert(key, value);
504 self
505 }
506}
507
/// Wire serialisation of a partition entry; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Encodable for PartitionData {
    /// Writes the entry to `buf` using the layout of `version`.
    ///
    /// Field order is `partition_index`, `voter_directory_id` (version 1
    /// only), `leader_id`, `leader_epoch`, then the tagged-field section
    /// (version 1 only).
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1`, when more than `u32::MAX`
    /// tagged fields are present, or when any field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let v1_plus = version >= 1;

        types::Int32.encode(buf, &self.partition_index)?;
        if v1_plus {
            types::Uuid.encode(buf, &self.voter_directory_id)?;
        }
        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;

        if !v1_plus {
            // Version 0 carries no tagged-field section.
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// No range check is applied here: any `version >= 1` is sized with the
    /// version 1 layout and anything lower with the version 0 layout.
    ///
    /// # Errors
    /// Fails when more than `u32::MAX` tagged fields are present or when a
    /// field's size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let v1_plus = version >= 1;

        let mut size = types::Int32.compute_size(&self.partition_index)?;
        if v1_plus {
            size += types::Uuid.compute_size(&self.voter_directory_id)?;
        }
        size += types::Int32.compute_size(&self.leader_id)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;

        if v1_plus {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
557
/// Wire deserialisation of a partition entry; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Decodable for PartitionData {
    /// Reads an entry from `buf` using the layout of `version`.
    ///
    /// For version 0 `voter_directory_id` is not read and becomes the nil
    /// UUID, and no tagged fields are read.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1` or when the buffer cannot be
    /// decoded.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let partition_index = types::Int32.decode(buf)?;
        // After the range check above, the only versions left are 0 and 1.
        let voter_directory_id = match version {
            0 => Uuid::nil(),
            _ => types::Uuid.decode(buf)?,
        };
        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        if version != 0 {
            // Every tagged field is unknown to this struct, so each raw
            // payload is kept under its tag.
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            partition_index,
            voter_directory_id,
            leader_id,
            leader_epoch,
            unknown_tagged_fields,
        })
    }
}
591
592impl Default for PartitionData {
593 fn default() -> Self {
594 Self {
595 partition_index: 0,
596 voter_directory_id: Uuid::nil(),
597 leader_id: (0).into(),
598 leader_epoch: 0,
599 unknown_tagged_fields: BTreeMap::new(),
600 }
601 }
602}
603
604impl Message for PartitionData {
605 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
606 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
607}
608
609#[non_exhaustive]
611#[derive(Debug, Clone, PartialEq)]
612pub struct TopicData {
613 pub topic_name: super::TopicName,
617
618 pub partitions: Vec<PartitionData>,
622
623 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
625}
626
627impl TopicData {
628 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
634 self.topic_name = value;
635 self
636 }
637 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
643 self.partitions = value;
644 self
645 }
646 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
648 self.unknown_tagged_fields = value;
649 self
650 }
651 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
653 self.unknown_tagged_fields.insert(key, value);
654 self
655 }
656}
657
/// Wire serialisation of a topic entry; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Encodable for TopicData {
    /// Writes the entry to `buf` using the layout of `version`.
    ///
    /// Version 0 writes `topic_name` and `partitions` in their classic
    /// encodings. Version 1 writes both in compact encodings, followed by the
    /// tagged-field section.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1`, when more than `u32::MAX`
    /// tagged fields are present, or when any field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version < 1 {
            // Version 0 layout: classic encodings, no tagged fields.
            types::String.encode(buf, &self.topic_name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.topic_name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        // The tagged-field count is written as an unsigned varint of a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// No range check is applied here: any `version >= 1` is sized with the
    /// version 1 layout and anything lower with the version 0 layout.
    ///
    /// # Errors
    /// Fails when more than `u32::MAX` tagged fields are present or when a
    /// field's size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            let name_size = types::String.compute_size(&self.topic_name)?;
            let partitions_size =
                types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(name_size + partitions_size);
        }

        let mut size = types::CompactString.compute_size(&self.topic_name)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
716
/// Wire deserialisation of a topic entry; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Decodable for TopicData {
    /// Reads an entry from `buf` using the layout of `version`.
    ///
    /// Version 0 reads `topic_name` and `partitions` in their classic
    /// encodings and no tagged fields; version 1 reads the compact encodings
    /// followed by the tagged-field section.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1` or when the buffer cannot be
    /// decoded.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version == 0 {
            let topic_name = types::String.decode(buf)?;
            let partitions = types::Array(types::Struct { version }).decode(buf)?;
            return Ok(Self {
                topic_name,
                partitions,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let topic_name = types::CompactString.decode(buf)?;
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Every tagged field is unknown to this struct, so each raw payload
        // is kept under its tag.
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            topic_name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
750
751impl Default for TopicData {
752 fn default() -> Self {
753 Self {
754 topic_name: Default::default(),
755 partitions: Default::default(),
756 unknown_tagged_fields: BTreeMap::new(),
757 }
758 }
759}
760
761impl Message for TopicData {
762 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
763 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
764}
765
766impl HeaderVersion for BeginQuorumEpochRequest {
767 fn header_version(version: i16) -> i16 {
768 if version >= 1 {
769 2
770 } else {
771 1
772 }
773 }
774}