data_modelling_sdk/convert/
converter.rs1use crate::export::{ExportError, ODCSExporter};
6use crate::import::{
7 AvroImporter, CADSImporter, ColumnData, ImportError, ImportResult, JSONSchemaImporter,
8 ODCSImporter, ODPSImporter, ProtobufImporter, SQLImporter, TableData,
9};
10use crate::models::{Column, DataModel, Domain, Table};
11
12#[derive(Debug, thiserror::Error)]
14pub enum ConversionError {
15 #[error("Import error: {0}")]
16 ImportError(#[from] ImportError),
17 #[error("Export error: {0}")]
18 ExportError(#[from] ExportError),
19 #[error("Unsupported format: {0}")]
20 UnsupportedFormat(String),
21 #[error("Auto-detection failed: {0}")]
22 AutoDetectionFailed(String),
23 #[error("OpenAPI to ODCS conversion error: {0}")]
24 OpenAPIToODCSError(String),
25 #[error("OpenAPI component not found: {0}")]
26 OpenAPIComponentNotFound(String),
27 #[error("OpenAPI schema invalid: {0}")]
28 OpenAPISchemaInvalid(String),
29 #[error("Nested object conversion failed: {0}")]
30 NestedObjectConversionFailed(String),
31}
32
33fn table_data_to_table(table_data: &TableData) -> Table {
38 let table_name = table_data
39 .name
40 .clone()
41 .unwrap_or_else(|| format!("table_{}", table_data.table_index));
42
43 let columns: Vec<Column> = table_data
44 .columns
45 .iter()
46 .map(column_data_to_column)
47 .collect();
48
49 Table::new(table_name, columns)
50}
51
52fn column_data_to_column(col_data: &ColumnData) -> Column {
54 Column {
55 name: col_data.name.clone(),
56 data_type: col_data.data_type.clone(),
57 physical_type: col_data.physical_type.clone(),
58 nullable: col_data.nullable,
59 primary_key: col_data.primary_key,
60 secondary_key: false,
61 composite_key: None,
62 foreign_key: None,
63 constraints: Vec::new(),
64 description: col_data.description.clone().unwrap_or_default(),
65 errors: Vec::new(),
66 quality: col_data.quality.clone().unwrap_or_default(),
67 relationships: col_data.relationships.clone(),
68 enum_values: col_data.enum_values.clone().unwrap_or_default(),
69 column_order: 0,
70 nested_data: None,
71 }
72}
73
74pub fn reconstruct_tables(import_result: &ImportResult) -> Vec<Table> {
79 import_result
80 .tables
81 .iter()
82 .map(table_data_to_table)
83 .collect()
84}
85
86pub fn convert_to_odcs(input: &str, format: Option<&str>) -> Result<String, ConversionError> {
98 let detected_format = if let Some(fmt) = format {
100 fmt
101 } else {
102 auto_detect_format(input)?
103 };
104
105 let import_result = match detected_format {
107 "odcs" => {
108 let mut importer = ODCSImporter::new();
109 importer
110 .import(input)
111 .map_err(ConversionError::ImportError)?
112 }
113 "odcl" => {
114 let mut importer = ODCSImporter::new();
115 importer
116 .import(input)
117 .map_err(ConversionError::ImportError)?
118 }
119 "sql" => {
120 let importer = SQLImporter::new("postgresql");
121 importer
122 .parse(input)
123 .map_err(|e| ConversionError::ImportError(ImportError::ParseError(e.to_string())))?
124 }
125 "json_schema" => {
126 let importer = JSONSchemaImporter::new();
127 importer
128 .import(input)
129 .map_err(ConversionError::ImportError)?
130 }
131 "avro" => {
132 let importer = AvroImporter::new();
133 importer
134 .import(input)
135 .map_err(ConversionError::ImportError)?
136 }
137 "protobuf" => {
138 let importer = ProtobufImporter::new();
139 importer
140 .import(input)
141 .map_err(ConversionError::ImportError)?
142 }
143 "cads" => {
144 let importer = CADSImporter::new();
150 let _asset = importer
151 .import(input)
152 .map_err(ConversionError::ImportError)?;
153
154 return Err(ConversionError::UnsupportedFormat(
157 "CADS → ODCS conversion requires data schema information. CADS assets represent compute resources, not data contracts.".to_string()
158 ));
159 }
160 "odps" => {
161 let importer = ODPSImporter::new();
165 let product = importer
166 .import(input)
167 .map_err(ConversionError::ImportError)?;
168
169 let mut contract_ids = Vec::new();
171 if let Some(input_ports) = &product.input_ports {
172 for port in input_ports {
173 contract_ids.push(port.contract_id.clone());
174 }
175 }
176 if let Some(output_ports) = &product.output_ports {
177 for port in output_ports {
178 if let Some(contract_id) = &port.contract_id {
179 contract_ids.push(contract_id.clone());
180 }
181 }
182 }
183
184 if contract_ids.is_empty() {
185 return Err(ConversionError::UnsupportedFormat(
186 "ODPS → ODCS conversion requires contractId references. No contractIds found in input/output ports.".to_string()
187 ));
188 }
189
190 return Err(ConversionError::UnsupportedFormat(format!(
194 "ODPS → ODCS conversion requires ODCS Table definitions for contractIds: {}. Please provide the referenced ODCS Tables.",
195 contract_ids.join(", ")
196 )));
197 }
198 "domain" => {
199 let domain: Domain = serde_yaml::from_str(input).map_err(|e| {
203 ConversionError::ImportError(ImportError::ParseError(format!(
204 "Failed to parse Domain YAML: {}",
205 e
206 )))
207 })?;
208
209 let odcs_node_count = domain.odcs_nodes.len();
211 if odcs_node_count == 0 {
212 return Err(ConversionError::UnsupportedFormat(
213 "Domain → ODCS conversion: Domain contains no ODCS nodes.".to_string(),
214 ));
215 }
216
217 return Err(ConversionError::UnsupportedFormat(format!(
221 "Domain → ODCS conversion requires Table definitions. Domain contains {} ODCS node references, but full Table definitions must be provided separately (e.g., from a DataModel).",
222 odcs_node_count
223 )));
224 }
225 _ => {
226 return Err(ConversionError::UnsupportedFormat(
227 detected_format.to_string(),
228 ));
229 }
230 };
231
232 if import_result.tables.is_empty() {
234 return Err(ConversionError::ImportError(ImportError::ParseError(
235 "No tables found in input".to_string(),
236 )));
237 }
238
239 let tables = reconstruct_tables(&import_result);
241
242 let yaml_docs: Vec<String> = tables
244 .iter()
245 .map(|table| ODCSExporter::export_table(table, "odcs_v3_1_0"))
246 .collect();
247
248 Ok(yaml_docs.join("\n---\n"))
249}
250
251pub fn import_result_to_data_model(
256 import_result: &ImportResult,
257 model_name: &str,
258) -> Result<DataModel, ConversionError> {
259 if import_result.tables.is_empty() {
260 return Err(ConversionError::ImportError(ImportError::ParseError(
261 "No tables found in import result".to_string(),
262 )));
263 }
264
265 let tables = reconstruct_tables(import_result);
266
267 let mut model = DataModel::new(model_name.to_string(), String::new(), String::new());
268
269 for table in tables {
270 model.tables.push(table);
271 }
272
273 Ok(model)
274}
275
276fn auto_detect_format(input: &str) -> Result<&str, ConversionError> {
278 if input.contains("apiVersion:") && input.contains("kind: DataContract") {
280 return Ok("odcs");
281 }
282
283 if input.contains("dataContractSpecification:") {
285 return Ok("odcl");
286 }
287
288 if input.to_uppercase().contains("CREATE TABLE") {
290 return Ok("sql");
291 }
292
293 if input.trim_start().starts_with('{')
295 && (input.contains("\"$schema\"") || input.contains("\"type\""))
296 {
297 return Ok("json_schema");
298 }
299
300 if input.contains("\"type\"") && input.contains("\"fields\"") && input.contains("\"name\"") {
302 return Ok("avro");
303 }
304
305 if input.contains("syntax") || input.contains("message") || input.contains("service") {
307 return Ok("protobuf");
308 }
309
310 if input.contains("apiVersion:")
312 && (input.contains("kind: AIModel")
313 || input.contains("kind: MLPipeline")
314 || input.contains("kind: Application")
315 || input.contains("kind: ETLPipeline")
316 || input.contains("kind: SourceSystem")
317 || input.contains("kind: DestinationSystem"))
318 {
319 return Ok("cads");
320 }
321
322 if input.contains("apiVersion:") && input.contains("kind: DataProduct") {
324 return Ok("odps");
325 }
326
327 if input.contains("systems:")
329 && (input.contains("cads_nodes:") || input.contains("odcs_nodes:"))
330 {
331 return Ok("domain");
332 }
333
334 Err(ConversionError::AutoDetectionFailed(
335 "Could not auto-detect format. Please specify format explicitly.".to_string(),
336 ))
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ColumnData` fixture; fields not listed as parameters are
    /// left empty (`None` / empty vector).
    fn column_fixture(
        name: &str,
        data_type: &str,
        nullable: bool,
        primary_key: bool,
        description: Option<&str>,
    ) -> ColumnData {
        ColumnData {
            name: name.to_string(),
            data_type: data_type.to_string(),
            physical_type: None,
            nullable,
            primary_key,
            description: description.map(String::from),
            quality: None,
            relationships: vec![],
            enum_values: None,
        }
    }

    /// Builds an `ImportResult` holding a single named table at index 0.
    fn single_table_result(table_name: &str, columns: Vec<ColumnData>) -> ImportResult {
        ImportResult {
            tables: vec![TableData {
                table_index: 0,
                name: Some(table_name.to_string()),
                columns,
            }],
            tables_requiring_name: vec![],
            errors: vec![],
            ai_suggestions: None,
        }
    }

    #[test]
    fn test_reconstruct_tables_from_import_result() {
        let import_result = single_table_result(
            "users",
            vec![
                column_fixture("id", "INTEGER", false, true, Some("User ID")),
                column_fixture("name", "VARCHAR(100)", true, false, None),
            ],
        );

        let tables = reconstruct_tables(&import_result);
        assert_eq!(tables.len(), 1);

        let users = &tables[0];
        assert_eq!(users.name, "users");
        assert_eq!(users.columns.len(), 2);

        let id_column = &users.columns[0];
        assert_eq!(id_column.name, "id");
        assert!(id_column.primary_key);
        assert_eq!(id_column.description, "User ID");
    }

    #[test]
    fn test_convert_sql_to_odcs() {
        let sql = "CREATE TABLE users (id INTEGER PRIMARY KEY, name VARCHAR(100));";
        let yaml = convert_to_odcs(sql, Some("sql")).expect("SQL input should convert to ODCS");
        assert!(yaml.contains("kind: DataContract"));
        assert!(yaml.contains("users"));
    }

    #[test]
    fn test_auto_detect_sql() {
        let detected = auto_detect_format("CREATE TABLE test (id INT);");
        assert!(matches!(detected, Ok("sql")));
    }

    #[test]
    fn test_auto_detect_odcs() {
        let detected = auto_detect_format("apiVersion: v3.1.0\nkind: DataContract\n");
        assert!(matches!(detected, Ok("odcs")));
    }

    #[test]
    fn test_import_result_to_data_model() {
        let import_result = single_table_result(
            "orders",
            vec![column_fixture("order_id", "UUID", false, true, None)],
        );

        let model = import_result_to_data_model(&import_result, "test_model")
            .expect("a non-empty import result should produce a model");
        assert_eq!(model.name, "test_model");
        assert_eq!(model.tables.len(), 1);
        assert_eq!(model.tables[0].name, "orders");
    }
}