1use bytes::BufMut;
4
5use crate::primitives::fixed::{
6 get_i16, get_i32, get_i64, get_u16, put_i16, put_i32, put_i64, put_u16,
7};
8use crate::primitives::string_bytes::{
9 compact_nullable_string_len, compact_string_len, nullable_string_len,
10 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
11};
12use crate::primitives::string_bytes_borrowed::{
13 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
14 get_nullable_string_borrowed, get_string_borrowed,
15};
16use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
17use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
18
/// API key of this message; reported in `ProtocolError::UnsupportedVersion`.
pub const API_KEY: i16 = 55;
/// Lowest message version accepted by the top-level encode/decode.
pub const MIN_VERSION: i16 = 0;
/// Highest message version accepted by the top-level encode/decode.
pub const MAX_VERSION: i16 = 2;
/// First version for which [`is_flexible`] reports `true`.
pub const FLEXIBLE_MIN: i16 = 0;

/// Returns `true` when `version` is at or above [`FLEXIBLE_MIN`].
///
/// Callers in this file use the result to choose compact string/array
/// encodings and to decide whether tagged fields are read or written.
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
28
/// Borrowed form of the DescribeQuorum response; string data borrows from
/// the decode buffer for lifetime `'a`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DescribeQuorumResponse<'a> {
    /// Top-level error code; encoded and decoded for every supported version.
    pub error_code: i16,
    /// Optional error text; only encoded and decoded for version >= 2.
    pub error_message: Option<&'a str>,
    /// Per-topic entries; encoded and decoded for every supported version.
    pub topics: Vec<TopicData<'a>>,
    /// Node entries; only encoded and decoded for version >= 2.
    pub nodes: Vec<Node<'a>>,
    /// Tagged fields returned by `read_tagged_fields` while decoding and
    /// written back out when encoding a flexible version.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
37impl DescribeQuorumResponse<'_> {
38 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::DescribeQuorumResponse {
39 crate::owned::describe_quorum_response::DescribeQuorumResponse {
40 error_code: (self.error_code),
41 error_message: (self.error_message).map(std::string::ToString::to_string),
42 topics: (self.topics).iter().map(TopicData::to_owned).collect(),
43 nodes: (self.nodes).iter().map(Node::to_owned).collect(),
44 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
45 }
46 }
47}
48impl Encode for DescribeQuorumResponse<'_> {
49 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
50 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
51 return Err(ProtocolError::UnsupportedVersion {
52 api_key: API_KEY,
53 version,
54 });
55 }
56 let flex = is_flexible(version);
57 if version >= 0 {
58 put_i16(buf, self.error_code);
59 }
60 if version >= 2 {
61 if flex {
62 put_compact_nullable_string(buf, self.error_message);
63 } else {
64 put_nullable_string(buf, self.error_message);
65 }
66 }
67 if version >= 0 {
68 {
69 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
70 for it in &self.topics {
71 it.encode(buf, version)?;
72 }
73 }
74 }
75 if version >= 2 {
76 {
77 crate::primitives::array::put_array_len(buf, (self.nodes).len(), flex);
78 for it in &self.nodes {
79 it.encode(buf, version)?;
80 }
81 }
82 }
83 if flex {
84 let tagged = WriteTaggedFields::new();
85 tagged.write(buf, &self.unknown_tagged_fields);
86 }
87 Ok(())
88 }
89 fn encoded_len(&self, version: i16) -> usize {
90 let flex = is_flexible(version);
91 let mut n: usize = 0;
92 if version >= 0 {
93 n += 2;
94 }
95 if version >= 2 {
96 n += if flex {
97 compact_nullable_string_len(self.error_message)
98 } else {
99 nullable_string_len(self.error_message)
100 };
101 }
102 if version >= 0 {
103 n += {
104 let prefix =
105 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
106 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
107 prefix + body
108 };
109 }
110 if version >= 2 {
111 n += {
112 let prefix =
113 crate::primitives::array::array_len_prefix_len((self.nodes).len(), flex);
114 let body: usize = (self.nodes).iter().map(|it| it.encoded_len(version)).sum();
115 prefix + body
116 };
117 }
118 if flex {
119 let known_pairs: Vec<(u32, usize)> = Vec::new();
120 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
121 }
122 n
123 }
124}
125impl<'de> DecodeBorrow<'de> for DescribeQuorumResponse<'de> {
126 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
127 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
128 return Err(ProtocolError::UnsupportedVersion {
129 api_key: API_KEY,
130 version,
131 });
132 }
133 let flex = is_flexible(version);
134 let mut out = Self::default();
135 if version >= 0 {
136 out.error_code = get_i16(buf)?;
137 }
138 if version >= 2 {
139 out.error_message = if flex {
140 get_compact_nullable_string_borrowed(buf)?
141 } else {
142 get_nullable_string_borrowed(buf)?
143 };
144 }
145 if version >= 0 {
146 out.topics = {
147 let n = crate::primitives::array::get_array_len(buf, flex)?;
148 let mut v = Vec::with_capacity(n);
149 for _ in 0..n {
150 v.push(TopicData::decode_borrow(buf, version)?);
151 }
152 v
153 };
154 }
155 if version >= 2 {
156 out.nodes = {
157 let n = crate::primitives::array::get_array_len(buf, flex)?;
158 let mut v = Vec::with_capacity(n);
159 for _ in 0..n {
160 v.push(Node::decode_borrow(buf, version)?);
161 }
162 v
163 };
164 }
165 if flex {
166 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
167 }
168 Ok(out)
169 }
170}
#[cfg(test)]
impl DescribeQuorumResponse<'_> {
    /// Test fixture: a response with every field that exists in `version`
    /// set to a non-default value (`1`, `"x"`, or a single populated child).
    ///
    /// Fields gated out by `version` keep their `Default` value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let since_v0 = version >= 0;
        let since_v2 = version >= 2;
        Self {
            error_code: i16::from(since_v0),
            error_message: since_v2.then_some("x"),
            topics: if since_v0 {
                vec![TopicData::populated(version)]
            } else {
                Vec::new()
            },
            nodes: if since_v2 {
                vec![Node::populated(version)]
            } else {
                Vec::new()
            },
            ..Self::default()
        }
    }
}
/// One topic entry of a [`DescribeQuorumResponse`]; the name borrows from
/// the decode buffer for lifetime `'a`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct TopicData<'a> {
    /// Topic name; encoded and decoded for version >= 0.
    pub topic_name: &'a str,
    /// Partition entries of this topic; encoded and decoded for version >= 0.
    pub partitions: Vec<PartitionData<'a>>,
    /// Tagged fields returned by `read_tagged_fields` while decoding and
    /// written back out when encoding.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
