1use crate::error::{SchemaError, SchemaResult};
6use crate::types::SchemaType;
7use serde::{Deserialize, Serialize};
8
9pub use crate::types::CompatibilityLevel;
11
/// Outcome of a compatibility check: a pass/fail flag plus human-readable
/// reasons for any failure.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompatibilityResult {
    /// `true` when the candidate schema passed every configured check.
    pub is_compatible: bool,
    /// One message per incompatibility found; empty when compatible.
    /// Skipped during serialization when empty to keep payloads small.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub messages: Vec<String>,
}
21
22impl CompatibilityResult {
23 pub fn compatible() -> Self {
24 Self {
25 is_compatible: true,
26 messages: Vec::new(),
27 }
28 }
29
30 pub fn incompatible(messages: Vec<String>) -> Self {
31 Self {
32 is_compatible: false,
33 messages,
34 }
35 }
36}
37
/// Validates candidate schemas against prior versions according to a
/// configured [`CompatibilityLevel`].
pub struct CompatibilityChecker {
    /// Policy to enforce (backward/forward/full, optionally transitive).
    level: CompatibilityLevel,
}
42
43impl CompatibilityChecker {
44 pub fn new(level: CompatibilityLevel) -> Self {
45 Self { level }
46 }
47
48 pub fn check(
50 &self,
51 schema_type: SchemaType,
52 new_schema: &str,
53 existing_schemas: &[&str],
54 ) -> SchemaResult<CompatibilityResult> {
55 if self.level == CompatibilityLevel::None {
56 return Ok(CompatibilityResult::compatible());
57 }
58
59 if existing_schemas.is_empty() {
60 return Ok(CompatibilityResult::compatible());
61 }
62
63 match schema_type {
64 SchemaType::Avro => self.check_avro(new_schema, existing_schemas),
65 SchemaType::Json => self.check_json(new_schema, existing_schemas),
66 SchemaType::Protobuf => self.check_protobuf(new_schema, existing_schemas),
67 }
68 }
69
70 #[cfg(feature = "avro")]
77 fn check_avro(
78 &self,
79 new_schema: &str,
80 existing_schemas: &[&str],
81 ) -> SchemaResult<CompatibilityResult> {
82 use apache_avro::Schema;
83
84 let new = Schema::parse_str(new_schema)
85 .map_err(|e| SchemaError::ParseError(format!("New schema: {}", e)))?;
86
87 let schemas_to_check = if self.level.is_transitive() {
88 existing_schemas.to_vec()
89 } else {
90 existing_schemas
92 .last()
93 .map(|s| vec![*s])
94 .unwrap_or_default()
95 };
96
97 let mut errors = Vec::new();
98
99 for (i, existing_str) in schemas_to_check.iter().enumerate() {
100 let existing = Schema::parse_str(existing_str)
101 .map_err(|e| SchemaError::ParseError(format!("Existing schema {}: {}", i, e)))?;
102
103 if self.level.is_backward() {
105 if let Err(e) = check_schema_resolution(&existing, &new) {
106 errors.push(format!(
107 "Backward incompatible with version {}: {}",
108 i + 1,
109 e
110 ));
111 }
112 }
113
114 if self.level.is_forward() {
116 if let Err(e) = check_schema_resolution(&new, &existing) {
117 errors.push(format!(
118 "Forward incompatible with version {}: {}",
119 i + 1,
120 e
121 ));
122 }
123 }
124 }
125
126 if errors.is_empty() {
127 Ok(CompatibilityResult::compatible())
128 } else {
129 Ok(CompatibilityResult::incompatible(errors))
130 }
131 }
132
133 #[cfg(not(feature = "avro"))]
134 fn check_avro(
135 &self,
136 _new_schema: &str,
137 _existing_schemas: &[&str],
138 ) -> SchemaResult<CompatibilityResult> {
139 Err(SchemaError::Config("Avro support not enabled".to_string()))
140 }
141
142 fn check_json(
149 &self,
150 new_schema: &str,
151 existing_schemas: &[&str],
152 ) -> SchemaResult<CompatibilityResult> {
153 use serde_json::Value as JsonValue;
154
155 let new: JsonValue = serde_json::from_str(new_schema)
156 .map_err(|e| SchemaError::ParseError(format!("New JSON schema: {}", e)))?;
157
158 let schemas_to_check = if self.level.is_transitive() {
159 existing_schemas.to_vec()
160 } else {
161 existing_schemas
162 .last()
163 .map(|s| vec![*s])
164 .unwrap_or_default()
165 };
166
167 let mut messages = Vec::new();
168
169 for (i, existing_str) in schemas_to_check.iter().enumerate() {
170 let old: JsonValue = serde_json::from_str(existing_str)
171 .map_err(|e| SchemaError::ParseError(format!("Existing schema {}: {}", i, e)))?;
172
173 let result = self.check_json_pair(&new, &old)?;
174 if !result.is_compatible {
175 for msg in result.messages {
176 messages.push(format!("Version {}: {}", i + 1, msg));
177 }
178 }
179 }
180
181 if messages.is_empty() {
182 Ok(CompatibilityResult::compatible())
183 } else {
184 Ok(CompatibilityResult::incompatible(messages))
185 }
186 }
187
188 fn check_json_pair(
190 &self,
191 new: &serde_json::Value,
192 old: &serde_json::Value,
193 ) -> SchemaResult<CompatibilityResult> {
194 let mut messages = Vec::new();
195
196 let new_props = new.get("properties").and_then(|p| p.as_object());
197 let old_props = old.get("properties").and_then(|p| p.as_object());
198
199 let new_required: Vec<&str> = new
200 .get("required")
201 .and_then(|r| r.as_array())
202 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
203 .unwrap_or_default();
204 let old_required: Vec<&str> = old
205 .get("required")
206 .and_then(|r| r.as_array())
207 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
208 .unwrap_or_default();
209
210 if let (Some(new_p), Some(old_p)) = (new_props, old_props) {
211 if self.level.is_backward() {
213 for field in &new_required {
216 if !old_required.contains(field) && !old_p.contains_key(*field) {
217 messages.push(format!(
218 "BACKWARD incompatible: new required field '{}' not in old schema",
219 field
220 ));
221 }
222 }
223
224 for (name, old_def) in old_p {
226 if let Some(new_def) = new_p.get(name) {
227 if !self.json_types_compatible(old_def, new_def) {
228 messages.push(format!(
229 "BACKWARD incompatible: field '{}' type changed incompatibly",
230 name
231 ));
232 }
233 }
234 }
235 }
236
237 if self.level.is_forward() {
239 for field in &old_required {
242 if !new_p.contains_key(*field) {
243 messages.push(format!(
244 "FORWARD incompatible: required field '{}' removed from new schema",
245 field
246 ));
247 }
248 }
249
250 for (name, old_def) in old_p {
252 if let Some(new_def) = new_p.get(name) {
253 if !self.json_types_compatible(new_def, old_def) {
254 messages.push(format!(
255 "FORWARD incompatible: field '{}' type changed incompatibly",
256 name
257 ));
258 }
259 }
260 }
261 }
262 }
263
264 if messages.is_empty() {
265 Ok(CompatibilityResult::compatible())
266 } else {
267 Ok(CompatibilityResult::incompatible(messages))
268 }
269 }
270
271 fn json_types_compatible(
273 &self,
274 old_type: &serde_json::Value,
275 new_type: &serde_json::Value,
276 ) -> bool {
277 let old_t = old_type.get("type").and_then(|t| t.as_str());
278 let new_t = new_type.get("type").and_then(|t| t.as_str());
279
280 match (old_t, new_t) {
281 (Some(old), Some(new)) => {
282 if old == new {
284 return true;
285 }
286
287 if old == "integer" && new == "number" {
289 return true;
290 }
291
292 false
293 }
294 (None, None) => true,
296 _ => false,
297 }
298 }
299
300 fn check_protobuf(
307 &self,
308 new_schema: &str,
309 existing_schemas: &[&str],
310 ) -> SchemaResult<CompatibilityResult> {
311 let schemas_to_check = if self.level.is_transitive() {
312 existing_schemas.to_vec()
313 } else {
314 existing_schemas
315 .last()
316 .map(|s| vec![*s])
317 .unwrap_or_default()
318 };
319
320 let mut messages = Vec::new();
321
322 for (i, existing_str) in schemas_to_check.iter().enumerate() {
323 let result = self.check_protobuf_pair(new_schema, existing_str)?;
324 if !result.is_compatible {
325 for msg in result.messages {
326 messages.push(format!("Version {}: {}", i + 1, msg));
327 }
328 }
329 }
330
331 if messages.is_empty() {
332 Ok(CompatibilityResult::compatible())
333 } else {
334 Ok(CompatibilityResult::incompatible(messages))
335 }
336 }
337
338 fn check_protobuf_pair(
340 &self,
341 new_schema: &str,
342 existing_schema: &str,
343 ) -> SchemaResult<CompatibilityResult> {
344 use std::collections::{HashMap, HashSet};
345
346 let mut messages = Vec::new();
347
348 let field_pattern = regex::Regex::new(r"(\w+)\s*=\s*(\d+)")
351 .map_err(|e| SchemaError::ParseError(format!("Regex error: {}", e)))?;
352
353 let old_fields: HashMap<u32, String> = field_pattern
354 .captures_iter(existing_schema)
355 .filter_map(|cap| {
356 let name = cap.get(1)?.as_str().to_string();
357 let num: u32 = cap.get(2)?.as_str().parse().ok()?;
358 Some((num, name))
359 })
360 .collect();
361
362 let new_fields: HashMap<u32, String> = field_pattern
363 .captures_iter(new_schema)
364 .filter_map(|cap| {
365 let name = cap.get(1)?.as_str().to_string();
366 let num: u32 = cap.get(2)?.as_str().parse().ok()?;
367 Some((num, name))
368 })
369 .collect();
370
371 for (num, old_name) in &old_fields {
373 if let Some(new_name) = new_fields.get(num) {
374 if old_name != new_name {
375 messages.push(format!(
376 "PROTOBUF incompatible: field number {} reused (was '{}', now '{}')",
377 num, old_name, new_name
378 ));
379 }
380 }
381 }
382
383 let reserved_pattern = regex::Regex::new(r"reserved\s+(\d+(?:,\s*\d+)*)")
385 .map_err(|e| SchemaError::ParseError(format!("Regex error: {}", e)))?;
386
387 let old_reserved: HashSet<u32> = reserved_pattern
388 .captures_iter(existing_schema)
389 .flat_map(|cap| {
390 cap.get(1)
391 .map(|m| {
392 m.as_str()
393 .split(',')
394 .filter_map(|n| n.trim().parse().ok())
395 .collect::<Vec<u32>>()
396 })
397 .unwrap_or_default()
398 })
399 .collect();
400
401 for (num, name) in &new_fields {
402 if old_reserved.contains(num) {
403 messages.push(format!(
404 "PROTOBUF incompatible: field '{}' uses reserved number {}",
405 name, num
406 ));
407 }
408 }
409
410 let required_pattern = regex::Regex::new(r"required\s+\w+\s+(\w+)")
412 .map_err(|e| SchemaError::ParseError(format!("Regex error: {}", e)))?;
413
414 let old_required: HashSet<&str> = required_pattern
415 .captures_iter(existing_schema)
416 .filter_map(|cap| cap.get(1).map(|m| m.as_str()))
417 .collect();
418
419 for required_name in old_required {
420 if !new_fields.values().any(|n| n == required_name) {
421 messages.push(format!(
422 "PROTOBUF incompatible: required field '{}' removed",
423 required_name
424 ));
425 }
426 }
427
428 if messages.is_empty() {
429 Ok(CompatibilityResult::compatible())
430 } else {
431 Ok(CompatibilityResult::incompatible(messages))
432 }
433 }
434}
435
/// Verifies that data written with `writer` can be read by `reader` under
/// Avro schema-resolution rules.
///
/// Covers identical primitives, the spec's numeric promotions
/// (int -> long/float/double, long -> float/double, float -> double),
/// string <-> bytes interchange, arrays, maps, enums (every writer symbol
/// must exist in the reader), fixed (sizes must match), records (fields
/// matched by name or reader-side alias; unmatched reader fields need
/// defaults), and unions.
///
/// Returns `Err(reason)` describing the first incompatibility found.
#[cfg(feature = "avro")]
fn check_schema_resolution(
    writer: &apache_avro::Schema,
    reader: &apache_avro::Schema,
) -> Result<(), String> {
    use apache_avro::Schema;

    match (writer, reader) {
        (Schema::Null, Schema::Null) => Ok(()),
        (Schema::Boolean, Schema::Boolean) => Ok(()),
        (Schema::String, Schema::String) => Ok(()),
        (Schema::Bytes, Schema::Bytes) => Ok(()),

        // Numeric promotions permitted by the Avro specification.
        (Schema::Int, Schema::Int) => Ok(()),
        (Schema::Int, Schema::Long) => Ok(()),
        (Schema::Int, Schema::Float) => Ok(()),
        (Schema::Int, Schema::Double) => Ok(()),
        (Schema::Long, Schema::Long) => Ok(()),
        (Schema::Long, Schema::Float) => Ok(()),
        (Schema::Long, Schema::Double) => Ok(()),
        (Schema::Float, Schema::Float) => Ok(()),
        (Schema::Float, Schema::Double) => Ok(()),
        (Schema::Double, Schema::Double) => Ok(()),

        // string and bytes are mutually promotable under resolution.
        (Schema::String, Schema::Bytes) => Ok(()),
        (Schema::Bytes, Schema::String) => Ok(()),

        // Containers resolve element-wise.
        (Schema::Array(w), Schema::Array(r)) => check_schema_resolution(&w.items, &r.items),
        (Schema::Map(w), Schema::Map(r)) => check_schema_resolution(&w.types, &r.types),

        // Every symbol the writer may emit must be known to the reader.
        (Schema::Enum(w), Schema::Enum(r)) => {
            for symbol in &w.symbols {
                if !r.symbols.contains(symbol) {
                    return Err(format!(
                        "Enum symbol '{}' in writer not found in reader",
                        symbol
                    ));
                }
            }
            Ok(())
        }

        (Schema::Fixed(w), Schema::Fixed(r)) => {
            if w.size != r.size {
                return Err(format!(
                    "Fixed size mismatch: writer={}, reader={}",
                    w.size, r.size
                ));
            }
            Ok(())
        }

        (Schema::Record(w), Schema::Record(r)) => {
            // Writer fields the reader also has (matched by name or by a
            // reader-side alias) must resolve recursively; writer-only
            // fields are simply skipped by the reader.
            for w_field in &w.fields {
                let r_field = r.fields.iter().find(|rf| {
                    rf.name == w_field.name
                        || rf
                            .aliases
                            .as_ref()
                            .is_some_and(|a| a.contains(&w_field.name))
                });

                match r_field {
                    Some(rf) => {
                        check_schema_resolution(&w_field.schema, &rf.schema)?;
                    }
                    None => {
                        // Writer-only field: ignored when reading.
                    }
                }
            }

            // Reader fields absent from the writer must carry a default.
            for r_field in &r.fields {
                let w_field = w.fields.iter().find(|wf| {
                    wf.name == r_field.name
                        || r_field
                            .aliases
                            .as_ref()
                            .is_some_and(|a| a.contains(&wf.name))
                });

                if w_field.is_none() && r_field.default.is_none() {
                    return Err(format!(
                        "Reader field '{}' has no matching writer field and no default",
                        r_field.name
                    ));
                }
            }

            Ok(())
        }

        // Union vs union: every writer variant must resolve against some
        // reader variant.
        (Schema::Union(w), Schema::Union(r)) => {
            for w_variant in w.variants() {
                let compatible = r
                    .variants()
                    .iter()
                    .any(|rv| check_schema_resolution(w_variant, rv).is_ok());
                if !compatible {
                    return Err(format!(
                        "Writer union variant {:?} not compatible with any reader variant",
                        w_variant
                    ));
                }
            }
            Ok(())
        }

        // Non-union writer vs union reader: the writer must match at least
        // one reader variant.
        (w, Schema::Union(r)) => {
            let compatible = r
                .variants()
                .iter()
                .any(|rv| check_schema_resolution(w, rv).is_ok());
            if compatible {
                Ok(())
            } else {
                Err(format!(
                    "Writer schema {:?} not compatible with reader union",
                    w
                ))
            }
        }

        // Union writer vs non-union reader: every writer variant must be
        // readable by the reader schema.
        (Schema::Union(w), r) => {
            for w_variant in w.variants() {
                check_schema_resolution(w_variant, r)?;
            }
            Ok(())
        }

        // Fix: any remaining structurally identical pair — e.g. logical
        // types such as date, uuid, timestamps, or decimal — resolves
        // trivially. Previously identical logical types fell through to the
        // error arm below and were reported incompatible.
        // NOTE(review): relies on `Schema: PartialEq` from apache_avro —
        // confirm against the pinned crate version.
        (w, r) if w == r => Ok(()),

        (w, r) => Err(format!(
            "Incompatible types: writer={:?}, reader={:?}",
            w, r
        )),
    }
}
595
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_compatibility_level_properties() {
        // BACKWARD is one-directional and non-transitive.
        let backward = CompatibilityLevel::Backward;
        assert!(backward.is_backward());
        assert!(!backward.is_forward());
        assert!(!backward.is_transitive());

        // FORWARD is the mirror image of BACKWARD.
        let forward = CompatibilityLevel::Forward;
        assert!(!forward.is_backward());
        assert!(forward.is_forward());

        // FULL implies both directions; the transitive flavor is transitive.
        let full = CompatibilityLevel::Full;
        assert!(full.is_backward());
        assert!(full.is_forward());
        assert!(CompatibilityLevel::FullTransitive.is_transitive());
    }

    #[test]
    fn test_compatibility_level_parse() {
        // Registry-style SCREAMING_SNAKE names parse via FromStr.
        let cases = [
            ("BACKWARD", CompatibilityLevel::Backward),
            ("FULL_TRANSITIVE", CompatibilityLevel::FullTransitive),
            ("NONE", CompatibilityLevel::None),
        ];
        for (text, expected) in cases {
            assert_eq!(text.parse::<CompatibilityLevel>().unwrap(), expected);
        }
    }

    #[test]
    fn test_none_compatibility_always_passes() {
        // NONE short-circuits before any parsing, so even an invalid
        // existing schema cannot fail the check.
        let checker = CompatibilityChecker::new(CompatibilityLevel::None);
        let outcome = checker
            .check(SchemaType::Avro, "{}", &["{\"invalid\"}"])
            .unwrap();
        assert!(outcome.is_compatible);
    }

    #[test]
    fn test_empty_existing_schemas() {
        // The first registered version has nothing to conflict with.
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);
        let outcome = checker.check(SchemaType::Avro, "{}", &[]).unwrap();
        assert!(outcome.is_compatible);
    }

    #[cfg(feature = "avro")]
    #[test]
    fn test_backward_compatible_add_optional_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let prior =
            r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}]}"#;
        let candidate = r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": ["null", "string"], "default": null}]}"#;

        let outcome = checker.check(SchemaType::Avro, candidate, &[prior]).unwrap();
        assert!(
            outcome.is_compatible,
            "Adding optional field should be backward compatible"
        );
    }

    #[cfg(feature = "avro")]
    #[test]
    fn test_backward_incompatible_add_required_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let prior =
            r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}]}"#;
        let candidate = r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}]}"#;

        let outcome = checker.check(SchemaType::Avro, candidate, &[prior]).unwrap();
        assert!(
            !outcome.is_compatible,
            "Adding required field (no default) should be backward incompatible"
        );
    }

    #[cfg(feature = "avro")]
    #[test]
    fn test_numeric_type_promotion() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let prior =
            r#"{"type": "record", "name": "Data", "fields": [{"name": "value", "type": "int"}]}"#;
        let candidate =
            r#"{"type": "record", "name": "Data", "fields": [{"name": "value", "type": "long"}]}"#;

        let outcome = checker.check(SchemaType::Avro, candidate, &[prior]).unwrap();
        assert!(
            outcome.is_compatible,
            "int -> long promotion should be backward compatible"
        );
    }

    #[test]
    fn test_json_backward_add_optional_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let prior = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;
        let candidate = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id"]}"#;

        let outcome = checker.check(SchemaType::Json, candidate, &[prior]).unwrap();
        assert!(
            outcome.is_compatible,
            "Adding optional field should be backward compatible"
        );
    }

    #[test]
    fn test_json_backward_add_required_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let prior = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;
        let candidate = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id","name"]}"#;

        let outcome = checker.check(SchemaType::Json, candidate, &[prior]).unwrap();
        assert!(
            !outcome.is_compatible,
            "Adding required field should NOT be backward compatible"
        );
    }

    #[test]
    fn test_json_forward_remove_required_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Forward);

        let prior = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id","name"]}"#;
        let candidate = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;

        let outcome = checker.check(SchemaType::Json, candidate, &[prior]).unwrap();
        assert!(
            !outcome.is_compatible,
            "Removing required field should NOT be forward compatible"
        );
    }

    #[test]
    fn test_json_type_widening() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let prior = r#"{"type":"object","properties":{"value":{"type":"integer"}}}"#;
        let candidate = r#"{"type":"object","properties":{"value":{"type":"number"}}}"#;

        let outcome = checker.check(SchemaType::Json, candidate, &[prior]).unwrap();
        assert!(
            outcome.is_compatible,
            "integer -> number widening should be backward compatible"
        );
    }

    #[test]
    fn test_protobuf_field_number_reuse() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Full);

        let prior = r#"
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;
        let candidate = r#"
            message User {
                int64 id = 1;
                string email = 2;
            }
        "#;

        let outcome = checker.check(SchemaType::Protobuf, candidate, &[prior]).unwrap();
        assert!(
            !outcome.is_compatible,
            "Reusing field number with different name should be incompatible"
        );
    }

    #[test]
    fn test_protobuf_add_new_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Full);

        let prior = r#"
            message User {
                int64 id = 1;
            }
        "#;
        let candidate = r#"
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;

        let outcome = checker.check(SchemaType::Protobuf, candidate, &[prior]).unwrap();
        assert!(
            outcome.is_compatible,
            "Adding new field with new number should be compatible: {:?}",
            outcome.messages
        );
    }

    #[test]
    fn test_protobuf_reserved_field_reuse() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Full);

        let prior = r#"
            message User {
                int64 id = 1;
                reserved 2, 3;
            }
        "#;
        let candidate = r#"
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;

        let outcome = checker.check(SchemaType::Protobuf, candidate, &[prior]).unwrap();
        assert!(
            !outcome.is_compatible,
            "Reusing reserved field number should be incompatible"
        );
    }
}