1use crate::primitives::fixed::{get_i8, get_i16, get_i32, put_i8, put_i16, put_i32};
4use crate::primitives::string_bytes::{
5 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
6 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
7 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
8};
9use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
10use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
11use bytes::{Buf, BufMut};
/// Kafka API key carried by ConsumerGroupDescribe messages.
pub const API_KEY: i16 = 69;
/// Lowest protocol version this module accepts.
pub const MIN_VERSION: i16 = 0;
/// Highest protocol version this module accepts.
pub const MAX_VERSION: i16 = 1;
/// First protocol version that uses the flexible encoding (compact lengths plus tagged fields).
pub const FLEXIBLE_MIN: i16 = 0;

/// Tells whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
/// Top-level body of a ConsumerGroupDescribe response (see [`API_KEY`]).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConsumerGroupDescribeResponse {
    /// Throttle time in milliseconds; travels as a 4-byte integer.
    pub throttle_time_ms: i32,
    /// The described groups, encoded as a length-prefixed array.
    pub groups: Vec<DescribedGroup>,
    /// Tagged fields this schema does not name; captured on decode and written back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
26impl Encode for ConsumerGroupDescribeResponse {
27 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
28 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
29 return Err(ProtocolError::UnsupportedVersion {
30 api_key: API_KEY,
31 version,
32 });
33 }
34 let flex = is_flexible(version);
35 if version >= 0 {
36 put_i32(buf, self.throttle_time_ms);
37 }
38 if version >= 0 {
39 {
40 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
41 for it in &self.groups {
42 it.encode(buf, version)?;
43 }
44 }
45 }
46 if flex {
47 let tagged = WriteTaggedFields::new();
48 tagged.write(buf, &self.unknown_tagged_fields);
49 }
50 Ok(())
51 }
52 fn encoded_len(&self, version: i16) -> usize {
53 let flex = is_flexible(version);
54 let mut n: usize = 0;
55 if version >= 0 {
56 n += 4;
57 }
58 if version >= 0 {
59 n += {
60 let prefix =
61 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
62 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
63 prefix + body
64 };
65 }
66 if flex {
67 let known_pairs: Vec<(u32, usize)> = Vec::new();
68 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
69 }
70 n
71 }
72}
73impl Decode<'_> for ConsumerGroupDescribeResponse {
74 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
75 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
76 return Err(ProtocolError::UnsupportedVersion {
77 api_key: API_KEY,
78 version,
79 });
80 }
81 let flex = is_flexible(version);
82 let mut out = Self::default();
83 if version >= 0 {
84 out.throttle_time_ms = get_i32(buf)?;
85 }
86 if version >= 0 {
87 out.groups = {
88 let n = crate::primitives::array::get_array_len(buf, flex)?;
89 let mut v = Vec::with_capacity(n);
90 for _ in 0..n {
91 v.push(DescribedGroup::decode(buf, version)?);
92 }
93 v
94 };
95 }
96 if flex {
97 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
98 }
99 Ok(out)
100 }
101}
#[cfg(test)]
impl ConsumerGroupDescribeResponse {
    /// Builds a test fixture in which every field present at `version` holds a non-default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            // No field exists below version 0, so the fixture is just the default value.
            return Self::default();
        }
        Self {
            throttle_time_ms: 1,
            groups: vec![DescribedGroup::populated(version)],
            ..Self::default()
        }
    }
}
/// One group entry inside a [`ConsumerGroupDescribeResponse`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DescribedGroup {
    /// Error code for this group; travels as a 2-byte integer.
    pub error_code: i16,
    /// Optional error message, encoded as a nullable string.
    pub error_message: Option<String>,
    /// Group identifier string.
    pub group_id: String,
    /// Group state, carried as a string.
    pub group_state: String,
    /// Group epoch; travels as a 4-byte integer.
    pub group_epoch: i32,
    /// Assignment epoch; travels as a 4-byte integer.
    pub assignment_epoch: i32,
    /// Assignor name string.
    pub assignor_name: String,
    /// Members of the group, encoded as a length-prefixed array.
    pub members: Vec<Member>,
    /// Authorized-operations value; the `Default` impl sets it to `i32::MIN`.
    pub authorized_operations: i32,
    /// Tagged fields this schema does not name; captured on decode and written back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
