tansu_model/
lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15//! Structures representing Kafka JSON protocol definitions.
16//!
17//! This crate converts Kafka JSON protocol definitions into structures
18//! that can be easily used during the Tansu Sans I/O build process.
19
20pub mod error;
21pub mod wv;
22
23use convert_case::{Case, Casing};
24pub use error::Error;
25use lazy_static::lazy_static;
26use proc_macro2::{Ident, Span, TokenStream};
27use quote::ToTokens;
28use regex::Regex;
29use serde_json::Value;
30use std::{collections::BTreeMap, fmt, str::FromStr};
31use syn::{Expr, Type};
32use tracing::debug;
33use wv::{As, AsOption, Wv};
34
35macro_rules! prefix_crate {
36    ($e:ident) => {
37        format!("{}::{}", env!("CARGO_CRATE_NAME"), stringify!($e))
38    };
39}
40
41macro_rules! with_crate {
42    ($e:expr_2021) => {
43        format!("{}::{:?}", env!("CARGO_CRATE_NAME"), $e)
44    };
45
46    ($e:expr_2021, $f:expr_2021) => {
47        format!("{}::{}::{:?}", env!("CARGO_CRATE_NAME"), $e, $f)
48    };
49}
50
51pub type Result<T, E = Error> = std::result::Result<T, E>;
52
53fn type_mapping(kind: &str) -> String {
54    match kind {
55        "bytes" => String::from("bytes::Bytes"),
56        "float64" => String::from("f64"),
57        "int16" => String::from("i16"),
58        "int32" => String::from("i32"),
59        "int64" => String::from("i64"),
60        "int8" => String::from("i8"),
61        "string" => String::from("String"),
62        "uint16" => String::from("u16"),
63        "uuid" => String::from("[u8; 16]"),
64        "records" => String::from("crate::RecordBatch"),
65
66        sequence if sequence.starts_with("[]") => type_mapping(&sequence[2..]),
67
68        s => String::from(s),
69    }
70}
71
72#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
73/// The Kafka field kind (type).
74pub struct Kind(String);
75
76impl ToTokens for Kind {
77    fn to_tokens(&self, tokens: &mut TokenStream) {
78        let expr = with_crate!(self);
79        syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
80    }
81}
82
83impl From<Type> for Kind {
84    fn from(value: Type) -> Self {
85        Self(value.to_token_stream().to_string())
86    }
87}
88
89lazy_static! {
90    static ref PRIMITIVE: &'static [&'static str] = &[
91        "bool", "bytes", "float64", "int16", "int32", "int64", "int8", "records", "string",
92        "uint16", "uuid",
93    ];
94}
95
96impl Kind {
97    #[must_use]
98    pub fn new(kind: &str) -> Self {
99        Self(kind.to_owned())
100    }
101
102    #[must_use]
103    pub fn name(&self) -> &str {
104        &self.0[..]
105    }
106
107    #[must_use]
108    #[allow(clippy::missing_panics_doc)]
109    /// The Rust type name of this kind.
110    pub fn type_name(&self) -> Type {
111        syn::parse_str::<Type>(&type_mapping(&self.0))
112            .unwrap_or_else(|_| panic!("not a type: {self:?}"))
113    }
114
115    #[must_use]
116    /// Returns true if this kind is a sequence (array)
117    pub fn is_sequence(&self) -> bool {
118        self.0.starts_with("[]")
119    }
120
121    #[must_use]
122    /// Returns true is this kind is a Kafka primitive (defined) type
123    pub fn is_primitive(&self) -> bool {
124        PRIMITIVE.contains(&self.0.as_str())
125    }
126
127    #[must_use]
128    /// Returns true if this kind is a float64
129    pub fn is_float(&self) -> bool {
130        self.0.eq("float64")
131    }
132
133    #[must_use]
134    /// Returns true if this is a sequence of Kafka primitive types
135    pub fn is_sequence_of_primitive(&self) -> bool {
136        self.is_sequence() && PRIMITIVE.contains(&&self.0[2..])
137    }
138
139    #[must_use]
140    /// Optionally when the kind is a sequence, returns the kind of the sequence
141    pub fn kind_of_sequence(&self) -> Option<Self> {
142        if self.is_sequence() {
143            Some(Self::new(&self.0[2..]))
144        } else {
145            None
146        }
147    }
148}
149
150impl FromStr for Kind {
151    type Err = Error;
152
153    fn from_str(s: &str) -> Result<Self, Self::Err> {
154        Ok(Self(s.to_owned()))
155    }
156}
157
158impl TryFrom<&Value> for Kind {
159    type Error = Error;
160
161    fn try_from(value: &Value) -> Result<Self, Self::Error> {
162        as_str(value, "type").and_then(Self::from_str)
163    }
164}
165
166#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
167/// The listener for this message type.
168pub enum Listener {
169    ZkBroker,
170    #[default]
171    Broker,
172    Controller,
173}
174
175impl TryFrom<&Value> for Listener {
176    type Error = Error;
177
178    fn try_from(value: &Value) -> Result<Self, Self::Error> {
179        value
180            .as_str()
181            .ok_or(Error::Message(String::from(
182                "expecting string for listener",
183            )))
184            .and_then(Self::from_str)
185    }
186}
187
188impl FromStr for Listener {
189    type Err = Error;
190
191    fn from_str(s: &str) -> Result<Self, Self::Err> {
192        match s {
193            "zkBroker" => Ok(Self::ZkBroker),
194            "broker" => Ok(Self::Broker),
195            "controller" => Ok(Self::Controller),
196            s => Err(Error::Message(String::from(s))),
197        }
198    }
199}
200
201#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
202/// Model request and response types only
203pub enum MessageKind {
204    #[default]
205    Request,
206    Response,
207}
208
209impl ToTokens for MessageKind {
210    fn to_tokens(&self, tokens: &mut TokenStream) {
211        let expr = with_crate!("MessageKind", self);
212        syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
213    }
214}
215
216impl TryFrom<&Value> for MessageKind {
217    type Error = Error;
218
219    fn try_from(value: &Value) -> Result<Self, Self::Error> {
220        as_str(value, "type").and_then(Self::from_str)
221    }
222}
223
224impl FromStr for MessageKind {
225    type Err = Error;
226
227    fn from_str(s: &str) -> Result<Self, Self::Err> {
228        match s {
229            "request" => Ok(Self::Request),
230            "response" => Ok(Self::Response),
231            s => Err(Error::Message(String::from(s))),
232        }
233    }
234}
235
236#[derive(Clone, Copy, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
237/// A range of versions.
238pub struct VersionRange {
239    pub start: i16,
240    pub end: i16,
241}
242
243impl fmt::Debug for VersionRange {
244    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245        f.debug_struct(&prefix_crate!(VersionRange))
246            .field("start", &self.start)
247            .field("end", &self.end)
248            .finish()
249    }
250}
251
252impl VersionRange {
253    #[must_use]
254    pub fn within(&self, version: i16) -> bool {
255        version >= self.start && version <= self.end
256    }
257
258    #[must_use]
259    pub fn is_mandatory(&self, parent: Option<VersionRange>) -> bool {
260        parent.map_or(
261            self.start == 0 && self.end == i16::MAX,
262            |VersionRange { start, end }| {
263                if start > self.start {
264                    self.end == end
265                } else {
266                    self.start == start && self.end == end
267                }
268            },
269        )
270    }
271}
272
273impl ToTokens for VersionRange {
274    fn to_tokens(&self, tokens: &mut TokenStream) {
275        let expr = format!("{self:?}");
276        syn::parse_str::<Expr>(&expr)
277            .unwrap_or_else(|_| panic!("an expression: {self:?}"))
278            .to_tokens(tokens);
279    }
280}
281
282impl FromStr for VersionRange {
283    type Err = Error;
284
285    fn from_str(s: &str) -> Result<Self, Self::Err> {
286        lazy_static! {
287            static ref RX: Regex =
288                Regex::new(r"^(?<start>\d+)(-(?<end>\d+)|(?<infinite>\+))?$").expect("regex");
289        }
290
291        if s == "none" {
292            Ok(VersionRange { start: 0, end: 0 })
293        } else {
294            RX.captures(s)
295                .ok_or(Error::Message(format!("invalid version range format: {s}")))
296                .and_then(|captures| {
297                    let parse = |name, default: i16| {
298                        captures
299                            .name(name)
300                            .map_or(Ok(&default.to_string()[..]), |s| Ok(s.as_str()))
301                            .and_then(str::parse)
302                    };
303
304                    let start = parse("start", 0)?;
305
306                    let end = if let Some(end) = captures.name("end") {
307                        str::parse(end.as_str())?
308                    } else if captures.name("infinite").is_some() {
309                        i16::MAX
310                    } else {
311                        start
312                    };
313
314                    Ok(VersionRange { start, end })
315                })
316        }
317    }
318}
319
320#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
321/// The validity, deprecation and flexible version ranges of a Kafka API message.
322pub struct Version {
323    /// The valid version ranges of this Kafka message.
324    pub valid: VersionRange,
325    /// The deprecated version range of this Kafka message.
326    pub deprecated: Option<VersionRange>,
327    /// The range of versions where this message uses flexible encoding.
328    pub flexible: VersionRange,
329}
330
331impl ToTokens for Version {
332    fn to_tokens(&self, tokens: &mut TokenStream) {
333        let expr = with_crate!(self);
334        syn::parse_str::<Expr>(&expr).unwrap().to_tokens(tokens);
335    }
336}
337
338impl Version {
339    #[must_use]
340    pub fn valid(&self) -> VersionRange {
341        self.valid
342    }
343
344    #[must_use]
345    pub fn deprecated(&self) -> Option<VersionRange> {
346        self.deprecated
347    }
348
349    #[must_use]
350    pub fn flexible(&self) -> VersionRange {
351        self.flexible
352    }
353}
354
355impl<'a> TryFrom<&Wv<'a>> for Version {
356    type Error = Error;
357
358    fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
359        debug!("value: {:?}", value);
360
361        Ok(Self {
362            valid: value.as_a("validVersions")?,
363            deprecated: value.as_option("deprecatedVersions")?,
364            flexible: value.as_a("flexibleVersions")?,
365        })
366    }
367}
368
369#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
370/// Kafka message field schema
371pub struct Field {
372    /// The name of this field.
373    name: String,
374    /// The Kafka type of this field.
375    kind: Kind,
376    /// A comment about this field.
377    about: Option<String>,
378    /// The version range for this field.
379    versions: VersionRange,
380    /// Whether this field can be used as the key in a map.
381    map_key: Option<bool>,
382    /// Whether this field can be null.
383    nullable: Option<VersionRange>,
384    /// The tag ID for this field
385    tag: Option<u32>,
386    /// The version range in which this field can be tagged.
387    tagged: Option<VersionRange>,
388    /// The entity type of this field.
389    entity_type: Option<String>,
390    /// The default value of this field.
391    default: Option<String>,
392    /// Any fields this field contains.
393    fields: Option<Vec<Field>>,
394}
395
396impl Field {
397    #[must_use]
398    pub fn ident(&self) -> Ident {
399        match self.name.to_case(Case::Snake) {
400            reserved if is_reserved_keyword(&reserved) => {
401                Ident::new_raw(&reserved, Span::call_site())
402            }
403
404            otherwise => Ident::new(&otherwise, Span::call_site()),
405        }
406    }
407
408    #[must_use]
409    pub fn name(&self) -> &str {
410        &self.name
411    }
412
413    #[must_use]
414    pub fn kind(&self) -> &Kind {
415        &self.kind
416    }
417
418    #[must_use]
419    pub fn fields(&self) -> Option<&[Field]> {
420        self.fields.as_deref()
421    }
422
423    #[must_use]
424    pub fn versions(&self) -> VersionRange {
425        self.versions
426    }
427
428    #[must_use]
429    pub fn nullable(&self) -> Option<VersionRange> {
430        self.nullable
431    }
432
433    #[must_use]
434    pub fn tagged(&self) -> Option<VersionRange> {
435        self.tagged
436    }
437
438    #[must_use]
439    pub fn tag(&self) -> Option<u32> {
440        self.tag
441    }
442
443    #[must_use]
444    pub fn about(&self) -> Option<&str> {
445        self.about.as_deref()
446    }
447
448    #[must_use]
449    pub fn has_tags(&self) -> bool {
450        self.tagged.is_some()
451    }
452
453    #[must_use]
454    pub fn has_records(&self) -> bool {
455        self.kind.0 == "records"
456            || self
457                .fields()
458                .is_some_and(|fields| fields.iter().any(Field::has_records))
459    }
460
461    #[must_use]
462    pub fn has_float(&self) -> bool {
463        self.kind().is_float()
464            || self
465                .fields()
466                .is_some_and(|fields| fields.iter().any(Field::has_float))
467    }
468}
469
470impl<'a> TryFrom<&Wv<'a>> for Field {
471    type Error = Error;
472
473    fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
474        let name = value.as_a("name")?;
475        let kind = value.as_a("type")?;
476        let about = value.as_option("about")?;
477        let versions = value.as_a("versions")?;
478        let map_key = value.as_option("mapKey")?;
479        let nullable = value.as_option("nullableVersions")?;
480        let tag = value.as_option("tag")?;
481        let tagged = value.as_option("taggedVersions")?;
482        let entity_type = value.as_option("entityType")?;
483        let default = value.as_option("default")?;
484
485        let fields = value
486            .as_option("fields")?
487            .map_or(Ok(None), |values: &[Value]| {
488                values
489                    .iter()
490                    .try_fold(Vec::new(), |mut acc, field| {
491                        Field::try_from(&Wv::from(field)).map(|f| {
492                            acc.push(f);
493                            acc
494                        })
495                    })
496                    .map(Some)
497            })?;
498
499        Ok(Field {
500            name,
501            kind,
502            about,
503            versions,
504            map_key,
505            nullable,
506            tag,
507            tagged,
508            entity_type,
509            default,
510            fields,
511        })
512    }
513}
514
515fn is_reserved_keyword(s: &str) -> bool {
516    lazy_static! {
517        static ref RESERVED: Vec<&'static str> = vec!["as",
518        "break",
519        "const",
520        "continue",
521        "crate",
522        "else",
523        "enum",
524        "extern",
525        "false",
526        "fn",
527        "for",
528        "if",
529        "impl",
530        "in",
531        "let",
532        "loop",
533        "match",
534        "mod",
535        "move",
536        "mut",
537        "pub",
538        "ref",
539        "return",
540        "self",
541        "Self",
542        "static",
543        "struct",
544        "super",
545        "trait",
546        "true",
547        "type",
548        "unsafe",
549        "use",
550        "where",
551        "while",
552
553        // 2018 edition
554        "async",
555        "await",
556        "dyn",
557
558        // reserved
559        "abstract",
560        "become",
561        "box",
562        "do",
563        "final",
564        "macro",
565        "override",
566        "priv",
567        "typeof",
568        "unsized",
569        "virtual",
570        "yield",
571
572        // reserved 2018 edition
573        "try",
574        ];
575    }
576
577    RESERVED.contains(&s)
578}
579
580#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
581struct Header {
582    name: String,
583    valid: VersionRange,
584    flexible: VersionRange,
585    fields: Vec<Field>,
586}
587
588impl<'a> TryFrom<&Wv<'a>> for Header {
589    type Error = Error;
590
591    fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
592        let fields: &[Value] = value.as_a("fields")?;
593
594        Ok(Header {
595            name: value.as_a("name")?,
596            valid: value.as_a("validVersions")?,
597            flexible: value.as_a("flexibleVersions")?,
598            fields: fields.iter().try_fold(Vec::new(), |mut acc, field| {
599                Field::try_from(&Wv::from(field)).map(|f| {
600                    acc.push(f);
601                    acc
602                })
603            })?,
604        })
605    }
606}
607
608#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
609pub struct Message {
610    api_key: i16,
611    kind: MessageKind,
612    listeners: Option<Vec<Listener>>,
613    name: String,
614    versions: Version,
615    fields: Vec<Field>,
616    common_structs: Option<Vec<CommonStruct>>,
617}
618
619impl Message {
620    #[must_use]
621    pub fn api_key(&self) -> i16 {
622        self.api_key
623    }
624
625    #[must_use]
626    pub fn kind(&self) -> MessageKind {
627        self.kind
628    }
629
630    #[must_use]
631    pub fn name(&self) -> &str {
632        &self.name
633    }
634
635    #[must_use]
636    #[allow(clippy::missing_panics_doc)]
637    pub fn type_name(&self) -> Type {
638        syn::parse_str::<Type>(&self.name).unwrap()
639    }
640
641    #[must_use]
642    pub fn version(&self) -> Version {
643        self.versions
644    }
645
646    #[must_use]
647    pub fn listeners(&self) -> Option<&[Listener]> {
648        self.listeners.as_deref()
649    }
650
651    #[must_use]
652    pub fn fields(&self) -> &[Field] {
653        &self.fields
654    }
655
656    #[must_use]
657    #[allow(clippy::missing_panics_doc)]
658    pub fn wrapper_new_type(&self, field: &Field) -> Type {
659        syn::parse_str::<Type>(&format!("{}{}", self.name, field.name).to_case(Case::Pascal))
660            .unwrap()
661    }
662
663    #[must_use]
664    pub fn common_structs(&self) -> Option<&[CommonStruct]> {
665        self.common_structs.as_deref()
666    }
667
668    #[must_use]
669    pub fn has_records(&self) -> bool {
670        self.fields().iter().any(Field::has_records)
671    }
672
673    #[must_use]
674    pub fn has_tags(&self) -> bool {
675        self.fields().iter().any(Field::has_tags)
676    }
677
678    #[must_use]
679    pub fn has_float(&self) -> bool {
680        self.fields.iter().any(|field| field.kind().is_float())
681            || self
682                .common_structs()
683                .is_some_and(|structures| structures.iter().any(CommonStruct::has_float))
684    }
685}
686
687impl<'a> TryFrom<&Wv<'a>> for Message {
688    type Error = Error;
689
690    fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
691        let api_key = value.as_a("apiKey")?;
692        let name = value.as_a("name")?;
693        let kind = value.as_a("type")?;
694        let listeners = value
695            .as_option("listeners")?
696            .map_or(Ok(None), |maybe: &[Value]| {
697                maybe
698                    .iter()
699                    .try_fold(Vec::new(), |mut acc, listener| {
700                        Listener::try_from(listener).map(|l| {
701                            acc.push(l);
702                            acc
703                        })
704                    })
705                    .map(Some)
706            })?;
707
708        let fields = value.as_a("fields").and_then(|fields: &[Value]| {
709            fields.iter().try_fold(Vec::new(), |mut acc, field| {
710                Field::try_from(&Wv::from(field)).map(|f| {
711                    acc.push(f);
712                    acc
713                })
714            })
715        })?;
716
717        let versions = Version::try_from(value)?;
718        let common_structs =
719            value
720                .as_option("commonStructs")?
721                .map_or(Ok(None), |maybe: &[Value]| {
722                    maybe
723                        .iter()
724                        .try_fold(Vec::new(), |mut acc, value| {
725                            CommonStruct::try_from(&Wv::from(value)).map(|field| {
726                                acc.push(field);
727                                acc
728                            })
729                        })
730                        .map(Some)
731                })?;
732
733        Ok(Self {
734            api_key,
735            kind,
736            listeners,
737            name,
738            versions,
739            fields,
740            common_structs,
741        })
742    }
743}
744
745#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
746pub struct CommonStruct {
747    name: String,
748    fields: Vec<Field>,
749}
750
751impl CommonStruct {
752    #[must_use]
753    pub fn name(&self) -> &str {
754        self.name.as_str()
755    }
756
757    #[must_use]
758    #[allow(clippy::missing_panics_doc)]
759    pub fn type_name(&self) -> Type {
760        syn::parse_str::<Type>(&self.name).unwrap_or_else(|_| panic!("not a type: {self:?}"))
761    }
762
763    #[must_use]
764    pub fn fields(&self) -> &Vec<Field> {
765        &self.fields
766    }
767
768    #[must_use]
769    pub fn has_float(&self) -> bool {
770        self.fields.iter().any(|field| field.kind().is_float())
771    }
772}
773
774impl<'a> TryFrom<&Wv<'a>> for CommonStruct {
775    type Error = Error;
776
777    fn try_from(value: &Wv<'a>) -> Result<Self, Self::Error> {
778        let name = value.as_a("name")?;
779        let fields = value.as_a("fields").and_then(|fields: &[Value]| {
780            fields.iter().try_fold(Vec::new(), |mut acc, field| {
781                Field::try_from(&Wv::from(field)).map(|f| {
782                    acc.push(f);
783                    acc
784                })
785            })
786        })?;
787
788        Ok(Self { name, fields })
789    }
790}
791
792#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
793pub struct HeaderMeta {
794    pub name: &'static str,
795    pub valid: VersionRange,
796    pub flexible: VersionRange,
797    pub fields: &'static [(&'static str, &'static FieldMeta)],
798}
799
800#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
801/// Kafka API message metadata.
802pub struct MessageMeta {
803    /// The name of the Kafka API message.
804    pub name: &'static str,
805    /// The API key used by this message.
806    pub api_key: i16,
807    /// The version ranges for this message.
808    pub version: Version,
809    /// The message kind of this message.
810    pub message_kind: MessageKind,
811    /// The fields that this message describes.
812    pub fields: &'static [(&'static str, &'static FieldMeta)],
813}
814
815impl MessageMeta {
816    #[must_use]
817    pub fn is_flexible(&self, version: i16) -> bool {
818        self.version.flexible.within(version)
819    }
820
821    #[must_use]
822    pub fn structures(&self) -> BTreeMap<&str, &FieldMeta> {
823        self.fields.iter().filter(|(_, fm)| fm.is_structure()).fold(
824            BTreeMap::new(),
825            |mut acc, (name, fm)| {
826                debug!(name = self.name, field = ?name, kind = ?fm.kind.0);
827
828                if let Some(kind) = fm.kind.kind_of_sequence() {
829                    if !kind.is_primitive() {
830                        _ = acc.insert(kind.name(), fm);
831                    }
832                } else {
833                    _ = acc.insert(fm.kind.name(), fm);
834                }
835
836                let mut children = fm.structures();
837                debug!(name = self.name, field = ?name, children = ?children.keys().collect::<Vec<_>>());
838                acc.append(&mut children);
839
840                acc
841            },
842        )
843    }
844
845    #[must_use]
846    pub fn field(&self, name: &str) -> Option<&'static FieldMeta> {
847        self.fields
848            .iter()
849            .find(|(found, _)| name == *found)
850            .map(|(_, meta)| *meta)
851    }
852}
853
854#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
855/// Kafka API message field metadata
856pub struct FieldMeta {
857    /// The version range of this field.
858    pub version: VersionRange,
859    /// The version range where this field may be null.
860    pub nullable: Option<VersionRange>,
861    /// The kind (type) metadata of this field.
862    pub kind: KindMeta,
863    /// When present the tag ID of this field.
864    pub tag: Option<u32>,
865    /// The range of versions where this field is tagged.
866    pub tagged: Option<VersionRange>,
867    /// The fields contained within this structure.
868    pub fields: &'static [(&'static str, &'static FieldMeta)],
869}
870
871impl FieldMeta {
872    #[must_use]
873    pub fn is_nullable(&self, version: i16) -> bool {
874        self.nullable.is_some_and(|range| range.within(version))
875    }
876
877    #[must_use]
878    pub fn is_mandatory(&self, parent: Option<VersionRange>) -> bool {
879        self.version.is_mandatory(parent)
880    }
881
882    #[must_use]
883    pub fn is_structure(&self) -> bool {
884        self.kind
885            .kind_of_sequence()
886            .is_some_and(|sk| !sk.is_primitive())
887            || !self.fields.is_empty()
888    }
889
890    #[must_use]
891    pub fn structures(&self) -> BTreeMap<&str, &FieldMeta> {
892        self.fields.iter().filter(|(_, fm)| fm.is_structure()).fold(
893            BTreeMap::new(),
894            |mut acc, (_name, fm)| {
895                if let Some(kind) = fm.kind.kind_of_sequence() {
896                    if !kind.is_primitive() {
897                        _ = acc.insert(kind.name(), fm);
898                    }
899                }
900
901                let mut children = fm.structures();
902                acc.append(&mut children);
903                acc
904            },
905        )
906    }
907
908    pub fn field(&self, name: &str) -> Option<&FieldMeta> {
909        self.fields
910            .iter()
911            .find(|field| name == field.0)
912            .map(|(_, meta)| *meta)
913    }
914}
915
916#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
917pub struct KindMeta(pub &'static str);
918
919impl KindMeta {
920    #[must_use]
921    pub fn name(&self) -> &'static str {
922        self.0
923    }
924
925    #[must_use]
926    pub fn is_sequence(&self) -> bool {
927        self.0.starts_with("[]")
928    }
929
930    #[must_use]
931    pub fn is_primitive(&self) -> bool {
932        PRIMITIVE.contains(&self.0)
933    }
934
935    #[must_use]
936    pub fn is_string(&self) -> bool {
937        self.0 == "string"
938    }
939
940    #[must_use]
941    pub fn is_records(&self) -> bool {
942        self.0 == "records"
943    }
944
945    #[must_use]
946    pub fn kind_of_sequence(&self) -> Option<Self> {
947        if self.is_sequence() {
948            Some(Self(&self.0[2..]))
949        } else {
950            None
951        }
952    }
953}
954
955fn as_str<'v>(value: &'v Value, name: &str) -> Result<&'v str> {
956    value[name]
957        .as_str()
958        .ok_or(Error::Message(String::from(name)))
959}
960
961#[cfg(test)]
962mod tests {
963    use std::{any::type_name_of_val, collections::HashMap};
964
965    use serde_json::json;
966
967    use super::*;
968
969    const PRIMITIVES: [&str; 10] = [
970        "bool", "bytes", "float64", "int16", "int32", "int64", "int8", "string", "uint16", "uuid",
971    ];
972
973    #[test]
974    fn type_mapping_test() {
975        assert_eq!("bytes::Bytes", type_mapping("bytes"));
976        assert_eq!("f64", type_mapping("float64"));
977        assert_eq!("i16", type_mapping("int16"));
978        assert_eq!("i32", type_mapping("int32"));
979        assert_eq!("i64", type_mapping("int64"));
980        assert_eq!("i8", type_mapping("int8"));
981        assert_eq!("String", type_mapping("string"));
982        assert_eq!("u16", type_mapping("uint16"));
983        assert_eq!("[u8; 16]", type_mapping("uuid"));
984
985        assert_eq!("i16", type_mapping("[]int16"));
986        assert_eq!("SomeType", type_mapping("[]SomeType"));
987
988        assert_eq!("SomeType", type_mapping("SomeType"));
989    }
990
991    #[test]
992    fn kind_is_sequence() {
993        for primitive in PRIMITIVES {
994            let k = Kind::new(primitive);
995            assert!(!k.is_sequence());
996        }
997
998        let sequences = vec!["[]int16", "[]string", "[]SomeType"];
999
1000        for sequence in sequences {
1001            let k = Kind::new(sequence);
1002            assert!(k.is_sequence());
1003        }
1004    }
1005
1006    #[test]
1007    fn kind_is_sequence_of_primitive() {
1008        for primitive in PRIMITIVES {
1009            let k = Kind::new(primitive);
1010            assert!(!k.is_sequence_of_primitive());
1011        }
1012
1013        let primitive_sequences: Vec<String> = PRIMITIVES
1014            .iter()
1015            .map(|primitive| format!("[]{primitive}"))
1016            .collect();
1017
1018        for sequence in primitive_sequences {
1019            let k = Kind::new(sequence.as_str());
1020            assert!(k.is_sequence_of_primitive());
1021        }
1022
1023        let sequences = vec!["[]SomeType", "[]OtherType"];
1024
1025        for sequence in sequences {
1026            let k = Kind::new(sequence);
1027            assert!(!k.is_sequence_of_primitive());
1028        }
1029    }
1030
1031    #[test]
1032    fn kind_is_primitive() {
1033        for primitive in PRIMITIVES {
1034            let k = Kind::new(primitive);
1035            assert!(k.is_primitive());
1036        }
1037
1038        let primitive_sequences: Vec<String> = PRIMITIVES
1039            .iter()
1040            .map(|primitive| format!("[]{primitive}"))
1041            .collect();
1042
1043        for sequence in primitive_sequences {
1044            let k = Kind::new(sequence.as_str());
1045            assert!(!k.is_primitive());
1046        }
1047    }
1048
1049    #[test]
1050    fn listener_from_value() -> Result<()> {
1051        assert_eq!(Listener::ZkBroker, Listener::try_from(&json!("zkBroker"))?);
1052
1053        assert_eq!(Listener::Broker, Listener::try_from(&json!("broker"))?);
1054
1055        assert_eq!(
1056            Listener::Controller,
1057            Listener::try_from(&json!("controller"))?
1058        );
1059
1060        Ok(())
1061    }
1062
1063    #[test]
1064    fn message_kind_from_value() -> Result<()> {
1065        assert_eq!(
1066            MessageKind::Request,
1067            MessageKind::try_from(&json!({
1068                "type": "request"
1069            }
1070            ))?
1071        );
1072
1073        assert_eq!(
1074            MessageKind::Response,
1075            MessageKind::try_from(&json!({
1076                "type": "response"
1077            }
1078            ))?
1079        );
1080
1081        Ok(())
1082    }
1083
1084    #[test]
1085    fn version_range_from_str() -> Result<()> {
1086        assert_eq!(
1087            VersionRange { start: 0, end: 0 },
1088            VersionRange::from_str("none")?
1089        );
1090
1091        assert_eq!(
1092            VersionRange {
1093                start: 3,
1094                end: i16::MAX
1095            },
1096            VersionRange::from_str("3+")?
1097        );
1098
1099        assert_eq!(
1100            VersionRange { start: 6, end: 9 },
1101            VersionRange::from_str("6-9")?
1102        );
1103
1104        assert_eq!(
1105            VersionRange { start: 1, end: 1 },
1106            VersionRange::from_str("1")?
1107        );
1108
1109        Ok(())
1110    }
1111
1112    #[test]
1113    fn version_range_within() {
1114        {
1115            let range = VersionRange { start: 0, end: 0 };
1116            assert!(!range.within(i16::MIN));
1117            assert!(range.within(0));
1118            assert!(!range.within(1));
1119            assert!(!range.within(i16::MAX));
1120        }
1121
1122        {
1123            let range = VersionRange {
1124                start: 3,
1125                end: i16::MAX,
1126            };
1127            assert!(!range.within(i16::MIN));
1128            assert!(!range.within(2));
1129            assert!(range.within(3));
1130            assert!(range.within(i16::MAX));
1131        }
1132
1133        {
1134            let range = VersionRange { start: 6, end: 9 };
1135            assert!(!range.within(i16::MIN));
1136            assert!(!range.within(5));
1137            assert!(range.within(6));
1138            assert!(range.within(7));
1139            assert!(range.within(8));
1140            assert!(range.within(9));
1141            assert!(!range.within(10));
1142            assert!(!range.within(i16::MAX));
1143        }
1144    }
1145
1146    #[test]
1147    fn field_from_value() -> Result<()> {
1148        assert_eq!(
1149            Field {
1150                name: String::from("Topics"),
1151                kind: Kind(String::from("[]CreatableTopic")),
1152                about: Some(String::from("The topics to create.")),
1153                versions: VersionRange::from_str("0+")?,
1154                map_key: None,
1155                nullable: None,
1156                tag: None,
1157                tagged: None,
1158                entity_type: None,
1159                default: None,
1160                fields: Some(vec![Field {
1161                    name: String::from("Name"),
1162                    kind: Kind(String::from("string")),
1163                    versions: VersionRange::from_str("0+")?,
1164                    map_key: Some(true),
1165                    entity_type: Some(String::from("topicName")),
1166                    about: Some(String::from("The topic name.")),
1167                    default: None,
1168                    nullable: None,
1169                    tag: None,
1170                    tagged: None,
1171                    fields: None,
1172                }]),
1173            },
1174            serde_json::from_str::<Value>(
1175                r#"
1176                {
1177                    "name": "Topics",
1178                    "type": "[]CreatableTopic",
1179                    "versions": "0+",
1180                    "about": "The topics to create.",
1181                     "fields": [
1182                        { "name": "Name",
1183                          "type": "string",
1184                          "versions": "0+",
1185                          "mapKey": true,
1186                          "entityType": "topicName",
1187                          "about": "The topic name."
1188                        }]
1189                }
1190                "#
1191            )
1192            .map_err(Into::into)
1193            .and_then(|v| Field::try_from(&Wv::from(&v)))?
1194        );
1195
1196        Ok(())
1197    }
1198
1199    #[allow(clippy::too_many_lines)]
1200    #[test]
1201    fn tagged_field_from_value() -> Result<()> {
1202        assert_eq!(
1203            Field {
1204                name: "SupportedFeatures".into(),
1205                kind: Kind("[]SupportedFeatureKey".into()),
1206                about: Some("Features supported by the broker.".into()),
1207                versions: VersionRange {
1208                    start: 3,
1209                    end: 32767
1210                },
1211                map_key: None,
1212                nullable: None,
1213                tag: Some(0),
1214                tagged: Some(VersionRange {
1215                    start: 3,
1216                    end: 32767
1217                }),
1218                entity_type: None,
1219                default: None,
1220                fields: Some(
1221                    [
1222                        Field {
1223                            name: "Name".into(),
1224                            kind: Kind("string".into()),
1225                            about: Some("The name of the feature.".into()),
1226                            versions: VersionRange {
1227                                start: 3,
1228                                end: 32767
1229                            },
1230                            map_key: Some(true),
1231                            nullable: None,
1232                            tag: None,
1233                            tagged: None,
1234                            entity_type: None,
1235                            default: None,
1236                            fields: None
1237                        },
1238                        Field {
1239                            name: "MinVersion".into(),
1240                            kind: Kind("int16".into()),
1241                            about: Some("The minimum supported version for the feature.".into()),
1242                            versions: VersionRange {
1243                                start: 3,
1244                                end: 32767
1245                            },
1246                            map_key: None,
1247                            nullable: None,
1248                            tag: None,
1249                            tagged: None,
1250                            entity_type: None,
1251                            default: None,
1252                            fields: None
1253                        },
1254                        Field {
1255                            name: "MaxVersion".into(),
1256                            kind: Kind("int16".into()),
1257                            about: Some("The maximum supported version for the feature.".into()),
1258                            versions: VersionRange {
1259                                start: 3,
1260                                end: 32767
1261                            },
1262                            map_key: None,
1263                            nullable: None,
1264                            tag: None,
1265                            tagged: None,
1266                            entity_type: None,
1267                            default: None,
1268                            fields: None
1269                        }
1270                    ]
1271                    .into()
1272                )
1273            },
1274            serde_json::from_str::<Value>(
1275                r#"
1276                { "name":  "SupportedFeatures",
1277                  "type": "[]SupportedFeatureKey",
1278                  "ignorable": true,
1279                  "versions":  "3+",
1280                  "tag": 0,
1281                  "taggedVersions": "3+",
1282                  "about": "Features supported by the broker.",
1283                  "fields":  [
1284                                { "name": "Name",
1285                                  "type": "string",
1286                                  "versions": "3+",
1287                                  "mapKey": true,
1288                                  "about": "The name of the feature." },
1289                                { "name": "MinVersion",
1290                                  "type": "int16",
1291                                  "versions": "3+",
1292                                  "about": "The minimum supported version for the feature." },
1293                                { "name": "MaxVersion",
1294                                  "type": "int16",
1295                                  "versions": "3+",
1296                                  "about": "The maximum supported version for the feature." }
1297                             ]
1298                }
1299                "#
1300            )
1301            .map_err(Into::into)
1302            .and_then(|v| Field::try_from(&Wv::from(&v)))?
1303        );
1304
1305        Ok(())
1306    }
1307
1308    #[test]
1309    fn untagged_message() -> Result<()> {
1310        let m = serde_json::from_str::<Value>(
1311            r#"
1312            {
1313              "apiKey": 25,
1314              "type": "request",
1315              "listeners": ["zkBroker", "broker"],
1316              "name": "AddOffsetsToTxnRequest",
1317              "validVersions": "0-3",
1318              "flexibleVersions": "3+",
1319              "fields": [
1320                { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
1321                  "about": "The transactional id corresponding to the transaction."},
1322                { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
1323                  "about": "Current producer id in use by the transactional id." },
1324                { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
1325                  "about": "Current epoch associated with the producer id." },
1326                { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
1327                  "about": "The unique group identifier." }
1328              ]
1329            }
1330            "#,
1331        ).map_err(Into::into)
1332        .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1333
1334        assert_eq!(MessageKind::Request, m.kind());
1335
1336        assert!(!m.has_tags());
1337
1338        Ok(())
1339    }
1340
1341    #[test]
1342    fn tagged_message() -> Result<()> {
1343        let m = Message::try_from(&Wv::from(&json!(
1344            {
1345              "apiKey": 63,
1346              "type": "request",
1347              "listeners": ["controller"],
1348              "name": "BrokerHeartbeatRequest",
1349              "validVersions": "0-1",
1350              "flexibleVersions": "0+",
1351              "fields": [
1352                { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
1353                  "about": "The broker ID." },
1354                { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
1355                  "about": "The broker epoch." },
1356                { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
1357                  "about": "The highest metadata offset which the broker has reached." },
1358                { "name": "WantFence", "type": "bool", "versions": "0+",
1359                  "about": "True if the broker wants to be fenced, false otherwise." },
1360                { "name": "WantShutDown", "type": "bool", "versions": "0+",
1361                  "about": "True if the broker wants to be shut down, false otherwise." },
1362                { "name": "OfflineLogDirs", "type":  "[]uuid", "versions": "1+", "taggedVersions": "1+", "tag": "0",
1363                  "about": "Log directories that failed and went offline." }
1364              ]
1365            }
1366        )))?;
1367
1368        assert_eq!(MessageKind::Request, m.kind());
1369        assert!(m.has_tags());
1370
1371        Ok(())
1372    }
1373
1374    #[allow(clippy::too_many_lines)]
1375    #[test]
1376    fn message_from_value() -> Result<()> {
1377        assert_eq!(
1378            Message {
1379                api_key: 19,
1380                kind: MessageKind::Request,
1381                listeners: Some(vec![
1382                    Listener::ZkBroker,
1383                    Listener::Broker,
1384                    Listener::Controller
1385                ]),
1386                name: String::from("CreateTopicsRequest"),
1387                versions: Version {
1388                    valid: VersionRange::from_str("0-7")?,
1389                    deprecated: Some(VersionRange::from_str("0-1")?),
1390                    flexible: VersionRange::from_str("5+")?,
1391                },
1392                common_structs: Some(vec![CommonStruct {
1393                    name: String::from("AddPartitionsToTxnTopic"),
1394                    fields: vec![
1395                        Field {
1396                            name: String::from("Name"),
1397                            kind: Kind(String::from("string")),
1398                            versions: VersionRange::from_str("0+")?,
1399                            map_key: Some(true),
1400                            nullable: None,
1401                            tag: None,
1402                            tagged: None,
1403                            default: None,
1404                            fields: None,
1405                            entity_type: Some(String::from("topicName")),
1406                            about: Some(String::from("The name of the topic.")),
1407                        },
1408                        Field {
1409                            name: String::from("Partitions"),
1410                            kind: Kind(String::from("[]int32")),
1411                            versions: VersionRange::from_str("0+")?,
1412                            about: Some(String::from(
1413                                "The partition indexes to add to the transaction"
1414                            )),
1415                            map_key: None,
1416                            nullable: None,
1417                            tag: None,
1418                            tagged: None,
1419                            default: None,
1420                            fields: None,
1421                            entity_type: None,
1422                        }
1423                    ],
1424                }]),
1425                fields: vec![Field {
1426                    name: String::from("Topics"),
1427                    kind: Kind(String::from("[]CreatableTopic")),
1428                    about: Some(String::from("The topics to create.")),
1429                    versions: VersionRange::from_str("0+")?,
1430                    map_key: None,
1431                    nullable: None,
1432                    tag: None,
1433                    tagged: None,
1434                    entity_type: None,
1435                    default: None,
1436                    fields: Some(vec![Field {
1437                        name: String::from("Name"),
1438                        kind: Kind(String::from("string")),
1439                        versions: VersionRange::from_str("0+")?,
1440                        map_key: Some(true),
1441                        entity_type: Some(String::from("topicName")),
1442                        about: Some(String::from("The topic name.")),
1443                        default: None,
1444                        nullable: None,
1445                        tag: None,
1446                        tagged: None,
1447                        fields: None,
1448                    }])
1449                }],
1450            },
1451            serde_json::from_str::<Value>(
1452                r#"
1453                {
1454                    "apiKey": 19,
1455                    "type": "request",
1456                    "listeners": ["zkBroker", "broker", "controller"],
1457                    "name": "CreateTopicsRequest",
1458                    "validVersions": "0-7",
1459                    "deprecatedVersions": "0-1",
1460                    "flexibleVersions": "5+",
1461                    "fields": [
1462                        {"name": "Topics",
1463                        "type": "[]CreatableTopic",
1464                        "versions": "0+",
1465                        "about": "The topics to create.",
1466                         "fields": [
1467                            { "name": "Name",
1468                              "type": "string",
1469                              "versions": "0+",
1470                              "mapKey": true,
1471                              "entityType": "topicName",
1472                              "about": "The topic name."
1473                        }]}],
1474                        "commonStructs": [
1475                            { "name": "AddPartitionsToTxnTopic",
1476                              "versions": "0+",
1477                              "fields": [
1478                                { "name": "Name",
1479                                  "type": "string",
1480                                  "versions": "0+",
1481                                  "mapKey": true,
1482                                  "entityType": "topicName",
1483                                  "about": "The name of the topic."
1484                                },
1485                                { "name": "Partitions",
1486                                  "type": "[]int32",
1487                                  "versions": "0+",
1488                                  "about": "The partition indexes to add to the transaction"
1489                                }]}]
1490                }
1491                "#
1492            )
1493            .map_err(Into::into)
1494            .and_then(|v| Message::try_from(&Wv::from(&v)))?
1495        );
1496
1497        Ok(())
1498    }
1499
1500    #[allow(clippy::too_many_lines)]
1501    #[test]
1502    fn header_from_value() -> Result<()> {
1503        let v = serde_json::from_str::<Value>(
1504            r#"
1505            {
1506              "type": "header",
1507              "name": "RequestHeader",
1508              "validVersions": "0-2",
1509              "flexibleVersions": "2+",
1510              "fields": [
1511                { "name": "RequestApiKey", "type": "int16", "versions": "0+",
1512                  "about": "The API key of this request." },
1513                { "name": "RequestApiVersion", "type": "int16", "versions": "0+",
1514                  "about": "The API version of this request." },
1515                { "name": "CorrelationId", "type": "int32", "versions": "0+",
1516                  "about": "The correlation ID of this request." },
1517
1518                { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
1519                  "flexibleVersions": "none", "about": "The client ID string." }
1520              ]
1521            }
1522            "#,
1523        )?;
1524
1525        let wv = Wv::from(&v);
1526
1527        assert_eq!(
1528            Header {
1529                name: String::from("RequestHeader"),
1530                valid: VersionRange { start: 0, end: 2 },
1531                flexible: VersionRange {
1532                    start: 2,
1533                    end: i16::MAX
1534                },
1535                fields: vec![
1536                    Field {
1537                        name: String::from("RequestApiKey"),
1538                        kind: Kind(String::from("int16")),
1539                        about: Some(String::from("The API key of this request.")),
1540                        versions: VersionRange {
1541                            start: 0,
1542                            end: 32767
1543                        },
1544                        map_key: None,
1545                        nullable: None,
1546                        tag: None,
1547                        tagged: None,
1548                        entity_type: None,
1549                        default: None,
1550                        fields: None
1551                    },
1552                    Field {
1553                        name: String::from("RequestApiVersion"),
1554                        kind: Kind(String::from("int16")),
1555                        about: Some(String::from("The API version of this request.")),
1556                        versions: VersionRange {
1557                            start: 0,
1558                            end: 32767
1559                        },
1560                        map_key: None,
1561                        nullable: None,
1562                        tag: None,
1563                        tagged: None,
1564                        entity_type: None,
1565                        default: None,
1566                        fields: None
1567                    },
1568                    Field {
1569                        name: String::from("CorrelationId"),
1570                        kind: Kind(String::from("int32")),
1571                        about: Some(String::from("The correlation ID of this request.")),
1572                        versions: VersionRange {
1573                            start: 0,
1574                            end: 32767
1575                        },
1576                        map_key: None,
1577                        nullable: None,
1578                        tag: None,
1579                        tagged: None,
1580                        entity_type: None,
1581                        default: None,
1582                        fields: None
1583                    },
1584                    Field {
1585                        name: String::from("ClientId"),
1586                        kind: Kind(String::from("string")),
1587                        about: Some(String::from("The client ID string.")),
1588                        versions: VersionRange {
1589                            start: 1,
1590                            end: 32767
1591                        },
1592                        map_key: None,
1593                        nullable: Some(VersionRange {
1594                            start: 1,
1595                            end: 32767
1596                        }),
1597                        tag: None,
1598                        tagged: None,
1599                        entity_type: None,
1600                        default: None,
1601                        fields: None
1602                    }
1603                ],
1604            },
1605            Header::try_from(&wv)?
1606        );
1607        Ok(())
1608    }
1609
1610    #[test]
1611    fn parse_expression() -> Result<()> {
1612        let Expr::Array(expression) =
1613            syn::parse_str::<Expr>(r#"[(one, "a/b/c"), (abc, 123), (pqr, a::b::c)]"#)?
1614        else {
1615            return Err(Error::Message(String::from("expecting an array")));
1616        };
1617
1618        let mut mappings = HashMap::new();
1619
1620        for expression in expression.elems {
1621            let Expr::Tuple(tuple) = expression else {
1622                return Err(Error::Message(String::from("expecting a tuple")));
1623            };
1624
1625            assert_eq!(2, tuple.elems.len());
1626
1627            println!(
1628                "i: {}, ty: {}",
1629                tuple.to_token_stream(),
1630                type_name_of_val(&tuple)
1631            );
1632
1633            let Expr::Path(ref lhs) = tuple.elems[0] else {
1634                return Err(Error::Message(String::from(
1635                    "lhs expecting a path expression",
1636                )));
1637            };
1638
1639            let Some(lhs) = lhs.path.get_ident() else {
1640                return Err(Error::Message(String::from(
1641                    "lhs expecting a path ident expression",
1642                )));
1643            };
1644
1645            _ = mappings.insert(lhs.clone(), tuple.elems[1].clone());
1646
1647            println!("lhs: {}", lhs.to_token_stream());
1648            println!("rhs: {}", tuple.elems[1].to_token_stream());
1649        }
1650
1651        let one = syn::parse_str::<Ident>("one")?;
1652        assert!(mappings.contains_key(&one));
1653
1654        Ok(())
1655    }
1656
1657    #[test]
1658    fn find_coordinator_request() -> Result<()> {
1659        let m = serde_json::from_str::<Value>(
1660            r#"
1661                {
1662                    "apiKey": 10,
1663                    "type": "request",
1664                    "listeners": ["zkBroker", "broker"],
1665                    "name": "FindCoordinatorRequest",
1666                    "validVersions": "0-4",
1667                    "deprecatedVersions": "0",
1668                    "flexibleVersions": "3+",
1669                    "fields": [
1670                        { "name": "Key", "type": "string", "versions": "0-3",
1671                        "about": "The coordinator key." },
1672                        { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
1673                        "about": "The coordinator key type. (Group, transaction, etc.)" },
1674                        { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
1675                        "about": "The coordinator keys." }
1676                    ]
1677                }
1678            "#,
1679        )
1680        .map_err(Into::into)
1681        .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1682
1683        assert!(!m.has_records());
1684        assert_eq!(MessageKind::Request, m.kind());
1685        assert_eq!("FindCoordinatorRequest", m.name());
1686        assert_eq!(
1687            Version {
1688                valid: VersionRange { start: 0, end: 4 },
1689                deprecated: Some(VersionRange { start: 0, end: 0 }),
1690                flexible: VersionRange {
1691                    start: 3,
1692                    end: i16::MAX
1693                },
1694            },
1695            m.version()
1696        );
1697
1698        assert_eq!("Key", m.fields()[0].name());
1699        assert_eq!(Kind::new("string"), m.fields()[0].kind);
1700        assert_eq!(VersionRange { start: 0, end: 3 }, m.fields()[0].versions());
1701        assert_eq!(Some("The coordinator key.".into()), m.fields()[0].about());
1702
1703        Ok(())
1704    }
1705
1706    #[test]
1707    fn fetch_response() -> Result<()> {
1708        let m = serde_json::from_str::<Value>(
1709            r#"
1710                {
1711                    "apiKey": 1,
1712                    "type": "response",
1713                    "name": "FetchResponse",
1714                    "validVersions": "0-16",
1715                    "flexibleVersions": "12+",
1716                    "fields": [
1717                        { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0,
1718                          "about": "Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [
1719                          { "name": "NodeId", "type": "int32", "versions": "16+",
1720                            "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
1721                          { "name": "Host", "type": "string", "versions": "16+", "about": "The node's hostname." },
1722                          { "name": "Port", "type": "int32", "versions": "16+", "about": "The node's port." },
1723                          { "name": "Rack", "type": "string", "versions": "16+", "nullableVersions": "16+", "default": "null",
1724                            "about": "The rack of the node, or null if it has not been assigned to a rack." }
1725                        ]}
1726                    ]
1727                }
1728            "#,
1729        )
1730        .map_err(Into::into)
1731        .and_then(|v| Message::try_from(&Wv::from(&v)))?;
1732
1733        assert_eq!(MessageKind::Response, m.kind());
1734        assert_eq!("FetchResponse", m.name());
1735        assert_eq!(
1736            Version {
1737                valid: VersionRange { start: 0, end: 16 },
1738                deprecated: None,
1739                flexible: VersionRange {
1740                    start: 12,
1741                    end: i16::MAX
1742                },
1743            },
1744            m.version()
1745        );
1746
1747        assert_eq!("NodeEndpoints", m.fields()[0].name());
1748        assert_eq!(Kind::new("[]NodeEndpoint"), m.fields()[0].kind);
1749        assert_eq!(
1750            VersionRange {
1751                start: 16,
1752                end: i16::MAX
1753            },
1754            m.fields()[0].versions()
1755        );
1756
1757        let node_id = &m.fields()[0].fields().unwrap()[0];
1758
1759        assert_eq!("NodeId", node_id.name());
1760        assert_eq!(Kind::new("int32"), node_id.kind);
1761        assert_eq!(
1762            VersionRange {
1763                start: 16,
1764                end: i16::MAX
1765            },
1766            node_id.versions()
1767        );
1768
1769        Ok(())
1770    }
1771}