1use super::odcs_shared::{
12 ParserError, column_to_column_data, expand_nested_column, json_value_to_serde_value,
13 normalize_data_type, parse_data_vault_classification, parse_medallion_layer, parse_scd_pattern,
14 resolve_ref, yaml_to_json_value,
15};
16use super::{ImportError, ImportResult, TableData};
17use crate::models::column::ForeignKey;
18use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
19use crate::models::{Column, PropertyRelationship, Table, Tag};
20use anyhow::{Context, Result};
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::str::FromStr;
24use tracing::info;
25
26pub use super::odcs_shared::ParserError as OdcsParserError;
28
29fn ref_to_relationships(ref_path: &Option<String>) -> Vec<PropertyRelationship> {
32 match ref_path {
33 Some(ref_str) => {
34 let to = if ref_str.starts_with("#/definitions/") {
35 let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
36 format!("definitions/{}", def_path)
37 } else if ref_str.starts_with("#/") {
38 ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
39 } else {
40 ref_str.clone()
41 };
42 vec![PropertyRelationship {
43 relationship_type: "foreignKey".to_string(),
44 to,
45 }]
46 }
47 None => Vec::new(),
48 }
49}
50
51pub struct ODCSImporter {
54 current_yaml_data: Option<serde_yaml::Value>,
56}
57
58impl ODCSImporter {
59 pub fn new() -> Self {
69 Self {
70 current_yaml_data: None,
71 }
72 }
73
74 pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
107 let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
109 .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
110
111 let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
112 ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
113 })?;
114
115 match self.parse(yaml_content) {
116 Ok((table, errors)) => {
117 let sdk_tables = vec![TableData {
119 table_index: 0,
120 id: Some(table.id.to_string()),
121 name: Some(table.name.clone()),
122 api_version: json_data
123 .get("apiVersion")
124 .and_then(|v| v.as_str())
125 .map(|s| s.to_string()),
126 version: json_data
127 .get("version")
128 .and_then(|v| v.as_str())
129 .map(|s| s.to_string()),
130 status: json_data
131 .get("status")
132 .and_then(|v| v.as_str())
133 .map(|s| s.to_string()),
134 kind: json_data
135 .get("kind")
136 .and_then(|v| v.as_str())
137 .map(|s| s.to_string()),
138 domain: json_data
139 .get("domain")
140 .and_then(|v| v.as_str())
141 .map(|s| s.to_string()),
142 data_product: json_data
143 .get("dataProduct")
144 .and_then(|v| v.as_str())
145 .map(|s| s.to_string()),
146 tenant: json_data
147 .get("tenant")
148 .and_then(|v| v.as_str())
149 .map(|s| s.to_string()),
150 description: json_data.get("description").cloned(),
151 columns: table.columns.iter().map(column_to_column_data).collect(),
152 servers: json_data
153 .get("servers")
154 .and_then(|v| v.as_array())
155 .cloned()
156 .unwrap_or_default(),
157 team: json_data.get("team").cloned(),
158 support: json_data.get("support").cloned(),
159 roles: json_data
160 .get("roles")
161 .and_then(|v| v.as_array())
162 .cloned()
163 .unwrap_or_default(),
164 sla_properties: json_data
165 .get("slaProperties")
166 .and_then(|v| v.as_array())
167 .cloned()
168 .unwrap_or_default(),
169 quality: table.quality.clone(),
170 price: json_data.get("price").cloned(),
171 tags: table.tags.iter().map(|t| t.to_string()).collect(),
172 custom_properties: json_data
173 .get("customProperties")
174 .and_then(|v| v.as_array())
175 .cloned()
176 .unwrap_or_default(),
177 authoritative_definitions: json_data
178 .get("authoritativeDefinitions")
179 .and_then(|v| v.as_array())
180 .cloned()
181 .unwrap_or_default(),
182 contract_created_ts: json_data
183 .get("contractCreatedTs")
184 .and_then(|v| v.as_str())
185 .map(|s| s.to_string()),
186 odcs_metadata: table.odcl_metadata.clone(),
187 }];
188 let sdk_errors: Vec<ImportError> = errors
189 .iter()
190 .map(|e| ImportError::ParseError(e.message.clone()))
191 .collect();
192 Ok(ImportResult {
193 tables: sdk_tables,
194 tables_requiring_name: Vec::new(),
195 errors: sdk_errors,
196 ai_suggestions: None,
197 })
198 }
199 Err(e) => Err(ImportError::ParseError(e.to_string())),
200 }
201 }
202
203 pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
214 self.parse(yaml_content)
215 }
216
217 fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
227 let _errors: Vec<ParserError> = Vec::new();
229
230 let data: serde_yaml::Value =
232 serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
233
234 if data.is_null() {
235 return Err(anyhow::anyhow!("Empty YAML content"));
236 }
237
238 self.current_yaml_data = Some(data.clone());
240
241 let json_data = yaml_to_json_value(&data)?;
243
244 if self.is_liquibase_format(&json_data) {
246 return self.parse_liquibase(&json_data);
247 }
248
249 if self.is_odcl_v3_format(&json_data) {
250 return self.parse_odcl_v3(&json_data);
251 }
252
253 if self.is_data_contract_format(&json_data) {
254 return self.parse_data_contract(&json_data);
255 }
256
257 self.parse_simple_odcl(&json_data)
259 }
260
261 fn is_liquibase_format(&self, data: &JsonValue) -> bool {
263 if data.get("databaseChangeLog").is_some() {
264 return true;
265 }
266 if let Some(obj) = data.as_object() {
268 let obj_str = format!("{:?}", obj);
269 if obj_str.contains("changeSet") {
270 return true;
271 }
272 }
273 false
274 }
275
276 fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
278 if let Some(obj) = data.as_object() {
279 let has_api_version = obj.contains_key("apiVersion");
280 let has_kind = obj
281 .get("kind")
282 .and_then(|v| v.as_str())
283 .map(|s| s == "DataContract")
284 .unwrap_or(false);
285 let has_id = obj.contains_key("id");
286 let has_version = obj.contains_key("version");
287 return has_api_version && has_kind && has_id && has_version;
288 }
289 false
290 }
291
292 fn is_data_contract_format(&self, data: &JsonValue) -> bool {
294 if let Some(obj) = data.as_object() {
295 let has_spec = obj.contains_key("dataContractSpecification");
296 let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
297 return has_spec && has_models;
298 }
299 false
300 }
301
302 fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
304 let mut errors = Vec::new();
305
306 let name = data
308 .get("name")
309 .and_then(|v| v.as_str())
310 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
311 .to_string();
312
313 let columns_data = data
315 .get("columns")
316 .and_then(|v| v.as_array())
317 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
318
319 let mut columns = Vec::new();
320 for (idx, col_data) in columns_data.iter().enumerate() {
321 match self.parse_column(col_data) {
322 Ok(col) => columns.push(col),
323 Err(e) => {
324 errors.push(ParserError {
325 error_type: "column_parse_error".to_string(),
326 field: format!("columns[{}]", idx),
327 message: e.to_string(),
328 });
329 }
330 }
331 }
332
333 let database_type = self.extract_database_type(data);
335 let medallion_layers = self.extract_medallion_layers(data);
336 let scd_pattern = self.extract_scd_pattern(data);
337 let data_vault_classification = self.extract_data_vault_classification(data);
338 let quality_rules = self.extract_quality_rules(data);
339
340 if scd_pattern.is_some() && data_vault_classification.is_some() {
342 errors.push(ParserError {
343 error_type: "validation_error".to_string(),
344 field: "patterns".to_string(),
345 message: "SCD pattern and Data Vault classification are mutually exclusive"
346 .to_string(),
347 });
348 }
349
350 let mut odcl_metadata = HashMap::new();
352 if let Some(metadata) = data.get("odcl_metadata")
353 && let Some(obj) = metadata.as_object()
354 {
355 for (key, value) in obj {
356 odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
357 }
358 }
359
360 let table_uuid = self.extract_table_uuid(data);
361
362 let table = Table {
363 id: table_uuid,
364 name,
365 columns,
366 database_type,
367 catalog_name: None,
368 schema_name: None,
369 medallion_layers,
370 scd_pattern,
371 data_vault_classification,
372 modeling_level: None,
373 tags: Vec::<Tag>::new(),
374 odcl_metadata,
375 owner: None,
376 sla: None,
377 contact_details: None,
378 infrastructure_type: None,
379 notes: None,
380 position: None,
381 yaml_file_path: None,
382 drawio_cell_id: None,
383 quality: quality_rules,
384 errors: Vec::new(),
385 created_at: chrono::Utc::now(),
386 updated_at: chrono::Utc::now(),
387 };
388
389 info!("Parsed ODCL table: {}", table.name);
390 Ok((table, errors))
391 }
392
393 fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
395 let name = col_data
396 .get("name")
397 .and_then(|v| v.as_str())
398 .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
399 .to_string();
400
401 let data_type = col_data
402 .get("data_type")
403 .and_then(|v| v.as_str())
404 .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
405 .to_string();
406
407 let data_type = normalize_data_type(&data_type);
409
410 let nullable = col_data
411 .get("nullable")
412 .and_then(|v| v.as_bool())
413 .unwrap_or(true);
414
415 let primary_key = col_data
416 .get("primary_key")
417 .and_then(|v| v.as_bool())
418 .unwrap_or(false);
419
420 let foreign_key = col_data
421 .get("foreign_key")
422 .and_then(|v| self.parse_foreign_key(v));
423
424 let constraints = col_data
425 .get("constraints")
426 .and_then(|v| v.as_array())
427 .map(|arr| {
428 arr.iter()
429 .filter_map(|v| v.as_str().map(|s| s.to_string()))
430 .collect()
431 })
432 .unwrap_or_default();
433
434 let description = col_data
435 .get("description")
436 .and_then(|v| v.as_str())
437 .map(|s| s.to_string())
438 .unwrap_or_default();
439
440 let mut column_quality_rules = Vec::new();
442 if let Some(quality_val) = col_data.get("quality") {
443 if let Some(arr) = quality_val.as_array() {
444 for item in arr {
446 if let Some(obj) = item.as_object() {
447 let mut rule = HashMap::new();
448 for (key, value) in obj {
449 rule.insert(key.clone(), json_value_to_serde_value(value));
450 }
451 column_quality_rules.push(rule);
452 }
453 }
454 } else if let Some(obj) = quality_val.as_object() {
455 let mut rule = HashMap::new();
457 for (key, value) in obj {
458 rule.insert(key.clone(), json_value_to_serde_value(value));
459 }
460 column_quality_rules.push(rule);
461 }
462 }
463
464 Ok(Column {
469 name,
470 data_type,
471 nullable,
472 primary_key,
473 foreign_key,
474 constraints,
475 description,
476 quality: column_quality_rules,
477 ..Default::default()
478 })
479 }
480
481 fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
483 let obj = fk_data.as_object()?;
484 Some(ForeignKey {
485 table_id: obj
486 .get("table_id")
487 .or_else(|| obj.get("table"))
488 .and_then(|v| v.as_str())
489 .unwrap_or("")
490 .to_string(),
491 column_name: obj
492 .get("column_name")
493 .or_else(|| obj.get("column"))
494 .and_then(|v| v.as_str())
495 .unwrap_or("")
496 .to_string(),
497 })
498 }
499
500 fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
502 data.get("database_type")
503 .and_then(|v| v.as_str())
504 .and_then(|s| match s.to_uppercase().as_str() {
505 "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
506 "MYSQL" => Some(DatabaseType::Mysql),
507 "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
508 "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
509 "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
510 _ => None,
511 })
512 }
513
514 fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
516 let mut layers = Vec::new();
517
518 if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
520 for item in arr {
521 if let Some(s) = item.as_str()
522 && let Ok(layer) = parse_medallion_layer(s)
523 {
524 layers.push(layer);
525 }
526 }
527 }
528 else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
530 && let Ok(layer) = parse_medallion_layer(s)
531 {
532 layers.push(layer);
533 }
534
535 layers
536 }
537
538 fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
540 data.get("scd_pattern")
541 .and_then(|v| v.as_str())
542 .and_then(|s| parse_scd_pattern(s).ok())
543 }
544
545 fn extract_data_vault_classification(
547 &self,
548 data: &JsonValue,
549 ) -> Option<DataVaultClassification> {
550 data.get("data_vault_classification")
551 .and_then(|v| v.as_str())
552 .and_then(|s| parse_data_vault_classification(s).ok())
553 }
554
555 fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
557 use serde_json::Value;
558 let mut quality_rules = Vec::new();
559
560 if let Some(quality_val) = data.get("quality") {
562 if let Some(arr) = quality_val.as_array() {
563 for item in arr {
565 if let Some(obj) = item.as_object() {
566 let mut rule = HashMap::new();
567 for (key, value) in obj {
568 rule.insert(key.clone(), json_value_to_serde_value(value));
569 }
570 quality_rules.push(rule);
571 }
572 }
573 } else if let Some(obj) = quality_val.as_object() {
574 let mut rule = HashMap::new();
576 for (key, value) in obj {
577 rule.insert(key.clone(), json_value_to_serde_value(value));
578 }
579 quality_rules.push(rule);
580 } else if let Some(s) = quality_val.as_str() {
581 let mut rule = HashMap::new();
583 rule.insert("value".to_string(), Value::String(s.to_string()));
584 quality_rules.push(rule);
585 }
586 }
587
588 if let Some(metadata) = data.get("metadata")
590 && let Some(metadata_obj) = metadata.as_object()
591 && let Some(quality_val) = metadata_obj.get("quality")
592 {
593 if let Some(arr) = quality_val.as_array() {
594 for item in arr {
596 if let Some(obj) = item.as_object() {
597 let mut rule = HashMap::new();
598 for (key, value) in obj {
599 rule.insert(key.clone(), json_value_to_serde_value(value));
600 }
601 quality_rules.push(rule);
602 }
603 }
604 } else if let Some(obj) = quality_val.as_object() {
605 let mut rule = HashMap::new();
607 for (key, value) in obj {
608 rule.insert(key.clone(), json_value_to_serde_value(value));
609 }
610 quality_rules.push(rule);
611 } else if let Some(s) = quality_val.as_str() {
612 let mut rule = HashMap::new();
614 rule.insert("value".to_string(), Value::String(s.to_string()));
615 quality_rules.push(rule);
616 }
617 }
618
619 if let Some(tblprops) = data.get("tblproperties")
621 && let Some(obj) = tblprops.as_object()
622 {
623 for (key, value) in obj {
624 let mut rule = HashMap::new();
625 rule.insert("property".to_string(), Value::String(key.clone()));
626 rule.insert("value".to_string(), json_value_to_serde_value(value));
627 quality_rules.push(rule);
628 }
629 }
630
631 quality_rules
632 }
633
634 fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
642 let mut errors = Vec::new();
656
657 let changelog = data
658 .get("databaseChangeLog")
659 .and_then(|v| v.as_array())
660 .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
661
662 let mut table_name: Option<String> = None;
664 let mut columns: Vec<crate::models::column::Column> = Vec::new();
665
666 for entry in changelog {
667 if let Some(change_set) = entry.get("changeSet") {
669 let changes = if let Some(obj) = change_set.as_object() {
671 obj.get("changes")
672 .and_then(|v| v.as_array())
673 .cloned()
674 .unwrap_or_default()
675 } else if let Some(arr) = change_set.as_array() {
676 arr.clone()
678 } else {
679 Vec::new()
680 };
681
682 for ch in changes {
683 let create = ch.get("createTable").or_else(|| ch.get("create_table"));
684 if let Some(create) = create {
685 table_name = create
686 .get("tableName")
687 .or_else(|| create.get("table_name"))
688 .and_then(|v| v.as_str())
689 .map(|s| s.to_string());
690
691 if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
693 for col_entry in cols {
694 let col = col_entry.get("column").unwrap_or(col_entry);
695 let name = col
696 .get("name")
697 .and_then(|v| v.as_str())
698 .unwrap_or("")
699 .to_string();
700 let data_type = col
701 .get("type")
702 .and_then(|v| v.as_str())
703 .unwrap_or("")
704 .to_string();
705
706 if name.is_empty() {
707 errors.push(ParserError {
708 error_type: "validation_error".to_string(),
709 field: "columns.name".to_string(),
710 message: "Liquibase createTable column missing name"
711 .to_string(),
712 });
713 continue;
714 }
715
716 let mut column =
717 crate::models::column::Column::new(name, data_type);
718
719 if let Some(constraints) =
720 col.get("constraints").and_then(|v| v.as_object())
721 {
722 if let Some(pk) =
723 constraints.get("primaryKey").and_then(|v| v.as_bool())
724 {
725 column.primary_key = pk;
726 }
727 if let Some(nullable) =
728 constraints.get("nullable").and_then(|v| v.as_bool())
729 {
730 column.nullable = nullable;
731 }
732 }
733
734 columns.push(column);
735 }
736 }
737
738 break;
741 }
742 }
743 }
744 if table_name.is_some() {
745 break;
746 }
747 }
748
749 let table_name = table_name
750 .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
751 let table = Table::new(table_name, columns);
752 Ok((table, errors))
754 }
755
756 fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
758 let mut errors = Vec::new();
759
760 let table_name = data
762 .get("name")
763 .and_then(|v| v.as_str())
764 .map(|s| s.to_string())
765 .or_else(|| {
766 data.get("schema")
768 .and_then(|v| v.as_array())
769 .and_then(|arr| arr.first())
770 .and_then(|obj| obj.as_object())
771 .and_then(|obj| obj.get("name"))
772 .and_then(|v| v.as_str())
773 .map(|s| s.to_string())
774 })
775 .ok_or_else(|| {
776 anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
777 })?;
778
779 let schema = data
781 .get("schema")
782 .and_then(|v| v.as_array())
783 .ok_or_else(|| {
784 errors.push(ParserError {
785 error_type: "validation_error".to_string(),
786 field: "schema".to_string(),
787 message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
788 });
789 anyhow::anyhow!("Missing schema")
790 });
791
792 let schema = match schema {
793 Ok(s) if s.is_empty() => {
794 errors.push(ParserError {
795 error_type: "validation_error".to_string(),
796 field: "schema".to_string(),
797 message: "ODCS v3.0.x schema array is empty".to_string(),
798 });
799 let quality_rules = self.extract_quality_rules(data);
800 let table_uuid = self.extract_table_uuid(data);
801 let table = Table {
802 id: table_uuid,
803 name: table_name,
804 columns: Vec::new(),
805 database_type: None,
806 catalog_name: None,
807 schema_name: None,
808 medallion_layers: Vec::new(),
809 scd_pattern: None,
810 data_vault_classification: None,
811 modeling_level: None,
812 tags: Vec::<Tag>::new(),
813 odcl_metadata: HashMap::new(),
814 owner: None,
815 sla: None,
816 contact_details: None,
817 infrastructure_type: None,
818 notes: None,
819 position: None,
820 yaml_file_path: None,
821 drawio_cell_id: None,
822 quality: quality_rules,
823 errors: Vec::new(),
824 created_at: chrono::Utc::now(),
825 updated_at: chrono::Utc::now(),
826 };
827 return Ok((table, errors));
828 }
829 Ok(s) => s,
830 Err(_) => {
831 let quality_rules = self.extract_quality_rules(data);
832 let table_uuid = self.extract_table_uuid(data);
833 let table = Table {
834 id: table_uuid,
835 name: table_name,
836 columns: Vec::new(),
837 database_type: None,
838 catalog_name: None,
839 schema_name: None,
840 medallion_layers: Vec::new(),
841 scd_pattern: None,
842 data_vault_classification: None,
843 modeling_level: None,
844 tags: Vec::<Tag>::new(),
845 odcl_metadata: HashMap::new(),
846 owner: None,
847 sla: None,
848 contact_details: None,
849 infrastructure_type: None,
850 notes: None,
851 position: None,
852 yaml_file_path: None,
853 drawio_cell_id: None,
854 quality: quality_rules,
855 errors: Vec::new(),
856 created_at: chrono::Utc::now(),
857 updated_at: chrono::Utc::now(),
858 };
859 return Ok((table, errors));
860 }
861 };
862
863 let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
865 errors.push(ParserError {
866 error_type: "validation_error".to_string(),
867 field: "schema[0]".to_string(),
868 message: "First schema object must be a dictionary".to_string(),
869 });
870 anyhow::anyhow!("Invalid schema object")
871 })?;
872
873 let object_name = schema_object
874 .get("name")
875 .and_then(|v| v.as_str())
876 .unwrap_or(&table_name);
877
878 let mut columns = Vec::new();
880
881 if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
882 for (prop_name, prop_data) in properties_obj {
884 if let Some(prop_obj) = prop_data.as_object() {
885 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
886 Ok(mut cols) => columns.append(&mut cols),
887 Err(e) => {
888 errors.push(ParserError {
889 error_type: "property_parse_error".to_string(),
890 field: format!("Property '{}'", prop_name),
891 message: e.to_string(),
892 });
893 }
894 }
895 } else {
896 errors.push(ParserError {
897 error_type: "validation_error".to_string(),
898 field: format!("Property '{}'", prop_name),
899 message: format!("Property '{}' must be an object", prop_name),
900 });
901 }
902 }
903 } else if let Some(properties_arr) =
904 schema_object.get("properties").and_then(|v| v.as_array())
905 {
906 for (idx, prop_data) in properties_arr.iter().enumerate() {
908 if let Some(prop_obj) = prop_data.as_object() {
909 let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
912 Some(JsonValue::String(s)) => s.as_str(),
913 _ => {
914 errors.push(ParserError {
916 error_type: "validation_error".to_string(),
917 field: format!("Property[{}]", idx),
918 message: format!(
919 "Property[{}] missing required 'name' or 'id' field",
920 idx
921 ),
922 });
923 continue;
924 }
925 };
926
927 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
928 Ok(mut cols) => columns.append(&mut cols),
929 Err(e) => {
930 errors.push(ParserError {
931 error_type: "property_parse_error".to_string(),
932 field: format!("Property[{}] '{}'", idx, prop_name),
933 message: e.to_string(),
934 });
935 }
936 }
937 } else {
938 errors.push(ParserError {
939 error_type: "validation_error".to_string(),
940 field: format!("Property[{}]", idx),
941 message: format!("Property[{}] must be an object", idx),
942 });
943 }
944 }
945 } else {
946 errors.push(ParserError {
947 error_type: "validation_error".to_string(),
948 field: format!("Object '{}'", object_name),
949 message: format!(
950 "Object '{}' missing 'properties' field or properties is invalid",
951 object_name
952 ),
953 });
954 }
955
956 let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
958 _,
959 _,
960 _,
961 Vec<Tag>,
962 ) = self.extract_metadata_from_custom_properties(data);
963
964 let mut shared_domains: Vec<String> = Vec::new();
966 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
967 for prop in custom_props {
968 if let Some(prop_obj) = prop.as_object() {
969 let prop_key = prop_obj
970 .get("property")
971 .and_then(|v| v.as_str())
972 .unwrap_or("");
973 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
974 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
975 {
976 for item in arr {
977 if let Some(s) = item.as_str() {
978 shared_domains.push(s.to_string());
979 }
980 }
981 }
982 }
983 }
984 }
985
986 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
988 for item in tags_arr {
989 if let Some(s) = item.as_str() {
990 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
992 if !tags.contains(&tag) {
993 tags.push(tag);
994 }
995 }
996 }
997 }
998
999 let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1001
1002 let quality_rules = self.extract_quality_rules(data);
1004
1005 let mut odcl_metadata = HashMap::new();
1007 odcl_metadata.insert(
1008 "apiVersion".to_string(),
1009 json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
1010 );
1011 odcl_metadata.insert(
1012 "kind".to_string(),
1013 json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
1014 );
1015 odcl_metadata.insert(
1016 "id".to_string(),
1017 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1018 );
1019 odcl_metadata.insert(
1020 "version".to_string(),
1021 json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
1022 );
1023 odcl_metadata.insert(
1024 "status".to_string(),
1025 json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
1026 );
1027
1028 if let Some(servicelevels_val) = data.get("servicelevels") {
1030 odcl_metadata.insert(
1031 "servicelevels".to_string(),
1032 json_value_to_serde_value(servicelevels_val),
1033 );
1034 }
1035
1036 if let Some(links_val) = data.get("links") {
1038 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1039 }
1040
1041 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1043 odcl_metadata.insert(
1044 "domain".to_string(),
1045 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1046 );
1047 }
1048 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1049 odcl_metadata.insert(
1050 "dataProduct".to_string(),
1051 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1052 );
1053 }
1054 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1055 odcl_metadata.insert(
1056 "tenant".to_string(),
1057 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1058 );
1059 }
1060
1061 if let Some(desc_val) = data.get("description") {
1063 odcl_metadata.insert(
1064 "description".to_string(),
1065 json_value_to_serde_value(desc_val),
1066 );
1067 }
1068
1069 if let Some(pricing_val) = data.get("pricing") {
1071 odcl_metadata.insert(
1072 "pricing".to_string(),
1073 json_value_to_serde_value(pricing_val),
1074 );
1075 }
1076
1077 if let Some(team_val) = data.get("team") {
1079 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1080 }
1081
1082 if let Some(roles_val) = data.get("roles") {
1084 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1085 }
1086
1087 if let Some(terms_val) = data.get("terms") {
1089 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1090 }
1091
1092 if let Some(servers_val) = data.get("servers") {
1094 odcl_metadata.insert(
1095 "servers".to_string(),
1096 json_value_to_serde_value(servers_val),
1097 );
1098 }
1099
1100 if let Some(infrastructure_val) = data.get("infrastructure") {
1102 odcl_metadata.insert(
1103 "infrastructure".to_string(),
1104 json_value_to_serde_value(infrastructure_val),
1105 );
1106 }
1107
1108 if !shared_domains.is_empty() {
1110 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1111 .iter()
1112 .map(|d| serde_json::Value::String(d.clone()))
1113 .collect();
1114 odcl_metadata.insert(
1115 "sharedDomains".to_string(),
1116 serde_json::Value::Array(shared_domains_json),
1117 );
1118 }
1119
1120 let table_uuid = self.extract_table_uuid(data);
1121
1122 let table = Table {
1123 id: table_uuid,
1124 name: table_name,
1125 columns,
1126 database_type,
1127 catalog_name: None,
1128 schema_name: None,
1129 medallion_layers,
1130 scd_pattern,
1131 data_vault_classification,
1132 modeling_level: None,
1133 tags,
1134 odcl_metadata,
1135 owner: None,
1136 sla: None,
1137 contact_details: None,
1138 infrastructure_type: None,
1139 notes: None,
1140 position: None,
1141 yaml_file_path: None,
1142 drawio_cell_id: None,
1143 quality: quality_rules,
1144 errors: Vec::new(),
1145 created_at: chrono::Utc::now(),
1146 updated_at: chrono::Utc::now(),
1147 };
1148
1149 info!(
1150 "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1151 table.name,
1152 errors.len()
1153 );
1154 Ok((table, errors))
1155 }
1156
1157 fn parse_odcl_v3_property(
1159 &self,
1160 prop_name: &str,
1161 prop_data: &serde_json::Map<String, JsonValue>,
1162 data: &JsonValue,
1163 ) -> Result<Vec<Column>> {
1164 let mut errors = Vec::new();
1166 self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1167 }
1168
1169 fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1172 if let Some(id_val) = data.get("id")
1174 && let Some(id_str) = id_val.as_str()
1175 {
1176 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1177 tracing::debug!(
1178 "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1179 uuid
1180 );
1181 return uuid;
1182 } else {
1183 tracing::warn!(
1184 "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1185 id_str
1186 );
1187 }
1188 }
1189
1190 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1192 for prop in custom_props {
1193 if let Some(prop_obj) = prop.as_object() {
1194 let prop_key = prop_obj
1195 .get("property")
1196 .and_then(|v| v.as_str())
1197 .unwrap_or("");
1198 if prop_key == "tableUuid"
1199 && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1200 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1201 {
1202 tracing::debug!(
1203 "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1204 uuid
1205 );
1206 return uuid;
1207 }
1208 }
1209 }
1210 }
1211
1212 if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1214 && let Some(uuid_val) = metadata.get("tableUuid")
1215 && let Some(uuid_str) = uuid_val.as_str()
1216 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1217 {
1218 tracing::debug!(
1219 "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1220 uuid
1221 );
1222 return uuid;
1223 }
1224
1225 let table_name = data
1227 .get("name")
1228 .and_then(|v| v.as_str())
1229 .unwrap_or("unknown");
1230 let new_uuid = crate::models::table::Table::generate_id(
1231 table_name, None, None, None, );
1235 tracing::warn!(
1236 "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1237 table_name,
1238 new_uuid
1239 );
1240 new_uuid
1241 }
1242
1243 fn extract_metadata_from_custom_properties(
1245 &self,
1246 data: &JsonValue,
1247 ) -> (
1248 Vec<MedallionLayer>,
1249 Option<SCDPattern>,
1250 Option<DataVaultClassification>,
1251 Vec<Tag>,
1252 ) {
1253 let mut medallion_layers = Vec::new();
1254 let mut scd_pattern = None;
1255 let mut data_vault_classification = None;
1256 let mut tags: Vec<Tag> = Vec::new();
1257
1258 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1259 for prop in custom_props {
1260 if let Some(prop_obj) = prop.as_object() {
1261 let prop_key = prop_obj
1262 .get("property")
1263 .and_then(|v| v.as_str())
1264 .unwrap_or("");
1265 let prop_value = prop_obj.get("value");
1266
1267 match prop_key {
1268 "medallionLayers" | "medallion_layers" => {
1269 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1270 for item in arr {
1271 if let Some(s) = item.as_str()
1272 && let Ok(layer) = parse_medallion_layer(s)
1273 {
1274 medallion_layers.push(layer);
1275 }
1276 }
1277 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1278 for part in s.split(',') {
1280 if let Ok(layer) = parse_medallion_layer(part.trim()) {
1281 medallion_layers.push(layer);
1282 }
1283 }
1284 }
1285 }
1286 "scdPattern" | "scd_pattern" => {
1287 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1288 scd_pattern = parse_scd_pattern(s).ok();
1289 }
1290 }
1291 "dataVaultClassification" | "data_vault_classification" => {
1292 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1293 data_vault_classification = parse_data_vault_classification(s).ok();
1294 }
1295 }
1296 "tags" => {
1297 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1298 for item in arr {
1299 if let Some(s) = item.as_str() {
1300 if let Ok(tag) = Tag::from_str(s) {
1302 tags.push(tag);
1303 } else {
1304 tags.push(Tag::Simple(s.to_string()));
1305 }
1306 }
1307 }
1308 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1309 for part in s.split(',') {
1311 let part = part.trim();
1312 if let Ok(tag) = Tag::from_str(part) {
1313 tags.push(tag);
1314 } else {
1315 tags.push(Tag::Simple(part.to_string()));
1316 }
1317 }
1318 }
1319 }
1320 "sharedDomains" | "shared_domains" => {
1321 }
1324 _ => {}
1325 }
1326 }
1327 }
1328 }
1329
1330 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1332 for item in tags_arr {
1333 if let Some(s) = item.as_str() {
1334 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1336 if !tags.contains(&tag) {
1337 tags.push(tag);
1338 }
1339 }
1340 }
1341 }
1342
1343 (
1344 medallion_layers,
1345 scd_pattern,
1346 data_vault_classification,
1347 tags,
1348 )
1349 }
1350
1351 fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1353 if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1355 && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1356 {
1357 return server_obj
1358 .get("type")
1359 .and_then(|v| v.as_str())
1360 .and_then(|s| self.parse_database_type(s));
1361 }
1362 None
1363 }
1364
1365 fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1367 let mut errors = Vec::new();
1368
1369 let models = data
1371 .get("models")
1372 .and_then(|v| v.as_object())
1373 .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1374
1375 let (model_name, model_data) = models
1378 .iter()
1379 .next()
1380 .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1381
1382 let model_data = model_data
1383 .as_object()
1384 .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1385
1386 let fields = model_data
1388 .get("fields")
1389 .and_then(|v| v.as_object())
1390 .ok_or_else(|| {
1391 errors.push(ParserError {
1392 error_type: "validation_error".to_string(),
1393 field: format!("Model '{}'", model_name),
1394 message: format!("Model '{}' missing 'fields' field", model_name),
1395 });
1396 anyhow::anyhow!("Missing fields")
1397 });
1398
1399 let fields = match fields {
1400 Ok(f) => f,
1401 Err(_) => {
1402 let quality_rules = self.extract_quality_rules(data);
1404 let table_uuid = self.extract_table_uuid(data);
1405 let table = Table {
1406 id: table_uuid,
1407 name: model_name.clone(),
1408 columns: Vec::new(),
1409 database_type: None,
1410 catalog_name: None,
1411 schema_name: None,
1412 medallion_layers: Vec::new(),
1413 scd_pattern: None,
1414 data_vault_classification: None,
1415 modeling_level: None,
1416 tags: Vec::<Tag>::new(),
1417 odcl_metadata: HashMap::new(),
1418 owner: None,
1419 sla: None,
1420 contact_details: None,
1421 infrastructure_type: None,
1422 notes: None,
1423 position: None,
1424 yaml_file_path: None,
1425 drawio_cell_id: None,
1426 quality: quality_rules,
1427 errors: Vec::new(),
1428 created_at: chrono::Utc::now(),
1429 updated_at: chrono::Utc::now(),
1430 };
1431 return Ok((table, errors));
1432 }
1433 };
1434
1435 let mut columns = Vec::new();
1437 for (field_name, field_data) in fields {
1438 if let Some(field_obj) = field_data.as_object() {
1439 match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1440 Ok(mut cols) => columns.append(&mut cols),
1441 Err(e) => {
1442 errors.push(ParserError {
1443 error_type: "field_parse_error".to_string(),
1444 field: format!("Field '{}'", field_name),
1445 message: e.to_string(),
1446 });
1447 }
1448 }
1449 } else {
1450 errors.push(ParserError {
1451 error_type: "validation_error".to_string(),
1452 field: format!("Field '{}'", field_name),
1453 message: format!("Field '{}' must be an object", field_name),
1454 });
1455 }
1456 }
1457
1458 let mut odcl_metadata = HashMap::new();
1461
1462 if let Some(info_val) = data.get("info") {
1465 let info_json_value = json_value_to_serde_value(info_val);
1467 odcl_metadata.insert("info".to_string(), info_json_value);
1468 }
1469
1470 odcl_metadata.insert(
1471 "dataContractSpecification".to_string(),
1472 json_value_to_serde_value(
1473 data.get("dataContractSpecification")
1474 .unwrap_or(&JsonValue::Null),
1475 ),
1476 );
1477 odcl_metadata.insert(
1478 "id".to_string(),
1479 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1480 );
1481 if let Some(servicelevels_val) = data.get("servicelevels") {
1485 odcl_metadata.insert(
1486 "servicelevels".to_string(),
1487 json_value_to_serde_value(servicelevels_val),
1488 );
1489 }
1490
1491 if let Some(links_val) = data.get("links") {
1493 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1494 }
1495
1496 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1498 odcl_metadata.insert(
1499 "domain".to_string(),
1500 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1501 );
1502 }
1503 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1504 odcl_metadata.insert(
1505 "dataProduct".to_string(),
1506 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1507 );
1508 }
1509 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1510 odcl_metadata.insert(
1511 "tenant".to_string(),
1512 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1513 );
1514 }
1515
1516 if let Some(desc_val) = data.get("description") {
1518 odcl_metadata.insert(
1519 "description".to_string(),
1520 json_value_to_serde_value(desc_val),
1521 );
1522 }
1523
1524 if let Some(pricing_val) = data.get("pricing") {
1526 odcl_metadata.insert(
1527 "pricing".to_string(),
1528 json_value_to_serde_value(pricing_val),
1529 );
1530 }
1531
1532 if let Some(team_val) = data.get("team") {
1534 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1535 }
1536
1537 if let Some(roles_val) = data.get("roles") {
1539 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1540 }
1541
1542 if let Some(terms_val) = data.get("terms") {
1544 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1545 }
1546
1547 if let Some(servers_val) = data.get("servers") {
1549 odcl_metadata.insert(
1550 "servers".to_string(),
1551 json_value_to_serde_value(servers_val),
1552 );
1553 }
1554
1555 if let Some(infrastructure_val) = data.get("infrastructure") {
1557 odcl_metadata.insert(
1558 "infrastructure".to_string(),
1559 json_value_to_serde_value(infrastructure_val),
1560 );
1561 }
1562
1563 let database_type = self.extract_database_type_from_servers(data);
1565
1566 let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1568
1569 let mut shared_domains: Vec<String> = Vec::new();
1571 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1572 for prop in custom_props {
1573 if let Some(prop_obj) = prop.as_object() {
1574 let prop_key = prop_obj
1575 .get("property")
1576 .and_then(|v| v.as_str())
1577 .unwrap_or("");
1578 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1579 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1580 {
1581 for item in arr {
1582 if let Some(s) = item.as_str() {
1583 shared_domains.push(s.to_string());
1584 }
1585 }
1586 }
1587 }
1588 }
1589 }
1590
1591 let mut tags: Vec<Tag> = Vec::new();
1593 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1594 for item in tags_arr {
1595 if let Some(s) = item.as_str() {
1596 if let Ok(tag) = Tag::from_str(s) {
1598 tags.push(tag);
1599 } else {
1600 tags.push(crate::models::Tag::Simple(s.to_string()));
1602 }
1603 }
1604 }
1605 }
1606
1607 let quality_rules = self.extract_quality_rules(data);
1609
1610 if !shared_domains.is_empty() {
1612 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1613 .iter()
1614 .map(|d| serde_json::Value::String(d.clone()))
1615 .collect();
1616 odcl_metadata.insert(
1617 "sharedDomains".to_string(),
1618 serde_json::Value::Array(shared_domains_json),
1619 );
1620 }
1621
1622 let table_uuid = self.extract_table_uuid(data);
1623
1624 let table = Table {
1625 id: table_uuid,
1626 name: model_name.clone(),
1627 columns,
1628 database_type,
1629 catalog_name,
1630 schema_name,
1631 medallion_layers: Vec::new(),
1632 scd_pattern: None,
1633 data_vault_classification: None,
1634 modeling_level: None,
1635 tags,
1636 odcl_metadata,
1637 owner: None,
1638 sla: None,
1639 contact_details: None,
1640 infrastructure_type: None,
1641 notes: None,
1642 position: None,
1643 yaml_file_path: None,
1644 drawio_cell_id: None,
1645 quality: quality_rules,
1646 errors: Vec::new(),
1647 created_at: chrono::Utc::now(),
1648 updated_at: chrono::Utc::now(),
1649 };
1650
1651 info!(
1652 "Parsed Data Contract table: {} with {} warnings/errors",
1653 model_name,
1654 errors.len()
1655 );
1656 Ok((table, errors))
1657 }
1658
1659 fn parse_data_contract_field(
1661 &self,
1662 field_name: &str,
1663 field_data: &serde_json::Map<String, JsonValue>,
1664 data: &JsonValue,
1665 errors: &mut Vec<ParserError>,
1666 ) -> Result<Vec<Column>> {
1667 let mut columns = Vec::new();
1668
1669 let extract_quality_from_obj =
1671 |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1672 let mut quality_rules = Vec::new();
1673 if let Some(quality_val) = obj.get("quality") {
1674 if let Some(arr) = quality_val.as_array() {
1675 for item in arr {
1677 if let Some(rule_obj) = item.as_object() {
1678 let mut rule = HashMap::new();
1679 for (key, value) in rule_obj {
1680 rule.insert(key.clone(), json_value_to_serde_value(value));
1681 }
1682 quality_rules.push(rule);
1683 }
1684 }
1685 } else if let Some(rule_obj) = quality_val.as_object() {
1686 let mut rule = HashMap::new();
1688 for (key, value) in rule_obj {
1689 rule.insert(key.clone(), json_value_to_serde_value(value));
1690 }
1691 quality_rules.push(rule);
1692 }
1693 }
1694 quality_rules
1695 };
1696
1697 let description = field_data
1699 .get("description")
1700 .and_then(|v| v.as_str())
1701 .unwrap_or("")
1702 .to_string();
1703
1704 let mut quality_rules = extract_quality_from_obj(field_data);
1706
1707 if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1709 let ref_path = Some(ref_str.to_string());
1711
1712 if let Some(definition) = resolve_ref(ref_str, data) {
1713 if quality_rules.is_empty() {
1717 if let Some(def_obj) = definition.as_object() {
1718 quality_rules = extract_quality_from_obj(def_obj);
1719 }
1720 } else {
1721 if let Some(def_obj) = definition.as_object() {
1723 let def_quality = extract_quality_from_obj(def_obj);
1724 quality_rules.extend(def_quality);
1726 }
1727 }
1728
1729 let required = field_data
1730 .get("required")
1731 .and_then(|v| v.as_bool())
1732 .unwrap_or(false);
1733
1734 let has_nested = definition
1740 .get("type")
1741 .and_then(|v| v.as_str())
1742 .map(|s| s == "object")
1743 .unwrap_or(false)
1744 || definition.get("properties").is_some()
1745 || definition.get("fields").is_some();
1746
1747 if has_nested {
1748 if let Some(properties) =
1750 definition.get("properties").and_then(|v| v.as_object())
1751 {
1752 let nested_required: Vec<String> = definition
1754 .get("required")
1755 .and_then(|v| v.as_array())
1756 .map(|arr| {
1757 arr.iter()
1758 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1759 .collect()
1760 })
1761 .unwrap_or_default();
1762
1763 for (nested_name, nested_schema) in properties {
1764 let nested_required_field = nested_required.contains(nested_name);
1765 expand_nested_column(
1766 &format!("{}.{}", field_name, nested_name),
1767 nested_schema,
1768 !nested_required_field,
1769 &mut columns,
1770 errors,
1771 );
1772 }
1773 } else if let Some(fields) =
1774 definition.get("fields").and_then(|v| v.as_object())
1775 {
1776 for (nested_name, nested_schema) in fields {
1778 expand_nested_column(
1779 &format!("{}.{}", field_name, nested_name),
1780 nested_schema,
1781 true, &mut columns,
1783 errors,
1784 );
1785 }
1786 } else {
1787 let def_physical_type = definition
1789 .get("physicalType")
1790 .and_then(|v| v.as_str())
1791 .map(|s| s.to_string());
1792 columns.push(Column {
1793 name: field_name.to_string(),
1794 data_type: "OBJECT".to_string(),
1795 physical_type: def_physical_type,
1796 nullable: !required,
1797 description: if description.is_empty() {
1798 definition
1799 .get("description")
1800 .and_then(|v| v.as_str())
1801 .unwrap_or("")
1802 .to_string()
1803 } else {
1804 description.clone()
1805 },
1806 quality: quality_rules.clone(),
1807 relationships: ref_to_relationships(&ref_path),
1808 ..Default::default()
1809 });
1810 }
1811 } else {
1812 let def_type = definition
1814 .get("type")
1815 .and_then(|v| v.as_str())
1816 .unwrap_or("STRING")
1817 .to_uppercase();
1818
1819 let enum_values = definition
1820 .get("enum")
1821 .and_then(|v| v.as_array())
1822 .map(|arr| {
1823 arr.iter()
1824 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1825 .collect()
1826 })
1827 .unwrap_or_default();
1828
1829 let def_physical_type = definition
1830 .get("physicalType")
1831 .and_then(|v| v.as_str())
1832 .map(|s| s.to_string());
1833
1834 columns.push(Column {
1835 name: field_name.to_string(),
1836 data_type: def_type,
1837 physical_type: def_physical_type,
1838 nullable: !required,
1839 description: if description.is_empty() {
1840 definition
1841 .get("description")
1842 .and_then(|v| v.as_str())
1843 .unwrap_or("")
1844 .to_string()
1845 } else {
1846 description
1847 },
1848 quality: quality_rules,
1849 relationships: ref_to_relationships(&ref_path),
1850 enum_values,
1851 ..Default::default()
1852 });
1853 }
1854 return Ok(columns);
1855 } else {
1856 let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
1858 let mut error_map = HashMap::new();
1859 error_map.insert("type".to_string(), serde_json::json!("validation_error"));
1860 error_map.insert("field".to_string(), serde_json::json!("data_type"));
1861 error_map.insert(
1862 "message".to_string(),
1863 serde_json::json!(format!(
1864 "Field '{}' references undefined definition: {}",
1865 field_name, ref_str
1866 )),
1867 );
1868 col_errors.push(error_map);
1869 let field_physical_type = field_data
1870 .get("physicalType")
1871 .and_then(|v| v.as_str())
1872 .map(|s| s.to_string());
1873 columns.push(Column {
1874 name: field_name.to_string(),
1875 data_type: "OBJECT".to_string(),
1876 physical_type: field_physical_type,
1877 description,
1878 errors: col_errors,
1879 relationships: ref_to_relationships(&Some(ref_str.to_string())),
1880 ..Default::default()
1881 });
1882 return Ok(columns);
1883 }
1884 }
1885
1886 let field_type_str = field_data
1889 .get("logicalType")
1890 .and_then(|v| v.as_str())
1891 .or_else(|| field_data.get("type").and_then(|v| v.as_str()))
1892 .unwrap_or("STRING");
1893
1894 if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
1896 match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
1897 Ok(nested_cols) if !nested_cols.is_empty() => {
1898 let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
1900 "ARRAY<STRUCT<...>>".to_string()
1901 } else {
1902 "STRUCT<...>".to_string()
1903 };
1904
1905 let struct_physical_type = field_data
1906 .get("physicalType")
1907 .and_then(|v| v.as_str())
1908 .map(|s| s.to_string());
1909
1910 columns.push(Column {
1912 name: field_name.to_string(),
1913 data_type: parent_data_type,
1914 physical_type: struct_physical_type,
1915 nullable: !field_data
1916 .get("required")
1917 .and_then(|v| v.as_bool())
1918 .unwrap_or(false),
1919 description: description.clone(),
1920 quality: quality_rules.clone(),
1921 relationships: ref_to_relationships(
1922 &field_data
1923 .get("$ref")
1924 .and_then(|v| v.as_str())
1925 .map(|s| s.to_string()),
1926 ),
1927 ..Default::default()
1928 });
1929
1930 columns.extend(nested_cols);
1932 return Ok(columns);
1933 }
1934 Ok(_) | Err(_) => {
1935 }
1937 }
1938 }
1939
1940 let field_type = normalize_data_type(field_type_str);
1941
1942 if field_type == "ARRAY" {
1944 let items = field_data.get("items");
1945 if let Some(items_val) = items {
1946 if let Some(items_obj) = items_val.as_object() {
1947 let items_type = items_obj
1950 .get("logicalType")
1951 .and_then(|v| v.as_str())
1952 .or_else(|| items_obj.get("type").and_then(|v| v.as_str()));
1953
1954 let normalized_items_type = match items_type {
1956 Some("object") | Some("struct") => Some("object"),
1957 Some("array") => Some("array"),
1958 Some("string") | Some("varchar") | Some("char") | Some("text") => {
1959 Some("string")
1960 }
1961 Some("integer") | Some("int") | Some("bigint") | Some("smallint")
1962 | Some("tinyint") => Some("integer"),
1963 Some("number") | Some("decimal") | Some("double") | Some("float")
1964 | Some("numeric") => Some("number"),
1965 Some("boolean") | Some("bool") => Some("boolean"),
1966 Some("date") => Some("date"),
1967 Some("timestamp") | Some("datetime") => Some("timestamp"),
1968 Some("time") => Some("time"),
1969 other => other,
1970 };
1971
1972 if items_obj.get("fields").is_some()
1973 || items_obj.get("properties").is_some()
1974 || normalized_items_type == Some("object")
1975 {
1976 let array_physical_type = field_data
1978 .get("physicalType")
1979 .and_then(|v| v.as_str())
1980 .map(|s| s.to_string());
1981 columns.push(Column {
1982 name: field_name.to_string(),
1983 data_type: "ARRAY<OBJECT>".to_string(),
1984 physical_type: array_physical_type,
1985 nullable: !field_data
1986 .get("required")
1987 .and_then(|v| v.as_bool())
1988 .unwrap_or(false),
1989 description: field_data
1990 .get("description")
1991 .and_then(|v| v.as_str())
1992 .unwrap_or("")
1993 .to_string(),
1994 ..Default::default()
1995 });
1996
1997 let properties_obj =
2000 items_obj.get("properties").and_then(|v| v.as_object());
2001 let properties_arr = items_obj.get("properties").and_then(|v| v.as_array());
2002 let fields_obj = items_obj.get("fields").and_then(|v| v.as_object());
2003
2004 if let Some(fields_map) = properties_obj.or(fields_obj) {
2005 for (nested_field_name, nested_field_data) in fields_map {
2007 if let Some(nested_field_obj) = nested_field_data.as_object() {
2008 let nested_field_type = nested_field_obj
2010 .get("logicalType")
2011 .and_then(|v| v.as_str())
2012 .or_else(|| {
2013 nested_field_obj.get("type").and_then(|v| v.as_str())
2014 })
2015 .unwrap_or("STRING");
2016
2017 let nested_col_name =
2019 format!("{}.[].{}", field_name, nested_field_name);
2020 let mut local_errors = Vec::new();
2021 match self.parse_data_contract_field(
2022 &nested_col_name,
2023 nested_field_obj,
2024 data,
2025 &mut local_errors,
2026 ) {
2027 Ok(mut nested_cols) => {
2028 columns.append(&mut nested_cols);
2031 }
2032 Err(_) => {
2033 let nested_physical_type = nested_field_obj
2035 .get("physicalType")
2036 .and_then(|v| v.as_str())
2037 .map(|s| s.to_string());
2038 columns.push(Column {
2039 name: nested_col_name,
2040 data_type: nested_field_type.to_uppercase(),
2041 physical_type: nested_physical_type,
2042 nullable: !nested_field_obj
2043 .get("required")
2044 .and_then(|v| v.as_bool())
2045 .unwrap_or(false),
2046 description: nested_field_obj
2047 .get("description")
2048 .and_then(|v| v.as_str())
2049 .unwrap_or("")
2050 .to_string(),
2051 ..Default::default()
2052 });
2053 }
2054 }
2055 }
2056 }
2057 } else if let Some(properties_list) = properties_arr {
2058 let mut local_errors = Vec::new();
2060 for prop_data in properties_list {
2061 if let Some(prop_obj) = prop_data.as_object() {
2062 let nested_field_name = prop_obj
2064 .get("name")
2065 .or_else(|| prop_obj.get("id"))
2066 .and_then(|v| v.as_str())
2067 .unwrap_or("");
2068
2069 if !nested_field_name.is_empty() {
2070 let nested_col_name =
2072 format!("{}.[].{}", field_name, nested_field_name);
2073 match self.parse_data_contract_field(
2074 &nested_col_name,
2075 prop_obj,
2076 data,
2077 &mut local_errors,
2078 ) {
2079 Ok(mut nested_cols) => {
2080 columns.append(&mut nested_cols);
2083 }
2084 Err(_) => {
2085 let nested_field_type = prop_obj
2088 .get("logicalType")
2089 .and_then(|v| v.as_str())
2090 .or_else(|| {
2091 prop_obj
2092 .get("type")
2093 .and_then(|v| v.as_str())
2094 })
2095 .unwrap_or("STRING");
2096 let nested_physical_type = prop_obj
2097 .get("physicalType")
2098 .and_then(|v| v.as_str())
2099 .map(|s| s.to_string());
2100 columns.push(Column {
2101 name: nested_col_name,
2102 data_type: nested_field_type.to_uppercase(),
2103 physical_type: nested_physical_type,
2104 nullable: !prop_obj
2105 .get("required")
2106 .and_then(|v| v.as_bool())
2107 .unwrap_or(false),
2108 description: prop_obj
2109 .get("description")
2110 .and_then(|v| v.as_str())
2111 .unwrap_or("")
2112 .to_string(),
2113 ..Default::default()
2114 });
2115 }
2116 }
2117 }
2118 }
2119 }
2120 }
2121
2122 return Ok(columns);
2123 } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2124 let array_physical_type = field_data
2126 .get("physicalType")
2127 .and_then(|v| v.as_str())
2128 .map(|s| s.to_string());
2129 columns.push(Column {
2130 name: field_name.to_string(),
2131 data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2132 physical_type: array_physical_type,
2133 nullable: !field_data
2134 .get("required")
2135 .and_then(|v| v.as_bool())
2136 .unwrap_or(false),
2137 description: description.clone(),
2138 quality: quality_rules.clone(),
2139 relationships: ref_to_relationships(
2140 &field_data
2141 .get("$ref")
2142 .and_then(|v| v.as_str())
2143 .map(|s| s.to_string()),
2144 ),
2145 ..Default::default()
2146 });
2147 return Ok(columns);
2148 }
2149 } else if let Some(item_type_str) = items_val.as_str() {
2150 let array_physical_type = field_data
2152 .get("physicalType")
2153 .and_then(|v| v.as_str())
2154 .map(|s| s.to_string());
2155 columns.push(Column {
2156 name: field_name.to_string(),
2157 data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2158 physical_type: array_physical_type,
2159 nullable: !field_data
2160 .get("required")
2161 .and_then(|v| v.as_bool())
2162 .unwrap_or(false),
2163 description: description.clone(),
2164 quality: quality_rules.clone(),
2165 relationships: ref_to_relationships(
2166 &field_data
2167 .get("$ref")
2168 .and_then(|v| v.as_str())
2169 .map(|s| s.to_string()),
2170 ),
2171 ..Default::default()
2172 });
2173 return Ok(columns);
2174 }
2175 }
2176 let array_physical_type = field_data
2178 .get("physicalType")
2179 .and_then(|v| v.as_str())
2180 .map(|s| s.to_string());
2181 columns.push(Column {
2182 name: field_name.to_string(),
2183 data_type: "ARRAY<STRING>".to_string(),
2184 physical_type: array_physical_type,
2185 nullable: !field_data
2186 .get("required")
2187 .and_then(|v| v.as_bool())
2188 .unwrap_or(false),
2189 description: description.clone(),
2190 quality: quality_rules.clone(),
2191 relationships: ref_to_relationships(
2192 &field_data
2193 .get("$ref")
2194 .and_then(|v| v.as_str())
2195 .map(|s| s.to_string()),
2196 ),
2197 ..Default::default()
2198 });
2199 return Ok(columns);
2200 }
2201
2202 let nested_fields_obj = field_data
2206 .get("properties")
2207 .and_then(|v| v.as_object())
2208 .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2209 let nested_fields_arr = field_data.get("properties").and_then(|v| v.as_array());
2210
2211 if field_type == "OBJECT" && (nested_fields_obj.is_some() || nested_fields_arr.is_some()) {
2212 let parent_physical_type = field_data
2216 .get("physicalType")
2217 .and_then(|v| v.as_str())
2218 .map(|s| s.to_string());
2219
2220 columns.push(Column {
2222 name: field_name.to_string(),
2223 data_type: "OBJECT".to_string(),
2224 physical_type: parent_physical_type,
2225 nullable: !field_data
2226 .get("required")
2227 .and_then(|v| v.as_bool())
2228 .unwrap_or(false),
2229 description: description.clone(),
2230 quality: quality_rules.clone(),
2231 relationships: ref_to_relationships(
2232 &field_data
2233 .get("$ref")
2234 .and_then(|v| v.as_str())
2235 .map(|s| s.to_string()),
2236 ),
2237 ..Default::default()
2238 });
2239
2240 if let Some(fields_obj) = nested_fields_obj {
2242 for (nested_field_name, nested_field_data) in fields_obj {
2244 if let Some(nested_field_obj) = nested_field_data.as_object() {
2245 let nested_field_type = nested_field_obj
2246 .get("logicalType")
2247 .and_then(|v| v.as_str())
2248 .or_else(|| nested_field_obj.get("type").and_then(|v| v.as_str()))
2249 .unwrap_or("STRING");
2250
2251 let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2253 match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data)
2254 {
2255 Ok(mut nested_cols) => {
2256 columns.append(&mut nested_cols);
2259 }
2260 Err(_) => {
2261 let nested_physical_type = nested_field_obj
2263 .get("physicalType")
2264 .and_then(|v| v.as_str())
2265 .map(|s| s.to_string());
2266 columns.push(Column {
2267 name: nested_col_name,
2268 data_type: nested_field_type.to_uppercase(),
2269 physical_type: nested_physical_type,
2270 nullable: !nested_field_obj
2271 .get("required")
2272 .and_then(|v| v.as_bool())
2273 .unwrap_or(false),
2274 description: nested_field_obj
2275 .get("description")
2276 .and_then(|v| v.as_str())
2277 .unwrap_or("")
2278 .to_string(),
2279 ..Default::default()
2280 });
2281 }
2282 }
2283 }
2284 }
2285 } else if let Some(fields_arr) = nested_fields_arr {
2286 for prop_data in fields_arr {
2288 if let Some(prop_obj) = prop_data.as_object() {
2289 let nested_field_name = prop_obj
2291 .get("name")
2292 .or_else(|| prop_obj.get("id"))
2293 .and_then(|v| v.as_str())
2294 .unwrap_or("");
2295
2296 if !nested_field_name.is_empty() {
2297 let nested_field_type = prop_obj
2298 .get("logicalType")
2299 .and_then(|v| v.as_str())
2300 .or_else(|| prop_obj.get("type").and_then(|v| v.as_str()))
2301 .unwrap_or("STRING");
2302
2303 let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2305 match self.parse_odcl_v3_property(&nested_col_name, prop_obj, data) {
2306 Ok(mut nested_cols) => {
2307 columns.append(&mut nested_cols);
2310 }
2311 Err(_) => {
2312 let nested_physical_type = prop_obj
2314 .get("physicalType")
2315 .and_then(|v| v.as_str())
2316 .map(|s| s.to_string());
2317 columns.push(Column {
2318 name: nested_col_name,
2319 data_type: nested_field_type.to_uppercase(),
2320 physical_type: nested_physical_type,
2321 nullable: !prop_obj
2322 .get("required")
2323 .and_then(|v| v.as_bool())
2324 .unwrap_or(false),
2325 description: prop_obj
2326 .get("description")
2327 .and_then(|v| v.as_str())
2328 .unwrap_or("")
2329 .to_string(),
2330 ..Default::default()
2331 });
2332 }
2333 }
2334 }
2335 }
2336 }
2337 }
2338
2339 return Ok(columns);
2340 }
2341
2342 let required = field_data
2344 .get("required")
2345 .and_then(|v| v.as_bool())
2346 .unwrap_or(false);
2347
2348 let field_description = if description.is_empty() {
2350 field_data
2351 .get("description")
2352 .and_then(|v| v.as_str())
2353 .unwrap_or("")
2354 .to_string()
2355 } else {
2356 description
2357 };
2358
2359 let mut column_quality_rules = quality_rules;
2361
2362 if column_quality_rules.is_empty()
2364 && let Some(quality_val) = field_data.get("quality")
2365 {
2366 if let Some(arr) = quality_val.as_array() {
2367 for item in arr {
2369 if let Some(obj) = item.as_object() {
2370 let mut rule = HashMap::new();
2371 for (key, value) in obj {
2372 rule.insert(key.clone(), json_value_to_serde_value(value));
2373 }
2374 column_quality_rules.push(rule);
2375 }
2376 }
2377 } else if let Some(obj) = quality_val.as_object() {
2378 let mut rule = HashMap::new();
2380 for (key, value) in obj {
2381 rule.insert(key.clone(), json_value_to_serde_value(value));
2382 }
2383 column_quality_rules.push(rule);
2384 }
2385 }
2386
2387 let physical_type = field_data
2393 .get("physicalType")
2394 .and_then(|v| v.as_str())
2395 .map(|s| s.to_string());
2396
2397 columns.push(Column {
2398 name: field_name.to_string(),
2399 data_type: field_type,
2400 physical_type,
2401 nullable: !required,
2402 primary_key: field_data
2403 .get("primaryKey")
2404 .and_then(|v| v.as_bool())
2405 .unwrap_or(false),
2406 foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2407 description: field_description,
2408 quality: column_quality_rules,
2409 relationships: self.parse_relationships_from_field(field_data),
2410 ..Default::default()
2411 });
2412
2413 Ok(columns)
2414 }
2415
2416 fn parse_foreign_key_from_data_contract(
2418 &self,
2419 field_data: &serde_json::Map<String, JsonValue>,
2420 ) -> Option<ForeignKey> {
2421 field_data
2422 .get("foreignKey")
2423 .and_then(|v| v.as_object())
2424 .map(|fk_obj| ForeignKey {
2425 table_id: fk_obj
2426 .get("table")
2427 .or_else(|| fk_obj.get("table_id"))
2428 .and_then(|v| v.as_str())
2429 .unwrap_or("")
2430 .to_string(),
2431 column_name: fk_obj
2432 .get("column")
2433 .or_else(|| fk_obj.get("column_name"))
2434 .and_then(|v| v.as_str())
2435 .unwrap_or("")
2436 .to_string(),
2437 })
2438 }
2439
2440 fn parse_relationships_from_field(
2443 &self,
2444 field_data: &serde_json::Map<String, JsonValue>,
2445 ) -> Vec<PropertyRelationship> {
2446 let mut relationships = Vec::new();
2447
2448 if let Some(rels_array) = field_data.get("relationships").and_then(|v| v.as_array()) {
2450 for rel in rels_array {
2451 if let Some(rel_obj) = rel.as_object() {
2452 let rel_type = rel_obj
2453 .get("type")
2454 .and_then(|v| v.as_str())
2455 .unwrap_or("foreignKey")
2456 .to_string();
2457 let to = rel_obj
2458 .get("to")
2459 .and_then(|v| v.as_str())
2460 .unwrap_or("")
2461 .to_string();
2462
2463 if !to.is_empty() {
2464 relationships.push(PropertyRelationship {
2465 relationship_type: rel_type,
2466 to,
2467 });
2468 }
2469 }
2470 }
2471 }
2472
2473 if relationships.is_empty()
2475 && let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str())
2476 {
2477 let to = if ref_str.starts_with("#/definitions/") {
2479 let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
2480 format!("definitions/{}", def_path)
2481 } else if ref_str.starts_with("#/") {
2482 ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
2483 } else {
2484 ref_str.to_string()
2485 };
2486
2487 relationships.push(PropertyRelationship {
2488 relationship_type: "foreignKey".to_string(),
2489 to,
2490 });
2491 }
2492
2493 relationships
2494 }
2495
2496 fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2498 if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2500 if let Some((_, server_data)) = servers_obj.iter().next()
2502 && let Some(server_obj) = server_data.as_object()
2503 {
2504 return server_obj
2505 .get("type")
2506 .and_then(|v| v.as_str())
2507 .and_then(|s| self.parse_database_type(s));
2508 }
2509 } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2510 if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2512 return server_obj
2513 .get("type")
2514 .and_then(|v| v.as_str())
2515 .and_then(|s| self.parse_database_type(s));
2516 }
2517 }
2518 None
2519 }
2520
2521 fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2523 match s.to_lowercase().as_str() {
2524 "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2525 "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2526 "mysql" => Some(DatabaseType::Mysql),
2527 "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2528 "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2529 _ => None,
2530 }
2531 }
2532
2533 pub fn parse_struct_type_from_string(
2537 &self,
2538 field_name: &str,
2539 type_str: &str,
2540 field_data: &serde_json::Map<String, JsonValue>,
2541 ) -> Result<Vec<Column>> {
2542 let mut columns = Vec::new();
2543
2544 let normalized_type = type_str
2546 .lines()
2547 .map(|line| line.trim())
2548 .filter(|line| !line.is_empty())
2549 .collect::<Vec<_>>()
2550 .join(" ");
2551
2552 let type_str_upper = normalized_type.to_uppercase();
2553
2554 let is_array = type_str_upper.starts_with("ARRAY<");
2556 let struct_start = type_str_upper.find("STRUCT<");
2557
2558 if let Some(start_pos) = struct_start {
2559 let struct_content_start = start_pos + 7; let struct_content = &normalized_type[struct_content_start..];
2563
2564 let mut depth = 1;
2567 let mut end_pos = None;
2568 for (i, ch) in struct_content.char_indices() {
2569 match ch {
2570 '<' => depth += 1,
2571 '>' => {
2572 depth -= 1;
2573 if depth == 0 {
2574 end_pos = Some(i);
2575 break;
2576 }
2577 }
2578 _ => {}
2579 }
2580 }
2581
2582 let struct_fields_str = if let Some(end) = end_pos {
2584 &struct_content[..end]
2585 } else {
2586 struct_content.trim_end_matches('>').trim()
2588 };
2589
2590 let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2592
2593 for (nested_name, nested_type) in fields {
2597 let nested_type_upper = nested_type.to_uppercase();
2598 let nested_col_name = if is_array {
2599 format!("{}.[].{}", field_name, nested_name)
2600 } else {
2601 format!("{}.{}", field_name, nested_name)
2602 };
2603
2604 let is_nested_struct = nested_type_upper.starts_with("STRUCT<");
2607 let is_nested_array_struct = nested_type_upper.starts_with("ARRAY<STRUCT<");
2608
2609 if is_nested_struct || is_nested_array_struct {
2610 match self.parse_struct_type_from_string(
2618 &nested_col_name,
2619 &nested_type,
2620 field_data,
2621 ) {
2622 Ok(nested_cols) => {
2623 columns.extend(nested_cols);
2630 }
2631 Err(_) => {
2632 let fallback_data_type = if is_nested_array_struct {
2634 "ARRAY<STRUCT<...>>".to_string()
2635 } else {
2636 "STRUCT<...>".to_string()
2637 };
2638 let nested_physical_type = field_data
2639 .get("physicalType")
2640 .and_then(|v| v.as_str())
2641 .map(|s| s.to_string());
2642 columns.push(Column {
2643 name: nested_col_name,
2644 data_type: fallback_data_type,
2645 physical_type: nested_physical_type,
2646 nullable: !field_data
2647 .get("required")
2648 .and_then(|v| v.as_bool())
2649 .unwrap_or(false),
2650 description: field_data
2651 .get("description")
2652 .and_then(|v| v.as_str())
2653 .unwrap_or("")
2654 .to_string(),
2655 ..Default::default()
2656 });
2657 }
2658 }
2659 } else if nested_type_upper.starts_with("ARRAY<") {
2660 let nested_physical_type = field_data
2663 .get("physicalType")
2664 .and_then(|v| v.as_str())
2665 .map(|s| s.to_string());
2666 columns.push(Column {
2667 name: nested_col_name,
2668 data_type: normalize_data_type(&nested_type),
2669 physical_type: nested_physical_type,
2670 nullable: !field_data
2671 .get("required")
2672 .and_then(|v| v.as_bool())
2673 .unwrap_or(false),
2674 description: field_data
2675 .get("description")
2676 .and_then(|v| v.as_str())
2677 .unwrap_or("")
2678 .to_string(),
2679 ..Default::default()
2680 });
2681 } else {
2682 let nested_physical_type = field_data
2684 .get("physicalType")
2685 .and_then(|v| v.as_str())
2686 .map(|s| s.to_string());
2687 columns.push(Column {
2688 name: nested_col_name,
2689 data_type: normalize_data_type(&nested_type),
2690 physical_type: nested_physical_type,
2691 nullable: !field_data
2692 .get("required")
2693 .and_then(|v| v.as_bool())
2694 .unwrap_or(false),
2695 description: field_data
2696 .get("description")
2697 .and_then(|v| v.as_str())
2698 .unwrap_or("")
2699 .to_string(),
2700 ..Default::default()
2701 });
2702 }
2703 }
2704
2705 return Ok(columns);
2706 }
2707
2708 Ok(Vec::new())
2710 }
2711
2712 fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2714 let mut fields = Vec::new();
2715 let mut current_field = String::new();
2716 let mut depth = 0;
2717 let mut in_string = false;
2718 let mut string_char = None;
2719
2720 for ch in fields_str.chars() {
2721 match ch {
2722 '\'' | '"' if !in_string || Some(ch) == string_char => {
2723 if in_string {
2724 in_string = false;
2725 string_char = None;
2726 } else {
2727 in_string = true;
2728 string_char = Some(ch);
2729 }
2730 current_field.push(ch);
2731 }
2732 '<' if !in_string => {
2733 depth += 1;
2734 current_field.push(ch);
2735 }
2736 '>' if !in_string => {
2737 depth -= 1;
2738 current_field.push(ch);
2739 }
2740 ',' if !in_string && depth == 0 => {
2741 let trimmed = current_field.trim();
2743 if !trimmed.is_empty()
2744 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2745 {
2746 fields.push((name, type_part));
2747 }
2748 current_field.clear();
2749 }
2750 _ => {
2751 current_field.push(ch);
2752 }
2753 }
2754 }
2755
2756 let trimmed = current_field.trim();
2758 if !trimmed.is_empty()
2759 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2760 {
2761 fields.push((name, type_part));
2762 }
2763
2764 Ok(fields)
2765 }
2766
2767 fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
2769 let colon_pos = field_def.find(':')?;
2771 let name = field_def[..colon_pos].trim().to_string();
2772 let type_part = field_def[colon_pos + 1..].trim().to_string();
2773
2774 if name.is_empty() || type_part.is_empty() {
2775 return None;
2776 }
2777
2778 Some((name, type_part))
2779 }
2780
2781 fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
2783 let mut catalog_name = None;
2784 let mut schema_name = None;
2785
2786 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2787 for prop in custom_props {
2788 if let Some(prop_obj) = prop.as_object() {
2789 let prop_key = prop_obj
2790 .get("property")
2791 .and_then(|v| v.as_str())
2792 .unwrap_or("");
2793 let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
2794
2795 match prop_key {
2796 "catalogName" | "catalog_name" => {
2797 catalog_name = prop_value.map(|s| s.to_string());
2798 }
2799 "schemaName" | "schema_name" => {
2800 schema_name = prop_value.map(|s| s.to_string());
2801 }
2802 _ => {}
2803 }
2804 }
2805 }
2806 }
2807
2808 if catalog_name.is_none() {
2810 catalog_name = data
2811 .get("catalog_name")
2812 .and_then(|v| v.as_str())
2813 .map(|s| s.to_string());
2814 }
2815 if schema_name.is_none() {
2816 schema_name = data
2817 .get("schema_name")
2818 .and_then(|v| v.as_str())
2819 .map(|s| s.to_string());
2820 }
2821
2822 (catalog_name, schema_name)
2823 }
2824}
2825
2826impl Default for ODCSImporter {
2827 fn default() -> Self {
2828 Self::new()
2829 }
2830}
2831
#[cfg(test)]
mod tests {
    //! Unit tests for `ODCSImporter::parse` on legacy ODCL-style YAML documents.

