1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// Body of a `ConsumerGroupHeartbeat` request.
///
/// Only version 0 is declared valid by this type's `Message` implementation.
/// The encoder writes every field unconditionally, in declaration order,
/// followed by the tagged-field section.
///
/// NOTE(review): the per-field semantics below that go beyond names, types and
/// defaults should be confirmed against the upstream Kafka message schema.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct ConsumerGroupHeartbeatRequest {
    /// Identifier of the consumer group; serialized as a compact string.
    pub group_id: super::GroupId,

    /// Identifier of the member; serialized as a compact string.
    pub member_id: StrBytes,

    /// Member epoch, serialized as a 32-bit integer. Defaults to `0`.
    ///
    /// NOTE(review): presumably some values (e.g. 0 / negative) carry special
    /// join/leave meaning — confirm against the schema.
    pub member_epoch: i32,

    /// Optional instance id; serialized as a nullable compact string.
    /// Defaults to `None`.
    pub instance_id: Option<StrBytes>,

    /// Optional rack id; serialized as a nullable compact string.
    /// Defaults to `None`.
    pub rack_id: Option<StrBytes>,

    /// Rebalance timeout in milliseconds, serialized as a 32-bit integer.
    /// Defaults to `-1`.
    ///
    /// NOTE(review): `-1` presumably means "unchanged / not provided" — confirm.
    pub rebalance_timeout_ms: i32,

    /// Optional list of subscribed topic names; serialized as a nullable
    /// compact array of compact strings. Defaults to `None`.
    pub subscribed_topic_names: Option<Vec<super::TopicName>>,

    /// Optional server assignor name; serialized as a nullable compact string.
    /// Defaults to `None`.
    pub server_assignor: Option<StrBytes>,

    /// Optional list of per-topic partition sets; serialized as a nullable
    /// compact array of [`TopicPartitions`]. Defaults to `None`.
    pub topic_partitions: Option<Vec<TopicPartitions>>,

    /// Tagged fields that have no dedicated struct member, keyed by tag.
    /// They are written after the regular fields when encoding and collected
    /// here when decoding.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
72
73impl ConsumerGroupHeartbeatRequest {
74 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
80 self.group_id = value;
81 self
82 }
83 pub fn with_member_id(mut self, value: StrBytes) -> Self {
89 self.member_id = value;
90 self
91 }
92 pub fn with_member_epoch(mut self, value: i32) -> Self {
98 self.member_epoch = value;
99 self
100 }
101 pub fn with_instance_id(mut self, value: Option<StrBytes>) -> Self {
107 self.instance_id = value;
108 self
109 }
110 pub fn with_rack_id(mut self, value: Option<StrBytes>) -> Self {
116 self.rack_id = value;
117 self
118 }
119 pub fn with_rebalance_timeout_ms(mut self, value: i32) -> Self {
125 self.rebalance_timeout_ms = value;
126 self
127 }
128 pub fn with_subscribed_topic_names(mut self, value: Option<Vec<super::TopicName>>) -> Self {
134 self.subscribed_topic_names = value;
135 self
136 }
137 pub fn with_server_assignor(mut self, value: Option<StrBytes>) -> Self {
143 self.server_assignor = value;
144 self
145 }
146 pub fn with_topic_partitions(mut self, value: Option<Vec<TopicPartitions>>) -> Self {
152 self.topic_partitions = value;
153 self
154 }
155 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
157 self.unknown_tagged_fields = value;
158 self
159 }
160 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
162 self.unknown_tagged_fields.insert(key, value);
163 self
164 }
165}
166
/// Wire serialization of the request (only compiled with the `client` feature).
#[cfg(feature = "client")]
impl Encodable for ConsumerGroupHeartbeatRequest {
    /// Writes all fields to `buf` in declaration order, then the number of
    /// unknown tagged fields as an unsigned varint, then those fields.
    ///
    /// `version` is only forwarded to the nested `TopicPartitions` encoder.
    ///
    /// # Errors
    ///
    /// Propagates any field encoder error, and fails if there are more than
    /// `u32::MAX` unknown tagged fields.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let string = types::CompactString;
        let int32 = types::Int32;

