1use anyhow::{anyhow, Result};
24use chrono::{DateTime, Utc};
25use dashmap::DashMap;
26use parking_lot::RwLock;
27use serde::{Deserialize, Serialize};
28use std::collections::HashMap;
29use std::sync::Arc;
30use tracing::{debug, info, warn};
31use uuid::Uuid;
32
33use crate::event::StreamEvent;
34
/// Compatibility policy that a schema's evolutions are checked against.
///
/// The mode stored on the *new* definition selects which checks
/// `SchemaEvolutionManager::check_compatibility` runs.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum CompatibilityMode {
    /// Changes are not checked for compatibility at all.
    None,
    /// Fields of the previous version are checked: they must keep their type,
    /// and required ones must not be removed (removing an optional field only
    /// produces a warning).
    Backward,
    /// Fields that become required in the new version are checked, since data
    /// written under the previous version may not contain them.
    Forward,
    /// Both the `Backward` and the `Forward` checks.
    Full,
    /// Transitive form of `Backward`.
    /// NOTE(review): confirm which earlier versions the manager checks against.
    BackwardTransitive,
    /// Transitive form of `Forward`.
    /// NOTE(review): confirm which earlier versions the manager checks against.
    ForwardTransitive,
    /// Transitive form of `Full`.
    /// NOTE(review): confirm which earlier versions the manager checks against.
    FullTransitive,
}
53
/// One concrete definition of a named schema at a particular version.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaDefinition {
    /// Opaque identifier of this definition.
    pub schema_id: String,
    /// Schema name; all versions of one schema share it and the manager keys
    /// its registry by it.
    pub name: String,
    /// Version string. Evolution expects the `MAJOR.MINOR.PATCH` form.
    pub version: String,
    /// Format the schema document is written in.
    pub format: SchemaFormat,
    /// Raw schema document.
    /// NOTE(review): presumably written in `format`; not parsed by the
    /// evolution logic.
    pub content: String,
    /// Structured field list that compatibility checks and changes operate on.
    pub fields: Vec<FieldDefinition>,
    /// When this definition was created.
    pub created_at: DateTime<Utc>,
    /// Who created this definition.
    pub created_by: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Free-form tags.
    pub tags: Vec<String>,
    /// Compatibility policy applied when this schema is evolved.
    pub compatibility: CompatibilityMode,
}
80
/// A single field of a schema.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FieldDefinition {
    /// Field name; used as the identity of the field across versions.
    pub name: String,
    /// Declared type of the field.
    pub field_type: FieldType,
    /// Whether every record must carry this field.
    pub required: bool,
    /// Default value, in string form, if any.
    pub default_value: Option<String>,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Alternative names for the field (extended by `SchemaChange::AddFieldAlias`).
    pub aliases: Vec<String>,
}
97
/// Type of a schema field; compared structurally when checking compatibility.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum FieldType {
    /// Text value.
    String,
    /// Integer value.
    Integer,
    /// Floating-point value.
    Float,
    /// Boolean value.
    Boolean,
    /// Date/time value.
    DateTime,
    /// URI value.
    URI,
    /// Literal value.
    /// NOTE(review): presumably an RDF literal; confirm.
    Literal,
    /// Homogeneous list whose elements have `item_type`.
    Array { item_type: Box<FieldType> },
    /// Nested record with its own field list.
    Object { fields: Vec<FieldDefinition> },
    /// Value that may be any of `types`.
    Union { types: Vec<FieldType> },
    /// Application-defined type identified by name.
    Custom { type_name: String },
}
113
/// Language a schema document (`SchemaDefinition::content`) is expressed in.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum SchemaFormat {
    /// RDF Schema.
    RDFS,
    /// Web Ontology Language.
    OWL,
    /// Shapes Constraint Language.
    SHACL,
    /// JSON Schema.
    JsonSchema,
    /// Apache Avro schema.
    Avro,
    /// Protocol Buffers definition.
    Protobuf,
    /// Any other format, identified by name.
    Custom { format_name: String },
}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
135pub enum SchemaChange {
136 AddField { field: FieldDefinition },
138 RemoveField { field_name: String },
140 ModifyFieldType {
142 field_name: String,
143 old_type: FieldType,
144 new_type: FieldType,
145 },
146 MakeFieldOptional { field_name: String },
148 MakeFieldRequired { field_name: String },
150 AddFieldAlias { field_name: String, alias: String },
152 ChangeDefaultValue {
154 field_name: String,
155 old_default: Option<String>,
156 new_default: Option<String>,
157 },
158}
159
/// Outcome of `SchemaEvolutionManager::evolve_schema`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EvolutionResult {
    /// `true` if a new version was stored; `false` if the evolution was
    /// rejected as incompatible.
    pub success: bool,
    /// The newly created version, or `None` when the evolution was rejected.
    pub new_version: Option<String>,
    /// The changes that were requested.
    pub changes: Vec<SchemaChange>,
    /// Result of the compatibility check that gated the evolution.
    pub compatibility_result: CompatibilityCheckResult,
    /// Whether a migration rule was generated for existing data.
    pub migration_required: bool,
}
174
/// Result of comparing two schema definitions for compatibility.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompatibilityCheckResult {
    /// `true` when `issues` is empty; warnings do not affect it.
    pub is_compatible: bool,
    /// Problems that make the change incompatible.
    pub issues: Vec<CompatibilityIssue>,
    /// Non-blocking observations (for example, removal of an optional field).
    pub warnings: Vec<String>,
}
185
/// One blocking compatibility problem found between two schema versions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompatibilityIssue {
    /// Category of the problem.
    pub issue_type: CompatibilityIssueType,
    /// Field the problem concerns, when it is field-specific.
    pub field_name: Option<String>,
    /// Human-readable explanation.
    pub description: String,
    /// How serious the problem is.
    pub severity: IssueSeverity,
}
198
199#[derive(Debug, Clone, Serialize, Deserialize)]
201pub enum CompatibilityIssueType {
202 BreakingChange,
204 TypeMismatch,
206 MissingRequiredField,
208 IncompatibleDefaultValue,
210 Other,
212}
213
/// Severity of a compatibility issue.
///
/// Variants are declared from least to most severe; the derived `Ord` follows
/// declaration order, so `Info < Warning < Error < Critical`. Do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum IssueSeverity {
    Info,
    Warning,
    Error,
    Critical,
}
222
223#[derive(Debug, Clone, Serialize, Deserialize)]
225pub enum MigrationStrategy {
226 None,
228 Automatic,
230 Custom { migration_id: String },
232 Manual,
234}
235
/// Migration description between two versions of one schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationRule {
    /// Version the rule migrates from.
    pub from_version: String,
    /// Version the rule migrates to.
    pub to_version: String,
    /// How the migration is carried out.
    pub strategy: MigrationStrategy,
    /// Alias → field-name mappings, collected from `AddFieldAlias` changes.
    pub field_mappings: HashMap<String, String>,
    /// Field name → transformation identifier (`convert_<old>_to_<new>`, built
    /// from the `Debug` form of the types), collected from `ModifyFieldType`.
    pub transformations: HashMap<String, String>,
}
250
/// A stored version of a schema, with lineage and lifecycle metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaVersion {
    /// Version string this entry is stored under.
    pub version: String,
    /// The definition at this version.
    pub schema: SchemaDefinition,
    /// Version this one superseded, when recorded.
    pub previous_version: Option<String>,
    /// Migration rule from `previous_version`, when one was generated.
    pub migration_rule: Option<MigrationRule>,
    /// Whether this is the schema's active version.
    pub is_active: bool,
    /// Deprecation details, if this version has been deprecated.
    pub deprecated: Option<DeprecationInfo>,
}
267
/// Deprecation metadata attached by `SchemaEvolutionManager::deprecate_version`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeprecationInfo {
    /// When the version was deprecated.
    pub deprecated_at: DateTime<Utc>,
    /// Optional date after which the version is expected to be retired.
    pub sunset_date: Option<DateTime<Utc>>,
    /// Why the version was deprecated.
    pub reason: String,
    /// Optional pointer to migration instructions (`deprecate_version` leaves it `None`).
    pub migration_guide: Option<String>,
}
280
/// Registry of schema versions, with compatibility checking, version
/// generation, migration rules and an evolution log.
///
/// All state lives behind `Arc`-wrapped concurrent maps and locks, and every
/// method takes `&self`.
pub struct SchemaEvolutionManager {
    /// Every stored version of each schema, keyed by schema name, in insertion order.
    schemas: Arc<DashMap<String, Vec<SchemaVersion>>>,
    /// Schema name → currently active version string.
    active_versions: Arc<DashMap<String, String>>,
    /// Per-schema compatibility overrides.
    /// NOTE(review): initialised but not read by any method visible here.
    compatibility_rules: Arc<RwLock<HashMap<String, CompatibilityMode>>>,
    /// Schema name → migration rules generated by evolutions.
    migration_rules: Arc<DashMap<String, Vec<MigrationRule>>>,
    /// Log of successful evolutions, appended by `evolve_schema`.
    evolution_history: Arc<RwLock<Vec<SchemaEvolutionEvent>>>,
}
294
/// Audit record of one successful schema evolution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaEvolutionEvent {
    /// Random (UUID v4) identifier of the event.
    pub event_id: String,
    /// When the evolution was recorded.
    pub timestamp: DateTime<Utc>,
    /// Schema that was evolved.
    pub schema_name: String,
    /// Version evolved from.
    pub old_version: Option<String>,
    /// Version evolved to.
    pub new_version: String,
    /// Changes that were applied.
    pub changes: Vec<SchemaChange>,
    /// Who requested the evolution.
    pub user: String,
}
306
307impl SchemaEvolutionManager {
308 pub fn new() -> Self {
310 Self {
311 schemas: Arc::new(DashMap::new()),
312 active_versions: Arc::new(DashMap::new()),
313 compatibility_rules: Arc::new(RwLock::new(HashMap::new())),
314 migration_rules: Arc::new(DashMap::new()),
315 evolution_history: Arc::new(RwLock::new(Vec::new())),
316 }
317 }
318
319 pub fn register_schema(&self, schema: SchemaDefinition) -> Result<String> {
321 let schema_name = schema.name.clone();
322 let version = schema.version.clone();
323
324 let schema_version = SchemaVersion {
325 version: version.clone(),
326 schema,
327 previous_version: None,
328 migration_rule: None,
329 is_active: true,
330 deprecated: None,
331 };
332
333 self.schemas
335 .entry(schema_name.clone())
336 .or_default()
337 .push(schema_version);
338
339 self.active_versions
341 .insert(schema_name.clone(), version.clone());
342
343 info!("Registered schema {} version {}", schema_name, version);
344 Ok(version)
345 }
346
347 pub fn evolve_schema(
349 &self,
350 schema_name: &str,
351 changes: Vec<SchemaChange>,
352 user: String,
353 ) -> Result<EvolutionResult> {
354 let current_version = self
356 .active_versions
357 .get(schema_name)
358 .ok_or_else(|| anyhow!("Schema not found: {}", schema_name))?
359 .clone();
360
361 let current_schema = self.get_schema(schema_name, ¤t_version)?;
362
363 let new_schema = self.apply_changes(¤t_schema.schema, &changes)?;
365
366 let compatibility_result = self.check_compatibility(¤t_schema.schema, &new_schema)?;
368
369 if !compatibility_result.is_compatible {
370 warn!(
371 "Schema evolution would break compatibility: {:?}",
372 compatibility_result.issues
373 );
374 return Ok(EvolutionResult {
375 success: false,
376 new_version: None,
377 changes,
378 compatibility_result,
379 migration_required: false,
380 });
381 }
382
383 let new_version = self.generate_version(¤t_version, &changes)?;
385
386 let migration_required = self.is_migration_required(&changes);
388
389 let migration_rule = if migration_required {
391 Some(self.create_migration_rule(¤t_version, &new_version, &changes)?)
392 } else {
393 None
394 };
395
396 let new_schema_version = SchemaVersion {
398 version: new_version.clone(),
399 schema: new_schema,
400 previous_version: Some(current_version.clone()),
401 migration_rule: migration_rule.clone(),
402 is_active: true,
403 deprecated: None,
404 };
405
406 if let Some(mut versions) = self.schemas.get_mut(schema_name) {
408 if let Some(prev) = versions.iter_mut().find(|v| v.version == current_version) {
410 prev.is_active = false;
411 }
412 versions.push(new_schema_version);
413 }
414
415 self.active_versions
417 .insert(schema_name.to_string(), new_version.clone());
418
419 if let Some(rule) = migration_rule {
421 self.migration_rules
422 .entry(schema_name.to_string())
423 .or_default()
424 .push(rule);
425 }
426
427 let event = SchemaEvolutionEvent {
429 event_id: Uuid::new_v4().to_string(),
430 timestamp: Utc::now(),
431 schema_name: schema_name.to_string(),
432 old_version: Some(current_version),
433 new_version: new_version.clone(),
434 changes: changes.clone(),
435 user,
436 };
437 self.evolution_history.write().push(event);
438
439 info!("Evolved schema {} to version {}", schema_name, new_version);
440
441 Ok(EvolutionResult {
442 success: true,
443 new_version: Some(new_version),
444 changes,
445 compatibility_result,
446 migration_required,
447 })
448 }
449
450 pub fn get_schema(&self, schema_name: &str, version: &str) -> Result<SchemaVersion> {
452 let versions = self
453 .schemas
454 .get(schema_name)
455 .ok_or_else(|| anyhow!("Schema not found: {}", schema_name))?;
456
457 versions
458 .iter()
459 .find(|v| v.version == version)
460 .cloned()
461 .ok_or_else(|| anyhow!("Version not found: {}", version))
462 }
463
464 pub fn get_active_schema(&self, schema_name: &str) -> Result<SchemaVersion> {
466 let active_version = self
467 .active_versions
468 .get(schema_name)
469 .ok_or_else(|| anyhow!("Schema not found: {}", schema_name))?
470 .clone();
471
472 self.get_schema(schema_name, &active_version)
473 }
474
475 fn apply_changes(
477 &self,
478 current_schema: &SchemaDefinition,
479 changes: &[SchemaChange],
480 ) -> Result<SchemaDefinition> {
481 let mut new_schema = current_schema.clone();
482 let mut fields = new_schema.fields.clone();
483
484 for change in changes {
485 match change {
486 SchemaChange::AddField { field } => {
487 fields.push(field.clone());
488 }
489 SchemaChange::RemoveField { field_name } => {
490 fields.retain(|f| f.name != *field_name);
491 }
492 SchemaChange::ModifyFieldType {
493 field_name,
494 new_type,
495 ..
496 } => {
497 if let Some(field) = fields.iter_mut().find(|f| f.name == *field_name) {
498 field.field_type = new_type.clone();
499 }
500 }
501 SchemaChange::MakeFieldOptional { field_name } => {
502 if let Some(field) = fields.iter_mut().find(|f| f.name == *field_name) {
503 field.required = false;
504 }
505 }
506 SchemaChange::MakeFieldRequired { field_name } => {
507 if let Some(field) = fields.iter_mut().find(|f| f.name == *field_name) {
508 field.required = true;
509 }
510 }
511 SchemaChange::AddFieldAlias { field_name, alias } => {
512 if let Some(field) = fields.iter_mut().find(|f| f.name == *field_name) {
513 if !field.aliases.contains(alias) {
514 field.aliases.push(alias.clone());
515 }
516 }
517 }
518 SchemaChange::ChangeDefaultValue {
519 field_name,
520 new_default,
521 ..
522 } => {
523 if let Some(field) = fields.iter_mut().find(|f| f.name == *field_name) {
524 field.default_value = new_default.clone();
525 }
526 }
527 }
528 }
529
530 new_schema.fields = fields;
531 Ok(new_schema)
532 }
533
534 fn check_compatibility(
536 &self,
537 old_schema: &SchemaDefinition,
538 new_schema: &SchemaDefinition,
539 ) -> Result<CompatibilityCheckResult> {
540 let mut issues = Vec::new();
541 let mut warnings = Vec::new();
542
543 let old_fields: HashMap<String, &FieldDefinition> = old_schema
544 .fields
545 .iter()
546 .map(|f| (f.name.clone(), f))
547 .collect();
548 let new_fields: HashMap<String, &FieldDefinition> = new_schema
549 .fields
550 .iter()
551 .map(|f| (f.name.clone(), f))
552 .collect();
553
554 if matches!(
556 new_schema.compatibility,
557 CompatibilityMode::Backward
558 | CompatibilityMode::Full
559 | CompatibilityMode::BackwardTransitive
560 | CompatibilityMode::FullTransitive
561 ) {
562 for (field_name, old_field) in &old_fields {
564 if let Some(new_field) = new_fields.get(field_name) {
565 if old_field.field_type != new_field.field_type {
567 issues.push(CompatibilityIssue {
568 issue_type: CompatibilityIssueType::TypeMismatch,
569 field_name: Some(field_name.clone()),
570 description: format!(
571 "Field type changed from {:?} to {:?}",
572 old_field.field_type, new_field.field_type
573 ),
574 severity: IssueSeverity::Error,
575 });
576 }
577 } else if old_field.required {
578 issues.push(CompatibilityIssue {
580 issue_type: CompatibilityIssueType::BreakingChange,
581 field_name: Some(field_name.clone()),
582 description: format!("Required field '{}' was removed", field_name),
583 severity: IssueSeverity::Critical,
584 });
585 } else {
586 warnings.push(format!("Optional field '{}' was removed", field_name));
587 }
588 }
589 }
590
591 if matches!(
593 new_schema.compatibility,
594 CompatibilityMode::Forward
595 | CompatibilityMode::Full
596 | CompatibilityMode::ForwardTransitive
597 | CompatibilityMode::FullTransitive
598 ) {
599 for (field_name, new_field) in &new_fields {
601 if !old_fields.contains_key(field_name) && new_field.required {
602 issues.push(CompatibilityIssue {
603 issue_type: CompatibilityIssueType::MissingRequiredField,
604 field_name: Some(field_name.clone()),
605 description: format!(
606 "New required field '{}' added without default value",
607 field_name
608 ),
609 severity: IssueSeverity::Error,
610 });
611 }
612 }
613 }
614
615 Ok(CompatibilityCheckResult {
616 is_compatible: issues.is_empty(),
617 issues,
618 warnings,
619 })
620 }
621
622 fn generate_version(&self, current_version: &str, changes: &[SchemaChange]) -> Result<String> {
624 let parts: Vec<&str> = current_version.split('.').collect();
626 if parts.len() != 3 {
627 return Err(anyhow!("Invalid version format: {}", current_version));
628 }
629
630 let major: u32 = parts[0].parse()?;
631 let minor: u32 = parts[1].parse()?;
632 let patch: u32 = parts[2].parse()?;
633
634 let has_breaking_changes = changes.iter().any(|c| {
636 matches!(
637 c,
638 SchemaChange::RemoveField { .. }
639 | SchemaChange::ModifyFieldType { .. }
640 | SchemaChange::MakeFieldRequired { .. }
641 )
642 });
643
644 if has_breaking_changes {
645 Ok(format!("{}.0.0", major + 1))
646 } else if changes
647 .iter()
648 .any(|c| matches!(c, SchemaChange::AddField { .. }))
649 {
650 Ok(format!("{}.{}.0", major, minor + 1))
651 } else {
652 Ok(format!("{}.{}.{}", major, minor, patch + 1))
653 }
654 }
655
656 fn is_migration_required(&self, changes: &[SchemaChange]) -> bool {
658 changes.iter().any(|c| match c {
659 SchemaChange::ModifyFieldType { .. } => true,
660 SchemaChange::RemoveField { .. } => true,
661 SchemaChange::AddField { field } => field.required,
662 _ => false,
663 })
664 }
665
666 fn create_migration_rule(
668 &self,
669 from_version: &str,
670 to_version: &str,
671 changes: &[SchemaChange],
672 ) -> Result<MigrationRule> {
673 let mut field_mappings = HashMap::new();
674 let mut transformations = HashMap::new();
675
676 for change in changes {
677 match change {
678 SchemaChange::AddFieldAlias { field_name, alias } => {
679 field_mappings.insert(alias.clone(), field_name.clone());
680 }
681 SchemaChange::ModifyFieldType {
682 field_name,
683 old_type,
684 new_type,
685 } => {
686 transformations.insert(
687 field_name.clone(),
688 format!("convert_{:?}_to_{:?}", old_type, new_type),
689 );
690 }
691 _ => {}
692 }
693 }
694
695 Ok(MigrationRule {
696 from_version: from_version.to_string(),
697 to_version: to_version.to_string(),
698 strategy: if transformations.is_empty() {
699 MigrationStrategy::Automatic
700 } else {
701 MigrationStrategy::Custom {
702 migration_id: Uuid::new_v4().to_string(),
703 }
704 },
705 field_mappings,
706 transformations,
707 })
708 }
709
710 pub fn migrate_event(
712 &self,
713 event: &StreamEvent,
714 from_version: &str,
715 to_version: &str,
716 schema_name: &str,
717 ) -> Result<StreamEvent> {
718 let migration_rule = self
720 .migration_rules
721 .get(schema_name)
722 .and_then(|rules| {
723 rules
724 .iter()
725 .find(|r| r.from_version == from_version && r.to_version == to_version)
726 .cloned()
727 })
728 .ok_or_else(|| {
729 anyhow!(
730 "No migration rule found from {} to {}",
731 from_version,
732 to_version
733 )
734 })?;
735
736 match migration_rule.strategy {
738 MigrationStrategy::None => Ok(event.clone()),
739 MigrationStrategy::Automatic => {
740 debug!("Applying automatic migration with field mappings");
742 Ok(event.clone()) }
744 MigrationStrategy::Custom { ref migration_id } => {
745 debug!("Custom migration required: {}", migration_id);
746 Ok(event.clone()) }
748 MigrationStrategy::Manual => Err(anyhow!("Manual migration required")),
749 }
750 }
751
752 pub fn deprecate_version(
754 &self,
755 schema_name: &str,
756 version: &str,
757 reason: String,
758 sunset_date: Option<DateTime<Utc>>,
759 ) -> Result<()> {
760 if let Some(mut versions) = self.schemas.get_mut(schema_name) {
761 if let Some(schema_version) = versions.iter_mut().find(|v| v.version == version) {
762 schema_version.deprecated = Some(DeprecationInfo {
763 deprecated_at: Utc::now(),
764 sunset_date,
765 reason,
766 migration_guide: None,
767 });
768 info!("Deprecated schema {} version {}", schema_name, version);
769 return Ok(());
770 }
771 }
772 Err(anyhow!("Schema version not found"))
773 }
774
775 pub fn get_evolution_history(&self, schema_name: &str) -> Vec<SchemaEvolutionEvent> {
777 self.evolution_history
778 .read()
779 .iter()
780 .filter(|e| e.schema_name == schema_name)
781 .cloned()
782 .collect()
783 }
784}
785
786impl Default for SchemaEvolutionManager {
787 fn default() -> Self {
788 Self::new()
789 }
790}
791
#[cfg(test)]
mod tests {
    use super::*;