    use super::*;

    /// A minimal two-column document: checks the table name, column count and order,
    /// the `database_type`, and that no errors are reported.
    #[test]
    fn test_parse_simple_odcl_table() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
        assert_eq!(errors.len(), 0);
    }

    /// Table-level metadata: medallion layer, SCD pattern and `odcl_metadata` entries.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
        // NOTE(review): this check passes vacuously if "description" is absent from
        // `odcl_metadata` (or is not a string); consider asserting its presence.
        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
            assert_eq!(desc, "User table");
        }
        assert_eq!(errors.len(), 0);
    }

    /// `data_vault_classification: Hub` maps to `DataVaultClassification::Hub`.
    #[test]
    fn test_parse_odcl_with_data_vault() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: hub_customer
columns:
  - name: customer_key
    data_type: VARCHAR(50)
data_vault_classification: Hub
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(
            table.data_vault_classification,
            Some(DataVaultClassification::Hub)
        );
        assert_eq!(errors.len(), 0);
    }

    /// Input that is not a well-formed YAML mapping must be rejected with an `Err`.
    #[test]
    fn test_parse_invalid_odcl() {
        let mut parser = ODCSImporter::new();
        let invalid_yaml = "not: valid: yaml: structure:";

        assert!(parser.parse(invalid_yaml).is_err());
    }

    /// A document without a `columns` section is expected to fail parsing.
    #[test]
    fn test_parse_odcl_missing_required_fields() {
        let mut parser = ODCSImporter::new();
        let non_conformant = r#"
name: users
# Missing required columns field
"#;

        assert!(parser.parse(non_conformant).is_err());
    }

    /// A column-level `foreign_key` block is carried onto the parsed column.
    #[test]
    fn test_parse_odcl_with_foreign_key() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: orders
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: user_id
    data_type: INT
    foreign_key:
      table_id: users
      column_name: id
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
        assert!(user_id_col.foreign_key.is_some());
        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
        assert_eq!(errors.len(), 0);
    }

    /// Column `nullable` and `constraints` entries are preserved on the parsed column.
    #[test]
    fn test_parse_odcl_with_constraints() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: products
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
    constraints:
      - UNIQUE
      - NOT NULL
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
        assert!(!name_col.nullable);
        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
        assert_eq!(errors.len(), 0);
    }
}