1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_bool, get_i16, get_i32, put_bool, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
10 string_len,
11};
12use crate::tagged_fields::{read_tagged_fields, tagged_fields_len, WriteTaggedFields};
13use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
14
/// API key of this message; reported in `ProtocolError::UnsupportedVersion`.
pub const API_KEY: i16 = 3;
/// Lowest version accepted by `MetadataResponse::encode` / `decode`.
pub const MIN_VERSION: i16 = 0;
/// Highest version accepted by `MetadataResponse::encode` / `decode`.
pub const MAX_VERSION: i16 = 13;
/// First version for which `is_flexible` returns `true` (compact encodings plus
/// a trailing tagged-field section).
pub const FLEXIBLE_MIN: i16 = 9;
19
20#[inline]
21fn is_flexible(version: i16) -> bool { version >= FLEXIBLE_MIN }
22
/// Top-level body of the metadata response (API key 3, versions 0 through 13).
///
/// Not every field exists in every version: a field outside its version range
/// is skipped by `encode` and keeps its `Default` value after `decode`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetadataResponse {
    /// `i32`, present on the wire for versions 3 and above.
    pub throttle_time_ms: i32,
    /// Array of broker entries, present in every version.
    pub brokers: Vec<MetadataResponseBroker>,
    /// Nullable string, present on the wire for versions 2 and above.
    pub cluster_id: Option<String>,
    /// `i32`, present on the wire for versions 1 and above; defaults to `-1`.
    pub controller_id: i32,
    /// Array of topic entries, present in every version.
    pub topics: Vec<MetadataResponseTopic>,
    /// `i32`, present on the wire only for versions 8 through 10; defaults to `i32::MIN`.
    pub cluster_authorized_operations: i32,
    /// `i16`, present on the wire for versions 13 and above.
    pub error_code: i16,
    /// Tagged fields collected by `read_tagged_fields` while decoding a flexible
    /// version (9+); written back verbatim by `encode` on flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
34
35impl Default for MetadataResponse {
36 fn default() -> Self {
37 Self {
38 throttle_time_ms: 0i32,
39 brokers: Vec::new(),
40 cluster_id: None,
41 controller_id: -1i32,
42 topics: Vec::new(),
43 cluster_authorized_operations: -2_147_483_648i32,
44 error_code: 0i16,
45 unknown_tagged_fields: Default::default(),
46 }
47 }
48}
49
50impl Encode for MetadataResponse {
51 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
52 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
53 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
54 }
55 let flex = is_flexible(version);
56 if version >= 3 { put_i32(buf, self.throttle_time_ms) }
57 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.brokers).len(), flex); for it in &self.brokers { it.encode(buf, version)?; } } }
58 if version >= 2 { if flex { put_compact_nullable_string(buf, self.cluster_id.as_deref()) } else { put_nullable_string(buf, self.cluster_id.as_deref()) } }
59 if version >= 1 { put_i32(buf, self.controller_id) }
60 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.topics).len(), flex); for it in &self.topics { it.encode(buf, version)?; } } }
61 if version >= 8 && version <= 10 { put_i32(buf, self.cluster_authorized_operations) }
62 if version >= 13 { put_i16(buf, self.error_code) }
63 if flex {
64 let tagged = WriteTaggedFields::new();
65 tagged.write(buf, &self.unknown_tagged_fields);
66 }
67 Ok(())
68 }
69 fn encoded_len(&self, version: i16) -> usize {
70 let flex = is_flexible(version);
71 let mut n: usize = 0;
72 if version >= 3 { n += 4; }
73 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.brokers).len(), flex); let body: usize = (self.brokers).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
74 if version >= 2 { n += if flex { compact_nullable_string_len(self.cluster_id.as_deref()) } else { nullable_string_len(self.cluster_id.as_deref()) }; }
75 if version >= 1 { n += 4; }
76 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.topics).len(), flex); let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
77 if version >= 8 && version <= 10 { n += 4; }
78 if version >= 13 { n += 2; }
79 if flex {
80 let known_pairs: Vec<(u32, usize)> = Vec::new();
81 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
82 }
83 n
84 }
85}
86
87impl<'de> Decode<'de> for MetadataResponse {
88 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
89 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
90 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
91 }
92 let flex = is_flexible(version);
93 let mut out = Self::default();
94 if version >= 3 { out.throttle_time_ms = get_i32(buf)?; }
95 if version >= 0 { out.brokers = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(MetadataResponseBroker::decode(buf, version)?); } v }; }
96 if version >= 2 { out.cluster_id = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
97 if version >= 1 { out.controller_id = get_i32(buf)?; }
98 if version >= 0 { out.topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(MetadataResponseTopic::decode(buf, version)?); } v }; }
99 if version >= 8 && version <= 10 { out.cluster_authorized_operations = get_i32(buf)?; }
100 if version >= 13 { out.error_code = get_i16(buf)?; }
101 if flex {
102 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
103 Ok(false)
104 })?;
105 }
106 Ok(out)
107 }
108}
109
/// One entry of `MetadataResponse::brokers`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct MetadataResponseBroker {
    /// `i32`, present in every version.
    pub node_id: i32,
    /// Non-nullable string, present in every version.
    pub host: String,
    /// `i32`, present in every version.
    pub port: i32,
    /// Nullable string, present on the wire for versions 1 and above.
    pub rack: Option<String>,
    /// Tagged fields collected by `read_tagged_fields` while decoding a flexible
    /// version (9+); written back verbatim by `encode` on flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
