1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// One broker entry carried in `UpdateMetadataRequest::live_brokers`.
///
/// Which fields are present on the wire depends on the message version; the
/// per-field notes below mirror this type's `Encodable`/`Decodable` impls.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMetadataBroker {
    /// Broker id; serialized as an `Int32` at every version.
    pub id: super::BrokerId,

    /// Serialized as a string only when the version is exactly 0.
    pub v0_host: StrBytes,

    /// Serialized as an `Int32` only when the version is exactly 0.
    pub v0_port: i32,

    /// Serialized from version 1 onward (as a compact array from version 6).
    pub endpoints: Vec<UpdateMetadataEndpoint>,

    /// Serialized from version 2 onward (as a compact string from version 6).
    /// Decoding an earlier version yields `Some` of the default string.
    pub rack: Option<StrBytes>,

    /// Tagged fields not modelled by this struct, keyed by tag.
    /// Only serialized from version 6 onward.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
52
53impl UpdateMetadataBroker {
54 pub fn with_id(mut self, value: super::BrokerId) -> Self {
60 self.id = value;
61 self
62 }
63 pub fn with_v0_host(mut self, value: StrBytes) -> Self {
69 self.v0_host = value;
70 self
71 }
72 pub fn with_v0_port(mut self, value: i32) -> Self {
78 self.v0_port = value;
79 self
80 }
81 pub fn with_endpoints(mut self, value: Vec<UpdateMetadataEndpoint>) -> Self {
87 self.endpoints = value;
88 self
89 }
90 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
96 self.rack = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataBroker {
    /// Writes this broker entry to `buf` using the layout of `version`.
    ///
    /// Fields are emitted in wire order; a field that does not exist at
    /// `version` is skipped. Fails if any underlying encoder fails or if there
    /// are more unknown tagged fields than fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 6 on, compact encodings are used and tagged fields are appended.
        let flexible = version >= 6;

        types::Int32.encode(buf, &self.id)?;
        if version == 0 {
            // The flat host/port pair only exists at version 0.
            types::String.encode(buf, &self.v0_host)?;
            types::Int32.encode(buf, &self.v0_port)?;
        }
        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.endpoints)?;
        } else if version >= 1 {
            types::Array(types::Struct { version }).encode(buf, &self.endpoints)?;
        }
        if flexible {
            types::CompactString.encode(buf, &self.rack)?;
        } else if version >= 2 {
            types::String.encode(buf, &self.rack)?;
        }
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// Walks the fields in the same order and under the same version gates as
    /// `encode`, and fails under the same conditions.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;

        let mut size = types::Int32.compute_size(&self.id)?;
        if version == 0 {
            size += types::String.compute_size(&self.v0_host)?;
            size += types::Int32.compute_size(&self.v0_port)?;
        }
        size += if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.endpoints)?
        } else if version >= 1 {
            types::Array(types::Struct { version }).compute_size(&self.endpoints)?
        } else {
            0
        };
        size += if flexible {
            types::CompactString.compute_size(&self.rack)?
        } else if version >= 2 {
            types::String.compute_size(&self.rack)?
        } else {
            0
        };
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
189
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataBroker {
    /// Reads one broker entry from `buf` using the layout of `version`.
    ///
    /// Fields that do not exist at `version` are filled with the same values
    /// the `Default` impl uses. Fails if any underlying decoder fails or the
    /// buffer runs out while reading a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 6 on, compact encodings are used and tagged fields follow.
        let flexible = version >= 6;

        let id = types::Int32.decode(buf)?;
        // Tuple elements are evaluated left to right, so host is read before port.
        let (v0_host, v0_port) = if version == 0 {
            (types::String.decode(buf)?, types::Int32.decode(buf)?)
        } else {
            (Default::default(), 0)
        };
        let endpoints = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 1 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        let rack = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 2 {
            types::String.decode(buf)?
        } else {
            Some(Default::default())
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                // No tag is modelled on this struct, so every payload is kept raw.
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            id,
            v0_host,
            v0_port,
            endpoints,
            rack,
            unknown_tagged_fields,
        })
    }
}
242
243impl Default for UpdateMetadataBroker {
244 fn default() -> Self {
245 Self {
246 id: (0).into(),
247 v0_host: Default::default(),
248 v0_port: 0,
249 endpoints: Default::default(),
250 rack: Some(Default::default()),
251 unknown_tagged_fields: BTreeMap::new(),
252 }
253 }
254}
255
/// Version metadata for `UpdateMetadataBroker`.
impl Message for UpdateMetadataBroker {
    /// Versions 0 through 8 (inclusive) are declared for this type.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
    /// No version is flagged as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
260
/// One element of `UpdateMetadataBroker::endpoints`.
///
/// The per-field notes below mirror this type's `Encodable`/`Decodable` impls.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMetadataEndpoint {
    /// Serialized as an `Int32` from version 1 onward.
    /// Encoding at version 0 fails if this is non-zero.
    pub port: i32,

