1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct SyncGroupRequest {
24 pub group_id: super::GroupId,
28
29 pub generation_id: i32,
33
34 pub member_id: StrBytes,
38
39 pub group_instance_id: Option<StrBytes>,
43
44 pub protocol_type: Option<StrBytes>,
48
49 pub protocol_name: Option<StrBytes>,
53
54 pub assignments: Vec<SyncGroupRequestAssignment>,
58
59 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
61}
62
63impl SyncGroupRequest {
64 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
70 self.group_id = value;
71 self
72 }
73 pub fn with_generation_id(mut self, value: i32) -> Self {
79 self.generation_id = value;
80 self
81 }
82 pub fn with_member_id(mut self, value: StrBytes) -> Self {
88 self.member_id = value;
89 self
90 }
91 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
97 self.group_instance_id = value;
98 self
99 }
100 pub fn with_protocol_type(mut self, value: Option<StrBytes>) -> Self {
106 self.protocol_type = value;
107 self
108 }
109 pub fn with_protocol_name(mut self, value: Option<StrBytes>) -> Self {
115 self.protocol_name = value;
116 self
117 }
118 pub fn with_assignments(mut self, value: Vec<SyncGroupRequestAssignment>) -> Self {
124 self.assignments = value;
125 self
126 }
127 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
129 self.unknown_tagged_fields = value;
130 self
131 }
132 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
134 self.unknown_tagged_fields.insert(key, value);
135 self
136 }
137}
138
139#[cfg(feature = "client")]
140impl Encodable for SyncGroupRequest {
141 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
142 if version < 0 || version > 5 {
143 bail!("specified version not supported by this message type");
144 }
145 if version >= 4 {
146 types::CompactString.encode(buf, &self.group_id)?;
147 } else {
148 types::String.encode(buf, &self.group_id)?;
149 }
150 types::Int32.encode(buf, &self.generation_id)?;
151 if version >= 4 {
152 types::CompactString.encode(buf, &self.member_id)?;
153 } else {
154 types::String.encode(buf, &self.member_id)?;
155 }
156 if version >= 3 {
157 if version >= 4 {
158 types::CompactString.encode(buf, &self.group_instance_id)?;
159 } else {
160 types::String.encode(buf, &self.group_instance_id)?;
161 }
162 } else {
163 if !self.group_instance_id.is_none() {
164 bail!("A field is set that is not available on the selected protocol version");
165 }
166 }
167 if version >= 5 {
168 types::CompactString.encode(buf, &self.protocol_type)?;
169 }
170 if version >= 5 {
171 types::CompactString.encode(buf, &self.protocol_name)?;
172 }
173 if version >= 4 {
174 types::CompactArray(types::Struct { version }).encode(buf, &self.assignments)?;
175 } else {
176 types::Array(types::Struct { version }).encode(buf, &self.assignments)?;
177 }
178 if version >= 4 {
179 let num_tagged_fields = self.unknown_tagged_fields.len();
180 if num_tagged_fields > std::u32::MAX as usize {
181 bail!(
182 "Too many tagged fields to encode ({} fields)",
183 num_tagged_fields
184 );
185 }
186 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
187
188 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
189 }
190 Ok(())
191 }
192 fn compute_size(&self, version: i16) -> Result<usize> {
193 let mut total_size = 0;
194 if version >= 4 {
195 total_size += types::CompactString.compute_size(&self.group_id)?;
196 } else {
197 total_size += types::String.compute_size(&self.group_id)?;
198 }
199 total_size += types::Int32.compute_size(&self.generation_id)?;
200 if version >= 4 {
201 total_size += types::CompactString.compute_size(&self.member_id)?;
202 } else {
203 total_size += types::String.compute_size(&self.member_id)?;
204 }
205 if version >= 3 {
206 if version >= 4 {
207 total_size += types::CompactString.compute_size(&self.group_instance_id)?;
208 } else {
209 total_size += types::String.compute_size(&self.group_instance_id)?;
210 }
211 } else {
212 if !self.group_instance_id.is_none() {
213 bail!("A field is set that is not available on the selected protocol version");
214 }
215 }
216 if version >= 5 {
217 total_size += types::CompactString.compute_size(&self.protocol_type)?;
218 }
219 if version >= 5 {
220 total_size += types::CompactString.compute_size(&self.protocol_name)?;
221 }
222 if version >= 4 {
223 total_size +=
224 types::CompactArray(types::Struct { version }).compute_size(&self.assignments)?;
225 } else {
226 total_size +=
227 types::Array(types::Struct { version }).compute_size(&self.assignments)?;
228 }
229 if version >= 4 {
230 let num_tagged_fields = self.unknown_tagged_fields.len();
231 if num_tagged_fields > std::u32::MAX as usize {
232 bail!(
233 "Too many tagged fields to encode ({} fields)",
234 num_tagged_fields
235 );
236 }
237 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
238
239 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
240 }
241 Ok(total_size)
242 }
243}
244
245#[cfg(feature = "broker")]
246impl Decodable for SyncGroupRequest {
247 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
248 if version < 0 || version > 5 {
249 bail!("specified version not supported by this message type");
250 }
251 let group_id = if version >= 4 {
252 types::CompactString.decode(buf)?
253 } else {
254 types::String.decode(buf)?
255 };
256 let generation_id = types::Int32.decode(buf)?;
257 let member_id = if version >= 4 {
258 types::CompactString.decode(buf)?
259 } else {
260 types::String.decode(buf)?
261 };
262 let group_instance_id = if version >= 3 {
263 if version >= 4 {
264 types::CompactString.decode(buf)?
265 } else {
266 types::String.decode(buf)?
267 }
268 } else {
269 None
270 };
271 let protocol_type = if version >= 5 {
272 types::CompactString.decode(buf)?
273 } else {
274 None
275 };
276 let protocol_name = if version >= 5 {
277 types::CompactString.decode(buf)?
278 } else {
279 None
280 };
281 let assignments = if version >= 4 {
282 types::CompactArray(types::Struct { version }).decode(buf)?
283 } else {
284 types::Array(types::Struct { version }).decode(buf)?
285 };
286 let mut unknown_tagged_fields = BTreeMap::new();
287 if version >= 4 {
288 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
289 for _ in 0..num_tagged_fields {
290 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
291 let size: u32 = types::UnsignedVarInt.decode(buf)?;
292 let unknown_value = buf.try_get_bytes(size as usize)?;
293 unknown_tagged_fields.insert(tag as i32, unknown_value);
294 }
295 }
296 Ok(Self {
297 group_id,
298 generation_id,
299 member_id,
300 group_instance_id,
301 protocol_type,
302 protocol_name,
303 assignments,
304 unknown_tagged_fields,
305 })
306 }
307}
308
309impl Default for SyncGroupRequest {
310 fn default() -> Self {
311 Self {
312 group_id: Default::default(),
313 generation_id: 0,
314 member_id: Default::default(),
315 group_instance_id: None,
316 protocol_type: None,
317 protocol_name: None,
318 assignments: Default::default(),
319 unknown_tagged_fields: BTreeMap::new(),
320 }
321 }
322}
323
324impl Message for SyncGroupRequest {
325 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
326 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
327}
328
329#[non_exhaustive]
331#[derive(Debug, Clone, PartialEq)]
332pub struct SyncGroupRequestAssignment {
333 pub member_id: StrBytes,
337
338 pub assignment: Bytes,
342
343 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
345}
346
347impl SyncGroupRequestAssignment {
348 pub fn with_member_id(mut self, value: StrBytes) -> Self {
354 self.member_id = value;
355 self
356 }
357 pub fn with_assignment(mut self, value: Bytes) -> Self {
363 self.assignment = value;
364 self
365 }
366 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
368 self.unknown_tagged_fields = value;
369 self
370 }
371 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
373 self.unknown_tagged_fields.insert(key, value);
374 self
375 }
376}
377
378#[cfg(feature = "client")]
379impl Encodable for SyncGroupRequestAssignment {
380 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
381 if version < 0 || version > 5 {
382 bail!("specified version not supported by this message type");
383 }
384 if version >= 4 {
385 types::CompactString.encode(buf, &self.member_id)?;
386 } else {
387 types::String.encode(buf, &self.member_id)?;
388 }
389 if version >= 4 {
390 types::CompactBytes.encode(buf, &self.assignment)?;
391 } else {
392 types::Bytes.encode(buf, &self.assignment)?;
393 }
394 if version >= 4 {
395 let num_tagged_fields = self.unknown_tagged_fields.len();
396 if num_tagged_fields > std::u32::MAX as usize {
397 bail!(
398 "Too many tagged fields to encode ({} fields)",
399 num_tagged_fields
400 );
401 }
402 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
403
404 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
405 }
406 Ok(())
407 }
408 fn compute_size(&self, version: i16) -> Result<usize> {
409 let mut total_size = 0;
410 if version >= 4 {
411 total_size += types::CompactString.compute_size(&self.member_id)?;
412 } else {
413 total_size += types::String.compute_size(&self.member_id)?;
414 }
415 if version >= 4 {
416 total_size += types::CompactBytes.compute_size(&self.assignment)?;
417 } else {
418 total_size += types::Bytes.compute_size(&self.assignment)?;
419 }
420 if version >= 4 {
421 let num_tagged_fields = self.unknown_tagged_fields.len();
422 if num_tagged_fields > std::u32::MAX as usize {
423 bail!(
424 "Too many tagged fields to encode ({} fields)",
425 num_tagged_fields
426 );
427 }
428 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
429
430 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
431 }
432 Ok(total_size)
433 }
434}
435
436#[cfg(feature = "broker")]
437impl Decodable for SyncGroupRequestAssignment {
438 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
439 if version < 0 || version > 5 {
440 bail!("specified version not supported by this message type");
441 }
442 let member_id = if version >= 4 {
443 types::CompactString.decode(buf)?
444 } else {
445 types::String.decode(buf)?
446 };
447 let assignment = if version >= 4 {
448 types::CompactBytes.decode(buf)?
449 } else {
450 types::Bytes.decode(buf)?
451 };
452 let mut unknown_tagged_fields = BTreeMap::new();
453 if version >= 4 {
454 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
455 for _ in 0..num_tagged_fields {
456 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
457 let size: u32 = types::UnsignedVarInt.decode(buf)?;
458 let unknown_value = buf.try_get_bytes(size as usize)?;
459 unknown_tagged_fields.insert(tag as i32, unknown_value);
460 }
461 }
462 Ok(Self {
463 member_id,
464 assignment,
465 unknown_tagged_fields,
466 })
467 }
468}
469
470impl Default for SyncGroupRequestAssignment {
471 fn default() -> Self {
472 Self {
473 member_id: Default::default(),
474 assignment: Default::default(),
475 unknown_tagged_fields: BTreeMap::new(),
476 }
477 }
478}
479
480impl Message for SyncGroupRequestAssignment {
481 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
482 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
483}
484
485impl HeaderVersion for SyncGroupRequest {
486 fn header_version(version: i16) -> i16 {
487 if version >= 4 {
488 2
489 } else {
490 1
491 }
492 }
493}