1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct DescribeQuorumResponse {
24 pub error_code: i16,
28
29 pub error_message: Option<StrBytes>,
33
34 pub topics: Vec<TopicData>,
38
39 pub nodes: Vec<Node>,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl DescribeQuorumResponse {
49 pub fn with_error_code(mut self, value: i16) -> Self {
55 self.error_code = value;
56 self
57 }
58 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
64 self.error_message = value;
65 self
66 }
67 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
73 self.topics = value;
74 self
75 }
76 pub fn with_nodes(mut self, value: Vec<Node>) -> Self {
82 self.nodes = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
#[cfg(feature = "broker")]
impl Encodable for DescribeQuorumResponse {
    /// Writes this response to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2`, when `nodes` is non-empty for a
    /// version below 2, when there are more than `u32::MAX` unknown tagged
    /// fields, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let has_v2_fields = version >= 2;

        types::Int16.encode(buf, &self.error_code)?;
        // `error_message` is simply omitted below version 2; no error is raised.
        if has_v2_fields {
            types::CompactString.encode(buf, &self.error_message)?;
        }
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        if has_v2_fields {
            types::CompactArray(types::Struct { version }).encode(buf, &self.nodes)?;
        } else if !self.nodes.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // The tagged-field count goes on the wire as an unsigned varint (u32).
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject an out-of-range `version`; it
    /// does apply the same field-availability and tagged-field-count checks.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let has_v2_fields = version >= 2;

        let mut size = types::Int16.compute_size(&self.error_code)?;
        if has_v2_fields {
            size += types::CompactString.compute_size(&self.error_message)?;
        }
        size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        if has_v2_fields {
            size += types::CompactArray(types::Struct { version }).compute_size(&self.nodes)?;
        } else if !self.nodes.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
155
#[cfg(feature = "client")]
impl Decodable for DescribeQuorumResponse {
    /// Reads a response from `buf` using the wire layout of `version`.
    ///
    /// Fields absent from the selected version take fallback values: an empty
    /// string for `error_message` and an empty list for `nodes`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let has_v2_fields = version >= 2;

        let error_code = types::Int16.decode(buf)?;
        let error_message = if has_v2_fields {
            types::CompactString.decode(buf)?
        } else {
            Some(StrBytes::default())
        };
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
        let nodes = if has_v2_fields {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            Vec::new()
        };

        // Every tagged field is kept as raw bytes: a (tag, size, payload) triple
        // per entry, with the tag stored after an `as i32` cast.
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            error_code,
            error_message,
            topics,
            nodes,
            unknown_tagged_fields,
        })
    }
}
191
192impl Default for DescribeQuorumResponse {
193 fn default() -> Self {
194 Self {
195 error_code: 0,
196 error_message: Some(Default::default()),
197 topics: Default::default(),
198 nodes: Default::default(),
199 unknown_tagged_fields: BTreeMap::new(),
200 }
201 }
202}
203
204impl Message for DescribeQuorumResponse {
205 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
206 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
207}
208
209#[non_exhaustive]
211#[derive(Debug, Clone, PartialEq)]
212pub struct Listener {
213 pub name: StrBytes,
217
218 pub host: StrBytes,
222
223 pub port: u16,
227
228 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
230}
231
232impl Listener {
233 pub fn with_name(mut self, value: StrBytes) -> Self {
239 self.name = value;
240 self
241 }
242 pub fn with_host(mut self, value: StrBytes) -> Self {
248 self.host = value;
249 self
250 }
251 pub fn with_port(mut self, value: u16) -> Self {
257 self.port = value;
258 self
259 }
260 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
262 self.unknown_tagged_fields = value;
263 self
264 }
265 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
267 self.unknown_tagged_fields.insert(key, value);
268 self
269 }
270}
271
#[cfg(feature = "broker")]
impl Encodable for Listener {
    /// Writes this listener to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2`, when `name`, `host` or `port`
    /// is non-default for a version below 2, when there are more than
    /// `u32::MAX` unknown tagged fields, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // All three data fields share the same version gate, so they are
        // handled together; field order on the wire is name, host, port.
        if version >= 2 {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactString.encode(buf, &self.host)?;
            types::UInt16.encode(buf, &self.port)?;
        } else if !self.name.is_empty() || !self.host.is_empty() || self.port != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject an out-of-range `version`; it
    /// does apply the same field-availability and tagged-field-count checks.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 2 {
            size += types::CompactString.compute_size(&self.name)?;
            size += types::CompactString.compute_size(&self.host)?;
            size += types::UInt16.compute_size(&self.port)?;
        } else if !self.name.is_empty() || !self.host.is_empty() || self.port != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