    /// Serialized from version 1 onward (as a compact string from version 6).
    /// Encoding at version 0 fails unless this is empty.
    pub host: StrBytes,

    /// Serialized from version 3 onward (as a compact string from version 6);
    /// skipped without error at earlier versions.
    pub listener: StrBytes,

    /// Serialized as an `Int16` from version 1 onward.
    /// Encoding at version 0 fails if this is non-zero.
    pub security_protocol: i16,

    /// Tagged fields not modelled by this struct, keyed by tag.
    /// Only serialized from version 6 onward.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
288
289impl UpdateMetadataEndpoint {
290 pub fn with_port(mut self, value: i32) -> Self {
296 self.port = value;
297 self
298 }
299 pub fn with_host(mut self, value: StrBytes) -> Self {
305 self.host = value;
306 self
307 }
308 pub fn with_listener(mut self, value: StrBytes) -> Self {
314 self.listener = value;
315 self
316 }
317 pub fn with_security_protocol(mut self, value: i16) -> Self {
323 self.security_protocol = value;
324 self
325 }
326 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
328 self.unknown_tagged_fields = value;
329 self
330 }
331 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
333 self.unknown_tagged_fields.insert(key, value);
334 self
335 }
336}
337
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataEndpoint {
    /// Writes this endpoint to `buf` using the layout of `version`.
    ///
    /// `port`, `host` and `security_protocol` do not exist at version 0; if
    /// any of them holds a non-default value there, encoding fails instead of
    /// dropping it. `listener` is simply skipped before version 3.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 6 on, compact encodings are used and tagged fields are appended.
        let flexible = version >= 6;

        if version >= 1 {
            types::Int32.encode(buf, &self.port)?;
        } else if self.port != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            types::CompactString.encode(buf, &self.host)?;
        } else if version >= 1 {
            types::String.encode(buf, &self.host)?;
        } else if !self.host.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            types::CompactString.encode(buf, &self.listener)?;
        } else if version >= 3 {
            types::String.encode(buf, &self.listener)?;
        }
        if version >= 1 {
            types::Int16.encode(buf, &self.security_protocol)?;
        } else if self.security_protocol != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// Applies the same version gates and fails under the same conditions as
    /// `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;
        let mut size = 0;

        if version >= 1 {
            size += types::Int32.compute_size(&self.port)?;
        } else if self.port != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            size += types::CompactString.compute_size(&self.host)?;
        } else if version >= 1 {
            size += types::String.compute_size(&self.host)?;
        } else if !self.host.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            size += types::CompactString.compute_size(&self.listener)?;
        } else if version >= 3 {
            size += types::String.compute_size(&self.listener)?;
        }
        if version >= 1 {
            size += types::Int16.compute_size(&self.security_protocol)?;
        } else if self.security_protocol != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
