apache_avro/
schema_equality.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{
19    schema::{
20        ArraySchema, DecimalSchema, EnumSchema, FixedSchema, MapSchema, RecordField, RecordSchema,
21        UnionSchema,
22    },
23    Schema,
24};
25use std::{fmt::Debug, sync::OnceLock};
26
27/// A trait that compares two schemata for equality.
28/// To register a custom one use [set_schemata_equality_comparator].
29pub trait SchemataEq: Debug + Send + Sync {
30    /// Compares two schemata for equality.
31    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool;
32}
33
34/// Compares two schemas according to the Avro specification by using
35/// their canonical forms.
36/// See <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
37#[derive(Debug)]
38pub struct SpecificationEq;
39impl SchemataEq for SpecificationEq {
40    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
41        schema_one.canonical_form() == schema_two.canonical_form()
42    }
43}
44
/// Schemata comparator that inspects the schema structures member by member,
/// looking only at the members that take part in the canonical form.
/// See <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
#[derive(Debug)]
pub struct StructFieldEq {
    /// Set to `true` to also require equal custom attributes.
    /// Note that custom attributes are **not** part of the canonical form of a schema!
    pub include_attributes: bool,
}
54
55impl SchemataEq for StructFieldEq {
56    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
57        macro_rules! compare_primitive {
58            ($primitive:ident) => {
59                if let Schema::$primitive = schema_one {
60                    if let Schema::$primitive = schema_two {
61                        return true;
62                    }
63                    return false;
64                }
65            };
66        }
67
68        if schema_one.name() != schema_two.name() {
69            return false;
70        }
71
72        compare_primitive!(Null);
73        compare_primitive!(Boolean);
74        compare_primitive!(Int);
75        compare_primitive!(Int);
76        compare_primitive!(Long);
77        compare_primitive!(Float);
78        compare_primitive!(Double);
79        compare_primitive!(Bytes);
80        compare_primitive!(String);
81        compare_primitive!(Uuid);
82        compare_primitive!(BigDecimal);
83        compare_primitive!(Date);
84        compare_primitive!(Duration);
85        compare_primitive!(TimeMicros);
86        compare_primitive!(TimeMillis);
87        compare_primitive!(TimestampMicros);
88        compare_primitive!(TimestampMillis);
89        compare_primitive!(TimestampNanos);
90        compare_primitive!(LocalTimestampMicros);
91        compare_primitive!(LocalTimestampMillis);
92        compare_primitive!(LocalTimestampNanos);
93
94        if self.include_attributes
95            && schema_one.custom_attributes() != schema_two.custom_attributes()
96        {
97            return false;
98        }
99
100        if let Schema::Record(RecordSchema {
101            fields: fields_one, ..
102        }) = schema_one
103        {
104            if let Schema::Record(RecordSchema {
105                fields: fields_two, ..
106            }) = schema_two
107            {
108                return self.compare_fields(fields_one, fields_two);
109            }
110            return false;
111        }
112
113        if let Schema::Enum(EnumSchema {
114            symbols: symbols_one,
115            ..
116        }) = schema_one
117        {
118            if let Schema::Enum(EnumSchema {
119                symbols: symbols_two,
120                ..
121            }) = schema_two
122            {
123                return symbols_one == symbols_two;
124            }
125            return false;
126        }
127
128        if let Schema::Fixed(FixedSchema { size: size_one, .. }) = schema_one {
129            if let Schema::Fixed(FixedSchema { size: size_two, .. }) = schema_two {
130                return size_one == size_two;
131            }
132            return false;
133        }
134
135        if let Schema::Union(UnionSchema {
136            schemas: schemas_one,
137            ..
138        }) = schema_one
139        {
140            if let Schema::Union(UnionSchema {
141                schemas: schemas_two,
142                ..
143            }) = schema_two
144            {
145                return schemas_one.len() == schemas_two.len()
146                    && schemas_one
147                        .iter()
148                        .zip(schemas_two.iter())
149                        .all(|(s1, s2)| self.compare(s1, s2));
150            }
151            return false;
152        }
153
154        if let Schema::Decimal(DecimalSchema {
155            precision: precision_one,
156            scale: scale_one,
157            ..
158        }) = schema_one
159        {
160            if let Schema::Decimal(DecimalSchema {
161                precision: precision_two,
162                scale: scale_two,
163                ..
164            }) = schema_two
165            {
166                return precision_one == precision_two && scale_one == scale_two;
167            }
168            return false;
169        }
170
171        if let Schema::Array(ArraySchema {
172            items: items_one, ..
173        }) = schema_one
174        {
175            if let Schema::Array(ArraySchema {
176                items: items_two, ..
177            }) = schema_two
178            {
179                return items_one == items_two;
180            }
181            return false;
182        }
183
184        if let Schema::Map(MapSchema {
185            types: types_one, ..
186        }) = schema_one
187        {
188            if let Schema::Map(MapSchema {
189                types: types_two, ..
190            }) = schema_two
191            {
192                return self.compare(types_one, types_two);
193            }
194            return false;
195        }
196
197        if let Schema::Ref { name: name_one } = schema_one {
198            if let Schema::Ref { name: name_two } = schema_two {
199                return name_one == name_two;
200            }
201            return false;
202        }
203
204        error!(
205            "This is a bug in schema_equality.rs! The following schemata types are not checked! \
206            Please report it to the Avro library maintainers! \
207            \n{:?}\n\n{:?}",
208            schema_one, schema_two
209        );
210        false
211    }
212}
213
214impl StructFieldEq {
215    fn compare_fields(&self, fields_one: &[RecordField], fields_two: &[RecordField]) -> bool {
216        fields_one.len() == fields_two.len()
217            && fields_one
218                .iter()
219                .zip(fields_two.iter())
220                .all(|(f1, f2)| self.compare(&f1.schema, &f2.schema))
221    }
222}
223
224static SCHEMATA_COMPARATOR_ONCE: OnceLock<Box<dyn SchemataEq>> = OnceLock::new();
225
226/// Sets a custom schemata equality comparator.
227///
228/// Returns a unit if the registration was successful or the already
229/// registered comparator if the registration failed.
230///
231/// **Note**: This function must be called before parsing any schema because this will
232/// register the default comparator and the registration is one time only!
233pub fn set_schemata_equality_comparator(
234    comparator: Box<dyn SchemataEq>,
235) -> Result<(), Box<dyn SchemataEq>> {
236    debug!(
237        "Setting a custom schemata equality comparator: {:?}.",
238        comparator
239    );
240    SCHEMATA_COMPARATOR_ONCE.set(comparator)
241}
242
243pub(crate) fn compare_schemata(schema_one: &Schema, schema_two: &Schema) -> bool {
244    SCHEMATA_COMPARATOR_ONCE
245        .get_or_init(|| {
246            debug!("Going to use the default schemata equality comparator: SpecificationEq.",);
247            Box::new(StructFieldEq {
248                include_attributes: false,
249            })
250        })
251        .compare(schema_one, schema_two)
252}
253
// Unit tests checking that `SpecificationEq` and `StructFieldEq` agree with
// each other for every kind of schema.
#[cfg(test)]
#[allow(non_snake_case)]
mod tests {
    use super::*;
    use crate::schema::{Name, RecordFieldOrder};
    use apache_avro_test_helper::TestResult;
    use serde_json::Value;
    use std::collections::BTreeMap;

