kafka_protocol/messages/
leader_and_isr_request.rs

1//! LeaderAndIsrRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/LeaderAndIsrRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// One entry of the `live_leaders` list of a [`LeaderAndIsrRequest`]: the
/// broker ID, hostname and port of a live leader.
///
/// Valid versions: 0-7
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderAndIsrLiveLeader {
    /// The leader's broker ID.
    ///
    /// Supported API versions: 0-7
    pub broker_id: super::BrokerId,

    /// The leader's hostname.
    ///
    /// Encoded as a compact string from version 4 onwards and as a regular
    /// string before that.
    ///
    /// Supported API versions: 0-7
    pub host_name: StrBytes,

    /// The leader's port.
    ///
    /// Supported API versions: 0-7
    pub port: i32,

    /// Other tagged fields, keyed by tag number.
    ///
    /// Only read from / written to the wire for versions 4 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
42
43impl LeaderAndIsrLiveLeader {
44    /// Sets `broker_id` to the passed value.
45    ///
46    /// The leader's broker ID.
47    ///
48    /// Supported API versions: 0-7
49    pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
50        self.broker_id = value;
51        self
52    }
53    /// Sets `host_name` to the passed value.
54    ///
55    /// The leader's hostname.
56    ///
57    /// Supported API versions: 0-7
58    pub fn with_host_name(mut self, value: StrBytes) -> Self {
59        self.host_name = value;
60        self
61    }
62    /// Sets `port` to the passed value.
63    ///
64    /// The leader's port.
65    ///
66    /// Supported API versions: 0-7
67    pub fn with_port(mut self, value: i32) -> Self {
68        self.port = value;
69        self
70    }
71    /// Sets unknown_tagged_fields to the passed value.
72    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73        self.unknown_tagged_fields = value;
74        self
75    }
76    /// Inserts an entry into unknown_tagged_fields.
77    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78        self.unknown_tagged_fields.insert(key, value);
79        self
80    }
81}
82
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrLiveLeader {
    /// Writes this live-leader entry to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7`, when there are more unknown
    /// tagged fields than fit in a `u32`, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on, strings use the compact form and tagged fields are written.
        let compact = version >= 4;

        types::Int32.encode(buf, &self.broker_id)?;
        if compact {
            types::CompactString.encode(buf, &self.host_name)?;
        } else {
            types::String.encode(buf, &self.host_name)?;
        }
        types::Int32.encode(buf, &self.port)?;

        if compact {
            let tag_count = self.unknown_tagged_fields.len();
            if tag_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tag_count);
            }
            types::UnsignedVarInt.encode(buf, tag_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=7`.
    ///
    /// # Errors
    ///
    /// Fails when there are more unknown tagged fields than fit in a `u32`,
    /// or when an underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 4;

        let mut total_size = types::Int32.compute_size(&self.broker_id)?;
        total_size += if compact {
            types::CompactString.compute_size(&self.host_name)?
        } else {
            types::String.compute_size(&self.host_name)?
        };
        total_size += types::Int32.compute_size(&self.port)?;

        if compact {
            let tag_count = self.unknown_tagged_fields.len();
            if tag_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tag_count);
            }
            total_size += types::UnsignedVarInt.compute_size(tag_count as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
134
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrLiveLeader {
    /// Reads a live-leader entry from `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7` or when any field cannot be
    /// decoded from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on, strings use the compact form and tagged fields follow.
        let compact = version >= 4;

        let broker_id = types::Int32.decode(buf)?;
        let host_name = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let port = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tag_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tag_count {
                // Each tagged field is a (tag, length, payload) triple.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            broker_id,
            host_name,
            port,
            unknown_tagged_fields,
        })
    }
}
166
167impl Default for LeaderAndIsrLiveLeader {
168    fn default() -> Self {
169        Self {
170            broker_id: (0).into(),
171            host_name: Default::default(),
172            port: 0,
173            unknown_tagged_fields: BTreeMap::new(),
174        }
175    }
176}
177
178impl Message for LeaderAndIsrLiveLeader {
179    const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
180    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
181}
182
/// The state of a single partition as carried in a `LeaderAndIsrRequest`.
///
/// Valid versions: 0-7
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderAndIsrPartitionState {
    /// The topic name.  This is only present in v0 or v1.
    ///
    /// For later versions the field is not written, and decoding leaves it at
    /// its default value.
    ///
    /// Supported API versions: 0-1
    pub topic_name: super::TopicName,

    /// The partition index.
    ///
    /// Supported API versions: 0-7
    pub partition_index: i32,

    /// The controller epoch.
    ///
    /// Supported API versions: 0-7
    pub controller_epoch: i32,

    /// The broker ID of the leader.
    ///
    /// Supported API versions: 0-7
    pub leader: super::BrokerId,

    /// The leader epoch.
    ///
    /// Supported API versions: 0-7
    pub leader_epoch: i32,

    /// The in-sync replica IDs.
    ///
    /// Supported API versions: 0-7
    pub isr: Vec<super::BrokerId>,

    /// The current epoch for the partition. The epoch is a monotonically
    /// increasing value which is incremented after every partition change.
    /// (Since the LeaderAndIsr request is only used by the legacy controller,
    /// this corresponds to the zkVersion.)
    ///
    /// Supported API versions: 0-7
    pub partition_epoch: i32,

    /// The replica IDs.
    ///
    /// Supported API versions: 0-7
    pub replicas: Vec<super::BrokerId>,

    /// The replica IDs that we are adding this partition to, or null if no replicas are being added.
    ///
    /// NOTE(review): the field is a plain `Vec`, so "null" presumably maps to
    /// an empty list here — confirm against the array encoder.
    ///
    /// Supported API versions: 3-7
    pub adding_replicas: Vec<super::BrokerId>,

    /// The replica IDs that we are removing this partition from, or null if no replicas are being removed.
    ///
    /// NOTE(review): the field is a plain `Vec`, so "null" presumably maps to
    /// an empty list here — confirm against the array encoder.
    ///
    /// Supported API versions: 3-7
    pub removing_replicas: Vec<super::BrokerId>,

    /// Whether the replica should have existed on the broker or not.
    ///
    /// Decodes to `false` for version 0, where the field is absent.
    ///
    /// Supported API versions: 1-7
    pub is_new: bool,

    /// 1 if the partition is recovering from an unclean leader election; 0 otherwise.
    ///
    /// Encoding a non-zero value for a version below 6 is rejected with an error.
    ///
    /// Supported API versions: 6-7
    pub leader_recovery_state: i8,

    /// Other tagged fields, keyed by tag number.
    ///
    /// Only read from / written to the wire for versions 4 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
250
251impl LeaderAndIsrPartitionState {
252    /// Sets `topic_name` to the passed value.
253    ///
254    /// The topic name.  This is only present in v0 or v1.
255    ///
256    /// Supported API versions: 0-1
257    pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
258        self.topic_name = value;
259        self
260    }
261    /// Sets `partition_index` to the passed value.
262    ///
263    /// The partition index.
264    ///
265    /// Supported API versions: 0-7
266    pub fn with_partition_index(mut self, value: i32) -> Self {
267        self.partition_index = value;
268        self
269    }
270    /// Sets `controller_epoch` to the passed value.
271    ///
272    /// The controller epoch.
273    ///
274    /// Supported API versions: 0-7
275    pub fn with_controller_epoch(mut self, value: i32) -> Self {
276        self.controller_epoch = value;
277        self
278    }
279    /// Sets `leader` to the passed value.
280    ///
281    /// The broker ID of the leader.
282    ///
283    /// Supported API versions: 0-7
284    pub fn with_leader(mut self, value: super::BrokerId) -> Self {
285        self.leader = value;
286        self
287    }
288    /// Sets `leader_epoch` to the passed value.
289    ///
290    /// The leader epoch.
291    ///
292    /// Supported API versions: 0-7
293    pub fn with_leader_epoch(mut self, value: i32) -> Self {
294        self.leader_epoch = value;
295        self
296    }
297    /// Sets `isr` to the passed value.
298    ///
299    /// The in-sync replica IDs.
300    ///
301    /// Supported API versions: 0-7
302    pub fn with_isr(mut self, value: Vec<super::BrokerId>) -> Self {
303        self.isr = value;
304        self
305    }
306    /// Sets `partition_epoch` to the passed value.
307    ///
308    /// The current epoch for the partition. The epoch is a monotonically increasing value which is incremented after every partition change. (Since the LeaderAndIsr request is only used by the legacy controller, this corresponds to the zkVersion)
309    ///
310    /// Supported API versions: 0-7
311    pub fn with_partition_epoch(mut self, value: i32) -> Self {
312        self.partition_epoch = value;
313        self
314    }
315    /// Sets `replicas` to the passed value.
316    ///
317    /// The replica IDs.
318    ///
319    /// Supported API versions: 0-7
320    pub fn with_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
321        self.replicas = value;
322        self
323    }
324    /// Sets `adding_replicas` to the passed value.
325    ///
326    /// The replica IDs that we are adding this partition to, or null if no replicas are being added.
327    ///
328    /// Supported API versions: 3-7
329    pub fn with_adding_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
330        self.adding_replicas = value;
331        self
332    }
333    /// Sets `removing_replicas` to the passed value.
334    ///
335    /// The replica IDs that we are removing this partition from, or null if no replicas are being removed.
336    ///
337    /// Supported API versions: 3-7
338    pub fn with_removing_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
339        self.removing_replicas = value;
340        self
341    }
342    /// Sets `is_new` to the passed value.
343    ///
344    /// Whether the replica should have existed on the broker or not.
345    ///
346    /// Supported API versions: 1-7
347    pub fn with_is_new(mut self, value: bool) -> Self {
348        self.is_new = value;
349        self
350    }
351    /// Sets `leader_recovery_state` to the passed value.
352    ///
353    /// 1 if the partition is recovering from an unclean leader election; 0 otherwise.
354    ///
355    /// Supported API versions: 6-7
356    pub fn with_leader_recovery_state(mut self, value: i8) -> Self {
357        self.leader_recovery_state = value;
358        self
359    }
360    /// Sets unknown_tagged_fields to the passed value.
361    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
362        self.unknown_tagged_fields = value;
363        self
364    }
365    /// Inserts an entry into unknown_tagged_fields.
366    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
367        self.unknown_tagged_fields.insert(key, value);
368        self
369    }
370}
371
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrPartitionState {
    /// Writes this partition state to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7`, when `leader_recovery_state`
    /// is non-zero for a version below 6, when there are more unknown tagged
    /// fields than fit in a `u32`, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on, arrays use the compact form and tagged fields are written.
        let compact = version >= 4;

        if version <= 1 {
            types::String.encode(buf, &self.topic_name)?;
        }
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.controller_epoch)?;
        types::Int32.encode(buf, &self.leader)?;
        types::Int32.encode(buf, &self.leader_epoch)?;
        if compact {
            types::CompactArray(types::Int32).encode(buf, &self.isr)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.isr)?;
        }
        types::Int32.encode(buf, &self.partition_epoch)?;
        if compact {
            types::CompactArray(types::Int32).encode(buf, &self.replicas)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.replicas)?;
        }

        // The adding/removing replica lists only exist from version 3 on.
        match version {
            v if v >= 4 => {
                types::CompactArray(types::Int32).encode(buf, &self.adding_replicas)?;
                types::CompactArray(types::Int32).encode(buf, &self.removing_replicas)?;
            }
            3 => {
                types::Array(types::Int32).encode(buf, &self.adding_replicas)?;
                types::Array(types::Int32).encode(buf, &self.removing_replicas)?;
            }
            _ => {}
        }

        if version >= 1 {
            types::Boolean.encode(buf, &self.is_new)?;
        }
        if version >= 6 {
            types::Int8.encode(buf, &self.leader_recovery_state)?;
        } else if self.leader_recovery_state != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if compact {
            let tag_count = self.unknown_tagged_fields.len();
            if tag_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tag_count);
            }
            types::UnsignedVarInt.encode(buf, tag_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=7`.
    ///
    /// # Errors
    ///
    /// Fails when `leader_recovery_state` is non-zero for a version below 6,
    /// when there are more unknown tagged fields than fit in a `u32`, or when
    /// an underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 4;
        let mut total_size = 0;

        if version <= 1 {
            total_size += types::String.compute_size(&self.topic_name)?;
        }
        total_size += types::Int32.compute_size(&self.partition_index)?;
        total_size += types::Int32.compute_size(&self.controller_epoch)?;
        total_size += types::Int32.compute_size(&self.leader)?;
        total_size += types::Int32.compute_size(&self.leader_epoch)?;
        total_size += if compact {
            types::CompactArray(types::Int32).compute_size(&self.isr)?
        } else {
            types::Array(types::Int32).compute_size(&self.isr)?
        };
        total_size += types::Int32.compute_size(&self.partition_epoch)?;
        total_size += if compact {
            types::CompactArray(types::Int32).compute_size(&self.replicas)?
        } else {
            types::Array(types::Int32).compute_size(&self.replicas)?
        };

        // The adding/removing replica lists only exist from version 3 on.
        match version {
            v if v >= 4 => {
                total_size +=
                    types::CompactArray(types::Int32).compute_size(&self.adding_replicas)?;
                total_size +=
                    types::CompactArray(types::Int32).compute_size(&self.removing_replicas)?;
            }
            3 => {
                total_size += types::Array(types::Int32).compute_size(&self.adding_replicas)?;
                total_size += types::Array(types::Int32).compute_size(&self.removing_replicas)?;
            }
            _ => {}
        }

        if version >= 1 {
            total_size += types::Boolean.compute_size(&self.is_new)?;
        }
        if version >= 6 {
            total_size += types::Int8.compute_size(&self.leader_recovery_state)?;
        } else if self.leader_recovery_state != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if compact {
            let tag_count = self.unknown_tagged_fields.len();
            if tag_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tag_count);
            }
            total_size += types::UnsignedVarInt.compute_size(tag_count as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
495
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrPartitionState {
    /// Reads a partition state from `buf` using the wire layout of `version`.
    ///
    /// Fields that do not exist in `version` are left at their defaults
    /// (`false` for `is_new`, `0` for `leader_recovery_state`).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7` or when any field cannot be
    /// decoded from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on, arrays use the compact form and tagged fields follow.
        let compact = version >= 4;

        let topic_name = if version <= 1 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let partition_index = types::Int32.decode(buf)?;
        let controller_epoch = types::Int32.decode(buf)?;
        let leader = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;
        let isr = if compact {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let partition_epoch = types::Int32.decode(buf)?;
        let replicas = if compact {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        // The adding/removing replica lists only exist from version 3 on.
        let adding_replicas = match version {
            v if v >= 4 => types::CompactArray(types::Int32).decode(buf)?,
            3 => types::Array(types::Int32).decode(buf)?,
            _ => Default::default(),
        };
        let removing_replicas = match version {
            v if v >= 4 => types::CompactArray(types::Int32).decode(buf)?,
            3 => types::Array(types::Int32).decode(buf)?,
            _ => Default::default(),
        };
        let is_new = if version >= 1 {
            types::Boolean.decode(buf)?
        } else {
            false
        };
        let leader_recovery_state = if version >= 6 {
            types::Int8.decode(buf)?
        } else {
            0
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tag_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tag_count {
                // Each tagged field is a (tag, length, payload) triple.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic_name,
            partition_index,
            controller_epoch,
            leader,
            leader_epoch,
            isr,
            partition_epoch,
            replicas,
            adding_replicas,
            removing_replicas,
            is_new,
            leader_recovery_state,
            unknown_tagged_fields,
        })
    }
}
577
578impl Default for LeaderAndIsrPartitionState {
579    fn default() -> Self {
580        Self {
581            topic_name: Default::default(),
582            partition_index: 0,
583            controller_epoch: 0,
584            leader: (0).into(),
585            leader_epoch: 0,
586            isr: Default::default(),
587            partition_epoch: 0,
588            replicas: Default::default(),
589            adding_replicas: Default::default(),
590            removing_replicas: Default::default(),
591            is_new: false,
592            leader_recovery_state: 0,
593            unknown_tagged_fields: BTreeMap::new(),
594        }
595    }
596}
597
598impl Message for LeaderAndIsrPartitionState {
599    const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
600    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
601}
602
/// The top-level LeaderAndIsr request message.
///
/// Partition state is carried in `ungrouped_partition_states` for versions
/// 0-1 and in `topic_states` from version 2 onwards; encoding rejects a
/// non-empty list in the field that does not belong to the selected version.
///
/// Valid versions: 0-7
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderAndIsrRequest {
    /// The current controller ID.
    ///
    /// Supported API versions: 0-7
    pub controller_id: super::BrokerId,

    /// If KRaft controller id is used during migration. See KIP-866
    ///
    /// Encoding `true` for a version below 7 is rejected with an error.
    ///
    /// Supported API versions: 7
    pub is_k_raft_controller: bool,

    /// The current controller epoch.
    ///
    /// Supported API versions: 0-7
    pub controller_epoch: i32,

    /// The current broker epoch.
    ///
    /// Decodes to `-1` for versions below 2, where the field is absent.
    ///
    /// Supported API versions: 2-7
    pub broker_epoch: i64,

    /// The type that indicates whether all topics are included in the request
    ///
    /// Encoding a non-zero value for a version below 5 is rejected with an error.
    ///
    /// Supported API versions: 5-7
    pub _type: i8,

    /// The state of each partition, in a v0 or v1 message.
    ///
    /// Must be empty when encoding version 2 or later.
    ///
    /// Supported API versions: 0-1
    pub ungrouped_partition_states: Vec<LeaderAndIsrPartitionState>,

    /// Each topic.
    ///
    /// Must be empty when encoding version 0 or 1.
    ///
    /// Supported API versions: 2-7
    pub topic_states: Vec<LeaderAndIsrTopicState>,

    /// The current live leaders.
    ///
    /// Supported API versions: 0-7
    pub live_leaders: Vec<LeaderAndIsrLiveLeader>,

    /// Other tagged fields, keyed by tag number.
    ///
    /// Only written to the wire for versions 4 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
650
651impl LeaderAndIsrRequest {
652    /// Sets `controller_id` to the passed value.
653    ///
654    /// The current controller ID.
655    ///
656    /// Supported API versions: 0-7
657    pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
658        self.controller_id = value;
659        self
660    }
661    /// Sets `is_k_raft_controller` to the passed value.
662    ///
663    /// If KRaft controller id is used during migration. See KIP-866
664    ///
665    /// Supported API versions: 7
666    pub fn with_is_k_raft_controller(mut self, value: bool) -> Self {
667        self.is_k_raft_controller = value;
668        self
669    }
670    /// Sets `controller_epoch` to the passed value.
671    ///
672    /// The current controller epoch.
673    ///
674    /// Supported API versions: 0-7
675    pub fn with_controller_epoch(mut self, value: i32) -> Self {
676        self.controller_epoch = value;
677        self
678    }
679    /// Sets `broker_epoch` to the passed value.
680    ///
681    /// The current broker epoch.
682    ///
683    /// Supported API versions: 2-7
684    pub fn with_broker_epoch(mut self, value: i64) -> Self {
685        self.broker_epoch = value;
686        self
687    }
688    /// Sets `_type` to the passed value.
689    ///
690    /// The type that indicates whether all topics are included in the request
691    ///
692    /// Supported API versions: 5-7
693    pub fn with_type(mut self, value: i8) -> Self {
694        self._type = value;
695        self
696    }
697    /// Sets `ungrouped_partition_states` to the passed value.
698    ///
699    /// The state of each partition, in a v0 or v1 message.
700    ///
701    /// Supported API versions: 0-1
702    pub fn with_ungrouped_partition_states(
703        mut self,
704        value: Vec<LeaderAndIsrPartitionState>,
705    ) -> Self {
706        self.ungrouped_partition_states = value;
707        self
708    }
709    /// Sets `topic_states` to the passed value.
710    ///
711    /// Each topic.
712    ///
713    /// Supported API versions: 2-7
714    pub fn with_topic_states(mut self, value: Vec<LeaderAndIsrTopicState>) -> Self {
715        self.topic_states = value;
716        self
717    }
718    /// Sets `live_leaders` to the passed value.
719    ///
720    /// The current live leaders.
721    ///
722    /// Supported API versions: 0-7
723    pub fn with_live_leaders(mut self, value: Vec<LeaderAndIsrLiveLeader>) -> Self {
724        self.live_leaders = value;
725        self
726    }
727    /// Sets unknown_tagged_fields to the passed value.
728    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
729        self.unknown_tagged_fields = value;
730        self
731    }
732    /// Inserts an entry into unknown_tagged_fields.
733    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
734        self.unknown_tagged_fields.insert(key, value);
735        self
736    }
737}
738
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrRequest {
    /// Writes this request to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7`, when a field that does not
    /// exist in `version` holds a non-default value (`is_k_raft_controller`,
    /// `_type`, `ungrouped_partition_states`, `topic_states`), when there are
    /// more unknown tagged fields than fit in a `u32`, or when an underlying
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on, arrays use the compact form and tagged fields are written.
        let compact = version >= 4;

        types::Int32.encode(buf, &self.controller_id)?;
        if version >= 7 {
            types::Boolean.encode(buf, &self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }
        types::Int32.encode(buf, &self.controller_epoch)?;
        if version >= 2 {
            types::Int64.encode(buf, &self.broker_epoch)?;
        }
        if version >= 5 {
            types::Int8.encode(buf, &self._type)?;
        } else if self._type != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions 0-1 carry a flat partition list.
        if version <= 1 {
            types::Array(types::Struct { version })
                .encode(buf, &self.ungrouped_partition_states)?;
        } else if !self.ungrouped_partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions 2+ carry the per-topic list instead.
        if version < 2 {
            if !self.topic_states.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topic_states)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topic_states)?;
        }

        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.live_leaders)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.live_leaders)?;
        }

        if compact {
            let tag_count = self.unknown_tagged_fields.len();
            if tag_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tag_count);
            }
            types::UnsignedVarInt.encode(buf, tag_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=7`.
    ///
    /// # Errors
    ///
    /// Fails when a field that does not exist in `version` holds a
    /// non-default value, when there are more unknown tagged fields than fit
    /// in a `u32`, or when an underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 4;

        let mut total_size = types::Int32.compute_size(&self.controller_id)?;
        if version >= 7 {
            total_size += types::Boolean.compute_size(&self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }
        total_size += types::Int32.compute_size(&self.controller_epoch)?;
        if version >= 2 {
            total_size += types::Int64.compute_size(&self.broker_epoch)?;
        }
        if version >= 5 {
            total_size += types::Int8.compute_size(&self._type)?;
        } else if self._type != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions 0-1 carry a flat partition list.
        if version <= 1 {
            total_size += types::Array(types::Struct { version })
                .compute_size(&self.ungrouped_partition_states)?;
        } else if !self.ungrouped_partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions 2+ carry the per-topic list instead.
        if version < 2 {
            if !self.topic_states.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if compact {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topic_states)?;
        } else {
            total_size +=
                types::Array(types::Struct { version }).compute_size(&self.topic_states)?;
        }

        total_size += if compact {
            types::CompactArray(types::Struct { version }).compute_size(&self.live_leaders)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.live_leaders)?
        };

        if compact {
            let tag_count = self.unknown_tagged_fields.len();
            if tag_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tag_count);
            }
            total_size += types::UnsignedVarInt.compute_size(tag_count as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
866
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrRequest {
    /// Decodes a `LeaderAndIsrRequest` from `buf` using protocol `version`.
    ///
    /// Fields that the requested version does not transmit are not read from
    /// `buf`; they receive a fixed fallback instead (`false`, `-1`, `0`, or a
    /// default-constructed collection).
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside `0..=7` or when any of the
    /// underlying field decoders fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let controller_id = types::Int32.decode(buf)?;
        // Only the highest accepted version (7) carries this flag.
        let is_k_raft_controller = if version < 7 {
            false
        } else {
            types::Boolean.decode(buf)?
        };
        let controller_epoch = types::Int32.decode(buf)?;
        let broker_epoch = if version < 2 {
            -1
        } else {
            types::Int64.decode(buf)?
        };
        let _type = if version < 5 {
            0
        } else {
            types::Int8.decode(buf)?
        };

        // `version` was validated above, so every `_` arm below stands for
        // the remaining upper versions up to and including 7.
        let ungrouped_partition_states = match version {
            0..=1 => types::Array(types::Struct { version }).decode(buf)?,
            _ => Default::default(),
        };
        let topic_states = match version {
            0..=1 => Default::default(),
            2..=3 => types::Array(types::Struct { version }).decode(buf)?,
            _ => types::CompactArray(types::Struct { version }).decode(buf)?,
        };
        let live_leaders = match version {
            0..=3 => types::Array(types::Struct { version }).decode(buf)?,
            _ => types::CompactArray(types::Struct { version }).decode(buf)?,
        };

        // From version 4 on, the message ends with a counted list of
        // (tag, size, bytes) entries; they are kept verbatim, keyed by tag.
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 4 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            controller_id,
            is_k_raft_controller,
            controller_epoch,
            broker_epoch,
            _type,
            ungrouped_partition_states,
            topic_states,
            live_leaders,
            unknown_tagged_fields,
        })
    }
}
932
933impl Default for LeaderAndIsrRequest {
934    fn default() -> Self {
935        Self {
936            controller_id: (0).into(),
937            is_k_raft_controller: false,
938            controller_epoch: 0,
939            broker_epoch: -1,
940            _type: 0,
941            ungrouped_partition_states: Default::default(),
942            topic_states: Default::default(),
943            live_leaders: Default::default(),
944            unknown_tagged_fields: BTreeMap::new(),
945        }
946    }
947}
948
949impl Message for LeaderAndIsrRequest {
950    const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
951    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
952}
953
/// State of a single topic: its name, its ID and the state of each of its
/// partitions.
///
/// Valid versions: 0-7
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderAndIsrTopicState {
    /// The topic name.
    ///
    /// Supported API versions: 2-7
    pub topic_name: super::TopicName,

    /// The unique topic ID.
    ///
    /// Supported API versions: 5-7
    pub topic_id: Uuid,

    /// The state of each partition.
    ///
    /// Supported API versions: 2-7
    pub partition_states: Vec<LeaderAndIsrPartitionState>,

    /// Other tagged fields, keyed by tag; each value holds the raw bytes of
    /// that field as read during decoding.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
976
977impl LeaderAndIsrTopicState {
978    /// Sets `topic_name` to the passed value.
979    ///
980    /// The topic name.
981    ///
982    /// Supported API versions: 2-7
983    pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
984        self.topic_name = value;
985        self
986    }
987    /// Sets `topic_id` to the passed value.
988    ///
989    /// The unique topic ID.
990    ///
991    /// Supported API versions: 5-7
992    pub fn with_topic_id(mut self, value: Uuid) -> Self {
993        self.topic_id = value;
994        self
995    }
996    /// Sets `partition_states` to the passed value.
997    ///
998    /// The state of each partition
999    ///
1000    /// Supported API versions: 2-7
1001    pub fn with_partition_states(mut self, value: Vec<LeaderAndIsrPartitionState>) -> Self {
1002        self.partition_states = value;
1003        self
1004    }
1005    /// Sets unknown_tagged_fields to the passed value.
1006    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1007        self.unknown_tagged_fields = value;
1008        self
1009    }
1010    /// Inserts an entry into unknown_tagged_fields.
1011    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1012        self.unknown_tagged_fields.insert(key, value);
1013        self
1014    }
1015}
1016
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrTopicState {
    /// Writes this topic state to `buf` in the layout of protocol `version`.
    ///
    /// `topic_id` is written only for version 5 and above; on lower versions
    /// it is left out without any check of its value.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside `0..=7`, when `topic_name`
    /// or `partition_states` is non-empty on a version below 2, when (on
    /// version 4 and above) more than `u32::MAX` unknown tagged fields are
    /// present, or when one of the field encoders fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // `version` is within 0..=7 from here on, so the `_` arms below
        // stand for versions 4 through 7.
        match version {
            0..=1 => {
                if !self.topic_name.is_empty() {
                    bail!("A field is set that is not available on the selected protocol version");
                }
            }
            2..=3 => {
                types::String.encode(buf, &self.topic_name)?;
            }
            _ => {
                types::CompactString.encode(buf, &self.topic_name)?;
            }
        }

        if version >= 5 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }

        match version {
            0..=1 => {
                if !self.partition_states.is_empty() {
                    bail!("A field is set that is not available on the selected protocol version");
                }
            }
            2..=3 => {
                types::Array(types::Struct { version }).encode(buf, &self.partition_states)?;
            }
            _ => {
                types::CompactArray(types::Struct { version })
                    .encode(buf, &self.partition_states)?;
            }
        }

        if version >= 4 {
            // The tagged-field section starts with the entry count as an
            // unsigned varint, followed by the entries themselves.
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for protocol `version`.
    ///
    /// The result is the sum of the sizes of the fields present at that
    /// version. Unlike `encode`, this method does not itself reject versions
    /// outside `0..=7`.
    ///
    /// # Errors
    ///
    /// Returns an error when `topic_name` or `partition_states` is non-empty
    /// on a version below 2, when (on version 4 and above) more than
    /// `u32::MAX` unknown tagged fields are present, or when one of the field
    /// size computations fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Version 4 and above: compact string/array types plus tagged fields.
        let flexible = version >= 4;

        let name_size = if version < 2 {
            if !self.topic_name.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
            0
        } else if flexible {
            types::CompactString.compute_size(&self.topic_name)?
        } else {
            types::String.compute_size(&self.topic_name)?
        };

        let id_size = if version >= 5 {
            types::Uuid.compute_size(&self.topic_id)?
        } else {
            0
        };

        let partitions_size = if version < 2 {
            if !self.partition_states.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
            0
        } else if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.partition_states)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.partition_states)?
        };

        let tagged_size = if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.compute_size(tagged_count as u32)?
                + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?
        } else {
            0
        };

        Ok(name_size + id_size + partitions_size + tagged_size)
    }
}
1107
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrTopicState {
    /// Decodes a `LeaderAndIsrTopicState` from `buf` using protocol `version`.
    ///
    /// Fields that the requested version does not transmit are not read from
    /// `buf`: `topic_name` and `partition_states` fall back to their default
    /// values and `topic_id` to the nil UUID.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside `0..=7` or when any of the
    /// underlying field decoders fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // `version` is within 0..=7 from here on, so the `_` arms below
        // stand for versions 4 through 7.
        let topic_name = match version {
            0..=1 => Default::default(),
            2..=3 => types::String.decode(buf)?,
            _ => types::CompactString.decode(buf)?,
        };
        let topic_id = if version < 5 {
            Uuid::nil()
        } else {
            types::Uuid.decode(buf)?
        };
        let partition_states = match version {
            0..=1 => Default::default(),
            2..=3 => types::Array(types::Struct { version }).decode(buf)?,
            _ => types::CompactArray(types::Struct { version }).decode(buf)?,
        };

        // From version 4 on, the struct ends with a counted list of
        // (tag, size, bytes) entries; they are kept verbatim, keyed by tag.
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 4 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            topic_name,
            topic_id,
            partition_states,
            unknown_tagged_fields,
        })
    }
}
1155
1156impl Default for LeaderAndIsrTopicState {
1157    fn default() -> Self {
1158        Self {
1159            topic_name: Default::default(),
1160            topic_id: Uuid::nil(),
1161            partition_states: Default::default(),
1162            unknown_tagged_fields: BTreeMap::new(),
1163        }
1164    }
1165}
1166
1167impl Message for LeaderAndIsrTopicState {
1168    const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
1169    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1170}
1171
1172impl HeaderVersion for LeaderAndIsrRequest {
1173    fn header_version(version: i16) -> i16 {
1174        if version >= 4 {
1175            2
1176        } else {
1177            1
1178        }
1179    }
1180}