apache-avro 0.16.0

A library for working with Apache Avro in Rust
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Logic for parsing and interacting with schemas in Avro format.
use crate::{error::Error, types, util::MapHelper, AvroResult};
use digest::Digest;
use lazy_static::lazy_static;
use regex_lite::Regex;
use serde::{
    ser::{SerializeMap, SerializeSeq},
    Deserialize, Serialize, Serializer,
};
use serde_json::{Map, Value};
use std::{
    borrow::{Borrow, Cow},
    collections::{BTreeMap, HashMap, HashSet},
    convert::{TryFrom, TryInto},
    fmt,
    fmt::Debug,
    hash::Hash,
    io::Read,
    str::FromStr,
};
use strum_macros::{EnumDiscriminants, EnumString};

// Regular expressions shared by the schema parsing code below. Each one is compiled
// lazily on first use; the `unwrap()` calls only fail if a literal pattern is malformed.
lazy_static! {
    // Pattern for enum symbol names: a letter or underscore followed by letters, digits
    // or underscores. (Its use site is outside this part of the file.)
    static ref ENUM_SYMBOL_NAME_R: Regex = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").unwrap();

    // An optional namespace (with optional dots) followed by a name without any dots in it.
    // Exposes the named capture groups `namespace` and `name`, which
    // `Name::get_name_and_namespace` reads.
    static ref SCHEMA_NAME_R: Regex =
        Regex::new(r"^((?P<namespace>([A-Za-z_][A-Za-z0-9_\.]*)*)\.)?(?P<name>[A-Za-z_][A-Za-z0-9_]*)$").unwrap();

    // Pattern checked against record field names in `RecordField::parse`.
    static ref FIELD_NAME_R: Regex = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").unwrap();

    // Pattern checked against the resolved namespace in `Name::parse`: dot-separated
    // identifiers, or the empty string.
    static ref NAMESPACE_R: Regex = Regex::new(r"^([A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*)?$").unwrap();
}

/// Represents an Avro schema fingerprint.
///
/// More information about Avro schema fingerprints can be found in the
/// [Avro Schema Fingerprint documentation](https://avro.apache.org/docs/current/spec.html#schema_fingerprints)
pub struct SchemaFingerprint {
    /// The raw bytes of the fingerprint.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Renders the fingerprint as lowercase hexadecimal, two digits per byte,
    /// with no separators.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Two hex digits per byte, so the final length is known up front.
        let mut hex = String::with_capacity(self.bytes.len() * 2);
        for byte in &self.bytes {
            hex.push_str(&format!("{byte:02x}"));
        }
        write!(f, "{}", hex)
    }
}

/// Represents any valid Avro schema.
///
/// More information about Avro schemas can be found in the
/// [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas)
///
/// The `EnumDiscriminants` derive also generates the companion `SchemaKind` enum,
/// which has one data-less variant per `Schema` variant.
#[derive(Clone, Debug, EnumDiscriminants)]
#[strum_discriminants(name(SchemaKind), derive(Hash, Ord, PartialOrd))]
pub enum Schema {
    /// A `null` Avro schema.
    Null,
    /// A `boolean` Avro schema.
    Boolean,
    /// An `int` Avro schema.
    Int,
    /// A `long` Avro schema.
    Long,
    /// A `float` Avro schema.
    Float,
    /// A `double` Avro schema.
    Double,
    /// A `bytes` Avro schema.
    /// `Bytes` represents a sequence of 8-bit unsigned bytes.
    Bytes,
    /// A `string` Avro schema.
    /// `String` represents a unicode character sequence.
    String,
    /// An `array` Avro schema. Avro arrays are required to have the same type for each element.
    /// This variant holds the `Schema` for the array element type.
    Array(Box<Schema>),
    /// A `map` Avro schema.
    /// `Map` holds a pointer to the `Schema` of its values, which must all be the same schema.
    /// `Map` keys are assumed to be `string`.
    Map(Box<Schema>),
    /// A `union` Avro schema.
    Union(UnionSchema),
    /// A `record` Avro schema.
    Record(RecordSchema),
    /// An `enum` Avro schema.
    Enum(EnumSchema),
    /// A `fixed` Avro schema.
    Fixed(FixedSchema),
    /// Logical type which represents `Decimal` values. The underlying type is serialized and
    /// deserialized as `Schema::Bytes` or `Schema::Fixed`.
    Decimal(DecimalSchema),
    /// A universally unique identifier, annotating a string.
    Uuid,
    /// Logical type which represents the number of days since the unix epoch.
    /// Serialization format is `Schema::Int`.
    Date,
    /// The time of day in number of milliseconds after midnight with no reference to any
    /// calendar, time zone or date in particular.
    TimeMillis,
    /// The time of day in number of microseconds after midnight with no reference to any
    /// calendar, time zone or date in particular.
    TimeMicros,
    /// An instant in time represented as the number of milliseconds after the UNIX epoch.
    TimestampMillis,
    /// An instant in time represented as the number of microseconds after the UNIX epoch.
    TimestampMicros,
    /// An instant in local time represented as the number of milliseconds after the UNIX epoch.
    LocalTimestampMillis,
    /// An instant in local time represented as the number of microseconds after the UNIX epoch.
    LocalTimestampMicros,
    /// An amount of time defined by a number of months, days and milliseconds.
    Duration,
    /// A reference to another schema, identified by its `Name`.
    Ref { name: Name },
}

impl PartialEq for Schema {
    /// Two schemas are considered equal when their [Parsing Canonical Form]
    /// strings are identical.
    ///
    /// [Parsing Canonical Form]:
    /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.canonical_form();
        let rhs = other.canonical_form();
        lhs == rhs
    }
}

impl SchemaKind {
    pub fn is_primitive(self) -> bool {
        matches!(
            self,
            SchemaKind::Null
                | SchemaKind::Boolean
                | SchemaKind::Int
                | SchemaKind::Long
                | SchemaKind::Double
                | SchemaKind::Float
                | SchemaKind::Bytes
                | SchemaKind::String,
        )
    }

    pub fn is_named(self) -> bool {
        matches!(
            self,
            SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed | SchemaKind::Ref
        )
    }
}

impl From<&types::Value> for SchemaKind {
    fn from(value: &types::Value) -> Self {
        use crate::types::Value;
        match value {
            Value::Null => Self::Null,
            Value::Boolean(_) => Self::Boolean,
            Value::Int(_) => Self::Int,
            Value::Long(_) => Self::Long,
            Value::Float(_) => Self::Float,
            Value::Double(_) => Self::Double,
            Value::Bytes(_) => Self::Bytes,
            Value::String(_) => Self::String,
            Value::Array(_) => Self::Array,
            Value::Map(_) => Self::Map,
            Value::Union(_, _) => Self::Union,
            Value::Record(_) => Self::Record,
            Value::Enum(_, _) => Self::Enum,
            Value::Fixed(_, _) => Self::Fixed,
            Value::Decimal { .. } => Self::Decimal,
            Value::Uuid(_) => Self::Uuid,
            Value::Date(_) => Self::Date,
            Value::TimeMillis(_) => Self::TimeMillis,
            Value::TimeMicros(_) => Self::TimeMicros,
            Value::TimestampMillis(_) => Self::TimestampMillis,
            Value::TimestampMicros(_) => Self::TimestampMicros,
            Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis,
            Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros,
            Value::Duration { .. } => Self::Duration,
        }
    }
}

/// Represents names for `record`, `enum` and `fixed` Avro schemas.
///
/// Each of these `Schema`s has a `fullname` composed of two parts:
///   * a name
///   * a namespace
///
/// `aliases` can also be defined, to facilitate schema evolution.
///
/// More information about schema names can be found in the
/// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Name {
    /// The name part, without the namespace.
    pub name: String,
    /// The optional namespace; the constructors in this file store `None`
    /// rather than an empty string.
    pub namespace: Namespace,
}

