1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_bool, get_i8, get_i16, get_i32, put_bool, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13
/// API key reported in `ProtocolError::UnsupportedVersion` by the top-level codec in this module.
pub const API_KEY: i16 = 89;
/// Lowest version accepted by the top-level `encode`/`decode` range check.
pub const MIN_VERSION: i16 = 0;
/// Highest version accepted by the top-level `encode`/`decode` range check.
pub const MAX_VERSION: i16 = 0;
/// First version that `is_flexible` reports as flexible.
pub const FLEXIBLE_MIN: i16 = 0;
18
19#[inline]
20fn is_flexible(version: i16) -> bool {
21 version >= FLEXIBLE_MIN
22}
23
/// Top-level message of this module: a throttle time followed by an array of
/// [`DescribedGroup`] entries.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct StreamsGroupDescribeResponse {
    /// Encoded first, as a 4-byte integer (presumably milliseconds, going by the name).
    pub throttle_time_ms: i32,
    /// Encoded as a length-prefixed array, one entry per group.
    pub groups: Vec<DescribedGroup>,
    /// Tagged fields with no dedicated struct field; read and written only for flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
30impl Encode for StreamsGroupDescribeResponse {
31 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
32 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
33 return Err(ProtocolError::UnsupportedVersion {
34 api_key: API_KEY,
35 version,
36 });
37 }
38 let flex = is_flexible(version);
39 if version >= 0 {
40 put_i32(buf, self.throttle_time_ms);
41 }
42 if version >= 0 {
43 {
44 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
45 for it in &self.groups {
46 it.encode(buf, version)?;
47 }
48 }
49 }
50 if flex {
51 let tagged = WriteTaggedFields::new();
52 tagged.write(buf, &self.unknown_tagged_fields);
53 }
54 Ok(())
55 }
56 fn encoded_len(&self, version: i16) -> usize {
57 let flex = is_flexible(version);
58 let mut n: usize = 0;
59 if version >= 0 {
60 n += 4;
61 }
62 if version >= 0 {
63 n += {
64 let prefix =
65 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
66 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
67 prefix + body
68 };
69 }
70 if flex {
71 let known_pairs: Vec<(u32, usize)> = Vec::new();
72 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
73 }
74 n
75 }
76}
77impl Decode<'_> for StreamsGroupDescribeResponse {
78 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
79 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
80 return Err(ProtocolError::UnsupportedVersion {
81 api_key: API_KEY,
82 version,
83 });
84 }
85 let flex = is_flexible(version);
86 let mut out = Self::default();
87 if version >= 0 {
88 out.throttle_time_ms = get_i32(buf)?;
89 }
90 if version >= 0 {
91 out.groups = {
92 let n = crate::primitives::array::get_array_len(buf, flex)?;
93 let mut v = Vec::with_capacity(n);
94 for _ in 0..n {
95 v.push(DescribedGroup::decode(buf, version)?);
96 }
97 v
98 };
99 }
100 if flex {
101 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
102 }
103 Ok(out)
104 }
105}
#[cfg(test)]
impl StreamsGroupDescribeResponse {
    /// Test fixture: for `version >= 0` returns a response with a throttle time of 1 and
    /// a single populated group; otherwise returns the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            throttle_time_ms: 1,
            groups: vec![DescribedGroup::populated(version)],
            ..Self::default()
        }
    }
}
/// One entry of [`StreamsGroupDescribeResponse::groups`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DescribedGroup {
    /// Encoded as a 2-byte integer; defaults to 0.
    pub error_code: i16,
    /// Encoded as a nullable string; `None` is written as null.
    pub error_message: Option<String>,
    /// Encoded as a non-nullable string.
    pub group_id: String,
    /// Encoded as a non-nullable string.
    pub group_state: String,
    /// Encoded as a 4-byte integer.
    pub group_epoch: i32,
    /// Encoded as a 4-byte integer.
    pub assignment_epoch: i32,
    /// Optional nested struct, preceded on the wire by a one-byte presence marker.
    pub topology: Option<Topology>,
    /// Encoded as a length-prefixed array.
    pub members: Vec<Member>,
    /// Encoded as a 4-byte integer; the `Default` impl sets it to `-2_147_483_648`.
    pub authorized_operations: i32,
    /// Tagged fields with no dedicated struct field; read and written only for flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
