1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AddPartitionsToTxnRequest {
24 pub transactions: Vec<AddPartitionsToTxnTransaction>,
28
29 pub v3_and_below_transactional_id: super::TransactionalId,
33
34 pub v3_and_below_producer_id: super::ProducerId,
38
39 pub v3_and_below_producer_epoch: i16,
43
44 pub v3_and_below_topics: Vec<AddPartitionsToTxnTopic>,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl AddPartitionsToTxnRequest {
54 pub fn with_transactions(mut self, value: Vec<AddPartitionsToTxnTransaction>) -> Self {
60 self.transactions = value;
61 self
62 }
63 pub fn with_v3_and_below_transactional_id(mut self, value: super::TransactionalId) -> Self {
69 self.v3_and_below_transactional_id = value;
70 self
71 }
72 pub fn with_v3_and_below_producer_id(mut self, value: super::ProducerId) -> Self {
78 self.v3_and_below_producer_id = value;
79 self
80 }
81 pub fn with_v3_and_below_producer_epoch(mut self, value: i16) -> Self {
87 self.v3_and_below_producer_epoch = value;
88 self
89 }
90 pub fn with_v3_and_below_topics(mut self, value: Vec<AddPartitionsToTxnTopic>) -> Self {
96 self.v3_and_below_topics = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
#[cfg(feature = "client")]
impl Encodable for AddPartitionsToTxnRequest {
    /// Serializes the request into `buf` using the layout of `version`.
    ///
    /// Versions 4 and above write `transactions`; versions 3 and below write the
    /// `v3_and_below_*` fields. Version 3 and above use the compact encodings and append
    /// the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5`, when a field that is not serialized in the
    /// chosen version holds a non-default value, when there are more than `u32::MAX`
    /// unknown tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 4 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.transactions)?;
            // The legacy top-level fields are not serialized here, so they must be unset.
            if !self.v3_and_below_transactional_id.is_empty()
                || self.v3_and_below_producer_id != 0
                || self.v3_and_below_producer_epoch != 0
                || !self.v3_and_below_topics.is_empty()
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else {
            if !self.transactions.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
            // Within this branch only version 3 uses the compact encodings.
            let compact = version == 3;
            if compact {
                types::CompactString.encode(buf, &self.v3_and_below_transactional_id)?;
            } else {
                types::String.encode(buf, &self.v3_and_below_transactional_id)?;
            }
            types::Int64.encode(buf, &self.v3_and_below_producer_id)?;
            types::Int16.encode(buf, &self.v3_and_below_producer_epoch)?;
            if compact {
                types::CompactArray(types::Struct { version })
                    .encode(buf, &self.v3_and_below_topics)?;
            } else {
                types::Array(types::Struct { version }).encode(buf, &self.v3_and_below_topics)?;
            }
        }

        if version >= 3 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this method does not check that `version` lies in `0..=5`.
    ///
    /// # Errors
    ///
    /// Fails when a field that is not serialized in the chosen version holds a non-default
    /// value, when there are more than `u32::MAX` unknown tagged fields, or when a nested
    /// size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 4 {
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.transactions)?;
            if !self.v3_and_below_transactional_id.is_empty()
                || self.v3_and_below_producer_id != 0
                || self.v3_and_below_producer_epoch != 0
                || !self.v3_and_below_topics.is_empty()
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else {
            if !self.transactions.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
            let compact = version == 3;
            size += if compact {
                types::CompactString.compute_size(&self.v3_and_below_transactional_id)?
            } else {
                types::String.compute_size(&self.v3_and_below_transactional_id)?
            };
            size += types::Int64.compute_size(&self.v3_and_below_producer_id)?;
            size += types::Int16.compute_size(&self.v3_and_below_producer_epoch)?;
            size += if compact {
                types::CompactArray(types::Struct { version })
                    .compute_size(&self.v3_and_below_topics)?
            } else {
                types::Array(types::Struct { version }).compute_size(&self.v3_and_below_topics)?
            };
        }

        if version >= 3 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
239
#[cfg(feature = "broker")]
impl Decodable for AddPartitionsToTxnRequest {
    /// Reads a request from `buf` using the layout of `version`.
    ///
    /// Fields that are not serialized in the chosen version keep the values provided by
    /// `Default`. Unknown tagged fields are collected for versions 3 and above.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5` or when any field cannot be decoded from
    /// `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Start from the defaults and overwrite only what this version carries.
        let mut request = Self::default();

        if version >= 4 {
            request.transactions = types::CompactArray(types::Struct { version }).decode(buf)?;
        } else {
            // Within this branch only version 3 uses the compact encodings.
            let compact = version == 3;
            request.v3_and_below_transactional_id = if compact {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            };
            request.v3_and_below_producer_id = types::Int64.decode(buf)?;
            request.v3_and_below_producer_epoch = types::Int16.decode(buf)?;
            request.v3_and_below_topics = if compact {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                types::Array(types::Struct { version }).decode(buf)?
            };
        }

        if version >= 3 {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                request.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(request)
    }
}
299
300impl Default for AddPartitionsToTxnRequest {
301 fn default() -> Self {
302 Self {
303 transactions: Default::default(),
304 v3_and_below_transactional_id: Default::default(),
305 v3_and_below_producer_id: (0).into(),
306 v3_and_below_producer_epoch: 0,
307 v3_and_below_topics: Default::default(),
308 unknown_tagged_fields: BTreeMap::new(),
309 }
310 }
311}
312
313impl Message for AddPartitionsToTxnRequest {
314 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
315 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
316}
317
318#[non_exhaustive]
320#[derive(Debug, Clone, PartialEq)]
321pub struct AddPartitionsToTxnTopic {
322 pub name: super::TopicName,
326
327 pub partitions: Vec<i32>,
331
332 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
334}
335
336impl AddPartitionsToTxnTopic {
337 pub fn with_name(mut self, value: super::TopicName) -> Self {
343 self.name = value;
344 self
345 }
346 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
352 self.partitions = value;
353 self
354 }
355 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
357 self.unknown_tagged_fields = value;
358 self
359 }
360 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
362 self.unknown_tagged_fields.insert(key, value);
363 self
364 }
365}
366
#[cfg(feature = "client")]
impl Encodable for AddPartitionsToTxnTopic {
    /// Serializes the topic into `buf` using the layout of `version`.
    ///
    /// Versions below 3 use the plain string/array encodings and carry no tagged fields;
    /// versions 3 and above use the compact encodings followed by the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5`, when there are more than `u32::MAX`
    /// unknown tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version < 3 {
            types::String.encode(buf, &self.name)?;
            types::Array(types::Int32).encode(buf, &self.partitions)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Int32).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this method does not check that `version` lies in `0..=5`.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields or when a nested
    /// size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version < 3 {
            size += types::String.compute_size(&self.name)?;
            size += types::Array(types::Int32).compute_size(&self.partitions)?;
            return Ok(size);
        }

        size += types::CompactString.compute_size(&self.name)?;
        size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