129impl Default for DescribedGroup {
130 fn default() -> Self {
131 Self {
132 error_code: 0i16,
133 error_message: None,
134 group_id: String::new(),
135 group_state: String::new(),
136 group_epoch: 0i32,
137 assignment_epoch: 0i32,
138 assignor_name: String::new(),
139 members: Vec::new(),
140 authorized_operations: -2_147_483_648i32,
141 unknown_tagged_fields: Default::default(),
142 }
143 }
144}
145impl Encode for DescribedGroup {
146 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
147 let flex = version >= 0;
148 if version >= 0 {
149 put_i16(buf, self.error_code);
150 }
151 if version >= 0 {
152 if flex {
153 put_compact_nullable_string(buf, self.error_message.as_deref());
154 } else {
155 put_nullable_string(buf, self.error_message.as_deref());
156 }
157 }
158 if version >= 0 {
159 if flex {
160 put_compact_string(buf, &self.group_id);
161 } else {
162 put_string(buf, &self.group_id);
163 }
164 }
165 if version >= 0 {
166 if flex {
167 put_compact_string(buf, &self.group_state);
168 } else {
169 put_string(buf, &self.group_state);
170 }
171 }
172 if version >= 0 {
173 put_i32(buf, self.group_epoch);
174 }
175 if version >= 0 {
176 put_i32(buf, self.assignment_epoch);
177 }
178 if version >= 0 {
179 if flex {
180 put_compact_string(buf, &self.assignor_name);
181 } else {
182 put_string(buf, &self.assignor_name);
183 }
184 }
185 if version >= 0 {
186 {
187 crate::primitives::array::put_array_len(buf, (self.members).len(), flex);
188 for it in &self.members {
189 it.encode(buf, version)?;
190 }
191 }
192 }
193 if version >= 0 {
194 put_i32(buf, self.authorized_operations);
195 }
196 if flex {
197 let tagged = WriteTaggedFields::new();
198 tagged.write(buf, &self.unknown_tagged_fields);
199 }
200 Ok(())
201 }
202 fn encoded_len(&self, version: i16) -> usize {
203 let flex = version >= 0;
204 let mut n: usize = 0;
205 if version >= 0 {
206 n += 2;
207 }
208 if version >= 0 {
209 n += if flex {
210 compact_nullable_string_len(self.error_message.as_deref())
211 } else {
212 nullable_string_len(self.error_message.as_deref())
213 };
214 }
215 if version >= 0 {
216 n += if flex {
217 compact_string_len(&self.group_id)
218 } else {
219 string_len(&self.group_id)
220 };
221 }
222 if version >= 0 {
223 n += if flex {
224 compact_string_len(&self.group_state)
225 } else {
226 string_len(&self.group_state)
227 };
228 }
229 if version >= 0 {
230 n += 4;
231 }
232 if version >= 0 {
233 n += 4;
234 }
235 if version >= 0 {
236 n += if flex {
237 compact_string_len(&self.assignor_name)
238 } else {
239 string_len(&self.assignor_name)
240 };
241 }
242 if version >= 0 {
243 n += {
244 let prefix =
245 crate::primitives::array::array_len_prefix_len((self.members).len(), flex);
246 let body: usize = (self.members)
247 .iter()
248 .map(|it| it.encoded_len(version))
249 .sum();
250 prefix + body
251 };
252 }
253 if version >= 0 {
254 n += 4;
255 }
256 if flex {
257 let known_pairs: Vec<(u32, usize)> = Vec::new();
258 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
259 }
260 n
261 }
262}
263impl Decode<'_> for DescribedGroup {
264 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
265 let flex = version >= 0;
266 let mut out = Self::default();
267 if version >= 0 {
268 out.error_code = get_i16(buf)?;
269 }
270 if version >= 0 {
271 out.error_message = if flex {
272 get_compact_nullable_string_owned(buf)?
273 } else {
274 get_nullable_string_owned(buf)?
275 };
276 }
277 if version >= 0 {
278 out.group_id = if flex {
279 get_compact_string_owned(buf)?
280 } else {
281 get_string_owned(buf)?
282 };
283 }
284 if version >= 0 {
285 out.group_state = if flex {
286 get_compact_string_owned(buf)?
287 } else {
288 get_string_owned(buf)?
289 };
290 }
291 if version >= 0 {
292 out.group_epoch = get_i32(buf)?;
293 }
294 if version >= 0 {
295 out.assignment_epoch = get_i32(buf)?;
296 }
297 if version >= 0 {
298 out.assignor_name = if flex {
299 get_compact_string_owned(buf)?
300 } else {
301 get_string_owned(buf)?
302 };
303 }
304 if version >= 0 {
305 out.members = {
306 let n = crate::primitives::array::get_array_len(buf, flex)?;
307 let mut v = Vec::with_capacity(n);
308 for _ in 0..n {
309 v.push(Member::decode(buf, version)?);
310 }
311 v
312 };
313 }
314 if version >= 0 {
315 out.authorized_operations = get_i32(buf)?;
316 }
317 if flex {
318 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
319 }
320 Ok(out)
321 }
322}
#[cfg(test)]
impl DescribedGroup {
    /// Builds a test fixture in which every field present at `version` holds a non-default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            // No field exists below version 0, so the fixture is just the default value.
            return Self::default();
        }
        Self {
            error_code: 1,
            error_message: Some(String::from("x")),
            group_id: String::from("x"),
            group_state: String::from("x"),
            group_epoch: 1,
            assignment_epoch: 1,
            assignor_name: String::from("x"),
            members: vec![Member::populated(version)],
            authorized_operations: 1,
            ..Self::default()
        }
    }
}
/// One member entry inside a [`DescribedGroup`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Member {
    /// Member identifier string.
    pub member_id: String,
    /// Optional instance identifier, encoded as a nullable string.
    pub instance_id: Option<String>,
    /// Optional rack identifier, encoded as a nullable string.
    pub rack_id: Option<String>,
    /// Member epoch; travels as a 4-byte integer.
    pub member_epoch: i32,
    /// Client identifier string.
    pub client_id: String,
    /// Client host string.
    pub client_host: String,
    /// Subscribed topic names, encoded as a length-prefixed array of strings.
    pub subscribed_topic_names: Vec<String>,
    /// Optional subscription regex, encoded as a nullable string.
    pub subscribed_topic_regex: Option<String>,
    /// Assignment structure, encoded before `target_assignment`.
    pub assignment: super::common::consumer_group_describe_response::assignment::Assignment,
    /// Target assignment structure, encoded right after `assignment`.
    pub target_assignment: super::common::consumer_group_describe_response::assignment::Assignment,
    /// Member type; only on the wire from version 1 onward, `-1` by default.
    pub member_type: i8,
    /// Tagged fields this schema does not name; captured on decode and written back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
