1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct LeaderAndIsrLiveLeader {
24 pub broker_id: super::BrokerId,
28
29 pub host_name: StrBytes,
33
34 pub port: i32,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl LeaderAndIsrLiveLeader {
44 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
50 self.broker_id = value;
51 self
52 }
53 pub fn with_host_name(mut self, value: StrBytes) -> Self {
59 self.host_name = value;
60 self
61 }
62 pub fn with_port(mut self, value: i32) -> Self {
68 self.port = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrLiveLeader {
    /// Writes this entry to `buf` using the wire layout selected by `version`.
    ///
    /// # Errors
    /// Fails when `version` lies outside `0..=7`, when more than `u32::MAX`
    /// unknown tagged fields are present, or when a field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on the compact string encoding is used and a
        // tagged-field section is appended.
        let uses_compact = version >= 4;

        types::Int32.encode(buf, &self.broker_id)?;
        if uses_compact {
            types::CompactString.encode(buf, &self.host_name)?;
        } else {
            types::String.encode(buf, &self.host_name)?;
        }
        types::Int32.encode(buf, &self.port)?;

        if uses_compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Adds up the computed size of every field that is present in `version`.
    ///
    /// Unlike `encode`, no range check is applied to `version` here.
    ///
    /// # Errors
    /// Fails when more than `u32::MAX` unknown tagged fields are present or
    /// when a field's size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let uses_compact = version >= 4;
        let mut size = 0;

        size += types::Int32.compute_size(&self.broker_id)?;
        size += if uses_compact {
            types::CompactString.compute_size(&self.host_name)?
        } else {
            types::String.compute_size(&self.host_name)?
        };
        size += types::Int32.compute_size(&self.port)?;

        if uses_compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
134
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrLiveLeader {
    /// Reads one entry from `buf` using the wire layout selected by `version`.
    ///
    /// # Errors
    /// Fails when `version` lies outside `0..=7` or when any field cannot be
    /// read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on the compact string encoding is used and a
        // tagged-field section follows the regular fields.
        let uses_compact = version >= 4;

        let broker_id = types::Int32.decode(buf)?;
        let host_name = if uses_compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let port = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        if uses_compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each tagged field is a (tag, length, raw bytes) triple.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            broker_id,
            host_name,
            port,
            unknown_tagged_fields,
        })
    }
}
166
167impl Default for LeaderAndIsrLiveLeader {
168 fn default() -> Self {
169 Self {
170 broker_id: (0).into(),
171 host_name: Default::default(),
172 port: 0,
173 unknown_tagged_fields: BTreeMap::new(),
174 }
175 }
176}
177
178impl Message for LeaderAndIsrLiveLeader {
179 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
180 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
181}
182
183#[non_exhaustive]
185#[derive(Debug, Clone, PartialEq)]
186pub struct LeaderAndIsrPartitionState {
187 pub topic_name: super::TopicName,
191
192 pub partition_index: i32,
196
197 pub controller_epoch: i32,
201
202 pub leader: super::BrokerId,
206
207 pub leader_epoch: i32,
211
212 pub isr: Vec<super::BrokerId>,
216
217 pub partition_epoch: i32,
221
222 pub replicas: Vec<super::BrokerId>,
226
227 pub adding_replicas: Vec<super::BrokerId>,
231
232 pub removing_replicas: Vec<super::BrokerId>,
236
237 pub is_new: bool,
241
242 pub leader_recovery_state: i8,
246
247 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
249}
250
251impl LeaderAndIsrPartitionState {
252 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
258 self.topic_name = value;
259 self
260 }
261 pub fn with_partition_index(mut self, value: i32) -> Self {
267 self.partition_index = value;
268 self
269 }
270 pub fn with_controller_epoch(mut self, value: i32) -> Self {
276 self.controller_epoch = value;
277 self
278 }
279 pub fn with_leader(mut self, value: super::BrokerId) -> Self {
285 self.leader = value;
286 self
287 }
288 pub fn with_leader_epoch(mut self, value: i32) -> Self {
294 self.leader_epoch = value;
295 self
296 }
297 pub fn with_isr(mut self, value: Vec<super::BrokerId>) -> Self {
303 self.isr = value;
304 self
305 }
306 pub fn with_partition_epoch(mut self, value: i32) -> Self {
312 self.partition_epoch = value;
313 self
314 }
315 pub fn with_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
321 self.replicas = value;
322 self
323 }
324 pub fn with_adding_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
330 self.adding_replicas = value;
331 self
332 }
333 pub fn with_removing_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
339 self.removing_replicas = value;
340 self
341 }
342 pub fn with_is_new(mut self, value: bool) -> Self {
348 self.is_new = value;
349 self
350 }
351 pub fn with_leader_recovery_state(mut self, value: i8) -> Self {
357 self.leader_recovery_state = value;
358 self
359 }
360 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
362 self.unknown_tagged_fields = value;
363 self
364 }
365 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
367 self.unknown_tagged_fields.insert(key, value);
368 self
369 }
370}
371
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrPartitionState {
    /// Writes this partition state to `buf` using the wire layout selected by `version`.
    ///
    /// # Errors
    /// Fails when `version` lies outside `0..=7`, when `leader_recovery_state`
    /// is non-zero for a version below 6, when more than `u32::MAX` unknown
    /// tagged fields are present, or when a field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on compact arrays are used and a tagged-field
        // section is appended.
        let uses_compact = version >= 4;

        if version <= 1 {
            types::String.encode(buf, &self.topic_name)?;
        }
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.controller_epoch)?;
        types::Int32.encode(buf, &self.leader)?;
        types::Int32.encode(buf, &self.leader_epoch)?;
        if uses_compact {
            types::CompactArray(types::Int32).encode(buf, &self.isr)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.isr)?;
        }
        types::Int32.encode(buf, &self.partition_epoch)?;
        if uses_compact {
            types::CompactArray(types::Int32).encode(buf, &self.replicas)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.replicas)?;
        }

        // The adding/removing replica lists exist from version 3 on; version 3
        // is the only one that carries them in the non-compact form.
        if uses_compact {
            types::CompactArray(types::Int32).encode(buf, &self.adding_replicas)?;
            types::CompactArray(types::Int32).encode(buf, &self.removing_replicas)?;
        } else if version >= 3 {
            types::Array(types::Int32).encode(buf, &self.adding_replicas)?;
            types::Array(types::Int32).encode(buf, &self.removing_replicas)?;
        }

        if version >= 1 {
            types::Boolean.encode(buf, &self.is_new)?;
        }
        if version >= 6 {
            types::Int8.encode(buf, &self.leader_recovery_state)?;
        } else if self.leader_recovery_state != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if uses_compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Adds up the computed size of every field that is present in `version`.
    ///
    /// Unlike `encode`, no range check is applied to `version` here.
    ///
    /// # Errors
    /// Fails when `leader_recovery_state` is non-zero for a version below 6,
    /// when more than `u32::MAX` unknown tagged fields are present, or when a
    /// field's size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let uses_compact = version >= 4;
        let mut size = 0;

        if version <= 1 {
            size += types::String.compute_size(&self.topic_name)?;
        }
        size += types::Int32.compute_size(&self.partition_index)?;
        size += types::Int32.compute_size(&self.controller_epoch)?;
        size += types::Int32.compute_size(&self.leader)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;
        size += if uses_compact {
            types::CompactArray(types::Int32).compute_size(&self.isr)?
        } else {
            types::Array(types::Int32).compute_size(&self.isr)?
        };
        size += types::Int32.compute_size(&self.partition_epoch)?;
        size += if uses_compact {
            types::CompactArray(types::Int32).compute_size(&self.replicas)?
        } else {
            types::Array(types::Int32).compute_size(&self.replicas)?
        };

        // Same version gating as in `encode` for the two replica-change lists.
        if uses_compact {
            size += types::CompactArray(types::Int32).compute_size(&self.adding_replicas)?;
            size += types::CompactArray(types::Int32).compute_size(&self.removing_replicas)?;
        } else if version >= 3 {
            size += types::Array(types::Int32).compute_size(&self.adding_replicas)?;
            size += types::Array(types::Int32).compute_size(&self.removing_replicas)?;
        }

        if version >= 1 {
            size += types::Boolean.compute_size(&self.is_new)?;
        }
        if version >= 6 {
            size += types::Int8.compute_size(&self.leader_recovery_state)?;
        } else if self.leader_recovery_state != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if uses_compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
