kafka_protocol/messages/
offset_fetch_request.rs

1//! OffsetFetchRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetFetchRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// Top-level `OffsetFetchRequest` message.
///
/// Versions 1-7 describe a single group through `group_id` and `topics`;
/// versions 8-9 describe one or more groups through `groups` instead.
/// Encoding fails if a field that is unavailable in the selected version
/// holds a non-default value.
///
/// Valid versions: 1-9
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct OffsetFetchRequest {
    /// The group to fetch offsets for.
    ///
    /// Supported API versions: 1-7
    pub group_id: super::GroupId,

    /// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
    ///
    /// Defaults to `Some` of an empty list.
    ///
    /// Supported API versions: 1-7
    pub topics: Option<Vec<OffsetFetchRequestTopic>>,

    /// Each group we would like to fetch offsets for.
    ///
    /// Supported API versions: 8-9
    pub groups: Vec<OffsetFetchRequestGroup>,

    /// Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions.
    ///
    /// Supported API versions: 7-9
    pub require_stable: bool,

    /// Tagged fields this definition does not know about, keyed by tag number
    /// and stored as their raw bytes. Only read and written for versions 6+.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
47
48impl OffsetFetchRequest {
49    /// Sets `group_id` to the passed value.
50    ///
51    /// The group to fetch offsets for.
52    ///
53    /// Supported API versions: 1-7
54    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
55        self.group_id = value;
56        self
57    }
58    /// Sets `topics` to the passed value.
59    ///
60    /// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
61    ///
62    /// Supported API versions: 1-7
63    pub fn with_topics(mut self, value: Option<Vec<OffsetFetchRequestTopic>>) -> Self {
64        self.topics = value;
65        self
66    }
67    /// Sets `groups` to the passed value.
68    ///
69    /// Each group we would like to fetch offsets for.
70    ///
71    /// Supported API versions: 8-9
72    pub fn with_groups(mut self, value: Vec<OffsetFetchRequestGroup>) -> Self {
73        self.groups = value;
74        self
75    }
76    /// Sets `require_stable` to the passed value.
77    ///
78    /// Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions.
79    ///
80    /// Supported API versions: 7-9
81    pub fn with_require_stable(mut self, value: bool) -> Self {
82        self.require_stable = value;
83        self
84    }
85    /// Sets unknown_tagged_fields to the passed value.
86    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87        self.unknown_tagged_fields = value;
88        self
89    }
90    /// Inserts an entry into unknown_tagged_fields.
91    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92        self.unknown_tagged_fields.insert(key, value);
93        self
94    }
95}
96
#[cfg(feature = "client")]
impl Encodable for OffsetFetchRequest {
    /// Writes this request to `buf` in the wire layout of `version`.
    ///
    /// Fields are emitted in the order `group_id`, `topics`, `groups`,
    /// `require_stable`, followed by the tagged-field section for versions 6+.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 1-9, when a field that does not exist in
    /// `version` holds a non-default value, when there are more tagged fields
    /// than fit in a `u32`, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(1..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 6+ use the compact encodings and carry tagged fields.
        let flexible = version >= 6;

