1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct DescribeClusterBroker {
24 pub broker_id: super::BrokerId,
28
29 pub host: StrBytes,
33
34 pub port: i32,
38
39 pub rack: Option<StrBytes>,
43
44 pub is_fenced: bool,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl DescribeClusterBroker {
54 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
60 self.broker_id = value;
61 self
62 }
63 pub fn with_host(mut self, value: StrBytes) -> Self {
69 self.host = value;
70 self
71 }
72 pub fn with_port(mut self, value: i32) -> Self {
78 self.port = value;
79 self
80 }
81 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
87 self.rack = value;
88 self
89 }
90 pub fn with_is_fenced(mut self, value: bool) -> Self {
96 self.is_fenced = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
#[cfg(feature = "broker")]
impl Encodable for DescribeClusterBroker {
    /// Serializes this broker entry into `buf` using the layout for `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=2`, when `is_fenced` is `true` for
    /// a version below 2, when there are more than `u32::MAX` unknown tagged
    /// fields, or when any underlying field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Fields present in every supported version, in wire order.
        types::Int32.encode(buf, &self.broker_id)?;
        types::CompactString.encode(buf, &self.host)?;
        types::Int32.encode(buf, &self.port)?;
        types::CompactString.encode(buf, &self.rack)?;

        // `is_fenced` only exists on the wire from version 2; older versions
        // can only represent the `false` value.
        if version >= 2 {
            types::Boolean.encode(buf, &self.is_fenced)?;
        } else if self.is_fenced {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Tagged-field section: a count followed by the raw unknown fields.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// The version range itself is not validated here.
    ///
    /// # Errors
    ///
    /// Fails when `is_fenced` is `true` for a version below 2, when there are
    /// more than `u32::MAX` unknown tagged fields, or when any underlying size
    /// computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.broker_id)?;
        size += types::CompactString.compute_size(&self.host)?;
        size += types::Int32.compute_size(&self.port)?;
        size += types::CompactString.compute_size(&self.rack)?;

        size += if version >= 2 {
            types::Boolean.compute_size(&self.is_fenced)?
        } else if self.is_fenced {
            bail!("A field is set that is not available on the selected protocol version");
        } else {
            0
        };

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
166
#[cfg(feature = "client")]
impl Decodable for DescribeClusterBroker {
    /// Reads one broker entry from `buf` using the layout for `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=2` or when any field cannot be
    /// read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Field initializers run top to bottom, which is the wire order.
        let mut decoded = Self {
            broker_id: types::Int32.decode(buf)?,
            host: types::CompactString.decode(buf)?,
            port: types::Int32.decode(buf)?,
            rack: types::CompactString.decode(buf)?,
            // Absent before version 2; fall back to `false`.
            is_fenced: if version >= 2 {
                types::Boolean.decode(buf)?
            } else {
                false
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        // Every tagged field is unknown to this type: keep tag and raw payload.
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            decoded.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(decoded)
    }
}
200
201impl Default for DescribeClusterBroker {
202 fn default() -> Self {
203 Self {
204 broker_id: (0).into(),
205 host: Default::default(),
206 port: 0,
207 rack: None,
208 is_fenced: false,
209 unknown_tagged_fields: BTreeMap::new(),
210 }
211 }
212}
213
214impl Message for DescribeClusterBroker {
215 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
216 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
217}
218
219#[non_exhaustive]
221#[derive(Debug, Clone, PartialEq)]
222pub struct DescribeClusterResponse {
223 pub throttle_time_ms: i32,
227
228 pub error_code: i16,
232
233 pub error_message: Option<StrBytes>,
237
238 pub endpoint_type: i8,
242
243 pub cluster_id: StrBytes,
247
248 pub controller_id: super::BrokerId,
252
253 pub brokers: Vec<DescribeClusterBroker>,
257
258 pub cluster_authorized_operations: i32,
262
263 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
265}
266
267impl DescribeClusterResponse {
268 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
274 self.throttle_time_ms = value;
275 self
276 }
277 pub fn with_error_code(mut self, value: i16) -> Self {
283 self.error_code = value;
284 self
285 }
286 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
292 self.error_message = value;
293 self
294 }
295 pub fn with_endpoint_type(mut self, value: i8) -> Self {
301 self.endpoint_type = value;
302 self
303 }
304 pub fn with_cluster_id(mut self, value: StrBytes) -> Self {
310 self.cluster_id = value;
311 self
312 }
313 pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
319 self.controller_id = value;
320 self
321 }
322 pub fn with_brokers(mut self, value: Vec<DescribeClusterBroker>) -> Self {
328 self.brokers = value;
329 self
330 }
331 pub fn with_cluster_authorized_operations(mut self, value: i32) -> Self {
337 self.cluster_authorized_operations = value;
338 self
339 }
340 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
342 self.unknown_tagged_fields = value;
343 self
344 }
345 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
347 self.unknown_tagged_fields.insert(key, value);
348 self
349 }
350}
351
#[cfg(feature = "broker")]
impl Encodable for DescribeClusterResponse {
    /// Serializes this response into `buf` using the layout for `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=2`, when `endpoint_type` differs
    /// from 1 on version 0, when there are more than `u32::MAX` unknown tagged
    /// fields, or when any underlying field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.throttle_time_ms)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::CompactString.encode(buf, &self.error_message)?;

