1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetFetchResponse {
24 pub throttle_time_ms: i32,
28
29 pub topics: Vec<OffsetFetchResponseTopic>,
33
34 pub error_code: i16,
38
39 pub groups: Vec<OffsetFetchResponseGroup>,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl OffsetFetchResponse {
49 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
55 self.throttle_time_ms = value;
56 self
57 }
58 pub fn with_topics(mut self, value: Vec<OffsetFetchResponseTopic>) -> Self {
64 self.topics = value;
65 self
66 }
67 pub fn with_error_code(mut self, value: i16) -> Self {
73 self.error_code = value;
74 self
75 }
76 pub fn with_groups(mut self, value: Vec<OffsetFetchResponseGroup>) -> Self {
82 self.groups = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponse {
    /// Writes this response into `buf` using the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when `topics` is non-empty for a version above 7, when `groups` is
    /// non-empty for a version below 8, when the number of unknown tagged fields
    /// does not fit in a `u32`, or when any field codec reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 6 on, arrays use the compact form and a tagged-field
        // section is appended.
        let compact = version >= 6;

        if version >= 3 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }

        if version > 7 {
            if !self.topics.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }

        if (2..=7).contains(&version) {
            types::Int16.encode(buf, &self.error_code)?;
        }

        if version >= 8 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.groups)?;
        } else if !self.groups.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Adds up the sizes reported by the field codecs for every field that
    /// `version` serializes.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Encodable::encode`].
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 6;
        let mut size = 0;

        if version >= 3 {
            size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }

        if version > 7 {
            if !self.topics.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if compact {
            size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        } else {
            size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }

        if (2..=7).contains(&version) {
            size += types::Int16.compute_size(&self.error_code)?;
        }

        if version >= 8 {
            size += types::CompactArray(types::Struct { version }).compute_size(&self.groups)?;
        } else if !self.groups.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
181
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponse {
    /// Reads a response laid out for protocol `version` from `buf`.
    ///
    /// Fields that `version` does not carry are set to `0` (integers) or the
    /// type's default (collections).
    ///
    /// # Errors
    ///
    /// Propagates any error from the field codecs or from reading tagged-field
    /// payloads out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 6 on, arrays use the compact form and a tagged-field
        // section follows the regular fields.
        let compact = version >= 6;

        let throttle_time_ms = if version < 3 {
            0
        } else {
            types::Int32.decode(buf)?
        };

        let topics = if version > 7 {
            Default::default()
        } else if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let error_code = if (2..=7).contains(&version) {
            types::Int16.decode(buf)?
        } else {
            0
        };

        let groups = if version < 8 {
            Default::default()
        } else {
            types::CompactArray(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            throttle_time_ms,
            topics,
            error_code,
            groups,
            unknown_tagged_fields,
        })
    }
}
228
229impl Default for OffsetFetchResponse {
230 fn default() -> Self {
231 Self {
232 throttle_time_ms: 0,
233 topics: Default::default(),
234 error_code: 0,
235 groups: Default::default(),
236 unknown_tagged_fields: BTreeMap::new(),
237 }
238 }
239}
240
241impl Message for OffsetFetchResponse {
242 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
243 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
244}
245
246#[non_exhaustive]
248#[derive(Debug, Clone, PartialEq)]
249pub struct OffsetFetchResponseGroup {
250 pub group_id: super::GroupId,
254
255 pub topics: Vec<OffsetFetchResponseTopics>,
259
260 pub error_code: i16,
264
265 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
267}
268
269impl OffsetFetchResponseGroup {
270 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
276 self.group_id = value;
277 self
278 }
279 pub fn with_topics(mut self, value: Vec<OffsetFetchResponseTopics>) -> Self {
285 self.topics = value;
286 self
287 }
288 pub fn with_error_code(mut self, value: i16) -> Self {
294 self.error_code = value;
295 self
296 }
297 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
299 self.unknown_tagged_fields = value;
300 self
301 }
302 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
304 self.unknown_tagged_fields.insert(key, value);
305 self
306 }
307}
308
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponseGroup {
    /// Writes this group entry into `buf` using the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is below 8 and `group_id`, `topics` or `error_code`
    /// is not at its default, when the number of unknown tagged fields does not
    /// fit in a `u32`, or when any field codec reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // The three regular fields share one version gate, so they are handled
        // together; the order of writes and checks matches field order.
        if version >= 8 {
            types::CompactString.encode(buf, &self.group_id)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
            types::Int16.encode(buf, &self.error_code)?;
        } else if !self.group_id.is_empty() || !self.topics.is_empty() || self.error_code != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Adds up the sizes reported by the field codecs for every field that
    /// `version` serializes.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Encodable::encode`].
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 8 {
            size += types::CompactString.compute_size(&self.group_id)?;
            size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
            size += types::Int16.compute_size(&self.error_code)?;
        } else if !self.group_id.is_empty() || !self.topics.is_empty() || self.error_code != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
386
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponseGroup {
    /// Reads a group entry laid out for protocol `version` from `buf`.
    ///
    /// For versions below 8 nothing but the tagged-field section (versions 6+)
    /// is read and the regular fields take their default values.
    ///
    /// # Errors
    ///
    /// Propagates any error from the field codecs or from reading tagged-field
    /// payloads out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Tuple elements are evaluated left to right, so the read order is
        // group_id, topics, error_code.
        let (group_id, topics, error_code) = if version >= 8 {
            (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Struct { version }).decode(buf)?,
                types::Int16.decode(buf)?,
            )
        } else {
            (Default::default(), Default::default(), 0)
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            group_id,
            topics,
            error_code,
            unknown_tagged_fields,
        })
    }
}
423
424impl Default for OffsetFetchResponseGroup {
425 fn default() -> Self {
426 Self {
427 group_id: Default::default(),
428 topics: Default::default(),
429 error_code: 0,
430 unknown_tagged_fields: BTreeMap::new(),
431 }
432 }
433}
434
435impl Message for OffsetFetchResponseGroup {
436 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
437 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
438}
439
440#[non_exhaustive]
442#[derive(Debug, Clone, PartialEq)]
443pub struct OffsetFetchResponsePartition {
444 pub partition_index: i32,
448
449 pub committed_offset: i64,
453
454 pub committed_leader_epoch: i32,
458
459 pub metadata: Option<StrBytes>,
463
464 pub error_code: i16,
468
469 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
471}
472
473impl OffsetFetchResponsePartition {
474 pub fn with_partition_index(mut self, value: i32) -> Self {
480 self.partition_index = value;
481 self
482 }
483 pub fn with_committed_offset(mut self, value: i64) -> Self {
489 self.committed_offset = value;
490 self
491 }
492 pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
498 self.committed_leader_epoch = value;
499 self
500 }
501 pub fn with_metadata(mut self, value: Option<StrBytes>) -> Self {
507 self.metadata = value;
508 self
509 }
510 pub fn with_error_code(mut self, value: i16) -> Self {
516 self.error_code = value;
517 self
518 }
519 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
521 self.unknown_tagged_fields = value;
522 self
523 }
524 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
526 self.unknown_tagged_fields.insert(key, value);
527 self
528 }
529}
530
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponsePartition {
    /// Writes this partition entry into `buf` using the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is above 7 and `partition_index`, `committed_offset`
    /// or `error_code` is non-zero or `metadata` is anything other than `Some`
    /// of an empty string; when the number of unknown tagged fields does not
    /// fit in a `u32`; or when any field codec reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 6 on, strings use the compact form and a tagged-field
        // section is appended.
        let compact = version >= 6;

        if version <= 7 {
            types::Int32.encode(buf, &self.partition_index)?;
            types::Int64.encode(buf, &self.committed_offset)?;
            if version >= 5 {
                types::Int32.encode(buf, &self.committed_leader_epoch)?;
            }
            if compact {
                types::CompactString.encode(buf, &self.metadata)?;
            } else {
                types::String.encode(buf, &self.metadata)?;
            }
            types::Int16.encode(buf, &self.error_code)?;
        } else {
            // `None` counts as "set" here: only `Some("")` passes the check.
            let metadata_unset = self.metadata.as_ref().map_or(false, |m| m.is_empty());
            if self.partition_index != 0
                || self.committed_offset != 0
                || !metadata_unset
                || self.error_code != 0
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Adds up the sizes reported by the field codecs for every field that
    /// `version` serializes.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Encodable::encode`].
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 6;
        let mut size = 0;

        if version <= 7 {
            size += types::Int32.compute_size(&self.partition_index)?;
            size += types::Int64.compute_size(&self.committed_offset)?;
            if version >= 5 {
                size += types::Int32.compute_size(&self.committed_leader_epoch)?;
            }
            if compact {
                size += types::CompactString.compute_size(&self.metadata)?;
            } else {
                size += types::String.compute_size(&self.metadata)?;
            }
            size += types::Int16.compute_size(&self.error_code)?;
        } else {
            // `None` counts as "set" here: only `Some("")` passes the check.
            let metadata_unset = self.metadata.as_ref().map_or(false, |m| m.is_empty());
            if self.partition_index != 0
                || self.committed_offset != 0
                || !metadata_unset
                || self.error_code != 0
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
645
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponsePartition {
    /// Reads a partition entry laid out for protocol `version` from `buf`.
    ///
    /// Fields that `version` does not carry are set to `0`, except
    /// `committed_leader_epoch` (`-1`) and `metadata` (`Some` of the default string).
    ///
    /// # Errors
    ///
    /// Propagates any error from the field codecs or from reading tagged-field
    /// payloads out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 6 on, strings use the compact form and a tagged-field
        // section follows the regular fields.
        let compact = version >= 6;

        let (partition_index, committed_offset, committed_leader_epoch, metadata, error_code) =
            if version <= 7 {
                let index = types::Int32.decode(buf)?;
                let offset = types::Int64.decode(buf)?;
                let epoch = if version >= 5 {
                    types::Int32.decode(buf)?
                } else {
                    -1
                };
                let meta = if compact {
                    types::CompactString.decode(buf)?
                } else {
                    types::String.decode(buf)?
                };
                let code = types::Int16.decode(buf)?;
                (index, offset, epoch, meta, code)
            } else {
                (0, 0, -1, Some(Default::default()), 0)
            };

        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            partition_index,
            committed_offset,
            committed_leader_epoch,
            metadata,
            error_code,
            unknown_tagged_fields,
        })
    }
}
698
699impl Default for OffsetFetchResponsePartition {
700 fn default() -> Self {
701 Self {
702 partition_index: 0,
703 committed_offset: 0,
704 committed_leader_epoch: -1,
705 metadata: Some(Default::default()),
706 error_code: 0,
707 unknown_tagged_fields: BTreeMap::new(),
708 }
709 }
710}
711
712impl Message for OffsetFetchResponsePartition {
713 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
714 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
715}
716
717#[non_exhaustive]
719#[derive(Debug, Clone, PartialEq)]
720pub struct OffsetFetchResponsePartitions {
721 pub partition_index: i32,
725
726 pub committed_offset: i64,
730
731 pub committed_leader_epoch: i32,
735
736 pub metadata: Option<StrBytes>,
740
741 pub error_code: i16,
745
746 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
748}
749
750impl OffsetFetchResponsePartitions {
751 pub fn with_partition_index(mut self, value: i32) -> Self {
757 self.partition_index = value;
758 self
759 }
760 pub fn with_committed_offset(mut self, value: i64) -> Self {
766 self.committed_offset = value;
767 self
768 }
769 pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
775 self.committed_leader_epoch = value;
776 self
777 }
778 pub fn with_metadata(mut self, value: Option<StrBytes>) -> Self {
784 self.metadata = value;
785 self
786 }
787 pub fn with_error_code(mut self, value: i16) -> Self {
793 self.error_code = value;
794 self
795 }
796 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
798 self.unknown_tagged_fields = value;
799 self
800 }
801 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
803 self.unknown_tagged_fields.insert(key, value);
804 self
805 }
806}
807
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponsePartitions {
    /// Writes this partition entry into `buf` using the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is below 8 and `partition_index`, `committed_offset`
    /// or `error_code` is non-zero or `metadata` is anything other than `Some`
    /// of an empty string; when the number of unknown tagged fields does not
    /// fit in a `u32`; or when any field codec reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 8 {
            types::Int32.encode(buf, &self.partition_index)?;
            types::Int64.encode(buf, &self.committed_offset)?;
            types::Int32.encode(buf, &self.committed_leader_epoch)?;
            types::CompactString.encode(buf, &self.metadata)?;
            types::Int16.encode(buf, &self.error_code)?;
        } else {
            // `None` counts as "set" here: only `Some("")` passes the check.
            // `committed_leader_epoch` is deliberately not checked.
            let metadata_unset = self.metadata.as_ref().map_or(false, |m| m.is_empty());
            if self.partition_index != 0
                || self.committed_offset != 0
                || !metadata_unset
                || self.error_code != 0
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Adds up the sizes reported by the field codecs for every field that
    /// `version` serializes.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Encodable::encode`].
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 8 {
            size += types::Int32.compute_size(&self.partition_index)?;
            size += types::Int64.compute_size(&self.committed_offset)?;
            size += types::Int32.compute_size(&self.committed_leader_epoch)?;
            size += types::CompactString.compute_size(&self.metadata)?;
            size += types::Int16.compute_size(&self.error_code)?;
        } else {
            // `None` counts as "set" here: only `Some("")` passes the check.
            let metadata_unset = self.metadata.as_ref().map_or(false, |m| m.is_empty());
            if self.partition_index != 0
                || self.committed_offset != 0
                || !metadata_unset
                || self.error_code != 0
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
914
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponsePartitions {
    /// Reads a partition entry laid out for protocol `version` from `buf`.
    ///
    /// For versions below 8 the regular fields are not read: integers become
    /// `0`, `committed_leader_epoch` becomes `-1` and `metadata` becomes `Some`
    /// of the default string.
    ///
    /// # Errors
    ///
    /// Propagates any error from the field codecs or from reading tagged-field
    /// payloads out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Tuple elements are evaluated left to right, which keeps the read
        // order equal to the field order.
        let (partition_index, committed_offset, committed_leader_epoch, metadata, error_code) =
            if version >= 8 {
                (
                    types::Int32.decode(buf)?,
                    types::Int64.decode(buf)?,
                    types::Int32.decode(buf)?,
                    types::CompactString.decode(buf)?,
                    types::Int16.decode(buf)?,
                )
            } else {
                (0, 0, -1, Some(Default::default()), 0)
            };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            partition_index,
            committed_offset,
            committed_leader_epoch,
            metadata,
            error_code,
            unknown_tagged_fields,
        })
    }
}
963
964impl Default for OffsetFetchResponsePartitions {
965 fn default() -> Self {
966 Self {
967 partition_index: 0,
968 committed_offset: 0,
969 committed_leader_epoch: -1,
970 metadata: Some(Default::default()),
971 error_code: 0,
972 unknown_tagged_fields: BTreeMap::new(),
973 }
974 }
975}
976
977impl Message for OffsetFetchResponsePartitions {
978 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
979 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
980}
981
982#[non_exhaustive]
984#[derive(Debug, Clone, PartialEq)]
985pub struct OffsetFetchResponseTopic {
986 pub name: super::TopicName,
990
991 pub partitions: Vec<OffsetFetchResponsePartition>,
995
996 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
998}
999
1000impl OffsetFetchResponseTopic {
1001 pub fn with_name(mut self, value: super::TopicName) -> Self {
1007 self.name = value;
1008 self
1009 }
1010 pub fn with_partitions(mut self, value: Vec<OffsetFetchResponsePartition>) -> Self {
1016 self.partitions = value;
1017 self
1018 }
1019 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1021 self.unknown_tagged_fields = value;
1022 self
1023 }
1024 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1026 self.unknown_tagged_fields.insert(key, value);
1027 self
1028 }
1029}
1030
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponseTopic {
    /// Writes this topic entry into `buf` using the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is above 7 and `name` or `partitions` is non-empty,
    /// when the number of unknown tagged fields does not fit in a `u32`, or
    /// when any field codec reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 6 on, the compact string/array forms are used and a
        // tagged-field section is appended.
        let compact = version >= 6;

        if version > 7 {
            if !self.name.is_empty() || !self.partitions.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if compact {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else {
            types::String.encode(buf, &self.name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Adds up the sizes reported by the field codecs for every field that
    /// `version` serializes.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Encodable::encode`].
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 6;
        let mut size = 0;

        if version > 7 {
            if !self.name.is_empty() || !self.partitions.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if compact {
            size += types::CompactString.compute_size(&self.name)?;
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
        } else {
            size += types::String.compute_size(&self.name)?;
            size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
        }

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
1111
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponseTopic {
    /// Reads a topic entry laid out for protocol `version` from `buf`.
    ///
    /// For versions above 7 the regular fields are not read and take their
    /// default values.
    ///
    /// # Errors
    ///
    /// Propagates any error from the field codecs or from reading tagged-field
    /// payloads out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 6 on, the compact string/array forms are used and a
        // tagged-field section follows the regular fields.
        let compact = version >= 6;

        // Tuple elements are evaluated left to right: name first, then partitions.
        let (name, partitions) = if version > 7 {
            (Default::default(), Default::default())
        } else if compact {
            (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Struct { version }).decode(buf)?,
            )
        } else {
            (
                types::String.decode(buf)?,
                types::Array(types::Struct { version }).decode(buf)?,
            )
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
1150
1151impl Default for OffsetFetchResponseTopic {
1152 fn default() -> Self {
1153 Self {
1154 name: Default::default(),
1155 partitions: Default::default(),
1156 unknown_tagged_fields: BTreeMap::new(),
1157 }
1158 }
1159}
1160
1161impl Message for OffsetFetchResponseTopic {
1162 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
1163 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1164}
1165
1166#[non_exhaustive]
1168#[derive(Debug, Clone, PartialEq)]
1169pub struct OffsetFetchResponseTopics {
1170 pub name: super::TopicName,
1174
1175 pub partitions: Vec<OffsetFetchResponsePartitions>,
1179
1180 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1182}
1183
1184impl OffsetFetchResponseTopics {
1185 pub fn with_name(mut self, value: super::TopicName) -> Self {
1191 self.name = value;
1192 self
1193 }
1194 pub fn with_partitions(mut self, value: Vec<OffsetFetchResponsePartitions>) -> Self {
1200 self.partitions = value;
1201 self
1202 }
1203 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1205 self.unknown_tagged_fields = value;
1206 self
1207 }
1208 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1210 self.unknown_tagged_fields.insert(key, value);
1211 self
1212 }
1213}
1214
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponseTopics {
    /// Writes this topic entry into `buf` using the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is below 8 and `name` or `partitions` is non-empty,
    /// when the number of unknown tagged fields does not fit in a `u32`, or
    /// when any field codec reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 8 {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else if !self.name.is_empty() || !self.partitions.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Adds up the sizes reported by the field codecs for every field that
    /// `version` serializes.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Encodable::encode`].
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 8 {
            size += types::CompactString.compute_size(&self.name)?;
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
        } else if !self.name.is_empty() || !self.partitions.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
1278
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponseTopics {
    /// Reads a topic entry laid out for protocol `version` from `buf`.
    ///
    /// For versions below 8 the regular fields are not read and take their
    /// default values.
    ///
    /// # Errors
    ///
    /// Propagates any error from the field codecs or from reading tagged-field
    /// payloads out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Tuple elements are evaluated left to right: name first, then partitions.
        let (name, partitions) = if version >= 8 {
            (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Struct { version }).decode(buf)?,
            )
        } else {
            (Default::default(), Default::default())
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
1309
1310impl Default for OffsetFetchResponseTopics {
1311 fn default() -> Self {
1312 Self {
1313 name: Default::default(),
1314 partitions: Default::default(),
1315 unknown_tagged_fields: BTreeMap::new(),
1316 }
1317 }
1318}
1319
1320impl Message for OffsetFetchResponseTopics {
1321 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
1322 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1323}
1324
1325impl HeaderVersion for OffsetFetchResponse {
1326 fn header_version(version: i16) -> i16 {
1327 if version >= 6 {
1328 1
1329 } else {
1330 0
1331 }
1332 }
1333}