kafka_protocol/messages/
leader_and_isr_request.rs

//! LeaderAndIsrRequest
//!
//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/LeaderAndIsrRequest.json).
// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-7
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct LeaderAndIsrLiveLeader {
24    /// The leader's broker ID.
25    ///
26    /// Supported API versions: 0-7
27    pub broker_id: super::BrokerId,
28
29    /// The leader's hostname.
30    ///
31    /// Supported API versions: 0-7
32    pub host_name: StrBytes,
33
34    /// The leader's port.
35    ///
36    /// Supported API versions: 0-7
37    pub port: i32,
38
39    /// Other tagged fields
40    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl LeaderAndIsrLiveLeader {
44    /// Sets `broker_id` to the passed value.
45    ///
46    /// The leader's broker ID.
47    ///
48    /// Supported API versions: 0-7
49    pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
50        self.broker_id = value;
51        self
52    }
53    /// Sets `host_name` to the passed value.
54    ///
55    /// The leader's hostname.
56    ///
57    /// Supported API versions: 0-7
58    pub fn with_host_name(mut self, value: StrBytes) -> Self {
59        self.host_name = value;
60        self
61    }
62    /// Sets `port` to the passed value.
63    ///
64    /// The leader's port.
65    ///
66    /// Supported API versions: 0-7
67    pub fn with_port(mut self, value: i32) -> Self {
68        self.port = value;
69        self
70    }
71    /// Sets unknown_tagged_fields to the passed value.
72    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73        self.unknown_tagged_fields = value;
74        self
75    }
76    /// Inserts an entry into unknown_tagged_fields.
77    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78        self.unknown_tagged_fields.insert(key, value);
79        self
80    }
81}
82
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrLiveLeader {
    /// Writes this live-leader entry to `buf` using the wire layout of `version`.
    ///
    /// From version 4 on the host name is written as a compact string and the
    /// entry is followed by the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails if any field encoder fails or if there are more unknown tagged
    /// fields than fit in a `u32` count.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let flexible = version >= 4;

        types::Int32.encode(buf, &self.broker_id)?;
        if flexible {
            types::CompactString.encode(buf, &self.host_name)?;
        } else {
            types::String.encode(buf, &self.host_name)?;
        }
        types::Int32.encode(buf, &self.port)?;

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 4;

        let mut size = types::Int32.compute_size(&self.broker_id)?;
        size += if flexible {
            types::CompactString.compute_size(&self.host_name)?
        } else {
            types::String.compute_size(&self.host_name)?
        };
        size += types::Int32.compute_size(&self.port)?;

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
131
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrLiveLeader {
    /// Reads a live-leader entry from `buf` using the wire layout of `version`.
    ///
    /// From version 4 on the host name is a compact string and the entry is
    /// followed by tagged fields, all of which are kept as raw bytes in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails if any field decoder fails or the buffer cannot supply the bytes
    /// of a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 4;

        let id = types::Int32.decode(buf)?;
        let host = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let leader_port = types::Int32.decode(buf)?;

        let mut tagged = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                tagged.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            broker_id: id,
            host_name: host,
            port: leader_port,
            unknown_tagged_fields: tagged,
        })
    }
}
160
161impl Default for LeaderAndIsrLiveLeader {
162    fn default() -> Self {
163        Self {
164            broker_id: (0).into(),
165            host_name: Default::default(),
166            port: 0,
167            unknown_tagged_fields: BTreeMap::new(),
168        }
169    }
170}
171
172impl Message for LeaderAndIsrLiveLeader {
173    const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
174    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
175}
176
177/// Valid versions: 0-7
178#[non_exhaustive]
179#[derive(Debug, Clone, PartialEq)]
180pub struct LeaderAndIsrPartitionState {
181    /// The topic name.  This is only present in v0 or v1.
182    ///
183    /// Supported API versions: 0-1
184    pub topic_name: super::TopicName,
185
186    /// The partition index.
187    ///
188    /// Supported API versions: 0-7
189    pub partition_index: i32,
190
191    /// The controller epoch.
192    ///
193    /// Supported API versions: 0-7
194    pub controller_epoch: i32,
195
196    /// The broker ID of the leader.
197    ///
198    /// Supported API versions: 0-7
199    pub leader: super::BrokerId,
200
201    /// The leader epoch.
202    ///
203    /// Supported API versions: 0-7
204    pub leader_epoch: i32,
205
206    /// The in-sync replica IDs.
207    ///
208    /// Supported API versions: 0-7
209    pub isr: Vec<super::BrokerId>,
210
211    /// The current epoch for the partition. The epoch is a monotonically increasing value which is incremented after every partition change. (Since the LeaderAndIsr request is only used by the legacy controller, this corresponds to the zkVersion)
212    ///
213    /// Supported API versions: 0-7
214    pub partition_epoch: i32,
215
216    /// The replica IDs.
217    ///
218    /// Supported API versions: 0-7
219    pub replicas: Vec<super::BrokerId>,
220
221    /// The replica IDs that we are adding this partition to, or null if no replicas are being added.
222    ///
223    /// Supported API versions: 3-7
224    pub adding_replicas: Vec<super::BrokerId>,
225
226    /// The replica IDs that we are removing this partition from, or null if no replicas are being removed.
227    ///
228    /// Supported API versions: 3-7
229    pub removing_replicas: Vec<super::BrokerId>,
230
231    /// Whether the replica should have existed on the broker or not.
232    ///
233    /// Supported API versions: 1-7
234    pub is_new: bool,
235
236    /// 1 if the partition is recovering from an unclean leader election; 0 otherwise.
237    ///
238    /// Supported API versions: 6-7
239    pub leader_recovery_state: i8,
240
241    /// Other tagged fields
242    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
243}
244
245impl LeaderAndIsrPartitionState {
246    /// Sets `topic_name` to the passed value.
247    ///
248    /// The topic name.  This is only present in v0 or v1.
249    ///
250    /// Supported API versions: 0-1
251    pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
252        self.topic_name = value;
253        self
254    }
255    /// Sets `partition_index` to the passed value.
256    ///
257    /// The partition index.
258    ///
259    /// Supported API versions: 0-7
260    pub fn with_partition_index(mut self, value: i32) -> Self {
261        self.partition_index = value;
262        self
263    }
264    /// Sets `controller_epoch` to the passed value.
265    ///
266    /// The controller epoch.
267    ///
268    /// Supported API versions: 0-7
269    pub fn with_controller_epoch(mut self, value: i32) -> Self {
270        self.controller_epoch = value;
271        self
272    }
273    /// Sets `leader` to the passed value.
274    ///
275    /// The broker ID of the leader.
276    ///
277    /// Supported API versions: 0-7
278    pub fn with_leader(mut self, value: super::BrokerId) -> Self {
279        self.leader = value;
280        self
281    }
282    /// Sets `leader_epoch` to the passed value.
283    ///
284    /// The leader epoch.
285    ///
286    /// Supported API versions: 0-7
287    pub fn with_leader_epoch(mut self, value: i32) -> Self {
288        self.leader_epoch = value;
289        self
290    }
291    /// Sets `isr` to the passed value.
292    ///
293    /// The in-sync replica IDs.
294    ///
295    /// Supported API versions: 0-7
296    pub fn with_isr(mut self, value: Vec<super::BrokerId>) -> Self {
297        self.isr = value;
298        self
299    }
300    /// Sets `partition_epoch` to the passed value.
301    ///
302    /// The current epoch for the partition. The epoch is a monotonically increasing value which is incremented after every partition change. (Since the LeaderAndIsr request is only used by the legacy controller, this corresponds to the zkVersion)
303    ///
304    /// Supported API versions: 0-7
305    pub fn with_partition_epoch(mut self, value: i32) -> Self {
306        self.partition_epoch = value;
307        self
308    }
309    /// Sets `replicas` to the passed value.
310    ///
311    /// The replica IDs.
312    ///
313    /// Supported API versions: 0-7
314    pub fn with_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
315        self.replicas = value;
316        self
317    }
318    /// Sets `adding_replicas` to the passed value.
319    ///
320    /// The replica IDs that we are adding this partition to, or null if no replicas are being added.
321    ///
322    /// Supported API versions: 3-7
323    pub fn with_adding_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
324        self.adding_replicas = value;
325        self
326    }
327    /// Sets `removing_replicas` to the passed value.
328    ///
329    /// The replica IDs that we are removing this partition from, or null if no replicas are being removed.
330    ///
331    /// Supported API versions: 3-7
332    pub fn with_removing_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
333        self.removing_replicas = value;
334        self
335    }
336    /// Sets `is_new` to the passed value.
337    ///
338    /// Whether the replica should have existed on the broker or not.
339    ///
340    /// Supported API versions: 1-7
341    pub fn with_is_new(mut self, value: bool) -> Self {
342        self.is_new = value;
343        self
344    }
345    /// Sets `leader_recovery_state` to the passed value.
346    ///
347    /// 1 if the partition is recovering from an unclean leader election; 0 otherwise.
348    ///
349    /// Supported API versions: 6-7
350    pub fn with_leader_recovery_state(mut self, value: i8) -> Self {
351        self.leader_recovery_state = value;
352        self
353    }
354    /// Sets unknown_tagged_fields to the passed value.
355    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
356        self.unknown_tagged_fields = value;
357        self
358    }
359    /// Inserts an entry into unknown_tagged_fields.
360    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
361        self.unknown_tagged_fields.insert(key, value);
362        self
363    }
364}
365
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrPartitionState {
    /// Writes this partition state to `buf` using the wire layout of `version`.
    ///
    /// Fields are written in declaration order; each one is emitted only for the
    /// versions that carry it. From version 4 on arrays use the compact encoding
    /// and the tagged-field section is appended.
    ///
    /// # Errors
    ///
    /// Fails if a field encoder fails, if `leader_recovery_state` is non-zero for
    /// a version below 6, or if there are more unknown tagged fields than fit in
    /// a `u32` count.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let flexible = version >= 4;

        if version <= 1 {
            types::String.encode(buf, &self.topic_name)?;
        }
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.controller_epoch)?;
        types::Int32.encode(buf, &self.leader)?;
        types::Int32.encode(buf, &self.leader_epoch)?;
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.isr)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.isr)?;
        }
        types::Int32.encode(buf, &self.partition_epoch)?;
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.replicas)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.replicas)?;
        }

        // Both reassignment lists exist from version 3 on and are written back to
        // back: adding first, then removing.
        if version >= 3 {
            if flexible {
                types::CompactArray(types::Int32).encode(buf, &self.adding_replicas)?;
                types::CompactArray(types::Int32).encode(buf, &self.removing_replicas)?;
            } else {
                types::Array(types::Int32).encode(buf, &self.adding_replicas)?;
                types::Array(types::Int32).encode(buf, &self.removing_replicas)?;
            }
        }

        if version >= 1 {
            types::Boolean.encode(buf, &self.is_new)?;
        }

        if version >= 6 {
            types::Int8.encode(buf, &self.leader_recovery_state)?;
        } else if self.leader_recovery_state != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 4;
        let mut size = 0;

        if version <= 1 {
            size += types::String.compute_size(&self.topic_name)?;
        }
        size += types::Int32.compute_size(&self.partition_index)?;
        size += types::Int32.compute_size(&self.controller_epoch)?;
        size += types::Int32.compute_size(&self.leader)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;
        size += if flexible {
            types::CompactArray(types::Int32).compute_size(&self.isr)?
        } else {
            types::Array(types::Int32).compute_size(&self.isr)?
        };
        size += types::Int32.compute_size(&self.partition_epoch)?;
        size += if flexible {
            types::CompactArray(types::Int32).compute_size(&self.replicas)?
        } else {
            types::Array(types::Int32).compute_size(&self.replicas)?
        };

        if version >= 3 {
            if flexible {
                size += types::CompactArray(types::Int32).compute_size(&self.adding_replicas)?;
                size +=
                    types::CompactArray(types::Int32).compute_size(&self.removing_replicas)?;
            } else {
                size += types::Array(types::Int32).compute_size(&self.adding_replicas)?;
                size += types::Array(types::Int32).compute_size(&self.removing_replicas)?;
            }
        }

        if version >= 1 {
            size += types::Boolean.compute_size(&self.is_new)?;
        }

        if version >= 6 {
            size += types::Int8.compute_size(&self.leader_recovery_state)?;
        } else if self.leader_recovery_state != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
