1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetFetchResponse {
24 pub throttle_time_ms: i32,
28
29 pub topics: Vec<OffsetFetchResponseTopic>,
33
34 pub error_code: i16,
38
39 pub groups: Vec<OffsetFetchResponseGroup>,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl OffsetFetchResponse {
49 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
55 self.throttle_time_ms = value;
56 self
57 }
58 pub fn with_topics(mut self, value: Vec<OffsetFetchResponseTopic>) -> Self {
64 self.topics = value;
65 self
66 }
67 pub fn with_error_code(mut self, value: i16) -> Self {
73 self.error_code = value;
74 self
75 }
76 pub fn with_groups(mut self, value: Vec<OffsetFetchResponseGroup>) -> Self {
82 self.groups = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
/// Wire serialization of [`OffsetFetchResponse`]; compiled only with the
/// `broker` feature.
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponse {
    /// Writes the response into `buf` using the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9`, when `topics` is non-empty for
    /// versions above 7, when `groups` is non-empty for versions below 8, when
    /// there are more tagged fields than fit in a `u32`, or when a nested
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 3 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }

        // `topics` exists up to version 7; the compact array form starts at 6.
        if version >= 8 {
            if !self.topics.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if version >= 6 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }

        if (2..=7).contains(&version) {
            types::Int16.encode(buf, &self.error_code)?;
        }

        // `groups` exists from version 8 onwards.
        if version >= 8 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.groups)?;
        } else if !self.groups.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Tagged-field section: count first, then the entries themselves.
        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// The same "field not available" and "too many tagged fields" errors as
    /// `encode` are reported, but the version range itself is not validated
    /// here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 3 {
            size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }

        if version >= 8 {
            if !self.topics.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if version >= 6 {
            size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        } else {
            size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }

        if (2..=7).contains(&version) {
            size += types::Int16.compute_size(&self.error_code)?;
        }

        if version >= 8 {
            size += types::CompactArray(types::Struct { version }).compute_size(&self.groups)?;
        } else if !self.groups.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
184
/// Wire deserialization of [`OffsetFetchResponse`]; compiled only with the
/// `client` feature.
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponse {
    /// Reads a response laid out according to protocol `version` from `buf`.
    ///
    /// Fields that are not part of `version` take the same values as in
    /// `Default`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9` or when any underlying read
    /// from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let throttle_time_ms = if version < 3 {
            0
        } else {
            types::Int32.decode(buf)?
        };

        let topics = if version >= 8 {
            Vec::new()
        } else if version >= 6 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let error_code = if (2..=7).contains(&version) {
            types::Int16.decode(buf)?
        } else {
            0
        };

        let groups = if version < 8 {
            Vec::new()
        } else {
            types::CompactArray(types::Struct { version }).decode(buf)?
        };

        // Every tagged field is kept as raw bytes under its tag number.
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        let decoded = Self {
            throttle_time_ms,
            topics,
            error_code,
            groups,
            unknown_tagged_fields,
        };
        Ok(decoded)
    }
}
234
235impl Default for OffsetFetchResponse {
236 fn default() -> Self {
237 Self {
238 throttle_time_ms: 0,
239 topics: Default::default(),
240 error_code: 0,
241 groups: Default::default(),
242 unknown_tagged_fields: BTreeMap::new(),
243 }
244 }
245}
246
247impl Message for OffsetFetchResponse {
248 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
249 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
250}
251
252#[non_exhaustive]
254#[derive(Debug, Clone, PartialEq)]
255pub struct OffsetFetchResponseGroup {
256 pub group_id: super::GroupId,
260
261 pub topics: Vec<OffsetFetchResponseTopics>,
265
266 pub error_code: i16,
270
271 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
273}
274
275impl OffsetFetchResponseGroup {
276 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
282 self.group_id = value;
283 self
284 }
285 pub fn with_topics(mut self, value: Vec<OffsetFetchResponseTopics>) -> Self {
291 self.topics = value;
292 self
293 }
294 pub fn with_error_code(mut self, value: i16) -> Self {
300 self.error_code = value;
301 self
302 }
303 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
305 self.unknown_tagged_fields = value;
306 self
307 }
308 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
310 self.unknown_tagged_fields.insert(key, value);
311 self
312 }
313}
314
/// Wire serialization of [`OffsetFetchResponseGroup`]; compiled only with the
/// `broker` feature.
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponseGroup {
    /// Writes the group entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9`, when any regular field holds a
    /// non-default value for a version below 8, when there are more tagged
    /// fields than fit in a `u32`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // All three regular fields share the same version gate.
        if version >= 8 {
            types::CompactString.encode(buf, &self.group_id)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
            types::Int16.encode(buf, &self.error_code)?;
        } else if !self.group_id.is_empty() || !self.topics.is_empty() || self.error_code != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Reports the same field-availability and tagged-field-count errors as
    /// `encode`; the version range itself is not validated here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 8 {
            size += types::CompactString.compute_size(&self.group_id)?;
            size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
            size += types::Int16.compute_size(&self.error_code)?;
        } else if !self.group_id.is_empty() || !self.topics.is_empty() || self.error_code != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
