1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetForLeaderEpochRequest {
24 pub replica_id: super::BrokerId,
28
29 pub topics: Vec<OffsetForLeaderTopic>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl OffsetForLeaderEpochRequest {
39 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
45 self.replica_id = value;
46 self
47 }
48 pub fn with_topics(mut self, value: Vec<OffsetForLeaderTopic>) -> Self {
54 self.topics = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
#[cfg(feature = "client")]
impl Encodable for OffsetForLeaderEpochRequest {
    /// Writes the request to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 2..=4, when more than `u32::MAX`
    /// unknown tagged fields are present, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on, arrays use the compact form and a tagged-field
        // section trails the message.
        let compact = version >= 4;

        if version >= 3 {
            types::Int32.encode(buf, &self.replica_id)?;
        }
        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        if !compact {
            return Ok(());
        }

        let field_count = self.unknown_tagged_fields.len();
        if field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                field_count
            );
        }
        types::UnsignedVarInt.encode(buf, field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 2..=4.
    ///
    /// # Errors
    ///
    /// Fails when more than `u32::MAX` unknown tagged fields are present (for
    /// versions >= 4) or when any nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 4;
        let mut size: usize = 0;

        if version >= 3 {
            size += types::Int32.compute_size(&self.replica_id)?;
        }
        size += if compact {
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.topics)?
        };
        if !compact {
            return Ok(size);
        }

        let field_count = self.unknown_tagged_fields.len();
        if field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                field_count
            );
        }
        size += types::UnsignedVarInt.compute_size(field_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
123
#[cfg(feature = "broker")]
impl Decodable for OffsetForLeaderEpochRequest {
    /// Reads a request from `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 2..=4 or when any field cannot be
    /// decoded from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on, arrays use the compact form and a tagged-field
        // section trails the message.
        let compact = version >= 4;

        // Struct fields are evaluated in the order written, which matches the
        // wire order: replica id first, then topics.
        let mut request = Self {
            replica_id: if version >= 3 {
                types::Int32.decode(buf)?
            } else {
                // Field is not on the wire before version 3; use the default.
                (-2).into()
            },
            topics: if compact {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                types::Array(types::Struct { version }).decode(buf)?
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        if compact {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // No tag is known for this message, so every tagged field is
                // kept verbatim.
                request.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(request)
    }
}
157
158impl Default for OffsetForLeaderEpochRequest {
159 fn default() -> Self {
160 Self {
161 replica_id: (-2).into(),
162 topics: Default::default(),
163 unknown_tagged_fields: BTreeMap::new(),
164 }
165 }
166}
167
168impl Message for OffsetForLeaderEpochRequest {
169 const VERSIONS: VersionRange = VersionRange { min: 2, max: 4 };
170 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
171}
172
173#[non_exhaustive]
175#[derive(Debug, Clone, PartialEq)]
176pub struct OffsetForLeaderPartition {
177 pub partition: i32,
181
182 pub current_leader_epoch: i32,
186
187 pub leader_epoch: i32,
191
192 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
194}
195
196impl OffsetForLeaderPartition {
197 pub fn with_partition(mut self, value: i32) -> Self {
203 self.partition = value;
204 self
205 }
206 pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
212 self.current_leader_epoch = value;
213 self
214 }
215 pub fn with_leader_epoch(mut self, value: i32) -> Self {
221 self.leader_epoch = value;
222 self
223 }
224 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
226 self.unknown_tagged_fields = value;
227 self
228 }
229 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
231 self.unknown_tagged_fields.insert(key, value);
232 self
233 }
234}
235
#[cfg(feature = "client")]
impl Encodable for OffsetForLeaderPartition {
    /// Writes the partition entry to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 2..=4, when more than `u32::MAX`
    /// unknown tagged fields are present, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // The three Int32 fields are present in every supported version and
        // are written in declaration order.
        for field in [
            &self.partition,
            &self.current_leader_epoch,
            &self.leader_epoch,
        ] {
            types::Int32.encode(buf, field)?;
        }

