1use crate::primitives::fixed::{get_bool, get_i16, get_i32, put_bool, put_i16, put_i32};
4use crate::primitives::string_bytes::{
5 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
6 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
7 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
8};
9use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
10use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
11use bytes::{Buf, BufMut};
/// API key identifying the Metadata message family handled by this module.
pub const API_KEY: i16 = 3;
/// Lowest message version accepted by the top-level encoder and decoder.
pub const MIN_VERSION: i16 = 0;
/// Highest message version accepted by the top-level encoder and decoder.
pub const MAX_VERSION: i16 = 13;
/// First version for which [`is_flexible`] reports `true`; from here on the codecs in this
/// file use the compact string/array helpers and emit a tagged-field section.
pub const FLEXIBLE_MIN: i16 = 9;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
20#[derive(Debug, Clone, PartialEq, Eq)]
21pub struct MetadataResponse {
22 pub throttle_time_ms: i32,
23 pub brokers: Vec<MetadataResponseBroker>,
24 pub cluster_id: Option<String>,
25 pub controller_id: i32,
26 pub topics: Vec<MetadataResponseTopic>,
27 pub cluster_authorized_operations: i32,
28 pub error_code: i16,
29 pub unknown_tagged_fields: UnknownTaggedFields,
30}
31impl Default for MetadataResponse {
32 fn default() -> Self {
33 Self {
34 throttle_time_ms: 0i32,
35 brokers: Vec::new(),
36 cluster_id: None,
37 controller_id: -1i32,
38 topics: Vec::new(),
39 cluster_authorized_operations: -2_147_483_648i32,
40 error_code: 0i16,
41 unknown_tagged_fields: Default::default(),
42 }
43 }
44}
45impl Encode for MetadataResponse {
46 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
47 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
48 return Err(ProtocolError::UnsupportedVersion {
49 api_key: API_KEY,
50 version,
51 });
52 }
53 let flex = is_flexible(version);
54 if version >= 3 {
55 put_i32(buf, self.throttle_time_ms);
56 }
57 if version >= 0 {
58 {
59 crate::primitives::array::put_array_len(buf, (self.brokers).len(), flex);
60 for it in &self.brokers {
61 it.encode(buf, version)?;
62 }
63 }
64 }
65 if version >= 2 {
66 if flex {
67 put_compact_nullable_string(buf, self.cluster_id.as_deref());
68 } else {
69 put_nullable_string(buf, self.cluster_id.as_deref());
70 }
71 }
72 if version >= 1 {
73 put_i32(buf, self.controller_id);
74 }
75 if version >= 0 {
76 {
77 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
78 for it in &self.topics {
79 it.encode(buf, version)?;
80 }
81 }
82 }
83 if (8..=10).contains(&version) {
84 put_i32(buf, self.cluster_authorized_operations);
85 }
86 if version >= 13 {
87 put_i16(buf, self.error_code);
88 }
89 if flex {
90 let tagged = WriteTaggedFields::new();
91 tagged.write(buf, &self.unknown_tagged_fields);
92 }
93 Ok(())
94 }
95 fn encoded_len(&self, version: i16) -> usize {
96 let flex = is_flexible(version);
97 let mut n: usize = 0;
98 if version >= 3 {
99 n += 4;
100 }
101 if version >= 0 {
102 n += {
103 let prefix =
104 crate::primitives::array::array_len_prefix_len((self.brokers).len(), flex);
105 let body: usize = (self.brokers)
106 .iter()
107 .map(|it| it.encoded_len(version))
108 .sum();
109 prefix + body
110 };
111 }
112 if version >= 2 {
113 n += if flex {
114 compact_nullable_string_len(self.cluster_id.as_deref())
115 } else {
116 nullable_string_len(self.cluster_id.as_deref())
117 };
118 }
119 if version >= 1 {
120 n += 4;
121 }
122 if version >= 0 {
123 n += {
124 let prefix =
125 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
126 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
127 prefix + body
128 };
129 }
130 if (8..=10).contains(&version) {
131 n += 4;
132 }
133 if version >= 13 {
134 n += 2;
135 }
136 if flex {
137 let known_pairs: Vec<(u32, usize)> = Vec::new();
138 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
139 }
140 n
141 }
142}
143impl Decode<'_> for MetadataResponse {
144 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
145 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
146 return Err(ProtocolError::UnsupportedVersion {
147 api_key: API_KEY,
148 version,
149 });
150 }
151 let flex = is_flexible(version);
152 let mut out = Self::default();
153 if version >= 3 {
154 out.throttle_time_ms = get_i32(buf)?;
155 }
156 if version >= 0 {
157 out.brokers = {
158 let n = crate::primitives::array::get_array_len(buf, flex)?;
159 let mut v = Vec::with_capacity(n);
160 for _ in 0..n {
161 v.push(MetadataResponseBroker::decode(buf, version)?);
162 }
163 v
164 };
165 }
166 if version >= 2 {
167 out.cluster_id = if flex {
168 get_compact_nullable_string_owned(buf)?
169 } else {
170 get_nullable_string_owned(buf)?
171 };
172 }
173 if version >= 1 {
174 out.controller_id = get_i32(buf)?;
175 }
176 if version >= 0 {
177 out.topics = {
178 let n = crate::primitives::array::get_array_len(buf, flex)?;
179 let mut v = Vec::with_capacity(n);
180 for _ in 0..n {
181 v.push(MetadataResponseTopic::decode(buf, version)?);
182 }
183 v
184 };
185 }
186 if (8..=10).contains(&version) {
187 out.cluster_authorized_operations = get_i32(buf)?;
188 }
189 if version >= 13 {
190 out.error_code = get_i16(buf)?;
191 }
192 if flex {
193 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
194 }
195 Ok(out)
196 }
197}
#[cfg(test)]
impl MetadataResponse {
    /// Test fixture: starts from [`Default`] and gives every field that `version` carries a
    /// non-default value (one nested broker and one nested topic, `"x"` strings, `1` scalars).
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();