        string.encode(buf, &self.group_id)?;
        string.encode(buf, &self.member_id)?;
        int32.encode(buf, &self.member_epoch)?;
        string.encode(buf, &self.instance_id)?;
        string.encode(buf, &self.rack_id)?;
        int32.encode(buf, &self.rebalance_timeout_ms)?;
        types::CompactArray(types::CompactString).encode(buf, &self.subscribed_topic_names)?;
        string.encode(buf, &self.server_assignor)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topic_partitions)?;

        // The tagged-field count is written as a u32 varint, so it must fit.
        let extras = &self.unknown_tagged_fields;
        let extras_count = extras.len();
        if extras_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                extras_count
            );
        }
        types::UnsignedVarInt.encode(buf, extras_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., extras)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce, computed field by
    /// field in the same order.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::CompactString.compute_size(&self.group_id)?
            + types::CompactString.compute_size(&self.member_id)?
            + types::Int32.compute_size(&self.member_epoch)?
            + types::CompactString.compute_size(&self.instance_id)?
            + types::CompactString.compute_size(&self.rack_id)?
            + types::Int32.compute_size(&self.rebalance_timeout_ms)?
            + types::CompactArray(types::CompactString)
                .compute_size(&self.subscribed_topic_names)?
            + types::CompactString.compute_size(&self.server_assignor)?
            + types::CompactArray(types::Struct { version })
                .compute_size(&self.topic_partitions)?;

        let extras_count = self.unknown_tagged_fields.len();
        if extras_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                extras_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(extras_count as u32)?;
        let extras_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + count_size + extras_size)
    }
}
217
/// Wire deserialization of the request (only compiled with the `broker` feature).
#[cfg(feature = "broker")]
impl Decodable for ConsumerGroupHeartbeatRequest {
    /// Reads a request from `buf`: the regular fields in declaration order,
    /// then the tagged-field section, every entry of which is kept as an
    /// unknown tagged field.
    ///
    /// `version` is only forwarded to the nested `TopicPartitions` decoder.
    ///
    /// # Errors
    ///
    /// Propagates the first decoding failure, including a buffer that is too
    /// short for a declared tagged-field size.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Struct-literal fields are evaluated in the order written, which is
        // exactly the wire order.
        Ok(Self {
            group_id: types::CompactString.decode(buf)?,
            member_id: types::CompactString.decode(buf)?,
            member_epoch: types::Int32.decode(buf)?,
            instance_id: types::CompactString.decode(buf)?,
            rack_id: types::CompactString.decode(buf)?,
            rebalance_timeout_ms: types::Int32.decode(buf)?,
            subscribed_topic_names: types::CompactArray(types::CompactString).decode(buf)?,
            server_assignor: types::CompactString.decode(buf)?,
            topic_partitions: types::CompactArray(types::Struct { version }).decode(buf)?,
            unknown_tagged_fields: {
                let mut extras = BTreeMap::new();
                let entry_count: u32 = types::UnsignedVarInt.decode(buf)?;
                for _ in 0..entry_count {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(payload_len as usize)?;
                    // The tag is reinterpreted as `i32` with a plain `as`
                    // cast; a repeated tag overwrites the earlier payload.
                    extras.insert(tag as i32, payload);
                }
                extras
            },
        })
    }
}
252
253impl Default for ConsumerGroupHeartbeatRequest {
254 fn default() -> Self {
255 Self {
256 group_id: Default::default(),
257 member_id: Default::default(),
258 member_epoch: 0,
259 instance_id: None,
260 rack_id: None,
261 rebalance_timeout_ms: -1,
262 subscribed_topic_names: None,
263 server_assignor: None,
264 topic_partitions: None,
265 unknown_tagged_fields: BTreeMap::new(),
266 }
267 }
268}
269
/// Version metadata for `ConsumerGroupHeartbeatRequest`.
impl Message for ConsumerGroupHeartbeatRequest {
    /// Range of valid message versions: only version 0.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
    /// No version of this message is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
274
/// A topic id together with a list of partition numbers; used as the element
/// type of `ConsumerGroupHeartbeatRequest::topic_partitions`.
///
/// Only version 0 is declared valid by this type's `Message` implementation.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct TopicPartitions {
    /// Topic id; serialized as a UUID. Defaults to the nil UUID.
    pub topic_id: Uuid,

