1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
1415use byteorder::ReadBytesExt;
1617use crate::codec::*;
18use crate::IoResult;
1920// Versions 1 and 2 are the same as version 0.
21//
22// Starting from version 3, we add a new field called groupInstanceId to indicate member identity
23// across restarts.
24//
25// Version 4 is the first flexible version.
26//
27// Starting from version 5, the client sends the Protocol Type and the Protocol Name
28// to the broker (KIP-559). The broker will reject the request if they are inconsistent
29// with the Type and Name known by the broker.
3031#[derive(Debug, Default, Clone)]
32pub struct SyncGroupRequest {
33/// The unique group identifier.
34pub group_id: String,
35/// The generation of the group.
36pub generation_id: i32,
37/// The member ID assigned by the group.
38pub member_id: String,
39/// The unique identifier of the consumer instance provided by end user.
40pub group_instance_id: Option<String>,
41/// The group protocol type.
42pub protocol_type: Option<String>,
43/// The group protocol name
44pub protocol_name: Option<String>,
45/// Each assignment.
46pub assignments: Vec<SyncGroupRequestAssignment>,
47/// Unknown tagged fields.
48pub unknown_tagged_fields: Vec<RawTaggedField>,
49}
5051impl Decodable for SyncGroupRequest {
52fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
53let mut this = SyncGroupRequest {
54 group_id: NullableString(version >= 4)
55 .decode(buf)?
56.ok_or_else(|| err_decode_message_null("group_id"))?,
57 generation_id: Int32.decode(buf)?,
58 member_id: NullableString(version >= 4)
59 .decode(buf)?
60.ok_or_else(|| err_decode_message_null("member_id"))?,
61 ..Default::default()
62 };
63if version >= 3 {
64 this.group_instance_id = NullableString(version >= 4).decode(buf)?;
65 }
66if version >= 5 {
67 this.protocol_type = NullableString(true).decode(buf)?;
68 this.protocol_name = NullableString(true).decode(buf)?;
69 }
70 this.assignments = NullableArray(Struct(version), version >= 4)
71 .decode(buf)?
72.ok_or_else(|| err_decode_message_null("assignments"))?;
73if version >= 4 {
74 this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
75 }
76Ok(this)
77 }
78}
7980#[derive(Debug, Default, Clone)]
81pub struct SyncGroupRequestAssignment {
82/// The ID of the member to assign.
83pub member_id: String,
84/// The member assignment.
85pub assignment: Vec<u8>,
86/// Unknown tagged fields.
87pub unknown_tagged_fields: Vec<RawTaggedField>,
88}
8990impl Decodable for SyncGroupRequestAssignment {
91fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
92if version > 5 {
93Err(err_decode_message_unsupported(
94 version,
95"SyncGroupRequestAssignment",
96 ))?
97}
98let mut this = SyncGroupRequestAssignment {
99 member_id: NullableString(version >= 4)
100 .decode(buf)?
101.ok_or_else(|| err_decode_message_null("member_id"))?,
102 assignment: NullableBytes(version >= 4)
103 .decode(buf)?
104.ok_or_else(|| err_decode_message_null("assignment"))?,
105 ..Default::default()
106 };
107if version >= 4 {
108 this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
109 }
110Ok(this)
111 }
112}