1use crate::primitives::fixed::{get_bool, get_i8, get_i16, get_i32, put_bool, put_i16, put_i32};
3use crate::primitives::string_bytes::{
4 compact_nullable_string_len, compact_string_len, nullable_string_len,
5 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
6};
7use crate::primitives::string_bytes_borrowed::{
8 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
9 get_nullable_string_borrowed, get_string_borrowed,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
13use bytes::BufMut;
/// API key reported in `ProtocolError::UnsupportedVersion` for this message.
pub const API_KEY: i16 = 88;
/// Lowest version accepted by the top-level encoder and decoder.
pub const MIN_VERSION: i16 = 0;
/// Highest version accepted by the top-level encoder and decoder.
pub const MAX_VERSION: i16 = 0;
/// First version for which `is_flexible` reports `true`.
pub const FLEXIBLE_MIN: i16 = 0;
18#[inline]
19fn is_flexible(version: i16) -> bool {
20 version >= FLEXIBLE_MIN
21}
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct StreamsGroupHeartbeatRequest<'a> {
24 pub group_id: &'a str,
25 pub member_id: &'a str,
26 pub member_epoch: i32,
27 pub endpoint_information_epoch: i32,
28 pub instance_id: Option<&'a str>,
29 pub rack_id: Option<&'a str>,
30 pub rebalance_timeout_ms: i32,
31 pub topology: Option<Topology<'a>>,
32 pub active_tasks:
33 Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds<'a>>>,
34 pub standby_tasks:
35 Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds<'a>>>,
36 pub warmup_tasks:
37 Option<Vec<super::common::streams_group_heartbeat_request::task_ids::TaskIds<'a>>>,
38 pub process_id: Option<&'a str>,
39 pub user_endpoint:
40 Option<super::common::streams_group_heartbeat_request::endpoint::Endpoint<'a>>,
41 pub client_tags:
42 Option<Vec<super::common::streams_group_heartbeat_request::key_value::KeyValue<'a>>>,
43 pub task_offsets:
44 Option<Vec<super::common::streams_group_heartbeat_request::task_offset::TaskOffset<'a>>>,
45 pub task_end_offsets:
46 Option<Vec<super::common::streams_group_heartbeat_request::task_offset::TaskOffset<'a>>>,
47 pub shutdown_application: bool,
48 pub unknown_tagged_fields: UnknownTaggedFields,
49}
50impl Default for StreamsGroupHeartbeatRequest<'_> {
51 fn default() -> Self {
52 Self {
53 group_id: "",
54 member_id: "",
55 member_epoch: 0i32,
56 endpoint_information_epoch: 0i32,
57 instance_id: None,
58 rack_id: None,
59 rebalance_timeout_ms: -1i32,
60 topology: None,
61 active_tasks: None,
62 standby_tasks: None,
63 warmup_tasks: None,
64 process_id: None,
65 user_endpoint: None,
66 client_tags: None,
67 task_offsets: None,
68 task_end_offsets: None,
69 shutdown_application: false,
70 unknown_tagged_fields: Default::default(),
71 }
72 }
73}
74impl StreamsGroupHeartbeatRequest<'_> {
75 pub fn to_owned(
76 &self,
77 ) -> crate::owned::streams_group_heartbeat_request::StreamsGroupHeartbeatRequest {
78 crate::owned::streams_group_heartbeat_request::StreamsGroupHeartbeatRequest {
79 group_id: (self.group_id).to_string(),
80 member_id: (self.member_id).to_string(),
81 member_epoch: (self.member_epoch),
82 endpoint_information_epoch: (self.endpoint_information_epoch),
83 instance_id: (self.instance_id).map(std::string::ToString::to_string),
84 rack_id: (self.rack_id).map(std::string::ToString::to_string),
85 rebalance_timeout_ms: (self.rebalance_timeout_ms),
86 topology: (self.topology).as_ref().map(Topology::to_owned),
87 active_tasks: (self.active_tasks)
88 .as_ref()
89 .map(|v| v.iter().map(super::common::streams_group_heartbeat_request::task_ids::TaskIds::to_owned).collect()),
90 standby_tasks: (self.standby_tasks)
91 .as_ref()
92 .map(|v| v.iter().map(super::common::streams_group_heartbeat_request::task_ids::TaskIds::to_owned).collect()),
93 warmup_tasks: (self.warmup_tasks)
94 .as_ref()
95 .map(|v| v.iter().map(super::common::streams_group_heartbeat_request::task_ids::TaskIds::to_owned).collect()),
96 process_id: (self.process_id).map(std::string::ToString::to_string),
97 user_endpoint: (self.user_endpoint).as_ref().map(super::common::streams_group_heartbeat_request::endpoint::Endpoint::to_owned),
98 client_tags: (self.client_tags)
99 .as_ref()
100 .map(|v| v.iter().map(super::common::streams_group_heartbeat_request::key_value::KeyValue::to_owned).collect()),
101 task_offsets: (self.task_offsets)
102 .as_ref()
103 .map(|v| v.iter().map(super::common::streams_group_heartbeat_request::task_offset::TaskOffset::to_owned).collect()),
104 task_end_offsets: (self.task_end_offsets)
105 .as_ref()
106 .map(|v| v.iter().map(super::common::streams_group_heartbeat_request::task_offset::TaskOffset::to_owned).collect()),
107 shutdown_application: (self.shutdown_application),
108 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
109 }
110 }
111}
112impl Encode for StreamsGroupHeartbeatRequest<'_> {
113 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
114 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
115 return Err(ProtocolError::UnsupportedVersion {
116 api_key: API_KEY,
117 version,
118 });
119 }
120 let flex = is_flexible(version);
121 if version >= 0 {
122 if flex {
123 put_compact_string(buf, self.group_id);
124 } else {
125 put_string(buf, self.group_id);
126 }
127 }
128 if version >= 0 {
129 if flex {
130 put_compact_string(buf, self.member_id);
131 } else {
132 put_string(buf, self.member_id);
133 }
134 }
135 if version >= 0 {
136 put_i32(buf, self.member_epoch);
137 }
138 if version >= 0 {
139 put_i32(buf, self.endpoint_information_epoch);
140 }
141 if version >= 0 {
142 if flex {
143 put_compact_nullable_string(buf, self.instance_id);
144 } else {
145 put_nullable_string(buf, self.instance_id);
146 }
147 }
148 if version >= 0 {
149 if flex {
150 put_compact_nullable_string(buf, self.rack_id);
151 } else {
152 put_nullable_string(buf, self.rack_id);
153 }
154 }
155 if version >= 0 {
156 put_i32(buf, self.rebalance_timeout_ms);
157 }
158 if version >= 0 {
159 match &self.topology {
160 None => {
161 buf.put_i8(-1);
162 }
163 Some(v) => {
164 buf.put_i8(1);
165 v.encode(buf, version)?;
166 }
167 }
168 }
169 if version >= 0 {
170 {
171 let len = (self.active_tasks).as_ref().map(Vec::len);
172 crate::primitives::array::put_nullable_array_len(buf, len, flex);
173 if let Some(v) = &self.active_tasks {
174 for it in v {
175 it.encode(buf, version)?;
176 }
177 }
178 }
179 }
180 if version >= 0 {
181 {
182 let len = (self.standby_tasks).as_ref().map(Vec::len);
183 crate::primitives::array::put_nullable_array_len(buf, len, flex);
184 if let Some(v) = &self.standby_tasks {
185 for it in v {
186 it.encode(buf, version)?;
187 }
188 }
189 }
190 }
191 if version >= 0 {
192 {
193 let len = (self.warmup_tasks).as_ref().map(Vec::len);
194 crate::primitives::array::put_nullable_array_len(buf, len, flex);
195 if let Some(v) = &self.warmup_tasks {
196 for it in v {
197 it.encode(buf, version)?;
198 }
199 }
200 }
201 }
202 if version >= 0 {
203 if flex {
204 put_compact_nullable_string(buf, self.process_id);
205 } else {
206 put_nullable_string(buf, self.process_id);
207 }
208 }
209 if version >= 0 {
210 match &self.user_endpoint {
211 None => {
212 buf.put_i8(-1);
213 }
214 Some(v) => {
215 buf.put_i8(1);
216 v.encode(buf, version)?;
217 }
218 }
219 }
220 if version >= 0 {
221 {
222 let len = (self.client_tags).as_ref().map(Vec::len);
223 crate::primitives::array::put_nullable_array_len(buf, len, flex);
224 if let Some(v) = &self.client_tags {
225 for it in v {
226 it.encode(buf, version)?;
227 }
228 }
229 }
230 }
231 if version >= 0 {
232 {
233 let len = (self.task_offsets).as_ref().map(Vec::len);
234 crate::primitives::array::put_nullable_array_len(buf, len, flex);
235 if let Some(v) = &self.task_offsets {
236 for it in v {
237 it.encode(buf, version)?;
238 }
239 }
240 }
241 }
242 if version >= 0 {
243 {
244 let len = (self.task_end_offsets).as_ref().map(Vec::len);
245 crate::primitives::array::put_nullable_array_len(buf, len, flex);
246 if let Some(v) = &self.task_end_offsets {
247 for it in v {
248 it.encode(buf, version)?;
249 }
250 }
251 }
252 }
253 if version >= 0 {
254 put_bool(buf, self.shutdown_application);
255 }
256 if flex {
257 let tagged = WriteTaggedFields::new();
258 tagged.write(buf, &self.unknown_tagged_fields);
259 }
260 Ok(())
261 }
262 fn encoded_len(&self, version: i16) -> usize {
263 let flex = is_flexible(version);
264 let mut n: usize = 0;
265 if version >= 0 {
266 n += if flex {
267 compact_string_len(self.group_id)
268 } else {
269 string_len(self.group_id)
270 };
271 }
272 if version >= 0 {
273 n += if flex {
274 compact_string_len(self.member_id)
275 } else {
276 string_len(self.member_id)
277 };
278 }
279 if version >= 0 {
280 n += 4;
281 }
282 if version >= 0 {
283 n += 4;
284 }
285 if version >= 0 {
286 n += if flex {
287 compact_nullable_string_len(self.instance_id)
288 } else {
289 nullable_string_len(self.instance_id)
290 };
291 }
292 if version >= 0 {
293 n += if flex {
294 compact_nullable_string_len(self.rack_id)
295 } else {
296 nullable_string_len(self.rack_id)
297 };
298 }
299 if version >= 0 {
300 n += 4;
301 }
302 if version >= 0 {
303 n += 1 + self.topology.as_ref().map_or(0, |v| v.encoded_len(version));
304 }
305 if version >= 0 {
306 n += {
307 let opt: Option<&Vec<_>> = (self.active_tasks).as_ref();
308 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
309 opt.map(std::vec::Vec::len),
310 flex,
311 );
312 let body: usize =
313 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
314 prefix + body
315 };
316 }
317 if version >= 0 {
318 n += {
319 let opt: Option<&Vec<_>> = (self.standby_tasks).as_ref();
320 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
321 opt.map(std::vec::Vec::len),
322 flex,
323 );
324 let body: usize =
325 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
326 prefix + body
327 };
328 }
329 if version >= 0 {
330 n += {
331 let opt: Option<&Vec<_>> = (self.warmup_tasks).as_ref();
332 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
333 opt.map(std::vec::Vec::len),
334 flex,
335 );
336 let body: usize =
337 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
338 prefix + body
339 };
340 }
341 if version >= 0 {
342 n += if flex {
343 compact_nullable_string_len(self.process_id)
344 } else {
345 nullable_string_len(self.process_id)
346 };
347 }
348 if version >= 0 {
349 n += 1 + self
350 .user_endpoint
351 .as_ref()
352 .map_or(0, |v| v.encoded_len(version));
353 }
354 if version >= 0 {
355 n += {
356 let opt: Option<&Vec<_>> = (self.client_tags).as_ref();
357 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
358 opt.map(std::vec::Vec::len),
359 flex,
360 );
361 let body: usize =
362 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
363 prefix + body
364 };
365 }
366 if version >= 0 {
367 n += {
368 let opt: Option<&Vec<_>> = (self.task_offsets).as_ref();
369 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
370 opt.map(std::vec::Vec::len),
371 flex,
372 );
373 let body: usize =
374 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
375 prefix + body
376 };
377 }
378 if version >= 0 {
379 n += {
380 let opt: Option<&Vec<_>> = (self.task_end_offsets).as_ref();
381 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
382 opt.map(std::vec::Vec::len),
383 flex,
384 );
385 let body: usize =
386 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
387 prefix + body
388 };
389 }
390 if version >= 0 {
391 n += 1;
392 }
393 if flex {
394 let known_pairs: Vec<(u32, usize)> = Vec::new();
395 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
396 }
397 n
398 }
399}
400impl<'de> DecodeBorrow<'de> for StreamsGroupHeartbeatRequest<'de> {
401 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
402 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
403 return Err(ProtocolError::UnsupportedVersion {
404 api_key: API_KEY,
405 version,
406 });
407 }
408 let flex = is_flexible(version);
409 let mut out = Self::default();
410 if version >= 0 {
411 out.group_id = if flex {
412 get_compact_string_borrowed(buf)?
413 } else {
414 get_string_borrowed(buf)?
415 };
416 }
417 if version >= 0 {
418 out.member_id = if flex {
419 get_compact_string_borrowed(buf)?
420 } else {
421 get_string_borrowed(buf)?
422 };
423 }
424 if version >= 0 {
425 out.member_epoch = get_i32(buf)?;
426 }
427 if version >= 0 {
428 out.endpoint_information_epoch = get_i32(buf)?;
429 }
430 if version >= 0 {
431 out.instance_id = if flex {
432 get_compact_nullable_string_borrowed(buf)?
433 } else {
434 get_nullable_string_borrowed(buf)?
435 };
436 }
437 if version >= 0 {
438 out.rack_id = if flex {
439 get_compact_nullable_string_borrowed(buf)?
440 } else {
441 get_nullable_string_borrowed(buf)?
442 };
443 }
444 if version >= 0 {
445 out.rebalance_timeout_ms = get_i32(buf)?;
446 }
447 if version >= 0 {
448 out.topology = if get_i8(buf)? < 0 {
449 None
450 } else {
451 Some(Topology::decode_borrow(buf, version)?)
452 };
453 }
454 if version >= 0 {
455 out.active_tasks = {
456 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
457 match opt {
458 None => None,
459 Some(n) => {
460 let mut v = Vec::with_capacity(n);
461 for _ in 0..n {
462 v.push(
463 super::common::streams_group_heartbeat_request::task_ids::TaskIds::decode_borrow(
464 buf,
465 version,
466 )?,
467 );
468 }
469 Some(v)
470 }
471 }
472 };
473 }
474 if version >= 0 {
475 out.standby_tasks = {
476 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
477 match opt {
478 None => None,
479 Some(n) => {
480 let mut v = Vec::with_capacity(n);
481 for _ in 0..n {
482 v.push(
483 super::common::streams_group_heartbeat_request::task_ids::TaskIds::decode_borrow(
484 buf,
485 version,
486 )?,
487 );
488 }
489 Some(v)
490 }
491 }
492 };
493 }
494 if version >= 0 {
495 out.warmup_tasks = {
496 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
497 match opt {
498 None => None,
499 Some(n) => {
500 let mut v = Vec::with_capacity(n);
501 for _ in 0..n {
502 v.push(
503 super::common::streams_group_heartbeat_request::task_ids::TaskIds::decode_borrow(
504 buf,
505 version,
506 )?,
507 );
508 }
509 Some(v)
510 }
511 }
512 };
513 }
514 if version >= 0 {
515 out.process_id = if flex {
516 get_compact_nullable_string_borrowed(buf)?
517 } else {
518 get_nullable_string_borrowed(buf)?
519 };
520 }
521 if version >= 0 {
522 out.user_endpoint = if get_i8(buf)? < 0 {
523 None
524 } else {
525 Some(
526 super::common::streams_group_heartbeat_request::endpoint::Endpoint::decode_borrow(
527 buf,
528 version,
529 )?,
530 )
531 };
532 }
533 if version >= 0 {
534 out.client_tags = {
535 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
536 match opt {
537 None => None,
538 Some(n) => {
539 let mut v = Vec::with_capacity(n);
540 for _ in 0..n {
541 v.push(
542 super::common::streams_group_heartbeat_request::key_value::KeyValue::decode_borrow(
543 buf,
544 version,
545 )?,
546 );
547 }
548 Some(v)
549 }
550 }
551 };
552 }
553 if version >= 0 {
554 out.task_offsets = {
555 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
556 match opt {
557 None => None,
558 Some(n) => {
559 let mut v = Vec::with_capacity(n);
560 for _ in 0..n {
561 v.push(
562 super::common::streams_group_heartbeat_request::task_offset::TaskOffset::decode_borrow(
563 buf,
564 version,
565 )?,
566 );
567 }
568 Some(v)
569 }
570 }
571 };
572 }
573 if version >= 0 {
574 out.task_end_offsets = {
575 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
576 match opt {
577 None => None,
578 Some(n) => {
579 let mut v = Vec::with_capacity(n);
580 for _ in 0..n {
581 v.push(
582 super::common::streams_group_heartbeat_request::task_offset::TaskOffset::decode_borrow(
583 buf,
584 version,
585 )?,
586 );
587 }
588 Some(v)
589 }
590 }
591 };
592 }
593 if version >= 0 {
594 out.shutdown_application = get_bool(buf)?;
595 }
596 if flex {
597 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
598 }
599 Ok(out)
600 }
601}
#[cfg(test)]
impl StreamsGroupHeartbeatRequest<'_> {
    /// Test fixture: a request in which every field holds a non-default
    /// value (`"x"`, `1`, `true`, or a one-element array of populated
    /// children). A negative `version` yields the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::streams_group_heartbeat_request::{
            endpoint::Endpoint, key_value::KeyValue, task_ids::TaskIds, task_offset::TaskOffset,
        };

        // Every field is introduced at version 0; below that nothing is set.
        if version < 0 {
            return Self::default();
        }
        Self {
            group_id: "x",
            member_id: "x",
            member_epoch: 1,
            endpoint_information_epoch: 1,
            instance_id: Some("x"),
            rack_id: Some("x"),
            rebalance_timeout_ms: 1,
            topology: Some(Topology::populated(version)),
            active_tasks: Some(vec![TaskIds::populated(version)]),
            standby_tasks: Some(vec![TaskIds::populated(version)]),
            warmup_tasks: Some(vec![TaskIds::populated(version)]),
            process_id: Some("x"),
            user_endpoint: Some(Endpoint::populated(version)),
            client_tags: Some(vec![KeyValue::populated(version)]),
            task_offsets: Some(vec![TaskOffset::populated(version)]),
            task_end_offsets: Some(vec![TaskOffset::populated(version)]),
            shutdown_application: true,
            // Only `unknown_tagged_fields` is left at its default.
            ..Self::default()
        }
    }
}
689#[derive(Debug, Clone, PartialEq, Eq, Default)]
690pub struct Topology<'a> {
691 pub epoch: i32,
692 pub subtopologies: Vec<Subtopology<'a>>,
693 pub unknown_tagged_fields: UnknownTaggedFields,
694}
695impl Topology<'_> {
696 pub fn to_owned(&self) -> crate::owned::streams_group_heartbeat_request::Topology {
697 crate::owned::streams_group_heartbeat_request::Topology {
698 epoch: (self.epoch),
699 subtopologies: (self.subtopologies)
700 .iter()
701 .map(Subtopology::to_owned)
702 .collect(),
703 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
704 }
705 }
706}
707impl Encode for Topology<'_> {
708 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
709 let flex = version >= 0;
710 if version >= 0 {
711 put_i32(buf, self.epoch);
712 }
713 if version >= 0 {
714 {
715 crate::primitives::array::put_array_len(buf, (self.subtopologies).len(), flex);
716 for it in &self.subtopologies {
717 it.encode(buf, version)?;
718 }
719 }
720 }
721 if flex {
722 let tagged = WriteTaggedFields::new();
723 tagged.write(buf, &self.unknown_tagged_fields);
724 }
725 Ok(())
726 }
727 fn encoded_len(&self, version: i16) -> usize {
728 let flex = version >= 0;
729 let mut n: usize = 0;
730 if version >= 0 {
731 n += 4;
732 }
733 if version >= 0 {
734 n += {
735 let prefix = crate::primitives::array::array_len_prefix_len(
736 (self.subtopologies).len(),
737 flex,
738 );
739 let body: usize = (self.subtopologies)
740 .iter()
741 .map(|it| it.encoded_len(version))
742 .sum();
743 prefix + body
744 };
745 }
746 if flex {
747 let known_pairs: Vec<(u32, usize)> = Vec::new();
748 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
749 }
750 n
751 }
752}
753impl<'de> DecodeBorrow<'de> for Topology<'de> {
754 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
755 let flex = version >= 0;
756 let mut out = Self::default();
757 if version >= 0 {
758 out.epoch = get_i32(buf)?;
759 }
760 if version >= 0 {
761 out.subtopologies = {
762 let n = crate::primitives::array::get_array_len(buf, flex)?;
763 let mut v = Vec::with_capacity(n);
764 for _ in 0..n {
765 v.push(Subtopology::decode_borrow(buf, version)?);
766 }
767 v
768 };
769 }
770 if flex {
771 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
772 }
773 Ok(out)
774 }
775}
#[cfg(test)]
impl Topology<'_> {
    /// Test fixture: epoch `1` and a single populated subtopology. A negative
    /// `version` yields the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            epoch: 1,
            subtopologies: vec![Subtopology::populated(version)],
            ..Self::default()
        }
    }
}
790#[derive(Debug, Clone, PartialEq, Eq, Default)]
791pub struct Subtopology<'a> {
792 pub subtopology_id: &'a str,
793 pub source_topics: Vec<&'a str>,
794 pub source_topic_regex: Vec<&'a str>,
795 pub state_changelog_topics:
796 Vec<super::common::streams_group_heartbeat_request::topic_info::TopicInfo<'a>>,
797 pub repartition_sink_topics: Vec<&'a str>,
798 pub repartition_source_topics:
799 Vec<super::common::streams_group_heartbeat_request::topic_info::TopicInfo<'a>>,
800 pub copartition_groups: Vec<CopartitionGroup>,
801 pub unknown_tagged_fields: UnknownTaggedFields,
802}
803impl Subtopology<'_> {
804 pub fn to_owned(&self) -> crate::owned::streams_group_heartbeat_request::Subtopology {
805 crate::owned::streams_group_heartbeat_request::Subtopology {
806 subtopology_id: (self.subtopology_id).to_string(),
807 source_topics: (self.source_topics)
808 .iter()
809 .map(std::string::ToString::to_string)
810 .collect(),
811 source_topic_regex: (self.source_topic_regex)
812 .iter()
813 .map(std::string::ToString::to_string)
814 .collect(),
815 state_changelog_topics: (self.state_changelog_topics)
816 .iter()
817 .map(
818 super::common::streams_group_heartbeat_request::topic_info::TopicInfo::to_owned,
819 )
820 .collect(),
821 repartition_sink_topics: (self.repartition_sink_topics)
822 .iter()
823 .map(std::string::ToString::to_string)
824 .collect(),
825 repartition_source_topics: (self.repartition_source_topics)
826 .iter()
827 .map(
828 super::common::streams_group_heartbeat_request::topic_info::TopicInfo::to_owned,
829 )
830 .collect(),
831 copartition_groups: (self.copartition_groups)
832 .iter()
833 .map(CopartitionGroup::to_owned)
834 .collect(),
835 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
836 }
837 }
838}
839impl Encode for Subtopology<'_> {
840 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
841 let flex = version >= 0;
842 if version >= 0 {
843 if flex {
844 put_compact_string(buf, self.subtopology_id);
845 } else {
846 put_string(buf, self.subtopology_id);
847 }
848 }
849 if version >= 0 {
850 {
851 crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
852 for it in &self.source_topics {
853 if flex {
854 put_compact_string(buf, it);
855 } else {
856 put_string(buf, it);
857 }
858 }
859 }
860 }
861 if version >= 0 {
862 {
863 crate::primitives::array::put_array_len(buf, (self.source_topic_regex).len(), flex);
864 for it in &self.source_topic_regex {
865 if flex {
866 put_compact_string(buf, it);
867 } else {
868 put_string(buf, it);
869 }
870 }
871 }
872 }
873 if version >= 0 {
874 {
875 crate::primitives::array::put_array_len(
876 buf,
877 (self.state_changelog_topics).len(),
878 flex,
879 );
880 for it in &self.state_changelog_topics {
881 it.encode(buf, version)?;
882 }
883 }
884 }
885 if version >= 0 {
886 {
887 crate::primitives::array::put_array_len(
888 buf,
889 (self.repartition_sink_topics).len(),
890 flex,
891 );
892 for it in &self.repartition_sink_topics {
893 if flex {
894 put_compact_string(buf, it);
895 } else {
896 put_string(buf, it);
897 }
898 }
899 }
900 }
901 if version >= 0 {
902 {
903 crate::primitives::array::put_array_len(
904 buf,
905 (self.repartition_source_topics).len(),
906 flex,
907 );
908 for it in &self.repartition_source_topics {
909 it.encode(buf, version)?;
910 }
911 }
912 }
913 if version >= 0 {
914 {
915 crate::primitives::array::put_array_len(buf, (self.copartition_groups).len(), flex);
916 for it in &self.copartition_groups {
917 it.encode(buf, version)?;
918 }
919 }
920 }
921 if flex {
922 let tagged = WriteTaggedFields::new();
923 tagged.write(buf, &self.unknown_tagged_fields);
924 }
925 Ok(())
926 }
927 fn encoded_len(&self, version: i16) -> usize {
928 let flex = version >= 0;
929 let mut n: usize = 0;
930 if version >= 0 {
931 n += if flex {
932 compact_string_len(self.subtopology_id)
933 } else {
934 string_len(self.subtopology_id)
935 };
936 }
937 if version >= 0 {
938 n += {
939 let prefix = crate::primitives::array::array_len_prefix_len(
940 (self.source_topics).len(),
941 flex,
942 );
943 let body: usize = (self.source_topics)
944 .iter()
945 .map(|it| {
946 if flex {
947 compact_string_len(it)
948 } else {
949 string_len(it)
950 }
951 })
952 .sum();
953 prefix + body
954 };
955 }
956 if version >= 0 {
957 n += {
958 let prefix = crate::primitives::array::array_len_prefix_len(
959 (self.source_topic_regex).len(),
960 flex,
961 );
962 let body: usize = (self.source_topic_regex)
963 .iter()
964 .map(|it| {
965 if flex {
966 compact_string_len(it)
967 } else {
968 string_len(it)
969 }
970 })
971 .sum();
972 prefix + body
973 };
974 }
975 if version >= 0 {
976 n += {
977 let prefix = crate::primitives::array::array_len_prefix_len(
978 (self.state_changelog_topics).len(),
979 flex,
980 );
981 let body: usize = (self.state_changelog_topics)
982 .iter()
983 .map(|it| it.encoded_len(version))
984 .sum();
985 prefix + body
986 };
987 }
988 if version >= 0 {
989 n += {
990 let prefix = crate::primitives::array::array_len_prefix_len(
991 (self.repartition_sink_topics).len(),
992 flex,
993 );
994 let body: usize = (self.repartition_sink_topics)
995 .iter()
996 .map(|it| {
997 if flex {
998 compact_string_len(it)
999 } else {
1000 string_len(it)
1001 }
1002 })
1003 .sum();
1004 prefix + body
1005 };
1006 }
1007 if version >= 0 {
1008 n += {
1009 let prefix = crate::primitives::array::array_len_prefix_len(
1010 (self.repartition_source_topics).len(),
1011 flex,
1012 );
1013 let body: usize = (self.repartition_source_topics)
1014 .iter()
1015 .map(|it| it.encoded_len(version))
1016 .sum();
1017 prefix + body
1018 };
1019 }
1020 if version >= 0 {
1021 n += {
1022 let prefix = crate::primitives::array::array_len_prefix_len(
1023 (self.copartition_groups).len(),
1024 flex,
1025 );
1026 let body: usize = (self.copartition_groups)
1027 .iter()
1028 .map(|it| it.encoded_len(version))
1029 .sum();
1030 prefix + body
1031 };
1032 }
1033 if flex {
1034 let known_pairs: Vec<(u32, usize)> = Vec::new();
1035 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
1036 }
1037 n
1038 }
1039}
1040impl<'de> DecodeBorrow<'de> for Subtopology<'de> {
1041 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
1042 let flex = version >= 0;
1043 let mut out = Self::default();
1044 if version >= 0 {
1045 out.subtopology_id = if flex {
1046 get_compact_string_borrowed(buf)?
1047 } else {
1048 get_string_borrowed(buf)?
1049 };
1050 }
1051 if version >= 0 {
1052 out.source_topics = {
1053 let n = crate::primitives::array::get_array_len(buf, flex)?;
1054 let mut v = Vec::with_capacity(n);
1055 for _ in 0..n {
1056 v.push(if flex {
1057 get_compact_string_borrowed(buf)?
1058 } else {
1059 get_string_borrowed(buf)?
1060 });
1061 }
1062 v
1063 };
1064 }
1065 if version >= 0 {
1066 out.source_topic_regex = {
1067 let n = crate::primitives::array::get_array_len(buf, flex)?;
1068 let mut v = Vec::with_capacity(n);
1069 for _ in 0..n {
1070 v.push(if flex {
1071 get_compact_string_borrowed(buf)?
1072 } else {
1073 get_string_borrowed(buf)?
1074 });
1075 }
1076 v
1077 };
1078 }
1079 if version >= 0 {
1080 out.state_changelog_topics = {
1081 let n = crate::primitives::array::get_array_len(buf, flex)?;
1082 let mut v = Vec::with_capacity(n);
1083 for _ in 0..n {
1084 v.push(
1085 super::common::streams_group_heartbeat_request::topic_info::TopicInfo::decode_borrow(
1086 buf,
1087 version,
1088 )?,
1089 );
1090 }
1091 v
1092 };
1093 }
1094 if version >= 0 {
1095 out.repartition_sink_topics = {
1096 let n = crate::primitives::array::get_array_len(buf, flex)?;
1097 let mut v = Vec::with_capacity(n);
1098 for _ in 0..n {
1099 v.push(if flex {
1100 get_compact_string_borrowed(buf)?
1101 } else {
1102 get_string_borrowed(buf)?
1103 });
1104 }
1105 v
1106 };
1107 }
1108 if version >= 0 {
1109 out.repartition_source_topics = {
1110 let n = crate::primitives::array::get_array_len(buf, flex)?;
1111 let mut v = Vec::with_capacity(n);
1112 for _ in 0..n {
1113 v.push(
1114 super::common::streams_group_heartbeat_request::topic_info::TopicInfo::decode_borrow(
1115 buf,
1116 version,
1117 )?,
1118 );
1119 }
1120 v
1121 };
1122 }
1123 if version >= 0 {
1124 out.copartition_groups = {
1125 let n = crate::primitives::array::get_array_len(buf, flex)?;
1126 let mut v = Vec::with_capacity(n);
1127 for _ in 0..n {
1128 v.push(CopartitionGroup::decode_borrow(buf, version)?);
1129 }
1130 v
1131 };
1132 }
1133 if flex {
1134 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1135 }
1136 Ok(out)
1137 }
1138}
#[cfg(test)]
impl Subtopology<'_> {
    /// Test fixture: every string is `"x"` and every array holds exactly one
    /// populated element. A negative `version` yields the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::streams_group_heartbeat_request::topic_info::TopicInfo;

        if version < 0 {
            return Self::default();
        }
        Self {
            subtopology_id: "x",
            source_topics: vec!["x"],
            source_topic_regex: vec!["x"],
            state_changelog_topics: vec![TopicInfo::populated(version)],
            repartition_sink_topics: vec!["x"],
            repartition_source_topics: vec![TopicInfo::populated(version)],
            copartition_groups: vec![CopartitionGroup::populated(version)],
            // Only `unknown_tagged_fields` is left at its default.
            ..Self::default()
        }
    }
}
1176#[derive(Debug, Clone, PartialEq, Eq, Default)]
1177pub struct CopartitionGroup {
1178 pub source_topics: Vec<i16>,
1179 pub source_topic_regex: Vec<i16>,
1180 pub repartition_source_topics: Vec<i16>,
1181 pub unknown_tagged_fields: UnknownTaggedFields,
1182}
1183impl CopartitionGroup {
1184 pub fn to_owned(&self) -> crate::owned::streams_group_heartbeat_request::CopartitionGroup {
1185 crate::owned::streams_group_heartbeat_request::CopartitionGroup {
1186 source_topics: (self.source_topics).clone(),
1187 source_topic_regex: (self.source_topic_regex).clone(),
1188 repartition_source_topics: (self.repartition_source_topics).clone(),
1189 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
1190 }
1191 }
1192}
1193impl Encode for CopartitionGroup {
1194 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
1195 let flex = version >= 0;
1196 if version >= 0 {
1197 {
1198 crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex);
1199 for it in &self.source_topics {
1200 put_i16(buf, *it);
1201 }
1202 }
1203 }
1204 if version >= 0 {
1205 {
1206 crate::primitives::array::put_array_len(buf, (self.source_topic_regex).len(), flex);
1207 for it in &self.source_topic_regex {
1208 put_i16(buf, *it);
1209 }
1210 }
1211 }
1212 if version >= 0 {
1213 {
1214 crate::primitives::array::put_array_len(
1215 buf,
1216 (self.repartition_source_topics).len(),
1217 flex,
1218 );
1219 for it in &self.repartition_source_topics {
1220 put_i16(buf, *it);
1221 }
1222 }
1223 }
1224 if flex {
1225 let tagged = WriteTaggedFields::new();
1226 tagged.write(buf, &self.unknown_tagged_fields);
1227 }
1228 Ok(())
1229 }
1230 fn encoded_len(&self, version: i16) -> usize {
1231 let flex = version >= 0;
1232 let mut n: usize = 0;
1233 if version >= 0 {
1234 n += {
1235 let prefix = crate::primitives::array::array_len_prefix_len(
1236 (self.source_topics).len(),
1237 flex,
1238 );
1239 let body: usize = (self.source_topics).iter().map(|_| 2).sum();
1240 prefix + body
1241 };
1242 }
1243 if version >= 0 {
1244 n += {
1245 let prefix = crate::primitives::array::array_len_prefix_len(
1246 (self.source_topic_regex).len(),
1247 flex,
1248 );
1249 let body: usize = (self.source_topic_regex).iter().map(|_| 2).sum();
1250 prefix + body
1251 };
1252 }
1253 if version >= 0 {
1254 n += {
1255 let prefix = crate::primitives::array::array_len_prefix_len(
1256 (self.repartition_source_topics).len(),
1257 flex,
1258 );
1259 let body: usize = (self.repartition_source_topics).iter().map(|_| 2).sum();
1260 prefix + body
1261 };
1262 }
1263 if flex {
1264 let known_pairs: Vec<(u32, usize)> = Vec::new();
1265 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
1266 }
1267 n
1268 }
1269}
1270impl<'de> DecodeBorrow<'de> for CopartitionGroup {
1271 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
1272 let flex = version >= 0;
1273 let mut out = Self::default();
1274 if version >= 0 {
1275 out.source_topics = {
1276 let n = crate::primitives::array::get_array_len(buf, flex)?;
1277 let mut v = Vec::with_capacity(n);
1278 for _ in 0..n {
1279 v.push(get_i16(buf)?);
1280 }
1281 v
1282 };
1283 }
1284 if version >= 0 {
1285 out.source_topic_regex = {
1286 let n = crate::primitives::array::get_array_len(buf, flex)?;
1287 let mut v = Vec::with_capacity(n);
1288 for _ in 0..n {
1289 v.push(get_i16(buf)?);
1290 }
1291 v
1292 };
1293 }
1294 if version >= 0 {
1295 out.repartition_source_topics = {
1296 let n = crate::primitives::array::get_array_len(buf, flex)?;
1297 let mut v = Vec::with_capacity(n);
1298 for _ in 0..n {
1299 v.push(get_i16(buf)?);
1300 }
1301 v
1302 };
1303 }
1304 if flex {
1305 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1306 }
1307 Ok(out)
1308 }
1309}
#[cfg(test)]
impl CopartitionGroup {
    /// Test fixture: each array holds the single value `1`. A negative
    /// `version` yields the plain default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            source_topics: vec![1],
            source_topic_regex: vec![1],
            repartition_source_topics: vec![1],
            ..Self::default()
        }
    }
}