1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// A producer id paired with a first offset.
///
/// Valid versions: 0-17.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct AbortedTransaction {
    /// Producer id. Written as an `Int64` on versions 4 and above; on older
    /// versions encoding fails unless this equals `0`.
    pub producer_id: super::ProducerId,

    /// First offset. Written as an `Int64` on versions 4 and above; on older
    /// versions encoding fails unless this equals `0`.
    pub first_offset: i64,

    /// Tagged fields not known to this type, keyed by tag number.
    /// Only read or written on versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
37
38impl AbortedTransaction {
39 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
45 self.producer_id = value;
46 self
47 }
48 pub fn with_first_offset(mut self, value: i64) -> Self {
54 self.first_offset = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
#[cfg(feature = "broker")]
impl Encodable for AbortedTransaction {
    /// Writes this value to `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17, when `producer_id` or
    /// `first_offset` is non-zero on a version below 4, when there are more
    /// tagged fields than fit in a `u32`, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Both fixed fields share the same version gate, so they are handled together.
        if version >= 4 {
            types::Int64.encode(buf, &self.producer_id)?;
            types::Int64.encode(buf, &self.first_offset)?;
        } else if self.producer_id != 0 || self.first_offset != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same field and tagged-field conditions as `encode`;
    /// the version range itself is not checked here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 4 {
            size += types::Int64.compute_size(&self.producer_id)?;
            size += types::Int64.compute_size(&self.first_offset)?;
        } else if self.producer_id != 0 || self.first_offset != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
134
#[cfg(feature = "client")]
impl Decodable for AbortedTransaction {
    /// Reads a value from `buf` using the layout of `version`.
    ///
    /// Fields absent from the wire (versions below 4) are set to zero.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17 or when `buf` cannot supply a field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Tuple elements are evaluated left to right, preserving wire order.
        let (producer_id, first_offset) = if version >= 4 {
            (types::Int64.decode(buf)?, types::Int64.decode(buf)?)
        } else {
            ((0).into(), 0)
        };
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(Self {
            producer_id,
            first_offset,
            unknown_tagged_fields,
        })
    }
}
168
169impl Default for AbortedTransaction {
170 fn default() -> Self {
171 Self {
172 producer_id: (0).into(),
173 first_offset: 0,
174 unknown_tagged_fields: BTreeMap::new(),
175 }
176 }
177}
178
impl Message for AbortedTransaction {
    /// Versions accepted by this type's `encode`/`decode`: 0 through 17 inclusive.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
183
/// An epoch paired with an end offset.
///
/// Valid versions: 0-17.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct EpochEndOffset {
    /// Epoch. Written as an `Int32` on versions 12 and above; on older
    /// versions encoding fails unless this equals `-1` (the default).
    pub epoch: i32,

    /// End offset. Written as an `Int64` on versions 12 and above; on older
    /// versions encoding fails unless this equals `-1` (the default).
    pub end_offset: i64,

    /// Tagged fields not known to this type, keyed by tag number.
    /// Only read or written on versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
201
202impl EpochEndOffset {
203 pub fn with_epoch(mut self, value: i32) -> Self {
209 self.epoch = value;
210 self
211 }
212 pub fn with_end_offset(mut self, value: i64) -> Self {
218 self.end_offset = value;
219 self
220 }
221 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
223 self.unknown_tagged_fields = value;
224 self
225 }
226 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
228 self.unknown_tagged_fields.insert(key, value);
229 self
230 }
231}
232
#[cfg(feature = "broker")]
impl Encodable for EpochEndOffset {
    /// Writes this value to `buf` using the layout of `version`.
    ///
    /// Nothing is written on versions below 12.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17, when `epoch` or `end_offset`
    /// differs from `-1` on a version below 12, when there are more tagged
    /// fields than fit in a `u32`, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Every part of this type is gated on version 12, so older versions
        // only need the "field must be unset" validation.
        if version < 12 {
            if self.epoch != -1 || self.end_offset != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }
        types::Int32.encode(buf, &self.epoch)?;
        types::Int64.encode(buf, &self.end_offset)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same field and tagged-field conditions as `encode`;
    /// the version range itself is not checked here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 12 {
            if self.epoch != -1 || self.end_offset != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }
        let mut size = 0;
        size += types::Int32.compute_size(&self.epoch)?;
        size += types::Int64.compute_size(&self.end_offset)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
