1use crate::primitives::fixed::{get_bool, get_i8, get_i16, get_i32, put_bool, put_i16, put_i32};
4use crate::primitives::string_bytes::{
5 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
6 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
7 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
8};
9use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
10use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
11use bytes::{Buf, BufMut};
/// API key of this request; reported in `ProtocolError::UnsupportedVersion`
/// and exposed through `ProtocolRequest::API_KEY`.
pub const API_KEY: i16 = 88;
/// Lowest version accepted by the top-level `encode`/`decode` of this message.
pub const MIN_VERSION: i16 = 0;
/// Highest version accepted by the top-level `encode`/`decode` of this message.
pub const MAX_VERSION: i16 = 0;
/// First version for which `is_flexible` returns `true`, i.e. for which compact
/// strings/arrays and the trailing tagged-field section are used.
pub const FLEXIBLE_MIN: i16 = 0;
16#[inline]
17fn is_flexible(version: i16) -> bool {
18 version >= FLEXIBLE_MIN
19}
/// Body of the streams-group heartbeat request (API key 88).
///
/// The `Encode`/`Decode` implementations process the fields in declaration
/// order. Every `Option` field is nullable on the wire; the values listed as
/// defaults are the ones produced by the `Default` implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamsGroupHeartbeatRequest {
    /// Group identifier; non-nullable string, defaults to empty.
    pub group_id: String,
    /// Member identifier; non-nullable string, defaults to empty.
    pub member_id: String,
    /// Member epoch, written as a 32-bit integer; defaults to `0`.
    pub member_epoch: i32,
    /// Endpoint-information epoch, written as a 32-bit integer; defaults to `0`.
    pub endpoint_information_epoch: i32,
    /// Instance identifier; nullable string, defaults to `None`.
    pub instance_id: Option<String>,
    /// Rack identifier; nullable string, defaults to `None`.
    pub rack_id: Option<String>,
    /// Rebalance timeout in milliseconds; defaults to `-1`.
    pub rebalance_timeout_ms: i32,
    /// Optional topology description. Encoded as a one-byte marker (`-1` for
    /// `None`, `1` for `Some`) followed by the nested struct when present.
    pub topology: Option<Topology>,
    /// Nullable array of active task ids; defaults to `None`.
    pub active_tasks:
        Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds>>,
    /// Nullable array of standby task ids; defaults to `None`.
    pub standby_tasks:
        Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds>>,
    /// Nullable array of warm-up task ids; defaults to `None`.
    pub warmup_tasks:
        Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds>>,
    /// Process identifier; nullable string, defaults to `None`.
    pub process_id: Option<String>,
    /// Optional user endpoint, encoded with the same one-byte presence marker
    /// as `topology`.
    pub user_endpoint: Option<super::common::streams_group_heartbeat_request::endpoint::Endpoint>,
    /// Nullable array of client tag key/value pairs; defaults to `None`.
    pub client_tags:
        Option<Vec<super::common::streams_group_heartbeat_request::key_value::KeyValue>>,
    /// Nullable array of task offsets; defaults to `None`.
    pub task_offsets:
        Option<Vec<super::common::streams_group_heartbeat_request::task_offset::TaskOffset>>,
    /// Nullable array of task end offsets; defaults to `None`.
    pub task_end_offsets:
        Option<Vec<super::common::streams_group_heartbeat_request::task_offset::TaskOffset>>,
    /// Shutdown-application flag, written as a boolean; defaults to `false`.
    pub shutdown_application: bool,
    /// Tagged fields collected by `decode` (which claims no tag itself) and
    /// handed to `WriteTaggedFields::write` by `encode`.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