/// Represents documentation for complex Avro schemas.
pub type Documentation = Option<String>;
/// Represents the aliases for a named schema.
pub type Aliases = Option<Vec<Alias>>;
/// Represents schema lookup by `Name` within a schema env (owned schemas).
pub(crate) type Names = HashMap<Name, Schema>;
/// Represents schema lookup by `Name` within a schema (borrowed schemas).
pub type NamesRef<'a> = HashMap<Name, &'a Schema>;
/// Represents the namespace for a named schema.
pub type Namespace = Option<String>;

impl Name {
    /// Create a new `Name`.
    /// Parses the optional `namespace` from the `name` string.
    /// `aliases` will not be defined.
    pub fn new(name: &str) -> AvroResult<Self> {
        let (name, namespace) = Name::get_name_and_namespace(name)?;
        Ok(Self {
            name,
            namespace: namespace.filter(|ns| !ns.is_empty()),
        })
    }

    fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
        let caps = SCHEMA_NAME_R
            .captures(name)
            .ok_or_else(|| Error::InvalidSchemaName(name.to_string(), SCHEMA_NAME_R.as_str()))?;
        Ok((
            caps["name"].to_string(),
            caps.name("namespace").map(|s| s.as_str().to_string()),
        ))
    }

    /// Parse a `serde_json::Value` into a `Name`.
    pub(crate) fn parse(
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Self> {
        let (name, namespace_from_name) = complex
            .name()
            .map(|name| Name::get_name_and_namespace(name.as_str()).unwrap())
            .ok_or(Error::GetNameField)?;
        // FIXME Reading name from the type is wrong ! The name there is just a metadata (AVRO-3430)
        let type_name = match complex.get("type") {
            Some(Value::Object(complex_type)) => complex_type.name().or(None),
            _ => None,
        };

        let namespace = namespace_from_name
            .or_else(|| {
                complex
                    .string("namespace")
                    .or_else(|| enclosing_namespace.clone())
            })
            .filter(|ns| !ns.is_empty());

        if let Some(ref ns) = namespace {
            if !NAMESPACE_R.is_match(ns) {
                return Err(Error::InvalidNamespace(
                    ns.to_string(),
                    NAMESPACE_R.as_str(),
                ));
            }
        }

        Ok(Self {
            name: type_name.unwrap_or(name),
            namespace,
        })
    }

    /// Return the `fullname` of this `Name`
    ///
    /// More information about fullnames can be found in the
    /// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
    pub fn fullname(&self, default_namespace: Namespace) -> String {
        if self.name.contains('.') {
            self.name.clone()
        } else {
            let namespace = self.namespace.clone().or(default_namespace);

            match namespace {
                Some(ref namespace) if !namespace.is_empty() => {
                    format!("{}.{}", namespace, self.name)
                }
                _ => self.name.clone(),
            }
        }
    }

    /// Return the fully qualified name needed for indexing or searching for the schema within a schema/schema env context. Puts the enclosing namespace into the name's namespace for clarity in schema/schema env parsing
    /// ```ignore
    /// use apache_avro::schema::Name;
    ///
    /// assert_eq!(
    /// Name::new("some_name")?.fully_qualified_name(&Some("some_namespace".into())),
    /// Name::new("some_namespace.some_name")?
    /// );
    /// assert_eq!(
    /// Name::new("some_namespace.some_name")?.fully_qualified_name(&Some("other_namespace".into())),
    /// Name::new("some_namespace.some_name")?
    /// );
    /// ```
    pub fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name {
        Name {
            name: self.name.clone(),
            namespace: self
                .namespace
                .clone()
                .or_else(|| enclosing_namespace.clone().filter(|ns| !ns.is_empty())),
        }
    }
}

impl From<&str> for Name {
    fn from(name: &str) -> Self {
        Name::new(name).unwrap()
    }
}

impl fmt::Display for Name {
    /// Writes the fullname of this `Name`, computed without a default namespace.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let fullname = self.fullname(None);
        f.write_str(fullname.as_str())
    }
}

impl<'de> Deserialize<'de> for Name {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        Value::deserialize(deserializer).and_then(|value| {
            use serde::de::Error;
            if let Value::Object(json) = value {
                Name::parse(&json, &None).map_err(Error::custom)
            } else {
                Err(Error::custom(format!("Expected a JSON object: {value:?}")))
            }
        })
    }
}

/// Newtype pattern for `Name` to better control the `serde_json::Value` representation.
/// Aliases are serialized as an array of plain strings in the JSON representation.
///
/// The wrapped `Name` is private; use the accessor methods on `Alias` to read it.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Alias(Name);

impl Alias {
    pub fn new(name: &str) -> AvroResult<Self> {
        Name::new(name).map(Self)
    }

    pub fn name(&self) -> String {
        self.0.name.clone()
    }

    pub fn namespace(&self) -> Namespace {
        self.0.namespace.clone()
    }

    pub fn fullname(&self, default_namespace: Namespace) -> String {
        self.0.fullname(default_namespace)
    }

    pub fn fully_qualified_name(&self, default_namespace: &Namespace) -> Name {
        self.0.fully_qualified_name(default_namespace)
    }
}

impl From<&str> for Alias {
    fn from(name: &str) -> Self {
        Alias::new(name).unwrap()
    }
}

impl Serialize for Alias {
    /// Serializes the alias as a plain string holding its fullname.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let fullname = self.fullname(None);
        serializer.serialize_str(fullname.as_str())
    }
}

/// A set of borrowed schemas together with a lookup table of the named schemas
/// (records, enums and fixed) found while walking them.
#[derive(Debug)]
pub struct ResolvedSchema<'s> {
    /// Fully qualified name -> named schema defined in `schemata`.
    names_ref: NamesRef<'s>,
    /// The root schemas this instance was built from.
    schemata: Vec<&'s Schema>,
}

impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
    type Error = Error;

    /// Resolves a single schema with no enclosing namespace and no externally
    /// known schemata.
    fn try_from(schema: &'s Schema) -> AvroResult<Self> {
        let mut resolved = ResolvedSchema {
            names_ref: HashMap::new(),
            schemata: vec![schema],
        };
        let roots = resolved.get_schemata();
        resolved.resolve(roots, &None, None)?;
        Ok(resolved)
    }
}

impl<'s> TryFrom<Vec<&'s Schema>> for ResolvedSchema<'s> {
    type Error = Error;

    /// Resolves several schemas together with no enclosing namespace and no
    /// externally known schemata.
    fn try_from(schemata: Vec<&'s Schema>) -> AvroResult<Self> {
        let mut resolved = ResolvedSchema {
            names_ref: HashMap::new(),
            schemata,
        };
        let roots = resolved.get_schemata();
        resolved.resolve(roots, &None, None)?;
        Ok(resolved)
    }
}

