kafka_api/schemata/join_group_request.rs
1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::ReadBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Version 1 adds RebalanceTimeoutMs.
21//
22// Version 2 and 3 are the same as version 1.
23//
// Starting from version 4, the client needs to issue a second request to join group with the
// assigned member id.
//
// Starting from version 5, we add a new field called groupInstanceId to indicate member identity
// across restarts.
28//
29// Version 6 is the first flexible version.
30//
31// Version 7 is the same as version 6.
32//
33// Version 8 adds the Reason field (KIP-800).
34//
35// Version 9 is the same as version 8.
36
37#[derive(Debug, Default, Clone)]
38pub struct JoinGroupRequest {
39 /// The group identifier.
40 pub group_id: String,
41 /// The coordinator considers the consumer dead if it receives no heartbeat after this timeout
42 /// in milliseconds.
43 pub session_timeout_ms: i32,
44 /// The maximum time in milliseconds that the coordinator will wait for each member to rejoin
45 /// when rebalancing the group.
46 pub rebalance_timeout_ms: i32,
47 /// The member id assigned by the group coordinator.
48 pub member_id: String,
49 /// The unique identifier of the consumer instance provided by end user.
50 pub group_instance_id: Option<String>,
51 /// The unique name the for class of protocols implemented by the group we want to join.
52 pub protocol_type: String,
53 /// The list of protocols that the member supports.
54 pub protocols: Vec<JoinGroupRequestProtocol>,
55 /// The reason why the member (re-)joins the group.
56 pub reason: Option<String>,
57 /// Unknown tagged fields.
58 pub unknown_tagged_fields: Vec<RawTaggedField>,
59}
60
61impl Decodable for JoinGroupRequest {
62 fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
63 let mut this = JoinGroupRequest {
64 group_id: NullableString(version >= 6)
65 .decode(buf)?
66 .ok_or_else(|| err_decode_message_null("group_id"))?,
67 session_timeout_ms: Int32.decode(buf)?,
68 ..Default::default()
69 };
70 this.rebalance_timeout_ms = if version >= 1 { Int32.decode(buf)? } else { -1 };
71 this.member_id = NullableString(version >= 6)
72 .decode(buf)?
73 .ok_or_else(|| err_decode_message_null("member_id"))?;
74 if version >= 5 {
75 this.group_instance_id = NullableString(version >= 6).decode(buf)?;
76 }
77 this.protocol_type = NullableString(version >= 6)
78 .decode(buf)?
79 .ok_or_else(|| err_decode_message_null("protocol_type"))?;
80 this.protocols = NullableArray(Struct(version), version >= 6)
81 .decode(buf)?
82 .ok_or_else(|| err_decode_message_null("protocols"))?;
83 if version >= 8 {
84 this.reason = NullableString(true).decode(buf)?;
85 }
86 if version >= 6 {
87 this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
88 }
89 Ok(this)
90 }
91}
92
93#[derive(Debug, Default, Clone)]
94pub struct JoinGroupRequestProtocol {
95 /// The protocol name.
96 pub name: String,
97 /// The protocol metadata.
98 pub metadata: Vec<u8>,
99 /// Unknown tagged fields.
100 pub unknown_tagged_fields: Vec<RawTaggedField>,
101}
102
103impl Decodable for JoinGroupRequestProtocol {
104 fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
105 if version > 9 {
106 Err(err_decode_message_unsupported(
107 version,
108 "JoinGroupRequestProtocol",
109 ))?
110 }
111 let mut this = JoinGroupRequestProtocol {
112 name: NullableString(version >= 6)
113 .decode(buf)?
114 .ok_or_else(|| err_decode_message_null("name"))?,
115 metadata: NullableBytes(version >= 6)
116 .decode(buf)?
117 .ok_or_else(|| err_decode_message_null("metadata"))?,
118 ..Default::default()
119 };
120 if version >= 6 {
121 this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
122 }
123 Ok(this)
124 }
125}