apache-avro 0.14.0

A library for working with Apache Avro in Rust
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Logic for parsing and interacting with schemas in Avro format.
use crate::{error::Error, types, util::MapHelper, AvroResult};
use digest::Digest;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{
    ser::{SerializeMap, SerializeSeq},
    Deserialize, Serialize, Serializer,
};
use serde_json::{Map, Value};
use std::{
    borrow::Cow,
    collections::{BTreeMap, HashMap, HashSet},
    convert::{TryFrom, TryInto},
    fmt,
    hash::Hash,
    str::FromStr,
};
use strum_macros::{EnumDiscriminants, EnumString};

lazy_static! {
    // Pattern for a single identifier: a letter or underscore followed by any number of
    // letters, digits or underscores. Judging by its name it validates enum symbols; the
    // usage site is outside this part of the file.
    static ref ENUM_SYMBOL_NAME_R: Regex = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").unwrap();

    // An optional namespace (with optional dots) followed by a name without any dots in it.
    // The two parts are exposed through the `namespace` and `name` named capture groups.
    static ref SCHEMA_NAME_R: Regex =
        Regex::new(r"^((?P<namespace>[A-Za-z_][A-Za-z0-9_\.]*)*\.)?(?P<name>[A-Za-z_][A-Za-z0-9_]*)$").unwrap();
}

/// Represents an Avro schema fingerprint.
///
/// More information about Avro schema fingerprints can be found in the
/// [Avro Schema Fingerprint documentation](https://avro.apache.org/docs/current/spec.html#schema_fingerprints)
pub struct SchemaFingerprint {
    /// Raw bytes of the fingerprint, as produced by the digest.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Renders the fingerprint as lowercase hexadecimal, two digits per byte,
    /// with no separators.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Stream each byte straight into the formatter; stops at the first write error.
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}

/// Represents any valid Avro schema.
///
/// More information about Avro schemas can be found in the
/// [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas)
///
/// The `EnumDiscriminants` derive generates the companion field-less enum
/// `SchemaKind`, with one variant per `Schema` variant.
#[derive(Clone, Debug, EnumDiscriminants)]
#[strum_discriminants(name(SchemaKind), derive(Hash, Ord, PartialOrd))]
pub enum Schema {
    /// A `null` Avro schema.
    Null,
    /// A `boolean` Avro schema.
    Boolean,
    /// An `int` Avro schema.
    Int,
    /// A `long` Avro schema.
    Long,
    /// A `float` Avro schema.
    Float,
    /// A `double` Avro schema.
    Double,
    /// A `bytes` Avro schema.
    /// `Bytes` represents a sequence of 8-bit unsigned bytes.
    Bytes,
    /// A `string` Avro schema.
    /// `String` represents a unicode character sequence.
    String,
    /// An `array` Avro schema. Avro arrays are required to have the same type for each element.
    /// This variant holds the `Schema` for the array element type.
    Array(Box<Schema>),
    /// A `map` Avro schema.
    /// `Map` holds a pointer to the `Schema` of its values, which must all be the same schema.
    /// `Map` keys are assumed to be `string`.
    Map(Box<Schema>),
    /// A `union` Avro schema.
    Union(UnionSchema),
    /// A `record` Avro schema.
    ///
    /// The `lookup` table maps field names to their position in the `Vec`
    /// of `fields`.
    Record {
        name: Name,
        aliases: Aliases,
        doc: Documentation,
        fields: Vec<RecordField>,
        lookup: BTreeMap<String, usize>,
    },
    /// An `enum` Avro schema.
    Enum {
        name: Name,
        aliases: Aliases,
        doc: Documentation,
        symbols: Vec<String>,
    },
    /// A `fixed` Avro schema.
    Fixed {
        name: Name,
        aliases: Aliases,
        doc: Documentation,
        size: usize,
    },
    /// Logical type which represents `Decimal` values. The underlying type is serialized and
    /// deserialized as `Schema::Bytes` or `Schema::Fixed`.
    ///
    /// `scale` defaults to 0 and is an integer greater than or equal to 0 and `precision` is an
    /// integer greater than 0.
    Decimal {
        precision: DecimalMetadata,
        scale: DecimalMetadata,
        inner: Box<Schema>,
    },
    /// A universally unique identifier, annotating a string.
    Uuid,
    /// Logical type which represents the number of days since the unix epoch.
    /// Serialization format is `Schema::Int`.
    Date,
    /// The time of day in number of milliseconds after midnight with no reference to any
    /// calendar, time zone or date in particular.
    TimeMillis,
    /// The time of day in number of microseconds after midnight with no reference to any
    /// calendar, time zone or date in particular.
    TimeMicros,
    /// An instant in time represented as the number of milliseconds after the UNIX epoch.
    TimestampMillis,
    /// An instant in time represented as the number of microseconds after the UNIX epoch.
    TimestampMicros,
    /// An amount of time defined by a number of months, days and milliseconds.
    Duration,
    /// A reference to another (named) schema, identified by its `name`.
    Ref {
        name: Name,
    },
}

impl PartialEq for Schema {
    /// Two schemas are equal when their [Parsing Canonical Form] strings are equal.
    ///
    /// Both sides are rendered through `canonical_form`, so a comparison costs
    /// two serializations.
    ///
    /// [Parsing Canonical Form]:
    /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.canonical_form();
        let rhs = other.canonical_form();
        lhs == rhs
    }
}

impl SchemaKind {
    pub fn is_primitive(self) -> bool {
        matches!(
            self,
            SchemaKind::Null
                | SchemaKind::Boolean
                | SchemaKind::Int
                | SchemaKind::Long
                | SchemaKind::Double
                | SchemaKind::Float
                | SchemaKind::Bytes
                | SchemaKind::String,
        )
    }

    pub fn is_named(self) -> bool {
        matches!(
            self,
            SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed | SchemaKind::Ref
        )
    }
}

impl From<&types::Value> for SchemaKind {
    /// Maps an Avro value to the schema kind of the same name.
    fn from(value: &types::Value) -> Self {
        // Aliased so it cannot be confused with `serde_json::Value`, which is
        // what the bare name `Value` refers to in the rest of this file.
        use crate::types::Value as AvroValue;

        match value {
            AvroValue::Null => Self::Null,
            AvroValue::Boolean(_) => Self::Boolean,
            AvroValue::Int(_) => Self::Int,
            AvroValue::Long(_) => Self::Long,
            AvroValue::Float(_) => Self::Float,
            AvroValue::Double(_) => Self::Double,
            AvroValue::Bytes(_) => Self::Bytes,
            AvroValue::String(_) => Self::String,
            AvroValue::Array(_) => Self::Array,
            AvroValue::Map(_) => Self::Map,
            AvroValue::Union(_, _) => Self::Union,
            AvroValue::Record(_) => Self::Record,
            AvroValue::Enum(_, _) => Self::Enum,
            AvroValue::Fixed(_, _) => Self::Fixed,
            AvroValue::Decimal { .. } => Self::Decimal,
            AvroValue::Uuid(_) => Self::Uuid,
            AvroValue::Date(_) => Self::Date,
            AvroValue::TimeMillis(_) => Self::TimeMillis,
            AvroValue::TimeMicros(_) => Self::TimeMicros,
            AvroValue::TimestampMillis(_) => Self::TimestampMillis,
            AvroValue::TimestampMicros(_) => Self::TimestampMicros,
            AvroValue::Duration { .. } => Self::Duration,
        }
    }
}

/// Represents names for `record`, `enum` and `fixed` Avro schemas.
///
/// Each of these `Schema`s have a `fullname` composed of two parts:
///   * a name
///   * a namespace
///
/// Aliases, which facilitate schema evolution, are not stored here; they are
/// represented separately by `Alias` / `Aliases`.
///
/// More information about schema names can be found in the
/// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Name {
    /// The name part. When built through `Name::new` it contains no dots.
    pub name: String,
    /// The namespace part, if any.
    pub namespace: Namespace,
}

/// Represents documentation for complex Avro schemas.
pub type Documentation = Option<String>;
/// Represents the aliases of a named schema.
pub type Aliases = Option<Vec<Alias>>;
/// Represents schema lookup within a schema env: owned schemas keyed by `Name`.
pub(crate) type Names = HashMap<Name, Schema>;
/// Represents schema lookup within a schema: borrowed schemas keyed by `Name`.
pub(crate) type NamesRef<'a> = HashMap<Name, &'a Schema>;
/// Represents the namespace of a named schema.
pub type Namespace = Option<String>;

impl Name {
    /// Create a new `Name`.
    /// Parses the optional `namespace` from the `name` string.
    /// `aliases` will not be defined.
    pub fn new(name: &str) -> AvroResult<Self> {
        let (name, namespace) = Name::get_name_and_namespace(name)?;
        Ok(Self { name, namespace })
    }

    fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
        let caps = SCHEMA_NAME_R
            .captures(name)
            .ok_or_else(|| Error::InvalidSchemaName(name.to_string(), SCHEMA_NAME_R.as_str()))?;
        Ok((
            caps["name"].to_string(),
            caps.name("namespace").map(|s| s.as_str().to_string()),
        ))
    }

    /// Parse a `serde_json::Value` into a `Name`.
    pub(crate) fn parse(complex: &Map<String, Value>) -> AvroResult<Self> {
        let (name, namespace_from_name) = complex
            .name()
            .map(|name| Name::get_name_and_namespace(name.as_str()).unwrap())
            .ok_or(Error::GetNameField)?;
        // FIXME Reading name from the type is wrong ! The name there is just a metadata (AVRO-3430)
        let type_name = match complex.get("type") {
            Some(Value::Object(complex_type)) => complex_type.name().or(None),
            _ => None,
        };

        Ok(Self {
            name: type_name.unwrap_or(name),
            namespace: namespace_from_name.or_else(|| complex.string("namespace")),
        })
    }

    /// Return the `fullname` of this `Name`
    ///
    /// More information about fullnames can be found in the
    /// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
    pub fn fullname(&self, default_namespace: Namespace) -> String {
        if self.name.contains('.') {
            self.name.clone()
        } else {
            let namespace = self.namespace.clone().or(default_namespace);

            match namespace {
                Some(ref namespace) => format!("{}.{}", namespace, self.name),
                None => self.name.clone(),
            }
        }
    }

    /// Return the fully qualified name needed for indexing or searching for the schema within a schema/schema env context. Puts the enclosing namespace into the name's namespace for clarity in schema/schema env parsing
    /// ```ignore
    /// use apache_avro::schema::Name;
    ///
    /// assert_eq!(
    /// Name::new("some_name").unwrap().fully_qualified_name(&Some("some_namespace".into())),
    /// Name::new("some_namespace.some_name").unwrap()
    /// );
    /// assert_eq!(
    /// Name::new("some_namespace.some_name").unwrap().fully_qualified_name(&Some("other_namespace".into())),
    /// Name::new("some_namespace.some_name").unwrap()
    /// );
    /// ```
    pub fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name {
        Name {
            name: self.name.clone(),
            namespace: self
                .namespace
                .clone()
                .or_else(|| enclosing_namespace.clone()),
        }
    }
}

impl From<&str> for Name {
    fn from(name: &str) -> Self {
        Name::new(name).unwrap()
    }
}

impl fmt::Display for Name {
    /// Writes the fullname of this `Name`, computed without a default namespace.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let fullname = self.fullname(None);
        f.write_str(fullname.as_str())
    }
}

impl<'de> Deserialize<'de> for Name {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        serde_json::Value::deserialize(deserializer).and_then(|value| {
            use serde::de::Error;
            if let Value::Object(json) = value {
                Name::parse(&json).map_err(D::Error::custom)
            } else {
                Err(D::Error::custom(format!(
                    "Expected a JSON object: {:?}",
                    value
                )))
            }
        })
    }
}

/// Newtype pattern for `Name` to better control the `serde_json::Value` representation.
/// Aliases are serialized as an array of plain strings in the JSON representation.
///
/// The wrapped `Name` is private; it is reached through the accessor methods on `Alias`.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Alias(Name);

impl Alias {
    pub fn new(name: &str) -> AvroResult<Self> {
        Name::new(name).map(Self)
    }

    pub fn name(&self) -> String {
        self.0.name.clone()
    }

    pub fn namespace(&self) -> Namespace {
        self.0.namespace.clone()
    }

    pub fn fullname(&self, default_namespace: Namespace) -> String {
        self.0.fullname(default_namespace)
    }

    pub fn fully_qualified_name(&self, default_namespace: &Namespace) -> Name {
        self.0.fully_qualified_name(default_namespace)
    }
}

impl From<&str> for Alias {
    fn from(name: &str) -> Self {
        Alias::new(name).unwrap()
    }
}

impl Serialize for Alias {
    /// Serializes the alias as a plain string holding its fullname, computed
    /// without a default namespace.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let fullname = self.fullname(None);
        serializer.serialize_str(fullname.as_str())
    }
}

/// A borrowed schema together with a lookup table of the named schemas
/// (records, enums and fixed) found while walking it.
pub(crate) struct ResolvedSchema<'s> {
    /// Named schemas found in `root_schema`, keyed by fully qualified name.
    names_ref: NamesRef<'s>,
    /// The schema this lookup table was built from.
    root_schema: &'s Schema,
}

impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
    type Error = Error;

    fn try_from(schema: &'s Schema) -> AvroResult<Self> {
        let names = HashMap::new();
        let mut rs = ResolvedSchema {
            names_ref: names,
            root_schema: schema,
        };
        Self::from_internal(rs.root_schema, &mut rs.names_ref, &None)?;
        Ok(rs)
    }
}

impl<'s> ResolvedSchema<'s> {
    pub(crate) fn get_root_schema(&self) -> &'s Schema {
        self.root_schema
    }
    pub(crate) fn get_names(&self) -> &NamesRef<'s> {
        &self.names_ref
    }

    fn from_internal(
        schema: &'s Schema,
        names_ref: &mut NamesRef<'s>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<()> {
        match schema {
            Schema::Array(schema) | Schema::Map(schema) => {
                Self::from_internal(schema, names_ref, enclosing_namespace)
            }
            Schema::Union(UnionSchema { schemas, .. }) => {
                for schema in schemas {
                    Self::from_internal(schema, names_ref, enclosing_namespace)?
                }
                Ok(())
            }
            Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                if names_ref
                    .insert(fully_qualified_name.clone(), schema)
                    .is_some()
                {
                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
                } else {
                    Ok(())
                }
            }
            Schema::Record { name, fields, .. } => {
                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                if names_ref
                    .insert(fully_qualified_name.clone(), schema)
                    .is_some()
                {
                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
                } else {
                    let record_namespace = fully_qualified_name.namespace;
                    for field in fields {
                        Self::from_internal(&field.schema, names_ref, &record_namespace)?
                    }
                    Ok(())
                }
            }
            Schema::Ref { name } => {
                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                names_ref
                    .get(&fully_qualified_name)
                    .map(|_| ())
                    .ok_or(Error::SchemaResolutionError(fully_qualified_name))
            }
            _ => Ok(()),
        }
    }
}

/// An owned schema together with a lookup table holding clones of the named
/// schemas (records, enums and fixed) found while walking it.
pub(crate) struct ResolvedOwnedSchema {
    /// Clones of the named schemas found in `root_schema`, keyed by fully qualified name.
    names: Names,
    /// The schema this lookup table was built from.
    root_schema: Schema,
}

impl TryFrom<Schema> for ResolvedOwnedSchema {
    type Error = Error;

    fn try_from(schema: Schema) -> AvroResult<Self> {
        let names = HashMap::new();
        let mut rs = ResolvedOwnedSchema {
            names,
            root_schema: schema,
        };
        Self::from_internal(&rs.root_schema, &mut rs.names, &None)?;
        Ok(rs)
    }
}

impl ResolvedOwnedSchema {
    pub(crate) fn get_root_schema(&self) -> &Schema {
        &self.root_schema
    }
    pub(crate) fn get_names(&self) -> &Names {
        &self.names
    }

    fn from_internal(
        schema: &Schema,
        names: &mut Names,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<()> {
        match schema {
            Schema::Array(schema) | Schema::Map(schema) => {
                Self::from_internal(schema, names, enclosing_namespace)
            }
            Schema::Union(UnionSchema { schemas, .. }) => {
                for schema in schemas {
                    Self::from_internal(schema, names, enclosing_namespace)?
                }
                Ok(())
            }
            Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                if names
                    .insert(fully_qualified_name.clone(), schema.clone())
                    .is_some()
                {
                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
                } else {
                    Ok(())
                }
            }
            Schema::Record { name, fields, .. } => {
                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                if names
                    .insert(fully_qualified_name.clone(), schema.clone())
                    .is_some()
                {
                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
                } else {
                    let record_namespace = fully_qualified_name.namespace;
                    for field in fields {
                        Self::from_internal(&field.schema, names, &record_namespace)?
                    }
                    Ok(())
                }
            }
            Schema::Ref { name } => {
                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                names
                    .get(&fully_qualified_name)
                    .map(|_| ())
                    .ok_or(Error::SchemaResolutionError(fully_qualified_name))
            }
            _ => Ok(()),
        }
    }
}

/// Represents a `field` in a `record` Avro schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Name of the field.
    pub name: String,
    /// Documentation of the field.
    pub doc: Documentation,
    /// Default value of the field, kept as raw JSON.
    /// This value will be used when reading Avro datum if schema resolution
    /// is enabled.
    pub default: Option<Value>,
    /// Schema of the field.
    pub schema: Schema,
    /// Order of the field.
    ///
    /// **NOTE** This currently has no effect.
    pub order: RecordFieldOrder,
    /// Position of the field in the list of `fields` of its parent `Schema`.
    pub position: usize,
}

/// Represents any valid order for a `field` in a `record` Avro schema.
///
/// The `EnumString` derive provides the `FromStr` implementation that
/// `RecordField::parse` uses to read the field's `order` attribute.
#[derive(Clone, Debug, PartialEq, EnumString)]
#[strum(serialize_all = "kebab_case")]
pub enum RecordFieldOrder {
    /// Used by `RecordField::parse` when `order` is missing or not recognised.
    Ascending,
    Descending,
    Ignore,
}

impl RecordField {
    /// Parse a `serde_json::Value` into a `RecordField`.
    fn parse(
        field: &Map<String, Value>,
        position: usize,
        parser: &mut Parser,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Self> {
        let name = field.name().ok_or(Error::GetNameFieldFromRecord)?;

        // TODO: "type" = "<record name>"
        let schema = parser.parse_complex(field, enclosing_namespace)?;

        let default = field.get("default").cloned();

        let order = field
            .get("order")
            .and_then(|order| order.as_str())
            .and_then(|order| RecordFieldOrder::from_str(order).ok())
            .unwrap_or(RecordFieldOrder::Ascending);

        Ok(RecordField {
            name,
            doc: field.doc(),
            default,
            schema,
            order,
            position,
        })
    }
}

/// A `union` Avro schema: an ordered list of variant schemas.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    /// The variants of the union, in declaration order.
    pub(crate) schemas: Vec<Schema>,
    // Used to ensure uniqueness of schema inputs, and provide constant time finding of the
    // schema index given a value.
    // **NOTE** that this approach does not work for named types, and will have to be modified
    // to support that. A simple solution is to also keep a mapping of the names used.
    variant_index: BTreeMap<SchemaKind, usize>,
}

impl UnionSchema {
    /// Builds a union from its variants, indexing the unnamed ones by kind.
    ///
    /// # Errors
    /// * `Error::GetNestedUnion` when a variant is itself a union.
    /// * `Error::GetUnionDuplicate` when two unnamed variants share a kind.
    pub(crate) fn new(schemas: Vec<Schema>) -> AvroResult<Self> {
        let mut vindex = BTreeMap::new();
        for (i, schema) in schemas.iter().enumerate() {
            if matches!(schema, Schema::Union(_)) {
                return Err(Error::GetNestedUnion);
            }
            let kind = SchemaKind::from(schema);
            // Named kinds may appear several times, so they are not indexed.
            if !kind.is_named() && vindex.insert(kind, i).is_some() {
                return Err(Error::GetUnionDuplicate);
            }
        }
        Ok(UnionSchema {
            schemas,
            variant_index: vindex,
        })
    }

    /// Returns a slice to all variants of this schema.
    pub fn variants(&self) -> &[Schema] {
        &self.schemas
    }

    /// Returns true if the first variant of this `UnionSchema` is `Null`.
    pub fn is_nullable(&self) -> bool {
        // Match on the variant directly. `Schema == Schema` serializes both
        // sides to their canonical form, which allocates and would also treat
        // any schema whose canonical form happens to be `"null"` as `Null`.
        matches!(self.schemas.first(), Some(Schema::Null))
    }

    /// Optionally returns a reference to the schema matched by this value, as well as its position
    /// within this union.
    pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> {
        let schema_kind = SchemaKind::from(value);
        match self.variant_index.get(&schema_kind) {
            // fast path
            Some(&i) => Some((i, &self.schemas[i])),
            // slow path (required for matching logical or named types)
            None => self
                .schemas
                .iter()
                .enumerate()
                .find(|(_, schema)| value.validate(schema)),
        }
    }
}

// `variant_index` is derived from `schemas`, so comparing the variants is enough.
impl PartialEq for UnionSchema {
    /// Two unions are equal when their variant lists are equal.
    fn eq(&self, other: &UnionSchema) -> bool {
        self.schemas == other.schemas
    }
}

/// Integer type used for the `precision` and `scale` of a `Schema::Decimal`.
type DecimalMetadata = usize;
/// Precision of a decimal.
pub(crate) type Precision = DecimalMetadata;
/// Scale of a decimal.
pub(crate) type Scale = DecimalMetadata;

fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> {
    Ok(if value.is_u64() {
        let num = value
            .as_u64()
            .ok_or_else(|| Error::GetU64FromJson(value.clone()))?;
        num.try_into()
            .map_err(|e| Error::ConvertU64ToUsize(e, num))?
    } else if value.is_i64() {
        let num = value
            .as_i64()
            .ok_or_else(|| Error::GetI64FromJson(value.clone()))?;
        num.try_into()
            .map_err(|e| Error::ConvertI64ToUsize(e, num))?
    } else {
        return Err(Error::GetPrecisionOrScaleFromJson(value.clone()));
    })
}

/// Internal state carried while turning JSON into `Schema` values.
#[derive(Default)]
struct Parser {
    // Raw JSON of the schemas handed to `Schema::parse_list`, keyed by name.
    // `Parser::parse_list` removes each entry before parsing it.
    input_schemas: HashMap<Name, Value>,
    // A map of name -> Schema::Ref
    // Used to resolve cyclic references, i.e. when a
    // field's type is a reference to its record's type
    resolving_schemas: Names,
    // Names of the input schemas in the order they were supplied, so that
    // `Parser::parse_list` can return its results in that same order.
    input_order: Vec<Name>,
    // A map of name -> fully parsed Schema
    // Used to avoid parsing the same schema twice
    parsed_schemas: Names,
}

impl Schema {
    /// Converts `self` into its [Parsing Canonical Form].
    ///
    /// [Parsing Canonical Form]:
    /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
    pub fn canonical_form(&self) -> String {
        let json = serde_json::to_value(self)
            .unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {0}", e));
        parsing_canonical_form(&json)
    }

    /// Generate [fingerprint] of Schema's [Parsing Canonical Form].
    ///
    /// [Parsing Canonical Form]:
    /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
    /// [fingerprint]:
    /// https://avro.apache.org/docs/current/spec.html#schema_fingerprints
    pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
        let mut d = D::new();
        d.update(self.canonical_form());
        SchemaFingerprint {
            bytes: d.finalize().to_vec(),
        }
    }

    /// Create a `Schema` from a string representing a JSON Avro schema.
    pub fn parse_str(input: &str) -> Result<Schema, Error> {
        let mut parser = Parser::default();
        parser.parse_str(input)
    }

    /// Create a array of `Schema`'s from a list of named JSON Avro schemas (Record, Enum, and
    /// Fixed).
    ///
    /// It is allowed that the schemas have cross-dependencies; these will be resolved
    /// during parsing.
    ///
    /// If two of the input schemas have the same fullname, an Error will be returned.
    pub fn parse_list(input: &[&str]) -> Result<Vec<Schema>, Error> {
        let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(input.len());
        let mut input_order: Vec<Name> = Vec::with_capacity(input.len());
        for js in input {
            let schema: Value = serde_json::from_str(js).map_err(Error::ParseSchemaJson)?;
            if let Value::Object(inner) = &schema {
                let name = Name::parse(inner)?;
                let previous_value = input_schemas.insert(name.clone(), schema);
                if previous_value.is_some() {
                    return Err(Error::NameCollision(name.fullname(None)));
                }
                input_order.push(name);
            } else {
                return Err(Error::GetNameField);
            }
        }
        let mut parser = Parser {
            input_schemas,
            resolving_schemas: HashMap::default(),
            input_order,
            parsed_schemas: HashMap::with_capacity(input.len()),
        };
        parser.parse_list()
    }

    pub fn parse(value: &Value) -> AvroResult<Schema> {
        let mut parser = Parser::default();
        parser.parse(value, &None)
    }
}

