1pub mod error;
21pub mod wv;
22
23use convert_case::{Case, Casing};
24pub use error::Error;
25use lazy_static::lazy_static;
26use proc_macro2::{Ident, Span, TokenStream};
27use quote::ToTokens;
28use regex::Regex;
29use serde_json::Value;
30use std::{collections::BTreeMap, fmt, str::FromStr};
31use syn::{Expr, Type};
32use tracing::debug;
33use wv::{As, AsOption, Wv};
34
35macro_rules! prefix_crate {
36 ($e:ident) => {
37 format!("{}::{}", env!("CARGO_CRATE_NAME"), stringify!($e))
38 };
39}
40
41macro_rules! with_crate {
42 ($e:expr_2021) => {
43 format!("{}::{:?}", env!("CARGO_CRATE_NAME"), $e)
44 };
45
46 ($e:expr_2021, $f:expr_2021) => {
47 format!("{}::{}::{:?}", env!("CARGO_CRATE_NAME"), $e, $f)
48 };
49}
50
51pub type Result<T, E = Error> = std::result::Result<T, E>;
52
53fn type_mapping(kind: &str) -> String {
54 match kind {
55 "bytes" => String::from("bytes::Bytes"),
56 "float64" => String::from("f64"),
57 "int16" => String::from("i16"),
58 "int32" => String::from("i32"),
59 "int64" => String::from("i64"),
60 "int8" => String::from("i8"),
61 "string" => String::from("String"),
62 "uint16" => String::from("u16"),
63 "uuid" => String::from("[u8; 16]"),
64 "records" => String::from("crate::RecordBatch"),
65
66 sequence if sequence.starts_with("[]") => type_mapping(&sequence[2..]),
67
68 s => String::from(s),
69 }
70}
71
72#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
73pub struct Kind(String);
75
76impl ToTokens for Kind {
77 fn to_tokens(&self, tokens: &mut TokenStream) {
78 let expr = with_crate!(self);
79 syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
80 }
81}
82
83impl From<Type> for Kind {
84 fn from(value: Type) -> Self {
85 Self(value.to_token_stream().to_string())
86 }
87}
88
89lazy_static! {
90 static ref PRIMITIVE: &'static [&'static str] = &[
91 "bool", "bytes", "float64", "int16", "int32", "int64", "int8", "records", "string",
92 "uint16", "uuid",
93 ];
94}
95
96impl Kind {
97 #[must_use]
98 pub fn new(kind: &str) -> Self {
99 Self(kind.to_owned())
100 }
101
102 #[must_use]
103 pub fn name(&self) -> &str {
104 &self.0[..]
105 }
106
107 #[must_use]
108 #[allow(clippy::missing_panics_doc)]
109 pub fn type_name(&self) -> Type {
111 syn::parse_str::<Type>(&type_mapping(&self.0))
112 .unwrap_or_else(|_| panic!("not a type: {self:?}"))
113 }
114
115 #[must_use]
116 pub fn is_sequence(&self) -> bool {
118 self.0.starts_with("[]")
119 }
120
121 #[must_use]
122 pub fn is_primitive(&self) -> bool {
124 PRIMITIVE.contains(&self.0.as_str())
125 }
126
127 #[must_use]
128 pub fn is_float(&self) -> bool {
130 self.0.eq("float64")
131 }
132
133 #[must_use]
134 pub fn is_sequence_of_primitive(&self) -> bool {
136 self.is_sequence() && PRIMITIVE.contains(&&self.0[2..])
137 }
138
139 #[must_use]
140 pub fn kind_of_sequence(&self) -> Option<Self> {
142 if self.is_sequence() {
143 Some(Self::new(&self.0[2..]))
144 } else {
145 None
146 }
147 }
148}
149
150impl FromStr for Kind {
151 type Err = Error;
152
153 fn from_str(s: &str) -> Result<Self, Self::Err> {
154 Ok(Self(s.to_owned()))
155 }
156}
157
158impl TryFrom<&Value> for Kind {
159 type Error = Error;
160
161 fn try_from(value: &Value) -> Result<Self, Self::Error> {
162 as_str(value, "type").and_then(Self::from_str)
163 }
164}
165
166#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
167pub enum Listener {
169 ZkBroker,
170 #[default]
171 Broker,
172 Controller,
173}
174
175impl TryFrom<&Value> for Listener {
176 type Error = Error;
177
178 fn try_from(value: &Value) -> Result<Self, Self::Error> {
179 value
180 .as_str()
181 .ok_or(Error::Message(String::from(
182 "expecting string for listener",
183 )))
184 .and_then(Self::from_str)
185 }
186}
187
188impl FromStr for Listener {
189 type Err = Error;
190
191 fn from_str(s: &str) -> Result<Self, Self::Err> {
192 match s {
193 "zkBroker" => Ok(Self::ZkBroker),
194 "broker" => Ok(Self::Broker),
195 "controller" => Ok(Self::Controller),
196 s => Err(Error::Message(String::from(s))),
197 }
198 }
199}
200
201#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
202pub enum MessageKind {
204 #[default]
205 Request,
206 Response,
207}
208
209impl ToTokens for MessageKind {
210 fn to_tokens(&self, tokens: &mut TokenStream) {
211 let expr = with_crate!("MessageKind", self);
212 syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
213 }
214}
215
216impl TryFrom<&Value> for MessageKind {
217 type Error = Error;
218
219 fn try_from(value: &Value) -> Result<Self, Self::Error> {
220 as_str(value, "type").and_then(Self::from_str)
221 }
222}
223
224impl FromStr for MessageKind {
225 type Err = Error;
226
227 fn from_str(s: &str) -> Result<Self, Self::Err> {
228 match s {
229 "request" => Ok(Self::Request),
230 "response" => Ok(Self::Response),
231 s => Err(Error::Message(String::from(s))),
232 }
233 }
234}
235
236#[derive(Clone, Copy, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
237pub struct VersionRange {
239 pub start: i16,
240 pub end: i16,
241}
242
243impl fmt::Debug for VersionRange {
244 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245 f.debug_struct(&prefix_crate!(VersionRange))
246 .field("start", &self.start)
247 .field("end", &self.end)
248 .finish()
249 }
250}
251
252impl VersionRange {
253 #[must_use]
254 pub fn within(&self, version: i16) -> bool {
255 version >= self.start && version <= self.end
256 }
257
258 #[must_use]
259 pub fn is_mandatory(&self, parent: Option<VersionRange>) -> bool {
260 parent.map_or(
261 self.start == 0 && self.end == i16::MAX,
262 |VersionRange { start, end }| {
263 if start > self.start {
264 self.end == end
265 } else {
266 self.start == start && self.end == end
267 }
268 },
269 )
270 }
271}
272
273impl ToTokens for VersionRange {
274 fn to_tokens(&self, tokens: &mut TokenStream) {
275 let expr = format!("{self:?}");
276 syn::parse_str::<Expr>(&expr)
277 .unwrap_or_else(|_| panic!("an expression: {self:?}"))
278 .to_tokens(tokens);
279 }
280}
281
282impl FromStr for VersionRange {
283 type Err = Error;
284
285 fn from_str(s: &str) -> Result<Self, Self::Err> {
286 lazy_static! {
287 static ref RX: Regex =
288 Regex::new(r"^(?<start>\d+)(-(?<end>\d+)|(?<infinite>\+))?$").expect("regex");
289 }
290
291 if s == "none" {
292 Ok(VersionRange { start: 0, end: 0 })
293 } else {
294 RX.captures(s)
295 .ok_or(Error::Message(format!("invalid version range format: {s}")))
296 .and_then(|captures| {
297 let parse = |name, default: i16| {
298 captures
299 .name(name)
300 .map_or(Ok(&default.to_string()[..]), |s| Ok(s.as_str()))
301 .and_then(str::parse)
302 };
303
304 let start = parse("start", 0)?;
305
306 let end = if let Some(end) = captures.name("end") {
307 str::parse(end.as_str())?
308 } else if captures.name("infinite").is_some() {
309 i16::MAX
310 } else {
311 start
312 };
313
314 Ok(VersionRange { start, end })
315 })
316 }
317 }
318}
319
320#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
321pub struct Version {
323 pub valid: VersionRange,
325 pub deprecated: Option<VersionRange>,
327 pub flexible: VersionRange,
329}
330
331impl ToTokens for Version {
332 fn to_tokens(&self, tokens: &mut TokenStream) {
333 let expr = with_crate!(self);
334 syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
335 }
336}
337
338impl Version {
339 #[must_use]
340 pub fn valid(&self) -> VersionRange {
341 self.valid
342 }
343
344 #[must_use]
345 pub fn deprecated(&self) -> Option<VersionRange> {
346 self.deprecated
347 }
348
349 #[must_use]
350 pub fn flexible(&self) -> VersionRange {
351 self.flexible
352 }
353}
354
355impl<'a> TryFrom<&Wv<'a>> for Version {
356 type Error = Error;
357
358 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
359 debug!("value: {:?}", value);
360
361 Ok(Self {
362 valid: value.as_a("validVersions")?,
363 deprecated: value.as_option("deprecatedVersions")?,
364 flexible: value.as_a("flexibleVersions")?,
365 })
366 }
367}
368
369#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
370pub struct Field {
372 name: String,
374 kind: Kind,
376 about: Option<String>,
378 versions: VersionRange,
380 map_key: Option<bool>,
382 nullable: Option<VersionRange>,
384 tag: Option<u32>,
386 tagged: Option<VersionRange>,
388 entity_type: Option<String>,
390 default: Option<String>,
392 fields: Option<Vec<Field>>,
394}
395
396impl Field {
397 #[must_use]
398 pub fn ident(&self) -> Ident {
399 match self.name.to_case(Case::Snake) {
400 reserved if is_reserved_keyword(&reserved) => {
401 Ident::new_raw(&reserved, Span::call_site())
402 }
403
404 otherwise => Ident::new(&otherwise, Span::call_site()),
405 }
406 }
407
408 #[must_use]
409 pub fn name(&self) -> &str {
410 &self.name
411 }
412
413 #[must_use]
414 pub fn kind(&self) -> &Kind {
415 &self.kind
416 }
417
418 #[must_use]
419 pub fn fields(&self) -> Option<&[Field]> {
420 self.fields.as_deref()
421 }
422
423 #[must_use]
424 pub fn versions(&self) -> VersionRange {
425 self.versions
426 }
427
428 #[must_use]
429 pub fn nullable(&self) -> Option<VersionRange> {
430 self.nullable
431 }
432
433 #[must_use]
434 pub fn tagged(&self) -> Option<VersionRange> {
435 self.tagged
436 }
437
438 #[must_use]
439 pub fn tag(&self) -> Option<u32> {
440 self.tag
441 }
442
443 #[must_use]
444 pub fn about(&self) -> Option<&str> {
445 self.about.as_deref()
446 }
447
448 #[must_use]
449 pub fn has_tags(&self) -> bool {
450 self.tagged.is_some()
451 }
452
453 #[must_use]
454 pub fn has_records(&self) -> bool {
455 self.kind.0 == "records"
456 || self
457 .fields()
458 .is_some_and(|fields| fields.iter().any(Field::has_records))
459 }
460
461 #[must_use]
462 pub fn has_float(&self) -> bool {
463 self.kind().is_float()
464 || self
465 .fields()
466 .is_some_and(|fields| fields.iter().any(Field::has_float))
467 }
468}
469
470impl<'a> TryFrom<&Wv<'a>> for Field {
471 type Error = Error;
472
473 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
474 let name = value.as_a("name")?;
475 let kind = value.as_a("type")?;
476 let about = value.as_option("about")?;
477 let versions = value.as_a("versions")?;
478 let map_key = value.as_option("mapKey")?;
479 let nullable = value.as_option("nullableVersions")?;
480 let tag = value.as_option("tag")?;
481 let tagged = value.as_option("taggedVersions")?;
482 let entity_type = value.as_option("entityType")?;
483 let default = value.as_option("default")?;
484
485 let fields = value
486 .as_option("fields")?
487 .map_or(Ok(None), |values: &[Value]| {
488 values
489 .iter()
490 .try_fold(Vec::new(), |mut acc, field| {
491 Field::try_from(&Wv::from(field)).map(|f| {
492 acc.push(f);
493 acc
494 })
495 })
496 .map(Some)
497 })?;
498
499 Ok(Field {
500 name,
501 kind,
502 about,
503 versions,
504 map_key,
505 nullable,
506 tag,
507 tagged,
508 entity_type,
509 default,
510 fields,
511 })
512 }
513}
514
515fn is_reserved_keyword(s: &str) -> bool {
516 lazy_static! {
517 static ref RESERVED: Vec<&'static str> = vec!["as",
518 "break",
519 "const",
520 "continue",
521 "crate",
522 "else",
523 "enum",
524 "extern",
525 "false",
526 "fn",
527 "for",
528 "if",
529 "impl",
530 "in",
531 "let",
532 "loop",
533 "match",
534 "mod",
535 "move",
536 "mut",
537 "pub",
538 "ref",
539 "return",
540 "self",
541 "Self",
542 "static",
543 "struct",
544 "super",
545 "trait",
546 "true",
547 "type",
548 "unsafe",
549 "use",
550 "where",
551 "while",
552
553 "async",
555 "await",
556 "dyn",
557
558 "abstract",
560 "become",
561 "box",
562 "do",
563 "final",
564 "macro",
565 "override",
566 "priv",
567 "typeof",
568 "unsized",
569 "virtual",
570 "yield",
571
572 "try",
574 ];
575 }
576
577 RESERVED.contains(&s)
578}
579
580#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
581struct Header {
582 name: String,
583 valid: VersionRange,
584 flexible: VersionRange,
585 fields: Vec<Field>,
586}
587
588impl<'a> TryFrom<&Wv<'a>> for Header {
589 type Error = Error;
590
591 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
592 let fields: &[Value] = value.as_a("fields")?;
593
594 Ok(Header {
595 name: value.as_a("name")?,
596 valid: value.as_a("validVersions")?,
597 flexible: value.as_a("flexibleVersions")?,
598 fields: fields.iter().try_fold(Vec::new(), |mut acc, field| {
599 Field::try_from(&Wv::from(field)).map(|f| {
600 acc.push(f);
601 acc
602 })
603 })?,
604 })
605 }
606}
607
608#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
609pub struct Message {
610 api_key: i16,
611 kind: MessageKind,
612 listeners: Option<Vec<Listener>>,
613 name: String,
614 versions: Version,
615 fields: Vec<Field>,
616 common_structs: Option<Vec<CommonStruct>>,
617}
618
619impl Message {
620 #[must_use]
621 pub fn api_key(&self) -> i16 {
622 self.api_key
623 }
624
625 #[must_use]
626 pub fn kind(&self) -> MessageKind {
627 self.kind
628 }
629
630 #[must_use]
631 pub fn name(&self) -> &str {
632 &self.name
633 }
634
635 #[must_use]
636 #[allow(clippy::missing_panics_doc)]
637 pub fn type_name(&self) -> Type {
638 syn::parse_str::<Type>(&self.name).unwrap()
639 }
640
641 #[must_use]
642 pub fn version(&self) -> Version {
643 self.versions
644 }
645
646 #[must_use]
647 pub fn listeners(&self) -> Option<&[Listener]> {
648 self.listeners.as_deref()
649 }
650
651 #[must_use]
652 pub fn fields(&self) -> &[Field] {
653 &self.fields
654 }
655
656 #[must_use]
657 #[allow(clippy::missing_panics_doc)]
658 pub fn wrapper_new_type(&self, field: &Field) -> Type {
659 syn::parse_str::<Type>(&format!("{}{}", self.name, field.name).to_case(Case::Pascal))
660 .unwrap()
661 }
662
663 #[must_use]
664 pub fn common_structs(&self) -> Option<&[CommonStruct]> {
665 self.common_structs.as_deref()
666 }
667
668 #[must_use]
669 pub fn has_records(&self) -> bool {
670 self.fields().iter().any(Field::has_records)
671 }
672
673 #[must_use]
674 pub fn has_tags(&self) -> bool {
675 self.fields().iter().any(Field::has_tags)
676 }
677
678 #[must_use]
679 pub fn has_float(&self) -> bool {
680 self.fields.iter().any(|field| field.kind().is_float())
681 || self
682 .common_structs()
683 .is_some_and(|structures| structures.iter().any(CommonStruct::has_float))
684 }
685}
686
687impl<'a> TryFrom<&Wv<'a>> for Message {
688 type Error = Error;
689
690 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
691 let api_key = value.as_a("apiKey")?;
692 let name = value.as_a("name")?;
693 let kind = value.as_a("type")?;
694 let listeners = value
695 .as_option("listeners")?
696 .map_or(Ok(None), |maybe: &[Value]| {
697 maybe
698 .iter()
699 .try_fold(Vec::new(), |mut acc, listener| {
700 Listener::try_from(listener).map(|l| {
701 acc.push(l);
702 acc
703 })
704 })
705 .map(Some)
706 })?;
707
708 let fields = value.as_a("fields").and_then(|fields: &[Value]| {
709 fields.iter().try_fold(Vec::new(), |mut acc, field| {
710 Field::try_from(&Wv::from(field)).map(|f| {
711 acc.push(f);
712 acc
713 })
714 })
715 })?;
716
717 let versions = Version::try_from(value)?;
718 let common_structs =
719 value
720 .as_option("commonStructs")?
721 .map_or(Ok(None), |maybe: &[Value]| {
722 maybe
723 .iter()
724 .try_fold(Vec::new(), |mut acc, value| {
725 CommonStruct::try_from(&Wv::from(value)).map(|field| {
726 acc.push(field);
727 acc
728 })
729 })
730 .map(Some)
731 })?;
732
733 Ok(Self {
734 api_key,
735 kind,
736 listeners,
737 name,
738 versions,
739 fields,
740 common_structs,
741 })
742 }
743}
744
745#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
746pub struct CommonStruct {
747 name: String,
748 fields: Vec<Field>,
749}
750
751impl CommonStruct {
752 #[must_use]
753 pub fn name(&self) -> &str {
754 self.name.as_str()
755 }
756
757 #[must_use]
758 #[allow(clippy::missing_panics_doc)]
759 pub fn type_name(&self) -> Type {
760 syn::parse_str::<Type>(&self.name).unwrap_or_else(|_| panic!("not a type: {self:?}"))
761 }
762
763 #[must_use]
764 pub fn fields(&self) -> &Vec<Field> {
765 &self.fields
766 }
767
768 #[must_use]
769 pub fn has_float(&self) -> bool {
770 self.fields.iter().any(|field| field.kind().is_float())
771 }
772}
773
774impl<'a> TryFrom<&Wv<'a>> for CommonStruct {
775 type Error = Error;
776
777 fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
778 let name = value.as_a("name")?;
779 let fields = value.as_a("fields").and_then(|fields: &[Value]| {
780 fields.iter().try_fold(Vec::new(), |mut acc, field| {
781 Field::try_from(&Wv::from(field)).map(|f| {
782 acc.push(f);
783 acc
784 })
785 })
786 })?;
787
788 Ok(Self { name, fields })
789 }
790}
791
792#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
793pub struct HeaderMeta {
794 pub name: &'static str,
795 pub valid: VersionRange,
796 pub flexible: VersionRange,
797 pub fields: &'static [(&'static str, &'static FieldMeta)],
798}
799
800#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
801pub struct MessageMeta {
803 pub name: &'static str,
805 pub api_key: i16,
807 pub version: Version,
809 pub message_kind: MessageKind,
811 pub fields: &'static [(&'static str, &'static FieldMeta)],
813}
814
815impl MessageMeta {
816 #[must_use]
817 pub fn is_flexible(&self, version: i16) -> bool {
818 self.version.flexible.within(version)
819 }
820
821 #[must_use]
822 pub fn structures(&self) -> BTreeMap<&str, &FieldMeta> {
823 self.fields.iter().filter(|(_, fm)| fm.is_structure()).fold(
824 BTreeMap::new(),
825 |mut acc, (name, fm)| {
826 debug!(name = self.name, field = ?name, kind = ?fm.kind.0);
827
828 if let Some(kind) = fm.kind.kind_of_sequence() {
829 if !kind.is_primitive() {
830 _ = acc.insert(kind.name(), fm);
831 }
832 } else {
833 _ = acc.insert(fm.kind.name(), fm);
834 }
835
836 let mut children = fm.structures();
837 debug!(name = self.name, field = ?name, children = ?children.keys().collect::<Vec<_>>());
838 acc.append(&mut children);
839
840 acc
841 },
842 )
843 }
844
845 #[must_use]
846 pub fn field(&self, name: &str) -> Option<&'static FieldMeta> {
847 self.fields
848 .iter()
849 .find(|(found, _)| name == *found)
850 .map(|(_, meta)| *meta)
851 }
852}
853
854#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
855pub struct FieldMeta {
857 pub version: VersionRange,
859 pub nullable: Option<VersionRange>,
861 pub kind: KindMeta,
863 pub tag: Option<u32>,
865 pub tagged: Option<VersionRange>,
867 pub fields: &'static [(&'static str, &'static FieldMeta)],
869}
870
871impl FieldMeta {
872 #[must_use]
873 pub fn is_nullable(&self, version: i16) -> bool {
874 self.nullable.is_some_and(|range| range.within(version))
875 }
876
877 #[must_use]
878 pub fn is_mandatory(&self, parent: Option<VersionRange>) -> bool {
879 self.version.is_mandatory(parent)
880 }
881
882 #[must_use]
883 pub fn is_structure(&self) -> bool {
884 self.kind
885 .kind_of_sequence()
886 .is_some_and(|sk| !sk.is_primitive())
887 || !self.fields.is_empty()
888 }
889
890 #[must_use]
891 pub fn structures(&self) -> BTreeMap<&str, &FieldMeta> {
892 self.fields.iter().filter(|(_, fm)| fm.is_structure()).fold(
893 BTreeMap::new(),
894 |mut acc, (_name, fm)| {
895 if let Some(kind) = fm.kind.kind_of_sequence() {
896 if !kind.is_primitive() {
897 _ = acc.insert(kind.name(), fm);
898 }
899 }
900
901 let mut children = fm.structures();
902 acc.append(&mut children);
903 acc
904 },
905 )
906 }
907
908 pub fn field(&self, name: &str) -> Option<&FieldMeta> {
909 self.fields
910 .iter()
911 .find(|field| name == field.0)
912 .map(|(_, meta)| *meta)
913 }
914}
915
916#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
917pub struct KindMeta(pub &'static str);
918
919impl KindMeta {
920 #[must_use]
921 pub fn name(&self) -> &'static str {
922 self.0
923 }
924
925 #[must_use]
926 pub fn is_sequence(&self) -> bool {
927 self.0.starts_with("[]")
928 }
929
930 #[must_use]
931 pub fn is_primitive(&self) -> bool {
932 PRIMITIVE.contains(&self.0)
933 }
934
935 #[must_use]
936 pub fn is_string(&self) -> bool {
937 self.0 == "string"
938 }
939
940 #[must_use]
941 pub fn is_records(&self) -> bool {
942 self.0 == "records"
943 }
944
945 #[must_use]
946 pub fn kind_of_sequence(&self) -> Option<Self> {
947 if self.is_sequence() {
948 Some(Self(&self.0[2..]))
949 } else {
950 None
951 }
952 }
953}
954
955fn as_str<'v>(value: &'v Value, name: &str) -> Result<&'v str> {
956 value[name]
957 .as_str()
958 .ok_or(Error::Message(String::from(name)))
959}
960
961#[cfg(test)]
962mod tests {
963 use std::{any::type_name_of_val, collections::HashMap};
964
965 use serde_json::json;
966
967 use super::*;
968
969 const PRIMITIVES: [&str; 10] = [
970 "bool", "bytes", "float64", "int16", "int32", "int64", "int8", "string", "uint16", "uuid",
971 ];
972
973 #[test]
974 fn type_mapping_test() {
975 assert_eq!("bytes::Bytes", type_mapping("bytes"));
976 assert_eq!("f64", type_mapping("float64"));
977 assert_eq!("i16", type_mapping("int16"));
978 assert_eq!("i32", type_mapping("int32"));
979 assert_eq!("i64", type_mapping("int64"));
980 assert_eq!("i8", type_mapping("int8"));
981 assert_eq!("String", type_mapping("string"));
982 assert_eq!("u16", type_mapping("uint16"));
983 assert_eq!("[u8; 16]", type_mapping("uuid"));
984
985 assert_eq!("i16", type_mapping("[]int16"));
986 assert_eq!("SomeType", type_mapping("[]SomeType"));
987
988 assert_eq!("SomeType", type_mapping("SomeType"));
989 }
990
991 #[test]
992 fn kind_is_sequence() {
993 for primitive in PRIMITIVES {
994 let k = Kind::new(primitive);
995 assert!(!k.is_sequence());
996 }
997
998 let sequences = vec!["[]int16", "[]string", "[]SomeType"];
999
1000 for sequence in sequences {
1001 let k = Kind::new(sequence);
1002 assert!(k.is_sequence());
1003 }
1004 }
1005
1006 #[test]
1007 fn kind_is_sequence_of_primitive() {
1008 for primitive in PRIMITIVES {
1009 let k = Kind::new(primitive);
1010 assert!(!k.is_sequence_of_primitive());
1011 }
1012
1013 let primitive_sequences: Vec<String> = PRIMITIVES
1014 .iter()
1015 .map(|primitive| format!("[]{primitive}"))
1016 .collect();
1017
1018 for sequence in primitive_sequences {
1019 let k = Kind::new(sequence.as_str());
1020 assert!(k.is_sequence_of_primitive());
1021 }
1022
1023 let sequences = vec!["[]SomeType", "[]OtherType"];
1024
1025 for sequence in sequences {
1026 let k = Kind::new(sequence);
1027 assert!(!k.is_sequence_of_primitive());
1028 }
1029 }
1030
1031 #[test]
1032 fn kind_is_primitive() {
1033 for primitive in PRIMITIVES {
1034 let k = Kind::new(primitive);
1035 assert!(k.is_primitive());
1036 }
1037
1038 let primitive_sequences: Vec<String> = PRIMITIVES
1039 .iter()
1040 .map(|primitive| format!("[]{primitive}"))
1041 .collect();
1042
1043 for sequence in primitive_sequences {
1044 let k = Kind::new(sequence.as_str());
1045 assert!(!k.is_primitive());
1046 }
1047 }
1048
1049 #[test]
1050 fn listener_from_value() -> Result<()> {
1051 assert_eq!(Listener::ZkBroker, Listener::try_from(&json!("zkBroker"))?);
1052
1053 assert_eq!(Listener::Broker, Listener::try_from(&json!("broker"))?);
1054
1055 assert_eq!(
1056 Listener::Controller,
1057 Listener::try_from(&json!("controller"))?
1058 );
1059
1060 Ok(())
1061 }
1062
1063 #[test]
1064 fn message_kind_from_value() -> Result<()> {
1065 assert_eq!(
1066 MessageKind::Request,
1067 MessageKind::try_from(&json!({
1068 "type": "request"
1069 }
1070 ))?
1071 );
1072
1073 assert_eq!(
1074 MessageKind::Response,
1075 MessageKind::try_from(&json!({
1076 "type": "response"
1077 }
1078 ))?
1079 );
1080
1081 Ok(())
1082 }
1083
1084 #[test]
1085 fn version_range_from_str() -> Result<()> {
1086 assert_eq!(
1087 VersionRange { start: 0, end: 0 },
1088 VersionRange::from_str("none")?
1089 );
1090
1091 assert_eq!(
1092 VersionRange {
1093 start: 3,
1094 end: i16::MAX
1095 },
1096 VersionRange::from_str("3+")?
1097 );
1098
1099 assert_eq!(
1100 VersionRange { start: 6, end: 9 },
1101 VersionRange::from_str("6-9")?
1102 );
1103
1104 assert_eq!(
1105 VersionRange { start: 1, end: 1 },
1106 VersionRange::from_str("1")?
1107 );
1108
1109 Ok(())
1110 }
1111
1112 #[test]
1113 fn version_range_within() {
1114 {
1115 let range = VersionRange { start: 0, end: 0 };
1116 assert!(!range.within(i16::MIN));
1117 assert!(range.within(0));
1118 assert!(!range.within(1));
1119 assert!(!range.within(i16::MAX));
1120 }
1121
1122 {
1123 let range = VersionRange {
1124 start: 3,
1125 end: i16::MAX,
1126 };
1127 assert!(!range.within(i16::MIN));
1128 assert!(!range.within(2));
1129 assert!(range.within(3));
1130 assert!(range.within(i16::MAX));
1131 }
1132
1133 {
1134 let range = VersionRange { start: 6, end: 9 };
1135 assert!(!range.within(i16::MIN));
1136 assert!(!range.within(5));
1137 assert!(range.within(6));
1138 assert!(range.within(7));
1139 assert!(range.within(8));
1140 assert!(range.within(9));
1141 assert!(!range.within(10));
1142 assert!(!range.within(i16::MAX));
1143 }
1144 }
1145
1146 #[test]
1147 fn field_from_value() -> Result<()> {
1148 assert_eq!(
1149 Field {
1150 name: String::from("Topics"),
1151 kind: Kind(String::from("[]CreatableTopic")),
1152 about: Some(String::from("The topics to create.")),
1153 versions: VersionRange::from_str("0+")?,
1154 map_key: None,
1155 nullable: None,
1156 tag: None,
1157 tagged: None,
1158 entity_type: None,
1159 default: None,
1160 fields: Some(vec![Field {
1161 name: String::from("Name"),
1162 kind: Kind(String::from("string")),
1163 versions: VersionRange::from_str("0+")?,
1164 map_key: Some(true),
1165 entity_type: Some(String::from("topicName")),
1166 about: Some(String::from("The topic name.")),
1167 default: None,
1168 nullable: None,
1169 tag: None,
1170 tagged: None,
1171 fields: None,
1172 }]),
1173 },
1174 serde_json::from_str::<Value>(
1175 r#"
1176 {
1177 "name": "Topics",
1178 "type": "[]CreatableTopic",
1179 "versions": "0+",
1180 "about": "The topics to create.",
1181 "fields": [
1182 { "name": "Name",
1183 "type": "string",
1184 "versions": "0+",
1185 "mapKey": true,
1186 "entityType": "topicName",
1187 "about": "The topic name."
1188 }]
1189 }
1190 "#
1191 )
1192 .map_err(Into::into)
1193 .and_then(|v| Field::try_from(&Wv::from(&v)))?
1194 );
1195
1196 Ok(())
1197 }
1198
1199 #[allow(clippy::too_many_lines)]
1200 #[test]
1201 fn tagged_field_from_value() -> Result<()> {
1202 assert_eq!(
1203 Field {
1204 name: "SupportedFeatures".into(),
1205 kind: Kind("[]SupportedFeatureKey".into()),
1206 about: Some("Features supported by the broker.".into()),
1207 versions: VersionRange {
1208 start: 3,
1209 end: 32767
1210 },
1211 map_key: None,
1212 nullable: None,
1213 tag: Some(0),
1214 tagged: Some(VersionRange {
1215 start: 3,
1216 end: 32767
1217 }),
1218 entity_type: None,
1219 default: None,
1220 fields: Some(
1221 [
1222 Field {
1223 name: "Name".into(),
1224 kind: Kind("string".into()),
1225 about: Some("The name of the feature.".into()),
1226 versions: VersionRange {
1227 start: 3,
1228 end: 32767
1229 },
1230 map_key: Some(true),
1231 nullable: None,
1232 tag: None,
1233 tagged: None,
1234 entity_type: None,
1235 default: None,
1236 fields: None
1237 },
1238 Field {
1239 name: "MinVersion".into(),
1240 kind: Kind("int16".into()),
1241 about: Some("The minimum supported version for the feature.".into()),
1242 versions: VersionRange {
1243 start: 3,
1244 end: 32767
1245 },
1246 map_key: None,
1247 nullable: None,
1248 tag: None,
1249 tagged: None,
1250 entity_type: None,
1251 default: None,
1252 fields: None
1253 },
1254 Field {
1255 name: "MaxVersion".into(),
1256 kind: Kind("int16".into()),
1257 about: Some("The maximum supported version for the feature.".into()),
1258 versions: VersionRange {
1259 start: 3,
1260 end: 32767
1261 },
1262 map_key: None,
1263 nullable: None,
1264 tag: None,
1265 tagged: None,
1266 entity_type: None,
1267 default: None,
1268 fields: None
1269 }
1270 ]
1271 .into()
1272 )
1273 },
1274 serde_json::from_str::<Value>(
1275 r#"
1276 { "name": "SupportedFeatures",
1277 "type": "[]SupportedFeatureKey",
1278 "ignorable": true,
1279 "versions": "3+",
1280 "tag": 0,
1281 "taggedVersions": "3+",
1282 "about": "Features supported by the broker.",
1283 "fields": [
1284 { "name": "Name",
1285 "type": "string",
1286 "versions": "3+",
1287 "mapKey": true,
1288 "about": "The name of the feature." },
1289 { "name": "MinVersion",
1290 "type": "int16",
1291 "versions": "3+",
1292 "about": "The minimum supported version for the feature." },
1293 { "name": "MaxVersion",
1294 "type": "int16",
1295 "versions": "3+",
1296 "about": "The maximum supported version for the feature." }
1297 ]
1298 }
1299 "#
1300 )
1301 .map_err(Into::into)
1302 .and_then(|v| Field::try_from(&Wv::from(&v)))?
1303 );
1304
1305 Ok(())
1306 }
1307
1308 #[test]
1309 fn untagged_message() -> Result<()> {
1310 let m = serde_json::from_str::<Value>(
1311 r#"
1312 {
1313 "apiKey": 25,
1314 "type": "request",
1315 "listeners": ["zkBroker", "broker"],
1316 "name": "AddOffsetsToTxnRequest",
1317 "validVersions": "0-3",
1318 "flexibleVersions": "3+",
1319 "fields": [
1320 { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
1321 "about": "The transactional id corresponding to the transaction."},
1322 { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
1323 "about": "Current producer id in use by the transactional id." },
1324 { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
1325 "about": "Current epoch associated with the producer id." },
1326 { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
1327 "about": "The unique group identifier." }
1328 ]
1329 }
1330 "#,
1331 ).map_err(Into::into)
1332 .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1333
1334 assert_eq!(MessageKind::Request, m.kind());
1335
1336 assert!(!m.has_tags());
1337
1338 Ok(())
1339 }
1340
1341 #[test]
1342 fn tagged_message() -> Result<()> {
1343 let m = Message::try_from(&Wv::from(&json!(
1344 {
1345 "apiKey": 63,
1346 "type": "request",
1347 "listeners": ["controller"],
1348 "name": "BrokerHeartbeatRequest",
1349 "validVersions": "0-1",
1350 "flexibleVersions": "0+",
1351 "fields": [
1352 { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
1353 "about": "The broker ID." },
1354 { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
1355 "about": "The broker epoch." },
1356 { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
1357 "about": "The highest metadata offset which the broker has reached." },
1358 { "name": "WantFence", "type": "bool", "versions": "0+",
1359 "about": "True if the broker wants to be fenced, false otherwise." },
1360 { "name": "WantShutDown", "type": "bool", "versions": "0+",
1361 "about": "True if the broker wants to be shut down, false otherwise." },
1362 { "name": "OfflineLogDirs", "type": "[]uuid", "versions": "1+", "taggedVersions": "1+", "tag": "0",
1363 "about": "Log directories that failed and went offline." }
1364 ]
1365 }
1366 )))?;
1367
1368 assert_eq!(MessageKind::Request, m.kind());
1369 assert!(m.has_tags());
1370
1371 Ok(())
1372 }
1373
1374 #[allow(clippy::too_many_lines)]
1375 #[test]
1376 fn message_from_value() -> Result<()> {
1377 assert_eq!(
1378 Message {
1379 api_key: 19,
1380 kind: MessageKind::Request,
1381 listeners: Some(vec![
1382 Listener::ZkBroker,
1383 Listener::Broker,
1384 Listener::Controller
1385 ]),
1386 name: String::from("CreateTopicsRequest"),
1387 versions: Version {
1388 valid: VersionRange::from_str("0-7")?,
1389 deprecated: Some(VersionRange::from_str("0-1")?),
1390 flexible: VersionRange::from_str("5+")?,
1391 },
1392 common_structs: Some(vec![CommonStruct {
1393 name: String::from("AddPartitionsToTxnTopic"),
1394 fields: vec![
1395 Field {
1396 name: String::from("Name"),
1397 kind: Kind(String::from("string")),
1398 versions: VersionRange::from_str("0+")?,
1399 map_key: Some(true),
1400 nullable: None,
1401 tag: None,
1402 tagged: None,
1403 default: None,
1404 fields: None,
1405 entity_type: Some(String::from("topicName")),
1406 about: Some(String::from("The name of the topic.")),
1407 },
1408 Field {
1409 name: String::from("Partitions"),
1410 kind: Kind(String::from("[]int32")),
1411 versions: VersionRange::from_str("0+")?,
1412 about: Some(String::from(
1413 "The partition indexes to add to the transaction"
1414 )),
1415 map_key: None,
1416 nullable: None,
1417 tag: None,
1418 tagged: None,
1419 default: None,
1420 fields: None,
1421 entity_type: None,
1422 }
1423 ],
1424 }]),
1425 fields: vec![Field {
1426 name: String::from("Topics"),
1427 kind: Kind(String::from("[]CreatableTopic")),
1428 about: Some(String::from("The topics to create.")),
1429 versions: VersionRange::from_str("0+")?,
1430 map_key: None,
1431 nullable: None,
1432 tag: None,
1433 tagged: None,
1434 entity_type: None,
1435 default: None,
1436 fields: Some(vec![Field {
1437 name: String::from("Name"),
1438 kind: Kind(String::from("string")),
1439 versions: VersionRange::from_str("0+")?,
1440 map_key: Some(true),
1441 entity_type: Some(String::from("topicName")),
1442 about: Some(String::from("The topic name.")),
1443 default: None,
1444 nullable: None,
1445 tag: None,
1446 tagged: None,
1447 fields: None,
1448 }])
1449 }],
1450 },
1451 serde_json::from_str::<Value>(
1452 r#"
1453 {
1454 "apiKey": 19,
1455 "type": "request",
1456 "listeners": ["zkBroker", "broker", "controller"],
1457 "name": "CreateTopicsRequest",
1458 "validVersions": "0-7",
1459 "deprecatedVersions": "0-1",
1460 "flexibleVersions": "5+",
1461 "fields": [
1462 {"name": "Topics",
1463 "type": "[]CreatableTopic",
1464 "versions": "0+",
1465 "about": "The topics to create.",
1466 "fields": [
1467 { "name": "Name",
1468 "type": "string",
1469 "versions": "0+",
1470 "mapKey": true,
1471 "entityType": "topicName",
1472 "about": "The topic name."
1473 }]}],
1474 "commonStructs": [
1475 { "name": "AddPartitionsToTxnTopic",
1476 "versions": "0+",
1477 "fields": [
1478 { "name": "Name",
1479 "type": "string",
1480 "versions": "0+",
1481 "mapKey": true,
1482 "entityType": "topicName",
1483 "about": "The name of the topic."
1484 },
1485 { "name": "Partitions",
1486 "type": "[]int32",
1487 "versions": "0+",
1488 "about": "The partition indexes to add to the transaction"
1489 }]}]
1490 }
1491 "#
1492 )
1493 .map_err(Into::into)
1494 .and_then(|v| Message::try_from(&Wv::from(&v)))?
1495 );
1496
1497 Ok(())
1498 }
1499
1500 #[allow(clippy::too_many_lines)]
1501 #[test]
1502 fn header_from_value() -> Result<()> {
1503 let v = serde_json::from_str::<Value>(
1504 r#"
1505 {
1506 "type": "header",
1507 "name": "RequestHeader",
1508 "validVersions": "0-2",
1509 "flexibleVersions": "2+",
1510 "fields": [
1511 { "name": "RequestApiKey", "type": "int16", "versions": "0+",
1512 "about": "The API key of this request." },
1513 { "name": "RequestApiVersion", "type": "int16", "versions": "0+",
1514 "about": "The API version of this request." },
1515 { "name": "CorrelationId", "type": "int32", "versions": "0+",
1516 "about": "The correlation ID of this request." },
1517
1518 { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
1519 "flexibleVersions": "none", "about": "The client ID string." }
1520 ]
1521 }
1522 "#,
1523 )?;
1524
1525 let wv = Wv::from(&v);
1526
1527 assert_eq!(
1528 Header {
1529 name: String::from("RequestHeader"),
1530 valid: VersionRange { start: 0, end: 2 },
1531 flexible: VersionRange {
1532 start: 2,
1533 end: i16::MAX
1534 },
1535 fields: vec![
1536 Field {
1537 name: String::from("RequestApiKey"),
1538 kind: Kind(String::from("int16")),
1539 about: Some(String::from("The API key of this request.")),
1540 versions: VersionRange {
1541 start: 0,
1542 end: 32767
1543 },
1544 map_key: None,
1545 nullable: None,
1546 tag: None,
1547 tagged: None,
1548 entity_type: None,
1549 default: None,
1550 fields: None
1551 },
1552 Field {
1553 name: String::from("RequestApiVersion"),
1554 kind: Kind(String::from("int16")),
1555 about: Some(String::from("The API version of this request.")),
1556 versions: VersionRange {
1557 start: 0,
1558 end: 32767
1559 },
1560 map_key: None,
1561 nullable: None,
1562 tag: None,
1563 tagged: None,
1564 entity_type: None,
1565 default: None,
1566 fields: None
1567 },
1568 Field {
1569 name: String::from("CorrelationId"),
1570 kind: Kind(String::from("int32")),
1571 about: Some(String::from("The correlation ID of this request.")),
1572 versions: VersionRange {
1573 start: 0,
1574 end: 32767
1575 },
1576 map_key: None,
1577 nullable: None,
1578 tag: None,
1579 tagged: None,
1580 entity_type: None,
1581 default: None,
1582 fields: None
1583 },
1584 Field {
1585 name: String::from("ClientId"),
1586 kind: Kind(String::from("string")),
1587 about: Some(String::from("The client ID string.")),
1588 versions: VersionRange {
1589 start: 1,
1590 end: 32767
1591 },
1592 map_key: None,
1593 nullable: Some(VersionRange {
1594 start: 1,
1595 end: 32767
1596 }),
1597 tag: None,
1598 tagged: None,
1599 entity_type: None,
1600 default: None,
1601 fields: None
1602 }
1603 ],
1604 },
1605 Header::try_from(&wv)?
1606 );
1607 Ok(())
1608 }
1609
1610 #[test]
1611 fn parse_expression() -> Result<()> {
1612 let Expr::Array(expression) =
1613 syn::parse_str::<Expr>(r#"[(one, "a/b/c"), (abc, 123), (pqr, a::b::c)]"#)?
1614 else {
1615 return Err(Error::Message(String::from("expecting an array")));
1616 };
1617
1618 let mut mappings = HashMap::new();
1619
1620 for expression in expression.elems {
1621 let Expr::Tuple(tuple) = expression else {
1622 return Err(Error::Message(String::from("expecting a tuple")));
1623 };
1624
1625 assert_eq!(2, tuple.elems.len());
1626
1627 println!(
1628 "i: {}, ty: {}",
1629 tuple.to_token_stream(),
1630 type_name_of_val(&tuple)
1631 );
1632
1633 let Expr::Path(ref lhs) = tuple.elems[0] else {
1634 return Err(Error::Message(String::from(
1635 "lhs expecting a path expression",
1636 )));
1637 };
1638
1639 let Some(lhs) = lhs.path.get_ident() else {
1640 return Err(Error::Message(String::from(
1641 "lhs expecting a path ident expression",
1642 )));
1643 };
1644
1645 _ = mappings.insert(lhs.clone(), tuple.elems[1].clone());
1646
1647 println!("lhs: {}", lhs.to_token_stream());
1648 println!("rhs: {}", tuple.elems[1].to_token_stream());
1649 }
1650
1651 let one = syn::parse_str::<Ident>("one")?;
1652 assert!(mappings.contains_key(&one));
1653
1654 Ok(())
1655 }
1656
1657 #[test]
1658 fn find_coordinator_request() -> Result<()> {
1659 let m = serde_json::from_str::<Value>(
1660 r#"
1661 {
1662 "apiKey": 10,
1663 "type": "request",
1664 "listeners": ["zkBroker", "broker"],
1665 "name": "FindCoordinatorRequest",
1666 "validVersions": "0-4",
1667 "deprecatedVersions": "0",
1668 "flexibleVersions": "3+",
1669 "fields": [
1670 { "name": "Key", "type": "string", "versions": "0-3",
1671 "about": "The coordinator key." },
1672 { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
1673 "about": "The coordinator key type. (Group, transaction, etc.)" },
1674 { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
1675 "about": "The coordinator keys." }
1676 ]
1677 }
1678 "#,
1679 )
1680 .map_err(Into::into)
1681 .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1682
1683 assert!(!m.has_records());
1684 assert_eq!(MessageKind::Request, m.kind());
1685 assert_eq!("FindCoordinatorRequest", m.name());
1686 assert_eq!(
1687 Version {
1688 valid: VersionRange { start: 0, end: 4 },
1689 deprecated: Some(VersionRange { start: 0, end: 0 }),
1690 flexible: VersionRange {
1691 start: 3,
1692 end: i16::MAX
1693 },
1694 },
1695 m.version()
1696 );
1697
1698 assert_eq!("Key", m.fields()[0].name());
1699 assert_eq!(Kind::new("string"), m.fields()[0].kind);
1700 assert_eq!(VersionRange { start: 0, end: 3 }, m.fields()[0].versions());
1701 assert_eq!(Some("The coordinator key.".into()), m.fields()[0].about());
1702
1703 Ok(())
1704 }
1705
1706 #[test]
1707 fn fetch_response() -> Result<()> {
1708 let m = serde_json::from_str::<Value>(
1709 r#"
1710 {
1711 "apiKey": 1,
1712 "type": "response",
1713 "name": "FetchResponse",
1714 "validVersions": "0-16",
1715 "flexibleVersions": "12+",
1716 "fields": [
1717 { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0,
1718 "about": "Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [
1719 { "name": "NodeId", "type": "int32", "versions": "16+",
1720 "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
1721 { "name": "Host", "type": "string", "versions": "16+", "about": "The node's hostname." },
1722 { "name": "Port", "type": "int32", "versions": "16+", "about": "The node's port." },
1723 { "name": "Rack", "type": "string", "versions": "16+", "nullableVersions": "16+", "default": "null",
1724 "about": "The rack of the node, or null if it has not been assigned to a rack." }
1725 ]}
1726 ]
1727 }
1728 "#,
1729 )
1730 .map_err(Into::into)
1731 .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1732
1733 assert_eq!(MessageKind::Response, m.kind());
1734 assert_eq!("FetchResponse", m.name());
1735 assert_eq!(
1736 Version {
1737 valid: VersionRange { start: 0, end: 16 },
1738 deprecated: None,
1739 flexible: VersionRange {
1740 start: 12,
1741 end: i16::MAX
1742 },
1743 },
1744 m.version()
1745 );
1746
1747 assert_eq!("NodeEndpoints", m.fields()[0].name());
1748 assert_eq!(Kind::new("[]NodeEndpoint"), m.fields()[0].kind);
1749 assert_eq!(
1750 VersionRange {
1751 start: 16,
1752 end: i16::MAX
1753 },
1754 m.fields()[0].versions()
1755 );
1756
1757 let node_id = &m.fields()[0].fields().unwrap()[0];
1758
1759 assert_eq!("NodeId", node_id.name());
1760 assert_eq!(Kind::new("int32"), node_id.kind);
1761 assert_eq!(
1762 VersionRange {
1763 start: 16,
1764 end: i16::MAX
1765 },
1766 node_id.versions()
1767 );
1768
1769 Ok(())
1770 }
1771}