kafka_protocol/messages/
list_offsets_request.rs

1//! ListOffsetsRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ListOffsetsRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 1-10
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ListOffsetsPartition {
24    /// The partition index.
25    ///
26    /// Supported API versions: 1-10
27    pub partition_index: i32,
28
29    /// The current leader epoch.
30    ///
31    /// Supported API versions: 4-10
32    pub current_leader_epoch: i32,
33
34    /// The current timestamp.
35    ///
36    /// Supported API versions: 1-10
37    pub timestamp: i64,
38
39    /// Other tagged fields
40    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl ListOffsetsPartition {
44    /// Sets `partition_index` to the passed value.
45    ///
46    /// The partition index.
47    ///
48    /// Supported API versions: 1-10
49    pub fn with_partition_index(mut self, value: i32) -> Self {
50        self.partition_index = value;
51        self
52    }
53    /// Sets `current_leader_epoch` to the passed value.
54    ///
55    /// The current leader epoch.
56    ///
57    /// Supported API versions: 4-10
58    pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
59        self.current_leader_epoch = value;
60        self
61    }
62    /// Sets `timestamp` to the passed value.
63    ///
64    /// The current timestamp.
65    ///
66    /// Supported API versions: 1-10
67    pub fn with_timestamp(mut self, value: i64) -> Self {
68        self.timestamp = value;
69        self
70    }
71    /// Sets unknown_tagged_fields to the passed value.
72    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73        self.unknown_tagged_fields = value;
74        self
75    }
76    /// Inserts an entry into unknown_tagged_fields.
77    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78        self.unknown_tagged_fields.insert(key, value);
79        self
80    }
81}
82
83#[cfg(feature = "client")]
84impl Encodable for ListOffsetsPartition {
85    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
86        if version < 1 || version > 10 {
87            bail!("specified version not supported by this message type");
88        }
89        types::Int32.encode(buf, &self.partition_index)?;
90        if version >= 4 {
91            types::Int32.encode(buf, &self.current_leader_epoch)?;
92        }
93        types::Int64.encode(buf, &self.timestamp)?;
94        if version >= 6 {
95            let num_tagged_fields = self.unknown_tagged_fields.len();
96            if num_tagged_fields > std::u32::MAX as usize {
97                bail!(
98                    "Too many tagged fields to encode ({} fields)",
99                    num_tagged_fields
100                );
101            }
102            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
103
104            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
105        }
106        Ok(())
107    }
108    fn compute_size(&self, version: i16) -> Result<usize> {
109        let mut total_size = 0;
110        total_size += types::Int32.compute_size(&self.partition_index)?;
111        if version >= 4 {
112            total_size += types::Int32.compute_size(&self.current_leader_epoch)?;
113        }
114        total_size += types::Int64.compute_size(&self.timestamp)?;
115        if version >= 6 {
116            let num_tagged_fields = self.unknown_tagged_fields.len();
117            if num_tagged_fields > std::u32::MAX as usize {
118                bail!(
119                    "Too many tagged fields to encode ({} fields)",
120                    num_tagged_fields
121                );
122            }
123            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
124
125            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
126        }
127        Ok(total_size)
128    }
129}
130
131#[cfg(feature = "broker")]
132impl Decodable for ListOffsetsPartition {
133    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
134        if version < 1 || version > 10 {
135            bail!("specified version not supported by this message type");
136        }
137        let partition_index = types::Int32.decode(buf)?;
138        let current_leader_epoch = if version >= 4 {
139            types::Int32.decode(buf)?
140        } else {
141            -1
142        };
143        let timestamp = types::Int64.decode(buf)?;
144        let mut unknown_tagged_fields = BTreeMap::new();
145        if version >= 6 {
146            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
147            for _ in 0..num_tagged_fields {
148                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
149                let size: u32 = types::UnsignedVarInt.decode(buf)?;
150                let unknown_value = buf.try_get_bytes(size as usize)?;
151                unknown_tagged_fields.insert(tag as i32, unknown_value);
152            }
153        }
154        Ok(Self {
155            partition_index,
156            current_leader_epoch,
157            timestamp,
158            unknown_tagged_fields,
159        })
160    }
161}
162
163impl Default for ListOffsetsPartition {
164    fn default() -> Self {
165        Self {
166            partition_index: 0,
167            current_leader_epoch: -1,
168            timestamp: 0,
169            unknown_tagged_fields: BTreeMap::new(),
170        }
171    }
172}
173
174impl Message for ListOffsetsPartition {
175    const VERSIONS: VersionRange = VersionRange { min: 1, max: 10 };
176    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
177}
178
179/// Valid versions: 1-10
180#[non_exhaustive]
181#[derive(Debug, Clone, PartialEq)]
182pub struct ListOffsetsRequest {
183    /// The broker ID of the requester, or -1 if this request is being made by a normal consumer.
184    ///
185    /// Supported API versions: 1-10
186    pub replica_id: super::BrokerId,
187
188    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records.
189    ///
190    /// Supported API versions: 2-10
191    pub isolation_level: i8,
192
193    /// Each topic in the request.
194    ///
195    /// Supported API versions: 1-10
196    pub topics: Vec<ListOffsetsTopic>,
197
198    /// The timeout to await a response in milliseconds for requests that require reading from remote storage for topics enabled with tiered storage.
199    ///
200    /// Supported API versions: 10
201    pub timeout_ms: i32,
202
203    /// Other tagged fields
204    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
205}
206
207impl ListOffsetsRequest {
208    /// Sets `replica_id` to the passed value.
209    ///
210    /// The broker ID of the requester, or -1 if this request is being made by a normal consumer.
211    ///
212    /// Supported API versions: 1-10
213    pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
214        self.replica_id = value;
215        self
216    }
217    /// Sets `isolation_level` to the passed value.
218    ///
219    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records.
220    ///
221    /// Supported API versions: 2-10
222    pub fn with_isolation_level(mut self, value: i8) -> Self {
223        self.isolation_level = value;
224        self
225    }
226    /// Sets `topics` to the passed value.
227    ///
228    /// Each topic in the request.
229    ///
230    /// Supported API versions: 1-10
231    pub fn with_topics(mut self, value: Vec<ListOffsetsTopic>) -> Self {
232        self.topics = value;
233        self
234    }
235    /// Sets `timeout_ms` to the passed value.
236    ///
237    /// The timeout to await a response in milliseconds for requests that require reading from remote storage for topics enabled with tiered storage.
238    ///
239    /// Supported API versions: 10
240    pub fn with_timeout_ms(mut self, value: i32) -> Self {
241        self.timeout_ms = value;
242        self
243    }
244    /// Sets unknown_tagged_fields to the passed value.
245    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
246        self.unknown_tagged_fields = value;
247        self
248    }
249    /// Inserts an entry into unknown_tagged_fields.
250    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
251        self.unknown_tagged_fields.insert(key, value);
252        self
253    }
254}
255
256#[cfg(feature = "client")]
257impl Encodable for ListOffsetsRequest {
258    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
259        if version < 1 || version > 10 {
260            bail!("specified version not supported by this message type");
261        }
262        types::Int32.encode(buf, &self.replica_id)?;
263        if version >= 2 {
264            types::Int8.encode(buf, &self.isolation_level)?;
265        } else {
266            if self.isolation_level != 0 {
267                bail!("A field is set that is not available on the selected protocol version");
268            }
269        }
270        if version >= 6 {
271            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
272        } else {
273            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
274        }
275        if version >= 10 {
276            types::Int32.encode(buf, &self.timeout_ms)?;
277        }
278        if version >= 6 {
279            let num_tagged_fields = self.unknown_tagged_fields.len();
280            if num_tagged_fields > std::u32::MAX as usize {
281                bail!(
282                    "Too many tagged fields to encode ({} fields)",
283                    num_tagged_fields
284                );
285            }
286            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
287
288            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
289        }
290        Ok(())
291    }
292    fn compute_size(&self, version: i16) -> Result<usize> {
293        let mut total_size = 0;
294        total_size += types::Int32.compute_size(&self.replica_id)?;
295        if version >= 2 {
296            total_size += types::Int8.compute_size(&self.isolation_level)?;
297        } else {
298            if self.isolation_level != 0 {
299                bail!("A field is set that is not available on the selected protocol version");
300            }
301        }
302        if version >= 6 {
303            total_size +=
304                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
305        } else {
306            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
307        }
308        if version >= 10 {
309            total_size += types::Int32.compute_size(&self.timeout_ms)?;
310        }
311        if version >= 6 {
312            let num_tagged_fields = self.unknown_tagged_fields.len();
313            if num_tagged_fields > std::u32::MAX as usize {
314                bail!(
315                    "Too many tagged fields to encode ({} fields)",
316                    num_tagged_fields
317                );
318            }
319            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
320
321            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
322        }
323        Ok(total_size)
324    }
325}
326
327#[cfg(feature = "broker")]
328impl Decodable for ListOffsetsRequest {
329    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
330        if version < 1 || version > 10 {
331            bail!("specified version not supported by this message type");
332        }
333        let replica_id = types::Int32.decode(buf)?;
334        let isolation_level = if version >= 2 {
335            types::Int8.decode(buf)?
336        } else {
337            0
338        };
339        let topics = if version >= 6 {
340            types::CompactArray(types::Struct { version }).decode(buf)?
341        } else {
342            types::Array(types::Struct { version }).decode(buf)?
343        };
344        let timeout_ms = if version >= 10 {
345            types::Int32.decode(buf)?
346        } else {
347            0
348        };
349        let mut unknown_tagged_fields = BTreeMap::new();
350        if version >= 6 {
351            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
352            for _ in 0..num_tagged_fields {
353                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
354                let size: u32 = types::UnsignedVarInt.decode(buf)?;
355                let unknown_value = buf.try_get_bytes(size as usize)?;
356                unknown_tagged_fields.insert(tag as i32, unknown_value);
357            }
358        }
359        Ok(Self {
360            replica_id,
361            isolation_level,
362            topics,
363            timeout_ms,
364            unknown_tagged_fields,
365        })
366    }
367}
368
369impl Default for ListOffsetsRequest {
370    fn default() -> Self {
371        Self {
372            replica_id: (0).into(),
373            isolation_level: 0,
374            topics: Default::default(),
375            timeout_ms: 0,
376            unknown_tagged_fields: BTreeMap::new(),
377        }
378    }
379}
380
381impl Message for ListOffsetsRequest {
382    const VERSIONS: VersionRange = VersionRange { min: 1, max: 10 };
383    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
384}
385
386/// Valid versions: 1-10
387#[non_exhaustive]
388#[derive(Debug, Clone, PartialEq)]
389pub struct ListOffsetsTopic {
390    /// The topic name.
391    ///
392    /// Supported API versions: 1-10
393    pub name: super::TopicName,
394
395    /// Each partition in the request.
396    ///
397    /// Supported API versions: 1-10
398    pub partitions: Vec<ListOffsetsPartition>,
399
400    /// Other tagged fields
401    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
402}
403
404impl ListOffsetsTopic {
405    /// Sets `name` to the passed value.
406    ///
407    /// The topic name.
408    ///
409    /// Supported API versions: 1-10
410    pub fn with_name(mut self, value: super::TopicName) -> Self {
411        self.name = value;
412        self
413    }
414    /// Sets `partitions` to the passed value.
415    ///
416    /// Each partition in the request.
417    ///
418    /// Supported API versions: 1-10
419    pub fn with_partitions(mut self, value: Vec<ListOffsetsPartition>) -> Self {
420        self.partitions = value;
421        self
422    }
423    /// Sets unknown_tagged_fields to the passed value.
424    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
425        self.unknown_tagged_fields = value;
426        self
427    }
428    /// Inserts an entry into unknown_tagged_fields.
429    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
430        self.unknown_tagged_fields.insert(key, value);
431        self
432    }
433}
434
435#[cfg(feature = "client")]
436impl Encodable for ListOffsetsTopic {
437    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
438        if version < 1 || version > 10 {
439            bail!("specified version not supported by this message type");
440        }
441        if version >= 6 {
442            types::CompactString.encode(buf, &self.name)?;
443        } else {
444            types::String.encode(buf, &self.name)?;
445        }
446        if version >= 6 {
447            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
448        } else {
449            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
450        }
451        if version >= 6 {
452            let num_tagged_fields = self.unknown_tagged_fields.len();
453            if num_tagged_fields > std::u32::MAX as usize {
454                bail!(
455                    "Too many tagged fields to encode ({} fields)",
456                    num_tagged_fields
457                );
458            }
459            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
460
461            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
462        }
463        Ok(())
464    }
465    fn compute_size(&self, version: i16) -> Result<usize> {
466        let mut total_size = 0;
467        if version >= 6 {
468            total_size += types::CompactString.compute_size(&self.name)?;
469        } else {
470            total_size += types::String.compute_size(&self.name)?;
471        }
472        if version >= 6 {
473            total_size +=
474                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
475        } else {
476            total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
477        }
478        if version >= 6 {
479            let num_tagged_fields = self.unknown_tagged_fields.len();
480            if num_tagged_fields > std::u32::MAX as usize {
481                bail!(
482                    "Too many tagged fields to encode ({} fields)",
483                    num_tagged_fields
484                );
485            }
486            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
487
488            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
489        }
490        Ok(total_size)
491    }
492}
493
494#[cfg(feature = "broker")]
495impl Decodable for ListOffsetsTopic {
496    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
497        if version < 1 || version > 10 {
498            bail!("specified version not supported by this message type");
499        }
500        let name = if version >= 6 {
501            types::CompactString.decode(buf)?
502        } else {
503            types::String.decode(buf)?
504        };
505        let partitions = if version >= 6 {
506            types::CompactArray(types::Struct { version }).decode(buf)?
507        } else {
508            types::Array(types::Struct { version }).decode(buf)?
509        };
510        let mut unknown_tagged_fields = BTreeMap::new();
511        if version >= 6 {
512            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
513            for _ in 0..num_tagged_fields {
514                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
515                let size: u32 = types::UnsignedVarInt.decode(buf)?;
516                let unknown_value = buf.try_get_bytes(size as usize)?;
517                unknown_tagged_fields.insert(tag as i32, unknown_value);
518            }
519        }
520        Ok(Self {
521            name,
522            partitions,
523            unknown_tagged_fields,
524        })
525    }
526}
527
528impl Default for ListOffsetsTopic {
529    fn default() -> Self {
530        Self {
531            name: Default::default(),
532            partitions: Default::default(),
533            unknown_tagged_fields: BTreeMap::new(),
534        }
535    }
536}
537
538impl Message for ListOffsetsTopic {
539    const VERSIONS: VersionRange = VersionRange { min: 1, max: 10 };
540    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
541}
542
543impl HeaderVersion for ListOffsetsRequest {
544    fn header_version(version: i16) -> i16 {
545        if version >= 6 {
546            2
547        } else {
548            1
549        }
550    }
551}