Skip to main content

avro_rs/
schema_compatibility.rs

1//! Logic for checking schema compatibility
2use crate::schema::{Schema, SchemaKind};
3use std::{
4    collections::{hash_map::DefaultHasher, HashSet},
5    hash::Hasher,
6    ptr,
7};
8
/// Namespace type for schema compatibility checks; every operation is exposed as an
/// associated function, so no instance is ever needed.
pub struct SchemaCompatibility;
10
/// Stateful helper that carries out the recursive compatibility walk.
struct Checker {
    /// Hashes of the addresses of every (writer, reader) schema pair visited so far.
    recursion: HashSet<(u64, u64)>,
}
14
15impl Checker {
16    /// Create a new checker, with recursion set to an empty set.
17    pub(crate) fn new() -> Self {
18        Self {
19            recursion: HashSet::new(),
20        }
21    }
22
    /// Report whether data written with `writers_schema` can be read with
    /// `readers_schema`. Delegates directly to `full_match_schemas`.
    pub(crate) fn can_read(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
        self.full_match_schemas(writers_schema, readers_schema)
    }
26
27    pub(crate) fn full_match_schemas(
28        &mut self,
29        writers_schema: &Schema,
30        readers_schema: &Schema,
31    ) -> bool {
32        if self.recursion_in_progress(writers_schema, readers_schema) {
33            return true;
34        }
35
36        if !SchemaCompatibility::match_schemas(writers_schema, readers_schema) {
37            return false;
38        }
39
40        let w_type = SchemaKind::from(writers_schema);
41        let r_type = SchemaKind::from(readers_schema);
42
43        if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == SchemaKind::Fixed) {
44            return true;
45        }
46
47        match r_type {
48            SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema),
49            SchemaKind::Map => {
50                if let Schema::Map(w_m) = writers_schema {
51                    if let Schema::Map(r_m) = readers_schema {
52                        self.full_match_schemas(w_m, r_m)
53                    } else {
54                        unreachable!("readers_schema should have been Schema::Map")
55                    }
56                } else {
57                    unreachable!("writers_schema should have been Schema::Map")
58                }
59            }
60            SchemaKind::Array => {
61                if let Schema::Array(w_a) = writers_schema {
62                    if let Schema::Array(r_a) = readers_schema {
63                        self.full_match_schemas(w_a, r_a)
64                    } else {
65                        unreachable!("readers_schema should have been Schema::Array")
66                    }
67                } else {
68                    unreachable!("writers_schema should have been Schema::Array")
69                }
70            }
71            SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema),
72            SchemaKind::Enum => {
73                // reader's symbols must contain all writer's symbols
74                if let Schema::Enum {
75                    symbols: w_symbols, ..
76                } = writers_schema
77                {
78                    if let Schema::Enum {
79                        symbols: r_symbols, ..
80                    } = readers_schema
81                    {
82                        return w_symbols.iter().find(|e| !r_symbols.contains(e)).is_none();
83                    }
84                }
85                false
86            }
87            _ => {
88                if w_type == SchemaKind::Union {
89                    if let Schema::Union(r) = writers_schema {
90                        if r.schemas.len() == 1 {
91                            return self.full_match_schemas(&r.schemas[0], readers_schema);
92                        }
93                    }
94                }
95                false
96            }
97        }
98    }
99
100    fn match_record_schemas(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
101        let w_type = SchemaKind::from(writers_schema);
102
103        if w_type == SchemaKind::Union {
104            return false;
105        }
106
107        if let Schema::Record {
108            fields: w_fields,
109            lookup: w_lookup,
110            ..
111        } = writers_schema
112        {
113            if let Schema::Record {
114                fields: r_fields, ..
115            } = readers_schema
116            {
117                for field in r_fields.iter() {
118                    if let Some(pos) = w_lookup.get(&field.name) {
119                        if !self.full_match_schemas(&w_fields[*pos].schema, &field.schema) {
120                            return false;
121                        }
122                    } else if field.default.is_none() {
123                        return false;
124                    }
125                }
126            }
127        }
128        true
129    }
130
131    fn match_union_schemas(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
132        let w_type = SchemaKind::from(writers_schema);
133        let r_type = SchemaKind::from(readers_schema);
134
135        assert_eq!(r_type, SchemaKind::Union);
136
137        if w_type == SchemaKind::Union {
138            if let Schema::Union(u) = writers_schema {
139                u.schemas
140                    .iter()
141                    .all(|schema| self.full_match_schemas(schema, readers_schema))
142            } else {
143                unreachable!("writers_schema should have been Schema::Union")
144            }
145        } else if let Schema::Union(u) = readers_schema {
146            u.schemas
147                .iter()
148                .any(|schema| self.full_match_schemas(writers_schema, schema))
149        } else {
150            unreachable!("readers_schema should have been Schema::Union")
151        }
152    }
153
154    fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
155        let mut hasher = DefaultHasher::new();
156        ptr::hash(writers_schema, &mut hasher);
157        let w_hash = hasher.finish();
158
159        hasher = DefaultHasher::new();
160        ptr::hash(readers_schema, &mut hasher);
161        let r_hash = hasher.finish();
162
163        let key = (w_hash, r_hash);
164        // This is a shortcut to add if not exists *and* return false. It will return true
165        // if it was able to insert.
166        !self.recursion.insert(key)
167    }
168}
169
170impl SchemaCompatibility {
171    /// `can_read` performs a full, recursive check that a datum written using the
172    /// writers_schema can be read using the readers_schema.
173    pub fn can_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
174        let mut c = Checker::new();
175        c.can_read(writers_schema, readers_schema)
176    }
177
178    /// `mutual_read` performs a full, recursive check that a datum written using either
179    /// the writers_schema or the readers_schema can be read using the other schema.
180    pub fn mutual_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
181        SchemaCompatibility::can_read(writers_schema, readers_schema)
182            && SchemaCompatibility::can_read(readers_schema, writers_schema)
183    }
184
185    ///  `match_schemas` performs a basic check that a datum written with the
186    ///  writers_schema could be read using the readers_schema. This check only includes
187    ///  matching the types, including schema promotion, and matching the full name for
188    ///  named types. Aliases for named types are not supported here, and the rust
189    ///  implementation of Avro in general does not include support for aliases (I think).
190    pub(crate) fn match_schemas(writers_schema: &Schema, readers_schema: &Schema) -> bool {
191        let w_type = SchemaKind::from(writers_schema);
192        let r_type = SchemaKind::from(readers_schema);
193
194        if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
195            return true;
196        }
197
198        if w_type == r_type {
199            if r_type.is_primitive() {
200                return true;
201            }
202
203            match r_type {
204                SchemaKind::Record => {
205                    if let Schema::Record { name: w_name, .. } = writers_schema {
206                        if let Schema::Record { name: r_name, .. } = readers_schema {
207                            return w_name.fullname(None) == r_name.fullname(None);
208                        } else {
209                            unreachable!("readers_schema should have been Schema::Record")
210                        }
211                    } else {
212                        unreachable!("writers_schema should have been Schema::Record")
213                    }
214                }
215                SchemaKind::Fixed => {
216                    if let Schema::Fixed {
217                        name: w_name,
218                        size: w_size,
219                    } = writers_schema
220                    {
221                        if let Schema::Fixed {
222                            name: r_name,
223                            size: r_size,
224                        } = readers_schema
225                        {
226                            return w_name.fullname(None) == r_name.fullname(None)
227                                && w_size == r_size;
228                        } else {
229                            unreachable!("readers_schema should have been Schema::Fixed")
230                        }
231                    } else {
232                        unreachable!("writers_schema should have been Schema::Fixed")
233                    }
234                }
235                SchemaKind::Enum => {
236                    if let Schema::Enum { name: w_name, .. } = writers_schema {
237                        if let Schema::Enum { name: r_name, .. } = readers_schema {
238                            return w_name.fullname(None) == r_name.fullname(None);
239                        } else {
240                            unreachable!("readers_schema should have been Schema::Enum")
241                        }
242                    } else {
243                        unreachable!("writers_schema should have been Schema::Enum")
244                    }
245                }
246                SchemaKind::Map => {
247                    if let Schema::Map(w_m) = writers_schema {
248                        if let Schema::Map(r_m) = readers_schema {
249                            return SchemaCompatibility::match_schemas(w_m, r_m);
250                        } else {
251                            unreachable!("readers_schema should have been Schema::Map")
252                        }
253                    } else {
254                        unreachable!("writers_schema should have been Schema::Map")
255                    }
256                }
257                SchemaKind::Array => {
258                    if let Schema::Array(w_a) = writers_schema {
259                        if let Schema::Array(r_a) = readers_schema {
260                            return SchemaCompatibility::match_schemas(w_a, r_a);
261                        } else {
262                            unreachable!("readers_schema should have been Schema::Array")
263                        }
264                    } else {
265                        unreachable!("writers_schema should have been Schema::Array")
266                    }
267                }
268                _ => (),
269            };
270        }
271
272        if w_type == SchemaKind::Int
273            && vec![SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
274                .iter()
275                .any(|&t| t == r_type)
276        {
277            return true;
278        }
279
280        if w_type == SchemaKind::Long
281            && vec![SchemaKind::Float, SchemaKind::Double]
282                .iter()
283                .any(|&t| t == r_type)
284        {
285            return true;
286        }
287
288        if w_type == SchemaKind::Float && r_type == SchemaKind::Double {
289            return true;
290        }
291
292        if w_type == SchemaKind::String && r_type == SchemaKind::Bytes {
293            return true;
294        }
295
296        if w_type == SchemaKind::Bytes && r_type == SchemaKind::String {
297            return true;
298        }
299
300        false
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use super::*;
307
    /// Array schema whose items are `int`.
    fn int_array_schema() -> Schema {
        Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
    }

    /// Array schema whose items are `long`.
    fn long_array_schema() -> Schema {
        Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
    }

    /// Array schema whose items are `string`.
    fn string_array_schema() -> Schema {
        Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
    }

    /// Map schema whose values are `int`.
    fn int_map_schema() -> Schema {
        Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
    }

    /// Map schema whose values are `long`.
    fn long_map_schema() -> Schema {
        Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
    }

    /// Map schema whose values are `string`.
    fn string_map_schema() -> Schema {
        Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
    }
331
    /// Enum `Enum1` with symbols `A` and `B`.
    fn enum1_ab_schema() -> Schema {
        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
    }

    /// Enum `Enum1` with symbols `A`, `B` and `C`.
    fn enum1_abc_schema() -> Schema {
        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
    }

    /// Enum `Enum1` with symbols `B` and `C`.
    fn enum1_bc_schema() -> Schema {
        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
    }

    /// Enum `Enum2` with symbols `A` and `B`; same symbols as `Enum1`, different name.
    fn enum2_ab_schema() -> Schema {
        Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
    }
347
    /// Record `Record1` with no fields.
    fn empty_record1_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
    }

    /// Record `Record2` with no fields.
    fn empty_record2_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
    }

    /// Record `Record1` with a single `int` field `a` (no default).
    fn a_int_record1_schema() -> Schema {
        Schema::parse_str(
            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
        )
        .unwrap()
    }

    /// Record `Record1` with a single `long` field `a` (no default).
    fn a_long_record1_schema() -> Schema {
        Schema::parse_str(
            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
        )
        .unwrap()
    }

    /// Record `Record1` with `int` fields `a` and `b`, neither with a default.
    fn a_int_b_int_record1_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
    }

    /// Record `Record1` with a single `int` field `a` defaulting to 0.
    fn a_dint_record1_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
    }

    /// Record `Record1` with `int` field `a` (no default) and `int` field `b` defaulting to 0.
    fn a_int_b_dint_record1_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
    }

    /// Record `Record1` with `int` fields `a` and `b`, both defaulting to 0.
    fn a_dint_b_dint_record1_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
    }

    /// Record `parent` whose `attribute` field is a required nested record `child`.
    fn nested_record() -> Schema {
        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
    }

    /// Record `parent` whose `attribute` field is a `["null", child]` union defaulting to null.
    fn nested_optional_record() -> Schema {
        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
    }

    /// Record `List` with an `int` field `head` and a `tail` field declared as an array of `int`.
    fn int_list_record_schema() -> Schema {
        Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
    }

    /// Record `List` with a `long` field `head` and a `tail` field declared as an array of `long`.
    fn long_list_record_schema() -> Schema {
        Schema::parse_str(
            r#"
      {
        "type":"record", "name":"List", "fields": [
          {"name": "head", "type": "long"},
          {"name": "tail", "type": "array", "items": "long"}
      ]}
"#,
        )
        .unwrap()
    }
