kafka_protocol/messages/
write_txn_markers_request.rs

1//! WriteTxnMarkersRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-1
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct WritableTxnMarker {
24    /// The current producer ID.
25    ///
26    /// Supported API versions: 0-1
27    pub producer_id: super::ProducerId,
28
29    /// The current epoch associated with the producer ID.
30    ///
31    /// Supported API versions: 0-1
32    pub producer_epoch: i16,
33
34    /// The result of the transaction to write to the partitions (false = ABORT, true = COMMIT).
35    ///
36    /// Supported API versions: 0-1
37    pub transaction_result: bool,
38
39    /// Each topic that we want to write transaction marker(s) for.
40    ///
41    /// Supported API versions: 0-1
42    pub topics: Vec<WritableTxnMarkerTopic>,
43
44    /// Epoch associated with the transaction state partition hosted by this transaction coordinator
45    ///
46    /// Supported API versions: 0-1
47    pub coordinator_epoch: i32,
48
49    /// Other tagged fields
50    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl WritableTxnMarker {
54    /// Sets `producer_id` to the passed value.
55    ///
56    /// The current producer ID.
57    ///
58    /// Supported API versions: 0-1
59    pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
60        self.producer_id = value;
61        self
62    }
63    /// Sets `producer_epoch` to the passed value.
64    ///
65    /// The current epoch associated with the producer ID.
66    ///
67    /// Supported API versions: 0-1
68    pub fn with_producer_epoch(mut self, value: i16) -> Self {
69        self.producer_epoch = value;
70        self
71    }
72    /// Sets `transaction_result` to the passed value.
73    ///
74    /// The result of the transaction to write to the partitions (false = ABORT, true = COMMIT).
75    ///
76    /// Supported API versions: 0-1
77    pub fn with_transaction_result(mut self, value: bool) -> Self {
78        self.transaction_result = value;
79        self
80    }
81    /// Sets `topics` to the passed value.
82    ///
83    /// Each topic that we want to write transaction marker(s) for.
84    ///
85    /// Supported API versions: 0-1
86    pub fn with_topics(mut self, value: Vec<WritableTxnMarkerTopic>) -> Self {
87        self.topics = value;
88        self
89    }
90    /// Sets `coordinator_epoch` to the passed value.
91    ///
92    /// Epoch associated with the transaction state partition hosted by this transaction coordinator
93    ///
94    /// Supported API versions: 0-1
95    pub fn with_coordinator_epoch(mut self, value: i32) -> Self {
96        self.coordinator_epoch = value;
97        self
98    }
99    /// Sets unknown_tagged_fields to the passed value.
100    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101        self.unknown_tagged_fields = value;
102        self
103    }
104    /// Inserts an entry into unknown_tagged_fields.
105    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106        self.unknown_tagged_fields.insert(key, value);
107        self
108    }
109}
110
#[cfg(feature = "client")]
impl Encodable for WritableTxnMarker {
    /// Serializes this marker into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1, when a field encoder fails, or when the number
    /// of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 1 on, arrays are compact and a tagged-field section is appended.
        let flexible = version >= 1;
        let topic_codec = types::Struct { version };

        types::Int64.encode(buf, &self.producer_id)?;
        types::Int16.encode(buf, &self.producer_epoch)?;
        types::Boolean.encode(buf, &self.transaction_result)?;
        if flexible {
            types::CompactArray(topic_codec).encode(buf, &self.topics)?;
        } else {
            types::Array(topic_codec).encode(buf, &self.topics)?;
        }
        types::Int32.encode(buf, &self.coordinator_epoch)?;