133impl Default for DescribedGroup {
134 fn default() -> Self {
135 Self {
136 error_code: 0i16,
137 error_message: None,
138 group_id: String::new(),
139 group_state: String::new(),
140 group_epoch: 0i32,
141 assignment_epoch: 0i32,
142 topology: None,
143 members: Vec::new(),
144 authorized_operations: -2_147_483_648i32,
145 unknown_tagged_fields: Default::default(),
146 }
147 }
148}
149impl Encode for DescribedGroup {
150 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
151 let flex = version >= 0;
152 if version >= 0 {
153 put_i16(buf, self.error_code);
154 }
155 if version >= 0 {
156 if flex {
157 put_compact_nullable_string(buf, self.error_message.as_deref());
158 } else {
159 put_nullable_string(buf, self.error_message.as_deref());
160 }
161 }
162 if version >= 0 {
163 if flex {
164 put_compact_string(buf, &self.group_id);
165 } else {
166 put_string(buf, &self.group_id);
167 }
168 }
169 if version >= 0 {
170 if flex {
171 put_compact_string(buf, &self.group_state);
172 } else {
173 put_string(buf, &self.group_state);
174 }
175 }
176 if version >= 0 {
177 put_i32(buf, self.group_epoch);
178 }
179 if version >= 0 {
180 put_i32(buf, self.assignment_epoch);
181 }
182 if version >= 0 {
183 match &self.topology {
184 None => {
185 buf.put_i8(-1);
186 }
187 Some(v) => {
188 buf.put_i8(1);
189 v.encode(buf, version)?;
190 }
191 }
192 }
193 if version >= 0 {
194 {
195 crate::primitives::array::put_array_len(buf, (self.members).len(), flex);
196 for it in &self.members {
197 it.encode(buf, version)?;
198 }
199 }
200 }
201 if version >= 0 {
202 put_i32(buf, self.authorized_operations);
203 }
204 if flex {
205 let tagged = WriteTaggedFields::new();
206 tagged.write(buf, &self.unknown_tagged_fields);
207 }
208 Ok(())
209 }
210 fn encoded_len(&self, version: i16) -> usize {
211 let flex = version >= 0;
212 let mut n: usize = 0;
213 if version >= 0 {
214 n += 2;
215 }
216 if version >= 0 {
217 n += if flex {
218 compact_nullable_string_len(self.error_message.as_deref())
219 } else {
220 nullable_string_len(self.error_message.as_deref())
221 };
222 }
223 if version >= 0 {
224 n += if flex {
225 compact_string_len(&self.group_id)
226 } else {
227 string_len(&self.group_id)
228 };
229 }
230 if version >= 0 {
231 n += if flex {
232 compact_string_len(&self.group_state)
233 } else {
234 string_len(&self.group_state)
235 };
236 }
237 if version >= 0 {
238 n += 4;
239 }
240 if version >= 0 {
241 n += 4;
242 }
243 if version >= 0 {
244 n += 1 + self.topology.as_ref().map_or(0, |v| v.encoded_len(version));
245 }
246 if version >= 0 {
247 n += {
248 let prefix =
249 crate::primitives::array::array_len_prefix_len((self.members).len(), flex);
250 let body: usize = (self.members)
251 .iter()
252 .map(|it| it.encoded_len(version))
253 .sum();
254 prefix + body
255 };
256 }
257 if version >= 0 {
258 n += 4;
259 }
260 if flex {
261 let known_pairs: Vec<(u32, usize)> = Vec::new();
262 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
263 }
264 n
265 }
266}
267impl Decode<'_> for DescribedGroup {
268 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
269 let flex = version >= 0;
270 let mut out = Self::default();
271 if version >= 0 {
272 out.error_code = get_i16(buf)?;
273 }
274 if version >= 0 {
275 out.error_message = if flex {
276 get_compact_nullable_string_owned(buf)?
277 } else {
278 get_nullable_string_owned(buf)?
279 };
280 }
281 if version >= 0 {
282 out.group_id = if flex {
283 get_compact_string_owned(buf)?
284 } else {
285 get_string_owned(buf)?
286 };
287 }
288 if version >= 0 {
289 out.group_state = if flex {
290 get_compact_string_owned(buf)?
291 } else {
292 get_string_owned(buf)?
293 };
294 }
295 if version >= 0 {
296 out.group_epoch = get_i32(buf)?;
297 }
298 if version >= 0 {
299 out.assignment_epoch = get_i32(buf)?;
300 }
301 if version >= 0 {
302 out.topology = if get_i8(buf)? < 0 {
303 None
304 } else {
305 Some(Topology::decode(buf, version)?)
306 };
307 }
308 if version >= 0 {
309 out.members = {
310 let n = crate::primitives::array::get_array_len(buf, flex)?;
311 let mut v = Vec::with_capacity(n);
312 for _ in 0..n {
313 v.push(Member::decode(buf, version)?);
314 }
315 v
316 };
317 }
318 if version >= 0 {
319 out.authorized_operations = get_i32(buf)?;
320 }
321 if flex {
322 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
323 }
324 Ok(out)
325 }
326}
#[cfg(test)]
impl DescribedGroup {
    /// Test fixture: for `version >= 0` every field except `unknown_tagged_fields` gets
    /// a simple non-default value; otherwise returns the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            error_code: 1,
            error_message: Some(String::from("x")),
            group_id: String::from("x"),
            group_state: String::from("x"),
            group_epoch: 1,
            assignment_epoch: 1,
            topology: Some(Topology::populated(version)),
            members: vec![Member::populated(version)],
            authorized_operations: 1,
            ..Self::default()
        }
    }
}
/// Nested struct carried by [`DescribedGroup::topology`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Topology {
    /// Encoded as a 4-byte integer.
    pub epoch: i32,
    /// Encoded as a nullable length-prefixed array; `None` is written as a null array.
    pub subtopologies: Option<Vec<Subtopology>>,
    /// Tagged fields with no dedicated struct field; read and written only for flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
