kafka_protocol/messages/
join_group_request.rs

1//! JoinGroupRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/JoinGroupRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-9
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct JoinGroupRequest {
24    /// The group identifier.
25    ///
26    /// Supported API versions: 0-9
27    pub group_id: super::GroupId,
28
29    /// The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds.
30    ///
31    /// Supported API versions: 0-9
32    pub session_timeout_ms: i32,
33
34    /// The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group.
35    ///
36    /// Supported API versions: 1-9
37    pub rebalance_timeout_ms: i32,
38
39    /// The member id assigned by the group coordinator.
40    ///
41    /// Supported API versions: 0-9
42    pub member_id: StrBytes,
43
44    /// The unique identifier of the consumer instance provided by end user.
45    ///
46    /// Supported API versions: 5-9
47    pub group_instance_id: Option<StrBytes>,
48
49    /// The unique name the for class of protocols implemented by the group we want to join.
50    ///
51    /// Supported API versions: 0-9
52    pub protocol_type: StrBytes,
53
54    /// The list of protocols that the member supports.
55    ///
56    /// Supported API versions: 0-9
57    pub protocols: Vec<JoinGroupRequestProtocol>,
58
59    /// The reason why the member (re-)joins the group.
60    ///
61    /// Supported API versions: 8-9
62    pub reason: Option<StrBytes>,
63
64    /// Other tagged fields
65    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
66}
67
68impl JoinGroupRequest {
69    /// Sets `group_id` to the passed value.
70    ///
71    /// The group identifier.
72    ///
73    /// Supported API versions: 0-9
74    pub fn with_group_id(mut self, value: super::GroupId) -> Self {
75        self.group_id = value;
76        self
77    }
78    /// Sets `session_timeout_ms` to the passed value.
79    ///
80    /// The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds.
81    ///
82    /// Supported API versions: 0-9
83    pub fn with_session_timeout_ms(mut self, value: i32) -> Self {
84        self.session_timeout_ms = value;
85        self
86    }
87    /// Sets `rebalance_timeout_ms` to the passed value.
88    ///
89    /// The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group.
90    ///
91    /// Supported API versions: 1-9
92    pub fn with_rebalance_timeout_ms(mut self, value: i32) -> Self {
93        self.rebalance_timeout_ms = value;
94        self
95    }
96    /// Sets `member_id` to the passed value.
97    ///
98    /// The member id assigned by the group coordinator.
99    ///
100    /// Supported API versions: 0-9
101    pub fn with_member_id(mut self, value: StrBytes) -> Self {
102        self.member_id = value;
103        self
104    }
105    /// Sets `group_instance_id` to the passed value.
106    ///
107    /// The unique identifier of the consumer instance provided by end user.
108    ///
109    /// Supported API versions: 5-9
110    pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
111        self.group_instance_id = value;
112        self
113    }
114    /// Sets `protocol_type` to the passed value.
115    ///
116    /// The unique name the for class of protocols implemented by the group we want to join.
117    ///
118    /// Supported API versions: 0-9
119    pub fn with_protocol_type(mut self, value: StrBytes) -> Self {
120        self.protocol_type = value;
121        self
122    }
123    /// Sets `protocols` to the passed value.
124    ///
125    /// The list of protocols that the member supports.
126    ///
127    /// Supported API versions: 0-9
128    pub fn with_protocols(mut self, value: Vec<JoinGroupRequestProtocol>) -> Self {
129        self.protocols = value;
130        self
131    }
132    /// Sets `reason` to the passed value.
133    ///
134    /// The reason why the member (re-)joins the group.
135    ///
136    /// Supported API versions: 8-9
137    pub fn with_reason(mut self, value: Option<StrBytes>) -> Self {
138        self.reason = value;
139        self
140    }
141    /// Sets unknown_tagged_fields to the passed value.
142    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
143        self.unknown_tagged_fields = value;
144        self
145    }
146    /// Inserts an entry into unknown_tagged_fields.
147    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
148        self.unknown_tagged_fields.insert(key, value);
149        self
150    }
151}
152
#[cfg(feature = "client")]
impl Encodable for JoinGroupRequest {
    /// Serializes this request into `buf` using the layout of API `version`.
    ///
    /// Versions 6 and above use the compact string/array encodings and append
    /// the unknown tagged fields; earlier versions use the non-compact
    /// encodings and write no tagged fields. `rebalance_timeout_ms` is written
    /// from version 1, `group_instance_id` from version 5 and `reason` from
    /// version 8.
    ///
    /// # Errors
    ///
    /// Returns an error if `version` is outside `0..=9`, if
    /// `group_instance_id` is set while `version` is below 5, if there are
    /// more unknown tagged fields than a `u32` can count, or if any field
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 6+ switch to the compact encodings and carry tagged fields.
        let compact = version >= 6;

