1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ListOffsetsPartition {
24 pub partition_index: i32,
28
29 pub current_leader_epoch: i32,
33
34 pub timestamp: i64,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl ListOffsetsPartition {
44 pub fn with_partition_index(mut self, value: i32) -> Self {
50 self.partition_index = value;
51 self
52 }
53 pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
59 self.current_leader_epoch = value;
60 self
61 }
62 pub fn with_timestamp(mut self, value: i64) -> Self {
68 self.timestamp = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
#[cfg(feature = "client")]
impl Encodable for ListOffsetsPartition {
    /// Writes this partition entry to `buf` using the layout of `version`.
    ///
    /// Field order on the wire: partition index, leader epoch (version 4+),
    /// timestamp, then the tagged-field section (version 6+).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `1..=10`, when the number of unknown
    /// tagged fields does not fit in a `u32`, or when any underlying encoder
    /// reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(1..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let has_leader_epoch = version >= 4;
        let flexible = version >= 6;

        types::Int32.encode(buf, &self.partition_index)?;
        if has_leader_epoch {
            types::Int32.encode(buf, &self.current_leader_epoch)?;
        }
        types::Int64.encode(buf, &self.timestamp)?;

        // Versions before 6 carry no tagged-field section at all.
        if !flexible {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Sums the sizes reported by the field encoders for the layout of `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `1..=10`.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields does not fit in a `u32`
    /// or when any underlying size computation reports an error.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition_index)?;
        if version >= 4 {
            size += types::Int32.compute_size(&self.current_leader_epoch)?;
        }
        size += types::Int64.compute_size(&self.timestamp)?;

        if version < 6 {
            return Ok(size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
130
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsPartition {
    /// Reads one partition entry from `buf` using the layout of `version`.
    ///
    /// The leader epoch is read only for version 4+ and is `-1` otherwise.
    /// From version 6 onward every tagged field found is kept as raw bytes in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `1..=10` or when any read from `buf`
    /// reports an error.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(1..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let partition_index = types::Int32.decode(buf)?;
        let current_leader_epoch = match version {
            v if v >= 4 => types::Int32.decode(buf)?,
            _ => -1,
        };
        let timestamp = types::Int64.decode(buf)?;

        let unknown_tagged_fields = if version >= 6 {
            let mut fields = BTreeMap::new();
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each entry is: tag, payload length, then that many raw bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                fields.insert(tag as i32, raw);
            }
            fields
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            partition_index,
            current_leader_epoch,
            timestamp,
            unknown_tagged_fields,
        })
    }
}
162
163impl Default for ListOffsetsPartition {
164 fn default() -> Self {
165 Self {
166 partition_index: 0,
167 current_leader_epoch: -1,
168 timestamp: 0,
169 unknown_tagged_fields: BTreeMap::new(),
170 }
171 }
172}
173
174impl Message for ListOffsetsPartition {
175 const VERSIONS: VersionRange = VersionRange { min: 1, max: 10 };
176 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
177}
178
179#[non_exhaustive]
181#[derive(Debug, Clone, PartialEq)]
182pub struct ListOffsetsRequest {
183 pub replica_id: super::BrokerId,
187
188 pub isolation_level: i8,
192
193 pub topics: Vec<ListOffsetsTopic>,
197
198 pub timeout_ms: i32,
202
203 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
205}
206
207impl ListOffsetsRequest {
208 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
214 self.replica_id = value;
215 self
216 }
217 pub fn with_isolation_level(mut self, value: i8) -> Self {
223 self.isolation_level = value;
224 self
225 }
226 pub fn with_topics(mut self, value: Vec<ListOffsetsTopic>) -> Self {
232 self.topics = value;
233 self
234 }
235 pub fn with_timeout_ms(mut self, value: i32) -> Self {
241 self.timeout_ms = value;
242 self
243 }
244 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
246 self.unknown_tagged_fields = value;
247 self
248 }
249 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
251 self.unknown_tagged_fields.insert(key, value);
252 self
253 }
254}
255
#[cfg(feature = "client")]
impl Encodable for ListOffsetsRequest {
    /// Writes the request to `buf` using the layout of `version`.
    ///
    /// Field order on the wire: replica id, isolation level (version 2+),
    /// topics (compact array from version 6, regular array before), timeout
    /// (version 10+), then the tagged-field section (version 6+).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `1..=10`, when `isolation_level` is
    /// non-zero for a version that cannot carry it, when the number of unknown
    /// tagged fields does not fit in a `u32`, or when any underlying encoder
    /// reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(1..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 6;

        types::Int32.encode(buf, &self.replica_id)?;

        if version >= 2 {
            types::Int8.encode(buf, &self.isolation_level)?;
        } else if self.isolation_level != 0 {
            // The field cannot be represented in this version, so refuse to drop it silently.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }

        if version >= 10 {
            types::Int32.encode(buf, &self.timeout_ms)?;
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Sums the sizes reported by the field encoders for the layout of `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `1..=10`.
    ///
    /// # Errors
    ///
    /// Fails when `isolation_level` is non-zero for a version below 2, when
    /// the number of unknown tagged fields does not fit in a `u32`, or when
    /// any underlying size computation reports an error.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 6;
        let mut size = types::Int32.compute_size(&self.replica_id)?;

        if version >= 2 {
            size += types::Int8.compute_size(&self.isolation_level)?;
        } else if self.isolation_level != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        size += if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.topics)?
        };

        if version >= 10 {
            size += types::Int32.compute_size(&self.timeout_ms)?;
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
326
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsRequest {
    /// Reads a request from `buf` using the layout of `version`.
    ///
    /// Fields absent from the selected version take fixed values:
    /// `isolation_level` is 0 below version 2 and `timeout_ms` is 0 below
    /// version 10. From version 6 onward every tagged field found is kept as
    /// raw bytes in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `1..=10` or when any read from `buf`
    /// reports an error.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(1..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 6;

        let replica_id = types::Int32.decode(buf)?;
        let isolation_level = match version {
            v if v >= 2 => types::Int8.decode(buf)?,
            _ => 0,
        };
        let topics = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let timeout_ms = match version {
            v if v >= 10 => types::Int32.decode(buf)?,
            _ => 0,
        };

        let unknown_tagged_fields = if flexible {
            let mut fields = BTreeMap::new();
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each entry is: tag, payload length, then that many raw bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                fields.insert(tag as i32, raw);
            }
            fields
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            replica_id,
            isolation_level,
            topics,
            timeout_ms,
            unknown_tagged_fields,
        })
    }
}
368
369impl Default for ListOffsetsRequest {
370 fn default() -> Self {
371 Self {
372 replica_id: (0).into(),
373 isolation_level: 0,
374 topics: Default::default(),
375 timeout_ms: 0,
376 unknown_tagged_fields: BTreeMap::new(),
377 }
378 }
379}
380
381impl Message for ListOffsetsRequest {
382 const VERSIONS: VersionRange = VersionRange { min: 1, max: 10 };
383 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
384}
385
386#[non_exhaustive]
388#[derive(Debug, Clone, PartialEq)]
389pub struct ListOffsetsTopic {
390 pub name: super::TopicName,
394
395 pub partitions: Vec<ListOffsetsPartition>,
399
400 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
402}
403
404impl ListOffsetsTopic {
405 pub fn with_name(mut self, value: super::TopicName) -> Self {
411 self.name = value;
412 self
413 }
414 pub fn with_partitions(mut self, value: Vec<ListOffsetsPartition>) -> Self {
420 self.partitions = value;
421 self
422 }
423 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
425 self.unknown_tagged_fields = value;
426 self
427 }
428 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
430 self.unknown_tagged_fields.insert(key, value);
431 self
432 }
433}
434
#[cfg(feature = "client")]
impl Encodable for ListOffsetsTopic {
    /// Writes this topic entry to `buf` using the layout of `version`.
    ///
    /// Versions 6+ use the compact string/array encodings followed by the
    /// tagged-field section; earlier versions use the regular string/array
    /// encodings and have no tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `1..=10`, when the number of unknown
    /// tagged fields does not fit in a `u32`, or when any underlying encoder
    /// reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(1..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 6 {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::String.encode(buf, &self.name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }
        Ok(())
    }

    /// Sums the sizes reported by the field encoders for the layout of `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `1..=10`.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields does not fit in a `u32`
    /// or when any underlying size computation reports an error.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 6 {
            let name_size = types::String.compute_size(&self.name)?;
            let partitions_size =
                types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(name_size + partitions_size);
        }

        let mut size = types::CompactString.compute_size(&self.name)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
