1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct DescribeGroupsResponse {
24 pub throttle_time_ms: i32,
28
29 pub groups: Vec<DescribedGroup>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl DescribeGroupsResponse {
39 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
45 self.throttle_time_ms = value;
46 self
47 }
48 pub fn with_groups(mut self, value: Vec<DescribedGroup>) -> Self {
54 self.groups = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
#[cfg(feature = "broker")]
impl Encodable for DescribeGroupsResponse {
    /// Serializes this response into `buf` using the wire layout of `version`.
    ///
    /// Layout, in order: `throttle_time_ms` (versions 1+), then `groups`.
    /// Version 5 uses the compact array form for `groups` and appends the
    /// tagged-field section (a varint count followed by the unknown tagged
    /// fields).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5`, when the number of unknown
    /// tagged fields does not fit in a `u32`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version >= 1 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }
        if version >= 5 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.groups)?;

            // The tagged-field section directly follows the compact array.
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.groups)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=5`; the
    /// version is only compared against the per-field thresholds.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields does not fit in a `u32`
    /// (version 5 and above) or when a nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let throttle_size = if version >= 1 {
            types::Int32.compute_size(&self.throttle_time_ms)?
        } else {
            0
        };
        if version < 5 {
            let groups_size = types::Array(types::Struct { version }).compute_size(&self.groups)?;
            return Ok(throttle_size + groups_size);
        }

        let groups_size =
            types::CompactArray(types::Struct { version }).compute_size(&self.groups)?;
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(throttle_size + groups_size + count_size + tagged_size)
    }
}
123
#[cfg(feature = "client")]
impl Decodable for DescribeGroupsResponse {
    /// Parses a response from `buf` using the wire layout of `version`.
    ///
    /// `throttle_time_ms` is read for versions 1+ and is `0` otherwise. Version
    /// 5 reads `groups` as a compact array and then consumes the tagged-field
    /// section; every tagged field is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5` or when any read from `buf`
    /// fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let throttle_time_ms: i32 = if version >= 1 {
            types::Int32.decode(buf)?
        } else {
            0
        };

        // Versions below 5 use the plain array and carry no tagged fields.
        if version < 5 {
            let groups = types::Array(types::Struct { version }).decode(buf)?;
            return Ok(Self {
                throttle_time_ms,
                groups,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let groups = types::CompactArray(types::Struct { version }).decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            // NOTE(review): `as i32` wraps for tags above `i32::MAX`.
            unknown_tagged_fields.insert(tag as i32, payload);
        }
        Ok(Self {
            throttle_time_ms,
            groups,
            unknown_tagged_fields,
        })
    }
}
157
158impl Default for DescribeGroupsResponse {
159 fn default() -> Self {
160 Self {
161 throttle_time_ms: 0,
162 groups: Default::default(),
163 unknown_tagged_fields: BTreeMap::new(),
164 }
165 }
166}
167
168impl Message for DescribeGroupsResponse {
169 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
170 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
171}
172
173#[non_exhaustive]
175#[derive(Debug, Clone, PartialEq)]
176pub struct DescribedGroup {
177 pub error_code: i16,
181
182 pub group_id: super::GroupId,
186
187 pub group_state: StrBytes,
191
192 pub protocol_type: StrBytes,
196
197 pub protocol_data: StrBytes,
201
202 pub members: Vec<DescribedGroupMember>,
206
207 pub authorized_operations: i32,
211
212 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
214}
215
216impl DescribedGroup {
217 pub fn with_error_code(mut self, value: i16) -> Self {
223 self.error_code = value;
224 self
225 }
226 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
232 self.group_id = value;
233 self
234 }
235 pub fn with_group_state(mut self, value: StrBytes) -> Self {
241 self.group_state = value;
242 self
243 }
244 pub fn with_protocol_type(mut self, value: StrBytes) -> Self {
250 self.protocol_type = value;
251 self
252 }
253 pub fn with_protocol_data(mut self, value: StrBytes) -> Self {
259 self.protocol_data = value;
260 self
261 }
262 pub fn with_members(mut self, value: Vec<DescribedGroupMember>) -> Self {
268 self.members = value;
269 self
270 }
271 pub fn with_authorized_operations(mut self, value: i32) -> Self {
277 self.authorized_operations = value;
278 self
279 }
280 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
282 self.unknown_tagged_fields = value;
283 self
284 }
285 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
287 self.unknown_tagged_fields.insert(key, value);
288 self
289 }
290}
291
#[cfg(feature = "broker")]
impl Encodable for DescribedGroup {
    /// Serializes this group entry into `buf` using the wire layout of
    /// `version`.
    ///
    /// Field order: `error_code`, `group_id`, `group_state`, `protocol_type`,
    /// `protocol_data`, `members`, `authorized_operations` (versions 3+) and,
    /// for version 5, the tagged-field section. Version 5 uses the compact
    /// string/array forms; earlier versions use the plain forms.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5`, when `authorized_operations`
    /// differs from `i32::MIN` for a version below 3, when the number of
    /// unknown tagged fields does not fit in a `u32`, or when a nested encoder
    /// fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let compact = version >= 5;

