1use crate::primitives::fixed::{get_i8, get_i16, get_i32, put_i8, put_i16, put_i32};
3use crate::primitives::string_bytes::{
4 compact_nullable_string_len, compact_string_len, nullable_string_len,
5 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
6};
7use crate::primitives::string_bytes_borrowed::{
8 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
9 get_nullable_string_borrowed, get_string_borrowed,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
13use bytes::BufMut;
/// API key reported in `ProtocolError::UnsupportedVersion` for this message.
pub const API_KEY: i16 = 69;
/// Lowest version accepted by the top-level encode/decode entry points.
pub const MIN_VERSION: i16 = 0;
/// Highest version accepted by the top-level encode/decode entry points.
pub const MAX_VERSION: i16 = 1;
/// First version for which [`is_flexible`] reports `true`.
pub const FLEXIBLE_MIN: i16 = 0;

/// Returns `true` when `version` is at or above [`FLEXIBLE_MIN`].
///
/// The result is used in this module as the `flex` flag handed to the
/// array-length helpers and to decide whether tagged fields are processed.
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
22#[derive(Debug, Clone, PartialEq, Eq, Default)]
23pub struct ConsumerGroupDescribeResponse<'a> {
24 pub throttle_time_ms: i32,
25 pub groups: Vec<DescribedGroup<'a>>,
26 pub unknown_tagged_fields: UnknownTaggedFields,
27}
28impl ConsumerGroupDescribeResponse<'_> {
29 pub fn to_owned(
30 &self,
31 ) -> crate::owned::consumer_group_describe_response::ConsumerGroupDescribeResponse {
32 crate::owned::consumer_group_describe_response::ConsumerGroupDescribeResponse {
33 throttle_time_ms: (self.throttle_time_ms),
34 groups: (self.groups).iter().map(DescribedGroup::to_owned).collect(),
35 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
36 }
37 }
38}
39impl Encode for ConsumerGroupDescribeResponse<'_> {
40 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
41 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
42 return Err(ProtocolError::UnsupportedVersion {
43 api_key: API_KEY,
44 version,
45 });
46 }
47 let flex = is_flexible(version);
48 if version >= 0 {
49 put_i32(buf, self.throttle_time_ms);
50 }
51 if version >= 0 {
52 {
53 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
54 for it in &self.groups {
55 it.encode(buf, version)?;
56 }
57 }
58 }
59 if flex {
60 let tagged = WriteTaggedFields::new();
61 tagged.write(buf, &self.unknown_tagged_fields);
62 }
63 Ok(())
64 }
65 fn encoded_len(&self, version: i16) -> usize {
66 let flex = is_flexible(version);
67 let mut n: usize = 0;
68 if version >= 0 {
69 n += 4;
70 }
71 if version >= 0 {
72 n += {
73 let prefix =
74 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
75 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
76 prefix + body
77 };
78 }
79 if flex {
80 let known_pairs: Vec<(u32, usize)> = Vec::new();
81 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
82 }
83 n
84 }
85}
86impl<'de> DecodeBorrow<'de> for ConsumerGroupDescribeResponse<'de> {
87 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
88 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
89 return Err(ProtocolError::UnsupportedVersion {
90 api_key: API_KEY,
91 version,
92 });
93 }
94 let flex = is_flexible(version);
95 let mut out = Self::default();
96 if version >= 0 {
97 out.throttle_time_ms = get_i32(buf)?;
98 }
99 if version >= 0 {
100 out.groups = {
101 let n = crate::primitives::array::get_array_len(buf, flex)?;
102 let mut v = Vec::with_capacity(n);
103 for _ in 0..n {
104 v.push(DescribedGroup::decode_borrow(buf, version)?);
105 }
106 v
107 };
108 }
109 if flex {
110 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
111 }
112 Ok(out)
113 }
114}
#[cfg(test)]
impl ConsumerGroupDescribeResponse<'_> {
    /// Test fixture: a response whose version-0 fields hold non-default
    /// values. A negative `version` yields the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            throttle_time_ms: 1i32,
            groups: vec![DescribedGroup::populated(version)],
            ..Self::default()
        }
    }
}
129#[derive(Debug, Clone, PartialEq, Eq)]
130pub struct DescribedGroup<'a> {
131 pub error_code: i16,
132 pub error_message: Option<&'a str>,
133 pub group_id: &'a str,
134 pub group_state: &'a str,
135 pub group_epoch: i32,
136 pub assignment_epoch: i32,
137 pub assignor_name: &'a str,
138 pub members: Vec<Member<'a>>,
139 pub authorized_operations: i32,
140 pub unknown_tagged_fields: UnknownTaggedFields,
141}
142impl Default for DescribedGroup<'_> {
143 fn default() -> Self {
144 Self {
145 error_code: 0i16,
146 error_message: None,
147 group_id: "",
148 group_state: "",
149 group_epoch: 0i32,
150 assignment_epoch: 0i32,
151 assignor_name: "",
152 members: Vec::new(),
153 authorized_operations: -2_147_483_648i32,
154 unknown_tagged_fields: Default::default(),
155 }
156 }
157}
158impl DescribedGroup<'_> {
159 pub fn to_owned(&self) -> crate::owned::consumer_group_describe_response::DescribedGroup {
160 crate::owned::consumer_group_describe_response::DescribedGroup {
161 error_code: (self.error_code),
162 error_message: (self.error_message).map(std::string::ToString::to_string),
163 group_id: (self.group_id).to_string(),
164 group_state: (self.group_state).to_string(),
165 group_epoch: (self.group_epoch),
166 assignment_epoch: (self.assignment_epoch),
167 assignor_name: (self.assignor_name).to_string(),
168 members: (self.members).iter().map(Member::to_owned).collect(),
169 authorized_operations: (self.authorized_operations),
170 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
171 }
172 }
173}
174impl Encode for DescribedGroup<'_> {
175 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
176 let flex = version >= 0;
177 if version >= 0 {
178 put_i16(buf, self.error_code);
179 }
180 if version >= 0 {
181 if flex {
182 put_compact_nullable_string(buf, self.error_message);
183 } else {
184 put_nullable_string(buf, self.error_message);
185 }
186 }
187 if version >= 0 {
188 if flex {
189 put_compact_string(buf, self.group_id);
190 } else {
191 put_string(buf, self.group_id);
192 }
193 }
194 if version >= 0 {
195 if flex {
196 put_compact_string(buf, self.group_state);
197 } else {
198 put_string(buf, self.group_state);
199 }
200 }
201 if version >= 0 {
202 put_i32(buf, self.group_epoch);
203 }
204 if version >= 0 {
205 put_i32(buf, self.assignment_epoch);
206 }
207 if version >= 0 {
208 if flex {
209 put_compact_string(buf, self.assignor_name);
210 } else {
211 put_string(buf, self.assignor_name);
212 }
213 }
214 if version >= 0 {
215 {
216 crate::primitives::array::put_array_len(buf, (self.members).len(), flex);
217 for it in &self.members {
218 it.encode(buf, version)?;
219 }
220 }
221 }
222 if version >= 0 {
223 put_i32(buf, self.authorized_operations);
224 }
225 if flex {
226 let tagged = WriteTaggedFields::new();
227 tagged.write(buf, &self.unknown_tagged_fields);
228 }
229 Ok(())
230 }
231 fn encoded_len(&self, version: i16) -> usize {
232 let flex = version >= 0;
233 let mut n: usize = 0;
234 if version >= 0 {
235 n += 2;
236 }
237 if version >= 0 {
238 n += if flex {
239 compact_nullable_string_len(self.error_message)
240 } else {
241 nullable_string_len(self.error_message)
242 };
243 }
244 if version >= 0 {
245 n += if flex {
246 compact_string_len(self.group_id)
247 } else {
248 string_len(self.group_id)
249 };
250 }
251 if version >= 0 {
252 n += if flex {
253 compact_string_len(self.group_state)
254 } else {
255 string_len(self.group_state)
256 };
257 }
258 if version >= 0 {
259 n += 4;
260 }
261 if version >= 0 {
262 n += 4;
263 }
264 if version >= 0 {
265 n += if flex {
266 compact_string_len(self.assignor_name)
267 } else {
268 string_len(self.assignor_name)
269 };
270 }
271 if version >= 0 {
272 n += {
273 let prefix =
274 crate::primitives::array::array_len_prefix_len((self.members).len(), flex);
275 let body: usize = (self.members)
276 .iter()
277 .map(|it| it.encoded_len(version))
278 .sum();
279 prefix + body
280 };
281 }
282 if version >= 0 {
283 n += 4;
284 }
285 if flex {
286 let known_pairs: Vec<(u32, usize)> = Vec::new();
287 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
288 }
289 n
290 }
291}
292impl<'de> DecodeBorrow<'de> for DescribedGroup<'de> {
293 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
294 let flex = version >= 0;
295 let mut out = Self::default();
296 if version >= 0 {
297 out.error_code = get_i16(buf)?;
298 }
299 if version >= 0 {
300 out.error_message = if flex {
301 get_compact_nullable_string_borrowed(buf)?
302 } else {
303 get_nullable_string_borrowed(buf)?
304 };
305 }
306 if version >= 0 {
307 out.group_id = if flex {
308 get_compact_string_borrowed(buf)?
309 } else {
310 get_string_borrowed(buf)?
311 };
312 }
313 if version >= 0 {
314 out.group_state = if flex {
315 get_compact_string_borrowed(buf)?
316 } else {
317 get_string_borrowed(buf)?
318 };
319 }
320 if version >= 0 {
321 out.group_epoch = get_i32(buf)?;
322 }
323 if version >= 0 {
324 out.assignment_epoch = get_i32(buf)?;
325 }
326 if version >= 0 {
327 out.assignor_name = if flex {
328 get_compact_string_borrowed(buf)?
329 } else {
330 get_string_borrowed(buf)?
331 };
332 }
333 if version >= 0 {
334 out.members = {
335 let n = crate::primitives::array::get_array_len(buf, flex)?;
336 let mut v = Vec::with_capacity(n);
337 for _ in 0..n {
338 v.push(Member::decode_borrow(buf, version)?);
339 }
340 v
341 };
342 }
343 if version >= 0 {
344 out.authorized_operations = get_i32(buf)?;
345 }
346 if flex {
347 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
348 }
349 Ok(out)
350 }
351}
#[cfg(test)]
impl DescribedGroup<'_> {
    /// Test fixture: a group whose version-0 fields hold non-default values.
    /// A negative `version` yields the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            error_code: 1i16,
            error_message: Some("x"),
            group_id: "x",
            group_state: "x",
            group_epoch: 1i32,
            assignment_epoch: 1i32,
            assignor_name: "x",
            members: vec![Member::populated(version)],
            authorized_operations: 1i32,
            ..Self::default()
        }
    }
}
387#[derive(Debug, Clone, PartialEq, Eq)]
388pub struct Member<'a> {
389 pub member_id: &'a str,
390 pub instance_id: Option<&'a str>,
391 pub rack_id: Option<&'a str>,
392 pub member_epoch: i32,
393 pub client_id: &'a str,
394 pub client_host: &'a str,
395 pub subscribed_topic_names: Vec<&'a str>,
396 pub subscribed_topic_regex: Option<&'a str>,
397 pub assignment: super::common::consumer_group_describe_response::assignment::Assignment<'a>,
398 pub target_assignment:
399 super::common::consumer_group_describe_response::assignment::Assignment<'a>,
400 pub member_type: i8,
401 pub unknown_tagged_fields: UnknownTaggedFields,
402}
403impl Default for Member<'_> {
404 fn default() -> Self {
405 Self {
406 member_id: "",
407 instance_id: None,
408 rack_id: None,
409 member_epoch: 0i32,
410 client_id: "",
411 client_host: "",
412 subscribed_topic_names: Vec::new(),
413 subscribed_topic_regex: None,
414 assignment: Default::default(),
415 target_assignment: Default::default(),
416 member_type: -1i8,
417 unknown_tagged_fields: Default::default(),
418 }
419 }
420}
421impl Member<'_> {
422 pub fn to_owned(&self) -> crate::owned::consumer_group_describe_response::Member {
423 crate::owned::consumer_group_describe_response::Member {
424 member_id: (self.member_id).to_string(),
425 instance_id: (self.instance_id).map(std::string::ToString::to_string),
426 rack_id: (self.rack_id).map(std::string::ToString::to_string),
427 member_epoch: (self.member_epoch),
428 client_id: (self.client_id).to_string(),
429 client_host: (self.client_host).to_string(),
430 subscribed_topic_names: (self.subscribed_topic_names)
431 .iter()
432 .map(std::string::ToString::to_string)
433 .collect(),
434 subscribed_topic_regex: (self.subscribed_topic_regex)
435 .map(std::string::ToString::to_string),
436 assignment: (self.assignment).to_owned(),
437 target_assignment: (self.target_assignment).to_owned(),
438 member_type: (self.member_type),
439 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
440 }
441 }
442}
443impl Encode for Member<'_> {
444 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
445 let flex = version >= 0;
446 if version >= 0 {
447 if flex {
448 put_compact_string(buf, self.member_id);
449 } else {
450 put_string(buf, self.member_id);
451 }
452 }
453 if version >= 0 {
454 if flex {
455 put_compact_nullable_string(buf, self.instance_id);
456 } else {
457 put_nullable_string(buf, self.instance_id);
458 }
459 }
460 if version >= 0 {
461 if flex {
462 put_compact_nullable_string(buf, self.rack_id);
463 } else {
464 put_nullable_string(buf, self.rack_id);
465 }
466 }
467 if version >= 0 {
468 put_i32(buf, self.member_epoch);
469 }
470 if version >= 0 {
471 if flex {
472 put_compact_string(buf, self.client_id);
473 } else {
474 put_string(buf, self.client_id);
475 }
476 }
477 if version >= 0 {
478 if flex {
479 put_compact_string(buf, self.client_host);
480 } else {
481 put_string(buf, self.client_host);
482 }
483 }
484 if version >= 0 {
485 {
486 crate::primitives::array::put_array_len(
487 buf,
488 (self.subscribed_topic_names).len(),
489 flex,
490 );
491 for it in &self.subscribed_topic_names {
492 if flex {
493 put_compact_string(buf, it);
494 } else {
495 put_string(buf, it);
496 }
497 }
498 }
499 }
500 if version >= 0 {
501 if flex {
502 put_compact_nullable_string(buf, self.subscribed_topic_regex);
503 } else {
504 put_nullable_string(buf, self.subscribed_topic_regex);
505 }
506 }
507 if version >= 0 {
508 self.assignment.encode(buf, version)?;
509 }
510 if version >= 0 {
511 self.target_assignment.encode(buf, version)?;
512 }
513 if version >= 1 {
514 put_i8(buf, self.member_type);
515 }
516 if flex {
517 let tagged = WriteTaggedFields::new();
518 tagged.write(buf, &self.unknown_tagged_fields);
519 }
520 Ok(())
521 }
522 fn encoded_len(&self, version: i16) -> usize {
523 let flex = version >= 0;
524 let mut n: usize = 0;
525 if version >= 0 {
526 n += if flex {
527 compact_string_len(self.member_id)
528 } else {
529 string_len(self.member_id)
530 };
531 }
532 if version >= 0 {
533 n += if flex {
534 compact_nullable_string_len(self.instance_id)
535 } else {
536 nullable_string_len(self.instance_id)
537 };
538 }
539 if version >= 0 {
540 n += if flex {
541 compact_nullable_string_len(self.rack_id)
542 } else {
543 nullable_string_len(self.rack_id)
544 };
545 }
546 if version >= 0 {
547 n += 4;
548 }
549 if version >= 0 {
550 n += if flex {
551 compact_string_len(self.client_id)
552 } else {
553 string_len(self.client_id)
554 };
555 }
556 if version >= 0 {
557 n += if flex {
558 compact_string_len(self.client_host)
559 } else {
560 string_len(self.client_host)
561 };
562 }
563 if version >= 0 {
564 n += {
565 let prefix = crate::primitives::array::array_len_prefix_len(
566 (self.subscribed_topic_names).len(),
567 flex,
568 );
569 let body: usize = (self.subscribed_topic_names)
570 .iter()
571 .map(|it| {
572 if flex {
573 compact_string_len(it)
574 } else {
575 string_len(it)
576 }
577 })
578 .sum();
579 prefix + body
580 };
581 }
582 if version >= 0 {
583 n += if flex {
584 compact_nullable_string_len(self.subscribed_topic_regex)
585 } else {
586 nullable_string_len(self.subscribed_topic_regex)
587 };
588 }
589 if version >= 0 {
590 n += self.assignment.encoded_len(version);
591 }
592 if version >= 0 {
593 n += self.target_assignment.encoded_len(version);
594 }
595 if version >= 1 {
596 n += 1;
597 }
598 if flex {
599 let known_pairs: Vec<(u32, usize)> = Vec::new();
600 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
601 }
602 n
603 }
604}
605impl<'de> DecodeBorrow<'de> for Member<'de> {
606 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
607 let flex = version >= 0;
608 let mut out = Self::default();
609 if version >= 0 {
610 out.member_id = if flex {
611 get_compact_string_borrowed(buf)?
612 } else {
613 get_string_borrowed(buf)?
614 };
615 }
616 if version >= 0 {
617 out.instance_id = if flex {
618 get_compact_nullable_string_borrowed(buf)?
619 } else {
620 get_nullable_string_borrowed(buf)?
621 };
622 }
623 if version >= 0 {
624 out.rack_id = if flex {
625 get_compact_nullable_string_borrowed(buf)?
626 } else {
627 get_nullable_string_borrowed(buf)?
628 };
629 }
630 if version >= 0 {
631 out.member_epoch = get_i32(buf)?;
632 }
633 if version >= 0 {
634 out.client_id = if flex {
635 get_compact_string_borrowed(buf)?
636 } else {
637 get_string_borrowed(buf)?
638 };
639 }
640 if version >= 0 {
641 out.client_host = if flex {
642 get_compact_string_borrowed(buf)?
643 } else {
644 get_string_borrowed(buf)?
645 };
646 }
647 if version >= 0 {
648 out.subscribed_topic_names = {
649 let n = crate::primitives::array::get_array_len(buf, flex)?;
650 let mut v = Vec::with_capacity(n);
651 for _ in 0..n {
652 v.push(if flex {
653 get_compact_string_borrowed(buf)?
654 } else {
655 get_string_borrowed(buf)?
656 });
657 }
658 v
659 };
660 }
661 if version >= 0 {
662 out.subscribed_topic_regex = if flex {
663 get_compact_nullable_string_borrowed(buf)?
664 } else {
665 get_nullable_string_borrowed(buf)?
666 };
667 }
668 if version >= 0 {
669 out.assignment = super::common::consumer_group_describe_response::assignment::Assignment::decode_borrow(
670 buf,
671 version,
672 )?;
673 }
674 if version >= 0 {
675 out.target_assignment = super::common::consumer_group_describe_response::assignment::Assignment::decode_borrow(
676 buf,
677 version,
678 )?;
679 }
680 if version >= 1 {
681 out.member_type = get_i8(buf)?;
682 }
683 if flex {
684 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
685 }
686 Ok(out)
687 }
688}
#[cfg(test)]
impl Member<'_> {
    /// Test fixture: a member whose fields hold non-default values for every
    /// field that exists in `version`. `member_type` is set only from
    /// version 1; a negative `version` yields the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        let mut member = Self {
            member_id: "x",
            instance_id: Some("x"),
            rack_id: Some("x"),
            member_epoch: 1i32,
            client_id: "x",
            client_host: "x",
            subscribed_topic_names: vec!["x"],
            subscribed_topic_regex: Some("x"),
            assignment:
                super::common::consumer_group_describe_response::assignment::Assignment::populated(
                    version,
                ),
            target_assignment:
                super::common::consumer_group_describe_response::assignment::Assignment::populated(
                    version,
                ),
            ..Self::default()
        };
        if version >= 1 {
            member.member_type = 1i8;
        }
        member
    }
}