410
411    fn union_schema(schemas: Vec<Schema>) -> Schema {
412        let schema_string = schemas
413            .iter()
414            .map(|s| s.canonical_form())
415            .collect::<Vec<String>>()
416            .join(",");
417        dbg!(&schema_string);
418        Schema::parse_str(&format!("[{}]", schema_string)).unwrap()
419    }
420
    /// Union with no branches.
    fn empty_union_schema() -> Schema {
        union_schema(vec![])
    }

    // unused
    // fn null_union_schema() -> Schema { union_schema(vec![Schema::Null]) }

    /// Union with the single branch `int`.
    fn int_union_schema() -> Schema {
        union_schema(vec![Schema::Int])
    }

    /// Union with the single branch `long`.
    fn long_union_schema() -> Schema {
        union_schema(vec![Schema::Long])
    }

    /// Union with the single branch `string`.
    fn string_union_schema() -> Schema {
        union_schema(vec![Schema::String])
    }

    /// Union with branches `int`, `string` (in that order).
    fn int_string_union_schema() -> Schema {
        union_schema(vec![Schema::Int, Schema::String])
    }

    /// Union with branches `string`, `int` (in that order).
    fn string_int_union_schema() -> Schema {
        union_schema(vec![Schema::String, Schema::Int])
    }