        if compact {
            types::CompactString.encode(buf, &self.group_id)?;
        } else {
            types::String.encode(buf, &self.group_id)?;
        }
        types::Int32.encode(buf, &self.session_timeout_ms)?;
        if version >= 1 {
            // Version 0 has no slot for this field; it is skipped there without error.
            types::Int32.encode(buf, &self.rebalance_timeout_ms)?;
        }
        if compact {
            types::CompactString.encode(buf, &self.member_id)?;
        } else {
            types::String.encode(buf, &self.member_id)?;
        }
        if version >= 5 {
            if compact {
                types::CompactString.encode(buf, &self.group_instance_id)?;
            } else {
                types::String.encode(buf, &self.group_instance_id)?;
            }
        } else if self.group_instance_id.is_some() {
            // Unlike `reason`, a set value here is rejected rather than dropped.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if compact {
            types::CompactString.encode(buf, &self.protocol_type)?;
        } else {
            types::String.encode(buf, &self.protocol_type)?;
        }
        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.protocols)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.protocols)?;
        }
        if version >= 8 {
            // Below version 8 `reason` is omitted without raising an error.
            types::CompactString.encode(buf, &self.reason)?;
        }
        if compact {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            // The count is written as an unsigned 32-bit varint, so it must fit in a u32.
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the encoded size of this request for API `version`, following
    /// the same per-version field selection as [`Encodable::encode`].
    ///
    /// # Errors
    ///
    /// Returns an error under the same conditions as `encode`: `version`
    /// outside `0..=9`, `group_instance_id` set while `version` is below 5,
    /// more unknown tagged fields than a `u32` can count, or a failing field
    /// size computation.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Reject versions `encode` would reject, so a size is never reported
        // for a message that cannot actually be written.
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let compact = version >= 6;

        let mut total_size = 0;
        if compact {
            total_size += types::CompactString.compute_size(&self.group_id)?;
        } else {
            total_size += types::String.compute_size(&self.group_id)?;
        }
        total_size += types::Int32.compute_size(&self.session_timeout_ms)?;
        if version >= 1 {
            total_size += types::Int32.compute_size(&self.rebalance_timeout_ms)?;
        }
        if compact {
            total_size += types::CompactString.compute_size(&self.member_id)?;
        } else {
            total_size += types::String.compute_size(&self.member_id)?;
        }
        if version >= 5 {
            if compact {
                total_size += types::CompactString.compute_size(&self.group_instance_id)?;
            } else {
                total_size += types::String.compute_size(&self.group_instance_id)?;
            }
        } else if self.group_instance_id.is_some() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if compact {
            total_size += types::CompactString.compute_size(&self.protocol_type)?;
        } else {
            total_size += types::String.compute_size(&self.protocol_type)?;
        }
        if compact {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.protocols)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.protocols)?;
        }
        if version >= 8 {
            total_size += types::CompactString.compute_size(&self.reason)?;
        }
        if compact {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
267
#[cfg(feature = "broker")]
impl Decodable for JoinGroupRequest {
    /// Reads a request from `buf` using the layout of API `version`.
    ///
    /// Fields that the given version does not carry take fixed values:
    /// `rebalance_timeout_ms` becomes `-1` below version 1, and
    /// `group_instance_id` / `reason` become `None` below versions 5 / 8.
    /// From version 6 on, every trailing tagged field is stored verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Returns an error if `version` is outside `0..=9` or if any read from
    /// `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 6+ use the compact encodings and carry tagged fields.
        let compact = version >= 6;

        // Field initializers run top to bottom, which is the wire order.
        let mut request = Self {
            group_id: if compact {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            },
            session_timeout_ms: types::Int32.decode(buf)?,
            rebalance_timeout_ms: if version >= 1 {
                types::Int32.decode(buf)?
            } else {
                -1
            },
            member_id: if compact {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            },
            group_instance_id: if version >= 5 {
                if compact {
                    types::CompactString.decode(buf)?
                } else {
                    types::String.decode(buf)?
                }
            } else {
                None
            },
            protocol_type: if compact {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            },
            protocols: if compact {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                types::Array(types::Struct { version }).decode(buf)?
            },
            reason: if version >= 8 {
                types::CompactString.decode(buf)?
            } else {
                None
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        if compact {
            let num_tagged_fields: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..num_tagged_fields {
                // Each entry is a tag, a payload length, then that many payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let unknown_value = buf.try_get_bytes(size as usize)?;
                request
                    .unknown_tagged_fields
                    .insert(tag as i32, unknown_value);
            }
        }
        Ok(request)
    }
}
337
338impl Default for JoinGroupRequest {
339    fn default() -> Self {
340        Self {
341            group_id: Default::default(),
342            session_timeout_ms: 0,
343            rebalance_timeout_ms: -1,
344            member_id: Default::default(),
345            group_instance_id: None,
346            protocol_type: Default::default(),
347            protocols: Default::default(),
348            reason: None,
349            unknown_tagged_fields: BTreeMap::new(),
350        }
351    }
352}
353
354impl Message for JoinGroupRequest {
355    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
356    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
357}
358
359/// Valid versions: 0-9
360#[non_exhaustive]
361#[derive(Debug, Clone, PartialEq)]
362pub struct JoinGroupRequestProtocol {
363    /// The protocol name.
364    ///
365    /// Supported API versions: 0-9
366    pub name: StrBytes,
367
368    /// The protocol metadata.
369    ///
370    /// Supported API versions: 0-9
371    pub metadata: Bytes,
372
373    /// Other tagged fields
374    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
375}
376
377impl JoinGroupRequestProtocol {
378    /// Sets `name` to the passed value.
379    ///
380    /// The protocol name.
381    ///
382    /// Supported API versions: 0-9
383    pub fn with_name(mut self, value: StrBytes) -> Self {
384        self.name = value;
385        self
386    }
387    /// Sets `metadata` to the passed value.
388    ///
389    /// The protocol metadata.
390    ///
391    /// Supported API versions: 0-9
392    pub fn with_metadata(mut self, value: Bytes) -> Self {
393        self.metadata = value;
394        self
395    }
396    /// Sets unknown_tagged_fields to the passed value.
397    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
398        self.unknown_tagged_fields = value;
399        self
400    }
401    /// Inserts an entry into unknown_tagged_fields.
402    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
403        self.unknown_tagged_fields.insert(key, value);
404        self
405    }
406}
407
#[cfg(feature = "client")]
impl Encodable for JoinGroupRequestProtocol {
    /// Serializes this protocol entry into `buf` using the layout of API
    /// `version`.
    ///
    /// Versions 6 and above use the compact string/bytes encodings and append
    /// the unknown tagged fields; earlier versions use the non-compact
    /// encodings and write no tagged fields.
    ///
    /// # Errors
    ///
    /// Returns an error if `version` is outside `0..=9`, if there are more
    /// unknown tagged fields than a `u32` can count, or if any field encoder
    /// fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 6+ switch to the compact encodings and carry tagged fields.
        let compact = version >= 6;

