1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct WritableTxnMarker {
24 pub producer_id: super::ProducerId,
28
29 pub producer_epoch: i16,
33
34 pub transaction_result: bool,
38
39 pub topics: Vec<WritableTxnMarkerTopic>,
43
44 pub coordinator_epoch: i32,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl WritableTxnMarker {
54 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
60 self.producer_id = value;
61 self
62 }
63 pub fn with_producer_epoch(mut self, value: i16) -> Self {
69 self.producer_epoch = value;
70 self
71 }
72 pub fn with_transaction_result(mut self, value: bool) -> Self {
78 self.transaction_result = value;
79 self
80 }
81 pub fn with_topics(mut self, value: Vec<WritableTxnMarkerTopic>) -> Self {
87 self.topics = value;
88 self
89 }
90 pub fn with_coordinator_epoch(mut self, value: i32) -> Self {
96 self.coordinator_epoch = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
111#[cfg(feature = "client")]
112impl Encodable for WritableTxnMarker {
113 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
114 types::Int64.encode(buf, &self.producer_id)?;
115 types::Int16.encode(buf, &self.producer_epoch)?;
116 types::Boolean.encode(buf, &self.transaction_result)?;
117 if version >= 1 {
118 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
119 } else {
120 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
121 }
122 types::Int32.encode(buf, &self.coordinator_epoch)?;
123 if version >= 1 {
124 let num_tagged_fields = self.unknown_tagged_fields.len();
125 if num_tagged_fields > std::u32::MAX as usize {
126 bail!(
127 "Too many tagged fields to encode ({} fields)",
128 num_tagged_fields
129 );
130 }
131 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
132
133 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
134 }
135 Ok(())
136 }
137 fn compute_size(&self, version: i16) -> Result<usize> {
138 let mut total_size = 0;
139 total_size += types::Int64.compute_size(&self.producer_id)?;
140 total_size += types::Int16.compute_size(&self.producer_epoch)?;
141 total_size += types::Boolean.compute_size(&self.transaction_result)?;
142 if version >= 1 {
143 total_size +=
144 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
145 } else {
146 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
147 }
148 total_size += types::Int32.compute_size(&self.coordinator_epoch)?;
149 if version >= 1 {
150 let num_tagged_fields = self.unknown_tagged_fields.len();
151 if num_tagged_fields > std::u32::MAX as usize {
152 bail!(
153 "Too many tagged fields to encode ({} fields)",
154 num_tagged_fields
155 );
156 }
157 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
158
159 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
160 }
161 Ok(total_size)
162 }
163}
164
165#[cfg(feature = "broker")]
166impl Decodable for WritableTxnMarker {
167 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
168 let producer_id = types::Int64.decode(buf)?;
169 let producer_epoch = types::Int16.decode(buf)?;
170 let transaction_result = types::Boolean.decode(buf)?;
171 let topics = if version >= 1 {
172 types::CompactArray(types::Struct { version }).decode(buf)?
173 } else {
174 types::Array(types::Struct { version }).decode(buf)?
175 };
176 let coordinator_epoch = types::Int32.decode(buf)?;
177 let mut unknown_tagged_fields = BTreeMap::new();
178 if version >= 1 {
179 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
180 for _ in 0..num_tagged_fields {
181 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
182 let size: u32 = types::UnsignedVarInt.decode(buf)?;
183 let unknown_value = buf.try_get_bytes(size as usize)?;
184 unknown_tagged_fields.insert(tag as i32, unknown_value);
185 }
186 }
187 Ok(Self {
188 producer_id,
189 producer_epoch,
190 transaction_result,
191 topics,
192 coordinator_epoch,
193 unknown_tagged_fields,
194 })
195 }
196}
197
198impl Default for WritableTxnMarker {
199 fn default() -> Self {
200 Self {
201 producer_id: (0).into(),
202 producer_epoch: 0,
203 transaction_result: false,
204 topics: Default::default(),
205 coordinator_epoch: 0,
206 unknown_tagged_fields: BTreeMap::new(),
207 }
208 }
209}
210
211impl Message for WritableTxnMarker {
212 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
213 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
214}
215
216#[non_exhaustive]
218#[derive(Debug, Clone, PartialEq)]
219pub struct WritableTxnMarkerTopic {
220 pub name: super::TopicName,
224
225 pub partition_indexes: Vec<i32>,
229
230 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
232}
233
234impl WritableTxnMarkerTopic {
235 pub fn with_name(mut self, value: super::TopicName) -> Self {
241 self.name = value;
242 self
243 }
244 pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
250 self.partition_indexes = value;
251 self
252 }
253 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
255 self.unknown_tagged_fields = value;
256 self
257 }
258 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
260 self.unknown_tagged_fields.insert(key, value);
261 self
262 }
263}
264
265#[cfg(feature = "client")]
266impl Encodable for WritableTxnMarkerTopic {
267 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
268 if version >= 1 {
269 types::CompactString.encode(buf, &self.name)?;
270 } else {
271 types::String.encode(buf, &self.name)?;
272 }
273 if version >= 1 {
274 types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;
275 } else {
276 types::Array(types::Int32).encode(buf, &self.partition_indexes)?;
277 }
278 if version >= 1 {
279 let num_tagged_fields = self.unknown_tagged_fields.len();
280 if num_tagged_fields > std::u32::MAX as usize {
281 bail!(
282 "Too many tagged fields to encode ({} fields)",
283 num_tagged_fields
284 );
285 }
286 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
287
288 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
289 }
290 Ok(())
291 }
292 fn compute_size(&self, version: i16) -> Result<usize> {
293 let mut total_size = 0;
294 if version >= 1 {
295 total_size += types::CompactString.compute_size(&self.name)?;
296 } else {
297 total_size += types::String.compute_size(&self.name)?;
298 }
299 if version >= 1 {
300 total_size +=
301 types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?;
302 } else {
303 total_size += types::Array(types::Int32).compute_size(&self.partition_indexes)?;
304 }
305 if version >= 1 {
306 let num_tagged_fields = self.unknown_tagged_fields.len();
307 if num_tagged_fields > std::u32::MAX as usize {
308 bail!(
309 "Too many tagged fields to encode ({} fields)",
310 num_tagged_fields
311 );
312 }
313 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
314
315 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
316 }
317 Ok(total_size)
318 }
319}
320
321#[cfg(feature = "broker")]
322impl Decodable for WritableTxnMarkerTopic {
323 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
324 let name = if version >= 1 {
325 types::CompactString.decode(buf)?
326 } else {
327 types::String.decode(buf)?
328 };
329 let partition_indexes = if version >= 1 {
330 types::CompactArray(types::Int32).decode(buf)?
331 } else {
332 types::Array(types::Int32).decode(buf)?
333 };
334 let mut unknown_tagged_fields = BTreeMap::new();
335 if version >= 1 {
336 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
337 for _ in 0..num_tagged_fields {
338 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
339 let size: u32 = types::UnsignedVarInt.decode(buf)?;
340 let unknown_value = buf.try_get_bytes(size as usize)?;
341 unknown_tagged_fields.insert(tag as i32, unknown_value);
342 }
343 }
344 Ok(Self {
345 name,
346 partition_indexes,
347 unknown_tagged_fields,
348 })
349 }
350}
351
352impl Default for WritableTxnMarkerTopic {
353 fn default() -> Self {
354 Self {
355 name: Default::default(),
356 partition_indexes: Default::default(),
357 unknown_tagged_fields: BTreeMap::new(),
358 }
359 }
360}
361
362impl Message for WritableTxnMarkerTopic {
363 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
364 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
365}
366
367#[non_exhaustive]
369#[derive(Debug, Clone, PartialEq)]
370pub struct WriteTxnMarkersRequest {
371 pub markers: Vec<WritableTxnMarker>,
375
376 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
378}
379
380impl WriteTxnMarkersRequest {
381 pub fn with_markers(mut self, value: Vec<WritableTxnMarker>) -> Self {
387 self.markers = value;
388 self
389 }
390 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
392 self.unknown_tagged_fields = value;
393 self
394 }
395 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
397 self.unknown_tagged_fields.insert(key, value);
398 self
399 }
400}
401
402#[cfg(feature = "client")]
403impl Encodable for WriteTxnMarkersRequest {
404 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
405 if version >= 1 {
406 types::CompactArray(types::Struct { version }).encode(buf, &self.markers)?;
407 } else {
408 types::Array(types::Struct { version }).encode(buf, &self.markers)?;
409 }
410 if version >= 1 {
411 let num_tagged_fields = self.unknown_tagged_fields.len();
412 if num_tagged_fields > std::u32::MAX as usize {
413 bail!(
414 "Too many tagged fields to encode ({} fields)",
415 num_tagged_fields
416 );
417 }
418 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
419
420 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
421 }
422 Ok(())
423 }
424 fn compute_size(&self, version: i16) -> Result<usize> {
425 let mut total_size = 0;
426 if version >= 1 {
427 total_size +=
428 types::CompactArray(types::Struct { version }).compute_size(&self.markers)?;
429 } else {
430 total_size += types::Array(types::Struct { version }).compute_size(&self.markers)?;
431 }
432 if version >= 1 {
433 let num_tagged_fields = self.unknown_tagged_fields.len();
434 if num_tagged_fields > std::u32::MAX as usize {
435 bail!(
436 "Too many tagged fields to encode ({} fields)",
437 num_tagged_fields
438 );
439 }
440 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
441
442 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
443 }
444 Ok(total_size)
445 }
446}
447
448#[cfg(feature = "broker")]
449impl Decodable for WriteTxnMarkersRequest {
450 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
451 let markers = if version >= 1 {
452 types::CompactArray(types::Struct { version }).decode(buf)?
453 } else {
454 types::Array(types::Struct { version }).decode(buf)?
455 };
456 let mut unknown_tagged_fields = BTreeMap::new();
457 if version >= 1 {
458 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
459 for _ in 0..num_tagged_fields {
460 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
461 let size: u32 = types::UnsignedVarInt.decode(buf)?;
462 let unknown_value = buf.try_get_bytes(size as usize)?;
463 unknown_tagged_fields.insert(tag as i32, unknown_value);
464 }
465 }
466 Ok(Self {
467 markers,
468 unknown_tagged_fields,
469 })
470 }
471}
472
473impl Default for WriteTxnMarkersRequest {
474 fn default() -> Self {
475 Self {
476 markers: Default::default(),
477 unknown_tagged_fields: BTreeMap::new(),
478 }
479 }
480}
481
482impl Message for WriteTxnMarkersRequest {
483 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
484 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
485}
486
487impl HeaderVersion for WriteTxnMarkersRequest {
488 fn header_version(version: i16) -> i16 {
489 if version >= 1 {
490 2
491 } else {
492 1
493 }
494 }
495}