1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FetchPartition {
24 pub partition: i32,
28
29 pub current_leader_epoch: i32,
33
34 pub fetch_offset: i64,
38
39 pub last_fetched_epoch: i32,
43
44 pub log_start_offset: i64,
48
49 pub partition_max_bytes: i32,
53
54 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl FetchPartition {
59 pub fn with_partition(mut self, value: i32) -> Self {
65 self.partition = value;
66 self
67 }
68 pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
74 self.current_leader_epoch = value;
75 self
76 }
77 pub fn with_fetch_offset(mut self, value: i64) -> Self {
83 self.fetch_offset = value;
84 self
85 }
86 pub fn with_last_fetched_epoch(mut self, value: i32) -> Self {
92 self.last_fetched_epoch = value;
93 self
94 }
95 pub fn with_log_start_offset(mut self, value: i64) -> Self {
101 self.log_start_offset = value;
102 self
103 }
104 pub fn with_partition_max_bytes(mut self, value: i32) -> Self {
110 self.partition_max_bytes = value;
111 self
112 }
113 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115 self.unknown_tagged_fields = value;
116 self
117 }
118 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120 self.unknown_tagged_fields.insert(key, value);
121 self
122 }
123}
124
/// Wire serialization of [`FetchPartition`]; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Encodable for FetchPartition {
    /// Writes the fields that exist in `version` to `buf`, in wire order.
    ///
    /// # Errors
    /// Fails when `last_fetched_epoch` is not `-1` for a version below 12, when
    /// more than `u32::MAX` tagged fields are present, or when an underlying
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.partition)?;
        if version >= 9 {
            types::Int32.encode(buf, &self.current_leader_epoch)?;
        }
        types::Int64.encode(buf, &self.fetch_offset)?;
        if version >= 12 {
            types::Int32.encode(buf, &self.last_fetched_epoch)?;
        } else if self.last_fetched_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 5 {
            types::Int64.encode(buf, &self.log_start_offset)?;
        }
        types::Int32.encode(buf, &self.partition_max_bytes)?;

        // The tagged-field section only exists from version 12 onwards.
        if version >= 12 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = types::Int32.compute_size(&self.partition)?;
        if version >= 9 {
            total_size += types::Int32.compute_size(&self.current_leader_epoch)?;
        }
        total_size += types::Int64.compute_size(&self.fetch_offset)?;
        if version >= 12 {
            total_size += types::Int32.compute_size(&self.last_fetched_epoch)?;
        } else if self.last_fetched_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 5 {
            total_size += types::Int64.compute_size(&self.log_start_offset)?;
        }
        total_size += types::Int32.compute_size(&self.partition_max_bytes)?;

        if version >= 12 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
