1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetCommitRequest {
24 pub group_id: super::GroupId,
28
29 pub generation_id_or_member_epoch: i32,
33
34 pub member_id: StrBytes,
38
39 pub group_instance_id: Option<StrBytes>,
43
44 pub retention_time_ms: i64,
48
49 pub topics: Vec<OffsetCommitRequestTopic>,
53
54 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl OffsetCommitRequest {
59 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
65 self.group_id = value;
66 self
67 }
68 pub fn with_generation_id_or_member_epoch(mut self, value: i32) -> Self {
74 self.generation_id_or_member_epoch = value;
75 self
76 }
77 pub fn with_member_id(mut self, value: StrBytes) -> Self {
83 self.member_id = value;
84 self
85 }
86 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
92 self.group_instance_id = value;
93 self
94 }
95 pub fn with_retention_time_ms(mut self, value: i64) -> Self {
101 self.retention_time_ms = value;
102 self
103 }
104 pub fn with_topics(mut self, value: Vec<OffsetCommitRequestTopic>) -> Self {
110 self.topics = value;
111 self
112 }
113 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115 self.unknown_tagged_fields = value;
116 self
117 }
118 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120 self.unknown_tagged_fields.insert(key, value);
121 self
122 }
123}
124
125#[cfg(feature = "client")]
126impl Encodable for OffsetCommitRequest {
127 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
128 if version >= 8 {
129 types::CompactString.encode(buf, &self.group_id)?;
130 } else {
131 types::String.encode(buf, &self.group_id)?;
132 }
133 if version >= 1 {
134 types::Int32.encode(buf, &self.generation_id_or_member_epoch)?;
135 }
136 if version >= 1 {
137 if version >= 8 {
138 types::CompactString.encode(buf, &self.member_id)?;
139 } else {
140 types::String.encode(buf, &self.member_id)?;
141 }
142 }
143 if version >= 7 {
144 if version >= 8 {
145 types::CompactString.encode(buf, &self.group_instance_id)?;
146 } else {
147 types::String.encode(buf, &self.group_instance_id)?;
148 }
149 } else {
150 if !self.group_instance_id.is_none() {
151 bail!("A field is set that is not available on the selected protocol version");
152 }
153 }
154 if version >= 2 && version <= 4 {
155 types::Int64.encode(buf, &self.retention_time_ms)?;
156 }
157 if version >= 8 {
158 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
159 } else {
160 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
161 }
162 if version >= 8 {
163 let num_tagged_fields = self.unknown_tagged_fields.len();
164 if num_tagged_fields > std::u32::MAX as usize {
165 bail!(
166 "Too many tagged fields to encode ({} fields)",
167 num_tagged_fields
168 );
169 }
170 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
171
172 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
173 }
174 Ok(())
175 }
176 fn compute_size(&self, version: i16) -> Result<usize> {
177 let mut total_size = 0;
178 if version >= 8 {
179 total_size += types::CompactString.compute_size(&self.group_id)?;
180 } else {
181 total_size += types::String.compute_size(&self.group_id)?;
182 }
183 if version >= 1 {
184 total_size += types::Int32.compute_size(&self.generation_id_or_member_epoch)?;
185 }
186 if version >= 1 {
187 if version >= 8 {
188 total_size += types::CompactString.compute_size(&self.member_id)?;
189 } else {
190 total_size += types::String.compute_size(&self.member_id)?;
191 }
192 }
193 if version >= 7 {
194 if version >= 8 {
195 total_size += types::CompactString.compute_size(&self.group_instance_id)?;
196 } else {
197 total_size += types::String.compute_size(&self.group_instance_id)?;
198 }
199 } else {
200 if !self.group_instance_id.is_none() {
201 bail!("A field is set that is not available on the selected protocol version");
202 }
203 }
204 if version >= 2 && version <= 4 {
205 total_size += types::Int64.compute_size(&self.retention_time_ms)?;
206 }
207 if version >= 8 {
208 total_size +=
209 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
210 } else {
211 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
212 }
213 if version >= 8 {
214 let num_tagged_fields = self.unknown_tagged_fields.len();
215 if num_tagged_fields > std::u32::MAX as usize {
216 bail!(
217 "Too many tagged fields to encode ({} fields)",
218 num_tagged_fields
219 );
220 }
221 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
222
223 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
224 }
225 Ok(total_size)
226 }
227}
228
229#[cfg(feature = "broker")]
230impl Decodable for OffsetCommitRequest {
231 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
232 let group_id = if version >= 8 {
233 types::CompactString.decode(buf)?
234 } else {
235 types::String.decode(buf)?
236 };
237 let generation_id_or_member_epoch = if version >= 1 {
238 types::Int32.decode(buf)?
239 } else {
240 -1
241 };
242 let member_id = if version >= 1 {
243 if version >= 8 {
244 types::CompactString.decode(buf)?
245 } else {
246 types::String.decode(buf)?
247 }
248 } else {
249 Default::default()
250 };
251 let group_instance_id = if version >= 7 {
252 if version >= 8 {
253 types::CompactString.decode(buf)?
254 } else {
255 types::String.decode(buf)?
256 }
257 } else {
258 None
259 };
260 let retention_time_ms = if version >= 2 && version <= 4 {
261 types::Int64.decode(buf)?
262 } else {
263 -1
264 };
265 let topics = if version >= 8 {
266 types::CompactArray(types::Struct { version }).decode(buf)?
267 } else {
268 types::Array(types::Struct { version }).decode(buf)?
269 };
270 let mut unknown_tagged_fields = BTreeMap::new();
271 if version >= 8 {
272 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
273 for _ in 0..num_tagged_fields {
274 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
275 let size: u32 = types::UnsignedVarInt.decode(buf)?;
276 let unknown_value = buf.try_get_bytes(size as usize)?;
277 unknown_tagged_fields.insert(tag as i32, unknown_value);
278 }
279 }
280 Ok(Self {
281 group_id,
282 generation_id_or_member_epoch,
283 member_id,
284 group_instance_id,
285 retention_time_ms,
286 topics,
287 unknown_tagged_fields,
288 })
289 }
290}
291
292impl Default for OffsetCommitRequest {
293 fn default() -> Self {
294 Self {
295 group_id: Default::default(),
296 generation_id_or_member_epoch: -1,
297 member_id: Default::default(),
298 group_instance_id: None,
299 retention_time_ms: -1,
300 topics: Default::default(),
301 unknown_tagged_fields: BTreeMap::new(),
302 }
303 }
304}
305
306impl Message for OffsetCommitRequest {
307 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
308 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
309}
310
311#[non_exhaustive]
313#[derive(Debug, Clone, PartialEq)]
314pub struct OffsetCommitRequestPartition {
315 pub partition_index: i32,
319
320 pub committed_offset: i64,
324
325 pub committed_leader_epoch: i32,
329
330 pub commit_timestamp: i64,
334
335 pub committed_metadata: Option<StrBytes>,
339
340 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
342}
343
344impl OffsetCommitRequestPartition {
345 pub fn with_partition_index(mut self, value: i32) -> Self {
351 self.partition_index = value;
352 self
353 }
354 pub fn with_committed_offset(mut self, value: i64) -> Self {
360 self.committed_offset = value;
361 self
362 }
363 pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
369 self.committed_leader_epoch = value;
370 self
371 }
372 pub fn with_commit_timestamp(mut self, value: i64) -> Self {
378 self.commit_timestamp = value;
379 self
380 }
381 pub fn with_committed_metadata(mut self, value: Option<StrBytes>) -> Self {
387 self.committed_metadata = value;
388 self
389 }
390 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
392 self.unknown_tagged_fields = value;
393 self
394 }
395 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
397 self.unknown_tagged_fields.insert(key, value);
398 self
399 }
400}
401
402#[cfg(feature = "client")]
403impl Encodable for OffsetCommitRequestPartition {
404 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
405 types::Int32.encode(buf, &self.partition_index)?;
406 types::Int64.encode(buf, &self.committed_offset)?;
407 if version >= 6 {
408 types::Int32.encode(buf, &self.committed_leader_epoch)?;
409 }
410 if version == 1 {
411 types::Int64.encode(buf, &self.commit_timestamp)?;
412 } else {
413 if self.commit_timestamp != -1 {
414 bail!("A field is set that is not available on the selected protocol version");
415 }
416 }
417 if version >= 8 {
418 types::CompactString.encode(buf, &self.committed_metadata)?;
419 } else {
420 types::String.encode(buf, &self.committed_metadata)?;
421 }
422 if version >= 8 {
423 let num_tagged_fields = self.unknown_tagged_fields.len();
424 if num_tagged_fields > std::u32::MAX as usize {
425 bail!(
426 "Too many tagged fields to encode ({} fields)",
427 num_tagged_fields
428 );
429 }
430 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
431
432 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
433 }
434 Ok(())
435 }
436 fn compute_size(&self, version: i16) -> Result<usize> {
437 let mut total_size = 0;
438 total_size += types::Int32.compute_size(&self.partition_index)?;
439 total_size += types::Int64.compute_size(&self.committed_offset)?;
440 if version >= 6 {
441 total_size += types::Int32.compute_size(&self.committed_leader_epoch)?;
442 }
443 if version == 1 {
444 total_size += types::Int64.compute_size(&self.commit_timestamp)?;
445 } else {
446 if self.commit_timestamp != -1 {
447 bail!("A field is set that is not available on the selected protocol version");
448 }
449 }
450 if version >= 8 {
451 total_size += types::CompactString.compute_size(&self.committed_metadata)?;
452 } else {
453 total_size += types::String.compute_size(&self.committed_metadata)?;
454 }
455 if version >= 8 {
456 let num_tagged_fields = self.unknown_tagged_fields.len();
457 if num_tagged_fields > std::u32::MAX as usize {
458 bail!(
459 "Too many tagged fields to encode ({} fields)",
460 num_tagged_fields
461 );
462 }
463 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
464
465 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
466 }
467 Ok(total_size)
468 }
469}
470
471#[cfg(feature = "broker")]
472impl Decodable for OffsetCommitRequestPartition {
473 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
474 let partition_index = types::Int32.decode(buf)?;
475 let committed_offset = types::Int64.decode(buf)?;
476 let committed_leader_epoch = if version >= 6 {
477 types::Int32.decode(buf)?
478 } else {
479 -1
480 };
481 let commit_timestamp = if version == 1 {
482 types::Int64.decode(buf)?
483 } else {
484 -1
485 };
486 let committed_metadata = if version >= 8 {
487 types::CompactString.decode(buf)?
488 } else {
489 types::String.decode(buf)?
490 };
491 let mut unknown_tagged_fields = BTreeMap::new();
492 if version >= 8 {
493 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
494 for _ in 0..num_tagged_fields {
495 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
496 let size: u32 = types::UnsignedVarInt.decode(buf)?;
497 let unknown_value = buf.try_get_bytes(size as usize)?;
498 unknown_tagged_fields.insert(tag as i32, unknown_value);
499 }
500 }
501 Ok(Self {
502 partition_index,
503 committed_offset,
504 committed_leader_epoch,
505 commit_timestamp,
506 committed_metadata,
507 unknown_tagged_fields,
508 })
509 }
510}
511
512impl Default for OffsetCommitRequestPartition {
513 fn default() -> Self {
514 Self {
515 partition_index: 0,
516 committed_offset: 0,
517 committed_leader_epoch: -1,
518 commit_timestamp: -1,
519 committed_metadata: Some(Default::default()),
520 unknown_tagged_fields: BTreeMap::new(),
521 }
522 }
523}
524
525impl Message for OffsetCommitRequestPartition {
526 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
527 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
528}
529
530#[non_exhaustive]
532#[derive(Debug, Clone, PartialEq)]
533pub struct OffsetCommitRequestTopic {
534 pub name: super::TopicName,
538
539 pub partitions: Vec<OffsetCommitRequestPartition>,
543
544 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
546}
547
548impl OffsetCommitRequestTopic {
549 pub fn with_name(mut self, value: super::TopicName) -> Self {
555 self.name = value;
556 self
557 }
558 pub fn with_partitions(mut self, value: Vec<OffsetCommitRequestPartition>) -> Self {
564 self.partitions = value;
565 self
566 }
567 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
569 self.unknown_tagged_fields = value;
570 self
571 }
572 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
574 self.unknown_tagged_fields.insert(key, value);
575 self
576 }
577}
578
579#[cfg(feature = "client")]
580impl Encodable for OffsetCommitRequestTopic {
581 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
582 if version >= 8 {
583 types::CompactString.encode(buf, &self.name)?;
584 } else {
585 types::String.encode(buf, &self.name)?;
586 }
587 if version >= 8 {
588 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
589 } else {
590 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
591 }
592 if version >= 8 {
593 let num_tagged_fields = self.unknown_tagged_fields.len();
594 if num_tagged_fields > std::u32::MAX as usize {
595 bail!(
596 "Too many tagged fields to encode ({} fields)",
597 num_tagged_fields
598 );
599 }
600 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
601
602 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
603 }
604 Ok(())
605 }
606 fn compute_size(&self, version: i16) -> Result<usize> {
607 let mut total_size = 0;
608 if version >= 8 {
609 total_size += types::CompactString.compute_size(&self.name)?;
610 } else {
611 total_size += types::String.compute_size(&self.name)?;
612 }
613 if version >= 8 {
614 total_size +=
615 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
616 } else {
617 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
618 }
619 if version >= 8 {
620 let num_tagged_fields = self.unknown_tagged_fields.len();
621 if num_tagged_fields > std::u32::MAX as usize {
622 bail!(
623 "Too many tagged fields to encode ({} fields)",
624 num_tagged_fields
625 );
626 }
627 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
628
629 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
630 }
631 Ok(total_size)
632 }
633}
634
635#[cfg(feature = "broker")]
636impl Decodable for OffsetCommitRequestTopic {
637 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
638 let name = if version >= 8 {
639 types::CompactString.decode(buf)?
640 } else {
641 types::String.decode(buf)?
642 };
643 let partitions = if version >= 8 {
644 types::CompactArray(types::Struct { version }).decode(buf)?
645 } else {
646 types::Array(types::Struct { version }).decode(buf)?
647 };
648 let mut unknown_tagged_fields = BTreeMap::new();
649 if version >= 8 {
650 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
651 for _ in 0..num_tagged_fields {
652 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
653 let size: u32 = types::UnsignedVarInt.decode(buf)?;
654 let unknown_value = buf.try_get_bytes(size as usize)?;
655 unknown_tagged_fields.insert(tag as i32, unknown_value);
656 }
657 }
658 Ok(Self {
659 name,
660 partitions,
661 unknown_tagged_fields,
662 })
663 }
664}
665
666impl Default for OffsetCommitRequestTopic {
667 fn default() -> Self {
668 Self {
669 name: Default::default(),
670 partitions: Default::default(),
671 unknown_tagged_fields: BTreeMap::new(),
672 }
673 }
674}
675
676impl Message for OffsetCommitRequestTopic {
677 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
678 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
679}
680
681impl HeaderVersion for OffsetCommitRequest {
682 fn header_version(version: i16) -> i16 {
683 if version >= 8 {
684 2
685 } else {
686 1
687 }
688 }
689}