kafka_api/schemata/
join_group_response.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::WriteBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Version 1 is the same as version 0.
21//
22// Version 2 adds throttle time.
23//
24// Starting in version 3, on quota violation, brokers send out responses before throttling.
25//
26// Starting in version 4, the client needs to issue a second request to join group
27// with assigned id.
28//
29// Version 5 is bumped to apply group.instance.id to identify member across restarts.
30//
31// Version 6 is the first flexible version.
32//
33// Starting from version 7, the broker sends back the Protocol Type to the client (KIP-559).
34//
35// Version 8 is the same as version 7.
36//
37// Version 9 adds the SkipAssignment field.
38
39#[derive(Debug, Default, Clone)]
40pub struct JoinGroupResponse {
41    /// The duration in milliseconds for which the request was throttled due to a quota violation,
42    /// or zero if the request did not violate any quota.
43    pub throttle_time_ms: i32,
44    /// The error code, or 0 if there was no error.
45    pub error_code: i16,
46    /// The generation ID of the group.
47    pub generation_id: i32,
48    /// The group protocol name.
49    pub protocol_type: Option<String>,
50    /// The group protocol selected by the coordinator.
51    pub protocol_name: Option<String>,
52    /// The leader of the group.
53    pub leader: String,
54    /// True if the leader must skip running the assignment.
55    pub skip_assignment: bool,
56    /// The member id assigned by the group coordinator.
57    pub member_id: String,
58    pub members: Vec<JoinGroupResponseMember>,
59    /// Unknown tagged fields.
60    pub unknown_tagged_fields: Vec<RawTaggedField>,
61}
62
63impl Encodable for JoinGroupResponse {
64    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
65        if version >= 2 {
66            Int32.encode(buf, self.throttle_time_ms)?;
67        }
68        Int16.encode(buf, self.error_code)?;
69        Int32.encode(buf, self.generation_id)?;
70        if version >= 7 {
71            NullableString(true).encode(buf, self.protocol_type.as_deref())?;
72        }
73        if version < 7 && self.protocol_name.is_none() {
74            Err(err_encode_message_null("protocol_name"))?
75        }
76        NullableString(version >= 6).encode(buf, self.protocol_name.as_deref())?;
77        NullableString(version >= 6).encode(buf, self.leader.as_str())?;
78        if version >= 9 {
79            Bool.encode(buf, self.skip_assignment)?;
80        }
81        NullableString(version >= 6).encode(buf, self.member_id.as_str())?;
82        NullableArray(Struct(version), version >= 6).encode(buf, self.members.as_slice())?;
83        if version >= 6 {
84            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
85        }
86        Ok(())
87    }
88
89    fn calculate_size(&self, version: i16) -> usize {
90        let mut res = 0;
91        if version >= 2 {
92            res += Int32::SIZE; // self.throttle_time_ms
93        }
94        res += Int16::SIZE; // self.error_code
95        res += Int32::SIZE; // self.generation_id
96        if version >= 7 {
97            res += NullableString(true).calculate_size(self.protocol_type.as_deref());
98        }
99        res += NullableString(version >= 6).calculate_size(self.protocol_name.as_deref());
100        res += NullableString(version >= 6).calculate_size(self.leader.as_str());
101        if version >= 9 {
102            res += Bool::SIZE; // self.skip_assignment
103        }
104        res += NullableString(version >= 6).calculate_size(self.member_id.as_str());
105        res += NullableArray(Struct(version), version >= 6).calculate_size(self.members.as_slice());
106        if version >= 6 {
107            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
108        }
109        res
110    }
111}
112
113#[derive(Debug, Default, Clone)]
114pub struct JoinGroupResponseMember {
115    /// The group member ID
116    pub member_id: String,
117    /// The unique identifier of the consumer instance provided by end user.
118    pub group_instance_id: Option<String>,
119    /// The group member metadata.
120    pub metadata: Vec<u8>,
121    /// Unknown tagged fields.
122    pub unknown_tagged_fields: Vec<RawTaggedField>,
123}
124
125impl Encodable for JoinGroupResponseMember {
126    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
127        NullableString(version >= 6).encode(buf, self.member_id.as_str())?;
128        if version >= 5 {
129            NullableString(version >= 6).encode(buf, self.group_instance_id.as_deref())?;
130        }
131        NullableBytes(version >= 6).encode(buf, &self.metadata)?;
132        if version >= 6 {
133            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
134        }
135        Ok(())
136    }
137
138    fn calculate_size(&self, version: i16) -> usize {
139        let mut res = 0;
140        res += NullableString(version >= 6).calculate_size(self.member_id.as_str());
141        if version >= 5 {
142            res += NullableString(version >= 6).calculate_size(self.group_instance_id.as_deref());
143        }
144        res += NullableBytes(version >= 6).calculate_size(&self.metadata);
145        if version >= 6 {
146            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
147        }
148        res
149    }
150}