368impl Encode for Topology {
369 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
370 let flex = version >= 0;
371 if version >= 0 {
372 put_i32(buf, self.epoch);
373 }
374 if version >= 0 {
375 {
376 let len = (self.subtopologies).as_ref().map(Vec::len);
377 crate::primitives::array::put_nullable_array_len(buf, len, flex);
378 if let Some(v) = &self.subtopologies {
379 for it in v {
380 it.encode(buf, version)?;
381 }
382 }
383 }
384 }
385 if flex {
386 let tagged = WriteTaggedFields::new();
387 tagged.write(buf, &self.unknown_tagged_fields);
388 }
389 Ok(())
390 }
391 fn encoded_len(&self, version: i16) -> usize {
392 let flex = version >= 0;
393 let mut n: usize = 0;
394 if version >= 0 {
395 n += 4;
396 }
397 if version >= 0 {
398 n += {
399 let opt: Option<&Vec<_>> = (self.subtopologies).as_ref();
400 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
401 opt.map(std::vec::Vec::len),
402 flex,
403 );
404 let body: usize =
405 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
406 prefix + body
407 };
408 }
409 if flex {
410 let known_pairs: Vec<(u32, usize)> = Vec::new();
411 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
412 }
413 n
414 }
415}
416impl Decode<'_> for Topology {
417 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
418 let flex = version >= 0;
419 let mut out = Self::default();
420 if version >= 0 {
421 out.epoch = get_i32(buf)?;
422 }
423 if version >= 0 {
424 out.subtopologies = {
425 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
426 match opt {
427 None => None,
428 Some(n) => {
429 let mut v = Vec::with_capacity(n);
430 for _ in 0..n {
431 v.push(Subtopology::decode(buf, version)?);
432 }
433 Some(v)
434 }
435 }
436 };
437 }
438 if flex {
439 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
440 }
441 Ok(out)
442 }
443}
#[cfg(test)]
impl Topology {
    /// Test fixture: for `version >= 0` returns epoch 1 with a single populated
    /// subtopology; otherwise returns the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            epoch: 1,
            subtopologies: Some(vec![Subtopology::populated(version)]),
            ..Self::default()
        }
    }
}
/// Element type of [`Topology::subtopologies`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Subtopology {
    /// Encoded as a non-nullable string.
    pub subtopology_id: String,
    /// Encoded as a length-prefixed array of strings.
    pub source_topics: Vec<String>,
    /// Encoded as a length-prefixed array of strings.
    pub repartition_sink_topics: Vec<String>,
    /// Encoded as a length-prefixed array of `TopicInfo` structs.
    pub state_changelog_topics:
        Vec<super::common::streams_group_describe_response::topic_info::TopicInfo>,
    /// Encoded as a length-prefixed array of `TopicInfo` structs.
    pub repartition_source_topics:
        Vec<super::common::streams_group_describe_response::topic_info::TopicInfo>,
    /// Tagged fields with no dedicated struct field; read and written only for flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
