1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i8, get_i16, get_i32, put_i8, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
9};
10use crate::primitives::string_bytes_borrowed::{
11 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
12 get_nullable_string_borrowed, get_string_borrowed,
13};
14use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
15use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
16
17pub const API_KEY: i16 = 69;
18pub const MIN_VERSION: i16 = 0;
19pub const MAX_VERSION: i16 = 1;
20pub const FLEXIBLE_MIN: i16 = 0;
21
22#[inline]
23fn is_flexible(version: i16) -> bool {
24 version >= FLEXIBLE_MIN
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Default)]
28pub struct ConsumerGroupDescribeResponse<'a> {
29 pub throttle_time_ms: i32,
30 pub groups: Vec<DescribedGroup<'a>>,
31 pub unknown_tagged_fields: UnknownTaggedFields,
32}
33impl ConsumerGroupDescribeResponse<'_> {
34 pub fn to_owned(
35 &self,
36 ) -> crate::owned::consumer_group_describe_response::ConsumerGroupDescribeResponse {
37 crate::owned::consumer_group_describe_response::ConsumerGroupDescribeResponse {
38 throttle_time_ms: (self.throttle_time_ms),
39 groups: (self.groups).iter().map(DescribedGroup::to_owned).collect(),
40 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
41 }
42 }
43}
44impl Encode for ConsumerGroupDescribeResponse<'_> {
45 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
46 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
47 return Err(ProtocolError::UnsupportedVersion {
48 api_key: API_KEY,
49 version,
50 });
51 }
52 let flex = is_flexible(version);
53 if version >= 0 {
54 put_i32(buf, self.throttle_time_ms);
55 }
56 if version >= 0 {
57 {
58 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
59 for it in &self.groups {
60 it.encode(buf, version)?;
61 }
62 }
63 }
64 if flex {
65 let tagged = WriteTaggedFields::new();
66 tagged.write(buf, &self.unknown_tagged_fields);
67 }
68 Ok(())
69 }
70 fn encoded_len(&self, version: i16) -> usize {
71 let flex = is_flexible(version);
72 let mut n: usize = 0;
73 if version >= 0 {
74 n += 4;
75 }
76 if version >= 0 {
77 n += {
78 let prefix =
79 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
80 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
81 prefix + body
82 };
83 }
84 if flex {
85 let known_pairs: Vec<(u32, usize)> = Vec::new();
86 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
87 }
88 n
89 }
90}
91impl<'de> DecodeBorrow<'de> for ConsumerGroupDescribeResponse<'de> {
92 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
93 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
94 return Err(ProtocolError::UnsupportedVersion {
95 api_key: API_KEY,
96 version,
97 });
98 }
99 let flex = is_flexible(version);
100 let mut out = Self::default();
101 if version >= 0 {
102 out.throttle_time_ms = get_i32(buf)?;
103 }
104 if version >= 0 {
105 out.groups = {
106 let n = crate::primitives::array::get_array_len(buf, flex)?;
107 let mut v = Vec::with_capacity(n);
108 for _ in 0..n {
109 v.push(DescribedGroup::decode_borrow(buf, version)?);
110 }
111 v
112 };
113 }
114 if flex {
115 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
116 }
117 Ok(out)
118 }
119}
120#[cfg(test)]
121impl ConsumerGroupDescribeResponse<'_> {
122 #[must_use]
123 pub fn populated(version: i16) -> Self {
124 let mut m = Self::default();
125 if version >= 0 {
126 m.throttle_time_ms = 1i32;
127 }
128 if version >= 0 {
129 m.groups = vec![DescribedGroup::populated(version)];
130 }
131 m
132 }
133}
134#[derive(Debug, Clone, PartialEq, Eq)]
135pub struct DescribedGroup<'a> {
136 pub error_code: i16,
137 pub error_message: Option<&'a str>,
138 pub group_id: &'a str,
139 pub group_state: &'a str,
140 pub group_epoch: i32,
141 pub assignment_epoch: i32,
142 pub assignor_name: &'a str,
143 pub members: Vec<Member<'a>>,
144 pub authorized_operations: i32,
145 pub unknown_tagged_fields: UnknownTaggedFields,
146}
147impl Default for DescribedGroup<'_> {
148 fn default() -> Self {
149 Self {
150 error_code: 0i16,
151 error_message: None,
152 group_id: "",
153 group_state: "",
154 group_epoch: 0i32,
155 assignment_epoch: 0i32,
156 assignor_name: "",
157 members: Vec::new(),
158 authorized_operations: -2_147_483_648i32,
159 unknown_tagged_fields: Default::default(),
160 }
161 }
162}
163impl DescribedGroup<'_> {
164 pub fn to_owned(&self) -> crate::owned::consumer_group_describe_response::DescribedGroup {
165 crate::owned::consumer_group_describe_response::DescribedGroup {
166 error_code: (self.error_code),
167 error_message: (self.error_message).map(std::string::ToString::to_string),
168 group_id: (self.group_id).to_string(),
169 group_state: (self.group_state).to_string(),
170 group_epoch: (self.group_epoch),
171 assignment_epoch: (self.assignment_epoch),
172 assignor_name: (self.assignor_name).to_string(),
173 members: (self.members).iter().map(Member::to_owned).collect(),
174 authorized_operations: (self.authorized_operations),
175 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
176 }
177 }
178}
179impl Encode for DescribedGroup<'_> {
180 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
181 let flex = version >= 0;
182 if version >= 0 {
183 put_i16(buf, self.error_code);
184 }
185 if version >= 0 {
186 if flex {
187 put_compact_nullable_string(buf, self.error_message);
188 } else {
189 put_nullable_string(buf, self.error_message);
190 }
191 }
192 if version >= 0 {
193 if flex {
194 put_compact_string(buf, self.group_id);
195 } else {
196 put_string(buf, self.group_id);
197 }
198 }
199 if version >= 0 {
200 if flex {
201 put_compact_string(buf, self.group_state);
202 } else {
203 put_string(buf, self.group_state);
204 }
205 }
206 if version >= 0 {
207 put_i32(buf, self.group_epoch);
208 }
209 if version >= 0 {
210 put_i32(buf, self.assignment_epoch);
211 }
212 if version >= 0 {
213 if flex {
214 put_compact_string(buf, self.assignor_name);
215 } else {
216 put_string(buf, self.assignor_name);
217 }
218 }
219 if version >= 0 {
220 {
221 crate::primitives::array::put_array_len(buf, (self.members).len(), flex);
222 for it in &self.members {
223 it.encode(buf, version)?;
224 }
225 }
226 }
227 if version >= 0 {
228 put_i32(buf, self.authorized_operations);
229 }
230 if flex {
231 let tagged = WriteTaggedFields::new();
232 tagged.write(buf, &self.unknown_tagged_fields);
233 }
234 Ok(())
235 }
236 fn encoded_len(&self, version: i16) -> usize {
237 let flex = version >= 0;
238 let mut n: usize = 0;
239 if version >= 0 {
240 n += 2;
241 }
242 if version >= 0 {
243 n += if flex {
244 compact_nullable_string_len(self.error_message)
245 } else {
246 nullable_string_len(self.error_message)
247 };
248 }
249 if version >= 0 {
250 n += if flex {
251 compact_string_len(self.group_id)
252 } else {
253 string_len(self.group_id)
254 };
255 }
256 if version >= 0 {
257 n += if flex {
258 compact_string_len(self.group_state)
259 } else {
260 string_len(self.group_state)
261 };
262 }
263 if version >= 0 {
264 n += 4;
265 }
266 if version >= 0 {
267 n += 4;
268 }
269 if version >= 0 {
270 n += if flex {
271 compact_string_len(self.assignor_name)
272 } else {
273 string_len(self.assignor_name)
274 };
275 }
276 if version >= 0 {
277 n += {
278 let prefix =
279 crate::primitives::array::array_len_prefix_len((self.members).len(), flex);
280 let body: usize = (self.members)
281 .iter()
282 .map(|it| it.encoded_len(version))
283 .sum();
284 prefix + body
285 };
286 }
287 if version >= 0 {
288 n += 4;
289 }
290 if flex {
291 let known_pairs: Vec<(u32, usize)> = Vec::new();
292 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
293 }
294 n
295 }
296}
297impl<'de> DecodeBorrow<'de> for DescribedGroup<'de> {
298 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
299 let flex = version >= 0;
300 let mut out = Self::default();
301 if version >= 0 {
302 out.error_code = get_i16(buf)?;
303 }
304 if version >= 0 {
305 out.error_message = if flex {
306 get_compact_nullable_string_borrowed(buf)?
307 } else {
308 get_nullable_string_borrowed(buf)?
309 };
310 }
311 if version >= 0 {
312 out.group_id = if flex {
313 get_compact_string_borrowed(buf)?
314 } else {
315 get_string_borrowed(buf)?
316 };
317 }
318 if version >= 0 {
319 out.group_state = if flex {
320 get_compact_string_borrowed(buf)?
321 } else {
322 get_string_borrowed(buf)?
323 };
324 }
325 if version >= 0 {
326 out.group_epoch = get_i32(buf)?;
327 }
328 if version >= 0 {
329 out.assignment_epoch = get_i32(buf)?;
330 }
331 if version >= 0 {
332 out.assignor_name = if flex {
333 get_compact_string_borrowed(buf)?
334 } else {
335 get_string_borrowed(buf)?
336 };
337 }
338 if version >= 0 {
339 out.members = {
340 let n = crate::primitives::array::get_array_len(buf, flex)?;
341 let mut v = Vec::with_capacity(n);
342 for _ in 0..n {
343 v.push(Member::decode_borrow(buf, version)?);
344 }
345 v
346 };
347 }
348 if version >= 0 {
349 out.authorized_operations = get_i32(buf)?;
350 }
351 if flex {
352 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
353 }
354 Ok(out)
355 }
356}
357#[cfg(test)]
358impl DescribedGroup<'_> {
359 #[must_use]
360 pub fn populated(version: i16) -> Self {
361 let mut m = Self::default();
362 if version >= 0 {
363 m.error_code = 1i16;
364 }
365 if version >= 0 {
366 m.error_message = Some("x");
367 }
368 if version >= 0 {
369 m.group_id = "x";
370 }
371 if version >= 0 {
372 m.group_state = "x";
373 }
374 if version >= 0 {
375 m.group_epoch = 1i32;
376 }
377 if version >= 0 {
378 m.assignment_epoch = 1i32;
379 }
380 if version >= 0 {
381 m.assignor_name = "x";
382 }
383 if version >= 0 {
384 m.members = vec![Member::populated(version)];
385 }
386 if version >= 0 {
387 m.authorized_operations = 1i32;
388 }
389 m
390 }
391}
392#[derive(Debug, Clone, PartialEq, Eq)]
393pub struct Member<'a> {
394 pub member_id: &'a str,
395 pub instance_id: Option<&'a str>,
396 pub rack_id: Option<&'a str>,
397 pub member_epoch: i32,
398 pub client_id: &'a str,
399 pub client_host: &'a str,
400 pub subscribed_topic_names: Vec<&'a str>,
401 pub subscribed_topic_regex: Option<&'a str>,
402 pub assignment: super::common::consumer_group_describe_response::assignment::Assignment<'a>,
403 pub target_assignment:
404 super::common::consumer_group_describe_response::assignment::Assignment<'a>,
405 pub member_type: i8,
406 pub unknown_tagged_fields: UnknownTaggedFields,
407}
408impl Default for Member<'_> {
409 fn default() -> Self {
410 Self {
411 member_id: "",
412 instance_id: None,
413 rack_id: None,
414 member_epoch: 0i32,
415 client_id: "",
416 client_host: "",
417 subscribed_topic_names: Vec::new(),
418 subscribed_topic_regex: None,
419 assignment: Default::default(),
420 target_assignment: Default::default(),
421 member_type: -1i8,
422 unknown_tagged_fields: Default::default(),
423 }
424 }
425}
426impl Member<'_> {
427 pub fn to_owned(&self) -> crate::owned::consumer_group_describe_response::Member {
428 crate::owned::consumer_group_describe_response::Member {
429 member_id: (self.member_id).to_string(),
430 instance_id: (self.instance_id).map(std::string::ToString::to_string),
431 rack_id: (self.rack_id).map(std::string::ToString::to_string),
432 member_epoch: (self.member_epoch),
433 client_id: (self.client_id).to_string(),
434 client_host: (self.client_host).to_string(),
435 subscribed_topic_names: (self.subscribed_topic_names)
436 .iter()
437 .map(std::string::ToString::to_string)
438 .collect(),
439 subscribed_topic_regex: (self.subscribed_topic_regex)
440 .map(std::string::ToString::to_string),
441 assignment: (self.assignment).to_owned(),
442 target_assignment: (self.target_assignment).to_owned(),
443 member_type: (self.member_type),
444 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
445 }
446 }
447}
448impl Encode for Member<'_> {
449 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
450 let flex = version >= 0;
451 if version >= 0 {
452 if flex {
453 put_compact_string(buf, self.member_id);
454 } else {
455 put_string(buf, self.member_id);
456 }
457 }
458 if version >= 0 {
459 if flex {
460 put_compact_nullable_string(buf, self.instance_id);
461 } else {
462 put_nullable_string(buf, self.instance_id);
463 }
464 }
465 if version >= 0 {
466 if flex {
467 put_compact_nullable_string(buf, self.rack_id);
468 } else {
469 put_nullable_string(buf, self.rack_id);
470 }
471 }
472 if version >= 0 {
473 put_i32(buf, self.member_epoch);
474 }
475 if version >= 0 {
476 if flex {
477 put_compact_string(buf, self.client_id);
478 } else {
479 put_string(buf, self.client_id);
480 }
481 }
482 if version >= 0 {
483 if flex {
484 put_compact_string(buf, self.client_host);
485 } else {
486 put_string(buf, self.client_host);
487 }
488 }
489 if version >= 0 {
490 {
491 crate::primitives::array::put_array_len(
492 buf,
493 (self.subscribed_topic_names).len(),
494 flex,
495 );
496 for it in &self.subscribed_topic_names {
497 if flex {
498 put_compact_string(buf, it);
499 } else {
500 put_string(buf, it);
501 }
502 }
503 }
504 }
505 if version >= 0 {
506 if flex {
507 put_compact_nullable_string(buf, self.subscribed_topic_regex);
508 } else {
509 put_nullable_string(buf, self.subscribed_topic_regex);
510 }
511 }
512 if version >= 0 {
513 self.assignment.encode(buf, version)?;
514 }
515 if version >= 0 {
516 self.target_assignment.encode(buf, version)?;
517 }
518 if version >= 1 {
519 put_i8(buf, self.member_type);
520 }
521 if flex {
522 let tagged = WriteTaggedFields::new();
523 tagged.write(buf, &self.unknown_tagged_fields);
524 }
525 Ok(())
526 }
527 fn encoded_len(&self, version: i16) -> usize {
528 let flex = version >= 0;
529 let mut n: usize = 0;
530 if version >= 0 {
531 n += if flex {
532 compact_string_len(self.member_id)
533 } else {
534 string_len(self.member_id)
535 };
536 }
537 if version >= 0 {
538 n += if flex {
539 compact_nullable_string_len(self.instance_id)
540 } else {
541 nullable_string_len(self.instance_id)
542 };
543 }
544 if version >= 0 {
545 n += if flex {
546 compact_nullable_string_len(self.rack_id)
547 } else {
548 nullable_string_len(self.rack_id)
549 };
550 }
551 if version >= 0 {
552 n += 4;
553 }
554 if version >= 0 {
555 n += if flex {
556 compact_string_len(self.client_id)
557 } else {
558 string_len(self.client_id)
559 };
560 }
561 if version >= 0 {
562 n += if flex {
563 compact_string_len(self.client_host)
564 } else {
565 string_len(self.client_host)
566 };
567 }
568 if version >= 0 {
569 n += {
570 let prefix = crate::primitives::array::array_len_prefix_len(
571 (self.subscribed_topic_names).len(),
572 flex,
573 );
574 let body: usize = (self.subscribed_topic_names)
575 .iter()
576 .map(|it| {
577 if flex {
578 compact_string_len(it)
579 } else {
580 string_len(it)
581 }
582 })
583 .sum();
584 prefix + body
585 };
586 }
587 if version >= 0 {
588 n += if flex {
589 compact_nullable_string_len(self.subscribed_topic_regex)
590 } else {
591 nullable_string_len(self.subscribed_topic_regex)
592 };
593 }
594 if version >= 0 {
595 n += self.assignment.encoded_len(version);
596 }
597 if version >= 0 {
598 n += self.target_assignment.encoded_len(version);
599 }
600 if version >= 1 {
601 n += 1;
602 }
603 if flex {
604 let known_pairs: Vec<(u32, usize)> = Vec::new();
605 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
606 }
607 n
608 }
609}
610impl<'de> DecodeBorrow<'de> for Member<'de> {
611 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
612 let flex = version >= 0;
613 let mut out = Self::default();
614 if version >= 0 {
615 out.member_id = if flex {
616 get_compact_string_borrowed(buf)?
617 } else {
618 get_string_borrowed(buf)?
619 };
620 }
621 if version >= 0 {
622 out.instance_id = if flex {
623 get_compact_nullable_string_borrowed(buf)?
624 } else {
625 get_nullable_string_borrowed(buf)?
626 };
627 }
628 if version >= 0 {
629 out.rack_id = if flex {
630 get_compact_nullable_string_borrowed(buf)?
631 } else {
632 get_nullable_string_borrowed(buf)?
633 };
634 }
635 if version >= 0 {
636 out.member_epoch = get_i32(buf)?;
637 }
638 if version >= 0 {
639 out.client_id = if flex {
640 get_compact_string_borrowed(buf)?
641 } else {
642 get_string_borrowed(buf)?
643 };
644 }
645 if version >= 0 {
646 out.client_host = if flex {
647 get_compact_string_borrowed(buf)?
648 } else {
649 get_string_borrowed(buf)?
650 };
651 }
652 if version >= 0 {
653 out.subscribed_topic_names = {
654 let n = crate::primitives::array::get_array_len(buf, flex)?;
655 let mut v = Vec::with_capacity(n);
656 for _ in 0..n {
657 v.push(if flex {
658 get_compact_string_borrowed(buf)?
659 } else {
660 get_string_borrowed(buf)?
661 });
662 }
663 v
664 };
665 }
666 if version >= 0 {
667 out.subscribed_topic_regex = if flex {
668 get_compact_nullable_string_borrowed(buf)?
669 } else {
670 get_nullable_string_borrowed(buf)?
671 };
672 }
673 if version >= 0 {
674 out . assignment = super :: common :: consumer_group_describe_response :: assignment :: Assignment :: decode_borrow (buf , version) ? ;
675 }
676 if version >= 0 {
677 out . target_assignment = super :: common :: consumer_group_describe_response :: assignment :: Assignment :: decode_borrow (buf , version) ? ;
678 }
679 if version >= 1 {
680 out.member_type = get_i8(buf)?;
681 }
682 if flex {
683 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
684 }
685 Ok(out)
686 }
687}
688#[cfg(test)]
689impl Member<'_> {
690 #[must_use]
691 pub fn populated(version: i16) -> Self {
692 let mut m = Self::default();
693 if version >= 0 {
694 m.member_id = "x";
695 }
696 if version >= 0 {
697 m.instance_id = Some("x");
698 }
699 if version >= 0 {
700 m.rack_id = Some("x");
701 }
702 if version >= 0 {
703 m.member_epoch = 1i32;
704 }
705 if version >= 0 {
706 m.client_id = "x";
707 }
708 if version >= 0 {
709 m.client_host = "x";
710 }
711 if version >= 0 {
712 m.subscribed_topic_names = vec!["x"];
713 }
714 if version >= 0 {
715 m.subscribed_topic_regex = Some("x");
716 }
717 if version >= 0 {
718 m.assignment =
719 super::common::consumer_group_describe_response::assignment::Assignment::populated(
720 version,
721 );
722 }
723 if version >= 0 {
724 m.target_assignment =
725 super::common::consumer_group_describe_response::assignment::Assignment::populated(
726 version,
727 );
728 }
729 if version >= 1 {
730 m.member_type = 1i8;
731 }
732 m
733 }
734}