1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i16, get_i32, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
9 string_len,
10};
11use crate::primitives::string_bytes_borrowed::{
12 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
13 get_nullable_string_borrowed, get_string_borrowed,
14};
15use crate::tagged_fields::{read_tagged_fields, tagged_fields_len, WriteTaggedFields};
16use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
17
/// API key of this message; reported in `ProtocolError::UnsupportedVersion`.
pub const API_KEY: i16 = 88;
/// Lowest message version accepted by `encode` and `decode_borrow`.
pub const MIN_VERSION: i16 = 0;
/// Highest message version accepted by `encode` and `decode_borrow`.
pub const MAX_VERSION: i16 = 0;
/// First version that selects the compact string/array encodings and tagged fields.
pub const FLEXIBLE_MIN: i16 = 0;

/// Returns `true` when `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct StreamsGroupHeartbeatResponse<'a> {
28 pub throttle_time_ms: i32,
29 pub error_code: i16,
30 pub error_message: Option<&'a str>,
31 pub member_id: &'a str,
32 pub member_epoch: i32,
33 pub heartbeat_interval_ms: i32,
34 pub acceptable_recovery_lag: i32,
35 pub task_offset_interval_ms: i32,
36 pub status: Option<Vec<super::common::status::Status<'a>>>,
37 pub active_tasks: Option<Vec<super::common::task_ids::TaskIds<'a>>>,
38 pub standby_tasks: Option<Vec<super::common::task_ids::TaskIds<'a>>>,
39 pub warmup_tasks: Option<Vec<super::common::task_ids::TaskIds<'a>>>,
40 pub endpoint_information_epoch: i32,
41 pub partitions_by_user_endpoint: Option<Vec<EndpointToPartitions<'a>>>,
42 pub unknown_tagged_fields: UnknownTaggedFields,
43}
44
45impl<'a> Default for StreamsGroupHeartbeatResponse<'a> {
46 fn default() -> Self {
47 Self {
48 throttle_time_ms: 0i32,
49 error_code: 0i16,
50 error_message: None,
51 member_id: "",
52 member_epoch: 0i32,
53 heartbeat_interval_ms: 0i32,
54 acceptable_recovery_lag: 0i32,
55 task_offset_interval_ms: 0i32,
56 status: None,
57 active_tasks: None,
58 standby_tasks: None,
59 warmup_tasks: None,
60 endpoint_information_epoch: 0i32,
61 partitions_by_user_endpoint: None,
62 unknown_tagged_fields: Default::default(),
63 }
64 }
65}
66
67impl<'a> StreamsGroupHeartbeatResponse<'a> {
68 pub fn to_owned(&self) -> crate::owned::streams_group_heartbeat_response::StreamsGroupHeartbeatResponse {
69 crate::owned::streams_group_heartbeat_response::StreamsGroupHeartbeatResponse {
70 throttle_time_ms: (self.throttle_time_ms),
71 error_code: (self.error_code),
72 error_message: (self.error_message).map(|s| s.to_string()),
73 member_id: (self.member_id).to_string(),
74 member_epoch: (self.member_epoch),
75 heartbeat_interval_ms: (self.heartbeat_interval_ms),
76 acceptable_recovery_lag: (self.acceptable_recovery_lag),
77 task_offset_interval_ms: (self.task_offset_interval_ms),
78 status: (self.status).as_ref().map(|v| v.iter().map(|it| it.to_owned()).collect()),
79 active_tasks: (self.active_tasks).as_ref().map(|v| v.iter().map(|it| it.to_owned()).collect()),
80 standby_tasks: (self.standby_tasks).as_ref().map(|v| v.iter().map(|it| it.to_owned()).collect()),
81 warmup_tasks: (self.warmup_tasks).as_ref().map(|v| v.iter().map(|it| it.to_owned()).collect()),
82 endpoint_information_epoch: (self.endpoint_information_epoch),
83 partitions_by_user_endpoint: (self.partitions_by_user_endpoint).as_ref().map(|v| v.iter().map(|it| it.to_owned()).collect()),
84 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
85 }
86 }
87}
88
89impl<'a> Encode for StreamsGroupHeartbeatResponse<'a> {
90 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
91 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
92 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
93 }
94 let flex = is_flexible(version);
95 if version >= 0 { put_i32(buf, self.throttle_time_ms) }
96 if version >= 0 { put_i16(buf, self.error_code) }
97 if version >= 0 { if flex { put_compact_nullable_string(buf, self.error_message) } else { put_nullable_string(buf, self.error_message) } }
98 if version >= 0 { if flex { put_compact_string(buf, self.member_id) } else { put_string(buf, self.member_id) } }
99 if version >= 0 { put_i32(buf, self.member_epoch) }
100 if version >= 0 { put_i32(buf, self.heartbeat_interval_ms) }
101 if version >= 0 { put_i32(buf, self.acceptable_recovery_lag) }
102 if version >= 0 { put_i32(buf, self.task_offset_interval_ms) }
103 if version >= 0 { { let len = (self.status).as_ref().map(Vec::len); crate::primitives::array::put_nullable_array_len(buf, len, flex); if let Some(v) = &self.status { for it in v { it.encode(buf, version)?; } } } }
104 if version >= 0 { { let len = (self.active_tasks).as_ref().map(Vec::len); crate::primitives::array::put_nullable_array_len(buf, len, flex); if let Some(v) = &self.active_tasks { for it in v { it.encode(buf, version)?; } } } }
105 if version >= 0 { { let len = (self.standby_tasks).as_ref().map(Vec::len); crate::primitives::array::put_nullable_array_len(buf, len, flex); if let Some(v) = &self.standby_tasks { for it in v { it.encode(buf, version)?; } } } }
106 if version >= 0 { { let len = (self.warmup_tasks).as_ref().map(Vec::len); crate::primitives::array::put_nullable_array_len(buf, len, flex); if let Some(v) = &self.warmup_tasks { for it in v { it.encode(buf, version)?; } } } }
107 if version >= 0 { put_i32(buf, self.endpoint_information_epoch) }
108 if version >= 0 { { let len = (self.partitions_by_user_endpoint).as_ref().map(Vec::len); crate::primitives::array::put_nullable_array_len(buf, len, flex); if let Some(v) = &self.partitions_by_user_endpoint { for it in v { it.encode(buf, version)?; } } } }
109 if flex {
110 let tagged = WriteTaggedFields::new();
111 tagged.write(buf, &self.unknown_tagged_fields);
112 }
113 Ok(())
114 }
115 fn encoded_len(&self, version: i16) -> usize {
116 let flex = is_flexible(version);
117 let mut n: usize = 0;
118 if version >= 0 { n += 4; }
119 if version >= 0 { n += 2; }
120 if version >= 0 { n += if flex { compact_nullable_string_len(self.error_message) } else { nullable_string_len(self.error_message) }; }
121 if version >= 0 { n += if flex { compact_string_len(self.member_id) } else { string_len(self.member_id) }; }
122 if version >= 0 { n += 4; }
123 if version >= 0 { n += 4; }
124 if version >= 0 { n += 4; }
125 if version >= 0 { n += 4; }
126 if version >= 0 { n += { let opt: Option<&Vec<_>> = (self.status).as_ref(); let prefix = crate::primitives::array::nullable_array_len_prefix_len(opt.map(|v| v.len()), flex); let body: usize = opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum()); prefix + body }; }
127 if version >= 0 { n += { let opt: Option<&Vec<_>> = (self.active_tasks).as_ref(); let prefix = crate::primitives::array::nullable_array_len_prefix_len(opt.map(|v| v.len()), flex); let body: usize = opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum()); prefix + body }; }
128 if version >= 0 { n += { let opt: Option<&Vec<_>> = (self.standby_tasks).as_ref(); let prefix = crate::primitives::array::nullable_array_len_prefix_len(opt.map(|v| v.len()), flex); let body: usize = opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum()); prefix + body }; }
129 if version >= 0 { n += { let opt: Option<&Vec<_>> = (self.warmup_tasks).as_ref(); let prefix = crate::primitives::array::nullable_array_len_prefix_len(opt.map(|v| v.len()), flex); let body: usize = opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum()); prefix + body }; }
130 if version >= 0 { n += 4; }
131 if version >= 0 { n += { let opt: Option<&Vec<_>> = (self.partitions_by_user_endpoint).as_ref(); let prefix = crate::primitives::array::nullable_array_len_prefix_len(opt.map(|v| v.len()), flex); let body: usize = opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum()); prefix + body }; }
132 if flex {
133 let known_pairs: Vec<(u32, usize)> = Vec::new();
134 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
135 }
136 n
137 }
138}
139
140impl<'de> DecodeBorrow<'de> for StreamsGroupHeartbeatResponse<'de> {
141 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
142 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
143 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
144 }
145 let flex = is_flexible(version);
146 let mut out = Self::default();
147 if version >= 0 { out.throttle_time_ms = get_i32(buf)?; }
148 if version >= 0 { out.error_code = get_i16(buf)?; }
149 if version >= 0 { out.error_message = if flex { get_compact_nullable_string_borrowed(buf)? } else { get_nullable_string_borrowed(buf)? }; }
150 if version >= 0 { out.member_id = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
151 if version >= 0 { out.member_epoch = get_i32(buf)?; }
152 if version >= 0 { out.heartbeat_interval_ms = get_i32(buf)?; }
153 if version >= 0 { out.acceptable_recovery_lag = get_i32(buf)?; }
154 if version >= 0 { out.task_offset_interval_ms = get_i32(buf)?; }
155 if version >= 0 { out.status = { let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?; match opt { None => None, Some(n) => { let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::status::Status::decode_borrow(buf, version)?); } Some(v) } } }; }
156 if version >= 0 { out.active_tasks = { let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?; match opt { None => None, Some(n) => { let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::task_ids::TaskIds::decode_borrow(buf, version)?); } Some(v) } } }; }
157 if version >= 0 { out.standby_tasks = { let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?; match opt { None => None, Some(n) => { let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::task_ids::TaskIds::decode_borrow(buf, version)?); } Some(v) } } }; }
158 if version >= 0 { out.warmup_tasks = { let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?; match opt { None => None, Some(n) => { let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::task_ids::TaskIds::decode_borrow(buf, version)?); } Some(v) } } }; }
159 if version >= 0 { out.endpoint_information_epoch = get_i32(buf)?; }
160 if version >= 0 { out.partitions_by_user_endpoint = { let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?; match opt { None => None, Some(n) => { let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(EndpointToPartitions::decode_borrow(buf, version)?); } Some(v) } } }; }
161 if flex {
162 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
163 Ok(false)
164 })?;
165 }
166 Ok(out)
167 }
168}
169
170#[derive(Debug, Clone, PartialEq, Eq)]
171pub struct EndpointToPartitions<'a> {
172 pub user_endpoint: super::common::endpoint::Endpoint<'a>,
173 pub active_partitions: Vec<super::common::topic_partition::TopicPartition<'a>>,
174 pub standby_partitions: Vec<super::common::topic_partition::TopicPartition<'a>>,
175 pub unknown_tagged_fields: UnknownTaggedFields,
176}
177
178impl<'a> Default for EndpointToPartitions<'a> {
179 fn default() -> Self {
180 Self {
181 user_endpoint: Default::default(),
182 active_partitions: Vec::new(),
183 standby_partitions: Vec::new(),
184 unknown_tagged_fields: Default::default(),
185 }
186 }
187}
188
189impl<'a> EndpointToPartitions<'a> {
190 pub fn to_owned(&self) -> crate::owned::streams_group_heartbeat_response::EndpointToPartitions {
191 crate::owned::streams_group_heartbeat_response::EndpointToPartitions {
192 user_endpoint: (self.user_endpoint).to_owned(),
193 active_partitions: (self.active_partitions).iter().map(|it| it.to_owned()).collect(),
194 standby_partitions: (self.standby_partitions).iter().map(|it| it.to_owned()).collect(),
195 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
196 }
197 }
198}
199
200impl<'a> Encode for EndpointToPartitions<'a> {
201 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
202 let flex = version >= 0;
203 if version >= 0 { self.user_endpoint.encode(buf, version)? }
204 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.active_partitions).len(), flex); for it in &self.active_partitions { it.encode(buf, version)?; } } }
205 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.standby_partitions).len(), flex); for it in &self.standby_partitions { it.encode(buf, version)?; } } }
206 if flex {
207 let tagged = WriteTaggedFields::new();
208 tagged.write(buf, &self.unknown_tagged_fields);
209 }
210 Ok(())
211 }
212 fn encoded_len(&self, version: i16) -> usize {
213 let flex = version >= 0;
214 let mut n: usize = 0;
215 if version >= 0 { n += self.user_endpoint.encoded_len(version); }
216 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.active_partitions).len(), flex); let body: usize = (self.active_partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
217 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.standby_partitions).len(), flex); let body: usize = (self.standby_partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
218 if flex {
219 let known_pairs: Vec<(u32, usize)> = Vec::new();
220 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
221 }
222 n
223 }
224}
225
226impl<'de> DecodeBorrow<'de> for EndpointToPartitions<'de> {
227 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
228 let flex = version >= 0;
229 let mut out = Self::default();
230 if version >= 0 { out.user_endpoint = super::common::endpoint::Endpoint::decode_borrow(buf, version)?; }
231 if version >= 0 { out.active_partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::topic_partition::TopicPartition::decode_borrow(buf, version)?); } v }; }
232 if version >= 0 { out.standby_partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::topic_partition::TopicPartition::decode_borrow(buf, version)?); } v }; }
233 if flex {
234 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
235 Ok(false)
236 })?;
237 }
238 Ok(out)
239 }
240}