1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct MetadataResponse {
24 pub throttle_time_ms: i32,
28
29 pub brokers: Vec<MetadataResponseBroker>,
33
34 pub cluster_id: Option<StrBytes>,
38
39 pub controller_id: super::BrokerId,
43
44 pub topics: Vec<MetadataResponseTopic>,
48
49 pub cluster_authorized_operations: i32,
53
54 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl MetadataResponse {
59 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
65 self.throttle_time_ms = value;
66 self
67 }
68 pub fn with_brokers(mut self, value: Vec<MetadataResponseBroker>) -> Self {
74 self.brokers = value;
75 self
76 }
77 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
83 self.cluster_id = value;
84 self
85 }
86 pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
92 self.controller_id = value;
93 self
94 }
95 pub fn with_topics(mut self, value: Vec<MetadataResponseTopic>) -> Self {
101 self.topics = value;
102 self
103 }
104 pub fn with_cluster_authorized_operations(mut self, value: i32) -> Self {
110 self.cluster_authorized_operations = value;
111 self
112 }
113 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115 self.unknown_tagged_fields = value;
116 self
117 }
118 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120 self.unknown_tagged_fields.insert(key, value);
121 self
122 }
123}
124
125#[cfg(feature = "broker")]
126impl Encodable for MetadataResponse {
127 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
128 if version >= 3 {
129 types::Int32.encode(buf, &self.throttle_time_ms)?;
130 }
131 if version >= 9 {
132 types::CompactArray(types::Struct { version }).encode(buf, &self.brokers)?;
133 } else {
134 types::Array(types::Struct { version }).encode(buf, &self.brokers)?;
135 }
136 if version >= 2 {
137 if version >= 9 {
138 types::CompactString.encode(buf, &self.cluster_id)?;
139 } else {
140 types::String.encode(buf, &self.cluster_id)?;
141 }
142 }
143 if version >= 1 {
144 types::Int32.encode(buf, &self.controller_id)?;
145 }
146 if version >= 9 {
147 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
148 } else {
149 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
150 }
151 if version >= 8 && version <= 10 {
152 types::Int32.encode(buf, &self.cluster_authorized_operations)?;
153 } else {
154 if self.cluster_authorized_operations != -2147483648 {
155 bail!("A field is set that is not available on the selected protocol version");
156 }
157 }
158 if version >= 9 {
159 let num_tagged_fields = self.unknown_tagged_fields.len();
160 if num_tagged_fields > std::u32::MAX as usize {
161 bail!(
162 "Too many tagged fields to encode ({} fields)",
163 num_tagged_fields
164 );
165 }
166 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
167
168 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
169 }
170 Ok(())
171 }
172 fn compute_size(&self, version: i16) -> Result<usize> {
173 let mut total_size = 0;
174 if version >= 3 {
175 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
176 }
177 if version >= 9 {
178 total_size +=
179 types::CompactArray(types::Struct { version }).compute_size(&self.brokers)?;
180 } else {
181 total_size += types::Array(types::Struct { version }).compute_size(&self.brokers)?;
182 }
183 if version >= 2 {
184 if version >= 9 {
185 total_size += types::CompactString.compute_size(&self.cluster_id)?;
186 } else {
187 total_size += types::String.compute_size(&self.cluster_id)?;
188 }
189 }
190 if version >= 1 {
191 total_size += types::Int32.compute_size(&self.controller_id)?;
192 }
193 if version >= 9 {
194 total_size +=
195 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
196 } else {
197 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
198 }
199 if version >= 8 && version <= 10 {
200 total_size += types::Int32.compute_size(&self.cluster_authorized_operations)?;
201 } else {
202 if self.cluster_authorized_operations != -2147483648 {
203 bail!("A field is set that is not available on the selected protocol version");
204 }
205 }
206 if version >= 9 {
207 let num_tagged_fields = self.unknown_tagged_fields.len();
208 if num_tagged_fields > std::u32::MAX as usize {
209 bail!(
210 "Too many tagged fields to encode ({} fields)",
211 num_tagged_fields
212 );
213 }
214 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
215
216 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
217 }
218 Ok(total_size)
219 }
220}
221
222#[cfg(feature = "client")]
223impl Decodable for MetadataResponse {
224 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
225 let throttle_time_ms = if version >= 3 {
226 types::Int32.decode(buf)?
227 } else {
228 0
229 };
230 let brokers = if version >= 9 {
231 types::CompactArray(types::Struct { version }).decode(buf)?
232 } else {
233 types::Array(types::Struct { version }).decode(buf)?
234 };
235 let cluster_id = if version >= 2 {
236 if version >= 9 {
237 types::CompactString.decode(buf)?
238 } else {
239 types::String.decode(buf)?
240 }
241 } else {
242 None
243 };
244 let controller_id = if version >= 1 {
245 types::Int32.decode(buf)?
246 } else {
247 (-1).into()
248 };
249 let topics = if version >= 9 {
250 types::CompactArray(types::Struct { version }).decode(buf)?
251 } else {
252 types::Array(types::Struct { version }).decode(buf)?
253 };
254 let cluster_authorized_operations = if version >= 8 && version <= 10 {
255 types::Int32.decode(buf)?
256 } else {
257 -2147483648
258 };
259 let mut unknown_tagged_fields = BTreeMap::new();
260 if version >= 9 {
261 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
262 for _ in 0..num_tagged_fields {
263 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
264 let size: u32 = types::UnsignedVarInt.decode(buf)?;
265 let unknown_value = buf.try_get_bytes(size as usize)?;
266 unknown_tagged_fields.insert(tag as i32, unknown_value);
267 }
268 }
269 Ok(Self {
270 throttle_time_ms,
271 brokers,
272 cluster_id,
273 controller_id,
274 topics,
275 cluster_authorized_operations,
276 unknown_tagged_fields,
277 })
278 }
279}
280
281impl Default for MetadataResponse {
282 fn default() -> Self {
283 Self {
284 throttle_time_ms: 0,
285 brokers: Default::default(),
286 cluster_id: None,
287 controller_id: (-1).into(),
288 topics: Default::default(),
289 cluster_authorized_operations: -2147483648,
290 unknown_tagged_fields: BTreeMap::new(),
291 }
292 }
293}
294
295impl Message for MetadataResponse {
296 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
297 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
298}
299
300#[non_exhaustive]
302#[derive(Debug, Clone, PartialEq)]
303pub struct MetadataResponseBroker {
304 pub node_id: super::BrokerId,
308
309 pub host: StrBytes,
313
314 pub port: i32,
318
319 pub rack: Option<StrBytes>,
323
324 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
326}
327
328impl MetadataResponseBroker {
329 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
335 self.node_id = value;
336 self
337 }
338 pub fn with_host(mut self, value: StrBytes) -> Self {
344 self.host = value;
345 self
346 }
347 pub fn with_port(mut self, value: i32) -> Self {
353 self.port = value;
354 self
355 }
356 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
362 self.rack = value;
363 self
364 }
365 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
367 self.unknown_tagged_fields = value;
368 self
369 }
370 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
372 self.unknown_tagged_fields.insert(key, value);
373 self
374 }
375}
376
377#[cfg(feature = "broker")]
378impl Encodable for MetadataResponseBroker {
379 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
380 types::Int32.encode(buf, &self.node_id)?;
381 if version >= 9 {
382 types::CompactString.encode(buf, &self.host)?;
383 } else {
384 types::String.encode(buf, &self.host)?;
385 }
386 types::Int32.encode(buf, &self.port)?;
387 if version >= 1 {
388 if version >= 9 {
389 types::CompactString.encode(buf, &self.rack)?;
390 } else {
391 types::String.encode(buf, &self.rack)?;
392 }
393 }
394 if version >= 9 {
395 let num_tagged_fields = self.unknown_tagged_fields.len();
396 if num_tagged_fields > std::u32::MAX as usize {
397 bail!(
398 "Too many tagged fields to encode ({} fields)",
399 num_tagged_fields
400 );
401 }
402 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
403
404 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
405 }
406 Ok(())
407 }
408 fn compute_size(&self, version: i16) -> Result<usize> {
409 let mut total_size = 0;
410 total_size += types::Int32.compute_size(&self.node_id)?;
411 if version >= 9 {
412 total_size += types::CompactString.compute_size(&self.host)?;
413 } else {
414 total_size += types::String.compute_size(&self.host)?;
415 }
416 total_size += types::Int32.compute_size(&self.port)?;
417 if version >= 1 {
418 if version >= 9 {
419 total_size += types::CompactString.compute_size(&self.rack)?;
420 } else {
421 total_size += types::String.compute_size(&self.rack)?;
422 }
423 }
424 if version >= 9 {
425 let num_tagged_fields = self.unknown_tagged_fields.len();
426 if num_tagged_fields > std::u32::MAX as usize {
427 bail!(
428 "Too many tagged fields to encode ({} fields)",
429 num_tagged_fields
430 );
431 }
432 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
433
434 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
435 }
436 Ok(total_size)
437 }
438}
439
440#[cfg(feature = "client")]
441impl Decodable for MetadataResponseBroker {
442 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
443 let node_id = types::Int32.decode(buf)?;
444 let host = if version >= 9 {
445 types::CompactString.decode(buf)?
446 } else {
447 types::String.decode(buf)?
448 };
449 let port = types::Int32.decode(buf)?;
450 let rack = if version >= 1 {
451 if version >= 9 {
452 types::CompactString.decode(buf)?
453 } else {
454 types::String.decode(buf)?
455 }
456 } else {
457 None
458 };
459 let mut unknown_tagged_fields = BTreeMap::new();
460 if version >= 9 {
461 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
462 for _ in 0..num_tagged_fields {
463 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
464 let size: u32 = types::UnsignedVarInt.decode(buf)?;
465 let unknown_value = buf.try_get_bytes(size as usize)?;
466 unknown_tagged_fields.insert(tag as i32, unknown_value);
467 }
468 }
469 Ok(Self {
470 node_id,
471 host,
472 port,
473 rack,
474 unknown_tagged_fields,
475 })
476 }
477}
478
479impl Default for MetadataResponseBroker {
480 fn default() -> Self {
481 Self {
482 node_id: (0).into(),
483 host: Default::default(),
484 port: 0,
485 rack: None,
486 unknown_tagged_fields: BTreeMap::new(),
487 }
488 }
489}
490
491impl Message for MetadataResponseBroker {
492 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
493 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
494}
495
496#[non_exhaustive]
498#[derive(Debug, Clone, PartialEq)]
499pub struct MetadataResponsePartition {
500 pub error_code: i16,
504
505 pub partition_index: i32,
509
510 pub leader_id: super::BrokerId,
514
515 pub leader_epoch: i32,
519
520 pub replica_nodes: Vec<super::BrokerId>,
524
525 pub isr_nodes: Vec<super::BrokerId>,
529
530 pub offline_replicas: Vec<super::BrokerId>,
534
535 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
537}
538
539impl MetadataResponsePartition {
540 pub fn with_error_code(mut self, value: i16) -> Self {
546 self.error_code = value;
547 self
548 }
549 pub fn with_partition_index(mut self, value: i32) -> Self {
555 self.partition_index = value;
556 self
557 }
558 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
564 self.leader_id = value;
565 self
566 }
567 pub fn with_leader_epoch(mut self, value: i32) -> Self {
573 self.leader_epoch = value;
574 self
575 }
576 pub fn with_replica_nodes(mut self, value: Vec<super::BrokerId>) -> Self {
582 self.replica_nodes = value;
583 self
584 }
585 pub fn with_isr_nodes(mut self, value: Vec<super::BrokerId>) -> Self {
591 self.isr_nodes = value;
592 self
593 }
594 pub fn with_offline_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
600 self.offline_replicas = value;
601 self
602 }
603 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
605 self.unknown_tagged_fields = value;
606 self
607 }
608 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
610 self.unknown_tagged_fields.insert(key, value);
611 self
612 }
613}
614
615#[cfg(feature = "broker")]
616impl Encodable for MetadataResponsePartition {
617 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
618 types::Int16.encode(buf, &self.error_code)?;
619 types::Int32.encode(buf, &self.partition_index)?;
620 types::Int32.encode(buf, &self.leader_id)?;
621 if version >= 7 {
622 types::Int32.encode(buf, &self.leader_epoch)?;
623 }
624 if version >= 9 {
625 types::CompactArray(types::Int32).encode(buf, &self.replica_nodes)?;
626 } else {
627 types::Array(types::Int32).encode(buf, &self.replica_nodes)?;
628 }
629 if version >= 9 {
630 types::CompactArray(types::Int32).encode(buf, &self.isr_nodes)?;
631 } else {
632 types::Array(types::Int32).encode(buf, &self.isr_nodes)?;
633 }
634 if version >= 5 {
635 if version >= 9 {
636 types::CompactArray(types::Int32).encode(buf, &self.offline_replicas)?;
637 } else {
638 types::Array(types::Int32).encode(buf, &self.offline_replicas)?;
639 }
640 }
641 if version >= 9 {
642 let num_tagged_fields = self.unknown_tagged_fields.len();
643 if num_tagged_fields > std::u32::MAX as usize {
644 bail!(
645 "Too many tagged fields to encode ({} fields)",
646 num_tagged_fields
647 );
648 }
649 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
650
651 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
652 }
653 Ok(())
654 }
655 fn compute_size(&self, version: i16) -> Result<usize> {
656 let mut total_size = 0;
657 total_size += types::Int16.compute_size(&self.error_code)?;
658 total_size += types::Int32.compute_size(&self.partition_index)?;
659 total_size += types::Int32.compute_size(&self.leader_id)?;
660 if version >= 7 {
661 total_size += types::Int32.compute_size(&self.leader_epoch)?;
662 }
663 if version >= 9 {
664 total_size += types::CompactArray(types::Int32).compute_size(&self.replica_nodes)?;
665 } else {
666 total_size += types::Array(types::Int32).compute_size(&self.replica_nodes)?;
667 }
668 if version >= 9 {
669 total_size += types::CompactArray(types::Int32).compute_size(&self.isr_nodes)?;
670 } else {
671 total_size += types::Array(types::Int32).compute_size(&self.isr_nodes)?;
672 }
673 if version >= 5 {
674 if version >= 9 {
675 total_size +=
676 types::CompactArray(types::Int32).compute_size(&self.offline_replicas)?;
677 } else {
678 total_size += types::Array(types::Int32).compute_size(&self.offline_replicas)?;
679 }
680 }
681 if version >= 9 {
682 let num_tagged_fields = self.unknown_tagged_fields.len();
683 if num_tagged_fields > std::u32::MAX as usize {
684 bail!(
685 "Too many tagged fields to encode ({} fields)",
686 num_tagged_fields
687 );
688 }
689 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
690
691 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
692 }
693 Ok(total_size)
694 }
695}
696
697#[cfg(feature = "client")]
698impl Decodable for MetadataResponsePartition {
699 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
700 let error_code = types::Int16.decode(buf)?;
701 let partition_index = types::Int32.decode(buf)?;
702 let leader_id = types::Int32.decode(buf)?;
703 let leader_epoch = if version >= 7 {
704 types::Int32.decode(buf)?
705 } else {
706 -1
707 };
708 let replica_nodes = if version >= 9 {
709 types::CompactArray(types::Int32).decode(buf)?
710 } else {
711 types::Array(types::Int32).decode(buf)?
712 };
713 let isr_nodes = if version >= 9 {
714 types::CompactArray(types::Int32).decode(buf)?
715 } else {
716 types::Array(types::Int32).decode(buf)?
717 };
718 let offline_replicas = if version >= 5 {
719 if version >= 9 {
720 types::CompactArray(types::Int32).decode(buf)?
721 } else {
722 types::Array(types::Int32).decode(buf)?
723 }
724 } else {
725 Default::default()
726 };
727 let mut unknown_tagged_fields = BTreeMap::new();
728 if version >= 9 {
729 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
730 for _ in 0..num_tagged_fields {
731 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
732 let size: u32 = types::UnsignedVarInt.decode(buf)?;
733 let unknown_value = buf.try_get_bytes(size as usize)?;
734 unknown_tagged_fields.insert(tag as i32, unknown_value);
735 }
736 }
737 Ok(Self {
738 error_code,
739 partition_index,
740 leader_id,
741 leader_epoch,
742 replica_nodes,
743 isr_nodes,
744 offline_replicas,
745 unknown_tagged_fields,
746 })
747 }
748}
749
750impl Default for MetadataResponsePartition {
751 fn default() -> Self {
752 Self {
753 error_code: 0,
754 partition_index: 0,
755 leader_id: (0).into(),
756 leader_epoch: -1,
757 replica_nodes: Default::default(),
758 isr_nodes: Default::default(),
759 offline_replicas: Default::default(),
760 unknown_tagged_fields: BTreeMap::new(),
761 }
762 }
763}
764
765impl Message for MetadataResponsePartition {
766 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
767 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
768}
769
770#[non_exhaustive]
772#[derive(Debug, Clone, PartialEq)]
773pub struct MetadataResponseTopic {
774 pub error_code: i16,
778
779 pub name: Option<super::TopicName>,
783
784 pub topic_id: Uuid,
788
789 pub is_internal: bool,
793
794 pub partitions: Vec<MetadataResponsePartition>,
798
799 pub topic_authorized_operations: i32,
803
804 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
806}
807
808impl MetadataResponseTopic {
809 pub fn with_error_code(mut self, value: i16) -> Self {
815 self.error_code = value;
816 self
817 }
818 pub fn with_name(mut self, value: Option<super::TopicName>) -> Self {
824 self.name = value;
825 self
826 }
827 pub fn with_topic_id(mut self, value: Uuid) -> Self {
833 self.topic_id = value;
834 self
835 }
836 pub fn with_is_internal(mut self, value: bool) -> Self {
842 self.is_internal = value;
843 self
844 }
845 pub fn with_partitions(mut self, value: Vec<MetadataResponsePartition>) -> Self {
851 self.partitions = value;
852 self
853 }
854 pub fn with_topic_authorized_operations(mut self, value: i32) -> Self {
860 self.topic_authorized_operations = value;
861 self
862 }
863 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
865 self.unknown_tagged_fields = value;
866 self
867 }
868 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
870 self.unknown_tagged_fields.insert(key, value);
871 self
872 }
873}
874
875#[cfg(feature = "broker")]
876impl Encodable for MetadataResponseTopic {
877 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
878 types::Int16.encode(buf, &self.error_code)?;
879 if version >= 9 {
880 types::CompactString.encode(buf, &self.name)?;
881 } else {
882 types::String.encode(buf, &self.name)?;
883 }
884 if version >= 10 {
885 types::Uuid.encode(buf, &self.topic_id)?;
886 }
887 if version >= 1 {
888 types::Boolean.encode(buf, &self.is_internal)?;
889 }
890 if version >= 9 {
891 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
892 } else {
893 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
894 }
895 if version >= 8 {
896 types::Int32.encode(buf, &self.topic_authorized_operations)?;
897 } else {
898 if self.topic_authorized_operations != -2147483648 {
899 bail!("A field is set that is not available on the selected protocol version");
900 }
901 }
902 if version >= 9 {
903 let num_tagged_fields = self.unknown_tagged_fields.len();
904 if num_tagged_fields > std::u32::MAX as usize {
905 bail!(
906 "Too many tagged fields to encode ({} fields)",
907 num_tagged_fields
908 );
909 }
910 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
911
912 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
913 }
914 Ok(())
915 }
916 fn compute_size(&self, version: i16) -> Result<usize> {
917 let mut total_size = 0;
918 total_size += types::Int16.compute_size(&self.error_code)?;
919 if version >= 9 {
920 total_size += types::CompactString.compute_size(&self.name)?;
921 } else {
922 total_size += types::String.compute_size(&self.name)?;
923 }
924 if version >= 10 {
925 total_size += types::Uuid.compute_size(&self.topic_id)?;
926 }
927 if version >= 1 {
928 total_size += types::Boolean.compute_size(&self.is_internal)?;
929 }
930 if version >= 9 {
931 total_size +=
932 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
933 } else {
934 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
935 }
936 if version >= 8 {
937 total_size += types::Int32.compute_size(&self.topic_authorized_operations)?;
938 } else {
939 if self.topic_authorized_operations != -2147483648 {
940 bail!("A field is set that is not available on the selected protocol version");
941 }
942 }
943 if version >= 9 {
944 let num_tagged_fields = self.unknown_tagged_fields.len();
945 if num_tagged_fields > std::u32::MAX as usize {
946 bail!(
947 "Too many tagged fields to encode ({} fields)",
948 num_tagged_fields
949 );
950 }
951 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
952
953 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
954 }
955 Ok(total_size)
956 }
957}
958
959#[cfg(feature = "client")]
960impl Decodable for MetadataResponseTopic {
961 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
962 let error_code = types::Int16.decode(buf)?;
963 let name = if version >= 9 {
964 types::CompactString.decode(buf)?
965 } else {
966 types::String.decode(buf)?
967 };
968 let topic_id = if version >= 10 {
969 types::Uuid.decode(buf)?
970 } else {
971 Uuid::nil()
972 };
973 let is_internal = if version >= 1 {
974 types::Boolean.decode(buf)?
975 } else {
976 false
977 };
978 let partitions = if version >= 9 {
979 types::CompactArray(types::Struct { version }).decode(buf)?
980 } else {
981 types::Array(types::Struct { version }).decode(buf)?
982 };
983 let topic_authorized_operations = if version >= 8 {
984 types::Int32.decode(buf)?
985 } else {
986 -2147483648
987 };
988 let mut unknown_tagged_fields = BTreeMap::new();
989 if version >= 9 {
990 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
991 for _ in 0..num_tagged_fields {
992 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
993 let size: u32 = types::UnsignedVarInt.decode(buf)?;
994 let unknown_value = buf.try_get_bytes(size as usize)?;
995 unknown_tagged_fields.insert(tag as i32, unknown_value);
996 }
997 }
998 Ok(Self {
999 error_code,
1000 name,
1001 topic_id,
1002 is_internal,
1003 partitions,
1004 topic_authorized_operations,
1005 unknown_tagged_fields,
1006 })
1007 }
1008}
1009
1010impl Default for MetadataResponseTopic {
1011 fn default() -> Self {
1012 Self {
1013 error_code: 0,
1014 name: Some(Default::default()),
1015 topic_id: Uuid::nil(),
1016 is_internal: false,
1017 partitions: Default::default(),
1018 topic_authorized_operations: -2147483648,
1019 unknown_tagged_fields: BTreeMap::new(),
1020 }
1021 }
1022}
1023
1024impl Message for MetadataResponseTopic {
1025 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
1026 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1027}
1028
1029impl HeaderVersion for MetadataResponse {
1030 fn header_version(version: i16) -> i16 {
1031 if version >= 9 {
1032 1
1033 } else {
1034 0
1035 }
1036 }
1037}