1use crate::primitives::fixed::{get_bool, get_i8, get_i16, get_i32, put_bool, put_i16, put_i32};
4use crate::primitives::string_bytes::{
5 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
6 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
7 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
8};
9use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
10use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
11use bytes::{Buf, BufMut};
12pub const API_KEY: i16 = 89;
13pub const MIN_VERSION: i16 = 0;
14pub const MAX_VERSION: i16 = 0;
15pub const FLEXIBLE_MIN: i16 = 0;
16#[inline]
17fn is_flexible(version: i16) -> bool {
18 version >= FLEXIBLE_MIN
19}
20#[derive(Debug, Clone, PartialEq, Eq, Default)]
21pub struct StreamsGroupDescribeResponse {
22 pub throttle_time_ms: i32,
23 pub groups: Vec<DescribedGroup>,
24 pub unknown_tagged_fields: UnknownTaggedFields,
25}
26impl Encode for StreamsGroupDescribeResponse {
27 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
28 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
29 return Err(ProtocolError::UnsupportedVersion {
30 api_key: API_KEY,
31 version,
32 });
33 }
34 let flex = is_flexible(version);
35 if version >= 0 {
36 put_i32(buf, self.throttle_time_ms);
37 }
38 if version >= 0 {
39 {
40 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
41 for it in &self.groups {
42 it.encode(buf, version)?;
43 }
44 }
45 }
46 if flex {
47 let tagged = WriteTaggedFields::new();
48 tagged.write(buf, &self.unknown_tagged_fields);
49 }
50 Ok(())
51 }
52 fn encoded_len(&self, version: i16) -> usize {
53 let flex = is_flexible(version);
54 let mut n: usize = 0;
55 if version >= 0 {
56 n += 4;
57 }
58 if version >= 0 {
59 n += {
60 let prefix =
61 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
62 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
63 prefix + body
64 };
65 }
66 if flex {
67 let known_pairs: Vec<(u32, usize)> = Vec::new();
68 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
69 }
70 n
71 }
72}
73impl Decode<'_> for StreamsGroupDescribeResponse {
74 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
75 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
76 return Err(ProtocolError::UnsupportedVersion {
77 api_key: API_KEY,
78 version,
79 });
80 }
81 let flex = is_flexible(version);
82 let mut out = Self::default();
83 if version >= 0 {
84 out.throttle_time_ms = get_i32(buf)?;
85 }
86 if version >= 0 {
87 out.groups = {
88 let n = crate::primitives::array::get_array_len(buf, flex)?;
89 let mut v = Vec::with_capacity(n);
90 for _ in 0..n {
91 v.push(DescribedGroup::decode(buf, version)?);
92 }
93 v
94 };
95 }
96 if flex {
97 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
98 }
99 Ok(out)
100 }
101}
102#[cfg(test)]
103impl StreamsGroupDescribeResponse {
104 #[must_use]
105 pub fn populated(version: i16) -> Self {
106 let mut m = Self::default();
107 if version >= 0 {
108 m.throttle_time_ms = 1i32;
109 }
110 if version >= 0 {
111 m.groups = vec![DescribedGroup::populated(version)];
112 }
113 m
114 }
115}
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub struct DescribedGroup {
118 pub error_code: i16,
119 pub error_message: Option<String>,
120 pub group_id: String,
121 pub group_state: String,
122 pub group_epoch: i32,
123 pub assignment_epoch: i32,
124 pub topology: Option<Topology>,
125 pub members: Vec<Member>,
126 pub authorized_operations: i32,
127 pub unknown_tagged_fields: UnknownTaggedFields,
128}
129impl Default for DescribedGroup {
130 fn default() -> Self {
131 Self {
132 error_code: 0i16,
133 error_message: None,
134 group_id: String::new(),
135 group_state: String::new(),
136 group_epoch: 0i32,
137 assignment_epoch: 0i32,
138 topology: None,
139 members: Vec::new(),
140 authorized_operations: -2_147_483_648i32,
141 unknown_tagged_fields: Default::default(),
142 }
143 }
144}
145impl Encode for DescribedGroup {
146 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
147 let flex = version >= 0;
148 if version >= 0 {
149 put_i16(buf, self.error_code);
150 }
151 if version >= 0 {
152 if flex {
153 put_compact_nullable_string(buf, self.error_message.as_deref());
154 } else {
155 put_nullable_string(buf, self.error_message.as_deref());
156 }
157 }
158 if version >= 0 {
159 if flex {
160 put_compact_string(buf, &self.group_id);
161 } else {
162 put_string(buf, &self.group_id);
163 }
164 }
165 if version >= 0 {
166 if flex {
167 put_compact_string(buf, &self.group_state);
168 } else {
169 put_string(buf, &self.group_state);
170 }
171 }
172 if version >= 0 {
173 put_i32(buf, self.group_epoch);
174 }
175 if version >= 0 {
176 put_i32(buf, self.assignment_epoch);
177 }
178 if version >= 0 {
179 match &self.topology {
180 None => {
181 buf.put_i8(-1);
182 }
183 Some(v) => {
184 buf.put_i8(1);
185 v.encode(buf, version)?;
186 }
187 }
188 }
189 if version >= 0 {
190 {
191 crate::primitives::array::put_array_len(buf, (self.members).len(), flex);
192 for it in &self.members {
193 it.encode(buf, version)?;
194 }
195 }
196 }
197 if version >= 0 {
198 put_i32(buf, self.authorized_operations);
199 }
200 if flex {
201 let tagged = WriteTaggedFields::new();
202 tagged.write(buf, &self.unknown_tagged_fields);
203 }
204 Ok(())
205 }
206 fn encoded_len(&self, version: i16) -> usize {
207 let flex = version >= 0;
208 let mut n: usize = 0;
209 if version >= 0 {
210 n += 2;
211 }
212 if version >= 0 {
213 n += if flex {
214 compact_nullable_string_len(self.error_message.as_deref())
215 } else {
216 nullable_string_len(self.error_message.as_deref())
217 };
218 }
219 if version >= 0 {
220 n += if flex {
221 compact_string_len(&self.group_id)
222 } else {
223 string_len(&self.group_id)
224 };
225 }
226 if version >= 0 {
227 n += if flex {
228 compact_string_len(&self.group_state)
229 } else {
230 string_len(&self.group_state)
231 };
232 }
233 if version >= 0 {
234 n += 4;
235 }
236 if version >= 0 {
237 n += 4;
238 }
239 if version >= 0 {
240 n += 1 + self.topology.as_ref().map_or(0, |v| v.encoded_len(version));
241 }
242 if version >= 0 {
243 n += {
244 let prefix =
245 crate::primitives::array::array_len_prefix_len((self.members).len(), flex);
246 let body: usize = (self.members)
247 .iter()
248 .map(|it| it.encoded_len(version))
249 .sum();
250 prefix + body
251 };
252 }
253 if version >= 0 {
254 n += 4;
255 }
256 if flex {
257 let known_pairs: Vec<(u32, usize)> = Vec::new();
258 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
259 }
260 n
261 }
262}
263impl Decode<'_> for DescribedGroup {
264 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
265 let flex = version >= 0;
266 let mut out = Self::default();
267 if version >= 0 {
268 out.error_code = get_i16(buf)?;
269 }
270 if version >= 0 {
271 out.error_message = if flex {
272 get_compact_nullable_string_owned(buf)?
273 } else {
274 get_nullable_string_owned(buf)?
275 };
276 }
277 if version >= 0 {
278 out.group_id = if flex {
279 get_compact_string_owned(buf)?
280 } else {
281 get_string_owned(buf)?
282 };
283 }
284 if version >= 0 {
285 out.group_state = if flex {
286 get_compact_string_owned(buf)?
287 } else {
288 get_string_owned(buf)?
289 };
290 }
291 if version >= 0 {
292 out.group_epoch = get_i32(buf)?;
293 }
294 if version >= 0 {
295 out.assignment_epoch = get_i32(buf)?;
296 }
297 if version >= 0 {
298 out.topology = if get_i8(buf)? < 0 {
299 None
300 } else {
301 Some(Topology::decode(buf, version)?)
302 };
303 }
304 if version >= 0 {
305 out.members = {
306 let n = crate::primitives::array::get_array_len(buf, flex)?;
307 let mut v = Vec::with_capacity(n);
308 for _ in 0..n {
309 v.push(Member::decode(buf, version)?);
310 }
311 v
312 };
313 }
314 if version >= 0 {
315 out.authorized_operations = get_i32(buf)?;
316 }
317 if flex {
318 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
319 }
320 Ok(out)
321 }
322}
323#[cfg(test)]
324impl DescribedGroup {
325 #[must_use]
326 pub fn populated(version: i16) -> Self {
327 let mut m = Self::default();
328 if version >= 0 {
329 m.error_code = 1i16;
330 }
331 if version >= 0 {
332 m.error_message = Some("x".to_string());
333 }
334 if version >= 0 {
335 m.group_id = "x".to_string();
336 }
337 if version >= 0 {
338 m.group_state = "x".to_string();
339 }
340 if version >= 0 {
341 m.group_epoch = 1i32;
342 }
343 if version >= 0 {
344 m.assignment_epoch = 1i32;
345 }
346 if version >= 0 {
347 m.topology = Some(Topology::populated(version));
348 }
349 if version >= 0 {
350 m.members = vec![Member::populated(version)];
351 }
352 if version >= 0 {
353 m.authorized_operations = 1i32;
354 }
355 m
356 }
357}
358#[derive(Debug, Clone, PartialEq, Eq, Default)]
359pub struct Topology {
360 pub epoch: i32,
361 pub subtopologies: Option<Vec<Subtopology>>,
362 pub unknown_tagged_fields: UnknownTaggedFields,
363}
364impl Encode for Topology {
365 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
366 let flex = version >= 0;
367 if version >= 0 {
368 put_i32(buf, self.epoch);
369 }
370 if version >= 0 {
371 {
372 let len = (self.subtopologies).as_ref().map(Vec::len);
373 crate::primitives::array::put_nullable_array_len(buf, len, flex);
374 if let Some(v) = &self.subtopologies {
375 for it in v {
376 it.encode(buf, version)?;
377 }
378 }
379 }
380 }
381 if flex {
382 let tagged = WriteTaggedFields::new();
383 tagged.write(buf, &self.unknown_tagged_fields);
384 }
385 Ok(())
386 }
387 fn encoded_len(&self, version: i16) -> usize {
388 let flex = version >= 0;
389 let mut n: usize = 0;
390 if version >= 0 {
391 n += 4;
392 }
393 if version >= 0 {
394 n += {
395 let opt: Option<&Vec<_>> = (self.subtopologies).as_ref();
396 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
397 opt.map(std::vec::Vec::len),
398 flex,
399 );
400 let body: usize =
401 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
402 prefix + body
403 };
404 }
405 if flex {
406 let known_pairs: Vec<(u32, usize)> = Vec::new();
407 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
408 }
409 n
410 }
411}
412impl Decode<'_> for Topology {
413 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
414 let flex = version >= 0;
415 let mut out = Self::default();
416 if version >= 0 {
417 out.epoch = get_i32(buf)?;
418 }
419 if version >= 0 {
420 out.subtopologies = {
421 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
422 match opt {
423 None => None,
424 Some(n) => {
425 let mut v = Vec::with_capacity(n);
426 for _ in 0..n {
427 v.push(Subtopology::decode(buf, version)?);
428 }
429 Some(v)
430 }
431 }
432 };
433 }
434 if flex {
435 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
436 }
437 Ok(out)
438 }
439}
440#[cfg(test)]
441impl Topology {
442 #[must_use]
443 pub fn populated(version: i16) -> Self {
444 let mut m = Self::default();
445 if version >= 0 {
446 m.epoch = 1i32;
447 }
448 if version >= 0 {
449 m.subtopologies = Some(vec![Subtopology::populated(version)]);
450 }
451 m
452 }
453}
454#[derive(Debug, Clone, PartialEq, Eq, Default)]
455pub struct Subtopology {
456 pub subtopology_id: String,
457 pub source_topics: Vec<String>,
458 pub repartition_sink_topics: Vec<String>,
459 pub state_changelog_topics:
460 Vec<super::common::streams_group_describe_response::topic_info::TopicInfo>,
461 pub repartition_source_topics:
462 Vec<super::common::streams_group_describe_response::topic_info::TopicInfo>,
463 pub unknown_tagged_fields: UnknownTaggedFields,
464}
465impl Encode for Subtopology {
466 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
467 let flex = version >= 0;
468 if version >= 0 {
469 if flex {
470 put_compact_string(buf, &self.subtopology_id);
471 } else {
472 put_string(buf, &self.subtopology_id);
473 }
474 }
475 if version >= 0 {
476 {
477 crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
478 for it in &self.source_topics {
479 if flex {
480 put_compact_string(buf, it);
481 } else {
482 put_string(buf, it);
483 }
484 }
485 }
486 }
487 if version >= 0 {
488 {
489 crate::primitives::array::put_array_len(
490 buf,
491 (self.repartition_sink_topics).len(),
492 flex,
493 );
494 for it in &self.repartition_sink_topics {
495 if flex {
496 put_compact_string(buf, it);
497 } else {
498 put_string(buf, it);
499 }
500 }
501 }
502 }
503 if version >= 0 {
504 {
505 crate::primitives::array::put_array_len(
506 buf,
507 (self.state_changelog_topics).len(),
508 flex,
509 );
510 for it in &self.state_changelog_topics {
511 it.encode(buf, version)?;
512 }
513 }
514 }
515 if version >= 0 {
516 {
517 crate::primitives::array::put_array_len(
518 buf,
519 (self.repartition_source_topics).len(),
520 flex,
521 );
522 for it in &self.repartition_source_topics {
523 it.encode(buf, version)?;
524 }
525 }
526 }
527 if flex {
528 let tagged = WriteTaggedFields::new();
529 tagged.write(buf, &self.unknown_tagged_fields);
530 }
531 Ok(())
532 }
533 fn encoded_len(&self, version: i16) -> usize {
534 let flex = version >= 0;
535 let mut n: usize = 0;
536 if version >= 0 {
537 n += if flex {
538 compact_string_len(&self.subtopology_id)
539 } else {
540 string_len(&self.subtopology_id)
541 };
542 }
543 if version >= 0 {
544 n += {
545 let prefix = crate::primitives::array::array_len_prefix_len(
546 (self.source_topics).len(),
547 flex,
548 );
549 let body: usize = (self.source_topics)
550 .iter()
551 .map(|it| {
552 if flex {
553 compact_string_len(it)
554 } else {
555 string_len(it)
556 }
557 })
558 .sum();
559 prefix + body
560 };
561 }
562 if version >= 0 {
563 n += {
564 let prefix = crate::primitives::array::array_len_prefix_len(
565 (self.repartition_sink_topics).len(),
566 flex,
567 );
568 let body: usize = (self.repartition_sink_topics)
569 .iter()
570 .map(|it| {
571 if flex {
572 compact_string_len(it)
573 } else {
574 string_len(it)
575 }
576 })
577 .sum();
578 prefix + body
579 };
580 }
581 if version >= 0 {
582 n += {
583 let prefix = crate::primitives::array::array_len_prefix_len(
584 (self.state_changelog_topics).len(),
585 flex,
586 );
587 let body: usize = (self.state_changelog_topics)
588 .iter()
589 .map(|it| it.encoded_len(version))
590 .sum();
591 prefix + body
592 };
593 }
594 if version >= 0 {
595 n += {
596 let prefix = crate::primitives::array::array_len_prefix_len(
597 (self.repartition_source_topics).len(),
598 flex,
599 );
600 let body: usize = (self.repartition_source_topics)
601 .iter()
602 .map(|it| it.encoded_len(version))
603 .sum();
604 prefix + body
605 };
606 }
607 if flex {
608 let known_pairs: Vec<(u32, usize)> = Vec::new();
609 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
610 }
611 n
612 }
613}
614impl Decode<'_> for Subtopology {
615 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
616 let flex = version >= 0;
617 let mut out = Self::default();
618 if version >= 0 {
619 out.subtopology_id = if flex {
620 get_compact_string_owned(buf)?
621 } else {
622 get_string_owned(buf)?
623 };
624 }
625 if version >= 0 {
626 out.source_topics = {
627 let n = crate::primitives::array::get_array_len(buf, flex)?;
628 let mut v = Vec::with_capacity(n);
629 for _ in 0..n {
630 v.push(if flex {
631 get_compact_string_owned(buf)?
632 } else {
633 get_string_owned(buf)?
634 });
635 }
636 v
637 };
638 }
639 if version >= 0 {
640 out.repartition_sink_topics = {
641 let n = crate::primitives::array::get_array_len(buf, flex)?;
642 let mut v = Vec::with_capacity(n);
643 for _ in 0..n {
644 v.push(if flex {
645 get_compact_string_owned(buf)?
646 } else {
647 get_string_owned(buf)?
648 });
649 }
650 v
651 };
652 }
653 if version >= 0 {
654 out.state_changelog_topics = {
655 let n = crate::primitives::array::get_array_len(buf, flex)?;
656 let mut v = Vec::with_capacity(n);
657 for _ in 0..n {
658 v.push(
659 super::common::streams_group_describe_response::topic_info::TopicInfo::decode(
660 buf,
661 version,
662 )?,
663 );
664 }
665 v
666 };
667 }
668 if version >= 0 {
669 out.repartition_source_topics = {
670 let n = crate::primitives::array::get_array_len(buf, flex)?;
671 let mut v = Vec::with_capacity(n);
672 for _ in 0..n {
673 v.push(
674 super::common::streams_group_describe_response::topic_info::TopicInfo::decode(
675 buf,
676 version,
677 )?,
678 );
679 }
680 v
681 };
682 }
683 if flex {
684 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
685 }
686 Ok(out)
687 }
688}
689#[cfg(test)]
690impl Subtopology {
691 #[must_use]
692 pub fn populated(version: i16) -> Self {
693 let mut m = Self::default();
694 if version >= 0 {
695 m.subtopology_id = "x".to_string();
696 }
697 if version >= 0 {
698 m.source_topics = vec!["x".to_string()];
699 }
700 if version >= 0 {
701 m.repartition_sink_topics = vec!["x".to_string()];
702 }
703 if version >= 0 {
704 m.state_changelog_topics = vec![
705 super::common::streams_group_describe_response::topic_info::TopicInfo::populated(
706 version,
707 ),
708 ];
709 }
710 if version >= 0 {
711 m.repartition_source_topics = vec![
712 super::common::streams_group_describe_response::topic_info::TopicInfo::populated(
713 version,
714 ),
715 ];
716 }
717 m
718 }
719}
720#[derive(Debug, Clone, PartialEq, Eq, Default)]
721pub struct Member {
722 pub member_id: String,
723 pub member_epoch: i32,
724 pub instance_id: Option<String>,
725 pub rack_id: Option<String>,
726 pub client_id: String,
727 pub client_host: String,
728 pub topology_epoch: i32,
729 pub process_id: String,
730 pub user_endpoint: Option<super::common::streams_group_describe_response::endpoint::Endpoint>,
731 pub client_tags: Vec<super::common::streams_group_describe_response::key_value::KeyValue>,
732 pub task_offsets: Vec<super::common::streams_group_describe_response::task_offset::TaskOffset>,
733 pub task_end_offsets:
734 Vec<super::common::streams_group_describe_response::task_offset::TaskOffset>,
735 pub assignment: super::common::streams_group_describe_response::assignment::Assignment,
736 pub target_assignment: super::common::streams_group_describe_response::assignment::Assignment,
737 pub is_classic: bool,
738 pub unknown_tagged_fields: UnknownTaggedFields,
739}
740impl Encode for Member {
741 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
742 let flex = version >= 0;
743 if version >= 0 {
744 if flex {
745 put_compact_string(buf, &self.member_id);
746 } else {
747 put_string(buf, &self.member_id);
748 }
749 }
750 if version >= 0 {
751 put_i32(buf, self.member_epoch);
752 }
753 if version >= 0 {
754 if flex {
755 put_compact_nullable_string(buf, self.instance_id.as_deref());
756 } else {
757 put_nullable_string(buf, self.instance_id.as_deref());
758 }
759 }
760 if version >= 0 {
761 if flex {
762 put_compact_nullable_string(buf, self.rack_id.as_deref());
763 } else {
764 put_nullable_string(buf, self.rack_id.as_deref());
765 }
766 }
767 if version >= 0 {
768 if flex {
769 put_compact_string(buf, &self.client_id);
770 } else {
771 put_string(buf, &self.client_id);
772 }
773 }
774 if version >= 0 {
775 if flex {
776 put_compact_string(buf, &self.client_host);
777 } else {
778 put_string(buf, &self.client_host);
779 }
780 }
781 if version >= 0 {
782 put_i32(buf, self.topology_epoch);
783 }
784 if version >= 0 {
785 if flex {
786 put_compact_string(buf, &self.process_id);
787 } else {
788 put_string(buf, &self.process_id);
789 }
790 }
791 if version >= 0 {
792 match &self.user_endpoint {
793 None => {
794 buf.put_i8(-1);
795 }
796 Some(v) => {
797 buf.put_i8(1);
798 v.encode(buf, version)?;
799 }
800 }
801 }
802 if version >= 0 {
803 {
804 crate::primitives::array::put_array_len(buf, (self.client_tags).len(), flex);
805 for it in &self.client_tags {
806 it.encode(buf, version)?;
807 }
808 }
809 }
810 if version >= 0 {
811 {
812 crate::primitives::array::put_array_len(buf, (self.task_offsets).len(), flex);
813 for it in &self.task_offsets {
814 it.encode(buf, version)?;
815 }
816 }
817 }
818 if version >= 0 {
819 {
820 crate::primitives::array::put_array_len(buf, (self.task_end_offsets).len(), flex);
821 for it in &self.task_end_offsets {
822 it.encode(buf, version)?;
823 }
824 }
825 }
826 if version >= 0 {
827 self.assignment.encode(buf, version)?;
828 }
829 if version >= 0 {
830 self.target_assignment.encode(buf, version)?;
831 }
832 if version >= 0 {
833 put_bool(buf, self.is_classic);
834 }
835 if flex {
836 let tagged = WriteTaggedFields::new();
837 tagged.write(buf, &self.unknown_tagged_fields);
838 }
839 Ok(())
840 }
841 fn encoded_len(&self, version: i16) -> usize {
842 let flex = version >= 0;
843 let mut n: usize = 0;
844 if version >= 0 {
845 n += if flex {
846 compact_string_len(&self.member_id)
847 } else {
848 string_len(&self.member_id)
849 };
850 }
851 if version >= 0 {
852 n += 4;
853 }
854 if version >= 0 {
855 n += if flex {
856 compact_nullable_string_len(self.instance_id.as_deref())
857 } else {
858 nullable_string_len(self.instance_id.as_deref())
859 };
860 }
861 if version >= 0 {
862 n += if flex {
863 compact_nullable_string_len(self.rack_id.as_deref())
864 } else {
865 nullable_string_len(self.rack_id.as_deref())
866 };
867 }
868 if version >= 0 {
869 n += if flex {
870 compact_string_len(&self.client_id)
871 } else {
872 string_len(&self.client_id)
873 };
874 }
875 if version >= 0 {
876 n += if flex {
877 compact_string_len(&self.client_host)
878 } else {
879 string_len(&self.client_host)
880 };
881 }
882 if version >= 0 {
883 n += 4;
884 }
885 if version >= 0 {
886 n += if flex {
887 compact_string_len(&self.process_id)
888 } else {
889 string_len(&self.process_id)
890 };
891 }
892 if version >= 0 {
893 n += 1 + self
894 .user_endpoint
895 .as_ref()
896 .map_or(0, |v| v.encoded_len(version));
897 }
898 if version >= 0 {
899 n += {
900 let prefix =
901 crate::primitives::array::array_len_prefix_len((self.client_tags).len(), flex);
902 let body: usize = (self.client_tags)
903 .iter()
904 .map(|it| it.encoded_len(version))
905 .sum();
906 prefix + body
907 };
908 }
909 if version >= 0 {
910 n += {
911 let prefix =
912 crate::primitives::array::array_len_prefix_len((self.task_offsets).len(), flex);
913 let body: usize = (self.task_offsets)
914 .iter()
915 .map(|it| it.encoded_len(version))
916 .sum();
917 prefix + body
918 };
919 }
920 if version >= 0 {
921 n += {
922 let prefix = crate::primitives::array::array_len_prefix_len(
923 (self.task_end_offsets).len(),
924 flex,
925 );
926 let body: usize = (self.task_end_offsets)
927 .iter()
928 .map(|it| it.encoded_len(version))
929 .sum();
930 prefix + body
931 };
932 }
933 if version >= 0 {
934 n += self.assignment.encoded_len(version);
935 }
936 if version >= 0 {
937 n += self.target_assignment.encoded_len(version);
938 }
939 if version >= 0 {
940 n += 1;
941 }
942 if flex {
943 let known_pairs: Vec<(u32, usize)> = Vec::new();
944 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
945 }
946 n
947 }
948}
949impl Decode<'_> for Member {
950 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
951 let flex = version >= 0;
952 let mut out = Self::default();
953 if version >= 0 {
954 out.member_id = if flex {
955 get_compact_string_owned(buf)?
956 } else {
957 get_string_owned(buf)?
958 };
959 }
960 if version >= 0 {
961 out.member_epoch = get_i32(buf)?;
962 }
963 if version >= 0 {
964 out.instance_id = if flex {
965 get_compact_nullable_string_owned(buf)?
966 } else {
967 get_nullable_string_owned(buf)?
968 };
969 }
970 if version >= 0 {
971 out.rack_id = if flex {
972 get_compact_nullable_string_owned(buf)?
973 } else {
974 get_nullable_string_owned(buf)?
975 };
976 }
977 if version >= 0 {
978 out.client_id = if flex {
979 get_compact_string_owned(buf)?
980 } else {
981 get_string_owned(buf)?
982 };
983 }
984 if version >= 0 {
985 out.client_host = if flex {
986 get_compact_string_owned(buf)?
987 } else {
988 get_string_owned(buf)?
989 };
990 }
991 if version >= 0 {
992 out.topology_epoch = get_i32(buf)?;
993 }
994 if version >= 0 {
995 out.process_id = if flex {
996 get_compact_string_owned(buf)?
997 } else {
998 get_string_owned(buf)?
999 };
1000 }
1001 if version >= 0 {
1002 out.user_endpoint = if get_i8(buf)? < 0 {
1003 None
1004 } else {
1005 Some(
1006 super::common::streams_group_describe_response::endpoint::Endpoint::decode(
1007 buf, version,
1008 )?,
1009 )
1010 };
1011 }
1012 if version >= 0 {
1013 out.client_tags = {
1014 let n = crate::primitives::array::get_array_len(buf, flex)?;
1015 let mut v = Vec::with_capacity(n);
1016 for _ in 0..n {
1017 v.push(
1018 super::common::streams_group_describe_response::key_value::KeyValue::decode(
1019 buf,
1020 version,
1021 )?,
1022 );
1023 }
1024 v
1025 };
1026 }
1027 if version >= 0 {
1028 out.task_offsets = {
1029 let n = crate::primitives::array::get_array_len(buf, flex)?;
1030 let mut v = Vec::with_capacity(n);
1031 for _ in 0..n {
1032 v.push(
1033 super::common::streams_group_describe_response::task_offset::TaskOffset::decode(
1034 buf,
1035 version,
1036 )?,
1037 );
1038 }
1039 v
1040 };
1041 }
1042 if version >= 0 {
1043 out.task_end_offsets = {
1044 let n = crate::primitives::array::get_array_len(buf, flex)?;
1045 let mut v = Vec::with_capacity(n);
1046 for _ in 0..n {
1047 v.push(
1048 super::common::streams_group_describe_response::task_offset::TaskOffset::decode(
1049 buf,
1050 version,
1051 )?,
1052 );
1053 }
1054 v
1055 };
1056 }
1057 if version >= 0 {
1058 out.assignment =
1059 super::common::streams_group_describe_response::assignment::Assignment::decode(
1060 buf, version,
1061 )?;
1062 }
1063 if version >= 0 {
1064 out.target_assignment =
1065 super::common::streams_group_describe_response::assignment::Assignment::decode(
1066 buf, version,
1067 )?;
1068 }
1069 if version >= 0 {
1070 out.is_classic = get_bool(buf)?;
1071 }
1072 if flex {
1073 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1074 }
1075 Ok(out)
1076 }
1077}
1078#[cfg(test)]
1079impl Member {
1080 #[must_use]
1081 pub fn populated(version: i16) -> Self {
1082 let mut m = Self::default();
1083 if version >= 0 {
1084 m.member_id = "x".to_string();
1085 }
1086 if version >= 0 {
1087 m.member_epoch = 1i32;
1088 }
1089 if version >= 0 {
1090 m.instance_id = Some("x".to_string());
1091 }
1092 if version >= 0 {
1093 m.rack_id = Some("x".to_string());
1094 }
1095 if version >= 0 {
1096 m.client_id = "x".to_string();
1097 }
1098 if version >= 0 {
1099 m.client_host = "x".to_string();
1100 }
1101 if version >= 0 {
1102 m.topology_epoch = 1i32;
1103 }
1104 if version >= 0 {
1105 m.process_id = "x".to_string();
1106 }
1107 if version >= 0 {
1108 m.user_endpoint = Some(
1109 super::common::streams_group_describe_response::endpoint::Endpoint::populated(
1110 version,
1111 ),
1112 );
1113 }
1114 if version >= 0 {
1115 m.client_tags = vec![
1116 super::common::streams_group_describe_response::key_value::KeyValue::populated(
1117 version,
1118 ),
1119 ];
1120 }
1121 if version >= 0 {
1122 m.task_offsets = vec![
1123 super::common::streams_group_describe_response::task_offset::TaskOffset::populated(
1124 version,
1125 ),
1126 ];
1127 }
1128 if version >= 0 {
1129 m.task_end_offsets = vec![
1130 super::common::streams_group_describe_response::task_offset::TaskOffset::populated(
1131 version,
1132 ),
1133 ];
1134 }
1135 if version >= 0 {
1136 m.assignment =
1137 super::common::streams_group_describe_response::assignment::Assignment::populated(
1138 version,
1139 );
1140 }
1141 if version >= 0 {
1142 m.target_assignment =
1143 super::common::streams_group_describe_response::assignment::Assignment::populated(
1144 version,
1145 );
1146 }
1147 if version >= 0 {
1148 m.is_classic = true;
1149 }
1150 m
1151 }
1152}
1153#[must_use]
1156#[allow(unused_comparisons)]
1157pub fn default_json(version: i16) -> ::serde_json::Value {
1158 let mut obj = ::serde_json::Map::new();
1159 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
1160 obj.insert("groups".to_string(), ::serde_json::Value::Array(vec![]));
1161 ::serde_json::Value::Object(obj)
1162}