197impl TopicData<'_> {
198 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::TopicData {
199 crate::owned::describe_quorum_response::TopicData {
200 topic_name: (self.topic_name).to_string(),
201 partitions: (self.partitions)
202 .iter()
203 .map(PartitionData::to_owned)
204 .collect(),
205 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
206 }
207 }
208}
209impl Encode for TopicData<'_> {
210 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
211 let flex = version >= 0;
212 if version >= 0 {
213 if flex {
214 put_compact_string(buf, self.topic_name);
215 } else {
216 put_string(buf, self.topic_name);
217 }
218 }
219 if version >= 0 {
220 {
221 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
222 for it in &self.partitions {
223 it.encode(buf, version)?;
224 }
225 }
226 }
227 if flex {
228 let tagged = WriteTaggedFields::new();
229 tagged.write(buf, &self.unknown_tagged_fields);
230 }
231 Ok(())
232 }
233 fn encoded_len(&self, version: i16) -> usize {
234 let flex = version >= 0;
235 let mut n: usize = 0;
236 if version >= 0 {
237 n += if flex {
238 compact_string_len(self.topic_name)
239 } else {
240 string_len(self.topic_name)
241 };
242 }
243 if version >= 0 {
244 n += {
245 let prefix =
246 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
247 let body: usize = (self.partitions)
248 .iter()
249 .map(|it| it.encoded_len(version))
250 .sum();
251 prefix + body
252 };
253 }
254 if flex {
255 let known_pairs: Vec<(u32, usize)> = Vec::new();
256 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
257 }
258 n
259 }
260}
261impl<'de> DecodeBorrow<'de> for TopicData<'de> {
262 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
263 let flex = version >= 0;
264 let mut out = Self::default();
265 if version >= 0 {
266 out.topic_name = if flex {
267 get_compact_string_borrowed(buf)?
268 } else {
269 get_string_borrowed(buf)?
270 };
271 }
272 if version >= 0 {
273 out.partitions = {
274 let n = crate::primitives::array::get_array_len(buf, flex)?;
275 let mut v = Vec::with_capacity(n);
276 for _ in 0..n {
277 v.push(PartitionData::decode_borrow(buf, version)?);
278 }
279 v
280 };
281 }
282 if flex {
283 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
284 }
285 Ok(out)
286 }
287}
#[cfg(test)]
impl TopicData<'_> {
    /// Test fixture: a topic entry named `"x"` holding one populated
    /// partition when `version >= 0`; otherwise the `Default` value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let since_v0 = version >= 0;
        Self {
            topic_name: if since_v0 { "x" } else { "" },
            partitions: if since_v0 {
                vec![PartitionData::populated(version)]
            } else {
                Vec::new()
            },
            ..Self::default()
        }
    }
}
/// One partition entry of a [`TopicData`]; the error message borrows from
/// the decode buffer for lifetime `'a`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PartitionData<'a> {
    /// Partition index; encoded and decoded for version >= 0.
    pub partition_index: i32,
    /// Partition-level error code; encoded and decoded for version >= 0.
    pub error_code: i16,
    /// Optional error text; only encoded and decoded for version >= 2.
    pub error_message: Option<&'a str>,
    /// Leader id; encoded and decoded for version >= 0.
    pub leader_id: i32,
    /// Leader epoch; encoded and decoded for version >= 0.
    pub leader_epoch: i32,
    /// High watermark; encoded and decoded for version >= 0.
    pub high_watermark: i64,
    /// Replica states of the current voters; encoded and decoded for version >= 0.
    pub current_voters: Vec<super::common::describe_quorum_response::replica_state::ReplicaState>,
    /// Replica states of the observers; encoded and decoded for version >= 0.
    pub observers: Vec<super::common::describe_quorum_response::replica_state::ReplicaState>,
    /// Tagged fields returned by `read_tagged_fields` while decoding and
    /// written back out when encoding.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
