1pub mod error;
21pub mod wv;
22
23use convert_case::{Case, Casing};
24pub use error::Error;
25use lazy_static::lazy_static;
26use proc_macro2::{Ident, Span, TokenStream};
27use quote::ToTokens;
28use regex::Regex;
29use serde_json::Value;
30use std::{fmt, str::FromStr};
31use syn::{Expr, Type};
32use tracing::debug;
33use wv::{As, AsOption, Wv};
34
/// Expands to a `String` of the form `<crate name>::<ident>`, where the crate
/// name is the compile-time value of `CARGO_CRATE_NAME`.
macro_rules! prefix_crate {
    ($item:ident) => {
        [env!("CARGO_CRATE_NAME"), stringify!($item)].join("::")
    };
}
40
/// Expands to a `String` that prefixes the `Debug` rendering of a value with
/// the compile-time crate name (`CARGO_CRATE_NAME`).
///
/// * one argument: `<crate>::<value:?>`
/// * two arguments: `<crate>::<scope>::<value:?>`, where `scope` is rendered
///   with `Display`
macro_rules! with_crate {
    ($value:expr_2021) => {
        format!(
            "{krate}::{value:?}",
            krate = env!("CARGO_CRATE_NAME"),
            value = $value
        )
    };

    ($scope:expr_2021, $value:expr_2021) => {
        format!(
            "{krate}::{scope}::{value:?}",
            krate = env!("CARGO_CRATE_NAME"),
            scope = $scope,
            value = $value
        )
    };
}
50
/// `Result` alias for this module whose error type defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
52
/// Maps a schema type name to the Rust type name used for it.
///
/// A sequence (`[]T`) maps to the mapping of its element type `T`; the
/// sequence wrapper itself is not represented in the result. Names without a
/// dedicated mapping are returned unchanged.
fn type_mapping(kind: &str) -> String {
    if let Some(element) = kind.strip_prefix("[]") {
        return type_mapping(element);
    }

    let mapped = match kind {
        "bytes" => "bytes::Bytes",
        "float64" => "f64",
        "int16" => "i16",
        "int32" => "i32",
        "int64" => "i64",
        "int8" => "i8",
        "string" => "String",
        "uint16" => "u16",
        "uuid" => "[u8; 16]",
        "records" => "crate::RecordBatch",
        unmapped => unmapped,
    };

    mapped.to_owned()
}
71
/// A field type name stored verbatim, e.g. `int32`, `[]string` or
/// `[]CreatableTopic`. A leading `[]` marks a sequence.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Kind(String);
75
76impl ToTokens for Kind {
77 #![allow(clippy::unwrap_used)]
78 fn to_tokens(&self, tokens: &mut TokenStream) {
79 let expr = with_crate!(self);
80 syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
81 }
82}
83
84impl From<Type> for Kind {
85 fn from(value: Type) -> Self {
86 Self(value.to_token_stream().to_string())
87 }
88}
89
/// Type names treated as primitive (non-structure) kinds.
///
/// The list is a compile-time constant, so a plain `static` slice is used
/// rather than a lazily initialised value.
static PRIMITIVE: &[&str] = &[
    "bool", "bytes", "float64", "int16", "int32", "int64", "int8", "records", "string",
    "uint16", "uuid",
];
96
97impl Kind {
98 #[must_use]
99 pub fn new(kind: &str) -> Self {
100 Self(kind.to_owned())
101 }
102
103 #[must_use]
104 pub fn name(&self) -> &str {
105 &self.0[..]
106 }
107
108 #[must_use]
109 #[allow(clippy::missing_panics_doc)]
110 pub fn type_name(&self) -> Type {
112 syn::parse_str::<Type>(&type_mapping(&self.0))
113 .unwrap_or_else(|_| panic!("not a type: {self:?}"))
114 }
115
116 #[must_use]
117 pub fn is_sequence(&self) -> bool {
119 self.0.starts_with("[]")
120 }
121
122 #[must_use]
123 pub fn is_primitive(&self) -> bool {
125 PRIMITIVE.contains(&self.0.as_str())
126 }
127
128 #[must_use]
129 pub fn is_float(&self) -> bool {
131 self.0.eq("float64")
132 }
133
134 #[must_use]
135 pub fn is_sequence_of_primitive(&self) -> bool {
137 self.is_sequence() && PRIMITIVE.contains(&&self.0[2..])
138 }
139
140 #[must_use]
141 pub fn kind_of_sequence(&self) -> Option<Self> {
143 if self.is_sequence() {
144 Some(Self::new(&self.0[2..]))
145 } else {
146 None
147 }
148 }
149}
150
151impl FromStr for Kind {
152 type Err = Error;
153
154 fn from_str(s: &str) -> Result<Self, Self::Err> {
155 Ok(Self(s.to_owned()))
156 }
157}
158
159impl TryFrom<&Value> for Kind {
160 type Error = Error;
161
162 fn try_from(value: &Value) -> Result<Self, Self::Error> {
163 as_str(value, "type").and_then(Self::from_str)
164 }
165}
166
/// A listener name accepted in a message's `listeners` array.
///
/// Parsed from `zkBroker`, `broker` and `controller`; the default is
/// [`Listener::Broker`].
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum Listener {
    ZkBroker,
    #[default]
    Broker,
    Controller,
}
175
176impl TryFrom<&Value> for Listener {
177 type Error = Error;
178
179 fn try_from(value: &Value) -> Result<Self, Self::Error> {
180 value
181 .as_str()
182 .ok_or(Error::Message(String::from(
183 "expecting string for listener",
184 )))
185 .and_then(Self::from_str)
186 }
187}
188
189impl FromStr for Listener {
190 type Err = Error;
191
192 fn from_str(s: &str) -> Result<Self, Self::Err> {
193 match s {
194 "zkBroker" => Ok(Self::ZkBroker),
195 "broker" => Ok(Self::Broker),
196 "controller" => Ok(Self::Controller),
197 s => Err(Error::Message(String::from(s))),
198 }
199 }
200}
201
/// Whether a message is a request or a response; parsed from `request` and
/// `response`. The default is [`MessageKind::Request`].
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum MessageKind {
    #[default]
    Request,
    Response,
}
209
210impl ToTokens for MessageKind {
211 #[allow(clippy::unwrap_used)]
212 fn to_tokens(&self, tokens: &mut TokenStream) {
213 let expr = with_crate!("MessageKind", self);
214 syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
215 }
216}
217
218impl TryFrom<&Value> for MessageKind {
219 type Error = Error;
220
221 fn try_from(value: &Value) -> Result<Self, Self::Error> {
222 as_str(value, "type").and_then(Self::from_str)
223 }
224}
225
226impl FromStr for MessageKind {
227 type Err = Error;
228
229 fn from_str(s: &str) -> Result<Self, Self::Err> {
230 match s {
231 "request" => Ok(Self::Request),
232 "response" => Ok(Self::Response),
233 s => Err(Error::Message(String::from(s))),
234 }
235 }
236}
237
/// A range of versions; both bounds are inclusive (see `within`).
///
/// `Debug` is implemented by hand so that the rendered struct name carries
/// the crate prefix.
#[derive(Clone, Copy, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct VersionRange {
    /// First version in the range.
    pub start: i16,
    /// Last version in the range; `i16::MAX` is used for open-ended ranges.
    pub end: i16,
}
244
245impl fmt::Debug for VersionRange {
246 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247 f.debug_struct(&prefix_crate!(VersionRange))
248 .field("start", &self.start)
249 .field("end", &self.end)
250 .finish()
251 }
252}
253
254impl VersionRange {
255 #[must_use]
256 pub fn within(&self, version: i16) -> bool {
257 version >= self.start && version <= self.end
258 }
259
260 #[must_use]
261 pub fn is_mandatory(&self, parent: Option<VersionRange>) -> bool {
262 parent.map_or(
263 self.start == 0 && self.end == i16::MAX,
264 |VersionRange { start, end }| {
265 if start > self.start {
266 self.end == end
267 } else {
268 self.start == start && self.end == end
269 }
270 },
271 )
272 }
273}
274
275impl ToTokens for VersionRange {
276 fn to_tokens(&self, tokens: &mut TokenStream) {
277 let expr = format!("{self:?}");
278 syn::parse_str::<Expr>(&expr)
279 .unwrap_or_else(|_| panic!("an expression: {self:?}"))
280 .to_tokens(tokens);
281 }
282}
283
284impl FromStr for VersionRange {
285 type Err = Error;
286
287 fn from_str(s: &str) -> Result<Self, Self::Err> {
288 #![allow(clippy::expect_used)]
289 lazy_static! {
290 static ref RX: Regex =
291 Regex::new(r"^(?<start>\d+)(-(?<end>\d+)|(?<infinite>\+))?$").expect("regex");
292 }
293
294 if s == "none" {
295 Ok(VersionRange { start: 0, end: 0 })
296 } else {
297 RX.captures(s)
298 .ok_or(Error::Message(format!("invalid version range format: {s}")))
299 .and_then(|captures| {
300 let parse = |name, default: i16| {
301 captures
302 .name(name)
303 .map_or(Ok(&default.to_string()[..]), |s| Ok(s.as_str()))
304 .and_then(str::parse)
305 };
306
307 let start = parse("start", 0)?;
308
309 let end = if let Some(end) = captures.name("end") {
310 str::parse(end.as_str())?
311 } else if captures.name("infinite").is_some() {
312 i16::MAX
313 } else {
314 start
315 };
316
317 Ok(VersionRange { start, end })
318 })
319 }
320 }
321}
322
/// The version ranges declared by a message.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Version {
    /// Read from the `validVersions` key.
    pub valid: VersionRange,
    /// Read from the optional `deprecatedVersions` key.
    pub deprecated: Option<VersionRange>,
    /// Read from the `flexibleVersions` key.
    pub flexible: VersionRange,
}
333
334impl ToTokens for Version {
335 #[allow(clippy::unwrap_used)]
336 fn to_tokens(&self, tokens: &mut TokenStream) {
337 let expr = with_crate!(self);
338 syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
339 }
340}
341
impl Version {
    /// The range of valid versions.
    #[must_use]
    pub fn valid(&self) -> VersionRange {
        self.valid
    }

