kafka_protocol/messages/
txn_offset_commit_request.rs

1//! TxnOffsetCommitRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-4
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct TxnOffsetCommitRequest {
24    /// The ID of the transaction.
25    ///
26    /// Supported API versions: 0-4
27    pub transactional_id: super::TransactionalId,
28
29    /// The ID of the group.
30    ///
31    /// Supported API versions: 0-4
32    pub group_id: super::GroupId,
33
34    /// The current producer ID in use by the transactional ID.
35    ///
36    /// Supported API versions: 0-4
37    pub producer_id: super::ProducerId,
38
39    /// The current epoch associated with the producer ID.
40    ///
41    /// Supported API versions: 0-4
42    pub producer_epoch: i16,
43
44    /// The generation of the consumer.
45    ///
46    /// Supported API versions: 3-4
47    pub generation_id: i32,
48
49    /// The member ID assigned by the group coordinator.
50    ///
51    /// Supported API versions: 3-4
52    pub member_id: StrBytes,
53
54    /// The unique identifier of the consumer instance provided by end user.
55    ///
56    /// Supported API versions: 3-4
57    pub group_instance_id: Option<StrBytes>,
58
59    /// Each topic that we want to commit offsets for.
60    ///
61    /// Supported API versions: 0-4
62    pub topics: Vec<TxnOffsetCommitRequestTopic>,
63
64    /// Other tagged fields
65    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
66}
67
68impl TxnOffsetCommitRequest {
69    /// Sets `transactional_id` to the passed value.
70    ///
71    /// The ID of the transaction.
72    ///
73    /// Supported API versions: 0-4
74    pub fn with_transactional_id(mut self, value: super::TransactionalId) -> Self {
75        self.transactional_id = value;
76        self
77    }
78    /// Sets `group_id` to the passed value.
79    ///
80    /// The ID of the group.
81    ///
82    /// Supported API versions: 0-4
83    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
84        self.group_id = value;
85        self
86    }
87    /// Sets `producer_id` to the passed value.
88    ///
89    /// The current producer ID in use by the transactional ID.
90    ///
91    /// Supported API versions: 0-4
92    pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
93        self.producer_id = value;
94        self
95    }
96    /// Sets `producer_epoch` to the passed value.
97    ///
98    /// The current epoch associated with the producer ID.
99    ///
100    /// Supported API versions: 0-4
101    pub fn with_producer_epoch(mut self, value: i16) -> Self {
102        self.producer_epoch = value;
103        self
104    }
105    /// Sets `generation_id` to the passed value.
106    ///
107    /// The generation of the consumer.
108    ///
109    /// Supported API versions: 3-4
110    pub fn with_generation_id(mut self, value: i32) -> Self {
111        self.generation_id = value;
112        self
113    }
114    /// Sets `member_id` to the passed value.
115    ///
116    /// The member ID assigned by the group coordinator.
117    ///
118    /// Supported API versions: 3-4
119    pub fn with_member_id(mut self, value: StrBytes) -> Self {
120        self.member_id = value;
121        self
122    }
123    /// Sets `group_instance_id` to the passed value.
124    ///
125    /// The unique identifier of the consumer instance provided by end user.
126    ///
127    /// Supported API versions: 3-4
128    pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
129        self.group_instance_id = value;
130        self
131    }
132    /// Sets `topics` to the passed value.
133    ///
134    /// Each topic that we want to commit offsets for.
135    ///
136    /// Supported API versions: 0-4
137    pub fn with_topics(mut self, value: Vec<TxnOffsetCommitRequestTopic>) -> Self {
138        self.topics = value;
139        self
140    }
141    /// Sets unknown_tagged_fields to the passed value.
142    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
143        self.unknown_tagged_fields = value;
144        self
145    }
146    /// Inserts an entry into unknown_tagged_fields.
147    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
148        self.unknown_tagged_fields.insert(key, value);
149        self
150    }
151}
152
#[cfg(feature = "client")]
impl Encodable for TxnOffsetCommitRequest {
    /// Serializes this request into `buf` using the layout of `version`.
    ///
    /// From version 3 onwards strings and arrays use their compact encodings and the
    /// message ends with a tagged-field section. Earlier versions use the non-compact
    /// encodings and carry no `generation_id`, `member_id` or `group_instance_id`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is below 3 and `generation_id`, `member_id` or
    /// `group_instance_id` differs from its default, when there are more tagged fields
    /// than fit in a `u32`, or when any nested encoder fails. Bytes written before the
    /// failure stay in `buf`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let flexible = version >= 3;

