kafka_protocol/messages/
txn_offset_commit_request.rs

1//! TxnOffsetCommitRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-4
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct TxnOffsetCommitRequest {
24    /// The ID of the transaction.
25    ///
26    /// Supported API versions: 0-4
27    pub transactional_id: super::TransactionalId,
28
29    /// The ID of the group.
30    ///
31    /// Supported API versions: 0-4
32    pub group_id: super::GroupId,
33
34    /// The current producer ID in use by the transactional ID.
35    ///
36    /// Supported API versions: 0-4
37    pub producer_id: super::ProducerId,
38
39    /// The current epoch associated with the producer ID.
40    ///
41    /// Supported API versions: 0-4
42    pub producer_epoch: i16,
43
44    /// The generation of the consumer.
45    ///
46    /// Supported API versions: 3-4
47    pub generation_id: i32,
48
49    /// The member ID assigned by the group coordinator.
50    ///
51    /// Supported API versions: 3-4
52    pub member_id: StrBytes,
53
54    /// The unique identifier of the consumer instance provided by end user.
55    ///
56    /// Supported API versions: 3-4
57    pub group_instance_id: Option<StrBytes>,
58
59    /// Each topic that we want to commit offsets for.
60    ///
61    /// Supported API versions: 0-4
62    pub topics: Vec<TxnOffsetCommitRequestTopic>,
63
64    /// Other tagged fields
65    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
66}
67
68impl TxnOffsetCommitRequest {
69    /// Sets `transactional_id` to the passed value.
70    ///
71    /// The ID of the transaction.
72    ///
73    /// Supported API versions: 0-4
74    pub fn with_transactional_id(mut self, value: super::TransactionalId) -> Self {
75        self.transactional_id = value;
76        self
77    }
78    /// Sets `group_id` to the passed value.
79    ///
80    /// The ID of the group.
81    ///
82    /// Supported API versions: 0-4
83    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
84        self.group_id = value;
85        self
86    }
87    /// Sets `producer_id` to the passed value.
88    ///
89    /// The current producer ID in use by the transactional ID.
90    ///
91    /// Supported API versions: 0-4
92    pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
93        self.producer_id = value;
94        self
95    }
96    /// Sets `producer_epoch` to the passed value.
97    ///
98    /// The current epoch associated with the producer ID.
99    ///
100    /// Supported API versions: 0-4
101    pub fn with_producer_epoch(mut self, value: i16) -> Self {
102        self.producer_epoch = value;
103        self
104    }
105    /// Sets `generation_id` to the passed value.
106    ///
107    /// The generation of the consumer.
108    ///
109    /// Supported API versions: 3-4
110    pub fn with_generation_id(mut self, value: i32) -> Self {
111        self.generation_id = value;
112        self
113    }
114    /// Sets `member_id` to the passed value.
115    ///
116    /// The member ID assigned by the group coordinator.
117    ///
118    /// Supported API versions: 3-4
119    pub fn with_member_id(mut self, value: StrBytes) -> Self {
120        self.member_id = value;
121        self
122    }
123    /// Sets `group_instance_id` to the passed value.
124    ///
125    /// The unique identifier of the consumer instance provided by end user.
126    ///
127    /// Supported API versions: 3-4
128    pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
129        self.group_instance_id = value;
130        self
131    }
132    /// Sets `topics` to the passed value.
133    ///
134    /// Each topic that we want to commit offsets for.
135    ///
136    /// Supported API versions: 0-4
137    pub fn with_topics(mut self, value: Vec<TxnOffsetCommitRequestTopic>) -> Self {
138        self.topics = value;
139        self
140    }
141    /// Sets unknown_tagged_fields to the passed value.
142    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
143        self.unknown_tagged_fields = value;
144        self
145    }
146    /// Inserts an entry into unknown_tagged_fields.
147    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
148        self.unknown_tagged_fields.insert(key, value);
149        self
150    }
151}
152
153#[cfg(feature = "client")]
154impl Encodable for TxnOffsetCommitRequest {
155    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
156        if version < 0 || version > 4 {
157            bail!("specified version not supported by this message type");
158        }
159        if version >= 3 {
160            types::CompactString.encode(buf, &self.transactional_id)?;
161        } else {
162            types::String.encode(buf, &self.transactional_id)?;
163        }
164        if version >= 3 {
165            types::CompactString.encode(buf, &self.group_id)?;
166        } else {
167            types::String.encode(buf, &self.group_id)?;
168        }
169        types::Int64.encode(buf, &self.producer_id)?;
170        types::Int16.encode(buf, &self.producer_epoch)?;
171        if version >= 3 {
172            types::Int32.encode(buf, &self.generation_id)?;
173        } else {
174            if self.generation_id != -1 {
175                bail!("A field is set that is not available on the selected protocol version");
176            }
177        }
178        if version >= 3 {
179            types::CompactString.encode(buf, &self.member_id)?;
180        } else {
181            if &self.member_id != "" {
182                bail!("A field is set that is not available on the selected protocol version");
183            }
184        }
185        if version >= 3 {
186            types::CompactString.encode(buf, &self.group_instance_id)?;
187        } else {
188            if !self.group_instance_id.is_none() {
189                bail!("A field is set that is not available on the selected protocol version");
190            }
191        }
192        if version >= 3 {
193            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
194        } else {
195            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
196        }
197        if version >= 3 {
198            let num_tagged_fields = self.unknown_tagged_fields.len();
199            if num_tagged_fields > std::u32::MAX as usize {
200                bail!(
201                    "Too many tagged fields to encode ({} fields)",
202                    num_tagged_fields
203                );
204            }
205            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
206
207            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
208        }
209        Ok(())
210    }
211    fn compute_size(&self, version: i16) -> Result<usize> {
212        let mut total_size = 0;
213        if version >= 3 {
214            total_size += types::CompactString.compute_size(&self.transactional_id)?;
215        } else {
216            total_size += types::String.compute_size(&self.transactional_id)?;
217        }
218        if version >= 3 {
219            total_size += types::CompactString.compute_size(&self.group_id)?;
220        } else {
221            total_size += types::String.compute_size(&self.group_id)?;
222        }
223        total_size += types::Int64.compute_size(&self.producer_id)?;
224        total_size += types::Int16.compute_size(&self.producer_epoch)?;
225        if version >= 3 {
226            total_size += types::Int32.compute_size(&self.generation_id)?;
227        } else {
228            if self.generation_id != -1 {
229                bail!("A field is set that is not available on the selected protocol version");
230            }
231        }
232        if version >= 3 {
233            total_size += types::CompactString.compute_size(&self.member_id)?;
234        } else {
235            if &self.member_id != "" {
236                bail!("A field is set that is not available on the selected protocol version");
237            }
238        }
239        if version >= 3 {
240            total_size += types::CompactString.compute_size(&self.group_instance_id)?;
241        } else {
242            if !self.group_instance_id.is_none() {
243                bail!("A field is set that is not available on the selected protocol version");
244            }
245        }
246        if version >= 3 {
247            total_size +=
248                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
249        } else {
250            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
251        }
252        if version >= 3 {
253            let num_tagged_fields = self.unknown_tagged_fields.len();
254            if num_tagged_fields > std::u32::MAX as usize {
255                bail!(
256                    "Too many tagged fields to encode ({} fields)",
257                    num_tagged_fields
258                );
259            }
260            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
261
262            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
263        }
264        Ok(total_size)
265    }
266}
267
268#[cfg(feature = "broker")]
269impl Decodable for TxnOffsetCommitRequest {
270    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
271        if version < 0 || version > 4 {
272            bail!("specified version not supported by this message type");
273        }
274        let transactional_id = if version >= 3 {
275            types::CompactString.decode(buf)?
276        } else {
277            types::String.decode(buf)?
278        };
279        let group_id = if version >= 3 {
280            types::CompactString.decode(buf)?
281        } else {
282            types::String.decode(buf)?
283        };
284        let producer_id = types::Int64.decode(buf)?;
285        let producer_epoch = types::Int16.decode(buf)?;
286        let generation_id = if version >= 3 {
287            types::Int32.decode(buf)?
288        } else {
289            -1
290        };
291        let member_id = if version >= 3 {
292            types::CompactString.decode(buf)?
293        } else {
294            StrBytes::from_static_str("")
295        };
296        let group_instance_id = if version >= 3 {
297            types::CompactString.decode(buf)?
298        } else {
299            None
300        };
301        let topics = if version >= 3 {
302            types::CompactArray(types::Struct { version }).decode(buf)?
303        } else {
304            types::Array(types::Struct { version }).decode(buf)?
305        };
306        let mut unknown_tagged_fields = BTreeMap::new();
307        if version >= 3 {
308            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
309            for _ in 0..num_tagged_fields {
310                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
311                let size: u32 = types::UnsignedVarInt.decode(buf)?;
312                let unknown_value = buf.try_get_bytes(size as usize)?;
313                unknown_tagged_fields.insert(tag as i32, unknown_value);
314            }
315        }
316        Ok(Self {
317            transactional_id,
318            group_id,
319            producer_id,
320            producer_epoch,
321            generation_id,
322            member_id,
323            group_instance_id,
324            topics,
325            unknown_tagged_fields,
326        })
327    }
328}
329
330impl Default for TxnOffsetCommitRequest {
331    fn default() -> Self {
332        Self {
333            transactional_id: Default::default(),
334            group_id: Default::default(),
335            producer_id: (0).into(),
336            producer_epoch: 0,
337            generation_id: -1,
338            member_id: StrBytes::from_static_str(""),
339            group_instance_id: None,
340            topics: Default::default(),
341            unknown_tagged_fields: BTreeMap::new(),
342        }
343    }
344}
345
346impl Message for TxnOffsetCommitRequest {
347    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
348    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
349}
350
351/// Valid versions: 0-4
352#[non_exhaustive]
353#[derive(Debug, Clone, PartialEq)]
354pub struct TxnOffsetCommitRequestPartition {
355    /// The index of the partition within the topic.
356    ///
357    /// Supported API versions: 0-4
358    pub partition_index: i32,
359
360    /// The message offset to be committed.
361    ///
362    /// Supported API versions: 0-4
363    pub committed_offset: i64,
364
365    /// The leader epoch of the last consumed record.
366    ///
367    /// Supported API versions: 2-4
368    pub committed_leader_epoch: i32,
369
370    /// Any associated metadata the client wants to keep.
371    ///
372    /// Supported API versions: 0-4
373    pub committed_metadata: Option<StrBytes>,
374
375    /// Other tagged fields
376    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
377}
378
379impl TxnOffsetCommitRequestPartition {
380    /// Sets `partition_index` to the passed value.
381    ///
382    /// The index of the partition within the topic.
383    ///
384    /// Supported API versions: 0-4
385    pub fn with_partition_index(mut self, value: i32) -> Self {
386        self.partition_index = value;
387        self
388    }
389    /// Sets `committed_offset` to the passed value.
390    ///
391    /// The message offset to be committed.
392    ///
393    /// Supported API versions: 0-4
394    pub fn with_committed_offset(mut self, value: i64) -> Self {
395        self.committed_offset = value;
396        self
397    }
398    /// Sets `committed_leader_epoch` to the passed value.
399    ///
400    /// The leader epoch of the last consumed record.
401    ///
402    /// Supported API versions: 2-4
403    pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
404        self.committed_leader_epoch = value;
405        self
406    }
407    /// Sets `committed_metadata` to the passed value.
408    ///
409    /// Any associated metadata the client wants to keep.
410    ///
411    /// Supported API versions: 0-4
412    pub fn with_committed_metadata(mut self, value: Option<StrBytes>) -> Self {
413        self.committed_metadata = value;
414        self
415    }
416    /// Sets unknown_tagged_fields to the passed value.
417    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
418        self.unknown_tagged_fields = value;
419        self
420    }
421    /// Inserts an entry into unknown_tagged_fields.
422    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
423        self.unknown_tagged_fields.insert(key, value);
424        self
425    }
426}
427
428#[cfg(feature = "client")]
429impl Encodable for TxnOffsetCommitRequestPartition {
430    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
431        if version < 0 || version > 4 {
432            bail!("specified version not supported by this message type");
433        }
434        types::Int32.encode(buf, &self.partition_index)?;
435        types::Int64.encode(buf, &self.committed_offset)?;
436        if version >= 2 {
437            types::Int32.encode(buf, &self.committed_leader_epoch)?;
438        }
439        if version >= 3 {
440            types::CompactString.encode(buf, &self.committed_metadata)?;
441        } else {
442            types::String.encode(buf, &self.committed_metadata)?;
443        }
444        if version >= 3 {
445            let num_tagged_fields = self.unknown_tagged_fields.len();
446            if num_tagged_fields > std::u32::MAX as usize {
447                bail!(
448                    "Too many tagged fields to encode ({} fields)",
449                    num_tagged_fields
450                );
451            }
452            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
453
454            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
455        }
456        Ok(())
457    }
458    fn compute_size(&self, version: i16) -> Result<usize> {
459        let mut total_size = 0;
460        total_size += types::Int32.compute_size(&self.partition_index)?;
461        total_size += types::Int64.compute_size(&self.committed_offset)?;
462        if version >= 2 {
463            total_size += types::Int32.compute_size(&self.committed_leader_epoch)?;
464        }
465        if version >= 3 {
466            total_size += types::CompactString.compute_size(&self.committed_metadata)?;
467        } else {
468            total_size += types::String.compute_size(&self.committed_metadata)?;
469        }
470        if version >= 3 {
471            let num_tagged_fields = self.unknown_tagged_fields.len();
472            if num_tagged_fields > std::u32::MAX as usize {
473                bail!(
474                    "Too many tagged fields to encode ({} fields)",
475                    num_tagged_fields
476                );
477            }
478            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
479
480            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
481        }
482        Ok(total_size)
483    }
484}
485
486#[cfg(feature = "broker")]
487impl Decodable for TxnOffsetCommitRequestPartition {
488    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
489        if version < 0 || version > 4 {
490            bail!("specified version not supported by this message type");
491        }
492        let partition_index = types::Int32.decode(buf)?;
493        let committed_offset = types::Int64.decode(buf)?;
494        let committed_leader_epoch = if version >= 2 {
495            types::Int32.decode(buf)?
496        } else {
497            -1
498        };
499        let committed_metadata = if version >= 3 {
500            types::CompactString.decode(buf)?
501        } else {
502            types::String.decode(buf)?
503        };
504        let mut unknown_tagged_fields = BTreeMap::new();
505        if version >= 3 {
506            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
507            for _ in 0..num_tagged_fields {
508                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
509                let size: u32 = types::UnsignedVarInt.decode(buf)?;
510                let unknown_value = buf.try_get_bytes(size as usize)?;
511                unknown_tagged_fields.insert(tag as i32, unknown_value);
512            }
513        }
514        Ok(Self {
515            partition_index,
516            committed_offset,
517            committed_leader_epoch,
518            committed_metadata,
519            unknown_tagged_fields,
520        })
521    }
522}
523
524impl Default for TxnOffsetCommitRequestPartition {
525    fn default() -> Self {
526        Self {
527            partition_index: 0,
528            committed_offset: 0,
529            committed_leader_epoch: -1,
530            committed_metadata: Some(Default::default()),
531            unknown_tagged_fields: BTreeMap::new(),
532        }
533    }
534}
535
536impl Message for TxnOffsetCommitRequestPartition {
537    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
538    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
539}
540
541/// Valid versions: 0-4
542#[non_exhaustive]
543#[derive(Debug, Clone, PartialEq)]
544pub struct TxnOffsetCommitRequestTopic {
545    /// The topic name.
546    ///
547    /// Supported API versions: 0-4
548    pub name: super::TopicName,
549
550    /// The partitions inside the topic that we want to commit offsets for.
551    ///
552    /// Supported API versions: 0-4
553    pub partitions: Vec<TxnOffsetCommitRequestPartition>,
554
555    /// Other tagged fields
556    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
557}
558
559impl TxnOffsetCommitRequestTopic {
560    /// Sets `name` to the passed value.
561    ///
562    /// The topic name.
563    ///
564    /// Supported API versions: 0-4
565    pub fn with_name(mut self, value: super::TopicName) -> Self {
566        self.name = value;
567        self
568    }
569    /// Sets `partitions` to the passed value.
570    ///
571    /// The partitions inside the topic that we want to commit offsets for.
572    ///
573    /// Supported API versions: 0-4
574    pub fn with_partitions(mut self, value: Vec<TxnOffsetCommitRequestPartition>) -> Self {
575        self.partitions = value;
576        self
577    }
578    /// Sets unknown_tagged_fields to the passed value.
579    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
580        self.unknown_tagged_fields = value;
581        self
582    }
583    /// Inserts an entry into unknown_tagged_fields.
584    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
585        self.unknown_tagged_fields.insert(key, value);
586        self
587    }
588}
589
590#[cfg(feature = "client")]
591impl Encodable for TxnOffsetCommitRequestTopic {
592    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
593        if version < 0 || version > 4 {
594            bail!("specified version not supported by this message type");
595        }
596        if version >= 3 {
597            types::CompactString.encode(buf, &self.name)?;
598        } else {
599            types::String.encode(buf, &self.name)?;
600        }
601        if version >= 3 {
602            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
603        } else {
604            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
605        }
606        if version >= 3 {
607            let num_tagged_fields = self.unknown_tagged_fields.len();
608            if num_tagged_fields > std::u32::MAX as usize {
609                bail!(
610                    "Too many tagged fields to encode ({} fields)",
611                    num_tagged_fields
612                );
613            }
614            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
615
616            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
617        }
618        Ok(())
619    }
620    fn compute_size(&self, version: i16) -> Result<usize> {
621        let mut total_size = 0;
622        if version >= 3 {
623            total_size += types::CompactString.compute_size(&self.name)?;
624        } else {
625            total_size += types::String.compute_size(&self.name)?;
626        }
627        if version >= 3 {
628            total_size +=
629                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
630        } else {
631            total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
632        }
633        if version >= 3 {
634            let num_tagged_fields = self.unknown_tagged_fields.len();
635            if num_tagged_fields > std::u32::MAX as usize {
636                bail!(
637                    "Too many tagged fields to encode ({} fields)",
638                    num_tagged_fields
639                );
640            }
641            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
642
643            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
644        }
645        Ok(total_size)
646    }
647}
648
649#[cfg(feature = "broker")]
650impl Decodable for TxnOffsetCommitRequestTopic {
651    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
652        if version < 0 || version > 4 {
653            bail!("specified version not supported by this message type");
654        }
655        let name = if version >= 3 {
656            types::CompactString.decode(buf)?
657        } else {
658            types::String.decode(buf)?
659        };
660        let partitions = if version >= 3 {
661            types::CompactArray(types::Struct { version }).decode(buf)?
662        } else {
663            types::Array(types::Struct { version }).decode(buf)?
664        };
665        let mut unknown_tagged_fields = BTreeMap::new();
666        if version >= 3 {
667            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
668            for _ in 0..num_tagged_fields {
669                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
670                let size: u32 = types::UnsignedVarInt.decode(buf)?;
671                let unknown_value = buf.try_get_bytes(size as usize)?;
672                unknown_tagged_fields.insert(tag as i32, unknown_value);
673            }
674        }
675        Ok(Self {
676            name,
677            partitions,
678            unknown_tagged_fields,
679        })
680    }
681}
682
683impl Default for TxnOffsetCommitRequestTopic {
684    fn default() -> Self {
685        Self {
686            name: Default::default(),
687            partitions: Default::default(),
688            unknown_tagged_fields: BTreeMap::new(),
689        }
690    }
691}
692
693impl Message for TxnOffsetCommitRequestTopic {
694    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
695    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
696}
697
698impl HeaderVersion for TxnOffsetCommitRequest {
699    fn header_version(version: i16) -> i16 {
700        if version >= 3 {
701            2
702        } else {
703            1
704        }
705    }
706}