kafka_protocol/messages/
join_group_request.rs

1//! JoinGroupRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/JoinGroupRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-9
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct JoinGroupRequest {
24    /// The group identifier.
25    ///
26    /// Supported API versions: 0-9
27    pub group_id: super::GroupId,
28
29    /// The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds.
30    ///
31    /// Supported API versions: 0-9
32    pub session_timeout_ms: i32,
33
34    /// The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group.
35    ///
36    /// Supported API versions: 1-9
37    pub rebalance_timeout_ms: i32,
38
39    /// The member id assigned by the group coordinator.
40    ///
41    /// Supported API versions: 0-9
42    pub member_id: StrBytes,
43
44    /// The unique identifier of the consumer instance provided by end user.
45    ///
46    /// Supported API versions: 5-9
47    pub group_instance_id: Option<StrBytes>,
48
49    /// The unique name the for class of protocols implemented by the group we want to join.
50    ///
51    /// Supported API versions: 0-9
52    pub protocol_type: StrBytes,
53
54    /// The list of protocols that the member supports.
55    ///
56    /// Supported API versions: 0-9
57    pub protocols: Vec<JoinGroupRequestProtocol>,
58
59    /// The reason why the member (re-)joins the group.
60    ///
61    /// Supported API versions: 8-9
62    pub reason: Option<StrBytes>,
63
64    /// Other tagged fields
65    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
66}
67
68impl JoinGroupRequest {
69    /// Sets `group_id` to the passed value.
70    ///
71    /// The group identifier.
72    ///
73    /// Supported API versions: 0-9
74    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
75        self.group_id = value;
76        self
77    }
78    /// Sets `session_timeout_ms` to the passed value.
79    ///
80    /// The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds.
81    ///
82    /// Supported API versions: 0-9
83    pub fn with_session_timeout_ms(mut self, value: i32) -> Self {
84        self.session_timeout_ms = value;
85        self
86    }
87    /// Sets `rebalance_timeout_ms` to the passed value.
88    ///
89    /// The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group.
90    ///
91    /// Supported API versions: 1-9
92    pub fn with_rebalance_timeout_ms(mut self, value: i32) -> Self {
93        self.rebalance_timeout_ms = value;
94        self
95    }
96    /// Sets `member_id` to the passed value.
97    ///
98    /// The member id assigned by the group coordinator.
99    ///
100    /// Supported API versions: 0-9
101    pub fn with_member_id(mut self, value: StrBytes) -> Self {
102        self.member_id = value;
103        self
104    }
105    /// Sets `group_instance_id` to the passed value.
106    ///
107    /// The unique identifier of the consumer instance provided by end user.
108    ///
109    /// Supported API versions: 5-9
110    pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
111        self.group_instance_id = value;
112        self
113    }
114    /// Sets `protocol_type` to the passed value.
115    ///
116    /// The unique name the for class of protocols implemented by the group we want to join.
117    ///
118    /// Supported API versions: 0-9
119    pub fn with_protocol_type(mut self, value: StrBytes) -> Self {
120        self.protocol_type = value;
121        self
122    }
123    /// Sets `protocols` to the passed value.
124    ///
125    /// The list of protocols that the member supports.
126    ///
127    /// Supported API versions: 0-9
128    pub fn with_protocols(mut self, value: Vec<JoinGroupRequestProtocol>) -> Self {
129        self.protocols = value;
130        self
131    }
132    /// Sets `reason` to the passed value.
133    ///
134    /// The reason why the member (re-)joins the group.
135    ///
136    /// Supported API versions: 8-9
137    pub fn with_reason(mut self, value: Option<StrBytes>) -> Self {
138        self.reason = value;
139        self
140    }
141    /// Sets unknown_tagged_fields to the passed value.
142    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
143        self.unknown_tagged_fields = value;
144        self
145    }
146    /// Inserts an entry into unknown_tagged_fields.
147    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
148        self.unknown_tagged_fields.insert(key, value);
149        self
150    }
151}
152
#[cfg(feature = "client")]
impl Encodable for JoinGroupRequest {
    /// Writes this request into `buf` using the layout of API `version`.
    ///
    /// Fields are written in declaration order. `rebalance_timeout_ms` is only written
    /// from version 1, `group_instance_id` from version 5 and `reason` from version 8.
    /// From version 6 the `Compact*` encoders are used and the unknown tagged fields are
    /// appended after the regular fields.
    ///
    /// # Errors
    ///
    /// Returns an error when `group_instance_id` is set but `version` is below 5, when
    /// the number of unknown tagged fields exceeds `u32::MAX`, or when any field encoder
    /// fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Version 6 is the switch-over point to the `Compact*` encoders and tagged fields.
        let compact = version >= 6;

