1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FetchPartition {
24 pub partition: i32,
28
29 pub current_leader_epoch: i32,
33
34 pub fetch_offset: i64,
38
39 pub last_fetched_epoch: i32,
43
44 pub log_start_offset: i64,
48
49 pub partition_max_bytes: i32,
53
54 pub replica_directory_id: Uuid,
58
59 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
61}
62
63impl FetchPartition {
64 pub fn with_partition(mut self, value: i32) -> Self {
70 self.partition = value;
71 self
72 }
73 pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
79 self.current_leader_epoch = value;
80 self
81 }
82 pub fn with_fetch_offset(mut self, value: i64) -> Self {
88 self.fetch_offset = value;
89 self
90 }
91 pub fn with_last_fetched_epoch(mut self, value: i32) -> Self {
97 self.last_fetched_epoch = value;
98 self
99 }
100 pub fn with_log_start_offset(mut self, value: i64) -> Self {
106 self.log_start_offset = value;
107 self
108 }
109 pub fn with_partition_max_bytes(mut self, value: i32) -> Self {
115 self.partition_max_bytes = value;
116 self
117 }
118 pub fn with_replica_directory_id(mut self, value: Uuid) -> Self {
124 self.replica_directory_id = value;
125 self
126 }
127 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
129 self.unknown_tagged_fields = value;
130 self
131 }
132 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
134 self.unknown_tagged_fields.insert(key, value);
135 self
136 }
137}
138
139#[cfg(feature = "client")]
140impl Encodable for FetchPartition {
141 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
142 if version < 0 || version > 17 {
143 bail!("specified version not supported by this message type");
144 }
145 types::Int32.encode(buf, &self.partition)?;
146 if version >= 9 {
147 types::Int32.encode(buf, &self.current_leader_epoch)?;
148 }
149 types::Int64.encode(buf, &self.fetch_offset)?;
150 if version >= 12 {
151 types::Int32.encode(buf, &self.last_fetched_epoch)?;
152 } else {
153 if self.last_fetched_epoch != -1 {
154 bail!("A field is set that is not available on the selected protocol version");
155 }
156 }
157 if version >= 5 {
158 types::Int64.encode(buf, &self.log_start_offset)?;
159 }
160 types::Int32.encode(buf, &self.partition_max_bytes)?;
161 if version >= 12 {
162 let mut num_tagged_fields = self.unknown_tagged_fields.len();
163 if version >= 17 {
164 if &self.replica_directory_id != &Uuid::nil() {
165 num_tagged_fields += 1;
166 }
167 }
168 if num_tagged_fields > std::u32::MAX as usize {
169 bail!(
170 "Too many tagged fields to encode ({} fields)",
171 num_tagged_fields
172 );
173 }
174 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
175 if version >= 17 {
176 if &self.replica_directory_id != &Uuid::nil() {
177 let computed_size = types::Uuid.compute_size(&self.replica_directory_id)?;
178 if computed_size > std::u32::MAX as usize {
179 bail!(
180 "Tagged field is too large to encode ({} bytes)",
181 computed_size
182 );
183 }
184 types::UnsignedVarInt.encode(buf, 0)?;
185 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
186 types::Uuid.encode(buf, &self.replica_directory_id)?;
187 }
188 }
189 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
190 }
191 Ok(())
192 }
193 fn compute_size(&self, version: i16) -> Result<usize> {
194 let mut total_size = 0;
195 total_size += types::Int32.compute_size(&self.partition)?;
196 if version >= 9 {
197 total_size += types::Int32.compute_size(&self.current_leader_epoch)?;
198 }
199 total_size += types::Int64.compute_size(&self.fetch_offset)?;
200 if version >= 12 {
201 total_size += types::Int32.compute_size(&self.last_fetched_epoch)?;
202 } else {
203 if self.last_fetched_epoch != -1 {
204 bail!("A field is set that is not available on the selected protocol version");
205 }
206 }
207 if version >= 5 {
208 total_size += types::Int64.compute_size(&self.log_start_offset)?;
209 }
210 total_size += types::Int32.compute_size(&self.partition_max_bytes)?;
211 if version >= 12 {
212 let mut num_tagged_fields = self.unknown_tagged_fields.len();
213 if version >= 17 {
214 if &self.replica_directory_id != &Uuid::nil() {
215 num_tagged_fields += 1;
216 }
217 }
218 if num_tagged_fields > std::u32::MAX as usize {
219 bail!(
220 "Too many tagged fields to encode ({} fields)",
221 num_tagged_fields
222 );
223 }
224 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
225 if version >= 17 {
226 if &self.replica_directory_id != &Uuid::nil() {
227 let computed_size = types::Uuid.compute_size(&self.replica_directory_id)?;
228 if computed_size > std::u32::MAX as usize {
229 bail!(
230 "Tagged field is too large to encode ({} bytes)",
231 computed_size
232 );
233 }
234 total_size += types::UnsignedVarInt.compute_size(0)?;
235 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
236 total_size += computed_size;
237 }
238 }
239 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
240 }
241 Ok(total_size)
242 }
243}
244
245#[cfg(feature = "broker")]
246impl Decodable for FetchPartition {
247 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
248 if version < 0 || version > 17 {
249 bail!("specified version not supported by this message type");
250 }
251 let partition = types::Int32.decode(buf)?;
252 let current_leader_epoch = if version >= 9 {
253 types::Int32.decode(buf)?
254 } else {
255 -1
256 };
257 let fetch_offset = types::Int64.decode(buf)?;
258 let last_fetched_epoch = if version >= 12 {
259 types::Int32.decode(buf)?
260 } else {
261 -1
262 };
263 let log_start_offset = if version >= 5 {
264 types::Int64.decode(buf)?
265 } else {
266 -1
267 };
268 let partition_max_bytes = types::Int32.decode(buf)?;
269 let mut replica_directory_id = Uuid::nil();
270 let mut unknown_tagged_fields = BTreeMap::new();
271 if version >= 12 {
272 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
273 for _ in 0..num_tagged_fields {
274 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
275 let size: u32 = types::UnsignedVarInt.decode(buf)?;
276 match tag {
277 0 => {
278 if version >= 17 {
279 replica_directory_id = types::Uuid.decode(buf)?;
280 } else {
281 bail!("Tag {} is not valid for version {}", tag, version);
282 }
283 }
284 _ => {
285 let unknown_value = buf.try_get_bytes(size as usize)?;
286 unknown_tagged_fields.insert(tag as i32, unknown_value);
287 }
288 }
289 }
290 }
291 Ok(Self {
292 partition,
293 current_leader_epoch,
294 fetch_offset,
295 last_fetched_epoch,
296 log_start_offset,
297 partition_max_bytes,
298 replica_directory_id,
299 unknown_tagged_fields,
300 })
301 }
302}
303
304impl Default for FetchPartition {
305 fn default() -> Self {
306 Self {
307 partition: 0,
308 current_leader_epoch: -1,
309 fetch_offset: 0,
310 last_fetched_epoch: -1,
311 log_start_offset: -1,
312 partition_max_bytes: 0,
313 replica_directory_id: Uuid::nil(),
314 unknown_tagged_fields: BTreeMap::new(),
315 }
316 }
317}
318
319impl Message for FetchPartition {
320 const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
321 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
322}
323
324#[non_exhaustive]
326#[derive(Debug, Clone, PartialEq)]
327pub struct FetchRequest {
328 pub cluster_id: Option<StrBytes>,
332
333 pub replica_id: super::BrokerId,
337
338 pub replica_state: ReplicaState,
342
343 pub max_wait_ms: i32,
347
348 pub min_bytes: i32,
352
353 pub max_bytes: i32,
357
358 pub isolation_level: i8,
362
363 pub session_id: i32,
367
368 pub session_epoch: i32,
372
373 pub topics: Vec<FetchTopic>,
377
378 pub forgotten_topics_data: Vec<ForgottenTopic>,
382
383 pub rack_id: StrBytes,
387
388 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
390}
391
392impl FetchRequest {
393 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
399 self.cluster_id = value;
400 self
401 }
402 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
408 self.replica_id = value;
409 self
410 }
411 pub fn with_replica_state(mut self, value: ReplicaState) -> Self {
417 self.replica_state = value;
418 self
419 }
420 pub fn with_max_wait_ms(mut self, value: i32) -> Self {
426 self.max_wait_ms = value;
427 self
428 }
429 pub fn with_min_bytes(mut self, value: i32) -> Self {
435 self.min_bytes = value;
436 self
437 }
438 pub fn with_max_bytes(mut self, value: i32) -> Self {
444 self.max_bytes = value;
445 self
446 }
447 pub fn with_isolation_level(mut self, value: i8) -> Self {
453 self.isolation_level = value;
454 self
455 }
456 pub fn with_session_id(mut self, value: i32) -> Self {
462 self.session_id = value;
463 self
464 }
465 pub fn with_session_epoch(mut self, value: i32) -> Self {
471 self.session_epoch = value;
472 self
473 }
474 pub fn with_topics(mut self, value: Vec<FetchTopic>) -> Self {
480 self.topics = value;
481 self
482 }
483 pub fn with_forgotten_topics_data(mut self, value: Vec<ForgottenTopic>) -> Self {
489 self.forgotten_topics_data = value;
490 self
491 }
492 pub fn with_rack_id(mut self, value: StrBytes) -> Self {
498 self.rack_id = value;
499 self
500 }
501 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
503 self.unknown_tagged_fields = value;
504 self
505 }
506 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
508 self.unknown_tagged_fields.insert(key, value);
509 self
510 }
511}
512
513#[cfg(feature = "client")]
514impl Encodable for FetchRequest {
515 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
516 if version < 0 || version > 17 {
517 bail!("specified version not supported by this message type");
518 }
519 if version <= 14 {
520 types::Int32.encode(buf, &self.replica_id)?;
521 } else {
522 if self.replica_id != -1 {
523 bail!("A field is set that is not available on the selected protocol version");
524 }
525 }
526 types::Int32.encode(buf, &self.max_wait_ms)?;
527 types::Int32.encode(buf, &self.min_bytes)?;
528 if version >= 3 {
529 types::Int32.encode(buf, &self.max_bytes)?;
530 }
531 if version >= 4 {
532 types::Int8.encode(buf, &self.isolation_level)?;
533 }
534 if version >= 7 {
535 types::Int32.encode(buf, &self.session_id)?;
536 }
537 if version >= 7 {
538 types::Int32.encode(buf, &self.session_epoch)?;
539 }
540 if version >= 12 {
541 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
542 } else {
543 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
544 }
545 if version >= 7 {
546 if version >= 12 {
547 types::CompactArray(types::Struct { version })
548 .encode(buf, &self.forgotten_topics_data)?;
549 } else {
550 types::Array(types::Struct { version }).encode(buf, &self.forgotten_topics_data)?;
551 }
552 } else {
553 if !self.forgotten_topics_data.is_empty() {
554 bail!("A field is set that is not available on the selected protocol version");
555 }
556 }
557 if version >= 11 {
558 if version >= 12 {
559 types::CompactString.encode(buf, &self.rack_id)?;
560 } else {
561 types::String.encode(buf, &self.rack_id)?;
562 }
563 }
564 if version >= 12 {
565 let mut num_tagged_fields = self.unknown_tagged_fields.len();
566 if !self.cluster_id.is_none() {
567 num_tagged_fields += 1;
568 }
569 if version >= 15 {
570 if &self.replica_state != &Default::default() {
571 num_tagged_fields += 1;
572 }
573 }
574 if num_tagged_fields > std::u32::MAX as usize {
575 bail!(
576 "Too many tagged fields to encode ({} fields)",
577 num_tagged_fields
578 );
579 }
580 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
581 if !self.cluster_id.is_none() {
582 let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
583 if computed_size > std::u32::MAX as usize {
584 bail!(
585 "Tagged field is too large to encode ({} bytes)",
586 computed_size
587 );
588 }
589 types::UnsignedVarInt.encode(buf, 0)?;
590 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
591 types::CompactString.encode(buf, &self.cluster_id)?;
592 }
593 if version >= 15 {
594 if &self.replica_state != &Default::default() {
595 let computed_size =
596 types::Struct { version }.compute_size(&self.replica_state)?;
597 if computed_size > std::u32::MAX as usize {
598 bail!(
599 "Tagged field is too large to encode ({} bytes)",
600 computed_size
601 );
602 }
603 types::UnsignedVarInt.encode(buf, 1)?;
604 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
605 types::Struct { version }.encode(buf, &self.replica_state)?;
606 }
607 }
608 write_unknown_tagged_fields(buf, 2.., &self.unknown_tagged_fields)?;
609 }
610 Ok(())
611 }
612 fn compute_size(&self, version: i16) -> Result<usize> {
613 let mut total_size = 0;
614 if version <= 14 {
615 total_size += types::Int32.compute_size(&self.replica_id)?;
616 } else {
617 if self.replica_id != -1 {
618 bail!("A field is set that is not available on the selected protocol version");
619 }
620 }
621 total_size += types::Int32.compute_size(&self.max_wait_ms)?;
622 total_size += types::Int32.compute_size(&self.min_bytes)?;
623 if version >= 3 {
624 total_size += types::Int32.compute_size(&self.max_bytes)?;
625 }
626 if version >= 4 {
627 total_size += types::Int8.compute_size(&self.isolation_level)?;
628 }
629 if version >= 7 {
630 total_size += types::Int32.compute_size(&self.session_id)?;
631 }
632 if version >= 7 {
633 total_size += types::Int32.compute_size(&self.session_epoch)?;
634 }
635 if version >= 12 {
636 total_size +=
637 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
638 } else {
639 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
640 }
641 if version >= 7 {
642 if version >= 12 {
643 total_size += types::CompactArray(types::Struct { version })
644 .compute_size(&self.forgotten_topics_data)?;
645 } else {
646 total_size += types::Array(types::Struct { version })
647 .compute_size(&self.forgotten_topics_data)?;
648 }
649 } else {
650 if !self.forgotten_topics_data.is_empty() {
651 bail!("A field is set that is not available on the selected protocol version");
652 }
653 }
654 if version >= 11 {
655 if version >= 12 {
656 total_size += types::CompactString.compute_size(&self.rack_id)?;
657 } else {
658 total_size += types::String.compute_size(&self.rack_id)?;
659 }
660 }
661 if version >= 12 {
662 let mut num_tagged_fields = self.unknown_tagged_fields.len();
663 if !self.cluster_id.is_none() {
664 num_tagged_fields += 1;
665 }
666 if version >= 15 {
667 if &self.replica_state != &Default::default() {
668 num_tagged_fields += 1;
669 }
670 }
671 if num_tagged_fields > std::u32::MAX as usize {
672 bail!(
673 "Too many tagged fields to encode ({} fields)",
674 num_tagged_fields
675 );
676 }
677 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
678 if !self.cluster_id.is_none() {
679 let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
680 if computed_size > std::u32::MAX as usize {
681 bail!(
682 "Tagged field is too large to encode ({} bytes)",
683 computed_size
684 );
685 }
686 total_size += types::UnsignedVarInt.compute_size(0)?;
687 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
688 total_size += computed_size;
689 }
690 if version >= 15 {
691 if &self.replica_state != &Default::default() {
692 let computed_size =
693 types::Struct { version }.compute_size(&self.replica_state)?;
694 if computed_size > std::u32::MAX as usize {
695 bail!(
696 "Tagged field is too large to encode ({} bytes)",
697 computed_size
698 );
699 }
700 total_size += types::UnsignedVarInt.compute_size(1)?;
701 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
702 total_size += computed_size;
703 }
704 }
705 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
706 }
707 Ok(total_size)
708 }
709}
710
711#[cfg(feature = "broker")]
712impl Decodable for FetchRequest {
713 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
714 if version < 0 || version > 17 {
715 bail!("specified version not supported by this message type");
716 }
717 let mut cluster_id = None;
718 let replica_id = if version <= 14 {
719 types::Int32.decode(buf)?
720 } else {
721 (-1).into()
722 };
723 let mut replica_state = Default::default();
724 let max_wait_ms = types::Int32.decode(buf)?;
725 let min_bytes = types::Int32.decode(buf)?;
726 let max_bytes = if version >= 3 {
727 types::Int32.decode(buf)?
728 } else {
729 0x7fffffff
730 };
731 let isolation_level = if version >= 4 {
732 types::Int8.decode(buf)?
733 } else {
734 0
735 };
736 let session_id = if version >= 7 {
737 types::Int32.decode(buf)?
738 } else {
739 0
740 };
741 let session_epoch = if version >= 7 {
742 types::Int32.decode(buf)?
743 } else {
744 -1
745 };
746 let topics = if version >= 12 {
747 types::CompactArray(types::Struct { version }).decode(buf)?
748 } else {
749 types::Array(types::Struct { version }).decode(buf)?
750 };
751 let forgotten_topics_data = if version >= 7 {
752 if version >= 12 {
753 types::CompactArray(types::Struct { version }).decode(buf)?
754 } else {
755 types::Array(types::Struct { version }).decode(buf)?
756 }
757 } else {
758 Default::default()
759 };
760 let rack_id = if version >= 11 {
761 if version >= 12 {
762 types::CompactString.decode(buf)?
763 } else {
764 types::String.decode(buf)?
765 }
766 } else {
767 StrBytes::from_static_str("")
768 };
769 let mut unknown_tagged_fields = BTreeMap::new();
770 if version >= 12 {
771 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
772 for _ in 0..num_tagged_fields {
773 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
774 let size: u32 = types::UnsignedVarInt.decode(buf)?;
775 match tag {
776 0 => {
777 cluster_id = types::CompactString.decode(buf)?;
778 }
779 1 => {
780 if version >= 15 {
781 replica_state = types::Struct { version }.decode(buf)?;
782 } else {
783 bail!("Tag {} is not valid for version {}", tag, version);
784 }
785 }
786 _ => {
787 let unknown_value = buf.try_get_bytes(size as usize)?;
788 unknown_tagged_fields.insert(tag as i32, unknown_value);
789 }
790 }
791 }
792 }
793 Ok(Self {
794 cluster_id,
795 replica_id,
796 replica_state,
797 max_wait_ms,
798 min_bytes,
799 max_bytes,
800 isolation_level,
801 session_id,
802 session_epoch,
803 topics,
804 forgotten_topics_data,
805 rack_id,
806 unknown_tagged_fields,
807 })
808 }
809}
810
811impl Default for FetchRequest {
812 fn default() -> Self {
813 Self {
814 cluster_id: None,
815 replica_id: (-1).into(),
816 replica_state: Default::default(),
817 max_wait_ms: 0,
818 min_bytes: 0,
819 max_bytes: 0x7fffffff,
820 isolation_level: 0,
821 session_id: 0,
822 session_epoch: -1,
823 topics: Default::default(),
824 forgotten_topics_data: Default::default(),
825 rack_id: StrBytes::from_static_str(""),
826 unknown_tagged_fields: BTreeMap::new(),
827 }
828 }
829}
830
831impl Message for FetchRequest {
832 const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
833 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
834}
835
836#[non_exhaustive]
838#[derive(Debug, Clone, PartialEq)]
839pub struct FetchTopic {
840 pub topic: super::TopicName,
844
845 pub topic_id: Uuid,
849
850 pub partitions: Vec<FetchPartition>,
854
855 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
857}
858
859impl FetchTopic {
860 pub fn with_topic(mut self, value: super::TopicName) -> Self {
866 self.topic = value;
867 self
868 }
869 pub fn with_topic_id(mut self, value: Uuid) -> Self {
875 self.topic_id = value;
876 self
877 }
878 pub fn with_partitions(mut self, value: Vec<FetchPartition>) -> Self {
884 self.partitions = value;
885 self
886 }
887 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
889 self.unknown_tagged_fields = value;
890 self
891 }
892 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
894 self.unknown_tagged_fields.insert(key, value);
895 self
896 }
897}
898
899#[cfg(feature = "client")]
900impl Encodable for FetchTopic {
901 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
902 if version < 0 || version > 17 {
903 bail!("specified version not supported by this message type");
904 }
905 if version <= 12 {
906 if version >= 12 {
907 types::CompactString.encode(buf, &self.topic)?;
908 } else {
909 types::String.encode(buf, &self.topic)?;
910 }
911 }
912 if version >= 13 {
913 types::Uuid.encode(buf, &self.topic_id)?;
914 }
915 if version >= 12 {
916 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
917 } else {
918 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
919 }
920 if version >= 12 {
921 let num_tagged_fields = self.unknown_tagged_fields.len();
922 if num_tagged_fields > std::u32::MAX as usize {
923 bail!(
924 "Too many tagged fields to encode ({} fields)",
925 num_tagged_fields
926 );
927 }
928 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
929
930 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
931 }
932 Ok(())
933 }
934 fn compute_size(&self, version: i16) -> Result<usize> {
935 let mut total_size = 0;
936 if version <= 12 {
937 if version >= 12 {
938 total_size += types::CompactString.compute_size(&self.topic)?;
939 } else {
940 total_size += types::String.compute_size(&self.topic)?;
941 }
942 }
943 if version >= 13 {
944 total_size += types::Uuid.compute_size(&self.topic_id)?;
945 }
946 if version >= 12 {
947 total_size +=
948 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
949 } else {
950 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
951 }
952 if version >= 12 {
953 let num_tagged_fields = self.unknown_tagged_fields.len();
954 if num_tagged_fields > std::u32::MAX as usize {
955 bail!(
956 "Too many tagged fields to encode ({} fields)",
957 num_tagged_fields
958 );
959 }
960 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
961
962 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
963 }
964 Ok(total_size)
965 }
966}
967
968#[cfg(feature = "broker")]
969impl Decodable for FetchTopic {
970 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
971 if version < 0 || version > 17 {
972 bail!("specified version not supported by this message type");
973 }
974 let topic = if version <= 12 {
975 if version >= 12 {
976 types::CompactString.decode(buf)?
977 } else {
978 types::String.decode(buf)?
979 }
980 } else {
981 Default::default()
982 };
983 let topic_id = if version >= 13 {
984 types::Uuid.decode(buf)?
985 } else {
986 Uuid::nil()
987 };
988 let partitions = if version >= 12 {
989 types::CompactArray(types::Struct { version }).decode(buf)?
990 } else {
991 types::Array(types::Struct { version }).decode(buf)?
992 };
993 let mut unknown_tagged_fields = BTreeMap::new();
994 if version >= 12 {
995 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
996 for _ in 0..num_tagged_fields {
997 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
998 let size: u32 = types::UnsignedVarInt.decode(buf)?;
999 let unknown_value = buf.try_get_bytes(size as usize)?;
1000 unknown_tagged_fields.insert(tag as i32, unknown_value);
1001 }
1002 }
1003 Ok(Self {
1004 topic,
1005 topic_id,
1006 partitions,
1007 unknown_tagged_fields,
1008 })
1009 }
1010}
1011
1012impl Default for FetchTopic {
1013 fn default() -> Self {
1014 Self {
1015 topic: Default::default(),
1016 topic_id: Uuid::nil(),
1017 partitions: Default::default(),
1018 unknown_tagged_fields: BTreeMap::new(),
1019 }
1020 }
1021}
1022
1023impl Message for FetchTopic {
1024 const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
1025 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
1026}
1027
1028#[non_exhaustive]
1030#[derive(Debug, Clone, PartialEq)]
1031pub struct ForgottenTopic {
1032 pub topic: super::TopicName,
1036
1037 pub topic_id: Uuid,
1041
1042 pub partitions: Vec<i32>,
1046
1047 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1049}
1050
1051impl ForgottenTopic {
1052 pub fn with_topic(mut self, value: super::TopicName) -> Self {
1058 self.topic = value;
1059 self
1060 }
1061 pub fn with_topic_id(mut self, value: Uuid) -> Self {
1067 self.topic_id = value;
1068 self
1069 }
1070 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
1076 self.partitions = value;
1077 self
1078 }
1079 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1081 self.unknown_tagged_fields = value;
1082 self
1083 }
1084 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1086 self.unknown_tagged_fields.insert(key, value);
1087 self
1088 }
1089}
1090
1091#[cfg(feature = "client")]
1092impl Encodable for ForgottenTopic {
1093 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1094 if version < 0 || version > 17 {
1095 bail!("specified version not supported by this message type");
1096 }
1097 if version >= 7 && version <= 12 {
1098 if version >= 12 {
1099 types::CompactString.encode(buf, &self.topic)?;
1100 } else {
1101 types::String.encode(buf, &self.topic)?;
1102 }
1103 }
1104 if version >= 13 {
1105 types::Uuid.encode(buf, &self.topic_id)?;
1106 }
1107 if version >= 7 {
1108 if version >= 12 {
1109 types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
1110 } else {
1111 types::Array(types::Int32).encode(buf, &self.partitions)?;
1112 }
1113 } else {
1114 if !self.partitions.is_empty() {
1115 bail!("A field is set that is not available on the selected protocol version");
1116 }
1117 }
1118 if version >= 12 {
1119 let num_tagged_fields = self.unknown_tagged_fields.len();
1120 if num_tagged_fields > std::u32::MAX as usize {
1121 bail!(
1122 "Too many tagged fields to encode ({} fields)",
1123 num_tagged_fields
1124 );
1125 }
1126 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1127
1128 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1129 }
1130 Ok(())
1131 }
1132 fn compute_size(&self, version: i16) -> Result<usize> {
1133 let mut total_size = 0;
1134 if version >= 7 && version <= 12 {
1135 if version >= 12 {
1136 total_size += types::CompactString.compute_size(&self.topic)?;
1137 } else {
1138 total_size += types::String.compute_size(&self.topic)?;
1139 }
1140 }
1141 if version >= 13 {
1142 total_size += types::Uuid.compute_size(&self.topic_id)?;
1143 }
1144 if version >= 7 {
1145 if version >= 12 {
1146 total_size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
1147 } else {
1148 total_size += types::Array(types::Int32).compute_size(&self.partitions)?;
1149 }
1150 } else {
1151 if !self.partitions.is_empty() {
1152 bail!("A field is set that is not available on the selected protocol version");
1153 }
1154 }
1155 if version >= 12 {
1156 let num_tagged_fields = self.unknown_tagged_fields.len();
1157 if num_tagged_fields > std::u32::MAX as usize {
1158 bail!(
1159 "Too many tagged fields to encode ({} fields)",
1160 num_tagged_fields
1161 );
1162 }
1163 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1164
1165 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1166 }
1167 Ok(total_size)
1168 }
1169}
1170
1171#[cfg(feature = "broker")]
1172impl Decodable for ForgottenTopic {
1173 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1174 if version < 0 || version > 17 {
1175 bail!("specified version not supported by this message type");
1176 }
1177 let topic = if version >= 7 && version <= 12 {
1178 if version >= 12 {
1179 types::CompactString.decode(buf)?
1180 } else {
1181 types::String.decode(buf)?
1182 }
1183 } else {
1184 Default::default()
1185 };
1186 let topic_id = if version >= 13 {
1187 types::Uuid.decode(buf)?
1188 } else {
1189 Uuid::nil()
1190 };
1191 let partitions = if version >= 7 {
1192 if version >= 12 {
1193 types::CompactArray(types::Int32).decode(buf)?
1194 } else {
1195 types::Array(types::Int32).decode(buf)?
1196 }
1197 } else {
1198 Default::default()
1199 };
1200 let mut unknown_tagged_fields = BTreeMap::new();
1201 if version >= 12 {
1202 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1203 for _ in 0..num_tagged_fields {
1204 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1205 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1206 let unknown_value = buf.try_get_bytes(size as usize)?;
1207 unknown_tagged_fields.insert(tag as i32, unknown_value);
1208 }
1209 }
1210 Ok(Self {
1211 topic,
1212 topic_id,
1213 partitions,
1214 unknown_tagged_fields,
1215 })
1216 }
1217}
1218
1219impl Default for ForgottenTopic {
1220 fn default() -> Self {
1221 Self {
1222 topic: Default::default(),
1223 topic_id: Uuid::nil(),
1224 partitions: Default::default(),
1225 unknown_tagged_fields: BTreeMap::new(),
1226 }
1227 }
1228}
1229
1230impl Message for ForgottenTopic {
1231 const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
1232 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
1233}
1234
1235#[non_exhaustive]
1237#[derive(Debug, Clone, PartialEq)]
1238pub struct ReplicaState {
1239 pub replica_id: super::BrokerId,
1243
1244 pub replica_epoch: i64,
1248
1249 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1251}
1252
1253impl ReplicaState {
1254 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
1260 self.replica_id = value;
1261 self
1262 }
1263 pub fn with_replica_epoch(mut self, value: i64) -> Self {
1269 self.replica_epoch = value;
1270 self
1271 }
1272 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1274 self.unknown_tagged_fields = value;
1275 self
1276 }
1277 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1279 self.unknown_tagged_fields.insert(key, value);
1280 self
1281 }
1282}
1283
1284#[cfg(feature = "client")]
1285impl Encodable for ReplicaState {
1286 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1287 if version < 0 || version > 17 {
1288 bail!("specified version not supported by this message type");
1289 }
1290 if version >= 15 {
1291 types::Int32.encode(buf, &self.replica_id)?;
1292 } else {
1293 if self.replica_id != -1 {
1294 bail!("A field is set that is not available on the selected protocol version");
1295 }
1296 }
1297 if version >= 15 {
1298 types::Int64.encode(buf, &self.replica_epoch)?;
1299 } else {
1300 if self.replica_epoch != -1 {
1301 bail!("A field is set that is not available on the selected protocol version");
1302 }
1303 }
1304 if version >= 12 {
1305 let num_tagged_fields = self.unknown_tagged_fields.len();
1306 if num_tagged_fields > std::u32::MAX as usize {
1307 bail!(
1308 "Too many tagged fields to encode ({} fields)",
1309 num_tagged_fields
1310 );
1311 }
1312 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1313
1314 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1315 }
1316 Ok(())
1317 }
1318 fn compute_size(&self, version: i16) -> Result<usize> {
1319 let mut total_size = 0;
1320 if version >= 15 {
1321 total_size += types::Int32.compute_size(&self.replica_id)?;
1322 } else {
1323 if self.replica_id != -1 {
1324 bail!("A field is set that is not available on the selected protocol version");
1325 }
1326 }
1327 if version >= 15 {
1328 total_size += types::Int64.compute_size(&self.replica_epoch)?;
1329 } else {
1330 if self.replica_epoch != -1 {
1331 bail!("A field is set that is not available on the selected protocol version");
1332 }
1333 }
1334 if version >= 12 {
1335 let num_tagged_fields = self.unknown_tagged_fields.len();
1336 if num_tagged_fields > std::u32::MAX as usize {
1337 bail!(
1338 "Too many tagged fields to encode ({} fields)",
1339 num_tagged_fields
1340 );
1341 }
1342 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1343
1344 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1345 }
1346 Ok(total_size)
1347 }
1348}
1349
1350#[cfg(feature = "broker")]
1351impl Decodable for ReplicaState {
1352 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1353 if version < 0 || version > 17 {
1354 bail!("specified version not supported by this message type");
1355 }
1356 let replica_id = if version >= 15 {
1357 types::Int32.decode(buf)?
1358 } else {
1359 (-1).into()
1360 };
1361 let replica_epoch = if version >= 15 {
1362 types::Int64.decode(buf)?
1363 } else {
1364 -1
1365 };
1366 let mut unknown_tagged_fields = BTreeMap::new();
1367 if version >= 12 {
1368 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1369 for _ in 0..num_tagged_fields {
1370 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1371 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1372 let unknown_value = buf.try_get_bytes(size as usize)?;
1373 unknown_tagged_fields.insert(tag as i32, unknown_value);
1374 }
1375 }
1376 Ok(Self {
1377 replica_id,
1378 replica_epoch,
1379 unknown_tagged_fields,
1380 })
1381 }
1382}
1383
1384impl Default for ReplicaState {
1385 fn default() -> Self {
1386 Self {
1387 replica_id: (-1).into(),
1388 replica_epoch: -1,
1389 unknown_tagged_fields: BTreeMap::new(),
1390 }
1391 }
1392}
1393
1394impl Message for ReplicaState {
1395 const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
1396 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
1397}
1398
1399impl HeaderVersion for FetchRequest {
1400 fn header_version(version: i16) -> i16 {
1401 if version >= 12 {
1402 2
1403 } else {
1404 1
1405 }
1406 }
1407}