1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FetchPartition {
24 pub partition: i32,
28
29 pub current_leader_epoch: i32,
33
34 pub fetch_offset: i64,
38
39 pub last_fetched_epoch: i32,
43
44 pub log_start_offset: i64,
48
49 pub partition_max_bytes: i32,
53
54 pub replica_directory_id: Uuid,
58
59 pub high_watermark: i64,
63
64 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
66}
67
68impl FetchPartition {
69 pub fn with_partition(mut self, value: i32) -> Self {
75 self.partition = value;
76 self
77 }
78 pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
84 self.current_leader_epoch = value;
85 self
86 }
87 pub fn with_fetch_offset(mut self, value: i64) -> Self {
93 self.fetch_offset = value;
94 self
95 }
96 pub fn with_last_fetched_epoch(mut self, value: i32) -> Self {
102 self.last_fetched_epoch = value;
103 self
104 }
105 pub fn with_log_start_offset(mut self, value: i64) -> Self {
111 self.log_start_offset = value;
112 self
113 }
114 pub fn with_partition_max_bytes(mut self, value: i32) -> Self {
120 self.partition_max_bytes = value;
121 self
122 }
123 pub fn with_replica_directory_id(mut self, value: Uuid) -> Self {
129 self.replica_directory_id = value;
130 self
131 }
132 pub fn with_high_watermark(mut self, value: i64) -> Self {
138 self.high_watermark = value;
139 self
140 }
141 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
143 self.unknown_tagged_fields = value;
144 self
145 }
146 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
148 self.unknown_tagged_fields.insert(key, value);
149 self
150 }
151}
152
153#[cfg(feature = "client")]
154impl Encodable for FetchPartition {
155 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
156 if version < 4 || version > 18 {
157 bail!("specified version not supported by this message type");
158 }
159 types::Int32.encode(buf, &self.partition)?;
160 if version >= 9 {
161 types::Int32.encode(buf, &self.current_leader_epoch)?;
162 }
163 types::Int64.encode(buf, &self.fetch_offset)?;
164 if version >= 12 {
165 types::Int32.encode(buf, &self.last_fetched_epoch)?;
166 } else {
167 if self.last_fetched_epoch != -1 {
168 bail!("A field is set that is not available on the selected protocol version");
169 }
170 }
171 if version >= 5 {
172 types::Int64.encode(buf, &self.log_start_offset)?;
173 }
174 types::Int32.encode(buf, &self.partition_max_bytes)?;
175 if version >= 12 {
176 let mut num_tagged_fields = self.unknown_tagged_fields.len();
177 if version >= 17 {
178 if &self.replica_directory_id != &Uuid::nil() {
179 num_tagged_fields += 1;
180 }
181 }
182 if version >= 18 {
183 if self.high_watermark != 9223372036854775807 {
184 num_tagged_fields += 1;
185 }
186 }
187 if num_tagged_fields > std::u32::MAX as usize {
188 bail!(
189 "Too many tagged fields to encode ({} fields)",
190 num_tagged_fields
191 );
192 }
193 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
194 if version >= 17 {
195 if &self.replica_directory_id != &Uuid::nil() {
196 let computed_size = types::Uuid.compute_size(&self.replica_directory_id)?;
197 if computed_size > std::u32::MAX as usize {
198 bail!(
199 "Tagged field is too large to encode ({} bytes)",
200 computed_size
201 );
202 }
203 types::UnsignedVarInt.encode(buf, 0)?;
204 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
205 types::Uuid.encode(buf, &self.replica_directory_id)?;
206 }
207 }
208 if version >= 18 {
209 if self.high_watermark != 9223372036854775807 {
210 let computed_size = types::Int64.compute_size(&self.high_watermark)?;
211 if computed_size > std::u32::MAX as usize {
212 bail!(
213 "Tagged field is too large to encode ({} bytes)",
214 computed_size
215 );
216 }
217 types::UnsignedVarInt.encode(buf, 1)?;
218 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
219 types::Int64.encode(buf, &self.high_watermark)?;
220 }
221 }
222 write_unknown_tagged_fields(buf, 2.., &self.unknown_tagged_fields)?;
223 }
224 Ok(())
225 }
226 fn compute_size(&self, version: i16) -> Result<usize> {
227 let mut total_size = 0;
228 total_size += types::Int32.compute_size(&self.partition)?;
229 if version >= 9 {
230 total_size += types::Int32.compute_size(&self.current_leader_epoch)?;
231 }
232 total_size += types::Int64.compute_size(&self.fetch_offset)?;
233 if version >= 12 {
234 total_size += types::Int32.compute_size(&self.last_fetched_epoch)?;
235 } else {
236 if self.last_fetched_epoch != -1 {
237 bail!("A field is set that is not available on the selected protocol version");
238 }
239 }
240 if version >= 5 {
241 total_size += types::Int64.compute_size(&self.log_start_offset)?;
242 }
243 total_size += types::Int32.compute_size(&self.partition_max_bytes)?;
244 if version >= 12 {
245 let mut num_tagged_fields = self.unknown_tagged_fields.len();
246 if version >= 17 {
247 if &self.replica_directory_id != &Uuid::nil() {
248 num_tagged_fields += 1;
249 }
250 }
251 if version >= 18 {
252 if self.high_watermark != 9223372036854775807 {
253 num_tagged_fields += 1;
254 }
255 }
256 if num_tagged_fields > std::u32::MAX as usize {
257 bail!(
258 "Too many tagged fields to encode ({} fields)",
259 num_tagged_fields
260 );
261 }
262 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
263 if version >= 17 {
264 if &self.replica_directory_id != &Uuid::nil() {
265 let computed_size = types::Uuid.compute_size(&self.replica_directory_id)?;
266 if computed_size > std::u32::MAX as usize {
267 bail!(
268 "Tagged field is too large to encode ({} bytes)",
269 computed_size
270 );
271 }
272 total_size += types::UnsignedVarInt.compute_size(0)?;
273 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
274 total_size += computed_size;
275 }
276 }
277 if version >= 18 {
278 if self.high_watermark != 9223372036854775807 {
279 let computed_size = types::Int64.compute_size(&self.high_watermark)?;
280 if computed_size > std::u32::MAX as usize {
281 bail!(
282 "Tagged field is too large to encode ({} bytes)",
283 computed_size
284 );
285 }
286 total_size += types::UnsignedVarInt.compute_size(1)?;
287 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
288 total_size += computed_size;
289 }
290 }
291 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
292 }
293 Ok(total_size)
294 }
295}
296
297#[cfg(feature = "broker")]
298impl Decodable for FetchPartition {
299 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
300 if version < 4 || version > 18 {
301 bail!("specified version not supported by this message type");
302 }
303 let partition = types::Int32.decode(buf)?;
304 let current_leader_epoch = if version >= 9 {
305 types::Int32.decode(buf)?
306 } else {
307 -1
308 };
309 let fetch_offset = types::Int64.decode(buf)?;
310 let last_fetched_epoch = if version >= 12 {
311 types::Int32.decode(buf)?
312 } else {
313 -1
314 };
315 let log_start_offset = if version >= 5 {
316 types::Int64.decode(buf)?
317 } else {
318 -1
319 };
320 let partition_max_bytes = types::Int32.decode(buf)?;
321 let mut replica_directory_id = Uuid::nil();
322 let mut high_watermark = 9223372036854775807;
323 let mut unknown_tagged_fields = BTreeMap::new();
324 if version >= 12 {
325 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
326 for _ in 0..num_tagged_fields {
327 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
328 let size: u32 = types::UnsignedVarInt.decode(buf)?;
329 match tag {
330 0 => {
331 if version >= 17 {
332 replica_directory_id = types::Uuid.decode(buf)?;
333 } else {
334 bail!("Tag {} is not valid for version {}", tag, version);
335 }
336 }
337 1 => {
338 if version >= 18 {
339 high_watermark = types::Int64.decode(buf)?;
340 } else {
341 bail!("Tag {} is not valid for version {}", tag, version);
342 }
343 }
344 _ => {
345 let unknown_value = buf.try_get_bytes(size as usize)?;
346 unknown_tagged_fields.insert(tag as i32, unknown_value);
347 }
348 }
349 }
350 }
351 Ok(Self {
352 partition,
353 current_leader_epoch,
354 fetch_offset,
355 last_fetched_epoch,
356 log_start_offset,
357 partition_max_bytes,
358 replica_directory_id,
359 high_watermark,
360 unknown_tagged_fields,
361 })
362 }
363}
364
365impl Default for FetchPartition {
366 fn default() -> Self {
367 Self {
368 partition: 0,
369 current_leader_epoch: -1,
370 fetch_offset: 0,
371 last_fetched_epoch: -1,
372 log_start_offset: -1,
373 partition_max_bytes: 0,
374 replica_directory_id: Uuid::nil(),
375 high_watermark: 9223372036854775807,
376 unknown_tagged_fields: BTreeMap::new(),
377 }
378 }
379}
380
381impl Message for FetchPartition {
382 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
383 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
384}
385
386#[non_exhaustive]
388#[derive(Debug, Clone, PartialEq)]
389pub struct FetchRequest {
390 pub cluster_id: Option<StrBytes>,
394
395 pub replica_id: super::BrokerId,
399
400 pub replica_state: ReplicaState,
404
405 pub max_wait_ms: i32,
409
410 pub min_bytes: i32,
414
415 pub max_bytes: i32,
419
420 pub isolation_level: i8,
424
425 pub session_id: i32,
429
430 pub session_epoch: i32,
434
435 pub topics: Vec<FetchTopic>,
439
440 pub forgotten_topics_data: Vec<ForgottenTopic>,
444
445 pub rack_id: StrBytes,
449
450 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
452}
453
454impl FetchRequest {
455 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
461 self.cluster_id = value;
462 self
463 }
464 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
470 self.replica_id = value;
471 self
472 }
473 pub fn with_replica_state(mut self, value: ReplicaState) -> Self {
479 self.replica_state = value;
480 self
481 }
482 pub fn with_max_wait_ms(mut self, value: i32) -> Self {
488 self.max_wait_ms = value;
489 self
490 }
491 pub fn with_min_bytes(mut self, value: i32) -> Self {
497 self.min_bytes = value;
498 self
499 }
500 pub fn with_max_bytes(mut self, value: i32) -> Self {
506 self.max_bytes = value;
507 self
508 }
509 pub fn with_isolation_level(mut self, value: i8) -> Self {
515 self.isolation_level = value;
516 self
517 }
518 pub fn with_session_id(mut self, value: i32) -> Self {
524 self.session_id = value;
525 self
526 }
527 pub fn with_session_epoch(mut self, value: i32) -> Self {
533 self.session_epoch = value;
534 self
535 }
536 pub fn with_topics(mut self, value: Vec<FetchTopic>) -> Self {
542 self.topics = value;
543 self
544 }
545 pub fn with_forgotten_topics_data(mut self, value: Vec<ForgottenTopic>) -> Self {
551 self.forgotten_topics_data = value;
552 self
553 }
554 pub fn with_rack_id(mut self, value: StrBytes) -> Self {
560 self.rack_id = value;
561 self
562 }
563 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
565 self.unknown_tagged_fields = value;
566 self
567 }
568 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
570 self.unknown_tagged_fields.insert(key, value);
571 self
572 }
573}
574
575#[cfg(feature = "client")]
576impl Encodable for FetchRequest {
577 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
578 if version < 4 || version > 18 {
579 bail!("specified version not supported by this message type");
580 }
581 if version <= 14 {
582 types::Int32.encode(buf, &self.replica_id)?;
583 } else {
584 if self.replica_id != -1 {
585 bail!("A field is set that is not available on the selected protocol version");
586 }
587 }
588 types::Int32.encode(buf, &self.max_wait_ms)?;
589 types::Int32.encode(buf, &self.min_bytes)?;
590 types::Int32.encode(buf, &self.max_bytes)?;
591 types::Int8.encode(buf, &self.isolation_level)?;
592 if version >= 7 {
593 types::Int32.encode(buf, &self.session_id)?;
594 }
595 if version >= 7 {
596 types::Int32.encode(buf, &self.session_epoch)?;
597 }
598 if version >= 12 {
599 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
600 } else {
601 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
602 }
603 if version >= 7 {
604 if version >= 12 {
605 types::CompactArray(types::Struct { version })
606 .encode(buf, &self.forgotten_topics_data)?;
607 } else {
608 types::Array(types::Struct { version }).encode(buf, &self.forgotten_topics_data)?;
609 }
610 } else {
611 if !self.forgotten_topics_data.is_empty() {
612 bail!("A field is set that is not available on the selected protocol version");
613 }
614 }
615 if version >= 11 {
616 if version >= 12 {
617 types::CompactString.encode(buf, &self.rack_id)?;
618 } else {
619 types::String.encode(buf, &self.rack_id)?;
620 }
621 }
622 if version >= 12 {
623 let mut num_tagged_fields = self.unknown_tagged_fields.len();
624 if !self.cluster_id.is_none() {
625 num_tagged_fields += 1;
626 }
627 if version >= 15 {
628 if &self.replica_state != &Default::default() {
629 num_tagged_fields += 1;
630 }
631 }
632 if num_tagged_fields > std::u32::MAX as usize {
633 bail!(
634 "Too many tagged fields to encode ({} fields)",
635 num_tagged_fields
636 );
637 }
638 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
639 if !self.cluster_id.is_none() {
640 let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
641 if computed_size > std::u32::MAX as usize {
642 bail!(
643 "Tagged field is too large to encode ({} bytes)",
644 computed_size
645 );
646 }
647 types::UnsignedVarInt.encode(buf, 0)?;
648 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
649 types::CompactString.encode(buf, &self.cluster_id)?;
650 }
651 if version >= 15 {
652 if &self.replica_state != &Default::default() {
653 let computed_size =
654 types::Struct { version }.compute_size(&self.replica_state)?;
655 if computed_size > std::u32::MAX as usize {
656 bail!(
657 "Tagged field is too large to encode ({} bytes)",
658 computed_size
659 );
660 }
661 types::UnsignedVarInt.encode(buf, 1)?;
662 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
663 types::Struct { version }.encode(buf, &self.replica_state)?;
664 }
665 }
666 write_unknown_tagged_fields(buf, 2.., &self.unknown_tagged_fields)?;
667 }
668 Ok(())
669 }
670 fn compute_size(&self, version: i16) -> Result<usize> {
671 let mut total_size = 0;
672 if version <= 14 {
673 total_size += types::Int32.compute_size(&self.replica_id)?;
674 } else {
675 if self.replica_id != -1 {
676 bail!("A field is set that is not available on the selected protocol version");
677 }
678 }
679 total_size += types::Int32.compute_size(&self.max_wait_ms)?;
680 total_size += types::Int32.compute_size(&self.min_bytes)?;
681 total_size += types::Int32.compute_size(&self.max_bytes)?;
682 total_size += types::Int8.compute_size(&self.isolation_level)?;
683 if version >= 7 {
684 total_size += types::Int32.compute_size(&self.session_id)?;
685 }
686 if version >= 7 {
687 total_size += types::Int32.compute_size(&self.session_epoch)?;
688 }
689 if version >= 12 {
690 total_size +=
691 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
692 } else {
693 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
694 }
695 if version >= 7 {
696 if version >= 12 {
697 total_size += types::CompactArray(types::Struct { version })
698 .compute_size(&self.forgotten_topics_data)?;
699 } else {
700 total_size += types::Array(types::Struct { version })
701 .compute_size(&self.forgotten_topics_data)?;
702 }
703 } else {
704 if !self.forgotten_topics_data.is_empty() {
705 bail!("A field is set that is not available on the selected protocol version");
706 }
707 }
708 if version >= 11 {
709 if version >= 12 {
710 total_size += types::CompactString.compute_size(&self.rack_id)?;
711 } else {
712 total_size += types::String.compute_size(&self.rack_id)?;
713 }
714 }
715 if version >= 12 {
716 let mut num_tagged_fields = self.unknown_tagged_fields.len();
717 if !self.cluster_id.is_none() {
718 num_tagged_fields += 1;
719 }
720 if version >= 15 {
721 if &self.replica_state != &Default::default() {
722 num_tagged_fields += 1;
723 }
724 }
725 if num_tagged_fields > std::u32::MAX as usize {
726 bail!(
727 "Too many tagged fields to encode ({} fields)",
728 num_tagged_fields
729 );
730 }
731 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
732 if !self.cluster_id.is_none() {
733 let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
734 if computed_size > std::u32::MAX as usize {
735 bail!(
736 "Tagged field is too large to encode ({} bytes)",
737 computed_size
738 );
739 }
740 total_size += types::UnsignedVarInt.compute_size(0)?;
741 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
742 total_size += computed_size;
743 }
744 if version >= 15 {
745 if &self.replica_state != &Default::default() {
746 let computed_size =
747 types::Struct { version }.compute_size(&self.replica_state)?;
748 if computed_size > std::u32::MAX as usize {
749 bail!(
750 "Tagged field is too large to encode ({} bytes)",
751 computed_size
752 );
753 }
754 total_size += types::UnsignedVarInt.compute_size(1)?;
755 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
756 total_size += computed_size;
757 }
758 }
759 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
760 }
761 Ok(total_size)
762 }
763}
764
765#[cfg(feature = "broker")]
766impl Decodable for FetchRequest {
767 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
768 if version < 4 || version > 18 {
769 bail!("specified version not supported by this message type");
770 }
771 let mut cluster_id = None;
772 let replica_id = if version <= 14 {
773 types::Int32.decode(buf)?
774 } else {
775 (-1).into()
776 };
777 let mut replica_state = Default::default();
778 let max_wait_ms = types::Int32.decode(buf)?;
779 let min_bytes = types::Int32.decode(buf)?;
780 let max_bytes = types::Int32.decode(buf)?;
781 let isolation_level = types::Int8.decode(buf)?;
782 let session_id = if version >= 7 {
783 types::Int32.decode(buf)?
784 } else {
785 0
786 };
787 let session_epoch = if version >= 7 {
788 types::Int32.decode(buf)?
789 } else {
790 -1
791 };
792 let topics = if version >= 12 {
793 types::CompactArray(types::Struct { version }).decode(buf)?
794 } else {
795 types::Array(types::Struct { version }).decode(buf)?
796 };
797 let forgotten_topics_data = if version >= 7 {
798 if version >= 12 {
799 types::CompactArray(types::Struct { version }).decode(buf)?
800 } else {
801 types::Array(types::Struct { version }).decode(buf)?
802 }
803 } else {
804 Default::default()
805 };
806 let rack_id = if version >= 11 {
807 if version >= 12 {
808 types::CompactString.decode(buf)?
809 } else {
810 types::String.decode(buf)?
811 }
812 } else {
813 StrBytes::from_static_str("")
814 };
815 let mut unknown_tagged_fields = BTreeMap::new();
816 if version >= 12 {
817 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
818 for _ in 0..num_tagged_fields {
819 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
820 let size: u32 = types::UnsignedVarInt.decode(buf)?;
821 match tag {
822 0 => {
823 cluster_id = types::CompactString.decode(buf)?;
824 }
825 1 => {
826 if version >= 15 {
827 replica_state = types::Struct { version }.decode(buf)?;
828 } else {
829 bail!("Tag {} is not valid for version {}", tag, version);
830 }
831 }
832 _ => {
833 let unknown_value = buf.try_get_bytes(size as usize)?;
834 unknown_tagged_fields.insert(tag as i32, unknown_value);
835 }
836 }
837 }
838 }
839 Ok(Self {
840 cluster_id,
841 replica_id,
842 replica_state,
843 max_wait_ms,
844 min_bytes,
845 max_bytes,
846 isolation_level,
847 session_id,
848 session_epoch,
849 topics,
850 forgotten_topics_data,
851 rack_id,
852 unknown_tagged_fields,
853 })
854 }
855}
856
857impl Default for FetchRequest {
858 fn default() -> Self {
859 Self {
860 cluster_id: None,
861 replica_id: (-1).into(),
862 replica_state: Default::default(),
863 max_wait_ms: 0,
864 min_bytes: 0,
865 max_bytes: 0x7fffffff,
866 isolation_level: 0,
867 session_id: 0,
868 session_epoch: -1,
869 topics: Default::default(),
870 forgotten_topics_data: Default::default(),
871 rack_id: StrBytes::from_static_str(""),
872 unknown_tagged_fields: BTreeMap::new(),
873 }
874 }
875}
876
877impl Message for FetchRequest {
878 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
879 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
880}
881
882#[non_exhaustive]
884#[derive(Debug, Clone, PartialEq)]
885pub struct FetchTopic {
886 pub topic: super::TopicName,
890
891 pub topic_id: Uuid,
895
896 pub partitions: Vec<FetchPartition>,
900
901 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
903}
904
905impl FetchTopic {
906 pub fn with_topic(mut self, value: super::TopicName) -> Self {
912 self.topic = value;
913 self
914 }
915 pub fn with_topic_id(mut self, value: Uuid) -> Self {
921 self.topic_id = value;
922 self
923 }
924 pub fn with_partitions(mut self, value: Vec<FetchPartition>) -> Self {
930 self.partitions = value;
931 self
932 }
933 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
935 self.unknown_tagged_fields = value;
936 self
937 }
938 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
940 self.unknown_tagged_fields.insert(key, value);
941 self
942 }
943}
944
945#[cfg(feature = "client")]
946impl Encodable for FetchTopic {
947 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
948 if version < 4 || version > 18 {
949 bail!("specified version not supported by this message type");
950 }
951 if version <= 12 {
952 if version >= 12 {
953 types::CompactString.encode(buf, &self.topic)?;
954 } else {
955 types::String.encode(buf, &self.topic)?;
956 }
957 }
958 if version >= 13 {
959 types::Uuid.encode(buf, &self.topic_id)?;
960 }
961 if version >= 12 {
962 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
963 } else {
964 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
965 }
966 if version >= 12 {
967 let num_tagged_fields = self.unknown_tagged_fields.len();
968 if num_tagged_fields > std::u32::MAX as usize {
969 bail!(
970 "Too many tagged fields to encode ({} fields)",
971 num_tagged_fields
972 );
973 }
974 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
975
976 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
977 }
978 Ok(())
979 }
980 fn compute_size(&self, version: i16) -> Result<usize> {
981 let mut total_size = 0;
982 if version <= 12 {
983 if version >= 12 {
984 total_size += types::CompactString.compute_size(&self.topic)?;
985 } else {
986 total_size += types::String.compute_size(&self.topic)?;
987 }
988 }
989 if version >= 13 {
990 total_size += types::Uuid.compute_size(&self.topic_id)?;
991 }
992 if version >= 12 {
993 total_size +=
994 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
995 } else {
996 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
997 }
998 if version >= 12 {
999 let num_tagged_fields = self.unknown_tagged_fields.len();
1000 if num_tagged_fields > std::u32::MAX as usize {
1001 bail!(
1002 "Too many tagged fields to encode ({} fields)",
1003 num_tagged_fields
1004 );
1005 }
1006 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1007
1008 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1009 }
1010 Ok(total_size)
1011 }
1012}
1013
1014#[cfg(feature = "broker")]
1015impl Decodable for FetchTopic {
1016 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1017 if version < 4 || version > 18 {
1018 bail!("specified version not supported by this message type");
1019 }
1020 let topic = if version <= 12 {
1021 if version >= 12 {
1022 types::CompactString.decode(buf)?
1023 } else {
1024 types::String.decode(buf)?
1025 }
1026 } else {
1027 Default::default()
1028 };
1029 let topic_id = if version >= 13 {
1030 types::Uuid.decode(buf)?
1031 } else {
1032 Uuid::nil()
1033 };
1034 let partitions = if version >= 12 {
1035 types::CompactArray(types::Struct { version }).decode(buf)?
1036 } else {
1037 types::Array(types::Struct { version }).decode(buf)?
1038 };
1039 let mut unknown_tagged_fields = BTreeMap::new();
1040 if version >= 12 {
1041 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1042 for _ in 0..num_tagged_fields {
1043 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1044 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1045 let unknown_value = buf.try_get_bytes(size as usize)?;
1046 unknown_tagged_fields.insert(tag as i32, unknown_value);
1047 }
1048 }
1049 Ok(Self {
1050 topic,
1051 topic_id,
1052 partitions,
1053 unknown_tagged_fields,
1054 })
1055 }
1056}
1057
1058impl Default for FetchTopic {
1059 fn default() -> Self {
1060 Self {
1061 topic: Default::default(),
1062 topic_id: Uuid::nil(),
1063 partitions: Default::default(),
1064 unknown_tagged_fields: BTreeMap::new(),
1065 }
1066 }
1067}
1068
1069impl Message for FetchTopic {
1070 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
1071 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1072}
1073
1074#[non_exhaustive]
1076#[derive(Debug, Clone, PartialEq)]
1077pub struct ForgottenTopic {
1078 pub topic: super::TopicName,
1082
1083 pub topic_id: Uuid,
1087
1088 pub partitions: Vec<i32>,
1092
1093 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1095}
1096
1097impl ForgottenTopic {
1098 pub fn with_topic(mut self, value: super::TopicName) -> Self {
1104 self.topic = value;
1105 self
1106 }
1107 pub fn with_topic_id(mut self, value: Uuid) -> Self {
1113 self.topic_id = value;
1114 self
1115 }
1116 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
1122 self.partitions = value;
1123 self
1124 }
1125 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1127 self.unknown_tagged_fields = value;
1128 self
1129 }
1130 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1132 self.unknown_tagged_fields.insert(key, value);
1133 self
1134 }
1135}
1136
1137#[cfg(feature = "client")]
1138impl Encodable for ForgottenTopic {
1139 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1140 if version < 4 || version > 18 {
1141 bail!("specified version not supported by this message type");
1142 }
1143 if version >= 7 && version <= 12 {
1144 if version >= 12 {
1145 types::CompactString.encode(buf, &self.topic)?;
1146 } else {
1147 types::String.encode(buf, &self.topic)?;
1148 }
1149 }
1150 if version >= 13 {
1151 types::Uuid.encode(buf, &self.topic_id)?;
1152 }
1153 if version >= 7 {
1154 if version >= 12 {
1155 types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
1156 } else {
1157 types::Array(types::Int32).encode(buf, &self.partitions)?;
1158 }
1159 } else {
1160 if !self.partitions.is_empty() {
1161 bail!("A field is set that is not available on the selected protocol version");
1162 }
1163 }
1164 if version >= 12 {
1165 let num_tagged_fields = self.unknown_tagged_fields.len();
1166 if num_tagged_fields > std::u32::MAX as usize {
1167 bail!(
1168 "Too many tagged fields to encode ({} fields)",
1169 num_tagged_fields
1170 );
1171 }
1172 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1173
1174 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1175 }
1176 Ok(())
1177 }
1178 fn compute_size(&self, version: i16) -> Result<usize> {
1179 let mut total_size = 0;
1180 if version >= 7 && version <= 12 {
1181 if version >= 12 {
1182 total_size += types::CompactString.compute_size(&self.topic)?;
1183 } else {
1184 total_size += types::String.compute_size(&self.topic)?;
1185 }
1186 }
1187 if version >= 13 {
1188 total_size += types::Uuid.compute_size(&self.topic_id)?;
1189 }
1190 if version >= 7 {
1191 if version >= 12 {
1192 total_size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
1193 } else {
1194 total_size += types::Array(types::Int32).compute_size(&self.partitions)?;
1195 }
1196 } else {
1197 if !self.partitions.is_empty() {
1198 bail!("A field is set that is not available on the selected protocol version");
1199 }
1200 }
1201 if version >= 12 {
1202 let num_tagged_fields = self.unknown_tagged_fields.len();
1203 if num_tagged_fields > std::u32::MAX as usize {
1204 bail!(
1205 "Too many tagged fields to encode ({} fields)",
1206 num_tagged_fields
1207 );
1208 }
1209 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1210
1211 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1212 }
1213 Ok(total_size)
1214 }
1215}
1216
1217#[cfg(feature = "broker")]
1218impl Decodable for ForgottenTopic {
1219 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1220 if version < 4 || version > 18 {
1221 bail!("specified version not supported by this message type");
1222 }
1223 let topic = if version >= 7 && version <= 12 {
1224 if version >= 12 {
1225 types::CompactString.decode(buf)?
1226 } else {
1227 types::String.decode(buf)?
1228 }
1229 } else {
1230 Default::default()
1231 };
1232 let topic_id = if version >= 13 {
1233 types::Uuid.decode(buf)?
1234 } else {
1235 Uuid::nil()
1236 };
1237 let partitions = if version >= 7 {
1238 if version >= 12 {
1239 types::CompactArray(types::Int32).decode(buf)?
1240 } else {
1241 types::Array(types::Int32).decode(buf)?
1242 }
1243 } else {
1244 Default::default()
1245 };
1246 let mut unknown_tagged_fields = BTreeMap::new();
1247 if version >= 12 {
1248 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1249 for _ in 0..num_tagged_fields {
1250 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1251 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1252 let unknown_value = buf.try_get_bytes(size as usize)?;
1253 unknown_tagged_fields.insert(tag as i32, unknown_value);
1254 }
1255 }
1256 Ok(Self {
1257 topic,
1258 topic_id,
1259 partitions,
1260 unknown_tagged_fields,
1261 })
1262 }
1263}
1264
1265impl Default for ForgottenTopic {
1266 fn default() -> Self {
1267 Self {
1268 topic: Default::default(),
1269 topic_id: Uuid::nil(),
1270 partitions: Default::default(),
1271 unknown_tagged_fields: BTreeMap::new(),
1272 }
1273 }
1274}
1275
1276impl Message for ForgottenTopic {
1277 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
1278 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1279}
1280
1281#[non_exhaustive]
1283#[derive(Debug, Clone, PartialEq)]
1284pub struct ReplicaState {
1285 pub replica_id: super::BrokerId,
1289
1290 pub replica_epoch: i64,
1294
1295 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1297}
1298
1299impl ReplicaState {
1300 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
1306 self.replica_id = value;
1307 self
1308 }
1309 pub fn with_replica_epoch(mut self, value: i64) -> Self {
1315 self.replica_epoch = value;
1316 self
1317 }
1318 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1320 self.unknown_tagged_fields = value;
1321 self
1322 }
1323 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1325 self.unknown_tagged_fields.insert(key, value);
1326 self
1327 }
1328}
1329
1330#[cfg(feature = "client")]
1331impl Encodable for ReplicaState {
1332 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1333 if version < 4 || version > 18 {
1334 bail!("specified version not supported by this message type");
1335 }
1336 if version >= 15 {
1337 types::Int32.encode(buf, &self.replica_id)?;
1338 } else {
1339 if self.replica_id != -1 {
1340 bail!("A field is set that is not available on the selected protocol version");
1341 }
1342 }
1343 if version >= 15 {
1344 types::Int64.encode(buf, &self.replica_epoch)?;
1345 } else {
1346 if self.replica_epoch != -1 {
1347 bail!("A field is set that is not available on the selected protocol version");
1348 }
1349 }
1350 if version >= 12 {
1351 let num_tagged_fields = self.unknown_tagged_fields.len();
1352 if num_tagged_fields > std::u32::MAX as usize {
1353 bail!(
1354 "Too many tagged fields to encode ({} fields)",
1355 num_tagged_fields
1356 );
1357 }
1358 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1359
1360 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1361 }
1362 Ok(())
1363 }
1364 fn compute_size(&self, version: i16) -> Result<usize> {
1365 let mut total_size = 0;
1366 if version >= 15 {
1367 total_size += types::Int32.compute_size(&self.replica_id)?;
1368 } else {
1369 if self.replica_id != -1 {
1370 bail!("A field is set that is not available on the selected protocol version");
1371 }
1372 }
1373 if version >= 15 {
1374 total_size += types::Int64.compute_size(&self.replica_epoch)?;
1375 } else {
1376 if self.replica_epoch != -1 {
1377 bail!("A field is set that is not available on the selected protocol version");
1378 }
1379 }
1380 if version >= 12 {
1381 let num_tagged_fields = self.unknown_tagged_fields.len();
1382 if num_tagged_fields > std::u32::MAX as usize {
1383 bail!(
1384 "Too many tagged fields to encode ({} fields)",
1385 num_tagged_fields
1386 );
1387 }
1388 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1389
1390 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1391 }
1392 Ok(total_size)
1393 }
1394}
1395
1396#[cfg(feature = "broker")]
1397impl Decodable for ReplicaState {
1398 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1399 if version < 4 || version > 18 {
1400 bail!("specified version not supported by this message type");
1401 }
1402 let replica_id = if version >= 15 {
1403 types::Int32.decode(buf)?
1404 } else {
1405 (-1).into()
1406 };
1407 let replica_epoch = if version >= 15 {
1408 types::Int64.decode(buf)?
1409 } else {
1410 -1
1411 };
1412 let mut unknown_tagged_fields = BTreeMap::new();
1413 if version >= 12 {
1414 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1415 for _ in 0..num_tagged_fields {
1416 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1417 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1418 let unknown_value = buf.try_get_bytes(size as usize)?;
1419 unknown_tagged_fields.insert(tag as i32, unknown_value);
1420 }
1421 }
1422 Ok(Self {
1423 replica_id,
1424 replica_epoch,
1425 unknown_tagged_fields,
1426 })
1427 }
1428}
1429
1430impl Default for ReplicaState {
1431 fn default() -> Self {
1432 Self {
1433 replica_id: (-1).into(),
1434 replica_epoch: -1,
1435 unknown_tagged_fields: BTreeMap::new(),
1436 }
1437 }
1438}
1439
1440impl Message for ReplicaState {
1441 const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
1442 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1443}
1444
1445impl HeaderVersion for FetchRequest {
1446 fn header_version(version: i16) -> i16 {
1447 if version >= 12 {
1448 2
1449 } else {
1450 1
1451 }
1452 }
1453}