1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// Wire-level record pairing a producer id with a first offset; it is the element type of
/// `PartitionData::aborted_transactions`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct AbortedTransaction {
    /// Producer id. Written as an Int64 for versions 4 and above; the encoder rejects a
    /// non-zero value for older versions.
    pub producer_id: super::ProducerId,

    /// First offset. Written as an Int64 for versions 4 and above; the encoder rejects a
    /// non-zero value for older versions.
    pub first_offset: i64,

    /// Tagged fields with tags this type does not interpret, keyed by tag. Only read and
    /// written for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
37
38impl AbortedTransaction {
39 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
45 self.producer_id = value;
46 self
47 }
48 pub fn with_first_offset(mut self, value: i64) -> Self {
54 self.first_offset = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
#[cfg(feature = "broker")]
impl Encodable for AbortedTransaction {
    /// Writes this value to `buf` using the layout of protocol `version`.
    ///
    /// Returns an error if `producer_id` or `first_offset` is non-zero while `version < 4`,
    /// or if there are more unknown tagged fields than fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Both fixed fields share the same version gate, so they are handled together.
        if version >= 4 {
            types::Int64.encode(buf, &self.producer_id)?;
            types::Int64.encode(buf, &self.first_offset)?;
        } else if self.producer_id != 0 || self.first_offset != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Tagged fields only exist from version 12 onwards.
        if version < 12 {
            return Ok(());
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 4 {
            size += types::Int64.compute_size(&self.producer_id)?;
            size += types::Int64.compute_size(&self.first_offset)?;
        } else if self.producer_id != 0 || self.first_offset != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            return Ok(size);
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
131
#[cfg(feature = "client")]
impl Decodable for AbortedTransaction {
    /// Reads a value from `buf` using the layout of protocol `version`.
    ///
    /// Fields that are absent for `version` take their defaults (zero).
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Both fixed fields appear together from version 4; read them in wire order.
        let (producer_id, first_offset) = if version >= 4 {
            (types::Int64.decode(buf)?, types::Int64.decode(buf)?)
        } else {
            ((0).into(), 0)
        };

        let unknown_tagged_fields = if version >= 12 {
            let mut fields = BTreeMap::new();
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                fields.insert(tag as i32, payload);
            }
            fields
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            producer_id,
            first_offset,
            unknown_tagged_fields,
        })
    }
}
162
163impl Default for AbortedTransaction {
164 fn default() -> Self {
165 Self {
166 producer_id: (0).into(),
167 first_offset: 0,
168 unknown_tagged_fields: BTreeMap::new(),
169 }
170 }
171}
172
// Protocol version metadata for `AbortedTransaction`.
impl Message for AbortedTransaction {
    // Declared version bounds: min 0, max 16.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
    // No deprecated version range is declared.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
177
/// Wire-level pair of an epoch and an end offset; it is the type of
/// `PartitionData::diverging_epoch`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct EpochEndOffset {
    /// Epoch. Written as an Int32 for versions 12 and above; defaults to -1, and the encoder
    /// rejects any other value for older versions.
    pub epoch: i32,

    /// End offset. Written as an Int64 for versions 12 and above; defaults to -1, and the
    /// encoder rejects any other value for older versions.
    pub end_offset: i64,

    /// Tagged fields with tags this type does not interpret, keyed by tag. Only read and
    /// written for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
195
196impl EpochEndOffset {
197 pub fn with_epoch(mut self, value: i32) -> Self {
203 self.epoch = value;
204 self
205 }
206 pub fn with_end_offset(mut self, value: i64) -> Self {
212 self.end_offset = value;
213 self
214 }
215 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
217 self.unknown_tagged_fields = value;
218 self
219 }
220 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
222 self.unknown_tagged_fields.insert(key, value);
223 self
224 }
225}
226
#[cfg(feature = "broker")]
impl Encodable for EpochEndOffset {
    /// Writes this value to `buf` using the layout of protocol `version`.
    ///
    /// Every field of this type is gated on version 12, so older versions write nothing and
    /// only verify that `epoch` and `end_offset` still hold their default of -1.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 12 {
            if self.epoch != -1 || self.end_offset != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        types::Int32.encode(buf, &self.epoch)?;
        types::Int64.encode(buf, &self.end_offset)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version < 12 {
            if self.epoch != -1 || self.end_offset != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(size);
        }

