Skip to main content

crabka_protocol/opt/rustwide/workdir/generated/
StreamsGroupHeartbeatRequest.owned.rs

1// AUTO-GENERATED by crabka-protocol-codegen against a9ce3221537b8653448750697915607dc7936cf3. Do not edit.
2
3use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_bool, get_i8, get_i16, get_i32, put_bool, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7    compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8    get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9    put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13
/// API key reported in `ProtocolError::UnsupportedVersion` by this message's codec.
pub const API_KEY: i16 = 88;
/// Lowest message version accepted by `encode`/`decode` of the request.
pub const MIN_VERSION: i16 = 0;
/// Highest message version accepted by `encode`/`decode` of the request.
pub const MAX_VERSION: i16 = 0;
/// First version for which `is_flexible` returns `true`.
pub const FLEXIBLE_MIN: i16 = 0;
18
19#[inline]
20fn is_flexible(version: i16) -> bool {
21    version >= FLEXIBLE_MIN
22}
23
/// Owned representation of the request message with API key 88.
///
/// Fields are listed in the order in which `encode` writes them and `decode` reads them.
/// NOTE(review): presumably mirrors Kafka's `StreamsGroupHeartbeatRequest` schema — confirm
/// field semantics against the upstream message definition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamsGroupHeartbeatRequest {
    /// Non-nullable string (compact form on flexible versions).
    pub group_id: String,
    /// Non-nullable string (compact form on flexible versions).
    pub member_id: String,
    /// Written as a 32-bit integer.
    pub member_epoch: i32,
    /// Written as a 32-bit integer.
    pub endpoint_information_epoch: i32,
    /// Nullable string.
    pub instance_id: Option<String>,
    /// Nullable string.
    pub rack_id: Option<String>,
    /// Written as a 32-bit integer; `Default` sets it to `-1`.
    pub rebalance_timeout_ms: i32,
    /// Nullable nested struct, preceded on the wire by a one-byte marker
    /// (`-1` when `None`, `1` when `Some`).
    pub topology: Option<Topology>,
    /// Nullable array of nested structs.
    pub active_tasks:
        Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds>>,
    /// Nullable array of nested structs.
    pub standby_tasks:
        Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds>>,
    /// Nullable array of nested structs.
    pub warmup_tasks:
        Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds>>,
    /// Nullable string.
    pub process_id: Option<String>,
    /// Nullable nested struct, preceded on the wire by a one-byte marker
    /// (`-1` when `None`, `1` when `Some`).
    pub user_endpoint: Option<super::common::streams_group_heartbeat_request::endpoint::Endpoint>,
    /// Nullable array of nested structs.
    pub client_tags:
        Option<Vec<super::common::streams_group_heartbeat_request::key_value::KeyValue>>,
    /// Nullable array of nested structs.
    pub task_offsets:
        Option<Vec<super::common::streams_group_heartbeat_request::task_offset::TaskOffset>>,
    /// Nullable array of nested structs.
    pub task_end_offsets:
        Option<Vec<super::common::streams_group_heartbeat_request::task_offset::TaskOffset>>,
    /// Written as a single boolean.
    pub shutdown_application: bool,
    /// Tagged fields returned by `read_tagged_fields` during decoding; written back by
    /// `encode` on flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
