kafka_api/schemata/
sync_group_request.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::ReadBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Versions 1 and 2 are the same as version 0.
21//
22// Starting from version 3, we add a new field called groupInstanceId to indicate member identity
23// across restarts.
24//
25// Version 4 is the first flexible version.
26//
27// Starting from version 5, the client sends the Protocol Type and the Protocol Name
28// to the broker (KIP-559). The broker will reject the request if they are inconsistent
29// with the Type and Name known by the broker.
30
31#[derive(Debug, Default, Clone)]
32pub struct SyncGroupRequest {
33    /// The unique group identifier.
34    pub group_id: String,
35    /// The generation of the group.
36    pub generation_id: i32,
37    /// The member ID assigned by the group.
38    pub member_id: String,
39    /// The unique identifier of the consumer instance provided by end user.
40    pub group_instance_id: Option<String>,
41    /// The group protocol type.
42    pub protocol_type: Option<String>,
43    /// The group protocol name
44    pub protocol_name: Option<String>,
45    /// Each assignment.
46    pub assignments: Vec<SyncGroupRequestAssignment>,
47    /// Unknown tagged fields.
48    pub unknown_tagged_fields: Vec<RawTaggedField>,
49}
50
51impl Decodable for SyncGroupRequest {
52    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
53        let mut this = SyncGroupRequest {
54            group_id: NullableString(version >= 4)
55                .decode(buf)?
56                .ok_or_else(|| err_decode_message_null("group_id"))?,
57            generation_id: Int32.decode(buf)?,
58            member_id: NullableString(version >= 4)
59                .decode(buf)?
60                .ok_or_else(|| err_decode_message_null("member_id"))?,
61            ..Default::default()
62        };
63        if version >= 3 {
64            this.group_instance_id = NullableString(version >= 4).decode(buf)?;
65        }
66        if version >= 5 {
67            this.protocol_type = NullableString(true).decode(buf)?;
68            this.protocol_name = NullableString(true).decode(buf)?;
69        }
70        this.assignments = NullableArray(Struct(version), version >= 4)
71            .decode(buf)?
72            .ok_or_else(|| err_decode_message_null("assignments"))?;
73        if version >= 4 {
74            this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
75        }
76        Ok(this)
77    }
78}
79
80#[derive(Debug, Default, Clone)]
81pub struct SyncGroupRequestAssignment {
82    /// The ID of the member to assign.
83    pub member_id: String,
84    /// The member assignment.
85    pub assignment: Vec<u8>,
86    /// Unknown tagged fields.
87    pub unknown_tagged_fields: Vec<RawTaggedField>,
88}
89
90impl Decodable for SyncGroupRequestAssignment {
91    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
92        if version > 5 {
93            Err(err_decode_message_unsupported(
94                version,
95                "SyncGroupRequestAssignment",
96            ))?
97        }
98        let mut this = SyncGroupRequestAssignment {
99            member_id: NullableString(version >= 4)
100                .decode(buf)?
101                .ok_or_else(|| err_decode_message_null("member_id"))?,
102            assignment: NullableBytes(version >= 4)
103                .decode(buf)?
104                .ok_or_else(|| err_decode_message_null("assignment"))?,
105            ..Default::default()
106        };
107        if version >= 4 {
108            this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
109        }
110        Ok(this)
111    }
112}