1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct MetadataResponse {
24 pub throttle_time_ms: i32,
28
29 pub brokers: Vec<MetadataResponseBroker>,
33
34 pub cluster_id: Option<StrBytes>,
38
39 pub controller_id: super::BrokerId,
43
44 pub topics: Vec<MetadataResponseTopic>,
48
49 pub cluster_authorized_operations: i32,
53
54 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl MetadataResponse {
59 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
65 self.throttle_time_ms = value;
66 self
67 }
68 pub fn with_brokers(mut self, value: Vec<MetadataResponseBroker>) -> Self {
74 self.brokers = value;
75 self
76 }
77 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
83 self.cluster_id = value;
84 self
85 }
86 pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
92 self.controller_id = value;
93 self
94 }
95 pub fn with_topics(mut self, value: Vec<MetadataResponseTopic>) -> Self {
101 self.topics = value;
102 self
103 }
104 pub fn with_cluster_authorized_operations(mut self, value: i32) -> Self {
110 self.cluster_authorized_operations = value;
111 self
112 }
113 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115 self.unknown_tagged_fields = value;
116 self
117 }
118 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120 self.unknown_tagged_fields.insert(key, value);
121 self
122 }
123}
124
#[cfg(feature = "broker")]
impl Encodable for MetadataResponse {
    /// Writes this response to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12, when
    /// `cluster_authorized_operations` holds a non-default value for a version
    /// that does not carry it (anything outside 8-10), when there are more
    /// unknown tagged fields than fit in a `u32`, or when any nested encoder
    /// fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the compact encodings and append tagged fields.
        let flexible = version >= 9;

        if version >= 3 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }
        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.brokers)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.brokers)?;
        }
        if flexible {
            types::CompactString.encode(buf, &self.cluster_id)?;
        } else if version >= 2 {
            types::String.encode(buf, &self.cluster_id)?;
        }
        if version >= 1 {
            types::Int32.encode(buf, &self.controller_id)?;
        }
        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        if (8..=10).contains(&version) {
            types::Int32.encode(buf, &self.cluster_authorized_operations)?;
        } else if self.cluster_authorized_operations != i32::MIN {
            // The field cannot be represented in this version, so refuse to
            // drop a value the caller explicitly set.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`, including an unsupported
    /// `version`, so a size is never reported for a message that cannot be
    /// encoded.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Mirror `encode`: an out-of-range version has no defined layout.
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 9;
        let mut total_size = 0;

        if version >= 3 {
            total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }
        if flexible {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.brokers)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.brokers)?;
        }
        if flexible {
            total_size += types::CompactString.compute_size(&self.cluster_id)?;
        } else if version >= 2 {
            total_size += types::String.compute_size(&self.cluster_id)?;
        }
        if version >= 1 {
            total_size += types::Int32.compute_size(&self.controller_id)?;
        }
        if flexible {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }
        if (8..=10).contains(&version) {
            total_size += types::Int32.compute_size(&self.cluster_authorized_operations)?;
        } else if self.cluster_authorized_operations != i32::MIN {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
224
#[cfg(feature = "client")]
impl Decodable for MetadataResponse {
    /// Reads a response laid out for `version` from `buf`.
    ///
    /// Fields that the given version does not carry are filled with their
    /// defaults.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ use the compact encodings and carry tagged fields.
        let flexible = version >= 9;

        let throttle_time_ms = if version < 3 {
            0
        } else {
            types::Int32.decode(buf)?
        };
        let brokers = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let cluster_id = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 2 {
            types::String.decode(buf)?
        } else {
            None
        };
        let controller_id = if version < 1 {
            (-1).into()
        } else {
            types::Int32.decode(buf)?
        };
        let topics = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let cluster_authorized_operations = if (8..=10).contains(&version) {
            types::Int32.decode(buf)?
        } else {
            i32::MIN
        };

        // Tagged fields are kept as raw bytes keyed by tag.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            throttle_time_ms,
            brokers,
            cluster_id,
            controller_id,
            topics,
            cluster_authorized_operations,
            unknown_tagged_fields,
        })
    }
}
286
287impl Default for MetadataResponse {
288 fn default() -> Self {
289 Self {
290 throttle_time_ms: 0,
291 brokers: Default::default(),
292 cluster_id: None,
293 controller_id: (-1).into(),
294 topics: Default::default(),
295 cluster_authorized_operations: -2147483648,
296 unknown_tagged_fields: BTreeMap::new(),
297 }
298 }
299}
300
301impl Message for MetadataResponse {
302 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
303 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
304}
305
306#[non_exhaustive]
308#[derive(Debug, Clone, PartialEq)]
309pub struct MetadataResponseBroker {
310 pub node_id: super::BrokerId,
314
315 pub host: StrBytes,
319
320 pub port: i32,
324
325 pub rack: Option<StrBytes>,
329
330 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
332}
333
334impl MetadataResponseBroker {
335 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
341 self.node_id = value;
342 self
343 }
344 pub fn with_host(mut self, value: StrBytes) -> Self {
350 self.host = value;
351 self
352 }
353 pub fn with_port(mut self, value: i32) -> Self {
359 self.port = value;
360 self
361 }
362 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
368 self.rack = value;
369 self
370 }
371 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
373 self.unknown_tagged_fields = value;
374 self
375 }
376 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
378 self.unknown_tagged_fields.insert(key, value);
379 self
380 }
381}
382
#[cfg(feature = "broker")]
impl Encodable for MetadataResponseBroker {
    /// Writes this broker entry to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12, when there are more unknown
    /// tagged fields than fit in a `u32`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the compact encodings and append tagged fields.
        let flexible = version >= 9;

