1use crate::primitives::fixed::{get_i16, get_i32, put_i16, put_i32};
3use crate::primitives::string_bytes::{
4 compact_nullable_string_len, compact_string_len, nullable_string_len,
5 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
6};
7use crate::primitives::string_bytes_borrowed::{
8 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
9 get_nullable_string_borrowed, get_string_borrowed,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
13use bytes::BufMut;
14pub const API_KEY: i16 = 88;
15pub const MIN_VERSION: i16 = 0;
16pub const MAX_VERSION: i16 = 0;
17pub const FLEXIBLE_MIN: i16 = 0;
18#[inline]
19fn is_flexible(version: i16) -> bool {
20 version >= FLEXIBLE_MIN
21}
22#[derive(Debug, Clone, PartialEq, Eq, Default)]
23pub struct StreamsGroupHeartbeatResponse<'a> {
24 pub throttle_time_ms: i32,
25 pub error_code: i16,
26 pub error_message: Option<&'a str>,
27 pub member_id: &'a str,
28 pub member_epoch: i32,
29 pub heartbeat_interval_ms: i32,
30 pub acceptable_recovery_lag: i32,
31 pub task_offset_interval_ms: i32,
32 pub status: Option<Vec<super::common::streams_group_heartbeat_response::status::Status<'a>>>,
33 pub active_tasks:
34 Option<Vec<super::common::streams_group_heartbeat_response::task_ids::TaskIds<'a>>>,
35 pub standby_tasks:
36 Option<Vec<super::common::streams_group_heartbeat_response::task_ids::TaskIds<'a>>>,
37 pub warmup_tasks:
38 Option<Vec<super::common::streams_group_heartbeat_response::task_ids::TaskIds<'a>>>,
39 pub endpoint_information_epoch: i32,
40 pub partitions_by_user_endpoint: Option<Vec<EndpointToPartitions<'a>>>,
41 pub unknown_tagged_fields: UnknownTaggedFields,
42}
43impl StreamsGroupHeartbeatResponse<'_> {
44 pub fn to_owned(
45 &self,
46 ) -> crate::owned::streams_group_heartbeat_response::StreamsGroupHeartbeatResponse {
47 crate::owned::streams_group_heartbeat_response::StreamsGroupHeartbeatResponse {
48 throttle_time_ms: (self.throttle_time_ms),
49 error_code: (self.error_code),
50 error_message: (self.error_message).map(std::string::ToString::to_string),
51 member_id: (self.member_id).to_string(),
52 member_epoch: (self.member_epoch),
53 heartbeat_interval_ms: (self.heartbeat_interval_ms),
54 acceptable_recovery_lag: (self.acceptable_recovery_lag),
55 task_offset_interval_ms: (self.task_offset_interval_ms),
56 status: (self.status)
57 .as_ref()
58 .map(|v| v.iter().map(super::common::streams_group_heartbeat_response::status::Status::to_owned).collect()),
59 active_tasks: (self.active_tasks)
60 .as_ref()
61 .map(|v| v.iter().map(super::common::streams_group_heartbeat_response::task_ids::TaskIds::to_owned).collect()),
62 standby_tasks: (self.standby_tasks)
63 .as_ref()
64 .map(|v| v.iter().map(super::common::streams_group_heartbeat_response::task_ids::TaskIds::to_owned).collect()),
65 warmup_tasks: (self.warmup_tasks)
66 .as_ref()
67 .map(|v| v.iter().map(super::common::streams_group_heartbeat_response::task_ids::TaskIds::to_owned).collect()),
68 endpoint_information_epoch: (self.endpoint_information_epoch),
69 partitions_by_user_endpoint: (self.partitions_by_user_endpoint)
70 .as_ref()
71 .map(|v| v.iter().map(EndpointToPartitions::to_owned).collect()),
72 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
73 }
74 }
75}
76impl Encode for StreamsGroupHeartbeatResponse<'_> {
77 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
78 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
79 return Err(ProtocolError::UnsupportedVersion {
80 api_key: API_KEY,
81 version,
82 });
83 }
84 let flex = is_flexible(version);
85 if version >= 0 {
86 put_i32(buf, self.throttle_time_ms);
87 }
88 if version >= 0 {
89 put_i16(buf, self.error_code);
90 }
91 if version >= 0 {
92 if flex {
93 put_compact_nullable_string(buf, self.error_message);
94 } else {
95 put_nullable_string(buf, self.error_message);
96 }
97 }
98 if version >= 0 {
99 if flex {
100 put_compact_string(buf, self.member_id);
101 } else {
102 put_string(buf, self.member_id);
103 }
104 }
105 if version >= 0 {
106 put_i32(buf, self.member_epoch);
107 }
108 if version >= 0 {
109 put_i32(buf, self.heartbeat_interval_ms);
110 }
111 if version >= 0 {
112 put_i32(buf, self.acceptable_recovery_lag);
113 }
114 if version >= 0 {
115 put_i32(buf, self.task_offset_interval_ms);
116 }
117 if version >= 0 {
118 {
119 let len = (self.status).as_ref().map(Vec::len);
120 crate::primitives::array::put_nullable_array_len(buf, len, flex);
121 if let Some(v) = &self.status {
122 for it in v {
123 it.encode(buf, version)?;
124 }
125 }
126 }
127 }
128 if version >= 0 {
129 {
130 let len = (self.active_tasks).as_ref().map(Vec::len);
131 crate::primitives::array::put_nullable_array_len(buf, len, flex);
132 if let Some(v) = &self.active_tasks {
133 for it in v {
134 it.encode(buf, version)?;
135 }
136 }
137 }
138 }
139 if version >= 0 {
140 {
141 let len = (self.standby_tasks).as_ref().map(Vec::len);
142 crate::primitives::array::put_nullable_array_len(buf, len, flex);
143 if let Some(v) = &self.standby_tasks {
144 for it in v {
145 it.encode(buf, version)?;
146 }
147 }
148 }
149 }
150 if version >= 0 {
151 {
152 let len = (self.warmup_tasks).as_ref().map(Vec::len);
153 crate::primitives::array::put_nullable_array_len(buf, len, flex);
154 if let Some(v) = &self.warmup_tasks {
155 for it in v {
156 it.encode(buf, version)?;
157 }
158 }
159 }
160 }
161 if version >= 0 {
162 put_i32(buf, self.endpoint_information_epoch);
163 }
164 if version >= 0 {
165 {
166 let len = (self.partitions_by_user_endpoint).as_ref().map(Vec::len);
167 crate::primitives::array::put_nullable_array_len(buf, len, flex);
168 if let Some(v) = &self.partitions_by_user_endpoint {
169 for it in v {
170 it.encode(buf, version)?;
171 }
172 }
173 }
174 }
175 if flex {
176 let tagged = WriteTaggedFields::new();
177 tagged.write(buf, &self.unknown_tagged_fields);
178 }
179 Ok(())
180 }
181 fn encoded_len(&self, version: i16) -> usize {
182 let flex = is_flexible(version);
183 let mut n: usize = 0;
184 if version >= 0 {
185 n += 4;
186 }
187 if version >= 0 {
188 n += 2;
189 }
190 if version >= 0 {
191 n += if flex {
192 compact_nullable_string_len(self.error_message)
193 } else {
194 nullable_string_len(self.error_message)
195 };
196 }
197 if version >= 0 {
198 n += if flex {
199 compact_string_len(self.member_id)
200 } else {
201 string_len(self.member_id)
202 };
203 }
204 if version >= 0 {
205 n += 4;
206 }
207 if version >= 0 {
208 n += 4;
209 }
210 if version >= 0 {
211 n += 4;
212 }
213 if version >= 0 {
214 n += 4;
215 }
216 if version >= 0 {
217 n += {
218 let opt: Option<&Vec<_>> = (self.status).as_ref();
219 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
220 opt.map(std::vec::Vec::len),
221 flex,
222 );
223 let body: usize =
224 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
225 prefix + body
226 };
227 }
228 if version >= 0 {
229 n += {
230 let opt: Option<&Vec<_>> = (self.active_tasks).as_ref();
231 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
232 opt.map(std::vec::Vec::len),
233 flex,
234 );
235 let body: usize =
236 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
237 prefix + body
238 };
239 }
240 if version >= 0 {
241 n += {
242 let opt: Option<&Vec<_>> = (self.standby_tasks).as_ref();
243 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
244 opt.map(std::vec::Vec::len),
245 flex,
246 );
247 let body: usize =
248 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
249 prefix + body
250 };
251 }
252 if version >= 0 {
253 n += {
254 let opt: Option<&Vec<_>> = (self.warmup_tasks).as_ref();
255 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
256 opt.map(std::vec::Vec::len),
257 flex,
258 );
259 let body: usize =
260 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
261 prefix + body
262 };
263 }
264 if version >= 0 {
265 n += 4;
266 }
267 if version >= 0 {
268 n += {
269 let opt: Option<&Vec<_>> = (self.partitions_by_user_endpoint).as_ref();
270 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
271 opt.map(std::vec::Vec::len),
272 flex,
273 );
274 let body: usize =
275 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
276 prefix + body
277 };
278 }
279 if flex {
280 let known_pairs: Vec<(u32, usize)> = Vec::new();
281 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
282 }
283 n
284 }
285}
286impl<'de> DecodeBorrow<'de> for StreamsGroupHeartbeatResponse<'de> {
287 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
288 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
289 return Err(ProtocolError::UnsupportedVersion {
290 api_key: API_KEY,
291 version,
292 });
293 }
294 let flex = is_flexible(version);
295 let mut out = Self::default();
296 if version >= 0 {
297 out.throttle_time_ms = get_i32(buf)?;
298 }
299 if version >= 0 {
300 out.error_code = get_i16(buf)?;
301 }
302 if version >= 0 {
303 out.error_message = if flex {
304 get_compact_nullable_string_borrowed(buf)?
305 } else {
306 get_nullable_string_borrowed(buf)?
307 };
308 }
309 if version >= 0 {
310 out.member_id = if flex {
311 get_compact_string_borrowed(buf)?
312 } else {
313 get_string_borrowed(buf)?
314 };
315 }
316 if version >= 0 {
317 out.member_epoch = get_i32(buf)?;
318 }
319 if version >= 0 {
320 out.heartbeat_interval_ms = get_i32(buf)?;
321 }
322 if version >= 0 {
323 out.acceptable_recovery_lag = get_i32(buf)?;
324 }
325 if version >= 0 {
326 out.task_offset_interval_ms = get_i32(buf)?;
327 }
328 if version >= 0 {
329 out.status = {
330 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
331 match opt {
332 None => None,
333 Some(n) => {
334 let mut v = Vec::with_capacity(n);
335 for _ in 0..n {
336 v.push(
337 super::common::streams_group_heartbeat_response::status::Status::decode_borrow(
338 buf,
339 version,
340 )?,
341 );
342 }
343 Some(v)
344 }
345 }
346 };
347 }
348 if version >= 0 {
349 out.active_tasks = {
350 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
351 match opt {
352 None => None,
353 Some(n) => {
354 let mut v = Vec::with_capacity(n);
355 for _ in 0..n {
356 v.push(
357 super::common::streams_group_heartbeat_response::task_ids::TaskIds::decode_borrow(
358 buf,
359 version,
360 )?,
361 );
362 }
363 Some(v)
364 }
365 }
366 };
367 }
368 if version >= 0 {
369 out.standby_tasks = {
370 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
371 match opt {
372 None => None,
373 Some(n) => {
374 let mut v = Vec::with_capacity(n);
375 for _ in 0..n {
376 v.push(
377 super::common::streams_group_heartbeat_response::task_ids::TaskIds::decode_borrow(
378 buf,
379 version,
380 )?,
381 );
382 }
383 Some(v)
384 }
385 }
386 };
387 }
388 if version >= 0 {
389 out.warmup_tasks = {
390 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
391 match opt {
392 None => None,
393 Some(n) => {
394 let mut v = Vec::with_capacity(n);
395 for _ in 0..n {
396 v.push(
397 super::common::streams_group_heartbeat_response::task_ids::TaskIds::decode_borrow(
398 buf,
399 version,
400 )?,
401 );
402 }
403 Some(v)
404 }
405 }
406 };
407 }
408 if version >= 0 {
409 out.endpoint_information_epoch = get_i32(buf)?;
410 }
411 if version >= 0 {
412 out.partitions_by_user_endpoint = {
413 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
414 match opt {
415 None => None,
416 Some(n) => {
417 let mut v = Vec::with_capacity(n);
418 for _ in 0..n {
419 v.push(EndpointToPartitions::decode_borrow(buf, version)?);
420 }
421 Some(v)
422 }
423 }
424 };
425 }
426 if flex {
427 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
428 }
429 Ok(out)
430 }
431}
432#[cfg(test)]
433impl StreamsGroupHeartbeatResponse<'_> {
434 #[must_use]
435 pub fn populated(version: i16) -> Self {
436 let mut m = Self::default();
437 if version >= 0 {
438 m.throttle_time_ms = 1i32;
439 }
440 if version >= 0 {
441 m.error_code = 1i16;
442 }
443 if version >= 0 {
444 m.error_message = Some("x");
445 }
446 if version >= 0 {
447 m.member_id = "x";
448 }
449 if version >= 0 {
450 m.member_epoch = 1i32;
451 }
452 if version >= 0 {
453 m.heartbeat_interval_ms = 1i32;
454 }
455 if version >= 0 {
456 m.acceptable_recovery_lag = 1i32;
457 }
458 if version >= 0 {
459 m.task_offset_interval_ms = 1i32;
460 }
461 if version >= 0 {
462 m.status = Some(vec![
463 super::common::streams_group_heartbeat_response::status::Status::populated(version),
464 ]);
465 }
466 if version >= 0 {
467 m.active_tasks = Some(vec![
468 super::common::streams_group_heartbeat_response::task_ids::TaskIds::populated(
469 version,
470 ),
471 ]);
472 }
473 if version >= 0 {
474 m.standby_tasks = Some(vec![
475 super::common::streams_group_heartbeat_response::task_ids::TaskIds::populated(
476 version,
477 ),
478 ]);
479 }
480 if version >= 0 {
481 m.warmup_tasks = Some(vec![
482 super::common::streams_group_heartbeat_response::task_ids::TaskIds::populated(
483 version,
484 ),
485 ]);
486 }
487 if version >= 0 {
488 m.endpoint_information_epoch = 1i32;
489 }
490 if version >= 0 {
491 m.partitions_by_user_endpoint = Some(vec![EndpointToPartitions::populated(version)]);
492 }
493 m
494 }
495}
496#[derive(Debug, Clone, PartialEq, Eq, Default)]
497pub struct EndpointToPartitions<'a> {
498 pub user_endpoint: super::common::streams_group_heartbeat_response::endpoint::Endpoint<'a>,
499 pub active_partitions:
500 Vec<super::common::streams_group_heartbeat_response::topic_partition::TopicPartition<'a>>,
501 pub standby_partitions:
502 Vec<super::common::streams_group_heartbeat_response::topic_partition::TopicPartition<'a>>,
503 pub unknown_tagged_fields: UnknownTaggedFields,
504}
505impl EndpointToPartitions<'_> {
506 pub fn to_owned(&self) -> crate::owned::streams_group_heartbeat_response::EndpointToPartitions {
507 crate::owned::streams_group_heartbeat_response::EndpointToPartitions {
508 user_endpoint: (self.user_endpoint).to_owned(),
509 active_partitions: (self.active_partitions)
510 .iter()
511 .map(super::common::streams_group_heartbeat_response::topic_partition::TopicPartition::to_owned)
512 .collect(),
513 standby_partitions: (self.standby_partitions)
514 .iter()
515 .map(super::common::streams_group_heartbeat_response::topic_partition::TopicPartition::to_owned)
516 .collect(),
517 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
518 }
519 }
520}
521impl Encode for EndpointToPartitions<'_> {
522 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
523 let flex = version >= 0;
524 if version >= 0 {
525 self.user_endpoint.encode(buf, version)?;
526 }
527 if version >= 0 {
528 {
529 crate::primitives::array::put_array_len(buf, (self.active_partitions).len(), flex);
530 for it in &self.active_partitions {
531 it.encode(buf, version)?;
532 }
533 }
534 }
535 if version >= 0 {
536 {
537 crate::primitives::array::put_array_len(buf, (self.standby_partitions).len(), flex);
538 for it in &self.standby_partitions {
539 it.encode(buf, version)?;
540 }
541 }
542 }
543 if flex {
544 let tagged = WriteTaggedFields::new();
545 tagged.write(buf, &self.unknown_tagged_fields);
546 }
547 Ok(())
548 }
549 fn encoded_len(&self, version: i16) -> usize {
550 let flex = version >= 0;
551 let mut n: usize = 0;
552 if version >= 0 {
553 n += self.user_endpoint.encoded_len(version);
554 }
555 if version >= 0 {
556 n += {
557 let prefix = crate::primitives::array::array_len_prefix_len(
558 (self.active_partitions).len(),
559 flex,
560 );
561 let body: usize = (self.active_partitions)
562 .iter()
563 .map(|it| it.encoded_len(version))
564 .sum();
565 prefix + body
566 };
567 }
568 if version >= 0 {
569 n += {
570 let prefix = crate::primitives::array::array_len_prefix_len(
571 (self.standby_partitions).len(),
572 flex,
573 );
574 let body: usize = (self.standby_partitions)
575 .iter()
576 .map(|it| it.encoded_len(version))
577 .sum();
578 prefix + body
579 };
580 }
581 if flex {
582 let known_pairs: Vec<(u32, usize)> = Vec::new();
583 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
584 }
585 n
586 }
587}
588impl<'de> DecodeBorrow<'de> for EndpointToPartitions<'de> {
589 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
590 let flex = version >= 0;
591 let mut out = Self::default();
592 if version >= 0 {
593 out.user_endpoint =
594 super::common::streams_group_heartbeat_response::endpoint::Endpoint::decode_borrow(
595 buf, version,
596 )?;
597 }
598 if version >= 0 {
599 out.active_partitions = {
600 let n = crate::primitives::array::get_array_len(buf, flex)?;
601 let mut v = Vec::with_capacity(n);
602 for _ in 0..n {
603 v.push(
604 super::common::streams_group_heartbeat_response::topic_partition::TopicPartition::decode_borrow(
605 buf,
606 version,
607 )?,
608 );
609 }
610 v
611 };
612 }
613 if version >= 0 {
614 out.standby_partitions = {
615 let n = crate::primitives::array::get_array_len(buf, flex)?;
616 let mut v = Vec::with_capacity(n);
617 for _ in 0..n {
618 v.push(
619 super::common::streams_group_heartbeat_response::topic_partition::TopicPartition::decode_borrow(
620 buf,
621 version,
622 )?,
623 );
624 }
625 v
626 };
627 }
628 if flex {
629 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
630 }
631 Ok(out)
632 }
633}
634#[cfg(test)]
635impl EndpointToPartitions<'_> {
636 #[must_use]
637 pub fn populated(version: i16) -> Self {
638 let mut m = Self::default();
639 if version >= 0 {
640 m.user_endpoint =
641 super::common::streams_group_heartbeat_response::endpoint::Endpoint::populated(
642 version,
643 );
644 }
645 if version >= 0 {
646 m.active_partitions = vec![
647 super::common::streams_group_heartbeat_response::topic_partition::TopicPartition::populated(version)
648 ];
649 }
650 if version >= 0 {
651 m.standby_partitions = vec![
652 super::common::streams_group_heartbeat_response::topic_partition::TopicPartition::populated(version)
653 ];
654 }
655 m
656 }
657}