1use crate::primitives::fixed::{get_i16, get_i32, put_i16, put_i32};
4use crate::primitives::string_bytes::{
5 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
6 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
7 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
8};
9use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
10use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
11use bytes::{Buf, BufMut};
/// API key reported in `ProtocolError::UnsupportedVersion` when this message
/// is encoded or decoded with a version outside the supported range.
pub const API_KEY: i16 = 88;
/// Lowest message version accepted by the top-level encode/decode entry points.
pub const MIN_VERSION: i16 = 0;
/// Highest message version accepted by the top-level encode/decode entry points.
pub const MAX_VERSION: i16 = 0;
/// First message version that `is_flexible` reports as flexible.
pub const FLEXIBLE_MIN: i16 = 0;

/// Returns `true` when `version` is at or above [`FLEXIBLE_MIN`].
///
/// Callers in this file use the result to pick the compact string/array
/// helpers and to decide whether a tagged-field section is read or written.
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
/// Response message for API key 88 (see [`API_KEY`]).
///
/// Fields are declared in the same order in which the `Encode`/`Decode`
/// implementations in this file write and read them. The per-field notes
/// below describe the wire representation used by those implementations;
/// the business meaning of each field is only suggested by its name and
/// should be confirmed against the protocol specification.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct StreamsGroupHeartbeatResponse {
    /// Encoded as an `i32`. Presumably a throttle duration in milliseconds (per the name).
    pub throttle_time_ms: i32,
    /// Encoded as an `i16`. Presumably `0` means "no error" — TODO confirm.
    pub error_code: i16,
    /// Nullable string; `None` is handed to the nullable-string writer as-is.
    pub error_message: Option<String>,
    /// Non-nullable string.
    pub member_id: String,
    /// Encoded as an `i32`.
    pub member_epoch: i32,
    /// Encoded as an `i32`. Presumably milliseconds (per the name).
    pub heartbeat_interval_ms: i32,
    /// Encoded as an `i32`.
    pub acceptable_recovery_lag: i32,
    /// Encoded as an `i32`. Presumably milliseconds (per the name).
    pub task_offset_interval_ms: i32,
    /// Nullable array of `Status` entries; `None` is encoded through the
    /// nullable-array length prefix, with no elements following.
    pub status: Option<Vec<super::common::streams_group_heartbeat_response::status::Status>>,
    /// Nullable array of `TaskIds` entries.
    pub active_tasks:
        Option<Vec<super::common::streams_group_heartbeat_response::task_ids::TaskIds>>,
    /// Nullable array of `TaskIds` entries.
    pub standby_tasks:
        Option<Vec<super::common::streams_group_heartbeat_response::task_ids::TaskIds>>,
    /// Nullable array of `TaskIds` entries.
    pub warmup_tasks:
        Option<Vec<super::common::streams_group_heartbeat_response::task_ids::TaskIds>>,
    /// Encoded as an `i32`.
    pub endpoint_information_epoch: i32,
    /// Nullable array of [`EndpointToPartitions`] entries.
    pub partitions_by_user_endpoint: Option<Vec<EndpointToPartitions>>,
    /// Tagged fields that `decode` did not recognise; written back verbatim by
    /// `encode` when the version is flexible.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