    /// The range of deprecated versions, when one was declared.
    #[must_use]
    pub fn deprecated(&self) -> Option<VersionRange> {
        self.deprecated
    }

    /// The range of flexible versions.
    #[must_use]
    pub fn flexible(&self) -> VersionRange {
        self.flexible
    }
}
358
359impl<'a> TryFrom<&Wv<'a>> for Version {
360 type Error = Error;
361
362 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
363 debug!("value: {:?}", value);
364
365 Ok(Self {
366 valid: value.as_a("validVersions")?,
367 deprecated: value.as_option("deprecatedVersions")?,
368 flexible: value.as_a("flexibleVersions")?,
369 })
370 }
371}
372
/// A field of a message, common structure or nested structure.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Field {
    /// Read from the `name` key.
    name: String,
    /// Read from the `type` key.
    kind: Kind,
    /// Read from the optional `about` key.
    about: Option<String>,
    /// Read from the `versions` key.
    versions: VersionRange,
    /// Read from the optional `mapKey` key.
    map_key: Option<bool>,
    /// Read from the optional `nullableVersions` key.
    nullable: Option<VersionRange>,
    /// Read from the optional `tag` key.
    tag: Option<u32>,
    /// Read from the optional `taggedVersions` key.
    tagged: Option<VersionRange>,
    /// Read from the optional `entityType` key.
    entity_type: Option<String>,
    /// Read from the optional `default` key.
    default: Option<String>,
    /// Nested fields, read from the optional `fields` key.
    fields: Option<Vec<Field>>,
}
399
400impl Field {
401 #[must_use]
402 pub fn ident(&self) -> Ident {
403 match self.name.to_case(Case::Snake) {
404 reserved if is_reserved_keyword(&reserved) => {
405 Ident::new_raw(&reserved, Span::call_site())
406 }
407
408 otherwise => Ident::new(&otherwise, Span::call_site()),
409 }
410 }
411
412 #[must_use]
413 pub fn name(&self) -> &str {
414 &self.name
415 }
416
417 #[must_use]
418 pub fn kind(&self) -> &Kind {
419 &self.kind
420 }
421
422 #[must_use]
423 pub fn fields(&self) -> Option<&[Field]> {
424 self.fields.as_deref()
425 }
426
427 #[must_use]
428 pub fn versions(&self) -> VersionRange {
429 self.versions
430 }
431
432 #[must_use]
433 pub fn nullable(&self) -> Option<VersionRange> {
434 self.nullable
435 }
436
437 #[must_use]
438 pub fn tagged(&self) -> Option<VersionRange> {
439 self.tagged
440 }
441
442 #[must_use]
443 pub fn tag(&self) -> Option<u32> {
444 self.tag
445 }
446
447 #[must_use]
448 pub fn about(&self) -> Option<&str> {
449 self.about.as_deref()
450 }
451
452 #[must_use]
453 pub fn has_tags(&self) -> bool {
454 self.tagged.is_some()
455 }
456
457 #[must_use]
458 pub fn has_records(&self) -> bool {
459 self.kind.0 == "records"
460 || self
461 .fields()
462 .is_some_and(|fields| fields.iter().any(Field::has_records))
463 }
464
465 #[must_use]
466 pub fn has_float(&self) -> bool {
467 self.kind().is_float()
468 || self
469 .fields()
470 .is_some_and(|fields| fields.iter().any(Field::has_float))
471 }
472}
473
474impl<'a> TryFrom<&Wv<'a>> for Field {
475 type Error = Error;
476
477 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
478 let name = value.as_a("name")?;
479 let kind = value.as_a("type")?;
480 let about = value.as_option("about")?;
481 let versions = value.as_a("versions")?;
482 let map_key = value.as_option("mapKey")?;
483 let nullable = value.as_option("nullableVersions")?;
484 let tag = value.as_option("tag")?;
485 let tagged = value.as_option("taggedVersions")?;
486 let entity_type = value.as_option("entityType")?;
487 let default = value.as_option("default")?;
488
489 let fields = value
490 .as_option("fields")?
491 .map_or(Ok(None), |values: &[Value]| {
492 values
493 .iter()
494 .try_fold(Vec::new(), |mut acc, field| {
495 Field::try_from(&Wv::from(field)).map(|f| {
496 acc.push(f);
497 acc
498 })
499 })
500 .map(Some)
501 })?;
502
503 Ok(Field {
504 name,
505 kind,
506 about,
507 versions,
508 map_key,
509 nullable,
510 tag,
511 tagged,
512 entity_type,
513 default,
514 fields,
515 })
516 }
517}
518
/// Whether `s` is a Rust keyword that cannot be used as a plain identifier.
///
/// The comparison is case-sensitive. The list covers strict keywords,
/// keywords added by later editions and keywords reserved for future use.
fn is_reserved_keyword(s: &str) -> bool {
    // A compile-time constant: no lazy initialisation or heap allocation.
    const RESERVED: &[&str] = &[
        // strict keywords
        "as",
        "break",
        "const",
        "continue",
        "crate",
        "else",
        "enum",
        "extern",
        "false",
        "fn",
        "for",
        "if",
        "impl",
        "in",
        "let",
        "loop",
        "match",
        "mod",
        "move",
        "mut",
        "pub",
        "ref",
        "return",
        "self",
        "Self",
        "static",
        "struct",
        "super",
        "trait",
        "true",
        "type",
        "unsafe",
        "use",
        "where",
        "while",
        // added in the 2018 edition
        "async",
        "await",
        "dyn",
        // reserved for future use
        "abstract",
        "become",
        "box",
        "do",
        "final",
        "macro",
        "override",
        "priv",
        "typeof",
        "unsized",
        "virtual",
        "yield",
        "try",
        // reserved from the 2024 edition
        "gen",
    ];

    RESERVED.contains(&s)
}
583
/// A message definition.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Message {
    /// Read from the `apiKey` key.
    api_key: i16,
    /// Read from the `type` key.
    kind: MessageKind,
    /// Read from the optional `listeners` key.
    listeners: Option<Vec<Listener>>,
    /// Read from the `name` key.
    name: String,
    /// Valid, deprecated and flexible version ranges.
    versions: Version,
    /// Read from the `fields` key.
    fields: Vec<Field>,
    /// Read from the optional `commonStructs` key.
    common_structs: Option<Vec<CommonStruct>>,
}
594
595impl Message {
596 #[must_use]
597 pub fn api_key(&self) -> i16 {
598 self.api_key
599 }
600
601 #[must_use]
602 pub fn kind(&self) -> MessageKind {
603 self.kind
604 }
605
606 #[must_use]
607 pub fn name(&self) -> &str {
608 &self.name
609 }
610
611 #[must_use]
612 #[allow(clippy::missing_panics_doc, clippy::unwrap_used)]
613 pub fn type_name(&self) -> Type {
614 syn::parse_str::<Type>(&self.name).unwrap()
615 }
616
617 #[must_use]
618 pub fn version(&self) -> Version {
619 self.versions
620 }
621
622 #[must_use]
623 pub fn listeners(&self) -> Option<&[Listener]> {
624 self.listeners.as_deref()
625 }
626
627 #[must_use]
628 pub fn fields(&self) -> &[Field] {
629 &self.fields
630 }
631
632 #[must_use]
633 #[allow(clippy::missing_panics_doc, clippy::unwrap_used)]
634 pub fn wrapper_new_type(&self, field: &Field) -> Type {
635 syn::parse_str::<Type>(&format!("{}{}", self.name, field.name).to_case(Case::Pascal))
636 .unwrap()
637 }
638
639 #[must_use]
640 pub fn common_structs(&self) -> Option<&[CommonStruct]> {
641 self.common_structs.as_deref()
642 }
643
644 #[must_use]
645 pub fn has_records(&self) -> bool {
646 self.fields().iter().any(Field::has_records)
647 }
648
649 #[must_use]
650 pub fn has_tags(&self) -> bool {
651 self.fields().iter().any(Field::has_tags)
652 }
653
654 #[must_use]
655 pub fn has_float(&self) -> bool {
656 self.fields.iter().any(|field| field.kind().is_float())
657 || self
658 .common_structs()
659 .is_some_and(|structures| structures.iter().any(CommonStruct::has_float))
660 }
661}
662
663impl<'a> TryFrom<&Wv<'a>> for Message {
664 type Error = Error;
665
666 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
667 let api_key = value.as_a("apiKey")?;
668 let name = value.as_a("name")?;
669 let kind = value.as_a("type")?;
670 let listeners = value
671 .as_option("listeners")?
672 .map_or(Ok(None), |maybe: &[Value]| {
673 maybe
674 .iter()
675 .try_fold(Vec::new(), |mut acc, listener| {
676 Listener::try_from(listener).map(|l| {
677 acc.push(l);
678 acc
679 })
680 })
681 .map(Some)
682 })?;
683
684 let fields = value.as_a("fields").and_then(|fields: &[Value]| {
685 fields.iter().try_fold(Vec::new(), |mut acc, field| {
686 Field::try_from(&Wv::from(field)).map(|f| {
687 acc.push(f);
688 acc
689 })
690 })
691 })?;
692
693 let versions = Version::try_from(value)?;
694 let common_structs =
695 value
696 .as_option("commonStructs")?
697 .map_or(Ok(None), |maybe: &[Value]| {
698 maybe
699 .iter()
700 .try_fold(Vec::new(), |mut acc, value| {
701 CommonStruct::try_from(&Wv::from(value)).map(|field| {
702 acc.push(field);
703 acc
704 })
705 })
706 .map(Some)
707 })?;
708
709 Ok(Self {
710 api_key,
711 kind,
712 listeners,
713 name,
714 versions,
715 fields,
716 common_structs,
717 })
718 }
719}
720
/// A named structure declared in a message's `commonStructs` array.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct CommonStruct {
    /// Read from the `name` key.
    name: String,
    /// Read from the `fields` key.
    fields: Vec<Field>,
}
726
727impl CommonStruct {
728 #[must_use]
729 pub fn name(&self) -> &str {
730 self.name.as_str()
731 }
732
733 #[must_use]
734 #[allow(clippy::missing_panics_doc)]
735 pub fn type_name(&self) -> Type {
736 syn::parse_str::<Type>(&self.name).unwrap_or_else(|_| panic!("not a type: {self:?}"))
737 }
738
739 #[must_use]
740 pub fn fields(&self) -> &Vec<Field> {
741 &self.fields
742 }
743
744 #[must_use]
745 pub fn has_float(&self) -> bool {
746 self.fields.iter().any(|field| field.kind().is_float())
747 }
748}
749
750impl<'a> TryFrom<&Wv<'a>> for CommonStruct {
751 type Error = Error;
752
753 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
754 let name = value.as_a("name")?;
755 let fields = value.as_a("fields").and_then(|fields: &[Value]| {
756 fields.iter().try_fold(Vec::new(), |mut acc, field| {
757 Field::try_from(&Wv::from(field)).map(|f| {
758 acc.push(f);
759 acc
760 })
761 })
762 })?;
763
764 Ok(Self { name, fields })
765 }
766}
767
/// Static metadata describing a header: its name, version ranges and fields.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct HeaderMeta {
    pub name: &'static str,
    pub valid: VersionRange,
    pub flexible: VersionRange,
    /// `(field name, field metadata)` pairs.
    pub fields: &'static [(&'static str, &'static FieldMeta)],
}
775
/// Static metadata describing a message.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct MessageMeta {
    /// The message name.
    pub name: &'static str,
    /// The API key of the message.
    pub api_key: i16,
    /// Valid, deprecated and flexible version ranges.
    pub version: Version,
    /// Whether the message is a request or a response.
    pub message_kind: MessageKind,
    /// `(field name, field metadata)` pairs for the top-level fields.
    pub fields: &'static [(&'static str, &'static FieldMeta)],
}
790
791impl MessageMeta {
792 #[must_use]
793 pub fn is_flexible(&self, version: i16) -> bool {
794 self.version.flexible.within(version)
795 }
796
797 #[must_use]
798 pub fn structures(&self) -> Vec<(&str, &FieldMeta)> {
799 self.fields
800 .iter()
801 .filter(|(_, fm)| fm.is_structure())
802 .flat_map(|(name, fm)| {
803 debug!(name = self.name, field = ?name, kind = ?fm.kind.0);
804
805 let mut children = fm.structures();
806
807 if let Some(kind) = fm.kind.kind_of_sequence() {
808 if !kind.is_primitive() {
809 children.push((kind.name(), *fm));
810 }
811 } else {
812 children.push((fm.kind.name(), *fm));
813 }
814
815 children
816 })
817 .collect()
818 }
819
820 #[must_use]
821 pub fn field(&self, name: &str) -> Option<&'static FieldMeta> {
822 self.fields
823 .iter()
824 .find(|(found, _)| name == *found)
825 .map(|(_, meta)| *meta)
826 }
827}
828
/// Static metadata describing a field.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct FieldMeta {
    /// The versions in which the field is present.
    pub version: VersionRange,
    /// The versions in which the field is nullable, if any.
    pub nullable: Option<VersionRange>,
    /// The kind (type name) of the field.
    pub kind: KindMeta,
    /// The tag number, if any.
    pub tag: Option<u32>,
    /// The versions in which the field is tagged, if any.
    pub tagged: Option<VersionRange>,
    /// `(field name, field metadata)` pairs for the nested fields.
    pub fields: &'static [(&'static str, &'static FieldMeta)],
}
845
846impl FieldMeta {
847 #[must_use]
848 pub fn is_nullable(&self, version: i16) -> bool {
849 self.nullable.is_some_and(|range| range.within(version))
850 }
851
852 #[must_use]
853 pub fn is_mandatory(&self, parent: Option<VersionRange>) -> bool {
854 self.version.is_mandatory(parent)
855 }
856
857 #[must_use]
858 pub fn is_structure(&self) -> bool {
859 self.kind
860 .kind_of_sequence()
861 .is_some_and(|sk| !sk.is_primitive())
862 || !self.fields.is_empty()
863 }
864
865 #[must_use]
866 pub fn structures(&self) -> Vec<(&str, &FieldMeta)> {
867 self.fields
868 .iter()
869 .filter(|(_, fm)| fm.is_structure())
870 .flat_map(|(_, fm)| {
871 let mut children = fm.structures();
872
873 if let Some(kind) = fm.kind.kind_of_sequence()
874 && !kind.is_primitive()
875 {
876 children.push((kind.name(), *fm));
877 }
878
879 children
880 })
881 .collect()
882 }
883
884 pub fn field(&self, name: &str) -> Option<&FieldMeta> {
885 self.fields
886 .iter()
887 .find(|field| name == field.0)
888 .map(|(_, meta)| *meta)
889 }
890}
891
/// A field type name held as a static string, e.g. `int32` or `[]string`.
/// A leading `[]` marks a sequence.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct KindMeta(pub &'static str);
894
895impl KindMeta {
896 #[must_use]
897 pub fn name(&self) -> &'static str {
898 self.0
899 }
900
901 #[must_use]
902 pub fn is_sequence(&self) -> bool {
903 self.0.starts_with("[]")
904 }
905
906 #[must_use]
907 pub fn is_primitive(&self) -> bool {
908 PRIMITIVE.contains(&self.0)
909 }
910
911 #[must_use]
912 pub fn is_string(&self) -> bool {
913 self.0 == "string"
914 }
915
916 #[must_use]
917 pub fn is_records(&self) -> bool {
918 self.0 == "records"
919 }
920
921 #[must_use]
922 pub fn kind_of_sequence(&self) -> Option<Self> {
923 if self.is_sequence() {
924 Some(Self(&self.0[2..]))
925 } else {
926 None
927 }
928 }
929}
930
931fn as_str<'v>(value: &'v Value, name: &str) -> Result<&'v str> {
932 value[name]
933 .as_str()
934 .ok_or(Error::Message(String::from(name)))
935}
936
937#[cfg(test)]
938mod tests {
939 use std::{any::type_name_of_val, collections::HashMap};
940
941 use serde_json::json;
942
943 use super::*;
944
    /// Primitive type names exercised by these tests. Unlike the module-level
    /// `PRIMITIVE` list, `records` is not included here.
    const PRIMITIVES: [&str; 10] = [
        "bool", "bytes", "float64", "int16", "int32", "int64", "int8", "string", "uint16", "uuid",
    ];
948
    /// Test-only model of a header definition: name, version ranges and fields.
    #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
    struct Header {
        name: String,
        valid: VersionRange,
        flexible: VersionRange,
        fields: Vec<Field>,
    }
956
957 impl<'a> TryFrom<&Wv<'a>> for Header {
958 type Error = Error;
959
960 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
961 let fields: &[Value] = value.as_a("fields")?;
962
963 Ok(Header {
964 name: value.as_a("name")?,
965 valid: value.as_a("validVersions")?,
966 flexible: value.as_a("flexibleVersions")?,
967 fields: fields.iter().try_fold(Vec::new(), |mut acc, field| {
968 Field::try_from(&Wv::from(field)).map(|f| {
969 acc.push(f);
970 acc
971 })
972 })?,
973 })
974 }
975 }
976
977 #[test]
978 fn type_mapping_test() {
979 assert_eq!("bytes::Bytes", type_mapping("bytes"));
980 assert_eq!("f64", type_mapping("float64"));
981 assert_eq!("i16", type_mapping("int16"));
982 assert_eq!("i32", type_mapping("int32"));
983 assert_eq!("i64", type_mapping("int64"));
984 assert_eq!("i8", type_mapping("int8"));
985 assert_eq!("String", type_mapping("string"));
986 assert_eq!("u16", type_mapping("uint16"));
987 assert_eq!("[u8; 16]", type_mapping("uuid"));
988
989 assert_eq!("i16", type_mapping("[]int16"));
990 assert_eq!("SomeType", type_mapping("[]SomeType"));
991
992 assert_eq!("SomeType", type_mapping("SomeType"));
993 }
994
995 #[test]
996 fn kind_is_sequence() {
997 for primitive in PRIMITIVES {
998 let k = Kind::new(primitive);
999 assert!(!k.is_sequence());
1000 }
1001
1002 let sequences = vec!["[]int16", "[]string", "[]SomeType"];
1003
1004 for sequence in sequences {
1005 let k = Kind::new(sequence);
1006 assert!(k.is_sequence());
1007 }
1008 }
1009
1010 #[test]
1011 fn kind_is_sequence_of_primitive() {
1012 for primitive in PRIMITIVES {
1013 let k = Kind::new(primitive);
1014 assert!(!k.is_sequence_of_primitive());
1015 }
1016
1017 let primitive_sequences: Vec<String> = PRIMITIVES
1018 .iter()
1019 .map(|primitive| format!("[]{primitive}"))
1020 .collect();
1021
1022 for sequence in primitive_sequences {
1023 let k = Kind::new(sequence.as_str());
1024 assert!(k.is_sequence_of_primitive());
1025 }
1026
1027 let sequences = vec!["[]SomeType", "[]OtherType"];
1028
1029 for sequence in sequences {
1030 let k = Kind::new(sequence);
1031 assert!(!k.is_sequence_of_primitive());
1032 }
1033 }
1034
1035 #[test]
1036 fn kind_is_primitive() {
1037 for primitive in PRIMITIVES {
1038 let k = Kind::new(primitive);
1039 assert!(k.is_primitive());
1040 }
1041
1042 let primitive_sequences: Vec<String> = PRIMITIVES
1043 .iter()
1044 .map(|primitive| format!("[]{primitive}"))
1045 .collect();
1046
1047 for sequence in primitive_sequences {
1048 let k = Kind::new(sequence.as_str());
1049 assert!(!k.is_primitive());
1050 }
1051 }
1052
1053 #[test]
1054 fn listener_from_value() -> Result<()> {
1055 assert_eq!(Listener::ZkBroker, Listener::try_from(&json!("zkBroker"))?);
1056
1057 assert_eq!(Listener::Broker, Listener::try_from(&json!("broker"))?);
1058
1059 assert_eq!(
1060 Listener::Controller,
1061 Listener::try_from(&json!("controller"))?
1062 );
1063
1064 Ok(())
1065 }
1066
1067 #[test]
1068 fn message_kind_from_value() -> Result<()> {
1069 assert_eq!(
1070 MessageKind::Request,
1071 MessageKind::try_from(&json!({
1072 "type": "request"
1073 }
1074 ))?
1075 );
1076
1077 assert_eq!(
1078 MessageKind::Response,
1079 MessageKind::try_from(&json!({
1080 "type": "response"
1081 }
1082 ))?
1083 );
1084
1085 Ok(())
1086 }
1087
1088 #[test]
1089 fn version_range_from_str() -> Result<()> {
1090 assert_eq!(
1091 VersionRange { start: 0, end: 0 },
1092 VersionRange::from_str("none")?
1093 );
1094
1095 assert_eq!(
1096 VersionRange {
1097 start: 3,
1098 end: i16::MAX
1099 },
1100 VersionRange::from_str("3+")?
1101 );
1102
1103 assert_eq!(
1104 VersionRange { start: 6, end: 9 },
1105 VersionRange::from_str("6-9")?
1106 );
1107
1108 assert_eq!(
1109 VersionRange { start: 1, end: 1 },
1110 VersionRange::from_str("1")?
1111 );
1112
1113 Ok(())
1114 }
1115
1116 #[test]
1117 fn version_range_within() {
1118 {
1119 let range = VersionRange { start: 0, end: 0 };
1120 assert!(!range.within(i16::MIN));
1121 assert!(range.within(0));
1122 assert!(!range.within(1));
1123 assert!(!range.within(i16::MAX));
1124 }
1125
1126 {
1127 let range = VersionRange {
1128 start: 3,
1129 end: i16::MAX,
1130 };
1131 assert!(!range.within(i16::MIN));
1132 assert!(!range.within(2));
1133 assert!(range.within(3));
1134 assert!(range.within(i16::MAX));
1135 }
1136
1137 {
1138 let range = VersionRange { start: 6, end: 9 };
1139 assert!(!range.within(i16::MIN));
1140 assert!(!range.within(5));
1141 assert!(range.within(6));
1142 assert!(range.within(7));
1143 assert!(range.within(8));
1144 assert!(range.within(9));
1145 assert!(!range.within(10));
1146 assert!(!range.within(i16::MAX));
1147 }
1148 }
1149
    /// A field with one nested field converts from its JSON definition;
    /// optional keys that are absent become `None`.
    #[test]
    fn field_from_value() -> Result<()> {
        assert_eq!(
            Field {
                name: String::from("Topics"),
                kind: Kind(String::from("[]CreatableTopic")),
                about: Some(String::from("The topics to create.")),
                versions: VersionRange::from_str("0+")?,
                map_key: None,
                nullable: None,
                tag: None,
                tagged: None,
                entity_type: None,
                default: None,
                fields: Some(vec![Field {
                    name: String::from("Name"),
                    kind: Kind(String::from("string")),
                    versions: VersionRange::from_str("0+")?,
                    map_key: Some(true),
                    entity_type: Some(String::from("topicName")),
                    about: Some(String::from("The topic name.")),
                    default: None,
                    nullable: None,
                    tag: None,
                    tagged: None,
                    fields: None,
                }]),
            },
            serde_json::from_str::<Value>(
                r#"
 {
 "name": "Topics",
 "type": "[]CreatableTopic",
 "versions": "0+",
 "about": "The topics to create.",
 "fields": [
 { "name": "Name",
 "type": "string",
 "versions": "0+",
 "mapKey": true,
 "entityType": "topicName",
 "about": "The topic name."
 }]
 }
 "#
            )
            .map_err(Into::into)
            .and_then(|v| Field::try_from(&Wv::from(&v)))?
        );

        Ok(())
    }