191
/// Wire deserialization of [`FetchPartition`]; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Decodable for FetchPartition {
    /// Reads one `FetchPartition` from `buf` as laid out for `version`.
    ///
    /// Fields that do not exist in `version` get their default (`-1`).
    ///
    /// # Errors
    /// Propagates any failure of the underlying decoders.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Struct-literal initialisers are evaluated top to bottom, so the field
        // order below IS the wire order and must not be rearranged.
        Ok(Self {
            partition: types::Int32.decode(buf)?,
            current_leader_epoch: if version >= 9 {
                types::Int32.decode(buf)?
            } else {
                -1
            },
            fetch_offset: types::Int64.decode(buf)?,
            last_fetched_epoch: if version >= 12 {
                types::Int32.decode(buf)?
            } else {
                -1
            },
            log_start_offset: if version >= 5 {
                types::Int64.decode(buf)?
            } else {
                -1
            },
            partition_max_bytes: types::Int32.decode(buf)?,
            unknown_tagged_fields: {
                let mut tagged = BTreeMap::new();
                if version >= 12 {
                    let tagged_count = types::UnsignedVarInt.decode(buf)?;
                    for _ in 0..tagged_count {
                        let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                        let size: u32 = types::UnsignedVarInt.decode(buf)?;
                        let raw = buf.try_get_bytes(size as usize)?;
                        tagged.insert(tag as i32, raw);
                    }
                }
                tagged
            },
        })
    }
}
234
235impl Default for FetchPartition {
236 fn default() -> Self {
237 Self {
238 partition: 0,
239 current_leader_epoch: -1,
240 fetch_offset: 0,
241 last_fetched_epoch: -1,
242 log_start_offset: -1,
243 partition_max_bytes: 0,
244 unknown_tagged_fields: BTreeMap::new(),
245 }
246 }
247}
248
249impl Message for FetchPartition {
250 const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
251 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
252}
253
254#[non_exhaustive]
256#[derive(Debug, Clone, PartialEq)]
257pub struct FetchRequest {
258 pub cluster_id: Option<StrBytes>,
262
263 pub replica_id: super::BrokerId,
267
268 pub replica_state: ReplicaState,
272
273 pub max_wait_ms: i32,
277
278 pub min_bytes: i32,
282
283 pub max_bytes: i32,
287
288 pub isolation_level: i8,
292
293 pub session_id: i32,
297
298 pub session_epoch: i32,
302
303 pub topics: Vec<FetchTopic>,
307
308 pub forgotten_topics_data: Vec<ForgottenTopic>,
312
313 pub rack_id: StrBytes,
317
318 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
320}
321
322impl FetchRequest {
323 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
329 self.cluster_id = value;
330 self
331 }
332 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
338 self.replica_id = value;
339 self
340 }
341 pub fn with_replica_state(mut self, value: ReplicaState) -> Self {
347 self.replica_state = value;
348 self
349 }
350 pub fn with_max_wait_ms(mut self, value: i32) -> Self {
356 self.max_wait_ms = value;
357 self
358 }
359 pub fn with_min_bytes(mut self, value: i32) -> Self {
365 self.min_bytes = value;
366 self
367 }
368 pub fn with_max_bytes(mut self, value: i32) -> Self {
374 self.max_bytes = value;
375 self
376 }
377 pub fn with_isolation_level(mut self, value: i8) -> Self {
383 self.isolation_level = value;
384 self
385 }
386 pub fn with_session_id(mut self, value: i32) -> Self {
392 self.session_id = value;
393 self
394 }
395 pub fn with_session_epoch(mut self, value: i32) -> Self {
401 self.session_epoch = value;
402 self
403 }
404 pub fn with_topics(mut self, value: Vec<FetchTopic>) -> Self {
410 self.topics = value;
411 self
412 }
413 pub fn with_forgotten_topics_data(mut self, value: Vec<ForgottenTopic>) -> Self {
419 self.forgotten_topics_data = value;
420 self
421 }
422 pub fn with_rack_id(mut self, value: StrBytes) -> Self {
428 self.rack_id = value;
429 self
430 }
431 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
433 self.unknown_tagged_fields = value;
434 self
435 }
436 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
438 self.unknown_tagged_fields.insert(key, value);
439 self
440 }
441}
442
/// Wire serialization of [`FetchRequest`]; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Encodable for FetchRequest {
    /// Writes the fields that exist in `version` to `buf`, in wire order,
    /// followed (from version 12) by the tagged-field section.
    ///
    /// # Errors
    /// Fails when `replica_id` is not `-1` for a version above 14, when
    /// `forgotten_topics_data` is non-empty for a version below 7, when the
    /// tagged-field count or a tagged payload size exceeds `u32::MAX`, or when
    /// an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version <= 14 {
            types::Int32.encode(buf, &self.replica_id)?;
        } else if self.replica_id != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        types::Int32.encode(buf, &self.max_wait_ms)?;
        types::Int32.encode(buf, &self.min_bytes)?;
        if version >= 3 {
            types::Int32.encode(buf, &self.max_bytes)?;
        }
        if version >= 4 {
            types::Int8.encode(buf, &self.isolation_level)?;
        }
        if version >= 7 {
            types::Int32.encode(buf, &self.session_id)?;
            types::Int32.encode(buf, &self.session_epoch)?;
        }
        if version >= 12 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        if version >= 12 {
            types::CompactArray(types::Struct { version })
                .encode(buf, &self.forgotten_topics_data)?;
        } else if version >= 7 {
            types::Array(types::Struct { version }).encode(buf, &self.forgotten_topics_data)?;
        } else if !self.forgotten_topics_data.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            types::CompactString.encode(buf, &self.rack_id)?;
        } else if version >= 11 {
            types::String.encode(buf, &self.rack_id)?;
        }

        if version >= 12 {
            // Known tagged fields are emitted only when they carry a
            // non-default value; decide that once and reuse it below.
            let has_cluster_id = self.cluster_id.is_some();
            let has_replica_state =
                version >= 15 && self.replica_state != ReplicaState::default();

            let num_tagged_fields = self.unknown_tagged_fields.len()
                + usize::from(has_cluster_id)
                + usize::from(has_replica_state);
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            if has_cluster_id {
                let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                // Each tagged field is written as: tag number, payload size, payload.
                types::UnsignedVarInt.encode(buf, 0)?;
                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
                types::CompactString.encode(buf, &self.cluster_id)?;
            }
            if has_replica_state {
                let computed_size = types::Struct { version }.compute_size(&self.replica_state)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                types::UnsignedVarInt.encode(buf, 1)?;
                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
                types::Struct { version }.encode(buf, &self.replica_state)?;
            }
            // Tags 0 and 1 are the known fields above, hence the `2..` range.
            write_unknown_tagged_fields(buf, 2.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version <= 14 {
            total_size += types::Int32.compute_size(&self.replica_id)?;
        } else if self.replica_id != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        total_size += types::Int32.compute_size(&self.max_wait_ms)?;
        total_size += types::Int32.compute_size(&self.min_bytes)?;
        if version >= 3 {
            total_size += types::Int32.compute_size(&self.max_bytes)?;
        }
        if version >= 4 {
            total_size += types::Int8.compute_size(&self.isolation_level)?;
        }
        if version >= 7 {
            total_size += types::Int32.compute_size(&self.session_id)?;
            total_size += types::Int32.compute_size(&self.session_epoch)?;
        }
        if version >= 12 {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }
        if version >= 12 {
            total_size += types::CompactArray(types::Struct { version })
                .compute_size(&self.forgotten_topics_data)?;
        } else if version >= 7 {
            total_size += types::Array(types::Struct { version })
                .compute_size(&self.forgotten_topics_data)?;
        } else if !self.forgotten_topics_data.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            total_size += types::CompactString.compute_size(&self.rack_id)?;
        } else if version >= 11 {
            total_size += types::String.compute_size(&self.rack_id)?;
        }

        if version >= 12 {
            let has_cluster_id = self.cluster_id.is_some();
            let has_replica_state =
                version >= 15 && self.replica_state != ReplicaState::default();

            let num_tagged_fields = self.unknown_tagged_fields.len()
                + usize::from(has_cluster_id)
                + usize::from(has_replica_state);
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            if has_cluster_id {
                let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                total_size += types::UnsignedVarInt.compute_size(0)?;
                total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
                total_size += computed_size;
            }
            if has_replica_state {
                let computed_size = types::Struct { version }.compute_size(&self.replica_state)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                total_size += types::UnsignedVarInt.compute_size(1)?;
                total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
                total_size += computed_size;
            }
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
637
/// Wire deserialization of [`FetchRequest`]; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Decodable for FetchRequest {
    /// Reads one `FetchRequest` from `buf` as laid out for `version`.
    ///
    /// Fields that do not exist in `version` get the same values as
    /// `FetchRequest::default()`.
    ///
    /// # Errors
    /// Fails when tag 1 appears in a version below 15, or when an underlying
    /// decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Fixed section: the statements below follow the wire order.
        let replica_id = if version <= 14 {
            types::Int32.decode(buf)?
        } else {
            (-1).into()
        };
        let max_wait_ms = types::Int32.decode(buf)?;
        let min_bytes = types::Int32.decode(buf)?;
        let max_bytes = if version >= 3 {
            types::Int32.decode(buf)?
        } else {
            0x7fffffff
        };
        let isolation_level = if version >= 4 {
            types::Int8.decode(buf)?
        } else {
            0
        };
        let session_id = if version >= 7 {
            types::Int32.decode(buf)?
        } else {
            0
        };
        let session_epoch = if version >= 7 {
            types::Int32.decode(buf)?
        } else {
            -1
        };
        let topics = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let forgotten_topics_data = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 7 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        let rack_id = if version >= 12 {
            types::CompactString.decode(buf)?
        } else if version >= 11 {
            types::String.decode(buf)?
        } else {
            StrBytes::from_static_str("")
        };

        // Tagged section (version 12+): known tags overwrite these defaults,
        // everything else is kept as raw bytes.
        let mut cluster_id = None;
        let mut replica_state = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    // NOTE(review): for tags 0 and 1 the declared `size` is read
                    // but not checked against what the field decoder consumes.
                    0 => {
                        cluster_id = types::CompactString.decode(buf)?;
                    }
                    1 if version >= 15 => {
                        replica_state = types::Struct { version }.decode(buf)?;
                    }
                    1 => {
                        bail!("Tag {} is not valid for version {}", tag, version);
                    }
                    _ => {
                        let raw = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, raw);
                    }
                }
            }
        }

        Ok(Self {
            cluster_id,
            replica_id,
            replica_state,
            max_wait_ms,
            min_bytes,
            max_bytes,
            isolation_level,
            session_id,
            session_epoch,
            topics,
            forgotten_topics_data,
            rack_id,
            unknown_tagged_fields,
        })
    }
}
734
735impl Default for FetchRequest {
736 fn default() -> Self {
737 Self {
738 cluster_id: None,
739 replica_id: (-1).into(),
740 replica_state: Default::default(),
741 max_wait_ms: 0,
742 min_bytes: 0,
743 max_bytes: 0x7fffffff,
744 isolation_level: 0,
745 session_id: 0,
746 session_epoch: -1,
747 topics: Default::default(),
748 forgotten_topics_data: Default::default(),
749 rack_id: StrBytes::from_static_str(""),
750 unknown_tagged_fields: BTreeMap::new(),
751 }
752 }
753}
754
755impl Message for FetchRequest {
756 const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
757 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
758}
759
760#[non_exhaustive]
762#[derive(Debug, Clone, PartialEq)]
763pub struct FetchTopic {
764 pub topic: super::TopicName,
768
769 pub topic_id: Uuid,
773
774 pub partitions: Vec<FetchPartition>,
778
779 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
781}
782
783impl FetchTopic {
784 pub fn with_topic(mut self, value: super::TopicName) -> Self {
790 self.topic = value;
791 self
792 }
793 pub fn with_topic_id(mut self, value: Uuid) -> Self {
799 self.topic_id = value;
800 self
801 }
802 pub fn with_partitions(mut self, value: Vec<FetchPartition>) -> Self {
808 self.partitions = value;
809 self
810 }
811 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
813 self.unknown_tagged_fields = value;
814 self
815 }
816 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
818 self.unknown_tagged_fields.insert(key, value);
819 self
820 }
821}
822
/// Wire serialization of [`FetchTopic`]; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Encodable for FetchTopic {
    /// Writes the fields that exist in `version` to `buf`, in wire order.
    ///
    /// # Errors
    /// Fails when more than `u32::MAX` tagged fields are present or when an
    /// underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // The topic name (<= 12) and the topic id (>= 13) never coexist, so a
        // single chain covers both without changing the wire order.
        if version >= 13 {
            types::Uuid.encode(buf, &self.topic_id)?;
        } else if version == 12 {
            types::CompactString.encode(buf, &self.topic)?;
        } else {
            types::String.encode(buf, &self.topic)?;
        }
        if version >= 12 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }
        if version >= 12 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version >= 13 {
            total_size += types::Uuid.compute_size(&self.topic_id)?;
        } else if version == 12 {
            total_size += types::CompactString.compute_size(&self.topic)?;
        } else {
            total_size += types::String.compute_size(&self.topic)?;
        }
        if version >= 12 {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
        }
        if version >= 12 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
