1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AddPartitionsToTxnRequest {
24 pub transactions: Vec<AddPartitionsToTxnTransaction>,
28
29 pub v3_and_below_transactional_id: super::TransactionalId,
33
34 pub v3_and_below_producer_id: super::ProducerId,
38
39 pub v3_and_below_producer_epoch: i16,
43
44 pub v3_and_below_topics: Vec<AddPartitionsToTxnTopic>,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl AddPartitionsToTxnRequest {
54 pub fn with_transactions(mut self, value: Vec<AddPartitionsToTxnTransaction>) -> Self {
60 self.transactions = value;
61 self
62 }
63 pub fn with_v3_and_below_transactional_id(mut self, value: super::TransactionalId) -> Self {
69 self.v3_and_below_transactional_id = value;
70 self
71 }
72 pub fn with_v3_and_below_producer_id(mut self, value: super::ProducerId) -> Self {
78 self.v3_and_below_producer_id = value;
79 self
80 }
81 pub fn with_v3_and_below_producer_epoch(mut self, value: i16) -> Self {
87 self.v3_and_below_producer_epoch = value;
88 self
89 }
90 pub fn with_v3_and_below_topics(mut self, value: Vec<AddPartitionsToTxnTopic>) -> Self {
96 self.v3_and_below_topics = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
111#[cfg(feature = "client")]
112impl Encodable for AddPartitionsToTxnRequest {
113 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
114 if version >= 4 {
115 types::CompactArray(types::Struct { version }).encode(buf, &self.transactions)?;
116 } else {
117 if !self.transactions.is_empty() {
118 bail!("A field is set that is not available on the selected protocol version");
119 }
120 }
121 if version <= 3 {
122 if version >= 3 {
123 types::CompactString.encode(buf, &self.v3_and_below_transactional_id)?;
124 } else {
125 types::String.encode(buf, &self.v3_and_below_transactional_id)?;
126 }
127 } else {
128 if !self.v3_and_below_transactional_id.is_empty() {
129 bail!("A field is set that is not available on the selected protocol version");
130 }
131 }
132 if version <= 3 {
133 types::Int64.encode(buf, &self.v3_and_below_producer_id)?;
134 } else {
135 if self.v3_and_below_producer_id != 0 {
136 bail!("A field is set that is not available on the selected protocol version");
137 }
138 }
139 if version <= 3 {
140 types::Int16.encode(buf, &self.v3_and_below_producer_epoch)?;
141 } else {
142 if self.v3_and_below_producer_epoch != 0 {
143 bail!("A field is set that is not available on the selected protocol version");
144 }
145 }
146 if version <= 3 {
147 if version >= 3 {
148 types::CompactArray(types::Struct { version })
149 .encode(buf, &self.v3_and_below_topics)?;
150 } else {
151 types::Array(types::Struct { version }).encode(buf, &self.v3_and_below_topics)?;
152 }
153 } else {
154 if !self.v3_and_below_topics.is_empty() {
155 bail!("A field is set that is not available on the selected protocol version");
156 }
157 }
158 if version >= 3 {
159 let num_tagged_fields = self.unknown_tagged_fields.len();
160 if num_tagged_fields > std::u32::MAX as usize {
161 bail!(
162 "Too many tagged fields to encode ({} fields)",
163 num_tagged_fields
164 );
165 }
166 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
167
168 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
169 }
170 Ok(())
171 }
172 fn compute_size(&self, version: i16) -> Result<usize> {
173 let mut total_size = 0;
174 if version >= 4 {
175 total_size +=
176 types::CompactArray(types::Struct { version }).compute_size(&self.transactions)?;
177 } else {
178 if !self.transactions.is_empty() {
179 bail!("A field is set that is not available on the selected protocol version");
180 }
181 }
182 if version <= 3 {
183 if version >= 3 {
184 total_size +=
185 types::CompactString.compute_size(&self.v3_and_below_transactional_id)?;
186 } else {
187 total_size += types::String.compute_size(&self.v3_and_below_transactional_id)?;
188 }
189 } else {
190 if !self.v3_and_below_transactional_id.is_empty() {
191 bail!("A field is set that is not available on the selected protocol version");
192 }
193 }
194 if version <= 3 {
195 total_size += types::Int64.compute_size(&self.v3_and_below_producer_id)?;
196 } else {
197 if self.v3_and_below_producer_id != 0 {
198 bail!("A field is set that is not available on the selected protocol version");
199 }
200 }
201 if version <= 3 {
202 total_size += types::Int16.compute_size(&self.v3_and_below_producer_epoch)?;
203 } else {
204 if self.v3_and_below_producer_epoch != 0 {
205 bail!("A field is set that is not available on the selected protocol version");
206 }
207 }
208 if version <= 3 {
209 if version >= 3 {
210 total_size += types::CompactArray(types::Struct { version })
211 .compute_size(&self.v3_and_below_topics)?;
212 } else {
213 total_size += types::Array(types::Struct { version })
214 .compute_size(&self.v3_and_below_topics)?;
215 }
216 } else {
217 if !self.v3_and_below_topics.is_empty() {
218 bail!("A field is set that is not available on the selected protocol version");
219 }
220 }
221 if version >= 3 {
222 let num_tagged_fields = self.unknown_tagged_fields.len();
223 if num_tagged_fields > std::u32::MAX as usize {
224 bail!(
225 "Too many tagged fields to encode ({} fields)",
226 num_tagged_fields
227 );
228 }
229 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
230
231 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
232 }
233 Ok(total_size)
234 }
235}
236
237#[cfg(feature = "broker")]
238impl Decodable for AddPartitionsToTxnRequest {
239 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
240 let transactions = if version >= 4 {
241 types::CompactArray(types::Struct { version }).decode(buf)?
242 } else {
243 Default::default()
244 };
245 let v3_and_below_transactional_id = if version <= 3 {
246 if version >= 3 {
247 types::CompactString.decode(buf)?
248 } else {
249 types::String.decode(buf)?
250 }
251 } else {
252 Default::default()
253 };
254 let v3_and_below_producer_id = if version <= 3 {
255 types::Int64.decode(buf)?
256 } else {
257 (0).into()
258 };
259 let v3_and_below_producer_epoch = if version <= 3 {
260 types::Int16.decode(buf)?
261 } else {
262 0
263 };
264 let v3_and_below_topics = if version <= 3 {
265 if version >= 3 {
266 types::CompactArray(types::Struct { version }).decode(buf)?
267 } else {
268 types::Array(types::Struct { version }).decode(buf)?
269 }
270 } else {
271 Default::default()
272 };
273 let mut unknown_tagged_fields = BTreeMap::new();
274 if version >= 3 {
275 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
276 for _ in 0..num_tagged_fields {
277 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
278 let size: u32 = types::UnsignedVarInt.decode(buf)?;
279 let unknown_value = buf.try_get_bytes(size as usize)?;
280 unknown_tagged_fields.insert(tag as i32, unknown_value);
281 }
282 }
283 Ok(Self {
284 transactions,
285 v3_and_below_transactional_id,
286 v3_and_below_producer_id,
287 v3_and_below_producer_epoch,
288 v3_and_below_topics,
289 unknown_tagged_fields,
290 })
291 }
292}
293
294impl Default for AddPartitionsToTxnRequest {
295 fn default() -> Self {
296 Self {
297 transactions: Default::default(),
298 v3_and_below_transactional_id: Default::default(),
299 v3_and_below_producer_id: (0).into(),
300 v3_and_below_producer_epoch: 0,
301 v3_and_below_topics: Default::default(),
302 unknown_tagged_fields: BTreeMap::new(),
303 }
304 }
305}
306
307impl Message for AddPartitionsToTxnRequest {
308 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
309 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
310}
311
312#[non_exhaustive]
314#[derive(Debug, Clone, PartialEq)]
315pub struct AddPartitionsToTxnTopic {
316 pub name: super::TopicName,
320
321 pub partitions: Vec<i32>,
325
326 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
328}
329
330impl AddPartitionsToTxnTopic {
331 pub fn with_name(mut self, value: super::TopicName) -> Self {
337 self.name = value;
338 self
339 }
340 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
346 self.partitions = value;
347 self
348 }
349 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
351 self.unknown_tagged_fields = value;
352 self
353 }
354 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
356 self.unknown_tagged_fields.insert(key, value);
357 self
358 }
359}
360
361#[cfg(feature = "client")]
362impl Encodable for AddPartitionsToTxnTopic {
363 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
364 if version >= 3 {
365 types::CompactString.encode(buf, &self.name)?;
366 } else {
367 types::String.encode(buf, &self.name)?;
368 }
369 if version >= 3 {
370 types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
371 } else {
372 types::Array(types::Int32).encode(buf, &self.partitions)?;
373 }
374 if version >= 3 {
375 let num_tagged_fields = self.unknown_tagged_fields.len();
376 if num_tagged_fields > std::u32::MAX as usize {
377 bail!(
378 "Too many tagged fields to encode ({} fields)",
379 num_tagged_fields
380 );
381 }
382 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
383
384 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
385 }
386 Ok(())
387 }
388 fn compute_size(&self, version: i16) -> Result<usize> {
389 let mut total_size = 0;
390 if version >= 3 {
391 total_size += types::CompactString.compute_size(&self.name)?;
392 } else {
393 total_size += types::String.compute_size(&self.name)?;
394 }
395 if version >= 3 {
396 total_size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
397 } else {
398 total_size += types::Array(types::Int32).compute_size(&self.partitions)?;
399 }
400 if version >= 3 {
401 let num_tagged_fields = self.unknown_tagged_fields.len();
402 if num_tagged_fields > std::u32::MAX as usize {
403 bail!(
404 "Too many tagged fields to encode ({} fields)",
405 num_tagged_fields
406 );
407 }
408 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
409
410 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
411 }
412 Ok(total_size)
413 }
414}
415
416#[cfg(feature = "broker")]
417impl Decodable for AddPartitionsToTxnTopic {
418 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
419 let name = if version >= 3 {
420 types::CompactString.decode(buf)?
421 } else {
422 types::String.decode(buf)?
423 };
424 let partitions = if version >= 3 {
425 types::CompactArray(types::Int32).decode(buf)?
426 } else {
427 types::Array(types::Int32).decode(buf)?
428 };
429 let mut unknown_tagged_fields = BTreeMap::new();
430 if version >= 3 {
431 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
432 for _ in 0..num_tagged_fields {
433 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
434 let size: u32 = types::UnsignedVarInt.decode(buf)?;
435 let unknown_value = buf.try_get_bytes(size as usize)?;
436 unknown_tagged_fields.insert(tag as i32, unknown_value);
437 }
438 }
439 Ok(Self {
440 name,
441 partitions,
442 unknown_tagged_fields,
443 })
444 }
445}
446
447impl Default for AddPartitionsToTxnTopic {
448 fn default() -> Self {
449 Self {
450 name: Default::default(),
451 partitions: Default::default(),
452 unknown_tagged_fields: BTreeMap::new(),
453 }
454 }
455}
456
457impl Message for AddPartitionsToTxnTopic {
458 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
459 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
460}
461
462#[non_exhaustive]
464#[derive(Debug, Clone, PartialEq)]
465pub struct AddPartitionsToTxnTransaction {
466 pub transactional_id: super::TransactionalId,
470
471 pub producer_id: super::ProducerId,
475
476 pub producer_epoch: i16,
480
481 pub verify_only: bool,
485
486 pub topics: Vec<AddPartitionsToTxnTopic>,
490
491 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
493}
494
495impl AddPartitionsToTxnTransaction {
496 pub fn with_transactional_id(mut self, value: super::TransactionalId) -> Self {
502 self.transactional_id = value;
503 self
504 }
505 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
511 self.producer_id = value;
512 self
513 }
514 pub fn with_producer_epoch(mut self, value: i16) -> Self {
520 self.producer_epoch = value;
521 self
522 }
523 pub fn with_verify_only(mut self, value: bool) -> Self {
529 self.verify_only = value;
530 self
531 }
532 pub fn with_topics(mut self, value: Vec<AddPartitionsToTxnTopic>) -> Self {
538 self.topics = value;
539 self
540 }
541 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
543 self.unknown_tagged_fields = value;
544 self
545 }
546 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
548 self.unknown_tagged_fields.insert(key, value);
549 self
550 }
551}
552
553#[cfg(feature = "client")]
554impl Encodable for AddPartitionsToTxnTransaction {
555 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
556 if version >= 4 {
557 types::CompactString.encode(buf, &self.transactional_id)?;
558 } else {
559 if !self.transactional_id.is_empty() {
560 bail!("A field is set that is not available on the selected protocol version");
561 }
562 }
563 if version >= 4 {
564 types::Int64.encode(buf, &self.producer_id)?;
565 } else {
566 if self.producer_id != 0 {
567 bail!("A field is set that is not available on the selected protocol version");
568 }
569 }
570 if version >= 4 {
571 types::Int16.encode(buf, &self.producer_epoch)?;
572 } else {
573 if self.producer_epoch != 0 {
574 bail!("A field is set that is not available on the selected protocol version");
575 }
576 }
577 if version >= 4 {
578 types::Boolean.encode(buf, &self.verify_only)?;
579 } else {
580 if self.verify_only {
581 bail!("A field is set that is not available on the selected protocol version");
582 }
583 }
584 if version >= 4 {
585 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
586 } else {
587 if !self.topics.is_empty() {
588 bail!("A field is set that is not available on the selected protocol version");
589 }
590 }
591 if version >= 3 {
592 let num_tagged_fields = self.unknown_tagged_fields.len();
593 if num_tagged_fields > std::u32::MAX as usize {
594 bail!(
595 "Too many tagged fields to encode ({} fields)",
596 num_tagged_fields
597 );
598 }
599 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
600
601 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
602 }
603 Ok(())
604 }
605 fn compute_size(&self, version: i16) -> Result<usize> {
606 let mut total_size = 0;
607 if version >= 4 {
608 total_size += types::CompactString.compute_size(&self.transactional_id)?;
609 } else {
610 if !self.transactional_id.is_empty() {
611 bail!("A field is set that is not available on the selected protocol version");
612 }
613 }
614 if version >= 4 {
615 total_size += types::Int64.compute_size(&self.producer_id)?;
616 } else {
617 if self.producer_id != 0 {
618 bail!("A field is set that is not available on the selected protocol version");
619 }
620 }
621 if version >= 4 {
622 total_size += types::Int16.compute_size(&self.producer_epoch)?;
623 } else {
624 if self.producer_epoch != 0 {
625 bail!("A field is set that is not available on the selected protocol version");
626 }
627 }
628 if version >= 4 {
629 total_size += types::Boolean.compute_size(&self.verify_only)?;
630 } else {
631 if self.verify_only {
632 bail!("A field is set that is not available on the selected protocol version");
633 }
634 }
635 if version >= 4 {
636 total_size +=
637 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
638 } else {
639 if !self.topics.is_empty() {
640 bail!("A field is set that is not available on the selected protocol version");
641 }
642 }
643 if version >= 3 {
644 let num_tagged_fields = self.unknown_tagged_fields.len();
645 if num_tagged_fields > std::u32::MAX as usize {
646 bail!(
647 "Too many tagged fields to encode ({} fields)",
648 num_tagged_fields
649 );
650 }
651 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
652
653 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
654 }
655 Ok(total_size)
656 }
657}
658
659#[cfg(feature = "broker")]
660impl Decodable for AddPartitionsToTxnTransaction {
661 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
662 let transactional_id = if version >= 4 {
663 types::CompactString.decode(buf)?
664 } else {
665 Default::default()
666 };
667 let producer_id = if version >= 4 {
668 types::Int64.decode(buf)?
669 } else {
670 (0).into()
671 };
672 let producer_epoch = if version >= 4 {
673 types::Int16.decode(buf)?
674 } else {
675 0
676 };
677 let verify_only = if version >= 4 {
678 types::Boolean.decode(buf)?
679 } else {
680 false
681 };
682 let topics = if version >= 4 {
683 types::CompactArray(types::Struct { version }).decode(buf)?
684 } else {
685 Default::default()
686 };
687 let mut unknown_tagged_fields = BTreeMap::new();
688 if version >= 3 {
689 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
690 for _ in 0..num_tagged_fields {
691 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
692 let size: u32 = types::UnsignedVarInt.decode(buf)?;
693 let unknown_value = buf.try_get_bytes(size as usize)?;
694 unknown_tagged_fields.insert(tag as i32, unknown_value);
695 }
696 }
697 Ok(Self {
698 transactional_id,
699 producer_id,
700 producer_epoch,
701 verify_only,
702 topics,
703 unknown_tagged_fields,
704 })
705 }
706}
707
708impl Default for AddPartitionsToTxnTransaction {
709 fn default() -> Self {
710 Self {
711 transactional_id: Default::default(),
712 producer_id: (0).into(),
713 producer_epoch: 0,
714 verify_only: false,
715 topics: Default::default(),
716 unknown_tagged_fields: BTreeMap::new(),
717 }
718 }
719}
720
721impl Message for AddPartitionsToTxnTransaction {
722 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
723 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
724}
725
726impl HeaderVersion for AddPartitionsToTxnRequest {
727 fn header_version(version: i16) -> i16 {
728 if version >= 3 {
729 2
730 } else {
731 1
732 }
733 }
734}