1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct EpochEndOffset {
24 pub error_code: i16,
28
29 pub partition: i32,
33
34 pub leader_epoch: i32,
38
39 pub end_offset: i64,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl EpochEndOffset {
49 pub fn with_error_code(mut self, value: i16) -> Self {
55 self.error_code = value;
56 self
57 }
58 pub fn with_partition(mut self, value: i32) -> Self {
64 self.partition = value;
65 self
66 }
67 pub fn with_leader_epoch(mut self, value: i32) -> Self {
73 self.leader_epoch = value;
74 self
75 }
76 pub fn with_end_offset(mut self, value: i64) -> Self {
82 self.end_offset = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
97#[cfg(feature = "broker")]
98impl Encodable for EpochEndOffset {
99 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
100 if version < 2 || version > 4 {
101 bail!("specified version not supported by this message type");
102 }
103 types::Int16.encode(buf, &self.error_code)?;
104 types::Int32.encode(buf, &self.partition)?;
105 types::Int32.encode(buf, &self.leader_epoch)?;
106 types::Int64.encode(buf, &self.end_offset)?;
107 if version >= 4 {
108 let num_tagged_fields = self.unknown_tagged_fields.len();
109 if num_tagged_fields > std::u32::MAX as usize {
110 bail!(
111 "Too many tagged fields to encode ({} fields)",
112 num_tagged_fields
113 );
114 }
115 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
116
117 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
118 }
119 Ok(())
120 }
121 fn compute_size(&self, version: i16) -> Result<usize> {
122 let mut total_size = 0;
123 total_size += types::Int16.compute_size(&self.error_code)?;
124 total_size += types::Int32.compute_size(&self.partition)?;
125 total_size += types::Int32.compute_size(&self.leader_epoch)?;
126 total_size += types::Int64.compute_size(&self.end_offset)?;
127 if version >= 4 {
128 let num_tagged_fields = self.unknown_tagged_fields.len();
129 if num_tagged_fields > std::u32::MAX as usize {
130 bail!(
131 "Too many tagged fields to encode ({} fields)",
132 num_tagged_fields
133 );
134 }
135 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
136
137 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
138 }
139 Ok(total_size)
140 }
141}
142
143#[cfg(feature = "client")]
144impl Decodable for EpochEndOffset {
145 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
146 if version < 2 || version > 4 {
147 bail!("specified version not supported by this message type");
148 }
149 let error_code = types::Int16.decode(buf)?;
150 let partition = types::Int32.decode(buf)?;
151 let leader_epoch = types::Int32.decode(buf)?;
152 let end_offset = types::Int64.decode(buf)?;
153 let mut unknown_tagged_fields = BTreeMap::new();
154 if version >= 4 {
155 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
156 for _ in 0..num_tagged_fields {
157 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
158 let size: u32 = types::UnsignedVarInt.decode(buf)?;
159 let unknown_value = buf.try_get_bytes(size as usize)?;
160 unknown_tagged_fields.insert(tag as i32, unknown_value);
161 }
162 }
163 Ok(Self {
164 error_code,
165 partition,
166 leader_epoch,
167 end_offset,
168 unknown_tagged_fields,
169 })
170 }
171}
172
173impl Default for EpochEndOffset {
174 fn default() -> Self {
175 Self {
176 error_code: 0,
177 partition: 0,
178 leader_epoch: -1,
179 end_offset: -1,
180 unknown_tagged_fields: BTreeMap::new(),
181 }
182 }
183}
184
185impl Message for EpochEndOffset {
186 const VERSIONS: VersionRange = VersionRange { min: 2, max: 4 };
187 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
188}
189
190#[non_exhaustive]
192#[derive(Debug, Clone, PartialEq)]
193pub struct OffsetForLeaderEpochResponse {
194 pub throttle_time_ms: i32,
198
199 pub topics: Vec<OffsetForLeaderTopicResult>,
203
204 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
206}
207
208impl OffsetForLeaderEpochResponse {
209 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
215 self.throttle_time_ms = value;
216 self
217 }
218 pub fn with_topics(mut self, value: Vec<OffsetForLeaderTopicResult>) -> Self {
224 self.topics = value;
225 self
226 }
227 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
229 self.unknown_tagged_fields = value;
230 self
231 }
232 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
234 self.unknown_tagged_fields.insert(key, value);
235 self
236 }
237}
238
239#[cfg(feature = "broker")]
240impl Encodable for OffsetForLeaderEpochResponse {
241 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
242 if version < 2 || version > 4 {
243 bail!("specified version not supported by this message type");
244 }
245 types::Int32.encode(buf, &self.throttle_time_ms)?;
246 if version >= 4 {
247 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
248 } else {
249 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
250 }
251 if version >= 4 {
252 let num_tagged_fields = self.unknown_tagged_fields.len();
253 if num_tagged_fields > std::u32::MAX as usize {
254 bail!(
255 "Too many tagged fields to encode ({} fields)",
256 num_tagged_fields
257 );
258 }
259 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
260
261 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
262 }
263 Ok(())
264 }
265 fn compute_size(&self, version: i16) -> Result<usize> {
266 let mut total_size = 0;
267 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
268 if version >= 4 {
269 total_size +=
270 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
271 } else {
272 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
273 }
274 if version >= 4 {
275 let num_tagged_fields = self.unknown_tagged_fields.len();
276 if num_tagged_fields > std::u32::MAX as usize {
277 bail!(
278 "Too many tagged fields to encode ({} fields)",
279 num_tagged_fields
280 );
281 }
282 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
283
284 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
285 }
286 Ok(total_size)
287 }
288}
289
290#[cfg(feature = "client")]
291impl Decodable for OffsetForLeaderEpochResponse {
292 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
293 if version < 2 || version > 4 {
294 bail!("specified version not supported by this message type");
295 }
296 let throttle_time_ms = types::Int32.decode(buf)?;
297 let topics = if version >= 4 {
298 types::CompactArray(types::Struct { version }).decode(buf)?
299 } else {
300 types::Array(types::Struct { version }).decode(buf)?
301 };
302 let mut unknown_tagged_fields = BTreeMap::new();
303 if version >= 4 {
304 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
305 for _ in 0..num_tagged_fields {
306 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
307 let size: u32 = types::UnsignedVarInt.decode(buf)?;
308 let unknown_value = buf.try_get_bytes(size as usize)?;
309 unknown_tagged_fields.insert(tag as i32, unknown_value);
310 }
311 }
312 Ok(Self {
313 throttle_time_ms,
314 topics,
315 unknown_tagged_fields,
316 })
317 }
318}
319
320impl Default for OffsetForLeaderEpochResponse {
321 fn default() -> Self {
322 Self {
323 throttle_time_ms: 0,
324 topics: Default::default(),
325 unknown_tagged_fields: BTreeMap::new(),
326 }
327 }
328}
329
330impl Message for OffsetForLeaderEpochResponse {
331 const VERSIONS: VersionRange = VersionRange { min: 2, max: 4 };
332 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
333}
334
335#[non_exhaustive]
337#[derive(Debug, Clone, PartialEq)]
338pub struct OffsetForLeaderTopicResult {
339 pub topic: super::TopicName,
343
344 pub partitions: Vec<EpochEndOffset>,
348
349 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
351}
352
353impl OffsetForLeaderTopicResult {
354 pub fn with_topic(mut self, value: super::TopicName) -> Self {
360 self.topic = value;
361 self
362 }
363 pub fn with_partitions(mut self, value: Vec<EpochEndOffset>) -> Self {
369 self.partitions = value;
370 self
371 }
372 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
374 self.unknown_tagged_fields = value;
375 self
376 }
377 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
379 self.unknown_tagged_fields.insert(key, value);
380 self
381 }
382}
383
384#[cfg(feature = "broker")]
385impl Encodable for OffsetForLeaderTopicResult {
386 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
387 if version < 2 || version > 4 {
388 bail!("specified version not supported by this message type");
389 }
390 if version >= 4 {
391 types::CompactString.encode(buf, &self.topic)?;
392 } else {
393 types::String.encode(buf, &self.topic)?;
394 }
395 if version >= 4 {
396 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
397 } else {
398 types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
399 }
400 if version >= 4 {
401 let num_tagged_fields = self.unknown_tagged_fields.len();
402 if num_tagged_fields > std::u32::MAX as usize {
403 bail!(
404 "Too many tagged fields to encode ({} fields)",
405 num_tagged_fields
406 );
407 }
408 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
409
410 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
411 }
412 Ok(())
413 }
414 fn compute_size(&self, version: i16) -> Result<usize> {
415 let mut total_size = 0;
416 if version >= 4 {
417 total_size += types::CompactString.compute_size(&self.topic)?;
418 } else {
419 total_size += types::String.compute_size(&self.topic)?;
420 }
421 if version >= 4 {
422 total_size +=
423 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
424 } else {
425 total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
426 }
427 if version >= 4 {
428 let num_tagged_fields = self.unknown_tagged_fields.len();
429 if num_tagged_fields > std::u32::MAX as usize {
430 bail!(
431 "Too many tagged fields to encode ({} fields)",
432 num_tagged_fields
433 );
434 }
435 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
436
437 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
438 }
439 Ok(total_size)
440 }
441}
442
443#[cfg(feature = "client")]
444impl Decodable for OffsetForLeaderTopicResult {
445 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
446 if version < 2 || version > 4 {
447 bail!("specified version not supported by this message type");
448 }
449 let topic = if version >= 4 {
450 types::CompactString.decode(buf)?
451 } else {
452 types::String.decode(buf)?
453 };
454 let partitions = if version >= 4 {
455 types::CompactArray(types::Struct { version }).decode(buf)?
456 } else {
457 types::Array(types::Struct { version }).decode(buf)?
458 };
459 let mut unknown_tagged_fields = BTreeMap::new();
460 if version >= 4 {
461 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
462 for _ in 0..num_tagged_fields {
463 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
464 let size: u32 = types::UnsignedVarInt.decode(buf)?;
465 let unknown_value = buf.try_get_bytes(size as usize)?;
466 unknown_tagged_fields.insert(tag as i32, unknown_value);
467 }
468 }
469 Ok(Self {
470 topic,
471 partitions,
472 unknown_tagged_fields,
473 })
474 }
475}
476
477impl Default for OffsetForLeaderTopicResult {
478 fn default() -> Self {
479 Self {
480 topic: Default::default(),
481 partitions: Default::default(),
482 unknown_tagged_fields: BTreeMap::new(),
483 }
484 }
485}
486
487impl Message for OffsetForLeaderTopicResult {
488 const VERSIONS: VersionRange = VersionRange { min: 2, max: 4 };
489 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
490}
491
492impl HeaderVersion for OffsetForLeaderEpochResponse {
493 fn header_version(version: i16) -> i16 {
494 if version >= 4 {
495 1
496 } else {
497 0
498 }
499 }
500}