1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ConsumerGroupHeartbeatRequest {
24 pub group_id: super::GroupId,
28
29 pub member_id: StrBytes,
33
34 pub member_epoch: i32,
38
39 pub instance_id: Option<StrBytes>,
43
44 pub rack_id: Option<StrBytes>,
48
49 pub rebalance_timeout_ms: i32,
53
54 pub subscribed_topic_names: Option<Vec<super::TopicName>>,
58
59 pub subscribed_topic_regex: Option<StrBytes>,
63
64 pub server_assignor: Option<StrBytes>,
68
69 pub topic_partitions: Option<Vec<TopicPartitions>>,
73
74 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
76}
77
78impl ConsumerGroupHeartbeatRequest {
79 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
85 self.group_id = value;
86 self
87 }
88 pub fn with_member_id(mut self, value: StrBytes) -> Self {
94 self.member_id = value;
95 self
96 }
97 pub fn with_member_epoch(mut self, value: i32) -> Self {
103 self.member_epoch = value;
104 self
105 }
106 pub fn with_instance_id(mut self, value: Option<StrBytes>) -> Self {
112 self.instance_id = value;
113 self
114 }
115 pub fn with_rack_id(mut self, value: Option<StrBytes>) -> Self {
121 self.rack_id = value;
122 self
123 }
124 pub fn with_rebalance_timeout_ms(mut self, value: i32) -> Self {
130 self.rebalance_timeout_ms = value;
131 self
132 }
133 pub fn with_subscribed_topic_names(mut self, value: Option<Vec<super::TopicName>>) -> Self {
139 self.subscribed_topic_names = value;
140 self
141 }
142 pub fn with_subscribed_topic_regex(mut self, value: Option<StrBytes>) -> Self {
148 self.subscribed_topic_regex = value;
149 self
150 }
151 pub fn with_server_assignor(mut self, value: Option<StrBytes>) -> Self {
157 self.server_assignor = value;
158 self
159 }
160 pub fn with_topic_partitions(mut self, value: Option<Vec<TopicPartitions>>) -> Self {
166 self.topic_partitions = value;
167 self
168 }
169 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
171 self.unknown_tagged_fields = value;
172 self
173 }
174 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
176 self.unknown_tagged_fields.insert(key, value);
177 self
178 }
179}
180
#[cfg(feature = "client")]
impl Encodable for ConsumerGroupHeartbeatRequest {
    /// Serializes this request into `buf` using protocol `version`.
    ///
    /// Fields are written in declaration order; `subscribed_topic_regex` is only
    /// written for version 1 and above.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when `subscribed_topic_regex` is
    /// set but `version` is 0, when there are more than `u32::MAX` unknown tagged
    /// fields, or when any underlying field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::CompactString.encode(buf, &self.group_id)?;
        types::CompactString.encode(buf, &self.member_id)?;
        types::Int32.encode(buf, &self.member_epoch)?;
        types::CompactString.encode(buf, &self.instance_id)?;
        types::CompactString.encode(buf, &self.rack_id)?;
        types::Int32.encode(buf, &self.rebalance_timeout_ms)?;
        types::CompactArray(types::CompactString).encode(buf, &self.subscribed_topic_names)?;
        if version >= 1 {
            types::CompactString.encode(buf, &self.subscribed_topic_regex)?;
        } else if self.subscribed_topic_regex.is_some() {
            // The regex field does not exist in version 0; refuse to drop it silently.
            bail!("A field is set that is not available on the selected protocol version");
        }
        types::CompactString.encode(buf, &self.server_assignor)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topic_partitions)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // The guard above ensures the count fits in a u32, so this cast is lossless.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        // NOTE(review): `0..` is presumably the range of tags the helper may emit —
        // confirm against `write_unknown_tagged_fields`.
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for protocol `version`.
    ///
    /// Unlike `encode`, this performs no range check on `version`; it only
    /// distinguishes `version >= 1` from lower values.
    ///
    /// # Errors
    ///
    /// Fails when `subscribed_topic_regex` is set but `version` is below 1, when
    /// there are more than `u32::MAX` unknown tagged fields, or when any
    /// underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::CompactString.compute_size(&self.group_id)?;
        total_size += types::CompactString.compute_size(&self.member_id)?;
        total_size += types::Int32.compute_size(&self.member_epoch)?;
        total_size += types::CompactString.compute_size(&self.instance_id)?;
        total_size += types::CompactString.compute_size(&self.rack_id)?;
        total_size += types::Int32.compute_size(&self.rebalance_timeout_ms)?;
        total_size +=
            types::CompactArray(types::CompactString).compute_size(&self.subscribed_topic_names)?;
        if version >= 1 {
            total_size += types::CompactString.compute_size(&self.subscribed_topic_regex)?;
        } else if self.subscribed_topic_regex.is_some() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        total_size += types::CompactString.compute_size(&self.server_assignor)?;
        total_size +=
            types::CompactArray(types::Struct { version }).compute_size(&self.topic_partitions)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // The guard above ensures the count fits in a u32, so this cast is lossless.
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
248
#[cfg(feature = "broker")]
impl Decodable for ConsumerGroupHeartbeatRequest {
    /// Reads a request encoded with protocol `version` from `buf`.
    ///
    /// Fields are consumed in declaration order. `subscribed_topic_regex` is only
    /// read for version 1 and above and is `None` otherwise. Every trailing tagged
    /// field is stored in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Struct-literal fields are evaluated top to bottom, which is the wire order.
        let mut request = Self {
            group_id: types::CompactString.decode(buf)?,
            member_id: types::CompactString.decode(buf)?,
            member_epoch: types::Int32.decode(buf)?,
            instance_id: types::CompactString.decode(buf)?,
            rack_id: types::CompactString.decode(buf)?,
            rebalance_timeout_ms: types::Int32.decode(buf)?,
            subscribed_topic_names: types::CompactArray(types::CompactString).decode(buf)?,
            subscribed_topic_regex: if version >= 1 {
                types::CompactString.decode(buf)?
            } else {
                None
            },
            server_assignor: types::CompactString.decode(buf)?,
            topic_partitions: types::CompactArray(types::Struct { version }).decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        // Trailing tagged-field section: a count, then (tag, size, payload) triples.
        let tagged_field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            request.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(request)
    }
}
292
293impl Default for ConsumerGroupHeartbeatRequest {
294 fn default() -> Self {
295 Self {
296 group_id: Default::default(),
297 member_id: Default::default(),
298 member_epoch: 0,
299 instance_id: None,
300 rack_id: None,
301 rebalance_timeout_ms: -1,
302 subscribed_topic_names: None,
303 subscribed_topic_regex: None,
304 server_assignor: None,
305 topic_partitions: None,
306 unknown_tagged_fields: BTreeMap::new(),
307 }
308 }
309}
310
311impl Message for ConsumerGroupHeartbeatRequest {
312 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
313 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
314}
315
316#[non_exhaustive]
318#[derive(Debug, Clone, PartialEq)]
319pub struct TopicPartitions {
320 pub topic_id: Uuid,
324
325 pub partitions: Vec<i32>,
329
330 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
332}
333
334impl TopicPartitions {
335 pub fn with_topic_id(mut self, value: Uuid) -> Self {
341 self.topic_id = value;
342 self
343 }
344 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
350 self.partitions = value;
351 self
352 }
353 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
355 self.unknown_tagged_fields = value;
356 self
357 }
358 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
360 self.unknown_tagged_fields.insert(key, value);
361 self
362 }
363}
364
#[cfg(feature = "client")]
impl Encodable for TopicPartitions {
    /// Serializes this entry into `buf` using protocol `version`.
    ///
    /// Writes the topic id, then the partition list, then the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when there are more than
    /// `u32::MAX` unknown tagged fields, or when any underlying field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Uuid.encode(buf, &self.topic_id)?;
        types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // The guard above ensures the count fits in a u32, so this cast is lossless.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        // NOTE(review): `0..` is presumably the range of tags the helper may emit —
        // confirm against `write_unknown_tagged_fields`.
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write.
    ///
    /// `version` is not inspected here; the layout is the same for every
    /// supported version.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields or when
    /// any underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::Uuid.compute_size(&self.topic_id)?;
        total_size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // The guard above ensures the count fits in a u32, so this cast is lossless.
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
402
#[cfg(feature = "broker")]
impl Decodable for TopicPartitions {
    /// Reads an entry encoded with protocol `version` from `buf`.
    ///
    /// Consumes the topic id, the partition list, and then every trailing tagged
    /// field, which is stored in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Struct-literal fields are evaluated top to bottom, which is the wire order.
        let mut entry = Self {
            topic_id: types::Uuid.decode(buf)?,
            partitions: types::CompactArray(types::Int32).decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        // Trailing tagged-field section: a count, then (tag, size, payload) triples.
        let tagged_field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            entry.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(entry)
    }
}
426
427impl Default for TopicPartitions {
428 fn default() -> Self {
429 Self {
430 topic_id: Uuid::nil(),
431 partitions: Default::default(),
432 unknown_tagged_fields: BTreeMap::new(),
433 }
434 }
435}
436
437impl Message for TopicPartitions {
438 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
439 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
440}
441
442impl HeaderVersion for ConsumerGroupHeartbeatRequest {
443 fn header_version(version: i16) -> i16 {
444 2
445 }
446}