298
#[cfg(feature = "client")]
impl Decodable for EpochEndOffset {
    /// Reads a value from `buf` using the layout of `version`.
    ///
    /// On versions below 12 nothing is consumed and both fields are `-1`.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17 or when `buf` cannot supply a field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version < 12 {
            return Ok(Self {
                epoch: -1,
                end_offset: -1,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }
        let epoch = types::Int32.decode(buf)?;
        let end_offset = types::Int64.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }
        Ok(Self {
            epoch,
            end_offset,
            unknown_tagged_fields,
        })
    }
}
332
333impl Default for EpochEndOffset {
334 fn default() -> Self {
335 Self {
336 epoch: -1,
337 end_offset: -1,
338 unknown_tagged_fields: BTreeMap::new(),
339 }
340 }
341}
342
impl Message for EpochEndOffset {
    /// Versions accepted by this type's `encode`/`decode`: 0 through 17 inclusive.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
347
/// Top-level fetch response message.
///
/// Valid versions: 0-17.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct FetchResponse {
    /// Throttle time in milliseconds. Written as an `Int32` on versions 1 and
    /// above; not on the wire for version 0 (decodes as `0`).
    pub throttle_time_ms: i32,

    /// Top-level error code. Written as an `Int16` on versions 7 and above;
    /// decodes as `0` on older versions.
    pub error_code: i16,

    /// Session id. Written as an `Int32` on versions 7 and above; on older
    /// versions encoding fails unless this equals `0`.
    pub session_id: i32,

    /// Per-topic responses. Encoded as a compact array on versions 12 and
    /// above, and as a regular array before that.
    pub responses: Vec<FetchableTopicResponse>,

    /// Node endpoints. Carried as tagged field `0` on versions 16 and above,
    /// and only emitted when the vector is non-empty.
    pub node_endpoints: Vec<NodeEndpoint>,

    /// Tagged fields not known to this type, keyed by tag number.
    /// Only read or written on versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
380
381impl FetchResponse {
382 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
388 self.throttle_time_ms = value;
389 self
390 }
391 pub fn with_error_code(mut self, value: i16) -> Self {
397 self.error_code = value;
398 self
399 }
400 pub fn with_session_id(mut self, value: i32) -> Self {
406 self.session_id = value;
407 self
408 }
409 pub fn with_responses(mut self, value: Vec<FetchableTopicResponse>) -> Self {
415 self.responses = value;
416 self
417 }
418 pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
424 self.node_endpoints = value;
425 self
426 }
427 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
429 self.unknown_tagged_fields = value;
430 self
431 }
432 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
434 self.unknown_tagged_fields.insert(key, value);
435 self
436 }
437}
438
#[cfg(feature = "broker")]
impl Encodable for FetchResponse {
    /// Writes this response to `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17, when `session_id` is non-zero on
    /// a version below 7, when the tagged-field count or the encoded
    /// `node_endpoints` payload exceeds `u32::MAX`, or when an underlying
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version >= 1 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }
        // `error_code` and `session_id` both appear from version 7; only
        // `session_id` is validated when it cannot be written.
        if version >= 7 {
            types::Int16.encode(buf, &self.error_code)?;
            types::Int32.encode(buf, &self.session_id)?;
        } else if self.session_id != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.responses)?;
        }
        if version >= 12 {
            // `node_endpoints` travels as tagged field 0 and is omitted when empty.
            let has_endpoints = version >= 16 && !self.node_endpoints.is_empty();
            let tagged_count = self.unknown_tagged_fields.len() + usize::from(has_endpoints);
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            if has_endpoints {
                let endpoints_codec = types::CompactArray(types::Struct { version });
                let computed_size = endpoints_codec.compute_size(&self.node_endpoints)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                // Tag number, then payload length, then the payload itself.
                types::UnsignedVarInt.encode(buf, 0)?;
                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
                types::CompactArray(types::Struct { version })
                    .encode(buf, &self.node_endpoints)?;
            }
            write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same field and tagged-field conditions as `encode`;
    /// the version range itself is not checked here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 1 {
            size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }
        if version >= 7 {
            size += types::Int16.compute_size(&self.error_code)?;
            size += types::Int32.compute_size(&self.session_id)?;
        } else if self.session_id != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        size += if version >= 12 {
            types::CompactArray(types::Struct { version }).compute_size(&self.responses)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.responses)?
        };
        if version >= 12 {
            let has_endpoints = version >= 16 && !self.node_endpoints.is_empty();
            let tagged_count = self.unknown_tagged_fields.len() + usize::from(has_endpoints);
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            if has_endpoints {
                let computed_size = types::CompactArray(types::Struct { version })
                    .compute_size(&self.node_endpoints)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                size += types::UnsignedVarInt.compute_size(0)?;
                size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
                size += computed_size;
            }
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
552
#[cfg(feature = "client")]
impl Decodable for FetchResponse {
    /// Reads a response from `buf` using the layout of `version`.
    ///
    /// Fields absent from the wire for `version` are left at zero/empty.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17, when tagged field `0` appears on
    /// a version below 16, or when `buf` cannot supply a field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let throttle_time_ms = if version >= 1 {
            types::Int32.decode(buf)?
        } else {
            0
        };
        // Tuple elements are evaluated left to right, preserving wire order.
        let (error_code, session_id) = if version >= 7 {
            (types::Int16.decode(buf)?, types::Int32.decode(buf)?)
        } else {
            (0, 0)
        };
        let responses = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let mut node_endpoints = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    // NOTE(review): for the known tag the declared `size` is
                    // not compared with the bytes actually consumed.
                    0 if version >= 16 => {
                        node_endpoints =
                            types::CompactArray(types::Struct { version }).decode(buf)?;
                    }
                    0 => {
                        bail!("Tag {} is not valid for version {}", tag, version);
                    }
                    _ => {
                        let payload = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, payload);
                    }
                }
            }
        }
        Ok(Self {
            throttle_time_ms,
            error_code,
            session_id,
            responses,
            node_endpoints,
            unknown_tagged_fields,
        })
    }
}
612
613impl Default for FetchResponse {
614 fn default() -> Self {
615 Self {
616 throttle_time_ms: 0,
617 error_code: 0,
618 session_id: 0,
619 responses: Default::default(),
620 node_endpoints: Default::default(),
621 unknown_tagged_fields: BTreeMap::new(),
622 }
623 }
624}
625
impl Message for FetchResponse {
    /// Versions accepted by this type's `encode`/`decode`: 0 through 17 inclusive.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
630
/// Per-topic section of a fetch response.
///
/// Valid versions: 0-17.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct FetchableTopicResponse {
    /// Topic name. Written on versions 0-12 only (as a compact string on
    /// version 12, a regular string before that); decodes to the default
    /// value on versions 13 and above.
    pub topic: super::TopicName,

