use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::RwLock;
/// Errors reported by the migration subsystem.
///
/// Every variant wraps a free-form message that `thiserror` interpolates into the
/// `Display` text declared by the variant's `#[error]` attribute.
#[derive(Debug, Error)]
pub enum MigrationError {
/// Rendered as `Configuration error: <message>`.
#[error("Configuration error: {0}")]
ConfigError(String),
/// Rendered as `Validation error: <message>`.
#[error("Validation error: {0}")]
ValidationError(String),
/// Rendered as `Execution error: <message>`.
#[error("Execution error: {0}")]
ExecutionError(String),
/// Rendered as `Rollback error: <message>`.
#[error("Rollback error: {0}")]
RollbackError(String),
/// Rendered as `Version compatibility error: <message>`.
#[error("Version compatibility error: {0}")]
VersionError(String),
/// Rendered as `Transformation error: <message>`.
#[error("Transformation error: {0}")]
TransformationError(String),
/// Catch-all variant, rendered as `Migration error: <message>`.
#[error("Migration error: {0}")]
Other(String),
}
/// Shorthand for `std::result::Result` with [`MigrationError`] as the error type.
pub type Result<T> = std::result::Result<T, MigrationError>;
/// Lifecycle state recorded in [`MigrationMetadata::status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MigrationStatus {
/// Initial status assigned by `MigrationMetadata::new`.
Pending,
/// NOTE(review): never assigned by the code shown here.
Preparing,
/// Set by `MigrationMetadata::mark_started`.
InProgress,
/// NOTE(review): never assigned by the code shown here.
Validating,
/// Set by `MigrationMetadata::mark_completed`.
Completed,
/// Set by `MigrationMetadata::mark_failed`.
Failed,
/// Set at the beginning of `MigrationManager::rollback_migration`.
RollingBack,
/// Set at the end of `MigrationManager::rollback_migration`.
RolledBack,
}
/// Step of a migration run.
///
/// `MigrationManager::execute_migration` walks the phases in the order they are
/// declared here and stores the current one in `MigrationMetadata::current_phase`.
/// The per-phase descriptions below are inferred from the variant names; the code
/// shown performs no phase-specific work.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MigrationPhase {
/// First phase; presumably checks the schema before anything is changed.
SchemaValidation,
/// Second phase; presumably takes a backup of the data.
DataBackup,
/// Third phase; presumably applies the schema operations.
SchemaModification,
/// Fourth phase; presumably applies the data transformations.
DataTransformation,
/// Fifth phase; presumably verifies the migrated data.
ConsistencyValidation,
/// Last phase.
Finalization,
}
/// Rollout strategy stored in [`MigrationConfig::strategy`].
///
/// NOTE(review): no code shown here branches on the strategy, so the per-variant
/// descriptions are inferred from the names and should be confirmed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum MigrationStrategy {
/// Presumably migrates everything in a single step.
AllAtOnce,
/// Presumably a blue/green switch-over.
BlueGreen,
/// Presumably migrates in successive batches; this is the `Default` variant.
Rolling {
/// Presumably the share of items per batch, in percent (0-100 is not enforced).
batch_percent: u8,
},
/// Presumably migrates a small canary share first.
Canary {
/// Presumably the canary share, in percent (0-100 is not enforced).
canary_percent: u8,
},
}
impl Default for MigrationStrategy {
fn default() -> Self {
Self::Rolling { batch_percent: 25 }
}
}
/// Tunables for [`MigrationManager`].
///
/// NOTE(review): of these fields only `enable_progress_tracking` is read by the
/// manager code shown in this file; the meaning of the others is inferred from
/// their names and should be confirmed against their consumers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationConfig {
/// Rollout strategy (default: `MigrationStrategy::default()`).
pub strategy: MigrationStrategy,
/// Presumably the number of items handled per batch (default: 1000).
pub batch_size: usize,
/// Validation switch (default: `true`).
pub enable_validation: bool,
/// Automatic-rollback switch (default: `true`).
pub enable_auto_rollback: bool,
/// Timeout in seconds (default: 3600).
pub timeout_seconds: u64,
/// Maximum retry count (default: 3).
pub max_retries: usize,
/// Delay between retries in milliseconds (default: 1000).
pub retry_delay_ms: u64,
/// When `true`, the manager records a checkpoint after each phase (default: `true`).
pub enable_progress_tracking: bool,
/// Data-verification switch (default: `true`).
pub enable_data_verification: bool,
/// Checkpoint interval; unit not established by the code shown (default: 10).
pub checkpoint_interval: usize,
}
impl Default for MigrationConfig {
fn default() -> Self {
Self {
strategy: MigrationStrategy::default(),
batch_size: 1000,
enable_validation: true,
enable_auto_rollback: true,
timeout_seconds: 3600, max_retries: 3,
retry_delay_ms: 1000,
enable_progress_tracking: true,
enable_data_verification: true,
checkpoint_interval: 10,
}
}
}
impl MigrationConfig {
    /// Returns the configuration with `strategy` replaced.
    pub fn with_strategy(self, strategy: MigrationStrategy) -> Self {
        Self { strategy, ..self }
    }

