1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_bool, get_i8, get_i16, get_i32, put_bool, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13
/// API key identifying this request type; reported in
/// `ProtocolError::UnsupportedVersion` when a version is rejected.
pub const API_KEY: i16 = 88;
/// Lowest request version accepted by the top-level encode/decode routines.
pub const MIN_VERSION: i16 = 0;
/// Highest request version accepted by the top-level encode/decode routines.
pub const MAX_VERSION: i16 = 0;
/// First version for which [`is_flexible`] reports `true`.
pub const FLEXIBLE_MIN: i16 = 0;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`], i.e. whether the
/// compact string/array encodings and the tagged-field trailer are in use.
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
23
/// Request body for API key 88, version 0.
///
/// The `Encode`/`Decode` impls in this file serialize the fields in declaration order.
/// Every `Option` field is nullable on the wire and defaults to `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamsGroupHeartbeatRequest {
    /// Non-null string (JSON key `groupId`); empty by default.
    pub group_id: String,
    /// Non-null string (JSON key `memberId`); empty by default.
    pub member_id: String,
    /// 32-bit integer (JSON key `memberEpoch`); `0` by default.
    pub member_epoch: i32,
    /// 32-bit integer (JSON key `endpointInformationEpoch`); `0` by default.
    pub endpoint_information_epoch: i32,
    /// Nullable string (JSON key `instanceId`).
    pub instance_id: Option<String>,
    /// Nullable string (JSON key `rackId`).
    pub rack_id: Option<String>,
    /// 32-bit integer (JSON key `rebalanceTimeoutMs`); `-1` by default.
    // NOTE(review): `-1` presumably means "not provided" — confirm against the schema.
    pub rebalance_timeout_ms: i32,
    /// Nullable nested struct, preceded on the wire by a one-byte presence marker
    /// (`-1` for `None`, `1` for `Some`).
    pub topology: Option<Topology>,
    /// Nullable array of task-id entries (JSON key `activeTasks`).
    pub active_tasks:
        Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds>>,
    /// Nullable array of task-id entries (JSON key `standbyTasks`).
    pub standby_tasks:
        Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds>>,
    /// Nullable array of task-id entries (JSON key `warmupTasks`).
    pub warmup_tasks:
        Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds>>,
    /// Nullable string (JSON key `processId`).
    pub process_id: Option<String>,
    /// Nullable nested struct with the same one-byte presence marker as `topology`.
    pub user_endpoint: Option<super::common::streams_group_heartbeat_request::endpoint::Endpoint>,
    /// Nullable array of key/value entries (JSON key `clientTags`).
    pub client_tags:
        Option<Vec<super::common::streams_group_heartbeat_request::key_value::KeyValue>>,
    /// Nullable array of task-offset entries (JSON key `taskOffsets`).
    pub task_offsets:
        Option<Vec<super::common::streams_group_heartbeat_request::task_offset::TaskOffset>>,
    /// Nullable array of task-offset entries (JSON key `taskEndOffsets`).
    pub task_end_offsets:
        Option<Vec<super::common::streams_group_heartbeat_request::task_offset::TaskOffset>>,
    /// Boolean flag (JSON key `shutdownApplication`); `false` by default.
    pub shutdown_application: bool,
    /// Tagged fields that `decode` read but did not recognise; `encode` writes them back
    /// in the trailer for flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
51impl Default for StreamsGroupHeartbeatRequest {
52 fn default() -> Self {
53 Self {
54 group_id: String::new(),
55 member_id: String::new(),
56 member_epoch: 0i32,
57 endpoint_information_epoch: 0i32,
58 instance_id: None,
59 rack_id: None,
60 rebalance_timeout_ms: -1i32,
61 topology: None,
62 active_tasks: None,
63 standby_tasks: None,
64 warmup_tasks: None,
65 process_id: None,
66 user_endpoint: None,
67 client_tags: None,
68 task_offsets: None,
69 task_end_offsets: None,
70 shutdown_application: false,
71 unknown_tagged_fields: Default::default(),
72 }
73 }
74}
75impl Encode for StreamsGroupHeartbeatRequest {
76 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
77 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
78 return Err(ProtocolError::UnsupportedVersion {
79 api_key: API_KEY,
80 version,
81 });
82 }
83 let flex = is_flexible(version);
84 if version >= 0 {
85 if flex {
86 put_compact_string(buf, &self.group_id);
87 } else {
88 put_string(buf, &self.group_id);
89 }
90 }
91 if version >= 0 {
92 if flex {
93 put_compact_string(buf, &self.member_id);
94 } else {
95 put_string(buf, &self.member_id);
96 }
97 }
98 if version >= 0 {
99 put_i32(buf, self.member_epoch);
100 }
101 if version >= 0 {
102 put_i32(buf, self.endpoint_information_epoch);
103 }
104 if version >= 0 {
105 if flex {
106 put_compact_nullable_string(buf, self.instance_id.as_deref());
107 } else {
108 put_nullable_string(buf, self.instance_id.as_deref());
109 }
110 }
111 if version >= 0 {
112 if flex {
113 put_compact_nullable_string(buf, self.rack_id.as_deref());
114 } else {
115 put_nullable_string(buf, self.rack_id.as_deref());
116 }
117 }
118 if version >= 0 {
119 put_i32(buf, self.rebalance_timeout_ms);
120 }
121 if version >= 0 {
122 match &self.topology {
123 None => {
124 buf.put_i8(-1);
125 }
126 Some(v) => {
127 buf.put_i8(1);
128 v.encode(buf, version)?;
129 }
130 }
131 }
132 if version >= 0 {
133 {
134 let len = (self.active_tasks).as_ref().map(Vec::len);
135 crate::primitives::array::put_nullable_array_len(buf, len, flex);
136 if let Some(v) = &self.active_tasks {
137 for it in v {
138 it.encode(buf, version)?;
139 }
140 }
141 }
142 }
143 if version >= 0 {
144 {
145 let len = (self.standby_tasks).as_ref().map(Vec::len);
146 crate::primitives::array::put_nullable_array_len(buf, len, flex);
147 if let Some(v) = &self.standby_tasks {
148 for it in v {
149 it.encode(buf, version)?;
150 }
151 }
152 }
153 }
154 if version >= 0 {
155 {
156 let len = (self.warmup_tasks).as_ref().map(Vec::len);
157 crate::primitives::array::put_nullable_array_len(buf, len, flex);
158 if let Some(v) = &self.warmup_tasks {
159 for it in v {
160 it.encode(buf, version)?;
161 }
162 }
163 }
164 }
165 if version >= 0 {
166 if flex {
167 put_compact_nullable_string(buf, self.process_id.as_deref());
168 } else {
169 put_nullable_string(buf, self.process_id.as_deref());
170 }
171 }
172 if version >= 0 {
173 match &self.user_endpoint {
174 None => {
175 buf.put_i8(-1);
176 }
177 Some(v) => {
178 buf.put_i8(1);
179 v.encode(buf, version)?;
180 }
181 }
182 }
183 if version >= 0 {
184 {
185 let len = (self.client_tags).as_ref().map(Vec::len);
186 crate::primitives::array::put_nullable_array_len(buf, len, flex);
187 if let Some(v) = &self.client_tags {
188 for it in v {
189 it.encode(buf, version)?;
190 }
191 }
192 }
193 }
194 if version >= 0 {
195 {
196 let len = (self.task_offsets).as_ref().map(Vec::len);
197 crate::primitives::array::put_nullable_array_len(buf, len, flex);
198 if let Some(v) = &self.task_offsets {
199 for it in v {
200 it.encode(buf, version)?;
201 }
202 }
203 }
204 }
205 if version >= 0 {
206 {
207 let len = (self.task_end_offsets).as_ref().map(Vec::len);
208 crate::primitives::array::put_nullable_array_len(buf, len, flex);
209 if let Some(v) = &self.task_end_offsets {
210 for it in v {
211 it.encode(buf, version)?;
212 }
213 }
214 }
215 }
216 if version >= 0 {
217 put_bool(buf, self.shutdown_application);
218 }
219 if flex {
220 let tagged = WriteTaggedFields::new();
221 tagged.write(buf, &self.unknown_tagged_fields);
222 }
223 Ok(())
224 }
225 fn encoded_len(&self, version: i16) -> usize {
226 let flex = is_flexible(version);
227 let mut n: usize = 0;
228 if version >= 0 {
229 n += if flex {
230 compact_string_len(&self.group_id)
231 } else {
232 string_len(&self.group_id)
233 };
234 }
235 if version >= 0 {
236 n += if flex {
237 compact_string_len(&self.member_id)
238 } else {
239 string_len(&self.member_id)
240 };
241 }
242 if version >= 0 {
243 n += 4;
244 }
245 if version >= 0 {
246 n += 4;
247 }
248 if version >= 0 {
249 n += if flex {
250 compact_nullable_string_len(self.instance_id.as_deref())
251 } else {
252 nullable_string_len(self.instance_id.as_deref())
253 };
254 }
255 if version >= 0 {
256 n += if flex {
257 compact_nullable_string_len(self.rack_id.as_deref())
258 } else {
259 nullable_string_len(self.rack_id.as_deref())
260 };
261 }
262 if version >= 0 {
263 n += 4;
264 }
265 if version >= 0 {
266 n += 1 + self.topology.as_ref().map_or(0, |v| v.encoded_len(version));
267 }
268 if version >= 0 {
269 n += {
270 let opt: Option<&Vec<_>> = (self.active_tasks).as_ref();
271 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
272 opt.map(std::vec::Vec::len),
273 flex,
274 );
275 let body: usize =
276 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
277 prefix + body
278 };
279 }
280 if version >= 0 {
281 n += {
282 let opt: Option<&Vec<_>> = (self.standby_tasks).as_ref();
283 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
284 opt.map(std::vec::Vec::len),
285 flex,
286 );
287 let body: usize =
288 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
289 prefix + body
290 };
291 }
292 if version >= 0 {
293 n += {
294 let opt: Option<&Vec<_>> = (self.warmup_tasks).as_ref();
295 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
296 opt.map(std::vec::Vec::len),
297 flex,
298 );
299 let body: usize =
300 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
301 prefix + body
302 };
303 }
304 if version >= 0 {
305 n += if flex {
306 compact_nullable_string_len(self.process_id.as_deref())
307 } else {
308 nullable_string_len(self.process_id.as_deref())
309 };
310 }
311 if version >= 0 {
312 n += 1 + self
313 .user_endpoint
314 .as_ref()
315 .map_or(0, |v| v.encoded_len(version));
316 }
317 if version >= 0 {
318 n += {
319 let opt: Option<&Vec<_>> = (self.client_tags).as_ref();
320 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
321 opt.map(std::vec::Vec::len),
322 flex,
323 );
324 let body: usize =
325 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
326 prefix + body
327 };
328 }
329 if version >= 0 {
330 n += {
331 let opt: Option<&Vec<_>> = (self.task_offsets).as_ref();
332 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
333 opt.map(std::vec::Vec::len),
334 flex,
335 );
336 let body: usize =
337 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
338 prefix + body
339 };
340 }
341 if version >= 0 {
342 n += {
343 let opt: Option<&Vec<_>> = (self.task_end_offsets).as_ref();
344 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
345 opt.map(std::vec::Vec::len),
346 flex,
347 );
348 let body: usize =
349 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
350 prefix + body
351 };
352 }
353 if version >= 0 {
354 n += 1;
355 }
356 if flex {
357 let known_pairs: Vec<(u32, usize)> = Vec::new();
358 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
359 }
360 n
361 }
362}
363impl Decode<'_> for StreamsGroupHeartbeatRequest {
364 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
365 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
366 return Err(ProtocolError::UnsupportedVersion {
367 api_key: API_KEY,
368 version,
369 });
370 }
371 let flex = is_flexible(version);
372 let mut out = Self::default();
373 if version >= 0 {
374 out.group_id = if flex {
375 get_compact_string_owned(buf)?
376 } else {
377 get_string_owned(buf)?
378 };
379 }
380 if version >= 0 {
381 out.member_id = if flex {
382 get_compact_string_owned(buf)?
383 } else {
384 get_string_owned(buf)?
385 };
386 }
387 if version >= 0 {
388 out.member_epoch = get_i32(buf)?;
389 }
390 if version >= 0 {
391 out.endpoint_information_epoch = get_i32(buf)?;
392 }
393 if version >= 0 {
394 out.instance_id = if flex {
395 get_compact_nullable_string_owned(buf)?
396 } else {
397 get_nullable_string_owned(buf)?
398 };
399 }
400 if version >= 0 {
401 out.rack_id = if flex {
402 get_compact_nullable_string_owned(buf)?
403 } else {
404 get_nullable_string_owned(buf)?
405 };
406 }
407 if version >= 0 {
408 out.rebalance_timeout_ms = get_i32(buf)?;
409 }
410 if version >= 0 {
411 out.topology = if get_i8(buf)? < 0 {
412 None
413 } else {
414 Some(Topology::decode(buf, version)?)
415 };
416 }
417 if version >= 0 {
418 out.active_tasks = {
419 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
420 match opt {
421 None => None,
422 Some(n) => {
423 let mut v = Vec::with_capacity(n);
424 for _ in 0..n {
425 v . push (super :: common :: streams_group_heartbeat_request :: task_ids :: TaskIds :: decode (buf , version) ?) ;
426 }
427 Some(v)
428 }
429 }
430 };
431 }
432 if version >= 0 {
433 out.standby_tasks = {
434 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
435 match opt {
436 None => None,
437 Some(n) => {
438 let mut v = Vec::with_capacity(n);
439 for _ in 0..n {
440 v . push (super :: common :: streams_group_heartbeat_request :: task_ids :: TaskIds :: decode (buf , version) ?) ;
441 }
442 Some(v)
443 }
444 }
445 };
446 }
447 if version >= 0 {
448 out.warmup_tasks = {
449 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
450 match opt {
451 None => None,
452 Some(n) => {
453 let mut v = Vec::with_capacity(n);
454 for _ in 0..n {
455 v . push (super :: common :: streams_group_heartbeat_request :: task_ids :: TaskIds :: decode (buf , version) ?) ;
456 }
457 Some(v)
458 }
459 }
460 };
461 }
462 if version >= 0 {
463 out.process_id = if flex {
464 get_compact_nullable_string_owned(buf)?
465 } else {
466 get_nullable_string_owned(buf)?
467 };
468 }
469 if version >= 0 {
470 out.user_endpoint = if get_i8(buf)? < 0 {
471 None
472 } else {
473 Some(
474 super::common::streams_group_heartbeat_request::endpoint::Endpoint::decode(
475 buf, version,
476 )?,
477 )
478 };
479 }
480 if version >= 0 {
481 out.client_tags = {
482 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
483 match opt {
484 None => None,
485 Some(n) => {
486 let mut v = Vec::with_capacity(n);
487 for _ in 0..n {
488 v . push (super :: common :: streams_group_heartbeat_request :: key_value :: KeyValue :: decode (buf , version) ?) ;
489 }
490 Some(v)
491 }
492 }
493 };
494 }
495 if version >= 0 {
496 out.task_offsets = {
497 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
498 match opt {
499 None => None,
500 Some(n) => {
501 let mut v = Vec::with_capacity(n);
502 for _ in 0..n {
503 v . push (super :: common :: streams_group_heartbeat_request :: task_offset :: TaskOffset :: decode (buf , version) ?) ;
504 }
505 Some(v)
506 }
507 }
508 };
509 }
510 if version >= 0 {
511 out.task_end_offsets = {
512 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
513 match opt {
514 None => None,
515 Some(n) => {
516 let mut v = Vec::with_capacity(n);
517 for _ in 0..n {
518 v . push (super :: common :: streams_group_heartbeat_request :: task_offset :: TaskOffset :: decode (buf , version) ?) ;
519 }
520 Some(v)
521 }
522 }
523 };
524 }
525 if version >= 0 {
526 out.shutdown_application = get_bool(buf)?;
527 }
528 if flex {
529 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
530 }
531 Ok(out)
532 }
533}
#[cfg(test)]
impl StreamsGroupHeartbeatRequest {
    /// Test fixture: a request in which every field that exists at `version` holds a
    /// non-default value (`"x"` strings, `1` integers, one-element arrays, `true` flag).
    ///
    /// A negative `version` returns the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::streams_group_heartbeat_request::{
            endpoint::Endpoint, key_value::KeyValue, task_ids::TaskIds, task_offset::TaskOffset,
        };

        if version < 0 {
            return Self::default();
        }

        let sample = || String::from("x");
        Self {
            group_id: sample(),
            member_id: sample(),
            member_epoch: 1,
            endpoint_information_epoch: 1,
            instance_id: Some(sample()),
            rack_id: Some(sample()),
            rebalance_timeout_ms: 1,
            topology: Some(Topology::populated(version)),
            active_tasks: Some(vec![TaskIds::populated(version)]),
            standby_tasks: Some(vec![TaskIds::populated(version)]),
            warmup_tasks: Some(vec![TaskIds::populated(version)]),
            process_id: Some(sample()),
            user_endpoint: Some(Endpoint::populated(version)),
            client_tags: Some(vec![KeyValue::populated(version)]),
            task_offsets: Some(vec![TaskOffset::populated(version)]),
            task_end_offsets: Some(vec![TaskOffset::populated(version)]),
            shutdown_application: true,
            // Unknown tagged fields stay at their default.
            ..Self::default()
        }
    }
}
/// Nested struct carried in `StreamsGroupHeartbeatRequest::topology`.
///
/// Encoded as an `i32` epoch followed by an array of [`Subtopology`] entries and, for
/// flexible versions, a tagged-field trailer.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Topology {
    /// 32-bit integer; `0` by default.
    pub epoch: i32,
    /// Non-nullable array of subtopologies; empty by default.
    pub subtopologies: Vec<Subtopology>,
    /// Tagged fields that `decode` read but did not recognise; written back by `encode`.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
