data_modelling_sdk/convert/
converter.rs

1//! Universal format converter
2//!
3//! Converts any import format to ODCS v3.1.0 format.
4
5use crate::export::{ExportError, ODCSExporter};
6use crate::import::{
7    AvroImporter, CADSImporter, ColumnData, ImportError, ImportResult, JSONSchemaImporter,
8    ODCSImporter, ODPSImporter, ProtobufImporter, SQLImporter, TableData,
9};
10use crate::models::{Column, DataModel, Domain, Table};
11
/// Error during format conversion
///
/// Importer and exporter failures are wrapped through the `#[from]`
/// conversions, so `?` can be applied directly to results carrying an
/// `ImportError` or `ExportError` inside functions returning this type.
/// Every other variant carries a human-readable message.
#[derive(Debug, thiserror::Error)]
pub enum ConversionError {
    /// Reading or parsing the input failed.
    #[error("Import error: {0}")]
    ImportError(#[from] ImportError),
    /// Writing the output format failed.
    #[error("Export error: {0}")]
    ExportError(#[from] ExportError),
    /// The requested format is unknown, or it is recognised but cannot be
    /// converted (the message explains which).
    #[error("Unsupported format: {0}")]
    UnsupportedFormat(String),
    /// No format was specified and none could be inferred from the content.
    #[error("Auto-detection failed: {0}")]
    AutoDetectionFailed(String),
    /// An OpenAPI to ODCS conversion failed.
    #[error("OpenAPI to ODCS conversion error: {0}")]
    OpenAPIToODCSError(String),
    /// A referenced OpenAPI component could not be found.
    #[error("OpenAPI component not found: {0}")]
    OpenAPIComponentNotFound(String),
    /// An OpenAPI schema was invalid.
    #[error("OpenAPI schema invalid: {0}")]
    OpenAPISchemaInvalid(String),
    /// Converting a nested object failed.
    #[error("Nested object conversion failed: {0}")]
    NestedObjectConversionFailed(String),
}
32
33/// Reconstruct a Table from TableData
34///
35/// Converts import-format TableData/ColumnData into full Table/Column structs
36/// suitable for export operations.
37fn table_data_to_table(table_data: &TableData) -> Table {
38    let table_name = table_data
39        .name
40        .clone()
41        .unwrap_or_else(|| format!("table_{}", table_data.table_index));
42
43    let columns: Vec<Column> = table_data
44        .columns
45        .iter()
46        .map(column_data_to_column)
47        .collect();
48
49    Table::new(table_name, columns)
50}
51
52/// Convert ColumnData to Column
53fn column_data_to_column(col_data: &ColumnData) -> Column {
54    Column {
55        name: col_data.name.clone(),
56        data_type: col_data.data_type.clone(),
57        physical_type: col_data.physical_type.clone(),
58        nullable: col_data.nullable,
59        primary_key: col_data.primary_key,
60        secondary_key: false,
61        composite_key: None,
62        foreign_key: None,
63        constraints: Vec::new(),
64        description: col_data.description.clone().unwrap_or_default(),
65        errors: Vec::new(),
66        quality: col_data.quality.clone().unwrap_or_default(),
67        relationships: col_data.relationships.clone(),
68        enum_values: col_data.enum_values.clone().unwrap_or_default(),
69        column_order: 0,
70        nested_data: None,
71    }
72}
73
74/// Reconstruct full Table structs from ImportResult
75///
76/// This function converts the flat TableData/ColumnData structures from imports
77/// into complete Table/Column model structs that can be used for export.
78pub fn reconstruct_tables(import_result: &ImportResult) -> Vec<Table> {
79    import_result
80        .tables
81        .iter()
82        .map(table_data_to_table)
83        .collect()
84}
85
86/// Convert any import format to ODCS v3.1.0 YAML format.
87///
88/// # Arguments
89///
90/// * `input` - Format-specific content as a string
91/// * `format` - Optional format identifier. If None, attempts auto-detection.
92///   Supported formats: "sql", "json_schema", "avro", "protobuf", "odcl", "odcs", "cads", "odps", "domain"
93///
94/// # Returns
95///
96/// ODCS v3.1.0 YAML string, or ConversionError
97pub fn convert_to_odcs(input: &str, format: Option<&str>) -> Result<String, ConversionError> {
98    // Determine format (auto-detect if not specified)
99    let detected_format = if let Some(fmt) = format {
100        fmt
101    } else {
102        auto_detect_format(input)?
103    };
104
105    // Import using appropriate importer
106    let import_result = match detected_format {
107        "odcs" => {
108            let mut importer = ODCSImporter::new();
109            importer
110                .import(input)
111                .map_err(ConversionError::ImportError)?
112        }
113        "odcl" => {
114            let mut importer = ODCSImporter::new();
115            importer
116                .import(input)
117                .map_err(ConversionError::ImportError)?
118        }
119        "sql" => {
120            let importer = SQLImporter::new("postgresql");
121            importer
122                .parse(input)
123                .map_err(|e| ConversionError::ImportError(ImportError::ParseError(e.to_string())))?
124        }
125        "json_schema" => {
126            let importer = JSONSchemaImporter::new();
127            importer
128                .import(input)
129                .map_err(ConversionError::ImportError)?
130        }
131        "avro" => {
132            let importer = AvroImporter::new();
133            importer
134                .import(input)
135                .map_err(ConversionError::ImportError)?
136        }
137        "protobuf" => {
138            let importer = ProtobufImporter::new();
139            importer
140                .import(input)
141                .map_err(ConversionError::ImportError)?
142        }
143        "cads" => {
144            // CADS assets are compute assets, not data contracts
145            // For CADS → ODCS conversion, we create a minimal ODCS representation
146            // that captures metadata but doesn't represent a true data contract
147            // This is a placeholder - full conversion would require understanding
148            // the data schema produced by the CADS asset
149            let importer = CADSImporter::new();
150            let _asset = importer
151                .import(input)
152                .map_err(ConversionError::ImportError)?;
153
154            // For now, return an error indicating CADS → ODCS conversion
155            // requires additional context about the data schema
156            return Err(ConversionError::UnsupportedFormat(
157                "CADS → ODCS conversion requires data schema information. CADS assets represent compute resources, not data contracts.".to_string()
158            ));
159        }
160        "odps" => {
161            // ODPS Data Products link to ODCS Tables via contractId
162            // For ODPS → ODCS conversion, we extract the referenced ODCS Tables
163            // from the input/output ports and export them
164            let importer = ODPSImporter::new();
165            let product = importer
166                .import(input)
167                .map_err(ConversionError::ImportError)?;
168
169            // Extract contractIds from input and output ports
170            let mut contract_ids = Vec::new();
171            if let Some(input_ports) = &product.input_ports {
172                for port in input_ports {
173                    contract_ids.push(port.contract_id.clone());
174                }
175            }
176            if let Some(output_ports) = &product.output_ports {
177                for port in output_ports {
178                    if let Some(contract_id) = &port.contract_id {
179                        contract_ids.push(contract_id.clone());
180                    }
181                }
182            }
183
184            if contract_ids.is_empty() {
185                return Err(ConversionError::UnsupportedFormat(
186                    "ODPS → ODCS conversion requires contractId references. No contractIds found in input/output ports.".to_string()
187                ));
188            }
189
190            // For now, return an error indicating that ODPS → ODCS conversion
191            // requires the actual ODCS Table definitions to be provided
192            // In a full implementation, we would look up the ODCS Tables by contractId
193            return Err(ConversionError::UnsupportedFormat(format!(
194                "ODPS → ODCS conversion requires ODCS Table definitions for contractIds: {}. Please provide the referenced ODCS Tables.",
195                contract_ids.join(", ")
196            )));
197        }
198        "domain" => {
199            // Domain schema stores references to ODCS Tables (ODCSNode with table_id)
200            // but doesn't contain the full Table definitions
201            // For Domain → ODCS conversion, we need the actual Table definitions
202            let domain: Domain = serde_yaml::from_str(input).map_err(|e| {
203                ConversionError::ImportError(ImportError::ParseError(format!(
204                    "Failed to parse Domain YAML: {}",
205                    e
206                )))
207            })?;
208
209            // Extract ODCS node references
210            let odcs_node_count = domain.odcs_nodes.len();
211            if odcs_node_count == 0 {
212                return Err(ConversionError::UnsupportedFormat(
213                    "Domain → ODCS conversion: Domain contains no ODCS nodes.".to_string(),
214                ));
215            }
216
217            // Domain schema only stores references, not full Table definitions
218            // To convert Domain → ODCS, we need the actual Table definitions
219            // This would require looking up Tables by table_id from a DataModel or similar
220            return Err(ConversionError::UnsupportedFormat(format!(
221                "Domain → ODCS conversion requires Table definitions. Domain contains {} ODCS node references, but full Table definitions must be provided separately (e.g., from a DataModel).",
222                odcs_node_count
223            )));
224        }
225        _ => {
226            return Err(ConversionError::UnsupportedFormat(
227                detected_format.to_string(),
228            ));
229        }
230    };
231
232    // Check for empty input
233    if import_result.tables.is_empty() {
234        return Err(ConversionError::ImportError(ImportError::ParseError(
235            "No tables found in input".to_string(),
236        )));
237    }
238
239    // Reconstruct full Table structs from ImportResult
240    let tables = reconstruct_tables(&import_result);
241
242    // Export each table to ODCS format
243    let yaml_docs: Vec<String> = tables
244        .iter()
245        .map(|table| ODCSExporter::export_table(table, "odcs_v3_1_0"))
246        .collect();
247
248    Ok(yaml_docs.join("\n---\n"))
249}
250
251/// Convert ImportResult to a DataModel with fully reconstructed Tables
252///
253/// This is useful when you need the full DataModel structure after import,
254/// rather than just the YAML output.
255pub fn import_result_to_data_model(
256    import_result: &ImportResult,
257    model_name: &str,
258) -> Result<DataModel, ConversionError> {
259    if import_result.tables.is_empty() {
260        return Err(ConversionError::ImportError(ImportError::ParseError(
261            "No tables found in import result".to_string(),
262        )));
263    }
264
265    let tables = reconstruct_tables(import_result);
266
267    let mut model = DataModel::new(model_name.to_string(), String::new(), String::new());
268
269    for table in tables {
270        model.tables.push(table);
271    }
272
273    Ok(model)
274}
275
276/// Auto-detect format from input content
277fn auto_detect_format(input: &str) -> Result<&str, ConversionError> {
278    // Check for ODCS format
279    if input.contains("apiVersion:") && input.contains("kind: DataContract") {
280        return Ok("odcs");
281    }
282
283    // Check for ODCL format
284    if input.contains("dataContractSpecification:") {
285        return Ok("odcl");
286    }
287
288    // Check for SQL format
289    if input.to_uppercase().contains("CREATE TABLE") {
290        return Ok("sql");
291    }
292
293    // Check for JSON Schema format
294    if input.trim_start().starts_with('{')
295        && (input.contains("\"$schema\"") || input.contains("\"type\""))
296    {
297        return Ok("json_schema");
298    }
299
300    // Check for AVRO format
301    if input.contains("\"type\"") && input.contains("\"fields\"") && input.contains("\"name\"") {
302        return Ok("avro");
303    }
304
305    // Check for Protobuf format
306    if input.contains("syntax") || input.contains("message") || input.contains("service") {
307        return Ok("protobuf");
308    }
309
310    // Check for CADS format
311    if input.contains("apiVersion:")
312        && (input.contains("kind: AIModel")
313            || input.contains("kind: MLPipeline")
314            || input.contains("kind: Application")
315            || input.contains("kind: ETLPipeline")
316            || input.contains("kind: SourceSystem")
317            || input.contains("kind: DestinationSystem"))
318    {
319        return Ok("cads");
320    }
321
322    // Check for ODPS format
323    if input.contains("apiVersion:") && input.contains("kind: DataProduct") {
324        return Ok("odps");
325    }
326
327    // Check for Domain format (Business Domain schema)
328    if input.contains("systems:")
329        && (input.contains("cads_nodes:") || input.contains("odcs_nodes:"))
330    {
331        return Ok("domain");
332    }
333
334    Err(ConversionError::AutoDetectionFailed(
335        "Could not auto-detect format. Please specify format explicitly.".to_string(),
336    ))
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `ColumnData` fixture; fields not covered by the parameters are
    /// left empty (`None` / no relationships).
    fn column_fixture(
        name: &str,
        data_type: &str,
        nullable: bool,
        primary_key: bool,
        description: Option<&str>,
    ) -> ColumnData {
        ColumnData {
            name: name.to_string(),
            data_type: data_type.to_string(),
            physical_type: None,
            nullable,
            primary_key,
            description: description.map(String::from),
            quality: None,
            relationships: vec![],
            enum_values: None,
        }
    }

    /// Build an `ImportResult` holding a single named table at index 0 and no
    /// errors or suggestions.
    fn single_table_import(table_name: &str, columns: Vec<ColumnData>) -> ImportResult {
        ImportResult {
            tables: vec![TableData {
                table_index: 0,
                name: Some(table_name.to_string()),
                columns,
            }],
            tables_requiring_name: vec![],
            errors: vec![],
            ai_suggestions: None,
        }
    }

    #[test]
    fn test_reconstruct_tables_from_import_result() {
        let import_result = single_table_import(
            "users",
            vec![
                column_fixture("id", "INTEGER", false, true, Some("User ID")),
                column_fixture("name", "VARCHAR(100)", true, false, None),
            ],
        );

        let tables = reconstruct_tables(&import_result);
        assert_eq!(tables.len(), 1);

        let users = &tables[0];
        assert_eq!(users.name, "users");
        assert_eq!(users.columns.len(), 2);

        let id_column = &users.columns[0];
        assert_eq!(id_column.name, "id");
        assert!(id_column.primary_key);
        assert_eq!(id_column.description, "User ID");
    }

    #[test]
    fn test_convert_sql_to_odcs() {
        let sql = "CREATE TABLE users (id INTEGER PRIMARY KEY, name VARCHAR(100));";
        let conversion = convert_to_odcs(sql, Some("sql"));
        assert!(conversion.is_ok());

        let yaml = conversion.unwrap();
        assert!(yaml.contains("kind: DataContract"));
        assert!(yaml.contains("users"));
    }

    #[test]
    fn test_auto_detect_sql() {
        let detected = auto_detect_format("CREATE TABLE test (id INT);");
        assert!(detected.is_ok());
        assert_eq!(detected.unwrap(), "sql");
    }

    #[test]
    fn test_auto_detect_odcs() {
        let detected = auto_detect_format("apiVersion: v3.1.0\nkind: DataContract\n");
        assert!(detected.is_ok());
        assert_eq!(detected.unwrap(), "odcs");
    }

    #[test]
    fn test_import_result_to_data_model() {
        let import_result = single_table_import(
            "orders",
            vec![column_fixture("order_id", "UUID", false, true, None)],
        );

        let conversion = import_result_to_data_model(&import_result, "test_model");
        assert!(conversion.is_ok());

        let model = conversion.unwrap();
        assert_eq!(model.name, "test_model");
        assert_eq!(model.tables.len(), 1);
        assert_eq!(model.tables[0].name, "orders");
    }
}
444}