kafka_protocol/messages/
offset_commit_request.rs

1//! OffsetCommitRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 2-9
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetCommitRequest {
24    /// The unique group identifier.
25    ///
26    /// Supported API versions: 2-9
27    pub group_id: super::GroupId,
28
29    /// The generation of the group if using the classic group protocol or the member epoch if using the consumer protocol.
30    ///
31    /// Supported API versions: 2-9
32    pub generation_id_or_member_epoch: i32,
33
34    /// The member ID assigned by the group coordinator.
35    ///
36    /// Supported API versions: 2-9
37    pub member_id: StrBytes,
38
39    /// The unique identifier of the consumer instance provided by end user.
40    ///
41    /// Supported API versions: 7-9
42    pub group_instance_id: Option<StrBytes>,
43
44    /// The time period in ms to retain the offset.
45    ///
46    /// Supported API versions: 2-4
47    pub retention_time_ms: i64,
48
49    /// The topics to commit offsets for.
50    ///
51    /// Supported API versions: 2-9
52    pub topics: Vec<OffsetCommitRequestTopic>,
53
54    /// Other tagged fields
55    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl OffsetCommitRequest {
59    /// Sets `group_id` to the passed value.
60    ///
61    /// The unique group identifier.
62    ///
63    /// Supported API versions: 2-9
64    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
65        self.group_id = value;
66        self
67    }
68    /// Sets `generation_id_or_member_epoch` to the passed value.
69    ///
70    /// The generation of the group if using the classic group protocol or the member epoch if using the consumer protocol.
71    ///
72    /// Supported API versions: 2-9
73    pub fn with_generation_id_or_member_epoch(mut self, value: i32) -> Self {
74        self.generation_id_or_member_epoch = value;
75        self
76    }
77    /// Sets `member_id` to the passed value.
78    ///
79    /// The member ID assigned by the group coordinator.
80    ///
81    /// Supported API versions: 2-9
82    pub fn with_member_id(mut self, value: StrBytes) -> Self {
83        self.member_id = value;
84        self
85    }
86    /// Sets `group_instance_id` to the passed value.
87    ///
88    /// The unique identifier of the consumer instance provided by end user.
89    ///
90    /// Supported API versions: 7-9
91    pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
92        self.group_instance_id = value;
93        self
94    }
95    /// Sets `retention_time_ms` to the passed value.
96    ///
97    /// The time period in ms to retain the offset.
98    ///
99    /// Supported API versions: 2-4
100    pub fn with_retention_time_ms(mut self, value: i64) -> Self {
101        self.retention_time_ms = value;
102        self
103    }
104    /// Sets `topics` to the passed value.
105    ///
106    /// The topics to commit offsets for.
107    ///
108    /// Supported API versions: 2-9
109    pub fn with_topics(mut self, value: Vec<OffsetCommitRequestTopic>) -> Self {
110        self.topics = value;
111        self
112    }
113    /// Sets unknown_tagged_fields to the passed value.
114    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115        self.unknown_tagged_fields = value;
116        self
117    }
118    /// Inserts an entry into unknown_tagged_fields.
119    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120        self.unknown_tagged_fields.insert(key, value);
121        self
122    }
123}
124
#[cfg(feature = "client")]
impl Encodable for OffsetCommitRequest {
    /// Writes this request to `buf` using the wire layout of `version`.
    ///
    /// Versions 8 and above use the compact string/array encodings and end with a tagged-field
    /// section; earlier versions use the non-compact encodings and carry no tagged fields.
    /// `retention_time_ms` is written only for versions up to 4 and is skipped without error
    /// for later versions, whatever value it holds.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside 2-9, when `group_instance_id` is set for a
    /// version below 7, when more than `u32::MAX` unknown tagged fields are present, or when
    /// one of the field encoders fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version >= 8 {
            types::CompactString.encode(buf, &self.group_id)?;
        } else {
            types::String.encode(buf, &self.group_id)?;
        }
        types::Int32.encode(buf, &self.generation_id_or_member_epoch)?;
        if version >= 8 {
            types::CompactString.encode(buf, &self.member_id)?;
        } else {
            types::String.encode(buf, &self.member_id)?;
        }
        if version >= 7 {
            if version >= 8 {
                types::CompactString.encode(buf, &self.group_instance_id)?;
            } else {
                types::String.encode(buf, &self.group_instance_id)?;
            }
        } else if self.group_instance_id.is_some() {
            // The field has no slot on the wire before version 7, so a set value is rejected
            // instead of being dropped silently.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version <= 4 {
            types::Int64.encode(buf, &self.retention_time_ms)?;
        }
        if version >= 8 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        if version >= 8 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // Lossless: the guard above bounds the count by `u32::MAX`.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Adds up the encoded size of every field that `encode` writes for `version`.
    ///
    /// Unlike `encode`, this does not check that `version` lies within 2-9.
    ///
    /// # Errors
    ///
    /// Returns an error when `group_instance_id` is set for a version below 7, when more than
    /// `u32::MAX` unknown tagged fields are present, or when a field size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version >= 8 {
            total_size += types::CompactString.compute_size(&self.group_id)?;
        } else {
            total_size += types::String.compute_size(&self.group_id)?;
        }
        total_size += types::Int32.compute_size(&self.generation_id_or_member_epoch)?;
        if version >= 8 {
            total_size += types::CompactString.compute_size(&self.member_id)?;
        } else {
            total_size += types::String.compute_size(&self.member_id)?;
        }
        if version >= 7 {
            if version >= 8 {
                total_size += types::CompactString.compute_size(&self.group_instance_id)?;
            } else {
                total_size += types::String.compute_size(&self.group_instance_id)?;
            }
        } else if self.group_instance_id.is_some() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version <= 4 {
            total_size += types::Int64.compute_size(&self.retention_time_ms)?;
        }
        if version >= 8 {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }
        if version >= 8 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // Lossless: the guard above bounds the count by `u32::MAX`.
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
223
#[cfg(feature = "broker")]
impl Decodable for OffsetCommitRequest {
    /// Reads an `OffsetCommitRequest` from `buf`, interpreting the bytes as `version`.
    ///
    /// Fields that the chosen version does not carry get fixed fallbacks: `group_instance_id`
    /// is `None` below version 7 and `retention_time_ms` is `-1` above version 4.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside 2-9 or when any field cannot be read.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 8 on, strings and arrays are compact-encoded and tagged fields follow.
        let compact = version >= 8;