469impl Encode for Subtopology {
470 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
471 let flex = version >= 0;
472 if version >= 0 {
473 if flex {
474 put_compact_string(buf, &self.subtopology_id);
475 } else {
476 put_string(buf, &self.subtopology_id);
477 }
478 }
479 if version >= 0 {
480 {
481 crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
482 for it in &self.source_topics {
483 if flex {
484 put_compact_string(buf, it);
485 } else {
486 put_string(buf, it);
487 }
488 }
489 }
490 }
491 if version >= 0 {
492 {
493 crate::primitives::array::put_array_len(
494 buf,
495 (self.repartition_sink_topics).len(),
496 flex,
497 );
498 for it in &self.repartition_sink_topics {
499 if flex {
500 put_compact_string(buf, it);
501 } else {
502 put_string(buf, it);
503 }
504 }
505 }
506 }
507 if version >= 0 {
508 {
509 crate::primitives::array::put_array_len(
510 buf,
511 (self.state_changelog_topics).len(),
512 flex,
513 );
514 for it in &self.state_changelog_topics {
515 it.encode(buf, version)?;
516 }
517 }
518 }
519 if version >= 0 {
520 {
521 crate::primitives::array::put_array_len(
522 buf,
523 (self.repartition_source_topics).len(),
524 flex,
525 );
526 for it in &self.repartition_source_topics {
527 it.encode(buf, version)?;
528 }
529 }
530 }
531 if flex {
532 let tagged = WriteTaggedFields::new();
533 tagged.write(buf, &self.unknown_tagged_fields);
534 }
535 Ok(())
536 }
537 fn encoded_len(&self, version: i16) -> usize {
538 let flex = version >= 0;
539 let mut n: usize = 0;
540 if version >= 0 {
541 n += if flex {
542 compact_string_len(&self.subtopology_id)
543 } else {
544 string_len(&self.subtopology_id)
545 };
546 }
547 if version >= 0 {
548 n += {
549 let prefix = crate::primitives::array::array_len_prefix_len(
550 (self.source_topics).len(),
551 flex,
552 );
553 let body: usize = (self.source_topics)
554 .iter()
555 .map(|it| {
556 if flex {
557 compact_string_len(it)
558 } else {
559 string_len(it)
560 }
561 })
562 .sum();
563 prefix + body
564 };
565 }
566 if version >= 0 {
567 n += {
568 let prefix = crate::primitives::array::array_len_prefix_len(
569 (self.repartition_sink_topics).len(),
570 flex,
571 );
572 let body: usize = (self.repartition_sink_topics)
573 .iter()
574 .map(|it| {
575 if flex {
576 compact_string_len(it)
577 } else {
578 string_len(it)
579 }
580 })
581 .sum();
582 prefix + body
583 };
584 }
585 if version >= 0 {
586 n += {
587 let prefix = crate::primitives::array::array_len_prefix_len(
588 (self.state_changelog_topics).len(),
589 flex,
590 );
591 let body: usize = (self.state_changelog_topics)
592 .iter()
593 .map(|it| it.encoded_len(version))
594 .sum();
595 prefix + body
596 };
597 }
598 if version >= 0 {
599 n += {
600 let prefix = crate::primitives::array::array_len_prefix_len(
601 (self.repartition_source_topics).len(),
602 flex,
603 );
604 let body: usize = (self.repartition_source_topics)
605 .iter()
606 .map(|it| it.encoded_len(version))
607 .sum();
608 prefix + body
609 };
610 }
611 if flex {
612 let known_pairs: Vec<(u32, usize)> = Vec::new();
613 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
614 }
615 n
616 }
617}
618impl Decode<'_> for Subtopology {
619 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
620 let flex = version >= 0;
621 let mut out = Self::default();
622 if version >= 0 {
623 out.subtopology_id = if flex {
624 get_compact_string_owned(buf)?
625 } else {
626 get_string_owned(buf)?
627 };
628 }
629 if version >= 0 {
630 out.source_topics = {
631 let n = crate::primitives::array::get_array_len(buf, flex)?;
632 let mut v = Vec::with_capacity(n);
633 for _ in 0..n {
634 v.push(if flex {
635 get_compact_string_owned(buf)?
636 } else {
637 get_string_owned(buf)?
638 });
639 }
640 v
641 };
642 }
643 if version >= 0 {
644 out.repartition_sink_topics = {
645 let n = crate::primitives::array::get_array_len(buf, flex)?;
646 let mut v = Vec::with_capacity(n);
647 for _ in 0..n {
648 v.push(if flex {
649 get_compact_string_owned(buf)?
650 } else {
651 get_string_owned(buf)?
652 });
653 }
654 v
655 };
656 }
657 if version >= 0 {
658 out.state_changelog_topics = {
659 let n = crate::primitives::array::get_array_len(buf, flex)?;
660 let mut v = Vec::with_capacity(n);
661 for _ in 0..n {
662 v . push (super :: common :: streams_group_describe_response :: topic_info :: TopicInfo :: decode (buf , version) ?) ;
663 }
664 v
665 };
666 }
667 if version >= 0 {
668 out.repartition_source_topics = {
669 let n = crate::primitives::array::get_array_len(buf, flex)?;
670 let mut v = Vec::with_capacity(n);
671 for _ in 0..n {
672 v . push (super :: common :: streams_group_describe_response :: topic_info :: TopicInfo :: decode (buf , version) ?) ;
673 }
674 v
675 };
676 }
677 if flex {
678 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
679 }
680 Ok(out)
681 }
682}
#[cfg(test)]
impl Subtopology {
    /// Test fixture: for `version >= 0` every field except `unknown_tagged_fields` gets
    /// a simple non-default value (`"x"` strings, single-element arrays); otherwise
    /// returns the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::streams_group_describe_response::topic_info::TopicInfo;