118
119impl Encode for MetadataResponseBroker {
120 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
121 let flex = version >= 9;
122 if version >= 0 { put_i32(buf, self.node_id) }
123 if version >= 0 { if flex { put_compact_string(buf, &self.host) } else { put_string(buf, &self.host) } }
124 if version >= 0 { put_i32(buf, self.port) }
125 if version >= 1 { if flex { put_compact_nullable_string(buf, self.rack.as_deref()) } else { put_nullable_string(buf, self.rack.as_deref()) } }
126 if flex {
127 let tagged = WriteTaggedFields::new();
128 tagged.write(buf, &self.unknown_tagged_fields);
129 }
130 Ok(())
131 }
132 fn encoded_len(&self, version: i16) -> usize {
133 let flex = version >= 9;
134 let mut n: usize = 0;
135 if version >= 0 { n += 4; }
136 if version >= 0 { n += if flex { compact_string_len(&self.host) } else { string_len(&self.host) }; }
137 if version >= 0 { n += 4; }
138 if version >= 1 { n += if flex { compact_nullable_string_len(self.rack.as_deref()) } else { nullable_string_len(self.rack.as_deref()) }; }
139 if flex {
140 let known_pairs: Vec<(u32, usize)> = Vec::new();
141 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
142 }
143 n
144 }
145}
146
147impl<'de> Decode<'de> for MetadataResponseBroker {
148 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
149 let flex = version >= 9;
150 let mut out = Self::default();
151 if version >= 0 { out.node_id = get_i32(buf)?; }
152 if version >= 0 { out.host = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
153 if version >= 0 { out.port = get_i32(buf)?; }
154 if version >= 1 { out.rack = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
155 if flex {
156 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
157 Ok(false)
158 })?;
159 }
160 Ok(out)
161 }
162}
163
/// One entry of `MetadataResponse::topics`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetadataResponseTopic {
    /// `i16`, present in every version.
    pub error_code: i16,
    /// String, present in every version. Nullable on the wire only from version 12;
    /// for earlier versions `encode` writes `None` as an empty string and `decode`
    /// always yields `Some`.
    pub name: Option<String>,
    /// UUID, present on the wire for versions 10 and above.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// `bool`, present on the wire for versions 1 and above.
    pub is_internal: bool,
    /// Array of partition entries, present in every version.
    pub partitions: Vec<MetadataResponsePartition>,
    /// `i32`, present on the wire for versions 8 and above; defaults to `i32::MIN`.
    pub topic_authorized_operations: i32,
    /// Tagged fields collected by `read_tagged_fields` while decoding a flexible
    /// version (9+); written back verbatim by `encode` on flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