        // `endpoint_type` only exists on the wire from version 1; version 0
        // can only represent the value 1.
        if version >= 1 {
            types::Int8.encode(buf, &self.endpoint_type)?;
        } else if self.endpoint_type != 1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        types::CompactString.encode(buf, &self.cluster_id)?;
        types::Int32.encode(buf, &self.controller_id)?;
        // Broker entries are encoded with the same version as the response.
        types::CompactArray(types::Struct { version }).encode(buf, &self.brokers)?;
        types::Int32.encode(buf, &self.cluster_authorized_operations)?;

        // Tagged-field section: a count followed by the raw unknown fields.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would emit for `version`.
    ///
    /// The version range itself is not validated here.
    ///
    /// # Errors
    ///
    /// Fails when `endpoint_type` differs from 1 on a version below 1, when
    /// there are more than `u32::MAX` unknown tagged fields, or when any
    /// underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.throttle_time_ms)?;
        size += types::Int16.compute_size(&self.error_code)?;
        size += types::CompactString.compute_size(&self.error_message)?;

        size += if version >= 1 {
            types::Int8.compute_size(&self.endpoint_type)?
        } else if self.endpoint_type != 1 {
            bail!("A field is set that is not available on the selected protocol version");
        } else {
            0
        };

        size += types::CompactString.compute_size(&self.cluster_id)?;
        size += types::Int32.compute_size(&self.controller_id)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.brokers)?;
        size += types::Int32.compute_size(&self.cluster_authorized_operations)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
413
#[cfg(feature = "client")]
impl Decodable for DescribeClusterResponse {
    /// Reads a response from `buf` using the layout for `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=2` or when any field cannot be
    /// read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Field initializers run top to bottom, which is the wire order.
        let mut decoded = Self {
            throttle_time_ms: types::Int32.decode(buf)?,
            error_code: types::Int16.decode(buf)?,
            error_message: types::CompactString.decode(buf)?,
            // Absent on version 0; fall back to 1.
            endpoint_type: if version >= 1 {
                types::Int8.decode(buf)?
            } else {
                1
            },
            cluster_id: types::CompactString.decode(buf)?,
            controller_id: types::Int32.decode(buf)?,
            // Broker entries are decoded with the same version as the response.
            brokers: types::CompactArray(types::Struct { version }).decode(buf)?,
            cluster_authorized_operations: types::Int32.decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        // Every tagged field is unknown to this type: keep tag and raw payload.
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            decoded.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(decoded)
    }
}
453
454impl Default for DescribeClusterResponse {
455 fn default() -> Self {
456 Self {
457 throttle_time_ms: 0,
458 error_code: 0,
459 error_message: None,
460 endpoint_type: 1,
461 cluster_id: Default::default(),
462 controller_id: (-1).into(),
463 brokers: Default::default(),
464 cluster_authorized_operations: -2147483648,
465 unknown_tagged_fields: BTreeMap::new(),
466 }
467 }
468}
469
470impl Message for DescribeClusterResponse {
471 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
472 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
473}
474
475impl HeaderVersion for DescribeClusterResponse {
476 fn header_version(version: i16) -> i16 {
477 1
478 }
479}