        if !flexible {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-1; any version `>= 1` is
    /// sized with the compact layout and everything else with the non-compact layout.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 1;
        let topic_codec = types::Struct { version };

        let mut size = types::Int64.compute_size(&self.producer_id)?
            + types::Int16.compute_size(&self.producer_epoch)?
            + types::Boolean.compute_size(&self.transaction_result)?;
        size += if flexible {
            types::CompactArray(topic_codec).compute_size(&self.topics)?
        } else {
            types::Array(topic_codec).compute_size(&self.topics)?
        };
        size += types::Int32.compute_size(&self.coordinator_epoch)?;

        if !flexible {
            return Ok(size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
167
#[cfg(feature = "broker")]
impl Decodable for WritableTxnMarker {
    /// Reads a marker from `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1 or when any field cannot be decoded from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 1 on, arrays are compact and a tagged-field section follows the fields.
        let flexible = version >= 1;
        let topic_codec = types::Struct { version };

        let producer_id = types::Int64.decode(buf)?;
        let producer_epoch = types::Int16.decode(buf)?;
        let transaction_result = types::Boolean.decode(buf)?;
        let topics = if flexible {
            types::CompactArray(topic_codec).decode(buf)?
        } else {
            types::Array(topic_codec).decode(buf)?
        };
        let coordinator_epoch = types::Int32.decode(buf)?;

        let unknown_tagged_fields = if flexible {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            (0..field_count)
                .map(|_| -> Result<(i32, Bytes)> {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    // Tags are kept as `i32` keys; a repeated tag replaces the earlier entry.
                    Ok((tag as i32, payload))
                })
                .collect::<Result<BTreeMap<_, _>>>()?
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            producer_id,
            producer_epoch,
            transaction_result,
            topics,
            coordinator_epoch,
            unknown_tagged_fields,
        })
    }
}
203
204impl Default for WritableTxnMarker {
205    fn default() -> Self {
206        Self {
207            producer_id: (0).into(),
208            producer_epoch: 0,
209            transaction_result: false,
210            topics: Default::default(),
211            coordinator_epoch: 0,
212            unknown_tagged_fields: BTreeMap::new(),
213        }
214    }
215}
216
217impl Message for WritableTxnMarker {
218    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
219    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
220}
221
222/// Valid versions: 0-1
223#[non_exhaustive]
224#[derive(Debug, Clone, PartialEq)]
225pub struct WritableTxnMarkerTopic {
226    /// The topic name.
227    ///
228    /// Supported API versions: 0-1
229    pub name: super::TopicName,
230
231    /// The indexes of the partitions to write transaction markers for.
232    ///
233    /// Supported API versions: 0-1
234    pub partition_indexes: Vec<i32>,
235
236    /// Other tagged fields
237    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
238}
239
240impl WritableTxnMarkerTopic {
241    /// Sets `name` to the passed value.
242    ///
243    /// The topic name.
244    ///
245    /// Supported API versions: 0-1
246    pub fn with_name(mut self, value: super::TopicName) -> Self {
247        self.name = value;
248        self
249    }
250    /// Sets `partition_indexes` to the passed value.
251    ///
252    /// The indexes of the partitions to write transaction markers for.
253    ///
254    /// Supported API versions: 0-1
255    pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
256        self.partition_indexes = value;
257        self
258    }
259    /// Sets unknown_tagged_fields to the passed value.
260    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
261        self.unknown_tagged_fields = value;
262        self
263    }
264    /// Inserts an entry into unknown_tagged_fields.
265    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
266        self.unknown_tagged_fields.insert(key, value);
267        self
268    }
269}
270
#[cfg(feature = "client")]
impl Encodable for WritableTxnMarkerTopic {
    /// Serializes this topic entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1, when a field encoder fails, or when the number
    /// of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version < 1 {
            // Version 0: non-compact string and array, no tagged-field section.
            types::String.encode(buf, &self.name)?;
            types::Array(types::Int32).encode(buf, &self.partition_indexes)?;
            return Ok(());
        }

        // Version 1: compact string and array, followed by the tagged-field section.
        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-1; any version `>= 1` is
    /// sized with the compact layout and everything else with the non-compact layout.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            let size = types::String.compute_size(&self.name)?
                + types::Array(types::Int32).compute_size(&self.partition_indexes)?;
            return Ok(size);
        }

        let mut size = types::CompactString.compute_size(&self.name)?
            + types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
329
#[cfg(feature = "broker")]
impl Decodable for WritableTxnMarkerTopic {
    /// Reads a topic entry from `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1 or when any field cannot be decoded from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 1 on, strings and arrays are compact and tagged fields follow.
        let flexible = version >= 1;

