kafka_protocol/messages/
offset_commit_request.rs

1//! OffsetCommitRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-9
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetCommitRequest {
24    /// The unique group identifier.
25    ///
26    /// Supported API versions: 0-9
27    pub group_id: super::GroupId,
28
29    /// The generation of the group if using the classic group protocol or the member epoch if using the consumer protocol.
30    ///
31    /// Supported API versions: 1-9
32    pub generation_id_or_member_epoch: i32,
33
34    /// The member ID assigned by the group coordinator.
35    ///
36    /// Supported API versions: 1-9
37    pub member_id: StrBytes,
38
39    /// The unique identifier of the consumer instance provided by end user.
40    ///
41    /// Supported API versions: 7-9
42    pub group_instance_id: Option<StrBytes>,
43
44    /// The time period in ms to retain the offset.
45    ///
46    /// Supported API versions: 2-4
47    pub retention_time_ms: i64,
48
49    /// The topics to commit offsets for.
50    ///
51    /// Supported API versions: 0-9
52    pub topics: Vec<OffsetCommitRequestTopic>,
53
54    /// Other tagged fields
55    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl OffsetCommitRequest {
59    /// Sets `group_id` to the passed value.
60    ///
61    /// The unique group identifier.
62    ///
63    /// Supported API versions: 0-9
64    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
65        self.group_id = value;
66        self
67    }
68    /// Sets `generation_id_or_member_epoch` to the passed value.
69    ///
70    /// The generation of the group if using the classic group protocol or the member epoch if using the consumer protocol.
71    ///
72    /// Supported API versions: 1-9
73    pub fn with_generation_id_or_member_epoch(mut self, value: i32) -> Self {
74        self.generation_id_or_member_epoch = value;
75        self
76    }
77    /// Sets `member_id` to the passed value.
78    ///
79    /// The member ID assigned by the group coordinator.
80    ///
81    /// Supported API versions: 1-9
82    pub fn with_member_id(mut self, value: StrBytes) -> Self {
83        self.member_id = value;
84        self
85    }
86    /// Sets `group_instance_id` to the passed value.
87    ///
88    /// The unique identifier of the consumer instance provided by end user.
89    ///
90    /// Supported API versions: 7-9
91    pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
92        self.group_instance_id = value;
93        self
94    }
95    /// Sets `retention_time_ms` to the passed value.
96    ///
97    /// The time period in ms to retain the offset.
98    ///
99    /// Supported API versions: 2-4
100    pub fn with_retention_time_ms(mut self, value: i64) -> Self {
101        self.retention_time_ms = value;
102        self
103    }
104    /// Sets `topics` to the passed value.
105    ///
106    /// The topics to commit offsets for.
107    ///
108    /// Supported API versions: 0-9
109    pub fn with_topics(mut self, value: Vec<OffsetCommitRequestTopic>) -> Self {
110        self.topics = value;
111        self
112    }
113    /// Sets unknown_tagged_fields to the passed value.
114    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115        self.unknown_tagged_fields = value;
116        self
117    }
118    /// Inserts an entry into unknown_tagged_fields.
119    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120        self.unknown_tagged_fields.insert(key, value);
121        self
122    }
123}
124
#[cfg(feature = "client")]
impl Encodable for OffsetCommitRequest {
    /// Writes this request to `buf` using the wire layout of `version`.
    ///
    /// Fails if `group_instance_id` is set for a version below 7, if there are more unknown
    /// tagged fields than fit in a `u32`, or if any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 8 on, strings/arrays use the compact encoders and tagged fields follow.
        let compact = version >= 8;

        if compact {
            types::CompactString.encode(buf, &self.group_id)?;
        } else {
            types::String.encode(buf, &self.group_id)?;
        }

        if version >= 1 {
            types::Int32.encode(buf, &self.generation_id_or_member_epoch)?;
            if compact {
                types::CompactString.encode(buf, &self.member_id)?;
            } else {
                types::String.encode(buf, &self.member_id)?;
            }
        }

        if version < 7 {
            // The field does not exist on the wire yet, so a set value cannot be represented.
            if self.group_instance_id.is_some() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if compact {
            types::CompactString.encode(buf, &self.group_instance_id)?;
        } else {
            types::String.encode(buf, &self.group_instance_id)?;
        }

        if (2..=4).contains(&version) {
            types::Int64.encode(buf, &self.retention_time_ms)?;
        }

        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Performs the same validity checks as `encode` and fails under the same conditions.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 8;
        let mut size = 0;

        size += if compact {
            types::CompactString.compute_size(&self.group_id)?
        } else {
            types::String.compute_size(&self.group_id)?
        };

        if version >= 1 {
            size += types::Int32.compute_size(&self.generation_id_or_member_epoch)?;
            size += if compact {
                types::CompactString.compute_size(&self.member_id)?
            } else {
                types::String.compute_size(&self.member_id)?
            };
        }

        if version < 7 {
            if self.group_instance_id.is_some() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if compact {
            size += types::CompactString.compute_size(&self.group_instance_id)?;
        } else {
            size += types::String.compute_size(&self.group_instance_id)?;
        }

        if (2..=4).contains(&version) {
            size += types::Int64.compute_size(&self.retention_time_ms)?;
        }

        if compact {
            size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }

        Ok(size)
    }
}
228
#[cfg(feature = "broker")]
impl Decodable for OffsetCommitRequest {
    /// Reads a request from `buf` using the wire layout of `version`.
    ///
    /// Fields that are not present in `version` receive the same values that
    /// `Default` uses (`-1` for the numeric fields, `None` for `group_instance_id`).
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 8 on, strings/arrays use the compact decoders and tagged fields follow.
        let compact = version >= 8;