493
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsTopic {
    /// Reads one topic entry from `buf` using the layout of `version`.
    ///
    /// Versions 6+ use the compact string/array encodings and keep every
    /// tagged field found as raw bytes in `unknown_tagged_fields`; earlier
    /// versions use the regular encodings and leave that map empty.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `1..=10` or when any read from `buf`
    /// reports an error.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(1..=10).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let (name, partitions, unknown_tagged_fields) = if version >= 6 {
            let name = types::CompactString.decode(buf)?;
            let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

            let mut fields = BTreeMap::new();
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each entry is: tag, payload length, then that many raw bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw = buf.try_get_bytes(len as usize)?;
                fields.insert(tag as i32, raw);
            }
            (name, partitions, fields)
        } else {
            let name = types::String.decode(buf)?;
            let partitions = types::Array(types::Struct { version }).decode(buf)?;
            (name, partitions, BTreeMap::new())
        };

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
527
528impl Default for ListOffsetsTopic {
529 fn default() -> Self {
530 Self {
531 name: Default::default(),
532 partitions: Default::default(),
533 unknown_tagged_fields: BTreeMap::new(),
534 }
535 }
536}
537
538impl Message for ListOffsetsTopic {
539 const VERSIONS: VersionRange = VersionRange { min: 1, max: 10 };
540 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
541}
542
543impl HeaderVersion for ListOffsetsRequest {
544 fn header_version(version: i16) -> i16 {
545 if version >= 6 {
546 2
547 } else {
548 1
549 }
550 }
551}