kafka_protocol/messages/
broker_registration_request.rs

1//! BrokerRegistrationRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/BrokerRegistrationRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-4
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct BrokerRegistrationRequest {
24    /// The broker ID.
25    ///
26    /// Supported API versions: 0-4
27    pub broker_id: super::BrokerId,
28
29    /// The cluster id of the broker process.
30    ///
31    /// Supported API versions: 0-4
32    pub cluster_id: StrBytes,
33
34    /// The incarnation id of the broker process.
35    ///
36    /// Supported API versions: 0-4
37    pub incarnation_id: Uuid,
38
39    /// The listeners of this broker
40    ///
41    /// Supported API versions: 0-4
42    pub listeners: Vec<Listener>,
43
44    /// The features on this broker. Note: in v0-v3, features with MinSupportedVersion = 0 are omitted.
45    ///
46    /// Supported API versions: 0-4
47    pub features: Vec<Feature>,
48
49    /// The rack which this broker is in.
50    ///
51    /// Supported API versions: 0-4
52    pub rack: Option<StrBytes>,
53
54    /// If the required configurations for ZK migration are present, this value is set to true
55    ///
56    /// Supported API versions: 1-4
57    pub is_migrating_zk_broker: bool,
58
59    /// Log directories configured in this broker which are available.
60    ///
61    /// Supported API versions: 2-4
62    pub log_dirs: Vec<Uuid>,
63
64    /// The epoch before a clean shutdown.
65    ///
66    /// Supported API versions: 3-4
67    pub previous_broker_epoch: i64,
68
69    /// Other tagged fields
70    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
71}
72
73impl BrokerRegistrationRequest {
74    /// Sets `broker_id` to the passed value.
75    ///
76    /// The broker ID.
77    ///
78    /// Supported API versions: 0-4
79    pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
80        self.broker_id = value;
81        self
82    }
83    /// Sets `cluster_id` to the passed value.
84    ///
85    /// The cluster id of the broker process.
86    ///
87    /// Supported API versions: 0-4
88    pub fn with_cluster_id(mut self, value: StrBytes) -> Self {
89        self.cluster_id = value;
90        self
91    }
92    /// Sets `incarnation_id` to the passed value.
93    ///
94    /// The incarnation id of the broker process.
95    ///
96    /// Supported API versions: 0-4
97    pub fn with_incarnation_id(mut self, value: Uuid) -> Self {
98        self.incarnation_id = value;
99        self
100    }
101    /// Sets `listeners` to the passed value.
102    ///
103    /// The listeners of this broker
104    ///
105    /// Supported API versions: 0-4
106    pub fn with_listeners(mut self, value: Vec<Listener>) -> Self {
107        self.listeners = value;
108        self
109    }
110    /// Sets `features` to the passed value.
111    ///
112    /// The features on this broker. Note: in v0-v3, features with MinSupportedVersion = 0 are omitted.
113    ///
114    /// Supported API versions: 0-4
115    pub fn with_features(mut self, value: Vec<Feature>) -> Self {
116        self.features = value;
117        self
118    }
119    /// Sets `rack` to the passed value.
120    ///
121    /// The rack which this broker is in.
122    ///
123    /// Supported API versions: 0-4
124    pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
125        self.rack = value;
126        self
127    }
128    /// Sets `is_migrating_zk_broker` to the passed value.
129    ///
130    /// If the required configurations for ZK migration are present, this value is set to true
131    ///
132    /// Supported API versions: 1-4
133    pub fn with_is_migrating_zk_broker(mut self, value: bool) -> Self {
134        self.is_migrating_zk_broker = value;
135        self
136    }
137    /// Sets `log_dirs` to the passed value.
138    ///
139    /// Log directories configured in this broker which are available.
140    ///
141    /// Supported API versions: 2-4
142    pub fn with_log_dirs(mut self, value: Vec<Uuid>) -> Self {
143        self.log_dirs = value;
144        self
145    }
146    /// Sets `previous_broker_epoch` to the passed value.
147    ///
148    /// The epoch before a clean shutdown.
149    ///
150    /// Supported API versions: 3-4
151    pub fn with_previous_broker_epoch(mut self, value: i64) -> Self {
152        self.previous_broker_epoch = value;
153        self
154    }
155    /// Sets unknown_tagged_fields to the passed value.
156    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
157        self.unknown_tagged_fields = value;
158        self
159    }
160    /// Inserts an entry into unknown_tagged_fields.
161    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
162        self.unknown_tagged_fields.insert(key, value);
163        self
164    }
165}
166
#[cfg(feature = "client")]
impl Encodable for BrokerRegistrationRequest {
    /// Serializes this request into `buf` using the wire layout of `version`.
    ///
    /// Fields are written in declaration order. `log_dirs` and
    /// `previous_broker_epoch` are simply left out for versions below 2 and 3
    /// respectively.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4, when `is_migrating_zk_broker` is
    /// set but `version` is 0, when there are more unknown tagged fields than
    /// fit in a `u32`, or when any underlying field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 0 || version > 4 {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.broker_id)?;
        types::CompactString.encode(buf, &self.cluster_id)?;
        types::Uuid.encode(buf, &self.incarnation_id)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.listeners)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.features)?;
        types::CompactString.encode(buf, &self.rack)?;
        if version >= 1 {
            types::Boolean.encode(buf, &self.is_migrating_zk_broker)?;
        } else if self.is_migrating_zk_broker {
            // The flag cannot be represented in version 0, so refuse to drop it silently.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 2 {
            types::CompactArray(types::Uuid).encode(buf, &self.log_dirs)?;
        }
        if version >= 3 {
            types::Int64.encode(buf, &self.previous_broker_epoch)?;
        }
        let num_tagged_fields = self.unknown_tagged_fields.len();
        // The count is written as an unsigned 32-bit varint; the cast below is
        // only reached once this guard has passed.
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject a `version` outside 0-4.
    ///
    /// # Errors
    ///
    /// Fails when `is_migrating_zk_broker` is set but `version` is below 1,
    /// when there are more unknown tagged fields than fit in a `u32`, or when
    /// any underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.broker_id)?;
        total_size += types::CompactString.compute_size(&self.cluster_id)?;
        total_size += types::Uuid.compute_size(&self.incarnation_id)?;
        total_size +=
            types::CompactArray(types::Struct { version }).compute_size(&self.listeners)?;
        total_size +=
            types::CompactArray(types::Struct { version }).compute_size(&self.features)?;
        total_size += types::CompactString.compute_size(&self.rack)?;
        if version >= 1 {
            total_size += types::Boolean.compute_size(&self.is_migrating_zk_broker)?;
        } else if self.is_migrating_zk_broker {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 2 {
            total_size += types::CompactArray(types::Uuid).compute_size(&self.log_dirs)?;
        }
        if version >= 3 {
            total_size += types::Int64.compute_size(&self.previous_broker_epoch)?;
        }
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
240
#[cfg(feature = "broker")]
impl Decodable for BrokerRegistrationRequest {
    /// Reads a request from `buf` using the wire layout of `version`.
    ///
    /// Fields that `version` does not carry get fixed fallbacks:
    /// `is_migrating_zk_broker` is `false` below version 1, `log_dirs` is empty
    /// below version 2 and `previous_broker_epoch` is `-1` below version 3.
    /// Every tagged field found is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4 or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Field initializers run top to bottom, which is the wire order.
        let mut request = Self {
            broker_id: types::Int32.decode(buf)?,
            cluster_id: types::CompactString.decode(buf)?,
            incarnation_id: types::Uuid.decode(buf)?,
            listeners: types::CompactArray(types::Struct { version }).decode(buf)?,
            features: types::CompactArray(types::Struct { version }).decode(buf)?,
            rack: types::CompactString.decode(buf)?,
            is_migrating_zk_broker: if version >= 1 {
                types::Boolean.decode(buf)?
            } else {
                false
            },
            log_dirs: if version >= 2 {
                types::CompactArray(types::Uuid).decode(buf)?
            } else {
                Vec::new()
            },
            previous_broker_epoch: if version >= 3 {
                types::Int64.decode(buf)?
            } else {
                -1
            },
            unknown_tagged_fields: BTreeMap::new(),
        };
        // Trailing section: a count, then (tag, size, raw bytes) per entry.
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            request.unknown_tagged_fields.insert(tag as i32, payload);
        }
        Ok(request)
    }
}
290
291impl Default for BrokerRegistrationRequest {
292    fn default() -> Self {
293        Self {
294            broker_id: (0).into(),
295            cluster_id: Default::default(),
296            incarnation_id: Uuid::nil(),
297            listeners: Default::default(),
298            features: Default::default(),
299            rack: Some(Default::default()),
300            is_migrating_zk_broker: false,
301            log_dirs: Default::default(),
302            previous_broker_epoch: -1,
303            unknown_tagged_fields: BTreeMap::new(),
304        }
305    }
306}
307
308impl Message for BrokerRegistrationRequest {
309    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
310    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
311}
312
313/// Valid versions: 0-4
314#[non_exhaustive]
315#[derive(Debug, Clone, PartialEq)]
316pub struct Feature {
317    /// The feature name.
318    ///
319    /// Supported API versions: 0-4
320    pub name: StrBytes,
321
322    /// The minimum supported feature level.
323    ///
324    /// Supported API versions: 0-4
325    pub min_supported_version: i16,
326
327    /// The maximum supported feature level.
328    ///
329    /// Supported API versions: 0-4
330    pub max_supported_version: i16,
331
332    /// Other tagged fields
333    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
334}
335
336impl Feature {
337    /// Sets `name` to the passed value.
338    ///
339    /// The feature name.
340    ///
341    /// Supported API versions: 0-4
342    pub fn with_name(mut self, value: StrBytes) -> Self {
343        self.name = value;
344        self
345    }
346    /// Sets `min_supported_version` to the passed value.
347    ///
348    /// The minimum supported feature level.
349    ///
350    /// Supported API versions: 0-4
351    pub fn with_min_supported_version(mut self, value: i16) -> Self {
352        self.min_supported_version = value;
353        self
354    }
355    /// Sets `max_supported_version` to the passed value.
356    ///
357    /// The maximum supported feature level.
358    ///
359    /// Supported API versions: 0-4
360    pub fn with_max_supported_version(mut self, value: i16) -> Self {
361        self.max_supported_version = value;
362        self
363    }
364    /// Sets unknown_tagged_fields to the passed value.
365    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
366        self.unknown_tagged_fields = value;
367        self
368    }
369    /// Inserts an entry into unknown_tagged_fields.
370    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
371        self.unknown_tagged_fields.insert(key, value);
372        self
373    }
374}
375
#[cfg(feature = "client")]
impl Encodable for Feature {
    /// Serializes this feature into `buf`: name, minimum level, maximum level,
    /// then the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4, when there are more unknown tagged
    /// fields than fit in a `u32`, or when any underlying field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 0 || version > 4 {
            bail!("specified version not supported by this message type");
        }
        types::CompactString.encode(buf, &self.name)?;
        types::Int16.encode(buf, &self.min_supported_version)?;
        types::Int16.encode(buf, &self.max_supported_version)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        // The count is written as an unsigned 32-bit varint; the cast below is
        // only reached once this guard has passed.
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write.
    ///
    /// `version` is not consulted here, so an unsupported version is not
    /// rejected by this method.
    ///
    /// # Errors
    ///
    /// Fails when there are more unknown tagged fields than fit in a `u32` or
    /// when any underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::CompactString.compute_size(&self.name)?;
        total_size += types::Int16.compute_size(&self.min_supported_version)?;
        total_size += types::Int16.compute_size(&self.max_supported_version)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
