1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_bool, get_i16, get_i32, put_bool, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13
/// API key of this message type; reported in
/// `ProtocolError::UnsupportedVersion` when a version is rejected.
pub const API_KEY: i16 = 3;
/// Lowest message version accepted by `MetadataResponse::encode`/`decode`.
pub const MIN_VERSION: i16 = 0;
/// Highest message version accepted by `MetadataResponse::encode`/`decode`.
pub const MAX_VERSION: i16 = 13;
/// First version for which `is_flexible` returns `true`, i.e. from which the
/// codec switches to compact strings/arrays and reads/writes tagged fields.
pub const FLEXIBLE_MIN: i16 = 9;
18
19#[inline]
20fn is_flexible(version: i16) -> bool {
21 version >= FLEXIBLE_MIN
22}
23
/// Top-level body of a Metadata response (API key 3).
///
/// Which fields are present on the wire depends on the message version; the
/// per-field notes below mirror the version guards used by the `Encode` and
/// `Decode` implementations in this file. Fields absent from a given version
/// keep their `Default` value when decoding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetadataResponse {
    /// Present for versions >= 3; defaults to `0`.
    pub throttle_time_ms: i32,
    /// Broker entries; present in every supported version.
    pub brokers: Vec<MetadataResponseBroker>,
    /// Nullable string, present for versions >= 2; defaults to `None`.
    pub cluster_id: Option<String>,
    /// Present for versions >= 1; defaults to `-1`.
    pub controller_id: i32,
    /// Topic entries; present in every supported version.
    pub topics: Vec<MetadataResponseTopic>,
    /// Present only for versions 8 through 10 (inclusive); defaults to
    /// `i32::MIN` (`-2_147_483_648`).
    pub cluster_authorized_operations: i32,
    /// Present for versions >= 13; defaults to `0`.
    pub error_code: i16,
    /// Tagged fields returned by `read_tagged_fields` on decode and written
    /// back on encode; only touched for flexible versions (>= 9).
    pub unknown_tagged_fields: UnknownTaggedFields,
}
35impl Default for MetadataResponse {
36 fn default() -> Self {
37 Self {
38 throttle_time_ms: 0i32,
39 brokers: Vec::new(),
40 cluster_id: None,
41 controller_id: -1i32,
42 topics: Vec::new(),
43 cluster_authorized_operations: -2_147_483_648i32,
44 error_code: 0i16,
45 unknown_tagged_fields: Default::default(),
46 }
47 }
48}
49impl Encode for MetadataResponse {
50 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
51 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
52 return Err(ProtocolError::UnsupportedVersion {
53 api_key: API_KEY,
54 version,
55 });
56 }
57 let flex = is_flexible(version);
58 if version >= 3 {
59 put_i32(buf, self.throttle_time_ms);
60 }
61 if version >= 0 {
62 {
63 crate::primitives::array::put_array_len(buf, (self.brokers).len(), flex);
64 for it in &self.brokers {
65 it.encode(buf, version)?;
66 }
67 }
68 }
69 if version >= 2 {
70 if flex {
71 put_compact_nullable_string(buf, self.cluster_id.as_deref());
72 } else {
73 put_nullable_string(buf, self.cluster_id.as_deref());
74 }
75 }
76 if version >= 1 {
77 put_i32(buf, self.controller_id);
78 }
79 if version >= 0 {
80 {
81 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
82 for it in &self.topics {
83 it.encode(buf, version)?;
84 }
85 }
86 }
87 if (8..=10).contains(&version) {
88 put_i32(buf, self.cluster_authorized_operations);
89 }
90 if version >= 13 {
91 put_i16(buf, self.error_code);
92 }
93 if flex {
94 let tagged = WriteTaggedFields::new();
95 tagged.write(buf, &self.unknown_tagged_fields);
96 }
97 Ok(())
98 }
99 fn encoded_len(&self, version: i16) -> usize {
100 let flex = is_flexible(version);
101 let mut n: usize = 0;
102 if version >= 3 {
103 n += 4;
104 }
105 if version >= 0 {
106 n += {
107 let prefix =
108 crate::primitives::array::array_len_prefix_len((self.brokers).len(), flex);
109 let body: usize = (self.brokers)
110 .iter()
111 .map(|it| it.encoded_len(version))
112 .sum();
113 prefix + body
114 };
115 }
116 if version >= 2 {
117 n += if flex {
118 compact_nullable_string_len(self.cluster_id.as_deref())
119 } else {
120 nullable_string_len(self.cluster_id.as_deref())
121 };
122 }
123 if version >= 1 {
124 n += 4;
125 }
126 if version >= 0 {
127 n += {
128 let prefix =
129 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
130 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
131 prefix + body
132 };
133 }
134 if (8..=10).contains(&version) {
135 n += 4;
136 }
137 if version >= 13 {
138 n += 2;
139 }
140 if flex {
141 let known_pairs: Vec<(u32, usize)> = Vec::new();
142 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
143 }
144 n
145 }
146}
147impl Decode<'_> for MetadataResponse {
148 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
149 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
150 return Err(ProtocolError::UnsupportedVersion {
151 api_key: API_KEY,
152 version,
153 });
154 }
155 let flex = is_flexible(version);
156 let mut out = Self::default();
157 if version >= 3 {
158 out.throttle_time_ms = get_i32(buf)?;
159 }
160 if version >= 0 {
161 out.brokers = {
162 let n = crate::primitives::array::get_array_len(buf, flex)?;
163 let mut v = Vec::with_capacity(n);
164 for _ in 0..n {
165 v.push(MetadataResponseBroker::decode(buf, version)?);
166 }
167 v
168 };
169 }
170 if version >= 2 {
171 out.cluster_id = if flex {
172 get_compact_nullable_string_owned(buf)?
173 } else {
174 get_nullable_string_owned(buf)?
175 };
176 }
177 if version >= 1 {
178 out.controller_id = get_i32(buf)?;
179 }
180 if version >= 0 {
181 out.topics = {
182 let n = crate::primitives::array::get_array_len(buf, flex)?;
183 let mut v = Vec::with_capacity(n);
184 for _ in 0..n {
185 v.push(MetadataResponseTopic::decode(buf, version)?);
186 }
187 v
188 };
189 }
190 if (8..=10).contains(&version) {
191 out.cluster_authorized_operations = get_i32(buf)?;
192 }
193 if version >= 13 {
194 out.error_code = get_i16(buf)?;
195 }
196 if flex {
197 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
198 }
199 Ok(out)
200 }
201}
#[cfg(test)]
impl MetadataResponse {
    /// Test fixture: a response in which every field that exists in
    /// `version` holds a non-default value; all other fields stay at their
    /// defaults.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            sample.brokers = vec![MetadataResponseBroker::populated(version)];
            sample.topics = vec![MetadataResponseTopic::populated(version)];
        }
        if version >= 1 {
            sample.controller_id = 1;
        }
        if version >= 2 {
            sample.cluster_id = Some(String::from("x"));
        }
        if version >= 3 {
            sample.throttle_time_ms = 1;
        }
        if matches!(version, 8..=10) {
            sample.cluster_authorized_operations = 1;
        }
        if version >= 13 {
            sample.error_code = 1;
        }
        sample
    }
}
/// One element of `MetadataResponse::brokers`.
///
/// `Default` is derived, so every field defaults to zero / empty / `None`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct MetadataResponseBroker {
    /// Present for versions >= 0.
    pub node_id: i32,
    /// Non-nullable string, present for versions >= 0.
    pub host: String,
    /// Present for versions >= 0.
    pub port: i32,
    /// Nullable string, present for versions >= 1.
    pub rack: Option<String>,
    /// Tagged fields returned by `read_tagged_fields` on decode and written
    /// back on encode; only touched for versions >= 9.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