347
#[cfg(feature = "client")]
impl Decodable for Listener {
    /// Reads a listener from `buf` using the wire layout of `version`.
    ///
    /// Below version 2 no data fields are read; `name` and `host` fall back to
    /// their defaults and `port` to 0.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Tuple elements are evaluated left to right, preserving wire order.
        let (name, host, port) = if version >= 2 {
            (
                types::CompactString.decode(buf)?,
                types::CompactString.decode(buf)?,
                types::UInt16.decode(buf)?,
            )
        } else {
            (StrBytes::default(), StrBytes::default(), 0)
        };

        // Every tagged field is kept as raw bytes, keyed by its tag cast to `i32`.
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            name,
            host,
            port,
            unknown_tagged_fields,
        })
    }
}
385
386impl Default for Listener {
387 fn default() -> Self {
388 Self {
389 name: Default::default(),
390 host: Default::default(),
391 port: 0,
392 unknown_tagged_fields: BTreeMap::new(),
393 }
394 }
395}
396
397impl Message for Listener {
398 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
399 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
400}
401
402#[non_exhaustive]
404#[derive(Debug, Clone, PartialEq)]
405pub struct Node {
406 pub node_id: super::BrokerId,
410
411 pub listeners: Vec<Listener>,
415
416 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
418}
419
420impl Node {
421 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
427 self.node_id = value;
428 self
429 }
430 pub fn with_listeners(mut self, value: Vec<Listener>) -> Self {
436 self.listeners = value;
437 self
438 }
439 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
441 self.unknown_tagged_fields = value;
442 self
443 }
444 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
446 self.unknown_tagged_fields.insert(key, value);
447 self
448 }
449}
450
#[cfg(feature = "broker")]
impl Encodable for Node {
    /// Writes this node to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2`, when `node_id` is non-zero or
    /// `listeners` is non-empty for a version below 2, when there are more
    /// than `u32::MAX` unknown tagged fields, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Both data fields share the same version gate; wire order is
        // node_id, then listeners.
        if version >= 2 {
            types::Int32.encode(buf, &self.node_id)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.listeners)?;
        } else if self.node_id != 0 || !self.listeners.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject an out-of-range `version`; it
    /// does apply the same field-availability and tagged-field-count checks.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 2 {
            size += types::Int32.compute_size(&self.node_id)?;
            size += types::CompactArray(types::Struct { version }).compute_size(&self.listeners)?;
        } else if self.node_id != 0 || !self.listeners.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
513
#[cfg(feature = "client")]
impl Decodable for Node {
    /// Reads a node from `buf` using the wire layout of `version`.
    ///
    /// Below version 2 no data fields are read; `node_id` falls back to 0 and
    /// `listeners` to an empty list.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Tuple elements are evaluated left to right, preserving wire order.
        let (node_id, listeners) = if version >= 2 {
            (
                types::Int32.decode(buf)?,
                types::CompactArray(types::Struct { version }).decode(buf)?,
            )
        } else {
            (super::BrokerId::from(0), Vec::new())
        };

