1use crate::primitives::fixed::{
4 get_bool, get_i16, get_i32, get_i64, get_u16, put_bool, put_i16, put_i32, put_i64, put_u16,
5};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::tagged_fields::{
12 WriteTaggedFields, encode_to_bytes, read_tagged_fields, tagged_fields_len,
13};
14use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
15use bytes::{Buf, BufMut};
/// Lowest schema version accepted by the encoder and decoder in this module.
pub const MIN_VERSION: i16 = 0;
/// Highest schema version accepted by the encoder and decoder in this module.
pub const MAX_VERSION: i16 = 4;
/// First schema version that uses the compact / tagged-field ("flexible") layout.
pub const FLEXIBLE_MIN: i16 = 0;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
/// In-memory form of a `RegisterBrokerRecord`, encodable and decodable for
/// schema versions [`MIN_VERSION`] through [`MAX_VERSION`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegisterBrokerRecord {
    /// Written first on the wire as a 4-byte integer.
    pub broker_id: i32,
    /// Only serialized for version 2 and later.
    pub is_migrating_zk_broker: bool,
    /// Written as a 16-byte UUID.
    pub incarnation_id: crate::primitives::uuid::Uuid,
    /// Written as an 8-byte integer.
    pub broker_epoch: i64,
    /// Length-prefixed array of nested [`BrokerEndpoint`] entries.
    pub end_points: Vec<BrokerEndpoint>,
    /// Length-prefixed array of nested [`BrokerFeature`] entries.
    pub features: Vec<BrokerFeature>,
    /// Optional value written as a nullable string.
    pub rack: Option<String>,
    /// Note that `Default` initialises this flag to `true`.
    pub fenced: bool,
    /// Only serialized for version 1 and later.
    pub in_controlled_shutdown: bool,
    /// Carried as tagged field 0; skipped on the wire when equal to its default.
    pub log_dirs: Vec<crate::primitives::uuid::Uuid>,
    /// Carried as tagged field 1; skipped on the wire when `None`.
    pub cordoned_log_dirs: Option<Vec<crate::primitives::uuid::Uuid>>,
    /// Tagged fields the decoder did not recognise; written back out on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
