1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ListOffsetsPartition {
24 pub partition_index: i32,
28
29 pub current_leader_epoch: i32,
33
34 pub timestamp: i64,
38
39 pub max_num_offsets: i32,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl ListOffsetsPartition {
49 pub fn with_partition_index(mut self, value: i32) -> Self {
55 self.partition_index = value;
56 self
57 }
58 pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
64 self.current_leader_epoch = value;
65 self
66 }
67 pub fn with_timestamp(mut self, value: i64) -> Self {
73 self.timestamp = value;
74 self
75 }
76 pub fn with_max_num_offsets(mut self, value: i32) -> Self {
82 self.max_num_offsets = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
97#[cfg(feature = "client")]
98impl Encodable for ListOffsetsPartition {
99 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
100 if version < 0 || version > 9 {
101 bail!("specified version not supported by this message type");
102 }
103 types::Int32.encode(buf, &self.partition_index)?;
104 if version >= 4 {
105 types::Int32.encode(buf, &self.current_leader_epoch)?;
106 }
107 types::Int64.encode(buf, &self.timestamp)?;
108 if version == 0 {
109 types::Int32.encode(buf, &self.max_num_offsets)?;
110 } else {
111 if self.max_num_offsets != 1 {
112 bail!("A field is set that is not available on the selected protocol version");
113 }
114 }
115 if version >= 6 {
116 let num_tagged_fields = self.unknown_tagged_fields.len();
117 if num_tagged_fields > std::u32::MAX as usize {
118 bail!(
119 "Too many tagged fields to encode ({} fields)",
120 num_tagged_fields
121 );
122 }
123 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
124
125 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
126 }
127 Ok(())
128 }
129 fn compute_size(&self, version: i16) -> Result<usize> {
130 let mut total_size = 0;
131 total_size += types::Int32.compute_size(&self.partition_index)?;
132 if version >= 4 {
133 total_size += types::Int32.compute_size(&self.current_leader_epoch)?;
134 }
135 total_size += types::Int64.compute_size(&self.timestamp)?;
136 if version == 0 {
137 total_size += types::Int32.compute_size(&self.max_num_offsets)?;
138 } else {
139 if self.max_num_offsets != 1 {
140 bail!("A field is set that is not available on the selected protocol version");
141 }
142 }
143 if version >= 6 {
144 let num_tagged_fields = self.unknown_tagged_fields.len();
145 if num_tagged_fields > std::u32::MAX as usize {
146 bail!(
147 "Too many tagged fields to encode ({} fields)",
148 num_tagged_fields
149 );
150 }
151 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
152
153 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
154 }
155 Ok(total_size)
156 }
157}
158
159#[cfg(feature = "broker")]
160impl Decodable for ListOffsetsPartition {
161 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
162 if version < 0 || version > 9 {
163 bail!("specified version not supported by this message type");
164 }
165 let partition_index = types::Int32.decode(buf)?;
166 let current_leader_epoch = if version >= 4 {
167 types::Int32.decode(buf)?
168 } else {
169 -1
170 };
171 let timestamp = types::Int64.decode(buf)?;
172 let max_num_offsets = if version == 0 {
173 types::Int32.decode(buf)?
174 } else {
175 1
176 };
177 let mut unknown_tagged_fields = BTreeMap::new();
178 if version >= 6 {
179 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
180 for _ in 0..num_tagged_fields {
181 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
182 let size: u32 = types::UnsignedVarInt.decode(buf)?;
183 let unknown_value = buf.try_get_bytes(size as usize)?;
184 unknown_tagged_fields.insert(tag as i32, unknown_value);
185 }
186 }
187 Ok(Self {
188 partition_index,
189 current_leader_epoch,
190 timestamp,
191 max_num_offsets,
192 unknown_tagged_fields,
193 })
194 }
195}
196
197impl Default for ListOffsetsPartition {
198 fn default() -> Self {
199 Self {
200 partition_index: 0,
201 current_leader_epoch: -1,
202 timestamp: 0,
203 max_num_offsets: 1,
204 unknown_tagged_fields: BTreeMap::new(),
205 }
206 }
207}
208
209impl Message for ListOffsetsPartition {
210 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
211 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
212}
213
214#[non_exhaustive]
216#[derive(Debug, Clone, PartialEq)]
217pub struct ListOffsetsRequest {
218 pub replica_id: super::BrokerId,
222
223 pub isolation_level: i8,
227
228 pub topics: Vec<ListOffsetsTopic>,
232
233 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
235}
236
237impl ListOffsetsRequest {
238 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
244 self.replica_id = value;
245 self
246 }
247 pub fn with_isolation_level(mut self, value: i8) -> Self {
253 self.isolation_level = value;
254 self
255 }
256 pub fn with_topics(mut self, value: Vec<ListOffsetsTopic>) -> Self {
262 self.topics = value;
263 self
264 }
265 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
267 self.unknown_tagged_fields = value;
268 self
269 }
270 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
272 self.unknown_tagged_fields.insert(key, value);
273 self
274 }
275}
276
277#[cfg(feature = "client")]
278impl Encodable for ListOffsetsRequest {
279 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
280 if version < 0 || version > 9 {
281 bail!("specified version not supported by this message type");
282 }
283 types::Int32.encode(buf, &self.replica_id)?;
284 if version >= 2 {
285 types::Int8.encode(buf, &self.isolation_level)?;
286 } else {
287 if self.isolation_level != 0 {
288 bail!("A field is set that is not available on the selected protocol version");
289 }
290 }
291 if version >= 6 {
292 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
293 } else {
294 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
295 }
296 if version >= 6 {
297 let num_tagged_fields = self.unknown_tagged_fields.len();
298 if num_tagged_fields > std::u32::MAX as usize {
299 bail!(
300 "Too many tagged fields to encode ({} fields)",
301 num_tagged_fields
302 );
303 }
304 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
305
306 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
307 }
308 Ok(())
309 }
310 fn compute_size(&self, version: i16) -> Result<usize> {
311 let mut total_size = 0;
312 total_size += types::Int32.compute_size(&self.replica_id)?;
313 if version >= 2 {
314 total_size += types::Int8.compute_size(&self.isolation_level)?;
315 } else {
316 if self.isolation_level != 0 {
317 bail!("A field is set that is not available on the selected protocol version");
318 }
319 }
320 if version >= 6 {
321 total_size +=
322 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
323 } else {
324 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
325 }
326 if version >= 6 {
327 let num_tagged_fields = self.unknown_tagged_fields.len();
328 if num_tagged_fields > std::u32::MAX as usize {
329 bail!(
330 "Too many tagged fields to encode ({} fields)",
331 num_tagged_fields
332 );
333 }
334 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
335
336 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
337 }
338 Ok(total_size)
339 }
340}
341
342#[cfg(feature = "broker")]
343impl Decodable for ListOffsetsRequest {
344 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
345 if version < 0 || version > 9 {
346 bail!("specified version not supported by this message type");
347 }
348 let replica_id = types::Int32.decode(buf)?;
349 let isolation_level = if version >= 2 {
350 types::Int8.decode(buf)?
351 } else {
352 0
353 };
354 let topics = if version >= 6 {
355 types::CompactArray(types::Struct { version }).decode(buf)?
356 } else {
357 types::Array(types::Struct { version }).decode(buf)?
358 };
359 let mut unknown_tagged_fields = BTreeMap::new();
360 if version >= 6 {
361 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
362 for _ in 0..num_tagged_fields {
363 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
364 let size: u32 = types::UnsignedVarInt.decode(buf)?;
365 let unknown_value = buf.try_get_bytes(size as usize)?;
366 unknown_tagged_fields.insert(tag as i32, unknown_value);
367 }
368 }
369 Ok(Self {
370 replica_id,
371 isolation_level,
372 topics,
373 unknown_tagged_fields,
374 })
375 }
376}
377
378impl Default for ListOffsetsRequest {
379 fn default() -> Self {
380 Self {
381 replica_id: (0).into(),
382 isolation_level: 0,
383 topics: Default::default(),
384 unknown_tagged_fields: BTreeMap::new(),
385 }
386 }
387}
388
389impl Message for ListOffsetsRequest {
390 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
391 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
392}
393
394#[non_exhaustive]
396#[derive(Debug, Clone, PartialEq)]
397pub struct ListOffsetsTopic {
398 pub name: super::TopicName,
402
403 pub partitions: Vec<ListOffsetsPartition>,
407
408 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
410}
411
412impl ListOffsetsTopic {
413 pub fn with_name(mut self, value: super::TopicName) -> Self {
419 self.name = value;
420 self
421 }
422 pub fn with_partitions(mut self, value: Vec<ListOffsetsPartition>) -> Self {
428 self.partitions = value;
429 self
430 }
431 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
433 self.unknown_tagged_fields = value;
434 self
435 }
436 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
438 self.unknown_tagged_fields.insert(key, value);
439 self
440 }
441}
442
443#[cfg(feature = "client")]
444impl Encodable for ListOffsetsTopic {
445 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
446 if version < 0 || version > 9 {
447 bail!("specified version not supported by this message type");
448 }
449 if version >= 6 {
450 types::CompactString.encode(buf, &self.name)?;
451 } else {
452 types::String.encode(buf, &self.name)?;
453 }
454 if version >= 6 {
455 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
456 } else {
457 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
458 }
459 if version >= 6 {
460 let num_tagged_fields = self.unknown_tagged_fields.len();
461 if num_tagged_fields > std::u32::MAX as usize {
462 bail!(
463 "Too many tagged fields to encode ({} fields)",
464 num_tagged_fields
465 );
466 }
467 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
468
469 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
470 }
471 Ok(())
472 }
473 fn compute_size(&self, version: i16) -> Result<usize> {
474 let mut total_size = 0;
475 if version >= 6 {
476 total_size += types::CompactString.compute_size(&self.name)?;
477 } else {
478 total_size += types::String.compute_size(&self.name)?;
479 }
480 if version >= 6 {
481 total_size +=
482 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
483 } else {
484 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
485 }
486 if version >= 6 {
487 let num_tagged_fields = self.unknown_tagged_fields.len();
488 if num_tagged_fields > std::u32::MAX as usize {
489 bail!(
490 "Too many tagged fields to encode ({} fields)",
491 num_tagged_fields
492 );
493 }
494 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
495
496 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
497 }
498 Ok(total_size)
499 }
500}
501
502#[cfg(feature = "broker")]
503impl Decodable for ListOffsetsTopic {
504 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
505 if version < 0 || version > 9 {
506 bail!("specified version not supported by this message type");
507 }
508 let name = if version >= 6 {
509 types::CompactString.decode(buf)?
510 } else {
511 types::String.decode(buf)?
512 };
513 let partitions = if version >= 6 {
514 types::CompactArray(types::Struct { version }).decode(buf)?
515 } else {
516 types::Array(types::Struct { version }).decode(buf)?
517 };
518 let mut unknown_tagged_fields = BTreeMap::new();
519 if version >= 6 {
520 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
521 for _ in 0..num_tagged_fields {
522 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
523 let size: u32 = types::UnsignedVarInt.decode(buf)?;
524 let unknown_value = buf.try_get_bytes(size as usize)?;
525 unknown_tagged_fields.insert(tag as i32, unknown_value);
526 }
527 }
528 Ok(Self {
529 name,
530 partitions,
531 unknown_tagged_fields,
532 })
533 }
534}
535
536impl Default for ListOffsetsTopic {
537 fn default() -> Self {
538 Self {
539 name: Default::default(),
540 partitions: Default::default(),
541 unknown_tagged_fields: BTreeMap::new(),
542 }
543 }
544}
545
546impl Message for ListOffsetsTopic {
547 const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
548 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
549}
550
551impl HeaderVersion for ListOffsetsRequest {
552 fn header_version(version: i16) -> i16 {
553 if version >= 6 {
554 2
555 } else {
556 1
557 }
558 }
559}