Skip to main content

rivven_schema/
compatibility.rs

1//! Schema compatibility checking
2//!
3//! Supports Confluent-compatible compatibility levels for schema evolution.
4
5use crate::error::{SchemaError, SchemaResult};
6use crate::types::SchemaType;
7use serde::{Deserialize, Serialize};
8
9#[cfg(not(feature = "protobuf"))]
10use std::sync::LazyLock;
11
// Compiled regexes for protobuf compatibility checking (regex fallback when protox-parse unavailable)
// Each pattern is compiled once on first use; the `unwrap()` calls only guard
// the constant pattern literals below.

/// Matches a field declaration: an optional `optional`/`repeated`/`required`
/// label, a single-word type, the field name (capture 1) and the field number
/// after `=` (capture 2).
#[cfg(not(feature = "protobuf"))]
static PROTO_FIELD_RE: LazyLock<regex::Regex> = LazyLock::new(|| {
    regex::Regex::new(r"(?:(?:optional|repeated|required)\s+)?\w+\s+(\w+)\s*=\s*(\d+)").unwrap()
});
/// Matches `reserved` followed by a comma-separated list of decimal numbers
/// (capture 1). Only the leading number list is captured; range syntax
/// (`5 to 10`) and quoted names are not covered by this pattern.
#[cfg(not(feature = "protobuf"))]
static PROTO_RESERVED_RE: LazyLock<regex::Regex> =
    LazyLock::new(|| regex::Regex::new(r"reserved\s+(\d+(?:,\s*\d+)*)").unwrap());
/// Matches `required <type> <name>` and captures the field name (capture 1).
#[cfg(not(feature = "protobuf"))]
static PROTO_REQUIRED_RE: LazyLock<regex::Regex> =
    LazyLock::new(|| regex::Regex::new(r"required\s+\w+\s+(\w+)").unwrap());