        let group_id = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        let (generation_id_or_member_epoch, member_id) = if version >= 1 {
            let epoch = types::Int32.decode(buf)?;
            let member = if compact {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            };
            (epoch, member)
        } else {
            (-1, Default::default())
        };

        let group_instance_id = if version < 7 {
            None
        } else if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        let retention_time_ms = if (2..=4).contains(&version) {
            types::Int64.decode(buf)?
        } else {
            -1
        };

        let topics = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        // Tagged fields are kept verbatim as raw bytes, keyed by tag number.
        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            group_id,
            generation_id_or_member_epoch,
            member_id,
            group_instance_id,
            retention_time_ms,
            topics,
            unknown_tagged_fields,
        })
    }
}
291
292impl Default for OffsetCommitRequest {
293    fn default() -> Self {
294        Self {
295            group_id: Default::default(),
296            generation_id_or_member_epoch: -1,
297            member_id: Default::default(),
298            group_instance_id: None,
299            retention_time_ms: -1,
300            topics: Default::default(),
301            unknown_tagged_fields: BTreeMap::new(),
302        }
303    }
304}
305
306impl Message for OffsetCommitRequest {
307    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
308    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
309}
310
311/// Valid versions: 0-9
312#[non_exhaustive]
313#[derive(Debug, Clone, PartialEq)]
314pub struct OffsetCommitRequestPartition {
315    /// The partition index.
316    ///
317    /// Supported API versions: 0-9
318    pub partition_index: i32,
319
320    /// The message offset to be committed.
321    ///
322    /// Supported API versions: 0-9
323    pub committed_offset: i64,
324
325    /// The leader epoch of this partition.
326    ///
327    /// Supported API versions: 6-9
328    pub committed_leader_epoch: i32,
329
330    /// The timestamp of the commit.
331    ///
332    /// Supported API versions: 1
333    pub commit_timestamp: i64,
334
335    /// Any associated metadata the client wants to keep.
336    ///
337    /// Supported API versions: 0-9
338    pub committed_metadata: Option<StrBytes>,
339
340    /// Other tagged fields
341    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
342}
343
344impl OffsetCommitRequestPartition {
345    /// Sets `partition_index` to the passed value.
346    ///
347    /// The partition index.
348    ///
349    /// Supported API versions: 0-9
350    pub fn with_partition_index(mut self, value: i32) -> Self {
351        self.partition_index = value;
352        self
353    }
354    /// Sets `committed_offset` to the passed value.
355    ///
356    /// The message offset to be committed.
357    ///
358    /// Supported API versions: 0-9
359    pub fn with_committed_offset(mut self, value: i64) -> Self {
360        self.committed_offset = value;
361        self
362    }
363    /// Sets `committed_leader_epoch` to the passed value.
364    ///
365    /// The leader epoch of this partition.
366    ///
367    /// Supported API versions: 6-9
368    pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
369        self.committed_leader_epoch = value;
370        self
371    }
372    /// Sets `commit_timestamp` to the passed value.
373    ///
374    /// The timestamp of the commit.
375    ///
376    /// Supported API versions: 1
377    pub fn with_commit_timestamp(mut self, value: i64) -> Self {
378        self.commit_timestamp = value;
379        self
380    }
381    /// Sets `committed_metadata` to the passed value.
382    ///
383    /// Any associated metadata the client wants to keep.
384    ///
385    /// Supported API versions: 0-9
386    pub fn with_committed_metadata(mut self, value: Option<StrBytes>) -> Self {
387        self.committed_metadata = value;
388        self
389    }
390    /// Sets unknown_tagged_fields to the passed value.
391    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
392        self.unknown_tagged_fields = value;
393        self
394    }
395    /// Inserts an entry into unknown_tagged_fields.
396    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
397        self.unknown_tagged_fields.insert(key, value);
398        self
399    }
400}
401
#[cfg(feature = "client")]
impl Encodable for OffsetCommitRequestPartition {
    /// Writes this partition entry to `buf` using the wire layout of `version`.
    ///
    /// Fails if `commit_timestamp` differs from `-1` for any version other than 1, if there
    /// are more unknown tagged fields than fit in a `u32`, or if any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int64.encode(buf, &self.committed_offset)?;

        if version >= 6 {
            types::Int32.encode(buf, &self.committed_leader_epoch)?;
        }