1202
    /// A tagged field with three nested fields converts from its JSON
    /// definition. The fixture's `ignorable` key has no counterpart in the
    /// expected `Field`.
    #[allow(clippy::too_many_lines)]
    #[test]
    fn tagged_field_from_value() -> Result<()> {
        assert_eq!(
            Field {
                name: "SupportedFeatures".into(),
                kind: Kind("[]SupportedFeatureKey".into()),
                about: Some("Features supported by the broker.".into()),
                versions: VersionRange {
                    start: 3,
                    end: 32767
                },
                map_key: None,
                nullable: None,
                tag: Some(0),
                tagged: Some(VersionRange {
                    start: 3,
                    end: 32767
                }),
                entity_type: None,
                default: None,
                fields: Some(
                    [
                        Field {
                            name: "Name".into(),
                            kind: Kind("string".into()),
                            about: Some("The name of the feature.".into()),
                            versions: VersionRange {
                                start: 3,
                                end: 32767
                            },
                            map_key: Some(true),
                            nullable: None,
                            tag: None,
                            tagged: None,
                            entity_type: None,
                            default: None,
                            fields: None
                        },
                        Field {
                            name: "MinVersion".into(),
                            kind: Kind("int16".into()),
                            about: Some("The minimum supported version for the feature.".into()),
                            versions: VersionRange {
                                start: 3,
                                end: 32767
                            },
                            map_key: None,
                            nullable: None,
                            tag: None,
                            tagged: None,
                            entity_type: None,
                            default: None,
                            fields: None
                        },
                        Field {
                            name: "MaxVersion".into(),
                            kind: Kind("int16".into()),
                            about: Some("The maximum supported version for the feature.".into()),
                            versions: VersionRange {
                                start: 3,
                                end: 32767
                            },
                            map_key: None,
                            nullable: None,
                            tag: None,
                            tagged: None,
                            entity_type: None,
                            default: None,
                            fields: None
                        }
                    ]
                    .into()
                )
            },
            serde_json::from_str::<Value>(
                r#"
 { "name": "SupportedFeatures",
 "type": "[]SupportedFeatureKey",
 "ignorable": true,
 "versions": "3+",
 "tag": 0,
 "taggedVersions": "3+",
 "about": "Features supported by the broker.",
 "fields": [
 { "name": "Name",
 "type": "string",
 "versions": "3+",
 "mapKey": true,
 "about": "The name of the feature." },
 { "name": "MinVersion",
 "type": "int16",
 "versions": "3+",
 "about": "The minimum supported version for the feature." },
 { "name": "MaxVersion",
 "type": "int16",
 "versions": "3+",
 "about": "The maximum supported version for the feature." }
 ]
 }
 "#
            )
            .map_err(Into::into)
            .and_then(|v| Field::try_from(&Wv::from(&v)))?
        );

        Ok(())
    }