38impl Default for RegisterBrokerRecord {
39 fn default() -> Self {
40 Self {
41 broker_id: 0i32,
42 is_migrating_zk_broker: false,
43 incarnation_id: Default::default(),
44 broker_epoch: 0i64,
45 end_points: Vec::new(),
46 features: Vec::new(),
47 rack: None,
48 fenced: true,
49 in_controlled_shutdown: false,
50 log_dirs: Vec::new(),
51 cordoned_log_dirs: None,
52 unknown_tagged_fields: Default::default(),
53 }
54 }
55}
56impl Encode for RegisterBrokerRecord {
57 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
58 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
59 return Err(ProtocolError::SchemaMismatch(
60 "RegisterBrokerRecord version out of range",
61 ));
62 }
63 let flex = is_flexible(version);
64 if version >= 0 {
65 put_i32(buf, self.broker_id);
66 }
67 if version >= 2 {
68 put_bool(buf, self.is_migrating_zk_broker);
69 }
70 if version >= 0 {
71 crate::primitives::uuid::put_uuid(buf, self.incarnation_id);
72 }
73 if version >= 0 {
74 put_i64(buf, self.broker_epoch);
75 }
76 if version >= 0 {
77 {
78 crate::primitives::array::put_array_len(buf, (self.end_points).len(), flex);
79 for it in &self.end_points {
80 it.encode(buf, version)?;
81 }
82 }
83 }
84 if version >= 0 {
85 {
86 crate::primitives::array::put_array_len(buf, (self.features).len(), flex);
87 for it in &self.features {
88 it.encode(buf, version)?;
89 }
90 }
91 }
92 if version >= 0 {
93 if flex {
94 put_compact_nullable_string(buf, self.rack.as_deref());
95 } else {
96 put_nullable_string(buf, self.rack.as_deref());
97 }
98 }
99 if version >= 0 {
100 put_bool(buf, self.fenced);
101 }
102 if version >= 1 {
103 put_bool(buf, self.in_controlled_shutdown);
104 }
105 if flex {
106 let mut tagged = WriteTaggedFields::new();
107 if !(crate::codegen_helpers::is_default(&self.log_dirs)) {
108 let payload = encode_to_bytes(
109 {
110 let prefix = crate::primitives::array::array_len_prefix_len(
111 (self.log_dirs).len(),
112 flex,
113 );
114 let body: usize = (self.log_dirs).iter().map(|_| 16).sum();
115 prefix + body
116 },
117 |b| {
118 {
119 crate::primitives::array::put_array_len(b, (self.log_dirs).len(), flex);
120 for it in &self.log_dirs {
121 crate::primitives::uuid::put_uuid(b, *it);
122 }
123 };
124 Ok(())
125 },
126 );
127 tagged.add(0, payload);
128 }
129 if !(self.cordoned_log_dirs.is_none()) {
130 let payload = encode_to_bytes(
131 {
132 let opt: Option<&Vec<_>> = (self.cordoned_log_dirs).as_ref();
133 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
134 opt.map(std::vec::Vec::len),
135 flex,
136 );
137 let body: usize = opt.map_or(0, |v| v.iter().map(|_| 16).sum());
138 prefix + body
139 },
140 |b| {
141 {
142 let len = (self.cordoned_log_dirs).as_ref().map(Vec::len);
143 crate::primitives::array::put_nullable_array_len(b, len, flex);
144 if let Some(v) = &self.cordoned_log_dirs {
145 for it in v {
146 crate::primitives::uuid::put_uuid(b, *it);
147 }
148 }
149 };
150 Ok(())
151 },
152 );
153 tagged.add(1, payload);
154 }
155 tagged.write(buf, &self.unknown_tagged_fields);
156 }
157 Ok(())
158 }
159 fn encoded_len(&self, version: i16) -> usize {
160 let flex = is_flexible(version);
161 let mut n: usize = 0;
162 if version >= 0 {
163 n += 4;
164 }
165 if version >= 2 {
166 n += 1;
167 }
168 if version >= 0 {
169 n += 16;
170 }
171 if version >= 0 {
172 n += 8;
173 }
174 if version >= 0 {
175 n += {
176 let prefix =
177 crate::primitives::array::array_len_prefix_len((self.end_points).len(), flex);
178 let body: usize = (self.end_points)
179 .iter()
180 .map(|it| it.encoded_len(version))
181 .sum();
182 prefix + body
183 };
184 }
185 if version >= 0 {
186 n += {
187 let prefix =
188 crate::primitives::array::array_len_prefix_len((self.features).len(), flex);
189 let body: usize = (self.features)
190 .iter()
191 .map(|it| it.encoded_len(version))
192 .sum();
193 prefix + body
194 };
195 }
196 if version >= 0 {
197 n += if flex {
198 compact_nullable_string_len(self.rack.as_deref())
199 } else {
200 nullable_string_len(self.rack.as_deref())
201 };
202 }
203 if version >= 0 {
204 n += 1;
205 }
206 if version >= 1 {
207 n += 1;
208 }
209 if flex {
210 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
211 if !(crate::codegen_helpers::is_default(&self.log_dirs)) {
212 known_pairs.push((0, {
213 let prefix =
214 crate::primitives::array::array_len_prefix_len((self.log_dirs).len(), flex);
215 let body: usize = (self.log_dirs).iter().map(|_| 16).sum();
216 prefix + body
217 }));
218 }
219 if !(self.cordoned_log_dirs.is_none()) {
220 known_pairs.push((1, {
221 let opt: Option<&Vec<_>> = (self.cordoned_log_dirs).as_ref();
222 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
223 opt.map(std::vec::Vec::len),
224 flex,
225 );
226 let body: usize = opt.map_or(0, |v| v.iter().map(|_| 16).sum());
227 prefix + body
228 }));
229 }
230 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
231 }
232 n
233 }
234}
235impl Decode<'_> for RegisterBrokerRecord {
236 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
237 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
238 return Err(ProtocolError::SchemaMismatch(
239 "RegisterBrokerRecord version out of range",
240 ));
241 }
242 let flex = is_flexible(version);
243 let mut out = Self::default();
244 if version >= 0 {
245 out.broker_id = get_i32(buf)?;
246 }
247 if version >= 2 {
248 out.is_migrating_zk_broker = get_bool(buf)?;
249 }
250 if version >= 0 {
251 out.incarnation_id = crate::primitives::uuid::get_uuid(buf)?;
252 }
253 if version >= 0 {
254 out.broker_epoch = get_i64(buf)?;
255 }
256 if version >= 0 {
257 out.end_points = {
258 let n = crate::primitives::array::get_array_len(buf, flex)?;
259 let mut v = Vec::with_capacity(n);
260 for _ in 0..n {
261 v.push(BrokerEndpoint::decode(buf, version)?);
262 }
263 v
264 };
265 }
266 if version >= 0 {
267 out.features = {
268 let n = crate::primitives::array::get_array_len(buf, flex)?;
269 let mut v = Vec::with_capacity(n);
270 for _ in 0..n {
271 v.push(BrokerFeature::decode(buf, version)?);
272 }
273 v
274 };
275 }
276 if version >= 0 {
277 out.rack = if flex {
278 get_compact_nullable_string_owned(buf)?
279 } else {
280 get_nullable_string_owned(buf)?
281 };
282 }
283 if version >= 0 {
284 out.fenced = get_bool(buf)?;
285 }
286 if version >= 1 {
287 out.in_controlled_shutdown = get_bool(buf)?;
288 }
289 if flex {
290 let mut tag_log_dirs = None;
291 let mut tag_cordoned_log_dirs = None;
292 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
293 0 => {
294 tag_log_dirs = Some({
295 let b: &mut &[u8] = payload;
296 {
297 let n = crate::primitives::array::get_array_len(b, flex)?;
298 let mut v = Vec::with_capacity(n);
299 for _ in 0..n {
300 v.push(crate::primitives::uuid::get_uuid(b)?);
301 }
302 v
303 }
304 });
305 Ok(true)
306 }
307 1 => {
308 tag_cordoned_log_dirs = Some({
309 let b: &mut &[u8] = payload;
310 {
311 let opt = crate::primitives::array::get_nullable_array_len(b, flex)?;
312 match opt {
313 None => None,
314 Some(n) => {
315 let mut v = Vec::with_capacity(n);
316 for _ in 0..n {
317 v.push(crate::primitives::uuid::get_uuid(b)?);
318 }
319 Some(v)
320 }
321 }
322 }
323 });
324 Ok(true)
325 }
326 _ => Ok(false),
327 })?;
328 if let Some(v) = tag_log_dirs {
329 out.log_dirs = v;
330 }
331 if let Some(v) = tag_cordoned_log_dirs {
332 out.cordoned_log_dirs = v;
333 }
334 }
335 Ok(out)
336 }
337}
#[cfg(test)]
impl RegisterBrokerRecord {
    /// Test fixture: a record in which every field carried by `version` holds a
    /// non-default sample value (`1`, `true`, `"x"`, or a UUID of sixteen `0x01`
    /// bytes). Fields not carried by `version` keep their `Default` value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut record = Self::default();
        if version < 0 {
            // No field exists below version 0.
            return record;
        }
        let sample_uuid = crate::primitives::uuid::Uuid([1u8; 16]);