    /// Returns the configuration with `batch_size` set to `size`.
    pub fn with_batch_size(self, size: usize) -> Self {
        Self {
            batch_size: size,
            ..self
        }
    }

    /// Returns the configuration with `enable_validation` set to `enable`.
    pub fn with_validation(self, enable: bool) -> Self {
        Self {
            enable_validation: enable,
            ..self
        }
    }

    /// Returns the configuration with `timeout_seconds` replaced.
    pub fn with_timeout(self, timeout_seconds: u64) -> Self {
        Self {
            timeout_seconds,
            ..self
        }
    }
}
/// Bookkeeping record for a single migration run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationMetadata {
/// Identifier of the run; `new` fills it with a random UUID v4 string.
pub migration_id: String,
/// Version label the migration starts from.
pub from_version: String,
/// Version label the migration targets.
pub to_version: String,
/// Free-form description.
pub description: String,
/// Originator of the migration; `new` sets this to `"system"`.
pub created_by: String,
/// Time the record was created (UTC).
pub created_at: DateTime<Utc>,
/// Current lifecycle state; starts as `Pending`.
pub status: MigrationStatus,
/// Phase being executed, or `None` before the first phase has been entered.
pub current_phase: Option<MigrationPhase>,
/// Progress in percent; recomputed by `update_progress`, forced to 100.0 by `mark_completed`.
pub progress_percent: f64,
/// Total number of items to migrate; `new` leaves it at 0 and callers assign it directly.
pub total_items: u64,
/// Number of items migrated so far, as last passed to `update_progress`.
pub items_migrated: u64,
/// Time `mark_started` was called, if it has been.
pub started_at: Option<DateTime<Utc>>,
/// Time the run ended; written by `mark_completed`, `mark_failed` and the manager's rollback.
pub completed_at: Option<DateTime<Utc>>,
/// Message supplied to `mark_failed`, if the run failed.
pub error_message: Option<String>,
/// Checkpoints recorded during the run, oldest first.
pub checkpoints: Vec<MigrationCheckpoint>,
}
impl MigrationMetadata {
    /// Creates a pending record for a migration from `from_version` to `to_version`.
    ///
    /// The record gets a fresh random UUID v4 as its id, `"system"` as its creator,
    /// the current UTC time as its creation time, zeroed counters and no checkpoints.
    pub fn new(
        from_version: impl Into<String>,
        to_version: impl Into<String>,
        description: impl Into<String>,
    ) -> Self {
        Self {
            migration_id: uuid::Uuid::new_v4().to_string(),
            from_version: from_version.into(),
            to_version: to_version.into(),
            description: description.into(),
            created_by: "system".to_string(),
            created_at: Utc::now(),
            status: MigrationStatus::Pending,
            current_phase: None,
            progress_percent: 0.0,
            total_items: 0,
            items_migrated: 0,
            started_at: None,
            completed_at: None,
            error_message: None,
            checkpoints: Vec::new(),
        }
    }

    /// Records how many items have been migrated and refreshes `progress_percent`.
    ///
    /// The percentage is `items_migrated / total_items * 100`, capped at 100.0 so an
    /// overshoot (for example when `total_items` was only an estimate) never reports
    /// more than "fully done". While `total_items` is 0 the percentage is left as is,
    /// because no meaningful ratio exists.
    pub fn update_progress(&mut self, items_migrated: u64) {
        self.items_migrated = items_migrated;
        if self.total_items > 0 {
            let ratio = items_migrated as f64 / self.total_items as f64;
            self.progress_percent = (ratio * 100.0).min(100.0);
        }
    }

