1use crate::error::{SchemaError, SchemaResult};
6use crate::types::SchemaType;
7use serde::{Deserialize, Serialize};
8
9#[cfg(not(feature = "protobuf"))]
10use std::sync::LazyLock;
11
#[cfg(not(feature = "protobuf"))]
// Matches a protobuf field declaration in the regex fallback path:
// optional label, then a type word, then capture 1 = field name and
// capture 2 = field number. Compiled once, lazily.
static PROTO_FIELD_RE: LazyLock<regex::Regex> = LazyLock::new(|| {
    regex::Regex::new(r"(?:(?:optional|repeated|required)\s+)?\w+\s+(\w+)\s*=\s*(\d+)").unwrap()
});
#[cfg(not(feature = "protobuf"))]
// Matches `reserved N, M, ...` statements; capture 1 is the comma-separated
// number list. NOTE(review): `reserved 2 to 5;` ranges are not matched.
static PROTO_RESERVED_RE: LazyLock<regex::Regex> =
    LazyLock::new(|| regex::Regex::new(r"reserved\s+(\d+(?:,\s*\d+)*)").unwrap());
#[cfg(not(feature = "protobuf"))]
// Matches proto2 `required <type> <name>` declarations; capture 1 is the name.
static PROTO_REQUIRED_RE: LazyLock<regex::Regex> =
    LazyLock::new(|| regex::Regex::new(r"required\s+\w+\s+(\w+)").unwrap());