        size += types::Int32.compute_size(&self.epoch)?;
        size += types::Int64.compute_size(&self.end_offset)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
289
#[cfg(feature = "client")]
impl Decodable for EpochEndOffset {
    /// Reads a value from `buf` using the layout of protocol `version`.
    ///
    /// Nothing is read for versions below 12; the result then carries the defaults (-1, -1)
    /// and an empty tagged-field map.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if version < 12 {
            return Ok(Self {
                epoch: -1,
                end_offset: -1,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let epoch = types::Int32.decode(buf)?;
        let end_offset = types::Int64.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            epoch,
            end_offset,
            unknown_tagged_fields,
        })
    }
}
320
321impl Default for EpochEndOffset {
322 fn default() -> Self {
323 Self {
324 epoch: -1,
325 end_offset: -1,
326 unknown_tagged_fields: BTreeMap::new(),
327 }
328 }
329}
330
// Protocol version metadata for `EpochEndOffset`.
impl Message for EpochEndOffset {
    // Declared version bounds: min 0, max 16.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
    // No deprecated version range is declared.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
335
/// Top-level fetch response message.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct FetchResponse {
    /// Throttle time in milliseconds. Written as an Int32 for versions 1 and above.
    pub throttle_time_ms: i32,

    /// Top-level error code. Written as an Int16 for versions 7 and above.
    pub error_code: i16,

    /// Session id. Written as an Int32 for versions 7 and above; the encoder rejects a
    /// non-zero value for older versions.
    pub session_id: i32,

    /// Per-topic responses. Written as a compact array for versions 12 and above and as a
    /// regular array before that.
    pub responses: Vec<FetchableTopicResponse>,

    /// Node endpoints. Carried as tagged field 0, and only written for versions 16 and above
    /// when the list is non-empty.
    pub node_endpoints: Vec<NodeEndpoint>,

    /// Tagged fields with tags this type does not interpret, keyed by tag. Only read and
    /// written for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
368
369impl FetchResponse {
370 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
376 self.throttle_time_ms = value;
377 self
378 }
379 pub fn with_error_code(mut self, value: i16) -> Self {
385 self.error_code = value;
386 self
387 }
388 pub fn with_session_id(mut self, value: i32) -> Self {
394 self.session_id = value;
395 self
396 }
397 pub fn with_responses(mut self, value: Vec<FetchableTopicResponse>) -> Self {
403 self.responses = value;
404 self
405 }
406 pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
412 self.node_endpoints = value;
413 self
414 }
415 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
417 self.unknown_tagged_fields = value;
418 self
419 }
420 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
422 self.unknown_tagged_fields.insert(key, value);
423 self
424 }
425}
426
#[cfg(feature = "broker")]
impl Encodable for FetchResponse {
    /// Writes this response to `buf` using the layout of protocol `version`.
    ///
    /// Returns an error if `session_id` is non-zero while `version < 7`, if the tagged-field
    /// count or the encoded `node_endpoints` size does not fit in a `u32`, or if any nested
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 1 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }
        // `error_code` and `session_id` both appear from version 7.
        if version >= 7 {
            types::Int16.encode(buf, &self.error_code)?;
            types::Int32.encode(buf, &self.session_id)?;
        } else if self.session_id != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.responses)?;
        }

        if version < 12 {
            return Ok(());
        }

        // `node_endpoints` travels as tagged field 0, only from version 16 and only if set.
        let emit_node_endpoints = version >= 16 && !self.node_endpoints.is_empty();
        let tagged_count = self.unknown_tagged_fields.len() + usize::from(emit_node_endpoints);
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        if emit_node_endpoints {
            let payload_len = types::CompactArray(types::Struct { version })
                .compute_size(&self.node_endpoints)?;
            if payload_len > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_len
                );
            }
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, payload_len as u32)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.node_endpoints)?;
        }

        // Tag 0 is handled above, so the range passed for unknown fields starts at 1.
        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 1 {
            size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }
        if version >= 7 {
            size += types::Int16.compute_size(&self.error_code)?;
            size += types::Int32.compute_size(&self.session_id)?;
        } else if self.session_id != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            size += types::CompactArray(types::Struct { version }).compute_size(&self.responses)?;
        } else {
            size += types::Array(types::Struct { version }).compute_size(&self.responses)?;
        }

        if version < 12 {
            return Ok(size);
        }

        let emit_node_endpoints = version >= 16 && !self.node_endpoints.is_empty();
        let tagged_count = self.unknown_tagged_fields.len() + usize::from(emit_node_endpoints);
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        if emit_node_endpoints {
            let payload_len = types::CompactArray(types::Struct { version })
                .compute_size(&self.node_endpoints)?;
            if payload_len > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_len
                );
            }
            // Tag, length prefix, then the payload itself.
            size += types::UnsignedVarInt.compute_size(0)?;
            size += types::UnsignedVarInt.compute_size(payload_len as u32)?;
            size += payload_len;
        }

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
537
#[cfg(feature = "client")]
impl Decodable for FetchResponse {
    /// Reads a response from `buf` using the layout of protocol `version`.
    ///
    /// Fields absent for `version` take their defaults. Returns an error if tagged field 0
    /// is present while `version < 16`, or if any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let throttle_time_ms = if version >= 1 {
            types::Int32.decode(buf)?
        } else {
            0
        };
        // `error_code` and `session_id` both appear from version 7; read them in wire order.
        let (error_code, session_id) = if version >= 7 {
            (types::Int16.decode(buf)?, types::Int32.decode(buf)?)
        } else {
            (0, 0)
        };
        let responses = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut node_endpoints = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    // Tag 0 is `node_endpoints`, which only exists from version 16.
                    0 if version >= 16 => {
                        node_endpoints =
                            types::CompactArray(types::Struct { version }).decode(buf)?;
                    }
                    0 => {
                        bail!("Tag {} is not valid for version {}", tag, version);
                    }
                    _ => {
                        let payload = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, payload);
                    }
                }
            }
        }

        Ok(Self {
            throttle_time_ms,
            error_code,
            session_id,
            responses,
            node_endpoints,
            unknown_tagged_fields,
        })
    }
}
594
595impl Default for FetchResponse {
596 fn default() -> Self {
597 Self {
598 throttle_time_ms: 0,
599 error_code: 0,
600 session_id: 0,
601 responses: Default::default(),
602 node_endpoints: Default::default(),
603 unknown_tagged_fields: BTreeMap::new(),
604 }
605 }
606}
607
// Protocol version metadata for `FetchResponse`.
impl Message for FetchResponse {
    // Declared version bounds: min 0, max 16.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
    // No deprecated version range is declared.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
612
/// Per-topic section of a fetch response; it is the element type of
/// `FetchResponse::responses`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct FetchableTopicResponse {
    /// Topic name. Written only for versions 0 through 12 (as a compact string at version 12
    /// and a regular string before that).
    pub topic: super::TopicName,

