kafka_protocol/messages/
offset_commit_request.rs

1//! OffsetCommitRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-9
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetCommitRequest {
24    /// The unique group identifier.
25    ///
26    /// Supported API versions: 0-9
27    pub group_id: super::GroupId,
28
29    /// The generation of the group if using the classic group protocol or the member epoch if using the consumer protocol.
30    ///
31    /// Supported API versions: 1-9
32    pub generation_id_or_member_epoch: i32,
33
34    /// The member ID assigned by the group coordinator.
35    ///
36    /// Supported API versions: 1-9
37    pub member_id: StrBytes,
38
39    /// The unique identifier of the consumer instance provided by end user.
40    ///
41    /// Supported API versions: 7-9
42    pub group_instance_id: Option<StrBytes>,
43
44    /// The time period in ms to retain the offset.
45    ///
46    /// Supported API versions: 2-4
47    pub retention_time_ms: i64,
48
49    /// The topics to commit offsets for.
50    ///
51    /// Supported API versions: 0-9
52    pub topics: Vec<OffsetCommitRequestTopic>,
53
54    /// Other tagged fields
55    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl OffsetCommitRequest {
59    /// Sets `group_id` to the passed value.
60    ///
61    /// The unique group identifier.
62    ///
63    /// Supported API versions: 0-9
64    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
65        self.group_id = value;
66        self
67    }
68    /// Sets `generation_id_or_member_epoch` to the passed value.
69    ///
70    /// The generation of the group if using the classic group protocol or the member epoch if using the consumer protocol.
71    ///
72    /// Supported API versions: 1-9
73    pub fn with_generation_id_or_member_epoch(mut self, value: i32) -> Self {
74        self.generation_id_or_member_epoch = value;
75        self
76    }
77    /// Sets `member_id` to the passed value.
78    ///
79    /// The member ID assigned by the group coordinator.
80    ///
81    /// Supported API versions: 1-9
82    pub fn with_member_id(mut self, value: StrBytes) -> Self {
83        self.member_id = value;
84        self
85    }
86    /// Sets `group_instance_id` to the passed value.
87    ///
88    /// The unique identifier of the consumer instance provided by end user.
89    ///
90    /// Supported API versions: 7-9
91    pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
92        self.group_instance_id = value;
93        self
94    }
95    /// Sets `retention_time_ms` to the passed value.
96    ///
97    /// The time period in ms to retain the offset.
98    ///
99    /// Supported API versions: 2-4
100    pub fn with_retention_time_ms(mut self, value: i64) -> Self {
101        self.retention_time_ms = value;
102        self
103    }
104    /// Sets `topics` to the passed value.
105    ///
106    /// The topics to commit offsets for.
107    ///
108    /// Supported API versions: 0-9
109    pub fn with_topics(mut self, value: Vec<OffsetCommitRequestTopic>) -> Self {
110        self.topics = value;
111        self
112    }
113    /// Sets unknown_tagged_fields to the passed value.
114    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115        self.unknown_tagged_fields = value;
116        self
117    }
118    /// Inserts an entry into unknown_tagged_fields.
119    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120        self.unknown_tagged_fields.insert(key, value);
121        self
122    }
123}
124
125#[cfg(feature = "client")]
126impl Encodable for OffsetCommitRequest {
127    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
128        if version < 0 || version > 9 {
129            bail!("specified version not supported by this message type");
130        }
131        if version >= 8 {
132            types::CompactString.encode(buf, &self.group_id)?;
133        } else {
134            types::String.encode(buf, &self.group_id)?;
135        }
136        if version >= 1 {
137            types::Int32.encode(buf, &self.generation_id_or_member_epoch)?;
138        }
139        if version >= 1 {
140            if version >= 8 {
141                types::CompactString.encode(buf, &self.member_id)?;
142            } else {
143                types::String.encode(buf, &self.member_id)?;
144            }
145        }
146        if version >= 7 {
147            if version >= 8 {
148                types::CompactString.encode(buf, &self.group_instance_id)?;
149            } else {
150                types::String.encode(buf, &self.group_instance_id)?;
151            }
152        } else {
153            if !self.group_instance_id.is_none() {
154                bail!("A field is set that is not available on the selected protocol version");
155            }
156        }
157        if version >= 2 && version <= 4 {
158            types::Int64.encode(buf, &self.retention_time_ms)?;
159        }
160        if version >= 8 {
161            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
162        } else {
163            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
164        }
165        if version >= 8 {
166            let num_tagged_fields = self.unknown_tagged_fields.len();
167            if num_tagged_fields > std::u32::MAX as usize {
168                bail!(
169                    "Too many tagged fields to encode ({} fields)",
170                    num_tagged_fields
171                );
172            }
173            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
174
175            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
176        }
177        Ok(())
178    }
179    fn compute_size(&self, version: i16) -> Result<usize> {
180        let mut total_size = 0;
181        if version >= 8 {
182            total_size += types::CompactString.compute_size(&self.group_id)?;
183        } else {
184            total_size += types::String.compute_size(&self.group_id)?;
185        }
186        if version >= 1 {
187            total_size += types::Int32.compute_size(&self.generation_id_or_member_epoch)?;
188        }
189        if version >= 1 {
190            if version >= 8 {
191                total_size += types::CompactString.compute_size(&self.member_id)?;
192            } else {
193                total_size += types::String.compute_size(&self.member_id)?;
194            }
195        }
196        if version >= 7 {
197            if version >= 8 {
198                total_size += types::CompactString.compute_size(&self.group_instance_id)?;
199            } else {
200                total_size += types::String.compute_size(&self.group_instance_id)?;
201            }
202        } else {
203            if !self.group_instance_id.is_none() {
204                bail!("A field is set that is not available on the selected protocol version");
205            }
206        }
207        if version >= 2 && version <= 4 {
208            total_size += types::Int64.compute_size(&self.retention_time_ms)?;
209        }
210        if version >= 8 {
211            total_size +=
212                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
213        } else {
214            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
215        }
216        if version >= 8 {
217            let num_tagged_fields = self.unknown_tagged_fields.len();
218            if num_tagged_fields > std::u32::MAX as usize {
219                bail!(
220                    "Too many tagged fields to encode ({} fields)",
221                    num_tagged_fields
222                );
223            }
224            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
225
226            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
227        }
228        Ok(total_size)
229    }
230}
231
232#[cfg(feature = "broker")]
233impl Decodable for OffsetCommitRequest {
234    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
235        if version < 0 || version > 9 {
236            bail!("specified version not supported by this message type");
237        }
238        let group_id = if version >= 8 {
239            types::CompactString.decode(buf)?
240        } else {
241            types::String.decode(buf)?
242        };
243        let generation_id_or_member_epoch = if version >= 1 {
244            types::Int32.decode(buf)?
245        } else {
246            -1
247        };
248        let member_id = if version >= 1 {
249            if version >= 8 {
250                types::CompactString.decode(buf)?
251            } else {
252                types::String.decode(buf)?
253            }
254        } else {
255            Default::default()
256        };
257        let group_instance_id = if version >= 7 {
258            if version >= 8 {
259                types::CompactString.decode(buf)?
260            } else {
261                types::String.decode(buf)?
262            }
263        } else {
264            None
265        };
266        let retention_time_ms = if version >= 2 && version <= 4 {
267            types::Int64.decode(buf)?
268        } else {
269            -1
270        };
271        let topics = if version >= 8 {
272            types::CompactArray(types::Struct { version }).decode(buf)?
273        } else {
274            types::Array(types::Struct { version }).decode(buf)?
275        };
276        let mut unknown_tagged_fields = BTreeMap::new();
277        if version >= 8 {
278            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
279            for _ in 0..num_tagged_fields {
280                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
281                let size: u32 = types::UnsignedVarInt.decode(buf)?;
282                let unknown_value = buf.try_get_bytes(size as usize)?;
283                unknown_tagged_fields.insert(tag as i32, unknown_value);
284            }
285        }
286        Ok(Self {
287            group_id,
288            generation_id_or_member_epoch,
289            member_id,
290            group_instance_id,
291            retention_time_ms,
292            topics,
293            unknown_tagged_fields,
294        })
295    }
296}
297
298impl Default for OffsetCommitRequest {
299    fn default() -> Self {
300        Self {
301            group_id: Default::default(),
302            generation_id_or_member_epoch: -1,
303            member_id: Default::default(),
304            group_instance_id: None,
305            retention_time_ms: -1,
306            topics: Default::default(),
307            unknown_tagged_fields: BTreeMap::new(),
308        }
309    }
310}
311
312impl Message for OffsetCommitRequest {
313    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
314    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
315}
316
317/// Valid versions: 0-9
318#[non_exhaustive]
319#[derive(Debug, Clone, PartialEq)]
320pub struct OffsetCommitRequestPartition {
321    /// The partition index.
322    ///
323    /// Supported API versions: 0-9
324    pub partition_index: i32,
325
326    /// The message offset to be committed.
327    ///
328    /// Supported API versions: 0-9
329    pub committed_offset: i64,
330
331    /// The leader epoch of this partition.
332    ///
333    /// Supported API versions: 6-9
334    pub committed_leader_epoch: i32,
335
336    /// The timestamp of the commit.
337    ///
338    /// Supported API versions: 1
339    pub commit_timestamp: i64,
340
341    /// Any associated metadata the client wants to keep.
342    ///
343    /// Supported API versions: 0-9
344    pub committed_metadata: Option<StrBytes>,
345
346    /// Other tagged fields
347    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
348}
349
350impl OffsetCommitRequestPartition {
351    /// Sets `partition_index` to the passed value.
352    ///
353    /// The partition index.
354    ///
355    /// Supported API versions: 0-9
356    pub fn with_partition_index(mut self, value: i32) -> Self {
357        self.partition_index = value;
358        self
359    }
360    /// Sets `committed_offset` to the passed value.
361    ///
362    /// The message offset to be committed.
363    ///
364    /// Supported API versions: 0-9
365    pub fn with_committed_offset(mut self, value: i64) -> Self {
366        self.committed_offset = value;
367        self
368    }
369    /// Sets `committed_leader_epoch` to the passed value.
370    ///
371    /// The leader epoch of this partition.
372    ///
373    /// Supported API versions: 6-9
374    pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
375        self.committed_leader_epoch = value;
376        self
377    }
378    /// Sets `commit_timestamp` to the passed value.
379    ///
380    /// The timestamp of the commit.
381    ///
382    /// Supported API versions: 1
383    pub fn with_commit_timestamp(mut self, value: i64) -> Self {
384        self.commit_timestamp = value;
385        self
386    }
387    /// Sets `committed_metadata` to the passed value.
388    ///
389    /// Any associated metadata the client wants to keep.
390    ///
391    /// Supported API versions: 0-9
392    pub fn with_committed_metadata(mut self, value: Option<StrBytes>) -> Self {
393        self.committed_metadata = value;
394        self
395    }
396    /// Sets unknown_tagged_fields to the passed value.
397    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
398        self.unknown_tagged_fields = value;
399        self
400    }
401    /// Inserts an entry into unknown_tagged_fields.
402    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
403        self.unknown_tagged_fields.insert(key, value);
404        self
405    }
406}
407
408#[cfg(feature = "client")]
409impl Encodable for OffsetCommitRequestPartition {
410    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
411        if version < 0 || version > 9 {
412            bail!("specified version not supported by this message type");
413        }
414        types::Int32.encode(buf, &self.partition_index)?;
415        types::Int64.encode(buf, &self.committed_offset)?;
416        if version >= 6 {
417            types::Int32.encode(buf, &self.committed_leader_epoch)?;
418        }
419        if version == 1 {
420            types::Int64.encode(buf, &self.commit_timestamp)?;
421        } else {
422            if self.commit_timestamp != -1 {
423                bail!("A field is set that is not available on the selected protocol version");
424            }
425        }
426        if version >= 8 {
427            types::CompactString.encode(buf, &self.committed_metadata)?;
428        } else {
429            types::String.encode(buf, &self.committed_metadata)?;
430        }
431        if version >= 8 {
432            let num_tagged_fields = self.unknown_tagged_fields.len();
433            if num_tagged_fields > std::u32::MAX as usize {
434                bail!(
435                    "Too many tagged fields to encode ({} fields)",
436                    num_tagged_fields
437                );
438            }
439            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
440
441            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
442        }
443        Ok(())
444    }
445    fn compute_size(&self, version: i16) -> Result<usize> {
446        let mut total_size = 0;
447        total_size += types::Int32.compute_size(&self.partition_index)?;
448        total_size += types::Int64.compute_size(&self.committed_offset)?;
449        if version >= 6 {
450            total_size += types::Int32.compute_size(&self.committed_leader_epoch)?;
451        }
452        if version == 1 {
453            total_size += types::Int64.compute_size(&self.commit_timestamp)?;
454        } else {
455            if self.commit_timestamp != -1 {
456                bail!("A field is set that is not available on the selected protocol version");
457            }
458        }
459        if version >= 8 {
460            total_size += types::CompactString.compute_size(&self.committed_metadata)?;
461        } else {
462            total_size += types::String.compute_size(&self.committed_metadata)?;
463        }
464        if version >= 8 {
465            let num_tagged_fields = self.unknown_tagged_fields.len();
466            if num_tagged_fields > std::u32::MAX as usize {
467                bail!(
468                    "Too many tagged fields to encode ({} fields)",
469                    num_tagged_fields
470                );
471            }
472            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
473
474            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
475        }
476        Ok(total_size)
477    }
478}
479
480#[cfg(feature = "broker")]
481impl Decodable for OffsetCommitRequestPartition {
482    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
483        if version < 0 || version > 9 {
484            bail!("specified version not supported by this message type");
485        }
486        let partition_index = types::Int32.decode(buf)?;
487        let committed_offset = types::Int64.decode(buf)?;
488        let committed_leader_epoch = if version >= 6 {
489            types::Int32.decode(buf)?
490        } else {
491            -1
492        };
493        let commit_timestamp = if version == 1 {
494            types::Int64.decode(buf)?
495        } else {
496            -1
497        };
498        let committed_metadata = if version >= 8 {
499            types::CompactString.decode(buf)?
500        } else {
501            types::String.decode(buf)?
502        };
503        let mut unknown_tagged_fields = BTreeMap::new();
504        if version >= 8 {
505            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
506            for _ in 0..num_tagged_fields {
507                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
508                let size: u32 = types::UnsignedVarInt.decode(buf)?;
509                let unknown_value = buf.try_get_bytes(size as usize)?;
510                unknown_tagged_fields.insert(tag as i32, unknown_value);
511            }
512        }
513        Ok(Self {
514            partition_index,
515            committed_offset,
516            committed_leader_epoch,
517            commit_timestamp,
518            committed_metadata,
519            unknown_tagged_fields,
520        })
521    }
522}
523
524impl Default for OffsetCommitRequestPartition {
525    fn default() -> Self {
526        Self {
527            partition_index: 0,
528            committed_offset: 0,
529            committed_leader_epoch: -1,
530            commit_timestamp: -1,
531            committed_metadata: Some(Default::default()),
532            unknown_tagged_fields: BTreeMap::new(),
533        }
534    }
535}
536
537impl Message for OffsetCommitRequestPartition {
538    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
539    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
540}
541
542/// Valid versions: 0-9
543#[non_exhaustive]
544#[derive(Debug, Clone, PartialEq)]
545pub struct OffsetCommitRequestTopic {
546    /// The topic name.
547    ///
548    /// Supported API versions: 0-9
549    pub name: super::TopicName,
550
551    /// Each partition to commit offsets for.
552    ///
553    /// Supported API versions: 0-9
554    pub partitions: Vec<OffsetCommitRequestPartition>,
555
556    /// Other tagged fields
557    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
558}
559
560impl OffsetCommitRequestTopic {
561    /// Sets `name` to the passed value.
562    ///
563    /// The topic name.
564    ///
565    /// Supported API versions: 0-9
566    pub fn with_name(mut self, value: super::TopicName) -> Self {
567        self.name = value;
568        self
569    }
570    /// Sets `partitions` to the passed value.
571    ///
572    /// Each partition to commit offsets for.
573    ///
574    /// Supported API versions: 0-9
575    pub fn with_partitions(mut self, value: Vec<OffsetCommitRequestPartition>) -> Self {
576        self.partitions = value;
577        self
578    }
579    /// Sets unknown_tagged_fields to the passed value.
580    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
581        self.unknown_tagged_fields = value;
582        self
583    }
584    /// Inserts an entry into unknown_tagged_fields.
585    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
586        self.unknown_tagged_fields.insert(key, value);
587        self
588    }
589}
590
591#[cfg(feature = "client")]
592impl Encodable for OffsetCommitRequestTopic {
593    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
594        if version < 0 || version > 9 {
595            bail!("specified version not supported by this message type");
596        }
597        if version >= 8 {
598            types::CompactString.encode(buf, &self.name)?;
599        } else {
600            types::String.encode(buf, &self.name)?;
601        }
602        if version >= 8 {
603            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
604        } else {
605            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
606        }
607        if version >= 8 {
608            let num_tagged_fields = self.unknown_tagged_fields.len();
609            if num_tagged_fields > std::u32::MAX as usize {
610                bail!(
611                    "Too many tagged fields to encode ({} fields)",
612                    num_tagged_fields
613                );
614            }
615            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
616
617            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
618        }
619        Ok(())
620    }
621    fn compute_size(&self, version: i16) -> Result<usize> {
622        let mut total_size = 0;
623        if version >= 8 {
624            total_size += types::CompactString.compute_size(&self.name)?;
625        } else {
626            total_size += types::String.compute_size(&self.name)?;
627        }
628        if version >= 8 {
629            total_size +=
630                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
631        } else {
632            total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
633        }
634        if version >= 8 {
635            let num_tagged_fields = self.unknown_tagged_fields.len();
636            if num_tagged_fields > std::u32::MAX as usize {
637                bail!(
638                    "Too many tagged fields to encode ({} fields)",
639                    num_tagged_fields
640                );
641            }
642            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
643
644            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
645        }
646        Ok(total_size)
647    }
648}
649
650#[cfg(feature = "broker")]
651impl Decodable for OffsetCommitRequestTopic {
652    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
653        if version < 0 || version > 9 {
654            bail!("specified version not supported by this message type");
655        }
656        let name = if version >= 8 {
657            types::CompactString.decode(buf)?
658        } else {
659            types::String.decode(buf)?
660        };
661        let partitions = if version >= 8 {
662            types::CompactArray(types::Struct { version }).decode(buf)?
663        } else {
664            types::Array(types::Struct { version }).decode(buf)?
665        };
666        let mut unknown_tagged_fields = BTreeMap::new();
667        if version >= 8 {
668            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
669            for _ in 0..num_tagged_fields {
670                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
671                let size: u32 = types::UnsignedVarInt.decode(buf)?;
672                let unknown_value = buf.try_get_bytes(size as usize)?;
673                unknown_tagged_fields.insert(tag as i32, unknown_value);
674            }
675        }
676        Ok(Self {
677            name,
678            partitions,
679            unknown_tagged_fields,
680        })
681    }
682}
683
684impl Default for OffsetCommitRequestTopic {
685    fn default() -> Self {
686        Self {
687            name: Default::default(),
688            partitions: Default::default(),
689            unknown_tagged_fields: BTreeMap::new(),
690        }
691    }
692}
693
694impl Message for OffsetCommitRequestTopic {
695    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
696    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
697}
698
699impl HeaderVersion for OffsetCommitRequest {
700    fn header_version(version: i16) -> i16 {
701        if version >= 8 {
702            2
703        } else {
704            1
705        }
706    }
707}