impl<'s> ResolvedSchema<'s> {
    /// Returns a copy of the list of root schema references.
    pub fn get_schemata(&self) -> Vec<&'s Schema> {
        self.schemata.clone()
    }

    /// Returns the table of named schemas collected during resolution.
    pub fn get_names(&self) -> &NamesRef<'s> {
        &self.names_ref
    }

    /// Creates `ResolvedSchema` with some already known schemas.
    ///
    /// Those schemata would be used to resolve references if needed.
    ///
    /// Returns an error if a name is defined more than once or a reference
    /// cannot be resolved.
    pub fn new_with_known_schemata<'n>(
        schemata_to_resolve: Vec<&'s Schema>,
        enclosing_namespace: &Namespace,
        known_schemata: &'n NamesRef<'n>,
    ) -> AvroResult<Self> {
        let names = HashMap::new();
        let mut rs = ResolvedSchema {
            names_ref: names,
            schemata: schemata_to_resolve,
        };
        rs.resolve(rs.get_schemata(), enclosing_namespace, Some(known_schemata))?;
        Ok(rs)
    }

    /// Walks `schemata` recursively, registering every record, enum and fixed
    /// schema in `names_ref` under its fully qualified name and checking that
    /// every `Schema::Ref` points to a name that is already registered or is
    /// present in `known_schemata`.
    ///
    /// Returns `Error::AmbiguousSchemaDefinition` when a name is registered
    /// twice and `Error::SchemaResolutionError` for an unresolved reference.
    fn resolve<'n>(
        &mut self,
        schemata: Vec<&'s Schema>,
        enclosing_namespace: &Namespace,
        known_schemata: Option<&'n NamesRef<'n>>,
    ) -> AvroResult<()> {
        for schema in schemata {
            match schema {
                // Containers: descend into the element/value schema.
                Schema::Array(schema) | Schema::Map(schema) => {
                    self.resolve(vec![schema], enclosing_namespace, known_schemata)?
                }
                Schema::Union(UnionSchema { schemas, .. }) => {
                    for schema in schemas {
                        self.resolve(vec![schema], enclosing_namespace, known_schemata)?
                    }
                }
                // Named leaf types: register them, rejecting duplicates.
                Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
                    let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                    if self
                        .names_ref
                        .insert(fully_qualified_name.clone(), schema)
                        .is_some()
                    {
                        return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
                    }
                }
                // Records are registered before their fields are visited, so a
                // field may refer back to the enclosing record.
                Schema::Record(RecordSchema { name, fields, .. }) => {
                    let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                    if self
                        .names_ref
                        .insert(fully_qualified_name.clone(), schema)
                        .is_some()
                    {
                        return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
                    } else {
                        // Fields are resolved within the record's namespace.
                        let record_namespace = fully_qualified_name.namespace;
                        for field in fields {
                            self.resolve(vec![&field.schema], &record_namespace, known_schemata)?
                        }
                    }
                }
                Schema::Ref { name } => {
                    let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                    // first search for reference in current schemata, then look into external references.
                    if !self.names_ref.contains_key(&fully_qualified_name) {
                        let is_resolved_with_known_schemas = known_schemata
                            .as_ref()
                            .map(|names| names.contains_key(&fully_qualified_name))
                            .unwrap_or(false);
                        if !is_resolved_with_known_schemas {
                            return Err(Error::SchemaResolutionError(fully_qualified_name));
                        }
                    }
                }
                // All remaining variants are not handled here.
                _ => (),
            }
        }
        Ok(())
    }
}

/// Owned counterpart of `ResolvedSchema`: holds a root schema and an owned
/// lookup table of the named schemas found inside it.
pub(crate) struct ResolvedOwnedSchema {
    /// Fully qualified name -> clone of the named schema.
    names: Names,
    /// The schema this instance was built from.
    root_schema: Schema,
}

impl TryFrom<Schema> for ResolvedOwnedSchema {
    type Error = Error;

    fn try_from(schema: Schema) -> AvroResult<Self> {
        let names = HashMap::new();
        let mut rs = ResolvedOwnedSchema {
            names,
            root_schema: schema,
        };
        Self::from_internal(&rs.root_schema, &mut rs.names, &None)?;
        Ok(rs)
    }
}

impl ResolvedOwnedSchema {
    pub(crate) fn get_root_schema(&self) -> &Schema {
        &self.root_schema
    }
    pub(crate) fn get_names(&self) -> &Names {
        &self.names
    }

    fn from_internal(
        schema: &Schema,
        names: &mut Names,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<()> {
        match schema {
            Schema::Array(schema) | Schema::Map(schema) => {
                Self::from_internal(schema, names, enclosing_namespace)
            }
            Schema::Union(UnionSchema { schemas, .. }) => {
                for schema in schemas {
                    Self::from_internal(schema, names, enclosing_namespace)?
                }
                Ok(())
            }
            Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                if names
                    .insert(fully_qualified_name.clone(), schema.clone())
                    .is_some()
                {
                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
                } else {
                    Ok(())
                }
            }
            Schema::Record(RecordSchema { name, fields, .. }) => {
                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                if names
                    .insert(fully_qualified_name.clone(), schema.clone())
                    .is_some()
                {
                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
                } else {
                    let record_namespace = fully_qualified_name.namespace;
                    for field in fields {
                        Self::from_internal(&field.schema, names, &record_namespace)?
                    }
                    Ok(())
                }
            }
            Schema::Ref { name } => {
                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
                names
                    .get(&fully_qualified_name)
                    .map(|_| ())
                    .ok_or(Error::SchemaResolutionError(fully_qualified_name))
            }
            _ => Ok(()),
        }
    }
}

/// Represents a `field` in a `record` Avro schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Name of the field.
    pub name: String,
    /// Documentation of the field.
    pub doc: Documentation,
    /// Aliases of the field's name. They have no namespace.
    pub aliases: Option<Vec<String>>,
    /// Default value of the field.
    /// This value will be used when reading Avro datum if schema resolution
    /// is enabled.
    pub default: Option<Value>,
    /// Schema of the field.
    pub schema: Schema,
    /// Order of the field.
    ///
    /// **NOTE** This currently has no effect.
    pub order: RecordFieldOrder,
    /// Position of the field in the list of `fields` of its parent `Schema`.
    pub position: usize,
    /// A collection of all unknown attributes of the record field.
    pub custom_attributes: BTreeMap<String, Value>,
}

/// Represents any valid order for a `field` in a `record` Avro schema.
///
/// `FromStr` is derived through strum's `EnumString` with
/// `serialize_all = "kebab_case"`.
#[derive(Clone, Debug, Eq, PartialEq, EnumString)]
#[strum(serialize_all = "kebab_case")]
pub enum RecordFieldOrder {
    /// Used by `RecordField::parse` when `order` is absent or not recognized.
    Ascending,
    Descending,
    Ignore,
}

impl RecordField {
    /// Parse a `serde_json::Value` into a `RecordField`.
    fn parse(
        field: &Map<String, Value>,
        position: usize,
        parser: &mut Parser,
        enclosing_record: &Name,
    ) -> AvroResult<Self> {
        let name = field.name().ok_or(Error::GetNameFieldFromRecord)?;

        if !FIELD_NAME_R.is_match(&name) {
            return Err(Error::FieldName(name));
        }

        // TODO: "type" = "<record name>"
        let schema = parser.parse_complex(field, &enclosing_record.namespace)?;

        let default = field.get("default").cloned();
        Self::resolve_default_value(
            &schema,
            &name,
            &enclosing_record.fullname(None),
            &parser.parsed_schemas,
            &default,
        )?;

        let aliases = field.get("aliases").and_then(|aliases| {
            aliases.as_array().map(|aliases| {
                aliases
                    .iter()
                    .flat_map(|alias| alias.as_str())
                    .map(|alias| alias.to_string())
                    .collect::<Vec<String>>()
            })
        });

        let order = field
            .get("order")
            .and_then(|order| order.as_str())
            .and_then(|order| RecordFieldOrder::from_str(order).ok())
            .unwrap_or(RecordFieldOrder::Ascending);

        Ok(RecordField {
            name,
            doc: field.doc(),
            default,
            aliases,
            schema,
            order,
            position,
            custom_attributes: RecordField::get_field_custom_attributes(field),
        })
    }

