1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct MetadataRequest {
24 pub topics: Option<Vec<MetadataRequestTopic>>,
28
29 pub allow_auto_topic_creation: bool,
33
34 pub include_cluster_authorized_operations: bool,
38
39 pub include_topic_authorized_operations: bool,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl MetadataRequest {
49 pub fn with_topics(mut self, value: Option<Vec<MetadataRequestTopic>>) -> Self {
55 self.topics = value;
56 self
57 }
58 pub fn with_allow_auto_topic_creation(mut self, value: bool) -> Self {
64 self.allow_auto_topic_creation = value;
65 self
66 }
67 pub fn with_include_cluster_authorized_operations(mut self, value: bool) -> Self {
73 self.include_cluster_authorized_operations = value;
74 self
75 }
76 pub fn with_include_topic_authorized_operations(mut self, value: bool) -> Self {
82 self.include_topic_authorized_operations = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
#[cfg(feature = "client")]
impl Encodable for MetadataRequest {
    /// Writes this request into `buf` using the layout of protocol `version`.
    ///
    /// Fields are emitted in declaration order. A boolean field that the
    /// selected version cannot carry must hold its default value, otherwise an
    /// error is returned (bytes written before the failing field stay in `buf`).
    ///
    /// # Errors
    ///
    /// Fails when a field is set that `version` cannot represent, when more
    /// than `u32::MAX` tagged fields are present, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Version 9 switches to compact encodings and adds the tagged-field section.
        let flexible = version >= 9;

        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }

        if version >= 4 {
            types::Boolean.encode(buf, &self.allow_auto_topic_creation)?;
        } else if !self.allow_auto_topic_creation {
            // This field defaults to `true`, so `false` is the "set" state here.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if (8..=10).contains(&version) {
            types::Boolean.encode(buf, &self.include_cluster_authorized_operations)?;
        } else if self.include_cluster_authorized_operations {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 8 {
            types::Boolean.encode(buf, &self.include_topic_authorized_operations)?;
        } else if self.include_topic_authorized_operations {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if !flexible {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Performs the same version-gate validation as `encode`, in the same order.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 9;

        let mut size: usize = if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.topics)?
        };

        if version >= 4 {
            size += types::Boolean.compute_size(&self.allow_auto_topic_creation)?;
        } else if !self.allow_auto_topic_creation {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if (8..=10).contains(&version) {
            size += types::Boolean.compute_size(&self.include_cluster_authorized_operations)?;
        } else if self.include_cluster_authorized_operations {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 8 {
            size += types::Boolean.compute_size(&self.include_topic_authorized_operations)?;
        } else if self.include_topic_authorized_operations {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if !flexible {
            return Ok(size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
185
#[cfg(feature = "broker")]
impl Decodable for MetadataRequest {
    /// Reads a request from `buf` using the layout of protocol `version`.
    ///
    /// Fields absent from the selected version are filled with their defaults
    /// (`true` for `allow_auto_topic_creation`, `false` for both
    /// `include_*_authorized_operations` flags).
    ///
    /// # Errors
    ///
    /// Propagates any failure from the underlying decoders or from reading a
    /// tagged-field payload out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Version 9 switches to compact encodings and adds the tagged-field section.
        let flexible = version >= 9;

        let decoded_topics = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let auto_create = if version < 4 {
            true
        } else {
            types::Boolean.decode(buf)?
        };

        let cluster_ops = if (8..=10).contains(&version) {
            types::Boolean.decode(buf)?
        } else {
            false
        };

        let topic_ops = if version < 8 {
            false
        } else {
            types::Boolean.decode(buf)?
        };

        let mut tagged = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag replaces the earlier payload.
                tagged.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topics: decoded_topics,
            allow_auto_topic_creation: auto_create,
            include_cluster_authorized_operations: cluster_ops,
            include_topic_authorized_operations: topic_ops,
            unknown_tagged_fields: tagged,
        })
    }
}
228
229impl Default for MetadataRequest {
230 fn default() -> Self {
231 Self {
232 topics: Some(Default::default()),
233 allow_auto_topic_creation: true,
234 include_cluster_authorized_operations: false,
235 include_topic_authorized_operations: false,
236 unknown_tagged_fields: BTreeMap::new(),
237 }
238 }
239}
240
241impl Message for MetadataRequest {
242 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
243 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
244}
245
246#[non_exhaustive]
248#[derive(Debug, Clone, PartialEq)]
249pub struct MetadataRequestTopic {
250 pub topic_id: Uuid,
254
255 pub name: Option<super::TopicName>,
259
260 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
262}
263
264impl MetadataRequestTopic {
265 pub fn with_topic_id(mut self, value: Uuid) -> Self {
271 self.topic_id = value;
272 self
273 }
274 pub fn with_name(mut self, value: Option<super::TopicName>) -> Self {
280 self.name = value;
281 self
282 }
283 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
285 self.unknown_tagged_fields = value;
286 self
287 }
288 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
290 self.unknown_tagged_fields.insert(key, value);
291 self
292 }
293}
294
#[cfg(feature = "client")]
impl Encodable for MetadataRequestTopic {
    /// Writes this topic entry into `buf` using the layout of protocol `version`.
    ///
    /// `topic_id` is written from version 10 on and skipped silently before
    /// that. `name` is always written. The tagged-field section is written
    /// from version 9 on.
    ///
    /// # Errors
    ///
    /// Fails when more than `u32::MAX` tagged fields are present or when a
    /// nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 10 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }

        // Before version 9: plain string encoding and no tagged-field section.
        if version < 9 {
            types::String.encode(buf, &self.name)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.name)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size: usize = 0;
        if version >= 10 {
            size += types::Uuid.compute_size(&self.topic_id)?;
        }

        if version < 9 {
            size += types::String.compute_size(&self.name)?;
            return Ok(size);
        }

        size += types::CompactString.compute_size(&self.name)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
345
#[cfg(feature = "broker")]
impl Decodable for MetadataRequestTopic {
    /// Reads a topic entry from `buf` using the layout of protocol `version`.
    ///
    /// For versions below 10 no topic id is read and the nil UUID is used.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the underlying decoders or from reading a
    /// tagged-field payload out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Version 9 switches to compact encodings and adds the tagged-field section.
        let flexible = version >= 9;

        let id = if version < 10 {
            Uuid::nil()
        } else {
            types::Uuid.decode(buf)?
        };

        let topic_name = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        let mut tagged = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag replaces the earlier payload.
                tagged.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic_id: id,
            name: topic_name,
            unknown_tagged_fields: tagged,
        })
    }
}
376
377impl Default for MetadataRequestTopic {
378 fn default() -> Self {
379 Self {
380 topic_id: Uuid::nil(),
381 name: Some(Default::default()),
382 unknown_tagged_fields: BTreeMap::new(),
383 }
384 }
385}
386
387impl Message for MetadataRequestTopic {
388 const VERSIONS: VersionRange = VersionRange { min: 0, max: 12 };
389 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
390}
391
392impl HeaderVersion for MetadataRequest {
393 fn header_version(version: i16) -> i16 {
394 if version >= 9 {
395 2
396 } else {
397 1
398 }
399 }
400}