1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// Top-level metadata response message.
///
/// Which fields are actually written to / read from the wire depends on the
/// protocol version passed to the `Encodable` / `Decodable` impls in this file
/// (versions 0 through 13 are accepted).
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct MetadataResponse {
    /// Throttle time in milliseconds (per the field name).
    ///
    /// On the wire for versions 3 and above; decodes to `0` otherwise.
    pub throttle_time_ms: i32,

    /// Broker entries. On the wire in every accepted version.
    pub brokers: Vec<MetadataResponseBroker>,

    /// Cluster id, nullable.
    ///
    /// On the wire for versions 2 and above; decodes to `None` otherwise.
    pub cluster_id: Option<StrBytes>,

    /// Controller broker id.
    ///
    /// On the wire for versions 1 and above; decodes to `-1` otherwise.
    pub controller_id: super::BrokerId,

    /// Topic entries. On the wire in every accepted version.
    pub topics: Vec<MetadataResponseTopic>,

    /// Cluster authorized operations value.
    ///
    /// On the wire only for versions 8 through 10. `-2147483648` is the
    /// default; encoding any other value with a version outside 8-10 fails.
    pub cluster_authorized_operations: i32,

    /// Top-level error code.
    ///
    /// On the wire for versions 13 and above; decodes to `0` otherwise.
    pub error_code: i16,

    /// Tagged fields that were read but not interpreted, keyed by tag.
    ///
    /// Only read and written for versions 9 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
62
63impl MetadataResponse {
64 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
70 self.throttle_time_ms = value;
71 self
72 }
73 pub fn with_brokers(mut self, value: Vec<MetadataResponseBroker>) -> Self {
79 self.brokers = value;
80 self
81 }
82 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
88 self.cluster_id = value;
89 self
90 }
91 pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
97 self.controller_id = value;
98 self
99 }
100 pub fn with_topics(mut self, value: Vec<MetadataResponseTopic>) -> Self {
106 self.topics = value;
107 self
108 }
109 pub fn with_cluster_authorized_operations(mut self, value: i32) -> Self {
115 self.cluster_authorized_operations = value;
116 self
117 }
118 pub fn with_error_code(mut self, value: i16) -> Self {
124 self.error_code = value;
125 self
126 }
127 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
129 self.unknown_tagged_fields = value;
130 self
131 }
132 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
134 self.unknown_tagged_fields.insert(key, value);
135 self
136 }
137}
138
#[cfg(feature = "broker")]
impl Encodable for MetadataResponse {
    /// Serialises this message into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-13, when
    /// `cluster_authorized_operations` holds a non-default value for a version
    /// that does not carry it, when there are more tagged fields than fit in a
    /// `u32`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the Compact* encodings and append tagged fields.
        let compact = version >= 9;