    fn resolve_default_value(
        field_schema: &Schema,
        field_name: &str,
        record_name: &str,
        names: &Names,
        default: &Option<Value>,
    ) -> AvroResult<()> {
        if let Some(value) = default {
            let avro_value = types::Value::from(value.clone());
            match field_schema {
                Schema::Union(union_schema) => {
                    let schemas = &union_schema.schemas;
                    let resolved = schemas.iter().any(|schema| {
                        avro_value
                            .to_owned()
                            .resolve_internal(schema, names, &schema.namespace(), &None)
                            .is_ok()
                    });

                    if !resolved {
                        let schema: Option<&Schema> = schemas.get(0);
                        return match schema {
                            Some(first_schema) => Err(Error::GetDefaultUnion(
                                SchemaKind::from(first_schema),
                                types::ValueKind::from(avro_value),
                            )),
                            None => Err(Error::EmptyUnion),
                        };
                    }
                }
                _ => {
                    let resolved = avro_value
                        .resolve_internal(field_schema, names, &field_schema.namespace(), &None)
                        .is_ok();

                    if !resolved {
                        return Err(Error::GetDefaultRecordField(
                            field_name.to_string(),
                            record_name.to_string(),
                            field_schema.canonical_form(),
                        ));
                    }
                }
            };
        }

        Ok(())
    }

    fn get_field_custom_attributes(field: &Map<String, Value>) -> BTreeMap<String, Value> {
        let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
        for (key, value) in field {
            match key.as_str() {
                "type" | "name" | "doc" | "default" | "order" | "position" => continue,
                _ => custom_attributes.insert(key.clone(), value.clone()),
            };
            custom_attributes.insert(key.to_string(), value.clone());
        }
        custom_attributes
    }

    /// Returns true if this `RecordField` is nullable, meaning the schema is a `UnionSchema` where the first variant is `Null`.
    pub fn is_nullable(&self) -> bool {
        match self.schema {
            Schema::Union(ref inner) => inner.is_nullable(),
            _ => false,
        }
    }
}

/// A description of a Record schema.
#[derive(Debug, Clone)]
pub struct RecordSchema {
    /// The name of the schema
    pub name: Name,
    /// The aliases of the schema
    pub aliases: Aliases,
    /// The documentation of the schema
    pub doc: Documentation,
    /// The set of fields of the schema
    pub fields: Vec<RecordField>,
    /// The `lookup` table maps field names to their position in the `Vec`
    /// of `fields`.
    pub lookup: BTreeMap<String, usize>,
    /// The custom attributes of the schema
    pub attributes: BTreeMap<String, Value>,
}

/// A description of an Enum schema.
#[derive(Debug, Clone)]
pub struct EnumSchema {
    /// The name of the schema
    pub name: Name,
    /// The aliases of the schema
    pub aliases: Aliases,
    /// The documentation of the schema
    pub doc: Documentation,
    /// The ordered list of symbols of the schema
    pub symbols: Vec<String>,
    /// An optional default symbol used for compatibility
    pub default: Option<String>,
    /// The custom attributes of the schema
    pub attributes: BTreeMap<String, Value>,
}

/// A description of a Fixed schema.
#[derive(Debug, Clone)]
pub struct FixedSchema {
    /// The name of the schema
    pub name: Name,
    /// The aliases of the schema
    pub aliases: Aliases,
    /// The documentation of the schema
    pub doc: Documentation,
    /// The size of the fixed schema
    pub size: usize,
    /// The custom attributes of the schema
    pub attributes: BTreeMap<String, Value>,
}

/// A description of a Decimal schema.
///
/// `scale` defaults to 0 and is an integer greater than or equal to 0 and `precision` is an
/// integer greater than 0.
#[derive(Debug, Clone)]
pub struct DecimalSchema {
    /// The number of digits in the unscaled value
    pub precision: DecimalMetadata,
    /// The number of digits to the right of the decimal point
    pub scale: DecimalMetadata,
    /// The inner schema of the decimal (fixed or bytes)
    pub inner: Box<Schema>,
}

/// A description of a Union schema.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    /// The schemas that make up this union
    pub(crate) schemas: Vec<Schema>,
    // Used to ensure uniqueness of schema inputs, and provide constant time finding of the
    // schema index given a value.
    // **NOTE** that this approach does not work for named types, and will have to be modified
    // to support that. A simple solution is to also keep a mapping of the names used.
    // Maps the kind of each unnamed variant to its index in `schemas`.
    variant_index: BTreeMap<SchemaKind, usize>,
}

impl UnionSchema {
    /// Creates a new UnionSchema from a vector of schemas.
    ///
    /// # Errors
    ///
    /// * `Error::GetNestedUnion` if one of the variants is itself a union.
    /// * `Error::GetUnionDuplicate` if two unnamed variants have the same kind.
    pub fn new(schemas: Vec<Schema>) -> AvroResult<Self> {
        let mut vindex = BTreeMap::new();
        for (i, schema) in schemas.iter().enumerate() {
            if let Schema::Union(_) = schema {
                return Err(Error::GetNestedUnion);
            }
            let kind = SchemaKind::from(schema);
            // Named kinds may repeat, so they are not tracked in the index.
            if !kind.is_named() && vindex.insert(kind, i).is_some() {
                return Err(Error::GetUnionDuplicate);
            }
        }
        Ok(UnionSchema {
            schemas,
            variant_index: vindex,
        })
    }

    /// Returns a slice to all variants of this schema.
    pub fn variants(&self) -> &[Schema] {
        &self.schemas
    }

    /// Returns true if any of the variants of this `UnionSchema` is `Null`.
    pub fn is_nullable(&self) -> bool {
        // Match on the variant directly: `==` on `Schema` compares canonical
        // forms, which serializes both sides to JSON for every variant.
        self.schemas.iter().any(|s| matches!(s, Schema::Null))
    }

    /// Optionally returns a reference to the schema matched by this value, as well as its position
    /// within this union.
    #[deprecated(
        since = "0.15.0",
        note = "Please use `find_schema_with_known_schemata` instead"
    )]
    pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> {
        self.find_schema_with_known_schemata::<Schema>(value, None, &None)
    }

    /// Optionally returns a reference to the schema matched by this value, as well as its position
    /// within this union.
    ///
    /// Extra arguments:
    /// - `known_schemata` - mapping between `Name` and `Schema` - if passed, additional external schemas would be used to resolve references.
    /// - `enclosing_namespace` - namespace used to qualify names while resolving the variants.
    ///
    /// # Panics
    ///
    /// On the slow path, panics if one of the inspected variants cannot be
    /// resolved by `ResolvedSchema::new_with_known_schemata`.
    pub fn find_schema_with_known_schemata<S: Borrow<Schema> + Debug>(
        &self,
        value: &types::Value,
        known_schemata: Option<&HashMap<Name, S>>,
        enclosing_namespace: &Namespace,
    ) -> Option<(usize, &Schema)> {
        let schema_kind = SchemaKind::from(value);
        if let Some(&i) = self.variant_index.get(&schema_kind) {
            // fast path
            Some((i, &self.schemas[i]))
        } else {
            // slow path (required for matching logical or named types)

            // first collect what schemas we already know
            let mut collected_names: HashMap<Name, &Schema> = known_schemata
                .map(|names| {
                    names
                        .iter()
                        .map(|(name, schema)| (name.clone(), schema.borrow()))
                        .collect()
                })
                .unwrap_or_default();

            // Return the first variant the value resolves against.
            self.schemas.iter().enumerate().find(|(_, schema)| {
                let resolved_schema = ResolvedSchema::new_with_known_schemata(
                    vec![*schema],
                    enclosing_namespace,
                    &collected_names,
                )
                .expect("Schema didn't successfully parse");
                let resolved_names = resolved_schema.names_ref;

                // extend known schemas with just resolved names
                collected_names.extend(resolved_names);
                let namespace = &schema.namespace().or_else(|| enclosing_namespace.clone());

                value
                    .clone()
                    .resolve_internal(schema, &collected_names, namespace, &None)
                    .is_ok()
            })
        }
    }
}