51impl Default for StreamsGroupHeartbeatRequest {
52    fn default() -> Self {
53        Self {
54            group_id: String::new(),
55            member_id: String::new(),
56            member_epoch: 0i32,
57            endpoint_information_epoch: 0i32,
58            instance_id: None,
59            rack_id: None,
60            rebalance_timeout_ms: -1i32,
61            topology: None,
62            active_tasks: None,
63            standby_tasks: None,
64            warmup_tasks: None,
65            process_id: None,
66            user_endpoint: None,
67            client_tags: None,
68            task_offsets: None,
69            task_end_offsets: None,
70            shutdown_application: false,
71            unknown_tagged_fields: Default::default(),
72        }
73    }
74}
75impl Encode for StreamsGroupHeartbeatRequest {
76    fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
77        if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
78            return Err(ProtocolError::UnsupportedVersion {
79                api_key: API_KEY,
80                version,
81            });
82        }
83        let flex = is_flexible(version);
84        if version >= 0 {
85            if flex {
86                put_compact_string(buf, &self.group_id);
87            } else {
88                put_string(buf, &self.group_id);
89            }
90        }
91        if version >= 0 {
92            if flex {
93                put_compact_string(buf, &self.member_id);
94            } else {
95                put_string(buf, &self.member_id);
96            }
97        }
98        if version >= 0 {
99            put_i32(buf, self.member_epoch);
100        }
101        if version >= 0 {
102            put_i32(buf, self.endpoint_information_epoch);
103        }
104        if version >= 0 {
105            if flex {
106                put_compact_nullable_string(buf, self.instance_id.as_deref());
107            } else {
108                put_nullable_string(buf, self.instance_id.as_deref());
109            }
110        }
111        if version >= 0 {
112            if flex {
113                put_compact_nullable_string(buf, self.rack_id.as_deref());
114            } else {
115                put_nullable_string(buf, self.rack_id.as_deref());
116            }
117        }
118        if version >= 0 {
119            put_i32(buf, self.rebalance_timeout_ms);
120        }
121        if version >= 0 {
122            match &self.topology {
123                None => {
124                    buf.put_i8(-1);
125                }
126                Some(v) => {
127                    buf.put_i8(1);
128                    v.encode(buf, version)?;
129                }
130            }
131        }
132        if version >= 0 {
133            {
134                let len = (self.active_tasks).as_ref().map(Vec::len);
135                crate::primitives::array::put_nullable_array_len(buf, len, flex);
136                if let Some(v) = &self.active_tasks {
137                    for it in v {
138                        it.encode(buf, version)?;
139                    }
140                }
141            }
142        }
143        if version >= 0 {
144            {
145                let len = (self.standby_tasks).as_ref().map(Vec::len);
146                crate::primitives::array::put_nullable_array_len(buf, len, flex);
147                if let Some(v) = &self.standby_tasks {
148                    for it in v {
149                        it.encode(buf, version)?;
150                    }
151                }
152            }
153        }
154        if version >= 0 {
155            {
156                let len = (self.warmup_tasks).as_ref().map(Vec::len);
157                crate::primitives::array::put_nullable_array_len(buf, len, flex);
158                if let Some(v) = &self.warmup_tasks {
159                    for it in v {
160                        it.encode(buf, version)?;
161                    }
162                }
163            }
164        }
165        if version >= 0 {
166            if flex {
167                put_compact_nullable_string(buf, self.process_id.as_deref());
168            } else {
169                put_nullable_string(buf, self.process_id.as_deref());
170            }
171        }
172        if version >= 0 {
173            match &self.user_endpoint {
174                None => {
175                    buf.put_i8(-1);
176                }
177                Some(v) => {
178                    buf.put_i8(1);
179                    v.encode(buf, version)?;
180                }
181            }
182        }
183        if version >= 0 {
184            {
185                let len = (self.client_tags).as_ref().map(Vec::len);
186                crate::primitives::array::put_nullable_array_len(buf, len, flex);
187                if let Some(v) = &self.client_tags {
188                    for it in v {
189                        it.encode(buf, version)?;
190                    }
191                }
192            }
193        }
194        if version >= 0 {
195            {
196                let len = (self.task_offsets).as_ref().map(Vec::len);
197                crate::primitives::array::put_nullable_array_len(buf, len, flex);
198                if let Some(v) = &self.task_offsets {
199                    for it in v {
200                        it.encode(buf, version)?;
201                    }
202                }
203            }
204        }
205        if version >= 0 {
206            {
207                let len = (self.task_end_offsets).as_ref().map(Vec::len);
208                crate::primitives::array::put_nullable_array_len(buf, len, flex);
209                if let Some(v) = &self.task_end_offsets {
210                    for it in v {
211                        it.encode(buf, version)?;
212                    }
213                }
214            }
215        }
216        if version >= 0 {
217            put_bool(buf, self.shutdown_application);
218        }
219        if flex {
220            let tagged = WriteTaggedFields::new();
221            tagged.write(buf, &self.unknown_tagged_fields);
222        }
223        Ok(())
224    }
225    fn encoded_len(&self, version: i16) -> usize {
226        let flex = is_flexible(version);
227        let mut n: usize = 0;
228        if version >= 0 {
229            n += if flex {
230                compact_string_len(&self.group_id)
231            } else {
232                string_len(&self.group_id)
233            };
234        }
235        if version >= 0 {
236            n += if flex {
237                compact_string_len(&self.member_id)
238            } else {
239                string_len(&self.member_id)
240            };
241        }
242        if version >= 0 {
243            n += 4;
244        }
245        if version >= 0 {
246            n += 4;
247        }
248        if version >= 0 {
249            n += if flex {
250                compact_nullable_string_len(self.instance_id.as_deref())
251            } else {
252                nullable_string_len(self.instance_id.as_deref())
253            };
254        }
255        if version >= 0 {
256            n += if flex {
257                compact_nullable_string_len(self.rack_id.as_deref())
258            } else {
259                nullable_string_len(self.rack_id.as_deref())
260            };
261        }
262        if version >= 0 {
263            n += 4;
264        }
265        if version >= 0 {
266            n += 1 + self.topology.as_ref().map_or(0, |v| v.encoded_len(version));
267        }
268        if version >= 0 {
269            n += {
270                let opt: Option<&Vec<_>> = (self.active_tasks).as_ref();
271                let prefix = crate::primitives::array::nullable_array_len_prefix_len(
272                    opt.map(std::vec::Vec::len),
273                    flex,
274                );
275                let body: usize =
276                    opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
277                prefix + body
278            };
279        }
280        if version >= 0 {
281            n += {
282                let opt: Option<&Vec<_>> = (self.standby_tasks).as_ref();
283                let prefix = crate::primitives::array::nullable_array_len_prefix_len(
284                    opt.map(std::vec::Vec::len),
285                    flex,
286                );
287                let body: usize =
288                    opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
289                prefix + body
290            };
291        }
292        if version >= 0 {
293            n += {
294                let opt: Option<&Vec<_>> = (self.warmup_tasks).as_ref();
295                let prefix = crate::primitives::array::nullable_array_len_prefix_len(
296                    opt.map(std::vec::Vec::len),
297                    flex,
298                );
299                let body: usize =
300                    opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
301                prefix + body
302            };
303        }
304        if version >= 0 {
305            n += if flex {
306                compact_nullable_string_len(self.process_id.as_deref())
307            } else {
308                nullable_string_len(self.process_id.as_deref())
309            };
310        }
311        if version >= 0 {
312            n += 1 + self
313                .user_endpoint
314                .as_ref()
315                .map_or(0, |v| v.encoded_len(version));
316        }
317        if version >= 0 {
318            n += {
319                let opt: Option<&Vec<_>> = (self.client_tags).as_ref();
320                let prefix = crate::primitives::array::nullable_array_len_prefix_len(
321                    opt.map(std::vec::Vec::len),
322                    flex,
323                );
324                let body: usize =
325                    opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
326                prefix + body
327            };
328        }
329        if version >= 0 {
330            n += {
331                let opt: Option<&Vec<_>> = (self.task_offsets).as_ref();
332                let prefix = crate::primitives::array::nullable_array_len_prefix_len(
333                    opt.map(std::vec::Vec::len),
334                    flex,
335                );
336                let body: usize =
337                    opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
338                prefix + body
339            };
340        }
341        if version >= 0 {
342            n += {
343                let opt: Option<&Vec<_>> = (self.task_end_offsets).as_ref();
344                let prefix = crate::primitives::array::nullable_array_len_prefix_len(
345                    opt.map(std::vec::Vec::len),
346                    flex,
347                );
348                let body: usize =
349                    opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
350                prefix + body
351            };
352        }
353        if version >= 0 {
354            n += 1;
355        }
356        if flex {
357            let known_pairs: Vec<(u32, usize)> = Vec::new();
358            n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
359        }
360        n
361    }
362}
363impl Decode<'_> for StreamsGroupHeartbeatRequest {
364    fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
365        if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
366            return Err(ProtocolError::UnsupportedVersion {
367                api_key: API_KEY,
368                version,
369            });
370        }
371        let flex = is_flexible(version);
372        let mut out = Self::default();
373        if version >= 0 {
374            out.group_id = if flex {
375                get_compact_string_owned(buf)?
376            } else {
377                get_string_owned(buf)?
378            };
379        }
380        if version >= 0 {
381            out.member_id = if flex {
382                get_compact_string_owned(buf)?
383            } else {
384                get_string_owned(buf)?
385            };
386        }
387        if version >= 0 {
388            out.member_epoch = get_i32(buf)?;
389        }
390        if version >= 0 {
391            out.endpoint_information_epoch = get_i32(buf)?;
392        }
393        if version >= 0 {
394            out.instance_id = if flex {
395                get_compact_nullable_string_owned(buf)?
396            } else {
397                get_nullable_string_owned(buf)?
398            };
399        }
400        if version >= 0 {
401            out.rack_id = if flex {
402                get_compact_nullable_string_owned(buf)?
403            } else {
404                get_nullable_string_owned(buf)?
405            };
406        }
407        if version >= 0 {
408            out.rebalance_timeout_ms = get_i32(buf)?;
409        }
410        if version >= 0 {
411            out.topology = if get_i8(buf)? < 0 {
412                None
413            } else {
414                Some(Topology::decode(buf, version)?)
415            };
416        }
417        if version >= 0 {
418            out.active_tasks = {
419                let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
420                match opt {
421                    None => None,
422                    Some(n) => {
423                        let mut v = Vec::with_capacity(n);
424                        for _ in 0..n {
425                            v . push (super :: common :: streams_group_heartbeat_request :: task_ids :: TaskIds :: decode (buf , version) ?) ;
426                        }
427                        Some(v)
428                    }
429                }
430            };
431        }
432        if version >= 0 {
433            out.standby_tasks = {
434                let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
435                match opt {
436                    None => None,
437                    Some(n) => {
438                        let mut v = Vec::with_capacity(n);
439                        for _ in 0..n {
440                            v . push (super :: common :: streams_group_heartbeat_request :: task_ids :: TaskIds :: decode (buf , version) ?) ;
441                        }
442                        Some(v)
443                    }
444                }
445            };
446        }
447        if version >= 0 {
448            out.warmup_tasks = {
449                let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
450                match opt {
451                    None => None,
452                    Some(n) => {
453                        let mut v = Vec::with_capacity(n);
454                        for _ in 0..n {
455                            v . push (super :: common :: streams_group_heartbeat_request :: task_ids :: TaskIds :: decode (buf , version) ?) ;
456                        }
457                        Some(v)
458                    }
459                }
460            };
461        }
462        if version >= 0 {
463            out.process_id = if flex {
464                get_compact_nullable_string_owned(buf)?
465            } else {
466                get_nullable_string_owned(buf)?
467            };
468        }
469        if version >= 0 {
470            out.user_endpoint = if get_i8(buf)? < 0 {
471                None
472            } else {
473                Some(
474                    super::common::streams_group_heartbeat_request::endpoint::Endpoint::decode(
475                        buf, version,
476                    )?,
477                )
478            };
479        }
480        if version >= 0 {
481            out.client_tags = {
482                let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
483                match opt {
484                    None => None,
485                    Some(n) => {
486                        let mut v = Vec::with_capacity(n);
487                        for _ in 0..n {
488                            v . push (super :: common :: streams_group_heartbeat_request :: key_value :: KeyValue :: decode (buf , version) ?) ;
489                        }
490                        Some(v)
491                    }
492                }
493            };
494        }
495        if version >= 0 {
496            out.task_offsets = {
497                let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
498                match opt {
499                    None => None,
500                    Some(n) => {
501                        let mut v = Vec::with_capacity(n);
502                        for _ in 0..n {
503                            v . push (super :: common :: streams_group_heartbeat_request :: task_offset :: TaskOffset :: decode (buf , version) ?) ;
504                        }
505                        Some(v)
506                    }
507                }
508            };
509        }
510        if version >= 0 {
511            out.task_end_offsets = {
512                let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
513                match opt {
514                    None => None,
515                    Some(n) => {
516                        let mut v = Vec::with_capacity(n);
517                        for _ in 0..n {
518                            v . push (super :: common :: streams_group_heartbeat_request :: task_offset :: TaskOffset :: decode (buf , version) ?) ;
519                        }
520                        Some(v)
521                    }
522                }
523            };
524        }
525        if version >= 0 {
526            out.shutdown_application = get_bool(buf)?;
527        }
528        if flex {
529            out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
530        }
531        Ok(out)
532    }
533}
#[cfg(test)]
impl StreamsGroupHeartbeatRequest {
    /// Test-only fixture with every field set to a non-default value.
    ///
    /// Strings become `"x"`, integers `1`, the boolean `true`, nullable structs are present,
    /// and each array holds exactly one populated element. For a negative `version` the
    /// plain `Default` value is returned unchanged.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::streams_group_heartbeat_request::{
            endpoint::Endpoint, key_value::KeyValue, task_ids::TaskIds, task_offset::TaskOffset,
        };

        // All fields share the same `version >= 0` gate.
        if version < 0 {
            return Self::default();
        }
        Self {
            group_id: "x".to_owned(),
            member_id: "x".to_owned(),
            member_epoch: 1,
            endpoint_information_epoch: 1,
            instance_id: Some("x".to_owned()),
            rack_id: Some("x".to_owned()),
            rebalance_timeout_ms: 1,
            topology: Some(Topology::populated(version)),
            active_tasks: Some(vec![TaskIds::populated(version)]),
            standby_tasks: Some(vec![TaskIds::populated(version)]),
            warmup_tasks: Some(vec![TaskIds::populated(version)]),
            process_id: Some("x".to_owned()),
            user_endpoint: Some(Endpoint::populated(version)),
            client_tags: Some(vec![KeyValue::populated(version)]),
            task_offsets: Some(vec![TaskOffset::populated(version)]),
            task_end_offsets: Some(vec![TaskOffset::populated(version)]),
            shutdown_application: true,
            // Only `unknown_tagged_fields` is left at its default.
            ..Self::default()
        }
    }
}
/// Nested struct carried in `StreamsGroupHeartbeatRequest::topology`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Topology {
    /// Written as a 32-bit integer.
    pub epoch: i32,
    /// Non-nullable array of nested `Subtopology` structs.
    pub subtopologies: Vec<Subtopology>,
    /// Tagged fields returned by `read_tagged_fields` during decoding; written back by
    /// `encode`.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