495
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrPartitionState {
    /// Reads one partition state from `buf` using the wire layout selected by `version`.
    ///
    /// Fields that are absent in `version` take the same values `Default` gives them.
    ///
    /// # Errors
    /// Fails when `version` lies outside `0..=7` or when any field cannot be
    /// read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on compact arrays are used and a tagged-field
        // section follows the regular fields.
        let uses_compact = version >= 4;

        let topic_name = if version <= 1 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let partition_index = types::Int32.decode(buf)?;
        let controller_epoch = types::Int32.decode(buf)?;
        let leader = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;
        let isr = if uses_compact {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let partition_epoch = types::Int32.decode(buf)?;
        let replicas = if uses_compact {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let adding_replicas = if uses_compact {
            types::CompactArray(types::Int32).decode(buf)?
        } else if version >= 3 {
            types::Array(types::Int32).decode(buf)?
        } else {
            Vec::new()
        };
        let removing_replicas = if uses_compact {
            types::CompactArray(types::Int32).decode(buf)?
        } else if version >= 3 {
            types::Array(types::Int32).decode(buf)?
        } else {
            Vec::new()
        };
        let is_new = if version >= 1 {
            types::Boolean.decode(buf)?
        } else {
            false
        };
        let leader_recovery_state = if version >= 6 {
            types::Int8.decode(buf)?
        } else {
            0
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if uses_compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each tagged field is a (tag, length, raw bytes) triple.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic_name,
            partition_index,
            controller_epoch,
            leader,
            leader_epoch,
            isr,
            partition_epoch,
            replicas,
            adding_replicas,
            removing_replicas,
            is_new,
            leader_recovery_state,
            unknown_tagged_fields,
        })
    }
}
577
578impl Default for LeaderAndIsrPartitionState {
579 fn default() -> Self {
580 Self {
581 topic_name: Default::default(),
582 partition_index: 0,
583 controller_epoch: 0,
584 leader: (0).into(),
585 leader_epoch: 0,
586 isr: Default::default(),
587 partition_epoch: 0,
588 replicas: Default::default(),
589 adding_replicas: Default::default(),
590 removing_replicas: Default::default(),
591 is_new: false,
592 leader_recovery_state: 0,
593 unknown_tagged_fields: BTreeMap::new(),
594 }
595 }
596}
597
598impl Message for LeaderAndIsrPartitionState {
599 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
600 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
601}
602
603#[non_exhaustive]
605#[derive(Debug, Clone, PartialEq)]
606pub struct LeaderAndIsrRequest {
607 pub controller_id: super::BrokerId,
611
612 pub is_k_raft_controller: bool,
616
617 pub controller_epoch: i32,
621
622 pub broker_epoch: i64,
626
627 pub _type: i8,
631
632 pub ungrouped_partition_states: Vec<LeaderAndIsrPartitionState>,
636
637 pub topic_states: Vec<LeaderAndIsrTopicState>,
641
642 pub live_leaders: Vec<LeaderAndIsrLiveLeader>,
646
647 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
649}
650
651impl LeaderAndIsrRequest {
652 pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
658 self.controller_id = value;
659 self
660 }
661 pub fn with_is_k_raft_controller(mut self, value: bool) -> Self {
667 self.is_k_raft_controller = value;
668 self
669 }
670 pub fn with_controller_epoch(mut self, value: i32) -> Self {
676 self.controller_epoch = value;
677 self
678 }
679 pub fn with_broker_epoch(mut self, value: i64) -> Self {
685 self.broker_epoch = value;
686 self
687 }
688 pub fn with_type(mut self, value: i8) -> Self {
694 self._type = value;
695 self
696 }
697 pub fn with_ungrouped_partition_states(
703 mut self,
704 value: Vec<LeaderAndIsrPartitionState>,
705 ) -> Self {
706 self.ungrouped_partition_states = value;
707 self
708 }
709 pub fn with_topic_states(mut self, value: Vec<LeaderAndIsrTopicState>) -> Self {
715 self.topic_states = value;
716 self
717 }
718 pub fn with_live_leaders(mut self, value: Vec<LeaderAndIsrLiveLeader>) -> Self {
724 self.live_leaders = value;
725 self
726 }
727 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
729 self.unknown_tagged_fields = value;
730 self
731 }
732 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
734 self.unknown_tagged_fields.insert(key, value);
735 self
736 }
737}
738
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrRequest {
    /// Writes this request to `buf` using the wire layout selected by `version`.
    ///
    /// # Errors
    /// Fails when `version` lies outside `0..=7`, when a field that does not
    /// exist in `version` holds a non-default value (`is_k_raft_controller`,
    /// `_type`, `ungrouped_partition_states`, `topic_states`), when more than
    /// `u32::MAX` unknown tagged fields are present, or when a field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on compact arrays are used and a tagged-field
        // section is appended.
        let uses_compact = version >= 4;

        types::Int32.encode(buf, &self.controller_id)?;
        if version >= 7 {
            types::Boolean.encode(buf, &self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }
        types::Int32.encode(buf, &self.controller_epoch)?;
        if version >= 2 {
            types::Int64.encode(buf, &self.broker_epoch)?;
        }
        if version >= 5 {
            types::Int8.encode(buf, &self._type)?;
        } else if self._type != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions 0-1 list partition states directly on the request.
        if version <= 1 {
            types::Array(types::Struct { version })
                .encode(buf, &self.ungrouped_partition_states)?;
        } else if !self.ungrouped_partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions 2 and above use the per-topic grouping instead.
        if uses_compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topic_states)?;
        } else if version >= 2 {
            types::Array(types::Struct { version }).encode(buf, &self.topic_states)?;
        } else if !self.topic_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if uses_compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.live_leaders)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.live_leaders)?;
        }

        if uses_compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Adds up the computed size of every field that is present in `version`.
    ///
    /// Unlike `encode`, no range check is applied to `version` here.
    ///
    /// # Errors
    /// Fails under the same field-availability conditions as `encode`, when
    /// more than `u32::MAX` unknown tagged fields are present, or when a
    /// field's size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let uses_compact = version >= 4;
        let mut size = 0;

        size += types::Int32.compute_size(&self.controller_id)?;
        if version >= 7 {
            size += types::Boolean.compute_size(&self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }
        size += types::Int32.compute_size(&self.controller_epoch)?;
        if version >= 2 {
            size += types::Int64.compute_size(&self.broker_epoch)?;
        }
        if version >= 5 {
            size += types::Int8.compute_size(&self._type)?;
        } else if self._type != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version <= 1 {
            size += types::Array(types::Struct { version })
                .compute_size(&self.ungrouped_partition_states)?;
        } else if !self.ungrouped_partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if uses_compact {
            size += types::CompactArray(types::Struct { version })
                .compute_size(&self.topic_states)?;
        } else if version >= 2 {
            size += types::Array(types::Struct { version }).compute_size(&self.topic_states)?;
        } else if !self.topic_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        size += if uses_compact {
            types::CompactArray(types::Struct { version }).compute_size(&self.live_leaders)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.live_leaders)?
        };

        if uses_compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
