kafka_api/schemata/
offset_fetch_response.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::WriteBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Version 1 is the same as version 0.
21//
22// Version 2 adds a top-level error code.
23//
24// Version 3 adds the throttle time.
25//
26// Starting in version 4, on quota violation, brokers send out responses before throttling.
27//
28// Version 5 adds the leader epoch to the committed offset.
29//
30// Version 6 is the first flexible version.
31//
32// Version 7 adds pending offset commit as new error response on partition level.
33//
// Version 8 adds support for fetching offsets for multiple groups.
35
36#[derive(Debug, Default, Clone)]
37pub struct OffsetFetchResponse {
38    /// The duration in milliseconds for which the request was throttled due to a quota violation,
39    /// or zero if the request did not violate any quota.
40    pub throttle_time_ms: i32,
41    /// The responses per topic.
42    pub topics: Vec<OffsetFetchResponseTopic>,
43    /// The top-level error code, or 0 if there was no error.
44    pub error_code: i16,
45    /// The responses per group id.
46    pub groups: Vec<OffsetFetchResponseGroup>,
47    /// Unknown tagged fields.
48    pub unknown_tagged_fields: Vec<RawTaggedField>,
49}
50
51impl Encodable for OffsetFetchResponse {
52    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
53        if version >= 3 {
54            Int32.encode(buf, self.throttle_time_ms)?;
55        }
56        if version <= 7 {
57            NullableArray(Struct(version), version >= 6).encode(buf, self.topics.as_slice())?;
58        }
59        if (2..=7).contains(&version) {
60            Int16.encode(buf, self.error_code)?;
61        }
62        if version >= 8 {
63            NullableArray(Struct(version), true).encode(buf, self.groups.as_slice())?;
64        }
65        if version >= 6 {
66            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
67        }
68        Ok(())
69    }
70
71    fn calculate_size(&self, version: i16) -> usize {
72        let mut res = 0;
73        if version >= 3 {
74            res += Int32::SIZE; // self.throttle_time_ms
75        }
76        if version <= 7 {
77            res +=
78                NullableArray(Struct(version), version >= 6).calculate_size(self.topics.as_slice());
79        }
80        if (2..=7).contains(&version) {
81            res += Int16::SIZE; // self.error_code
82        }
83        if version >= 8 {
84            res += NullableArray(Struct(version), true).calculate_size(self.groups.as_slice());
85        }
86        if version >= 6 {
87            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
88        }
89        res
90    }
91}
92
93#[derive(Debug, Default, Clone)]
94pub struct OffsetFetchResponseTopic {
95    /// The topic name.
96    pub name: String,
97    /// The responses per partition.
98    pub partitions: Vec<OffsetFetchResponsePartition>,
99    /// Unknown tagged fields.
100    pub unknown_tagged_fields: Vec<RawTaggedField>,
101}
102
103impl Encodable for OffsetFetchResponseTopic {
104    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
105        if version > 7 {
106            Err(err_encode_message_unsupported(
107                version,
108                "OffsetFetchResponseTopic",
109            ))?
110        }
111        NullableString(version >= 6).encode(buf, self.name.as_str())?;
112        NullableArray(Struct(version), version >= 6).encode(buf, self.partitions.as_slice())?;
113        if version >= 6 {
114            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
115        }
116        Ok(())
117    }
118
119    fn calculate_size(&self, version: i16) -> usize {
120        let mut res = 0;
121        res += NullableString(version >= 6).calculate_size(self.name.as_str());
122        res +=
123            NullableArray(Struct(version), version >= 6).calculate_size(self.partitions.as_slice());
124        if version >= 6 {
125            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
126        }
127        res
128    }
129}
130
131#[derive(Debug, Default, Clone)]
132pub struct OffsetFetchResponsePartition {
133    /// The partition index.
134    pub partition_index: i32,
135    /// The committed message offset.
136    pub committed_offset: i64,
137    /// The leader epoch.
138    pub committed_leader_epoch: i32,
139    /// The partition metadata.
140    pub metadata: Option<String>,
141    /// The partition-level error code, or 0 if there was no error.
142    pub error_code: i16,
143    /// Unknown tagged fields.
144    pub unknown_tagged_fields: Vec<RawTaggedField>,
145}
146
147impl Encodable for OffsetFetchResponsePartition {
148    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
149        Int32.encode(buf, self.partition_index)?;
150        Int64.encode(buf, self.committed_offset)?;
151        if version >= 5 {
152            Int32.encode(buf, self.committed_leader_epoch)?;
153        }
154        NullableString(version >= 6).encode(buf, self.metadata.as_deref())?;
155        Int16.encode(buf, self.error_code)?;
156        if version >= 6 {
157            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
158        }
159        Ok(())
160    }
161
162    fn calculate_size(&self, version: i16) -> usize {
163        let mut res = 0;
164        res += Int32::SIZE; // self.partition_index
165        res += Int64::SIZE; // self.committed_offset
166        if version >= 5 {
167            res += Int32::SIZE; // self.committed_leader_epoch
168        }
169        res += NullableString(version >= 6).calculate_size(self.metadata.as_deref());
170        res += Int16::SIZE; // self.error_code
171        if version >= 6 {
172            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
173        }
174        res
175    }
176}
177
178#[derive(Debug, Default, Clone)]
179pub struct OffsetFetchResponseGroup {
180    /// The group to fetch offsets for.
181    pub group_id: String,
182    /// The responses per topic.
183    pub topics: Vec<OffsetFetchResponseTopics>,
184    /// The group-level error code, or 0 if there was no error.
185    pub error_code: i16,
186    /// Unknown tagged fields.
187    pub unknown_tagged_fields: Vec<RawTaggedField>,
188}
189
190impl Encodable for OffsetFetchResponseGroup {
191    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
192        if version < 8 {
193            Err(err_encode_message_unsupported(
194                version,
195                "OffsetFetchResponseGroup",
196            ))?
197        }
198        NullableString(true).encode(buf, self.group_id.as_str())?;
199        NullableArray(Struct(version), true).encode(buf, self.topics.as_slice())?;
200        Int16.encode(buf, self.error_code)?;
201        RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
202        Ok(())
203    }
204
205    fn calculate_size(&self, version: i16) -> usize {
206        let mut res = 0;
207        res += NullableString(true).calculate_size(self.group_id.as_str());
208        res += NullableArray(Struct(version), true).calculate_size(self.topics.as_slice());
209        res += Int16::SIZE; // self.error_code
210        res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
211        res
212    }
213}
214
215#[derive(Debug, Default, Clone)]
216pub struct OffsetFetchResponseTopics {
217    /// The topic name.
218    pub name: String,
219    /// The responses per partition.
220    pub partitions: Vec<OffsetFetchResponsePartitions>,
221    /// Unknown tagged fields.
222    pub unknown_tagged_fields: Vec<RawTaggedField>,
223}
224
225impl Encodable for OffsetFetchResponseTopics {
226    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
227        NullableString(true).encode(buf, self.name.as_str())?;
228        NullableArray(Struct(version), true).encode(buf, self.partitions.as_slice())?;
229        RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
230        Ok(())
231    }
232
233    fn calculate_size(&self, version: i16) -> usize {
234        let mut res = 0;
235        res += NullableString(true).calculate_size(self.name.as_str());
236        res += NullableArray(Struct(version), true).calculate_size(self.partitions.as_slice());
237        res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
238        res
239    }
240}
241
242#[derive(Debug, Default, Clone)]
243pub struct OffsetFetchResponsePartitions {
244    /// The partition index.
245    pub partition_index: i32,
246    /// The committed message offset.
247    pub committed_offset: i64,
248    /// The leader epoch.
249    pub committed_leader_epoch: i32,
250    /// The partition metadata.
251    pub metadata: Option<String>,
252    /// The partition-level error code, or 0 if there was no error.
253    pub error_code: i16,
254    /// Unknown tagged fields.
255    pub unknown_tagged_fields: Vec<RawTaggedField>,
256}
257
258impl Encodable for OffsetFetchResponsePartitions {
259    fn write<B: WriteBytesExt>(&self, buf: &mut B, _version: i16) -> IoResult<()> {
260        Int32.encode(buf, self.partition_index)?;
261        Int64.encode(buf, self.committed_offset)?;
262        Int32.encode(buf, self.committed_leader_epoch)?;
263        NullableString(true).encode(buf, self.metadata.as_deref())?;
264        Int16.encode(buf, self.error_code)?;
265        RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
266        Ok(())
267    }
268
269    fn calculate_size(&self, _version: i16) -> usize {
270        let mut res = 0;
271        res += Int32::SIZE; // self.partition_index
272        res += Int64::SIZE; // self.committed_offset
273        res += Int32::SIZE; // self.committed_leader_epoch
274        res += NullableString(true).calculate_size(self.metadata.as_deref());
275        res += Int16::SIZE; // self.error_code
276        res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
277        res
278    }
279}