47impl Default for StreamsGroupHeartbeatRequest {
48 fn default() -> Self {
49 Self {
50 group_id: String::new(),
51 member_id: String::new(),
52 member_epoch: 0i32,
53 endpoint_information_epoch: 0i32,
54 instance_id: None,
55 rack_id: None,
56 rebalance_timeout_ms: -1i32,
57 topology: None,
58 active_tasks: None,
59 standby_tasks: None,
60 warmup_tasks: None,
61 process_id: None,
62 user_endpoint: None,
63 client_tags: None,
64 task_offsets: None,
65 task_end_offsets: None,
66 shutdown_application: false,
67 unknown_tagged_fields: Default::default(),
68 }
69 }
70}
71impl Encode for StreamsGroupHeartbeatRequest {
72 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
73 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
74 return Err(ProtocolError::UnsupportedVersion {
75 api_key: API_KEY,
76 version,
77 });
78 }
79 let flex = is_flexible(version);
80 if version >= 0 {
81 if flex {
82 put_compact_string(buf, &self.group_id);
83 } else {
84 put_string(buf, &self.group_id);
85 }
86 }
87 if version >= 0 {
88 if flex {
89 put_compact_string(buf, &self.member_id);
90 } else {
91 put_string(buf, &self.member_id);
92 }
93 }
94 if version >= 0 {
95 put_i32(buf, self.member_epoch);
96 }
97 if version >= 0 {
98 put_i32(buf, self.endpoint_information_epoch);
99 }
100 if version >= 0 {
101 if flex {
102 put_compact_nullable_string(buf, self.instance_id.as_deref());
103 } else {
104 put_nullable_string(buf, self.instance_id.as_deref());
105 }
106 }
107 if version >= 0 {
108 if flex {
109 put_compact_nullable_string(buf, self.rack_id.as_deref());
110 } else {
111 put_nullable_string(buf, self.rack_id.as_deref());
112 }
113 }
114 if version >= 0 {
115 put_i32(buf, self.rebalance_timeout_ms);
116 }
117 if version >= 0 {
118 match &self.topology {
119 None => {
120 buf.put_i8(-1);
121 }
122 Some(v) => {
123 buf.put_i8(1);
124 v.encode(buf, version)?;
125 }
126 }
127 }
128 if version >= 0 {
129 {
130 let len = (self.active_tasks).as_ref().map(Vec::len);
131 crate::primitives::array::put_nullable_array_len(buf, len, flex);
132 if let Some(v) = &self.active_tasks {
133 for it in v {
134 it.encode(buf, version)?;
135 }
136 }
137 }
138 }
139 if version >= 0 {
140 {
141 let len = (self.standby_tasks).as_ref().map(Vec::len);
142 crate::primitives::array::put_nullable_array_len(buf, len, flex);
143 if let Some(v) = &self.standby_tasks {
144 for it in v {
145 it.encode(buf, version)?;
146 }
147 }
148 }
149 }
150 if version >= 0 {
151 {
152 let len = (self.warmup_tasks).as_ref().map(Vec::len);
153 crate::primitives::array::put_nullable_array_len(buf, len, flex);
154 if let Some(v) = &self.warmup_tasks {
155 for it in v {
156 it.encode(buf, version)?;
157 }
158 }
159 }
160 }
161 if version >= 0 {
162 if flex {
163 put_compact_nullable_string(buf, self.process_id.as_deref());
164 } else {
165 put_nullable_string(buf, self.process_id.as_deref());
166 }
167 }
168 if version >= 0 {
169 match &self.user_endpoint {
170 None => {
171 buf.put_i8(-1);
172 }
173 Some(v) => {
174 buf.put_i8(1);
175 v.encode(buf, version)?;
176 }
177 }
178 }
179 if version >= 0 {
180 {
181 let len = (self.client_tags).as_ref().map(Vec::len);
182 crate::primitives::array::put_nullable_array_len(buf, len, flex);
183 if let Some(v) = &self.client_tags {
184 for it in v {
185 it.encode(buf, version)?;
186 }
187 }
188 }
189 }
190 if version >= 0 {
191 {
192 let len = (self.task_offsets).as_ref().map(Vec::len);
193 crate::primitives::array::put_nullable_array_len(buf, len, flex);
194 if let Some(v) = &self.task_offsets {
195 for it in v {
196 it.encode(buf, version)?;
197 }
198 }
199 }
200 }
201 if version >= 0 {
202 {
203 let len = (self.task_end_offsets).as_ref().map(Vec::len);
204 crate::primitives::array::put_nullable_array_len(buf, len, flex);
205 if let Some(v) = &self.task_end_offsets {
206 for it in v {
207 it.encode(buf, version)?;
208 }
209 }
210 }
211 }
212 if version >= 0 {
213 put_bool(buf, self.shutdown_application);
214 }
215 if flex {
216 let tagged = WriteTaggedFields::new();
217 tagged.write(buf, &self.unknown_tagged_fields);
218 }
219 Ok(())
220 }
221 fn encoded_len(&self, version: i16) -> usize {
222 let flex = is_flexible(version);
223 let mut n: usize = 0;
224 if version >= 0 {
225 n += if flex {
226 compact_string_len(&self.group_id)
227 } else {
228 string_len(&self.group_id)
229 };
230 }
231 if version >= 0 {
232 n += if flex {
233 compact_string_len(&self.member_id)
234 } else {
235 string_len(&self.member_id)
236 };
237 }
238 if version >= 0 {
239 n += 4;
240 }
241 if version >= 0 {
242 n += 4;
243 }
244 if version >= 0 {
245 n += if flex {
246 compact_nullable_string_len(self.instance_id.as_deref())
247 } else {
248 nullable_string_len(self.instance_id.as_deref())
249 };
250 }
251 if version >= 0 {
252 n += if flex {
253 compact_nullable_string_len(self.rack_id.as_deref())
254 } else {
255 nullable_string_len(self.rack_id.as_deref())
256 };
257 }
258 if version >= 0 {
259 n += 4;
260 }
261 if version >= 0 {
262 n += 1 + self.topology.as_ref().map_or(0, |v| v.encoded_len(version));
263 }
264 if version >= 0 {
265 n += {
266 let opt: Option<&Vec<_>> = (self.active_tasks).as_ref();
267 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
268 opt.map(std::vec::Vec::len),
269 flex,
270 );
271 let body: usize =
272 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
273 prefix + body
274 };
275 }
276 if version >= 0 {
277 n += {
278 let opt: Option<&Vec<_>> = (self.standby_tasks).as_ref();
279 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
280 opt.map(std::vec::Vec::len),
281 flex,
282 );
283 let body: usize =
284 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
285 prefix + body
286 };
287 }
288 if version >= 0 {
289 n += {
290 let opt: Option<&Vec<_>> = (self.warmup_tasks).as_ref();
291 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
292 opt.map(std::vec::Vec::len),
293 flex,
294 );
295 let body: usize =
296 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
297 prefix + body
298 };
299 }
300 if version >= 0 {
301 n += if flex {
302 compact_nullable_string_len(self.process_id.as_deref())
303 } else {
304 nullable_string_len(self.process_id.as_deref())
305 };
306 }
307 if version >= 0 {
308 n += 1 + self
309 .user_endpoint
310 .as_ref()
311 .map_or(0, |v| v.encoded_len(version));
312 }
313 if version >= 0 {
314 n += {
315 let opt: Option<&Vec<_>> = (self.client_tags).as_ref();
316 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
317 opt.map(std::vec::Vec::len),
318 flex,
319 );
320 let body: usize =
321 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
322 prefix + body
323 };
324 }
325 if version >= 0 {
326 n += {
327 let opt: Option<&Vec<_>> = (self.task_offsets).as_ref();
328 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
329 opt.map(std::vec::Vec::len),
330 flex,
331 );
332 let body: usize =
333 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
334 prefix + body
335 };
336 }
337 if version >= 0 {
338 n += {
339 let opt: Option<&Vec<_>> = (self.task_end_offsets).as_ref();
340 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
341 opt.map(std::vec::Vec::len),
342 flex,
343 );
344 let body: usize =
345 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
346 prefix + body
347 };
348 }
349 if version >= 0 {
350 n += 1;
351 }
352 if flex {
353 let known_pairs: Vec<(u32, usize)> = Vec::new();
354 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
355 }
356 n
357 }
358}
359impl Decode<'_> for StreamsGroupHeartbeatRequest {
360 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
361 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
362 return Err(ProtocolError::UnsupportedVersion {
363 api_key: API_KEY,
364 version,
365 });
366 }
367 let flex = is_flexible(version);
368 let mut out = Self::default();
369 if version >= 0 {
370 out.group_id = if flex {
371 get_compact_string_owned(buf)?
372 } else {
373 get_string_owned(buf)?
374 };
375 }
376 if version >= 0 {
377 out.member_id = if flex {
378 get_compact_string_owned(buf)?
379 } else {
380 get_string_owned(buf)?
381 };
382 }
383 if version >= 0 {
384 out.member_epoch = get_i32(buf)?;
385 }
386 if version >= 0 {
387 out.endpoint_information_epoch = get_i32(buf)?;
388 }
389 if version >= 0 {
390 out.instance_id = if flex {
391 get_compact_nullable_string_owned(buf)?
392 } else {
393 get_nullable_string_owned(buf)?
394 };
395 }
396 if version >= 0 {
397 out.rack_id = if flex {
398 get_compact_nullable_string_owned(buf)?
399 } else {
400 get_nullable_string_owned(buf)?
401 };
402 }
403 if version >= 0 {
404 out.rebalance_timeout_ms = get_i32(buf)?;
405 }
406 if version >= 0 {
407 out.topology = if get_i8(buf)? < 0 {
408 None
409 } else {
410 Some(Topology::decode(buf, version)?)
411 };
412 }
413 if version >= 0 {
414 out.active_tasks = {
415 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
416 match opt {
417 None => None,
418 Some(n) => {
419 let mut v = Vec::with_capacity(n);
420 for _ in 0..n {
421 v.push(
422 super::common::streams_group_heartbeat_request::task_ids::TaskIds::decode(
423 buf,
424 version,
425 )?,
426 );
427 }
428 Some(v)
429 }
430 }
431 };
432 }
433 if version >= 0 {
434 out.standby_tasks = {
435 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
436 match opt {
437 None => None,
438 Some(n) => {
439 let mut v = Vec::with_capacity(n);
440 for _ in 0..n {
441 v.push(
442 super::common::streams_group_heartbeat_request::task_ids::TaskIds::decode(
443 buf,
444 version,
445 )?,
446 );
447 }
448 Some(v)
449 }
450 }
451 };
452 }
453 if version >= 0 {
454 out.warmup_tasks = {
455 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
456 match opt {
457 None => None,
458 Some(n) => {
459 let mut v = Vec::with_capacity(n);
460 for _ in 0..n {
461 v.push(
462 super::common::streams_group_heartbeat_request::task_ids::TaskIds::decode(
463 buf,
464 version,
465 )?,
466 );
467 }
468 Some(v)
469 }
470 }
471 };
472 }
473 if version >= 0 {
474 out.process_id = if flex {
475 get_compact_nullable_string_owned(buf)?
476 } else {
477 get_nullable_string_owned(buf)?
478 };
479 }
480 if version >= 0 {
481 out.user_endpoint = if get_i8(buf)? < 0 {
482 None
483 } else {
484 Some(
485 super::common::streams_group_heartbeat_request::endpoint::Endpoint::decode(
486 buf, version,
487 )?,
488 )
489 };
490 }
491 if version >= 0 {
492 out.client_tags = {
493 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
494 match opt {
495 None => None,
496 Some(n) => {
497 let mut v = Vec::with_capacity(n);
498 for _ in 0..n {
499 v.push(
500 super::common::streams_group_heartbeat_request::key_value::KeyValue::decode(
501 buf,
502 version,
503 )?,
504 );
505 }
506 Some(v)
507 }
508 }
509 };
510 }
511 if version >= 0 {
512 out.task_offsets = {
513 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
514 match opt {
515 None => None,
516 Some(n) => {
517 let mut v = Vec::with_capacity(n);
518 for _ in 0..n {
519 v.push(
520 super::common::streams_group_heartbeat_request::task_offset::TaskOffset::decode(
521 buf,
522 version,
523 )?,
524 );
525 }
526 Some(v)
527 }
528 }
529 };
530 }
531 if version >= 0 {
532 out.task_end_offsets = {
533 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
534 match opt {
535 None => None,
536 Some(n) => {
537 let mut v = Vec::with_capacity(n);
538 for _ in 0..n {
539 v.push(
540 super::common::streams_group_heartbeat_request::task_offset::TaskOffset::decode(
541 buf,
542 version,
543 )?,
544 );
545 }
546 Some(v)
547 }
548 }
549 };
550 }
551 if version >= 0 {
552 out.shutdown_application = get_bool(buf)?;
553 }
554 if flex {
555 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
556 }
557 Ok(out)
558 }
559}
#[cfg(test)]
impl StreamsGroupHeartbeatRequest {
    /// Test fixture with every field set to a non-default sample value.
    ///
    /// For negative versions no field applies and the default value is
    /// returned unchanged.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::streams_group_heartbeat_request::{
            endpoint::Endpoint, key_value::KeyValue, task_ids::TaskIds, task_offset::TaskOffset,
        };

        if version < 0 {
            return Self::default();
        }

        Self {
            group_id: String::from("x"),
            member_id: String::from("x"),
            member_epoch: 1,
            endpoint_information_epoch: 1,
            instance_id: Some(String::from("x")),
            rack_id: Some(String::from("x")),
            rebalance_timeout_ms: 1,
            topology: Some(Topology::populated(version)),
            active_tasks: Some(vec![TaskIds::populated(version)]),
            standby_tasks: Some(vec![TaskIds::populated(version)]),
            warmup_tasks: Some(vec![TaskIds::populated(version)]),
            process_id: Some(String::from("x")),
            user_endpoint: Some(Endpoint::populated(version)),
            client_tags: Some(vec![KeyValue::populated(version)]),
            task_offsets: Some(vec![TaskOffset::populated(version)]),
            task_end_offsets: Some(vec![TaskOffset::populated(version)]),
            shutdown_application: true,
            // Only `unknown_tagged_fields` is taken from the default.
            ..Self::default()
        }
    }
}
/// Nested topology struct carried in `StreamsGroupHeartbeatRequest::topology`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Topology {
    /// Topology epoch, written as a 32-bit integer.
    pub epoch: i32,
    /// Non-nullable array of subtopologies.
    pub subtopologies: Vec<Subtopology>,
    /// Tagged fields collected by `decode` and handed to
    /// `WriteTaggedFields::write` by `encode`.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
