oxirs_stream/
schema_evolution.rs

1//! # Dynamic Schema Evolution and Migration
2//!
3//! This module provides comprehensive schema evolution capabilities for streaming
4//! RDF data, allowing schemas to evolve over time without downtime.
5//!
6//! ## Features
7//!
8//! - **Backward Compatibility**: Old consumers can read new data
9//! - **Forward Compatibility**: New consumers can read old data
10//! - **Full Compatibility**: Both backward and forward compatible
11//! - **Version Management**: Track and manage schema versions
12//! - **Migration Strategies**: Automatic data migration between versions
13//! - **Schema Validation**: Ensure data conforms to schemas
14//! - **Evolution Rules**: Define allowed schema changes
15//!
16//! ## Use Cases
17//!
18//! - **API Evolution**: Evolve APIs without breaking clients
19//! - **Data Migration**: Migrate data to new formats
20//! - **Feature Flags**: Gradually roll out schema changes
21//! - **A/B Testing**: Test new schemas with subset of data
22
23use anyhow::{anyhow, Result};
24use chrono::{DateTime, Utc};
25use dashmap::DashMap;
26use parking_lot::RwLock;
27use serde::{Deserialize, Serialize};
28use std::collections::HashMap;
29use std::sync::Arc;
30use tracing::{debug, info, warn};
31use uuid::Uuid;
32
33use crate::event::StreamEvent;
34
35/// Schema compatibility mode
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
37pub enum CompatibilityMode {
38    /// No compatibility checks
39    None,
40    /// New schema can read old data
41    Backward,
42    /// Old schema can read new data
43    Forward,
44    /// Both backward and forward compatible
45    Full,
46    /// Transitive backward compatibility
47    BackwardTransitive,
48    /// Transitive forward compatibility
49    ForwardTransitive,
50    /// Transitive full compatibility
51    FullTransitive,
52}
53
54/// Schema definition
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct SchemaDefinition {
57    /// Schema ID
58    pub schema_id: String,
59    /// Schema name
60    pub name: String,
61    /// Schema version
62    pub version: String,
63    /// Schema format (RDF, JSON Schema, Avro, etc.)
64    pub format: SchemaFormat,
65    /// Schema content
66    pub content: String,
67    /// Field definitions
68    pub fields: Vec<FieldDefinition>,
69    /// Creation timestamp
70    pub created_at: DateTime<Utc>,
71    /// Created by
72    pub created_by: String,
73    /// Description
74    pub description: Option<String>,
75    /// Tags
76    pub tags: Vec<String>,
77    /// Compatibility mode
78    pub compatibility: CompatibilityMode,
79}
80
81/// Field definition in a schema
82#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
83pub struct FieldDefinition {
84    /// Field name
85    pub name: String,
86    /// Field type
87    pub field_type: FieldType,
88    /// Is required
89    pub required: bool,
90    /// Default value
91    pub default_value: Option<String>,
92    /// Description
93    pub description: Option<String>,
94    /// Aliases (for backward compatibility)
95    pub aliases: Vec<String>,
96}
97
98/// Field type
99#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
100pub enum FieldType {
101    String,
102    Integer,
103    Float,
104    Boolean,
105    DateTime,
106    URI,
107    Literal,
108    Array { item_type: Box<FieldType> },
109    Object { fields: Vec<FieldDefinition> },
110    Union { types: Vec<FieldType> },
111    Custom { type_name: String },
112}
113
114/// Schema format
115#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
116pub enum SchemaFormat {
117    /// RDF Schema
118    RDFS,
119    /// OWL Ontology
120    OWL,
121    /// SHACL Shapes
122    SHACL,
123    /// JSON Schema
124    JsonSchema,
125    /// Apache Avro
126    Avro,
127    /// Protocol Buffers
128    Protobuf,
129    /// Custom format
130    Custom { format_name: String },
131}
132
133/// Schema change type
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub enum SchemaChange {
136    /// Add a new field
137    AddField { field: FieldDefinition },
138    /// Remove a field
139    RemoveField { field_name: String },
140    /// Modify field type
141    ModifyFieldType {
142        field_name: String,
143        old_type: FieldType,
144        new_type: FieldType,
145    },
146    /// Make field optional
147    MakeFieldOptional { field_name: String },
148    /// Make field required
149    MakeFieldRequired { field_name: String },
150    /// Add field alias
151    AddFieldAlias { field_name: String, alias: String },
152    /// Change default value
153    ChangeDefaultValue {
154        field_name: String,
155        old_default: Option<String>,
156        new_default: Option<String>,
157    },
158}
159
160/// Schema evolution result
161#[derive(Debug, Clone, Serialize, Deserialize)]
162pub struct EvolutionResult {
163    /// Success
164    pub success: bool,
165    /// New schema version
166    pub new_version: Option<String>,
167    /// Applied changes
168    pub changes: Vec<SchemaChange>,
169    /// Compatibility check result
170    pub compatibility_result: CompatibilityCheckResult,
171    /// Migration required
172    pub migration_required: bool,
173}
174
175/// Compatibility check result
176#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct CompatibilityCheckResult {
178    /// Is compatible
179    pub is_compatible: bool,
180    /// Compatibility issues
181    pub issues: Vec<CompatibilityIssue>,
182    /// Warnings
183    pub warnings: Vec<String>,
184}
185
186/// Compatibility issue
187#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct CompatibilityIssue {
189    /// Issue type
190    pub issue_type: CompatibilityIssueType,
191    /// Field name (if applicable)
192    pub field_name: Option<String>,
193    /// Description
194    pub description: String,
195    /// Severity
196    pub severity: IssueSeverity,
197}
198
199/// Compatibility issue type
200#[derive(Debug, Clone, Serialize, Deserialize)]
201pub enum CompatibilityIssueType {
202    /// Breaking change
203    BreakingChange,
204    /// Type mismatch
205    TypeMismatch,
206    /// Missing required field
207    MissingRequiredField,
208    /// Incompatible default value
209    IncompatibleDefaultValue,
210    /// Other issue
211    Other,
212}
213
214/// Issue severity
215#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
216pub enum IssueSeverity {
217    Info,
218    Warning,
219    Error,
220    Critical,
221}
222
223/// Migration strategy
224#[derive(Debug, Clone, Serialize, Deserialize)]
225pub enum MigrationStrategy {
226    /// No migration needed
227    None,
228    /// Automatic migration with default values
229    Automatic,
230    /// Custom migration function
231    Custom { migration_id: String },
232    /// Manual migration required
233    Manual,
234}
235
236/// Data migration rule
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct MigrationRule {
239    /// From version
240    pub from_version: String,
241    /// To version
242    pub to_version: String,
243    /// Migration strategy
244    pub strategy: MigrationStrategy,
245    /// Field mappings
246    pub field_mappings: HashMap<String, String>,
247    /// Transformation functions (as strings)
248    pub transformations: HashMap<String, String>,
249}
250
251/// Schema version metadata
252#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct SchemaVersion {
254    /// Version number
255    pub version: String,
256    /// Schema definition
257    pub schema: SchemaDefinition,
258    /// Previous version
259    pub previous_version: Option<String>,
260    /// Migration rules from previous version
261    pub migration_rule: Option<MigrationRule>,
262    /// Is active version
263    pub is_active: bool,
264    /// Deprecation info
265    pub deprecated: Option<DeprecationInfo>,
266}
267
268/// Deprecation information
269#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct DeprecationInfo {
271    /// Deprecated at
272    pub deprecated_at: DateTime<Utc>,
273    /// Sunset date (when it will be removed)
274    pub sunset_date: Option<DateTime<Utc>>,
275    /// Deprecation reason
276    pub reason: String,
277    /// Migration guide
278    pub migration_guide: Option<String>,
279}
280
281/// Schema evolution manager
282pub struct SchemaEvolutionManager {
283    /// Schema versions by name
284    schemas: Arc<DashMap<String, Vec<SchemaVersion>>>,
285    /// Active schema versions
286    active_versions: Arc<DashMap<String, String>>,
287    /// Compatibility rules
288    compatibility_rules: Arc<RwLock<HashMap<String, CompatibilityMode>>>,
289    /// Migration rules
290    migration_rules: Arc<DashMap<String, Vec<MigrationRule>>>,
291    /// Schema evolution history
292    evolution_history: Arc<RwLock<Vec<SchemaEvolutionEvent>>>,
293}
294
295/// Schema evolution event for auditing
296#[derive(Debug, Clone, Serialize, Deserialize)]
297pub struct SchemaEvolutionEvent {
298    pub event_id: String,
299    pub timestamp: DateTime<Utc>,
300    pub schema_name: String,
301    pub old_version: Option<String>,
302    pub new_version: String,
303    pub changes: Vec<SchemaChange>,
304    pub user: String,
305}
306
307impl SchemaEvolutionManager {
308    /// Create a new schema evolution manager
309    pub fn new() -> Self {
310        Self {
311            schemas: Arc::new(DashMap::new()),
312            active_versions: Arc::new(DashMap::new()),
313            compatibility_rules: Arc::new(RwLock::new(HashMap::new())),
314            migration_rules: Arc::new(DashMap::new()),
315            evolution_history: Arc::new(RwLock::new(Vec::new())),
316        }
317    }
318
319    /// Register a new schema
320    pub fn register_schema(&self, schema: SchemaDefinition) -> Result<String> {
321        let schema_name = schema.name.clone();
322        let version = schema.version.clone();
323
324        let schema_version = SchemaVersion {
325            version: version.clone(),
326            schema,
327            previous_version: None,
328            migration_rule: None,
329            is_active: true,
330            deprecated: None,
331        };
332
333        // Add to schemas
334        self.schemas
335            .entry(schema_name.clone())
336            .or_default()
337            .push(schema_version);
338
339        // Set as active version
340        self.active_versions
341            .insert(schema_name.clone(), version.clone());
342
343        info!("Registered schema {} version {}", schema_name, version);
344        Ok(version)
345    }
346
347    /// Evolve a schema with new changes
348    pub fn evolve_schema(
349        &self,
350        schema_name: &str,
351        changes: Vec<SchemaChange>,
352        user: String,
353    ) -> Result<EvolutionResult> {
354        // Get current active version
355        let current_version = self
356            .active_versions
357            .get(schema_name)
358            .ok_or_else(|| anyhow!("Schema not found: {}", schema_name))?
359            .clone();
360
361        let current_schema = self.get_schema(schema_name, &current_version)?;
362
363        // Apply changes to create new schema
364        let new_schema = self.apply_changes(&current_schema.schema, &changes)?;
365
366        // Check compatibility
367        let compatibility_result = self.check_compatibility(&current_schema.schema, &new_schema)?;
368
369        if !compatibility_result.is_compatible {
370            warn!(
371                "Schema evolution would break compatibility: {:?}",
372                compatibility_result.issues
373            );
374            return Ok(EvolutionResult {
375                success: false,
376                new_version: None,
377                changes,
378                compatibility_result,
379                migration_required: false,
380            });
381        }
382
383        // Generate new version number
384        let new_version = self.generate_version(&current_version, &changes)?;
385
386        // Determine if migration is required
387        let migration_required = self.is_migration_required(&changes);
388
389        // Create migration rule if needed
390        let migration_rule = if migration_required {
391            Some(self.create_migration_rule(&current_version, &new_version, &changes)?)
392        } else {
393            None
394        };
395
396        // Create new schema version
397        let new_schema_version = SchemaVersion {
398            version: new_version.clone(),
399            schema: new_schema,
400            previous_version: Some(current_version.clone()),
401            migration_rule: migration_rule.clone(),
402            is_active: true,
403            deprecated: None,
404        };
405
406        // Add to schemas
407        if let Some(mut versions) = self.schemas.get_mut(schema_name) {
408            // Mark previous version as inactive
409            if let Some(prev) = versions.iter_mut().find(|v| v.version == current_version) {
410                prev.is_active = false;
411            }
412            versions.push(new_schema_version);
413        }
414
415        // Update active version
416        self.active_versions
417            .insert(schema_name.to_string(), new_version.clone());
418
419        // Store migration rule
420        if let Some(rule) = migration_rule {
421            self.migration_rules
422                .entry(schema_name.to_string())
423                .or_default()
424                .push(rule);
425        }
426
427        // Record evolution event
428        let event = SchemaEvolutionEvent {
429            event_id: Uuid::new_v4().to_string(),
430            timestamp: Utc::now(),
431            schema_name: schema_name.to_string(),
432            old_version: Some(current_version),
433            new_version: new_version.clone(),
434            changes: changes.clone(),
435            user,
436        };
437        self.evolution_history.write().push(event);
438
439        info!("Evolved schema {} to version {}", schema_name, new_version);
440
441        Ok(EvolutionResult {
442            success: true,
443            new_version: Some(new_version),
444            changes,
445            compatibility_result,
446            migration_required,
447        })
448    }
449
450    /// Get a specific schema version
451    pub fn get_schema(&self, schema_name: &str, version: &str) -> Result<SchemaVersion> {
452        let versions = self
453            .schemas
454            .get(schema_name)
455            .ok_or_else(|| anyhow!("Schema not found: {}", schema_name))?;
456
457        versions
458            .iter()
459            .find(|v| v.version == version)
460            .cloned()
461            .ok_or_else(|| anyhow!("Version not found: {}", version))
462    }
463
464    /// Get the active schema version
465    pub fn get_active_schema(&self, schema_name: &str) -> Result<SchemaVersion> {
466        let active_version = self
467            .active_versions
468            .get(schema_name)
469            .ok_or_else(|| anyhow!("Schema not found: {}", schema_name))?
470            .clone();
471
472        self.get_schema(schema_name, &active_version)
473    }
474
475    /// Apply schema changes to create new schema
476    fn apply_changes(
477        &self,
478        current_schema: &SchemaDefinition,
479        changes: &[SchemaChange],
480    ) -> Result<SchemaDefinition> {
481        let mut new_schema = current_schema.clone();
482        let mut fields = new_schema.fields.clone();
483
484        for change in changes {
485            match change {
486                SchemaChange::AddField { field } => {
487                    fields.push(field.clone());
488                }
489                SchemaChange::RemoveField { field_name } => {
490                    fields.retain(|f| f.name != *field_name);
491                }
492                SchemaChange::ModifyFieldType {
493                    field_name,
494                    new_type,
495                    ..
496                } => {
497                    if let Some(field) = fields.iter_mut().find(|f| f.name == *field_name) {
498                        field.field_type = new_type.clone();
499                    }
500                }
501                SchemaChange::MakeFieldOptional { field_name } => {
502                    if let Some(field) = fields.iter_mut().find(|f| f.name == *field_name) {
503                        field.required = false;
504                    }
505                }
506                SchemaChange::MakeFieldRequired { field_name } => {
507                    if let Some(field) = fields.iter_mut().find(|f| f.name == *field_name) {
508                        field.required = true;
509                    }
510                }
511                SchemaChange::AddFieldAlias { field_name, alias } => {
512                    if let Some(field) = fields.iter_mut().find(|f| f.name == *field_name) {
513                        if !field.aliases.contains(alias) {
514                            field.aliases.push(alias.clone());
515                        }
516                    }
517                }
518                SchemaChange::ChangeDefaultValue {
519                    field_name,
520                    new_default,
521                    ..
522                } => {
523                    if let Some(field) = fields.iter_mut().find(|f| f.name == *field_name) {
524                        field.default_value = new_default.clone();
525                    }
526                }
527            }
528        }
529
530        new_schema.fields = fields;
531        Ok(new_schema)
532    }
533
534    /// Check compatibility between two schemas
535    fn check_compatibility(
536        &self,
537        old_schema: &SchemaDefinition,
538        new_schema: &SchemaDefinition,
539    ) -> Result<CompatibilityCheckResult> {
540        let mut issues = Vec::new();
541        let mut warnings = Vec::new();
542
543        let old_fields: HashMap<String, &FieldDefinition> = old_schema
544            .fields
545            .iter()
546            .map(|f| (f.name.clone(), f))
547            .collect();
548        let new_fields: HashMap<String, &FieldDefinition> = new_schema
549            .fields
550            .iter()
551            .map(|f| (f.name.clone(), f))
552            .collect();
553
554        // Check backward compatibility
555        if matches!(
556            new_schema.compatibility,
557            CompatibilityMode::Backward
558                | CompatibilityMode::Full
559                | CompatibilityMode::BackwardTransitive
560                | CompatibilityMode::FullTransitive
561        ) {
562            // Check backward compatibility: new schema can read old data
563            for (field_name, old_field) in &old_fields {
564                if let Some(new_field) = new_fields.get(field_name) {
565                    // Field exists in both - check type compatibility
566                    if old_field.field_type != new_field.field_type {
567                        issues.push(CompatibilityIssue {
568                            issue_type: CompatibilityIssueType::TypeMismatch,
569                            field_name: Some(field_name.clone()),
570                            description: format!(
571                                "Field type changed from {:?} to {:?}",
572                                old_field.field_type, new_field.field_type
573                            ),
574                            severity: IssueSeverity::Error,
575                        });
576                    }
577                } else if old_field.required {
578                    // Required field removed
579                    issues.push(CompatibilityIssue {
580                        issue_type: CompatibilityIssueType::BreakingChange,
581                        field_name: Some(field_name.clone()),
582                        description: format!("Required field '{}' was removed", field_name),
583                        severity: IssueSeverity::Critical,
584                    });
585                } else {
586                    warnings.push(format!("Optional field '{}' was removed", field_name));
587                }
588            }
589        }
590
591        // Check forward compatibility
592        if matches!(
593            new_schema.compatibility,
594            CompatibilityMode::Forward
595                | CompatibilityMode::Full
596                | CompatibilityMode::ForwardTransitive
597                | CompatibilityMode::FullTransitive
598        ) {
599            // Check forward compatibility: old schema can read new data
600            for (field_name, new_field) in &new_fields {
601                if !old_fields.contains_key(field_name) && new_field.required {
602                    issues.push(CompatibilityIssue {
603                        issue_type: CompatibilityIssueType::MissingRequiredField,
604                        field_name: Some(field_name.clone()),
605                        description: format!(
606                            "New required field '{}' added without default value",
607                            field_name
608                        ),
609                        severity: IssueSeverity::Error,
610                    });
611                }
612            }
613        }
614
615        Ok(CompatibilityCheckResult {
616            is_compatible: issues.is_empty(),
617            issues,
618            warnings,
619        })
620    }
621
622    /// Generate new version number
623    fn generate_version(&self, current_version: &str, changes: &[SchemaChange]) -> Result<String> {
624        // Simple semantic versioning: major.minor.patch
625        let parts: Vec<&str> = current_version.split('.').collect();
626        if parts.len() != 3 {
627            return Err(anyhow!("Invalid version format: {}", current_version));
628        }
629
630        let major: u32 = parts[0].parse()?;
631        let minor: u32 = parts[1].parse()?;
632        let patch: u32 = parts[2].parse()?;
633
634        // Determine if this is a breaking change
635        let has_breaking_changes = changes.iter().any(|c| {
636            matches!(
637                c,
638                SchemaChange::RemoveField { .. }
639                    | SchemaChange::ModifyFieldType { .. }
640                    | SchemaChange::MakeFieldRequired { .. }
641            )
642        });
643
644        if has_breaking_changes {
645            Ok(format!("{}.0.0", major + 1))
646        } else if changes
647            .iter()
648            .any(|c| matches!(c, SchemaChange::AddField { .. }))
649        {
650            Ok(format!("{}.{}.0", major, minor + 1))
651        } else {
652            Ok(format!("{}.{}.{}", major, minor, patch + 1))
653        }
654    }
655
656    /// Check if migration is required
657    fn is_migration_required(&self, changes: &[SchemaChange]) -> bool {
658        changes.iter().any(|c| match c {
659            SchemaChange::ModifyFieldType { .. } => true,
660            SchemaChange::RemoveField { .. } => true,
661            SchemaChange::AddField { field } => field.required,
662            _ => false,
663        })
664    }
665
666    /// Create migration rule
667    fn create_migration_rule(
668        &self,
669        from_version: &str,
670        to_version: &str,
671        changes: &[SchemaChange],
672    ) -> Result<MigrationRule> {
673        let mut field_mappings = HashMap::new();
674        let mut transformations = HashMap::new();
675
676        for change in changes {
677            match change {
678                SchemaChange::AddFieldAlias { field_name, alias } => {
679                    field_mappings.insert(alias.clone(), field_name.clone());
680                }
681                SchemaChange::ModifyFieldType {
682                    field_name,
683                    old_type,
684                    new_type,
685                } => {
686                    transformations.insert(
687                        field_name.clone(),
688                        format!("convert_{:?}_to_{:?}", old_type, new_type),
689                    );
690                }
691                _ => {}
692            }
693        }
694
695        Ok(MigrationRule {
696            from_version: from_version.to_string(),
697            to_version: to_version.to_string(),
698            strategy: if transformations.is_empty() {
699                MigrationStrategy::Automatic
700            } else {
701                MigrationStrategy::Custom {
702                    migration_id: Uuid::new_v4().to_string(),
703                }
704            },
705            field_mappings,
706            transformations,
707        })
708    }
709
710    /// Migrate event to a specific schema version
711    pub fn migrate_event(
712        &self,
713        event: &StreamEvent,
714        from_version: &str,
715        to_version: &str,
716        schema_name: &str,
717    ) -> Result<StreamEvent> {
718        // Get migration rule
719        let migration_rule = self
720            .migration_rules
721            .get(schema_name)
722            .and_then(|rules| {
723                rules
724                    .iter()
725                    .find(|r| r.from_version == from_version && r.to_version == to_version)
726                    .cloned()
727            })
728            .ok_or_else(|| {
729                anyhow!(
730                    "No migration rule found from {} to {}",
731                    from_version,
732                    to_version
733                )
734            })?;
735
736        // Apply migration strategy
737        match migration_rule.strategy {
738            MigrationStrategy::None => Ok(event.clone()),
739            MigrationStrategy::Automatic => {
740                // Automatic migration - apply field mappings
741                debug!("Applying automatic migration with field mappings");
742                Ok(event.clone()) // Simplified - would apply mappings in real implementation
743            }
744            MigrationStrategy::Custom { ref migration_id } => {
745                debug!("Custom migration required: {}", migration_id);
746                Ok(event.clone()) // Would call custom migration function
747            }
748            MigrationStrategy::Manual => Err(anyhow!("Manual migration required")),
749        }
750    }
751
752    /// Deprecate a schema version
753    pub fn deprecate_version(
754        &self,
755        schema_name: &str,
756        version: &str,
757        reason: String,
758        sunset_date: Option<DateTime<Utc>>,
759    ) -> Result<()> {
760        if let Some(mut versions) = self.schemas.get_mut(schema_name) {
761            if let Some(schema_version) = versions.iter_mut().find(|v| v.version == version) {
762                schema_version.deprecated = Some(DeprecationInfo {
763                    deprecated_at: Utc::now(),
764                    sunset_date,
765                    reason,
766                    migration_guide: None,
767                });
768                info!("Deprecated schema {} version {}", schema_name, version);
769                return Ok(());
770            }
771        }
772        Err(anyhow!("Schema version not found"))
773    }
774
775    /// Get evolution history
776    pub fn get_evolution_history(&self, schema_name: &str) -> Vec<SchemaEvolutionEvent> {
777        self.evolution_history
778            .read()
779            .iter()
780            .filter(|e| e.schema_name == schema_name)
781            .cloned()
782            .collect()
783    }
784}
785
786impl Default for SchemaEvolutionManager {
787    fn default() -> Self {
788        Self::new()
789    }
790}
791
792#[cfg(test)]
793mod tests {
794    use super::*;
795
796    #[test]
797    fn test_schema_registration() {
798        let manager = SchemaEvolutionManager::new();
799
800        let schema = SchemaDefinition {
801            schema_id: Uuid::new_v4().to_string(),
802            name: "TestSchema".to_string(),
803            version: "1.0.0".to_string(),
804            format: SchemaFormat::RDFS,
805            content: "test content".to_string(),
806            fields: vec![FieldDefinition {
807                name: "name".to_string(),
808                field_type: FieldType::String,
809                required: true,
810                default_value: None,
811                description: None,
812                aliases: Vec::new(),
813            }],
814            created_at: Utc::now(),
815            created_by: "test".to_string(),
816            description: None,
817            tags: Vec::new(),
818            compatibility: CompatibilityMode::Backward,
819        };
820
821        let version = manager.register_schema(schema).unwrap();
822        assert_eq!(version, "1.0.0");
823
824        let active = manager.get_active_schema("TestSchema").unwrap();
825        assert_eq!(active.version, "1.0.0");
826    }
827
828    #[test]
829    fn test_schema_evolution() {
830        let manager = SchemaEvolutionManager::new();
831
832        let schema = SchemaDefinition {
833            schema_id: Uuid::new_v4().to_string(),
834            name: "TestSchema".to_string(),
835            version: "1.0.0".to_string(),
836            format: SchemaFormat::RDFS,
837            content: "test content".to_string(),
838            fields: vec![FieldDefinition {
839                name: "name".to_string(),
840                field_type: FieldType::String,
841                required: true,
842                default_value: None,
843                description: None,
844                aliases: Vec::new(),
845            }],
846            created_at: Utc::now(),
847            created_by: "test".to_string(),
848            description: None,
849            tags: Vec::new(),
850            compatibility: CompatibilityMode::Backward,
851        };
852
853        manager.register_schema(schema).unwrap();
854
855        // Add optional field (backward compatible)
856        let changes = vec![SchemaChange::AddField {
857            field: FieldDefinition {
858                name: "email".to_string(),
859                field_type: FieldType::String,
860                required: false,
861                default_value: Some("".to_string()),
862                description: None,
863                aliases: Vec::new(),
864            },
865        }];
866
867        let result = manager
868            .evolve_schema("TestSchema", changes, "test".to_string())
869            .unwrap();
870
871        assert!(result.success);
872        assert!(result.compatibility_result.is_compatible);
873        assert_eq!(result.new_version, Some("1.1.0".to_string()));
874    }
875
876    #[test]
877    fn test_breaking_change_detection() {
878        let manager = SchemaEvolutionManager::new();
879
880        let schema = SchemaDefinition {
881            schema_id: Uuid::new_v4().to_string(),
882            name: "TestSchema".to_string(),
883            version: "1.0.0".to_string(),
884            format: SchemaFormat::RDFS,
885            content: "test content".to_string(),
886            fields: vec![FieldDefinition {
887                name: "name".to_string(),
888                field_type: FieldType::String,
889                required: true,
890                default_value: None,
891                description: None,
892                aliases: Vec::new(),
893            }],
894            created_at: Utc::now(),
895            created_by: "test".to_string(),
896            description: None,
897            tags: Vec::new(),
898            compatibility: CompatibilityMode::Backward,
899        };
900
901        manager.register_schema(schema).unwrap();
902
903        // Remove required field (breaking change)
904        let changes = vec![SchemaChange::RemoveField {
905            field_name: "name".to_string(),
906        }];
907
908        let result = manager
909            .evolve_schema("TestSchema", changes, "test".to_string())
910            .unwrap();
911
912        assert!(!result.success);
913        assert!(!result.compatibility_result.is_compatible);
914    }
915}