kafka_api/schemata/
metadata_response.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::WriteBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Version 1 adds fields for the rack of each broker, the controller id, and
21// whether the topic is internal.
22//
23// Version 2 adds the cluster ID field.
24//
25// Version 3 adds the throttle time.
26//
27// Version 4 is the same as version 3.
28//
29// Version 5 adds a per-partition offline_replicas field. This field specifies
30// the list of replicas that are offline.
31//
32// Starting in version 6, on quota violation, brokers send out responses before throttling.
33//
34// Version 7 adds the leader epoch to the partition metadata.
35//
36// Starting in version 8, brokers can send authorized operations for topic and cluster.
37//
38// Version 9 is the first flexible version.
39//
40// Version 10 adds topicId.
41//
42// Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
43// by the DescribeCluster API (KIP-700).
44//
// Version 12 supports topicId. From this version on, the topic name may be null.
46
47#[derive(Debug, Default, Clone)]
48pub struct MetadataResponse {
49    /// The duration in milliseconds for which the request was throttled due to a quota violation,
50    /// or zero if the request did not violate any quota.
51    pub throttle_time_ms: i32,
52    /// Each broker in the response.
53    pub brokers: Vec<MetadataResponseBroker>,
54    /// The cluster ID that responding broker belongs to.
55    pub cluster_id: Option<String>,
56    /// The ID of the controller broker.
57    pub controller_id: i32,
58    /// Each topic in the response.
59    pub topics: Vec<MetadataResponseTopic>,
60    /// 32-bit bitfield to represent authorized operations for this cluster.
61    pub cluster_authorized_operations: i32,
62    /// Unknown tagged fields.
63    pub unknown_tagged_fields: Vec<RawTaggedField>,
64}
65
66impl Encodable for MetadataResponse {
67    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
68        if version >= 3 {
69            Int32.encode(buf, self.throttle_time_ms)?;
70        }
71        NullableArray(Struct(version), version >= 9).encode(buf, self.brokers.as_slice())?;
72        if version >= 2 {
73            NullableString(version >= 9).encode(buf, self.cluster_id.as_deref())?;
74        }
75        if version >= 1 {
76            Int32.encode(buf, self.controller_id)?;
77        }
78        NullableArray(Struct(version), version >= 9).encode(buf, self.topics.as_slice())?;
79        if (8..=10).contains(&version) {
80            Int32.encode(buf, self.cluster_authorized_operations)?;
81        }
82        if version >= 9 {
83            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
84        }
85        Ok(())
86    }
87
88    fn calculate_size(&self, version: i16) -> usize {
89        let mut res = 0;
90        if version >= 3 {
91            res += Int32::SIZE; // self.throttle_time_ms
92        }
93        res += NullableArray(Struct(version), version >= 9).calculate_size(self.brokers.as_slice());
94        if version >= 2 {
95            res += NullableString(version >= 9).calculate_size(self.cluster_id.as_deref());
96        }
97        if version >= 1 {
98            res += Int32::SIZE; // self.controller_id
99        }
100        res += NullableArray(Struct(version), version >= 9).calculate_size(self.topics.as_slice());
101        if (8..=10).contains(&version) {
102            res += Int32::SIZE; // self.cluster_authorized_operations
103        }
104        if version >= 9 {
105            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
106        }
107        res
108    }
109}
110
111#[derive(Debug, Default, Clone)]
112pub struct MetadataResponseBroker {
113    /// The broker ID.
114    pub node_id: i32,
115    /// The broker hostname.
116    pub host: String,
117    /// The broker port.
118    pub port: i32,
119    /// The rack of the broker, or null if it has not been assigned to a rack.
120    pub rack: Option<String>,
121    /// Unknown tagged fields.
122    pub unknown_tagged_fields: Vec<RawTaggedField>,
123}
124
125impl Encodable for MetadataResponseBroker {
126    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
127        Int32.encode(buf, self.node_id)?;
128        NullableString(version >= 9).encode(buf, self.host.as_str())?;
129        Int32.encode(buf, self.port)?;
130        if version >= 1 {
131            NullableString(version >= 9).encode(buf, self.rack.as_deref())?;
132        }
133        if version >= 9 {
134            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
135        }
136        Ok(())
137    }
138
139    fn calculate_size(&self, version: i16) -> usize {
140        let mut res = 0;
141        res += Int32::SIZE; // self.node_id
142        res += NullableString(version >= 9).calculate_size(self.host.as_str());
143        res += Int32::SIZE; // self.port
144        if version >= 1 {
145            res += NullableString(version >= 9).calculate_size(self.rack.as_deref());
146        }
147        if version >= 9 {
148            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
149        }
150        res
151    }
152}
153
154#[derive(Debug, Default, Clone)]
155pub struct MetadataResponseTopic {
156    /// The topic error, or 0 if there was no error.
157    pub error_code: i16,
158    /// The topic name.
159    pub name: Option<String>,
160    /// The topic id.
161    pub topic_id: uuid::Uuid,
162    /// True if the topic is internal.
163    pub is_internal: bool,
164    /// Each partition in the topic.
165    pub partitions: Vec<MetadataResponsePartition>,
166    /// 32-bit bitfield to represent authorized operations for this topic.
167    pub topic_authorized_operations: i32,
168    /// Unknown tagged fields.
169    pub unknown_tagged_fields: Vec<RawTaggedField>,
170}
171
172impl Encodable for MetadataResponseTopic {
173    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
174        Int16.encode(buf, self.error_code)?;
175        match self.name {
176            None => {
177                if version >= 12 {
178                    NullableString(true).encode(buf, None)?;
179                } else {
180                    Err(err_encode_message_null("name"))?;
181                }
182            }
183            Some(ref name) => {
184                NullableString(version >= 9).encode(buf, name.as_str())?;
185            }
186        }
187        if version >= 10 {
188            Uuid.encode(buf, self.topic_id)?;
189        }
190        if version >= 1 {
191            Bool.encode(buf, self.is_internal)?;
192        }
193        NullableArray(Struct(version), version >= 9).encode(buf, self.partitions.as_slice())?;
194        if version >= 8 {
195            Int32.encode(buf, self.topic_authorized_operations)?;
196        }
197        if version >= 9 {
198            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
199        }
200        Ok(())
201    }
202
203    fn calculate_size(&self, version: i16) -> usize {
204        let mut res = 0;
205        res += Int16::SIZE; // self.error_code
206        res += NullableString(version >= 9).calculate_size(self.name.as_deref());
207        if version >= 10 {
208            res += Uuid::SIZE; // self.topic_id
209        }
210        if version >= 1 {
211            res += Bool::SIZE; // self.is_internal
212        }
213        res +=
214            NullableArray(Struct(version), version >= 9).calculate_size(self.partitions.as_slice());
215        if version >= 8 {
216            res += Int32::SIZE; // self.topic_authorized_operations
217        }
218        if version >= 9 {
219            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
220        }
221        res
222    }
223}
224
225#[derive(Debug, Default, Clone)]
226pub struct MetadataResponsePartition {
227    /// The partition error, or 0 if there was no error.
228    pub error_code: i16,
229    /// The partition index.
230    pub partition_index: i32,
231    /// The ID of the leader broker.
232    pub leader_id: i32,
233    /// The leader epoch of this partition.
234    pub leader_epoch: i32,
235    /// The set of all nodes that host this partition.
236    pub replica_nodes: Vec<i32>,
237    /// The set of nodes that are in sync with the leader for this partition.
238    pub isr_nodes: Vec<i32>,
239    /// The set of offline replicas of this partition.
240    pub offline_replicas: Vec<i32>,
241    /// Unknown tagged fields.
242    pub unknown_tagged_fields: Vec<RawTaggedField>,
243}
244
245impl Encodable for MetadataResponsePartition {
246    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
247        Int16.encode(buf, self.error_code)?;
248        Int32.encode(buf, self.partition_index)?;
249        Int32.encode(buf, self.leader_id)?;
250        if version >= 7 {
251            Int32.encode(buf, self.leader_epoch)?;
252        }
253        NullableArray(Int32, version >= 9).encode(buf, self.replica_nodes.as_slice())?;
254        NullableArray(Int32, version >= 9).encode(buf, self.isr_nodes.as_slice())?;
255        if version >= 5 {
256            NullableArray(Int32, version >= 9).encode(buf, self.offline_replicas.as_slice())?;
257        }
258        if version >= 9 {
259            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
260        }
261        Ok(())
262    }
263
264    fn calculate_size(&self, version: i16) -> usize {
265        let mut res = 0;
266        res += Int16::SIZE; // self.error_code
267        res += Int32::SIZE; // self.partition_index
268        res += Int32::SIZE; // self.leader_id
269        if version >= 7 {
270            res += Int32::SIZE; // self.leader_epoch
271        }
272        res += NullableArray(Int32, version >= 9).calculate_size(self.replica_nodes.as_slice());
273        res += NullableArray(Int32, version >= 9).calculate_size(self.isr_nodes.as_slice());
274        if version >= 5 {
275            res +=
276                NullableArray(Int32, version >= 9).calculate_size(self.offline_replicas.as_slice());
277        }
278        if version >= 9 {
279            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
280        }
281        res
282    }
283}