1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct UpdateMetadataBroker {
24 pub id: super::BrokerId,
28
29 pub v0_host: StrBytes,
33
34 pub v0_port: i32,
38
39 pub endpoints: Vec<UpdateMetadataEndpoint>,
43
44 pub rack: Option<StrBytes>,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl UpdateMetadataBroker {
54 pub fn with_id(mut self, value: super::BrokerId) -> Self {
60 self.id = value;
61 self
62 }
63 pub fn with_v0_host(mut self, value: StrBytes) -> Self {
69 self.v0_host = value;
70 self
71 }
72 pub fn with_v0_port(mut self, value: i32) -> Self {
78 self.v0_port = value;
79 self
80 }
81 pub fn with_endpoints(mut self, value: Vec<UpdateMetadataEndpoint>) -> Self {
87 self.endpoints = value;
88 self
89 }
90 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
96 self.rack = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataBroker {
    /// Serializes this broker into `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=8`, when a nested encoder fails,
    /// or when the tagged-field count does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=8).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 6 on, the compact encodings and tagged fields are used.
        let flexible = version >= 6;

        types::Int32.encode(buf, &self.id)?;
        if version == 0 {
            // The flat host/port pair is written in version 0 only.
            types::String.encode(buf, &self.v0_host)?;
            types::Int32.encode(buf, &self.v0_port)?;
        }
        if version >= 1 {
            if flexible {
                types::CompactArray(types::Struct { version }).encode(buf, &self.endpoints)?;
            } else {
                types::Array(types::Struct { version }).encode(buf, &self.endpoints)?;
            }
        }
        if version >= 2 {
            if flexible {
                types::CompactString.encode(buf, &self.rack)?;
            } else {
                types::String.encode(buf, &self.rack)?;
            }
        }
        if !flexible {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject out-of-range versions.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;
        let mut size = types::Int32.compute_size(&self.id)?;

        if version == 0 {
            size += types::String.compute_size(&self.v0_host)?;
            size += types::Int32.compute_size(&self.v0_port)?;
        }
        if version >= 1 {
            size += if flexible {
                types::CompactArray(types::Struct { version }).compute_size(&self.endpoints)?
            } else {
                types::Array(types::Struct { version }).compute_size(&self.endpoints)?
            };
        }
        if version >= 2 {
            size += if flexible {
                types::CompactString.compute_size(&self.rack)?
            } else {
                types::String.compute_size(&self.rack)?
            };
        }
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
192
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataBroker {
    /// Reads a broker entry from `buf` using the layout of `version`.
    ///
    /// Fields that are absent in `version` receive the same values as in the
    /// `Default` impl (`rack` becomes `Some` of an empty string).
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=8` or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=8).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 6;

        let broker_id = types::Int32.decode(buf)?;
        let host = match version {
            0 => types::String.decode(buf)?,
            _ => Default::default(),
        };
        let port = match version {
            0 => types::Int32.decode(buf)?,
            _ => 0,
        };
        let endpoint_list = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 1 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        let rack_name = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 2 {
            types::String.decode(buf)?
        } else {
            Some(Default::default())
        };

        // Every tag is kept verbatim; this type defines no known tagged fields.
        let mut tagged = BTreeMap::new();
        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                tagged.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            id: broker_id,
            v0_host: host,
            v0_port: port,
            endpoints: endpoint_list,
            rack: rack_name,
            unknown_tagged_fields: tagged,
        })
    }
}
248
249impl Default for UpdateMetadataBroker {
250 fn default() -> Self {
251 Self {
252 id: (0).into(),
253 v0_host: Default::default(),
254 v0_port: 0,
255 endpoints: Default::default(),
256 rack: Some(Default::default()),
257 unknown_tagged_fields: BTreeMap::new(),
258 }
259 }
260}
261
262impl Message for UpdateMetadataBroker {
263 const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
264 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
265}
266
267#[non_exhaustive]
269#[derive(Debug, Clone, PartialEq)]
270pub struct UpdateMetadataEndpoint {
271 pub port: i32,
275
276 pub host: StrBytes,
280
281 pub listener: StrBytes,
285
286 pub security_protocol: i16,
290
291 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
293}
294
295impl UpdateMetadataEndpoint {
296 pub fn with_port(mut self, value: i32) -> Self {
302 self.port = value;
303 self
304 }
305 pub fn with_host(mut self, value: StrBytes) -> Self {
311 self.host = value;
312 self
313 }
314 pub fn with_listener(mut self, value: StrBytes) -> Self {
320 self.listener = value;
321 self
322 }
323 pub fn with_security_protocol(mut self, value: i16) -> Self {
329 self.security_protocol = value;
330 self
331 }
332 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
334 self.unknown_tagged_fields = value;
335 self
336 }
337 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
339 self.unknown_tagged_fields.insert(key, value);
340 self
341 }
342}
343
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataEndpoint {
    /// Serializes this endpoint into `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=8`, when `port`, `host` or
    /// `security_protocol` hold non-default values at a version below 1, when
    /// a nested encoder fails, or when the tagged-field count exceeds `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=8).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 6;

        if version >= 1 {
            types::Int32.encode(buf, &self.port)?;
            if flexible {
                types::CompactString.encode(buf, &self.host)?;
            } else {
                types::String.encode(buf, &self.host)?;
            }
            if version >= 3 {
                if flexible {
                    types::CompactString.encode(buf, &self.listener)?;
                } else {
                    types::String.encode(buf, &self.listener)?;
                }
            }
            types::Int16.encode(buf, &self.security_protocol)?;
        } else if self.port != 0 || !self.host.is_empty() || self.security_protocol != 0 {
            // None of these fields exists before version 1, so they must be unset.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if !flexible {
            return Ok(());
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Applies the same "field not available" checks as `encode`, but does not
    /// reject out-of-range versions.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;
        let mut size = 0;

        if version >= 1 {
            size += types::Int32.compute_size(&self.port)?;
            size += if flexible {
                types::CompactString.compute_size(&self.host)?
            } else {
                types::String.compute_size(&self.host)?
            };
            if version >= 3 {
                size += if flexible {
                    types::CompactString.compute_size(&self.listener)?
                } else {
                    types::String.compute_size(&self.listener)?
                };
            }
            size += types::Int16.compute_size(&self.security_protocol)?;
        } else if self.port != 0 || !self.host.is_empty() || self.security_protocol != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
445
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataEndpoint {
    /// Reads an endpoint from `buf` using the layout of `version`.
    ///
    /// Fields that are absent in `version` are set to 0 or to an empty string.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=8` or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=8).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 6;
        // Port, host and security protocol all appear together from version 1.
        let has_body = version >= 1;

        let port = if has_body {
            types::Int32.decode(buf)?
        } else {
            0
        };
        let host = if flexible {
            types::CompactString.decode(buf)?
        } else if has_body {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let listener = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 3 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let security_protocol = if has_body {
            types::Int16.decode(buf)?
        } else {
            0
        };

        // Every tag is kept verbatim; this type defines no known tagged fields.
        let mut tagged = BTreeMap::new();
        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                tagged.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            port,
            host,
            listener,
            security_protocol,
            unknown_tagged_fields: tagged,
        })
    }
}
499
500impl Default for UpdateMetadataEndpoint {
501 fn default() -> Self {
502 Self {
503 port: 0,
504 host: Default::default(),
505 listener: Default::default(),
506 security_protocol: 0,
507 unknown_tagged_fields: BTreeMap::new(),
508 }
509 }
510}
511
512impl Message for UpdateMetadataEndpoint {
513 const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
514 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
515}
516
517#[non_exhaustive]
519#[derive(Debug, Clone, PartialEq)]
520pub struct UpdateMetadataPartitionState {
521 pub topic_name: super::TopicName,
525
526 pub partition_index: i32,
530
531 pub controller_epoch: i32,
535
536 pub leader: super::BrokerId,
540
541 pub leader_epoch: i32,
545
546 pub isr: Vec<super::BrokerId>,
550
551 pub zk_version: i32,
555
556 pub replicas: Vec<super::BrokerId>,
560
561 pub offline_replicas: Vec<super::BrokerId>,
565
566 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
568}
569
570impl UpdateMetadataPartitionState {
571 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
577 self.topic_name = value;
578 self
579 }
580 pub fn with_partition_index(mut self, value: i32) -> Self {
586 self.partition_index = value;
587 self
588 }
589 pub fn with_controller_epoch(mut self, value: i32) -> Self {
595 self.controller_epoch = value;
596 self
597 }
598 pub fn with_leader(mut self, value: super::BrokerId) -> Self {
604 self.leader = value;
605 self
606 }
607 pub fn with_leader_epoch(mut self, value: i32) -> Self {
613 self.leader_epoch = value;
614 self
615 }
616 pub fn with_isr(mut self, value: Vec<super::BrokerId>) -> Self {
622 self.isr = value;
623 self
624 }
625 pub fn with_zk_version(mut self, value: i32) -> Self {
631 self.zk_version = value;
632 self
633 }
634 pub fn with_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
640 self.replicas = value;
641 self
642 }
643 pub fn with_offline_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
649 self.offline_replicas = value;
650 self
651 }
652 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
654 self.unknown_tagged_fields = value;
655 self
656 }
657 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
659 self.unknown_tagged_fields.insert(key, value);
660 self
661 }
662}
663
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataPartitionState {
    /// Serializes this partition state into `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=8`, when a nested encoder fails,
    /// or when the tagged-field count does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=8).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 6;

        if version <= 4 {
            types::String.encode(buf, &self.topic_name)?;
        }
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.controller_epoch)?;
        types::Int32.encode(buf, &self.leader)?;
        types::Int32.encode(buf, &self.leader_epoch)?;
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.isr)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.isr)?;
        }
        types::Int32.encode(buf, &self.zk_version)?;
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.replicas)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.replicas)?;
        }
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.offline_replicas)?;
        } else if version >= 4 {
            types::Array(types::Int32).encode(buf, &self.offline_replicas)?;
        }

        if !flexible {
            return Ok(());
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject out-of-range versions.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;
        let mut size = 0;

        if version <= 4 {
            size += types::String.compute_size(&self.topic_name)?;
        }
        size += types::Int32.compute_size(&self.partition_index)?;
        size += types::Int32.compute_size(&self.controller_epoch)?;
        size += types::Int32.compute_size(&self.leader)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;
        size += if flexible {
            types::CompactArray(types::Int32).compute_size(&self.isr)?
        } else {
            types::Array(types::Int32).compute_size(&self.isr)?
        };
        size += types::Int32.compute_size(&self.zk_version)?;
        size += if flexible {
            types::CompactArray(types::Int32).compute_size(&self.replicas)?
        } else {
            types::Array(types::Int32).compute_size(&self.replicas)?
        };
        if flexible {
            size += types::CompactArray(types::Int32).compute_size(&self.offline_replicas)?;
        } else if version >= 4 {
            size += types::Array(types::Int32).compute_size(&self.offline_replicas)?;
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
752
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataPartitionState {
    /// Reads a partition state from `buf` using the layout of `version`.
    ///
    /// `topic_name` is only present up to version 4 and `offline_replicas`
    /// only from version 4; when absent they are set to their defaults.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=8` or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=8).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 6;

        let topic_name = if version > 4 {
            Default::default()
        } else {
            types::String.decode(buf)?
        };
        let partition_index = types::Int32.decode(buf)?;
        let controller_epoch = types::Int32.decode(buf)?;
        let leader = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;
        let isr = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let zk_version = types::Int32.decode(buf)?;
        let replicas = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let offline_replicas = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else if version >= 4 {
            types::Array(types::Int32).decode(buf)?
        } else {
            Default::default()
        };

        // Every tag is kept verbatim; this type defines no known tagged fields.
        let mut tagged = BTreeMap::new();
        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                tagged.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            topic_name,
            partition_index,
            controller_epoch,
            leader,
            leader_epoch,
            isr,
            zk_version,
            replicas,
            offline_replicas,
            unknown_tagged_fields: tagged,
        })
    }
}
812
813impl Default for UpdateMetadataPartitionState {
814 fn default() -> Self {
815 Self {
816 topic_name: Default::default(),
817 partition_index: 0,
818 controller_epoch: 0,
819 leader: (0).into(),
820 leader_epoch: 0,
821 isr: Default::default(),
822 zk_version: 0,
823 replicas: Default::default(),
824 offline_replicas: Default::default(),
825 unknown_tagged_fields: BTreeMap::new(),
826 }
827 }
828}
829
830impl Message for UpdateMetadataPartitionState {
831 const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
832 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
833}
834
835#[non_exhaustive]
837#[derive(Debug, Clone, PartialEq)]
838pub struct UpdateMetadataRequest {
839 pub controller_id: super::BrokerId,
843
844 pub is_k_raft_controller: bool,
848
849 pub _type: i8,
853
854 pub controller_epoch: i32,
858
859 pub broker_epoch: i64,
863
864 pub ungrouped_partition_states: Vec<UpdateMetadataPartitionState>,
868
869 pub topic_states: Vec<UpdateMetadataTopicState>,
873
874 pub live_brokers: Vec<UpdateMetadataBroker>,
878
879 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
881}
882
883impl UpdateMetadataRequest {
884 pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
890 self.controller_id = value;
891 self
892 }
893 pub fn with_is_k_raft_controller(mut self, value: bool) -> Self {
899 self.is_k_raft_controller = value;
900 self
901 }
902 pub fn with_type(mut self, value: i8) -> Self {
908 self._type = value;
909 self
910 }
911 pub fn with_controller_epoch(mut self, value: i32) -> Self {
917 self.controller_epoch = value;
918 self
919 }
920 pub fn with_broker_epoch(mut self, value: i64) -> Self {
926 self.broker_epoch = value;
927 self
928 }
929 pub fn with_ungrouped_partition_states(
935 mut self,
936 value: Vec<UpdateMetadataPartitionState>,
937 ) -> Self {
938 self.ungrouped_partition_states = value;
939 self
940 }
941 pub fn with_topic_states(mut self, value: Vec<UpdateMetadataTopicState>) -> Self {
947 self.topic_states = value;
948 self
949 }
950 pub fn with_live_brokers(mut self, value: Vec<UpdateMetadataBroker>) -> Self {
956 self.live_brokers = value;
957 self
958 }
959 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
961 self.unknown_tagged_fields = value;
962 self
963 }
964 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
966 self.unknown_tagged_fields.insert(key, value);
967 self
968 }
969}
970
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataRequest {
    /// Serializes this request into `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=8`, when a field that does not
    /// exist in `version` holds a non-default value, when a nested encoder
    /// fails, or when the tagged-field count or payload exceeds `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=8).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 6;

        types::Int32.encode(buf, &self.controller_id)?;
        if version >= 8 {
            types::Boolean.encode(buf, &self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }
        types::Int32.encode(buf, &self.controller_epoch)?;
        if version >= 5 {
            types::Int64.encode(buf, &self.broker_epoch)?;
        }

        // Versions up to 4 carry a flat partition list; later ones group by topic.
        if version <= 4 {
            types::Array(types::Struct { version })
                .encode(buf, &self.ungrouped_partition_states)?;
            if !self.topic_states.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else {
            if !self.ungrouped_partition_states.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
            if flexible {
                types::CompactArray(types::Struct { version }).encode(buf, &self.topic_states)?;
            } else {
                types::Array(types::Struct { version }).encode(buf, &self.topic_states)?;
            }
        }

        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.live_brokers)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.live_brokers)?;
            return Ok(());
        }

        // `_type` travels as tagged field 0, and only when it is non-zero.
        let emit_type_tag = version >= 8 && self._type != 0;
        let tagged_count =
            self.unknown_tagged_fields.len() + if emit_type_tag { 1 } else { 0 };
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        if emit_type_tag {
            let payload_len = types::Int8.compute_size(&self._type)?;
            if payload_len > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_len
                );
            }
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, payload_len as u32)?;
            types::Int8.encode(buf, &self._type)?;
        }
        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Applies the same "field not available" checks as `encode`, but does not
    /// reject out-of-range versions.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;
        let mut size = types::Int32.compute_size(&self.controller_id)?;

        if version >= 8 {
            size += types::Boolean.compute_size(&self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }
        size += types::Int32.compute_size(&self.controller_epoch)?;
        if version >= 5 {
            size += types::Int64.compute_size(&self.broker_epoch)?;
        }

        if version <= 4 {
            size += types::Array(types::Struct { version })
                .compute_size(&self.ungrouped_partition_states)?;
            if !self.topic_states.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else {
            if !self.ungrouped_partition_states.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
            size += if flexible {
                types::CompactArray(types::Struct { version }).compute_size(&self.topic_states)?
            } else {
                types::Array(types::Struct { version }).compute_size(&self.topic_states)?
            };
        }

        size += if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.live_brokers)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.live_brokers)?
        };

        if flexible {
            let emit_type_tag = version >= 8 && self._type != 0;
            let tagged_count =
                self.unknown_tagged_fields.len() + if emit_type_tag { 1 } else { 0 };
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            if emit_type_tag {
                let payload_len = types::Int8.compute_size(&self._type)?;
                if payload_len > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        payload_len
                    );
                }
                // Tag number, payload length, then the payload itself.
                size += types::UnsignedVarInt.compute_size(0)?;
                size += types::UnsignedVarInt.compute_size(payload_len as u32)?;
                size += payload_len;
            }
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
1120
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataRequest {
    /// Reads a request from `buf` using the layout of `version`.
    ///
    /// Fields that are absent in `version` receive the same values as in the
    /// `Default` impl (`broker_epoch` becomes -1).
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=8`, when tag 0 appears at a
    /// version below 8, or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=8).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 6;

        let controller_id = types::Int32.decode(buf)?;
        let is_k_raft_controller = if version >= 8 {
            types::Boolean.decode(buf)?
        } else {
            false
        };
        let controller_epoch = types::Int32.decode(buf)?;
        let broker_epoch = if version >= 5 {
            types::Int64.decode(buf)?
        } else {
            -1
        };
        let ungrouped_partition_states = if version > 4 {
            Default::default()
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let topic_states = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 5 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        let live_brokers = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        // Stays 0 unless tagged field 0 is present in the tagged section.
        let mut request_type = 0;
        let mut tagged = BTreeMap::new();
        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    // NOTE(review): the declared `len` is not consulted in this
                    // arm; only the Int8 payload is read. Confirm that is intended.
                    0 if version >= 8 => {
                        request_type = types::Int8.decode(buf)?;
                    }
                    0 => bail!("Tag {} is not valid for version {}", tag, version),
                    _ => {
                        let raw = buf.try_get_bytes(len as usize)?;
                        tagged.insert(tag as i32, raw);
                    }
                }
            }
        }

        Ok(Self {
            controller_id,
            is_k_raft_controller,
            _type: request_type,
            controller_epoch,
            broker_epoch,
            ungrouped_partition_states,
            topic_states,
            live_brokers,
            unknown_tagged_fields: tagged,
        })
    }
}
1193
1194impl Default for UpdateMetadataRequest {
1195 fn default() -> Self {
1196 Self {
1197 controller_id: (0).into(),
1198 is_k_raft_controller: false,
1199 _type: 0,
1200 controller_epoch: 0,
1201 broker_epoch: -1,
1202 ungrouped_partition_states: Default::default(),
1203 topic_states: Default::default(),
1204 live_brokers: Default::default(),
1205 unknown_tagged_fields: BTreeMap::new(),
1206 }
1207 }
1208}
1209
1210impl Message for UpdateMetadataRequest {
1211 const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
1212 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1213}
1214
1215#[non_exhaustive]
1217#[derive(Debug, Clone, PartialEq)]
1218pub struct UpdateMetadataTopicState {
1219 pub topic_name: super::TopicName,
1223
1224 pub topic_id: Uuid,
1228
1229 pub partition_states: Vec<UpdateMetadataPartitionState>,
1233
1234 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1236}
1237
1238impl UpdateMetadataTopicState {
1239 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
1245 self.topic_name = value;
1246 self
1247 }
1248 pub fn with_topic_id(mut self, value: Uuid) -> Self {
1254 self.topic_id = value;
1255 self
1256 }
1257 pub fn with_partition_states(mut self, value: Vec<UpdateMetadataPartitionState>) -> Self {
1263 self.partition_states = value;
1264 self
1265 }
1266 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1268 self.unknown_tagged_fields = value;
1269 self
1270 }
1271 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1273 self.unknown_tagged_fields.insert(key, value);
1274 self
1275 }
1276}
1277
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataTopicState {
    /// Serializes this topic state into `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=8`, when `topic_name` or
    /// `partition_states` are non-empty at a version below 5, when a nested
    /// encoder fails, or when the tagged-field count exceeds `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=8).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 6;

        if version >= 5 {
            if flexible {
                types::CompactString.encode(buf, &self.topic_name)?;
            } else {
                types::String.encode(buf, &self.topic_name)?;
            }
            if version >= 7 {
                types::Uuid.encode(buf, &self.topic_id)?;
            }
            if flexible {
                types::CompactArray(types::Struct { version })
                    .encode(buf, &self.partition_states)?;
            } else {
                types::Array(types::Struct { version }).encode(buf, &self.partition_states)?;
            }
        } else if !self.topic_name.is_empty() || !self.partition_states.is_empty() {
            // Neither field exists before version 5, so both must be empty.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if !flexible {
            return Ok(());
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Applies the same "field not available" checks as `encode`, but does not
    /// reject out-of-range versions.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;
        let mut size = 0;

        if version >= 5 {
            size += if flexible {
                types::CompactString.compute_size(&self.topic_name)?
            } else {
                types::String.compute_size(&self.topic_name)?
            };
            if version >= 7 {
                size += types::Uuid.compute_size(&self.topic_id)?;
            }
            size += if flexible {
                types::CompactArray(types::Struct { version })
                    .compute_size(&self.partition_states)?
            } else {
                types::Array(types::Struct { version }).compute_size(&self.partition_states)?
            };
        } else if !self.topic_name.is_empty() || !self.partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
1368
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataTopicState {
    /// Reads a topic state from `buf` using the layout of `version`.
    ///
    /// Below version 5 nothing but defaults is produced; `topic_id` is the nil
    /// UUID below version 7.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=8` or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=8).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 6;

        let topic_name = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 5 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let topic_id = if version < 7 {
            Uuid::nil()
        } else {
            types::Uuid.decode(buf)?
        };
        let partition_states = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 5 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };

        // Every tag is kept verbatim; this type defines no known tagged fields.
        let mut tagged = BTreeMap::new();
        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                tagged.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            topic_name,
            topic_id,
            partition_states,
            unknown_tagged_fields: tagged,
        })
    }
}
1416
1417impl Default for UpdateMetadataTopicState {
1418 fn default() -> Self {
1419 Self {
1420 topic_name: Default::default(),
1421 topic_id: Uuid::nil(),
1422 partition_states: Default::default(),
1423 unknown_tagged_fields: BTreeMap::new(),
1424 }
1425 }
1426}
1427
1428impl Message for UpdateMetadataTopicState {
1429 const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
1430 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1431}
1432
1433impl HeaderVersion for UpdateMetadataRequest {
1434 fn header_version(version: i16) -> i16 {
1435 if version >= 6 {
1436 2
1437 } else {
1438 1
1439 }
1440 }
1441}