23
24pub use crate::types::CompatibilityLevel;
26
/// Outcome of a compatibility check between a new schema and prior versions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompatibilityResult {
    /// True when the new schema is compatible under the configured level.
    pub is_compatible: bool,
    /// Human-readable incompatibility diagnostics; empty when compatible.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub messages: Vec<String>,
}
36
37impl CompatibilityResult {
38 pub fn compatible() -> Self {
39 Self {
40 is_compatible: true,
41 messages: Vec::new(),
42 }
43 }
44
45 pub fn incompatible(messages: Vec<String>) -> Self {
46 Self {
47 is_compatible: false,
48 messages,
49 }
50 }
51}
52
/// Checks schema compatibility according to a configured [`CompatibilityLevel`].
pub struct CompatibilityChecker {
    // Decides which directions (backward/forward) are enforced and whether all
    // prior versions (transitive) or only the latest one are compared.
    level: CompatibilityLevel,
}
57
#[cfg(feature = "protobuf")]
// A single protobuf field, keyed elsewhere by its field number.
struct ProtoFieldInfo {
    name: String,
    // Human-readable type name (scalar keyword, or the message/enum type name).
    type_desc: String,
    // True for proto2 `required` fields.
    is_required: bool,
}
69
#[cfg(feature = "protobuf")]
// Per-message index: fields by number, plus reserved numbers/names, used to
// diff two versions of the same message.
struct ProtoMessageInfo {
    fields: std::collections::HashMap<i32, ProtoFieldInfo>,
    reserved_numbers: std::collections::HashSet<i32>,
    reserved_names: std::collections::HashSet<String>,
}
77
#[cfg(feature = "protobuf")]
// Enum values indexed by numeric value; value name is kept for diagnostics.
struct ProtoEnumInfo {
    values: std::collections::HashMap<i32, String>,
}
83
84impl CompatibilityChecker {
85 pub fn new(level: CompatibilityLevel) -> Self {
86 Self { level }
87 }
88
89 pub fn check(
91 &self,
92 schema_type: SchemaType,
93 new_schema: &str,
94 existing_schemas: &[&str],
95 ) -> SchemaResult<CompatibilityResult> {
96 if self.level == CompatibilityLevel::None {
97 return Ok(CompatibilityResult::compatible());
98 }
99
100 if existing_schemas.is_empty() {
101 return Ok(CompatibilityResult::compatible());
102 }
103
104 match schema_type {
105 SchemaType::Avro => self.check_avro(new_schema, existing_schemas),
106 SchemaType::Json => self.check_json(new_schema, existing_schemas),
107 SchemaType::Protobuf => self.check_protobuf(new_schema, existing_schemas),
108 }
109 }
110
    /// Avro compatibility check (enabled with the `avro` feature).
    ///
    /// Backward: data written with an EXISTING schema must be readable with
    /// the NEW schema. Forward: data written with the NEW schema must be
    /// readable with an EXISTING schema.
    #[cfg(feature = "avro")]
    fn check_avro(
        &self,
        new_schema: &str,
        existing_schemas: &[&str],
    ) -> SchemaResult<CompatibilityResult> {
        use apache_avro::Schema;

        let new = Schema::parse_str(new_schema)
            .map_err(|e| SchemaError::ParseError(format!("New schema: {}", e)))?;

        // Transitive levels compare against every stored version; otherwise
        // only the most recent one matters.
        let schemas_to_check = if self.level.is_transitive() {
            existing_schemas.to_vec()
        } else {
            existing_schemas
                .last()
                .map(|s| vec![*s])
                .unwrap_or_default()
        };

        let mut errors = Vec::new();

        for (i, existing_str) in schemas_to_check.iter().enumerate() {
            let existing = Schema::parse_str(existing_str)
                .map_err(|e| SchemaError::ParseError(format!("Existing schema {}: {}", i, e)))?;

            // Backward: writer = existing, reader = new.
            if self.level.is_backward() {
                if let Err(e) = check_schema_resolution(&existing, &new) {
                    errors.push(format!(
                        "Backward incompatible with version {}: {}",
                        i + 1,
                        e
                    ));
                }
            }

            // Forward: writer = new, reader = existing.
            if self.level.is_forward() {
                if let Err(e) = check_schema_resolution(&new, &existing) {
                    errors.push(format!(
                        "Forward incompatible with version {}: {}",
                        i + 1,
                        e
                    ));
                }
            }
        }

        if errors.is_empty() {
            Ok(CompatibilityResult::compatible())
        } else {
            Ok(CompatibilityResult::incompatible(errors))
        }
    }
173
174 #[cfg(not(feature = "avro"))]
175 fn check_avro(
176 &self,
177 _new_schema: &str,
178 _existing_schemas: &[&str],
179 ) -> SchemaResult<CompatibilityResult> {
180 Err(SchemaError::Config("Avro support not enabled".to_string()))
181 }
182
183 fn check_json(
190 &self,
191 new_schema: &str,
192 existing_schemas: &[&str],
193 ) -> SchemaResult<CompatibilityResult> {
194 use serde_json::Value as JsonValue;
195
196 let new: JsonValue = serde_json::from_str(new_schema)
197 .map_err(|e| SchemaError::ParseError(format!("New JSON schema: {}", e)))?;
198
199 let schemas_to_check = if self.level.is_transitive() {
200 existing_schemas.to_vec()
201 } else {
202 existing_schemas
203 .last()
204 .map(|s| vec![*s])
205 .unwrap_or_default()
206 };
207
208 let mut messages = Vec::new();
209
210 for (i, existing_str) in schemas_to_check.iter().enumerate() {
211 let old: JsonValue = serde_json::from_str(existing_str)
212 .map_err(|e| SchemaError::ParseError(format!("Existing schema {}: {}", i, e)))?;
213
214 let result = self.check_json_pair(&new, &old)?;
215 if !result.is_compatible {
216 for msg in result.messages {
217 messages.push(format!("Version {}: {}", i + 1, msg));
218 }
219 }
220 }
221
222 if messages.is_empty() {
223 Ok(CompatibilityResult::compatible())
224 } else {
225 Ok(CompatibilityResult::incompatible(messages))
226 }
227 }
228
229 fn check_json_pair(
236 &self,
237 new: &serde_json::Value,
238 old: &serde_json::Value,
239 ) -> SchemaResult<CompatibilityResult> {
240 let mut messages = Vec::new();
241 self.check_json_recursive(new, old, "", &mut messages);
242
243 if messages.is_empty() {
244 Ok(CompatibilityResult::compatible())
245 } else {
246 Ok(CompatibilityResult::incompatible(messages))
247 }
248 }
249
250 fn check_json_recursive(
255 &self,
256 new: &serde_json::Value,
257 old: &serde_json::Value,
258 path: &str,
259 messages: &mut Vec<String>,
260 ) {
261 self.check_json_recursive_inner(new, old, path, messages, 0);
262 }
263
    // Hard cap on schema nesting to guard against stack overflow on deeply
    // nested (or adversarial) schemas.
    const MAX_JSON_DEPTH: usize = 64;
266
267 fn check_json_recursive_inner(
268 &self,
269 new: &serde_json::Value,
270 old: &serde_json::Value,
271 path: &str,
272 messages: &mut Vec<String>,
273 depth: usize,
274 ) {
275 if depth >= Self::MAX_JSON_DEPTH {
276 let prefix = if path.is_empty() {
277 String::new()
278 } else {
279 format!("{}: ", path)
280 };
281 messages.push(format!(
282 "{}schema nesting exceeds maximum depth ({})",
283 prefix,
284 Self::MAX_JSON_DEPTH
285 ));
286 return;
287 }
288 let prefix = if path.is_empty() {
289 String::new()
290 } else {
291 format!("{}: ", path)
292 };
293
294 if !self.json_types_compatible(old, new) && self.level.is_backward() {
296 messages.push(format!("{}BACKWARD incompatible: type changed", prefix));
297 }
298 if !self.json_types_compatible(new, old) && self.level.is_forward() {
299 messages.push(format!("{}FORWARD incompatible: type changed", prefix));
300 }
301
302 let new_props = new.get("properties").and_then(|p| p.as_object());
304 let old_props = old.get("properties").and_then(|p| p.as_object());
305
306 let new_required: Vec<&str> = new
307 .get("required")
308 .and_then(|r| r.as_array())
309 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
310 .unwrap_or_default();
311 let old_required: Vec<&str> = old
312 .get("required")
313 .and_then(|r| r.as_array())
314 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
315 .unwrap_or_default();
316
317 if let (Some(new_p), Some(old_p)) = (new_props, old_props) {
318 if self.level.is_backward() {
319 for field in &new_required {
321 if !old_required.contains(field) && !old_p.contains_key(*field) {
322 messages.push(format!(
323 "{}BACKWARD incompatible: new required field '{}'",
324 prefix, field
325 ));
326 }
327 }
328 for (name, old_def) in old_p {
330 let child_path = if path.is_empty() {
331 name.clone()
332 } else {
333 format!("{}.{}", path, name)
334 };
335 if let Some(new_def) = new_p.get(name) {
336 self.check_json_recursive_inner(
337 new_def,
338 old_def,
339 &child_path,
340 messages,
341 depth + 1,
342 );
343 }
344 }
345 }
346 if self.level.is_forward() {
347 for field in &old_required {
349 if !new_p.contains_key(*field) {
350 messages.push(format!(
351 "{}FORWARD incompatible: required field '{}' removed",
352 prefix, field
353 ));
354 }
355 }
356 for (name, old_def) in old_p {
358 let child_path = if path.is_empty() {
359 name.clone()
360 } else {
361 format!("{}.{}", path, name)
362 };
363 if let Some(new_def) = new_p.get(name) {
364 self.check_json_recursive_inner(
367 new_def,
368 old_def,
369 &child_path,
370 messages,
371 depth + 1,
372 );
373 }
374 }
375 }
376 }
377
378 if let (Some(new_ap), Some(old_ap)) = (
380 new.get("additionalProperties"),
381 old.get("additionalProperties"),
382 ) {
383 let new_allows = !matches!(new_ap, serde_json::Value::Bool(false));
385 let old_allows = !matches!(old_ap, serde_json::Value::Bool(false));
386 if self.level.is_backward() && !new_allows && old_allows {
387 messages.push(format!(
388 "{}BACKWARD incompatible: additionalProperties changed from allowed to disallowed",
389 prefix
390 ));
391 }
392 if self.level.is_forward() && new_allows && !old_allows {
393 messages.push(format!(
394 "{}FORWARD incompatible: additionalProperties changed from disallowed to allowed",
395 prefix
396 ));
397 }
398 if new_ap.is_object() && old_ap.is_object() {
400 let child_path = if path.is_empty() {
401 "additionalProperties".to_string()
402 } else {
403 format!("{}.additionalProperties", path)
404 };
405 self.check_json_recursive_inner(new_ap, old_ap, &child_path, messages, depth + 1);
406 }
407 }
408
409 if let (Some(new_items), Some(old_items)) = (new.get("items"), old.get("items")) {
411 let child_path = if path.is_empty() {
412 "items".to_string()
413 } else {
414 format!("{}[]", path)
415 };
416 self.check_json_recursive_inner(new_items, old_items, &child_path, messages, depth + 1);
417 }
418
419 if let (Some(new_enum), Some(old_enum)) = (
421 new.get("enum").and_then(|e| e.as_array()),
422 old.get("enum").and_then(|e| e.as_array()),
423 ) {
424 if self.level.is_backward() {
425 for val in old_enum {
427 if !new_enum.contains(val) {
428 messages.push(format!(
429 "{}BACKWARD incompatible: enum value {:?} removed",
430 prefix, val
431 ));
432 }
433 }
434 }
435 if self.level.is_forward() {
436 for val in new_enum {
438 if !old_enum.contains(val) {
439 messages.push(format!(
440 "{}FORWARD incompatible: new enum value {:?} added",
441 prefix, val
442 ));
443 }
444 }
445 }
446 }
447 }
448
449 fn json_types_compatible(
451 &self,
452 old_type: &serde_json::Value,
453 new_type: &serde_json::Value,
454 ) -> bool {
455 let old_t = old_type.get("type").and_then(|t| t.as_str());
456 let new_t = new_type.get("type").and_then(|t| t.as_str());
457
458 match (old_t, new_t) {
459 (Some(old), Some(new)) => {
460 if old == new {
462 return true;
463 }
464
465 if old == "integer" && new == "number" {
467 return true;
468 }
469
470 false
471 }
472 (None, None) => true,
474 _ => false,
475 }
476 }
477
478 #[cfg(feature = "protobuf")]
489 fn check_protobuf(
490 &self,
491 new_schema: &str,
492 existing_schemas: &[&str],
493 ) -> SchemaResult<CompatibilityResult> {
494 let schemas_to_check = if self.level.is_transitive() {
495 existing_schemas.to_vec()
496 } else {
497 existing_schemas
498 .last()
499 .map(|s| vec![*s])
500 .unwrap_or_default()
501 };
502
503 let mut messages = Vec::new();
504
505 for (i, existing_str) in schemas_to_check.iter().enumerate() {
506 let result = self.check_protobuf_pair(new_schema, existing_str)?;
507 if !result.is_compatible {
508 for msg in result.messages {
509 messages.push(format!("Version {}: {}", i + 1, msg));
510 }
511 }
512 }
513
514 if messages.is_empty() {
515 Ok(CompatibilityResult::compatible())
516 } else {
517 Ok(CompatibilityResult::incompatible(messages))
518 }
519 }
520
    /// Compares one (new, existing) protobuf schema pair via parsed
    /// descriptors. Flags: field-number reuse under a different name, field
    /// type changes, use of reserved numbers/names, removal of proto2
    /// `required` fields, and removal of enum values.
    #[cfg(feature = "protobuf")]
    fn check_protobuf_pair(
        &self,
        new_schema: &str,
        existing_schema: &str,
    ) -> SchemaResult<CompatibilityResult> {
        use std::collections::HashMap;

        let old_file = protox_parse::parse("existing.proto", existing_schema)
            .map_err(|e| SchemaError::ParseError(format!("Existing protobuf schema: {e}")))?;
        let new_file = protox_parse::parse("new.proto", new_schema)
            .map_err(|e| SchemaError::ParseError(format!("New protobuf schema: {e}")))?;

        let mut errors = Vec::new();

        // Index both files by fully-qualified message/enum name, including
        // nested definitions.
        let mut old_msgs: HashMap<String, ProtoMessageInfo> = HashMap::new();
        let mut old_enums: HashMap<String, ProtoEnumInfo> = HashMap::new();
        Self::extract_messages_recursive(&old_file.message_type, "", &mut old_msgs, &mut old_enums);
        Self::extract_enums(&old_file.enum_type, "", &mut old_enums);

        let mut new_msgs: HashMap<String, ProtoMessageInfo> = HashMap::new();
        let mut new_enums: HashMap<String, ProtoEnumInfo> = HashMap::new();
        Self::extract_messages_recursive(&new_file.message_type, "", &mut new_msgs, &mut new_enums);
        Self::extract_enums(&new_file.enum_type, "", &mut new_enums);

        // Only messages present in both versions are compared; a message that
        // disappears entirely is not flagged here.
        for (msg_name, old_msg) in &old_msgs {
            if let Some(new_msg) = new_msgs.get(msg_name) {
                // A field number must keep its name and type.
                for (num, old_field) in &old_msg.fields {
                    if let Some(new_field) = new_msg.fields.get(num) {
                        if old_field.name != new_field.name {
                            errors.push(format!(
                                "PROTOBUF incompatible: {}: field number {} reused (was '{}', now '{}')",
                                msg_name, num, old_field.name, new_field.name
                            ));
                        }
                        if old_field.type_desc != new_field.type_desc {
                            errors.push(format!(
                                "PROTOBUF incompatible: {}: field {} ('{}') type changed '{}' → '{}'",
                                msg_name, num, old_field.name, old_field.type_desc, new_field.type_desc
                            ));
                        }
                    }
                }

                // New fields must not occupy numbers the old schema reserved...
                for (num, field) in &new_msg.fields {
                    if old_msg.reserved_numbers.contains(num) {
                        errors.push(format!(
                            "PROTOBUF incompatible: {}: field '{}' uses reserved number {}",
                            msg_name, field.name, num
                        ));
                    }
                }

                // ...nor names the old schema reserved.
                for field in new_msg.fields.values() {
                    if old_msg.reserved_names.contains(&field.name) {
                        errors.push(format!(
                            "PROTOBUF incompatible: {}: field name '{}' is reserved",
                            msg_name, field.name
                        ));
                    }
                }

                // Removing a proto2 `required` field breaks existing readers.
                for (num, field) in &old_msg.fields {
                    if field.is_required && !new_msg.fields.contains_key(num) {
                        errors.push(format!(
                            "PROTOBUF incompatible: {}: required field '{}' (number {}) removed",
                            msg_name, field.name, num
                        ));
                    }
                }
            }
        }

        // Enum values may be added, but removing one breaks decoding of old
        // data that used it.
        for (enum_name, old_enum) in &old_enums {
            if let Some(new_enum) = new_enums.get(enum_name) {
                for (num, name) in &old_enum.values {
                    if !new_enum.values.contains_key(num) {
                        errors.push(format!(
                            "PROTOBUF incompatible: {}: enum value '{}' = {} removed",
                            enum_name, name, num
                        ));
                    }
                }
            }
        }

        if errors.is_empty() {
            Ok(CompatibilityResult::compatible())
        } else {
            Ok(CompatibilityResult::incompatible(errors))
        }
    }
629
    /// Walks message descriptors (including nested messages and enums) and
    /// records field, reserved-number and reserved-name info keyed by
    /// fully-qualified message name.
    #[cfg(feature = "protobuf")]
    fn extract_messages_recursive(
        messages: &[prost_types::DescriptorProto],
        prefix: &str,
        out_msgs: &mut std::collections::HashMap<String, ProtoMessageInfo>,
        out_enums: &mut std::collections::HashMap<String, ProtoEnumInfo>,
    ) {
        for msg in messages {
            let name = msg.name.as_deref().unwrap_or("<unknown>");
            let fqn = if prefix.is_empty() {
                name.to_string()
            } else {
                format!("{}.{}", prefix, name)
            };

            // Synthetic map-entry messages (generated for `map<K, V>` fields)
            // are an implementation detail and are skipped.
            let is_map_entry = msg
                .options
                .as_ref()
                .and_then(|o| o.map_entry)
                .unwrap_or(false);

            if !is_map_entry {
                let mut fields = std::collections::HashMap::new();
                let mut reserved_numbers = std::collections::HashSet::new();
                let mut reserved_names = std::collections::HashSet::new();

                for field in &msg.field {
                    let field_name = field.name.clone().unwrap_or_default();
                    let field_number = field.number.unwrap_or(0);
                    let type_desc = Self::proto_type_description(field);
                    // Label value 2 == LABEL_REQUIRED (proto2 only).
                    let is_required = field.label == Some(2); fields.insert(
                        field_number,
                        ProtoFieldInfo {
                            name: field_name,
                            type_desc,
                            is_required,
                        },
                    );
                }

                // ReservedRange is [start, end): `end` is exclusive.
                // NOTE(review): if `end` were absent this yields an empty
                // range; presumably protox always sets it — confirm.
                for range in &msg.reserved_range {
                    let start = range.start.unwrap_or(0);
                    let end = range.end.unwrap_or(start); for num in start..end {
                        reserved_numbers.insert(num);
                    }
                }

                for name in &msg.reserved_name {
                    reserved_names.insert(name.clone());
                }

                out_msgs.insert(
                    fqn.clone(),
                    ProtoMessageInfo {
                        fields,
                        reserved_numbers,
                        reserved_names,
                    },
                );
            }

            // Nested messages and enums use the extended prefix so their
            // fully-qualified names stay unique.
            Self::extract_messages_recursive(&msg.nested_type, &fqn, out_msgs, out_enums);

            Self::extract_enums(&msg.enum_type, &fqn, out_enums);
        }
    }
709
710 #[cfg(feature = "protobuf")]
712 fn extract_enums(
713 enums: &[prost_types::EnumDescriptorProto],
714 prefix: &str,
715 out: &mut std::collections::HashMap<String, ProtoEnumInfo>,
716 ) {
717 for e in enums {
718 let name = e.name.as_deref().unwrap_or("<unknown>");
719 let fqn = if prefix.is_empty() {
720 name.to_string()
721 } else {
722 format!("{}.{}", prefix, name)
723 };
724
725 let mut values = std::collections::HashMap::new();
726 for v in &e.value {
727 let val_name = v.name.clone().unwrap_or_default();
728 let val_number = v.number.unwrap_or(0);
729 values.insert(val_number, val_name);
730 }
731
732 out.insert(fqn, ProtoEnumInfo { values });
733 }
734 }
735
736 #[cfg(feature = "protobuf")]
741 fn proto_type_description(field: &prost_types::FieldDescriptorProto) -> String {
742 let type_val = field.r#type.unwrap_or(0);
743 match type_val {
744 1 => "double".into(),
745 2 => "float".into(),
746 3 => "int64".into(),
747 4 => "uint64".into(),
748 5 => "int32".into(),
749 6 => "fixed64".into(),
750 7 => "fixed32".into(),
751 8 => "bool".into(),
752 9 => "string".into(),
753 10 => "group".into(),
754 11 | 14 => {
755 field
757 .type_name
758 .clone()
759 .unwrap_or_else(|| format!("type_{}", type_val))
760 }
761 12 => "bytes".into(),
762 13 => "uint32".into(),
763 15 => "sfixed32".into(),
764 16 => "sfixed64".into(),
765 17 => "sint32".into(),
766 18 => "sint64".into(),
767 _ => format!("unknown_{}", type_val),
768 }
769 }
770
771 #[cfg(not(feature = "protobuf"))]
780 fn check_protobuf(
781 &self,
782 new_schema: &str,
783 existing_schemas: &[&str],
784 ) -> SchemaResult<CompatibilityResult> {
785 let schemas_to_check = if self.level.is_transitive() {
786 existing_schemas.to_vec()
787 } else {
788 existing_schemas
789 .last()
790 .map(|s| vec![*s])
791 .unwrap_or_default()
792 };
793
794 let mut messages = Vec::new();
795
796 for (i, existing_str) in schemas_to_check.iter().enumerate() {
797 let result = self.check_protobuf_pair(new_schema, existing_str)?;
798 if !result.is_compatible {
799 for msg in result.messages {
800 messages.push(format!("Version {}: {}", i + 1, msg));
801 }
802 }
803 }
804
805 if messages.is_empty() {
806 Ok(CompatibilityResult::compatible())
807 } else {
808 Ok(CompatibilityResult::incompatible(messages))
809 }
810 }
811
    /// Best-effort, regex-based protobuf pair check (no `protobuf` feature).
    ///
    /// Strips comments, then uses the module-level regexes to detect
    /// field-number reuse with a different name, new fields on reserved
    /// numbers, and removal of proto2 `required` fields.
    ///
    /// NOTE(review): this heuristic does not understand `reserved 2 to 5;`
    /// ranges, reserved *names*, per-message scoping, or `//` inside string
    /// literals — acceptable for a fallback; the descriptor-based path
    /// handles these fully.
    #[cfg(not(feature = "protobuf"))]
    fn check_protobuf_pair(
        &self,
        new_schema: &str,
        existing_schema: &str,
    ) -> SchemaResult<CompatibilityResult> {
        use std::collections::{HashMap, HashSet};

        let mut messages = Vec::new();

        // Removes `// line` and `/* block */` comments so commented-out
        // declarations are not matched. Newlines of line comments are kept to
        // preserve approximate line structure.
        let strip_comments = |s: &str| -> String {
            let mut result = String::with_capacity(s.len());
            let mut chars = s.chars().peekable();
            while let Some(c) = chars.next() {
                if c == '/' {
                    if chars.peek() == Some(&'/') {
                        for ch in chars.by_ref() {
                            if ch == '\n' {
                                result.push('\n');
                                break;
                            }
                        }
                        continue;
                    } else if chars.peek() == Some(&'*') {
                        chars.next(); loop {
                            match chars.next() {
                                Some('*') if chars.peek() == Some(&'/') => {
                                    chars.next();
                                    break;
                                }
                                Some(_) => continue,
                                None => break,
                            }
                        }
                        continue;
                    }
                }
                result.push(c);
            }
            result
        };

        let clean_existing = strip_comments(existing_schema);
        let clean_new = strip_comments(new_schema);

        // Capture 1 = field name, capture 2 = field number.
        let field_pattern = &*PROTO_FIELD_RE;

        let old_fields: HashMap<u32, String> = field_pattern
            .captures_iter(&clean_existing)
            .filter_map(|cap| {
                let name = cap.get(1)?.as_str().to_string();
                let num: u32 = cap.get(2)?.as_str().parse().ok()?;
                Some((num, name))
            })
            .collect();

        let new_fields: HashMap<u32, String> = field_pattern
            .captures_iter(&clean_new)
            .filter_map(|cap| {
                let name = cap.get(1)?.as_str().to_string();
                let num: u32 = cap.get(2)?.as_str().parse().ok()?;
                Some((num, name))
            })
            .collect();

        // Same number, different name => the number was reused.
        for (num, old_name) in &old_fields {
            if let Some(new_name) = new_fields.get(num) {
                if old_name != new_name {
                    messages.push(format!(
                        "PROTOBUF incompatible: field number {} reused (was '{}', now '{}')",
                        num, old_name, new_name
                    ));
                }
            }
        }

        // Numbers from the old schema's `reserved N, M, ...;` statements.
        let reserved_pattern = &*PROTO_RESERVED_RE;

        let old_reserved: HashSet<u32> = reserved_pattern
            .captures_iter(&clean_existing)
            .flat_map(|cap| {
                cap.get(1)
                    .map(|m| {
                        m.as_str()
                            .split(',')
                            .filter_map(|n| n.trim().parse().ok())
                            .collect::<Vec<u32>>()
                    })
                    .unwrap_or_default()
            })
            .collect();

        for (num, name) in &new_fields {
            if old_reserved.contains(num) {
                messages.push(format!(
                    "PROTOBUF incompatible: field '{}' uses reserved number {}",
                    name, num
                ));
            }
        }

        // proto2 `required` fields must survive; matched by name since this
        // fallback has no per-message scoping.
        let required_pattern = &*PROTO_REQUIRED_RE;

        let old_required: HashSet<&str> = required_pattern
            .captures_iter(&clean_existing)
            .filter_map(|cap| cap.get(1).map(|m| m.as_str()))
            .collect();

        for required_name in old_required {
            if !new_fields.values().any(|n| n == required_name) {
                messages.push(format!(
                    "PROTOBUF incompatible: required field '{}' removed",
                    required_name
                ));
            }
        }

        if messages.is_empty() {
            Ok(CompatibilityResult::compatible())
        } else {
            Ok(CompatibilityResult::incompatible(messages))
        }
    }
948}
949
/// Checks whether data written with `writer` can be read using `reader`,
/// following Avro schema-resolution rules: exact primitive matches, numeric
/// promotions (int→long→float→double widening), string↔bytes interchange,
/// structural recursion for arrays/maps/records, symbol containment for
/// enums, size equality for fixed, and variant matching for unions.
///
/// Returns `Err(message)` describing the first incompatibility found.
#[cfg(feature = "avro")]
fn check_schema_resolution(
    writer: &apache_avro::Schema,
    reader: &apache_avro::Schema,
) -> Result<(), String> {
    use apache_avro::Schema;

    match (writer, reader) {
        (Schema::Null, Schema::Null) => Ok(()),
        (Schema::Boolean, Schema::Boolean) => Ok(()),
        (Schema::String, Schema::String) => Ok(()),
        (Schema::Bytes, Schema::Bytes) => Ok(()),

        // Numeric promotions: the reader type may be wider than the writer's.
        (Schema::Int, Schema::Int) => Ok(()),
        (Schema::Int, Schema::Long) => Ok(()),
        (Schema::Int, Schema::Float) => Ok(()),
        (Schema::Int, Schema::Double) => Ok(()),
        (Schema::Long, Schema::Long) => Ok(()),
        (Schema::Long, Schema::Float) => Ok(()),
        (Schema::Long, Schema::Double) => Ok(()),
        (Schema::Float, Schema::Float) => Ok(()),
        (Schema::Float, Schema::Double) => Ok(()),
        (Schema::Double, Schema::Double) => Ok(()),

        // string and bytes are interchangeable under Avro resolution.
        (Schema::String, Schema::Bytes) => Ok(()),
        (Schema::Bytes, Schema::String) => Ok(()),

        (Schema::Array(w), Schema::Array(r)) => check_schema_resolution(&w.items, &r.items),

        (Schema::Map(w), Schema::Map(r)) => check_schema_resolution(&w.types, &r.types),

        // Every writer symbol must exist in the reader's symbol set.
        (Schema::Enum(w), Schema::Enum(r)) => {
            for symbol in &w.symbols {
                if !r.symbols.contains(symbol) {
                    return Err(format!(
                        "Enum symbol '{}' in writer not found in reader",
                        symbol
                    ));
                }
            }
            Ok(())
        }

        (Schema::Fixed(w), Schema::Fixed(r)) => {
            if w.size != r.size {
                return Err(format!(
                    "Fixed size mismatch: writer={}, reader={}",
                    w.size, r.size
                ));
            }
            Ok(())
        }

        (Schema::Record(w), Schema::Record(r)) => {
            // Writer fields are matched to reader fields by name or alias and
            // resolved recursively.
            for w_field in &w.fields {
                let r_field = r.fields.iter().find(|rf| {
                    rf.name == w_field.name
                        || rf
                            .aliases
                            .as_ref()
                            .is_some_and(|a| a.contains(&w_field.name))
                });

                match r_field {
                    Some(rf) => {
                        check_schema_resolution(&w_field.schema, &rf.schema)?;
                    }
                    None => {
                        // Writer field unknown to the reader: ignored.
                    }
                }
            }

            // A reader field with no writer counterpart must have a default.
            for r_field in &r.fields {
                let w_field = w.fields.iter().find(|wf| {
                    wf.name == r_field.name
                        || r_field
                            .aliases
                            .as_ref()
                            .is_some_and(|a| a.contains(&wf.name))
                });

                if w_field.is_none() && r_field.default.is_none() {
                    return Err(format!(
                        "Reader field '{}' has no matching writer field and no default",
                        r_field.name
                    ));
                }
            }

            Ok(())
        }

        // Union vs union: every writer variant must resolve against at least
        // one reader variant.
        (Schema::Union(w), Schema::Union(r)) => {
            for w_variant in w.variants() {
                let compatible = r
                    .variants()
                    .iter()
                    .any(|rv| check_schema_resolution(w_variant, rv).is_ok());
                if !compatible {
                    return Err(format!(
                        "Writer union variant {:?} not compatible with any reader variant",
                        w_variant
                    ));
                }
            }
            Ok(())
        }

        // Non-union writer vs union reader: the writer must match a variant.
        (w, Schema::Union(r)) => {
            let compatible = r
                .variants()
                .iter()
                .any(|rv| check_schema_resolution(w, rv).is_ok());
            if compatible {
                Ok(())
            } else {
                Err(format!(
                    "Writer schema {:?} not compatible with reader union",
                    w
                ))
            }
        }

        // Union writer vs non-union reader: some variant must be readable.
        // NOTE(review): strict Avro resolution requires the *actual* written
        // variant to match; accepting any variant is a lenient approximation.
        (Schema::Union(w), r) => {
            let compatible = w
                .variants()
                .iter()
                .any(|wv| check_schema_resolution(wv, r).is_ok());
            if compatible {
                Ok(())
            } else {
                Err(format!(
                    "No writer union variant compatible with reader {:?}",
                    r
                ))
            }
        }

        (w, r) => Err(format!(
            "Incompatible types: writer={:?}, reader={:?}",
            w, r
        )),
    }
}
1118
1119#[cfg(test)]
1120mod tests {
1121 use super::*;
1122
    // Sanity-checks the direction/transitivity flags on each level variant.
    #[test]
    fn test_compatibility_level_properties() {
        assert!(CompatibilityLevel::Backward.is_backward());
        assert!(!CompatibilityLevel::Backward.is_forward());
        assert!(!CompatibilityLevel::Backward.is_transitive());

        assert!(!CompatibilityLevel::Forward.is_backward());
        assert!(CompatibilityLevel::Forward.is_forward());

        assert!(CompatibilityLevel::Full.is_backward());
        assert!(CompatibilityLevel::Full.is_forward());

        assert!(CompatibilityLevel::FullTransitive.is_transitive());
    }
1137
    // Levels parse from their registry-style SCREAMING_SNAKE_CASE names.
    #[test]
    fn test_compatibility_level_parse() {
        assert_eq!(
            "BACKWARD".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::Backward
        );
        assert_eq!(
            "FULL_TRANSITIVE".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::FullTransitive
        );
        assert_eq!(
            "NONE".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::None
        );
    }
1153
    // Level None short-circuits before any parsing — even invalid schemas pass.
    #[test]
    fn test_none_compatibility_always_passes() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::None);
        let result = checker
            .check(SchemaType::Avro, "{}", &["{\"invalid\"}"])
            .unwrap();
        assert!(result.is_compatible);
    }
