kafka_protocol/messages/
consumer_group_heartbeat_request.rs

1//! ConsumerGroupHeartbeatRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-1
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ConsumerGroupHeartbeatRequest {
24    /// The group identifier.
25    ///
26    /// Supported API versions: 0-1
27    pub group_id: super::GroupId,
28
29    /// The member id generated by the consumer. The member id must be kept during the entire lifetime of the consumer process.
30    ///
31    /// Supported API versions: 0-1
32    pub member_id: StrBytes,
33
34    /// The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin.
35    ///
36    /// Supported API versions: 0-1
37    pub member_epoch: i32,
38
39    /// null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise.
40    ///
41    /// Supported API versions: 0-1
42    pub instance_id: Option<StrBytes>,
43
44    /// null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise.
45    ///
46    /// Supported API versions: 0-1
47    pub rack_id: Option<StrBytes>,
48
49    /// -1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise.
50    ///
51    /// Supported API versions: 0-1
52    pub rebalance_timeout_ms: i32,
53
54    /// null if it didn't change since the last heartbeat; the subscribed topic names otherwise.
55    ///
56    /// Supported API versions: 0-1
57    pub subscribed_topic_names: Option<Vec<super::TopicName>>,
58
59    /// null if it didn't change since the last heartbeat; the subscribed topic regex otherwise.
60    ///
61    /// Supported API versions: 1
62    pub subscribed_topic_regex: Option<StrBytes>,
63
64    /// null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise.
65    ///
66    /// Supported API versions: 0-1
67    pub server_assignor: Option<StrBytes>,
68
69    /// null if it didn't change since the last heartbeat; the partitions owned by the member.
70    ///
71    /// Supported API versions: 0-1
72    pub topic_partitions: Option<Vec<TopicPartitions>>,
73
74    /// Other tagged fields
75    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
76}
77
78impl ConsumerGroupHeartbeatRequest {
79    /// Sets `group_id` to the passed value.
80    ///
81    /// The group identifier.
82    ///
83    /// Supported API versions: 0-1
84    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
85        self.group_id = value;
86        self
87    }
88    /// Sets `member_id` to the passed value.
89    ///
90    /// The member id generated by the consumer. The member id must be kept during the entire lifetime of the consumer process.
91    ///
92    /// Supported API versions: 0-1
93    pub fn with_member_id(mut self, value: StrBytes) -> Self {
94        self.member_id = value;
95        self
96    }
97    /// Sets `member_epoch` to the passed value.
98    ///
99    /// The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin.
100    ///
101    /// Supported API versions: 0-1
102    pub fn with_member_epoch(mut self, value: i32) -> Self {
103        self.member_epoch = value;
104        self
105    }
106    /// Sets `instance_id` to the passed value.
107    ///
108    /// null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise.
109    ///
110    /// Supported API versions: 0-1
111    pub fn with_instance_id(mut self, value: Option<StrBytes>) -> Self {
112        self.instance_id = value;
113        self
114    }
115    /// Sets `rack_id` to the passed value.
116    ///
117    /// null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise.
118    ///
119    /// Supported API versions: 0-1
120    pub fn with_rack_id(mut self, value: Option<StrBytes>) -> Self {
121        self.rack_id = value;
122        self
123    }
124    /// Sets `rebalance_timeout_ms` to the passed value.
125    ///
126    /// -1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise.
127    ///
128    /// Supported API versions: 0-1
129    pub fn with_rebalance_timeout_ms(mut self, value: i32) -> Self {
130        self.rebalance_timeout_ms = value;
131        self
132    }
133    /// Sets `subscribed_topic_names` to the passed value.
134    ///
135    /// null if it didn't change since the last heartbeat; the subscribed topic names otherwise.
136    ///
137    /// Supported API versions: 0-1
138    pub fn with_subscribed_topic_names(mut self, value: Option<Vec<super::TopicName>>) -> Self {
139        self.subscribed_topic_names = value;
140        self
141    }
142    /// Sets `subscribed_topic_regex` to the passed value.
143    ///
144    /// null if it didn't change since the last heartbeat; the subscribed topic regex otherwise.
145    ///
146    /// Supported API versions: 1
147    pub fn with_subscribed_topic_regex(mut self, value: Option<StrBytes>) -> Self {
148        self.subscribed_topic_regex = value;
149        self
150    }
151    /// Sets `server_assignor` to the passed value.
152    ///
153    /// null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise.
154    ///
155    /// Supported API versions: 0-1
156    pub fn with_server_assignor(mut self, value: Option<StrBytes>) -> Self {
157        self.server_assignor = value;
158        self
159    }
160    /// Sets `topic_partitions` to the passed value.
161    ///
162    /// null if it didn't change since the last heartbeat; the partitions owned by the member.
163    ///
164    /// Supported API versions: 0-1
165    pub fn with_topic_partitions(mut self, value: Option<Vec<TopicPartitions>>) -> Self {
166        self.topic_partitions = value;
167        self
168    }
169    /// Sets unknown_tagged_fields to the passed value.
170    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
171        self.unknown_tagged_fields = value;
172        self
173    }
174    /// Inserts an entry into unknown_tagged_fields.
175    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
176        self.unknown_tagged_fields.insert(key, value);
177        self
178    }
179}
180
181#[cfg(feature = "client")]
182impl Encodable for ConsumerGroupHeartbeatRequest {
183    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
184        if version < 0 || version > 1 {
185            bail!("specified version not supported by this message type");
186        }
187        types::CompactString.encode(buf, &self.group_id)?;
188        types::CompactString.encode(buf, &self.member_id)?;
189        types::Int32.encode(buf, &self.member_epoch)?;
190        types::CompactString.encode(buf, &self.instance_id)?;
191        types::CompactString.encode(buf, &self.rack_id)?;
192        types::Int32.encode(buf, &self.rebalance_timeout_ms)?;
193        types::CompactArray(types::CompactString).encode(buf, &self.subscribed_topic_names)?;
194        if version >= 1 {
195            types::CompactString.encode(buf, &self.subscribed_topic_regex)?;
196        } else {
197            if !self.subscribed_topic_regex.is_none() {
198                bail!("A field is set that is not available on the selected protocol version");
199            }
200        }
201        types::CompactString.encode(buf, &self.server_assignor)?;
202        types::CompactArray(types::Struct { version }).encode(buf, &self.topic_partitions)?;
203        let num_tagged_fields = self.unknown_tagged_fields.len();
204        if num_tagged_fields > std::u32::MAX as usize {
205            bail!(
206                "Too many tagged fields to encode ({} fields)",
207                num_tagged_fields
208            );
209        }
210        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
211
212        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
213        Ok(())
214    }
215    fn compute_size(&self, version: i16) -> Result<usize> {
216        let mut total_size = 0;
217        total_size += types::CompactString.compute_size(&self.group_id)?;
218        total_size += types::CompactString.compute_size(&self.member_id)?;
219        total_size += types::Int32.compute_size(&self.member_epoch)?;
220        total_size += types::CompactString.compute_size(&self.instance_id)?;
221        total_size += types::CompactString.compute_size(&self.rack_id)?;
222        total_size += types::Int32.compute_size(&self.rebalance_timeout_ms)?;
223        total_size +=
224            types::CompactArray(types::CompactString).compute_size(&self.subscribed_topic_names)?;
225        if version >= 1 {
226            total_size += types::CompactString.compute_size(&self.subscribed_topic_regex)?;
227        } else {
228            if !self.subscribed_topic_regex.is_none() {
229                bail!("A field is set that is not available on the selected protocol version");
230            }
231        }
232        total_size += types::CompactString.compute_size(&self.server_assignor)?;
233        total_size +=
234            types::CompactArray(types::Struct { version }).compute_size(&self.topic_partitions)?;
235        let num_tagged_fields = self.unknown_tagged_fields.len();
236        if num_tagged_fields > std::u32::MAX as usize {
237            bail!(
238                "Too many tagged fields to encode ({} fields)",
239                num_tagged_fields
240            );
241        }
242        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
243
244        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
245        Ok(total_size)
246    }
247}
248
249#[cfg(feature = "broker")]
250impl Decodable for ConsumerGroupHeartbeatRequest {
251    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
252        if version < 0 || version > 1 {
253            bail!("specified version not supported by this message type");
254        }
255        let group_id = types::CompactString.decode(buf)?;
256        let member_id = types::CompactString.decode(buf)?;
257        let member_epoch = types::Int32.decode(buf)?;
258        let instance_id = types::CompactString.decode(buf)?;
259        let rack_id = types::CompactString.decode(buf)?;
260        let rebalance_timeout_ms = types::Int32.decode(buf)?;
261        let subscribed_topic_names = types::CompactArray(types::CompactString).decode(buf)?;
262        let subscribed_topic_regex = if version >= 1 {
263            types::CompactString.decode(buf)?
264        } else {
265            None
266        };
267        let server_assignor = types::CompactString.decode(buf)?;
268        let topic_partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
269        let mut unknown_tagged_fields = BTreeMap::new();
270        let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
271        for _ in 0..num_tagged_fields {
272            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
273            let size: u32 = types::UnsignedVarInt.decode(buf)?;
274            let unknown_value = buf.try_get_bytes(size as usize)?;
275            unknown_tagged_fields.insert(tag as i32, unknown_value);
276        }
277        Ok(Self {
278            group_id,
279            member_id,
280            member_epoch,
281            instance_id,
282            rack_id,
283            rebalance_timeout_ms,
284            subscribed_topic_names,
285            subscribed_topic_regex,
286            server_assignor,
287            topic_partitions,
288            unknown_tagged_fields,
289        })
290    }
291}
292
293impl Default for ConsumerGroupHeartbeatRequest {
294    fn default() -> Self {
295        Self {
296            group_id: Default::default(),
297            member_id: Default::default(),
298            member_epoch: 0,
299            instance_id: None,
300            rack_id: None,
301            rebalance_timeout_ms: -1,
302            subscribed_topic_names: None,
303            subscribed_topic_regex: None,
304            server_assignor: None,
305            topic_partitions: None,
306            unknown_tagged_fields: BTreeMap::new(),
307        }
308    }
309}
310
311impl Message for ConsumerGroupHeartbeatRequest {
312    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
313    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
314}
315
316/// Valid versions: 0-1
317#[non_exhaustive]
318#[derive(Debug, Clone, PartialEq)]
319pub struct TopicPartitions {
320    /// The topic ID.
321    ///
322    /// Supported API versions: 0-1
323    pub topic_id: Uuid,
324
325    /// The partitions.
326    ///
327    /// Supported API versions: 0-1
328    pub partitions: Vec<i32>,
329
330    /// Other tagged fields
331    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
332}
333
334impl TopicPartitions {
335    /// Sets `topic_id` to the passed value.
336    ///
337    /// The topic ID.
338    ///
339    /// Supported API versions: 0-1
340    pub fn with_topic_id(mut self, value: Uuid) -> Self {
341        self.topic_id = value;
342        self
343    }
344    /// Sets `partitions` to the passed value.
345    ///
346    /// The partitions.
347    ///
348    /// Supported API versions: 0-1
349    pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
350        self.partitions = value;
351        self
352    }
353    /// Sets unknown_tagged_fields to the passed value.
354    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
355        self.unknown_tagged_fields = value;
356        self
357    }
358    /// Inserts an entry into unknown_tagged_fields.
359    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
360        self.unknown_tagged_fields.insert(key, value);
361        self
362    }
363}
364
365#[cfg(feature = "client")]
366impl Encodable for TopicPartitions {
367    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
368        if version < 0 || version > 1 {
369            bail!("specified version not supported by this message type");
370        }
371        types::Uuid.encode(buf, &self.topic_id)?;
372        types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
373        let num_tagged_fields = self.unknown_tagged_fields.len();
374        if num_tagged_fields > std::u32::MAX as usize {
375            bail!(
376                "Too many tagged fields to encode ({} fields)",
377                num_tagged_fields
378            );
379        }
380        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
381
382        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
383        Ok(())
384    }
385    fn compute_size(&self, version: i16) -> Result<usize> {
386        let mut total_size = 0;
387        total_size += types::Uuid.compute_size(&self.topic_id)?;
388        total_size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
389        let num_tagged_fields = self.unknown_tagged_fields.len();
390        if num_tagged_fields > std::u32::MAX as usize {
391            bail!(
392                "Too many tagged fields to encode ({} fields)",
393                num_tagged_fields
394            );
395        }
396        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
397
398        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
399        Ok(total_size)
400    }
401}
402
403#[cfg(feature = "broker")]
404impl Decodable for TopicPartitions {
405    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
406        if version < 0 || version > 1 {
407            bail!("specified version not supported by this message type");
408        }
409        let topic_id = types::Uuid.decode(buf)?;
410        let partitions = types::CompactArray(types::Int32).decode(buf)?;
411        let mut unknown_tagged_fields = BTreeMap::new();
412        let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
413        for _ in 0..num_tagged_fields {
414            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
415            let size: u32 = types::UnsignedVarInt.decode(buf)?;
416            let unknown_value = buf.try_get_bytes(size as usize)?;
417            unknown_tagged_fields.insert(tag as i32, unknown_value);
418        }
419        Ok(Self {
420            topic_id,
421            partitions,
422            unknown_tagged_fields,
423        })
424    }
425}
426
427impl Default for TopicPartitions {
428    fn default() -> Self {
429        Self {
430            topic_id: Uuid::nil(),
431            partitions: Default::default(),
432            unknown_tagged_fields: BTreeMap::new(),
433        }
434    }
435}
436
437impl Message for TopicPartitions {
438    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
439    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
440}
441
442impl HeaderVersion for ConsumerGroupHeartbeatRequest {
443    fn header_version(version: i16) -> i16 {
444        2
445    }
446}