239impl Encode for MetadataResponseBroker {
240 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
241 let flex = version >= 9;
242 if version >= 0 {
243 put_i32(buf, self.node_id);
244 }
245 if version >= 0 {
246 if flex {
247 put_compact_string(buf, &self.host);
248 } else {
249 put_string(buf, &self.host);
250 }
251 }
252 if version >= 0 {
253 put_i32(buf, self.port);
254 }
255 if version >= 1 {
256 if flex {
257 put_compact_nullable_string(buf, self.rack.as_deref());
258 } else {
259 put_nullable_string(buf, self.rack.as_deref());
260 }
261 }
262 if flex {
263 let tagged = WriteTaggedFields::new();
264 tagged.write(buf, &self.unknown_tagged_fields);
265 }
266 Ok(())
267 }
268 fn encoded_len(&self, version: i16) -> usize {
269 let flex = version >= 9;
270 let mut n: usize = 0;
271 if version >= 0 {
272 n += 4;
273 }
274 if version >= 0 {
275 n += if flex {
276 compact_string_len(&self.host)
277 } else {
278 string_len(&self.host)
279 };
280 }
281 if version >= 0 {
282 n += 4;
283 }
284 if version >= 1 {
285 n += if flex {
286 compact_nullable_string_len(self.rack.as_deref())
287 } else {
288 nullable_string_len(self.rack.as_deref())
289 };
290 }
291 if flex {
292 let known_pairs: Vec<(u32, usize)> = Vec::new();
293 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
294 }
295 n
296 }
297}
298impl Decode<'_> for MetadataResponseBroker {
299 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
300 let flex = version >= 9;
301 let mut out = Self::default();
302 if version >= 0 {
303 out.node_id = get_i32(buf)?;
304 }
305 if version >= 0 {
306 out.host = if flex {
307 get_compact_string_owned(buf)?
308 } else {
309 get_string_owned(buf)?
310 };
311 }
312 if version >= 0 {
313 out.port = get_i32(buf)?;
314 }
315 if version >= 1 {
316 out.rack = if flex {
317 get_compact_nullable_string_owned(buf)?
318 } else {
319 get_nullable_string_owned(buf)?
320 };
321 }
322 if flex {
323 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
324 }
325 Ok(out)
326 }
327}
#[cfg(test)]
impl MetadataResponseBroker {
    /// Test fixture: a broker in which every field that exists in `version`
    /// holds a non-default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            sample.node_id = 1;
            sample.host = String::from("x");
            sample.port = 1;
        }
        if version >= 1 {
            sample.rack = Some(String::from("x"));
        }
        sample
    }
}
/// One element of `MetadataResponse::topics`.
///
/// Field presence depends on the message version; the notes below mirror the
/// version guards used by the `Encode`/`Decode` implementations in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetadataResponseTopic {
    /// Present for versions >= 0; defaults to `0`.
    pub error_code: i16,
    /// Topic name. Encoded as a nullable string from version 12 on; for
    /// earlier versions `None` is written as the empty string and decoding
    /// always yields `Some(..)`.
    pub name: Option<String>,
    /// Present for versions >= 10.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Present for versions >= 1; defaults to `false`.
    pub is_internal: bool,
    /// Partition entries; present for versions >= 0.
    pub partitions: Vec<MetadataResponsePartition>,
    /// Present for versions >= 8; defaults to `i32::MIN`
    /// (`-2_147_483_648`).
    pub topic_authorized_operations: i32,
    /// Tagged fields returned by `read_tagged_fields` on decode and written
    /// back on encode; only touched for versions >= 9.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
