kafka_protocol/messages/
consumer_group_heartbeat_request.rs

1//! ConsumerGroupHeartbeatRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ConsumerGroupHeartbeatRequest {
24    /// The group identifier.
25    ///
26    /// Supported API versions: 0
27    pub group_id: super::GroupId,
28
29    /// The member id generated by the coordinator. The member id must be kept during the entire lifetime of the member.
30    ///
31    /// Supported API versions: 0
32    pub member_id: StrBytes,
33
34    /// The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin.
35    ///
36    /// Supported API versions: 0
37    pub member_epoch: i32,
38
39    /// null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise.
40    ///
41    /// Supported API versions: 0
42    pub instance_id: Option<StrBytes>,
43
44    /// null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise.
45    ///
46    /// Supported API versions: 0
47    pub rack_id: Option<StrBytes>,
48
49    /// -1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise.
50    ///
51    /// Supported API versions: 0
52    pub rebalance_timeout_ms: i32,
53
54    /// null if it didn't change since the last heartbeat; the subscribed topic names otherwise.
55    ///
56    /// Supported API versions: 0
57    pub subscribed_topic_names: Option<Vec<super::TopicName>>,
58
59    /// null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise.
60    ///
61    /// Supported API versions: 0
62    pub server_assignor: Option<StrBytes>,
63
64    /// null if it didn't change since the last heartbeat; the partitions owned by the member.
65    ///
66    /// Supported API versions: 0
67    pub topic_partitions: Option<Vec<TopicPartitions>>,
68
69    /// Other tagged fields
70    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
71}
72
73impl ConsumerGroupHeartbeatRequest {
74    /// Sets `group_id` to the passed value.
75    ///
76    /// The group identifier.
77    ///
78    /// Supported API versions: 0
79    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
80        self.group_id = value;
81        self
82    }
83    /// Sets `member_id` to the passed value.
84    ///
85    /// The member id generated by the coordinator. The member id must be kept during the entire lifetime of the member.
86    ///
87    /// Supported API versions: 0
88    pub fn with_member_id(mut self, value: StrBytes) -> Self {
89        self.member_id = value;
90        self
91    }
92    /// Sets `member_epoch` to the passed value.
93    ///
94    /// The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin.
95    ///
96    /// Supported API versions: 0
97    pub fn with_member_epoch(mut self, value: i32) -> Self {
98        self.member_epoch = value;
99        self
100    }
101    /// Sets `instance_id` to the passed value.
102    ///
103    /// null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise.
104    ///
105    /// Supported API versions: 0
106    pub fn with_instance_id(mut self, value: Option<StrBytes>) -> Self {
107        self.instance_id = value;
108        self
109    }
110    /// Sets `rack_id` to the passed value.
111    ///
112    /// null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise.
113    ///
114    /// Supported API versions: 0
115    pub fn with_rack_id(mut self, value: Option<StrBytes>) -> Self {
116        self.rack_id = value;
117        self
118    }
119    /// Sets `rebalance_timeout_ms` to the passed value.
120    ///
121    /// -1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise.
122    ///
123    /// Supported API versions: 0
124    pub fn with_rebalance_timeout_ms(mut self, value: i32) -> Self {
125        self.rebalance_timeout_ms = value;
126        self
127    }
128    /// Sets `subscribed_topic_names` to the passed value.
129    ///
130    /// null if it didn't change since the last heartbeat; the subscribed topic names otherwise.
131    ///
132    /// Supported API versions: 0
133    pub fn with_subscribed_topic_names(mut self, value: Option<Vec<super::TopicName>>) -> Self {
134        self.subscribed_topic_names = value;
135        self
136    }
137    /// Sets `server_assignor` to the passed value.
138    ///
139    /// null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise.
140    ///
141    /// Supported API versions: 0
142    pub fn with_server_assignor(mut self, value: Option<StrBytes>) -> Self {
143        self.server_assignor = value;
144        self
145    }
146    /// Sets `topic_partitions` to the passed value.
147    ///
148    /// null if it didn't change since the last heartbeat; the partitions owned by the member.
149    ///
150    /// Supported API versions: 0
151    pub fn with_topic_partitions(mut self, value: Option<Vec<TopicPartitions>>) -> Self {
152        self.topic_partitions = value;
153        self
154    }
155    /// Sets unknown_tagged_fields to the passed value.
156    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
157        self.unknown_tagged_fields = value;
158        self
159    }
160    /// Inserts an entry into unknown_tagged_fields.
161    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
162        self.unknown_tagged_fields.insert(key, value);
163        self
164    }
165}
166
167#[cfg(feature = "client")]
168impl Encodable for ConsumerGroupHeartbeatRequest {
169    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
170        types::CompactString.encode(buf, &self.group_id)?;
171        types::CompactString.encode(buf, &self.member_id)?;
172        types::Int32.encode(buf, &self.member_epoch)?;
173        types::CompactString.encode(buf, &self.instance_id)?;
174        types::CompactString.encode(buf, &self.rack_id)?;
175        types::Int32.encode(buf, &self.rebalance_timeout_ms)?;
176        types::CompactArray(types::CompactString).encode(buf, &self.subscribed_topic_names)?;
177        types::CompactString.encode(buf, &self.server_assignor)?;
178        types::CompactArray(types::Struct { version }).encode(buf, &self.topic_partitions)?;
179        let num_tagged_fields = self.unknown_tagged_fields.len();
180        if num_tagged_fields > std::u32::MAX as usize {
181            bail!(
182                "Too many tagged fields to encode ({} fields)",
183                num_tagged_fields
184            );
185        }
186        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
187
188        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
189        Ok(())
190    }
191    fn compute_size(&self, version: i16) -> Result<usize> {
192        let mut total_size = 0;
193        total_size += types::CompactString.compute_size(&self.group_id)?;
194        total_size += types::CompactString.compute_size(&self.member_id)?;
195        total_size += types::Int32.compute_size(&self.member_epoch)?;
196        total_size += types::CompactString.compute_size(&self.instance_id)?;
197        total_size += types::CompactString.compute_size(&self.rack_id)?;
198        total_size += types::Int32.compute_size(&self.rebalance_timeout_ms)?;
199        total_size +=
200            types::CompactArray(types::CompactString).compute_size(&self.subscribed_topic_names)?;
201        total_size += types::CompactString.compute_size(&self.server_assignor)?;
202        total_size +=
203            types::CompactArray(types::Struct { version }).compute_size(&self.topic_partitions)?;
204        let num_tagged_fields = self.unknown_tagged_fields.len();
205        if num_tagged_fields > std::u32::MAX as usize {
206            bail!(
207                "Too many tagged fields to encode ({} fields)",
208                num_tagged_fields
209            );
210        }
211        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
212
213        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
214        Ok(total_size)
215    }
216}
217
218#[cfg(feature = "broker")]
219impl Decodable for ConsumerGroupHeartbeatRequest {
220    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
221        let group_id = types::CompactString.decode(buf)?;
222        let member_id = types::CompactString.decode(buf)?;
223        let member_epoch = types::Int32.decode(buf)?;
224        let instance_id = types::CompactString.decode(buf)?;
225        let rack_id = types::CompactString.decode(buf)?;
226        let rebalance_timeout_ms = types::Int32.decode(buf)?;
227        let subscribed_topic_names = types::CompactArray(types::CompactString).decode(buf)?;
228        let server_assignor = types::CompactString.decode(buf)?;
229        let topic_partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
230        let mut unknown_tagged_fields = BTreeMap::new();
231        let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
232        for _ in 0..num_tagged_fields {
233            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
234            let size: u32 = types::UnsignedVarInt.decode(buf)?;
235            let unknown_value = buf.try_get_bytes(size as usize)?;
236            unknown_tagged_fields.insert(tag as i32, unknown_value);
237        }
238        Ok(Self {
239            group_id,
240            member_id,
241            member_epoch,
242            instance_id,
243            rack_id,
244            rebalance_timeout_ms,
245            subscribed_topic_names,
246            server_assignor,
247            topic_partitions,
248            unknown_tagged_fields,
249        })
250    }
251}
252
253impl Default for ConsumerGroupHeartbeatRequest {
254    fn default() -> Self {
255        Self {
256            group_id: Default::default(),
257            member_id: Default::default(),
258            member_epoch: 0,
259            instance_id: None,
260            rack_id: None,
261            rebalance_timeout_ms: -1,
262            subscribed_topic_names: None,
263            server_assignor: None,
264            topic_partitions: None,
265            unknown_tagged_fields: BTreeMap::new(),
266        }
267    }
268}
269
270impl Message for ConsumerGroupHeartbeatRequest {
271    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
272    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
273}
274
275/// Valid versions: 0
276#[non_exhaustive]
277#[derive(Debug, Clone, PartialEq)]
278pub struct TopicPartitions {
279    /// The topic ID.
280    ///
281    /// Supported API versions: 0
282    pub topic_id: Uuid,
283
284    /// The partitions.
285    ///
286    /// Supported API versions: 0
287    pub partitions: Vec<i32>,
288
289    /// Other tagged fields
290    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
291}
292
293impl TopicPartitions {
294    /// Sets `topic_id` to the passed value.
295    ///
296    /// The topic ID.
297    ///
298    /// Supported API versions: 0
299    pub fn with_topic_id(mut self, value: Uuid) -> Self {
300        self.topic_id = value;
301        self
302    }
303    /// Sets `partitions` to the passed value.
304    ///
305    /// The partitions.
306    ///
307    /// Supported API versions: 0
308    pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
309        self.partitions = value;
310        self
311    }
312    /// Sets unknown_tagged_fields to the passed value.
313    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
314        self.unknown_tagged_fields = value;
315        self
316    }
317    /// Inserts an entry into unknown_tagged_fields.
318    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
319        self.unknown_tagged_fields.insert(key, value);
320        self
321    }
322}
323
324#[cfg(feature = "client")]
325impl Encodable for TopicPartitions {
326    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
327        types::Uuid.encode(buf, &self.topic_id)?;
328        types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
329        let num_tagged_fields = self.unknown_tagged_fields.len();
330        if num_tagged_fields > std::u32::MAX as usize {
331            bail!(
332                "Too many tagged fields to encode ({} fields)",
333                num_tagged_fields
334            );
335        }
336        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
337
338        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
339        Ok(())
340    }
341    fn compute_size(&self, version: i16) -> Result<usize> {
342        let mut total_size = 0;
343        total_size += types::Uuid.compute_size(&self.topic_id)?;
344        total_size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
345        let num_tagged_fields = self.unknown_tagged_fields.len();
346        if num_tagged_fields > std::u32::MAX as usize {
347            bail!(
348                "Too many tagged fields to encode ({} fields)",
349                num_tagged_fields
350            );
351        }
352        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
353
354        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
355        Ok(total_size)
356    }
357}
358
359#[cfg(feature = "broker")]
360impl Decodable for TopicPartitions {
361    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
362        let topic_id = types::Uuid.decode(buf)?;
363        let partitions = types::CompactArray(types::Int32).decode(buf)?;
364        let mut unknown_tagged_fields = BTreeMap::new();
365        let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
366        for _ in 0..num_tagged_fields {
367            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
368            let size: u32 = types::UnsignedVarInt.decode(buf)?;
369            let unknown_value = buf.try_get_bytes(size as usize)?;
370            unknown_tagged_fields.insert(tag as i32, unknown_value);
371        }
372        Ok(Self {
373            topic_id,
374            partitions,
375            unknown_tagged_fields,
376        })
377    }
378}
379
380impl Default for TopicPartitions {
381    fn default() -> Self {
382        Self {
383            topic_id: Uuid::nil(),
384            partitions: Default::default(),
385            unknown_tagged_fields: BTreeMap::new(),
386        }
387    }
388}
389
390impl Message for TopicPartitions {
391    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
392    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
393}
394
395impl HeaderVersion for ConsumerGroupHeartbeatRequest {
396    fn header_version(version: i16) -> i16 {
397        2
398    }
399}