174
175impl Default for MetadataResponseTopic {
176 fn default() -> Self {
177 Self {
178 error_code: 0i16,
179 name: None,
180 topic_id: Default::default(),
181 is_internal: false,
182 partitions: Vec::new(),
183 topic_authorized_operations: -2_147_483_648i32,
184 unknown_tagged_fields: Default::default(),
185 }
186 }
187}
188
189impl Encode for MetadataResponseTopic {
190 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
191 let flex = version >= 9;
192 if version >= 0 { put_i16(buf, self.error_code) }
193 if version >= 0 { if version >= 12 { if flex { put_compact_nullable_string(buf, self.name.as_deref()) } else { put_nullable_string(buf, self.name.as_deref()) } } else { if flex { put_compact_string(buf, (self.name).as_deref().unwrap_or("")) } else { put_string(buf, (self.name).as_deref().unwrap_or("")) } } }
194 if version >= 10 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
195 if version >= 1 { put_bool(buf, self.is_internal) }
196 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
197 if version >= 8 { put_i32(buf, self.topic_authorized_operations) }
198 if flex {
199 let tagged = WriteTaggedFields::new();
200 tagged.write(buf, &self.unknown_tagged_fields);
201 }
202 Ok(())
203 }
204 fn encoded_len(&self, version: i16) -> usize {
205 let flex = version >= 9;
206 let mut n: usize = 0;
207 if version >= 0 { n += 2; }
208 if version >= 0 { n += if version >= 12 { if flex { compact_nullable_string_len(self.name.as_deref()) } else { nullable_string_len(self.name.as_deref()) } } else { if flex { compact_string_len((self.name).as_deref().unwrap_or("")) } else { string_len((self.name).as_deref().unwrap_or("")) } }; }
209 if version >= 10 { n += 16; }
210 if version >= 1 { n += 1; }
211 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
212 if version >= 8 { n += 4; }
213 if flex {
214 let known_pairs: Vec<(u32, usize)> = Vec::new();
215 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
216 }
217 n
218 }
219}
220
221impl<'de> Decode<'de> for MetadataResponseTopic {
222 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
223 let flex = version >= 9;
224 let mut out = Self::default();
225 if version >= 0 { out.error_code = get_i16(buf)?; }
226 if version >= 0 { out.name = if version >= 12 { if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? } } else { Some(if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }) }; }
227 if version >= 10 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
228 if version >= 1 { out.is_internal = get_bool(buf)?; }
229 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(MetadataResponsePartition::decode(buf, version)?); } v }; }
230 if version >= 8 { out.topic_authorized_operations = get_i32(buf)?; }
231 if flex {
232 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
233 Ok(false)
234 })?;
235 }
236 Ok(out)
237 }
238}
239
/// One entry of `MetadataResponseTopic::partitions`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetadataResponsePartition {
    /// `i16`, present in every version.
    pub error_code: i16,
    /// `i32`, present in every version.
    pub partition_index: i32,
    /// `i32`, present in every version.
    pub leader_id: i32,
    /// `i32`, present on the wire for versions 7 and above; defaults to `-1`.
    pub leader_epoch: i32,
    /// Array of `i32`, present in every version.
    pub replica_nodes: Vec<i32>,
    /// Array of `i32`, present in every version.
    pub isr_nodes: Vec<i32>,
    /// Array of `i32`, present on the wire for versions 5 and above.
    pub offline_replicas: Vec<i32>,
    /// Tagged fields collected by `read_tagged_fields` while decoding a flexible
    /// version (9+); written back verbatim by `encode` on flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