486
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrPartitionState {
    /// Reads a partition state from `buf` using the wire layout of `version`.
    ///
    /// Fields that `version` does not carry are filled with their defaults
    /// (empty/default collections, `false`, or `0`). From version 4 on arrays use
    /// the compact encoding and trailing tagged fields are kept as raw bytes in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails if any field decoder fails or the buffer cannot supply the bytes
    /// of a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 4;

        let topic_name = if version <= 1 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let partition_index = types::Int32.decode(buf)?;
        let controller_epoch = types::Int32.decode(buf)?;
        let leader = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;
        let isr = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let partition_epoch = types::Int32.decode(buf)?;
        let replicas = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };

        // The two reassignment lists appear together from version 3 on; tuple
        // elements are evaluated left to right, so adding is read before removing.
        let (adding_replicas, removing_replicas) = if version >= 3 {
            if flexible {
                (
                    types::CompactArray(types::Int32).decode(buf)?,
                    types::CompactArray(types::Int32).decode(buf)?,
                )
            } else {
                (
                    types::Array(types::Int32).decode(buf)?,
                    types::Array(types::Int32).decode(buf)?,
                )
            }
        } else {
            (Default::default(), Default::default())
        };

        let is_new = if version >= 1 {
            types::Boolean.decode(buf)?
        } else {
            false
        };
        let leader_recovery_state = if version >= 6 {
            types::Int8.decode(buf)?
        } else {
            0
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic_name,
            partition_index,
            controller_epoch,
            leader,
            leader_epoch,
            isr,
            partition_epoch,
            replicas,
            adding_replicas,
            removing_replicas,
            is_new,
            leader_recovery_state,
            unknown_tagged_fields,
        })
    }
}
565
566impl Default for LeaderAndIsrPartitionState {
567    fn default() -> Self {
568        Self {
569            topic_name: Default::default(),
570            partition_index: 0,
571            controller_epoch: 0,
572            leader: (0).into(),
573            leader_epoch: 0,
574            isr: Default::default(),
575            partition_epoch: 0,
576            replicas: Default::default(),
577            adding_replicas: Default::default(),
578            removing_replicas: Default::default(),
579            is_new: false,
580            leader_recovery_state: 0,
581            unknown_tagged_fields: BTreeMap::new(),
582        }
583    }
584}
585
586impl Message for LeaderAndIsrPartitionState {
587    const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
588    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
589}
590
591/// Valid versions: 0-7
592#[non_exhaustive]
593#[derive(Debug, Clone, PartialEq)]
594pub struct LeaderAndIsrRequest {
595    /// The current controller ID.
596    ///
597    /// Supported API versions: 0-7
598    pub controller_id: super::BrokerId,
599
600    /// If KRaft controller id is used during migration. See KIP-866
601    ///
602    /// Supported API versions: 7
603    pub is_k_raft_controller: bool,
604
605    /// The current controller epoch.
606    ///
607    /// Supported API versions: 0-7
608    pub controller_epoch: i32,
609
610    /// The current broker epoch.
611    ///
612    /// Supported API versions: 2-7
613    pub broker_epoch: i64,
614
615    /// The type that indicates whether all topics are included in the request
616    ///
617    /// Supported API versions: 5-7
618    pub _type: i8,
619
620    /// The state of each partition, in a v0 or v1 message.
621    ///
622    /// Supported API versions: 0-1
623    pub ungrouped_partition_states: Vec<LeaderAndIsrPartitionState>,
624
625    /// Each topic.
626    ///
627    /// Supported API versions: 2-7
628    pub topic_states: Vec<LeaderAndIsrTopicState>,
629
630    /// The current live leaders.
631    ///
632    /// Supported API versions: 0-7
633    pub live_leaders: Vec<LeaderAndIsrLiveLeader>,
634
635    /// Other tagged fields
636    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
637}
638
639impl LeaderAndIsrRequest {
640    /// Sets `controller_id` to the passed value.
641    ///
642    /// The current controller ID.
643    ///
644    /// Supported API versions: 0-7
645    pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
646        self.controller_id = value;
647        self
648    }
649    /// Sets `is_k_raft_controller` to the passed value.
650    ///
651    /// If KRaft controller id is used during migration. See KIP-866
652    ///
653    /// Supported API versions: 7
654    pub fn with_is_k_raft_controller(mut self, value: bool) -> Self {
655        self.is_k_raft_controller = value;
656        self
657    }
658    /// Sets `controller_epoch` to the passed value.
659    ///
660    /// The current controller epoch.
661    ///
662    /// Supported API versions: 0-7
663    pub fn with_controller_epoch(mut self, value: i32) -> Self {
664        self.controller_epoch = value;
665        self
666    }
667    /// Sets `broker_epoch` to the passed value.
668    ///
669    /// The current broker epoch.
670    ///
671    /// Supported API versions: 2-7
672    pub fn with_broker_epoch(mut self, value: i64) -> Self {
673        self.broker_epoch = value;
674        self
675    }
676    /// Sets `_type` to the passed value.
677    ///
678    /// The type that indicates whether all topics are included in the request
679    ///
680    /// Supported API versions: 5-7
681    pub fn with_type(mut self, value: i8) -> Self {
682        self._type = value;
683        self
684    }
685    /// Sets `ungrouped_partition_states` to the passed value.
686    ///
687    /// The state of each partition, in a v0 or v1 message.
688    ///
689    /// Supported API versions: 0-1
690    pub fn with_ungrouped_partition_states(
691        mut self,
692        value: Vec<LeaderAndIsrPartitionState>,
693    ) -> Self {
694        self.ungrouped_partition_states = value;
695        self
696    }
697    /// Sets `topic_states` to the passed value.
698    ///
699    /// Each topic.
700    ///
701    /// Supported API versions: 2-7
702    pub fn with_topic_states(mut self, value: Vec<LeaderAndIsrTopicState>) -> Self {
703        self.topic_states = value;
704        self
705    }
706    /// Sets `live_leaders` to the passed value.
707    ///
708    /// The current live leaders.
709    ///
710    /// Supported API versions: 0-7
711    pub fn with_live_leaders(mut self, value: Vec<LeaderAndIsrLiveLeader>) -> Self {
712        self.live_leaders = value;
713        self
714    }
715    /// Sets unknown_tagged_fields to the passed value.
716    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
717        self.unknown_tagged_fields = value;
718        self
719    }
720    /// Inserts an entry into unknown_tagged_fields.
721    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
722        self.unknown_tagged_fields.insert(key, value);
723        self
724    }
725}
726
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrRequest {
    /// Writes this request to `buf` using the wire layout of `version`.
    ///
    /// Fields are written in declaration order; each one is emitted only for the
    /// versions that carry it. From version 4 on arrays use the compact encoding
    /// and the tagged-field section is appended.
    ///
    /// # Errors
    ///
    /// Fails if a field encoder fails, if `is_k_raft_controller`, `_type`,
    /// `ungrouped_partition_states` or `topic_states` holds a non-default value
    /// for a version that does not carry it, or if there are more unknown tagged
    /// fields than fit in a `u32` count. Bytes written before the failing field
    /// remain in `buf`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let flexible = version >= 4;

        types::Int32.encode(buf, &self.controller_id)?;

        if version >= 7 {
            types::Boolean.encode(buf, &self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }

        types::Int32.encode(buf, &self.controller_epoch)?;

        if version >= 2 {
            types::Int64.encode(buf, &self.broker_epoch)?;
        }

        if version >= 5 {
            types::Int8.encode(buf, &self._type)?;
        } else if self._type != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version <= 1 {
            types::Array(types::Struct { version })
                .encode(buf, &self.ungrouped_partition_states)?;
        } else if !self.ungrouped_partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topic_states)?;
        } else if version >= 2 {
            types::Array(types::Struct { version }).encode(buf, &self.topic_states)?;
        } else if !self.topic_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.live_leaders)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.live_leaders)?;
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 4;

        let mut size = types::Int32.compute_size(&self.controller_id)?;

        if version >= 7 {
            size += types::Boolean.compute_size(&self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }

        size += types::Int32.compute_size(&self.controller_epoch)?;

        if version >= 2 {
            size += types::Int64.compute_size(&self.broker_epoch)?;
        }

        if version >= 5 {
            size += types::Int8.compute_size(&self._type)?;
        } else if self._type != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version <= 1 {
            size += types::Array(types::Struct { version })
                .compute_size(&self.ungrouped_partition_states)?;
        } else if !self.ungrouped_partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topic_states)?;
        } else if version >= 2 {
            size += types::Array(types::Struct { version }).compute_size(&self.topic_states)?;
        } else if !self.topic_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        size += if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.live_leaders)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.live_leaders)?
        };

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
851
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrRequest {
    /// Reads a request from `buf` using the wire layout of `version`.
    ///
    /// Fields that `version` does not carry are filled with fixed fallbacks:
    /// `false` for `is_k_raft_controller`, `-1` for `broker_epoch`, `0` for
    /// `_type`, and default (empty) collections. From version 4 on arrays use the
    /// compact encoding and trailing tagged fields are kept as raw bytes in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails if any field decoder fails or the buffer cannot supply the bytes
    /// of a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 4;

        let controller_id = types::Int32.decode(buf)?;
        let is_k_raft_controller = if version >= 7 {
            types::Boolean.decode(buf)?
        } else {
            false
        };
        let controller_epoch = types::Int32.decode(buf)?;
        let broker_epoch = if version >= 2 {
            types::Int64.decode(buf)?
        } else {
            -1
        };
        let _type = if version >= 5 {
            types::Int8.decode(buf)?
        } else {
            0
        };
        let ungrouped_partition_states = if version <= 1 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        let topic_states = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 2 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        let live_leaders = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            controller_id,
            is_k_raft_controller,
            controller_epoch,
            broker_epoch,
            _type,
            ungrouped_partition_states,
            topic_states,
            live_leaders,
            unknown_tagged_fields,
        })
    }
}
914
915impl Default for LeaderAndIsrRequest {
916    fn default() -> Self {
917        Self {
918            controller_id: (0).into(),
919            is_k_raft_controller: false,
920            controller_epoch: 0,
921            broker_epoch: -1,
922            _type: 0,
923            ungrouped_partition_states: Default::default(),
924            topic_states: Default::default(),
925            live_leaders: Default::default(),
926            unknown_tagged_fields: BTreeMap::new(),
927        }
928    }
929}
930
931impl Message for LeaderAndIsrRequest {
932    const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
933    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
934}
935
936/// Valid versions: 0-7
937#[non_exhaustive]
938#[derive(Debug, Clone, PartialEq)]
939pub struct LeaderAndIsrTopicState {
940    /// The topic name.
941    ///
942    /// Supported API versions: 2-7
943    pub topic_name: super::TopicName,
944
945    /// The unique topic ID.
946    ///
947    /// Supported API versions: 5-7
948    pub topic_id: Uuid,
949
950    /// The state of each partition
951    ///
952    /// Supported API versions: 2-7
953    pub partition_states: Vec<LeaderAndIsrPartitionState>,
954
955    /// Other tagged fields
956    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
957}
958
959impl LeaderAndIsrTopicState {
960    /// Sets `topic_name` to the passed value.
961    ///
962    /// The topic name.
963    ///
964    /// Supported API versions: 2-7
965    pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
966        self.topic_name = value;
967        self
968    }
969    /// Sets `topic_id` to the passed value.
970    ///
971    /// The unique topic ID.
972    ///
973    /// Supported API versions: 5-7
974    pub fn with_topic_id(mut self, value: Uuid) -> Self {
975        self.topic_id = value;
976        self
977    }
978    /// Sets `partition_states` to the passed value.
979    ///
980    /// The state of each partition
981    ///
982    /// Supported API versions: 2-7
983    pub fn with_partition_states(mut self, value: Vec<LeaderAndIsrPartitionState>) -> Self {
984        self.partition_states = value;
985        self
986    }
987    /// Sets unknown_tagged_fields to the passed value.
988    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
989        self.unknown_tagged_fields = value;
990        self
991    }
992    /// Inserts an entry into unknown_tagged_fields.
993    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
994        self.unknown_tagged_fields.insert(key, value);
995        self
996    }
997}
998
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrTopicState {
    /// Writes this topic state to `buf` using the wire layout of `version`.
    ///
    /// Nothing is written for versions below 2. From version 2 the topic name
    /// and partition states are written, from version 5 the topic ID as well,
    /// and from version 4 the compact encoders are used and the unknown tagged
    /// fields are appended.
    ///
    /// # Errors
    ///
    /// Fails when `version` is below 2 and `topic_name` or `partition_states`
    /// is non-empty, when there are more tagged fields than fit in a `u32`,
    /// or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 2 {
            // No field of this struct exists before version 2, so the only
            // thing left to do is reject data that cannot be represented.
            if !(self.topic_name.is_empty() && self.partition_states.is_empty()) {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        // Version 4 is where compact encodings and tagged fields start.
        let flexible = version >= 4;

        if flexible {
            types::CompactString.encode(buf, &self.topic_name)?;
        } else {
            types::String.encode(buf, &self.topic_name)?;
        }

        if version >= 5 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }

        let partition_codec = types::Struct { version };
        if flexible {
            types::CompactArray(partition_codec).encode(buf, &self.partition_states)?;
        } else {
            types::Array(partition_codec).encode(buf, &self.partition_states)?;
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            // The count is written as an unsigned 32-bit varint.
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` writes for `version`.
    ///
    /// Follows the same version rules as `encode`; the result is `0` for
    /// versions below 2.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 2 {
            if !(self.topic_name.is_empty() && self.partition_states.is_empty()) {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }

        let flexible = version >= 4;

        let mut total_size = if flexible {
            types::CompactString.compute_size(&self.topic_name)?
        } else {
            types::String.compute_size(&self.topic_name)?
        };

        if version >= 5 {
            total_size += types::Uuid.compute_size(&self.topic_id)?;
        }

        let partition_codec = types::Struct { version };
        total_size += if flexible {
            types::CompactArray(partition_codec).compute_size(&self.partition_states)?
        } else {
            types::Array(partition_codec).compute_size(&self.partition_states)?
        };

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            total_size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(total_size)
    }
}
1086
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrTopicState {
    /// Reads a `LeaderAndIsrTopicState` from `buf` using the wire layout of `version`.
    ///
    /// Fields that `version` does not carry are not read: the topic name and
    /// partition states fall back to their defaults below version 2, and the
    /// topic ID falls back to the nil UUID below version 5. From version 4 the
    /// compact decoders are used and trailing tagged fields are read and kept
    /// as raw bytes keyed by tag.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any of the underlying decoders.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Version 4 is where compact encodings and tagged fields start.
        let flexible = version >= 4;

        let topic_name = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 2 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };

        let topic_id = if version < 5 {
            Uuid::nil()
        } else {
            types::Uuid.decode(buf)?
        };

        let partition_states = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 2 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };

        // Tagged fields are not interpreted here: each one is kept as an
        // opaque (tag, bytes) pair.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            topic_name,
            topic_id,
            partition_states,
            unknown_tagged_fields,
        })
    }
}
1131
1132impl Default for LeaderAndIsrTopicState {
1133    fn default() -> Self {
1134        Self {
1135            topic_name: Default::default(),
1136            topic_id: Uuid::nil(),
1137            partition_states: Default::default(),
1138            unknown_tagged_fields: BTreeMap::new(),
1139        }
1140    }
1141}
1142
1143impl Message for LeaderAndIsrTopicState {
1144    const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
1145    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1146}
1147
1148impl HeaderVersion for LeaderAndIsrRequest {
1149    fn header_version(version: i16) -> i16 {
1150        if version >= 4 {
1151            2
1152        } else {
1153            1
1154        }
1155    }
1156}