        let group_id = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let generation_id_or_member_epoch = types::Int32.decode(buf)?;
        let member_id = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let group_instance_id = if version < 7 {
            None
        } else if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let retention_time_ms = if version > 4 {
            -1
        } else {
            types::Int64.decode(buf)?
        };
        let topics = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let unknown_tagged_fields = if compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            (0..field_count)
                .map(|_| -> Result<(i32, Bytes)> {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    // Tags are stored as `i32` keys; the cast reinterprets the wire value.
                    Ok((tag as i32, payload))
                })
                .collect::<Result<BTreeMap<_, _>>>()?
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            group_id,
            generation_id_or_member_epoch,
            member_id,
            group_instance_id,
            retention_time_ms,
            topics,
            unknown_tagged_fields,
        })
    }
}
281
282impl Default for OffsetCommitRequest {
283    fn default() -> Self {
284        Self {
285            group_id: Default::default(),
286            generation_id_or_member_epoch: -1,
287            member_id: Default::default(),
288            group_instance_id: None,
289            retention_time_ms: -1,
290            topics: Default::default(),
291            unknown_tagged_fields: BTreeMap::new(),
292        }
293    }
294}
295
296impl Message for OffsetCommitRequest {
297    const VERSIONS: VersionRange = VersionRange { min: 2, max: 9 };
298    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
299}
300
301/// Valid versions: 2-9
302#[non_exhaustive]
303#[derive(Debug, Clone, PartialEq)]
304pub struct OffsetCommitRequestPartition {
305    /// The partition index.
306    ///
307    /// Supported API versions: 2-9
308    pub partition_index: i32,
309
310    /// The message offset to be committed.
311    ///
312    /// Supported API versions: 2-9
313    pub committed_offset: i64,
314
315    /// The leader epoch of this partition.
316    ///
317    /// Supported API versions: 6-9
318    pub committed_leader_epoch: i32,
319
320    /// Any associated metadata the client wants to keep.
321    ///
322    /// Supported API versions: 2-9
323    pub committed_metadata: Option<StrBytes>,
324
325    /// Other tagged fields
326    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
327}
328
329impl OffsetCommitRequestPartition {
330    /// Sets `partition_index` to the passed value.
331    ///
332    /// The partition index.
333    ///
334    /// Supported API versions: 2-9
335    pub fn with_partition_index(mut self, value: i32) -> Self {
336        self.partition_index = value;
337        self
338    }
339    /// Sets `committed_offset` to the passed value.
340    ///
341    /// The message offset to be committed.
342    ///
343    /// Supported API versions: 2-9
344    pub fn with_committed_offset(mut self, value: i64) -> Self {
345        self.committed_offset = value;
346        self
347    }
348    /// Sets `committed_leader_epoch` to the passed value.
349    ///
350    /// The leader epoch of this partition.
351    ///
352    /// Supported API versions: 6-9
353    pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
354        self.committed_leader_epoch = value;
355        self
356    }
357    /// Sets `committed_metadata` to the passed value.
358    ///
359    /// Any associated metadata the client wants to keep.
360    ///
361    /// Supported API versions: 2-9
362    pub fn with_committed_metadata(mut self, value: Option<StrBytes>) -> Self {
363        self.committed_metadata = value;
364        self
365    }
366    /// Sets unknown_tagged_fields to the passed value.
367    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
368        self.unknown_tagged_fields = value;
369        self
370    }
371    /// Inserts an entry into unknown_tagged_fields.
372    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
373        self.unknown_tagged_fields.insert(key, value);
374        self
375    }
376}
377
#[cfg(feature = "client")]
impl Encodable for OffsetCommitRequestPartition {
    /// Writes this partition entry to `buf` using the wire layout of `version`.
    ///
    /// `committed_leader_epoch` is written only from version 6 on and is skipped without error
    /// for earlier versions. Versions 8 and above use the compact string encoding for
    /// `committed_metadata` and end with a tagged-field section.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside 2-9, when more than `u32::MAX` unknown tagged
    /// fields are present, or when one of the field encoders fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int64.encode(buf, &self.committed_offset)?;
        if version >= 6 {
            types::Int32.encode(buf, &self.committed_leader_epoch)?;
        }
        if version >= 8 {
            types::CompactString.encode(buf, &self.committed_metadata)?;
        } else {
            types::String.encode(buf, &self.committed_metadata)?;
        }
        if version >= 8 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // Lossless: the guard above bounds the count by `u32::MAX`.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Adds up the encoded size of every field that `encode` writes for `version`.
    ///
    /// Unlike `encode`, this does not check that `version` lies within 2-9.
    ///
    /// # Errors
    ///
    /// Returns an error when more than `u32::MAX` unknown tagged fields are present or when a
    /// field size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.partition_index)?;
        total_size += types::Int64.compute_size(&self.committed_offset)?;
        if version >= 6 {
            total_size += types::Int32.compute_size(&self.committed_leader_epoch)?;
        }
        if version >= 8 {
            total_size += types::CompactString.compute_size(&self.committed_metadata)?;
        } else {
            total_size += types::String.compute_size(&self.committed_metadata)?;
        }
        if version >= 8 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // Lossless: the guard above bounds the count by `u32::MAX`.
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
435
#[cfg(feature = "broker")]
impl Decodable for OffsetCommitRequestPartition {
    /// Reads an `OffsetCommitRequestPartition` from `buf`, interpreting the bytes as `version`.
    ///
    /// `committed_leader_epoch` is not on the wire before version 6 and is set to `-1` there.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside 2-9 or when any field cannot be read.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 8 on, strings are compact-encoded and tagged fields follow.
        let compact = version >= 8;