        if compact {
            types::CompactString.encode(buf, &self.group_id)?;
        } else {
            types::String.encode(buf, &self.group_id)?;
        }
        types::Int32.encode(buf, &self.session_timeout_ms)?;
        if version >= 1 {
            types::Int32.encode(buf, &self.rebalance_timeout_ms)?;
        }
        if compact {
            types::CompactString.encode(buf, &self.member_id)?;
        } else {
            types::String.encode(buf, &self.member_id)?;
        }
        if version >= 5 {
            if compact {
                types::CompactString.encode(buf, &self.group_instance_id)?;
            } else {
                types::String.encode(buf, &self.group_instance_id)?;
            }
        } else if self.group_instance_id.is_some() {
            // Before version 5 there is no slot for this field, so a set value is rejected
            // rather than silently dropped.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if compact {
            types::CompactString.encode(buf, &self.protocol_type)?;
        } else {
            types::String.encode(buf, &self.protocol_type)?;
        }
        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.protocols)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.protocols)?;
        }
        // Unlike `group_instance_id`, a set `reason` is simply skipped below version 8.
        if version >= 8 {
            types::CompactString.encode(buf, &self.reason)?;
        }
        if compact {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            // The count is written as a `u32`; guard the narrowing cast below.
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for API `version`.
    ///
    /// The same per-version field selection as in `encode` is applied.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`: `group_instance_id` set below
    /// version 5, more than `u32::MAX` unknown tagged fields, or a failing size
    /// computation of an individual field.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 6;
        let mut total_size = 0;

        if compact {
            total_size += types::CompactString.compute_size(&self.group_id)?;
        } else {
            total_size += types::String.compute_size(&self.group_id)?;
        }
        total_size += types::Int32.compute_size(&self.session_timeout_ms)?;
        if version >= 1 {
            total_size += types::Int32.compute_size(&self.rebalance_timeout_ms)?;
        }
        if compact {
            total_size += types::CompactString.compute_size(&self.member_id)?;
        } else {
            total_size += types::String.compute_size(&self.member_id)?;
        }
        if version >= 5 {
            if compact {
                total_size += types::CompactString.compute_size(&self.group_instance_id)?;
            } else {
                total_size += types::String.compute_size(&self.group_instance_id)?;
            }
        } else if self.group_instance_id.is_some() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if compact {
            total_size += types::CompactString.compute_size(&self.protocol_type)?;
        } else {
            total_size += types::String.compute_size(&self.protocol_type)?;
        }
        if compact {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.protocols)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.protocols)?;
        }
        if version >= 8 {
            total_size += types::CompactString.compute_size(&self.reason)?;
        }
        if compact {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
264
#[cfg(feature = "broker")]
impl Decodable for JoinGroupRequest {
    /// Reads a request laid out for API `version` from `buf`.
    ///
    /// Fields that are not present in `version` receive fallback values:
    /// `rebalance_timeout_ms` becomes `-1` below version 1, `group_instance_id` becomes
    /// `None` below version 5 and `reason` becomes `None` below version 8. From version 6
    /// the `Compact*` decoders are used and every trailing tagged field is stored in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates the first failure reported by a field decoder or by `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Version 6 is the switch-over point to the `Compact*` decoders and tagged fields.
        let compact = version >= 6;

        // The reads below must stay in wire order; each one advances `buf`.
        let decoded_group_id = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let decoded_session_timeout = types::Int32.decode(buf)?;
        let decoded_rebalance_timeout = if version >= 1 {
            types::Int32.decode(buf)?
        } else {
            -1
        };
        let decoded_member_id = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let decoded_instance_id = if version < 5 {
            None
        } else if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let decoded_protocol_type = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let decoded_protocols = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let decoded_reason = if version >= 8 {
            types::CompactString.decode(buf)?
        } else {
            None
        };

        let mut tagged = BTreeMap::new();
        if compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is a tag, a byte length, then that many raw bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                tagged.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            group_id: decoded_group_id,
            session_timeout_ms: decoded_session_timeout,
            rebalance_timeout_ms: decoded_rebalance_timeout,
            member_id: decoded_member_id,
            group_instance_id: decoded_instance_id,
            protocol_type: decoded_protocol_type,
            protocols: decoded_protocols,
            reason: decoded_reason,
            unknown_tagged_fields: tagged,
        })
    }
}
331
332impl Default for JoinGroupRequest {
333    fn default() -> Self {
334        Self {
335            group_id: Default::default(),
336            session_timeout_ms: 0,
337            rebalance_timeout_ms: -1,
338            member_id: Default::default(),
339            group_instance_id: None,
340            protocol_type: Default::default(),
341            protocols: Default::default(),
342            reason: None,
343            unknown_tagged_fields: BTreeMap::new(),
344        }
345    }
346}
347
348impl Message for JoinGroupRequest {
349    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
350    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
351}
352
353/// Valid versions: 0-9
354#[non_exhaustive]
355#[derive(Debug, Clone, PartialEq)]
356pub struct JoinGroupRequestProtocol {
357    /// The protocol name.
358    ///
359    /// Supported API versions: 0-9
360    pub name: StrBytes,
361
362    /// The protocol metadata.
363    ///
364    /// Supported API versions: 0-9
365    pub metadata: Bytes,
366
367    /// Other tagged fields
368    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
369}
370
371impl JoinGroupRequestProtocol {
372    /// Sets `name` to the passed value.
373    ///
374    /// The protocol name.
375    ///
376    /// Supported API versions: 0-9
377    pub fn with_name(mut self, value: StrBytes) -> Self {
378        self.name = value;
379        self
380    }
381    /// Sets `metadata` to the passed value.
382    ///
383    /// The protocol metadata.
384    ///
385    /// Supported API versions: 0-9
386    pub fn with_metadata(mut self, value: Bytes) -> Self {
387        self.metadata = value;
388        self
389    }
390    /// Sets unknown_tagged_fields to the passed value.
391    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
392        self.unknown_tagged_fields = value;
393        self
394    }
395    /// Inserts an entry into unknown_tagged_fields.
396    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
397        self.unknown_tagged_fields.insert(key, value);
398        self
399    }
400}
401
#[cfg(feature = "client")]
impl Encodable for JoinGroupRequestProtocol {
    /// Writes this protocol entry into `buf` using the layout of API `version`.
    ///
    /// `name` is written first, then `metadata`. From version 6 the `Compact*` encoders
    /// are used and the unknown tagged fields are appended.
    ///
    /// # Errors
    ///
    /// Returns an error when the number of unknown tagged fields exceeds `u32::MAX` or
    /// when any field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Version 6 is the switch-over point to the `Compact*` encoders and tagged fields.
        let compact = version >= 6;