395
/// Wire deserialization of [`OffsetFetchResponseGroup`]; compiled only with
/// the `client` feature.
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponseGroup {
    /// Reads a group entry laid out according to `version` from `buf`.
    ///
    /// For versions below 8 nothing but the tagged-field section (versions 6
    /// and 7) is read and the regular fields keep their default values.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9` or when a read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Tuple elements are evaluated left to right, which is the wire order.
        let (group_id, topics, error_code) = if version >= 8 {
            (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Struct { version }).decode(buf)?,
                types::Int16.decode(buf)?,
            )
        } else {
            (Default::default(), Vec::new(), 0)
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        let decoded = Self {
            group_id,
            topics,
            error_code,
            unknown_tagged_fields,
        };
        Ok(decoded)
    }
}
435
436impl Default for OffsetFetchResponseGroup {
437 fn default() -> Self {
438 Self {
439 group_id: Default::default(),
440 topics: Default::default(),
441 error_code: 0,
442 unknown_tagged_fields: BTreeMap::new(),
443 }
444 }
445}
446
447impl Message for OffsetFetchResponseGroup {
448 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
449 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
450}
451
452#[non_exhaustive]
454#[derive(Debug, Clone, PartialEq)]
455pub struct OffsetFetchResponsePartition {
456 pub partition_index: i32,
460
461 pub committed_offset: i64,
465
466 pub committed_leader_epoch: i32,
470
471 pub metadata: Option<StrBytes>,
475
476 pub error_code: i16,
480
481 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
483}
484
485impl OffsetFetchResponsePartition {
486 pub fn with_partition_index(mut self, value: i32) -> Self {
492 self.partition_index = value;
493 self
494 }
495 pub fn with_committed_offset(mut self, value: i64) -> Self {
501 self.committed_offset = value;
502 self
503 }
504 pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
510 self.committed_leader_epoch = value;
511 self
512 }
513 pub fn with_metadata(mut self, value: Option<StrBytes>) -> Self {
519 self.metadata = value;
520 self
521 }
522 pub fn with_error_code(mut self, value: i16) -> Self {
528 self.error_code = value;
529 self
530 }
531 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
533 self.unknown_tagged_fields = value;
534 self
535 }
536 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
538 self.unknown_tagged_fields.insert(key, value);
539 self
540 }
541}
542
/// Wire serialization of [`OffsetFetchResponsePartition`]; compiled only with
/// the `broker` feature.
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponsePartition {
    /// Writes the partition entry into `buf` using the layout of `version`.
    ///
    /// `committed_leader_epoch` is silently skipped for versions that do not
    /// carry it; every other regular field must hold its default value when
    /// `version` is above 7.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9`, when a non-default field is
    /// set for a version above 7, when there are more tagged fields than fit
    /// in a `u32`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version <= 7 {
            types::Int32.encode(buf, &self.partition_index)?;
            types::Int64.encode(buf, &self.committed_offset)?;
            if version >= 5 {
                types::Int32.encode(buf, &self.committed_leader_epoch)?;
            }
            // The compact string form starts at version 6.
            if version >= 6 {
                types::CompactString.encode(buf, &self.metadata)?;
            } else {
                types::String.encode(buf, &self.metadata)?;
            }
            types::Int16.encode(buf, &self.error_code)?;
        } else {
            // Only `Some` of an empty string counts as the default metadata.
            let metadata_is_default = self.metadata.as_ref().map_or(false, |m| m.is_empty());
            if self.partition_index != 0
                || self.committed_offset != 0
                || !metadata_is_default
                || self.error_code != 0
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Reports the same field-availability and tagged-field-count errors as
    /// `encode`; the version range itself is not validated here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version <= 7 {
            size += types::Int32.compute_size(&self.partition_index)?;
            size += types::Int64.compute_size(&self.committed_offset)?;
            if version >= 5 {
                size += types::Int32.compute_size(&self.committed_leader_epoch)?;
            }
            if version >= 6 {
                size += types::CompactString.compute_size(&self.metadata)?;
            } else {
                size += types::String.compute_size(&self.metadata)?;
            }
            size += types::Int16.compute_size(&self.error_code)?;
        } else {
            let metadata_is_default = self.metadata.as_ref().map_or(false, |m| m.is_empty());
            if self.partition_index != 0
                || self.committed_offset != 0
                || !metadata_is_default
                || self.error_code != 0
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
660
/// Wire deserialization of [`OffsetFetchResponsePartition`]; compiled only
/// with the `client` feature.
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponsePartition {
    /// Reads a partition entry laid out according to `version` from `buf`.
    ///
    /// Fields that are not part of `version` take the same values as in
    /// `Default` (notably `-1` for `committed_leader_epoch`).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9` or when a read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let partition_index = if version > 7 {
            0
        } else {
            types::Int32.decode(buf)?
        };

        let committed_offset = if version > 7 {
            0
        } else {
            types::Int64.decode(buf)?
        };

        let committed_leader_epoch = if (5..=7).contains(&version) {
            types::Int32.decode(buf)?
        } else {
            -1
        };

        let metadata = if version > 7 {
            Some(Default::default())
        } else if version >= 6 {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        let error_code = if version > 7 {
            0
        } else {
            types::Int16.decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        let decoded = Self {
            partition_index,
            committed_offset,
            committed_leader_epoch,
            metadata,
            error_code,
            unknown_tagged_fields,
        };
        Ok(decoded)
    }
}
716
717impl Default for OffsetFetchResponsePartition {
718 fn default() -> Self {
719 Self {
720 partition_index: 0,
721 committed_offset: 0,
722 committed_leader_epoch: -1,
723 metadata: Some(Default::default()),
724 error_code: 0,
725 unknown_tagged_fields: BTreeMap::new(),
726 }
727 }
728}
729
730impl Message for OffsetFetchResponsePartition {
731 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
732 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
733}
734
735#[non_exhaustive]
737#[derive(Debug, Clone, PartialEq)]
738pub struct OffsetFetchResponsePartitions {
739 pub partition_index: i32,
743
744 pub committed_offset: i64,
748
749 pub committed_leader_epoch: i32,
753
754 pub metadata: Option<StrBytes>,
758
759 pub error_code: i16,
763
764 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
766}
767
768impl OffsetFetchResponsePartitions {
769 pub fn with_partition_index(mut self, value: i32) -> Self {
775 self.partition_index = value;
776 self
777 }
778 pub fn with_committed_offset(mut self, value: i64) -> Self {
784 self.committed_offset = value;
785 self
786 }
787 pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
793 self.committed_leader_epoch = value;
794 self
795 }
796 pub fn with_metadata(mut self, value: Option<StrBytes>) -> Self {
802 self.metadata = value;
803 self
804 }
805 pub fn with_error_code(mut self, value: i16) -> Self {
811 self.error_code = value;
812 self
813 }
814 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
816 self.unknown_tagged_fields = value;
817 self
818 }
819 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
821 self.unknown_tagged_fields.insert(key, value);
822 self
823 }
824}
825
/// Wire serialization of [`OffsetFetchResponsePartitions`]; compiled only with
/// the `broker` feature.
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponsePartitions {
    /// Writes the partition entry into `buf` using the layout of `version`.
    ///
    /// `committed_leader_epoch` is silently skipped below version 8; every
    /// other regular field must hold its default value there.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9`, when a non-default field is
    /// set for a version below 8, when there are more tagged fields than fit
    /// in a `u32`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 8 {
            types::Int32.encode(buf, &self.partition_index)?;
            types::Int64.encode(buf, &self.committed_offset)?;
            types::Int32.encode(buf, &self.committed_leader_epoch)?;
            types::CompactString.encode(buf, &self.metadata)?;
            types::Int16.encode(buf, &self.error_code)?;
        } else {
            // Only `Some` of an empty string counts as the default metadata.
            let metadata_is_default = self.metadata.as_ref().map_or(false, |m| m.is_empty());
            if self.partition_index != 0
                || self.committed_offset != 0
                || !metadata_is_default
                || self.error_code != 0
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Reports the same field-availability and tagged-field-count errors as
    /// `encode`; the version range itself is not validated here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 8 {
            size += types::Int32.compute_size(&self.partition_index)?;
            size += types::Int64.compute_size(&self.committed_offset)?;
            size += types::Int32.compute_size(&self.committed_leader_epoch)?;
            size += types::CompactString.compute_size(&self.metadata)?;
            size += types::Int16.compute_size(&self.error_code)?;
        } else {
            let metadata_is_default = self.metadata.as_ref().map_or(false, |m| m.is_empty());
            if self.partition_index != 0
                || self.committed_offset != 0
                || !metadata_is_default
                || self.error_code != 0
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
935
/// Wire deserialization of [`OffsetFetchResponsePartitions`]; compiled only
/// with the `client` feature.
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponsePartitions {
    /// Reads a partition entry laid out according to `version` from `buf`.
    ///
    /// For versions below 8 nothing but the tagged-field section (versions 6
    /// and 7) is read and the regular fields take the same values as in
    /// `Default`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9` or when a read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Tuple elements are evaluated left to right, which is the wire order.
        let (partition_index, committed_offset, committed_leader_epoch, metadata, error_code) =
            if version >= 8 {
                (
                    types::Int32.decode(buf)?,
                    types::Int64.decode(buf)?,
                    types::Int32.decode(buf)?,
                    types::CompactString.decode(buf)?,
                    types::Int16.decode(buf)?,
                )
            } else {
                (0, 0, -1, Some(Default::default()), 0)
            };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        let decoded = Self {
            partition_index,
            committed_offset,
            committed_leader_epoch,
            metadata,
            error_code,
            unknown_tagged_fields,
        };
        Ok(decoded)
    }
}
987
988impl Default for OffsetFetchResponsePartitions {
989 fn default() -> Self {
990 Self {
991 partition_index: 0,
992 committed_offset: 0,
993 committed_leader_epoch: -1,
994 metadata: Some(Default::default()),
995 error_code: 0,
996 unknown_tagged_fields: BTreeMap::new(),
997 }
998 }
999}
1000
1001impl Message for OffsetFetchResponsePartitions {
1002 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
1003 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1004}
1005
1006#[non_exhaustive]
1008#[derive(Debug, Clone, PartialEq)]
1009pub struct OffsetFetchResponseTopic {
1010 pub name: super::TopicName,
1014
1015 pub partitions: Vec<OffsetFetchResponsePartition>,
1019
1020 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1022}
1023
1024impl OffsetFetchResponseTopic {
1025 pub fn with_name(mut self, value: super::TopicName) -> Self {
1031 self.name = value;
1032 self
1033 }
1034 pub fn with_partitions(mut self, value: Vec<OffsetFetchResponsePartition>) -> Self {
1040 self.partitions = value;
1041 self
1042 }
1043 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1045 self.unknown_tagged_fields = value;
1046 self
1047 }
1048 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1050 self.unknown_tagged_fields.insert(key, value);
1051 self
1052 }
1053}
1054
/// Wire serialization of [`OffsetFetchResponseTopic`]; compiled only with the
/// `broker` feature.
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponseTopic {
    /// Writes the topic entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9`, when `name` or `partitions` is
    /// non-empty for a version above 7, when there are more tagged fields than
    /// fit in a `u32`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version <= 7 {
            // Versions 6 and 7 use the compact string/array forms.
            if version >= 6 {
                types::CompactString.encode(buf, &self.name)?;
                types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
            } else {
                types::String.encode(buf, &self.name)?;
                types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
            }
        } else if !self.name.is_empty() || !self.partitions.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Reports the same field-availability and tagged-field-count errors as
    /// `encode`; the version range itself is not validated here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version <= 7 {
            if version >= 6 {
                size += types::CompactString.compute_size(&self.name)?;
                size += types::CompactArray(types::Struct { version })
                    .compute_size(&self.partitions)?;
            } else {
                size += types::String.compute_size(&self.name)?;
                size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            }
        } else if !self.name.is_empty() || !self.partitions.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
