kafka_protocol/messages/
offset_fetch_request.rs

1//! OffsetFetchRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetFetchRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-9
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetFetchRequest {
24    /// The group to fetch offsets for.
25    ///
26    /// Supported API versions: 0-7
27    pub group_id: super::GroupId,
28
29    /// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
30    ///
31    /// Supported API versions: 0-7
32    pub topics: Option<Vec<OffsetFetchRequestTopic>>,
33
34    /// Each group we would like to fetch offsets for
35    ///
36    /// Supported API versions: 8-9
37    pub groups: Vec<OffsetFetchRequestGroup>,
38
39    /// Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions.
40    ///
41    /// Supported API versions: 7-9
42    pub require_stable: bool,
43
44    /// Other tagged fields
45    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl OffsetFetchRequest {
49    /// Sets `group_id` to the passed value.
50    ///
51    /// The group to fetch offsets for.
52    ///
53    /// Supported API versions: 0-7
54    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
55        self.group_id = value;
56        self
57    }
58    /// Sets `topics` to the passed value.
59    ///
60    /// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
61    ///
62    /// Supported API versions: 0-7
63    pub fn with_topics(mut self, value: Option<Vec<OffsetFetchRequestTopic>>) -> Self {
64        self.topics = value;
65        self
66    }
67    /// Sets `groups` to the passed value.
68    ///
69    /// Each group we would like to fetch offsets for
70    ///
71    /// Supported API versions: 8-9
72    pub fn with_groups(mut self, value: Vec<OffsetFetchRequestGroup>) -> Self {
73        self.groups = value;
74        self
75    }
76    /// Sets `require_stable` to the passed value.
77    ///
78    /// Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions.
79    ///
80    /// Supported API versions: 7-9
81    pub fn with_require_stable(mut self, value: bool) -> Self {
82        self.require_stable = value;
83        self
84    }
85    /// Sets unknown_tagged_fields to the passed value.
86    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87        self.unknown_tagged_fields = value;
88        self
89    }
90    /// Inserts an entry into unknown_tagged_fields.
91    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92        self.unknown_tagged_fields.insert(key, value);
93        self
94    }
95}
96
97#[cfg(feature = "client")]
98impl Encodable for OffsetFetchRequest {
99    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
100        if version < 0 || version > 9 {
101            bail!("specified version not supported by this message type");
102        }
103        if version <= 7 {
104            if version >= 6 {
105                types::CompactString.encode(buf, &self.group_id)?;
106            } else {
107                types::String.encode(buf, &self.group_id)?;
108            }
109        } else {
110            if !self.group_id.is_empty() {
111                bail!("A field is set that is not available on the selected protocol version");
112            }
113        }
114        if version <= 7 {
115            if version >= 6 {
116                types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
117            } else {
118                types::Array(types::Struct { version }).encode(buf, &self.topics)?;
119            }
120        } else {
121            if !self
122                .topics
123                .as_ref()
124                .map(|x| x.is_empty())
125                .unwrap_or_default()
126            {
127                bail!("A field is set that is not available on the selected protocol version");
128            }
129        }
130        if version >= 8 {
131            types::CompactArray(types::Struct { version }).encode(buf, &self.groups)?;
132        } else {
133            if !self.groups.is_empty() {
134                bail!("A field is set that is not available on the selected protocol version");
135            }
136        }
137        if version >= 7 {
138            types::Boolean.encode(buf, &self.require_stable)?;
139        } else {
140            if self.require_stable {
141                bail!("A field is set that is not available on the selected protocol version");
142            }
143        }
144        if version >= 6 {
145            let num_tagged_fields = self.unknown_tagged_fields.len();
146            if num_tagged_fields > std::u32::MAX as usize {
147                bail!(
148                    "Too many tagged fields to encode ({} fields)",
149                    num_tagged_fields
150                );
151            }
152            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
153
154            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
155        }
156        Ok(())
157    }
158    fn compute_size(&self, version: i16) -> Result<usize> {
159        let mut total_size = 0;
160        if version <= 7 {
161            if version >= 6 {
162                total_size += types::CompactString.compute_size(&self.group_id)?;
163            } else {
164                total_size += types::String.compute_size(&self.group_id)?;
165            }
166        } else {
167            if !self.group_id.is_empty() {
168                bail!("A field is set that is not available on the selected protocol version");
169            }
170        }
171        if version <= 7 {
172            if version >= 6 {
173                total_size +=
174                    types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
175            } else {
176                total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
177            }
178        } else {
179            if !self
180                .topics
181                .as_ref()
182                .map(|x| x.is_empty())
183                .unwrap_or_default()
184            {
185                bail!("A field is set that is not available on the selected protocol version");
186            }
187        }
188        if version >= 8 {
189            total_size +=
190                types::CompactArray(types::Struct { version }).compute_size(&self.groups)?;
191        } else {
192            if !self.groups.is_empty() {
193                bail!("A field is set that is not available on the selected protocol version");
194            }
195        }
196        if version >= 7 {
197            total_size += types::Boolean.compute_size(&self.require_stable)?;
198        } else {
199            if self.require_stable {
200                bail!("A field is set that is not available on the selected protocol version");
201            }
202        }
203        if version >= 6 {
204            let num_tagged_fields = self.unknown_tagged_fields.len();
205            if num_tagged_fields > std::u32::MAX as usize {
206                bail!(
207                    "Too many tagged fields to encode ({} fields)",
208                    num_tagged_fields
209                );
210            }
211            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
212
213            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
214        }
215        Ok(total_size)
216    }
217}
218
219#[cfg(feature = "broker")]
220impl Decodable for OffsetFetchRequest {
221    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
222        if version < 0 || version > 9 {
223            bail!("specified version not supported by this message type");
224        }
225        let group_id = if version <= 7 {
226            if version >= 6 {
227                types::CompactString.decode(buf)?
228            } else {
229                types::String.decode(buf)?
230            }
231        } else {
232            Default::default()
233        };
234        let topics = if version <= 7 {
235            if version >= 6 {
236                types::CompactArray(types::Struct { version }).decode(buf)?
237            } else {
238                types::Array(types::Struct { version }).decode(buf)?
239            }
240        } else {
241            Some(Default::default())
242        };
243        let groups = if version >= 8 {
244            types::CompactArray(types::Struct { version }).decode(buf)?
245        } else {
246            Default::default()
247        };
248        let require_stable = if version >= 7 {
249            types::Boolean.decode(buf)?
250        } else {
251            false
252        };
253        let mut unknown_tagged_fields = BTreeMap::new();
254        if version >= 6 {
255            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
256            for _ in 0..num_tagged_fields {
257                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
258                let size: u32 = types::UnsignedVarInt.decode(buf)?;
259                let unknown_value = buf.try_get_bytes(size as usize)?;
260                unknown_tagged_fields.insert(tag as i32, unknown_value);
261            }
262        }
263        Ok(Self {
264            group_id,
265            topics,
266            groups,
267            require_stable,
268            unknown_tagged_fields,
269        })
270    }
271}
272
273impl Default for OffsetFetchRequest {
274    fn default() -> Self {
275        Self {
276            group_id: Default::default(),
277            topics: Some(Default::default()),
278            groups: Default::default(),
279            require_stable: false,
280            unknown_tagged_fields: BTreeMap::new(),
281        }
282    }
283}
284
285impl Message for OffsetFetchRequest {
286    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
287    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
288}
289
290/// Valid versions: 0-9
291#[non_exhaustive]
292#[derive(Debug, Clone, PartialEq)]
293pub struct OffsetFetchRequestGroup {
294    /// The group ID.
295    ///
296    /// Supported API versions: 8-9
297    pub group_id: super::GroupId,
298
299    /// The member ID assigned by the group coordinator if using the new consumer protocol (KIP-848).
300    ///
301    /// Supported API versions: 9
302    pub member_id: Option<StrBytes>,
303
304    /// The member epoch if using the new consumer protocol (KIP-848).
305    ///
306    /// Supported API versions: 9
307    pub member_epoch: i32,
308
309    /// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
310    ///
311    /// Supported API versions: 8-9
312    pub topics: Option<Vec<OffsetFetchRequestTopics>>,
313
314    /// Other tagged fields
315    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
316}
317
318impl OffsetFetchRequestGroup {
319    /// Sets `group_id` to the passed value.
320    ///
321    /// The group ID.
322    ///
323    /// Supported API versions: 8-9
324    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
325        self.group_id = value;
326        self
327    }
328    /// Sets `member_id` to the passed value.
329    ///
330    /// The member ID assigned by the group coordinator if using the new consumer protocol (KIP-848).
331    ///
332    /// Supported API versions: 9
333    pub fn with_member_id(mut self, value: Option<StrBytes>) -> Self {
334        self.member_id = value;
335        self
336    }
337    /// Sets `member_epoch` to the passed value.
338    ///
339    /// The member epoch if using the new consumer protocol (KIP-848).
340    ///
341    /// Supported API versions: 9
342    pub fn with_member_epoch(mut self, value: i32) -> Self {
343        self.member_epoch = value;
344        self
345    }
346    /// Sets `topics` to the passed value.
347    ///
348    /// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
349    ///
350    /// Supported API versions: 8-9
351    pub fn with_topics(mut self, value: Option<Vec<OffsetFetchRequestTopics>>) -> Self {
352        self.topics = value;
353        self
354    }
355    /// Sets unknown_tagged_fields to the passed value.
356    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
357        self.unknown_tagged_fields = value;
358        self
359    }
360    /// Inserts an entry into unknown_tagged_fields.
361    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
362        self.unknown_tagged_fields.insert(key, value);
363        self
364    }
365}
366
367#[cfg(feature = "client")]
368impl Encodable for OffsetFetchRequestGroup {
369    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
370        if version < 0 || version > 9 {
371            bail!("specified version not supported by this message type");
372        }
373        if version >= 8 {
374            types::CompactString.encode(buf, &self.group_id)?;
375        } else {
376            if !self.group_id.is_empty() {
377                bail!("A field is set that is not available on the selected protocol version");
378            }
379        }
380        if version >= 9 {
381            types::CompactString.encode(buf, &self.member_id)?;
382        }
383        if version >= 9 {
384            types::Int32.encode(buf, &self.member_epoch)?;
385        }
386        if version >= 8 {
387            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
388        } else {
389            if !self
390                .topics
391                .as_ref()
392                .map(|x| x.is_empty())
393                .unwrap_or_default()
394            {
395                bail!("A field is set that is not available on the selected protocol version");
396            }
397        }
398        if version >= 6 {
399            let num_tagged_fields = self.unknown_tagged_fields.len();
400            if num_tagged_fields > std::u32::MAX as usize {
401                bail!(
402                    "Too many tagged fields to encode ({} fields)",
403                    num_tagged_fields
404                );
405            }
406            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
407
408            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
409        }
410        Ok(())
411    }
412    fn compute_size(&self, version: i16) -> Result<usize> {
413        let mut total_size = 0;
414        if version >= 8 {
415            total_size += types::CompactString.compute_size(&self.group_id)?;
416        } else {
417            if !self.group_id.is_empty() {
418                bail!("A field is set that is not available on the selected protocol version");
419            }
420        }
421        if version >= 9 {
422            total_size += types::CompactString.compute_size(&self.member_id)?;
423        }
424        if version >= 9 {
425            total_size += types::Int32.compute_size(&self.member_epoch)?;
426        }
427        if version >= 8 {
428            total_size +=
429                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
430        } else {
431            if !self
432                .topics
433                .as_ref()
434                .map(|x| x.is_empty())
435                .unwrap_or_default()
436            {
437                bail!("A field is set that is not available on the selected protocol version");
438            }
439        }
440        if version >= 6 {
441            let num_tagged_fields = self.unknown_tagged_fields.len();
442            if num_tagged_fields > std::u32::MAX as usize {
443                bail!(
444                    "Too many tagged fields to encode ({} fields)",
445                    num_tagged_fields
446                );
447            }
448            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
449
450            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
451        }
452        Ok(total_size)
453    }
454}
455
456#[cfg(feature = "broker")]
457impl Decodable for OffsetFetchRequestGroup {
458    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
459        if version < 0 || version > 9 {
460            bail!("specified version not supported by this message type");
461        }
462        let group_id = if version >= 8 {
463            types::CompactString.decode(buf)?
464        } else {
465            Default::default()
466        };
467        let member_id = if version >= 9 {
468            types::CompactString.decode(buf)?
469        } else {
470            None
471        };
472        let member_epoch = if version >= 9 {
473            types::Int32.decode(buf)?
474        } else {
475            -1
476        };
477        let topics = if version >= 8 {
478            types::CompactArray(types::Struct { version }).decode(buf)?
479        } else {
480            Some(Default::default())
481        };
482        let mut unknown_tagged_fields = BTreeMap::new();
483        if version >= 6 {
484            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
485            for _ in 0..num_tagged_fields {
486                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
487                let size: u32 = types::UnsignedVarInt.decode(buf)?;
488                let unknown_value = buf.try_get_bytes(size as usize)?;
489                unknown_tagged_fields.insert(tag as i32, unknown_value);
490            }
491        }
492        Ok(Self {
493            group_id,
494            member_id,
495            member_epoch,
496            topics,
497            unknown_tagged_fields,
498        })
499    }
500}
501
502impl Default for OffsetFetchRequestGroup {
503    fn default() -> Self {
504        Self {
505            group_id: Default::default(),
506            member_id: None,
507            member_epoch: -1,
508            topics: Some(Default::default()),
509            unknown_tagged_fields: BTreeMap::new(),
510        }
511    }
512}
513
514impl Message for OffsetFetchRequestGroup {
515    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
516    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
517}
518
519/// Valid versions: 0-9
520#[non_exhaustive]
521#[derive(Debug, Clone, PartialEq)]
522pub struct OffsetFetchRequestTopic {
523    /// The topic name.
524    ///
525    /// Supported API versions: 0-7
526    pub name: super::TopicName,
527
528    /// The partition indexes we would like to fetch offsets for.
529    ///
530    /// Supported API versions: 0-7
531    pub partition_indexes: Vec<i32>,
532
533    /// Other tagged fields
534    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
535}
536
537impl OffsetFetchRequestTopic {
538    /// Sets `name` to the passed value.
539    ///
540    /// The topic name.
541    ///
542    /// Supported API versions: 0-7
543    pub fn with_name(mut self, value: super::TopicName) -> Self {
544        self.name = value;
545        self
546    }
547    /// Sets `partition_indexes` to the passed value.
548    ///
549    /// The partition indexes we would like to fetch offsets for.
550    ///
551    /// Supported API versions: 0-7
552    pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
553        self.partition_indexes = value;
554        self
555    }
556    /// Sets unknown_tagged_fields to the passed value.
557    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
558        self.unknown_tagged_fields = value;
559        self
560    }
561    /// Inserts an entry into unknown_tagged_fields.
562    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
563        self.unknown_tagged_fields.insert(key, value);
564        self
565    }
566}
567
568#[cfg(feature = "client")]
569impl Encodable for OffsetFetchRequestTopic {
570    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
571        if version < 0 || version > 9 {
572            bail!("specified version not supported by this message type");
573        }
574        if version <= 7 {
575            if version >= 6 {
576                types::CompactString.encode(buf, &self.name)?;
577            } else {
578                types::String.encode(buf, &self.name)?;
579            }
580        } else {
581            if !self.name.is_empty() {
582                bail!("A field is set that is not available on the selected protocol version");
583            }
584        }
585        if version <= 7 {
586            if version >= 6 {
587                types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;
588            } else {
589                types::Array(types::Int32).encode(buf, &self.partition_indexes)?;
590            }
591        } else {
592            if !self.partition_indexes.is_empty() {
593                bail!("A field is set that is not available on the selected protocol version");
594            }
595        }
596        if version >= 6 {
597            let num_tagged_fields = self.unknown_tagged_fields.len();
598            if num_tagged_fields > std::u32::MAX as usize {
599                bail!(
600                    "Too many tagged fields to encode ({} fields)",
601                    num_tagged_fields
602                );
603            }
604            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
605
606            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
607        }
608        Ok(())
609    }
610    fn compute_size(&self, version: i16) -> Result<usize> {
611        let mut total_size = 0;
612        if version <= 7 {
613            if version >= 6 {
614                total_size += types::CompactString.compute_size(&self.name)?;
615            } else {
616                total_size += types::String.compute_size(&self.name)?;
617            }
618        } else {
619            if !self.name.is_empty() {
620                bail!("A field is set that is not available on the selected protocol version");
621            }
622        }
623        if version <= 7 {
624            if version >= 6 {
625                total_size +=
626                    types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?;
627            } else {
628                total_size += types::Array(types::Int32).compute_size(&self.partition_indexes)?;
629            }
630        } else {
631            if !self.partition_indexes.is_empty() {
632                bail!("A field is set that is not available on the selected protocol version");
633            }
634        }
635        if version >= 6 {
636            let num_tagged_fields = self.unknown_tagged_fields.len();
637            if num_tagged_fields > std::u32::MAX as usize {
638                bail!(
639                    "Too many tagged fields to encode ({} fields)",
640                    num_tagged_fields
641                );
642            }
643            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
644
645            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
646        }
647        Ok(total_size)
648    }
649}
650
651#[cfg(feature = "broker")]
652impl Decodable for OffsetFetchRequestTopic {
653    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
654        if version < 0 || version > 9 {
655            bail!("specified version not supported by this message type");
656        }
657        let name = if version <= 7 {
658            if version >= 6 {
659                types::CompactString.decode(buf)?
660            } else {
661                types::String.decode(buf)?
662            }
663        } else {
664            Default::default()
665        };
666        let partition_indexes = if version <= 7 {
667            if version >= 6 {
668                types::CompactArray(types::Int32).decode(buf)?
669            } else {
670                types::Array(types::Int32).decode(buf)?
671            }
672        } else {
673            Default::default()
674        };
675        let mut unknown_tagged_fields = BTreeMap::new();
676        if version >= 6 {
677            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
678            for _ in 0..num_tagged_fields {
679                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
680                let size: u32 = types::UnsignedVarInt.decode(buf)?;
681                let unknown_value = buf.try_get_bytes(size as usize)?;
682                unknown_tagged_fields.insert(tag as i32, unknown_value);
683            }
684        }
685        Ok(Self {
686            name,
687            partition_indexes,
688            unknown_tagged_fields,
689        })
690    }
691}
692
693impl Default for OffsetFetchRequestTopic {
694    fn default() -> Self {
695        Self {
696            name: Default::default(),
697            partition_indexes: Default::default(),
698            unknown_tagged_fields: BTreeMap::new(),
699        }
700    }
701}
702
703impl Message for OffsetFetchRequestTopic {
704    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
705    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
706}
707
708/// Valid versions: 0-9
709#[non_exhaustive]
710#[derive(Debug, Clone, PartialEq)]
711pub struct OffsetFetchRequestTopics {
712    /// The topic name.
713    ///
714    /// Supported API versions: 8-9
715    pub name: super::TopicName,
716
717    /// The partition indexes we would like to fetch offsets for.
718    ///
719    /// Supported API versions: 8-9
720    pub partition_indexes: Vec<i32>,
721
722    /// Other tagged fields
723    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
724}
725
726impl OffsetFetchRequestTopics {
727    /// Sets `name` to the passed value.
728    ///
729    /// The topic name.
730    ///
731    /// Supported API versions: 8-9
732    pub fn with_name(mut self, value: super::TopicName) -> Self {
733        self.name = value;
734        self
735    }
736    /// Sets `partition_indexes` to the passed value.
737    ///
738    /// The partition indexes we would like to fetch offsets for.
739    ///
740    /// Supported API versions: 8-9
741    pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
742        self.partition_indexes = value;
743        self
744    }
745    /// Sets unknown_tagged_fields to the passed value.
746    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
747        self.unknown_tagged_fields = value;
748        self
749    }
750    /// Inserts an entry into unknown_tagged_fields.
751    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
752        self.unknown_tagged_fields.insert(key, value);
753        self
754    }
755}
756
757#[cfg(feature = "client")]
758impl Encodable for OffsetFetchRequestTopics {
759    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
760        if version < 0 || version > 9 {
761            bail!("specified version not supported by this message type");
762        }
763        if version >= 8 {
764            types::CompactString.encode(buf, &self.name)?;
765        } else {
766            if !self.name.is_empty() {
767                bail!("A field is set that is not available on the selected protocol version");
768            }
769        }
770        if version >= 8 {
771            types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;
772        } else {
773            if !self.partition_indexes.is_empty() {
774                bail!("A field is set that is not available on the selected protocol version");
775            }
776        }
777        if version >= 6 {
778            let num_tagged_fields = self.unknown_tagged_fields.len();
779            if num_tagged_fields > std::u32::MAX as usize {
780                bail!(
781                    "Too many tagged fields to encode ({} fields)",
782                    num_tagged_fields
783                );
784            }
785            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
786
787            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
788        }
789        Ok(())
790    }
791    fn compute_size(&self, version: i16) -> Result<usize> {
792        let mut total_size = 0;
793        if version >= 8 {
794            total_size += types::CompactString.compute_size(&self.name)?;
795        } else {
796            if !self.name.is_empty() {
797                bail!("A field is set that is not available on the selected protocol version");
798            }
799        }
800        if version >= 8 {
801            total_size +=
802                types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?;
803        } else {
804            if !self.partition_indexes.is_empty() {
805                bail!("A field is set that is not available on the selected protocol version");
806            }
807        }
808        if version >= 6 {
809            let num_tagged_fields = self.unknown_tagged_fields.len();
810            if num_tagged_fields > std::u32::MAX as usize {
811                bail!(
812                    "Too many tagged fields to encode ({} fields)",
813                    num_tagged_fields
814                );
815            }
816            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
817
818            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
819        }
820        Ok(total_size)
821    }
822}
823
824#[cfg(feature = "broker")]
825impl Decodable for OffsetFetchRequestTopics {
826    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
827        if version < 0 || version > 9 {
828            bail!("specified version not supported by this message type");
829        }
830        let name = if version >= 8 {
831            types::CompactString.decode(buf)?
832        } else {
833            Default::default()
834        };
835        let partition_indexes = if version >= 8 {
836            types::CompactArray(types::Int32).decode(buf)?
837        } else {
838            Default::default()
839        };
840        let mut unknown_tagged_fields = BTreeMap::new();
841        if version >= 6 {
842            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
843            for _ in 0..num_tagged_fields {
844                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
845                let size: u32 = types::UnsignedVarInt.decode(buf)?;
846                let unknown_value = buf.try_get_bytes(size as usize)?;
847                unknown_tagged_fields.insert(tag as i32, unknown_value);
848            }
849        }
850        Ok(Self {
851            name,
852            partition_indexes,
853            unknown_tagged_fields,
854        })
855    }
856}
857
858impl Default for OffsetFetchRequestTopics {
859    fn default() -> Self {
860        Self {
861            name: Default::default(),
862            partition_indexes: Default::default(),
863            unknown_tagged_fields: BTreeMap::new(),
864        }
865    }
866}
867
868impl Message for OffsetFetchRequestTopics {
869    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
870    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
871}
872
873impl HeaderVersion for OffsetFetchRequest {
874    fn header_version(version: i16) -> i16 {
875        if version >= 6 {
876            2
877        } else {
878            1
879        }
880    }
881}