    // The two comparators under test. `STRUCT_FIELD_EQ` ignores custom attributes.
    const SPECIFICATION_EQ: SpecificationEq = SpecificationEq;
    const STRUCT_FIELD_EQ: StructFieldEq = StructFieldEq {
        include_attributes: false,
    };

    // Generates one test per unit variant of `Schema`. Each test compares the
    // variant with itself and asserts that both comparators return the same result.
    // The variant name becomes part of the test name, hence `non_snake_case` above.
    macro_rules! test_primitives {
        ($primitive:ident) => {
            paste::item! {
                #[test]
                fn [<test_avro_3939_compare_schemata_$primitive>]() {
                    let specification_eq_res = SPECIFICATION_EQ.compare(&Schema::$primitive, &Schema::$primitive);
                    let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&Schema::$primitive, &Schema::$primitive);
                    assert_eq!(specification_eq_res, struct_field_eq_res)
                }
            }
        };
    }

    test_primitives!(Null);
    test_primitives!(Boolean);
    test_primitives!(Int);
    test_primitives!(Long);
    test_primitives!(Float);
    test_primitives!(Double);
    test_primitives!(Bytes);
    test_primitives!(String);
    test_primitives!(Uuid);
    test_primitives!(BigDecimal);
    test_primitives!(Date);
    test_primitives!(Duration);
    test_primitives!(TimeMicros);
    test_primitives!(TimeMillis);
    test_primitives!(TimestampMicros);
    test_primitives!(TimestampMillis);
    test_primitives!(TimestampNanos);
    test_primitives!(LocalTimestampMicros);
    test_primitives!(LocalTimestampMillis);
    test_primitives!(LocalTimestampNanos);

    // Two references with different names must be unequal for both comparators.
    #[test]
    fn test_avro_3939_compare_named_schemata_with_different_names() {
        let schema_one = Schema::Ref {
            name: Name::from("name1"),
        };

        let schema_two = Schema::Ref {
            name: Name::from("name2"),
        };

        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
        assert!(!specification_eq_res);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
        assert!(!struct_field_eq_res);

        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    // Maps that differ only in their custom attributes are equal when the
    // comparator is configured with `include_attributes: false`.
    #[test]
    fn test_avro_3939_compare_schemata_not_including_attributes() {
        let schema_one = Schema::map_with_attributes(
            Schema::Boolean,
            BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]),
        );
        let schema_two = Schema::map_with_attributes(
            Schema::Boolean,
            BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]),
        );
        // STRUCT_FIELD_EQ does not include attributes !
        assert!(STRUCT_FIELD_EQ.compare(&schema_one, &schema_two));
    }

    // The same maps are unequal once the comparator is configured with
    // `include_attributes: true`.
    #[test]
    fn test_avro_3939_compare_schemata_including_attributes() {
        let struct_field_eq = StructFieldEq {
            include_attributes: true,
        };
        let schema_one = Schema::map_with_attributes(
            Schema::Boolean,
            BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]),
        );
        let schema_two = Schema::map_with_attributes(
            Schema::Boolean,
            BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]),
        );
        assert!(!struct_field_eq.compare(&schema_one, &schema_two));
    }

    // A map is unequal to a primitive and equal to an identically built map.
    #[test]
    fn test_avro_3939_compare_map_schemata() {
        let schema_one = Schema::map(Schema::Boolean);
        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));

        let schema_two = Schema::map(Schema::Boolean);

        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
        assert!(
            specification_eq_res,
            "SpecificationEq: Equality of two Schema::Map failed!"
        );
        assert!(
            struct_field_eq_res,
            "StructFieldEq: Equality of two Schema::Map failed!"
        );
        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    // An array is unequal to a primitive and equal to an identically built array.
    #[test]
    fn test_avro_3939_compare_array_schemata() {
        let schema_one = Schema::array(Schema::Boolean);
        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));

        let schema_two = Schema::array(Schema::Boolean);

        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
        assert!(
            specification_eq_res,
            "SpecificationEq: Equality of two Schema::Array failed!"
        );
        assert!(
            struct_field_eq_res,
            "StructFieldEq: Equality of two Schema::Array failed!"
        );
        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    // A decimal is unequal to a primitive and equal to a decimal with the same
    // precision, scale and inner schema.
    #[test]
    fn test_avro_3939_compare_decimal_schemata() {
        let schema_one = Schema::Decimal(DecimalSchema {
            precision: 10,
            scale: 2,
            inner: Box::new(Schema::Bytes),
        });
        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));

        let schema_two = Schema::Decimal(DecimalSchema {
            precision: 10,
            scale: 2,
            inner: Box::new(Schema::Bytes),
        });

        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
        assert!(
            specification_eq_res,
            "SpecificationEq: Equality of two Schema::Decimal failed!"
        );
        assert!(
            struct_field_eq_res,
            "StructFieldEq: Equality of two Schema::Decimal failed!"
        );
        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    // A fixed is unequal to a primitive and equal to a fixed with the same name and size.
    #[test]
    fn test_avro_3939_compare_fixed_schemata() {
        let schema_one = Schema::Fixed(FixedSchema {
            name: Name::from("fixed"),
            doc: None,
            size: 10,
            default: None,
            aliases: None,
            attributes: BTreeMap::new(),
        });
        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));

        let schema_two = Schema::Fixed(FixedSchema {
            name: Name::from("fixed"),
            doc: None,
            size: 10,
            default: None,
            aliases: None,
            attributes: BTreeMap::new(),
        });

        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
        assert!(
            specification_eq_res,
            "SpecificationEq: Equality of two Schema::Fixed failed!"
        );
        assert!(
            struct_field_eq_res,
            "StructFieldEq: Equality of two Schema::Fixed failed!"
        );
        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    // An enum is unequal to a primitive and equal to an enum with the same name and symbols.
    #[test]
    fn test_avro_3939_compare_enum_schemata() {
        let schema_one = Schema::Enum(EnumSchema {
            name: Name::from("enum"),
            doc: None,
            symbols: vec!["A".to_string(), "B".to_string()],
            default: None,
            aliases: None,
            attributes: BTreeMap::new(),
        });
        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));

        let schema_two = Schema::Enum(EnumSchema {
            name: Name::from("enum"),
            doc: None,
            symbols: vec!["A".to_string(), "B".to_string()],
            default: None,
            aliases: None,
            attributes: BTreeMap::new(),
        });

        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
        assert!(
            specification_eq_res,
            "SpecificationEq: Equality of two Schema::Enum failed!"
        );
        assert!(
            struct_field_eq_res,
            "StructFieldEq: Equality of two Schema::Enum failed!"
        );
        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    // A reference is unequal to a primitive and equal to a reference with the same name.
    #[test]
    fn test_avro_3939_compare_ref_schemata() {
        let schema_one = Schema::Ref {
            name: Name::from("ref"),
        };
        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));

        let schema_two = Schema::Ref {
            name: Name::from("ref"),
        };

        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
        assert!(
            specification_eq_res,
            "SpecificationEq: Equality of two Schema::Ref failed!"
        );
        assert!(
            struct_field_eq_res,
            "StructFieldEq: Equality of two Schema::Ref failed!"
        );
        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    // A record is unequal to a primitive and equal to an identically built record.
    #[test]
    fn test_avro_3939_compare_record_schemata() {
        let schema_one = Schema::Record(RecordSchema {
            name: Name::from("record"),
            doc: None,
            fields: vec![RecordField {
                name: "field".to_string(),
                doc: None,
                default: None,
                schema: Schema::Boolean,
                order: RecordFieldOrder::Ignore,
                aliases: None,
                custom_attributes: BTreeMap::new(),
                position: 0,
            }],
            aliases: None,
            attributes: BTreeMap::new(),
            lookup: Default::default(),
        });
        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));

        let schema_two = Schema::Record(RecordSchema {
            name: Name::from("record"),
            doc: None,
            fields: vec![RecordField {
                name: "field".to_string(),
                doc: None,
                default: None,
                schema: Schema::Boolean,
                order: RecordFieldOrder::Ignore,
                aliases: None,
                custom_attributes: BTreeMap::new(),
                position: 0,
            }],
            aliases: None,
            attributes: BTreeMap::new(),
            lookup: Default::default(),
        });

        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
        assert!(
            specification_eq_res,
            "SpecificationEq: Equality of two Schema::Record failed!"
        );
        assert!(
            struct_field_eq_res,
            "StructFieldEq: Equality of two Schema::Record failed!"
        );
        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    // A union is unequal to a primitive and equal to a union with the same
    // member schemata in the same order.
    #[test]
    fn test_avro_3939_compare_union_schemata() -> TestResult {
        let schema_one = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));

        let schema_two = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);

        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
        assert!(
            specification_eq_res,
            "SpecificationEq: Equality of two Schema::Union failed!"
        );
        assert!(
            struct_field_eq_res,
            "StructFieldEq: Equality of two Schema::Union failed!"
        );
        assert_eq!(specification_eq_res, struct_field_eq_res);
        Ok(())
    }
}