        // The tagged-field section only exists from version 4 on.
        if version < 4 {
            return Ok(());
        }
        let field_count = self.unknown_tagged_fields.len();
        if field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                field_count
            );
        }
        types::UnsignedVarInt.encode(buf, field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 2..=4.
    ///
    /// # Errors
    ///
    /// Fails when more than `u32::MAX` unknown tagged fields are present (for
    /// versions >= 4) or when any nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size: usize = 0;
        for field in [
            &self.partition,
            &self.current_leader_epoch,
            &self.leader_epoch,
        ] {
            size += types::Int32.compute_size(field)?;
        }

        // The tagged-field section only exists from version 4 on.
        if version < 4 {
            return Ok(size);
        }
        let field_count = self.unknown_tagged_fields.len();
        if field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                field_count
            );
        }
        size += types::UnsignedVarInt.compute_size(field_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
279
#[cfg(feature = "broker")]
impl Decodable for OffsetForLeaderPartition {
    /// Reads a partition entry from `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 2..=4 or when any field cannot be
    /// decoded from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Struct fields are evaluated in the order written, which matches the
        // wire order of the three Int32 values.
        let mut entry = Self {
            partition: types::Int32.decode(buf)?,
            current_leader_epoch: types::Int32.decode(buf)?,
            leader_epoch: types::Int32.decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        // The tagged-field section only exists from version 4 on.
        if version >= 4 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // No tag is known for this struct, so every tagged field is
                // kept verbatim.
                entry.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(entry)
    }
}
307
308impl Default for OffsetForLeaderPartition {
309 fn default() -> Self {
310 Self {
311 partition: 0,
312 current_leader_epoch: -1,
313 leader_epoch: 0,
314 unknown_tagged_fields: BTreeMap::new(),
315 }
316 }
317}
318
319impl Message for OffsetForLeaderPartition {
320 const VERSIONS: VersionRange = VersionRange { min: 2, max: 4 };
321 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
322}
323
324#[non_exhaustive]
326#[derive(Debug, Clone, PartialEq)]
327pub struct OffsetForLeaderTopic {
328 pub topic: super::TopicName,
332
333 pub partitions: Vec<OffsetForLeaderPartition>,
337
338 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
340}
341
342impl OffsetForLeaderTopic {
343 pub fn with_topic(mut self, value: super::TopicName) -> Self {
349 self.topic = value;
350 self
351 }
352 pub fn with_partitions(mut self, value: Vec<OffsetForLeaderPartition>) -> Self {
358 self.partitions = value;
359 self
360 }
361 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
363 self.unknown_tagged_fields = value;
364 self
365 }
366 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
368 self.unknown_tagged_fields.insert(key, value);
369 self
370 }
371}
372
#[cfg(feature = "client")]
impl Encodable for OffsetForLeaderTopic {
    /// Writes the topic entry to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 2..=4, when more than `u32::MAX`
    /// unknown tagged fields are present, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version < 4 {
            // Older layout: regular string and array, no tagged-field section.
            types::String.encode(buf, &self.topic)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
            return Ok(());
        }

        // Version 4 layout: compact string and array, then tagged fields.
        types::CompactString.encode(buf, &self.topic)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let field_count = self.unknown_tagged_fields.len();
        if field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                field_count
            );
        }
        types::UnsignedVarInt.encode(buf, field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 2..=4.
    ///
    /// # Errors
    ///
    /// Fails when more than `u32::MAX` unknown tagged fields are present (for
    /// versions >= 4) or when any nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size: usize = 0;

        if version < 4 {
            // Older layout: regular string and array, no tagged-field section.
            size += types::String.compute_size(&self.topic)?;
            size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(size);
        }

        // Version 4+ layout: compact string and array, then tagged fields.
        size += types::CompactString.compute_size(&self.topic)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let field_count = self.unknown_tagged_fields.len();
        if field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                field_count
            );
        }
        size += types::UnsignedVarInt.compute_size(field_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
431
#[cfg(feature = "broker")]
impl Decodable for OffsetForLeaderTopic {
    /// Reads a topic entry from `buf` using the wire layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 2..=4 or when any field cannot be
    /// decoded from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 4 on, strings and arrays use the compact form and a
        // tagged-field section trails the struct.
        let compact = version >= 4;

        // Struct fields are evaluated in the order written, which matches the
        // wire order: topic name first, then partitions.
        let mut entry = Self {
            topic: if compact {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            },
            partitions: if compact {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                types::Array(types::Struct { version }).decode(buf)?
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        if compact {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // No tag is known for this struct, so every tagged field is
                // kept verbatim.
                entry.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(entry)
    }
}
465
466impl Default for OffsetForLeaderTopic {
467 fn default() -> Self {
468 Self {
469 topic: Default::default(),
470 partitions: Default::default(),
471 unknown_tagged_fields: BTreeMap::new(),
472 }
473 }
474}
475
476impl Message for OffsetForLeaderTopic {
477 const VERSIONS: VersionRange = VersionRange { min: 2, max: 4 };
478 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
479}
480
481impl HeaderVersion for OffsetForLeaderEpochRequest {
482 fn header_version(version: i16) -> i16 {
483 if version >= 4 {
484 2
485 } else {
486 1
487 }
488 }
489}