// No need to compare variant_index, it is derivative of schemas.
impl PartialEq for UnionSchema {
    fn eq(&self, other: &UnionSchema) -> bool {
        self.schemas.eq(&other.schemas)
    }
}

/// Integer type shared by the `precision` and `scale` attributes of a decimal schema.
type DecimalMetadata = usize;
/// The `precision` attribute of a decimal schema.
pub(crate) type Precision = DecimalMetadata;
/// The `scale` attribute of a decimal schema.
pub(crate) type Scale = DecimalMetadata;

fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> {
    Ok(if value.is_u64() {
        let num = value
            .as_u64()
            .ok_or_else(|| Error::GetU64FromJson(value.clone()))?;
        num.try_into()
            .map_err(|e| Error::ConvertU64ToUsize(e, num))?
    } else if value.is_i64() {
        let num = value
            .as_i64()
            .ok_or_else(|| Error::GetI64FromJson(value.clone()))?;
        num.try_into()
            .map_err(|e| Error::ConvertI64ToUsize(e, num))?
    } else {
        return Err(Error::GetPrecisionOrScaleFromJson(value.clone()));
    })
}

/// Mutable state carried through the parsing of a single schema or of a list of
/// inter-dependent schemas.
#[derive(Default)]
struct Parser {
    /// JSON definitions of the input schemas that have not been parsed yet, keyed by name.
    /// Entries are removed from this map when they get parsed.
    input_schemas: HashMap<Name, Value>,
    /// A map of name -> Schema::Ref
    /// Used to resolve cyclic references, i.e. when a
    /// field's type is a reference to its record's type
    resolving_schemas: Names,
    /// Names of the input schemas in the order they were supplied; used to hand the parsed
    /// schemas back in that same order.
    input_order: Vec<Name>,
    /// A map of name -> fully parsed Schema
    /// Used to avoid parsing the same schema twice
    parsed_schemas: Names,
}

impl Schema {
    /// Renders `self` in its [Parsing Canonical Form].
    ///
    /// The schema is serialized to a JSON value first, which is then canonicalized.
    ///
    /// # Panics
    ///
    /// Panics if the schema cannot be serialized to a JSON value.
    ///
    /// [Parsing Canonical Form]:
    /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
    pub fn canonical_form(&self) -> String {
        match serde_json::to_value(self) {
            Ok(json) => parsing_canonical_form(&json),
            Err(e) => panic!("Cannot parse Schema from JSON: {e}"),
        }
    }

    /// Computes the [fingerprint] of the schema's [Parsing Canonical Form] using the digest
    /// algorithm `D`.
    ///
    /// [Parsing Canonical Form]:
    /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
    /// [fingerprint]:
    /// https://avro.apache.org/docs/current/spec.html#schema_fingerprints
    pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
        let mut hasher = D::new();
        hasher.update(self.canonical_form());
        let bytes = hasher.finalize().to_vec();
        SchemaFingerprint { bytes }
    }

    /// Parses a `Schema` out of a string holding a JSON Avro schema, using a fresh parser.
    pub fn parse_str(input: &str) -> Result<Schema, Error> {
        Parser::default().parse_str(input)
    }

    /// Create a array of `Schema`'s from a list of named JSON Avro schemas (Record, Enum, and
    /// Fixed).
    ///
    /// It is allowed that the schemas have cross-dependencies; these will be resolved
    /// during parsing.
    ///
    /// If two of the input schemas have the same fullname, an Error will be returned.
    pub fn parse_list(input: &[&str]) -> AvroResult<Vec<Schema>> {
        let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(input.len());
        let mut input_order: Vec<Name> = Vec::with_capacity(input.len());
        for js in input {
            let schema: Value = serde_json::from_str(js).map_err(Error::ParseSchemaJson)?;
            if let Value::Object(inner) = &schema {
                let name = Name::parse(inner, &None)?;
                let previous_value = input_schemas.insert(name.clone(), schema);
                if previous_value.is_some() {
                    return Err(Error::NameCollision(name.fullname(None)));
                }
                input_order.push(name);
            } else {
                return Err(Error::GetNameField);
            }
        }
        let mut parser = Parser {
            input_schemas,
            resolving_schemas: HashMap::default(),
            input_order,
            parsed_schemas: HashMap::with_capacity(input.len()),
        };
        parser.parse_list()
    }

    /// Create a `Schema` from a reader which implements [`Read`].
    pub fn parse_reader(reader: &mut (impl Read + ?Sized)) -> AvroResult<Schema> {
        let mut buf = String::new();
        match reader.read_to_string(&mut buf) {
            Ok(_) => Self::parse_str(&buf),
            Err(e) => Err(Error::ReadSchemaFromReader(e)),
        }
    }

    /// Parses an Avro schema from an already decoded JSON value, using a fresh parser and no
    /// enclosing namespace.
    pub fn parse(value: &Value) -> AvroResult<Schema> {
        Parser::default().parse(value, &None)
    }

    /// Parses an Avro schema from JSON, with `names` pre-loaded as the already parsed schemas.
    /// Any `Schema::Ref`s must be known in the `names` map.
    pub(crate) fn parse_with_names(value: &Value, names: Names) -> AvroResult<Schema> {
        Parser {
            parsed_schemas: names,
            input_schemas: HashMap::with_capacity(1),
            input_order: Vec::with_capacity(1),
            resolving_schemas: Names::default(),
        }
        .parse(value, &None)
    }

    /// Returns the custom attributes (metadata) of a record, enum or fixed schema, and `None`
    /// for every other kind of schema.
    pub fn custom_attributes(&self) -> Option<&BTreeMap<String, Value>> {
        let attributes = match self {
            Schema::Record(record) => &record.attributes,
            Schema::Enum(enumeration) => &enumeration.attributes,
            Schema::Fixed(fixed) => &fixed.attributes,
            _ => return None,
        };
        Some(attributes)
    }

    /// Returns the name of a reference, record, enum or fixed schema, and `None` for every
    /// other kind of schema.
    pub fn name(&self) -> Option<&Name> {
        match self {
            Schema::Ref { name, .. } => Some(name),
            Schema::Record(record) => Some(&record.name),
            Schema::Enum(enumeration) => Some(&enumeration.name),
            Schema::Fixed(fixed) => Some(&fixed.name),
            _ => None,
        }
    }

    /// Returns a copy of the namespace carried by the schema's name, or `None` when the schema
    /// has no name or its name has no namespace.
    pub fn namespace(&self) -> Namespace {
        match self.name() {
            Some(name) => name.namespace.clone(),
            None => None,
        }
    }
}

