1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct DescribeGroupsResponse {
24 pub throttle_time_ms: i32,
28
29 pub groups: Vec<DescribedGroup>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl DescribeGroupsResponse {
39 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
45 self.throttle_time_ms = value;
46 self
47 }
48 pub fn with_groups(mut self, value: Vec<DescribedGroup>) -> Self {
54 self.groups = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "broker")]
70impl Encodable for DescribeGroupsResponse {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 if version < 0 || version > 6 {
73 bail!("specified version not supported by this message type");
74 }
75 if version >= 1 {
76 types::Int32.encode(buf, &self.throttle_time_ms)?;
77 }
78 if version >= 5 {
79 types::CompactArray(types::Struct { version }).encode(buf, &self.groups)?;
80 } else {
81 types::Array(types::Struct { version }).encode(buf, &self.groups)?;
82 }
83 if version >= 5 {
84 let num_tagged_fields = self.unknown_tagged_fields.len();
85 if num_tagged_fields > std::u32::MAX as usize {
86 bail!(
87 "Too many tagged fields to encode ({} fields)",
88 num_tagged_fields
89 );
90 }
91 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
92
93 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
94 }
95 Ok(())
96 }
97 fn compute_size(&self, version: i16) -> Result<usize> {
98 let mut total_size = 0;
99 if version >= 1 {
100 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
101 }
102 if version >= 5 {
103 total_size +=
104 types::CompactArray(types::Struct { version }).compute_size(&self.groups)?;
105 } else {
106 total_size += types::Array(types::Struct { version }).compute_size(&self.groups)?;
107 }
108 if version >= 5 {
109 let num_tagged_fields = self.unknown_tagged_fields.len();
110 if num_tagged_fields > std::u32::MAX as usize {
111 bail!(
112 "Too many tagged fields to encode ({} fields)",
113 num_tagged_fields
114 );
115 }
116 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
117
118 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
119 }
120 Ok(total_size)
121 }
122}
123
124#[cfg(feature = "client")]
125impl Decodable for DescribeGroupsResponse {
126 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
127 if version < 0 || version > 6 {
128 bail!("specified version not supported by this message type");
129 }
130 let throttle_time_ms = if version >= 1 {
131 types::Int32.decode(buf)?
132 } else {
133 0
134 };
135 let groups = if version >= 5 {
136 types::CompactArray(types::Struct { version }).decode(buf)?
137 } else {
138 types::Array(types::Struct { version }).decode(buf)?
139 };
140 let mut unknown_tagged_fields = BTreeMap::new();
141 if version >= 5 {
142 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
143 for _ in 0..num_tagged_fields {
144 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
145 let size: u32 = types::UnsignedVarInt.decode(buf)?;
146 let unknown_value = buf.try_get_bytes(size as usize)?;
147 unknown_tagged_fields.insert(tag as i32, unknown_value);
148 }
149 }
150 Ok(Self {
151 throttle_time_ms,
152 groups,
153 unknown_tagged_fields,
154 })
155 }
156}
157
158impl Default for DescribeGroupsResponse {
159 fn default() -> Self {
160 Self {
161 throttle_time_ms: 0,
162 groups: Default::default(),
163 unknown_tagged_fields: BTreeMap::new(),
164 }
165 }
166}
167
168impl Message for DescribeGroupsResponse {
169 const VERSIONS: VersionRange = VersionRange { min: 0, max: 6 };
170 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
171}
172
173#[non_exhaustive]
175#[derive(Debug, Clone, PartialEq)]
176pub struct DescribedGroup {
177 pub error_code: i16,
181
182 pub error_message: Option<StrBytes>,
186
187 pub group_id: super::GroupId,
191
192 pub group_state: StrBytes,
196
197 pub protocol_type: StrBytes,
201
202 pub protocol_data: StrBytes,
206
207 pub members: Vec<DescribedGroupMember>,
211
212 pub authorized_operations: i32,
216
217 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
219}
220
221impl DescribedGroup {
222 pub fn with_error_code(mut self, value: i16) -> Self {
228 self.error_code = value;
229 self
230 }
231 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
237 self.error_message = value;
238 self
239 }
240 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
246 self.group_id = value;
247 self
248 }
249 pub fn with_group_state(mut self, value: StrBytes) -> Self {
255 self.group_state = value;
256 self
257 }
258 pub fn with_protocol_type(mut self, value: StrBytes) -> Self {
264 self.protocol_type = value;
265 self
266 }
267 pub fn with_protocol_data(mut self, value: StrBytes) -> Self {
273 self.protocol_data = value;
274 self
275 }
276 pub fn with_members(mut self, value: Vec<DescribedGroupMember>) -> Self {
282 self.members = value;
283 self
284 }
285 pub fn with_authorized_operations(mut self, value: i32) -> Self {
291 self.authorized_operations = value;
292 self
293 }
294 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
296 self.unknown_tagged_fields = value;
297 self
298 }
299 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
301 self.unknown_tagged_fields.insert(key, value);
302 self
303 }
304}
305
306#[cfg(feature = "broker")]
307impl Encodable for DescribedGroup {
308 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
309 if version < 0 || version > 6 {
310 bail!("specified version not supported by this message type");
311 }
312 types::Int16.encode(buf, &self.error_code)?;
313 if version >= 6 {
314 types::CompactString.encode(buf, &self.error_message)?;
315 } else {
316 if !self.error_message.is_none() {
317 bail!("A field is set that is not available on the selected protocol version");
318 }
319 }
320 if version >= 5 {
321 types::CompactString.encode(buf, &self.group_id)?;
322 } else {
323 types::String.encode(buf, &self.group_id)?;
324 }
325 if version >= 5 {
326 types::CompactString.encode(buf, &self.group_state)?;
327 } else {
328 types::String.encode(buf, &self.group_state)?;
329 }
330 if version >= 5 {
331 types::CompactString.encode(buf, &self.protocol_type)?;
332 } else {
333 types::String.encode(buf, &self.protocol_type)?;
334 }
335 if version >= 5 {
336 types::CompactString.encode(buf, &self.protocol_data)?;
337 } else {
338 types::String.encode(buf, &self.protocol_data)?;
339 }
340 if version >= 5 {
341 types::CompactArray(types::Struct { version }).encode(buf, &self.members)?;
342 } else {
343 types::Array(types::Struct { version }).encode(buf, &self.members)?;
344 }
345 if version >= 3 {
346 types::Int32.encode(buf, &self.authorized_operations)?;
347 } else {
348 if self.authorized_operations != -2147483648 {
349 bail!("A field is set that is not available on the selected protocol version");
350 }
351 }
352 if version >= 5 {
353 let num_tagged_fields = self.unknown_tagged_fields.len();
354 if num_tagged_fields > std::u32::MAX as usize {
355 bail!(
356 "Too many tagged fields to encode ({} fields)",
357 num_tagged_fields
358 );
359 }
360 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
361
362 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
363 }
364 Ok(())
365 }
366 fn compute_size(&self, version: i16) -> Result<usize> {
367 let mut total_size = 0;
368 total_size += types::Int16.compute_size(&self.error_code)?;
369 if version >= 6 {
370 total_size += types::CompactString.compute_size(&self.error_message)?;
371 } else {
372 if !self.error_message.is_none() {
373 bail!("A field is set that is not available on the selected protocol version");
374 }
375 }
376 if version >= 5 {
377 total_size += types::CompactString.compute_size(&self.group_id)?;
378 } else {
379 total_size += types::String.compute_size(&self.group_id)?;
380 }
381 if version >= 5 {
382 total_size += types::CompactString.compute_size(&self.group_state)?;
383 } else {
384 total_size += types::String.compute_size(&self.group_state)?;
385 }
386 if version >= 5 {
387 total_size += types::CompactString.compute_size(&self.protocol_type)?;
388 } else {
389 total_size += types::String.compute_size(&self.protocol_type)?;
390 }
391 if version >= 5 {
392 total_size += types::CompactString.compute_size(&self.protocol_data)?;
393 } else {
394 total_size += types::String.compute_size(&self.protocol_data)?;
395 }
396 if version >= 5 {
397 total_size +=
398 types::CompactArray(types::Struct { version }).compute_size(&self.members)?;
399 } else {
400 total_size += types::Array(types::Struct { version }).compute_size(&self.members)?;
401 }
402 if version >= 3 {
403 total_size += types::Int32.compute_size(&self.authorized_operations)?;
404 } else {
405 if self.authorized_operations != -2147483648 {
406 bail!("A field is set that is not available on the selected protocol version");
407 }
408 }
409 if version >= 5 {
410 let num_tagged_fields = self.unknown_tagged_fields.len();
411 if num_tagged_fields > std::u32::MAX as usize {
412 bail!(
413 "Too many tagged fields to encode ({} fields)",
414 num_tagged_fields
415 );
416 }
417 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
418
419 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
420 }
421 Ok(total_size)
422 }
423}
424
425#[cfg(feature = "client")]
426impl Decodable for DescribedGroup {
427 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
428 if version < 0 || version > 6 {
429 bail!("specified version not supported by this message type");
430 }
431 let error_code = types::Int16.decode(buf)?;
432 let error_message = if version >= 6 {
433 types::CompactString.decode(buf)?
434 } else {
435 None
436 };
437 let group_id = if version >= 5 {
438 types::CompactString.decode(buf)?
439 } else {
440 types::String.decode(buf)?
441 };
442 let group_state = if version >= 5 {
443 types::CompactString.decode(buf)?
444 } else {
445 types::String.decode(buf)?
446 };
447 let protocol_type = if version >= 5 {
448 types::CompactString.decode(buf)?
449 } else {
450 types::String.decode(buf)?
451 };
452 let protocol_data = if version >= 5 {
453 types::CompactString.decode(buf)?
454 } else {
455 types::String.decode(buf)?
456 };
457 let members = if version >= 5 {
458 types::CompactArray(types::Struct { version }).decode(buf)?
459 } else {
460 types::Array(types::Struct { version }).decode(buf)?
461 };
462 let authorized_operations = if version >= 3 {
463 types::Int32.decode(buf)?
464 } else {
465 -2147483648
466 };
467 let mut unknown_tagged_fields = BTreeMap::new();
468 if version >= 5 {
469 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
470 for _ in 0..num_tagged_fields {
471 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
472 let size: u32 = types::UnsignedVarInt.decode(buf)?;
473 let unknown_value = buf.try_get_bytes(size as usize)?;
474 unknown_tagged_fields.insert(tag as i32, unknown_value);
475 }
476 }
477 Ok(Self {
478 error_code,
479 error_message,
480 group_id,
481 group_state,
482 protocol_type,
483 protocol_data,
484 members,
485 authorized_operations,
486 unknown_tagged_fields,
487 })
488 }
489}
490
491impl Default for DescribedGroup {
492 fn default() -> Self {
493 Self {
494 error_code: 0,
495 error_message: None,
496 group_id: Default::default(),
497 group_state: Default::default(),
498 protocol_type: Default::default(),
499 protocol_data: Default::default(),
500 members: Default::default(),
501 authorized_operations: -2147483648,
502 unknown_tagged_fields: BTreeMap::new(),
503 }
504 }
505}
506
507impl Message for DescribedGroup {
508 const VERSIONS: VersionRange = VersionRange { min: 0, max: 6 };
509 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
510}
511
512#[non_exhaustive]
514#[derive(Debug, Clone, PartialEq)]
515pub struct DescribedGroupMember {
516 pub member_id: StrBytes,
520
521 pub group_instance_id: Option<StrBytes>,
525
526 pub client_id: StrBytes,
530
531 pub client_host: StrBytes,
535
536 pub member_metadata: Bytes,
540
541 pub member_assignment: Bytes,
545
546 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
548}
549
550impl DescribedGroupMember {
551 pub fn with_member_id(mut self, value: StrBytes) -> Self {
557 self.member_id = value;
558 self
559 }
560 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
566 self.group_instance_id = value;
567 self
568 }
569 pub fn with_client_id(mut self, value: StrBytes) -> Self {
575 self.client_id = value;
576 self
577 }
578 pub fn with_client_host(mut self, value: StrBytes) -> Self {
584 self.client_host = value;
585 self
586 }
587 pub fn with_member_metadata(mut self, value: Bytes) -> Self {
593 self.member_metadata = value;
594 self
595 }
596 pub fn with_member_assignment(mut self, value: Bytes) -> Self {
602 self.member_assignment = value;
603 self
604 }
605 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
607 self.unknown_tagged_fields = value;
608 self
609 }
610 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
612 self.unknown_tagged_fields.insert(key, value);
613 self
614 }
615}
616
617#[cfg(feature = "broker")]
618impl Encodable for DescribedGroupMember {
619 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
620 if version < 0 || version > 6 {
621 bail!("specified version not supported by this message type");
622 }
623 if version >= 5 {
624 types::CompactString.encode(buf, &self.member_id)?;
625 } else {
626 types::String.encode(buf, &self.member_id)?;
627 }
628 if version >= 4 {
629 if version >= 5 {
630 types::CompactString.encode(buf, &self.group_instance_id)?;
631 } else {
632 types::String.encode(buf, &self.group_instance_id)?;
633 }
634 }
635 if version >= 5 {
636 types::CompactString.encode(buf, &self.client_id)?;
637 } else {
638 types::String.encode(buf, &self.client_id)?;
639 }
640 if version >= 5 {
641 types::CompactString.encode(buf, &self.client_host)?;
642 } else {
643 types::String.encode(buf, &self.client_host)?;
644 }
645 if version >= 5 {
646 types::CompactBytes.encode(buf, &self.member_metadata)?;
647 } else {
648 types::Bytes.encode(buf, &self.member_metadata)?;
649 }
650 if version >= 5 {
651 types::CompactBytes.encode(buf, &self.member_assignment)?;
652 } else {
653 types::Bytes.encode(buf, &self.member_assignment)?;
654 }
655 if version >= 5 {
656 let num_tagged_fields = self.unknown_tagged_fields.len();
657 if num_tagged_fields > std::u32::MAX as usize {
658 bail!(
659 "Too many tagged fields to encode ({} fields)",
660 num_tagged_fields
661 );
662 }
663 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
664
665 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
666 }
667 Ok(())
668 }
669 fn compute_size(&self, version: i16) -> Result<usize> {
670 let mut total_size = 0;
671 if version >= 5 {
672 total_size += types::CompactString.compute_size(&self.member_id)?;
673 } else {
674 total_size += types::String.compute_size(&self.member_id)?;
675 }
676 if version >= 4 {
677 if version >= 5 {
678 total_size += types::CompactString.compute_size(&self.group_instance_id)?;
679 } else {
680 total_size += types::String.compute_size(&self.group_instance_id)?;
681 }
682 }
683 if version >= 5 {
684 total_size += types::CompactString.compute_size(&self.client_id)?;
685 } else {
686 total_size += types::String.compute_size(&self.client_id)?;
687 }
688 if version >= 5 {
689 total_size += types::CompactString.compute_size(&self.client_host)?;
690 } else {
691 total_size += types::String.compute_size(&self.client_host)?;
692 }
693 if version >= 5 {
694 total_size += types::CompactBytes.compute_size(&self.member_metadata)?;
695 } else {
696 total_size += types::Bytes.compute_size(&self.member_metadata)?;
697 }
698 if version >= 5 {
699 total_size += types::CompactBytes.compute_size(&self.member_assignment)?;
700 } else {
701 total_size += types::Bytes.compute_size(&self.member_assignment)?;
702 }
703 if version >= 5 {
704 let num_tagged_fields = self.unknown_tagged_fields.len();
705 if num_tagged_fields > std::u32::MAX as usize {
706 bail!(
707 "Too many tagged fields to encode ({} fields)",
708 num_tagged_fields
709 );
710 }
711 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
712
713 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
714 }
715 Ok(total_size)
716 }
717}
718
719#[cfg(feature = "client")]
720impl Decodable for DescribedGroupMember {
721 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
722 if version < 0 || version > 6 {
723 bail!("specified version not supported by this message type");
724 }
725 let member_id = if version >= 5 {
726 types::CompactString.decode(buf)?
727 } else {
728 types::String.decode(buf)?
729 };
730 let group_instance_id = if version >= 4 {
731 if version >= 5 {
732 types::CompactString.decode(buf)?
733 } else {
734 types::String.decode(buf)?
735 }
736 } else {
737 None
738 };
739 let client_id = if version >= 5 {
740 types::CompactString.decode(buf)?
741 } else {
742 types::String.decode(buf)?
743 };
744 let client_host = if version >= 5 {
745 types::CompactString.decode(buf)?
746 } else {
747 types::String.decode(buf)?
748 };
749 let member_metadata = if version >= 5 {
750 types::CompactBytes.decode(buf)?
751 } else {
752 types::Bytes.decode(buf)?
753 };
754 let member_assignment = if version >= 5 {
755 types::CompactBytes.decode(buf)?
756 } else {
757 types::Bytes.decode(buf)?
758 };
759 let mut unknown_tagged_fields = BTreeMap::new();
760 if version >= 5 {
761 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
762 for _ in 0..num_tagged_fields {
763 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
764 let size: u32 = types::UnsignedVarInt.decode(buf)?;
765 let unknown_value = buf.try_get_bytes(size as usize)?;
766 unknown_tagged_fields.insert(tag as i32, unknown_value);
767 }
768 }
769 Ok(Self {
770 member_id,
771 group_instance_id,
772 client_id,
773 client_host,
774 member_metadata,
775 member_assignment,
776 unknown_tagged_fields,
777 })
778 }
779}
780
781impl Default for DescribedGroupMember {
782 fn default() -> Self {
783 Self {
784 member_id: Default::default(),
785 group_instance_id: None,
786 client_id: Default::default(),
787 client_host: Default::default(),
788 member_metadata: Default::default(),
789 member_assignment: Default::default(),
790 unknown_tagged_fields: BTreeMap::new(),
791 }
792 }
793}
794
795impl Message for DescribedGroupMember {
796 const VERSIONS: VersionRange = VersionRange { min: 0, max: 6 };
797 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
798}
799
800impl HeaderVersion for DescribeGroupsResponse {
801 fn header_version(version: i16) -> i16 {
802 if version >= 5 {
803 1
804 } else {
805 0
806 }
807 }
808}