impl Parser {
    /// Create a `Schema` from a string representing a JSON Avro schema.
    fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
        let value = serde_json::from_str(input).map_err(Error::ParseSchemaJson)?;
        self.parse(&value, &None)
    }

    /// Parse every schema held in `input_schemas` and return the results in the order recorded
    /// in `input_order`. The schemas may have cross-dependencies; these are resolved during
    /// parsing.
    fn parse_list(&mut self) -> Result<Vec<Schema>, Error> {
        // Parsing one schema may pull further entries out of `input_schemas` (its
        // dependencies), so the map is re-inspected on every iteration instead of iterated.
        while !self.input_schemas.is_empty() {
            let next_name = self
                .input_schemas
                .keys()
                .next()
                .cloned()
                .expect("Input schemas unexpectedly empty");
            let (name, value) = self
                .input_schemas
                .remove_entry(&next_name)
                .expect("Key unexpectedly missing");

            let parsed = self.parse(&value, &None)?;
            let key = get_schema_type_name(name, value);
            self.parsed_schemas.insert(key, parsed);
        }

        // Hand the schemas back in the order in which the caller supplied them.
        let mut ordered = Vec::with_capacity(self.parsed_schemas.len());
        for name in self.input_order.drain(..) {
            let schema = self
                .parsed_schemas
                .remove(&name)
                .expect("One of the input schemas was unexpectedly not parsed");
            ordered.push(schema);
        }
        Ok(ordered)
    }

    /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro schema.
    ///
    /// Dispatches on the JSON shape: a string is a type name, an object is a complex type
    /// and an array is a union. Any other JSON value yields
    /// `Error::ParseSchemaFromValidJson`.
    fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> AvroResult<Schema> {
        match value {
            Value::Object(data) => self.parse_complex(data, enclosing_namespace),
            Value::Array(data) => self.parse_union(data, enclosing_namespace),
            Value::String(t) => self.parse_known_schema(t.as_str(), enclosing_namespace),
            _ => Err(Error::ParseSchemaFromValidJson),
        }
    }

    /// Parse the name of an Avro type whose Schema is known into a `Schema`. A Schema is
    /// known if it is primitive or has been parsed previously by the parser and stored in
    /// its map of parsed_schemas.
    ///
    /// Names that are not primitive type names are resolved through `fetch_schema_ref`.
    fn parse_known_schema(
        &mut self,
        name: &str,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        let primitive = match name {
            "null" => Schema::Null,
            "boolean" => Schema::Boolean,
            "int" => Schema::Int,
            "long" => Schema::Long,
            "double" => Schema::Double,
            "float" => Schema::Float,
            "bytes" => Schema::Bytes,
            "string" => Schema::String,
            // Not a primitive: must be a reference to a named schema.
            _ => return self.fetch_schema_ref(name, enclosing_namespace),
        };
        Ok(primitive)
    }

    /// Given a name, tries to retrieve the parsed schema from `parsed_schemas`.
    /// If a parsed schema is not found, it checks if a currently resolving
    /// schema with that name exists.
    /// If a resolving schema is not found, it checks if a json with that name exists
    /// in `input_schemas` and then parses it (removing it from `input_schemas`)
    /// and adds the parsed schema to `parsed_schemas`.
    ///
    /// This method allows schemas definitions that depend on other types to
    /// parse their dependencies (or look them up if already parsed).
    fn fetch_schema_ref(
        &mut self,
        name: &str,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        fn get_schema_ref(parsed: &Schema) -> Schema {
            match &parsed {
                Schema::Record { ref name, .. }
                | Schema::Enum { ref name, .. }
                | Schema::Fixed { ref name, .. } => Schema::Ref { name: name.clone() },
                _ => parsed.clone(),
            }
        }

        let name = Name::new(name)?;
        let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);

        if self.parsed_schemas.get(&fully_qualified_name).is_some() {
            return Ok(Schema::Ref { name });
        }
        if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) {
            return Ok(resolving_schema.clone());
        }

        let value = self
            .input_schemas
            .remove(&fully_qualified_name)
            // TODO make a better descriptive error message here that conveys that a named schema cannot be found
            .ok_or_else(|| Error::ParsePrimitive(fully_qualified_name.fullname(None)))?;

        // parsing a full schema from inside another schema. Other full schema will not inherit namespace
        let parsed = self.parse(&value, &None)?;
        self.parsed_schemas
            .insert(get_schema_type_name(name, value), parsed.clone());

        Ok(get_schema_ref(&parsed))
    }

    fn parse_precision_and_scale(
        complex: &Map<String, Value>,
    ) -> Result<(Precision, Scale), Error> {
        fn get_decimal_integer(
            complex: &Map<String, Value>,
            key: &'static str,
        ) -> Result<DecimalMetadata, Error> {
            match complex.get(key) {
                Some(&Value::Number(ref value)) => parse_json_integer_for_decimal(value),
                None => {
                    if key == "scale" {
                        Ok(0)
                    } else {
                        Err(Error::GetDecimalMetadataFromJson(key))
                    }
                }
                Some(value) => Err(Error::GetDecimalMetadataValueFromJson {
                    key: key.into(),
                    value: value.clone(),
                }),
            }
        }
        let precision = get_decimal_integer(complex, "precision")?;
        let scale = get_decimal_integer(complex, "scale")?;

        if precision < 1 {
            return Err(Error::DecimalPrecisionMuBePositive { precision });
        }

        if precision < scale {
            Err(Error::DecimalPrecisionLessThanScale { precision, scale })
        } else {
            Ok((precision, scale))
        }
    }

    /// Parse a `serde_json::Value` representing a complex Avro type into a
    /// `Schema`.
    ///
    /// Avro supports "recursive" definition of types.
    /// e.g: {"type": {"type": "string"}}
    fn parse_complex(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        fn logical_verify_type(
            complex: &Map<String, Value>,
            kinds: &[SchemaKind],
            parser: &mut Parser,
            enclosing_namespace: &Namespace,
        ) -> AvroResult<Schema> {
            match complex.get("type") {
                Some(value) => {
                    let ty = parser.parse(value, enclosing_namespace)?;

                    if kinds
                        .iter()
                        .any(|&kind| SchemaKind::from(ty.clone()) == kind)
                    {
                        Ok(ty)
                    } else {
                        match get_type_rec(value.clone()) {
                            Ok(v) => Err(Error::GetLogicalTypeVariant(v)),
                            Err(err) => Err(err),
                        }
                    }
                }
                None => Err(Error::GetLogicalTypeField),
            }
        }

        fn get_type_rec(json_value: Value) -> AvroResult<Value> {
            match json_value {
                typ @ Value::String(_) => Ok(typ),
                Value::Object(ref complex) => match complex.get("type") {
                    Some(v) => get_type_rec(v.clone()),
                    None => Err(Error::GetComplexTypeField),
                },
                _ => Err(Error::GetComplexTypeField),
            }
        }

        // checks whether the logicalType is supported by the type
        fn try_logical_type(
            logical_type: &str,
            complex: &Map<String, Value>,
            kinds: &[SchemaKind],
            ok_schema: Schema,
            parser: &mut Parser,
            enclosing_namespace: &Namespace,
        ) -> AvroResult<Schema> {
            match logical_verify_type(complex, kinds, parser, enclosing_namespace) {
                // type and logicalType match!
                Ok(_) => Ok(ok_schema),
                // the logicalType is not expected for this type!
                Err(Error::GetLogicalTypeVariant(json_value)) => match json_value {
                    Value::String(_) => match parser.parse(&json_value, enclosing_namespace) {
                        Ok(schema) => {
                            warn!(
                                "Ignoring invalid logical type '{}' for schema of type: {:?}!",
                                logical_type, schema
                            );
                            Ok(schema)
                        }
                        Err(parse_err) => Err(parse_err),
                    },
                    _ => Err(Error::GetLogicalTypeVariant(json_value)),
                },
                err => err,
            }
        }

        match complex.get("logicalType") {
            Some(&Value::String(ref t)) => match t.as_str() {
                "decimal" => {
                    let inner = Box::new(logical_verify_type(
                        complex,
                        &[SchemaKind::Fixed, SchemaKind::Bytes],
                        self,
                        enclosing_namespace,
                    )?);

                    let (precision, scale) = Self::parse_precision_and_scale(complex)?;

                    return Ok(Schema::Decimal {
                        precision,
                        scale,
                        inner,
                    });
                }
                "uuid" => {
                    logical_verify_type(complex, &[SchemaKind::String], self, enclosing_namespace)?;
                    return Ok(Schema::Uuid);
                }
                "date" => {
                    return try_logical_type(
                        "date",
                        complex,
                        &[SchemaKind::Int],
                        Schema::Date,
                        self,
                        enclosing_namespace,
                    );
                }
                "time-millis" => {
                    return try_logical_type(
                        "time-millis",
                        complex,
                        &[SchemaKind::Int],
                        Schema::TimeMillis,
                        self,
                        enclosing_namespace,
                    );
                }
                "time-micros" => {
                    return try_logical_type(
                        "time-micros",
                        complex,
                        &[SchemaKind::Long],
                        Schema::TimeMicros,
                        self,
                        enclosing_namespace,
                    );
                }
                "timestamp-millis" => {
                    return try_logical_type(
                        "timestamp-millis",
                        complex,
                        &[SchemaKind::Long],
                        Schema::TimestampMillis,
                        self,
                        enclosing_namespace,
                    );
                }
                "timestamp-micros" => {
                    return try_logical_type(
                        "timestamp-micros",
                        complex,
                        &[SchemaKind::Long],
                        Schema::TimestampMicros,
                        self,
                        enclosing_namespace,
                    );
                }
                "duration" => {
                    logical_verify_type(complex, &[SchemaKind::Fixed], self, enclosing_namespace)?;
                    return Ok(Schema::Duration);
                }
                // In this case, of an unknown logical type, we just pass through to the underlying
                // type.
                _ => {}
            },
            // The spec says to ignore invalid logical types and just continue through to the
            // underlying type - It is unclear whether that applies to this case or not, where the
            // `logicalType` is not a string.
            Some(_) => return Err(Error::GetLogicalTypeFieldType),
            _ => {}
        }
        match complex.get("type") {
            Some(&Value::String(ref t)) => match t.as_str() {
                "record" => self.parse_record(complex, enclosing_namespace),
                "enum" => self.parse_enum(complex, enclosing_namespace),
                "array" => self.parse_array(complex, enclosing_namespace),
                "map" => self.parse_map(complex, enclosing_namespace),
                "fixed" => self.parse_fixed(complex, enclosing_namespace),
                other => self.parse_known_schema(other, enclosing_namespace),
            },
            Some(&Value::Object(ref data)) => self.parse_complex(data, enclosing_namespace),
            Some(&Value::Array(ref variants)) => self.parse_union(variants, enclosing_namespace),
            Some(unknown) => Err(Error::GetComplexType(unknown.clone())),
            None => Err(Error::GetComplexTypeField),
        }
    }

    /// Record that the schema `name` is currently being resolved by storing a
    /// `Schema::Ref` placeholder in `resolving_schemas` under `name` and under each of
    /// its aliases (qualified with the namespace of `name`).
    fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
        let placeholder = Schema::Ref { name: name.clone() };
        self.resolving_schemas
            .insert(name.clone(), placeholder.clone());

        if let Some(alias_list) = aliases {
            for alias in alias_list.iter() {
                let alias_fullname = alias.fully_qualified_name(&name.namespace);
                self.resolving_schemas
                    .insert(alias_fullname, placeholder.clone());
            }
        }
    }

    /// Store a fully parsed `schema` in `parsed_schemas` under `fully_qualified_name` and
    /// under each of its aliases, removing the matching entries from `resolving_schemas`.
    fn register_parsed_schema(
        &mut self,
        fully_qualified_name: &Name,
        schema: &Schema,
        aliases: &Aliases,
    ) {
        // FIXME, this should be globally aware, so if there is something overwriting something
        // else then there is an ambiguous schema definition. An appropriate error should be thrown
        self.parsed_schemas
            .insert(fully_qualified_name.clone(), schema.clone());
        self.resolving_schemas.remove(fully_qualified_name);

        if let Some(alias_list) = aliases {
            for alias in alias_list.iter() {
                let alias_fullname = alias.fully_qualified_name(&fully_qualified_name.namespace);
                self.resolving_schemas.remove(&alias_fullname);
                self.parsed_schemas.insert(alias_fullname, schema.clone());
            }
        }
    }

    /// Returns already parsed schema or a schema that is currently being resolved.
    ///
    /// The string value of the `type` attribute is qualified with `enclosing_namespace`
    /// and looked up first in `resolving_schemas`, then in `parsed_schemas`.
    ///
    /// Returns `None` when `type` is not a JSON string, when it cannot be turned into a
    /// `Name`, or when no schema is registered under the resulting name. This lookup
    /// never panics; callers that continue after `None` report their own parse error.
    fn get_already_seen_schema(
        &self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> Option<&Schema> {
        let typ = match complex.get("type") {
            Some(Value::String(typ)) => typ,
            _ => return None,
        };

        // A string that is not a valid name cannot refer to any registered schema.
        let name = Name::new(typ.as_str())
            .ok()?
            .fully_qualified_name(enclosing_namespace);

        self.resolving_schemas
            .get(&name)
            .or_else(|| self.parsed_schemas.get(&name))
    }

    /// Parse a `serde_json::Value` representing a Avro record type into a
    /// `Schema`.
    fn parse_record(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        let fields_opt = complex.get("fields");

        if fields_opt.is_none() {
            if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
                return Ok(seen.clone());
            }
        }

        let name = Name::parse(complex)?;
        let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace);

        let mut lookup = BTreeMap::new();
        let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
        self.register_resolving_schema(&fully_qualified_name, &aliases);

        let fields: Vec<RecordField> = fields_opt
            .and_then(|fields| fields.as_array())
            .ok_or(Error::GetRecordFieldsJson)
            .and_then(|fields| {
                fields
                    .iter()
                    .filter_map(|field| field.as_object())
                    .enumerate()
                    .map(|(position, field)| {
                        RecordField::parse(field, position, self, &fully_qualified_name.namespace)
                    })
                    .collect::<Result<_, _>>()
            })?;

        for field in &fields {
            lookup.insert(field.name.clone(), field.position);
        }

        let schema = Schema::Record {
            name,
            aliases: aliases.clone(),
            doc: complex.doc(),
            fields,
            lookup,
        };

        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
        Ok(schema)
    }

    /// Parse a `serde_json::Value` representing a Avro enum type into a
    /// `Schema`.
    fn parse_enum(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        let symbols_opt = complex.get("symbols");

        if symbols_opt.is_none() {
            if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
                return Ok(seen.clone());
            }
        }

        let name = Name::parse(complex)?;
        let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
        let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace);

        let symbols: Vec<String> = symbols_opt
            .and_then(|v| v.as_array())
            .ok_or(Error::GetEnumSymbolsField)
            .and_then(|symbols| {
                symbols
                    .iter()
                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
                    .collect::<Option<_>>()
                    .ok_or(Error::GetEnumSymbols)
            })?;

        let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
        for symbol in symbols.iter() {
            // Ensure enum symbol names match [A-Za-z_][A-Za-z0-9_]*
            if !ENUM_SYMBOL_NAME_R.is_match(symbol) {
                return Err(Error::EnumSymbolName(symbol.to_string()));
            }

            // Ensure there are no duplicate symbols
            if existing_symbols.contains(&symbol) {
                return Err(Error::EnumSymbolDuplicate(symbol.to_string()));
            }

            existing_symbols.insert(symbol);
        }

        let schema = Schema::Enum {
            name,
            aliases: aliases.clone(),
            doc: complex.doc(),
            symbols,
        };

        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);

        Ok(schema)
    }

    /// Parse a `serde_json::Value` representing a Avro array type into a
    /// `Schema`.
    fn parse_array(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        complex
            .get("items")
            .ok_or(Error::GetArrayItemsField)
            .and_then(|items| self.parse(items, enclosing_namespace))
            .map(|schema| Schema::Array(Box::new(schema)))
    }

    /// Parse a `serde_json::Value` representing a Avro map type into a
    /// `Schema`.
    fn parse_map(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        complex
            .get("values")
            .ok_or(Error::GetMapValuesField)
            .and_then(|items| self.parse(items, enclosing_namespace))
            .map(|schema| Schema::Map(Box::new(schema)))
    }

    /// Parse a JSON array representing an Avro union type into a `Schema`.
    ///
    /// Every element is parsed in order; the first failure aborts the parse. The parsed
    /// variants are then handed to `UnionSchema::new`, whose error (if any) is propagated.
    fn parse_union(
        &mut self,
        items: &[Value],
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        let mut variants = Vec::with_capacity(items.len());
        for item in items {
            variants.push(self.parse(item, enclosing_namespace)?);
        }

        let union = UnionSchema::new(variants)?;
        Ok(Schema::Union(union))
    }

    /// Parse a `serde_json::Value` representing a Avro fixed type into a
    /// `Schema`.
    fn parse_fixed(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        let size_opt = complex.get("size");
        if size_opt.is_none() {
            if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
                return Ok(seen.clone());
            }
        }

        let doc = complex.get("doc").and_then(|v| match &v {
            Value::String(ref docstr) => Some(docstr.clone()),
            _ => None,
        });

        let size = size_opt
            .and_then(|v| v.as_i64())
            .ok_or(Error::GetFixedSizeField)?;

        let name = Name::parse(complex)?;
        let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
        let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace);

        let schema = Schema::Fixed {
            name,
            aliases: aliases.clone(),
            doc,
            size: size as usize,
        };

        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);

        Ok(schema)
    }
}

