1use crate::primitives::fixed::{get_bool, get_i8, get_i16, get_i32, put_bool, put_i16, put_i32};
3use crate::primitives::string_bytes::{
4 compact_nullable_string_len, compact_string_len, nullable_string_len,
5 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
6};
7use crate::primitives::string_bytes_borrowed::{
8 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
9 get_nullable_string_borrowed, get_string_borrowed,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
13use bytes::BufMut;
/// API key identifying this message type; reported in `ProtocolError::UnsupportedVersion`.
pub const API_KEY: i16 = 89;
/// Lowest message version accepted by the top-level encode/decode entry points.
pub const MIN_VERSION: i16 = 0;
/// Highest message version accepted by the top-level encode/decode entry points.
pub const MAX_VERSION: i16 = 0;
/// First version that uses the flexible layout (compact lengths plus tagged fields).
pub const FLEXIBLE_MIN: i16 = 0;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
22#[derive(Debug, Clone, PartialEq, Eq, Default)]
23pub struct StreamsGroupDescribeResponse<'a> {
24 pub throttle_time_ms: i32,
25 pub groups: Vec<DescribedGroup<'a>>,
26 pub unknown_tagged_fields: UnknownTaggedFields,
27}
28impl StreamsGroupDescribeResponse<'_> {
29 pub fn to_owned(
30 &self,
31 ) -> crate::owned::streams_group_describe_response::StreamsGroupDescribeResponse {
32 crate::owned::streams_group_describe_response::StreamsGroupDescribeResponse {
33 throttle_time_ms: (self.throttle_time_ms),
34 groups: (self.groups).iter().map(DescribedGroup::to_owned).collect(),
35 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
36 }
37 }
38}
39impl Encode for StreamsGroupDescribeResponse<'_> {
40 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
41 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
42 return Err(ProtocolError::UnsupportedVersion {
43 api_key: API_KEY,
44 version,
45 });
46 }
47 let flex = is_flexible(version);
48 if version >= 0 {
49 put_i32(buf, self.throttle_time_ms);
50 }
51 if version >= 0 {
52 {
53 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
54 for it in &self.groups {
55 it.encode(buf, version)?;
56 }
57 }
58 }
59 if flex {
60 let tagged = WriteTaggedFields::new();
61 tagged.write(buf, &self.unknown_tagged_fields);
62 }
63 Ok(())
64 }
65 fn encoded_len(&self, version: i16) -> usize {
66 let flex = is_flexible(version);
67 let mut n: usize = 0;
68 if version >= 0 {
69 n += 4;
70 }
71 if version >= 0 {
72 n += {
73 let prefix =
74 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
75 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
76 prefix + body
77 };
78 }
79 if flex {
80 let known_pairs: Vec<(u32, usize)> = Vec::new();
81 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
82 }
83 n
84 }
85}
86impl<'de> DecodeBorrow<'de> for StreamsGroupDescribeResponse<'de> {
87 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
88 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
89 return Err(ProtocolError::UnsupportedVersion {
90 api_key: API_KEY,
91 version,
92 });
93 }
94 let flex = is_flexible(version);
95 let mut out = Self::default();
96 if version >= 0 {
97 out.throttle_time_ms = get_i32(buf)?;
98 }
99 if version >= 0 {
100 out.groups = {
101 let n = crate::primitives::array::get_array_len(buf, flex)?;
102 let mut v = Vec::with_capacity(n);
103 for _ in 0..n {
104 v.push(DescribedGroup::decode_borrow(buf, version)?);
105 }
106 v
107 };
108 }
109 if flex {
110 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
111 }
112 Ok(out)
113 }
114}
#[cfg(test)]
impl StreamsGroupDescribeResponse<'_> {
    /// Test fixture: a response whose version-gated fields carry non-default
    /// sample values. Negative versions yield the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            throttle_time_ms: 1i32,
            groups: vec![DescribedGroup::populated(version)],
            ..Self::default()
        }
    }
}
129#[derive(Debug, Clone, PartialEq, Eq)]
130pub struct DescribedGroup<'a> {
131 pub error_code: i16,
132 pub error_message: Option<&'a str>,
133 pub group_id: &'a str,
134 pub group_state: &'a str,
135 pub group_epoch: i32,
136 pub assignment_epoch: i32,
137 pub topology: Option<Topology<'a>>,
138 pub members: Vec<Member<'a>>,
139 pub authorized_operations: i32,
140 pub unknown_tagged_fields: UnknownTaggedFields,
141}
142impl Default for DescribedGroup<'_> {
143 fn default() -> Self {
144 Self {
145 error_code: 0i16,
146 error_message: None,
147 group_id: "",
148 group_state: "",
149 group_epoch: 0i32,
150 assignment_epoch: 0i32,
151 topology: None,
152 members: Vec::new(),
153 authorized_operations: -2_147_483_648i32,
154 unknown_tagged_fields: Default::default(),
155 }
156 }
157}
158impl DescribedGroup<'_> {
159 pub fn to_owned(&self) -> crate::owned::streams_group_describe_response::DescribedGroup {
160 crate::owned::streams_group_describe_response::DescribedGroup {
161 error_code: (self.error_code),
162 error_message: (self.error_message).map(std::string::ToString::to_string),
163 group_id: (self.group_id).to_string(),
164 group_state: (self.group_state).to_string(),
165 group_epoch: (self.group_epoch),
166 assignment_epoch: (self.assignment_epoch),
167 topology: (self.topology).as_ref().map(Topology::to_owned),
168 members: (self.members).iter().map(Member::to_owned).collect(),
169 authorized_operations: (self.authorized_operations),
170 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
171 }
172 }
173}
174impl Encode for DescribedGroup<'_> {
175 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
176 let flex = version >= 0;
177 if version >= 0 {
178 put_i16(buf, self.error_code);
179 }
180 if version >= 0 {
181 if flex {
182 put_compact_nullable_string(buf, self.error_message);
183 } else {
184 put_nullable_string(buf, self.error_message);
185 }
186 }
187 if version >= 0 {
188 if flex {
189 put_compact_string(buf, self.group_id);
190 } else {
191 put_string(buf, self.group_id);
192 }
193 }
194 if version >= 0 {
195 if flex {
196 put_compact_string(buf, self.group_state);
197 } else {
198 put_string(buf, self.group_state);
199 }
200 }
201 if version >= 0 {
202 put_i32(buf, self.group_epoch);
203 }
204 if version >= 0 {
205 put_i32(buf, self.assignment_epoch);
206 }
207 if version >= 0 {
208 match &self.topology {
209 None => {
210 buf.put_i8(-1);
211 }
212 Some(v) => {
213 buf.put_i8(1);
214 v.encode(buf, version)?;
215 }
216 }
217 }
218 if version >= 0 {
219 {
220 crate::primitives::array::put_array_len(buf, (self.members).len(), flex);
221 for it in &self.members {
222 it.encode(buf, version)?;
223 }
224 }
225 }
226 if version >= 0 {
227 put_i32(buf, self.authorized_operations);
228 }
229 if flex {
230 let tagged = WriteTaggedFields::new();
231 tagged.write(buf, &self.unknown_tagged_fields);
232 }
233 Ok(())
234 }
235 fn encoded_len(&self, version: i16) -> usize {
236 let flex = version >= 0;
237 let mut n: usize = 0;
238 if version >= 0 {
239 n += 2;
240 }
241 if version >= 0 {
242 n += if flex {
243 compact_nullable_string_len(self.error_message)
244 } else {
245 nullable_string_len(self.error_message)
246 };
247 }
248 if version >= 0 {
249 n += if flex {
250 compact_string_len(self.group_id)
251 } else {
252 string_len(self.group_id)
253 };
254 }
255 if version >= 0 {
256 n += if flex {
257 compact_string_len(self.group_state)
258 } else {
259 string_len(self.group_state)
260 };
261 }
262 if version >= 0 {
263 n += 4;
264 }
265 if version >= 0 {
266 n += 4;
267 }
268 if version >= 0 {
269 n += 1 + self.topology.as_ref().map_or(0, |v| v.encoded_len(version));
270 }
271 if version >= 0 {
272 n += {
273 let prefix =
274 crate::primitives::array::array_len_prefix_len((self.members).len(), flex);
275 let body: usize = (self.members)
276 .iter()
277 .map(|it| it.encoded_len(version))
278 .sum();
279 prefix + body
280 };
281 }
282 if version >= 0 {
283 n += 4;
284 }
285 if flex {
286 let known_pairs: Vec<(u32, usize)> = Vec::new();
287 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
288 }
289 n
290 }
291}
292impl<'de> DecodeBorrow<'de> for DescribedGroup<'de> {
293 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
294 let flex = version >= 0;
295 let mut out = Self::default();
296 if version >= 0 {
297 out.error_code = get_i16(buf)?;
298 }
299 if version >= 0 {
300 out.error_message = if flex {
301 get_compact_nullable_string_borrowed(buf)?
302 } else {
303 get_nullable_string_borrowed(buf)?
304 };
305 }
306 if version >= 0 {
307 out.group_id = if flex {
308 get_compact_string_borrowed(buf)?
309 } else {
310 get_string_borrowed(buf)?
311 };
312 }
313 if version >= 0 {
314 out.group_state = if flex {
315 get_compact_string_borrowed(buf)?
316 } else {
317 get_string_borrowed(buf)?
318 };
319 }
320 if version >= 0 {
321 out.group_epoch = get_i32(buf)?;
322 }
323 if version >= 0 {
324 out.assignment_epoch = get_i32(buf)?;
325 }
326 if version >= 0 {
327 out.topology = if get_i8(buf)? < 0 {
328 None
329 } else {
330 Some(Topology::decode_borrow(buf, version)?)
331 };
332 }
333 if version >= 0 {
334 out.members = {
335 let n = crate::primitives::array::get_array_len(buf, flex)?;
336 let mut v = Vec::with_capacity(n);
337 for _ in 0..n {
338 v.push(Member::decode_borrow(buf, version)?);
339 }
340 v
341 };
342 }
343 if version >= 0 {
344 out.authorized_operations = get_i32(buf)?;
345 }
346 if flex {
347 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
348 }
349 Ok(out)
350 }
351}
#[cfg(test)]
impl DescribedGroup<'_> {
    /// Test fixture: a group whose version-gated fields carry non-default
    /// sample values. Negative versions yield the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            error_code: 1i16,
            error_message: Some("x"),
            group_id: "x",
            group_state: "x",
            group_epoch: 1i32,
            assignment_epoch: 1i32,
            topology: Some(Topology::populated(version)),
            members: vec![Member::populated(version)],
            authorized_operations: 1i32,
            ..Self::default()
        }
    }
}
387#[derive(Debug, Clone, PartialEq, Eq, Default)]
388pub struct Topology<'a> {
389 pub epoch: i32,
390 pub subtopologies: Option<Vec<Subtopology<'a>>>,
391 pub unknown_tagged_fields: UnknownTaggedFields,
392}
393impl Topology<'_> {
394 pub fn to_owned(&self) -> crate::owned::streams_group_describe_response::Topology {
395 crate::owned::streams_group_describe_response::Topology {
396 epoch: (self.epoch),
397 subtopologies: (self.subtopologies)
398 .as_ref()
399 .map(|v| v.iter().map(Subtopology::to_owned).collect()),
400 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
401 }
402 }
403}
404impl Encode for Topology<'_> {
405 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
406 let flex = version >= 0;
407 if version >= 0 {
408 put_i32(buf, self.epoch);
409 }
410 if version >= 0 {
411 {
412 let len = (self.subtopologies).as_ref().map(Vec::len);
413 crate::primitives::array::put_nullable_array_len(buf, len, flex);
414 if let Some(v) = &self.subtopologies {
415 for it in v {
416 it.encode(buf, version)?;
417 }
418 }
419 }
420 }
421 if flex {
422 let tagged = WriteTaggedFields::new();
423 tagged.write(buf, &self.unknown_tagged_fields);
424 }
425 Ok(())
426 }
427 fn encoded_len(&self, version: i16) -> usize {
428 let flex = version >= 0;
429 let mut n: usize = 0;
430 if version >= 0 {
431 n += 4;
432 }
433 if version >= 0 {
434 n += {
435 let opt: Option<&Vec<_>> = (self.subtopologies).as_ref();
436 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
437 opt.map(std::vec::Vec::len),
438 flex,
439 );
440 let body: usize =
441 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
442 prefix + body
443 };
444 }
445 if flex {
446 let known_pairs: Vec<(u32, usize)> = Vec::new();
447 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
448 }
449 n
450 }
451}
452impl<'de> DecodeBorrow<'de> for Topology<'de> {
453 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
454 let flex = version >= 0;
455 let mut out = Self::default();
456 if version >= 0 {
457 out.epoch = get_i32(buf)?;
458 }
459 if version >= 0 {
460 out.subtopologies = {
461 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
462 match opt {
463 None => None,
464 Some(n) => {
465 let mut v = Vec::with_capacity(n);
466 for _ in 0..n {
467 v.push(Subtopology::decode_borrow(buf, version)?);
468 }
469 Some(v)
470 }
471 }
472 };
473 }
474 if flex {
475 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
476 }
477 Ok(out)
478 }
479}
#[cfg(test)]
impl Topology<'_> {
    /// Test fixture: a topology whose version-gated fields carry non-default
    /// sample values. Negative versions yield the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            epoch: 1i32,
            subtopologies: Some(vec![Subtopology::populated(version)]),
            ..Self::default()
        }
    }
}
494#[derive(Debug, Clone, PartialEq, Eq, Default)]
495pub struct Subtopology<'a> {
496 pub subtopology_id: &'a str,
497 pub source_topics: Vec<&'a str>,
498 pub repartition_sink_topics: Vec<&'a str>,
499 pub state_changelog_topics:
500 Vec<super::common::streams_group_describe_response::topic_info::TopicInfo<'a>>,
501 pub repartition_source_topics:
502 Vec<super::common::streams_group_describe_response::topic_info::TopicInfo<'a>>,
503 pub unknown_tagged_fields: UnknownTaggedFields,
504}
505impl Subtopology<'_> {
506 pub fn to_owned(&self) -> crate::owned::streams_group_describe_response::Subtopology {
507 crate::owned::streams_group_describe_response::Subtopology {
508 subtopology_id: (self.subtopology_id).to_string(),
509 source_topics: (self.source_topics)
510 .iter()
511 .map(std::string::ToString::to_string)
512 .collect(),
513 repartition_sink_topics: (self.repartition_sink_topics)
514 .iter()
515 .map(std::string::ToString::to_string)
516 .collect(),
517 state_changelog_topics: (self.state_changelog_topics)
518 .iter()
519 .map(
520 super::common::streams_group_describe_response::topic_info::TopicInfo::to_owned,
521 )
522 .collect(),
523 repartition_source_topics: (self.repartition_source_topics)
524 .iter()
525 .map(
526 super::common::streams_group_describe_response::topic_info::TopicInfo::to_owned,
527 )
528 .collect(),
529 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
530 }
531 }
532}
533impl Encode for Subtopology<'_> {
534 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
535 let flex = version >= 0;
536 if version >= 0 {
537 if flex {
538 put_compact_string(buf, self.subtopology_id);
539 } else {
540 put_string(buf, self.subtopology_id);
541 }
542 }
543 if version >= 0 {
544 {
545 crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
546 for it in &self.source_topics {
547 if flex {
548 put_compact_string(buf, it);
549 } else {
550 put_string(buf, it);
551 }
552 }
553 }
554 }
555 if version >= 0 {
556 {
557 crate::primitives::array::put_array_len(
558 buf,
559 (self.repartition_sink_topics).len(),
560 flex,
561 );
562 for it in &self.repartition_sink_topics {
563 if flex {
564 put_compact_string(buf, it);
565 } else {
566 put_string(buf, it);
567 }
568 }
569 }
570 }
571 if version >= 0 {
572 {
573 crate::primitives::array::put_array_len(
574 buf,
575 (self.state_changelog_topics).len(),
576 flex,
577 );
578 for it in &self.state_changelog_topics {
579 it.encode(buf, version)?;
580 }
581 }
582 }
583 if version >= 0 {
584 {
585 crate::primitives::array::put_array_len(
586 buf,
587 (self.repartition_source_topics).len(),
588 flex,
589 );
590 for it in &self.repartition_source_topics {
591 it.encode(buf, version)?;
592 }
593 }
594 }
595 if flex {
596 let tagged = WriteTaggedFields::new();
597 tagged.write(buf, &self.unknown_tagged_fields);
598 }
599 Ok(())
600 }
601 fn encoded_len(&self, version: i16) -> usize {
602 let flex = version >= 0;
603 let mut n: usize = 0;
604 if version >= 0 {
605 n += if flex {
606 compact_string_len(self.subtopology_id)
607 } else {
608 string_len(self.subtopology_id)
609 };
610 }
611 if version >= 0 {
612 n += {
613 let prefix = crate::primitives::array::array_len_prefix_len(
614 (self.source_topics).len(),
615 flex,
616 );
617 let body: usize = (self.source_topics)
618 .iter()
619 .map(|it| {
620 if flex {
621 compact_string_len(it)
622 } else {
623 string_len(it)
624 }
625 })
626 .sum();
627 prefix + body
628 };
629 }
630 if version >= 0 {
631 n += {
632 let prefix = crate::primitives::array::array_len_prefix_len(
633 (self.repartition_sink_topics).len(),
634 flex,
635 );
636 let body: usize = (self.repartition_sink_topics)
637 .iter()
638 .map(|it| {
639 if flex {
640 compact_string_len(it)
641 } else {
642 string_len(it)
643 }
644 })
645 .sum();
646 prefix + body
647 };
648 }
649 if version >= 0 {
650 n += {
651 let prefix = crate::primitives::array::array_len_prefix_len(
652 (self.state_changelog_topics).len(),
653 flex,
654 );
655 let body: usize = (self.state_changelog_topics)
656 .iter()
657 .map(|it| it.encoded_len(version))
658 .sum();
659 prefix + body
660 };
661 }
662 if version >= 0 {
663 n += {
664 let prefix = crate::primitives::array::array_len_prefix_len(
665 (self.repartition_source_topics).len(),
666 flex,
667 );
668 let body: usize = (self.repartition_source_topics)
669 .iter()
670 .map(|it| it.encoded_len(version))
671 .sum();
672 prefix + body
673 };
674 }
675 if flex {
676 let known_pairs: Vec<(u32, usize)> = Vec::new();
677 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
678 }
679 n
680 }
681}
682impl<'de> DecodeBorrow<'de> for Subtopology<'de> {
683 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
684 let flex = version >= 0;
685 let mut out = Self::default();
686 if version >= 0 {
687 out.subtopology_id = if flex {
688 get_compact_string_borrowed(buf)?
689 } else {
690 get_string_borrowed(buf)?
691 };
692 }
693 if version >= 0 {
694 out.source_topics = {
695 let n = crate::primitives::array::get_array_len(buf, flex)?;
696 let mut v = Vec::with_capacity(n);
697 for _ in 0..n {
698 v.push(if flex {
699 get_compact_string_borrowed(buf)?
700 } else {
701 get_string_borrowed(buf)?
702 });
703 }
704 v
705 };
706 }
707 if version >= 0 {
708 out.repartition_sink_topics = {
709 let n = crate::primitives::array::get_array_len(buf, flex)?;
710 let mut v = Vec::with_capacity(n);
711 for _ in 0..n {
712 v.push(if flex {
713 get_compact_string_borrowed(buf)?
714 } else {
715 get_string_borrowed(buf)?
716 });
717 }
718 v
719 };
720 }
721 if version >= 0 {
722 out.state_changelog_topics = {
723 let n = crate::primitives::array::get_array_len(buf, flex)?;
724 let mut v = Vec::with_capacity(n);
725 for _ in 0..n {
726 v.push(
727 super::common::streams_group_describe_response::topic_info::TopicInfo::decode_borrow(
728 buf,
729 version,
730 )?,
731 );
732 }
733 v
734 };
735 }
736 if version >= 0 {
737 out.repartition_source_topics = {
738 let n = crate::primitives::array::get_array_len(buf, flex)?;
739 let mut v = Vec::with_capacity(n);
740 for _ in 0..n {
741 v.push(
742 super::common::streams_group_describe_response::topic_info::TopicInfo::decode_borrow(
743 buf,
744 version,
745 )?,
746 );
747 }
748 v
749 };
750 }
751 if flex {
752 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
753 }
754 Ok(out)
755 }
756}
#[cfg(test)]
impl Subtopology<'_> {
    /// Test fixture: a subtopology whose version-gated fields carry non-default
    /// sample values. Negative versions yield the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::streams_group_describe_response::topic_info::TopicInfo;

        if version < 0 {
            return Self::default();
        }
        Self {
            subtopology_id: "x",
            source_topics: vec!["x"],
            repartition_sink_topics: vec!["x"],
            state_changelog_topics: vec![TopicInfo::populated(version)],
            repartition_source_topics: vec![TopicInfo::populated(version)],
            ..Self::default()
        }
    }
}
788#[derive(Debug, Clone, PartialEq, Eq, Default)]
789pub struct Member<'a> {
790 pub member_id: &'a str,
791 pub member_epoch: i32,
792 pub instance_id: Option<&'a str>,
793 pub rack_id: Option<&'a str>,
794 pub client_id: &'a str,
795 pub client_host: &'a str,
796 pub topology_epoch: i32,
797 pub process_id: &'a str,
798 pub user_endpoint:
799 Option<super::common::streams_group_describe_response::endpoint::Endpoint<'a>>,
800 pub client_tags: Vec<super::common::streams_group_describe_response::key_value::KeyValue<'a>>,
801 pub task_offsets:
802 Vec<super::common::streams_group_describe_response::task_offset::TaskOffset<'a>>,
803 pub task_end_offsets:
804 Vec<super::common::streams_group_describe_response::task_offset::TaskOffset<'a>>,
805 pub assignment: super::common::streams_group_describe_response::assignment::Assignment<'a>,
806 pub target_assignment:
807 super::common::streams_group_describe_response::assignment::Assignment<'a>,
808 pub is_classic: bool,
809 pub unknown_tagged_fields: UnknownTaggedFields,
810}
811impl Member<'_> {
812 pub fn to_owned(&self) -> crate::owned::streams_group_describe_response::Member {
813 crate::owned::streams_group_describe_response::Member {
814 member_id: (self.member_id).to_string(),
815 member_epoch: (self.member_epoch),
816 instance_id: (self.instance_id).map(std::string::ToString::to_string),
817 rack_id: (self.rack_id).map(std::string::ToString::to_string),
818 client_id: (self.client_id).to_string(),
819 client_host: (self.client_host).to_string(),
820 topology_epoch: (self.topology_epoch),
821 process_id: (self.process_id).to_string(),
822 user_endpoint: (self.user_endpoint).as_ref().map(super::common::streams_group_describe_response::endpoint::Endpoint::to_owned),
823 client_tags: (self.client_tags).iter().map(super::common::streams_group_describe_response::key_value::KeyValue::to_owned).collect(),
824 task_offsets: (self.task_offsets).iter().map(super::common::streams_group_describe_response::task_offset::TaskOffset::to_owned).collect(),
825 task_end_offsets: (self.task_end_offsets)
826 .iter()
827 .map(super::common::streams_group_describe_response::task_offset::TaskOffset::to_owned)
828 .collect(),
829 assignment: (self.assignment).to_owned(),
830 target_assignment: (self.target_assignment).to_owned(),
831 is_classic: (self.is_classic),
832 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
833 }
834 }
835}
836impl Encode for Member<'_> {
837 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
838 let flex = version >= 0;
839 if version >= 0 {
840 if flex {
841 put_compact_string(buf, self.member_id);
842 } else {
843 put_string(buf, self.member_id);
844 }
845 }
846 if version >= 0 {
847 put_i32(buf, self.member_epoch);
848 }
849 if version >= 0 {
850 if flex {
851 put_compact_nullable_string(buf, self.instance_id);
852 } else {
853 put_nullable_string(buf, self.instance_id);
854 }
855 }
856 if version >= 0 {
857 if flex {
858 put_compact_nullable_string(buf, self.rack_id);
859 } else {
860 put_nullable_string(buf, self.rack_id);
861 }
862 }
863 if version >= 0 {
864 if flex {
865 put_compact_string(buf, self.client_id);
866 } else {
867 put_string(buf, self.client_id);
868 }
869 }
870 if version >= 0 {
871 if flex {
872 put_compact_string(buf, self.client_host);
873 } else {
874 put_string(buf, self.client_host);
875 }
876 }
877 if version >= 0 {
878 put_i32(buf, self.topology_epoch);
879 }
880 if version >= 0 {
881 if flex {
882 put_compact_string(buf, self.process_id);
883 } else {
884 put_string(buf, self.process_id);
885 }
886 }
887 if version >= 0 {
888 match &self.user_endpoint {
889 None => {
890 buf.put_i8(-1);
891 }
892 Some(v) => {
893 buf.put_i8(1);
894 v.encode(buf, version)?;
895 }
896 }
897 }
898 if version >= 0 {
899 {
900 crate::primitives::array::put_array_len(buf, (self.client_tags).len(), flex);
901 for it in &self.client_tags {
902 it.encode(buf, version)?;
903 }
904 }
905 }
906 if version >= 0 {
907 {
908 crate::primitives::array::put_array_len(buf, (self.task_offsets).len(), flex);
909 for it in &self.task_offsets {
910 it.encode(buf, version)?;
911 }
912 }
913 }
914 if version >= 0 {
915 {
916 crate::primitives::array::put_array_len(buf, (self.task_end_offsets).len(), flex);
917 for it in &self.task_end_offsets {
918 it.encode(buf, version)?;
919 }
920 }
921 }
922 if version >= 0 {
923 self.assignment.encode(buf, version)?;
924 }
925 if version >= 0 {
926 self.target_assignment.encode(buf, version)?;
927 }
928 if version >= 0 {
929 put_bool(buf, self.is_classic);
930 }
931 if flex {
932 let tagged = WriteTaggedFields::new();
933 tagged.write(buf, &self.unknown_tagged_fields);
934 }
935 Ok(())
936 }
937 fn encoded_len(&self, version: i16) -> usize {
938 let flex = version >= 0;
939 let mut n: usize = 0;
940 if version >= 0 {
941 n += if flex {
942 compact_string_len(self.member_id)
943 } else {
944 string_len(self.member_id)
945 };
946 }
947 if version >= 0 {
948 n += 4;
949 }
950 if version >= 0 {
951 n += if flex {
952 compact_nullable_string_len(self.instance_id)
953 } else {
954 nullable_string_len(self.instance_id)
955 };
956 }
957 if version >= 0 {
958 n += if flex {
959 compact_nullable_string_len(self.rack_id)
960 } else {
961 nullable_string_len(self.rack_id)
962 };
963 }
964 if version >= 0 {
965 n += if flex {
966 compact_string_len(self.client_id)
967 } else {
968 string_len(self.client_id)
969 };
970 }
971 if version >= 0 {
972 n += if flex {
973 compact_string_len(self.client_host)
974 } else {
975 string_len(self.client_host)
976 };
977 }
978 if version >= 0 {
979 n += 4;
980 }
981 if version >= 0 {
982 n += if flex {
983 compact_string_len(self.process_id)
984 } else {
985 string_len(self.process_id)
986 };
987 }
988 if version >= 0 {
989 n += 1 + self
990 .user_endpoint
991 .as_ref()
992 .map_or(0, |v| v.encoded_len(version));
993 }
994 if version >= 0 {
995 n += {
996 let prefix =
997 crate::primitives::array::array_len_prefix_len((self.client_tags).len(), flex);
998 let body: usize = (self.client_tags)
999 .iter()
1000 .map(|it| it.encoded_len(version))
1001 .sum();
1002 prefix + body
1003 };
1004 }
1005 if version >= 0 {
1006 n += {
1007 let prefix =
1008 crate::primitives::array::array_len_prefix_len((self.task_offsets).len(), flex);
1009 let body: usize = (self.task_offsets)
1010 .iter()
1011 .map(|it| it.encoded_len(version))
1012 .sum();
1013 prefix + body
1014 };
1015 }
1016 if version >= 0 {
1017 n += {
1018 let prefix = crate::primitives::array::array_len_prefix_len(
1019 (self.task_end_offsets).len(),
1020 flex,
1021 );
1022 let body: usize = (self.task_end_offsets)
1023 .iter()
1024 .map(|it| it.encoded_len(version))
1025 .sum();
1026 prefix + body
1027 };
1028 }
1029 if version >= 0 {
1030 n += self.assignment.encoded_len(version);
1031 }
1032 if version >= 0 {
1033 n += self.target_assignment.encoded_len(version);
1034 }
1035 if version >= 0 {
1036 n += 1;
1037 }
1038 if flex {
1039 let known_pairs: Vec<(u32, usize)> = Vec::new();
1040 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
1041 }
1042 n
1043 }
1044}
1045impl<'de> DecodeBorrow<'de> for Member<'de> {
1046 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
1047 let flex = version >= 0;
1048 let mut out = Self::default();
1049 if version >= 0 {
1050 out.member_id = if flex {
1051 get_compact_string_borrowed(buf)?
1052 } else {
1053 get_string_borrowed(buf)?
1054 };
1055 }
1056 if version >= 0 {
1057 out.member_epoch = get_i32(buf)?;
1058 }
1059 if version >= 0 {
1060 out.instance_id = if flex {
1061 get_compact_nullable_string_borrowed(buf)?
1062 } else {
1063 get_nullable_string_borrowed(buf)?
1064 };
1065 }
1066 if version >= 0 {
1067 out.rack_id = if flex {
1068 get_compact_nullable_string_borrowed(buf)?
1069 } else {
1070 get_nullable_string_borrowed(buf)?
1071 };
1072 }
1073 if version >= 0 {
1074 out.client_id = if flex {
1075 get_compact_string_borrowed(buf)?
1076 } else {
1077 get_string_borrowed(buf)?
1078 };
1079 }
1080 if version >= 0 {
1081 out.client_host = if flex {
1082 get_compact_string_borrowed(buf)?
1083 } else {
1084 get_string_borrowed(buf)?
1085 };
1086 }
1087 if version >= 0 {
1088 out.topology_epoch = get_i32(buf)?;
1089 }
1090 if version >= 0 {
1091 out.process_id = if flex {
1092 get_compact_string_borrowed(buf)?
1093 } else {
1094 get_string_borrowed(buf)?
1095 };
1096 }
1097 if version >= 0 {
1098 out.user_endpoint = if get_i8(buf)? < 0 {
1099 None
1100 } else {
1101 Some(
1102 super::common::streams_group_describe_response::endpoint::Endpoint::decode_borrow(
1103 buf,
1104 version,
1105 )?,
1106 )
1107 };
1108 }
1109 if version >= 0 {
1110 out.client_tags = {
1111 let n = crate::primitives::array::get_array_len(buf, flex)?;
1112 let mut v = Vec::with_capacity(n);
1113 for _ in 0..n {
1114 v.push(
1115 super::common::streams_group_describe_response::key_value::KeyValue::decode_borrow(
1116 buf,
1117 version,
1118 )?,
1119 );
1120 }
1121 v
1122 };
1123 }
1124 if version >= 0 {
1125 out.task_offsets = {
1126 let n = crate::primitives::array::get_array_len(buf, flex)?;
1127 let mut v = Vec::with_capacity(n);
1128 for _ in 0..n {
1129 v.push(
1130 super::common::streams_group_describe_response::task_offset::TaskOffset::decode_borrow(
1131 buf,
1132 version,
1133 )?,
1134 );
1135 }
1136 v
1137 };
1138 }
1139 if version >= 0 {
1140 out.task_end_offsets = {
1141 let n = crate::primitives::array::get_array_len(buf, flex)?;
1142 let mut v = Vec::with_capacity(n);
1143 for _ in 0..n {
1144 v.push(
1145 super::common::streams_group_describe_response::task_offset::TaskOffset::decode_borrow(
1146 buf,
1147 version,
1148 )?,
1149 );
1150 }
1151 v
1152 };
1153 }
1154 if version >= 0 {
1155 out.assignment = super::common::streams_group_describe_response::assignment::Assignment::decode_borrow(
1156 buf,
1157 version,
1158 )?;
1159 }
1160 if version >= 0 {
1161 out.target_assignment = super::common::streams_group_describe_response::assignment::Assignment::decode_borrow(
1162 buf,
1163 version,
1164 )?;
1165 }
1166 if version >= 0 {
1167 out.is_classic = get_bool(buf)?;
1168 }
1169 if flex {
1170 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1171 }
1172 Ok(out)
1173 }
1174}
#[cfg(test)]
impl Member<'_> {
    /// Test fixture: a member whose version-gated fields carry non-default
    /// sample values. Negative versions yield the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::streams_group_describe_response::assignment::Assignment;
        use super::common::streams_group_describe_response::endpoint::Endpoint;
        use super::common::streams_group_describe_response::key_value::KeyValue;
        use super::common::streams_group_describe_response::task_offset::TaskOffset;

        if version < 0 {
            return Self::default();
        }
        Self {
            member_id: "x",
            member_epoch: 1i32,
            instance_id: Some("x"),
            rack_id: Some("x"),
            client_id: "x",
            client_host: "x",
            topology_epoch: 1i32,
            process_id: "x",
            user_endpoint: Some(Endpoint::populated(version)),
            client_tags: vec![KeyValue::populated(version)],
            task_offsets: vec![TaskOffset::populated(version)],
            task_end_offsets: vec![TaskOffset::populated(version)],
            assignment: Assignment::populated(version),
            target_assignment: Assignment::populated(version),
            is_classic: true,
            ..Self::default()
        }
    }
}