1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct TxnOffsetCommitRequest {
24 pub transactional_id: super::TransactionalId,
28
29 pub group_id: super::GroupId,
33
34 pub producer_id: super::ProducerId,
38
39 pub producer_epoch: i16,
43
44 pub generation_id: i32,
48
49 pub member_id: StrBytes,
53
54 pub group_instance_id: Option<StrBytes>,
58
59 pub topics: Vec<TxnOffsetCommitRequestTopic>,
63
64 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
66}
67
68impl TxnOffsetCommitRequest {
69 pub fn with_transactional_id(mut self, value: super::TransactionalId) -> Self {
75 self.transactional_id = value;
76 self
77 }
78 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
84 self.group_id = value;
85 self
86 }
87 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
93 self.producer_id = value;
94 self
95 }
96 pub fn with_producer_epoch(mut self, value: i16) -> Self {
102 self.producer_epoch = value;
103 self
104 }
105 pub fn with_generation_id(mut self, value: i32) -> Self {
111 self.generation_id = value;
112 self
113 }
114 pub fn with_member_id(mut self, value: StrBytes) -> Self {
120 self.member_id = value;
121 self
122 }
123 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
129 self.group_instance_id = value;
130 self
131 }
132 pub fn with_topics(mut self, value: Vec<TxnOffsetCommitRequestTopic>) -> Self {
138 self.topics = value;
139 self
140 }
141 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
143 self.unknown_tagged_fields = value;
144 self
145 }
146 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
148 self.unknown_tagged_fields.insert(key, value);
149 self
150 }
151}
152
#[cfg(feature = "client")]
impl Encodable for TxnOffsetCommitRequest {
    /// Serializes the request into `buf` using the layout of protocol `version`.
    ///
    /// Fields are written in declaration order. From version 3 the compact
    /// string/array encodings are used and the unknown tagged fields are
    /// appended after the topics.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=4`, when `version < 3` and any of
    /// `generation_id`, `member_id` or `group_instance_id` differs from its
    /// default (`-1`, `""`, `None`), when there are more than `u32::MAX`
    /// unknown tagged fields, or when an underlying field encoder fails.
    /// Bytes written before the failure stay in `buf`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 3 is where compact encodings and tagged fields begin.
        let flexible = version >= 3;

        if flexible {
            types::CompactString.encode(buf, &self.transactional_id)?;
            types::CompactString.encode(buf, &self.group_id)?;
        } else {
            types::String.encode(buf, &self.transactional_id)?;
            types::String.encode(buf, &self.group_id)?;
        }
        types::Int64.encode(buf, &self.producer_id)?;
        types::Int16.encode(buf, &self.producer_epoch)?;

