kafka_protocol/messages/
list_offsets_request.rs

1//! ListOffsetsRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ListOffsetsRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-8
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ListOffsetsPartition {
24    /// The partition index.
25    ///
26    /// Supported API versions: 0-8
27    pub partition_index: i32,
28
29    /// The current leader epoch.
30    ///
31    /// Supported API versions: 4-8
32    pub current_leader_epoch: i32,
33
34    /// The current timestamp.
35    ///
36    /// Supported API versions: 0-8
37    pub timestamp: i64,
38
39    /// The maximum number of offsets to report.
40    ///
41    /// Supported API versions: 0
42    pub max_num_offsets: i32,
43
44    /// Other tagged fields
45    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl ListOffsetsPartition {
49    /// Sets `partition_index` to the passed value.
50    ///
51    /// The partition index.
52    ///
53    /// Supported API versions: 0-8
54    pub fn with_partition_index(mut self, value: i32) -> Self {
55        self.partition_index = value;
56        self
57    }
58    /// Sets `current_leader_epoch` to the passed value.
59    ///
60    /// The current leader epoch.
61    ///
62    /// Supported API versions: 4-8
63    pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
64        self.current_leader_epoch = value;
65        self
66    }
67    /// Sets `timestamp` to the passed value.
68    ///
69    /// The current timestamp.
70    ///
71    /// Supported API versions: 0-8
72    pub fn with_timestamp(mut self, value: i64) -> Self {
73        self.timestamp = value;
74        self
75    }
76    /// Sets `max_num_offsets` to the passed value.
77    ///
78    /// The maximum number of offsets to report.
79    ///
80    /// Supported API versions: 0
81    pub fn with_max_num_offsets(mut self, value: i32) -> Self {
82        self.max_num_offsets = value;
83        self
84    }
85    /// Sets unknown_tagged_fields to the passed value.
86    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87        self.unknown_tagged_fields = value;
88        self
89    }
90    /// Inserts an entry into unknown_tagged_fields.
91    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92        self.unknown_tagged_fields.insert(key, value);
93        self
94    }
95}
96
#[cfg(feature = "client")]
impl Encodable for ListOffsetsPartition {
    /// Writes this partition entry to `buf` in the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when `max_num_offsets` is not 1 for any version other than 0, when
    /// there are more than `u32::MAX` unknown tagged fields (versions 6+), or when
    /// one of the underlying encoders fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.partition_index)?;
        if version >= 4 {
            types::Int32.encode(buf, &self.current_leader_epoch)?;
        }
        types::Int64.encode(buf, &self.timestamp)?;
        if version == 0 {
            types::Int32.encode(buf, &self.max_num_offsets)?;
        } else if self.max_num_offsets != 1 {
            // The field is only written for version 0; any value other than 1
            // cannot be represented in later versions.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 6 {
            return Ok(());
        }

        // Versions 6 and above end with the tagged-field section.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition_index)?;
        if version >= 4 {
            size += types::Int32.compute_size(&self.current_leader_epoch)?;
        }
        size += types::Int64.compute_size(&self.timestamp)?;
        if version == 0 {
            size += types::Int32.compute_size(&self.max_num_offsets)?;
        } else if self.max_num_offsets != 1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 6 {
            return Ok(size);
        }