        if version >= 3 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }
        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.brokers)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.brokers)?;
        }
        // `compact` implies version >= 2, so the two gates can be chained.
        if compact {
            types::CompactString.encode(buf, &self.cluster_id)?;
        } else if version >= 2 {
            types::String.encode(buf, &self.cluster_id)?;
        }
        if version >= 1 {
            types::Int32.encode(buf, &self.controller_id)?;
        }
        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        if (8..=10).contains(&version) {
            types::Int32.encode(buf, &self.cluster_authorized_operations)?;
        } else if self.cluster_authorized_operations != i32::MIN {
            // A non-default value cannot be represented in this version.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 13 {
            types::Int16.encode(buf, &self.error_code)?;
        }
        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-13.
    ///
    /// # Errors
    ///
    /// Fails under the same field-availability and tagged-field-count
    /// conditions as `encode`, or when a nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 9;
        let mut size = 0;

        if version >= 3 {
            size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }
        size += if compact {
            types::CompactArray(types::Struct { version }).compute_size(&self.brokers)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.brokers)?
        };
        if compact {
            size += types::CompactString.compute_size(&self.cluster_id)?;
        } else if version >= 2 {
            size += types::String.compute_size(&self.cluster_id)?;
        }
        if version >= 1 {
            size += types::Int32.compute_size(&self.controller_id)?;
        }
        size += if compact {
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.topics)?
        };
        if (8..=10).contains(&version) {
            size += types::Int32.compute_size(&self.cluster_authorized_operations)?;
        } else if self.cluster_authorized_operations != i32::MIN {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 13 {
            size += types::Int16.compute_size(&self.error_code)?;
        }
        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
244
#[cfg(feature = "client")]
impl Decodable for MetadataResponse {
    /// Parses a message laid out according to `version` from `buf`.
    ///
    /// Fields that the given version does not carry are filled with the same
    /// values the `Default` impl uses.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-13 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the Compact* encodings and append tagged fields.
        let compact = version >= 9;

        let throttle_time_ms = if version >= 3 {
            types::Int32.decode(buf)?
        } else {
            0
        };
        let brokers = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        // `compact` implies version >= 2, so the two gates can be chained.
        let cluster_id = if compact {
            types::CompactString.decode(buf)?
        } else if version >= 2 {
            types::String.decode(buf)?
        } else {
            None
        };
        let controller_id = if version >= 1 {
            types::Int32.decode(buf)?
        } else {
            (-1).into()
        };
        let topics = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let cluster_authorized_operations = if (8..=10).contains(&version) {
            types::Int32.decode(buf)?
        } else {
            i32::MIN
        };
        let error_code = if version >= 13 {
            types::Int16.decode(buf)?
        } else {
            0
        };

        // Every tagged field is kept as raw bytes; none is interpreted here.
        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            throttle_time_ms,
            brokers,
            cluster_id,
            controller_id,
            topics,
            cluster_authorized_operations,
            error_code,
            unknown_tagged_fields,
        })
    }
}
312
313impl Default for MetadataResponse {
314 fn default() -> Self {
315 Self {
316 throttle_time_ms: 0,
317 brokers: Default::default(),
318 cluster_id: None,
319 controller_id: (-1).into(),
320 topics: Default::default(),
321 cluster_authorized_operations: -2147483648,
322 error_code: 0,
323 unknown_tagged_fields: BTreeMap::new(),
324 }
325 }
326}
327
328impl Message for MetadataResponse {
329 const VERSIONS: VersionRange = VersionRange { min: 0, max: 13 };
330 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
331}
332
/// One broker entry of a [`MetadataResponse`].
///
/// The `Encodable` / `Decodable` impls in this file accept versions 0-13.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct MetadataResponseBroker {
    /// Broker id. On the wire in every accepted version.
    pub node_id: super::BrokerId,

    /// Broker host string. On the wire in every accepted version.
    pub host: StrBytes,

    /// Broker port. On the wire in every accepted version.
    pub port: i32,

    /// Rack string, nullable.
    ///
    /// On the wire for versions 1 and above; decodes to `None` otherwise.
    pub rack: Option<StrBytes>,

    /// Tagged fields that were read but not interpreted, keyed by tag.
    ///
    /// Only read and written for versions 9 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
360
361impl MetadataResponseBroker {
362 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
368 self.node_id = value;
369 self
370 }
371 pub fn with_host(mut self, value: StrBytes) -> Self {
377 self.host = value;
378 self
379 }
380 pub fn with_port(mut self, value: i32) -> Self {
386 self.port = value;
387 self
388 }
389 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
395 self.rack = value;
396 self
397 }
398 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
400 self.unknown_tagged_fields = value;
401 self
402 }
403 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
405 self.unknown_tagged_fields.insert(key, value);
406 self
407 }
408}
409
#[cfg(feature = "broker")]
impl Encodable for MetadataResponseBroker {
    /// Serialises this broker entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-13, when there are more tagged fields
    /// than fit in a `u32`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the Compact* encodings and append tagged fields.
        let compact = version >= 9;

        types::Int32.encode(buf, &self.node_id)?;
        if compact {
            types::CompactString.encode(buf, &self.host)?;
        } else {
            types::String.encode(buf, &self.host)?;
        }
        types::Int32.encode(buf, &self.port)?;
        // `compact` implies version >= 1, so the two gates can be chained.
        if compact {
            types::CompactString.encode(buf, &self.rack)?;
        } else if version >= 1 {
            types::String.encode(buf, &self.rack)?;
        }
        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-13.
    ///
    /// # Errors
    ///
    /// Fails when there are more tagged fields than fit in a `u32` or when a
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 9;
        let mut size = 0;