    /// Topic id. Written as a `Uuid` on versions 13 and above; decodes as
    /// the nil UUID on older versions.
    pub topic_id: Uuid,

    /// Per-partition data. Encoded as a compact array on versions 12 and
    /// above, and as a regular array before that.
    pub partitions: Vec<PartitionData>,

    /// Tagged fields not known to this type, keyed by tag number.
    /// Only read or written on versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
653
654impl FetchableTopicResponse {
655 pub fn with_topic(mut self, value: super::TopicName) -> Self {
661 self.topic = value;
662 self
663 }
664 pub fn with_topic_id(mut self, value: Uuid) -> Self {
670 self.topic_id = value;
671 self
672 }
673 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
679 self.partitions = value;
680 self
681 }
682 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
684 self.unknown_tagged_fields = value;
685 self
686 }
687 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
689 self.unknown_tagged_fields.insert(key, value);
690 self
691 }
692}
693
#[cfg(feature = "broker")]
impl Encodable for FetchableTopicResponse {
    /// Writes this topic section to `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17, when there are more tagged
    /// fields than fit in a `u32`, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // The topic is identified by name up to version 12 and by id afterwards;
        // the two gates never overlap.
        if version <= 11 {
            types::String.encode(buf, &self.topic)?;
        } else if version == 12 {
            types::CompactString.encode(buf, &self.topic)?;
        } else {
            types::Uuid.encode(buf, &self.topic_id)?;
        }
        if version >= 12 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }
        if version >= 12 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails when there are more tagged fields than fit in a `u32` or when an
    /// underlying size computation fails; the version range is not checked here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        size += if version <= 11 {
            types::String.compute_size(&self.topic)?
        } else if version == 12 {
            types::CompactString.compute_size(&self.topic)?
        } else {
            types::Uuid.compute_size(&self.topic_id)?
        };
        size += if version >= 12 {
            types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.partitions)?
        };
        if version >= 12 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