    /// Switches the record to `InProgress` and stamps `started_at` with the current UTC time.
    pub fn mark_started(&mut self) {
        self.status = MigrationStatus::InProgress;
        self.started_at = Some(Utc::now());
    }

    /// Switches the record to `Completed`, stamps `completed_at` and sets progress to 100 %.
    pub fn mark_completed(&mut self) {
        self.status = MigrationStatus::Completed;
        self.completed_at = Some(Utc::now());
        self.progress_percent = 100.0;
    }

    /// Switches the record to `Failed`, stores `error` as the error message and stamps
    /// `completed_at`. The progress counters are left untouched.
    pub fn mark_failed(&mut self, error: impl Into<String>) {
        self.status = MigrationStatus::Failed;
        self.error_message = Some(error.into());
        self.completed_at = Some(Utc::now());
    }
}
/// Marker recorded in [`MigrationMetadata::checkpoints`] during a migration run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationCheckpoint {
/// Identifier of the checkpoint; the manager generates a random UUID v4 string.
pub checkpoint_id: String,
/// Phase the checkpoint belongs to.
pub phase: MigrationPhase,
/// Items processed when the checkpoint was taken; the manager currently always records 0.
pub items_processed: u64,
/// Time the checkpoint was created (UTC).
pub timestamp: DateTime<Utc>,
/// Free-form key/value payload; the manager currently always stores an empty map.
pub data: HashMap<String, String>,
}
/// Schema change that can be listed in a [`MigrationPlan`].
///
/// The code shown here only inspects these values in `MigrationManager::validate_plan`,
/// which emits a data-loss warning for the two `Remove*` variants; nothing applies them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SchemaOperation {
/// Adds a class together with a list of properties.
AddClass {
class_uri: String,
properties: Vec<String>,
},
/// Removes a class; flagged by `validate_plan` as potential data loss.
RemoveClass { class_uri: String },
/// Adds a property with the given domain and range.
AddProperty {
property_uri: String,
domain: String,
range: String,
},
/// Removes a property; flagged by `validate_plan` as potential data loss.
RemoveProperty { property_uri: String },
/// Renames a property from `old_uri` to `new_uri`.
RenameProperty { old_uri: String, new_uri: String },
/// Changes the range of a property from `old_range` to `new_range`.
ChangePropertyRange {
property_uri: String,
old_range: String,
new_range: String,
},
}
/// Data transformation that can be listed in a [`MigrationPlan`].
///
/// NOTE(review): no code shown here executes these operations; how `transform_fn`
/// and `script` are interpreted must be confirmed against the executor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TransformOperation {
/// Maps one property onto another.
MapProperty {
from_property: String,
to_property: String,
/// Presumably names or encodes the function applied to each value - TODO confirm.
transform_fn: String,
},
/// Splits one source property into several target properties.
SplitProperty {
source_property: String,
target_properties: Vec<String>,
},
/// Merges several source properties into one target property.
MergeProperties {
source_properties: Vec<String>,
target_property: String,
},
/// Named custom transformation carrying a script body.
Custom { name: String, script: String },
}
/// Declarative description of a migration: its metadata plus the operations and
/// validation rules attached to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationPlan {
/// Bookkeeping record for the planned migration.
pub metadata: MigrationMetadata,
/// Schema changes, in the order they were added.
pub schema_operations: Vec<SchemaOperation>,
/// Data transformations, in the order they were added.
pub data_transformations: Vec<TransformOperation>,
/// Free-form validation rules, in the order they were added.
pub validation_rules: Vec<String>,
}
impl MigrationPlan {
pub fn new(metadata: MigrationMetadata) -> Self {
Self {
metadata,
schema_operations: Vec::new(),
data_transformations: Vec::new(),
validation_rules: Vec::new(),
}
}
pub fn add_schema_operation(mut self, operation: SchemaOperation) -> Self {
self.schema_operations.push(operation);
self
}
pub fn add_data_transformation(mut self, operation: TransformOperation) -> Self {
self.data_transformations.push(operation);
self
}
pub fn add_validation_rule(mut self, rule: impl Into<String>) -> Self {
self.validation_rules.push(rule.into());
self
}
}
/// Starts migrations on a background tokio task and tracks their state.
pub struct MigrationManager {
/// Settings cloned into each background migration task.
config: MigrationConfig,
/// Migrations that have been started and not yet archived, keyed by migration id.
active_migrations: Arc<RwLock<HashMap<String, MigrationMetadata>>>,
/// Records of migrations whose background task has finished, oldest first.
migration_history: Arc<RwLock<Vec<MigrationMetadata>>>,
/// `true` while a background migration task is running; gates `start_migration`.
running: Arc<RwLock<bool>>,
}
impl MigrationManager {
pub async fn new(config: MigrationConfig) -> Result<Self> {
Ok(Self {
config,
active_migrations: Arc::new(RwLock::new(HashMap::new())),
migration_history: Arc::new(RwLock::new(Vec::new())),
running: Arc::new(RwLock::new(false)),
})
}
pub async fn start_migration(
&mut self,
from_version: impl Into<String>,
to_version: impl Into<String>,
) -> Result<String> {
let mut running = self.running.write().await;
if *running {
return Err(MigrationError::ExecutionError(
"Migration already in progress".to_string(),
));
}
let from = from_version.into();
let to = to_version.into();
tracing::info!(
from_version = %from,
to_version = %to,
"Starting migration"
);
let metadata = MigrationMetadata::new(
from.clone(),
to.clone(),
format!("Migration from {} to {}", from, to),
);
let migration_id = metadata.migration_id.clone();
let mut active = self.active_migrations.write().await;
active.insert(migration_id.clone(), metadata.clone());
*running = true;
let config = self.config.clone();
let active_migrations = Arc::clone(&self.active_migrations);
let migration_history = Arc::clone(&self.migration_history);
let running_clone = Arc::clone(&self.running);
let migration_id_clone = migration_id.clone();
tokio::spawn(async move {
let result = Self::execute_migration(
&config,
migration_id_clone.clone(),
active_migrations.clone(),
)
.await;
let mut active = active_migrations.write().await;
if let Some(metadata) = active.remove(&migration_id_clone) {
let mut history = migration_history.write().await;
history.push(metadata);
}
let mut running = running_clone.write().await;
*running = false;
if let Err(e) = result {
tracing::error!("Migration failed: {}", e);
} else {
tracing::info!("Migration completed successfully");
}
});
Ok(migration_id)
}
async fn execute_migration(
config: &MigrationConfig,
migration_id: String,
active_migrations: Arc<RwLock<HashMap<String, MigrationMetadata>>>,
) -> Result<()> {
{
let mut active = active_migrations.write().await;
if let Some(metadata) = active.get_mut(&migration_id) {
metadata.mark_started();
}
}
let phases = vec![
MigrationPhase::SchemaValidation,
MigrationPhase::DataBackup,
MigrationPhase::SchemaModification,
MigrationPhase::DataTransformation,
MigrationPhase::ConsistencyValidation,
MigrationPhase::Finalization,
];
for phase in phases {
tracing::info!("Executing migration phase: {:?}", phase);
{
let mut active = active_migrations.write().await;
if let Some(metadata) = active.get_mut(&migration_id) {
metadata.current_phase = Some(phase);
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
if config.enable_progress_tracking {
let checkpoint = MigrationCheckpoint {
checkpoint_id: uuid::Uuid::new_v4().to_string(),
phase,
items_processed: 0,
timestamp: Utc::now(),
data: HashMap::new(),
};
let mut active = active_migrations.write().await;
if let Some(metadata) = active.get_mut(&migration_id) {
metadata.checkpoints.push(checkpoint);
}
}
}
{
let mut active = active_migrations.write().await;
if let Some(metadata) = active.get_mut(&migration_id) {
metadata.mark_completed();
}
}
Ok(())
}
pub async fn get_migration_status(&self, migration_id: &str) -> Option<MigrationMetadata> {
let active = self.active_migrations.read().await;
active.get(migration_id).cloned()
}
pub async fn get_migration_history(&self) -> Vec<MigrationMetadata> {
let history = self.migration_history.read().await;
history.clone()
}
pub async fn rollback_migration(&self, migration_id: &str) -> Result<()> {
tracing::info!(migration_id = %migration_id, "Rolling back migration");
let mut active = self.active_migrations.write().await;
if let Some(metadata) = active.get_mut(migration_id) {
metadata.status = MigrationStatus::RollingBack;
for checkpoint in metadata.checkpoints.iter().rev() {
tracing::info!(
checkpoint_id = %checkpoint.checkpoint_id,
phase = ?checkpoint.phase,
"Restoring checkpoint"
);
tokio::time::sleep(Duration::from_millis(100)).await;
}
metadata.status = MigrationStatus::RolledBack;
metadata.completed_at = Some(Utc::now());
}
Ok(())
}
pub async fn validate_plan(&self, plan: &MigrationPlan) -> Result<Vec<String>> {
let mut warnings = Vec::new();
for operation in &plan.schema_operations {
match operation {
SchemaOperation::RemoveClass { class_uri } => {
warnings.push(format!(
"Removing class '{}' may cause data loss",
class_uri
));
}
SchemaOperation::RemoveProperty { property_uri } => {
warnings.push(format!(
"Removing property '{}' may cause data loss",
property_uri
));
}
_ => {}
}
}
if plan.data_transformations.is_empty() && !plan.schema_operations.is_empty() {
warnings.push(
"Schema changes without data transformations may lead to inconsistencies"
.to_string(),
);
}
Ok(warnings)
}
}
/// Aggregate counters describing migration activity.
///
/// NOTE(review): nothing in the code shown here constructs or fills this struct;
/// the field meanings below are taken from the field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationStatistics {
/// Total number of migrations.
pub total_migrations: usize,
/// Number of migrations that succeeded.
pub successful_migrations: usize,
/// Number of migrations that failed.
pub failed_migrations: usize,
/// Number of migrations that were rolled back.
pub rolled_back_migrations: usize,
/// Average duration, in seconds.
pub avg_duration_seconds: f64,
/// Number of migrations currently active.
pub active_migrations: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A new record carries the given versions and starts out pending at 0 %.
    #[test]
    fn test_migration_metadata_creation() {
        let fresh = MigrationMetadata::new("v1", "v2", "Test migration");

        assert_eq!(fresh.from_version, "v1");
        assert_eq!(fresh.to_version, "v2");
        assert_eq!(fresh.status, MigrationStatus::Pending);
        assert_eq!(fresh.progress_percent, 0.0);
    }

    /// Progress is reported as the migrated share of `total_items`, in percent.
    #[test]
    fn test_migration_metadata_progress() {
        let mut record = MigrationMetadata::new("v1", "v2", "Test migration");
        record.total_items = 100;

        for (migrated, expected_percent) in [(50_u64, 50.0_f64), (100, 100.0)] {
            record.update_progress(migrated);
            assert_eq!(record.progress_percent, expected_percent);
        }
    }

    /// Builder methods override exactly the fields they name.
    #[test]
    fn test_migration_config_builder() {
        let base = MigrationConfig::default();
        let tuned = base
            .with_batch_size(500)
            .with_validation(false)
            .with_timeout(7200);

        assert_eq!(tuned.batch_size, 500);
        assert!(!tuned.enable_validation);
        assert_eq!(tuned.timeout_seconds, 7200);
    }

    /// Constructing a manager from the default configuration succeeds.
    #[tokio::test]
    async fn test_migration_manager_creation() {
        let outcome = MigrationManager::new(MigrationConfig::default()).await;
        assert!(outcome.is_ok());
    }

    /// Builder calls on a plan accumulate operations and rules.
    #[test]
    fn test_migration_plan() {
        let add_class = SchemaOperation::AddClass {
            class_uri: "http://example.org/NewClass".to_string(),
            properties: vec!["prop1".to_string(), "prop2".to_string()],
        };

        let plan = MigrationPlan::new(MigrationMetadata::new("v1", "v2", "Test migration"))
            .add_schema_operation(add_class)
            .add_validation_rule("Check data integrity");

        assert_eq!(plan.schema_operations.len(), 1);
        assert_eq!(plan.validation_rules.len(), 1);
    }
}