        record.broker_id = 1;
        record.incarnation_id = sample_uuid;
        record.broker_epoch = 1;
        record.end_points = vec![BrokerEndpoint::populated(version)];
        record.features = vec![BrokerFeature::populated(version)];
        record.rack = Some(String::from("x"));
        record.fenced = true;

        // Both flags default to `false`, so assigning the comparison is equivalent
        // to setting them only when the version carries them.
        record.in_controlled_shutdown = version >= 1;
        record.is_migrating_zk_broker = version >= 2;

        if version >= 3 {
            record.log_dirs = vec![sample_uuid];
        }
        if version >= 4 {
            record.cordoned_log_dirs = Some(vec![sample_uuid]);
        }
        record
    }
}
/// One element of `RegisterBrokerRecord::end_points`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BrokerEndpoint {
    /// Written first, as a string.
    pub name: String,
    /// Written second, as a string.
    pub host: String,
    /// Written as an unsigned 16-bit integer.
    pub port: u16,
    /// Written as a signed 16-bit integer.
    pub security_protocol: i16,
    /// Tagged fields collected by the decoder; written back out on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
387impl Encode for BrokerEndpoint {
388 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
389 let flex = version >= 0;
390 if version >= 0 {
391 if flex {
392 put_compact_string(buf, &self.name);
393 } else {
394 put_string(buf, &self.name);
395 }
396 }
397 if version >= 0 {
398 if flex {
399 put_compact_string(buf, &self.host);
400 } else {
401 put_string(buf, &self.host);
402 }
403 }
404 if version >= 0 {
405 put_u16(buf, self.port);
406 }
407 if version >= 0 {
408 put_i16(buf, self.security_protocol);
409 }
410 if flex {
411 let tagged = WriteTaggedFields::new();
412 tagged.write(buf, &self.unknown_tagged_fields);
413 }
414 Ok(())
415 }
416 fn encoded_len(&self, version: i16) -> usize {
417 let flex = version >= 0;
418 let mut n: usize = 0;
419 if version >= 0 {
420 n += if flex {
421 compact_string_len(&self.name)
422 } else {
423 string_len(&self.name)
424 };
425 }
426 if version >= 0 {
427 n += if flex {
428 compact_string_len(&self.host)
429 } else {
430 string_len(&self.host)
431 };
432 }
433 if version >= 0 {
434 n += 2;
435 }
436 if version >= 0 {
437 n += 2;
438 }
439 if flex {
440 let known_pairs: Vec<(u32, usize)> = Vec::new();
441 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
442 }
443 n
444 }
445}
446impl Decode<'_> for BrokerEndpoint {
447 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
448 let flex = version >= 0;
449 let mut out = Self::default();
450 if version >= 0 {
451 out.name = if flex {
452 get_compact_string_owned(buf)?
453 } else {
454 get_string_owned(buf)?
455 };
456 }
457 if version >= 0 {
458 out.host = if flex {
459 get_compact_string_owned(buf)?
460 } else {
461 get_string_owned(buf)?
462 };
463 }
464 if version >= 0 {
465 out.port = get_u16(buf)?;
466 }
467 if version >= 0 {
468 out.security_protocol = get_i16(buf)?;
469 }
470 if flex {
471 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
472 }
473 Ok(out)
474 }
475}
#[cfg(test)]
impl BrokerEndpoint {
    /// Test fixture: strings set to `"x"` and numbers set to `1` for any
    /// non-negative `version`; the default value otherwise.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            name: String::from("x"),
            host: String::from("x"),
            port: 1,
            security_protocol: 1,
            ..Self::default()
        }
    }
}
/// One element of `RegisterBrokerRecord::features`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BrokerFeature {
    /// Written first, as a string.
    pub name: String,
    /// Written as a signed 16-bit integer.
    pub min_supported_version: i16,
    /// Written as a signed 16-bit integer.
    pub max_supported_version: i16,
    /// Tagged fields collected by the decoder; written back out on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
