Skip to main content

tl_compiler/
schema.rs

1// ThinkingLanguage — Schema Registry & Evolution
2// Licensed under MIT OR Apache-2.0
3//
4// Phase 21: Versioned schema registry with compatibility checking.
5// When the `native` feature is disabled (WASM builds), a minimal stub is provided.
6
7#[cfg(feature = "native")]
8mod native {
9    use std::collections::HashMap;
10    use std::sync::Arc;
11    use tl_data::{ArrowDataType, ArrowField, ArrowSchema};
12
13    /// Runtime schema registry for versioned schemas.
14    #[derive(Debug, Clone, Default)]
15    pub struct SchemaRegistry {
16        /// Map of schema name → list of (version, schema) ordered by version
17        schemas: HashMap<String, Vec<VersionedSchema>>,
18        /// Registered migrations: (schema, from_ver, to_ver) → ops
19        migrations: HashMap<(String, i64, i64), Vec<MigrationOp>>,
20    }
21
22    /// A versioned schema entry.
23    #[derive(Debug, Clone)]
24    pub struct VersionedSchema {
25        pub version: i64,
26        pub schema: Arc<ArrowSchema>,
27        pub metadata: SchemaMetadata,
28    }
29
30    /// Metadata about a versioned schema's fields.
31    #[derive(Debug, Clone, Default)]
32    pub struct SchemaMetadata {
33        /// field name → version when added
34        pub field_since: HashMap<String, i64>,
35        /// field name → version when deprecated
36        pub field_deprecated: HashMap<String, i64>,
37        /// field name → default value as string
38        pub field_defaults: HashMap<String, String>,
39    }
40
41    /// Compatibility mode for schema evolution checking.
42    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
43    pub enum CompatibilityMode {
44        /// New schema can read old data (additions OK, removals NOT OK)
45        Backward,
46        /// Old schema can read new data (removals OK, additions with defaults OK)
47        Forward,
48        /// Both backward and forward compatible
49        Full,
50        /// No compatibility checking
51        None,
52    }
53
54    /// Difference between two schema versions.
55    #[derive(Debug, Clone, PartialEq)]
56    pub enum SchemaDiff {
57        FieldAdded {
58            name: String,
59            type_name: String,
60        },
61        FieldRemoved {
62            name: String,
63        },
64        FieldRenamed {
65            from: String,
66            to: String,
67        },
68        TypeChanged {
69            field: String,
70            from: String,
71            to: String,
72        },
73    }
74
75    /// A compatibility issue found during checking.
76    #[derive(Debug, Clone, PartialEq)]
77    pub enum CompatIssue {
78        FieldRemovedNotBackward(String),
79        FieldAddedNoDefault(String),
80        TypeNarrowed {
81            field: String,
82            from: String,
83            to: String,
84        },
85    }
86
87    /// A migration operation stored in the registry.
88    #[derive(Debug, Clone)]
89    pub enum MigrationOp {
90        AddColumn {
91            name: String,
92            type_name: String,
93            default: Option<String>,
94        },
95        DropColumn {
96            name: String,
97        },
98        RenameColumn {
99            from: String,
100            to: String,
101        },
102        AlterType {
103            column: String,
104            new_type: String,
105        },
106    }
107
108    impl SchemaRegistry {
109        pub fn new() -> Self {
110            Self::default()
111        }
112
113        /// Register a versioned schema. Returns error if version already exists.
114        pub fn register(
115            &mut self,
116            name: &str,
117            version: i64,
118            schema: Arc<ArrowSchema>,
119            metadata: SchemaMetadata,
120        ) -> Result<(), String> {
121            let entries = self.schemas.entry(name.to_string()).or_default();
122            if entries.iter().any(|e| e.version == version) {
123                return Err(format!(
124                    "Schema `{}` version {} already registered",
125                    name, version
126                ));
127            }
128            entries.push(VersionedSchema {
129                version,
130                schema,
131                metadata,
132            });
133            entries.sort_by_key(|e| e.version);
134            Ok(())
135        }
136
137        /// Get a specific version of a schema.
138        pub fn get(&self, name: &str, version: i64) -> Option<&VersionedSchema> {
139            self.schemas
140                .get(name)?
141                .iter()
142                .find(|e| e.version == version)
143        }
144
145        /// Get the latest version of a schema.
146        pub fn latest(&self, name: &str) -> Option<&VersionedSchema> {
147            self.schemas.get(name)?.last()
148        }
149
150        /// Get all versions of a schema, ordered by version.
151        pub fn history(&self, name: &str) -> Vec<&VersionedSchema> {
152            self.schemas
153                .get(name)
154                .map(|v| v.iter().collect())
155                .unwrap_or_default()
156        }
157
158        /// Get list of version numbers for a schema.
159        pub fn versions(&self, name: &str) -> Vec<i64> {
160            self.schemas
161                .get(name)
162                .map(|v| v.iter().map(|e| e.version).collect())
163                .unwrap_or_default()
164        }
165
166        /// Get field names and types for a specific version.
167        pub fn fields(&self, name: &str, version: i64) -> Vec<(String, String)> {
168            if let Some(vs) = self.get(name, version) {
169                vs.schema
170                    .fields()
171                    .iter()
172                    .map(|f| (f.name().to_string(), format!("{}", f.data_type())))
173                    .collect()
174            } else {
175                Vec::new()
176            }
177        }
178
179        /// Compute diff between two versions.
180        pub fn diff(&self, name: &str, v1: i64, v2: i64) -> Vec<SchemaDiff> {
181            let s1 = match self.get(name, v1) {
182                Some(s) => s,
183                None => return Vec::new(),
184            };
185            let s2 = match self.get(name, v2) {
186                Some(s) => s,
187                None => return Vec::new(),
188            };
189
190            let mut diffs = Vec::new();
191            let old_fields: HashMap<&str, &ArrowField> = s1
192                .schema
193                .fields()
194                .iter()
195                .map(|f| (f.name().as_str(), f.as_ref()))
196                .collect();
197            let new_fields: HashMap<&str, &ArrowField> = s2
198                .schema
199                .fields()
200                .iter()
201                .map(|f| (f.name().as_str(), f.as_ref()))
202                .collect();
203
204            // Check for renames first (from registered migrations)
205            let renames = self.get_renames(name, v1, v2);
206
207            // Fields in old but not in new
208            for name_str in old_fields.keys() {
209                if !new_fields.contains_key(name_str) {
210                    // Check if this was a rename
211                    if let Some(new_name) = renames.get(*name_str) {
212                        diffs.push(SchemaDiff::FieldRenamed {
213                            from: name_str.to_string(),
214                            to: new_name.clone(),
215                        });
216                    } else {
217                        diffs.push(SchemaDiff::FieldRemoved {
218                            name: name_str.to_string(),
219                        });
220                    }
221                }
222            }
223
224            // Fields in new but not in old
225            for (name_str, field) in &new_fields {
226                if !old_fields.contains_key(name_str) {
227                    // Check if this was a rename target
228                    let is_rename_target = renames.values().any(|v| v == *name_str);
229                    if !is_rename_target {
230                        diffs.push(SchemaDiff::FieldAdded {
231                            name: name_str.to_string(),
232                            type_name: format!("{}", field.data_type()),
233                        });
234                    }
235                }
236            }
237
238            // Type changes (fields in both)
239            for (name_str, old_field) in &old_fields {
240                if let Some(new_field) = new_fields.get(name_str)
241                    && old_field.data_type() != new_field.data_type()
242                {
243                    diffs.push(SchemaDiff::TypeChanged {
244                        field: name_str.to_string(),
245                        from: format!("{}", old_field.data_type()),
246                        to: format!("{}", new_field.data_type()),
247                    });
248                }
249            }
250
251            diffs
252        }
253
254        /// Check compatibility between two versions.
255        pub fn check_compatibility(
256            &self,
257            name: &str,
258            old_ver: i64,
259            new_ver: i64,
260            mode: CompatibilityMode,
261        ) -> Vec<CompatIssue> {
262            if mode == CompatibilityMode::None {
263                return Vec::new();
264            }
265
266            let old_schema = match self.get(name, old_ver) {
267                Some(s) => s,
268                None => return Vec::new(),
269            };
270            let new_schema = match self.get(name, new_ver) {
271                Some(s) => s,
272                None => return Vec::new(),
273            };
274
275            let mut issues = Vec::new();
276            let old_fields: HashMap<&str, &ArrowField> = old_schema
277                .schema
278                .fields()
279                .iter()
280                .map(|f| (f.name().as_str(), f.as_ref()))
281                .collect();
282            let new_fields: HashMap<&str, &ArrowField> = new_schema
283                .schema
284                .fields()
285                .iter()
286                .map(|f| (f.name().as_str(), f.as_ref()))
287                .collect();
288
289            // Backward compatibility: new must contain all old fields
290            if mode == CompatibilityMode::Backward || mode == CompatibilityMode::Full {
291                for name_str in old_fields.keys() {
292                    if !new_fields.contains_key(name_str) {
293                        issues.push(CompatIssue::FieldRemovedNotBackward(name_str.to_string()));
294                    }
295                }
296            }
297
298            // Forward compatibility: new fields must have defaults
299            if mode == CompatibilityMode::Forward || mode == CompatibilityMode::Full {
300                for name_str in new_fields.keys() {
301                    if !old_fields.contains_key(name_str) {
302                        // Check if field has a default in metadata
303                        let has_default =
304                            new_schema.metadata.field_defaults.contains_key(*name_str);
305                        if !has_default {
306                            issues.push(CompatIssue::FieldAddedNoDefault(name_str.to_string()));
307                        }
308                    }
309                }
310            }
311
312            // Type narrowing check (both directions)
313            for (name_str, old_field) in &old_fields {
314                if let Some(new_field) = new_fields.get(name_str)
315                    && old_field.data_type() != new_field.data_type()
316                    && !is_type_widening(old_field.data_type(), new_field.data_type())
317                {
318                    issues.push(CompatIssue::TypeNarrowed {
319                        field: name_str.to_string(),
320                        from: format!("{}", old_field.data_type()),
321                        to: format!("{}", new_field.data_type()),
322                    });
323                }
324            }
325
326            issues
327        }
328
329        /// Register a migration.
330        pub fn register_migration(
331            &mut self,
332            schema_name: &str,
333            from_ver: i64,
334            to_ver: i64,
335            ops: Vec<MigrationOp>,
336        ) {
337            self.migrations
338                .insert((schema_name.to_string(), from_ver, to_ver), ops);
339        }
340
341        /// Get renames from registered migrations.
342        fn get_renames(&self, name: &str, from_ver: i64, to_ver: i64) -> HashMap<String, String> {
343            let mut renames = HashMap::new();
344            if let Some(ops) = self.migrations.get(&(name.to_string(), from_ver, to_ver)) {
345                for op in ops {
346                    if let MigrationOp::RenameColumn { from, to } = op {
347                        renames.insert(from.clone(), to.clone());
348                    }
349                }
350            }
351            renames
352        }
353
354        /// Apply a migration to produce a new schema version.
355        pub fn apply_migration(
356            &mut self,
357            schema_name: &str,
358            from_ver: i64,
359            to_ver: i64,
360            ops: &[MigrationOp],
361        ) -> Result<(), String> {
362            let source = self
363                .get(schema_name, from_ver)
364                .ok_or_else(|| format!("Source schema `{}` v{} not found", schema_name, from_ver))?
365                .clone();
366
367            let mut fields: Vec<ArrowField> = source
368                .schema
369                .fields()
370                .iter()
371                .map(|f| f.as_ref().clone())
372                .collect();
373            let mut metadata = source.metadata.clone();
374
375            for op in ops {
376                match op {
377                    MigrationOp::AddColumn {
378                        name,
379                        type_name,
380                        default,
381                    } => {
382                        let dt = type_name_to_arrow(type_name);
383                        fields.push(ArrowField::new(name, dt, true));
384                        metadata.field_since.insert(name.clone(), to_ver);
385                        if let Some(def) = default {
386                            metadata.field_defaults.insert(name.clone(), def.clone());
387                        }
388                    }
389                    MigrationOp::DropColumn { name } => {
390                        fields.retain(|f| f.name() != name);
391                    }
392                    MigrationOp::RenameColumn { from, to } => {
393                        for f in &mut fields {
394                            if f.name() == from {
395                                *f = ArrowField::new(to, f.data_type().clone(), f.is_nullable());
396                            }
397                        }
398                    }
399                    MigrationOp::AlterType { column, new_type } => {
400                        let dt = type_name_to_arrow(new_type);
401                        for f in &mut fields {
402                            if f.name() == column {
403                                *f = ArrowField::new(column, dt.clone(), f.is_nullable());
404                            }
405                        }
406                    }
407                }
408            }
409
410            let new_schema = Arc::new(ArrowSchema::new(fields));
411            self.register(schema_name, to_ver, new_schema, metadata)?;
412            self.register_migration(schema_name, from_ver, to_ver, ops.to_vec());
413            Ok(())
414        }
415    }
416
417    /// Check if a type change is a widening (safe) operation.
418    fn is_type_widening(from: &ArrowDataType, to: &ArrowDataType) -> bool {
419        matches!(
420            (from, to),
421            (
422                ArrowDataType::Int8,
423                ArrowDataType::Int16
424                    | ArrowDataType::Int32
425                    | ArrowDataType::Int64
426                    | ArrowDataType::Float32
427                    | ArrowDataType::Float64
428            ) | (
429                ArrowDataType::Int16,
430                ArrowDataType::Int32
431                    | ArrowDataType::Int64
432                    | ArrowDataType::Float32
433                    | ArrowDataType::Float64
434            ) | (
435                ArrowDataType::Int32,
436                ArrowDataType::Int64 | ArrowDataType::Float64
437            ) | (ArrowDataType::Float32, ArrowDataType::Float64)
438        )
439    }
440
441    /// Convert a type name string to Arrow DataType (public for VM use).
442    pub fn type_name_to_arrow_pub(name: &str) -> ArrowDataType {
443        type_name_to_arrow(name)
444    }
445
446    /// Convert a type name string to Arrow DataType.
447    fn type_name_to_arrow(name: &str) -> ArrowDataType {
448        match name {
449            "int8" => ArrowDataType::Int8,
450            "int16" => ArrowDataType::Int16,
451            "int32" | "int" => ArrowDataType::Int32,
452            "int64" => ArrowDataType::Int64,
453            "float32" | "float" => ArrowDataType::Float32,
454            "float64" => ArrowDataType::Float64,
455            "string" | "utf8" | "Utf8" => ArrowDataType::Utf8,
456            "bool" | "boolean" => ArrowDataType::Boolean,
457            _ => ArrowDataType::Utf8, // fallback
458        }
459    }
460
461    impl CompatibilityMode {
462        pub fn from_str(s: &str) -> Self {
463            match s.to_lowercase().as_str() {
464                "backward" | "backwards" => CompatibilityMode::Backward,
465                "forward" | "forwards" => CompatibilityMode::Forward,
466                "full" => CompatibilityMode::Full,
467                "none" => CompatibilityMode::None,
468                _ => CompatibilityMode::Backward,
469            }
470        }
471    }
472
473    impl std::fmt::Display for SchemaDiff {
474        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
475            match self {
476                SchemaDiff::FieldAdded { name, type_name } => {
477                    write!(f, "added field `{}` ({})", name, type_name)
478                }
479                SchemaDiff::FieldRemoved { name } => write!(f, "removed field `{}`", name),
480                SchemaDiff::FieldRenamed { from, to } => {
481                    write!(f, "renamed field `{}` to `{}`", from, to)
482                }
483                SchemaDiff::TypeChanged { field, from, to } => {
484                    write!(f, "changed type of `{}` from {} to {}", field, from, to)
485                }
486            }
487        }
488    }
489
490    impl std::fmt::Display for CompatIssue {
491        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492            match self {
493                CompatIssue::FieldRemovedNotBackward(name) => {
494                    write!(f, "field `{}` removed (breaks backward compat)", name)
495                }
496                CompatIssue::FieldAddedNoDefault(name) => write!(
497                    f,
498                    "field `{}` added without default (breaks forward compat)",
499                    name
500                ),
501                CompatIssue::TypeNarrowed { field, from, to } => {
502                    write!(f, "field `{}` type narrowed from {} to {}", field, from, to)
503                }
504            }
505        }
506    }
507
508    #[cfg(test)]
509    mod tests {
510        use super::*;
511
512        fn make_schema(fields: &[(&str, ArrowDataType)]) -> Arc<ArrowSchema> {
513            let arrow_fields: Vec<ArrowField> = fields
514                .iter()
515                .map(|(n, dt)| ArrowField::new(*n, dt.clone(), true))
516                .collect();
517            Arc::new(ArrowSchema::new(arrow_fields))
518        }
519
520        #[test]
521        fn test_register_schema_v1() {
522            let mut reg = SchemaRegistry::new();
523            let schema =
524                make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
525            assert!(
526                reg.register("User", 1, schema, SchemaMetadata::default())
527                    .is_ok()
528            );
529            assert!(reg.get("User", 1).is_some());
530        }
531
532        #[test]
533        fn test_register_schema_v2() {
534            let mut reg = SchemaRegistry::new();
535            let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
536            let s2 = make_schema(&[("id", ArrowDataType::Int64), ("email", ArrowDataType::Utf8)]);
537            reg.register("User", 1, s1, SchemaMetadata::default())
538                .unwrap();
539            reg.register("User", 2, s2, SchemaMetadata::default())
540                .unwrap();
541            assert!(reg.get("User", 2).is_some());
542        }
543
544        #[test]
545        fn test_get_specific_version() {
546            let mut reg = SchemaRegistry::new();
547            let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
548            let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
549            reg.register("User", 1, s1, SchemaMetadata::default())
550                .unwrap();
551            reg.register("User", 2, s2, SchemaMetadata::default())
552                .unwrap();
553            let v1 = reg.get("User", 1).unwrap();
554            assert_eq!(v1.schema.fields().len(), 1);
555            let v2 = reg.get("User", 2).unwrap();
556            assert_eq!(v2.schema.fields().len(), 2);
557        }
558
559        #[test]
560        fn test_get_latest() {
561            let mut reg = SchemaRegistry::new();
562            let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
563            let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
564            reg.register("User", 1, s1, SchemaMetadata::default())
565                .unwrap();
566            reg.register("User", 2, s2, SchemaMetadata::default())
567                .unwrap();
568            let latest = reg.latest("User").unwrap();
569            assert_eq!(latest.version, 2);
570        }
571
572        #[test]
573        fn test_history_ordered() {
574            let mut reg = SchemaRegistry::new();
575            let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
576            let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
577            let s3 = make_schema(&[
578                ("id", ArrowDataType::Int64),
579                ("name", ArrowDataType::Utf8),
580                ("email", ArrowDataType::Utf8),
581            ]);
582            reg.register("User", 1, s1, SchemaMetadata::default())
583                .unwrap();
584            reg.register("User", 3, s3, SchemaMetadata::default())
585                .unwrap();
586            reg.register("User", 2, s2, SchemaMetadata::default())
587                .unwrap();
588            let hist = reg.history("User");
589            let versions: Vec<i64> = hist.iter().map(|v| v.version).collect();
590            assert_eq!(versions, vec![1, 2, 3]);
591        }
592
593        #[test]
594        fn test_backward_compat_adding_column_ok() {
595            let mut reg = SchemaRegistry::new();
596            let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
597            let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
598            reg.register("User", 1, s1, SchemaMetadata::default())
599                .unwrap();
600            reg.register("User", 2, s2, SchemaMetadata::default())
601                .unwrap();
602            let issues = reg.check_compatibility("User", 1, 2, CompatibilityMode::Backward);
603            assert!(
604                issues.is_empty(),
605                "Adding column should be backward compatible, got: {:?}",
606                issues
607            );
608        }
609
610        #[test]
611        fn test_backward_compat_removing_column_fails() {
612            let mut reg = SchemaRegistry::new();
613            let s1 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
614            let s2 = make_schema(&[("id", ArrowDataType::Int64)]);
615            reg.register("User", 1, s1, SchemaMetadata::default())
616                .unwrap();
617            reg.register("User", 2, s2, SchemaMetadata::default())
618                .unwrap();
619            let issues = reg.check_compatibility("User", 1, 2, CompatibilityMode::Backward);
620            assert!(!issues.is_empty());
621            assert!(matches!(&issues[0], CompatIssue::FieldRemovedNotBackward(n) if n == "name"));
622        }
623
624        #[test]
625        fn test_backward_compat_type_widening_ok() {
626            let mut reg = SchemaRegistry::new();
627            let s1 = make_schema(&[("id", ArrowDataType::Int32)]);
628            let s2 = make_schema(&[("id", ArrowDataType::Int64)]);
629            reg.register("T", 1, s1, SchemaMetadata::default()).unwrap();
630            reg.register("T", 2, s2, SchemaMetadata::default()).unwrap();
631            let issues = reg.check_compatibility("T", 1, 2, CompatibilityMode::Backward);
632            assert!(
633                issues.is_empty(),
634                "Type widening Int32->Int64 should be backward compatible"
635            );
636        }
637
638        #[test]
639        fn test_backward_compat_type_narrowing_fails() {
640            let mut reg = SchemaRegistry::new();
641            let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
642            let s2 = make_schema(&[("id", ArrowDataType::Int32)]);
643            reg.register("T", 1, s1, SchemaMetadata::default()).unwrap();
644            reg.register("T", 2, s2, SchemaMetadata::default()).unwrap();
645            let issues = reg.check_compatibility("T", 1, 2, CompatibilityMode::Backward);
646            assert!(!issues.is_empty());
647            assert!(matches!(&issues[0], CompatIssue::TypeNarrowed { .. }));
648        }
649
650        #[test]
651        fn test_forward_compat_removing_column_ok() {
652            let mut reg = SchemaRegistry::new();
653            let s1 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
654            let s2 = make_schema(&[("id", ArrowDataType::Int64)]);
655            reg.register("User", 1, s1, SchemaMetadata::default())
656                .unwrap();
657            reg.register("User", 2, s2, SchemaMetadata::default())
658                .unwrap();
659            let issues = reg.check_compatibility("User", 1, 2, CompatibilityMode::Forward);
660            assert!(
661                issues.is_empty(),
662                "Removing column should be forward compatible"
663            );
664        }
665
666        #[test]
667        fn test_forward_compat_adding_without_default_fails() {
668            let mut reg = SchemaRegistry::new();
669            let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
670            let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
671            reg.register("User", 1, s1, SchemaMetadata::default())
672                .unwrap();
673            reg.register("User", 2, s2, SchemaMetadata::default())
674                .unwrap();
675            let issues = reg.check_compatibility("User", 1, 2, CompatibilityMode::Forward);
676            assert!(!issues.is_empty());
677            assert!(matches!(&issues[0], CompatIssue::FieldAddedNoDefault(n) if n == "name"));
678        }
679
680        #[test]
681        fn test_full_compat() {
682            let mut reg = SchemaRegistry::new();
683            let s1 = make_schema(&[("id", ArrowDataType::Int32)]);
684            let mut meta = SchemaMetadata::default();
685            meta.field_defaults
686                .insert("name".to_string(), "\"\"".to_string());
687            let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
688            reg.register("T", 1, s1, SchemaMetadata::default()).unwrap();
689            reg.register("T", 2, s2, meta).unwrap();
690            let issues = reg.check_compatibility("T", 1, 2, CompatibilityMode::Full);
691            assert!(
692                issues.is_empty(),
693                "Type widening + defaults should pass full compat, got: {:?}",
694                issues
695            );
696        }
697
698        #[test]
699        fn test_diff_additions_removals() {
700            let mut reg = SchemaRegistry::new();
701            let s1 = make_schema(&[
702                ("id", ArrowDataType::Int64),
703                ("old_field", ArrowDataType::Utf8),
704            ]);
705            let s2 = make_schema(&[
706                ("id", ArrowDataType::Int64),
707                ("new_field", ArrowDataType::Utf8),
708            ]);
709            reg.register("User", 1, s1, SchemaMetadata::default())
710                .unwrap();
711            reg.register("User", 2, s2, SchemaMetadata::default())
712                .unwrap();
713            let diffs = reg.diff("User", 1, 2);
714            assert!(
715                diffs
716                    .iter()
717                    .any(|d| matches!(d, SchemaDiff::FieldRemoved { name } if name == "old_field"))
718            );
719            assert!(
720                diffs.iter().any(
721                    |d| matches!(d, SchemaDiff::FieldAdded { name, .. } if name == "new_field")
722                )
723            );
724        }
725
726        #[test]
727        fn test_duplicate_version_error() {
728            let mut reg = SchemaRegistry::new();
729            let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
730            let s2 = make_schema(&[("id", ArrowDataType::Int64)]);
731            reg.register("User", 1, s1, SchemaMetadata::default())
732                .unwrap();
733            let result = reg.register("User", 1, s2, SchemaMetadata::default());
734            assert!(result.is_err());
735        }
736    }
737}
738
739#[cfg(feature = "native")]
740pub use native::*;
741
// Placeholder registry compiled when the `native` feature is disabled
// (e.g. WASM builds), where the Arrow-backed implementation is not available.
#[cfg(not(feature = "native"))]
#[derive(Debug, Clone, Default)]
pub struct SchemaRegistry;

#[cfg(not(feature = "native"))]
impl SchemaRegistry {
    /// Create the stub registry; it holds no state.
    pub fn new() -> Self {
        Self
    }
}
752}