888
/// Wire deserialization of [`FetchTopic`]; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Decodable for FetchTopic {
    /// Reads one `FetchTopic` from `buf` as laid out for `version`.
    ///
    /// # Errors
    /// Propagates any failure of the underlying decoders.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Struct-literal initialisers are evaluated top to bottom, so the field
        // order below IS the wire order and must not be rearranged.
        Ok(Self {
            topic: if version > 12 {
                Default::default()
            } else if version == 12 {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            },
            topic_id: if version >= 13 {
                types::Uuid.decode(buf)?
            } else {
                Uuid::nil()
            },
            partitions: if version >= 12 {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                types::Array(types::Struct { version }).decode(buf)?
            },
            unknown_tagged_fields: {
                let mut tagged = BTreeMap::new();
                if version >= 12 {
                    let tagged_count = types::UnsignedVarInt.decode(buf)?;
                    for _ in 0..tagged_count {
                        let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                        let size: u32 = types::UnsignedVarInt.decode(buf)?;
                        let raw = buf.try_get_bytes(size as usize)?;
                        tagged.insert(tag as i32, raw);
                    }
                }
                tagged
            },
        })
    }
}
929
930impl Default for FetchTopic {
931 fn default() -> Self {
932 Self {
933 topic: Default::default(),
934 topic_id: Uuid::nil(),
935 partitions: Default::default(),
936 unknown_tagged_fields: BTreeMap::new(),
937 }
938 }
939}
940
941impl Message for FetchTopic {
942 const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
943 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
944}
945
946#[non_exhaustive]
948#[derive(Debug, Clone, PartialEq)]
949pub struct ForgottenTopic {
950 pub topic: super::TopicName,
954
955 pub topic_id: Uuid,
959
960 pub partitions: Vec<i32>,
964
965 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
967}
968
969impl ForgottenTopic {
970 pub fn with_topic(mut self, value: super::TopicName) -> Self {
976 self.topic = value;
977 self
978 }
979 pub fn with_topic_id(mut self, value: Uuid) -> Self {
985 self.topic_id = value;
986 self
987 }
988 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
994 self.partitions = value;
995 self
996 }
997 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
999 self.unknown_tagged_fields = value;
1000 self
1001 }
1002 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1004 self.unknown_tagged_fields.insert(key, value);
1005 self
1006 }
1007}
1008
/// Wire serialization of [`ForgottenTopic`]; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Encodable for ForgottenTopic {
    /// Writes the fields that exist in `version` to `buf`, in wire order.
    ///
    /// # Errors
    /// Fails when `partitions` is non-empty for a version below 7, when more
    /// than `u32::MAX` tagged fields are present, or when an underlying
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Topic name (7..=12) and topic id (>= 13) never coexist; below
        // version 7 neither is written.
        if version >= 13 {
            types::Uuid.encode(buf, &self.topic_id)?;
        } else if version == 12 {
            types::CompactString.encode(buf, &self.topic)?;
        } else if version >= 7 {
            types::String.encode(buf, &self.topic)?;
        }
        if version >= 12 {
            types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
        } else if version >= 7 {
            types::Array(types::Int32).encode(buf, &self.partitions)?;
        } else if !self.partitions.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version >= 13 {
            total_size += types::Uuid.compute_size(&self.topic_id)?;
        } else if version == 12 {
            total_size += types::CompactString.compute_size(&self.topic)?;
        } else if version >= 7 {
            total_size += types::String.compute_size(&self.topic)?;
        }
        if version >= 12 {
            total_size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
        } else if version >= 7 {
            total_size += types::Array(types::Int32).compute_size(&self.partitions)?;
        } else if !self.partitions.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