        // Gates are grouped by threshold, lowest first.
        if version >= 0 {
            sample.brokers = vec![MetadataResponseBroker::populated(version)];
            sample.topics = vec![MetadataResponseTopic::populated(version)];
        }
        if version >= 1 {
            sample.controller_id = 1;
        }
        if version >= 2 {
            sample.cluster_id = Some(String::from("x"));
        }
        if version >= 3 {
            sample.throttle_time_ms = 1;
        }
        if matches!(version, 8..=10) {
            sample.cluster_authorized_operations = 1;
        }
        if version >= 13 {
            sample.error_code = 1;
        }

        sample
    }
}
227#[derive(Debug, Clone, PartialEq, Eq, Default)]
228pub struct MetadataResponseBroker {
229 pub node_id: i32,
230 pub host: String,
231 pub port: i32,
232 pub rack: Option<String>,
233 pub unknown_tagged_fields: UnknownTaggedFields,
234}
235impl Encode for MetadataResponseBroker {
236 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
237 let flex = version >= 9;
238 if version >= 0 {
239 put_i32(buf, self.node_id);
240 }
241 if version >= 0 {
242 if flex {
243 put_compact_string(buf, &self.host);
244 } else {
245 put_string(buf, &self.host);
246 }
247 }
248 if version >= 0 {
249 put_i32(buf, self.port);
250 }
251 if version >= 1 {
252 if flex {
253 put_compact_nullable_string(buf, self.rack.as_deref());
254 } else {
255 put_nullable_string(buf, self.rack.as_deref());
256 }
257 }
258 if flex {
259 let tagged = WriteTaggedFields::new();
260 tagged.write(buf, &self.unknown_tagged_fields);
261 }
262 Ok(())
263 }
264 fn encoded_len(&self, version: i16) -> usize {
265 let flex = version >= 9;
266 let mut n: usize = 0;
267 if version >= 0 {
268 n += 4;
269 }
270 if version >= 0 {
271 n += if flex {
272 compact_string_len(&self.host)
273 } else {
274 string_len(&self.host)
275 };
276 }
277 if version >= 0 {
278 n += 4;
279 }
280 if version >= 1 {
281 n += if flex {
282 compact_nullable_string_len(self.rack.as_deref())
283 } else {
284 nullable_string_len(self.rack.as_deref())
285 };
286 }
287 if flex {
288 let known_pairs: Vec<(u32, usize)> = Vec::new();
289 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
290 }
291 n
292 }
293}
294impl Decode<'_> for MetadataResponseBroker {
295 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
296 let flex = version >= 9;
297 let mut out = Self::default();
298 if version >= 0 {
299 out.node_id = get_i32(buf)?;
300 }
301 if version >= 0 {
302 out.host = if flex {
303 get_compact_string_owned(buf)?
304 } else {
305 get_string_owned(buf)?
306 };
307 }
308 if version >= 0 {
309 out.port = get_i32(buf)?;
310 }
311 if version >= 1 {
312 out.rack = if flex {
313 get_compact_nullable_string_owned(buf)?
314 } else {
315 get_nullable_string_owned(buf)?
316 };
317 }
318 if flex {
319 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
320 }
321 Ok(out)
322 }
323}
#[cfg(test)]
impl MetadataResponseBroker {
    /// Test fixture: starts from [`Default`] and gives every field that `version` carries a
    /// non-default value (`1` for integers, `"x"` for strings).
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();