503impl Encode for BrokerFeature {
504 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
505 let flex = version >= 0;
506 if version >= 0 {
507 if flex {
508 put_compact_string(buf, &self.name);
509 } else {
510 put_string(buf, &self.name);
511 }
512 }
513 if version >= 0 {
514 put_i16(buf, self.min_supported_version);
515 }
516 if version >= 0 {
517 put_i16(buf, self.max_supported_version);
518 }
519 if flex {
520 let tagged = WriteTaggedFields::new();
521 tagged.write(buf, &self.unknown_tagged_fields);
522 }
523 Ok(())
524 }
525 fn encoded_len(&self, version: i16) -> usize {
526 let flex = version >= 0;
527 let mut n: usize = 0;
528 if version >= 0 {
529 n += if flex {
530 compact_string_len(&self.name)
531 } else {
532 string_len(&self.name)
533 };
534 }
535 if version >= 0 {
536 n += 2;
537 }
538 if version >= 0 {
539 n += 2;
540 }
541 if flex {
542 let known_pairs: Vec<(u32, usize)> = Vec::new();
543 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
544 }
545 n
546 }
547}
548impl Decode<'_> for BrokerFeature {
549 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
550 let flex = version >= 0;
551 let mut out = Self::default();
552 if version >= 0 {
553 out.name = if flex {
554 get_compact_string_owned(buf)?
555 } else {
556 get_string_owned(buf)?
557 };
558 }
559 if version >= 0 {
560 out.min_supported_version = get_i16(buf)?;
561 }
562 if version >= 0 {
563 out.max_supported_version = get_i16(buf)?;
564 }
565 if flex {
566 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
567 }
568 Ok(out)
569 }
570}
#[cfg(test)]
impl BrokerFeature {
    /// Test fixture: `name` set to `"x"` and both version bounds set to `1` for
    /// any non-negative `version`; the default value otherwise.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            name: String::from("x"),
            min_supported_version: 1,
            max_supported_version: 1,
            ..Self::default()
        }
    }
}
588#[must_use]
591#[allow(unused_comparisons)]
592pub fn default_json(version: i16) -> ::serde_json::Value {
593 let mut obj = ::serde_json::Map::new();
594 obj.insert("brokerId".to_string(), ::serde_json::json!(0));
595 if version >= 2 {
596 obj.insert(
597 "isMigratingZkBroker".to_string(),
598 ::serde_json::Value::Bool(false),
599 );
600 }
601 obj.insert(
602 "incarnationId".to_string(),
603 ::serde_json::Value::String("AAAAAAAAAAAAAAAAAAAAAA".to_string()),
604 );
605 obj.insert("brokerEpoch".to_string(), ::serde_json::json!(0));
606 obj.insert("endPoints".to_string(), ::serde_json::Value::Array(vec![]));
607 obj.insert("features".to_string(), ::serde_json::Value::Array(vec![]));
608 obj.insert("rack".to_string(), ::serde_json::Value::Null);
609 obj.insert("fenced".to_string(), ::serde_json::Value::Bool(true));
610 if version >= 1 {
611 obj.insert(
612 "inControlledShutdown".to_string(),
613 ::serde_json::Value::Bool(false),
614 );
615 }
616 if version >= 3 {
617 obj.insert("logDirs".to_string(), ::serde_json::Value::Array(vec![]));
618 }
619 if version >= 4 {
620 obj.insert("cordonedLogDirs".to_string(), ::serde_json::Value::Null);
621 }
622 ::serde_json::Value::Object(obj)
623}