// rivven_schema/compatibility.rs

//! Schema compatibility checking
//!
//! Supports Confluent-compatible compatibility levels for schema evolution.

use crate::error::{SchemaError, SchemaResult};
use crate::types::SchemaType;
use serde::{Deserialize, Serialize};

// Re-export CompatibilityLevel from types for backward compatibility
pub use crate::types::CompatibilityLevel;

/// Result of a compatibility check
///
/// Returned by [`CompatibilityChecker::check`]; when `is_compatible` is
/// `false`, `messages` explains each violation found.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompatibilityResult {
    /// Whether the schemas are compatible
    pub is_compatible: bool,
    /// Compatibility error messages (if any)
    // Empty on success; omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub messages: Vec<String>,
}

22impl CompatibilityResult {
23    pub fn compatible() -> Self {
24        Self {
25            is_compatible: true,
26            messages: Vec::new(),
27        }
28    }
29
30    pub fn incompatible(messages: Vec<String>) -> Self {
31        Self {
32            is_compatible: false,
33            messages,
34        }
35    }
36}
37
/// Schema compatibility checker
///
/// Validates a candidate schema against previously registered versions
/// according to a configured [`CompatibilityLevel`].
pub struct CompatibilityChecker {
    // Level enforced by `check`; NONE disables all validation.
    level: CompatibilityLevel,
}