        if version >= 0 {
            sample.node_id = 1;
            sample.host = String::from("x");
            sample.port = 1;
        }
        if version >= 1 {
            sample.rack = Some(String::from("x"));
        }

        sample
    }
}
344#[derive(Debug, Clone, PartialEq, Eq)]
345pub struct MetadataResponseTopic {
346 pub error_code: i16,
347 pub name: Option<String>,
348 pub topic_id: crate::primitives::uuid::Uuid,
349 pub is_internal: bool,
350 pub partitions: Vec<MetadataResponsePartition>,
351 pub topic_authorized_operations: i32,
352 pub unknown_tagged_fields: UnknownTaggedFields,
353}
354impl Default for MetadataResponseTopic {
355 fn default() -> Self {
356 Self {
357 error_code: 0i16,
358 name: None,
359 topic_id: Default::default(),
360 is_internal: false,
361 partitions: Vec::new(),
362 topic_authorized_operations: -2_147_483_648i32,
363 unknown_tagged_fields: Default::default(),
364 }
365 }
366}
367impl Encode for MetadataResponseTopic {
368 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
369 let flex = version >= 9;
370 if version >= 0 {
371 put_i16(buf, self.error_code);
372 }
373 if version >= 0 {
374 if version >= 12 {
375 if flex {
376 put_compact_nullable_string(buf, self.name.as_deref());
377 } else {
378 put_nullable_string(buf, self.name.as_deref());
379 }
380 } else {
381 if flex {
382 put_compact_string(buf, (self.name).as_deref().unwrap_or(""));
383 } else {
384 put_string(buf, (self.name).as_deref().unwrap_or(""));
385 }
386 }
387 }
388 if version >= 10 {
389 crate::primitives::uuid::put_uuid(buf, self.topic_id);
390 }
391 if version >= 1 {
392 put_bool(buf, self.is_internal);
393 }
394 if version >= 0 {
395 {
396 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
397 for it in &self.partitions {
398 it.encode(buf, version)?;
399 }
400 }
401 }
402 if version >= 8 {
403 put_i32(buf, self.topic_authorized_operations);
404 }
405 if flex {
406 let tagged = WriteTaggedFields::new();
407 tagged.write(buf, &self.unknown_tagged_fields);
408 }
409 Ok(())
410 }
411 fn encoded_len(&self, version: i16) -> usize {
412 let flex = version >= 9;
413 let mut n: usize = 0;
414 if version >= 0 {
415 n += 2;
416 }
417 if version >= 0 {
418 n += if version >= 12 {
419 if flex {
420 compact_nullable_string_len(self.name.as_deref())
421 } else {
422 nullable_string_len(self.name.as_deref())
423 }
424 } else {
425 if flex {
426 compact_string_len((self.name).as_deref().unwrap_or(""))
427 } else {
428 string_len((self.name).as_deref().unwrap_or(""))
429 }
430 };
431 }
432 if version >= 10 {
433 n += 16;
434 }
435 if version >= 1 {
436 n += 1;
437 }
438 if version >= 0 {
439 n += {
440 let prefix =
441 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
442 let body: usize = (self.partitions)
443 .iter()
444 .map(|it| it.encoded_len(version))
445 .sum();
446 prefix + body
447 };
448 }
449 if version >= 8 {
450 n += 4;
451 }
452 if flex {
453 let known_pairs: Vec<(u32, usize)> = Vec::new();
454 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
455 }
456 n
457 }
458}
459impl Decode<'_> for MetadataResponseTopic {
460 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
461 let flex = version >= 9;
462 let mut out = Self::default();
463 if version >= 0 {
464 out.error_code = get_i16(buf)?;
465 }
466 if version >= 0 {
467 out.name = if version >= 12 {
468 if flex {
469 get_compact_nullable_string_owned(buf)?
470 } else {
471 get_nullable_string_owned(buf)?
472 }
473 } else {
474 Some(if flex {
475 get_compact_string_owned(buf)?
476 } else {
477 get_string_owned(buf)?
478 })
479 };
480 }
481 if version >= 10 {
482 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
483 }
484 if version >= 1 {
485 out.is_internal = get_bool(buf)?;
486 }
487 if version >= 0 {
488 out.partitions = {
489 let n = crate::primitives::array::get_array_len(buf, flex)?;
490 let mut v = Vec::with_capacity(n);
491 for _ in 0..n {
492 v.push(MetadataResponsePartition::decode(buf, version)?);
493 }
494 v
495 };
496 }
497 if version >= 8 {
498 out.topic_authorized_operations = get_i32(buf)?;
499 }
500 if flex {
501 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
502 }
503 Ok(out)
504 }
505}
#[cfg(test)]
impl MetadataResponseTopic {
    /// Test fixture: starts from [`Default`] and gives every field that `version` carries a
    /// non-default value, including one nested partition.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();

