kafka_protocol/messages/
metadata_response.rs

//! MetadataResponse
//!
//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/MetadataResponse.json).
// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-12
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct MetadataResponse {
24    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
25    ///
26    /// Supported API versions: 3-12
27    pub throttle_time_ms: i32,
28
29    /// A list of brokers present in the cluster.
30    ///
31    /// Supported API versions: 0-12
32    pub brokers: Vec<MetadataResponseBroker>,
33
34    /// The cluster ID that responding broker belongs to.
35    ///
36    /// Supported API versions: 2-12
37    pub cluster_id: Option<StrBytes>,
38
39    /// The ID of the controller broker.
40    ///
41    /// Supported API versions: 1-12
42    pub controller_id: super::BrokerId,
43
44    /// Each topic in the response.
45    ///
46    /// Supported API versions: 0-12
47    pub topics: Vec<MetadataResponseTopic>,
48
49    /// 32-bit bitfield to represent authorized operations for this cluster.
50    ///
51    /// Supported API versions: 8-10
52    pub cluster_authorized_operations: i32,
53
54    /// Other tagged fields
55    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl MetadataResponse {
59    /// Sets `throttle_time_ms` to the passed value.
60    ///
61    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
62    ///
63    /// Supported API versions: 3-12
64    pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
65        self.throttle_time_ms = value;
66        self
67    }
68    /// Sets `brokers` to the passed value.
69    ///
70    /// A list of brokers present in the cluster.
71    ///
72    /// Supported API versions: 0-12
73    pub fn with_brokers(mut self, value: Vec<MetadataResponseBroker>) -> Self {
74        self.brokers = value;
75        self
76    }
77    /// Sets `cluster_id` to the passed value.
78    ///
79    /// The cluster ID that responding broker belongs to.
80    ///
81    /// Supported API versions: 2-12
82    pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
83        self.cluster_id = value;
84        self
85    }
86    /// Sets `controller_id` to the passed value.
87    ///
88    /// The ID of the controller broker.
89    ///
90    /// Supported API versions: 1-12
91    pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
92        self.controller_id = value;
93        self
94    }
95    /// Sets `topics` to the passed value.
96    ///
97    /// Each topic in the response.
98    ///
99    /// Supported API versions: 0-12
100    pub fn with_topics(mut self, value: Vec<MetadataResponseTopic>) -> Self {
101        self.topics = value;
102        self
103    }
104    /// Sets `cluster_authorized_operations` to the passed value.
105    ///
106    /// 32-bit bitfield to represent authorized operations for this cluster.
107    ///
108    /// Supported API versions: 8-10
109    pub fn with_cluster_authorized_operations(mut self, value: i32) -> Self {
110        self.cluster_authorized_operations = value;
111        self
112    }
113    /// Sets unknown_tagged_fields to the passed value.
114    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115        self.unknown_tagged_fields = value;
116        self
117    }
118    /// Inserts an entry into unknown_tagged_fields.
119    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120        self.unknown_tagged_fields.insert(key, value);
121        self
122    }
123}
124
#[cfg(feature = "broker")]
impl Encodable for MetadataResponse {
    /// Writes this response to `buf` using the wire layout of `version`.
    ///
    /// Fields are written in protocol order and only when `version` carries them. From
    /// version 9 on, the compact string/array forms are used and the message ends with a
    /// tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12, when `cluster_authorized_operations` holds a
    /// non-default value for a version that cannot carry it (anything outside 8-10), when
    /// there are more than `u32::MAX` unknown tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version >= 3 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }
        if version >= 9 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.brokers)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.brokers)?;
        }
        if version >= 2 {
            if version >= 9 {
                types::CompactString.encode(buf, &self.cluster_id)?;
            } else {
                types::String.encode(buf, &self.cluster_id)?;
            }
        }
        if version >= 1 {
            types::Int32.encode(buf, &self.controller_id)?;
        }
        if version >= 9 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        if (8..=10).contains(&version) {
            types::Int32.encode(buf, &self.cluster_authorized_operations)?;
        } else if self.cluster_authorized_operations != i32::MIN {
            // `i32::MIN` is the field's default; any other value would be silently lost.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 9 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // The cast is lossless: the count was just checked against `u32::MAX`.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes [`encode`](Encodable::encode) writes for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`, including an unsupported `version`, so
    /// that a size is never reported for a message that cannot be encoded.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Mirror `encode`: reject versions this message type does not support.
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let mut total_size = 0;
        if version >= 3 {
            total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }
        if version >= 9 {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.brokers)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.brokers)?;
        }
        if version >= 2 {
            if version >= 9 {
                total_size += types::CompactString.compute_size(&self.cluster_id)?;
            } else {
                total_size += types::String.compute_size(&self.cluster_id)?;
            }
        }
        if version >= 1 {
            total_size += types::Int32.compute_size(&self.controller_id)?;
        }
        if version >= 9 {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }
        if (8..=10).contains(&version) {
            total_size += types::Int32.compute_size(&self.cluster_authorized_operations)?;
        } else if self.cluster_authorized_operations != i32::MIN {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 9 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // The cast is lossless: the count was just checked against `u32::MAX`.
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
224
#[cfg(feature = "client")]
impl Decodable for MetadataResponse {
    /// Reads a `MetadataResponse` from `buf` using the wire layout of `version`.
    ///
    /// Fields that `version` does not carry are set to the same values as in the `Default`
    /// implementation. Unknown tagged fields (version 9 and later) are kept verbatim.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 9 on, the compact encodings and the tagged-field trailer are used.
        let flexible = version >= 9;

        let throttle_time_ms = match version {
            3..=12 => types::Int32.decode(buf)?,
            _ => 0,
        };
        let brokers = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let cluster_id = match version {
            9..=12 => types::CompactString.decode(buf)?,
            2..=8 => types::String.decode(buf)?,
            _ => None,
        };
        let controller_id = match version {
            1..=12 => types::Int32.decode(buf)?,
            _ => (-1).into(),
        };
        let topics = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let cluster_authorized_operations = match version {
            8..=10 => types::Int32.decode(buf)?,
            _ => -2147483648,
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is: tag, payload length, then the raw payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            throttle_time_ms,
            brokers,
            cluster_id,
            controller_id,
            topics,
            cluster_authorized_operations,
            unknown_tagged_fields,
        })
    }
}
286
287impl Default for MetadataResponse {
288    fn default() -> Self {
289        Self {
290            throttle_time_ms: 0,
291            brokers: Default::default(),
292            cluster_id: None,
293            controller_id: (-1).into(),
294            topics: Default::default(),
295            cluster_authorized_operations: -2147483648,
296            unknown_tagged_fields: BTreeMap::new(),
297        }
298    }
299}
300
301impl Message for MetadataResponse {
302    const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
303    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
304}
305
306/// Valid versions: 0-12
307#[non_exhaustive]
308#[derive(Debug, Clone, PartialEq)]
309pub struct MetadataResponseBroker {
310    /// The broker ID.
311    ///
312    /// Supported API versions: 0-12
313    pub node_id: super::BrokerId,
314
315    /// The broker hostname.
316    ///
317    /// Supported API versions: 0-12
318    pub host: StrBytes,
319
320    /// The broker port.
321    ///
322    /// Supported API versions: 0-12
323    pub port: i32,
324
325    /// The rack of the broker, or null if it has not been assigned to a rack.
326    ///
327    /// Supported API versions: 1-12
328    pub rack: Option<StrBytes>,
329
330    /// Other tagged fields
331    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
332}
333
334impl MetadataResponseBroker {
335    /// Sets `node_id` to the passed value.
336    ///
337    /// The broker ID.
338    ///
339    /// Supported API versions: 0-12
340    pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
341        self.node_id = value;
342        self
343    }
344    /// Sets `host` to the passed value.
345    ///
346    /// The broker hostname.
347    ///
348    /// Supported API versions: 0-12
349    pub fn with_host(mut self, value: StrBytes) -> Self {
350        self.host = value;
351        self
352    }
353    /// Sets `port` to the passed value.
354    ///
355    /// The broker port.
356    ///
357    /// Supported API versions: 0-12
358    pub fn with_port(mut self, value: i32) -> Self {
359        self.port = value;
360        self
361    }
362    /// Sets `rack` to the passed value.
363    ///
364    /// The rack of the broker, or null if it has not been assigned to a rack.
365    ///
366    /// Supported API versions: 1-12
367    pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
368        self.rack = value;
369        self
370    }
371    /// Sets unknown_tagged_fields to the passed value.
372    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
373        self.unknown_tagged_fields = value;
374        self
375    }
376    /// Inserts an entry into unknown_tagged_fields.
377    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
378        self.unknown_tagged_fields.insert(key, value);
379        self
380    }
381}
382
#[cfg(feature = "broker")]
impl Encodable for MetadataResponseBroker {
    /// Writes this broker entry to `buf` using the wire layout of `version`.
    ///
    /// `rack` is written only from version 1 on. From version 9 on, compact strings are used
    /// and the entry ends with a tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12, when there are more than `u32::MAX` unknown
    /// tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.node_id)?;
        if version >= 9 {
            types::CompactString.encode(buf, &self.host)?;
        } else {
            types::String.encode(buf, &self.host)?;
        }
        types::Int32.encode(buf, &self.port)?;
        if version >= 1 {
            if version >= 9 {
                types::CompactString.encode(buf, &self.rack)?;
            } else {
                types::String.encode(buf, &self.rack)?;
            }
        }
        if version >= 9 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // The cast is lossless: the count was just checked against `u32::MAX`.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes [`encode`](Encodable::encode) writes for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`, including an unsupported `version`, so
    /// that a size is never reported for an entry that cannot be encoded.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Mirror `encode`: reject versions this message type does not support.
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.node_id)?;
        if version >= 9 {
            total_size += types::CompactString.compute_size(&self.host)?;
        } else {
            total_size += types::String.compute_size(&self.host)?;
        }
        total_size += types::Int32.compute_size(&self.port)?;
        if version >= 1 {
            if version >= 9 {
                total_size += types::CompactString.compute_size(&self.rack)?;
            } else {
                total_size += types::String.compute_size(&self.rack)?;
            }
        }
        if version >= 9 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // The cast is lossless: the count was just checked against `u32::MAX`.
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
448
#[cfg(feature = "client")]
impl Decodable for MetadataResponseBroker {
    /// Reads a broker entry from `buf` using the wire layout of `version`.
    ///
    /// `rack` is `None` for version 0, which does not carry it. Unknown tagged fields
    /// (version 9 and later) are kept verbatim.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 9 on, the compact encodings and the tagged-field trailer are used.
        let flexible = version >= 9;

