1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// One partition entry inside a [`ListOffsetsTopic`].
///
/// Which fields appear on the wire depends on the protocol version; the
/// per-field notes below mirror the `Encodable`/`Decodable` impls in this file.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct ListOffsetsPartition {
    /// Partition index. Written as an `Int32` in every version.
    pub partition_index: i32,

    /// Leader epoch. Only written/read for versions >= 4; decodes to `-1`
    /// for older versions and is silently skipped when encoding them.
    pub current_leader_epoch: i32,

    /// Timestamp. Written as an `Int64` in every version.
    pub timestamp: i64,

    /// Only written/read for version 0. For any other version the decoder
    /// yields `1`, and the encoder rejects any value other than `1`.
    pub max_num_offsets: i32,

    /// Tagged fields carried verbatim, keyed by tag. Only written/read for
    /// versions >= 6.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
47
48impl ListOffsetsPartition {
49 pub fn with_partition_index(mut self, value: i32) -> Self {
55 self.partition_index = value;
56 self
57 }
58 pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
64 self.current_leader_epoch = value;
65 self
66 }
67 pub fn with_timestamp(mut self, value: i64) -> Self {
73 self.timestamp = value;
74 self
75 }
76 pub fn with_max_num_offsets(mut self, value: i32) -> Self {
82 self.max_num_offsets = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
#[cfg(feature = "client")]
impl Encodable for ListOffsetsPartition {
    /// Writes this partition entry to `buf` using the layout of `version`.
    ///
    /// Field order: `partition_index`, `current_leader_epoch` (versions >= 4),
    /// `timestamp`, `max_num_offsets` (version 0 only), then the tagged-field
    /// section (versions >= 6).
    ///
    /// # Errors
    ///
    /// Fails if `max_num_offsets` differs from `1` on a version other than 0,
    /// if the tagged-field count does not fit in a `u32`, or if any
    /// underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.partition_index)?;
        if version >= 4 {
            types::Int32.encode(buf, &self.current_leader_epoch)?;
        }
        types::Int64.encode(buf, &self.timestamp)?;
        if version == 0 {
            types::Int32.encode(buf, &self.max_num_offsets)?;
        } else if self.max_num_offsets != 1 {
            // The field has no wire slot here, so a non-default value would be lost.
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions before 6 carry no tagged-field section.
        if version < 6 {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes how many bytes [`encode`](Self::encode) would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition_index)?;
        if version >= 4 {
            size += types::Int32.compute_size(&self.current_leader_epoch)?;
        }
        size += types::Int64.compute_size(&self.timestamp)?;
        if version == 0 {
            size += types::Int32.compute_size(&self.max_num_offsets)?;
        } else if self.max_num_offsets != 1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 6 {
            return Ok(size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
155
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsPartition {
    /// Reads one partition entry from `buf` using the layout of `version`.
    ///
    /// Fields that are absent in `version` take fixed values:
    /// `current_leader_epoch` becomes `-1` (versions < 4) and
    /// `max_num_offsets` becomes `1` (versions other than 0).
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying decoders or from reading
    /// tagged-field payloads out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let partition_index: i32 = types::Int32.decode(buf)?;
        let current_leader_epoch: i32 = if version >= 4 {
            types::Int32.decode(buf)?
        } else {
            -1
        };
        let timestamp: i64 = types::Int64.decode(buf)?;
        let max_num_offsets: i32 = if version == 0 {
            types::Int32.decode(buf)?
        } else {
            1
        };

        // The tagged-field section only exists from version 6 on.
        let unknown_tagged_fields = if version >= 6 {
            let mut fields = BTreeMap::new();
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag replaces the earlier payload.
                fields.insert(tag as i32, payload);
            }
            fields
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            partition_index,
            current_leader_epoch,
            timestamp,
            max_num_offsets,
            unknown_tagged_fields,
        })
    }
}
190
191impl Default for ListOffsetsPartition {
192 fn default() -> Self {
193 Self {
194 partition_index: 0,
195 current_leader_epoch: -1,
196 timestamp: 0,
197 max_num_offsets: 1,
198 unknown_tagged_fields: BTreeMap::new(),
199 }
200 }
201}
202
impl Message for ListOffsetsPartition {
    /// Version range declared for this type: min 0, max 8.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
    /// Version range flagged as deprecated: min 0, max 0.
    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
}
207
/// Top-level ListOffsets request body.
///
/// Which fields appear on the wire depends on the protocol version; the
/// per-field notes below mirror the `Encodable`/`Decodable` impls in this file.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct ListOffsetsRequest {
    /// Replica (broker) id. Written as an `Int32` in every version.
    pub replica_id: super::BrokerId,

    /// Isolation level. Only written/read for versions >= 2; decodes to `0`
    /// for older versions, where the encoder rejects any non-zero value.
    pub isolation_level: i8,

    /// Requested topics. Encoded as a compact array for versions >= 6 and as
    /// a regular array otherwise.
    pub topics: Vec<ListOffsetsTopic>,

    /// Tagged fields carried verbatim, keyed by tag. Only written/read for
    /// versions >= 6.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
230
231impl ListOffsetsRequest {
232 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
238 self.replica_id = value;
239 self
240 }
241 pub fn with_isolation_level(mut self, value: i8) -> Self {
247 self.isolation_level = value;
248 self
249 }
250 pub fn with_topics(mut self, value: Vec<ListOffsetsTopic>) -> Self {
256 self.topics = value;
257 self
258 }
259 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
261 self.unknown_tagged_fields = value;
262 self
263 }
264 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
266 self.unknown_tagged_fields.insert(key, value);
267 self
268 }
269}
270
#[cfg(feature = "client")]
impl Encodable for ListOffsetsRequest {
    /// Writes the request body to `buf` using the layout of `version`.
    ///
    /// Field order: `replica_id`, `isolation_level` (versions >= 2), `topics`,
    /// then the tagged-field section (versions >= 6). From version 6 on the
    /// topic list uses the compact array encoding.
    ///
    /// # Errors
    ///
    /// Fails if `isolation_level` is non-zero on a version below 2, if the
    /// tagged-field count does not fit in a `u32`, or if any underlying
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.replica_id)?;
        if version >= 2 {
            types::Int8.encode(buf, &self.isolation_level)?;
        } else if self.isolation_level != 0 {
            // The field has no wire slot here, so a non-default value would be lost.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        Ok(())
    }

    /// Computes how many bytes [`encode`](Self::encode) would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.replica_id)?;
        if version >= 2 {
            size += types::Int8.compute_size(&self.isolation_level)?;
        } else if self.isolation_level != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }
        Ok(size)
    }
}
332
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsRequest {
    /// Reads a request body from `buf` using the layout of `version`.
    ///
    /// `isolation_level` is set to `0` for versions below 2. From version 6
    /// on, the topic list is read as a compact array and a tagged-field
    /// section follows it.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying decoders or from reading
    /// tagged-field payloads out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let replica_id: super::BrokerId = types::Int32.decode(buf)?;
        let isolation_level: i8 = if version >= 2 {
            types::Int8.decode(buf)?
        } else {
            0
        };
        let topics: Vec<ListOffsetsTopic> = if version >= 6 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        // The tagged-field section only exists from version 6 on.
        let unknown_tagged_fields = if version >= 6 {
            let mut fields = BTreeMap::new();
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag replaces the earlier payload.
                fields.insert(tag as i32, payload);
            }
            fields
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            replica_id,
            isolation_level,
            topics,
            unknown_tagged_fields,
        })
    }
}
365
366impl Default for ListOffsetsRequest {
367 fn default() -> Self {
368 Self {
369 replica_id: (0).into(),
370 isolation_level: 0,
371 topics: Default::default(),
372 unknown_tagged_fields: BTreeMap::new(),
373 }
374 }
375}
376
impl Message for ListOffsetsRequest {
    /// Version range declared for this type: min 0, max 8.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
    /// Version range flagged as deprecated: min 0, max 0.
    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
}
381
/// One topic entry inside a [`ListOffsetsRequest`].
///
/// Which encoding each field uses depends on the protocol version; the
/// per-field notes below mirror the `Encodable`/`Decodable` impls in this file.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct ListOffsetsTopic {
    /// Topic name. Encoded as a compact string for versions >= 6 and as a
    /// regular string otherwise.
    pub name: super::TopicName,

    /// Partitions requested for this topic. Encoded as a compact array for
    /// versions >= 6 and as a regular array otherwise.
    pub partitions: Vec<ListOffsetsPartition>,

    /// Tagged fields carried verbatim, keyed by tag. Only written/read for
    /// versions >= 6.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