        // Gates are grouped by threshold, lowest first.
        if version >= 0 {
            sample.error_code = 1;
            sample.name = Some(String::from("x"));
            sample.partitions = vec![MetadataResponsePartition::populated(version)];
        }
        if version >= 1 {
            sample.is_internal = true;
        }
        if version >= 8 {
            sample.topic_authorized_operations = 1;
        }
        if version >= 10 {
            sample.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
        }

        sample
    }
}
532#[derive(Debug, Clone, PartialEq, Eq)]
533pub struct MetadataResponsePartition {
534 pub error_code: i16,
535 pub partition_index: i32,
536 pub leader_id: i32,
537 pub leader_epoch: i32,
538 pub replica_nodes: Vec<i32>,
539 pub isr_nodes: Vec<i32>,
540 pub offline_replicas: Vec<i32>,
541 pub unknown_tagged_fields: UnknownTaggedFields,
542}
543impl Default for MetadataResponsePartition {
544 fn default() -> Self {
545 Self {
546 error_code: 0i16,
547 partition_index: 0i32,
548 leader_id: 0i32,
549 leader_epoch: -1i32,
550 replica_nodes: Vec::new(),
551 isr_nodes: Vec::new(),
552 offline_replicas: Vec::new(),
553 unknown_tagged_fields: Default::default(),
554 }
555 }
556}
557impl Encode for MetadataResponsePartition {
558 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
559 let flex = version >= 9;
560 if version >= 0 {
561 put_i16(buf, self.error_code);
562 }
563 if version >= 0 {
564 put_i32(buf, self.partition_index);
565 }
566 if version >= 0 {
567 put_i32(buf, self.leader_id);
568 }
569 if version >= 7 {
570 put_i32(buf, self.leader_epoch);
571 }
572 if version >= 0 {
573 {
574 crate::primitives::array::put_array_len(buf, (self.replica_nodes).len(), flex);
575 for it in &self.replica_nodes {
576 put_i32(buf, *it);
577 }
578 }
579 }
580 if version >= 0 {
581 {
582 crate::primitives::array::put_array_len(buf, (self.isr_nodes).len(), flex);
583 for it in &self.isr_nodes {
584 put_i32(buf, *it);
585 }
586 }
587 }
588 if version >= 5 {
589 {
590 crate::primitives::array::put_array_len(buf, (self.offline_replicas).len(), flex);
591 for it in &self.offline_replicas {
592 put_i32(buf, *it);
593 }
594 }
595 }
596 if flex {
597 let tagged = WriteTaggedFields::new();
598 tagged.write(buf, &self.unknown_tagged_fields);
599 }
600 Ok(())
601 }
602 fn encoded_len(&self, version: i16) -> usize {
603 let flex = version >= 9;
604 let mut n: usize = 0;
605 if version >= 0 {
606 n += 2;
607 }
608 if version >= 0 {
609 n += 4;
610 }
611 if version >= 0 {
612 n += 4;
613 }
614 if version >= 7 {
615 n += 4;
616 }
617 if version >= 0 {
618 n += {
619 let prefix = crate::primitives::array::array_len_prefix_len(
620 (self.replica_nodes).len(),
621 flex,
622 );
623 let body: usize = (self.replica_nodes).iter().map(|_| 4).sum();
624 prefix + body
625 };
626 }
627 if version >= 0 {
628 n += {
629 let prefix =
630 crate::primitives::array::array_len_prefix_len((self.isr_nodes).len(), flex);
631 let body: usize = (self.isr_nodes).iter().map(|_| 4).sum();
632 prefix + body
633 };
634 }
635 if version >= 5 {
636 n += {
637 let prefix = crate::primitives::array::array_len_prefix_len(
638 (self.offline_replicas).len(),
639 flex,
640 );
641 let body: usize = (self.offline_replicas).iter().map(|_| 4).sum();
642 prefix + body
643 };
644 }
645 if flex {
646 let known_pairs: Vec<(u32, usize)> = Vec::new();
647 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
648 }
649 n
650 }
651}
652impl Decode<'_> for MetadataResponsePartition {
653 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
654 let flex = version >= 9;
655 let mut out = Self::default();
656 if version >= 0 {
657 out.error_code = get_i16(buf)?;
658 }
659 if version >= 0 {
660 out.partition_index = get_i32(buf)?;
661 }
662 if version >= 0 {
663 out.leader_id = get_i32(buf)?;
664 }
665 if version >= 7 {
666 out.leader_epoch = get_i32(buf)?;
667 }
668 if version >= 0 {
669 out.replica_nodes = {
670 let n = crate::primitives::array::get_array_len(buf, flex)?;
671 let mut v = Vec::with_capacity(n);
672 for _ in 0..n {
673 v.push(get_i32(buf)?);
674 }
675 v
676 };
677 }
678 if version >= 0 {
679 out.isr_nodes = {
680 let n = crate::primitives::array::get_array_len(buf, flex)?;
681 let mut v = Vec::with_capacity(n);
682 for _ in 0..n {
683 v.push(get_i32(buf)?);
684 }
685 v
686 };
687 }
688 if version >= 5 {
689 out.offline_replicas = {
690 let n = crate::primitives::array::get_array_len(buf, flex)?;
691 let mut v = Vec::with_capacity(n);
692 for _ in 0..n {
693 v.push(get_i32(buf)?);
694 }
695 v
696 };
697 }
698 if flex {
699 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
700 }
701 Ok(out)
702 }
703}
#[cfg(test)]
impl MetadataResponsePartition {
    /// Test fixture: starts from [`Default`] and gives every field that `version` carries a
    /// non-default value (`1` for scalars, a single-element `[1]` for each node list).
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();

