1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// One transaction marker entry carried in `WriteTxnMarkersRequest::markers`.
///
/// Valid versions: 0-1
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct WritableTxnMarker {
    /// Producer id; written on the wire as an `Int64` in every supported version.
    pub producer_id: super::ProducerId,

    /// Producer epoch; written on the wire as an `Int16` in every supported version.
    pub producer_epoch: i16,

    /// Transaction outcome flag; written on the wire as a `Boolean`.
    ///
    /// NOTE(review): the Kafka schema documents this as false = ABORT,
    /// true = COMMIT — confirm against the protocol guide.
    pub transaction_result: bool,

    /// Topic entries for this marker; a compact array from version 1, a plain
    /// array in version 0.
    pub topics: Vec<WritableTxnMarkerTopic>,

    /// Coordinator epoch; written on the wire as an `Int32` in every supported version.
    pub coordinator_epoch: i32,

    /// Tagged fields not modelled by this struct, keyed by tag. They are only
    /// read and written for version 1 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
52
53impl WritableTxnMarker {
54 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
60 self.producer_id = value;
61 self
62 }
63 pub fn with_producer_epoch(mut self, value: i16) -> Self {
69 self.producer_epoch = value;
70 self
71 }
72 pub fn with_transaction_result(mut self, value: bool) -> Self {
78 self.transaction_result = value;
79 self
80 }
81 pub fn with_topics(mut self, value: Vec<WritableTxnMarkerTopic>) -> Self {
87 self.topics = value;
88 self
89 }
90 pub fn with_coordinator_epoch(mut self, value: i32) -> Self {
96 self.coordinator_epoch = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
#[cfg(feature = "client")]
impl Encodable for WritableTxnMarker {
    /// Serializes this marker into `buf` using the wire layout of `version`.
    ///
    /// Fields are written in this order: `producer_id` (Int64),
    /// `producer_epoch` (Int16), `transaction_result` (Boolean), `topics`
    /// (compact array for version >= 1, plain array otherwise) and
    /// `coordinator_epoch` (Int32). For version >= 1 the unknown tagged
    /// fields follow, prefixed by their count as an unsigned varint.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when any field encoder
    /// fails, or when there are more than `u32::MAX` unknown tagged fields.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 1 switches to compact arrays and adds tagged fields.
        let flexible = version >= 1;

        types::Int64.encode(buf, &self.producer_id)?;
        types::Int16.encode(buf, &self.producer_epoch)?;
        types::Boolean.encode(buf, &self.transaction_result)?;
        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        types::Int32.encode(buf, &self.coordinator_epoch)?;

        if !flexible {
            return Ok(());
        }

        // The count is written as a u32, so it has to fit before the cast.
        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` writes for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=1`; it
    /// only branches on `version >= 1`.
    ///
    /// # Errors
    ///
    /// Fails when any field size computation fails or when there are more
    /// than `u32::MAX` unknown tagged fields (version >= 1 only).
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 1;

        let mut size = types::Int64.compute_size(&self.producer_id)?;
        size += types::Int16.compute_size(&self.producer_epoch)?;
        size += types::Boolean.compute_size(&self.transaction_result)?;
        size += if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.topics)?
        };
        size += types::Int32.compute_size(&self.coordinator_epoch)?;

        if flexible {
            let tagged_field_count = self.unknown_tagged_fields.len();
            if tagged_field_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_field_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
167
#[cfg(feature = "broker")]
impl Decodable for WritableTxnMarker {
    /// Reads a marker from `buf` using the wire layout of `version`.
    ///
    /// Fields are consumed in the same order `encode` writes them. For
    /// version >= 1 the trailing tagged fields are all collected into
    /// `unknown_tagged_fields`; version 0 leaves that map empty.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when any read from `buf`
    /// fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 1 switches to compact arrays and adds tagged fields.
        let flexible = version >= 1;

        let producer_id = types::Int64.decode(buf)?;
        let producer_epoch = types::Int16.decode(buf)?;
        let transaction_result = types::Boolean.decode(buf)?;
        let topics = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let coordinator_epoch = types::Int32.decode(buf)?;

        let unknown_tagged_fields = if flexible {
            let mut fields = BTreeMap::new();
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // NOTE(review): `as i32` wraps for tags above i32::MAX, and a
                // repeated tag overwrites the earlier payload.
                fields.insert(tag as i32, payload);
            }
            fields
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            producer_id,
            producer_epoch,
            transaction_result,
            topics,
            coordinator_epoch,
            unknown_tagged_fields,
        })
    }
}
203
204impl Default for WritableTxnMarker {
205 fn default() -> Self {
206 Self {
207 producer_id: (0).into(),
208 producer_epoch: 0,
209 transaction_result: false,
210 topics: Default::default(),
211 coordinator_epoch: 0,
212 unknown_tagged_fields: BTreeMap::new(),
213 }
214 }
215}
216
/// Version metadata for [`WritableTxnMarker`].
impl Message for WritableTxnMarker {
    /// Supported versions, 0 through 1; the same bounds `encode` and `decode`
    /// enforce.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
    /// No version is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
221
/// A topic entry nested in `WritableTxnMarker::topics`.
///
/// Valid versions: 0-1
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct WritableTxnMarkerTopic {
    /// Topic name; a compact string from version 1, a plain string in version 0.
    pub name: super::TopicName,

    /// Partition indexes listed for this topic; each is written as an `Int32`,
    /// in a compact array from version 1 and a plain array in version 0.
    pub partition_indexes: Vec<i32>,

    /// Tagged fields not modelled by this struct, keyed by tag. They are only
    /// read and written for version 1 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
239
240impl WritableTxnMarkerTopic {
241 pub fn with_name(mut self, value: super::TopicName) -> Self {
247 self.name = value;
248 self
249 }
250 pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
256 self.partition_indexes = value;
257 self
258 }
259 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
261 self.unknown_tagged_fields = value;
262 self
263 }
264 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
266 self.unknown_tagged_fields.insert(key, value);
267 self
268 }
269}
270
#[cfg(feature = "client")]
impl Encodable for WritableTxnMarkerTopic {
    /// Serializes this topic entry into `buf` using the wire layout of `version`.
    ///
    /// Version 0 writes a plain string and a plain `Int32` array. Version 1
    /// writes their compact forms followed by the unknown tagged fields,
    /// prefixed by their count as an unsigned varint.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when any field encoder
    /// fails, or when there are more than `u32::MAX` unknown tagged fields.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version < 1 {
            // Non-flexible layout: no tagged-field section follows.
            types::String.encode(buf, &self.name)?;
            types::Array(types::Int32).encode(buf, &self.partition_indexes)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;

