1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetCommitRequest {
24 pub group_id: super::GroupId,
28
29 pub generation_id_or_member_epoch: i32,
33
34 pub member_id: StrBytes,
38
39 pub group_instance_id: Option<StrBytes>,
43
44 pub retention_time_ms: i64,
48
49 pub topics: Vec<OffsetCommitRequestTopic>,
53
54 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl OffsetCommitRequest {
59 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
65 self.group_id = value;
66 self
67 }
68 pub fn with_generation_id_or_member_epoch(mut self, value: i32) -> Self {
74 self.generation_id_or_member_epoch = value;
75 self
76 }
77 pub fn with_member_id(mut self, value: StrBytes) -> Self {
83 self.member_id = value;
84 self
85 }
86 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
92 self.group_instance_id = value;
93 self
94 }
95 pub fn with_retention_time_ms(mut self, value: i64) -> Self {
101 self.retention_time_ms = value;
102 self
103 }
104 pub fn with_topics(mut self, value: Vec<OffsetCommitRequestTopic>) -> Self {
110 self.topics = value;
111 self
112 }
113 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115 self.unknown_tagged_fields = value;
116 self
117 }
118 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120 self.unknown_tagged_fields.insert(key, value);
121 self
122 }
123}
124
/// Serialization of [`OffsetCommitRequest`]; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for OffsetCommitRequest {
    /// Writes the request into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=9`, when `group_instance_id` is
    /// `Some` but `version` is below 7, when the number of unknown tagged
    /// fields does not fit in a `u32`, or when any field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 8 on, strings and arrays use their compact encodings
        // and a tagged-field section is appended.
        let flexible = version >= 8;

        if flexible {
            types::CompactString.encode(buf, &self.group_id)?;
        } else {
            types::String.encode(buf, &self.group_id)?;
        }
        types::Int32.encode(buf, &self.generation_id_or_member_epoch)?;
        if flexible {
            types::CompactString.encode(buf, &self.member_id)?;
        } else {
            types::String.encode(buf, &self.member_id)?;
        }

        // `group_instance_id` only exists on the wire from version 7 on;
        // refuse to silently drop a value the caller set.
        if version >= 7 {
            if flexible {
                types::CompactString.encode(buf, &self.group_instance_id)?;
            } else {
                types::String.encode(buf, &self.group_instance_id)?;
            }
        } else if self.group_instance_id.is_some() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // `retention_time_ms` is only written up to version 4; later versions skip it.
        if version <= 4 {
            types::Int64.encode(buf, &self.retention_time_ms)?;
        }

        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }

        if flexible {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            // The count is written as an unsigned 32-bit varint, so it must fit.
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `2..=9`.
    ///
    /// # Errors
    ///
    /// Fails when `group_instance_id` is `Some` but `version` is below 7, when
    /// the number of unknown tagged fields does not fit in a `u32`, or when
    /// any field size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 8;
        let mut total_size = 0;

        total_size += if flexible {
            types::CompactString.compute_size(&self.group_id)?
        } else {
            types::String.compute_size(&self.group_id)?
        };
        total_size += types::Int32.compute_size(&self.generation_id_or_member_epoch)?;
        total_size += if flexible {
            types::CompactString.compute_size(&self.member_id)?
        } else {
            types::String.compute_size(&self.member_id)?
        };

        if version >= 7 {
            total_size += if flexible {
                types::CompactString.compute_size(&self.group_instance_id)?
            } else {
                types::String.compute_size(&self.group_instance_id)?
            };
        } else if self.group_instance_id.is_some() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version <= 4 {
            total_size += types::Int64.compute_size(&self.retention_time_ms)?;
        }

        total_size += if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.topics)?
        };

        if flexible {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
223
/// Deserialization of [`OffsetCommitRequest`]; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for OffsetCommitRequest {
    /// Reads a request laid out according to `version` from `buf`.
    ///
    /// Fields that are not on the wire for `version` receive fixed values:
    /// `group_instance_id` is `None` below version 7 and `retention_time_ms`
    /// is `-1` above version 4.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=9` or when any field decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Compact encodings and the tagged-field section start at version 8.
        let flexible = version >= 8;

        let group_id = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let generation_id_or_member_epoch = types::Int32.decode(buf)?;
        let member_id = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let group_instance_id = if version < 7 {
            None
        } else if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let retention_time_ms = if version > 4 {
            -1
        } else {
            types::Int64.decode(buf)?
        };
        let topics = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        // Every tagged field is kept verbatim; none is interpreted here.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            group_id,
            generation_id_or_member_epoch,
            member_id,
            group_instance_id,
            retention_time_ms,
            topics,
            unknown_tagged_fields,
        })
    }
}
281
282impl Default for OffsetCommitRequest {
283 fn default() -> Self {
284 Self {
285 group_id: Default::default(),
286 generation_id_or_member_epoch: -1,
287 member_id: Default::default(),
288 group_instance_id: None,
289 retention_time_ms: -1,
290 topics: Default::default(),
291 unknown_tagged_fields: BTreeMap::new(),
292 }
293 }
294}
295
296impl Message for OffsetCommitRequest {
297 const VERSIONS: VersionRange = VersionRange { min: 2, max: 9 };
298 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
299}
300
301#[non_exhaustive]
303#[derive(Debug, Clone, PartialEq)]
304pub struct OffsetCommitRequestPartition {
305 pub partition_index: i32,
309
310 pub committed_offset: i64,
314
315 pub committed_leader_epoch: i32,
319
320 pub committed_metadata: Option<StrBytes>,
324
325 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
327}
328
329impl OffsetCommitRequestPartition {
330 pub fn with_partition_index(mut self, value: i32) -> Self {
336 self.partition_index = value;
337 self
338 }
339 pub fn with_committed_offset(mut self, value: i64) -> Self {
345 self.committed_offset = value;
346 self
347 }
348 pub fn with_committed_leader_epoch(mut self, value: i32) -> Self {
354 self.committed_leader_epoch = value;
355 self
356 }
357 pub fn with_committed_metadata(mut self, value: Option<StrBytes>) -> Self {
363 self.committed_metadata = value;
364 self
365 }
366 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
368 self.unknown_tagged_fields = value;
369 self
370 }
371 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
373 self.unknown_tagged_fields.insert(key, value);
374 self
375 }
376}
377
/// Serialization of [`OffsetCommitRequestPartition`]; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for OffsetCommitRequestPartition {
    /// Writes the partition entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=9`, when the number of unknown
    /// tagged fields does not fit in a `u32`, or when any field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Compact encodings and the tagged-field section start at version 8.
        let flexible = version >= 8;

        types::Int32.encode(buf, &self.partition_index)?;
        types::Int64.encode(buf, &self.committed_offset)?;
        // The leader epoch is only on the wire from version 6 on.
        if version >= 6 {
            types::Int32.encode(buf, &self.committed_leader_epoch)?;
        }
        if flexible {
            types::CompactString.encode(buf, &self.committed_metadata)?;
        } else {
            types::String.encode(buf, &self.committed_metadata)?;
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            // The count is written as an unsigned 32-bit varint, so it must fit.
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `2..=9`.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields does not fit in a `u32`
    /// or when any field size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 8;
        let mut size = 0;

        size += types::Int32.compute_size(&self.partition_index)?;
        size += types::Int64.compute_size(&self.committed_offset)?;
        if version >= 6 {
            size += types::Int32.compute_size(&self.committed_leader_epoch)?;
        }
        size += if flexible {
            types::CompactString.compute_size(&self.committed_metadata)?
        } else {
            types::String.compute_size(&self.committed_metadata)?
        };

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
435
/// Deserialization of [`OffsetCommitRequestPartition`]; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for OffsetCommitRequestPartition {
    /// Reads a partition entry laid out according to `version` from `buf`.
    ///
    /// Below version 6 the leader epoch is not on the wire and is set to `-1`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=9` or when any field decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Compact encodings and the tagged-field section start at version 8.
        let flexible = version >= 8;

        let partition_index = types::Int32.decode(buf)?;
        let committed_offset = types::Int64.decode(buf)?;
        let committed_leader_epoch = if version < 6 {
            -1
        } else {
            types::Int32.decode(buf)?
        };
        let committed_metadata = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        // Every tagged field is kept verbatim; none is interpreted here.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            partition_index,
            committed_offset,
            committed_leader_epoch,
            committed_metadata,
            unknown_tagged_fields,
        })
    }
}
473
474impl Default for OffsetCommitRequestPartition {
475 fn default() -> Self {
476 Self {
477 partition_index: 0,
478 committed_offset: 0,
479 committed_leader_epoch: -1,
480 committed_metadata: Some(Default::default()),
481 unknown_tagged_fields: BTreeMap::new(),
482 }
483 }
484}
485
486impl Message for OffsetCommitRequestPartition {
487 const VERSIONS: VersionRange = VersionRange { min: 2, max: 9 };
488 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
489}
490
491#[non_exhaustive]
493#[derive(Debug, Clone, PartialEq)]
494pub struct OffsetCommitRequestTopic {
495 pub name: super::TopicName,
499
500 pub topic_id: Uuid,
504
505 pub partitions: Vec<OffsetCommitRequestPartition>,
509
510 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
512}
513
514impl OffsetCommitRequestTopic {
515 pub fn with_name(mut self, value: super::TopicName) -> Self {
521 self.name = value;
522 self
523 }
524 pub fn with_topic_id(mut self, value: Uuid) -> Self {
530 self.topic_id = value;
531 self
532 }
533 pub fn with_partitions(mut self, value: Vec<OffsetCommitRequestPartition>) -> Self {
539 self.partitions = value;
540 self
541 }
542 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
544 self.unknown_tagged_fields = value;
545 self
546 }
547 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
549 self.unknown_tagged_fields.insert(key, value);
550 self
551 }
552}
553
/// Serialization of [`OffsetCommitRequestTopic`]; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for OffsetCommitRequestTopic {
    /// Writes the topic entry into `buf` using the layout of `version`.
    ///
    /// `topic_id` is not written by any supported version.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=9`, when the number of unknown
    /// tagged fields does not fit in a `u32`, or when any field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Compact encodings and the tagged-field section start at version 8.
        let flexible = version >= 8;

        if flexible {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else {
            types::String.encode(buf, &self.name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            // The count is written as an unsigned 32-bit varint, so it must fit.
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `2..=9`.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields does not fit in a `u32`
    /// or when any field size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 8;
        let mut size = 0;

        size += if flexible {
            types::CompactString.compute_size(&self.name)?
        } else {
            types::String.compute_size(&self.name)?
        };
        size += if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.partitions)?
        };

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
612
/// Deserialization of [`OffsetCommitRequestTopic`]; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for OffsetCommitRequestTopic {
    /// Reads a topic entry laid out according to `version` from `buf`.
    ///
    /// `topic_id` is never read from the buffer; it is always the nil UUID.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=9` or when any field decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Compact encodings and the tagged-field section start at version 8.
        let flexible = version >= 8;

        let name = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let partitions = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        // Every tagged field is kept verbatim; none is interpreted here.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            topic_id: Uuid::nil(),
            partitions,
            unknown_tagged_fields,
        })
    }
}
648
649impl Default for OffsetCommitRequestTopic {
650 fn default() -> Self {
651 Self {
652 name: Default::default(),
653 topic_id: Uuid::nil(),
654 partitions: Default::default(),
655 unknown_tagged_fields: BTreeMap::new(),
656 }
657 }
658}
659
660impl Message for OffsetCommitRequestTopic {
661 const VERSIONS: VersionRange = VersionRange { min: 2, max: 9 };
662 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
663}
664
665impl HeaderVersion for OffsetCommitRequest {
666 fn header_version(version: i16) -> i16 {
667 if version >= 8 {
668 2
669 } else {
670 1
671 }
672 }
673}