1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_bool, get_i16, get_i32, get_i8, put_bool, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
10 string_len,
11};
12use crate::tagged_fields::{read_tagged_fields, tagged_fields_len, WriteTaggedFields};
13use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
14
/// API key that `StreamsGroupHeartbeatRequest` reports through `ProtocolRequest::API_KEY`.
pub const API_KEY: i16 = 88;
/// Lowest version accepted by the request's `encode` and `decode`.
pub const MIN_VERSION: i16 = 0;
/// Highest version accepted by the request's `encode` and `decode`.
pub const MAX_VERSION: i16 = 0;
/// First version for which the flexible encoding (compact lengths, tagged fields) is selected.
pub const FLEXIBLE_MIN: i16 = 0;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
22
/// Body of the streams-group heartbeat request (API key 88).
///
/// Fields are declared in the same order in which `Encode::encode` writes them. Every `Option`
/// field is nullable on the wire and is `None` in the `Default` value.
///
/// NOTE(review): the business meaning of each field presumably follows the upstream protocol
/// schema of the same name — confirm there; the notes below only describe the wire handling
/// visible in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamsGroupHeartbeatRequest {
    /// Non-nullable string; empty by default.
    pub group_id: String,
    /// Non-nullable string; empty by default.
    pub member_id: String,
    /// Written as a 4-byte integer; `0` by default.
    pub member_epoch: i32,
    /// Written as a 4-byte integer; `0` by default.
    pub endpoint_information_epoch: i32,
    /// Nullable string.
    pub instance_id: Option<String>,
    /// Nullable string.
    pub rack_id: Option<String>,
    /// Written as a 4-byte integer; `-1` by default.
    pub rebalance_timeout_ms: i32,
    /// Nullable nested struct, preceded on the wire by a one-byte marker
    /// (`-1` when absent, `1` when present).
    pub topology: Option<Topology>,
    /// Nullable array of task-id entries.
    pub active_tasks: Option<Vec<super::common::task_ids::TaskIds>>,
    /// Nullable array of task-id entries.
    pub standby_tasks: Option<Vec<super::common::task_ids::TaskIds>>,
    /// Nullable array of task-id entries.
    pub warmup_tasks: Option<Vec<super::common::task_ids::TaskIds>>,
    /// Nullable string.
    pub process_id: Option<String>,
    /// Nullable nested struct, preceded on the wire by a one-byte marker
    /// (`-1` when absent, `1` when present).
    pub user_endpoint: Option<super::common::endpoint::Endpoint>,
    /// Nullable array of key/value entries.
    pub client_tags: Option<Vec<super::common::key_value::KeyValue>>,
    /// Nullable array of task-offset entries.
    pub task_offsets: Option<Vec<super::common::task_offset::TaskOffset>>,
    /// Nullable array of task-offset entries.
    pub task_end_offsets: Option<Vec<super::common::task_offset::TaskOffset>>,
    /// Written as a single-byte boolean; `false` by default.
    pub shutdown_application: bool,
    /// Tagged fields returned by `read_tagged_fields` during decode and handed to
    /// `WriteTaggedFields::write` during encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