436
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataEndpoint {
    /// Reads one endpoint from `buf` using the layout of `version`.
    ///
    /// Fields that do not exist at `version` are filled with zero / the
    /// default string. Fails if any underlying decoder fails or the buffer
    /// runs out while reading a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 6 on, compact encodings are used and tagged fields follow.
        let flexible = version >= 6;
        // `port`, `host` and `security_protocol` all appear starting at version 1.
        let since_v1 = version >= 1;

        let port = if since_v1 {
            types::Int32.decode(buf)?
        } else {
            0
        };
        let host = if flexible {
            types::CompactString.decode(buf)?
        } else if since_v1 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let listener = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 3 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let security_protocol = if since_v1 {
            types::Int16.decode(buf)?
        } else {
            0
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                // No tag is modelled on this struct, so every payload is kept raw.
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            port,
            host,
            listener,
            security_protocol,
            unknown_tagged_fields,
        })
    }
}
487
488impl Default for UpdateMetadataEndpoint {
489 fn default() -> Self {
490 Self {
491 port: 0,
492 host: Default::default(),
493 listener: Default::default(),
494 security_protocol: 0,
495 unknown_tagged_fields: BTreeMap::new(),
496 }
497 }
498}
499
/// Version metadata for `UpdateMetadataEndpoint`.
impl Message for UpdateMetadataEndpoint {
    /// Versions 0 through 8 (inclusive) are declared for this type.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
    /// No version is flagged as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
504
/// Per-partition state, used both in
/// `UpdateMetadataRequest::ungrouped_partition_states` and in
/// `UpdateMetadataTopicState::partition_states`.
///
/// The per-field notes below mirror this type's `Encodable`/`Decodable` impls.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMetadataPartitionState {
    /// Serialized as a string only at versions 0 through 4.
    pub topic_name: super::TopicName,

    /// Serialized as an `Int32` at every version.
    pub partition_index: i32,

    /// Serialized as an `Int32` at every version.
    pub controller_epoch: i32,

    /// Serialized as an `Int32` at every version.
    pub leader: super::BrokerId,

    /// Serialized as an `Int32` at every version.
    pub leader_epoch: i32,

    /// Serialized as an array of `Int32` at every version (compact from version 6).
    pub isr: Vec<super::BrokerId>,

    /// Serialized as an `Int32` at every version.
    pub zk_version: i32,

    /// Serialized as an array of `Int32` at every version (compact from version 6).
    pub replicas: Vec<super::BrokerId>,

    /// Serialized as an array of `Int32` from version 4 onward (compact from version 6).
    pub offline_replicas: Vec<super::BrokerId>,