415
#[cfg(feature = "broker")]
impl Decodable for Feature {
    /// Reads a feature from `buf`: name, minimum level, maximum level, then
    /// every tagged field, which is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4 or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Field initializers run top to bottom, which is the wire order.
        let mut feature = Self {
            name: types::CompactString.decode(buf)?,
            min_supported_version: types::Int16.decode(buf)?,
            max_supported_version: types::Int16.decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };
        // Trailing section: a count, then (tag, size, raw bytes) per entry.
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            feature.unknown_tagged_fields.insert(tag as i32, payload);
        }
        Ok(feature)
    }
}
441
442impl Default for Feature {
443    fn default() -> Self {
444        Self {
445            name: Default::default(),
446            min_supported_version: 0,
447            max_supported_version: 0,
448            unknown_tagged_fields: BTreeMap::new(),
449        }
450    }
451}
452
453impl Message for Feature {
454    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
455    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
456}
457
458/// Valid versions: 0-4
459#[non_exhaustive]
460#[derive(Debug, Clone, PartialEq)]
461pub struct Listener {
462    /// The name of the endpoint.
463    ///
464    /// Supported API versions: 0-4
465    pub name: StrBytes,
466
467    /// The hostname.
468    ///
469    /// Supported API versions: 0-4
470    pub host: StrBytes,
471
472    /// The port.
473    ///
474    /// Supported API versions: 0-4
475    pub port: u16,
476
477    /// The security protocol.
478    ///
479    /// Supported API versions: 0-4
480    pub security_protocol: i16,
481
482    /// Other tagged fields
483    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
484}
485
486impl Listener {
487    /// Sets `name` to the passed value.
488    ///
489    /// The name of the endpoint.
490    ///
491    /// Supported API versions: 0-4
492    pub fn with_name(mut self, value: StrBytes) -> Self {
493        self.name = value;
494        self
495    }
496    /// Sets `host` to the passed value.
497    ///
498    /// The hostname.
499    ///
500    /// Supported API versions: 0-4
501    pub fn with_host(mut self, value: StrBytes) -> Self {
502        self.host = value;
503        self
504    }
505    /// Sets `port` to the passed value.
506    ///
507    /// The port.
508    ///
509    /// Supported API versions: 0-4
510    pub fn with_port(mut self, value: u16) -> Self {
511        self.port = value;
512        self
513    }
514    /// Sets `security_protocol` to the passed value.
515    ///
516    /// The security protocol.
517    ///
518    /// Supported API versions: 0-4
519    pub fn with_security_protocol(mut self, value: i16) -> Self {
520        self.security_protocol = value;
521        self
522    }
523    /// Sets unknown_tagged_fields to the passed value.
524    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
525        self.unknown_tagged_fields = value;
526        self
527    }
528    /// Inserts an entry into unknown_tagged_fields.
529    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
530        self.unknown_tagged_fields.insert(key, value);
531        self
532    }
533}
534
#[cfg(feature = "client")]
impl Encodable for Listener {
    /// Serializes this listener into `buf`: name, host, port, security
    /// protocol, then the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4, when there are more unknown tagged
    /// fields than fit in a `u32`, or when any underlying field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 0 || version > 4 {
            bail!("specified version not supported by this message type");
        }
        types::CompactString.encode(buf, &self.name)?;
        types::CompactString.encode(buf, &self.host)?;
        types::UInt16.encode(buf, &self.port)?;
        types::Int16.encode(buf, &self.security_protocol)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        // The count is written as an unsigned 32-bit varint; the cast below is
        // only reached once this guard has passed.
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write.
    ///
    /// `version` is not consulted here, so an unsupported version is not
    /// rejected by this method.
    ///
    /// # Errors
    ///
    /// Fails when there are more unknown tagged fields than fit in a `u32` or
    /// when any underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::CompactString.compute_size(&self.name)?;
        total_size += types::CompactString.compute_size(&self.host)?;
        total_size += types::UInt16.compute_size(&self.port)?;
        total_size += types::Int16.compute_size(&self.security_protocol)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
576
#[cfg(feature = "broker")]
impl Decodable for Listener {
    /// Reads a listener from `buf`: name, host, port, security protocol, then
    /// every tagged field, which is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-4 or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=4).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Field initializers run top to bottom, which is the wire order.
        let mut listener = Self {
            name: types::CompactString.decode(buf)?,
            host: types::CompactString.decode(buf)?,
            port: types::UInt16.decode(buf)?,
            security_protocol: types::Int16.decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };
        // Trailing section: a count, then (tag, size, raw bytes) per entry.
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            listener.unknown_tagged_fields.insert(tag as i32, payload);
        }
        Ok(listener)
    }
}
604
605impl Default for Listener {
606    fn default() -> Self {
607        Self {
608            name: Default::default(),
609            host: Default::default(),
610            port: 0,
611            security_protocol: 0,
612            unknown_tagged_fields: BTreeMap::new(),
613        }
614    }
615}
616
617impl Message for Listener {
618    const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
619    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
620}
621
622impl HeaderVersion for BrokerRegistrationRequest {
623    fn header_version(version: i16) -> i16 {
624        2
625    }
626}