rskafka/protocol/messages/
fetch.rs

1use std::io::{Read, Write};
2
3use crate::protocol::{
4    api_key::ApiKey,
5    api_version::{ApiVersion, ApiVersionRange},
6    error::Error as ApiError,
7    messages::{read_versioned_array, write_versioned_array, IsolationLevel},
8    primitives::{Int16, Int32, Int64, Int8, Records, String_},
9    traits::{ReadType, WriteType},
10};
11
12use super::{
13    ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
14};
15
16#[derive(Debug)]
17#[allow(missing_copy_implementations)]
18pub struct FetchRequestPartition {
19    /// The partition index.
20    pub partition: Int32,
21
22    /// The message offset.
23    pub fetch_offset: Int64,
24
25    /// The maximum bytes to fetch from this partition.
26    ///
27    /// See KIP-74 for cases where this limit may not be honored.
28    pub partition_max_bytes: Int32,
29}
30
31impl<W> WriteVersionedType<W> for FetchRequestPartition
32where
33    W: Write,
34{
35    fn write_versioned(
36        &self,
37        writer: &mut W,
38        version: ApiVersion,
39    ) -> Result<(), WriteVersionedError> {
40        let v = version.0 .0;
41        assert!(v <= 4);
42
43        self.partition.write(writer)?;
44        self.fetch_offset.write(writer)?;
45        self.partition_max_bytes.write(writer)?;
46
47        Ok(())
48    }
49}
50
51#[derive(Debug)]
52pub struct FetchRequestTopic {
53    /// The name of the topic to fetch.
54    pub topic: String_,
55
56    /// The partitions to fetch.
57    pub partitions: Vec<FetchRequestPartition>,
58}
59
60impl<W> WriteVersionedType<W> for FetchRequestTopic
61where
62    W: Write,
63{
64    fn write_versioned(
65        &self,
66        writer: &mut W,
67        version: ApiVersion,
68    ) -> Result<(), WriteVersionedError> {
69        let v = version.0 .0;
70        assert!(v <= 4);
71
72        self.topic.write(writer)?;
73        write_versioned_array(writer, version, Some(&self.partitions))?;
74
75        Ok(())
76    }
77}
78
79#[derive(Debug)]
80pub struct FetchRequest {
81    /// The broker ID of the follower, of -1 if this request is from a consumer.
82    pub replica_id: Int32,
83
84    /// The maximum time in milliseconds to wait for the response.
85    pub max_wait_ms: Int32,
86
87    /// The minimum bytes to accumulate in the response.
88    pub min_bytes: Int32,
89
90    /// The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored.
91    ///
92    /// Defaults to "no limit / max".
93    ///
94    /// Added in version 3.
95    pub max_bytes: Option<Int32>,
96
97    /// This setting controls the visibility of transactional records.
98    ///
99    /// Using `READ_UNCOMMITTED` (`isolation_level = 0`) makes all records visible. With `READ_COMMITTED`
100    /// (`isolation_level = 1`), non-transactional and `COMMITTED` transactional records are visible. To be more
101    /// concrete, `READ_COMMITTED` returns all data from offsets smaller than the current LSO (last stable offset), and
102    /// enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard
103    /// `ABORTED` transactional records.
104    ///
105    /// As per [KIP-98] the default is `READ_UNCOMMITTED`.
106    ///
107    /// Added in version 4.
108    ///
109    /// [KIP-98]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
110    pub isolation_level: Option<IsolationLevel>,
111
112    /// The topics to fetch.
113    pub topics: Vec<FetchRequestTopic>,
114}
115
116impl<W> WriteVersionedType<W> for FetchRequest
117where
118    W: Write,
119{
120    fn write_versioned(
121        &self,
122        writer: &mut W,
123        version: ApiVersion,
124    ) -> Result<(), WriteVersionedError> {
125        let v = version.0 .0;
126        assert!(v <= 4);
127
128        self.replica_id.write(writer)?;
129        self.max_wait_ms.write(writer)?;
130        self.min_bytes.write(writer)?;
131
132        if v >= 3 {
133            // defaults to "no limit / max".
134            self.max_bytes.unwrap_or(Int32(i32::MAX)).write(writer)?;
135        }
136
137        if v >= 4 {
138            // The default is `READ_UNCOMMITTED`.
139            let level: Int8 = self.isolation_level.unwrap_or_default().into();
140            level.write(writer)?;
141        }
142
143        write_versioned_array(writer, version, Some(&self.topics))?;
144
145        Ok(())
146    }
147}
148
149impl RequestBody for FetchRequest {
150    type ResponseBody = FetchResponse;
151
152    const API_KEY: ApiKey = ApiKey::Fetch;
153
154    /// That's enough for now.
155    ///
156    /// Note that we do not support fetch request prior to version 4, since this is the version when message version 2
157    /// was introduced ([KIP-98]).
158    ///
159    /// [KIP-98]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
160    const API_VERSION_RANGE: ApiVersionRange =
161        ApiVersionRange::new(ApiVersion(Int16(4)), ApiVersion(Int16(4)));
162
163    const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(12));
164}
165
166#[derive(Debug)]
167#[allow(missing_copy_implementations)]
168pub struct FetchResponseAbortedTransaction {
169    /// The producer id associated with the aborted transaction.
170    pub producer_id: Int64,
171
172    /// The first offset in the aborted transaction.
173    pub first_offset: Int64,
174}
175
176impl<R> ReadVersionedType<R> for FetchResponseAbortedTransaction
177where
178    R: Read,
179{
180    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
181        let v = version.0 .0;
182        assert!(4 <= v && v <= 4);
183
184        Ok(Self {
185            producer_id: Int64::read(reader)?,
186            first_offset: Int64::read(reader)?,
187        })
188    }
189}
190
191#[derive(Debug)]
192pub struct FetchResponsePartition {
193    /// The partition index.
194    pub partition_index: Int32,
195
196    /// The error code, or 0 if there was no fetch error.
197    pub error_code: Option<ApiError>,
198
199    /// The current high water mark.
200    pub high_watermark: Int64,
201
202    /// The last stable offset (or LSO) of the partition.
203    ///
204    /// This is the last offset such that the state of all transactional records prior to this offset have been decided
205    /// (`ABORTED` or `COMMITTED`).
206    ///
207    /// Added in version 4.
208    pub last_stable_offset: Option<Int64>,
209
210    /// The aborted transactions.
211    ///
212    /// Added in version 4.
213    pub aborted_transactions: Vec<FetchResponseAbortedTransaction>,
214
215    /// The record data.
216    pub records: Records,
217}
218
219impl<R> ReadVersionedType<R> for FetchResponsePartition
220where
221    R: Read,
222{
223    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
224        let v = version.0 .0;
225        assert!(v <= 4);
226
227        Ok(Self {
228            partition_index: Int32::read(reader)?,
229            error_code: ApiError::new(Int16::read(reader)?.0),
230            high_watermark: Int64::read(reader)?,
231            last_stable_offset: (v >= 4).then(|| Int64::read(reader)).transpose()?,
232            aborted_transactions: (v >= 4)
233                .then(|| read_versioned_array(reader, version))
234                .transpose()?
235                .flatten()
236                .unwrap_or_default(),
237            records: Records::read(reader)?,
238        })
239    }
240}
241
242#[derive(Debug)]
243pub struct FetchResponseTopic {
244    /// The topic name.
245    pub topic: String_,
246
247    /// The topic partitions.
248    pub partitions: Vec<FetchResponsePartition>,
249}
250
251impl<R> ReadVersionedType<R> for FetchResponseTopic
252where
253    R: Read,
254{
255    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
256        let v = version.0 .0;
257        assert!(v <= 4);
258
259        Ok(Self {
260            topic: String_::read(reader)?,
261            partitions: read_versioned_array(reader, version)?.unwrap_or_default(),
262        })
263    }
264}
265
266#[derive(Debug)]
267pub struct FetchResponse {
268    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
269    ///
270    /// Added in version 1.
271    pub throttle_time_ms: Option<Int32>,
272
273    /// The response topics.
274    pub responses: Vec<FetchResponseTopic>,
275}
276
277impl<R> ReadVersionedType<R> for FetchResponse
278where
279    R: Read,
280{
281    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
282        let v = version.0 .0;
283        assert!(v <= 4);
284
285        Ok(Self {
286            throttle_time_ms: (v >= 1).then(|| Int32::read(reader)).transpose()?,
287            responses: read_versioned_array(reader, version)?.unwrap_or_default(),
288        })
289    }
290}