rskafka/protocol/messages/
list_offsets.rs

1//! `ListOffsets` request and response.
2//!
3//! # References
4//! - [KIP-79](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090)
5//! - [KIP-98](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging)
6use std::io::{Read, Write};
7
8use crate::protocol::{
9    api_key::ApiKey,
10    api_version::{ApiVersion, ApiVersionRange},
11    error::Error as ApiError,
12    messages::{read_versioned_array, write_versioned_array, IsolationLevel},
13    primitives::{Array, Int16, Int32, Int64, Int8, String_},
14    traits::{ReadType, WriteType},
15};
16
17use super::{
18    ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
19};
20
21#[derive(Debug)]
22#[allow(missing_copy_implementations)]
23pub struct ListOffsetsRequestPartition {
24    /// The partition index.
25    pub partition_index: Int32,
26
27    /// The current timestamp.
28    ///
29    /// Depending on the version this will return:
30    ///
31    /// - **version 0:** `max_num_offsets` offsets that are smaller/equal than this timestamp.
32    /// - **version 1 and later:** return timestamp and offset of the first/message greater/equal than this timestamp
33    ///
34    /// Per [KIP-79] this can have the following special values:
35    ///
36    /// - `-1`: latest offset
37    /// - `-2`: earlist offset
38    ///
39    /// [KIP-79]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090
40    pub timestamp: Int64,
41
42    /// The maximum number of offsets to report.
43    ///
44    /// Defaults to 1.
45    ///
46    /// Removed in version 1.
47    pub max_num_offsets: Option<Int32>,
48}
49
50impl<W> WriteVersionedType<W> for ListOffsetsRequestPartition
51where
52    W: Write,
53{
54    fn write_versioned(
55        &self,
56        writer: &mut W,
57        version: ApiVersion,
58    ) -> Result<(), WriteVersionedError> {
59        let v = version.0 .0;
60        assert!(v <= 3);
61
62        self.partition_index.write(writer)?;
63        self.timestamp.write(writer)?;
64
65        if v < 1 {
66            // Only fetch 1 offset by default.
67            self.max_num_offsets.unwrap_or(Int32(1)).write(writer)?;
68        }
69
70        Ok(())
71    }
72}
73
74#[derive(Debug)]
75pub struct ListOffsetsRequestTopic {
76    /// The topic name.
77    pub name: String_,
78
79    /// Each partition in the request.
80    ///
81    /// Note: A partition may only appear once within the request.
82    pub partitions: Vec<ListOffsetsRequestPartition>,
83}
84
85impl<W> WriteVersionedType<W> for ListOffsetsRequestTopic
86where
87    W: Write,
88{
89    fn write_versioned(
90        &self,
91        writer: &mut W,
92        version: ApiVersion,
93    ) -> Result<(), WriteVersionedError> {
94        let v = version.0 .0;
95        assert!(v <= 3);
96
97        self.name.write(writer)?;
98        write_versioned_array(writer, version, Some(&self.partitions))?;
99
100        Ok(())
101    }
102}
103
104#[derive(Debug)]
105pub struct ListOffsetsRequest {
106    /// The broker ID of the requestor, or -1 if this request is being made by a normal consumer.
107    pub replica_id: Int32,
108
109    /// This setting controls the visibility of transactional records.
110    ///
111    /// Using `READ_UNCOMMITTED` (`isolation_level = 0`) makes all records visible. With `READ_COMMITTED`
112    /// (`isolation_level = 1`), non-transactional and `COMMITTED` transactional records are visible. To be more
113    /// concrete, `READ_COMMITTED` returns all data from offsets smaller than the current LSO (last stable offset), and
114    /// enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard
115    /// `ABORTED` transactional records.
116    ///
117    /// As per [KIP-98] the default is `READ_UNCOMMITTED`.
118    ///
119    /// Added in version 2.
120    ///
121    /// [KIP-98]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
122    pub isolation_level: Option<IsolationLevel>,
123
124    /// Each topic in the request.
125    ///
126    /// Note: A topic may only appear once within the request.
127    pub topics: Vec<ListOffsetsRequestTopic>,
128}
129
130impl<W> WriteVersionedType<W> for ListOffsetsRequest
131where
132    W: Write,
133{
134    fn write_versioned(
135        &self,
136        writer: &mut W,
137        version: ApiVersion,
138    ) -> Result<(), WriteVersionedError> {
139        let v = version.0 .0;
140        assert!(v <= 3);
141
142        self.replica_id.write(writer)?;
143
144        if v >= 2 {
145            // The default is `READ_UNCOMMITTED`.
146            let level: Int8 = self.isolation_level.unwrap_or_default().into();
147            level.write(writer)?;
148        }
149
150        write_versioned_array(writer, version, Some(&self.topics))?;
151
152        Ok(())
153    }
154}
155
156impl RequestBody for ListOffsetsRequest {
157    type ResponseBody = ListOffsetsResponse;
158
159    const API_KEY: ApiKey = ApiKey::ListOffsets;
160
161    /// At the time of writing this is the same subset supported by rdkafka
162    const API_VERSION_RANGE: ApiVersionRange =
163        ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(3)));
164
165    const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(6));
166}
167
168#[derive(Debug)]
169pub struct ListOffsetsResponsePartition {
170    /// The partition index.
171    pub partition_index: Int32,
172
173    /// The partition error code, or 0 if there was no error.
174    pub error_code: Option<ApiError>,
175
176    /// The result offsets.
177    ///
178    /// Removed in version 1.
179    pub old_style_offsets: Option<Array<Int64>>,
180
181    /// The timestamp associated with the returned offset.
182    ///
183    /// Added in version 1.
184    pub timestamp: Option<Int64>,
185
186    /// The returned offset.
187    ///
188    /// Added in version 1.
189    pub offset: Option<Int64>,
190}
191
192impl<R> ReadVersionedType<R> for ListOffsetsResponsePartition
193where
194    R: Read,
195{
196    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
197        let v = version.0 .0;
198        assert!(v <= 3);
199
200        Ok(Self {
201            partition_index: Int32::read(reader)?,
202            error_code: ApiError::new(Int16::read(reader)?.0),
203            old_style_offsets: (v < 1).then(|| Array::read(reader)).transpose()?,
204            timestamp: (v >= 1).then(|| Int64::read(reader)).transpose()?,
205            offset: (v >= 1).then(|| Int64::read(reader)).transpose()?,
206        })
207    }
208}
209
210#[derive(Debug)]
211pub struct ListOffsetsResponseTopic {
212    /// The topic name.
213    pub name: String_,
214
215    /// Each partition in the response.
216    pub partitions: Vec<ListOffsetsResponsePartition>,
217}
218
219impl<R> ReadVersionedType<R> for ListOffsetsResponseTopic
220where
221    R: Read,
222{
223    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
224        let v = version.0 .0;
225        assert!(v <= 3);
226
227        Ok(Self {
228            name: String_::read(reader)?,
229            partitions: read_versioned_array(reader, version)?.unwrap_or_default(),
230        })
231    }
232}
233
234#[derive(Debug)]
235pub struct ListOffsetsResponse {
236    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
237    ///
238    /// Added in version 2.
239    pub throttle_time_ms: Option<Int32>,
240
241    /// Each topic in the response.
242    pub topics: Vec<ListOffsetsResponseTopic>,
243}
244
245impl<R> ReadVersionedType<R> for ListOffsetsResponse
246where
247    R: Read,
248{
249    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
250        let v = version.0 .0;
251        assert!(v <= 3);
252
253        Ok(Self {
254            throttle_time_ms: (v >= 2).then(|| Int32::read(reader)).transpose()?,
255            topics: read_versioned_array(reader, version)?.unwrap_or_default(),
256        })
257    }
258}