        if flexible {
            types::CompactString.encode(buf, &self.transactional_id)?;
            types::CompactString.encode(buf, &self.group_id)?;
        } else {
            types::String.encode(buf, &self.transactional_id)?;
            types::String.encode(buf, &self.group_id)?;
        }
        types::Int64.encode(buf, &self.producer_id)?;
        types::Int16.encode(buf, &self.producer_epoch)?;

        if flexible {
            types::Int32.encode(buf, &self.generation_id)?;
            types::CompactString.encode(buf, &self.member_id)?;
            types::CompactString.encode(buf, &self.group_instance_id)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // The guard above makes this narrowing cast lossless.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            // These fields have no wire representation before version 3, so refuse to
            // drop a non-default value silently.
            if self.generation_id != -1
                || !self.member_id.is_empty()
                || self.group_instance_id.is_some()
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`: a version-3-only field is set while
    /// `version` is below 3, the tagged-field count exceeds `u32::MAX`, or a nested
    /// size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 3;
        let mut total_size = 0;

        if flexible {
            total_size += types::CompactString.compute_size(&self.transactional_id)?;
            total_size += types::CompactString.compute_size(&self.group_id)?;
        } else {
            total_size += types::String.compute_size(&self.transactional_id)?;
            total_size += types::String.compute_size(&self.group_id)?;
        }
        total_size += types::Int64.compute_size(&self.producer_id)?;
        total_size += types::Int16.compute_size(&self.producer_epoch)?;

        if flexible {
            total_size += types::Int32.compute_size(&self.generation_id)?;
            total_size += types::CompactString.compute_size(&self.member_id)?;
            total_size += types::CompactString.compute_size(&self.group_instance_id)?;
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // The guard above makes this narrowing cast lossless.
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            // Mirror `encode`: fields without a pre-version-3 representation must be unset.
            if self.generation_id != -1
                || !self.member_id.is_empty()
                || self.group_instance_id.is_some()
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }
        Ok(total_size)
    }
}
264
#[cfg(feature = "broker")]
impl Decodable for TxnOffsetCommitRequest {
    /// Parses a request from `buf`, interpreting the bytes with the layout of `version`.
    ///
    /// Versions below 3 carry no `generation_id`, `member_id`, `group_instance_id` or
    /// tagged fields; those are filled with `-1`, the empty string, `None` and an empty
    /// map respectively.
    ///
    /// # Errors
    ///
    /// Propagates the first error reported by any field decoder or by reading a
    /// tagged-field payload from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 3;

        let (transactional_id, group_id) = if flexible {
            (
                types::CompactString.decode(buf)?,
                types::CompactString.decode(buf)?,
            )
        } else {
            (types::String.decode(buf)?, types::String.decode(buf)?)
        };
        let producer_id = types::Int64.decode(buf)?;
        let producer_epoch = types::Int16.decode(buf)?;

        // Pre-flexible layout: only the topic array remains on the wire.
        if !flexible {
            return Ok(Self {
                transactional_id,
                group_id,
                producer_id,
                producer_epoch,
                generation_id: -1,
                member_id: StrBytes::from_static_str(""),
                group_instance_id: None,
                topics: types::Array(types::Struct { version }).decode(buf)?,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let generation_id = types::Int32.decode(buf)?;
        let member_id = types::CompactString.decode(buf)?;
        let group_instance_id = types::CompactString.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Each tagged field is a (tag, size, payload) triple; a repeated tag keeps the
        // payload read last.
        let tagged_field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let unknown_tagged_fields = (0..tagged_field_count)
            .map(|_| -> Result<(i32, Bytes)> {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                Ok((tag as i32, payload))
            })
            .collect::<Result<BTreeMap<_, _>>>()?;

        Ok(Self {
            transactional_id,
            group_id,
            producer_id,
            producer_epoch,
            generation_id,
            member_id,
            group_instance_id,
            topics,
            unknown_tagged_fields,
        })
    }
}
323
324impl Default for TxnOffsetCommitRequest {
325    fn default() -> Self {
326        Self {
327            transactional_id: Default::default(),
328            group_id: Default::default(),
329            producer_id: (0).into(),
330            producer_epoch: 0,
331            generation_id: -1,
332            member_id: StrBytes::from_static_str(""),
333            group_instance_id: None,
334            topics: Default::default(),
335            unknown_tagged_fields: BTreeMap::new(),
336        }
337    }
338}
339
340impl Message for TxnOffsetCommitRequest {
341    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
342    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
343}
344
345/// Valid versions: 0-4
346#[non_exhaustive]
347#[derive(Debug, Clone, PartialEq)]
348pub struct TxnOffsetCommitRequestPartition {
349    /// The index of the partition within the topic.
350    ///
351    /// Supported API versions: 0-4
352    pub partition_index: i32,
353
354    /// The message offset to be committed.
355    ///
356    /// Supported API versions: 0-4
357    pub committed_offset: i64,
358
359    /// The leader epoch of the last consumed record.
360    ///
361    /// Supported API versions: 2-4
362    pub committed_leader_epoch: i32,
363
364    /// Any associated metadata the client wants to keep.
365    ///
366    /// Supported API versions: 0-4
367    pub committed_metadata: Option<StrBytes>,
368
369    /// Other tagged fields
370    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
371}
372
373impl TxnOffsetCommitRequestPartition {
374    /// Sets `partition_index` to the passed value.
375    ///
376    /// The index of the partition within the topic.
377    ///
378    /// Supported API versions: 0-4
379    pub fn with_partition_index(mut self, value: i32) -> Self {
380        self.partition_index = value;
381        self
382    }
383    /// Sets `committed_offset` to the passed value.
384    ///
385    /// The message offset to be committed.
386    ///
387    /// Supported API versions: 0-4
388    pub fn with_committed_offset(mut self, value: i64) -> Self {
389        self.committed_offset = value;
390        self
391    }
392    /// Sets `committed_leader_epoch` to the passed value.
393    ///
394    /// The leader epoch of the last consumed record.
395    ///
396    /// Supported API versions: 2-4
397    pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
398        self.committed_leader_epoch = value;
399        self
400    }
401    /// Sets `committed_metadata` to the passed value.
402    ///
403    /// Any associated metadata the client wants to keep.
404    ///
405    /// Supported API versions: 0-4
406    pub fn with_committed_metadata(mut self, value: Option<StrBytes>) -> Self {
407        self.committed_metadata = value;
408        self
409    }
410    /// Sets unknown_tagged_fields to the passed value.
411    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
412        self.unknown_tagged_fields = value;
413        self
414    }
415    /// Inserts an entry into unknown_tagged_fields.
416    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
417        self.unknown_tagged_fields.insert(key, value);
418        self
419    }
420}
421
#[cfg(feature = "client")]
impl Encodable for TxnOffsetCommitRequestPartition {
    /// Serializes this partition entry into `buf` using the layout of `version`.
    ///
    /// `committed_leader_epoch` is written only from version 2. From version 3 the
    /// metadata string uses the compact encoding and a tagged-field section follows.
    ///
    /// # Errors
    ///
    /// Fails when a nested encoder fails or, from version 3, when there are more tagged
    /// fields than fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int64.encode(buf, &self.committed_offset)?;
        if version >= 2 {
            types::Int32.encode(buf, &self.committed_leader_epoch)?;
        }

        // Older layouts end with the non-compact metadata string.
        if version < 3 {
            types::String.encode(buf, &self.committed_metadata)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.committed_metadata)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails when a nested size computation fails or, from version 3, when there are
    /// more tagged fields than fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition_index)?;
        size += types::Int64.compute_size(&self.committed_offset)?;
        if version >= 2 {
            size += types::Int32.compute_size(&self.committed_leader_epoch)?;
        }

        if version < 3 {
            size += types::String.compute_size(&self.committed_metadata)?;
            return Ok(size);
        }

        size += types::CompactString.compute_size(&self.committed_metadata)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
476
#[cfg(feature = "broker")]
impl Decodable for TxnOffsetCommitRequestPartition {
    /// Parses a partition entry from `buf`, interpreting the bytes with the layout of `version`.
    ///
    /// `committed_leader_epoch` is read only from version 2 and is `-1` otherwise.
    /// Tagged fields are read only from version 3.
    ///
    /// # Errors
    ///
    /// Propagates the first error reported by any field decoder or by reading a
    /// tagged-field payload from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let partition_index = types::Int32.decode(buf)?;
        let committed_offset = types::Int64.decode(buf)?;
        let committed_leader_epoch = if version < 2 {
            -1
        } else {
            types::Int32.decode(buf)?
        };

        // Pre-flexible layout: the non-compact metadata string is the last field.
        if version < 3 {
            return Ok(Self {
                partition_index,
                committed_offset,
                committed_leader_epoch,
                committed_metadata: types::String.decode(buf)?,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let committed_metadata = types::CompactString.decode(buf)?;

        // Each tagged field is a (tag, size, payload) triple; a repeated tag keeps the
        // payload read last.
        let tagged_field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let unknown_tagged_fields = (0..tagged_field_count)
            .map(|_| -> Result<(i32, Bytes)> {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                Ok((tag as i32, payload))
            })
            .collect::<Result<BTreeMap<_, _>>>()?;

        Ok(Self {
            partition_index,
            committed_offset,
            committed_leader_epoch,
            committed_metadata,
            unknown_tagged_fields,
        })
    }
}
511
512impl Default for TxnOffsetCommitRequestPartition {
513    fn default() -> Self {
514        Self {
515            partition_index: 0,
516            committed_offset: 0,
517            committed_leader_epoch: -1,
518            committed_metadata: Some(Default::default()),
519            unknown_tagged_fields: BTreeMap::new(),
520        }
521    }
522}
523
524impl Message for TxnOffsetCommitRequestPartition {
525    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
526    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
527}
528
529/// Valid versions: 0-4
530#[non_exhaustive]
531#[derive(Debug, Clone, PartialEq)]
532pub struct TxnOffsetCommitRequestTopic {
533    /// The topic name.
534    ///
535    /// Supported API versions: 0-4
536    pub name: super::TopicName,
537
538    /// The partitions inside the topic that we want to commit offsets for.
539    ///
540    /// Supported API versions: 0-4
541    pub partitions: Vec<TxnOffsetCommitRequestPartition>,
542
543    /// Other tagged fields
544    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
545}
546
547impl TxnOffsetCommitRequestTopic {
548    /// Sets `name` to the passed value.
549    ///
550    /// The topic name.
551    ///
552    /// Supported API versions: 0-4
553    pub fn with_name(mut self, value: super::TopicName) -> Self {
554        self.name = value;
555        self
556    }
557    /// Sets `partitions` to the passed value.
558    ///
559    /// The partitions inside the topic that we want to commit offsets for.
560    ///
561    /// Supported API versions: 0-4
562    pub fn with_partitions(mut self, value: Vec<TxnOffsetCommitRequestPartition>) -> Self {
563        self.partitions = value;
564        self
565    }
566    /// Sets unknown_tagged_fields to the passed value.
567    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
568        self.unknown_tagged_fields = value;
569        self
570    }
571    /// Inserts an entry into unknown_tagged_fields.
572    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
573        self.unknown_tagged_fields.insert(key, value);
574        self
575    }
576}
577
#[cfg(feature = "client")]
impl Encodable for TxnOffsetCommitRequestTopic {
    /// Serializes this topic entry into `buf` using the layout of `version`.
    ///
    /// From version 3 the name and partition array use their compact encodings and a
    /// tagged-field section follows; earlier versions use the non-compact encodings.
    ///
    /// # Errors
    ///
    /// Fails when a nested encoder fails or, from version 3, when there are more tagged
    /// fields than fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Older layouts consist of just the name and the partition array.
        if version < 3 {
            types::String.encode(buf, &self.name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails when a nested size computation fails or, from version 3, when there are
    /// more tagged fields than fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 3 {
            let mut size = types::String.compute_size(&self.name)?;
            size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(size);
        }

        let mut size = types::CompactString.compute_size(&self.name)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
633
#[cfg(feature = "broker")]
impl Decodable for TxnOffsetCommitRequestTopic {
    /// Parses a topic entry from `buf`, interpreting the bytes with the layout of `version`.
    ///
    /// Tagged fields are read only from version 3; older versions yield an empty map.
    ///
    /// # Errors
    ///
    /// Propagates the first error reported by any field decoder or by reading a
    /// tagged-field payload from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Pre-flexible layout: non-compact name followed by the partition array.
        if version < 3 {
            let name = types::String.decode(buf)?;
            let partitions = types::Array(types::Struct { version }).decode(buf)?;
            return Ok(Self {
                name,
                partitions,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let name = types::CompactString.decode(buf)?;
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Each tagged field is a (tag, size, payload) triple; a repeated tag keeps the
        // payload read last.
        let tagged_field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let unknown_tagged_fields = (0..tagged_field_count)
            .map(|_| -> Result<(i32, Bytes)> {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                Ok((tag as i32, payload))
            })
            .collect::<Result<BTreeMap<_, _>>>()?;

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
664
665impl Default for TxnOffsetCommitRequestTopic {
666    fn default() -> Self {
667        Self {
668            name: Default::default(),
669            partitions: Default::default(),
670            unknown_tagged_fields: BTreeMap::new(),
671        }
672    }
673}
674
675impl Message for TxnOffsetCommitRequestTopic {
676    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
677    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
678}
679
680impl HeaderVersion for TxnOffsetCommitRequest {
681    fn header_version(version: i16) -> i16 {
682        if version >= 3 {
683            2
684        } else {
685            1
686        }
687    }
688}