1use crate::schema::{Schema, SchemaKind};
3use std::{
4 collections::{hash_map::DefaultHasher, HashSet},
5 hash::Hasher,
6 ptr,
7};
8
9pub struct SchemaCompatibility;
10
11struct Checker {
12 recursion: HashSet<(u64, u64)>,
13}
14
15impl Checker {
16 pub(crate) fn new() -> Self {
18 Self {
19 recursion: HashSet::new(),
20 }
21 }
22
23 pub(crate) fn can_read(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
24 self.full_match_schemas(writers_schema, readers_schema)
25 }
26
27 pub(crate) fn full_match_schemas(
28 &mut self,
29 writers_schema: &Schema,
30 readers_schema: &Schema,
31 ) -> bool {
32 if self.recursion_in_progress(writers_schema, readers_schema) {
33 return true;
34 }
35
36 if !SchemaCompatibility::match_schemas(writers_schema, readers_schema) {
37 return false;
38 }
39
40 let w_type = SchemaKind::from(writers_schema);
41 let r_type = SchemaKind::from(readers_schema);
42
43 if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == SchemaKind::Fixed) {
44 return true;
45 }
46
47 match r_type {
48 SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema),
49 SchemaKind::Map => {
50 if let Schema::Map(w_m) = writers_schema {
51 if let Schema::Map(r_m) = readers_schema {
52 self.full_match_schemas(w_m, r_m)
53 } else {
54 unreachable!("readers_schema should have been Schema::Map")
55 }
56 } else {
57 unreachable!("writers_schema should have been Schema::Map")
58 }
59 }
60 SchemaKind::Array => {
61 if let Schema::Array(w_a) = writers_schema {
62 if let Schema::Array(r_a) = readers_schema {
63 self.full_match_schemas(w_a, r_a)
64 } else {
65 unreachable!("readers_schema should have been Schema::Array")
66 }
67 } else {
68 unreachable!("writers_schema should have been Schema::Array")
69 }
70 }
71 SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema),
72 SchemaKind::Enum => {
73 if let Schema::Enum {
75 symbols: w_symbols, ..
76 } = writers_schema
77 {
78 if let Schema::Enum {
79 symbols: r_symbols, ..
80 } = readers_schema
81 {
82 return w_symbols.iter().find(|e| !r_symbols.contains(e)).is_none();
83 }
84 }
85 false
86 }
87 _ => {
88 if w_type == SchemaKind::Union {
89 if let Schema::Union(r) = writers_schema {
90 if r.schemas.len() == 1 {
91 return self.full_match_schemas(&r.schemas[0], readers_schema);
92 }
93 }
94 }
95 false
96 }
97 }
98 }
99
100 fn match_record_schemas(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
101 let w_type = SchemaKind::from(writers_schema);
102
103 if w_type == SchemaKind::Union {
104 return false;
105 }
106
107 if let Schema::Record {
108 fields: w_fields,
109 lookup: w_lookup,
110 ..
111 } = writers_schema
112 {
113 if let Schema::Record {
114 fields: r_fields, ..
115 } = readers_schema
116 {
117 for field in r_fields.iter() {
118 if let Some(pos) = w_lookup.get(&field.name) {
119 if !self.full_match_schemas(&w_fields[*pos].schema, &field.schema) {
120 return false;
121 }
122 } else if field.default.is_none() {
123 return false;
124 }
125 }
126 }
127 }
128 true
129 }
130
131 fn match_union_schemas(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
132 let w_type = SchemaKind::from(writers_schema);
133 let r_type = SchemaKind::from(readers_schema);
134
135 assert_eq!(r_type, SchemaKind::Union);
136
137 if w_type == SchemaKind::Union {
138 if let Schema::Union(u) = writers_schema {
139 u.schemas
140 .iter()
141 .all(|schema| self.full_match_schemas(schema, readers_schema))
142 } else {
143 unreachable!("writers_schema should have been Schema::Union")
144 }
145 } else if let Schema::Union(u) = readers_schema {
146 u.schemas
147 .iter()
148 .any(|schema| self.full_match_schemas(writers_schema, schema))
149 } else {
150 unreachable!("readers_schema should have been Schema::Union")
151 }
152 }
153
154 fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
155 let mut hasher = DefaultHasher::new();
156 ptr::hash(writers_schema, &mut hasher);
157 let w_hash = hasher.finish();
158
159 hasher = DefaultHasher::new();
160 ptr::hash(readers_schema, &mut hasher);
161 let r_hash = hasher.finish();
162
163 let key = (w_hash, r_hash);
164 !self.recursion.insert(key)
167 }
168}
169
170impl SchemaCompatibility {
171 pub fn can_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
174 let mut c = Checker::new();
175 c.can_read(writers_schema, readers_schema)
176 }
177
178 pub fn mutual_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
181 SchemaCompatibility::can_read(writers_schema, readers_schema)
182 && SchemaCompatibility::can_read(readers_schema, writers_schema)
183 }
184
185 pub(crate) fn match_schemas(writers_schema: &Schema, readers_schema: &Schema) -> bool {
191 let w_type = SchemaKind::from(writers_schema);
192 let r_type = SchemaKind::from(readers_schema);
193
194 if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
195 return true;
196 }
197
198 if w_type == r_type {
199 if r_type.is_primitive() {
200 return true;
201 }
202
203 match r_type {
204 SchemaKind::Record => {
205 if let Schema::Record { name: w_name, .. } = writers_schema {
206 if let Schema::Record { name: r_name, .. } = readers_schema {
207 return w_name.fullname(None) == r_name.fullname(None);
208 } else {
209 unreachable!("readers_schema should have been Schema::Record")
210 }
211 } else {
212 unreachable!("writers_schema should have been Schema::Record")
213 }
214 }
215 SchemaKind::Fixed => {
216 if let Schema::Fixed {
217 name: w_name,
218 size: w_size,
219 } = writers_schema
220 {
221 if let Schema::Fixed {
222 name: r_name,
223 size: r_size,
224 } = readers_schema
225 {
226 return w_name.fullname(None) == r_name.fullname(None)
227 && w_size == r_size;
228 } else {
229 unreachable!("readers_schema should have been Schema::Fixed")
230 }
231 } else {
232 unreachable!("writers_schema should have been Schema::Fixed")
233 }
234 }
235 SchemaKind::Enum => {
236 if let Schema::Enum { name: w_name, .. } = writers_schema {
237 if let Schema::Enum { name: r_name, .. } = readers_schema {
238 return w_name.fullname(None) == r_name.fullname(None);
239 } else {
240 unreachable!("readers_schema should have been Schema::Enum")
241 }
242 } else {
243 unreachable!("writers_schema should have been Schema::Enum")
244 }
245 }
246 SchemaKind::Map => {
247 if let Schema::Map(w_m) = writers_schema {
248 if let Schema::Map(r_m) = readers_schema {
249 return SchemaCompatibility::match_schemas(w_m, r_m);
250 } else {
251 unreachable!("readers_schema should have been Schema::Map")
252 }
253 } else {
254 unreachable!("writers_schema should have been Schema::Map")
255 }
256 }
257 SchemaKind::Array => {
258 if let Schema::Array(w_a) = writers_schema {
259 if let Schema::Array(r_a) = readers_schema {
260 return SchemaCompatibility::match_schemas(w_a, r_a);
261 } else {
262 unreachable!("readers_schema should have been Schema::Array")
263 }
264 } else {
265 unreachable!("writers_schema should have been Schema::Array")
266 }
267 }
268 _ => (),
269 };
270 }
271
272 if w_type == SchemaKind::Int
273 && vec![SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
274 .iter()
275 .any(|&t| t == r_type)
276 {
277 return true;
278 }
279
280 if w_type == SchemaKind::Long
281 && vec![SchemaKind::Float, SchemaKind::Double]
282 .iter()
283 .any(|&t| t == r_type)
284 {
285 return true;
286 }
287
288 if w_type == SchemaKind::Float && r_type == SchemaKind::Double {
289 return true;
290 }
291
292 if w_type == SchemaKind::String && r_type == SchemaKind::Bytes {
293 return true;
294 }
295
296 if w_type == SchemaKind::Bytes && r_type == SchemaKind::String {
297 return true;
298 }
299
300 false
301 }
302}
303
304#[cfg(test)]
305mod tests {
306 use super::*;
307
308 fn int_array_schema() -> Schema {
309 Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
310 }
311
312 fn long_array_schema() -> Schema {
313 Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
314 }
315
316 fn string_array_schema() -> Schema {
317 Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
318 }
319
320 fn int_map_schema() -> Schema {
321 Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
322 }
323
324 fn long_map_schema() -> Schema {
325 Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
326 }
327
328 fn string_map_schema() -> Schema {
329 Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
330 }
331
332 fn enum1_ab_schema() -> Schema {
333 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
334 }
335
336 fn enum1_abc_schema() -> Schema {
337 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
338 }
339
340 fn enum1_bc_schema() -> Schema {
341 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
342 }
343
344 fn enum2_ab_schema() -> Schema {
345 Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
346 }
347
348 fn empty_record1_schema() -> Schema {
349 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
350 }
351
352 fn empty_record2_schema() -> Schema {
353 Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
354 }
355
356 fn a_int_record1_schema() -> Schema {
357 Schema::parse_str(
358 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
359 )
360 .unwrap()
361 }
362
363 fn a_long_record1_schema() -> Schema {
364 Schema::parse_str(
365 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
366 )
367 .unwrap()
368 }
369
370 fn a_int_b_int_record1_schema() -> Schema {
371 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
372 }
373
374 fn a_dint_record1_schema() -> Schema {
375 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
376 }
377
378 fn a_int_b_dint_record1_schema() -> Schema {
379 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
380 }
381
382 fn a_dint_b_dint_record1_schema() -> Schema {
383 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
384 }
385
386 fn nested_record() -> Schema {
387 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
388 }
389
390 fn nested_optional_record() -> Schema {
391 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
392 }
393
394 fn int_list_record_schema() -> Schema {
395 Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
396 }
397
398 fn long_list_record_schema() -> Schema {
399 Schema::parse_str(
400 r#"
401 {
402 "type":"record", "name":"List", "fields": [
403 {"name": "head", "type": "long"},
404 {"name": "tail", "type": "array", "items": "long"}
405 ]}
406"#,
407 )
408 .unwrap()
409 }
410
411 fn union_schema(schemas: Vec<Schema>) -> Schema {
412 let schema_string = schemas
413 .iter()
414 .map(|s| s.canonical_form())
415 .collect::<Vec<String>>()
416 .join(",");
417 dbg!(&schema_string);
418 Schema::parse_str(&format!("[{}]", schema_string)).unwrap()
419 }
420
421 fn empty_union_schema() -> Schema {
422 union_schema(vec![])
423 }
424
425 fn int_union_schema() -> Schema {
429 union_schema(vec![Schema::Int])
430 }
431
432 fn long_union_schema() -> Schema {
433 union_schema(vec![Schema::Long])
434 }
435
436 fn string_union_schema() -> Schema {
437 union_schema(vec![Schema::String])
438 }
439
440 fn int_string_union_schema() -> Schema {
441 union_schema(vec![Schema::Int, Schema::String])
442 }
443
444 fn string_int_union_schema() -> Schema {
445 union_schema(vec![Schema::String, Schema::Int])
446 }
447
448 #[test]
449 fn test_broken() {
450 assert!(!SchemaCompatibility::can_read(
451 &int_string_union_schema(),
452 &int_union_schema()
453 ))
454 }
455
456 #[test]
457 fn test_incompatible_reader_writer_pairs() {
458 let incompatible_schemas = vec![
459 (Schema::Null, Schema::Int),
461 (Schema::Null, Schema::Long),
462 (Schema::Boolean, Schema::Int),
464 (Schema::Int, Schema::Null),
466 (Schema::Int, Schema::Boolean),
467 (Schema::Int, Schema::Long),
468 (Schema::Int, Schema::Float),
469 (Schema::Int, Schema::Double),
470 (Schema::Long, Schema::Float),
472 (Schema::Long, Schema::Double),
473 (Schema::Float, Schema::Double),
475 (Schema::String, Schema::Boolean),
477 (Schema::String, Schema::Int),
478 (Schema::Bytes, Schema::Null),
480 (Schema::Bytes, Schema::Int),
481 (int_array_schema(), long_array_schema()),
483 (int_map_schema(), int_array_schema()),
484 (int_array_schema(), int_map_schema()),
485 (int_map_schema(), long_map_schema()),
486 (enum1_ab_schema(), enum1_abc_schema()),
488 (enum1_bc_schema(), enum1_abc_schema()),
489 (enum1_ab_schema(), enum2_ab_schema()),
490 (Schema::Int, enum2_ab_schema()),
491 (enum2_ab_schema(), Schema::Int),
492 (int_union_schema(), int_string_union_schema()),
494 (string_union_schema(), int_string_union_schema()),
495 (empty_record2_schema(), empty_record1_schema()),
497 (a_int_record1_schema(), empty_record1_schema()),
498 (a_int_b_dint_record1_schema(), empty_record1_schema()),
499 (int_list_record_schema(), long_list_record_schema()),
500 (nested_record(), nested_optional_record()),
501 ];
502
503 assert!(!incompatible_schemas
504 .iter()
505 .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader)));
506 }
507
508 #[test]
509 fn test_compatible_reader_writer_pairs() {
510 let compatible_schemas = vec![
511 (Schema::Null, Schema::Null),
512 (Schema::Long, Schema::Int),
513 (Schema::Float, Schema::Int),
514 (Schema::Float, Schema::Long),
515 (Schema::Double, Schema::Long),
516 (Schema::Double, Schema::Int),
517 (Schema::Double, Schema::Float),
518 (Schema::String, Schema::Bytes),
519 (Schema::Bytes, Schema::String),
520 (int_array_schema(), int_array_schema()),
521 (long_array_schema(), int_array_schema()),
522 (int_map_schema(), int_map_schema()),
523 (long_map_schema(), int_map_schema()),
524 (enum1_ab_schema(), enum1_ab_schema()),
525 (enum1_abc_schema(), enum1_ab_schema()),
526 (empty_union_schema(), empty_union_schema()),
527 (int_union_schema(), int_union_schema()),
528 (int_string_union_schema(), string_int_union_schema()),
529 (int_union_schema(), empty_union_schema()),
530 (long_union_schema(), int_union_schema()),
531 (int_union_schema(), Schema::Int),
532 (Schema::Int, int_union_schema()),
533 (empty_record1_schema(), empty_record1_schema()),
534 (empty_record1_schema(), a_int_record1_schema()),
535 (a_int_record1_schema(), a_int_record1_schema()),
536 (a_dint_record1_schema(), a_int_record1_schema()),
537 (a_dint_record1_schema(), a_dint_record1_schema()),
538 (a_int_record1_schema(), a_dint_record1_schema()),
539 (a_long_record1_schema(), a_int_record1_schema()),
540 (a_int_record1_schema(), a_int_b_int_record1_schema()),
541 (a_dint_record1_schema(), a_int_b_int_record1_schema()),
542 (a_int_b_dint_record1_schema(), a_int_record1_schema()),
543 (a_dint_b_dint_record1_schema(), empty_record1_schema()),
544 (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
545 (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
546 (int_list_record_schema(), int_list_record_schema()),
547 (long_list_record_schema(), long_list_record_schema()),
548 (long_list_record_schema(), int_list_record_schema()),
549 (nested_optional_record(), nested_record()),
550 ];
551
552 assert!(compatible_schemas
553 .iter()
554 .all(|(reader, writer)| SchemaCompatibility::can_read(writer, reader)));
555 }
556
557 fn writer_schema() -> Schema {
558 Schema::parse_str(
559 r#"
560 {"type":"record", "name":"Record", "fields":[
561 {"name":"oldfield1", "type":"int"},
562 {"name":"oldfield2", "type":"string"}
563 ]}
564"#,
565 )
566 .unwrap()
567 }
568
569 #[test]
570 fn test_missing_field() {
571 let reader_schema = Schema::parse_str(
572 r#"
573 {"type":"record", "name":"Record", "fields":[
574 {"name":"oldfield1", "type":"int"}
575 ]}
576"#,
577 )
578 .unwrap();
579 assert!(SchemaCompatibility::can_read(
580 &writer_schema(),
581 &reader_schema,
582 ));
583 assert_eq!(
584 SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
585 false
586 );
587 }
588
589 #[test]
590 fn test_missing_second_field() {
591 let reader_schema = Schema::parse_str(
592 r#"
593 {"type":"record", "name":"Record", "fields":[
594 {"name":"oldfield2", "type":"string"}
595 ]}
596"#,
597 )
598 .unwrap();
599 assert!(SchemaCompatibility::can_read(
600 &writer_schema(),
601 &reader_schema
602 ));
603 assert_eq!(
604 SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
605 false
606 );
607 }
608
609 #[test]
610 fn test_all_fields() {
611 let reader_schema = Schema::parse_str(
612 r#"
613 {"type":"record", "name":"Record", "fields":[
614 {"name":"oldfield1", "type":"int"},
615 {"name":"oldfield2", "type":"string"}
616 ]}
617"#,
618 )
619 .unwrap();
620 assert!(SchemaCompatibility::can_read(
621 &writer_schema(),
622 &reader_schema
623 ));
624 assert!(SchemaCompatibility::can_read(
625 &reader_schema,
626 &writer_schema()
627 ));
628 }
629
630 #[test]
631 fn test_new_field_with_default() {
632 let reader_schema = Schema::parse_str(
633 r#"
634 {"type":"record", "name":"Record", "fields":[
635 {"name":"oldfield1", "type":"int"},
636 {"name":"newfield1", "type":"int", "default":42}
637 ]}
638"#,
639 )
640 .unwrap();
641 assert!(SchemaCompatibility::can_read(
642 &writer_schema(),
643 &reader_schema
644 ));
645 assert_eq!(
646 SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
647 false
648 );
649 }
650
651 #[test]
652 fn test_new_field() {
653 let reader_schema = Schema::parse_str(
654 r#"
655 {"type":"record", "name":"Record", "fields":[
656 {"name":"oldfield1", "type":"int"},
657 {"name":"newfield1", "type":"int"}
658 ]}
659"#,
660 )
661 .unwrap();
662 assert_eq!(
663 SchemaCompatibility::can_read(&writer_schema(), &reader_schema),
664 false
665 );
666 assert_eq!(
667 SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
668 false
669 );
670 }
671
672 #[test]
673 fn test_array_writer_schema() {
674 let valid_reader = string_array_schema();
675 let invalid_reader = string_map_schema();
676
677 assert!(SchemaCompatibility::can_read(
678 &string_array_schema(),
679 &valid_reader
680 ));
681 assert_eq!(
682 SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader),
683 false
684 );
685 }
686
687 #[test]
688 fn test_primitive_writer_schema() {
689 let valid_reader = Schema::String;
690 assert!(SchemaCompatibility::can_read(
691 &Schema::String,
692 &valid_reader
693 ));
694 assert_eq!(
695 SchemaCompatibility::can_read(&Schema::Int, &Schema::String),
696 false
697 );
698 }
699
700 #[test]
701 fn test_union_reader_writer_subset_incompatiblity() {
702 let union_writer = union_schema(vec![Schema::Int, Schema::String]);
704 let union_reader = union_schema(vec![Schema::String]);
705
706 assert_eq!(
707 SchemaCompatibility::can_read(&union_writer, &union_reader),
708 false
709 );
710 assert!(SchemaCompatibility::can_read(&union_reader, &union_writer));
711 }
712
713 #[test]
714 fn test_incompatible_record_field() {
715 let string_schema = Schema::parse_str(
716 r#"
717 {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
718 {"name":"field1", "type":"string"}
719 ]}
720 "#,
721 )
722 .unwrap();
723
724 let int_schema = Schema::parse_str(
725 r#"
726 {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
727 {"name":"field1", "type":"int"}
728 ]}
729"#,
730 )
731 .unwrap();
732
733 assert_eq!(
734 SchemaCompatibility::can_read(&string_schema, &int_schema),
735 false
736 );
737 }
738
739 #[test]
740 fn test_enum_symbols() {
741 let enum_schema1 = Schema::parse_str(
742 r#"
743 {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
744"#,
745 )
746 .unwrap();
747 let enum_schema2 =
748 Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)
749 .unwrap();
750 assert_eq!(
751 SchemaCompatibility::can_read(&enum_schema2, &enum_schema1),
752 false
753 );
754 assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2));
755 }
756
757 fn point_2d_fullname_schema() -> Schema {
773 Schema::parse_str(
774 r#"
775 {"type":"record", "name":"Point", "namespace":"written", "fields":[
776 {"name":"x", "type":"double"},
777 {"name":"y", "type":"double"}
778 ]}
779 "#,
780 )
781 .unwrap()
782 }
783
784 fn point_3d_no_default_schema() -> Schema {
785 Schema::parse_str(
786 r#"
787 {"type":"record", "name":"Point", "fields":[
788 {"name":"x", "type":"double"},
789 {"name":"y", "type":"double"},
790 {"name":"z", "type":"double"}
791 ]}
792 "#,
793 )
794 .unwrap()
795 }
796
797 #[test]
827 fn test_union_resolution_no_structure_match() {
828 let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
830 assert_eq!(
831 SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema),
832 false
833 );
834 }
835
836 }