1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct JoinGroupResponse {
24 pub throttle_time_ms: i32,
28
29 pub error_code: i16,
33
34 pub generation_id: i32,
38
39 pub protocol_type: Option<StrBytes>,
43
44 pub protocol_name: Option<StrBytes>,
48
49 pub leader: StrBytes,
53
54 pub skip_assignment: bool,
58
59 pub member_id: StrBytes,
63
64 pub members: Vec<JoinGroupResponseMember>,
68
69 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
71}
72
73impl JoinGroupResponse {
74 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
80 self.throttle_time_ms = value;
81 self
82 }
83 pub fn with_error_code(mut self, value: i16) -> Self {
89 self.error_code = value;
90 self
91 }
92 pub fn with_generation_id(mut self, value: i32) -> Self {
98 self.generation_id = value;
99 self
100 }
101 pub fn with_protocol_type(mut self, value: Option<StrBytes>) -> Self {
107 self.protocol_type = value;
108 self
109 }
110 pub fn with_protocol_name(mut self, value: Option<StrBytes>) -> Self {
116 self.protocol_name = value;
117 self
118 }
119 pub fn with_leader(mut self, value: StrBytes) -> Self {
125 self.leader = value;
126 self
127 }
128 pub fn with_skip_assignment(mut self, value: bool) -> Self {
134 self.skip_assignment = value;
135 self
136 }
137 pub fn with_member_id(mut self, value: StrBytes) -> Self {
143 self.member_id = value;
144 self
145 }
146 pub fn with_members(mut self, value: Vec<JoinGroupResponseMember>) -> Self {
152 self.members = value;
153 self
154 }
155 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
157 self.unknown_tagged_fields = value;
158 self
159 }
160 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
162 self.unknown_tagged_fields.insert(key, value);
163 self
164 }
165}
166
#[cfg(feature = "broker")]
impl Encodable for JoinGroupResponse {
    /// Serializes this response into `buf` using the wire layout of `version`.
    ///
    /// Versions 6 and later use the compact string/array encodings and end with the
    /// tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails if `skip_assignment` is set while `version` is below 9, if the number of
    /// unknown tagged fields exceeds `u32::MAX`, or if any field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Versions 6+ switch every string/array to its compact form.
        let use_compact = version >= 6;