// A type alias may be specified either as fully namespace-qualified, or relative
// to the namespace of the name it is an alias for. For example, if a type named "a.b"
// has aliases of "c" and "x.y", then the fully qualified names of its aliases are "a.c"
// and "x.y".
// https://avro.apache.org/docs/current/spec.html#Aliases
//
// Aliases without a dot are prefixed with `namespace` (when there is one); all others
// are taken as they are. Panics if a resulting alias is rejected by `Alias::new`.
fn fix_aliases_namespace(aliases: Option<Vec<String>>, namespace: &Namespace) -> Aliases {
    aliases.map(|raw_aliases| {
        raw_aliases
            .iter()
            .map(|alias| {
                let qualified = match namespace {
                    Some(ns) if !alias.contains('.') => format!("{}.{}", ns, alias),
                    _ => alias.clone(),
                };
                Alias::new(qualified.as_str()).unwrap()
            })
            .collect()
    })
}

/// Pick the name under which a parsed schema is stored.
///
/// If the `type` attribute of `value` is itself a JSON object that carries a name, that
/// inner name is used; in every other case the given `name` is returned unchanged.
/// Panics if the inner name is rejected by `Name::new`.
fn get_schema_type_name(name: Name, value: Value) -> Name {
    if let Some(Value::Object(complex_type)) = value.get("type") {
        if let Some(inner_name) = complex_type.name() {
            return Name::new(inner_name.as_str()).unwrap();
        }
    }
    name
}

impl Serialize for Schema {
    /// Serialize the schema in its JSON Avro representation.
    ///
    /// * References and primitive types are written as their name string.
    /// * Unions are written as a sequence of their variants.
    /// * All other schemas are written as a map whose first entry is `type`.
    ///
    /// Nested schemas are serialized by reference; no schema is cloned.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        /// Write `{"type": <base>, "logicalType": <logical>}`, the shape shared by all
        /// logical types that carry no further attributes.
        fn serialize_logical<Ser>(
            serializer: Ser,
            base: &str,
            logical: &str,
        ) -> Result<Ser::Ok, Ser::Error>
        where
            Ser: Serializer,
        {
            let mut map = serializer.serialize_map(None)?;
            map.serialize_entry("type", base)?;
            map.serialize_entry("logicalType", logical)?;
            map.end()
        }

        match *self {
            Schema::Ref { ref name } => serializer.serialize_str(&name.fullname(None)),
            Schema::Null => serializer.serialize_str("null"),
            Schema::Boolean => serializer.serialize_str("boolean"),
            Schema::Int => serializer.serialize_str("int"),
            Schema::Long => serializer.serialize_str("long"),
            Schema::Float => serializer.serialize_str("float"),
            Schema::Double => serializer.serialize_str("double"),
            Schema::Bytes => serializer.serialize_str("bytes"),
            Schema::String => serializer.serialize_str("string"),
            Schema::Array(ref inner) => {
                let mut map = serializer.serialize_map(Some(2))?;
                map.serialize_entry("type", "array")?;
                map.serialize_entry("items", &**inner)?;
                map.end()
            }
            Schema::Map(ref inner) => {
                let mut map = serializer.serialize_map(Some(2))?;
                map.serialize_entry("type", "map")?;
                map.serialize_entry("values", &**inner)?;
                map.end()
            }
            Schema::Union(ref inner) => {
                let variants = inner.variants();
                let mut seq = serializer.serialize_seq(Some(variants.len()))?;
                for v in variants {
                    seq.serialize_element(v)?;
                }
                seq.end()
            }
            Schema::Record {
                ref name,
                ref aliases,
                ref doc,
                ref fields,
                ..
            } => {
                let mut map = serializer.serialize_map(None)?;
                map.serialize_entry("type", "record")?;
                if let Some(ref n) = name.namespace {
                    map.serialize_entry("namespace", n)?;
                }
                map.serialize_entry("name", &name.name)?;
                if let Some(ref docstr) = doc {
                    map.serialize_entry("doc", docstr)?;
                }
                if let Some(ref aliases) = aliases {
                    map.serialize_entry("aliases", aliases)?;
                }
                map.serialize_entry("fields", fields)?;
                map.end()
            }
            Schema::Enum {
                ref name,
                ref symbols,
                ref aliases,
                ..
            } => {
                let mut map = serializer.serialize_map(None)?;
                map.serialize_entry("type", "enum")?;
                if let Some(ref n) = name.namespace {
                    map.serialize_entry("namespace", n)?;
                }
                map.serialize_entry("name", &name.name)?;
                map.serialize_entry("symbols", symbols)?;

                if let Some(ref aliases) = aliases {
                    map.serialize_entry("aliases", aliases)?;
                }
                map.end()
            }
            Schema::Fixed {
                ref name,
                ref doc,
                ref size,
                ref aliases,
                ..
            } => {
                let mut map = serializer.serialize_map(None)?;
                map.serialize_entry("type", "fixed")?;
                if let Some(ref n) = name.namespace {
                    map.serialize_entry("namespace", n)?;
                }
                map.serialize_entry("name", &name.name)?;
                if let Some(ref docstr) = doc {
                    map.serialize_entry("doc", docstr)?;
                }
                map.serialize_entry("size", size)?;

                if let Some(ref aliases) = aliases {
                    map.serialize_entry("aliases", aliases)?;
                }
                map.end()
            }
            Schema::Decimal {
                ref scale,
                ref precision,
                ref inner,
            } => {
                let mut map = serializer.serialize_map(None)?;
                map.serialize_entry("type", &**inner)?;
                map.serialize_entry("logicalType", "decimal")?;
                map.serialize_entry("scale", scale)?;
                map.serialize_entry("precision", precision)?;
                map.end()
            }
            Schema::Uuid => serialize_logical(serializer, "string", "uuid"),
            Schema::Date => serialize_logical(serializer, "int", "date"),
            Schema::TimeMillis => serialize_logical(serializer, "int", "time-millis"),
            Schema::TimeMicros => serialize_logical(serializer, "long", "time-micros"),
            Schema::TimestampMillis => serialize_logical(serializer, "long", "timestamp-millis"),
            Schema::TimestampMicros => serialize_logical(serializer, "long", "timestamp-micros"),
            Schema::Duration => {
                let mut map = serializer.serialize_map(None)?;

                // The Avro spec doesn't indicate what the name of the underlying fixed type
                // of a duration should be or typically is, so "duration" is used here.
                let inner = Schema::Fixed {
                    name: Name::new("duration").unwrap(),
                    aliases: None,
                    doc: None,
                    size: 12,
                };
                map.serialize_entry("type", &inner)?;
                map.serialize_entry("logicalType", "duration")?;
                map.end()
            }
        }
    }
}

impl Serialize for RecordField {
    /// Serialize the field as a map with the entries `name` and `type`, followed by
    /// `default` when a default value is set.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut entries = serializer.serialize_map(None)?;
        entries.serialize_entry("name", &self.name)?;
        entries.serialize_entry("type", &self.schema)?;

        match &self.default {
            Some(default) => entries.serialize_entry("default", default)?,
            None => {}
        }

        entries.end()
    }
}

/// Parses a **valid** avro schema into the Parsing Canonical Form.
/// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
fn parsing_canonical_form(schema: &serde_json::Value) -> String {
    match schema {
        serde_json::Value::Object(map) => pcf_map(map),
        serde_json::Value::String(s) => pcf_string(s),
        serde_json::Value::Array(v) => pcf_array(v),
        json => panic!(
            "got invalid JSON value for canonical form of schema: {0}",
            json
        ),
    }
}

/// Renders a JSON object of a valid schema in canonical form.
///
/// Attributes unknown to `field_ordering_position`, as well as `default`, `doc` and
/// `aliases`, are dropped; the remaining ones are emitted in canonical order.
///
/// # Panics
///
/// Panics if `name` is not a string, or if `size`, `precision` or `scale` is neither an
/// integer nor a string holding an integer. Only valid schemas may be passed in.
fn pcf_map(schema: &Map<String, serde_json::Value>) -> String {
    // Reduce primitive types to their simple form. ([PRIMITIVE] rule)
    // Invariant: function is only callable from a valid schema, so this is acceptable.
    if schema.len() == 1 {
        if let Some(serde_json::Value::String(s)) = schema.get("type") {
            return pcf_string(s);
        }
    }

    // Look for the namespace variant up front.
    let ns = schema.get("namespace").and_then(|v| v.as_str());

    let mut fields = Vec::new();
    for (k, v) in schema {
        let key = k.as_str();

        // Strip out unused fields ([STRIP] rule)
        let stripped = key == "default" || key == "doc" || key == "aliases";
        if stripped || field_ordering_position(key).is_none() {
            continue;
        }

        let rendered = match key {
            // Fully qualify the name, if it isn't already ([FULLNAMES] rule).
            "name" => {
                // Invariant: Only valid schemas. Must be a string.
                let name = v.as_str().unwrap();
                match ns {
                    Some(namespace) if !name.contains('.') => {
                        pcf_string(&format!("{}.{}", namespace, name))
                    }
                    _ => pcf_string(name),
                }
            }
            // Strip off quotes surrounding integer attributes, if they exist
            // ([INTEGERS] rule).
            "size" | "precision" | "scale" => {
                let i = match v.as_str() {
                    Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
                    None => v.as_i64().unwrap(),
                };
                i.to_string()
            }
            // For anything else, recursively process the result.
            _ => parsing_canonical_form(v),
        };

        fields.push((k, format!("{}:{}", pcf_string(key), rendered)));
    }

    // Sort the fields by their canonical ordering ([ORDER] rule).
    fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());

    let body: Vec<String> = fields.into_iter().map(|(_, entry)| entry).collect();
    format!("{{{}}}", body.join(","))
}

fn pcf_array(arr: &[serde_json::Value]) -> String {
    let inter = arr
        .iter()
        .map(parsing_canonical_form)
        .collect::<Vec<String>>()
        .join(",");
    format!("[{}]", inter)
}

/// Wraps `s` in double quotes. The content is copied verbatim; no escaping is performed.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}