    /// Topic id. Written as a UUID for versions 13 and above; defaults to the nil UUID.
    pub topic_id: Uuid,

    /// Per-partition data. Written as a compact array for versions 12 and above and as a
    /// regular array before that.
    pub partitions: Vec<PartitionData>,

    /// Tagged fields with tags this type does not interpret, keyed by tag. Only read and
    /// written for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
635
636impl FetchableTopicResponse {
637 pub fn with_topic(mut self, value: super::TopicName) -> Self {
643 self.topic = value;
644 self
645 }
646 pub fn with_topic_id(mut self, value: Uuid) -> Self {
652 self.topic_id = value;
653 self
654 }
655 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
661 self.partitions = value;
662 self
663 }
664 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
666 self.unknown_tagged_fields = value;
667 self
668 }
669 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
671 self.unknown_tagged_fields.insert(key, value);
672 self
673 }
674}
675
#[cfg(feature = "broker")]
impl Encodable for FetchableTopicResponse {
    /// Writes this value to `buf` using the layout of protocol `version`.
    ///
    /// The topic is identified by name up to version 12 and by id from version 13, so
    /// exactly one of `topic` / `topic_id` is written.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 12 {
            types::String.encode(buf, &self.topic)?;
        } else if version == 12 {
            types::CompactString.encode(buf, &self.topic)?;
        } else {
            types::Uuid.encode(buf, &self.topic_id)?;
        }

        if version >= 12 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }

        if version < 12 {
            return Ok(());
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version < 12 {
            size += types::String.compute_size(&self.topic)?;
        } else if version == 12 {
            size += types::CompactString.compute_size(&self.topic)?;
        } else {
            size += types::Uuid.compute_size(&self.topic_id)?;
        }

        if version >= 12 {
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
        } else {
            size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
        }

        if version < 12 {
            return Ok(size);
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
741
#[cfg(feature = "client")]
impl Decodable for FetchableTopicResponse {
    /// Reads a value from `buf` using the layout of protocol `version`.
    ///
    /// Up to version 12 the topic name is read and `topic_id` is the nil UUID; from version
    /// 13 the topic id is read and `topic` takes its default.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let (topic, topic_id) = if version >= 13 {
            (Default::default(), types::Uuid.decode(buf)?)
        } else {
            let name = if version == 12 {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            };
            (name, Uuid::nil())
        };

        let partitions = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let unknown_tagged_fields = if version >= 12 {
            let mut fields = BTreeMap::new();
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                fields.insert(tag as i32, payload);
            }
            fields
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            topic,
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
782
783impl Default for FetchableTopicResponse {
784 fn default() -> Self {
785 Self {
786 topic: Default::default(),
787 topic_id: Uuid::nil(),
788 partitions: Default::default(),
789 unknown_tagged_fields: BTreeMap::new(),
790 }
791 }
792}
793
// Protocol version metadata for `FetchableTopicResponse`.
impl Message for FetchableTopicResponse {
    // Declared version bounds: min 0, max 16.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
    // No deprecated version range is declared.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
798
/// Wire-level pair of a leader broker id and a leader epoch; it is the type of
/// `PartitionData::current_leader`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderIdAndEpoch {
    /// Leader broker id. Written as an Int32 for versions 12 and above; defaults to -1, and
    /// the encoder rejects any other value for older versions.
    pub leader_id: super::BrokerId,

    /// Leader epoch. Written as an Int32 for versions 12 and above; defaults to -1, and the
    /// encoder rejects any other value for older versions.
    pub leader_epoch: i32,

    /// Tagged fields with tags this type does not interpret, keyed by tag. Only read and
    /// written for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
816
817impl LeaderIdAndEpoch {
818 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
824 self.leader_id = value;
825 self
826 }
827 pub fn with_leader_epoch(mut self, value: i32) -> Self {
833 self.leader_epoch = value;
834 self
835 }
836 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
838 self.unknown_tagged_fields = value;
839 self
840 }
841 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
843 self.unknown_tagged_fields.insert(key, value);
844 self
845 }
846}
847
#[cfg(feature = "broker")]
impl Encodable for LeaderIdAndEpoch {
    /// Writes this value to `buf` using the layout of protocol `version`.
    ///
    /// Every field of this type is gated on version 12, so older versions write nothing and
    /// only verify that `leader_id` and `leader_epoch` still hold their default of -1.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 12 {
            if self.leader_id != -1 || self.leader_epoch != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version < 12 {
            if self.leader_id != -1 || self.leader_epoch != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(size);
        }

