kafka_api/schemata/
api_versions_response.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::WriteBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Version 1 adds throttle time to the response.
21//
22// Starting in version 2, on quota violation, brokers send out responses before throttling.
23//
24// Version 3 is the first flexible version. Tagged fields are only supported in the body but
25// not in the header. The length of the header must not change in order to guarantee the
26// backward compatibility.
27//
28// Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported
29// versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
30
31#[derive(Debug, Default, Clone)]
32pub struct ApiVersionsResponse {
33    /// The top-level error code.
34    pub error_code: i16,
35    /// The APIs supported by the broker.
36    pub api_keys: Vec<ApiVersion>,
37    /// The duration in milliseconds for which the request was throttled due to a quota violation,
38    /// or zero if the request did not violate any quota.
39    pub throttle_time_ms: i32,
40    /// Features supported by the broker.
41    pub supported_features: Vec<SupportedFeatureKey>,
42    /// The monotonically increasing epoch for the finalized features information. Valid values are
43    /// >= 0. A value of -1 is special and represents unknown epoch.
44    pub finalized_features_epoch: i64,
45    /// List of cluster-wide finalized features. The information is valid only if
46    /// FinalizedFeaturesEpoch >= 0.
47    pub finalized_features: Vec<FinalizedFeatureKey>,
48    /// Set by a KRaft controller if the required configurations for ZK migration are present.
49    pub zk_migration_ready: bool,
50    /// Unknown tagged fields.
51    pub unknown_tagged_fields: Vec<RawTaggedField>,
52}
53
54impl Encodable for ApiVersionsResponse {
55    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
56        Int16.encode(buf, self.error_code)?;
57        NullableArray(Struct(version), version >= 3).encode(buf, self.api_keys.as_slice())?;
58        if version >= 1 {
59            Int32.encode(buf, self.throttle_time_ms)?;
60        }
61        if version >= 3 {
62            RawTaggedFieldList.encode_with(buf, 3, &self.unknown_tagged_fields, |buf| {
63                RawTaggedFieldWriter.write_field(
64                    buf,
65                    0,
66                    NullableArray(Struct(version), version >= 3),
67                    self.supported_features.as_slice(),
68                )?;
69                RawTaggedFieldWriter.write_field(buf, 1, Int64, self.finalized_features_epoch)?;
70                RawTaggedFieldWriter.write_field(
71                    buf,
72                    2,
73                    NullableArray(Struct(version), version >= 3),
74                    self.finalized_features.as_slice(),
75                )?;
76                Ok(())
77            })?;
78        }
79        Ok(())
80    }
81
82    fn calculate_size(&self, version: i16) -> usize {
83        let mut res = 0;
84        res += Int16::SIZE; //self.error_code
85        res +=
86            NullableArray(Struct(version), version >= 3).calculate_size(self.api_keys.as_slice());
87        if version >= 1 {
88            res += Int32::SIZE; // self.throttle_time_ms
89        }
90        if version >= 3 {
91            res += RawTaggedFieldList.calculate_size_with(
92                3,
93                RawTaggedFieldWriter.calculate_field_size(
94                    0,
95                    NullableArray(Struct(version), version >= 3),
96                    self.supported_features.as_slice(),
97                ) + RawTaggedFieldWriter.calculate_field_size(
98                    1,
99                    Int64,
100                    &self.finalized_features_epoch,
101                ) + RawTaggedFieldWriter.calculate_field_size(
102                    2,
103                    NullableArray(Struct(version), version >= 3),
104                    self.finalized_features.as_slice(),
105                ),
106                &self.unknown_tagged_fields,
107            );
108        }
109        res
110    }
111}
112
113#[derive(Debug, Default, Clone)]
114pub struct ApiVersion {
115    /// The API index.
116    pub api_key: i16,
117    /// The minimum supported version, inclusive.
118    pub min_version: i16,
119    /// The maximum supported version, inclusive.
120    pub max_version: i16,
121    /// Unknown tagged fields.
122    pub unknown_tagged_fields: Vec<RawTaggedField>,
123}
124
125impl Encodable for ApiVersion {
126    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
127        Int16.encode(buf, self.api_key)?;
128        Int16.encode(buf, self.min_version)?;
129        Int16.encode(buf, self.max_version)?;
130        if version >= 3 {
131            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
132        }
133        Ok(())
134    }
135
136    fn calculate_size(&self, version: i16) -> usize {
137        let mut res = 0;
138        res += Int16::SIZE; // self.api_key
139        res += Int16::SIZE; // self.min_version
140        res += Int16::SIZE; // self.max_version
141        if version >= 3 {
142            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
143        }
144        res
145    }
146}
147
148#[derive(Debug, Default, Clone)]
149pub struct SupportedFeatureKey {
150    /// The name of the feature.
151    pub name: String,
152    /// The minimum supported version for the feature.
153    pub min_version: i16,
154    /// The maximum supported version for the feature.
155    pub max_version: i16,
156    /// Unknown tagged fields.
157    pub unknown_tagged_fields: Vec<RawTaggedField>,
158}
159
160impl Encodable for SupportedFeatureKey {
161    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
162        if version > 3 {
163            Err(err_encode_message_unsupported(
164                version,
165                "SupportedFeatureKey",
166            ))?
167        }
168        NullableString(true).encode(buf, self.name.as_ref())?;
169        Int16.encode(buf, self.min_version)?;
170        Int16.encode(buf, self.max_version)?;
171        RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
172        Ok(())
173    }
174
175    fn calculate_size(&self, _version: i16) -> usize {
176        let mut res = 0;
177        res += NullableString(true).calculate_size(self.name.as_ref());
178        res += Int16::SIZE; // self.min_version
179        res += Int16::SIZE; // self.max_version
180        res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
181        res
182    }
183}
184
185#[derive(Debug, Default, Clone)]
186pub struct FinalizedFeatureKey {
187    /// The name of the feature.
188    pub name: String,
189    /// The cluster-wide finalized max version level for the feature.
190    pub max_version_level: i16,
191    /// The cluster-wide finalized min version level for the feature.
192    pub min_version_level: i16,
193    /// Unknown tagged fields.
194    pub unknown_tagged_fields: Vec<RawTaggedField>,
195}
196
197impl Encodable for FinalizedFeatureKey {
198    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
199        if version > 3 {
200            Err(err_encode_message_unsupported(
201                version,
202                "FinalizedFeatureKey",
203            ))?
204        }
205        NullableString(true).encode(buf, self.name.as_ref())?;
206        Int16.encode(buf, self.max_version_level)?;
207        Int16.encode(buf, self.min_version_level)?;
208        RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
209        Ok(())
210    }
211
212    fn calculate_size(&self, _version: i16) -> usize {
213        let mut res = 0;
214        res += NullableString(true).calculate_size(self.name.as_ref());
215        res += Int16::SIZE; // self.max_version_level
216        res += Int16::SIZE; // self.min_version_level
217        res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
218        res
219    }
220}