// data_modelling_sdk/import/dataflow.rs

//! Data Flow format importer for lightweight Data Flow nodes and relationships.
//!
//! This module imports Data Flow format YAML files (a lightweight format separate from ODCS).
//! ODCS is used only for Data Models (tables); this format describes Data Flow nodes and the
//! relationships between them.

6use super::ImportError;
7use crate::models::enums::InfrastructureType;
8use crate::models::{Column, ContactDetails, DataModel, Relationship, SlaProperty, Table};
9use serde::{Deserialize, Serialize};
10use serde_yaml;
11use uuid::Uuid;
12
13/// Data Flow format structure for YAML parsing
14#[derive(Debug, Clone, Serialize, Deserialize)]
15struct DataFlowFormat {
16    nodes: Option<Vec<DataFlowNode>>,
17    relationships: Option<Vec<DataFlowRelationship>>,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21struct DataFlowNode {
22    id: Option<String>,
23    name: String,
24    #[serde(rename = "type")]
25    node_type: Option<String>,
26    columns: Option<Vec<DataFlowColumn>>,
27    metadata: Option<DataFlowMetadata>,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31struct DataFlowColumn {
32    name: String,
33    #[serde(rename = "type")]
34    data_type: String,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38struct DataFlowRelationship {
39    id: Option<String>,
40    source_node_id: Option<String>,
41    target_node_id: Option<String>,
42    metadata: Option<DataFlowMetadata>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46struct DataFlowMetadata {
47    owner: Option<String>,
48    sla: Option<Vec<SlaProperty>>,
49    contact_details: Option<ContactDetails>,
50    infrastructure_type: Option<String>,
51    notes: Option<String>,
52}
53
/// Data Flow format importer.
///
/// Stateless: it holds no fields, so it is cheap to create, copy and share.
#[derive(Debug, Clone, Copy)]
pub struct DataFlowImporter;

57impl Default for DataFlowImporter {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl DataFlowImporter {
64    /// Create a new Data Flow importer instance.
65    ///
66    /// # Example
67    ///
68    /// ```rust
69    /// use data_modelling_sdk::import::dataflow::DataFlowImporter;
70    ///
71    /// let importer = DataFlowImporter::new();
72    /// ```
73    pub fn new() -> Self {
74        Self
75    }
76
77    /// Import Data Flow format YAML content and create DataModel.
78    ///
79    /// # Arguments
80    ///
81    /// * `yaml_content` - Data Flow format YAML content as a string
82    ///
83    /// # Returns
84    ///
85    /// A `DataModel` containing the extracted nodes and relationships.
86    ///
87    /// # Example
88    ///
89    /// ```rust
90    /// use data_modelling_sdk::import::dataflow::DataFlowImporter;
91    ///
92    /// let importer = DataFlowImporter::new();
93    /// let yaml = r#"
94    /// nodes:
95    ///   - name: user_events
96    ///     metadata:
97    ///       owner: "Data Engineering Team"
98    ///       infrastructure_type: "Kafka"
99    /// "#;
100    /// let model = importer.import(yaml).unwrap();
101    /// ```
102    pub fn import(&self, yaml_content: &str) -> Result<DataModel, ImportError> {
103        let data_flow: DataFlowFormat = serde_yaml::from_str(yaml_content)
104            .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
105
106        let mut model = DataModel::new(
107            "DataFlow".to_string(),
108            "/tmp".to_string(),
109            "relationships.yaml".to_string(),
110        );
111
112        // Import nodes
113        if let Some(nodes) = data_flow.nodes {
114            for node in nodes {
115                let table = self.parse_node(node)?;
116                model.tables.push(table);
117            }
118        }
119
120        // Import relationships
121        if let Some(relationships) = data_flow.relationships {
122            for rel in relationships {
123                let relationship = self.parse_relationship(rel)?;
124                model.relationships.push(relationship);
125            }
126        }
127
128        Ok(model)
129    }
130
131    fn parse_node(&self, node: DataFlowNode) -> Result<Table, ImportError> {
132        let id = if let Some(id_str) = node.id {
133            Uuid::parse_str(&id_str)
134                .map_err(|e| ImportError::ParseError(format!("Invalid UUID: {}", e)))?
135        } else {
136            Table::generate_id(&node.name, None, None, None)
137        };
138
139        let columns: Vec<Column> = node
140            .columns
141            .unwrap_or_default()
142            .into_iter()
143            .map(|c| Column::new(c.name, c.data_type))
144            .collect();
145
146        let mut table = Table::new(node.name, columns);
147        table.id = id;
148
149        // Extract metadata
150        if let Some(metadata) = node.metadata {
151            table.owner = metadata.owner;
152            table.sla = metadata.sla;
153            table.contact_details = metadata.contact_details;
154            table.notes = metadata.notes;
155
156            // Parse infrastructure type
157            if let Some(infra_str) = metadata.infrastructure_type {
158                table.infrastructure_type = self.parse_infrastructure_type(&infra_str)?;
159            }
160        }
161
162        Ok(table)
163    }
164
165    fn parse_relationship(&self, rel: DataFlowRelationship) -> Result<Relationship, ImportError> {
166        let id = if let Some(id_str) = rel.id {
167            Uuid::parse_str(&id_str)
168                .map_err(|e| ImportError::ParseError(format!("Invalid UUID: {}", e)))?
169        } else {
170            Uuid::new_v4()
171        };
172
173        let source_id = rel
174            .source_node_id
175            .ok_or_else(|| ImportError::ParseError("Missing source_node_id".to_string()))?;
176        let target_id = rel
177            .target_node_id
178            .ok_or_else(|| ImportError::ParseError("Missing target_node_id".to_string()))?;
179
180        let source_uuid = Uuid::parse_str(&source_id)
181            .map_err(|e| ImportError::ParseError(format!("Invalid source UUID: {}", e)))?;
182        let target_uuid = Uuid::parse_str(&target_id)
183            .map_err(|e| ImportError::ParseError(format!("Invalid target UUID: {}", e)))?;
184
185        let mut relationship = Relationship::new(source_uuid, target_uuid);
186        relationship.id = id;
187
188        // Extract metadata
189        if let Some(metadata) = rel.metadata {
190            relationship.owner = metadata.owner;
191            relationship.sla = metadata.sla;
192            relationship.contact_details = metadata.contact_details;
193            relationship.notes = metadata.notes;
194
195            // Parse infrastructure type
196            if let Some(infra_str) = metadata.infrastructure_type {
197                relationship.infrastructure_type = self.parse_infrastructure_type(&infra_str)?;
198            }
199        }
200
201        Ok(relationship)
202    }
203
204    fn parse_infrastructure_type(
205        &self,
206        infra_str: &str,
207    ) -> Result<Option<InfrastructureType>, ImportError> {
208        // Try to match the string to InfrastructureType enum
209        // Using serde deserialization which handles PascalCase
210        match serde_json::from_str::<InfrastructureType>(&format!("\"{}\"", infra_str)) {
211            Ok(infra_type) => Ok(Some(infra_type)),
212            Err(_) => Err(ImportError::ParseError(format!(
213                "Invalid infrastructure type: {}. Must be one of the valid InfrastructureType values.",
214                infra_str
215            ))),
216        }
217    }
218}