1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct JoinGroupRequest {
24 pub group_id: super::GroupId,
28
29 pub session_timeout_ms: i32,
33
34 pub rebalance_timeout_ms: i32,
38
39 pub member_id: StrBytes,
43
44 pub group_instance_id: Option<StrBytes>,
48
49 pub protocol_type: StrBytes,
53
54 pub protocols: Vec<JoinGroupRequestProtocol>,
58
59 pub reason: Option<StrBytes>,
63
64 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
66}
67
68impl JoinGroupRequest {
69 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
75 self.group_id = value;
76 self
77 }
78 pub fn with_session_timeout_ms(mut self, value: i32) -> Self {
84 self.session_timeout_ms = value;
85 self
86 }
87 pub fn with_rebalance_timeout_ms(mut self, value: i32) -> Self {
93 self.rebalance_timeout_ms = value;
94 self
95 }
96 pub fn with_member_id(mut self, value: StrBytes) -> Self {
102 self.member_id = value;
103 self
104 }
105 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
111 self.group_instance_id = value;
112 self
113 }
114 pub fn with_protocol_type(mut self, value: StrBytes) -> Self {
120 self.protocol_type = value;
121 self
122 }
123 pub fn with_protocols(mut self, value: Vec<JoinGroupRequestProtocol>) -> Self {
129 self.protocols = value;
130 self
131 }
132 pub fn with_reason(mut self, value: Option<StrBytes>) -> Self {
138 self.reason = value;
139 self
140 }
141 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
143 self.unknown_tagged_fields = value;
144 self
145 }
146 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
148 self.unknown_tagged_fields.insert(key, value);
149 self
150 }
151}
152
153#[cfg(feature = "client")]
154impl Encodable for JoinGroupRequest {
155 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
156 if version < 0 || version > 9 {
157 bail!("specified version not supported by this message type");
158 }
159 if version >= 6 {
160 types::CompactString.encode(buf, &self.group_id)?;
161 } else {
162 types::String.encode(buf, &self.group_id)?;
163 }
164 types::Int32.encode(buf, &self.session_timeout_ms)?;
165 if version >= 1 {
166 types::Int32.encode(buf, &self.rebalance_timeout_ms)?;
167 }
168 if version >= 6 {
169 types::CompactString.encode(buf, &self.member_id)?;
170 } else {
171 types::String.encode(buf, &self.member_id)?;
172 }
173 if version >= 5 {
174 if version >= 6 {
175 types::CompactString.encode(buf, &self.group_instance_id)?;
176 } else {
177 types::String.encode(buf, &self.group_instance_id)?;
178 }
179 } else {
180 if !self.group_instance_id.is_none() {
181 bail!("A field is set that is not available on the selected protocol version");
182 }
183 }
184 if version >= 6 {
185 types::CompactString.encode(buf, &self.protocol_type)?;
186 } else {
187 types::String.encode(buf, &self.protocol_type)?;
188 }
189 if version >= 6 {
190 types::CompactArray(types::Struct { version }).encode(buf, &self.protocols)?;
191 } else {
192 types::Array(types::Struct { version }).encode(buf, &self.protocols)?;
193 }
194 if version >= 8 {
195 types::CompactString.encode(buf, &self.reason)?;
196 }
197 if version >= 6 {
198 let num_tagged_fields = self.unknown_tagged_fields.len();
199 if num_tagged_fields > std::u32::MAX as usize {
200 bail!(
201 "Too many tagged fields to encode ({} fields)",
202 num_tagged_fields
203 );
204 }
205 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
206
207 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
208 }
209 Ok(())
210 }
211 fn compute_size(&self, version: i16) -> Result<usize> {
212 let mut total_size = 0;
213 if version >= 6 {
214 total_size += types::CompactString.compute_size(&self.group_id)?;
215 } else {
216 total_size += types::String.compute_size(&self.group_id)?;
217 }
218 total_size += types::Int32.compute_size(&self.session_timeout_ms)?;
219 if version >= 1 {
220 total_size += types::Int32.compute_size(&self.rebalance_timeout_ms)?;
221 }
222 if version >= 6 {
223 total_size += types::CompactString.compute_size(&self.member_id)?;
224 } else {
225 total_size += types::String.compute_size(&self.member_id)?;
226 }
227 if version >= 5 {
228 if version >= 6 {
229 total_size += types::CompactString.compute_size(&self.group_instance_id)?;
230 } else {
231 total_size += types::String.compute_size(&self.group_instance_id)?;
232 }
233 } else {
234 if !self.group_instance_id.is_none() {
235 bail!("A field is set that is not available on the selected protocol version");
236 }
237 }
238 if version >= 6 {
239 total_size += types::CompactString.compute_size(&self.protocol_type)?;
240 } else {
241 total_size += types::String.compute_size(&self.protocol_type)?;
242 }
243 if version >= 6 {
244 total_size +=
245 types::CompactArray(types::Struct { version }).compute_size(&self.protocols)?;
246 } else {
247 total_size += types::Array(types::Struct { version }).compute_size(&self.protocols)?;
248 }
249 if version >= 8 {
250 total_size += types::CompactString.compute_size(&self.reason)?;
251 }
252 if version >= 6 {
253 let num_tagged_fields = self.unknown_tagged_fields.len();
254 if num_tagged_fields > std::u32::MAX as usize {
255 bail!(
256 "Too many tagged fields to encode ({} fields)",
257 num_tagged_fields
258 );
259 }
260 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
261
262 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
263 }
264 Ok(total_size)
265 }
266}
267
268#[cfg(feature = "broker")]
269impl Decodable for JoinGroupRequest {
270 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
271 if version < 0 || version > 9 {
272 bail!("specified version not supported by this message type");
273 }
274 let group_id = if version >= 6 {
275 types::CompactString.decode(buf)?
276 } else {
277 types::String.decode(buf)?
278 };
279 let session_timeout_ms = types::Int32.decode(buf)?;
280 let rebalance_timeout_ms = if version >= 1 {
281 types::Int32.decode(buf)?
282 } else {
283 -1
284 };
285 let member_id = if version >= 6 {
286 types::CompactString.decode(buf)?
287 } else {
288 types::String.decode(buf)?
289 };
290 let group_instance_id = if version >= 5 {
291 if version >= 6 {
292 types::CompactString.decode(buf)?
293 } else {
294 types::String.decode(buf)?
295 }
296 } else {
297 None
298 };
299 let protocol_type = if version >= 6 {
300 types::CompactString.decode(buf)?
301 } else {
302 types::String.decode(buf)?
303 };
304 let protocols = if version >= 6 {
305 types::CompactArray(types::Struct { version }).decode(buf)?
306 } else {
307 types::Array(types::Struct { version }).decode(buf)?
308 };
309 let reason = if version >= 8 {
310 types::CompactString.decode(buf)?
311 } else {
312 None
313 };
314 let mut unknown_tagged_fields = BTreeMap::new();
315 if version >= 6 {
316 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
317 for _ in 0..num_tagged_fields {
318 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
319 let size: u32 = types::UnsignedVarInt.decode(buf)?;
320 let unknown_value = buf.try_get_bytes(size as usize)?;
321 unknown_tagged_fields.insert(tag as i32, unknown_value);
322 }
323 }
324 Ok(Self {
325 group_id,
326 session_timeout_ms,
327 rebalance_timeout_ms,
328 member_id,
329 group_instance_id,
330 protocol_type,
331 protocols,
332 reason,
333 unknown_tagged_fields,
334 })
335 }
336}
337
338impl Default for JoinGroupRequest {
339 fn default() -> Self {
340 Self {
341 group_id: Default::default(),
342 session_timeout_ms: 0,
343 rebalance_timeout_ms: -1,
344 member_id: Default::default(),
345 group_instance_id: None,
346 protocol_type: Default::default(),
347 protocols: Default::default(),
348 reason: None,
349 unknown_tagged_fields: BTreeMap::new(),
350 }
351 }
352}
353
354impl Message for JoinGroupRequest {
355 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
356 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
357}
358
359#[non_exhaustive]
361#[derive(Debug, Clone, PartialEq)]
362pub struct JoinGroupRequestProtocol {
363 pub name: StrBytes,
367
368 pub metadata: Bytes,
372
373 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
375}
376
377impl JoinGroupRequestProtocol {
378 pub fn with_name(mut self, value: StrBytes) -> Self {
384 self.name = value;
385 self
386 }
387 pub fn with_metadata(mut self, value: Bytes) -> Self {
393 self.metadata = value;
394 self
395 }
396 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
398 self.unknown_tagged_fields = value;
399 self
400 }
401 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
403 self.unknown_tagged_fields.insert(key, value);
404 self
405 }
406}
407
408#[cfg(feature = "client")]
409impl Encodable for JoinGroupRequestProtocol {
410 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
411 if version < 0 || version > 9 {
412 bail!("specified version not supported by this message type");
413 }
414 if version >= 6 {
415 types::CompactString.encode(buf, &self.name)?;
416 } else {
417 types::String.encode(buf, &self.name)?;
418 }
419 if version >= 6 {
420 types::CompactBytes.encode(buf, &self.metadata)?;
421 } else {
422 types::Bytes.encode(buf, &self.metadata)?;
423 }
424 if version >= 6 {
425 let num_tagged_fields = self.unknown_tagged_fields.len();
426 if num_tagged_fields > std::u32::MAX as usize {
427 bail!(
428 "Too many tagged fields to encode ({} fields)",
429 num_tagged_fields
430 );
431 }
432 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
433
434 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
435 }
436 Ok(())
437 }
438 fn compute_size(&self, version: i16) -> Result<usize> {
439 let mut total_size = 0;
440 if version >= 6 {
441 total_size += types::CompactString.compute_size(&self.name)?;
442 } else {
443 total_size += types::String.compute_size(&self.name)?;
444 }
445 if version >= 6 {
446 total_size += types::CompactBytes.compute_size(&self.metadata)?;
447 } else {
448 total_size += types::Bytes.compute_size(&self.metadata)?;
449 }
450 if version >= 6 {
451 let num_tagged_fields = self.unknown_tagged_fields.len();
452 if num_tagged_fields > std::u32::MAX as usize {
453 bail!(
454 "Too many tagged fields to encode ({} fields)",
455 num_tagged_fields
456 );
457 }
458 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
459
460 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
461 }
462 Ok(total_size)
463 }
464}
465
466#[cfg(feature = "broker")]
467impl Decodable for JoinGroupRequestProtocol {
468 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
469 if version < 0 || version > 9 {
470 bail!("specified version not supported by this message type");
471 }
472 let name = if version >= 6 {
473 types::CompactString.decode(buf)?
474 } else {
475 types::String.decode(buf)?
476 };
477 let metadata = if version >= 6 {
478 types::CompactBytes.decode(buf)?
479 } else {
480 types::Bytes.decode(buf)?
481 };
482 let mut unknown_tagged_fields = BTreeMap::new();
483 if version >= 6 {
484 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
485 for _ in 0..num_tagged_fields {
486 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
487 let size: u32 = types::UnsignedVarInt.decode(buf)?;
488 let unknown_value = buf.try_get_bytes(size as usize)?;
489 unknown_tagged_fields.insert(tag as i32, unknown_value);
490 }
491 }
492 Ok(Self {
493 name,
494 metadata,
495 unknown_tagged_fields,
496 })
497 }
498}
499
500impl Default for JoinGroupRequestProtocol {
501 fn default() -> Self {
502 Self {
503 name: Default::default(),
504 metadata: Default::default(),
505 unknown_tagged_fields: BTreeMap::new(),
506 }
507 }
508}
509
510impl Message for JoinGroupRequestProtocol {
511 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
512 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
513}
514
515impl HeaderVersion for JoinGroupRequest {
516 fn header_version(version: i16) -> i16 {
517 if version >= 6 {
518 2
519 } else {
520 1
521 }
522 }
523}