        // Gates are grouped by threshold, lowest first.
        if version >= 0 {
            sample.error_code = 1;
            sample.partition_index = 1;
            sample.leader_id = 1;
            sample.replica_nodes = vec![1];
            sample.isr_nodes = vec![1];
        }
        if version >= 5 {
            sample.offline_replicas = vec![1];
        }
        if version >= 7 {
            sample.leader_epoch = 1;
        }

        sample
    }
}
733#[must_use]
736#[allow(unused_comparisons)]
737pub fn default_json(version: i16) -> ::serde_json::Value {
738 let mut obj = ::serde_json::Map::new();
739 if version >= 3 {
740 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
741 }
742 obj.insert("brokers".to_string(), ::serde_json::Value::Array(vec![]));
743 if version >= 2 {
744 obj.insert("clusterId".to_string(), ::serde_json::Value::Null);
745 }
746 if version >= 1 {
747 obj.insert("controllerId".to_string(), ::serde_json::json!(-1));
748 }
749 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
750 if (8..=10).contains(&version) {
751 obj.insert(
752 "clusterAuthorizedOperations".to_string(),
753 ::serde_json::json!(-2147483648),
754 );
755 }
756 if version >= 13 {
757 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
758 }
759 ::serde_json::Value::Object(obj)
760}