1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ListOffsetsPartitionResponse {
24 pub partition_index: i32,
28
29 pub error_code: i16,
33
34 pub timestamp: i64,
38
39 pub offset: i64,
43
44 pub leader_epoch: i32,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl ListOffsetsPartitionResponse {
54 pub fn with_partition_index(mut self, value: i32) -> Self {
60 self.partition_index = value;
61 self
62 }
63 pub fn with_error_code(mut self, value: i16) -> Self {
69 self.error_code = value;
70 self
71 }
72 pub fn with_timestamp(mut self, value: i64) -> Self {
78 self.timestamp = value;
79 self
80 }
81 pub fn with_offset(mut self, value: i64) -> Self {
87 self.offset = value;
88 self
89 }
90 pub fn with_leader_epoch(mut self, value: i32) -> Self {
96 self.leader_epoch = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
111#[cfg(feature = "broker")]
112impl Encodable for ListOffsetsPartitionResponse {
113 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
114 if version < 1 || version > 10 {
115 bail!("specified version not supported by this message type");
116 }
117 types::Int32.encode(buf, &self.partition_index)?;
118 types::Int16.encode(buf, &self.error_code)?;
119 types::Int64.encode(buf, &self.timestamp)?;
120 types::Int64.encode(buf, &self.offset)?;
121 if version >= 4 {
122 types::Int32.encode(buf, &self.leader_epoch)?;
123 } else {
124 if self.leader_epoch != -1 {
125 bail!("A field is set that is not available on the selected protocol version");
126 }
127 }
128 if version >= 6 {
129 let num_tagged_fields = self.unknown_tagged_fields.len();
130 if num_tagged_fields > std::u32::MAX as usize {
131 bail!(
132 "Too many tagged fields to encode ({} fields)",
133 num_tagged_fields
134 );
135 }
136 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
137
138 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
139 }
140 Ok(())
141 }
142 fn compute_size(&self, version: i16) -> Result<usize> {
143 let mut total_size = 0;
144 total_size += types::Int32.compute_size(&self.partition_index)?;
145 total_size += types::Int16.compute_size(&self.error_code)?;
146 total_size += types::Int64.compute_size(&self.timestamp)?;
147 total_size += types::Int64.compute_size(&self.offset)?;
148 if version >= 4 {
149 total_size += types::Int32.compute_size(&self.leader_epoch)?;
150 } else {
151 if self.leader_epoch != -1 {
152 bail!("A field is set that is not available on the selected protocol version");
153 }
154 }
155 if version >= 6 {
156 let num_tagged_fields = self.unknown_tagged_fields.len();
157 if num_tagged_fields > std::u32::MAX as usize {
158 bail!(
159 "Too many tagged fields to encode ({} fields)",
160 num_tagged_fields
161 );
162 }
163 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
164
165 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
166 }
167 Ok(total_size)
168 }
169}
170
171#[cfg(feature = "client")]
172impl Decodable for ListOffsetsPartitionResponse {
173 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
174 if version < 1 || version > 10 {
175 bail!("specified version not supported by this message type");
176 }
177 let partition_index = types::Int32.decode(buf)?;
178 let error_code = types::Int16.decode(buf)?;
179 let timestamp = types::Int64.decode(buf)?;
180 let offset = types::Int64.decode(buf)?;
181 let leader_epoch = if version >= 4 {
182 types::Int32.decode(buf)?
183 } else {
184 -1
185 };
186 let mut unknown_tagged_fields = BTreeMap::new();
187 if version >= 6 {
188 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
189 for _ in 0..num_tagged_fields {
190 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
191 let size: u32 = types::UnsignedVarInt.decode(buf)?;
192 let unknown_value = buf.try_get_bytes(size as usize)?;
193 unknown_tagged_fields.insert(tag as i32, unknown_value);
194 }
195 }
196 Ok(Self {
197 partition_index,
198 error_code,
199 timestamp,
200 offset,
201 leader_epoch,
202 unknown_tagged_fields,
203 })
204 }
205}
206
207impl Default for ListOffsetsPartitionResponse {
208 fn default() -> Self {
209 Self {
210 partition_index: 0,
211 error_code: 0,
212 timestamp: -1,
213 offset: -1,
214 leader_epoch: -1,
215 unknown_tagged_fields: BTreeMap::new(),
216 }
217 }
218}
219
220impl Message for ListOffsetsPartitionResponse {
221 const VERSIONS: VersionRange = VersionRange { min: 1, max: 10 };
222 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
223}
224
225#[non_exhaustive]
227#[derive(Debug, Clone, PartialEq)]
228pub struct ListOffsetsResponse {
229 pub throttle_time_ms: i32,
233
234 pub topics: Vec<ListOffsetsTopicResponse>,
238
239 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
241}
242
243impl ListOffsetsResponse {
244 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
250 self.throttle_time_ms = value;
251 self
252 }
253 pub fn with_topics(mut self, value: Vec<ListOffsetsTopicResponse>) -> Self {
259 self.topics = value;
260 self
261 }
262 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
264 self.unknown_tagged_fields = value;
265 self
266 }
267 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
269 self.unknown_tagged_fields.insert(key, value);
270 self
271 }
272}
273
274#[cfg(feature = "broker")]
275impl Encodable for ListOffsetsResponse {
276 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
277 if version < 1 || version > 10 {
278 bail!("specified version not supported by this message type");
279 }
280 if version >= 2 {
281 types::Int32.encode(buf, &self.throttle_time_ms)?;
282 }
283 if version >= 6 {
284 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
285 } else {
286 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
287 }
288 if version >= 6 {
289 let num_tagged_fields = self.unknown_tagged_fields.len();
290 if num_tagged_fields > std::u32::MAX as usize {
291 bail!(
292 "Too many tagged fields to encode ({} fields)",
293 num_tagged_fields
294 );
295 }
296 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
297
298 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
299 }
300 Ok(())
301 }
302 fn compute_size(&self, version: i16) -> Result<usize> {
303 let mut total_size = 0;
304 if version >= 2 {
305 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
306 }
307 if version >= 6 {
308 total_size +=
309 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
310 } else {
311 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
312 }
313 if version >= 6 {
314 let num_tagged_fields = self.unknown_tagged_fields.len();
315 if num_tagged_fields > std::u32::MAX as usize {
316 bail!(
317 "Too many tagged fields to encode ({} fields)",
318 num_tagged_fields
319 );
320 }
321 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
322
323 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
324 }
325 Ok(total_size)
326 }
327}
328
329#[cfg(feature = "client")]
330impl Decodable for ListOffsetsResponse {
331 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
332 if version < 1 || version > 10 {
333 bail!("specified version not supported by this message type");
334 }
335 let throttle_time_ms = if version >= 2 {
336 types::Int32.decode(buf)?
337 } else {
338 0
339 };
340 let topics = if version >= 6 {
341 types::CompactArray(types::Struct { version }).decode(buf)?
342 } else {
343 types::Array(types::Struct { version }).decode(buf)?
344 };
345 let mut unknown_tagged_fields = BTreeMap::new();
346 if version >= 6 {
347 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
348 for _ in 0..num_tagged_fields {
349 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
350 let size: u32 = types::UnsignedVarInt.decode(buf)?;
351 let unknown_value = buf.try_get_bytes(size as usize)?;
352 unknown_tagged_fields.insert(tag as i32, unknown_value);
353 }
354 }
355 Ok(Self {
356 throttle_time_ms,
357 topics,
358 unknown_tagged_fields,
359 })
360 }
361}
362
363impl Default for ListOffsetsResponse {
364 fn default() -> Self {
365 Self {
366 throttle_time_ms: 0,
367 topics: Default::default(),
368 unknown_tagged_fields: BTreeMap::new(),
369 }
370 }
371}
372
373impl Message for ListOffsetsResponse {
374 const VERSIONS: VersionRange = VersionRange { min: 1, max: 10 };
375 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
376}
377
378#[non_exhaustive]
380#[derive(Debug, Clone, PartialEq)]
381pub struct ListOffsetsTopicResponse {
382 pub name: super::TopicName,
386
387 pub partitions: Vec<ListOffsetsPartitionResponse>,
391
392 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
394}
395
396impl ListOffsetsTopicResponse {
397 pub fn with_name(mut self, value: super::TopicName) -> Self {
403 self.name = value;
404 self
405 }
406 pub fn with_partitions(mut self, value: Vec<ListOffsetsPartitionResponse>) -> Self {
412 self.partitions = value;
413 self
414 }
415 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
417 self.unknown_tagged_fields = value;
418 self
419 }
420 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
422 self.unknown_tagged_fields.insert(key, value);
423 self
424 }
425}
426
427#[cfg(feature = "broker")]
428impl Encodable for ListOffsetsTopicResponse {
429 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
430 if version < 1 || version > 10 {
431 bail!("specified version not supported by this message type");
432 }
433 if version >= 6 {
434 types::CompactString.encode(buf, &self.name)?;
435 } else {
436 types::String.encode(buf, &self.name)?;
437 }
438 if version >= 6 {
439 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
440 } else {
441 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
442 }
443 if version >= 6 {
444 let num_tagged_fields = self.unknown_tagged_fields.len();
445 if num_tagged_fields > std::u32::MAX as usize {
446 bail!(
447 "Too many tagged fields to encode ({} fields)",
448 num_tagged_fields
449 );
450 }
451 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
452
453 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
454 }
455 Ok(())
456 }
457 fn compute_size(&self, version: i16) -> Result<usize> {
458 let mut total_size = 0;
459 if version >= 6 {
460 total_size += types::CompactString.compute_size(&self.name)?;
461 } else {
462 total_size += types::String.compute_size(&self.name)?;
463 }
464 if version >= 6 {
465 total_size +=
466 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
467 } else {
468 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
469 }
470 if version >= 6 {
471 let num_tagged_fields = self.unknown_tagged_fields.len();
472 if num_tagged_fields > std::u32::MAX as usize {
473 bail!(
474 "Too many tagged fields to encode ({} fields)",
475 num_tagged_fields
476 );
477 }
478 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
479
480 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
481 }
482 Ok(total_size)
483 }
484}
485
486#[cfg(feature = "client")]
487impl Decodable for ListOffsetsTopicResponse {
488 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
489 if version < 1 || version > 10 {
490 bail!("specified version not supported by this message type");
491 }
492 let name = if version >= 6 {
493 types::CompactString.decode(buf)?
494 } else {
495 types::String.decode(buf)?
496 };
497 let partitions = if version >= 6 {
498 types::CompactArray(types::Struct { version }).decode(buf)?
499 } else {
500 types::Array(types::Struct { version }).decode(buf)?
501 };
502 let mut unknown_tagged_fields = BTreeMap::new();
503 if version >= 6 {
504 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
505 for _ in 0..num_tagged_fields {
506 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
507 let size: u32 = types::UnsignedVarInt.decode(buf)?;
508 let unknown_value = buf.try_get_bytes(size as usize)?;
509 unknown_tagged_fields.insert(tag as i32, unknown_value);
510 }
511 }
512 Ok(Self {
513 name,
514 partitions,
515 unknown_tagged_fields,
516 })
517 }
518}
519
520impl Default for ListOffsetsTopicResponse {
521 fn default() -> Self {
522 Self {
523 name: Default::default(),
524 partitions: Default::default(),
525 unknown_tagged_fields: BTreeMap::new(),
526 }
527 }
528}
529
530impl Message for ListOffsetsTopicResponse {
531 const VERSIONS: VersionRange = VersionRange { min: 1, max: 10 };
532 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
533}
534
535impl HeaderVersion for ListOffsetsResponse {
536 fn header_version(version: i16) -> i16 {
537 if version >= 6 {
538 1
539 } else {
540 0
541 }
542 }
543}