1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct EndQuorumEpochRequest {
24 pub cluster_id: Option<StrBytes>,
28
29 pub topics: Vec<TopicData>,
33
34 pub leader_endpoints: Vec<LeaderEndpoint>,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl EndQuorumEpochRequest {
44 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
50 self.cluster_id = value;
51 self
52 }
53 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
59 self.topics = value;
60 self
61 }
62 pub fn with_leader_endpoints(mut self, value: Vec<LeaderEndpoint>) -> Self {
68 self.leader_endpoints = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
/// Serialization of [`EndQuorumEpochRequest`]; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for EndQuorumEpochRequest {
    /// Writes the message into `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1`, when a nested encoder fails, or
    /// when the number of unknown tagged fields exceeds `u32::MAX`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version == 0 {
            // Version 0: non-compact encodings, no leader endpoints, no tagged fields.
            types::String.encode(buf, &self.cluster_id)?;
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.cluster_id)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.leader_endpoints)?;

        // Tagged-field trailer: count first, then the fields themselves.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes the message occupies for `version`.
    ///
    /// No range check is performed on `version` here: any value below 1 is
    /// sized with the version-0 layout, any other value with the version-1 layout.
    ///
    /// # Errors
    /// Fails when a nested size computation fails or when the number of
    /// unknown tagged fields exceeds `u32::MAX`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            let legacy_size = types::String.compute_size(&self.cluster_id)?
                + types::Array(types::Struct { version }).compute_size(&self.topics)?;
            return Ok(legacy_size);
        }

        let mut size = types::CompactString.compute_size(&self.cluster_id)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        size +=
            types::CompactArray(types::Struct { version }).compute_size(&self.leader_endpoints)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
148
/// Deserialization of [`EndQuorumEpochRequest`]; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for EndQuorumEpochRequest {
    /// Reads a message laid out for `version` from `buf`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version == 0 {
            // Version 0 carries neither leader endpoints nor tagged fields.
            return Ok(Self {
                cluster_id: types::String.decode(buf)?,
                topics: types::Array(types::Struct { version }).decode(buf)?,
                leader_endpoints: Default::default(),
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let cluster_id = types::CompactString.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
        let leader_endpoints = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Every tagged field is kept verbatim as (tag, raw bytes).
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw);
        }

        Ok(Self {
            cluster_id,
            topics,
            leader_endpoints,
            unknown_tagged_fields,
        })
    }
}
188
189impl Default for EndQuorumEpochRequest {
190 fn default() -> Self {
191 Self {
192 cluster_id: None,
193 topics: Default::default(),
194 leader_endpoints: Default::default(),
195 unknown_tagged_fields: BTreeMap::new(),
196 }
197 }
198}
199
200impl Message for EndQuorumEpochRequest {
201 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
202 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
203}
204
205#[non_exhaustive]
207#[derive(Debug, Clone, PartialEq)]
208pub struct LeaderEndpoint {
209 pub name: StrBytes,
213
214 pub host: StrBytes,
218
219 pub port: u16,
223
224 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
226}
227
228impl LeaderEndpoint {
229 pub fn with_name(mut self, value: StrBytes) -> Self {
235 self.name = value;
236 self
237 }
238 pub fn with_host(mut self, value: StrBytes) -> Self {
244 self.host = value;
245 self
246 }
247 pub fn with_port(mut self, value: u16) -> Self {
253 self.port = value;
254 self
255 }
256 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
258 self.unknown_tagged_fields = value;
259 self
260 }
261 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
263 self.unknown_tagged_fields.insert(key, value);
264 self
265 }
266}
267
/// Serialization of [`LeaderEndpoint`]; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for LeaderEndpoint {
    /// Writes the endpoint into `buf` using the layout of `version`.
    ///
    /// At version 0 nothing is written; the call only verifies that `name`,
    /// `host` and `port` still hold their default values.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1`, when a field is set at
    /// version 0, when a nested encoder fails, or when the number of unknown
    /// tagged fields exceeds `u32::MAX`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version == 0 {
            if !self.name.is_empty() || !self.host.is_empty() || self.port != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        types::CompactString.encode(buf, &self.name)?;
        types::CompactString.encode(buf, &self.host)?;
        types::UInt16.encode(buf, &self.port)?;

        // Tagged-field trailer: count first, then the fields themselves.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes the endpoint occupies for `version`.
    ///
    /// No range check is performed on `version` here: any value below 1 is
    /// treated like version 0 (size 0 once the default-value check passes).
    ///
    /// # Errors
    /// Fails when a field is set for a `version` below 1, when a nested size
    /// computation fails, or when the number of unknown tagged fields exceeds
    /// `u32::MAX`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            if !self.name.is_empty() || !self.host.is_empty() || self.port != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }

        let mut size = types::CompactString.compute_size(&self.name)?;
        size += types::CompactString.compute_size(&self.host)?;
        size += types::UInt16.compute_size(&self.port)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
