1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ControllerRegistrationRequest {
24 pub controller_id: i32,
28
29 pub incarnation_id: Uuid,
33
34 pub zk_migration_ready: bool,
38
39 pub listeners: Vec<Listener>,
43
44 pub features: Vec<Feature>,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl ControllerRegistrationRequest {
54 pub fn with_controller_id(mut self, value: i32) -> Self {
60 self.controller_id = value;
61 self
62 }
63 pub fn with_incarnation_id(mut self, value: Uuid) -> Self {
69 self.incarnation_id = value;
70 self
71 }
72 pub fn with_zk_migration_ready(mut self, value: bool) -> Self {
78 self.zk_migration_ready = value;
79 self
80 }
81 pub fn with_listeners(mut self, value: Vec<Listener>) -> Self {
87 self.listeners = value;
88 self
89 }
90 pub fn with_features(mut self, value: Vec<Feature>) -> Self {
96 self.features = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
#[cfg(feature = "client")]
impl Encodable for ControllerRegistrationRequest {
    /// Writes this request to `buf`.
    ///
    /// The named members are written first, in declaration order, followed by
    /// the tagged-field count and the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0, when there are more tagged fields than a
    /// `u32` can count, or when any underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        types::Int32.encode(buf, &self.controller_id)?;
        types::Uuid.encode(buf, &self.incarnation_id)?;
        types::Boolean.encode(buf, &self.zk_migration_ready)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.listeners)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.features)?;

        // The count is written as an unsigned varint, so it has to fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the sum of the sizes reported by the same encoders that
    /// `encode` uses, in the same order.
    ///
    /// Unlike `encode`, this does not reject any `version`; the value is only
    /// forwarded to the nested struct encoders.
    ///
    /// # Errors
    ///
    /// Fails when there are more tagged fields than a `u32` can count or when
    /// any underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let named_fields = types::Int32.compute_size(&self.controller_id)?
            + types::Uuid.compute_size(&self.incarnation_id)?
            + types::Boolean.compute_size(&self.zk_migration_ready)?
            + types::CompactArray(types::Struct { version }).compute_size(&self.listeners)?
            + types::CompactArray(types::Struct { version }).compute_size(&self.features)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }

        Ok(named_fields
            + types::UnsignedVarInt.compute_size(tagged_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?)
    }
}
156
#[cfg(feature = "broker")]
impl Decodable for ControllerRegistrationRequest {
    /// Reads a request from `buf`.
    ///
    /// The named members are read in declaration order; every tagged field
    /// that follows is stored in `unknown_tagged_fields` under its tag. If a
    /// tag occurs more than once, the last value read is the one kept.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0 or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        // Struct-literal fields are evaluated top to bottom, which is the wire order.
        let mut request = Self {
            controller_id: types::Int32.decode(buf)?,
            incarnation_id: types::Uuid.decode(buf)?,
            zk_migration_ready: types::Boolean.decode(buf)?,
            listeners: types::CompactArray(types::Struct { version }).decode(buf)?,
            features: types::CompactArray(types::Struct { version }).decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            request.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(request)
    }
}
186
187impl Default for ControllerRegistrationRequest {
188 fn default() -> Self {
189 Self {
190 controller_id: 0,
191 incarnation_id: Uuid::nil(),
192 zk_migration_ready: false,
193 listeners: Default::default(),
194 features: Default::default(),
195 unknown_tagged_fields: BTreeMap::new(),
196 }
197 }
198}
199
200impl Message for ControllerRegistrationRequest {
201 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
202 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
203}
204
205#[non_exhaustive]
207#[derive(Debug, Clone, PartialEq)]
208pub struct Feature {
209 pub name: StrBytes,
213
214 pub min_supported_version: i16,
218
219 pub max_supported_version: i16,
223
224 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
226}
227
228impl Feature {
229 pub fn with_name(mut self, value: StrBytes) -> Self {
235 self.name = value;
236 self
237 }
238 pub fn with_min_supported_version(mut self, value: i16) -> Self {
244 self.min_supported_version = value;
245 self
246 }
247 pub fn with_max_supported_version(mut self, value: i16) -> Self {
253 self.max_supported_version = value;
254 self
255 }
256 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
258 self.unknown_tagged_fields = value;
259 self
260 }
261 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
263 self.unknown_tagged_fields.insert(key, value);
264 self
265 }
266}
267
#[cfg(feature = "client")]
impl Encodable for Feature {
    /// Writes this feature to `buf`: name, minimum version, maximum version,
    /// then the tagged-field count and the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0, when there are more tagged fields than a
    /// `u32` can count, or when any underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        types::CompactString.encode(buf, &self.name)?;
        types::Int16.encode(buf, &self.min_supported_version)?;
        types::Int16.encode(buf, &self.max_supported_version)?;

