data_modelling_core/convert/
migrate_dataflow.rs

1//! DataFlow to Domain schema migration utility
2//!
3//! Converts DataFlow YAML format to Business Domain schema format.
4//! DataFlow nodes become Systems, and DataFlow relationships become SystemConnections.
5
6use crate::models::domain::{Domain, System, SystemConnection};
7use crate::models::enums::InfrastructureType;
8use crate::models::table::{ContactDetails, SlaProperty};
9use anyhow::Result;
10use serde::{Deserialize, Serialize};
11use serde_yaml;
12use std::collections::HashMap;
13use uuid::Uuid;
14
15/// DataFlow format structure for YAML parsing (internal, for migration only)
16#[derive(Debug, Clone, Serialize, Deserialize)]
17struct DataFlowFormat {
18    nodes: Option<Vec<DataFlowNode>>,
19    relationships: Option<Vec<DataFlowRelationship>>,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23struct DataFlowNode {
24    id: Option<String>,
25    name: String,
26    #[serde(rename = "type")]
27    node_type: Option<String>,
28    columns: Option<Vec<DataFlowColumn>>,
29    metadata: Option<DataFlowMetadata>,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33struct DataFlowColumn {
34    name: String,
35    #[serde(rename = "type")]
36    data_type: String,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40struct DataFlowRelationship {
41    id: Option<String>,
42    source_node_id: Option<String>,
43    target_node_id: Option<String>,
44    metadata: Option<DataFlowMetadata>,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48struct DataFlowMetadata {
49    owner: Option<String>,
50    sla: Option<Vec<SlaProperty>>,
51    contact_details: Option<ContactDetails>,
52    infrastructure_type: Option<String>,
53    notes: Option<String>,
54}
55
56/// Error during DataFlow migration
57#[derive(Debug, thiserror::Error)]
58pub enum MigrationError {
59    #[error("Parse error: {0}")]
60    ParseError(String),
61    #[error("Invalid infrastructure type: {0}")]
62    InvalidInfrastructureType(String),
63    #[error("Missing required field: {0}")]
64    MissingField(String),
65}
66
67/// Migrate DataFlow YAML to Domain schema format
68///
69/// # Arguments
70///
71/// * `dataflow_yaml` - DataFlow format YAML content as a string
72/// * `domain_name` - Name for the new Domain (optional, defaults to "MigratedDomain")
73///
74/// # Returns
75///
76/// A `Domain` containing Systems and SystemConnections migrated from DataFlow format
77///
78/// # Example
79///
80/// ```rust
81/// use data_modelling_core::convert::migrate_dataflow::migrate_dataflow_to_domain;
82/// use uuid::Uuid;
83///
84/// let node1_id = Uuid::new_v4();
85/// let node2_id = Uuid::new_v4();
86/// let dataflow_yaml = format!(r#"
87/// nodes:
88///   - id: {}
89///     name: kafka-cluster
90///     metadata:
91///       owner: "Data Engineering Team"
92///       infrastructure_type: "Kafka"
93///   - id: {}
94///     name: postgres-db
95///     metadata:
96///       infrastructure_type: "PostgreSQL"
97/// relationships:
98///   - source_node_id: "{}"
99///     target_node_id: "{}"
100/// "#, node1_id, node2_id, node1_id, node2_id);
101///
102/// let domain = migrate_dataflow_to_domain(&dataflow_yaml, Some("customer-service")).unwrap();
103/// assert_eq!(domain.systems.len(), 2);
104/// assert_eq!(domain.system_connections.len(), 1);
105/// ```
106pub fn migrate_dataflow_to_domain(
107    dataflow_yaml: &str,
108    domain_name: Option<&str>,
109) -> Result<Domain, MigrationError> {
110    // Parse DataFlow YAML
111    let data_flow: DataFlowFormat = serde_yaml::from_str(dataflow_yaml)
112        .map_err(|e| MigrationError::ParseError(format!("Failed to parse DataFlow YAML: {}", e)))?;
113
114    // Create new Domain
115    let mut domain = Domain::new(domain_name.unwrap_or("MigratedDomain").to_string());
116
117    // Map of old node IDs to new system IDs
118    let mut node_id_to_system_id: HashMap<String, Uuid> = HashMap::new();
119
120    // Migrate nodes to Systems
121    if let Some(nodes) = data_flow.nodes {
122        for node in nodes {
123            let system = migrate_node_to_system(&node, domain.id)?;
124            let system_id = system.id;
125
126            // Store mapping for relationships
127            let node_id = node.id.unwrap_or_else(|| system_id.to_string());
128            node_id_to_system_id.insert(node_id, system_id);
129
130            domain.add_system(system);
131        }
132    }
133
134    // Migrate relationships to SystemConnections
135    if let Some(relationships) = data_flow.relationships {
136        for rel in relationships {
137            let connection =
138                migrate_relationship_to_system_connection(&rel, &node_id_to_system_id)?;
139            domain.add_system_connection(connection);
140        }
141    }
142
143    Ok(domain)
144}
145
146/// Migrate a DataFlow node to a System
147fn migrate_node_to_system(node: &DataFlowNode, domain_id: Uuid) -> Result<System, MigrationError> {
148    // Parse system ID
149    let system_id = if let Some(id_str) = &node.id {
150        Uuid::parse_str(id_str)
151            .map_err(|e| MigrationError::ParseError(format!("Invalid node UUID: {}", e)))?
152    } else {
153        // Generate ID from name if not provided
154        Uuid::new_v4()
155    };
156
157    // Parse infrastructure type
158    let infrastructure_type = if let Some(infra_str) = node
159        .metadata
160        .as_ref()
161        .and_then(|m| m.infrastructure_type.as_ref())
162    {
163        parse_infrastructure_type(infra_str)?
164    } else {
165        // Default to Kafka if not specified (common for DataFlow)
166        InfrastructureType::Kafka
167    };
168
169    // Create System with DataFlow metadata
170    let mut system = System::new(node.name.clone(), infrastructure_type, domain_id);
171    system.id = system_id;
172
173    // Preserve all DataFlow metadata
174    if let Some(metadata) = &node.metadata {
175        system.owner = metadata.owner.clone();
176        system.sla = metadata.sla.clone();
177        system.contact_details = metadata.contact_details.clone();
178        system.notes = metadata.notes.clone();
179
180        // If infrastructure_type was in metadata, it's already set above
181        // But if it wasn't, we keep the default
182    }
183
184    // Add description if node has columns (indicates it might be a data store)
185    if let Some(columns) = &node.columns
186        && !columns.is_empty()
187    {
188        system.description = Some(format!(
189            "Migrated from DataFlow node with {} columns",
190            columns.len()
191        ));
192    }
193
194    Ok(system)
195}
196
197/// Migrate a DataFlow relationship to a SystemConnection
198fn migrate_relationship_to_system_connection(
199    rel: &DataFlowRelationship,
200    node_id_to_system_id: &HashMap<String, Uuid>,
201) -> Result<SystemConnection, MigrationError> {
202    // Parse connection ID
203    let connection_id = if let Some(id_str) = &rel.id {
204        Uuid::parse_str(id_str)
205            .map_err(|e| MigrationError::ParseError(format!("Invalid relationship UUID: {}", e)))?
206    } else {
207        Uuid::new_v4()
208    };
209
210    // Get source and target system IDs
211    let source_node_id = rel
212        .source_node_id
213        .as_ref()
214        .ok_or_else(|| MigrationError::MissingField("source_node_id".to_string()))?;
215    let target_node_id = rel
216        .target_node_id
217        .as_ref()
218        .ok_or_else(|| MigrationError::MissingField("target_node_id".to_string()))?;
219
220    let source_system_id = *node_id_to_system_id.get(source_node_id).ok_or_else(|| {
221        MigrationError::ParseError(format!("Source node ID not found: {}", source_node_id))
222    })?;
223    let target_system_id = *node_id_to_system_id.get(target_node_id).ok_or_else(|| {
224        MigrationError::ParseError(format!("Target node ID not found: {}", target_node_id))
225    })?;
226
227    // Create SystemConnection
228    let mut connection = SystemConnection {
229        id: connection_id,
230        source_system_id,
231        target_system_id,
232        connection_type: "data_flow".to_string(), // Default for DataFlow relationships
233        bidirectional: false,                     // Default to unidirectional
234        metadata: HashMap::new(),
235        created_at: None,
236        updated_at: None,
237    };
238
239    // Preserve relationship metadata if present
240    if let Some(metadata) = &rel.metadata {
241        // Store metadata in connection.metadata HashMap
242        if let Some(owner) = &metadata.owner {
243            connection.metadata.insert(
244                "owner".to_string(),
245                serde_json::Value::String(owner.clone()),
246            );
247        }
248        if let Some(notes) = &metadata.notes {
249            connection.metadata.insert(
250                "notes".to_string(),
251                serde_json::Value::String(notes.clone()),
252            );
253        }
254        if let Some(sla) = &metadata.sla {
255            connection.metadata.insert(
256                "sla".to_string(),
257                serde_json::to_value(sla).unwrap_or(serde_json::Value::Null),
258            );
259        }
260        if let Some(contact_details) = &metadata.contact_details {
261            connection.metadata.insert(
262                "contact_details".to_string(),
263                serde_json::to_value(contact_details).unwrap_or(serde_json::Value::Null),
264            );
265        }
266        if let Some(infra_type) = &metadata.infrastructure_type {
267            connection.metadata.insert(
268                "infrastructure_type".to_string(),
269                serde_json::Value::String(infra_type.clone()),
270            );
271        }
272    }
273
274    Ok(connection)
275}
276
277/// Parse infrastructure type string to InfrastructureType enum
278fn parse_infrastructure_type(infra_str: &str) -> Result<InfrastructureType, MigrationError> {
279    // Try to match the string to InfrastructureType enum
280    // Using serde deserialization which handles PascalCase
281    match serde_json::from_str::<InfrastructureType>(&format!("\"{}\"", infra_str)) {
282        Ok(infra_type) => Ok(infra_type),
283        Err(_) => Err(MigrationError::InvalidInfrastructureType(format!(
284            "Invalid infrastructure type: {}. Must be one of the valid InfrastructureType values.",
285            infra_str
286        ))),
287    }
288}