kafka_api/schemata/fetch_request.rs
1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::ReadBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Version 1 is the same as version 0.
21//
22// Starting in Version 2, the requester must be able to handle Kafka Log
23// Message format version 1.
24//
25// Version 3 adds MaxBytes. Starting in version 3, the partition ordering in
26// the request is now relevant. Partitions will be processed in the order
27// they appear in the request.
28//
29// Version 4 adds IsolationLevel. Starting in version 4, the requester must be
30// able to handle Kafka log message format version 2.
31//
32// Version 5 adds LogStartOffset to indicate the earliest available offset of
33// partition data that can be consumed.
34//
35// Version 6 is the same as version 5.
36//
37// Version 7 adds incremental fetch request support.
38//
39// Version 8 is the same as version 7.
40//
41// Version 9 adds CurrentLeaderEpoch, as described in KIP-320.
42//
// Version 10 indicates that we can use the ZStd compression algorithm, as
// described in KIP-110.
//
// Version 11 adds RackId, as described in KIP-392.
//
// Version 12 adds flexible versions support as well as epoch validation through
// the `LastFetchedEpoch` field.
47//
48// Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
49//
50// Version 14 is the same as version 13 but it also receives a new error called
51// OffsetMovedToTieredStorageException(KIP-405)
52//
53// Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
54// deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
55
56#[derive(Debug, Default, Clone)]
57pub struct FetchRequest {
58 /// The clusterId if known. This is used to validate metadata fetches prior to broker
59 /// registration.
60 pub cluster_id: Option<String>,
61 /// The broker ID of the follower, of -1 if this request is from a consumer.
62 pub replica_id: i32,
63 pub replica_state: ReplicaState,
64 /// The maximum time in milliseconds to wait for the response.
65 pub max_wait_ms: i32,
66 /// The minimum bytes to accumulate in the response.
67 pub min_bytes: i32,
68 /// The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored.
69 pub max_bytes: i32,
70 /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED
71 /// (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1),
72 /// non-transactional and COMMITTED transactional records are visible. To be more concrete,
73 /// READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable
74 /// offset), and enables the inclusion of the list of aborted transactions in the result, which
75 /// allows consumers to discard ABORTED transactional records
76 pub isolation_level: i8,
77 /// The fetch session ID.
78 pub session_id: i32,
79 /// The fetch session epoch, which is used for ordering requests in a session.
80 pub session_epoch: i32,
81 /// The topics to fetch.
82 pub topics: Vec<FetchTopic>,
83 /// In an incremental fetch request, the partitions to remove.
84 pub forgotten_topics_data: Vec<ForgottenTopic>,
85 /// Rack ID of the consumer making this request.
86 pub rack_id: String,
87 /// Unknown tagged fields.
88 pub unknown_tagged_fields: Vec<RawTaggedField>,
89}
90
91impl Decodable for FetchRequest {
92 fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
93 let mut this = FetchRequest {
94 replica_id: -1,
95 max_bytes: i32::MAX,
96 session_epoch: -1,
97 ..Default::default()
98 };
99 if version <= 14 {
100 this.replica_id = Int32.decode(buf)?
101 }
102 this.max_wait_ms = Int32.decode(buf)?;
103 this.min_bytes = Int32.decode(buf)?;
104 if version >= 3 {
105 this.max_bytes = Int32.decode(buf)?;
106 }
107 if version >= 4 {
108 this.isolation_level = Int8.decode(buf)?;
109 }
110 if version >= 7 {
111 this.session_id = Int32.decode(buf)?;
112 }
113 if version >= 7 {
114 this.session_epoch = Int32.decode(buf)?;
115 }
116 this.topics = NullableArray(Struct(version), version >= 12)
117 .decode(buf)?
118 .ok_or_else(|| err_decode_message_null("topics"))?;
119 if version >= 7 {
120 this.forgotten_topics_data = NullableArray(Struct(version), version >= 12)
121 .decode(buf)?
122 .ok_or_else(|| err_decode_message_null("forgotten_topics_data"))?;
123 }
124 if version >= 11 {
125 this.rack_id = NullableString(version >= 12)
126 .decode(buf)?
127 .ok_or_else(|| err_decode_message_null("rack_id"))?;
128 }
129 if version >= 12 {
130 this.unknown_tagged_fields =
131 RawTaggedFieldList.decode_with(buf, |buf, tag, _| match tag {
132 0 => {
133 this.cluster_id = NullableString(true).decode(buf)?;
134 Ok(true)
135 }
136 1 => {
137 if version >= 15 {
138 this.replica_state = ReplicaState::read(buf, version)?;
139 }
140 Ok(true)
141 }
142 _ => Ok(false),
143 })?;
144 }
145 Ok(this)
146 }
147}
148
149#[derive(Debug, Default, Clone)]
150pub struct ReplicaState {
151 /// The replica ID of the follower, or -1 if this request is from a consumer.
152 pub replica_id: i32,
153 /// The epoch of this follower, or -1 if not available.
154 pub replica_epoch: i64,
155 /// Unknown tagged fields.
156 pub unknown_tagged_fields: Vec<RawTaggedField>,
157}
158
159impl Decodable for ReplicaState {
160 fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
161 if version > 15 {
162 Err(err_decode_message_unsupported(version, "ReplicaState"))?
163 }
164
165 Ok(ReplicaState {
166 replica_id: Int32.decode(buf)?,
167 replica_epoch: Int64.decode(buf)?,
168 unknown_tagged_fields: RawTaggedFieldList.decode(buf)?,
169 })
170 }
171}
172
173#[derive(Debug, Default, Clone)]
174pub struct FetchTopic {
175 /// The name of the topic to fetch.
176 pub topic: String,
177 /// The unique topic ID
178 pub topic_id: uuid::Uuid,
179 /// The partitions to fetch.
180 pub partitions: Vec<FetchPartition>,
181 /// Unknown tagged fields.
182 pub unknown_tagged_fields: Vec<RawTaggedField>,
183}
184
185impl Decodable for FetchTopic {
186 fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
187 if version > 15 {
188 Err(err_decode_message_unsupported(version, "FetchTopic"))?
189 }
190 let mut this = FetchTopic::default();
191 if version <= 12 {
192 this.topic = NullableString(version >= 12)
193 .decode(buf)?
194 .ok_or_else(|| err_decode_message_null("topic"))?;
195 }
196 if version >= 13 {
197 this.topic_id = Uuid.decode(buf)?;
198 }
199 this.partitions = NullableArray(Struct(version), version >= 12)
200 .decode(buf)?
201 .ok_or_else(|| err_decode_message_null("partitions"))?;
202 if version >= 12 {
203 this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
204 }
205 Ok(this)
206 }
207}
208
209#[derive(Debug, Default, Clone)]
210pub struct FetchPartition {
211 /// The partition index.
212 pub partition: i32,
213 /// The current leader epoch of the partition.
214 pub current_leader_epoch: i32,
215 /// The message offset.
216 pub fetch_offset: i64,
217 /// The epoch of the last fetched record or -1 if there is none
218 pub last_fetched_epoch: i32,
219 /// The earliest available offset of the follower replica.
220 ///
221 /// The field is only used when the request is sent by the follower.
222 pub log_start_offset: i64,
223 /// The maximum bytes to fetch from this partition.
224 ///
225 /// See KIP-74 for cases where this limit may not be honored.
226 pub partition_max_bytes: i32,
227 /// Unknown tagged fields.
228 pub unknown_tagged_fields: Vec<RawTaggedField>,
229}
230
231impl Decodable for FetchPartition {
232 fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
233 if version > 15 {
234 Err(err_decode_message_unsupported(version, "FetchPartition"))?
235 }
236 let mut this = FetchPartition {
237 partition: Int32.decode(buf)?,
238 ..Default::default()
239 };
240 this.current_leader_epoch = if version >= 9 { Int32.decode(buf)? } else { -1 };
241 this.fetch_offset = Int64.decode(buf)?;
242 this.last_fetched_epoch = if version >= 12 {
243 Int32.decode(buf)?
244 } else {
245 -1
246 };
247 this.log_start_offset = if version >= 5 { Int64.decode(buf)? } else { -1 };
248 this.partition_max_bytes = Int32.decode(buf)?;
249 if version >= 12 {
250 this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
251 }
252 Ok(this)
253 }
254}
255
256#[derive(Debug, Default, Clone)]
257pub struct ForgottenTopic {
258 /// The topic name.
259 pub topic: String,
260 /// The unique topic ID
261 pub topic_id: uuid::Uuid,
262 /// The partitions indexes to forget.
263 pub partitions: Vec<i32>,
264 /// Unknown tagged fields.
265 pub unknown_tagged_fields: Vec<RawTaggedField>,
266}
267
268impl Decodable for ForgottenTopic {
269 fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
270 if version > 15 {
271 Err(err_decode_message_unsupported(version, "ForgottenTopic"))?
272 }
273 let mut this = ForgottenTopic::default();
274 if version <= 12 {
275 this.topic = NullableString(version >= 12)
276 .decode(buf)?
277 .ok_or_else(|| err_decode_message_null("topic"))?;
278 }
279 if version >= 13 {
280 this.topic_id = Uuid.decode(buf)?;
281 }
282 this.partitions = NullableArray(Int32, version >= 12)
283 .decode(buf)?
284 .ok_or_else(|| err_decode_message_null("partitions"))?;
285 if version >= 12 {
286 this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
287 }
288 Ok(this)
289 }
290}