    /// Tagged fields not modelled by this struct, keyed by tag.
    /// Only serialized from version 6 onward.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
557
558impl UpdateMetadataPartitionState {
559 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
565 self.topic_name = value;
566 self
567 }
568 pub fn with_partition_index(mut self, value: i32) -> Self {
574 self.partition_index = value;
575 self
576 }
577 pub fn with_controller_epoch(mut self, value: i32) -> Self {
583 self.controller_epoch = value;
584 self
585 }
586 pub fn with_leader(mut self, value: super::BrokerId) -> Self {
592 self.leader = value;
593 self
594 }
595 pub fn with_leader_epoch(mut self, value: i32) -> Self {
601 self.leader_epoch = value;
602 self
603 }
604 pub fn with_isr(mut self, value: Vec<super::BrokerId>) -> Self {
610 self.isr = value;
611 self
612 }
613 pub fn with_zk_version(mut self, value: i32) -> Self {
619 self.zk_version = value;
620 self
621 }
622 pub fn with_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
628 self.replicas = value;
629 self
630 }
631 pub fn with_offline_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
637 self.offline_replicas = value;
638 self
639 }
640 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
642 self.unknown_tagged_fields = value;
643 self
644 }
645 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
647 self.unknown_tagged_fields.insert(key, value);
648 self
649 }
650}
651
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataPartitionState {
    /// Writes this partition state to `buf` using the layout of `version`.
    ///
    /// `topic_name` is only written up to version 4 and `offline_replicas`
    /// only from version 4; everything else is written at every version.
    /// Fails if an underlying encoder fails or if there are more unknown
    /// tagged fields than fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 6 on, compact encodings are used and tagged fields are appended.
        let flexible = version >= 6;

        if version <= 4 {
            types::String.encode(buf, &self.topic_name)?;
        }
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.controller_epoch)?;
        types::Int32.encode(buf, &self.leader)?;
        types::Int32.encode(buf, &self.leader_epoch)?;
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.isr)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.isr)?;
        }
        types::Int32.encode(buf, &self.zk_version)?;
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.replicas)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.replicas)?;
        }
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.offline_replicas)?;
        } else if version >= 4 {
            types::Array(types::Int32).encode(buf, &self.offline_replicas)?;
        }
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// Applies the same version gates and fails under the same conditions as
    /// `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;
        let mut size = 0;

        if version <= 4 {
            size += types::String.compute_size(&self.topic_name)?;
        }
        size += types::Int32.compute_size(&self.partition_index)?;
        size += types::Int32.compute_size(&self.controller_epoch)?;
        size += types::Int32.compute_size(&self.leader)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;
        size += if flexible {
            types::CompactArray(types::Int32).compute_size(&self.isr)?
        } else {
            types::Array(types::Int32).compute_size(&self.isr)?
        };
        size += types::Int32.compute_size(&self.zk_version)?;
        size += if flexible {
            types::CompactArray(types::Int32).compute_size(&self.replicas)?
        } else {
            types::Array(types::Int32).compute_size(&self.replicas)?
        };
        size += if flexible {
            types::CompactArray(types::Int32).compute_size(&self.offline_replicas)?
        } else if version >= 4 {
            types::Array(types::Int32).compute_size(&self.offline_replicas)?
        } else {
            0
        };
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
737
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataPartitionState {
    /// Reads one partition state from `buf` using the layout of `version`.
    ///
    /// `topic_name` is only read up to version 4 and `offline_replicas` only
    /// from version 4; when absent they take their default values. Fails if
    /// any underlying decoder fails or the buffer runs out while reading a
    /// tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 6 on, compact encodings are used and tagged fields follow.
        let flexible = version >= 6;

        let topic_name = if version <= 4 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let partition_index = types::Int32.decode(buf)?;
        let controller_epoch = types::Int32.decode(buf)?;
        let leader = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;
        let isr = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let zk_version = types::Int32.decode(buf)?;
        let replicas = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };
        let offline_replicas = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else if version >= 4 {
            types::Array(types::Int32).decode(buf)?
        } else {
            Default::default()
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                // No tag is modelled on this struct, so every payload is kept raw.
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            topic_name,
            partition_index,
            controller_epoch,
            leader,
            leader_epoch,
            isr,
            zk_version,
            replicas,
            offline_replicas,
            unknown_tagged_fields,
        })
    }
}
794
795impl Default for UpdateMetadataPartitionState {
796 fn default() -> Self {
797 Self {
798 topic_name: Default::default(),
799 partition_index: 0,
800 controller_epoch: 0,
801 leader: (0).into(),
802 leader_epoch: 0,
803 isr: Default::default(),
804 zk_version: 0,
805 replicas: Default::default(),
806 offline_replicas: Default::default(),
807 unknown_tagged_fields: BTreeMap::new(),
808 }
809 }
810}
811
/// Version metadata for `UpdateMetadataPartitionState`.
impl Message for UpdateMetadataPartitionState {
    /// Versions 0 through 8 (inclusive) are declared for this type.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
    /// No version is flagged as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
816
/// Top-level `UpdateMetadataRequest` message body.
///
/// The per-field notes below mirror this type's `Encodable`/`Decodable` impls.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMetadataRequest {
    /// Serialized as an `Int32` at every version.
    pub controller_id: super::BrokerId,

    /// Serialized as a boolean from version 8 onward.
    /// Encoding at an earlier version fails if this is `true`.
    pub is_k_raft_controller: bool,

    /// Carried as tagged field 0 (an `Int8`) from version 8 onward, and only
    /// written when non-zero; decodes as 0 when the tag is absent.
    pub _type: i8,

    /// Serialized as an `Int32` at every version.
    pub controller_epoch: i32,

    /// Serialized as an `Int64` from version 5 onward; decodes as -1 at
    /// earlier versions.
    pub broker_epoch: i64,

    /// Serialized only at versions 0 through 4.
    /// Encoding at a later version fails unless this is empty.
    pub ungrouped_partition_states: Vec<UpdateMetadataPartitionState>,

    /// Serialized from version 5 onward (as a compact array from version 6).
    /// Encoding at an earlier version fails unless this is empty.
    pub topic_states: Vec<UpdateMetadataTopicState>,