        types::Int32.encode(buf, &self.node_id)?;
        if flexible {
            types::CompactString.encode(buf, &self.host)?;
        } else {
            types::String.encode(buf, &self.host)?;
        }
        types::Int32.encode(buf, &self.port)?;
        if flexible {
            types::CompactString.encode(buf, &self.rack)?;
        } else if version >= 1 {
            types::String.encode(buf, &self.rack)?;
        }
        if flexible {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`, including an unsupported
    /// `version`, so a size is never reported for a message that cannot be
    /// encoded.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Mirror `encode`: an out-of-range version has no defined layout.
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 9;
        let mut total_size = 0;

        total_size += types::Int32.compute_size(&self.node_id)?;
        if flexible {
            total_size += types::CompactString.compute_size(&self.host)?;
        } else {
            total_size += types::String.compute_size(&self.host)?;
        }
        total_size += types::Int32.compute_size(&self.port)?;
        if flexible {
            total_size += types::CompactString.compute_size(&self.rack)?;
        } else if version >= 1 {
            total_size += types::String.compute_size(&self.rack)?;
        }
        if flexible {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
448
#[cfg(feature = "client")]
impl Decodable for MetadataResponseBroker {
    /// Reads a broker entry laid out for `version` from `buf`.
    ///
    /// `rack` is `None` when the version does not carry it.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ use the compact encodings and carry tagged fields.
        let flexible = version >= 9;

        let node_id = types::Int32.decode(buf)?;
        let host = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let port = types::Int32.decode(buf)?;
        let rack = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 1 {
            types::String.decode(buf)?
        } else {
            None
        };

        // Tagged fields are kept as raw bytes keyed by tag.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            node_id,
            host,
            port,
            rack,
            unknown_tagged_fields,
        })
    }
}
490
491impl Default for MetadataResponseBroker {
492 fn default() -> Self {
493 Self {
494 node_id: (0).into(),
495 host: Default::default(),
496 port: 0,
497 rack: None,
498 unknown_tagged_fields: BTreeMap::new(),
499 }
500 }
501}
502
503impl Message for MetadataResponseBroker {
504 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
505 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
506}
507
508#[non_exhaustive]
510#[derive(Debug, Clone, PartialEq)]
511pub struct MetadataResponsePartition {
512 pub error_code: i16,
516
517 pub partition_index: i32,
521
522 pub leader_id: super::BrokerId,
526
527 pub leader_epoch: i32,
531
532 pub replica_nodes: Vec<super::BrokerId>,
536
537 pub isr_nodes: Vec<super::BrokerId>,
541
542 pub offline_replicas: Vec<super::BrokerId>,
546
547 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
549}
550
551impl MetadataResponsePartition {
552 pub fn with_error_code(mut self, value: i16) -> Self {
558 self.error_code = value;
559 self
560 }
561 pub fn with_partition_index(mut self, value: i32) -> Self {
567 self.partition_index = value;
568 self
569 }
570 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
576 self.leader_id = value;
577 self
578 }
579 pub fn with_leader_epoch(mut self, value: i32) -> Self {
585 self.leader_epoch = value;
586 self
587 }
588 pub fn with_replica_nodes(mut self, value: Vec<super::BrokerId>) -> Self {
594 self.replica_nodes = value;
595 self
596 }
597 pub fn with_isr_nodes(mut self, value: Vec<super::BrokerId>) -> Self {
603 self.isr_nodes = value;
604 self
605 }
606 pub fn with_offline_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
612 self.offline_replicas = value;
613 self
614 }
615 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
617 self.unknown_tagged_fields = value;
618 self
619 }
620 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
622 self.unknown_tagged_fields.insert(key, value);
623 self
624 }
625}
626
#[cfg(feature = "broker")]
impl Encodable for MetadataResponsePartition {
    /// Writes this partition entry to `buf` using the wire layout of `version`.
    ///
    /// `leader_epoch` (versions 7+) and `offline_replicas` (versions 5+) are
    /// simply left out for versions that do not carry them.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12, when there are more unknown
    /// tagged fields than fit in a `u32`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the compact encodings and append tagged fields.
        let flexible = version >= 9;

        types::Int16.encode(buf, &self.error_code)?;
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.leader_id)?;
        if version >= 7 {
            types::Int32.encode(buf, &self.leader_epoch)?;
        }
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.replica_nodes)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.replica_nodes)?;
        }
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.isr_nodes)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.isr_nodes)?;
        }
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.offline_replicas)?;
        } else if version >= 5 {
            types::Array(types::Int32).encode(buf, &self.offline_replicas)?;
        }
        if flexible {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`, including an unsupported
    /// `version`, so a size is never reported for a message that cannot be
    /// encoded.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Mirror `encode`: an out-of-range version has no defined layout.
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 9;
        let mut total_size = 0;

        total_size += types::Int16.compute_size(&self.error_code)?;
        total_size += types::Int32.compute_size(&self.partition_index)?;
        total_size += types::Int32.compute_size(&self.leader_id)?;
        if version >= 7 {
            total_size += types::Int32.compute_size(&self.leader_epoch)?;
        }
        if flexible {
            total_size += types::CompactArray(types::Int32).compute_size(&self.replica_nodes)?;
        } else {
            total_size += types::Array(types::Int32).compute_size(&self.replica_nodes)?;
        }
        if flexible {
            total_size += types::CompactArray(types::Int32).compute_size(&self.isr_nodes)?;
        } else {
            total_size += types::Array(types::Int32).compute_size(&self.isr_nodes)?;
        }
        if flexible {
            total_size +=
                types::CompactArray(types::Int32).compute_size(&self.offline_replicas)?;
        } else if version >= 5 {
            total_size += types::Array(types::Int32).compute_size(&self.offline_replicas)?;
        }
        if flexible {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
711
#[cfg(feature = "client")]
impl Decodable for MetadataResponsePartition {
    /// Reads a partition entry laid out for `version` from `buf`.
    ///
    /// Fields that the given version does not carry are filled with their
    /// defaults (`leader_epoch` = `-1`, `offline_replicas` = empty).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ use the compact encodings and carry tagged fields.
        let flexible = version >= 9;

        let error_code = types::Int16.decode(buf)?;
        let partition_index = types::Int32.decode(buf)?;
        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = if version < 7 {
            -1
        } else {
            types::Int32.decode(buf)?
        };
        let replica_nodes = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let isr_nodes = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let offline_replicas = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else if version >= 5 {
            types::Array(types::Int32).decode(buf)?
        } else {
            Vec::new()
        };

        // Tagged fields are kept as raw bytes keyed by tag.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            error_code,
            partition_index,
            leader_id,
            leader_epoch,
            replica_nodes,
            isr_nodes,
            offline_replicas,
            unknown_tagged_fields,
        })
    }
}
767
768impl Default for MetadataResponsePartition {
769 fn default() -> Self {
770 Self {
771 error_code: 0,
772 partition_index: 0,
773 leader_id: (0).into(),
774 leader_epoch: -1,
775 replica_nodes: Default::default(),
776 isr_nodes: Default::default(),
777 offline_replicas: Default::default(),
778 unknown_tagged_fields: BTreeMap::new(),
779 }
780 }
781}
782
783impl Message for MetadataResponsePartition {
784 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
785 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
786}
787
788#[non_exhaustive]
790#[derive(Debug, Clone, PartialEq)]
791pub struct MetadataResponseTopic {
792 pub error_code: i16,
796
797 pub name: Option<super::TopicName>,
801
802 pub topic_id: Uuid,
806
807 pub is_internal: bool,
811
812 pub partitions: Vec<MetadataResponsePartition>,
816
817 pub topic_authorized_operations: i32,
821
822 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
824}
825
826impl MetadataResponseTopic {
827 pub fn with_error_code(mut self, value: i16) -> Self {
833 self.error_code = value;
834 self
835 }
836 pub fn with_name(mut self, value: Option<super::TopicName>) -> Self {
842 self.name = value;
843 self
844 }
845 pub fn with_topic_id(mut self, value: Uuid) -> Self {
851 self.topic_id = value;
852 self
853 }
854 pub fn with_is_internal(mut self, value: bool) -> Self {
860 self.is_internal = value;
861 self
862 }
863 pub fn with_partitions(mut self, value: Vec<MetadataResponsePartition>) -> Self {
869 self.partitions = value;
870 self
871 }
872 pub fn with_topic_authorized_operations(mut self, value: i32) -> Self {
878 self.topic_authorized_operations = value;
879 self
880 }
881 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
883 self.unknown_tagged_fields = value;
884 self
885 }
886 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
888 self.unknown_tagged_fields.insert(key, value);
889 self
890 }
891}
892
#[cfg(feature = "broker")]
impl Encodable for MetadataResponseTopic {
    /// Writes this topic entry to `buf` using the wire layout of `version`.
    ///
    /// `topic_id` (versions 10+) and `is_internal` (versions 1+) are simply
    /// left out for versions that do not carry them.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12, when
    /// `topic_authorized_operations` holds a non-default value for a version
    /// below 8, when there are more unknown tagged fields than fit in a `u32`,
    /// or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the compact encodings and append tagged fields.
        let flexible = version >= 9;