        size += types::Int32.compute_size(&self.leader_id)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
910
#[cfg(feature = "client")]
impl Decodable for LeaderIdAndEpoch {
    /// Reads a value from `buf` using the layout of protocol `version`.
    ///
    /// Nothing is read for versions below 12; the result then carries the defaults (-1, -1)
    /// and an empty tagged-field map.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if version < 12 {
            return Ok(Self {
                leader_id: (-1).into(),
                leader_epoch: -1,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            leader_id,
            leader_epoch,
            unknown_tagged_fields,
        })
    }
}
941
942impl Default for LeaderIdAndEpoch {
943 fn default() -> Self {
944 Self {
945 leader_id: (-1).into(),
946 leader_epoch: -1,
947 unknown_tagged_fields: BTreeMap::new(),
948 }
949 }
950}
951
// Protocol version metadata for `LeaderIdAndEpoch`.
impl Message for LeaderIdAndEpoch {
    // Declared version bounds: min 0, max 16.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
    // No deprecated version range is declared.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
956
/// Wire-level description of a node (id, host, port, optional rack); it is the element type
/// of `FetchResponse::node_endpoints`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct NodeEndpoint {
    /// Node id. Written as an Int32 for versions 16 and above; the encoder rejects a
    /// non-zero value for older versions.
    pub node_id: super::BrokerId,

    /// Host. Written as a compact string for versions 16 and above; the encoder rejects a
    /// non-empty value for older versions.
    pub host: StrBytes,

    /// Port. Written as an Int32 for versions 16 and above; the encoder rejects a non-zero
    /// value for older versions.
    pub port: i32,

    /// Rack, if any. Written as a compact string for versions 16 and above; the encoder
    /// rejects `Some(_)` for older versions.
    pub rack: Option<StrBytes>,