447
448    #[test]
449    fn test_broken() {
450        assert!(!SchemaCompatibility::can_read(
451            &int_string_union_schema(),
452            &int_union_schema()
453        ))
454    }
455
456    #[test]
457    fn test_incompatible_reader_writer_pairs() {
458        let incompatible_schemas = vec![
459            // null
460            (Schema::Null, Schema::Int),
461            (Schema::Null, Schema::Long),
462            // boolean
463            (Schema::Boolean, Schema::Int),
464            // int
465            (Schema::Int, Schema::Null),
466            (Schema::Int, Schema::Boolean),
467            (Schema::Int, Schema::Long),
468            (Schema::Int, Schema::Float),
469            (Schema::Int, Schema::Double),
470            // long
471            (Schema::Long, Schema::Float),
472            (Schema::Long, Schema::Double),
473            // float
474            (Schema::Float, Schema::Double),
475            // string
476            (Schema::String, Schema::Boolean),
477            (Schema::String, Schema::Int),
478            // bytes
479            (Schema::Bytes, Schema::Null),
480            (Schema::Bytes, Schema::Int),
481            // array and maps
482            (int_array_schema(), long_array_schema()),
483            (int_map_schema(), int_array_schema()),
484            (int_array_schema(), int_map_schema()),
485            (int_map_schema(), long_map_schema()),
486            // enum
487            (enum1_ab_schema(), enum1_abc_schema()),
488            (enum1_bc_schema(), enum1_abc_schema()),
489            (enum1_ab_schema(), enum2_ab_schema()),
490            (Schema::Int, enum2_ab_schema()),
491            (enum2_ab_schema(), Schema::Int),
492            //union
493            (int_union_schema(), int_string_union_schema()),
494            (string_union_schema(), int_string_union_schema()),
495            //record
496            (empty_record2_schema(), empty_record1_schema()),
497            (a_int_record1_schema(), empty_record1_schema()),
498            (a_int_b_dint_record1_schema(), empty_record1_schema()),
499            (int_list_record_schema(), long_list_record_schema()),
500            (nested_record(), nested_optional_record()),
501        ];
502
503        assert!(!incompatible_schemas
504            .iter()
505            .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader)));
506    }
507
508    #[test]
509    fn test_compatible_reader_writer_pairs() {
510        let compatible_schemas = vec![
511            (Schema::Null, Schema::Null),
512            (Schema::Long, Schema::Int),
513            (Schema::Float, Schema::Int),
514            (Schema::Float, Schema::Long),
515            (Schema::Double, Schema::Long),
516            (Schema::Double, Schema::Int),
517            (Schema::Double, Schema::Float),
518            (Schema::String, Schema::Bytes),
519            (Schema::Bytes, Schema::String),
520            (int_array_schema(), int_array_schema()),
521            (long_array_schema(), int_array_schema()),
522            (int_map_schema(), int_map_schema()),
523            (long_map_schema(), int_map_schema()),
524            (enum1_ab_schema(), enum1_ab_schema()),
525            (enum1_abc_schema(), enum1_ab_schema()),
526            (empty_union_schema(), empty_union_schema()),
527            (int_union_schema(), int_union_schema()),
528            (int_string_union_schema(), string_int_union_schema()),
529            (int_union_schema(), empty_union_schema()),
530            (long_union_schema(), int_union_schema()),
531            (int_union_schema(), Schema::Int),
532            (Schema::Int, int_union_schema()),
533            (empty_record1_schema(), empty_record1_schema()),
534            (empty_record1_schema(), a_int_record1_schema()),
535            (a_int_record1_schema(), a_int_record1_schema()),
536            (a_dint_record1_schema(), a_int_record1_schema()),
537            (a_dint_record1_schema(), a_dint_record1_schema()),
538            (a_int_record1_schema(), a_dint_record1_schema()),
539            (a_long_record1_schema(), a_int_record1_schema()),
540            (a_int_record1_schema(), a_int_b_int_record1_schema()),
541            (a_dint_record1_schema(), a_int_b_int_record1_schema()),
542            (a_int_b_dint_record1_schema(), a_int_record1_schema()),
543            (a_dint_b_dint_record1_schema(), empty_record1_schema()),
544            (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
545            (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
546            (int_list_record_schema(), int_list_record_schema()),
547            (long_list_record_schema(), long_list_record_schema()),
548            (long_list_record_schema(), int_list_record_schema()),
549            (nested_optional_record(), nested_record()),
550        ];
551
552        assert!(compatible_schemas
553            .iter()
554            .all(|(reader, writer)| SchemaCompatibility::can_read(writer, reader)));
555    }
556
    /// Record `Record` with an `int` field `oldfield1` and a `string` field `oldfield2`;
    /// used as the writer's schema by the field-evolution tests below.
    fn writer_schema() -> Schema {
        Schema::parse_str(
            r#"
      {"type":"record", "name":"Record", "fields":[
        {"name":"oldfield1", "type":"int"},
        {"name":"oldfield2", "type":"string"}
      ]}
"#,
        )
        .unwrap()
    }
568
569    #[test]
570    fn test_missing_field() {
571        let reader_schema = Schema::parse_str(
572            r#"
573      {"type":"record", "name":"Record", "fields":[
574        {"name":"oldfield1", "type":"int"}
575      ]}
576"#,
577        )
578        .unwrap();
579        assert!(SchemaCompatibility::can_read(
580            &writer_schema(),
581            &reader_schema,
582        ));
583        assert_eq!(
584            SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
585            false
586        );
587    }
588
589    #[test]
590    fn test_missing_second_field() {
591        let reader_schema = Schema::parse_str(
592            r#"
593        {"type":"record", "name":"Record", "fields":[
594          {"name":"oldfield2", "type":"string"}
595        ]}
596"#,
597        )
598        .unwrap();
599        assert!(SchemaCompatibility::can_read(
600            &writer_schema(),
601            &reader_schema
602        ));
603        assert_eq!(
604            SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
605            false
606        );
607    }
608
609    #[test]
610    fn test_all_fields() {
611        let reader_schema = Schema::parse_str(
612            r#"
613        {"type":"record", "name":"Record", "fields":[
614          {"name":"oldfield1", "type":"int"},
615          {"name":"oldfield2", "type":"string"}
616        ]}
617"#,
618        )
619        .unwrap();
620        assert!(SchemaCompatibility::can_read(
621            &writer_schema(),
622            &reader_schema
623        ));
624        assert!(SchemaCompatibility::can_read(
625            &reader_schema,
626            &writer_schema()
627        ));
628    }
629
630    #[test]
631    fn test_new_field_with_default() {
632        let reader_schema = Schema::parse_str(
633            r#"
634        {"type":"record", "name":"Record", "fields":[
635          {"name":"oldfield1", "type":"int"},
636          {"name":"newfield1", "type":"int", "default":42}
637        ]}
638"#,
639        )
640        .unwrap();
641        assert!(SchemaCompatibility::can_read(
642            &writer_schema(),
643            &reader_schema
644        ));
645        assert_eq!(
646            SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
647            false
648        );
649    }
650
651    #[test]
652    fn test_new_field() {
653        let reader_schema = Schema::parse_str(
654            r#"
655        {"type":"record", "name":"Record", "fields":[
656          {"name":"oldfield1", "type":"int"},
657          {"name":"newfield1", "type":"int"}
658        ]}
659"#,
660        )
661        .unwrap();
662        assert_eq!(
663            SchemaCompatibility::can_read(&writer_schema(), &reader_schema),
664            false
665        );
666        assert_eq!(
667            SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
668            false
669        );
670    }
671
672    #[test]
673    fn test_array_writer_schema() {
674        let valid_reader = string_array_schema();
675        let invalid_reader = string_map_schema();
676
677        assert!(SchemaCompatibility::can_read(
678            &string_array_schema(),
679            &valid_reader
680        ));
681        assert_eq!(
682            SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader),
683            false
684        );
685    }
686
687    #[test]
688    fn test_primitive_writer_schema() {
689        let valid_reader = Schema::String;
690        assert!(SchemaCompatibility::can_read(
691            &Schema::String,
692            &valid_reader
693        ));
694        assert_eq!(
695            SchemaCompatibility::can_read(&Schema::Int, &Schema::String),
696            false
697        );
698    }
699
700    #[test]
701    fn test_union_reader_writer_subset_incompatiblity() {
702        // reader union schema must contain all writer union branches
703        let union_writer = union_schema(vec![Schema::Int, Schema::String]);
704        let union_reader = union_schema(vec![Schema::String]);
705
706        assert_eq!(
707            SchemaCompatibility::can_read(&union_writer, &union_reader),
708            false
709        );
710        assert!(SchemaCompatibility::can_read(&union_reader, &union_writer));
711    }
712
713    #[test]
714    fn test_incompatible_record_field() {
715        let string_schema = Schema::parse_str(
716            r#"
717        {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
718            {"name":"field1", "type":"string"}
719        ]}
720        "#,
721        )
722        .unwrap();
723
724        let int_schema = Schema::parse_str(
725            r#"
726      {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
727        {"name":"field1", "type":"int"}
728      ]}
729"#,
730        )
731        .unwrap();
732
733        assert_eq!(
734            SchemaCompatibility::can_read(&string_schema, &int_schema),
735            false
736        );
737    }
738
739    #[test]
740    fn test_enum_symbols() {
741        let enum_schema1 = Schema::parse_str(
742            r#"
743      {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
744"#,
745        )
746        .unwrap();
747        let enum_schema2 =
748            Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)
749                .unwrap();
750        assert_eq!(
751            SchemaCompatibility::can_read(&enum_schema2, &enum_schema1),
752            false
753        );
754        assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2));
755    }
756
757    // unused
758    /*
759        fn point_2d_schema() -> Schema {
760            Schema::parse_str(
761                r#"
762          {"type":"record", "name":"Point2D", "fields":[
763            {"name":"x", "type":"double"},
764            {"name":"y", "type":"double"}
765          ]}
766        "#,
767            )
768            .unwrap()
769        }
770    */
771
    /// Record `Point` in namespace `written` with `double` fields `x` and `y`.
    fn point_2d_fullname_schema() -> Schema {
        Schema::parse_str(
            r#"
      {"type":"record", "name":"Point", "namespace":"written", "fields":[
        {"name":"x", "type":"double"},
        {"name":"y", "type":"double"}
      ]}
    "#,
        )
        .unwrap()
    }

    /// Record `Point` (no namespace) with `double` fields `x`, `y` and `z`, none of
    /// which has a default.
    fn point_3d_no_default_schema() -> Schema {
        Schema::parse_str(
            r#"
      {"type":"record", "name":"Point", "fields":[
        {"name":"x", "type":"double"},
        {"name":"y", "type":"double"},
        {"name":"z", "type":"double"}
      ]}
    "#,
        )
        .unwrap()
    }
