// kafka_protocol/messages/write_txn_markers_request.rs

//! WriteTxnMarkersRequest
//!
//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json).
// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 1
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct WritableTxnMarker {
24    /// The current producer ID.
25    ///
26    /// Supported API versions: 1
27    pub producer_id: super::ProducerId,
28
29    /// The current epoch associated with the producer ID.
30    ///
31    /// Supported API versions: 1
32    pub producer_epoch: i16,
33
34    /// The result of the transaction to write to the partitions (false = ABORT, true = COMMIT).
35    ///
36    /// Supported API versions: 1
37    pub transaction_result: bool,
38
39    /// Each topic that we want to write transaction marker(s) for.
40    ///
41    /// Supported API versions: 1
42    pub topics: Vec<WritableTxnMarkerTopic>,
43
44    /// Epoch associated with the transaction state partition hosted by this transaction coordinator.
45    ///
46    /// Supported API versions: 1
47    pub coordinator_epoch: i32,
48
49    /// Other tagged fields
50    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl WritableTxnMarker {
54    /// Sets `producer_id` to the passed value.
55    ///
56    /// The current producer ID.
57    ///
58    /// Supported API versions: 1
59    pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
60        self.producer_id = value;
61        self
62    }
63    /// Sets `producer_epoch` to the passed value.
64    ///
65    /// The current epoch associated with the producer ID.
66    ///
67    /// Supported API versions: 1
68    pub fn with_producer_epoch(mut self, value: i16) -> Self {
69        self.producer_epoch = value;
70        self
71    }
72    /// Sets `transaction_result` to the passed value.
73    ///
74    /// The result of the transaction to write to the partitions (false = ABORT, true = COMMIT).
75    ///
76    /// Supported API versions: 1
77    pub fn with_transaction_result(mut self, value: bool) -> Self {
78        self.transaction_result = value;
79        self
80    }
81    /// Sets `topics` to the passed value.
82    ///
83    /// Each topic that we want to write transaction marker(s) for.
84    ///
85    /// Supported API versions: 1
86    pub fn with_topics(mut self, value: Vec<WritableTxnMarkerTopic>) -> Self {
87        self.topics = value;
88        self
89    }
90    /// Sets `coordinator_epoch` to the passed value.
91    ///
92    /// Epoch associated with the transaction state partition hosted by this transaction coordinator.
93    ///
94    /// Supported API versions: 1
95    pub fn with_coordinator_epoch(mut self, value: i32) -> Self {
96        self.coordinator_epoch = value;
97        self
98    }
99    /// Sets unknown_tagged_fields to the passed value.
100    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101        self.unknown_tagged_fields = value;
102        self
103    }
104    /// Inserts an entry into unknown_tagged_fields.
105    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106        self.unknown_tagged_fields.insert(key, value);
107        self
108    }
109}
110
#[cfg(feature = "client")]
impl Encodable for WritableTxnMarker {
    /// Serializes this marker into `buf`.
    ///
    /// Fields are written in this order: producer ID, producer epoch,
    /// transaction result, topics, coordinator epoch, then the tagged-field
    /// count followed by the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 1, when the number of unknown tagged
    /// fields does not fit in a `u32`, or when any underlying encoder fails.
    fn encode<B>(&self, buf: &mut B, version: i16) -> Result<()>
    where
        B: ByteBufMut,
    {
        if version != 1 {
            bail!("specified version not supported by this message type");
        }

        types::Int64.encode(buf, &self.producer_id)?;
        types::Int16.encode(buf, &self.producer_epoch)?;
        types::Boolean.encode(buf, &self.transaction_result)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        types::Int32.encode(buf, &self.coordinator_epoch)?;

        // The count is written as an unsigned varint, so it must fit in a u32.
        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for this marker.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields does not fit in a
    /// `u32`, or when any underlying size computation fails. Unlike
    /// `encode`, `version` is not validated here; it is only forwarded to
    /// the nested struct encoder.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::Int64.compute_size(&self.producer_id)?
            + types::Int16.compute_size(&self.producer_epoch)?
            + types::Boolean.compute_size(&self.transaction_result)?
            + types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
            + types::Int32.compute_size(&self.coordinator_epoch)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let tagged_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + tagged_size)
    }
}
154
#[cfg(feature = "broker")]
impl Decodable for WritableTxnMarker {
    /// Deserializes a marker from `buf`.
    ///
    /// Fields are read in this order: producer ID, producer epoch,
    /// transaction result, topics, coordinator epoch, then the tagged-field
    /// count followed by that many (tag, size, payload) entries, which are
    /// all stored in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 1 or when any read from `buf` fails.
    fn decode<B>(buf: &mut B, version: i16) -> Result<Self>
    where
        B: ByteBuf,
    {
        if version != 1 {
            bail!("specified version not supported by this message type");
        }

        let producer_id = types::Int64.decode(buf)?;
        let producer_epoch = types::Int16.decode(buf)?;
        let transaction_result = types::Boolean.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
        let coordinator_epoch = types::Int32.decode(buf)?;

        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            producer_id,
            producer_epoch,
            transaction_result,
            topics,
            coordinator_epoch,
            unknown_tagged_fields,
        })
    }
}
184
185impl Default for WritableTxnMarker {
186    fn default() -> Self {
187        Self {
188            producer_id: (0).into(),
189            producer_epoch: 0,
190            transaction_result: false,
191            topics: Default::default(),
192            coordinator_epoch: 0,
193            unknown_tagged_fields: BTreeMap::new(),
194        }
195    }
196}
197
198impl Message for WritableTxnMarker {
199    const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
200    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
201}
202
203/// Valid versions: 1
204#[non_exhaustive]
205#[derive(Debug, Clone, PartialEq)]
206pub struct WritableTxnMarkerTopic {
207    /// The topic name.
208    ///
209    /// Supported API versions: 1
210    pub name: super::TopicName,
211
212    /// The indexes of the partitions to write transaction markers for.
213    ///
214    /// Supported API versions: 1
215    pub partition_indexes: Vec<i32>,
216
217    /// Other tagged fields
218    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
219}
220
221impl WritableTxnMarkerTopic {
222    /// Sets `name` to the passed value.
223    ///
224    /// The topic name.
225    ///
226    /// Supported API versions: 1
227    pub fn with_name(mut self, value: super::TopicName) -> Self {
228        self.name = value;
229        self
230    }
231    /// Sets `partition_indexes` to the passed value.
232    ///
233    /// The indexes of the partitions to write transaction markers for.
234    ///
235    /// Supported API versions: 1
236    pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
237        self.partition_indexes = value;
238        self
239    }
240    /// Sets unknown_tagged_fields to the passed value.
241    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
242        self.unknown_tagged_fields = value;
243        self
244    }
245    /// Inserts an entry into unknown_tagged_fields.
246    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
247        self.unknown_tagged_fields.insert(key, value);
248        self
249    }
250}
251
#[cfg(feature = "client")]
impl Encodable for WritableTxnMarkerTopic {
    /// Serializes this topic entry into `buf`.
    ///
    /// The name is written first, then the partition indexes, then the
    /// tagged-field count followed by the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 1, when the number of unknown tagged
    /// fields does not fit in a `u32`, or when any underlying encoder fails.
    fn encode<B>(&self, buf: &mut B, version: i16) -> Result<()>
    where
        B: ByteBufMut,
    {
        if version != 1 {
            bail!("specified version not supported by this message type");
        }

        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;

        // The count is written as an unsigned varint, so it must fit in a u32.
        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for this topic entry.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields does not fit in a
    /// `u32`, or when any underlying size computation fails. `version` is
    /// not validated here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::CompactString.compute_size(&self.name)?
            + types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let tagged_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + tagged_size)
    }
}
289
#[cfg(feature = "broker")]
impl Decodable for WritableTxnMarkerTopic {
    /// Deserializes a topic entry from `buf`.
    ///
    /// The name is read first, then the partition indexes, then the
    /// tagged-field count followed by that many (tag, size, payload)
    /// entries, which are all stored in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 1 or when any read from `buf` fails.
    fn decode<B>(buf: &mut B, version: i16) -> Result<Self>
    where
        B: ByteBuf,
    {
        if version != 1 {
            bail!("specified version not supported by this message type");
        }

        let name = types::CompactString.decode(buf)?;
        let partition_indexes = types::CompactArray(types::Int32).decode(buf)?;

        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            name,
            partition_indexes,
            unknown_tagged_fields,
        })
    }
}
313
314impl Default for WritableTxnMarkerTopic {
315    fn default() -> Self {
316        Self {
317            name: Default::default(),
318            partition_indexes: Default::default(),
319            unknown_tagged_fields: BTreeMap::new(),
320        }
321    }
322}
323
324impl Message for WritableTxnMarkerTopic {
325    const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
326    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
327}
328
329/// Valid versions: 1
330#[non_exhaustive]
331#[derive(Debug, Clone, PartialEq)]
332pub struct WriteTxnMarkersRequest {
333    /// The transaction markers to be written.
334    ///
335    /// Supported API versions: 1
336    pub markers: Vec<WritableTxnMarker>,
337
338    /// Other tagged fields
339    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
340}
341
342impl WriteTxnMarkersRequest {
343    /// Sets `markers` to the passed value.
344    ///
345    /// The transaction markers to be written.
346    ///
347    /// Supported API versions: 1
348    pub fn with_markers(mut self, value: Vec<WritableTxnMarker>) -> Self {
349        self.markers = value;
350        self
351    }
352    /// Sets unknown_tagged_fields to the passed value.
353    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
354        self.unknown_tagged_fields = value;
355        self
356    }
357    /// Inserts an entry into unknown_tagged_fields.
358    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
359        self.unknown_tagged_fields.insert(key, value);
360        self
361    }
362}
363
#[cfg(feature = "client")]
impl Encodable for WriteTxnMarkersRequest {
    /// Serializes this request into `buf`.
    ///
    /// The markers are written first, then the tagged-field count followed
    /// by the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 1, when the number of unknown tagged
    /// fields does not fit in a `u32`, or when any underlying encoder fails.
    fn encode<B>(&self, buf: &mut B, version: i16) -> Result<()>
    where
        B: ByteBufMut,
    {
        if version != 1 {
            bail!("specified version not supported by this message type");
        }

        types::CompactArray(types::Struct { version }).encode(buf, &self.markers)?;

        // The count is written as an unsigned varint, so it must fit in a u32.
        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for this request.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields does not fit in a
    /// `u32`, or when any underlying size computation fails. Unlike
    /// `encode`, `version` is not validated here; it is only forwarded to
    /// the nested struct encoder.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let markers_size =
            types::CompactArray(types::Struct { version }).compute_size(&self.markers)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let tagged_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(markers_size + tagged_size)
    }
}
399
#[cfg(feature = "broker")]
impl Decodable for WriteTxnMarkersRequest {
    /// Deserializes a request from `buf`.
    ///
    /// The markers are read first, then the tagged-field count followed by
    /// that many (tag, size, payload) entries, which are all stored in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 1 or when any read from `buf` fails.
    fn decode<B>(buf: &mut B, version: i16) -> Result<Self>
    where
        B: ByteBuf,
    {
        if version != 1 {
            bail!("specified version not supported by this message type");
        }

        let markers = types::CompactArray(types::Struct { version }).decode(buf)?;

        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            markers,
            unknown_tagged_fields,
        })
    }
}
421
422impl Default for WriteTxnMarkersRequest {
423    fn default() -> Self {
424        Self {
425            markers: Default::default(),
426            unknown_tagged_fields: BTreeMap::new(),
427        }
428    }
429}
430
431impl Message for WriteTxnMarkersRequest {
432    const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
433    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
434}
435
436impl HeaderVersion for WriteTxnMarkersRequest {
437    fn header_version(version: i16) -> i16 {
438        2
439    }
440}