    /// Tagged fields with tags this type does not interpret, keyed by tag. Only read and
    /// written for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
984
985impl NodeEndpoint {
986 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
992 self.node_id = value;
993 self
994 }
995 pub fn with_host(mut self, value: StrBytes) -> Self {
1001 self.host = value;
1002 self
1003 }
1004 pub fn with_port(mut self, value: i32) -> Self {
1010 self.port = value;
1011 self
1012 }
1013 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
1019 self.rack = value;
1020 self
1021 }
1022 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1024 self.unknown_tagged_fields = value;
1025 self
1026 }
1027 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1029 self.unknown_tagged_fields.insert(key, value);
1030 self
1031 }
1032}
1033
#[cfg(feature = "broker")]
impl Encodable for NodeEndpoint {
    /// Writes this value to `buf` using the layout of protocol `version`.
    ///
    /// The four fixed fields are written from version 16; below that they must all hold
    /// their defaults or an error is returned. Tagged fields are written from version 12.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 16 {
            types::Int32.encode(buf, &self.node_id)?;
            types::CompactString.encode(buf, &self.host)?;
            types::Int32.encode(buf, &self.port)?;
            types::CompactString.encode(buf, &self.rack)?;
        } else if self.node_id != 0
            || !self.host.is_empty()
            || self.port != 0
            || self.rack.is_some()
        {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            return Ok(());
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 16 {
            size += types::Int32.compute_size(&self.node_id)?;
            size += types::CompactString.compute_size(&self.host)?;
            size += types::Int32.compute_size(&self.port)?;
            size += types::CompactString.compute_size(&self.rack)?;
        } else if self.node_id != 0
            || !self.host.is_empty()
            || self.port != 0
            || self.rack.is_some()
        {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            return Ok(size);
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
1124
#[cfg(feature = "client")]
impl Decodable for NodeEndpoint {
    /// Reads a value from `buf` using the layout of protocol `version`.
    ///
    /// The four fixed fields are read from version 16 and take their defaults otherwise;
    /// tagged fields are read from version 12.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Tuple elements are evaluated left to right, which matches the wire order.
        let (node_id, host, port, rack) = if version >= 16 {
            (
                types::Int32.decode(buf)?,
                types::CompactString.decode(buf)?,
                types::Int32.decode(buf)?,
                types::CompactString.decode(buf)?,
            )
        } else {
            ((0).into(), Default::default(), 0, None)
        };

        let unknown_tagged_fields = if version >= 12 {
            let mut fields = BTreeMap::new();
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                fields.insert(tag as i32, payload);
            }
            fields
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            node_id,
            host,
            port,
            rack,
            unknown_tagged_fields,
        })
    }
}
1167
1168impl Default for NodeEndpoint {
1169 fn default() -> Self {
1170 Self {
1171 node_id: (0).into(),
1172 host: Default::default(),
1173 port: 0,
1174 rack: None,
1175 unknown_tagged_fields: BTreeMap::new(),
1176 }
1177 }
1178}
1179
// Protocol version metadata for `NodeEndpoint`.
impl Message for NodeEndpoint {
    // Declared version bounds: min 0, max 16.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
    // No deprecated version range is declared.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
1184
/// Per-partition section of a fetch response; it is the element type of
/// `FetchableTopicResponse::partitions`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionData {
    /// Partition index. Written as an Int32 for every version.
    pub partition_index: i32,

    /// Partition-level error code. Written as an Int16 for every version.
    pub error_code: i16,

    /// High watermark. Written as an Int64 for every version.
    pub high_watermark: i64,

    /// Last stable offset. Written as an Int64 for versions 4 and above.
    pub last_stable_offset: i64,

    /// Log start offset. Written as an Int64 for versions 5 and above.
    pub log_start_offset: i64,

    /// Diverging epoch. Carried as tagged field 0 (versions 12+), and only written when it
    /// differs from its default.
    pub diverging_epoch: EpochEndOffset,

    /// Current leader. Carried as tagged field 1 (versions 12+), and only written when it
    /// differs from its default.
    pub current_leader: LeaderIdAndEpoch,

    /// Snapshot id. Carried as a tagged field (versions 12+), and only written when it
    /// differs from its default.
    pub snapshot_id: SnapshotId,

    /// Aborted transactions, if any. Written for versions 4 and above (as a compact array
    /// from version 12).
    pub aborted_transactions: Option<Vec<AbortedTransaction>>,

    /// Preferred read replica. Written as an Int32 for versions 11 and above; the encoder
    /// rejects any value other than -1 for older versions.
    pub preferred_read_replica: super::BrokerId,

    /// Record payload, if any. Written as compact bytes for versions 12 and above and as
    /// regular bytes before that.
    pub records: Option<Bytes>,

