1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct MetadataRequest {
24 pub topics: Option<Vec<MetadataRequestTopic>>,
28
29 pub allow_auto_topic_creation: bool,
33
34 pub include_cluster_authorized_operations: bool,
38
39 pub include_topic_authorized_operations: bool,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl MetadataRequest {
49 pub fn with_topics(mut self, value: Option<Vec<MetadataRequestTopic>>) -> Self {
55 self.topics = value;
56 self
57 }
58 pub fn with_allow_auto_topic_creation(mut self, value: bool) -> Self {
64 self.allow_auto_topic_creation = value;
65 self
66 }
67 pub fn with_include_cluster_authorized_operations(mut self, value: bool) -> Self {
73 self.include_cluster_authorized_operations = value;
74 self
75 }
76 pub fn with_include_topic_authorized_operations(mut self, value: bool) -> Self {
82 self.include_topic_authorized_operations = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
97#[cfg(feature = "client")]
98impl Encodable for MetadataRequest {
99 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
100 if version < 0 || version > 12 {
101 bail!("specified version not supported by this message type");
102 }
103 if version >= 9 {
104 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
105 } else {
106 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
107 }
108 if version >= 4 {
109 types::Boolean.encode(buf, &self.allow_auto_topic_creation)?;
110 } else {
111 if !self.allow_auto_topic_creation {
112 bail!("A field is set that is not available on the selected protocol version");
113 }
114 }
115 if version >= 8 && version <= 10 {
116 types::Boolean.encode(buf, &self.include_cluster_authorized_operations)?;
117 } else {
118 if self.include_cluster_authorized_operations {
119 bail!("A field is set that is not available on the selected protocol version");
120 }
121 }
122 if version >= 8 {
123 types::Boolean.encode(buf, &self.include_topic_authorized_operations)?;
124 } else {
125 if self.include_topic_authorized_operations {
126 bail!("A field is set that is not available on the selected protocol version");
127 }
128 }
129 if version >= 9 {
130 let num_tagged_fields = self.unknown_tagged_fields.len();
131 if num_tagged_fields > std::u32::MAX as usize {
132 bail!(
133 "Too many tagged fields to encode ({} fields)",
134 num_tagged_fields
135 );
136 }
137 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
138
139 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
140 }
141 Ok(())
142 }
143 fn compute_size(&self, version: i16) -> Result<usize> {
144 let mut total_size = 0;
145 if version >= 9 {
146 total_size +=
147 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
148 } else {
149 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
150 }
151 if version >= 4 {
152 total_size += types::Boolean.compute_size(&self.allow_auto_topic_creation)?;
153 } else {
154 if !self.allow_auto_topic_creation {
155 bail!("A field is set that is not available on the selected protocol version");
156 }
157 }
158 if version >= 8 && version <= 10 {
159 total_size +=
160 types::Boolean.compute_size(&self.include_cluster_authorized_operations)?;
161 } else {
162 if self.include_cluster_authorized_operations {
163 bail!("A field is set that is not available on the selected protocol version");
164 }
165 }
166 if version >= 8 {
167 total_size += types::Boolean.compute_size(&self.include_topic_authorized_operations)?;
168 } else {
169 if self.include_topic_authorized_operations {
170 bail!("A field is set that is not available on the selected protocol version");
171 }
172 }
173 if version >= 9 {
174 let num_tagged_fields = self.unknown_tagged_fields.len();
175 if num_tagged_fields > std::u32::MAX as usize {
176 bail!(
177 "Too many tagged fields to encode ({} fields)",
178 num_tagged_fields
179 );
180 }
181 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
182
183 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
184 }
185 Ok(total_size)
186 }
187}
188
189#[cfg(feature = "broker")]
190impl Decodable for MetadataRequest {
191 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
192 if version < 0 || version > 12 {
193 bail!("specified version not supported by this message type");
194 }
195 let topics = if version >= 9 {
196 types::CompactArray(types::Struct { version }).decode(buf)?
197 } else {
198 types::Array(types::Struct { version }).decode(buf)?
199 };
200 let allow_auto_topic_creation = if version >= 4 {
201 types::Boolean.decode(buf)?
202 } else {
203 true
204 };
205 let include_cluster_authorized_operations = if version >= 8 && version <= 10 {
206 types::Boolean.decode(buf)?
207 } else {
208 false
209 };
210 let include_topic_authorized_operations = if version >= 8 {
211 types::Boolean.decode(buf)?
212 } else {
213 false
214 };
215 let mut unknown_tagged_fields = BTreeMap::new();
216 if version >= 9 {
217 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
218 for _ in 0..num_tagged_fields {
219 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
220 let size: u32 = types::UnsignedVarInt.decode(buf)?;
221 let unknown_value = buf.try_get_bytes(size as usize)?;
222 unknown_tagged_fields.insert(tag as i32, unknown_value);
223 }
224 }
225 Ok(Self {
226 topics,
227 allow_auto_topic_creation,
228 include_cluster_authorized_operations,
229 include_topic_authorized_operations,
230 unknown_tagged_fields,
231 })
232 }
233}
234
235impl Default for MetadataRequest {
236 fn default() -> Self {
237 Self {
238 topics: Some(Default::default()),
239 allow_auto_topic_creation: true,
240 include_cluster_authorized_operations: false,
241 include_topic_authorized_operations: false,
242 unknown_tagged_fields: BTreeMap::new(),
243 }
244 }
245}
246
247impl Message for MetadataRequest {
248 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
249 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
250}
251
252#[non_exhaustive]
254#[derive(Debug, Clone, PartialEq)]
255pub struct MetadataRequestTopic {
256 pub topic_id: Uuid,
260
261 pub name: Option<super::TopicName>,
265
266 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
268}
269
270impl MetadataRequestTopic {
271 pub fn with_topic_id(mut self, value: Uuid) -> Self {
277 self.topic_id = value;
278 self
279 }
280 pub fn with_name(mut self, value: Option<super::TopicName>) -> Self {
286 self.name = value;
287 self
288 }
289 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
291 self.unknown_tagged_fields = value;
292 self
293 }
294 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
296 self.unknown_tagged_fields.insert(key, value);
297 self
298 }
299}
300
301#[cfg(feature = "client")]
302impl Encodable for MetadataRequestTopic {
303 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
304 if version < 0 || version > 12 {
305 bail!("specified version not supported by this message type");
306 }
307 if version >= 10 {
308 types::Uuid.encode(buf, &self.topic_id)?;
309 }
310 if version >= 9 {
311 types::CompactString.encode(buf, &self.name)?;
312 } else {
313 types::String.encode(buf, &self.name)?;
314 }
315 if version >= 9 {
316 let num_tagged_fields = self.unknown_tagged_fields.len();
317 if num_tagged_fields > std::u32::MAX as usize {
318 bail!(
319 "Too many tagged fields to encode ({} fields)",
320 num_tagged_fields
321 );
322 }
323 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
324
325 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
326 }
327 Ok(())
328 }
329 fn compute_size(&self, version: i16) -> Result<usize> {
330 let mut total_size = 0;
331 if version >= 10 {
332 total_size += types::Uuid.compute_size(&self.topic_id)?;
333 }
334 if version >= 9 {
335 total_size += types::CompactString.compute_size(&self.name)?;
336 } else {
337 total_size += types::String.compute_size(&self.name)?;
338 }
339 if version >= 9 {
340 let num_tagged_fields = self.unknown_tagged_fields.len();
341 if num_tagged_fields > std::u32::MAX as usize {
342 bail!(
343 "Too many tagged fields to encode ({} fields)",
344 num_tagged_fields
345 );
346 }
347 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
348
349 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
350 }
351 Ok(total_size)
352 }
353}
354
355#[cfg(feature = "broker")]
356impl Decodable for MetadataRequestTopic {
357 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
358 if version < 0 || version > 12 {
359 bail!("specified version not supported by this message type");
360 }
361 let topic_id = if version >= 10 {
362 types::Uuid.decode(buf)?
363 } else {
364 Uuid::nil()
365 };
366 let name = if version >= 9 {
367 types::CompactString.decode(buf)?
368 } else {
369 types::String.decode(buf)?
370 };
371 let mut unknown_tagged_fields = BTreeMap::new();
372 if version >= 9 {
373 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
374 for _ in 0..num_tagged_fields {
375 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
376 let size: u32 = types::UnsignedVarInt.decode(buf)?;
377 let unknown_value = buf.try_get_bytes(size as usize)?;
378 unknown_tagged_fields.insert(tag as i32, unknown_value);
379 }
380 }
381 Ok(Self {
382 topic_id,
383 name,
384 unknown_tagged_fields,
385 })
386 }
387}
388
389impl Default for MetadataRequestTopic {
390 fn default() -> Self {
391 Self {
392 topic_id: Uuid::nil(),
393 name: Some(Default::default()),
394 unknown_tagged_fields: BTreeMap::new(),
395 }
396 }
397}
398
399impl Message for MetadataRequestTopic {
400 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
401 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
402}
403
404impl HeaderVersion for MetadataRequest {
405 fn header_version(version: i16) -> i16 {
406 if version >= 9 {
407 2
408 } else {
409 1
410 }
411 }
412}