44
45impl Default for StreamsGroupHeartbeatRequest {
46 fn default() -> Self {
47 Self {
48 group_id: String::new(),
49 member_id: String::new(),
50 member_epoch: 0i32,
51 endpoint_information_epoch: 0i32,
52 instance_id: None,
53 rack_id: None,
54 rebalance_timeout_ms: -1i32,
55 topology: None,
56 active_tasks: None,
57 standby_tasks: None,
58 warmup_tasks: None,
59 process_id: None,
60 user_endpoint: None,
61 client_tags: None,
62 task_offsets: None,
63 task_end_offsets: None,
64 shutdown_application: false,
65 unknown_tagged_fields: Default::default(),
66 }
67 }
68}
69
70impl Encode for StreamsGroupHeartbeatRequest {
71 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
72 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
73 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
74 }
75 let flex = is_flexible(version);
76 if version >= 0 { if flex { put_compact_string(buf, &self.group_id) } else { put_string(buf, &self.group_id) } }
77 if version >= 0 { if flex { put_compact_string(buf, &self.member_id) } else { put_string(buf, &self.member_id) } }
78 if version >= 0 { put_i32(buf, self.member_epoch) }
79 if version >= 0 { put_i32(buf, self.endpoint_information_epoch) }
80 if version >= 0 { if flex { put_compact_nullable_string(buf, self.instance_id.as_deref()) } else { put_nullable_string(buf, self.instance_id.as_deref()) } }
81 if version >= 0 { if flex { put_compact_nullable_string(buf, self.rack_id.as_deref()) } else { put_nullable_string(buf, self.rack_id.as_deref()) } }
82 if version >= 0 { put_i32(buf, self.rebalance_timeout_ms) }
83 if version >= 0 { match &self.topology { None => { buf.put_i8(-1); }, Some(v) => { buf.put_i8(1); v.encode(buf, version)?; } } }
84 if version >= 0 { { let len = (self.active_tasks).as_ref().map(Vec::len); crate::primitives::array::put_nullable_array_len(buf, len, flex); if let Some(v) = &self.active_tasks { for it in v { it.encode(buf, version)?; } } } }
85 if version >= 0 { { let len = (self.standby_tasks).as_ref().map(Vec::len); crate::primitives::array::put_nullable_array_len(buf, len, flex); if let Some(v) = &self.standby_tasks { for it in v { it.encode(buf, version)?; } } } }
86 if version >= 0 { { let len = (self.warmup_tasks).as_ref().map(Vec::len); crate::primitives::array::put_nullable_array_len(buf, len, flex); if let Some(v) = &self.warmup_tasks { for it in v { it.encode(buf, version)?; } } } }
87 if version >= 0 { if flex { put_compact_nullable_string(buf, self.process_id.as_deref()) } else { put_nullable_string(buf, self.process_id.as_deref()) } }
88 if version >= 0 { match &self.user_endpoint { None => { buf.put_i8(-1); }, Some(v) => { buf.put_i8(1); v.encode(buf, version)?; } } }
89 if version >= 0 { { let len = (self.client_tags).as_ref().map(Vec::len); crate::primitives::array::put_nullable_array_len(buf, len, flex); if let Some(v) = &self.client_tags { for it in v { it.encode(buf, version)?; } } } }
90 if version >= 0 { { let len = (self.task_offsets).as_ref().map(Vec::len); crate::primitives::array::put_nullable_array_len(buf, len, flex); if let Some(v) = &self.task_offsets { for it in v { it.encode(buf, version)?; } } } }
91 if version >= 0 { { let len = (self.task_end_offsets).as_ref().map(Vec::len); crate::primitives::array::put_nullable_array_len(buf, len, flex); if let Some(v) = &self.task_end_offsets { for it in v { it.encode(buf, version)?; } } } }
92 if version >= 0 { put_bool(buf, self.shutdown_application) }
93 if flex {
94 let tagged = WriteTaggedFields::new();
95 tagged.write(buf, &self.unknown_tagged_fields);
96 }
97 Ok(())
98 }
99 fn encoded_len(&self, version: i16) -> usize {
100 let flex = is_flexible(version);
101 let mut n: usize = 0;
102 if version >= 0 { n += if flex { compact_string_len(&self.group_id) } else { string_len(&self.group_id) }; }
103 if version >= 0 { n += if flex { compact_string_len(&self.member_id) } else { string_len(&self.member_id) }; }
104 if version >= 0 { n += 4; }
105 if version >= 0 { n += 4; }
106 if version >= 0 { n += if flex { compact_nullable_string_len(self.instance_id.as_deref()) } else { nullable_string_len(self.instance_id.as_deref()) }; }
107 if version >= 0 { n += if flex { compact_nullable_string_len(self.rack_id.as_deref()) } else { nullable_string_len(self.rack_id.as_deref()) }; }
108 if version >= 0 { n += 4; }
109 if version >= 0 { n += 1 + self.topology.as_ref().map_or(0, |v| v.encoded_len(version)); }
110 if version >= 0 { n += { let opt: Option<&Vec<_>> = (self.active_tasks).as_ref(); let prefix = crate::primitives::array::nullable_array_len_prefix_len(opt.map(|v| v.len()), flex); let body: usize = opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum()); prefix + body }; }
111 if version >= 0 { n += { let opt: Option<&Vec<_>> = (self.standby_tasks).as_ref(); let prefix = crate::primitives::array::nullable_array_len_prefix_len(opt.map(|v| v.len()), flex); let body: usize = opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum()); prefix + body }; }
112 if version >= 0 { n += { let opt: Option<&Vec<_>> = (self.warmup_tasks).as_ref(); let prefix = crate::primitives::array::nullable_array_len_prefix_len(opt.map(|v| v.len()), flex); let body: usize = opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum()); prefix + body }; }
113 if version >= 0 { n += if flex { compact_nullable_string_len(self.process_id.as_deref()) } else { nullable_string_len(self.process_id.as_deref()) }; }
114 if version >= 0 { n += 1 + self.user_endpoint.as_ref().map_or(0, |v| v.encoded_len(version)); }
115 if version >= 0 { n += { let opt: Option<&Vec<_>> = (self.client_tags).as_ref(); let prefix = crate::primitives::array::nullable_array_len_prefix_len(opt.map(|v| v.len()), flex); let body: usize = opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum()); prefix + body }; }
116 if version >= 0 { n += { let opt: Option<&Vec<_>> = (self.task_offsets).as_ref(); let prefix = crate::primitives::array::nullable_array_len_prefix_len(opt.map(|v| v.len()), flex); let body: usize = opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum()); prefix + body }; }
117 if version >= 0 { n += { let opt: Option<&Vec<_>> = (self.task_end_offsets).as_ref(); let prefix = crate::primitives::array::nullable_array_len_prefix_len(opt.map(|v| v.len()), flex); let body: usize = opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum()); prefix + body }; }
118 if version >= 0 { n += 1; }
119 if flex {
120 let known_pairs: Vec<(u32, usize)> = Vec::new();
121 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
122 }
123 n
124 }
125}
126
127impl<'de> Decode<'de> for StreamsGroupHeartbeatRequest {
128 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
129 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
130 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
131 }
132 let flex = is_flexible(version);
133 let mut out = Self::default();
134 if version >= 0 { out.group_id = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
135 if version >= 0 { out.member_id = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
136 if version >= 0 { out.member_epoch = get_i32(buf)?; }
137 if version >= 0 { out.endpoint_information_epoch = get_i32(buf)?; }
138 if version >= 0 { out.instance_id = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
139 if version >= 0 { out.rack_id = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
140 if version >= 0 { out.rebalance_timeout_ms = get_i32(buf)?; }
141 if version >= 0 { out.topology = if get_i8(buf)? < 0 { None } else { Some(Topology::decode(buf, version)?) }; }
142 if version >= 0 { out.active_tasks = { let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?; match opt { None => None, Some(n) => { let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::task_ids::TaskIds::decode(buf, version)?); } Some(v) } } }; }
143 if version >= 0 { out.standby_tasks = { let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?; match opt { None => None, Some(n) => { let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::task_ids::TaskIds::decode(buf, version)?); } Some(v) } } }; }
144 if version >= 0 { out.warmup_tasks = { let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?; match opt { None => None, Some(n) => { let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::task_ids::TaskIds::decode(buf, version)?); } Some(v) } } }; }
145 if version >= 0 { out.process_id = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
146 if version >= 0 { out.user_endpoint = if get_i8(buf)? < 0 { None } else { Some(super::common::endpoint::Endpoint::decode(buf, version)?) }; }
147 if version >= 0 { out.client_tags = { let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?; match opt { None => None, Some(n) => { let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::key_value::KeyValue::decode(buf, version)?); } Some(v) } } }; }
148 if version >= 0 { out.task_offsets = { let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?; match opt { None => None, Some(n) => { let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::task_offset::TaskOffset::decode(buf, version)?); } Some(v) } } }; }
149 if version >= 0 { out.task_end_offsets = { let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?; match opt { None => None, Some(n) => { let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::task_offset::TaskOffset::decode(buf, version)?); } Some(v) } } }; }
150 if version >= 0 { out.shutdown_application = get_bool(buf)?; }
151 if flex {
152 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
153 Ok(false)
154 })?;
155 }
156 Ok(out)
157 }
158}
159
/// Nested struct carried in `StreamsGroupHeartbeatRequest::topology`.
///
/// NOTE(review): field meanings presumably follow the upstream protocol schema — confirm there.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Topology {
    /// Written as a 4-byte integer.
    pub epoch: i32,
    /// Non-nullable array of nested `Subtopology` entries.
    pub subtopologies: Vec<Subtopology>,
    /// Tagged fields returned by `read_tagged_fields` during decode and handed to
    /// `WriteTaggedFields::write` during encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
