1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct LeaveGroupResponse {
24 pub throttle_time_ms: i32,
28
29 pub error_code: i16,
33
34 pub members: Vec<MemberResponse>,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl LeaveGroupResponse {
44 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
50 self.throttle_time_ms = value;
51 self
52 }
53 pub fn with_error_code(mut self, value: i16) -> Self {
59 self.error_code = value;
60 self
61 }
62 pub fn with_members(mut self, value: Vec<MemberResponse>) -> Self {
68 self.members = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
83#[cfg(feature = "broker")]
84impl Encodable for LeaveGroupResponse {
85 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
86 if version < 0 || version > 5 {
87 bail!("specified version not supported by this message type");
88 }
89 if version >= 1 {
90 types::Int32.encode(buf, &self.throttle_time_ms)?;
91 }
92 types::Int16.encode(buf, &self.error_code)?;
93 if version >= 3 {
94 if version >= 4 {
95 types::CompactArray(types::Struct { version }).encode(buf, &self.members)?;
96 } else {
97 types::Array(types::Struct { version }).encode(buf, &self.members)?;
98 }
99 } else {
100 if !self.members.is_empty() {
101 bail!("A field is set that is not available on the selected protocol version");
102 }
103 }
104 if version >= 4 {
105 let num_tagged_fields = self.unknown_tagged_fields.len();
106 if num_tagged_fields > std::u32::MAX as usize {
107 bail!(
108 "Too many tagged fields to encode ({} fields)",
109 num_tagged_fields
110 );
111 }
112 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
113
114 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
115 }
116 Ok(())
117 }
118 fn compute_size(&self, version: i16) -> Result<usize> {
119 let mut total_size = 0;
120 if version >= 1 {
121 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
122 }
123 total_size += types::Int16.compute_size(&self.error_code)?;
124 if version >= 3 {
125 if version >= 4 {
126 total_size +=
127 types::CompactArray(types::Struct { version }).compute_size(&self.members)?;
128 } else {
129 total_size +=
130 types::Array(types::Struct { version }).compute_size(&self.members)?;
131 }
132 } else {
133 if !self.members.is_empty() {
134 bail!("A field is set that is not available on the selected protocol version");
135 }
136 }
137 if version >= 4 {
138 let num_tagged_fields = self.unknown_tagged_fields.len();
139 if num_tagged_fields > std::u32::MAX as usize {
140 bail!(
141 "Too many tagged fields to encode ({} fields)",
142 num_tagged_fields
143 );
144 }
145 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
146
147 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
148 }
149 Ok(total_size)
150 }
151}
152
153#[cfg(feature = "client")]
154impl Decodable for LeaveGroupResponse {
155 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
156 if version < 0 || version > 5 {
157 bail!("specified version not supported by this message type");
158 }
159 let throttle_time_ms = if version >= 1 {
160 types::Int32.decode(buf)?
161 } else {
162 0
163 };
164 let error_code = types::Int16.decode(buf)?;
165 let members = if version >= 3 {
166 if version >= 4 {
167 types::CompactArray(types::Struct { version }).decode(buf)?
168 } else {
169 types::Array(types::Struct { version }).decode(buf)?
170 }
171 } else {
172 Default::default()
173 };
174 let mut unknown_tagged_fields = BTreeMap::new();
175 if version >= 4 {
176 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
177 for _ in 0..num_tagged_fields {
178 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
179 let size: u32 = types::UnsignedVarInt.decode(buf)?;
180 let unknown_value = buf.try_get_bytes(size as usize)?;
181 unknown_tagged_fields.insert(tag as i32, unknown_value);
182 }
183 }
184 Ok(Self {
185 throttle_time_ms,
186 error_code,
187 members,
188 unknown_tagged_fields,
189 })
190 }
191}
192
193impl Default for LeaveGroupResponse {
194 fn default() -> Self {
195 Self {
196 throttle_time_ms: 0,
197 error_code: 0,
198 members: Default::default(),
199 unknown_tagged_fields: BTreeMap::new(),
200 }
201 }
202}
203
204impl Message for LeaveGroupResponse {
205 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
206 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
207}
208
209#[non_exhaustive]
211#[derive(Debug, Clone, PartialEq)]
212pub struct MemberResponse {
213 pub member_id: StrBytes,
217
218 pub group_instance_id: Option<StrBytes>,
222
223 pub error_code: i16,
227
228 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
230}
231
232impl MemberResponse {
233 pub fn with_member_id(mut self, value: StrBytes) -> Self {
239 self.member_id = value;
240 self
241 }
242 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
248 self.group_instance_id = value;
249 self
250 }
251 pub fn with_error_code(mut self, value: i16) -> Self {
257 self.error_code = value;
258 self
259 }
260 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
262 self.unknown_tagged_fields = value;
263 self
264 }
265 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
267 self.unknown_tagged_fields.insert(key, value);
268 self
269 }
270}
271
272#[cfg(feature = "broker")]
273impl Encodable for MemberResponse {
274 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
275 if version < 0 || version > 5 {
276 bail!("specified version not supported by this message type");
277 }
278 if version >= 3 {
279 if version >= 4 {
280 types::CompactString.encode(buf, &self.member_id)?;
281 } else {
282 types::String.encode(buf, &self.member_id)?;
283 }
284 } else {
285 if !self.member_id.is_empty() {
286 bail!("A field is set that is not available on the selected protocol version");
287 }
288 }
289 if version >= 3 {
290 if version >= 4 {
291 types::CompactString.encode(buf, &self.group_instance_id)?;
292 } else {
293 types::String.encode(buf, &self.group_instance_id)?;
294 }
295 } else {
296 if !self
297 .group_instance_id
298 .as_ref()
299 .map(|x| x.is_empty())
300 .unwrap_or_default()
301 {
302 bail!("A field is set that is not available on the selected protocol version");
303 }
304 }
305 if version >= 3 {
306 types::Int16.encode(buf, &self.error_code)?;
307 } else {
308 if self.error_code != 0 {
309 bail!("A field is set that is not available on the selected protocol version");
310 }
311 }
312 if version >= 4 {
313 let num_tagged_fields = self.unknown_tagged_fields.len();
314 if num_tagged_fields > std::u32::MAX as usize {
315 bail!(
316 "Too many tagged fields to encode ({} fields)",
317 num_tagged_fields
318 );
319 }
320 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
321
322 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
323 }
324 Ok(())
325 }
326 fn compute_size(&self, version: i16) -> Result<usize> {
327 let mut total_size = 0;
328 if version >= 3 {
329 if version >= 4 {
330 total_size += types::CompactString.compute_size(&self.member_id)?;
331 } else {
332 total_size += types::String.compute_size(&self.member_id)?;
333 }
334 } else {
335 if !self.member_id.is_empty() {
336 bail!("A field is set that is not available on the selected protocol version");
337 }
338 }
339 if version >= 3 {
340 if version >= 4 {
341 total_size += types::CompactString.compute_size(&self.group_instance_id)?;
342 } else {
343 total_size += types::String.compute_size(&self.group_instance_id)?;
344 }
345 } else {
346 if !self
347 .group_instance_id
348 .as_ref()
349 .map(|x| x.is_empty())
350 .unwrap_or_default()
351 {
352 bail!("A field is set that is not available on the selected protocol version");
353 }
354 }
355 if version >= 3 {
356 total_size += types::Int16.compute_size(&self.error_code)?;
357 } else {
358 if self.error_code != 0 {
359 bail!("A field is set that is not available on the selected protocol version");
360 }
361 }
362 if version >= 4 {
363 let num_tagged_fields = self.unknown_tagged_fields.len();
364 if num_tagged_fields > std::u32::MAX as usize {
365 bail!(
366 "Too many tagged fields to encode ({} fields)",
367 num_tagged_fields
368 );
369 }
370 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
371
372 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
373 }
374 Ok(total_size)
375 }
376}
377
378#[cfg(feature = "client")]
379impl Decodable for MemberResponse {
380 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
381 if version < 0 || version > 5 {
382 bail!("specified version not supported by this message type");
383 }
384 let member_id = if version >= 3 {
385 if version >= 4 {
386 types::CompactString.decode(buf)?
387 } else {
388 types::String.decode(buf)?
389 }
390 } else {
391 Default::default()
392 };
393 let group_instance_id = if version >= 3 {
394 if version >= 4 {
395 types::CompactString.decode(buf)?
396 } else {
397 types::String.decode(buf)?
398 }
399 } else {
400 Some(Default::default())
401 };
402 let error_code = if version >= 3 {
403 types::Int16.decode(buf)?
404 } else {
405 0
406 };
407 let mut unknown_tagged_fields = BTreeMap::new();
408 if version >= 4 {
409 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
410 for _ in 0..num_tagged_fields {
411 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
412 let size: u32 = types::UnsignedVarInt.decode(buf)?;
413 let unknown_value = buf.try_get_bytes(size as usize)?;
414 unknown_tagged_fields.insert(tag as i32, unknown_value);
415 }
416 }
417 Ok(Self {
418 member_id,
419 group_instance_id,
420 error_code,
421 unknown_tagged_fields,
422 })
423 }
424}
425
426impl Default for MemberResponse {
427 fn default() -> Self {
428 Self {
429 member_id: Default::default(),
430 group_instance_id: Some(Default::default()),
431 error_code: 0,
432 unknown_tagged_fields: BTreeMap::new(),
433 }
434 }
435}
436
437impl Message for MemberResponse {
438 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
439 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
440}
441
442impl HeaderVersion for LeaveGroupResponse {
443 fn header_version(version: i16) -> i16 {
444 if version >= 4 {
445 1
446 } else {
447 0
448 }
449 }
450}