627impl Encode for Topology {
628 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
629 let flex = version >= 0;
630 if version >= 0 {
631 put_i32(buf, self.epoch);
632 }
633 if version >= 0 {
634 {
635 crate::primitives::array::put_array_len(buf, (self.subtopologies).len(), flex);
636 for it in &self.subtopologies {
637 it.encode(buf, version)?;
638 }
639 }
640 }
641 if flex {
642 let tagged = WriteTaggedFields::new();
643 tagged.write(buf, &self.unknown_tagged_fields);
644 }
645 Ok(())
646 }
647 fn encoded_len(&self, version: i16) -> usize {
648 let flex = version >= 0;
649 let mut n: usize = 0;
650 if version >= 0 {
651 n += 4;
652 }
653 if version >= 0 {
654 n += {
655 let prefix = crate::primitives::array::array_len_prefix_len(
656 (self.subtopologies).len(),
657 flex,
658 );
659 let body: usize = (self.subtopologies)
660 .iter()
661 .map(|it| it.encoded_len(version))
662 .sum();
663 prefix + body
664 };
665 }
666 if flex {
667 let known_pairs: Vec<(u32, usize)> = Vec::new();
668 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
669 }
670 n
671 }
672}
673impl Decode<'_> for Topology {
674 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
675 let flex = version >= 0;
676 let mut out = Self::default();
677 if version >= 0 {
678 out.epoch = get_i32(buf)?;
679 }
680 if version >= 0 {
681 out.subtopologies = {
682 let n = crate::primitives::array::get_array_len(buf, flex)?;
683 let mut v = Vec::with_capacity(n);
684 for _ in 0..n {
685 v.push(Subtopology::decode(buf, version)?);
686 }
687 v
688 };
689 }
690 if flex {
691 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
692 }
693 Ok(out)
694 }
695}
#[cfg(test)]
impl Topology {
    /// Test fixture: epoch `1` and a single populated [`Subtopology`].
    ///
    /// A negative `version` returns the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 0 {
            Self {
                epoch: 1,
                subtopologies: vec![Subtopology::populated(version)],
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
/// One element of `Topology::subtopologies`.
///
/// Encoded as a string id followed by six non-nullable arrays in declaration order
/// and, for flexible versions, a tagged-field trailer.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Subtopology {
    /// Non-null string; empty by default.
    pub subtopology_id: String,
    /// Array of non-null strings.
    pub source_topics: Vec<String>,
    /// Array of non-null strings.
    // NOTE(review): presumably regular expressions matching source topics — confirm.
    pub source_topic_regex: Vec<String>,
    /// Array of nested topic-info structs.
    pub state_changelog_topics:
        Vec<super::common::streams_group_heartbeat_request::topic_info::TopicInfo>,
    /// Array of non-null strings.
    pub repartition_sink_topics: Vec<String>,
    /// Array of nested topic-info structs.
    pub repartition_source_topics:
        Vec<super::common::streams_group_heartbeat_request::topic_info::TopicInfo>,
    /// Array of nested [`CopartitionGroup`] structs.
    pub copartition_groups: Vec<CopartitionGroup>,
    /// Tagged fields that `decode` read but did not recognise; written back by `encode`.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
723impl Encode for Subtopology {
724 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
725 let flex = version >= 0;
726 if version >= 0 {
727 if flex {
728 put_compact_string(buf, &self.subtopology_id);
729 } else {
730 put_string(buf, &self.subtopology_id);
731 }
732 }
733 if version >= 0 {
734 {
735 crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
736 for it in &self.source_topics {
737 if flex {
738 put_compact_string(buf, it);
739 } else {
740 put_string(buf, it);
741 }
742 }
743 }
744 }
745 if version >= 0 {
746 {
747 crate::primitives::array::put_array_len(buf, (self.source_topic_regex).len(), flex);
748 for it in &self.source_topic_regex {
749 if flex {
750 put_compact_string(buf, it);
751 } else {
752 put_string(buf, it);
753 }
754 }
755 }
756 }
757 if version >= 0 {
758 {
759 crate::primitives::array::put_array_len(
760 buf,
761 (self.state_changelog_topics).len(),
762 flex,
763 );
764 for it in &self.state_changelog_topics {
765 it.encode(buf, version)?;
766 }
767 }
768 }
769 if version >= 0 {
770 {
771 crate::primitives::array::put_array_len(
772 buf,
773 (self.repartition_sink_topics).len(),
774 flex,
775 );
776 for it in &self.repartition_sink_topics {
777 if flex {
778 put_compact_string(buf, it);
779 } else {
780 put_string(buf, it);
781 }
782 }
783 }
784 }
785 if version >= 0 {
786 {
787 crate::primitives::array::put_array_len(
788 buf,
789 (self.repartition_source_topics).len(),
790 flex,
791 );
792 for it in &self.repartition_source_topics {
793 it.encode(buf, version)?;
794 }
795 }
796 }
797 if version >= 0 {
798 {
799 crate::primitives::array::put_array_len(buf, (self.copartition_groups).len(), flex);
800 for it in &self.copartition_groups {
801 it.encode(buf, version)?;
802 }
803 }
804 }
805 if flex {
806 let tagged = WriteTaggedFields::new();
807 tagged.write(buf, &self.unknown_tagged_fields);
808 }
809 Ok(())
810 }
811 fn encoded_len(&self, version: i16) -> usize {
812 let flex = version >= 0;
813 let mut n: usize = 0;
814 if version >= 0 {
815 n += if flex {
816 compact_string_len(&self.subtopology_id)
817 } else {
818 string_len(&self.subtopology_id)
819 };
820 }
821 if version >= 0 {
822 n += {
823 let prefix = crate::primitives::array::array_len_prefix_len(
824 (self.source_topics).len(),
825 flex,
826 );
827 let body: usize = (self.source_topics)
828 .iter()
829 .map(|it| {
830 if flex {
831 compact_string_len(it)
832 } else {
833 string_len(it)
834 }
835 })
836 .sum();
837 prefix + body
838 };
839 }
840 if version >= 0 {
841 n += {
842 let prefix = crate::primitives::array::array_len_prefix_len(
843 (self.source_topic_regex).len(),
844 flex,
845 );
846 let body: usize = (self.source_topic_regex)
847 .iter()
848 .map(|it| {
849 if flex {
850 compact_string_len(it)
851 } else {
852 string_len(it)
853 }
854 })
855 .sum();
856 prefix + body
857 };
858 }
859 if version >= 0 {
860 n += {
861 let prefix = crate::primitives::array::array_len_prefix_len(
862 (self.state_changelog_topics).len(),
863 flex,
864 );
865 let body: usize = (self.state_changelog_topics)
866 .iter()
867 .map(|it| it.encoded_len(version))
868 .sum();
869 prefix + body
870 };
871 }
872 if version >= 0 {
873 n += {
874 let prefix = crate::primitives::array::array_len_prefix_len(
875 (self.repartition_sink_topics).len(),
876 flex,
877 );
878 let body: usize = (self.repartition_sink_topics)
879 .iter()
880 .map(|it| {
881 if flex {
882 compact_string_len(it)
883 } else {
884 string_len(it)
885 }
886 })
887 .sum();
888 prefix + body
889 };
890 }
891 if version >= 0 {
892 n += {
893 let prefix = crate::primitives::array::array_len_prefix_len(
894 (self.repartition_source_topics).len(),
895 flex,
896 );
897 let body: usize = (self.repartition_source_topics)
898 .iter()
899 .map(|it| it.encoded_len(version))
900 .sum();
901 prefix + body
902 };
903 }
904 if version >= 0 {
905 n += {
906 let prefix = crate::primitives::array::array_len_prefix_len(
907 (self.copartition_groups).len(),
908 flex,
909 );
910 let body: usize = (self.copartition_groups)
911 .iter()
912 .map(|it| it.encoded_len(version))
913 .sum();
914 prefix + body
915 };
916 }
917 if flex {
918 let known_pairs: Vec<(u32, usize)> = Vec::new();
919 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
920 }
921 n
922 }
923}
924impl Decode<'_> for Subtopology {
925 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
926 let flex = version >= 0;
927 let mut out = Self::default();
928 if version >= 0 {
929 out.subtopology_id = if flex {
930 get_compact_string_owned(buf)?
931 } else {
932 get_string_owned(buf)?
933 };
934 }
935 if version >= 0 {
936 out.source_topics = {
937 let n = crate::primitives::array::get_array_len(buf, flex)?;
938 let mut v = Vec::with_capacity(n);
939 for _ in 0..n {
940 v.push(if flex {
941 get_compact_string_owned(buf)?
942 } else {
943 get_string_owned(buf)?
944 });
945 }
946 v
947 };
948 }
949 if version >= 0 {
950 out.source_topic_regex = {
951 let n = crate::primitives::array::get_array_len(buf, flex)?;
952 let mut v = Vec::with_capacity(n);
953 for _ in 0..n {
954 v.push(if flex {
955 get_compact_string_owned(buf)?
956 } else {
957 get_string_owned(buf)?
958 });
959 }
960 v
961 };
962 }
963 if version >= 0 {
964 out.state_changelog_topics = {
965 let n = crate::primitives::array::get_array_len(buf, flex)?;
966 let mut v = Vec::with_capacity(n);
967 for _ in 0..n {
968 v . push (super :: common :: streams_group_heartbeat_request :: topic_info :: TopicInfo :: decode (buf , version) ?) ;
969 }
970 v
971 };
972 }
973 if version >= 0 {
974 out.repartition_sink_topics = {
975 let n = crate::primitives::array::get_array_len(buf, flex)?;
976 let mut v = Vec::with_capacity(n);
977 for _ in 0..n {
978 v.push(if flex {
979 get_compact_string_owned(buf)?
980 } else {
981 get_string_owned(buf)?
982 });
983 }
984 v
985 };
986 }
987 if version >= 0 {
988 out.repartition_source_topics = {
989 let n = crate::primitives::array::get_array_len(buf, flex)?;
990 let mut v = Vec::with_capacity(n);
991 for _ in 0..n {
992 v . push (super :: common :: streams_group_heartbeat_request :: topic_info :: TopicInfo :: decode (buf , version) ?) ;
993 }
994 v
995 };
996 }
997 if version >= 0 {
998 out.copartition_groups = {
999 let n = crate::primitives::array::get_array_len(buf, flex)?;
1000 let mut v = Vec::with_capacity(n);
1001 for _ in 0..n {
1002 v.push(CopartitionGroup::decode(buf, version)?);
1003 }
1004 v
1005 };
1006 }
1007 if flex {
1008 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1009 }
1010 Ok(out)
1011 }
1012}
#[cfg(test)]
impl Subtopology {
    /// Test fixture: id `"x"` and exactly one element in every array.
    ///
    /// A negative `version` returns the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::streams_group_heartbeat_request::topic_info::TopicInfo;

