1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AddPartitionsToTxnPartitionResult {
24 pub partition_index: i32,
28
29 pub partition_error_code: i16,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl AddPartitionsToTxnPartitionResult {
39 pub fn with_partition_index(mut self, value: i32) -> Self {
45 self.partition_index = value;
46 self
47 }
48 pub fn with_partition_error_code(mut self, value: i16) -> Self {
54 self.partition_error_code = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "broker")]
70impl Encodable for AddPartitionsToTxnPartitionResult {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 if version < 0 || version > 5 {
73 bail!("specified version not supported by this message type");
74 }
75 types::Int32.encode(buf, &self.partition_index)?;
76 types::Int16.encode(buf, &self.partition_error_code)?;
77 if version >= 3 {
78 let num_tagged_fields = self.unknown_tagged_fields.len();
79 if num_tagged_fields > std::u32::MAX as usize {
80 bail!(
81 "Too many tagged fields to encode ({} fields)",
82 num_tagged_fields
83 );
84 }
85 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
86
87 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
88 }
89 Ok(())
90 }
91 fn compute_size(&self, version: i16) -> Result<usize> {
92 let mut total_size = 0;
93 total_size += types::Int32.compute_size(&self.partition_index)?;
94 total_size += types::Int16.compute_size(&self.partition_error_code)?;
95 if version >= 3 {
96 let num_tagged_fields = self.unknown_tagged_fields.len();
97 if num_tagged_fields > std::u32::MAX as usize {
98 bail!(
99 "Too many tagged fields to encode ({} fields)",
100 num_tagged_fields
101 );
102 }
103 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
104
105 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
106 }
107 Ok(total_size)
108 }
109}
110
111#[cfg(feature = "client")]
112impl Decodable for AddPartitionsToTxnPartitionResult {
113 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
114 if version < 0 || version > 5 {
115 bail!("specified version not supported by this message type");
116 }
117 let partition_index = types::Int32.decode(buf)?;
118 let partition_error_code = types::Int16.decode(buf)?;
119 let mut unknown_tagged_fields = BTreeMap::new();
120 if version >= 3 {
121 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
122 for _ in 0..num_tagged_fields {
123 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
124 let size: u32 = types::UnsignedVarInt.decode(buf)?;
125 let unknown_value = buf.try_get_bytes(size as usize)?;
126 unknown_tagged_fields.insert(tag as i32, unknown_value);
127 }
128 }
129 Ok(Self {
130 partition_index,
131 partition_error_code,
132 unknown_tagged_fields,
133 })
134 }
135}
136
137impl Default for AddPartitionsToTxnPartitionResult {
138 fn default() -> Self {
139 Self {
140 partition_index: 0,
141 partition_error_code: 0,
142 unknown_tagged_fields: BTreeMap::new(),
143 }
144 }
145}
146
147impl Message for AddPartitionsToTxnPartitionResult {
148 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
149 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
150}
151
152#[non_exhaustive]
154#[derive(Debug, Clone, PartialEq)]
155pub struct AddPartitionsToTxnResponse {
156 pub throttle_time_ms: i32,
160
161 pub error_code: i16,
165
166 pub results_by_transaction: Vec<AddPartitionsToTxnResult>,
170
171 pub results_by_topic_v3_and_below: Vec<AddPartitionsToTxnTopicResult>,
175
176 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
178}
179
180impl AddPartitionsToTxnResponse {
181 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
187 self.throttle_time_ms = value;
188 self
189 }
190 pub fn with_error_code(mut self, value: i16) -> Self {
196 self.error_code = value;
197 self
198 }
199 pub fn with_results_by_transaction(mut self, value: Vec<AddPartitionsToTxnResult>) -> Self {
205 self.results_by_transaction = value;
206 self
207 }
208 pub fn with_results_by_topic_v3_and_below(
214 mut self,
215 value: Vec<AddPartitionsToTxnTopicResult>,
216 ) -> Self {
217 self.results_by_topic_v3_and_below = value;
218 self
219 }
220 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
222 self.unknown_tagged_fields = value;
223 self
224 }
225 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
227 self.unknown_tagged_fields.insert(key, value);
228 self
229 }
230}
231
232#[cfg(feature = "broker")]
233impl Encodable for AddPartitionsToTxnResponse {
234 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
235 if version < 0 || version > 5 {
236 bail!("specified version not supported by this message type");
237 }
238 types::Int32.encode(buf, &self.throttle_time_ms)?;
239 if version >= 4 {
240 types::Int16.encode(buf, &self.error_code)?;
241 }
242 if version >= 4 {
243 types::CompactArray(types::Struct { version })
244 .encode(buf, &self.results_by_transaction)?;
245 } else {
246 if !self.results_by_transaction.is_empty() {
247 bail!("A field is set that is not available on the selected protocol version");
248 }
249 }
250 if version <= 3 {
251 if version >= 3 {
252 types::CompactArray(types::Struct { version })
253 .encode(buf, &self.results_by_topic_v3_and_below)?;
254 } else {
255 types::Array(types::Struct { version })
256 .encode(buf, &self.results_by_topic_v3_and_below)?;
257 }
258 } else {
259 if !self.results_by_topic_v3_and_below.is_empty() {
260 bail!("A field is set that is not available on the selected protocol version");
261 }
262 }
263 if version >= 3 {
264 let num_tagged_fields = self.unknown_tagged_fields.len();
265 if num_tagged_fields > std::u32::MAX as usize {
266 bail!(
267 "Too many tagged fields to encode ({} fields)",
268 num_tagged_fields
269 );
270 }
271 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
272
273 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
274 }
275 Ok(())
276 }
277 fn compute_size(&self, version: i16) -> Result<usize> {
278 let mut total_size = 0;
279 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
280 if version >= 4 {
281 total_size += types::Int16.compute_size(&self.error_code)?;
282 }
283 if version >= 4 {
284 total_size += types::CompactArray(types::Struct { version })
285 .compute_size(&self.results_by_transaction)?;
286 } else {
287 if !self.results_by_transaction.is_empty() {
288 bail!("A field is set that is not available on the selected protocol version");
289 }
290 }
291 if version <= 3 {
292 if version >= 3 {
293 total_size += types::CompactArray(types::Struct { version })
294 .compute_size(&self.results_by_topic_v3_and_below)?;
295 } else {
296 total_size += types::Array(types::Struct { version })
297 .compute_size(&self.results_by_topic_v3_and_below)?;
298 }
299 } else {
300 if !self.results_by_topic_v3_and_below.is_empty() {
301 bail!("A field is set that is not available on the selected protocol version");
302 }
303 }
304 if version >= 3 {
305 let num_tagged_fields = self.unknown_tagged_fields.len();
306 if num_tagged_fields > std::u32::MAX as usize {
307 bail!(
308 "Too many tagged fields to encode ({} fields)",
309 num_tagged_fields
310 );
311 }
312 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
313
314 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
315 }
316 Ok(total_size)
317 }
318}
319
320#[cfg(feature = "client")]
321impl Decodable for AddPartitionsToTxnResponse {
322 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
323 if version < 0 || version > 5 {
324 bail!("specified version not supported by this message type");
325 }
326 let throttle_time_ms = types::Int32.decode(buf)?;
327 let error_code = if version >= 4 {
328 types::Int16.decode(buf)?
329 } else {
330 0
331 };
332 let results_by_transaction = if version >= 4 {
333 types::CompactArray(types::Struct { version }).decode(buf)?
334 } else {
335 Default::default()
336 };
337 let results_by_topic_v3_and_below = if version <= 3 {
338 if version >= 3 {
339 types::CompactArray(types::Struct { version }).decode(buf)?
340 } else {
341 types::Array(types::Struct { version }).decode(buf)?
342 }
343 } else {
344 Default::default()
345 };
346 let mut unknown_tagged_fields = BTreeMap::new();
347 if version >= 3 {
348 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
349 for _ in 0..num_tagged_fields {
350 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
351 let size: u32 = types::UnsignedVarInt.decode(buf)?;
352 let unknown_value = buf.try_get_bytes(size as usize)?;
353 unknown_tagged_fields.insert(tag as i32, unknown_value);
354 }
355 }
356 Ok(Self {
357 throttle_time_ms,
358 error_code,
359 results_by_transaction,
360 results_by_topic_v3_and_below,
361 unknown_tagged_fields,
362 })
363 }
364}
365
366impl Default for AddPartitionsToTxnResponse {
367 fn default() -> Self {
368 Self {
369 throttle_time_ms: 0,
370 error_code: 0,
371 results_by_transaction: Default::default(),
372 results_by_topic_v3_and_below: Default::default(),
373 unknown_tagged_fields: BTreeMap::new(),
374 }
375 }
376}
377
378impl Message for AddPartitionsToTxnResponse {
379 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
380 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
381}
382
383#[non_exhaustive]
385#[derive(Debug, Clone, PartialEq)]
386pub struct AddPartitionsToTxnResult {
387 pub transactional_id: super::TransactionalId,
391
392 pub topic_results: Vec<AddPartitionsToTxnTopicResult>,
396
397 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
399}
400
401impl AddPartitionsToTxnResult {
402 pub fn with_transactional_id(mut self, value: super::TransactionalId) -> Self {
408 self.transactional_id = value;
409 self
410 }
411 pub fn with_topic_results(mut self, value: Vec<AddPartitionsToTxnTopicResult>) -> Self {
417 self.topic_results = value;
418 self
419 }
420 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
422 self.unknown_tagged_fields = value;
423 self
424 }
425 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
427 self.unknown_tagged_fields.insert(key, value);
428 self
429 }
430}
431
432#[cfg(feature = "broker")]
433impl Encodable for AddPartitionsToTxnResult {
434 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
435 if version < 0 || version > 5 {
436 bail!("specified version not supported by this message type");
437 }
438 if version >= 4 {
439 types::CompactString.encode(buf, &self.transactional_id)?;
440 } else {
441 if !self.transactional_id.is_empty() {
442 bail!("A field is set that is not available on the selected protocol version");
443 }
444 }
445 if version >= 4 {
446 types::CompactArray(types::Struct { version }).encode(buf, &self.topic_results)?;
447 } else {
448 if !self.topic_results.is_empty() {
449 bail!("A field is set that is not available on the selected protocol version");
450 }
451 }
452 if version >= 3 {
453 let num_tagged_fields = self.unknown_tagged_fields.len();
454 if num_tagged_fields > std::u32::MAX as usize {
455 bail!(
456 "Too many tagged fields to encode ({} fields)",
457 num_tagged_fields
458 );
459 }
460 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
461
462 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
463 }
464 Ok(())
465 }
466 fn compute_size(&self, version: i16) -> Result<usize> {
467 let mut total_size = 0;
468 if version >= 4 {
469 total_size += types::CompactString.compute_size(&self.transactional_id)?;
470 } else {
471 if !self.transactional_id.is_empty() {
472 bail!("A field is set that is not available on the selected protocol version");
473 }
474 }
475 if version >= 4 {
476 total_size +=
477 types::CompactArray(types::Struct { version }).compute_size(&self.topic_results)?;
478 } else {
479 if !self.topic_results.is_empty() {
480 bail!("A field is set that is not available on the selected protocol version");
481 }
482 }
483 if version >= 3 {
484 let num_tagged_fields = self.unknown_tagged_fields.len();
485 if num_tagged_fields > std::u32::MAX as usize {
486 bail!(
487 "Too many tagged fields to encode ({} fields)",
488 num_tagged_fields
489 );
490 }
491 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
492
493 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
494 }
495 Ok(total_size)
496 }
497}
498
499#[cfg(feature = "client")]
500impl Decodable for AddPartitionsToTxnResult {
501 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
502 if version < 0 || version > 5 {
503 bail!("specified version not supported by this message type");
504 }
505 let transactional_id = if version >= 4 {
506 types::CompactString.decode(buf)?
507 } else {
508 Default::default()
509 };
510 let topic_results = if version >= 4 {
511 types::CompactArray(types::Struct { version }).decode(buf)?
512 } else {
513 Default::default()
514 };
515 let mut unknown_tagged_fields = BTreeMap::new();
516 if version >= 3 {
517 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
518 for _ in 0..num_tagged_fields {
519 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
520 let size: u32 = types::UnsignedVarInt.decode(buf)?;
521 let unknown_value = buf.try_get_bytes(size as usize)?;
522 unknown_tagged_fields.insert(tag as i32, unknown_value);
523 }
524 }
525 Ok(Self {
526 transactional_id,
527 topic_results,
528 unknown_tagged_fields,
529 })
530 }
531}
532
533impl Default for AddPartitionsToTxnResult {
534 fn default() -> Self {
535 Self {
536 transactional_id: Default::default(),
537 topic_results: Default::default(),
538 unknown_tagged_fields: BTreeMap::new(),
539 }
540 }
541}
542
543impl Message for AddPartitionsToTxnResult {
544 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
545 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
546}
547
548#[non_exhaustive]
550#[derive(Debug, Clone, PartialEq)]
551pub struct AddPartitionsToTxnTopicResult {
552 pub name: super::TopicName,
556
557 pub results_by_partition: Vec<AddPartitionsToTxnPartitionResult>,
561
562 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
564}
565
566impl AddPartitionsToTxnTopicResult {
567 pub fn with_name(mut self, value: super::TopicName) -> Self {
573 self.name = value;
574 self
575 }
576 pub fn with_results_by_partition(
582 mut self,
583 value: Vec<AddPartitionsToTxnPartitionResult>,
584 ) -> Self {
585 self.results_by_partition = value;
586 self
587 }
588 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
590 self.unknown_tagged_fields = value;
591 self
592 }
593 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
595 self.unknown_tagged_fields.insert(key, value);
596 self
597 }
598}
599
600#[cfg(feature = "broker")]
601impl Encodable for AddPartitionsToTxnTopicResult {
602 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
603 if version < 0 || version > 5 {
604 bail!("specified version not supported by this message type");
605 }
606 if version >= 3 {
607 types::CompactString.encode(buf, &self.name)?;
608 } else {
609 types::String.encode(buf, &self.name)?;
610 }
611 if version >= 3 {
612 types::CompactArray(types::Struct { version })
613 .encode(buf, &self.results_by_partition)?;
614 } else {
615 types::Array(types::Struct { version }).encode(buf, &self.results_by_partition)?;
616 }
617 if version >= 3 {
618 let num_tagged_fields = self.unknown_tagged_fields.len();
619 if num_tagged_fields > std::u32::MAX as usize {
620 bail!(
621 "Too many tagged fields to encode ({} fields)",
622 num_tagged_fields
623 );
624 }
625 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
626
627 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
628 }
629 Ok(())
630 }
631 fn compute_size(&self, version: i16) -> Result<usize> {
632 let mut total_size = 0;
633 if version >= 3 {
634 total_size += types::CompactString.compute_size(&self.name)?;
635 } else {
636 total_size += types::String.compute_size(&self.name)?;
637 }
638 if version >= 3 {
639 total_size += types::CompactArray(types::Struct { version })
640 .compute_size(&self.results_by_partition)?;
641 } else {
642 total_size +=
643 types::Array(types::Struct { version }).compute_size(&self.results_by_partition)?;
644 }
645 if version >= 3 {
646 let num_tagged_fields = self.unknown_tagged_fields.len();
647 if num_tagged_fields > std::u32::MAX as usize {
648 bail!(
649 "Too many tagged fields to encode ({} fields)",
650 num_tagged_fields
651 );
652 }
653 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
654
655 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
656 }
657 Ok(total_size)
658 }
659}
660
661#[cfg(feature = "client")]
662impl Decodable for AddPartitionsToTxnTopicResult {
663 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
664 if version < 0 || version > 5 {
665 bail!("specified version not supported by this message type");
666 }
667 let name = if version >= 3 {
668 types::CompactString.decode(buf)?
669 } else {
670 types::String.decode(buf)?
671 };
672 let results_by_partition = if version >= 3 {
673 types::CompactArray(types::Struct { version }).decode(buf)?
674 } else {
675 types::Array(types::Struct { version }).decode(buf)?
676 };
677 let mut unknown_tagged_fields = BTreeMap::new();
678 if version >= 3 {
679 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
680 for _ in 0..num_tagged_fields {
681 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
682 let size: u32 = types::UnsignedVarInt.decode(buf)?;
683 let unknown_value = buf.try_get_bytes(size as usize)?;
684 unknown_tagged_fields.insert(tag as i32, unknown_value);
685 }
686 }
687 Ok(Self {
688 name,
689 results_by_partition,
690 unknown_tagged_fields,
691 })
692 }
693}
694
695impl Default for AddPartitionsToTxnTopicResult {
696 fn default() -> Self {
697 Self {
698 name: Default::default(),
699 results_by_partition: Default::default(),
700 unknown_tagged_fields: BTreeMap::new(),
701 }
702 }
703}
704
705impl Message for AddPartitionsToTxnTopicResult {
706 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
707 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
708}
709
710impl HeaderVersion for AddPartitionsToTxnResponse {
711 fn header_version(version: i16) -> i16 {
712 if version >= 3 {
713 1
714 } else {
715 0
716 }
717 }
718}