        let node_id = types::Int32.decode(buf)?;
        let host = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let port = types::Int32.decode(buf)?;
        let rack = match version {
            9..=12 => types::CompactString.decode(buf)?,
            1..=8 => types::String.decode(buf)?,
            _ => None,
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is: tag, payload length, then the raw payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            node_id,
            host,
            port,
            rack,
            unknown_tagged_fields,
        })
    }
}
490
491impl Default for MetadataResponseBroker {
492    fn default() -> Self {
493        Self {
494            node_id: (0).into(),
495            host: Default::default(),
496            port: 0,
497            rack: None,
498            unknown_tagged_fields: BTreeMap::new(),
499        }
500    }
501}
502
503impl Message for MetadataResponseBroker {
504    const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
505    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
506}
507
508/// Valid versions: 0-12
509#[non_exhaustive]
510#[derive(Debug, Clone, PartialEq)]
511pub struct MetadataResponsePartition {
512    /// The partition error, or 0 if there was no error.
513    ///
514    /// Supported API versions: 0-12
515    pub error_code: i16,
516
517    /// The partition index.
518    ///
519    /// Supported API versions: 0-12
520    pub partition_index: i32,
521
522    /// The ID of the leader broker.
523    ///
524    /// Supported API versions: 0-12
525    pub leader_id: super::BrokerId,
526
527    /// The leader epoch of this partition.
528    ///
529    /// Supported API versions: 7-12
530    pub leader_epoch: i32,
531
532    /// The set of all nodes that host this partition.
533    ///
534    /// Supported API versions: 0-12
535    pub replica_nodes: Vec<super::BrokerId>,
536
537    /// The set of nodes that are in sync with the leader for this partition.
538    ///
539    /// Supported API versions: 0-12
540    pub isr_nodes: Vec<super::BrokerId>,
541
542    /// The set of offline replicas of this partition.
543    ///
544    /// Supported API versions: 5-12
545    pub offline_replicas: Vec<super::BrokerId>,
546
547    /// Other tagged fields
548    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
549}
550
551impl MetadataResponsePartition {
552    /// Sets `error_code` to the passed value.
553    ///
554    /// The partition error, or 0 if there was no error.
555    ///
556    /// Supported API versions: 0-12
557    pub fn with_error_code(mut self, value: i16) -> Self {
558        self.error_code = value;
559        self
560    }
561    /// Sets `partition_index` to the passed value.
562    ///
563    /// The partition index.
564    ///
565    /// Supported API versions: 0-12
566    pub fn with_partition_index(mut self, value: i32) -> Self {
567        self.partition_index = value;
568        self
569    }
570    /// Sets `leader_id` to the passed value.
571    ///
572    /// The ID of the leader broker.
573    ///
574    /// Supported API versions: 0-12
575    pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
576        self.leader_id = value;
577        self
578    }
579    /// Sets `leader_epoch` to the passed value.
580    ///
581    /// The leader epoch of this partition.
582    ///
583    /// Supported API versions: 7-12
584    pub fn with_leader_epoch(mut self, value: i32) -> Self {
585        self.leader_epoch = value;
586        self
587    }
588    /// Sets `replica_nodes` to the passed value.
589    ///
590    /// The set of all nodes that host this partition.
591    ///
592    /// Supported API versions: 0-12
593    pub fn with_replica_nodes(mut self, value: Vec<super::BrokerId>) -> Self {
594        self.replica_nodes = value;
595        self
596    }
597    /// Sets `isr_nodes` to the passed value.
598    ///
599    /// The set of nodes that are in sync with the leader for this partition.
600    ///
601    /// Supported API versions: 0-12
602    pub fn with_isr_nodes(mut self, value: Vec<super::BrokerId>) -> Self {
603        self.isr_nodes = value;
604        self
605    }
606    /// Sets `offline_replicas` to the passed value.
607    ///
608    /// The set of offline replicas of this partition.
609    ///
610    /// Supported API versions: 5-12
611    pub fn with_offline_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
612        self.offline_replicas = value;
613        self
614    }
615    /// Sets unknown_tagged_fields to the passed value.
616    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
617        self.unknown_tagged_fields = value;
618        self
619    }
620    /// Inserts an entry into unknown_tagged_fields.
621    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
622        self.unknown_tagged_fields.insert(key, value);
623        self
624    }
625}
626
#[cfg(feature = "broker")]
impl Encodable for MetadataResponsePartition {
    /// Writes this partition entry to `buf` using the wire layout of `version`.
    ///
    /// `leader_epoch` is written from version 7 on and `offline_replicas` from version 5 on.
    /// From version 9 on, compact arrays are used and the entry ends with a tagged-field
    /// section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12, when there are more than `u32::MAX` unknown
    /// tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int16.encode(buf, &self.error_code)?;
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.leader_id)?;
        if version >= 7 {
            types::Int32.encode(buf, &self.leader_epoch)?;
        }
        if version >= 9 {
            types::CompactArray(types::Int32).encode(buf, &self.replica_nodes)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.replica_nodes)?;
        }
        if version >= 9 {
            types::CompactArray(types::Int32).encode(buf, &self.isr_nodes)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.isr_nodes)?;
        }
        if version >= 5 {
            if version >= 9 {
                types::CompactArray(types::Int32).encode(buf, &self.offline_replicas)?;
            } else {
                types::Array(types::Int32).encode(buf, &self.offline_replicas)?;
            }
        }
        if version >= 9 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // The cast is lossless: the count was just checked against `u32::MAX`.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes [`encode`](Encodable::encode) writes for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`, including an unsupported `version`, so
    /// that a size is never reported for an entry that cannot be encoded.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Mirror `encode`: reject versions this message type does not support.
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let mut total_size = 0;
        total_size += types::Int16.compute_size(&self.error_code)?;
        total_size += types::Int32.compute_size(&self.partition_index)?;
        total_size += types::Int32.compute_size(&self.leader_id)?;
        if version >= 7 {
            total_size += types::Int32.compute_size(&self.leader_epoch)?;
        }
        if version >= 9 {
            total_size += types::CompactArray(types::Int32).compute_size(&self.replica_nodes)?;
        } else {
            total_size += types::Array(types::Int32).compute_size(&self.replica_nodes)?;
        }
        if version >= 9 {
            total_size += types::CompactArray(types::Int32).compute_size(&self.isr_nodes)?;
        } else {
            total_size += types::Array(types::Int32).compute_size(&self.isr_nodes)?;
        }
        if version >= 5 {
            if version >= 9 {
                total_size +=
                    types::CompactArray(types::Int32).compute_size(&self.offline_replicas)?;
            } else {
                total_size += types::Array(types::Int32).compute_size(&self.offline_replicas)?;
            }
        }
        if version >= 9 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // The cast is lossless: the count was just checked against `u32::MAX`.
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
711
#[cfg(feature = "client")]
impl Decodable for MetadataResponsePartition {
    /// Reads a partition entry from `buf` using the wire layout of `version`.
    ///
    /// `leader_epoch` is `-1` below version 7 and `offline_replicas` is empty below
    /// version 5, since those versions do not carry the fields. Unknown tagged fields
    /// (version 9 and later) are kept verbatim.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-12 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 9 on, the compact encodings and the tagged-field trailer are used.
        let flexible = version >= 9;