1085
/// Wire deserialization of [`ForgottenTopic`]; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Decodable for ForgottenTopic {
    /// Reads one `ForgottenTopic` from `buf` as laid out for `version`.
    ///
    /// # Errors
    /// Propagates any failure of the underlying decoders.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Struct-literal initialisers are evaluated top to bottom, so the field
        // order below IS the wire order and must not be rearranged.
        Ok(Self {
            topic: match version {
                12 => types::CompactString.decode(buf)?,
                7..=11 => types::String.decode(buf)?,
                _ => Default::default(),
            },
            topic_id: if version >= 13 {
                types::Uuid.decode(buf)?
            } else {
                Uuid::nil()
            },
            partitions: if version >= 12 {
                types::CompactArray(types::Int32).decode(buf)?
            } else if version >= 7 {
                types::Array(types::Int32).decode(buf)?
            } else {
                Default::default()
            },
            unknown_tagged_fields: {
                let mut tagged = BTreeMap::new();
                if version >= 12 {
                    let tagged_count = types::UnsignedVarInt.decode(buf)?;
                    for _ in 0..tagged_count {
                        let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                        let size: u32 = types::UnsignedVarInt.decode(buf)?;
                        let raw = buf.try_get_bytes(size as usize)?;
                        tagged.insert(tag as i32, raw);
                    }
                }
                tagged
            },
        })
    }
}
1130
1131impl Default for ForgottenTopic {
1132 fn default() -> Self {
1133 Self {
1134 topic: Default::default(),
1135 topic_id: Uuid::nil(),
1136 partitions: Default::default(),
1137 unknown_tagged_fields: BTreeMap::new(),
1138 }
1139 }
1140}
1141
1142impl Message for ForgottenTopic {
1143 const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
1144 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
1145}
1146
1147#[non_exhaustive]
1149#[derive(Debug, Clone, PartialEq)]
1150pub struct ReplicaState {
1151 pub replica_id: super::BrokerId,
1155
1156 pub replica_epoch: i64,
1160
1161 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1163}
1164
1165impl ReplicaState {
1166 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
1172 self.replica_id = value;
1173 self
1174 }
1175 pub fn with_replica_epoch(mut self, value: i64) -> Self {
1181 self.replica_epoch = value;
1182 self
1183 }
1184 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1186 self.unknown_tagged_fields = value;
1187 self
1188 }
1189 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1191 self.unknown_tagged_fields.insert(key, value);
1192 self
1193 }
1194}
1195
/// Wire serialization of [`ReplicaState`]; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Encodable for ReplicaState {
    /// Writes the fields that exist in `version` to `buf`, in wire order.
    ///
    /// # Errors
    /// Fails when `replica_id` or `replica_epoch` is not `-1` for a version
    /// below 15, when more than `u32::MAX` tagged fields are present, or when
    /// an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Both fields share the same version gate and the same error message,
        // so they are handled together.
        if version >= 15 {
            types::Int32.encode(buf, &self.replica_id)?;
            types::Int64.encode(buf, &self.replica_epoch)?;
        } else if self.replica_id != -1 || self.replica_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version >= 15 {
            total_size += types::Int32.compute_size(&self.replica_id)?;
            total_size += types::Int64.compute_size(&self.replica_epoch)?;
        } else if self.replica_id != -1 || self.replica_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