        // Every tagged field is kept as raw bytes, keyed by its tag cast to `i32`.
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            node_id,
            listeners,
            unknown_tagged_fields,
        })
    }
}
545
546impl Default for Node {
547 fn default() -> Self {
548 Self {
549 node_id: (0).into(),
550 listeners: Default::default(),
551 unknown_tagged_fields: BTreeMap::new(),
552 }
553 }
554}
555
556impl Message for Node {
557 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
558 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
559}
560
561#[non_exhaustive]
563#[derive(Debug, Clone, PartialEq)]
564pub struct PartitionData {
565 pub partition_index: i32,
569
570 pub error_code: i16,
574
575 pub error_message: Option<StrBytes>,
579
580 pub leader_id: super::BrokerId,
584
585 pub leader_epoch: i32,
589
590 pub high_watermark: i64,
594
595 pub current_voters: Vec<ReplicaState>,
599
600 pub observers: Vec<ReplicaState>,
604
605 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
607}
608
609impl PartitionData {
610 pub fn with_partition_index(mut self, value: i32) -> Self {
616 self.partition_index = value;
617 self
618 }
619 pub fn with_error_code(mut self, value: i16) -> Self {
625 self.error_code = value;
626 self
627 }
628 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
634 self.error_message = value;
635 self
636 }
637 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
643 self.leader_id = value;
644 self
645 }
646 pub fn with_leader_epoch(mut self, value: i32) -> Self {
652 self.leader_epoch = value;
653 self
654 }
655 pub fn with_high_watermark(mut self, value: i64) -> Self {
661 self.high_watermark = value;
662 self
663 }
664 pub fn with_current_voters(mut self, value: Vec<ReplicaState>) -> Self {
670 self.current_voters = value;
671 self
672 }
673 pub fn with_observers(mut self, value: Vec<ReplicaState>) -> Self {
679 self.observers = value;
680 self
681 }
682 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
684 self.unknown_tagged_fields = value;
685 self
686 }
687 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
689 self.unknown_tagged_fields.insert(key, value);
690 self
691 }
692}
693
#[cfg(feature = "broker")]
impl Encodable for PartitionData {
    /// Writes this partition entry to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2`, when there are more than
    /// `u32::MAX` unknown tagged fields, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let replica_array = types::CompactArray(types::Struct { version });

        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;
        // `error_message` is simply omitted below version 2; no error is raised.
        if version >= 2 {
            types::CompactString.encode(buf, &self.error_message)?;
        }
        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;
        types::Int64.encode(buf, &self.high_watermark)?;
        replica_array.encode(buf, &self.current_voters)?;
        replica_array.encode(buf, &self.observers)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject an out-of-range `version`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let replica_array = types::CompactArray(types::Struct { version });