/// JSON attribute names known to `field_ordering_position`.
///
/// The order of the entries is significant: the index of a name in this slice
/// determines the rank returned by `field_ordering_position`, which in turn is
/// used as the sort key when canonical-form fields are ordered.
const RESERVED_FIELDS: &[&str] = &[
    "name",
    "type",
    "fields",
    "symbols",
    "items",
    "values",
    "size",
    "logicalType",
    "order",
    "doc",
    "aliases",
    "default",
    "precision",
    "scale",
];

/// Returns the 1-based rank of `field` within `RESERVED_FIELDS`, or `None` when the
/// name is not listed there. Used to define the ordering and inclusion of fields.
fn field_ordering_position(field: &str) -> Option<usize> {
    for (index, reserved) in RESERVED_FIELDS.iter().enumerate() {
        if *reserved == field {
            return Some(index + 1);
        }
    }
    None
}

/// Trait for types that serve as an Avro data model. Derive implementation available
/// through `derive` feature. Do not implement directly!
/// Implement `apache_avro::schema::derive::AvroSchemaComponent` to get this trait
/// through a blanket implementation.
pub trait AvroSchema {
    /// Returns the Avro [`Schema`] associated with the implementing type.
    fn get_schema() -> Schema;
}

#[cfg(feature = "derive")]
pub mod derive {
    use super::*;

