1use crate::{
19 schema::{
20 ArraySchema, DecimalSchema, EnumSchema, FixedSchema, MapSchema, RecordField, RecordSchema,
21 UnionSchema,
22 },
23 Schema,
24};
25use std::{fmt::Debug, sync::OnceLock};
26
27pub trait SchemataEq: Debug + Send + Sync {
30 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool;
32}
33
34#[derive(Debug)]
38pub struct SpecificationEq;
39impl SchemataEq for SpecificationEq {
40 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
41 schema_one.canonical_form() == schema_two.canonical_form()
42 }
43}
44
45#[derive(Debug)]
49pub struct StructFieldEq {
50 pub include_attributes: bool,
53}
54
55impl SchemataEq for StructFieldEq {
56 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
57 macro_rules! compare_primitive {
58 ($primitive:ident) => {
59 if let Schema::$primitive = schema_one {
60 if let Schema::$primitive = schema_two {
61 return true;
62 }
63 return false;
64 }
65 };
66 }
67
68 if schema_one.name() != schema_two.name() {
69 return false;
70 }
71
72 compare_primitive!(Null);
73 compare_primitive!(Boolean);
74 compare_primitive!(Int);
75 compare_primitive!(Int);
76 compare_primitive!(Long);
77 compare_primitive!(Float);
78 compare_primitive!(Double);
79 compare_primitive!(Bytes);
80 compare_primitive!(String);
81 compare_primitive!(Uuid);
82 compare_primitive!(BigDecimal);
83 compare_primitive!(Date);
84 compare_primitive!(Duration);
85 compare_primitive!(TimeMicros);
86 compare_primitive!(TimeMillis);
87 compare_primitive!(TimestampMicros);
88 compare_primitive!(TimestampMillis);
89 compare_primitive!(TimestampNanos);
90 compare_primitive!(LocalTimestampMicros);
91 compare_primitive!(LocalTimestampMillis);
92 compare_primitive!(LocalTimestampNanos);
93
94 if self.include_attributes
95 && schema_one.custom_attributes() != schema_two.custom_attributes()
96 {
97 return false;
98 }
99
100 if let Schema::Record(RecordSchema {
101 fields: fields_one, ..
102 }) = schema_one
103 {
104 if let Schema::Record(RecordSchema {
105 fields: fields_two, ..
106 }) = schema_two
107 {
108 return self.compare_fields(fields_one, fields_two);
109 }
110 return false;
111 }
112
113 if let Schema::Enum(EnumSchema {
114 symbols: symbols_one,
115 ..
116 }) = schema_one
117 {
118 if let Schema::Enum(EnumSchema {
119 symbols: symbols_two,
120 ..
121 }) = schema_two
122 {
123 return symbols_one == symbols_two;
124 }
125 return false;
126 }
127
128 if let Schema::Fixed(FixedSchema { size: size_one, .. }) = schema_one {
129 if let Schema::Fixed(FixedSchema { size: size_two, .. }) = schema_two {
130 return size_one == size_two;
131 }
132 return false;
133 }
134
135 if let Schema::Union(UnionSchema {
136 schemas: schemas_one,
137 ..
138 }) = schema_one
139 {
140 if let Schema::Union(UnionSchema {
141 schemas: schemas_two,
142 ..
143 }) = schema_two
144 {
145 return schemas_one.len() == schemas_two.len()
146 && schemas_one
147 .iter()
148 .zip(schemas_two.iter())
149 .all(|(s1, s2)| self.compare(s1, s2));
150 }
151 return false;
152 }
153
154 if let Schema::Decimal(DecimalSchema {
155 precision: precision_one,
156 scale: scale_one,
157 ..
158 }) = schema_one
159 {
160 if let Schema::Decimal(DecimalSchema {
161 precision: precision_two,
162 scale: scale_two,
163 ..
164 }) = schema_two
165 {
166 return precision_one == precision_two && scale_one == scale_two;
167 }
168 return false;
169 }
170
171 if let Schema::Array(ArraySchema {
172 items: items_one, ..
173 }) = schema_one
174 {
175 if let Schema::Array(ArraySchema {
176 items: items_two, ..
177 }) = schema_two
178 {
179 return items_one == items_two;
180 }
181 return false;
182 }
183
184 if let Schema::Map(MapSchema {
185 types: types_one, ..
186 }) = schema_one
187 {
188 if let Schema::Map(MapSchema {
189 types: types_two, ..
190 }) = schema_two
191 {
192 return self.compare(types_one, types_two);
193 }
194 return false;
195 }
196
197 if let Schema::Ref { name: name_one } = schema_one {
198 if let Schema::Ref { name: name_two } = schema_two {
199 return name_one == name_two;
200 }
201 return false;
202 }
203
204 error!(
205 "This is a bug in schema_equality.rs! The following schemata types are not checked! \
206 Please report it to the Avro library maintainers! \
207 \n{:?}\n\n{:?}",
208 schema_one, schema_two
209 );
210 false
211 }
212}
213
214impl StructFieldEq {
215 fn compare_fields(&self, fields_one: &[RecordField], fields_two: &[RecordField]) -> bool {
216 fields_one.len() == fields_two.len()
217 && fields_one
218 .iter()
219 .zip(fields_two.iter())
220 .all(|(f1, f2)| self.compare(&f1.schema, &f2.schema))
221 }
222}
223
224static SCHEMATA_COMPARATOR_ONCE: OnceLock<Box<dyn SchemataEq>> = OnceLock::new();
225
226pub fn set_schemata_equality_comparator(
234 comparator: Box<dyn SchemataEq>,
235) -> Result<(), Box<dyn SchemataEq>> {
236 debug!(
237 "Setting a custom schemata equality comparator: {:?}.",
238 comparator
239 );
240 SCHEMATA_COMPARATOR_ONCE.set(comparator)
241}
242
243pub(crate) fn compare_schemata(schema_one: &Schema, schema_two: &Schema) -> bool {
244 SCHEMATA_COMPARATOR_ONCE
245 .get_or_init(|| {
246 debug!("Going to use the default schemata equality comparator: SpecificationEq.",);
247 Box::new(StructFieldEq {
248 include_attributes: false,
249 })
250 })
251 .compare(schema_one, schema_two)
252}
253
254#[cfg(test)]
255#[allow(non_snake_case)]
256mod tests {
257 use super::*;
258 use crate::schema::{Name, RecordFieldOrder};
259 use apache_avro_test_helper::TestResult;
260 use serde_json::Value;
261 use std::collections::BTreeMap;
262
263 const SPECIFICATION_EQ: SpecificationEq = SpecificationEq;
264 const STRUCT_FIELD_EQ: StructFieldEq = StructFieldEq {
265 include_attributes: false,
266 };
267
268 macro_rules! test_primitives {
269 ($primitive:ident) => {
270 paste::item! {
271 #[test]
272 fn [<test_avro_3939_compare_schemata_$primitive>]() {
273 let specification_eq_res = SPECIFICATION_EQ.compare(&Schema::$primitive, &Schema::$primitive);
274 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&Schema::$primitive, &Schema::$primitive);
275 assert_eq!(specification_eq_res, struct_field_eq_res)
276 }
277 }
278 };
279 }
280
281 test_primitives!(Null);
282 test_primitives!(Boolean);
283 test_primitives!(Int);
284 test_primitives!(Long);
285 test_primitives!(Float);
286 test_primitives!(Double);
287 test_primitives!(Bytes);
288 test_primitives!(String);
289 test_primitives!(Uuid);
290 test_primitives!(BigDecimal);
291 test_primitives!(Date);
292 test_primitives!(Duration);
293 test_primitives!(TimeMicros);
294 test_primitives!(TimeMillis);
295 test_primitives!(TimestampMicros);
296 test_primitives!(TimestampMillis);
297 test_primitives!(TimestampNanos);
298 test_primitives!(LocalTimestampMicros);
299 test_primitives!(LocalTimestampMillis);
300 test_primitives!(LocalTimestampNanos);
301
302 #[test]
303 fn test_avro_3939_compare_named_schemata_with_different_names() {
304 let schema_one = Schema::Ref {
305 name: Name::from("name1"),
306 };
307
308 let schema_two = Schema::Ref {
309 name: Name::from("name2"),
310 };
311
312 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
313 assert!(!specification_eq_res);
314 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
315 assert!(!struct_field_eq_res);
316
317 assert_eq!(specification_eq_res, struct_field_eq_res);
318 }
319
320 #[test]
321 fn test_avro_3939_compare_schemata_not_including_attributes() {
322 let schema_one = Schema::map_with_attributes(
323 Schema::Boolean,
324 BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]),
325 );
326 let schema_two = Schema::map_with_attributes(
327 Schema::Boolean,
328 BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]),
329 );
330 assert!(STRUCT_FIELD_EQ.compare(&schema_one, &schema_two));
332 }
333
334 #[test]
335 fn test_avro_3939_compare_schemata_including_attributes() {
336 let struct_field_eq = StructFieldEq {
337 include_attributes: true,
338 };
339 let schema_one = Schema::map_with_attributes(
340 Schema::Boolean,
341 BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]),
342 );
343 let schema_two = Schema::map_with_attributes(
344 Schema::Boolean,
345 BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]),
346 );
347 assert!(!struct_field_eq.compare(&schema_one, &schema_two));
348 }
349
350 #[test]
351 fn test_avro_3939_compare_map_schemata() {
352 let schema_one = Schema::map(Schema::Boolean);
353 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
354 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
355
356 let schema_two = Schema::map(Schema::Boolean);
357
358 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
359 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
360 assert!(
361 specification_eq_res,
362 "SpecificationEq: Equality of two Schema::Map failed!"
363 );
364 assert!(
365 struct_field_eq_res,
366 "StructFieldEq: Equality of two Schema::Map failed!"
367 );
368 assert_eq!(specification_eq_res, struct_field_eq_res);
369 }
370
371 #[test]
372 fn test_avro_3939_compare_array_schemata() {
373 let schema_one = Schema::array(Schema::Boolean);
374 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
375 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
376
377 let schema_two = Schema::array(Schema::Boolean);
378
379 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
380 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
381 assert!(
382 specification_eq_res,
383 "SpecificationEq: Equality of two Schema::Array failed!"
384 );
385 assert!(
386 struct_field_eq_res,
387 "StructFieldEq: Equality of two Schema::Array failed!"
388 );
389 assert_eq!(specification_eq_res, struct_field_eq_res);
390 }
391
392 #[test]
393 fn test_avro_3939_compare_decimal_schemata() {
394 let schema_one = Schema::Decimal(DecimalSchema {
395 precision: 10,
396 scale: 2,
397 inner: Box::new(Schema::Bytes),
398 });
399 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
400 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
401
402 let schema_two = Schema::Decimal(DecimalSchema {
403 precision: 10,
404 scale: 2,
405 inner: Box::new(Schema::Bytes),
406 });
407
408 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
409 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
410 assert!(
411 specification_eq_res,
412 "SpecificationEq: Equality of two Schema::Decimal failed!"
413 );
414 assert!(
415 struct_field_eq_res,
416 "StructFieldEq: Equality of two Schema::Decimal failed!"
417 );
418 assert_eq!(specification_eq_res, struct_field_eq_res);
419 }
420
421 #[test]
422 fn test_avro_3939_compare_fixed_schemata() {
423 let schema_one = Schema::Fixed(FixedSchema {
424 name: Name::from("fixed"),
425 doc: None,
426 size: 10,
427 default: None,
428 aliases: None,
429 attributes: BTreeMap::new(),
430 });
431 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
432 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
433
434 let schema_two = Schema::Fixed(FixedSchema {
435 name: Name::from("fixed"),
436 doc: None,
437 size: 10,
438 default: None,
439 aliases: None,
440 attributes: BTreeMap::new(),
441 });
442
443 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
444 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
445 assert!(
446 specification_eq_res,
447 "SpecificationEq: Equality of two Schema::Fixed failed!"
448 );
449 assert!(
450 struct_field_eq_res,
451 "StructFieldEq: Equality of two Schema::Fixed failed!"
452 );
453 assert_eq!(specification_eq_res, struct_field_eq_res);
454 }
455
456 #[test]
457 fn test_avro_3939_compare_enum_schemata() {
458 let schema_one = Schema::Enum(EnumSchema {
459 name: Name::from("enum"),
460 doc: None,
461 symbols: vec!["A".to_string(), "B".to_string()],
462 default: None,
463 aliases: None,
464 attributes: BTreeMap::new(),
465 });
466 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
467 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
468
469 let schema_two = Schema::Enum(EnumSchema {
470 name: Name::from("enum"),
471 doc: None,
472 symbols: vec!["A".to_string(), "B".to_string()],
473 default: None,
474 aliases: None,
475 attributes: BTreeMap::new(),
476 });
477
478 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
479 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
480 assert!(
481 specification_eq_res,
482 "SpecificationEq: Equality of two Schema::Enum failed!"
483 );
484 assert!(
485 struct_field_eq_res,
486 "StructFieldEq: Equality of two Schema::Enum failed!"
487 );
488 assert_eq!(specification_eq_res, struct_field_eq_res);
489 }
490
491 #[test]
492 fn test_avro_3939_compare_ref_schemata() {
493 let schema_one = Schema::Ref {
494 name: Name::from("ref"),
495 };
496 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
497 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
498
499 let schema_two = Schema::Ref {
500 name: Name::from("ref"),
501 };
502
503 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
504 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
505 assert!(
506 specification_eq_res,
507 "SpecificationEq: Equality of two Schema::Ref failed!"
508 );
509 assert!(
510 struct_field_eq_res,
511 "StructFieldEq: Equality of two Schema::Ref failed!"
512 );
513 assert_eq!(specification_eq_res, struct_field_eq_res);
514 }
515
516 #[test]
517 fn test_avro_3939_compare_record_schemata() {
518 let schema_one = Schema::Record(RecordSchema {
519 name: Name::from("record"),
520 doc: None,
521 fields: vec![RecordField {
522 name: "field".to_string(),
523 doc: None,
524 default: None,
525 schema: Schema::Boolean,
526 order: RecordFieldOrder::Ignore,
527 aliases: None,
528 custom_attributes: BTreeMap::new(),
529 position: 0,
530 }],
531 aliases: None,
532 attributes: BTreeMap::new(),
533 lookup: Default::default(),
534 });
535 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
536 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
537
538 let schema_two = Schema::Record(RecordSchema {
539 name: Name::from("record"),
540 doc: None,
541 fields: vec![RecordField {
542 name: "field".to_string(),
543 doc: None,
544 default: None,
545 schema: Schema::Boolean,
546 order: RecordFieldOrder::Ignore,
547 aliases: None,
548 custom_attributes: BTreeMap::new(),
549 position: 0,
550 }],
551 aliases: None,
552 attributes: BTreeMap::new(),
553 lookup: Default::default(),
554 });
555
556 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
557 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
558 assert!(
559 specification_eq_res,
560 "SpecificationEq: Equality of two Schema::Record failed!"
561 );
562 assert!(
563 struct_field_eq_res,
564 "StructFieldEq: Equality of two Schema::Record failed!"
565 );
566 assert_eq!(specification_eq_res, struct_field_eq_res);
567 }
568
569 #[test]
570 fn test_avro_3939_compare_union_schemata() -> TestResult {
571 let schema_one = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
572 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
573 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
574
575 let schema_two = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
576
577 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
578 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
579 assert!(
580 specification_eq_res,
581 "SpecificationEq: Equality of two Schema::Union failed!"
582 );
583 assert!(
584 struct_field_eq_res,
585 "StructFieldEq: Equality of two Schema::Union failed!"
586 );
587 assert_eq!(specification_eq_res, struct_field_eq_res);
588 Ok(())
589 }
590}