1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct LeaderAndIsrLiveLeader {
24 pub broker_id: super::BrokerId,
28
29 pub host_name: StrBytes,
33
34 pub port: i32,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl LeaderAndIsrLiveLeader {
44 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
50 self.broker_id = value;
51 self
52 }
53 pub fn with_host_name(mut self, value: StrBytes) -> Self {
59 self.host_name = value;
60 self
61 }
62 pub fn with_port(mut self, value: i32) -> Self {
68 self.port = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
83#[cfg(feature = "client")]
84impl Encodable for LeaderAndIsrLiveLeader {
85 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
86 types::Int32.encode(buf, &self.broker_id)?;
87 if version >= 4 {
88 types::CompactString.encode(buf, &self.host_name)?;
89 } else {
90 types::String.encode(buf, &self.host_name)?;
91 }
92 types::Int32.encode(buf, &self.port)?;
93 if version >= 4 {
94 let num_tagged_fields = self.unknown_tagged_fields.len();
95 if num_tagged_fields > std::u32::MAX as usize {
96 bail!(
97 "Too many tagged fields to encode ({} fields)",
98 num_tagged_fields
99 );
100 }
101 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
102
103 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
104 }
105 Ok(())
106 }
107 fn compute_size(&self, version: i16) -> Result<usize> {
108 let mut total_size = 0;
109 total_size += types::Int32.compute_size(&self.broker_id)?;
110 if version >= 4 {
111 total_size += types::CompactString.compute_size(&self.host_name)?;
112 } else {
113 total_size += types::String.compute_size(&self.host_name)?;
114 }
115 total_size += types::Int32.compute_size(&self.port)?;
116 if version >= 4 {
117 let num_tagged_fields = self.unknown_tagged_fields.len();
118 if num_tagged_fields > std::u32::MAX as usize {
119 bail!(
120 "Too many tagged fields to encode ({} fields)",
121 num_tagged_fields
122 );
123 }
124 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
125
126 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
127 }
128 Ok(total_size)
129 }
130}
131
132#[cfg(feature = "broker")]
133impl Decodable for LeaderAndIsrLiveLeader {
134 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
135 let broker_id = types::Int32.decode(buf)?;
136 let host_name = if version >= 4 {
137 types::CompactString.decode(buf)?
138 } else {
139 types::String.decode(buf)?
140 };
141 let port = types::Int32.decode(buf)?;
142 let mut unknown_tagged_fields = BTreeMap::new();
143 if version >= 4 {
144 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
145 for _ in 0..num_tagged_fields {
146 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
147 let size: u32 = types::UnsignedVarInt.decode(buf)?;
148 let unknown_value = buf.try_get_bytes(size as usize)?;
149 unknown_tagged_fields.insert(tag as i32, unknown_value);
150 }
151 }
152 Ok(Self {
153 broker_id,
154 host_name,
155 port,
156 unknown_tagged_fields,
157 })
158 }
159}
160
161impl Default for LeaderAndIsrLiveLeader {
162 fn default() -> Self {
163 Self {
164 broker_id: (0).into(),
165 host_name: Default::default(),
166 port: 0,
167 unknown_tagged_fields: BTreeMap::new(),
168 }
169 }
170}
171
172impl Message for LeaderAndIsrLiveLeader {
173 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
174 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
175}
176
177#[non_exhaustive]
179#[derive(Debug, Clone, PartialEq)]
180pub struct LeaderAndIsrPartitionState {
181 pub topic_name: super::TopicName,
185
186 pub partition_index: i32,
190
191 pub controller_epoch: i32,
195
196 pub leader: super::BrokerId,
200
201 pub leader_epoch: i32,
205
206 pub isr: Vec<super::BrokerId>,
210
211 pub partition_epoch: i32,
215
216 pub replicas: Vec<super::BrokerId>,
220
221 pub adding_replicas: Vec<super::BrokerId>,
225
226 pub removing_replicas: Vec<super::BrokerId>,
230
231 pub is_new: bool,
235
236 pub leader_recovery_state: i8,
240
241 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
243}
244
245impl LeaderAndIsrPartitionState {
246 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
252 self.topic_name = value;
253 self
254 }
255 pub fn with_partition_index(mut self, value: i32) -> Self {
261 self.partition_index = value;
262 self
263 }
264 pub fn with_controller_epoch(mut self, value: i32) -> Self {
270 self.controller_epoch = value;
271 self
272 }
273 pub fn with_leader(mut self, value: super::BrokerId) -> Self {
279 self.leader = value;
280 self
281 }
282 pub fn with_leader_epoch(mut self, value: i32) -> Self {
288 self.leader_epoch = value;
289 self
290 }
291 pub fn with_isr(mut self, value: Vec<super::BrokerId>) -> Self {
297 self.isr = value;
298 self
299 }
300 pub fn with_partition_epoch(mut self, value: i32) -> Self {
306 self.partition_epoch = value;
307 self
308 }
309 pub fn with_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
315 self.replicas = value;
316 self
317 }
318 pub fn with_adding_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
324 self.adding_replicas = value;
325 self
326 }
327 pub fn with_removing_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
333 self.removing_replicas = value;
334 self
335 }
336 pub fn with_is_new(mut self, value: bool) -> Self {
342 self.is_new = value;
343 self
344 }
345 pub fn with_leader_recovery_state(mut self, value: i8) -> Self {
351 self.leader_recovery_state = value;
352 self
353 }
354 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
356 self.unknown_tagged_fields = value;
357 self
358 }
359 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
361 self.unknown_tagged_fields.insert(key, value);
362 self
363 }
364}
365
366#[cfg(feature = "client")]
367impl Encodable for LeaderAndIsrPartitionState {
368 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
369 if version <= 1 {
370 types::String.encode(buf, &self.topic_name)?;
371 }
372 types::Int32.encode(buf, &self.partition_index)?;
373 types::Int32.encode(buf, &self.controller_epoch)?;
374 types::Int32.encode(buf, &self.leader)?;
375 types::Int32.encode(buf, &self.leader_epoch)?;
376 if version >= 4 {
377 types::CompactArray(types::Int32).encode(buf, &self.isr)?;
378 } else {
379 types::Array(types::Int32).encode(buf, &self.isr)?;
380 }
381 types::Int32.encode(buf, &self.partition_epoch)?;
382 if version >= 4 {
383 types::CompactArray(types::Int32).encode(buf, &self.replicas)?;
384 } else {
385 types::Array(types::Int32).encode(buf, &self.replicas)?;
386 }
387 if version >= 3 {
388 if version >= 4 {
389 types::CompactArray(types::Int32).encode(buf, &self.adding_replicas)?;
390 } else {
391 types::Array(types::Int32).encode(buf, &self.adding_replicas)?;
392 }
393 }
394 if version >= 3 {
395 if version >= 4 {
396 types::CompactArray(types::Int32).encode(buf, &self.removing_replicas)?;
397 } else {
398 types::Array(types::Int32).encode(buf, &self.removing_replicas)?;
399 }
400 }
401 if version >= 1 {
402 types::Boolean.encode(buf, &self.is_new)?;
403 }
404 if version >= 6 {
405 types::Int8.encode(buf, &self.leader_recovery_state)?;
406 } else {
407 if self.leader_recovery_state != 0 {
408 bail!("A field is set that is not available on the selected protocol version");
409 }
410 }
411 if version >= 4 {
412 let num_tagged_fields = self.unknown_tagged_fields.len();
413 if num_tagged_fields > std::u32::MAX as usize {
414 bail!(
415 "Too many tagged fields to encode ({} fields)",
416 num_tagged_fields
417 );
418 }
419 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
420
421 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
422 }
423 Ok(())
424 }
425 fn compute_size(&self, version: i16) -> Result<usize> {
426 let mut total_size = 0;
427 if version <= 1 {
428 total_size += types::String.compute_size(&self.topic_name)?;
429 }
430 total_size += types::Int32.compute_size(&self.partition_index)?;
431 total_size += types::Int32.compute_size(&self.controller_epoch)?;
432 total_size += types::Int32.compute_size(&self.leader)?;
433 total_size += types::Int32.compute_size(&self.leader_epoch)?;
434 if version >= 4 {
435 total_size += types::CompactArray(types::Int32).compute_size(&self.isr)?;
436 } else {
437 total_size += types::Array(types::Int32).compute_size(&self.isr)?;
438 }
439 total_size += types::Int32.compute_size(&self.partition_epoch)?;
440 if version >= 4 {
441 total_size += types::CompactArray(types::Int32).compute_size(&self.replicas)?;
442 } else {
443 total_size += types::Array(types::Int32).compute_size(&self.replicas)?;
444 }
445 if version >= 3 {
446 if version >= 4 {
447 total_size +=
448 types::CompactArray(types::Int32).compute_size(&self.adding_replicas)?;
449 } else {
450 total_size += types::Array(types::Int32).compute_size(&self.adding_replicas)?;
451 }
452 }
453 if version >= 3 {
454 if version >= 4 {
455 total_size +=
456 types::CompactArray(types::Int32).compute_size(&self.removing_replicas)?;
457 } else {
458 total_size += types::Array(types::Int32).compute_size(&self.removing_replicas)?;
459 }
460 }
461 if version >= 1 {
462 total_size += types::Boolean.compute_size(&self.is_new)?;
463 }
464 if version >= 6 {
465 total_size += types::Int8.compute_size(&self.leader_recovery_state)?;
466 } else {
467 if self.leader_recovery_state != 0 {
468 bail!("A field is set that is not available on the selected protocol version");
469 }
470 }
471 if version >= 4 {
472 let num_tagged_fields = self.unknown_tagged_fields.len();
473 if num_tagged_fields > std::u32::MAX as usize {
474 bail!(
475 "Too many tagged fields to encode ({} fields)",
476 num_tagged_fields
477 );
478 }
479 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
480
481 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
482 }
483 Ok(total_size)
484 }
485}
486
487#[cfg(feature = "broker")]
488impl Decodable for LeaderAndIsrPartitionState {
489 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
490 let topic_name = if version <= 1 {
491 types::String.decode(buf)?
492 } else {
493 Default::default()
494 };
495 let partition_index = types::Int32.decode(buf)?;
496 let controller_epoch = types::Int32.decode(buf)?;
497 let leader = types::Int32.decode(buf)?;
498 let leader_epoch = types::Int32.decode(buf)?;
499 let isr = if version >= 4 {
500 types::CompactArray(types::Int32).decode(buf)?
501 } else {
502 types::Array(types::Int32).decode(buf)?
503 };
504 let partition_epoch = types::Int32.decode(buf)?;
505 let replicas = if version >= 4 {
506 types::CompactArray(types::Int32).decode(buf)?
507 } else {
508 types::Array(types::Int32).decode(buf)?
509 };
510 let adding_replicas = if version >= 3 {
511 if version >= 4 {
512 types::CompactArray(types::Int32).decode(buf)?
513 } else {
514 types::Array(types::Int32).decode(buf)?
515 }
516 } else {
517 Default::default()
518 };
519 let removing_replicas = if version >= 3 {
520 if version >= 4 {
521 types::CompactArray(types::Int32).decode(buf)?
522 } else {
523 types::Array(types::Int32).decode(buf)?
524 }
525 } else {
526 Default::default()
527 };
528 let is_new = if version >= 1 {
529 types::Boolean.decode(buf)?
530 } else {
531 false
532 };
533 let leader_recovery_state = if version >= 6 {
534 types::Int8.decode(buf)?
535 } else {
536 0
537 };
538 let mut unknown_tagged_fields = BTreeMap::new();
539 if version >= 4 {
540 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
541 for _ in 0..num_tagged_fields {
542 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
543 let size: u32 = types::UnsignedVarInt.decode(buf)?;
544 let unknown_value = buf.try_get_bytes(size as usize)?;
545 unknown_tagged_fields.insert(tag as i32, unknown_value);
546 }
547 }
548 Ok(Self {
549 topic_name,
550 partition_index,
551 controller_epoch,
552 leader,
553 leader_epoch,
554 isr,
555 partition_epoch,
556 replicas,
557 adding_replicas,
558 removing_replicas,
559 is_new,
560 leader_recovery_state,
561 unknown_tagged_fields,
562 })
563 }
564}
565
566impl Default for LeaderAndIsrPartitionState {
567 fn default() -> Self {
568 Self {
569 topic_name: Default::default(),
570 partition_index: 0,
571 controller_epoch: 0,
572 leader: (0).into(),
573 leader_epoch: 0,
574 isr: Default::default(),
575 partition_epoch: 0,
576 replicas: Default::default(),
577 adding_replicas: Default::default(),
578 removing_replicas: Default::default(),
579 is_new: false,
580 leader_recovery_state: 0,
581 unknown_tagged_fields: BTreeMap::new(),
582 }
583 }
584}
585
586impl Message for LeaderAndIsrPartitionState {
587 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
588 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
589}
590
591#[non_exhaustive]
593#[derive(Debug, Clone, PartialEq)]
594pub struct LeaderAndIsrRequest {
595 pub controller_id: super::BrokerId,
599
600 pub is_k_raft_controller: bool,
604
605 pub controller_epoch: i32,
609
610 pub broker_epoch: i64,
614
615 pub _type: i8,
619
620 pub ungrouped_partition_states: Vec<LeaderAndIsrPartitionState>,
624
625 pub topic_states: Vec<LeaderAndIsrTopicState>,
629
630 pub live_leaders: Vec<LeaderAndIsrLiveLeader>,
634
635 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
637}
638
639impl LeaderAndIsrRequest {
640 pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
646 self.controller_id = value;
647 self
648 }
649 pub fn with_is_k_raft_controller(mut self, value: bool) -> Self {
655 self.is_k_raft_controller = value;
656 self
657 }
658 pub fn with_controller_epoch(mut self, value: i32) -> Self {
664 self.controller_epoch = value;
665 self
666 }
667 pub fn with_broker_epoch(mut self, value: i64) -> Self {
673 self.broker_epoch = value;
674 self
675 }
676 pub fn with_type(mut self, value: i8) -> Self {
682 self._type = value;
683 self
684 }
685 pub fn with_ungrouped_partition_states(
691 mut self,
692 value: Vec<LeaderAndIsrPartitionState>,
693 ) -> Self {
694 self.ungrouped_partition_states = value;
695 self
696 }
697 pub fn with_topic_states(mut self, value: Vec<LeaderAndIsrTopicState>) -> Self {
703 self.topic_states = value;
704 self
705 }
706 pub fn with_live_leaders(mut self, value: Vec<LeaderAndIsrLiveLeader>) -> Self {
712 self.live_leaders = value;
713 self
714 }
715 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
717 self.unknown_tagged_fields = value;
718 self
719 }
720 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
722 self.unknown_tagged_fields.insert(key, value);
723 self
724 }
725}
726
727#[cfg(feature = "client")]
728impl Encodable for LeaderAndIsrRequest {
729 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
730 types::Int32.encode(buf, &self.controller_id)?;
731 if version >= 7 {
732 types::Boolean.encode(buf, &self.is_k_raft_controller)?;
733 } else {
734 if self.is_k_raft_controller {
735 bail!("A field is set that is not available on the selected protocol version");
736 }
737 }
738 types::Int32.encode(buf, &self.controller_epoch)?;
739 if version >= 2 {
740 types::Int64.encode(buf, &self.broker_epoch)?;
741 }
742 if version >= 5 {
743 types::Int8.encode(buf, &self._type)?;
744 } else {
745 if self._type != 0 {
746 bail!("A field is set that is not available on the selected protocol version");
747 }
748 }
749 if version <= 1 {
750 types::Array(types::Struct { version })
751 .encode(buf, &self.ungrouped_partition_states)?;
752 } else {
753 if !self.ungrouped_partition_states.is_empty() {
754 bail!("A field is set that is not available on the selected protocol version");
755 }
756 }
757 if version >= 2 {
758 if version >= 4 {
759 types::CompactArray(types::Struct { version }).encode(buf, &self.topic_states)?;
760 } else {
761 types::Array(types::Struct { version }).encode(buf, &self.topic_states)?;
762 }
763 } else {
764 if !self.topic_states.is_empty() {
765 bail!("A field is set that is not available on the selected protocol version");
766 }
767 }
768 if version >= 4 {
769 types::CompactArray(types::Struct { version }).encode(buf, &self.live_leaders)?;
770 } else {
771 types::Array(types::Struct { version }).encode(buf, &self.live_leaders)?;
772 }
773 if version >= 4 {
774 let num_tagged_fields = self.unknown_tagged_fields.len();
775 if num_tagged_fields > std::u32::MAX as usize {
776 bail!(
777 "Too many tagged fields to encode ({} fields)",
778 num_tagged_fields
779 );
780 }
781 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
782
783 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
784 }
785 Ok(())
786 }
787 fn compute_size(&self, version: i16) -> Result<usize> {
788 let mut total_size = 0;
789 total_size += types::Int32.compute_size(&self.controller_id)?;
790 if version >= 7 {
791 total_size += types::Boolean.compute_size(&self.is_k_raft_controller)?;
792 } else {
793 if self.is_k_raft_controller {
794 bail!("A field is set that is not available on the selected protocol version");
795 }
796 }
797 total_size += types::Int32.compute_size(&self.controller_epoch)?;
798 if version >= 2 {
799 total_size += types::Int64.compute_size(&self.broker_epoch)?;
800 }
801 if version >= 5 {
802 total_size += types::Int8.compute_size(&self._type)?;
803 } else {
804 if self._type != 0 {
805 bail!("A field is set that is not available on the selected protocol version");
806 }
807 }
808 if version <= 1 {
809 total_size += types::Array(types::Struct { version })
810 .compute_size(&self.ungrouped_partition_states)?;
811 } else {
812 if !self.ungrouped_partition_states.is_empty() {
813 bail!("A field is set that is not available on the selected protocol version");
814 }
815 }
816 if version >= 2 {
817 if version >= 4 {
818 total_size += types::CompactArray(types::Struct { version })
819 .compute_size(&self.topic_states)?;
820 } else {
821 total_size +=
822 types::Array(types::Struct { version }).compute_size(&self.topic_states)?;
823 }
824 } else {
825 if !self.topic_states.is_empty() {
826 bail!("A field is set that is not available on the selected protocol version");
827 }
828 }
829 if version >= 4 {
830 total_size +=
831 types::CompactArray(types::Struct { version }).compute_size(&self.live_leaders)?;
832 } else {
833 total_size +=
834 types::Array(types::Struct { version }).compute_size(&self.live_leaders)?;
835 }
836 if version >= 4 {
837 let num_tagged_fields = self.unknown_tagged_fields.len();
838 if num_tagged_fields > std::u32::MAX as usize {
839 bail!(
840 "Too many tagged fields to encode ({} fields)",
841 num_tagged_fields
842 );
843 }
844 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
845
846 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
847 }
848 Ok(total_size)
849 }
850}
851
852#[cfg(feature = "broker")]
853impl Decodable for LeaderAndIsrRequest {
854 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
855 let controller_id = types::Int32.decode(buf)?;
856 let is_k_raft_controller = if version >= 7 {
857 types::Boolean.decode(buf)?
858 } else {
859 false
860 };
861 let controller_epoch = types::Int32.decode(buf)?;
862 let broker_epoch = if version >= 2 {
863 types::Int64.decode(buf)?
864 } else {
865 -1
866 };
867 let _type = if version >= 5 {
868 types::Int8.decode(buf)?
869 } else {
870 0
871 };
872 let ungrouped_partition_states = if version <= 1 {
873 types::Array(types::Struct { version }).decode(buf)?
874 } else {
875 Default::default()
876 };
877 let topic_states = if version >= 2 {
878 if version >= 4 {
879 types::CompactArray(types::Struct { version }).decode(buf)?
880 } else {
881 types::Array(types::Struct { version }).decode(buf)?
882 }
883 } else {
884 Default::default()
885 };
886 let live_leaders = if version >= 4 {
887 types::CompactArray(types::Struct { version }).decode(buf)?
888 } else {
889 types::Array(types::Struct { version }).decode(buf)?
890 };
891 let mut unknown_tagged_fields = BTreeMap::new();
892 if version >= 4 {
893 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
894 for _ in 0..num_tagged_fields {
895 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
896 let size: u32 = types::UnsignedVarInt.decode(buf)?;
897 let unknown_value = buf.try_get_bytes(size as usize)?;
898 unknown_tagged_fields.insert(tag as i32, unknown_value);
899 }
900 }
901 Ok(Self {
902 controller_id,
903 is_k_raft_controller,
904 controller_epoch,
905 broker_epoch,
906 _type,
907 ungrouped_partition_states,
908 topic_states,
909 live_leaders,
910 unknown_tagged_fields,
911 })
912 }
913}
914
915impl Default for LeaderAndIsrRequest {
916 fn default() -> Self {
917 Self {
918 controller_id: (0).into(),
919 is_k_raft_controller: false,
920 controller_epoch: 0,
921 broker_epoch: -1,
922 _type: 0,
923 ungrouped_partition_states: Default::default(),
924 topic_states: Default::default(),
925 live_leaders: Default::default(),
926 unknown_tagged_fields: BTreeMap::new(),
927 }
928 }
929}
930
931impl Message for LeaderAndIsrRequest {
932 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
933 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
934}
935
936#[non_exhaustive]
938#[derive(Debug, Clone, PartialEq)]
939pub struct LeaderAndIsrTopicState {
940 pub topic_name: super::TopicName,
944
945 pub topic_id: Uuid,
949
950 pub partition_states: Vec<LeaderAndIsrPartitionState>,
954
955 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
957}
958
959impl LeaderAndIsrTopicState {
960 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
966 self.topic_name = value;
967 self
968 }
969 pub fn with_topic_id(mut self, value: Uuid) -> Self {
975 self.topic_id = value;
976 self
977 }
978 pub fn with_partition_states(mut self, value: Vec<LeaderAndIsrPartitionState>) -> Self {
984 self.partition_states = value;
985 self
986 }
987 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
989 self.unknown_tagged_fields = value;
990 self
991 }
992 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
994 self.unknown_tagged_fields.insert(key, value);
995 self
996 }
997}
998
999#[cfg(feature = "client")]
1000impl Encodable for LeaderAndIsrTopicState {
1001 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1002 if version >= 2 {
1003 if version >= 4 {
1004 types::CompactString.encode(buf, &self.topic_name)?;
1005 } else {
1006 types::String.encode(buf, &self.topic_name)?;
1007 }
1008 } else {
1009 if !self.topic_name.is_empty() {
1010 bail!("A field is set that is not available on the selected protocol version");
1011 }
1012 }
1013 if version >= 5 {
1014 types::Uuid.encode(buf, &self.topic_id)?;
1015 }
1016 if version >= 2 {
1017 if version >= 4 {
1018 types::CompactArray(types::Struct { version })
1019 .encode(buf, &self.partition_states)?;
1020 } else {
1021 types::Array(types::Struct { version }).encode(buf, &self.partition_states)?;
1022 }
1023 } else {
1024 if !self.partition_states.is_empty() {
1025 bail!("A field is set that is not available on the selected protocol version");
1026 }
1027 }
1028 if version >= 4 {
1029 let num_tagged_fields = self.unknown_tagged_fields.len();
1030 if num_tagged_fields > std::u32::MAX as usize {
1031 bail!(
1032 "Too many tagged fields to encode ({} fields)",
1033 num_tagged_fields
1034 );
1035 }
1036 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1037
1038 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1039 }
1040 Ok(())
1041 }
1042 fn compute_size(&self, version: i16) -> Result<usize> {
1043 let mut total_size = 0;
1044 if version >= 2 {
1045 if version >= 4 {
1046 total_size += types::CompactString.compute_size(&self.topic_name)?;
1047 } else {
1048 total_size += types::String.compute_size(&self.topic_name)?;
1049 }
1050 } else {
1051 if !self.topic_name.is_empty() {
1052 bail!("A field is set that is not available on the selected protocol version");
1053 }
1054 }
1055 if version >= 5 {
1056 total_size += types::Uuid.compute_size(&self.topic_id)?;
1057 }
1058 if version >= 2 {
1059 if version >= 4 {
1060 total_size += types::CompactArray(types::Struct { version })
1061 .compute_size(&self.partition_states)?;
1062 } else {
1063 total_size +=
1064 types::Array(types::Struct { version }).compute_size(&self.partition_states)?;
1065 }
1066 } else {
1067 if !self.partition_states.is_empty() {
1068 bail!("A field is set that is not available on the selected protocol version");
1069 }
1070 }
1071 if version >= 4 {
1072 let num_tagged_fields = self.unknown_tagged_fields.len();
1073 if num_tagged_fields > std::u32::MAX as usize {
1074 bail!(
1075 "Too many tagged fields to encode ({} fields)",
1076 num_tagged_fields
1077 );
1078 }
1079 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1080
1081 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1082 }
1083 Ok(total_size)
1084 }
1085}
1086
1087#[cfg(feature = "broker")]
1088impl Decodable for LeaderAndIsrTopicState {
1089 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1090 let topic_name = if version >= 2 {
1091 if version >= 4 {
1092 types::CompactString.decode(buf)?
1093 } else {
1094 types::String.decode(buf)?
1095 }
1096 } else {
1097 Default::default()
1098 };
1099 let topic_id = if version >= 5 {
1100 types::Uuid.decode(buf)?
1101 } else {
1102 Uuid::nil()
1103 };
1104 let partition_states = if version >= 2 {
1105 if version >= 4 {
1106 types::CompactArray(types::Struct { version }).decode(buf)?
1107 } else {
1108 types::Array(types::Struct { version }).decode(buf)?
1109 }
1110 } else {
1111 Default::default()
1112 };
1113 let mut unknown_tagged_fields = BTreeMap::new();
1114 if version >= 4 {
1115 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1116 for _ in 0..num_tagged_fields {
1117 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1118 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1119 let unknown_value = buf.try_get_bytes(size as usize)?;
1120 unknown_tagged_fields.insert(tag as i32, unknown_value);
1121 }
1122 }
1123 Ok(Self {
1124 topic_name,
1125 topic_id,
1126 partition_states,
1127 unknown_tagged_fields,
1128 })
1129 }
1130}
1131
1132impl Default for LeaderAndIsrTopicState {
1133 fn default() -> Self {
1134 Self {
1135 topic_name: Default::default(),
1136 topic_id: Uuid::nil(),
1137 partition_states: Default::default(),
1138 unknown_tagged_fields: BTreeMap::new(),
1139 }
1140 }
1141}
1142
1143impl Message for LeaderAndIsrTopicState {
1144 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
1145 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1146}
1147
1148impl HeaderVersion for LeaderAndIsrRequest {
1149 fn header_version(version: i16) -> i16 {
1150 if version >= 4 {
1151 2
1152 } else {
1153 1
1154 }
1155 }
1156}