1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_bool, get_i8, get_i16, get_i32, put_bool, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
9};
10use crate::primitives::string_bytes_borrowed::{
11 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
12 get_nullable_string_borrowed, get_string_borrowed,
13};
14use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
15use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
16
/// API key reported in `ProtocolError::UnsupportedVersion` by this message's codecs.
pub const API_KEY: i16 = 89;
/// Lowest version accepted by the top-level `encode` / `decode_borrow`.
pub const MIN_VERSION: i16 = 0;
/// Highest version accepted by the top-level `encode` / `decode_borrow`.
pub const MAX_VERSION: i16 = 0;
/// First version for which `is_flexible` returns `true`.
pub const FLEXIBLE_MIN: i16 = 0;
21
22#[inline]
23fn is_flexible(version: i16) -> bool {
24 version >= FLEXIBLE_MIN
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Default)]
28pub struct StreamsGroupDescribeResponse<'a> {
29 pub throttle_time_ms: i32,
30 pub groups: Vec<DescribedGroup<'a>>,
31 pub unknown_tagged_fields: UnknownTaggedFields,
32}
33impl StreamsGroupDescribeResponse<'_> {
34 pub fn to_owned(
35 &self,
36 ) -> crate::owned::streams_group_describe_response::StreamsGroupDescribeResponse {
37 crate::owned::streams_group_describe_response::StreamsGroupDescribeResponse {
38 throttle_time_ms: (self.throttle_time_ms),
39 groups: (self.groups).iter().map(DescribedGroup::to_owned).collect(),
40 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
41 }
42 }
43}
44impl Encode for StreamsGroupDescribeResponse<'_> {
45 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
46 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
47 return Err(ProtocolError::UnsupportedVersion {
48 api_key: API_KEY,
49 version,
50 });
51 }
52 let flex = is_flexible(version);
53 if version >= 0 {
54 put_i32(buf, self.throttle_time_ms);
55 }
56 if version >= 0 {
57 {
58 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
59 for it in &self.groups {
60 it.encode(buf, version)?;
61 }
62 }
63 }
64 if flex {
65 let tagged = WriteTaggedFields::new();
66 tagged.write(buf, &self.unknown_tagged_fields);
67 }
68 Ok(())
69 }
70 fn encoded_len(&self, version: i16) -> usize {
71 let flex = is_flexible(version);
72 let mut n: usize = 0;
73 if version >= 0 {
74 n += 4;
75 }
76 if version >= 0 {
77 n += {
78 let prefix =
79 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
80 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
81 prefix + body
82 };
83 }
84 if flex {
85 let known_pairs: Vec<(u32, usize)> = Vec::new();
86 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
87 }
88 n
89 }
90}
91impl<'de> DecodeBorrow<'de> for StreamsGroupDescribeResponse<'de> {
92 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
93 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
94 return Err(ProtocolError::UnsupportedVersion {
95 api_key: API_KEY,
96 version,
97 });
98 }
99 let flex = is_flexible(version);
100 let mut out = Self::default();
101 if version >= 0 {
102 out.throttle_time_ms = get_i32(buf)?;
103 }
104 if version >= 0 {
105 out.groups = {
106 let n = crate::primitives::array::get_array_len(buf, flex)?;
107 let mut v = Vec::with_capacity(n);
108 for _ in 0..n {
109 v.push(DescribedGroup::decode_borrow(buf, version)?);
110 }
111 v
112 };
113 }
114 if flex {
115 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
116 }
117 Ok(out)
118 }
119}
#[cfg(test)]
impl StreamsGroupDescribeResponse<'_> {
    /// Test fixture: every field active for `version` gets a non-default value;
    /// a negative `version` yields `Self::default()`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            throttle_time_ms: 1i32,
            groups: vec![DescribedGroup::populated(version)],
            ..Self::default()
        }
    }
}
134#[derive(Debug, Clone, PartialEq, Eq)]
135pub struct DescribedGroup<'a> {
136 pub error_code: i16,
137 pub error_message: Option<&'a str>,
138 pub group_id: &'a str,
139 pub group_state: &'a str,
140 pub group_epoch: i32,
141 pub assignment_epoch: i32,
142 pub topology: Option<Topology<'a>>,
143 pub members: Vec<Member<'a>>,
144 pub authorized_operations: i32,
145 pub unknown_tagged_fields: UnknownTaggedFields,
146}
147impl Default for DescribedGroup<'_> {
148 fn default() -> Self {
149 Self {
150 error_code: 0i16,
151 error_message: None,
152 group_id: "",
153 group_state: "",
154 group_epoch: 0i32,
155 assignment_epoch: 0i32,
156 topology: None,
157 members: Vec::new(),
158 authorized_operations: -2_147_483_648i32,
159 unknown_tagged_fields: Default::default(),
160 }
161 }
162}
163impl DescribedGroup<'_> {
164 pub fn to_owned(&self) -> crate::owned::streams_group_describe_response::DescribedGroup {
165 crate::owned::streams_group_describe_response::DescribedGroup {
166 error_code: (self.error_code),
167 error_message: (self.error_message).map(std::string::ToString::to_string),
168 group_id: (self.group_id).to_string(),
169 group_state: (self.group_state).to_string(),
170 group_epoch: (self.group_epoch),
171 assignment_epoch: (self.assignment_epoch),
172 topology: (self.topology).as_ref().map(Topology::to_owned),
173 members: (self.members).iter().map(Member::to_owned).collect(),
174 authorized_operations: (self.authorized_operations),
175 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
176 }
177 }
178}
179impl Encode for DescribedGroup<'_> {
180 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
181 let flex = version >= 0;
182 if version >= 0 {
183 put_i16(buf, self.error_code);
184 }
185 if version >= 0 {
186 if flex {
187 put_compact_nullable_string(buf, self.error_message);
188 } else {
189 put_nullable_string(buf, self.error_message);
190 }
191 }
192 if version >= 0 {
193 if flex {
194 put_compact_string(buf, self.group_id);
195 } else {
196 put_string(buf, self.group_id);
197 }
198 }
199 if version >= 0 {
200 if flex {
201 put_compact_string(buf, self.group_state);
202 } else {
203 put_string(buf, self.group_state);
204 }
205 }
206 if version >= 0 {
207 put_i32(buf, self.group_epoch);
208 }
209 if version >= 0 {
210 put_i32(buf, self.assignment_epoch);
211 }
212 if version >= 0 {
213 match &self.topology {
214 None => {
215 buf.put_i8(-1);
216 }
217 Some(v) => {
218 buf.put_i8(1);
219 v.encode(buf, version)?;
220 }
221 }
222 }
223 if version >= 0 {
224 {
225 crate::primitives::array::put_array_len(buf, (self.members).len(), flex);
226 for it in &self.members {
227 it.encode(buf, version)?;
228 }
229 }
230 }
231 if version >= 0 {
232 put_i32(buf, self.authorized_operations);
233 }
234 if flex {
235 let tagged = WriteTaggedFields::new();
236 tagged.write(buf, &self.unknown_tagged_fields);
237 }
238 Ok(())
239 }
240 fn encoded_len(&self, version: i16) -> usize {
241 let flex = version >= 0;
242 let mut n: usize = 0;
243 if version >= 0 {
244 n += 2;
245 }
246 if version >= 0 {
247 n += if flex {
248 compact_nullable_string_len(self.error_message)
249 } else {
250 nullable_string_len(self.error_message)
251 };
252 }
253 if version >= 0 {
254 n += if flex {
255 compact_string_len(self.group_id)
256 } else {
257 string_len(self.group_id)
258 };
259 }
260 if version >= 0 {
261 n += if flex {
262 compact_string_len(self.group_state)
263 } else {
264 string_len(self.group_state)
265 };
266 }
267 if version >= 0 {
268 n += 4;
269 }
270 if version >= 0 {
271 n += 4;
272 }
273 if version >= 0 {
274 n += 1 + self.topology.as_ref().map_or(0, |v| v.encoded_len(version));
275 }
276 if version >= 0 {
277 n += {
278 let prefix =
279 crate::primitives::array::array_len_prefix_len((self.members).len(), flex);
280 let body: usize = (self.members)
281 .iter()
282 .map(|it| it.encoded_len(version))
283 .sum();
284 prefix + body
285 };
286 }
287 if version >= 0 {
288 n += 4;
289 }
290 if flex {
291 let known_pairs: Vec<(u32, usize)> = Vec::new();
292 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
293 }
294 n
295 }
296}
297impl<'de> DecodeBorrow<'de> for DescribedGroup<'de> {
298 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
299 let flex = version >= 0;
300 let mut out = Self::default();
301 if version >= 0 {
302 out.error_code = get_i16(buf)?;
303 }
304 if version >= 0 {
305 out.error_message = if flex {
306 get_compact_nullable_string_borrowed(buf)?
307 } else {
308 get_nullable_string_borrowed(buf)?
309 };
310 }
311 if version >= 0 {
312 out.group_id = if flex {
313 get_compact_string_borrowed(buf)?
314 } else {
315 get_string_borrowed(buf)?
316 };
317 }
318 if version >= 0 {
319 out.group_state = if flex {
320 get_compact_string_borrowed(buf)?
321 } else {
322 get_string_borrowed(buf)?
323 };
324 }
325 if version >= 0 {
326 out.group_epoch = get_i32(buf)?;
327 }
328 if version >= 0 {
329 out.assignment_epoch = get_i32(buf)?;
330 }
331 if version >= 0 {
332 out.topology = if get_i8(buf)? < 0 {
333 None
334 } else {
335 Some(Topology::decode_borrow(buf, version)?)
336 };
337 }
338 if version >= 0 {
339 out.members = {
340 let n = crate::primitives::array::get_array_len(buf, flex)?;
341 let mut v = Vec::with_capacity(n);
342 for _ in 0..n {
343 v.push(Member::decode_borrow(buf, version)?);
344 }
345 v
346 };
347 }
348 if version >= 0 {
349 out.authorized_operations = get_i32(buf)?;
350 }
351 if flex {
352 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
353 }
354 Ok(out)
355 }
356}
#[cfg(test)]
impl DescribedGroup<'_> {
    /// Test fixture: every field active for `version` gets a non-default value;
    /// a negative `version` yields `Self::default()`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            error_code: 1i16,
            error_message: Some("x"),
            group_id: "x",
            group_state: "x",
            group_epoch: 1i32,
            assignment_epoch: 1i32,
            topology: Some(Topology::populated(version)),
            members: vec![Member::populated(version)],
            authorized_operations: 1i32,
            ..Self::default()
        }
    }
}
392#[derive(Debug, Clone, PartialEq, Eq, Default)]
393pub struct Topology<'a> {
394 pub epoch: i32,
395 pub subtopologies: Option<Vec<Subtopology<'a>>>,
396 pub unknown_tagged_fields: UnknownTaggedFields,
397}
398impl Topology<'_> {
399 pub fn to_owned(&self) -> crate::owned::streams_group_describe_response::Topology {
400 crate::owned::streams_group_describe_response::Topology {
401 epoch: (self.epoch),
402 subtopologies: (self.subtopologies)
403 .as_ref()
404 .map(|v| v.iter().map(Subtopology::to_owned).collect()),
405 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
406 }
407 }
408}
409impl Encode for Topology<'_> {
410 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
411 let flex = version >= 0;
412 if version >= 0 {
413 put_i32(buf, self.epoch);
414 }
415 if version >= 0 {
416 {
417 let len = (self.subtopologies).as_ref().map(Vec::len);
418 crate::primitives::array::put_nullable_array_len(buf, len, flex);
419 if let Some(v) = &self.subtopologies {
420 for it in v {
421 it.encode(buf, version)?;
422 }
423 }
424 }
425 }
426 if flex {
427 let tagged = WriteTaggedFields::new();
428 tagged.write(buf, &self.unknown_tagged_fields);
429 }
430 Ok(())
431 }
432 fn encoded_len(&self, version: i16) -> usize {
433 let flex = version >= 0;
434 let mut n: usize = 0;
435 if version >= 0 {
436 n += 4;
437 }
438 if version >= 0 {
439 n += {
440 let opt: Option<&Vec<_>> = (self.subtopologies).as_ref();
441 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
442 opt.map(std::vec::Vec::len),
443 flex,
444 );
445 let body: usize =
446 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
447 prefix + body
448 };
449 }
450 if flex {
451 let known_pairs: Vec<(u32, usize)> = Vec::new();
452 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
453 }
454 n
455 }
456}
457impl<'de> DecodeBorrow<'de> for Topology<'de> {
458 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
459 let flex = version >= 0;
460 let mut out = Self::default();
461 if version >= 0 {
462 out.epoch = get_i32(buf)?;
463 }
464 if version >= 0 {
465 out.subtopologies = {
466 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
467 match opt {
468 None => None,
469 Some(n) => {
470 let mut v = Vec::with_capacity(n);
471 for _ in 0..n {
472 v.push(Subtopology::decode_borrow(buf, version)?);
473 }
474 Some(v)
475 }
476 }
477 };
478 }
479 if flex {
480 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
481 }
482 Ok(out)
483 }
484}
#[cfg(test)]
impl Topology<'_> {
    /// Test fixture: every field active for `version` gets a non-default value;
    /// a negative `version` yields `Self::default()`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            epoch: 1i32,
            subtopologies: Some(vec![Subtopology::populated(version)]),
            ..Self::default()
        }
    }
}
499#[derive(Debug, Clone, PartialEq, Eq, Default)]
500pub struct Subtopology<'a> {
501 pub subtopology_id: &'a str,
502 pub source_topics: Vec<&'a str>,
503 pub repartition_sink_topics: Vec<&'a str>,
504 pub state_changelog_topics:
505 Vec<super::common::streams_group_describe_response::topic_info::TopicInfo<'a>>,
506 pub repartition_source_topics:
507 Vec<super::common::streams_group_describe_response::topic_info::TopicInfo<'a>>,
508 pub unknown_tagged_fields: UnknownTaggedFields,
509}
510impl Subtopology<'_> {
511 pub fn to_owned(&self) -> crate::owned::streams_group_describe_response::Subtopology {
512 crate::owned::streams_group_describe_response::Subtopology {
513 subtopology_id: (self.subtopology_id).to_string(),
514 source_topics: (self.source_topics)
515 .iter()
516 .map(std::string::ToString::to_string)
517 .collect(),
518 repartition_sink_topics: (self.repartition_sink_topics)
519 .iter()
520 .map(std::string::ToString::to_string)
521 .collect(),
522 state_changelog_topics: (self.state_changelog_topics)
523 .iter()
524 .map(
525 super::common::streams_group_describe_response::topic_info::TopicInfo::to_owned,
526 )
527 .collect(),
528 repartition_source_topics: (self.repartition_source_topics)
529 .iter()
530 .map(
531 super::common::streams_group_describe_response::topic_info::TopicInfo::to_owned,
532 )
533 .collect(),
534 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
535 }
536 }
537}
538impl Encode for Subtopology<'_> {
539 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
540 let flex = version >= 0;
541 if version >= 0 {
542 if flex {
543 put_compact_string(buf, self.subtopology_id);
544 } else {
545 put_string(buf, self.subtopology_id);
546 }
547 }
548 if version >= 0 {
549 {
550 crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
551 for it in &self.source_topics {
552 if flex {
553 put_compact_string(buf, it);
554 } else {
555 put_string(buf, it);
556 }
557 }
558 }
559 }
560 if version >= 0 {
561 {
562 crate::primitives::array::put_array_len(
563 buf,
564 (self.repartition_sink_topics).len(),
565 flex,
566 );
567 for it in &self.repartition_sink_topics {
568 if flex {
569 put_compact_string(buf, it);
570 } else {
571 put_string(buf, it);
572 }
573 }
574 }
575 }
576 if version >= 0 {
577 {
578 crate::primitives::array::put_array_len(
579 buf,
580 (self.state_changelog_topics).len(),
581 flex,
582 );
583 for it in &self.state_changelog_topics {
584 it.encode(buf, version)?;
585 }
586 }
587 }
588 if version >= 0 {
589 {
590 crate::primitives::array::put_array_len(
591 buf,
592 (self.repartition_source_topics).len(),
593 flex,
594 );
595 for it in &self.repartition_source_topics {
596 it.encode(buf, version)?;
597 }
598 }
599 }
600 if flex {
601 let tagged = WriteTaggedFields::new();
602 tagged.write(buf, &self.unknown_tagged_fields);
603 }
604 Ok(())
605 }
606 fn encoded_len(&self, version: i16) -> usize {
607 let flex = version >= 0;
608 let mut n: usize = 0;
609 if version >= 0 {
610 n += if flex {
611 compact_string_len(self.subtopology_id)
612 } else {
613 string_len(self.subtopology_id)
614 };
615 }
616 if version >= 0 {
617 n += {
618 let prefix = crate::primitives::array::array_len_prefix_len(
619 (self.source_topics).len(),
620 flex,
621 );
622 let body: usize = (self.source_topics)
623 .iter()
624 .map(|it| {
625 if flex {
626 compact_string_len(it)
627 } else {
628 string_len(it)
629 }
630 })
631 .sum();
632 prefix + body
633 };
634 }
635 if version >= 0 {
636 n += {
637 let prefix = crate::primitives::array::array_len_prefix_len(
638 (self.repartition_sink_topics).len(),
639 flex,
640 );
641 let body: usize = (self.repartition_sink_topics)
642 .iter()
643 .map(|it| {
644 if flex {
645 compact_string_len(it)
646 } else {
647 string_len(it)
648 }
649 })
650 .sum();
651 prefix + body
652 };
653 }
654 if version >= 0 {
655 n += {
656 let prefix = crate::primitives::array::array_len_prefix_len(
657 (self.state_changelog_topics).len(),
658 flex,
659 );
660 let body: usize = (self.state_changelog_topics)
661 .iter()
662 .map(|it| it.encoded_len(version))
663 .sum();
664 prefix + body
665 };
666 }
667 if version >= 0 {
668 n += {
669 let prefix = crate::primitives::array::array_len_prefix_len(
670 (self.repartition_source_topics).len(),
671 flex,
672 );
673 let body: usize = (self.repartition_source_topics)
674 .iter()
675 .map(|it| it.encoded_len(version))
676 .sum();
677 prefix + body
678 };
679 }
680 if flex {
681 let known_pairs: Vec<(u32, usize)> = Vec::new();
682 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
683 }
684 n
685 }
686}
687impl<'de> DecodeBorrow<'de> for Subtopology<'de> {
688 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
689 let flex = version >= 0;
690 let mut out = Self::default();
691 if version >= 0 {
692 out.subtopology_id = if flex {
693 get_compact_string_borrowed(buf)?
694 } else {
695 get_string_borrowed(buf)?
696 };
697 }
698 if version >= 0 {
699 out.source_topics = {
700 let n = crate::primitives::array::get_array_len(buf, flex)?;
701 let mut v = Vec::with_capacity(n);
702 for _ in 0..n {
703 v.push(if flex {
704 get_compact_string_borrowed(buf)?
705 } else {
706 get_string_borrowed(buf)?
707 });
708 }
709 v
710 };
711 }
712 if version >= 0 {
713 out.repartition_sink_topics = {
714 let n = crate::primitives::array::get_array_len(buf, flex)?;
715 let mut v = Vec::with_capacity(n);
716 for _ in 0..n {
717 v.push(if flex {
718 get_compact_string_borrowed(buf)?
719 } else {
720 get_string_borrowed(buf)?
721 });
722 }
723 v
724 };
725 }
726 if version >= 0 {
727 out.state_changelog_topics = {
728 let n = crate::primitives::array::get_array_len(buf, flex)?;
729 let mut v = Vec::with_capacity(n);
730 for _ in 0..n {
731 v . push (super :: common :: streams_group_describe_response :: topic_info :: TopicInfo :: decode_borrow (buf , version) ?) ;
732 }
733 v
734 };
735 }
736 if version >= 0 {
737 out.repartition_source_topics = {
738 let n = crate::primitives::array::get_array_len(buf, flex)?;
739 let mut v = Vec::with_capacity(n);
740 for _ in 0..n {
741 v . push (super :: common :: streams_group_describe_response :: topic_info :: TopicInfo :: decode_borrow (buf , version) ?) ;
742 }
743 v
744 };
745 }
746 if flex {
747 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
748 }
749 Ok(out)
750 }
751}
#[cfg(test)]
impl Subtopology<'_> {
    /// Test fixture: every field active for `version` gets a non-default value;
    /// a negative `version` yields `Self::default()`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            subtopology_id: "x",
            source_topics: vec!["x"],
            repartition_sink_topics: vec!["x"],
            state_changelog_topics: vec![
                super::common::streams_group_describe_response::topic_info::TopicInfo::populated(
                    version,
                ),
            ],
            repartition_source_topics: vec![
                super::common::streams_group_describe_response::topic_info::TopicInfo::populated(
                    version,
                ),
            ],
            ..Self::default()
        }
    }
}
783#[derive(Debug, Clone, PartialEq, Eq, Default)]
784pub struct Member<'a> {
785 pub member_id: &'a str,
786 pub member_epoch: i32,
787 pub instance_id: Option<&'a str>,
788 pub rack_id: Option<&'a str>,
789 pub client_id: &'a str,
790 pub client_host: &'a str,
791 pub topology_epoch: i32,
792 pub process_id: &'a str,
793 pub user_endpoint:
794 Option<super::common::streams_group_describe_response::endpoint::Endpoint<'a>>,
795 pub client_tags: Vec<super::common::streams_group_describe_response::key_value::KeyValue<'a>>,
796 pub task_offsets:
797 Vec<super::common::streams_group_describe_response::task_offset::TaskOffset<'a>>,
798 pub task_end_offsets:
799 Vec<super::common::streams_group_describe_response::task_offset::TaskOffset<'a>>,
800 pub assignment: super::common::streams_group_describe_response::assignment::Assignment<'a>,
801 pub target_assignment:
802 super::common::streams_group_describe_response::assignment::Assignment<'a>,
803 pub is_classic: bool,
804 pub unknown_tagged_fields: UnknownTaggedFields,
805}
806impl Member<'_> {
807 pub fn to_owned(&self) -> crate::owned::streams_group_describe_response::Member {
808 crate::owned::streams_group_describe_response::Member {
809 member_id: (self.member_id).to_string(),
810 member_epoch: (self.member_epoch),
811 instance_id: (self.instance_id).map(std::string::ToString::to_string),
812 rack_id: (self.rack_id).map(std::string::ToString::to_string),
813 client_id: (self.client_id).to_string(),
814 client_host: (self.client_host).to_string(),
815 topology_epoch: (self.topology_epoch),
816 process_id: (self.process_id).to_string(),
817 user_endpoint: (self.user_endpoint).as_ref().map(super::common::streams_group_describe_response::endpoint::Endpoint::to_owned),
818 client_tags: (self.client_tags).iter().map(super::common::streams_group_describe_response::key_value::KeyValue::to_owned).collect(),
819 task_offsets: (self.task_offsets).iter().map(super::common::streams_group_describe_response::task_offset::TaskOffset::to_owned).collect(),
820 task_end_offsets: (self.task_end_offsets)
821 .iter()
822 .map(super::common::streams_group_describe_response::task_offset::TaskOffset::to_owned)
823 .collect(),
824 assignment: (self.assignment).to_owned(),
825 target_assignment: (self.target_assignment).to_owned(),
826 is_classic: (self.is_classic),
827 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
828 }
829 }
830}
831impl Encode for Member<'_> {
832 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
833 let flex = version >= 0;
834 if version >= 0 {
835 if flex {
836 put_compact_string(buf, self.member_id);
837 } else {
838 put_string(buf, self.member_id);
839 }
840 }
841 if version >= 0 {
842 put_i32(buf, self.member_epoch);
843 }
844 if version >= 0 {
845 if flex {
846 put_compact_nullable_string(buf, self.instance_id);
847 } else {
848 put_nullable_string(buf, self.instance_id);
849 }
850 }
851 if version >= 0 {
852 if flex {
853 put_compact_nullable_string(buf, self.rack_id);
854 } else {
855 put_nullable_string(buf, self.rack_id);
856 }
857 }
858 if version >= 0 {
859 if flex {
860 put_compact_string(buf, self.client_id);
861 } else {
862 put_string(buf, self.client_id);
863 }
864 }
865 if version >= 0 {
866 if flex {
867 put_compact_string(buf, self.client_host);
868 } else {
869 put_string(buf, self.client_host);
870 }
871 }
872 if version >= 0 {
873 put_i32(buf, self.topology_epoch);
874 }
875 if version >= 0 {
876 if flex {
877 put_compact_string(buf, self.process_id);
878 } else {
879 put_string(buf, self.process_id);
880 }
881 }
882 if version >= 0 {
883 match &self.user_endpoint {
884 None => {
885 buf.put_i8(-1);
886 }
887 Some(v) => {
888 buf.put_i8(1);
889 v.encode(buf, version)?;
890 }
891 }
892 }
893 if version >= 0 {
894 {
895 crate::primitives::array::put_array_len(buf, (self.client_tags).len(), flex);
896 for it in &self.client_tags {
897 it.encode(buf, version)?;
898 }
899 }
900 }
901 if version >= 0 {
902 {
903 crate::primitives::array::put_array_len(buf, (self.task_offsets).len(), flex);
904 for it in &self.task_offsets {
905 it.encode(buf, version)?;
906 }
907 }
908 }
909 if version >= 0 {
910 {
911 crate::primitives::array::put_array_len(buf, (self.task_end_offsets).len(), flex);
912 for it in &self.task_end_offsets {
913 it.encode(buf, version)?;
914 }
915 }
916 }
917 if version >= 0 {
918 self.assignment.encode(buf, version)?;
919 }
920 if version >= 0 {
921 self.target_assignment.encode(buf, version)?;
922 }
923 if version >= 0 {
924 put_bool(buf, self.is_classic);
925 }
926 if flex {
927 let tagged = WriteTaggedFields::new();
928 tagged.write(buf, &self.unknown_tagged_fields);
929 }
930 Ok(())
931 }
932 fn encoded_len(&self, version: i16) -> usize {
933 let flex = version >= 0;
934 let mut n: usize = 0;
935 if version >= 0 {
936 n += if flex {
937 compact_string_len(self.member_id)
938 } else {
939 string_len(self.member_id)
940 };
941 }
942 if version >= 0 {
943 n += 4;
944 }
945 if version >= 0 {
946 n += if flex {
947 compact_nullable_string_len(self.instance_id)
948 } else {
949 nullable_string_len(self.instance_id)
950 };
951 }
952 if version >= 0 {
953 n += if flex {
954 compact_nullable_string_len(self.rack_id)
955 } else {
956 nullable_string_len(self.rack_id)
957 };
958 }
959 if version >= 0 {
960 n += if flex {
961 compact_string_len(self.client_id)
962 } else {
963 string_len(self.client_id)
964 };
965 }
966 if version >= 0 {
967 n += if flex {
968 compact_string_len(self.client_host)
969 } else {
970 string_len(self.client_host)
971 };
972 }
973 if version >= 0 {
974 n += 4;
975 }
976 if version >= 0 {
977 n += if flex {
978 compact_string_len(self.process_id)
979 } else {
980 string_len(self.process_id)
981 };
982 }
983 if version >= 0 {
984 n += 1 + self
985 .user_endpoint
986 .as_ref()
987 .map_or(0, |v| v.encoded_len(version));
988 }
989 if version >= 0 {
990 n += {
991 let prefix =
992 crate::primitives::array::array_len_prefix_len((self.client_tags).len(), flex);
993 let body: usize = (self.client_tags)
994 .iter()
995 .map(|it| it.encoded_len(version))
996 .sum();
997 prefix + body
998 };
999 }
1000 if version >= 0 {
1001 n += {
1002 let prefix =
1003 crate::primitives::array::array_len_prefix_len((self.task_offsets).len(), flex);
1004 let body: usize = (self.task_offsets)
1005 .iter()
1006 .map(|it| it.encoded_len(version))
1007 .sum();
1008 prefix + body
1009 };
1010 }
1011 if version >= 0 {
1012 n += {
1013 let prefix = crate::primitives::array::array_len_prefix_len(
1014 (self.task_end_offsets).len(),
1015 flex,
1016 );
1017 let body: usize = (self.task_end_offsets)
1018 .iter()
1019 .map(|it| it.encoded_len(version))
1020 .sum();
1021 prefix + body
1022 };
1023 }
1024 if version >= 0 {
1025 n += self.assignment.encoded_len(version);
1026 }
1027 if version >= 0 {
1028 n += self.target_assignment.encoded_len(version);
1029 }
1030 if version >= 0 {
1031 n += 1;
1032 }
1033 if flex {
1034 let known_pairs: Vec<(u32, usize)> = Vec::new();
1035 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
1036 }
1037 n
1038 }
1039}
1040impl<'de> DecodeBorrow<'de> for Member<'de> {
1041 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
1042 let flex = version >= 0;
1043 let mut out = Self::default();
1044 if version >= 0 {
1045 out.member_id = if flex {
1046 get_compact_string_borrowed(buf)?
1047 } else {
1048 get_string_borrowed(buf)?
1049 };
1050 }
1051 if version >= 0 {
1052 out.member_epoch = get_i32(buf)?;
1053 }
1054 if version >= 0 {
1055 out.instance_id = if flex {
1056 get_compact_nullable_string_borrowed(buf)?
1057 } else {
1058 get_nullable_string_borrowed(buf)?
1059 };
1060 }
1061 if version >= 0 {
1062 out.rack_id = if flex {
1063 get_compact_nullable_string_borrowed(buf)?
1064 } else {
1065 get_nullable_string_borrowed(buf)?
1066 };
1067 }
1068 if version >= 0 {
1069 out.client_id = if flex {
1070 get_compact_string_borrowed(buf)?
1071 } else {
1072 get_string_borrowed(buf)?
1073 };
1074 }
1075 if version >= 0 {
1076 out.client_host = if flex {
1077 get_compact_string_borrowed(buf)?
1078 } else {
1079 get_string_borrowed(buf)?
1080 };
1081 }
1082 if version >= 0 {
1083 out.topology_epoch = get_i32(buf)?;
1084 }
1085 if version >= 0 {
1086 out.process_id = if flex {
1087 get_compact_string_borrowed(buf)?
1088 } else {
1089 get_string_borrowed(buf)?
1090 };
1091 }
1092 if version >= 0 {
1093 out.user_endpoint = if get_i8(buf)? < 0 {
1094 None
1095 } else {
1096 Some (super :: common :: streams_group_describe_response :: endpoint :: Endpoint :: decode_borrow (buf , version) ?)
1097 };
1098 }
1099 if version >= 0 {
1100 out.client_tags = {
1101 let n = crate::primitives::array::get_array_len(buf, flex)?;
1102 let mut v = Vec::with_capacity(n);
1103 for _ in 0..n {
1104 v . push (super :: common :: streams_group_describe_response :: key_value :: KeyValue :: decode_borrow (buf , version) ?) ;
1105 }
1106 v
1107 };
1108 }
1109 if version >= 0 {
1110 out.task_offsets = {
1111 let n = crate::primitives::array::get_array_len(buf, flex)?;
1112 let mut v = Vec::with_capacity(n);
1113 for _ in 0..n {
1114 v . push (super :: common :: streams_group_describe_response :: task_offset :: TaskOffset :: decode_borrow (buf , version) ?) ;
1115 }
1116 v
1117 };
1118 }
1119 if version >= 0 {
1120 out.task_end_offsets = {
1121 let n = crate::primitives::array::get_array_len(buf, flex)?;
1122 let mut v = Vec::with_capacity(n);
1123 for _ in 0..n {
1124 v . push (super :: common :: streams_group_describe_response :: task_offset :: TaskOffset :: decode_borrow (buf , version) ?) ;
1125 }
1126 v
1127 };
1128 }
1129 if version >= 0 {
1130 out . assignment = super :: common :: streams_group_describe_response :: assignment :: Assignment :: decode_borrow (buf , version) ? ;
1131 }
1132 if version >= 0 {
1133 out . target_assignment = super :: common :: streams_group_describe_response :: assignment :: Assignment :: decode_borrow (buf , version) ? ;
1134 }
1135 if version >= 0 {
1136 out.is_classic = get_bool(buf)?;
1137 }
1138 if flex {
1139 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1140 }
1141 Ok(out)
1142 }
1143}
#[cfg(test)]
impl Member<'_> {
    /// Test fixture: every field active for `version` gets a non-default value;
    /// a negative `version` yields `Self::default()`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            member_id: "x",
            member_epoch: 1i32,
            instance_id: Some("x"),
            rack_id: Some("x"),
            client_id: "x",
            client_host: "x",
            topology_epoch: 1i32,
            process_id: "x",
            user_endpoint: Some(
                super::common::streams_group_describe_response::endpoint::Endpoint::populated(
                    version,
                ),
            ),
            client_tags: vec![
                super::common::streams_group_describe_response::key_value::KeyValue::populated(
                    version,
                ),
            ],
            task_offsets: vec![
                super::common::streams_group_describe_response::task_offset::TaskOffset::populated(
                    version,
                ),
            ],
            task_end_offsets: vec![
                super::common::streams_group_describe_response::task_offset::TaskOffset::populated(
                    version,
                ),
            ],
            assignment:
                super::common::streams_group_describe_response::assignment::Assignment::populated(
                    version,
                ),
            target_assignment:
                super::common::streams_group_describe_response::assignment::Assignment::populated(
                    version,
                ),
            is_classic: true,
            ..Self::default()
        }
    }
}