1138
/// Wire deserialization of [`OffsetFetchResponseTopic`]; compiled only with
/// the `client` feature.
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponseTopic {
    /// Reads a topic entry laid out according to `version` from `buf`.
    ///
    /// For versions above 7 only the tagged-field section is read and `name` /
    /// `partitions` keep their default values.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9` or when a read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Tuple elements are evaluated left to right, which is the wire order.
        let (name, partitions) = if version >= 8 {
            (Default::default(), Vec::new())
        } else if version >= 6 {
            (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Struct { version }).decode(buf)?,
            )
        } else {
            (
                types::String.decode(buf)?,
                types::Array(types::Struct { version }).decode(buf)?,
            )
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        let decoded = Self {
            name,
            partitions,
            unknown_tagged_fields,
        };
        Ok(decoded)
    }
}
1180
1181impl Default for OffsetFetchResponseTopic {
1182 fn default() -> Self {
1183 Self {
1184 name: Default::default(),
1185 partitions: Default::default(),
1186 unknown_tagged_fields: BTreeMap::new(),
1187 }
1188 }
1189}
1190
1191impl Message for OffsetFetchResponseTopic {
1192 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
1193 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1194}
1195
1196#[non_exhaustive]
1198#[derive(Debug, Clone, PartialEq)]
1199pub struct OffsetFetchResponseTopics {
1200 pub name: super::TopicName,
1204
1205 pub partitions: Vec<OffsetFetchResponsePartitions>,
1209
1210 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1212}
1213
1214impl OffsetFetchResponseTopics {
1215 pub fn with_name(mut self, value: super::TopicName) -> Self {
1221 self.name = value;
1222 self
1223 }
1224 pub fn with_partitions(mut self, value: Vec<OffsetFetchResponsePartitions>) -> Self {
1230 self.partitions = value;
1231 self
1232 }
1233 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1235 self.unknown_tagged_fields = value;
1236 self
1237 }
1238 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1240 self.unknown_tagged_fields.insert(key, value);
1241 self
1242 }
1243}
1244
/// Wire serialization of [`OffsetFetchResponseTopics`]; compiled only with the
/// `broker` feature.
#[cfg(feature = "broker")]
impl Encodable for OffsetFetchResponseTopics {
    /// Writes the topic entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9`, when `name` or `partitions` is
    /// non-empty for a version below 8, when there are more tagged fields than
    /// fit in a `u32`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 8 {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else if !self.name.is_empty() || !self.partitions.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Reports the same field-availability and tagged-field-count errors as
    /// `encode`; the version range itself is not validated here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 8 {
            size += types::CompactString.compute_size(&self.name)?;
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
        } else if !self.name.is_empty() || !self.partitions.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
