1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct DescribeClusterBroker {
24 pub broker_id: super::BrokerId,
28
29 pub host: StrBytes,
33
34 pub port: i32,
38
39 pub rack: Option<StrBytes>,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl DescribeClusterBroker {
49 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
55 self.broker_id = value;
56 self
57 }
58 pub fn with_host(mut self, value: StrBytes) -> Self {
64 self.host = value;
65 self
66 }
67 pub fn with_port(mut self, value: i32) -> Self {
73 self.port = value;
74 self
75 }
76 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
82 self.rack = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
97#[cfg(feature = "broker")]
98impl Encodable for DescribeClusterBroker {
99 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
100 types::Int32.encode(buf, &self.broker_id)?;
101 types::CompactString.encode(buf, &self.host)?;
102 types::Int32.encode(buf, &self.port)?;
103 types::CompactString.encode(buf, &self.rack)?;
104 let num_tagged_fields = self.unknown_tagged_fields.len();
105 if num_tagged_fields > std::u32::MAX as usize {
106 bail!(
107 "Too many tagged fields to encode ({} fields)",
108 num_tagged_fields
109 );
110 }
111 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
112
113 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
114 Ok(())
115 }
116 fn compute_size(&self, version: i16) -> Result<usize> {
117 let mut total_size = 0;
118 total_size += types::Int32.compute_size(&self.broker_id)?;
119 total_size += types::CompactString.compute_size(&self.host)?;
120 total_size += types::Int32.compute_size(&self.port)?;
121 total_size += types::CompactString.compute_size(&self.rack)?;
122 let num_tagged_fields = self.unknown_tagged_fields.len();
123 if num_tagged_fields > std::u32::MAX as usize {
124 bail!(
125 "Too many tagged fields to encode ({} fields)",
126 num_tagged_fields
127 );
128 }
129 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
130
131 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
132 Ok(total_size)
133 }
134}
135
136#[cfg(feature = "client")]
137impl Decodable for DescribeClusterBroker {
138 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
139 let broker_id = types::Int32.decode(buf)?;
140 let host = types::CompactString.decode(buf)?;
141 let port = types::Int32.decode(buf)?;
142 let rack = types::CompactString.decode(buf)?;
143 let mut unknown_tagged_fields = BTreeMap::new();
144 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
145 for _ in 0..num_tagged_fields {
146 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
147 let size: u32 = types::UnsignedVarInt.decode(buf)?;
148 let unknown_value = buf.try_get_bytes(size as usize)?;
149 unknown_tagged_fields.insert(tag as i32, unknown_value);
150 }
151 Ok(Self {
152 broker_id,
153 host,
154 port,
155 rack,
156 unknown_tagged_fields,
157 })
158 }
159}
160
161impl Default for DescribeClusterBroker {
162 fn default() -> Self {
163 Self {
164 broker_id: (0).into(),
165 host: Default::default(),
166 port: 0,
167 rack: None,
168 unknown_tagged_fields: BTreeMap::new(),
169 }
170 }
171}
172
173impl Message for DescribeClusterBroker {
174 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
175 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
176}
177
178#[non_exhaustive]
180#[derive(Debug, Clone, PartialEq)]
181pub struct DescribeClusterResponse {
182 pub throttle_time_ms: i32,
186
187 pub error_code: i16,
191
192 pub error_message: Option<StrBytes>,
196
197 pub endpoint_type: i8,
201
202 pub cluster_id: StrBytes,
206
207 pub controller_id: super::BrokerId,
211
212 pub brokers: Vec<DescribeClusterBroker>,
216
217 pub cluster_authorized_operations: i32,
221
222 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
224}
225
226impl DescribeClusterResponse {
227 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
233 self.throttle_time_ms = value;
234 self
235 }
236 pub fn with_error_code(mut self, value: i16) -> Self {
242 self.error_code = value;
243 self
244 }
245 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
251 self.error_message = value;
252 self
253 }
254 pub fn with_endpoint_type(mut self, value: i8) -> Self {
260 self.endpoint_type = value;
261 self
262 }
263 pub fn with_cluster_id(mut self, value: StrBytes) -> Self {
269 self.cluster_id = value;
270 self
271 }
272 pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
278 self.controller_id = value;
279 self
280 }
281 pub fn with_brokers(mut self, value: Vec<DescribeClusterBroker>) -> Self {
287 self.brokers = value;
288 self
289 }
290 pub fn with_cluster_authorized_operations(mut self, value: i32) -> Self {
296 self.cluster_authorized_operations = value;
297 self
298 }
299 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
301 self.unknown_tagged_fields = value;
302 self
303 }
304 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
306 self.unknown_tagged_fields.insert(key, value);
307 self
308 }
309}
310
311#[cfg(feature = "broker")]
312impl Encodable for DescribeClusterResponse {
313 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
314 types::Int32.encode(buf, &self.throttle_time_ms)?;
315 types::Int16.encode(buf, &self.error_code)?;
316 types::CompactString.encode(buf, &self.error_message)?;
317 if version >= 1 {
318 types::Int8.encode(buf, &self.endpoint_type)?;
319 } else {
320 if self.endpoint_type != 1 {
321 bail!("A field is set that is not available on the selected protocol version");
322 }
323 }
324 types::CompactString.encode(buf, &self.cluster_id)?;
325 types::Int32.encode(buf, &self.controller_id)?;
326 types::CompactArray(types::Struct { version }).encode(buf, &self.brokers)?;
327 types::Int32.encode(buf, &self.cluster_authorized_operations)?;
328 let num_tagged_fields = self.unknown_tagged_fields.len();
329 if num_tagged_fields > std::u32::MAX as usize {
330 bail!(
331 "Too many tagged fields to encode ({} fields)",
332 num_tagged_fields
333 );
334 }
335 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
336
337 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
338 Ok(())
339 }
340 fn compute_size(&self, version: i16) -> Result<usize> {
341 let mut total_size = 0;
342 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
343 total_size += types::Int16.compute_size(&self.error_code)?;
344 total_size += types::CompactString.compute_size(&self.error_message)?;
345 if version >= 1 {
346 total_size += types::Int8.compute_size(&self.endpoint_type)?;
347 } else {
348 if self.endpoint_type != 1 {
349 bail!("A field is set that is not available on the selected protocol version");
350 }
351 }
352 total_size += types::CompactString.compute_size(&self.cluster_id)?;
353 total_size += types::Int32.compute_size(&self.controller_id)?;
354 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.brokers)?;
355 total_size += types::Int32.compute_size(&self.cluster_authorized_operations)?;
356 let num_tagged_fields = self.unknown_tagged_fields.len();
357 if num_tagged_fields > std::u32::MAX as usize {
358 bail!(
359 "Too many tagged fields to encode ({} fields)",
360 num_tagged_fields
361 );
362 }
363 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
364
365 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
366 Ok(total_size)
367 }
368}
369
370#[cfg(feature = "client")]
371impl Decodable for DescribeClusterResponse {
372 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
373 let throttle_time_ms = types::Int32.decode(buf)?;
374 let error_code = types::Int16.decode(buf)?;
375 let error_message = types::CompactString.decode(buf)?;
376 let endpoint_type = if version >= 1 {
377 types::Int8.decode(buf)?
378 } else {
379 1
380 };
381 let cluster_id = types::CompactString.decode(buf)?;
382 let controller_id = types::Int32.decode(buf)?;
383 let brokers = types::CompactArray(types::Struct { version }).decode(buf)?;
384 let cluster_authorized_operations = types::Int32.decode(buf)?;
385 let mut unknown_tagged_fields = BTreeMap::new();
386 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
387 for _ in 0..num_tagged_fields {
388 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
389 let size: u32 = types::UnsignedVarInt.decode(buf)?;
390 let unknown_value = buf.try_get_bytes(size as usize)?;
391 unknown_tagged_fields.insert(tag as i32, unknown_value);
392 }
393 Ok(Self {
394 throttle_time_ms,
395 error_code,
396 error_message,
397 endpoint_type,
398 cluster_id,
399 controller_id,
400 brokers,
401 cluster_authorized_operations,
402 unknown_tagged_fields,
403 })
404 }
405}
406
407impl Default for DescribeClusterResponse {
408 fn default() -> Self {
409 Self {
410 throttle_time_ms: 0,
411 error_code: 0,
412 error_message: None,
413 endpoint_type: 1,
414 cluster_id: Default::default(),
415 controller_id: (-1).into(),
416 brokers: Default::default(),
417 cluster_authorized_operations: -2147483648,
418 unknown_tagged_fields: BTreeMap::new(),
419 }
420 }
421}
422
423impl Message for DescribeClusterResponse {
424 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
425 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
426}
427
428impl HeaderVersion for DescribeClusterResponse {
429 fn header_version(version: i16) -> i16 {
430 1
431 }
432}