        types::Int16.encode(buf, &self.error_code)?;
        if flexible {
            types::CompactString.encode(buf, &self.name)?;
        } else {
            types::String.encode(buf, &self.name)?;
        }
        if version >= 10 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }
        if version >= 1 {
            types::Boolean.encode(buf, &self.is_internal)?;
        }
        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }
        if version >= 8 {
            types::Int32.encode(buf, &self.topic_authorized_operations)?;
        } else if self.topic_authorized_operations != i32::MIN {
            // The field cannot be represented in this version, so refuse to
            // drop a value the caller explicitly set.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`, including an unsupported
    /// `version`, so a size is never reported for a message that cannot be
    /// encoded.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Mirror `encode`: an out-of-range version has no defined layout.
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 9;
        let mut total_size = 0;

        total_size += types::Int16.compute_size(&self.error_code)?;
        if flexible {
            total_size += types::CompactString.compute_size(&self.name)?;
        } else {
            total_size += types::String.compute_size(&self.name)?;
        }
        if version >= 10 {
            total_size += types::Uuid.compute_size(&self.topic_id)?;
        }
        if version >= 1 {
            total_size += types::Boolean.compute_size(&self.is_internal)?;
        }
        if flexible {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
        }
        if version >= 8 {
            total_size += types::Int32.compute_size(&self.topic_authorized_operations)?;
        } else if self.topic_authorized_operations != i32::MIN {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
979
#[cfg(feature = "client")]
impl Decodable for MetadataResponseTopic {
    /// Reads a topic entry laid out for `version` from `buf`.
    ///
    /// Fields that the given version does not carry are filled with their
    /// defaults (nil `topic_id`, `is_internal` = `false`,
    /// `topic_authorized_operations` = `i32::MIN`).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ use the compact encodings and carry tagged fields.
        let flexible = version >= 9;

        let error_code = types::Int16.decode(buf)?;
        let name = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let topic_id = if version < 10 {
            Uuid::nil()
        } else {
            types::Uuid.decode(buf)?
        };
        let is_internal = if version < 1 {
            false
        } else {
            types::Boolean.decode(buf)?
        };
        let partitions = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let topic_authorized_operations = if version < 8 {
            i32::MIN
        } else {
            types::Int32.decode(buf)?
        };

        // Tagged fields are kept as raw bytes keyed by tag.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            error_code,
            name,
            topic_id,
            is_internal,
            partitions,
            topic_authorized_operations,
            unknown_tagged_fields,
        })
    }
}
1033
1034impl Default for MetadataResponseTopic {
1035 fn default() -> Self {
1036 Self {
1037 error_code: 0,
1038 name: Some(Default::default()),
1039 topic_id: Uuid::nil(),
1040 is_internal: false,
1041 partitions: Default::default(),
1042 topic_authorized_operations: -2147483648,
1043 unknown_tagged_fields: BTreeMap::new(),
1044 }
1045 }
1046}
1047
1048impl Message for MetadataResponseTopic {
1049 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
1050 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1051}
1052
1053impl HeaderVersion for MetadataResponse {
1054 fn header_version(version: i16) -> i16 {
1055 if version >= 9 {
1056 1
1057 } else {
1058 0
1059 }
1060 }
1061}