1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetCommitResponse {
24 pub throttle_time_ms: i32,
28
29 pub topics: Vec<OffsetCommitResponseTopic>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl OffsetCommitResponse {
39 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
45 self.throttle_time_ms = value;
46 self
47 }
48 pub fn with_topics(mut self, value: Vec<OffsetCommitResponseTopic>) -> Self {
54 self.topics = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
#[cfg(feature = "broker")]
impl Encodable for OffsetCommitResponse {
    /// Serializes the response into `buf` using the layout of `version`.
    ///
    /// * `throttle_time_ms` is written for versions 3 and above.
    /// * `topics` is written as a compact array for versions 8 and above,
    ///   otherwise as a regular array.
    /// * The tagged-field section is written for versions 8 and above only.
    ///
    /// # Errors
    ///
    /// Propagates any encoder error, and fails when the number of unknown
    /// tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 3 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }

        let element = types::Struct { version };
        if version < 8 {
            // Versions before 8 carry no tagged-field section, so we are done.
            types::Array(element).encode(buf, &self.topics)?;
            return Ok(());
        }
        types::CompactArray(element).encode(buf, &self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Mirrors the version gates used by `encode`.
    ///
    /// # Errors
    ///
    /// Propagates any size-computation error, and fails when the number of
    /// unknown tagged fields does not fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = if version >= 3 {
            types::Int32.compute_size(&self.throttle_time_ms)?
        } else {
            0
        };

        let element = types::Struct { version };
        if version < 8 {
            size += types::Array(element).compute_size(&self.topics)?;
            return Ok(size);
        }
        size += types::CompactArray(element).compute_size(&self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
120
#[cfg(feature = "client")]
impl Decodable for OffsetCommitResponse {
    /// Parses a response from `buf` using the layout of `version`.
    ///
    /// * `throttle_time_ms` is read for versions 3 and above, else it is `0`.
    /// * `topics` is read as a compact array for versions 8 and above,
    ///   otherwise as a regular array.
    /// * For versions 8 and above every tagged field is read and stored,
    ///   unparsed, in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any decoder error, including running out of bytes in `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let throttle_time_ms = if version < 3 {
            0
        } else {
            types::Int32.decode(buf)?
        };

        let element = types::Struct { version };
        let topics = if version < 8 {
            types::Array(element).decode(buf)?
        } else {
            types::CompactArray(element).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 8 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is laid out as: tag, payload length, payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            throttle_time_ms,
            topics,
            unknown_tagged_fields,
        })
    }
}
151
152impl Default for OffsetCommitResponse {
153 fn default() -> Self {
154 Self {
155 throttle_time_ms: 0,
156 topics: Default::default(),
157 unknown_tagged_fields: BTreeMap::new(),
158 }
159 }
160}
161
162impl Message for OffsetCommitResponse {
163 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
164 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
165}
166
167#[non_exhaustive]
169#[derive(Debug, Clone, PartialEq)]
170pub struct OffsetCommitResponsePartition {
171 pub partition_index: i32,
175
176 pub error_code: i16,
180
181 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
183}
184
185impl OffsetCommitResponsePartition {
186 pub fn with_partition_index(mut self, value: i32) -> Self {
192 self.partition_index = value;
193 self
194 }
195 pub fn with_error_code(mut self, value: i16) -> Self {
201 self.error_code = value;
202 self
203 }
204 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
206 self.unknown_tagged_fields = value;
207 self
208 }
209 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
211 self.unknown_tagged_fields.insert(key, value);
212 self
213 }
214}
215
#[cfg(feature = "broker")]
impl Encodable for OffsetCommitResponsePartition {
    /// Serializes the partition entry into `buf` using the layout of `version`.
    ///
    /// `partition_index` and `error_code` are always written; the tagged-field
    /// section follows only for versions 8 and above.
    ///
    /// # Errors
    ///
    /// Propagates any encoder error, and fails when the number of unknown
    /// tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;

