kafka_protocol/messages/
list_offsets_request.rs

//! ListOffsetsRequest
//!
//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ListOffsetsRequest.json).
// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-9
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ListOffsetsPartition {
24    /// The partition index.
25    ///
26    /// Supported API versions: 0-9
27    pub partition_index: i32,
28
29    /// The current leader epoch.
30    ///
31    /// Supported API versions: 4-9
32    pub current_leader_epoch: i32,
33
34    /// The current timestamp.
35    ///
36    /// Supported API versions: 0-9
37    pub timestamp: i64,
38
39    /// The maximum number of offsets to report.
40    ///
41    /// Supported API versions: 0
42    pub max_num_offsets: i32,
43
44    /// Other tagged fields
45    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl ListOffsetsPartition {
49    /// Sets `partition_index` to the passed value.
50    ///
51    /// The partition index.
52    ///
53    /// Supported API versions: 0-9
54    pub fn with_partition_index(mut self, value: i32) -> Self {
55        self.partition_index = value;
56        self
57    }
58    /// Sets `current_leader_epoch` to the passed value.
59    ///
60    /// The current leader epoch.
61    ///
62    /// Supported API versions: 4-9
63    pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
64        self.current_leader_epoch = value;
65        self
66    }
67    /// Sets `timestamp` to the passed value.
68    ///
69    /// The current timestamp.
70    ///
71    /// Supported API versions: 0-9
72    pub fn with_timestamp(mut self, value: i64) -> Self {
73        self.timestamp = value;
74        self
75    }
76    /// Sets `max_num_offsets` to the passed value.
77    ///
78    /// The maximum number of offsets to report.
79    ///
80    /// Supported API versions: 0
81    pub fn with_max_num_offsets(mut self, value: i32) -> Self {
82        self.max_num_offsets = value;
83        self
84    }
85    /// Sets unknown_tagged_fields to the passed value.
86    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87        self.unknown_tagged_fields = value;
88        self
89    }
90    /// Inserts an entry into unknown_tagged_fields.
91    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92        self.unknown_tagged_fields.insert(key, value);
93        self
94    }
95}
96
#[cfg(feature = "client")]
impl Encodable for ListOffsetsPartition {
    /// Writes this partition entry to `buf` using the layout of `version`.
    ///
    /// Field order on the wire: `partition_index`, `current_leader_epoch` (version 4+),
    /// `timestamp`, `max_num_offsets` (version 0 only), tagged fields (version 6+).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-9, when `max_num_offsets` differs from `1` on a
    /// version other than 0, when there are more than `u32::MAX` tagged fields, or when
    /// any underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.partition_index)?;
        if version >= 4 {
            types::Int32.encode(buf, &self.current_leader_epoch)?;
        }
        types::Int64.encode(buf, &self.timestamp)?;