impl Parser {
    /// Create a `Schema` from a string representing a JSON Avro schema.
    fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
        let value = serde_json::from_str(input).map_err(Error::ParseSchemaJson)?;
        self.parse(&value, &None)
    }

    /// Parses every schema still waiting in `input_schemas` and returns the results in
    /// `input_order`. It is allowed that the schemas have cross-dependencies; these will be
    /// resolved during parsing.
    ///
    /// # Panics
    ///
    /// Panics if a name listed in `input_order` has no parsed schema once all inputs have
    /// been processed.
    fn parse_list(&mut self) -> Result<Vec<Schema>, Error> {
        loop {
            // Pick any pending schema; parsing it may pull further pending schemas out of
            // `input_schemas` as its dependencies get resolved.
            let pending = match self.input_schemas.keys().next() {
                Some(name) => name.to_owned(),
                None => break,
            };
            let (name, value) = self
                .input_schemas
                .remove_entry(&pending)
                .expect("Key unexpectedly missing");
            let parsed = self.parse(&value, &None)?;
            let registered_name = get_schema_type_name(name, value);
            self.parsed_schemas.insert(registered_name, parsed);
        }

        let mut ordered = Vec::with_capacity(self.parsed_schemas.len());
        for name in self.input_order.drain(..) {
            let schema = self
                .parsed_schemas
                .remove(&name)
                .expect("One of the input schemas was unexpectedly not parsed");
            ordered.push(schema);
        }
        Ok(ordered)
    }

    /// Parses a `serde_json::Value` representing a JSON Avro schema, dispatching on its JSON
    /// type: a string is a type name, an object a complex type and an array a union.
    /// Any other JSON type yields `Error::ParseSchemaFromValidJson`.
    fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> AvroResult<Schema> {
        match value {
            Value::String(type_name) => self.parse_known_schema(type_name, enclosing_namespace),
            Value::Object(attributes) => self.parse_complex(attributes, enclosing_namespace),
            Value::Array(variants) => self.parse_union(variants, enclosing_namespace),
            _ => Err(Error::ParseSchemaFromValidJson),
        }
    }

    /// Parse the name of an Avro type whose Schema is known into a `Schema`. A Schema is
    /// known if it is primitive or has been parsed previously by the parser and stored in
    /// its map of parsed_schemas.
    ///
    /// Any name that is not one of the eight primitive type names is looked up with
    /// `fetch_schema_ref`.
    fn parse_known_schema(
        &mut self,
        name: &str,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        let primitive = match name {
            "null" => Schema::Null,
            "boolean" => Schema::Boolean,
            "int" => Schema::Int,
            "long" => Schema::Long,
            "double" => Schema::Double,
            "float" => Schema::Float,
            "bytes" => Schema::Bytes,
            "string" => Schema::String,
            _ => return self.fetch_schema_ref(name, enclosing_namespace),
        };
        Ok(primitive)
    }

    /// Given a name, tries to retrieve the parsed schema from `parsed_schemas`.
    /// If a parsed schema is not found, it checks if a currently resolving
    /// schema with that name exists.
    /// If a resolving schema is not found, it checks if a json with that name exists
    /// in `input_schemas` and then parses it (removing it from `input_schemas`)
    /// and adds the parsed schema to `parsed_schemas`.
    ///
    /// This method allows schemas definitions that depend on other types to
    /// parse their dependencies (or look them up if already parsed).
    fn fetch_schema_ref(
        &mut self,
        name: &str,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        fn get_schema_ref(parsed: &Schema) -> Schema {
            match &parsed {
                Schema::Record(RecordSchema { ref name, .. })
                | Schema::Enum(EnumSchema { ref name, .. })
                | Schema::Fixed(FixedSchema { ref name, .. }) => Schema::Ref { name: name.clone() },
                _ => parsed.clone(),
            }
        }

        let name = Name::new(name)?;
        let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);

        if self.parsed_schemas.get(&fully_qualified_name).is_some() {
            return Ok(Schema::Ref {
                name: fully_qualified_name,
            });
        }
        if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) {
            return Ok(resolving_schema.clone());
        }

        let value = self
            .input_schemas
            .remove(&fully_qualified_name)
            // TODO make a better descriptive error message here that conveys that a named schema cannot be found
            .ok_or_else(|| Error::ParsePrimitive(fully_qualified_name.fullname(None)))?;

        // parsing a full schema from inside another schema. Other full schema will not inherit namespace
        let parsed = self.parse(&value, &None)?;
        self.parsed_schemas
            .insert(get_schema_type_name(name, value), parsed.clone());

        Ok(get_schema_ref(&parsed))
    }

    fn parse_precision_and_scale(
        complex: &Map<String, Value>,
    ) -> Result<(Precision, Scale), Error> {
        fn get_decimal_integer(
            complex: &Map<String, Value>,
            key: &'static str,
        ) -> Result<DecimalMetadata, Error> {
            match complex.get(key) {
                Some(Value::Number(value)) => parse_json_integer_for_decimal(value),
                None => {
                    if key == "scale" {
                        Ok(0)
                    } else {
                        Err(Error::GetDecimalMetadataFromJson(key))
                    }
                }
                Some(value) => Err(Error::GetDecimalMetadataValueFromJson {
                    key: key.into(),
                    value: value.clone(),
                }),
            }
        }
        let precision = get_decimal_integer(complex, "precision")?;
        let scale = get_decimal_integer(complex, "scale")?;

        if precision < 1 {
            return Err(Error::DecimalPrecisionMuBePositive { precision });
        }

        if precision < scale {
            Err(Error::DecimalPrecisionLessThanScale { precision, scale })
        } else {
            Ok((precision, scale))
        }
    }

    /// Parse a `serde_json::Value` representing a complex Avro type into a
    /// `Schema`.
    ///
    /// The `logicalType` attribute is examined first: a recognized logical type returns
    /// directly from the first `match`, an unrecognized one falls through to the underlying
    /// `type`, and a `logicalType` that is not a JSON string is rejected with
    /// `Error::GetLogicalTypeFieldType`. Without a (recognized) logical type the schema is
    /// parsed according to its `type` attribute.
    ///
    /// Avro supports "recursive" definition of types.
    /// e.g: {"type": {"type": "string"}}
    fn parse_complex(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        // Parses the schema found under `type` and returns it only if its kind is one of
        // `kinds`. On a kind mismatch the error is `Error::GetLogicalTypeVariant`, carrying
        // the innermost type name as found by `get_type_rec`.
        fn logical_verify_type(
            complex: &Map<String, Value>,
            kinds: &[SchemaKind],
            parser: &mut Parser,
            enclosing_namespace: &Namespace,
        ) -> AvroResult<Schema> {
            match complex.get("type") {
                Some(value) => {
                    let ty = match value {
                        // `complex` itself is the fixed definition, so parse it as a whole
                        // instead of treating "fixed" as a type name.
                        Value::String(s) if s == "fixed" => {
                            parser.parse_fixed(complex, enclosing_namespace)?
                        }
                        _ => parser.parse(value, enclosing_namespace)?,
                    };

                    if kinds
                        .iter()
                        .any(|&kind| SchemaKind::from(ty.clone()) == kind)
                    {
                        Ok(ty)
                    } else {
                        match get_type_rec(value.clone()) {
                            Ok(v) => Err(Error::GetLogicalTypeVariant(v)),
                            Err(err) => Err(err),
                        }
                    }
                }
                None => Err(Error::GetLogicalTypeField),
            }
        }

        // Follows nested `type` attributes until a JSON string is reached and returns it.
        // Anything that is neither a string nor an object with a `type` attribute is an
        // `Error::GetComplexTypeField`.
        fn get_type_rec(json_value: Value) -> AvroResult<Value> {
            match json_value {
                typ @ Value::String(_) => Ok(typ),
                Value::Object(ref complex) => match complex.get("type") {
                    Some(v) => get_type_rec(v.clone()),
                    None => Err(Error::GetComplexTypeField),
                },
                _ => Err(Error::GetComplexTypeField),
            }
        }

        // checks whether the logicalType is supported by the type
        //
        // Returns `ok_schema` when the underlying type has one of the expected `kinds`.
        // When it does not, the logical type is ignored with a warning and the schema of
        // the underlying type is returned instead.
        fn try_logical_type(
            logical_type: &str,
            complex: &Map<String, Value>,
            kinds: &[SchemaKind],
            ok_schema: Schema,
            parser: &mut Parser,
            enclosing_namespace: &Namespace,
        ) -> AvroResult<Schema> {
            match logical_verify_type(complex, kinds, parser, enclosing_namespace) {
                // type and logicalType match!
                Ok(_) => Ok(ok_schema),
                // the logicalType is not expected for this type!
                Err(Error::GetLogicalTypeVariant(json_value)) => match json_value {
                    Value::String(_) => match parser.parse(&json_value, enclosing_namespace) {
                        Ok(schema) => {
                            warn!(
                                "Ignoring invalid logical type '{}' for schema of type: {:?}!",
                                logical_type, schema
                            );
                            Ok(schema)
                        }
                        Err(parse_err) => Err(parse_err),
                    },
                    _ => Err(Error::GetLogicalTypeVariant(json_value)),
                },
                err => err,
            }
        }

        match complex.get("logicalType") {
            Some(Value::String(t)) => match t.as_str() {
                // `decimal`, `uuid` and `duration` fail hard on a mismatching underlying
                // type; the date/time logical types go through `try_logical_type`, which
                // falls back to the underlying type.
                "decimal" => {
                    let inner = Box::new(logical_verify_type(
                        complex,
                        &[SchemaKind::Fixed, SchemaKind::Bytes],
                        self,
                        enclosing_namespace,
                    )?);

                    let (precision, scale) = Self::parse_precision_and_scale(complex)?;

                    return Ok(Schema::Decimal(DecimalSchema {
                        precision,
                        scale,
                        inner,
                    }));
                }
                "uuid" => {
                    logical_verify_type(complex, &[SchemaKind::String], self, enclosing_namespace)?;
                    return Ok(Schema::Uuid);
                }
                "date" => {
                    return try_logical_type(
                        "date",
                        complex,
                        &[SchemaKind::Int],
                        Schema::Date,
                        self,
                        enclosing_namespace,
                    );
                }
                "time-millis" => {
                    return try_logical_type(
                        "time-millis",
                        complex,
                        &[SchemaKind::Int],
                        Schema::TimeMillis,
                        self,
                        enclosing_namespace,
                    );
                }
                "time-micros" => {
                    return try_logical_type(
                        "time-micros",
                        complex,
                        &[SchemaKind::Long],
                        Schema::TimeMicros,
                        self,
                        enclosing_namespace,
                    );
                }
                "timestamp-millis" => {
                    return try_logical_type(
                        "timestamp-millis",
                        complex,
                        &[SchemaKind::Long],
                        Schema::TimestampMillis,
                        self,
                        enclosing_namespace,
                    );
                }
                "timestamp-micros" => {
                    return try_logical_type(
                        "timestamp-micros",
                        complex,
                        &[SchemaKind::Long],
                        Schema::TimestampMicros,
                        self,
                        enclosing_namespace,
                    );
                }
                "local-timestamp-millis" => {
                    return try_logical_type(
                        "local-timestamp-millis",
                        complex,
                        &[SchemaKind::Long],
                        Schema::LocalTimestampMillis,
                        self,
                        enclosing_namespace,
                    );
                }
                "local-timestamp-micros" => {
                    return try_logical_type(
                        "local-timestamp-micros",
                        complex,
                        &[SchemaKind::Long],
                        Schema::LocalTimestampMicros,
                        self,
                        enclosing_namespace,
                    );
                }
                "duration" => {
                    logical_verify_type(complex, &[SchemaKind::Fixed], self, enclosing_namespace)?;
                    return Ok(Schema::Duration);
                }
                // In this case, of an unknown logical type, we just pass through to the underlying
                // type.
                _ => {}
            },
            // The spec says to ignore invalid logical types and just continue through to the
            // underlying type - It is unclear whether that applies to this case or not, where the
            // `logicalType` is not a string.
            Some(_) => return Err(Error::GetLogicalTypeFieldType),
            _ => {}
        }
        // No logical type was applied: dispatch on the underlying `type` attribute.
        match complex.get("type") {
            Some(Value::String(t)) => match t.as_str() {
                "record" => self.parse_record(complex, enclosing_namespace),
                "enum" => self.parse_enum(complex, enclosing_namespace),
                "array" => self.parse_array(complex, enclosing_namespace),
                "map" => self.parse_map(complex, enclosing_namespace),
                "fixed" => self.parse_fixed(complex, enclosing_namespace),
                other => self.parse_known_schema(other, enclosing_namespace),
            },
            Some(Value::Object(data)) => self.parse_complex(data, enclosing_namespace),
            Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace),
            Some(unknown) => Err(Error::GetComplexType(unknown.clone())),
            None => Err(Error::GetComplexTypeField),
        }
    }

    fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
        let resolving_schema = Schema::Ref { name: name.clone() };
        self.resolving_schemas
            .insert(name.clone(), resolving_schema.clone());

        let namespace = &name.namespace;

        if let Some(ref aliases) = aliases {
            aliases.iter().for_each(|alias| {
                let alias_fullname = alias.fully_qualified_name(namespace);
                self.resolving_schemas
                    .insert(alias_fullname, resolving_schema.clone());
            });
        }
    }

    fn register_parsed_schema(
        &mut self,
        fully_qualified_name: &Name,
        schema: &Schema,
        aliases: &Aliases,
    ) {
        // FIXME, this should be globally aware, so if there is something overwriting something
        // else then there is an ambiguous schema definition. An appropriate error should be thrown
        self.parsed_schemas
            .insert(fully_qualified_name.clone(), schema.clone());
        self.resolving_schemas.remove(fully_qualified_name);

        let namespace = &fully_qualified_name.namespace;

        if let Some(ref aliases) = aliases {
            aliases.iter().for_each(|alias| {
                let alias_fullname = alias.fully_qualified_name(namespace);
                self.resolving_schemas.remove(&alias_fullname);
                self.parsed_schemas.insert(alias_fullname, schema.clone());
            });
        }
    }

    /// Returns already parsed schema or a schema that is currently being resolved.
    ///
    /// The string stored under the `type` attribute of `complex` is qualified with
    /// `enclosing_namespace` and looked up in `resolving_schemas` first, then in
    /// `parsed_schemas`.
    ///
    /// Returns `None` when `type` is missing or not a string, when it is not a valid name,
    /// or when no schema is registered under that name.
    fn get_already_seen_schema(
        &self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> Option<&Schema> {
        match complex.get("type") {
            Some(Value::String(typ)) => {
                // The string comes from the schema JSON. A value that is not a valid name
                // cannot have been registered, so answer "not seen" instead of panicking.
                let name = Name::new(typ.as_str())
                    .ok()?
                    .fully_qualified_name(enclosing_namespace);
                self.resolving_schemas
                    .get(&name)
                    .or_else(|| self.parsed_schemas.get(&name))
            }
            _ => None,
        }
    }

    /// Parse a `serde_json::Value` representing a Avro record type into a
    /// `Schema`.
    fn parse_record(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        let fields_opt = complex.get("fields");

        if fields_opt.is_none() {
            if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
                return Ok(seen.clone());
            }
        }

        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
        let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);

        let mut lookup = BTreeMap::new();

        self.register_resolving_schema(&fully_qualified_name, &aliases);

        let fields: Vec<RecordField> = fields_opt
            .and_then(|fields| fields.as_array())
            .ok_or(Error::GetRecordFieldsJson)
            .and_then(|fields| {
                fields
                    .iter()
                    .filter_map(|field| field.as_object())
                    .enumerate()
                    .map(|(position, field)| {
                        RecordField::parse(field, position, self, &fully_qualified_name)
                    })
                    .collect::<Result<_, _>>()
            })?;

        for field in &fields {
            if let Some(_old) = lookup.insert(field.name.clone(), field.position) {
                return Err(Error::FieldNameDuplicate(field.name.clone()));
            }

            if let Some(ref field_aliases) = field.aliases {
                for alias in field_aliases {
                    lookup.insert(alias.clone(), field.position);
                }
            }
        }

        let schema = Schema::Record(RecordSchema {
            name: fully_qualified_name.clone(),
            aliases: aliases.clone(),
            doc: complex.doc(),
            fields,
            lookup,
            attributes: self.get_custom_attributes(complex, vec!["fields"]),
        });

        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
        Ok(schema)
    }

    /// Collects a copy of every attribute of `complex` whose key is neither one of the
    /// common schema attributes (`type`, `name`, `namespace`, `doc`, `aliases`) nor listed
    /// in `excluded`.
    fn get_custom_attributes(
        &self,
        complex: &Map<String, Value>,
        excluded: Vec<&'static str>,
    ) -> BTreeMap<String, Value> {
        const RESERVED: [&str; 5] = ["type", "name", "namespace", "doc", "aliases"];

        complex
            .iter()
            .filter(|(key, _)| {
                let key = key.as_str();
                !RESERVED.contains(&key) && !excluded.contains(&key)
            })
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect()
    }

    /// Parse a `serde_json::Value` representing a Avro enum type into a
    /// `Schema`.
    fn parse_enum(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        let symbols_opt = complex.get("symbols");

        if symbols_opt.is_none() {
            if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
                return Ok(seen.clone());
            }
        }

        let name = Name::parse(complex, enclosing_namespace)?;
        let fully_qualified_name = name.clone();
        let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace);

        let symbols: Vec<String> = symbols_opt
            .and_then(|v| v.as_array())
            .ok_or(Error::GetEnumSymbolsField)
            .and_then(|symbols| {
                symbols
                    .iter()
                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
                    .collect::<Option<_>>()
                    .ok_or(Error::GetEnumSymbols)
            })?;

        let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
        for symbol in symbols.iter() {
            // Ensure enum symbol names match [A-Za-z_][A-Za-z0-9_]*
            if !ENUM_SYMBOL_NAME_R.is_match(symbol) {
                return Err(Error::EnumSymbolName(symbol.to_string()));
            }

            // Ensure there are no duplicate symbols
            if existing_symbols.contains(&symbol) {
                return Err(Error::EnumSymbolDuplicate(symbol.to_string()));
            }

            existing_symbols.insert(symbol);
        }

        let mut default: Option<String> = None;
        if let Some(value) = complex.get("default") {
            if let Value::String(ref s) = *value {
                default = Some(s.clone());
            } else {
                return Err(Error::EnumDefaultWrongType(value.clone()));
            }
        }

        if let Some(ref value) = default {
            let resolved = types::Value::from(value.clone())
                .resolve_enum(&symbols, &Some(value.to_string()), &None)
                .is_ok();
            if !resolved {
                return Err(Error::GetEnumDefault {
                    symbol: value.to_string(),
                    symbols,
                });
            }
        }

        let schema = Schema::Enum(EnumSchema {
            name: fully_qualified_name.clone(),
            aliases: aliases.clone(),
            doc: complex.doc(),
            symbols,
            default,
            attributes: self.get_custom_attributes(complex, vec!["symbols"]),
        });

        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);

        Ok(schema)
    }

    /// Parse a `serde_json::Value` representing a Avro array type into a
    /// `Schema`.
    fn parse_array(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        complex
            .get("items")
            .ok_or(Error::GetArrayItemsField)
            .and_then(|items| self.parse(items, enclosing_namespace))
            .map(|schema| Schema::Array(Box::new(schema)))
    }

    /// Parse a `serde_json::Value` representing a Avro map type into a
    /// `Schema`.
    fn parse_map(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        complex
            .get("values")
            .ok_or(Error::GetMapValuesField)
            .and_then(|items| self.parse(items, enclosing_namespace))
            .map(|schema| Schema::Map(Box::new(schema)))
    }

    /// Parse a `serde_json::Value` representing a Avro union type into a
    /// `Schema`.
    ///
    /// Every item is parsed in order, stopping at the first failure; the parsed variants are
    /// then handed to `UnionSchema::new`, whose error, if any, is returned.
    fn parse_union(
        &mut self,
        items: &[Value],
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        let mut variants = Vec::with_capacity(items.len());
        for item in items {
            variants.push(self.parse(item, enclosing_namespace)?);
        }
        let union = UnionSchema::new(variants)?;
        Ok(Schema::Union(union))
    }

    /// Parse a `serde_json::Value` representing a Avro fixed type into a
    /// `Schema`.
    fn parse_fixed(
        &mut self,
        complex: &Map<String, Value>,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Schema> {
        let size_opt = complex.get("size");
        if size_opt.is_none() {
            if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
                return Ok(seen.clone());
            }
        }

        let doc = complex.get("doc").and_then(|v| match &v {
            Value::String(ref docstr) => Some(docstr.clone()),
            _ => None,
        });

        let size = match size_opt {
            Some(size) => size
                .as_u64()
                .ok_or_else(|| Error::GetFixedSizeFieldPositive(size.clone())),
            None => Err(Error::GetFixedSizeField),
        }?;

        let name = Name::parse(complex, enclosing_namespace)?;
        let fully_qualified_name = name.clone();
        let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace);

        let schema = Schema::Fixed(FixedSchema {
            name: fully_qualified_name.clone(),
            aliases: aliases.clone(),
            doc,
            size: size as usize,
            attributes: self.get_custom_attributes(complex, vec!["size"]),
        });

        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);

        Ok(schema)
    }
}

