1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_i8, get_i16, get_i32, put_i8, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13
14pub const API_KEY: i16 = 69;
15pub const MIN_VERSION: i16 = 0;
16pub const MAX_VERSION: i16 = 1;
17pub const FLEXIBLE_MIN: i16 = 0;
18
19#[inline]
20fn is_flexible(version: i16) -> bool {
21 version >= FLEXIBLE_MIN
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Default)]
25pub struct ConsumerGroupDescribeResponse {
26 pub throttle_time_ms: i32,
27 pub groups: Vec<DescribedGroup>,
28 pub unknown_tagged_fields: UnknownTaggedFields,
29}
30impl Encode for ConsumerGroupDescribeResponse {
31 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
32 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
33 return Err(ProtocolError::UnsupportedVersion {
34 api_key: API_KEY,
35 version,
36 });
37 }
38 let flex = is_flexible(version);
39 if version >= 0 {
40 put_i32(buf, self.throttle_time_ms);
41 }
42 if version >= 0 {
43 {
44 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
45 for it in &self.groups {
46 it.encode(buf, version)?;
47 }
48 }
49 }
50 if flex {
51 let tagged = WriteTaggedFields::new();
52 tagged.write(buf, &self.unknown_tagged_fields);
53 }
54 Ok(())
55 }
56 fn encoded_len(&self, version: i16) -> usize {
57 let flex = is_flexible(version);
58 let mut n: usize = 0;
59 if version >= 0 {
60 n += 4;
61 }
62 if version >= 0 {
63 n += {
64 let prefix =
65 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
66 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
67 prefix + body
68 };
69 }
70 if flex {
71 let known_pairs: Vec<(u32, usize)> = Vec::new();
72 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
73 }
74 n
75 }
76}
77impl Decode<'_> for ConsumerGroupDescribeResponse {
78 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
79 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
80 return Err(ProtocolError::UnsupportedVersion {
81 api_key: API_KEY,
82 version,
83 });
84 }
85 let flex = is_flexible(version);
86 let mut out = Self::default();
87 if version >= 0 {
88 out.throttle_time_ms = get_i32(buf)?;
89 }
90 if version >= 0 {
91 out.groups = {
92 let n = crate::primitives::array::get_array_len(buf, flex)?;
93 let mut v = Vec::with_capacity(n);
94 for _ in 0..n {
95 v.push(DescribedGroup::decode(buf, version)?);
96 }
97 v
98 };
99 }
100 if flex {
101 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
102 }
103 Ok(out)
104 }
105}
106#[cfg(test)]
107impl ConsumerGroupDescribeResponse {
108 #[must_use]
109 pub fn populated(version: i16) -> Self {
110 let mut m = Self::default();
111 if version >= 0 {
112 m.throttle_time_ms = 1i32;
113 }
114 if version >= 0 {
115 m.groups = vec![DescribedGroup::populated(version)];
116 }
117 m
118 }
119}
120#[derive(Debug, Clone, PartialEq, Eq)]
121pub struct DescribedGroup {
122 pub error_code: i16,
123 pub error_message: Option<String>,
124 pub group_id: String,
125 pub group_state: String,
126 pub group_epoch: i32,
127 pub assignment_epoch: i32,
128 pub assignor_name: String,
129 pub members: Vec<Member>,
130 pub authorized_operations: i32,
131 pub unknown_tagged_fields: UnknownTaggedFields,
132}
133impl Default for DescribedGroup {
134 fn default() -> Self {
135 Self {
136 error_code: 0i16,
137 error_message: None,
138 group_id: String::new(),
139 group_state: String::new(),
140 group_epoch: 0i32,
141 assignment_epoch: 0i32,
142 assignor_name: String::new(),
143 members: Vec::new(),
144 authorized_operations: -2_147_483_648i32,
145 unknown_tagged_fields: Default::default(),
146 }
147 }
148}
149impl Encode for DescribedGroup {
150 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
151 let flex = version >= 0;
152 if version >= 0 {
153 put_i16(buf, self.error_code);
154 }
155 if version >= 0 {
156 if flex {
157 put_compact_nullable_string(buf, self.error_message.as_deref());
158 } else {
159 put_nullable_string(buf, self.error_message.as_deref());
160 }
161 }
162 if version >= 0 {
163 if flex {
164 put_compact_string(buf, &self.group_id);
165 } else {
166 put_string(buf, &self.group_id);
167 }
168 }
169 if version >= 0 {
170 if flex {
171 put_compact_string(buf, &self.group_state);
172 } else {
173 put_string(buf, &self.group_state);
174 }
175 }
176 if version >= 0 {
177 put_i32(buf, self.group_epoch);
178 }
179 if version >= 0 {
180 put_i32(buf, self.assignment_epoch);
181 }
182 if version >= 0 {
183 if flex {
184 put_compact_string(buf, &self.assignor_name);
185 } else {
186 put_string(buf, &self.assignor_name);
187 }
188 }
189 if version >= 0 {
190 {
191 crate::primitives::array::put_array_len(buf, (self.members).len(), flex);
192 for it in &self.members {
193 it.encode(buf, version)?;
194 }
195 }
196 }
197 if version >= 0 {
198 put_i32(buf, self.authorized_operations);
199 }
200 if flex {
201 let tagged = WriteTaggedFields::new();
202 tagged.write(buf, &self.unknown_tagged_fields);
203 }
204 Ok(())
205 }
206 fn encoded_len(&self, version: i16) -> usize {
207 let flex = version >= 0;
208 let mut n: usize = 0;
209 if version >= 0 {
210 n += 2;
211 }
212 if version >= 0 {
213 n += if flex {
214 compact_nullable_string_len(self.error_message.as_deref())
215 } else {
216 nullable_string_len(self.error_message.as_deref())
217 };
218 }
219 if version >= 0 {
220 n += if flex {
221 compact_string_len(&self.group_id)
222 } else {
223 string_len(&self.group_id)
224 };
225 }
226 if version >= 0 {
227 n += if flex {
228 compact_string_len(&self.group_state)
229 } else {
230 string_len(&self.group_state)
231 };
232 }
233 if version >= 0 {
234 n += 4;
235 }
236 if version >= 0 {
237 n += 4;
238 }
239 if version >= 0 {
240 n += if flex {
241 compact_string_len(&self.assignor_name)
242 } else {
243 string_len(&self.assignor_name)
244 };
245 }
246 if version >= 0 {
247 n += {
248 let prefix =
249 crate::primitives::array::array_len_prefix_len((self.members).len(), flex);
250 let body: usize = (self.members)
251 .iter()
252 .map(|it| it.encoded_len(version))
253 .sum();
254 prefix + body
255 };
256 }
257 if version >= 0 {
258 n += 4;
259 }
260 if flex {
261 let known_pairs: Vec<(u32, usize)> = Vec::new();
262 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
263 }
264 n
265 }
266}
267impl Decode<'_> for DescribedGroup {
268 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
269 let flex = version >= 0;
270 let mut out = Self::default();
271 if version >= 0 {
272 out.error_code = get_i16(buf)?;
273 }
274 if version >= 0 {
275 out.error_message = if flex {
276 get_compact_nullable_string_owned(buf)?
277 } else {
278 get_nullable_string_owned(buf)?
279 };
280 }
281 if version >= 0 {
282 out.group_id = if flex {
283 get_compact_string_owned(buf)?
284 } else {
285 get_string_owned(buf)?
286 };
287 }
288 if version >= 0 {
289 out.group_state = if flex {
290 get_compact_string_owned(buf)?
291 } else {
292 get_string_owned(buf)?
293 };
294 }
295 if version >= 0 {
296 out.group_epoch = get_i32(buf)?;
297 }
298 if version >= 0 {
299 out.assignment_epoch = get_i32(buf)?;
300 }
301 if version >= 0 {
302 out.assignor_name = if flex {
303 get_compact_string_owned(buf)?
304 } else {
305 get_string_owned(buf)?
306 };
307 }
308 if version >= 0 {
309 out.members = {
310 let n = crate::primitives::array::get_array_len(buf, flex)?;
311 let mut v = Vec::with_capacity(n);
312 for _ in 0..n {
313 v.push(Member::decode(buf, version)?);
314 }
315 v
316 };
317 }
318 if version >= 0 {
319 out.authorized_operations = get_i32(buf)?;
320 }
321 if flex {
322 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
323 }
324 Ok(out)
325 }
326}
327#[cfg(test)]
328impl DescribedGroup {
329 #[must_use]
330 pub fn populated(version: i16) -> Self {
331 let mut m = Self::default();
332 if version >= 0 {
333 m.error_code = 1i16;
334 }
335 if version >= 0 {
336 m.error_message = Some("x".to_string());
337 }
338 if version >= 0 {
339 m.group_id = "x".to_string();
340 }
341 if version >= 0 {
342 m.group_state = "x".to_string();
343 }
344 if version >= 0 {
345 m.group_epoch = 1i32;
346 }
347 if version >= 0 {
348 m.assignment_epoch = 1i32;
349 }
350 if version >= 0 {
351 m.assignor_name = "x".to_string();
352 }
353 if version >= 0 {
354 m.members = vec![Member::populated(version)];
355 }
356 if version >= 0 {
357 m.authorized_operations = 1i32;
358 }
359 m
360 }
361}
362#[derive(Debug, Clone, PartialEq, Eq)]
363pub struct Member {
364 pub member_id: String,
365 pub instance_id: Option<String>,
366 pub rack_id: Option<String>,
367 pub member_epoch: i32,
368 pub client_id: String,
369 pub client_host: String,
370 pub subscribed_topic_names: Vec<String>,
371 pub subscribed_topic_regex: Option<String>,
372 pub assignment: super::common::consumer_group_describe_response::assignment::Assignment,
373 pub target_assignment: super::common::consumer_group_describe_response::assignment::Assignment,
374 pub member_type: i8,
375 pub unknown_tagged_fields: UnknownTaggedFields,
376}
377impl Default for Member {
378 fn default() -> Self {
379 Self {
380 member_id: String::new(),
381 instance_id: None,
382 rack_id: None,
383 member_epoch: 0i32,
384 client_id: String::new(),
385 client_host: String::new(),
386 subscribed_topic_names: Vec::new(),
387 subscribed_topic_regex: None,
388 assignment: Default::default(),
389 target_assignment: Default::default(),
390 member_type: -1i8,
391 unknown_tagged_fields: Default::default(),
392 }
393 }
394}
395impl Encode for Member {
396 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
397 let flex = version >= 0;
398 if version >= 0 {
399 if flex {
400 put_compact_string(buf, &self.member_id);
401 } else {
402 put_string(buf, &self.member_id);
403 }
404 }
405 if version >= 0 {
406 if flex {
407 put_compact_nullable_string(buf, self.instance_id.as_deref());
408 } else {
409 put_nullable_string(buf, self.instance_id.as_deref());
410 }
411 }
412 if version >= 0 {
413 if flex {
414 put_compact_nullable_string(buf, self.rack_id.as_deref());
415 } else {
416 put_nullable_string(buf, self.rack_id.as_deref());
417 }
418 }
419 if version >= 0 {
420 put_i32(buf, self.member_epoch);
421 }
422 if version >= 0 {
423 if flex {
424 put_compact_string(buf, &self.client_id);
425 } else {
426 put_string(buf, &self.client_id);
427 }
428 }
429 if version >= 0 {
430 if flex {
431 put_compact_string(buf, &self.client_host);
432 } else {
433 put_string(buf, &self.client_host);
434 }
435 }
436 if version >= 0 {
437 {
438 crate::primitives::array::put_array_len(
439 buf,
440 (self.subscribed_topic_names).len(),
441 flex,
442 );
443 for it in &self.subscribed_topic_names {
444 if flex {
445 put_compact_string(buf, it);
446 } else {
447 put_string(buf, it);
448 }
449 }
450 }
451 }
452 if version >= 0 {
453 if flex {
454 put_compact_nullable_string(buf, self.subscribed_topic_regex.as_deref());
455 } else {
456 put_nullable_string(buf, self.subscribed_topic_regex.as_deref());
457 }
458 }
459 if version >= 0 {
460 self.assignment.encode(buf, version)?;
461 }
462 if version >= 0 {
463 self.target_assignment.encode(buf, version)?;
464 }
465 if version >= 1 {
466 put_i8(buf, self.member_type);
467 }
468 if flex {
469 let tagged = WriteTaggedFields::new();
470 tagged.write(buf, &self.unknown_tagged_fields);
471 }
472 Ok(())
473 }
474 fn encoded_len(&self, version: i16) -> usize {
475 let flex = version >= 0;
476 let mut n: usize = 0;
477 if version >= 0 {
478 n += if flex {
479 compact_string_len(&self.member_id)
480 } else {
481 string_len(&self.member_id)
482 };
483 }
484 if version >= 0 {
485 n += if flex {
486 compact_nullable_string_len(self.instance_id.as_deref())
487 } else {
488 nullable_string_len(self.instance_id.as_deref())
489 };
490 }
491 if version >= 0 {
492 n += if flex {
493 compact_nullable_string_len(self.rack_id.as_deref())
494 } else {
495 nullable_string_len(self.rack_id.as_deref())
496 };
497 }
498 if version >= 0 {
499 n += 4;
500 }
501 if version >= 0 {
502 n += if flex {
503 compact_string_len(&self.client_id)
504 } else {
505 string_len(&self.client_id)
506 };
507 }
508 if version >= 0 {
509 n += if flex {
510 compact_string_len(&self.client_host)
511 } else {
512 string_len(&self.client_host)
513 };
514 }
515 if version >= 0 {
516 n += {
517 let prefix = crate::primitives::array::array_len_prefix_len(
518 (self.subscribed_topic_names).len(),
519 flex,
520 );
521 let body: usize = (self.subscribed_topic_names)
522 .iter()
523 .map(|it| {
524 if flex {
525 compact_string_len(it)
526 } else {
527 string_len(it)
528 }
529 })
530 .sum();
531 prefix + body
532 };
533 }
534 if version >= 0 {
535 n += if flex {
536 compact_nullable_string_len(self.subscribed_topic_regex.as_deref())
537 } else {
538 nullable_string_len(self.subscribed_topic_regex.as_deref())
539 };
540 }
541 if version >= 0 {
542 n += self.assignment.encoded_len(version);
543 }
544 if version >= 0 {
545 n += self.target_assignment.encoded_len(version);
546 }
547 if version >= 1 {
548 n += 1;
549 }
550 if flex {
551 let known_pairs: Vec<(u32, usize)> = Vec::new();
552 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
553 }
554 n
555 }
556}
557impl Decode<'_> for Member {
558 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
559 let flex = version >= 0;
560 let mut out = Self::default();
561 if version >= 0 {
562 out.member_id = if flex {
563 get_compact_string_owned(buf)?
564 } else {
565 get_string_owned(buf)?
566 };
567 }
568 if version >= 0 {
569 out.instance_id = if flex {
570 get_compact_nullable_string_owned(buf)?
571 } else {
572 get_nullable_string_owned(buf)?
573 };
574 }
575 if version >= 0 {
576 out.rack_id = if flex {
577 get_compact_nullable_string_owned(buf)?
578 } else {
579 get_nullable_string_owned(buf)?
580 };
581 }
582 if version >= 0 {
583 out.member_epoch = get_i32(buf)?;
584 }
585 if version >= 0 {
586 out.client_id = if flex {
587 get_compact_string_owned(buf)?
588 } else {
589 get_string_owned(buf)?
590 };
591 }
592 if version >= 0 {
593 out.client_host = if flex {
594 get_compact_string_owned(buf)?
595 } else {
596 get_string_owned(buf)?
597 };
598 }
599 if version >= 0 {
600 out.subscribed_topic_names = {
601 let n = crate::primitives::array::get_array_len(buf, flex)?;
602 let mut v = Vec::with_capacity(n);
603 for _ in 0..n {
604 v.push(if flex {
605 get_compact_string_owned(buf)?
606 } else {
607 get_string_owned(buf)?
608 });
609 }
610 v
611 };
612 }
613 if version >= 0 {
614 out.subscribed_topic_regex = if flex {
615 get_compact_nullable_string_owned(buf)?
616 } else {
617 get_nullable_string_owned(buf)?
618 };
619 }
620 if version >= 0 {
621 out.assignment =
622 super::common::consumer_group_describe_response::assignment::Assignment::decode(
623 buf, version,
624 )?;
625 }
626 if version >= 0 {
627 out.target_assignment =
628 super::common::consumer_group_describe_response::assignment::Assignment::decode(
629 buf, version,
630 )?;
631 }
632 if version >= 1 {
633 out.member_type = get_i8(buf)?;
634 }
635 if flex {
636 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
637 }
638 Ok(out)
639 }
640}
641#[cfg(test)]
642impl Member {
643 #[must_use]
644 pub fn populated(version: i16) -> Self {
645 let mut m = Self::default();
646 if version >= 0 {
647 m.member_id = "x".to_string();
648 }
649 if version >= 0 {
650 m.instance_id = Some("x".to_string());
651 }
652 if version >= 0 {
653 m.rack_id = Some("x".to_string());
654 }
655 if version >= 0 {
656 m.member_epoch = 1i32;
657 }
658 if version >= 0 {
659 m.client_id = "x".to_string();
660 }
661 if version >= 0 {
662 m.client_host = "x".to_string();
663 }
664 if version >= 0 {
665 m.subscribed_topic_names = vec!["x".to_string()];
666 }
667 if version >= 0 {
668 m.subscribed_topic_regex = Some("x".to_string());
669 }
670 if version >= 0 {
671 m.assignment =
672 super::common::consumer_group_describe_response::assignment::Assignment::populated(
673 version,
674 );
675 }
676 if version >= 0 {
677 m.target_assignment =
678 super::common::consumer_group_describe_response::assignment::Assignment::populated(
679 version,
680 );
681 }
682 if version >= 1 {
683 m.member_type = 1i8;
684 }
685 m
686 }
687}
688
689#[must_use]
692#[allow(unused_comparisons)]
693pub fn default_json(version: i16) -> ::serde_json::Value {
694 let mut obj = ::serde_json::Map::new();
695 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
696 obj.insert("groups".to_string(), ::serde_json::Value::Array(vec![]));
697 ::serde_json::Value::Object(obj)
698}