166
167impl Encode for Topology {
168 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
169 let flex = version >= 0;
170 if version >= 0 { put_i32(buf, self.epoch) }
171 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.subtopologies).len(), flex); for it in &self.subtopologies { it.encode(buf, version)?; } } }
172 if flex {
173 let tagged = WriteTaggedFields::new();
174 tagged.write(buf, &self.unknown_tagged_fields);
175 }
176 Ok(())
177 }
178 fn encoded_len(&self, version: i16) -> usize {
179 let flex = version >= 0;
180 let mut n: usize = 0;
181 if version >= 0 { n += 4; }
182 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.subtopologies).len(), flex); let body: usize = (self.subtopologies).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
183 if flex {
184 let known_pairs: Vec<(u32, usize)> = Vec::new();
185 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
186 }
187 n
188 }
189}
190
191impl<'de> Decode<'de> for Topology {
192 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
193 let flex = version >= 0;
194 let mut out = Self::default();
195 if version >= 0 { out.epoch = get_i32(buf)?; }
196 if version >= 0 { out.subtopologies = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(Subtopology::decode(buf, version)?); } v }; }
197 if flex {
198 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
199 Ok(false)
200 })?;
201 }
202 Ok(out)
203 }
204}
205
/// Element type of `Topology::subtopologies`.
///
/// All arrays are non-nullable and are encoded in declaration order.
///
/// NOTE(review): field meanings presumably follow the upstream protocol schema — confirm there.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Subtopology {
    /// Non-nullable string.
    pub subtopology_id: String,
    /// Array of non-nullable strings.
    pub source_topics: Vec<String>,
    /// Array of non-nullable strings.
    pub source_topic_regex: Vec<String>,
    /// Array of nested topic-info entries.
    pub state_changelog_topics: Vec<super::common::topic_info::TopicInfo>,
    /// Array of non-nullable strings.
    pub repartition_sink_topics: Vec<String>,
    /// Array of nested topic-info entries.
    pub repartition_source_topics: Vec<super::common::topic_info::TopicInfo>,
    /// Array of nested `CopartitionGroup` entries.
    pub copartition_groups: Vec<CopartitionGroup>,
    /// Tagged fields returned by `read_tagged_fields` during decode and handed to
    /// `WriteTaggedFields::write` during encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