        if version < 0 {
            return Self::default();
        }

        let one_name = || vec![String::from("x")];
        Self {
            subtopology_id: String::from("x"),
            source_topics: one_name(),
            source_topic_regex: one_name(),
            state_changelog_topics: vec![TopicInfo::populated(version)],
            repartition_sink_topics: one_name(),
            repartition_source_topics: vec![TopicInfo::populated(version)],
            copartition_groups: vec![CopartitionGroup::populated(version)],
            ..Self::default()
        }
    }
}
/// One element of `Subtopology::copartition_groups`.
///
/// Encoded as three non-nullable arrays of `i16` and, for flexible versions, a
/// tagged-field trailer.
// NOTE(review): the `i16` values are presumably indices into the enclosing
// `Subtopology`'s topic lists — confirm against the message schema.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct CopartitionGroup {
    /// Array of 16-bit integers; empty by default.
    pub source_topics: Vec<i16>,
    /// Array of 16-bit integers; empty by default.
    pub source_topic_regex: Vec<i16>,
    /// Array of 16-bit integers; empty by default.
    pub repartition_source_topics: Vec<i16>,
    /// Tagged fields that `decode` read but did not recognise; written back by `encode`.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
1057impl Encode for CopartitionGroup {
1058 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
1059 let flex = version >= 0;
1060 if version >= 0 {
1061 {
1062 crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
1063 for it in &self.source_topics {
1064 put_i16(buf, *it);
1065 }
1066 }
1067 }
1068 if version >= 0 {
1069 {
1070 crate::primitives::array::put_array_len(buf, (self.source_topic_regex).len(), flex);
1071 for it in &self.source_topic_regex {
1072 put_i16(buf, *it);
1073 }
1074 }
1075 }
1076 if version >= 0 {
1077 {
1078 crate::primitives::array::put_array_len(
1079 buf,
1080 (self.repartition_source_topics).len(),
1081 flex,
1082 );
1083 for it in &self.repartition_source_topics {
1084 put_i16(buf, *it);
1085 }
1086 }
1087 }
1088 if flex {
1089 let tagged = WriteTaggedFields::new();
1090 tagged.write(buf, &self.unknown_tagged_fields);
1091 }
1092 Ok(())
1093 }
1094 fn encoded_len(&self, version: i16) -> usize {
1095 let flex = version >= 0;
1096 let mut n: usize = 0;
1097 if version >= 0 {
1098 n += {
1099 let prefix = crate::primitives::array::array_len_prefix_len(
1100 (self.source_topics).len(),
1101 flex,
1102 );
1103 let body: usize = (self.source_topics).iter().map(|_| 2).sum();
1104 prefix + body
1105 };
1106 }
1107 if version >= 0 {
1108 n += {
1109 let prefix = crate::primitives::array::array_len_prefix_len(
1110 (self.source_topic_regex).len(),
1111 flex,
1112 );
1113 let body: usize = (self.source_topic_regex).iter().map(|_| 2).sum();
1114 prefix + body
1115 };
1116 }
1117 if version >= 0 {
1118 n += {
1119 let prefix = crate::primitives::array::array_len_prefix_len(
1120 (self.repartition_source_topics).len(),
1121 flex,
1122 );
1123 let body: usize = (self.repartition_source_topics).iter().map(|_| 2).sum();
1124 prefix + body
1125 };
1126 }
1127 if flex {
1128 let known_pairs: Vec<(u32, usize)> = Vec::new();
1129 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
1130 }
1131 n
1132 }
1133}
1134impl Decode<'_> for CopartitionGroup {
1135 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
1136 let flex = version >= 0;
1137 let mut out = Self::default();
1138 if version >= 0 {
1139 out.source_topics = {
1140 let n = crate::primitives::array::get_array_len(buf, flex)?;
1141 let mut v = Vec::with_capacity(n);
1142 for _ in 0..n {
1143 v.push(get_i16(buf)?);
1144 }
1145 v
1146 };
1147 }
1148 if version >= 0 {
1149 out.source_topic_regex = {
1150 let n = crate::primitives::array::get_array_len(buf, flex)?;
1151 let mut v = Vec::with_capacity(n);
1152 for _ in 0..n {
1153 v.push(get_i16(buf)?);
1154 }
1155 v
1156 };
1157 }
1158 if version >= 0 {
1159 out.repartition_source_topics = {
1160 let n = crate::primitives::array::get_array_len(buf, flex)?;
1161 let mut v = Vec::with_capacity(n);
1162 for _ in 0..n {
1163 v.push(get_i16(buf)?);
1164 }
1165 v
1166 };
1167 }
1168 if flex {
1169 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1170 }
1171 Ok(out)
1172 }
1173}
#[cfg(test)]
impl CopartitionGroup {
    /// Test fixture: each of the three arrays holds the single value `1`.
    ///
    /// A negative `version` returns the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 0 {
            Self {
                source_topics: vec![1],
                source_topic_regex: vec![1],
                repartition_source_topics: vec![1],
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
1191
1192#[must_use]
1195#[allow(unused_comparisons)]
1196pub fn default_json(version: i16) -> ::serde_json::Value {
1197 let mut obj = ::serde_json::Map::new();
1198 obj.insert(
1199 "groupId".to_string(),
1200 ::serde_json::Value::String(String::new()),
1201 );
1202 obj.insert(
1203 "memberId".to_string(),
1204 ::serde_json::Value::String(String::new()),
1205 );
1206 obj.insert("memberEpoch".to_string(), ::serde_json::json!(0));
1207 obj.insert(
1208 "endpointInformationEpoch".to_string(),
1209 ::serde_json::json!(0),
1210 );
1211 obj.insert("instanceId".to_string(), ::serde_json::Value::Null);
1212 obj.insert("rackId".to_string(), ::serde_json::Value::Null);
1213 obj.insert("rebalanceTimeoutMs".to_string(), ::serde_json::json!(-1));
1214 obj.insert("topology".to_string(), ::serde_json::Value::Null);
1215 obj.insert("activeTasks".to_string(), ::serde_json::Value::Null);
1216 obj.insert("standbyTasks".to_string(), ::serde_json::Value::Null);
1217 obj.insert("warmupTasks".to_string(), ::serde_json::Value::Null);
1218 obj.insert("processId".to_string(), ::serde_json::Value::Null);
1219 obj.insert("userEndpoint".to_string(), ::serde_json::Value::Null);
1220 obj.insert("clientTags".to_string(), ::serde_json::Value::Null);
1221 obj.insert("taskOffsets".to_string(), ::serde_json::Value::Null);
1222 obj.insert("taskEndOffsets".to_string(), ::serde_json::Value::Null);
1223 obj.insert(
1224 "shutdownApplication".to_string(),
1225 ::serde_json::Value::Bool(false),
1226 );
1227 ::serde_json::Value::Object(obj)
1228}
1229
/// Exposes this module's protocol metadata through the crate-wide `ProtocolRequest`
/// trait and pairs the request with its response type.
impl crate::ProtocolRequest for StreamsGroupHeartbeatRequest {
    // Each associated constant forwards the module-level constant of the same name.
    const API_KEY: i16 = API_KEY;
    const MIN_VERSION: i16 = MIN_VERSION;
    const MAX_VERSION: i16 = MAX_VERSION;
    const FLEXIBLE_MIN: i16 = FLEXIBLE_MIN;
    // Response message type declared in the sibling `streams_group_heartbeat_response` module.
    type Response = super::streams_group_heartbeat_response::StreamsGroupHeartbeatResponse;
}