        if version == 0 {
            types::Int32.encode(buf, &self.max_num_offsets)?;
        } else if self.max_num_offsets != 1 {
            // The field cannot be represented here, so only the value `1` is accepted.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Performs the same field-availability and tagged-field-count checks as `encode`,
    /// but does not validate that `version` lies within 0-9.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition_index)?;
        if version >= 4 {
            size += types::Int32.compute_size(&self.current_leader_epoch)?;
        }
        size += types::Int64.compute_size(&self.timestamp)?;

        if version == 0 {
            size += types::Int32.compute_size(&self.max_num_offsets)?;
        } else if self.max_num_offsets != 1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
158
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsPartition {
    /// Reads a partition entry from `buf` using the layout of `version`.
    ///
    /// Fields absent from the selected version take fixed fallbacks:
    /// `current_leader_epoch` is `-1` below version 4 and `max_num_offsets` is `1`
    /// for every version other than 0. Tagged fields are only read from version 6 on.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-9 or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Reads must stay in wire order.
        let partition_index = types::Int32.decode(buf)?;
        let current_leader_epoch = if version < 4 {
            -1
        } else {
            types::Int32.decode(buf)?
        };
        let timestamp = types::Int64.decode(buf)?;
        let max_num_offsets = if version != 0 {
            1
        } else {
            types::Int32.decode(buf)?
        };

        let unknown_tagged_fields = if version >= 6 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            // Each entry is `tag`, `size`, then `size` raw bytes; the first failure aborts.
            (0..field_count)
                .map(|_| -> Result<(i32, Bytes)> {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    Ok((tag as i32, payload))
                })
                .collect::<Result<BTreeMap<i32, Bytes>>>()?
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            partition_index,
            current_leader_epoch,
            timestamp,
            max_num_offsets,
            unknown_tagged_fields,
        })
    }
}
196
197impl Default for ListOffsetsPartition {
198    fn default() -> Self {
199        Self {
200            partition_index: 0,
201            current_leader_epoch: -1,
202            timestamp: 0,
203            max_num_offsets: 1,
204            unknown_tagged_fields: BTreeMap::new(),
205        }
206    }
207}
208
209impl Message for ListOffsetsPartition {
210    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
211    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
212}
213
214/// Valid versions: 0-9
215#[non_exhaustive]
216#[derive(Debug, Clone, PartialEq)]
217pub struct ListOffsetsRequest {
218    /// The broker ID of the requester, or -1 if this request is being made by a normal consumer.
219    ///
220    /// Supported API versions: 0-9
221    pub replica_id: super::BrokerId,
222
223    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records
224    ///
225    /// Supported API versions: 2-9
226    pub isolation_level: i8,
227
228    /// Each topic in the request.
229    ///
230    /// Supported API versions: 0-9
231    pub topics: Vec<ListOffsetsTopic>,
232
233    /// Other tagged fields
234    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
235}
236
237impl ListOffsetsRequest {
238    /// Sets `replica_id` to the passed value.
239    ///
240    /// The broker ID of the requester, or -1 if this request is being made by a normal consumer.
241    ///
242    /// Supported API versions: 0-9
243    pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
244        self.replica_id = value;
245        self
246    }
247    /// Sets `isolation_level` to the passed value.
248    ///
249    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records
250    ///
251    /// Supported API versions: 2-9
252    pub fn with_isolation_level(mut self, value: i8) -> Self {
253        self.isolation_level = value;
254        self
255    }
256    /// Sets `topics` to the passed value.
257    ///
258    /// Each topic in the request.
259    ///
260    /// Supported API versions: 0-9
261    pub fn with_topics(mut self, value: Vec<ListOffsetsTopic>) -> Self {
262        self.topics = value;
263        self
264    }
265    /// Sets unknown_tagged_fields to the passed value.
266    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
267        self.unknown_tagged_fields = value;
268        self
269    }
270    /// Inserts an entry into unknown_tagged_fields.
271    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
272        self.unknown_tagged_fields.insert(key, value);
273        self
274    }
275}
276
#[cfg(feature = "client")]
impl Encodable for ListOffsetsRequest {
    /// Writes this request to `buf` using the layout of `version`.
    ///
    /// Field order on the wire: `replica_id`, `isolation_level` (version 2+), `topics`,
    /// tagged fields (version 6+). From version 6 on, `topics` uses the compact array
    /// encoding; earlier versions use the plain array encoding.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-9, when `isolation_level` is non-zero below
    /// version 2, when there are more than `u32::MAX` tagged fields, or when any
    /// underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.replica_id)?;

