1use crate::primitives::fixed::{
4 get_i16, get_i32, get_i64, get_u16, put_i16, put_i32, put_i64, put_u16,
5};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13use bytes::{Buf, BufMut};
14pub const API_KEY: i16 = 55;
15pub const MIN_VERSION: i16 = 0;
16pub const MAX_VERSION: i16 = 2;
17pub const FLEXIBLE_MIN: i16 = 0;
18#[inline]
19fn is_flexible(version: i16) -> bool {
20 version >= FLEXIBLE_MIN
21}
22#[derive(Debug, Clone, PartialEq, Eq, Default)]
23pub struct DescribeQuorumResponse {
24 pub error_code: i16,
25 pub error_message: Option<String>,
26 pub topics: Vec<TopicData>,
27 pub nodes: Vec<Node>,
28 pub unknown_tagged_fields: UnknownTaggedFields,
29}
30impl Encode for DescribeQuorumResponse {
31 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
32 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
33 return Err(ProtocolError::UnsupportedVersion {
34 api_key: API_KEY,
35 version,
36 });
37 }
38 let flex = is_flexible(version);
39 if version >= 0 {
40 put_i16(buf, self.error_code);
41 }
42 if version >= 2 {
43 if flex {
44 put_compact_nullable_string(buf, self.error_message.as_deref());
45 } else {
46 put_nullable_string(buf, self.error_message.as_deref());
47 }
48 }
49 if version >= 0 {
50 {
51 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
52 for it in &self.topics {
53 it.encode(buf, version)?;
54 }
55 }
56 }
57 if version >= 2 {
58 {
59 crate::primitives::array::put_array_len(buf, (self.nodes).len(), flex);
60 for it in &self.nodes {
61 it.encode(buf, version)?;
62 }
63 }
64 }
65 if flex {
66 let tagged = WriteTaggedFields::new();
67 tagged.write(buf, &self.unknown_tagged_fields);
68 }
69 Ok(())
70 }
71 fn encoded_len(&self, version: i16) -> usize {
72 let flex = is_flexible(version);
73 let mut n: usize = 0;
74 if version >= 0 {
75 n += 2;
76 }
77 if version >= 2 {
78 n += if flex {
79 compact_nullable_string_len(self.error_message.as_deref())
80 } else {
81 nullable_string_len(self.error_message.as_deref())
82 };
83 }
84 if version >= 0 {
85 n += {
86 let prefix =
87 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
88 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
89 prefix + body
90 };
91 }
92 if version >= 2 {
93 n += {
94 let prefix =
95 crate::primitives::array::array_len_prefix_len((self.nodes).len(), flex);
96 let body: usize = (self.nodes).iter().map(|it| it.encoded_len(version)).sum();
97 prefix + body
98 };
99 }
100 if flex {
101 let known_pairs: Vec<(u32, usize)> = Vec::new();
102 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
103 }
104 n
105 }
106}
107impl Decode<'_> for DescribeQuorumResponse {
108 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
109 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
110 return Err(ProtocolError::UnsupportedVersion {
111 api_key: API_KEY,
112 version,
113 });
114 }
115 let flex = is_flexible(version);
116 let mut out = Self::default();
117 if version >= 0 {
118 out.error_code = get_i16(buf)?;
119 }
120 if version >= 2 {
121 out.error_message = if flex {
122 get_compact_nullable_string_owned(buf)?
123 } else {
124 get_nullable_string_owned(buf)?
125 };
126 }
127 if version >= 0 {
128 out.topics = {
129 let n = crate::primitives::array::get_array_len(buf, flex)?;
130 let mut v = Vec::with_capacity(n);
131 for _ in 0..n {
132 v.push(TopicData::decode(buf, version)?);
133 }
134 v
135 };
136 }
137 if version >= 2 {
138 out.nodes = {
139 let n = crate::primitives::array::get_array_len(buf, flex)?;
140 let mut v = Vec::with_capacity(n);
141 for _ in 0..n {
142 v.push(Node::decode(buf, version)?);
143 }
144 v
145 };
146 }
147 if flex {
148 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
149 }
150 Ok(out)
151 }
152}
153#[cfg(test)]
154impl DescribeQuorumResponse {
155 #[must_use]
156 pub fn populated(version: i16) -> Self {
157 let mut m = Self::default();
158 if version >= 0 {
159 m.error_code = 1i16;
160 }
161 if version >= 2 {
162 m.error_message = Some("x".to_string());
163 }
164 if version >= 0 {
165 m.topics = vec![TopicData::populated(version)];
166 }
167 if version >= 2 {
168 m.nodes = vec![Node::populated(version)];
169 }
170 m
171 }
172}
173#[derive(Debug, Clone, PartialEq, Eq, Default)]
174pub struct TopicData {
175 pub topic_name: String,
176 pub partitions: Vec<PartitionData>,
177 pub unknown_tagged_fields: UnknownTaggedFields,
178}
179impl Encode for TopicData {
180 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
181 let flex = version >= 0;
182 if version >= 0 {
183 if flex {
184 put_compact_string(buf, &self.topic_name);
185 } else {
186 put_string(buf, &self.topic_name);
187 }
188 }
189 if version >= 0 {
190 {
191 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
192 for it in &self.partitions {
193 it.encode(buf, version)?;
194 }
195 }
196 }
197 if flex {
198 let tagged = WriteTaggedFields::new();
199 tagged.write(buf, &self.unknown_tagged_fields);
200 }
201 Ok(())
202 }
203 fn encoded_len(&self, version: i16) -> usize {
204 let flex = version >= 0;
205 let mut n: usize = 0;
206 if version >= 0 {
207 n += if flex {
208 compact_string_len(&self.topic_name)
209 } else {
210 string_len(&self.topic_name)
211 };
212 }
213 if version >= 0 {
214 n += {
215 let prefix =
216 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
217 let body: usize = (self.partitions)
218 .iter()
219 .map(|it| it.encoded_len(version))
220 .sum();
221 prefix + body
222 };
223 }
224 if flex {
225 let known_pairs: Vec<(u32, usize)> = Vec::new();
226 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
227 }
228 n
229 }
230}
231impl Decode<'_> for TopicData {
232 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
233 let flex = version >= 0;
234 let mut out = Self::default();
235 if version >= 0 {
236 out.topic_name = if flex {
237 get_compact_string_owned(buf)?
238 } else {
239 get_string_owned(buf)?
240 };
241 }
242 if version >= 0 {
243 out.partitions = {
244 let n = crate::primitives::array::get_array_len(buf, flex)?;
245 let mut v = Vec::with_capacity(n);
246 for _ in 0..n {
247 v.push(PartitionData::decode(buf, version)?);
248 }
249 v
250 };
251 }
252 if flex {
253 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
254 }
255 Ok(out)
256 }
257}
258#[cfg(test)]
259impl TopicData {
260 #[must_use]
261 pub fn populated(version: i16) -> Self {
262 let mut m = Self::default();
263 if version >= 0 {
264 m.topic_name = "x".to_string();
265 }
266 if version >= 0 {
267 m.partitions = vec![PartitionData::populated(version)];
268 }
269 m
270 }
271}
272#[derive(Debug, Clone, PartialEq, Eq, Default)]
273pub struct PartitionData {
274 pub partition_index: i32,
275 pub error_code: i16,
276 pub error_message: Option<String>,
277 pub leader_id: i32,
278 pub leader_epoch: i32,
279 pub high_watermark: i64,
280 pub current_voters: Vec<super::common::describe_quorum_response::replica_state::ReplicaState>,
281 pub observers: Vec<super::common::describe_quorum_response::replica_state::ReplicaState>,
282 pub unknown_tagged_fields: UnknownTaggedFields,
283}
284impl Encode for PartitionData {
285 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
286 let flex = version >= 0;
287 if version >= 0 {
288 put_i32(buf, self.partition_index);
289 }
290 if version >= 0 {
291 put_i16(buf, self.error_code);
292 }
293 if version >= 2 {
294 if flex {
295 put_compact_nullable_string(buf, self.error_message.as_deref());
296 } else {
297 put_nullable_string(buf, self.error_message.as_deref());
298 }
299 }
300 if version >= 0 {
301 put_i32(buf, self.leader_id);
302 }
303 if version >= 0 {
304 put_i32(buf, self.leader_epoch);
305 }
306 if version >= 0 {
307 put_i64(buf, self.high_watermark);
308 }
309 if version >= 0 {
310 {
311 crate::primitives::array::put_array_len(buf, (self.current_voters).len(), flex);
312 for it in &self.current_voters {
313 it.encode(buf, version)?;
314 }
315 }
316 }
317 if version >= 0 {
318 {
319 crate::primitives::array::put_array_len(buf, (self.observers).len(), flex);
320 for it in &self.observers {
321 it.encode(buf, version)?;
322 }
323 }
324 }
325 if flex {
326 let tagged = WriteTaggedFields::new();
327 tagged.write(buf, &self.unknown_tagged_fields);
328 }
329 Ok(())
330 }
331 fn encoded_len(&self, version: i16) -> usize {
332 let flex = version >= 0;
333 let mut n: usize = 0;
334 if version >= 0 {
335 n += 4;
336 }
337 if version >= 0 {
338 n += 2;
339 }
340 if version >= 2 {
341 n += if flex {
342 compact_nullable_string_len(self.error_message.as_deref())
343 } else {
344 nullable_string_len(self.error_message.as_deref())
345 };
346 }
347 if version >= 0 {
348 n += 4;
349 }
350 if version >= 0 {
351 n += 4;
352 }
353 if version >= 0 {
354 n += 8;
355 }
356 if version >= 0 {
357 n += {
358 let prefix = crate::primitives::array::array_len_prefix_len(
359 (self.current_voters).len(),
360 flex,
361 );
362 let body: usize = (self.current_voters)
363 .iter()
364 .map(|it| it.encoded_len(version))
365 .sum();
366 prefix + body
367 };
368 }
369 if version >= 0 {
370 n += {
371 let prefix =
372 crate::primitives::array::array_len_prefix_len((self.observers).len(), flex);
373 let body: usize = (self.observers)
374 .iter()
375 .map(|it| it.encoded_len(version))
376 .sum();
377 prefix + body
378 };
379 }
380 if flex {
381 let known_pairs: Vec<(u32, usize)> = Vec::new();
382 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
383 }
384 n
385 }
386}
387impl Decode<'_> for PartitionData {
388 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
389 let flex = version >= 0;
390 let mut out = Self::default();
391 if version >= 0 {
392 out.partition_index = get_i32(buf)?;
393 }
394 if version >= 0 {
395 out.error_code = get_i16(buf)?;
396 }
397 if version >= 2 {
398 out.error_message = if flex {
399 get_compact_nullable_string_owned(buf)?
400 } else {
401 get_nullable_string_owned(buf)?
402 };
403 }
404 if version >= 0 {
405 out.leader_id = get_i32(buf)?;
406 }
407 if version >= 0 {
408 out.leader_epoch = get_i32(buf)?;
409 }
410 if version >= 0 {
411 out.high_watermark = get_i64(buf)?;
412 }
413 if version >= 0 {
414 out.current_voters = {
415 let n = crate::primitives::array::get_array_len(buf, flex)?;
416 let mut v = Vec::with_capacity(n);
417 for _ in 0..n {
418 v.push(
419 super::common::describe_quorum_response::replica_state::ReplicaState::decode(
420 buf,
421 version,
422 )?,
423 );
424 }
425 v
426 };
427 }
428 if version >= 0 {
429 out.observers = {
430 let n = crate::primitives::array::get_array_len(buf, flex)?;
431 let mut v = Vec::with_capacity(n);
432 for _ in 0..n {
433 v.push(
434 super::common::describe_quorum_response::replica_state::ReplicaState::decode(
435 buf,
436 version,
437 )?,
438 );
439 }
440 v
441 };
442 }
443 if flex {
444 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
445 }
446 Ok(out)
447 }
448}
449#[cfg(test)]
450impl PartitionData {
451 #[must_use]
452 pub fn populated(version: i16) -> Self {
453 let mut m = Self::default();
454 if version >= 0 {
455 m.partition_index = 1i32;
456 }
457 if version >= 0 {
458 m.error_code = 1i16;
459 }
460 if version >= 2 {
461 m.error_message = Some("x".to_string());
462 }
463 if version >= 0 {
464 m.leader_id = 1i32;
465 }
466 if version >= 0 {
467 m.leader_epoch = 1i32;
468 }
469 if version >= 0 {
470 m.high_watermark = 1i64;
471 }
472 if version >= 0 {
473 m.current_voters = vec![
474 super::common::describe_quorum_response::replica_state::ReplicaState::populated(
475 version,
476 ),
477 ];
478 }
479 if version >= 0 {
480 m.observers = vec![
481 super::common::describe_quorum_response::replica_state::ReplicaState::populated(
482 version,
483 ),
484 ];
485 }
486 m
487 }
488}
489#[derive(Debug, Clone, PartialEq, Eq, Default)]
490pub struct Node {
491 pub node_id: i32,
492 pub listeners: Vec<Listener>,
493 pub unknown_tagged_fields: UnknownTaggedFields,
494}
495impl Encode for Node {
496 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
497 let flex = version >= 0;
498 if version >= 2 {
499 put_i32(buf, self.node_id);
500 }
501 if version >= 2 {
502 {
503 crate::primitives::array::put_array_len(buf, (self.listeners).len(), flex);
504 for it in &self.listeners {
505 it.encode(buf, version)?;
506 }
507 }
508 }
509 if flex {
510 let tagged = WriteTaggedFields::new();
511 tagged.write(buf, &self.unknown_tagged_fields);
512 }
513 Ok(())
514 }
515 fn encoded_len(&self, version: i16) -> usize {
516 let flex = version >= 0;
517 let mut n: usize = 0;
518 if version >= 2 {
519 n += 4;
520 }
521 if version >= 2 {
522 n += {
523 let prefix =
524 crate::primitives::array::array_len_prefix_len((self.listeners).len(), flex);
525 let body: usize = (self.listeners)
526 .iter()
527 .map(|it| it.encoded_len(version))
528 .sum();
529 prefix + body
530 };
531 }
532 if flex {
533 let known_pairs: Vec<(u32, usize)> = Vec::new();
534 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
535 }
536 n
537 }
538}
539impl Decode<'_> for Node {
540 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
541 let flex = version >= 0;
542 let mut out = Self::default();
543 if version >= 2 {
544 out.node_id = get_i32(buf)?;
545 }
546 if version >= 2 {
547 out.listeners = {
548 let n = crate::primitives::array::get_array_len(buf, flex)?;
549 let mut v = Vec::with_capacity(n);
550 for _ in 0..n {
551 v.push(Listener::decode(buf, version)?);
552 }
553 v
554 };
555 }
556 if flex {
557 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
558 }
559 Ok(out)
560 }
561}
562#[cfg(test)]
563impl Node {
564 #[must_use]
565 pub fn populated(version: i16) -> Self {
566 let mut m = Self::default();
567 if version >= 2 {
568 m.node_id = 1i32;
569 }
570 if version >= 2 {
571 m.listeners = vec![Listener::populated(version)];
572 }
573 m
574 }
575}
576#[derive(Debug, Clone, PartialEq, Eq, Default)]
577pub struct Listener {
578 pub name: String,
579 pub host: String,
580 pub port: u16,
581 pub unknown_tagged_fields: UnknownTaggedFields,
582}
583impl Encode for Listener {
584 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
585 let flex = version >= 0;
586 if version >= 2 {
587 if flex {
588 put_compact_string(buf, &self.name);
589 } else {
590 put_string(buf, &self.name);
591 }
592 }
593 if version >= 2 {
594 if flex {
595 put_compact_string(buf, &self.host);
596 } else {
597 put_string(buf, &self.host);
598 }
599 }
600 if version >= 2 {
601 put_u16(buf, self.port);
602 }
603 if flex {
604 let tagged = WriteTaggedFields::new();
605 tagged.write(buf, &self.unknown_tagged_fields);
606 }
607 Ok(())
608 }
609 fn encoded_len(&self, version: i16) -> usize {
610 let flex = version >= 0;
611 let mut n: usize = 0;
612 if version >= 2 {
613 n += if flex {
614 compact_string_len(&self.name)
615 } else {
616 string_len(&self.name)
617 };
618 }
619 if version >= 2 {
620 n += if flex {
621 compact_string_len(&self.host)
622 } else {
623 string_len(&self.host)
624 };
625 }
626 if version >= 2 {
627 n += 2;
628 }
629 if flex {
630 let known_pairs: Vec<(u32, usize)> = Vec::new();
631 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
632 }
633 n
634 }
635}
636impl Decode<'_> for Listener {
637 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
638 let flex = version >= 0;
639 let mut out = Self::default();
640 if version >= 2 {
641 out.name = if flex {
642 get_compact_string_owned(buf)?
643 } else {
644 get_string_owned(buf)?
645 };
646 }
647 if version >= 2 {
648 out.host = if flex {
649 get_compact_string_owned(buf)?
650 } else {
651 get_string_owned(buf)?
652 };
653 }
654 if version >= 2 {
655 out.port = get_u16(buf)?;
656 }
657 if flex {
658 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
659 }
660 Ok(out)
661 }
662}
663#[cfg(test)]
664impl Listener {
665 #[must_use]
666 pub fn populated(version: i16) -> Self {
667 let mut m = Self::default();
668 if version >= 2 {
669 m.name = "x".to_string();
670 }
671 if version >= 2 {
672 m.host = "x".to_string();
673 }
674 if version >= 2 {
675 m.port = 1u16;
676 }
677 m
678 }
679}
680#[must_use]
683#[allow(unused_comparisons)]
684pub fn default_json(version: i16) -> ::serde_json::Value {
685 let mut obj = ::serde_json::Map::new();
686 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
687 if version >= 2 {
688 obj.insert("errorMessage".to_string(), ::serde_json::Value::Null);
689 }
690 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
691 if version >= 2 {
692 obj.insert("nodes".to_string(), ::serde_json::Value::Array(vec![]));
693 }
694 ::serde_json::Value::Object(obj)
695}