627impl Encode for Topology {
628    fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
629        let flex = version >= 0;
630        if version >= 0 {
631            put_i32(buf, self.epoch);
632        }
633        if version >= 0 {
634            {
635                crate::primitives::array::put_array_len(buf, (self.subtopologies).len(), flex);
636                for it in &self.subtopologies {
637                    it.encode(buf, version)?;
638                }
639            }
640        }
641        if flex {
642            let tagged = WriteTaggedFields::new();
643            tagged.write(buf, &self.unknown_tagged_fields);
644        }
645        Ok(())
646    }
647    fn encoded_len(&self, version: i16) -> usize {
648        let flex = version >= 0;
649        let mut n: usize = 0;
650        if version >= 0 {
651            n += 4;
652        }
653        if version >= 0 {
654            n += {
655                let prefix = crate::primitives::array::array_len_prefix_len(
656                    (self.subtopologies).len(),
657                    flex,
658                );
659                let body: usize = (self.subtopologies)
660                    .iter()
661                    .map(|it| it.encoded_len(version))
662                    .sum();
663                prefix + body
664            };
665        }
666        if flex {
667            let known_pairs: Vec<(u32, usize)> = Vec::new();
668            n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
669        }
670        n
671    }
672}
673impl Decode<'_> for Topology {
674    fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
675        let flex = version >= 0;
676        let mut out = Self::default();
677        if version >= 0 {
678            out.epoch = get_i32(buf)?;
679        }
680        if version >= 0 {
681            out.subtopologies = {
682                let n = crate::primitives::array::get_array_len(buf, flex)?;
683                let mut v = Vec::with_capacity(n);
684                for _ in 0..n {
685                    v.push(Subtopology::decode(buf, version)?);
686                }
687                v
688            };
689        }
690        if flex {
691            out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
692        }
693        Ok(out)
694    }
695}
#[cfg(test)]
impl Topology {
    /// Test-only fixture: epoch `1` and a single populated subtopology.
    ///
    /// For a negative `version` the plain `Default` value is returned unchanged.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            epoch: 1,
            subtopologies: vec![Subtopology::populated(version)],
            ..Self::default()
        }
    }
}
/// Element type of `Topology::subtopologies`.
///
/// Fields are listed in the order in which `encode` writes them.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Subtopology {
    /// Non-nullable string (compact form on flexible versions).
    pub subtopology_id: String,
    /// Non-nullable array of strings.
    pub source_topics: Vec<String>,
    /// Non-nullable array of strings.
    pub source_topic_regex: Vec<String>,
    /// Non-nullable array of nested structs.
    pub state_changelog_topics:
        Vec<super::common::streams_group_heartbeat_request::topic_info::TopicInfo>,
    /// Non-nullable array of strings.
    pub repartition_sink_topics: Vec<String>,
    /// Non-nullable array of nested structs.
    pub repartition_source_topics:
        Vec<super::common::streams_group_heartbeat_request::topic_info::TopicInfo>,
    /// Non-nullable array of nested structs.
    pub copartition_groups: Vec<CopartitionGroup>,
    /// Unknown tagged fields; written back by `encode` on flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