        if compact {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactBytes.encode(buf, &self.metadata)?;
        } else {
            types::String.encode(buf, &self.name)?;
            types::Bytes.encode(buf, &self.metadata)?;
        }

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            // The count is written as a `u32`; guard the narrowing cast below.
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for API `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 6;

        let mut byte_count = 0;
        if compact {
            byte_count += types::CompactString.compute_size(&self.name)?;
            byte_count += types::CompactBytes.compute_size(&self.metadata)?;
        } else {
            byte_count += types::String.compute_size(&self.name)?;
            byte_count += types::Bytes.compute_size(&self.metadata)?;
        }

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            byte_count += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

            byte_count += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(byte_count)
    }
}
456
#[cfg(feature = "broker")]
impl Decodable for JoinGroupRequestProtocol {
    /// Reads a protocol entry laid out for API `version` from `buf`.
    ///
    /// `name` is read first, then `metadata`. From version 6 the `Compact*` decoders are
    /// used and every trailing tagged field is stored in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates the first failure reported by a field decoder or by `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Version 6 is the switch-over point to the `Compact*` decoders and tagged fields.
        let compact = version >= 6;

        // The reads below must stay in wire order; each one advances `buf`.
        let decoded_name = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let decoded_metadata = if compact {
            types::CompactBytes.decode(buf)?
        } else {
            types::Bytes.decode(buf)?
        };

        let mut tagged = BTreeMap::new();
        if compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is a tag, a byte length, then that many raw bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                tagged.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name: decoded_name,
            metadata: decoded_metadata,
            unknown_tagged_fields: tagged,
        })
    }
}
487
488impl Default for JoinGroupRequestProtocol {
489    fn default() -> Self {
490        Self {
491            name: Default::default(),
492            metadata: Default::default(),
493            unknown_tagged_fields: BTreeMap::new(),
494        }
495    }
496}
497
498impl Message for JoinGroupRequestProtocol {
499    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
500    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
501}
502
503impl HeaderVersion for JoinGroupRequest {
504    fn header_version(version: i16) -> i16 {
505        if version >= 6 {
506            2
507        } else {
508            1
509        }
510    }
511}