    /// Serialized at every version (as a compact array from version 6).
    pub live_brokers: Vec<UpdateMetadataBroker>,

    /// Tagged fields not modelled by this struct, keyed by tag.
    /// Only serialized from version 6 onward.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
864
865impl UpdateMetadataRequest {
866 pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
872 self.controller_id = value;
873 self
874 }
875 pub fn with_is_k_raft_controller(mut self, value: bool) -> Self {
881 self.is_k_raft_controller = value;
882 self
883 }
884 pub fn with_type(mut self, value: i8) -> Self {
890 self._type = value;
891 self
892 }
893 pub fn with_controller_epoch(mut self, value: i32) -> Self {
899 self.controller_epoch = value;
900 self
901 }
902 pub fn with_broker_epoch(mut self, value: i64) -> Self {
908 self.broker_epoch = value;
909 self
910 }
911 pub fn with_ungrouped_partition_states(
917 mut self,
918 value: Vec<UpdateMetadataPartitionState>,
919 ) -> Self {
920 self.ungrouped_partition_states = value;
921 self
922 }
923 pub fn with_topic_states(mut self, value: Vec<UpdateMetadataTopicState>) -> Self {
929 self.topic_states = value;
930 self
931 }
932 pub fn with_live_brokers(mut self, value: Vec<UpdateMetadataBroker>) -> Self {
938 self.live_brokers = value;
939 self
940 }
941 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
943 self.unknown_tagged_fields = value;
944 self
945 }
946 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
948 self.unknown_tagged_fields.insert(key, value);
949 self
950 }
951}
952
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataRequest {
    /// Writes this request body to `buf` using the layout of `version`.
    ///
    /// Fails (possibly after part of the message has already been written) if
    /// `is_k_raft_controller`, `ungrouped_partition_states` or `topic_states`
    /// holds a non-default value at a version that cannot carry it, if an
    /// underlying encoder fails, or if the tagged-field count or the `_type`
    /// payload size exceeds `u32::MAX`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 6 on, compact encodings are used and tagged fields are appended.
        let flexible = version >= 6;
        // `_type` travels as tagged field 0, only from version 8 and only when non-zero.
        let emit_type = version >= 8 && self._type != 0;

        types::Int32.encode(buf, &self.controller_id)?;
        if version >= 8 {
            types::Boolean.encode(buf, &self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }
        types::Int32.encode(buf, &self.controller_epoch)?;
        if version >= 5 {
            types::Int64.encode(buf, &self.broker_epoch)?;
        }
        if version <= 4 {
            types::Array(types::Struct { version })
                .encode(buf, &self.ungrouped_partition_states)?;
        } else if !self.ungrouped_partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topic_states)?;
        } else if version >= 5 {
            types::Array(types::Struct { version }).encode(buf, &self.topic_states)?;
        } else if !self.topic_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.live_brokers)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.live_brokers)?;
        }
        if flexible {
            // The known tagged field, when emitted, counts alongside the unknown ones.
            let tagged_count = self.unknown_tagged_fields.len() + usize::from(emit_type);
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            if emit_type {
                let payload_size = types::Int8.compute_size(&self._type)?;
                if payload_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        payload_size
                    );
                }
                // Tag number, then payload length, then the payload itself.
                types::UnsignedVarInt.encode(buf, 0)?;
                types::UnsignedVarInt.encode(buf, payload_size as u32)?;
                types::Int8.encode(buf, &self._type)?;
            }
            // Tag 0 is reserved for `_type`, so unknown fields are written from tag 1 up.
            write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// Applies the same version gates and fails under the same conditions as
    /// `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;
        let emit_type = version >= 8 && self._type != 0;

        let mut size = types::Int32.compute_size(&self.controller_id)?;
        if version >= 8 {
            size += types::Boolean.compute_size(&self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }
        size += types::Int32.compute_size(&self.controller_epoch)?;
        if version >= 5 {
            size += types::Int64.compute_size(&self.broker_epoch)?;
        }
        if version <= 4 {
            size += types::Array(types::Struct { version })
                .compute_size(&self.ungrouped_partition_states)?;
        } else if !self.ungrouped_partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            size += types::CompactArray(types::Struct { version })
                .compute_size(&self.topic_states)?;
        } else if version >= 5 {
            size += types::Array(types::Struct { version }).compute_size(&self.topic_states)?;
        } else if !self.topic_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        size += if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.live_brokers)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.live_brokers)?
        };
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len() + usize::from(emit_type);
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            if emit_type {
                let payload_size = types::Int8.compute_size(&self._type)?;
                if payload_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        payload_size
                    );
                }
                // Tag number + payload length + payload.
                size += types::UnsignedVarInt.compute_size(0)?;
                size += types::UnsignedVarInt.compute_size(payload_size as u32)?;
                size += payload_size;
            }
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
1099
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataRequest {
    /// Reads a request body from `buf` using the layout of `version`.
    ///
    /// Fields that do not exist at `version` take the same values the
    /// `Default` impl uses (`broker_epoch` is -1, `_type` is 0, lists are
    /// empty).
    ///
    /// # Errors
    ///
    /// Fails if any underlying decoder fails, if the buffer runs out while
    /// reading a tagged field, if tag 0 appears before version 8, or if tag 0
    /// declares a payload size other than one byte.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Tag 0 carries an INT8, which is exactly one byte on the wire. Any
        // other declared size means the tagged-field section is malformed;
        // decoding one byte anyway would leave the reader misaligned for every
        // field that follows.
        const TYPE_FIELD_SIZE: u32 = 1;

        let controller_id = types::Int32.decode(buf)?;
        let is_k_raft_controller = if version >= 8 {
            types::Boolean.decode(buf)?
        } else {
            false
        };
        // Only ever overwritten from tagged field 0 below.
        let mut _type = 0;
        let controller_epoch = types::Int32.decode(buf)?;
        let broker_epoch = if version >= 5 {
            types::Int64.decode(buf)?
        } else {
            -1
        };
        let ungrouped_partition_states = if version <= 4 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        let topic_states = if version >= 5 {
            if version >= 6 {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                types::Array(types::Struct { version }).decode(buf)?
            }
        } else {
            Default::default()
        };
        let live_brokers = if version >= 6 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..num_tagged_fields {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    0 => {
                        if version >= 8 {
                            if size != TYPE_FIELD_SIZE {
                                bail!(
                                    "Tagged field {} has declared size {} but expected {}",
                                    tag,
                                    size,
                                    TYPE_FIELD_SIZE
                                );
                            }
                            _type = types::Int8.decode(buf)?;
                        } else {
                            bail!("Tag {} is not valid for version {}", tag, version);
                        }
                    }
                    _ => {
                        // Unmodelled tags are preserved as raw bytes.
                        let unknown_value = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, unknown_value);
                    }
                }
            }
        }
        Ok(Self {
            controller_id,
            is_k_raft_controller,
            _type,
            controller_epoch,
            broker_epoch,
            ungrouped_partition_states,
            topic_states,
            live_brokers,
            unknown_tagged_fields,
        })
    }
}
1169
1170impl Default for UpdateMetadataRequest {
1171 fn default() -> Self {
1172 Self {
1173 controller_id: (0).into(),
1174 is_k_raft_controller: false,
1175 _type: 0,
1176 controller_epoch: 0,
1177 broker_epoch: -1,
1178 ungrouped_partition_states: Default::default(),
1179 topic_states: Default::default(),
1180 live_brokers: Default::default(),
1181 unknown_tagged_fields: BTreeMap::new(),
1182 }
1183 }
1184}
1185
/// Version metadata for `UpdateMetadataRequest`.
impl Message for UpdateMetadataRequest {
    /// Versions 0 through 8 (inclusive) are declared for this type.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
    /// No version is flagged as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
1190
/// One element of `UpdateMetadataRequest::topic_states`: a topic together with
/// the states of its partitions.
///
/// The per-field notes below mirror this type's `Encodable`/`Decodable` impls.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMetadataTopicState {
    /// Serialized from version 5 onward (as a compact string from version 6).
    /// Encoding at an earlier version fails unless this is empty.
    pub topic_name: super::TopicName,