    /// Trait for types that serve as fully defined components inside an Avro data model. Derive
    /// implementation available through `derive` feature. This is what is implemented by
    /// the `derive(AvroSchema)` macro.
    ///
    /// # Implementation guide
    ///
    /// ### Simple implementation
    /// To construct a non-named simple schema, it is possible to ignore the input arguments, making
    /// the general form of the implementation look like
    /// ```ignore
    /// impl AvroSchemaComponent for AType {
    ///     fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
    ///         Schema::?
    ///     }
    /// }
    /// ```
    /// ### Passthrough implementation
    /// To construct a schema for a type that acts as an "inner" type, such as for smart pointers,
    /// simply pass the arguments through to the inner type
    /// ```ignore
    /// impl AvroSchemaComponent for PassthroughType {
    ///     fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
    ///         InnerType::get_schema_in_ctxt(named_schemas, enclosing_namespace)
    ///     }
    /// }
    /// ```
    /// ### Complex implementation
    /// To implement this for a named schema there is a general form needed to avoid creating
    /// invalid schemas or infinite loops.
    /// ```ignore
    /// impl AvroSchemaComponent for ComplexType {
    ///     fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
    ///         // Create the fully qualified name for your type given the enclosing namespace
    ///         let name =  apache_avro::schema::Name::new("MyName")
    ///             .expect("Unable to parse schema name")
    ///             .fully_qualified_name(enclosing_namespace);
    ///         let enclosing_namespace = &name.namespace;
    ///         // Check if your name is already defined, and if so, return a ref to that name
    ///         if named_schemas.contains_key(&name) {
    ///             apache_avro::schema::Schema::Ref{name: name.clone()}
    ///         } else {
    ///             named_schemas.insert(name.clone(), apache_avro::schema::Schema::Ref{name: name.clone()});
    ///             // YOUR SCHEMA DEFINITION HERE with the name equivalent to "MyName".
    ///             // For non-simple sub types delegate to their implementation of AvroSchemaComponent
    ///         }
    ///     }
    /// }
    /// ```
    pub trait AvroSchemaComponent {
        /// Builds the schema for the implementing type.
        ///
        /// `named_schemas` is the registry of named schemas defined so far (see the complex
        /// example above) and `enclosing_namespace` is the namespace of the enclosing schema.
        fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace)
            -> Schema;
    }

    impl<T> AvroSchema for T
    where
        T: AvroSchemaComponent,
    {
        fn get_schema() -> Schema {
            T::get_schema_in_ctxt(&mut HashMap::default(), &Option::None)
        }
    }

    // Generates an `AvroSchemaComponent` impl for `$rust_type` whose schema is the fixed
    // expression `$schema`; both context arguments are ignored.
    macro_rules! impl_schema {
        ($rust_type:ty, $schema:expr) => {
            impl AvroSchemaComponent for $rust_type {
                fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
                    $schema
                }
            }
        };
    }

    // Fixed Rust-type -> Avro-schema mappings.
    // `i8`, `i16`, `i32`, `u8` and `u16` map to `Schema::Int`; `i64` and `u32` map to `Schema::Long`.
    // NOTE(review): presumably `u32` is mapped to `Long` because its full range does not fit
    // into Avro's 32-bit signed `int` - confirm against the Avro specification.
    impl_schema!(i8, Schema::Int);
    impl_schema!(i16, Schema::Int);
    impl_schema!(i32, Schema::Int);
    impl_schema!(i64, Schema::Long);
    impl_schema!(u8, Schema::Int);
    impl_schema!(u16, Schema::Int);
    impl_schema!(u32, Schema::Long);
    impl_schema!(f32, Schema::Float);
    impl_schema!(f64, Schema::Double);
    impl_schema!(String, Schema::String);
    impl_schema!(uuid::Uuid, Schema::Uuid);
    impl_schema!(core::time::Duration, Schema::Duration);

    /// A `Vec<T>` is described as an Avro array whose items use the schema of `T`.
    impl<T: AvroSchemaComponent> AvroSchemaComponent for Vec<T> {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            let items = T::get_schema_in_ctxt(named_schemas, enclosing_namespace);
            Schema::Array(Box::new(items))
        }
    }

    /// An `Option<T>` is described as the union `[null, <schema of T>]`.
    ///
    /// The `UnionSchema` is assembled field by field (not through `UnionSchema::new`), with
    /// `variant_index` mapping each variant's `SchemaKind` to its position in `schemas`.
    impl<T> AvroSchemaComponent for Option<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            let inner_schema = T::get_schema_in_ctxt(named_schemas, enclosing_namespace);
            let schemas = vec![Schema::Null, inner_schema];
            // Build the index from the variants by reference, before `schemas` is moved into
            // the union, so the inner schema does not have to be cloned.
            let variant_index = schemas
                .iter()
                .enumerate()
                .map(|(idx, s)| (SchemaKind::from(s), idx))
                .collect();
            Schema::Union(UnionSchema {
                schemas,
                variant_index,
            })
        }
    }

    /// A `Map<String, T>` is described as an Avro map whose values use the schema of `T`.
    impl<T: AvroSchemaComponent> AvroSchemaComponent for Map<String, T> {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            let values = T::get_schema_in_ctxt(named_schemas, enclosing_namespace);
            Schema::Map(Box::new(values))
        }
    }

    /// A `HashMap<String, T>` is described as an Avro map whose values use the schema of `T`.
    impl<T: AvroSchemaComponent> AvroSchemaComponent for HashMap<String, T> {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            let values = T::get_schema_in_ctxt(named_schemas, enclosing_namespace);
            Schema::Map(Box::new(values))
        }
    }

    /// Passthrough: a `Box<T>` has exactly the schema of `T`.
    impl<T: AvroSchemaComponent> AvroSchemaComponent for Box<T> {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            <T as AvroSchemaComponent>::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }

    /// Passthrough: a `Mutex<T>` has exactly the schema of `T`.
    impl<T: AvroSchemaComponent> AvroSchemaComponent for std::sync::Mutex<T> {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            <T as AvroSchemaComponent>::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }

    /// Passthrough: a `Cow<'_, T>` has exactly the schema of `T`.
    impl<T: AvroSchemaComponent + Clone> AvroSchemaComponent for Cow<'_, T> {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            <T as AvroSchemaComponent>::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// Input that is not a schema at all must be rejected.
    #[test]
    fn test_invalid_schema() {
        let outcome = Schema::parse_str("invalid");
        assert!(outcome.is_err());
    }

    /// Primitive type names given as JSON strings parse to the matching variant.
    #[test]
    fn test_primitive_schema() {
        let cases = vec![
            (Schema::Null, "\"null\""),
            (Schema::Int, "\"int\""),
            (Schema::Double, "\"double\""),
        ];
        for (expected, json) in cases {
            let parsed = Schema::parse_str(json).unwrap();
            assert_eq!(expected, parsed);
        }
    }

    /// An array of strings parses to `Schema::Array` wrapping `Schema::String`.
    #[test]
    fn test_array_schema() {
        let expected = Schema::Array(Box::new(Schema::String));
        let parsed = Schema::parse_str(r#"{"type": "array", "items": "string"}"#).unwrap();
        assert_eq!(expected, parsed);
    }

    /// A map of doubles parses to `Schema::Map` wrapping `Schema::Double`.
    #[test]
    fn test_map_schema() {
        let expected = Schema::Map(Box::new(Schema::Double));
        let parsed = Schema::parse_str(r#"{"type": "map", "values": "double"}"#).unwrap();
        assert_eq!(expected, parsed);
    }

    /// A JSON array of type names parses to a union of those types.
    #[test]
    fn test_union_schema() {
        let variants = vec![Schema::Null, Schema::Int];
        let expected = Schema::Union(UnionSchema::new(variants).unwrap());
        let parsed = Schema::parse_str(r#"["null", "int"]"#).unwrap();
        assert_eq!(expected, parsed);
    }

    /// A union that directly contains another union is rejected.
    #[test]
    fn test_union_unsupported_schema() {
        let nested_union = r#"["null", ["null", "int"], "string"]"#;
        assert!(Schema::parse_str(nested_union).is_err());
    }

    /// A five-variant union parses successfully and keeps its variants in declaration order.
    #[test]
    fn test_multi_union_schema() {
        let parsed = Schema::parse_str(r#"["null", "int", "float", "string", "bytes"]"#);
        assert!(parsed.is_ok());
        let parsed = parsed.unwrap();
        assert_eq!(SchemaKind::from(&parsed), SchemaKind::Union);

        let union_schema = if let Schema::Union(inner) = parsed {
            inner
        } else {
            unreachable!()
        };
        assert_eq!(union_schema.variants().len(), 5);

        // Compare the whole sequence of kinds at once; equal vectors also imply that
        // there is no variant left after the fifth one.
        let expected_kinds = vec![
            SchemaKind::Null,
            SchemaKind::Int,
            SchemaKind::Float,
            SchemaKind::String,
            SchemaKind::Bytes,
        ];
        let actual_kinds: Vec<SchemaKind> = union_schema
            .variants()
            .iter()
            .map(|variant| SchemaKind::from(variant))
            .collect();
        assert_eq!(actual_kinds, expected_kinds);
    }

    // AVRO-3248
    /// A record field may be a union of two previously parsed records, referenced by name.
    #[test]
    fn test_union_of_records() {
        // Records A and B differ only in their name.
        let record_a_json = r#"{
            "name": "A",
            "type": "record",
            "fields": [
                {"name": "field_one", "type": "float"}
            ]
        }"#;

        let record_b_json = r#"{
            "name": "B",
            "type": "record",
            "fields": [
                {"name": "field_one", "type": "float"}
            ]
        }"#;

        // we get Error::GetNameField if we put ["A", "B"] directly here.
        let record_c_json = r#"{
            "name": "C",
            "type": "record",
            "fields": [
                {"name": "field_one",  "type": ["A", "B"]}
            ]
        }"#;

        // C is the last schema of the parsed list.
        let mut parsed = Schema::parse_list(&[record_a_json, record_b_json, record_c_json]).unwrap();
        let schema_c = parsed.pop().unwrap();

        let union_of_refs = UnionSchema::new(vec![
            Schema::Ref {
                name: Name::new("A").unwrap(),
            },
            Schema::Ref {
                name: Name::new("B").unwrap(),
            },
        ])
        .unwrap();

        let mut lookup = BTreeMap::new();
        lookup.insert("field_one".to_string(), 0);

        let schema_c_expected = Schema::Record {
            name: Name::new("C").unwrap(),
            aliases: None,
            doc: None,
            fields: vec![RecordField {
                name: "field_one".to_string(),
                doc: None,
                default: None,
                schema: Schema::Union(union_of_refs),
                order: RecordFieldOrder::Ignore,
                position: 0,
            }],
            lookup,
        };

        assert_eq!(schema_c, schema_c_expected);
    }

    // AVRO-3584 : recursion in type definitions
    /// Two records that reference each other parse, and A's field refers to B by name.
    #[test]
    fn avro_3584_test_recursion_records() {
        // A has a field of type B and B has a field of type A.
        let record_a_json = r#"{
            "name": "A",
            "type": "record",
            "fields": [ {"name": "field_one", "type": "B"} ]
        }"#;

        let record_b_json = r#"{
            "name": "B",
            "type": "record",
            "fields": [ {"name": "field_one", "type": "A"} ]
        }"#;

        let parsed = Schema::parse_list(&[record_a_json, record_b_json]).unwrap();

        let fields = match parsed.first().unwrap() {
            Schema::Record { fields, .. } => fields,
            _ => panic!("Expected a record schema!"),
        };

        let expected_ref = Schema::Ref {
            name: Name::new("B").unwrap(),
        };
        assert_eq!(expected_ref, fields.first().unwrap().schema);
    }

    // AVRO-3248
    /// A record field may be a nullable reference to a previously parsed record.
    #[test]
    fn test_nullable_record() {
        let record_a_json = r#"{
            "name": "A",
            "type": "record",
            "fields": [
                {"name": "field_one", "type": "float"}
            ]
        }"#;

        // we get Error::GetNameField if we put ["null", "B"] directly here.
        let option_a_json = r#"{
            "name": "OptionA",
            "type": "record",
            "fields": [
                {"name": "field_one",  "type": ["null", "A"], "default": "null"}
            ]
        }"#;

        // OptionA is the last schema of the parsed list.
        let mut parsed = Schema::parse_list(&[record_a_json, option_a_json]).unwrap();
        let schema_option_a = parsed.pop().unwrap();

        let nullable_a = UnionSchema::new(vec![
            Schema::Null,
            Schema::Ref {
                name: Name::new("A").unwrap(),
            },
        ])
        .unwrap();

        let mut lookup = BTreeMap::new();
        lookup.insert("field_one".to_string(), 0);

        let schema_option_a_expected = Schema::Record {
            name: Name::new("OptionA").unwrap(),
            aliases: None,
            doc: None,
            fields: vec![RecordField {
                name: "field_one".to_string(),
                doc: None,
                default: Some(Value::String("null".to_string())),
                schema: Schema::Union(nullable_a),
                order: RecordFieldOrder::Ignore,
                position: 0,
            }],
            lookup,
        };

        assert_eq!(schema_option_a, schema_option_a_expected);
    }

    /// A flat record with two primitive fields, one of which has a default value.
    #[test]
    fn test_record_schema() {
        let raw_schema = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"}
                ]
            }
        "#;
        let parsed = Schema::parse_str(raw_schema).unwrap();

        // Builds an undocumented, ascending-ordered record field.
        let make_field =
            |name: &str, default: Option<Value>, schema: Schema, position: usize| RecordField {
                name: name.to_string(),
                doc: None,
                default,
                schema,
                order: RecordFieldOrder::Ascending,
                position,
            };

        let lookup = vec![("a", 0), ("b", 1)]
            .into_iter()
            .map(|(field_name, index)| (field_name.to_owned(), index))
            .collect::<BTreeMap<_, _>>();

        let expected = Schema::Record {
            name: Name::new("test").unwrap(),
            aliases: None,
            doc: None,
            fields: vec![
                make_field("a", Some(Value::Number(42i64.into())), Schema::Long, 0),
                make_field("b", None, Schema::String, 1),
            ],
            lookup,
        };

        assert_eq!(parsed, expected);
    }

    // AVRO-3302
    /// A nested record may refer to itself by name while it is still being parsed.
    #[test]
    fn test_record_schema_with_currently_parsing_schema() {
        let raw_schema = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [{
                    "name": "recordField",
                    "type": {
                        "type": "record",
                        "name": "Node",
                        "fields": [
                            {"name": "label", "type": "string"},
                            {"name": "children", "type": {"type": "array", "items": "Node"}}
                        ]
                    }
                }]
            }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        // Builds an undocumented, default-less, ascending-ordered record field.
        let make_field = |name: &str, schema: Schema, position: usize| RecordField {
            name: name.to_string(),
            doc: None,
            default: None,
            schema,
            order: RecordFieldOrder::Ascending,
            position,
        };

        let node_lookup = vec![("children", 1), ("label", 0)]
            .into_iter()
            .map(|(field_name, index)| (field_name.to_owned(), index))
            .collect::<BTreeMap<_, _>>();

        let node_schema = Schema::Record {
            name: Name::new("Node").unwrap(),
            aliases: None,
            doc: None,
            fields: vec![
                make_field("label", Schema::String, 0),
                make_field(
                    "children",
                    Schema::Array(Box::new(Schema::Ref {
                        name: Name::new("Node").unwrap(),
                    })),
                    1,
                ),
            ],
            lookup: node_lookup,
        };

        let mut lookup = BTreeMap::new();
        lookup.insert("recordField".to_owned(), 0);

        let expected = Schema::Record {
            name: Name::new("test").unwrap(),
            aliases: None,
            doc: None,
            fields: vec![make_field("recordField", node_schema, 0)],
            lookup,
        };
        assert_eq!(schema, expected);

        let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"recordField","type":{"name":"Node","type":"record","fields":[{"name":"label","type":"string"},{"name":"children","type":{"type":"array","items":"Node"}}]}}]}"#;
        assert_eq!(schema.canonical_form(), expected_canonical);
    }

    // https://github.com/flavray/avro-rs/pull/99#issuecomment-1016948451
    /// An enum defined inside one union branch can be referenced by name from a later branch.
    #[test]
    fn test_parsing_of_recursive_type_enum() {
        let raw_schema = r#"
    {
        "type": "record",
        "name": "User",
        "namespace": "office",
        "fields": [
            {
              "name": "details",
              "type": [
                {
                  "type": "record",
                  "name": "Employee",
                  "fields": [
                    {
                      "name": "gender",
                      "type": {
                        "type": "enum",
                        "name": "Gender",
                        "symbols": [
                          "male",
                          "female"
                        ]
                      },
                      "default": "female"
                    }
                  ]
                },
                {
                  "type": "record",
                  "name": "Manager",
                  "fields": [
                    {
                      "name": "gender",
                      "type": "Gender"
                    }
                  ]
                }
              ]
            }
          ]
        }
        "#;

        let expected_canonical = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"Employee","type":"record","fields":[{"name":"gender","type":{"name":"Gender","type":"enum","symbols":["male","female"]}}]},{"name":"Manager","type":"record","fields":[{"name":"gender","type":"Gender"}]}]}]}"#;
        let actual_canonical = Schema::parse_str(raw_schema).unwrap().canonical_form();
        assert_eq!(actual_canonical, expected_canonical);
    }

    /// A fixed defined inside one union branch can be referenced by name from a later branch.
    #[test]
    fn test_parsing_of_recursive_type_fixed() {
        let raw_schema = r#"
    {
        "type": "record",
        "name": "User",
        "namespace": "office",
        "fields": [
            {
              "name": "details",
              "type": [
                {
                  "type": "record",
                  "name": "Employee",
                  "fields": [
                    {
                      "name": "id",
                      "type": {
                        "type": "fixed",
                        "name": "EmployeeId",
                        "size": 16
                      },
                      "default": "female"
                    }
                  ]
                },
                {
                  "type": "record",
                  "name": "Manager",
                  "fields": [
                    {
                      "name": "id",
                      "type": "EmployeeId"
                    }
                  ]
                }
              ]
            }
          ]
        }
        "#;

        let expected_canonical = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"Employee","type":"record","fields":[{"name":"id","type":{"name":"EmployeeId","type":"fixed","size":16}}]},{"name":"Manager","type":"record","fields":[{"name":"id","type":"EmployeeId"}]}]}]}"#;
        let actual_canonical = Schema::parse_str(raw_schema).unwrap().canonical_form();
        assert_eq!(actual_canonical, expected_canonical);
    }

    // AVRO-3302
    /// A record may refer to itself through one of its aliases; the reference resolves to
    /// the record's own name.
    #[test]
    fn test_record_schema_with_currently_parsing_schema_aliases() {
        let raw_schema = r#"
            {
              "type": "record",
              "name": "LongList",
              "aliases": ["LinkedLongs"],
              "fields" : [
                {"name": "value", "type": "long"},
                {"name": "next", "type": ["null", "LinkedLongs"]}
              ]
            }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        let long_list = Name {
            name: "LongList".to_owned(),
            namespace: None,
        };

        // Builds an undocumented, default-less, ascending-ordered record field.
        let make_field = |name: &str, schema: Schema, position: usize| RecordField {
            name: name.to_string(),
            doc: None,
            default: None,
            schema,
            order: RecordFieldOrder::Ascending,
            position,
        };

        let nullable_self = UnionSchema::new(vec![
            Schema::Null,
            Schema::Ref {
                name: long_list.clone(),
            },
        ])
        .unwrap();

        let lookup = vec![("value", 0), ("next", 1)]
            .into_iter()
            .map(|(field_name, index)| (field_name.to_owned(), index))
            .collect::<BTreeMap<_, _>>();

        let expected = Schema::Record {
            name: long_list,
            aliases: Some(vec![Alias::new("LinkedLongs").unwrap()]),
            doc: None,
            fields: vec![
                make_field("value", Schema::Long, 0),
                make_field("next", Schema::Union(nullable_self), 1),
            ],
            lookup,
        };
        assert_eq!(schema, expected);

        let expected_canonical = r#"{"name":"LongList","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":["null","LongList"]}]}"#;
        assert_eq!(schema.canonical_form(), expected_canonical);
    }

    // AVRO-3370
    /// A record literally named "record" can be referenced by that name from its own field.
    #[test]
    fn test_record_schema_with_currently_parsing_schema_named_record() {
        let raw_schema = r#"
            {
              "type" : "record",
              "name" : "record",
              "fields" : [
                 { "name" : "value", "type" : "long" },
                 { "name" : "next", "type" : "record" }
             ]
            }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        let record_name = Name {
            name: "record".to_owned(),
            namespace: None,
        };

        // Builds an undocumented, default-less, ascending-ordered record field.
        let make_field = |name: &str, schema: Schema, position: usize| RecordField {
            name: name.to_string(),
            doc: None,
            default: None,
            schema,
            order: RecordFieldOrder::Ascending,
            position,
        };

        let lookup = vec![("value", 0), ("next", 1)]
            .into_iter()
            .map(|(field_name, index)| (field_name.to_owned(), index))
            .collect::<BTreeMap<_, _>>();

        let self_reference = Schema::Ref {
            name: record_name.clone(),
        };

        let expected = Schema::Record {
            name: record_name,
            aliases: None,
            doc: None,
            fields: vec![
                make_field("value", Schema::Long, 0),
                make_field("next", self_reference, 1),
            ],
            lookup,
        };
        assert_eq!(schema, expected);

        let expected_canonical = r#"{"name":"record","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":"record"}]}"#;
        assert_eq!(schema.canonical_form(), expected_canonical);
    }

    // AVRO-3370
    /// An enum literally named "enum" can be referenced by that name from a later field.
    #[test]
    fn test_record_schema_with_currently_parsing_schema_named_enum() {
        let raw_schema = r#"
            {
              "type" : "record",
              "name" : "record",
              "fields" : [
                 {
                    "type" : "enum",
                    "name" : "enum",
                    "symbols": ["one", "two", "three"]
                 },
                 { "name" : "next", "type" : "enum" }
             ]
            }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        // Both fields are expected to carry the same enum definition.
        let enum_schema = Schema::Enum {
            name: Name {
                name: "enum".to_owned(),
                namespace: None,
            },
            aliases: None,
            doc: None,
            symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
        };

        // Builds an undocumented, default-less, ascending-ordered record field.
        let make_field = |name: &str, schema: Schema, position: usize| RecordField {
            name: name.to_string(),
            doc: None,
            default: None,
            schema,
            order: RecordFieldOrder::Ascending,
            position,
        };

        let lookup = vec![("enum", 0), ("next", 1)]
            .into_iter()
            .map(|(field_name, index)| (field_name.to_owned(), index))
            .collect::<BTreeMap<_, _>>();

        let expected = Schema::Record {
            name: Name {
                name: "record".to_owned(),
                namespace: None,
            },
            aliases: None,
            doc: None,
            fields: vec![
                make_field("enum", enum_schema.clone(), 0),
                make_field("next", enum_schema, 1),
            ],
            lookup,
        };
        assert_eq!(schema, expected);

        let expected_canonical = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}}]}"#;
        assert_eq!(schema.canonical_form(), expected_canonical);
    }

    // AVRO-3370
    /// A fixed literally named "fixed" can be referenced by that name from a later field.
    #[test]
    fn test_record_schema_with_currently_parsing_schema_named_fixed() {
        let raw_schema = r#"
            {
              "type" : "record",
              "name" : "record",
              "fields" : [
                 {
                    "type" : "fixed",
                    "name" : "fixed",
                    "size": 456
                 },
                 { "name" : "next", "type" : "fixed" }
             ]
            }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        // Both fields are expected to carry the same fixed definition.
        let fixed_schema = Schema::Fixed {
            name: Name {
                name: "fixed".to_owned(),
                namespace: None,
            },
            aliases: None,
            doc: None,
            size: 456,
        };

        // Builds an undocumented, default-less, ascending-ordered record field.
        let make_field = |name: &str, schema: Schema, position: usize| RecordField {
            name: name.to_string(),
            doc: None,
            default: None,
            schema,
            order: RecordFieldOrder::Ascending,
            position,
        };

        let lookup = vec![("fixed", 0), ("next", 1)]
            .into_iter()
            .map(|(field_name, index)| (field_name.to_owned(), index))
            .collect::<BTreeMap<_, _>>();

        let expected = Schema::Record {
            name: Name {
                name: "record".to_owned(),
                namespace: None,
            },
            aliases: None,
            doc: None,
            fields: vec![
                make_field("fixed", fixed_schema.clone(), 0),
                make_field("next", fixed_schema, 1),
            ],
            lookup,
        };
        assert_eq!(schema, expected);

        let expected_canonical = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":{"name":"fixed","type":"fixed","size":456}}]}"#;
        assert_eq!(schema.canonical_form(), expected_canonical);
    }

    /// An enum schema keeps its name and its symbols in declaration order.
    #[test]
    fn test_enum_schema() {
        let raw_schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "hearts"]}"#;
        let parsed = Schema::parse_str(raw_schema).unwrap();

        let symbols: Vec<String> = ["diamonds", "spades", "clubs", "hearts"]
            .iter()
            .map(|symbol| symbol.to_string())
            .collect();

        let expected = Schema::Enum {
            name: Name::new("Suit").unwrap(),
            aliases: None,
            doc: None,
            symbols,
        };

        assert_eq!(expected, parsed);
    }

    /// An enum that lists the same symbol ("diamonds") twice is rejected.
    #[test]
    fn test_enum_schema_duplicate() {
        let with_duplicate_symbol = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "diamonds"]}"#;
        let outcome = Schema::parse_str(with_duplicate_symbol);
        assert!(outcome.is_err());
    }

    /// An enum containing a symbol that is not a valid name must fail to parse.
    #[test]
    fn test_enum_schema_name() {
        // The symbol "0000" does not match [A-Za-z_][A-Za-z0-9_]*
        let raw_schema = r#"{"type": "enum", "name": "Enum", "symbols": ["0000", "variant"]}"#;
        let parsed = Schema::parse_str(raw_schema);
        assert!(parsed.is_err());
    }

    /// A minimal fixed definition parses into `Schema::Fixed` with the declared size
    /// and neither aliases nor documentation.
    #[test]
    fn test_fixed_schema() {
        let raw_schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        let name = Name::new("test").unwrap();
        let expected = Schema::Fixed {
            name,
            aliases: None,
            doc: None,
            size: 16,
        };

        assert_eq!(expected, schema);
    }

    /// The `doc` attribute of a fixed definition is preserved in the parsed `Schema::Fixed`.
    #[test]
    fn test_fixed_schema_with_documentation() {
        let raw_schema =
            r#"{"type": "fixed", "name": "test", "size": 16, "doc": "FixedSchema documentation"}"#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        let name = Name::new("test").unwrap();
        let expected = Schema::Fixed {
            name,
            aliases: None,
            doc: Some("FixedSchema documentation".to_owned()),
            size: 16,
        };

        assert_eq!(expected, schema);
    }

    /// An enum definition without a `doc` attribute parses with `doc` set to `None`.
    #[test]
    fn test_no_documentation() {
        let schema =
            Schema::parse_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
                .unwrap();

        let doc = match schema {
            Schema::Enum { doc, .. } => doc,
            // Fail loudly instead of returning early: a silent `return` here would let the
            // test pass without checking anything if the parser produced a non-enum schema.
            other => panic!("Expected an enum schema, got: {:?}", other),
        };

        assert!(doc.is_none());
    }

    /// The `doc` attribute of an enum definition is exposed on the parsed `Schema::Enum`.
    #[test]
    fn test_documentation() {
        let raw_schema = r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        // Any non-enum schema is treated as having no documentation, which makes the
        // `unwrap` below fail.
        let doc = if let Schema::Enum { doc, .. } = schema {
            doc
        } else {
            None
        };

        assert_eq!(String::from("Some documentation"), doc.unwrap());
    }

    // Tests to ensure Schema is Send + Sync. These tests don't need to _do_ anything;
    // if they compile, they pass.
    /// Compile-time check that a `Schema` value satisfies the `Send` bound.
    #[test]
    fn test_schema_is_send() {
        // Only the trait bound matters; the body is intentionally empty.
        fn require_send<T: Send>(_value: T) {}

        require_send(Schema::Null);
    }

    /// Compile-time check that both `&Schema` and `Schema` satisfy the `Sync` bound.
    #[test]
    fn test_schema_is_sync() {
        // Only the trait bound matters; the body is intentionally empty.
        fn require_sync<T: Sync>(_value: T) {}

        let null_schema = Schema::Null;
        require_sync(&null_schema);
        require_sync(null_schema);
    }

    /// The SHA-256, MD5 and Rabin fingerprints of a fixed record schema match
    /// the expected hex strings.
    #[test]
    #[cfg_attr(miri, ignore)] // Sha256 uses inline assembly instructions, which are not supported by miri
    fn test_schema_fingerprint() {
        use crate::rabin::Rabin;
        use md5::Md5;
        use sha2::Sha256;

        let raw_schema = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {"name": "c", "type": "long", "logicalType": "timestamp-micros"}
        ]
    }
