1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct BrokerRegistrationRequest {
24 pub broker_id: super::BrokerId,
28
29 pub cluster_id: StrBytes,
33
34 pub incarnation_id: Uuid,
38
39 pub listeners: Vec<Listener>,
43
44 pub features: Vec<Feature>,
48
49 pub rack: Option<StrBytes>,
53
54 pub is_migrating_zk_broker: bool,
58
59 pub log_dirs: Vec<Uuid>,
63
64 pub previous_broker_epoch: i64,
68
69 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
71}
72
73impl BrokerRegistrationRequest {
74 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
80 self.broker_id = value;
81 self
82 }
83 pub fn with_cluster_id(mut self, value: StrBytes) -> Self {
89 self.cluster_id = value;
90 self
91 }
92 pub fn with_incarnation_id(mut self, value: Uuid) -> Self {
98 self.incarnation_id = value;
99 self
100 }
101 pub fn with_listeners(mut self, value: Vec<Listener>) -> Self {
107 self.listeners = value;
108 self
109 }
110 pub fn with_features(mut self, value: Vec<Feature>) -> Self {
116 self.features = value;
117 self
118 }
119 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
125 self.rack = value;
126 self
127 }
128 pub fn with_is_migrating_zk_broker(mut self, value: bool) -> Self {
134 self.is_migrating_zk_broker = value;
135 self
136 }
137 pub fn with_log_dirs(mut self, value: Vec<Uuid>) -> Self {
143 self.log_dirs = value;
144 self
145 }
146 pub fn with_previous_broker_epoch(mut self, value: i64) -> Self {
152 self.previous_broker_epoch = value;
153 self
154 }
155 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
157 self.unknown_tagged_fields = value;
158 self
159 }
160 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
162 self.unknown_tagged_fields.insert(key, value);
163 self
164 }
165}
166
167#[cfg(feature = "client")]
168impl Encodable for BrokerRegistrationRequest {
169 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
170 if version < 0 || version > 4 {
171 bail!("specified version not supported by this message type");
172 }
173 types::Int32.encode(buf, &self.broker_id)?;
174 types::CompactString.encode(buf, &self.cluster_id)?;
175 types::Uuid.encode(buf, &self.incarnation_id)?;
176 types::CompactArray(types::Struct { version }).encode(buf, &self.listeners)?;
177 types::CompactArray(types::Struct { version }).encode(buf, &self.features)?;
178 types::CompactString.encode(buf, &self.rack)?;
179 if version >= 1 {
180 types::Boolean.encode(buf, &self.is_migrating_zk_broker)?;
181 } else {
182 if self.is_migrating_zk_broker {
183 bail!("A field is set that is not available on the selected protocol version");
184 }
185 }
186 if version >= 2 {
187 types::CompactArray(types::Uuid).encode(buf, &self.log_dirs)?;
188 }
189 if version >= 3 {
190 types::Int64.encode(buf, &self.previous_broker_epoch)?;
191 }
192 let num_tagged_fields = self.unknown_tagged_fields.len();
193 if num_tagged_fields > std::u32::MAX as usize {
194 bail!(
195 "Too many tagged fields to encode ({} fields)",
196 num_tagged_fields
197 );
198 }
199 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
200
201 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
202 Ok(())
203 }
204 fn compute_size(&self, version: i16) -> Result<usize> {
205 let mut total_size = 0;
206 total_size += types::Int32.compute_size(&self.broker_id)?;
207 total_size += types::CompactString.compute_size(&self.cluster_id)?;
208 total_size += types::Uuid.compute_size(&self.incarnation_id)?;
209 total_size +=
210 types::CompactArray(types::Struct { version }).compute_size(&self.listeners)?;
211 total_size +=
212 types::CompactArray(types::Struct { version }).compute_size(&self.features)?;
213 total_size += types::CompactString.compute_size(&self.rack)?;
214 if version >= 1 {
215 total_size += types::Boolean.compute_size(&self.is_migrating_zk_broker)?;
216 } else {
217 if self.is_migrating_zk_broker {
218 bail!("A field is set that is not available on the selected protocol version");
219 }
220 }
221 if version >= 2 {
222 total_size += types::CompactArray(types::Uuid).compute_size(&self.log_dirs)?;
223 }
224 if version >= 3 {
225 total_size += types::Int64.compute_size(&self.previous_broker_epoch)?;
226 }
227 let num_tagged_fields = self.unknown_tagged_fields.len();
228 if num_tagged_fields > std::u32::MAX as usize {
229 bail!(
230 "Too many tagged fields to encode ({} fields)",
231 num_tagged_fields
232 );
233 }
234 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
235
236 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
237 Ok(total_size)
238 }
239}
240
241#[cfg(feature = "broker")]
242impl Decodable for BrokerRegistrationRequest {
243 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
244 if version < 0 || version > 4 {
245 bail!("specified version not supported by this message type");
246 }
247 let broker_id = types::Int32.decode(buf)?;
248 let cluster_id = types::CompactString.decode(buf)?;
249 let incarnation_id = types::Uuid.decode(buf)?;
250 let listeners = types::CompactArray(types::Struct { version }).decode(buf)?;
251 let features = types::CompactArray(types::Struct { version }).decode(buf)?;
252 let rack = types::CompactString.decode(buf)?;
253 let is_migrating_zk_broker = if version >= 1 {
254 types::Boolean.decode(buf)?
255 } else {
256 false
257 };
258 let log_dirs = if version >= 2 {
259 types::CompactArray(types::Uuid).decode(buf)?
260 } else {
261 Default::default()
262 };
263 let previous_broker_epoch = if version >= 3 {
264 types::Int64.decode(buf)?
265 } else {
266 -1
267 };
268 let mut unknown_tagged_fields = BTreeMap::new();
269 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
270 for _ in 0..num_tagged_fields {
271 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
272 let size: u32 = types::UnsignedVarInt.decode(buf)?;
273 let unknown_value = buf.try_get_bytes(size as usize)?;
274 unknown_tagged_fields.insert(tag as i32, unknown_value);
275 }
276 Ok(Self {
277 broker_id,
278 cluster_id,
279 incarnation_id,
280 listeners,
281 features,
282 rack,
283 is_migrating_zk_broker,
284 log_dirs,
285 previous_broker_epoch,
286 unknown_tagged_fields,
287 })
288 }
289}
290
291impl Default for BrokerRegistrationRequest {
292 fn default() -> Self {
293 Self {
294 broker_id: (0).into(),
295 cluster_id: Default::default(),
296 incarnation_id: Uuid::nil(),
297 listeners: Default::default(),
298 features: Default::default(),
299 rack: Some(Default::default()),
300 is_migrating_zk_broker: false,
301 log_dirs: Default::default(),
302 previous_broker_epoch: -1,
303 unknown_tagged_fields: BTreeMap::new(),
304 }
305 }
306}
307
308impl Message for BrokerRegistrationRequest {
309 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
310 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
311}
312
313#[non_exhaustive]
315#[derive(Debug, Clone, PartialEq)]
316pub struct Feature {
317 pub name: StrBytes,
321
322 pub min_supported_version: i16,
326
327 pub max_supported_version: i16,
331
332 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
334}
335
336impl Feature {
337 pub fn with_name(mut self, value: StrBytes) -> Self {
343 self.name = value;
344 self
345 }
346 pub fn with_min_supported_version(mut self, value: i16) -> Self {
352 self.min_supported_version = value;
353 self
354 }
355 pub fn with_max_supported_version(mut self, value: i16) -> Self {
361 self.max_supported_version = value;
362 self
363 }
364 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
366 self.unknown_tagged_fields = value;
367 self
368 }
369 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
371 self.unknown_tagged_fields.insert(key, value);
372 self
373 }
374}
375
376#[cfg(feature = "client")]
377impl Encodable for Feature {
378 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
379 if version < 0 || version > 4 {
380 bail!("specified version not supported by this message type");
381 }
382 types::CompactString.encode(buf, &self.name)?;
383 types::Int16.encode(buf, &self.min_supported_version)?;
384 types::Int16.encode(buf, &self.max_supported_version)?;
385 let num_tagged_fields = self.unknown_tagged_fields.len();
386 if num_tagged_fields > std::u32::MAX as usize {
387 bail!(
388 "Too many tagged fields to encode ({} fields)",
389 num_tagged_fields
390 );
391 }
392 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
393
394 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
395 Ok(())
396 }
397 fn compute_size(&self, version: i16) -> Result<usize> {
398 let mut total_size = 0;
399 total_size += types::CompactString.compute_size(&self.name)?;
400 total_size += types::Int16.compute_size(&self.min_supported_version)?;
401 total_size += types::Int16.compute_size(&self.max_supported_version)?;
402 let num_tagged_fields = self.unknown_tagged_fields.len();
403 if num_tagged_fields > std::u32::MAX as usize {
404 bail!(
405 "Too many tagged fields to encode ({} fields)",
406 num_tagged_fields
407 );
408 }
409 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
410
411 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
412 Ok(total_size)
413 }
414}
415
416#[cfg(feature = "broker")]
417impl Decodable for Feature {
418 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
419 if version < 0 || version > 4 {
420 bail!("specified version not supported by this message type");
421 }
422 let name = types::CompactString.decode(buf)?;
423 let min_supported_version = types::Int16.decode(buf)?;
424 let max_supported_version = types::Int16.decode(buf)?;
425 let mut unknown_tagged_fields = BTreeMap::new();
426 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
427 for _ in 0..num_tagged_fields {
428 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
429 let size: u32 = types::UnsignedVarInt.decode(buf)?;
430 let unknown_value = buf.try_get_bytes(size as usize)?;
431 unknown_tagged_fields.insert(tag as i32, unknown_value);
432 }
433 Ok(Self {
434 name,
435 min_supported_version,
436 max_supported_version,
437 unknown_tagged_fields,
438 })
439 }
440}
441
442impl Default for Feature {
443 fn default() -> Self {
444 Self {
445 name: Default::default(),
446 min_supported_version: 0,
447 max_supported_version: 0,
448 unknown_tagged_fields: BTreeMap::new(),
449 }
450 }
451}
452
453impl Message for Feature {
454 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
455 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
456}
457
458#[non_exhaustive]
460#[derive(Debug, Clone, PartialEq)]
461pub struct Listener {
462 pub name: StrBytes,
466
467 pub host: StrBytes,
471
472 pub port: u16,
476
477 pub security_protocol: i16,
481
482 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
484}
485
486impl Listener {
487 pub fn with_name(mut self, value: StrBytes) -> Self {
493 self.name = value;
494 self
495 }
496 pub fn with_host(mut self, value: StrBytes) -> Self {
502 self.host = value;
503 self
504 }
505 pub fn with_port(mut self, value: u16) -> Self {
511 self.port = value;
512 self
513 }
514 pub fn with_security_protocol(mut self, value: i16) -> Self {
520 self.security_protocol = value;
521 self
522 }
523 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
525 self.unknown_tagged_fields = value;
526 self
527 }
528 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
530 self.unknown_tagged_fields.insert(key, value);
531 self
532 }
533}
534
535#[cfg(feature = "client")]
536impl Encodable for Listener {
537 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
538 if version < 0 || version > 4 {
539 bail!("specified version not supported by this message type");
540 }
541 types::CompactString.encode(buf, &self.name)?;
542 types::CompactString.encode(buf, &self.host)?;
543 types::UInt16.encode(buf, &self.port)?;
544 types::Int16.encode(buf, &self.security_protocol)?;
545 let num_tagged_fields = self.unknown_tagged_fields.len();
546 if num_tagged_fields > std::u32::MAX as usize {
547 bail!(
548 "Too many tagged fields to encode ({} fields)",
549 num_tagged_fields
550 );
551 }
552 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
553
554 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
555 Ok(())
556 }
557 fn compute_size(&self, version: i16) -> Result<usize> {
558 let mut total_size = 0;
559 total_size += types::CompactString.compute_size(&self.name)?;
560 total_size += types::CompactString.compute_size(&self.host)?;
561 total_size += types::UInt16.compute_size(&self.port)?;
562 total_size += types::Int16.compute_size(&self.security_protocol)?;
563 let num_tagged_fields = self.unknown_tagged_fields.len();
564 if num_tagged_fields > std::u32::MAX as usize {
565 bail!(
566 "Too many tagged fields to encode ({} fields)",
567 num_tagged_fields
568 );
569 }
570 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
571
572 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
573 Ok(total_size)
574 }
575}
576
577#[cfg(feature = "broker")]
578impl Decodable for Listener {
579 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
580 if version < 0 || version > 4 {
581 bail!("specified version not supported by this message type");
582 }
583 let name = types::CompactString.decode(buf)?;
584 let host = types::CompactString.decode(buf)?;
585 let port = types::UInt16.decode(buf)?;
586 let security_protocol = types::Int16.decode(buf)?;
587 let mut unknown_tagged_fields = BTreeMap::new();
588 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
589 for _ in 0..num_tagged_fields {
590 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
591 let size: u32 = types::UnsignedVarInt.decode(buf)?;
592 let unknown_value = buf.try_get_bytes(size as usize)?;
593 unknown_tagged_fields.insert(tag as i32, unknown_value);
594 }
595 Ok(Self {
596 name,
597 host,
598 port,
599 security_protocol,
600 unknown_tagged_fields,
601 })
602 }
603}
604
605impl Default for Listener {
606 fn default() -> Self {
607 Self {
608 name: Default::default(),
609 host: Default::default(),
610 port: 0,
611 security_protocol: 0,
612 unknown_tagged_fields: BTreeMap::new(),
613 }
614 }
615}
616
617impl Message for Listener {
618 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
619 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
620}
621
622impl HeaderVersion for BrokerRegistrationRequest {
623 fn header_version(version: i16) -> i16 {
624 2
625 }
626}