        // The count is written as a u32, so it has to fit before the cast.
        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` writes for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=1`; it
    /// only branches on `version >= 1`.
    ///
    /// # Errors
    ///
    /// Fails when any field size computation fails or when there are more
    /// than `u32::MAX` unknown tagged fields (version >= 1 only).
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            let mut size = types::String.compute_size(&self.name)?;
            size += types::Array(types::Int32).compute_size(&self.partition_indexes)?;
            return Ok(size);
        }

        let mut size = types::CompactString.compute_size(&self.name)?;
        size += types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
329
#[cfg(feature = "broker")]
impl Decodable for WritableTxnMarkerTopic {
    /// Reads a topic entry from `buf` using the wire layout of `version`.
    ///
    /// Version 0 reads a plain string and a plain `Int32` array and leaves
    /// `unknown_tagged_fields` empty. Version 1 reads the compact forms and
    /// then collects every trailing tagged field into `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when any read from `buf`
    /// fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version < 1 {
            // Non-flexible layout: nothing follows the partition array.
            let name = types::String.decode(buf)?;
            let partition_indexes = types::Array(types::Int32).decode(buf)?;
            return Ok(Self {
                name,
                partition_indexes,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let name = types::CompactString.decode(buf)?;
        let partition_indexes = types::CompactArray(types::Int32).decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            // NOTE(review): `as i32` wraps for tags above i32::MAX, and a
            // repeated tag overwrites the earlier payload.
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            name,
            partition_indexes,
            unknown_tagged_fields,
        })
    }
}
363
364impl Default for WritableTxnMarkerTopic {
365 fn default() -> Self {
366 Self {
367 name: Default::default(),
368 partition_indexes: Default::default(),
369 unknown_tagged_fields: BTreeMap::new(),
370 }
371 }
372}
373
/// Version metadata for [`WritableTxnMarkerTopic`].
impl Message for WritableTxnMarkerTopic {
    /// Supported versions, 0 through 1; the same bounds `encode` and `decode`
    /// enforce.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
    /// No version is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
378
/// Request message carrying a list of transaction markers.
///
/// Valid versions: 0-1
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct WriteTxnMarkersRequest {
    /// The markers in this request; a compact array from version 1, a plain
    /// array in version 0.
    pub markers: Vec<WritableTxnMarker>,

    /// Tagged fields not modelled by this struct, keyed by tag. They are only
    /// read and written for version 1 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
391
392impl WriteTxnMarkersRequest {
393 pub fn with_markers(mut self, value: Vec<WritableTxnMarker>) -> Self {
399 self.markers = value;
400 self
401 }
402 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
404 self.unknown_tagged_fields = value;
405 self
406 }
407 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
409 self.unknown_tagged_fields.insert(key, value);
410 self
411 }
412}
413
#[cfg(feature = "client")]
impl Encodable for WriteTxnMarkersRequest {
    /// Serializes this request into `buf` using the wire layout of `version`.
    ///
    /// Version 0 writes `markers` as a plain array. Version 1 writes them as
    /// a compact array followed by the unknown tagged fields, prefixed by
    /// their count as an unsigned varint.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when any field encoder
    /// fails, or when there are more than `u32::MAX` unknown tagged fields.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version < 1 {
            // Non-flexible layout: no tagged-field section follows.
            types::Array(types::Struct { version }).encode(buf, &self.markers)?;
            return Ok(());
        }

        types::CompactArray(types::Struct { version }).encode(buf, &self.markers)?;

        // The count is written as a u32, so it has to fit before the cast.
        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` writes for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=1`; it
    /// only branches on `version >= 1`.
    ///
    /// # Errors
    ///
    /// Fails when any field size computation fails or when there are more
    /// than `u32::MAX` unknown tagged fields (version >= 1 only).
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            let size = types::Array(types::Struct { version }).compute_size(&self.markers)?;
            return Ok(size);
        }

        let mut size =
            types::CompactArray(types::Struct { version }).compute_size(&self.markers)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