        types::Int16.encode(buf, &self.error_code)?;
        // The four strings and the member array are contiguous on the wire, so
        // the compact/plain decision is taken once for the whole run.
        if compact {
            types::CompactString.encode(buf, &self.group_id)?;
            types::CompactString.encode(buf, &self.group_state)?;
            types::CompactString.encode(buf, &self.protocol_type)?;
            types::CompactString.encode(buf, &self.protocol_data)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.members)?;
        } else {
            types::String.encode(buf, &self.group_id)?;
            types::String.encode(buf, &self.group_state)?;
            types::String.encode(buf, &self.protocol_type)?;
            types::String.encode(buf, &self.protocol_data)?;
            types::Array(types::Struct { version }).encode(buf, &self.members)?;
        }

        if version >= 3 {
            types::Int32.encode(buf, &self.authorized_operations)?;
        } else if self.authorized_operations != i32::MIN {
            // The field cannot be represented before version 3, so a
            // non-default value is rejected rather than silently dropped.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=5`.
    ///
    /// # Errors
    ///
    /// Fails when `authorized_operations` differs from `i32::MIN` for a version
    /// below 3, when the number of unknown tagged fields does not fit in a
    /// `u32` (version 5 and above), or when a nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 5;

        let mut size = types::Int16.compute_size(&self.error_code)?;
        if compact {
            size += types::CompactString.compute_size(&self.group_id)?;
            size += types::CompactString.compute_size(&self.group_state)?;
            size += types::CompactString.compute_size(&self.protocol_type)?;
            size += types::CompactString.compute_size(&self.protocol_data)?;
            size += types::CompactArray(types::Struct { version }).compute_size(&self.members)?;
        } else {
            size += types::String.compute_size(&self.group_id)?;
            size += types::String.compute_size(&self.group_state)?;
            size += types::String.compute_size(&self.protocol_type)?;
            size += types::String.compute_size(&self.protocol_data)?;
            size += types::Array(types::Struct { version }).compute_size(&self.members)?;
        }

        if version >= 3 {
            size += types::Int32.compute_size(&self.authorized_operations)?;
        } else if self.authorized_operations != i32::MIN {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
396
#[cfg(feature = "client")]
impl Decodable for DescribedGroup {
    /// Parses a group entry from `buf` using the wire layout of `version`.
    ///
    /// Fields are read in the order `error_code`, `group_id`, `group_state`,
    /// `protocol_type`, `protocol_data`, `members`, `authorized_operations`
    /// (versions 3+, otherwise `i32::MIN`) and, for version 5, the
    /// tagged-field section, whose entries are all kept in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5` or when any read from `buf`
    /// fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let error_code = types::Int16.decode(buf)?;

        // Plain (pre-5) layout: plain strings/arrays and no tagged fields.
        if version < 5 {
            let group_id = types::String.decode(buf)?;
            let group_state = types::String.decode(buf)?;
            let protocol_type = types::String.decode(buf)?;
            let protocol_data = types::String.decode(buf)?;
            let members = types::Array(types::Struct { version }).decode(buf)?;
            let authorized_operations = if version >= 3 {
                types::Int32.decode(buf)?
            } else {
                i32::MIN
            };
            return Ok(Self {
                error_code,
                group_id,
                group_state,
                protocol_type,
                protocol_data,
                members,
                authorized_operations,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        // Version 5 layout: compact encodings throughout.
        let group_id = types::CompactString.decode(buf)?;
        let group_state = types::CompactString.decode(buf)?;
        let protocol_type = types::CompactString.decode(buf)?;
        let protocol_data = types::CompactString.decode(buf)?;
        let members = types::CompactArray(types::Struct { version }).decode(buf)?;
        // Always present here, because version 5 is above the threshold of 3.
        let authorized_operations = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            // NOTE(review): `as i32` wraps for tags above `i32::MAX`.
            unknown_tagged_fields.insert(tag as i32, payload);
        }
        Ok(Self {
            error_code,
            group_id,
            group_state,
            protocol_type,
            protocol_data,
            members,
            authorized_operations,
            unknown_tagged_fields,
        })
    }
}
456
457impl Default for DescribedGroup {
458 fn default() -> Self {
459 Self {
460 error_code: 0,
461 group_id: Default::default(),
462 group_state: Default::default(),
463 protocol_type: Default::default(),
464 protocol_data: Default::default(),
465 members: Default::default(),
466 authorized_operations: -2147483648,
467 unknown_tagged_fields: BTreeMap::new(),
468 }
469 }
470}
471
472impl Message for DescribedGroup {
473 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
474 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
475}
476
477#[non_exhaustive]
479#[derive(Debug, Clone, PartialEq)]
480pub struct DescribedGroupMember {
481 pub member_id: StrBytes,
485
486 pub group_instance_id: Option<StrBytes>,
490
491 pub client_id: StrBytes,
495
496 pub client_host: StrBytes,
500
501 pub member_metadata: Bytes,
505
506 pub member_assignment: Bytes,
510
511 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
513}
514
515impl DescribedGroupMember {
516 pub fn with_member_id(mut self, value: StrBytes) -> Self {
522 self.member_id = value;
523 self
524 }
525 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
531 self.group_instance_id = value;
532 self
533 }
534 pub fn with_client_id(mut self, value: StrBytes) -> Self {
540 self.client_id = value;
541 self
542 }
543 pub fn with_client_host(mut self, value: StrBytes) -> Self {
549 self.client_host = value;
550 self
551 }
552 pub fn with_member_metadata(mut self, value: Bytes) -> Self {
558 self.member_metadata = value;
559 self
560 }
561 pub fn with_member_assignment(mut self, value: Bytes) -> Self {
567 self.member_assignment = value;
568 self
569 }
570 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
572 self.unknown_tagged_fields = value;
573 self
574 }
575 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
577 self.unknown_tagged_fields.insert(key, value);
578 self
579 }
580}
581
#[cfg(feature = "broker")]
impl Encodable for DescribedGroupMember {
    /// Serializes this member entry into `buf` using the wire layout of
    /// `version`.
    ///
    /// Field order: `member_id`, `group_instance_id` (versions 4+),
    /// `client_id`, `client_host`, `member_metadata`, `member_assignment` and,
    /// for version 5, the tagged-field section. Version 5 uses the compact
    /// string/bytes forms; earlier versions use the plain forms. For versions
    /// below 4, `group_instance_id` is skipped without error.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5`, when the number of unknown
    /// tagged fields does not fit in a `u32`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 5 {
            // Version 5 layout: compact encodings, every field present.
            types::CompactString.encode(buf, &self.member_id)?;
            types::CompactString.encode(buf, &self.group_instance_id)?;
            types::CompactString.encode(buf, &self.client_id)?;
            types::CompactString.encode(buf, &self.client_host)?;
            types::CompactBytes.encode(buf, &self.member_metadata)?;
            types::CompactBytes.encode(buf, &self.member_assignment)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            // Plain (pre-5) layout: plain encodings and no tagged fields.
            types::String.encode(buf, &self.member_id)?;
            if version >= 4 {
                types::String.encode(buf, &self.group_instance_id)?;
            }
            types::String.encode(buf, &self.client_id)?;
            types::String.encode(buf, &self.client_host)?;
            types::Bytes.encode(buf, &self.member_metadata)?;
            types::Bytes.encode(buf, &self.member_assignment)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=5`.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields does not fit in a `u32`
    /// (version 5 and above) or when a nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 5 {
            size += types::CompactString.compute_size(&self.member_id)?;
            size += types::CompactString.compute_size(&self.group_instance_id)?;
            size += types::CompactString.compute_size(&self.client_id)?;
            size += types::CompactString.compute_size(&self.client_host)?;
            size += types::CompactBytes.compute_size(&self.member_metadata)?;
            size += types::CompactBytes.compute_size(&self.member_assignment)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::String.compute_size(&self.member_id)?;
            if version >= 4 {
                size += types::String.compute_size(&self.group_instance_id)?;
            }
            size += types::String.compute_size(&self.client_id)?;
            size += types::String.compute_size(&self.client_host)?;
            size += types::Bytes.compute_size(&self.member_metadata)?;
            size += types::Bytes.compute_size(&self.member_assignment)?;
        }
        Ok(size)
    }
}
683
#[cfg(feature = "client")]
impl Decodable for DescribedGroupMember {
    /// Parses a member entry from `buf` using the wire layout of `version`.
    ///
    /// Fields are read in the order `member_id`, `group_instance_id`
    /// (versions 4+, otherwise `None`), `client_id`, `client_host`,
    /// `member_metadata`, `member_assignment` and, for version 5, the
    /// tagged-field section, whose entries are all kept in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5` or when any read from `buf`
    /// fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Plain (pre-5) layout: plain strings/bytes and no tagged fields.
        if version < 5 {
            let member_id = types::String.decode(buf)?;
            let group_instance_id = if version >= 4 {
                types::String.decode(buf)?
            } else {
                None
            };
            let client_id = types::String.decode(buf)?;
            let client_host = types::String.decode(buf)?;
            let member_metadata = types::Bytes.decode(buf)?;
            let member_assignment = types::Bytes.decode(buf)?;
            return Ok(Self {
                member_id,
                group_instance_id,
                client_id,
                client_host,
                member_metadata,
                member_assignment,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        // Version 5 layout: compact encodings, every field present.
        let member_id = types::CompactString.decode(buf)?;
        let group_instance_id = types::CompactString.decode(buf)?;
        let client_id = types::CompactString.decode(buf)?;
        let client_host = types::CompactString.decode(buf)?;
        let member_metadata = types::CompactBytes.decode(buf)?;
        let member_assignment = types::CompactBytes.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            // NOTE(review): `as i32` wraps for tags above `i32::MAX`.
            unknown_tagged_fields.insert(tag as i32, payload);
        }
        Ok(Self {
            member_id,
            group_instance_id,
            client_id,
            client_host,
            member_metadata,
            member_assignment,
            unknown_tagged_fields,
        })
    }
}
745
746impl Default for DescribedGroupMember {
747 fn default() -> Self {
748 Self {
749 member_id: Default::default(),
750 group_instance_id: None,
751 client_id: Default::default(),
752 client_host: Default::default(),
753 member_metadata: Default::default(),
754 member_assignment: Default::default(),
755 unknown_tagged_fields: BTreeMap::new(),
756 }
757 }
758}
759
760impl Message for DescribedGroupMember {
761 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
762 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
763}
764
765impl HeaderVersion for DescribeGroupsResponse {
766 fn header_version(version: i16) -> i16 {
767 if version >= 5 {
768 1
769 } else {
770 0
771 }
772 }
773}