314impl PartitionData<'_> {
315 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::PartitionData {
316 crate::owned::describe_quorum_response::PartitionData {
317 partition_index: (self.partition_index),
318 error_code: (self.error_code),
319 error_message: (self.error_message).map(std::string::ToString::to_string),
320 leader_id: (self.leader_id),
321 leader_epoch: (self.leader_epoch),
322 high_watermark: (self.high_watermark),
323 current_voters: (self.current_voters)
324 .iter()
325 .map(super::common::describe_quorum_response::replica_state::ReplicaState::to_owned)
326 .collect(),
327 observers: (self.observers)
328 .iter()
329 .map(super::common::describe_quorum_response::replica_state::ReplicaState::to_owned)
330 .collect(),
331 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
332 }
333 }
334}
335impl Encode for PartitionData<'_> {
336 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
337 let flex = version >= 0;
338 if version >= 0 {
339 put_i32(buf, self.partition_index);
340 }
341 if version >= 0 {
342 put_i16(buf, self.error_code);
343 }
344 if version >= 2 {
345 if flex {
346 put_compact_nullable_string(buf, self.error_message);
347 } else {
348 put_nullable_string(buf, self.error_message);
349 }
350 }
351 if version >= 0 {
352 put_i32(buf, self.leader_id);
353 }
354 if version >= 0 {
355 put_i32(buf, self.leader_epoch);
356 }
357 if version >= 0 {
358 put_i64(buf, self.high_watermark);
359 }
360 if version >= 0 {
361 {
362 crate::primitives::array::put_array_len(buf, (self.current_voters).len(), flex);
363 for it in &self.current_voters {
364 it.encode(buf, version)?;
365 }
366 }
367 }
368 if version >= 0 {
369 {
370 crate::primitives::array::put_array_len(buf, (self.observers).len(), flex);
371 for it in &self.observers {
372 it.encode(buf, version)?;
373 }
374 }
375 }
376 if flex {
377 let tagged = WriteTaggedFields::new();
378 tagged.write(buf, &self.unknown_tagged_fields);
379 }
380 Ok(())
381 }
382 fn encoded_len(&self, version: i16) -> usize {
383 let flex = version >= 0;
384 let mut n: usize = 0;
385 if version >= 0 {
386 n += 4;
387 }
388 if version >= 0 {
389 n += 2;
390 }
391 if version >= 2 {
392 n += if flex {
393 compact_nullable_string_len(self.error_message)
394 } else {
395 nullable_string_len(self.error_message)
396 };
397 }
398 if version >= 0 {
399 n += 4;
400 }
401 if version >= 0 {
402 n += 4;
403 }
404 if version >= 0 {
405 n += 8;
406 }
407 if version >= 0 {
408 n += {
409 let prefix = crate::primitives::array::array_len_prefix_len(
410 (self.current_voters).len(),
411 flex,
412 );
413 let body: usize = (self.current_voters)
414 .iter()
415 .map(|it| it.encoded_len(version))
416 .sum();
417 prefix + body
418 };
419 }
420 if version >= 0 {
421 n += {
422 let prefix =
423 crate::primitives::array::array_len_prefix_len((self.observers).len(), flex);
424 let body: usize = (self.observers)
425 .iter()
426 .map(|it| it.encoded_len(version))
427 .sum();
428 prefix + body
429 };
430 }
431 if flex {
432 let known_pairs: Vec<(u32, usize)> = Vec::new();
433 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
434 }
435 n
436 }
437}
438impl<'de> DecodeBorrow<'de> for PartitionData<'de> {
439 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
440 let flex = version >= 0;
441 let mut out = Self::default();
442 if version >= 0 {
443 out.partition_index = get_i32(buf)?;
444 }
445 if version >= 0 {
446 out.error_code = get_i16(buf)?;
447 }
448 if version >= 2 {
449 out.error_message = if flex {
450 get_compact_nullable_string_borrowed(buf)?
451 } else {
452 get_nullable_string_borrowed(buf)?
453 };
454 }
455 if version >= 0 {
456 out.leader_id = get_i32(buf)?;
457 }
458 if version >= 0 {
459 out.leader_epoch = get_i32(buf)?;
460 }
461 if version >= 0 {
462 out.high_watermark = get_i64(buf)?;
463 }
464 if version >= 0 {
465 out.current_voters = {
466 let n = crate::primitives::array::get_array_len(buf, flex)?;
467 let mut v = Vec::with_capacity(n);
468 for _ in 0..n {
469 v . push (super :: common :: describe_quorum_response :: replica_state :: ReplicaState :: decode_borrow (buf , version) ?) ;
470 }
471 v
472 };
473 }
474 if version >= 0 {
475 out.observers = {
476 let n = crate::primitives::array::get_array_len(buf, flex)?;
477 let mut v = Vec::with_capacity(n);
478 for _ in 0..n {
479 v . push (super :: common :: describe_quorum_response :: replica_state :: ReplicaState :: decode_borrow (buf , version) ?) ;
480 }
481 v
482 };
483 }
484 if flex {
485 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
486 }
487 Ok(out)
488 }
489}
#[cfg(test)]
impl PartitionData<'_> {
    /// Test fixture: a partition entry with every field that exists in
    /// `version` set to a non-default value (`1`, `"x"`, or a single
    /// populated replica state).
    ///
    /// Fields gated out by `version` keep their `Default` value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        use super::common::describe_quorum_response::replica_state::ReplicaState;

        let since_v0 = version >= 0;
        let one_replica = || {
            if since_v0 {
                vec![ReplicaState::populated(version)]
            } else {
                Vec::new()
            }
        };
        Self {
            partition_index: i32::from(since_v0),
            error_code: i16::from(since_v0),
            error_message: (version >= 2).then_some("x"),
            leader_id: i32::from(since_v0),
            leader_epoch: i32::from(since_v0),
            high_watermark: i64::from(since_v0),
            current_voters: one_replica(),
            observers: one_replica(),
            ..Self::default()
        }
    }
}
/// One node entry of a [`DescribeQuorumResponse`]; listener strings borrow
/// from the decode buffer for lifetime `'a`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Node<'a> {
    /// Node id; only encoded and decoded for version >= 2.
    pub node_id: i32,
    /// Listener entries of this node; only encoded and decoded for version >= 2.
    pub listeners: Vec<Listener<'a>>,
    /// Tagged fields returned by `read_tagged_fields` while decoding and
    /// written back out when encoding.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