1311
    /// A request whose fields declare no `taggedVersions` reports no tags.
    #[test]
    fn untagged_message() -> Result<()> {
        let m = serde_json::from_str::<Value>(
            r#"
 {
 "apiKey": 25,
 "type": "request",
 "listeners": ["zkBroker", "broker"],
 "name": "AddOffsetsToTxnRequest",
 "validVersions": "0-3",
 "flexibleVersions": "3+",
 "fields": [
 { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
 "about": "The transactional id corresponding to the transaction."},
 { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
 "about": "Current producer id in use by the transactional id." },
 { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
 "about": "Current epoch associated with the producer id." },
 { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
 "about": "The unique group identifier." }
 ]
 }
 "#,
        ).map_err(Into::into)
        .and_then(|v| Message::try_from(&Wv::from(&v)))?;

        assert_eq!(MessageKind::Request, m.kind());

        assert!(!m.has_tags());

        Ok(())
    }
1344
    /// A request with one field declaring `taggedVersions` reports tags.
    ///
    /// NOTE(review): the `tag` of `OfflineLogDirs` is given as the string
    /// `"0"`, whereas `tagged_field_from_value` uses the number `0`. Confirm
    /// that this is intended.
    #[test]
    fn tagged_message() -> Result<()> {
        let m = Message::try_from(&Wv::from(&json!(
            {
                "apiKey": 63,
                "type": "request",
                "listeners": ["controller"],
                "name": "BrokerHeartbeatRequest",
                "validVersions": "0-1",
                "flexibleVersions": "0+",
                "fields": [
                    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
                      "about": "The broker ID." },
                    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
                      "about": "The broker epoch." },
                    { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
                      "about": "The highest metadata offset which the broker has reached." },
                    { "name": "WantFence", "type": "bool", "versions": "0+",
                      "about": "True if the broker wants to be fenced, false otherwise." },
                    { "name": "WantShutDown", "type": "bool", "versions": "0+",
                      "about": "True if the broker wants to be shut down, false otherwise." },
                    { "name": "OfflineLogDirs", "type": "[]uuid", "versions": "1+", "taggedVersions": "1+", "tag": "0",
                      "about": "Log directories that failed and went offline." }
                ]
            }
        )))?;

        assert_eq!(MessageKind::Request, m.kind());
        assert!(m.has_tags());

        Ok(())
    }
