1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct TxnOffsetCommitRequest {
24 pub transactional_id: super::TransactionalId,
28
29 pub group_id: super::GroupId,
33
34 pub producer_id: super::ProducerId,
38
39 pub producer_epoch: i16,
43
44 pub generation_id: i32,
48
49 pub member_id: StrBytes,
53
54 pub group_instance_id: Option<StrBytes>,
58
59 pub topics: Vec<TxnOffsetCommitRequestTopic>,
63
64 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
66}
67
68impl TxnOffsetCommitRequest {
69 pub fn with_transactional_id(mut self, value: super::TransactionalId) -> Self {
75 self.transactional_id = value;
76 self
77 }
78 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
84 self.group_id = value;
85 self
86 }
87 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
93 self.producer_id = value;
94 self
95 }
96 pub fn with_producer_epoch(mut self, value: i16) -> Self {
102 self.producer_epoch = value;
103 self
104 }
105 pub fn with_generation_id(mut self, value: i32) -> Self {
111 self.generation_id = value;
112 self
113 }
114 pub fn with_member_id(mut self, value: StrBytes) -> Self {
120 self.member_id = value;
121 self
122 }
123 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
129 self.group_instance_id = value;
130 self
131 }
132 pub fn with_topics(mut self, value: Vec<TxnOffsetCommitRequestTopic>) -> Self {
138 self.topics = value;
139 self
140 }
141 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
143 self.unknown_tagged_fields = value;
144 self
145 }
146 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
148 self.unknown_tagged_fields.insert(key, value);
149 self
150 }
151}
152
153#[cfg(feature = "client")]
154impl Encodable for TxnOffsetCommitRequest {
155 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
156 if version >= 3 {
157 types::CompactString.encode(buf, &self.transactional_id)?;
158 } else {
159 types::String.encode(buf, &self.transactional_id)?;
160 }
161 if version >= 3 {
162 types::CompactString.encode(buf, &self.group_id)?;
163 } else {
164 types::String.encode(buf, &self.group_id)?;
165 }
166 types::Int64.encode(buf, &self.producer_id)?;
167 types::Int16.encode(buf, &self.producer_epoch)?;
168 if version >= 3 {
169 types::Int32.encode(buf, &self.generation_id)?;
170 } else {
171 if self.generation_id != -1 {
172 bail!("A field is set that is not available on the selected protocol version");
173 }
174 }
175 if version >= 3 {
176 types::CompactString.encode(buf, &self.member_id)?;
177 } else {
178 if &self.member_id != "" {
179 bail!("A field is set that is not available on the selected protocol version");
180 }
181 }
182 if version >= 3 {
183 types::CompactString.encode(buf, &self.group_instance_id)?;
184 } else {
185 if !self.group_instance_id.is_none() {
186 bail!("A field is set that is not available on the selected protocol version");
187 }
188 }
189 if version >= 3 {
190 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
191 } else {
192 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
193 }
194 if version >= 3 {
195 let num_tagged_fields = self.unknown_tagged_fields.len();
196 if num_tagged_fields > std::u32::MAX as usize {
197 bail!(
198 "Too many tagged fields to encode ({} fields)",
199 num_tagged_fields
200 );
201 }
202 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
203
204 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
205 }
206 Ok(())
207 }
208 fn compute_size(&self, version: i16) -> Result<usize> {
209 let mut total_size = 0;
210 if version >= 3 {
211 total_size += types::CompactString.compute_size(&self.transactional_id)?;
212 } else {
213 total_size += types::String.compute_size(&self.transactional_id)?;
214 }
215 if version >= 3 {
216 total_size += types::CompactString.compute_size(&self.group_id)?;
217 } else {
218 total_size += types::String.compute_size(&self.group_id)?;
219 }
220 total_size += types::Int64.compute_size(&self.producer_id)?;
221 total_size += types::Int16.compute_size(&self.producer_epoch)?;
222 if version >= 3 {
223 total_size += types::Int32.compute_size(&self.generation_id)?;
224 } else {
225 if self.generation_id != -1 {
226 bail!("A field is set that is not available on the selected protocol version");
227 }
228 }
229 if version >= 3 {
230 total_size += types::CompactString.compute_size(&self.member_id)?;
231 } else {
232 if &self.member_id != "" {
233 bail!("A field is set that is not available on the selected protocol version");
234 }
235 }
236 if version >= 3 {
237 total_size += types::CompactString.compute_size(&self.group_instance_id)?;
238 } else {
239 if !self.group_instance_id.is_none() {
240 bail!("A field is set that is not available on the selected protocol version");
241 }
242 }
243 if version >= 3 {
244 total_size +=
245 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
246 } else {
247 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
248 }
249 if version >= 3 {
250 let num_tagged_fields = self.unknown_tagged_fields.len();
251 if num_tagged_fields > std::u32::MAX as usize {
252 bail!(
253 "Too many tagged fields to encode ({} fields)",
254 num_tagged_fields
255 );
256 }
257 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
258
259 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
260 }
261 Ok(total_size)
262 }
263}
264
265#[cfg(feature = "broker")]
266impl Decodable for TxnOffsetCommitRequest {
267 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
268 let transactional_id = if version >= 3 {
269 types::CompactString.decode(buf)?
270 } else {
271 types::String.decode(buf)?
272 };
273 let group_id = if version >= 3 {
274 types::CompactString.decode(buf)?
275 } else {
276 types::String.decode(buf)?
277 };
278 let producer_id = types::Int64.decode(buf)?;
279 let producer_epoch = types::Int16.decode(buf)?;
280 let generation_id = if version >= 3 {
281 types::Int32.decode(buf)?
282 } else {
283 -1
284 };
285 let member_id = if version >= 3 {
286 types::CompactString.decode(buf)?
287 } else {
288 StrBytes::from_static_str("")
289 };
290 let group_instance_id = if version >= 3 {
291 types::CompactString.decode(buf)?
292 } else {
293 None
294 };
295 let topics = if version >= 3 {
296 types::CompactArray(types::Struct { version }).decode(buf)?
297 } else {
298 types::Array(types::Struct { version }).decode(buf)?
299 };
300 let mut unknown_tagged_fields = BTreeMap::new();
301 if version >= 3 {
302 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
303 for _ in 0..num_tagged_fields {
304 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
305 let size: u32 = types::UnsignedVarInt.decode(buf)?;
306 let unknown_value = buf.try_get_bytes(size as usize)?;
307 unknown_tagged_fields.insert(tag as i32, unknown_value);
308 }
309 }
310 Ok(Self {
311 transactional_id,
312 group_id,
313 producer_id,
314 producer_epoch,
315 generation_id,
316 member_id,
317 group_instance_id,
318 topics,
319 unknown_tagged_fields,
320 })
321 }
322}
323
324impl Default for TxnOffsetCommitRequest {
325 fn default() -> Self {
326 Self {
327 transactional_id: Default::default(),
328 group_id: Default::default(),
329 producer_id: (0).into(),
330 producer_epoch: 0,
331 generation_id: -1,
332 member_id: StrBytes::from_static_str(""),
333 group_instance_id: None,
334 topics: Default::default(),
335 unknown_tagged_fields: BTreeMap::new(),
336 }
337 }
338}
339
340impl Message for TxnOffsetCommitRequest {
341 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
342 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
343}
344
345#[non_exhaustive]
347#[derive(Debug, Clone, PartialEq)]
348pub struct TxnOffsetCommitRequestPartition {
349 pub partition_index: i32,
353
354 pub committed_offset: i64,
358
359 pub committed_leader_epoch: i32,
363
364 pub committed_metadata: Option<StrBytes>,
368
369 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
371}
372
373impl TxnOffsetCommitRequestPartition {
374 pub fn with_partition_index(mut self, value: i32) -> Self {
380 self.partition_index = value;
381 self
382 }
383 pub fn with_committed_offset(mut self, value: i64) -> Self {
389 self.committed_offset = value;
390 self
391 }
392 pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
398 self.committed_leader_epoch = value;
399 self
400 }
401 pub fn with_committed_metadata(mut self, value: Option<StrBytes>) -> Self {
407 self.committed_metadata = value;
408 self
409 }
410 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
412 self.unknown_tagged_fields = value;
413 self
414 }
415 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
417 self.unknown_tagged_fields.insert(key, value);
418 self
419 }
420}
421
422#[cfg(feature = "client")]
423impl Encodable for TxnOffsetCommitRequestPartition {
424 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
425 types::Int32.encode(buf, &self.partition_index)?;
426 types::Int64.encode(buf, &self.committed_offset)?;
427 if version >= 2 {
428 types::Int32.encode(buf, &self.committed_leader_epoch)?;
429 }
430 if version >= 3 {
431 types::CompactString.encode(buf, &self.committed_metadata)?;
432 } else {
433 types::String.encode(buf, &self.committed_metadata)?;
434 }
435 if version >= 3 {
436 let num_tagged_fields = self.unknown_tagged_fields.len();
437 if num_tagged_fields > std::u32::MAX as usize {
438 bail!(
439 "Too many tagged fields to encode ({} fields)",
440 num_tagged_fields
441 );
442 }
443 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
444
445 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
446 }
447 Ok(())
448 }
449 fn compute_size(&self, version: i16) -> Result<usize> {
450 let mut total_size = 0;
451 total_size += types::Int32.compute_size(&self.partition_index)?;
452 total_size += types::Int64.compute_size(&self.committed_offset)?;
453 if version >= 2 {
454 total_size += types::Int32.compute_size(&self.committed_leader_epoch)?;
455 }
456 if version >= 3 {
457 total_size += types::CompactString.compute_size(&self.committed_metadata)?;
458 } else {
459 total_size += types::String.compute_size(&self.committed_metadata)?;
460 }
461 if version >= 3 {
462 let num_tagged_fields = self.unknown_tagged_fields.len();
463 if num_tagged_fields > std::u32::MAX as usize {
464 bail!(
465 "Too many tagged fields to encode ({} fields)",
466 num_tagged_fields
467 );
468 }
469 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
470
471 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
472 }
473 Ok(total_size)
474 }
475}
476
477#[cfg(feature = "broker")]
478impl Decodable for TxnOffsetCommitRequestPartition {
479 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
480 let partition_index = types::Int32.decode(buf)?;
481 let committed_offset = types::Int64.decode(buf)?;
482 let committed_leader_epoch = if version >= 2 {
483 types::Int32.decode(buf)?
484 } else {
485 -1
486 };
487 let committed_metadata = if version >= 3 {
488 types::CompactString.decode(buf)?
489 } else {
490 types::String.decode(buf)?
491 };
492 let mut unknown_tagged_fields = BTreeMap::new();
493 if version >= 3 {
494 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
495 for _ in 0..num_tagged_fields {
496 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
497 let size: u32 = types::UnsignedVarInt.decode(buf)?;
498 let unknown_value = buf.try_get_bytes(size as usize)?;
499 unknown_tagged_fields.insert(tag as i32, unknown_value);
500 }
501 }
502 Ok(Self {
503 partition_index,
504 committed_offset,
505 committed_leader_epoch,
506 committed_metadata,
507 unknown_tagged_fields,
508 })
509 }
510}
511
512impl Default for TxnOffsetCommitRequestPartition {
513 fn default() -> Self {
514 Self {
515 partition_index: 0,
516 committed_offset: 0,
517 committed_leader_epoch: -1,
518 committed_metadata: Some(Default::default()),
519 unknown_tagged_fields: BTreeMap::new(),
520 }
521 }
522}
523
524impl Message for TxnOffsetCommitRequestPartition {
525 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
526 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
527}
528
529#[non_exhaustive]
531#[derive(Debug, Clone, PartialEq)]
532pub struct TxnOffsetCommitRequestTopic {
533 pub name: super::TopicName,
537
538 pub partitions: Vec<TxnOffsetCommitRequestPartition>,
542
543 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
545}
546
547impl TxnOffsetCommitRequestTopic {
548 pub fn with_name(mut self, value: super::TopicName) -> Self {
554 self.name = value;
555 self
556 }
557 pub fn with_partitions(mut self, value: Vec<TxnOffsetCommitRequestPartition>) -> Self {
563 self.partitions = value;
564 self
565 }
566 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
568 self.unknown_tagged_fields = value;
569 self
570 }
571 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
573 self.unknown_tagged_fields.insert(key, value);
574 self
575 }
576}
577
578#[cfg(feature = "client")]
579impl Encodable for TxnOffsetCommitRequestTopic {
580 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
581 if version >= 3 {
582 types::CompactString.encode(buf, &self.name)?;
583 } else {
584 types::String.encode(buf, &self.name)?;
585 }
586 if version >= 3 {
587 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
588 } else {
589 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
590 }
591 if version >= 3 {
592 let num_tagged_fields = self.unknown_tagged_fields.len();
593 if num_tagged_fields > std::u32::MAX as usize {
594 bail!(
595 "Too many tagged fields to encode ({} fields)",
596 num_tagged_fields
597 );
598 }
599 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
600
601 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
602 }
603 Ok(())
604 }
605 fn compute_size(&self, version: i16) -> Result<usize> {
606 let mut total_size = 0;
607 if version >= 3 {
608 total_size += types::CompactString.compute_size(&self.name)?;
609 } else {
610 total_size += types::String.compute_size(&self.name)?;
611 }
612 if version >= 3 {
613 total_size +=
614 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
615 } else {
616 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
617 }
618 if version >= 3 {
619 let num_tagged_fields = self.unknown_tagged_fields.len();
620 if num_tagged_fields > std::u32::MAX as usize {
621 bail!(
622 "Too many tagged fields to encode ({} fields)",
623 num_tagged_fields
624 );
625 }
626 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
627
628 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
629 }
630 Ok(total_size)
631 }
632}
633
634#[cfg(feature = "broker")]
635impl Decodable for TxnOffsetCommitRequestTopic {
636 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
637 let name = if version >= 3 {
638 types::CompactString.decode(buf)?
639 } else {
640 types::String.decode(buf)?
641 };
642 let partitions = if version >= 3 {
643 types::CompactArray(types::Struct { version }).decode(buf)?
644 } else {
645 types::Array(types::Struct { version }).decode(buf)?
646 };
647 let mut unknown_tagged_fields = BTreeMap::new();
648 if version >= 3 {
649 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
650 for _ in 0..num_tagged_fields {
651 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
652 let size: u32 = types::UnsignedVarInt.decode(buf)?;
653 let unknown_value = buf.try_get_bytes(size as usize)?;
654 unknown_tagged_fields.insert(tag as i32, unknown_value);
655 }
656 }
657 Ok(Self {
658 name,
659 partitions,
660 unknown_tagged_fields,
661 })
662 }
663}
664
665impl Default for TxnOffsetCommitRequestTopic {
666 fn default() -> Self {
667 Self {
668 name: Default::default(),
669 partitions: Default::default(),
670 unknown_tagged_fields: BTreeMap::new(),
671 }
672 }
673}
674
675impl Message for TxnOffsetCommitRequestTopic {
676 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
677 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
678}
679
680impl HeaderVersion for TxnOffsetCommitRequest {
681 fn header_version(version: i16) -> i16 {
682 if version >= 3 {
683 2
684 } else {
685 1
686 }
687 }
688}