1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetForLeaderEpochRequest {
24 pub replica_id: super::BrokerId,
28
29 pub topics: Vec<OffsetForLeaderTopic>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl OffsetForLeaderEpochRequest {
39 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
45 self.replica_id = value;
46 self
47 }
48 pub fn with_topics(mut self, value: Vec<OffsetForLeaderTopic>) -> Self {
54 self.topics = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "client")]
70impl Encodable for OffsetForLeaderEpochRequest {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 if version < 0 || version > 4 {
73 bail!("specified version not supported by this message type");
74 }
75 if version >= 3 {
76 types::Int32.encode(buf, &self.replica_id)?;
77 }
78 if version >= 4 {
79 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
80 } else {
81 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
82 }
83 if version >= 4 {
84 let num_tagged_fields = self.unknown_tagged_fields.len();
85 if num_tagged_fields > std::u32::MAX as usize {
86 bail!(
87 "Too many tagged fields to encode ({} fields)",
88 num_tagged_fields
89 );
90 }
91 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
92
93 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
94 }
95 Ok(())
96 }
97 fn compute_size(&self, version: i16) -> Result<usize> {
98 let mut total_size = 0;
99 if version >= 3 {
100 total_size += types::Int32.compute_size(&self.replica_id)?;
101 }
102 if version >= 4 {
103 total_size +=
104 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
105 } else {
106 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
107 }
108 if version >= 4 {
109 let num_tagged_fields = self.unknown_tagged_fields.len();
110 if num_tagged_fields > std::u32::MAX as usize {
111 bail!(
112 "Too many tagged fields to encode ({} fields)",
113 num_tagged_fields
114 );
115 }
116 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
117
118 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
119 }
120 Ok(total_size)
121 }
122}
123
124#[cfg(feature = "broker")]
125impl Decodable for OffsetForLeaderEpochRequest {
126 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
127 if version < 0 || version > 4 {
128 bail!("specified version not supported by this message type");
129 }
130 let replica_id = if version >= 3 {
131 types::Int32.decode(buf)?
132 } else {
133 (-2).into()
134 };
135 let topics = if version >= 4 {
136 types::CompactArray(types::Struct { version }).decode(buf)?
137 } else {
138 types::Array(types::Struct { version }).decode(buf)?
139 };
140 let mut unknown_tagged_fields = BTreeMap::new();
141 if version >= 4 {
142 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
143 for _ in 0..num_tagged_fields {
144 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
145 let size: u32 = types::UnsignedVarInt.decode(buf)?;
146 let unknown_value = buf.try_get_bytes(size as usize)?;
147 unknown_tagged_fields.insert(tag as i32, unknown_value);
148 }
149 }
150 Ok(Self {
151 replica_id,
152 topics,
153 unknown_tagged_fields,
154 })
155 }
156}
157
158impl Default for OffsetForLeaderEpochRequest {
159 fn default() -> Self {
160 Self {
161 replica_id: (-2).into(),
162 topics: Default::default(),
163 unknown_tagged_fields: BTreeMap::new(),
164 }
165 }
166}
167
168impl Message for OffsetForLeaderEpochRequest {
169 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
170 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
171}
172
173#[non_exhaustive]
175#[derive(Debug, Clone, PartialEq)]
176pub struct OffsetForLeaderPartition {
177 pub partition: i32,
181
182 pub current_leader_epoch: i32,
186
187 pub leader_epoch: i32,
191
192 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
194}
195
196impl OffsetForLeaderPartition {
197 pub fn with_partition(mut self, value: i32) -> Self {
203 self.partition = value;
204 self
205 }
206 pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
212 self.current_leader_epoch = value;
213 self
214 }
215 pub fn with_leader_epoch(mut self, value: i32) -> Self {
221 self.leader_epoch = value;
222 self
223 }
224 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
226 self.unknown_tagged_fields = value;
227 self
228 }
229 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
231 self.unknown_tagged_fields.insert(key, value);
232 self
233 }
234}
235
236#[cfg(feature = "client")]
237impl Encodable for OffsetForLeaderPartition {
238 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
239 if version < 0 || version > 4 {
240 bail!("specified version not supported by this message type");
241 }
242 types::Int32.encode(buf, &self.partition)?;
243 if version >= 2 {
244 types::Int32.encode(buf, &self.current_leader_epoch)?;
245 }
246 types::Int32.encode(buf, &self.leader_epoch)?;
247 if version >= 4 {
248 let num_tagged_fields = self.unknown_tagged_fields.len();
249 if num_tagged_fields > std::u32::MAX as usize {
250 bail!(
251 "Too many tagged fields to encode ({} fields)",
252 num_tagged_fields
253 );
254 }
255 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
256
257 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
258 }
259 Ok(())
260 }
261 fn compute_size(&self, version: i16) -> Result<usize> {
262 let mut total_size = 0;
263 total_size += types::Int32.compute_size(&self.partition)?;
264 if version >= 2 {
265 total_size += types::Int32.compute_size(&self.current_leader_epoch)?;
266 }
267 total_size += types::Int32.compute_size(&self.leader_epoch)?;
268 if version >= 4 {
269 let num_tagged_fields = self.unknown_tagged_fields.len();
270 if num_tagged_fields > std::u32::MAX as usize {
271 bail!(
272 "Too many tagged fields to encode ({} fields)",
273 num_tagged_fields
274 );
275 }
276 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
277
278 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
279 }
280 Ok(total_size)
281 }
282}
283
284#[cfg(feature = "broker")]
285impl Decodable for OffsetForLeaderPartition {
286 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
287 if version < 0 || version > 4 {
288 bail!("specified version not supported by this message type");
289 }
290 let partition = types::Int32.decode(buf)?;
291 let current_leader_epoch = if version >= 2 {
292 types::Int32.decode(buf)?
293 } else {
294 -1
295 };
296 let leader_epoch = types::Int32.decode(buf)?;
297 let mut unknown_tagged_fields = BTreeMap::new();
298 if version >= 4 {
299 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
300 for _ in 0..num_tagged_fields {
301 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
302 let size: u32 = types::UnsignedVarInt.decode(buf)?;
303 let unknown_value = buf.try_get_bytes(size as usize)?;
304 unknown_tagged_fields.insert(tag as i32, unknown_value);
305 }
306 }
307 Ok(Self {
308 partition,
309 current_leader_epoch,
310 leader_epoch,
311 unknown_tagged_fields,
312 })
313 }
314}
315
316impl Default for OffsetForLeaderPartition {
317 fn default() -> Self {
318 Self {
319 partition: 0,
320 current_leader_epoch: -1,
321 leader_epoch: 0,
322 unknown_tagged_fields: BTreeMap::new(),
323 }
324 }
325}
326
327impl Message for OffsetForLeaderPartition {
328 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
329 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
330}
331
332#[non_exhaustive]
334#[derive(Debug, Clone, PartialEq)]
335pub struct OffsetForLeaderTopic {
336 pub topic: super::TopicName,
340
341 pub partitions: Vec<OffsetForLeaderPartition>,
345
346 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
348}
349
350impl OffsetForLeaderTopic {
351 pub fn with_topic(mut self, value: super::TopicName) -> Self {
357 self.topic = value;
358 self
359 }
360 pub fn with_partitions(mut self, value: Vec<OffsetForLeaderPartition>) -> Self {
366 self.partitions = value;
367 self
368 }
369 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
371 self.unknown_tagged_fields = value;
372 self
373 }
374 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
376 self.unknown_tagged_fields.insert(key, value);
377 self
378 }
379}
380
381#[cfg(feature = "client")]
382impl Encodable for OffsetForLeaderTopic {
383 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
384 if version < 0 || version > 4 {
385 bail!("specified version not supported by this message type");
386 }
387 if version >= 4 {
388 types::CompactString.encode(buf, &self.topic)?;
389 } else {
390 types::String.encode(buf, &self.topic)?;
391 }
392 if version >= 4 {
393 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
394 } else {
395 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
396 }
397 if version >= 4 {
398 let num_tagged_fields = self.unknown_tagged_fields.len();
399 if num_tagged_fields > std::u32::MAX as usize {
400 bail!(
401 "Too many tagged fields to encode ({} fields)",
402 num_tagged_fields
403 );
404 }
405 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
406
407 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
408 }
409 Ok(())
410 }
411 fn compute_size(&self, version: i16) -> Result<usize> {
412 let mut total_size = 0;
413 if version >= 4 {
414 total_size += types::CompactString.compute_size(&self.topic)?;
415 } else {
416 total_size += types::String.compute_size(&self.topic)?;
417 }
418 if version >= 4 {
419 total_size +=
420 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
421 } else {
422 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
423 }
424 if version >= 4 {
425 let num_tagged_fields = self.unknown_tagged_fields.len();
426 if num_tagged_fields > std::u32::MAX as usize {
427 bail!(
428 "Too many tagged fields to encode ({} fields)",
429 num_tagged_fields
430 );
431 }
432 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
433
434 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
435 }
436 Ok(total_size)
437 }
438}
439
440#[cfg(feature = "broker")]
441impl Decodable for OffsetForLeaderTopic {
442 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
443 if version < 0 || version > 4 {
444 bail!("specified version not supported by this message type");
445 }
446 let topic = if version >= 4 {
447 types::CompactString.decode(buf)?
448 } else {
449 types::String.decode(buf)?
450 };
451 let partitions = if version >= 4 {
452 types::CompactArray(types::Struct { version }).decode(buf)?
453 } else {
454 types::Array(types::Struct { version }).decode(buf)?
455 };
456 let mut unknown_tagged_fields = BTreeMap::new();
457 if version >= 4 {
458 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
459 for _ in 0..num_tagged_fields {
460 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
461 let size: u32 = types::UnsignedVarInt.decode(buf)?;
462 let unknown_value = buf.try_get_bytes(size as usize)?;
463 unknown_tagged_fields.insert(tag as i32, unknown_value);
464 }
465 }
466 Ok(Self {
467 topic,
468 partitions,
469 unknown_tagged_fields,
470 })
471 }
472}
473
474impl Default for OffsetForLeaderTopic {
475 fn default() -> Self {
476 Self {
477 topic: Default::default(),
478 partitions: Default::default(),
479 unknown_tagged_fields: BTreeMap::new(),
480 }
481 }
482}
483
484impl Message for OffsetForLeaderTopic {
485 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
486 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
487}
488
489impl HeaderVersion for OffsetForLeaderEpochRequest {
490 fn header_version(version: i16) -> i16 {
491 if version >= 4 {
492 2
493 } else {
494 1
495 }
496 }
497}