1377
/// Converts a `CreateTopicsRequest` descriptor, including a nested field and a
/// common struct, into a `Message` and compares it with a hand-built value.
#[allow(clippy::too_many_lines)]
#[test]
fn message_from_value() -> Result<()> {
    let json = serde_json::from_str::<Value>(
        r#"
        {
          "apiKey": 19,
          "type": "request",
          "listeners": ["zkBroker", "broker", "controller"],
          "name": "CreateTopicsRequest",
          "validVersions": "0-7",
          "deprecatedVersions": "0-1",
          "flexibleVersions": "5+",
          "fields": [
            {"name": "Topics",
             "type": "[]CreatableTopic",
             "versions": "0+",
             "about": "The topics to create.",
             "fields": [
               { "name": "Name",
                 "type": "string",
                 "versions": "0+",
                 "mapKey": true,
                 "entityType": "topicName",
                 "about": "The topic name."
               }]}],
          "commonStructs": [
            { "name": "AddPartitionsToTxnTopic",
              "versions": "0+",
              "fields": [
                { "name": "Name",
                  "type": "string",
                  "versions": "0+",
                  "mapKey": true,
                  "entityType": "topicName",
                  "about": "The name of the topic."
                },
                { "name": "Partitions",
                  "type": "[]int32",
                  "versions": "0+",
                  "about": "The partition indexes to add to the transaction"
                }]}]
        }
        "#,
    )?;

    let actual = Message::try_from(&Wv::from(&json))?;

    // Fields of the single entry under "commonStructs".
    let struct_name = Field {
        name: String::from("Name"),
        kind: Kind(String::from("string")),
        versions: VersionRange::from_str("0+")?,
        about: Some(String::from("The name of the topic.")),
        entity_type: Some(String::from("topicName")),
        map_key: Some(true),
        nullable: None,
        tag: None,
        tagged: None,
        default: None,
        fields: None,
    };

    let struct_partitions = Field {
        name: String::from("Partitions"),
        kind: Kind(String::from("[]int32")),
        versions: VersionRange::from_str("0+")?,
        about: Some(String::from(
            "The partition indexes to add to the transaction",
        )),
        entity_type: None,
        map_key: None,
        nullable: None,
        tag: None,
        tagged: None,
        default: None,
        fields: None,
    };

    // The only child of the top-level "Topics" field.
    let topic_name = Field {
        name: String::from("Name"),
        kind: Kind(String::from("string")),
        versions: VersionRange::from_str("0+")?,
        about: Some(String::from("The topic name.")),
        entity_type: Some(String::from("topicName")),
        map_key: Some(true),
        nullable: None,
        tag: None,
        tagged: None,
        default: None,
        fields: None,
    };

    let topics = Field {
        name: String::from("Topics"),
        kind: Kind(String::from("[]CreatableTopic")),
        versions: VersionRange::from_str("0+")?,
        about: Some(String::from("The topics to create.")),
        entity_type: None,
        map_key: None,
        nullable: None,
        tag: None,
        tagged: None,
        default: None,
        fields: Some(vec![topic_name]),
    };

    let expected = Message {
        api_key: 19,
        kind: MessageKind::Request,
        listeners: Some(vec![
            Listener::ZkBroker,
            Listener::Broker,
            Listener::Controller,
        ]),
        name: String::from("CreateTopicsRequest"),
        versions: Version {
            valid: VersionRange::from_str("0-7")?,
            deprecated: Some(VersionRange::from_str("0-1")?),
            flexible: VersionRange::from_str("5+")?,
        },
        common_structs: Some(vec![CommonStruct {
            name: String::from("AddPartitionsToTxnTopic"),
            fields: vec![struct_name, struct_partitions],
        }]),
        fields: vec![topics],
    };

    assert_eq!(expected, actual);

    Ok(())
}
1503
/// Converts a `RequestHeader` descriptor into a `Header` and compares it with
/// a hand-built value, including the nullable `ClientId` field.
#[allow(clippy::too_many_lines)]
#[test]
fn header_from_value() -> Result<()> {
    let v = serde_json::from_str::<Value>(
        r#"
        {
          "type": "header",
          "name": "RequestHeader",
          "validVersions": "0-2",
          "flexibleVersions": "2+",
          "fields": [
            { "name": "RequestApiKey", "type": "int16", "versions": "0+",
              "about": "The API key of this request." },
            { "name": "RequestApiVersion", "type": "int16", "versions": "0+",
              "about": "The API version of this request." },
            { "name": "CorrelationId", "type": "int32", "versions": "0+",
              "about": "The correlation ID of this request." },

            { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
              "flexibleVersions": "none", "about": "The client ID string." }
          ]
        }
        "#,
    )?;

    let wv = Wv::from(&v);

    // A range that starts at `start` and runs to the largest i16.
    let from_version = |start| VersionRange {
        start,
        end: i16::MAX,
    };

    // Every expected field differs only in name, kind, description, version
    // range and nullability; the remaining attributes are all `None`.
    let field = |name: &str,
                 kind: &str,
                 about: &str,
                 versions: VersionRange,
                 nullable: Option<VersionRange>| Field {
        name: String::from(name),
        kind: Kind(String::from(kind)),
        about: Some(String::from(about)),
        versions,
        map_key: None,
        nullable,
        tag: None,
        tagged: None,
        entity_type: None,
        default: None,
        fields: None,
    };

    let expected = Header {
        name: String::from("RequestHeader"),
        valid: VersionRange { start: 0, end: 2 },
        flexible: from_version(2),
        fields: vec![
            field(
                "RequestApiKey",
                "int16",
                "The API key of this request.",
                from_version(0),
                None,
            ),
            field(
                "RequestApiVersion",
                "int16",
                "The API version of this request.",
                from_version(0),
                None,
            ),
            field(
                "CorrelationId",
                "int32",
                "The correlation ID of this request.",
                from_version(0),
                None,
            ),
            field(
                "ClientId",
                "string",
                "The client ID string.",
                from_version(1),
                Some(from_version(1)),
            ),
        ],
    };

    assert_eq!(expected, Header::try_from(&wv)?);
    Ok(())
}
1613
/// Parses an array of two-element tuples with `syn`, stores each pair in a map
/// keyed by the identifier on the left, and checks that `one` was recorded.
#[test]
fn parse_expression() -> Result<()> {
    let parsed = syn::parse_str::<Expr>(r#"[(one, "a/b/c"), (abc, 123), (pqr, a::b::c)]"#)?;

    let Expr::Array(array) = parsed else {
        return Err(Error::Message(String::from("expecting an array")));
    };

    let mut mappings = HashMap::new();

    for element in array.elems {
        let Expr::Tuple(pair) = element else {
            return Err(Error::Message(String::from("expecting a tuple")));
        };

        assert_eq!(2, pair.elems.len());

        println!(
            "i: {}, ty: {}",
            pair.to_token_stream(),
            type_name_of_val(&pair)
        );

        let value = &pair.elems[1];

        // The left-hand side must be a path made of a single identifier.
        let Expr::Path(key) = &pair.elems[0] else {
            return Err(Error::Message(String::from(
                "lhs expecting a path expression",
            )));
        };

        let Some(ident) = key.path.get_ident() else {
            return Err(Error::Message(String::from(
                "lhs expecting a path ident expression",
            )));
        };

        _ = mappings.insert(ident.clone(), value.clone());

        println!("lhs: {}", ident.to_token_stream());
        println!("rhs: {}", value.to_token_stream());
    }

    let one = syn::parse_str::<Ident>("one")?;
    assert!(mappings.contains_key(&one));

    Ok(())
}
1660
/// Converts a `FindCoordinatorRequest` descriptor into a `Message` and checks
/// its kind, name, version ranges and the accessors of its first field.
#[test]
fn find_coordinator_request() -> Result<()> {
    let json = serde_json::from_str::<Value>(
        r#"
        {
          "apiKey": 10,
          "type": "request",
          "listeners": ["zkBroker", "broker"],
          "name": "FindCoordinatorRequest",
          "validVersions": "0-4",
          "deprecatedVersions": "0",
          "flexibleVersions": "3+",
          "fields": [
            { "name": "Key", "type": "string", "versions": "0-3",
              "about": "The coordinator key." },
            { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
              "about": "The coordinator key type. (Group, transaction, etc.)" },
            { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
              "about": "The coordinator keys." }
          ]
        }
        "#,
    )?;

    let m = Message::try_from(&Wv::from(&json))?;

    assert!(!m.has_records());
    assert_eq!(MessageKind::Request, m.kind());
    assert_eq!("FindCoordinatorRequest", m.name());

    let expected_version = Version {
        valid: VersionRange { start: 0, end: 4 },
        deprecated: Some(VersionRange { start: 0, end: 0 }),
        flexible: VersionRange {
            start: 3,
            end: i16::MAX,
        },
    };
    assert_eq!(expected_version, m.version());

    // Only the first declared field ("Key") is inspected.
    let key = &m.fields()[0];

    assert_eq!("Key", key.name());
    assert_eq!(Kind::new("string"), key.kind);
    assert_eq!(VersionRange { start: 0, end: 3 }, key.versions());
    assert_eq!(Some("The coordinator key."), key.about());

    Ok(())
}
1709
/// Converts a `FetchResponse` descriptor into a `Message` and checks its
/// kind, name, version ranges, the tagged `NodeEndpoints` field and that
/// field's first child.
#[test]
fn fetch_response() -> Result<()> {
    let json = serde_json::from_str::<Value>(
        r#"
        {
          "apiKey": 1,
          "type": "response",
          "name": "FetchResponse",
          "validVersions": "0-16",
          "flexibleVersions": "12+",
          "fields": [
            { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0,
              "about": "Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [
              { "name": "NodeId", "type": "int32", "versions": "16+",
                "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
              { "name": "Host", "type": "string", "versions": "16+", "about": "The node's hostname." },
              { "name": "Port", "type": "int32", "versions": "16+", "about": "The node's port." },
              { "name": "Rack", "type": "string", "versions": "16+", "nullableVersions": "16+", "default": "null",
                "about": "The rack of the node, or null if it has not been assigned to a rack." }
            ]}
          ]
        }
        "#,
    )?;

    let m = Message::try_from(&Wv::from(&json))?;

    assert_eq!(MessageKind::Response, m.kind());
    assert_eq!("FetchResponse", m.name());

    let expected_version = Version {
        valid: VersionRange { start: 0, end: 16 },
        deprecated: None,
        flexible: VersionRange {
            start: 12,
            end: i16::MAX,
        },
    };
    assert_eq!(expected_version, m.version());

    // Both inspected fields are declared with "versions": "16+".
    let since_16 = VersionRange {
        start: 16,
        end: i16::MAX,
    };

    let node_endpoints = &m.fields()[0];

    assert_eq!("NodeEndpoints", node_endpoints.name());
    assert_eq!(Kind::new("[]NodeEndpoint"), node_endpoints.kind);
    assert_eq!(since_16, node_endpoints.versions());

    let node_id = &node_endpoints.fields().unwrap_or_default()[0];

    assert_eq!("NodeId", node_id.name());
    assert_eq!(Kind::new("int32"), node_id.kind);
    assert_eq!(since_16, node_id.versions());

    Ok(())
}
1775}