866
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrRequest {
    /// Reads a request from `buf` using the wire layout selected by `version`.
    ///
    /// Fields that are absent in `version` take the same values `Default`
    /// gives them (notably `broker_epoch == -1`).
    ///
    /// # Errors
    /// Fails when `version` lies outside `0..=7` or when any field cannot be
    /// read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on compact arrays are used and a tagged-field
        // section follows the regular fields.
        let uses_compact = version >= 4;

        let controller_id = types::Int32.decode(buf)?;
        let is_k_raft_controller = if version >= 7 {
            types::Boolean.decode(buf)?
        } else {
            false
        };
        let controller_epoch = types::Int32.decode(buf)?;
        let broker_epoch = if version >= 2 {
            types::Int64.decode(buf)?
        } else {
            -1
        };
        let _type = if version >= 5 {
            types::Int8.decode(buf)?
        } else {
            0
        };
        let ungrouped_partition_states = if version <= 1 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Vec::new()
        };
        let topic_states = if uses_compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 2 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Vec::new()
        };
        let live_leaders = if uses_compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if uses_compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each tagged field is a (tag, length, raw bytes) triple.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            controller_id,
            is_k_raft_controller,
            controller_epoch,
            broker_epoch,
            _type,
            ungrouped_partition_states,
            topic_states,
            live_leaders,
            unknown_tagged_fields,
        })
    }
}
932
933impl Default for LeaderAndIsrRequest {
934 fn default() -> Self {
935 Self {
936 controller_id: (0).into(),
937 is_k_raft_controller: false,
938 controller_epoch: 0,
939 broker_epoch: -1,
940 _type: 0,
941 ungrouped_partition_states: Default::default(),
942 topic_states: Default::default(),
943 live_leaders: Default::default(),
944 unknown_tagged_fields: BTreeMap::new(),
945 }
946 }
947}
948
949impl Message for LeaderAndIsrRequest {
950 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
951 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
952}
953
954#[non_exhaustive]
956#[derive(Debug, Clone, PartialEq)]
957pub struct LeaderAndIsrTopicState {
958 pub topic_name: super::TopicName,
962
963 pub topic_id: Uuid,
967
968 pub partition_states: Vec<LeaderAndIsrPartitionState>,
972
973 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
975}
976
977impl LeaderAndIsrTopicState {
978 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
984 self.topic_name = value;
985 self
986 }
987 pub fn with_topic_id(mut self, value: Uuid) -> Self {
993 self.topic_id = value;
994 self
995 }
996 pub fn with_partition_states(mut self, value: Vec<LeaderAndIsrPartitionState>) -> Self {
1002 self.partition_states = value;
1003 self
1004 }
1005 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1007 self.unknown_tagged_fields = value;
1008 self
1009 }
1010 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1012 self.unknown_tagged_fields.insert(key, value);
1013 self
1014 }
1015}
1016
#[cfg(feature = "client")]
impl Encodable for LeaderAndIsrTopicState {
    /// Writes this topic state to `buf` using the wire layout selected by `version`.
    ///
    /// # Errors
    /// Fails when `version` lies outside `0..=7`, when `topic_name` or
    /// `partition_states` is non-empty for a version below 2, when more than
    /// `u32::MAX` unknown tagged fields are present, or when a field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on compact strings/arrays are used and a
        // tagged-field section is appended.
        let uses_compact = version >= 4;

