kafka_protocol/messages/
update_metadata_request.rs

1//! UpdateMetadataRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/UpdateMetadataRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-8
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct UpdateMetadataBroker {
24    /// The broker id.
25    ///
26    /// Supported API versions: 0-8
27    pub id: super::BrokerId,
28
29    /// The broker hostname.
30    ///
31    /// Supported API versions: 0
32    pub v0_host: StrBytes,
33
34    /// The broker port.
35    ///
36    /// Supported API versions: 0
37    pub v0_port: i32,
38
39    /// The broker endpoints.
40    ///
41    /// Supported API versions: 1-8
42    pub endpoints: Vec<UpdateMetadataEndpoint>,
43
44    /// The rack which this broker belongs to.
45    ///
46    /// Supported API versions: 2-8
47    pub rack: Option<StrBytes>,
48
49    /// Other tagged fields
50    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl UpdateMetadataBroker {
54    /// Sets `id` to the passed value.
55    ///
56    /// The broker id.
57    ///
58    /// Supported API versions: 0-8
59    pub fn with_id(mut self, value: super::BrokerId) -> Self {
60        self.id = value;
61        self
62    }
63    /// Sets `v0_host` to the passed value.
64    ///
65    /// The broker hostname.
66    ///
67    /// Supported API versions: 0
68    pub fn with_v0_host(mut self, value: StrBytes) -> Self {
69        self.v0_host = value;
70        self
71    }
72    /// Sets `v0_port` to the passed value.
73    ///
74    /// The broker port.
75    ///
76    /// Supported API versions: 0
77    pub fn with_v0_port(mut self, value: i32) -> Self {
78        self.v0_port = value;
79        self
80    }
81    /// Sets `endpoints` to the passed value.
82    ///
83    /// The broker endpoints.
84    ///
85    /// Supported API versions: 1-8
86    pub fn with_endpoints(mut self, value: Vec<UpdateMetadataEndpoint>) -> Self {
87        self.endpoints = value;
88        self
89    }
90    /// Sets `rack` to the passed value.
91    ///
92    /// The rack which this broker belongs to.
93    ///
94    /// Supported API versions: 2-8
95    pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
96        self.rack = value;
97        self
98    }
99    /// Sets unknown_tagged_fields to the passed value.
100    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101        self.unknown_tagged_fields = value;
102        self
103    }
104    /// Inserts an entry into unknown_tagged_fields.
105    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106        self.unknown_tagged_fields.insert(key, value);
107        self
108    }
109}
110
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataBroker {
    /// Writes this broker entry to `buf` using the layout of `version`.
    ///
    /// Field order on the wire: `id`, then (version 0 only) `v0_host` and `v0_port`,
    /// then `endpoints` (version 1+), `rack` (version 2+) and, from version 6 on,
    /// the tagged-field section. Version 6+ uses the compact encodings.
    ///
    /// # Errors
    ///
    /// Propagates any encoder error and fails if there are more tagged fields
    /// than fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 6 on, compact types and tagged fields are used.
        let flexible = version >= 6;

        types::Int32.encode(buf, &self.id)?;
        if version == 0 {
            types::String.encode(buf, &self.v0_host)?;
            types::Int32.encode(buf, &self.v0_port)?;
        }
        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.endpoints)?;
        } else if version >= 1 {
            types::Array(types::Struct { version }).encode(buf, &self.endpoints)?;
        }
        if flexible {
            types::CompactString.encode(buf, &self.rack)?;
        } else if version >= 2 {
            types::String.encode(buf, &self.rack)?;
        }
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Mirrors `encode` field by field, including the tagged-field count check.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;

        let mut size = types::Int32.compute_size(&self.id)?;
        if version == 0 {
            size += types::String.compute_size(&self.v0_host)?;
            size += types::Int32.compute_size(&self.v0_port)?;
        }
        if flexible {
            size += types::CompactArray(types::Struct { version }).compute_size(&self.endpoints)?;
        } else if version >= 1 {
            size += types::Array(types::Struct { version }).compute_size(&self.endpoints)?;
        }
        if flexible {
            size += types::CompactString.compute_size(&self.rack)?;
        } else if version >= 2 {
            size += types::String.compute_size(&self.rack)?;
        }
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
189
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataBroker {
    /// Reads a broker entry from `buf` using the layout of `version`.
    ///
    /// Fields that are absent in `version` receive the same values the `Default`
    /// implementation uses (`rack` becomes `Some` of an empty string).
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 6 on, compact types and tagged fields are used.
        let flexible = version >= 6;

        let id = types::Int32.decode(buf)?;
        // Host and port sit directly on the broker only in version 0.
        let (v0_host, v0_port) = if version == 0 {
            (types::String.decode(buf)?, types::Int32.decode(buf)?)
        } else {
            (Default::default(), 0)
        };
        let endpoints = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 1 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        let rack = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 2 {
            types::String.decode(buf)?
        } else {
            Some(Default::default())
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                // Each tagged field is kept as raw bytes under its tag.
                unknown_tagged_fields.insert(tag as i32, buf.try_get_bytes(len as usize)?);
            }
        }

        Ok(Self {
            id,
            v0_host,
            v0_port,
            endpoints,
            rack,
            unknown_tagged_fields,
        })
    }
}
242
243impl Default for UpdateMetadataBroker {
244    fn default() -> Self {
245        Self {
246            id: (0).into(),
247            v0_host: Default::default(),
248            v0_port: 0,
249            endpoints: Default::default(),
250            rack: Some(Default::default()),
251            unknown_tagged_fields: BTreeMap::new(),
252        }
253    }
254}
255
256impl Message for UpdateMetadataBroker {
257    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
258    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
259}
260
261/// Valid versions: 0-8
262#[non_exhaustive]
263#[derive(Debug, Clone, PartialEq)]
264pub struct UpdateMetadataEndpoint {
265    /// The port of this endpoint
266    ///
267    /// Supported API versions: 1-8
268    pub port: i32,
269
270    /// The hostname of this endpoint
271    ///
272    /// Supported API versions: 1-8
273    pub host: StrBytes,
274
275    /// The listener name.
276    ///
277    /// Supported API versions: 3-8
278    pub listener: StrBytes,
279
280    /// The security protocol type.
281    ///
282    /// Supported API versions: 1-8
283    pub security_protocol: i16,
284
285    /// Other tagged fields
286    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
287}
288
289impl UpdateMetadataEndpoint {
290    /// Sets `port` to the passed value.
291    ///
292    /// The port of this endpoint
293    ///
294    /// Supported API versions: 1-8
295    pub fn with_port(mut self, value: i32) -> Self {
296        self.port = value;
297        self
298    }
299    /// Sets `host` to the passed value.
300    ///
301    /// The hostname of this endpoint
302    ///
303    /// Supported API versions: 1-8
304    pub fn with_host(mut self, value: StrBytes) -> Self {
305        self.host = value;
306        self
307    }
308    /// Sets `listener` to the passed value.
309    ///
310    /// The listener name.
311    ///
312    /// Supported API versions: 3-8
313    pub fn with_listener(mut self, value: StrBytes) -> Self {
314        self.listener = value;
315        self
316    }
317    /// Sets `security_protocol` to the passed value.
318    ///
319    /// The security protocol type.
320    ///
321    /// Supported API versions: 1-8
322    pub fn with_security_protocol(mut self, value: i16) -> Self {
323        self.security_protocol = value;
324        self
325    }
326    /// Sets unknown_tagged_fields to the passed value.
327    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
328        self.unknown_tagged_fields = value;
329        self
330    }
331    /// Inserts an entry into unknown_tagged_fields.
332    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
333        self.unknown_tagged_fields.insert(key, value);
334        self
335    }
336}
337
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataEndpoint {
    /// Writes this endpoint to `buf` using the layout of `version`.
    ///
    /// Field order on the wire: `port`, `host`, `listener` (version 3+),
    /// `security_protocol` and, from version 6 on, the tagged-field section.
    /// Below version 1 nothing is written.
    ///
    /// # Errors
    ///
    /// Fails below version 1 if `port`, `host` or `security_protocol` holds a
    /// non-default value, propagates any encoder error, and fails if there are
    /// more tagged fields than fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 1 {
            // No field of this struct exists on the wire below version 1;
            // only verify that none of the checked fields was populated.
            let any_field_set =
                self.port != 0 || !self.host.is_empty() || self.security_protocol != 0;
            if any_field_set {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        // From version 6 on, compact types and tagged fields are used.
        let flexible = version >= 6;

        types::Int32.encode(buf, &self.port)?;
        if flexible {
            types::CompactString.encode(buf, &self.host)?;
        } else {
            types::String.encode(buf, &self.host)?;
        }
        if flexible {
            types::CompactString.encode(buf, &self.listener)?;
        } else if version >= 3 {
            types::String.encode(buf, &self.listener)?;
        }
        types::Int16.encode(buf, &self.security_protocol)?;
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Applies the same checks as `encode`; below version 1 the size is 0.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            let any_field_set =
                self.port != 0 || !self.host.is_empty() || self.security_protocol != 0;
            if any_field_set {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }

        let flexible = version >= 6;

        let mut size = types::Int32.compute_size(&self.port)?;
        size += if flexible {
            types::CompactString.compute_size(&self.host)?
        } else {
            types::String.compute_size(&self.host)?
        };
        if flexible {
            size += types::CompactString.compute_size(&self.listener)?;
        } else if version >= 3 {
            size += types::String.compute_size(&self.listener)?;
        }
        size += types::Int16.compute_size(&self.security_protocol)?;
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
436
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataEndpoint {
    /// Reads an endpoint from `buf` using the layout of `version`.
    ///
    /// Fields that are absent in `version` are filled with zero / empty values.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 6 on, compact types and tagged fields are used.
        let flexible = version >= 6;
        let present = version >= 1;

        let port = if present {
            types::Int32.decode(buf)?
        } else {
            0
        };
        let host = if flexible {
            types::CompactString.decode(buf)?
        } else if present {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let listener = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 3 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let security_protocol = if present {
            types::Int16.decode(buf)?
        } else {
            0
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                // Each tagged field is kept as raw bytes under its tag.
                unknown_tagged_fields.insert(tag as i32, buf.try_get_bytes(len as usize)?);
            }
        }

        Ok(Self {
            port,
            host,
            listener,
            security_protocol,
            unknown_tagged_fields,
        })
    }
}
487
488impl Default for UpdateMetadataEndpoint {
489    fn default() -> Self {
490        Self {
491            port: 0,
492            host: Default::default(),
493            listener: Default::default(),
494            security_protocol: 0,
495            unknown_tagged_fields: BTreeMap::new(),
496        }
497    }
498}
499
500impl Message for UpdateMetadataEndpoint {
501    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
502    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
503}
504
505/// Valid versions: 0-8
506#[non_exhaustive]
507#[derive(Debug, Clone, PartialEq)]
508pub struct UpdateMetadataPartitionState {
509    /// In older versions of this RPC, the topic name.
510    ///
511    /// Supported API versions: 0-4
512    pub topic_name: super::TopicName,
513
514    /// The partition index.
515    ///
516    /// Supported API versions: 0-8
517    pub partition_index: i32,
518
519    /// The controller epoch.
520    ///
521    /// Supported API versions: 0-8
522    pub controller_epoch: i32,
523
524    /// The ID of the broker which is the current partition leader.
525    ///
526    /// Supported API versions: 0-8
527    pub leader: super::BrokerId,
528
529    /// The leader epoch of this partition.
530    ///
531    /// Supported API versions: 0-8
532    pub leader_epoch: i32,
533
534    /// The brokers which are in the ISR for this partition.
535    ///
536    /// Supported API versions: 0-8
537    pub isr: Vec<super::BrokerId>,
538
539    /// The Zookeeper version.
540    ///
541    /// Supported API versions: 0-8
542    pub zk_version: i32,
543
544    /// All the replicas of this partition.
545    ///
546    /// Supported API versions: 0-8
547    pub replicas: Vec<super::BrokerId>,
548
549    /// The replicas of this partition which are offline.
550    ///
551    /// Supported API versions: 4-8
552    pub offline_replicas: Vec<super::BrokerId>,
553
554    /// Other tagged fields
555    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
556}
557
558impl UpdateMetadataPartitionState {
559    /// Sets `topic_name` to the passed value.
560    ///
561    /// In older versions of this RPC, the topic name.
562    ///
563    /// Supported API versions: 0-4
564    pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
565        self.topic_name = value;
566        self
567    }
568    /// Sets `partition_index` to the passed value.
569    ///
570    /// The partition index.
571    ///
572    /// Supported API versions: 0-8
573    pub fn with_partition_index(mut self, value: i32) -> Self {
574        self.partition_index = value;
575        self
576    }
577    /// Sets `controller_epoch` to the passed value.
578    ///
579    /// The controller epoch.
580    ///
581    /// Supported API versions: 0-8
582    pub fn with_controller_epoch(mut self, value: i32) -> Self {
583        self.controller_epoch = value;
584        self
585    }
586    /// Sets `leader` to the passed value.
587    ///
588    /// The ID of the broker which is the current partition leader.
589    ///
590    /// Supported API versions: 0-8
591    pub fn with_leader(mut self, value: super::BrokerId) -> Self {
592        self.leader = value;
593        self
594    }
595    /// Sets `leader_epoch` to the passed value.
596    ///
597    /// The leader epoch of this partition.
598    ///
599    /// Supported API versions: 0-8
600    pub fn with_leader_epoch(mut self, value: i32) -> Self {
601        self.leader_epoch = value;
602        self
603    }
604    /// Sets `isr` to the passed value.
605    ///
606    /// The brokers which are in the ISR for this partition.
607    ///
608    /// Supported API versions: 0-8
609    pub fn with_isr(mut self, value: Vec<super::BrokerId>) -> Self {
610        self.isr = value;
611        self
612    }
613    /// Sets `zk_version` to the passed value.
614    ///
615    /// The Zookeeper version.
616    ///
617    /// Supported API versions: 0-8
618    pub fn with_zk_version(mut self, value: i32) -> Self {
619        self.zk_version = value;
620        self
621    }
622    /// Sets `replicas` to the passed value.
623    ///
624    /// All the replicas of this partition.
625    ///
626    /// Supported API versions: 0-8
627    pub fn with_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
628        self.replicas = value;
629        self
630    }
631    /// Sets `offline_replicas` to the passed value.
632    ///
633    /// The replicas of this partition which are offline.
634    ///
635    /// Supported API versions: 4-8
636    pub fn with_offline_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
637        self.offline_replicas = value;
638        self
639    }
640    /// Sets unknown_tagged_fields to the passed value.
641    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
642        self.unknown_tagged_fields = value;
643        self
644    }
645    /// Inserts an entry into unknown_tagged_fields.
646    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
647        self.unknown_tagged_fields.insert(key, value);
648        self
649    }
650}
651
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataPartitionState {
    /// Writes this partition state to `buf` using the layout of `version`.
    ///
    /// Field order on the wire: `topic_name` (version 4 and below),
    /// `partition_index`, `controller_epoch`, `leader`, `leader_epoch`, `isr`,
    /// `zk_version`, `replicas`, `offline_replicas` (version 4+) and, from
    /// version 6 on, the tagged-field section. Version 6+ uses compact arrays.
    ///
    /// # Errors
    ///
    /// Propagates any encoder error and fails if there are more tagged fields
    /// than fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 6 on, compact types and tagged fields are used.
        let flexible = version >= 6;

        if version <= 4 {
            types::String.encode(buf, &self.topic_name)?;
        }
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.controller_epoch)?;
        types::Int32.encode(buf, &self.leader)?;
        types::Int32.encode(buf, &self.leader_epoch)?;
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.isr)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.isr)?;
        }
        types::Int32.encode(buf, &self.zk_version)?;
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.replicas)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.replicas)?;
        }
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.offline_replicas)?;
        } else if version >= 4 {
            types::Array(types::Int32).encode(buf, &self.offline_replicas)?;
        }
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Mirrors `encode` field by field, including the tagged-field count check.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;

        let mut size = 0;
        if version <= 4 {
            size += types::String.compute_size(&self.topic_name)?;
        }
        size += types::Int32.compute_size(&self.partition_index)?;
        size += types::Int32.compute_size(&self.controller_epoch)?;
        size += types::Int32.compute_size(&self.leader)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;
        size += if flexible {
            types::CompactArray(types::Int32).compute_size(&self.isr)?
        } else {
            types::Array(types::Int32).compute_size(&self.isr)?
        };
        size += types::Int32.compute_size(&self.zk_version)?;
        size += if flexible {
            types::CompactArray(types::Int32).compute_size(&self.replicas)?
        } else {
            types::Array(types::Int32).compute_size(&self.replicas)?
        };
        if flexible {
            size += types::CompactArray(types::Int32).compute_size(&self.offline_replicas)?;
        } else if version >= 4 {
            size += types::Array(types::Int32).compute_size(&self.offline_replicas)?;
        }
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
737
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataPartitionState {
    /// Reads a partition state from `buf` using the layout of `version`.
    ///
    /// `topic_name` is only read for version 4 and below, `offline_replicas`
    /// only for version 4 and above; otherwise they take their default values.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 6 on, compact types and tagged fields are used.
        let flexible = version >= 6;

        let topic_name = if version <= 4 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let partition_index = types::Int32.decode(buf)?;
        let controller_epoch = types::Int32.decode(buf)?;
        let leader = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;
        let isr = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let zk_version = types::Int32.decode(buf)?;
        let replicas = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let offline_replicas = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else if version >= 4 {
            types::Array(types::Int32).decode(buf)?
        } else {
            Default::default()
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                // Each tagged field is kept as raw bytes under its tag.
                unknown_tagged_fields.insert(tag as i32, buf.try_get_bytes(len as usize)?);
            }
        }

        Ok(Self {
            topic_name,
            partition_index,
            controller_epoch,
            leader,
            leader_epoch,
            isr,
            zk_version,
            replicas,
            offline_replicas,
            unknown_tagged_fields,
        })
    }
}
794
795impl Default for UpdateMetadataPartitionState {
796    fn default() -> Self {
797        Self {
798            topic_name: Default::default(),
799            partition_index: 0,
800            controller_epoch: 0,
801            leader: (0).into(),
802            leader_epoch: 0,
803            isr: Default::default(),
804            zk_version: 0,
805            replicas: Default::default(),
806            offline_replicas: Default::default(),
807            unknown_tagged_fields: BTreeMap::new(),
808        }
809    }
810}
811
812impl Message for UpdateMetadataPartitionState {
813    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
814    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
815}
816
817/// Valid versions: 0-8
818#[non_exhaustive]
819#[derive(Debug, Clone, PartialEq)]
820pub struct UpdateMetadataRequest {
821    /// The controller id.
822    ///
823    /// Supported API versions: 0-8
824    pub controller_id: super::BrokerId,
825
826    /// If KRaft controller id is used during migration. See KIP-866
827    ///
828    /// Supported API versions: 8
829    pub is_k_raft_controller: bool,
830
831    /// Indicates if this request is a Full metadata snapshot (2), Incremental (1), or Unknown (0). Using during ZK migration, see KIP-866
832    ///
833    /// Supported API versions: 8
834    pub _type: i8,
835
836    /// The controller epoch.
837    ///
838    /// Supported API versions: 0-8
839    pub controller_epoch: i32,
840
841    /// The broker epoch.
842    ///
843    /// Supported API versions: 5-8
844    pub broker_epoch: i64,
845
846    /// In older versions of this RPC, each partition that we would like to update.
847    ///
848    /// Supported API versions: 0-4
849    pub ungrouped_partition_states: Vec<UpdateMetadataPartitionState>,
850
851    /// In newer versions of this RPC, each topic that we would like to update.
852    ///
853    /// Supported API versions: 5-8
854    pub topic_states: Vec<UpdateMetadataTopicState>,
855
856    ///
857    ///
858    /// Supported API versions: 0-8
859    pub live_brokers: Vec<UpdateMetadataBroker>,
860
861    /// Other tagged fields
862    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
863}
864
865impl UpdateMetadataRequest {
866    /// Sets `controller_id` to the passed value.
867    ///
868    /// The controller id.
869    ///
870    /// Supported API versions: 0-8
871    pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
872        self.controller_id = value;
873        self
874    }
875    /// Sets `is_k_raft_controller` to the passed value.
876    ///
877    /// If KRaft controller id is used during migration. See KIP-866
878    ///
879    /// Supported API versions: 8
880    pub fn with_is_k_raft_controller(mut self, value: bool) -> Self {
881        self.is_k_raft_controller = value;
882        self
883    }
884    /// Sets `_type` to the passed value.
885    ///
886    /// Indicates if this request is a Full metadata snapshot (2), Incremental (1), or Unknown (0). Using during ZK migration, see KIP-866
887    ///
888    /// Supported API versions: 8
889    pub fn with_type(mut self, value: i8) -> Self {
890        self._type = value;
891        self
892    }
893    /// Sets `controller_epoch` to the passed value.
894    ///
895    /// The controller epoch.
896    ///
897    /// Supported API versions: 0-8
898    pub fn with_controller_epoch(mut self, value: i32) -> Self {
899        self.controller_epoch = value;
900        self
901    }
902    /// Sets `broker_epoch` to the passed value.
903    ///
904    /// The broker epoch.
905    ///
906    /// Supported API versions: 5-8
907    pub fn with_broker_epoch(mut self, value: i64) -> Self {
908        self.broker_epoch = value;
909        self
910    }
911    /// Sets `ungrouped_partition_states` to the passed value.
912    ///
913    /// In older versions of this RPC, each partition that we would like to update.
914    ///
915    /// Supported API versions: 0-4
916    pub fn with_ungrouped_partition_states(
917        mut self,
918        value: Vec<UpdateMetadataPartitionState>,
919    ) -> Self {
920        self.ungrouped_partition_states = value;
921        self
922    }
923    /// Sets `topic_states` to the passed value.
924    ///
925    /// In newer versions of this RPC, each topic that we would like to update.
926    ///
927    /// Supported API versions: 5-8
928    pub fn with_topic_states(mut self, value: Vec<UpdateMetadataTopicState>) -> Self {
929        self.topic_states = value;
930        self
931    }
932    /// Sets `live_brokers` to the passed value.
933    ///
934    ///
935    ///
936    /// Supported API versions: 0-8
937    pub fn with_live_brokers(mut self, value: Vec<UpdateMetadataBroker>) -> Self {
938        self.live_brokers = value;
939        self
940    }
941    /// Sets unknown_tagged_fields to the passed value.
942    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
943        self.unknown_tagged_fields = value;
944        self
945    }
946    /// Inserts an entry into unknown_tagged_fields.
947    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
948        self.unknown_tagged_fields.insert(key, value);
949        self
950    }
951}
952
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataRequest {
    /// Serializes the request into `buf` using the layout of protocol `version`.
    ///
    /// Fields are written in a fixed order: controller id, the KRaft flag (8+), controller
    /// epoch, broker epoch (5+), ungrouped partition states (0-4), topic states (5+), live
    /// brokers, and finally the tagged-field section (6+).
    ///
    /// # Errors
    ///
    /// Fails if `is_k_raft_controller`, `ungrouped_partition_states` or `topic_states` holds a
    /// non-default value for a `version` that has no slot for it, if the tagged-field count or
    /// a tagged payload does not fit in a `u32`, or if any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.controller_id)?;
        if version >= 8 {
            types::Boolean.encode(buf, &self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }
        types::Int32.encode(buf, &self.controller_epoch)?;
        if version >= 5 {
            types::Int64.encode(buf, &self.broker_epoch)?;
        }
        if version <= 4 {
            types::Array(types::Struct { version })
                .encode(buf, &self.ungrouped_partition_states)?;
        } else if !self.ungrouped_partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 6 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topic_states)?;
        } else if version >= 5 {
            types::Array(types::Struct { version }).encode(buf, &self.topic_states)?;
        } else if !self.topic_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 6 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.live_brokers)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.live_brokers)?;
        }

        // Versions before 6 carry no tagged-field section at all.
        if version < 6 {
            return Ok(());
        }

        // `_type` travels as tagged field 0, and only when it differs from its default of 0.
        let emit_type_tag = version >= 8 && self._type != 0;
        let tagged_field_count = self.unknown_tagged_fields.len() + usize::from(emit_type_tag);
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        if emit_type_tag {
            let payload_size = types::Int8.compute_size(&self._type)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            // Tag number, then payload length, then the payload itself.
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, payload_size as u32)?;
            types::Int8.encode(buf, &self._type)?;
        }
        // Tag 0 is taken by `_type`, so unknown tags are written from the range `1..`.
        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the size of this request for protocol `version` without writing anything.
    ///
    /// Mirrors `encode` step by step, summing the size of each part instead of emitting it,
    /// and fails under the same conditions.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.controller_id)?;
        if version >= 8 {
            size += types::Boolean.compute_size(&self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }
        size += types::Int32.compute_size(&self.controller_epoch)?;
        if version >= 5 {
            size += types::Int64.compute_size(&self.broker_epoch)?;
        }
        if version <= 4 {
            size += types::Array(types::Struct { version })
                .compute_size(&self.ungrouped_partition_states)?;
        } else if !self.ungrouped_partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 6 {
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topic_states)?;
        } else if version >= 5 {
            size += types::Array(types::Struct { version }).compute_size(&self.topic_states)?;
        } else if !self.topic_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 6 {
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.live_brokers)?;
        } else {
            size += types::Array(types::Struct { version }).compute_size(&self.live_brokers)?;
        }

        // Versions before 6 carry no tagged-field section at all.
        if version < 6 {
            return Ok(size);
        }

        let emit_type_tag = version >= 8 && self._type != 0;
        let tagged_field_count = self.unknown_tagged_fields.len() + usize::from(emit_type_tag);
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        if emit_type_tag {
            let payload_size = types::Int8.compute_size(&self._type)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            // Tag number, payload length, payload.
            size += types::UnsignedVarInt.compute_size(0)?;
            size += types::UnsignedVarInt.compute_size(payload_size as u32)?;
            size += payload_size;
        }
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
1099
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataRequest {
    /// Parses an `UpdateMetadataRequest` from `buf` using the layout of protocol `version`.
    ///
    /// Fields the given `version` does not carry take their defaults:
    /// `is_k_raft_controller = false` (before 8), `broker_epoch = -1` (before 5), empty
    /// `ungrouped_partition_states` (after 4), empty `topic_states` (before 5) and
    /// `_type = 0` (before 8, or when tagged field 0 is absent).
    ///
    /// From version 6 on, a tagged-field section follows the regular fields. Tag 0 is decoded
    /// into `_type`; every other tag is stored as raw bytes in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails if any nested decoder fails, if tag 0 shows up in a version below 8, or if tag 0
    /// declares a payload size other than the single byte an Int8 occupies.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Wire size of the Int8 payload carried by tagged field 0 (`_type`).
        const TYPE_PAYLOAD_SIZE: u32 = 1;

        let controller_id = types::Int32.decode(buf)?;
        let is_k_raft_controller = if version >= 8 {
            types::Boolean.decode(buf)?
        } else {
            false
        };
        // Only ever overwritten from the tagged-field section below.
        let mut _type = 0;
        let controller_epoch = types::Int32.decode(buf)?;
        let broker_epoch = if version >= 5 {
            types::Int64.decode(buf)?
        } else {
            -1
        };
        let ungrouped_partition_states = if version <= 4 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        let topic_states = if version >= 6 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 5 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        let live_brokers = if version >= 6 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..num_tagged_fields {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    0 => {
                        if version < 8 {
                            bail!("Tag {} is not valid for version {}", tag, version);
                        }
                        // The payload is read as a fixed-width Int8. If the declared size
                        // disagrees, consuming one byte would leave the buffer misaligned for
                        // every following tagged field, so reject the message instead.
                        if size != TYPE_PAYLOAD_SIZE {
                            bail!(
                                "Tagged field {} has an invalid size ({} bytes, expected {})",
                                tag,
                                size,
                                TYPE_PAYLOAD_SIZE
                            );
                        }
                        _type = types::Int8.decode(buf)?;
                    }
                    _ => {
                        // Unrecognized tag: keep the raw payload, keyed by tag number.
                        let unknown_value = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, unknown_value);
                    }
                }
            }
        }
        Ok(Self {
            controller_id,
            is_k_raft_controller,
            _type,
            controller_epoch,
            broker_epoch,
            ungrouped_partition_states,
            topic_states,
            live_brokers,
            unknown_tagged_fields,
        })
    }
}
1169
1170impl Default for UpdateMetadataRequest {
1171    fn default() -> Self {
1172        Self {
1173            controller_id: (0).into(),
1174            is_k_raft_controller: false,
1175            _type: 0,
1176            controller_epoch: 0,
1177            broker_epoch: -1,
1178            ungrouped_partition_states: Default::default(),
1179            topic_states: Default::default(),
1180            live_brokers: Default::default(),
1181            unknown_tagged_fields: BTreeMap::new(),
1182        }
1183    }
1184}
1185
1186impl Message for UpdateMetadataRequest {
1187    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
1188    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1189}
1190
/// Per-topic entry of an `UpdateMetadataRequest`, carried in its `topic_states` list.
///
/// Valid versions: 0-8
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMetadataTopicState {
    /// The topic name.
    ///
    /// Supported API versions: 5-8
    pub topic_name: super::TopicName,

    /// The topic id.
    ///
    /// Only present on the wire from version 7 on; decoding older versions yields the nil UUID.
    ///
    /// Supported API versions: 7-8
    pub topic_id: Uuid,

    /// The partition that we would like to update.
    ///
    /// Supported API versions: 5-8
    pub partition_states: Vec<UpdateMetadataPartitionState>,

    /// Other tagged fields: raw payloads keyed by tag number. This struct has no tagged
    /// fields of its own, so every tag found while decoding ends up here.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
1213
1214impl UpdateMetadataTopicState {
1215    /// Sets `topic_name` to the passed value.
1216    ///
1217    /// The topic name.
1218    ///
1219    /// Supported API versions: 5-8
1220    pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
1221        self.topic_name = value;
1222        self
1223    }
1224    /// Sets `topic_id` to the passed value.
1225    ///
1226    /// The topic id.
1227    ///
1228    /// Supported API versions: 7-8
1229    pub fn with_topic_id(mut self, value: Uuid) -> Self {
1230        self.topic_id = value;
1231        self
1232    }
1233    /// Sets `partition_states` to the passed value.
1234    ///
1235    /// The partition that we would like to update.
1236    ///
1237    /// Supported API versions: 5-8
1238    pub fn with_partition_states(mut self, value: Vec<UpdateMetadataPartitionState>) -> Self {
1239        self.partition_states = value;
1240        self
1241    }
1242    /// Sets unknown_tagged_fields to the passed value.
1243    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1244        self.unknown_tagged_fields = value;
1245        self
1246    }
1247    /// Inserts an entry into unknown_tagged_fields.
1248    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1249        self.unknown_tagged_fields.insert(key, value);
1250        self
1251    }
1252}
1253
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataTopicState {
    /// Serializes this topic state into `buf` using the layout of protocol `version`.
    ///
    /// Written in order: topic name (5+), topic id (7+), partition states (5+), and the
    /// tagged-field section (6+). Version 5 uses the plain string/array encodings, version 6
    /// and later the compact ones.
    ///
    /// # Errors
    ///
    /// Fails if `topic_name` or `partition_states` is non-empty for a `version` below 5, if
    /// there are more tagged fields than fit in a `u32`, or if any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 6 {
            types::CompactString.encode(buf, &self.topic_name)?;
        } else if version >= 5 {
            types::String.encode(buf, &self.topic_name)?;
        } else if !self.topic_name.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 7 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }
        if version >= 6 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.partition_states)?;
        } else if version >= 5 {
            types::Array(types::Struct { version }).encode(buf, &self.partition_states)?;
        } else if !self.partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions before 6 carry no tagged-field section at all.
        if version < 6 {
            return Ok(());
        }

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        // No tag is reserved by this struct, so unknown tags are written from the range `0..`.
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the size of this topic state for protocol `version` without writing anything.
    ///
    /// Mirrors `encode` step by step, summing the size of each part instead of emitting it,
    /// and fails under the same conditions.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 6 {
            size += types::CompactString.compute_size(&self.topic_name)?;
        } else if version >= 5 {
            size += types::String.compute_size(&self.topic_name)?;
        } else if !self.topic_name.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 7 {
            size += types::Uuid.compute_size(&self.topic_id)?;
        }
        if version >= 6 {
            size += types::CompactArray(types::Struct { version })
                .compute_size(&self.partition_states)?;
        } else if version >= 5 {
            size += types::Array(types::Struct { version }).compute_size(&self.partition_states)?;
        } else if !self.partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions before 6 carry no tagged-field section at all.
        if version < 6 {
            return Ok(size);
        }

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
1341
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataTopicState {
    /// Parses an `UpdateMetadataTopicState` from `buf` using the layout of protocol `version`.
    ///
    /// Fields the given `version` does not carry take their defaults: an empty topic name and
    /// no partition states before version 5, and the nil UUID as topic id before version 7.
    /// From version 6 on, every tagged field found is stored as raw bytes in
    /// `unknown_tagged_fields`, keyed by its tag number.
    ///
    /// # Errors
    ///
    /// Fails if any nested decoder or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let topic_name = if version >= 6 {
            types::CompactString.decode(buf)?
        } else if version >= 5 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };

        let topic_id = if version < 7 {
            Uuid::nil()
        } else {
            types::Uuid.decode(buf)?
        };

        let partition_states = if version >= 6 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 5 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each tagged field is laid out as tag number, payload length, payload.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(payload_len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic_name,
            topic_id,
            partition_states,
            unknown_tagged_fields,
        })
    }
}
1386
1387impl Default for UpdateMetadataTopicState {
1388    fn default() -> Self {
1389        Self {
1390            topic_name: Default::default(),
1391            topic_id: Uuid::nil(),
1392            partition_states: Default::default(),
1393            unknown_tagged_fields: BTreeMap::new(),
1394        }
1395    }
1396}
1397
1398impl Message for UpdateMetadataTopicState {
1399    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
1400    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1401}
1402
1403impl HeaderVersion for UpdateMetadataRequest {
1404    fn header_version(version: i16) -> i16 {
1405        if version >= 6 {
1406            2
1407        } else {
1408            1
1409        }
1410    }
1411}