1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// Per-partition entry stored in `ListOffsetsTopicResponse::partitions`.
///
/// The `Encodable`/`Decodable` implementations for this type accept
/// versions 0 through 9.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct ListOffsetsPartitionResponse {
    /// Partition index. Written and read in every supported version.
    pub partition_index: i32,

    /// Error code for this partition. Written and read in every supported
    /// version; the default value is `0`.
    pub error_code: i16,

    /// Offset list that exists on the wire only in version 0. It must be
    /// empty when encoding any later version.
    pub old_style_offsets: Vec<i64>,

    /// Timestamp, present from version 1 onward. Must remain at its default
    /// of `-1` when encoding version 0; decoding version 0 yields `-1`.
    pub timestamp: i64,

    /// Offset, present from version 1 onward. Must remain at its default of
    /// `-1` when encoding version 0; decoding version 0 yields `-1`.
    pub offset: i64,

    /// Leader epoch, present from version 4 onward. Must remain at its
    /// default of `-1` when encoding earlier versions.
    pub leader_epoch: i32,

    /// Tagged fields not modelled by this struct, keyed by tag number.
    /// Only read and written from version 6 onward.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
57
58impl ListOffsetsPartitionResponse {
59 pub fn with_partition_index(mut self, value: i32) -> Self {
65 self.partition_index = value;
66 self
67 }
68 pub fn with_error_code(mut self, value: i16) -> Self {
74 self.error_code = value;
75 self
76 }
77 pub fn with_old_style_offsets(mut self, value: Vec<i64>) -> Self {
83 self.old_style_offsets = value;
84 self
85 }
86 pub fn with_timestamp(mut self, value: i64) -> Self {
92 self.timestamp = value;
93 self
94 }
95 pub fn with_offset(mut self, value: i64) -> Self {
101 self.offset = value;
102 self
103 }
104 pub fn with_leader_epoch(mut self, value: i32) -> Self {
110 self.leader_epoch = value;
111 self
112 }
113 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115 self.unknown_tagged_fields = value;
116 self
117 }
118 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120 self.unknown_tagged_fields.insert(key, value);
121 self
122 }
123}
124
#[cfg(feature = "broker")]
impl Encodable for ListOffsetsPartitionResponse {
    /// Writes this partition entry to `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside `0..=9`, when a field that
    /// the selected version cannot carry holds a non-default value, when the
    /// number of unknown tagged fields exceeds `u32::MAX`, or when any field
    /// encoder fails. Bytes written before the failure remain in `buf`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Fields common to every version.
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;