1162
    // With no prior versions there is nothing to conflict with.
    #[test]
    fn test_empty_existing_schemas() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);
        let result = checker.check(SchemaType::Avro, "{}", &[]).unwrap();
        assert!(result.is_compatible);
    }
1169
    // Avro: adding a nullable field with a default keeps old data readable.
    #[cfg(feature = "avro")]
    #[test]
    fn test_backward_compatible_add_optional_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let v1 =
            r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}]}"#;
        let v2 = r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": ["null", "string"], "default": null}]}"#;

        let result = checker.check(SchemaType::Avro, v2, &[v1]).unwrap();
        assert!(
            result.is_compatible,
            "Adding optional field should be backward compatible"
        );
    }
1185
    // Avro: a new field with no default cannot be filled from old data.
    #[cfg(feature = "avro")]
    #[test]
    fn test_backward_incompatible_add_required_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let v1 =
            r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}]}"#;
        let v2 = r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}]}"#;

        let result = checker.check(SchemaType::Avro, v2, &[v1]).unwrap();
        assert!(
            !result.is_compatible,
            "Adding required field (no default) should be backward incompatible"
        );
    }
1203
    // Avro schema resolution allows widening int to long.
    #[cfg(feature = "avro")]
    #[test]
    fn test_numeric_type_promotion() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let v1 =
            r#"{"type": "record", "name": "Data", "fields": [{"name": "value", "type": "int"}]}"#;
        let v2 =
            r#"{"type": "record", "name": "Data", "fields": [{"name": "value", "type": "long"}]}"#;

        let result = checker.check(SchemaType::Avro, v2, &[v1]).unwrap();
        assert!(
            result.is_compatible,
            "int -> long promotion should be backward compatible"
        );
    }