    /// Partition numbers; serialized as a compact array of 32-bit integers.
    /// Defaults to an empty list.
    pub partitions: Vec<i32>,

    /// Tagged fields that have no dedicated struct member, keyed by tag.
    /// They are written after the regular fields when encoding and collected
    /// here when decoding.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
292
293impl TopicPartitions {
294 pub fn with_topic_id(mut self, value: Uuid) -> Self {
300 self.topic_id = value;
301 self
302 }
303 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
309 self.partitions = value;
310 self
311 }
312 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
314 self.unknown_tagged_fields = value;
315 self
316 }
317 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
319 self.unknown_tagged_fields.insert(key, value);
320 self
321 }
322}
323
/// Wire serialization of `TopicPartitions` (only compiled with the `client` feature).
#[cfg(feature = "client")]
impl Encodable for TopicPartitions {
    /// Writes the topic id, the partition list, the number of unknown tagged
    /// fields as an unsigned varint, and finally those fields.
    ///
    /// `version` does not influence the output.
    ///
    /// # Errors
    ///
    /// Propagates any field encoder error, and fails if there are more than
    /// `u32::MAX` unknown tagged fields.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Uuid.encode(buf, &self.topic_id)?;
        types::CompactArray(types::Int32).encode(buf, &self.partitions)?;

        // The tagged-field count is written as a u32 varint, so it must fit.
        let extras = &self.unknown_tagged_fields;
        let extras_count = extras.len();
        if extras_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                extras_count
            );
        }
        types::UnsignedVarInt.encode(buf, extras_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., extras)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::Uuid.compute_size(&self.topic_id)?
            + types::CompactArray(types::Int32).compute_size(&self.partitions)?;

        let extras_count = self.unknown_tagged_fields.len();
        if extras_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                extras_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(extras_count as u32)?;
        let extras_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + count_size + extras_size)
    }
}
358
/// Wire deserialization of `TopicPartitions` (only compiled with the `broker` feature).
#[cfg(feature = "broker")]
impl Decodable for TopicPartitions {
    /// Reads the topic id and the partition list from `buf`, then the
    /// tagged-field section, every entry of which is kept as an unknown
    /// tagged field.
    ///
    /// `version` does not influence decoding.
    ///
    /// # Errors
    ///
    /// Propagates the first decoding failure, including a buffer that is too
    /// short for a declared tagged-field size.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Struct-literal fields are evaluated in the order written, which is
        // exactly the wire order.
        Ok(Self {
            topic_id: types::Uuid.decode(buf)?,
            partitions: types::CompactArray(types::Int32).decode(buf)?,
            unknown_tagged_fields: {
                let mut extras = BTreeMap::new();
                let entry_count: u32 = types::UnsignedVarInt.decode(buf)?;
                for _ in 0..entry_count {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(payload_len as usize)?;
                    // The tag is reinterpreted as `i32` with a plain `as`
                    // cast; a repeated tag overwrites the earlier payload.
                    extras.insert(tag as i32, payload);
                }
                extras
            },
        })
    }
}
379
380impl Default for TopicPartitions {
381 fn default() -> Self {
382 Self {
383 topic_id: Uuid::nil(),
384 partitions: Default::default(),
385 unknown_tagged_fields: BTreeMap::new(),
386 }
387 }
388}
389
/// Version metadata for `TopicPartitions`.
impl Message for TopicPartitions {
    /// Range of valid message versions: only version 0.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
    /// No version of this message is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
394
395impl HeaderVersion for ConsumerGroupHeartbeatRequest {
396 fn header_version(version: i16) -> i16 {
397 2
398 }
399}