data_modelling_core/convert/
converter.rs

1//! Universal format converter
2//!
3//! Converts any import format to ODCS v3.1.0 format.
4
5use crate::export::{ExportError, ODCSExporter};
6use crate::import::{
7    AvroImporter, CADSImporter, ColumnData, ImportError, ImportResult, JSONSchemaImporter,
8    ODCSImporter, ODPSImporter, ProtobufImporter, SQLImporter, TableData,
9};
10use crate::models::{Column, DataModel, Domain, Table};
11
/// Error during format conversion
///
/// Error type returned by the conversion functions in this module.
/// `ImportError` and `ExportError` wrap the underlying importer/exporter
/// errors; `#[from]` generates the `From` conversions so they can be
/// propagated with `?`. Every other variant carries a human-readable message.
#[derive(Debug, thiserror::Error)]
pub enum ConversionError {
    /// An importer failed while reading or parsing the input.
    #[error("Import error: {0}")]
    ImportError(#[from] ImportError),
    /// An exporter failed while producing the output.
    #[error("Export error: {0}")]
    ExportError(#[from] ExportError),
    /// The requested (or detected) format cannot be converted; the payload is
    /// either the unknown format identifier or an explanation of what is missing.
    #[error("Unsupported format: {0}")]
    UnsupportedFormat(String),
    /// No format was given and none could be inferred from the input content.
    #[error("Auto-detection failed: {0}")]
    AutoDetectionFailed(String),
    // NOTE(review): the four variants below are not constructed anywhere in
    // this file; presumably they are raised by OpenAPI conversion code
    // elsewhere in the crate - confirm before changing them.
    /// Generic failure while converting OpenAPI content to ODCS.
    #[error("OpenAPI to ODCS conversion error: {0}")]
    OpenAPIToODCSError(String),
    /// A referenced OpenAPI component could not be found.
    #[error("OpenAPI component not found: {0}")]
    OpenAPIComponentNotFound(String),
    /// An OpenAPI schema was structurally invalid.
    #[error("OpenAPI schema invalid: {0}")]
    OpenAPISchemaInvalid(String),
    /// Conversion of a nested object failed.
    #[error("Nested object conversion failed: {0}")]
    NestedObjectConversionFailed(String),
}
32
33/// Parse STRUCT type columns into nested columns with dot notation
34fn parse_struct_columns(parent_name: &str, data_type: &str, col_data: &ColumnData) -> Vec<Column> {
35    let importer = ODCSImporter::new();
36
37    // Try to parse STRUCT type using ODCS importer's logic
38    let field_data = serde_json::Map::new();
39
40    match importer.parse_struct_type_from_string(parent_name, data_type, &field_data) {
41        Ok(nested_cols) if !nested_cols.is_empty() => {
42            let mut all_cols = Vec::new();
43
44            // Add parent column with simplified type
45            let parent_data_type = if data_type.to_uppercase().starts_with("ARRAY<") {
46                "ARRAY<STRUCT<...>>".to_string()
47            } else {
48                "STRUCT<...>".to_string()
49            };
50
51            all_cols.push(Column {
52                name: parent_name.to_string(),
53                data_type: parent_data_type,
54                physical_type: col_data.physical_type.clone(),
55                nullable: col_data.nullable,
56                primary_key: col_data.primary_key,
57                description: col_data.description.clone().unwrap_or_default(),
58                quality: col_data.quality.clone().unwrap_or_default(),
59                relationships: col_data.relationships.clone(),
60                enum_values: col_data.enum_values.clone().unwrap_or_default(),
61                ..Default::default()
62            });
63
64            // Add nested columns
65            all_cols.extend(nested_cols);
66            all_cols
67        }
68        _ => Vec::new(),
69    }
70}
71
72/// Reconstruct a Table from TableData
73///
74/// Converts import-format TableData/ColumnData into full Table/Column structs
75/// suitable for export operations. Handles STRUCT types by flattening them
76/// into nested columns with dot notation:
77/// - STRUCT<...> → parent.field
78/// - ARRAY<STRUCT<...>> → parent.[].field
79/// - MAP types are kept as-is (keys are dynamic)
80fn table_data_to_table(table_data: &TableData) -> Table {
81    let table_name = table_data
82        .name
83        .clone()
84        .unwrap_or_else(|| format!("table_{}", table_data.table_index));
85
86    let mut all_columns = Vec::new();
87
88    for col_data in &table_data.columns {
89        let data_type_upper = col_data.data_type.to_uppercase();
90        let is_map = data_type_upper.starts_with("MAP<");
91
92        // Skip parsing for MAP types - keys are dynamic
93        if is_map {
94            all_columns.push(column_data_to_column(col_data));
95            continue;
96        }
97
98        // For STRUCT or ARRAY<STRUCT> types, try to parse and create nested columns
99        let is_struct = data_type_upper.contains("STRUCT<");
100        if is_struct {
101            let struct_cols = parse_struct_columns(&col_data.name, &col_data.data_type, col_data);
102            if !struct_cols.is_empty() {
103                all_columns.extend(struct_cols);
104                continue;
105            }
106        }
107
108        // Regular column or STRUCT parsing failed - add as-is
109        all_columns.push(column_data_to_column(col_data));
110    }
111
112    Table::new(table_name, all_columns)
113}
114
115/// Convert ColumnData to Column, preserving ALL ODCS v3.1.0 fields
116fn column_data_to_column(col_data: &ColumnData) -> Column {
117    Column {
118        // Core Identity
119        id: col_data.id.clone(),
120        name: col_data.name.clone(),
121        business_name: col_data.business_name.clone(),
122        description: col_data.description.clone().unwrap_or_default(),
123        // Type Information
124        data_type: col_data.data_type.clone(),
125        physical_type: col_data.physical_type.clone(),
126        physical_name: col_data.physical_name.clone(),
127        logical_type_options: col_data.logical_type_options.clone(),
128        // Key Constraints
129        primary_key: col_data.primary_key,
130        primary_key_position: col_data.primary_key_position,
131        unique: col_data.unique,
132        nullable: col_data.nullable,
133        // Partitioning & Clustering
134        partitioned: col_data.partitioned,
135        partition_key_position: col_data.partition_key_position,
136        clustered: col_data.clustered,
137        // Data Classification & Security
138        classification: col_data.classification.clone(),
139        critical_data_element: col_data.critical_data_element,
140        encrypted_name: col_data.encrypted_name.clone(),
141        // Transformation Metadata
142        transform_source_objects: col_data.transform_source_objects.clone(),
143        transform_logic: col_data.transform_logic.clone(),
144        transform_description: col_data.transform_description.clone(),
145        // Examples & Documentation
146        examples: col_data.examples.clone(),
147        default_value: col_data.default_value.clone(),
148        // Relationships & References
149        relationships: col_data.relationships.clone(),
150        authoritative_definitions: col_data.authoritative_definitions.clone(),
151        // Quality & Validation
152        quality: col_data.quality.clone().unwrap_or_default(),
153        enum_values: col_data.enum_values.clone().unwrap_or_default(),
154        // Tags & Custom Properties
155        tags: col_data.tags.clone(),
156        custom_properties: col_data.custom_properties.clone(),
157        // Legacy/Internal Fields - use defaults
158        ..Default::default()
159    }
160}
161
162/// Reconstruct full Table structs from ImportResult
163///
164/// This function converts the flat TableData/ColumnData structures from imports
165/// into complete Table/Column model structs that can be used for export.
166pub fn reconstruct_tables(import_result: &ImportResult) -> Vec<Table> {
167    import_result
168        .tables
169        .iter()
170        .map(table_data_to_table)
171        .collect()
172}
173
174/// Convert any import format to ODCS v3.1.0 YAML format.
175///
176/// # Arguments
177///
178/// * `input` - Format-specific content as a string
179/// * `format` - Optional format identifier. If None, attempts auto-detection.
180///   Supported formats: "sql", "json_schema", "avro", "protobuf", "odcl", "odcs", "cads", "odps", "domain"
181///
182/// # Returns
183///
184/// ODCS v3.1.0 YAML string, or ConversionError
185pub fn convert_to_odcs(input: &str, format: Option<&str>) -> Result<String, ConversionError> {
186    // Determine format (auto-detect if not specified)
187    let detected_format = if let Some(fmt) = format {
188        fmt
189    } else {
190        auto_detect_format(input)?
191    };
192
193    // Import using appropriate importer
194    let import_result = match detected_format {
195        "odcs" => {
196            let mut importer = ODCSImporter::new();
197            importer
198                .import(input)
199                .map_err(ConversionError::ImportError)?
200        }
201        "odcl" => {
202            let mut importer = ODCSImporter::new();
203            importer
204                .import(input)
205                .map_err(ConversionError::ImportError)?
206        }
207        "sql" => {
208            let importer = SQLImporter::new("postgresql");
209            importer
210                .parse(input)
211                .map_err(|e| ConversionError::ImportError(ImportError::ParseError(e.to_string())))?
212        }
213        "json_schema" => {
214            let importer = JSONSchemaImporter::new();
215            importer
216                .import(input)
217                .map_err(ConversionError::ImportError)?
218        }
219        "avro" => {
220            let importer = AvroImporter::new();
221            importer
222                .import(input)
223                .map_err(ConversionError::ImportError)?
224        }
225        "protobuf" => {
226            let importer = ProtobufImporter::new();
227            importer
228                .import(input)
229                .map_err(ConversionError::ImportError)?
230        }
231        "cads" => {
232            // CADS assets are compute assets, not data contracts
233            // For CADS → ODCS conversion, we create a minimal ODCS representation
234            // that captures metadata but doesn't represent a true data contract
235            // This is a placeholder - full conversion would require understanding
236            // the data schema produced by the CADS asset
237            let importer = CADSImporter::new();
238            let _asset = importer
239                .import(input)
240                .map_err(ConversionError::ImportError)?;
241
242            // For now, return an error indicating CADS → ODCS conversion
243            // requires additional context about the data schema
244            return Err(ConversionError::UnsupportedFormat(
245                "CADS → ODCS conversion requires data schema information. CADS assets represent compute resources, not data contracts.".to_string()
246            ));
247        }
248        "odps" => {
249            // ODPS Data Products link to ODCS Tables via contractId
250            // For ODPS → ODCS conversion, we extract the referenced ODCS Tables
251            // from the input/output ports and export them
252            let importer = ODPSImporter::new();
253            let product = importer
254                .import(input)
255                .map_err(ConversionError::ImportError)?;
256
257            // Extract contractIds from input and output ports
258            let mut contract_ids = Vec::new();
259            if let Some(input_ports) = &product.input_ports {
260                for port in input_ports {
261                    contract_ids.push(port.contract_id.clone());
262                }
263            }
264            if let Some(output_ports) = &product.output_ports {
265                for port in output_ports {
266                    if let Some(contract_id) = &port.contract_id {
267                        contract_ids.push(contract_id.clone());
268                    }
269                }
270            }
271
272            if contract_ids.is_empty() {
273                return Err(ConversionError::UnsupportedFormat(
274                    "ODPS → ODCS conversion requires contractId references. No contractIds found in input/output ports.".to_string()
275                ));
276            }
277
278            // For now, return an error indicating that ODPS → ODCS conversion
279            // requires the actual ODCS Table definitions to be provided
280            // In a full implementation, we would look up the ODCS Tables by contractId
281            return Err(ConversionError::UnsupportedFormat(format!(
282                "ODPS → ODCS conversion requires ODCS Table definitions for contractIds: {}. Please provide the referenced ODCS Tables.",
283                contract_ids.join(", ")
284            )));
285        }
286        "domain" => {
287            // Domain schema stores references to ODCS Tables (ODCSNode with table_id)
288            // but doesn't contain the full Table definitions
289            // For Domain → ODCS conversion, we need the actual Table definitions
290            let domain: Domain = serde_yaml::from_str(input).map_err(|e| {
291                ConversionError::ImportError(ImportError::ParseError(format!(
292                    "Failed to parse Domain YAML: {}",
293                    e
294                )))
295            })?;
296
297            // Extract ODCS node references
298            let odcs_node_count = domain.odcs_nodes.len();
299            if odcs_node_count == 0 {
300                return Err(ConversionError::UnsupportedFormat(
301                    "Domain → ODCS conversion: Domain contains no ODCS nodes.".to_string(),
302                ));
303            }
304
305            // Domain schema only stores references, not full Table definitions
306            // To convert Domain → ODCS, we need the actual Table definitions
307            // This would require looking up Tables by table_id from a DataModel or similar
308            return Err(ConversionError::UnsupportedFormat(format!(
309                "Domain → ODCS conversion requires Table definitions. Domain contains {} ODCS node references, but full Table definitions must be provided separately (e.g., from a DataModel).",
310                odcs_node_count
311            )));
312        }
313        _ => {
314            return Err(ConversionError::UnsupportedFormat(
315                detected_format.to_string(),
316            ));
317        }
318    };
319
320    // Check for empty input
321    if import_result.tables.is_empty() {
322        return Err(ConversionError::ImportError(ImportError::ParseError(
323            "No tables found in input".to_string(),
324        )));
325    }
326
327    // Reconstruct full Table structs from ImportResult
328    let tables = reconstruct_tables(&import_result);
329
330    // Export each table to ODCS format
331    let yaml_docs: Vec<String> = tables
332        .iter()
333        .map(|table| ODCSExporter::export_table(table, "odcs_v3_1_0"))
334        .collect();
335
336    Ok(yaml_docs.join("\n---\n"))
337}
338
339/// Convert ImportResult to a DataModel with fully reconstructed Tables
340///
341/// This is useful when you need the full DataModel structure after import,
342/// rather than just the YAML output.
343pub fn import_result_to_data_model(
344    import_result: &ImportResult,
345    model_name: &str,
346) -> Result<DataModel, ConversionError> {
347    if import_result.tables.is_empty() {
348        return Err(ConversionError::ImportError(ImportError::ParseError(
349            "No tables found in import result".to_string(),
350        )));
351    }
352
353    let tables = reconstruct_tables(import_result);
354
355    let mut model = DataModel::new(model_name.to_string(), String::new(), String::new());
356
357    for table in tables {
358        model.tables.push(table);
359    }
360
361    Ok(model)
362}
363
364/// Auto-detect format from input content
365fn auto_detect_format(input: &str) -> Result<&str, ConversionError> {
366    // Check for ODCS format
367    if input.contains("apiVersion:") && input.contains("kind: DataContract") {
368        return Ok("odcs");
369    }
370
371    // Check for ODCL format
372    if input.contains("dataContractSpecification:") {
373        return Ok("odcl");
374    }
375
376    // Check for SQL format
377    if input.to_uppercase().contains("CREATE TABLE") {
378        return Ok("sql");
379    }
380
381    // Check for JSON Schema format
382    if input.trim_start().starts_with('{')
383        && (input.contains("\"$schema\"") || input.contains("\"type\""))
384    {
385        return Ok("json_schema");
386    }
387
388    // Check for AVRO format
389    if input.contains("\"type\"") && input.contains("\"fields\"") && input.contains("\"name\"") {
390        return Ok("avro");
391    }
392
393    // Check for Protobuf format
394    if input.contains("syntax") || input.contains("message") || input.contains("service") {
395        return Ok("protobuf");
396    }
397
398    // Check for CADS format
399    if input.contains("apiVersion:")
400        && (input.contains("kind: AIModel")
401            || input.contains("kind: MLPipeline")
402            || input.contains("kind: Application")
403            || input.contains("kind: ETLPipeline")
404            || input.contains("kind: SourceSystem")
405            || input.contains("kind: DestinationSystem"))
406    {
407        return Ok("cads");
408    }
409
410    // Check for ODPS format
411    if input.contains("apiVersion:") && input.contains("kind: DataProduct") {
412        return Ok("odps");
413    }
414
415    // Check for Domain format (Business Domain schema)
416    if input.contains("systems:")
417        && (input.contains("cads_nodes:") || input.contains("odcs_nodes:"))
418    {
419        return Ok("domain");
420    }
421
422    Err(ConversionError::AutoDetectionFailed(
423        "Could not auto-detect format. Please specify format explicitly.".to_string(),
424    ))
425}
426
#[cfg(test)]
mod tests {
    use super::*;

    /// Wrap a single table in an otherwise empty `ImportResult`.
    fn import_result_with(table: TableData) -> ImportResult {
        ImportResult {
            tables: vec![table],
            tables_requiring_name: vec![],
            errors: vec![],
            ai_suggestions: None,
        }
    }

    /// Reconstructing a two-column table keeps name, order, key flag and description.
    #[test]
    fn test_reconstruct_tables_from_import_result() {
        let id_column = ColumnData {
            name: "id".to_string(),
            data_type: "INTEGER".to_string(),
            nullable: false,
            primary_key: true,
            description: Some("User ID".to_string()),
            ..Default::default()
        };
        let name_column = ColumnData {
            name: "name".to_string(),
            data_type: "VARCHAR(100)".to_string(),
            nullable: true,
            ..Default::default()
        };
        let import_result = import_result_with(TableData {
            table_index: 0,
            id: Some("550e8400-e29b-41d4-a716-446655440000".to_string()),
            name: Some("users".to_string()),
            columns: vec![id_column, name_column],
            ..Default::default()
        });

        let tables = reconstruct_tables(&import_result);

        assert_eq!(tables.len(), 1);
        let users = &tables[0];
        assert_eq!(users.name, "users");
        assert_eq!(users.columns.len(), 2);

        let first = &users.columns[0];
        assert_eq!(first.name, "id");
        assert!(first.primary_key);
        assert_eq!(first.description, "User ID");
    }

    /// A simple CREATE TABLE statement converts into an ODCS data contract.
    #[test]
    fn test_convert_sql_to_odcs() {
        let sql = "CREATE TABLE users (id INTEGER PRIMARY KEY, name VARCHAR(100));";

        let yaml = convert_to_odcs(sql, Some("sql")).expect("SQL input should convert to ODCS");

        assert!(yaml.contains("kind: DataContract"));
        assert!(yaml.contains("users"));
    }

    /// CREATE TABLE content is detected as SQL.
    #[test]
    fn test_auto_detect_sql() {
        let detected = auto_detect_format("CREATE TABLE test (id INT);")
            .expect("SQL should be auto-detected");
        assert_eq!(detected, "sql");
    }

    /// apiVersion + kind: DataContract is detected as ODCS.
    #[test]
    fn test_auto_detect_odcs() {
        let detected = auto_detect_format("apiVersion: v3.1.0\nkind: DataContract\n")
            .expect("ODCS should be auto-detected");
        assert_eq!(detected, "odcs");
    }

    /// An import result with one table becomes a named model with that table.
    #[test]
    fn test_import_result_to_data_model() {
        let order_id = ColumnData {
            name: "order_id".to_string(),
            data_type: "UUID".to_string(),
            nullable: false,
            primary_key: true,
            ..Default::default()
        };
        let import_result = import_result_with(TableData {
            table_index: 0,
            name: Some("orders".to_string()),
            columns: vec![order_id],
            ..Default::default()
        });

        let model = import_result_to_data_model(&import_result, "test_model")
            .expect("non-empty import result should produce a model");

        assert_eq!(model.name, "test_model");
        assert_eq!(model.tables.len(), 1);
        assert_eq!(model.tables[0].name, "orders");
    }
}