// data_modelling_core/convert/migrate_dataflow.rs
use crate::models::domain::{Domain, System, SystemConnection};
7use crate::models::enums::InfrastructureType;
8use crate::models::table::{ContactDetails, SlaProperty};
9use anyhow::Result;
10use serde::{Deserialize, Serialize};
11use serde_yaml;
12use std::collections::HashMap;
13use uuid::Uuid;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17struct DataFlowFormat {
18 nodes: Option<Vec<DataFlowNode>>,
19 relationships: Option<Vec<DataFlowRelationship>>,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23struct DataFlowNode {
24 id: Option<String>,
25 name: String,
26 #[serde(rename = "type")]
27 node_type: Option<String>,
28 columns: Option<Vec<DataFlowColumn>>,
29 metadata: Option<DataFlowMetadata>,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33struct DataFlowColumn {
34 name: String,
35 #[serde(rename = "type")]
36 data_type: String,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40struct DataFlowRelationship {
41 id: Option<String>,
42 source_node_id: Option<String>,
43 target_node_id: Option<String>,
44 metadata: Option<DataFlowMetadata>,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48struct DataFlowMetadata {
49 owner: Option<String>,
50 sla: Option<Vec<SlaProperty>>,
51 contact_details: Option<ContactDetails>,
52 infrastructure_type: Option<String>,
53 notes: Option<String>,
54}
55
56#[derive(Debug, thiserror::Error)]
58pub enum MigrationError {
59 #[error("Parse error: {0}")]
60 ParseError(String),
61 #[error("Invalid infrastructure type: {0}")]
62 InvalidInfrastructureType(String),
63 #[error("Missing required field: {0}")]
64 MissingField(String),
65}
66
67pub fn migrate_dataflow_to_domain(
107 dataflow_yaml: &str,
108 domain_name: Option<&str>,
109) -> Result<Domain, MigrationError> {
110 let data_flow: DataFlowFormat = serde_yaml::from_str(dataflow_yaml)
112 .map_err(|e| MigrationError::ParseError(format!("Failed to parse DataFlow YAML: {}", e)))?;
113
114 let mut domain = Domain::new(domain_name.unwrap_or("MigratedDomain").to_string());
116
117 let mut node_id_to_system_id: HashMap<String, Uuid> = HashMap::new();
119
120 if let Some(nodes) = data_flow.nodes {
122 for node in nodes {
123 let system = migrate_node_to_system(&node, domain.id)?;
124 let system_id = system.id;
125
126 let node_id = node.id.unwrap_or_else(|| system_id.to_string());
128 node_id_to_system_id.insert(node_id, system_id);
129
130 domain.add_system(system);
131 }
132 }
133
134 if let Some(relationships) = data_flow.relationships {
136 for rel in relationships {
137 let connection =
138 migrate_relationship_to_system_connection(&rel, &node_id_to_system_id)?;
139 domain.add_system_connection(connection);
140 }
141 }
142
143 Ok(domain)
144}
145
146fn migrate_node_to_system(node: &DataFlowNode, domain_id: Uuid) -> Result<System, MigrationError> {
148 let system_id = if let Some(id_str) = &node.id {
150 Uuid::parse_str(id_str)
151 .map_err(|e| MigrationError::ParseError(format!("Invalid node UUID: {}", e)))?
152 } else {
153 Uuid::new_v4()
155 };
156
157 let infrastructure_type = if let Some(infra_str) = node
159 .metadata
160 .as_ref()
161 .and_then(|m| m.infrastructure_type.as_ref())
162 {
163 parse_infrastructure_type(infra_str)?
164 } else {
165 InfrastructureType::Kafka
167 };
168
169 let mut system = System::new(node.name.clone(), infrastructure_type, domain_id);
171 system.id = system_id;
172
173 if let Some(metadata) = &node.metadata {
175 system.owner = metadata.owner.clone();
176 system.sla = metadata.sla.clone();
177 system.contact_details = metadata.contact_details.clone();
178 system.notes = metadata.notes.clone();
179
180 }
183
184 if let Some(columns) = &node.columns
186 && !columns.is_empty()
187 {
188 system.description = Some(format!(
189 "Migrated from DataFlow node with {} columns",
190 columns.len()
191 ));
192 }
193
194 Ok(system)
195}
196
197fn migrate_relationship_to_system_connection(
199 rel: &DataFlowRelationship,
200 node_id_to_system_id: &HashMap<String, Uuid>,
201) -> Result<SystemConnection, MigrationError> {
202 let connection_id = if let Some(id_str) = &rel.id {
204 Uuid::parse_str(id_str)
205 .map_err(|e| MigrationError::ParseError(format!("Invalid relationship UUID: {}", e)))?
206 } else {
207 Uuid::new_v4()
208 };
209
210 let source_node_id = rel
212 .source_node_id
213 .as_ref()
214 .ok_or_else(|| MigrationError::MissingField("source_node_id".to_string()))?;
215 let target_node_id = rel
216 .target_node_id
217 .as_ref()
218 .ok_or_else(|| MigrationError::MissingField("target_node_id".to_string()))?;
219
220 let source_system_id = *node_id_to_system_id.get(source_node_id).ok_or_else(|| {
221 MigrationError::ParseError(format!("Source node ID not found: {}", source_node_id))
222 })?;
223 let target_system_id = *node_id_to_system_id.get(target_node_id).ok_or_else(|| {
224 MigrationError::ParseError(format!("Target node ID not found: {}", target_node_id))
225 })?;
226
227 let mut connection = SystemConnection {
229 id: connection_id,
230 source_system_id,
231 target_system_id,
232 connection_type: "data_flow".to_string(), bidirectional: false, metadata: HashMap::new(),
235 created_at: None,
236 updated_at: None,
237 };
238
239 if let Some(metadata) = &rel.metadata {
241 if let Some(owner) = &metadata.owner {
243 connection.metadata.insert(
244 "owner".to_string(),
245 serde_json::Value::String(owner.clone()),
246 );
247 }
248 if let Some(notes) = &metadata.notes {
249 connection.metadata.insert(
250 "notes".to_string(),
251 serde_json::Value::String(notes.clone()),
252 );
253 }
254 if let Some(sla) = &metadata.sla {
255 connection.metadata.insert(
256 "sla".to_string(),
257 serde_json::to_value(sla).unwrap_or(serde_json::Value::Null),
258 );
259 }
260 if let Some(contact_details) = &metadata.contact_details {
261 connection.metadata.insert(
262 "contact_details".to_string(),
263 serde_json::to_value(contact_details).unwrap_or(serde_json::Value::Null),
264 );
265 }
266 if let Some(infra_type) = &metadata.infrastructure_type {
267 connection.metadata.insert(
268 "infrastructure_type".to_string(),
269 serde_json::Value::String(infra_type.clone()),
270 );
271 }
272 }
273
274 Ok(connection)
275}
276
277fn parse_infrastructure_type(infra_str: &str) -> Result<InfrastructureType, MigrationError> {
279 match serde_json::from_str::<InfrastructureType>(&format!("\"{}\"", infra_str)) {
282 Ok(infra_type) => Ok(infra_type),
283 Err(_) => Err(MigrationError::InvalidInfrastructureType(format!(
284 "Invalid infrastructure type: {}. Must be one of the valid InfrastructureType values.",
285 infra_str
286 ))),
287 }
288}