data_modelling_sdk/convert/
converter.rs1use crate::export::{ExportError, ODCSExporter};
6use crate::import::{
7 AvroImporter, CADSImporter, ColumnData, ImportError, ImportResult, JSONSchemaImporter,
8 ODCSImporter, ODPSImporter, ProtobufImporter, SQLImporter, TableData,
9};
10use crate::models::{Column, DataModel, Domain, Table};
11
12#[derive(Debug, thiserror::Error)]
14pub enum ConversionError {
15 #[error("Import error: {0}")]
16 ImportError(#[from] ImportError),
17 #[error("Export error: {0}")]
18 ExportError(#[from] ExportError),
19 #[error("Unsupported format: {0}")]
20 UnsupportedFormat(String),
21 #[error("Auto-detection failed: {0}")]
22 AutoDetectionFailed(String),
23 #[error("OpenAPI to ODCS conversion error: {0}")]
24 OpenAPIToODCSError(String),
25 #[error("OpenAPI component not found: {0}")]
26 OpenAPIComponentNotFound(String),
27 #[error("OpenAPI schema invalid: {0}")]
28 OpenAPISchemaInvalid(String),
29 #[error("Nested object conversion failed: {0}")]
30 NestedObjectConversionFailed(String),
31}
32
33fn parse_struct_columns(parent_name: &str, data_type: &str, col_data: &ColumnData) -> Vec<Column> {
35 let importer = ODCSImporter::new();
36
37 let field_data = serde_json::Map::new();
39
40 match importer.parse_struct_type_from_string(parent_name, data_type, &field_data) {
41 Ok(nested_cols) if !nested_cols.is_empty() => {
42 let mut all_cols = Vec::new();
43
44 let parent_data_type = if data_type.to_uppercase().starts_with("ARRAY<") {
46 "ARRAY<STRUCT<...>>".to_string()
47 } else {
48 "STRUCT<...>".to_string()
49 };
50
51 all_cols.push(Column {
52 name: parent_name.to_string(),
53 data_type: parent_data_type,
54 physical_type: col_data.physical_type.clone(),
55 nullable: col_data.nullable,
56 primary_key: col_data.primary_key,
57 description: col_data.description.clone().unwrap_or_default(),
58 quality: col_data.quality.clone().unwrap_or_default(),
59 relationships: col_data.relationships.clone(),
60 enum_values: col_data.enum_values.clone().unwrap_or_default(),
61 ..Default::default()
62 });
63
64 all_cols.extend(nested_cols);
66 all_cols
67 }
68 _ => Vec::new(),
69 }
70}
71
72fn table_data_to_table(table_data: &TableData) -> Table {
81 let table_name = table_data
82 .name
83 .clone()
84 .unwrap_or_else(|| format!("table_{}", table_data.table_index));
85
86 let mut all_columns = Vec::new();
87
88 for col_data in &table_data.columns {
89 let data_type_upper = col_data.data_type.to_uppercase();
90 let is_map = data_type_upper.starts_with("MAP<");
91
92 if is_map {
94 all_columns.push(column_data_to_column(col_data));
95 continue;
96 }
97
98 let is_struct = data_type_upper.contains("STRUCT<");
100 if is_struct {
101 let struct_cols = parse_struct_columns(&col_data.name, &col_data.data_type, col_data);
102 if !struct_cols.is_empty() {
103 all_columns.extend(struct_cols);
104 continue;
105 }
106 }
107
108 all_columns.push(column_data_to_column(col_data));
110 }
111
112 Table::new(table_name, all_columns)
113}
114
115fn column_data_to_column(col_data: &ColumnData) -> Column {
117 Column {
118 id: col_data.id.clone(),
120 name: col_data.name.clone(),
121 business_name: col_data.business_name.clone(),
122 description: col_data.description.clone().unwrap_or_default(),
123 data_type: col_data.data_type.clone(),
125 physical_type: col_data.physical_type.clone(),
126 physical_name: col_data.physical_name.clone(),
127 logical_type_options: col_data.logical_type_options.clone(),
128 primary_key: col_data.primary_key,
130 primary_key_position: col_data.primary_key_position,
131 unique: col_data.unique,
132 nullable: col_data.nullable,
133 partitioned: col_data.partitioned,
135 partition_key_position: col_data.partition_key_position,
136 clustered: col_data.clustered,
137 classification: col_data.classification.clone(),
139 critical_data_element: col_data.critical_data_element,
140 encrypted_name: col_data.encrypted_name.clone(),
141 transform_source_objects: col_data.transform_source_objects.clone(),
143 transform_logic: col_data.transform_logic.clone(),
144 transform_description: col_data.transform_description.clone(),
145 examples: col_data.examples.clone(),
147 default_value: col_data.default_value.clone(),
148 relationships: col_data.relationships.clone(),
150 authoritative_definitions: col_data.authoritative_definitions.clone(),
151 quality: col_data.quality.clone().unwrap_or_default(),
153 enum_values: col_data.enum_values.clone().unwrap_or_default(),
154 tags: col_data.tags.clone(),
156 custom_properties: col_data.custom_properties.clone(),
157 ..Default::default()
159 }
160}
161
162pub fn reconstruct_tables(import_result: &ImportResult) -> Vec<Table> {
167 import_result
168 .tables
169 .iter()
170 .map(table_data_to_table)
171 .collect()
172}
173
174pub fn convert_to_odcs(input: &str, format: Option<&str>) -> Result<String, ConversionError> {
186 let detected_format = if let Some(fmt) = format {
188 fmt
189 } else {
190 auto_detect_format(input)?
191 };
192
193 let import_result = match detected_format {
195 "odcs" => {
196 let mut importer = ODCSImporter::new();
197 importer
198 .import(input)
199 .map_err(ConversionError::ImportError)?
200 }
201 "odcl" => {
202 let mut importer = ODCSImporter::new();
203 importer
204 .import(input)
205 .map_err(ConversionError::ImportError)?
206 }
207 "sql" => {
208 let importer = SQLImporter::new("postgresql");
209 importer
210 .parse(input)
211 .map_err(|e| ConversionError::ImportError(ImportError::ParseError(e.to_string())))?
212 }
213 "json_schema" => {
214 let importer = JSONSchemaImporter::new();
215 importer
216 .import(input)
217 .map_err(ConversionError::ImportError)?
218 }
219 "avro" => {
220 let importer = AvroImporter::new();
221 importer
222 .import(input)
223 .map_err(ConversionError::ImportError)?
224 }
225 "protobuf" => {
226 let importer = ProtobufImporter::new();
227 importer
228 .import(input)
229 .map_err(ConversionError::ImportError)?
230 }
231 "cads" => {
232 let importer = CADSImporter::new();
238 let _asset = importer
239 .import(input)
240 .map_err(ConversionError::ImportError)?;
241
242 return Err(ConversionError::UnsupportedFormat(
245 "CADS → ODCS conversion requires data schema information. CADS assets represent compute resources, not data contracts.".to_string()
246 ));
247 }
248 "odps" => {
249 let importer = ODPSImporter::new();
253 let product = importer
254 .import(input)
255 .map_err(ConversionError::ImportError)?;
256
257 let mut contract_ids = Vec::new();
259 if let Some(input_ports) = &product.input_ports {
260 for port in input_ports {
261 contract_ids.push(port.contract_id.clone());
262 }
263 }
264 if let Some(output_ports) = &product.output_ports {
265 for port in output_ports {
266 if let Some(contract_id) = &port.contract_id {
267 contract_ids.push(contract_id.clone());
268 }
269 }
270 }
271
272 if contract_ids.is_empty() {
273 return Err(ConversionError::UnsupportedFormat(
274 "ODPS → ODCS conversion requires contractId references. No contractIds found in input/output ports.".to_string()
275 ));
276 }
277
278 return Err(ConversionError::UnsupportedFormat(format!(
282 "ODPS → ODCS conversion requires ODCS Table definitions for contractIds: {}. Please provide the referenced ODCS Tables.",
283 contract_ids.join(", ")
284 )));
285 }
286 "domain" => {
287 let domain: Domain = serde_yaml::from_str(input).map_err(|e| {
291 ConversionError::ImportError(ImportError::ParseError(format!(
292 "Failed to parse Domain YAML: {}",
293 e
294 )))
295 })?;
296
297 let odcs_node_count = domain.odcs_nodes.len();
299 if odcs_node_count == 0 {
300 return Err(ConversionError::UnsupportedFormat(
301 "Domain → ODCS conversion: Domain contains no ODCS nodes.".to_string(),
302 ));
303 }
304
305 return Err(ConversionError::UnsupportedFormat(format!(
309 "Domain → ODCS conversion requires Table definitions. Domain contains {} ODCS node references, but full Table definitions must be provided separately (e.g., from a DataModel).",
310 odcs_node_count
311 )));
312 }
313 _ => {
314 return Err(ConversionError::UnsupportedFormat(
315 detected_format.to_string(),
316 ));
317 }
318 };
319
320 if import_result.tables.is_empty() {
322 return Err(ConversionError::ImportError(ImportError::ParseError(
323 "No tables found in input".to_string(),
324 )));
325 }
326
327 let tables = reconstruct_tables(&import_result);
329
330 let yaml_docs: Vec<String> = tables
332 .iter()
333 .map(|table| ODCSExporter::export_table(table, "odcs_v3_1_0"))
334 .collect();
335
336 Ok(yaml_docs.join("\n---\n"))
337}
338
339pub fn import_result_to_data_model(
344 import_result: &ImportResult,
345 model_name: &str,
346) -> Result<DataModel, ConversionError> {
347 if import_result.tables.is_empty() {
348 return Err(ConversionError::ImportError(ImportError::ParseError(
349 "No tables found in import result".to_string(),
350 )));
351 }
352
353 let tables = reconstruct_tables(import_result);
354
355 let mut model = DataModel::new(model_name.to_string(), String::new(), String::new());
356
357 for table in tables {
358 model.tables.push(table);
359 }
360
361 Ok(model)
362}
363
364fn auto_detect_format(input: &str) -> Result<&str, ConversionError> {
366 if input.contains("apiVersion:") && input.contains("kind: DataContract") {
368 return Ok("odcs");
369 }
370
371 if input.contains("dataContractSpecification:") {
373 return Ok("odcl");
374 }
375
376 if input.to_uppercase().contains("CREATE TABLE") {
378 return Ok("sql");
379 }
380
381 if input.trim_start().starts_with('{')
383 && (input.contains("\"$schema\"") || input.contains("\"type\""))
384 {
385 return Ok("json_schema");
386 }
387
388 if input.contains("\"type\"") && input.contains("\"fields\"") && input.contains("\"name\"") {
390 return Ok("avro");
391 }
392
393 if input.contains("syntax") || input.contains("message") || input.contains("service") {
395 return Ok("protobuf");
396 }
397
398 if input.contains("apiVersion:")
400 && (input.contains("kind: AIModel")
401 || input.contains("kind: MLPipeline")
402 || input.contains("kind: Application")
403 || input.contains("kind: ETLPipeline")
404 || input.contains("kind: SourceSystem")
405 || input.contains("kind: DestinationSystem"))
406 {
407 return Ok("cads");
408 }
409
410 if input.contains("apiVersion:") && input.contains("kind: DataProduct") {
412 return Ok("odps");
413 }
414
415 if input.contains("systems:")
417 && (input.contains("cads_nodes:") || input.contains("odcs_nodes:"))
418 {
419 return Ok("domain");
420 }
421
422 Err(ConversionError::AutoDetectionFailed(
423 "Could not auto-detect format. Please specify format explicitly.".to_string(),
424 ))
425}
426
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `ImportResult` holding exactly one named table (index 0) and no errors.
    fn single_table_result(table_name: &str, columns: Vec<ColumnData>) -> ImportResult {
        let table = TableData {
            table_index: 0,
            name: Some(table_name.to_string()),
            columns,
        };

        ImportResult {
            tables: vec![table],
            tables_requiring_name: vec![],
            errors: vec![],
            ai_suggestions: None,
        }
    }

    /// Tables and columns survive reconstruction with their key metadata intact.
    #[test]
    fn test_reconstruct_tables_from_import_result() {
        let id_column = ColumnData {
            name: "id".to_string(),
            data_type: "INTEGER".to_string(),
            nullable: false,
            primary_key: true,
            description: Some("User ID".to_string()),
            ..Default::default()
        };
        let name_column = ColumnData {
            name: "name".to_string(),
            data_type: "VARCHAR(100)".to_string(),
            nullable: true,
            ..Default::default()
        };
        let import_result = single_table_result("users", vec![id_column, name_column]);

        let tables = reconstruct_tables(&import_result);

        assert_eq!(tables.len(), 1);
        let users = &tables[0];
        assert_eq!(users.name, "users");
        assert_eq!(users.columns.len(), 2);

        let first = &users.columns[0];
        assert_eq!(first.name, "id");
        assert!(first.primary_key);
        assert_eq!(first.description, "User ID");
    }

    /// A simple SQL DDL statement converts into an ODCS data contract.
    #[test]
    fn test_convert_sql_to_odcs() {
        let sql = "CREATE TABLE users (id INTEGER PRIMARY KEY, name VARCHAR(100));";

        let result = convert_to_odcs(sql, Some("sql"));

        assert!(result.is_ok());
        let yaml = result.unwrap();
        assert!(yaml.contains("kind: DataContract"));
        assert!(yaml.contains("users"));
    }

    /// `CREATE TABLE` is recognised as SQL.
    #[test]
    fn test_auto_detect_sql() {
        let detected = auto_detect_format("CREATE TABLE test (id INT);");

        assert!(detected.is_ok());
        assert_eq!(detected.unwrap(), "sql");
    }

    /// `apiVersion:` together with `kind: DataContract` is recognised as ODCS.
    #[test]
    fn test_auto_detect_odcs() {
        let detected = auto_detect_format("apiVersion: v3.1.0\nkind: DataContract\n");

        assert!(detected.is_ok());
        assert_eq!(detected.unwrap(), "odcs");
    }

    /// An import result becomes a data model carrying the given name and its tables.
    #[test]
    fn test_import_result_to_data_model() {
        let order_id = ColumnData {
            name: "order_id".to_string(),
            data_type: "UUID".to_string(),
            nullable: false,
            primary_key: true,
            ..Default::default()
        };
        let import_result = single_table_result("orders", vec![order_id]);

        let model = import_result_to_data_model(&import_result, "test_model");

        assert!(model.is_ok());
        let model = model.unwrap();
        assert_eq!(model.name, "test_model");
        assert_eq!(model.tables.len(), 1);
        assert_eq!(model.tables[0].name, "orders");
    }
}