1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
#[non_exhaustive]
/// Assignment payload held in the `assignment` field of
/// [`ConsumerGroupHeartbeatResponse`].
///
/// Only message version 0 is accepted by the `encode`/`decode` implementations.
#[derive(Debug, Clone, PartialEq)]
pub struct Assignment {
    /// One [`TopicPartitions`] entry per element; encoded as a compact array of structs.
    pub topic_partitions: Vec<TopicPartitions>,

    /// Raw bytes of tagged fields that this struct does not model, keyed by tag.
    ///
    /// Filled in by `decode` and written back out by `encode`.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
32
33impl Assignment {
34 pub fn with_topic_partitions(mut self, value: Vec<TopicPartitions>) -> Self {
40 self.topic_partitions = value;
41 self
42 }
43 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
45 self.unknown_tagged_fields = value;
46 self
47 }
48 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
50 self.unknown_tagged_fields.insert(key, value);
51 self
52 }
53}
54
#[cfg(feature = "broker")]
impl Encodable for Assignment {
    /// Writes this assignment to `buf`.
    ///
    /// Layout: `topic_partitions` as a compact array of structs, then the number of
    /// unknown tagged fields as an unsigned varint, then the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0, when there are more than `u32::MAX` unknown
    /// tagged fields, or when any underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        types::CompactArray(types::Struct { version }).encode(buf, &self.topic_partitions)?;

        // The count is written as a u32 varint, so it has to fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for the same fields.
    ///
    /// Unlike `encode`, this does not reject unsupported `version` values itself;
    /// `version` is only forwarded to the nested struct codec.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields or when any
    /// underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let topic_partitions_size =
            types::CompactArray(types::Struct { version }).compute_size(&self.topic_partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        let tagged_count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_payload_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(topic_partitions_size + tagged_count_size + tagged_payload_size)
    }
}
91
#[cfg(feature = "client")]
impl Decodable for Assignment {
    /// Reads an assignment from `buf`.
    ///
    /// Reads `topic_partitions` as a compact array of structs, then a varint count of
    /// tagged fields. Each tagged field is a varint tag, a varint size and that many
    /// raw bytes; all of them are kept in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0 or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        let topic_partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        (0..tagged_count).try_for_each(|_| -> Result<()> {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw = buf.try_get_bytes(len as usize)?;
            // A repeated tag overwrites the earlier entry.
            unknown_tagged_fields.insert(tag as i32, raw);
            Ok(())
        })?;

        Ok(Self {
            topic_partitions,
            unknown_tagged_fields,
        })
    }
}
113
114impl Default for Assignment {
115 fn default() -> Self {
116 Self {
117 topic_partitions: Default::default(),
118 unknown_tagged_fields: BTreeMap::new(),
119 }
120 }
121}
122
impl Message for Assignment {
    /// Supported version range: `min` and `max` are both 0, so only version 0 exists.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
    /// No versions of this message are marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
127
#[non_exhaustive]
/// Consumer group heartbeat response message.
///
/// Only message version 0 is accepted by the `encode`/`decode` implementations.
/// Fields are listed in the order they are written on the wire.
#[derive(Debug, Clone, PartialEq)]
pub struct ConsumerGroupHeartbeatResponse {
    /// Throttle time in milliseconds (per the field name); encoded as `Int32`.
    pub throttle_time_ms: i32,

    /// Numeric error code; encoded as `Int16`.
    // NOTE(review): presumably 0 means "no error", as elsewhere in Kafka — confirm.
    pub error_code: i16,

    /// Optional error message; encoded as a compact string.
    pub error_message: Option<StrBytes>,

    /// Optional member id; encoded as a compact string.
    pub member_id: Option<StrBytes>,

    /// Member epoch; encoded as `Int32`.
    pub member_epoch: i32,

    /// Heartbeat interval in milliseconds (per the field name); encoded as `Int32`.
    pub heartbeat_interval_ms: i32,

    /// Optional [`Assignment`]; encoded through `types::OptionStruct`.
    pub assignment: Option<Assignment>,

