kafka_protocol/messages/
offset_for_leader_epoch_request.rs

1//! OffsetForLeaderEpochRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-4
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct OffsetForLeaderEpochRequest {
24    /// The broker ID of the follower, of -1 if this request is from a consumer.
25    ///
26    /// Supported API versions: 3-4
27    pub replica_id: super::BrokerId,
28
29    /// Each topic to get offsets for.
30    ///
31    /// Supported API versions: 0-4
32    pub topics: Vec<OffsetForLeaderTopic>,
33
34    /// Other tagged fields
35    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl OffsetForLeaderEpochRequest {
39    /// Sets `replica_id` to the passed value.
40    ///
41    /// The broker ID of the follower, of -1 if this request is from a consumer.
42    ///
43    /// Supported API versions: 3-4
44    pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
45        self.replica_id = value;
46        self
47    }
48    /// Sets `topics` to the passed value.
49    ///
50    /// Each topic to get offsets for.
51    ///
52    /// Supported API versions: 0-4
53    pub fn with_topics(mut self, value: Vec<OffsetForLeaderTopic>) -> Self {
54        self.topics = value;
55        self
56    }
57    /// Sets unknown_tagged_fields to the passed value.
58    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59        self.unknown_tagged_fields = value;
60        self
61    }
62    /// Inserts an entry into unknown_tagged_fields.
63    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64        self.unknown_tagged_fields.insert(key, value);
65        self
66    }
67}
68
#[cfg(feature = "client")]
impl Encodable for OffsetForLeaderEpochRequest {
    /// Writes this request to `buf` using the layout of `version`.
    ///
    /// `replica_id` is written from version 3 on. Version 4 writes `topics` as a
    /// compact array and appends the unknown tagged fields; earlier versions use
    /// a plain array and write no tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4, when the number of tagged fields
    /// exceeds `u32::MAX`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version >= 3 {
            types::Int32.encode(buf, &self.replica_id)?;
        }
        if version < 4 {
            // Non-flexible versions end after the plain topic array.
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
            return Ok(());
        }
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }
    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not check that `version` lies within 0-4.
    ///
    /// # Errors
    ///
    /// Fails when the number of tagged fields exceeds `u32::MAX` or when a
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let replica_id_size = if version >= 3 {
            types::Int32.compute_size(&self.replica_id)?
        } else {
            0
        };
        if version < 4 {
            let topics_size = types::Array(types::Struct { version }).compute_size(&self.topics)?;
            return Ok(replica_id_size + topics_size);
        }
        let topics_size =
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(replica_id_size + topics_size + count_size + tagged_size)
    }
}
123
#[cfg(feature = "broker")]
impl Decodable for OffsetForLeaderEpochRequest {
    /// Reads a request from `buf` using the layout of `version`.
    ///
    /// For versions below 3, which carry no `replica_id`, the field is set to -2.
    /// Tagged fields are read only for version 4, and all of them are stored in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4 or when the buffer cannot be decoded.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // `version` is known to be within 0-4 from here on.
        let replica_id = match version {
            3 | 4 => types::Int32.decode(buf)?,
            _ => (-2).into(),
        };
        let flexible = version >= 4;
        let topics = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(Self {
            replica_id,
            topics,
            unknown_tagged_fields,
        })
    }
}
157
158impl Default for OffsetForLeaderEpochRequest {
159    fn default() -> Self {
160        Self {
161            replica_id: (-2).into(),
162            topics: Default::default(),
163            unknown_tagged_fields: BTreeMap::new(),
164        }
165    }
166}
167
168impl Message for OffsetForLeaderEpochRequest {
169    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
170    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
171}
172
173/// Valid versions: 0-4
174#[non_exhaustive]
175#[derive(Debug, Clone, PartialEq)]
176pub struct OffsetForLeaderPartition {
177    /// The partition index.
178    ///
179    /// Supported API versions: 0-4
180    pub partition: i32,
181
182    /// An epoch used to fence consumers/replicas with old metadata. If the epoch provided by the client is larger than the current epoch known to the broker, then the UNKNOWN_LEADER_EPOCH error code will be returned. If the provided epoch is smaller, then the FENCED_LEADER_EPOCH error code will be returned.
183    ///
184    /// Supported API versions: 2-4
185    pub current_leader_epoch: i32,
186
187    /// The epoch to look up an offset for.
188    ///
189    /// Supported API versions: 0-4
190    pub leader_epoch: i32,
191
192    /// Other tagged fields
193    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
194}
195
196impl OffsetForLeaderPartition {
197    /// Sets `partition` to the passed value.
198    ///
199    /// The partition index.
200    ///
201    /// Supported API versions: 0-4
202    pub fn with_partition(mut self, value: i32) -> Self {
203        self.partition = value;
204        self
205    }
206    /// Sets `current_leader_epoch` to the passed value.
207    ///
208    /// An epoch used to fence consumers/replicas with old metadata. If the epoch provided by the client is larger than the current epoch known to the broker, then the UNKNOWN_LEADER_EPOCH error code will be returned. If the provided epoch is smaller, then the FENCED_LEADER_EPOCH error code will be returned.
209    ///
210    /// Supported API versions: 2-4
211    pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
212        self.current_leader_epoch = value;
213        self
214    }
215    /// Sets `leader_epoch` to the passed value.
216    ///
217    /// The epoch to look up an offset for.
218    ///
219    /// Supported API versions: 0-4
220    pub fn with_leader_epoch(mut self, value: i32) -> Self {
221        self.leader_epoch = value;
222        self
223    }
224    /// Sets unknown_tagged_fields to the passed value.
225    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
226        self.unknown_tagged_fields = value;
227        self
228    }
229    /// Inserts an entry into unknown_tagged_fields.
230    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
231        self.unknown_tagged_fields.insert(key, value);
232        self
233    }
234}
235
#[cfg(feature = "client")]
impl Encodable for OffsetForLeaderPartition {
    /// Writes this partition entry to `buf` using the layout of `version`.
    ///
    /// Field order is `partition`, `current_leader_epoch` (version 2 and later),
    /// `leader_epoch`, then the unknown tagged fields (version 4 only).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4, when the number of tagged fields
    /// exceeds `u32::MAX`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.partition)?;
        if version >= 2 {
            types::Int32.encode(buf, &self.current_leader_epoch)?;
        }
        types::Int32.encode(buf, &self.leader_epoch)?;
        if version < 4 {
            // Non-flexible versions carry no tagged-field section.
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }
    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not check that `version` lies within 0-4.
    ///
    /// # Errors
    ///
    /// Fails when the number of tagged fields exceeds `u32::MAX` or when a
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let partition_size = types::Int32.compute_size(&self.partition)?;
        let current_epoch_size = if version >= 2 {
            types::Int32.compute_size(&self.current_leader_epoch)?
        } else {
            0
        };
        let leader_epoch_size = types::Int32.compute_size(&self.leader_epoch)?;
        let fixed_size = partition_size + current_epoch_size + leader_epoch_size;
        if version < 4 {
            return Ok(fixed_size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(fixed_size + count_size + tagged_size)
    }
}
283
#[cfg(feature = "broker")]
impl Decodable for OffsetForLeaderPartition {
    /// Reads a partition entry from `buf` using the layout of `version`.
    ///
    /// For versions below 2, which carry no `current_leader_epoch`, the field is
    /// set to -1. Tagged fields are read only for version 4, and all of them are
    /// stored in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4 or when the buffer cannot be decoded.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let partition = types::Int32.decode(buf)?;
        // `version` is known to be within 0-4 from here on.
        let current_leader_epoch = match version {
            2..=4 => types::Int32.decode(buf)?,
            _ => -1,
        };
        let leader_epoch = types::Int32.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 4 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(Self {
            partition,
            current_leader_epoch,
            leader_epoch,
            unknown_tagged_fields,
        })
    }
}
315
316impl Default for OffsetForLeaderPartition {
317    fn default() -> Self {
318        Self {
319            partition: 0,
320            current_leader_epoch: -1,
321            leader_epoch: 0,
322            unknown_tagged_fields: BTreeMap::new(),
323        }
324    }
325}
326
327impl Message for OffsetForLeaderPartition {
328    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
329    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
330}
331
332/// Valid versions: 0-4
333#[non_exhaustive]
334#[derive(Debug, Clone, PartialEq)]
335pub struct OffsetForLeaderTopic {
336    /// The topic name.
337    ///
338    /// Supported API versions: 0-4
339    pub topic: super::TopicName,
340
341    /// Each partition to get offsets for.
342    ///
343    /// Supported API versions: 0-4
344    pub partitions: Vec<OffsetForLeaderPartition>,
345
346    /// Other tagged fields
347    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
348}
349
350impl OffsetForLeaderTopic {
351    /// Sets `topic` to the passed value.
352    ///
353    /// The topic name.
354    ///
355    /// Supported API versions: 0-4
356    pub fn with_topic(mut self, value: super::TopicName) -> Self {
357        self.topic = value;
358        self
359    }
360    /// Sets `partitions` to the passed value.
361    ///
362    /// Each partition to get offsets for.
363    ///
364    /// Supported API versions: 0-4
365    pub fn with_partitions(mut self, value: Vec<OffsetForLeaderPartition>) -> Self {
366        self.partitions = value;
367        self
368    }
369    /// Sets unknown_tagged_fields to the passed value.
370    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
371        self.unknown_tagged_fields = value;
372        self
373    }
374    /// Inserts an entry into unknown_tagged_fields.
375    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
376        self.unknown_tagged_fields.insert(key, value);
377        self
378    }
379}
380
#[cfg(feature = "client")]
impl Encodable for OffsetForLeaderTopic {
    /// Writes this topic entry to `buf` using the layout of `version`.
    ///
    /// Version 4 uses the compact string and compact array forms and appends the
    /// unknown tagged fields; earlier versions use the plain string and array
    /// forms and write no tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4, when the number of tagged fields
    /// exceeds `u32::MAX`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version >= 4 {
            types::CompactString.encode(buf, &self.topic)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::String.encode(buf, &self.topic)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }
        Ok(())
    }
    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not check that `version` lies within 0-4.
    ///
    /// # Errors
    ///
    /// Fails when the number of tagged fields exceeds `u32::MAX` or when a
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version >= 4 {
            let topic_size = types::CompactString.compute_size(&self.topic)?;
            let partitions_size =
                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            let tagged_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
            Ok(topic_size + partitions_size + count_size + tagged_size)
        } else {
            let topic_size = types::String.compute_size(&self.topic)?;
            let partitions_size =
                types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            Ok(topic_size + partitions_size)
        }
    }
}
439
#[cfg(feature = "broker")]
impl Decodable for OffsetForLeaderTopic {
    /// Reads a topic entry from `buf` using the layout of `version`.
    ///
    /// Version 4 reads the compact string and compact array forms followed by
    /// the tagged fields, all of which are stored in `unknown_tagged_fields`;
    /// earlier versions read the plain forms and no tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4 or when the buffer cannot be decoded.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 4;
        // Tuple elements are evaluated left to right, so the name is read before the partitions.
        let (topic, partitions) = if flexible {
            (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Struct { version }).decode(buf)?,
            )
        } else {
            (
                types::String.decode(buf)?,
                types::Array(types::Struct { version }).decode(buf)?,
            )
        };
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(Self {
            topic,
            partitions,
            unknown_tagged_fields,
        })
    }
}
473
474impl Default for OffsetForLeaderTopic {
475    fn default() -> Self {
476        Self {
477            topic: Default::default(),
478            partitions: Default::default(),
479            unknown_tagged_fields: BTreeMap::new(),
480        }
481    }
482}
483
484impl Message for OffsetForLeaderTopic {
485    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
486    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
487}
488
489impl HeaderVersion for OffsetForLeaderEpochRequest {
490    fn header_version(version: i16) -> i16 {
491        if version >= 4 {
492            2
493        } else {
494            1
495        }
496    }
497}