1311
/// Wire deserialization of [`OffsetFetchResponseTopics`]; compiled only with
/// the `client` feature.
#[cfg(feature = "client")]
impl Decodable for OffsetFetchResponseTopics {
    /// Reads a topic entry laid out according to `version` from `buf`.
    ///
    /// For versions below 8 nothing but the tagged-field section (versions 6
    /// and 7) is read and `name` / `partitions` keep their default values.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=9` or when a read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Tuple elements are evaluated left to right, which is the wire order.
        let (name, partitions) = if version >= 8 {
            (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Struct { version }).decode(buf)?,
            )
        } else {
            (Default::default(), Vec::new())
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        let decoded = Self {
            name,
            partitions,
            unknown_tagged_fields,
        };
        Ok(decoded)
    }
}
1345
1346impl Default for OffsetFetchResponseTopics {
1347 fn default() -> Self {
1348 Self {
1349 name: Default::default(),
1350 partitions: Default::default(),
1351 unknown_tagged_fields: BTreeMap::new(),
1352 }
1353 }
1354}
1355
1356impl Message for OffsetFetchResponseTopics {
1357 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
1358 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1359}
1360
1361impl HeaderVersion for OffsetFetchResponse {
1362 fn header_version(version: i16) -> i16 {
1363 if version >= 6 {
1364 1
1365 } else {
1366 0
1367 }
1368 }
1369}