1220
    // JSON Schema: adding a property that is not required is backward-safe.
    #[test]
    fn test_json_backward_add_optional_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let old = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;
        let new = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id"]}"#;

        let result = checker.check(SchemaType::Json, new, &[old]).unwrap();
        assert!(
            result.is_compatible,
            "Adding optional field should be backward compatible"
        );
    }
1235
    // JSON Schema: a newly-required property old data never had must fail.
    #[test]
    fn test_json_backward_add_required_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let old = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;
        let new = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id","name"]}"#;

        let result = checker.check(SchemaType::Json, new, &[old]).unwrap();
        assert!(
            !result.is_compatible,
            "Adding required field should NOT be backward compatible"
        );
    }
1249
    // JSON Schema: dropping a required property breaks forward compatibility.
    #[test]
    fn test_json_forward_remove_required_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Forward);

        let old = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id","name"]}"#;
        let new = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;

        let result = checker.check(SchemaType::Json, new, &[old]).unwrap();
        assert!(
            !result.is_compatible,
            "Removing required field should NOT be forward compatible"
        );
    }
1263
    // JSON Schema: integer values validate against "number", so widening is OK.
    #[test]
    fn test_json_type_widening() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let old = r#"{"type":"object","properties":{"value":{"type":"integer"}}}"#;
        let new = r#"{"type":"object","properties":{"value":{"type":"number"}}}"#;

        let result = checker.check(SchemaType::Json, new, &[old]).unwrap();
        assert!(
            result.is_compatible,
            "integer -> number widening should be backward compatible"
        );
    }
