1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct SyncGroupRequest {
24 pub group_id: super::GroupId,
28
29 pub generation_id: i32,
33
34 pub member_id: StrBytes,
38
39 pub group_instance_id: Option<StrBytes>,
43
44 pub protocol_type: Option<StrBytes>,
48
49 pub protocol_name: Option<StrBytes>,
53
54 pub assignments: Vec<SyncGroupRequestAssignment>,
58
59 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
61}
62
63impl SyncGroupRequest {
64 pub fn with_group_id(mut self, value: super::GroupId) -> Self {
70 self.group_id = value;
71 self
72 }
73 pub fn with_generation_id(mut self, value: i32) -> Self {
79 self.generation_id = value;
80 self
81 }
82 pub fn with_member_id(mut self, value: StrBytes) -> Self {
88 self.member_id = value;
89 self
90 }
91 pub fn with_group_instance_id(mut self, value: Option<StrBytes>) -> Self {
97 self.group_instance_id = value;
98 self
99 }
100 pub fn with_protocol_type(mut self, value: Option<StrBytes>) -> Self {
106 self.protocol_type = value;
107 self
108 }
109 pub fn with_protocol_name(mut self, value: Option<StrBytes>) -> Self {
115 self.protocol_name = value;
116 self
117 }
118 pub fn with_assignments(mut self, value: Vec<SyncGroupRequestAssignment>) -> Self {
124 self.assignments = value;
125 self
126 }
127 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
129 self.unknown_tagged_fields = value;
130 self
131 }
132 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
134 self.unknown_tagged_fields.insert(key, value);
135 self
136 }
137}
138
#[cfg(feature = "client")]
impl Encodable for SyncGroupRequest {
    /// Writes this request to `buf` using the wire layout of `version`.
    ///
    /// Fields are emitted in declaration order. Versions 4 and above use the
    /// compact encodings and end with the unknown tagged fields;
    /// `protocol_type` and `protocol_name` are only emitted from version 5.
    ///
    /// # Errors
    ///
    /// Fails when `group_instance_id` is set but `version` is below 3, when
    /// the number of unknown tagged fields exceeds `u32::MAX`, or when any
    /// field encoder reports an error. This method performs no rollback of
    /// `buf` on failure.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Version 4 is the switch-over point to compact encodings + tagged fields.
        let flexible = version >= 4;

        if flexible {
            types::CompactString.encode(buf, &self.group_id)?;
        } else {
            types::String.encode(buf, &self.group_id)?;
        }

        types::Int32.encode(buf, &self.generation_id)?;

        if flexible {
            types::CompactString.encode(buf, &self.member_id)?;
        } else {
            types::String.encode(buf, &self.member_id)?;
        }

