data_modelling_sdk/convert/
converter.rs1use crate::export::{ExportError, ODCSExporter};
6use crate::import::{
7 AvroImporter, CADSImporter, ColumnData, ImportError, ImportResult, JSONSchemaImporter,
8 ODCSImporter, ODPSImporter, ProtobufImporter, SQLImporter, TableData,
9};
10use crate::models::{Column, DataModel, Domain, Table};
11
12#[derive(Debug, thiserror::Error)]
14pub enum ConversionError {
15 #[error("Import error: {0}")]
16 ImportError(#[from] ImportError),
17 #[error("Export error: {0}")]
18 ExportError(#[from] ExportError),
19 #[error("Unsupported format: {0}")]
20 UnsupportedFormat(String),
21 #[error("Auto-detection failed: {0}")]
22 AutoDetectionFailed(String),
23 #[error("OpenAPI to ODCS conversion error: {0}")]
24 OpenAPIToODCSError(String),
25 #[error("OpenAPI component not found: {0}")]
26 OpenAPIComponentNotFound(String),
27 #[error("OpenAPI schema invalid: {0}")]
28 OpenAPISchemaInvalid(String),
29 #[error("Nested object conversion failed: {0}")]
30 NestedObjectConversionFailed(String),
31}
32
33fn parse_struct_columns(parent_name: &str, data_type: &str, col_data: &ColumnData) -> Vec<Column> {
35 let importer = ODCSImporter::new();
36
37 let field_data = serde_json::Map::new();
39
40 match importer.parse_struct_type_from_string(parent_name, data_type, &field_data) {
41 Ok(nested_cols) if !nested_cols.is_empty() => {
42 let mut all_cols = Vec::new();
43
44 let parent_data_type = if data_type.to_uppercase().starts_with("ARRAY<") {
46 "ARRAY<STRUCT<...>>".to_string()
47 } else {
48 "STRUCT<...>".to_string()
49 };
50
51 all_cols.push(Column {
52 name: parent_name.to_string(),
53 data_type: parent_data_type,
54 physical_type: col_data.physical_type.clone(),
55 nullable: col_data.nullable,
56 primary_key: col_data.primary_key,
57 secondary_key: false,
58 composite_key: None,
59 foreign_key: None,
60 constraints: Vec::new(),
61 description: col_data.description.clone().unwrap_or_default(),
62 errors: Vec::new(),
63 quality: col_data.quality.clone().unwrap_or_default(),
64 relationships: col_data.relationships.clone(),
65 enum_values: col_data.enum_values.clone().unwrap_or_default(),
66 column_order: 0,
67 nested_data: None,
68 });
69
70 all_cols.extend(nested_cols);
72 all_cols
73 }
74 _ => Vec::new(),
75 }
76}
77
78fn table_data_to_table(table_data: &TableData) -> Table {
87 let table_name = table_data
88 .name
89 .clone()
90 .unwrap_or_else(|| format!("table_{}", table_data.table_index));
91
92 let mut all_columns = Vec::new();
93
94 for col_data in &table_data.columns {
95 let data_type_upper = col_data.data_type.to_uppercase();
96 let is_map = data_type_upper.starts_with("MAP<");
97
98 if is_map {
100 all_columns.push(column_data_to_column(col_data));
101 continue;
102 }
103
104 let is_struct = data_type_upper.contains("STRUCT<");
106 if is_struct {
107 let struct_cols = parse_struct_columns(&col_data.name, &col_data.data_type, col_data);
108 if !struct_cols.is_empty() {
109 all_columns.extend(struct_cols);
110 continue;
111 }
112 }
113
114 all_columns.push(column_data_to_column(col_data));
116 }
117
118 Table::new(table_name, all_columns)
119}
120
121fn column_data_to_column(col_data: &ColumnData) -> Column {
123 Column {
124 name: col_data.name.clone(),
125 data_type: col_data.data_type.clone(),
126 physical_type: col_data.physical_type.clone(),
127 nullable: col_data.nullable,
128 primary_key: col_data.primary_key,
129 secondary_key: false,
130 composite_key: None,
131 foreign_key: None,
132 constraints: Vec::new(),
133 description: col_data.description.clone().unwrap_or_default(),
134 errors: Vec::new(),
135 quality: col_data.quality.clone().unwrap_or_default(),
136 relationships: col_data.relationships.clone(),
137 enum_values: col_data.enum_values.clone().unwrap_or_default(),
138 column_order: 0,
139 nested_data: None,
140 }
141}
142
143pub fn reconstruct_tables(import_result: &ImportResult) -> Vec<Table> {
148 import_result
149 .tables
150 .iter()
151 .map(table_data_to_table)
152 .collect()
153}
154
155pub fn convert_to_odcs(input: &str, format: Option<&str>) -> Result<String, ConversionError> {
167 let detected_format = if let Some(fmt) = format {
169 fmt
170 } else {
171 auto_detect_format(input)?
172 };
173
174 let import_result = match detected_format {
176 "odcs" => {
177 let mut importer = ODCSImporter::new();
178 importer
179 .import(input)
180 .map_err(ConversionError::ImportError)?
181 }
182 "odcl" => {
183 let mut importer = ODCSImporter::new();
184 importer
185 .import(input)
186 .map_err(ConversionError::ImportError)?
187 }
188 "sql" => {
189 let importer = SQLImporter::new("postgresql");
190 importer
191 .parse(input)
192 .map_err(|e| ConversionError::ImportError(ImportError::ParseError(e.to_string())))?
193 }
194 "json_schema" => {
195 let importer = JSONSchemaImporter::new();
196 importer
197 .import(input)
198 .map_err(ConversionError::ImportError)?
199 }
200 "avro" => {
201 let importer = AvroImporter::new();
202 importer
203 .import(input)
204 .map_err(ConversionError::ImportError)?
205 }
206 "protobuf" => {
207 let importer = ProtobufImporter::new();
208 importer
209 .import(input)
210 .map_err(ConversionError::ImportError)?
211 }
212 "cads" => {
213 let importer = CADSImporter::new();
219 let _asset = importer
220 .import(input)
221 .map_err(ConversionError::ImportError)?;
222
223 return Err(ConversionError::UnsupportedFormat(
226 "CADS → ODCS conversion requires data schema information. CADS assets represent compute resources, not data contracts.".to_string()
227 ));
228 }
229 "odps" => {
230 let importer = ODPSImporter::new();
234 let product = importer
235 .import(input)
236 .map_err(ConversionError::ImportError)?;
237
238 let mut contract_ids = Vec::new();
240 if let Some(input_ports) = &product.input_ports {
241 for port in input_ports {
242 contract_ids.push(port.contract_id.clone());
243 }
244 }
245 if let Some(output_ports) = &product.output_ports {
246 for port in output_ports {
247 if let Some(contract_id) = &port.contract_id {
248 contract_ids.push(contract_id.clone());
249 }
250 }
251 }
252
253 if contract_ids.is_empty() {
254 return Err(ConversionError::UnsupportedFormat(
255 "ODPS → ODCS conversion requires contractId references. No contractIds found in input/output ports.".to_string()
256 ));
257 }
258
259 return Err(ConversionError::UnsupportedFormat(format!(
263 "ODPS → ODCS conversion requires ODCS Table definitions for contractIds: {}. Please provide the referenced ODCS Tables.",
264 contract_ids.join(", ")
265 )));
266 }
267 "domain" => {
268 let domain: Domain = serde_yaml::from_str(input).map_err(|e| {
272 ConversionError::ImportError(ImportError::ParseError(format!(
273 "Failed to parse Domain YAML: {}",
274 e
275 )))
276 })?;
277
278 let odcs_node_count = domain.odcs_nodes.len();
280 if odcs_node_count == 0 {
281 return Err(ConversionError::UnsupportedFormat(
282 "Domain → ODCS conversion: Domain contains no ODCS nodes.".to_string(),
283 ));
284 }
285
286 return Err(ConversionError::UnsupportedFormat(format!(
290 "Domain → ODCS conversion requires Table definitions. Domain contains {} ODCS node references, but full Table definitions must be provided separately (e.g., from a DataModel).",
291 odcs_node_count
292 )));
293 }
294 _ => {
295 return Err(ConversionError::UnsupportedFormat(
296 detected_format.to_string(),
297 ));
298 }
299 };
300
301 if import_result.tables.is_empty() {
303 return Err(ConversionError::ImportError(ImportError::ParseError(
304 "No tables found in input".to_string(),
305 )));
306 }
307
308 let tables = reconstruct_tables(&import_result);
310
311 let yaml_docs: Vec<String> = tables
313 .iter()
314 .map(|table| ODCSExporter::export_table(table, "odcs_v3_1_0"))
315 .collect();
316
317 Ok(yaml_docs.join("\n---\n"))
318}
319
320pub fn import_result_to_data_model(
325 import_result: &ImportResult,
326 model_name: &str,
327) -> Result<DataModel, ConversionError> {
328 if import_result.tables.is_empty() {
329 return Err(ConversionError::ImportError(ImportError::ParseError(
330 "No tables found in import result".to_string(),
331 )));
332 }
333
334 let tables = reconstruct_tables(import_result);
335
336 let mut model = DataModel::new(model_name.to_string(), String::new(), String::new());
337
338 for table in tables {
339 model.tables.push(table);
340 }
341
342 Ok(model)
343}
344
345fn auto_detect_format(input: &str) -> Result<&str, ConversionError> {
347 if input.contains("apiVersion:") && input.contains("kind: DataContract") {
349 return Ok("odcs");
350 }
351
352 if input.contains("dataContractSpecification:") {
354 return Ok("odcl");
355 }
356
357 if input.to_uppercase().contains("CREATE TABLE") {
359 return Ok("sql");
360 }
361
362 if input.trim_start().starts_with('{')
364 && (input.contains("\"$schema\"") || input.contains("\"type\""))
365 {
366 return Ok("json_schema");
367 }
368
369 if input.contains("\"type\"") && input.contains("\"fields\"") && input.contains("\"name\"") {
371 return Ok("avro");
372 }
373
374 if input.contains("syntax") || input.contains("message") || input.contains("service") {
376 return Ok("protobuf");
377 }
378
379 if input.contains("apiVersion:")
381 && (input.contains("kind: AIModel")
382 || input.contains("kind: MLPipeline")
383 || input.contains("kind: Application")
384 || input.contains("kind: ETLPipeline")
385 || input.contains("kind: SourceSystem")
386 || input.contains("kind: DestinationSystem"))
387 {
388 return Ok("cads");
389 }
390
391 if input.contains("apiVersion:") && input.contains("kind: DataProduct") {
393 return Ok("odps");
394 }
395
396 if input.contains("systems:")
398 && (input.contains("cads_nodes:") || input.contains("odcs_nodes:"))
399 {
400 return Ok("domain");
401 }
402
403 Err(ConversionError::AutoDetectionFailed(
404 "Could not auto-detect format. Please specify format explicitly.".to_string(),
405 ))
406}
407
408#[cfg(test)]
409mod tests {
410 use super::*;
411
412 #[test]
413 fn test_reconstruct_tables_from_import_result() {
414 let import_result = ImportResult {
415 tables: vec![TableData {
416 table_index: 0,
417 name: Some("users".to_string()),
418 columns: vec![
419 ColumnData {
420 name: "id".to_string(),
421 data_type: "INTEGER".to_string(),
422 physical_type: None,
423 nullable: false,
424 primary_key: true,
425 description: Some("User ID".to_string()),
426 quality: None,
427 relationships: vec![],
428 enum_values: None,
429 },
430 ColumnData {
431 name: "name".to_string(),
432 data_type: "VARCHAR(100)".to_string(),
433 physical_type: None,
434 nullable: true,
435 primary_key: false,
436 description: None,
437 quality: None,
438 relationships: vec![],
439 enum_values: None,
440 },
441 ],
442 }],
443 tables_requiring_name: vec![],
444 errors: vec![],
445 ai_suggestions: None,
446 };
447
448 let tables = reconstruct_tables(&import_result);
449 assert_eq!(tables.len(), 1);
450 assert_eq!(tables[0].name, "users");
451 assert_eq!(tables[0].columns.len(), 2);
452 assert_eq!(tables[0].columns[0].name, "id");
453 assert!(tables[0].columns[0].primary_key);
454 assert_eq!(tables[0].columns[0].description, "User ID");
455 }
456
457 #[test]
458 fn test_convert_sql_to_odcs() {
459 let sql = "CREATE TABLE users (id INTEGER PRIMARY KEY, name VARCHAR(100));";
460 let result = convert_to_odcs(sql, Some("sql"));
461 assert!(result.is_ok());
462 let yaml = result.unwrap();
463 assert!(yaml.contains("kind: DataContract"));
464 assert!(yaml.contains("users"));
465 }
466
467 #[test]
468 fn test_auto_detect_sql() {
469 let sql = "CREATE TABLE test (id INT);";
470 let format = auto_detect_format(sql);
471 assert!(format.is_ok());
472 assert_eq!(format.unwrap(), "sql");
473 }
474
475 #[test]
476 fn test_auto_detect_odcs() {
477 let odcs = "apiVersion: v3.1.0\nkind: DataContract\n";
478 let format = auto_detect_format(odcs);
479 assert!(format.is_ok());
480 assert_eq!(format.unwrap(), "odcs");
481 }
482
483 #[test]
484 fn test_import_result_to_data_model() {
485 let import_result = ImportResult {
486 tables: vec![TableData {
487 table_index: 0,
488 name: Some("orders".to_string()),
489 columns: vec![ColumnData {
490 name: "order_id".to_string(),
491 data_type: "UUID".to_string(),
492 physical_type: None,
493 nullable: false,
494 primary_key: true,
495 description: None,
496 quality: None,
497 relationships: vec![],
498 enum_values: None,
499 }],
500 }],
501 tables_requiring_name: vec![],
502 errors: vec![],
503 ai_suggestions: None,
504 };
505
506 let model = import_result_to_data_model(&import_result, "test_model");
507 assert!(model.is_ok());
508 let model = model.unwrap();
509 assert_eq!(model.name, "test_model");
510 assert_eq!(model.tables.len(), 1);
511 assert_eq!(model.tables[0].name, "orders");
512 }
513}