    /// Raw bytes of tagged fields that this struct does not model, keyed by tag.
    ///
    /// Filled in by `decode` and written back out by `encode`.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
170
171impl ConsumerGroupHeartbeatResponse {
172 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
178 self.throttle_time_ms = value;
179 self
180 }
181 pub fn with_error_code(mut self, value: i16) -> Self {
187 self.error_code = value;
188 self
189 }
190 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
196 self.error_message = value;
197 self
198 }
199 pub fn with_member_id(mut self, value: Option<StrBytes>) -> Self {
205 self.member_id = value;
206 self
207 }
208 pub fn with_member_epoch(mut self, value: i32) -> Self {
214 self.member_epoch = value;
215 self
216 }
217 pub fn with_heartbeat_interval_ms(mut self, value: i32) -> Self {
223 self.heartbeat_interval_ms = value;
224 self
225 }
226 pub fn with_assignment(mut self, value: Option<Assignment>) -> Self {
232 self.assignment = value;
233 self
234 }
235 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
237 self.unknown_tagged_fields = value;
238 self
239 }
240 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
242 self.unknown_tagged_fields.insert(key, value);
243 self
244 }
245}
246
#[cfg(feature = "broker")]
impl Encodable for ConsumerGroupHeartbeatResponse {
    /// Writes this response to `buf`.
    ///
    /// Field order: `throttle_time_ms` (`Int32`), `error_code` (`Int16`),
    /// `error_message` and `member_id` (compact strings), `member_epoch` and
    /// `heartbeat_interval_ms` (`Int32`), `assignment` (optional struct), then the
    /// number of unknown tagged fields as an unsigned varint followed by those fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0, when there are more than `u32::MAX` unknown
    /// tagged fields, or when any underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        types::Int32.encode(buf, &self.throttle_time_ms)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::CompactString.encode(buf, &self.error_message)?;
        types::CompactString.encode(buf, &self.member_id)?;
        types::Int32.encode(buf, &self.member_epoch)?;
        types::Int32.encode(buf, &self.heartbeat_interval_ms)?;
        types::OptionStruct { version }.encode(buf, &self.assignment)?;

        // The count is written as a u32 varint, so it has to fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for the same fields.
    ///
    /// Unlike `encode`, this does not reject unsupported `version` values itself;
    /// `version` is only forwarded to the nested struct codec.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields or when any
    /// underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let throttle_time_ms_size = types::Int32.compute_size(&self.throttle_time_ms)?;
        let error_code_size = types::Int16.compute_size(&self.error_code)?;
        let error_message_size = types::CompactString.compute_size(&self.error_message)?;
        let member_id_size = types::CompactString.compute_size(&self.member_id)?;
        let member_epoch_size = types::Int32.compute_size(&self.member_epoch)?;
        let heartbeat_interval_ms_size =
            types::Int32.compute_size(&self.heartbeat_interval_ms)?;
        let assignment_size = types::OptionStruct { version }.compute_size(&self.assignment)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        let tagged_count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_payload_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(throttle_time_ms_size
            + error_code_size
            + error_message_size
            + member_id_size
            + member_epoch_size
            + heartbeat_interval_ms_size
            + assignment_size
            + tagged_count_size
            + tagged_payload_size)
    }
}
294
#[cfg(feature = "client")]
impl Decodable for ConsumerGroupHeartbeatResponse {
    /// Reads a response from `buf`.
    ///
    /// Fields are read in the same order `encode` writes them, followed by a varint
    /// count of tagged fields. Each tagged field is a varint tag, a varint size and
    /// that many raw bytes; all of them are kept in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0 or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        let throttle_time_ms = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let error_message = types::CompactString.decode(buf)?;
        let member_id = types::CompactString.decode(buf)?;
        let member_epoch = types::Int32.decode(buf)?;
        let heartbeat_interval_ms = types::Int32.decode(buf)?;
        let assignment = types::OptionStruct { version }.decode(buf)?;

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        (0..tagged_count).try_for_each(|_| -> Result<()> {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw = buf.try_get_bytes(len as usize)?;
            // A repeated tag overwrites the earlier entry.
            unknown_tagged_fields.insert(tag as i32, raw);
            Ok(())
        })?;