        if version < 8 {
            // Versions before 8 carry no tagged-field section.
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    ///
    /// Propagates any size-computation error, and fails when the number of
    /// unknown tagged fields does not fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition_index)?;
        size += types::Int16.compute_size(&self.error_code)?;

        if version < 8 {
            return Ok(size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
254
#[cfg(feature = "client")]
impl Decodable for OffsetCommitResponsePartition {
    /// Parses a partition entry from `buf` using the layout of `version`.
    ///
    /// `partition_index` and `error_code` are always read. For versions 8 and
    /// above every tagged field is read and stored, unparsed, in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any decoder error, including running out of bytes in `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 8 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is laid out as: tag, payload length, payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            partition_index,
            error_code,
            unknown_tagged_fields,
        })
    }
}
277
278impl Default for OffsetCommitResponsePartition {
279 fn default() -> Self {
280 Self {
281 partition_index: 0,
282 error_code: 0,
283 unknown_tagged_fields: BTreeMap::new(),
284 }
285 }
286}
287
288impl Message for OffsetCommitResponsePartition {
289 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
290 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
291}
292
293#[non_exhaustive]
295#[derive(Debug, Clone, PartialEq)]
296pub struct OffsetCommitResponseTopic {
297 pub name: super::TopicName,
301
302 pub partitions: Vec<OffsetCommitResponsePartition>,
306
307 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
309}
310
311impl OffsetCommitResponseTopic {
312 pub fn with_name(mut self, value: super::TopicName) -> Self {
318 self.name = value;
319 self
320 }
321 pub fn with_partitions(mut self, value: Vec<OffsetCommitResponsePartition>) -> Self {
327 self.partitions = value;
328 self
329 }
330 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
332 self.unknown_tagged_fields = value;
333 self
334 }
335 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
337 self.unknown_tagged_fields.insert(key, value);
338 self
339 }
340}
341
#[cfg(feature = "broker")]
impl Encodable for OffsetCommitResponseTopic {
    /// Serializes the topic entry into `buf` using the layout of `version`.
    ///
    /// Versions below 8 use the regular string and array encodings and stop
    /// there. Versions 8 and above use the compact encodings and append the
    /// tagged-field section.
    ///
    /// # Errors
    ///
    /// Propagates any encoder error, and fails when the number of unknown
    /// tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let element = types::Struct { version };

        if version < 8 {
            types::String.encode(buf, &self.name)?;
            types::Array(element).encode(buf, &self.partitions)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(element).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Mirrors the version gates used by `encode`.
    ///
    /// # Errors
    ///
    /// Propagates any size-computation error, and fails when the number of
    /// unknown tagged fields does not fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let element = types::Struct { version };

        if version < 8 {
            let mut legacy_size = types::String.compute_size(&self.name)?;
            legacy_size += types::Array(element).compute_size(&self.partitions)?;
            return Ok(legacy_size);
        }

        let mut size = types::CompactString.compute_size(&self.name)?;
        size += types::CompactArray(element).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
397
#[cfg(feature = "client")]
impl Decodable for OffsetCommitResponseTopic {
    /// Parses a topic entry from `buf` using the layout of `version`.
    ///
    /// Versions below 8 use the regular string and array encodings. Versions
    /// 8 and above use the compact encodings, and every tagged field is read
    /// and stored, unparsed, in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any decoder error, including running out of bytes in `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 8;

        let name = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        let element = types::Struct { version };
        let partitions = if flexible {
            types::CompactArray(element).decode(buf)?
        } else {
            types::Array(element).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is laid out as: tag, payload length, payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
428
429impl Default for OffsetCommitResponseTopic {
430 fn default() -> Self {
431 Self {
432 name: Default::default(),
433 partitions: Default::default(),
434 unknown_tagged_fields: BTreeMap::new(),
435 }
436 }
437}
438
439impl Message for OffsetCommitResponseTopic {
440 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
441 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
442}
443
444impl HeaderVersion for OffsetCommitResponse {
445 fn header_version(version: i16) -> i16 {
446 if version >= 8 {
447 1
448 } else {
449 0
450 }
451 }
452}