        size += types::Int32.compute_size(&self.node_id)?;
        size += if compact {
            types::CompactString.compute_size(&self.host)?
        } else {
            types::String.compute_size(&self.host)?
        };
        size += types::Int32.compute_size(&self.port)?;
        if compact {
            size += types::CompactString.compute_size(&self.rack)?;
        } else if version >= 1 {
            size += types::String.compute_size(&self.rack)?;
        }
        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
475
#[cfg(feature = "client")]
impl Decodable for MetadataResponseBroker {
    /// Parses a broker entry laid out according to `version` from `buf`.
    ///
    /// `rack` is `None` when the version does not carry it.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-13 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the Compact* encodings and append tagged fields.
        let compact = version >= 9;

        let node_id = types::Int32.decode(buf)?;
        let host = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let port = types::Int32.decode(buf)?;
        // `compact` implies version >= 1, so the two gates can be chained.
        let rack = if compact {
            types::CompactString.decode(buf)?
        } else if version >= 1 {
            types::String.decode(buf)?
        } else {
            None
        };

        // Every tagged field is kept as raw bytes; none is interpreted here.
        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            node_id,
            host,
            port,
            rack,
            unknown_tagged_fields,
        })
    }
}
517
518impl Default for MetadataResponseBroker {
519 fn default() -> Self {
520 Self {
521 node_id: (0).into(),
522 host: Default::default(),
523 port: 0,
524 rack: None,
525 unknown_tagged_fields: BTreeMap::new(),
526 }
527 }
528}
529
530impl Message for MetadataResponseBroker {
531 const VERSIONS: VersionRange = VersionRange { min: 0, max: 13 };
532 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
533}
534
/// One partition entry of a [`MetadataResponseTopic`].
///
/// The `Encodable` / `Decodable` impls in this file accept versions 0-13.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct MetadataResponsePartition {
    /// Partition-level error code. On the wire in every accepted version.
    pub error_code: i16,

    /// Partition index. On the wire in every accepted version.
    pub partition_index: i32,

    /// Leader broker id. On the wire in every accepted version.
    pub leader_id: super::BrokerId,

    /// Leader epoch.
    ///
    /// On the wire for versions 7 and above; decodes to `-1` otherwise.
    pub leader_epoch: i32,

    /// Replica broker ids. On the wire in every accepted version.
    pub replica_nodes: Vec<super::BrokerId>,

    /// ISR broker ids (per the field name). On the wire in every accepted version.
    pub isr_nodes: Vec<super::BrokerId>,

    /// Offline replica broker ids.
    ///
    /// On the wire for versions 5 and above; decodes to an empty list otherwise.
    pub offline_replicas: Vec<super::BrokerId>,

    /// Tagged fields that were read but not interpreted, keyed by tag.
    ///
    /// Only read and written for versions 9 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
577
578impl MetadataResponsePartition {
579 pub fn with_error_code(mut self, value: i16) -> Self {
585 self.error_code = value;
586 self
587 }
588 pub fn with_partition_index(mut self, value: i32) -> Self {
594 self.partition_index = value;
595 self
596 }
597 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
603 self.leader_id = value;
604 self
605 }
606 pub fn with_leader_epoch(mut self, value: i32) -> Self {
612 self.leader_epoch = value;
613 self
614 }
615 pub fn with_replica_nodes(mut self, value: Vec<super::BrokerId>) -> Self {
621 self.replica_nodes = value;
622 self
623 }
624 pub fn with_isr_nodes(mut self, value: Vec<super::BrokerId>) -> Self {
630 self.isr_nodes = value;
631 self
632 }
633 pub fn with_offline_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
639 self.offline_replicas = value;
640 self
641 }
642 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
644 self.unknown_tagged_fields = value;
645 self
646 }
647 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
649 self.unknown_tagged_fields.insert(key, value);
650 self
651 }
652}
653
#[cfg(feature = "broker")]
impl Encodable for MetadataResponsePartition {
    /// Serialises this partition entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-13, when there are more tagged fields
    /// than fit in a `u32`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the Compact* encodings and append tagged fields.
        let compact = version >= 9;