41impl Encode for StreamsGroupHeartbeatResponse {
42 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
43 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
44 return Err(ProtocolError::UnsupportedVersion {
45 api_key: API_KEY,
46 version,
47 });
48 }
49 let flex = is_flexible(version);
50 if version >= 0 {
51 put_i32(buf, self.throttle_time_ms);
52 }
53 if version >= 0 {
54 put_i16(buf, self.error_code);
55 }
56 if version >= 0 {
57 if flex {
58 put_compact_nullable_string(buf, self.error_message.as_deref());
59 } else {
60 put_nullable_string(buf, self.error_message.as_deref());
61 }
62 }
63 if version >= 0 {
64 if flex {
65 put_compact_string(buf, &self.member_id);
66 } else {
67 put_string(buf, &self.member_id);
68 }
69 }
70 if version >= 0 {
71 put_i32(buf, self.member_epoch);
72 }
73 if version >= 0 {
74 put_i32(buf, self.heartbeat_interval_ms);
75 }
76 if version >= 0 {
77 put_i32(buf, self.acceptable_recovery_lag);
78 }
79 if version >= 0 {
80 put_i32(buf, self.task_offset_interval_ms);
81 }
82 if version >= 0 {
83 {
84 let len = (self.status).as_ref().map(Vec::len);
85 crate::primitives::array::put_nullable_array_len(buf, len, flex);
86 if let Some(v) = &self.status {
87 for it in v {
88 it.encode(buf, version)?;
89 }
90 }
91 }
92 }
93 if version >= 0 {
94 {
95 let len = (self.active_tasks).as_ref().map(Vec::len);
96 crate::primitives::array::put_nullable_array_len(buf, len, flex);
97 if let Some(v) = &self.active_tasks {
98 for it in v {
99 it.encode(buf, version)?;
100 }
101 }
102 }
103 }
104 if version >= 0 {
105 {
106 let len = (self.standby_tasks).as_ref().map(Vec::len);
107 crate::primitives::array::put_nullable_array_len(buf, len, flex);
108 if let Some(v) = &self.standby_tasks {
109 for it in v {
110 it.encode(buf, version)?;
111 }
112 }
113 }
114 }
115 if version >= 0 {
116 {
117 let len = (self.warmup_tasks).as_ref().map(Vec::len);
118 crate::primitives::array::put_nullable_array_len(buf, len, flex);
119 if let Some(v) = &self.warmup_tasks {
120 for it in v {
121 it.encode(buf, version)?;
122 }
123 }
124 }
125 }
126 if version >= 0 {
127 put_i32(buf, self.endpoint_information_epoch);
128 }
129 if version >= 0 {
130 {
131 let len = (self.partitions_by_user_endpoint).as_ref().map(Vec::len);
132 crate::primitives::array::put_nullable_array_len(buf, len, flex);
133 if let Some(v) = &self.partitions_by_user_endpoint {
134 for it in v {
135 it.encode(buf, version)?;
136 }
137 }
138 }
139 }
140 if flex {
141 let tagged = WriteTaggedFields::new();
142 tagged.write(buf, &self.unknown_tagged_fields);
143 }
144 Ok(())
145 }
146 fn encoded_len(&self, version: i16) -> usize {
147 let flex = is_flexible(version);
148 let mut n: usize = 0;
149 if version >= 0 {
150 n += 4;
151 }
152 if version >= 0 {
153 n += 2;
154 }
155 if version >= 0 {
156 n += if flex {
157 compact_nullable_string_len(self.error_message.as_deref())
158 } else {
159 nullable_string_len(self.error_message.as_deref())
160 };
161 }
162 if version >= 0 {
163 n += if flex {
164 compact_string_len(&self.member_id)
165 } else {
166 string_len(&self.member_id)
167 };
168 }
169 if version >= 0 {
170 n += 4;
171 }
172 if version >= 0 {
173 n += 4;
174 }
175 if version >= 0 {
176 n += 4;
177 }
178 if version >= 0 {
179 n += 4;
180 }
181 if version >= 0 {
182 n += {
183 let opt: Option<&Vec<_>> = (self.status).as_ref();
184 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
185 opt.map(std::vec::Vec::len),
186 flex,
187 );
188 let body: usize =
189 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
190 prefix + body
191 };
192 }
193 if version >= 0 {
194 n += {
195 let opt: Option<&Vec<_>> = (self.active_tasks).as_ref();
196 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
197 opt.map(std::vec::Vec::len),
198 flex,
199 );
200 let body: usize =
201 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
202 prefix + body
203 };
204 }
205 if version >= 0 {
206 n += {
207 let opt: Option<&Vec<_>> = (self.standby_tasks).as_ref();
208 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
209 opt.map(std::vec::Vec::len),
210 flex,
211 );
212 let body: usize =
213 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
214 prefix + body
215 };
216 }
217 if version >= 0 {
218 n += {
219 let opt: Option<&Vec<_>> = (self.warmup_tasks).as_ref();
220 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
221 opt.map(std::vec::Vec::len),
222 flex,
223 );
224 let body: usize =
225 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
226 prefix + body
227 };
228 }
229 if version >= 0 {
230 n += 4;
231 }
232 if version >= 0 {
233 n += {
234 let opt: Option<&Vec<_>> = (self.partitions_by_user_endpoint).as_ref();
235 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
236 opt.map(std::vec::Vec::len),
237 flex,
238 );
239 let body: usize =
240 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
241 prefix + body
242 };
243 }
244 if flex {
245 let known_pairs: Vec<(u32, usize)> = Vec::new();
246 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
247 }
248 n
249 }
250}
251impl Decode<'_> for StreamsGroupHeartbeatResponse {
252 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
253 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
254 return Err(ProtocolError::UnsupportedVersion {
255 api_key: API_KEY,
256 version,
257 });
258 }
259 let flex = is_flexible(version);
260 let mut out = Self::default();
261 if version >= 0 {
262 out.throttle_time_ms = get_i32(buf)?;
263 }
264 if version >= 0 {
265 out.error_code = get_i16(buf)?;
266 }
267 if version >= 0 {
268 out.error_message = if flex {
269 get_compact_nullable_string_owned(buf)?
270 } else {
271 get_nullable_string_owned(buf)?
272 };
273 }
274 if version >= 0 {
275 out.member_id = if flex {
276 get_compact_string_owned(buf)?
277 } else {
278 get_string_owned(buf)?
279 };
280 }
281 if version >= 0 {
282 out.member_epoch = get_i32(buf)?;
283 }
284 if version >= 0 {
285 out.heartbeat_interval_ms = get_i32(buf)?;
286 }
287 if version >= 0 {
288 out.acceptable_recovery_lag = get_i32(buf)?;
289 }
290 if version >= 0 {
291 out.task_offset_interval_ms = get_i32(buf)?;
292 }
293 if version >= 0 {
294 out.status = {
295 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
296 match opt {
297 None => None,
298 Some(n) => {
299 let mut v = Vec::with_capacity(n);
300 for _ in 0..n {
301 v.push(
302 super::common::streams_group_heartbeat_response::status::Status::decode(
303 buf,
304 version,
305 )?,
306 );
307 }
308 Some(v)
309 }
310 }
311 };
312 }
313 if version >= 0 {
314 out.active_tasks = {
315 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
316 match opt {
317 None => None,
318 Some(n) => {
319 let mut v = Vec::with_capacity(n);
320 for _ in 0..n {
321 v.push(
322 super::common::streams_group_heartbeat_response::task_ids::TaskIds::decode(
323 buf,
324 version,
325 )?,
326 );
327 }
328 Some(v)
329 }
330 }
331 };
332 }
333 if version >= 0 {
334 out.standby_tasks = {
335 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
336 match opt {
337 None => None,
338 Some(n) => {
339 let mut v = Vec::with_capacity(n);
340 for _ in 0..n {
341 v.push(
342 super::common::streams_group_heartbeat_response::task_ids::TaskIds::decode(
343 buf,
344 version,
345 )?,
346 );
347 }
348 Some(v)
349 }
350 }
351 };
352 }
353 if version >= 0 {
354 out.warmup_tasks = {
355 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
356 match opt {
357 None => None,
358 Some(n) => {
359 let mut v = Vec::with_capacity(n);
360 for _ in 0..n {
361 v.push(
362 super::common::streams_group_heartbeat_response::task_ids::TaskIds::decode(
363 buf,
364 version,
365 )?,
366 );
367 }
368 Some(v)
369 }
370 }
371 };
372 }
373 if version >= 0 {
374 out.endpoint_information_epoch = get_i32(buf)?;
375 }
376 if version >= 0 {
377 out.partitions_by_user_endpoint = {
378 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
379 match opt {
380 None => None,
381 Some(n) => {
382 let mut v = Vec::with_capacity(n);
383 for _ in 0..n {
384 v.push(EndpointToPartitions::decode(buf, version)?);
385 }
386 Some(v)
387 }
388 }
389 };
390 }
391 if flex {
392 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
393 }
394 Ok(out)
395 }
396}
#[cfg(test)]
impl StreamsGroupHeartbeatResponse {
    /// Test fixture with every field set to a non-default value.
    ///
    /// Numeric fields become `1`, strings become `"x"`, and each array holds a
    /// single element built by that element type's own `populated`. For a
    /// negative `version` no field is populated and the default value is
    /// returned. `unknown_tagged_fields` is always left at its default.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        // Every field shares the same `version >= 0` gate.
        if version < 0 {
            return Self::default();
        }
        Self {
            throttle_time_ms: 1,
            error_code: 1,
            error_message: Some(String::from("x")),
            member_id: String::from("x"),
            member_epoch: 1,
            heartbeat_interval_ms: 1,
            acceptable_recovery_lag: 1,
            task_offset_interval_ms: 1,
            status: Some(vec![
                super::common::streams_group_heartbeat_response::status::Status::populated(version),
            ]),
            active_tasks: Some(vec![
                super::common::streams_group_heartbeat_response::task_ids::TaskIds::populated(
                    version,
                ),
            ]),
            standby_tasks: Some(vec![
                super::common::streams_group_heartbeat_response::task_ids::TaskIds::populated(
                    version,
                ),
            ]),
            warmup_tasks: Some(vec![
                super::common::streams_group_heartbeat_response::task_ids::TaskIds::populated(
                    version,
                ),
            ]),
            endpoint_information_epoch: 1,
            partitions_by_user_endpoint: Some(vec![EndpointToPartitions::populated(version)]),
            ..Self::default()
        }
    }
}
/// Element type of `StreamsGroupHeartbeatResponse::partitions_by_user_endpoint`.
///
/// Fields are declared in the order in which the `Encode`/`Decode`
/// implementations in this file write and read them. Judging by the names,
/// an entry associates one endpoint with two partition lists; confirm the
/// exact semantics against the protocol specification.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct EndpointToPartitions {
    /// Nested `Endpoint` structure, encoded by its own `encode`.
    pub user_endpoint: super::common::streams_group_heartbeat_response::endpoint::Endpoint,
    /// Non-nullable array of `TopicPartition` entries.
    pub active_partitions:
        Vec<super::common::streams_group_heartbeat_response::topic_partition::TopicPartition>,
    /// Non-nullable array of `TopicPartition` entries.
    pub standby_partitions:
        Vec<super::common::streams_group_heartbeat_response::topic_partition::TopicPartition>,
    /// Tagged fields that `decode` did not recognise; written back verbatim by
    /// `encode`.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
