use crate::engine::WorkflowDefinition;
use crate::error::{Result, WorkflowError};
use serde::{Deserialize, Serialize};
/// Stateless helper that plans and applies migrations between two workflow definitions.
///
/// The type has no fields, so all instances are interchangeable.
#[derive(Debug, Clone)]
pub struct WorkflowMigration;
impl WorkflowMigration {
    /// Creates a new migration helper.
    #[must_use]
    pub fn new() -> Self {
        Self
    }

    /// Migrates the `from` definition towards the `to` definition.
    ///
    /// The pair is first checked with [`Self::validate_migration`]; a
    /// [`MigrationPlan`] is then derived from the differences between the two
    /// definitions and applied to `from`, which is returned.
    ///
    /// At present only the version is rewritten. The id, name, description and
    /// DAG of `from` are returned unchanged, even when the task counts differ.
    ///
    /// # Errors
    ///
    /// Returns the versioning error produced by [`Self::validate_migration`]
    /// when `from` and `to` have different ids, and propagates any error
    /// raised while planning or executing the migration.
    pub fn migrate(
        &self,
        from: WorkflowDefinition,
        to: WorkflowDefinition,
    ) -> Result<WorkflowDefinition> {
        // Without this check a mismatched pair would yield `from`'s id
        // carrying `to`'s version.
        self.validate_migration(&from, &to)?;
        let plan = self.create_migration_plan(&from, &to)?;
        self.execute_migration_plan(from, plan)
    }

    /// Compares two definitions and lists the steps needed to go from one to the other.
    ///
    /// A step is emitted when the versions differ and another when the DAG
    /// task counts differ. The plan is always marked as needing no downtime
    /// with an estimated duration of zero seconds.
    fn create_migration_plan(
        &self,
        from: &WorkflowDefinition,
        to: &WorkflowDefinition,
    ) -> Result<MigrationPlan> {
        let mut steps = Vec::new();

        if from.version != to.version {
            steps.push(MigrationStep::UpdateVersion {
                from: from.version.clone(),
                to: to.version.clone(),
            });
        }

        // Read each count once instead of once for the comparison and once for the step.
        let (old_count, new_count) = (from.dag.task_count(), to.dag.task_count());
        if old_count != new_count {
            steps.push(MigrationStep::UpdateTaskCount {
                from: old_count,
                to: new_count,
            });
        }

        Ok(MigrationPlan {
            steps,
            requires_downtime: false,
            estimated_duration_secs: 0,
        })
    }

    /// Applies every step of `plan` to `workflow`, in order.
    ///
    /// Stops at, and returns, the first error reported by a step.
    fn execute_migration_plan(
        &self,
        workflow: WorkflowDefinition,
        plan: MigrationPlan,
    ) -> Result<WorkflowDefinition> {
        plan.steps
            .into_iter()
            .try_fold(workflow, |current, step| self.execute_migration_step(current, step))
    }

    /// Applies a single migration step to `workflow` and returns the result.
    fn execute_migration_step(
        &self,
        mut workflow: WorkflowDefinition,
        step: MigrationStep,
    ) -> Result<WorkflowDefinition> {
        match step {
            MigrationStep::UpdateVersion { to, .. } => workflow.version = to,
            // These steps are accepted but currently change nothing. In
            // particular `UpdateTaskCount` carries only the two counts, so the
            // DAG itself is left as it was.
            MigrationStep::UpdateTaskCount { .. }
            | MigrationStep::UpdateMetadata
            | MigrationStep::Custom { .. } => {}
        }
        Ok(workflow)
    }

    /// Checks that `from` and `to` describe the same workflow.
    ///
    /// # Errors
    ///
    /// Returns a versioning error when the two definitions have different ids.
    pub fn validate_migration(
        &self,
        from: &WorkflowDefinition,
        to: &WorkflowDefinition,
    ) -> Result<()> {
        if from.id == to.id {
            Ok(())
        } else {
            Err(WorkflowError::versioning("Cannot migrate between different workflows"))
        }
    }
}
impl Default for WorkflowMigration {
fn default() -> Self {
Self::new()
}
}
/// Describes how to move a workflow from one definition to another.
///
/// Plans built by `WorkflowMigration` in this file always set
/// `requires_downtime` to `false` and `estimated_duration_secs` to `0`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationPlan {
/// Steps to apply; the executor walks them in this order.
pub steps: Vec<MigrationStep>,
/// Flag for migrations that need downtime (presumably meaning the workflow
/// must be stopped while migrating — confirm with callers).
pub requires_downtime: bool,
/// Estimated duration of the migration; seconds, going by the field name.
pub estimated_duration_secs: u64,
}
/// A single unit of work inside a [`MigrationPlan`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MigrationStep {
/// The workflow version changes; executing this step sets the workflow's
/// version to `to`.
UpdateVersion {
/// Version of the source definition.
from: String,
/// Version of the target definition.
to: String,
},
/// The number of tasks in the DAG differs between the two definitions.
/// Executing this step in this file currently changes nothing.
UpdateTaskCount {
/// Task count of the source definition.
from: usize,
/// Task count of the target definition.
to: usize,
},
/// Metadata change. Not emitted by the planner in this file, and a no-op
/// when executed here.
UpdateMetadata,
/// Free-form step identified by a name and description. Not emitted by the
/// planner in this file, and a no-op when executed here.
Custom {
/// Name of the custom step.
name: String,
/// Description of the custom step.
description: String,
},
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::WorkflowDag;

    /// Builds a definition with no description and a freshly constructed DAG.
    fn definition(id: &str, name: &str, version: &str) -> WorkflowDefinition {
        WorkflowDefinition {
            id: id.to_string(),
            name: name.to_string(),
            description: None,
            version: version.to_string(),
            dag: WorkflowDag::new(),
        }
    }

    /// Two versions of the same workflow are a valid migration pair.
    #[test]
    fn test_migration_creation() {
        let migration = WorkflowMigration::new();
        let from = definition("test", "Test", "1.0.0");
        let to = definition("test", "Test", "2.0.0");

        assert!(migration.validate_migration(&from, &to).is_ok());
    }

    /// A version change must show up in the plan as an `UpdateVersion` step
    /// carrying both versions.
    #[test]
    fn test_migration_plan() {
        let migration = WorkflowMigration::new();
        let from = definition("test", "Test", "1.0.0");
        let to = definition("test", "Test", "2.0.0");

        let plan = migration
            .create_migration_plan(&from, &to)
            .expect("planning a version bump should succeed");

        assert!(!plan.steps.is_empty());
        assert!(plan.steps.iter().any(|step| matches!(
            step,
            MigrationStep::UpdateVersion { from: old, to: new }
                if old == "1.0.0" && new == "2.0.0"
        )));
    }

    /// Definitions with different ids must be rejected.
    #[test]
    fn test_invalid_migration() {
        let migration = WorkflowMigration::new();
        let from = definition("test1", "Test1", "1.0.0");
        let to = definition("test2", "Test2", "2.0.0");

        assert!(migration.validate_migration(&from, &to).is_err());
    }

    /// Migrating between two versions of the same workflow keeps the id and
    /// adopts the target version.
    #[test]
    fn test_migrate_applies_target_version() {
        let migration = WorkflowMigration::new();

        let migrated = migration
            .migrate(
                definition("test", "Test", "1.0.0"),
                definition("test", "Test", "2.0.0"),
            )
            .expect("migrating between versions of one workflow should succeed");

        assert_eq!(migrated.id, "test");
        assert_eq!(migrated.version, "2.0.0");
    }
}