        Ok(Self {
            throttle_time_ms,
            error_code,
            error_message,
            member_id,
            member_epoch,
            heartbeat_interval_ms,
            assignment,
            unknown_tagged_fields,
        })
    }
}
328
329impl Default for ConsumerGroupHeartbeatResponse {
330 fn default() -> Self {
331 Self {
332 throttle_time_ms: 0,
333 error_code: 0,
334 error_message: None,
335 member_id: None,
336 member_epoch: 0,
337 heartbeat_interval_ms: 0,
338 assignment: None,
339 unknown_tagged_fields: BTreeMap::new(),
340 }
341 }
342}
343
impl Message for ConsumerGroupHeartbeatResponse {
    /// Supported version range: `min` and `max` are both 0, so only version 0 exists.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
    /// No versions of this message are marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
348
#[non_exhaustive]
/// A topic id together with a list of partition numbers; the element type of
/// `Assignment::topic_partitions`.
///
/// Only message version 0 is accepted by the `encode`/`decode` implementations.
#[derive(Debug, Clone, PartialEq)]
pub struct TopicPartitions {
    /// Topic identifier; encoded as a UUID. Defaults to the nil UUID.
    pub topic_id: Uuid,

    /// Partition numbers; encoded as a compact array of `Int32`.
    pub partitions: Vec<i32>,

    /// Raw bytes of tagged fields that this struct does not model, keyed by tag.
    ///
    /// Filled in by `decode` and written back out by `encode`.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
366
367impl TopicPartitions {
368 pub fn with_topic_id(mut self, value: Uuid) -> Self {
374 self.topic_id = value;
375 self
376 }
377 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
383 self.partitions = value;
384 self
385 }
386 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
388 self.unknown_tagged_fields = value;
389 self
390 }
391 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
393 self.unknown_tagged_fields.insert(key, value);
394 self
395 }
396}
397
#[cfg(feature = "broker")]
impl Encodable for TopicPartitions {
    /// Writes this entry to `buf`.
    ///
    /// Layout: `topic_id` as a UUID, `partitions` as a compact array of `Int32`, then
    /// the number of unknown tagged fields as an unsigned varint followed by those fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0, when there are more than `u32::MAX` unknown
    /// tagged fields, or when any underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        types::Uuid.encode(buf, &self.topic_id)?;
        types::CompactArray(types::Int32).encode(buf, &self.partitions)?;

        // The count is written as a u32 varint, so it has to fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for the same fields.
    ///
    /// `version` is not inspected here, so unsupported versions are not rejected.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields or when any
    /// underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let topic_id_size = types::Uuid.compute_size(&self.topic_id)?;
        let partitions_size = types::CompactArray(types::Int32).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        let tagged_count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_payload_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(topic_id_size + partitions_size + tagged_count_size + tagged_payload_size)
    }
}
435
#[cfg(feature = "client")]
impl Decodable for TopicPartitions {
    /// Reads an entry from `buf`.
    ///
    /// Reads `topic_id` as a UUID and `partitions` as a compact array of `Int32`,
    /// then a varint count of tagged fields. Each tagged field is a varint tag, a
    /// varint size and that many raw bytes; all of them are kept in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0 or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        let topic_id = types::Uuid.decode(buf)?;
        let partitions = types::CompactArray(types::Int32).decode(buf)?;

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        (0..tagged_count).try_for_each(|_| -> Result<()> {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw = buf.try_get_bytes(len as usize)?;
            // A repeated tag overwrites the earlier entry.
            unknown_tagged_fields.insert(tag as i32, raw);
            Ok(())
        })?;

        Ok(Self {
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
459
460impl Default for TopicPartitions {
461 fn default() -> Self {
462 Self {
463 topic_id: Uuid::nil(),
464 partitions: Default::default(),
465 unknown_tagged_fields: BTreeMap::new(),
466 }
467 }
468}
469
impl Message for TopicPartitions {
    /// Supported version range: `min` and `max` are both 0, so only version 0 exists.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
    /// No versions of this message are marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
474
impl HeaderVersion for ConsumerGroupHeartbeatResponse {
    /// Returns the header version to use with this message.
    ///
    /// The result is always `1`; the `version` argument is ignored.
    fn header_version(version: i16) -> i16 {
        1
    }
}