723impl Encode for Subtopology {
724    fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
725        let flex = version >= 0;
726        if version >= 0 {
727            if flex {
728                put_compact_string(buf, &self.subtopology_id);
729            } else {
730                put_string(buf, &self.subtopology_id);
731            }
732        }
733        if version >= 0 {
734            {
735                crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
736                for it in &self.source_topics {
737                    if flex {
738                        put_compact_string(buf, it);
739                    } else {
740                        put_string(buf, it);
741                    }
742                }
743            }
744        }
745        if version >= 0 {
746            {
747                crate::primitives::array::put_array_len(buf, (self.source_topic_regex).len(), flex);
748                for it in &self.source_topic_regex {
749                    if flex {
750                        put_compact_string(buf, it);
751                    } else {
752                        put_string(buf, it);
753                    }
754                }
755            }
756        }
757        if version >= 0 {
758            {
759                crate::primitives::array::put_array_len(
760                    buf,
761                    (self.state_changelog_topics).len(),
762                    flex,
763                );
764                for it in &self.state_changelog_topics {
765                    it.encode(buf, version)?;
766                }
767            }
768        }
769        if version >= 0 {
770            {
771                crate::primitives::array::put_array_len(
772                    buf,
773                    (self.repartition_sink_topics).len(),
774                    flex,
775                );
776                for it in &self.repartition_sink_topics {
777                    if flex {
778                        put_compact_string(buf, it);
779                    } else {
780                        put_string(buf, it);
781                    }
782                }
783            }
784        }
785        if version >= 0 {
786            {
787                crate::primitives::array::put_array_len(
788                    buf,
789                    (self.repartition_source_topics).len(),
790                    flex,
791                );
792                for it in &self.repartition_source_topics {
793                    it.encode(buf, version)?;
794                }
795            }
796        }
797        if version >= 0 {
798            {
799                crate::primitives::array::put_array_len(buf, (self.copartition_groups).len(), flex);
800                for it in &self.copartition_groups {
801                    it.encode(buf, version)?;
802                }
803            }
804        }
805        if flex {
806            let tagged = WriteTaggedFields::new();
807            tagged.write(buf, &self.unknown_tagged_fields);
808        }
809        Ok(())
810    }
811    fn encoded_len(&self, version: i16) -> usize {
812        let flex = version >= 0;
813        let mut n: usize = 0;
814        if version >= 0 {
815            n += if flex {
816                compact_string_len(&self.subtopology_id)
817            } else {
818                string_len(&self.subtopology_id)
819            };
820        }
821        if version >= 0 {
822            n += {
823                let prefix = crate::primitives::array::array_len_prefix_len(
824                    (self.source_topics).len(),
825                    flex,
826                );
827                let body: usize = (self.source_topics)
828                    .iter()
829                    .map(|it| {
830                        if flex {
831                            compact_string_len(it)
832                        } else {
833                            string_len(it)
834                        }
835                    })
836                    .sum();
837                prefix + body
838            };
839        }
840        if version >= 0 {
841            n += {
842                let prefix = crate::primitives::array::array_len_prefix_len(
843                    (self.source_topic_regex).len(),
844                    flex,
845                );
846                let body: usize = (self.source_topic_regex)
847                    .iter()
848                    .map(|it| {
849                        if flex {
850                            compact_string_len(it)
851                        } else {
852                            string_len(it)
853                        }
854                    })
855                    .sum();
856                prefix + body
857            };
858        }
859        if version >= 0 {
860            n += {
861                let prefix = crate::primitives::array::array_len_prefix_len(
862                    (self.state_changelog_topics).len(),
863                    flex,
864                );
865                let body: usize = (self.state_changelog_topics)
866                    .iter()
867                    .map(|it| it.encoded_len(version))
868                    .sum();
869                prefix + body
870            };
871        }
872        if version >= 0 {
873            n += {
874                let prefix = crate::primitives::array::array_len_prefix_len(
875                    (self.repartition_sink_topics).len(),
876                    flex,
877                );
878                let body: usize = (self.repartition_sink_topics)
879                    .iter()
880                    .map(|it| {
881                        if flex {
882                            compact_string_len(it)
883                        } else {
884                            string_len(it)
885                        }
886                    })
887                    .sum();
888                prefix + body
889            };
890        }
891        if version >= 0 {
892            n += {
893                let prefix = crate::primitives::array::array_len_prefix_len(
894                    (self.repartition_source_topics).len(),
895                    flex,
896                );
897                let body: usize = (self.repartition_source_topics)
898                    .iter()
899                    .map(|it| it.encoded_len(version))
900                    .sum();
901                prefix + body
902            };
903        }
904        if version >= 0 {
905            n += {
906                let prefix = crate::primitives::array::array_len_prefix_len(
907                    (self.copartition_groups).len(),
908                    flex,
909                );
910                let body: usize = (self.copartition_groups)
911                    .iter()
912                    .map(|it| it.encoded_len(version))
913                    .sum();
914                prefix + body
915            };
916        }
917        if flex {
918            let known_pairs: Vec<(u32, usize)> = Vec::new();
919            n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
920        }
921        n
922    }
923}
924impl Decode<'_> for Subtopology {
925    fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
926        let flex = version >= 0;
927        let mut out = Self::default();
928        if version >= 0 {
929            out.subtopology_id = if flex {
930                get_compact_string_owned(buf)?
931            } else {
932                get_string_owned(buf)?
933            };
934        }
935        if version >= 0 {
936            out.source_topics = {
937                let n = crate::primitives::array::get_array_len(buf, flex)?;
938                let mut v = Vec::with_capacity(n);
939                for _ in 0..n {
940                    v.push(if flex {
941                        get_compact_string_owned(buf)?
942                    } else {
943                        get_string_owned(buf)?
944                    });
945                }
946                v
947            };
948        }
949        if version >= 0 {
950            out.source_topic_regex = {
951                let n = crate::primitives::array::get_array_len(buf, flex)?;
952                let mut v = Vec::with_capacity(n);
953                for _ in 0..n {
954                    v.push(if flex {
955                        get_compact_string_owned(buf)?
956                    } else {
957                        get_string_owned(buf)?
958                    });
959                }
960                v
961            };
962        }
963        if version >= 0 {
964            out.state_changelog_topics = {
965                let n = crate::primitives::array::get_array_len(buf, flex)?;
966                let mut v = Vec::with_capacity(n);
967                for _ in 0..n {
968                    v . push (super :: common :: streams_group_heartbeat_request :: topic_info :: TopicInfo :: decode (buf , version) ?) ;
969                }
970                v
971            };
972        }
973        if version >= 0 {
974            out.repartition_sink_topics = {
975                let n = crate::primitives::array::get_array_len(buf, flex)?;
976                let mut v = Vec::with_capacity(n);
977                for _ in 0..n {
978                    v.push(if flex {
979                        get_compact_string_owned(buf)?
980                    } else {
981                        get_string_owned(buf)?
982                    });
983                }
984                v
985            };
986        }
987        if version >= 0 {
988            out.repartition_source_topics = {
989                let n = crate::primitives::array::get_array_len(buf, flex)?;
990                let mut v = Vec::with_capacity(n);
991                for _ in 0..n {
992                    v . push (super :: common :: streams_group_heartbeat_request :: topic_info :: TopicInfo :: decode (buf , version) ?) ;
993                }
994                v
995            };
996        }
997        if version >= 0 {
998            out.copartition_groups = {
999                let n = crate::primitives::array::get_array_len(buf, flex)?;
1000                let mut v = Vec::with_capacity(n);
1001                for _ in 0..n {
1002                    v.push(CopartitionGroup::decode(buf, version)?);
1003                }
1004                v
1005            };
1006        }
1007        if flex {
1008            out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1009        }
1010        Ok(out)
1011    }
1012}
#[cfg(test)]
impl Subtopology {
    /// Builds a test fixture with every field valid at `version` set to a
    /// non-default sample value; a negative `version` yields the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        // All fields share the same `version >= 0` gate.
        if version < 0 {
            return Self::default();
        }

