1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetCommitResponse {
24 pub throttle_time_ms: i32,
28
29 pub topics: Vec<OffsetCommitResponseTopic>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl OffsetCommitResponse {
39 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
45 self.throttle_time_ms = value;
46 self
47 }
48 pub fn with_topics(mut self, value: Vec<OffsetCommitResponseTopic>) -> Self {
54 self.topics = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
/// Wire serialization of [`OffsetCommitResponse`]; compiled only when the
/// `broker` feature is enabled.
#[cfg(feature = "broker")]
impl Encodable for OffsetCommitResponse {
    /// Writes this response into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=10`, when a nested encoder
    /// fails, or when there are more unknown tagged fields than fit in a
    /// `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 8 the message uses compact arrays and ends with a
        // tagged-field section.
        let compact_layout = version >= 8;

        if version >= 3 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }

        let topic_codec = types::Struct { version };
        if compact_layout {
            types::CompactArray(topic_codec).encode(buf, &self.topics)?;
        } else {
            types::Array(topic_codec).encode(buf, &self.topics)?;
        }

        if !compact_layout {
            return Ok(());
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `2..=10`.
    ///
    /// # Errors
    ///
    /// Fails when a nested size computation fails or when there are more
    /// unknown tagged fields than fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact_layout = version >= 8;
        let mut total_size: usize = 0;

        if version >= 3 {
            total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }

        let topic_codec = types::Struct { version };
        total_size += if compact_layout {
            types::CompactArray(topic_codec).compute_size(&self.topics)?
        } else {
            types::Array(topic_codec).compute_size(&self.topics)?
        };

        if !compact_layout {
            return Ok(total_size);
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
123
/// Wire deserialization of [`OffsetCommitResponse`]; compiled only when the
/// `client` feature is enabled.
#[cfg(feature = "client")]
impl Decodable for OffsetCommitResponse {
    /// Reads a response laid out according to `version` from `buf`.
    ///
    /// Fields that are absent in `version` keep their default value.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=10` or when any nested decoder
    /// or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 8 the message uses compact arrays and ends with a
        // tagged-field section.
        let compact_layout = version >= 8;

        let throttle_time_ms = if version < 3 {
            0
        } else {
            types::Int32.decode(buf)?
        };

        let topic_codec = types::Struct { version };
        let topics = if compact_layout {
            types::CompactArray(topic_codec).decode(buf)?
        } else {
            types::Array(topic_codec).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if compact_layout {
            // Each entry is (tag, byte length, raw payload); the payload is
            // stored unparsed under its tag.
            let entry_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..entry_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            throttle_time_ms,
            topics,
            unknown_tagged_fields,
        })
    }
}
157
158impl Default for OffsetCommitResponse {
159 fn default() -> Self {
160 Self {
161 throttle_time_ms: 0,
162 topics: Default::default(),
163 unknown_tagged_fields: BTreeMap::new(),
164 }
165 }
166}
167
168impl Message for OffsetCommitResponse {
169 const VERSIONS: VersionRange = VersionRange { min: 2, max: 10 };
170 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
171}
172
173#[non_exhaustive]
175#[derive(Debug, Clone, PartialEq)]
176pub struct OffsetCommitResponsePartition {
177 pub partition_index: i32,
181
182 pub error_code: i16,
186
187 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
189}
190
191impl OffsetCommitResponsePartition {
192 pub fn with_partition_index(mut self, value: i32) -> Self {
198 self.partition_index = value;
199 self
200 }
201 pub fn with_error_code(mut self, value: i16) -> Self {
207 self.error_code = value;
208 self
209 }
210 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
212 self.unknown_tagged_fields = value;
213 self
214 }
215 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
217 self.unknown_tagged_fields.insert(key, value);
218 self
219 }
220}
221
/// Wire serialization of [`OffsetCommitResponsePartition`]; compiled only
/// when the `broker` feature is enabled.
#[cfg(feature = "broker")]
impl Encodable for OffsetCommitResponsePartition {
    /// Writes this partition entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=10`, when a nested encoder
    /// fails, or when there are more unknown tagged fields than fit in a
    /// `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;

        // Versions before 8 carry no tagged-field section.
        if version < 8 {
            return Ok(());
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `2..=10`.
    ///
    /// # Errors
    ///
    /// Fails when a nested size computation fails or when there are more
    /// unknown tagged fields than fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fixed_size: usize = types::Int32.compute_size(&self.partition_index)?
            + types::Int16.compute_size(&self.error_code)?;

        // Versions before 8 carry no tagged-field section.
        if version < 8 {
            return Ok(fixed_size);
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
        let payload_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(fixed_size + count_size + payload_size)
    }
}
263
/// Wire deserialization of [`OffsetCommitResponsePartition`]; compiled only
/// when the `client` feature is enabled.
#[cfg(feature = "client")]
impl Decodable for OffsetCommitResponsePartition {
    /// Reads a partition entry laid out according to `version` from `buf`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=10` or when any nested decoder
    /// or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 8 {
            // Each entry is (tag, byte length, raw payload); the payload is
            // stored unparsed under its tag.
            let entry_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..entry_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            partition_index,
            error_code,
            unknown_tagged_fields,
        })
    }
}
289
290impl Default for OffsetCommitResponsePartition {
291 fn default() -> Self {
292 Self {
293 partition_index: 0,
294 error_code: 0,
295 unknown_tagged_fields: BTreeMap::new(),
296 }
297 }
298}
299
300impl Message for OffsetCommitResponsePartition {
301 const VERSIONS: VersionRange = VersionRange { min: 2, max: 10 };
302 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
303}
304
305#[non_exhaustive]
307#[derive(Debug, Clone, PartialEq)]
308pub struct OffsetCommitResponseTopic {
309 pub name: super::TopicName,
313
314 pub topic_id: Uuid,
318
319 pub partitions: Vec<OffsetCommitResponsePartition>,
323
324 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
326}
327
328impl OffsetCommitResponseTopic {
329 pub fn with_name(mut self, value: super::TopicName) -> Self {
335 self.name = value;
336 self
337 }
338 pub fn with_topic_id(mut self, value: Uuid) -> Self {
344 self.topic_id = value;
345 self
346 }
347 pub fn with_partitions(mut self, value: Vec<OffsetCommitResponsePartition>) -> Self {
353 self.partitions = value;
354 self
355 }
356 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
358 self.unknown_tagged_fields = value;
359 self
360 }
361 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
363 self.unknown_tagged_fields.insert(key, value);
364 self
365 }
366}
367
/// Wire serialization of [`OffsetCommitResponseTopic`]; compiled only when
/// the `broker` feature is enabled.
#[cfg(feature = "broker")]
impl Encodable for OffsetCommitResponseTopic {
    /// Writes this topic entry into `buf` using the layout of `version`.
    ///
    /// The topic is identified by `topic_id` from version 10 and by `name`
    /// before that; the field that does not apply is not written.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=10`, when a nested encoder
    /// fails, or when there are more unknown tagged fields than fit in a
    /// `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 8 the message uses compact strings/arrays and ends
        // with a tagged-field section.
        let compact_layout = version >= 8;