762
#[cfg(feature = "client")]
impl Decodable for FetchableTopicResponse {
    /// Reads a topic section from `buf` using the layout of `version`.
    ///
    /// Exactly one of `topic` (versions 0-12) and `topic_id` (versions 13+)
    /// is read from the wire; the other is left at its default.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17 or when `buf` cannot supply a field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let (topic, topic_id) = if version <= 11 {
            (types::String.decode(buf)?, Uuid::nil())
        } else if version == 12 {
            (types::CompactString.decode(buf)?, Uuid::nil())
        } else {
            (Default::default(), types::Uuid.decode(buf)?)
        };
        let partitions = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(Self {
            topic,
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
806
807impl Default for FetchableTopicResponse {
808 fn default() -> Self {
809 Self {
810 topic: Default::default(),
811 topic_id: Uuid::nil(),
812 partitions: Default::default(),
813 unknown_tagged_fields: BTreeMap::new(),
814 }
815 }
816}
817
impl Message for FetchableTopicResponse {
    /// Versions accepted by this type's `encode`/`decode`: 0 through 17 inclusive.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
822
/// A leader id paired with a leader epoch.
///
/// Valid versions: 0-17.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderIdAndEpoch {
    /// Leader broker id. Written as an `Int32` on versions 12 and above; on
    /// older versions encoding fails unless this equals `-1` (the default).
    pub leader_id: super::BrokerId,

    /// Leader epoch. Written as an `Int32` on versions 12 and above; on
    /// older versions encoding fails unless this equals `-1` (the default).
    pub leader_epoch: i32,

    /// Tagged fields not known to this type, keyed by tag number.
    /// Only read or written on versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
840
841impl LeaderIdAndEpoch {
842 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
848 self.leader_id = value;
849 self
850 }
851 pub fn with_leader_epoch(mut self, value: i32) -> Self {
857 self.leader_epoch = value;
858 self
859 }
860 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
862 self.unknown_tagged_fields = value;
863 self
864 }
865 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
867 self.unknown_tagged_fields.insert(key, value);
868 self
869 }
870}
871
#[cfg(feature = "broker")]
impl Encodable for LeaderIdAndEpoch {
    /// Writes this value to `buf` using the layout of `version`.
    ///
    /// Nothing is written on versions below 12.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17, when `leader_id` or
    /// `leader_epoch` differs from `-1` on a version below 12, when there are
    /// more tagged fields than fit in a `u32`, or when an underlying encoder
    /// fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Every part of this type is gated on version 12, so older versions
        // only need the "field must be unset" validation.
        if version < 12 {
            if self.leader_id != -1 || self.leader_epoch != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }
        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same field and tagged-field conditions as `encode`;
    /// the version range itself is not checked here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 12 {
            if self.leader_id != -1 || self.leader_epoch != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }
        let mut size = 0;
        size += types::Int32.compute_size(&self.leader_id)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
937
#[cfg(feature = "client")]
impl Decodable for LeaderIdAndEpoch {
    /// Reads a value from `buf` using the layout of `version`.
    ///
    /// On versions below 12 nothing is consumed and both fields are `-1`.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17 or when `buf` cannot supply a field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version < 12 {
            return Ok(Self {
                leader_id: (-1).into(),
                leader_epoch: -1,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }
        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }
        Ok(Self {
            leader_id,
            leader_epoch,
            unknown_tagged_fields,
        })
    }
}
971
972impl Default for LeaderIdAndEpoch {
973 fn default() -> Self {
974 Self {
975 leader_id: (-1).into(),
976 leader_epoch: -1,
977 unknown_tagged_fields: BTreeMap::new(),
978 }
979 }
980}
981
impl Message for LeaderIdAndEpoch {
    /// Versions accepted by this type's `encode`/`decode`: 0 through 17 inclusive.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
986
/// Endpoint description for a node: id, host, port and optional rack.
///
/// Valid versions: 0-17. All four fields are only written on versions 16
/// and above; on older versions encoding fails unless each is at its default.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct NodeEndpoint {
    /// Node (broker) id, written as an `Int32`. Default `0`.
    pub node_id: super::BrokerId,

    /// Host, written as a compact string. Default is the empty string.
    pub host: StrBytes,

    /// Port, written as an `Int32`. Default `0`.
    pub port: i32,

    /// Rack, written as a compact string; `None` by default.
    pub rack: Option<StrBytes>,