        let sample = || "x".to_string();
        let sample_topic = || {
            super::common::streams_group_heartbeat_request::topic_info::TopicInfo::populated(
                version,
            )
        };

        Self {
            subtopology_id: sample(),
            source_topics: vec![sample()],
            source_topic_regex: vec![sample()],
            state_changelog_topics: vec![sample_topic()],
            repartition_sink_topics: vec![sample()],
            repartition_source_topics: vec![sample_topic()],
            copartition_groups: vec![CopartitionGroup::populated(version)],
            // Anything not listed (the unknown tagged fields) stays at its default.
            ..Self::default()
        }
    }
}
1050#[derive(Debug, Clone, PartialEq, Eq, Default)]
1051pub struct CopartitionGroup {
1052    pub source_topics: Vec<i16>,
1053    pub source_topic_regex: Vec<i16>,
1054    pub repartition_source_topics: Vec<i16>,
1055    pub unknown_tagged_fields: UnknownTaggedFields,
1056}
1057impl Encode for CopartitionGroup {
1058    fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
1059        let flex = version >= 0;
1060        if version >= 0 {
1061            {
1062                crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
1063                for it in &self.source_topics {
1064                    put_i16(buf, *it);
1065                }
1066            }
1067        }
1068        if version >= 0 {
1069            {
1070                crate::primitives::array::put_array_len(buf, (self.source_topic_regex).len(), flex);
1071                for it in &self.source_topic_regex {
1072                    put_i16(buf, *it);
1073                }
1074            }
1075        }
1076        if version >= 0 {
1077            {
1078                crate::primitives::array::put_array_len(
1079                    buf,
1080                    (self.repartition_source_topics).len(),
1081                    flex,
1082                );
1083                for it in &self.repartition_source_topics {
1084                    put_i16(buf, *it);
1085                }
1086            }
1087        }
1088        if flex {
1089            let tagged = WriteTaggedFields::new();
1090            tagged.write(buf, &self.unknown_tagged_fields);
1091        }
1092        Ok(())
1093    }
1094    fn encoded_len(&self, version: i16) -> usize {
1095        let flex = version >= 0;
1096        let mut n: usize = 0;
1097        if version >= 0 {
1098            n += {
1099                let prefix = crate::primitives::array::array_len_prefix_len(
1100                    (self.source_topics).len(),
1101                    flex,
1102                );
1103                let body: usize = (self.source_topics).iter().map(|_| 2).sum();
1104                prefix + body
1105            };
1106        }
1107        if version >= 0 {
1108            n += {
1109                let prefix = crate::primitives::array::array_len_prefix_len(
1110                    (self.source_topic_regex).len(),
1111                    flex,
1112                );
1113                let body: usize = (self.source_topic_regex).iter().map(|_| 2).sum();
1114                prefix + body
1115            };
1116        }
1117        if version >= 0 {
1118            n += {
1119                let prefix = crate::primitives::array::array_len_prefix_len(
1120                    (self.repartition_source_topics).len(),
1121                    flex,
1122                );
1123                let body: usize = (self.repartition_source_topics).iter().map(|_| 2).sum();
1124                prefix + body
1125            };
1126        }
1127        if flex {
1128            let known_pairs: Vec<(u32, usize)> = Vec::new();
1129            n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
1130        }
1131        n
1132    }
1133}
1134impl Decode<'_> for CopartitionGroup {
1135    fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
1136        let flex = version >= 0;
1137        let mut out = Self::default();
1138        if version >= 0 {
1139            out.source_topics = {
1140                let n = crate::primitives::array::get_array_len(buf, flex)?;
1141                let mut v = Vec::with_capacity(n);
1142                for _ in 0..n {
1143                    v.push(get_i16(buf)?);
1144                }
1145                v
1146            };
1147        }
1148        if version >= 0 {
1149            out.source_topic_regex = {
1150                let n = crate::primitives::array::get_array_len(buf, flex)?;
1151                let mut v = Vec::with_capacity(n);
1152                for _ in 0..n {
1153                    v.push(get_i16(buf)?);
1154                }
1155                v
1156            };
1157        }
1158        if version >= 0 {
1159            out.repartition_source_topics = {
1160                let n = crate::primitives::array::get_array_len(buf, flex)?;
1161                let mut v = Vec::with_capacity(n);
1162                for _ in 0..n {
1163                    v.push(get_i16(buf)?);
1164                }
1165                v
1166            };
1167        }
1168        if flex {
1169            out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1170        }
1171        Ok(out)
1172    }
1173}
#[cfg(test)]
impl CopartitionGroup {
    /// Builds a test fixture in which each array holds the single value `1`;
    /// a negative `version` yields the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        // All three arrays share the same `version >= 0` gate.
        if version < 0 {
            return Self::default();
        }

