1use super::odcs_shared::{
12 ParserError, column_to_column_data, expand_nested_column, json_value_to_serde_value,
13 normalize_data_type, parse_data_vault_classification, parse_medallion_layer, parse_scd_pattern,
14 resolve_ref, yaml_to_json_value,
15};
16use super::{ImportError, ImportResult, TableData};
17use crate::models::column::ForeignKey;
18use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
19use crate::models::{Column, PropertyRelationship, Table, Tag};
20use anyhow::{Context, Result};
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::str::FromStr;
24use tracing::info;
25
26pub use super::odcs_shared::ParserError as OdcsParserError;
28
29fn ref_to_relationships(ref_path: &Option<String>) -> Vec<PropertyRelationship> {
32 match ref_path {
33 Some(ref_str) => {
34 let to = if ref_str.starts_with("#/definitions/") {
35 let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
36 format!("definitions/{}", def_path)
37 } else if ref_str.starts_with("#/") {
38 ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
39 } else {
40 ref_str.clone()
41 };
42 vec![PropertyRelationship {
43 relationship_type: "foreignKey".to_string(),
44 to,
45 }]
46 }
47 None => Vec::new(),
48 }
49}
50
/// Importer that turns YAML contract documents into `Table` / `ImportResult` values.
pub struct ODCSImporter {
    /// The YAML document most recently handed to `parse`, stored in parsed form.
    /// `None` until the first non-empty document has been parsed.
    current_yaml_data: Option<serde_yaml::Value>,
}
57
58impl ODCSImporter {
/// Creates an importer with no YAML document loaded yet.
pub fn new() -> Self {
    Self {
        current_yaml_data: None,
    }
}
73
74 pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
107 let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
109 .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
110
111 let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
112 ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
113 })?;
114
115 match self.parse(yaml_content) {
116 Ok((table, errors)) => {
117 let sdk_tables = vec![TableData {
119 table_index: 0,
120 id: Some(table.id.to_string()),
121 name: Some(table.name.clone()),
122 api_version: json_data
123 .get("apiVersion")
124 .and_then(|v| v.as_str())
125 .map(|s| s.to_string()),
126 version: json_data
127 .get("version")
128 .and_then(|v| v.as_str())
129 .map(|s| s.to_string()),
130 status: json_data
131 .get("status")
132 .and_then(|v| v.as_str())
133 .map(|s| s.to_string()),
134 kind: json_data
135 .get("kind")
136 .and_then(|v| v.as_str())
137 .map(|s| s.to_string()),
138 domain: json_data
139 .get("domain")
140 .and_then(|v| v.as_str())
141 .map(|s| s.to_string()),
142 data_product: json_data
143 .get("dataProduct")
144 .and_then(|v| v.as_str())
145 .map(|s| s.to_string()),
146 tenant: json_data
147 .get("tenant")
148 .and_then(|v| v.as_str())
149 .map(|s| s.to_string()),
150 description: json_data.get("description").cloned(),
151 columns: table.columns.iter().map(column_to_column_data).collect(),
152 servers: json_data
153 .get("servers")
154 .and_then(|v| v.as_array())
155 .cloned()
156 .unwrap_or_default(),
157 team: json_data.get("team").cloned(),
158 support: json_data.get("support").cloned(),
159 roles: json_data
160 .get("roles")
161 .and_then(|v| v.as_array())
162 .cloned()
163 .unwrap_or_default(),
164 sla_properties: json_data
165 .get("slaProperties")
166 .and_then(|v| v.as_array())
167 .cloned()
168 .unwrap_or_default(),
169 quality: table.quality.clone(),
170 price: json_data.get("price").cloned(),
171 tags: table.tags.iter().map(|t| t.to_string()).collect(),
172 custom_properties: json_data
173 .get("customProperties")
174 .and_then(|v| v.as_array())
175 .cloned()
176 .unwrap_or_default(),
177 authoritative_definitions: json_data
178 .get("authoritativeDefinitions")
179 .and_then(|v| v.as_array())
180 .cloned()
181 .unwrap_or_default(),
182 contract_created_ts: json_data
183 .get("contractCreatedTs")
184 .and_then(|v| v.as_str())
185 .map(|s| s.to_string()),
186 odcs_metadata: table.odcl_metadata.clone(),
187 }];
188 let sdk_errors: Vec<ImportError> = errors
189 .iter()
190 .map(|e| ImportError::ParseError(e.message.clone()))
191 .collect();
192 Ok(ImportResult {
193 tables: sdk_tables,
194 tables_requiring_name: Vec::new(),
195 errors: sdk_errors,
196 ai_suggestions: None,
197 })
198 }
199 Err(e) => Err(ImportError::ParseError(e.to_string())),
200 }
201 }
202
/// Parses YAML content into a `Table` plus the non-fatal diagnostics collected on the way.
///
/// Public entry point that delegates directly to the private `parse` method.
pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
    self.parse(yaml_content)
}
216
217 fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
227 let _errors: Vec<ParserError> = Vec::new();
229
230 let data: serde_yaml::Value =
232 serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
233
234 if data.is_null() {
235 return Err(anyhow::anyhow!("Empty YAML content"));
236 }
237
238 self.current_yaml_data = Some(data.clone());
240
241 let json_data = yaml_to_json_value(&data)?;
243
244 if self.is_liquibase_format(&json_data) {
246 return self.parse_liquibase(&json_data);
247 }
248
249 if self.is_odcl_v3_format(&json_data) {
250 return self.parse_odcl_v3(&json_data);
251 }
252
253 if self.is_data_contract_format(&json_data) {
254 return self.parse_data_contract(&json_data);
255 }
256
257 self.parse_simple_odcl(&json_data)
259 }
260
261 fn is_liquibase_format(&self, data: &JsonValue) -> bool {
263 if data.get("databaseChangeLog").is_some() {
264 return true;
265 }
266 if let Some(obj) = data.as_object() {
268 let obj_str = format!("{:?}", obj);
269 if obj_str.contains("changeSet") {
270 return true;
271 }
272 }
273 false
274 }
275
276 fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
278 if let Some(obj) = data.as_object() {
279 let has_api_version = obj.contains_key("apiVersion");
280 let has_kind = obj
281 .get("kind")
282 .and_then(|v| v.as_str())
283 .map(|s| s == "DataContract")
284 .unwrap_or(false);
285 let has_id = obj.contains_key("id");
286 let has_version = obj.contains_key("version");
287 return has_api_version && has_kind && has_id && has_version;
288 }
289 false
290 }
291
292 fn is_data_contract_format(&self, data: &JsonValue) -> bool {
294 if let Some(obj) = data.as_object() {
295 let has_spec = obj.contains_key("dataContractSpecification");
296 let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
297 return has_spec && has_models;
298 }
299 false
300 }
301
302 fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
304 let mut errors = Vec::new();
305
306 let name = data
308 .get("name")
309 .and_then(|v| v.as_str())
310 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
311 .to_string();
312
313 let columns_data = data
315 .get("columns")
316 .and_then(|v| v.as_array())
317 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
318
319 let mut columns = Vec::new();
320 for (idx, col_data) in columns_data.iter().enumerate() {
321 match self.parse_column(col_data) {
322 Ok(col) => columns.push(col),
323 Err(e) => {
324 errors.push(ParserError {
325 error_type: "column_parse_error".to_string(),
326 field: format!("columns[{}]", idx),
327 message: e.to_string(),
328 });
329 }
330 }
331 }
332
333 let database_type = self.extract_database_type(data);
335 let medallion_layers = self.extract_medallion_layers(data);
336 let scd_pattern = self.extract_scd_pattern(data);
337 let data_vault_classification = self.extract_data_vault_classification(data);
338 let quality_rules = self.extract_quality_rules(data);
339
340 if scd_pattern.is_some() && data_vault_classification.is_some() {
342 errors.push(ParserError {
343 error_type: "validation_error".to_string(),
344 field: "patterns".to_string(),
345 message: "SCD pattern and Data Vault classification are mutually exclusive"
346 .to_string(),
347 });
348 }
349
350 let mut odcl_metadata = HashMap::new();
352 if let Some(metadata) = data.get("odcl_metadata")
353 && let Some(obj) = metadata.as_object()
354 {
355 for (key, value) in obj {
356 odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
357 }
358 }
359
360 let table_uuid = self.extract_table_uuid(data);
361
362 let table = Table {
363 id: table_uuid,
364 name,
365 columns,
366 database_type,
367 catalog_name: None,
368 schema_name: None,
369 medallion_layers,
370 scd_pattern,
371 data_vault_classification,
372 modeling_level: None,
373 tags: Vec::<Tag>::new(),
374 odcl_metadata,
375 owner: None,
376 sla: None,
377 contact_details: None,
378 infrastructure_type: None,
379 notes: None,
380 position: None,
381 yaml_file_path: None,
382 drawio_cell_id: None,
383 quality: quality_rules,
384 errors: Vec::new(),
385 created_at: chrono::Utc::now(),
386 updated_at: chrono::Utc::now(),
387 };
388
389 info!("Parsed ODCL table: {}", table.name);
390 Ok((table, errors))
391 }
392
393 fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
395 let name = col_data
396 .get("name")
397 .and_then(|v| v.as_str())
398 .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
399 .to_string();
400
401 let data_type = col_data
402 .get("data_type")
403 .and_then(|v| v.as_str())
404 .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
405 .to_string();
406
407 let data_type = normalize_data_type(&data_type);
409
410 let nullable = col_data
411 .get("nullable")
412 .and_then(|v| v.as_bool())
413 .unwrap_or(true);
414
415 let primary_key = col_data
416 .get("primary_key")
417 .and_then(|v| v.as_bool())
418 .unwrap_or(false);
419
420 let foreign_key = col_data
421 .get("foreign_key")
422 .and_then(|v| self.parse_foreign_key(v));
423
424 let constraints = col_data
425 .get("constraints")
426 .and_then(|v| v.as_array())
427 .map(|arr| {
428 arr.iter()
429 .filter_map(|v| v.as_str().map(|s| s.to_string()))
430 .collect()
431 })
432 .unwrap_or_default();
433
434 let description = col_data
435 .get("description")
436 .and_then(|v| v.as_str())
437 .map(|s| s.to_string())
438 .unwrap_or_default();
439
440 let mut column_quality_rules = Vec::new();
442 if let Some(quality_val) = col_data.get("quality") {
443 if let Some(arr) = quality_val.as_array() {
444 for item in arr {
446 if let Some(obj) = item.as_object() {
447 let mut rule = HashMap::new();
448 for (key, value) in obj {
449 rule.insert(key.clone(), json_value_to_serde_value(value));
450 }
451 column_quality_rules.push(rule);
452 }
453 }
454 } else if let Some(obj) = quality_val.as_object() {
455 let mut rule = HashMap::new();
457 for (key, value) in obj {
458 rule.insert(key.clone(), json_value_to_serde_value(value));
459 }
460 column_quality_rules.push(rule);
461 }
462 }
463
464 Ok(Column {
469 name,
470 data_type,
471 nullable,
472 primary_key,
473 foreign_key,
474 constraints,
475 description,
476 quality: column_quality_rules,
477 ..Default::default()
478 })
479 }
480
481 fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
483 let obj = fk_data.as_object()?;
484 Some(ForeignKey {
485 table_id: obj
486 .get("table_id")
487 .or_else(|| obj.get("table"))
488 .and_then(|v| v.as_str())
489 .unwrap_or("")
490 .to_string(),
491 column_name: obj
492 .get("column_name")
493 .or_else(|| obj.get("column"))
494 .and_then(|v| v.as_str())
495 .unwrap_or("")
496 .to_string(),
497 })
498 }
499
500 fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
502 data.get("database_type")
503 .and_then(|v| v.as_str())
504 .and_then(|s| match s.to_uppercase().as_str() {
505 "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
506 "MYSQL" => Some(DatabaseType::Mysql),
507 "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
508 "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
509 "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
510 _ => None,
511 })
512 }
513
514 fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
516 let mut layers = Vec::new();
517
518 if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
520 for item in arr {
521 if let Some(s) = item.as_str()
522 && let Ok(layer) = parse_medallion_layer(s)
523 {
524 layers.push(layer);
525 }
526 }
527 }
528 else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
530 && let Ok(layer) = parse_medallion_layer(s)
531 {
532 layers.push(layer);
533 }
534
535 layers
536 }
537
538 fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
540 data.get("scd_pattern")
541 .and_then(|v| v.as_str())
542 .and_then(|s| parse_scd_pattern(s).ok())
543 }
544
545 fn extract_data_vault_classification(
547 &self,
548 data: &JsonValue,
549 ) -> Option<DataVaultClassification> {
550 data.get("data_vault_classification")
551 .and_then(|v| v.as_str())
552 .and_then(|s| parse_data_vault_classification(s).ok())
553 }
554
555 fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
557 use serde_json::Value;
558 let mut quality_rules = Vec::new();
559
560 if let Some(quality_val) = data.get("quality") {
562 if let Some(arr) = quality_val.as_array() {
563 for item in arr {
565 if let Some(obj) = item.as_object() {
566 let mut rule = HashMap::new();
567 for (key, value) in obj {
568 rule.insert(key.clone(), json_value_to_serde_value(value));
569 }
570 quality_rules.push(rule);
571 }
572 }
573 } else if let Some(obj) = quality_val.as_object() {
574 let mut rule = HashMap::new();
576 for (key, value) in obj {
577 rule.insert(key.clone(), json_value_to_serde_value(value));
578 }
579 quality_rules.push(rule);
580 } else if let Some(s) = quality_val.as_str() {
581 let mut rule = HashMap::new();
583 rule.insert("value".to_string(), Value::String(s.to_string()));
584 quality_rules.push(rule);
585 }
586 }
587
588 if let Some(metadata) = data.get("metadata")
590 && let Some(metadata_obj) = metadata.as_object()
591 && let Some(quality_val) = metadata_obj.get("quality")
592 {
593 if let Some(arr) = quality_val.as_array() {
594 for item in arr {
596 if let Some(obj) = item.as_object() {
597 let mut rule = HashMap::new();
598 for (key, value) in obj {
599 rule.insert(key.clone(), json_value_to_serde_value(value));
600 }
601 quality_rules.push(rule);
602 }
603 }
604 } else if let Some(obj) = quality_val.as_object() {
605 let mut rule = HashMap::new();
607 for (key, value) in obj {
608 rule.insert(key.clone(), json_value_to_serde_value(value));
609 }
610 quality_rules.push(rule);
611 } else if let Some(s) = quality_val.as_str() {
612 let mut rule = HashMap::new();
614 rule.insert("value".to_string(), Value::String(s.to_string()));
615 quality_rules.push(rule);
616 }
617 }
618
619 if let Some(tblprops) = data.get("tblproperties")
621 && let Some(obj) = tblprops.as_object()
622 {
623 for (key, value) in obj {
624 let mut rule = HashMap::new();
625 rule.insert("property".to_string(), Value::String(key.clone()));
626 rule.insert("value".to_string(), json_value_to_serde_value(value));
627 quality_rules.push(rule);
628 }
629 }
630
631 quality_rules
632 }
633
634 fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
642 let mut errors = Vec::new();
656
657 let changelog = data
658 .get("databaseChangeLog")
659 .and_then(|v| v.as_array())
660 .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
661
662 let mut table_name: Option<String> = None;
664 let mut columns: Vec<crate::models::column::Column> = Vec::new();
665
666 for entry in changelog {
667 if let Some(change_set) = entry.get("changeSet") {
669 let changes = if let Some(obj) = change_set.as_object() {
671 obj.get("changes")
672 .and_then(|v| v.as_array())
673 .cloned()
674 .unwrap_or_default()
675 } else if let Some(arr) = change_set.as_array() {
676 arr.clone()
678 } else {
679 Vec::new()
680 };
681
682 for ch in changes {
683 let create = ch.get("createTable").or_else(|| ch.get("create_table"));
684 if let Some(create) = create {
685 table_name = create
686 .get("tableName")
687 .or_else(|| create.get("table_name"))
688 .and_then(|v| v.as_str())
689 .map(|s| s.to_string());
690
691 if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
693 for col_entry in cols {
694 let col = col_entry.get("column").unwrap_or(col_entry);
695 let name = col
696 .get("name")
697 .and_then(|v| v.as_str())
698 .unwrap_or("")
699 .to_string();
700 let data_type = col
701 .get("type")
702 .and_then(|v| v.as_str())
703 .unwrap_or("")
704 .to_string();
705
706 if name.is_empty() {
707 errors.push(ParserError {
708 error_type: "validation_error".to_string(),
709 field: "columns.name".to_string(),
710 message: "Liquibase createTable column missing name"
711 .to_string(),
712 });
713 continue;
714 }
715
716 let mut column =
717 crate::models::column::Column::new(name, data_type);
718
719 if let Some(constraints) =
720 col.get("constraints").and_then(|v| v.as_object())
721 {
722 if let Some(pk) =
723 constraints.get("primaryKey").and_then(|v| v.as_bool())
724 {
725 column.primary_key = pk;
726 }
727 if let Some(nullable) =
728 constraints.get("nullable").and_then(|v| v.as_bool())
729 {
730 column.nullable = nullable;
731 }
732 }
733
734 columns.push(column);
735 }
736 }
737
738 break;
741 }
742 }
743 }
744 if table_name.is_some() {
745 break;
746 }
747 }
748
749 let table_name = table_name
750 .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
751 let table = Table::new(table_name, columns);
752 Ok((table, errors))
754 }
755
756 fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
758 let mut errors = Vec::new();
759
760 let table_name = data
762 .get("name")
763 .and_then(|v| v.as_str())
764 .map(|s| s.to_string())
765 .or_else(|| {
766 data.get("schema")
768 .and_then(|v| v.as_array())
769 .and_then(|arr| arr.first())
770 .and_then(|obj| obj.as_object())
771 .and_then(|obj| obj.get("name"))
772 .and_then(|v| v.as_str())
773 .map(|s| s.to_string())
774 })
775 .ok_or_else(|| {
776 anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
777 })?;
778
779 let schema = data
781 .get("schema")
782 .and_then(|v| v.as_array())
783 .ok_or_else(|| {
784 errors.push(ParserError {
785 error_type: "validation_error".to_string(),
786 field: "schema".to_string(),
787 message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
788 });
789 anyhow::anyhow!("Missing schema")
790 });
791
792 let schema = match schema {
793 Ok(s) if s.is_empty() => {
794 errors.push(ParserError {
795 error_type: "validation_error".to_string(),
796 field: "schema".to_string(),
797 message: "ODCS v3.0.x schema array is empty".to_string(),
798 });
799 let quality_rules = self.extract_quality_rules(data);
800 let table_uuid = self.extract_table_uuid(data);
801 let table = Table {
802 id: table_uuid,
803 name: table_name,
804 columns: Vec::new(),
805 database_type: None,
806 catalog_name: None,
807 schema_name: None,
808 medallion_layers: Vec::new(),
809 scd_pattern: None,
810 data_vault_classification: None,
811 modeling_level: None,
812 tags: Vec::<Tag>::new(),
813 odcl_metadata: HashMap::new(),
814 owner: None,
815 sla: None,
816 contact_details: None,
817 infrastructure_type: None,
818 notes: None,
819 position: None,
820 yaml_file_path: None,
821 drawio_cell_id: None,
822 quality: quality_rules,
823 errors: Vec::new(),
824 created_at: chrono::Utc::now(),
825 updated_at: chrono::Utc::now(),
826 };
827 return Ok((table, errors));
828 }
829 Ok(s) => s,
830 Err(_) => {
831 let quality_rules = self.extract_quality_rules(data);
832 let table_uuid = self.extract_table_uuid(data);
833 let table = Table {
834 id: table_uuid,
835 name: table_name,
836 columns: Vec::new(),
837 database_type: None,
838 catalog_name: None,
839 schema_name: None,
840 medallion_layers: Vec::new(),
841 scd_pattern: None,
842 data_vault_classification: None,
843 modeling_level: None,
844 tags: Vec::<Tag>::new(),
845 odcl_metadata: HashMap::new(),
846 owner: None,
847 sla: None,
848 contact_details: None,
849 infrastructure_type: None,
850 notes: None,
851 position: None,
852 yaml_file_path: None,
853 drawio_cell_id: None,
854 quality: quality_rules,
855 errors: Vec::new(),
856 created_at: chrono::Utc::now(),
857 updated_at: chrono::Utc::now(),
858 };
859 return Ok((table, errors));
860 }
861 };
862
863 let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
865 errors.push(ParserError {
866 error_type: "validation_error".to_string(),
867 field: "schema[0]".to_string(),
868 message: "First schema object must be a dictionary".to_string(),
869 });
870 anyhow::anyhow!("Invalid schema object")
871 })?;
872
873 let object_name = schema_object
874 .get("name")
875 .and_then(|v| v.as_str())
876 .unwrap_or(&table_name);
877
878 let mut columns = Vec::new();
880
881 if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
882 for (prop_name, prop_data) in properties_obj {
884 if let Some(prop_obj) = prop_data.as_object() {
885 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
886 Ok(mut cols) => columns.append(&mut cols),
887 Err(e) => {
888 errors.push(ParserError {
889 error_type: "property_parse_error".to_string(),
890 field: format!("Property '{}'", prop_name),
891 message: e.to_string(),
892 });
893 }
894 }
895 } else {
896 errors.push(ParserError {
897 error_type: "validation_error".to_string(),
898 field: format!("Property '{}'", prop_name),
899 message: format!("Property '{}' must be an object", prop_name),
900 });
901 }
902 }
903 } else if let Some(properties_arr) =
904 schema_object.get("properties").and_then(|v| v.as_array())
905 {
906 for (idx, prop_data) in properties_arr.iter().enumerate() {
908 if let Some(prop_obj) = prop_data.as_object() {
909 let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
912 Some(JsonValue::String(s)) => s.as_str(),
913 _ => {
914 errors.push(ParserError {
916 error_type: "validation_error".to_string(),
917 field: format!("Property[{}]", idx),
918 message: format!(
919 "Property[{}] missing required 'name' or 'id' field",
920 idx
921 ),
922 });
923 continue;
924 }
925 };
926
927 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
928 Ok(mut cols) => columns.append(&mut cols),
929 Err(e) => {
930 errors.push(ParserError {
931 error_type: "property_parse_error".to_string(),
932 field: format!("Property[{}] '{}'", idx, prop_name),
933 message: e.to_string(),
934 });
935 }
936 }
937 } else {
938 errors.push(ParserError {
939 error_type: "validation_error".to_string(),
940 field: format!("Property[{}]", idx),
941 message: format!("Property[{}] must be an object", idx),
942 });
943 }
944 }
945 } else {
946 errors.push(ParserError {
947 error_type: "validation_error".to_string(),
948 field: format!("Object '{}'", object_name),
949 message: format!(
950 "Object '{}' missing 'properties' field or properties is invalid",
951 object_name
952 ),
953 });
954 }
955
956 let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
958 _,
959 _,
960 _,
961 Vec<Tag>,
962 ) = self.extract_metadata_from_custom_properties(data);
963
964 let mut shared_domains: Vec<String> = Vec::new();
966 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
967 for prop in custom_props {
968 if let Some(prop_obj) = prop.as_object() {
969 let prop_key = prop_obj
970 .get("property")
971 .and_then(|v| v.as_str())
972 .unwrap_or("");
973 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
974 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
975 {
976 for item in arr {
977 if let Some(s) = item.as_str() {
978 shared_domains.push(s.to_string());
979 }
980 }
981 }
982 }
983 }
984 }
985
986 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
988 for item in tags_arr {
989 if let Some(s) = item.as_str() {
990 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
992 if !tags.contains(&tag) {
993 tags.push(tag);
994 }
995 }
996 }
997 }
998
999 let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1001
1002 let quality_rules = self.extract_quality_rules(data);
1004
1005 let mut odcl_metadata = HashMap::new();
1007 odcl_metadata.insert(
1008 "apiVersion".to_string(),
1009 json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
1010 );
1011 odcl_metadata.insert(
1012 "kind".to_string(),
1013 json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
1014 );
1015 odcl_metadata.insert(
1016 "id".to_string(),
1017 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1018 );
1019 odcl_metadata.insert(
1020 "version".to_string(),
1021 json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
1022 );
1023 odcl_metadata.insert(
1024 "status".to_string(),
1025 json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
1026 );
1027
1028 if let Some(servicelevels_val) = data.get("servicelevels") {
1030 odcl_metadata.insert(
1031 "servicelevels".to_string(),
1032 json_value_to_serde_value(servicelevels_val),
1033 );
1034 }
1035
1036 if let Some(links_val) = data.get("links") {
1038 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1039 }
1040
1041 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1043 odcl_metadata.insert(
1044 "domain".to_string(),
1045 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1046 );
1047 }
1048 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1049 odcl_metadata.insert(
1050 "dataProduct".to_string(),
1051 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1052 );
1053 }
1054 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1055 odcl_metadata.insert(
1056 "tenant".to_string(),
1057 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1058 );
1059 }
1060
1061 if let Some(desc_val) = data.get("description") {
1063 odcl_metadata.insert(
1064 "description".to_string(),
1065 json_value_to_serde_value(desc_val),
1066 );
1067 }
1068
1069 if let Some(pricing_val) = data.get("pricing") {
1071 odcl_metadata.insert(
1072 "pricing".to_string(),
1073 json_value_to_serde_value(pricing_val),
1074 );
1075 }
1076
1077 if let Some(team_val) = data.get("team") {
1079 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1080 }
1081
1082 if let Some(roles_val) = data.get("roles") {
1084 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1085 }
1086
1087 if let Some(terms_val) = data.get("terms") {
1089 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1090 }
1091
1092 if let Some(servers_val) = data.get("servers") {
1094 odcl_metadata.insert(
1095 "servers".to_string(),
1096 json_value_to_serde_value(servers_val),
1097 );
1098 }
1099
1100 if let Some(infrastructure_val) = data.get("infrastructure") {
1102 odcl_metadata.insert(
1103 "infrastructure".to_string(),
1104 json_value_to_serde_value(infrastructure_val),
1105 );
1106 }
1107
1108 if !shared_domains.is_empty() {
1110 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1111 .iter()
1112 .map(|d| serde_json::Value::String(d.clone()))
1113 .collect();
1114 odcl_metadata.insert(
1115 "sharedDomains".to_string(),
1116 serde_json::Value::Array(shared_domains_json),
1117 );
1118 }
1119
1120 let table_uuid = self.extract_table_uuid(data);
1121
1122 let table = Table {
1123 id: table_uuid,
1124 name: table_name,
1125 columns,
1126 database_type,
1127 catalog_name: None,
1128 schema_name: None,
1129 medallion_layers,
1130 scd_pattern,
1131 data_vault_classification,
1132 modeling_level: None,
1133 tags,
1134 odcl_metadata,
1135 owner: None,
1136 sla: None,
1137 contact_details: None,
1138 infrastructure_type: None,
1139 notes: None,
1140 position: None,
1141 yaml_file_path: None,
1142 drawio_cell_id: None,
1143 quality: quality_rules,
1144 errors: Vec::new(),
1145 created_at: chrono::Utc::now(),
1146 updated_at: chrono::Utc::now(),
1147 };
1148
1149 info!(
1150 "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1151 table.name,
1152 errors.len()
1153 );
1154 Ok((table, errors))
1155 }
1156
1157 fn parse_odcl_v3_property(
1159 &self,
1160 prop_name: &str,
1161 prop_data: &serde_json::Map<String, JsonValue>,
1162 data: &JsonValue,
1163 ) -> Result<Vec<Column>> {
1164 let mut errors = Vec::new();
1166 self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1167 }
1168
1169 fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1172 if let Some(id_val) = data.get("id")
1174 && let Some(id_str) = id_val.as_str()
1175 {
1176 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1177 tracing::debug!(
1178 "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1179 uuid
1180 );
1181 return uuid;
1182 } else {
1183 tracing::warn!(
1184 "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1185 id_str
1186 );
1187 }
1188 }
1189
1190 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1192 for prop in custom_props {
1193 if let Some(prop_obj) = prop.as_object() {
1194 let prop_key = prop_obj
1195 .get("property")
1196 .and_then(|v| v.as_str())
1197 .unwrap_or("");
1198 if prop_key == "tableUuid"
1199 && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1200 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1201 {
1202 tracing::debug!(
1203 "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1204 uuid
1205 );
1206 return uuid;
1207 }
1208 }
1209 }
1210 }
1211
1212 if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1214 && let Some(uuid_val) = metadata.get("tableUuid")
1215 && let Some(uuid_str) = uuid_val.as_str()
1216 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1217 {
1218 tracing::debug!(
1219 "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1220 uuid
1221 );
1222 return uuid;
1223 }
1224
1225 let table_name = data
1227 .get("name")
1228 .and_then(|v| v.as_str())
1229 .unwrap_or("unknown");
1230 let new_uuid = crate::models::table::Table::generate_id(
1231 table_name, None, None, None, );
1235 tracing::warn!(
1236 "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1237 table_name,
1238 new_uuid
1239 );
1240 new_uuid
1241 }
1242
1243 fn extract_metadata_from_custom_properties(
1245 &self,
1246 data: &JsonValue,
1247 ) -> (
1248 Vec<MedallionLayer>,
1249 Option<SCDPattern>,
1250 Option<DataVaultClassification>,
1251 Vec<Tag>,
1252 ) {
1253 let mut medallion_layers = Vec::new();
1254 let mut scd_pattern = None;
1255 let mut data_vault_classification = None;
1256 let mut tags: Vec<Tag> = Vec::new();
1257
1258 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1259 for prop in custom_props {
1260 if let Some(prop_obj) = prop.as_object() {
1261 let prop_key = prop_obj
1262 .get("property")
1263 .and_then(|v| v.as_str())
1264 .unwrap_or("");
1265 let prop_value = prop_obj.get("value");
1266
1267 match prop_key {
1268 "medallionLayers" | "medallion_layers" => {
1269 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1270 for item in arr {
1271 if let Some(s) = item.as_str()
1272 && let Ok(layer) = parse_medallion_layer(s)
1273 {
1274 medallion_layers.push(layer);
1275 }
1276 }
1277 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1278 for part in s.split(',') {
1280 if let Ok(layer) = parse_medallion_layer(part.trim()) {
1281 medallion_layers.push(layer);
1282 }
1283 }
1284 }
1285 }
1286 "scdPattern" | "scd_pattern" => {
1287 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1288 scd_pattern = parse_scd_pattern(s).ok();
1289 }
1290 }
1291 "dataVaultClassification" | "data_vault_classification" => {
1292 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1293 data_vault_classification = parse_data_vault_classification(s).ok();
1294 }
1295 }
1296 "tags" => {
1297 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1298 for item in arr {
1299 if let Some(s) = item.as_str() {
1300 if let Ok(tag) = Tag::from_str(s) {
1302 tags.push(tag);
1303 } else {
1304 tags.push(Tag::Simple(s.to_string()));
1305 }
1306 }
1307 }
1308 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1309 for part in s.split(',') {
1311 let part = part.trim();
1312 if let Ok(tag) = Tag::from_str(part) {
1313 tags.push(tag);
1314 } else {
1315 tags.push(Tag::Simple(part.to_string()));
1316 }
1317 }
1318 }
1319 }
1320 "sharedDomains" | "shared_domains" => {
1321 }
1324 _ => {}
1325 }
1326 }
1327 }
1328 }
1329
1330 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1332 for item in tags_arr {
1333 if let Some(s) = item.as_str() {
1334 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1336 if !tags.contains(&tag) {
1337 tags.push(tag);
1338 }
1339 }
1340 }
1341 }
1342
1343 (
1344 medallion_layers,
1345 scd_pattern,
1346 data_vault_classification,
1347 tags,
1348 )
1349 }
1350
1351 fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1353 if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1355 && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1356 {
1357 return server_obj
1358 .get("type")
1359 .and_then(|v| v.as_str())
1360 .and_then(|s| self.parse_database_type(s));
1361 }
1362 None
1363 }
1364
1365 fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1367 let mut errors = Vec::new();
1368
1369 let models = data
1371 .get("models")
1372 .and_then(|v| v.as_object())
1373 .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1374
1375 let (model_name, model_data) = models
1378 .iter()
1379 .next()
1380 .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1381
1382 let model_data = model_data
1383 .as_object()
1384 .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1385
1386 let fields = model_data
1388 .get("fields")
1389 .and_then(|v| v.as_object())
1390 .ok_or_else(|| {
1391 errors.push(ParserError {
1392 error_type: "validation_error".to_string(),
1393 field: format!("Model '{}'", model_name),
1394 message: format!("Model '{}' missing 'fields' field", model_name),
1395 });
1396 anyhow::anyhow!("Missing fields")
1397 });
1398
1399 let fields = match fields {
1400 Ok(f) => f,
1401 Err(_) => {
1402 let quality_rules = self.extract_quality_rules(data);
1404 let table_uuid = self.extract_table_uuid(data);
1405 let table = Table {
1406 id: table_uuid,
1407 name: model_name.clone(),
1408 columns: Vec::new(),
1409 database_type: None,
1410 catalog_name: None,
1411 schema_name: None,
1412 medallion_layers: Vec::new(),
1413 scd_pattern: None,
1414 data_vault_classification: None,
1415 modeling_level: None,
1416 tags: Vec::<Tag>::new(),
1417 odcl_metadata: HashMap::new(),
1418 owner: None,
1419 sla: None,
1420 contact_details: None,
1421 infrastructure_type: None,
1422 notes: None,
1423 position: None,
1424 yaml_file_path: None,
1425 drawio_cell_id: None,
1426 quality: quality_rules,
1427 errors: Vec::new(),
1428 created_at: chrono::Utc::now(),
1429 updated_at: chrono::Utc::now(),
1430 };
1431 return Ok((table, errors));
1432 }
1433 };
1434
1435 let mut columns = Vec::new();
1437 for (field_name, field_data) in fields {
1438 if let Some(field_obj) = field_data.as_object() {
1439 match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1440 Ok(mut cols) => columns.append(&mut cols),
1441 Err(e) => {
1442 errors.push(ParserError {
1443 error_type: "field_parse_error".to_string(),
1444 field: format!("Field '{}'", field_name),
1445 message: e.to_string(),
1446 });
1447 }
1448 }
1449 } else {
1450 errors.push(ParserError {
1451 error_type: "validation_error".to_string(),
1452 field: format!("Field '{}'", field_name),
1453 message: format!("Field '{}' must be an object", field_name),
1454 });
1455 }
1456 }
1457
1458 let mut odcl_metadata = HashMap::new();
1461
1462 if let Some(info_val) = data.get("info") {
1465 let info_json_value = json_value_to_serde_value(info_val);
1467 odcl_metadata.insert("info".to_string(), info_json_value);
1468 }
1469
1470 odcl_metadata.insert(
1471 "dataContractSpecification".to_string(),
1472 json_value_to_serde_value(
1473 data.get("dataContractSpecification")
1474 .unwrap_or(&JsonValue::Null),
1475 ),
1476 );
1477 odcl_metadata.insert(
1478 "id".to_string(),
1479 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1480 );
1481 if let Some(servicelevels_val) = data.get("servicelevels") {
1485 odcl_metadata.insert(
1486 "servicelevels".to_string(),
1487 json_value_to_serde_value(servicelevels_val),
1488 );
1489 }
1490
1491 if let Some(links_val) = data.get("links") {
1493 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1494 }
1495
1496 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1498 odcl_metadata.insert(
1499 "domain".to_string(),
1500 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1501 );
1502 }
1503 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1504 odcl_metadata.insert(
1505 "dataProduct".to_string(),
1506 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1507 );
1508 }
1509 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1510 odcl_metadata.insert(
1511 "tenant".to_string(),
1512 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1513 );
1514 }
1515
1516 if let Some(desc_val) = data.get("description") {
1518 odcl_metadata.insert(
1519 "description".to_string(),
1520 json_value_to_serde_value(desc_val),
1521 );
1522 }
1523
1524 if let Some(pricing_val) = data.get("pricing") {
1526 odcl_metadata.insert(
1527 "pricing".to_string(),
1528 json_value_to_serde_value(pricing_val),
1529 );
1530 }
1531
1532 if let Some(team_val) = data.get("team") {
1534 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1535 }
1536
1537 if let Some(roles_val) = data.get("roles") {
1539 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1540 }
1541
1542 if let Some(terms_val) = data.get("terms") {
1544 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1545 }
1546
1547 if let Some(servers_val) = data.get("servers") {
1549 odcl_metadata.insert(
1550 "servers".to_string(),
1551 json_value_to_serde_value(servers_val),
1552 );
1553 }
1554
1555 if let Some(infrastructure_val) = data.get("infrastructure") {
1557 odcl_metadata.insert(
1558 "infrastructure".to_string(),
1559 json_value_to_serde_value(infrastructure_val),
1560 );
1561 }
1562
1563 let database_type = self.extract_database_type_from_servers(data);
1565
1566 let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1568
1569 let mut shared_domains: Vec<String> = Vec::new();
1571 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1572 for prop in custom_props {
1573 if let Some(prop_obj) = prop.as_object() {
1574 let prop_key = prop_obj
1575 .get("property")
1576 .and_then(|v| v.as_str())
1577 .unwrap_or("");
1578 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1579 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1580 {
1581 for item in arr {
1582 if let Some(s) = item.as_str() {
1583 shared_domains.push(s.to_string());
1584 }
1585 }
1586 }
1587 }
1588 }
1589 }
1590
1591 let mut tags: Vec<Tag> = Vec::new();
1593 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1594 for item in tags_arr {
1595 if let Some(s) = item.as_str() {
1596 if let Ok(tag) = Tag::from_str(s) {
1598 tags.push(tag);
1599 } else {
1600 tags.push(crate::models::Tag::Simple(s.to_string()));
1602 }
1603 }
1604 }
1605 }
1606
1607 let quality_rules = self.extract_quality_rules(data);
1609
1610 if !shared_domains.is_empty() {
1612 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1613 .iter()
1614 .map(|d| serde_json::Value::String(d.clone()))
1615 .collect();
1616 odcl_metadata.insert(
1617 "sharedDomains".to_string(),
1618 serde_json::Value::Array(shared_domains_json),
1619 );
1620 }
1621
1622 let table_uuid = self.extract_table_uuid(data);
1623
1624 let table = Table {
1625 id: table_uuid,
1626 name: model_name.clone(),
1627 columns,
1628 database_type,
1629 catalog_name,
1630 schema_name,
1631 medallion_layers: Vec::new(),
1632 scd_pattern: None,
1633 data_vault_classification: None,
1634 modeling_level: None,
1635 tags,
1636 odcl_metadata,
1637 owner: None,
1638 sla: None,
1639 contact_details: None,
1640 infrastructure_type: None,
1641 notes: None,
1642 position: None,
1643 yaml_file_path: None,
1644 drawio_cell_id: None,
1645 quality: quality_rules,
1646 errors: Vec::new(),
1647 created_at: chrono::Utc::now(),
1648 updated_at: chrono::Utc::now(),
1649 };
1650
1651 info!(
1652 "Parsed Data Contract table: {} with {} warnings/errors",
1653 model_name,
1654 errors.len()
1655 );
1656 Ok((table, errors))
1657 }
1658
1659 fn parse_data_contract_field(
1661 &self,
1662 field_name: &str,
1663 field_data: &serde_json::Map<String, JsonValue>,
1664 data: &JsonValue,
1665 errors: &mut Vec<ParserError>,
1666 ) -> Result<Vec<Column>> {
1667 let mut columns = Vec::new();
1668
1669 let extract_quality_from_obj =
1671 |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1672 let mut quality_rules = Vec::new();
1673 if let Some(quality_val) = obj.get("quality") {
1674 if let Some(arr) = quality_val.as_array() {
1675 for item in arr {
1677 if let Some(rule_obj) = item.as_object() {
1678 let mut rule = HashMap::new();
1679 for (key, value) in rule_obj {
1680 rule.insert(key.clone(), json_value_to_serde_value(value));
1681 }
1682 quality_rules.push(rule);
1683 }
1684 }
1685 } else if let Some(rule_obj) = quality_val.as_object() {
1686 let mut rule = HashMap::new();
1688 for (key, value) in rule_obj {
1689 rule.insert(key.clone(), json_value_to_serde_value(value));
1690 }
1691 quality_rules.push(rule);
1692 }
1693 }
1694 quality_rules
1695 };
1696
1697 let description = field_data
1699 .get("description")
1700 .and_then(|v| v.as_str())
1701 .unwrap_or("")
1702 .to_string();
1703
1704 let mut quality_rules = extract_quality_from_obj(field_data);
1706
1707 if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1709 let ref_path = Some(ref_str.to_string());
1711
1712 if let Some(definition) = resolve_ref(ref_str, data) {
1713 if quality_rules.is_empty() {
1717 if let Some(def_obj) = definition.as_object() {
1718 quality_rules = extract_quality_from_obj(def_obj);
1719 }
1720 } else {
1721 if let Some(def_obj) = definition.as_object() {
1723 let def_quality = extract_quality_from_obj(def_obj);
1724 quality_rules.extend(def_quality);
1726 }
1727 }
1728
1729 let required = field_data
1730 .get("required")
1731 .and_then(|v| v.as_bool())
1732 .unwrap_or(false);
1733
1734 let has_nested = definition
1740 .get("type")
1741 .and_then(|v| v.as_str())
1742 .map(|s| s == "object")
1743 .unwrap_or(false)
1744 || definition.get("properties").is_some()
1745 || definition.get("fields").is_some();
1746
1747 if has_nested {
1748 if let Some(properties) =
1750 definition.get("properties").and_then(|v| v.as_object())
1751 {
1752 let nested_required: Vec<String> = definition
1754 .get("required")
1755 .and_then(|v| v.as_array())
1756 .map(|arr| {
1757 arr.iter()
1758 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1759 .collect()
1760 })
1761 .unwrap_or_default();
1762
1763 for (nested_name, nested_schema) in properties {
1764 let nested_required_field = nested_required.contains(nested_name);
1765 expand_nested_column(
1766 &format!("{}.{}", field_name, nested_name),
1767 nested_schema,
1768 !nested_required_field,
1769 &mut columns,
1770 errors,
1771 );
1772 }
1773 } else if let Some(fields) =
1774 definition.get("fields").and_then(|v| v.as_object())
1775 {
1776 for (nested_name, nested_schema) in fields {
1778 expand_nested_column(
1779 &format!("{}.{}", field_name, nested_name),
1780 nested_schema,
1781 true, &mut columns,
1783 errors,
1784 );
1785 }
1786 } else {
1787 let def_physical_type = definition
1789 .get("physicalType")
1790 .and_then(|v| v.as_str())
1791 .map(|s| s.to_string());
1792 columns.push(Column {
1793 name: field_name.to_string(),
1794 data_type: "OBJECT".to_string(),
1795 physical_type: def_physical_type,
1796 nullable: !required,
1797 description: if description.is_empty() {
1798 definition
1799 .get("description")
1800 .and_then(|v| v.as_str())
1801 .unwrap_or("")
1802 .to_string()
1803 } else {
1804 description.clone()
1805 },
1806 quality: quality_rules.clone(),
1807 relationships: ref_to_relationships(&ref_path),
1808 ..Default::default()
1809 });
1810 }
1811 } else {
1812 let def_type = definition
1814 .get("type")
1815 .and_then(|v| v.as_str())
1816 .unwrap_or("STRING")
1817 .to_uppercase();
1818
1819 let enum_values = definition
1820 .get("enum")
1821 .and_then(|v| v.as_array())
1822 .map(|arr| {
1823 arr.iter()
1824 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1825 .collect()
1826 })
1827 .unwrap_or_default();
1828
1829 let def_physical_type = definition
1830 .get("physicalType")
1831 .and_then(|v| v.as_str())
1832 .map(|s| s.to_string());
1833
1834 columns.push(Column {
1835 name: field_name.to_string(),
1836 data_type: def_type,
1837 physical_type: def_physical_type,
1838 nullable: !required,
1839 description: if description.is_empty() {
1840 definition
1841 .get("description")
1842 .and_then(|v| v.as_str())
1843 .unwrap_or("")
1844 .to_string()
1845 } else {
1846 description
1847 },
1848 quality: quality_rules,
1849 relationships: ref_to_relationships(&ref_path),
1850 enum_values,
1851 ..Default::default()
1852 });
1853 }
1854 return Ok(columns);
1855 } else {
1856 let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
1858 let mut error_map = HashMap::new();
1859 error_map.insert("type".to_string(), serde_json::json!("validation_error"));
1860 error_map.insert("field".to_string(), serde_json::json!("data_type"));
1861 error_map.insert(
1862 "message".to_string(),
1863 serde_json::json!(format!(
1864 "Field '{}' references undefined definition: {}",
1865 field_name, ref_str
1866 )),
1867 );
1868 col_errors.push(error_map);
1869 let field_physical_type = field_data
1870 .get("physicalType")
1871 .and_then(|v| v.as_str())
1872 .map(|s| s.to_string());
1873 columns.push(Column {
1874 name: field_name.to_string(),
1875 data_type: "OBJECT".to_string(),
1876 physical_type: field_physical_type,
1877 description,
1878 errors: col_errors,
1879 relationships: ref_to_relationships(&Some(ref_str.to_string())),
1880 ..Default::default()
1881 });
1882 return Ok(columns);
1883 }
1884 }
1885
1886 let field_type_str = field_data
1889 .get("logicalType")
1890 .and_then(|v| v.as_str())
1891 .or_else(|| field_data.get("type").and_then(|v| v.as_str()))
1892 .unwrap_or("STRING");
1893
1894 if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
1896 match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
1897 Ok(nested_cols) if !nested_cols.is_empty() => {
1898 let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
1900 "ARRAY<STRUCT<...>>".to_string()
1901 } else {
1902 "STRUCT<...>".to_string()
1903 };
1904
1905 let struct_physical_type = field_data
1906 .get("physicalType")
1907 .and_then(|v| v.as_str())
1908 .map(|s| s.to_string());
1909
1910 columns.push(Column {
1912 name: field_name.to_string(),
1913 data_type: parent_data_type,
1914 physical_type: struct_physical_type,
1915 nullable: !field_data
1916 .get("required")
1917 .and_then(|v| v.as_bool())
1918 .unwrap_or(false),
1919 description: description.clone(),
1920 quality: quality_rules.clone(),
1921 relationships: ref_to_relationships(
1922 &field_data
1923 .get("$ref")
1924 .and_then(|v| v.as_str())
1925 .map(|s| s.to_string()),
1926 ),
1927 ..Default::default()
1928 });
1929
1930 columns.extend(nested_cols);
1932 return Ok(columns);
1933 }
1934 Ok(_) | Err(_) => {
1935 }
1937 }
1938 }
1939
1940 let field_type = normalize_data_type(field_type_str);
1941
1942 if field_type == "ARRAY" {
1944 let items = field_data.get("items");
1945 if let Some(items_val) = items {
1946 if let Some(items_obj) = items_val.as_object() {
1947 let items_type = items_obj
1950 .get("logicalType")
1951 .and_then(|v| v.as_str())
1952 .or_else(|| items_obj.get("type").and_then(|v| v.as_str()));
1953
1954 let normalized_items_type = match items_type {
1956 Some("object") | Some("struct") => Some("object"),
1957 Some("array") => Some("array"),
1958 Some("string") | Some("varchar") | Some("char") | Some("text") => {
1959 Some("string")
1960 }
1961 Some("integer") | Some("int") | Some("bigint") | Some("smallint")
1962 | Some("tinyint") => Some("integer"),
1963 Some("number") | Some("decimal") | Some("double") | Some("float")
1964 | Some("numeric") => Some("number"),
1965 Some("boolean") | Some("bool") => Some("boolean"),
1966 Some("date") => Some("date"),
1967 Some("timestamp") | Some("datetime") => Some("timestamp"),
1968 Some("time") => Some("time"),
1969 other => other,
1970 };
1971
1972 if items_obj.get("fields").is_some()
1973 || items_obj.get("properties").is_some()
1974 || normalized_items_type == Some("object")
1975 {
1976 let array_physical_type = field_data
1978 .get("physicalType")
1979 .and_then(|v| v.as_str())
1980 .map(|s| s.to_string());
1981 columns.push(Column {
1982 name: field_name.to_string(),
1983 data_type: "ARRAY<OBJECT>".to_string(),
1984 physical_type: array_physical_type,
1985 nullable: !field_data
1986 .get("required")
1987 .and_then(|v| v.as_bool())
1988 .unwrap_or(false),
1989 description: field_data
1990 .get("description")
1991 .and_then(|v| v.as_str())
1992 .unwrap_or("")
1993 .to_string(),
1994 ..Default::default()
1995 });
1996
1997 let properties_obj =
2000 items_obj.get("properties").and_then(|v| v.as_object());
2001 let properties_arr = items_obj.get("properties").and_then(|v| v.as_array());
2002 let fields_obj = items_obj.get("fields").and_then(|v| v.as_object());
2003
2004 if let Some(fields_map) = properties_obj.or(fields_obj) {
2005 for (nested_field_name, nested_field_data) in fields_map {
2007 if let Some(nested_field_obj) = nested_field_data.as_object() {
2008 let nested_field_type = nested_field_obj
2010 .get("logicalType")
2011 .and_then(|v| v.as_str())
2012 .or_else(|| {
2013 nested_field_obj.get("type").and_then(|v| v.as_str())
2014 })
2015 .unwrap_or("STRING");
2016
2017 let nested_col_name =
2019 format!("{}.[].{}", field_name, nested_field_name);
2020 let mut local_errors = Vec::new();
2021 match self.parse_data_contract_field(
2022 &nested_col_name,
2023 nested_field_obj,
2024 data,
2025 &mut local_errors,
2026 ) {
2027 Ok(mut nested_cols) => {
2028 columns.append(&mut nested_cols);
2031 }
2032 Err(_) => {
2033 let nested_physical_type = nested_field_obj
2035 .get("physicalType")
2036 .and_then(|v| v.as_str())
2037 .map(|s| s.to_string());
2038 columns.push(Column {
2039 name: nested_col_name,
2040 data_type: nested_field_type.to_uppercase(),
2041 physical_type: nested_physical_type,
2042 nullable: !nested_field_obj
2043 .get("required")
2044 .and_then(|v| v.as_bool())
2045 .unwrap_or(false),
2046 description: nested_field_obj
2047 .get("description")
2048 .and_then(|v| v.as_str())
2049 .unwrap_or("")
2050 .to_string(),
2051 ..Default::default()
2052 });
2053 }
2054 }
2055 }
2056 }
2057 } else if let Some(properties_list) = properties_arr {
2058 let mut local_errors = Vec::new();
2060 for prop_data in properties_list {
2061 if let Some(prop_obj) = prop_data.as_object() {
2062 let nested_field_name = prop_obj
2064 .get("name")
2065 .or_else(|| prop_obj.get("id"))
2066 .and_then(|v| v.as_str())
2067 .unwrap_or("");
2068
2069 if !nested_field_name.is_empty() {
2070 let nested_col_name =
2072 format!("{}.[].{}", field_name, nested_field_name);
2073 match self.parse_data_contract_field(
2074 &nested_col_name,
2075 prop_obj,
2076 data,
2077 &mut local_errors,
2078 ) {
2079 Ok(mut nested_cols) => {
2080 columns.append(&mut nested_cols);
2083 }
2084 Err(_) => {
2085 let nested_field_type = prop_obj
2088 .get("logicalType")
2089 .and_then(|v| v.as_str())
2090 .or_else(|| {
2091 prop_obj
2092 .get("type")
2093 .and_then(|v| v.as_str())
2094 })
2095 .unwrap_or("STRING");
2096 let nested_physical_type = prop_obj
2097 .get("physicalType")
2098 .and_then(|v| v.as_str())
2099 .map(|s| s.to_string());
2100 columns.push(Column {
2101 name: nested_col_name,
2102 data_type: nested_field_type.to_uppercase(),
2103 physical_type: nested_physical_type,
2104 nullable: !prop_obj
2105 .get("required")
2106 .and_then(|v| v.as_bool())
2107 .unwrap_or(false),
2108 description: prop_obj
2109 .get("description")
2110 .and_then(|v| v.as_str())
2111 .unwrap_or("")
2112 .to_string(),
2113 ..Default::default()
2114 });
2115 }
2116 }
2117 }
2118 }
2119 }
2120 }
2121
2122 return Ok(columns);
2123 } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2124 let array_physical_type = field_data
2126 .get("physicalType")
2127 .and_then(|v| v.as_str())
2128 .map(|s| s.to_string());
2129 columns.push(Column {
2130 name: field_name.to_string(),
2131 data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2132 physical_type: array_physical_type,
2133 nullable: !field_data
2134 .get("required")
2135 .and_then(|v| v.as_bool())
2136 .unwrap_or(false),
2137 description: description.clone(),
2138 quality: quality_rules.clone(),
2139 relationships: ref_to_relationships(
2140 &field_data
2141 .get("$ref")
2142 .and_then(|v| v.as_str())
2143 .map(|s| s.to_string()),
2144 ),
2145 ..Default::default()
2146 });
2147 return Ok(columns);
2148 }
2149 } else if let Some(item_type_str) = items_val.as_str() {
2150 let array_physical_type = field_data
2152 .get("physicalType")
2153 .and_then(|v| v.as_str())
2154 .map(|s| s.to_string());
2155 columns.push(Column {
2156 name: field_name.to_string(),
2157 data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2158 physical_type: array_physical_type,
2159 nullable: !field_data
2160 .get("required")
2161 .and_then(|v| v.as_bool())
2162 .unwrap_or(false),
2163 description: description.clone(),
2164 quality: quality_rules.clone(),
2165 relationships: ref_to_relationships(
2166 &field_data
2167 .get("$ref")
2168 .and_then(|v| v.as_str())
2169 .map(|s| s.to_string()),
2170 ),
2171 ..Default::default()
2172 });
2173 return Ok(columns);
2174 }
2175 }
2176 let array_physical_type = field_data
2178 .get("physicalType")
2179 .and_then(|v| v.as_str())
2180 .map(|s| s.to_string());
2181 columns.push(Column {
2182 name: field_name.to_string(),
2183 data_type: "ARRAY<STRING>".to_string(),
2184 physical_type: array_physical_type,
2185 nullable: !field_data
2186 .get("required")
2187 .and_then(|v| v.as_bool())
2188 .unwrap_or(false),
2189 description: description.clone(),
2190 quality: quality_rules.clone(),
2191 relationships: ref_to_relationships(
2192 &field_data
2193 .get("$ref")
2194 .and_then(|v| v.as_str())
2195 .map(|s| s.to_string()),
2196 ),
2197 ..Default::default()
2198 });
2199 return Ok(columns);
2200 }
2201
2202 let nested_fields_obj = field_data
2206 .get("properties")
2207 .and_then(|v| v.as_object())
2208 .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2209 let nested_fields_arr = field_data.get("properties").and_then(|v| v.as_array());
2210
2211 if field_type == "OBJECT" && (nested_fields_obj.is_some() || nested_fields_arr.is_some()) {
2212 let parent_physical_type = field_data
2216 .get("physicalType")
2217 .and_then(|v| v.as_str())
2218 .map(|s| s.to_string());
2219
2220 columns.push(Column {
2222 name: field_name.to_string(),
2223 data_type: "OBJECT".to_string(),
2224 physical_type: parent_physical_type,
2225 nullable: !field_data
2226 .get("required")
2227 .and_then(|v| v.as_bool())
2228 .unwrap_or(false),
2229 description: description.clone(),
2230 quality: quality_rules.clone(),
2231 relationships: ref_to_relationships(
2232 &field_data
2233 .get("$ref")
2234 .and_then(|v| v.as_str())
2235 .map(|s| s.to_string()),
2236 ),
2237 ..Default::default()
2238 });
2239
2240 if let Some(fields_obj) = nested_fields_obj {
2242 for (nested_field_name, nested_field_data) in fields_obj {
2244 if let Some(nested_field_obj) = nested_field_data.as_object() {
2245 let nested_field_type = nested_field_obj
2246 .get("logicalType")
2247 .and_then(|v| v.as_str())
2248 .or_else(|| nested_field_obj.get("type").and_then(|v| v.as_str()))
2249 .unwrap_or("STRING");
2250
2251 let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2253 match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data)
2254 {
2255 Ok(mut nested_cols) => {
2256 columns.append(&mut nested_cols);
2259 }
2260 Err(_) => {
2261 let nested_physical_type = nested_field_obj
2263 .get("physicalType")
2264 .and_then(|v| v.as_str())
2265 .map(|s| s.to_string());
2266 columns.push(Column {
2267 name: nested_col_name,
2268 data_type: nested_field_type.to_uppercase(),
2269 physical_type: nested_physical_type,
2270 nullable: !nested_field_obj
2271 .get("required")
2272 .and_then(|v| v.as_bool())
2273 .unwrap_or(false),
2274 description: nested_field_obj
2275 .get("description")
2276 .and_then(|v| v.as_str())
2277 .unwrap_or("")
2278 .to_string(),
2279 ..Default::default()
2280 });
2281 }
2282 }
2283 }
2284 }
2285 } else if let Some(fields_arr) = nested_fields_arr {
2286 for prop_data in fields_arr {
2288 if let Some(prop_obj) = prop_data.as_object() {
2289 let nested_field_name = prop_obj
2291 .get("name")
2292 .or_else(|| prop_obj.get("id"))
2293 .and_then(|v| v.as_str())
2294 .unwrap_or("");
2295
2296 if !nested_field_name.is_empty() {
2297 let nested_field_type = prop_obj
2298 .get("logicalType")
2299 .and_then(|v| v.as_str())
2300 .or_else(|| prop_obj.get("type").and_then(|v| v.as_str()))
2301 .unwrap_or("STRING");
2302
2303 let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2305 match self.parse_odcl_v3_property(&nested_col_name, prop_obj, data) {
2306 Ok(mut nested_cols) => {
2307 columns.append(&mut nested_cols);
2310 }
2311 Err(_) => {
2312 let nested_physical_type = prop_obj
2314 .get("physicalType")
2315 .and_then(|v| v.as_str())
2316 .map(|s| s.to_string());
2317 columns.push(Column {
2318 name: nested_col_name,
2319 data_type: nested_field_type.to_uppercase(),
2320 physical_type: nested_physical_type,
2321 nullable: !prop_obj
2322 .get("required")
2323 .and_then(|v| v.as_bool())
2324 .unwrap_or(false),
2325 description: prop_obj
2326 .get("description")
2327 .and_then(|v| v.as_str())
2328 .unwrap_or("")
2329 .to_string(),
2330 ..Default::default()
2331 });
2332 }
2333 }
2334 }
2335 }
2336 }
2337 }
2338
2339 return Ok(columns);
2340 }
2341
2342 let required = field_data
2344 .get("required")
2345 .and_then(|v| v.as_bool())
2346 .unwrap_or(false);
2347
2348 let field_description = if description.is_empty() {
2350 field_data
2351 .get("description")
2352 .and_then(|v| v.as_str())
2353 .unwrap_or("")
2354 .to_string()
2355 } else {
2356 description
2357 };
2358
2359 let mut column_quality_rules = quality_rules;
2361
2362 if column_quality_rules.is_empty()
2364 && let Some(quality_val) = field_data.get("quality")
2365 {
2366 if let Some(arr) = quality_val.as_array() {
2367 for item in arr {
2369 if let Some(obj) = item.as_object() {
2370 let mut rule = HashMap::new();
2371 for (key, value) in obj {
2372 rule.insert(key.clone(), json_value_to_serde_value(value));
2373 }
2374 column_quality_rules.push(rule);
2375 }
2376 }
2377 } else if let Some(obj) = quality_val.as_object() {
2378 let mut rule = HashMap::new();
2380 for (key, value) in obj {
2381 rule.insert(key.clone(), json_value_to_serde_value(value));
2382 }
2383 column_quality_rules.push(rule);
2384 }
2385 }
2386
2387 let physical_type = field_data
2393 .get("physicalType")
2394 .and_then(|v| v.as_str())
2395 .map(|s| s.to_string());
2396
2397 let column = self.parse_column_metadata_from_field(
2399 field_name,
2400 &field_type,
2401 physical_type,
2402 !required,
2403 field_description,
2404 column_quality_rules,
2405 field_data,
2406 );
2407
2408 columns.push(column);
2409
2410 Ok(columns)
2411 }
2412
2413 #[allow(clippy::too_many_arguments)]
2422 fn parse_column_metadata_from_field(
2423 &self,
2424 name: &str,
2425 data_type: &str,
2426 physical_type: Option<String>,
2427 nullable: bool,
2428 description: String,
2429 quality: Vec<HashMap<String, serde_json::Value>>,
2430 field_data: &serde_json::Map<String, JsonValue>,
2431 ) -> Column {
2432 use crate::models::{AuthoritativeDefinition, LogicalTypeOptions};
2433
2434 let business_name = field_data
2436 .get("businessName")
2437 .and_then(|v| v.as_str())
2438 .map(|s| s.to_string());
2439
2440 let physical_name = field_data
2442 .get("physicalName")
2443 .and_then(|v| v.as_str())
2444 .map(|s| s.to_string());
2445
2446 let logical_type_options = field_data.get("logicalTypeOptions").and_then(|v| {
2448 v.as_object().map(|opts| LogicalTypeOptions {
2449 min_length: opts.get("minLength").and_then(|v| v.as_i64()),
2450 max_length: opts.get("maxLength").and_then(|v| v.as_i64()),
2451 pattern: opts
2452 .get("pattern")
2453 .and_then(|v| v.as_str())
2454 .map(|s| s.to_string()),
2455 format: opts
2456 .get("format")
2457 .and_then(|v| v.as_str())
2458 .map(|s| s.to_string()),
2459 minimum: opts.get("minimum").cloned(),
2460 maximum: opts.get("maximum").cloned(),
2461 exclusive_minimum: opts.get("exclusiveMinimum").cloned(),
2462 exclusive_maximum: opts.get("exclusiveMaximum").cloned(),
2463 precision: opts
2464 .get("precision")
2465 .and_then(|v| v.as_i64())
2466 .map(|n| n as i32),
2467 scale: opts.get("scale").and_then(|v| v.as_i64()).map(|n| n as i32),
2468 })
2469 });
2470
2471 let primary_key = field_data
2473 .get("primaryKey")
2474 .and_then(|v| v.as_bool())
2475 .unwrap_or(false);
2476
2477 let primary_key_position = field_data
2479 .get("primaryKeyPosition")
2480 .and_then(|v| v.as_i64())
2481 .map(|n| n as i32);
2482
2483 let unique = field_data
2485 .get("unique")
2486 .and_then(|v| v.as_bool())
2487 .unwrap_or(false);
2488
2489 let partitioned = field_data
2491 .get("partitioned")
2492 .and_then(|v| v.as_bool())
2493 .unwrap_or(false);
2494
2495 let partition_key_position = field_data
2497 .get("partitionKeyPosition")
2498 .and_then(|v| v.as_i64())
2499 .map(|n| n as i32);
2500
2501 let clustered = field_data
2503 .get("clustered")
2504 .and_then(|v| v.as_bool())
2505 .unwrap_or(false);
2506
2507 let classification = field_data
2509 .get("classification")
2510 .and_then(|v| v.as_str())
2511 .map(|s| s.to_string());
2512
2513 let critical_data_element = field_data
2515 .get("criticalDataElement")
2516 .and_then(|v| v.as_bool())
2517 .unwrap_or(false);
2518
2519 let encrypted_name = field_data
2521 .get("encryptedName")
2522 .and_then(|v| v.as_str())
2523 .map(|s| s.to_string());
2524
2525 let transform_source_objects = field_data
2527 .get("transformSourceObjects")
2528 .and_then(|v| v.as_array())
2529 .map(|arr| {
2530 arr.iter()
2531 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2532 .collect()
2533 })
2534 .unwrap_or_default();
2535
2536 let transform_logic = field_data
2538 .get("transformLogic")
2539 .and_then(|v| v.as_str())
2540 .map(|s| s.to_string());
2541
2542 let transform_description = field_data
2544 .get("transformDescription")
2545 .and_then(|v| v.as_str())
2546 .map(|s| s.to_string());
2547
2548 let examples = field_data
2550 .get("examples")
2551 .and_then(|v| v.as_array())
2552 .map(|arr| arr.to_vec())
2553 .unwrap_or_default();
2554
2555 let authoritative_definitions = field_data
2557 .get("authoritativeDefinitions")
2558 .and_then(|v| v.as_array())
2559 .map(|arr| {
2560 arr.iter()
2561 .filter_map(|item| {
2562 item.as_object().map(|obj| AuthoritativeDefinition {
2563 definition_type: obj
2564 .get("type")
2565 .and_then(|v| v.as_str())
2566 .unwrap_or("")
2567 .to_string(),
2568 url: obj
2569 .get("url")
2570 .and_then(|v| v.as_str())
2571 .unwrap_or("")
2572 .to_string(),
2573 })
2574 })
2575 .collect()
2576 })
2577 .unwrap_or_default();
2578
2579 let tags = field_data
2581 .get("tags")
2582 .and_then(|v| v.as_array())
2583 .map(|arr| {
2584 arr.iter()
2585 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2586 .collect()
2587 })
2588 .unwrap_or_default();
2589
2590 let custom_properties = field_data
2592 .get("customProperties")
2593 .and_then(|v| v.as_array())
2594 .map(|arr| {
2595 arr.iter()
2596 .filter_map(|item| {
2597 item.as_object().and_then(|obj| {
2598 let key = obj.get("property").and_then(|v| v.as_str())?;
2599 let value = obj.get("value").cloned()?;
2600 Some((key.to_string(), value))
2601 })
2602 })
2603 .collect()
2604 })
2605 .unwrap_or_default();
2606
2607 let secondary_key = field_data
2609 .get("businessKey")
2610 .and_then(|v| v.as_bool())
2611 .unwrap_or(false);
2612
2613 let enum_values = field_data
2615 .get("enum")
2616 .and_then(|v| v.as_array())
2617 .map(|arr| {
2618 arr.iter()
2619 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2620 .collect()
2621 })
2622 .unwrap_or_default();
2623
2624 let constraints = field_data
2626 .get("constraints")
2627 .and_then(|v| v.as_array())
2628 .map(|arr| {
2629 arr.iter()
2630 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2631 .collect()
2632 })
2633 .unwrap_or_default();
2634
2635 Column {
2636 name: name.to_string(),
2637 data_type: data_type.to_string(),
2638 physical_type,
2639 physical_name,
2640 nullable,
2641 description,
2642 quality,
2643 business_name,
2644 logical_type_options,
2645 primary_key,
2646 primary_key_position,
2647 unique,
2648 partitioned,
2649 partition_key_position,
2650 clustered,
2651 classification,
2652 critical_data_element,
2653 encrypted_name,
2654 transform_source_objects,
2655 transform_logic,
2656 transform_description,
2657 examples,
2658 authoritative_definitions,
2659 tags,
2660 custom_properties,
2661 secondary_key,
2662 enum_values,
2663 constraints,
2664 foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2665 relationships: self.parse_relationships_from_field(field_data),
2666 ..Default::default()
2667 }
2668 }
2669
2670 fn parse_foreign_key_from_data_contract(
2672 &self,
2673 field_data: &serde_json::Map<String, JsonValue>,
2674 ) -> Option<ForeignKey> {
2675 field_data
2676 .get("foreignKey")
2677 .and_then(|v| v.as_object())
2678 .map(|fk_obj| ForeignKey {
2679 table_id: fk_obj
2680 .get("table")
2681 .or_else(|| fk_obj.get("table_id"))
2682 .and_then(|v| v.as_str())
2683 .unwrap_or("")
2684 .to_string(),
2685 column_name: fk_obj
2686 .get("column")
2687 .or_else(|| fk_obj.get("column_name"))
2688 .and_then(|v| v.as_str())
2689 .unwrap_or("")
2690 .to_string(),
2691 })
2692 }
2693
2694 fn parse_relationships_from_field(
2697 &self,
2698 field_data: &serde_json::Map<String, JsonValue>,
2699 ) -> Vec<PropertyRelationship> {
2700 let mut relationships = Vec::new();
2701
2702 if let Some(rels_array) = field_data.get("relationships").and_then(|v| v.as_array()) {
2704 for rel in rels_array {
2705 if let Some(rel_obj) = rel.as_object() {
2706 let rel_type = rel_obj
2707 .get("type")
2708 .and_then(|v| v.as_str())
2709 .unwrap_or("foreignKey")
2710 .to_string();
2711 let to = rel_obj
2712 .get("to")
2713 .and_then(|v| v.as_str())
2714 .unwrap_or("")
2715 .to_string();
2716
2717 if !to.is_empty() {
2718 relationships.push(PropertyRelationship {
2719 relationship_type: rel_type,
2720 to,
2721 });
2722 }
2723 }
2724 }
2725 }
2726
2727 if relationships.is_empty()
2729 && let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str())
2730 {
2731 let to = if ref_str.starts_with("#/definitions/") {
2733 let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
2734 format!("definitions/{}", def_path)
2735 } else if ref_str.starts_with("#/") {
2736 ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
2737 } else {
2738 ref_str.to_string()
2739 };
2740
2741 relationships.push(PropertyRelationship {
2742 relationship_type: "foreignKey".to_string(),
2743 to,
2744 });
2745 }
2746
2747 relationships
2748 }
2749
2750 fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2752 if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2754 if let Some((_, server_data)) = servers_obj.iter().next()
2756 && let Some(server_obj) = server_data.as_object()
2757 {
2758 return server_obj
2759 .get("type")
2760 .and_then(|v| v.as_str())
2761 .and_then(|s| self.parse_database_type(s));
2762 }
2763 } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2764 if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2766 return server_obj
2767 .get("type")
2768 .and_then(|v| v.as_str())
2769 .and_then(|s| self.parse_database_type(s));
2770 }
2771 }
2772 None
2773 }
2774
2775 fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2777 match s.to_lowercase().as_str() {
2778 "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2779 "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2780 "mysql" => Some(DatabaseType::Mysql),
2781 "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2782 "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2783 _ => None,
2784 }
2785 }
2786
2787 pub fn parse_struct_type_from_string(
2791 &self,
2792 field_name: &str,
2793 type_str: &str,
2794 field_data: &serde_json::Map<String, JsonValue>,
2795 ) -> Result<Vec<Column>> {
2796 let mut columns = Vec::new();
2797
2798 let normalized_type = type_str
2800 .lines()
2801 .map(|line| line.trim())
2802 .filter(|line| !line.is_empty())
2803 .collect::<Vec<_>>()
2804 .join(" ");
2805
2806 let type_str_upper = normalized_type.to_uppercase();
2807
2808 let is_array = type_str_upper.starts_with("ARRAY<");
2810 let struct_start = type_str_upper.find("STRUCT<");
2811
2812 if let Some(start_pos) = struct_start {
2813 let struct_content_start = start_pos + 7; let struct_content = &normalized_type[struct_content_start..];
2817
2818 let mut depth = 1;
2821 let mut end_pos = None;
2822 for (i, ch) in struct_content.char_indices() {
2823 match ch {
2824 '<' => depth += 1,
2825 '>' => {
2826 depth -= 1;
2827 if depth == 0 {
2828 end_pos = Some(i);
2829 break;
2830 }
2831 }
2832 _ => {}
2833 }
2834 }
2835
2836 let struct_fields_str = if let Some(end) = end_pos {
2838 &struct_content[..end]
2839 } else {
2840 struct_content.trim_end_matches('>').trim()
2842 };
2843
2844 let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2846
2847 for (nested_name, nested_type) in fields {
2851 let nested_type_upper = nested_type.to_uppercase();
2852 let nested_col_name = if is_array {
2853 format!("{}.[].{}", field_name, nested_name)
2854 } else {
2855 format!("{}.{}", field_name, nested_name)
2856 };
2857
2858 let is_nested_struct = nested_type_upper.starts_with("STRUCT<");
2861 let is_nested_array_struct = nested_type_upper.starts_with("ARRAY<STRUCT<");
2862
2863 if is_nested_struct || is_nested_array_struct {
2864 match self.parse_struct_type_from_string(
2872 &nested_col_name,
2873 &nested_type,
2874 field_data,
2875 ) {
2876 Ok(nested_cols) => {
2877 columns.extend(nested_cols);
2884 }
2885 Err(_) => {
2886 let fallback_data_type = if is_nested_array_struct {
2888 "ARRAY<STRUCT<...>>".to_string()
2889 } else {
2890 "STRUCT<...>".to_string()
2891 };
2892 let nested_physical_type = field_data
2893 .get("physicalType")
2894 .and_then(|v| v.as_str())
2895 .map(|s| s.to_string());
2896 columns.push(Column {
2897 name: nested_col_name,
2898 data_type: fallback_data_type,
2899 physical_type: nested_physical_type,
2900 nullable: !field_data
2901 .get("required")
2902 .and_then(|v| v.as_bool())
2903 .unwrap_or(false),
2904 description: field_data
2905 .get("description")
2906 .and_then(|v| v.as_str())
2907 .unwrap_or("")
2908 .to_string(),
2909 ..Default::default()
2910 });
2911 }
2912 }
2913 } else if nested_type_upper.starts_with("ARRAY<") {
2914 let nested_physical_type = field_data
2917 .get("physicalType")
2918 .and_then(|v| v.as_str())
2919 .map(|s| s.to_string());
2920 columns.push(Column {
2921 name: nested_col_name,
2922 data_type: normalize_data_type(&nested_type),
2923 physical_type: nested_physical_type,
2924 nullable: !field_data
2925 .get("required")
2926 .and_then(|v| v.as_bool())
2927 .unwrap_or(false),
2928 description: field_data
2929 .get("description")
2930 .and_then(|v| v.as_str())
2931 .unwrap_or("")
2932 .to_string(),
2933 ..Default::default()
2934 });
2935 } else {
2936 let nested_physical_type = field_data
2938 .get("physicalType")
2939 .and_then(|v| v.as_str())
2940 .map(|s| s.to_string());
2941 columns.push(Column {
2942 name: nested_col_name,
2943 data_type: normalize_data_type(&nested_type),
2944 physical_type: nested_physical_type,
2945 nullable: !field_data
2946 .get("required")
2947 .and_then(|v| v.as_bool())
2948 .unwrap_or(false),
2949 description: field_data
2950 .get("description")
2951 .and_then(|v| v.as_str())
2952 .unwrap_or("")
2953 .to_string(),
2954 ..Default::default()
2955 });
2956 }
2957 }
2958
2959 return Ok(columns);
2960 }
2961
2962 Ok(Vec::new())
2964 }
2965
2966 fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2968 let mut fields = Vec::new();
2969 let mut current_field = String::new();
2970 let mut depth = 0;
2971 let mut in_string = false;
2972 let mut string_char = None;
2973
2974 for ch in fields_str.chars() {
2975 match ch {
2976 '\'' | '"' if !in_string || Some(ch) == string_char => {
2977 if in_string {
2978 in_string = false;
2979 string_char = None;
2980 } else {
2981 in_string = true;
2982 string_char = Some(ch);
2983 }
2984 current_field.push(ch);
2985 }
2986 '<' if !in_string => {
2987 depth += 1;
2988 current_field.push(ch);
2989 }
2990 '>' if !in_string => {
2991 depth -= 1;
2992 current_field.push(ch);
2993 }
2994 ',' if !in_string && depth == 0 => {
2995 let trimmed = current_field.trim();
2997 if !trimmed.is_empty()
2998 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2999 {
3000 fields.push((name, type_part));
3001 }
3002 current_field.clear();
3003 }
3004 _ => {
3005 current_field.push(ch);
3006 }
3007 }
3008 }
3009
3010 let trimmed = current_field.trim();
3012 if !trimmed.is_empty()
3013 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
3014 {
3015 fields.push((name, type_part));
3016 }
3017
3018 Ok(fields)
3019 }
3020
3021 fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
3023 let colon_pos = field_def.find(':')?;
3025 let name = field_def[..colon_pos].trim().to_string();
3026 let type_part = field_def[colon_pos + 1..].trim().to_string();
3027
3028 if name.is_empty() || type_part.is_empty() {
3029 return None;
3030 }
3031
3032 Some((name, type_part))
3033 }
3034
3035 fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
3037 let mut catalog_name = None;
3038 let mut schema_name = None;
3039
3040 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
3041 for prop in custom_props {
3042 if let Some(prop_obj) = prop.as_object() {
3043 let prop_key = prop_obj
3044 .get("property")
3045 .and_then(|v| v.as_str())
3046 .unwrap_or("");
3047 let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
3048
3049 match prop_key {
3050 "catalogName" | "catalog_name" => {
3051 catalog_name = prop_value.map(|s| s.to_string());
3052 }
3053 "schemaName" | "schema_name" => {
3054 schema_name = prop_value.map(|s| s.to_string());
3055 }
3056 _ => {}
3057 }
3058 }
3059 }
3060 }
3061
3062 if catalog_name.is_none() {
3064 catalog_name = data
3065 .get("catalog_name")
3066 .and_then(|v| v.as_str())
3067 .map(|s| s.to_string());
3068 }
3069 if schema_name.is_none() {
3070 schema_name = data
3071 .get("schema_name")
3072 .and_then(|v| v.as_str())
3073 .map(|s| s.to_string());
3074 }
3075
3076 (catalog_name, schema_name)
3077 }
3078}
3079
3080impl Default for ODCSImporter {
3081 fn default() -> Self {
3082 Self::new()
3083 }
3084}
3085
#[cfg(test)]
mod tests {
    use super::*;