796
797    // unused
798    /*
799        fn point_3d_schema() -> Schema {
800            Schema::parse_str(
801                r#"
802          {"type":"record", "name":"Point3D", "fields":[
803            {"name":"x", "type":"double"},
804            {"name":"y", "type":"double"},
805            {"name":"z", "type":"double", "default": 0.0}
806          ]}
807        "#,
808            )
809            .unwrap()
810        }
811
812        fn point_3d_match_name_schema() -> Schema {
813            Schema::parse_str(
814                r#"
815          {"type":"record", "name":"Point", "fields":[
816            {"name":"x", "type":"double"},
817            {"name":"y", "type":"double"},
818            {"name":"z", "type":"double", "default": 0.0}
819          ]}
820        "#,
821            )
822            .unwrap()
823        }
824    */
825
826    #[test]
827    fn test_union_resolution_no_structure_match() {
828        // short name match, but no structure match
829        let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
830        assert_eq!(
831            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema),
832            false
833        );
834    }
835
836    // TODO(nlopes): the below require named schemas to be fully supported. See:
837    // https://github.com/flavray/avro-rs/pull/76
838    //
839    // #[test]
840    // fn test_union_resolution_first_structure_match_2d() {
841    //     // multiple structure matches with no name matches
842    //     let read_schema = union_schema(vec![
843    //         Schema::Null,
844    //         point_3d_no_default_schema(),
845    //         point_2d_schema(),
846    //         point_3d_schema(),
847    //     ]);
848    //     assert_eq!(
849    //         SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema),
850    //         false
851    //     );
852    // }
853
854    // #[test]
855    // fn test_union_resolution_first_structure_match_3d() {
856    //     // multiple structure matches with no name matches
857    //     let read_schema = union_schema(vec![
858    //         Schema::Null,
859    //         point_3d_no_default_schema(),
860    //         point_3d_schema(),
861    //         point_2d_schema(),
862    //     ]);
863    //     assert_eq!(
864    //         SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema),
865    //         false
866    //     );
867    // }
868
869    // #[test]
870    // fn test_union_resolution_named_structure_match() {
871    //     // multiple structure matches with a short name match
872    //     let read_schema = union_schema(vec![
873    //         Schema::Null,
874    //         point_2d_schema(),
875    //         point_3d_match_name_schema(),
876    //         point_3d_schema(),
877    //     ]);
878    //     assert_eq!(
879    //         SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema),
880    //         false
881    //     );
882    // }
883
884    // #[test]
885    // fn test_union_resolution_full_name_match() {
886    //     // there is a full name match that should be chosen
887    //     let read_schema = union_schema(vec![
888    //         Schema::Null,
889    //         point_2d_schema(),
890    //         point_3d_match_name_schema(),
891    //         point_3d_schema(),
892    //         point_2d_fullname_schema(),
893    //     ]);
894    //     assert!(SchemaCompatibility::can_read(
895    //         &point_2d_fullname_schema(),
896    //         &read_schema
897    //     ));
898    // }
899}