        let single = || vec![1i16];
        Self {
            source_topics: single(),
            source_topic_regex: single(),
            repartition_source_topics: single(),
            ..Self::default()
        }
    }
}
1191
1192/// Default JSON payload matching `Self::default()` for JVM oracle differential testing.
1193/// Only includes fields valid for the given version.
1194#[must_use]
1195#[allow(unused_comparisons)]
1196pub fn default_json(version: i16) -> ::serde_json::Value {
1197    let mut obj = ::serde_json::Map::new();
1198    obj.insert(
1199        "groupId".to_string(),
1200        ::serde_json::Value::String(String::new()),
1201    );
1202    obj.insert(
1203        "memberId".to_string(),
1204        ::serde_json::Value::String(String::new()),
1205    );
1206    obj.insert("memberEpoch".to_string(), ::serde_json::json!(0));
1207    obj.insert(
1208        "endpointInformationEpoch".to_string(),
1209        ::serde_json::json!(0),
1210    );
1211    obj.insert("instanceId".to_string(), ::serde_json::Value::Null);
1212    obj.insert("rackId".to_string(), ::serde_json::Value::Null);
1213    obj.insert("rebalanceTimeoutMs".to_string(), ::serde_json::json!(-1));
1214    obj.insert("topology".to_string(), ::serde_json::Value::Null);
1215    obj.insert("activeTasks".to_string(), ::serde_json::Value::Null);
1216    obj.insert("standbyTasks".to_string(), ::serde_json::Value::Null);
1217    obj.insert("warmupTasks".to_string(), ::serde_json::Value::Null);
1218    obj.insert("processId".to_string(), ::serde_json::Value::Null);
1219    obj.insert("userEndpoint".to_string(), ::serde_json::Value::Null);
1220    obj.insert("clientTags".to_string(), ::serde_json::Value::Null);
1221    obj.insert("taskOffsets".to_string(), ::serde_json::Value::Null);
1222    obj.insert("taskEndOffsets".to_string(), ::serde_json::Value::Null);
1223    obj.insert(
1224        "shutdownApplication".to_string(),
1225        ::serde_json::Value::Bool(false),
1226    );
1227    ::serde_json::Value::Object(obj)
1228}
1229
1230impl crate::ProtocolRequest for StreamsGroupHeartbeatRequest {
1231    const API_KEY: i16 = API_KEY;
1232    const MIN_VERSION: i16 = MIN_VERSION;
1233    const MAX_VERSION: i16 = MAX_VERSION;
1234    const FLEXIBLE_MIN: i16 = FLEXIBLE_MIN;
1235    type Response = super::streams_group_heartbeat_response::StreamsGroupHeartbeatResponse;
1236}