kafka_api/schemata/
fetch_request.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::ReadBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Version 1 is the same as version 0.
21//
22// Starting in Version 2, the requester must be able to handle Kafka Log
23// Message format version 1.
24//
25// Version 3 adds MaxBytes.  Starting in version 3, the partition ordering in
26// the request is now relevant.  Partitions will be processed in the order
27// they appear in the request.
28//
29// Version 4 adds IsolationLevel.  Starting in version 4, the requester must be
30// able to handle Kafka log message format version 2.
31//
32// Version 5 adds LogStartOffset to indicate the earliest available offset of
33// partition data that can be consumed.
34//
35// Version 6 is the same as version 5.
36//
37// Version 7 adds incremental fetch request support.
38//
39// Version 8 is the same as version 7.
40//
41// Version 9 adds CurrentLeaderEpoch, as described in KIP-320.
42//
43// Version 10 indicates that we can use the ZStd compression algorithm, as
44// described in KIP-110.
45// Version 12 adds flexible versions support as well as epoch validation through
46// the `LastFetchedEpoch` field
47//
48// Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
49//
50// Version 14 is the same as version 13 but it also receives a new error called
51// OffsetMovedToTieredStorageException(KIP-405)
52//
53// Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
54// deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
55
56#[derive(Debug, Default, Clone)]
57pub struct FetchRequest {
58    /// The clusterId if known. This is used to validate metadata fetches prior to broker
59    /// registration.
60    pub cluster_id: Option<String>,
61    /// The broker ID of the follower, of -1 if this request is from a consumer.
62    pub replica_id: i32,
63    pub replica_state: ReplicaState,
64    /// The maximum time in milliseconds to wait for the response.
65    pub max_wait_ms: i32,
66    /// The minimum bytes to accumulate in the response.
67    pub min_bytes: i32,
68    /// The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored.
69    pub max_bytes: i32,
70    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED
71    /// (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1),
72    /// non-transactional and COMMITTED transactional records are visible. To be more concrete,
73    /// READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable
74    /// offset), and enables the inclusion of the list of aborted transactions in the result, which
75    /// allows consumers to discard ABORTED transactional records
76    pub isolation_level: i8,
77    /// The fetch session ID.
78    pub session_id: i32,
79    /// The fetch session epoch, which is used for ordering requests in a session.
80    pub session_epoch: i32,
81    /// The topics to fetch.
82    pub topics: Vec<FetchTopic>,
83    /// In an incremental fetch request, the partitions to remove.
84    pub forgotten_topics_data: Vec<ForgottenTopic>,
85    /// Rack ID of the consumer making this request.
86    pub rack_id: String,
87    /// Unknown tagged fields.
88    pub unknown_tagged_fields: Vec<RawTaggedField>,
89}
90
91impl Decodable for FetchRequest {
92    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
93        let mut this = FetchRequest {
94            replica_id: -1,
95            max_bytes: i32::MAX,
96            session_epoch: -1,
97            ..Default::default()
98        };
99        if version <= 14 {
100            this.replica_id = Int32.decode(buf)?
101        }
102        this.max_wait_ms = Int32.decode(buf)?;
103        this.min_bytes = Int32.decode(buf)?;
104        if version >= 3 {
105            this.max_bytes = Int32.decode(buf)?;
106        }
107        if version >= 4 {
108            this.isolation_level = Int8.decode(buf)?;
109        }
110        if version >= 7 {
111            this.session_id = Int32.decode(buf)?;
112        }
113        if version >= 7 {
114            this.session_epoch = Int32.decode(buf)?;
115        }
116        this.topics = NullableArray(Struct(version), version >= 12)
117            .decode(buf)?
118            .ok_or_else(|| err_decode_message_null("topics"))?;
119        if version >= 7 {
120            this.forgotten_topics_data = NullableArray(Struct(version), version >= 12)
121                .decode(buf)?
122                .ok_or_else(|| err_decode_message_null("forgotten_topics_data"))?;
123        }
124        if version >= 11 {
125            this.rack_id = NullableString(version >= 12)
126                .decode(buf)?
127                .ok_or_else(|| err_decode_message_null("rack_id"))?;
128        }
129        if version >= 12 {
130            this.unknown_tagged_fields =
131                RawTaggedFieldList.decode_with(buf, |buf, tag, _| match tag {
132                    0 => {
133                        this.cluster_id = NullableString(true).decode(buf)?;
134                        Ok(true)
135                    }
136                    1 => {
137                        if version >= 15 {
138                            this.replica_state = ReplicaState::read(buf, version)?;
139                        }
140                        Ok(true)
141                    }
142                    _ => Ok(false),
143                })?;
144        }
145        Ok(this)
146    }
147}
148
149#[derive(Debug, Default, Clone)]
150pub struct ReplicaState {
151    /// The replica ID of the follower, or -1 if this request is from a consumer.
152    pub replica_id: i32,
153    /// The epoch of this follower, or -1 if not available.
154    pub replica_epoch: i64,
155    /// Unknown tagged fields.
156    pub unknown_tagged_fields: Vec<RawTaggedField>,
157}
158
159impl Decodable for ReplicaState {
160    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
161        if version > 15 {
162            Err(err_decode_message_unsupported(version, "ReplicaState"))?
163        }
164
165        Ok(ReplicaState {
166            replica_id: Int32.decode(buf)?,
167            replica_epoch: Int64.decode(buf)?,
168            unknown_tagged_fields: RawTaggedFieldList.decode(buf)?,
169        })
170    }
171}
172
173#[derive(Debug, Default, Clone)]
174pub struct FetchTopic {
175    /// The name of the topic to fetch.
176    pub topic: String,
177    /// The unique topic ID
178    pub topic_id: uuid::Uuid,
179    /// The partitions to fetch.
180    pub partitions: Vec<FetchPartition>,
181    /// Unknown tagged fields.
182    pub unknown_tagged_fields: Vec<RawTaggedField>,
183}
184
185impl Decodable for FetchTopic {
186    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
187        if version > 15 {
188            Err(err_decode_message_unsupported(version, "FetchTopic"))?
189        }
190        let mut this = FetchTopic::default();
191        if version <= 12 {
192            this.topic = NullableString(version >= 12)
193                .decode(buf)?
194                .ok_or_else(|| err_decode_message_null("topic"))?;
195        }
196        if version >= 13 {
197            this.topic_id = Uuid.decode(buf)?;
198        }
199        this.partitions = NullableArray(Struct(version), version >= 12)
200            .decode(buf)?
201            .ok_or_else(|| err_decode_message_null("partitions"))?;
202        if version >= 12 {
203            this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
204        }
205        Ok(this)
206    }
207}
208
209#[derive(Debug, Default, Clone)]
210pub struct FetchPartition {
211    /// The partition index.
212    pub partition: i32,
213    /// The current leader epoch of the partition.
214    pub current_leader_epoch: i32,
215    /// The message offset.
216    pub fetch_offset: i64,
217    /// The epoch of the last fetched record or -1 if there is none
218    pub last_fetched_epoch: i32,
219    /// The earliest available offset of the follower replica.
220    ///
221    /// The field is only used when the request is sent by the follower.
222    pub log_start_offset: i64,
223    /// The maximum bytes to fetch from this partition.
224    ///
225    /// See KIP-74 for cases where this limit may not be honored.
226    pub partition_max_bytes: i32,
227    /// Unknown tagged fields.
228    pub unknown_tagged_fields: Vec<RawTaggedField>,
229}
230
231impl Decodable for FetchPartition {
232    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
233        if version > 15 {
234            Err(err_decode_message_unsupported(version, "FetchPartition"))?
235        }
236        let mut this = FetchPartition {
237            partition: Int32.decode(buf)?,
238            ..Default::default()
239        };
240        this.current_leader_epoch = if version >= 9 { Int32.decode(buf)? } else { -1 };
241        this.fetch_offset = Int64.decode(buf)?;
242        this.last_fetched_epoch = if version >= 12 {
243            Int32.decode(buf)?
244        } else {
245            -1
246        };
247        this.log_start_offset = if version >= 5 { Int64.decode(buf)? } else { -1 };
248        this.partition_max_bytes = Int32.decode(buf)?;
249        if version >= 12 {
250            this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
251        }
252        Ok(this)
253    }
254}
255
256#[derive(Debug, Default, Clone)]
257pub struct ForgottenTopic {
258    /// The topic name.
259    pub topic: String,
260    /// The unique topic ID
261    pub topic_id: uuid::Uuid,
262    /// The partitions indexes to forget.
263    pub partitions: Vec<i32>,
264    /// Unknown tagged fields.
265    pub unknown_tagged_fields: Vec<RawTaggedField>,
266}
267
268impl Decodable for ForgottenTopic {
269    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
270        if version > 15 {
271            Err(err_decode_message_unsupported(version, "ForgottenTopic"))?
272        }
273        let mut this = ForgottenTopic::default();
274        if version <= 12 {
275            this.topic = NullableString(version >= 12)
276                .decode(buf)?
277                .ok_or_else(|| err_decode_message_null("topic"))?;
278        }
279        if version >= 13 {
280            this.topic_id = Uuid.decode(buf)?;
281        }
282        this.partitions = NullableArray(Int32, version >= 12)
283            .decode(buf)?
284            .ok_or_else(|| err_decode_message_null("partitions"))?;
285        if version >= 12 {
286            this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
287        }
288        Ok(this)
289    }
290}