        let error_code = types::Int16.decode(buf)?;
        let partition_index = types::Int32.decode(buf)?;
        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = match version {
            7..=12 => types::Int32.decode(buf)?,
            _ => -1,
        };
        let replica_nodes = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let isr_nodes = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let offline_replicas = match version {
            9..=12 => types::CompactArray(types::Int32).decode(buf)?,
            5..=8 => types::Array(types::Int32).decode(buf)?,
            _ => Default::default(),
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is: tag, payload length, then the raw payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            error_code,
            partition_index,
            leader_id,
            leader_epoch,
            replica_nodes,
            isr_nodes,
            offline_replicas,
            unknown_tagged_fields,
        })
    }
}
767
768impl Default for MetadataResponsePartition {
769    fn default() -> Self {
770        Self {
771            error_code: 0,
772            partition_index: 0,
773            leader_id: (0).into(),
774            leader_epoch: -1,
775            replica_nodes: Default::default(),
776            isr_nodes: Default::default(),
777            offline_replicas: Default::default(),
778            unknown_tagged_fields: BTreeMap::new(),
779        }
780    }
781}
782
783impl Message for MetadataResponsePartition {
784    const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
785    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
786}
787
788/// Valid versions: 0-12
789#[non_exhaustive]
790#[derive(Debug, Clone, PartialEq)]
791pub struct MetadataResponseTopic {
792    /// The topic error, or 0 if there was no error.
793    ///
794    /// Supported API versions: 0-12
795    pub error_code: i16,
796
797    /// The topic name. Null for non-existing topics queried by ID. This is never null when ErrorCode is zero. One of Name and TopicId is always populated.
798    ///
799    /// Supported API versions: 0-12
800    pub name: Option<super::TopicName>,
801
802    /// The topic id. Zero for non-existing topics queried by name. This is never zero when ErrorCode is zero. One of Name and TopicId is always populated.
803    ///
804    /// Supported API versions: 10-12
805    pub topic_id: Uuid,
806
807    /// True if the topic is internal.
808    ///
809    /// Supported API versions: 1-12
810    pub is_internal: bool,
811
812    /// Each partition in the topic.
813    ///
814    /// Supported API versions: 0-12
815    pub partitions: Vec<MetadataResponsePartition>,
816
817    /// 32-bit bitfield to represent authorized operations for this topic.
818    ///
819    /// Supported API versions: 8-12
820    pub topic_authorized_operations: i32,
821
822    /// Other tagged fields
823    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
824}
825
826impl MetadataResponseTopic {
827    /// Sets `error_code` to the passed value.
828    ///
829    /// The topic error, or 0 if there was no error.
830    ///
831    /// Supported API versions: 0-12
832    pub fn with_error_code(mut self, value: i16) -> Self {
833        self.error_code = value;
834        self
835    }
836    /// Sets `name` to the passed value.
837    ///
838    /// The topic name. Null for non-existing topics queried by ID. This is never null when ErrorCode is zero. One of Name and TopicId is always populated.
839    ///
840    /// Supported API versions: 0-12
841    pub fn with_name(mut self, value: Option<super::TopicName>) -> Self {
842        self.name = value;
843        self
844    }
845    /// Sets `topic_id` to the passed value.
846    ///
847    /// The topic id. Zero for non-existing topics queried by name. This is never zero when ErrorCode is zero. One of Name and TopicId is always populated.
848    ///
849    /// Supported API versions: 10-12
850    pub fn with_topic_id(mut self, value: Uuid) -> Self {
851        self.topic_id = value;
852        self
853    }
854    /// Sets `is_internal` to the passed value.
855    ///
856    /// True if the topic is internal.
857    ///
858    /// Supported API versions: 1-12
859    pub fn with_is_internal(mut self, value: bool) -> Self {
860        self.is_internal = value;
861        self
862    }
863    /// Sets `partitions` to the passed value.
864    ///
865    /// Each partition in the topic.
866    ///
867    /// Supported API versions: 0-12
868    pub fn with_partitions(mut self, value: Vec<MetadataResponsePartition>) -> Self {
869        self.partitions = value;
870        self
871    }
872    /// Sets `topic_authorized_operations` to the passed value.
873    ///
874    /// 32-bit bitfield to represent authorized operations for this topic.
875    ///
876    /// Supported API versions: 8-12
877    pub fn with_topic_authorized_operations(mut self, value: i32) -> Self {
878        self.topic_authorized_operations = value;
879        self
880    }
881    /// Sets unknown_tagged_fields to the passed value.
882    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
883        self.unknown_tagged_fields = value;
884        self
885    }
886    /// Inserts an entry into unknown_tagged_fields.
887    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
888        self.unknown_tagged_fields.insert(key, value);
889        self
890    }
891}
892
#[cfg(feature = "broker")]
impl Encodable for MetadataResponseTopic {
    /// Writes this topic entry to `buf` using the layout selected by `version`.
    ///
    /// Fields are emitted in this order: `error_code`, `name`, `topic_id` (version 10
    /// and up), `is_internal` (version 1 and up), `partitions`,
    /// `topic_authorized_operations` (version 8 and up), and finally the unknown tagged
    /// fields (version 9 and up).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=12`, when `topic_authorized_operations`
    /// differs from `i32::MIN` while `version` is below 8, when there are more than
    /// `u32::MAX` unknown tagged fields, or when any individual field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 9 on, the compact string/array encodings are used and the
        // tagged-field section is written.
        let compact = version >= 9;

