1use crate::primitives::fixed::{
3 get_i16, get_i32, get_i64, get_u16, put_i16, put_i32, put_i64, put_u16,
4};
5use crate::primitives::string_bytes::{
6 compact_nullable_string_len, compact_string_len, nullable_string_len,
7 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
8};
9use crate::primitives::string_bytes_borrowed::{
10 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
11 get_nullable_string_borrowed, get_string_borrowed,
12};
13use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
14use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
15use bytes::BufMut;
16pub const API_KEY: i16 = 55;
17pub const MIN_VERSION: i16 = 0;
18pub const MAX_VERSION: i16 = 2;
19pub const FLEXIBLE_MIN: i16 = 0;
20#[inline]
21fn is_flexible(version: i16) -> bool {
22 version >= FLEXIBLE_MIN
23}
24#[derive(Debug, Clone, PartialEq, Eq, Default)]
25pub struct DescribeQuorumResponse<'a> {
26 pub error_code: i16,
27 pub error_message: Option<&'a str>,
28 pub topics: Vec<TopicData<'a>>,
29 pub nodes: Vec<Node<'a>>,
30 pub unknown_tagged_fields: UnknownTaggedFields,
31}
32impl DescribeQuorumResponse<'_> {
33 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::DescribeQuorumResponse {
34 crate::owned::describe_quorum_response::DescribeQuorumResponse {
35 error_code: (self.error_code),
36 error_message: (self.error_message).map(std::string::ToString::to_string),
37 topics: (self.topics).iter().map(TopicData::to_owned).collect(),
38 nodes: (self.nodes).iter().map(Node::to_owned).collect(),
39 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
40 }
41 }
42}
43impl Encode for DescribeQuorumResponse<'_> {
44 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
45 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
46 return Err(ProtocolError::UnsupportedVersion {
47 api_key: API_KEY,
48 version,
49 });
50 }
51 let flex = is_flexible(version);
52 if version >= 0 {
53 put_i16(buf, self.error_code);
54 }
55 if version >= 2 {
56 if flex {
57 put_compact_nullable_string(buf, self.error_message);
58 } else {
59 put_nullable_string(buf, self.error_message);
60 }
61 }
62 if version >= 0 {
63 {
64 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
65 for it in &self.topics {
66 it.encode(buf, version)?;
67 }
68 }
69 }
70 if version >= 2 {
71 {
72 crate::primitives::array::put_array_len(buf, (self.nodes).len(), flex);
73 for it in &self.nodes {
74 it.encode(buf, version)?;
75 }
76 }
77 }
78 if flex {
79 let tagged = WriteTaggedFields::new();
80 tagged.write(buf, &self.unknown_tagged_fields);
81 }
82 Ok(())
83 }
84 fn encoded_len(&self, version: i16) -> usize {
85 let flex = is_flexible(version);
86 let mut n: usize = 0;
87 if version >= 0 {
88 n += 2;
89 }
90 if version >= 2 {
91 n += if flex {
92 compact_nullable_string_len(self.error_message)
93 } else {
94 nullable_string_len(self.error_message)
95 };
96 }
97 if version >= 0 {
98 n += {
99 let prefix =
100 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
101 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
102 prefix + body
103 };
104 }
105 if version >= 2 {
106 n += {
107 let prefix =
108 crate::primitives::array::array_len_prefix_len((self.nodes).len(), flex);
109 let body: usize = (self.nodes).iter().map(|it| it.encoded_len(version)).sum();
110 prefix + body
111 };
112 }
113 if flex {
114 let known_pairs: Vec<(u32, usize)> = Vec::new();
115 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
116 }
117 n
118 }
119}
120impl<'de> DecodeBorrow<'de> for DescribeQuorumResponse<'de> {
121 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
122 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
123 return Err(ProtocolError::UnsupportedVersion {
124 api_key: API_KEY,
125 version,
126 });
127 }
128 let flex = is_flexible(version);
129 let mut out = Self::default();
130 if version >= 0 {
131 out.error_code = get_i16(buf)?;
132 }
133 if version >= 2 {
134 out.error_message = if flex {
135 get_compact_nullable_string_borrowed(buf)?
136 } else {
137 get_nullable_string_borrowed(buf)?
138 };
139 }
140 if version >= 0 {
141 out.topics = {
142 let n = crate::primitives::array::get_array_len(buf, flex)?;
143 let mut v = Vec::with_capacity(n);
144 for _ in 0..n {
145 v.push(TopicData::decode_borrow(buf, version)?);
146 }
147 v
148 };
149 }
150 if version >= 2 {
151 out.nodes = {
152 let n = crate::primitives::array::get_array_len(buf, flex)?;
153 let mut v = Vec::with_capacity(n);
154 for _ in 0..n {
155 v.push(Node::decode_borrow(buf, version)?);
156 }
157 v
158 };
159 }
160 if flex {
161 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
162 }
163 Ok(out)
164 }
165}
166#[cfg(test)]
167impl DescribeQuorumResponse<'_> {
168 #[must_use]
169 pub fn populated(version: i16) -> Self {
170 let mut m = Self::default();
171 if version >= 0 {
172 m.error_code = 1i16;
173 }
174 if version >= 2 {
175 m.error_message = Some("x");
176 }
177 if version >= 0 {
178 m.topics = vec![TopicData::populated(version)];
179 }
180 if version >= 2 {
181 m.nodes = vec![Node::populated(version)];
182 }
183 m
184 }
185}
186#[derive(Debug, Clone, PartialEq, Eq, Default)]
187pub struct TopicData<'a> {
188 pub topic_name: &'a str,
189 pub partitions: Vec<PartitionData<'a>>,
190 pub unknown_tagged_fields: UnknownTaggedFields,
191}
192impl TopicData<'_> {
193 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::TopicData {
194 crate::owned::describe_quorum_response::TopicData {
195 topic_name: (self.topic_name).to_string(),
196 partitions: (self.partitions)
197 .iter()
198 .map(PartitionData::to_owned)
199 .collect(),
200 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
201 }
202 }
203}
204impl Encode for TopicData<'_> {
205 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
206 let flex = version >= 0;
207 if version >= 0 {
208 if flex {
209 put_compact_string(buf, self.topic_name);
210 } else {
211 put_string(buf, self.topic_name);
212 }
213 }
214 if version >= 0 {
215 {
216 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
217 for it in &self.partitions {
218 it.encode(buf, version)?;
219 }
220 }
221 }
222 if flex {
223 let tagged = WriteTaggedFields::new();
224 tagged.write(buf, &self.unknown_tagged_fields);
225 }
226 Ok(())
227 }
228 fn encoded_len(&self, version: i16) -> usize {
229 let flex = version >= 0;
230 let mut n: usize = 0;
231 if version >= 0 {
232 n += if flex {
233 compact_string_len(self.topic_name)
234 } else {
235 string_len(self.topic_name)
236 };
237 }
238 if version >= 0 {
239 n += {
240 let prefix =
241 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
242 let body: usize = (self.partitions)
243 .iter()
244 .map(|it| it.encoded_len(version))
245 .sum();
246 prefix + body
247 };
248 }
249 if flex {
250 let known_pairs: Vec<(u32, usize)> = Vec::new();
251 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
252 }
253 n
254 }
255}
256impl<'de> DecodeBorrow<'de> for TopicData<'de> {
257 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
258 let flex = version >= 0;
259 let mut out = Self::default();
260 if version >= 0 {
261 out.topic_name = if flex {
262 get_compact_string_borrowed(buf)?
263 } else {
264 get_string_borrowed(buf)?
265 };
266 }
267 if version >= 0 {
268 out.partitions = {
269 let n = crate::primitives::array::get_array_len(buf, flex)?;
270 let mut v = Vec::with_capacity(n);
271 for _ in 0..n {
272 v.push(PartitionData::decode_borrow(buf, version)?);
273 }
274 v
275 };
276 }
277 if flex {
278 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
279 }
280 Ok(out)
281 }
282}
283#[cfg(test)]
284impl TopicData<'_> {
285 #[must_use]
286 pub fn populated(version: i16) -> Self {
287 let mut m = Self::default();
288 if version >= 0 {
289 m.topic_name = "x";
290 }
291 if version >= 0 {
292 m.partitions = vec![PartitionData::populated(version)];
293 }
294 m
295 }
296}
297#[derive(Debug, Clone, PartialEq, Eq, Default)]
298pub struct PartitionData<'a> {
299 pub partition_index: i32,
300 pub error_code: i16,
301 pub error_message: Option<&'a str>,
302 pub leader_id: i32,
303 pub leader_epoch: i32,
304 pub high_watermark: i64,
305 pub current_voters: Vec<super::common::describe_quorum_response::replica_state::ReplicaState>,
306 pub observers: Vec<super::common::describe_quorum_response::replica_state::ReplicaState>,
307 pub unknown_tagged_fields: UnknownTaggedFields,
308}
309impl PartitionData<'_> {
310 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::PartitionData {
311 crate::owned::describe_quorum_response::PartitionData {
312 partition_index: (self.partition_index),
313 error_code: (self.error_code),
314 error_message: (self.error_message).map(std::string::ToString::to_string),
315 leader_id: (self.leader_id),
316 leader_epoch: (self.leader_epoch),
317 high_watermark: (self.high_watermark),
318 current_voters: (self.current_voters)
319 .iter()
320 .map(super::common::describe_quorum_response::replica_state::ReplicaState::to_owned)
321 .collect(),
322 observers: (self.observers)
323 .iter()
324 .map(super::common::describe_quorum_response::replica_state::ReplicaState::to_owned)
325 .collect(),
326 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
327 }
328 }
329}
330impl Encode for PartitionData<'_> {
331 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
332 let flex = version >= 0;
333 if version >= 0 {
334 put_i32(buf, self.partition_index);
335 }
336 if version >= 0 {
337 put_i16(buf, self.error_code);
338 }
339 if version >= 2 {
340 if flex {
341 put_compact_nullable_string(buf, self.error_message);
342 } else {
343 put_nullable_string(buf, self.error_message);
344 }
345 }
346 if version >= 0 {
347 put_i32(buf, self.leader_id);
348 }
349 if version >= 0 {
350 put_i32(buf, self.leader_epoch);
351 }
352 if version >= 0 {
353 put_i64(buf, self.high_watermark);
354 }
355 if version >= 0 {
356 {
357 crate::primitives::array::put_array_len(buf, (self.current_voters).len(), flex);
358 for it in &self.current_voters {
359 it.encode(buf, version)?;
360 }
361 }
362 }
363 if version >= 0 {
364 {
365 crate::primitives::array::put_array_len(buf, (self.observers).len(), flex);
366 for it in &self.observers {
367 it.encode(buf, version)?;
368 }
369 }
370 }
371 if flex {
372 let tagged = WriteTaggedFields::new();
373 tagged.write(buf, &self.unknown_tagged_fields);
374 }
375 Ok(())
376 }
377 fn encoded_len(&self, version: i16) -> usize {
378 let flex = version >= 0;
379 let mut n: usize = 0;
380 if version >= 0 {
381 n += 4;
382 }
383 if version >= 0 {
384 n += 2;
385 }
386 if version >= 2 {
387 n += if flex {
388 compact_nullable_string_len(self.error_message)
389 } else {
390 nullable_string_len(self.error_message)
391 };
392 }
393 if version >= 0 {
394 n += 4;
395 }
396 if version >= 0 {
397 n += 4;
398 }
399 if version >= 0 {
400 n += 8;
401 }
402 if version >= 0 {
403 n += {
404 let prefix = crate::primitives::array::array_len_prefix_len(
405 (self.current_voters).len(),
406 flex,
407 );
408 let body: usize = (self.current_voters)
409 .iter()
410 .map(|it| it.encoded_len(version))
411 .sum();
412 prefix + body
413 };
414 }
415 if version >= 0 {
416 n += {
417 let prefix =
418 crate::primitives::array::array_len_prefix_len((self.observers).len(), flex);
419 let body: usize = (self.observers)
420 .iter()
421 .map(|it| it.encoded_len(version))
422 .sum();
423 prefix + body
424 };
425 }
426 if flex {
427 let known_pairs: Vec<(u32, usize)> = Vec::new();
428 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
429 }
430 n
431 }
432}
433impl<'de> DecodeBorrow<'de> for PartitionData<'de> {
434 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
435 let flex = version >= 0;
436 let mut out = Self::default();
437 if version >= 0 {
438 out.partition_index = get_i32(buf)?;
439 }
440 if version >= 0 {
441 out.error_code = get_i16(buf)?;
442 }
443 if version >= 2 {
444 out.error_message = if flex {
445 get_compact_nullable_string_borrowed(buf)?
446 } else {
447 get_nullable_string_borrowed(buf)?
448 };
449 }
450 if version >= 0 {
451 out.leader_id = get_i32(buf)?;
452 }
453 if version >= 0 {
454 out.leader_epoch = get_i32(buf)?;
455 }
456 if version >= 0 {
457 out.high_watermark = get_i64(buf)?;
458 }
459 if version >= 0 {
460 out.current_voters = {
461 let n = crate::primitives::array::get_array_len(buf, flex)?;
462 let mut v = Vec::with_capacity(n);
463 for _ in 0..n {
464 v.push(
465 super::common::describe_quorum_response::replica_state::ReplicaState::decode_borrow(
466 buf,
467 version,
468 )?,
469 );
470 }
471 v
472 };
473 }
474 if version >= 0 {
475 out.observers = {
476 let n = crate::primitives::array::get_array_len(buf, flex)?;
477 let mut v = Vec::with_capacity(n);
478 for _ in 0..n {
479 v.push(
480 super::common::describe_quorum_response::replica_state::ReplicaState::decode_borrow(
481 buf,
482 version,
483 )?,
484 );
485 }
486 v
487 };
488 }
489 if flex {
490 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
491 }
492 Ok(out)
493 }
494}
495#[cfg(test)]
496impl PartitionData<'_> {
497 #[must_use]
498 pub fn populated(version: i16) -> Self {
499 let mut m = Self::default();
500 if version >= 0 {
501 m.partition_index = 1i32;
502 }
503 if version >= 0 {
504 m.error_code = 1i16;
505 }
506 if version >= 2 {
507 m.error_message = Some("x");
508 }
509 if version >= 0 {
510 m.leader_id = 1i32;
511 }
512 if version >= 0 {
513 m.leader_epoch = 1i32;
514 }
515 if version >= 0 {
516 m.high_watermark = 1i64;
517 }
518 if version >= 0 {
519 m.current_voters = vec![
520 super::common::describe_quorum_response::replica_state::ReplicaState::populated(
521 version,
522 ),
523 ];
524 }
525 if version >= 0 {
526 m.observers = vec![
527 super::common::describe_quorum_response::replica_state::ReplicaState::populated(
528 version,
529 ),
530 ];
531 }
532 m
533 }
534}
535#[derive(Debug, Clone, PartialEq, Eq, Default)]
536pub struct Node<'a> {
537 pub node_id: i32,
538 pub listeners: Vec<Listener<'a>>,
539 pub unknown_tagged_fields: UnknownTaggedFields,
540}
541impl Node<'_> {
542 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::Node {
543 crate::owned::describe_quorum_response::Node {
544 node_id: (self.node_id),
545 listeners: (self.listeners).iter().map(Listener::to_owned).collect(),
546 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
547 }
548 }
549}
550impl Encode for Node<'_> {
551 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
552 let flex = version >= 0;
553 if version >= 2 {
554 put_i32(buf, self.node_id);
555 }
556 if version >= 2 {
557 {
558 crate::primitives::array::put_array_len(buf, (self.listeners).len(), flex);
559 for it in &self.listeners {
560 it.encode(buf, version)?;
561 }
562 }
563 }
564 if flex {
565 let tagged = WriteTaggedFields::new();
566 tagged.write(buf, &self.unknown_tagged_fields);
567 }
568 Ok(())
569 }
570 fn encoded_len(&self, version: i16) -> usize {
571 let flex = version >= 0;
572 let mut n: usize = 0;
573 if version >= 2 {
574 n += 4;
575 }
576 if version >= 2 {
577 n += {
578 let prefix =
579 crate::primitives::array::array_len_prefix_len((self.listeners).len(), flex);
580 let body: usize = (self.listeners)
581 .iter()
582 .map(|it| it.encoded_len(version))
583 .sum();
584 prefix + body
585 };
586 }
587 if flex {
588 let known_pairs: Vec<(u32, usize)> = Vec::new();
589 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
590 }
591 n
592 }
593}
594impl<'de> DecodeBorrow<'de> for Node<'de> {
595 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
596 let flex = version >= 0;
597 let mut out = Self::default();
598 if version >= 2 {
599 out.node_id = get_i32(buf)?;
600 }
601 if version >= 2 {
602 out.listeners = {
603 let n = crate::primitives::array::get_array_len(buf, flex)?;
604 let mut v = Vec::with_capacity(n);
605 for _ in 0..n {
606 v.push(Listener::decode_borrow(buf, version)?);
607 }
608 v
609 };
610 }
611 if flex {
612 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
613 }
614 Ok(out)
615 }
616}
617#[cfg(test)]
618impl Node<'_> {
619 #[must_use]
620 pub fn populated(version: i16) -> Self {
621 let mut m = Self::default();
622 if version >= 2 {
623 m.node_id = 1i32;
624 }
625 if version >= 2 {
626 m.listeners = vec![Listener::populated(version)];
627 }
628 m
629 }
630}
631#[derive(Debug, Clone, PartialEq, Eq, Default)]
632pub struct Listener<'a> {
633 pub name: &'a str,
634 pub host: &'a str,
635 pub port: u16,
636 pub unknown_tagged_fields: UnknownTaggedFields,
637}
638impl Listener<'_> {
639 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::Listener {
640 crate::owned::describe_quorum_response::Listener {
641 name: (self.name).to_string(),
642 host: (self.host).to_string(),
643 port: (self.port),
644 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
645 }
646 }
647}
648impl Encode for Listener<'_> {
649 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
650 let flex = version >= 0;
651 if version >= 2 {
652 if flex {
653 put_compact_string(buf, self.name);
654 } else {
655 put_string(buf, self.name);
656 }
657 }
658 if version >= 2 {
659 if flex {
660 put_compact_string(buf, self.host);
661 } else {
662 put_string(buf, self.host);
663 }
664 }
665 if version >= 2 {
666 put_u16(buf, self.port);
667 }
668 if flex {
669 let tagged = WriteTaggedFields::new();
670 tagged.write(buf, &self.unknown_tagged_fields);
671 }
672 Ok(())
673 }
674 fn encoded_len(&self, version: i16) -> usize {
675 let flex = version >= 0;
676 let mut n: usize = 0;
677 if version >= 2 {
678 n += if flex {
679 compact_string_len(self.name)
680 } else {
681 string_len(self.name)
682 };
683 }
684 if version >= 2 {
685 n += if flex {
686 compact_string_len(self.host)
687 } else {
688 string_len(self.host)
689 };
690 }
691 if version >= 2 {
692 n += 2;
693 }
694 if flex {
695 let known_pairs: Vec<(u32, usize)> = Vec::new();
696 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
697 }
698 n
699 }
700}
701impl<'de> DecodeBorrow<'de> for Listener<'de> {
702 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
703 let flex = version >= 0;
704 let mut out = Self::default();
705 if version >= 2 {
706 out.name = if flex {
707 get_compact_string_borrowed(buf)?
708 } else {
709 get_string_borrowed(buf)?
710 };
711 }
712 if version >= 2 {
713 out.host = if flex {
714 get_compact_string_borrowed(buf)?
715 } else {
716 get_string_borrowed(buf)?
717 };
718 }
719 if version >= 2 {
720 out.port = get_u16(buf)?;
721 }
722 if flex {
723 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
724 }
725 Ok(out)
726 }
727}
728#[cfg(test)]
729impl Listener<'_> {
730 #[must_use]
731 pub fn populated(version: i16) -> Self {
732 let mut m = Self::default();
733 if version >= 2 {
734 m.name = "x";
735 }
736 if version >= 2 {
737 m.host = "x";
738 }
739 if version >= 2 {
740 m.port = 1u16;
741 }
742 m
743 }
744}