470impl Encode for EndpointToPartitions {
471 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
472 let flex = version >= 0;
473 if version >= 0 {
474 self.user_endpoint.encode(buf, version)?;
475 }
476 if version >= 0 {
477 {
478 crate::primitives::array::put_array_len(buf, (self.active_partitions).len(), flex);
479 for it in &self.active_partitions {
480 it.encode(buf, version)?;
481 }
482 }
483 }
484 if version >= 0 {
485 {
486 crate::primitives::array::put_array_len(buf, (self.standby_partitions).len(), flex);
487 for it in &self.standby_partitions {
488 it.encode(buf, version)?;
489 }
490 }
491 }
492 if flex {
493 let tagged = WriteTaggedFields::new();
494 tagged.write(buf, &self.unknown_tagged_fields);
495 }
496 Ok(())
497 }
498 fn encoded_len(&self, version: i16) -> usize {
499 let flex = version >= 0;
500 let mut n: usize = 0;
501 if version >= 0 {
502 n += self.user_endpoint.encoded_len(version);
503 }
504 if version >= 0 {
505 n += {
506 let prefix = crate::primitives::array::array_len_prefix_len(
507 (self.active_partitions).len(),
508 flex,
509 );
510 let body: usize = (self.active_partitions)
511 .iter()
512 .map(|it| it.encoded_len(version))
513 .sum();
514 prefix + body
515 };
516 }
517 if version >= 0 {
518 n += {
519 let prefix = crate::primitives::array::array_len_prefix_len(
520 (self.standby_partitions).len(),
521 flex,
522 );
523 let body: usize = (self.standby_partitions)
524 .iter()
525 .map(|it| it.encoded_len(version))
526 .sum();
527 prefix + body
528 };
529 }
530 if flex {
531 let known_pairs: Vec<(u32, usize)> = Vec::new();
532 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
533 }
534 n
535 }
536}
537impl Decode<'_> for EndpointToPartitions {
538 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
539 let flex = version >= 0;
540 let mut out = Self::default();
541 if version >= 0 {
542 out.user_endpoint =
543 super::common::streams_group_heartbeat_response::endpoint::Endpoint::decode(
544 buf, version,
545 )?;
546 }
547 if version >= 0 {
548 out.active_partitions = {
549 let n = crate::primitives::array::get_array_len(buf, flex)?;
550 let mut v = Vec::with_capacity(n);
551 for _ in 0..n {
552 v.push(
553 super::common::streams_group_heartbeat_response::topic_partition::TopicPartition::decode(
554 buf,
555 version,
556 )?,
557 );
558 }
559 v
560 };
561 }
562 if version >= 0 {
563 out.standby_partitions = {
564 let n = crate::primitives::array::get_array_len(buf, flex)?;
565 let mut v = Vec::with_capacity(n);
566 for _ in 0..n {
567 v.push(
568 super::common::streams_group_heartbeat_response::topic_partition::TopicPartition::decode(
569 buf,
570 version,
571 )?,
572 );
573 }
574 v
575 };
576 }
577 if flex {
578 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
579 }
580 Ok(out)
581 }
582}
#[cfg(test)]
impl EndpointToPartitions {
    /// Test fixture with every field set to a non-default value.
    ///
    /// The endpoint and the single element of each partition list are built by
    /// their own `populated` constructors. For a negative `version` nothing is
    /// populated and the default value is returned. `unknown_tagged_fields` is
    /// always left at its default.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        // Every field shares the same `version >= 0` gate.
        if version < 0 {
            return Self::default();
        }
        Self {
            user_endpoint:
                super::common::streams_group_heartbeat_response::endpoint::Endpoint::populated(
                    version,
                ),
            active_partitions: vec![
                super::common::streams_group_heartbeat_response::topic_partition::TopicPartition::populated(version),
            ],
            standby_partitions: vec![
                super::common::streams_group_heartbeat_response::topic_partition::TopicPartition::populated(version),
            ],
            ..Self::default()
        }
    }
}
607#[must_use]
610#[allow(unused_comparisons)]
611pub fn default_json(version: i16) -> ::serde_json::Value {
612 let mut obj = ::serde_json::Map::new();
613 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
614 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
615 obj.insert("errorMessage".to_string(), ::serde_json::Value::Null);
616 obj.insert(
617 "memberId".to_string(),
618 ::serde_json::Value::String(String::new()),
619 );
620 obj.insert("memberEpoch".to_string(), ::serde_json::json!(0));
621 obj.insert("heartbeatIntervalMs".to_string(), ::serde_json::json!(0));
622 obj.insert("acceptableRecoveryLag".to_string(), ::serde_json::json!(0));
623 obj.insert("taskOffsetIntervalMs".to_string(), ::serde_json::json!(0));
624 obj.insert("status".to_string(), ::serde_json::Value::Null);
625 obj.insert("activeTasks".to_string(), ::serde_json::Value::Null);
626 obj.insert("standbyTasks".to_string(), ::serde_json::Value::Null);
627 obj.insert("warmupTasks".to_string(), ::serde_json::Value::Null);
628 obj.insert(
629 "endpointInformationEpoch".to_string(),
630 ::serde_json::json!(0),
631 );
632 obj.insert(
633 "partitionsByUserEndpoint".to_string(),
634 ::serde_json::Value::Null,
635 );
636 ::serde_json::Value::Object(obj)
637}