536impl Node<'_> {
537 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::Node {
538 crate::owned::describe_quorum_response::Node {
539 node_id: (self.node_id),
540 listeners: (self.listeners).iter().map(Listener::to_owned).collect(),
541 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
542 }
543 }
544}
545impl Encode for Node<'_> {
546 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
547 let flex = version >= 0;
548 if version >= 2 {
549 put_i32(buf, self.node_id);
550 }
551 if version >= 2 {
552 {
553 crate::primitives::array::put_array_len(buf, (self.listeners).len(), flex);
554 for it in &self.listeners {
555 it.encode(buf, version)?;
556 }
557 }
558 }
559 if flex {
560 let tagged = WriteTaggedFields::new();
561 tagged.write(buf, &self.unknown_tagged_fields);
562 }
563 Ok(())
564 }
565 fn encoded_len(&self, version: i16) -> usize {
566 let flex = version >= 0;
567 let mut n: usize = 0;
568 if version >= 2 {
569 n += 4;
570 }
571 if version >= 2 {
572 n += {
573 let prefix =
574 crate::primitives::array::array_len_prefix_len((self.listeners).len(), flex);
575 let body: usize = (self.listeners)
576 .iter()
577 .map(|it| it.encoded_len(version))
578 .sum();
579 prefix + body
580 };
581 }
582 if flex {
583 let known_pairs: Vec<(u32, usize)> = Vec::new();
584 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
585 }
586 n
587 }
588}
589impl<'de> DecodeBorrow<'de> for Node<'de> {
590 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
591 let flex = version >= 0;
592 let mut out = Self::default();
593 if version >= 2 {
594 out.node_id = get_i32(buf)?;
595 }
596 if version >= 2 {
597 out.listeners = {
598 let n = crate::primitives::array::get_array_len(buf, flex)?;
599 let mut v = Vec::with_capacity(n);
600 for _ in 0..n {
601 v.push(Listener::decode_borrow(buf, version)?);
602 }
603 v
604 };
605 }
606 if flex {
607 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
608 }
609 Ok(out)
610 }
611}
#[cfg(test)]
impl Node<'_> {
    /// Test fixture: a node with id `1` and one populated listener when
    /// `version >= 2`; otherwise the `Default` value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let since_v2 = version >= 2;
        Self {
            node_id: i32::from(since_v2),
            listeners: if since_v2 {
                vec![Listener::populated(version)]
            } else {
                Vec::new()
            },
            ..Self::default()
        }
    }
}
/// One listener entry of a [`Node`]; strings borrow from the decode buffer
/// for lifetime `'a`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Listener<'a> {
    /// Listener name; only encoded and decoded for version >= 2.
    pub name: &'a str,
    /// Listener host; only encoded and decoded for version >= 2.
    pub host: &'a str,
    /// Listener port; only encoded and decoded for version >= 2.
    pub port: u16,
    /// Tagged fields returned by `read_tagged_fields` while decoding and
    /// written back out when encoding.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
