kafka_protocol/messages/
sync_group_response.rs1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct SyncGroupResponse {
24 pub throttle_time_ms: i32,
28
29 pub error_code: i16,
33
34 pub protocol_type: Option<StrBytes>,
38
39 pub protocol_name: Option<StrBytes>,
43
44 pub assignment: Bytes,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl SyncGroupResponse {
54 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
60 self.throttle_time_ms = value;
61 self
62 }
63 pub fn with_error_code(mut self, value: i16) -> Self {
69 self.error_code = value;
70 self
71 }
72 pub fn with_protocol_type(mut self, value: Option<StrBytes>) -> Self {
78 self.protocol_type = value;
79 self
80 }
81 pub fn with_protocol_name(mut self, value: Option<StrBytes>) -> Self {
87 self.protocol_name = value;
88 self
89 }
90 pub fn with_assignment(mut self, value: Bytes) -> Self {
96 self.assignment = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
#[cfg(feature = "broker")]
impl Encodable for SyncGroupResponse {
    /// Writes this response into `buf` using the layout of `version`.
    ///
    /// Write order: `throttle_time_ms` (versions >= 1), `error_code`,
    /// `protocol_type` and `protocol_name` (versions >= 5), `assignment`, and
    /// finally the tagged-field section (versions >= 4).
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the individual field encoders, and
    /// fails if the number of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // From version 4 on, the compact encodings and tagged fields are used.
        let compact = version >= 4;

        if version >= 1 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }
        types::Int16.encode(buf, &self.error_code)?;
        if version >= 5 {
            types::CompactString.encode(buf, &self.protocol_type)?;
            types::CompactString.encode(buf, &self.protocol_name)?;
        }

        if !compact {
            // Older layouts end with the plain length-prefixed assignment.
            types::Bytes.encode(buf, &self.assignment)?;
            return Ok(());
        }

        types::CompactBytes.encode(buf, &self.assignment)?;

        // The tagged-field count is written as an unsigned varint, so it has
        // to fit in 32 bits.
        let tag_count = self.unknown_tagged_fields.len();
        if tag_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tag_count
            );
        }
        types::UnsignedVarInt.encode(buf, tag_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// Mirrors `encode` field by field, so the same version gates apply.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the per-field size computations, and
    /// fails if the number of unknown tagged fields does not fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 4;
        let mut size: usize = 0;

        if version >= 1 {
            size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }
        size += types::Int16.compute_size(&self.error_code)?;
        if version >= 5 {
            size += types::CompactString.compute_size(&self.protocol_type)?;
            size += types::CompactString.compute_size(&self.protocol_name)?;
        }

        if !compact {
            size += types::Bytes.compute_size(&self.assignment)?;
            return Ok(size);
        }

        size += types::CompactBytes.compute_size(&self.assignment)?;

        let tag_count = self.unknown_tagged_fields.len();
        if tag_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tag_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tag_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
175
#[cfg(feature = "client")]
impl Decodable for SyncGroupResponse {
    /// Reads a response laid out for `version` from `buf`.
    ///
    /// Fields that are not part of the requested version are filled in with
    /// fixed values: `0` for `throttle_time_ms` (version 0) and `None` for
    /// `protocol_type` / `protocol_name` (versions below 5). The tagged-field
    /// section is only read for versions >= 4; every tagged field found there
    /// is stored in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the field decoders or from reading a tagged
    /// field's payload out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // From version 4 on, the compact encodings and tagged fields are used.
        let compact = version >= 4;

        let throttle_time_ms = if version < 1 {
            0
        } else {
            types::Int32.decode(buf)?
        };

        let error_code = types::Int16.decode(buf)?;

        // Both strings were introduced together; type first, then name.
        let (protocol_type, protocol_name) = if version >= 5 {
            (
                types::CompactString.decode(buf)?,
                types::CompactString.decode(buf)?,
            )
        } else {
            (None, None)
        };

        let assignment = if compact {
            types::CompactBytes.decode(buf)?
        } else {
            types::Bytes.decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tag_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tag_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(payload_len as usize)?;
                // The map is keyed by `i32`, so tags above `i32::MAX` wrap to
                // negative keys; a repeated tag replaces the earlier payload.
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            throttle_time_ms,
            error_code,
            protocol_type,
            protocol_name,
            assignment,
            unknown_tagged_fields,
        })
    }
}
220
221impl Default for SyncGroupResponse {
222 fn default() -> Self {
223 Self {
224 throttle_time_ms: 0,
225 error_code: 0,
226 protocol_type: None,
227 protocol_name: None,
228 assignment: Default::default(),
229 unknown_tagged_fields: BTreeMap::new(),
230 }
231 }
232}
233
234impl Message for SyncGroupResponse {
235 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
236 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
237}
238
239impl HeaderVersion for SyncGroupResponse {
240 fn header_version(version: i16) -> i16 {
241 if version >= 4 {
242 1
243 } else {
244 0
245 }
246 }
247}