347
/// Deserialization of [`LeaderEndpoint`]; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for LeaderEndpoint {
    /// Reads an endpoint laid out for `version` from `buf`.
    ///
    /// At version 0 nothing is consumed from `buf` and every field takes its
    /// default value.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version == 0 {
            return Ok(Self {
                name: Default::default(),
                host: Default::default(),
                port: 0,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let name = types::CompactString.decode(buf)?;
        let host = types::CompactString.decode(buf)?;
        let port = types::UInt16.decode(buf)?;

        // Every tagged field is kept verbatim as (tag, raw bytes).
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw);
        }

        Ok(Self {
            name,
            host,
            port,
            unknown_tagged_fields,
        })
    }
}
387
388impl Default for LeaderEndpoint {
389 fn default() -> Self {
390 Self {
391 name: Default::default(),
392 host: Default::default(),
393 port: 0,
394 unknown_tagged_fields: BTreeMap::new(),
395 }
396 }
397}
398
399impl Message for LeaderEndpoint {
400 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
401 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
402}
403
404#[non_exhaustive]
406#[derive(Debug, Clone, PartialEq)]
407pub struct PartitionData {
408 pub partition_index: i32,
412
413 pub leader_id: super::BrokerId,
417
418 pub leader_epoch: i32,
422
423 pub preferred_successors: Vec<i32>,
427
428 pub preferred_candidates: Vec<ReplicaInfo>,
432
433 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
435}
436
437impl PartitionData {
438 pub fn with_partition_index(mut self, value: i32) -> Self {
444 self.partition_index = value;
445 self
446 }
447 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
453 self.leader_id = value;
454 self
455 }
456 pub fn with_leader_epoch(mut self, value: i32) -> Self {
462 self.leader_epoch = value;
463 self
464 }
465 pub fn with_preferred_successors(mut self, value: Vec<i32>) -> Self {
471 self.preferred_successors = value;
472 self
473 }
474 pub fn with_preferred_candidates(mut self, value: Vec<ReplicaInfo>) -> Self {
480 self.preferred_candidates = value;
481 self
482 }
483 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
485 self.unknown_tagged_fields = value;
486 self
487 }
488 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
490 self.unknown_tagged_fields.insert(key, value);
491 self
492 }
493}
494
/// Serialization of [`PartitionData`]; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for PartitionData {
    /// Writes the entry into `buf` using the layout of `version`.
    ///
    /// `preferred_successors` is written only at version 0 and
    /// `preferred_candidates` only at version 1; the other list is skipped
    /// without an error even when it is non-empty.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1`, when a nested encoder fails, or
    /// when the number of unknown tagged fields exceeds `u32::MAX`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Fixed prefix shared by both versions.
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;

        if version == 0 {
            types::Array(types::Int32).encode(buf, &self.preferred_successors)?;
            return Ok(());
        }

        types::CompactArray(types::Struct { version }).encode(buf, &self.preferred_candidates)?;

        // Tagged-field trailer: count first, then the fields themselves.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes the entry occupies for `version`.
    ///
    /// No range check is performed on `version` here: a negative value is
    /// sized as the three fixed `Int32` fields only, and any value of 1 or
    /// more uses the version-1 layout.
    ///
    /// # Errors
    /// Fails when a nested size computation fails or when the number of
    /// unknown tagged fields exceeds `u32::MAX`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition_index)?
            + types::Int32.compute_size(&self.leader_id)?
            + types::Int32.compute_size(&self.leader_epoch)?;

        if version == 0 {
            size += types::Array(types::Int32).compute_size(&self.preferred_successors)?;
        } else if version >= 1 {
            size += types::CompactArray(types::Struct { version })
                .compute_size(&self.preferred_candidates)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
552
/// Deserialization of [`PartitionData`]; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for PartitionData {
    /// Reads an entry laid out for `version` from `buf`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Fixed prefix shared by both versions.
        let partition_index = types::Int32.decode(buf)?;
        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;

        if version == 0 {
            // Version 0 carries the successor list and nothing after it.
            return Ok(Self {
                partition_index,
                leader_id,
                leader_epoch,
                preferred_successors: types::Array(types::Int32).decode(buf)?,
                preferred_candidates: Default::default(),
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let preferred_candidates = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Every tagged field is kept verbatim as (tag, raw bytes).
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw);
        }

        Ok(Self {
            partition_index,
            leader_id,
            leader_epoch,
            preferred_successors: Default::default(),
            preferred_candidates,
            unknown_tagged_fields,
        })
    }
}
592
593impl Default for PartitionData {
594 fn default() -> Self {
595 Self {
596 partition_index: 0,
597 leader_id: (0).into(),
598 leader_epoch: 0,
599 preferred_successors: Default::default(),
600 preferred_candidates: Default::default(),
601 unknown_tagged_fields: BTreeMap::new(),
602 }
603 }
604}
605
606impl Message for PartitionData {
607 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
608 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
609}
610
611#[non_exhaustive]
613#[derive(Debug, Clone, PartialEq)]
614pub struct ReplicaInfo {
615 pub candidate_id: super::BrokerId,
619
620 pub candidate_directory_id: Uuid,
624
625 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
627}
628
629impl ReplicaInfo {
630 pub fn with_candidate_id(mut self, value: super::BrokerId) -> Self {
636 self.candidate_id = value;
637 self
638 }
639 pub fn with_candidate_directory_id(mut self, value: Uuid) -> Self {
645 self.candidate_directory_id = value;
646 self
647 }
648 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
650 self.unknown_tagged_fields = value;
651 self
652 }
653 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
655 self.unknown_tagged_fields.insert(key, value);
656 self
657 }
658}
659
/// Serialization of [`ReplicaInfo`]; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for ReplicaInfo {
    /// Writes the entry into `buf` using the layout of `version`.
    ///
    /// At version 0 nothing is written; the call only verifies that
    /// `candidate_id` is 0 and `candidate_directory_id` is the nil UUID.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1`, when a field is set at
    /// version 0, when a nested encoder fails, or when the number of unknown
    /// tagged fields exceeds `u32::MAX`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version == 0 {
            if self.candidate_id != 0 || self.candidate_directory_id != Uuid::nil() {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        types::Int32.encode(buf, &self.candidate_id)?;
        types::Uuid.encode(buf, &self.candidate_directory_id)?;

        // Tagged-field trailer: count first, then the fields themselves.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes the entry occupies for `version`.
    ///
    /// No range check is performed on `version` here: any value below 1 is
    /// treated like version 0 (size 0 once the default-value check passes).
    ///
    /// # Errors
    /// Fails when a field is set for a `version` below 1, when a nested size
    /// computation fails, or when the number of unknown tagged fields exceeds
    /// `u32::MAX`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            if self.candidate_id != 0 || self.candidate_directory_id != Uuid::nil() {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }

        let mut size = types::Int32.compute_size(&self.candidate_id)?;
        size += types::Uuid.compute_size(&self.candidate_directory_id)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
725
/// Deserialization of [`ReplicaInfo`]; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for ReplicaInfo {
    /// Reads an entry laid out for `version` from `buf`.
    ///
    /// At version 0 nothing is consumed from `buf` and every field takes its
    /// default value.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version == 0 {
            return Ok(Self {
                candidate_id: (0).into(),
                candidate_directory_id: Uuid::nil(),
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let candidate_id = types::Int32.decode(buf)?;
        let candidate_directory_id = types::Uuid.decode(buf)?;

        // Every tagged field is kept verbatim as (tag, raw bytes).
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw);
        }

        Ok(Self {
            candidate_id,
            candidate_directory_id,
            unknown_tagged_fields,
        })
    }
}
759
760impl Default for ReplicaInfo {
761 fn default() -> Self {
762 Self {
763 candidate_id: (0).into(),
764 candidate_directory_id: Uuid::nil(),
765 unknown_tagged_fields: BTreeMap::new(),
766 }
767 }
768}
769
770impl Message for ReplicaInfo {
771 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
772 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
773}
774
775#[non_exhaustive]
777#[derive(Debug, Clone, PartialEq)]
778pub struct TopicData {
779 pub topic_name: super::TopicName,
783
784 pub partitions: Vec<PartitionData>,
788
789 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
791}
792
793impl TopicData {
794 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
800 self.topic_name = value;
801 self
802 }
803 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
809 self.partitions = value;
810 self
811 }
812 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
814 self.unknown_tagged_fields = value;
815 self
816 }
817 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
819 self.unknown_tagged_fields.insert(key, value);
820 self
821 }
822}
823
/// Serialization of [`TopicData`]; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for TopicData {
    /// Writes the entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1`, when a nested encoder fails, or
    /// when the number of unknown tagged fields exceeds `u32::MAX`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version == 0 {
            // Version 0: non-compact encodings and no tagged fields.
            types::String.encode(buf, &self.topic_name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.topic_name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        // Tagged-field trailer: count first, then the fields themselves.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes the entry occupies for `version`.
    ///
    /// No range check is performed on `version` here: any value below 1 is
    /// sized with the version-0 layout, any other value with the version-1 layout.
    ///
    /// # Errors
    /// Fails when a nested size computation fails or when the number of
    /// unknown tagged fields exceeds `u32::MAX`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            let legacy_size = types::String.compute_size(&self.topic_name)?
                + types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(legacy_size);
        }

        let mut size = types::CompactString.compute_size(&self.topic_name)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
882
/// Deserialization of [`TopicData`]; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for TopicData {
    /// Reads an entry laid out for `version` from `buf`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=1` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version == 0 {
            // Version 0 carries no tagged fields.
            return Ok(Self {
                topic_name: types::String.decode(buf)?,
                partitions: types::Array(types::Struct { version }).decode(buf)?,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let topic_name = types::CompactString.decode(buf)?;
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Every tagged field is kept verbatim as (tag, raw bytes).
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw);
        }

        Ok(Self {
            topic_name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
916
917impl Default for TopicData {
918 fn default() -> Self {
919 Self {
920 topic_name: Default::default(),
921 partitions: Default::default(),
922 unknown_tagged_fields: BTreeMap::new(),
923 }
924 }
925}
926
927impl Message for TopicData {
928 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
929 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
930}
931
932impl HeaderVersion for EndQuorumEpochRequest {
933 fn header_version(version: i16) -> i16 {
934 if version >= 1 {
935 2
936 } else {
937 1
938 }
939 }
940}