358impl Default for MetadataResponseTopic {
359 fn default() -> Self {
360 Self {
361 error_code: 0i16,
362 name: None,
363 topic_id: Default::default(),
364 is_internal: false,
365 partitions: Vec::new(),
366 topic_authorized_operations: -2_147_483_648i32,
367 unknown_tagged_fields: Default::default(),
368 }
369 }
370}
371impl Encode for MetadataResponseTopic {
372 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
373 let flex = version >= 9;
374 if version >= 0 {
375 put_i16(buf, self.error_code);
376 }
377 if version >= 0 {
378 if version >= 12 {
379 if flex {
380 put_compact_nullable_string(buf, self.name.as_deref());
381 } else {
382 put_nullable_string(buf, self.name.as_deref());
383 }
384 } else {
385 if flex {
386 put_compact_string(buf, (self.name).as_deref().unwrap_or(""));
387 } else {
388 put_string(buf, (self.name).as_deref().unwrap_or(""));
389 }
390 }
391 }
392 if version >= 10 {
393 crate::primitives::uuid::put_uuid(buf, self.topic_id);
394 }
395 if version >= 1 {
396 put_bool(buf, self.is_internal);
397 }
398 if version >= 0 {
399 {
400 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
401 for it in &self.partitions {
402 it.encode(buf, version)?;
403 }
404 }
405 }
406 if version >= 8 {
407 put_i32(buf, self.topic_authorized_operations);
408 }
409 if flex {
410 let tagged = WriteTaggedFields::new();
411 tagged.write(buf, &self.unknown_tagged_fields);
412 }
413 Ok(())
414 }
415 fn encoded_len(&self, version: i16) -> usize {
416 let flex = version >= 9;
417 let mut n: usize = 0;
418 if version >= 0 {
419 n += 2;
420 }
421 if version >= 0 {
422 n += if version >= 12 {
423 if flex {
424 compact_nullable_string_len(self.name.as_deref())
425 } else {
426 nullable_string_len(self.name.as_deref())
427 }
428 } else {
429 if flex {
430 compact_string_len((self.name).as_deref().unwrap_or(""))
431 } else {
432 string_len((self.name).as_deref().unwrap_or(""))
433 }
434 };
435 }
436 if version >= 10 {
437 n += 16;
438 }
439 if version >= 1 {
440 n += 1;
441 }
442 if version >= 0 {
443 n += {
444 let prefix =
445 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
446 let body: usize = (self.partitions)
447 .iter()
448 .map(|it| it.encoded_len(version))
449 .sum();
450 prefix + body
451 };
452 }
453 if version >= 8 {
454 n += 4;
455 }
456 if flex {
457 let known_pairs: Vec<(u32, usize)> = Vec::new();
458 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
459 }
460 n
461 }
462}
463impl Decode<'_> for MetadataResponseTopic {
464 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
465 let flex = version >= 9;
466 let mut out = Self::default();
467 if version >= 0 {
468 out.error_code = get_i16(buf)?;
469 }
470 if version >= 0 {
471 out.name = if version >= 12 {
472 if flex {
473 get_compact_nullable_string_owned(buf)?
474 } else {
475 get_nullable_string_owned(buf)?
476 }
477 } else {
478 Some(if flex {
479 get_compact_string_owned(buf)?
480 } else {
481 get_string_owned(buf)?
482 })
483 };
484 }
485 if version >= 10 {
486 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
487 }
488 if version >= 1 {
489 out.is_internal = get_bool(buf)?;
490 }
491 if version >= 0 {
492 out.partitions = {
493 let n = crate::primitives::array::get_array_len(buf, flex)?;
494 let mut v = Vec::with_capacity(n);
495 for _ in 0..n {
496 v.push(MetadataResponsePartition::decode(buf, version)?);
497 }
498 v
499 };
500 }
501 if version >= 8 {
502 out.topic_authorized_operations = get_i32(buf)?;
503 }
504 if flex {
505 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
506 }
507 Ok(out)
508 }
509}
#[cfg(test)]
impl MetadataResponseTopic {
    /// Test fixture: a topic in which every field that exists in `version`
    /// holds a non-default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            sample.error_code = 1;
            sample.name = Some(String::from("x"));
            sample.partitions = vec![MetadataResponsePartition::populated(version)];
        }
        if version >= 1 {
            sample.is_internal = true;
        }
        if version >= 8 {
            sample.topic_authorized_operations = 1;
        }
        if version >= 10 {
            sample.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
        }
        sample
    }
}
/// One element of `MetadataResponseTopic::partitions`.
///
/// Field presence depends on the message version; the notes below mirror the
/// version guards used by the `Encode`/`Decode` implementations in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetadataResponsePartition {
    /// Present for versions >= 0; defaults to `0`.
    pub error_code: i16,
    /// Present for versions >= 0; defaults to `0`.
    pub partition_index: i32,
    /// Present for versions >= 0; defaults to `0`.
    pub leader_id: i32,
    /// Present for versions >= 7; defaults to `-1`.
    pub leader_epoch: i32,
    /// Array of `i32`, present for versions >= 0.
    pub replica_nodes: Vec<i32>,
    /// Array of `i32`, present for versions >= 0.
    pub isr_nodes: Vec<i32>,
    /// Array of `i32`, present for versions >= 5.
    pub offline_replicas: Vec<i32>,
    /// Tagged fields returned by `read_tagged_fields` on decode and written
    /// back on encode; only touched for versions >= 9.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
