1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetForLeaderEpochRequest {
24 pub replica_id: super::BrokerId,
28
29 pub topics: Vec<OffsetForLeaderTopic>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl OffsetForLeaderEpochRequest {
39 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
45 self.replica_id = value;
46 self
47 }
48 pub fn with_topics(mut self, value: Vec<OffsetForLeaderTopic>) -> Self {
54 self.topics = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "client")]
70impl Encodable for OffsetForLeaderEpochRequest {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 if version >= 3 {
73 types::Int32.encode(buf, &self.replica_id)?;
74 }
75 if version >= 4 {
76 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
77 } else {
78 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
79 }
80 if version >= 4 {
81 let num_tagged_fields = self.unknown_tagged_fields.len();
82 if num_tagged_fields > std::u32::MAX as usize {
83 bail!(
84 "Too many tagged fields to encode ({} fields)",
85 num_tagged_fields
86 );
87 }
88 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
89
90 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
91 }
92 Ok(())
93 }
94 fn compute_size(&self, version: i16) -> Result<usize> {
95 let mut total_size = 0;
96 if version >= 3 {
97 total_size += types::Int32.compute_size(&self.replica_id)?;
98 }
99 if version >= 4 {
100 total_size +=
101 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
102 } else {
103 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
104 }
105 if version >= 4 {
106 let num_tagged_fields = self.unknown_tagged_fields.len();
107 if num_tagged_fields > std::u32::MAX as usize {
108 bail!(
109 "Too many tagged fields to encode ({} fields)",
110 num_tagged_fields
111 );
112 }
113 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
114
115 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
116 }
117 Ok(total_size)
118 }
119}
120
121#[cfg(feature = "broker")]
122impl Decodable for OffsetForLeaderEpochRequest {
123 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
124 let replica_id = if version >= 3 {
125 types::Int32.decode(buf)?
126 } else {
127 (-2).into()
128 };
129 let topics = if version >= 4 {
130 types::CompactArray(types::Struct { version }).decode(buf)?
131 } else {
132 types::Array(types::Struct { version }).decode(buf)?
133 };
134 let mut unknown_tagged_fields = BTreeMap::new();
135 if version >= 4 {
136 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
137 for _ in 0..num_tagged_fields {
138 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
139 let size: u32 = types::UnsignedVarInt.decode(buf)?;
140 let unknown_value = buf.try_get_bytes(size as usize)?;
141 unknown_tagged_fields.insert(tag as i32, unknown_value);
142 }
143 }
144 Ok(Self {
145 replica_id,
146 topics,
147 unknown_tagged_fields,
148 })
149 }
150}
151
152impl Default for OffsetForLeaderEpochRequest {
153 fn default() -> Self {
154 Self {
155 replica_id: (-2).into(),
156 topics: Default::default(),
157 unknown_tagged_fields: BTreeMap::new(),
158 }
159 }
160}
161
162impl Message for OffsetForLeaderEpochRequest {
163 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
164 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
165}
166
167#[non_exhaustive]
169#[derive(Debug, Clone, PartialEq)]
170pub struct OffsetForLeaderPartition {
171 pub partition: i32,
175
176 pub current_leader_epoch: i32,
180
181 pub leader_epoch: i32,
185
186 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
188}
189
190impl OffsetForLeaderPartition {
191 pub fn with_partition(mut self, value: i32) -> Self {
197 self.partition = value;
198 self
199 }
200 pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
206 self.current_leader_epoch = value;
207 self
208 }
209 pub fn with_leader_epoch(mut self, value: i32) -> Self {
215 self.leader_epoch = value;
216 self
217 }
218 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
220 self.unknown_tagged_fields = value;
221 self
222 }
223 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
225 self.unknown_tagged_fields.insert(key, value);
226 self
227 }
228}
229
230#[cfg(feature = "client")]
231impl Encodable for OffsetForLeaderPartition {
232 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
233 types::Int32.encode(buf, &self.partition)?;
234 if version >= 2 {
235 types::Int32.encode(buf, &self.current_leader_epoch)?;
236 }
237 types::Int32.encode(buf, &self.leader_epoch)?;
238 if version >= 4 {
239 let num_tagged_fields = self.unknown_tagged_fields.len();
240 if num_tagged_fields > std::u32::MAX as usize {
241 bail!(
242 "Too many tagged fields to encode ({} fields)",
243 num_tagged_fields
244 );
245 }
246 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
247
248 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
249 }
250 Ok(())
251 }
252 fn compute_size(&self, version: i16) -> Result<usize> {
253 let mut total_size = 0;
254 total_size += types::Int32.compute_size(&self.partition)?;
255 if version >= 2 {
256 total_size += types::Int32.compute_size(&self.current_leader_epoch)?;
257 }
258 total_size += types::Int32.compute_size(&self.leader_epoch)?;
259 if version >= 4 {
260 let num_tagged_fields = self.unknown_tagged_fields.len();
261 if num_tagged_fields > std::u32::MAX as usize {
262 bail!(
263 "Too many tagged fields to encode ({} fields)",
264 num_tagged_fields
265 );
266 }
267 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
268
269 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
270 }
271 Ok(total_size)
272 }
273}
274
275#[cfg(feature = "broker")]
276impl Decodable for OffsetForLeaderPartition {
277 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
278 let partition = types::Int32.decode(buf)?;
279 let current_leader_epoch = if version >= 2 {
280 types::Int32.decode(buf)?
281 } else {
282 -1
283 };
284 let leader_epoch = types::Int32.decode(buf)?;
285 let mut unknown_tagged_fields = BTreeMap::new();
286 if version >= 4 {
287 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
288 for _ in 0..num_tagged_fields {
289 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
290 let size: u32 = types::UnsignedVarInt.decode(buf)?;
291 let unknown_value = buf.try_get_bytes(size as usize)?;
292 unknown_tagged_fields.insert(tag as i32, unknown_value);
293 }
294 }
295 Ok(Self {
296 partition,
297 current_leader_epoch,
298 leader_epoch,
299 unknown_tagged_fields,
300 })
301 }
302}
303
304impl Default for OffsetForLeaderPartition {
305 fn default() -> Self {
306 Self {
307 partition: 0,
308 current_leader_epoch: -1,
309 leader_epoch: 0,
310 unknown_tagged_fields: BTreeMap::new(),
311 }
312 }
313}
314
315impl Message for OffsetForLeaderPartition {
316 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
317 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
318}
319
320#[non_exhaustive]
322#[derive(Debug, Clone, PartialEq)]
323pub struct OffsetForLeaderTopic {
324 pub topic: super::TopicName,
328
329 pub partitions: Vec<OffsetForLeaderPartition>,
333
334 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
336}
337
338impl OffsetForLeaderTopic {
339 pub fn with_topic(mut self, value: super::TopicName) -> Self {
345 self.topic = value;
346 self
347 }
348 pub fn with_partitions(mut self, value: Vec<OffsetForLeaderPartition>) -> Self {
354 self.partitions = value;
355 self
356 }
357 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
359 self.unknown_tagged_fields = value;
360 self
361 }
362 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
364 self.unknown_tagged_fields.insert(key, value);
365 self
366 }
367}
368
369#[cfg(feature = "client")]
370impl Encodable for OffsetForLeaderTopic {
371 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
372 if version >= 4 {
373 types::CompactString.encode(buf, &self.topic)?;
374 } else {
375 types::String.encode(buf, &self.topic)?;
376 }
377 if version >= 4 {
378 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
379 } else {
380 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
381 }
382 if version >= 4 {
383 let num_tagged_fields = self.unknown_tagged_fields.len();
384 if num_tagged_fields > std::u32::MAX as usize {
385 bail!(
386 "Too many tagged fields to encode ({} fields)",
387 num_tagged_fields
388 );
389 }
390 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
391
392 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
393 }
394 Ok(())
395 }
396 fn compute_size(&self, version: i16) -> Result<usize> {
397 let mut total_size = 0;
398 if version >= 4 {
399 total_size += types::CompactString.compute_size(&self.topic)?;
400 } else {
401 total_size += types::String.compute_size(&self.topic)?;
402 }
403 if version >= 4 {
404 total_size +=
405 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
406 } else {
407 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
408 }
409 if version >= 4 {
410 let num_tagged_fields = self.unknown_tagged_fields.len();
411 if num_tagged_fields > std::u32::MAX as usize {
412 bail!(
413 "Too many tagged fields to encode ({} fields)",
414 num_tagged_fields
415 );
416 }
417 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
418
419 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
420 }
421 Ok(total_size)
422 }
423}
424
425#[cfg(feature = "broker")]
426impl Decodable for OffsetForLeaderTopic {
427 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
428 let topic = if version >= 4 {
429 types::CompactString.decode(buf)?
430 } else {
431 types::String.decode(buf)?
432 };
433 let partitions = if version >= 4 {
434 types::CompactArray(types::Struct { version }).decode(buf)?
435 } else {
436 types::Array(types::Struct { version }).decode(buf)?
437 };
438 let mut unknown_tagged_fields = BTreeMap::new();
439 if version >= 4 {
440 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
441 for _ in 0..num_tagged_fields {
442 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
443 let size: u32 = types::UnsignedVarInt.decode(buf)?;
444 let unknown_value = buf.try_get_bytes(size as usize)?;
445 unknown_tagged_fields.insert(tag as i32, unknown_value);
446 }
447 }
448 Ok(Self {
449 topic,
450 partitions,
451 unknown_tagged_fields,
452 })
453 }
454}
455
456impl Default for OffsetForLeaderTopic {
457 fn default() -> Self {
458 Self {
459 topic: Default::default(),
460 partitions: Default::default(),
461 unknown_tagged_fields: BTreeMap::new(),
462 }
463 }
464}
465
466impl Message for OffsetForLeaderTopic {
467 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
468 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
469}
470
471impl HeaderVersion for OffsetForLeaderEpochRequest {
472 fn header_version(version: i16) -> i16 {
473 if version >= 4 {
474 2
475 } else {
476 1
477 }
478 }
479}