        types::Int16.encode(buf, &self.error_code)?;
        if compact {
            types::CompactString.encode(buf, &self.name)?;
        } else {
            types::String.encode(buf, &self.name)?;
        }
        if version >= 10 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }
        if version >= 1 {
            types::Boolean.encode(buf, &self.is_internal)?;
        }
        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }
        if version >= 8 {
            types::Int32.encode(buf, &self.topic_authorized_operations)?;
        } else if self.topic_authorized_operations != i32::MIN {
            // Any value other than `i32::MIN` (the value `Default` assigns) cannot be
            // written below version 8, so it is rejected instead of silently dropped.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// The same per-field version gates as in `encode` apply. Unlike `encode`, this
    /// method does not reject a `version` outside `0..=12`.
    ///
    /// # Errors
    ///
    /// Fails when `topic_authorized_operations` differs from `i32::MIN` while `version`
    /// is below 8, when there are more than `u32::MAX` unknown tagged fields, or when
    /// any individual size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 9;

        let mut size = 0;
        size += types::Int16.compute_size(&self.error_code)?;
        size += if compact {
            types::CompactString.compute_size(&self.name)?
        } else {
            types::String.compute_size(&self.name)?
        };
        if version >= 10 {
            size += types::Uuid.compute_size(&self.topic_id)?;
        }
        if version >= 1 {
            size += types::Boolean.compute_size(&self.is_internal)?;
        }
        size += if compact {
            types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.partitions)?
        };
        if version >= 8 {
            size += types::Int32.compute_size(&self.topic_authorized_operations)?;
        } else if self.topic_authorized_operations != i32::MIN {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
979
#[cfg(feature = "client")]
impl Decodable for MetadataResponseTopic {
    /// Reads one topic entry from `buf` using the layout selected by `version`.
    ///
    /// Fields that are not read for the given `version` get fixed fallback values:
    /// `topic_id` becomes `Uuid::nil()` below version 10, `is_internal` becomes `false`
    /// below version 1, and `topic_authorized_operations` becomes `i32::MIN` below
    /// version 8. Tagged fields are only read for version 9 and up; each one is stored
    /// as raw bytes in `unknown_tagged_fields`, keyed by its tag cast to `i32`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=12` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=12).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 9 on, the compact string/array encodings are used and a
        // tagged-field section follows the regular fields.
        let compact = version >= 9;

        let error_code = types::Int16.decode(buf)?;
        let name = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let topic_id = if version >= 10 {
            types::Uuid.decode(buf)?
        } else {
            Uuid::nil()
        };
        let is_internal = if version >= 1 {
            types::Boolean.decode(buf)?
        } else {
            false
        };
        let partitions = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let topic_authorized_operations = if version >= 8 {
            types::Int32.decode(buf)?
        } else {
            i32::MIN
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each entry is a tag, a byte length, then that many payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(payload_len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            error_code,
            name,
            topic_id,
            is_internal,
            partitions,
            topic_authorized_operations,
            unknown_tagged_fields,
        })
    }
}
1033
1034impl Default for MetadataResponseTopic {
1035    fn default() -> Self {
1036        Self {
1037            error_code: 0,
1038            name: Some(Default::default()),
1039            topic_id: Uuid::nil(),
1040            is_internal: false,
1041            partitions: Default::default(),
1042            topic_authorized_operations: -2147483648,
1043            unknown_tagged_fields: BTreeMap::new(),
1044        }
1045    }
1046}
1047
1048impl Message for MetadataResponseTopic {
1049    const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
1050    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1051}
1052
1053impl HeaderVersion for MetadataResponse {
1054    fn header_version(version: i16) -> i16 {
1055        if version >= 9 {
1056            1
1057        } else {
1058            0
1059        }
1060    }
1061}