    /// A minimal table with two columns and an explicit database type.
    #[test]
    fn test_parse_simple_odcl_table() {
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer.parse(odcl_yaml).expect("simple table should parse");

        assert_eq!(errors.len(), 0);
        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
    }

    /// Medallion layer, SCD pattern and free-form metadata are picked up.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer
            .parse(odcl_yaml)
            .expect("table with metadata should parse");

        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
        // The description is only checked when it is present as a string.
        match table.odcl_metadata.get("description") {
            Some(serde_json::Value::String(desc)) => assert_eq!(desc, "User table"),
            _ => {}
        }
        assert_eq!(errors.len(), 0);
    }

    /// The Data Vault classification is read from the top-level key.
    #[test]
    fn test_parse_odcl_with_data_vault() {
        let odcl_yaml = r#"
name: hub_customer
columns:
  - name: customer_key
    data_type: VARCHAR(50)
data_vault_classification: Hub
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer.parse(odcl_yaml).expect("hub table should parse");

        assert_eq!(
            table.data_vault_classification,
            Some(DataVaultClassification::Hub)
        );
        assert_eq!(errors.len(), 0);
    }

    /// Malformed YAML must be reported as an error.
    #[test]
    fn test_parse_invalid_odcl() {
        let invalid_yaml = "not: valid: yaml: structure:";
        let mut importer = ODCSImporter::new();

        let outcome = importer.parse(invalid_yaml);

        assert!(outcome.is_err());
    }

    /// A document without a `columns` section must be rejected.
    #[test]
    fn test_parse_odcl_missing_required_fields() {
        let non_conformant = r#"
name: users
# Missing required columns field
"#;
        let mut importer = ODCSImporter::new();

        let outcome = importer.parse(non_conformant);

        assert!(outcome.is_err());
    }

    /// A column-level `foreign_key` mapping ends up on the parsed column.
    #[test]
    fn test_parse_odcl_with_foreign_key() {
        let odcl_yaml = r#"
name: orders
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: user_id
    data_type: INT
    foreign_key:
      table_id: users
      column_name: id
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer
            .parse(odcl_yaml)
            .expect("table with foreign key should parse");

        assert_eq!(table.columns.len(), 2);
        let user_id_col = table
            .columns
            .iter()
            .find(|c| c.name == "user_id")
            .expect("user_id column should exist");
        let foreign_key = user_id_col
            .foreign_key
            .as_ref()
            .expect("user_id should carry a foreign key");
        assert_eq!(foreign_key.table_id, "users");
        assert_eq!(errors.len(), 0);
    }

    /// Column constraints and nullability are preserved.
    #[test]
    fn test_parse_odcl_with_constraints() {
        let odcl_yaml = r#"
name: products
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
    constraints:
      - UNIQUE
      - NOT NULL
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer
            .parse(odcl_yaml)
            .expect("table with constraints should parse");

        assert_eq!(table.columns.len(), 2);
        let name_col = table
            .columns
            .iter()
            .find(|c| c.name == "name")
            .expect("name column should exist");
        assert!(!name_col.nullable);
        assert!(name_col.constraints.iter().any(|c| c == "UNIQUE"));
        assert_eq!(errors.len(), 0);
    }
}