653impl Encode for Topology {
654 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
655 let flex = version >= 0;
656 if version >= 0 {
657 put_i32(buf, self.epoch);
658 }
659 if version >= 0 {
660 {
661 crate::primitives::array::put_array_len(buf, (self.subtopologies).len(), flex);
662 for it in &self.subtopologies {
663 it.encode(buf, version)?;
664 }
665 }
666 }
667 if flex {
668 let tagged = WriteTaggedFields::new();
669 tagged.write(buf, &self.unknown_tagged_fields);
670 }
671 Ok(())
672 }
673 fn encoded_len(&self, version: i16) -> usize {
674 let flex = version >= 0;
675 let mut n: usize = 0;
676 if version >= 0 {
677 n += 4;
678 }
679 if version >= 0 {
680 n += {
681 let prefix = crate::primitives::array::array_len_prefix_len(
682 (self.subtopologies).len(),
683 flex,
684 );
685 let body: usize = (self.subtopologies)
686 .iter()
687 .map(|it| it.encoded_len(version))
688 .sum();
689 prefix + body
690 };
691 }
692 if flex {
693 let known_pairs: Vec<(u32, usize)> = Vec::new();
694 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
695 }
696 n
697 }
698}
699impl Decode<'_> for Topology {
700 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
701 let flex = version >= 0;
702 let mut out = Self::default();
703 if version >= 0 {
704 out.epoch = get_i32(buf)?;
705 }
706 if version >= 0 {
707 out.subtopologies = {
708 let n = crate::primitives::array::get_array_len(buf, flex)?;
709 let mut v = Vec::with_capacity(n);
710 for _ in 0..n {
711 v.push(Subtopology::decode(buf, version)?);
712 }
713 v
714 };
715 }
716 if flex {
717 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
718 }
719 Ok(out)
720 }
721}
#[cfg(test)]
impl Topology {
    /// Test fixture with every field set to a non-default sample value; the
    /// default value is returned for negative versions.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            epoch: 1,
            subtopologies: vec![Subtopology::populated(version)],
            ..Self::default()
        }
    }
}
/// One element of `Topology::subtopologies`.
///
/// All arrays are non-nullable and are encoded in declaration order.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Subtopology {
    /// Subtopology identifier; non-nullable string.
    pub subtopology_id: String,
    /// Array of source topic strings.
    pub source_topics: Vec<String>,
    /// Array of source topic regex strings.
    pub source_topic_regex: Vec<String>,
    /// Array of `TopicInfo` structs for state changelog topics.
    pub state_changelog_topics:
        Vec<super::common::streams_group_heartbeat_request::topic_info::TopicInfo>,
    /// Array of repartition sink topic strings.
    pub repartition_sink_topics: Vec<String>,
    /// Array of `TopicInfo` structs for repartition source topics.
    pub repartition_source_topics:
        Vec<super::common::streams_group_heartbeat_request::topic_info::TopicInfo>,
    /// Array of co-partition groups.
    pub copartition_groups: Vec<CopartitionGroup>,
    /// Tagged fields collected by `decode` and handed to
    /// `WriteTaggedFields::write` by `encode`.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
