apache-avro 0.16.0

A library for working with Apache Avro in Rust
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Logic for checking schema compatibility
use crate::schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind};
use std::{
    collections::{hash_map::DefaultHasher, HashSet},
    hash::Hasher,
    ptr,
};

pub struct SchemaCompatibility;

struct Checker {
    recursion: HashSet<(u64, u64)>,
}

impl Checker {
    /// Create a new checker, with recursion set to an empty set.
    pub(crate) fn new() -> Self {
        Self {
            recursion: HashSet::new(),
        }
    }

    pub(crate) fn can_read(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
        self.full_match_schemas(writers_schema, readers_schema)
    }

    pub(crate) fn full_match_schemas(
        &mut self,
        writers_schema: &Schema,
        readers_schema: &Schema,
    ) -> bool {
        if self.recursion_in_progress(writers_schema, readers_schema) {
            return true;
        }

        if !SchemaCompatibility::match_schemas(writers_schema, readers_schema) {
            return false;
        }

        let w_type = SchemaKind::from(writers_schema);
        let r_type = SchemaKind::from(readers_schema);

        if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == SchemaKind::Fixed) {
            return true;
        }

        match r_type {
            SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema),
            SchemaKind::Map => {
                if let Schema::Map(w_m) = writers_schema {
                    if let Schema::Map(r_m) = readers_schema {
                        self.full_match_schemas(w_m, r_m)
                    } else {
                        unreachable!("readers_schema should have been Schema::Map")
                    }
                } else {
                    unreachable!("writers_schema should have been Schema::Map")
                }
            }
            SchemaKind::Array => {
                if let Schema::Array(w_a) = writers_schema {
                    if let Schema::Array(r_a) = readers_schema {
                        self.full_match_schemas(w_a, r_a)
                    } else {
                        unreachable!("readers_schema should have been Schema::Array")
                    }
                } else {
                    unreachable!("writers_schema should have been Schema::Array")
                }
            }
            SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema),
            SchemaKind::Enum => {
                // reader's symbols must contain all writer's symbols
                if let Schema::Enum(EnumSchema {
                    symbols: w_symbols, ..
                }) = writers_schema
                {
                    if let Schema::Enum(EnumSchema {
                        symbols: r_symbols, ..
                    }) = readers_schema
                    {
                        return !w_symbols.iter().any(|e| !r_symbols.contains(e));
                    }
                }
                false
            }
            _ => {
                if w_type == SchemaKind::Union {
                    if let Schema::Union(r) = writers_schema {
                        if r.schemas.len() == 1 {
                            return self.full_match_schemas(&r.schemas[0], readers_schema);
                        }
                    }
                }
                false
            }
        }
    }

    fn match_record_schemas(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
        let w_type = SchemaKind::from(writers_schema);

        if w_type == SchemaKind::Union {
            return false;
        }

        if let Schema::Record(RecordSchema {
            fields: w_fields,
            lookup: w_lookup,
            ..
        }) = writers_schema
        {
            if let Schema::Record(RecordSchema {
                fields: r_fields, ..
            }) = readers_schema
            {
                for field in r_fields.iter() {
                    if let Some(pos) = w_lookup.get(&field.name) {
                        if !self.full_match_schemas(&w_fields[*pos].schema, &field.schema) {
                            return false;
                        }
                    } else if field.default.is_none() {
                        return false;
                    }
                }
            }
        }
        true
    }

    fn match_union_schemas(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
        let w_type = SchemaKind::from(writers_schema);
        let r_type = SchemaKind::from(readers_schema);

        assert_eq!(r_type, SchemaKind::Union);

        if w_type == SchemaKind::Union {
            if let Schema::Union(u) = writers_schema {
                u.schemas
                    .iter()
                    .all(|schema| self.full_match_schemas(schema, readers_schema))
            } else {
                unreachable!("writers_schema should have been Schema::Union")
            }
        } else if let Schema::Union(u) = readers_schema {
            u.schemas
                .iter()
                .any(|schema| self.full_match_schemas(writers_schema, schema))
        } else {
            unreachable!("readers_schema should have been Schema::Union")
        }
    }

    fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
        let mut hasher = DefaultHasher::new();
        ptr::hash(writers_schema, &mut hasher);
        let w_hash = hasher.finish();

        hasher = DefaultHasher::new();
        ptr::hash(readers_schema, &mut hasher);
        let r_hash = hasher.finish();

        let key = (w_hash, r_hash);
        // This is a shortcut to add if not exists *and* return false. It will return true
        // if it was able to insert.
        !self.recursion.insert(key)
    }
}

