1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct JoinGroupRequest {
24 pub group_id: super::GroupId,
28
29 pub session_timeout_ms: i32,
33
34 pub rebalance_timeout_ms: i32,
38
39 pub member_id: StrBytes,
43
44 pub group_instance_id: Option<StrBytes>,
48
49 pub protocol_type: StrBytes,
53
54 pub protocols: Vec<JoinGroupRequestProtocol>,
58
59 pub reason: Option<StrBytes>,
63
64 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
66}
67
68impl JoinGroupRequest {
69 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
75 self.group_id = value;
76 self
77 }
78 pub fn with_session_timeout_ms(mut self, value: i32) -> Self {
84 self.session_timeout_ms = value;
85 self
86 }
87 pub fn with_rebalance_timeout_ms(mut self, value: i32) -> Self {
93 self.rebalance_timeout_ms = value;
94 self
95 }
96 pub fn with_member_id(mut self, value: StrBytes) -> Self {
102 self.member_id = value;
103 self
104 }
105 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
111 self.group_instance_id = value;
112 self
113 }
114 pub fn with_protocol_type(mut self, value: StrBytes) -> Self {
120 self.protocol_type = value;
121 self
122 }
123 pub fn with_protocols(mut self, value: Vec<JoinGroupRequestProtocol>) -> Self {
129 self.protocols = value;
130 self
131 }
132 pub fn with_reason(mut self, value: Option<StrBytes>) -> Self {
138 self.reason = value;
139 self
140 }
141 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
143 self.unknown_tagged_fields = value;
144 self
145 }
146 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
148 self.unknown_tagged_fields.insert(key, value);
149 self
150 }
151}
152
153#[cfg(feature = "client")]
154impl Encodable for JoinGroupRequest {
155 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
156 if version >= 6 {
157 types::CompactString.encode(buf, &self.group_id)?;
158 } else {
159 types::String.encode(buf, &self.group_id)?;
160 }
161 types::Int32.encode(buf, &self.session_timeout_ms)?;
162 if version >= 1 {
163 types::Int32.encode(buf, &self.rebalance_timeout_ms)?;
164 }
165 if version >= 6 {
166 types::CompactString.encode(buf, &self.member_id)?;
167 } else {
168 types::String.encode(buf, &self.member_id)?;
169 }
170 if version >= 5 {
171 if version >= 6 {
172 types::CompactString.encode(buf, &self.group_instance_id)?;
173 } else {
174 types::String.encode(buf, &self.group_instance_id)?;
175 }
176 } else {
177 if !self.group_instance_id.is_none() {
178 bail!("A field is set that is not available on the selected protocol version");
179 }
180 }
181 if version >= 6 {
182 types::CompactString.encode(buf, &self.protocol_type)?;
183 } else {
184 types::String.encode(buf, &self.protocol_type)?;
185 }
186 if version >= 6 {
187 types::CompactArray(types::Struct { version }).encode(buf, &self.protocols)?;
188 } else {
189 types::Array(types::Struct { version }).encode(buf, &self.protocols)?;
190 }
191 if version >= 8 {
192 types::CompactString.encode(buf, &self.reason)?;
193 }
194 if version >= 6 {
195 let num_tagged_fields = self.unknown_tagged_fields.len();
196 if num_tagged_fields > std::u32::MAX as usize {
197 bail!(
198 "Too many tagged fields to encode ({} fields)",
199 num_tagged_fields
200 );
201 }
202 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
203
204 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
205 }
206 Ok(())
207 }
208 fn compute_size(&self, version: i16) -> Result<usize> {
209 let mut total_size = 0;
210 if version >= 6 {
211 total_size += types::CompactString.compute_size(&self.group_id)?;
212 } else {
213 total_size += types::String.compute_size(&self.group_id)?;
214 }
215 total_size += types::Int32.compute_size(&self.session_timeout_ms)?;
216 if version >= 1 {
217 total_size += types::Int32.compute_size(&self.rebalance_timeout_ms)?;
218 }
219 if version >= 6 {
220 total_size += types::CompactString.compute_size(&self.member_id)?;
221 } else {
222 total_size += types::String.compute_size(&self.member_id)?;
223 }
224 if version >= 5 {
225 if version >= 6 {
226 total_size += types::CompactString.compute_size(&self.group_instance_id)?;
227 } else {
228 total_size += types::String.compute_size(&self.group_instance_id)?;
229 }
230 } else {
231 if !self.group_instance_id.is_none() {
232 bail!("A field is set that is not available on the selected protocol version");
233 }
234 }
235 if version >= 6 {
236 total_size += types::CompactString.compute_size(&self.protocol_type)?;
237 } else {
238 total_size += types::String.compute_size(&self.protocol_type)?;
239 }
240 if version >= 6 {
241 total_size +=
242 types::CompactArray(types::Struct { version }).compute_size(&self.protocols)?;
243 } else {
244 total_size += types::Array(types::Struct { version }).compute_size(&self.protocols)?;
245 }
246 if version >= 8 {
247 total_size += types::CompactString.compute_size(&self.reason)?;
248 }
249 if version >= 6 {
250 let num_tagged_fields = self.unknown_tagged_fields.len();
251 if num_tagged_fields > std::u32::MAX as usize {
252 bail!(
253 "Too many tagged fields to encode ({} fields)",
254 num_tagged_fields
255 );
256 }
257 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
258
259 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
260 }
261 Ok(total_size)
262 }
263}
264
265#[cfg(feature = "broker")]
266impl Decodable for JoinGroupRequest {
267 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
268 let group_id = if version >= 6 {
269 types::CompactString.decode(buf)?
270 } else {
271 types::String.decode(buf)?
272 };
273 let session_timeout_ms = types::Int32.decode(buf)?;
274 let rebalance_timeout_ms = if version >= 1 {
275 types::Int32.decode(buf)?
276 } else {
277 -1
278 };
279 let member_id = if version >= 6 {
280 types::CompactString.decode(buf)?
281 } else {
282 types::String.decode(buf)?
283 };
284 let group_instance_id = if version >= 5 {
285 if version >= 6 {
286 types::CompactString.decode(buf)?
287 } else {
288 types::String.decode(buf)?
289 }
290 } else {
291 None
292 };
293 let protocol_type = if version >= 6 {
294 types::CompactString.decode(buf)?
295 } else {
296 types::String.decode(buf)?
297 };
298 let protocols = if version >= 6 {
299 types::CompactArray(types::Struct { version }).decode(buf)?
300 } else {
301 types::Array(types::Struct { version }).decode(buf)?
302 };
303 let reason = if version >= 8 {
304 types::CompactString.decode(buf)?
305 } else {
306 None
307 };
308 let mut unknown_tagged_fields = BTreeMap::new();
309 if version >= 6 {
310 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
311 for _ in 0..num_tagged_fields {
312 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
313 let size: u32 = types::UnsignedVarInt.decode(buf)?;
314 let unknown_value = buf.try_get_bytes(size as usize)?;
315 unknown_tagged_fields.insert(tag as i32, unknown_value);
316 }
317 }
318 Ok(Self {
319 group_id,
320 session_timeout_ms,
321 rebalance_timeout_ms,
322 member_id,
323 group_instance_id,
324 protocol_type,
325 protocols,
326 reason,
327 unknown_tagged_fields,
328 })
329 }
330}
331
332impl Default for JoinGroupRequest {
333 fn default() -> Self {
334 Self {
335 group_id: Default::default(),
336 session_timeout_ms: 0,
337 rebalance_timeout_ms: -1,
338 member_id: Default::default(),
339 group_instance_id: None,
340 protocol_type: Default::default(),
341 protocols: Default::default(),
342 reason: None,
343 unknown_tagged_fields: BTreeMap::new(),
344 }
345 }
346}
347
348impl Message for JoinGroupRequest {
349 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
350 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
351}
352
353#[non_exhaustive]
355#[derive(Debug, Clone, PartialEq)]
356pub struct JoinGroupRequestProtocol {
357 pub name: StrBytes,
361
362 pub metadata: Bytes,
366
367 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
369}
370
371impl JoinGroupRequestProtocol {
372 pub fn with_name(mut self, value: StrBytes) -> Self {
378 self.name = value;
379 self
380 }
381 pub fn with_metadata(mut self, value: Bytes) -> Self {
387 self.metadata = value;
388 self
389 }
390 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
392 self.unknown_tagged_fields = value;
393 self
394 }
395 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
397 self.unknown_tagged_fields.insert(key, value);
398 self
399 }
400}
401
402#[cfg(feature = "client")]
403impl Encodable for JoinGroupRequestProtocol {
404 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
405 if version >= 6 {
406 types::CompactString.encode(buf, &self.name)?;
407 } else {
408 types::String.encode(buf, &self.name)?;
409 }
410 if version >= 6 {
411 types::CompactBytes.encode(buf, &self.metadata)?;
412 } else {
413 types::Bytes.encode(buf, &self.metadata)?;
414 }
415 if version >= 6 {
416 let num_tagged_fields = self.unknown_tagged_fields.len();
417 if num_tagged_fields > std::u32::MAX as usize {
418 bail!(
419 "Too many tagged fields to encode ({} fields)",
420 num_tagged_fields
421 );
422 }
423 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
424
425 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
426 }
427 Ok(())
428 }
429 fn compute_size(&self, version: i16) -> Result<usize> {
430 let mut total_size = 0;
431 if version >= 6 {
432 total_size += types::CompactString.compute_size(&self.name)?;
433 } else {
434 total_size += types::String.compute_size(&self.name)?;
435 }
436 if version >= 6 {
437 total_size += types::CompactBytes.compute_size(&self.metadata)?;
438 } else {
439 total_size += types::Bytes.compute_size(&self.metadata)?;
440 }
441 if version >= 6 {
442 let num_tagged_fields = self.unknown_tagged_fields.len();
443 if num_tagged_fields > std::u32::MAX as usize {
444 bail!(
445 "Too many tagged fields to encode ({} fields)",
446 num_tagged_fields
447 );
448 }
449 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
450
451 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
452 }
453 Ok(total_size)
454 }
455}
456
457#[cfg(feature = "broker")]
458impl Decodable for JoinGroupRequestProtocol {
459 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
460 let name = if version >= 6 {
461 types::CompactString.decode(buf)?
462 } else {
463 types::String.decode(buf)?
464 };
465 let metadata = if version >= 6 {
466 types::CompactBytes.decode(buf)?
467 } else {
468 types::Bytes.decode(buf)?
469 };
470 let mut unknown_tagged_fields = BTreeMap::new();
471 if version >= 6 {
472 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
473 for _ in 0..num_tagged_fields {
474 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
475 let size: u32 = types::UnsignedVarInt.decode(buf)?;
476 let unknown_value = buf.try_get_bytes(size as usize)?;
477 unknown_tagged_fields.insert(tag as i32, unknown_value);
478 }
479 }
480 Ok(Self {
481 name,
482 metadata,
483 unknown_tagged_fields,
484 })
485 }
486}
487
488impl Default for JoinGroupRequestProtocol {
489 fn default() -> Self {
490 Self {
491 name: Default::default(),
492 metadata: Default::default(),
493 unknown_tagged_fields: BTreeMap::new(),
494 }
495 }
496}
497
498impl Message for JoinGroupRequestProtocol {
499 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
500 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
501}
502
503impl HeaderVersion for JoinGroupRequest {
504 fn header_version(version: i16) -> i16 {
505 if version >= 6 {
506 2
507 } else {
508 1
509 }
510 }
511}