217
218impl Encode for Subtopology {
219 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
220 let flex = version >= 0;
221 if version >= 0 { if flex { put_compact_string(buf, &self.subtopology_id) } else { put_string(buf, &self.subtopology_id) } }
222 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex); for it in &self.source_topics { if flex { put_compact_string(buf, &*it) } else { put_string(buf, &*it) }; } } }
223 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.source_topic_regex).len(), flex); for it in &self.source_topic_regex { if flex { put_compact_string(buf, &*it) } else { put_string(buf, &*it) }; } } }
224 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.state_changelog_topics).len(), flex); for it in &self.state_changelog_topics { it.encode(buf, version)?; } } }
225 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.repartition_sink_topics).len(), flex); for it in &self.repartition_sink_topics { if flex { put_compact_string(buf, &*it) } else { put_string(buf, &*it) }; } } }
226 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.repartition_source_topics).len(), flex); for it in &self.repartition_source_topics { it.encode(buf, version)?; } } }
227 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.copartition_groups).len(), flex); for it in &self.copartition_groups { it.encode(buf, version)?; } } }
228 if flex {
229 let tagged = WriteTaggedFields::new();
230 tagged.write(buf, &self.unknown_tagged_fields);
231 }
232 Ok(())
233 }
234 fn encoded_len(&self, version: i16) -> usize {
235 let flex = version >= 0;
236 let mut n: usize = 0;
237 if version >= 0 { n += if flex { compact_string_len(&self.subtopology_id) } else { string_len(&self.subtopology_id) }; }
238 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.source_topics).len(), flex); let body: usize = (self.source_topics).iter().map(|it| if flex { compact_string_len(&*it) } else { string_len(&*it) }).sum(); prefix + body }; }
239 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.source_topic_regex).len(), flex); let body: usize = (self.source_topic_regex).iter().map(|it| if flex { compact_string_len(&*it) } else { string_len(&*it) }).sum(); prefix + body }; }
240 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.state_changelog_topics).len(), flex); let body: usize = (self.state_changelog_topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
241 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.repartition_sink_topics).len(), flex); let body: usize = (self.repartition_sink_topics).iter().map(|it| if flex { compact_string_len(&*it) } else { string_len(&*it) }).sum(); prefix + body }; }
242 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.repartition_source_topics).len(), flex); let body: usize = (self.repartition_source_topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
243 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.copartition_groups).len(), flex); let body: usize = (self.copartition_groups).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
244 if flex {
245 let known_pairs: Vec<(u32, usize)> = Vec::new();
246 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
247 }
248 n
249 }
250}
251
252impl<'de> Decode<'de> for Subtopology {
253 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
254 let flex = version >= 0;
255 let mut out = Self::default();
256 if version >= 0 { out.subtopology_id = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
257 if version >= 0 { out.source_topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }); } v }; }
258 if version >= 0 { out.source_topic_regex = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }); } v }; }
259 if version >= 0 { out.state_changelog_topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::topic_info::TopicInfo::decode(buf, version)?); } v }; }
260 if version >= 0 { out.repartition_sink_topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }); } v }; }
261 if version >= 0 { out.repartition_source_topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::topic_info::TopicInfo::decode(buf, version)?); } v }; }
262 if version >= 0 { out.copartition_groups = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(CopartitionGroup::decode(buf, version)?); } v }; }
263 if flex {
264 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
265 Ok(false)
266 })?;
267 }
268 Ok(out)
269 }
270}
271
/// Element type of `Subtopology::copartition_groups`.
///
/// Each list is a non-nullable array of 16-bit integers.
///
/// NOTE(review): the values presumably index into the owning subtopology's topic lists —
/// confirm against the upstream protocol schema.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct CopartitionGroup {
    /// Array of `i16` values, two bytes each on the wire.
    pub source_topics: Vec<i16>,
    /// Array of `i16` values, two bytes each on the wire.
    pub source_topic_regex: Vec<i16>,
    /// Array of `i16` values, two bytes each on the wire.
    pub repartition_source_topics: Vec<i16>,
    /// Tagged fields returned by `read_tagged_fields` during decode and handed to
    /// `WriteTaggedFields::write` during encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