impl SchemaCompatibility {
    /// `can_read` performs a full, recursive check that a datum written using the
    /// writers_schema can be read using the readers_schema.
    pub fn can_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
        let mut c = Checker::new();
        c.can_read(writers_schema, readers_schema)
    }

    /// `mutual_read` performs a full, recursive check that a datum written using either
    /// the writers_schema or the readers_schema can be read using the other schema.
    pub fn mutual_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
        SchemaCompatibility::can_read(writers_schema, readers_schema)
            && SchemaCompatibility::can_read(readers_schema, writers_schema)
    }

    ///  `match_schemas` performs a basic check that a datum written with the
    ///  writers_schema could be read using the readers_schema. This check only includes
    ///  matching the types, including schema promotion, and matching the full name for
    ///  named types. Aliases for named types are not supported here, and the rust
    ///  implementation of Avro in general does not include support for aliases (I think).
    pub(crate) fn match_schemas(writers_schema: &Schema, readers_schema: &Schema) -> bool {
        let w_type = SchemaKind::from(writers_schema);
        let r_type = SchemaKind::from(readers_schema);

        if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
            return true;
        }

        if w_type == r_type {
            if r_type.is_primitive() {
                return true;
            }

            match r_type {
                SchemaKind::Record => {
                    if let Schema::Record(RecordSchema { name: w_name, .. }) = writers_schema {
                        if let Schema::Record(RecordSchema { name: r_name, .. }) = readers_schema {
                            return w_name.fullname(None) == r_name.fullname(None);
                        } else {
                            unreachable!("readers_schema should have been Schema::Record")
                        }
                    } else {
                        unreachable!("writers_schema should have been Schema::Record")
                    }
                }
                SchemaKind::Fixed => {
                    if let Schema::Fixed(FixedSchema {
                        name: w_name,
                        aliases: _,
                        doc: _w_doc,
                        size: w_size,
                        attributes: _,
                    }) = writers_schema
                    {
                        if let Schema::Fixed(FixedSchema {
                            name: r_name,
                            aliases: _,
                            doc: _r_doc,
                            size: r_size,
                            attributes: _,
                        }) = readers_schema
                        {
                            return w_name.fullname(None) == r_name.fullname(None)
                                && w_size == r_size;
                        } else {
                            unreachable!("readers_schema should have been Schema::Fixed")
                        }
                    } else {
                        unreachable!("writers_schema should have been Schema::Fixed")
                    }
                }
                SchemaKind::Enum => {
                    if let Schema::Enum(EnumSchema { name: w_name, .. }) = writers_schema {
                        if let Schema::Enum(EnumSchema { name: r_name, .. }) = readers_schema {
                            return w_name.fullname(None) == r_name.fullname(None);
                        } else {
                            unreachable!("readers_schema should have been Schema::Enum")
                        }
                    } else {
                        unreachable!("writers_schema should have been Schema::Enum")
                    }
                }
                SchemaKind::Map => {
                    if let Schema::Map(w_m) = writers_schema {
                        if let Schema::Map(r_m) = readers_schema {
                            return SchemaCompatibility::match_schemas(w_m, r_m);
                        } else {
                            unreachable!("readers_schema should have been Schema::Map")
                        }
                    } else {
                        unreachable!("writers_schema should have been Schema::Map")
                    }
                }
                SchemaKind::Array => {
                    if let Schema::Array(w_a) = writers_schema {
                        if let Schema::Array(r_a) = readers_schema {
                            return SchemaCompatibility::match_schemas(w_a, r_a);
                        } else {
                            unreachable!("readers_schema should have been Schema::Array")
                        }
                    } else {
                        unreachable!("writers_schema should have been Schema::Array")
                    }
                }
                _ => (),
            };
        }

        if w_type == SchemaKind::Int
            && [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
                .iter()
                .any(|&t| t == r_type)
        {
            return true;
        }

        if w_type == SchemaKind::Long
            && [SchemaKind::Float, SchemaKind::Double]
                .iter()
                .any(|&t| t == r_type)
        {
            return true;
        }

        if w_type == SchemaKind::Float && r_type == SchemaKind::Double {
            return true;
        }

        if w_type == SchemaKind::String && r_type == SchemaKind::Bytes {
            return true;
        }

        if w_type == SchemaKind::Bytes && r_type == SchemaKind::String {
            return true;
        }

        false
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        types::{Record, Value},
        Codec, Reader, Writer,
    };
    use apache_avro_test_helper::TestResult;

    fn int_array_schema() -> Schema {
        Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
    }

    fn long_array_schema() -> Schema {
        Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
    }

    fn string_array_schema() -> Schema {
        Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
    }

    fn int_map_schema() -> Schema {
        Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
    }

    fn long_map_schema() -> Schema {
        Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
    }

    fn string_map_schema() -> Schema {
        Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
    }

    fn enum1_ab_schema() -> Schema {
        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
    }

    fn enum1_abc_schema() -> Schema {
        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
    }

    fn enum1_bc_schema() -> Schema {
        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
    }

    fn enum2_ab_schema() -> Schema {
        Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
    }

    fn empty_record1_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
    }

    fn empty_record2_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
    }

    fn a_int_record1_schema() -> Schema {
        Schema::parse_str(
            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
        )
        .unwrap()
    }

    fn a_long_record1_schema() -> Schema {
        Schema::parse_str(
            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
        )
        .unwrap()
    }

    fn a_int_b_int_record1_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
    }

    fn a_dint_record1_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
    }

    fn a_int_b_dint_record1_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
    }

    fn a_dint_b_dint_record1_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
    }

    fn nested_record() -> Schema {
        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
    }

    fn nested_optional_record() -> Schema {
        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
    }

    fn int_list_record_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
    }

    fn long_list_record_schema() -> Schema {
        Schema::parse_str(
            r#"
      {
        "type":"record", "name":"List", "fields": [
          {"name": "head", "type": "long"},
          {"name": "tail", "type": "array", "items": "long"}
      ]}
"#,
        )
        .unwrap()
    }

    fn union_schema(schemas: Vec<Schema>) -> Schema {
        let schema_string = schemas
            .iter()
            .map(|s| s.canonical_form())
            .collect::<Vec<String>>()
            .join(",");
        Schema::parse_str(&format!("[{schema_string}]")).unwrap()
    }

    fn empty_union_schema() -> Schema {
        union_schema(vec![])
    }

    // unused
    // fn null_union_schema() -> Schema { union_schema(vec![Schema::Null]) }

    fn int_union_schema() -> Schema {
        union_schema(vec![Schema::Int])
    }

    fn long_union_schema() -> Schema {
        union_schema(vec![Schema::Long])
    }

    fn string_union_schema() -> Schema {
        union_schema(vec![Schema::String])
    }

    fn int_string_union_schema() -> Schema {
        union_schema(vec![Schema::Int, Schema::String])
    }

    fn string_int_union_schema() -> Schema {
        union_schema(vec![Schema::String, Schema::Int])
    }

    #[test]
    fn test_broken() {
        assert!(!SchemaCompatibility::can_read(
            &int_string_union_schema(),
            &int_union_schema()
        ))
    }

    #[test]
    fn test_incompatible_reader_writer_pairs() {
        let incompatible_schemas = vec![
            // null
            (Schema::Null, Schema::Int),
            (Schema::Null, Schema::Long),
            // boolean
            (Schema::Boolean, Schema::Int),
            // int
            (Schema::Int, Schema::Null),
            (Schema::Int, Schema::Boolean),
            (Schema::Int, Schema::Long),
            (Schema::Int, Schema::Float),
            (Schema::Int, Schema::Double),
            // long
            (Schema::Long, Schema::Float),
            (Schema::Long, Schema::Double),
            // float
            (Schema::Float, Schema::Double),
            // string
            (Schema::String, Schema::Boolean),
            (Schema::String, Schema::Int),
            // bytes
            (Schema::Bytes, Schema::Null),
            (Schema::Bytes, Schema::Int),
            // array and maps
            (int_array_schema(), long_array_schema()),
            (int_map_schema(), int_array_schema()),
            (int_array_schema(), int_map_schema()),
            (int_map_schema(), long_map_schema()),
            // enum
            (enum1_ab_schema(), enum1_abc_schema()),
            (enum1_bc_schema(), enum1_abc_schema()),
            (enum1_ab_schema(), enum2_ab_schema()),
            (Schema::Int, enum2_ab_schema()),
            (enum2_ab_schema(), Schema::Int),
            //union
            (int_union_schema(), int_string_union_schema()),
            (string_union_schema(), int_string_union_schema()),
            //record
            (empty_record2_schema(), empty_record1_schema()),
            (a_int_record1_schema(), empty_record1_schema()),
            (a_int_b_dint_record1_schema(), empty_record1_schema()),
            (int_list_record_schema(), long_list_record_schema()),
            (nested_record(), nested_optional_record()),
        ];

        assert!(!incompatible_schemas
            .iter()
            .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader)));
    }

    #[test]
    fn test_compatible_reader_writer_pairs() {
        let compatible_schemas = vec![
            (Schema::Null, Schema::Null),
            (Schema::Long, Schema::Int),
            (Schema::Float, Schema::Int),
            (Schema::Float, Schema::Long),
            (Schema::Double, Schema::Long),
            (Schema::Double, Schema::Int),
            (Schema::Double, Schema::Float),
            (Schema::String, Schema::Bytes),
            (Schema::Bytes, Schema::String),
            (int_array_schema(), int_array_schema()),
            (long_array_schema(), int_array_schema()),
            (int_map_schema(), int_map_schema()),
            (long_map_schema(), int_map_schema()),
            (enum1_ab_schema(), enum1_ab_schema()),
            (enum1_abc_schema(), enum1_ab_schema()),
            (empty_union_schema(), empty_union_schema()),
            (int_union_schema(), int_union_schema()),
            (int_string_union_schema(), string_int_union_schema()),
            (int_union_schema(), empty_union_schema()),
            (long_union_schema(), int_union_schema()),
            (int_union_schema(), Schema::Int),
            (Schema::Int, int_union_schema()),
            (empty_record1_schema(), empty_record1_schema()),
            (empty_record1_schema(), a_int_record1_schema()),
            (a_int_record1_schema(), a_int_record1_schema()),
            (a_dint_record1_schema(), a_int_record1_schema()),
            (a_dint_record1_schema(), a_dint_record1_schema()),
            (a_int_record1_schema(), a_dint_record1_schema()),
            (a_long_record1_schema(), a_int_record1_schema()),
            (a_int_record1_schema(), a_int_b_int_record1_schema()),
            (a_dint_record1_schema(), a_int_b_int_record1_schema()),
            (a_int_b_dint_record1_schema(), a_int_record1_schema()),
            (a_dint_b_dint_record1_schema(), empty_record1_schema()),
            (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
            (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
            (int_list_record_schema(), int_list_record_schema()),
            (long_list_record_schema(), long_list_record_schema()),
            (long_list_record_schema(), int_list_record_schema()),
            (nested_optional_record(), nested_record()),
        ];

        assert!(compatible_schemas
            .iter()
            .all(|(reader, writer)| SchemaCompatibility::can_read(writer, reader)));
    }

    fn writer_schema() -> Schema {
        Schema::parse_str(
            r#"
      {"type":"record", "name":"Record", "fields":[
        {"name":"oldfield1", "type":"int"},
        {"name":"oldfield2", "type":"string"}
      ]}
"#,
        )
        .unwrap()
    }

    #[test]
    fn test_missing_field() -> TestResult {
        let reader_schema = Schema::parse_str(
            r#"
      {"type":"record", "name":"Record", "fields":[
        {"name":"oldfield1", "type":"int"}
      ]}
"#,
        )?;
        assert!(SchemaCompatibility::can_read(
            &writer_schema(),
            &reader_schema,
        ));
        assert!(!SchemaCompatibility::can_read(
            &reader_schema,
            &writer_schema()
        ));

        Ok(())
    }

    #[test]
    fn test_missing_second_field() -> TestResult {
        let reader_schema = Schema::parse_str(
            r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield2", "type":"string"}
        ]}
"#,
        )?;
        assert!(SchemaCompatibility::can_read(
            &writer_schema(),
            &reader_schema
        ));
        assert!(!SchemaCompatibility::can_read(
            &reader_schema,
            &writer_schema()
        ));

        Ok(())
    }

    #[test]
    fn test_all_fields() -> TestResult {
        let reader_schema = Schema::parse_str(
            r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"oldfield2", "type":"string"}
        ]}
