1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct BrokerRegistrationRequest {
24 pub broker_id: super::BrokerId,
28
29 pub cluster_id: StrBytes,
33
34 pub incarnation_id: Uuid,
38
39 pub listeners: Vec<Listener>,
43
44 pub features: Vec<Feature>,
48
49 pub rack: Option<StrBytes>,
53
54 pub is_migrating_zk_broker: bool,
58
59 pub log_dirs: Vec<Uuid>,
63
64 pub previous_broker_epoch: i64,
68
69 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
71}
72
73impl BrokerRegistrationRequest {
74 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
80 self.broker_id = value;
81 self
82 }
83 pub fn with_cluster_id(mut self, value: StrBytes) -> Self {
89 self.cluster_id = value;
90 self
91 }
92 pub fn with_incarnation_id(mut self, value: Uuid) -> Self {
98 self.incarnation_id = value;
99 self
100 }
101 pub fn with_listeners(mut self, value: Vec<Listener>) -> Self {
107 self.listeners = value;
108 self
109 }
110 pub fn with_features(mut self, value: Vec<Feature>) -> Self {
116 self.features = value;
117 self
118 }
119 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
125 self.rack = value;
126 self
127 }
128 pub fn with_is_migrating_zk_broker(mut self, value: bool) -> Self {
134 self.is_migrating_zk_broker = value;
135 self
136 }
137 pub fn with_log_dirs(mut self, value: Vec<Uuid>) -> Self {
143 self.log_dirs = value;
144 self
145 }
146 pub fn with_previous_broker_epoch(mut self, value: i64) -> Self {
152 self.previous_broker_epoch = value;
153 self
154 }
155 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
157 self.unknown_tagged_fields = value;
158 self
159 }
160 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
162 self.unknown_tagged_fields.insert(key, value);
163 self
164 }
165}
166
167#[cfg(feature = "client")]
168impl Encodable for BrokerRegistrationRequest {
169 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
170 types::Int32.encode(buf, &self.broker_id)?;
171 types::CompactString.encode(buf, &self.cluster_id)?;
172 types::Uuid.encode(buf, &self.incarnation_id)?;
173 types::CompactArray(types::Struct { version }).encode(buf, &self.listeners)?;
174 types::CompactArray(types::Struct { version }).encode(buf, &self.features)?;
175 types::CompactString.encode(buf, &self.rack)?;
176 if version >= 1 {
177 types::Boolean.encode(buf, &self.is_migrating_zk_broker)?;
178 } else {
179 if self.is_migrating_zk_broker {
180 bail!("A field is set that is not available on the selected protocol version");
181 }
182 }
183 if version >= 2 {
184 types::CompactArray(types::Uuid).encode(buf, &self.log_dirs)?;
185 }
186 if version >= 3 {
187 types::Int64.encode(buf, &self.previous_broker_epoch)?;
188 }
189 let num_tagged_fields = self.unknown_tagged_fields.len();
190 if num_tagged_fields > std::u32::MAX as usize {
191 bail!(
192 "Too many tagged fields to encode ({} fields)",
193 num_tagged_fields
194 );
195 }
196 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
197
198 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
199 Ok(())
200 }
201 fn compute_size(&self, version: i16) -> Result<usize> {
202 let mut total_size = 0;
203 total_size += types::Int32.compute_size(&self.broker_id)?;
204 total_size += types::CompactString.compute_size(&self.cluster_id)?;
205 total_size += types::Uuid.compute_size(&self.incarnation_id)?;
206 total_size +=
207 types::CompactArray(types::Struct { version }).compute_size(&self.listeners)?;
208 total_size +=
209 types::CompactArray(types::Struct { version }).compute_size(&self.features)?;
210 total_size += types::CompactString.compute_size(&self.rack)?;
211 if version >= 1 {
212 total_size += types::Boolean.compute_size(&self.is_migrating_zk_broker)?;
213 } else {
214 if self.is_migrating_zk_broker {
215 bail!("A field is set that is not available on the selected protocol version");
216 }
217 }
218 if version >= 2 {
219 total_size += types::CompactArray(types::Uuid).compute_size(&self.log_dirs)?;
220 }
221 if version >= 3 {
222 total_size += types::Int64.compute_size(&self.previous_broker_epoch)?;
223 }
224 let num_tagged_fields = self.unknown_tagged_fields.len();
225 if num_tagged_fields > std::u32::MAX as usize {
226 bail!(
227 "Too many tagged fields to encode ({} fields)",
228 num_tagged_fields
229 );
230 }
231 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
232
233 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
234 Ok(total_size)
235 }
236}
237
238#[cfg(feature = "broker")]
239impl Decodable for BrokerRegistrationRequest {
240 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
241 let broker_id = types::Int32.decode(buf)?;
242 let cluster_id = types::CompactString.decode(buf)?;
243 let incarnation_id = types::Uuid.decode(buf)?;
244 let listeners = types::CompactArray(types::Struct { version }).decode(buf)?;
245 let features = types::CompactArray(types::Struct { version }).decode(buf)?;
246 let rack = types::CompactString.decode(buf)?;
247 let is_migrating_zk_broker = if version >= 1 {
248 types::Boolean.decode(buf)?
249 } else {
250 false
251 };
252 let log_dirs = if version >= 2 {
253 types::CompactArray(types::Uuid).decode(buf)?
254 } else {
255 Default::default()
256 };
257 let previous_broker_epoch = if version >= 3 {
258 types::Int64.decode(buf)?
259 } else {
260 -1
261 };
262 let mut unknown_tagged_fields = BTreeMap::new();
263 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
264 for _ in 0..num_tagged_fields {
265 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
266 let size: u32 = types::UnsignedVarInt.decode(buf)?;
267 let unknown_value = buf.try_get_bytes(size as usize)?;
268 unknown_tagged_fields.insert(tag as i32, unknown_value);
269 }
270 Ok(Self {
271 broker_id,
272 cluster_id,
273 incarnation_id,
274 listeners,
275 features,
276 rack,
277 is_migrating_zk_broker,
278 log_dirs,
279 previous_broker_epoch,
280 unknown_tagged_fields,
281 })
282 }
283}
284
285impl Default for BrokerRegistrationRequest {
286 fn default() -> Self {
287 Self {
288 broker_id: (0).into(),
289 cluster_id: Default::default(),
290 incarnation_id: Uuid::nil(),
291 listeners: Default::default(),
292 features: Default::default(),
293 rack: Some(Default::default()),
294 is_migrating_zk_broker: false,
295 log_dirs: Default::default(),
296 previous_broker_epoch: -1,
297 unknown_tagged_fields: BTreeMap::new(),
298 }
299 }
300}
301
302impl Message for BrokerRegistrationRequest {
303 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
304 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
305}
306
307#[non_exhaustive]
309#[derive(Debug, Clone, PartialEq)]
310pub struct Feature {
311 pub name: StrBytes,
315
316 pub min_supported_version: i16,
320
321 pub max_supported_version: i16,
325
326 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
328}
329
330impl Feature {
331 pub fn with_name(mut self, value: StrBytes) -> Self {
337 self.name = value;
338 self
339 }
340 pub fn with_min_supported_version(mut self, value: i16) -> Self {
346 self.min_supported_version = value;
347 self
348 }
349 pub fn with_max_supported_version(mut self, value: i16) -> Self {
355 self.max_supported_version = value;
356 self
357 }
358 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
360 self.unknown_tagged_fields = value;
361 self
362 }
363 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
365 self.unknown_tagged_fields.insert(key, value);
366 self
367 }
368}
369
370#[cfg(feature = "client")]
371impl Encodable for Feature {
372 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
373 types::CompactString.encode(buf, &self.name)?;
374 types::Int16.encode(buf, &self.min_supported_version)?;
375 types::Int16.encode(buf, &self.max_supported_version)?;
376 let num_tagged_fields = self.unknown_tagged_fields.len();
377 if num_tagged_fields > std::u32::MAX as usize {
378 bail!(
379 "Too many tagged fields to encode ({} fields)",
380 num_tagged_fields
381 );
382 }
383 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
384
385 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
386 Ok(())
387 }
388 fn compute_size(&self, version: i16) -> Result<usize> {
389 let mut total_size = 0;
390 total_size += types::CompactString.compute_size(&self.name)?;
391 total_size += types::Int16.compute_size(&self.min_supported_version)?;
392 total_size += types::Int16.compute_size(&self.max_supported_version)?;
393 let num_tagged_fields = self.unknown_tagged_fields.len();
394 if num_tagged_fields > std::u32::MAX as usize {
395 bail!(
396 "Too many tagged fields to encode ({} fields)",
397 num_tagged_fields
398 );
399 }
400 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
401
402 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
403 Ok(total_size)
404 }
405}
406
407#[cfg(feature = "broker")]
408impl Decodable for Feature {
409 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
410 let name = types::CompactString.decode(buf)?;
411 let min_supported_version = types::Int16.decode(buf)?;
412 let max_supported_version = types::Int16.decode(buf)?;
413 let mut unknown_tagged_fields = BTreeMap::new();
414 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
415 for _ in 0..num_tagged_fields {
416 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
417 let size: u32 = types::UnsignedVarInt.decode(buf)?;
418 let unknown_value = buf.try_get_bytes(size as usize)?;
419 unknown_tagged_fields.insert(tag as i32, unknown_value);
420 }
421 Ok(Self {
422 name,
423 min_supported_version,
424 max_supported_version,
425 unknown_tagged_fields,
426 })
427 }
428}
429
430impl Default for Feature {
431 fn default() -> Self {
432 Self {
433 name: Default::default(),
434 min_supported_version: 0,
435 max_supported_version: 0,
436 unknown_tagged_fields: BTreeMap::new(),
437 }
438 }
439}
440
441impl Message for Feature {
442 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
443 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
444}
445
446#[non_exhaustive]
448#[derive(Debug, Clone, PartialEq)]
449pub struct Listener {
450 pub name: StrBytes,
454
455 pub host: StrBytes,
459
460 pub port: u16,
464
465 pub security_protocol: i16,
469
470 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
472}
473
474impl Listener {
475 pub fn with_name(mut self, value: StrBytes) -> Self {
481 self.name = value;
482 self
483 }
484 pub fn with_host(mut self, value: StrBytes) -> Self {
490 self.host = value;
491 self
492 }
493 pub fn with_port(mut self, value: u16) -> Self {
499 self.port = value;
500 self
501 }
502 pub fn with_security_protocol(mut self, value: i16) -> Self {
508 self.security_protocol = value;
509 self
510 }
511 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
513 self.unknown_tagged_fields = value;
514 self
515 }
516 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
518 self.unknown_tagged_fields.insert(key, value);
519 self
520 }
521}
522
523#[cfg(feature = "client")]
524impl Encodable for Listener {
525 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
526 types::CompactString.encode(buf, &self.name)?;
527 types::CompactString.encode(buf, &self.host)?;
528 types::UInt16.encode(buf, &self.port)?;
529 types::Int16.encode(buf, &self.security_protocol)?;
530 let num_tagged_fields = self.unknown_tagged_fields.len();
531 if num_tagged_fields > std::u32::MAX as usize {
532 bail!(
533 "Too many tagged fields to encode ({} fields)",
534 num_tagged_fields
535 );
536 }
537 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
538
539 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
540 Ok(())
541 }
542 fn compute_size(&self, version: i16) -> Result<usize> {
543 let mut total_size = 0;
544 total_size += types::CompactString.compute_size(&self.name)?;
545 total_size += types::CompactString.compute_size(&self.host)?;
546 total_size += types::UInt16.compute_size(&self.port)?;
547 total_size += types::Int16.compute_size(&self.security_protocol)?;
548 let num_tagged_fields = self.unknown_tagged_fields.len();
549 if num_tagged_fields > std::u32::MAX as usize {
550 bail!(
551 "Too many tagged fields to encode ({} fields)",
552 num_tagged_fields
553 );
554 }
555 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
556
557 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
558 Ok(total_size)
559 }
560}
561
562#[cfg(feature = "broker")]
563impl Decodable for Listener {
564 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
565 let name = types::CompactString.decode(buf)?;
566 let host = types::CompactString.decode(buf)?;
567 let port = types::UInt16.decode(buf)?;
568 let security_protocol = types::Int16.decode(buf)?;
569 let mut unknown_tagged_fields = BTreeMap::new();
570 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
571 for _ in 0..num_tagged_fields {
572 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
573 let size: u32 = types::UnsignedVarInt.decode(buf)?;
574 let unknown_value = buf.try_get_bytes(size as usize)?;
575 unknown_tagged_fields.insert(tag as i32, unknown_value);
576 }
577 Ok(Self {
578 name,
579 host,
580 port,
581 security_protocol,
582 unknown_tagged_fields,
583 })
584 }
585}
586
587impl Default for Listener {
588 fn default() -> Self {
589 Self {
590 name: Default::default(),
591 host: Default::default(),
592 port: 0,
593 security_protocol: 0,
594 unknown_tagged_fields: BTreeMap::new(),
595 }
596 }
597}
598
599impl Message for Listener {
600 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
601 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
602}
603
604impl HeaderVersion for BrokerRegistrationRequest {
605 fn header_version(version: i16) -> i16 {
606 2
607 }
608}