        if version >= 2 {
            types::Int8.encode(buf, &self.isolation_level)?;
        } else if self.isolation_level != 0 {
            // The field cannot be represented here, so only the value `0` is accepted.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Performs the same field-availability and tagged-field-count checks as `encode`,
    /// but does not validate that `version` lies within 0-9.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.replica_id)?;

        if version >= 2 {
            size += types::Int8.compute_size(&self.isolation_level)?;
        } else if self.isolation_level != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 6 {
            size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }

        Ok(size)
    }
}
341
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsRequest {
    /// Reads a request from `buf` using the layout of `version`.
    ///
    /// `isolation_level` falls back to `0` below version 2. From version 6 on, `topics`
    /// is read with the compact array encoding and tagged fields are read afterwards.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-9 or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 6 switches to compact encodings and adds the tagged-field section.
        let compact = version >= 6;

        // Reads must stay in wire order.
        let replica_id = types::Int32.decode(buf)?;
        let isolation_level = if version < 2 {
            0
        } else {
            types::Int8.decode(buf)?
        };
        let topics = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let unknown_tagged_fields = if compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            // Each entry is `tag`, `size`, then `size` raw bytes; the first failure aborts.
            (0..field_count)
                .map(|_| -> Result<(i32, Bytes)> {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    Ok((tag as i32, payload))
                })
                .collect::<Result<BTreeMap<i32, Bytes>>>()?
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            replica_id,
            isolation_level,
            topics,
            unknown_tagged_fields,
        })
    }
}
377
378impl Default for ListOffsetsRequest {
379    fn default() -> Self {
380        Self {
381            replica_id: (0).into(),
382            isolation_level: 0,
383            topics: Default::default(),
384            unknown_tagged_fields: BTreeMap::new(),
385        }
386    }
387}
388
389impl Message for ListOffsetsRequest {
390    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
391    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
392}
393
394/// Valid versions: 0-9
395#[non_exhaustive]
396#[derive(Debug, Clone, PartialEq)]
397pub struct ListOffsetsTopic {
398    /// The topic name.
399    ///
400    /// Supported API versions: 0-9
401    pub name: super::TopicName,
402
403    /// Each partition in the request.
404    ///
405    /// Supported API versions: 0-9
406    pub partitions: Vec<ListOffsetsPartition>,
407
408    /// Other tagged fields
409    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
410}
411
412impl ListOffsetsTopic {
413    /// Sets `name` to the passed value.
414    ///
415    /// The topic name.
416    ///
417    /// Supported API versions: 0-9
418    pub fn with_name(mut self, value: super::TopicName) -> Self {
419        self.name = value;
420        self
421    }
422    /// Sets `partitions` to the passed value.
423    ///
424    /// Each partition in the request.
425    ///
426    /// Supported API versions: 0-9
427    pub fn with_partitions(mut self, value: Vec<ListOffsetsPartition>) -> Self {
428        self.partitions = value;
429        self
430    }
431    /// Sets unknown_tagged_fields to the passed value.
432    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
433        self.unknown_tagged_fields = value;
434        self
435    }
436    /// Inserts an entry into unknown_tagged_fields.
437    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
438        self.unknown_tagged_fields.insert(key, value);
439        self
440    }
441}
442
#[cfg(feature = "client")]
impl Encodable for ListOffsetsTopic {
    /// Writes this topic entry to `buf` using the layout of `version`.
    ///
    /// Field order on the wire: `name`, `partitions`, tagged fields (version 6+). From
    /// version 6 on the compact string/array encodings are used; earlier versions use
    /// the plain string/array encodings.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-9, when there are more than `u32::MAX` tagged
    /// fields, or when any underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 6 {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::String.encode(buf, &self.name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Performs the same tagged-field-count check as `encode`, but does not validate
    /// that `version` lies within 0-9.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 6 {
            size += types::CompactString.compute_size(&self.name)?;
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::String.compute_size(&self.name)?;
            size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
        }

        Ok(size)
    }
}
501
#[cfg(feature = "broker")]
impl Decodable for ListOffsetsTopic {
    /// Reads a topic entry from `buf` using the layout of `version`.
    ///
    /// From version 6 on, `name` and `partitions` are read with the compact encodings
    /// and tagged fields are read afterwards; earlier versions use the plain encodings
    /// and carry no tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-9 or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=9).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 6 switches to compact encodings and adds the tagged-field section.
        let compact = version >= 6;

        // Reads must stay in wire order.
        let name = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let partitions = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let unknown_tagged_fields = if compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            // Each entry is `tag`, `size`, then `size` raw bytes; the first failure aborts.
            (0..field_count)
                .map(|_| -> Result<(i32, Bytes)> {
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    Ok((tag as i32, payload))
                })
                .collect::<Result<BTreeMap<i32, Bytes>>>()?
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
535
536impl Default for ListOffsetsTopic {
537    fn default() -> Self {
538        Self {
539            name: Default::default(),
540            partitions: Default::default(),
541            unknown_tagged_fields: BTreeMap::new(),
542        }
543    }
544}
545
546impl Message for ListOffsetsTopic {
547    const VERSIONS: VersionRange = VersionRange { min: 0, max: 9 };
548    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
549}
550
551impl HeaderVersion for ListOffsetsRequest {
552    fn header_version(version: i16) -> i16 {
553        if version >= 6 {
554            2
555        } else {
556            1
557        }
558    }
559}