43impl CompatibilityChecker {
44    pub fn new(level: CompatibilityLevel) -> Self {
45        Self { level }
46    }
47
48    /// Check if a new schema is compatible with existing schemas
49    pub fn check(
50        &self,
51        schema_type: SchemaType,
52        new_schema: &str,
53        existing_schemas: &[&str],
54    ) -> SchemaResult<CompatibilityResult> {
55        if self.level == CompatibilityLevel::None {
56            return Ok(CompatibilityResult::compatible());
57        }
58
59        if existing_schemas.is_empty() {
60            return Ok(CompatibilityResult::compatible());
61        }
62
63        match schema_type {
64            SchemaType::Avro => self.check_avro(new_schema, existing_schemas),
65            SchemaType::Json => self.check_json(new_schema, existing_schemas),
66            SchemaType::Protobuf => self.check_protobuf(new_schema, existing_schemas),
67        }
68    }
69
70    /// Check Avro schema compatibility
71    ///
72    /// Uses Apache Avro schema resolution rules:
73    /// - BACKWARD: Reader (new) can read data written with Writer (old)
74    /// - FORWARD: Reader (old) can read data written with Writer (new)
75    /// - FULL: Both directions work
76    #[cfg(feature = "avro")]
77    fn check_avro(
78        &self,
79        new_schema: &str,
80        existing_schemas: &[&str],
81    ) -> SchemaResult<CompatibilityResult> {
82        use apache_avro::Schema;
83
84        let new = Schema::parse_str(new_schema)
85            .map_err(|e| SchemaError::ParseError(format!("New schema: {}", e)))?;
86
87        let schemas_to_check = if self.level.is_transitive() {
88            existing_schemas.to_vec()
89        } else {
90            // Only check against the latest
91            existing_schemas
92                .last()
93                .map(|s| vec![*s])
94                .unwrap_or_default()
95        };
96
97        let mut errors = Vec::new();
98
99        for (i, existing_str) in schemas_to_check.iter().enumerate() {
100            let existing = Schema::parse_str(existing_str)
101                .map_err(|e| SchemaError::ParseError(format!("Existing schema {}: {}", i, e)))?;
102
103            // Check backward compatibility: new schema (reader) can read data written with old schema (writer)
104            if self.level.is_backward() {
105                if let Err(e) = check_schema_resolution(&existing, &new) {
106                    errors.push(format!(
107                        "Backward incompatible with version {}: {}",
108                        i + 1,
109                        e
110                    ));
111                }
112            }
113
114            // Check forward compatibility: old schema (reader) can read data written with new schema (writer)
115            if self.level.is_forward() {
116                if let Err(e) = check_schema_resolution(&new, &existing) {
117                    errors.push(format!(
118                        "Forward incompatible with version {}: {}",
119                        i + 1,
120                        e
121                    ));
122                }
123            }
124        }
125
126        if errors.is_empty() {
127            Ok(CompatibilityResult::compatible())
128        } else {
129            Ok(CompatibilityResult::incompatible(errors))
130        }
131    }
132
133    #[cfg(not(feature = "avro"))]
134    fn check_avro(
135        &self,
136        _new_schema: &str,
137        _existing_schemas: &[&str],
138    ) -> SchemaResult<CompatibilityResult> {
139        Err(SchemaError::Config("Avro support not enabled".to_string()))
140    }
141
142    /// Check JSON Schema compatibility
143    ///
144    /// Implements JSON Schema evolution rules:
145    /// - BACKWARD: New schema can read old data (adding required fields is incompatible)
146    /// - FORWARD: Old schema can read new data (removing required fields is incompatible)
147    /// - FULL: Both directions must be compatible
148    fn check_json(
149        &self,
150        new_schema: &str,
151        existing_schemas: &[&str],
152    ) -> SchemaResult<CompatibilityResult> {
153        use serde_json::Value as JsonValue;
154
155        let new: JsonValue = serde_json::from_str(new_schema)
156            .map_err(|e| SchemaError::ParseError(format!("New JSON schema: {}", e)))?;
157
158        let schemas_to_check = if self.level.is_transitive() {
159            existing_schemas.to_vec()
160        } else {
161            existing_schemas
162                .last()
163                .map(|s| vec![*s])
164                .unwrap_or_default()
165        };
166
167        let mut messages = Vec::new();
168
169        for (i, existing_str) in schemas_to_check.iter().enumerate() {
170            let old: JsonValue = serde_json::from_str(existing_str)
171                .map_err(|e| SchemaError::ParseError(format!("Existing schema {}: {}", i, e)))?;
172
173            let result = self.check_json_pair(&new, &old)?;
174            if !result.is_compatible {
175                for msg in result.messages {
176                    messages.push(format!("Version {}: {}", i + 1, msg));
177                }
178            }
179        }
180
181        if messages.is_empty() {
182            Ok(CompatibilityResult::compatible())
183        } else {
184            Ok(CompatibilityResult::incompatible(messages))
185        }
186    }
187
188    /// Check compatibility between two JSON schemas
189    fn check_json_pair(
190        &self,
191        new: &serde_json::Value,
192        old: &serde_json::Value,
193    ) -> SchemaResult<CompatibilityResult> {
194        let mut messages = Vec::new();
195
196        let new_props = new.get("properties").and_then(|p| p.as_object());
197        let old_props = old.get("properties").and_then(|p| p.as_object());
198
199        let new_required: Vec<&str> = new
200            .get("required")
201            .and_then(|r| r.as_array())
202            .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
203            .unwrap_or_default();
204        let old_required: Vec<&str> = old
205            .get("required")
206            .and_then(|r| r.as_array())
207            .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
208            .unwrap_or_default();
209
210        if let (Some(new_p), Some(old_p)) = (new_props, old_props) {
211            // Check backward compatibility
212            if self.level.is_backward() {
213                // New schema must be able to read old data
214                // Cannot add required fields that didn't exist in old schema
215                for field in &new_required {
216                    if !old_required.contains(field) && !old_p.contains_key(*field) {
217                        messages.push(format!(
218                            "BACKWARD incompatible: new required field '{}' not in old schema",
219                            field
220                        ));
221                    }
222                }
223
224                // Check field type changes
225                for (name, old_def) in old_p {
226                    if let Some(new_def) = new_p.get(name) {
227                        if !self.json_types_compatible(old_def, new_def) {
228                            messages.push(format!(
229                                "BACKWARD incompatible: field '{}' type changed incompatibly",
230                                name
231                            ));
232                        }
233                    }
234                }
235            }
236
237            // Check forward compatibility
238            if self.level.is_forward() {
239                // Old schema must be able to read new data
240                // Cannot remove required fields
241                for field in &old_required {
242                    if !new_p.contains_key(*field) {
243                        messages.push(format!(
244                            "FORWARD incompatible: required field '{}' removed from new schema",
245                            field
246                        ));
247                    }
248                }
249
250                // Check field type changes (reverse direction)
251                for (name, old_def) in old_p {
252                    if let Some(new_def) = new_p.get(name) {
253                        if !self.json_types_compatible(new_def, old_def) {
254                            messages.push(format!(
255                                "FORWARD incompatible: field '{}' type changed incompatibly",
256                                name
257                            ));
258                        }
259                    }
260                }
261            }
262        }
263
264        if messages.is_empty() {
265            Ok(CompatibilityResult::compatible())
266        } else {
267            Ok(CompatibilityResult::incompatible(messages))
268        }
269    }
270
271    /// Check if JSON Schema types are compatible
272    fn json_types_compatible(
273        &self,
274        old_type: &serde_json::Value,
275        new_type: &serde_json::Value,
276    ) -> bool {
277        let old_t = old_type.get("type").and_then(|t| t.as_str());
278        let new_t = new_type.get("type").and_then(|t| t.as_str());
279
280        match (old_t, new_t) {
281            (Some(old), Some(new)) => {
282                // Same type is always compatible
283                if old == new {
284                    return true;
285                }
286
287                // Allow widening: integer -> number
288                if old == "integer" && new == "number" {
289                    return true;
290                }
291
292                false
293            }
294            // If type is missing, assume object (compatible)
295            (None, None) => true,
296            _ => false,
297        }
298    }
299
300    /// Check Protobuf schema compatibility
301    ///
302    /// Implements Protobuf evolution rules:
303    /// - Field numbers cannot be reused with different names
304    /// - Reserved field numbers cannot be reused
305    /// - Required fields cannot be removed (proto2)
306    fn check_protobuf(
307        &self,
308        new_schema: &str,
309        existing_schemas: &[&str],
310    ) -> SchemaResult<CompatibilityResult> {
311        let schemas_to_check = if self.level.is_transitive() {
312            existing_schemas.to_vec()
313        } else {
314            existing_schemas
315                .last()
316                .map(|s| vec![*s])
317                .unwrap_or_default()
318        };
319
320        let mut messages = Vec::new();
321
322        for (i, existing_str) in schemas_to_check.iter().enumerate() {
323            let result = self.check_protobuf_pair(new_schema, existing_str)?;
324            if !result.is_compatible {
325                for msg in result.messages {
326                    messages.push(format!("Version {}: {}", i + 1, msg));
327                }
328            }
329        }
330
331        if messages.is_empty() {
332            Ok(CompatibilityResult::compatible())
333        } else {
334            Ok(CompatibilityResult::incompatible(messages))
335        }
336    }
337
338    /// Check compatibility between two Protobuf schemas
339    fn check_protobuf_pair(
340        &self,
341        new_schema: &str,
342        existing_schema: &str,
343    ) -> SchemaResult<CompatibilityResult> {
344        use std::collections::{HashMap, HashSet};
345
346        let mut messages = Vec::new();
347
348        // Extract field numbers from both schemas using regex
349        // Pattern matches: field_name = field_number
350        let field_pattern = regex::Regex::new(r"(\w+)\s*=\s*(\d+)")
351            .map_err(|e| SchemaError::ParseError(format!("Regex error: {}", e)))?;
352
353        let old_fields: HashMap<u32, String> = field_pattern
354            .captures_iter(existing_schema)
355            .filter_map(|cap| {
356                let name = cap.get(1)?.as_str().to_string();
357                let num: u32 = cap.get(2)?.as_str().parse().ok()?;
358                Some((num, name))
359            })
360            .collect();
361
362        let new_fields: HashMap<u32, String> = field_pattern
363            .captures_iter(new_schema)
364            .filter_map(|cap| {
365                let name = cap.get(1)?.as_str().to_string();
366                let num: u32 = cap.get(2)?.as_str().parse().ok()?;
367                Some((num, name))
368            })
369            .collect();
370
371        // Rule 1: Field numbers cannot be reused with different names
372        for (num, old_name) in &old_fields {
373            if let Some(new_name) = new_fields.get(num) {
374                if old_name != new_name {
375                    messages.push(format!(
376                        "PROTOBUF incompatible: field number {} reused (was '{}', now '{}')",
377                        num, old_name, new_name
378                    ));
379                }
380            }
381        }
382
383        // Rule 2: Reserved field numbers cannot be reused
384        let reserved_pattern = regex::Regex::new(r"reserved\s+(\d+(?:,\s*\d+)*)")
385            .map_err(|e| SchemaError::ParseError(format!("Regex error: {}", e)))?;
386
387        let old_reserved: HashSet<u32> = reserved_pattern
388            .captures_iter(existing_schema)
389            .flat_map(|cap| {
390                cap.get(1)
391                    .map(|m| {
392                        m.as_str()
393                            .split(',')
394                            .filter_map(|n| n.trim().parse().ok())
395                            .collect::<Vec<u32>>()
396                    })
397                    .unwrap_or_default()
398            })
399            .collect();
400
401        for (num, name) in &new_fields {
402            if old_reserved.contains(num) {
403                messages.push(format!(
404                    "PROTOBUF incompatible: field '{}' uses reserved number {}",
405                    name, num
406                ));
407            }
408        }
409
410        // Rule 3: Required fields cannot be removed (proto2)
411        let required_pattern = regex::Regex::new(r"required\s+\w+\s+(\w+)")
412            .map_err(|e| SchemaError::ParseError(format!("Regex error: {}", e)))?;
413
414        let old_required: HashSet<&str> = required_pattern
415            .captures_iter(existing_schema)
416            .filter_map(|cap| cap.get(1).map(|m| m.as_str()))
417            .collect();
418
419        for required_name in old_required {
420            if !new_fields.values().any(|n| n == required_name) {
421                messages.push(format!(
422                    "PROTOBUF incompatible: required field '{}' removed",
423                    required_name
424                ));
425            }
426        }
427
428        if messages.is_empty() {
429            Ok(CompatibilityResult::compatible())
430        } else {
431            Ok(CompatibilityResult::incompatible(messages))
432        }
433    }
434}
435
/// Check if a reader schema can read data written with a writer schema
///
/// This implements Avro schema resolution rules:
/// - Field matching by name (aliases supported)
/// - Default values for missing fields
/// - Type promotion rules (int -> long -> float -> double)
///
/// Returns `Ok(())` on success, or a human-readable description of the
/// first mismatch found. Recurses into container/record/union types.
#[cfg(feature = "avro")]
fn check_schema_resolution(
    writer: &apache_avro::Schema,
    reader: &apache_avro::Schema,
) -> Result<(), String> {
    use apache_avro::Schema;

    match (writer, reader) {
        // Same type always compatible
        (Schema::Null, Schema::Null) => Ok(()),
        (Schema::Boolean, Schema::Boolean) => Ok(()),
        (Schema::String, Schema::String) => Ok(()),
        (Schema::Bytes, Schema::Bytes) => Ok(()),

        // Numeric promotions: int -> long -> float -> double
        // (one-directional: the reader type must be at least as wide)
        (Schema::Int, Schema::Int) => Ok(()),
        (Schema::Int, Schema::Long) => Ok(()),
        (Schema::Int, Schema::Float) => Ok(()),
        (Schema::Int, Schema::Double) => Ok(()),
        (Schema::Long, Schema::Long) => Ok(()),
        (Schema::Long, Schema::Float) => Ok(()),
        (Schema::Long, Schema::Double) => Ok(()),
        (Schema::Float, Schema::Float) => Ok(()),
        (Schema::Float, Schema::Double) => Ok(()),
        (Schema::Double, Schema::Double) => Ok(()),

        // String <-> Bytes promotion (allowed in both directions)
        (Schema::String, Schema::Bytes) => Ok(()),
        (Schema::Bytes, Schema::String) => Ok(()),

        // Arrays: element types must be compatible
        (Schema::Array(w), Schema::Array(r)) => check_schema_resolution(&w.items, &r.items),

        // Maps: value types must be compatible
        (Schema::Map(w), Schema::Map(r)) => check_schema_resolution(&w.types, &r.types),

        // Enums: reader symbols must be superset of writer symbols
        (Schema::Enum(w), Schema::Enum(r)) => {
            for symbol in &w.symbols {
                if !r.symbols.contains(symbol) {
                    return Err(format!(
                        "Enum symbol '{}' in writer not found in reader",
                        symbol
                    ));
                }
            }
            Ok(())
        }

        // Fixed: sizes must match.
        // NOTE(review): the Avro spec additionally requires matching
        // (unaliased) names for fixed types; only size is enforced here —
        // confirm whether the name check was omitted intentionally.
        (Schema::Fixed(w), Schema::Fixed(r)) => {
            if w.size != r.size {
                return Err(format!(
                    "Fixed size mismatch: writer={}, reader={}",
                    w.size, r.size
                ));
            }
            Ok(())
        }

        // Records: check field compatibility
        (Schema::Record(w), Schema::Record(r)) => {
            // Check all writer fields can be read
            for w_field in &w.fields {
                // Find matching reader field by name or alias
                let r_field = r.fields.iter().find(|rf| {
                    rf.name == w_field.name
                        || rf
                            .aliases
                            .as_ref()
                            .is_some_and(|a| a.contains(&w_field.name))
                });

                match r_field {
                    Some(rf) => {
                        // Field exists in reader, check type compatibility
                        check_schema_resolution(&w_field.schema, &rf.schema)?;
                    }
                    None => {
                        // Writer field not in reader - that's OK, reader ignores it
                    }
                }
            }

            // Check all reader fields have values (from writer or default)
            for r_field in &r.fields {
                let w_field = w.fields.iter().find(|wf| {
                    wf.name == r_field.name
                        || r_field
                            .aliases
                            .as_ref()
                            .is_some_and(|a| a.contains(&wf.name))
                });

                // A reader-only field is fine only if it declares a default.
                if w_field.is_none() && r_field.default.is_none() {
                    return Err(format!(
                        "Reader field '{}' has no matching writer field and no default",
                        r_field.name
                    ));
                }
            }

            Ok(())
        }

        // Unions: writer type must be resolvable to one reader type
        (Schema::Union(w), Schema::Union(r)) => {
            for w_variant in w.variants() {
                let compatible = r
                    .variants()
                    .iter()
                    .any(|rv| check_schema_resolution(w_variant, rv).is_ok());
                if !compatible {
                    return Err(format!(
                        "Writer union variant {:?} not compatible with any reader variant",
                        w_variant
                    ));
                }
            }
            Ok(())
        }

        // Non-union writer with union reader: writer must match one reader variant
        (w, Schema::Union(r)) => {
            let compatible = r
                .variants()
                .iter()
                .any(|rv| check_schema_resolution(w, rv).is_ok());
            if compatible {
                Ok(())
            } else {
                Err(format!(
                    "Writer schema {:?} not compatible with reader union",
                    w
                ))
            }
        }

        // Union writer with non-union reader: all writer variants must be compatible
        (Schema::Union(w), r) => {
            for w_variant in w.variants() {
                check_schema_resolution(w_variant, r)?;
            }
            Ok(())
        }

        // Incompatible types
        (w, r) => Err(format!(
            "Incompatible types: writer={:?}, reader={:?}",
            w, r
        )),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // CompatibilityLevel semantics re-exported from `types`.
    #[test]
    fn test_compatibility_level_properties() {
        assert!(CompatibilityLevel::Backward.is_backward());
        assert!(!CompatibilityLevel::Backward.is_forward());
        assert!(!CompatibilityLevel::Backward.is_transitive());

        assert!(!CompatibilityLevel::Forward.is_backward());
        assert!(CompatibilityLevel::Forward.is_forward());

        assert!(CompatibilityLevel::Full.is_backward());
        assert!(CompatibilityLevel::Full.is_forward());

        assert!(CompatibilityLevel::FullTransitive.is_transitive());
    }

    #[test]
    fn test_compatibility_level_parse() {
        assert_eq!(
            "BACKWARD".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::Backward
        );
        assert_eq!(
            "FULL_TRANSITIVE".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::FullTransitive
        );
        assert_eq!(
            "NONE".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::None
        );
    }

    // NONE short-circuits before parsing, so even invalid schemas pass.
    #[test]
    fn test_none_compatibility_always_passes() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::None);
        let result = checker
            .check(SchemaType::Avro, "{}", &["{\"invalid\"}"])
            .unwrap();
        assert!(result.is_compatible);
    }

    // First registration has nothing to compare against.
    #[test]
    fn test_empty_existing_schemas() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);
        let result = checker.check(SchemaType::Avro, "{}", &[]).unwrap();
        assert!(result.is_compatible);
    }

    #[cfg(feature = "avro")]
    #[test]
    fn test_backward_compatible_add_optional_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let v1 =
            r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}]}"#;
        let v2 = r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": ["null", "string"], "default": null}]}"#;

        let result = checker.check(SchemaType::Avro, v2, &[v1]).unwrap();
        assert!(
            result.is_compatible,
            "Adding optional field should be backward compatible"
        );
    }

    #[cfg(feature = "avro")]
    #[test]
    fn test_backward_incompatible_add_required_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        // v1 has only 'id', v2 adds 'name' WITHOUT a default
        // Old data doesn't have 'name', so new reader can't read it
        let v1 =
            r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}]}"#;
        let v2 = r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}]}"#;

        let result = checker.check(SchemaType::Avro, v2, &[v1]).unwrap();
        assert!(
            !result.is_compatible,
            "Adding required field (no default) should be backward incompatible"
        );
    }

    #[cfg(feature = "avro")]
    #[test]
    fn test_numeric_type_promotion() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let v1 =
            r#"{"type": "record", "name": "Data", "fields": [{"name": "value", "type": "int"}]}"#;
        let v2 =
            r#"{"type": "record", "name": "Data", "fields": [{"name": "value", "type": "long"}]}"#;

        let result = checker.check(SchemaType::Avro, v2, &[v1]).unwrap();
        assert!(
            result.is_compatible,
            "int -> long promotion should be backward compatible"
        );
    }

    // JSON Schema compatibility tests
    #[test]
    fn test_json_backward_add_optional_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let old = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;
        let new = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id"]}"#;

        let result = checker.check(SchemaType::Json, new, &[old]).unwrap();
        assert!(
            result.is_compatible,
            "Adding optional field should be backward compatible"
        );
    }

    #[test]
    fn test_json_backward_add_required_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let old = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;
        let new = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id","name"]}"#;

        let result = checker.check(SchemaType::Json, new, &[old]).unwrap();
        assert!(
            !result.is_compatible,
            "Adding required field should NOT be backward compatible"
        );
    }

    #[test]
    fn test_json_forward_remove_required_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Forward);

        let old = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id","name"]}"#;
        let new = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;

        let result = checker.check(SchemaType::Json, new, &[old]).unwrap();
        assert!(
            !result.is_compatible,
            "Removing required field should NOT be forward compatible"
        );
    }

    #[test]
    fn test_json_type_widening() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Backward);

        let old = r#"{"type":"object","properties":{"value":{"type":"integer"}}}"#;
        let new = r#"{"type":"object","properties":{"value":{"type":"number"}}}"#;

        let result = checker.check(SchemaType::Json, new, &[old]).unwrap();
        assert!(
            result.is_compatible,
            "integer -> number widening should be backward compatible"
        );
    }

    // Protobuf compatibility tests
    #[test]
    fn test_protobuf_field_number_reuse() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Full);

        let old = r#"
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;
        let new = r#"
            message User {
                int64 id = 1;
                string email = 2;
            }
        "#;

        let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
        assert!(
            !result.is_compatible,
            "Reusing field number with different name should be incompatible"
        );
    }

    #[test]
    fn test_protobuf_add_new_field() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Full);

        let old = r#"
            message User {
                int64 id = 1;
            }
        "#;
        let new = r#"
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;

        let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
        assert!(
            result.is_compatible,
            "Adding new field with new number should be compatible: {:?}",
            result.messages
        );
    }

    #[test]
    fn test_protobuf_reserved_field_reuse() {
        let checker = CompatibilityChecker::new(CompatibilityLevel::Full);

        let old = r#"
            message User {
                int64 id = 1;
                reserved 2, 3;
            }
        "#;
        let new = r#"
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;

        let result = checker.check(SchemaType::Protobuf, new, &[old]).unwrap();
        assert!(
            !result.is_compatible,
            "Reusing reserved field number should be incompatible"
        );
    }
}