1258
/// Wire deserialization of [`ReplicaState`]; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Decodable for ReplicaState {
    /// Reads one `ReplicaState` from `buf` as laid out for `version`.
    ///
    /// Below version 15 both fields fall back to `-1`.
    ///
    /// # Errors
    /// Propagates any failure of the underlying decoders.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Struct-literal initialisers are evaluated top to bottom, so the field
        // order below IS the wire order and must not be rearranged.
        Ok(Self {
            replica_id: if version >= 15 {
                types::Int32.decode(buf)?
            } else {
                (-1).into()
            },
            replica_epoch: if version >= 15 {
                types::Int64.decode(buf)?
            } else {
                -1
            },
            unknown_tagged_fields: {
                let mut tagged = BTreeMap::new();
                if version >= 12 {
                    let tagged_count = types::UnsignedVarInt.decode(buf)?;
                    for _ in 0..tagged_count {
                        let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                        let size: u32 = types::UnsignedVarInt.decode(buf)?;
                        let raw = buf.try_get_bytes(size as usize)?;
                        tagged.insert(tag as i32, raw);
                    }
                }
                tagged
            },
        })
    }
}
1289
1290impl Default for ReplicaState {
1291 fn default() -> Self {
1292 Self {
1293 replica_id: (-1).into(),
1294 replica_epoch: -1,
1295 unknown_tagged_fields: BTreeMap::new(),
1296 }
1297 }
1298}
1299
1300impl Message for ReplicaState {
1301 const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
1302 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
1303}
1304
1305impl HeaderVersion for FetchRequest {
1306 fn header_version(version: i16) -> i16 {
1307 if version >= 12 {
1308 2
1309 } else {
1310 1
1311 }
1312 }
1313}