        let partition_index = types::Int32.decode(buf)?;
        let committed_offset = types::Int64.decode(buf)?;
        let committed_leader_epoch = if version < 6 {
            -1
        } else {
            types::Int32.decode(buf)?
        };
        let committed_metadata = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let unknown_tagged_fields = if compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            (0..field_count)
                .map(|_| -> Result<(i32, Bytes)> {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    // Tags are stored as `i32` keys; the cast reinterprets the wire value.
                    Ok((tag as i32, payload))
                })
                .collect::<Result<BTreeMap<_, _>>>()?
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            partition_index,
            committed_offset,
            committed_leader_epoch,
            committed_metadata,
            unknown_tagged_fields,
        })
    }
}
473
474impl Default for OffsetCommitRequestPartition {
475    fn default() -> Self {
476        Self {
477            partition_index: 0,
478            committed_offset: 0,
479            committed_leader_epoch: -1,
480            committed_metadata: Some(Default::default()),
481            unknown_tagged_fields: BTreeMap::new(),
482        }
483    }
484}
485
486impl Message for OffsetCommitRequestPartition {
487    const VERSIONS: VersionRange = VersionRange { min: 2, max: 9 };
488    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
489}
490
491/// Valid versions: 2-9
492#[non_exhaustive]
493#[derive(Debug, Clone, PartialEq)]
494pub struct OffsetCommitRequestTopic {
495    /// The topic name.
496    ///
497    /// Supported API versions: 2-9
498    pub name: super::TopicName,
499
500    /// The topic ID.
501    ///
502    /// Supported API versions: none
503    pub topic_id: Uuid,
504
505    /// Each partition to commit offsets for.
506    ///
507    /// Supported API versions: 2-9
508    pub partitions: Vec<OffsetCommitRequestPartition>,
509
510    /// Other tagged fields
511    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
512}
513
514impl OffsetCommitRequestTopic {
515    /// Sets `name` to the passed value.
516    ///
517    /// The topic name.
518    ///
519    /// Supported API versions: 2-9
520    pub fn with_name(mut self, value: super::TopicName) -> Self {
521        self.name = value;
522        self
523    }
524    /// Sets `topic_id` to the passed value.
525    ///
526    /// The topic ID.
527    ///
528    /// Supported API versions: none
529    pub fn with_topic_id(mut self, value: Uuid) -> Self {
530        self.topic_id = value;
531        self
532    }
533    /// Sets `partitions` to the passed value.
534    ///
535    /// Each partition to commit offsets for.
536    ///
537    /// Supported API versions: 2-9
538    pub fn with_partitions(mut self, value: Vec<OffsetCommitRequestPartition>) -> Self {
539        self.partitions = value;
540        self
541    }
542    /// Sets unknown_tagged_fields to the passed value.
543    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
544        self.unknown_tagged_fields = value;
545        self
546    }
547    /// Inserts an entry into unknown_tagged_fields.
548    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
549        self.unknown_tagged_fields.insert(key, value);
550        self
551    }
552}
553
#[cfg(feature = "client")]
impl Encodable for OffsetCommitRequestTopic {
    /// Writes this topic entry to `buf` using the wire layout of `version`.
    ///
    /// Only `name`, `partitions` and (from version 8) the tagged-field section are written;
    /// `topic_id` is never put on the wire by this encoder. Versions 8 and above use the
    /// compact string/array encodings.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside 2-9, when more than `u32::MAX` unknown tagged
    /// fields are present, or when one of the field encoders fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version >= 8 {
            types::CompactString.encode(buf, &self.name)?;
        } else {
            types::String.encode(buf, &self.name)?;
        }
        if version >= 8 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }
        if version >= 8 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // Lossless: the guard above bounds the count by `u32::MAX`.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Adds up the encoded size of every field that `encode` writes for `version`.
    ///
    /// Unlike `encode`, this does not check that `version` lies within 2-9.
    ///
    /// # Errors
    ///
    /// Returns an error when more than `u32::MAX` unknown tagged fields are present or when a
    /// field size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version >= 8 {
            total_size += types::CompactString.compute_size(&self.name)?;
        } else {
            total_size += types::String.compute_size(&self.name)?;
        }
        if version >= 8 {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
        }
        if version >= 8 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // Lossless: the guard above bounds the count by `u32::MAX`.
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
612
#[cfg(feature = "broker")]
impl Decodable for OffsetCommitRequestTopic {
    /// Reads an `OffsetCommitRequestTopic` from `buf`, interpreting the bytes as `version`.
    ///
    /// `topic_id` is never read from the wire here; it is always set to the nil UUID.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside 2-9 or when any field cannot be read.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 8 on, strings and arrays are compact-encoded and tagged fields follow.
        let compact = version >= 8;

        let name = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let partitions = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let unknown_tagged_fields = if compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            (0..field_count)
                .map(|_| -> Result<(i32, Bytes)> {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    // Tags are stored as `i32` keys; the cast reinterprets the wire value.
                    Ok((tag as i32, payload))
                })
                .collect::<Result<BTreeMap<_, _>>>()?
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            name,
            topic_id: Uuid::nil(),
            partitions,
            unknown_tagged_fields,
        })
    }
}
648
649impl Default for OffsetCommitRequestTopic {
650    fn default() -> Self {
651        Self {
652            name: Default::default(),
653            topic_id: Uuid::nil(),
654            partitions: Default::default(),
655            unknown_tagged_fields: BTreeMap::new(),
656        }
657    }
658}
659
660impl Message for OffsetCommitRequestTopic {
661    const VERSIONS: VersionRange = VersionRange { min: 2, max: 9 };
662    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
663}
664
665impl HeaderVersion for OffsetCommitRequest {
666    fn header_version(version: i16) -> i16 {
667        if version >= 8 {
668            2
669        } else {
670            1
671        }
672    }
673}