1pub mod error;
21pub mod wv;
22
23use convert_case::{Case, Casing};
24pub use error::Error;
25use lazy_static::lazy_static;
26use proc_macro2::{Ident, Span, TokenStream};
27use quote::ToTokens;
28use regex::Regex;
29use serde_json::Value;
30use std::{collections::BTreeMap, fmt, str::FromStr};
31use syn::{Expr, Type};
32use tracing::debug;
33use wv::{As, AsOption, Wv};
34
// Expands to a `String` of the form `<crate name>::<identifier>`, where the
// crate name is taken from the `CARGO_CRATE_NAME` environment variable at
// compile time and the identifier is stringified verbatim.
macro_rules! prefix_crate {
    ($name:ident) => {
        format!(
            "{krate}::{item}",
            krate = env!("CARGO_CRATE_NAME"),
            item = stringify!($name)
        )
    };
}
40
// Expands to a `String` that prefixes the `Debug` rendering of a value with
// the compile-time crate name (`CARGO_CRATE_NAME`).
//
// One argument:  `<crate>::<value:?>`
// Two arguments: `<crate>::<path>::<value:?>`, with `path` rendered via `Display`.
macro_rules! with_crate {
    ($value:expr_2021) => {
        format!(
            "{krate}::{value:?}",
            krate = env!("CARGO_CRATE_NAME"),
            value = $value
        )
    };

    ($path:expr_2021, $value:expr_2021) => {
        format!(
            "{krate}::{path}::{value:?}",
            krate = env!("CARGO_CRATE_NAME"),
            path = $path,
            value = $value
        )
    };
}
50
/// Result alias used throughout this module; the error type defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
52
/// Translates a schema type name into the text of the Rust type used for it.
///
/// A leading `[]` is stripped and the remaining element type is mapped
/// instead (recursively, so `[][]int8` maps to `i8`). Names without a
/// dedicated mapping are returned unchanged.
fn type_mapping(kind: &str) -> String {
    if let Some(element) = kind.strip_prefix("[]") {
        return type_mapping(element);
    }

    let rust_type = match kind {
        "bytes" => "bytes::Bytes",
        "float64" => "f64",
        "int16" => "i16",
        "int32" => "i32",
        "int64" => "i64",
        "int8" => "i8",
        "string" => "String",
        "uint16" => "u16",
        "uuid" => "[u8; 16]",
        "records" => "crate::RecordBatch",
        unmapped => unmapped,
    };

    rust_type.to_owned()
}
71
/// The type name of a field exactly as written in a message definition,
/// e.g. `int32`, `string`, `[]int16` or `[]SomeType`.
///
/// A leading `[]` marks a sequence of the named element type.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Kind(String);
75
76impl ToTokens for Kind {
77 fn to_tokens(&self, tokens: &mut TokenStream) {
78 let expr = with_crate!(self);
79 syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
80 }
81}
82
83impl From<Type> for Kind {
84 fn from(value: Type) -> Self {
85 Self(value.to_token_stream().to_string())
86 }
87}
88
// Schema type names treated as primitive by `Kind::is_primitive`,
// `Kind::is_sequence_of_primitive` and `KindMeta::is_primitive`; any other
// name is considered a structure.
lazy_static! {
    static ref PRIMITIVE: &'static [&'static str] = &[
        "bool", "bytes", "float64", "int16", "int32", "int64", "int8", "records", "string",
        "uint16", "uuid",
    ];
}
95
96impl Kind {
97 #[must_use]
98 pub fn new(kind: &str) -> Self {
99 Self(kind.to_owned())
100 }
101
102 #[must_use]
103 pub fn name(&self) -> &str {
104 &self.0[..]
105 }
106
107 #[must_use]
108 #[allow(clippy::missing_panics_doc)]
109 pub fn type_name(&self) -> Type {
111 syn::parse_str::<Type>(&type_mapping(&self.0))
112 .unwrap_or_else(|_| panic!("not a type: {self:?}"))
113 }
114
115 #[must_use]
116 pub fn is_sequence(&self) -> bool {
118 self.0.starts_with("[]")
119 }
120
121 #[must_use]
122 pub fn is_primitive(&self) -> bool {
124 PRIMITIVE.contains(&self.0.as_str())
125 }
126
127 #[must_use]
128 pub fn is_float(&self) -> bool {
130 self.0.eq("float64")
131 }
132
133 #[must_use]
134 pub fn is_sequence_of_primitive(&self) -> bool {
136 self.is_sequence() && PRIMITIVE.contains(&&self.0[2..])
137 }
138
139 #[must_use]
140 pub fn kind_of_sequence(&self) -> Option<Self> {
142 if self.is_sequence() {
143 Some(Self::new(&self.0[2..]))
144 } else {
145 None
146 }
147 }
148}
149
150impl FromStr for Kind {
151 type Err = Error;
152
153 fn from_str(s: &str) -> Result<Self, Self::Err> {
154 Ok(Self(s.to_owned()))
155 }
156}
157
158impl TryFrom<&Value> for Kind {
159 type Error = Error;
160
161 fn try_from(value: &Value) -> Result<Self, Self::Error> {
162 as_str(value, "type").and_then(Self::from_str)
163 }
164}
165
/// A listener named in a message definition's `listeners` array.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum Listener {
    /// Parsed from the string `zkBroker`.
    ZkBroker,
    /// Parsed from the string `broker`; this is the default variant.
    #[default]
    Broker,
    /// Parsed from the string `controller`.
    Controller,
}
174
175impl TryFrom<&Value> for Listener {
176 type Error = Error;
177
178 fn try_from(value: &Value) -> Result<Self, Self::Error> {
179 value
180 .as_str()
181 .ok_or(Error::Message(String::from(
182 "expecting string for listener",
183 )))
184 .and_then(Self::from_str)
185 }
186}
187
188impl FromStr for Listener {
189 type Err = Error;
190
191 fn from_str(s: &str) -> Result<Self, Self::Err> {
192 match s {
193 "zkBroker" => Ok(Self::ZkBroker),
194 "broker" => Ok(Self::Broker),
195 "controller" => Ok(Self::Controller),
196 s => Err(Error::Message(String::from(s))),
197 }
198 }
199}
200
/// Whether a message definition describes a request or a response, as given
/// by its `type` entry.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum MessageKind {
    /// Parsed from the string `request`; this is the default variant.
    #[default]
    Request,
    /// Parsed from the string `response`.
    Response,
}
208
209impl ToTokens for MessageKind {
210 fn to_tokens(&self, tokens: &mut TokenStream) {
211 let expr = with_crate!("MessageKind", self);
212 syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
213 }
214}
215
216impl TryFrom<&Value> for MessageKind {
217 type Error = Error;
218
219 fn try_from(value: &Value) -> Result<Self, Self::Error> {
220 as_str(value, "type").and_then(Self::from_str)
221 }
222}
223
224impl FromStr for MessageKind {
225 type Err = Error;
226
227 fn from_str(s: &str) -> Result<Self, Self::Err> {
228 match s {
229 "request" => Ok(Self::Request),
230 "response" => Ok(Self::Response),
231 s => Err(Error::Message(String::from(s))),
232 }
233 }
234}
235
/// A range of versions; both bounds are inclusive (see [`VersionRange::within`]).
///
/// `Debug` is implemented by hand so the type name is rendered with the crate
/// prefix.
#[derive(Clone, Copy, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct VersionRange {
    /// First version in the range.
    pub start: i16,
    /// Last version in the range; parsing an open-ended `N+` range sets this
    /// to `i16::MAX`.
    pub end: i16,
}
242
243impl fmt::Debug for VersionRange {
244 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245 f.debug_struct(&prefix_crate!(VersionRange))
246 .field("start", &self.start)
247 .field("end", &self.end)
248 .finish()
249 }
250}
251
252impl VersionRange {
253 #[must_use]
254 pub fn within(&self, version: i16) -> bool {
255 version >= self.start && version <= self.end
256 }
257
258 #[must_use]
259 pub fn is_mandatory(&self, parent: Option<VersionRange>) -> bool {
260 parent.map_or(
261 self.start == 0 && self.end == i16::MAX,
262 |VersionRange { start, end }| {
263 if start > self.start {
264 self.end == end
265 } else {
266 self.start == start && self.end == end
267 }
268 },
269 )
270 }
271}
272
273impl ToTokens for VersionRange {
274 fn to_tokens(&self, tokens: &mut TokenStream) {
275 let expr = format!("{self:?}");
276 syn::parse_str::<Expr>(&expr)
277 .unwrap_or_else(|_| panic!("an expression: {self:?}"))
278 .to_tokens(tokens);
279 }
280}
281
282impl FromStr for VersionRange {
283 type Err = Error;
284
285 fn from_str(s: &str) -> Result<Self, Self::Err> {
286 lazy_static! {
287 static ref RX: Regex =
288 Regex::new(r"^(?<start>\d+)(-(?<end>\d+)|(?<infinite>\+))?$").expect("regex");
289 }
290
291 if s == "none" {
292 Ok(VersionRange { start: 0, end: 0 })
293 } else {
294 RX.captures(s)
295 .ok_or(Error::Message(format!("invalid version range format: {s}")))
296 .and_then(|captures| {
297 let parse = |name, default: i16| {
298 captures
299 .name(name)
300 .map_or(Ok(&default.to_string()[..]), |s| Ok(s.as_str()))
301 .and_then(str::parse)
302 };
303
304 let start = parse("start", 0)?;
305
306 let end = if let Some(end) = captures.name("end") {
307 str::parse(end.as_str())?
308 } else if captures.name("infinite").is_some() {
309 i16::MAX
310 } else {
311 start
312 };
313
314 Ok(VersionRange { start, end })
315 })
316 }
317 }
318}
319
/// The version ranges declared at the top level of a message definition.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Version {
    /// Range read from the `validVersions` entry.
    pub valid: VersionRange,
    /// Range read from the optional `deprecatedVersions` entry.
    pub deprecated: Option<VersionRange>,
    /// Range read from the `flexibleVersions` entry.
    pub flexible: VersionRange,
}
330
331impl ToTokens for Version {
332 fn to_tokens(&self, tokens: &mut TokenStream) {
333 let expr = with_crate!(self);
334 syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
335 }
336}
337
338impl Version {
339 #[must_use]
340 pub fn valid(&self) -> VersionRange {
341 self.valid
342 }
343
344 #[must_use]
345 pub fn deprecated(&self) -> Option<VersionRange> {
346 self.deprecated
347 }
348
349 #[must_use]
350 pub fn flexible(&self) -> VersionRange {
351 self.flexible
352 }
353}
354
355impl<'a> TryFrom<&Wv<'a>> for Version {
356 type Error = Error;
357
358 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
359 debug!("value: {:?}", value);
360
361 Ok(Self {
362 valid: value.as_a("validVersions")?,
363 deprecated: value.as_option("deprecatedVersions")?,
364 flexible: value.as_a("flexibleVersions")?,
365 })
366 }
367}
368
/// One field of a message definition, including any nested fields.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Field {
    /// Read from the `name` entry.
    name: String,
    /// Read from the `type` entry.
    kind: Kind,
    /// Read from the optional `about` entry.
    about: Option<String>,
    /// Read from the `versions` entry.
    versions: VersionRange,
    /// Read from the optional `mapKey` entry.
    map_key: Option<bool>,
    /// Read from the optional `nullableVersions` entry.
    nullable: Option<VersionRange>,
    /// Read from the optional `tag` entry.
    tag: Option<u32>,
    /// Read from the optional `taggedVersions` entry.
    tagged: Option<VersionRange>,
    /// Read from the optional `entityType` entry.
    entity_type: Option<String>,
    /// Read from the optional `default` entry.
    default: Option<String>,
    /// Nested fields, read from the optional `fields` array.
    fields: Option<Vec<Field>>,
}
395
396impl Field {
397 #[must_use]
398 pub fn ident(&self) -> Ident {
399 match self.name.to_case(Case::Snake) {
400 reserved if is_reserved_keyword(&reserved) => {
401 Ident::new_raw(&reserved, Span::call_site())
402 }
403
404 otherwise => Ident::new(&otherwise, Span::call_site()),
405 }
406 }
407
408 #[must_use]
409 pub fn name(&self) -> &str {
410 &self.name
411 }
412
413 #[must_use]
414 pub fn kind(&self) -> &Kind {
415 &self.kind
416 }
417
418 #[must_use]
419 pub fn fields(&self) -> Option<&[Field]> {
420 self.fields.as_deref()
421 }
422
423 #[must_use]
424 pub fn versions(&self) -> VersionRange {
425 self.versions
426 }
427
428 #[must_use]
429 pub fn nullable(&self) -> Option<VersionRange> {
430 self.nullable
431 }
432
433 #[must_use]
434 pub fn tagged(&self) -> Option<VersionRange> {
435 self.tagged
436 }
437
438 #[must_use]
439 pub fn tag(&self) -> Option<u32> {
440 self.tag
441 }
442
443 #[must_use]
444 pub fn about(&self) -> Option<&str> {
445 self.about.as_deref()
446 }
447
448 #[must_use]
449 pub fn has_tags(&self) -> bool {
450 self.tagged.is_some()
451 }
452
453 #[must_use]
454 pub fn has_records(&self) -> bool {
455 self.kind.0 == "records"
456 || self
457 .fields()
458 .is_some_and(|fields| fields.iter().any(Field::has_records))
459 }
460
461 #[must_use]
462 pub fn has_float(&self) -> bool {
463 self.kind().is_float()
464 || self
465 .fields()
466 .is_some_and(|fields| fields.iter().any(Field::has_float))
467 }
468}
469
470impl<'a> TryFrom<&Wv<'a>> for Field {
471 type Error = Error;
472
473 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
474 let name = value.as_a("name")?;
475 let kind = value.as_a("type")?;
476 let about = value.as_option("about")?;
477 let versions = value.as_a("versions")?;
478 let map_key = value.as_option("mapKey")?;
479 let nullable = value.as_option("nullableVersions")?;
480 let tag = value.as_option("tag")?;
481 let tagged = value.as_option("taggedVersions")?;
482 let entity_type = value.as_option("entityType")?;
483 let default = value.as_option("default")?;
484
485 let fields = value
486 .as_option("fields")?
487 .map_or(Ok(None), |values: &[Value]| {
488 values
489 .iter()
490 .try_fold(Vec::new(), |mut acc, field| {
491 Field::try_from(&Wv::from(field)).map(|f| {
492 acc.push(f);
493 acc
494 })
495 })
496 .map(Some)
497 })?;
498
499 Ok(Field {
500 name,
501 kind,
502 about,
503 versions,
504 map_key,
505 nullable,
506 tag,
507 tagged,
508 entity_type,
509 default,
510 fields,
511 })
512 }
513}
514
/// Returns `true` when `s` is a Rust keyword that cannot be used as a plain
/// identifier, so callers know to emit a raw identifier (`r#name`) instead.
///
/// The table covers strict keywords, the keywords added in the 2018 edition,
/// keywords reserved for future use, and `gen`, which became reserved in the
/// 2024 edition (`r#gen` is accepted by every edition, so treating it as
/// reserved is safe for older editions too).
///
/// The comparison is case-sensitive: `Self` is reserved, `Type` is not.
fn is_reserved_keyword(s: &str) -> bool {
    // A constant table: no lazy initialisation or heap allocation is needed.
    const RESERVED: &[&str] = &[
        // strict keywords
        "as",
        "break",
        "const",
        "continue",
        "crate",
        "else",
        "enum",
        "extern",
        "false",
        "fn",
        "for",
        "if",
        "impl",
        "in",
        "let",
        "loop",
        "match",
        "mod",
        "move",
        "mut",
        "pub",
        "ref",
        "return",
        "self",
        "Self",
        "static",
        "struct",
        "super",
        "trait",
        "true",
        "type",
        "unsafe",
        "use",
        "where",
        "while",
        // strict keywords since the 2018 edition
        "async",
        "await",
        "dyn",
        // reserved for future use
        "abstract",
        "become",
        "box",
        "do",
        "final",
        "macro",
        "override",
        "priv",
        "typeof",
        "unsized",
        "virtual",
        "yield",
        // reserved since the 2018 edition
        "try",
        // reserved since the 2024 edition
        "gen",
    ];

    RESERVED.contains(&s)
}
579
/// A complete message definition parsed from JSON.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Message {
    /// Read from the `apiKey` entry.
    api_key: i16,
    /// Read from the `type` entry (`request` or `response`).
    kind: MessageKind,
    /// Read from the optional `listeners` array.
    listeners: Option<Vec<Listener>>,
    /// Read from the `name` entry.
    name: String,
    /// Read from `validVersions`, `deprecatedVersions` and `flexibleVersions`.
    versions: Version,
    /// Read from the `fields` array.
    fields: Vec<Field>,
    /// Read from the optional `commonStructs` array.
    common_structs: Option<Vec<CommonStruct>>,
}
590
591impl Message {
592 #[must_use]
593 pub fn api_key(&self) -> i16 {
594 self.api_key
595 }
596
597 #[must_use]
598 pub fn kind(&self) -> MessageKind {
599 self.kind
600 }
601
602 #[must_use]
603 pub fn name(&self) -> &str {
604 &self.name
605 }
606
607 #[must_use]
608 #[allow(clippy::missing_panics_doc)]
609 pub fn type_name(&self) -> Type {
610 syn::parse_str::<Type>(&self.name).unwrap()
611 }
612
613 #[must_use]
614 pub fn version(&self) -> Version {
615 self.versions
616 }
617
618 #[must_use]
619 pub fn listeners(&self) -> Option<&[Listener]> {
620 self.listeners.as_deref()
621 }
622
623 #[must_use]
624 pub fn fields(&self) -> &[Field] {
625 &self.fields
626 }
627
628 #[must_use]
629 #[allow(clippy::missing_panics_doc)]
630 pub fn wrapper_new_type(&self, field: &Field) -> Type {
631 syn::parse_str::<Type>(&format!("{}{}", self.name, field.name).to_case(Case::Pascal))
632 .unwrap()
633 }
634
635 #[must_use]
636 pub fn common_structs(&self) -> Option<&[CommonStruct]> {
637 self.common_structs.as_deref()
638 }
639
640 #[must_use]
641 pub fn has_records(&self) -> bool {
642 self.fields().iter().any(Field::has_records)
643 }
644
645 #[must_use]
646 pub fn has_tags(&self) -> bool {
647 self.fields().iter().any(Field::has_tags)
648 }
649
650 #[must_use]
651 pub fn has_float(&self) -> bool {
652 self.fields.iter().any(|field| field.kind().is_float())
653 || self
654 .common_structs()
655 .is_some_and(|structures| structures.iter().any(CommonStruct::has_float))
656 }
657}
658
659impl<'a> TryFrom<&Wv<'a>> for Message {
660 type Error = Error;
661
662 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
663 let api_key = value.as_a("apiKey")?;
664 let name = value.as_a("name")?;
665 let kind = value.as_a("type")?;
666 let listeners = value
667 .as_option("listeners")?
668 .map_or(Ok(None), |maybe: &[Value]| {
669 maybe
670 .iter()
671 .try_fold(Vec::new(), |mut acc, listener| {
672 Listener::try_from(listener).map(|l| {
673 acc.push(l);
674 acc
675 })
676 })
677 .map(Some)
678 })?;
679
680 let fields = value.as_a("fields").and_then(|fields: &[Value]| {
681 fields.iter().try_fold(Vec::new(), |mut acc, field| {
682 Field::try_from(&Wv::from(field)).map(|f| {
683 acc.push(f);
684 acc
685 })
686 })
687 })?;
688
689 let versions = Version::try_from(value)?;
690 let common_structs =
691 value
692 .as_option("commonStructs")?
693 .map_or(Ok(None), |maybe: &[Value]| {
694 maybe
695 .iter()
696 .try_fold(Vec::new(), |mut acc, value| {
697 CommonStruct::try_from(&Wv::from(value)).map(|field| {
698 acc.push(field);
699 acc
700 })
701 })
702 .map(Some)
703 })?;
704
705 Ok(Self {
706 api_key,
707 kind,
708 listeners,
709 name,
710 versions,
711 fields,
712 common_structs,
713 })
714 }
715}
716
/// A named structure listed in a message definition's `commonStructs` array.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct CommonStruct {
    /// Read from the `name` entry.
    name: String,
    /// Read from the `fields` array.
    fields: Vec<Field>,
}
722
723impl CommonStruct {
724 #[must_use]
725 pub fn name(&self) -> &str {
726 self.name.as_str()
727 }
728
729 #[must_use]
730 #[allow(clippy::missing_panics_doc)]
731 pub fn type_name(&self) -> Type {
732 syn::parse_str::<Type>(&self.name).unwrap_or_else(|_| panic!("not a type: {self:?}"))
733 }
734
735 #[must_use]
736 pub fn fields(&self) -> &Vec<Field> {
737 &self.fields
738 }
739
740 #[must_use]
741 pub fn has_float(&self) -> bool {
742 self.fields.iter().any(|field| field.kind().is_float())
743 }
744}
745
746impl<'a> TryFrom<&Wv<'a>> for CommonStruct {
747 type Error = Error;
748
749 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
750 let name = value.as_a("name")?;
751 let fields = value.as_a("fields").and_then(|fields: &[Value]| {
752 fields.iter().try_fold(Vec::new(), |mut acc, field| {
753 Field::try_from(&Wv::from(field)).map(|f| {
754 acc.push(f);
755 acc
756 })
757 })
758 })?;
759
760 Ok(Self { name, fields })
761 }
762}
763
/// Static description of a header: its name, version ranges and fields.
///
/// NOTE(review): nothing in the visible part of this file constructs this
/// type; presumably instances are emitted by code generation — confirm.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct HeaderMeta {
    /// Header name.
    pub name: &'static str,
    /// Valid version range.
    pub valid: VersionRange,
    /// Flexible version range.
    pub flexible: VersionRange,
    /// `(field name, field metadata)` pairs.
    pub fields: &'static [(&'static str, &'static FieldMeta)],
}
771
/// Static description of a message: name, API key, versions, kind and fields.
///
/// NOTE(review): nothing in the visible part of this file constructs this
/// type; presumably instances are emitted by code generation — confirm.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct MessageMeta {
    /// Message name.
    pub name: &'static str,
    /// Numeric API key of the message.
    pub api_key: i16,
    /// Valid, deprecated and flexible version ranges.
    pub version: Version,
    /// Whether the message is a request or a response.
    pub message_kind: MessageKind,
    /// `(field name, field metadata)` pairs for the top-level fields.
    pub fields: &'static [(&'static str, &'static FieldMeta)],
}
786
787impl MessageMeta {
788 #[must_use]
789 pub fn is_flexible(&self, version: i16) -> bool {
790 self.version.flexible.within(version)
791 }
792
793 #[must_use]
794 pub fn structures(&self) -> BTreeMap<&str, &FieldMeta> {
795 self.fields.iter().filter(|(_, fm)| fm.is_structure()).fold(
796 BTreeMap::new(),
797 |mut acc, (name, fm)| {
798 debug!(name = self.name, field = ?name, kind = ?fm.kind.0);
799
800 if let Some(kind) = fm.kind.kind_of_sequence() {
801 if !kind.is_primitive() {
802 _ = acc.insert(kind.name(), fm);
803 }
804 } else {
805 _ = acc.insert(fm.kind.name(), fm);
806 }
807
808 let mut children = fm.structures();
809 debug!(name = self.name, field = ?name, children = ?children.keys().collect::<Vec<_>>());
810 acc.append(&mut children);
811
812 acc
813 },
814 )
815 }
816
817 #[must_use]
818 pub fn field(&self, name: &str) -> Option<&'static FieldMeta> {
819 self.fields
820 .iter()
821 .find(|(found, _)| name == *found)
822 .map(|(_, meta)| *meta)
823 }
824}
825
/// Static description of a single field, including its nested fields.
///
/// NOTE(review): nothing in the visible part of this file constructs this
/// type; presumably instances are emitted by code generation — confirm.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct FieldMeta {
    /// Version range of the field; `is_mandatory` is evaluated on this range.
    pub version: VersionRange,
    /// Version range consulted by `is_nullable`, if any.
    pub nullable: Option<VersionRange>,
    /// Kind (type name) of the field.
    pub kind: KindMeta,
    /// Tag number, if any.
    pub tag: Option<u32>,
    /// Tagged version range, if any.
    pub tagged: Option<VersionRange>,
    /// `(field name, field metadata)` pairs for nested fields; empty when
    /// there are none.
    pub fields: &'static [(&'static str, &'static FieldMeta)],
}
842
843impl FieldMeta {
844 #[must_use]
845 pub fn is_nullable(&self, version: i16) -> bool {
846 self.nullable.is_some_and(|range| range.within(version))
847 }
848
849 #[must_use]
850 pub fn is_mandatory(&self, parent: Option<VersionRange>) -> bool {
851 self.version.is_mandatory(parent)
852 }
853
854 #[must_use]
855 pub fn is_structure(&self) -> bool {
856 self.kind
857 .kind_of_sequence()
858 .is_some_and(|sk| !sk.is_primitive())
859 || !self.fields.is_empty()
860 }
861
862 #[must_use]
863 pub fn structures(&self) -> BTreeMap<&str, &FieldMeta> {
864 self.fields.iter().filter(|(_, fm)| fm.is_structure()).fold(
865 BTreeMap::new(),
866 |mut acc, (_name, fm)| {
867 if let Some(kind) = fm.kind.kind_of_sequence()
868 && !kind.is_primitive()
869 {
870 _ = acc.insert(kind.name(), fm);
871 }
872
873 let mut children = fm.structures();
874 acc.append(&mut children);
875 acc
876 },
877 )
878 }
879
880 pub fn field(&self, name: &str) -> Option<&FieldMeta> {
881 self.fields
882 .iter()
883 .find(|field| name == field.0)
884 .map(|(_, meta)| *meta)
885 }
886}
887
/// A kind (schema type name) held as a `'static` string; a leading `[]`
/// marks a sequence of the named element kind.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct KindMeta(pub &'static str);
890
891impl KindMeta {
892 #[must_use]
893 pub fn name(&self) -> &'static str {
894 self.0
895 }
896
897 #[must_use]
898 pub fn is_sequence(&self) -> bool {
899 self.0.starts_with("[]")
900 }
901
902 #[must_use]
903 pub fn is_primitive(&self) -> bool {
904 PRIMITIVE.contains(&self.0)
905 }
906
907 #[must_use]
908 pub fn is_string(&self) -> bool {
909 self.0 == "string"
910 }
911
912 #[must_use]
913 pub fn is_records(&self) -> bool {
914 self.0 == "records"
915 }
916
917 #[must_use]
918 pub fn kind_of_sequence(&self) -> Option<Self> {
919 if self.is_sequence() {
920 Some(Self(&self.0[2..]))
921 } else {
922 None
923 }
924 }
925}
926
927fn as_str<'v>(value: &'v Value, name: &str) -> Result<&'v str> {
928 value[name]
929 .as_str()
930 .ok_or(Error::Message(String::from(name)))
931}
932
933#[cfg(test)]
934mod tests {
935 use std::{any::type_name_of_val, collections::HashMap};
936
937 use serde_json::json;
938
939 use super::*;
940
941 const PRIMITIVES: [&str; 10] = [
942 "bool", "bytes", "float64", "int16", "int32", "int64", "int8", "string", "uint16", "uuid",
943 ];
944
945 #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
946 struct Header {
947 name: String,
948 valid: VersionRange,
949 flexible: VersionRange,
950 fields: Vec<Field>,
951 }
952
953 impl<'a> TryFrom<&Wv<'a>> for Header {
954 type Error = Error;
955
956 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
957 let fields: &[Value] = value.as_a("fields")?;
958
959 Ok(Header {
960 name: value.as_a("name")?,
961 valid: value.as_a("validVersions")?,
962 flexible: value.as_a("flexibleVersions")?,
963 fields: fields.iter().try_fold(Vec::new(), |mut acc, field| {
964 Field::try_from(&Wv::from(field)).map(|f| {
965 acc.push(f);
966 acc
967 })
968 })?,
969 })
970 }
971 }
972
973 #[test]
974 fn type_mapping_test() {
975 assert_eq!("bytes::Bytes", type_mapping("bytes"));
976 assert_eq!("f64", type_mapping("float64"));
977 assert_eq!("i16", type_mapping("int16"));
978 assert_eq!("i32", type_mapping("int32"));
979 assert_eq!("i64", type_mapping("int64"));
980 assert_eq!("i8", type_mapping("int8"));
981 assert_eq!("String", type_mapping("string"));
982 assert_eq!("u16", type_mapping("uint16"));
983 assert_eq!("[u8; 16]", type_mapping("uuid"));
984
985 assert_eq!("i16", type_mapping("[]int16"));
986 assert_eq!("SomeType", type_mapping("[]SomeType"));
987
988 assert_eq!("SomeType", type_mapping("SomeType"));
989 }
990
991 #[test]
992 fn kind_is_sequence() {
993 for primitive in PRIMITIVES {
994 let k = Kind::new(primitive);
995 assert!(!k.is_sequence());
996 }
997
998 let sequences = vec!["[]int16", "[]string", "[]SomeType"];
999
1000 for sequence in sequences {
1001 let k = Kind::new(sequence);
1002 assert!(k.is_sequence());
1003 }
1004 }
1005
1006 #[test]
1007 fn kind_is_sequence_of_primitive() {
1008 for primitive in PRIMITIVES {
1009 let k = Kind::new(primitive);
1010 assert!(!k.is_sequence_of_primitive());
1011 }
1012
1013 let primitive_sequences: Vec<String> = PRIMITIVES
1014 .iter()
1015 .map(|primitive| format!("[]{primitive}"))
1016 .collect();
1017
1018 for sequence in primitive_sequences {
1019 let k = Kind::new(sequence.as_str());
1020 assert!(k.is_sequence_of_primitive());
1021 }
1022
1023 let sequences = vec!["[]SomeType", "[]OtherType"];
1024
1025 for sequence in sequences {
1026 let k = Kind::new(sequence);
1027 assert!(!k.is_sequence_of_primitive());
1028 }
1029 }
1030
1031 #[test]
1032 fn kind_is_primitive() {
1033 for primitive in PRIMITIVES {
1034 let k = Kind::new(primitive);
1035 assert!(k.is_primitive());
1036 }
1037
1038 let primitive_sequences: Vec<String> = PRIMITIVES
1039 .iter()
1040 .map(|primitive| format!("[]{primitive}"))
1041 .collect();
1042
1043 for sequence in primitive_sequences {
1044 let k = Kind::new(sequence.as_str());
1045 assert!(!k.is_primitive());
1046 }
1047 }
1048
1049 #[test]
1050 fn listener_from_value() -> Result<()> {
1051 assert_eq!(Listener::ZkBroker, Listener::try_from(&json!("zkBroker"))?);
1052
1053 assert_eq!(Listener::Broker, Listener::try_from(&json!("broker"))?);
1054
1055 assert_eq!(
1056 Listener::Controller,
1057 Listener::try_from(&json!("controller"))?
1058 );
1059
1060 Ok(())
1061 }
1062
1063 #[test]
1064 fn message_kind_from_value() -> Result<()> {
1065 assert_eq!(
1066 MessageKind::Request,
1067 MessageKind::try_from(&json!({
1068 "type": "request"
1069 }
1070 ))?
1071 );
1072
1073 assert_eq!(
1074 MessageKind::Response,
1075 MessageKind::try_from(&json!({
1076 "type": "response"
1077 }
1078 ))?
1079 );
1080
1081 Ok(())
1082 }
1083
1084 #[test]
1085 fn version_range_from_str() -> Result<()> {
1086 assert_eq!(
1087 VersionRange { start: 0, end: 0 },
1088 VersionRange::from_str("none")?
1089 );
1090
1091 assert_eq!(
1092 VersionRange {
1093 start: 3,
1094 end: i16::MAX
1095 },
1096 VersionRange::from_str("3+")?
1097 );
1098
1099 assert_eq!(
1100 VersionRange { start: 6, end: 9 },
1101 VersionRange::from_str("6-9")?
1102 );
1103
1104 assert_eq!(
1105 VersionRange { start: 1, end: 1 },
1106 VersionRange::from_str("1")?
1107 );
1108
1109 Ok(())
1110 }
1111
1112 #[test]
1113 fn version_range_within() {
1114 {
1115 let range = VersionRange { start: 0, end: 0 };
1116 assert!(!range.within(i16::MIN));
1117 assert!(range.within(0));
1118 assert!(!range.within(1));
1119 assert!(!range.within(i16::MAX));
1120 }
1121
1122 {
1123 let range = VersionRange {
1124 start: 3,
1125 end: i16::MAX,
1126 };
1127 assert!(!range.within(i16::MIN));
1128 assert!(!range.within(2));
1129 assert!(range.within(3));
1130 assert!(range.within(i16::MAX));
1131 }
1132
1133 {
1134 let range = VersionRange { start: 6, end: 9 };
1135 assert!(!range.within(i16::MIN));
1136 assert!(!range.within(5));
1137 assert!(range.within(6));
1138 assert!(range.within(7));
1139 assert!(range.within(8));
1140 assert!(range.within(9));
1141 assert!(!range.within(10));
1142 assert!(!range.within(i16::MAX));
1143 }
1144 }
1145
1146 #[test]
1147 fn field_from_value() -> Result<()> {
1148 assert_eq!(
1149 Field {
1150 name: String::from("Topics"),
1151 kind: Kind(String::from("[]CreatableTopic")),
1152 about: Some(String::from("The topics to create.")),
1153 versions: VersionRange::from_str("0+")?,
1154 map_key: None,
1155 nullable: None,
1156 tag: None,
1157 tagged: None,
1158 entity_type: None,
1159 default: None,
1160 fields: Some(vec![Field {
1161 name: String::from("Name"),
1162 kind: Kind(String::from("string")),
1163 versions: VersionRange::from_str("0+")?,
1164 map_key: Some(true),
1165 entity_type: Some(String::from("topicName")),
1166 about: Some(String::from("The topic name.")),
1167 default: None,
1168 nullable: None,
1169 tag: None,
1170 tagged: None,
1171 fields: None,
1172 }]),
1173 },
1174 serde_json::from_str::<Value>(
1175 r#"
1176 {
1177 "name": "Topics",
1178 "type": "[]CreatableTopic",
1179 "versions": "0+",
1180 "about": "The topics to create.",
1181 "fields": [
1182 { "name": "Name",
1183 "type": "string",
1184 "versions": "0+",
1185 "mapKey": true,
1186 "entityType": "topicName",
1187 "about": "The topic name."
1188 }]
1189 }
1190 "#
1191 )
1192 .map_err(Into::into)
1193 .and_then(|v| Field::try_from(&Wv::from(&v)))?
1194 );
1195
1196 Ok(())
1197 }
1198
1199 #[allow(clippy::too_many_lines)]
1200 #[test]
1201 fn tagged_field_from_value() -> Result<()> {
1202 assert_eq!(
1203 Field {
1204 name: "SupportedFeatures".into(),
1205 kind: Kind("[]SupportedFeatureKey".into()),
1206 about: Some("Features supported by the broker.".into()),
1207 versions: VersionRange {
1208 start: 3,
1209 end: 32767
1210 },
1211 map_key: None,
1212 nullable: None,
1213 tag: Some(0),
1214 tagged: Some(VersionRange {
1215 start: 3,
1216 end: 32767
1217 }),
1218 entity_type: None,
1219 default: None,
1220 fields: Some(
1221 [
1222 Field {
1223 name: "Name".into(),
1224 kind: Kind("string".into()),
1225 about: Some("The name of the feature.".into()),
1226 versions: VersionRange {
1227 start: 3,
1228 end: 32767
1229 },
1230 map_key: Some(true),
1231 nullable: None,
1232 tag: None,
1233 tagged: None,
1234 entity_type: None,
1235 default: None,
1236 fields: None
1237 },
1238 Field {
1239 name: "MinVersion".into(),
1240 kind: Kind("int16".into()),
1241 about: Some("The minimum supported version for the feature.".into()),
1242 versions: VersionRange {
1243 start: 3,
1244 end: 32767
1245 },
1246 map_key: None,
1247 nullable: None,
1248 tag: None,
1249 tagged: None,
1250 entity_type: None,
1251 default: None,
1252 fields: None
1253 },
1254 Field {
1255 name: "MaxVersion".into(),
1256 kind: Kind("int16".into()),
1257 about: Some("The maximum supported version for the feature.".into()),
1258 versions: VersionRange {
1259 start: 3,
1260 end: 32767
1261 },
1262 map_key: None,
1263 nullable: None,
1264 tag: None,
1265 tagged: None,
1266 entity_type: None,
1267 default: None,
1268 fields: None
1269 }
1270 ]
1271 .into()
1272 )
1273 },
1274 serde_json::from_str::<Value>(
1275 r#"
1276 { "name": "SupportedFeatures",
1277 "type": "[]SupportedFeatureKey",
1278 "ignorable": true,
1279 "versions": "3+",
1280 "tag": 0,
1281 "taggedVersions": "3+",
1282 "about": "Features supported by the broker.",
1283 "fields": [
1284 { "name": "Name",
1285 "type": "string",
1286 "versions": "3+",
1287 "mapKey": true,
1288 "about": "The name of the feature." },
1289 { "name": "MinVersion",
1290 "type": "int16",
1291 "versions": "3+",
1292 "about": "The minimum supported version for the feature." },
1293 { "name": "MaxVersion",
1294 "type": "int16",
1295 "versions": "3+",
1296 "about": "The maximum supported version for the feature." }
1297 ]
1298 }
1299 "#
1300 )
1301 .map_err(Into::into)
1302 .and_then(|v| Field::try_from(&Wv::from(&v)))?
1303 );
1304
1305 Ok(())
1306 }
1307
1308 #[test]
1309 fn untagged_message() -> Result<()> {
1310 let m = serde_json::from_str::<Value>(
1311 r#"
1312 {
1313 "apiKey": 25,
1314 "type": "request",
1315 "listeners": ["zkBroker", "broker"],
1316 "name": "AddOffsetsToTxnRequest",
1317 "validVersions": "0-3",
1318 "flexibleVersions": "3+",
1319 "fields": [
1320 { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
1321 "about": "The transactional id corresponding to the transaction."},
1322 { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
1323 "about": "Current producer id in use by the transactional id." },
1324 { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
1325 "about": "Current epoch associated with the producer id." },
1326 { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
1327 "about": "The unique group identifier." }
1328 ]
1329 }
1330 "#,
1331 ).map_err(Into::into)
1332 .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1333
1334 assert_eq!(MessageKind::Request, m.kind());
1335
1336 assert!(!m.has_tags());
1337
1338 Ok(())
1339 }
1340
1341 #[test]
1342 fn tagged_message() -> Result<()> {
1343 let m = Message::try_from(&Wv::from(&json!(
1344 {
1345 "apiKey": 63,
1346 "type": "request",
1347 "listeners": ["controller"],
1348 "name": "BrokerHeartbeatRequest",
1349 "validVersions": "0-1",
1350 "flexibleVersions": "0+",
1351 "fields": [
1352 { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
1353 "about": "The broker ID." },
1354 { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
1355 "about": "The broker epoch." },
1356 { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
1357 "about": "The highest metadata offset which the broker has reached." },
1358 { "name": "WantFence", "type": "bool", "versions": "0+",
1359 "about": "True if the broker wants to be fenced, false otherwise." },
1360 { "name": "WantShutDown", "type": "bool", "versions": "0+",
1361 "about": "True if the broker wants to be shut down, false otherwise." },
1362 { "name": "OfflineLogDirs", "type": "[]uuid", "versions": "1+", "taggedVersions": "1+", "tag": "0",
1363 "about": "Log directories that failed and went offline." }
1364 ]
1365 }
1366 )))?;
1367
1368 assert_eq!(MessageKind::Request, m.kind());
1369 assert!(m.has_tags());
1370
1371 Ok(())
1372 }
1373
1374 #[allow(clippy::too_many_lines)]
1375 #[test]
1376 fn message_from_value() -> Result<()> {
1377 assert_eq!(
1378 Message {
1379 api_key: 19,
1380 kind: MessageKind::Request,
1381 listeners: Some(vec![
1382 Listener::ZkBroker,
1383 Listener::Broker,
1384 Listener::Controller
1385 ]),
1386 name: String::from("CreateTopicsRequest"),
1387 versions: Version {
1388 valid: VersionRange::from_str("0-7")?,
1389 deprecated: Some(VersionRange::from_str("0-1")?),
1390 flexible: VersionRange::from_str("5+")?,
1391 },
1392 common_structs: Some(vec![CommonStruct {
1393 name: String::from("AddPartitionsToTxnTopic"),
1394 fields: vec![
1395 Field {
1396 name: String::from("Name"),
1397 kind: Kind(String::from("string")),
1398 versions: VersionRange::from_str("0+")?,
1399 map_key: Some(true),
1400 nullable: None,
1401 tag: None,
1402 tagged: None,
1403 default: None,
1404 fields: None,
1405 entity_type: Some(String::from("topicName")),
1406 about: Some(String::from("The name of the topic.")),
1407 },
1408 Field {
1409 name: String::from("Partitions"),
1410 kind: Kind(String::from("[]int32")),
1411 versions: VersionRange::from_str("0+")?,
1412 about: Some(String::from(
1413 "The partition indexes to add to the transaction"
1414 )),
1415 map_key: None,
1416 nullable: None,
1417 tag: None,
1418 tagged: None,
1419 default: None,
1420 fields: None,
1421 entity_type: None,
1422 }
1423 ],
1424 }]),
1425 fields: vec![Field {
1426 name: String::from("Topics"),
1427 kind: Kind(String::from("[]CreatableTopic")),
1428 about: Some(String::from("The topics to create.")),
1429 versions: VersionRange::from_str("0+")?,
1430 map_key: None,
1431 nullable: None,
1432 tag: None,
1433 tagged: None,
1434 entity_type: None,
1435 default: None,
1436 fields: Some(vec![Field {
1437 name: String::from("Name"),
1438 kind: Kind(String::from("string")),
1439 versions: VersionRange::from_str("0+")?,
1440 map_key: Some(true),
1441 entity_type: Some(String::from("topicName")),
1442 about: Some(String::from("The topic name.")),
1443 default: None,
1444 nullable: None,
1445 tag: None,
1446 tagged: None,
1447 fields: None,
1448 }])
1449 }],
1450 },
1451 serde_json::from_str::<Value>(
1452 r#"
1453 {
1454 "apiKey": 19,
1455 "type": "request",
1456 "listeners": ["zkBroker", "broker", "controller"],
1457 "name": "CreateTopicsRequest",
1458 "validVersions": "0-7",
1459 "deprecatedVersions": "0-1",
1460 "flexibleVersions": "5+",
1461 "fields": [
1462 {"name": "Topics",
1463 "type": "[]CreatableTopic",
1464 "versions": "0+",
1465 "about": "The topics to create.",
1466 "fields": [
1467 { "name": "Name",
1468 "type": "string",
1469 "versions": "0+",
1470 "mapKey": true,
1471 "entityType": "topicName",
1472 "about": "The topic name."
1473 }]}],
1474 "commonStructs": [
1475 { "name": "AddPartitionsToTxnTopic",
1476 "versions": "0+",
1477 "fields": [
1478 { "name": "Name",
1479 "type": "string",
1480 "versions": "0+",
1481 "mapKey": true,
1482 "entityType": "topicName",
1483 "about": "The name of the topic."
1484 },
1485 { "name": "Partitions",
1486 "type": "[]int32",
1487 "versions": "0+",
1488 "about": "The partition indexes to add to the transaction"
1489 }]}]
1490 }
1491 "#
1492 )
1493 .map_err(Into::into)
1494 .and_then(|v| Message::try_from(&Wv::from(&v)))?
1495 );
1496
1497 Ok(())
1498 }
1499
1500 #[allow(clippy::too_many_lines)]
1501 #[test]
1502 fn header_from_value() -> Result<()> {
1503 let v = serde_json::from_str::<Value>(
1504 r#"
1505 {
1506 "type": "header",
1507 "name": "RequestHeader",
1508 "validVersions": "0-2",
1509 "flexibleVersions": "2+",
1510 "fields": [
1511 { "name": "RequestApiKey", "type": "int16", "versions": "0+",
1512 "about": "The API key of this request." },
1513 { "name": "RequestApiVersion", "type": "int16", "versions": "0+",
1514 "about": "The API version of this request." },
1515 { "name": "CorrelationId", "type": "int32", "versions": "0+",
1516 "about": "The correlation ID of this request." },
1517
1518 { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
1519 "flexibleVersions": "none", "about": "The client ID string." }
1520 ]
1521 }
1522 "#,
1523 )?;
1524
1525 let wv = Wv::from(&v);
1526
1527 assert_eq!(
1528 Header {
1529 name: String::from("RequestHeader"),
1530 valid: VersionRange { start: 0, end: 2 },
1531 flexible: VersionRange {
1532 start: 2,
1533 end: i16::MAX
1534 },
1535 fields: vec![
1536 Field {
1537 name: String::from("RequestApiKey"),
1538 kind: Kind(String::from("int16")),
1539 about: Some(String::from("The API key of this request.")),
1540 versions: VersionRange {
1541 start: 0,
1542 end: 32767
1543 },
1544 map_key: None,
1545 nullable: None,
1546 tag: None,
1547 tagged: None,
1548 entity_type: None,
1549 default: None,
1550 fields: None
1551 },
1552 Field {
1553 name: String::from("RequestApiVersion"),
1554 kind: Kind(String::from("int16")),
1555 about: Some(String::from("The API version of this request.")),
1556 versions: VersionRange {
1557 start: 0,
1558 end: 32767
1559 },
1560 map_key: None,
1561 nullable: None,
1562 tag: None,
1563 tagged: None,
1564 entity_type: None,
1565 default: None,
1566 fields: None
1567 },
1568 Field {
1569 name: String::from("CorrelationId"),
1570 kind: Kind(String::from("int32")),
1571 about: Some(String::from("The correlation ID of this request.")),
1572 versions: VersionRange {
1573 start: 0,
1574 end: 32767
1575 },
1576 map_key: None,
1577 nullable: None,
1578 tag: None,
1579 tagged: None,
1580 entity_type: None,
1581 default: None,
1582 fields: None
1583 },
1584 Field {
1585 name: String::from("ClientId"),
1586 kind: Kind(String::from("string")),
1587 about: Some(String::from("The client ID string.")),
1588 versions: VersionRange {
1589 start: 1,
1590 end: 32767
1591 },
1592 map_key: None,
1593 nullable: Some(VersionRange {
1594 start: 1,
1595 end: 32767
1596 }),
1597 tag: None,
1598 tagged: None,
1599 entity_type: None,
1600 default: None,
1601 fields: None
1602 }
1603 ],
1604 },
1605 Header::try_from(&wv)?
1606 );
1607 Ok(())
1608 }
1609
1610 #[test]
1611 fn parse_expression() -> Result<()> {
1612 let Expr::Array(expression) =
1613 syn::parse_str::<Expr>(r#"[(one, "a/b/c"), (abc, 123), (pqr, a::b::c)]"#)?
1614 else {
1615 return Err(Error::Message(String::from("expecting an array")));
1616 };
1617
1618 let mut mappings = HashMap::new();
1619
1620 for expression in expression.elems {
1621 let Expr::Tuple(tuple) = expression else {
1622 return Err(Error::Message(String::from("expecting a tuple")));
1623 };
1624
1625 assert_eq!(2, tuple.elems.len());
1626
1627 println!(
1628 "i: {}, ty: {}",
1629 tuple.to_token_stream(),
1630 type_name_of_val(&tuple)
1631 );
1632
1633 let Expr::Path(ref lhs) = tuple.elems[0] else {
1634 return Err(Error::Message(String::from(
1635 "lhs expecting a path expression",
1636 )));
1637 };
1638
1639 let Some(lhs) = lhs.path.get_ident() else {
1640 return Err(Error::Message(String::from(
1641 "lhs expecting a path ident expression",
1642 )));
1643 };
1644
1645 _ = mappings.insert(lhs.clone(), tuple.elems[1].clone());
1646
1647 println!("lhs: {}", lhs.to_token_stream());
1648 println!("rhs: {}", tuple.elems[1].to_token_stream());
1649 }
1650
1651 let one = syn::parse_str::<Ident>("one")?;
1652 assert!(mappings.contains_key(&one));
1653
1654 Ok(())
1655 }
1656
1657 #[test]
1658 fn find_coordinator_request() -> Result<()> {
1659 let m = serde_json::from_str::<Value>(
1660 r#"
1661 {
1662 "apiKey": 10,
1663 "type": "request",
1664 "listeners": ["zkBroker", "broker"],
1665 "name": "FindCoordinatorRequest",
1666 "validVersions": "0-4",
1667 "deprecatedVersions": "0",
1668 "flexibleVersions": "3+",
1669 "fields": [
1670 { "name": "Key", "type": "string", "versions": "0-3",
1671 "about": "The coordinator key." },
1672 { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
1673 "about": "The coordinator key type. (Group, transaction, etc.)" },
1674 { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
1675 "about": "The coordinator keys." }
1676 ]
1677 }
1678 "#,
1679 )
1680 .map_err(Into::into)
1681 .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1682
1683 assert!(!m.has_records());
1684 assert_eq!(MessageKind::Request, m.kind());
1685 assert_eq!("FindCoordinatorRequest", m.name());
1686 assert_eq!(
1687 Version {
1688 valid: VersionRange { start: 0, end: 4 },
1689 deprecated: Some(VersionRange { start: 0, end: 0 }),
1690 flexible: VersionRange {
1691 start: 3,
1692 end: i16::MAX
1693 },
1694 },
1695 m.version()
1696 );
1697
1698 assert_eq!("Key", m.fields()[0].name());
1699 assert_eq!(Kind::new("string"), m.fields()[0].kind);
1700 assert_eq!(VersionRange { start: 0, end: 3 }, m.fields()[0].versions());
1701 assert_eq!(Some("The coordinator key."), m.fields()[0].about());
1702
1703 Ok(())
1704 }
1705
1706 #[test]
1707 fn fetch_response() -> Result<()> {
1708 let m = serde_json::from_str::<Value>(
1709 r#"
1710 {
1711 "apiKey": 1,
1712 "type": "response",
1713 "name": "FetchResponse",
1714 "validVersions": "0-16",
1715 "flexibleVersions": "12+",
1716 "fields": [
1717 { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0,
1718 "about": "Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [
1719 { "name": "NodeId", "type": "int32", "versions": "16+",
1720 "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
1721 { "name": "Host", "type": "string", "versions": "16+", "about": "The node's hostname." },
1722 { "name": "Port", "type": "int32", "versions": "16+", "about": "The node's port." },
1723 { "name": "Rack", "type": "string", "versions": "16+", "nullableVersions": "16+", "default": "null",
1724 "about": "The rack of the node, or null if it has not been assigned to a rack." }
1725 ]}
1726 ]
1727 }
1728 "#,
1729 )
1730 .map_err(Into::into)
1731 .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1732
1733 assert_eq!(MessageKind::Response, m.kind());
1734 assert_eq!("FetchResponse", m.name());
1735 assert_eq!(
1736 Version {
1737 valid: VersionRange { start: 0, end: 16 },
1738 deprecated: None,
1739 flexible: VersionRange {
1740 start: 12,
1741 end: i16::MAX
1742 },
1743 },
1744 m.version()
1745 );
1746
1747 assert_eq!("NodeEndpoints", m.fields()[0].name());
1748 assert_eq!(Kind::new("[]NodeEndpoint"), m.fields()[0].kind);
1749 assert_eq!(
1750 VersionRange {
1751 start: 16,
1752 end: i16::MAX
1753 },
1754 m.fields()[0].versions()
1755 );
1756
1757 let node_id = &m.fields()[0].fields().unwrap()[0];
1758
1759 assert_eq!("NodeId", node_id.name());
1760 assert_eq!(Kind::new("int32"), node_id.kind);
1761 assert_eq!(
1762 VersionRange {
1763 start: 16,
1764 end: i16::MAX
1765 },
1766 node_id.versions()
1767 );
1768
1769 Ok(())
1770 }
1771}