        // `group_id` and `topics` only exist up to version 7.
        if version <= 7 {
            if flexible {
                types::CompactString.encode(buf, &self.group_id)?;
                types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
            } else {
                types::String.encode(buf, &self.group_id)?;
                types::Array(types::Struct { version }).encode(buf, &self.topics)?;
            }
        } else if !self.group_id.is_empty() || !matches!(self.topics.as_deref(), Some([])) {
            // Anything other than an empty id and `Some(empty list)` counts as "set".
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 8 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.groups)?;
        } else if !self.groups.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 7 {
            types::Boolean.encode(buf, &self.require_stable)?;
        } else if self.require_stable {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Applies the same "field not available in this version" checks as
    /// `encode`, but does not validate the version range itself.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;

        let group_and_topics_size = if version <= 7 {
            if flexible {
                types::CompactString.compute_size(&self.group_id)?
                    + types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
            } else {
                types::String.compute_size(&self.group_id)?
                    + types::Array(types::Struct { version }).compute_size(&self.topics)?
            }
        } else if !self.group_id.is_empty() || !matches!(self.topics.as_deref(), Some([])) {
            bail!("A field is set that is not available on the selected protocol version");
        } else {
            0
        };

        let groups_size = if version >= 8 {
            types::CompactArray(types::Struct { version }).compute_size(&self.groups)?
        } else if !self.groups.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        } else {
            0
        };

        let require_stable_size = if version >= 7 {
            types::Boolean.compute_size(&self.require_stable)?
        } else if self.require_stable {
            bail!("A field is set that is not available on the selected protocol version");
        } else {
            0
        };

        let tagged_size = if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.compute_size(tagged_count as u32)?
                + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?
        } else {
            0
        };

        Ok(group_and_topics_size + groups_size + require_stable_size + tagged_size)
    }
}
218
#[cfg(feature = "broker")]
impl Decodable for OffsetFetchRequest {
    /// Reads a request from `buf` using the wire layout of `version`.
    ///
    /// Fields that do not exist in `version` are filled with their defaults
    /// (`topics` becomes `Some` of an empty list, `require_stable` becomes
    /// `false`). For versions 6+ every tagged field found is kept verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 1-9 or when an underlying decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(1..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // `group_id` and `topics` are adjacent on the wire and share a version range.
        let (group_id, topics) = if version >= 8 {
            (Default::default(), Some(Default::default()))
        } else if version >= 6 {
            (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Struct { version }).decode(buf)?,
            )
        } else {
            (
                types::String.decode(buf)?,
                types::Array(types::Struct { version }).decode(buf)?,
            )
        };

        let groups = if version < 8 {
            Default::default()
        } else {
            types::CompactArray(types::Struct { version }).decode(buf)?
        };

        let require_stable = if version < 7 {
            false
        } else {
            types::Boolean.decode(buf)?
        };

        let mut tagged = BTreeMap::new();
        if version >= 6 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                tagged.insert(tag as i32, buf.try_get_bytes(len as usize)?);
            }
        }

        Ok(Self {
            group_id,
            topics,
            groups,
            require_stable,
            unknown_tagged_fields: tagged,
        })
    }
}
272
273impl Default for OffsetFetchRequest {
274    fn default() -> Self {
275        Self {
276            group_id: Default::default(),
277            topics: Some(Default::default()),
278            groups: Default::default(),
279            require_stable: false,
280            unknown_tagged_fields: BTreeMap::new(),
281        }
282    }
283}
284
/// Version metadata for `OffsetFetchRequest`.
impl Message for OffsetFetchRequest {
    /// Versions 1 through 9 are accepted; `encode` and `decode` reject anything else.
    const VERSIONS: VersionRange = VersionRange { min: 1, max: 9 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
289
/// One entry of the `groups` list of an `OffsetFetchRequest`.
///
/// `group_id` and `topics` are written for versions 8-9; `member_id` and
/// `member_epoch` are written for version 9 only.
///
/// Valid versions: 1-9
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct OffsetFetchRequestGroup {
    /// The group ID.
    ///
    /// Supported API versions: 8-9
    pub group_id: super::GroupId,

    /// The member id.
    ///
    /// Defaults to `None`.
    ///
    /// Supported API versions: 9
    pub member_id: Option<StrBytes>,

    /// The member epoch if using the new consumer protocol (KIP-848).
    ///
    /// Defaults to `-1`.
    ///
    /// Supported API versions: 9
    pub member_epoch: i32,

    /// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
    ///
    /// Defaults to `Some` of an empty list.
    ///
    /// Supported API versions: 8-9
    pub topics: Option<Vec<OffsetFetchRequestTopics>>,

    /// Tagged fields this definition does not know about, keyed by tag number
    /// and stored as their raw bytes.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
317
318impl OffsetFetchRequestGroup {
319    /// Sets `group_id` to the passed value.
320    ///
321    /// The group ID.
322    ///
323    /// Supported API versions: 8-9
324    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
325        self.group_id = value;
326        self
327    }
328    /// Sets `member_id` to the passed value.
329    ///
330    /// The member id.
331    ///
332    /// Supported API versions: 9
333    pub fn with_member_id(mut self, value: Option<StrBytes>) -> Self {
334        self.member_id = value;
335        self
336    }
337    /// Sets `member_epoch` to the passed value.
338    ///
339    /// The member epoch if using the new consumer protocol (KIP-848).
340    ///
341    /// Supported API versions: 9
342    pub fn with_member_epoch(mut self, value: i32) -> Self {
343        self.member_epoch = value;
344        self
345    }
346    /// Sets `topics` to the passed value.
347    ///
348    /// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
349    ///
350    /// Supported API versions: 8-9
351    pub fn with_topics(mut self, value: Option<Vec<OffsetFetchRequestTopics>>) -> Self {
352        self.topics = value;
353        self
354    }
355    /// Sets unknown_tagged_fields to the passed value.
356    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
357        self.unknown_tagged_fields = value;
358        self
359    }
360    /// Inserts an entry into unknown_tagged_fields.
361    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
362        self.unknown_tagged_fields.insert(key, value);
363        self
364    }
365}
366
#[cfg(feature = "client")]
impl Encodable for OffsetFetchRequestGroup {
    /// Writes this group entry to `buf` in the wire layout of `version`.
    ///
    /// Fields are emitted in the order `group_id`, `member_id`, `member_epoch`,
    /// `topics`, followed by the tagged-field section for versions 6+.
    /// `member_id` and `member_epoch` are silently skipped below version 9.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 1-9, when `group_id` or `topics` hold a
    /// non-default value for a version below 8, when there are more tagged
    /// fields than fit in a `u32`, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(1..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 8 {
            types::CompactString.encode(buf, &self.group_id)?;
        } else if !self.group_id.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Both member fields were introduced together in version 9.
        if version >= 9 {
            types::CompactString.encode(buf, &self.member_id)?;
            types::Int32.encode(buf, &self.member_epoch)?;
        }

        if version >= 8 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else if !matches!(self.topics.as_deref(), Some([])) {
            // Only `Some(empty list)` counts as "not set".
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Applies the same "field not available in this version" checks as
    /// `encode`, but does not validate the version range itself.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let group_id_size = if version >= 8 {
            types::CompactString.compute_size(&self.group_id)?
        } else if !self.group_id.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        } else {
            0
        };

        let member_size = if version >= 9 {
            types::CompactString.compute_size(&self.member_id)?
                + types::Int32.compute_size(&self.member_epoch)?
        } else {
            0
        };

        let topics_size = if version >= 8 {
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
        } else if !matches!(self.topics.as_deref(), Some([])) {
            bail!("A field is set that is not available on the selected protocol version");
        } else {
            0
        };

        let tagged_size = if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.compute_size(tagged_count as u32)?
                + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?
        } else {
            0
        };

        Ok(group_id_size + member_size + topics_size + tagged_size)
    }
}
455
#[cfg(feature = "broker")]
impl Decodable for OffsetFetchRequestGroup {
    /// Reads a group entry from `buf` using the wire layout of `version`.
    ///
    /// Fields that do not exist in `version` are filled with their defaults
    /// (`member_id` becomes `None`, `member_epoch` becomes `-1`, `topics`
    /// becomes `Some` of an empty list). For versions 6+ every tagged field
    /// found is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 1-9 or when an underlying decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(1..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let group_id = if version < 8 {
            Default::default()
        } else {
            types::CompactString.decode(buf)?
        };

        // The two member fields are adjacent on the wire and both start at version 9.
        let (member_id, member_epoch) = if version < 9 {
            (None, -1)
        } else {
            (
                types::CompactString.decode(buf)?,
                types::Int32.decode(buf)?,
            )
        };

        let topics = if version < 8 {
            Some(Default::default())
        } else {
            types::CompactArray(types::Struct { version }).decode(buf)?
        };

        let mut tagged = BTreeMap::new();
        if version >= 6 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                tagged.insert(tag as i32, buf.try_get_bytes(len as usize)?);
            }
        }

        Ok(Self {
            group_id,
            member_id,
            member_epoch,
            topics,
            unknown_tagged_fields: tagged,
        })
    }
}
501
502impl Default for OffsetFetchRequestGroup {
503    fn default() -> Self {
504        Self {
505            group_id: Default::default(),
506            member_id: None,
507            member_epoch: -1,
508            topics: Some(Default::default()),
509            unknown_tagged_fields: BTreeMap::new(),
510        }
511    }
512}
513
/// Version metadata for `OffsetFetchRequestGroup`.
impl Message for OffsetFetchRequestGroup {
    /// Versions 1 through 9 are accepted; `encode` and `decode` reject anything else.
    const VERSIONS: VersionRange = VersionRange { min: 1, max: 9 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
518
/// One entry of the top-level `topics` list of an `OffsetFetchRequest`
/// (the single-group layout used by versions 1-7).
///
/// Valid versions: 1-9
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct OffsetFetchRequestTopic {
    /// The topic name.
    ///
    /// Supported API versions: 1-7
    pub name: super::TopicName,

    /// The partition indexes we would like to fetch offsets for.
    ///
    /// Supported API versions: 1-7
    pub partition_indexes: Vec<i32>,

    /// Tagged fields this definition does not know about, keyed by tag number
    /// and stored as their raw bytes.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
536
537impl OffsetFetchRequestTopic {
538    /// Sets `name` to the passed value.
539    ///
540    /// The topic name.
541    ///
542    /// Supported API versions: 1-7
543    pub fn with_name(mut self, value: super::TopicName) -> Self {
544        self.name = value;
545        self
546    }
547    /// Sets `partition_indexes` to the passed value.
548    ///
549    /// The partition indexes we would like to fetch offsets for.
550    ///
551    /// Supported API versions: 1-7
552    pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
553        self.partition_indexes = value;
554        self
555    }
556    /// Sets unknown_tagged_fields to the passed value.
557    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
558        self.unknown_tagged_fields = value;
559        self
560    }
561    /// Inserts an entry into unknown_tagged_fields.
562    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
563        self.unknown_tagged_fields.insert(key, value);
564        self
565    }
566}
567
#[cfg(feature = "client")]
impl Encodable for OffsetFetchRequestTopic {
    /// Writes this topic entry to `buf` in the wire layout of `version`.
    ///
    /// `name` is written first, then `partition_indexes`, then the
    /// tagged-field section for versions 6+.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 1-9, when `name` or `partition_indexes`
    /// is non-empty for a version above 7, when there are more tagged fields
    /// than fit in a `u32`, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(1..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 6+ use the compact encodings and carry tagged fields.
        let flexible = version >= 6;