        if uses_compact {
            types::CompactString.encode(buf, &self.topic_name)?;
        } else if version >= 2 {
            types::String.encode(buf, &self.topic_name)?;
        } else if !self.topic_name.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // No availability check for `topic_id`: it is simply skipped below version 5.
        if version >= 5 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }

        if uses_compact {
            types::CompactArray(types::Struct { version })
                .encode(buf, &self.partition_states)?;
        } else if version >= 2 {
            types::Array(types::Struct { version }).encode(buf, &self.partition_states)?;
        } else if !self.partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if uses_compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Adds up the computed size of every field that is present in `version`.
    ///
    /// Unlike `encode`, no range check is applied to `version` here.
    ///
    /// # Errors
    /// Fails under the same field-availability conditions as `encode`, when
    /// more than `u32::MAX` unknown tagged fields are present, or when a
    /// field's size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let uses_compact = version >= 4;
        let mut size = 0;

        if uses_compact {
            size += types::CompactString.compute_size(&self.topic_name)?;
        } else if version >= 2 {
            size += types::String.compute_size(&self.topic_name)?;
        } else if !self.topic_name.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 5 {
            size += types::Uuid.compute_size(&self.topic_id)?;
        }

        if uses_compact {
            size += types::CompactArray(types::Struct { version })
                .compute_size(&self.partition_states)?;
        } else if version >= 2 {
            size += types::Array(types::Struct { version }).compute_size(&self.partition_states)?;
        } else if !self.partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if uses_compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