"#,
        )?;
        assert!(SchemaCompatibility::can_read(
            &writer_schema(),
            &reader_schema
        ));
        assert!(SchemaCompatibility::can_read(
            &reader_schema,
            &writer_schema()
        ));

        Ok(())
    }

    #[test]
    fn test_new_field_with_default() -> TestResult {
        let reader_schema = Schema::parse_str(
            r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"newfield1", "type":"int", "default":42}
        ]}
"#,
        )?;
        assert!(SchemaCompatibility::can_read(
            &writer_schema(),
            &reader_schema
        ));
        assert!(!SchemaCompatibility::can_read(
            &reader_schema,
            &writer_schema()
        ));

        Ok(())
    }

    #[test]
    fn test_new_field() -> TestResult {
        let reader_schema = Schema::parse_str(
            r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"newfield1", "type":"int"}
        ]}
"#,
        )?;
        assert!(!SchemaCompatibility::can_read(
            &writer_schema(),
            &reader_schema
        ));
        assert!(!SchemaCompatibility::can_read(
            &reader_schema,
            &writer_schema()
        ));

        Ok(())
    }

    #[test]
    fn test_array_writer_schema() {
        let valid_reader = string_array_schema();
        let invalid_reader = string_map_schema();

        assert!(SchemaCompatibility::can_read(
            &string_array_schema(),
            &valid_reader
        ));
        assert!(!SchemaCompatibility::can_read(
            &string_array_schema(),
            &invalid_reader
        ));
    }

    #[test]
    fn test_primitive_writer_schema() {
        let valid_reader = Schema::String;
        assert!(SchemaCompatibility::can_read(
            &Schema::String,
            &valid_reader
        ));
        assert!(!SchemaCompatibility::can_read(
            &Schema::Int,
            &Schema::String
        ));
    }

    #[test]
    fn test_union_reader_writer_subset_incompatibility() {
        // reader union schema must contain all writer union branches
        let union_writer = union_schema(vec![Schema::Int, Schema::String]);
        let union_reader = union_schema(vec![Schema::String]);

        assert!(!SchemaCompatibility::can_read(&union_writer, &union_reader));
        assert!(SchemaCompatibility::can_read(&union_reader, &union_writer));
    }

    #[test]
    fn test_incompatible_record_field() -> TestResult {
        let string_schema = Schema::parse_str(
            r#"
        {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
            {"name":"field1", "type":"string"}
        ]}
        "#,
        )?;

        let int_schema = Schema::parse_str(
            r#"
      {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
        {"name":"field1", "type":"int"}
      ]}