251
252impl Default for MetadataResponsePartition {
253 fn default() -> Self {
254 Self {
255 error_code: 0i16,
256 partition_index: 0i32,
257 leader_id: 0i32,
258 leader_epoch: -1i32,
259 replica_nodes: Vec::new(),
260 isr_nodes: Vec::new(),
261 offline_replicas: Vec::new(),
262 unknown_tagged_fields: Default::default(),
263 }
264 }
265}
266
267impl Encode for MetadataResponsePartition {
268 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
269 let flex = version >= 9;
270 if version >= 0 { put_i16(buf, self.error_code) }
271 if version >= 0 { put_i32(buf, self.partition_index) }
272 if version >= 0 { put_i32(buf, self.leader_id) }
273 if version >= 7 { put_i32(buf, self.leader_epoch) }
274 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.replica_nodes).len(), flex); for it in &self.replica_nodes { put_i32(buf, *it); } } }
275 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.isr_nodes).len(), flex); for it in &self.isr_nodes { put_i32(buf, *it); } } }
276 if version >= 5 { { crate::primitives::array::put_array_len(buf, (self.offline_replicas).len(), flex); for it in &self.offline_replicas { put_i32(buf, *it); } } }
277 if flex {
278 let tagged = WriteTaggedFields::new();
279 tagged.write(buf, &self.unknown_tagged_fields);
280 }
281 Ok(())
282 }
283 fn encoded_len(&self, version: i16) -> usize {
284 let flex = version >= 9;
285 let mut n: usize = 0;
286 if version >= 0 { n += 2; }
287 if version >= 0 { n += 4; }
288 if version >= 0 { n += 4; }
289 if version >= 7 { n += 4; }
290 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.replica_nodes).len(), flex); let body: usize = (self.replica_nodes).iter().map(|_| 4).sum(); prefix + body }; }
291 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.isr_nodes).len(), flex); let body: usize = (self.isr_nodes).iter().map(|_| 4).sum(); prefix + body }; }
292 if version >= 5 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.offline_replicas).len(), flex); let body: usize = (self.offline_replicas).iter().map(|_| 4).sum(); prefix + body }; }
293 if flex {
294 let known_pairs: Vec<(u32, usize)> = Vec::new();
295 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
296 }
297 n
298 }
299}
300
301impl<'de> Decode<'de> for MetadataResponsePartition {
302 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
303 let flex = version >= 9;
304 let mut out = Self::default();
305 if version >= 0 { out.error_code = get_i16(buf)?; }
306 if version >= 0 { out.partition_index = get_i32(buf)?; }
307 if version >= 0 { out.leader_id = get_i32(buf)?; }
308 if version >= 7 { out.leader_epoch = get_i32(buf)?; }
309 if version >= 0 { out.replica_nodes = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i32(buf)?); } v }; }
310 if version >= 0 { out.isr_nodes = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i32(buf)?); } v }; }
311 if version >= 5 { out.offline_replicas = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i32(buf)?); } v }; }
312 if flex {
313 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
314 Ok(false)
315 })?;
316 }
317 Ok(out)
318 }
319}
320
321#[must_use]
324#[allow(unused_comparisons)]
325pub fn default_json(version: i16) -> ::serde_json::Value {
326 let mut obj = ::serde_json::Map::new();
327 if version >= 3 {
328 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
329 }
330 obj.insert("brokers".to_string(), ::serde_json::Value::Array(vec![]));
331 if version >= 2 {
332 obj.insert("clusterId".to_string(), ::serde_json::Value::Null);
333 }
334 if version >= 1 {
335 obj.insert("controllerId".to_string(), ::serde_json::json!(-1));
336 }
337 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
338 if version >= 8 && version <= 10 {
339 obj.insert("clusterAuthorizedOperations".to_string(), ::serde_json::json!(-2147483648));
340 }
341 if version >= 13 {
342 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
343 }
344 ::serde_json::Value::Object(obj)
345}