        if version >= 2 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }
        types::Int16.encode(buf, &self.error_code)?;
        types::Int32.encode(buf, &self.generation_id)?;
        if version >= 7 {
            types::CompactString.encode(buf, &self.protocol_type)?;
        }

        if use_compact {
            types::CompactString.encode(buf, &self.protocol_name)?;
            types::CompactString.encode(buf, &self.leader)?;
        } else {
            types::String.encode(buf, &self.protocol_name)?;
            types::String.encode(buf, &self.leader)?;
        }

        if version >= 9 {
            types::Boolean.encode(buf, &self.skip_assignment)?;
        } else if self.skip_assignment {
            // The flag cannot be represented before version 9, so refuse to drop it silently.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if use_compact {
            types::CompactString.encode(buf, &self.member_id)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.members)?;

            // Tagged-field section: count first, then the fields themselves.
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::String.encode(buf, &self.member_id)?;
            types::Array(types::Struct { version }).encode(buf, &self.members)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Walks the fields in the same order and under the same version gates as `encode`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`: `skip_assignment` set below
    /// version 9, too many unknown tagged fields, or a failing field size computation.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let use_compact = version >= 6;
        let mut size = 0;

        if version >= 2 {
            size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }
        size += types::Int16.compute_size(&self.error_code)?;
        size += types::Int32.compute_size(&self.generation_id)?;
        if version >= 7 {
            size += types::CompactString.compute_size(&self.protocol_type)?;
        }

        if use_compact {
            size += types::CompactString.compute_size(&self.protocol_name)?;
            size += types::CompactString.compute_size(&self.leader)?;
        } else {
            size += types::String.compute_size(&self.protocol_name)?;
            size += types::String.compute_size(&self.leader)?;
        }

        if version >= 9 {
            size += types::Boolean.compute_size(&self.skip_assignment)?;
        } else if self.skip_assignment {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if use_compact {
            size += types::CompactString.compute_size(&self.member_id)?;
            size += types::CompactArray(types::Struct { version }).compute_size(&self.members)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::String.compute_size(&self.member_id)?;
            size += types::Array(types::Struct { version }).compute_size(&self.members)?;
        }

        Ok(size)
    }
}
272
#[cfg(feature = "client")]
impl Decodable for JoinGroupResponse {
    /// Reads a response laid out for `version` from `buf`.
    ///
    /// Fields that the given version does not carry are filled in with fixed
    /// fallbacks (`0`, `None` or `false`). Versions 6 and later use the compact
    /// encodings and end with a tagged-field section, every entry of which is kept
    /// verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the field decoders or from reading a tagged
    /// field's bytes out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Versions 6+ switch every string/array to its compact form.
        let use_compact = version >= 6;

        let throttle_time_ms = if version >= 2 {
            types::Int32.decode(buf)?
        } else {
            0
        };
        let error_code = types::Int16.decode(buf)?;
        let generation_id = types::Int32.decode(buf)?;
        let protocol_type = if version >= 7 {
            types::CompactString.decode(buf)?
        } else {
            None
        };
        let protocol_name = if use_compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let leader = if use_compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let skip_assignment = if version >= 9 {
            types::Boolean.decode(buf)?
        } else {
            false
        };
        let member_id = if use_compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let members = if use_compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if use_compact {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each entry is: tag, payload length, then that many raw bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(payload_len as usize)?;
                // The map is keyed by `i32`, so tags above `i32::MAX` wrap to negative keys.
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            throttle_time_ms,
            error_code,
            generation_id,
            protocol_type,
            protocol_name,
            leader,
            skip_assignment,
            member_id,
            members,
            unknown_tagged_fields,
        })
    }
}
337
338impl Default for JoinGroupResponse {
339 fn default() -> Self {
340 Self {
341 throttle_time_ms: 0,
342 error_code: 0,
343 generation_id: -1,
344 protocol_type: None,
345 protocol_name: Some(Default::default()),
346 leader: Default::default(),
347 skip_assignment: false,
348 member_id: Default::default(),
349 members: Default::default(),
350 unknown_tagged_fields: BTreeMap::new(),
351 }
352 }
353}
354
355impl Message for JoinGroupResponse {
356 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
357 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
358}
359
360#[non_exhaustive]
362#[derive(Debug, Clone, PartialEq)]
363pub struct JoinGroupResponseMember {
364 pub member_id: StrBytes,
368
369 pub group_instance_id: Option<StrBytes>,
373
374 pub metadata: Bytes,
378
379 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
381}
382
383impl JoinGroupResponseMember {
384 pub fn with_member_id(mut self, value: StrBytes) -> Self {
390 self.member_id = value;
391 self
392 }
393 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
399 self.group_instance_id = value;
400 self
401 }
402 pub fn with_metadata(mut self, value: Bytes) -> Self {
408 self.metadata = value;
409 self
410 }
411 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
413 self.unknown_tagged_fields = value;
414 self
415 }
416 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
418 self.unknown_tagged_fields.insert(key, value);
419 self
420 }
421}
422
#[cfg(feature = "broker")]
impl Encodable for JoinGroupResponseMember {
    /// Serializes this member into `buf` using the wire layout of `version`.
    ///
    /// Field order is always `member_id`, `group_instance_id` (versions 5+ only),
    /// `metadata`, followed by the tagged-field section in versions 6+.
    ///
    /// # Errors
    ///
    /// Fails if the number of unknown tagged fields exceeds `u32::MAX` or if any
    /// field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 6 {
            // Compact layout; `group_instance_id` is always present here because
            // version 6+ implies version 5+.
            types::CompactString.encode(buf, &self.member_id)?;
            types::CompactString.encode(buf, &self.group_instance_id)?;
            types::CompactBytes.encode(buf, &self.metadata)?;

            // Tagged-field section: count first, then the fields themselves.
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            // Pre-6 layout; only version 5 carries `group_instance_id`.
            types::String.encode(buf, &self.member_id)?;
            if version >= 5 {
                types::String.encode(buf, &self.group_instance_id)?;
            }
            types::Bytes.encode(buf, &self.metadata)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Walks the fields in the same order and under the same version gates as `encode`.
    ///
    /// # Errors
    ///
    /// Fails if the number of unknown tagged fields exceeds `u32::MAX` or if any
    /// field size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 6 {
            size += types::CompactString.compute_size(&self.member_id)?;
            size += types::CompactString.compute_size(&self.group_instance_id)?;
            size += types::CompactBytes.compute_size(&self.metadata)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::String.compute_size(&self.member_id)?;
            if version >= 5 {
                size += types::String.compute_size(&self.group_instance_id)?;
            }
            size += types::Bytes.compute_size(&self.metadata)?;
        }
        Ok(size)
    }
}
491
#[cfg(feature = "client")]
impl Decodable for JoinGroupResponseMember {
    /// Reads a member laid out for `version` from `buf`.
    ///
    /// `group_instance_id` is only read for versions 5 and later and is `None`
    /// otherwise. Versions 6 and later use the compact encodings and end with a
    /// tagged-field section, every entry of which is kept verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the field decoders or from reading a tagged
    /// field's bytes out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Versions 6+ switch strings/bytes to their compact form.
        let use_compact = version >= 6;

        let member_id = if use_compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let group_instance_id = if use_compact {
            types::CompactString.decode(buf)?
        } else if version >= 5 {
            types::String.decode(buf)?
        } else {
            None
        };
        let metadata = if use_compact {
            types::CompactBytes.decode(buf)?
        } else {
            types::Bytes.decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if use_compact {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each entry is: tag, payload length, then that many raw bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(payload_len as usize)?;
                // The map is keyed by `i32`, so tags above `i32::MAX` wrap to negative keys.
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            member_id,
            group_instance_id,
            metadata,
            unknown_tagged_fields,
        })
    }
}
532
533impl Default for JoinGroupResponseMember {
534 fn default() -> Self {
535 Self {
536 member_id: Default::default(),
537 group_instance_id: None,
538 metadata: Default::default(),
539 unknown_tagged_fields: BTreeMap::new(),
540 }
541 }
542}
543
544impl Message for JoinGroupResponseMember {
545 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
546 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
547}
548
549impl HeaderVersion for JoinGroupResponse {
550 fn header_version(version: i16) -> i16 {
551 if version >= 6 {
552 1
553 } else {
554 0
555 }
556 }
557}