"#,
        )?;

        assert!(!SchemaCompatibility::can_read(&string_schema, &int_schema));

        Ok(())
    }

    #[test]
    fn test_enum_symbols() -> TestResult {
        let enum_schema1 = Schema::parse_str(
            r#"
      {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
"#,
        )?;
        let enum_schema2 =
            Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;
        assert!(!SchemaCompatibility::can_read(&enum_schema2, &enum_schema1));
        assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2));

        Ok(())
    }

    fn point_2d_schema() -> Schema {
        Schema::parse_str(
            r#"
      {"type":"record", "name":"Point2D", "fields":[
        {"name":"x", "type":"double"},
        {"name":"y", "type":"double"}
      ]}
    "#,
        )
        .unwrap()
    }

    fn point_2d_fullname_schema() -> Schema {
        Schema::parse_str(
            r#"
      {"type":"record", "name":"Point", "namespace":"written", "fields":[
        {"name":"x", "type":"double"},
        {"name":"y", "type":"double"}
      ]}
    "#,
        )
        .unwrap()
    }

    fn point_3d_no_default_schema() -> Schema {
        Schema::parse_str(
            r#"
      {"type":"record", "name":"Point", "fields":[
        {"name":"x", "type":"double"},
        {"name":"y", "type":"double"},
        {"name":"z", "type":"double"}
      ]}
    "#,
        )
        .unwrap()
    }

    fn point_3d_schema() -> Schema {
        Schema::parse_str(
            r#"
      {"type":"record", "name":"Point3D", "fields":[
        {"name":"x", "type":"double"},
        {"name":"y", "type":"double"},
        {"name":"z", "type":"double", "default": 0.0}
      ]}
    "#,
        )
        .unwrap()
    }

    fn point_3d_match_name_schema() -> Schema {
        Schema::parse_str(
            r#"
      {"type":"record", "name":"Point", "fields":[
        {"name":"x", "type":"double"},
        {"name":"y", "type":"double"},
        {"name":"z", "type":"double", "default": 0.0}
      ]}
    "#,
        )
        .unwrap()
    }

    #[test]
    fn test_union_resolution_no_structure_match() {
        // short name match, but no structure match
        let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
        assert!(!SchemaCompatibility::can_read(
            &point_2d_fullname_schema(),
            &read_schema
        ));
    }

    #[test]
    fn test_union_resolution_first_structure_match_2d() {
        // multiple structure matches with no name matches
        let read_schema = union_schema(vec![
            Schema::Null,
            point_3d_no_default_schema(),
            point_2d_schema(),
            point_3d_schema(),
        ]);
        assert!(!SchemaCompatibility::can_read(
            &point_2d_fullname_schema(),
            &read_schema
        ));
    }

    #[test]
    fn test_union_resolution_first_structure_match_3d() {
        // multiple structure matches with no name matches
        let read_schema = union_schema(vec![
            Schema::Null,
            point_3d_no_default_schema(),
            point_3d_schema(),
            point_2d_schema(),
        ]);
        assert!(!SchemaCompatibility::can_read(
            &point_2d_fullname_schema(),
            &read_schema
        ));
    }

    #[test]
    fn test_union_resolution_named_structure_match() {
        // multiple structure matches with a short name match
        let read_schema = union_schema(vec![
            Schema::Null,
            point_2d_schema(),
            point_3d_match_name_schema(),
            point_3d_schema(),
        ]);
        assert!(!SchemaCompatibility::can_read(
            &point_2d_fullname_schema(),
            &read_schema
        ));
    }

    #[test]
    fn test_union_resolution_full_name_match() {
        // there is a full name match that should be chosen
        let read_schema = union_schema(vec![
            Schema::Null,
            point_2d_schema(),
            point_3d_match_name_schema(),
            point_3d_schema(),
            point_2d_fullname_schema(),
        ]);
        assert!(SchemaCompatibility::can_read(
            &point_2d_fullname_schema(),
            &read_schema
        ));
    }

    #[test]
    fn test_avro_3772_enum_default() -> TestResult {
        let writer_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                "type": "enum",
                "name": "suit",
                "symbols": ["diamonds", "spades", "clubs", "hearts"],
                "default": "spades"
              }
            }
          ]
        }
        "#;

        let reader_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                 "type": "enum",
                 "name": "suit",
                 "symbols": ["diamonds", "spades", "ninja", "hearts"],
                 "default": "spades"
              }
            }
          ]
        }
      "#;
        let writer_schema = Schema::parse_str(writer_raw_schema)?;
        let reader_schema = Schema::parse_str(reader_raw_schema)?;
        let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
        let mut record = Record::new(writer.schema()).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        record.put("c", "clubs");
        writer.append(record).unwrap();
        let input = writer.into_inner()?;
        let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
        assert_eq!(
            reader.next().unwrap().unwrap(),
            Value::Record(vec![
                ("a".to_string(), Value::Long(27)),
                ("b".to_string(), Value::String("foo".to_string())),
                ("c".to_string(), Value::Enum(1, "spades".to_string())),
            ])
        );
        assert!(reader.next().is_none());

        Ok(())
    }

    #[test]
    fn test_avro_3772_enum_default_less_symbols() -> TestResult {
        let writer_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                "type": "enum",
                "name": "suit",
                "symbols": ["diamonds", "spades", "clubs", "hearts"],
                "default": "spades"
              }
            }
          ]
        }
        "#;

        let reader_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                 "type": "enum",
                  "name": "suit",
                  "symbols": ["hearts", "spades"],
                  "default": "spades"
              }
            }
          ]
        }
      "#;
        let writer_schema = Schema::parse_str(writer_raw_schema)?;
        let reader_schema = Schema::parse_str(reader_raw_schema)?;
        let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
        let mut record = Record::new(writer.schema()).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        record.put("c", "hearts");
        writer.append(record).unwrap();
        let input = writer.into_inner()?;
        let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
        assert_eq!(
            reader.next().unwrap().unwrap(),
            Value::Record(vec![
                ("a".to_string(), Value::Long(27)),
                ("b".to_string(), Value::String("foo".to_string())),
                ("c".to_string(), Value::Enum(0, "hearts".to_string())),
            ])
        );
        assert!(reader.next().is_none());

        Ok(())
    }
}