1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetCommitRequest {
24 pub group_id: super::GroupId,
28
29 pub generation_id_or_member_epoch: i32,
33
34 pub member_id: StrBytes,
38
39 pub group_instance_id: Option<StrBytes>,
43
44 pub retention_time_ms: i64,
48
49 pub topics: Vec<OffsetCommitRequestTopic>,
53
54 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl OffsetCommitRequest {
59 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
65 self.group_id = value;
66 self
67 }
68 pub fn with_generation_id_or_member_epoch(mut self, value: i32) -> Self {
74 self.generation_id_or_member_epoch = value;
75 self
76 }
77 pub fn with_member_id(mut self, value: StrBytes) -> Self {
83 self.member_id = value;
84 self
85 }
86 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
92 self.group_instance_id = value;
93 self
94 }
95 pub fn with_retention_time_ms(mut self, value: i64) -> Self {
101 self.retention_time_ms = value;
102 self
103 }
104 pub fn with_topics(mut self, value: Vec<OffsetCommitRequestTopic>) -> Self {
110 self.topics = value;
111 self
112 }
113 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115 self.unknown_tagged_fields = value;
116 self
117 }
118 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120 self.unknown_tagged_fields.insert(key, value);
121 self
122 }
123}
124
125#[cfg(feature = "client")]
126impl Encodable for OffsetCommitRequest {
127 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
128 if version < 0 || version > 9 {
129 bail!("specified version not supported by this message type");
130 }
131 if version >= 8 {
132 types::CompactString.encode(buf, &self.group_id)?;
133 } else {
134 types::String.encode(buf, &self.group_id)?;
135 }
136 if version >= 1 {
137 types::Int32.encode(buf, &self.generation_id_or_member_epoch)?;
138 }
139 if version >= 1 {
140 if version >= 8 {
141 types::CompactString.encode(buf, &self.member_id)?;
142 } else {
143 types::String.encode(buf, &self.member_id)?;
144 }
145 }
146 if version >= 7 {
147 if version >= 8 {
148 types::CompactString.encode(buf, &self.group_instance_id)?;
149 } else {
150 types::String.encode(buf, &self.group_instance_id)?;
151 }
152 } else {
153 if !self.group_instance_id.is_none() {
154 bail!("A field is set that is not available on the selected protocol version");
155 }
156 }
157 if version >= 2 && version <= 4 {
158 types::Int64.encode(buf, &self.retention_time_ms)?;
159 }
160 if version >= 8 {
161 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
162 } else {
163 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
164 }
165 if version >= 8 {
166 let num_tagged_fields = self.unknown_tagged_fields.len();
167 if num_tagged_fields > std::u32::MAX as usize {
168 bail!(
169 "Too many tagged fields to encode ({} fields)",
170 num_tagged_fields
171 );
172 }
173 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
174
175 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
176 }
177 Ok(())
178 }
179 fn compute_size(&self, version: i16) -> Result<usize> {
180 let mut total_size = 0;
181 if version >= 8 {
182 total_size += types::CompactString.compute_size(&self.group_id)?;
183 } else {
184 total_size += types::String.compute_size(&self.group_id)?;
185 }
186 if version >= 1 {
187 total_size += types::Int32.compute_size(&self.generation_id_or_member_epoch)?;
188 }
189 if version >= 1 {
190 if version >= 8 {
191 total_size += types::CompactString.compute_size(&self.member_id)?;
192 } else {
193 total_size += types::String.compute_size(&self.member_id)?;
194 }
195 }
196 if version >= 7 {
197 if version >= 8 {
198 total_size += types::CompactString.compute_size(&self.group_instance_id)?;
199 } else {
200 total_size += types::String.compute_size(&self.group_instance_id)?;
201 }
202 } else {
203 if !self.group_instance_id.is_none() {
204 bail!("A field is set that is not available on the selected protocol version");
205 }
206 }
207 if version >= 2 && version <= 4 {
208 total_size += types::Int64.compute_size(&self.retention_time_ms)?;
209 }
210 if version >= 8 {
211 total_size +=
212 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
213 } else {
214 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
215 }
216 if version >= 8 {
217 let num_tagged_fields = self.unknown_tagged_fields.len();
218 if num_tagged_fields > std::u32::MAX as usize {
219 bail!(
220 "Too many tagged fields to encode ({} fields)",
221 num_tagged_fields
222 );
223 }
224 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
225
226 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
227 }
228 Ok(total_size)
229 }
230}
231
232#[cfg(feature = "broker")]
233impl Decodable for OffsetCommitRequest {
234 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
235 if version < 0 || version > 9 {
236 bail!("specified version not supported by this message type");
237 }
238 let group_id = if version >= 8 {
239 types::CompactString.decode(buf)?
240 } else {
241 types::String.decode(buf)?
242 };
243 let generation_id_or_member_epoch = if version >= 1 {
244 types::Int32.decode(buf)?
245 } else {
246 -1
247 };
248 let member_id = if version >= 1 {
249 if version >= 8 {
250 types::CompactString.decode(buf)?
251 } else {
252 types::String.decode(buf)?
253 }
254 } else {
255 Default::default()
256 };
257 let group_instance_id = if version >= 7 {
258 if version >= 8 {
259 types::CompactString.decode(buf)?
260 } else {
261 types::String.decode(buf)?
262 }
263 } else {
264 None
265 };
266 let retention_time_ms = if version >= 2 && version <= 4 {
267 types::Int64.decode(buf)?
268 } else {
269 -1
270 };
271 let topics = if version >= 8 {
272 types::CompactArray(types::Struct { version }).decode(buf)?
273 } else {
274 types::Array(types::Struct { version }).decode(buf)?
275 };
276 let mut unknown_tagged_fields = BTreeMap::new();
277 if version >= 8 {
278 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
279 for _ in 0..num_tagged_fields {
280 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
281 let size: u32 = types::UnsignedVarInt.decode(buf)?;
282 let unknown_value = buf.try_get_bytes(size as usize)?;
283 unknown_tagged_fields.insert(tag as i32, unknown_value);
284 }
285 }
286 Ok(Self {
287 group_id,
288 generation_id_or_member_epoch,
289 member_id,
290 group_instance_id,
291 retention_time_ms,
292 topics,
293 unknown_tagged_fields,
294 })
295 }
296}
297
298impl Default for OffsetCommitRequest {
299 fn default() -> Self {
300 Self {
301 group_id: Default::default(),
302 generation_id_or_member_epoch: -1,
303 member_id: Default::default(),
304 group_instance_id: None,
305 retention_time_ms: -1,
306 topics: Default::default(),
307 unknown_tagged_fields: BTreeMap::new(),
308 }
309 }
310}
311
312impl Message for OffsetCommitRequest {
313 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
314 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
315}
316
317#[non_exhaustive]
319#[derive(Debug, Clone, PartialEq)]
320pub struct OffsetCommitRequestPartition {
321 pub partition_index: i32,
325
326 pub committed_offset: i64,
330
331 pub committed_leader_epoch: i32,
335
336 pub commit_timestamp: i64,
340
341 pub committed_metadata: Option<StrBytes>,
345
346 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
348}
349
350impl OffsetCommitRequestPartition {
351 pub fn with_partition_index(mut self, value: i32) -> Self {
357 self.partition_index = value;
358 self
359 }
360 pub fn with_committed_offset(mut self, value: i64) -> Self {
366 self.committed_offset = value;
367 self
368 }
369 pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
375 self.committed_leader_epoch = value;
376 self
377 }
378 pub fn with_commit_timestamp(mut self, value: i64) -> Self {
384 self.commit_timestamp = value;
385 self
386 }
387 pub fn with_committed_metadata(mut self, value: Option<StrBytes>) -> Self {
393 self.committed_metadata = value;
394 self
395 }
396 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
398 self.unknown_tagged_fields = value;
399 self
400 }
401 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
403 self.unknown_tagged_fields.insert(key, value);
404 self
405 }
406}
407
408#[cfg(feature = "client")]
409impl Encodable for OffsetCommitRequestPartition {
410 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
411 if version < 0 || version > 9 {
412 bail!("specified version not supported by this message type");
413 }
414 types::Int32.encode(buf, &self.partition_index)?;
415 types::Int64.encode(buf, &self.committed_offset)?;
416 if version >= 6 {
417 types::Int32.encode(buf, &self.committed_leader_epoch)?;
418 }
419 if version == 1 {
420 types::Int64.encode(buf, &self.commit_timestamp)?;
421 } else {
422 if self.commit_timestamp != -1 {
423 bail!("A field is set that is not available on the selected protocol version");
424 }
425 }
426 if version >= 8 {
427 types::CompactString.encode(buf, &self.committed_metadata)?;
428 } else {
429 types::String.encode(buf, &self.committed_metadata)?;
430 }
431 if version >= 8 {
432 let num_tagged_fields = self.unknown_tagged_fields.len();
433 if num_tagged_fields > std::u32::MAX as usize {
434 bail!(
435 "Too many tagged fields to encode ({} fields)",
436 num_tagged_fields
437 );
438 }
439 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
440
441 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
442 }
443 Ok(())
444 }
445 fn compute_size(&self, version: i16) -> Result<usize> {
446 let mut total_size = 0;
447 total_size += types::Int32.compute_size(&self.partition_index)?;
448 total_size += types::Int64.compute_size(&self.committed_offset)?;
449 if version >= 6 {
450 total_size += types::Int32.compute_size(&self.committed_leader_epoch)?;
451 }
452 if version == 1 {
453 total_size += types::Int64.compute_size(&self.commit_timestamp)?;
454 } else {
455 if self.commit_timestamp != -1 {
456 bail!("A field is set that is not available on the selected protocol version");
457 }
458 }
459 if version >= 8 {
460 total_size += types::CompactString.compute_size(&self.committed_metadata)?;
461 } else {
462 total_size += types::String.compute_size(&self.committed_metadata)?;
463 }
464 if version >= 8 {
465 let num_tagged_fields = self.unknown_tagged_fields.len();
466 if num_tagged_fields > std::u32::MAX as usize {
467 bail!(
468 "Too many tagged fields to encode ({} fields)",
469 num_tagged_fields
470 );
471 }
472 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
473
474 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
475 }
476 Ok(total_size)
477 }
478}
479
480#[cfg(feature = "broker")]
481impl Decodable for OffsetCommitRequestPartition {
482 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
483 if version < 0 || version > 9 {
484 bail!("specified version not supported by this message type");
485 }
486 let partition_index = types::Int32.decode(buf)?;
487 let committed_offset = types::Int64.decode(buf)?;
488 let committed_leader_epoch = if version >= 6 {
489 types::Int32.decode(buf)?
490 } else {
491 -1
492 };
493 let commit_timestamp = if version == 1 {
494 types::Int64.decode(buf)?
495 } else {
496 -1
497 };
498 let committed_metadata = if version >= 8 {
499 types::CompactString.decode(buf)?
500 } else {
501 types::String.decode(buf)?
502 };
503 let mut unknown_tagged_fields = BTreeMap::new();
504 if version >= 8 {
505 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
506 for _ in 0..num_tagged_fields {
507 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
508 let size: u32 = types::UnsignedVarInt.decode(buf)?;
509 let unknown_value = buf.try_get_bytes(size as usize)?;
510 unknown_tagged_fields.insert(tag as i32, unknown_value);
511 }
512 }
513 Ok(Self {
514 partition_index,
515 committed_offset,
516 committed_leader_epoch,
517 commit_timestamp,
518 committed_metadata,
519 unknown_tagged_fields,
520 })
521 }
522}
523
524impl Default for OffsetCommitRequestPartition {
525 fn default() -> Self {
526 Self {
527 partition_index: 0,
528 committed_offset: 0,
529 committed_leader_epoch: -1,
530 commit_timestamp: -1,
531 committed_metadata: Some(Default::default()),
532 unknown_tagged_fields: BTreeMap::new(),
533 }
534 }
535}
536
537impl Message for OffsetCommitRequestPartition {
538 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
539 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
540}
541
542#[non_exhaustive]
544#[derive(Debug, Clone, PartialEq)]
545pub struct OffsetCommitRequestTopic {
546 pub name: super::TopicName,
550
551 pub partitions: Vec<OffsetCommitRequestPartition>,
555
556 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
558}
559
560impl OffsetCommitRequestTopic {
561 pub fn with_name(mut self, value: super::TopicName) -> Self {
567 self.name = value;
568 self
569 }
570 pub fn with_partitions(mut self, value: Vec<OffsetCommitRequestPartition>) -> Self {
576 self.partitions = value;
577 self
578 }
579 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
581 self.unknown_tagged_fields = value;
582 self
583 }
584 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
586 self.unknown_tagged_fields.insert(key, value);
587 self
588 }
589}
590
591#[cfg(feature = "client")]
592impl Encodable for OffsetCommitRequestTopic {
593 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
594 if version < 0 || version > 9 {
595 bail!("specified version not supported by this message type");
596 }
597 if version >= 8 {
598 types::CompactString.encode(buf, &self.name)?;
599 } else {
600 types::String.encode(buf, &self.name)?;
601 }
602 if version >= 8 {
603 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
604 } else {
605 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
606 }
607 if version >= 8 {
608 let num_tagged_fields = self.unknown_tagged_fields.len();
609 if num_tagged_fields > std::u32::MAX as usize {
610 bail!(
611 "Too many tagged fields to encode ({} fields)",
612 num_tagged_fields
613 );
614 }
615 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
616
617 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
618 }
619 Ok(())
620 }
621 fn compute_size(&self, version: i16) -> Result<usize> {
622 let mut total_size = 0;
623 if version >= 8 {
624 total_size += types::CompactString.compute_size(&self.name)?;
625 } else {
626 total_size += types::String.compute_size(&self.name)?;
627 }
628 if version >= 8 {
629 total_size +=
630 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
631 } else {
632 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
633 }
634 if version >= 8 {
635 let num_tagged_fields = self.unknown_tagged_fields.len();
636 if num_tagged_fields > std::u32::MAX as usize {
637 bail!(
638 "Too many tagged fields to encode ({} fields)",
639 num_tagged_fields
640 );
641 }
642 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
643
644 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
645 }
646 Ok(total_size)
647 }
648}
649
650#[cfg(feature = "broker")]
651impl Decodable for OffsetCommitRequestTopic {
652 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
653 if version < 0 || version > 9 {
654 bail!("specified version not supported by this message type");
655 }
656 let name = if version >= 8 {
657 types::CompactString.decode(buf)?
658 } else {
659 types::String.decode(buf)?
660 };
661 let partitions = if version >= 8 {
662 types::CompactArray(types::Struct { version }).decode(buf)?
663 } else {
664 types::Array(types::Struct { version }).decode(buf)?
665 };
666 let mut unknown_tagged_fields = BTreeMap::new();
667 if version >= 8 {
668 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
669 for _ in 0..num_tagged_fields {
670 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
671 let size: u32 = types::UnsignedVarInt.decode(buf)?;
672 let unknown_value = buf.try_get_bytes(size as usize)?;
673 unknown_tagged_fields.insert(tag as i32, unknown_value);
674 }
675 }
676 Ok(Self {
677 name,
678 partitions,
679 unknown_tagged_fields,
680 })
681 }
682}
683
684impl Default for OffsetCommitRequestTopic {
685 fn default() -> Self {
686 Self {
687 name: Default::default(),
688 partitions: Default::default(),
689 unknown_tagged_fields: BTreeMap::new(),
690 }
691 }
692}
693
694impl Message for OffsetCommitRequestTopic {
695 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
696 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
697}
698
699impl HeaderVersion for OffsetCommitRequest {
700 fn header_version(version: i16) -> i16 {
701 if version >= 8 {
702 2
703 } else {
704 1
705 }
706 }
707}