        let (name, partition_indexes) = if flexible {
            let name = types::CompactString.decode(buf)?;
            let partitions = types::CompactArray(types::Int32).decode(buf)?;
            (name, partitions)
        } else {
            let name = types::String.decode(buf)?;
            let partitions = types::Array(types::Int32).decode(buf)?;
            (name, partitions)
        };

        let unknown_tagged_fields = if flexible {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            (0..field_count)
                .map(|_| -> Result<(i32, Bytes)> {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    // Tags are kept as `i32` keys; a repeated tag replaces the earlier entry.
                    Ok((tag as i32, payload))
                })
                .collect::<Result<BTreeMap<_, _>>>()?
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            name,
            partition_indexes,
            unknown_tagged_fields,
        })
    }
}
363
364impl Default for WritableTxnMarkerTopic {
365    fn default() -> Self {
366        Self {
367            name: Default::default(),
368            partition_indexes: Default::default(),
369            unknown_tagged_fields: BTreeMap::new(),
370        }
371    }
372}
373
374impl Message for WritableTxnMarkerTopic {
375    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
376    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
377}
378
379/// Valid versions: 0-1
380#[non_exhaustive]
381#[derive(Debug, Clone, PartialEq)]
382pub struct WriteTxnMarkersRequest {
383    /// The transaction markers to be written.
384    ///
385    /// Supported API versions: 0-1
386    pub markers: Vec<WritableTxnMarker>,
387
388    /// Other tagged fields
389    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
390}
391
392impl WriteTxnMarkersRequest {
393    /// Sets `markers` to the passed value.
394    ///
395    /// The transaction markers to be written.
396    ///
397    /// Supported API versions: 0-1
398    pub fn with_markers(mut self, value: Vec<WritableTxnMarker>) -> Self {
399        self.markers = value;
400        self
401    }
402    /// Sets unknown_tagged_fields to the passed value.
403    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
404        self.unknown_tagged_fields = value;
405        self
406    }
407    /// Inserts an entry into unknown_tagged_fields.
408    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
409        self.unknown_tagged_fields.insert(key, value);
410        self
411    }
412}
413
#[cfg(feature = "client")]
impl Encodable for WriteTxnMarkersRequest {
    /// Serializes the request body into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1, when encoding a marker fails, or when the number
    /// of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let marker_codec = types::Struct { version };

        if version < 1 {
            // Version 0: non-compact array only, no tagged-field section.
            return types::Array(marker_codec).encode(buf, &self.markers);
        }

        // Version 1: compact array followed by the tagged-field section.
        types::CompactArray(marker_codec).encode(buf, &self.markers)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-1; any version `>= 1` is
    /// sized with the compact layout and everything else with the non-compact layout.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let marker_codec = types::Struct { version };

        if version < 1 {
            return types::Array(marker_codec).compute_size(&self.markers);
        }

        let mut size = types::CompactArray(marker_codec).compute_size(&self.markers)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
462
#[cfg(feature = "broker")]
impl Decodable for WriteTxnMarkersRequest {
    /// Reads the request body from `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1 or when any field cannot be decoded from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 1 on, the array is compact and a tagged-field section follows it.
        let flexible = version >= 1;
        let marker_codec = types::Struct { version };

        let markers = if flexible {
            types::CompactArray(marker_codec).decode(buf)?
        } else {
            types::Array(marker_codec).decode(buf)?
        };

        let unknown_tagged_fields = if flexible {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            (0..field_count)
                .map(|_| -> Result<(i32, Bytes)> {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    // Tags are kept as `i32` keys; a repeated tag replaces the earlier entry.
                    Ok((tag as i32, payload))
                })
                .collect::<Result<BTreeMap<_, _>>>()?
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            markers,
            unknown_tagged_fields,
        })
    }
}
490
491impl Default for WriteTxnMarkersRequest {
492    fn default() -> Self {
493        Self {
494            markers: Default::default(),
495            unknown_tagged_fields: BTreeMap::new(),
496        }
497    }
498}
499
500impl Message for WriteTxnMarkersRequest {
501    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
502    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
503}
504
505impl HeaderVersion for WriteTxnMarkersRequest {
506    fn header_version(version: i16) -> i16 {
507        if version >= 1 {
508            2
509        } else {
510            1
511        }
512    }
513}