// A type alias may be specified either as a fully namespace-qualified, or relative
// to the namespace of the name it is an alias for. For example, if a type named "a.b"
// has aliases of "c" and "x.y", then the fully qualified names of its aliases are "a.c"
// and "x.y".
// https://avro.apache.org/docs/current/spec.html#Aliases
//
// An alias without a dot is prefixed with `namespace` when there is one; every other alias
// is kept as written. Panics if a resulting alias is rejected by `Alias::new`.
fn fix_aliases_namespace(aliases: Option<Vec<String>>, namespace: &Namespace) -> Aliases {
    aliases.map(|aliases| {
        aliases
            .into_iter()
            .map(|alias| match namespace {
                Some(ns) if !alias.contains('.') => format!("{ns}.{alias}"),
                _ => alias,
            })
            .map(|qualified| Alias::new(qualified.as_str()).unwrap())
            .collect()
    })
}

/// Picks the name under which a parsed input schema is stored.
///
/// When the `type` attribute of `value` is itself a JSON object that carries a name, that
/// nested name is used; in every other case `name` is returned unchanged.
///
/// Panics if the nested name is rejected by `Name::new`.
fn get_schema_type_name(name: Name, value: Value) -> Name {
    let nested_name = match value.get("type") {
        Some(Value::Object(complex_type)) => complex_type.name(),
        _ => None,
    };

    match nested_name {
        Some(nested) => Name::new(nested.as_str()).unwrap(),
        None => name,
    }
}

impl Serialize for Schema {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match *self {
            Schema::Ref { ref name } => serializer.serialize_str(&name.fullname(None)),
            Schema::Null => serializer.serialize_str("null"),
            Schema::Boolean => serializer.serialize_str("boolean"),
            Schema::Int => serializer.serialize_str("int"),
            Schema::Long => serializer.serialize_str("long"),
            Schema::Float => serializer.serialize_str("float"),
            Schema::Double => serializer.serialize_str("double"),
            Schema::Bytes => serializer.serialize_str("bytes"),
            Schema::String => serializer.serialize_str("string"),
            Schema::Array(ref inner) => {
                let mut map = serializer