        // The count is written as an unsigned varint, so it has to fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the sum of the sizes reported by the same encoders that
    /// `encode` uses, in the same order.
    ///
    /// `version` is not consulted here, so no version is rejected.
    ///
    /// # Errors
    ///
    /// Fails when there are more tagged fields than a `u32` can count or when
    /// any underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let named_fields = types::CompactString.compute_size(&self.name)?
            + types::Int16.compute_size(&self.min_supported_version)?
            + types::Int16.compute_size(&self.max_supported_version)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }

        Ok(named_fields
            + types::UnsignedVarInt.compute_size(tagged_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?)
    }
}
307
#[cfg(feature = "broker")]
impl Decodable for Feature {
    /// Reads a feature from `buf`.
    ///
    /// The named members are read in declaration order; every tagged field
    /// that follows is stored in `unknown_tagged_fields` under its tag. If a
    /// tag occurs more than once, the last value read is the one kept.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0 or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        // Struct-literal fields are evaluated top to bottom, which is the wire order.
        let mut feature = Self {
            name: types::CompactString.decode(buf)?,
            min_supported_version: types::Int16.decode(buf)?,
            max_supported_version: types::Int16.decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            feature.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(feature)
    }
}
333
334impl Default for Feature {
335 fn default() -> Self {
336 Self {
337 name: Default::default(),
338 min_supported_version: 0,
339 max_supported_version: 0,
340 unknown_tagged_fields: BTreeMap::new(),
341 }
342 }
343}
344
345impl Message for Feature {
346 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
347 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
348}
349
350#[non_exhaustive]
352#[derive(Debug, Clone, PartialEq)]
353pub struct Listener {
354 pub name: StrBytes,
358
359 pub host: StrBytes,
363
364 pub port: u16,
368
369 pub security_protocol: i16,
373
374 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
376}
377
378impl Listener {
379 pub fn with_name(mut self, value: StrBytes) -> Self {
385 self.name = value;
386 self
387 }
388 pub fn with_host(mut self, value: StrBytes) -> Self {
394 self.host = value;
395 self
396 }
397 pub fn with_port(mut self, value: u16) -> Self {
403 self.port = value;
404 self
405 }
406 pub fn with_security_protocol(mut self, value: i16) -> Self {
412 self.security_protocol = value;
413 self
414 }
415 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
417 self.unknown_tagged_fields = value;
418 self
419 }
420 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
422 self.unknown_tagged_fields.insert(key, value);
423 self
424 }
425}
426
#[cfg(feature = "client")]
impl Encodable for Listener {
    /// Writes this listener to `buf`: name, host, port, security protocol,
    /// then the tagged-field count and the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0, when there are more tagged fields than a
    /// `u32` can count, or when any underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        types::CompactString.encode(buf, &self.name)?;
        types::CompactString.encode(buf, &self.host)?;
        types::UInt16.encode(buf, &self.port)?;
        types::Int16.encode(buf, &self.security_protocol)?;

        // The count is written as an unsigned varint, so it has to fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the sum of the sizes reported by the same encoders that
    /// `encode` uses, in the same order.
    ///
    /// `version` is not consulted here, so no version is rejected.
    ///
    /// # Errors
    ///
    /// Fails when there are more tagged fields than a `u32` can count or when
    /// any underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let named_fields = types::CompactString.compute_size(&self.name)?
            + types::CompactString.compute_size(&self.host)?
            + types::UInt16.compute_size(&self.port)?
            + types::Int16.compute_size(&self.security_protocol)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }

        Ok(named_fields
            + types::UnsignedVarInt.compute_size(tagged_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?)
    }
}
468
#[cfg(feature = "broker")]
impl Decodable for Listener {
    /// Reads a listener from `buf`.
    ///
    /// The named members are read in declaration order; every tagged field
    /// that follows is stored in `unknown_tagged_fields` under its tag. If a
    /// tag occurs more than once, the last value read is the one kept.
    ///
    /// # Errors
    ///
    /// Fails when `version` is not 0 or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        match version {
            0 => {}
            _ => bail!("specified version not supported by this message type"),
        }

        // Struct-literal fields are evaluated top to bottom, which is the wire order.
        let mut listener = Self {
            name: types::CompactString.decode(buf)?,
            host: types::CompactString.decode(buf)?,
            port: types::UInt16.decode(buf)?,
            security_protocol: types::Int16.decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            listener.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(listener)
    }
}
496
497impl Default for Listener {
498 fn default() -> Self {
499 Self {
500 name: Default::default(),
501 host: Default::default(),
502 port: 0,
503 security_protocol: 0,
504 unknown_tagged_fields: BTreeMap::new(),
505 }
506 }
507}
508
509impl Message for Listener {
510 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
511 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
512}
513
514impl HeaderVersion for ControllerRegistrationRequest {
515 fn header_version(version: i16) -> i16 {
516 2
517 }
518}