1277
    // Protobuf: renaming a field while keeping its number is treated as reuse.
    #[test]
    fn test_protobuf_field_number_reuse() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Full);

        let old = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
                string email = 2;
            }
        "#;

        let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
        assert!(
            !result.is_compatible,
            "Reusing field number with different name should be incompatible"
        );
    }
1304
    // Protobuf: appending a field on a fresh number is always allowed.
    #[test]
    fn test_protobuf_add_new_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Full);

        let old = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;

        let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
        assert!(
            result.is_compatible,
            "Adding new field with new number should be compatible: {:?}",
            result.messages
        );
    }
1330
1331 #[test]
1332 fn test_protobuf_reserved_field_reuse() {
1333 let checker = CompatibilityChecker::new(CompatibilityLevel::Full);
1334
1335 let old = r#"
1336 syntax = "proto3";
1337 message User {
1338 int64 id = 1;
1339 reserved 2, 3;
1340 }
1341 "#;
1342 let new = r#"
1343 syntax = "proto3";
1344 message User {
1345 int64 id = 1;
1346 string name = 2;
1347 }
1348 "#;
1349
1350 let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
1351 assert!(
1352 !result.is_compatible,
1353 "Reusing reserved field number should be incompatible"
1354 );
1355 }
1356
1357 #[test]
1358 fn test_protobuf_field_type_change() {
1359 let checker = CompatibilityChecker::new(CompatibilityLevel::Full);
1360
1361 let old = r#"
1362 syntax = "proto3";
1363 message Event {
1364 int32 id = 1;
1365 string payload = 2;
1366 }
1367 "#;
1368 let new = r#"
1369 syntax = "proto3";
1370 message Event {
1371 int32 id = 1;
1372 bytes payload = 2;
1373 }
1374 "#;
1375
1376 let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
1377 assert!(
1378 !result.is_compatible,
1379 "Changing field type should be incompatible: {:?}",
1380 result.messages
1381 );
1382 }
1383
1384 #[test]
1385 fn test_protobuf_nested_message() {
1386 let checker = CompatibilityChecker::new(CompatibilityLevel::Full);
1387
1388 let old = r#"
1389 syntax = "proto3";
1390 message Outer {
1391 int32 id = 1;
1392 message Inner {
1393 string value = 1;
1394 }
1395 Inner data = 2;
1396 }
1397 "#;
1398 let new = r#"
1399 syntax = "proto3";
1400 message Outer {
1401 int32 id = 1;
1402 message Inner {
1403 int32 value = 1;
1404 }
1405 Inner data = 2;
1406 }
1407 "#;
1408
1409 let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
1410 assert!(
1411 !result.is_compatible,
1412 "Type change in nested message should be detected: {:?}",
1413 result.messages
1414 );
1415 }
1416
1417 #[test]
1418 fn test_protobuf_reserved_range() {
1419 let checker = CompatibilityChecker::new(CompatibilityLevel::Full);
1420
1421 let old = r#"
1422 syntax = "proto3";
1423 message Data {
1424 int32 id = 1;
1425 reserved 5 to 10;
1426 }
1427 "#;
1428 let new = r#"
1429 syntax = "proto3";
1430 message Data {
1431 int32 id = 1;
1432 string tag = 7;
1433 }
1434 "#;
1435
1436 let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
1437 assert!(
1438 !result.is_compatible,
1439 "Using field number from reserved range should be incompatible"
1440 );
1441 }
1442
1443 #[test]
1444 fn test_protobuf_reserved_name() {
1445 let checker = CompatibilityChecker::new(CompatibilityLevel::Full);
1446
1447 let old = r#"
1448 syntax = "proto3";
1449 message Data {
1450 int32 id = 1;
1451 reserved "old_field";
1452 }
1453 "#;
1454 let new = r#"
1455 syntax = "proto3";
1456 message Data {
1457 int32 id = 1;
1458 string old_field = 2;
1459 }
1460 "#;
1461
1462 let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
1463 assert!(
1464 !result.is_compatible,
1465 "Reusing reserved field name should be incompatible"
1466 );
1467 }
1468
1469 #[test]
1470 fn test_protobuf_enum_value_removal() {
1471 let checker = CompatibilityChecker::new(CompatibilityLevel::Full);
1472
1473 let old = r#"
1474 syntax = "proto3";
1475 enum Status {
1476 UNKNOWN = 0;
1477 ACTIVE = 1;
1478 INACTIVE = 2;
1479 }
1480 "#;
1481 let new = r#"
1482 syntax = "proto3";
1483 enum Status {
1484 UNKNOWN = 0;
1485 ACTIVE = 1;
1486 }
1487 "#;
1488
1489 let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
1490 assert!(
1491 !result.is_compatible,
1492 "Removing enum value should be incompatible"
1493 );
1494 }
1495
1496 #[test]
1497 fn test_protobuf_enum_value_addition() {
1498 let checker = CompatibilityChecker::new(CompatibilityLevel::Full);
1499
1500 let old = r#"
1501 syntax = "proto3";
1502 enum Status {
1503 UNKNOWN = 0;
1504 ACTIVE = 1;
1505 }
1506 "#;
1507 let new = r#"
1508 syntax = "proto3";
1509 enum Status {
1510 UNKNOWN = 0;
1511 ACTIVE = 1;
1512 INACTIVE = 2;
1513 }
1514 "#;
1515
1516 let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
1517 assert!(
1518 result.is_compatible,
1519 "Adding enum value should be compatible: {:?}",
1520 result.messages
1521 );
1522 }
1523
1524 #[test]
1525 fn test_protobuf_map_field() {
1526 let checker = CompatibilityChecker::new(CompatibilityLevel::Full);
1527
1528 let old = r#"
1529 syntax = "proto3";
1530 message Config {
1531 int32 id = 1;
1532 map<string, string> labels = 2;
1533 }
1534 "#;
1535 let new = r#"
1536 syntax = "proto3";
1537 message Config {
1538 int32 id = 1;
1539 map<string, string> labels = 2;
1540 string name = 3;
1541 }
1542 "#;
1543
1544 let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
1545 assert!(
1546 result.is_compatible,
1547 "Adding a new field alongside a map field should be compatible: {:?}",
1548 result.messages
1549 );
1550 }
1551
1552 #[test]
1553 fn test_protobuf_proto2_required_field_removal() {
1554 let checker = CompatibilityChecker::new(CompatibilityLevel::Full);
1555
1556 let old = r#"
1557 syntax = "proto2";
1558 message User {
1559 required int64 id = 1;
1560 required string name = 2;
1561 }
1562 "#;
1563 let new = r#"
1564 syntax = "proto2";
1565 message User {
1566 required int64 id = 1;
1567 }
1568 "#;
1569
1570 let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
1571 assert!(
1572 !result.is_compatible,
1573 "Removing required field in proto2 should be incompatible"
1574 );
1575 }
1576
1577 #[test]
1578 fn test_protobuf_compatible_evolution() {
1579 let checker = CompatibilityChecker::new(CompatibilityLevel::Full);
1580
1581 let old = r#"
1582 syntax = "proto3";
1583 message User {
1584 int64 id = 1;
1585 string name = 2;
1586 }
1587 "#;
1588 let new = r#"
1589 syntax = "proto3";
1590 message User {
1591 int64 id = 1;
1592 string name = 2;
1593 string email = 3;
1594 int32 age = 4;
1595 }
1596 "#;
1597
1598 let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
1599 assert!(
1600 result.is_compatible,
1601 "Adding new fields with new numbers should be fully compatible: {:?}",
1602 result.messages
1603 );
1604 }
1605}