749impl Encode for Subtopology {
750 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
751 let flex = version >= 0;
752 if version >= 0 {
753 if flex {
754 put_compact_string(buf, &self.subtopology_id);
755 } else {
756 put_string(buf, &self.subtopology_id);
757 }
758 }
759 if version >= 0 {
760 {
761 crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
762 for it in &self.source_topics {
763 if flex {
764 put_compact_string(buf, it);
765 } else {
766 put_string(buf, it);
767 }
768 }
769 }
770 }
771 if version >= 0 {
772 {
773 crate::primitives::array::put_array_len(buf, (self.source_topic_regex).len(), flex);
774 for it in &self.source_topic_regex {
775 if flex {
776 put_compact_string(buf, it);
777 } else {
778 put_string(buf, it);
779 }
780 }
781 }
782 }
783 if version >= 0 {
784 {
785 crate::primitives::array::put_array_len(
786 buf,
787 (self.state_changelog_topics).len(),
788 flex,
789 );
790 for it in &self.state_changelog_topics {
791 it.encode(buf, version)?;
792 }
793 }
794 }
795 if version >= 0 {
796 {
797 crate::primitives::array::put_array_len(
798 buf,
799 (self.repartition_sink_topics).len(),
800 flex,
801 );
802 for it in &self.repartition_sink_topics {
803 if flex {
804 put_compact_string(buf, it);
805 } else {
806 put_string(buf, it);
807 }
808 }
809 }
810 }
811 if version >= 0 {
812 {
813 crate::primitives::array::put_array_len(
814 buf,
815 (self.repartition_source_topics).len(),
816 flex,
817 );
818 for it in &self.repartition_source_topics {
819 it.encode(buf, version)?;
820 }
821 }
822 }
823 if version >= 0 {
824 {
825 crate::primitives::array::put_array_len(buf, (self.copartition_groups).len(), flex);
826 for it in &self.copartition_groups {
827 it.encode(buf, version)?;
828 }
829 }
830 }
831 if flex {
832 let tagged = WriteTaggedFields::new();
833 tagged.write(buf, &self.unknown_tagged_fields);
834 }
835 Ok(())
836 }
837 fn encoded_len(&self, version: i16) -> usize {
838 let flex = version >= 0;
839 let mut n: usize = 0;
840 if version >= 0 {
841 n += if flex {
842 compact_string_len(&self.subtopology_id)
843 } else {
844 string_len(&self.subtopology_id)
845 };
846 }
847 if version >= 0 {
848 n += {
849 let prefix = crate::primitives::array::array_len_prefix_len(
850 (self.source_topics).len(),
851 flex,
852 );
853 let body: usize = (self.source_topics)
854 .iter()
855 .map(|it| {
856 if flex {
857 compact_string_len(it)
858 } else {
859 string_len(it)
860 }
861 })
862 .sum();
863 prefix + body
864 };
865 }
866 if version >= 0 {
867 n += {
868 let prefix = crate::primitives::array::array_len_prefix_len(
869 (self.source_topic_regex).len(),
870 flex,
871 );
872 let body: usize = (self.source_topic_regex)
873 .iter()
874 .map(|it| {
875 if flex {
876 compact_string_len(it)
877 } else {
878 string_len(it)
879 }
880 })
881 .sum();
882 prefix + body
883 };
884 }
885 if version >= 0 {
886 n += {
887 let prefix = crate::primitives::array::array_len_prefix_len(
888 (self.state_changelog_topics).len(),
889 flex,
890 );
891 let body: usize = (self.state_changelog_topics)
892 .iter()
893 .map(|it| it.encoded_len(version))
894 .sum();
895 prefix + body
896 };
897 }
898 if version >= 0 {
899 n += {
900 let prefix = crate::primitives::array::array_len_prefix_len(
901 (self.repartition_sink_topics).len(),
902 flex,
903 );
904 let body: usize = (self.repartition_sink_topics)
905 .iter()
906 .map(|it| {
907 if flex {
908 compact_string_len(it)
909 } else {
910 string_len(it)
911 }
912 })
913 .sum();
914 prefix + body
915 };
916 }
917 if version >= 0 {
918 n += {
919 let prefix = crate::primitives::array::array_len_prefix_len(
920 (self.repartition_source_topics).len(),
921 flex,
922 );
923 let body: usize = (self.repartition_source_topics)
924 .iter()
925 .map(|it| it.encoded_len(version))
926 .sum();
927 prefix + body
928 };
929 }
930 if version >= 0 {
931 n += {
932 let prefix = crate::primitives::array::array_len_prefix_len(
933 (self.copartition_groups).len(),
934 flex,
935 );
936 let body: usize = (self.copartition_groups)
937 .iter()
938 .map(|it| it.encoded_len(version))
939 .sum();
940 prefix + body
941 };
942 }
943 if flex {
944 let known_pairs: Vec<(u32, usize)> = Vec::new();
945 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
946 }
947 n
948 }
949}
950impl Decode<'_> for Subtopology {
951 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
952 let flex = version >= 0;
953 let mut out = Self::default();
954 if version >= 0 {
955 out.subtopology_id = if flex {
956 get_compact_string_owned(buf)?
957 } else {
958 get_string_owned(buf)?
959 };
960 }
961 if version >= 0 {
962 out.source_topics = {
963 let n = crate::primitives::array::get_array_len(buf, flex)?;
964 let mut v = Vec::with_capacity(n);
965 for _ in 0..n {
966 v.push(if flex {
967 get_compact_string_owned(buf)?
968 } else {
969 get_string_owned(buf)?
970 });
971 }
972 v
973 };
974 }
975 if version >= 0 {
976 out.source_topic_regex = {
977 let n = crate::primitives::array::get_array_len(buf, flex)?;
978 let mut v = Vec::with_capacity(n);
979 for _ in 0..n {
980 v.push(if flex {
981 get_compact_string_owned(buf)?
982 } else {
983 get_string_owned(buf)?
984 });
985 }
986 v
987 };
988 }
989 if version >= 0 {
990 out.state_changelog_topics = {
991 let n = crate::primitives::array::get_array_len(buf, flex)?;
992 let mut v = Vec::with_capacity(n);
993 for _ in 0..n {
994 v.push(
995 super::common::streams_group_heartbeat_request::topic_info::TopicInfo::decode(
996 buf,
997 version,
998 )?,
999 );
1000 }
1001 v
1002 };
1003 }
1004 if version >= 0 {
1005 out.repartition_sink_topics = {
1006 let n = crate::primitives::array::get_array_len(buf, flex)?;
1007 let mut v = Vec::with_capacity(n);
1008 for _ in 0..n {
1009 v.push(if flex {
1010 get_compact_string_owned(buf)?
1011 } else {
1012 get_string_owned(buf)?
1013 });
1014 }
1015 v
1016 };
1017 }
1018 if version >= 0 {
1019 out.repartition_source_topics = {
1020 let n = crate::primitives::array::get_array_len(buf, flex)?;
1021 let mut v = Vec::with_capacity(n);
1022 for _ in 0..n {
1023 v.push(
1024 super::common::streams_group_heartbeat_request::topic_info::TopicInfo::decode(
1025 buf,
1026 version,
1027 )?,
1028 );
1029 }
1030 v
1031 };
1032 }
1033 if version >= 0 {
1034 out.copartition_groups = {
1035 let n = crate::primitives::array::get_array_len(buf, flex)?;
1036 let mut v = Vec::with_capacity(n);
1037 for _ in 0..n {
1038 v.push(CopartitionGroup::decode(buf, version)?);
1039 }
1040 v
1041 };
1042 }
1043 if flex {
1044 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1045 }
1046 Ok(out)
1047 }
1048}
#[cfg(test)]
impl Subtopology {
    /// Test fixture with every field set to a one-element sample value; the
    /// default value is returned for negative versions.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::streams_group_heartbeat_request::topic_info::TopicInfo;