    /// Tagged fields not known to this type, keyed by tag number.
    /// Only read or written on versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
1014
1015impl NodeEndpoint {
1016 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
1022 self.node_id = value;
1023 self
1024 }
1025 pub fn with_host(mut self, value: StrBytes) -> Self {
1031 self.host = value;
1032 self
1033 }
1034 pub fn with_port(mut self, value: i32) -> Self {
1040 self.port = value;
1041 self
1042 }
1043 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
1049 self.rack = value;
1050 self
1051 }
1052 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1054 self.unknown_tagged_fields = value;
1055 self
1056 }
1057 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1059 self.unknown_tagged_fields.insert(key, value);
1060 self
1061 }
1062}
1063
#[cfg(feature = "broker")]
impl Encodable for NodeEndpoint {
    /// Writes this endpoint to `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17, when any of `node_id`, `host`,
    /// `port` or `rack` is set on a version below 16, when there are more
    /// tagged fields than fit in a `u32`, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // All four fixed fields share the version-16 gate.
        if version >= 16 {
            types::Int32.encode(buf, &self.node_id)?;
            types::CompactString.encode(buf, &self.host)?;
            types::Int32.encode(buf, &self.port)?;
            types::CompactString.encode(buf, &self.rack)?;
        } else if self.node_id != 0
            || !self.host.is_empty()
            || self.port != 0
            || self.rack.is_some()
        {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same field and tagged-field conditions as `encode`;
    /// the version range itself is not checked here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 16 {
            size += types::Int32.compute_size(&self.node_id)?;
            size += types::CompactString.compute_size(&self.host)?;
            size += types::Int32.compute_size(&self.port)?;
            size += types::CompactString.compute_size(&self.rack)?;
        } else if self.node_id != 0
            || !self.host.is_empty()
            || self.port != 0
            || self.rack.is_some()
        {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
1157
#[cfg(feature = "client")]
impl Decodable for NodeEndpoint {
    /// Reads an endpoint from `buf` using the layout of `version`.
    ///
    /// On versions below 16 the four fixed fields are not on the wire and
    /// are set to their defaults.
    ///
    /// # Errors
    /// Fails when `version` is outside 0-17 or when `buf` cannot supply a field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Tuple elements are evaluated left to right, preserving wire order.
        let (node_id, host, port, rack) = if version >= 16 {
            (
                types::Int32.decode(buf)?,
                types::CompactString.decode(buf)?,
                types::Int32.decode(buf)?,
                types::CompactString.decode(buf)?,
            )
        } else {
            ((0).into(), Default::default(), 0, None)
        };
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(Self {
            node_id,
            host,
            port,
            rack,
            unknown_tagged_fields,
        })
    }
}
1203
1204impl Default for NodeEndpoint {
1205 fn default() -> Self {
1206 Self {
1207 node_id: (0).into(),
1208 host: Default::default(),
1209 port: 0,
1210 rack: None,
1211 unknown_tagged_fields: BTreeMap::new(),
1212 }
1213 }
1214}
1215
impl Message for NodeEndpoint {
    /// Versions accepted by this type's `encode`/`decode`: 0 through 17 inclusive.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
1220
/// Per-partition section of a fetch response.
///
/// Version notes on the fields below describe what the encoder in this file
/// writes for each protocol version.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionData {
    /// Partition index. Written as an `Int32` on every version.
    pub partition_index: i32,

    /// Partition-level error code. Written as an `Int16` on every version.
    pub error_code: i16,

    /// High watermark. Written as an `Int64` on every version.
    pub high_watermark: i64,

    /// Last stable offset. Written as an `Int64` on versions 4 and above.
    pub last_stable_offset: i64,

    /// Log start offset. Written as an `Int64` on versions 5 and above.
    pub log_start_offset: i64,

    /// Diverging epoch. Counted as a tagged field on versions 12 and above
    /// when it differs from its default value.
    pub diverging_epoch: EpochEndOffset,

    /// Current leader. Counted as a tagged field on versions 12 and above
    /// when it differs from its default value.
    pub current_leader: LeaderIdAndEpoch,

    /// Snapshot id. Compared against its default inside the tagged-field
    /// section on versions 12 and above.
    pub snapshot_id: SnapshotId,

    /// Aborted transactions. Written on versions 4 and above, as a compact
    /// array from version 12 and a regular array before that.
    pub aborted_transactions: Option<Vec<AbortedTransaction>>,

    /// Preferred read replica. Written as an `Int32` on versions 11 and
    /// above; on older versions encoding fails unless this equals `-1`.
    pub preferred_read_replica: super::BrokerId,

    /// Record payload. Written as compact bytes on versions 12 and above,
    /// and as regular bytes before that.
    pub records: Option<Bytes>,