        if version >= 10 {
            types::Uuid.encode(buf, &self.topic_id)?;
        } else if compact_layout {
            types::CompactString.encode(buf, &self.name)?;
        } else {
            types::String.encode(buf, &self.name)?;
        }

        let partition_codec = types::Struct { version };
        if compact_layout {
            types::CompactArray(partition_codec).encode(buf, &self.partitions)?;
        } else {
            types::Array(partition_codec).encode(buf, &self.partitions)?;
        }

        if !compact_layout {
            return Ok(());
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `2..=10`.
    ///
    /// # Errors
    ///
    /// Fails when a nested size computation fails or when there are more
    /// unknown tagged fields than fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact_layout = version >= 8;
        let mut total_size: usize = 0;

        total_size += if version >= 10 {
            types::Uuid.compute_size(&self.topic_id)?
        } else if compact_layout {
            types::CompactString.compute_size(&self.name)?
        } else {
            types::String.compute_size(&self.name)?
        };

        let partition_codec = types::Struct { version };
        total_size += if compact_layout {
            types::CompactArray(partition_codec).compute_size(&self.partitions)?
        } else {
            types::Array(partition_codec).compute_size(&self.partitions)?
        };

        if !compact_layout {
            return Ok(total_size);
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
436
/// Wire deserialization of [`OffsetCommitResponseTopic`]; compiled only when
/// the `client` feature is enabled.
#[cfg(feature = "client")]
impl Decodable for OffsetCommitResponseTopic {
    /// Reads a topic entry laid out according to `version` from `buf`.
    ///
    /// Version 10 carries `topic_id` and leaves `name` at its default;
    /// earlier versions carry `name` and leave `topic_id` as the nil UUID.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=10` or when any nested decoder
    /// or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 8 the message uses compact strings/arrays and ends
        // with a tagged-field section.
        let compact_layout = version >= 8;

        let (name, topic_id): (super::TopicName, Uuid) = if version >= 10 {
            (Default::default(), types::Uuid.decode(buf)?)
        } else if compact_layout {
            (types::CompactString.decode(buf)?, Uuid::nil())
        } else {
            (types::String.decode(buf)?, Uuid::nil())
        };

        let partition_codec = types::Struct { version };
        let partitions = if compact_layout {
            types::CompactArray(partition_codec).decode(buf)?
        } else {
            types::Array(partition_codec).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if compact_layout {
            // Each entry is (tag, byte length, raw payload); the payload is
            // stored unparsed under its tag.
            let entry_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..entry_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
480
481impl Default for OffsetCommitResponseTopic {
482 fn default() -> Self {
483 Self {
484 name: Default::default(),
485 topic_id: Uuid::nil(),
486 partitions: Default::default(),
487 unknown_tagged_fields: BTreeMap::new(),
488 }
489 }
490}
491
492impl Message for OffsetCommitResponseTopic {
493 const VERSIONS: VersionRange = VersionRange { min: 2, max: 10 };
494 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
495}
496
497impl HeaderVersion for OffsetCommitResponse {
498 fn header_version(version: i16) -> i16 {
499 if version >= 8 {
500 1
501 } else {
502 0
503 }
504 }
505}