        if version < 0 {
            return Self::default();
        }
        Self {
            subtopology_id: String::from("x"),
            source_topics: vec![String::from("x")],
            repartition_sink_topics: vec![String::from("x")],
            state_changelog_topics: vec![TopicInfo::populated(version)],
            repartition_source_topics: vec![TopicInfo::populated(version)],
            ..Self::default()
        }
    }
}
/// Element type of [`DescribedGroup::members`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Member {
    /// Encoded as a non-nullable string.
    pub member_id: String,
    /// Encoded as a 4-byte integer.
    pub member_epoch: i32,
    /// Encoded as a nullable string.
    pub instance_id: Option<String>,
    /// Encoded as a nullable string.
    pub rack_id: Option<String>,
    /// Encoded as a non-nullable string.
    pub client_id: String,
    /// Encoded as a non-nullable string.
    pub client_host: String,
    /// Encoded as a 4-byte integer.
    pub topology_epoch: i32,
    /// Encoded as a non-nullable string.
    pub process_id: String,
    /// Optional nested struct, preceded on the wire by a one-byte presence marker.
    pub user_endpoint: Option<super::common::streams_group_describe_response::endpoint::Endpoint>,
    /// Encoded as a length-prefixed array of `KeyValue` structs.
    pub client_tags: Vec<super::common::streams_group_describe_response::key_value::KeyValue>,
    /// Encoded as a length-prefixed array of `TaskOffset` structs.
    pub task_offsets: Vec<super::common::streams_group_describe_response::task_offset::TaskOffset>,
    /// Encoded as a length-prefixed array of `TaskOffset` structs.
    pub task_end_offsets:
        Vec<super::common::streams_group_describe_response::task_offset::TaskOffset>,
    /// Nested struct, always present on the wire (no presence marker).
    pub assignment: super::common::streams_group_describe_response::assignment::Assignment,
    /// Nested struct, always present on the wire (no presence marker).
    pub target_assignment: super::common::streams_group_describe_response::assignment::Assignment,
    /// Encoded as a single-byte boolean.
    pub is_classic: bool,
    /// Tagged fields with no dedicated struct field; read and written only for flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