        if flexible {
            types::Int32.encode(buf, &self.generation_id)?;
            types::CompactString.encode(buf, &self.member_id)?;
            types::CompactString.encode(buf, &self.group_instance_id)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            // The cast cannot truncate: the count was just checked against u32::MAX.
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            // These fields have no slot before version 3, so refuse to drop
            // a non-default value silently.
            if self.generation_id != -1
                || &self.member_id != ""
                || self.group_instance_id.is_some()
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`. In particular an
    /// unsupported `version` is rejected here as well, so a size is never
    /// reported for a layout that `encode` would refuse to produce.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 3;
        let mut total_size = 0;

        if flexible {
            total_size += types::CompactString.compute_size(&self.transactional_id)?;
            total_size += types::CompactString.compute_size(&self.group_id)?;
        } else {
            total_size += types::String.compute_size(&self.transactional_id)?;
            total_size += types::String.compute_size(&self.group_id)?;
        }
        total_size += types::Int64.compute_size(&self.producer_id)?;
        total_size += types::Int16.compute_size(&self.producer_epoch)?;

        if flexible {
            total_size += types::Int32.compute_size(&self.generation_id)?;
            total_size += types::CompactString.compute_size(&self.member_id)?;
            total_size += types::CompactString.compute_size(&self.group_instance_id)?;
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            // The cast cannot truncate: the count was just checked against u32::MAX.
            total_size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            if self.generation_id != -1
                || &self.member_id != ""
                || self.group_instance_id.is_some()
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }
        Ok(total_size)
    }
}
267
#[cfg(feature = "broker")]
impl Decodable for TxnOffsetCommitRequest {
    /// Reads a request from `buf` using the layout of protocol `version`.
    ///
    /// Fields that are not on the wire before version 3 are filled with their
    /// defaults (`-1`, `""`, `None`). From version 3 every tagged field found
    /// after the topics is stored, undecoded, in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=4` or when any underlying read
    /// from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 3 is where compact encodings and tagged fields begin.
        let flexible = version >= 3;

        let transactional_id = match flexible {
            true => types::CompactString.decode(buf)?,
            false => types::String.decode(buf)?,
        };
        let group_id = match flexible {
            true => types::CompactString.decode(buf)?,
            false => types::String.decode(buf)?,
        };
        let producer_id = types::Int64.decode(buf)?;
        let producer_epoch = types::Int16.decode(buf)?;

        // These three only exist on the wire from version 3; tuple elements
        // are evaluated left to right, which keeps the read order intact.
        let (generation_id, member_id, group_instance_id) = if flexible {
            (
                types::Int32.decode(buf)?,
                types::CompactString.decode(buf)?,
                types::CompactString.decode(buf)?,
            )
        } else {
            (-1, StrBytes::from_static_str(""), None)
        };

        let topics = match flexible {
            true => types::CompactArray(types::Struct { version }).decode(buf)?,
            false => types::Array(types::Struct { version }).decode(buf)?,
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(payload_len as usize)?;
                // A repeated tag overwrites the earlier payload.
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            transactional_id,
            group_id,
            producer_id,
            producer_epoch,
            generation_id,
            member_id,
            group_instance_id,
            topics,
            unknown_tagged_fields,
        })
    }
}
329
330impl Default for TxnOffsetCommitRequest {
331 fn default() -> Self {
332 Self {
333 transactional_id: Default::default(),
334 group_id: Default::default(),
335 producer_id: (0).into(),
336 producer_epoch: 0,
337 generation_id: -1,
338 member_id: StrBytes::from_static_str(""),
339 group_instance_id: None,
340 topics: Default::default(),
341 unknown_tagged_fields: BTreeMap::new(),
342 }
343 }
344}
345
346impl Message for TxnOffsetCommitRequest {
347 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
348 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
349}
350
351#[non_exhaustive]
353#[derive(Debug, Clone, PartialEq)]
354pub struct TxnOffsetCommitRequestPartition {
355 pub partition_index: i32,
359
360 pub committed_offset: i64,
364
365 pub committed_leader_epoch: i32,
369
370 pub committed_metadata: Option<StrBytes>,
374
375 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
377}
378
379impl TxnOffsetCommitRequestPartition {
380 pub fn with_partition_index(mut self, value: i32) -> Self {
386 self.partition_index = value;
387 self
388 }
389 pub fn with_committed_offset(mut self, value: i64) -> Self {
395 self.committed_offset = value;
396 self
397 }
398 pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
404 self.committed_leader_epoch = value;
405 self
406 }
407 pub fn with_committed_metadata(mut self, value: Option<StrBytes>) -> Self {
413 self.committed_metadata = value;
414 self
415 }
416 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
418 self.unknown_tagged_fields = value;
419 self
420 }
421 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
423 self.unknown_tagged_fields.insert(key, value);
424 self
425 }
426}
427
#[cfg(feature = "client")]
impl Encodable for TxnOffsetCommitRequestPartition {
    /// Serializes the partition entry into `buf` using the layout of
    /// protocol `version`.
    ///
    /// `committed_leader_epoch` is written only from version 2. From
    /// version 3 the metadata uses the compact string encoding and the
    /// unknown tagged fields are appended.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=4`, when there are more than
    /// `u32::MAX` unknown tagged fields, or when an underlying field encoder
    /// fails. Bytes written before the failure stay in `buf`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int64.encode(buf, &self.committed_offset)?;
        if version >= 2 {
            types::Int32.encode(buf, &self.committed_leader_epoch)?;
        }