    /// `TestSchema` at version 1.0.0: one required string field `name`,
    /// backward compatibility.
    fn initial_schema() -> SchemaDefinition {
        let name_field = FieldDefinition {
            name: "name".to_string(),
            field_type: FieldType::String,
            required: true,
            default_value: None,
            description: None,
            aliases: Vec::new(),
        };

        SchemaDefinition {
            schema_id: Uuid::new_v4().to_string(),
            name: "TestSchema".to_string(),
            version: "1.0.0".to_string(),
            format: SchemaFormat::RDFS,
            content: "test content".to_string(),
            fields: vec![name_field],
            created_at: Utc::now(),
            created_by: "test".to_string(),
            description: None,
            tags: Vec::new(),
            compatibility: CompatibilityMode::Backward,
        }
    }

    /// A manager with `initial_schema()` already registered.
    fn manager_with_initial_schema() -> SchemaEvolutionManager {
        let manager = SchemaEvolutionManager::new();
        manager
            .register_schema(initial_schema())
            .expect("registering the initial schema succeeds");
        manager
    }

    #[test]
    fn test_schema_registration() {
        let manager = SchemaEvolutionManager::new();

        let registered = manager
            .register_schema(initial_schema())
            .expect("registration succeeds");
        assert_eq!(registered, "1.0.0");

        let active = manager
            .get_active_schema("TestSchema")
            .expect("schema is active after registration");
        assert_eq!(active.version, "1.0.0");
    }

    #[test]
    fn test_schema_evolution() {
        let manager = manager_with_initial_schema();

        let email_field = FieldDefinition {
            name: "email".to_string(),
            field_type: FieldType::String,
            required: false,
            default_value: Some("".to_string()),
            description: None,
            aliases: Vec::new(),
        };
        let changes = vec![SchemaChange::AddField { field: email_field }];

        let outcome = manager
            .evolve_schema("TestSchema", changes, "test".to_string())
            .expect("evolution runs");

        assert!(outcome.success);
        assert!(outcome.compatibility_result.is_compatible);
        assert_eq!(outcome.new_version, Some("1.1.0".to_string()));
    }

    #[test]
    fn test_breaking_change_detection() {
        let manager = manager_with_initial_schema();

        let changes = vec![SchemaChange::RemoveField {
            field_name: "name".to_string(),
        }];

        let outcome = manager
            .evolve_schema("TestSchema", changes, "test".to_string())
            .expect("evolution runs");

        assert!(!outcome.success);
        assert!(!outcome.compatibility_result.is_compatible);
    }
}