Skip to main content

tansu_model/
lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15//! Structures representing Kafka JSON protocol definitions.
16//!
17//! This crate converts Kafka JSON protocol definitions into structures
18//! that can be easily used during the Tansu Sans I/O build process.
19
20pub mod error;
21pub mod wv;
22
23use convert_case::{Case, Casing};
24pub use error::Error;
25use lazy_static::lazy_static;
26use proc_macro2::{Ident, Span, TokenStream};
27use quote::ToTokens;
28use regex::Regex;
29use serde_json::Value;
30use std::{fmt, str::FromStr};
31use syn::{Expr, Type};
32use tracing::debug;
33use wv::{As, AsOption, Wv};
34
/// Expands to a `String` of the form `<crate>::<ident>`, where `<crate>` is the
/// name of the crate being compiled (taken from `CARGO_CRATE_NAME`).
macro_rules! prefix_crate {
    ($name:ident) => {
        String::from(concat!(
            env!("CARGO_CRATE_NAME"),
            "::",
            stringify!($name)
        ))
    };
}
40
/// Expands to a `String` holding the `Debug` rendering of a value, qualified
/// with the name of the crate being compiled (taken from `CARGO_CRATE_NAME`).
macro_rules! with_crate {
    // `<crate>::<value:?>`
    ($value:expr_2021) => {
        format!("{}::{:?}", env!("CARGO_CRATE_NAME"), $value)
    };

    // `<crate>::<scope>::<value:?>`, where `scope` is rendered with `Display`.
    ($scope:expr_2021, $value:expr_2021) => {
        format!("{}::{}::{:?}", env!("CARGO_CRATE_NAME"), $scope, $value)
    };
}
50
/// A `Result` whose error type defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
52
/// Maps a Kafka type name onto the name of the Rust type used to represent it.
///
/// A sequence (a name starting with `[]`) maps to the type of its elements.
/// Names without a dedicated mapping are returned unchanged.
fn type_mapping(kind: &str) -> String {
    // None of the named kinds below start with `[]`, so peeling the sequence
    // marker first cannot shadow them.
    if let Some(element) = kind.strip_prefix("[]") {
        return type_mapping(element);
    }

    let mapped = match kind {
        "bytes" => "bytes::Bytes",
        "float64" => "f64",
        "int16" => "i16",
        "int32" => "i32",
        "int64" => "i64",
        "int8" => "i8",
        "string" => "String",
        "uint16" => "u16",
        "uuid" => "[u8; 16]",
        "records" => "crate::RecordBatch",
        other => other,
    };

    mapped.to_owned()
}
71
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
/// The Kafka field kind (type).
///
/// Wraps the Kafka type name as a string; a name beginning with `[]` denotes a
/// sequence of the kind named by the remainder.
pub struct Kind(String);
75
76impl ToTokens for Kind {
77    #![allow(clippy::unwrap_used)]
78    fn to_tokens(&self, tokens: &mut TokenStream) {
79        let expr = with_crate!(self);
80        syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
81    }
82}
83
84impl From<Type> for Kind {
85    fn from(value: Type) -> Self {
86        Self(value.to_token_stream().to_string())
87    }
88}
89
/// The Kafka type names treated as primitive (defined) types.
///
/// The list is a compile-time constant, so a plain `static` slice is enough:
/// no lazy initialisation is needed.
static PRIMITIVE: &[&str] = &[
    "bool", "bytes", "float64", "int16", "int32", "int64", "int8", "records", "string",
    "uint16", "uuid",
];
96
97impl Kind {
98    #[must_use]
99    pub fn new(kind: &str) -> Self {
100        Self(kind.to_owned())
101    }
102
103    #[must_use]
104    pub fn name(&self) -> &str {
105        &self.0[..]
106    }
107
108    #[must_use]
109    #[allow(clippy::missing_panics_doc)]
110    /// The Rust type name of this kind.
111    pub fn type_name(&self) -> Type {
112        syn::parse_str::<Type>(&type_mapping(&self.0))
113            .unwrap_or_else(|_| panic!("not a type: {self:?}"))
114    }
115
116    #[must_use]
117    /// Returns true if this kind is a sequence (array)
118    pub fn is_sequence(&self) -> bool {
119        self.0.starts_with("[]")
120    }
121
122    #[must_use]
123    /// Returns true is this kind is a Kafka primitive (defined) type
124    pub fn is_primitive(&self) -> bool {
125        PRIMITIVE.contains(&self.0.as_str())
126    }
127
128    #[must_use]
129    /// Returns true if this kind is a float64
130    pub fn is_float(&self) -> bool {
131        self.0.eq("float64")
132    }
133
134    #[must_use]
135    /// Returns true if this is a sequence of Kafka primitive types
136    pub fn is_sequence_of_primitive(&self) -> bool {
137        self.is_sequence() && PRIMITIVE.contains(&&self.0[2..])
138    }
139
140    #[must_use]
141    /// Optionally when the kind is a sequence, returns the kind of the sequence
142    pub fn kind_of_sequence(&self) -> Option<Self> {
143        if self.is_sequence() {
144            Some(Self::new(&self.0[2..]))
145        } else {
146            None
147        }
148    }
149}
150
151impl FromStr for Kind {
152    type Err = Error;
153
154    fn from_str(s: &str) -> Result<Self, Self::Err> {
155        Ok(Self(s.to_owned()))
156    }
157}
158
159impl TryFrom<&Value> for Kind {
160    type Error = Error;
161
162    fn try_from(value: &Value) -> Result<Self, Self::Error> {
163        as_str(value, "type").and_then(Self::from_str)
164    }
165}
166
167#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
168/// The listener for this message type.
169pub enum Listener {
170    ZkBroker,
171    #[default]
172    Broker,
173    Controller,
174}
175
176impl TryFrom<&Value> for Listener {
177    type Error = Error;
178
179    fn try_from(value: &Value) -> Result<Self, Self::Error> {
180        value
181            .as_str()
182            .ok_or(Error::Message(String::from(
183                "expecting string for listener",
184            )))
185            .and_then(Self::from_str)
186    }
187}
188
189impl FromStr for Listener {
190    type Err = Error;
191
192    fn from_str(s: &str) -> Result<Self, Self::Err> {
193        match s {
194            "zkBroker" => Ok(Self::ZkBroker),
195            "broker" => Ok(Self::Broker),
196            "controller" => Ok(Self::Controller),
197            s => Err(Error::Message(String::from(s))),
198        }
199    }
200}
201
202#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
203/// Model request and response types only
204pub enum MessageKind {
205    #[default]
206    Request,
207    Response,
208}
209
210impl ToTokens for MessageKind {
211    #[allow(clippy::unwrap_used)]
212    fn to_tokens(&self, tokens: &mut TokenStream) {
213        let expr = with_crate!("MessageKind", self);
214        syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
215    }
216}
217
218impl TryFrom<&Value> for MessageKind {
219    type Error = Error;
220
221    fn try_from(value: &Value) -> Result<Self, Self::Error> {
222        as_str(value, "type").and_then(Self::from_str)
223    }
224}
225
226impl FromStr for MessageKind {
227    type Err = Error;
228
229    fn from_str(s: &str) -> Result<Self, Self::Err> {
230        match s {
231            "request" => Ok(Self::Request),
232            "response" => Ok(Self::Response),
233            s => Err(Error::Message(String::from(s))),
234        }
235    }
236}
237
#[derive(Clone, Copy, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
/// A range of versions.
///
/// Both bounds are inclusive.
pub struct VersionRange {
    /// The first version in the range (inclusive).
    pub start: i16,
    /// The last version in the range (inclusive).
    pub end: i16,
}
244
245impl fmt::Debug for VersionRange {
246    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247        f.debug_struct(&prefix_crate!(VersionRange))
248            .field("start", &self.start)
249            .field("end", &self.end)
250            .finish()
251    }
252}
253
254impl VersionRange {
255    #[must_use]
256    pub fn within(&self, version: i16) -> bool {
257        version >= self.start && version <= self.end
258    }
259
260    #[must_use]
261    pub fn is_mandatory(&self, parent: Option<VersionRange>) -> bool {
262        parent.map_or(
263            self.start == 0 && self.end == i16::MAX,
264            |VersionRange { start, end }| {
265                if start > self.start {
266                    self.end == end
267                } else {
268                    self.start == start && self.end == end
269                }
270            },
271        )
272    }
273}
274
275impl ToTokens for VersionRange {
276    fn to_tokens(&self, tokens: &mut TokenStream) {
277        let expr = format!("{self:?}");
278        syn::parse_str::<Expr>(&expr)
279            .unwrap_or_else(|_| panic!("an expression: {self:?}"))
280            .to_tokens(tokens);
281    }
282}
283
284impl FromStr for VersionRange {
285    type Err = Error;
286
287    fn from_str(s: &str) -> Result<Self, Self::Err> {
288        #![allow(clippy::expect_used)]
289        lazy_static! {
290            static ref RX: Regex =
291                Regex::new(r"^(?<start>\d+)(-(?<end>\d+)|(?<infinite>\+))?$").expect("regex");
292        }
293
294        if s == "none" {
295            Ok(VersionRange { start: -2, end: -1 })
296        } else {
297            RX.captures(s)
298                .ok_or(Error::Message(format!("invalid version range format: {s}")))
299                .and_then(|captures| {
300                    let parse = |name, default: i16| {
301                        captures
302                            .name(name)
303                            .map_or(Ok(&default.to_string()[..]), |s| Ok(s.as_str()))
304                            .and_then(str::parse)
305                    };
306
307                    let start = parse("start", 0)?;
308
309                    let end = if let Some(end) = captures.name("end") {
310                        str::parse(end.as_str())?
311                    } else if captures.name("infinite").is_some() {
312                        i16::MAX
313                    } else {
314                        start
315                    };
316
317                    Ok(VersionRange { start, end })
318                })
319        }
320    }
321}
322
323#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
324/// The validity, deprecation and flexible version ranges of a Kafka API message.
325pub struct Version {
326    /// The valid version ranges of this Kafka message.
327    pub valid: VersionRange,
328    /// The deprecated version range of this Kafka message.
329    pub deprecated: Option<VersionRange>,
330    /// The range of versions where this message uses flexible encoding.
331    pub flexible: VersionRange,
332}
333
334impl ToTokens for Version {
335    #[allow(clippy::unwrap_used)]
336    fn to_tokens(&self, tokens: &mut TokenStream) {
337        let expr = with_crate!(self);
338        syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
339    }
340}
341
342impl Version {
343    #[must_use]
344    pub fn valid(&self) -> VersionRange {
345        self.valid
346    }
347
348    #[must_use]
349    pub fn deprecated(&self) -> Option<VersionRange> {
350        self.deprecated
351    }
352
353    #[must_use]
354    pub fn flexible(&self) -> VersionRange {
355        self.flexible
356    }
357}
358
359impl<'a> TryFrom<&Wv<'a>> for Version {
360    type Error = Error;
361
362    fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
363        debug!("value: {:?}", value);
364
365        Ok(Self {
366            valid: value.as_a("validVersions")?,
367            deprecated: value.as_option("deprecatedVersions")?,
368            flexible: value.as_a("flexibleVersions")?,
369        })
370    }
371}
372
373#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
374/// Kafka message field schema
375pub struct Field {
376    /// The name of this field.
377    name: String,
378    /// The Kafka type of this field.
379    kind: Kind,
380    /// A comment about this field.
381    about: Option<String>,
382    /// The version range for this field.
383    versions: VersionRange,
384    /// Whether this field can be used as the key in a map.
385    map_key: Option<bool>,
386    /// Whether this field can be null.
387    nullable: Option<VersionRange>,
388    /// The tag ID for this field
389    tag: Option<u32>,
390    /// The version range in which this field can be tagged.
391    tagged: Option<VersionRange>,
392    /// The entity type of this field.
393    entity_type: Option<String>,
394    /// The default value of this field.
395    default: Option<String>,
396    /// Any fields this field contains.
397    fields: Option<Vec<Field>>,
398}
399
400impl Field {
401    #[must_use]
402    pub fn ident(&self) -> Ident {
403        match self.name.to_case(Case::Snake) {
404            reserved if is_reserved_keyword(&reserved) => {
405                Ident::new_raw(&reserved, Span::call_site())
406            }
407
408            otherwise => Ident::new(&otherwise, Span::call_site()),
409        }
410    }
411
412    #[must_use]
413    pub fn name(&self) -> &str {
414        &self.name
415    }
416
417    #[must_use]
418    pub fn kind(&self) -> &Kind {
419        &self.kind
420    }
421
422    #[must_use]
423    pub fn fields(&self) -> Option<&[Field]> {
424        self.fields.as_deref()
425    }
426
427    #[must_use]
428    pub fn versions(&self) -> VersionRange {
429        self.versions
430    }
431
432    #[must_use]
433    pub fn nullable(&self) -> Option<VersionRange> {
434        self.nullable
435    }
436
437    #[must_use]
438    pub fn tagged(&self) -> Option<VersionRange> {
439        self.tagged
440    }
441
442    #[must_use]
443    pub fn tag(&self) -> Option<u32> {
444        self.tag
445    }
446
447    #[must_use]
448    pub fn about(&self) -> Option<&str> {
449        self.about.as_deref()
450    }
451
452    #[must_use]
453    pub fn has_tags(&self) -> bool {
454        self.tagged.is_some()
455    }
456
457    #[must_use]
458    pub fn has_records(&self) -> bool {
459        self.kind.0 == "records"
460            || self
461                .fields()
462                .is_some_and(|fields| fields.iter().any(Field::has_records))
463    }
464
465    #[must_use]
466    pub fn has_float(&self) -> bool {
467        self.kind().is_float()
468            || self
469                .fields()
470                .is_some_and(|fields| fields.iter().any(Field::has_float))
471    }
472}
473
474impl<'a> TryFrom<&Wv<'a>> for Field {
475    type Error = Error;
476
477    fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
478        let name = value.as_a("name")?;
479        let kind = value.as_a("type")?;
480        let about = value.as_option("about")?;
481        let versions = value.as_a("versions")?;
482        let map_key = value.as_option("mapKey")?;
483        let nullable = value.as_option("nullableVersions")?;
484        let tag = value.as_option("tag")?;
485        let tagged = value.as_option("taggedVersions")?;
486        let entity_type = value.as_option("entityType")?;
487        let default = value.as_option("default")?;
488
489        let fields = value
490            .as_option("fields")?
491            .map_or(Ok(None), |values: &[Value]| {
492                values
493                    .iter()
494                    .try_fold(Vec::new(), |mut acc, field| {
495                        Field::try_from(&Wv::from(field)).map(|f| {
496                            acc.push(f);
497                            acc
498                        })
499                    })
500                    .map(Some)
501            })?;
502
503        Ok(Field {
504            name,
505            kind,
506            about,
507            versions,
508            map_key,
509            nullable,
510            tag,
511            tagged,
512            entity_type,
513            default,
514            fields,
515        })
516    }
517}
518
/// Returns true when `s` is a Rust keyword (strict or reserved) that cannot be
/// used as a plain identifier.
///
/// The comparison is case sensitive and exact.
fn is_reserved_keyword(s: &str) -> bool {
    // A constant slice: the list is fixed at compile time, so no lazy
    // initialisation or heap allocation is required.
    const RESERVED: &[&str] = &[
        "as",
        "break",
        "const",
        "continue",
        "crate",
        "else",
        "enum",
        "extern",
        "false",
        "fn",
        "for",
        "if",
        "impl",
        "in",
        "let",
        "loop",
        "match",
        "mod",
        "move",
        "mut",
        "pub",
        "ref",
        "return",
        "self",
        "Self",
        "static",
        "struct",
        "super",
        "trait",
        "true",
        "type",
        "unsafe",
        "use",
        "where",
        "while",
        // 2018 edition
        "async",
        "await",
        "dyn",
        // reserved
        "abstract",
        "become",
        "box",
        "do",
        "final",
        "macro",
        "override",
        "priv",
        "typeof",
        "unsized",
        "virtual",
        "yield",
        // reserved 2018 edition
        "try",
        // reserved 2024 edition
        "gen",
    ];

    RESERVED.contains(&s)
}
583
584#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
585pub struct Message {
586    api_key: i16,
587    kind: MessageKind,
588    listeners: Option<Vec<Listener>>,
589    name: String,
590    versions: Version,
591    fields: Vec<Field>,
592    common_structs: Option<Vec<CommonStruct>>,
593}
594
595impl Message {
596    #[must_use]
597    pub fn api_key(&self) -> i16 {
598        self.api_key
599    }
600
601    #[must_use]
602    pub fn kind(&self) -> MessageKind {
603        self.kind
604    }
605
606    #[must_use]
607    pub fn name(&self) -> &str {
608        &self.name
609    }
610
611    #[must_use]
612    #[allow(clippy::missing_panics_doc, clippy::unwrap_used)]
613    pub fn type_name(&self) -> Type {
614        syn::parse_str::<Type>(&self.name).unwrap()
615    }
616
617    #[must_use]
618    pub fn version(&self) -> Version {
619        self.versions
620    }
621
622    #[must_use]
623    pub fn listeners(&self) -> Option<&[Listener]> {
624        self.listeners.as_deref()
625    }
626
627    #[must_use]
628    pub fn fields(&self) -> &[Field] {
629        &self.fields
630    }
631
632    #[must_use]
633    #[allow(clippy::missing_panics_doc, clippy::unwrap_used)]
634    pub fn wrapper_new_type(&self, field: &Field) -> Type {
635        syn::parse_str::<Type>(&format!("{}{}", self.name, field.name).to_case(Case::Pascal))
636            .unwrap()
637    }
638
639    #[must_use]
640    pub fn common_structs(&self) -> Option<&[CommonStruct]> {
641        self.common_structs.as_deref()
642    }
643
644    #[must_use]
645    pub fn has_records(&self) -> bool {
646        self.fields().iter().any(Field::has_records)
647    }
648
649    #[must_use]
650    pub fn has_tags(&self) -> bool {
651        self.fields().iter().any(Field::has_tags)
652    }
653
654    #[must_use]
655    pub fn has_float(&self) -> bool {
656        self.fields.iter().any(|field| field.kind().is_float())
657            || self
658                .common_structs()
659                .is_some_and(|structures| structures.iter().any(CommonStruct::has_float))
660    }
661}
662
663impl<'a> TryFrom<&Wv<'a>> for Message {
664    type Error = Error;
665
666    fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
667        let api_key = value.as_a("apiKey")?;
668        let name = value.as_a("name")?;
669        let kind = value.as_a("type")?;
670        let listeners = value
671            .as_option("listeners")?
672            .map_or(Ok(None), |maybe: &[Value]| {
673                maybe
674                    .iter()
675                    .try_fold(Vec::new(), |mut acc, listener| {
676                        Listener::try_from(listener).map(|l| {
677                            acc.push(l);
678                            acc
679                        })
680                    })
681                    .map(Some)
682            })?;
683
684        let fields = value.as_a("fields").and_then(|fields: &[Value]| {
685            fields.iter().try_fold(Vec::new(), |mut acc, field| {
686                Field::try_from(&Wv::from(field)).map(|f| {
687                    acc.push(f);
688                    acc
689                })
690            })
691        })?;
692
693        let versions = Version::try_from(value)?;
694        let common_structs =
695            value
696                .as_option("commonStructs")?
697                .map_or(Ok(None), |maybe: &[Value]| {
698                    maybe
699                        .iter()
700                        .try_fold(Vec::new(), |mut acc, value| {
701                            CommonStruct::try_from(&Wv::from(value)).map(|field| {
702                                acc.push(field);
703                                acc
704                            })
705                        })
706                        .map(Some)
707                })?;
708
709        Ok(Self {
710            api_key,
711            kind,
712            listeners,
713            name,
714            versions,
715            fields,
716            common_structs,
717        })
718    }
719}
720
721#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
722pub struct CommonStruct {
723    name: String,
724    fields: Vec<Field>,
725}
726
727impl CommonStruct {
728    #[must_use]
729    pub fn name(&self) -> &str {
730        self.name.as_str()
731    }
732
733    #[must_use]
734    #[allow(clippy::missing_panics_doc)]
735    pub fn type_name(&self) -> Type {
736        syn::parse_str::<Type>(&self.name).unwrap_or_else(|_| panic!("not a type: {self:?}"))
737    }
738
739    #[must_use]
740    pub fn fields(&self) -> &Vec<Field> {
741        &self.fields
742    }
743
744    #[must_use]
745    pub fn has_float(&self) -> bool {
746        self.fields.iter().any(|field| field.kind().is_float())
747    }
748}
749
750impl<'a> TryFrom<&Wv<'a>> for CommonStruct {
751    type Error = Error;
752
753    fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
754        let name = value.as_a("name")?;
755        let fields = value.as_a("fields").and_then(|fields: &[Value]| {
756            fields.iter().try_fold(Vec::new(), |mut acc, field| {
757                Field::try_from(&Wv::from(field)).map(|f| {
758                    acc.push(f);
759                    acc
760                })
761            })
762        })?;
763
764        Ok(Self { name, fields })
765    }
766}
767
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
/// Static metadata for a header: its name, version ranges and fields.
// NOTE(review): presumably describes the Kafka request/response headers;
// confirm against the code that constructs these values.
pub struct HeaderMeta {
    /// The name of the header.
    pub name: &'static str,
    /// The range of valid versions.
    pub valid: VersionRange,
    /// The range of versions using flexible encoding.
    pub flexible: VersionRange,
    /// The fields of the header, as name and metadata pairs.
    pub fields: &'static [(&'static str, &'static FieldMeta)],
}
775
776#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
777/// Kafka API message metadata.
778pub struct MessageMeta {
779    /// The name of the Kafka API message.
780    pub name: &'static str,
781    /// The API key used by this message.
782    pub api_key: i16,
783    /// The version ranges for this message.
784    pub version: Version,
785    /// The message kind of this message.
786    pub message_kind: MessageKind,
787    /// The fields that this message describes.
788    pub fields: &'static [(&'static str, &'static FieldMeta)],
789}
790
791impl MessageMeta {
792    #[must_use]
793    pub fn is_flexible(&self, version: i16) -> bool {
794        self.version.flexible.within(version)
795    }
796
797    #[must_use]
798    pub fn structures(&self) -> Vec<(&str, &FieldMeta)> {
799        self.fields
800            .iter()
801            .filter(|(_, fm)| fm.is_structure())
802            .flat_map(|(name, fm)| {
803                debug!(name = self.name, field = ?name, kind = ?fm.kind.0);
804
805                let mut children = fm.structures();
806
807                if let Some(kind) = fm.kind.kind_of_sequence() {
808                    if !kind.is_primitive() {
809                        children.push((kind.name(), *fm));
810                    }
811                } else {
812                    children.push((fm.kind.name(), *fm));
813                }
814
815                children
816            })
817            .collect()
818    }
819
820    #[must_use]
821    pub fn field(&self, name: &str) -> Option<&'static FieldMeta> {
822        self.fields
823            .iter()
824            .find(|(found, _)| name == *found)
825            .map(|(_, meta)| *meta)
826    }
827}
828
829#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
830/// Kafka API message field metadata
831pub struct FieldMeta {
832    /// The version range of this field.
833    pub version: VersionRange,
834    /// The version range where this field may be null.
835    pub nullable: Option<VersionRange>,
836    /// The kind (type) metadata of this field.
837    pub kind: KindMeta,
838    /// When present the tag ID of this field.
839    pub tag: Option<u32>,
840    /// The range of versions where this field is tagged.
841    pub tagged: Option<VersionRange>,
842    /// The fields contained within this structure.
843    pub fields: &'static [(&'static str, &'static FieldMeta)],
844}
845
846impl FieldMeta {
847    #[must_use]
848    pub fn is_nullable(&self, version: i16) -> bool {
849        self.nullable.is_some_and(|range| range.within(version))
850    }
851
852    #[must_use]
853    pub fn is_mandatory(&self, parent: Option<VersionRange>) -> bool {
854        self.version.is_mandatory(parent)
855    }
856
857    #[must_use]
858    pub fn is_structure(&self) -> bool {
859        self.kind
860            .kind_of_sequence()
861            .is_some_and(|sk| !sk.is_primitive())
862            || !self.fields.is_empty()
863    }
864
865    #[must_use]
866    pub fn structures(&self) -> Vec<(&str, &FieldMeta)> {
867        self.fields
868            .iter()
869            .filter(|(_, fm)| fm.is_structure())
870            .flat_map(|(_, fm)| {
871                let mut children = fm.structures();
872
873                if let Some(kind) = fm.kind.kind_of_sequence()
874                    && !kind.is_primitive()
875                {
876                    children.push((kind.name(), *fm));
877                }
878
879                children
880            })
881            .collect()
882    }
883
884    pub fn field(&self, name: &str) -> Option<&FieldMeta> {
885        self.fields
886            .iter()
887            .find(|field| name == field.0)
888            .map(|(_, meta)| *meta)
889    }
890}
891
892#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
893pub struct KindMeta(pub &'static str);
894
895impl KindMeta {
896    #[must_use]
897    pub fn name(&self) -> &'static str {
898        self.0
899    }
900
901    #[must_use]
902    pub fn is_sequence(&self) -> bool {
903        self.0.starts_with("[]")
904    }
905
906    #[must_use]
907    pub fn is_primitive(&self) -> bool {
908        PRIMITIVE.contains(&self.0)
909    }
910
911    #[must_use]
912    pub fn is_string(&self) -> bool {
913        self.0 == "string"
914    }
915
916    #[must_use]
917    pub fn is_records(&self) -> bool {
918        self.0 == "records"
919    }
920
921    #[must_use]
922    pub fn kind_of_sequence(&self) -> Option<Self> {
923        if self.is_sequence() {
924            Some(Self(&self.0[2..]))
925        } else {
926            None
927        }
928    }
929}
930
931fn as_str<'v>(value: &'v Value, name: &str) -> Result<&'v str> {
932    value[name]
933        .as_str()
934        .ok_or(Error::Message(String::from(name)))
935}
936
937#[cfg(test)]
938mod tests {
939    use std::{any::type_name_of_val, collections::HashMap};
940
941    use serde_json::json;
942
943    use super::*;
944
    // The primitive Kafka type names exercised by the `Kind` tests below.
    // This list does not include "records".
    const PRIMITIVES: [&str; 10] = [
        "bool", "bytes", "float64", "int16", "int32", "int64", "int8", "string", "uint16", "uuid",
    ];
948
949    #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
950    struct Header {
951        name: String,
952        valid: VersionRange,
953        flexible: VersionRange,
954        fields: Vec<Field>,
955    }
956
957    impl<'a> TryFrom<&Wv<'a>> for Header {
958        type Error = Error;
959
960        fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
961            let fields: &[Value] = value.as_a("fields")?;
962
963            Ok(Header {
964                name: value.as_a("name")?,
965                valid: value.as_a("validVersions")?,
966                flexible: value.as_a("flexibleVersions")?,
967                fields: fields.iter().try_fold(Vec::new(), |mut acc, field| {
968                    Field::try_from(&Wv::from(field)).map(|f| {
969                        acc.push(f);
970                        acc
971                    })
972                })?,
973            })
974        }
975    }
976
977    #[test]
978    fn type_mapping_test() {
979        assert_eq!("bytes::Bytes", type_mapping("bytes"));
980        assert_eq!("f64", type_mapping("float64"));
981        assert_eq!("i16", type_mapping("int16"));
982        assert_eq!("i32", type_mapping("int32"));
983        assert_eq!("i64", type_mapping("int64"));
984        assert_eq!("i8", type_mapping("int8"));
985        assert_eq!("String", type_mapping("string"));
986        assert_eq!("u16", type_mapping("uint16"));
987        assert_eq!("[u8; 16]", type_mapping("uuid"));
988
989        assert_eq!("i16", type_mapping("[]int16"));
990        assert_eq!("SomeType", type_mapping("[]SomeType"));
991
992        assert_eq!("SomeType", type_mapping("SomeType"));
993    }
994
995    #[test]
996    fn kind_is_sequence() {
997        for primitive in PRIMITIVES {
998            let k = Kind::new(primitive);
999            assert!(!k.is_sequence());
1000        }
1001
1002        let sequences = vec!["[]int16", "[]string", "[]SomeType"];
1003
1004        for sequence in sequences {
1005            let k = Kind::new(sequence);
1006            assert!(k.is_sequence());
1007        }
1008    }
1009
1010    #[test]
1011    fn kind_is_sequence_of_primitive() {
1012        for primitive in PRIMITIVES {
1013            let k = Kind::new(primitive);
1014            assert!(!k.is_sequence_of_primitive());
1015        }
1016
1017        let primitive_sequences: Vec<String> = PRIMITIVES
1018            .iter()
1019            .map(|primitive| format!("[]{primitive}"))
1020            .collect();
1021
1022        for sequence in primitive_sequences {
1023            let k = Kind::new(sequence.as_str());
1024            assert!(k.is_sequence_of_primitive());
1025        }
1026
1027        let sequences = vec!["[]SomeType", "[]OtherType"];
1028
1029        for sequence in sequences {
1030            let k = Kind::new(sequence);
1031            assert!(!k.is_sequence_of_primitive());
1032        }
1033    }
1034
1035    #[test]
1036    fn kind_is_primitive() {
1037        for primitive in PRIMITIVES {
1038            let k = Kind::new(primitive);
1039            assert!(k.is_primitive());
1040        }
1041
1042        let primitive_sequences: Vec<String> = PRIMITIVES
1043            .iter()
1044            .map(|primitive| format!("[]{primitive}"))
1045            .collect();
1046
1047        for sequence in primitive_sequences {
1048            let k = Kind::new(sequence.as_str());
1049            assert!(!k.is_primitive());
1050        }
1051    }
1052
1053    #[test]
1054    fn listener_from_value() -> Result<()> {
1055        assert_eq!(Listener::ZkBroker, Listener::try_from(&json!("zkBroker"))?);
1056
1057        assert_eq!(Listener::Broker, Listener::try_from(&json!("broker"))?);
1058
1059        assert_eq!(
1060            Listener::Controller,
1061            Listener::try_from(&json!("controller"))?
1062        );
1063
1064        Ok(())
1065    }
1066
1067    #[test]
1068    fn message_kind_from_value() -> Result<()> {
1069        assert_eq!(
1070            MessageKind::Request,
1071            MessageKind::try_from(&json!({
1072                "type": "request"
1073            }
1074            ))?
1075        );
1076
1077        assert_eq!(
1078            MessageKind::Response,
1079            MessageKind::try_from(&json!({
1080                "type": "response"
1081            }
1082            ))?
1083        );
1084
1085        Ok(())
1086    }
1087
1088    #[test]
1089    fn version_range_from_str() -> Result<()> {
1090        assert_eq!(
1091            VersionRange { start: -2, end: -1 },
1092            VersionRange::from_str("none")?
1093        );
1094
1095        assert_eq!(
1096            VersionRange {
1097                start: 3,
1098                end: i16::MAX
1099            },
1100            VersionRange::from_str("3+")?
1101        );
1102
1103        assert_eq!(
1104            VersionRange { start: 6, end: 9 },
1105            VersionRange::from_str("6-9")?
1106        );
1107
1108        assert_eq!(
1109            VersionRange { start: 1, end: 1 },
1110            VersionRange::from_str("1")?
1111        );
1112
1113        Ok(())
1114    }
1115
1116    #[test]
1117    fn version_range_within() {
1118        {
1119            let range = VersionRange { start: 0, end: 0 };
1120            assert!(!range.within(i16::MIN));
1121            assert!(range.within(0));
1122            assert!(!range.within(1));
1123            assert!(!range.within(i16::MAX));
1124        }
1125
1126        {
1127            let range = VersionRange {
1128                start: 3,
1129                end: i16::MAX,
1130            };
1131            assert!(!range.within(i16::MIN));
1132            assert!(!range.within(2));
1133            assert!(range.within(3));
1134            assert!(range.within(i16::MAX));
1135        }
1136
1137        {
1138            let range = VersionRange { start: 6, end: 9 };
1139            assert!(!range.within(i16::MIN));
1140            assert!(!range.within(5));
1141            assert!(range.within(6));
1142            assert!(range.within(7));
1143            assert!(range.within(8));
1144            assert!(range.within(9));
1145            assert!(!range.within(10));
1146            assert!(!range.within(i16::MAX));
1147        }
1148    }
1149
1150    #[test]
1151    fn field_from_value() -> Result<()> {
1152        assert_eq!(
1153            Field {
1154                name: String::from("Topics"),
1155                kind: Kind(String::from("[]CreatableTopic")),
1156                about: Some(String::from("The topics to create.")),
1157                versions: VersionRange::from_str("0+")?,
1158                map_key: None,
1159                nullable: None,
1160                tag: None,
1161                tagged: None,
1162                entity_type: None,
1163                default: None,
1164                fields: Some(vec![Field {
1165                    name: String::from("Name"),
1166                    kind: Kind(String::from("string")),
1167                    versions: VersionRange::from_str("0+")?,
1168                    map_key: Some(true),
1169                    entity_type: Some(String::from("topicName")),
1170                    about: Some(String::from("The topic name.")),
1171                    default: None,
1172                    nullable: None,
1173                    tag: None,
1174                    tagged: None,
1175                    fields: None,
1176                }]),
1177            },
1178            serde_json::from_str::<Value>(
1179                r#"
1180                {
1181                    "name": "Topics",
1182                    "type": "[]CreatableTopic",
1183                    "versions": "0+",
1184                    "about": "The topics to create.",
1185                     "fields": [
1186                        { "name": "Name",
1187                          "type": "string",
1188                          "versions": "0+",
1189                          "mapKey": true,
1190                          "entityType": "topicName",
1191                          "about": "The topic name."
1192                        }]
1193                }
1194                "#
1195            )
1196            .map_err(Into::into)
1197            .and_then(|v| Field::try_from(&Wv::from(&v)))?
1198        );
1199
1200        Ok(())
1201    }
1202
1203    #[allow(clippy::too_many_lines)]
1204    #[test]
1205    fn tagged_field_from_value() -> Result<()> {
1206        assert_eq!(
1207            Field {
1208                name: "SupportedFeatures".into(),
1209                kind: Kind("[]SupportedFeatureKey".into()),
1210                about: Some("Features supported by the broker.".into()),
1211                versions: VersionRange {
1212                    start: 3,
1213                    end: 32767
1214                },
1215                map_key: None,
1216                nullable: None,
1217                tag: Some(0),
1218                tagged: Some(VersionRange {
1219                    start: 3,
1220                    end: 32767
1221                }),
1222                entity_type: None,
1223                default: None,
1224                fields: Some(
1225                    [
1226                        Field {
1227                            name: "Name".into(),
1228                            kind: Kind("string".into()),
1229                            about: Some("The name of the feature.".into()),
1230                            versions: VersionRange {
1231                                start: 3,
1232                                end: 32767
1233                            },
1234                            map_key: Some(true),
1235                            nullable: None,
1236                            tag: None,
1237                            tagged: None,
1238                            entity_type: None,
1239                            default: None,
1240                            fields: None
1241                        },
1242                        Field {
1243                            name: "MinVersion".into(),
1244                            kind: Kind("int16".into()),
1245                            about: Some("The minimum supported version for the feature.".into()),
1246                            versions: VersionRange {
1247                                start: 3,
1248                                end: 32767
1249                            },
1250                            map_key: None,
1251                            nullable: None,
1252                            tag: None,
1253                            tagged: None,
1254                            entity_type: None,
1255                            default: None,
1256                            fields: None
1257                        },
1258                        Field {
1259                            name: "MaxVersion".into(),
1260                            kind: Kind("int16".into()),
1261                            about: Some("The maximum supported version for the feature.".into()),
1262                            versions: VersionRange {
1263                                start: 3,
1264                                end: 32767
1265                            },
1266                            map_key: None,
1267                            nullable: None,
1268                            tag: None,
1269                            tagged: None,
1270                            entity_type: None,
1271                            default: None,
1272                            fields: None
1273                        }
1274                    ]
1275                    .into()
1276                )
1277            },
1278            serde_json::from_str::<Value>(
1279                r#"
1280                { "name":  "SupportedFeatures",
1281                  "type": "[]SupportedFeatureKey",
1282                  "ignorable": true,
1283                  "versions":  "3+",
1284                  "tag": 0,
1285                  "taggedVersions": "3+",
1286                  "about": "Features supported by the broker.",
1287                  "fields":  [
1288                                { "name": "Name",
1289                                  "type": "string",
1290                                  "versions": "3+",
1291                                  "mapKey": true,
1292                                  "about": "The name of the feature." },
1293                                { "name": "MinVersion",
1294                                  "type": "int16",
1295                                  "versions": "3+",
1296                                  "about": "The minimum supported version for the feature." },
1297                                { "name": "MaxVersion",
1298                                  "type": "int16",
1299                                  "versions": "3+",
1300                                  "about": "The maximum supported version for the feature." }
1301                             ]
1302                }
1303                "#
1304            )
1305            .map_err(Into::into)
1306            .and_then(|v| Field::try_from(&Wv::from(&v)))?
1307        );
1308
1309        Ok(())
1310    }
1311
1312    #[test]
1313    fn untagged_message() -> Result<()> {
1314        let m = serde_json::from_str::<Value>(
1315            r#"
1316            {
1317              "apiKey": 25,
1318              "type": "request",
1319              "listeners": ["zkBroker", "broker"],
1320              "name": "AddOffsetsToTxnRequest",
1321              "validVersions": "0-3",
1322              "flexibleVersions": "3+",
1323              "fields": [
1324                { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
1325                  "about": "The transactional id corresponding to the transaction."},
1326                { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
1327                  "about": "Current producer id in use by the transactional id." },
1328                { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
1329                  "about": "Current epoch associated with the producer id." },
1330                { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
1331                  "about": "The unique group identifier." }
1332              ]
1333            }
1334            "#,
1335        ).map_err(Into::into)
1336        .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1337
1338        assert_eq!(MessageKind::Request, m.kind());
1339
1340        assert!(!m.has_tags());
1341
1342        Ok(())
1343    }
1344
1345    #[test]
1346    fn tagged_message() -> Result<()> {
1347        let m = Message::try_from(&Wv::from(&json!(
1348            {
1349              "apiKey": 63,
1350              "type": "request",
1351              "listeners": ["controller"],
1352              "name": "BrokerHeartbeatRequest",
1353              "validVersions": "0-1",
1354              "flexibleVersions": "0+",
1355              "fields": [
1356                { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
1357                  "about": "The broker ID." },
1358                { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
1359                  "about": "The broker epoch." },
1360                { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
1361                  "about": "The highest metadata offset which the broker has reached." },
1362                { "name": "WantFence", "type": "bool", "versions": "0+",
1363                  "about": "True if the broker wants to be fenced, false otherwise." },
1364                { "name": "WantShutDown", "type": "bool", "versions": "0+",
1365                  "about": "True if the broker wants to be shut down, false otherwise." },
1366                { "name": "OfflineLogDirs", "type":  "[]uuid", "versions": "1+", "taggedVersions": "1+", "tag": "0",
1367                  "about": "Log directories that failed and went offline." }
1368              ]
1369            }
1370        )))?;
1371
1372        assert_eq!(MessageKind::Request, m.kind());
1373        assert!(m.has_tags());
1374
1375        Ok(())
1376    }
1377
1378    #[allow(clippy::too_many_lines)]
1379    #[test]
1380    fn message_from_value() -> Result<()> {
1381        assert_eq!(
1382            Message {
1383                api_key: 19,
1384                kind: MessageKind::Request,
1385                listeners: Some(vec![
1386                    Listener::ZkBroker,
1387                    Listener::Broker,
1388                    Listener::Controller
1389                ]),
1390                name: String::from("CreateTopicsRequest"),
1391                versions: Version {
1392                    valid: VersionRange::from_str("0-7")?,
1393                    deprecated: Some(VersionRange::from_str("0-1")?),
1394                    flexible: VersionRange::from_str("5+")?,
1395                },
1396                common_structs: Some(vec![CommonStruct {
1397                    name: String::from("AddPartitionsToTxnTopic"),
1398                    fields: vec![
1399                        Field {
1400                            name: String::from("Name"),
1401                            kind: Kind(String::from("string")),
1402                            versions: VersionRange::from_str("0+")?,
1403                            map_key: Some(true),
1404                            nullable: None,
1405                            tag: None,
1406                            tagged: None,
1407                            default: None,
1408                            fields: None,
1409                            entity_type: Some(String::from("topicName")),
1410                            about: Some(String::from("The name of the topic.")),
1411                        },
1412                        Field {
1413                            name: String::from("Partitions"),
1414                            kind: Kind(String::from("[]int32")),
1415                            versions: VersionRange::from_str("0+")?,
1416                            about: Some(String::from(
1417                                "The partition indexes to add to the transaction"
1418                            )),
1419                            map_key: None,
1420                            nullable: None,
1421                            tag: None,
1422                            tagged: None,
1423                            default: None,
1424                            fields: None,
1425                            entity_type: None,
1426                        }
1427                    ],
1428                }]),
1429                fields: vec![Field {
1430                    name: String::from("Topics"),
1431                    kind: Kind(String::from("[]CreatableTopic")),
1432                    about: Some(String::from("The topics to create.")),
1433                    versions: VersionRange::from_str("0+")?,
1434                    map_key: None,
1435                    nullable: None,
1436                    tag: None,
1437                    tagged: None,
1438                    entity_type: None,
1439                    default: None,
1440                    fields: Some(vec![Field {
1441                        name: String::from("Name"),
1442                        kind: Kind(String::from("string")),
1443                        versions: VersionRange::from_str("0+")?,
1444                        map_key: Some(true),
1445                        entity_type: Some(String::from("topicName")),
1446                        about: Some(String::from("The topic name.")),
1447                        default: None,
1448                        nullable: None,
1449                        tag: None,
1450                        tagged: None,
1451                        fields: None,
1452                    }])
1453                }],
1454            },
1455            serde_json::from_str::<Value>(
1456                r#"
1457                {
1458                    "apiKey": 19,
1459                    "type": "request",
1460                    "listeners": ["zkBroker", "broker", "controller"],
1461                    "name": "CreateTopicsRequest",
1462                    "validVersions": "0-7",
1463                    "deprecatedVersions": "0-1",
1464                    "flexibleVersions": "5+",
1465                    "fields": [
1466                        {"name": "Topics",
1467                        "type": "[]CreatableTopic",
1468                        "versions": "0+",
1469                        "about": "The topics to create.",
1470                         "fields": [
1471                            { "name": "Name",
1472                              "type": "string",
1473                              "versions": "0+",
1474                              "mapKey": true,
1475                              "entityType": "topicName",
1476                              "about": "The topic name."
1477                        }]}],
1478                        "commonStructs": [
1479                            { "name": "AddPartitionsToTxnTopic",
1480                              "versions": "0+",
1481                              "fields": [
1482                                { "name": "Name",
1483                                  "type": "string",
1484                                  "versions": "0+",
1485                                  "mapKey": true,
1486                                  "entityType": "topicName",
1487                                  "about": "The name of the topic."
1488                                },
1489                                { "name": "Partitions",
1490                                  "type": "[]int32",
1491                                  "versions": "0+",
1492                                  "about": "The partition indexes to add to the transaction"
1493                                }]}]
1494                }
1495                "#
1496            )
1497            .map_err(Into::into)
1498            .and_then(|v| Message::try_from(&Wv::from(&v)))?
1499        );
1500
1501        Ok(())
1502    }
1503
1504    #[allow(clippy::too_many_lines)]
1505    #[test]
1506    fn header_from_value() -> Result<()> {
1507        let v = serde_json::from_str::<Value>(
1508            r#"
1509            {
1510              "type": "header",
1511              "name": "RequestHeader",
1512              "validVersions": "0-2",
1513              "flexibleVersions": "2+",
1514              "fields": [
1515                { "name": "RequestApiKey", "type": "int16", "versions": "0+",
1516                  "about": "The API key of this request." },
1517                { "name": "RequestApiVersion", "type": "int16", "versions": "0+",
1518                  "about": "The API version of this request." },
1519                { "name": "CorrelationId", "type": "int32", "versions": "0+",
1520                  "about": "The correlation ID of this request." },
1521
1522                { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
1523                  "flexibleVersions": "none", "about": "The client ID string." }
1524              ]
1525            }
1526            "#,
1527        )?;
1528
1529        let wv = Wv::from(&v);
1530
1531        assert_eq!(
1532            Header {
1533                name: String::from("RequestHeader"),
1534                valid: VersionRange { start: 0, end: 2 },
1535                flexible: VersionRange {
1536                    start: 2,
1537                    end: i16::MAX
1538                },
1539                fields: vec![
1540                    Field {
1541                        name: String::from("RequestApiKey"),
1542                        kind: Kind(String::from("int16")),
1543                        about: Some(String::from("The API key of this request.")),
1544                        versions: VersionRange {
1545                            start: 0,
1546                            end: 32767
1547                        },
1548                        map_key: None,
1549                        nullable: None,
1550                        tag: None,
1551                        tagged: None,
1552                        entity_type: None,
1553                        default: None,
1554                        fields: None
1555                    },
1556                    Field {
1557                        name: String::from("RequestApiVersion"),
1558                        kind: Kind(String::from("int16")),
1559                        about: Some(String::from("The API version of this request.")),
1560                        versions: VersionRange {
1561                            start: 0,
1562                            end: 32767
1563                        },
1564                        map_key: None,
1565                        nullable: None,
1566                        tag: None,
1567                        tagged: None,
1568                        entity_type: None,
1569                        default: None,
1570                        fields: None
1571                    },
1572                    Field {
1573                        name: String::from("CorrelationId"),
1574                        kind: Kind(String::from("int32")),
1575                        about: Some(String::from("The correlation ID of this request.")),
1576                        versions: VersionRange {
1577                            start: 0,
1578                            end: 32767
1579                        },
1580                        map_key: None,
1581                        nullable: None,
1582                        tag: None,
1583                        tagged: None,
1584                        entity_type: None,
1585                        default: None,
1586                        fields: None
1587                    },
1588                    Field {
1589                        name: String::from("ClientId"),
1590                        kind: Kind(String::from("string")),
1591                        about: Some(String::from("The client ID string.")),
1592                        versions: VersionRange {
1593                            start: 1,
1594                            end: 32767
1595                        },
1596                        map_key: None,
1597                        nullable: Some(VersionRange {
1598                            start: 1,
1599                            end: 32767
1600                        }),
1601                        tag: None,
1602                        tagged: None,
1603                        entity_type: None,
1604                        default: None,
1605                        fields: None
1606                    }
1607                ],
1608            },
1609            Header::try_from(&wv)?
1610        );
1611        Ok(())
1612    }
1613
1614    #[test]
1615    fn parse_expression() -> Result<()> {
1616        let Expr::Array(expression) =
1617            syn::parse_str::<Expr>(r#"[(one, "a/b/c"), (abc, 123), (pqr, a::b::c)]"#)?
1618        else {
1619            return Err(Error::Message(String::from("expecting an array")));
1620        };
1621
1622        let mut mappings = HashMap::new();
1623
1624        for expression in expression.elems {
1625            let Expr::Tuple(tuple) = expression else {
1626                return Err(Error::Message(String::from("expecting a tuple")));
1627            };
1628
1629            assert_eq!(2, tuple.elems.len());
1630
1631            println!(
1632                "i: {}, ty: {}",
1633                tuple.to_token_stream(),
1634                type_name_of_val(&tuple)
1635            );
1636
1637            let Expr::Path(ref lhs) = tuple.elems[0] else {
1638                return Err(Error::Message(String::from(
1639                    "lhs expecting a path expression",
1640                )));
1641            };
1642
1643            let Some(lhs) = lhs.path.get_ident() else {
1644                return Err(Error::Message(String::from(
1645                    "lhs expecting a path ident expression",
1646                )));
1647            };
1648
1649            _ = mappings.insert(lhs.clone(), tuple.elems[1].clone());
1650
1651            println!("lhs: {}", lhs.to_token_stream());
1652            println!("rhs: {}", tuple.elems[1].to_token_stream());
1653        }
1654
1655        let one = syn::parse_str::<Ident>("one")?;
1656        assert!(mappings.contains_key(&one));
1657
1658        Ok(())
1659    }
1660
1661    #[test]
1662    fn find_coordinator_request() -> Result<()> {
1663        let m = serde_json::from_str::<Value>(
1664            r#"
1665                {
1666                    "apiKey": 10,
1667                    "type": "request",
1668                    "listeners": ["zkBroker", "broker"],
1669                    "name": "FindCoordinatorRequest",
1670                    "validVersions": "0-4",
1671                    "deprecatedVersions": "0",
1672                    "flexibleVersions": "3+",
1673                    "fields": [
1674                        { "name": "Key", "type": "string", "versions": "0-3",
1675                        "about": "The coordinator key." },
1676                        { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
1677                        "about": "The coordinator key type. (Group, transaction, etc.)" },
1678                        { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
1679                        "about": "The coordinator keys." }
1680                    ]
1681                }
1682            "#,
1683        )
1684        .map_err(Into::into)
1685        .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1686
1687        assert!(!m.has_records());
1688        assert_eq!(MessageKind::Request, m.kind());
1689        assert_eq!("FindCoordinatorRequest", m.name());
1690        assert_eq!(
1691            Version {
1692                valid: VersionRange { start: 0, end: 4 },
1693                deprecated: Some(VersionRange { start: 0, end: 0 }),
1694                flexible: VersionRange {
1695                    start: 3,
1696                    end: i16::MAX
1697                },
1698            },
1699            m.version()
1700        );
1701
1702        assert_eq!("Key", m.fields()[0].name());
1703        assert_eq!(Kind::new("string"), m.fields()[0].kind);
1704        assert_eq!(VersionRange { start: 0, end: 3 }, m.fields()[0].versions());
1705        assert_eq!(Some("The coordinator key."), m.fields()[0].about());
1706
1707        Ok(())
1708    }
1709
1710    #[test]
1711    fn fetch_response() -> Result<()> {
1712        let m = serde_json::from_str::<Value>(
1713            r#"
1714                {
1715                    "apiKey": 1,
1716                    "type": "response",
1717                    "name": "FetchResponse",
1718                    "validVersions": "0-16",
1719                    "flexibleVersions": "12+",
1720                    "fields": [
1721                        { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0,
1722                          "about": "Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [
1723                          { "name": "NodeId", "type": "int32", "versions": "16+",
1724                            "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
1725                          { "name": "Host", "type": "string", "versions": "16+", "about": "The node's hostname." },
1726                          { "name": "Port", "type": "int32", "versions": "16+", "about": "The node's port." },
1727                          { "name": "Rack", "type": "string", "versions": "16+", "nullableVersions": "16+", "default": "null",
1728                            "about": "The rack of the node, or null if it has not been assigned to a rack." }
1729                        ]}
1730                    ]
1731                }
1732            "#,
1733        )
1734        .map_err(Into::into)
1735        .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1736
1737        assert_eq!(MessageKind::Response, m.kind());
1738        assert_eq!("FetchResponse", m.name());
1739        assert_eq!(
1740            Version {
1741                valid: VersionRange { start: 0, end: 16 },
1742                deprecated: None,
1743                flexible: VersionRange {
1744                    start: 12,
1745                    end: i16::MAX
1746                },
1747            },
1748            m.version()
1749        );
1750
1751        assert_eq!("NodeEndpoints", m.fields()[0].name());
1752        assert_eq!(Kind::new("[]NodeEndpoint"), m.fields()[0].kind);
1753        assert_eq!(
1754            VersionRange {
1755                start: 16,
1756                end: i16::MAX
1757            },
1758            m.fields()[0].versions()
1759        );
1760
1761        let node_id = &m.fields()[0].fields().unwrap_or_default()[0];
1762
1763        assert_eq!("NodeId", node_id.name());
1764        assert_eq!(Kind::new("int32"), node_id.kind);
1765        assert_eq!(
1766            VersionRange {
1767                start: 16,
1768                end: i16::MAX
1769            },
1770            node_id.versions()
1771        );
1772
1773        Ok(())
1774    }
1775}