data_modelling_sdk/convert/
converter.rs

1//! Universal format converter
2//!
3//! Converts any import format to ODCS v3.1.0 format.
4
5use crate::export::{ExportError, ODCSExporter};
6use crate::import::{
7    AvroImporter, CADSImporter, ColumnData, ImportError, ImportResult, JSONSchemaImporter,
8    ODCSImporter, ODPSImporter, ProtobufImporter, SQLImporter, TableData,
9};
10use crate::models::{Column, DataModel, Domain, Table};
11
12/// Error during format conversion
13#[derive(Debug, thiserror::Error)]
14pub enum ConversionError {
15    #[error("Import error: {0}")]
16    ImportError(#[from] ImportError),
17    #[error("Export error: {0}")]
18    ExportError(#[from] ExportError),
19    #[error("Unsupported format: {0}")]
20    UnsupportedFormat(String),
21    #[error("Auto-detection failed: {0}")]
22    AutoDetectionFailed(String),
23    #[error("OpenAPI to ODCS conversion error: {0}")]
24    OpenAPIToODCSError(String),
25    #[error("OpenAPI component not found: {0}")]
26    OpenAPIComponentNotFound(String),
27    #[error("OpenAPI schema invalid: {0}")]
28    OpenAPISchemaInvalid(String),
29    #[error("Nested object conversion failed: {0}")]
30    NestedObjectConversionFailed(String),
31}
32
33/// Parse STRUCT type columns into nested columns with dot notation
34fn parse_struct_columns(parent_name: &str, data_type: &str, col_data: &ColumnData) -> Vec<Column> {
35    let importer = ODCSImporter::new();
36
37    // Try to parse STRUCT type using ODCS importer's logic
38    let field_data = serde_json::Map::new();
39
40    match importer.parse_struct_type_from_string(parent_name, data_type, &field_data) {
41        Ok(nested_cols) if !nested_cols.is_empty() => {
42            let mut all_cols = Vec::new();
43
44            // Add parent column with simplified type
45            let parent_data_type = if data_type.to_uppercase().starts_with("ARRAY<") {
46                "ARRAY<STRUCT<...>>".to_string()
47            } else {
48                "STRUCT<...>".to_string()
49            };
50
51            all_cols.push(Column {
52                name: parent_name.to_string(),
53                data_type: parent_data_type,
54                physical_type: col_data.physical_type.clone(),
55                nullable: col_data.nullable,
56                primary_key: col_data.primary_key,
57                secondary_key: false,
58                composite_key: None,
59                foreign_key: None,
60                constraints: Vec::new(),
61                description: col_data.description.clone().unwrap_or_default(),
62                errors: Vec::new(),
63                quality: col_data.quality.clone().unwrap_or_default(),
64                relationships: col_data.relationships.clone(),
65                enum_values: col_data.enum_values.clone().unwrap_or_default(),
66                column_order: 0,
67                nested_data: None,
68            });
69
70            // Add nested columns
71            all_cols.extend(nested_cols);
72            all_cols
73        }
74        _ => Vec::new(),
75    }
76}
77
78/// Reconstruct a Table from TableData
79///
80/// Converts import-format TableData/ColumnData into full Table/Column structs
81/// suitable for export operations. Handles STRUCT types by flattening them
82/// into nested columns with dot notation:
83/// - STRUCT<...> → parent.field
84/// - ARRAY<STRUCT<...>> → parent.[].field
85/// - MAP types are kept as-is (keys are dynamic)
86fn table_data_to_table(table_data: &TableData) -> Table {
87    let table_name = table_data
88        .name
89        .clone()
90        .unwrap_or_else(|| format!("table_{}", table_data.table_index));
91
92    let mut all_columns = Vec::new();
93
94    for col_data in &table_data.columns {
95        let data_type_upper = col_data.data_type.to_uppercase();
96        let is_map = data_type_upper.starts_with("MAP<");
97
98        // Skip parsing for MAP types - keys are dynamic
99        if is_map {
100            all_columns.push(column_data_to_column(col_data));
101            continue;
102        }
103
104        // For STRUCT or ARRAY<STRUCT> types, try to parse and create nested columns
105        let is_struct = data_type_upper.contains("STRUCT<");
106        if is_struct {
107            let struct_cols = parse_struct_columns(&col_data.name, &col_data.data_type, col_data);
108            if !struct_cols.is_empty() {
109                all_columns.extend(struct_cols);
110                continue;
111            }
112        }
113
114        // Regular column or STRUCT parsing failed - add as-is
115        all_columns.push(column_data_to_column(col_data));
116    }
117
118    Table::new(table_name, all_columns)
119}
120
121/// Convert ColumnData to Column
122fn column_data_to_column(col_data: &ColumnData) -> Column {
123    Column {
124        name: col_data.name.clone(),
125        data_type: col_data.data_type.clone(),
126        physical_type: col_data.physical_type.clone(),
127        nullable: col_data.nullable,
128        primary_key: col_data.primary_key,
129        secondary_key: false,
130        composite_key: None,
131        foreign_key: None,
132        constraints: Vec::new(),
133        description: col_data.description.clone().unwrap_or_default(),
134        errors: Vec::new(),
135        quality: col_data.quality.clone().unwrap_or_default(),
136        relationships: col_data.relationships.clone(),
137        enum_values: col_data.enum_values.clone().unwrap_or_default(),
138        column_order: 0,
139        nested_data: None,
140    }
141}
142
143/// Reconstruct full Table structs from ImportResult
144///
145/// This function converts the flat TableData/ColumnData structures from imports
146/// into complete Table/Column model structs that can be used for export.
147pub fn reconstruct_tables(import_result: &ImportResult) -> Vec<Table> {
148    import_result
149        .tables
150        .iter()
151        .map(table_data_to_table)
152        .collect()
153}
154
155/// Convert any import format to ODCS v3.1.0 YAML format.
156///
157/// # Arguments
158///
159/// * `input` - Format-specific content as a string
160/// * `format` - Optional format identifier. If None, attempts auto-detection.
161///   Supported formats: "sql", "json_schema", "avro", "protobuf", "odcl", "odcs", "cads", "odps", "domain"
162///
163/// # Returns
164///
165/// ODCS v3.1.0 YAML string, or ConversionError
166pub fn convert_to_odcs(input: &str, format: Option<&str>) -> Result<String, ConversionError> {
167    // Determine format (auto-detect if not specified)
168    let detected_format = if let Some(fmt) = format {
169        fmt
170    } else {
171        auto_detect_format(input)?
172    };
173
174    // Import using appropriate importer
175    let import_result = match detected_format {
176        "odcs" => {
177            let mut importer = ODCSImporter::new();
178            importer
179                .import(input)
180                .map_err(ConversionError::ImportError)?
181        }
182        "odcl" => {
183            let mut importer = ODCSImporter::new();
184            importer
185                .import(input)
186                .map_err(ConversionError::ImportError)?
187        }
188        "sql" => {
189            let importer = SQLImporter::new("postgresql");
190            importer
191                .parse(input)
192                .map_err(|e| ConversionError::ImportError(ImportError::ParseError(e.to_string())))?
193        }
194        "json_schema" => {
195            let importer = JSONSchemaImporter::new();
196            importer
197                .import(input)
198                .map_err(ConversionError::ImportError)?
199        }
200        "avro" => {
201            let importer = AvroImporter::new();
202            importer
203                .import(input)
204                .map_err(ConversionError::ImportError)?
205        }
206        "protobuf" => {
207            let importer = ProtobufImporter::new();
208            importer
209                .import(input)
210                .map_err(ConversionError::ImportError)?
211        }
212        "cads" => {
213            // CADS assets are compute assets, not data contracts
214            // For CADS → ODCS conversion, we create a minimal ODCS representation
215            // that captures metadata but doesn't represent a true data contract
216            // This is a placeholder - full conversion would require understanding
217            // the data schema produced by the CADS asset
218            let importer = CADSImporter::new();
219            let _asset = importer
220                .import(input)
221                .map_err(ConversionError::ImportError)?;
222
223            // For now, return an error indicating CADS → ODCS conversion
224            // requires additional context about the data schema
225            return Err(ConversionError::UnsupportedFormat(
226                "CADS → ODCS conversion requires data schema information. CADS assets represent compute resources, not data contracts.".to_string()
227            ));
228        }
229        "odps" => {
230            // ODPS Data Products link to ODCS Tables via contractId
231            // For ODPS → ODCS conversion, we extract the referenced ODCS Tables
232            // from the input/output ports and export them
233            let importer = ODPSImporter::new();
234            let product = importer
235                .import(input)
236                .map_err(ConversionError::ImportError)?;
237
238            // Extract contractIds from input and output ports
239            let mut contract_ids = Vec::new();
240            if let Some(input_ports) = &product.input_ports {
241                for port in input_ports {
242                    contract_ids.push(port.contract_id.clone());
243                }
244            }
245            if let Some(output_ports) = &product.output_ports {
246                for port in output_ports {
247                    if let Some(contract_id) = &port.contract_id {
248                        contract_ids.push(contract_id.clone());
249                    }
250                }
251            }
252
253            if contract_ids.is_empty() {
254                return Err(ConversionError::UnsupportedFormat(
255                    "ODPS → ODCS conversion requires contractId references. No contractIds found in input/output ports.".to_string()
256                ));
257            }
258
259            // For now, return an error indicating that ODPS → ODCS conversion
260            // requires the actual ODCS Table definitions to be provided
261            // In a full implementation, we would look up the ODCS Tables by contractId
262            return Err(ConversionError::UnsupportedFormat(format!(
263                "ODPS → ODCS conversion requires ODCS Table definitions for contractIds: {}. Please provide the referenced ODCS Tables.",
264                contract_ids.join(", ")
265            )));
266        }
267        "domain" => {
268            // Domain schema stores references to ODCS Tables (ODCSNode with table_id)
269            // but doesn't contain the full Table definitions
270            // For Domain → ODCS conversion, we need the actual Table definitions
271            let domain: Domain = serde_yaml::from_str(input).map_err(|e| {
272                ConversionError::ImportError(ImportError::ParseError(format!(
273                    "Failed to parse Domain YAML: {}",
274                    e
275                )))
276            })?;
277
278            // Extract ODCS node references
279            let odcs_node_count = domain.odcs_nodes.len();
280            if odcs_node_count == 0 {
281                return Err(ConversionError::UnsupportedFormat(
282                    "Domain → ODCS conversion: Domain contains no ODCS nodes.".to_string(),
283                ));
284            }
285
286            // Domain schema only stores references, not full Table definitions
287            // To convert Domain → ODCS, we need the actual Table definitions
288            // This would require looking up Tables by table_id from a DataModel or similar
289            return Err(ConversionError::UnsupportedFormat(format!(
290                "Domain → ODCS conversion requires Table definitions. Domain contains {} ODCS node references, but full Table definitions must be provided separately (e.g., from a DataModel).",
291                odcs_node_count
292            )));
293        }
294        _ => {
295            return Err(ConversionError::UnsupportedFormat(
296                detected_format.to_string(),
297            ));
298        }
299    };
300
301    // Check for empty input
302    if import_result.tables.is_empty() {
303        return Err(ConversionError::ImportError(ImportError::ParseError(
304            "No tables found in input".to_string(),
305        )));
306    }
307
308    // Reconstruct full Table structs from ImportResult
309    let tables = reconstruct_tables(&import_result);
310
311    // Export each table to ODCS format
312    let yaml_docs: Vec<String> = tables
313        .iter()
314        .map(|table| ODCSExporter::export_table(table, "odcs_v3_1_0"))
315        .collect();
316
317    Ok(yaml_docs.join("\n---\n"))
318}
319
320/// Convert ImportResult to a DataModel with fully reconstructed Tables
321///
322/// This is useful when you need the full DataModel structure after import,
323/// rather than just the YAML output.
324pub fn import_result_to_data_model(
325    import_result: &ImportResult,
326    model_name: &str,
327) -> Result<DataModel, ConversionError> {
328    if import_result.tables.is_empty() {
329        return Err(ConversionError::ImportError(ImportError::ParseError(
330            "No tables found in import result".to_string(),
331        )));
332    }
333
334    let tables = reconstruct_tables(import_result);
335
336    let mut model = DataModel::new(model_name.to_string(), String::new(), String::new());
337
338    for table in tables {
339        model.tables.push(table);
340    }
341
342    Ok(model)
343}
344
345/// Auto-detect format from input content
346fn auto_detect_format(input: &str) -> Result<&str, ConversionError> {
347    // Check for ODCS format
348    if input.contains("apiVersion:") && input.contains("kind: DataContract") {
349        return Ok("odcs");
350    }
351
352    // Check for ODCL format
353    if input.contains("dataContractSpecification:") {
354        return Ok("odcl");
355    }
356
357    // Check for SQL format
358    if input.to_uppercase().contains("CREATE TABLE") {
359        return Ok("sql");
360    }
361
362    // Check for JSON Schema format
363    if input.trim_start().starts_with('{')
364        && (input.contains("\"$schema\"") || input.contains("\"type\""))
365    {
366        return Ok("json_schema");
367    }
368
369    // Check for AVRO format
370    if input.contains("\"type\"") && input.contains("\"fields\"") && input.contains("\"name\"") {
371        return Ok("avro");
372    }
373
374    // Check for Protobuf format
375    if input.contains("syntax") || input.contains("message") || input.contains("service") {
376        return Ok("protobuf");
377    }
378
379    // Check for CADS format
380    if input.contains("apiVersion:")
381        && (input.contains("kind: AIModel")
382            || input.contains("kind: MLPipeline")
383            || input.contains("kind: Application")
384            || input.contains("kind: ETLPipeline")
385            || input.contains("kind: SourceSystem")
386            || input.contains("kind: DestinationSystem"))
387    {
388        return Ok("cads");
389    }
390
391    // Check for ODPS format
392    if input.contains("apiVersion:") && input.contains("kind: DataProduct") {
393        return Ok("odps");
394    }
395
396    // Check for Domain format (Business Domain schema)
397    if input.contains("systems:")
398        && (input.contains("cads_nodes:") || input.contains("odcs_nodes:"))
399    {
400        return Ok("domain");
401    }
402
403    Err(ConversionError::AutoDetectionFailed(
404        "Could not auto-detect format. Please specify format explicitly.".to_string(),
405    ))
406}
407
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `ColumnData` fixture with no physical type, quality rules, relationships
    /// or enum values.
    fn column(
        name: &str,
        data_type: &str,
        nullable: bool,
        primary_key: bool,
        description: Option<&str>,
    ) -> ColumnData {
        ColumnData {
            name: name.to_string(),
            data_type: data_type.to_string(),
            physical_type: None,
            nullable,
            primary_key,
            description: description.map(str::to_string),
            quality: None,
            relationships: vec![],
            enum_values: None,
        }
    }

    /// Wrap the given columns in an `ImportResult` holding one named table at index 0.
    fn single_table_import(table_name: &str, columns: Vec<ColumnData>) -> ImportResult {
        ImportResult {
            tables: vec![TableData {
                table_index: 0,
                name: Some(table_name.to_string()),
                columns,
            }],
            tables_requiring_name: vec![],
            errors: vec![],
            ai_suggestions: None,
        }
    }

    #[test]
    fn test_reconstruct_tables_from_import_result() {
        let import_result = single_table_import(
            "users",
            vec![
                column("id", "INTEGER", false, true, Some("User ID")),
                column("name", "VARCHAR(100)", true, false, None),
            ],
        );

        let tables = reconstruct_tables(&import_result);

        assert_eq!(tables.len(), 1);
        let users = &tables[0];
        assert_eq!(users.name, "users");
        assert_eq!(users.columns.len(), 2);

        let id = &users.columns[0];
        assert_eq!(id.name, "id");
        assert!(id.primary_key);
        assert_eq!(id.description, "User ID");
    }

    #[test]
    fn test_convert_sql_to_odcs() {
        let sql = "CREATE TABLE users (id INTEGER PRIMARY KEY, name VARCHAR(100));";

        let yaml = convert_to_odcs(sql, Some("sql")).expect("SQL conversion should succeed");

        assert!(yaml.contains("kind: DataContract"));
        assert!(yaml.contains("users"));
    }

    #[test]
    fn test_auto_detect_sql() {
        let detected = auto_detect_format("CREATE TABLE test (id INT);");
        assert!(matches!(detected, Ok("sql")));
    }

    #[test]
    fn test_auto_detect_odcs() {
        let detected = auto_detect_format("apiVersion: v3.1.0\nkind: DataContract\n");
        assert!(matches!(detected, Ok("odcs")));
    }

    #[test]
    fn test_import_result_to_data_model() {
        let import_result = single_table_import(
            "orders",
            vec![column("order_id", "UUID", false, true, None)],
        );

        let model = import_result_to_data_model(&import_result, "test_model")
            .expect("a non-empty import result should produce a model");

        assert_eq!(model.name, "test_model");
        assert_eq!(model.tables.len(), 1);
        assert_eq!(model.tables[0].name, "orders");
    }
}