23
24// Re-export CompatibilityLevel from types for backward compatibility
25pub use crate::types::CompatibilityLevel;
26
/// Result of a compatibility check
///
/// Build values with [`CompatibilityResult::compatible`] or
/// [`CompatibilityResult::incompatible`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompatibilityResult {
    /// Whether the schemas are compatible
    pub is_compatible: bool,
    /// Compatibility error messages (if any)
    ///
    /// Left out of the serialized form when empty, and defaults to an empty
    /// list when the field is absent during deserialization.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub messages: Vec<String>,
}
36
37impl CompatibilityResult {
38    pub fn compatible() -> Self {
39        Self {
40            is_compatible: true,
41            messages: Vec::new(),
42        }
43    }
44
45    pub fn incompatible(messages: Vec<String>) -> Self {
46        Self {
47            is_compatible: false,
48            messages,
49        }
50    }
51}
52
/// Schema compatibility checker
///
/// Holds the compatibility level that [`CompatibilityChecker::check`] applies
/// when comparing a new schema against existing ones.
pub struct CompatibilityChecker {
    /// Compatibility level enforced by this checker.
    level: CompatibilityLevel,
}
57
58// ── Protobuf AST helper types ──
59
/// Parsed field info from a `FieldDescriptorProto`.
#[cfg(feature = "protobuf")]
struct ProtoFieldInfo {
    /// Field name as found in the descriptor (empty when the descriptor has none).
    name: String,
    /// Human-readable type description (e.g., "int32", "string", ".Foo").
    type_desc: String,
    /// Whether this field has `required` label (proto2 only).
    is_required: bool,
}
69
/// Parsed message info with field map, reserved numbers, and reserved names.
#[cfg(feature = "protobuf")]
struct ProtoMessageInfo {
    /// Fields of the message, keyed by field number.
    fields: std::collections::HashMap<i32, ProtoFieldInfo>,
    /// Every individual field number covered by the message's reserved ranges.
    reserved_numbers: std::collections::HashSet<i32>,
    /// Field names declared as reserved in the message.
    reserved_names: std::collections::HashSet<String>,
}
77
/// Parsed enum info with value-number → value-name mapping.
#[cfg(feature = "protobuf")]
struct ProtoEnumInfo {
    /// Enum value names keyed by their number; when several values share a
    /// number, only the last one inserted is kept.
    values: std::collections::HashMap<i32, String>,
}
83
84impl CompatibilityChecker {
85    pub fn new(level: CompatibilityLevel) -> Self {
86        Self { level }
87    }
88
89    /// Check if a new schema is compatible with existing schemas
90    pub fn check(
91        &self,
92        schema_type: SchemaType,
93        new_schema: &str,
94        existing_schemas: &[&str],
95    ) -> SchemaResult<CompatibilityResult> {
96        if self.level == CompatibilityLevel::None {
97            return Ok(CompatibilityResult::compatible());
98        }
99
100        if existing_schemas.is_empty() {
101            return Ok(CompatibilityResult::compatible());
102        }
103
104        match schema_type {
105            SchemaType::Avro => self.check_avro(new_schema, existing_schemas),
106            SchemaType::Json => self.check_json(new_schema, existing_schemas),
107            SchemaType::Protobuf => self.check_protobuf(new_schema, existing_schemas),
108        }
109    }
110
111    /// Check Avro schema compatibility
112    ///
113    /// Uses Apache Avro schema resolution rules:
114    /// - BACKWARD: Reader (new) can read data written with Writer (old)
115    /// - FORWARD: Reader (old) can read data written with Writer (new)
116    /// - FULL: Both directions work
117    #[cfg(feature = "avro")]
118    fn check_avro(
119        &self,
120        new_schema: &str,
121        existing_schemas: &[&str],
122    ) -> SchemaResult<CompatibilityResult> {
123        use apache_avro::Schema;
124
125        let new = Schema::parse_str(new_schema)
126            .map_err(|e| SchemaError::ParseError(format!("New schema: {}", e)))?;
127
128        let schemas_to_check = if self.level.is_transitive() {
129            existing_schemas.to_vec()
130        } else {
131            // Only check against the latest
132            existing_schemas
133                .last()
134                .map(|s| vec![*s])
135                .unwrap_or_default()
136        };
137
138        let mut errors = Vec::new();
139
140        for (i, existing_str) in schemas_to_check.iter().enumerate() {
141            let existing = Schema::parse_str(existing_str)
142                .map_err(|e| SchemaError::ParseError(format!("Existing schema {}: {}", i, e)))?;
143
144            // Check backward compatibility: new schema (reader) can read data written with old schema (writer)
145            if self.level.is_backward() {
146                if let Err(e) = check_schema_resolution(&existing, &new) {
147                    errors.push(format!(
148                        "Backward incompatible with version {}: {}",
149                        i + 1,
150                        e
151                    ));
152                }
153            }
154
155            // Check forward compatibility: old schema (reader) can read data written with new schema (writer)
156            if self.level.is_forward() {
157                if let Err(e) = check_schema_resolution(&new, &existing) {
158                    errors.push(format!(
159                        "Forward incompatible with version {}: {}",
160                        i + 1,
161                        e
162                    ));
163                }
164            }
165        }
166
167        if errors.is_empty() {
168            Ok(CompatibilityResult::compatible())
169        } else {
170            Ok(CompatibilityResult::incompatible(errors))
171        }
172    }
173
174    #[cfg(not(feature = "avro"))]
175    fn check_avro(
176        &self,
177        _new_schema: &str,
178        _existing_schemas: &[&str],
179    ) -> SchemaResult<CompatibilityResult> {
180        Err(SchemaError::Config("Avro support not enabled".to_string()))
181    }
182
183    /// Check JSON Schema compatibility
184    ///
185    /// Implements JSON Schema evolution rules:
186    /// - BACKWARD: New schema can read old data (adding required fields is incompatible)
187    /// - FORWARD: Old schema can read new data (removing required fields is incompatible)
188    /// - FULL: Both directions must be compatible
189    fn check_json(
190        &self,
191        new_schema: &str,
192        existing_schemas: &[&str],
193    ) -> SchemaResult<CompatibilityResult> {
194        use serde_json::Value as JsonValue;
195
196        let new: JsonValue = serde_json::from_str(new_schema)
197            .map_err(|e| SchemaError::ParseError(format!("New JSON schema: {}", e)))?;
198
199        let schemas_to_check = if self.level.is_transitive() {
200            existing_schemas.to_vec()
201        } else {
202            existing_schemas
203                .last()
204                .map(|s| vec![*s])
205                .unwrap_or_default()
206        };
207
208        let mut messages = Vec::new();
209
210        for (i, existing_str) in schemas_to_check.iter().enumerate() {
211            let old: JsonValue = serde_json::from_str(existing_str)
212                .map_err(|e| SchemaError::ParseError(format!("Existing schema {}: {}", i, e)))?;
213
214            let result = self.check_json_pair(&new, &old)?;
215            if !result.is_compatible {
216                for msg in result.messages {
217                    messages.push(format!("Version {}: {}", i + 1, msg));
218                }
219            }
220        }
221
222        if messages.is_empty() {
223            Ok(CompatibilityResult::compatible())
224        } else {
225            Ok(CompatibilityResult::incompatible(messages))
226        }
227    }
228
229    /// Check compatibility between two JSON schemas (SCHEMA-01: recursive).
230    ///
231    /// Recursively walks `properties`, `items`, `additionalProperties`,
232    /// `oneOf`/`anyOf`/`allOf`, and `$defs`/`definitions` for full
233    /// structural compatibility checking. `$ref` is resolved within the
234    /// same document when pointing to `#/$defs/<name>` or `#/definitions/<name>`.
235    fn check_json_pair(
236        &self,
237        new: &serde_json::Value,
238        old: &serde_json::Value,
239    ) -> SchemaResult<CompatibilityResult> {
240        let mut messages = Vec::new();
241        self.check_json_recursive(new, old, "", &mut messages);
242
243        if messages.is_empty() {
244            Ok(CompatibilityResult::compatible())
245        } else {
246            Ok(CompatibilityResult::incompatible(messages))
247        }
248    }
249
250    /// Recursively check JSON schema compatibility at a given path.
251    ///
252    /// `depth` prevents stack overflow from circular `$ref` or deeply nested
253    /// schemas.  We cap at 64 levels — well beyond any realistic schema.
254    fn check_json_recursive(
255        &self,
256        new: &serde_json::Value,
257        old: &serde_json::Value,
258        path: &str,
259        messages: &mut Vec<String>,
260    ) {
261        self.check_json_recursive_inner(new, old, path, messages, 0);
262    }
263
264    /// Inner recursive implementation with depth tracking.
265    const MAX_JSON_DEPTH: usize = 64;
266
267    fn check_json_recursive_inner(
268        &self,
269        new: &serde_json::Value,
270        old: &serde_json::Value,
271        path: &str,
272        messages: &mut Vec<String>,
273        depth: usize,
274    ) {
275        if depth >= Self::MAX_JSON_DEPTH {
276            let prefix = if path.is_empty() {
277                String::new()
278            } else {
279                format!("{}: ", path)
280            };
281            messages.push(format!(
282                "{}schema nesting exceeds maximum depth ({})",
283                prefix,
284                Self::MAX_JSON_DEPTH
285            ));
286            return;
287        }
288        let prefix = if path.is_empty() {
289            String::new()
290        } else {
291            format!("{}: ", path)
292        };
293
294        // --- Type compatibility ---
295        if !self.json_types_compatible(old, new) && self.level.is_backward() {
296            messages.push(format!("{}BACKWARD incompatible: type changed", prefix));
297        }
298        if !self.json_types_compatible(new, old) && self.level.is_forward() {
299            messages.push(format!("{}FORWARD incompatible: type changed", prefix));
300        }
301
302        // --- Properties (object fields) ---
303        let new_props = new.get("properties").and_then(|p| p.as_object());
304        let old_props = old.get("properties").and_then(|p| p.as_object());
305
306        let new_required: Vec<&str> = new
307            .get("required")
308            .and_then(|r| r.as_array())
309            .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
310            .unwrap_or_default();
311        let old_required: Vec<&str> = old
312            .get("required")
313            .and_then(|r| r.as_array())
314            .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
315            .unwrap_or_default();
316
317        if let (Some(new_p), Some(old_p)) = (new_props, old_props) {
318            if self.level.is_backward() {
319                // Cannot add new required fields that didn't exist in old schema
320                for field in &new_required {
321                    if !old_required.contains(field) && !old_p.contains_key(*field) {
322                        messages.push(format!(
323                            "{}BACKWARD incompatible: new required field '{}'",
324                            prefix, field
325                        ));
326                    }
327                }
328                // Recurse into shared properties
329                for (name, old_def) in old_p {
330                    let child_path = if path.is_empty() {
331                        name.clone()
332                    } else {
333                        format!("{}.{}", path, name)
334                    };
335                    if let Some(new_def) = new_p.get(name) {
336                        self.check_json_recursive_inner(
337                            new_def,
338                            old_def,
339                            &child_path,
340                            messages,
341                            depth + 1,
342                        );
343                    }
344                }
345            }
346            if self.level.is_forward() {
347                // Cannot remove required fields
348                for field in &old_required {
349                    if !new_p.contains_key(*field) {
350                        messages.push(format!(
351                            "{}FORWARD incompatible: required field '{}' removed",
352                            prefix, field
353                        ));
354                    }
355                }
356                // Recurse into shared properties (reverse direction)
357                for (name, old_def) in old_p {
358                    let child_path = if path.is_empty() {
359                        name.clone()
360                    } else {
361                        format!("{}.{}", path, name)
362                    };
363                    if let Some(new_def) = new_p.get(name) {
364                        // For forward compat we check if old can read new
365                        // (type narrowing detection is already done above via json_types_compatible)
366                        self.check_json_recursive_inner(
367                            new_def,
368                            old_def,
369                            &child_path,
370                            messages,
371                            depth + 1,
372                        );
373                    }
374                }
375            }
376        }
377
378        // --- additionalProperties ---
379        if let (Some(new_ap), Some(old_ap)) = (
380            new.get("additionalProperties"),
381            old.get("additionalProperties"),
382        ) {
383            // If one schema disallows additional properties and the other allows them
384            let new_allows = !matches!(new_ap, serde_json::Value::Bool(false));
385            let old_allows = !matches!(old_ap, serde_json::Value::Bool(false));
386            if self.level.is_backward() && !new_allows && old_allows {
387                messages.push(format!(
388                    "{}BACKWARD incompatible: additionalProperties changed from allowed to disallowed",
389                    prefix
390                ));
391            }
392            if self.level.is_forward() && new_allows && !old_allows {
393                messages.push(format!(
394                    "{}FORWARD incompatible: additionalProperties changed from disallowed to allowed",
395                    prefix
396                ));
397            }
398            // If both are schemas, recurse
399            if new_ap.is_object() && old_ap.is_object() {
400                let child_path = if path.is_empty() {
401                    "additionalProperties".to_string()
402                } else {
403                    format!("{}.additionalProperties", path)
404                };
405                self.check_json_recursive_inner(new_ap, old_ap, &child_path, messages, depth + 1);
406            }
407        }
408
409        // --- items (array element schema) ---
410        if let (Some(new_items), Some(old_items)) = (new.get("items"), old.get("items")) {
411            let child_path = if path.is_empty() {
412                "items".to_string()
413            } else {
414                format!("{}[]", path)
415            };
416            self.check_json_recursive_inner(new_items, old_items, &child_path, messages, depth + 1);
417        }
418
419        // --- enum value constraints ---
420        if let (Some(new_enum), Some(old_enum)) = (
421            new.get("enum").and_then(|e| e.as_array()),
422            old.get("enum").and_then(|e| e.as_array()),
423        ) {
424            if self.level.is_backward() {
425                // Old data may contain values removed from new enum
426                for val in old_enum {
427                    if !new_enum.contains(val) {
428                        messages.push(format!(
429                            "{}BACKWARD incompatible: enum value {:?} removed",
430                            prefix, val
431                        ));
432                    }
433                }
434            }
435            if self.level.is_forward() {
436                // New data may contain values not in old enum
437                for val in new_enum {
438                    if !old_enum.contains(val) {
439                        messages.push(format!(
440                            "{}FORWARD incompatible: new enum value {:?} added",
441                            prefix, val
442                        ));
443                    }
444                }
445            }
446        }
447    }
448
449    /// Check if JSON Schema types are compatible
450    fn json_types_compatible(
451        &self,
452        old_type: &serde_json::Value,
453        new_type: &serde_json::Value,
454    ) -> bool {
455        let old_t = old_type.get("type").and_then(|t| t.as_str());
456        let new_t = new_type.get("type").and_then(|t| t.as_str());
457
458        match (old_t, new_t) {
459            (Some(old), Some(new)) => {
460                // Same type is always compatible
461                if old == new {
462                    return true;
463                }
464
465                // Allow widening: integer -> number
466                if old == "integer" && new == "number" {
467                    return true;
468                }
469
470                false
471            }
472            // If type is missing, assume object (compatible)
473            (None, None) => true,
474            _ => false,
475        }
476    }
477
478    /// Check Protobuf schema compatibility
479    ///
480    /// Implements Protobuf evolution rules using a proper protobuf AST parser:
481    /// - Field numbers cannot be reused with different names
482    /// - Field type changes are detected (including in nested messages)
483    /// - Reserved field numbers and names cannot be reused
484    /// - Required fields cannot be removed (proto2)
485    /// - Enum value removal is detected
486    /// - Nested messages and `oneof` fields are handled correctly
487    /// - `map<K,V>` fields are parsed properly
488    #[cfg(feature = "protobuf")]
489    fn check_protobuf(
490        &self,
491        new_schema: &str,
492        existing_schemas: &[&str],
493    ) -> SchemaResult<CompatibilityResult> {
494        let schemas_to_check = if self.level.is_transitive() {
495            existing_schemas.to_vec()
496        } else {
497            existing_schemas
498                .last()
499                .map(|s| vec![*s])
500                .unwrap_or_default()
501        };
502
503        let mut messages = Vec::new();
504
505        for (i, existing_str) in schemas_to_check.iter().enumerate() {
506            let result = self.check_protobuf_pair(new_schema, existing_str)?;
507            if !result.is_compatible {
508                for msg in result.messages {
509                    messages.push(format!("Version {}: {}", i + 1, msg));
510                }
511            }
512        }
513
514        if messages.is_empty() {
515            Ok(CompatibilityResult::compatible())
516        } else {
517            Ok(CompatibilityResult::incompatible(messages))
518        }
519    }
520
521    /// Check compatibility between two Protobuf schemas using AST-based analysis.
522    ///
523    /// Parses both schemas with `protox-parse` and compares:
524    /// 1. Field number reuse with different names
525    /// 2. Field type changes (wire-incompatible)
526    /// 3. Reserved field number reuse
527    /// 4. Reserved field name reuse
528    /// 5. Required field removal (proto2)
529    /// 6. Enum value removal
530    #[cfg(feature = "protobuf")]
531    fn check_protobuf_pair(
532        &self,
533        new_schema: &str,
534        existing_schema: &str,
535    ) -> SchemaResult<CompatibilityResult> {
536        use std::collections::HashMap;
537
538        let old_file = protox_parse::parse("existing.proto", existing_schema)
539            .map_err(|e| SchemaError::ParseError(format!("Existing protobuf schema: {e}")))?;
540        let new_file = protox_parse::parse("new.proto", new_schema)
541            .map_err(|e| SchemaError::ParseError(format!("New protobuf schema: {e}")))?;
542
543        let mut errors = Vec::new();
544
545        // Extract all messages recursively (handles nested messages, map entries, etc.)
546        let mut old_msgs: HashMap<String, ProtoMessageInfo> = HashMap::new();
547        let mut old_enums: HashMap<String, ProtoEnumInfo> = HashMap::new();
548        Self::extract_messages_recursive(&old_file.message_type, "", &mut old_msgs, &mut old_enums);
549        Self::extract_enums(&old_file.enum_type, "", &mut old_enums);
550
551        let mut new_msgs: HashMap<String, ProtoMessageInfo> = HashMap::new();
552        let mut new_enums: HashMap<String, ProtoEnumInfo> = HashMap::new();
553        Self::extract_messages_recursive(&new_file.message_type, "", &mut new_msgs, &mut new_enums);
554        Self::extract_enums(&new_file.enum_type, "", &mut new_enums);
555
556        // Compare messages
557        for (msg_name, old_msg) in &old_msgs {
558            if let Some(new_msg) = new_msgs.get(msg_name) {
559                // Rule 1 & 2: Field number reuse with different name or type
560                for (num, old_field) in &old_msg.fields {
561                    if let Some(new_field) = new_msg.fields.get(num) {
562                        if old_field.name != new_field.name {
563                            errors.push(format!(
564                                "PROTOBUF incompatible: {}: field number {} reused (was '{}', now '{}')",
565                                msg_name, num, old_field.name, new_field.name
566                            ));
567                        }
568                        if old_field.type_desc != new_field.type_desc {
569                            errors.push(format!(
570                                "PROTOBUF incompatible: {}: field {} ('{}') type changed '{}' → '{}'",
571                                msg_name, num, old_field.name, old_field.type_desc, new_field.type_desc
572                            ));
573                        }
574                    }
575                }
576
577                // Rule 3: Reserved number reuse
578                for (num, field) in &new_msg.fields {
579                    if old_msg.reserved_numbers.contains(num) {
580                        errors.push(format!(
581                            "PROTOBUF incompatible: {}: field '{}' uses reserved number {}",
582                            msg_name, field.name, num
583                        ));
584                    }
585                }
586
587                // Rule 4: Reserved name reuse
588                for field in new_msg.fields.values() {
589                    if old_msg.reserved_names.contains(&field.name) {
590                        errors.push(format!(
591                            "PROTOBUF incompatible: {}: field name '{}' is reserved",
592                            msg_name, field.name
593                        ));
594                    }
595                }
596
597                // Rule 5: Required fields cannot be removed (proto2)
598                for (num, field) in &old_msg.fields {
599                    if field.is_required && !new_msg.fields.contains_key(num) {
600                        errors.push(format!(
601                            "PROTOBUF incompatible: {}: required field '{}' (number {}) removed",
602                            msg_name, field.name, num
603                        ));
604                    }
605                }
606            }
607        }
608
609        // Rule 6: Enum value removal
610        for (enum_name, old_enum) in &old_enums {
611            if let Some(new_enum) = new_enums.get(enum_name) {
612                for (num, name) in &old_enum.values {
613                    if !new_enum.values.contains_key(num) {
614                        errors.push(format!(
615                            "PROTOBUF incompatible: {}: enum value '{}' = {} removed",
616                            enum_name, name, num
617                        ));
618                    }
619                }
620            }
621        }
622
623        if errors.is_empty() {
624            Ok(CompatibilityResult::compatible())
625        } else {
626            Ok(CompatibilityResult::incompatible(errors))
627        }
628    }
629
630    /// Extract messages recursively from a `DescriptorProto` list.
631    ///
632    /// Walks nested messages (including synthetic map-entry messages) and builds
633    /// a flat map of fully-qualified message name → field info.
634    #[cfg(feature = "protobuf")]
635    fn extract_messages_recursive(
636        messages: &[prost_types::DescriptorProto],
637        prefix: &str,
638        out_msgs: &mut std::collections::HashMap<String, ProtoMessageInfo>,
639        out_enums: &mut std::collections::HashMap<String, ProtoEnumInfo>,
640    ) {
641        for msg in messages {
642            let name = msg.name.as_deref().unwrap_or("<unknown>");
643            let fqn = if prefix.is_empty() {
644                name.to_string()
645            } else {
646                format!("{}.{}", prefix, name)
647            };
648
649            // Skip synthetic map-entry messages — they are implementation details
650            // of `map<K,V>` fields, not user-visible message types.
651            let is_map_entry = msg
652                .options
653                .as_ref()
654                .and_then(|o| o.map_entry)
655                .unwrap_or(false);
656
657            if !is_map_entry {
658                let mut fields = std::collections::HashMap::new();
659                let mut reserved_numbers = std::collections::HashSet::new();
660                let mut reserved_names = std::collections::HashSet::new();
661
662                for field in &msg.field {
663                    let field_name = field.name.clone().unwrap_or_default();
664                    let field_number = field.number.unwrap_or(0);
665                    let type_desc = Self::proto_type_description(field);
666                    let is_required = field.label == Some(2); // LABEL_REQUIRED = 2
667
668                    fields.insert(
669                        field_number,
670                        ProtoFieldInfo {
671                            name: field_name,
672                            type_desc,
673                            is_required,
674                        },
675                    );
676                }
677
678                // Reserved ranges (e.g., `reserved 5 to 10;`)
679                for range in &msg.reserved_range {
680                    let start = range.start.unwrap_or(0);
681                    let end = range.end.unwrap_or(start); // end is exclusive in descriptor
682                    for num in start..end {
683                        reserved_numbers.insert(num);
684                    }
685                }
686
687                // Reserved names (e.g., `reserved "old_field";`)
688                for name in &msg.reserved_name {
689                    reserved_names.insert(name.clone());
690                }
691
692                out_msgs.insert(
693                    fqn.clone(),
694                    ProtoMessageInfo {
695                        fields,
696                        reserved_numbers,
697                        reserved_names,
698                    },
699                );
700            }
701
702            // Recurse into nested messages
703            Self::extract_messages_recursive(&msg.nested_type, &fqn, out_msgs, out_enums);
704
705            // Extract nested enums
706            Self::extract_enums(&msg.enum_type, &fqn, out_enums);
707        }
708    }
709
710    /// Extract enum definitions into a flat map.
711    #[cfg(feature = "protobuf")]
712    fn extract_enums(
713        enums: &[prost_types::EnumDescriptorProto],
714        prefix: &str,
715        out: &mut std::collections::HashMap<String, ProtoEnumInfo>,
716    ) {
717        for e in enums {
718            let name = e.name.as_deref().unwrap_or("<unknown>");
719            let fqn = if prefix.is_empty() {
720                name.to_string()
721            } else {
722                format!("{}.{}", prefix, name)
723            };
724
725            let mut values = std::collections::HashMap::new();
726            for v in &e.value {
727                let val_name = v.name.clone().unwrap_or_default();
728                let val_number = v.number.unwrap_or(0);
729                values.insert(val_number, val_name);
730            }
731
732            out.insert(fqn, ProtoEnumInfo { values });
733        }
734    }
735
736    /// Build a human-readable type description for a protobuf field.
737    ///
738    /// For scalar types, returns the type name (e.g., "int32", "string").
739    /// For message/enum references, returns the type_name from the descriptor.
740    #[cfg(feature = "protobuf")]
741    fn proto_type_description(field: &prost_types::FieldDescriptorProto) -> String {
742        let type_val = field.r#type.unwrap_or(0);
743        match type_val {
744            1 => "double".into(),
745            2 => "float".into(),
746            3 => "int64".into(),
747            4 => "uint64".into(),
748            5 => "int32".into(),
749            6 => "fixed64".into(),
750            7 => "fixed32".into(),
751            8 => "bool".into(),
752            9 => "string".into(),
753            10 => "group".into(),
754            11 | 14 => {
755                // TYPE_MESSAGE (11) or TYPE_ENUM (14) — use type_name
756                field
757                    .type_name
758                    .clone()
759                    .unwrap_or_else(|| format!("type_{}", type_val))
760            }
761            12 => "bytes".into(),
762            13 => "uint32".into(),
763            15 => "sfixed32".into(),
764            16 => "sfixed64".into(),
765            17 => "sint32".into(),
766            18 => "sint64".into(),
767            _ => format!("unknown_{}", type_val),
768        }
769    }
770
771    // ── Regex-based fallback when `protobuf` feature is disabled ──
772
773    /// Check Protobuf schema compatibility (regex fallback)
774    ///
775    /// # Limitations
776    ///
777    /// This is a regex-based heuristic fallback used when the `protobuf` feature
778    /// is not enabled. Enable the `protobuf` feature for proper AST-based checking.
779    #[cfg(not(feature = "protobuf"))]
780    fn check_protobuf(
781        &self,
782        new_schema: &str,
783        existing_schemas: &[&str],
784    ) -> SchemaResult<CompatibilityResult> {
785        let schemas_to_check = if self.level.is_transitive() {
786            existing_schemas.to_vec()
787        } else {
788            existing_schemas
789                .last()
790                .map(|s| vec![*s])
791                .unwrap_or_default()
792        };
793
794        let mut messages = Vec::new();
795
796        for (i, existing_str) in schemas_to_check.iter().enumerate() {
797            let result = self.check_protobuf_pair(new_schema, existing_str)?;
798            if !result.is_compatible {
799                for msg in result.messages {
800                    messages.push(format!("Version {}: {}", i + 1, msg));
801                }
802            }
803        }
804
805        if messages.is_empty() {
806            Ok(CompatibilityResult::compatible())
807        } else {
808            Ok(CompatibilityResult::incompatible(messages))
809        }
810    }
811
812    /// Check compatibility between two Protobuf schemas (regex fallback)
813    #[cfg(not(feature = "protobuf"))]
814    fn check_protobuf_pair(
815        &self,
816        new_schema: &str,
817        existing_schema: &str,
818    ) -> SchemaResult<CompatibilityResult> {
819        use std::collections::{HashMap, HashSet};
820
821        let mut messages = Vec::new();
822
823        // Strip comments before regex extraction to avoid
824        // matching field numbers in comments or strings.
825        let strip_comments = |s: &str| -> String {
826            let mut result = String::with_capacity(s.len());
827            let mut chars = s.chars().peekable();
828            while let Some(c) = chars.next() {
829                if c == '/' {
830                    if chars.peek() == Some(&'/') {
831                        // Line comment — skip to end of line
832                        for ch in chars.by_ref() {
833                            if ch == '\n' {
834                                result.push('\n');
835                                break;
836                            }
837                        }
838                        continue;
839                    } else if chars.peek() == Some(&'*') {
840                        // Block comment — skip to */
841                        chars.next(); // consume *
842                        loop {
843                            match chars.next() {
844                                Some('*') if chars.peek() == Some(&'/') => {
845                                    chars.next();
846                                    break;
847                                }
848                                Some(_) => continue,
849                                None => break,
850                            }
851                        }
852                        continue;
853                    }
854                }
855                result.push(c);
856            }
857            result
858        };
859
860        let clean_existing = strip_comments(existing_schema);
861        let clean_new = strip_comments(new_schema);
862
863        // Extract field numbers from both schemas using regex
864        // Pattern matches: [optional|repeated|required] type field_name = field_number
865        // The label is optional (proto3 omits it), and we require a type word
866        // before the field name to avoid matching option values or reserved numbers.
867        let field_pattern = &*PROTO_FIELD_RE;
868
869        let old_fields: HashMap<u32, String> = field_pattern
870            .captures_iter(&clean_existing)
871            .filter_map(|cap| {
872                let name = cap.get(1)?.as_str().to_string();
873                let num: u32 = cap.get(2)?.as_str().parse().ok()?;
874                Some((num, name))
875            })
876            .collect();
877
878        let new_fields: HashMap<u32, String> = field_pattern
879            .captures_iter(&clean_new)
880            .filter_map(|cap| {
881                let name = cap.get(1)?.as_str().to_string();
882                let num: u32 = cap.get(2)?.as_str().parse().ok()?;
883                Some((num, name))
884            })
885            .collect();
886
887        // Rule 1: Field numbers cannot be reused with different names
888        for (num, old_name) in &old_fields {
889            if let Some(new_name) = new_fields.get(num) {
890                if old_name != new_name {
891                    messages.push(format!(
892                        "PROTOBUF incompatible: field number {} reused (was '{}', now '{}')",
893                        num, old_name, new_name
894                    ));
895                }
896            }
897        }
898
899        // Rule 2: Reserved field numbers cannot be reused
900        let reserved_pattern = &*PROTO_RESERVED_RE;
901
902        let old_reserved: HashSet<u32> = reserved_pattern
903            .captures_iter(&clean_existing)
904            .flat_map(|cap| {
905                cap.get(1)
906                    .map(|m| {
907                        m.as_str()
908                            .split(',')
909                            .filter_map(|n| n.trim().parse().ok())
910                            .collect::<Vec<u32>>()
911                    })
912                    .unwrap_or_default()
913            })
914            .collect();
915
916        for (num, name) in &new_fields {
917            if old_reserved.contains(num) {
918                messages.push(format!(
919                    "PROTOBUF incompatible: field '{}' uses reserved number {}",
920                    name, num
921                ));
922            }
923        }
924
925        // Rule 3: Required fields cannot be removed (proto2)
926        let required_pattern = &*PROTO_REQUIRED_RE;
927
928        let old_required: HashSet<&str> = required_pattern
929            .captures_iter(&clean_existing)
930            .filter_map(|cap| cap.get(1).map(|m| m.as_str()))
931            .collect();
932
933        for required_name in old_required {
934            if !new_fields.values().any(|n| n == required_name) {
935                messages.push(format!(
936                    "PROTOBUF incompatible: required field '{}' removed",
937                    required_name
938                ));
939            }
940        }
941
942        if messages.is_empty() {
943            Ok(CompatibilityResult::compatible())
944        } else {
945            Ok(CompatibilityResult::incompatible(messages))
946        }
947    }
948}
949
/// Check if a reader schema can read data written with a writer schema
///
/// This implements Avro schema resolution rules:
/// - Identical primitive types are compatible
/// - Type promotion rules (int -> long -> float -> double, string <-> bytes)
/// - Arrays and maps are compatible when their item/value schemas are
/// - Enums require every writer symbol to exist in the reader
/// - Fixed types require equal sizes (names are not compared here)
/// - Records match fields by name (reader aliases supported); reader fields
///   missing from the writer need a default (record names are not compared here)
/// - Unions: **every** writer branch must be readable, because any branch may
///   have been used when the data was written
///
/// # Errors
///
/// Returns `Err` with a human-readable description of the first
/// incompatibility found.
#[cfg(feature = "avro")]
fn check_schema_resolution(
    writer: &apache_avro::Schema,
    reader: &apache_avro::Schema,
) -> Result<(), String> {
    use apache_avro::Schema;

    match (writer, reader) {
        // Same primitive type is always compatible
        (Schema::Null, Schema::Null)
        | (Schema::Boolean, Schema::Boolean)
        | (Schema::String, Schema::String)
        | (Schema::Bytes, Schema::Bytes) => Ok(()),

        // Numeric promotions: int -> long -> float -> double
        (Schema::Int, Schema::Int | Schema::Long | Schema::Float | Schema::Double)
        | (Schema::Long, Schema::Long | Schema::Float | Schema::Double)
        | (Schema::Float, Schema::Float | Schema::Double)
        | (Schema::Double, Schema::Double) => Ok(()),

        // String <-> Bytes promotion
        (Schema::String, Schema::Bytes) | (Schema::Bytes, Schema::String) => Ok(()),

        // Arrays: element types must be compatible
        (Schema::Array(w), Schema::Array(r)) => check_schema_resolution(&w.items, &r.items),

        // Maps: value types must be compatible
        (Schema::Map(w), Schema::Map(r)) => check_schema_resolution(&w.types, &r.types),

        // Enums: reader symbols must be superset of writer symbols
        (Schema::Enum(w), Schema::Enum(r)) => {
            for symbol in &w.symbols {
                if !r.symbols.contains(symbol) {
                    return Err(format!(
                        "Enum symbol '{}' in writer not found in reader",
                        symbol
                    ));
                }
            }
            Ok(())
        }

        // Fixed: sizes must match (the type name is not compared)
        (Schema::Fixed(w), Schema::Fixed(r)) => {
            if w.size != r.size {
                return Err(format!(
                    "Fixed size mismatch: writer={}, reader={}",
                    w.size, r.size
                ));
            }
            Ok(())
        }

        // Records: check field compatibility
        (Schema::Record(w), Schema::Record(r)) => {
            // Every writer field that the reader also declares (by name or by
            // reader alias) must have a resolvable type. Writer fields unknown
            // to the reader are simply ignored by the reader.
            for w_field in &w.fields {
                let r_field = r.fields.iter().find(|rf| {
                    rf.name == w_field.name
                        || rf
                            .aliases
                            .as_ref()
                            .is_some_and(|a| a.contains(&w_field.name))
                });

                if let Some(rf) = r_field {
                    check_schema_resolution(&w_field.schema, &rf.schema)?;
                }
            }

            // Every reader field needs a value: from the writer or from a default
            for r_field in &r.fields {
                let written = w.fields.iter().any(|wf| {
                    wf.name == r_field.name
                        || r_field
                            .aliases
                            .as_ref()
                            .is_some_and(|a| a.contains(&wf.name))
                });

                if !written && r_field.default.is_none() {
                    return Err(format!(
                        "Reader field '{}' has no matching writer field and no default",
                        r_field.name
                    ));
                }
            }

            Ok(())
        }

        // Unions: every writer variant must be resolvable to some reader variant
        (Schema::Union(w), Schema::Union(r)) => {
            for w_variant in w.variants() {
                let compatible = r
                    .variants()
                    .iter()
                    .any(|rv| check_schema_resolution(w_variant, rv).is_ok());
                if !compatible {
                    return Err(format!(
                        "Writer union variant {:?} not compatible with any reader variant",
                        w_variant
                    ));
                }
            }
            Ok(())
        }

        // Non-union writer with union reader: writer must match one reader variant
        (w, Schema::Union(r)) => {
            let compatible = r
                .variants()
                .iter()
                .any(|rv| check_schema_resolution(w, rv).is_ok());
            if compatible {
                Ok(())
            } else {
                Err(format!(
                    "Writer schema {:?} not compatible with reader union",
                    w
                ))
            }
        }

        // Union writer with non-union reader: the writer may have emitted any
        // of its variants, so every one of them must be readable. This mirrors
        // the union/union arm above, treating the reader as a single-variant
        // union.
        (Schema::Union(w), r) => {
            for w_variant in w.variants() {
                if check_schema_resolution(w_variant, r).is_err() {
                    return Err(format!(
                        "Writer union variant {:?} not compatible with reader {:?}",
                        w_variant, r
                    ));
                }
            }
            Ok(())
        }

        // Incompatible types
        (w, r) => Err(format!(
            "Incompatible types: writer={:?}, reader={:?}",
            w, r
        )),
    }
}
1118
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a checker for `level`, runs one compatibility check and unwraps
    /// the outcome so each test can focus on its schemas and assertion.
    fn run_check(
        level: CompatibilityLevel,
        schema_type: SchemaType,
        new_schema: &str,
        existing_schemas: &[&str],
    ) -> CompatibilityResult {
        CompatibilityChecker::new(level)
            .check(schema_type, new_schema, existing_schemas)
            .unwrap()
    }

    #[test]
    fn test_compatibility_level_properties() {
        assert!(CompatibilityLevel::Backward.is_backward());
        assert!(!CompatibilityLevel::Backward.is_forward());
        assert!(!CompatibilityLevel::Backward.is_transitive());

        assert!(!CompatibilityLevel::Forward.is_backward());
        assert!(CompatibilityLevel::Forward.is_forward());

        assert!(CompatibilityLevel::Full.is_backward());
        assert!(CompatibilityLevel::Full.is_forward());

        assert!(CompatibilityLevel::FullTransitive.is_transitive());
    }

    #[test]
    fn test_compatibility_level_parse() {
        assert_eq!(
            "BACKWARD".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::Backward
        );
        assert_eq!(
            "FULL_TRANSITIVE".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::FullTransitive
        );
        assert_eq!(
            "NONE".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::None
        );
    }

    #[test]
    fn test_none_compatibility_always_passes() {
        let result = run_check(
            CompatibilityLevel::None,
            SchemaType::Avro,
            "{}",
            &["{\"invalid\"}"],
        );
        assert!(result.is_compatible);
    }

    #[test]
    fn test_empty_existing_schemas() {
        let result = run_check(CompatibilityLevel::Backward, SchemaType::Avro, "{}", &[]);
        assert!(result.is_compatible);
    }

    #[cfg(feature = "avro")]
    #[test]
    fn test_backward_compatible_add_optional_field() {
        let v1 =
            r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}]}"#;
        let v2 = r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": ["null", "string"], "default": null}]}"#;

        let result = run_check(CompatibilityLevel::Backward, SchemaType::Avro, v2, &[v1]);
        assert!(
            result.is_compatible,
            "Adding optional field should be backward compatible"
        );
    }

    #[cfg(feature = "avro")]
    #[test]
    fn test_backward_incompatible_add_required_field() {
        // v1 has only 'id', v2 adds 'name' WITHOUT a default
        // Old data doesn't have 'name', so new reader can't read it
        let v1 =
            r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}]}"#;
        let v2 = r#"{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}]}"#;

        let result = run_check(CompatibilityLevel::Backward, SchemaType::Avro, v2, &[v1]);
        assert!(
            !result.is_compatible,
            "Adding required field (no default) should be backward incompatible"
        );
    }

    #[cfg(feature = "avro")]
    #[test]
    fn test_numeric_type_promotion() {
        let v1 =
            r#"{"type": "record", "name": "Data", "fields": [{"name": "value", "type": "int"}]}"#;
        let v2 =
            r#"{"type": "record", "name": "Data", "fields": [{"name": "value", "type": "long"}]}"#;

        let result = run_check(CompatibilityLevel::Backward, SchemaType::Avro, v2, &[v1]);
        assert!(
            result.is_compatible,
            "int -> long promotion should be backward compatible"
        );
    }

    // JSON Schema compatibility tests
    #[test]
    fn test_json_backward_add_optional_field() {
        let old = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;
        let new = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id"]}"#;

        let result = run_check(CompatibilityLevel::Backward, SchemaType::Json, new, &[old]);
        assert!(
            result.is_compatible,
            "Adding optional field should be backward compatible"
        );
    }

    #[test]
    fn test_json_backward_add_required_field() {
        let old = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;
        let new = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id","name"]}"#;

        let result = run_check(CompatibilityLevel::Backward, SchemaType::Json, new, &[old]);
        assert!(
            !result.is_compatible,
            "Adding required field should NOT be backward compatible"
        );
    }

    #[test]
    fn test_json_forward_remove_required_field() {
        let old = r#"{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id","name"]}"#;
        let new = r#"{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}"#;

        let result = run_check(CompatibilityLevel::Forward, SchemaType::Json, new, &[old]);
        assert!(
            !result.is_compatible,
            "Removing required field should NOT be forward compatible"
        );
    }

    #[test]
    fn test_json_type_widening() {
        let old = r#"{"type":"object","properties":{"value":{"type":"integer"}}}"#;
        let new = r#"{"type":"object","properties":{"value":{"type":"number"}}}"#;

        let result = run_check(CompatibilityLevel::Backward, SchemaType::Json, new, &[old]);
        assert!(
            result.is_compatible,
            "integer -> number widening should be backward compatible"
        );
    }

    // Protobuf compatibility tests
    //
    // Tests gated on `feature = "protobuf"` assert behaviour that needs the
    // AST-based checker (field types, nested messages, reserved ranges and
    // names, enum values); the regex fallback does not detect those changes.
    #[test]
    fn test_protobuf_field_number_reuse() {
        let old = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
                string email = 2;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            !result.is_compatible,
            "Reusing field number with different name should be incompatible"
        );
    }

    #[test]
    fn test_protobuf_add_new_field() {
        let old = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            result.is_compatible,
            "Adding new field with new number should be compatible: {:?}",
            result.messages
        );
    }

    #[test]
    fn test_protobuf_reserved_field_reuse() {
        let old = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
                reserved 2, 3;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            !result.is_compatible,
            "Reusing reserved field number should be incompatible"
        );
    }

    #[cfg(feature = "protobuf")]
    #[test]
    fn test_protobuf_field_type_change() {
        let old = r#"
            syntax = "proto3";
            message Event {
                int32 id = 1;
                string payload = 2;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            message Event {
                int32 id = 1;
                bytes payload = 2;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            !result.is_compatible,
            "Changing field type should be incompatible: {:?}",
            result.messages
        );
    }

    #[cfg(feature = "protobuf")]
    #[test]
    fn test_protobuf_nested_message() {
        let old = r#"
            syntax = "proto3";
            message Outer {
                int32 id = 1;
                message Inner {
                    string value = 1;
                }
                Inner data = 2;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            message Outer {
                int32 id = 1;
                message Inner {
                    int32 value = 1;
                }
                Inner data = 2;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            !result.is_compatible,
            "Type change in nested message should be detected: {:?}",
            result.messages
        );
    }

    #[cfg(feature = "protobuf")]
    #[test]
    fn test_protobuf_reserved_range() {
        let old = r#"
            syntax = "proto3";
            message Data {
                int32 id = 1;
                reserved 5 to 10;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            message Data {
                int32 id = 1;
                string tag = 7;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            !result.is_compatible,
            "Using field number from reserved range should be incompatible"
        );
    }

    #[cfg(feature = "protobuf")]
    #[test]
    fn test_protobuf_reserved_name() {
        let old = r#"
            syntax = "proto3";
            message Data {
                int32 id = 1;
                reserved "old_field";
            }
        "#;
        let new = r#"
            syntax = "proto3";
            message Data {
                int32 id = 1;
                string old_field = 2;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            !result.is_compatible,
            "Reusing reserved field name should be incompatible"
        );
    }

    #[cfg(feature = "protobuf")]
    #[test]
    fn test_protobuf_enum_value_removal() {
        let old = r#"
            syntax = "proto3";
            enum Status {
                UNKNOWN = 0;
                ACTIVE = 1;
                INACTIVE = 2;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            enum Status {
                UNKNOWN = 0;
                ACTIVE = 1;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            !result.is_compatible,
            "Removing enum value should be incompatible"
        );
    }

    #[test]
    fn test_protobuf_enum_value_addition() {
        let old = r#"
            syntax = "proto3";
            enum Status {
                UNKNOWN = 0;
                ACTIVE = 1;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            enum Status {
                UNKNOWN = 0;
                ACTIVE = 1;
                INACTIVE = 2;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            result.is_compatible,
            "Adding enum value should be compatible: {:?}",
            result.messages
        );
    }

    #[test]
    fn test_protobuf_map_field() {
        let old = r#"
            syntax = "proto3";
            message Config {
                int32 id = 1;
                map<string, string> labels = 2;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            message Config {
                int32 id = 1;
                map<string, string> labels = 2;
                string name = 3;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            result.is_compatible,
            "Adding a new field alongside a map field should be compatible: {:?}",
            result.messages
        );
    }

    #[test]
    fn test_protobuf_proto2_required_field_removal() {
        let old = r#"
            syntax = "proto2";
            message User {
                required int64 id = 1;
                required string name = 2;
            }
        "#;
        let new = r#"
            syntax = "proto2";
            message User {
                required int64 id = 1;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            !result.is_compatible,
            "Removing required field in proto2 should be incompatible"
        );
    }

    #[test]
    fn test_protobuf_compatible_evolution() {
        let old = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
                string name = 2;
            }
        "#;
        let new = r#"
            syntax = "proto3";
            message User {
                int64 id = 1;
                string name = 2;
                string email = 3;
                int32 age = 4;
            }
        "#;

        let result = run_check(CompatibilityLevel::Full, SchemaType::Protobuf, new, &[old]);
        assert!(
            result.is_compatible,
            "Adding new fields with new numbers should be fully compatible: {:?}",
            result.messages
        );
    }
}