734impl Encode for Member {
735 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
736 let flex = version >= 0;
737 if version >= 0 {
738 if flex {
739 put_compact_string(buf, &self.member_id);
740 } else {
741 put_string(buf, &self.member_id);
742 }
743 }
744 if version >= 0 {
745 put_i32(buf, self.member_epoch);
746 }
747 if version >= 0 {
748 if flex {
749 put_compact_nullable_string(buf, self.instance_id.as_deref());
750 } else {
751 put_nullable_string(buf, self.instance_id.as_deref());
752 }
753 }
754 if version >= 0 {
755 if flex {
756 put_compact_nullable_string(buf, self.rack_id.as_deref());
757 } else {
758 put_nullable_string(buf, self.rack_id.as_deref());
759 }
760 }
761 if version >= 0 {
762 if flex {
763 put_compact_string(buf, &self.client_id);
764 } else {
765 put_string(buf, &self.client_id);
766 }
767 }
768 if version >= 0 {
769 if flex {
770 put_compact_string(buf, &self.client_host);
771 } else {
772 put_string(buf, &self.client_host);
773 }
774 }
775 if version >= 0 {
776 put_i32(buf, self.topology_epoch);
777 }
778 if version >= 0 {
779 if flex {
780 put_compact_string(buf, &self.process_id);
781 } else {
782 put_string(buf, &self.process_id);
783 }
784 }
785 if version >= 0 {
786 match &self.user_endpoint {
787 None => {
788 buf.put_i8(-1);
789 }
790 Some(v) => {
791 buf.put_i8(1);
792 v.encode(buf, version)?;
793 }
794 }
795 }
796 if version >= 0 {
797 {
798 crate::primitives::array::put_array_len(buf, (self.client_tags).len(), flex);
799 for it in &self.client_tags {
800 it.encode(buf, version)?;
801 }
802 }
803 }
804 if version >= 0 {
805 {
806 crate::primitives::array::put_array_len(buf, (self.task_offsets).len(), flex);
807 for it in &self.task_offsets {
808 it.encode(buf, version)?;
809 }
810 }
811 }
812 if version >= 0 {
813 {
814 crate::primitives::array::put_array_len(buf, (self.task_end_offsets).len(), flex);
815 for it in &self.task_end_offsets {
816 it.encode(buf, version)?;
817 }
818 }
819 }
820 if version >= 0 {
821 self.assignment.encode(buf, version)?;
822 }
823 if version >= 0 {
824 self.target_assignment.encode(buf, version)?;
825 }
826 if version >= 0 {
827 put_bool(buf, self.is_classic);
828 }
829 if flex {
830 let tagged = WriteTaggedFields::new();
831 tagged.write(buf, &self.unknown_tagged_fields);
832 }
833 Ok(())
834 }
835 fn encoded_len(&self, version: i16) -> usize {
836 let flex = version >= 0;
837 let mut n: usize = 0;
838 if version >= 0 {
839 n += if flex {
840 compact_string_len(&self.member_id)
841 } else {
842 string_len(&self.member_id)
843 };
844 }
845 if version >= 0 {
846 n += 4;
847 }
848 if version >= 0 {
849 n += if flex {
850 compact_nullable_string_len(self.instance_id.as_deref())
851 } else {
852 nullable_string_len(self.instance_id.as_deref())
853 };
854 }
855 if version >= 0 {
856 n += if flex {
857 compact_nullable_string_len(self.rack_id.as_deref())
858 } else {
859 nullable_string_len(self.rack_id.as_deref())
860 };
861 }
862 if version >= 0 {
863 n += if flex {
864 compact_string_len(&self.client_id)
865 } else {
866 string_len(&self.client_id)
867 };
868 }
869 if version >= 0 {
870 n += if flex {
871 compact_string_len(&self.client_host)
872 } else {
873 string_len(&self.client_host)
874 };
875 }
876 if version >= 0 {
877 n += 4;
878 }
879 if version >= 0 {
880 n += if flex {
881 compact_string_len(&self.process_id)
882 } else {
883 string_len(&self.process_id)
884 };
885 }
886 if version >= 0 {
887 n += 1 + self
888 .user_endpoint
889 .as_ref()
890 .map_or(0, |v| v.encoded_len(version));
891 }
892 if version >= 0 {
893 n += {
894 let prefix =
895 crate::primitives::array::array_len_prefix_len((self.client_tags).len(), flex);
896 let body: usize = (self.client_tags)
897 .iter()
898 .map(|it| it.encoded_len(version))
899 .sum();
900 prefix + body
901 };
902 }
903 if version >= 0 {
904 n += {
905 let prefix =
906 crate::primitives::array::array_len_prefix_len((self.task_offsets).len(), flex);
907 let body: usize = (self.task_offsets)
908 .iter()
909 .map(|it| it.encoded_len(version))
910 .sum();
911 prefix + body
912 };
913 }
914 if version >= 0 {
915 n += {
916 let prefix = crate::primitives::array::array_len_prefix_len(
917 (self.task_end_offsets).len(),
918 flex,
919 );
920 let body: usize = (self.task_end_offsets)
921 .iter()
922 .map(|it| it.encoded_len(version))
923 .sum();
924 prefix + body
925 };
926 }
927 if version >= 0 {
928 n += self.assignment.encoded_len(version);
929 }
930 if version >= 0 {
931 n += self.target_assignment.encoded_len(version);
932 }
933 if version >= 0 {
934 n += 1;
935 }
936 if flex {
937 let known_pairs: Vec<(u32, usize)> = Vec::new();
938 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
939 }
940 n
941 }
942}
943impl Decode<'_> for Member {
944 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
945 let flex = version >= 0;
946 let mut out = Self::default();
947 if version >= 0 {
948 out.member_id = if flex {
949 get_compact_string_owned(buf)?
950 } else {
951 get_string_owned(buf)?
952 };
953 }
954 if version >= 0 {
955 out.member_epoch = get_i32(buf)?;
956 }
957 if version >= 0 {
958 out.instance_id = if flex {
959 get_compact_nullable_string_owned(buf)?
960 } else {
961 get_nullable_string_owned(buf)?
962 };
963 }
964 if version >= 0 {
965 out.rack_id = if flex {
966 get_compact_nullable_string_owned(buf)?
967 } else {
968 get_nullable_string_owned(buf)?
969 };
970 }
971 if version >= 0 {
972 out.client_id = if flex {
973 get_compact_string_owned(buf)?
974 } else {
975 get_string_owned(buf)?
976 };
977 }
978 if version >= 0 {
979 out.client_host = if flex {
980 get_compact_string_owned(buf)?
981 } else {
982 get_string_owned(buf)?
983 };
984 }
985 if version >= 0 {
986 out.topology_epoch = get_i32(buf)?;
987 }
988 if version >= 0 {
989 out.process_id = if flex {
990 get_compact_string_owned(buf)?
991 } else {
992 get_string_owned(buf)?
993 };
994 }
995 if version >= 0 {
996 out.user_endpoint = if get_i8(buf)? < 0 {
997 None
998 } else {
999 Some(
1000 super::common::streams_group_describe_response::endpoint::Endpoint::decode(
1001 buf, version,
1002 )?,
1003 )
1004 };
1005 }
1006 if version >= 0 {
1007 out.client_tags = {
1008 let n = crate::primitives::array::get_array_len(buf, flex)?;
1009 let mut v = Vec::with_capacity(n);
1010 for _ in 0..n {
1011 v . push (super :: common :: streams_group_describe_response :: key_value :: KeyValue :: decode (buf , version) ?) ;
1012 }
1013 v
1014 };
1015 }
1016 if version >= 0 {
1017 out.task_offsets = {
1018 let n = crate::primitives::array::get_array_len(buf, flex)?;
1019 let mut v = Vec::with_capacity(n);
1020 for _ in 0..n {
1021 v . push (super :: common :: streams_group_describe_response :: task_offset :: TaskOffset :: decode (buf , version) ?) ;
1022 }
1023 v
1024 };
1025 }
1026 if version >= 0 {
1027 out.task_end_offsets = {
1028 let n = crate::primitives::array::get_array_len(buf, flex)?;
1029 let mut v = Vec::with_capacity(n);
1030 for _ in 0..n {
1031 v . push (super :: common :: streams_group_describe_response :: task_offset :: TaskOffset :: decode (buf , version) ?) ;
1032 }
1033 v
1034 };
1035 }
1036 if version >= 0 {
1037 out.assignment =
1038 super::common::streams_group_describe_response::assignment::Assignment::decode(
1039 buf, version,
1040 )?;
1041 }
1042 if version >= 0 {
1043 out.target_assignment =
1044 super::common::streams_group_describe_response::assignment::Assignment::decode(
1045 buf, version,
1046 )?;
1047 }
1048 if version >= 0 {
1049 out.is_classic = get_bool(buf)?;
1050 }
1051 if flex {
1052 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1053 }
1054 Ok(out)
1055 }
1056}
#[cfg(test)]
impl Member {
    /// Test fixture: for `version >= 0` every field except `unknown_tagged_fields` gets
    /// a simple non-default value (`"x"` strings, 1 for integers, `true`, populated
    /// nested structs and single-element arrays); otherwise returns the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::streams_group_describe_response::{
            assignment::Assignment, endpoint::Endpoint, key_value::KeyValue,
            task_offset::TaskOffset,
        };

        if version < 0 {
            return Self::default();
        }
        Self {
            member_id: String::from("x"),
            member_epoch: 1,
            instance_id: Some(String::from("x")),
            rack_id: Some(String::from("x")),
            client_id: String::from("x"),
            client_host: String::from("x"),
            topology_epoch: 1,
            process_id: String::from("x"),
            user_endpoint: Some(Endpoint::populated(version)),
            client_tags: vec![KeyValue::populated(version)],
            task_offsets: vec![TaskOffset::populated(version)],
            task_end_offsets: vec![TaskOffset::populated(version)],
            assignment: Assignment::populated(version),
            target_assignment: Assignment::populated(version),
            is_classic: true,
            ..Self::default()
        }
    }
}
1132
1133#[must_use]
1136#[allow(unused_comparisons)]
1137pub fn default_json(version: i16) -> ::serde_json::Value {
1138 let mut obj = ::serde_json::Map::new();
1139 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
1140 obj.insert("groups".to_string(), ::serde_json::Value::Array(vec![]));
1141 ::serde_json::Value::Object(obj)
1142}