    /// Tagged fields not known to this type, keyed by tag number.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
1283
1284impl PartitionData {
1285 pub fn with_partition_index(mut self, value: i32) -> Self {
1291 self.partition_index = value;
1292 self
1293 }
1294 pub fn with_error_code(mut self, value: i16) -> Self {
1300 self.error_code = value;
1301 self
1302 }
1303 pub fn with_high_watermark(mut self, value: i64) -> Self {
1309 self.high_watermark = value;
1310 self
1311 }
1312 pub fn with_last_stable_offset(mut self, value: i64) -> Self {
1318 self.last_stable_offset = value;
1319 self
1320 }
1321 pub fn with_log_start_offset(mut self, value: i64) -> Self {
1327 self.log_start_offset = value;
1328 self
1329 }
1330 pub fn with_diverging_epoch(mut self, value: EpochEndOffset) -> Self {
1336 self.diverging_epoch = value;
1337 self
1338 }
1339 pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
1345 self.current_leader = value;
1346 self
1347 }
1348 pub fn with_snapshot_id(mut self, value: SnapshotId) -> Self {
1354 self.snapshot_id = value;
1355 self
1356 }
1357 pub fn with_aborted_transactions(mut self, value: Option<Vec<AbortedTransaction>>) -> Self {
1363 self.aborted_transactions = value;
1364 self
1365 }
1366 pub fn with_preferred_read_replica(mut self, value: super::BrokerId) -> Self {
1372 self.preferred_read_replica = value;
1373 self
1374 }
1375 pub fn with_records(mut self, value: Option<Bytes>) -> Self {
1381 self.records = value;
1382 self
1383 }
1384 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1386 self.unknown_tagged_fields = value;
1387 self
1388 }
1389 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1391 self.unknown_tagged_fields.insert(key, value);
1392 self
1393 }
1394}
1395
#[cfg(feature = "broker")]
impl Encodable for PartitionData {
    /// Serializes this value into `buf` using the wire layout of `version`.
    ///
    /// Layout, in order: `partition_index`, `error_code`, `high_watermark`,
    /// `last_stable_offset` (v4+), `log_start_offset` (v5+),
    /// `aborted_transactions` (v4+), `preferred_read_replica` (v11+),
    /// `records`, and for v12+ the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=17`, when
    /// `preferred_read_replica` is not `-1` on a version below 11, when the
    /// tagged-field count or a tagged payload size exceeds `u32::MAX`, or
    /// when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 12 and above switch to compact encodings and carry tagged fields.
        let flexible = version >= 12;

        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::Int64.encode(buf, &self.high_watermark)?;
        if version >= 4 {
            types::Int64.encode(buf, &self.last_stable_offset)?;
        }
        if version >= 5 {
            types::Int64.encode(buf, &self.log_start_offset)?;
        }

        if flexible {
            types::CompactArray(types::Struct { version })
                .encode(buf, &self.aborted_transactions)?;
        } else if version >= 4 {
            types::Array(types::Struct { version }).encode(buf, &self.aborted_transactions)?;
        }

        if version >= 11 {
            types::Int32.encode(buf, &self.preferred_read_replica)?;
        } else if self.preferred_read_replica != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if !flexible {
            // Older versions end with the plain bytes payload; no tagged section follows.
            types::Bytes.encode(buf, &self.records)?;
            return Ok(());
        }
        types::CompactBytes.encode(buf, &self.records)?;

        // A known tagged field is only emitted when it differs from its default.
        let has_diverging_epoch = self.diverging_epoch != Default::default();
        let has_current_leader = self.current_leader != Default::default();
        let has_snapshot_id = self.snapshot_id != Default::default();