373impl Default for Member {
374 fn default() -> Self {
375 Self {
376 member_id: String::new(),
377 instance_id: None,
378 rack_id: None,
379 member_epoch: 0i32,
380 client_id: String::new(),
381 client_host: String::new(),
382 subscribed_topic_names: Vec::new(),
383 subscribed_topic_regex: None,
384 assignment: Default::default(),
385 target_assignment: Default::default(),
386 member_type: -1i8,
387 unknown_tagged_fields: Default::default(),
388 }
389 }
390}
391impl Encode for Member {
392 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
393 let flex = version >= 0;
394 if version >= 0 {
395 if flex {
396 put_compact_string(buf, &self.member_id);
397 } else {
398 put_string(buf, &self.member_id);
399 }
400 }
401 if version >= 0 {
402 if flex {
403 put_compact_nullable_string(buf, self.instance_id.as_deref());
404 } else {
405 put_nullable_string(buf, self.instance_id.as_deref());
406 }
407 }
408 if version >= 0 {
409 if flex {
410 put_compact_nullable_string(buf, self.rack_id.as_deref());
411 } else {
412 put_nullable_string(buf, self.rack_id.as_deref());
413 }
414 }
415 if version >= 0 {
416 put_i32(buf, self.member_epoch);
417 }
418 if version >= 0 {
419 if flex {
420 put_compact_string(buf, &self.client_id);
421 } else {
422 put_string(buf, &self.client_id);
423 }
424 }
425 if version >= 0 {
426 if flex {
427 put_compact_string(buf, &self.client_host);
428 } else {
429 put_string(buf, &self.client_host);
430 }
431 }
432 if version >= 0 {
433 {
434 crate::primitives::array::put_array_len(
435 buf,
436 (self.subscribed_topic_names).len(),
437 flex,
438 );
439 for it in &self.subscribed_topic_names {
440 if flex {
441 put_compact_string(buf, it);
442 } else {
443 put_string(buf, it);
444 }
445 }
446 }
447 }
448 if version >= 0 {
449 if flex {
450 put_compact_nullable_string(buf, self.subscribed_topic_regex.as_deref());
451 } else {
452 put_nullable_string(buf, self.subscribed_topic_regex.as_deref());
453 }
454 }
455 if version >= 0 {
456 self.assignment.encode(buf, version)?;
457 }
458 if version >= 0 {
459 self.target_assignment.encode(buf, version)?;
460 }
461 if version >= 1 {
462 put_i8(buf, self.member_type);
463 }
464 if flex {
465 let tagged = WriteTaggedFields::new();
466 tagged.write(buf, &self.unknown_tagged_fields);
467 }
468 Ok(())
469 }
470 fn encoded_len(&self, version: i16) -> usize {
471 let flex = version >= 0;
472 let mut n: usize = 0;
473 if version >= 0 {
474 n += if flex {
475 compact_string_len(&self.member_id)
476 } else {
477 string_len(&self.member_id)
478 };
479 }
480 if version >= 0 {
481 n += if flex {
482 compact_nullable_string_len(self.instance_id.as_deref())
483 } else {
484 nullable_string_len(self.instance_id.as_deref())
485 };
486 }
487 if version >= 0 {
488 n += if flex {
489 compact_nullable_string_len(self.rack_id.as_deref())
490 } else {
491 nullable_string_len(self.rack_id.as_deref())
492 };
493 }
494 if version >= 0 {
495 n += 4;
496 }
497 if version >= 0 {
498 n += if flex {
499 compact_string_len(&self.client_id)
500 } else {
501 string_len(&self.client_id)
502 };
503 }
504 if version >= 0 {
505 n += if flex {
506 compact_string_len(&self.client_host)
507 } else {
508 string_len(&self.client_host)
509 };
510 }
511 if version >= 0 {
512 n += {
513 let prefix = crate::primitives::array::array_len_prefix_len(
514 (self.subscribed_topic_names).len(),
515 flex,
516 );
517 let body: usize = (self.subscribed_topic_names)
518 .iter()
519 .map(|it| {
520 if flex {
521 compact_string_len(it)
522 } else {
523 string_len(it)
524 }
525 })
526 .sum();
527 prefix + body
528 };
529 }
530 if version >= 0 {
531 n += if flex {
532 compact_nullable_string_len(self.subscribed_topic_regex.as_deref())
533 } else {
534 nullable_string_len(self.subscribed_topic_regex.as_deref())
535 };
536 }
537 if version >= 0 {
538 n += self.assignment.encoded_len(version);
539 }
540 if version >= 0 {
541 n += self.target_assignment.encoded_len(version);
542 }
543 if version >= 1 {
544 n += 1;
545 }
546 if flex {
547 let known_pairs: Vec<(u32, usize)> = Vec::new();
548 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
549 }
550 n
551 }
552}
553impl Decode<'_> for Member {
554 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
555 let flex = version >= 0;
556 let mut out = Self::default();
557 if version >= 0 {
558 out.member_id = if flex {
559 get_compact_string_owned(buf)?
560 } else {
561 get_string_owned(buf)?
562 };
563 }
564 if version >= 0 {
565 out.instance_id = if flex {
566 get_compact_nullable_string_owned(buf)?
567 } else {
568 get_nullable_string_owned(buf)?
569 };
570 }
571 if version >= 0 {
572 out.rack_id = if flex {
573 get_compact_nullable_string_owned(buf)?
574 } else {
575 get_nullable_string_owned(buf)?
576 };
577 }
578 if version >= 0 {
579 out.member_epoch = get_i32(buf)?;
580 }
581 if version >= 0 {
582 out.client_id = if flex {
583 get_compact_string_owned(buf)?
584 } else {
585 get_string_owned(buf)?
586 };
587 }
588 if version >= 0 {
589 out.client_host = if flex {
590 get_compact_string_owned(buf)?
591 } else {
592 get_string_owned(buf)?
593 };
594 }
595 if version >= 0 {
596 out.subscribed_topic_names = {
597 let n = crate::primitives::array::get_array_len(buf, flex)?;
598 let mut v = Vec::with_capacity(n);
599 for _ in 0..n {
600 v.push(if flex {
601 get_compact_string_owned(buf)?
602 } else {
603 get_string_owned(buf)?
604 });
605 }
606 v
607 };
608 }
609 if version >= 0 {
610 out.subscribed_topic_regex = if flex {
611 get_compact_nullable_string_owned(buf)?
612 } else {
613 get_nullable_string_owned(buf)?
614 };
615 }
616 if version >= 0 {
617 out.assignment =
618 super::common::consumer_group_describe_response::assignment::Assignment::decode(
619 buf, version,
620 )?;
621 }
622 if version >= 0 {
623 out.target_assignment =
624 super::common::consumer_group_describe_response::assignment::Assignment::decode(
625 buf, version,
626 )?;
627 }
628 if version >= 1 {
629 out.member_type = get_i8(buf)?;
630 }
631 if flex {
632 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
633 }
634 Ok(out)
635 }
636}
#[cfg(test)]
impl Member {
    /// Builds a test fixture in which every field present at `version` holds a non-default value.
    ///
    /// `member_type` keeps its default below version 1, where it is not part of the layout.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let defaults = Self::default();
        if version < 0 {
            // No field exists below version 0, so the fixture is just the default value.
            return defaults;
        }
        Self {
            member_id: String::from("x"),
            instance_id: Some(String::from("x")),
            rack_id: Some(String::from("x")),
            member_epoch: 1,
            client_id: String::from("x"),
            client_host: String::from("x"),
            subscribed_topic_names: vec![String::from("x")],
            subscribed_topic_regex: Some(String::from("x")),
            assignment:
                super::common::consumer_group_describe_response::assignment::Assignment::populated(
                    version,
                ),
            target_assignment:
                super::common::consumer_group_describe_response::assignment::Assignment::populated(
                    version,
                ),
            member_type: if version >= 1 { 1 } else { defaults.member_type },
            ..defaults
        }
    }
}
684#[must_use]
687#[allow(unused_comparisons)]
688pub fn default_json(version: i16) -> ::serde_json::Value {
689 let mut obj = ::serde_json::Map::new();
690 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
691 obj.insert("groups".to_string(), ::serde_json::Value::Array(vec![]));
692 ::serde_json::Value::Object(obj)
693}