        // Versions 6 and above also account for the tagged-field section.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
155
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsPartition {
    /// Reads a partition entry from `buf` using the layout of protocol `version`.
    ///
    /// Fields absent from the given version take fixed values:
    /// `current_leader_epoch` is -1 below version 4 and `max_num_offsets` is 1
    /// for every version other than 0.
    ///
    /// # Errors
    ///
    /// Fails when any underlying decoder or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Field expressions are evaluated in the order written, which is the wire order.
        let mut decoded = Self {
            partition_index: types::Int32.decode(buf)?,
            current_leader_epoch: match version {
                v if v >= 4 => types::Int32.decode(buf)?,
                _ => -1,
            },
            timestamp: types::Int64.decode(buf)?,
            max_num_offsets: match version {
                0 => types::Int32.decode(buf)?,
                _ => 1,
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        if version >= 6 {
            // Tagged-field section: a count, then (tag, size, payload) triples.
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                decoded.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(decoded)
    }
}
190
191impl Default for ListOffsetsPartition {
192    fn default() -> Self {
193        Self {
194            partition_index: 0,
195            current_leader_epoch: -1,
196            timestamp: 0,
197            max_num_offsets: 1,
198            unknown_tagged_fields: BTreeMap::new(),
199        }
200    }
201}
202
203impl Message for ListOffsetsPartition {
204    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
205    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
206}
207
208/// Valid versions: 0-8
209#[non_exhaustive]
210#[derive(Debug, Clone, PartialEq)]
211pub struct ListOffsetsRequest {
212    /// The broker ID of the requester, or -1 if this request is being made by a normal consumer.
213    ///
214    /// Supported API versions: 0-8
215    pub replica_id: super::BrokerId,
216
217    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records
218    ///
219    /// Supported API versions: 2-8
220    pub isolation_level: i8,
221
222    /// Each topic in the request.
223    ///
224    /// Supported API versions: 0-8
225    pub topics: Vec<ListOffsetsTopic>,
226
227    /// Other tagged fields
228    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
229}
230
231impl ListOffsetsRequest {
232    /// Sets `replica_id` to the passed value.
233    ///
234    /// The broker ID of the requester, or -1 if this request is being made by a normal consumer.
235    ///
236    /// Supported API versions: 0-8
237    pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
238        self.replica_id = value;
239        self
240    }
241    /// Sets `isolation_level` to the passed value.
242    ///
243    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records
244    ///
245    /// Supported API versions: 2-8
246    pub fn with_isolation_level(mut self, value: i8) -> Self {
247        self.isolation_level = value;
248        self
249    }
250    /// Sets `topics` to the passed value.
251    ///
252    /// Each topic in the request.
253    ///
254    /// Supported API versions: 0-8
255    pub fn with_topics(mut self, value: Vec<ListOffsetsTopic>) -> Self {
256        self.topics = value;
257        self
258    }
259    /// Sets unknown_tagged_fields to the passed value.
260    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
261        self.unknown_tagged_fields = value;
262        self
263    }
264    /// Inserts an entry into unknown_tagged_fields.
265    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
266        self.unknown_tagged_fields.insert(key, value);
267        self
268    }
269}
270
#[cfg(feature = "client")]
impl Encodable for ListOffsetsRequest {
    /// Writes this request body to `buf` in the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when `isolation_level` is non-zero below version 2, when there are
    /// more than `u32::MAX` unknown tagged fields (versions 6+), or when one of the
    /// underlying encoders (including those of the nested topics) fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.replica_id)?;
        if version >= 2 {
            types::Int8.encode(buf, &self.isolation_level)?;
        } else if self.isolation_level != 0 {
            // The field is not written below version 2; only the value 0 is accepted there.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 6 {
            // Versions below 6 use the non-compact array and have no tagged section.
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
            return Ok(());
        }

        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.replica_id)?;
        if version >= 2 {
            size += types::Int8.compute_size(&self.isolation_level)?;
        } else if self.isolation_level != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 6 {
            size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
            return Ok(size);
        }

        size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