        let tagged_count = self.unknown_tagged_fields.len()
            + usize::from(has_diverging_epoch)
            + usize::from(has_current_leader)
            + usize::from(has_snapshot_id);
        if tagged_count > std::u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        // Each tagged field is written as: tag, payload size, payload.
        if has_diverging_epoch {
            let payload_len = types::Struct { version }.compute_size(&self.diverging_epoch)?;
            if payload_len > std::u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_len
                );
            }
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, payload_len as u32)?;
            types::Struct { version }.encode(buf, &self.diverging_epoch)?;
        }
        if has_current_leader {
            let payload_len = types::Struct { version }.compute_size(&self.current_leader)?;
            if payload_len > std::u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_len
                );
            }
            types::UnsignedVarInt.encode(buf, 1)?;
            types::UnsignedVarInt.encode(buf, payload_len as u32)?;
            types::Struct { version }.encode(buf, &self.current_leader)?;
        }
        if has_snapshot_id {
            let payload_len = types::Struct { version }.compute_size(&self.snapshot_id)?;
            if payload_len > std::u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_len
                );
            }
            types::UnsignedVarInt.encode(buf, 2)?;
            types::UnsignedVarInt.encode(buf, payload_len as u32)?;
            types::Struct { version }.encode(buf, &self.snapshot_id)?;
        }

        // Tags 0..=2 are the known fields above; the unknown ones are written
        // with the range starting at 3.
        write_unknown_tagged_fields(buf, 3.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Mirrors the field order and version gates of `encode`, but does not
    /// itself reject out-of-range versions.
    ///
    /// # Errors
    ///
    /// Fails when `preferred_read_replica` is not `-1` on a version below 11,
    /// when the tagged-field count or a tagged payload size exceeds
    /// `u32::MAX`, or when any nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 12;

        let mut total = types::Int32.compute_size(&self.partition_index)?
            + types::Int16.compute_size(&self.error_code)?
            + types::Int64.compute_size(&self.high_watermark)?;
        if version >= 4 {
            total += types::Int64.compute_size(&self.last_stable_offset)?;
        }
        if version >= 5 {
            total += types::Int64.compute_size(&self.log_start_offset)?;
        }

        if flexible {
            total += types::CompactArray(types::Struct { version })
                .compute_size(&self.aborted_transactions)?;
        } else if version >= 4 {
            total +=
                types::Array(types::Struct { version }).compute_size(&self.aborted_transactions)?;
        }

        if version >= 11 {
            total += types::Int32.compute_size(&self.preferred_read_replica)?;
        } else if self.preferred_read_replica != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if !flexible {
            total += types::Bytes.compute_size(&self.records)?;
            return Ok(total);
        }
        total += types::CompactBytes.compute_size(&self.records)?;

        let has_diverging_epoch = self.diverging_epoch != Default::default();
        let has_current_leader = self.current_leader != Default::default();
        let has_snapshot_id = self.snapshot_id != Default::default();

        let tagged_count = self.unknown_tagged_fields.len()
            + usize::from(has_diverging_epoch)
            + usize::from(has_current_leader)
            + usize::from(has_snapshot_id);
        if tagged_count > std::u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        total += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        // Per tagged field: size of the tag, size of the length prefix, payload.
        if has_diverging_epoch {
            let payload_len = types::Struct { version }.compute_size(&self.diverging_epoch)?;
            if payload_len > std::u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_len
                );
            }
            total += types::UnsignedVarInt.compute_size(0)?
                + types::UnsignedVarInt.compute_size(payload_len as u32)?
                + payload_len;
        }
        if has_current_leader {
            let payload_len = types::Struct { version }.compute_size(&self.current_leader)?;
            if payload_len > std::u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_len
                );
            }
            total += types::UnsignedVarInt.compute_size(1)?
                + types::UnsignedVarInt.compute_size(payload_len as u32)?
                + payload_len;
        }
        if has_snapshot_id {
            let payload_len = types::Struct { version }.compute_size(&self.snapshot_id)?;
            if payload_len > std::u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_len
                );
            }
            total += types::UnsignedVarInt.compute_size(2)?
                + types::UnsignedVarInt.compute_size(payload_len as u32)?
                + payload_len;
        }

        total += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total)
    }
}
1584
#[cfg(feature = "client")]
impl Decodable for PartitionData {
    /// Reads one value from `buf` using the wire layout of `version`.
    ///
    /// Fields that `version` does not carry receive fixed fallbacks:
    /// `-1` for `last_stable_offset`, `log_start_offset` and
    /// `preferred_read_replica`, `Some` of an empty vector for
    /// `aborted_transactions`, and `Default` values for the three tagged
    /// structs.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=17` or when any nested decoder
    /// or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 12 and above switch to compact encodings and carry tagged fields.
        let flexible = version >= 12;

        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let high_watermark = types::Int64.decode(buf)?;
        let last_stable_offset = if version >= 4 {
            types::Int64.decode(buf)?
        } else {
            -1
        };
        let log_start_offset = if version >= 5 {
            types::Int64.decode(buf)?
        } else {
            -1
        };
        let aborted_transactions = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 4 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Some(Default::default())
        };
        let preferred_read_replica = if version >= 11 {
            types::Int32.decode(buf)?
        } else {
            (-1).into()
        };
        let records = if flexible {
            types::CompactBytes.decode(buf)?
        } else {
            types::Bytes.decode(buf)?
        };

        // Tagged fields start from their defaults and are overwritten only
        // when the corresponding tag shows up in the tagged section.
        let mut diverging_epoch = Default::default();
        let mut current_leader = Default::default();
        let mut snapshot_id = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    // Known tags are decoded structurally; `size` is not consulted.
                    0 => diverging_epoch = types::Struct { version }.decode(buf)?,
                    1 => current_leader = types::Struct { version }.decode(buf)?,
                    2 => snapshot_id = types::Struct { version }.decode(buf)?,
                    // Anything else is kept as raw bytes, keyed by its tag.
                    _ => {
                        let raw = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, raw);
                    }
                }
            }
        }

        Ok(Self {
            partition_index,
            error_code,
            high_watermark,
            last_stable_offset,
            log_start_offset,
            diverging_epoch,
            current_leader,
            snapshot_id,
            aborted_transactions,
            preferred_read_replica,
            records,
            unknown_tagged_fields,
        })
    }
}
1665
1666impl Default for PartitionData {
1667 fn default() -> Self {
1668 Self {
1669 partition_index: 0,
1670 error_code: 0,
1671 high_watermark: 0,
1672 last_stable_offset: -1,
1673 log_start_offset: -1,
1674 diverging_epoch: Default::default(),
1675 current_leader: Default::default(),
1676 snapshot_id: Default::default(),
1677 aborted_transactions: Some(Default::default()),
1678 preferred_read_replica: (-1).into(),
1679 records: Some(Default::default()),
1680 unknown_tagged_fields: BTreeMap::new(),
1681 }
1682 }
1683}
1684
/// Version metadata for `PartitionData`.
impl Message for PartitionData {
    /// Supported versions, `0` through `17`; this matches the range check
    /// performed by `encode` and `decode`.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
1689
/// Pair of an end offset and an epoch, used as the `snapshot_id` tagged field
/// of `PartitionData`.
///
/// Both numeric fields default to `-1` (see the `Default` impl).
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct SnapshotId {
    /// Written as an `Int64` in every supported version.
    pub end_offset: i64,

    /// Written as an `Int32` in every supported version.
    pub epoch: i32,

    /// Tagged fields kept as raw bytes, keyed by tag. Only read or written
    /// for versions 12 and above; this struct has no known tags of its own.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
1707
1708impl SnapshotId {
1709 pub fn with_end_offset(mut self, value: i64) -> Self {
1715 self.end_offset = value;
1716 self
1717 }
1718 pub fn with_epoch(mut self, value: i32) -> Self {
1724 self.epoch = value;
1725 self
1726 }
1727 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1729 self.unknown_tagged_fields = value;
1730 self
1731 }
1732 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1734 self.unknown_tagged_fields.insert(key, value);
1735 self
1736 }
1737}
1738
#[cfg(feature = "broker")]
impl Encodable for SnapshotId {
    /// Serializes `end_offset` and `epoch` into `buf`, followed for versions
    /// 12 and above by the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=17`, when there are more than
    /// `u32::MAX` tagged fields, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int64.encode(buf, &self.end_offset)?;
        types::Int32.encode(buf, &self.epoch)?;

