use crate::models::domain::{Domain, System, SystemConnection};
use crate::models::enums::InfrastructureType;
use crate::models::table::{ContactDetails, SlaProperty};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use serde_yaml;
use std::collections::HashMap;
use uuid::Uuid;
/// Top-level shape of a legacy DataFlow YAML document, as deserialized by
/// `migrate_dataflow_to_domain`.
///
/// Both sections are optional; `migrate_dataflow_to_domain` simply skips a
/// section that is absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DataFlowFormat {
/// Nodes of the flow; each one is migrated into a `System`.
nodes: Option<Vec<DataFlowNode>>,
/// Edges between nodes; each one is migrated into a `SystemConnection`.
relationships: Option<Vec<DataFlowRelationship>>,
}
/// A single node of a DataFlow document; migrated into one `System`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DataFlowNode {
/// Optional node identifier. When present it must parse as a UUID and is reused
/// as the migrated system's id; relationships refer to the node by this exact
/// string. When absent, a random UUID is generated for the system.
id: Option<String>,
/// Node name; becomes the migrated system's name.
name: String,
/// Node type, read from the YAML key `type`. Not read by any of the migration
/// functions shown here.
#[serde(rename = "type")]
node_type: Option<String>,
/// Column definitions. Only their count is carried over (into the system's
/// description); column names and types are not migrated.
columns: Option<Vec<DataFlowColumn>>,
/// Optional ownership / SLA / infrastructure details copied onto the system.
metadata: Option<DataFlowMetadata>,
}
/// A column declared on a DataFlow node.
///
/// Both fields are required, so a column missing either one makes the whole
/// document fail to deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DataFlowColumn {
/// Column name.
name: String,
/// Column data type as written under the YAML key `type` (not validated).
#[serde(rename = "type")]
data_type: String,
}
/// A directed edge between two DataFlow nodes; migrated into a `SystemConnection`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DataFlowRelationship {
/// Optional relationship identifier; must be a valid UUID when present,
/// otherwise a random UUID is generated for the connection.
id: Option<String>,
/// `id` string of the source node. A missing value is reported as
/// `MigrationError::MissingField` during migration rather than failing YAML parsing.
source_node_id: Option<String>,
/// `id` string of the target node; a missing value is reported the same way as
/// for `source_node_id`.
target_node_id: Option<String>,
/// Optional metadata, stored as JSON values in the connection's metadata map.
metadata: Option<DataFlowMetadata>,
}
/// Descriptive metadata attached to a node or to a relationship.
///
/// On a node, `owner`, `sla`, `contact_details` and `notes` are copied onto the
/// migrated system, and `infrastructure_type` selects its `InfrastructureType`
/// (Kafka when absent). On a relationship, every present field is stored as a
/// JSON value in the connection's metadata map.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DataFlowMetadata {
/// Owner of the node or relationship.
owner: Option<String>,
/// SLA properties.
sla: Option<Vec<SlaProperty>>,
/// Contact information.
contact_details: Option<ContactDetails>,
/// Infrastructure type name. On a node it must match an `InfrastructureType`
/// serde name; on a relationship it is stored verbatim without validation.
infrastructure_type: Option<String>,
/// Free-text notes.
notes: Option<String>,
}
/// Errors produced while migrating a DataFlow document into a `Domain`.
#[derive(Debug, thiserror::Error)]
pub enum MigrationError {
/// The input could not be interpreted: malformed YAML, an id that is not a
/// valid UUID, or a relationship that references an unknown node.
#[error("Parse error: {0}")]
ParseError(String),
/// A node's `infrastructure_type` does not name a known `InfrastructureType`.
#[error("Invalid infrastructure type: {0}")]
InvalidInfrastructureType(String),
/// A required field is absent; the payload is the field name
/// (e.g. `source_node_id`).
#[error("Missing required field: {0}")]
MissingField(String),
}
/// Migrates a legacy DataFlow YAML document into a [`Domain`].
///
/// Every node becomes a [`System`], and every relationship becomes a
/// [`SystemConnection`] between the systems created for its source and target
/// nodes. Relationships reference nodes by the node's `id` string exactly as it
/// appears in the YAML.
///
/// # Arguments
///
/// * `dataflow_yaml` - the DataFlow document as YAML text.
/// * `domain_name` - name of the new domain; defaults to `"MigratedDomain"`.
///
/// # Errors
///
/// * [`MigrationError::ParseError`] if the YAML is malformed, a node or
///   relationship id is not a valid UUID, two nodes resolve to the same system
///   id, or a relationship references an unknown node.
/// * [`MigrationError::InvalidInfrastructureType`] if a node names an unknown
///   infrastructure type.
/// * [`MigrationError::MissingField`] if a relationship lacks `source_node_id`
///   or `target_node_id`.
pub fn migrate_dataflow_to_domain(
    dataflow_yaml: &str,
    domain_name: Option<&str>,
) -> Result<Domain, MigrationError> {
    let data_flow: DataFlowFormat = serde_yaml::from_str(dataflow_yaml)
        .map_err(|e| MigrationError::ParseError(format!("Failed to parse DataFlow YAML: {}", e)))?;

    let mut domain = Domain::new(domain_name.unwrap_or("MigratedDomain").to_string());
    let mut node_id_to_system_id: HashMap<String, Uuid> = HashMap::new();
    // Without this check a repeated node id would add two systems sharing one
    // UUID and silently rebind relationships to the last of them. Checking the
    // parsed Uuid (not the raw string) also catches ids that differ only in
    // textual form, such as letter case.
    let mut seen_system_ids: std::collections::HashSet<Uuid> = std::collections::HashSet::new();

    for node in data_flow.nodes.unwrap_or_default() {
        let system = migrate_node_to_system(&node, domain.id)?;
        let system_id = system.id;
        let node_id = node.id.unwrap_or_else(|| system_id.to_string());
        if !seen_system_ids.insert(system_id) {
            return Err(MigrationError::ParseError(format!(
                "Duplicate node ID: {}",
                node_id
            )));
        }
        node_id_to_system_id.insert(node_id, system_id);
        domain.add_system(system);
    }

    for rel in data_flow.relationships.unwrap_or_default() {
        let connection = migrate_relationship_to_system_connection(&rel, &node_id_to_system_id)?;
        domain.add_system_connection(connection);
    }

    Ok(domain)
}
/// Builds the [`System`] that represents a single DataFlow node inside `domain_id`.
///
/// * The node's `id`, when present, must be a UUID and becomes the system id;
///   otherwise a random v4 UUID is used.
/// * `metadata.infrastructure_type` selects the infrastructure type, falling back
///   to [`InfrastructureType::Kafka`] when it is not given.
/// * When metadata is present, owner, SLA, contact details and notes are copied
///   over (absent values set the corresponding system field to `None`).
/// * A non-empty column list is summarised in the description as a column count.
///
/// # Errors
///
/// * [`MigrationError::ParseError`] if the node id is not a valid UUID.
/// * [`MigrationError::InvalidInfrastructureType`] if the infrastructure type is
///   not recognised.
fn migrate_node_to_system(node: &DataFlowNode, domain_id: Uuid) -> Result<System, MigrationError> {
    let system_id = match node.id.as_deref() {
        Some(raw_id) => Uuid::parse_str(raw_id)
            .map_err(|e| MigrationError::ParseError(format!("Invalid node UUID: {}", e)))?,
        None => Uuid::new_v4(),
    };

    let meta = node.metadata.as_ref();
    let infrastructure_type = match meta.and_then(|m| m.infrastructure_type.as_deref()) {
        Some(raw_type) => parse_infrastructure_type(raw_type)?,
        None => InfrastructureType::Kafka,
    };

    let mut system = System::new(node.name.clone(), infrastructure_type, domain_id);
    system.id = system_id;

    if let Some(m) = meta {
        system.owner = m.owner.clone();
        system.sla = m.sla.clone();
        system.contact_details = m.contact_details.clone();
        system.notes = m.notes.clone();
    }

    let column_count = node.columns.as_ref().map_or(0, Vec::len);
    if column_count > 0 {
        system.description = Some(format!(
            "Migrated from DataFlow node with {} columns",
            column_count
        ));
    }

    Ok(system)
}
/// Converts one DataFlow relationship into a directed `"data_flow"` [`SystemConnection`].
///
/// `node_id_to_system_id` maps the node id strings used in the YAML to the
/// system ids created for those nodes; both endpoints must be present in it.
/// Relationship metadata, if any, is copied into the connection's `metadata`
/// map as JSON values under the keys `owner`, `notes`, `sla`,
/// `contact_details` and `infrastructure_type` (only for fields that are set).
///
/// # Errors
///
/// Checked in this order:
/// * [`MigrationError::ParseError`] if the relationship id is not a valid UUID.
/// * [`MigrationError::MissingField`] if `source_node_id` or `target_node_id`
///   is absent.
/// * [`MigrationError::ParseError`] if an endpoint id is not in
///   `node_id_to_system_id`.
fn migrate_relationship_to_system_connection(
    rel: &DataFlowRelationship,
    node_id_to_system_id: &HashMap<String, Uuid>,
) -> Result<SystemConnection, MigrationError> {
    // An explicit id must be a valid UUID; only a missing one gets a fresh id.
    let connection_id = rel
        .id
        .as_deref()
        .map(Uuid::parse_str)
        .transpose()
        .map_err(|e| MigrationError::ParseError(format!("Invalid relationship UUID: {}", e)))?
        .unwrap_or_else(Uuid::new_v4);

    // Missing endpoints are reported before unknown ones, source before target.
    let source_key = rel
        .source_node_id
        .as_deref()
        .ok_or_else(|| MigrationError::MissingField("source_node_id".to_string()))?;
    let target_key = rel
        .target_node_id
        .as_deref()
        .ok_or_else(|| MigrationError::MissingField("target_node_id".to_string()))?;

    let resolve = |role: &str, node_key: &str| -> Result<Uuid, MigrationError> {
        node_id_to_system_id.get(node_key).copied().ok_or_else(|| {
            MigrationError::ParseError(format!("{} node ID not found: {}", role, node_key))
        })
    };
    let source_system_id = resolve("Source", source_key)?;
    let target_system_id = resolve("Target", target_key)?;

    let mut metadata: HashMap<String, serde_json::Value> = HashMap::new();
    if let Some(meta) = &rel.metadata {
        // Serialization failures are deliberately recorded as JSON null.
        let entries = [
            ("owner", meta.owner.clone().map(serde_json::Value::String)),
            ("notes", meta.notes.clone().map(serde_json::Value::String)),
            (
                "sla",
                meta.sla
                    .as_ref()
                    .map(|sla| serde_json::to_value(sla).unwrap_or(serde_json::Value::Null)),
            ),
            (
                "contact_details",
                meta.contact_details.as_ref().map(|contact| {
                    serde_json::to_value(contact).unwrap_or(serde_json::Value::Null)
                }),
            ),
            (
                "infrastructure_type",
                meta.infrastructure_type
                    .clone()
                    .map(serde_json::Value::String),
            ),
        ];
        metadata.extend(
            entries
                .into_iter()
                .filter_map(|(key, value)| value.map(|v| (key.to_string(), v))),
        );
    }

    Ok(SystemConnection {
        id: connection_id,
        source_system_id,
        target_system_id,
        connection_type: "data_flow".to_string(),
        bidirectional: false,
        metadata,
        created_at: None,
        updated_at: None,
    })
}
/// Parses a DataFlow infrastructure-type string into an [`InfrastructureType`].
///
/// The string is matched against `InfrastructureType`'s serde representation,
/// so it must be spelled exactly as that enum deserializes (presumably its
/// variant names or `#[serde(rename)]` forms — confirm against the enum).
///
/// # Errors
///
/// Returns [`MigrationError::InvalidInfrastructureType`] when the string does
/// not name a known variant.
fn parse_infrastructure_type(infra_str: &str) -> Result<InfrastructureType, MigrationError> {
    // Deserialize from a JSON string *value* instead of splicing the input into a
    // JSON literal. Splicing left quotes and backslashes unescaped, so e.g.
    // `Kaf\u006ba` was decoded to `Kafka` and accepted.
    serde_json::from_value(serde_json::Value::String(infra_str.to_owned())).map_err(|_| {
        // The variant's `#[error]` format already prefixes
        // "Invalid infrastructure type: ", so only the detail goes in the payload.
        MigrationError::InvalidInfrastructureType(format!(
            "{}. Must be one of the valid InfrastructureType values.",
            infra_str
        ))
    })
}