        if compact {
            types::CompactString.encode(buf, &self.name)?;
        } else {
            types::String.encode(buf, &self.name)?;
        }
        if compact {
            types::CompactBytes.encode(buf, &self.metadata)?;
        } else {
            types::Bytes.encode(buf, &self.metadata)?;
        }
        if compact {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            // The count is written as an unsigned 32-bit varint, so it must fit in a u32.
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the encoded size of this protocol entry for API `version`,
    /// following the same per-version field selection as
    /// [`Encodable::encode`].
    ///
    /// # Errors
    ///
    /// Returns an error under the same conditions as `encode`: `version`
    /// outside `0..=9`, more unknown tagged fields than a `u32` can count, or
    /// a failing field size computation.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Reject versions `encode` would reject, so a size is never reported
        // for an entry that cannot actually be written.
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let compact = version >= 6;

        let mut total_size = 0;
        if compact {
            total_size += types::CompactString.compute_size(&self.name)?;
        } else {
            total_size += types::String.compute_size(&self.name)?;
        }
        if compact {
            total_size += types::CompactBytes.compute_size(&self.metadata)?;
        } else {
            total_size += types::Bytes.compute_size(&self.metadata)?;
        }
        if compact {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
465
#[cfg(feature = "broker")]
impl Decodable for JoinGroupRequestProtocol {
    /// Reads a protocol entry from `buf` using the layout of API `version`.
    ///
    /// From version 6 on, every trailing tagged field is stored verbatim in
    /// `unknown_tagged_fields`; earlier versions leave that map empty.
    ///
    /// # Errors
    ///
    /// Returns an error if `version` is outside `0..=9` or if any read from
    /// `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 6+ use the compact encodings and carry tagged fields.
        let compact = version >= 6;

        // Field initializers run top to bottom, which is the wire order.
        let mut protocol = Self {
            name: if compact {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            },
            metadata: if compact {
                types::CompactBytes.decode(buf)?
            } else {
                types::Bytes.decode(buf)?
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        if compact {
            let num_tagged_fields: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..num_tagged_fields {
                // Each entry is a tag, a payload length, then that many payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let unknown_value = buf.try_get_bytes(size as usize)?;
                protocol
                    .unknown_tagged_fields
                    .insert(tag as i32, unknown_value);
            }
        }
        Ok(protocol)
    }
}
499
500impl Default for JoinGroupRequestProtocol {
501    fn default() -> Self {
502        Self {
503            name: Default::default(),
504            metadata: Default::default(),
505            unknown_tagged_fields: BTreeMap::new(),
506        }
507    }
508}
509
510impl Message for JoinGroupRequestProtocol {
511    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
512    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
513}
514
515impl HeaderVersion for JoinGroupRequest {
516    fn header_version(version: i16) -> i16 {
517        if version >= 6 {
518            2
519        } else {
520            1
521        }
522    }
523}