        // Version 3 is where compact encodings and tagged fields begin.
        if version >= 3 {
            types::CompactString.encode(buf, &self.committed_metadata)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            // The cast cannot truncate: the count was just checked against u32::MAX.
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::String.encode(buf, &self.committed_metadata)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`. In particular an
    /// unsupported `version` is rejected here as well, so a size is never
    /// reported for a layout that `encode` would refuse to produce.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.partition_index)?;
        total_size += types::Int64.compute_size(&self.committed_offset)?;
        if version >= 2 {
            total_size += types::Int32.compute_size(&self.committed_leader_epoch)?;
        }

        if version >= 3 {
            total_size += types::CompactString.compute_size(&self.committed_metadata)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            // The cast cannot truncate: the count was just checked against u32::MAX.
            total_size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            total_size += types::String.compute_size(&self.committed_metadata)?;
        }
        Ok(total_size)
    }
}
485
#[cfg(feature = "broker")]
impl Decodable for TxnOffsetCommitRequestPartition {
    /// Reads a partition entry from `buf` using the layout of protocol
    /// `version`.
    ///
    /// `committed_leader_epoch` is read only from version 2 and is `-1`
    /// otherwise. From version 3 every tagged field found after the metadata
    /// is stored, undecoded, in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=4` or when any underlying read
    /// from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 3 is where compact encodings and tagged fields begin.
        let flexible = version >= 3;

        let partition_index = types::Int32.decode(buf)?;
        let committed_offset = types::Int64.decode(buf)?;
        let committed_leader_epoch = match version >= 2 {
            true => types::Int32.decode(buf)?,
            false => -1,
        };
        let committed_metadata = match flexible {
            true => types::CompactString.decode(buf)?,
            false => types::String.decode(buf)?,
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(payload_len as usize)?;
                // A repeated tag overwrites the earlier payload.
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            partition_index,
            committed_offset,
            committed_leader_epoch,
            committed_metadata,
            unknown_tagged_fields,
        })
    }
}
523
524impl Default for TxnOffsetCommitRequestPartition {
525 fn default() -> Self {
526 Self {
527 partition_index: 0,
528 committed_offset: 0,
529 committed_leader_epoch: -1,
530 committed_metadata: Some(Default::default()),
531 unknown_tagged_fields: BTreeMap::new(),
532 }
533 }
534}
535
536impl Message for TxnOffsetCommitRequestPartition {
537 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
538 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
539}
540
541#[non_exhaustive]
543#[derive(Debug, Clone, PartialEq)]
544pub struct TxnOffsetCommitRequestTopic {
545 pub name: super::TopicName,
549
550 pub partitions: Vec<TxnOffsetCommitRequestPartition>,
554
555 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
557}
558
559impl TxnOffsetCommitRequestTopic {
560 pub fn with_name(mut self, value: super::TopicName) -> Self {
566 self.name = value;
567 self
568 }
569 pub fn with_partitions(mut self, value: Vec<TxnOffsetCommitRequestPartition>) -> Self {
575 self.partitions = value;
576 self
577 }
578 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
580 self.unknown_tagged_fields = value;
581 self
582 }
583 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
585 self.unknown_tagged_fields.insert(key, value);
586 self
587 }
588}
589
#[cfg(feature = "client")]
impl Encodable for TxnOffsetCommitRequestTopic {
    /// Serializes the topic entry into `buf` using the layout of protocol
    /// `version`.
    ///
    /// The name is written first, then the partitions. From version 3 the
    /// compact encodings are used and the unknown tagged fields are appended.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=4`, when there are more than
    /// `u32::MAX` unknown tagged fields, or when an underlying field encoder
    /// fails. Bytes written before the failure stay in `buf`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Version 3 is where compact encodings and tagged fields begin.
        if version >= 3 {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            // The cast cannot truncate: the count was just checked against u32::MAX.
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::String.encode(buf, &self.name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`. In particular an
    /// unsupported `version` is rejected here as well, so a size is never
    /// reported for a layout that `encode` would refuse to produce.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let mut total_size = 0;

        if version >= 3 {
            total_size += types::CompactString.compute_size(&self.name)?;
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            // The cast cannot truncate: the count was just checked against u32::MAX.
            total_size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            total_size += types::String.compute_size(&self.name)?;
            total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
        }
        Ok(total_size)
    }
}
648
#[cfg(feature = "broker")]
impl Decodable for TxnOffsetCommitRequestTopic {
    /// Reads a topic entry from `buf` using the layout of protocol `version`.
    ///
    /// The name is read first, then the partitions. From version 3 every
    /// tagged field found after the partitions is stored, undecoded, in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=4` or when any underlying read
    /// from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 3 is where compact encodings and tagged fields begin.
        let flexible = version >= 3;

        let name = match flexible {
            true => types::CompactString.decode(buf)?,
            false => types::String.decode(buf)?,
        };
        let partitions = match flexible {
            true => types::CompactArray(types::Struct { version }).decode(buf)?,
            false => types::Array(types::Struct { version }).decode(buf)?,
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(payload_len as usize)?;
                // A repeated tag overwrites the earlier payload.
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
682
683impl Default for TxnOffsetCommitRequestTopic {
684 fn default() -> Self {
685 Self {
686 name: Default::default(),
687 partitions: Default::default(),
688 unknown_tagged_fields: BTreeMap::new(),
689 }
690 }
691}
692
693impl Message for TxnOffsetCommitRequestTopic {
694 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
695 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
696}
697
698impl HeaderVersion for TxnOffsetCommitRequest {
699 fn header_version(version: i16) -> i16 {
700 if version >= 3 {
701 2
702 } else {
703 1
704 }
705 }
706}