1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct WritableTxnMarker {
24 pub producer_id: super::ProducerId,
28
29 pub producer_epoch: i16,
33
34 pub transaction_result: bool,
38
39 pub topics: Vec<WritableTxnMarkerTopic>,
43
44 pub coordinator_epoch: i32,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl WritableTxnMarker {
54 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
60 self.producer_id = value;
61 self
62 }
63 pub fn with_producer_epoch(mut self, value: i16) -> Self {
69 self.producer_epoch = value;
70 self
71 }
72 pub fn with_transaction_result(mut self, value: bool) -> Self {
78 self.transaction_result = value;
79 self
80 }
81 pub fn with_topics(mut self, value: Vec<WritableTxnMarkerTopic>) -> Self {
87 self.topics = value;
88 self
89 }
90 pub fn with_coordinator_epoch(mut self, value: i32) -> Self {
96 self.coordinator_epoch = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
111#[cfg(feature = "client")]
112impl Encodable for WritableTxnMarker {
113 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
114 if version != 1 {
115 bail!("specified version not supported by this message type");
116 }
117 types::Int64.encode(buf, &self.producer_id)?;
118 types::Int16.encode(buf, &self.producer_epoch)?;
119 types::Boolean.encode(buf, &self.transaction_result)?;
120 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
121 types::Int32.encode(buf, &self.coordinator_epoch)?;
122 let num_tagged_fields = self.unknown_tagged_fields.len();
123 if num_tagged_fields > std::u32::MAX as usize {
124 bail!(
125 "Too many tagged fields to encode ({} fields)",
126 num_tagged_fields
127 );
128 }
129 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
130
131 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
132 Ok(())
133 }
134 fn compute_size(&self, version: i16) -> Result<usize> {
135 let mut total_size = 0;
136 total_size += types::Int64.compute_size(&self.producer_id)?;
137 total_size += types::Int16.compute_size(&self.producer_epoch)?;
138 total_size += types::Boolean.compute_size(&self.transaction_result)?;
139 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
140 total_size += types::Int32.compute_size(&self.coordinator_epoch)?;
141 let num_tagged_fields = self.unknown_tagged_fields.len();
142 if num_tagged_fields > std::u32::MAX as usize {
143 bail!(
144 "Too many tagged fields to encode ({} fields)",
145 num_tagged_fields
146 );
147 }
148 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
149
150 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
151 Ok(total_size)
152 }
153}
154
155#[cfg(feature = "broker")]
156impl Decodable for WritableTxnMarker {
157 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
158 if version != 1 {
159 bail!("specified version not supported by this message type");
160 }
161 let producer_id = types::Int64.decode(buf)?;
162 let producer_epoch = types::Int16.decode(buf)?;
163 let transaction_result = types::Boolean.decode(buf)?;
164 let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
165 let coordinator_epoch = types::Int32.decode(buf)?;
166 let mut unknown_tagged_fields = BTreeMap::new();
167 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
168 for _ in 0..num_tagged_fields {
169 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
170 let size: u32 = types::UnsignedVarInt.decode(buf)?;
171 let unknown_value = buf.try_get_bytes(size as usize)?;
172 unknown_tagged_fields.insert(tag as i32, unknown_value);
173 }
174 Ok(Self {
175 producer_id,
176 producer_epoch,
177 transaction_result,
178 topics,
179 coordinator_epoch,
180 unknown_tagged_fields,
181 })
182 }
183}
184
185impl Default for WritableTxnMarker {
186 fn default() -> Self {
187 Self {
188 producer_id: (0).into(),
189 producer_epoch: 0,
190 transaction_result: false,
191 topics: Default::default(),
192 coordinator_epoch: 0,
193 unknown_tagged_fields: BTreeMap::new(),
194 }
195 }
196}
197
198impl Message for WritableTxnMarker {
199 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
200 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
201}
202
203#[non_exhaustive]
205#[derive(Debug, Clone, PartialEq)]
206pub struct WritableTxnMarkerTopic {
207 pub name: super::TopicName,
211
212 pub partition_indexes: Vec<i32>,
216
217 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
219}
220
221impl WritableTxnMarkerTopic {
222 pub fn with_name(mut self, value: super::TopicName) -> Self {
228 self.name = value;
229 self
230 }
231 pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
237 self.partition_indexes = value;
238 self
239 }
240 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
242 self.unknown_tagged_fields = value;
243 self
244 }
245 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
247 self.unknown_tagged_fields.insert(key, value);
248 self
249 }
250}
251
252#[cfg(feature = "client")]
253impl Encodable for WritableTxnMarkerTopic {
254 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
255 if version != 1 {
256 bail!("specified version not supported by this message type");
257 }
258 types::CompactString.encode(buf, &self.name)?;
259 types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;
260 let num_tagged_fields = self.unknown_tagged_fields.len();
261 if num_tagged_fields > std::u32::MAX as usize {
262 bail!(
263 "Too many tagged fields to encode ({} fields)",
264 num_tagged_fields
265 );
266 }
267 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
268
269 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
270 Ok(())
271 }
272 fn compute_size(&self, version: i16) -> Result<usize> {
273 let mut total_size = 0;
274 total_size += types::CompactString.compute_size(&self.name)?;
275 total_size += types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?;
276 let num_tagged_fields = self.unknown_tagged_fields.len();
277 if num_tagged_fields > std::u32::MAX as usize {
278 bail!(
279 "Too many tagged fields to encode ({} fields)",
280 num_tagged_fields
281 );
282 }
283 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
284
285 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
286 Ok(total_size)
287 }
288}
289
290#[cfg(feature = "broker")]
291impl Decodable for WritableTxnMarkerTopic {
292 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
293 if version != 1 {
294 bail!("specified version not supported by this message type");
295 }
296 let name = types::CompactString.decode(buf)?;
297 let partition_indexes = types::CompactArray(types::Int32).decode(buf)?;
298 let mut unknown_tagged_fields = BTreeMap::new();
299 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
300 for _ in 0..num_tagged_fields {
301 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
302 let size: u32 = types::UnsignedVarInt.decode(buf)?;
303 let unknown_value = buf.try_get_bytes(size as usize)?;
304 unknown_tagged_fields.insert(tag as i32, unknown_value);
305 }
306 Ok(Self {
307 name,
308 partition_indexes,
309 unknown_tagged_fields,
310 })
311 }
312}
313
314impl Default for WritableTxnMarkerTopic {
315 fn default() -> Self {
316 Self {
317 name: Default::default(),
318 partition_indexes: Default::default(),
319 unknown_tagged_fields: BTreeMap::new(),
320 }
321 }
322}
323
324impl Message for WritableTxnMarkerTopic {
325 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
326 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
327}
328
329#[non_exhaustive]
331#[derive(Debug, Clone, PartialEq)]
332pub struct WriteTxnMarkersRequest {
333 pub markers: Vec<WritableTxnMarker>,
337
338 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
340}
341
342impl WriteTxnMarkersRequest {
343 pub fn with_markers(mut self, value: Vec<WritableTxnMarker>) -> Self {
349 self.markers = value;
350 self
351 }
352 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
354 self.unknown_tagged_fields = value;
355 self
356 }
357 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
359 self.unknown_tagged_fields.insert(key, value);
360 self
361 }
362}
363
364#[cfg(feature = "client")]
365impl Encodable for WriteTxnMarkersRequest {
366 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
367 if version != 1 {
368 bail!("specified version not supported by this message type");
369 }
370 types::CompactArray(types::Struct { version }).encode(buf, &self.markers)?;
371 let num_tagged_fields = self.unknown_tagged_fields.len();
372 if num_tagged_fields > std::u32::MAX as usize {
373 bail!(
374 "Too many tagged fields to encode ({} fields)",
375 num_tagged_fields
376 );
377 }
378 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
379
380 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
381 Ok(())
382 }
383 fn compute_size(&self, version: i16) -> Result<usize> {
384 let mut total_size = 0;
385 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.markers)?;
386 let num_tagged_fields = self.unknown_tagged_fields.len();
387 if num_tagged_fields > std::u32::MAX as usize {
388 bail!(
389 "Too many tagged fields to encode ({} fields)",
390 num_tagged_fields
391 );
392 }
393 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
394
395 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
396 Ok(total_size)
397 }
398}
399
400#[cfg(feature = "broker")]
401impl Decodable for WriteTxnMarkersRequest {
402 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
403 if version != 1 {
404 bail!("specified version not supported by this message type");
405 }
406 let markers = types::CompactArray(types::Struct { version }).decode(buf)?;
407 let mut unknown_tagged_fields = BTreeMap::new();
408 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
409 for _ in 0..num_tagged_fields {
410 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
411 let size: u32 = types::UnsignedVarInt.decode(buf)?;
412 let unknown_value = buf.try_get_bytes(size as usize)?;
413 unknown_tagged_fields.insert(tag as i32, unknown_value);
414 }
415 Ok(Self {
416 markers,
417 unknown_tagged_fields,
418 })
419 }
420}
421
422impl Default for WriteTxnMarkersRequest {
423 fn default() -> Self {
424 Self {
425 markers: Default::default(),
426 unknown_tagged_fields: BTreeMap::new(),
427 }
428 }
429}
430
431impl Message for WriteTxnMarkersRequest {
432 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
433 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
434}
435
436impl HeaderVersion for WriteTxnMarkersRequest {
437 fn header_version(version: i16) -> i16 {
438 2
439 }
440}