547impl Default for MetadataResponsePartition {
548 fn default() -> Self {
549 Self {
550 error_code: 0i16,
551 partition_index: 0i32,
552 leader_id: 0i32,
553 leader_epoch: -1i32,
554 replica_nodes: Vec::new(),
555 isr_nodes: Vec::new(),
556 offline_replicas: Vec::new(),
557 unknown_tagged_fields: Default::default(),
558 }
559 }
560}
561impl Encode for MetadataResponsePartition {
562 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
563 let flex = version >= 9;
564 if version >= 0 {
565 put_i16(buf, self.error_code);
566 }
567 if version >= 0 {
568 put_i32(buf, self.partition_index);
569 }
570 if version >= 0 {
571 put_i32(buf, self.leader_id);
572 }
573 if version >= 7 {
574 put_i32(buf, self.leader_epoch);
575 }
576 if version >= 0 {
577 {
578 crate::primitives::array::put_array_len(buf, (self.replica_nodes).len(), flex);
579 for it in &self.replica_nodes {
580 put_i32(buf, *it);
581 }
582 }
583 }
584 if version >= 0 {
585 {
586 crate::primitives::array::put_array_len(buf, (self.isr_nodes).len(), flex);
587 for it in &self.isr_nodes {
588 put_i32(buf, *it);
589 }
590 }
591 }
592 if version >= 5 {
593 {
594 crate::primitives::array::put_array_len(buf, (self.offline_replicas).len(), flex);
595 for it in &self.offline_replicas {
596 put_i32(buf, *it);
597 }
598 }
599 }
600 if flex {
601 let tagged = WriteTaggedFields::new();
602 tagged.write(buf, &self.unknown_tagged_fields);
603 }
604 Ok(())
605 }
606 fn encoded_len(&self, version: i16) -> usize {
607 let flex = version >= 9;
608 let mut n: usize = 0;
609 if version >= 0 {
610 n += 2;
611 }
612 if version >= 0 {
613 n += 4;
614 }
615 if version >= 0 {
616 n += 4;
617 }
618 if version >= 7 {
619 n += 4;
620 }
621 if version >= 0 {
622 n += {
623 let prefix = crate::primitives::array::array_len_prefix_len(
624 (self.replica_nodes).len(),
625 flex,
626 );
627 let body: usize = (self.replica_nodes).iter().map(|_| 4).sum();
628 prefix + body
629 };
630 }
631 if version >= 0 {
632 n += {
633 let prefix =
634 crate::primitives::array::array_len_prefix_len((self.isr_nodes).len(), flex);
635 let body: usize = (self.isr_nodes).iter().map(|_| 4).sum();
636 prefix + body
637 };
638 }
639 if version >= 5 {
640 n += {
641 let prefix = crate::primitives::array::array_len_prefix_len(
642 (self.offline_replicas).len(),
643 flex,
644 );
645 let body: usize = (self.offline_replicas).iter().map(|_| 4).sum();
646 prefix + body
647 };
648 }
649 if flex {
650 let known_pairs: Vec<(u32, usize)> = Vec::new();
651 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
652 }
653 n
654 }
655}
656impl Decode<'_> for MetadataResponsePartition {
657 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
658 let flex = version >= 9;
659 let mut out = Self::default();
660 if version >= 0 {
661 out.error_code = get_i16(buf)?;
662 }
663 if version >= 0 {
664 out.partition_index = get_i32(buf)?;
665 }
666 if version >= 0 {
667 out.leader_id = get_i32(buf)?;
668 }
669 if version >= 7 {
670 out.leader_epoch = get_i32(buf)?;
671 }
672 if version >= 0 {
673 out.replica_nodes = {
674 let n = crate::primitives::array::get_array_len(buf, flex)?;
675 let mut v = Vec::with_capacity(n);
676 for _ in 0..n {
677 v.push(get_i32(buf)?);
678 }
679 v
680 };
681 }
682 if version >= 0 {
683 out.isr_nodes = {
684 let n = crate::primitives::array::get_array_len(buf, flex)?;
685 let mut v = Vec::with_capacity(n);
686 for _ in 0..n {
687 v.push(get_i32(buf)?);
688 }
689 v
690 };
691 }
692 if version >= 5 {
693 out.offline_replicas = {
694 let n = crate::primitives::array::get_array_len(buf, flex)?;
695 let mut v = Vec::with_capacity(n);
696 for _ in 0..n {
697 v.push(get_i32(buf)?);
698 }
699 v
700 };
701 }
702 if flex {
703 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
704 }
705 Ok(out)
706 }
707}
#[cfg(test)]
impl MetadataResponsePartition {
    /// Test fixture: a partition in which every field that exists in
    /// `version` holds a non-default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            sample.error_code = 1;
            sample.partition_index = 1;
            sample.leader_id = 1;
            sample.replica_nodes = vec![1];
            sample.isr_nodes = vec![1];
        }
        if version >= 5 {
            sample.offline_replicas = vec![1];
        }
        if version >= 7 {
            sample.leader_epoch = 1;
        }
        sample
    }
}
737
738#[must_use]
741#[allow(unused_comparisons)]
742pub fn default_json(version: i16) -> ::serde_json::Value {
743 let mut obj = ::serde_json::Map::new();
744 if version >= 3 {
745 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
746 }
747 obj.insert("brokers".to_string(), ::serde_json::Value::Array(vec![]));
748 if version >= 2 {
749 obj.insert("clusterId".to_string(), ::serde_json::Value::Null);
750 }
751 if version >= 1 {
752 obj.insert("controllerId".to_string(), ::serde_json::json!(-1));
753 }
754 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
755 if (8..=10).contains(&version) {
756 obj.insert(
757 "clusterAuthorizedOperations".to_string(),
758 ::serde_json::json!(-2147483648),
759 );
760 }
761 if version >= 13 {
762 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
763 }
764 ::serde_json::Value::Object(obj)
765}