    /// Tagged fields with tags this type does not interpret, keyed by tag. Written for
    /// versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
1247
1248impl PartitionData {
1249 pub fn with_partition_index(mut self, value: i32) -> Self {
1255 self.partition_index = value;
1256 self
1257 }
1258 pub fn with_error_code(mut self, value: i16) -> Self {
1264 self.error_code = value;
1265 self
1266 }
1267 pub fn with_high_watermark(mut self, value: i64) -> Self {
1273 self.high_watermark = value;
1274 self
1275 }
1276 pub fn with_last_stable_offset(mut self, value: i64) -> Self {
1282 self.last_stable_offset = value;
1283 self
1284 }
1285 pub fn with_log_start_offset(mut self, value: i64) -> Self {
1291 self.log_start_offset = value;
1292 self
1293 }
1294 pub fn with_diverging_epoch(mut self, value: EpochEndOffset) -> Self {
1300 self.diverging_epoch = value;
1301 self
1302 }
1303 pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
1309 self.current_leader = value;
1310 self
1311 }
1312 pub fn with_snapshot_id(mut self, value: SnapshotId) -> Self {
1318 self.snapshot_id = value;
1319 self
1320 }
1321 pub fn with_aborted_transactions(mut self, value: Option<Vec<AbortedTransaction>>) -> Self {
1327 self.aborted_transactions = value;
1328 self
1329 }
1330 pub fn with_preferred_read_replica(mut self, value: super::BrokerId) -> Self {
1336 self.preferred_read_replica = value;
1337 self
1338 }
1339 pub fn with_records(mut self, value: Option<Bytes>) -> Self {
1345 self.records = value;
1346 self
1347 }
1348 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1350 self.unknown_tagged_fields = value;
1351 self
1352 }
1353 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1355 self.unknown_tagged_fields.insert(key, value);
1356 self
1357 }
1358}
1359
/// Wire encoding of [`PartitionData`]; only compiled with the `broker` feature.
///
/// Layout, in order: `partition_index`, `error_code`, `high_watermark`,
/// `last_stable_offset` (version 4+), `log_start_offset` (version 5+),
/// `aborted_transactions` (version 4+, compact form from version 12),
/// `preferred_read_replica` (version 11+), `records` (compact form from
/// version 12) and, from version 12, the tagged-field section.
///
/// Tagged fields use tag 0 for `diverging_epoch`, tag 1 for `current_leader`
/// and tag 2 for `snapshot_id`; each is written only when it differs from its
/// `Default` value. Below version 12 these three fields are skipped without an
/// error, whatever they contain.
#[cfg(feature = "broker")]
impl Encodable for PartitionData {
    /// Writes this value to `buf` using the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when `preferred_read_replica` is not `-1` for a version below 11,
    /// when the tagged-field count or a tagged field's size exceeds `u32::MAX`,
    /// or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Version 12 switches to compact encodings and adds tagged fields.
        let flexible = version >= 12;

        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::Int64.encode(buf, &self.high_watermark)?;
        if version >= 4 {
            types::Int64.encode(buf, &self.last_stable_offset)?;
        }
        if version >= 5 {
            types::Int64.encode(buf, &self.log_start_offset)?;
        }
        if flexible {
            types::CompactArray(types::Struct { version })
                .encode(buf, &self.aborted_transactions)?;
        } else if version >= 4 {
            types::Array(types::Struct { version }).encode(buf, &self.aborted_transactions)?;
        }
        if version >= 11 {
            types::Int32.encode(buf, &self.preferred_read_replica)?;
        } else if self.preferred_read_replica != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            types::CompactBytes.encode(buf, &self.records)?;
        } else {
            types::Bytes.encode(buf, &self.records)?;
        }
        if !flexible {
            return Ok(());
        }

        // A known tagged field is emitted only when it holds a non-default value.
        let has_diverging_epoch = self.diverging_epoch != Default::default();
        let has_current_leader = self.current_leader != Default::default();
        let has_snapshot_id = self.snapshot_id != Default::default();

        let num_tagged_fields = self.unknown_tagged_fields.len()
            + usize::from(has_diverging_epoch)
            + usize::from(has_current_leader)
            + usize::from(has_snapshot_id);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        // Each tagged field is written as: tag, payload size, payload.
        if has_diverging_epoch {
            let field_size = types::Struct { version }.compute_size(&self.diverging_epoch)?;
            if field_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    field_size
                );
            }
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, field_size as u32)?;
            types::Struct { version }.encode(buf, &self.diverging_epoch)?;
        }
        if has_current_leader {
            let field_size = types::Struct { version }.compute_size(&self.current_leader)?;
            if field_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    field_size
                );
            }
            types::UnsignedVarInt.encode(buf, 1)?;
            types::UnsignedVarInt.encode(buf, field_size as u32)?;
            types::Struct { version }.encode(buf, &self.current_leader)?;
        }
        if has_snapshot_id {
            let field_size = types::Struct { version }.compute_size(&self.snapshot_id)?;
            if field_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    field_size
                );
            }
            types::UnsignedVarInt.encode(buf, 2)?;
            types::UnsignedVarInt.encode(buf, field_size as u32)?;
            types::Struct { version }.encode(buf, &self.snapshot_id)?;
        }

        // Tags 0..=2 are taken by the known fields above, hence the `3..` range
        // (presumably the tags unknown fields may occupy -- confirm in the helper).
        write_unknown_tagged_fields(buf, 3.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// Follows the same field order and the same error conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 12;

        let mut size = types::Int32.compute_size(&self.partition_index)?
            + types::Int16.compute_size(&self.error_code)?
            + types::Int64.compute_size(&self.high_watermark)?;
        if version >= 4 {
            size += types::Int64.compute_size(&self.last_stable_offset)?;
        }
        if version >= 5 {
            size += types::Int64.compute_size(&self.log_start_offset)?;
        }
        if flexible {
            size += types::CompactArray(types::Struct { version })
                .compute_size(&self.aborted_transactions)?;
        } else if version >= 4 {
            size += types::Array(types::Struct { version })
                .compute_size(&self.aborted_transactions)?;
        }
        if version >= 11 {
            size += types::Int32.compute_size(&self.preferred_read_replica)?;
        } else if self.preferred_read_replica != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if flexible {
            size += types::CompactBytes.compute_size(&self.records)?;
        } else {
            size += types::Bytes.compute_size(&self.records)?;
        }
        if !flexible {
            return Ok(size);
        }

        let has_diverging_epoch = self.diverging_epoch != Default::default();
        let has_current_leader = self.current_leader != Default::default();
        let has_snapshot_id = self.snapshot_id != Default::default();

        let num_tagged_fields = self.unknown_tagged_fields.len()
            + usize::from(has_diverging_epoch)
            + usize::from(has_current_leader)
            + usize::from(has_snapshot_id);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        // Each tagged field costs: tag varint + size varint + payload bytes.
        if has_diverging_epoch {
            let field_size = types::Struct { version }.compute_size(&self.diverging_epoch)?;
            if field_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    field_size
                );
            }
            size += types::UnsignedVarInt.compute_size(0)?;
            size += types::UnsignedVarInt.compute_size(field_size as u32)?;
            size += field_size;
        }
        if has_current_leader {
            let field_size = types::Struct { version }.compute_size(&self.current_leader)?;
            if field_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    field_size
                );
            }
            size += types::UnsignedVarInt.compute_size(1)?;
            size += types::UnsignedVarInt.compute_size(field_size as u32)?;
            size += field_size;
        }
        if has_snapshot_id {
            let field_size = types::Struct { version }.compute_size(&self.snapshot_id)?;
            if field_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    field_size
                );
            }
            size += types::UnsignedVarInt.compute_size(2)?;
            size += types::UnsignedVarInt.compute_size(field_size as u32)?;
            size += field_size;
        }

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
1545
/// Wire decoding of [`PartitionData`]; only compiled with the `client` feature.
///
/// Fields that the given protocol version does not carry are filled with the
/// same values the `Default` implementation uses (`-1` for the offsets and the
/// preferred read replica, an empty list for the aborted transactions).
#[cfg(feature = "client")]
impl Decodable for PartitionData {
    /// Reads one value from `buf` using the layout of protocol `version`.
    ///
    /// From version 12 the tagged-field section is read as well: tags 0, 1 and 2
    /// populate `diverging_epoch`, `current_leader` and `snapshot_id`; every
    /// other tag is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the nested decoders or by the buffer.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Version 12 switches to compact encodings and adds tagged fields.
        let flexible = version >= 12;

        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let high_watermark = types::Int64.decode(buf)?;
        let last_stable_offset = if version >= 4 {
            types::Int64.decode(buf)?
        } else {
            -1
        };
        let log_start_offset = if version >= 5 {
            types::Int64.decode(buf)?
        } else {
            -1
        };
        let aborted_transactions = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 4 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Some(Default::default())
        };
        let preferred_read_replica = if version >= 11 {
            types::Int32.decode(buf)?
        } else {
            (-1).into()
        };
        let records = if flexible {
            types::CompactBytes.decode(buf)?
        } else {
            types::Bytes.decode(buf)?
        };

        // Tagged fields keep their default unless the section below overrides them.
        let mut diverging_epoch = Default::default();
        let mut current_leader = Default::default();
        let mut snapshot_id = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                // NOTE(review): for the known tags the declared `size` is not
                // compared with what the nested decoder actually consumes.
                match tag {
                    0 => diverging_epoch = types::Struct { version }.decode(buf)?,
                    1 => current_leader = types::Struct { version }.decode(buf)?,
                    2 => snapshot_id = types::Struct { version }.decode(buf)?,
                    _ => {
                        let raw_value = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, raw_value);
                    }
                }
            }
        }

        Ok(Self {
            partition_index,
            error_code,
            high_watermark,
            last_stable_offset,
            log_start_offset,
            diverging_epoch,
            current_leader,
            snapshot_id,
            aborted_transactions,
            preferred_read_replica,
            records,
            unknown_tagged_fields,
        })
    }
}
1623
1624impl Default for PartitionData {
1625 fn default() -> Self {
1626 Self {
1627 partition_index: 0,
1628 error_code: 0,
1629 high_watermark: 0,
1630 last_stable_offset: -1,
1631 log_start_offset: -1,
1632 diverging_epoch: Default::default(),
1633 current_leader: Default::default(),
1634 snapshot_id: Default::default(),
1635 aborted_transactions: Some(Default::default()),
1636 preferred_read_replica: (-1).into(),
1637 records: Some(Default::default()),
1638 unknown_tagged_fields: BTreeMap::new(),
1639 }
1640 }
1641}
1642
1643impl Message for PartitionData {
1644 const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
1645 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1646}
1647
1648#[non_exhaustive]
1650#[derive(Debug, Clone, PartialEq)]
1651pub struct SnapshotId {
1652 pub end_offset: i64,
1656
1657 pub epoch: i32,
1661
1662 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1664}
1665
1666impl SnapshotId {
1667 pub fn with_end_offset(mut self, value: i64) -> Self {
1673 self.end_offset = value;
1674 self
1675 }
1676 pub fn with_epoch(mut self, value: i32) -> Self {
1682 self.epoch = value;
1683 self
1684 }
1685 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1687 self.unknown_tagged_fields = value;
1688 self
1689 }
1690 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1692 self.unknown_tagged_fields.insert(key, value);
1693 self
1694 }
1695}
1696
/// Wire encoding of [`SnapshotId`]; only compiled with the `broker` feature.
///
/// `end_offset` and `epoch` are written for every version; from version 12 a
/// tagged-field section holding only the unknown tagged fields follows.
#[cfg(feature = "broker")]
impl Encodable for SnapshotId {
    /// Writes this value to `buf` using the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` tagged fields or when a
    /// nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int64.encode(buf, &self.end_offset)?;
        types::Int32.encode(buf, &self.epoch)?;
        if version < 12 {
            // Versions before 12 have no tagged-field section.
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// Follows the same layout and the same error conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fixed_size = types::Int64.compute_size(&self.end_offset)?
            + types::Int32.compute_size(&self.epoch)?;
        if version < 12 {
            return Ok(fixed_size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let payload_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(fixed_size + count_size + payload_size)
    }
}
1735
/// Wire decoding of [`SnapshotId`]; only compiled with the `client` feature.
#[cfg(feature = "client")]
impl Decodable for SnapshotId {
    /// Reads one value from `buf` using the layout of protocol `version`.
    ///
    /// `end_offset` and `epoch` are read for every version. From version 12 the
    /// tagged-field section is read too, and every tagged field is kept as raw
    /// bytes in `unknown_tagged_fields`, because this struct defines no known tags.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the nested decoders or by the buffer.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let end_offset = types::Int64.decode(buf)?;
        let epoch = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each entry is: tag, payload size, payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw_value = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw_value);
            }
        }

        Ok(Self {
            end_offset,
            epoch,
            unknown_tagged_fields,
        })
    }
}
1758
1759impl Default for SnapshotId {
1760 fn default() -> Self {
1761 Self {
1762 end_offset: -1,
1763 epoch: -1,
1764 unknown_tagged_fields: BTreeMap::new(),
1765 }
1766 }
1767}
1768
1769impl Message for SnapshotId {
1770 const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
1771 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1772}
1773
1774impl HeaderVersion for FetchResponse {
1775 fn header_version(version: i16) -> i16 {
1776 if version >= 12 {
1777 1
1778 } else {
1779 0
1780 }
1781 }
1782}