Skip to main content

crabka_protocol/opt/rustwide/workdir/generated/
StreamsGroupHeartbeatResponse.owned.rs

1// AUTO-GENERATED by crabka-protocol-codegen against a9ce3221537b8653448750697915607dc7936cf3. Do not edit.
2
3use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_i16, get_i32, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7    compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8    get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9    put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13
14pub const API_KEY: i16 = 88;
15pub const MIN_VERSION: i16 = 0;
16pub const MAX_VERSION: i16 = 0;
17pub const FLEXIBLE_MIN: i16 = 0;
18
19#[inline]
20fn is_flexible(version: i16) -> bool {
21    version >= FLEXIBLE_MIN
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Default)]
25pub struct StreamsGroupHeartbeatResponse {
26    pub throttle_time_ms: i32,
27    pub error_code: i16,
28    pub error_message: Option<String>,
29    pub member_id: String,
30    pub member_epoch: i32,
31    pub heartbeat_interval_ms: i32,
32    pub acceptable_recovery_lag: i32,
33    pub task_offset_interval_ms: i32,
34    pub status: Option<Vec<super::common::streams_group_heartbeat_response::status::Status>>,
35    pub active_tasks:
36        Option<Vec<super::common::streams_group_heartbeat_response::task_ids::TaskIds>>,
37    pub standby_tasks:
38        Option<Vec<super::common::streams_group_heartbeat_response::task_ids::TaskIds>>,
39    pub warmup_tasks:
40        Option<Vec<super::common::streams_group_heartbeat_response::task_ids::TaskIds>>,
41    pub endpoint_information_epoch: i32,
42    pub partitions_by_user_endpoint: Option<Vec<EndpointToPartitions>>,
43    pub unknown_tagged_fields: UnknownTaggedFields,
44}
45impl Encode for StreamsGroupHeartbeatResponse {
46    fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
47        if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
48            return Err(ProtocolError::UnsupportedVersion {
49                api_key: API_KEY,
50                version,
51            });
52        }
53        let flex = is_flexible(version);
54        if version >= 0 {
55            put_i32(buf, self.throttle_time_ms);
56        }
57        if version >= 0 {
58            put_i16(buf, self.error_code);
59        }
60        if version >= 0 {
61            if flex {
62                put_compact_nullable_string(buf, self.error_message.as_deref());
63            } else {
64                put_nullable_string(buf, self.error_message.as_deref());
65            }
66        }
67        if version >= 0 {
68            if flex {
69                put_compact_string(buf, &self.member_id);
70            } else {
71                put_string(buf, &self.member_id);
72            }
73        }
74        if version >= 0 {
75            put_i32(buf, self.member_epoch);
76        }
77        if version >= 0 {
78            put_i32(buf, self.heartbeat_interval_ms);
79        }
80        if version >= 0 {
81            put_i32(buf, self.acceptable_recovery_lag);
82        }
83        if version >= 0 {
84            put_i32(buf, self.task_offset_interval_ms);
85        }
86        if version >= 0 {
87            {
88                let len = (self.status).as_ref().map(Vec::len);
89                crate::primitives::array::put_nullable_array_len(buf, len, flex);
90                if let Some(v) = &self.status {
91                    for it in v {
92                        it.encode(buf, version)?;
93                    }
94                }
95            }
96        }
97        if version >= 0 {
98            {
99                let len = (self.active_tasks).as_ref().map(Vec::len);
100                crate::primitives::array::put_nullable_array_len(buf, len, flex);
101                if let Some(v) = &self.active_tasks {
102                    for it in v {
103                        it.encode(buf, version)?;
104                    }
105                }
106            }
107        }
108        if version >= 0 {
109            {
110                let len = (self.standby_tasks).as_ref().map(Vec::len);
111                crate::primitives::array::put_nullable_array_len(buf, len, flex);
112                if let Some(v) = &self.standby_tasks {
113                    for it in v {
114                        it.encode(buf, version)?;
115                    }
116                }
117            }
118        }
119        if version >= 0 {
120            {
121                let len = (self.warmup_tasks).as_ref().map(Vec::len);
122                crate::primitives::array::put_nullable_array_len(buf, len, flex);
123                if let Some(v) = &self.warmup_tasks {
124                    for it in v {
125                        it.encode(buf, version)?;
126                    }
127                }
128            }
129        }
130        if version >= 0 {
131            put_i32(buf, self.endpoint_information_epoch);
132        }
133        if version >= 0 {
134            {
135                let len = (self.partitions_by_user_endpoint).as_ref().map(Vec::len);
136                crate::primitives::array::put_nullable_array_len(buf, len, flex);
137                if let Some(v) = &self.partitions_by_user_endpoint {
138                    for it in v {
139                        it.encode(buf, version)?;
140                    }
141                }
142            }
143        }
144        if flex {
145            let tagged = WriteTaggedFields::new();
146            tagged.write(buf, &self.unknown_tagged_fields);
147        }
148        Ok(())
149    }
150    fn encoded_len(&self, version: i16) -> usize {
151        let flex = is_flexible(version);
152        let mut n: usize = 0;
153        if version >= 0 {
154            n += 4;
155        }
156        if version >= 0 {
157            n += 2;
158        }
159        if version >= 0 {
160            n += if flex {
161                compact_nullable_string_len(self.error_message.as_deref())
162            } else {
163                nullable_string_len(self.error_message.as_deref())
164            };
165        }
166        if version >= 0 {
167            n += if flex {
168                compact_string_len(&self.member_id)
169            } else {
170                string_len(&self.member_id)
171            };
172        }
173        if version >= 0 {
174            n += 4;
175        }
176        if version >= 0 {
177            n += 4;
178        }
179        if version >= 0 {
180            n += 4;
181        }
182        if version >= 0 {
183            n += 4;
184        }
185        if version >= 0 {
186            n += {
187                let opt: Option<&Vec<_>> = (self.status).as_ref();
188                let prefix = crate::primitives::array::nullable_array_len_prefix_len(
189                    opt.map(std::vec::Vec::len),
190                    flex,
191                );
192                let body: usize =
193                    opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
194                prefix + body
195            };
196        }
197        if version >= 0 {
198            n += {
199                let opt: Option<&Vec<_>> = (self.active_tasks).as_ref();
200                let prefix = crate::primitives::array::nullable_array_len_prefix_len(
201                    opt.map(std::vec::Vec::len),
202                    flex,
203                );
204                let body: usize =
205                    opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
206                prefix + body
207            };
208        }
209        if version >= 0 {
210            n += {
211                let opt: Option<&Vec<_>> = (self.standby_tasks).as_ref();
212                let prefix = crate::primitives::array::nullable_array_len_prefix_len(
213                    opt.map(std::vec::Vec::len),
214                    flex,
215                );
216                let body: usize =
217                    opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
218                prefix + body
219            };
220        }
221        if version >= 0 {
222            n += {
223                let opt: Option<&Vec<_>> = (self.warmup_tasks).as_ref();
224                let prefix = crate::primitives::array::nullable_array_len_prefix_len(
225                    opt.map(std::vec::Vec::len),
226                    flex,
227                );
228                let body: usize =
229                    opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
230                prefix + body
231            };
232        }
233        if version >= 0 {
234            n += 4;
235        }
236        if version >= 0 {
237            n += {
238                let opt: Option<&Vec<_>> = (self.partitions_by_user_endpoint).as_ref();
239                let prefix = crate::primitives::array::nullable_array_len_prefix_len(
240                    opt.map(std::vec::Vec::len),
241                    flex,
242                );
243                let body: usize =
244                    opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
245                prefix + body
246            };
247        }
248        if flex {
249            let known_pairs: Vec<(u32, usize)> = Vec::new();
250            n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
251        }
252        n
253    }
254}
255impl Decode<'_> for StreamsGroupHeartbeatResponse {
256    fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
257        if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
258            return Err(ProtocolError::UnsupportedVersion {
259                api_key: API_KEY,
260                version,
261            });
262        }
263        let flex = is_flexible(version);
264        let mut out = Self::default();
265        if version >= 0 {
266            out.throttle_time_ms = get_i32(buf)?;
267        }
268        if version >= 0 {
269            out.error_code = get_i16(buf)?;
270        }
271        if version >= 0 {
272            out.error_message = if flex {
273                get_compact_nullable_string_owned(buf)?
274            } else {
275                get_nullable_string_owned(buf)?
276            };
277        }
278        if version >= 0 {
279            out.member_id = if flex {
280                get_compact_string_owned(buf)?
281            } else {
282                get_string_owned(buf)?
283            };
284        }
285        if version >= 0 {
286            out.member_epoch = get_i32(buf)?;
287        }
288        if version >= 0 {
289            out.heartbeat_interval_ms = get_i32(buf)?;
290        }
291        if version >= 0 {
292            out.acceptable_recovery_lag = get_i32(buf)?;
293        }
294        if version >= 0 {
295            out.task_offset_interval_ms = get_i32(buf)?;
296        }
297        if version >= 0 {
298            out.status = {
299                let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
300                match opt {
301                    None => None,
302                    Some(n) => {
303                        let mut v = Vec::with_capacity(n);
304                        for _ in 0..n {
305                            v . push (super :: common :: streams_group_heartbeat_response :: status :: Status :: decode (buf , version) ?) ;
306                        }
307                        Some(v)
308                    }
309                }
310            };
311        }
312        if version >= 0 {
313            out.active_tasks = {
314                let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
315                match opt {
316                    None => None,
317                    Some(n) => {
318                        let mut v = Vec::with_capacity(n);
319                        for _ in 0..n {
320                            v . push (super :: common :: streams_group_heartbeat_response :: task_ids :: TaskIds :: decode (buf , version) ?) ;
321                        }
322                        Some(v)
323                    }
324                }
325            };
326        }
327        if version >= 0 {
328            out.standby_tasks = {
329                let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
330                match opt {
331                    None => None,
332                    Some(n) => {
333                        let mut v = Vec::with_capacity(n);
334                        for _ in 0..n {
335                            v . push (super :: common :: streams_group_heartbeat_response :: task_ids :: TaskIds :: decode (buf , version) ?) ;
336                        }
337                        Some(v)
338                    }
339                }
340            };
341        }
342        if version >= 0 {
343            out.warmup_tasks = {
344                let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
345                match opt {
346                    None => None,
347                    Some(n) => {
348                        let mut v = Vec::with_capacity(n);
349                        for _ in 0..n {
350                            v . push (super :: common :: streams_group_heartbeat_response :: task_ids :: TaskIds :: decode (buf , version) ?) ;
351                        }
352                        Some(v)
353                    }
354                }
355            };
356        }
357        if version >= 0 {
358            out.endpoint_information_epoch = get_i32(buf)?;
359        }
360        if version >= 0 {
361            out.partitions_by_user_endpoint = {
362                let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
363                match opt {
364                    None => None,
365                    Some(n) => {
366                        let mut v = Vec::with_capacity(n);
367                        for _ in 0..n {
368                            v.push(EndpointToPartitions::decode(buf, version)?);
369                        }
370                        Some(v)
371                    }
372                }
373            };
374        }
375        if flex {
376            out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
377        }
378        Ok(out)
379    }
380}
381#[cfg(test)]
382impl StreamsGroupHeartbeatResponse {
383    #[must_use]
384    pub fn populated(version: i16) -> Self {
385        let mut m = Self::default();
386        if version >= 0 {
387            m.throttle_time_ms = 1i32;
388        }
389        if version >= 0 {
390            m.error_code = 1i16;
391        }
392        if version >= 0 {
393            m.error_message = Some("x".to_string());
394        }
395        if version >= 0 {
396            m.member_id = "x".to_string();
397        }
398        if version >= 0 {
399            m.member_epoch = 1i32;
400        }
401        if version >= 0 {
402            m.heartbeat_interval_ms = 1i32;
403        }
404        if version >= 0 {
405            m.acceptable_recovery_lag = 1i32;
406        }
407        if version >= 0 {
408            m.task_offset_interval_ms = 1i32;
409        }
410        if version >= 0 {
411            m.status = Some(vec![
412                super::common::streams_group_heartbeat_response::status::Status::populated(version),
413            ]);
414        }
415        if version >= 0 {
416            m.active_tasks = Some(vec![
417                super::common::streams_group_heartbeat_response::task_ids::TaskIds::populated(
418                    version,
419                ),
420            ]);
421        }
422        if version >= 0 {
423            m.standby_tasks = Some(vec![
424                super::common::streams_group_heartbeat_response::task_ids::TaskIds::populated(
425                    version,
426                ),
427            ]);
428        }
429        if version >= 0 {
430            m.warmup_tasks = Some(vec![
431                super::common::streams_group_heartbeat_response::task_ids::TaskIds::populated(
432                    version,
433                ),
434            ]);
435        }
436        if version >= 0 {
437            m.endpoint_information_epoch = 1i32;
438        }
439        if version >= 0 {
440            m.partitions_by_user_endpoint = Some(vec![EndpointToPartitions::populated(version)]);
441        }
442        m
443    }
444}
445#[derive(Debug, Clone, PartialEq, Eq, Default)]
446pub struct EndpointToPartitions {
447    pub user_endpoint: super::common::streams_group_heartbeat_response::endpoint::Endpoint,
448    pub active_partitions:
449        Vec<super::common::streams_group_heartbeat_response::topic_partition::TopicPartition>,
450    pub standby_partitions:
451        Vec<super::common::streams_group_heartbeat_response::topic_partition::TopicPartition>,
452    pub unknown_tagged_fields: UnknownTaggedFields,
453}
454impl Encode for EndpointToPartitions {
455    fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
456        let flex = version >= 0;
457        if version >= 0 {
458            self.user_endpoint.encode(buf, version)?;
459        }
460        if version >= 0 {
461            {
462                crate::primitives::array::put_array_len(buf, (self.active_partitions).len(), flex);
463                for it in &self.active_partitions {
464                    it.encode(buf, version)?;
465                }
466            }
467        }
468        if version >= 0 {
469            {
470                crate::primitives::array::put_array_len(buf, (self.standby_partitions).len(), flex);
471                for it in &self.standby_partitions {
472                    it.encode(buf, version)?;
473                }
474            }
475        }
476        if flex {
477            let tagged = WriteTaggedFields::new();
478            tagged.write(buf, &self.unknown_tagged_fields);
479        }
480        Ok(())
481    }
482    fn encoded_len(&self, version: i16) -> usize {
483        let flex = version >= 0;
484        let mut n: usize = 0;
485        if version >= 0 {
486            n += self.user_endpoint.encoded_len(version);
487        }
488        if version >= 0 {
489            n += {
490                let prefix = crate::primitives::array::array_len_prefix_len(
491                    (self.active_partitions).len(),
492                    flex,
493                );
494                let body: usize = (self.active_partitions)
495                    .iter()
496                    .map(|it| it.encoded_len(version))
497                    .sum();
498                prefix + body
499            };
500        }
501        if version >= 0 {
502            n += {
503                let prefix = crate::primitives::array::array_len_prefix_len(
504                    (self.standby_partitions).len(),
505                    flex,
506                );
507                let body: usize = (self.standby_partitions)
508                    .iter()
509                    .map(|it| it.encoded_len(version))
510                    .sum();
511                prefix + body
512            };
513        }
514        if flex {
515            let known_pairs: Vec<(u32, usize)> = Vec::new();
516            n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
517        }
518        n
519    }
520}
521impl Decode<'_> for EndpointToPartitions {
522    fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
523        let flex = version >= 0;
524        let mut out = Self::default();
525        if version >= 0 {
526            out.user_endpoint =
527                super::common::streams_group_heartbeat_response::endpoint::Endpoint::decode(
528                    buf, version,
529                )?;
530        }
531        if version >= 0 {
532            out.active_partitions = {
533                let n = crate::primitives::array::get_array_len(buf, flex)?;
534                let mut v = Vec::with_capacity(n);
535                for _ in 0..n {
536                    v . push (super :: common :: streams_group_heartbeat_response :: topic_partition :: TopicPartition :: decode (buf , version) ?) ;
537                }
538                v
539            };
540        }
541        if version >= 0 {
542            out.standby_partitions = {
543                let n = crate::primitives::array::get_array_len(buf, flex)?;
544                let mut v = Vec::with_capacity(n);
545                for _ in 0..n {
546                    v . push (super :: common :: streams_group_heartbeat_response :: topic_partition :: TopicPartition :: decode (buf , version) ?) ;
547                }
548                v
549            };
550        }
551        if flex {
552            out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
553        }
554        Ok(out)
555    }
556}
557#[cfg(test)]
558impl EndpointToPartitions {
559    #[must_use]
560    pub fn populated(version: i16) -> Self {
561        let mut m = Self::default();
562        if version >= 0 {
563            m.user_endpoint =
564                super::common::streams_group_heartbeat_response::endpoint::Endpoint::populated(
565                    version,
566                );
567        }
568        if version >= 0 {
569            m . active_partitions = vec ! [super :: common :: streams_group_heartbeat_response :: topic_partition :: TopicPartition :: populated (version)] ;
570        }
571        if version >= 0 {
572            m . standby_partitions = vec ! [super :: common :: streams_group_heartbeat_response :: topic_partition :: TopicPartition :: populated (version)] ;
573        }
574        m
575    }
576}
577
578/// Default JSON payload matching `Self::default()` for JVM oracle differential testing.
579/// Only includes fields valid for the given version.
580#[must_use]
581#[allow(unused_comparisons)]
582pub fn default_json(version: i16) -> ::serde_json::Value {
583    let mut obj = ::serde_json::Map::new();
584    obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
585    obj.insert("errorCode".to_string(), ::serde_json::json!(0));
586    obj.insert("errorMessage".to_string(), ::serde_json::Value::Null);
587    obj.insert(
588        "memberId".to_string(),
589        ::serde_json::Value::String(String::new()),
590    );
591    obj.insert("memberEpoch".to_string(), ::serde_json::json!(0));
592    obj.insert("heartbeatIntervalMs".to_string(), ::serde_json::json!(0));
593    obj.insert("acceptableRecoveryLag".to_string(), ::serde_json::json!(0));
594    obj.insert("taskOffsetIntervalMs".to_string(), ::serde_json::json!(0));
595    obj.insert("status".to_string(), ::serde_json::Value::Null);
596    obj.insert("activeTasks".to_string(), ::serde_json::Value::Null);
597    obj.insert("standbyTasks".to_string(), ::serde_json::Value::Null);
598    obj.insert("warmupTasks".to_string(), ::serde_json::Value::Null);
599    obj.insert(
600        "endpointInformationEpoch".to_string(),
601        ::serde_json::json!(0),
602    );
603    obj.insert(
604        "partitionsByUserEndpoint".to_string(),
605        ::serde_json::Value::Null,
606    );
607    ::serde_json::Value::Object(obj)
608}