        // The offset list is exclusive to version 0.
        if version == 0 {
            types::Array(types::Int64).encode(buf, &self.old_style_offsets)?;
        } else if !self.old_style_offsets.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // `timestamp` and `offset` are both introduced in version 1 and are
        // written back to back, timestamp first.
        if version >= 1 {
            types::Int64.encode(buf, &self.timestamp)?;
            types::Int64.encode(buf, &self.offset)?;
        } else if self.timestamp != -1 || self.offset != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 4 {
            types::Int32.encode(buf, &self.leader_epoch)?;
        } else if self.leader_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Tagged-field section: a varint count followed by the fields themselves.
        if version >= 6 {
            let tag_count = self.unknown_tagged_fields.len();
            if tag_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tag_count);
            }
            types::UnsignedVarInt.encode(buf, tag_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes how many bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=9`.
    ///
    /// # Errors
    ///
    /// Returns an error when a field that the selected version cannot carry
    /// holds a non-default value, when the number of unknown tagged fields
    /// exceeds `u32::MAX`, or when any size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition_index)?
            + types::Int16.compute_size(&self.error_code)?;

        if version == 0 {
            size += types::Array(types::Int64).compute_size(&self.old_style_offsets)?;
        } else if !self.old_style_offsets.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 1 {
            size += types::Int64.compute_size(&self.timestamp)?;
            size += types::Int64.compute_size(&self.offset)?;
        } else if self.timestamp != -1 || self.offset != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 4 {
            size += types::Int32.compute_size(&self.leader_epoch)?;
        } else if self.leader_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tag_count = self.unknown_tagged_fields.len();
            if tag_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tag_count);
            }
            size += types::UnsignedVarInt.compute_size(tag_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
222
#[cfg(feature = "client")]
impl Decodable for ListOffsetsPartitionResponse {
    /// Reads one partition entry from `buf` using the layout of `version`.
    ///
    /// Fields that the given version does not carry receive their defaults:
    /// an empty offset list, and `-1` for `timestamp`, `offset` and
    /// `leader_epoch`.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside `0..=9` or when any field
    /// cannot be read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;

        // Only version 0 carries the offset list.
        let old_style_offsets = if version == 0 {
            types::Array(types::Int64).decode(buf)?
        } else {
            Default::default()
        };

        // Both values appear together from version 1, timestamp first.
        let (timestamp, offset) = if version >= 1 {
            (types::Int64.decode(buf)?, types::Int64.decode(buf)?)
        } else {
            (-1, -1)
        };

        let leader_epoch = if version >= 4 {
            types::Int32.decode(buf)?
        } else {
            -1
        };

        // From version 6 a tagged-field section follows: a count, then
        // (tag, size, payload) triples that are kept verbatim.
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 6 {
            let tag_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tag_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            partition_index,
            error_code,
            old_style_offsets,
            timestamp,
            offset,
            leader_epoch,
            unknown_tagged_fields,
        })
    }
}
272
273impl Default for ListOffsetsPartitionResponse {
274 fn default() -> Self {
275 Self {
276 partition_index: 0,
277 error_code: 0,
278 old_style_offsets: Default::default(),
279 timestamp: -1,
280 offset: -1,
281 leader_epoch: -1,
282 unknown_tagged_fields: BTreeMap::new(),
283 }
284 }
285}
286
impl Message for ListOffsetsPartitionResponse {
    // Declared version range: 0 through 9 inclusive.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
    // No deprecated version range is declared.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
291
/// Top-level ListOffsets response body.
///
/// The `Encodable`/`Decodable` implementations for this type accept
/// versions 0 through 9.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct ListOffsetsResponse {
    /// Throttle time in milliseconds. Read and written only from version 2
    /// onward; decoding an earlier version yields `0`.
    pub throttle_time_ms: i32,

    /// Per-topic results. Serialized as a compact array from version 6
    /// onward and as a regular array before that.
    pub topics: Vec<ListOffsetsTopicResponse>,

    /// Tagged fields not modelled by this struct, keyed by tag number.
    /// Only read and written from version 6 onward.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
309
310impl ListOffsetsResponse {
311 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
317 self.throttle_time_ms = value;
318 self
319 }
320 pub fn with_topics(mut self, value: Vec<ListOffsetsTopicResponse>) -> Self {
326 self.topics = value;
327 self
328 }
329 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
331 self.unknown_tagged_fields = value;
332 self
333 }
334 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
336 self.unknown_tagged_fields.insert(key, value);
337 self
338 }
339}
340
#[cfg(feature = "broker")]
impl Encodable for ListOffsetsResponse {
    /// Writes the response body to `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside `0..=9`, when the number of
    /// unknown tagged fields exceeds `u32::MAX`, or when any field encoder
    /// (including the nested topic encoder) fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 2 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }

        // Versions below 6 use the regular array form and have no
        // tagged-field section, so they are finished after the topics.
        if version < 6 {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
            return Ok(());
        }

        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        let tag_count = self.unknown_tagged_fields.len();
        if tag_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tag_count);
        }
        types::UnsignedVarInt.encode(buf, tag_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes how many bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=9`.
    ///
    /// # Errors
    ///
    /// Returns an error when the number of unknown tagged fields exceeds
    /// `u32::MAX` or when any size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 2 {
            size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }

        // Non-flexible versions: regular array, no tagged-field section.
        if version < 6 {
            size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
            return Ok(size);
        }

        size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        let tag_count = self.unknown_tagged_fields.len();
        if tag_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tag_count);
        }
        size += types::UnsignedVarInt.compute_size(tag_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
395
#[cfg(feature = "client")]
impl Decodable for ListOffsetsResponse {
    /// Reads the response body from `buf` using the layout of `version`.
    ///
    /// `throttle_time_ms` is `0` for versions below 2, which do not carry it.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside `0..=9` or when any field
    /// (including a nested topic) cannot be read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Version 6 switches to compact arrays and adds tagged fields.
        let flexible = version >= 6;

        let throttle_time_ms = if version >= 2 {
            types::Int32.decode(buf)?
        } else {
            0
        };

        let topics = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tag_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tag_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            throttle_time_ms,
            topics,
            unknown_tagged_fields,
        })
    }
}
429
430impl Default for ListOffsetsResponse {
431 fn default() -> Self {
432 Self {
433 throttle_time_ms: 0,
434 topics: Default::default(),
435 unknown_tagged_fields: BTreeMap::new(),
436 }
437 }
438}
439
impl Message for ListOffsetsResponse {
    // Declared version range: 0 through 9 inclusive.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
    // No deprecated version range is declared.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
444
/// Per-topic entry stored in `ListOffsetsResponse::topics`.
///
/// The `Encodable`/`Decodable` implementations for this type accept
/// versions 0 through 9.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct ListOffsetsTopicResponse {
    /// Topic name. Serialized as a compact string from version 6 onward and
    /// as a regular string before that.
    pub name: super::TopicName,

    /// Per-partition results. Serialized as a compact array from version 6
    /// onward and as a regular array before that.
    pub partitions: Vec<ListOffsetsPartitionResponse>,

    /// Tagged fields not modelled by this struct, keyed by tag number.
    /// Only read and written from version 6 onward.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
462
463impl ListOffsetsTopicResponse {
464 pub fn with_name(mut self, value: super::TopicName) -> Self {
470 self.name = value;
471 self
472 }
473 pub fn with_partitions(mut self, value: Vec<ListOffsetsPartitionResponse>) -> Self {
479 self.partitions = value;
480 self
481 }
482 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
484 self.unknown_tagged_fields = value;
485 self
486 }
487 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
489 self.unknown_tagged_fields.insert(key, value);
490 self
491 }
492}
493
#[cfg(feature = "broker")]
impl Encodable for ListOffsetsTopicResponse {
    /// Writes this topic entry to `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside `0..=9`, when the number of
    /// unknown tagged fields exceeds `u32::MAX`, or when any field encoder
    /// (including the nested partition encoder) fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 6 {
            // Flexible layout: compact encodings followed by tagged fields.
            types::CompactString.encode(buf, &self.name)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

            let tag_count = self.unknown_tagged_fields.len();
            if tag_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tag_count);
            }
            types::UnsignedVarInt.encode(buf, tag_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            // Older layout: regular encodings, no tagged-field section.
            types::String.encode(buf, &self.name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }

        Ok(())
    }

    /// Computes how many bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=9`.
    ///
    /// # Errors
    ///
    /// Returns an error when the number of unknown tagged fields exceeds
    /// `u32::MAX` or when any size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 6 {
            size += types::CompactString.compute_size(&self.name)?;
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

            let tag_count = self.unknown_tagged_fields.len();
            if tag_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tag_count);
            }
            size += types::UnsignedVarInt.compute_size(tag_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::String.compute_size(&self.name)?;
            size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
        }

        Ok(size)
    }
}
552
#[cfg(feature = "client")]
impl Decodable for ListOffsetsTopicResponse {
    /// Reads one topic entry from `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside `0..=9` or when any field
    /// (including a nested partition) cannot be read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Version 6 switches to compact encodings and adds tagged fields.
        let flexible = version >= 6;

        // The name always precedes the partition list.
        let (name, partitions) = if flexible {
            (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Struct { version }).decode(buf)?,
            )
        } else {
            (
                types::String.decode(buf)?,
                types::Array(types::Struct { version }).decode(buf)?,
            )
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tag_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tag_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
586
587impl Default for ListOffsetsTopicResponse {
588 fn default() -> Self {
589 Self {
590 name: Default::default(),
591 partitions: Default::default(),
592 unknown_tagged_fields: BTreeMap::new(),
593 }
594 }
595}
596
impl Message for ListOffsetsTopicResponse {
    // Declared version range: 0 through 9 inclusive.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
    // No deprecated version range is declared.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
601
602impl HeaderVersion for ListOffsetsResponse {
603 fn header_version(version: i16) -> i16 {
604 if version >= 6 {
605 1
606 } else {
607 0
608 }
609 }
610}