1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AbortedTransaction {
24 pub producer_id: super::ProducerId,
28
29 pub first_offset: i64,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl AbortedTransaction {
39 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
45 self.producer_id = value;
46 self
47 }
48 pub fn with_first_offset(mut self, value: i64) -> Self {
54 self.first_offset = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "broker")]
70impl Encodable for AbortedTransaction {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 if version < 4 || version > 18 {
73 bail!("specified version not supported by this message type");
74 }
75 types::Int64.encode(buf, &self.producer_id)?;
76 types::Int64.encode(buf, &self.first_offset)?;
77 if version >= 12 {
78 let num_tagged_fields = self.unknown_tagged_fields.len();
79 if num_tagged_fields > std::u32::MAX as usize {
80 bail!(
81 "Too many tagged fields to encode ({} fields)",
82 num_tagged_fields
83 );
84 }
85 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
86
87 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
88 }
89 Ok(())
90 }
91 fn compute_size(&self, version: i16) -> Result<usize> {
92 let mut total_size = 0;
93 total_size += types::Int64.compute_size(&self.producer_id)?;
94 total_size += types::Int64.compute_size(&self.first_offset)?;
95 if version >= 12 {
96 let num_tagged_fields = self.unknown_tagged_fields.len();
97 if num_tagged_fields > std::u32::MAX as usize {
98 bail!(
99 "Too many tagged fields to encode ({} fields)",
100 num_tagged_fields
101 );
102 }
103 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
104
105 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
106 }
107 Ok(total_size)
108 }
109}
110
111#[cfg(feature = "client")]
112impl Decodable for AbortedTransaction {
113 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
114 if version < 4 || version > 18 {
115 bail!("specified version not supported by this message type");
116 }
117 let producer_id = types::Int64.decode(buf)?;
118 let first_offset = types::Int64.decode(buf)?;
119 let mut unknown_tagged_fields = BTreeMap::new();
120 if version >= 12 {
121 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
122 for _ in 0..num_tagged_fields {
123 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
124 let size: u32 = types::UnsignedVarInt.decode(buf)?;
125 let unknown_value = buf.try_get_bytes(size as usize)?;
126 unknown_tagged_fields.insert(tag as i32, unknown_value);
127 }
128 }
129 Ok(Self {
130 producer_id,
131 first_offset,
132 unknown_tagged_fields,
133 })
134 }
135}
136
137impl Default for AbortedTransaction {
138 fn default() -> Self {
139 Self {
140 producer_id: (0).into(),
141 first_offset: 0,
142 unknown_tagged_fields: BTreeMap::new(),
143 }
144 }
145}
146
147impl Message for AbortedTransaction {
148 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
149 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
150}
151
152#[non_exhaustive]
154#[derive(Debug, Clone, PartialEq)]
155pub struct EpochEndOffset {
156 pub epoch: i32,
160
161 pub end_offset: i64,
165
166 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
168}
169
170impl EpochEndOffset {
171 pub fn with_epoch(mut self, value: i32) -> Self {
177 self.epoch = value;
178 self
179 }
180 pub fn with_end_offset(mut self, value: i64) -> Self {
186 self.end_offset = value;
187 self
188 }
189 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
191 self.unknown_tagged_fields = value;
192 self
193 }
194 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
196 self.unknown_tagged_fields.insert(key, value);
197 self
198 }
199}
200
201#[cfg(feature = "broker")]
202impl Encodable for EpochEndOffset {
203 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
204 if version < 4 || version > 18 {
205 bail!("specified version not supported by this message type");
206 }
207 if version >= 12 {
208 types::Int32.encode(buf, &self.epoch)?;
209 } else {
210 if self.epoch != -1 {
211 bail!("A field is set that is not available on the selected protocol version");
212 }
213 }
214 if version >= 12 {
215 types::Int64.encode(buf, &self.end_offset)?;
216 } else {
217 if self.end_offset != -1 {
218 bail!("A field is set that is not available on the selected protocol version");
219 }
220 }
221 if version >= 12 {
222 let num_tagged_fields = self.unknown_tagged_fields.len();
223 if num_tagged_fields > std::u32::MAX as usize {
224 bail!(
225 "Too many tagged fields to encode ({} fields)",
226 num_tagged_fields
227 );
228 }
229 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
230
231 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
232 }
233 Ok(())
234 }
235 fn compute_size(&self, version: i16) -> Result<usize> {
236 let mut total_size = 0;
237 if version >= 12 {
238 total_size += types::Int32.compute_size(&self.epoch)?;
239 } else {
240 if self.epoch != -1 {
241 bail!("A field is set that is not available on the selected protocol version");
242 }
243 }
244 if version >= 12 {
245 total_size += types::Int64.compute_size(&self.end_offset)?;
246 } else {
247 if self.end_offset != -1 {
248 bail!("A field is set that is not available on the selected protocol version");
249 }
250 }
251 if version >= 12 {
252 let num_tagged_fields = self.unknown_tagged_fields.len();
253 if num_tagged_fields > std::u32::MAX as usize {
254 bail!(
255 "Too many tagged fields to encode ({} fields)",
256 num_tagged_fields
257 );
258 }
259 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
260
261 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
262 }
263 Ok(total_size)
264 }
265}
266
267#[cfg(feature = "client")]
268impl Decodable for EpochEndOffset {
269 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
270 if version < 4 || version > 18 {
271 bail!("specified version not supported by this message type");
272 }
273 let epoch = if version >= 12 {
274 types::Int32.decode(buf)?
275 } else {
276 -1
277 };
278 let end_offset = if version >= 12 {
279 types::Int64.decode(buf)?
280 } else {
281 -1
282 };
283 let mut unknown_tagged_fields = BTreeMap::new();
284 if version >= 12 {
285 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
286 for _ in 0..num_tagged_fields {
287 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
288 let size: u32 = types::UnsignedVarInt.decode(buf)?;
289 let unknown_value = buf.try_get_bytes(size as usize)?;
290 unknown_tagged_fields.insert(tag as i32, unknown_value);
291 }
292 }
293 Ok(Self {
294 epoch,
295 end_offset,
296 unknown_tagged_fields,
297 })
298 }
299}
300
301impl Default for EpochEndOffset {
302 fn default() -> Self {
303 Self {
304 epoch: -1,
305 end_offset: -1,
306 unknown_tagged_fields: BTreeMap::new(),
307 }
308 }
309}
310
311impl Message for EpochEndOffset {
312 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
313 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
314}
315
316#[non_exhaustive]
318#[derive(Debug, Clone, PartialEq)]
319pub struct FetchResponse {
320 pub throttle_time_ms: i32,
324
325 pub error_code: i16,
329
330 pub session_id: i32,
334
335 pub responses: Vec<FetchableTopicResponse>,
339
340 pub node_endpoints: Vec<NodeEndpoint>,
344
345 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
347}
348
349impl FetchResponse {
350 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
356 self.throttle_time_ms = value;
357 self
358 }
359 pub fn with_error_code(mut self, value: i16) -> Self {
365 self.error_code = value;
366 self
367 }
368 pub fn with_session_id(mut self, value: i32) -> Self {
374 self.session_id = value;
375 self
376 }
377 pub fn with_responses(mut self, value: Vec<FetchableTopicResponse>) -> Self {
383 self.responses = value;
384 self
385 }
386 pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
392 self.node_endpoints = value;
393 self
394 }
395 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
397 self.unknown_tagged_fields = value;
398 self
399 }
400 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
402 self.unknown_tagged_fields.insert(key, value);
403 self
404 }
405}
406
407#[cfg(feature = "broker")]
408impl Encodable for FetchResponse {
409 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
410 if version < 4 || version > 18 {
411 bail!("specified version not supported by this message type");
412 }
413 types::Int32.encode(buf, &self.throttle_time_ms)?;
414 if version >= 7 {
415 types::Int16.encode(buf, &self.error_code)?;
416 }
417 if version >= 7 {
418 types::Int32.encode(buf, &self.session_id)?;
419 } else {
420 if self.session_id != 0 {
421 bail!("A field is set that is not available on the selected protocol version");
422 }
423 }
424 if version >= 12 {
425 types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;
426 } else {
427 types::Array(types::Struct { version }).encode(buf, &self.responses)?;
428 }
429 if version >= 12 {
430 let mut num_tagged_fields = self.unknown_tagged_fields.len();
431 if version >= 16 {
432 if !self.node_endpoints.is_empty() {
433 num_tagged_fields += 1;
434 }
435 }
436 if num_tagged_fields > std::u32::MAX as usize {
437 bail!(
438 "Too many tagged fields to encode ({} fields)",
439 num_tagged_fields
440 );
441 }
442 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
443 if version >= 16 {
444 if !self.node_endpoints.is_empty() {
445 let computed_size = types::CompactArray(types::Struct { version })
446 .compute_size(&self.node_endpoints)?;
447 if computed_size > std::u32::MAX as usize {
448 bail!(
449 "Tagged field is too large to encode ({} bytes)",
450 computed_size
451 );
452 }
453 types::UnsignedVarInt.encode(buf, 0)?;
454 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
455 types::CompactArray(types::Struct { version })
456 .encode(buf, &self.node_endpoints)?;
457 }
458 }
459 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
460 }
461 Ok(())
462 }
463 fn compute_size(&self, version: i16) -> Result<usize> {
464 let mut total_size = 0;
465 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
466 if version >= 7 {
467 total_size += types::Int16.compute_size(&self.error_code)?;
468 }
469 if version >= 7 {
470 total_size += types::Int32.compute_size(&self.session_id)?;
471 } else {
472 if self.session_id != 0 {
473 bail!("A field is set that is not available on the selected protocol version");
474 }
475 }
476 if version >= 12 {
477 total_size +=
478 types::CompactArray(types::Struct { version }).compute_size(&self.responses)?;
479 } else {
480 total_size += types::Array(types::Struct { version }).compute_size(&self.responses)?;
481 }
482 if version >= 12 {
483 let mut num_tagged_fields = self.unknown_tagged_fields.len();
484 if version >= 16 {
485 if !self.node_endpoints.is_empty() {
486 num_tagged_fields += 1;
487 }
488 }
489 if num_tagged_fields > std::u32::MAX as usize {
490 bail!(
491 "Too many tagged fields to encode ({} fields)",
492 num_tagged_fields
493 );
494 }
495 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
496 if version >= 16 {
497 if !self.node_endpoints.is_empty() {
498 let computed_size = types::CompactArray(types::Struct { version })
499 .compute_size(&self.node_endpoints)?;
500 if computed_size > std::u32::MAX as usize {
501 bail!(
502 "Tagged field is too large to encode ({} bytes)",
503 computed_size
504 );
505 }
506 total_size += types::UnsignedVarInt.compute_size(0)?;
507 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
508 total_size += computed_size;
509 }
510 }
511 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
512 }
513 Ok(total_size)
514 }
515}
516
517#[cfg(feature = "client")]
518impl Decodable for FetchResponse {
519 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
520 if version < 4 || version > 18 {
521 bail!("specified version not supported by this message type");
522 }
523 let throttle_time_ms = types::Int32.decode(buf)?;
524 let error_code = if version >= 7 {
525 types::Int16.decode(buf)?
526 } else {
527 0
528 };
529 let session_id = if version >= 7 {
530 types::Int32.decode(buf)?
531 } else {
532 0
533 };
534 let responses = if version >= 12 {
535 types::CompactArray(types::Struct { version }).decode(buf)?
536 } else {
537 types::Array(types::Struct { version }).decode(buf)?
538 };
539 let mut node_endpoints = Default::default();
540 let mut unknown_tagged_fields = BTreeMap::new();
541 if version >= 12 {
542 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
543 for _ in 0..num_tagged_fields {
544 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
545 let size: u32 = types::UnsignedVarInt.decode(buf)?;
546 match tag {
547 0 => {
548 if version >= 16 {
549 node_endpoints =
550 types::CompactArray(types::Struct { version }).decode(buf)?;
551 } else {
552 bail!("Tag {} is not valid for version {}", tag, version);
553 }
554 }
555 _ => {
556 let unknown_value = buf.try_get_bytes(size as usize)?;
557 unknown_tagged_fields.insert(tag as i32, unknown_value);
558 }
559 }
560 }
561 }
562 Ok(Self {
563 throttle_time_ms,
564 error_code,
565 session_id,
566 responses,
567 node_endpoints,
568 unknown_tagged_fields,
569 })
570 }
571}
572
573impl Default for FetchResponse {
574 fn default() -> Self {
575 Self {
576 throttle_time_ms: 0,
577 error_code: 0,
578 session_id: 0,
579 responses: Default::default(),
580 node_endpoints: Default::default(),
581 unknown_tagged_fields: BTreeMap::new(),
582 }
583 }
584}
585
586impl Message for FetchResponse {
587 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
588 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
589}
590
591#[non_exhaustive]
593#[derive(Debug, Clone, PartialEq)]
594pub struct FetchableTopicResponse {
595 pub topic: super::TopicName,
599
600 pub topic_id: Uuid,
604
605 pub partitions: Vec<PartitionData>,
609
610 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
612}
613
614impl FetchableTopicResponse {
615 pub fn with_topic(mut self, value: super::TopicName) -> Self {
621 self.topic = value;
622 self
623 }
624 pub fn with_topic_id(mut self, value: Uuid) -> Self {
630 self.topic_id = value;
631 self
632 }
633 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
639 self.partitions = value;
640 self
641 }
642 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
644 self.unknown_tagged_fields = value;
645 self
646 }
647 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
649 self.unknown_tagged_fields.insert(key, value);
650 self
651 }
652}
653
654#[cfg(feature = "broker")]
655impl Encodable for FetchableTopicResponse {
656 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
657 if version < 4 || version > 18 {
658 bail!("specified version not supported by this message type");
659 }
660 if version <= 12 {
661 if version >= 12 {
662 types::CompactString.encode(buf, &self.topic)?;
663 } else {
664 types::String.encode(buf, &self.topic)?;
665 }
666 }
667 if version >= 13 {
668 types::Uuid.encode(buf, &self.topic_id)?;
669 }
670 if version >= 12 {
671 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
672 } else {
673 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
674 }
675 if version >= 12 {
676 let num_tagged_fields = self.unknown_tagged_fields.len();
677 if num_tagged_fields > std::u32::MAX as usize {
678 bail!(
679 "Too many tagged fields to encode ({} fields)",
680 num_tagged_fields
681 );
682 }
683 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
684
685 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
686 }
687 Ok(())
688 }
689 fn compute_size(&self, version: i16) -> Result<usize> {
690 let mut total_size = 0;
691 if version <= 12 {
692 if version >= 12 {
693 total_size += types::CompactString.compute_size(&self.topic)?;
694 } else {
695 total_size += types::String.compute_size(&self.topic)?;
696 }
697 }
698 if version >= 13 {
699 total_size += types::Uuid.compute_size(&self.topic_id)?;
700 }
701 if version >= 12 {
702 total_size +=
703 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
704 } else {
705 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
706 }
707 if version >= 12 {
708 let num_tagged_fields = self.unknown_tagged_fields.len();
709 if num_tagged_fields > std::u32::MAX as usize {
710 bail!(
711 "Too many tagged fields to encode ({} fields)",
712 num_tagged_fields
713 );
714 }
715 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
716
717 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
718 }
719 Ok(total_size)
720 }
721}
722
723#[cfg(feature = "client")]
724impl Decodable for FetchableTopicResponse {
725 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
726 if version < 4 || version > 18 {
727 bail!("specified version not supported by this message type");
728 }
729 let topic = if version <= 12 {
730 if version >= 12 {
731 types::CompactString.decode(buf)?
732 } else {
733 types::String.decode(buf)?
734 }
735 } else {
736 Default::default()
737 };
738 let topic_id = if version >= 13 {
739 types::Uuid.decode(buf)?
740 } else {
741 Uuid::nil()
742 };
743 let partitions = if version >= 12 {
744 types::CompactArray(types::Struct { version }).decode(buf)?
745 } else {
746 types::Array(types::Struct { version }).decode(buf)?
747 };
748 let mut unknown_tagged_fields = BTreeMap::new();
749 if version >= 12 {
750 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
751 for _ in 0..num_tagged_fields {
752 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
753 let size: u32 = types::UnsignedVarInt.decode(buf)?;
754 let unknown_value = buf.try_get_bytes(size as usize)?;
755 unknown_tagged_fields.insert(tag as i32, unknown_value);
756 }
757 }
758 Ok(Self {
759 topic,
760 topic_id,
761 partitions,
762 unknown_tagged_fields,
763 })
764 }
765}
766
767impl Default for FetchableTopicResponse {
768 fn default() -> Self {
769 Self {
770 topic: Default::default(),
771 topic_id: Uuid::nil(),
772 partitions: Default::default(),
773 unknown_tagged_fields: BTreeMap::new(),
774 }
775 }
776}
777
778impl Message for FetchableTopicResponse {
779 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
780 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
781}
782
783#[non_exhaustive]
785#[derive(Debug, Clone, PartialEq)]
786pub struct LeaderIdAndEpoch {
787 pub leader_id: super::BrokerId,
791
792 pub leader_epoch: i32,
796
797 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
799}
800
801impl LeaderIdAndEpoch {
802 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
808 self.leader_id = value;
809 self
810 }
811 pub fn with_leader_epoch(mut self, value: i32) -> Self {
817 self.leader_epoch = value;
818 self
819 }
820 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
822 self.unknown_tagged_fields = value;
823 self
824 }
825 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
827 self.unknown_tagged_fields.insert(key, value);
828 self
829 }
830}
831
832#[cfg(feature = "broker")]
833impl Encodable for LeaderIdAndEpoch {
834 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
835 if version < 4 || version > 18 {
836 bail!("specified version not supported by this message type");
837 }
838 if version >= 12 {
839 types::Int32.encode(buf, &self.leader_id)?;
840 } else {
841 if self.leader_id != -1 {
842 bail!("A field is set that is not available on the selected protocol version");
843 }
844 }
845 if version >= 12 {
846 types::Int32.encode(buf, &self.leader_epoch)?;
847 } else {
848 if self.leader_epoch != -1 {
849 bail!("A field is set that is not available on the selected protocol version");
850 }
851 }
852 if version >= 12 {
853 let num_tagged_fields = self.unknown_tagged_fields.len();
854 if num_tagged_fields > std::u32::MAX as usize {
855 bail!(
856 "Too many tagged fields to encode ({} fields)",
857 num_tagged_fields
858 );
859 }
860 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
861
862 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
863 }
864 Ok(())
865 }
866 fn compute_size(&self, version: i16) -> Result<usize> {
867 let mut total_size = 0;
868 if version >= 12 {
869 total_size += types::Int32.compute_size(&self.leader_id)?;
870 } else {
871 if self.leader_id != -1 {
872 bail!("A field is set that is not available on the selected protocol version");
873 }
874 }
875 if version >= 12 {
876 total_size += types::Int32.compute_size(&self.leader_epoch)?;
877 } else {
878 if self.leader_epoch != -1 {
879 bail!("A field is set that is not available on the selected protocol version");
880 }
881 }
882 if version >= 12 {
883 let num_tagged_fields = self.unknown_tagged_fields.len();
884 if num_tagged_fields > std::u32::MAX as usize {
885 bail!(
886 "Too many tagged fields to encode ({} fields)",
887 num_tagged_fields
888 );
889 }
890 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
891
892 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
893 }
894 Ok(total_size)
895 }
896}
897
898#[cfg(feature = "client")]
899impl Decodable for LeaderIdAndEpoch {
900 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
901 if version < 4 || version > 18 {
902 bail!("specified version not supported by this message type");
903 }
904 let leader_id = if version >= 12 {
905 types::Int32.decode(buf)?
906 } else {
907 (-1).into()
908 };
909 let leader_epoch = if version >= 12 {
910 types::Int32.decode(buf)?
911 } else {
912 -1
913 };
914 let mut unknown_tagged_fields = BTreeMap::new();
915 if version >= 12 {
916 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
917 for _ in 0..num_tagged_fields {
918 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
919 let size: u32 = types::UnsignedVarInt.decode(buf)?;
920 let unknown_value = buf.try_get_bytes(size as usize)?;
921 unknown_tagged_fields.insert(tag as i32, unknown_value);
922 }
923 }
924 Ok(Self {
925 leader_id,
926 leader_epoch,
927 unknown_tagged_fields,
928 })
929 }
930}
931
932impl Default for LeaderIdAndEpoch {
933 fn default() -> Self {
934 Self {
935 leader_id: (-1).into(),
936 leader_epoch: -1,
937 unknown_tagged_fields: BTreeMap::new(),
938 }
939 }
940}
941
942impl Message for LeaderIdAndEpoch {
943 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
944 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
945}
946
947#[non_exhaustive]
949#[derive(Debug, Clone, PartialEq)]
950pub struct NodeEndpoint {
951 pub node_id: super::BrokerId,
955
956 pub host: StrBytes,
960
961 pub port: i32,
965
966 pub rack: Option<StrBytes>,
970
971 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
973}
974
975impl NodeEndpoint {
976 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
982 self.node_id = value;
983 self
984 }
985 pub fn with_host(mut self, value: StrBytes) -> Self {
991 self.host = value;
992 self
993 }
994 pub fn with_port(mut self, value: i32) -> Self {
1000 self.port = value;
1001 self
1002 }
1003 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
1009 self.rack = value;
1010 self
1011 }
1012 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1014 self.unknown_tagged_fields = value;
1015 self
1016 }
1017 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1019 self.unknown_tagged_fields.insert(key, value);
1020 self
1021 }
1022}
1023
1024#[cfg(feature = "broker")]
1025impl Encodable for NodeEndpoint {
1026 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1027 if version < 4 || version > 18 {
1028 bail!("specified version not supported by this message type");
1029 }
1030 if version >= 16 {
1031 types::Int32.encode(buf, &self.node_id)?;
1032 } else {
1033 if self.node_id != 0 {
1034 bail!("A field is set that is not available on the selected protocol version");
1035 }
1036 }
1037 if version >= 16 {
1038 types::CompactString.encode(buf, &self.host)?;
1039 } else {
1040 if !self.host.is_empty() {
1041 bail!("A field is set that is not available on the selected protocol version");
1042 }
1043 }
1044 if version >= 16 {
1045 types::Int32.encode(buf, &self.port)?;
1046 } else {
1047 if self.port != 0 {
1048 bail!("A field is set that is not available on the selected protocol version");
1049 }
1050 }
1051 if version >= 16 {
1052 types::CompactString.encode(buf, &self.rack)?;
1053 } else {
1054 if !self.rack.is_none() {
1055 bail!("A field is set that is not available on the selected protocol version");
1056 }
1057 }
1058 if version >= 12 {
1059 let num_tagged_fields = self.unknown_tagged_fields.len();
1060 if num_tagged_fields > std::u32::MAX as usize {
1061 bail!(
1062 "Too many tagged fields to encode ({} fields)",
1063 num_tagged_fields
1064 );
1065 }
1066 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1067
1068 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1069 }
1070 Ok(())
1071 }
1072 fn compute_size(&self, version: i16) -> Result<usize> {
1073 let mut total_size = 0;
1074 if version >= 16 {
1075 total_size += types::Int32.compute_size(&self.node_id)?;
1076 } else {
1077 if self.node_id != 0 {
1078 bail!("A field is set that is not available on the selected protocol version");
1079 }
1080 }
1081 if version >= 16 {
1082 total_size += types::CompactString.compute_size(&self.host)?;
1083 } else {
1084 if !self.host.is_empty() {
1085 bail!("A field is set that is not available on the selected protocol version");
1086 }
1087 }
1088 if version >= 16 {
1089 total_size += types::Int32.compute_size(&self.port)?;
1090 } else {
1091 if self.port != 0 {
1092 bail!("A field is set that is not available on the selected protocol version");
1093 }
1094 }
1095 if version >= 16 {
1096 total_size += types::CompactString.compute_size(&self.rack)?;
1097 } else {
1098 if !self.rack.is_none() {
1099 bail!("A field is set that is not available on the selected protocol version");
1100 }
1101 }
1102 if version >= 12 {
1103 let num_tagged_fields = self.unknown_tagged_fields.len();
1104 if num_tagged_fields > std::u32::MAX as usize {
1105 bail!(
1106 "Too many tagged fields to encode ({} fields)",
1107 num_tagged_fields
1108 );
1109 }
1110 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1111
1112 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1113 }
1114 Ok(total_size)
1115 }
1116}
1117
1118#[cfg(feature = "client")]
1119impl Decodable for NodeEndpoint {
1120 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1121 if version < 4 || version > 18 {
1122 bail!("specified version not supported by this message type");
1123 }
1124 let node_id = if version >= 16 {
1125 types::Int32.decode(buf)?
1126 } else {
1127 (0).into()
1128 };
1129 let host = if version >= 16 {
1130 types::CompactString.decode(buf)?
1131 } else {
1132 Default::default()
1133 };
1134 let port = if version >= 16 {
1135 types::Int32.decode(buf)?
1136 } else {
1137 0
1138 };
1139 let rack = if version >= 16 {
1140 types::CompactString.decode(buf)?
1141 } else {
1142 None
1143 };
1144 let mut unknown_tagged_fields = BTreeMap::new();
1145 if version >= 12 {
1146 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1147 for _ in 0..num_tagged_fields {
1148 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1149 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1150 let unknown_value = buf.try_get_bytes(size as usize)?;
1151 unknown_tagged_fields.insert(tag as i32, unknown_value);
1152 }
1153 }
1154 Ok(Self {
1155 node_id,
1156 host,
1157 port,
1158 rack,
1159 unknown_tagged_fields,
1160 })
1161 }
1162}
1163
1164impl Default for NodeEndpoint {
1165 fn default() -> Self {
1166 Self {
1167 node_id: (0).into(),
1168 host: Default::default(),
1169 port: 0,
1170 rack: None,
1171 unknown_tagged_fields: BTreeMap::new(),
1172 }
1173 }
1174}
1175
1176impl Message for NodeEndpoint {
1177 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
1178 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1179}
1180
1181#[non_exhaustive]
1183#[derive(Debug, Clone, PartialEq)]
1184pub struct PartitionData {
1185 pub partition_index: i32,
1189
1190 pub error_code: i16,
1194
1195 pub high_watermark: i64,
1199
1200 pub last_stable_offset: i64,
1204
1205 pub log_start_offset: i64,
1209
1210 pub diverging_epoch: EpochEndOffset,
1214
1215 pub current_leader: LeaderIdAndEpoch,
1219
1220 pub snapshot_id: SnapshotId,
1224
1225 pub aborted_transactions: Option<Vec<AbortedTransaction>>,
1229
1230 pub preferred_read_replica: super::BrokerId,
1234
1235 pub records: Option<Bytes>,
1239
1240 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1242}
1243
1244impl PartitionData {
1245 pub fn with_partition_index(mut self, value: i32) -> Self {
1251 self.partition_index = value;
1252 self
1253 }
1254 pub fn with_error_code(mut self, value: i16) -> Self {
1260 self.error_code = value;
1261 self
1262 }
1263 pub fn with_high_watermark(mut self, value: i64) -> Self {
1269 self.high_watermark = value;
1270 self
1271 }
1272 pub fn with_last_stable_offset(mut self, value: i64) -> Self {
1278 self.last_stable_offset = value;
1279 self
1280 }
1281 pub fn with_log_start_offset(mut self, value: i64) -> Self {
1287 self.log_start_offset = value;
1288 self
1289 }
1290 pub fn with_diverging_epoch(mut self, value: EpochEndOffset) -> Self {
1296 self.diverging_epoch = value;
1297 self
1298 }
1299 pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
1305 self.current_leader = value;
1306 self
1307 }
1308 pub fn with_snapshot_id(mut self, value: SnapshotId) -> Self {
1314 self.snapshot_id = value;
1315 self
1316 }
1317 pub fn with_aborted_transactions(mut self, value: Option<Vec<AbortedTransaction>>) -> Self {
1323 self.aborted_transactions = value;
1324 self
1325 }
1326 pub fn with_preferred_read_replica(mut self, value: super::BrokerId) -> Self {
1332 self.preferred_read_replica = value;
1333 self
1334 }
1335 pub fn with_records(mut self, value: Option<Bytes>) -> Self {
1341 self.records = value;
1342 self
1343 }
1344 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1346 self.unknown_tagged_fields = value;
1347 self
1348 }
1349 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1351 self.unknown_tagged_fields.insert(key, value);
1352 self
1353 }
1354}
1355
1356#[cfg(feature = "broker")]
1357impl Encodable for PartitionData {
1358 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1359 if version < 4 || version > 18 {
1360 bail!("specified version not supported by this message type");
1361 }
1362 types::Int32.encode(buf, &self.partition_index)?;
1363 types::Int16.encode(buf, &self.error_code)?;
1364 types::Int64.encode(buf, &self.high_watermark)?;
1365 types::Int64.encode(buf, &self.last_stable_offset)?;
1366 if version >= 5 {
1367 types::Int64.encode(buf, &self.log_start_offset)?;
1368 }
1369 if version >= 12 {
1370 types::CompactArray(types::Struct { version })
1371 .encode(buf, &self.aborted_transactions)?;
1372 } else {
1373 types::Array(types::Struct { version }).encode(buf, &self.aborted_transactions)?;
1374 }
1375 if version >= 11 {
1376 types::Int32.encode(buf, &self.preferred_read_replica)?;
1377 } else {
1378 if self.preferred_read_replica != -1 {
1379 bail!("A field is set that is not available on the selected protocol version");
1380 }
1381 }
1382 if version >= 12 {
1383 types::CompactBytes.encode(buf, &self.records)?;
1384 } else {
1385 types::Bytes.encode(buf, &self.records)?;
1386 }
1387 if version >= 12 {
1388 let mut num_tagged_fields = self.unknown_tagged_fields.len();
1389 if &self.diverging_epoch != &Default::default() {
1390 num_tagged_fields += 1;
1391 }
1392 if &self.current_leader != &Default::default() {
1393 num_tagged_fields += 1;
1394 }
1395 if &self.snapshot_id != &Default::default() {
1396 num_tagged_fields += 1;
1397 }
1398 if num_tagged_fields > std::u32::MAX as usize {
1399 bail!(
1400 "Too many tagged fields to encode ({} fields)",
1401 num_tagged_fields
1402 );
1403 }
1404 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1405 if &self.diverging_epoch != &Default::default() {
1406 let computed_size =
1407 types::Struct { version }.compute_size(&self.diverging_epoch)?;
1408 if computed_size > std::u32::MAX as usize {
1409 bail!(
1410 "Tagged field is too large to encode ({} bytes)",
1411 computed_size
1412 );
1413 }
1414 types::UnsignedVarInt.encode(buf, 0)?;
1415 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
1416 types::Struct { version }.encode(buf, &self.diverging_epoch)?;
1417 }
1418 if &self.current_leader != &Default::default() {
1419 let computed_size = types::Struct { version }.compute_size(&self.current_leader)?;
1420 if computed_size > std::u32::MAX as usize {
1421 bail!(
1422 "Tagged field is too large to encode ({} bytes)",
1423 computed_size
1424 );
1425 }
1426 types::UnsignedVarInt.encode(buf, 1)?;
1427 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
1428 types::Struct { version }.encode(buf, &self.current_leader)?;
1429 }
1430 if &self.snapshot_id != &Default::default() {
1431 let computed_size = types::Struct { version }.compute_size(&self.snapshot_id)?;
1432 if computed_size > std::u32::MAX as usize {
1433 bail!(
1434 "Tagged field is too large to encode ({} bytes)",
1435 computed_size
1436 );
1437 }
1438 types::UnsignedVarInt.encode(buf, 2)?;
1439 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
1440 types::Struct { version }.encode(buf, &self.snapshot_id)?;
1441 }
1442
1443 write_unknown_tagged_fields(buf, 3.., &self.unknown_tagged_fields)?;
1444 }
1445 Ok(())
1446 }
1447 fn compute_size(&self, version: i16) -> Result<usize> {
1448 let mut total_size = 0;
1449 total_size += types::Int32.compute_size(&self.partition_index)?;
1450 total_size += types::Int16.compute_size(&self.error_code)?;
1451 total_size += types::Int64.compute_size(&self.high_watermark)?;
1452 total_size += types::Int64.compute_size(&self.last_stable_offset)?;
1453 if version >= 5 {
1454 total_size += types::Int64.compute_size(&self.log_start_offset)?;
1455 }
1456 if version >= 12 {
1457 total_size += types::CompactArray(types::Struct { version })
1458 .compute_size(&self.aborted_transactions)?;
1459 } else {
1460 total_size +=
1461 types::Array(types::Struct { version }).compute_size(&self.aborted_transactions)?;
1462 }
1463 if version >= 11 {
1464 total_size += types::Int32.compute_size(&self.preferred_read_replica)?;
1465 } else {
1466 if self.preferred_read_replica != -1 {
1467 bail!("A field is set that is not available on the selected protocol version");
1468 }
1469 }
1470 if version >= 12 {
1471 total_size += types::CompactBytes.compute_size(&self.records)?;
1472 } else {
1473 total_size += types::Bytes.compute_size(&self.records)?;
1474 }
1475 if version >= 12 {
1476 let mut num_tagged_fields = self.unknown_tagged_fields.len();
1477 if &self.diverging_epoch != &Default::default() {
1478 num_tagged_fields += 1;
1479 }
1480 if &self.current_leader != &Default::default() {
1481 num_tagged_fields += 1;
1482 }
1483 if &self.snapshot_id != &Default::default() {
1484 num_tagged_fields += 1;
1485 }
1486 if num_tagged_fields > std::u32::MAX as usize {
1487 bail!(
1488 "Too many tagged fields to encode ({} fields)",
1489 num_tagged_fields
1490 );
1491 }
1492 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1493 if &self.diverging_epoch != &Default::default() {
1494 let computed_size =
1495 types::Struct { version }.compute_size(&self.diverging_epoch)?;
1496 if computed_size > std::u32::MAX as usize {
1497 bail!(
1498 "Tagged field is too large to encode ({} bytes)",
1499 computed_size
1500 );
1501 }
1502 total_size += types::UnsignedVarInt.compute_size(0)?;
1503 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
1504 total_size += computed_size;
1505 }
1506 if &self.current_leader != &Default::default() {
1507 let computed_size = types::Struct { version }.compute_size(&self.current_leader)?;
1508 if computed_size > std::u32::MAX as usize {
1509 bail!(
1510 "Tagged field is too large to encode ({} bytes)",
1511 computed_size
1512 );
1513 }
1514 total_size += types::UnsignedVarInt.compute_size(1)?;
1515 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
1516 total_size += computed_size;
1517 }
1518 if &self.snapshot_id != &Default::default() {
1519 let computed_size = types::Struct { version }.compute_size(&self.snapshot_id)?;
1520 if computed_size > std::u32::MAX as usize {
1521 bail!(
1522 "Tagged field is too large to encode ({} bytes)",
1523 computed_size
1524 );
1525 }
1526 total_size += types::UnsignedVarInt.compute_size(2)?;
1527 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
1528 total_size += computed_size;
1529 }
1530
1531 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1532 }
1533 Ok(total_size)
1534 }
1535}
1536
1537#[cfg(feature = "client")]
1538impl Decodable for PartitionData {
1539 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1540 if version < 4 || version > 18 {
1541 bail!("specified version not supported by this message type");
1542 }
1543 let partition_index = types::Int32.decode(buf)?;
1544 let error_code = types::Int16.decode(buf)?;
1545 let high_watermark = types::Int64.decode(buf)?;
1546 let last_stable_offset = types::Int64.decode(buf)?;
1547 let log_start_offset = if version >= 5 {
1548 types::Int64.decode(buf)?
1549 } else {
1550 -1
1551 };
1552 let mut diverging_epoch = Default::default();
1553 let mut current_leader = Default::default();
1554 let mut snapshot_id = Default::default();
1555 let aborted_transactions = if version >= 12 {
1556 types::CompactArray(types::Struct { version }).decode(buf)?
1557 } else {
1558 types::Array(types::Struct { version }).decode(buf)?
1559 };
1560 let preferred_read_replica = if version >= 11 {
1561 types::Int32.decode(buf)?
1562 } else {
1563 (-1).into()
1564 };
1565 let records = if version >= 12 {
1566 types::CompactBytes.decode(buf)?
1567 } else {
1568 types::Bytes.decode(buf)?
1569 };
1570 let mut unknown_tagged_fields = BTreeMap::new();
1571 if version >= 12 {
1572 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1573 for _ in 0..num_tagged_fields {
1574 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1575 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1576 match tag {
1577 0 => {
1578 diverging_epoch = types::Struct { version }.decode(buf)?;
1579 }
1580 1 => {
1581 current_leader = types::Struct { version }.decode(buf)?;
1582 }
1583 2 => {
1584 snapshot_id = types::Struct { version }.decode(buf)?;
1585 }
1586 _ => {
1587 let unknown_value = buf.try_get_bytes(size as usize)?;
1588 unknown_tagged_fields.insert(tag as i32, unknown_value);
1589 }
1590 }
1591 }
1592 }
1593 Ok(Self {
1594 partition_index,
1595 error_code,
1596 high_watermark,
1597 last_stable_offset,
1598 log_start_offset,
1599 diverging_epoch,
1600 current_leader,
1601 snapshot_id,
1602 aborted_transactions,
1603 preferred_read_replica,
1604 records,
1605 unknown_tagged_fields,
1606 })
1607 }
1608}
1609
1610impl Default for PartitionData {
1611 fn default() -> Self {
1612 Self {
1613 partition_index: 0,
1614 error_code: 0,
1615 high_watermark: 0,
1616 last_stable_offset: -1,
1617 log_start_offset: -1,
1618 diverging_epoch: Default::default(),
1619 current_leader: Default::default(),
1620 snapshot_id: Default::default(),
1621 aborted_transactions: Some(Default::default()),
1622 preferred_read_replica: (-1).into(),
1623 records: Some(Default::default()),
1624 unknown_tagged_fields: BTreeMap::new(),
1625 }
1626 }
1627}
1628
1629impl Message for PartitionData {
1630 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
1631 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1632}
1633
1634#[non_exhaustive]
1636#[derive(Debug, Clone, PartialEq)]
1637pub struct SnapshotId {
1638 pub end_offset: i64,
1642
1643 pub epoch: i32,
1647
1648 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1650}
1651
1652impl SnapshotId {
1653 pub fn with_end_offset(mut self, value: i64) -> Self {
1659 self.end_offset = value;
1660 self
1661 }
1662 pub fn with_epoch(mut self, value: i32) -> Self {
1668 self.epoch = value;
1669 self
1670 }
1671 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1673 self.unknown_tagged_fields = value;
1674 self
1675 }
1676 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1678 self.unknown_tagged_fields.insert(key, value);
1679 self
1680 }
1681}
1682
1683#[cfg(feature = "broker")]
1684impl Encodable for SnapshotId {
1685 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1686 if version < 4 || version > 18 {
1687 bail!("specified version not supported by this message type");
1688 }
1689 types::Int64.encode(buf, &self.end_offset)?;
1690 types::Int32.encode(buf, &self.epoch)?;
1691 if version >= 12 {
1692 let num_tagged_fields = self.unknown_tagged_fields.len();
1693 if num_tagged_fields > std::u32::MAX as usize {
1694 bail!(
1695 "Too many tagged fields to encode ({} fields)",
1696 num_tagged_fields
1697 );
1698 }
1699 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1700
1701 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1702 }
1703 Ok(())
1704 }
1705 fn compute_size(&self, version: i16) -> Result<usize> {
1706 let mut total_size = 0;
1707 total_size += types::Int64.compute_size(&self.end_offset)?;
1708 total_size += types::Int32.compute_size(&self.epoch)?;
1709 if version >= 12 {
1710 let num_tagged_fields = self.unknown_tagged_fields.len();
1711 if num_tagged_fields > std::u32::MAX as usize {
1712 bail!(
1713 "Too many tagged fields to encode ({} fields)",
1714 num_tagged_fields
1715 );
1716 }
1717 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1718
1719 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1720 }
1721 Ok(total_size)
1722 }
1723}
1724
1725#[cfg(feature = "client")]
1726impl Decodable for SnapshotId {
1727 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1728 if version < 4 || version > 18 {
1729 bail!("specified version not supported by this message type");
1730 }
1731 let end_offset = types::Int64.decode(buf)?;
1732 let epoch = types::Int32.decode(buf)?;
1733 let mut unknown_tagged_fields = BTreeMap::new();
1734 if version >= 12 {
1735 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1736 for _ in 0..num_tagged_fields {
1737 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1738 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1739 let unknown_value = buf.try_get_bytes(size as usize)?;
1740 unknown_tagged_fields.insert(tag as i32, unknown_value);
1741 }
1742 }
1743 Ok(Self {
1744 end_offset,
1745 epoch,
1746 unknown_tagged_fields,
1747 })
1748 }
1749}
1750
1751impl Default for SnapshotId {
1752 fn default() -> Self {
1753 Self {
1754 end_offset: -1,
1755 epoch: -1,
1756 unknown_tagged_fields: BTreeMap::new(),
1757 }
1758 }
1759}
1760
1761impl Message for SnapshotId {
1762 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
1763 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1764}
1765
1766impl HeaderVersion for FetchResponse {
1767 fn header_version(version: i16) -> i16 {
1768 if version >= 12 {
1769 1
1770 } else {
1771 0
1772 }
1773 }
1774}