        let mut size = types::Int32.compute_size(&self.partition_index)?;
        size += types::Int16.compute_size(&self.error_code)?;
        if version >= 2 {
            size += types::CompactString.compute_size(&self.error_message)?;
        }
        size += types::Int32.compute_size(&self.leader_id)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;
        size += types::Int64.compute_size(&self.high_watermark)?;
        size += replica_array.compute_size(&self.current_voters)?;
        size += replica_array.compute_size(&self.observers)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
749
#[cfg(feature = "client")]
impl Decodable for PartitionData {
    /// Reads a partition entry from `buf` using the wire layout of `version`.
    ///
    /// Below version 2 `error_message` is not read and falls back to an empty
    /// (non-null) string.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let error_message = if version >= 2 {
            types::CompactString.decode(buf)?
        } else {
            Some(StrBytes::default())
        };
        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;
        let high_watermark = types::Int64.decode(buf)?;
        let current_voters = types::CompactArray(types::Struct { version }).decode(buf)?;
        let observers = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Every tagged field is kept as raw bytes, keyed by its tag cast to `i32`.
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            partition_index,
            error_code,
            error_message,
            leader_id,
            leader_epoch,
            high_watermark,
            current_voters,
            observers,
            unknown_tagged_fields,
        })
    }
}
789
790impl Default for PartitionData {
791 fn default() -> Self {
792 Self {
793 partition_index: 0,
794 error_code: 0,
795 error_message: Some(Default::default()),
796 leader_id: (0).into(),
797 leader_epoch: 0,
798 high_watermark: 0,
799 current_voters: Default::default(),
800 observers: Default::default(),
801 unknown_tagged_fields: BTreeMap::new(),
802 }
803 }
804}
805
806impl Message for PartitionData {
807 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
808 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
809}
810
811#[non_exhaustive]
813#[derive(Debug, Clone, PartialEq)]
814pub struct ReplicaState {
815 pub replica_id: super::BrokerId,
819
820 pub replica_directory_id: Uuid,
824
825 pub log_end_offset: i64,
829
830 pub last_fetch_timestamp: i64,
834
835 pub last_caught_up_timestamp: i64,
839
840 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
842}
843
844impl ReplicaState {
845 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
851 self.replica_id = value;
852 self
853 }
854 pub fn with_replica_directory_id(mut self, value: Uuid) -> Self {
860 self.replica_directory_id = value;
861 self
862 }
863 pub fn with_log_end_offset(mut self, value: i64) -> Self {
869 self.log_end_offset = value;
870 self
871 }
872 pub fn with_last_fetch_timestamp(mut self, value: i64) -> Self {
878 self.last_fetch_timestamp = value;
879 self
880 }
881 pub fn with_last_caught_up_timestamp(mut self, value: i64) -> Self {
887 self.last_caught_up_timestamp = value;
888 self
889 }
890 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
892 self.unknown_tagged_fields = value;
893 self
894 }
895 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
897 self.unknown_tagged_fields.insert(key, value);
898 self
899 }
900}
901
#[cfg(feature = "broker")]
impl Encodable for ReplicaState {
    /// Writes this replica state to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2`, when `replica_directory_id` is
    /// not the nil UUID for a version below 2, when there are more than
    /// `u32::MAX` unknown tagged fields, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.replica_id)?;
        if version >= 2 {
            types::Uuid.encode(buf, &self.replica_directory_id)?;
        } else if self.replica_directory_id != Uuid::nil() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        types::Int64.encode(buf, &self.log_end_offset)?;
        // Both timestamps are simply omitted for version 0; no error is raised.
        if version >= 1 {
            types::Int64.encode(buf, &self.last_fetch_timestamp)?;
            types::Int64.encode(buf, &self.last_caught_up_timestamp)?;
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject an out-of-range `version`; it
    /// does apply the same field-availability and tagged-field-count checks.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.replica_id)?;
        if version >= 2 {
            size += types::Uuid.compute_size(&self.replica_directory_id)?;
        } else if self.replica_directory_id != Uuid::nil() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        size += types::Int64.compute_size(&self.log_end_offset)?;
        if version >= 1 {
            size += types::Int64.compute_size(&self.last_fetch_timestamp)?;
            size += types::Int64.compute_size(&self.last_caught_up_timestamp)?;
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
965
#[cfg(feature = "client")]
impl Decodable for ReplicaState {
    /// Reads a replica state from `buf` using the wire layout of `version`.
    ///
    /// Fields absent from the selected version take fallback values: the nil
    /// UUID for `replica_directory_id` (below version 2) and -1 for both
    /// timestamps (version 0).
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let replica_id = types::Int32.decode(buf)?;
        let replica_directory_id = if version >= 2 {
            types::Uuid.decode(buf)?
        } else {
            Uuid::nil()
        };
        let log_end_offset = types::Int64.decode(buf)?;
        // Tuple elements are evaluated left to right, preserving wire order.
        let (last_fetch_timestamp, last_caught_up_timestamp) = if version >= 1 {
            (types::Int64.decode(buf)?, types::Int64.decode(buf)?)
        } else {
            (-1, -1)
        };

        // Every tagged field is kept as raw bytes, keyed by its tag cast to `i32`.
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            replica_id,
            replica_directory_id,
            log_end_offset,
            last_fetch_timestamp,
            last_caught_up_timestamp,
            unknown_tagged_fields,
        })
    }
}
1007
1008impl Default for ReplicaState {
1009 fn default() -> Self {
1010 Self {
1011 replica_id: (0).into(),
1012 replica_directory_id: Uuid::nil(),
1013 log_end_offset: 0,
1014 last_fetch_timestamp: -1,
1015 last_caught_up_timestamp: -1,
1016 unknown_tagged_fields: BTreeMap::new(),
1017 }
1018 }
1019}
1020
1021impl Message for ReplicaState {
1022 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
1023 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1024}
1025
1026#[non_exhaustive]
1028#[derive(Debug, Clone, PartialEq)]
1029pub struct TopicData {
1030 pub topic_name: super::TopicName,
1034
1035 pub partitions: Vec<PartitionData>,
1039
1040 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1042}
1043
1044impl TopicData {
1045 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
1051 self.topic_name = value;
1052 self
1053 }
1054 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
1060 self.partitions = value;
1061 self
1062 }
1063 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1065 self.unknown_tagged_fields = value;
1066 self
1067 }
1068 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1070 self.unknown_tagged_fields.insert(key, value);
1071 self
1072 }
1073}
1074
#[cfg(feature = "broker")]
impl Encodable for TopicData {
    /// Writes this topic entry to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2`, when there are more than
    /// `u32::MAX` unknown tagged fields, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::CompactString.encode(buf, &self.topic_name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject an out-of-range `version`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::CompactString.compute_size(&self.topic_name)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
1113
#[cfg(feature = "client")]
impl Decodable for TopicData {
    /// Reads a topic entry from `buf` using the wire layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=2` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let topic_name = types::CompactString.decode(buf)?;
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Every tagged field is kept as raw bytes, keyed by its tag cast to `i32`.
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            topic_name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
1137
1138impl Default for TopicData {
1139 fn default() -> Self {
1140 Self {
1141 topic_name: Default::default(),
1142 partitions: Default::default(),
1143 unknown_tagged_fields: BTreeMap::new(),
1144 }
1145 }
1146}
1147
1148impl Message for TopicData {
1149 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
1150 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1151}
1152
1153impl HeaderVersion for DescribeQuorumResponse {
1154 fn header_version(version: i16) -> i16 {
1155 1
1156 }
1157}