// kafka_protocol/messages/sync_group_response.rs
#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct SyncGroupResponse {
24 pub throttle_time_ms: i32,
28
29 pub error_code: i16,
33
34 pub protocol_type: Option<StrBytes>,
38
39 pub protocol_name: Option<StrBytes>,
43
44 pub assignment: Bytes,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl SyncGroupResponse {
54 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
60 self.throttle_time_ms = value;
61 self
62 }
63 pub fn with_error_code(mut self, value: i16) -> Self {
69 self.error_code = value;
70 self
71 }
72 pub fn with_protocol_type(mut self, value: Option<StrBytes>) -> Self {
78 self.protocol_type = value;
79 self
80 }
81 pub fn with_protocol_name(mut self, value: Option<StrBytes>) -> Self {
87 self.protocol_name = value;
88 self
89 }
90 pub fn with_assignment(mut self, value: Bytes) -> Self {
96 self.assignment = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
#[cfg(feature = "broker")]
impl Encodable for SyncGroupResponse {
    /// Writes this response to `buf` using the layout of `version`.
    ///
    /// Field order on the wire: throttle time (version >= 1), error code,
    /// protocol type and protocol name (version >= 5), assignment, and
    /// finally the tagged-field section (version >= 4).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5`, when the number of unknown
    /// tagged fields does not fit in a `u32`, or when any underlying field
    /// encoder reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on, the compact encodings and the tagged-field
        // section are used.
        let flexible = version >= 4;

        if version >= 1 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }
        types::Int16.encode(buf, &self.error_code)?;
        if version >= 5 {
            types::CompactString.encode(buf, &self.protocol_type)?;
            types::CompactString.encode(buf, &self.protocol_name)?;
        }
        if flexible {
            types::CompactBytes.encode(buf, &self.assignment)?;
        } else {
            types::Bytes.encode(buf, &self.assignment)?;
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            // The count is written first, then the fields themselves.
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=5`; it
    /// simply applies the same per-field version thresholds.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields does not fit in a
    /// `u32`, or when any underlying size computation reports an error.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 4;
        let mut size = 0;

        if version >= 1 {
            size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }
        size += types::Int16.compute_size(&self.error_code)?;
        if version >= 5 {
            size += types::CompactString.compute_size(&self.protocol_type)?;
            size += types::CompactString.compute_size(&self.protocol_name)?;
        }
        size += if flexible {
            types::CompactBytes.compute_size(&self.assignment)?
        } else {
            types::Bytes.compute_size(&self.assignment)?
        };

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
178
#[cfg(feature = "client")]
impl Decodable for SyncGroupResponse {
    /// Reads a response from `buf` that was serialized with `version`.
    ///
    /// Fields that are not on the wire for the given version take their
    /// defaults: `0` for the throttle time and `None` for the protocol type
    /// and name. For version 4 and above, every tagged field found in the
    /// trailing section is stored in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5` or when any field cannot be
    /// read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on, the compact encodings and the tagged-field
        // section are used.
        let flexible = version >= 4;

        // After the range check above, version 0 is the only one without a
        // throttle time on the wire.
        let throttle_time_ms = match version {
            0 => 0,
            _ => types::Int32.decode(buf)?,
        };
        let error_code = types::Int16.decode(buf)?;

        // Protocol type and name were introduced together; the type precedes
        // the name on the wire.
        let (protocol_type, protocol_name) = if version >= 5 {
            let kind = types::CompactString.decode(buf)?;
            let name = types::CompactString.decode(buf)?;
            (kind, name)
        } else {
            (None, None)
        };

        let assignment = if flexible {
            types::CompactBytes.decode(buf)?
        } else {
            types::Bytes.decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is a tag, a byte length, then that many bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            throttle_time_ms,
            error_code,
            protocol_type,
            protocol_name,
            assignment,
            unknown_tagged_fields,
        })
    }
}
226
227impl Default for SyncGroupResponse {
228 fn default() -> Self {
229 Self {
230 throttle_time_ms: 0,
231 error_code: 0,
232 protocol_type: None,
233 protocol_name: None,
234 assignment: Default::default(),
235 unknown_tagged_fields: BTreeMap::new(),
236 }
237 }
238}
239
240impl Message for SyncGroupResponse {
241 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
242 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
243}
244
245impl HeaderVersion for SyncGroupResponse {
246 fn header_version(version: i16) -> i16 {
247 if version >= 4 {
248 1
249 } else {
250 0
251 }
252 }
253}