"#;

        let schema = Schema::parse_str(raw_schema).unwrap();

        let sha256_hex = schema.fingerprint::<Sha256>().to_string();
        assert_eq!(
            "abf662f831715ff78f88545a05a9262af75d6406b54e1a8a174ff1d2b75affc4",
            sha256_hex
        );

        let md5_hex = schema.fingerprint::<Md5>().to_string();
        assert_eq!("6e21c350f71b1a34e9efe90970f1bc69", md5_hex);

        let rabin_hex = schema.fingerprint::<Rabin>().to_string();
        assert_eq!("28cf0a67d9937bb3", rabin_hex);
    }

    /// Primitive types annotated with a `logicalType` parse into the dedicated
    /// logical-type variants of `Schema`.
    #[test]
    fn test_logical_types() {
        let date = Schema::parse_str(r#"{"type": "int", "logicalType": "date"}"#).unwrap();
        assert_eq!(date, Schema::Date);

        let timestamp_micros =
            Schema::parse_str(r#"{"type": "long", "logicalType": "timestamp-micros"}"#).unwrap();
        assert_eq!(timestamp_micros, Schema::TimestampMicros);
    }

    /// A union of `null` and a logical type parses into a `Schema::Union` whose
    /// second variant is the logical type.
    #[test]
    fn test_nullable_logical_type() {
        let raw_schema =
            r#"{"type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}"#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        let variants = vec![Schema::Null, Schema::TimestampMicros];
        let expected = Schema::Union(UnionSchema::new(variants).unwrap());
        assert_eq!(schema, expected);
    }

    /// `RecordFieldOrder::from_str` accepts the three lowercase ordering names
    /// and rejects any other text.
    #[test]
    fn record_field_order_from_str() {
        use std::str::FromStr;

        let accepted = [
            ("ascending", RecordFieldOrder::Ascending),
            ("descending", RecordFieldOrder::Descending),
            ("ignore", RecordFieldOrder::Ignore),
        ];
        for (text, expected) in accepted.iter() {
            assert_eq!(RecordFieldOrder::from_str(text).unwrap(), *expected);
        }

        assert!(RecordFieldOrder::from_str("not an ordering").is_err());
    }

    /// AVRO-3374: a record named `ns.int` keeps its full name in the canonical form,
    /// both in its definition and in the self-reference.
    #[test]
    fn test_avro_3374_preserve_namespace_for_primitive() {
        let raw_schema = r#"
            {
              "type" : "record",
              "name" : "ns.int",
              "fields" : [
                {"name" : "value", "type" : "int"},
                {"name" : "next", "type" : [ "null", "ns.int" ]}
              ]
            }
            "#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        let canonical = schema.canonical_form();
        let expected = r#"{"name":"ns.int","type":"record","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","ns.int"]}]}"#;
        assert_eq!(canonical, expected);
    }

    /// AVRO-3433: a field that refers to an already defined record by name stays a
    /// name reference in the canonical form instead of being expanded.
    #[test]
    fn test_avro_3433_preserve_schema_refs_in_json() {
        let raw_schema = r#"
    {
      "name": "test.test",
      "type": "record",
      "fields": [
        {
          "name": "bar",
          "type": { "name": "test.foo", "type": "record", "fields": [{ "name": "id", "type": "long" }] }
        },
        { "name": "baz", "type": "test.foo" }
      ]
    }
    "#;

        let parsed = Schema::parse_str(raw_schema).unwrap();
        let canonical = parsed.canonical_form();

        let expected = r#"{"name":"test.test","type":"record","fields":[{"name":"bar","type":{"name":"test.foo","type":"record","fields":[{"name":"id","type":"long"}]}},{"name":"baz","type":"test.foo"}]}"#;
        assert_eq!(canonical, expected);
    }

    /// A dotted `name` attribute is split into the namespace and the simple name.
    #[test]
    fn test_read_namespace_from_name() {
        let raw_schema = r#"
    {
      "name": "space.name",
      "type": "record",
      "fields": [
        {
          "name": "num",
          "type": "int"
        }
      ]
    }
    "#;

        match Schema::parse_str(raw_schema).unwrap() {
            Schema::Record { name, .. } => {
                assert_eq!(name.name, "name");
                assert_eq!(name.namespace, Some("space".to_string()));
            }
            _ => panic!("Expected a record schema!"),
        }
    }

    /// When both a dotted `name` and a `namespace` attribute are present, the
    /// namespace embedded in the name wins.
    #[test]
    fn test_namespace_from_name_has_priority_over_from_field() {
        let raw_schema = r#"
    {
      "name": "space1.name",
      "namespace": "space2",
      "type": "record",
      "fields": [
        {
          "name": "num",
          "type": "int"
        }
      ]
    }
    "#;

        match Schema::parse_str(raw_schema).unwrap() {
            Schema::Record { name, .. } => {
                assert_eq!(name.namespace, Some("space1".to_string()));
            }
            _ => panic!("Expected a record schema!"),
        }
    }

    /// With an undotted `name`, the namespace is taken from the `namespace` attribute.
    #[test]
    fn test_namespace_from_field() {
        let raw_schema = r#"
    {
      "name": "name",
      "namespace": "space2",
      "type": "record",
      "fields": [
        {
          "name": "num",
          "type": "int"
        }
      ]
    }
    "#;

        match Schema::parse_str(raw_schema).unwrap() {
            Schema::Record { name, .. } => {
                assert_eq!(name.namespace, Some("space2".to_string()));
            }
            _ => panic!("Expected a record schema!"),
        }
    }

    #[test]
    /// A leading dot (zero-length namespace) yields a name without a namespace.
    fn test_namespace_from_name_with_empty_value() {
        let Name { name, namespace } = Name::new(".name").unwrap();
        assert_eq!(name, "name");
        assert_eq!(namespace, None);
    }

    #[test]
    /// A name consisting of whitespace is rejected with `Error::InvalidSchemaName`.
    fn test_name_with_whitespace_value() {
        let result = Name::new(" ");
        assert!(
            matches!(result, Err(Error::InvalidSchemaName(..))),
            "Expected an Error::InvalidSchemaName!"
        );
    }

    #[test]
    /// A namespace followed by an empty name part is rejected with `Error::InvalidSchemaName`.
    fn test_name_with_no_name_part() {
        let result = Name::new("space.");
        assert!(
            matches!(result, Err(Error::InvalidSchemaName(..))),
            "Expected an Error::InvalidSchemaName!"
        );
    }

    /// AVRO-3448: an inner record declared without a namespace is registered under the
    /// enclosing record's namespace, and an unqualified reference to it resolves.
    #[test]
    fn avro_3448_test_proper_resolution_inner_record_inherited_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"record",
                            "name":"inner_record_name",
                            "fields":[
                                {
                                    "name":"inner_field_1",
                                    "type":"double"
                                }
                            ]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_record_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "space.inner_record_name"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: an inner record declared without a namespace can be referenced by its
    /// fully qualified name using the enclosing record's namespace.
    #[test]
    fn avro_3448_test_proper_resolution_inner_record_qualified_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"record",
                            "name":"inner_record_name",
                            "fields":[
                                {
                                    "name":"inner_field_1",
                                    "type":"double"
                                }
                            ]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "space.inner_record_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "space.inner_record_name"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: an inner enum declared without a namespace is registered under the
    /// enclosing record's namespace, and an unqualified reference to it resolves.
    #[test]
    fn avro_3448_test_proper_resolution_inner_enum_inherited_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"enum",
                            "name":"inner_enum_name",
                            "symbols":["Extensive","Testing"]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_enum_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "space.inner_enum_name"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: an inner enum declared without a namespace can be referenced by its
    /// fully qualified name using the enclosing record's namespace.
    #[test]
    fn avro_3448_test_proper_resolution_inner_enum_qualified_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"enum",
                            "name":"inner_enum_name",
                            "symbols":["Extensive","Testing"]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "space.inner_enum_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "space.inner_enum_name"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: an inner fixed declared without a namespace is registered under the
    /// enclosing record's namespace, and an unqualified reference to it resolves.
    #[test]
    fn avro_3448_test_proper_resolution_inner_fixed_inherited_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"fixed",
                            "name":"inner_fixed_name",
                            "size": 16
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_fixed_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "space.inner_fixed_name"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: an inner fixed declared without a namespace can be referenced by its
    /// fully qualified name using the enclosing record's namespace.
    #[test]
    fn avro_3448_test_proper_resolution_inner_fixed_qualified_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"fixed",
                            "name":"inner_fixed_name",
                            "size": 16
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "space.inner_fixed_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "space.inner_fixed_name"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: an inner record that declares its own namespace is registered under
    /// that namespace rather than the enclosing record's.
    #[test]
    fn avro_3448_test_proper_resolution_inner_record_inner_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"record",
                            "name":"inner_record_name",
                            "namespace":"inner_space",
                            "fields":[
                                {
                                    "name":"inner_field_1",
                                    "type":"double"
                                }
                            ]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_record_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "inner_space.inner_record_name"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: an inner enum that declares its own namespace is registered under
    /// that namespace rather than the enclosing record's.
    #[test]
    fn avro_3448_test_proper_resolution_inner_enum_inner_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"enum",
                            "name":"inner_enum_name",
                            "namespace": "inner_space",
                            "symbols":["Extensive","Testing"]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_enum_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "inner_space.inner_enum_name"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: an inner fixed that declares its own namespace is registered under
    /// that namespace rather than the enclosing record's.
    #[test]
    fn avro_3448_test_proper_resolution_inner_fixed_inner_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"fixed",
                            "name":"inner_fixed_name",
                            "namespace": "inner_space",
                            "size": 16
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_fixed_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "inner_space.inner_fixed_name"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: with two levels of nesting and no inner namespaces, both nested
    /// records are registered under the outermost record's namespace.
    #[test]
    fn avro_3448_test_proper_multi_level_resolution_inner_record_outer_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"record",
                            "name":"middle_record_name",
                            "fields":[
                                {
                                    "name":"middle_field_1",
                                    "type":[
                                        "null",
                                        {
                                            "type":"record",
                                            "name":"inner_record_name",
                                            "fields":[
                                                {
                                                    "name":"inner_field_1",
                                                    "type":"double"
                                                }
                                            ]
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "space.inner_record_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 3);
        let expected_full_names = [
            "space.record_name",
            "space.middle_record_name",
            "space.inner_record_name",
        ];
        for full_name in &expected_full_names {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: a namespace declared on the middle record applies to the innermost
    /// record as well, which declares no namespace of its own.
    #[test]
    fn avro_3448_test_proper_multi_level_resolution_inner_record_middle_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"record",
                            "name":"middle_record_name",
                            "namespace":"middle_namespace",
                            "fields":[
                                {
                                    "name":"middle_field_1",
                                    "type":[
                                        "null",
                                        {
                                            "type":"record",
                                            "name":"inner_record_name",
                                            "fields":[
                                                {
                                                    "name":"inner_field_1",
                                                    "type":"double"
                                                }
                                            ]
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "middle_namespace.inner_record_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 3);
        let expected_full_names = [
            "space.record_name",
            "middle_namespace.middle_record_name",
            "middle_namespace.inner_record_name",
        ];
        for full_name in &expected_full_names {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: when every nesting level declares its own namespace, each record is
    /// registered under the namespace it declares.
    #[test]
    fn avro_3448_test_proper_multi_level_resolution_inner_record_inner_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type":"record",
                            "name":"middle_record_name",
                            "namespace":"middle_namespace",
                            "fields":[
                                {
                                    "name":"middle_field_1",
                                    "type":[
                                        "null",
                                        {
                                            "type":"record",
                                            "name":"inner_record_name",
                                            "namespace":"inner_namespace",
                                            "fields":[
                                                {
                                                    "name":"inner_field_1",
                                                    "type":"double"
                                                }
                                            ]
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_namespace.inner_record_name"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 3);
        let expected_full_names = [
            "space.record_name",
            "middle_namespace.middle_record_name",
            "inner_namespace.inner_record_name",
        ];
        for full_name in &expected_full_names {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: a record defined as the item type of an array is registered under the
    /// enclosing record's namespace, and an unqualified reference to it resolves.
    #[test]
    fn avro_3448_test_proper_in_array_resolution_inherited_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": {
                  "type":"array",
                  "items":{
                      "type":"record",
                      "name":"in_array_record",
                      "fields": [
                          {
                              "name":"array_record_field",
                              "type":"string"
                          }
                      ]
                  }
              }
            },
            {
                "name":"outer_field_2",
                "type":"in_array_record"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "space.in_array_record"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3448: a record defined as the value type of a map is registered under the
    /// enclosing record's namespace, and an unqualified reference to it resolves.
    #[test]
    fn avro_3448_test_proper_in_map_resolution_inherited_namespace() {
        let raw_schema = r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": {
                  "type":"map",
                  "values":{
                      "type":"record",
                      "name":"in_map_record",
                      "fields": [
                          {
                              "name":"map_record_field",
                              "type":"string"
                          }
                      ]
                  }
              }
            },
            {
                "name":"outer_field_2",
                "type":"in_map_record"
            }
          ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "space.in_map_record"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }
    }

    /// AVRO-3466: a schema with an inner enum in its own namespace survives a round trip
    /// through JSON serialization and re-parsing.
    #[test]
    fn avro_3466_test_to_json_inner_enum_inner_namespace() {
        let raw_schema = r#"
        {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
            "name": "outer_field_1",
            "type": [
                        "null",
                        {
                            "type":"enum",
                            "name":"inner_enum_name",
                            "namespace": "inner_space",
                            "symbols":["Extensive","Testing"]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_enum_name"
            }
        ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        // Exactly the two expected full names must be registered.
        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "inner_space.inner_enum_name"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }

        // Serialize the schema to JSON and parse it again; the result must be equal.
        let schema_json = serde_json::to_string(&schema).expect("test failed");
        let reparsed = Schema::parse_str(&schema_json).expect("test failed");
        assert_eq!(schema, reparsed);
    }

    /// AVRO-3466: a schema with an inner fixed in its own namespace survives a round trip
    /// through JSON serialization and re-parsing.
    #[test]
    fn avro_3466_test_to_json_inner_fixed_inner_namespace() {
        let raw_schema = r#"
        {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
            "name": "outer_field_1",
            "type": [
                        "null",
                        {
                            "type":"fixed",
                            "name":"inner_fixed_name",
                            "namespace": "inner_space",
                            "size":54
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_fixed_name"
            }
        ]
        }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();
        let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
        let names = resolved.get_names();

        // Exactly the two expected full names must be registered.
        assert_eq!(names.len(), 2);
        for full_name in &["space.record_name", "inner_space.inner_fixed_name"] {
            let key = Name::new(full_name).unwrap();
            assert!(names.contains_key(&key));
        }

        // Serialize the schema to JSON and parse it again; the result must be equal.
        let schema_json = serde_json::to_string(&schema).expect("test failed");
        let reparsed = Schema::parse_str(&schema_json).expect("test failed");
        assert_eq!(schema, reparsed);
    }

    fn assert_avro_3512_aliases(aliases: &Aliases) {
        match aliases {
            Some(aliases) => {
                assert_eq!(aliases.len(), 3);
                assert_eq!(aliases[0], Alias::new("space.b").unwrap());
                assert_eq!(aliases[1], Alias::new("x.y").unwrap());
                assert_eq!(aliases[2], Alias::new(".c").unwrap());
            }
            None => {
                panic!("'aliases' must be Some");
            }
        }
    }

    /// AVRO-3512: the aliases of a namespaced record are parsed as checked by
    /// `assert_avro_3512_aliases`.
    #[test]
    fn avro_3512_alias_with_null_namespace_record() {
        let raw_schema = r#"
            {
              "type": "record",
              "name": "a",
              "namespace": "space",
              "aliases": ["b", "x.y", ".c"],
              "fields" : [
                {"name": "time", "type": "long"}
              ]
            }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        match &schema {
            Schema::Record { aliases, .. } => assert_avro_3512_aliases(aliases),
            other => panic!("The Schema should be a record: {:?}", other),
        }
    }

    /// AVRO-3512: the aliases of a namespaced enum are parsed as checked by
    /// `assert_avro_3512_aliases`.
    #[test]
    fn avro_3512_alias_with_null_namespace_enum() {
        let raw_schema = r#"
            {
              "type": "enum",
              "name": "a",
              "namespace": "space",
              "aliases": ["b", "x.y", ".c"],
              "symbols" : [
                "symbol1", "symbol2"
              ]
            }
        "#;
        let schema = Schema::parse_str(raw_schema).unwrap();

        match &schema {
            Schema::Enum { aliases, .. } => assert_avro_3512_aliases(aliases),
            other => panic!("The Schema should be an enum: {:?}", other),
        }
    }

    /// Parses a fixed schema declared in namespace `space` with the aliases
    /// `b`, `x.y` and `.c`, then verifies the parsed aliases through
    /// `assert_avro_3512_aliases`.
    #[test]
    fn avro_3512_alias_with_null_namespace_fixed() {
        let raw_schema = r#"
            {
              "type": "fixed",
              "name": "a",
              "namespace": "space",
              "aliases": ["b", "x.y", ".c"],
              "size" : 12
            }
        "#;
        let parsed = Schema::parse_str(raw_schema).unwrap();

        match &parsed {
            Schema::Fixed { aliases, .. } => assert_avro_3512_aliases(aliases),
            other => panic!("The Schema should be a fixed: {:?}", other),
        }
    }

    /// Serializes a record schema carrying the aliases `b`, `x.y` and `.c` to
    /// JSON, compares the output with the expected text (aliases rendered as
    /// `space.b`, `x.y` and `c`), and checks that parsing that output yields a
    /// schema equal to the original.
    #[test]
    fn avro_3518_serialize_aliases_record() {
        let raw_schema = r#"
            {
              "type": "record",
              "name": "a",
              "namespace": "space",
              "aliases": ["b", "x.y", ".c"],
              "fields" : [
                {"name": "time", "type": "long"}
              ]
            }
        "#;
        let expected_json = r#"{"aliases":["space.b","x.y","c"],"fields":[{"name":"time","type":"long"}],"name":"a","namespace":"space","type":"record"}"#;

        let schema = Schema::parse_str(raw_schema).unwrap();

        // Go through `serde_json::Value` first, exactly as the expected text assumes.
        let json_value = serde_json::to_value(&schema).unwrap();
        let rendered = serde_json::to_string(&json_value).unwrap();
        assert_eq!(expected_json, rendered.as_str());

        // The serialized form must parse back into an equal schema.
        let reparsed = Schema::parse_str(&rendered).unwrap();
        assert_eq!(schema, reparsed);
    }

    /// Serializes an enum schema carrying the aliases `b`, `x.y` and `.c` to
    /// JSON, compares the output with the expected text (aliases rendered as
    /// `space.b`, `x.y` and `c`), and checks that parsing that output yields a
    /// schema equal to the original.
    #[test]
    fn avro_3518_serialize_aliases_enum() {
        let raw_schema = r#"
            {
              "type": "enum",
              "name": "a",
              "namespace": "space",
              "aliases": ["b", "x.y", ".c"],
              "symbols" : [
                "symbol1", "symbol2"
              ]
            }
        "#;
        let expected_json = r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","symbols":["symbol1","symbol2"],"type":"enum"}"#;

        let schema = Schema::parse_str(raw_schema).unwrap();

        // Go through `serde_json::Value` first, exactly as the expected text assumes.
        let json_value = serde_json::to_value(&schema).unwrap();
        let rendered = serde_json::to_string(&json_value).unwrap();
        assert_eq!(expected_json, rendered.as_str());

        // The serialized form must parse back into an equal schema.
        let reparsed = Schema::parse_str(&rendered).unwrap();
        assert_eq!(schema, reparsed);
    }

    /// Serializes a fixed schema carrying the aliases `b`, `x.y` and `.c` to
    /// JSON, compares the output with the expected text (aliases rendered as
    /// `space.b`, `x.y` and `c`), and checks that parsing that output yields a
    /// schema equal to the original.
    #[test]
    fn avro_3518_serialize_aliases_fixed() {
        let raw_schema = r#"
            {
              "type": "fixed",
              "name": "a",
              "namespace": "space",
              "aliases": ["b", "x.y", ".c"],
              "size" : 12
            }
        "#;
        let expected_json = r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","size":12,"type":"fixed"}"#;

        let schema = Schema::parse_str(raw_schema).unwrap();

        // Go through `serde_json::Value` first, exactly as the expected text assumes.
        let json_value = serde_json::to_value(&schema).unwrap();
        let rendered = serde_json::to_string(&json_value).unwrap();
        assert_eq!(expected_json, rendered.as_str());

        // The serialized form must parse back into an equal schema.
        let reparsed = Schema::parse_str(&rendered).unwrap();
        assert_eq!(schema, reparsed);
    }

    /// Parses a record whose single field is a union of `"null"` and an
    /// object wrapping an array-of-long type, then checks the parsed shape:
    /// record `AccountEvent` -> field `NullableLongArray` -> union whose first
    /// member is `Schema::Null` and whose second member is an array of
    /// `Schema::Long`.
    #[test]
    fn avro_3130_parse_anonymous_union_type() {
        let schema_str = r#"
        {
            "type": "record",
            "name": "AccountEvent",
            "fields": [
                {"type":
                  ["null",
                   { "name": "accountList",
                      "type": {
                        "type": "array",
                        "items": "long"
                      }
                  }
                  ],
                 "name":"NullableLongArray"
               }
            ]
        }
        "#;

        // Each step below unwraps one layer of the schema and panics with the
        // layer it expected if the shape does not match.
        let (record_name, record_fields) = match Schema::parse_str(schema_str).unwrap() {
            Schema::Record { name, fields, .. } => (name, fields),
            _ => panic!("Expected Schema::Record"),
        };
        assert_eq!(record_name, Name::new("AccountEvent").unwrap());

        let first_field = &record_fields[0];
        assert_eq!(&first_field.name, "NullableLongArray");

        let variants = match first_field.schema {
            Schema::Union(ref union_schema) => &union_schema.schemas,
            _ => panic!("Expected Schema::Union"),
        };
        assert_eq!(variants[0], Schema::Null);

        let item_schema = match variants[1] {
            Schema::Array(ref items) => items,
            _ => panic!("Expected Schema::Array"),
        };
        match **item_schema {
            Schema::Long => {}
            _ => panic!("Expected a Schema::Array of type Long"),
        }
    }
}