1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AddPartitionsToTxnPartitionResult {
24 pub partition_index: i32,
28
29 pub partition_error_code: i16,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl AddPartitionsToTxnPartitionResult {
39 pub fn with_partition_index(mut self, value: i32) -> Self {
45 self.partition_index = value;
46 self
47 }
48 pub fn with_partition_error_code(mut self, value: i16) -> Self {
54 self.partition_error_code = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "broker")]
70impl Encodable for AddPartitionsToTxnPartitionResult {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 types::Int32.encode(buf, &self.partition_index)?;
73 types::Int16.encode(buf, &self.partition_error_code)?;
74 if version >= 3 {
75 let num_tagged_fields = self.unknown_tagged_fields.len();
76 if num_tagged_fields > std::u32::MAX as usize {
77 bail!(
78 "Too many tagged fields to encode ({} fields)",
79 num_tagged_fields
80 );
81 }
82 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
83
84 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
85 }
86 Ok(())
87 }
88 fn compute_size(&self, version: i16) -> Result<usize> {
89 let mut total_size = 0;
90 total_size += types::Int32.compute_size(&self.partition_index)?;
91 total_size += types::Int16.compute_size(&self.partition_error_code)?;
92 if version >= 3 {
93 let num_tagged_fields = self.unknown_tagged_fields.len();
94 if num_tagged_fields > std::u32::MAX as usize {
95 bail!(
96 "Too many tagged fields to encode ({} fields)",
97 num_tagged_fields
98 );
99 }
100 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
101
102 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
103 }
104 Ok(total_size)
105 }
106}
107
108#[cfg(feature = "client")]
109impl Decodable for AddPartitionsToTxnPartitionResult {
110 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
111 let partition_index = types::Int32.decode(buf)?;
112 let partition_error_code = types::Int16.decode(buf)?;
113 let mut unknown_tagged_fields = BTreeMap::new();
114 if version >= 3 {
115 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
116 for _ in 0..num_tagged_fields {
117 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
118 let size: u32 = types::UnsignedVarInt.decode(buf)?;
119 let unknown_value = buf.try_get_bytes(size as usize)?;
120 unknown_tagged_fields.insert(tag as i32, unknown_value);
121 }
122 }
123 Ok(Self {
124 partition_index,
125 partition_error_code,
126 unknown_tagged_fields,
127 })
128 }
129}
130
131impl Default for AddPartitionsToTxnPartitionResult {
132 fn default() -> Self {
133 Self {
134 partition_index: 0,
135 partition_error_code: 0,
136 unknown_tagged_fields: BTreeMap::new(),
137 }
138 }
139}
140
141impl Message for AddPartitionsToTxnPartitionResult {
142 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
143 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
144}
145
146#[non_exhaustive]
148#[derive(Debug, Clone, PartialEq)]
149pub struct AddPartitionsToTxnResponse {
150 pub throttle_time_ms: i32,
154
155 pub error_code: i16,
159
160 pub results_by_transaction: Vec<AddPartitionsToTxnResult>,
164
165 pub results_by_topic_v3_and_below: Vec<AddPartitionsToTxnTopicResult>,
169
170 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
172}
173
174impl AddPartitionsToTxnResponse {
175 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
181 self.throttle_time_ms = value;
182 self
183 }
184 pub fn with_error_code(mut self, value: i16) -> Self {
190 self.error_code = value;
191 self
192 }
193 pub fn with_results_by_transaction(mut self, value: Vec<AddPartitionsToTxnResult>) -> Self {
199 self.results_by_transaction = value;
200 self
201 }
202 pub fn with_results_by_topic_v3_and_below(
208 mut self,
209 value: Vec<AddPartitionsToTxnTopicResult>,
210 ) -> Self {
211 self.results_by_topic_v3_and_below = value;
212 self
213 }
214 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
216 self.unknown_tagged_fields = value;
217 self
218 }
219 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
221 self.unknown_tagged_fields.insert(key, value);
222 self
223 }
224}
225
226#[cfg(feature = "broker")]
227impl Encodable for AddPartitionsToTxnResponse {
228 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
229 types::Int32.encode(buf, &self.throttle_time_ms)?;
230 if version >= 4 {
231 types::Int16.encode(buf, &self.error_code)?;
232 }
233 if version >= 4 {
234 types::CompactArray(types::Struct { version })
235 .encode(buf, &self.results_by_transaction)?;
236 } else {
237 if !self.results_by_transaction.is_empty() {
238 bail!("A field is set that is not available on the selected protocol version");
239 }
240 }
241 if version <= 3 {
242 if version >= 3 {
243 types::CompactArray(types::Struct { version })
244 .encode(buf, &self.results_by_topic_v3_and_below)?;
245 } else {
246 types::Array(types::Struct { version })
247 .encode(buf, &self.results_by_topic_v3_and_below)?;
248 }
249 } else {
250 if !self.results_by_topic_v3_and_below.is_empty() {
251 bail!("A field is set that is not available on the selected protocol version");
252 }
253 }
254 if version >= 3 {
255 let num_tagged_fields = self.unknown_tagged_fields.len();
256 if num_tagged_fields > std::u32::MAX as usize {
257 bail!(
258 "Too many tagged fields to encode ({} fields)",
259 num_tagged_fields
260 );
261 }
262 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
263
264 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
265 }
266 Ok(())
267 }
268 fn compute_size(&self, version: i16) -> Result<usize> {
269 let mut total_size = 0;
270 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
271 if version >= 4 {
272 total_size += types::Int16.compute_size(&self.error_code)?;
273 }
274 if version >= 4 {
275 total_size += types::CompactArray(types::Struct { version })
276 .compute_size(&self.results_by_transaction)?;
277 } else {
278 if !self.results_by_transaction.is_empty() {
279 bail!("A field is set that is not available on the selected protocol version");
280 }
281 }
282 if version <= 3 {
283 if version >= 3 {
284 total_size += types::CompactArray(types::Struct { version })
285 .compute_size(&self.results_by_topic_v3_and_below)?;
286 } else {
287 total_size += types::Array(types::Struct { version })
288 .compute_size(&self.results_by_topic_v3_and_below)?;
289 }
290 } else {
291 if !self.results_by_topic_v3_and_below.is_empty() {
292 bail!("A field is set that is not available on the selected protocol version");
293 }
294 }
295 if version >= 3 {
296 let num_tagged_fields = self.unknown_tagged_fields.len();
297 if num_tagged_fields > std::u32::MAX as usize {
298 bail!(
299 "Too many tagged fields to encode ({} fields)",
300 num_tagged_fields
301 );
302 }
303 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
304
305 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
306 }
307 Ok(total_size)
308 }
309}
310
311#[cfg(feature = "client")]
312impl Decodable for AddPartitionsToTxnResponse {
313 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
314 let throttle_time_ms = types::Int32.decode(buf)?;
315 let error_code = if version >= 4 {
316 types::Int16.decode(buf)?
317 } else {
318 0
319 };
320 let results_by_transaction = if version >= 4 {
321 types::CompactArray(types::Struct { version }).decode(buf)?
322 } else {
323 Default::default()
324 };
325 let results_by_topic_v3_and_below = if version <= 3 {
326 if version >= 3 {
327 types::CompactArray(types::Struct { version }).decode(buf)?
328 } else {
329 types::Array(types::Struct { version }).decode(buf)?
330 }
331 } else {
332 Default::default()
333 };
334 let mut unknown_tagged_fields = BTreeMap::new();
335 if version >= 3 {
336 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
337 for _ in 0..num_tagged_fields {
338 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
339 let size: u32 = types::UnsignedVarInt.decode(buf)?;
340 let unknown_value = buf.try_get_bytes(size as usize)?;
341 unknown_tagged_fields.insert(tag as i32, unknown_value);
342 }
343 }
344 Ok(Self {
345 throttle_time_ms,
346 error_code,
347 results_by_transaction,
348 results_by_topic_v3_and_below,
349 unknown_tagged_fields,
350 })
351 }
352}
353
354impl Default for AddPartitionsToTxnResponse {
355 fn default() -> Self {
356 Self {
357 throttle_time_ms: 0,
358 error_code: 0,
359 results_by_transaction: Default::default(),
360 results_by_topic_v3_and_below: Default::default(),
361 unknown_tagged_fields: BTreeMap::new(),
362 }
363 }
364}
365
366impl Message for AddPartitionsToTxnResponse {
367 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
368 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
369}
370
371#[non_exhaustive]
373#[derive(Debug, Clone, PartialEq)]
374pub struct AddPartitionsToTxnResult {
375 pub transactional_id: super::TransactionalId,
379
380 pub topic_results: Vec<AddPartitionsToTxnTopicResult>,
384
385 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
387}
388
389impl AddPartitionsToTxnResult {
390 pub fn with_transactional_id(mut self, value: super::TransactionalId) -> Self {
396 self.transactional_id = value;
397 self
398 }
399 pub fn with_topic_results(mut self, value: Vec<AddPartitionsToTxnTopicResult>) -> Self {
405 self.topic_results = value;
406 self
407 }
408 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
410 self.unknown_tagged_fields = value;
411 self
412 }
413 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
415 self.unknown_tagged_fields.insert(key, value);
416 self
417 }
418}
419
420#[cfg(feature = "broker")]
421impl Encodable for AddPartitionsToTxnResult {
422 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
423 if version >= 4 {
424 types::CompactString.encode(buf, &self.transactional_id)?;
425 } else {
426 if !self.transactional_id.is_empty() {
427 bail!("A field is set that is not available on the selected protocol version");
428 }
429 }
430 if version >= 4 {
431 types::CompactArray(types::Struct { version }).encode(buf, &self.topic_results)?;
432 } else {
433 if !self.topic_results.is_empty() {
434 bail!("A field is set that is not available on the selected protocol version");
435 }
436 }
437 if version >= 3 {
438 let num_tagged_fields = self.unknown_tagged_fields.len();
439 if num_tagged_fields > std::u32::MAX as usize {
440 bail!(
441 "Too many tagged fields to encode ({} fields)",
442 num_tagged_fields
443 );
444 }
445 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
446
447 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
448 }
449 Ok(())
450 }
451 fn compute_size(&self, version: i16) -> Result<usize> {
452 let mut total_size = 0;
453 if version >= 4 {
454 total_size += types::CompactString.compute_size(&self.transactional_id)?;
455 } else {
456 if !self.transactional_id.is_empty() {
457 bail!("A field is set that is not available on the selected protocol version");
458 }
459 }
460 if version >= 4 {
461 total_size +=
462 types::CompactArray(types::Struct { version }).compute_size(&self.topic_results)?;
463 } else {
464 if !self.topic_results.is_empty() {
465 bail!("A field is set that is not available on the selected protocol version");
466 }
467 }
468 if version >= 3 {
469 let num_tagged_fields = self.unknown_tagged_fields.len();
470 if num_tagged_fields > std::u32::MAX as usize {
471 bail!(
472 "Too many tagged fields to encode ({} fields)",
473 num_tagged_fields
474 );
475 }
476 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
477
478 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
479 }
480 Ok(total_size)
481 }
482}
483
484#[cfg(feature = "client")]
485impl Decodable for AddPartitionsToTxnResult {
486 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
487 let transactional_id = if version >= 4 {
488 types::CompactString.decode(buf)?
489 } else {
490 Default::default()
491 };
492 let topic_results = if version >= 4 {
493 types::CompactArray(types::Struct { version }).decode(buf)?
494 } else {
495 Default::default()
496 };
497 let mut unknown_tagged_fields = BTreeMap::new();
498 if version >= 3 {
499 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
500 for _ in 0..num_tagged_fields {
501 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
502 let size: u32 = types::UnsignedVarInt.decode(buf)?;
503 let unknown_value = buf.try_get_bytes(size as usize)?;
504 unknown_tagged_fields.insert(tag as i32, unknown_value);
505 }
506 }
507 Ok(Self {
508 transactional_id,
509 topic_results,
510 unknown_tagged_fields,
511 })
512 }
513}
514
515impl Default for AddPartitionsToTxnResult {
516 fn default() -> Self {
517 Self {
518 transactional_id: Default::default(),
519 topic_results: Default::default(),
520 unknown_tagged_fields: BTreeMap::new(),
521 }
522 }
523}
524
525impl Message for AddPartitionsToTxnResult {
526 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
527 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
528}
529
530#[non_exhaustive]
532#[derive(Debug, Clone, PartialEq)]
533pub struct AddPartitionsToTxnTopicResult {
534 pub name: super::TopicName,
538
539 pub results_by_partition: Vec<AddPartitionsToTxnPartitionResult>,
543
544 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
546}
547
548impl AddPartitionsToTxnTopicResult {
549 pub fn with_name(mut self, value: super::TopicName) -> Self {
555 self.name = value;
556 self
557 }
558 pub fn with_results_by_partition(
564 mut self,
565 value: Vec<AddPartitionsToTxnPartitionResult>,
566 ) -> Self {
567 self.results_by_partition = value;
568 self
569 }
570 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
572 self.unknown_tagged_fields = value;
573 self
574 }
575 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
577 self.unknown_tagged_fields.insert(key, value);
578 self
579 }
580}
581
582#[cfg(feature = "broker")]
583impl Encodable for AddPartitionsToTxnTopicResult {
584 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
585 if version >= 3 {
586 types::CompactString.encode(buf, &self.name)?;
587 } else {
588 types::String.encode(buf, &self.name)?;
589 }
590 if version >= 3 {
591 types::CompactArray(types::Struct { version })
592 .encode(buf, &self.results_by_partition)?;
593 } else {
594 types::Array(types::Struct { version }).encode(buf, &self.results_by_partition)?;
595 }
596 if version >= 3 {
597 let num_tagged_fields = self.unknown_tagged_fields.len();
598 if num_tagged_fields > std::u32::MAX as usize {
599 bail!(
600 "Too many tagged fields to encode ({} fields)",
601 num_tagged_fields
602 );
603 }
604 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
605
606 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
607 }
608 Ok(())
609 }
610 fn compute_size(&self, version: i16) -> Result<usize> {
611 let mut total_size = 0;
612 if version >= 3 {
613 total_size += types::CompactString.compute_size(&self.name)?;
614 } else {
615 total_size += types::String.compute_size(&self.name)?;
616 }
617 if version >= 3 {
618 total_size += types::CompactArray(types::Struct { version })
619 .compute_size(&self.results_by_partition)?;
620 } else {
621 total_size +=
622 types::Array(types::Struct { version }).compute_size(&self.results_by_partition)?;
623 }
624 if version >= 3 {
625 let num_tagged_fields = self.unknown_tagged_fields.len();
626 if num_tagged_fields > std::u32::MAX as usize {
627 bail!(
628 "Too many tagged fields to encode ({} fields)",
629 num_tagged_fields
630 );
631 }
632 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
633
634 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
635 }
636 Ok(total_size)
637 }
638}
639
640#[cfg(feature = "client")]
641impl Decodable for AddPartitionsToTxnTopicResult {
642 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
643 let name = if version >= 3 {
644 types::CompactString.decode(buf)?
645 } else {
646 types::String.decode(buf)?
647 };
648 let results_by_partition = if version >= 3 {
649 types::CompactArray(types::Struct { version }).decode(buf)?
650 } else {
651 types::Array(types::Struct { version }).decode(buf)?
652 };
653 let mut unknown_tagged_fields = BTreeMap::new();
654 if version >= 3 {
655 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
656 for _ in 0..num_tagged_fields {
657 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
658 let size: u32 = types::UnsignedVarInt.decode(buf)?;
659 let unknown_value = buf.try_get_bytes(size as usize)?;
660 unknown_tagged_fields.insert(tag as i32, unknown_value);
661 }
662 }
663 Ok(Self {
664 name,
665 results_by_partition,
666 unknown_tagged_fields,
667 })
668 }
669}
670
671impl Default for AddPartitionsToTxnTopicResult {
672 fn default() -> Self {
673 Self {
674 name: Default::default(),
675 results_by_partition: Default::default(),
676 unknown_tagged_fields: BTreeMap::new(),
677 }
678 }
679}
680
681impl Message for AddPartitionsToTxnTopicResult {
682 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
683 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
684}
685
686impl HeaderVersion for AddPartitionsToTxnResponse {
687 fn header_version(version: i16) -> i16 {
688 if version >= 3 {
689 1
690 } else {
691 0
692 }
693 }
694}