        if version < 3 {
            // The field does not exist on the wire yet; refuse to drop a value silently.
            if self.group_instance_id.is_some() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if flexible {
            types::CompactString.encode(buf, &self.group_instance_id)?;
        } else {
            types::String.encode(buf, &self.group_instance_id)?;
        }

        // Unlike `group_instance_id`, these two are skipped without complaint
        // on versions that do not carry them.
        if version >= 5 {
            types::CompactString.encode(buf, &self.protocol_type)?;
            types::CompactString.encode(buf, &self.protocol_name)?;
        }

        let element = types::Struct { version };
        if flexible {
            types::CompactArray(element).encode(buf, &self.assignments)?;
        } else {
            types::Array(element).encode(buf, &self.assignments)?;
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Adds up the sizes reported by each field encoder for `version`.
    ///
    /// The branches mirror [`encode`](Self::encode) one for one, including
    /// the same error conditions.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 4;
        let mut size = 0;

        size += if flexible {
            types::CompactString.compute_size(&self.group_id)?
        } else {
            types::String.compute_size(&self.group_id)?
        };

        size += types::Int32.compute_size(&self.generation_id)?;

        size += if flexible {
            types::CompactString.compute_size(&self.member_id)?
        } else {
            types::String.compute_size(&self.member_id)?
        };

        if version < 3 {
            if self.group_instance_id.is_some() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else if flexible {
            size += types::CompactString.compute_size(&self.group_instance_id)?;
        } else {
            size += types::String.compute_size(&self.group_instance_id)?;
        }

        if version >= 5 {
            size += types::CompactString.compute_size(&self.protocol_type)?;
            size += types::CompactString.compute_size(&self.protocol_name)?;
        }

        let element = types::Struct { version };
        size += if flexible {
            types::CompactArray(element).compute_size(&self.assignments)?
        } else {
            types::Array(element).compute_size(&self.assignments)?
        };

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
241
#[cfg(feature = "broker")]
impl Decodable for SyncGroupRequest {
    /// Reads a request from `buf`, interpreting the bytes according to `version`.
    ///
    /// Fields are consumed in declaration order. Fields that the given
    /// version does not carry (`group_instance_id` before 3, `protocol_type`
    /// and `protocol_name` before 5) come back as `None`. For versions 4 and
    /// above, every trailing tagged field is stored verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates the first error reported by any field decoder or by the
    /// buffer when reading a tagged-field payload.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Version 4 is the switch-over point to compact encodings + tagged fields.
        let flexible = version >= 4;

        let group_id = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        let generation_id = types::Int32.decode(buf)?;

        let member_id = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        let group_instance_id = if version < 3 {
            None
        } else if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        let protocol_type = if version < 5 {
            None
        } else {
            types::CompactString.decode(buf)?
        };

        let protocol_name = if version < 5 {
            None
        } else {
            types::CompactString.decode(buf)?
        };

        let element = types::Struct { version };
        let assignments = if flexible {
            types::CompactArray(element).decode(buf)?
        } else {
            types::Array(element).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(payload_len as usize)?;
                // NOTE(review): tags above `i32::MAX` wrap to negative keys with
                // this cast — confirm that is acceptable for callers.
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            group_id,
            generation_id,
            member_id,
            group_instance_id,
            protocol_type,
            protocol_name,
            assignments,
            unknown_tagged_fields,
        })
    }
}
302
303impl Default for SyncGroupRequest {
304 fn default() -> Self {
305 Self {
306 group_id: Default::default(),
307 generation_id: 0,
308 member_id: Default::default(),
309 group_instance_id: None,
310 protocol_type: None,
311 protocol_name: None,
312 assignments: Default::default(),
313 unknown_tagged_fields: BTreeMap::new(),
314 }
315 }
316}
317
318impl Message for SyncGroupRequest {
319 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
320 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
321}
322
323#[non_exhaustive]
325#[derive(Debug, Clone, PartialEq)]
326pub struct SyncGroupRequestAssignment {
327 pub member_id: StrBytes,
331
332 pub assignment: Bytes,
336
337 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
339}
340
341impl SyncGroupRequestAssignment {
342 pub fn with_member_id(mut self, value: StrBytes) -> Self {
348 self.member_id = value;
349 self
350 }
351 pub fn with_assignment(mut self, value: Bytes) -> Self {
357 self.assignment = value;
358 self
359 }
360 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
362 self.unknown_tagged_fields = value;
363 self
364 }
365 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
367 self.unknown_tagged_fields.insert(key, value);
368 self
369 }
370}
371
#[cfg(feature = "client")]
impl Encodable for SyncGroupRequestAssignment {
    /// Writes this assignment to `buf` using the wire layout of `version`.
    ///
    /// `member_id` is written first, then `assignment`. Versions 4 and above
    /// use the compact encodings and end with the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields exceeds `u32::MAX` or
    /// when any field encoder reports an error. This method performs no
    /// rollback of `buf` on failure.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 4 {
            // Pre-flexible layout: plain string and bytes, no tagged section.
            types::String.encode(buf, &self.member_id)?;
            types::Bytes.encode(buf, &self.assignment)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.member_id)?;
        types::CompactBytes.encode(buf, &self.assignment)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Adds up the sizes reported by each field encoder for `version`.
    ///
    /// The branches mirror [`encode`](Self::encode), including the same
    /// tagged-field count check.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version < 4 {
            size += types::String.compute_size(&self.member_id)?;
            size += types::Bytes.compute_size(&self.assignment)?;
            return Ok(size);
        }

        size += types::CompactString.compute_size(&self.member_id)?;
        size += types::CompactBytes.compute_size(&self.assignment)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
426
#[cfg(feature = "broker")]
impl Decodable for SyncGroupRequestAssignment {
    /// Reads one assignment from `buf`, interpreting the bytes according to `version`.
    ///
    /// `member_id` is consumed first, then `assignment`. For versions 4 and
    /// above, every trailing tagged field is stored verbatim in
    /// `unknown_tagged_fields`; for older versions that map stays empty.
    ///
    /// # Errors
    ///
    /// Propagates the first error reported by any field decoder or by the
    /// buffer when reading a tagged-field payload.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Version 4 is the switch-over point to compact encodings + tagged fields.
        let flexible = version >= 4;

        let member_id = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        let assignment = if flexible {
            types::CompactBytes.decode(buf)?
        } else {
            types::Bytes.decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(payload_len as usize)?;
                // NOTE(review): tags above `i32::MAX` wrap to negative keys with
                // this cast — confirm that is acceptable for callers.
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            member_id,
            assignment,
            unknown_tagged_fields,
        })
    }
}
457
458impl Default for SyncGroupRequestAssignment {
459 fn default() -> Self {
460 Self {
461 member_id: Default::default(),
462 assignment: Default::default(),
463 unknown_tagged_fields: BTreeMap::new(),
464 }
465 }
466}
467
468impl Message for SyncGroupRequestAssignment {
469 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
470 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
471}
472
473impl HeaderVersion for SyncGroupRequest {
474 fn header_version(version: i16) -> i16 {
475 if version >= 4 {
476 2
477 } else {
478 1
479 }
480 }
481}