462
#[cfg(feature = "broker")]
impl Decodable for WriteTxnMarkersRequest {
    /// Reads a request from `buf` using the wire layout of `version`.
    ///
    /// Version 0 reads `markers` as a plain array and leaves
    /// `unknown_tagged_fields` empty. Version 1 reads a compact array and
    /// then collects every trailing tagged field into `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when any read from `buf`
    /// fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version < 1 {
            // Non-flexible layout: nothing follows the marker array.
            let markers = types::Array(types::Struct { version }).decode(buf)?;
            return Ok(Self {
                markers,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let markers = types::CompactArray(types::Struct { version }).decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            // NOTE(review): `as i32` wraps for tags above i32::MAX, and a
            // repeated tag overwrites the earlier payload.
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            markers,
            unknown_tagged_fields,
        })
    }
}
490
491impl Default for WriteTxnMarkersRequest {
492 fn default() -> Self {
493 Self {
494 markers: Default::default(),
495 unknown_tagged_fields: BTreeMap::new(),
496 }
497 }
498}
499
/// Version metadata for [`WriteTxnMarkersRequest`].
impl Message for WriteTxnMarkersRequest {
    /// Supported versions, 0 through 1; the same bounds `encode` and `decode`
    /// enforce.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
    /// No version is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
504
505impl HeaderVersion for WriteTxnMarkersRequest {
506 fn header_version(version: i16) -> i16 {
507 if version >= 1 {
508 2
509 } else {
510 1
511 }
512 }
513}