633impl Listener<'_> {
634 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::Listener {
635 crate::owned::describe_quorum_response::Listener {
636 name: (self.name).to_string(),
637 host: (self.host).to_string(),
638 port: (self.port),
639 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
640 }
641 }
642}
643impl Encode for Listener<'_> {
644 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
645 let flex = version >= 0;
646 if version >= 2 {
647 if flex {
648 put_compact_string(buf, self.name);
649 } else {
650 put_string(buf, self.name);
651 }
652 }
653 if version >= 2 {
654 if flex {
655 put_compact_string(buf, self.host);
656 } else {
657 put_string(buf, self.host);
658 }
659 }
660 if version >= 2 {
661 put_u16(buf, self.port);
662 }
663 if flex {
664 let tagged = WriteTaggedFields::new();
665 tagged.write(buf, &self.unknown_tagged_fields);
666 }
667 Ok(())
668 }
669 fn encoded_len(&self, version: i16) -> usize {
670 let flex = version >= 0;
671 let mut n: usize = 0;
672 if version >= 2 {
673 n += if flex {
674 compact_string_len(self.name)
675 } else {
676 string_len(self.name)
677 };
678 }
679 if version >= 2 {
680 n += if flex {
681 compact_string_len(self.host)
682 } else {
683 string_len(self.host)
684 };
685 }
686 if version >= 2 {
687 n += 2;
688 }
689 if flex {
690 let known_pairs: Vec<(u32, usize)> = Vec::new();
691 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
692 }
693 n
694 }
695}
696impl<'de> DecodeBorrow<'de> for Listener<'de> {
697 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
698 let flex = version >= 0;
699 let mut out = Self::default();
700 if version >= 2 {
701 out.name = if flex {
702 get_compact_string_borrowed(buf)?
703 } else {
704 get_string_borrowed(buf)?
705 };
706 }
707 if version >= 2 {
708 out.host = if flex {
709 get_compact_string_borrowed(buf)?
710 } else {
711 get_string_borrowed(buf)?
712 };
713 }
714 if version >= 2 {
715 out.port = get_u16(buf)?;
716 }
717 if flex {
718 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
719 }
720 Ok(out)
721 }
722}
#[cfg(test)]
impl Listener<'_> {
    /// Test fixture: a listener with name `"x"`, host `"x"` and port `1`
    /// when `version >= 2`; otherwise the `Default` value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let since_v2 = version >= 2;
        let text = if since_v2 { "x" } else { "" };
        Self {
            name: text,
            host: text,
            port: u16::from(since_v2),
            ..Self::default()
        }
    }
}