424
#[cfg(feature = "broker")]
impl Decodable for AddPartitionsToTxnTopic {
    /// Reads a topic from `buf` using the layout of `version`.
    ///
    /// Versions below 3 use the plain string/array encodings; versions 3 and above use the
    /// compact encodings and are followed by a tagged-field section whose entries are
    /// stored in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5` or when any field cannot be decoded from
    /// `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let mut topic = Self::default();

        if version < 3 {
            topic.name = types::String.decode(buf)?;
            topic.partitions = types::Array(types::Int32).decode(buf)?;
            return Ok(topic);
        }

        topic.name = types::CompactString.decode(buf)?;
        topic.partitions = types::CompactArray(types::Int32).decode(buf)?;

        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            topic.unknown_tagged_fields.insert(tag as i32, payload);
        }
        Ok(topic)
    }
}
458
459impl Default for AddPartitionsToTxnTopic {
460 fn default() -> Self {
461 Self {
462 name: Default::default(),
463 partitions: Default::default(),
464 unknown_tagged_fields: BTreeMap::new(),
465 }
466 }
467}
468
469impl Message for AddPartitionsToTxnTopic {
470 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
471 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
472}
473
474#[non_exhaustive]
476#[derive(Debug, Clone, PartialEq)]
477pub struct AddPartitionsToTxnTransaction {
478 pub transactional_id: super::TransactionalId,
482
483 pub producer_id: super::ProducerId,
487
488 pub producer_epoch: i16,
492
493 pub verify_only: bool,
497
498 pub topics: Vec<AddPartitionsToTxnTopic>,
502
503 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
505}
506
507impl AddPartitionsToTxnTransaction {
508 pub fn with_transactional_id(mut self, value: super::TransactionalId) -> Self {
514 self.transactional_id = value;
515 self
516 }
517 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
523 self.producer_id = value;
524 self
525 }
526 pub fn with_producer_epoch(mut self, value: i16) -> Self {
532 self.producer_epoch = value;
533 self
534 }
535 pub fn with_verify_only(mut self, value: bool) -> Self {
541 self.verify_only = value;
542 self
543 }
544 pub fn with_topics(mut self, value: Vec<AddPartitionsToTxnTopic>) -> Self {
550 self.topics = value;
551 self
552 }
553 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
555 self.unknown_tagged_fields = value;
556 self
557 }
558 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
560 self.unknown_tagged_fields.insert(key, value);
561 self
562 }
563}
564
#[cfg(feature = "client")]
impl Encodable for AddPartitionsToTxnTransaction {
    /// Serializes the transaction into `buf` using the layout of `version`.
    ///
    /// The regular fields are written only for versions 4 and above. The tagged-field
    /// section is written for versions 3 and above.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5`, when `version` is below 4 and any regular
    /// field holds a non-default value, when there are more than `u32::MAX` unknown tagged
    /// fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 4 {
            types::CompactString.encode(buf, &self.transactional_id)?;
            types::Int64.encode(buf, &self.producer_id)?;
            types::Int16.encode(buf, &self.producer_epoch)?;
            types::Boolean.encode(buf, &self.verify_only)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else if !self.transactional_id.is_empty()
            || self.producer_id != 0
            || self.producer_epoch != 0
            || self.verify_only
            || !self.topics.is_empty()
        {
            // None of the regular fields are serialized before version 4.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 3 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this method does not check that `version` lies in `0..=5`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is below 4 and any regular field holds a non-default value,
    /// when there are more than `u32::MAX` unknown tagged fields, or when a nested size
    /// computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 4 {
            size += types::CompactString.compute_size(&self.transactional_id)?;
            size += types::Int64.compute_size(&self.producer_id)?;
            size += types::Int16.compute_size(&self.producer_epoch)?;
            size += types::Boolean.compute_size(&self.verify_only)?;
            size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        } else if !self.transactional_id.is_empty()
            || self.producer_id != 0
            || self.producer_epoch != 0
            || self.verify_only
            || !self.topics.is_empty()
        {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 3 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
673
#[cfg(feature = "broker")]
impl Decodable for AddPartitionsToTxnTransaction {
    /// Reads a transaction from `buf` using the layout of `version`.
    ///
    /// The regular fields are read only for versions 4 and above; for earlier versions
    /// they keep the values provided by `Default`. Unknown tagged fields are collected for
    /// versions 3 and above.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5` or when any field cannot be decoded from
    /// `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let mut transaction = Self::default();

        if version >= 4 {
            transaction.transactional_id = types::CompactString.decode(buf)?;
            transaction.producer_id = types::Int64.decode(buf)?;
            transaction.producer_epoch = types::Int16.decode(buf)?;
            transaction.verify_only = types::Boolean.decode(buf)?;
            transaction.topics = types::CompactArray(types::Struct { version }).decode(buf)?;
        }

        if version >= 3 {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                transaction.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(transaction)
    }
}
725
726impl Default for AddPartitionsToTxnTransaction {
727 fn default() -> Self {
728 Self {
729 transactional_id: Default::default(),
730 producer_id: (0).into(),
731 producer_epoch: 0,
732 verify_only: false,
733 topics: Default::default(),
734 unknown_tagged_fields: BTreeMap::new(),
735 }
736 }
737}
738
739impl Message for AddPartitionsToTxnTransaction {
740 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
741 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
742}
743
744impl HeaderVersion for AddPartitionsToTxnRequest {
745 fn header_version(version: i16) -> i16 {
746 if version >= 3 {
747 2
748 } else {
749 1
750 }
751 }
752}