399
400impl ListOffsetsTopic {
401 pub fn with_name(mut self, value: super::TopicName) -> Self {
407 self.name = value;
408 self
409 }
410 pub fn with_partitions(mut self, value: Vec<ListOffsetsPartition>) -> Self {
416 self.partitions = value;
417 self
418 }
419 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
421 self.unknown_tagged_fields = value;
422 self
423 }
424 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
426 self.unknown_tagged_fields.insert(key, value);
427 self
428 }
429}
430
#[cfg(feature = "client")]
impl Encodable for ListOffsetsTopic {
    /// Writes this topic entry to `buf` using the layout of `version`.
    ///
    /// Field order: `name`, `partitions`, then the tagged-field section
    /// (versions >= 6). From version 6 on the compact string/array encodings
    /// are used; older versions use the regular ones.
    ///
    /// # Errors
    ///
    /// Fails if the tagged-field count does not fit in a `u32` or if any
    /// underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 6 {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::String.encode(buf, &self.name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }
        Ok(())
    }

    /// Computes how many bytes [`encode`](Self::encode) would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 6 {
            let name_size = types::String.compute_size(&self.name)?;
            let partitions_size =
                types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(name_size + partitions_size);
        }

        let mut size = types::CompactString.compute_size(&self.name)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
486
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsTopic {
    /// Reads one topic entry from `buf` using the layout of `version`.
    ///
    /// From version 6 on, the name and partition list are read with the
    /// compact encodings and a tagged-field section follows them.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying decoders or from reading
    /// tagged-field payloads out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let name: super::TopicName = if version >= 6 {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let partitions: Vec<ListOffsetsPartition> = if version >= 6 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        // The tagged-field section only exists from version 6 on.
        let unknown_tagged_fields = if version >= 6 {
            let mut fields = BTreeMap::new();
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag replaces the earlier payload.
                fields.insert(tag as i32, payload);
            }
            fields
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
517
518impl Default for ListOffsetsTopic {
519 fn default() -> Self {
520 Self {
521 name: Default::default(),
522 partitions: Default::default(),
523 unknown_tagged_fields: BTreeMap::new(),
524 }
525 }
526}
527
impl Message for ListOffsetsTopic {
    /// Version range declared for this type: min 0, max 8.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
    /// Version range flagged as deprecated: min 0, max 0.
    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
}
532
533impl HeaderVersion for ListOffsetsRequest {
534 fn header_version(version: i16) -> i16 {
535 if version >= 6 {
536 2
537 } else {
538 1
539 }
540 }
541}