        types::Int16.encode(buf, &self.error_code)?;
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.leader_id)?;
        if version >= 7 {
            types::Int32.encode(buf, &self.leader_epoch)?;
        }
        // The three id lists share one encoding choice; `compact` implies
        // version >= 5, so `offline_replicas` is always written on that path.
        if compact {
            types::CompactArray(types::Int32).encode(buf, &self.replica_nodes)?;
            types::CompactArray(types::Int32).encode(buf, &self.isr_nodes)?;
            types::CompactArray(types::Int32).encode(buf, &self.offline_replicas)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.replica_nodes)?;
            types::Array(types::Int32).encode(buf, &self.isr_nodes)?;
            if version >= 5 {
                types::Array(types::Int32).encode(buf, &self.offline_replicas)?;
            }
        }
        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-13.
    ///
    /// # Errors
    ///
    /// Fails when there are more tagged fields than fit in a `u32` or when a
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 9;
        let mut size = 0;

        size += types::Int16.compute_size(&self.error_code)?;
        size += types::Int32.compute_size(&self.partition_index)?;
        size += types::Int32.compute_size(&self.leader_id)?;
        if version >= 7 {
            size += types::Int32.compute_size(&self.leader_epoch)?;
        }
        if compact {
            size += types::CompactArray(types::Int32).compute_size(&self.replica_nodes)?;
            size += types::CompactArray(types::Int32).compute_size(&self.isr_nodes)?;
            size += types::CompactArray(types::Int32).compute_size(&self.offline_replicas)?;
        } else {
            size += types::Array(types::Int32).compute_size(&self.replica_nodes)?;
            size += types::Array(types::Int32).compute_size(&self.isr_nodes)?;
            if version >= 5 {
                size += types::Array(types::Int32).compute_size(&self.offline_replicas)?;
            }
        }
        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
738
#[cfg(feature = "client")]
impl Decodable for MetadataResponsePartition {
    /// Parses a partition entry laid out according to `version` from `buf`.
    ///
    /// `leader_epoch` falls back to `-1` and `offline_replicas` to an empty
    /// list when the version does not carry them.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-13 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the Compact* encodings and append tagged fields.
        let compact = version >= 9;

        let error_code = types::Int16.decode(buf)?;
        let partition_index = types::Int32.decode(buf)?;
        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = if version >= 7 {
            types::Int32.decode(buf)?
        } else {
            -1
        };
        let replica_nodes = if compact {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let isr_nodes = if compact {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        // `compact` implies version >= 5, so the two gates can be chained.
        let offline_replicas = if compact {
            types::CompactArray(types::Int32).decode(buf)?
        } else if version >= 5 {
            types::Array(types::Int32).decode(buf)?
        } else {
            Default::default()
        };

        // Every tagged field is kept as raw bytes; none is interpreted here.
        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            error_code,
            partition_index,
            leader_id,
            leader_epoch,
            replica_nodes,
            isr_nodes,
            offline_replicas,
            unknown_tagged_fields,
        })
    }
}
794
795impl Default for MetadataResponsePartition {
796 fn default() -> Self {
797 Self {
798 error_code: 0,
799 partition_index: 0,
800 leader_id: (0).into(),
801 leader_epoch: -1,
802 replica_nodes: Default::default(),
803 isr_nodes: Default::default(),
804 offline_replicas: Default::default(),
805 unknown_tagged_fields: BTreeMap::new(),
806 }
807 }
808}
809
810impl Message for MetadataResponsePartition {
811 const VERSIONS: VersionRange = VersionRange { min: 0, max: 13 };
812 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
813}
814
/// One topic entry of a [`MetadataResponse`].
///
/// The `Encodable` / `Decodable` impls in this file accept versions 0-13.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct MetadataResponseTopic {
    /// Topic-level error code. On the wire in every accepted version.
    pub error_code: i16,

    /// Topic name, nullable. On the wire in every accepted version.
    pub name: Option<super::TopicName>,

    /// Topic id.
    ///
    /// On the wire for versions 10 and above; decodes to the nil UUID otherwise.
    pub topic_id: Uuid,

    /// Internal-topic flag (per the field name).
    ///
    /// On the wire for versions 1 and above; decodes to `false` otherwise.
    pub is_internal: bool,

    /// Partition entries. On the wire in every accepted version.
    pub partitions: Vec<MetadataResponsePartition>,

    /// Topic authorized operations value.
    ///
    /// On the wire for versions 8 and above. `-2147483648` is the default;
    /// encoding any other value with a version below 8 fails.
    pub topic_authorized_operations: i32,

    /// Tagged fields that were read but not interpreted, keyed by tag.
    ///
    /// Only read and written for versions 9 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