    /// Serialized as a UUID from version 7 onward; decodes as the nil UUID at
    /// earlier versions.
    pub topic_id: Uuid,

    /// Serialized from version 5 onward (as a compact array from version 6).
    /// Encoding at an earlier version fails unless this is empty.
    pub partition_states: Vec<UpdateMetadataPartitionState>,

    /// Tagged fields not modelled by this struct, keyed by tag.
    /// Only serialized from version 6 onward.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
1213
1214impl UpdateMetadataTopicState {
1215 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
1221 self.topic_name = value;
1222 self
1223 }
1224 pub fn with_topic_id(mut self, value: Uuid) -> Self {
1230 self.topic_id = value;
1231 self
1232 }
1233 pub fn with_partition_states(mut self, value: Vec<UpdateMetadataPartitionState>) -> Self {
1239 self.partition_states = value;
1240 self
1241 }
1242 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1244 self.unknown_tagged_fields = value;
1245 self
1246 }
1247 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1249 self.unknown_tagged_fields.insert(key, value);
1250 self
1251 }
1252}
1253
#[cfg(feature = "client")]
impl Encodable for UpdateMetadataTopicState {
    /// Writes this topic state to `buf` using the layout of `version`.
    ///
    /// `topic_name` and `partition_states` do not exist before version 5; if
    /// either is non-empty there, encoding fails instead of dropping it.
    /// `topic_id` is simply skipped before version 7.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 6 on, compact encodings are used and tagged fields are appended.
        let flexible = version >= 6;

