kafka_protocol/messages/
write_txn_markers_request.rs

1//! WriteTxnMarkersRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-1
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct WritableTxnMarker {
24    /// The current producer ID.
25    ///
26    /// Supported API versions: 0-1
27    pub producer_id: super::ProducerId,
28
29    /// The current epoch associated with the producer ID.
30    ///
31    /// Supported API versions: 0-1
32    pub producer_epoch: i16,
33
34    /// The result of the transaction to write to the partitions (false = ABORT, true = COMMIT).
35    ///
36    /// Supported API versions: 0-1
37    pub transaction_result: bool,
38
39    /// Each topic that we want to write transaction marker(s) for.
40    ///
41    /// Supported API versions: 0-1
42    pub topics: Vec<WritableTxnMarkerTopic>,
43
44    /// Epoch associated with the transaction state partition hosted by this transaction coordinator
45    ///
46    /// Supported API versions: 0-1
47    pub coordinator_epoch: i32,
48
49    /// Other tagged fields
50    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl WritableTxnMarker {
54    /// Sets `producer_id` to the passed value.
55    ///
56    /// The current producer ID.
57    ///
58    /// Supported API versions: 0-1
59    pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
60        self.producer_id = value;
61        self
62    }
63    /// Sets `producer_epoch` to the passed value.
64    ///
65    /// The current epoch associated with the producer ID.
66    ///
67    /// Supported API versions: 0-1
68    pub fn with_producer_epoch(mut self, value: i16) -> Self {
69        self.producer_epoch = value;
70        self
71    }
72    /// Sets `transaction_result` to the passed value.
73    ///
74    /// The result of the transaction to write to the partitions (false = ABORT, true = COMMIT).
75    ///
76    /// Supported API versions: 0-1
77    pub fn with_transaction_result(mut self, value: bool) -> Self {
78        self.transaction_result = value;
79        self
80    }
81    /// Sets `topics` to the passed value.
82    ///
83    /// Each topic that we want to write transaction marker(s) for.
84    ///
85    /// Supported API versions: 0-1
86    pub fn with_topics(mut self, value: Vec<WritableTxnMarkerTopic>) -> Self {
87        self.topics = value;
88        self
89    }
90    /// Sets `coordinator_epoch` to the passed value.
91    ///
92    /// Epoch associated with the transaction state partition hosted by this transaction coordinator
93    ///
94    /// Supported API versions: 0-1
95    pub fn with_coordinator_epoch(mut self, value: i32) -> Self {
96        self.coordinator_epoch = value;
97        self
98    }
99    /// Sets unknown_tagged_fields to the passed value.
100    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101        self.unknown_tagged_fields = value;
102        self
103    }
104    /// Inserts an entry into unknown_tagged_fields.
105    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106        self.unknown_tagged_fields.insert(key, value);
107        self
108    }
109}
110
111#[cfg(feature = "client")]
112impl Encodable for WritableTxnMarker {
113    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
114        types::Int64.encode(buf, &self.producer_id)?;
115        types::Int16.encode(buf, &self.producer_epoch)?;
116        types::Boolean.encode(buf, &self.transaction_result)?;
117        if version >= 1 {
118            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
119        } else {
120            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
121        }
122        types::Int32.encode(buf, &self.coordinator_epoch)?;
123        if version >= 1 {
124            let num_tagged_fields = self.unknown_tagged_fields.len();
125            if num_tagged_fields > std::u32::MAX as usize {
126                bail!(
127                    "Too many tagged fields to encode ({} fields)",
128                    num_tagged_fields
129                );
130            }
131            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
132
133            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
134        }
135        Ok(())
136    }
137    fn compute_size(&self, version: i16) -> Result<usize> {
138        let mut total_size = 0;
139        total_size += types::Int64.compute_size(&self.producer_id)?;
140        total_size += types::Int16.compute_size(&self.producer_epoch)?;
141        total_size += types::Boolean.compute_size(&self.transaction_result)?;
142        if version >= 1 {
143            total_size +=
144                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
145        } else {
146            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
147        }
148        total_size += types::Int32.compute_size(&self.coordinator_epoch)?;
149        if version >= 1 {
150            let num_tagged_fields = self.unknown_tagged_fields.len();
151            if num_tagged_fields > std::u32::MAX as usize {
152                bail!(
153                    "Too many tagged fields to encode ({} fields)",
154                    num_tagged_fields
155                );
156            }
157            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
158
159            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
160        }
161        Ok(total_size)
162    }
163}
164
165#[cfg(feature = "broker")]
166impl Decodable for WritableTxnMarker {
167    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
168        let producer_id = types::Int64.decode(buf)?;
169        let producer_epoch = types::Int16.decode(buf)?;
170        let transaction_result = types::Boolean.decode(buf)?;
171        let topics = if version >= 1 {
172            types::CompactArray(types::Struct { version }).decode(buf)?
173        } else {
174            types::Array(types::Struct { version }).decode(buf)?
175        };
176        let coordinator_epoch = types::Int32.decode(buf)?;
177        let mut unknown_tagged_fields = BTreeMap::new();
178        if version >= 1 {
179            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
180            for _ in 0..num_tagged_fields {
181                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
182                let size: u32 = types::UnsignedVarInt.decode(buf)?;
183                let unknown_value = buf.try_get_bytes(size as usize)?;
184                unknown_tagged_fields.insert(tag as i32, unknown_value);
185            }
186        }
187        Ok(Self {
188            producer_id,
189            producer_epoch,
190            transaction_result,
191            topics,
192            coordinator_epoch,
193            unknown_tagged_fields,
194        })
195    }
196}
197
198impl Default for WritableTxnMarker {
199    fn default() -> Self {
200        Self {
201            producer_id: (0).into(),
202            producer_epoch: 0,
203            transaction_result: false,
204            topics: Default::default(),
205            coordinator_epoch: 0,
206            unknown_tagged_fields: BTreeMap::new(),
207        }
208    }
209}
210
211impl Message for WritableTxnMarker {
212    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
213    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
214}
215
216/// Valid versions: 0-1
217#[non_exhaustive]
218#[derive(Debug, Clone, PartialEq)]
219pub struct WritableTxnMarkerTopic {
220    /// The topic name.
221    ///
222    /// Supported API versions: 0-1
223    pub name: super::TopicName,
224
225    /// The indexes of the partitions to write transaction markers for.
226    ///
227    /// Supported API versions: 0-1
228    pub partition_indexes: Vec<i32>,
229
230    /// Other tagged fields
231    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
232}
233
234impl WritableTxnMarkerTopic {
235    /// Sets `name` to the passed value.
236    ///
237    /// The topic name.
238    ///
239    /// Supported API versions: 0-1
240    pub fn with_name(mut self, value: super::TopicName) -> Self {
241        self.name = value;
242        self
243    }
244    /// Sets `partition_indexes` to the passed value.
245    ///
246    /// The indexes of the partitions to write transaction markers for.
247    ///
248    /// Supported API versions: 0-1
249    pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
250        self.partition_indexes = value;
251        self
252    }
253    /// Sets unknown_tagged_fields to the passed value.
254    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
255        self.unknown_tagged_fields = value;
256        self
257    }
258    /// Inserts an entry into unknown_tagged_fields.
259    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
260        self.unknown_tagged_fields.insert(key, value);
261        self
262    }
263}
264
265#[cfg(feature = "client")]
266impl Encodable for WritableTxnMarkerTopic {
267    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
268        if version >= 1 {
269            types::CompactString.encode(buf, &self.name)?;
270        } else {
271            types::String.encode(buf, &self.name)?;
272        }
273        if version >= 1 {
274            types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;
275        } else {
276            types::Array(types::Int32).encode(buf, &self.partition_indexes)?;
277        }
278        if version >= 1 {
279            let num_tagged_fields = self.unknown_tagged_fields.len();
280            if num_tagged_fields > std::u32::MAX as usize {
281                bail!(
282                    "Too many tagged fields to encode ({} fields)",
283                    num_tagged_fields
284                );
285            }
286            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
287
288            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
289        }
290        Ok(())
291    }
292    fn compute_size(&self, version: i16) -> Result<usize> {
293        let mut total_size = 0;
294        if version >= 1 {
295            total_size += types::CompactString.compute_size(&self.name)?;
296        } else {
297            total_size += types::String.compute_size(&self.name)?;
298        }
299        if version >= 1 {
300            total_size +=
301                types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?;
302        } else {
303            total_size += types::Array(types::Int32).compute_size(&self.partition_indexes)?;
304        }
305        if version >= 1 {
306            let num_tagged_fields = self.unknown_tagged_fields.len();
307            if num_tagged_fields > std::u32::MAX as usize {
308                bail!(
309                    "Too many tagged fields to encode ({} fields)",
310                    num_tagged_fields
311                );
312            }
313            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
314
315            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
316        }
317        Ok(total_size)
318    }
319}
320
321#[cfg(feature = "broker")]
322impl Decodable for WritableTxnMarkerTopic {
323    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
324        let name = if version >= 1 {
325            types::CompactString.decode(buf)?
326        } else {
327            types::String.decode(buf)?
328        };
329        let partition_indexes = if version >= 1 {
330            types::CompactArray(types::Int32).decode(buf)?
331        } else {
332            types::Array(types::Int32).decode(buf)?
333        };
334        let mut unknown_tagged_fields = BTreeMap::new();
335        if version >= 1 {
336            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
337            for _ in 0..num_tagged_fields {
338                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
339                let size: u32 = types::UnsignedVarInt.decode(buf)?;
340                let unknown_value = buf.try_get_bytes(size as usize)?;
341                unknown_tagged_fields.insert(tag as i32, unknown_value);
342            }
343        }
344        Ok(Self {
345            name,
346            partition_indexes,
347            unknown_tagged_fields,
348        })
349    }
350}
351
352impl Default for WritableTxnMarkerTopic {
353    fn default() -> Self {
354        Self {
355            name: Default::default(),
356            partition_indexes: Default::default(),
357            unknown_tagged_fields: BTreeMap::new(),
358        }
359    }
360}
361
362impl Message for WritableTxnMarkerTopic {
363    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
364    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
365}
366
367/// Valid versions: 0-1
368#[non_exhaustive]
369#[derive(Debug, Clone, PartialEq)]
370pub struct WriteTxnMarkersRequest {
371    /// The transaction markers to be written.
372    ///
373    /// Supported API versions: 0-1
374    pub markers: Vec<WritableTxnMarker>,
375
376    /// Other tagged fields
377    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
378}
379
380impl WriteTxnMarkersRequest {
381    /// Sets `markers` to the passed value.
382    ///
383    /// The transaction markers to be written.
384    ///
385    /// Supported API versions: 0-1
386    pub fn with_markers(mut self, value: Vec<WritableTxnMarker>) -> Self {
387        self.markers = value;
388        self
389    }
390    /// Sets unknown_tagged_fields to the passed value.
391    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
392        self.unknown_tagged_fields = value;
393        self
394    }
395    /// Inserts an entry into unknown_tagged_fields.
396    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
397        self.unknown_tagged_fields.insert(key, value);
398        self
399    }
400}
401
402#[cfg(feature = "client")]
403impl Encodable for WriteTxnMarkersRequest {
404    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
405        if version >= 1 {
406            types::CompactArray(types::Struct { version }).encode(buf, &self.markers)?;
407        } else {
408            types::Array(types::Struct { version }).encode(buf, &self.markers)?;
409        }
410        if version >= 1 {
411            let num_tagged_fields = self.unknown_tagged_fields.len();
412            if num_tagged_fields > std::u32::MAX as usize {
413                bail!(
414                    "Too many tagged fields to encode ({} fields)",
415                    num_tagged_fields
416                );
417            }
418            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
419
420            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
421        }
422        Ok(())
423    }
424    fn compute_size(&self, version: i16) -> Result<usize> {
425        let mut total_size = 0;
426        if version >= 1 {
427            total_size +=
428                types::CompactArray(types::Struct { version }).compute_size(&self.markers)?;
429        } else {
430            total_size += types::Array(types::Struct { version }).compute_size(&self.markers)?;
431        }
432        if version >= 1 {
433            let num_tagged_fields = self.unknown_tagged_fields.len();
434            if num_tagged_fields > std::u32::MAX as usize {
435                bail!(
436                    "Too many tagged fields to encode ({} fields)",
437                    num_tagged_fields
438                );
439            }
440            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
441
442            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
443        }
444        Ok(total_size)
445    }
446}
447
448#[cfg(feature = "broker")]
449impl Decodable for WriteTxnMarkersRequest {
450    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
451        let markers = if version >= 1 {
452            types::CompactArray(types::Struct { version }).decode(buf)?
453        } else {
454            types::Array(types::Struct { version }).decode(buf)?
455        };
456        let mut unknown_tagged_fields = BTreeMap::new();
457        if version >= 1 {
458            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
459            for _ in 0..num_tagged_fields {
460                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
461                let size: u32 = types::UnsignedVarInt.decode(buf)?;
462                let unknown_value = buf.try_get_bytes(size as usize)?;
463                unknown_tagged_fields.insert(tag as i32, unknown_value);
464            }
465        }
466        Ok(Self {
467            markers,
468            unknown_tagged_fields,
469        })
470    }
471}
472
473impl Default for WriteTxnMarkersRequest {
474    fn default() -> Self {
475        Self {
476            markers: Default::default(),
477            unknown_tagged_fields: BTreeMap::new(),
478        }
479    }
480}
481
482impl Message for WriteTxnMarkersRequest {
483    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
484    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
485}
486
487impl HeaderVersion for WriteTxnMarkersRequest {
488    fn header_version(version: i16) -> i16 {
489        if version >= 1 {
490            2
491        } else {
492            1
493        }
494    }
495}