332
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsRequest {
    /// Reads a request body from `buf` using the layout of protocol `version`.
    ///
    /// `isolation_level` is set to 0 when decoding a version below 2, which does
    /// not carry that field.
    ///
    /// # Errors
    ///
    /// Fails when any underlying decoder or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 6;

        // Field expressions are evaluated in the order written, which is the wire order.
        let mut decoded = Self {
            replica_id: types::Int32.decode(buf)?,
            isolation_level: match version {
                v if v >= 2 => types::Int8.decode(buf)?,
                _ => 0,
            },
            topics: if flexible {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                types::Array(types::Struct { version }).decode(buf)?
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        if flexible {
            // Tagged-field section: a count, then (tag, size, payload) triples.
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                decoded.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(decoded)
    }
}
365
366impl Default for ListOffsetsRequest {
367    fn default() -> Self {
368        Self {
369            replica_id: (0).into(),
370            isolation_level: 0,
371            topics: Default::default(),
372            unknown_tagged_fields: BTreeMap::new(),
373        }
374    }
375}
376
377impl Message for ListOffsetsRequest {
378    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
379    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
380}
381
382/// Valid versions: 0-8
383#[non_exhaustive]
384#[derive(Debug, Clone, PartialEq)]
385pub struct ListOffsetsTopic {
386    /// The topic name.
387    ///
388    /// Supported API versions: 0-8
389    pub name: super::TopicName,
390
391    /// Each partition in the request.
392    ///
393    /// Supported API versions: 0-8
394    pub partitions: Vec<ListOffsetsPartition>,
395
396    /// Other tagged fields
397    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
398}
399
400impl ListOffsetsTopic {
401    /// Sets `name` to the passed value.
402    ///
403    /// The topic name.
404    ///
405    /// Supported API versions: 0-8
406    pub fn with_name(mut self, value: super::TopicName) -> Self {
407        self.name = value;
408        self
409    }
410    /// Sets `partitions` to the passed value.
411    ///
412    /// Each partition in the request.
413    ///
414    /// Supported API versions: 0-8
415    pub fn with_partitions(mut self, value: Vec<ListOffsetsPartition>) -> Self {
416        self.partitions = value;
417        self
418    }
419    /// Sets unknown_tagged_fields to the passed value.
420    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
421        self.unknown_tagged_fields = value;
422        self
423    }
424    /// Inserts an entry into unknown_tagged_fields.
425    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
426        self.unknown_tagged_fields.insert(key, value);
427        self
428    }
429}
430
#[cfg(feature = "client")]
impl Encodable for ListOffsetsTopic {
    /// Writes this topic entry to `buf` in the layout of protocol `version`.
    ///
    /// Versions 6 and above use the compact string/array forms followed by a
    /// tagged-field section; earlier versions use the non-compact forms only.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields
    /// (versions 6+) or when one of the underlying encoders (including those of
    /// the nested partitions) fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 6 {
            types::String.encode(buf, &self.name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 6 {
            let mut size = types::String.compute_size(&self.name)?;
            size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(size);
        }

        let mut size = types::CompactString.compute_size(&self.name)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
486
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsTopic {
    /// Reads a topic entry from `buf` using the layout of protocol `version`.
    ///
    /// Versions 6 and above are read with the compact string/array decoders and
    /// a trailing tagged-field section; earlier versions use the non-compact
    /// decoders and leave `unknown_tagged_fields` empty.
    ///
    /// # Errors
    ///
    /// Fails when any underlying decoder or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 6;

        // Field expressions are evaluated in the order written, which is the wire order.
        let mut decoded = Self {
            name: if flexible {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            },
            partitions: if flexible {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                types::Array(types::Struct { version }).decode(buf)?
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        if flexible {
            // Tagged-field section: a count, then (tag, size, payload) triples.
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                decoded.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(decoded)
    }
}
517
518impl Default for ListOffsetsTopic {
519    fn default() -> Self {
520        Self {
521            name: Default::default(),
522            partitions: Default::default(),
523            unknown_tagged_fields: BTreeMap::new(),
524        }
525    }
526}
527
528impl Message for ListOffsetsTopic {
529    const VERSIONS: VersionRange = VersionRange { min: 0, max: 8 };
530    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
531}
532
533impl HeaderVersion for ListOffsetsRequest {
534    fn header_version(version: i16) -> i16 {
535        if version >= 6 {
536            2
537        } else {
538            1
539        }
540    }
541}