        if version < 0 {
            return Self::default();
        }

        let sample_names = || vec![String::from("x")];
        Self {
            subtopology_id: String::from("x"),
            source_topics: sample_names(),
            source_topic_regex: sample_names(),
            state_changelog_topics: vec![TopicInfo::populated(version)],
            repartition_sink_topics: sample_names(),
            repartition_source_topics: vec![TopicInfo::populated(version)],
            copartition_groups: vec![CopartitionGroup::populated(version)],
            // Only `unknown_tagged_fields` is taken from the default.
            ..Self::default()
        }
    }
}
/// One element of `Subtopology::copartition_groups`.
///
/// Each field is a non-nullable array of 16-bit integers.
// NOTE(review): the values are presumably indexes into the corresponding
// topic lists of the enclosing `Subtopology` — confirm against the schema.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct CopartitionGroup {
    /// 16-bit entries for source topics.
    pub source_topics: Vec<i16>,
    /// 16-bit entries for source topic regexes.
    pub source_topic_regex: Vec<i16>,
    /// 16-bit entries for repartition source topics.
    pub repartition_source_topics: Vec<i16>,
    /// Tagged fields collected by `decode` and handed to
    /// `WriteTaggedFields::write` by `encode`.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
1093impl Encode for CopartitionGroup {
1094 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
1095 let flex = version >= 0;
1096 if version >= 0 {
1097 {
1098 crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
1099 for it in &self.source_topics {
1100 put_i16(buf, *it);
1101 }
1102 }
1103 }
1104 if version >= 0 {
1105 {
1106 crate::primitives::array::put_array_len(buf, (self.source_topic_regex).len(), flex);
1107 for it in &self.source_topic_regex {
1108 put_i16(buf, *it);
1109 }
1110 }
1111 }
1112 if version >= 0 {
1113 {
1114 crate::primitives::array::put_array_len(
1115 buf,
1116 (self.repartition_source_topics).len(),
1117 flex,
1118 );
1119 for it in &self.repartition_source_topics {
1120 put_i16(buf, *it);
1121 }
1122 }
1123 }
1124 if flex {
1125 let tagged = WriteTaggedFields::new();
1126 tagged.write(buf, &self.unknown_tagged_fields);
1127 }
1128 Ok(())
1129 }
1130 fn encoded_len(&self, version: i16) -> usize {
1131 let flex = version >= 0;
1132 let mut n: usize = 0;
1133 if version >= 0 {
1134 n += {
1135 let prefix = crate::primitives::array::array_len_prefix_len(
1136 (self.source_topics).len(),
1137 flex,
1138 );
1139 let body: usize = (self.source_topics).iter().map(|_| 2).sum();
1140 prefix + body
1141 };
1142 }
1143 if version >= 0 {
1144 n += {
1145 let prefix = crate::primitives::array::array_len_prefix_len(
1146 (self.source_topic_regex).len(),
1147 flex,
1148 );
1149 let body: usize = (self.source_topic_regex).iter().map(|_| 2).sum();
1150 prefix + body
1151 };
1152 }
1153 if version >= 0 {
1154 n += {
1155 let prefix = crate::primitives::array::array_len_prefix_len(
1156 (self.repartition_source_topics).len(),
1157 flex,
1158 );
1159 let body: usize = (self.repartition_source_topics).iter().map(|_| 2).sum();
1160 prefix + body
1161 };
1162 }
1163 if flex {
1164 let known_pairs: Vec<(u32, usize)> = Vec::new();
1165 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
1166 }
1167 n
1168 }
1169}
1170impl Decode<'_> for CopartitionGroup {
1171 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
1172 let flex = version >= 0;
1173 let mut out = Self::default();
1174 if version >= 0 {
1175 out.source_topics = {
1176 let n = crate::primitives::array::get_array_len(buf, flex)?;
1177 let mut v = Vec::with_capacity(n);
1178 for _ in 0..n {
1179 v.push(get_i16(buf)?);
1180 }
1181 v
1182 };
1183 }
1184 if version >= 0 {
1185 out.source_topic_regex = {
1186 let n = crate::primitives::array::get_array_len(buf, flex)?;
1187 let mut v = Vec::with_capacity(n);
1188 for _ in 0..n {
1189 v.push(get_i16(buf)?);
1190 }
1191 v
1192 };
1193 }
1194 if version >= 0 {
1195 out.repartition_source_topics = {
1196 let n = crate::primitives::array::get_array_len(buf, flex)?;
1197 let mut v = Vec::with_capacity(n);
1198 for _ in 0..n {
1199 v.push(get_i16(buf)?);
1200 }
1201 v
1202 };
1203 }
1204 if flex {
1205 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1206 }
1207 Ok(out)
1208 }
1209}
#[cfg(test)]
impl CopartitionGroup {
    /// Test fixture with each array holding the single value `1`; the default
    /// value is returned for negative versions.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            source_topics: vec![1],
            source_topic_regex: vec![1],
            repartition_source_topics: vec![1],
            ..Self::default()
        }
    }
}
1227#[must_use]
1230#[allow(unused_comparisons)]
1231pub fn default_json(version: i16) -> ::serde_json::Value {
1232 let mut obj = ::serde_json::Map::new();
1233 obj.insert(
1234 "groupId".to_string(),
1235 ::serde_json::Value::String(String::new()),
1236 );
1237 obj.insert(
1238 "memberId".to_string(),
1239 ::serde_json::Value::String(String::new()),
1240 );
1241 obj.insert("memberEpoch".to_string(), ::serde_json::json!(0));
1242 obj.insert(
1243 "endpointInformationEpoch".to_string(),
1244 ::serde_json::json!(0),
1245 );
1246 obj.insert("instanceId".to_string(), ::serde_json::Value::Null);
1247 obj.insert("rackId".to_string(), ::serde_json::Value::Null);
1248 obj.insert("rebalanceTimeoutMs".to_string(), ::serde_json::json!(-1));
1249 obj.insert("topology".to_string(), ::serde_json::Value::Null);
1250 obj.insert("activeTasks".to_string(), ::serde_json::Value::Null);
1251 obj.insert("standbyTasks".to_string(), ::serde_json::Value::Null);
1252 obj.insert("warmupTasks".to_string(), ::serde_json::Value::Null);
1253 obj.insert("processId".to_string(), ::serde_json::Value::Null);
1254 obj.insert("userEndpoint".to_string(), ::serde_json::Value::Null);
1255 obj.insert("clientTags".to_string(), ::serde_json::Value::Null);
1256 obj.insert("taskOffsets".to_string(), ::serde_json::Value::Null);
1257 obj.insert("taskEndOffsets".to_string(), ::serde_json::Value::Null);
1258 obj.insert(
1259 "shutdownApplication".to_string(),
1260 ::serde_json::Value::Bool(false),
1261 );
1262 ::serde_json::Value::Object(obj)
1263}
/// Exposes the module-level protocol constants through `ProtocolRequest` and
/// names the response type paired with this request.
impl crate::ProtocolRequest for StreamsGroupHeartbeatRequest {
    // Each associated constant forwards the module-level constant of the same name.
    const API_KEY: i16 = API_KEY;
    const MIN_VERSION: i16 = MIN_VERSION;
    const MAX_VERSION: i16 = MAX_VERSION;
    const FLEXIBLE_MIN: i16 = FLEXIBLE_MIN;
    /// Response message type associated with this request.
    type Response = super::streams_group_heartbeat_response::StreamsGroupHeartbeatResponse;
}