        if flexible {
            types::CompactString.encode(buf, &self.topic_name)?;
        } else if version >= 5 {
            types::String.encode(buf, &self.topic_name)?;
        } else if !self.topic_name.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 7 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }
        if flexible {
            types::CompactArray(types::Struct { version })
                .encode(buf, &self.partition_states)?;
        } else if version >= 5 {
            types::Array(types::Struct { version }).encode(buf, &self.partition_states)?;
        } else if !self.partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// Applies the same version gates and fails under the same conditions as
    /// `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;
        let mut size = 0;

        if flexible {
            size += types::CompactString.compute_size(&self.topic_name)?;
        } else if version >= 5 {
            size += types::String.compute_size(&self.topic_name)?;
        } else if !self.topic_name.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 7 {
            size += types::Uuid.compute_size(&self.topic_id)?;
        }
        if flexible {
            size += types::CompactArray(types::Struct { version })
                .compute_size(&self.partition_states)?;
        } else if version >= 5 {
            size +=
                types::Array(types::Struct { version }).compute_size(&self.partition_states)?;
        } else if !self.partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
1341
#[cfg(feature = "broker")]
impl Decodable for UpdateMetadataTopicState {
    /// Reads one topic state from `buf` using the layout of `version`.
    ///
    /// Fields that do not exist at `version` take their default values
    /// (`topic_id` becomes the nil UUID). Fails if any underlying decoder
    /// fails or the buffer runs out while reading a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 6 on, compact encodings are used and tagged fields follow.
        let flexible = version >= 6;

        let topic_name = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 5 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let topic_id = if version >= 7 {
            types::Uuid.decode(buf)?
        } else {
            Uuid::nil()
        };
        let partition_states = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 5 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                // No tag is modelled on this struct, so every payload is kept raw.
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            topic_name,
            topic_id,
            partition_states,
            unknown_tagged_fields,
        })
    }
}
1386
1387impl Default for UpdateMetadataTopicState {
1388 fn default() -> Self {
1389 Self {
1390 topic_name: Default::default(),
1391 topic_id: Uuid::nil(),
1392 partition_states: Default::default(),
1393 unknown_tagged_fields: BTreeMap::new(),
1394 }
1395 }
1396}
1397
/// Version metadata for `UpdateMetadataTopicState`.
impl Message for UpdateMetadataTopicState {
    /// Versions 0 through 8 (inclusive) are declared for this type.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
    /// No version is flagged as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
1402
1403impl HeaderVersion for UpdateMetadataRequest {
1404 fn header_version(version: i16) -> i16 {
1405 if version >= 6 {
1406 2
1407 } else {
1408 1
1409 }
1410 }
1411}