kafka_api/schemata/fetch_response.rs

// Copyright 2024 tison <wander4096@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use byteorder::WriteBytesExt;

use crate::codec::*;
use crate::IoResult;

// Version 1 adds throttle time.
//
// Version 2 and 3 are the same as version 1.
//
// Version 4 adds features for transactional consumption.
//
// Version 5 adds LogStartOffset to indicate the earliest available offset of
// partition data that can be consumed.
//
// Starting in version 6, we may return KAFKA_STORAGE_ERROR as an error code.
//
// Version 7 adds incremental fetch request support.
//
// Starting in version 8, on quota violation, brokers send out responses before throttling.
//
// Version 9 is the same as version 8.
//
// Version 10 indicates that the response data can use the ZStd compression
// algorithm, as described in KIP-110.
//
// Version 11 adds the preferred read replica field (KIP-392).
//
// Version 12 adds support for flexible versions, epoch detection through the `TruncationOffset`
// field, and leader discovery through the `CurrentLeader` field.
//
// Version 13 replaces the topic name field with topic ID (KIP-516).
//
// Version 14 is the same as version 13 but it also receives a new error called
// OffsetMovedToTieredStorageException (KIP-405).
//
// Version 15 is the same as version 14 (KIP-903).

#[derive(Debug, Default, Clone)]
pub struct FetchResponse {
    /// The duration in milliseconds for which the request was throttled due to a quota violation,
    /// or zero if the request did not violate any quota.
    pub throttle_time_ms: i32,
    /// The top level response error code.
    pub error_code: i16,
    /// The fetch session ID, or 0 if this is not part of a fetch session.
    pub session_id: i32,
    /// The response topics.
    pub responses: Vec<FetchableTopicResponse>,
    /// Unknown tagged fields.
    pub unknown_tagged_fields: Vec<RawTaggedField>,
}

impl Encodable for FetchResponse {
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        if version >= 1 {
            Int32.encode(buf, self.throttle_time_ms)?;
        }
        if version >= 7 {
            Int16.encode(buf, self.error_code)?;
            Int32.encode(buf, self.session_id)?;
        }
        NullableArray(Struct(version), version >= 12).encode(buf, self.responses.as_slice())?;
        if version >= 12 {
            RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        }
        Ok(())
    }

    fn calculate_size(&self, version: i16) -> usize {
        let mut res = 0;
        if version >= 1 {
            res += Int32::SIZE; // self.throttle_time_ms
        }
        if version >= 7 {
            res += Int16::SIZE; // self.error_code
            res += Int32::SIZE; // self.session_id
        }
        res +=
            NullableArray(Struct(version), version >= 12).calculate_size(self.responses.as_slice());
        if version >= 12 {
            res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
        }
        res
    }
}
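
// The encoder above gates every field on the request version. The test below is
// a minimal sketch (not taken from upstream Kafka test data) of the invariant
// the rest of this module relies on: `calculate_size` reports exactly the number
// of bytes that `write` produces, for both a pre-flexible version and a flexible
// (>= 12) one. The versions 4 and 15 are illustrative choices.
#[cfg(test)]
mod fetch_response_size_sketch {
    use super::*;

    #[test]
    fn calculate_size_matches_written_bytes() {
        let response = FetchResponse {
            throttle_time_ms: 100,
            error_code: 0,
            session_id: 42,
            responses: vec![FetchableTopicResponse::default()],
            unknown_tagged_fields: vec![],
        };
        for version in [4, 15] {
            let mut buf = Vec::new();
            response.write(&mut buf, version).unwrap();
            assert_eq!(buf.len(), response.calculate_size(version));
        }
    }
}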

#[derive(Debug, Default, Clone)]
pub struct FetchableTopicResponse {
    /// The topic name.
    pub topic: String,
    /// The unique topic ID.
    pub topic_id: uuid::Uuid,
    /// The topic partitions.
    pub partitions: Vec<PartitionData>,
    /// Unknown tagged fields.
    pub unknown_tagged_fields: Vec<RawTaggedField>,
}

impl Encodable for FetchableTopicResponse {
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        if version <= 12 {
            NullableString(version >= 12).encode(buf, self.topic.as_str())?;
        }
        if version >= 13 {
            Uuid.encode(buf, self.topic_id)?;
        }
        NullableArray(Struct(version), version >= 12).encode(buf, self.partitions.as_slice())?;
        if version >= 12 {
            RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        }
        Ok(())
    }

    fn calculate_size(&self, version: i16) -> usize {
        let mut res = 0;
        if version <= 12 {
            res += NullableString(version >= 12).calculate_size(self.topic.as_str());
        }
        if version >= 13 {
            res += Uuid::SIZE; // self.topic_id
        }
        res += NullableArray(Struct(version), version >= 12)
            .calculate_size(self.partitions.as_slice());
        if version >= 12 {
            res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
        }
        res
    }
}

#[derive(Debug, Clone)]
pub struct PartitionData {
    /// The partition index.
    pub partition_index: i32,
    /// The error code, or 0 if there was no fetch error.
    pub error_code: i16,
    /// The current high watermark.
    pub high_watermark: i64,
    /// The last stable offset (or LSO) of the partition. This is the last offset such that the
    /// state of all transactional records prior to this offset have been decided (ABORTED or
    /// COMMITTED).
    pub last_stable_offset: i64,
    /// The current log start offset.
    pub log_start_offset: i64,
    /// In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the
    /// request, this field indicates the largest epoch and its end offset such that subsequent
    /// records are known to diverge.
    pub diverging_epoch: Option<EpochEndOffset>,
    /// The current leader of the partition.
    pub current_leader: Option<LeaderIdAndEpoch>,
    /// In the case of fetching an offset less than the LogStartOffset, this is the end offset and
    /// epoch that should be used in the FetchSnapshot request.
    pub snapshot_id: Option<SnapshotId>,
    /// The aborted transactions.
    pub aborted_transactions: Option<Vec<AbortedTransaction>>,
    /// The preferred read replica for the consumer to use on its next fetch request.
    pub preferred_read_replica: i32,
    /// The record data.
    pub records: Vec<u8>,
    /// Unknown tagged fields.
    pub unknown_tagged_fields: Vec<RawTaggedField>,
}

impl Default for PartitionData {
    fn default() -> Self {
        PartitionData {
            partition_index: 0,
            error_code: 0,
            high_watermark: 0,
            last_stable_offset: -1,
            log_start_offset: -1,
            diverging_epoch: None,
            current_leader: None,
            snapshot_id: None,
            aborted_transactions: None,
            preferred_read_replica: -1,
            records: Default::default(),
            unknown_tagged_fields: vec![],
        }
    }
}

impl Encodable for PartitionData {
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        Int32.encode(buf, self.partition_index)?;
        Int16.encode(buf, self.error_code)?;
        Int64.encode(buf, self.high_watermark)?;
        if version >= 4 {
            Int64.encode(buf, self.last_stable_offset)?;
        }
        if version >= 5 {
            Int64.encode(buf, self.log_start_offset)?;
        }
        if version >= 4 {
            NullableArray(Struct(version), version >= 12)
                .encode(buf, self.aborted_transactions.as_deref())?;
        }
        if version >= 11 {
            Int32.encode(buf, self.preferred_read_replica)?;
        }
        NullableBytes(version >= 12).encode(buf, &self.records)?;
        if version >= 12 {
            let mut n = self.diverging_epoch.is_some() as usize;
            n += self.current_leader.is_some() as usize;
            n += self.snapshot_id.is_some() as usize;
            RawTaggedFieldList.encode_with(buf, n, &self.unknown_tagged_fields, |buf| {
                if let Some(diverging_epoch) = &self.diverging_epoch {
                    RawTaggedFieldWriter.write_field(buf, 0, Struct(version), diverging_epoch)?;
                }
                if let Some(current_leader) = &self.current_leader {
                    RawTaggedFieldWriter.write_field(buf, 1, Struct(version), current_leader)?;
                }
                if let Some(snapshot_id) = &self.snapshot_id {
                    RawTaggedFieldWriter.write_field(buf, 2, Struct(version), snapshot_id)?;
                }
                Ok(())
            })?;
        }
        Ok(())
    }

    fn calculate_size(&self, version: i16) -> usize {
        let mut res = 0;
        res += Int32::SIZE; // self.partition_index
        res += Int16::SIZE; // self.error_code
        res += Int64::SIZE; // self.high_watermark
        if version >= 4 {
            res += Int64::SIZE; // self.last_stable_offset
        }
        if version >= 5 {
            res += Int64::SIZE; // self.log_start_offset
        }
        if version >= 4 {
            res += NullableArray(Struct(version), version >= 12)
                .calculate_size(self.aborted_transactions.as_deref());
        }
        if version >= 11 {
            res += Int32::SIZE; // self.preferred_read_replica
        }
        res += NullableBytes(version >= 12).calculate_size(&self.records);
        if version >= 12 {
            let mut n = 0;
            let mut bs = 0;
            if let Some(diverging_epoch) = &self.diverging_epoch {
                n += 1;
                bs +=
                    RawTaggedFieldWriter.calculate_field_size(0, Struct(version), diverging_epoch);
            }
            if let Some(current_leader) = &self.current_leader {
                n += 1;
                bs += RawTaggedFieldWriter.calculate_field_size(1, Struct(version), current_leader);
            }
            if let Some(snapshot_id) = &self.snapshot_id {
                n += 1;
                bs += RawTaggedFieldWriter.calculate_field_size(2, Struct(version), snapshot_id);
            }
            res += RawTaggedFieldList.calculate_size_with(n, bs, &self.unknown_tagged_fields);
        }
        res
    }
}
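
// The tagged-field path above is only taken for flexible versions (>= 12). The
// test below is a minimal sketch (not taken from upstream Kafka test data) of
// that behaviour: an optional `diverging_epoch` changes the encoded size only
// once tagged fields exist on the wire, and `calculate_size` stays in step with
// the bytes `write` actually emits.
#[cfg(test)]
mod partition_data_tagged_fields_sketch {
    use super::*;

    #[test]
    fn diverging_epoch_only_counts_for_flexible_versions() {
        let plain = PartitionData::default();
        let diverging = PartitionData {
            diverging_epoch: Some(EpochEndOffset::default()),
            ..PartitionData::default()
        };

        // Below version 12 there is no tagged-field section, so the optional
        // sub-struct does not contribute any bytes.
        assert_eq!(plain.calculate_size(11), diverging.calculate_size(11));
        // From version 12 on, the diverging epoch is written as tagged field 0.
        assert!(diverging.calculate_size(12) > plain.calculate_size(12));

        let mut buf = Vec::new();
        diverging.write(&mut buf, 12).unwrap();
        assert_eq!(buf.len(), diverging.calculate_size(12));
    }
}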

#[derive(Debug, Clone)]
pub struct EpochEndOffset {
    /// The largest epoch known to diverge.
    pub epoch: i32,
    /// The end offset of the largest diverging epoch.
    pub end_offset: i64,
    /// Unknown tagged fields.
    pub unknown_tagged_fields: Vec<RawTaggedField>,
}

impl Default for EpochEndOffset {
    fn default() -> Self {
        EpochEndOffset {
            epoch: -1,
            end_offset: -1,
            unknown_tagged_fields: vec![],
        }
    }
}

impl Encodable for EpochEndOffset {
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        if version < 12 {
            Err(err_encode_message_unsupported(version, "EpochEndOffset"))?
        }
        Int32.encode(buf, self.epoch)?;
        Int64.encode(buf, self.end_offset)?;
        RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        Ok(())
    }

    fn calculate_size(&self, _version: i16) -> usize {
        let mut res = 0;
        res += Int32::SIZE; // self.epoch
        res += Int64::SIZE; // self.end_offset
        res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
        res
    }
}

#[derive(Debug, Clone)]
pub struct LeaderIdAndEpoch {
    /// The ID of the current leader or -1 if the leader is unknown.
    pub leader_id: i32,
    /// The latest known leader epoch.
    pub leader_epoch: i32,
    /// Unknown tagged fields.
    pub unknown_tagged_fields: Vec<RawTaggedField>,
}

impl Default for LeaderIdAndEpoch {
    fn default() -> Self {
        LeaderIdAndEpoch {
            leader_id: -1,
            leader_epoch: -1,
            unknown_tagged_fields: vec![],
        }
    }
}

impl Encodable for LeaderIdAndEpoch {
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        if version < 12 {
            Err(err_encode_message_unsupported(version, "LeaderIdAndEpoch"))?
        }
        Int32.encode(buf, self.leader_id)?;
        Int32.encode(buf, self.leader_epoch)?;
        RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        Ok(())
    }

    fn calculate_size(&self, _version: i16) -> usize {
        let mut res = 0;
        res += Int32::SIZE; // self.leader_id
        res += Int32::SIZE; // self.leader_epoch
        res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
        res
    }
}

#[derive(Debug, Clone)]
pub struct SnapshotId {
    /// The end offset to use in the FetchSnapshot request.
    pub end_offset: i64,
    /// The epoch to use in the FetchSnapshot request.
    pub epoch: i32,
    /// Unknown tagged fields.
    pub unknown_tagged_fields: Vec<RawTaggedField>,
}

impl Default for SnapshotId {
    fn default() -> Self {
        SnapshotId {
            end_offset: -1,
            epoch: -1,
            unknown_tagged_fields: vec![],
        }
    }
}

impl Encodable for SnapshotId {
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        if version < 12 {
            Err(err_encode_message_unsupported(version, "SnapshotId"))?
        }
        Int64.encode(buf, self.end_offset)?;
        Int32.encode(buf, self.epoch)?;
        RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        Ok(())
    }

    fn calculate_size(&self, _version: i16) -> usize {
        let mut res = 0;
        res += Int64::SIZE; // self.end_offset
        res += Int32::SIZE; // self.epoch
        res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
        res
    }
}

#[derive(Debug, Default, Clone)]
pub struct AbortedTransaction {
    /// The producer id associated with the aborted transaction.
    pub producer_id: i64,
    /// The first offset in the aborted transaction.
    pub first_offset: i64,
    /// Unknown tagged fields.
    pub unknown_tagged_fields: Vec<RawTaggedField>,
}

impl Encodable for AbortedTransaction {
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        if version < 4 {
            Err(err_encode_message_unsupported(
                version,
                "AbortedTransaction",
            ))?
        }
        Int64.encode(buf, self.producer_id)?;
        Int64.encode(buf, self.first_offset)?;
        if version >= 12 {
            RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        }
        Ok(())
    }

    fn calculate_size(&self, version: i16) -> usize {
        let mut res = 0;
        res += Int64::SIZE; // self.producer_id
        res += Int64::SIZE; // self.first_offset
        if version >= 12 {
            res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
        }
        res
    }
}
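
// A minimal sketch (not taken from upstream Kafka test data) of the guard at
// the top of `AbortedTransaction::write`: aborted transactions only appear on
// the wire from version 4 onwards, so encoding at an older version is rejected.
#[cfg(test)]
mod aborted_transaction_guard_sketch {
    use super::*;

    #[test]
    fn rejects_versions_before_aborted_transactions_existed() {
        let transaction = AbortedTransaction::default();
        let mut buf = Vec::new();
        assert!(transaction.write(&mut buf, 3).is_err());
        assert!(transaction.write(&mut buf, 4).is_ok());
    }
}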