852
853impl MetadataResponseTopic {
854 pub fn with_error_code(mut self, value: i16) -> Self {
860 self.error_code = value;
861 self
862 }
863 pub fn with_name(mut self, value: Option<super::TopicName>) -> Self {
869 self.name = value;
870 self
871 }
872 pub fn with_topic_id(mut self, value: Uuid) -> Self {
878 self.topic_id = value;
879 self
880 }
881 pub fn with_is_internal(mut self, value: bool) -> Self {
887 self.is_internal = value;
888 self
889 }
890 pub fn with_partitions(mut self, value: Vec<MetadataResponsePartition>) -> Self {
896 self.partitions = value;
897 self
898 }
899 pub fn with_topic_authorized_operations(mut self, value: i32) -> Self {
905 self.topic_authorized_operations = value;
906 self
907 }
908 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
910 self.unknown_tagged_fields = value;
911 self
912 }
913 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
915 self.unknown_tagged_fields.insert(key, value);
916 self
917 }
918}
919
#[cfg(feature = "broker")]
impl Encodable for MetadataResponseTopic {
    /// Serialises this topic entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-13, when
    /// `topic_authorized_operations` holds a non-default value for a version
    /// below 8, when there are more tagged fields than fit in a `u32`, or when
    /// any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the Compact* encodings and append tagged fields.
        let compact = version >= 9;

        types::Int16.encode(buf, &self.error_code)?;
        if compact {
            types::CompactString.encode(buf, &self.name)?;
        } else {
            types::String.encode(buf, &self.name)?;
        }
        if version >= 10 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }
        if version >= 1 {
            types::Boolean.encode(buf, &self.is_internal)?;
        }
        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }
        if version >= 8 {
            types::Int32.encode(buf, &self.topic_authorized_operations)?;
        } else if self.topic_authorized_operations != i32::MIN {
            // A non-default value cannot be represented in this version.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-13.
    ///
    /// # Errors
    ///
    /// Fails under the same field-availability and tagged-field-count
    /// conditions as `encode`, or when a nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 9;
        let mut size = 0;

        size += types::Int16.compute_size(&self.error_code)?;
        size += if compact {
            types::CompactString.compute_size(&self.name)?
        } else {
            types::String.compute_size(&self.name)?
        };
        if version >= 10 {
            size += types::Uuid.compute_size(&self.topic_id)?;
        }
        if version >= 1 {
            size += types::Boolean.compute_size(&self.is_internal)?;
        }
        size += if compact {
            types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.partitions)?
        };
        if version >= 8 {
            size += types::Int32.compute_size(&self.topic_authorized_operations)?;
        } else if self.topic_authorized_operations != i32::MIN {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
1006
#[cfg(feature = "client")]
impl Decodable for MetadataResponseTopic {
    /// Parses a topic entry laid out according to `version` from `buf`.
    ///
    /// Fields that the given version does not carry fall back to the nil UUID,
    /// `false`, and `i32::MIN` respectively.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-13 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to the Compact* encodings and append tagged fields.
        let compact = version >= 9;

        let error_code = types::Int16.decode(buf)?;
        let name = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let topic_id = if version >= 10 {
            types::Uuid.decode(buf)?
        } else {
            Uuid::nil()
        };
        let is_internal = if version >= 1 {
            types::Boolean.decode(buf)?
        } else {
            false
        };
        let partitions = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let topic_authorized_operations = if version >= 8 {
            types::Int32.decode(buf)?
        } else {
            i32::MIN
        };

        // Every tagged field is kept as raw bytes; none is interpreted here.
        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            error_code,
            name,
            topic_id,
            is_internal,
            partitions,
            topic_authorized_operations,
            unknown_tagged_fields,
        })
    }
}
1060
1061impl Default for MetadataResponseTopic {
1062 fn default() -> Self {
1063 Self {
1064 error_code: 0,
1065 name: Some(Default::default()),
1066 topic_id: Uuid::nil(),
1067 is_internal: false,
1068 partitions: Default::default(),
1069 topic_authorized_operations: -2147483648,
1070 unknown_tagged_fields: BTreeMap::new(),
1071 }
1072 }
1073}
1074
1075impl Message for MetadataResponseTopic {
1076 const VERSIONS: VersionRange = VersionRange { min: 0, max: 13 };
1077 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1078}
1079
1080impl HeaderVersion for MetadataResponse {
1081 fn header_version(version: i16) -> i16 {
1082 if version >= 9 {
1083 1
1084 } else {
1085 0
1086 }
1087 }
1088}