// kafka_protocol/messages/consumer_group_heartbeat_request.rs

//! ConsumerGroupHeartbeatRequest
//!
//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json).
// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ConsumerGroupHeartbeatRequest {
24    /// The group identifier.
25    ///
26    /// Supported API versions: 0
27    pub group_id: super::GroupId,
28
29    /// The member id generated by the coordinator. The member id must be kept during the entire lifetime of the member.
30    ///
31    /// Supported API versions: 0
32    pub member_id: StrBytes,
33
34    /// The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin.
35    ///
36    /// Supported API versions: 0
37    pub member_epoch: i32,
38
39    /// null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise.
40    ///
41    /// Supported API versions: 0
42    pub instance_id: Option<StrBytes>,
43
44    /// null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise.
45    ///
46    /// Supported API versions: 0
47    pub rack_id: Option<StrBytes>,
48
49    /// -1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise.
50    ///
51    /// Supported API versions: 0
52    pub rebalance_timeout_ms: i32,
53
54    /// null if it didn't change since the last heartbeat; the subscribed topic names otherwise.
55    ///
56    /// Supported API versions: 0
57    pub subscribed_topic_names: Option<Vec<super::TopicName>>,
58
59    /// null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise.
60    ///
61    /// Supported API versions: 0
62    pub server_assignor: Option<StrBytes>,
63
64    /// null if it didn't change since the last heartbeat; the partitions owned by the member.
65    ///
66    /// Supported API versions: 0
67    pub topic_partitions: Option<Vec<TopicPartitions>>,
68
69    /// Other tagged fields
70    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
71}
72
73impl ConsumerGroupHeartbeatRequest {
74    /// Sets `group_id` to the passed value.
75    ///
76    /// The group identifier.
77    ///
78    /// Supported API versions: 0
79    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
80        self.group_id = value;
81        self
82    }
83    /// Sets `member_id` to the passed value.
84    ///
85    /// The member id generated by the coordinator. The member id must be kept during the entire lifetime of the member.
86    ///
87    /// Supported API versions: 0
88    pub fn with_member_id(mut self, value: StrBytes) -> Self {
89        self.member_id = value;
90        self
91    }
92    /// Sets `member_epoch` to the passed value.
93    ///
94    /// The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin.
95    ///
96    /// Supported API versions: 0
97    pub fn with_member_epoch(mut self, value: i32) -> Self {
98        self.member_epoch = value;
99        self
100    }
101    /// Sets `instance_id` to the passed value.
102    ///
103    /// null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise.
104    ///
105    /// Supported API versions: 0
106    pub fn with_instance_id(mut self, value: Option<StrBytes>) -> Self {
107        self.instance_id = value;
108        self
109    }
110    /// Sets `rack_id` to the passed value.
111    ///
112    /// null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise.
113    ///
114    /// Supported API versions: 0
115    pub fn with_rack_id(mut self, value: Option<StrBytes>) -> Self {
116        self.rack_id = value;
117        self
118    }
119    /// Sets `rebalance_timeout_ms` to the passed value.
120    ///
121    /// -1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise.
122    ///
123    /// Supported API versions: 0
124    pub fn with_rebalance_timeout_ms(mut self, value: i32) -> Self {
125        self.rebalance_timeout_ms = value;
126        self
127    }
128    /// Sets `subscribed_topic_names` to the passed value.
129    ///
130    /// null if it didn't change since the last heartbeat; the subscribed topic names otherwise.
131    ///
132    /// Supported API versions: 0
133    pub fn with_subscribed_topic_names(mut self, value: Option<Vec<super::TopicName>>) -> Self {
134        self.subscribed_topic_names = value;
135        self
136    }
137    /// Sets `server_assignor` to the passed value.
138    ///
139    /// null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise.
140    ///
141    /// Supported API versions: 0
142    pub fn with_server_assignor(mut self, value: Option<StrBytes>) -> Self {
143        self.server_assignor = value;
144        self
145    }
146    /// Sets `topic_partitions` to the passed value.
147    ///
148    /// null if it didn't change since the last heartbeat; the partitions owned by the member.
149    ///
150    /// Supported API versions: 0
151    pub fn with_topic_partitions(mut self, value: Option<Vec<TopicPartitions>>) -> Self {
152        self.topic_partitions = value;
153        self
154    }
155    /// Sets unknown_tagged_fields to the passed value.
156    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
157        self.unknown_tagged_fields = value;
158        self
159    }
160    /// Inserts an entry into unknown_tagged_fields.
161    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
162        self.unknown_tagged_fields.insert(key, value);
163        self
164    }
165}
166
#[cfg(feature = "client")]
impl Encodable for ConsumerGroupHeartbeatRequest {
    /// Serializes this request into `buf`.
    ///
    /// The named fields are written first, in declaration order, followed by
    /// the count of unknown tagged fields and then the tagged fields themselves.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0, if any field encoder fails, or if the number
    /// of unknown tagged fields exceeds `u32::MAX`. The count is only checked
    /// after the named fields have been written, so `buf` may already contain
    /// partial output when that error is returned.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }

        // Named fields, in wire order.
        types::CompactString.encode(buf, &self.group_id)?;
        types::CompactString.encode(buf, &self.member_id)?;
        types::Int32.encode(buf, &self.member_epoch)?;
        types::CompactString.encode(buf, &self.instance_id)?;
        types::CompactString.encode(buf, &self.rack_id)?;
        types::Int32.encode(buf, &self.rebalance_timeout_ms)?;
        types::CompactArray(types::CompactString).encode(buf, &self.subscribed_topic_names)?;
        types::CompactString.encode(buf, &self.server_assignor)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topic_partitions)?;

        // Tagged-field section: the count as an unsigned varint, then the entries.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count <= u32::MAX as usize {
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        } else {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for this request.
    ///
    /// Unlike `encode`, this does not reject a non-zero `version`; the value is
    /// only used to build the `types::Struct` codec for `topic_partitions`.
    ///
    /// # Errors
    ///
    /// Fails if any per-field size computation fails or if the number of unknown
    /// tagged fields exceeds `u32::MAX`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Sizes of the named fields, summed in wire order.
        let named_fields_size = types::CompactString.compute_size(&self.group_id)?
            + types::CompactString.compute_size(&self.member_id)?
            + types::Int32.compute_size(&self.member_epoch)?
            + types::CompactString.compute_size(&self.instance_id)?
            + types::CompactString.compute_size(&self.rack_id)?
            + types::Int32.compute_size(&self.rebalance_timeout_ms)?
            + types::CompactArray(types::CompactString)
                .compute_size(&self.subscribed_topic_names)?
            + types::CompactString.compute_size(&self.server_assignor)?
            + types::CompactArray(types::Struct { version })
                .compute_size(&self.topic_partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_payload_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(named_fields_size + count_size + tagged_payload_size)
    }
}
220
#[cfg(feature = "broker")]
impl Decodable for ConsumerGroupHeartbeatRequest {
    /// Deserializes a request from `buf`.
    ///
    /// Fields are read in the same order in which `encode` writes them: the
    /// named fields first, then the tagged-field count, then one
    /// `(tag, size, payload)` triple per tagged field. Every tagged field is
    /// stored in `unknown_tagged_fields`; a repeated tag overwrites the earlier
    /// entry.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0 or if any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }

        // Struct-literal fields are evaluated in the order written, which is the
        // wire order here.
        Ok(Self {
            group_id: types::CompactString.decode(buf)?,
            member_id: types::CompactString.decode(buf)?,
            member_epoch: types::Int32.decode(buf)?,
            instance_id: types::CompactString.decode(buf)?,
            rack_id: types::CompactString.decode(buf)?,
            rebalance_timeout_ms: types::Int32.decode(buf)?,
            subscribed_topic_names: types::CompactArray(types::CompactString).decode(buf)?,
            server_assignor: types::CompactString.decode(buf)?,
            topic_partitions: types::CompactArray(types::Struct { version }).decode(buf)?,
            unknown_tagged_fields: {
                let tagged_count = types::UnsignedVarInt.decode(buf)?;
                let mut collected = BTreeMap::new();
                for _ in 0..tagged_count {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(payload_len as usize)?;
                    // The map is keyed by i32; tags above i32::MAX wrap on this cast.
                    collected.insert(tag as i32, payload);
                }
                collected
            },
        })
    }
}
258
259impl Default for ConsumerGroupHeartbeatRequest {
260    fn default() -> Self {
261        Self {
262            group_id: Default::default(),
263            member_id: Default::default(),
264            member_epoch: 0,
265            instance_id: None,
266            rack_id: None,
267            rebalance_timeout_ms: -1,
268            subscribed_topic_names: None,
269            server_assignor: None,
270            topic_partitions: None,
271            unknown_tagged_fields: BTreeMap::new(),
272        }
273    }
274}
275
276impl Message for ConsumerGroupHeartbeatRequest {
277    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
278    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
279}
280
281/// Valid versions: 0
282#[non_exhaustive]
283#[derive(Debug, Clone, PartialEq)]
284pub struct TopicPartitions {
285    /// The topic ID.
286    ///
287    /// Supported API versions: 0
288    pub topic_id: Uuid,
289
290    /// The partitions.
291    ///
292    /// Supported API versions: 0
293    pub partitions: Vec<i32>,
294
295    /// Other tagged fields
296    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
297}
298
299impl TopicPartitions {
300    /// Sets `topic_id` to the passed value.
301    ///
302    /// The topic ID.
303    ///
304    /// Supported API versions: 0
305    pub fn with_topic_id(mut self, value: Uuid) -> Self {
306        self.topic_id = value;
307        self
308    }
309    /// Sets `partitions` to the passed value.
310    ///
311    /// The partitions.
312    ///
313    /// Supported API versions: 0
314    pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
315        self.partitions = value;
316        self
317    }
318    /// Sets unknown_tagged_fields to the passed value.
319    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
320        self.unknown_tagged_fields = value;
321        self
322    }
323    /// Inserts an entry into unknown_tagged_fields.
324    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
325        self.unknown_tagged_fields.insert(key, value);
326        self
327    }
328}
329
#[cfg(feature = "client")]
impl Encodable for TopicPartitions {
    /// Serializes this entry into `buf`: the topic id, the partition list, the
    /// count of unknown tagged fields, and then the tagged fields themselves.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0, if any field encoder fails, or if the number
    /// of unknown tagged fields exceeds `u32::MAX`. The count is only checked
    /// after the named fields have been written, so `buf` may already contain
    /// partial output when that error is returned.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }

        // Named fields, in wire order.
        types::Uuid.encode(buf, &self.topic_id)?;
        types::CompactArray(types::Int32).encode(buf, &self.partitions)?;

        // Tagged-field section: the count as an unsigned varint, then the entries.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count <= u32::MAX as usize {
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        } else {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for this entry.
    ///
    /// `version` is not consulted here, so unlike `encode` this does not reject
    /// a non-zero version.
    ///
    /// # Errors
    ///
    /// Fails if any per-field size computation fails or if the number of unknown
    /// tagged fields exceeds `u32::MAX`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Sizes of the named fields, summed in wire order.
        let named_fields_size = types::Uuid.compute_size(&self.topic_id)?
            + types::CompactArray(types::Int32).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_payload_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(named_fields_size + count_size + tagged_payload_size)
    }
}
367
#[cfg(feature = "broker")]
impl Decodable for TopicPartitions {
    /// Deserializes an entry from `buf`.
    ///
    /// Reads the topic id, the partition list, the tagged-field count, and then
    /// one `(tag, size, payload)` triple per tagged field. Every tagged field is
    /// stored in `unknown_tagged_fields`; a repeated tag overwrites the earlier
    /// entry.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0 or if any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }

        // Struct-literal fields are evaluated in the order written, which is the
        // wire order here.
        Ok(Self {
            topic_id: types::Uuid.decode(buf)?,
            partitions: types::CompactArray(types::Int32).decode(buf)?,
            unknown_tagged_fields: {
                let tagged_count = types::UnsignedVarInt.decode(buf)?;
                let mut collected = BTreeMap::new();
                for _ in 0..tagged_count {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(payload_len as usize)?;
                    // The map is keyed by i32; tags above i32::MAX wrap on this cast.
                    collected.insert(tag as i32, payload);
                }
                collected
            },
        })
    }
}
391
392impl Default for TopicPartitions {
393    fn default() -> Self {
394        Self {
395            topic_id: Uuid::nil(),
396            partitions: Default::default(),
397            unknown_tagged_fields: BTreeMap::new(),
398        }
399    }
400}
401
402impl Message for TopicPartitions {
403    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
404    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
405}
406
407impl HeaderVersion for ConsumerGroupHeartbeatRequest {
408    fn header_version(version: i16) -> i16 {
409        2
410    }
411}