279
280impl Encode for CopartitionGroup {
281 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
282 let flex = version >= 0;
283 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.source_topics).len(), flex); for it in &self.source_topics { put_i16(buf, *it); } } }
284 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.source_topic_regex).len(), flex); for it in &self.source_topic_regex { put_i16(buf, *it); } } }
285 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.repartition_source_topics).len(), flex); for it in &self.repartition_source_topics { put_i16(buf, *it); } } }
286 if flex {
287 let tagged = WriteTaggedFields::new();
288 tagged.write(buf, &self.unknown_tagged_fields);
289 }
290 Ok(())
291 }
292 fn encoded_len(&self, version: i16) -> usize {
293 let flex = version >= 0;
294 let mut n: usize = 0;
295 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.source_topics).len(), flex); let body: usize = (self.source_topics).iter().map(|_| 2).sum(); prefix + body }; }
296 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.source_topic_regex).len(), flex); let body: usize = (self.source_topic_regex).iter().map(|_| 2).sum(); prefix + body }; }
297 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.repartition_source_topics).len(), flex); let body: usize = (self.repartition_source_topics).iter().map(|_| 2).sum(); prefix + body }; }
298 if flex {
299 let known_pairs: Vec<(u32, usize)> = Vec::new();
300 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
301 }
302 n
303 }
304}
305
306impl<'de> Decode<'de> for CopartitionGroup {
307 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
308 let flex = version >= 0;
309 let mut out = Self::default();
310 if version >= 0 { out.source_topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i16(buf)?); } v }; }
311 if version >= 0 { out.source_topic_regex = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i16(buf)?); } v }; }
312 if version >= 0 { out.repartition_source_topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i16(buf)?); } v }; }
313 if flex {
314 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
315 Ok(false)
316 })?;
317 }
318 Ok(out)
319 }
320}
321
322#[must_use]
325#[allow(unused_comparisons)]
326pub fn default_json(version: i16) -> ::serde_json::Value {
327 let mut obj = ::serde_json::Map::new();
328 obj.insert("groupId".to_string(), ::serde_json::Value::String(String::new()));
329 obj.insert("memberId".to_string(), ::serde_json::Value::String(String::new()));
330 obj.insert("memberEpoch".to_string(), ::serde_json::json!(0));
331 obj.insert("endpointInformationEpoch".to_string(), ::serde_json::json!(0));
332 obj.insert("instanceId".to_string(), ::serde_json::Value::Null);
333 obj.insert("rackId".to_string(), ::serde_json::Value::Null);
334 obj.insert("rebalanceTimeoutMs".to_string(), ::serde_json::json!(-1));
335 obj.insert("topology".to_string(), ::serde_json::Value::Null);
336 obj.insert("activeTasks".to_string(), ::serde_json::Value::Null);
337 obj.insert("standbyTasks".to_string(), ::serde_json::Value::Null);
338 obj.insert("warmupTasks".to_string(), ::serde_json::Value::Null);
339 obj.insert("processId".to_string(), ::serde_json::Value::Null);
340 obj.insert("userEndpoint".to_string(), ::serde_json::Value::Null);
341 obj.insert("clientTags".to_string(), ::serde_json::Value::Null);
342 obj.insert("taskOffsets".to_string(), ::serde_json::Value::Null);
343 obj.insert("taskEndOffsets".to_string(), ::serde_json::Value::Null);
344 obj.insert("shutdownApplication".to_string(), ::serde_json::Value::Bool(false));
345 ::serde_json::Value::Object(obj)
346}
347
348impl crate::ProtocolRequest for StreamsGroupHeartbeatRequest {
349 const API_KEY: i16 = API_KEY;
350 const MIN_VERSION: i16 = MIN_VERSION;
351 const MAX_VERSION: i16 = MAX_VERSION;
352 const FLEXIBLE_MIN: i16 = FLEXIBLE_MIN;
353 type Response = super::streams_group_heartbeat_response::StreamsGroupHeartbeatResponse;
354}