        // Versions below 12 have no tagged-field section.
        if version < 12 {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > std::u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        // This struct has no known tags, so unknown tags start at 0.
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Does not itself reject out-of-range versions.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` tagged fields or when any
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total = types::Int64.compute_size(&self.end_offset)?
            + types::Int32.compute_size(&self.epoch)?;

        if version < 12 {
            return Ok(total);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > std::u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        total += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        total += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total)
    }
}
1780
#[cfg(feature = "client")]
impl Decodable for SnapshotId {
    /// Reads `end_offset` and `epoch` from `buf`, then for versions 12 and
    /// above collects every tagged field as raw bytes keyed by its tag.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=17` or when any nested decoder
    /// or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let end_offset = types::Int64.decode(buf)?;
        let epoch = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Wire order per entry: tag, payload size, payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            end_offset,
            epoch,
            unknown_tagged_fields,
        })
    }
}
1806
1807impl Default for SnapshotId {
1808 fn default() -> Self {
1809 Self {
1810 end_offset: -1,
1811 epoch: -1,
1812 unknown_tagged_fields: BTreeMap::new(),
1813 }
1814 }
1815}
1816
/// Version metadata for `SnapshotId`.
impl Message for SnapshotId {
    /// Supported versions, `0` through `17`; this matches the range check
    /// performed by `encode` and `decode`.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
1821
1822impl HeaderVersion for FetchResponse {
1823 fn header_version(version: i16) -> i16 {
1824 if version >= 12 {
1825 1
1826 } else {
1827 0
1828 }
1829 }
1830}