1107
#[cfg(feature = "broker")]
impl Decodable for LeaderAndIsrTopicState {
    /// Reads one topic state from `buf` using the wire layout selected by `version`.
    ///
    /// Fields that are absent in `version` take the same values `Default`
    /// gives them (nil UUID for `topic_id`).
    ///
    /// # Errors
    /// Fails when `version` lies outside `0..=7` or when any field cannot be
    /// read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on compact strings/arrays are used and a
        // tagged-field section follows the regular fields.
        let uses_compact = version >= 4;

        let topic_name = if uses_compact {
            types::CompactString.decode(buf)?
        } else if version >= 2 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let topic_id = if version >= 5 {
            types::Uuid.decode(buf)?
        } else {
            Uuid::nil()
        };
        let partition_states = if uses_compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 2 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Vec::new()
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if uses_compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each tagged field is a (tag, length, raw bytes) triple.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic_name,
            topic_id,
            partition_states,
            unknown_tagged_fields,
        })
    }
}
1155
1156impl Default for LeaderAndIsrTopicState {
1157 fn default() -> Self {
1158 Self {
1159 topic_name: Default::default(),
1160 topic_id: Uuid::nil(),
1161 partition_states: Default::default(),
1162 unknown_tagged_fields: BTreeMap::new(),
1163 }
1164 }
1165}
1166
1167impl Message for LeaderAndIsrTopicState {
1168 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
1169 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1170}
1171
1172impl HeaderVersion for LeaderAndIsrRequest {
1173 fn header_version(version: i16) -> i16 {
1174 if version >= 4 {
1175 2
1176 } else {
1177 1
1178 }
1179 }
1180}