        if version == 1 {
            types::Int64.encode(buf, &self.commit_timestamp)?;
        } else if self.commit_timestamp != -1 {
            // Only version 1 carries the timestamp; any non-default value would be lost.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 8 {
            types::CompactString.encode(buf, &self.committed_metadata)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::String.encode(buf, &self.committed_metadata)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Performs the same validity checks as `encode` and fails under the same conditions.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        size += types::Int32.compute_size(&self.partition_index)?;
        size += types::Int64.compute_size(&self.committed_offset)?;

        if version >= 6 {
            size += types::Int32.compute_size(&self.committed_leader_epoch)?;
        }

        if version == 1 {
            size += types::Int64.compute_size(&self.commit_timestamp)?;
        } else if self.commit_timestamp != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 8 {
            size += types::CompactString.compute_size(&self.committed_metadata)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::String.compute_size(&self.committed_metadata)?;
        }

        Ok(size)
    }
}
470
#[cfg(feature = "broker")]
impl Decodable for OffsetCommitRequestPartition {
    /// Reads a partition entry from `buf` using the wire layout of `version`.
    ///
    /// `committed_leader_epoch` and `commit_timestamp` are set to `-1` when `version`
    /// does not carry them.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let compact = version >= 8;

        let partition_index = types::Int32.decode(buf)?;
        let committed_offset = types::Int64.decode(buf)?;

        let committed_leader_epoch = if version < 6 {
            -1
        } else {
            types::Int32.decode(buf)?
        };

        let commit_timestamp = if version != 1 {
            -1
        } else {
            types::Int64.decode(buf)?
        };

        let committed_metadata = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        // Tagged fields are kept verbatim as raw bytes, keyed by tag number.
        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            partition_index,
            committed_offset,
            committed_leader_epoch,
            commit_timestamp,
            committed_metadata,
            unknown_tagged_fields,
        })
    }
}
511
512impl Default for OffsetCommitRequestPartition {
513    fn default() -> Self {
514        Self {
515            partition_index: 0,
516            committed_offset: 0,
517            committed_leader_epoch: -1,
518            commit_timestamp: -1,
519            committed_metadata: Some(Default::default()),
520            unknown_tagged_fields: BTreeMap::new(),
521        }
522    }
523}
524
525impl Message for OffsetCommitRequestPartition {
526    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
527    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
528}
529
530/// Valid versions: 0-9
531#[non_exhaustive]
532#[derive(Debug, Clone, PartialEq)]
533pub struct OffsetCommitRequestTopic {
534    /// The topic name.
535    ///
536    /// Supported API versions: 0-9
537    pub name: super::TopicName,
538
539    /// Each partition to commit offsets for.
540    ///
541    /// Supported API versions: 0-9
542    pub partitions: Vec<OffsetCommitRequestPartition>,
543
544    /// Other tagged fields
545    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
546}
547
548impl OffsetCommitRequestTopic {
549    /// Sets `name` to the passed value.
550    ///
551    /// The topic name.
552    ///
553    /// Supported API versions: 0-9
554    pub fn with_name(mut self, value: super::TopicName) -> Self {
555        self.name = value;
556        self
557    }
558    /// Sets `partitions` to the passed value.
559    ///
560    /// Each partition to commit offsets for.
561    ///
562    /// Supported API versions: 0-9
563    pub fn with_partitions(mut self, value: Vec<OffsetCommitRequestPartition>) -> Self {
564        self.partitions = value;
565        self
566    }
567    /// Sets unknown_tagged_fields to the passed value.
568    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
569        self.unknown_tagged_fields = value;
570        self
571    }
572    /// Inserts an entry into unknown_tagged_fields.
573    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
574        self.unknown_tagged_fields.insert(key, value);
575        self
576    }
577}
578
#[cfg(feature = "client")]
impl Encodable for OffsetCommitRequestTopic {
    /// Writes this topic entry to `buf` using the wire layout of `version`.
    ///
    /// Fails if there are more unknown tagged fields than fit in a `u32`, or if any
    /// nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 8 {
            types::String.encode(buf, &self.name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
            return Ok(());
        }

        // Version 8 and later: compact encoders followed by the tagged-field section.
        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version < 8 {
            size += types::String.compute_size(&self.name)?;
            size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(size);
        }

        size += types::CompactString.compute_size(&self.name)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
634
#[cfg(feature = "broker")]
impl Decodable for OffsetCommitRequestTopic {
    /// Reads a topic entry from `buf` using the wire layout of `version`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let compact = version >= 8;

        let name = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        let partitions = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        // Tagged fields are kept verbatim as raw bytes, keyed by tag number.
        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw);
            }
        }

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
665
666impl Default for OffsetCommitRequestTopic {
667    fn default() -> Self {
668        Self {
669            name: Default::default(),
670            partitions: Default::default(),
671            unknown_tagged_fields: BTreeMap::new(),
672        }
673    }
674}
675
676impl Message for OffsetCommitRequestTopic {
677    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
678    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
679}
680
681impl HeaderVersion for OffsetCommitRequest {
682    fn header_version(version: i16) -> i16 {
683        if version >= 8 {
684            2
685        } else {
686            1
687        }
688    }
689}