        if version <= 7 {
            if flexible {
                types::CompactString.encode(buf, &self.name)?;
                types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;
            } else {
                types::String.encode(buf, &self.name)?;
                types::Array(types::Int32).encode(buf, &self.partition_indexes)?;
            }
        } else if !self.name.is_empty() || !self.partition_indexes.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Applies the same "field not available in this version" checks as
    /// `encode`, but does not validate the version range itself.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;

        let fields_size = if version <= 7 {
            if flexible {
                types::CompactString.compute_size(&self.name)?
                    + types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?
            } else {
                types::String.compute_size(&self.name)?
                    + types::Array(types::Int32).compute_size(&self.partition_indexes)?
            }
        } else if !self.name.is_empty() || !self.partition_indexes.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        } else {
            0
        };

        let tagged_size = if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.compute_size(tagged_count as u32)?
                + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?
        } else {
            0
        };

        Ok(fields_size + tagged_size)
    }
}
650
#[cfg(feature = "broker")]
impl Decodable for OffsetFetchRequestTopic {
    /// Reads a topic entry from `buf` using the wire layout of `version`.
    ///
    /// For versions above 7 neither `name` nor `partition_indexes` is read and
    /// both are left at their defaults. For versions 6+ every tagged field
    /// found is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 1-9 or when an underlying decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(1..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Both fields share the same version range and sit next to each other on the wire.
        let (name, partition_indexes) = if version >= 8 {
            (Default::default(), Default::default())
        } else if version >= 6 {
            (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Int32).decode(buf)?,
            )
        } else {
            (
                types::String.decode(buf)?,
                types::Array(types::Int32).decode(buf)?,
            )
        };

        let mut tagged = BTreeMap::new();
        if version >= 6 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                tagged.insert(tag as i32, buf.try_get_bytes(len as usize)?);
            }
        }

        Ok(Self {
            name,
            partition_indexes,
            unknown_tagged_fields: tagged,
        })
    }
}
692
693impl Default for OffsetFetchRequestTopic {
694    fn default() -> Self {
695        Self {
696            name: Default::default(),
697            partition_indexes: Default::default(),
698            unknown_tagged_fields: BTreeMap::new(),
699        }
700    }
701}
702
/// Version metadata for `OffsetFetchRequestTopic`.
impl Message for OffsetFetchRequestTopic {
    /// Versions 1 through 9 are accepted; `encode` and `decode` reject anything else.
    const VERSIONS: VersionRange = VersionRange { min: 1, max: 9 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
707
/// One entry of the `topics` list of an `OffsetFetchRequestGroup`
/// (the multi-group layout used by versions 8-9).
///
/// Valid versions: 1-9
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct OffsetFetchRequestTopics {
    /// The topic name.
    ///
    /// Supported API versions: 8-9
    pub name: super::TopicName,

    /// The topic ID.
    ///
    /// Never written by `encode`; `decode` always yields the nil UUID.
    ///
    /// Supported API versions: none
    pub topic_id: Uuid,

    /// The partition indexes we would like to fetch offsets for.
    ///
    /// Supported API versions: 8-9
    pub partition_indexes: Vec<i32>,

    /// Tagged fields this definition does not know about, keyed by tag number
    /// and stored as their raw bytes.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
730
731impl OffsetFetchRequestTopics {
732    /// Sets `name` to the passed value.
733    ///
734    /// The topic name.
735    ///
736    /// Supported API versions: 8-9
737    pub fn with_name(mut self, value: super::TopicName) -> Self {
738        self.name = value;
739        self
740    }
741    /// Sets `topic_id` to the passed value.
742    ///
743    /// The topic ID.
744    ///
745    /// Supported API versions: none
746    pub fn with_topic_id(mut self, value: Uuid) -> Self {
747        self.topic_id = value;
748        self
749    }
750    /// Sets `partition_indexes` to the passed value.
751    ///
752    /// The partition indexes we would like to fetch offsets for.
753    ///
754    /// Supported API versions: 8-9
755    pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
756        self.partition_indexes = value;
757        self
758    }
759    /// Sets unknown_tagged_fields to the passed value.
760    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
761        self.unknown_tagged_fields = value;
762        self
763    }
764    /// Inserts an entry into unknown_tagged_fields.
765    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
766        self.unknown_tagged_fields.insert(key, value);
767        self
768    }
769}
770
#[cfg(feature = "client")]
impl Encodable for OffsetFetchRequestTopics {
    /// Writes this topic entry to `buf` in the wire layout of `version`.
    ///
    /// For versions 8+ `name` is written first, then `partition_indexes`;
    /// `topic_id` is never written. The tagged-field section follows for
    /// versions 6+. Below version 8 `name` is silently skipped.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 1-9, when `partition_indexes` is
    /// non-empty for a version below 8, when there are more tagged fields than
    /// fit in a `u32`, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(1..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 8 {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;
        } else if !self.partition_indexes.is_empty() {
            // Only `partition_indexes` is checked here; a set `name` is ignored.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Applies the same `partition_indexes` availability check as `encode`,
    /// but does not validate the version range itself.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = if version >= 8 {
            types::CompactString.compute_size(&self.name)?
                + types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?
        } else if !self.partition_indexes.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        } else {
            0
        };

        let tagged_size = if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.compute_size(tagged_count as u32)?
                + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?
        } else {
            0
        };

        Ok(fields_size + tagged_size)
    }
}
829
#[cfg(feature = "broker")]
impl Decodable for OffsetFetchRequestTopics {
    /// Reads a topic entry from `buf` using the wire layout of `version`.
    ///
    /// `name` and `partition_indexes` are only read for versions 8+ and are
    /// left at their defaults otherwise; `topic_id` is always the nil UUID.
    /// For versions 6+ every tagged field found is kept verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 1-9 or when an underlying decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(1..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // `name` precedes `partition_indexes` on the wire; both start at version 8.
        let (name, partition_indexes) = if version < 8 {
            (Default::default(), Default::default())
        } else {
            (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Int32).decode(buf)?,
            )
        };

        let mut tagged = BTreeMap::new();
        if version >= 6 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                tagged.insert(tag as i32, buf.try_get_bytes(len as usize)?);
            }
        }

        Ok(Self {
            name,
            topic_id: Uuid::nil(),
            partition_indexes,
            unknown_tagged_fields: tagged,
        })
    }
}
865
866impl Default for OffsetFetchRequestTopics {
867    fn default() -> Self {
868        Self {
869            name: Default::default(),
870            topic_id: Uuid::nil(),
871            partition_indexes: Default::default(),
872            unknown_tagged_fields: BTreeMap::new(),
873        }
874    }
875}
876
/// Version metadata for `OffsetFetchRequestTopics`.
impl Message for OffsetFetchRequestTopics {
    /// Versions 1 through 9 are accepted; `encode` and `decode` reject anything else.
    const VERSIONS: VersionRange = VersionRange { min: 1, max: 9 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
881
882impl HeaderVersion for OffsetFetchRequest {
883    fn header_version(version: i16) -> i16 {
884        if version >= 6 {
885            2
886        } else {
887            1
888        }
889    }
890}