1use super::{ImportError, ImportResult, TableData};
12use crate::models::column::ForeignKey;
13use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
14use crate::models::{Column, Table, Tag};
15use anyhow::{Context, Result};
16use serde_json::Value as JsonValue;
17use std::collections::HashMap;
18use std::str::FromStr;
19use tracing::info;
20
/// Importer that turns contract-style YAML documents into [`Table`] models.
///
/// Parsing dispatches between Liquibase changelogs, ODCS v3 contracts,
/// Data Contract specification documents and a simple ODCL layout.
pub struct ODCSImporter {
    /// The YAML document most recently handed to `parse`; stored before
    /// format detection runs.
    current_yaml_data: Option<serde_yaml::Value>,
}
27
28impl ODCSImporter {
29 pub fn new() -> Self {
39 Self {
40 current_yaml_data: None,
41 }
42 }
43
44 pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
77 match self.parse(yaml_content) {
78 Ok((table, errors)) => {
79 let sdk_tables = vec![TableData {
80 table_index: 0,
81 name: Some(table.name.clone()),
82 columns: table
83 .columns
84 .iter()
85 .map(|c| super::ColumnData {
86 name: c.name.clone(),
87 data_type: c.data_type.clone(),
88 nullable: c.nullable,
89 primary_key: c.primary_key,
90 description: if c.description.is_empty() {
91 None
92 } else {
93 Some(c.description.clone())
94 },
95 quality: if c.quality.is_empty() {
96 None
97 } else {
98 Some(c.quality.clone())
99 },
100 ref_path: c.ref_path.clone(),
101 enum_values: if c.enum_values.is_empty() {
102 None
103 } else {
104 Some(c.enum_values.clone())
105 },
106 })
107 .collect(),
108 }];
109 let sdk_errors: Vec<ImportError> = errors
110 .iter()
111 .map(|e| ImportError::ParseError(e.message.clone()))
112 .collect();
113 Ok(ImportResult {
114 tables: sdk_tables,
115 tables_requiring_name: Vec::new(),
116 errors: sdk_errors,
117 ai_suggestions: None,
118 })
119 }
120 Err(e) => Err(ImportError::ParseError(e.to_string())),
121 }
122 }
123
/// Parses `yaml_content` into a [`Table`] plus the non-fatal parser errors
/// collected along the way.
///
/// Public entry point that forwards directly to the private `parse` method.
pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
    self.parse(yaml_content)
}
137
138 fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
148 let _errors: Vec<ParserError> = Vec::new();
150
151 let data: serde_yaml::Value =
153 serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
154
155 if data.is_null() {
156 return Err(anyhow::anyhow!("Empty YAML content"));
157 }
158
159 self.current_yaml_data = Some(data.clone());
161
162 let json_data = yaml_to_json_value(&data)?;
164
165 if self.is_liquibase_format(&json_data) {
167 return self.parse_liquibase(&json_data);
168 }
169
170 if self.is_odcl_v3_format(&json_data) {
171 return self.parse_odcl_v3(&json_data);
172 }
173
174 if self.is_data_contract_format(&json_data) {
175 return self.parse_data_contract(&json_data);
176 }
177
178 self.parse_simple_odcl(&json_data)
180 }
181
182 fn resolve_ref<'a>(&self, ref_str: &str, data: &'a JsonValue) -> Option<&'a JsonValue> {
184 if !ref_str.starts_with("#/") {
185 return None;
186 }
187
188 let path = &ref_str[2..];
190 let parts: Vec<&str> = path.split('/').collect();
191
192 let mut current = data;
194 for part in parts {
195 current = current.get(part)?;
196 }
197
198 if current.is_object() {
199 Some(current)
200 } else {
201 None
202 }
203 }
204
205 fn is_liquibase_format(&self, data: &JsonValue) -> bool {
207 if data.get("databaseChangeLog").is_some() {
208 return true;
209 }
210 if let Some(obj) = data.as_object() {
212 let obj_str = format!("{:?}", obj);
213 if obj_str.contains("changeSet") {
214 return true;
215 }
216 }
217 false
218 }
219
220 fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
222 if let Some(obj) = data.as_object() {
223 let has_api_version = obj.contains_key("apiVersion");
224 let has_kind = obj
225 .get("kind")
226 .and_then(|v| v.as_str())
227 .map(|s| s == "DataContract")
228 .unwrap_or(false);
229 let has_id = obj.contains_key("id");
230 let has_version = obj.contains_key("version");
231 return has_api_version && has_kind && has_id && has_version;
232 }
233 false
234 }
235
236 fn is_data_contract_format(&self, data: &JsonValue) -> bool {
238 if let Some(obj) = data.as_object() {
239 let has_spec = obj.contains_key("dataContractSpecification");
240 let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
241 return has_spec && has_models;
242 }
243 false
244 }
245
246 fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
248 let mut errors = Vec::new();
249
250 let name = data
252 .get("name")
253 .and_then(|v| v.as_str())
254 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
255 .to_string();
256
257 let columns_data = data
259 .get("columns")
260 .and_then(|v| v.as_array())
261 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
262
263 let mut columns = Vec::new();
264 for (idx, col_data) in columns_data.iter().enumerate() {
265 match self.parse_column(col_data) {
266 Ok(col) => columns.push(col),
267 Err(e) => {
268 errors.push(ParserError {
269 error_type: "column_parse_error".to_string(),
270 field: format!("columns[{}]", idx),
271 message: e.to_string(),
272 });
273 }
274 }
275 }
276
277 let database_type = self.extract_database_type(data);
279 let medallion_layers = self.extract_medallion_layers(data);
280 let scd_pattern = self.extract_scd_pattern(data);
281 let data_vault_classification = self.extract_data_vault_classification(data);
282 let quality_rules = self.extract_quality_rules(data);
283
284 if scd_pattern.is_some() && data_vault_classification.is_some() {
286 errors.push(ParserError {
287 error_type: "validation_error".to_string(),
288 field: "patterns".to_string(),
289 message: "SCD pattern and Data Vault classification are mutually exclusive"
290 .to_string(),
291 });
292 }
293
294 let mut odcl_metadata = HashMap::new();
296 if let Some(metadata) = data.get("odcl_metadata")
297 && let Some(obj) = metadata.as_object()
298 {
299 for (key, value) in obj {
300 odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
301 }
302 }
303
304 let table_uuid = self.extract_table_uuid(data);
305
306 let table = Table {
307 id: table_uuid,
308 name,
309 columns,
310 database_type,
311 catalog_name: None,
312 schema_name: None,
313 medallion_layers,
314 scd_pattern,
315 data_vault_classification,
316 modeling_level: None,
317 tags: Vec::<Tag>::new(),
318 odcl_metadata,
319 owner: None,
320 sla: None,
321 contact_details: None,
322 infrastructure_type: None,
323 notes: None,
324 position: None,
325 yaml_file_path: None,
326 drawio_cell_id: None,
327 quality: quality_rules,
328 errors: Vec::new(),
329 created_at: chrono::Utc::now(),
330 updated_at: chrono::Utc::now(),
331 };
332
333 info!("Parsed ODCL table: {}", table.name);
334 Ok((table, errors))
335 }
336
337 fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
339 let name = col_data
340 .get("name")
341 .and_then(|v| v.as_str())
342 .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
343 .to_string();
344
345 let data_type = col_data
346 .get("data_type")
347 .and_then(|v| v.as_str())
348 .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
349 .to_string();
350
351 let data_type = normalize_data_type(&data_type);
353
354 let nullable = col_data
355 .get("nullable")
356 .and_then(|v| v.as_bool())
357 .unwrap_or(true);
358
359 let primary_key = col_data
360 .get("primary_key")
361 .and_then(|v| v.as_bool())
362 .unwrap_or(false);
363
364 let foreign_key = col_data
365 .get("foreign_key")
366 .and_then(|v| self.parse_foreign_key(v));
367
368 let constraints = col_data
369 .get("constraints")
370 .and_then(|v| v.as_array())
371 .map(|arr| {
372 arr.iter()
373 .filter_map(|v| v.as_str().map(|s| s.to_string()))
374 .collect()
375 })
376 .unwrap_or_default();
377
378 let description = col_data
379 .get("description")
380 .and_then(|v| v.as_str())
381 .map(|s| s.to_string())
382 .unwrap_or_default();
383
384 let mut column_quality_rules = Vec::new();
386 if let Some(quality_val) = col_data.get("quality") {
387 if let Some(arr) = quality_val.as_array() {
388 for item in arr {
390 if let Some(obj) = item.as_object() {
391 let mut rule = HashMap::new();
392 for (key, value) in obj {
393 rule.insert(key.clone(), json_value_to_serde_value(value));
394 }
395 column_quality_rules.push(rule);
396 }
397 }
398 } else if let Some(obj) = quality_val.as_object() {
399 let mut rule = HashMap::new();
401 for (key, value) in obj {
402 rule.insert(key.clone(), json_value_to_serde_value(value));
403 }
404 column_quality_rules.push(rule);
405 }
406 }
407
408 if !nullable {
410 let has_not_null = column_quality_rules.iter().any(|rule| {
411 rule.get("type")
412 .and_then(|v| v.as_str())
413 .map(|s| {
414 s.to_lowercase().contains("not_null")
415 || s.to_lowercase().contains("notnull")
416 })
417 .unwrap_or(false)
418 });
419 if !has_not_null {
420 let mut not_null_rule = HashMap::new();
421 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
422 not_null_rule.insert(
423 "description".to_string(),
424 serde_json::json!("Column must not be null"),
425 );
426 column_quality_rules.push(not_null_rule);
427 }
428 }
429
430 Ok(Column {
431 name,
432 data_type,
433 nullable,
434 primary_key,
435 secondary_key: false,
436 composite_key: None,
437 foreign_key,
438 constraints,
439 description,
440 errors: Vec::new(),
441 quality: column_quality_rules,
442 ref_path: None,
443 enum_values: Vec::new(),
444 column_order: 0,
445 })
446 }
447
448 fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
450 let obj = fk_data.as_object()?;
451 Some(ForeignKey {
452 table_id: obj
453 .get("table_id")
454 .or_else(|| obj.get("table"))
455 .and_then(|v| v.as_str())
456 .unwrap_or("")
457 .to_string(),
458 column_name: obj
459 .get("column_name")
460 .or_else(|| obj.get("column"))
461 .and_then(|v| v.as_str())
462 .unwrap_or("")
463 .to_string(),
464 })
465 }
466
467 fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
469 data.get("database_type")
470 .and_then(|v| v.as_str())
471 .and_then(|s| match s.to_uppercase().as_str() {
472 "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
473 "MYSQL" => Some(DatabaseType::Mysql),
474 "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
475 "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
476 "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
477 _ => None,
478 })
479 }
480
481 fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
483 let mut layers = Vec::new();
484
485 if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
487 for item in arr {
488 if let Some(s) = item.as_str()
489 && let Ok(layer) = parse_medallion_layer(s)
490 {
491 layers.push(layer);
492 }
493 }
494 }
495 else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
497 && let Ok(layer) = parse_medallion_layer(s)
498 {
499 layers.push(layer);
500 }
501
502 layers
503 }
504
505 fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
507 data.get("scd_pattern")
508 .and_then(|v| v.as_str())
509 .and_then(|s| parse_scd_pattern(s).ok())
510 }
511
512 fn extract_data_vault_classification(
514 &self,
515 data: &JsonValue,
516 ) -> Option<DataVaultClassification> {
517 data.get("data_vault_classification")
518 .and_then(|v| v.as_str())
519 .and_then(|s| parse_data_vault_classification(s).ok())
520 }
521
522 fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
524 use serde_json::Value;
525 let mut quality_rules = Vec::new();
526
527 if let Some(quality_val) = data.get("quality") {
529 if let Some(arr) = quality_val.as_array() {
530 for item in arr {
532 if let Some(obj) = item.as_object() {
533 let mut rule = HashMap::new();
534 for (key, value) in obj {
535 rule.insert(key.clone(), json_value_to_serde_value(value));
536 }
537 quality_rules.push(rule);
538 }
539 }
540 } else if let Some(obj) = quality_val.as_object() {
541 let mut rule = HashMap::new();
543 for (key, value) in obj {
544 rule.insert(key.clone(), json_value_to_serde_value(value));
545 }
546 quality_rules.push(rule);
547 } else if let Some(s) = quality_val.as_str() {
548 let mut rule = HashMap::new();
550 rule.insert("value".to_string(), Value::String(s.to_string()));
551 quality_rules.push(rule);
552 }
553 }
554
555 if let Some(metadata) = data.get("metadata")
557 && let Some(metadata_obj) = metadata.as_object()
558 && let Some(quality_val) = metadata_obj.get("quality")
559 {
560 if let Some(arr) = quality_val.as_array() {
561 for item in arr {
563 if let Some(obj) = item.as_object() {
564 let mut rule = HashMap::new();
565 for (key, value) in obj {
566 rule.insert(key.clone(), json_value_to_serde_value(value));
567 }
568 quality_rules.push(rule);
569 }
570 }
571 } else if let Some(obj) = quality_val.as_object() {
572 let mut rule = HashMap::new();
574 for (key, value) in obj {
575 rule.insert(key.clone(), json_value_to_serde_value(value));
576 }
577 quality_rules.push(rule);
578 } else if let Some(s) = quality_val.as_str() {
579 let mut rule = HashMap::new();
581 rule.insert("value".to_string(), Value::String(s.to_string()));
582 quality_rules.push(rule);
583 }
584 }
585
586 if let Some(tblprops) = data.get("tblproperties")
588 && let Some(obj) = tblprops.as_object()
589 {
590 for (key, value) in obj {
591 let mut rule = HashMap::new();
592 rule.insert("property".to_string(), Value::String(key.clone()));
593 rule.insert("value".to_string(), json_value_to_serde_value(value));
594 quality_rules.push(rule);
595 }
596 }
597
598 quality_rules
599 }
600
601 fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
609 let mut errors = Vec::new();
623
624 let changelog = data
625 .get("databaseChangeLog")
626 .and_then(|v| v.as_array())
627 .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
628
629 let mut table_name: Option<String> = None;
631 let mut columns: Vec<crate::models::column::Column> = Vec::new();
632
633 for entry in changelog {
634 if let Some(change_set) = entry.get("changeSet") {
636 let changes = if let Some(obj) = change_set.as_object() {
638 obj.get("changes")
639 .and_then(|v| v.as_array())
640 .cloned()
641 .unwrap_or_default()
642 } else if let Some(arr) = change_set.as_array() {
643 arr.clone()
645 } else {
646 Vec::new()
647 };
648
649 for ch in changes {
650 let create = ch.get("createTable").or_else(|| ch.get("create_table"));
651 if let Some(create) = create {
652 table_name = create
653 .get("tableName")
654 .or_else(|| create.get("table_name"))
655 .and_then(|v| v.as_str())
656 .map(|s| s.to_string());
657
658 if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
660 for col_entry in cols {
661 let col = col_entry.get("column").unwrap_or(col_entry);
662 let name = col
663 .get("name")
664 .and_then(|v| v.as_str())
665 .unwrap_or("")
666 .to_string();
667 let data_type = col
668 .get("type")
669 .and_then(|v| v.as_str())
670 .unwrap_or("")
671 .to_string();
672
673 if name.is_empty() {
674 errors.push(ParserError {
675 error_type: "validation_error".to_string(),
676 field: "columns.name".to_string(),
677 message: "Liquibase createTable column missing name"
678 .to_string(),
679 });
680 continue;
681 }
682
683 let mut column =
684 crate::models::column::Column::new(name, data_type);
685
686 if let Some(constraints) =
687 col.get("constraints").and_then(|v| v.as_object())
688 {
689 if let Some(pk) =
690 constraints.get("primaryKey").and_then(|v| v.as_bool())
691 {
692 column.primary_key = pk;
693 }
694 if let Some(nullable) =
695 constraints.get("nullable").and_then(|v| v.as_bool())
696 {
697 column.nullable = nullable;
698 }
699 }
700
701 columns.push(column);
702 }
703 }
704
705 break;
708 }
709 }
710 }
711 if table_name.is_some() {
712 break;
713 }
714 }
715
716 let table_name = table_name
717 .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
718 let table = Table::new(table_name, columns);
719 Ok((table, errors))
721 }
722
723 fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
725 let mut errors = Vec::new();
726
727 let table_name = data
729 .get("name")
730 .and_then(|v| v.as_str())
731 .map(|s| s.to_string())
732 .or_else(|| {
733 data.get("schema")
735 .and_then(|v| v.as_array())
736 .and_then(|arr| arr.first())
737 .and_then(|obj| obj.as_object())
738 .and_then(|obj| obj.get("name"))
739 .and_then(|v| v.as_str())
740 .map(|s| s.to_string())
741 })
742 .ok_or_else(|| {
743 anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
744 })?;
745
746 let schema = data
748 .get("schema")
749 .and_then(|v| v.as_array())
750 .ok_or_else(|| {
751 errors.push(ParserError {
752 error_type: "validation_error".to_string(),
753 field: "schema".to_string(),
754 message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
755 });
756 anyhow::anyhow!("Missing schema")
757 });
758
759 let schema = match schema {
760 Ok(s) if s.is_empty() => {
761 errors.push(ParserError {
762 error_type: "validation_error".to_string(),
763 field: "schema".to_string(),
764 message: "ODCS v3.0.x schema array is empty".to_string(),
765 });
766 let quality_rules = self.extract_quality_rules(data);
767 let table_uuid = self.extract_table_uuid(data);
768 let table = Table {
769 id: table_uuid,
770 name: table_name,
771 columns: Vec::new(),
772 database_type: None,
773 catalog_name: None,
774 schema_name: None,
775 medallion_layers: Vec::new(),
776 scd_pattern: None,
777 data_vault_classification: None,
778 modeling_level: None,
779 tags: Vec::<Tag>::new(),
780 odcl_metadata: HashMap::new(),
781 owner: None,
782 sla: None,
783 contact_details: None,
784 infrastructure_type: None,
785 notes: None,
786 position: None,
787 yaml_file_path: None,
788 drawio_cell_id: None,
789 quality: quality_rules,
790 errors: Vec::new(),
791 created_at: chrono::Utc::now(),
792 updated_at: chrono::Utc::now(),
793 };
794 return Ok((table, errors));
795 }
796 Ok(s) => s,
797 Err(_) => {
798 let quality_rules = self.extract_quality_rules(data);
799 let table_uuid = self.extract_table_uuid(data);
800 let table = Table {
801 id: table_uuid,
802 name: table_name,
803 columns: Vec::new(),
804 database_type: None,
805 catalog_name: None,
806 schema_name: None,
807 medallion_layers: Vec::new(),
808 scd_pattern: None,
809 data_vault_classification: None,
810 modeling_level: None,
811 tags: Vec::<Tag>::new(),
812 odcl_metadata: HashMap::new(),
813 owner: None,
814 sla: None,
815 contact_details: None,
816 infrastructure_type: None,
817 notes: None,
818 position: None,
819 yaml_file_path: None,
820 drawio_cell_id: None,
821 quality: quality_rules,
822 errors: Vec::new(),
823 created_at: chrono::Utc::now(),
824 updated_at: chrono::Utc::now(),
825 };
826 return Ok((table, errors));
827 }
828 };
829
830 let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
832 errors.push(ParserError {
833 error_type: "validation_error".to_string(),
834 field: "schema[0]".to_string(),
835 message: "First schema object must be a dictionary".to_string(),
836 });
837 anyhow::anyhow!("Invalid schema object")
838 })?;
839
840 let object_name = schema_object
841 .get("name")
842 .and_then(|v| v.as_str())
843 .unwrap_or(&table_name);
844
845 let mut columns = Vec::new();
847
848 if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
849 for (prop_name, prop_data) in properties_obj {
851 if let Some(prop_obj) = prop_data.as_object() {
852 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
853 Ok(mut cols) => columns.append(&mut cols),
854 Err(e) => {
855 errors.push(ParserError {
856 error_type: "property_parse_error".to_string(),
857 field: format!("Property '{}'", prop_name),
858 message: e.to_string(),
859 });
860 }
861 }
862 } else {
863 errors.push(ParserError {
864 error_type: "validation_error".to_string(),
865 field: format!("Property '{}'", prop_name),
866 message: format!("Property '{}' must be an object", prop_name),
867 });
868 }
869 }
870 } else if let Some(properties_arr) =
871 schema_object.get("properties").and_then(|v| v.as_array())
872 {
873 for (idx, prop_data) in properties_arr.iter().enumerate() {
875 if let Some(prop_obj) = prop_data.as_object() {
876 let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
879 Some(JsonValue::String(s)) => s.as_str(),
880 _ => {
881 errors.push(ParserError {
883 error_type: "validation_error".to_string(),
884 field: format!("Property[{}]", idx),
885 message: format!(
886 "Property[{}] missing required 'name' or 'id' field",
887 idx
888 ),
889 });
890 continue;
891 }
892 };
893
894 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
895 Ok(mut cols) => columns.append(&mut cols),
896 Err(e) => {
897 errors.push(ParserError {
898 error_type: "property_parse_error".to_string(),
899 field: format!("Property[{}] '{}'", idx, prop_name),
900 message: e.to_string(),
901 });
902 }
903 }
904 } else {
905 errors.push(ParserError {
906 error_type: "validation_error".to_string(),
907 field: format!("Property[{}]", idx),
908 message: format!("Property[{}] must be an object", idx),
909 });
910 }
911 }
912 } else {
913 errors.push(ParserError {
914 error_type: "validation_error".to_string(),
915 field: format!("Object '{}'", object_name),
916 message: format!(
917 "Object '{}' missing 'properties' field or properties is invalid",
918 object_name
919 ),
920 });
921 }
922
923 let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
925 _,
926 _,
927 _,
928 Vec<Tag>,
929 ) = self.extract_metadata_from_custom_properties(data);
930
931 let mut shared_domains: Vec<String> = Vec::new();
933 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
934 for prop in custom_props {
935 if let Some(prop_obj) = prop.as_object() {
936 let prop_key = prop_obj
937 .get("property")
938 .and_then(|v| v.as_str())
939 .unwrap_or("");
940 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
941 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
942 {
943 for item in arr {
944 if let Some(s) = item.as_str() {
945 shared_domains.push(s.to_string());
946 }
947 }
948 }
949 }
950 }
951 }
952
953 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
955 for item in tags_arr {
956 if let Some(s) = item.as_str() {
957 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
959 if !tags.contains(&tag) {
960 tags.push(tag);
961 }
962 }
963 }
964 }
965
966 let database_type = self.extract_database_type_from_odcl_v3_servers(data);
968
969 let quality_rules = self.extract_quality_rules(data);
971
972 let mut odcl_metadata = HashMap::new();
974 odcl_metadata.insert(
975 "apiVersion".to_string(),
976 json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
977 );
978 odcl_metadata.insert(
979 "kind".to_string(),
980 json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
981 );
982 odcl_metadata.insert(
983 "id".to_string(),
984 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
985 );
986 odcl_metadata.insert(
987 "version".to_string(),
988 json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
989 );
990 odcl_metadata.insert(
991 "status".to_string(),
992 json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
993 );
994
995 if let Some(servicelevels_val) = data.get("servicelevels") {
997 odcl_metadata.insert(
998 "servicelevels".to_string(),
999 json_value_to_serde_value(servicelevels_val),
1000 );
1001 }
1002
1003 if let Some(links_val) = data.get("links") {
1005 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1006 }
1007
1008 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1010 odcl_metadata.insert(
1011 "domain".to_string(),
1012 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1013 );
1014 }
1015 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1016 odcl_metadata.insert(
1017 "dataProduct".to_string(),
1018 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1019 );
1020 }
1021 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1022 odcl_metadata.insert(
1023 "tenant".to_string(),
1024 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1025 );
1026 }
1027
1028 if let Some(desc_val) = data.get("description") {
1030 odcl_metadata.insert(
1031 "description".to_string(),
1032 json_value_to_serde_value(desc_val),
1033 );
1034 }
1035
1036 if let Some(pricing_val) = data.get("pricing") {
1038 odcl_metadata.insert(
1039 "pricing".to_string(),
1040 json_value_to_serde_value(pricing_val),
1041 );
1042 }
1043
1044 if let Some(team_val) = data.get("team") {
1046 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1047 }
1048
1049 if let Some(roles_val) = data.get("roles") {
1051 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1052 }
1053
1054 if let Some(terms_val) = data.get("terms") {
1056 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1057 }
1058
1059 if let Some(servers_val) = data.get("servers") {
1061 odcl_metadata.insert(
1062 "servers".to_string(),
1063 json_value_to_serde_value(servers_val),
1064 );
1065 }
1066
1067 if let Some(infrastructure_val) = data.get("infrastructure") {
1069 odcl_metadata.insert(
1070 "infrastructure".to_string(),
1071 json_value_to_serde_value(infrastructure_val),
1072 );
1073 }
1074
1075 if !shared_domains.is_empty() {
1077 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1078 .iter()
1079 .map(|d| serde_json::Value::String(d.clone()))
1080 .collect();
1081 odcl_metadata.insert(
1082 "sharedDomains".to_string(),
1083 serde_json::Value::Array(shared_domains_json),
1084 );
1085 }
1086
1087 let table_uuid = self.extract_table_uuid(data);
1088
1089 let table = Table {
1090 id: table_uuid,
1091 name: table_name,
1092 columns,
1093 database_type,
1094 catalog_name: None,
1095 schema_name: None,
1096 medallion_layers,
1097 scd_pattern,
1098 data_vault_classification,
1099 modeling_level: None,
1100 tags,
1101 odcl_metadata,
1102 owner: None,
1103 sla: None,
1104 contact_details: None,
1105 infrastructure_type: None,
1106 notes: None,
1107 position: None,
1108 yaml_file_path: None,
1109 drawio_cell_id: None,
1110 quality: quality_rules,
1111 errors: Vec::new(),
1112 created_at: chrono::Utc::now(),
1113 updated_at: chrono::Utc::now(),
1114 };
1115
1116 info!(
1117 "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1118 table.name,
1119 errors.len()
1120 );
1121 Ok((table, errors))
1122 }
1123
1124 fn parse_odcl_v3_property(
1126 &self,
1127 prop_name: &str,
1128 prop_data: &serde_json::Map<String, JsonValue>,
1129 data: &JsonValue,
1130 ) -> Result<Vec<Column>> {
1131 let mut errors = Vec::new();
1133 self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1134 }
1135
1136 fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1139 if let Some(id_val) = data.get("id")
1141 && let Some(id_str) = id_val.as_str()
1142 {
1143 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1144 tracing::debug!(
1145 "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1146 uuid
1147 );
1148 return uuid;
1149 } else {
1150 tracing::warn!(
1151 "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1152 id_str
1153 );
1154 }
1155 }
1156
1157 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1159 for prop in custom_props {
1160 if let Some(prop_obj) = prop.as_object() {
1161 let prop_key = prop_obj
1162 .get("property")
1163 .and_then(|v| v.as_str())
1164 .unwrap_or("");
1165 if prop_key == "tableUuid"
1166 && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1167 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1168 {
1169 tracing::debug!(
1170 "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1171 uuid
1172 );
1173 return uuid;
1174 }
1175 }
1176 }
1177 }
1178
1179 if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1181 && let Some(uuid_val) = metadata.get("tableUuid")
1182 && let Some(uuid_str) = uuid_val.as_str()
1183 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1184 {
1185 tracing::debug!(
1186 "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1187 uuid
1188 );
1189 return uuid;
1190 }
1191
1192 let table_name = data
1194 .get("name")
1195 .and_then(|v| v.as_str())
1196 .unwrap_or("unknown");
1197 let new_uuid = crate::models::table::Table::generate_id(
1198 table_name, None, None, None, );
1202 tracing::warn!(
1203 "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1204 table_name,
1205 new_uuid
1206 );
1207 new_uuid
1208 }
1209
1210 fn extract_metadata_from_custom_properties(
1212 &self,
1213 data: &JsonValue,
1214 ) -> (
1215 Vec<MedallionLayer>,
1216 Option<SCDPattern>,
1217 Option<DataVaultClassification>,
1218 Vec<Tag>,
1219 ) {
1220 let mut medallion_layers = Vec::new();
1221 let mut scd_pattern = None;
1222 let mut data_vault_classification = None;
1223 let mut tags: Vec<Tag> = Vec::new();
1224
1225 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1226 for prop in custom_props {
1227 if let Some(prop_obj) = prop.as_object() {
1228 let prop_key = prop_obj
1229 .get("property")
1230 .and_then(|v| v.as_str())
1231 .unwrap_or("");
1232 let prop_value = prop_obj.get("value");
1233
1234 match prop_key {
1235 "medallionLayers" | "medallion_layers" => {
1236 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1237 for item in arr {
1238 if let Some(s) = item.as_str()
1239 && let Ok(layer) = parse_medallion_layer(s)
1240 {
1241 medallion_layers.push(layer);
1242 }
1243 }
1244 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1245 for part in s.split(',') {
1247 if let Ok(layer) = parse_medallion_layer(part.trim()) {
1248 medallion_layers.push(layer);
1249 }
1250 }
1251 }
1252 }
1253 "scdPattern" | "scd_pattern" => {
1254 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1255 scd_pattern = parse_scd_pattern(s).ok();
1256 }
1257 }
1258 "dataVaultClassification" | "data_vault_classification" => {
1259 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1260 data_vault_classification = parse_data_vault_classification(s).ok();
1261 }
1262 }
1263 "tags" => {
1264 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1265 for item in arr {
1266 if let Some(s) = item.as_str() {
1267 if let Ok(tag) = Tag::from_str(s) {
1269 tags.push(tag);
1270 } else {
1271 tags.push(Tag::Simple(s.to_string()));
1272 }
1273 }
1274 }
1275 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1276 for part in s.split(',') {
1278 let part = part.trim();
1279 if let Ok(tag) = Tag::from_str(part) {
1280 tags.push(tag);
1281 } else {
1282 tags.push(Tag::Simple(part.to_string()));
1283 }
1284 }
1285 }
1286 }
1287 "sharedDomains" | "shared_domains" => {
1288 }
1291 _ => {}
1292 }
1293 }
1294 }
1295 }
1296
1297 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1299 for item in tags_arr {
1300 if let Some(s) = item.as_str() {
1301 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1303 if !tags.contains(&tag) {
1304 tags.push(tag);
1305 }
1306 }
1307 }
1308 }
1309
1310 (
1311 medallion_layers,
1312 scd_pattern,
1313 data_vault_classification,
1314 tags,
1315 )
1316 }
1317
1318 fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1320 if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1322 && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1323 {
1324 return server_obj
1325 .get("type")
1326 .and_then(|v| v.as_str())
1327 .and_then(|s| self.parse_database_type(s));
1328 }
1329 None
1330 }
1331
1332 fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1334 let mut errors = Vec::new();
1335
1336 let models = data
1338 .get("models")
1339 .and_then(|v| v.as_object())
1340 .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1341
1342 let (model_name, model_data) = models
1345 .iter()
1346 .next()
1347 .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1348
1349 let model_data = model_data
1350 .as_object()
1351 .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1352
1353 let fields = model_data
1355 .get("fields")
1356 .and_then(|v| v.as_object())
1357 .ok_or_else(|| {
1358 errors.push(ParserError {
1359 error_type: "validation_error".to_string(),
1360 field: format!("Model '{}'", model_name),
1361 message: format!("Model '{}' missing 'fields' field", model_name),
1362 });
1363 anyhow::anyhow!("Missing fields")
1364 });
1365
1366 let fields = match fields {
1367 Ok(f) => f,
1368 Err(_) => {
1369 let quality_rules = self.extract_quality_rules(data);
1371 let table_uuid = self.extract_table_uuid(data);
1372 let table = Table {
1373 id: table_uuid,
1374 name: model_name.clone(),
1375 columns: Vec::new(),
1376 database_type: None,
1377 catalog_name: None,
1378 schema_name: None,
1379 medallion_layers: Vec::new(),
1380 scd_pattern: None,
1381 data_vault_classification: None,
1382 modeling_level: None,
1383 tags: Vec::<Tag>::new(),
1384 odcl_metadata: HashMap::new(),
1385 owner: None,
1386 sla: None,
1387 contact_details: None,
1388 infrastructure_type: None,
1389 notes: None,
1390 position: None,
1391 yaml_file_path: None,
1392 drawio_cell_id: None,
1393 quality: quality_rules,
1394 errors: Vec::new(),
1395 created_at: chrono::Utc::now(),
1396 updated_at: chrono::Utc::now(),
1397 };
1398 return Ok((table, errors));
1399 }
1400 };
1401
1402 let mut columns = Vec::new();
1404 for (field_name, field_data) in fields {
1405 if let Some(field_obj) = field_data.as_object() {
1406 match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1407 Ok(mut cols) => columns.append(&mut cols),
1408 Err(e) => {
1409 errors.push(ParserError {
1410 error_type: "field_parse_error".to_string(),
1411 field: format!("Field '{}'", field_name),
1412 message: e.to_string(),
1413 });
1414 }
1415 }
1416 } else {
1417 errors.push(ParserError {
1418 error_type: "validation_error".to_string(),
1419 field: format!("Field '{}'", field_name),
1420 message: format!("Field '{}' must be an object", field_name),
1421 });
1422 }
1423 }
1424
1425 let mut odcl_metadata = HashMap::new();
1428
1429 if let Some(info_val) = data.get("info") {
1432 let info_json_value = json_value_to_serde_value(info_val);
1434 odcl_metadata.insert("info".to_string(), info_json_value);
1435 }
1436
1437 odcl_metadata.insert(
1438 "dataContractSpecification".to_string(),
1439 json_value_to_serde_value(
1440 data.get("dataContractSpecification")
1441 .unwrap_or(&JsonValue::Null),
1442 ),
1443 );
1444 odcl_metadata.insert(
1445 "id".to_string(),
1446 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1447 );
1448 if let Some(servicelevels_val) = data.get("servicelevels") {
1452 odcl_metadata.insert(
1453 "servicelevels".to_string(),
1454 json_value_to_serde_value(servicelevels_val),
1455 );
1456 }
1457
1458 if let Some(links_val) = data.get("links") {
1460 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1461 }
1462
1463 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1465 odcl_metadata.insert(
1466 "domain".to_string(),
1467 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1468 );
1469 }
1470 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1471 odcl_metadata.insert(
1472 "dataProduct".to_string(),
1473 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1474 );
1475 }
1476 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1477 odcl_metadata.insert(
1478 "tenant".to_string(),
1479 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1480 );
1481 }
1482
1483 if let Some(desc_val) = data.get("description") {
1485 odcl_metadata.insert(
1486 "description".to_string(),
1487 json_value_to_serde_value(desc_val),
1488 );
1489 }
1490
1491 if let Some(pricing_val) = data.get("pricing") {
1493 odcl_metadata.insert(
1494 "pricing".to_string(),
1495 json_value_to_serde_value(pricing_val),
1496 );
1497 }
1498
1499 if let Some(team_val) = data.get("team") {
1501 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1502 }
1503
1504 if let Some(roles_val) = data.get("roles") {
1506 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1507 }
1508
1509 if let Some(terms_val) = data.get("terms") {
1511 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1512 }
1513
1514 if let Some(servers_val) = data.get("servers") {
1516 odcl_metadata.insert(
1517 "servers".to_string(),
1518 json_value_to_serde_value(servers_val),
1519 );
1520 }
1521
1522 if let Some(infrastructure_val) = data.get("infrastructure") {
1524 odcl_metadata.insert(
1525 "infrastructure".to_string(),
1526 json_value_to_serde_value(infrastructure_val),
1527 );
1528 }
1529
1530 let database_type = self.extract_database_type_from_servers(data);
1532
1533 let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1535
1536 let mut shared_domains: Vec<String> = Vec::new();
1538 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1539 for prop in custom_props {
1540 if let Some(prop_obj) = prop.as_object() {
1541 let prop_key = prop_obj
1542 .get("property")
1543 .and_then(|v| v.as_str())
1544 .unwrap_or("");
1545 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1546 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1547 {
1548 for item in arr {
1549 if let Some(s) = item.as_str() {
1550 shared_domains.push(s.to_string());
1551 }
1552 }
1553 }
1554 }
1555 }
1556 }
1557
1558 let mut tags: Vec<Tag> = Vec::new();
1560 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1561 for item in tags_arr {
1562 if let Some(s) = item.as_str() {
1563 if let Ok(tag) = Tag::from_str(s) {
1565 tags.push(tag);
1566 } else {
1567 tags.push(crate::models::Tag::Simple(s.to_string()));
1569 }
1570 }
1571 }
1572 }
1573
1574 let quality_rules = self.extract_quality_rules(data);
1576
1577 if !shared_domains.is_empty() {
1579 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1580 .iter()
1581 .map(|d| serde_json::Value::String(d.clone()))
1582 .collect();
1583 odcl_metadata.insert(
1584 "sharedDomains".to_string(),
1585 serde_json::Value::Array(shared_domains_json),
1586 );
1587 }
1588
1589 let table_uuid = self.extract_table_uuid(data);
1590
1591 let table = Table {
1592 id: table_uuid,
1593 name: model_name.clone(),
1594 columns,
1595 database_type,
1596 catalog_name,
1597 schema_name,
1598 medallion_layers: Vec::new(),
1599 scd_pattern: None,
1600 data_vault_classification: None,
1601 modeling_level: None,
1602 tags,
1603 odcl_metadata,
1604 owner: None,
1605 sla: None,
1606 contact_details: None,
1607 infrastructure_type: None,
1608 notes: None,
1609 position: None,
1610 yaml_file_path: None,
1611 drawio_cell_id: None,
1612 quality: quality_rules,
1613 errors: Vec::new(),
1614 created_at: chrono::Utc::now(),
1615 updated_at: chrono::Utc::now(),
1616 };
1617
1618 info!(
1619 "Parsed Data Contract table: {} with {} warnings/errors",
1620 model_name,
1621 errors.len()
1622 );
1623 Ok((table, errors))
1624 }
1625
1626 #[allow(clippy::only_used_in_recursion)]
1631 fn expand_nested_column(
1632 &self,
1633 column_name: &str,
1634 schema: &JsonValue,
1635 nullable: bool,
1636 columns: &mut Vec<Column>,
1637 errors: &mut Vec<ParserError>,
1638 ) {
1639 let schema_obj = match schema.as_object() {
1640 Some(obj) => obj,
1641 None => {
1642 errors.push(ParserError {
1643 error_type: "parse_error".to_string(),
1644 field: column_name.to_string(),
1645 message: "Nested schema must be an object".to_string(),
1646 });
1647 return;
1648 }
1649 };
1650
1651 let schema_type = schema_obj
1652 .get("type")
1653 .and_then(|v| v.as_str())
1654 .unwrap_or("object");
1655
1656 match schema_type {
1657 "object" | "struct" => {
1658 if let Some(properties) = schema_obj.get("properties").and_then(|v| v.as_object()) {
1660 let nested_required: Vec<String> = schema_obj
1661 .get("required")
1662 .and_then(|v| v.as_array())
1663 .map(|arr| {
1664 arr.iter()
1665 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1666 .collect()
1667 })
1668 .unwrap_or_default();
1669
1670 for (nested_name, nested_schema) in properties {
1671 let nested_nullable = !nested_required.contains(nested_name);
1672 self.expand_nested_column(
1673 &format!("{}.{}", column_name, nested_name),
1674 nested_schema,
1675 nullable || nested_nullable,
1676 columns,
1677 errors,
1678 );
1679 }
1680 } else {
1681 let description = schema_obj
1683 .get("description")
1684 .and_then(|v| v.as_str())
1685 .unwrap_or("")
1686 .to_string();
1687 columns.push(Column {
1688 name: column_name.to_string(),
1689 data_type: "OBJECT".to_string(),
1690 nullable,
1691 primary_key: false,
1692 secondary_key: false,
1693 composite_key: None,
1694 foreign_key: None,
1695 constraints: Vec::new(),
1696 description,
1697 quality: Vec::new(),
1698 ref_path: None,
1699 enum_values: Vec::new(),
1700 errors: Vec::new(),
1701 column_order: 0,
1702 });
1703 }
1704 }
1705 "array" => {
1706 let items = schema_obj.get("items").unwrap_or(schema);
1708 let items_type = items
1709 .as_object()
1710 .and_then(|obj| obj.get("type").and_then(|v| v.as_str()))
1711 .unwrap_or("string");
1712
1713 if items_type == "object" || items_type == "struct" {
1714 let description = schema_obj
1716 .get("description")
1717 .and_then(|v| v.as_str())
1718 .unwrap_or("")
1719 .to_string();
1720 columns.push(Column {
1721 name: column_name.to_string(),
1722 data_type: "ARRAY<OBJECT>".to_string(),
1723 nullable,
1724 primary_key: false,
1725 secondary_key: false,
1726 composite_key: None,
1727 foreign_key: None,
1728 constraints: Vec::new(),
1729 description,
1730 quality: Vec::new(),
1731 ref_path: None,
1732 enum_values: Vec::new(),
1733 errors: Vec::new(),
1734 column_order: 0,
1735 });
1736 if let Some(properties) = items
1738 .as_object()
1739 .and_then(|obj| obj.get("properties").and_then(|v| v.as_object()))
1740 {
1741 let nested_required: Vec<String> = items
1742 .as_object()
1743 .and_then(|obj| obj.get("required").and_then(|v| v.as_array()))
1744 .map(|arr| {
1745 arr.iter()
1746 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1747 .collect()
1748 })
1749 .unwrap_or_default();
1750
1751 for (nested_name, nested_schema) in properties {
1752 let nested_nullable = !nested_required.contains(nested_name);
1753 self.expand_nested_column(
1754 &format!("{}.{}", column_name, nested_name),
1755 nested_schema,
1756 nullable || nested_nullable,
1757 columns,
1758 errors,
1759 );
1760 }
1761 }
1762 } else {
1763 let data_type = format!("ARRAY<{}>", items_type.to_uppercase());
1765 let description = schema_obj
1766 .get("description")
1767 .and_then(|v| v.as_str())
1768 .unwrap_or("")
1769 .to_string();
1770 columns.push(Column {
1771 name: column_name.to_string(),
1772 data_type,
1773 nullable,
1774 primary_key: false,
1775 secondary_key: false,
1776 composite_key: None,
1777 foreign_key: None,
1778 constraints: Vec::new(),
1779 description,
1780 quality: Vec::new(),
1781 ref_path: None,
1782 enum_values: Vec::new(),
1783 errors: Vec::new(),
1784 column_order: 0,
1785 });
1786 }
1787 }
1788 _ => {
1789 let data_type = schema_type.to_uppercase();
1791 let description = schema_obj
1792 .get("description")
1793 .and_then(|v| v.as_str())
1794 .unwrap_or("")
1795 .to_string();
1796 let enum_values = schema_obj
1797 .get("enum")
1798 .and_then(|v| v.as_array())
1799 .map(|arr| {
1800 arr.iter()
1801 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1802 .collect()
1803 })
1804 .unwrap_or_default();
1805 columns.push(Column {
1806 name: column_name.to_string(),
1807 data_type,
1808 nullable,
1809 primary_key: false,
1810 secondary_key: false,
1811 composite_key: None,
1812 foreign_key: None,
1813 constraints: Vec::new(),
1814 description,
1815 quality: Vec::new(),
1816 ref_path: None,
1817 enum_values,
1818 errors: Vec::new(),
1819 column_order: 0,
1820 });
1821 }
1822 }
1823 }
1824
1825 fn parse_data_contract_field(
1827 &self,
1828 field_name: &str,
1829 field_data: &serde_json::Map<String, JsonValue>,
1830 data: &JsonValue,
1831 errors: &mut Vec<ParserError>,
1832 ) -> Result<Vec<Column>> {
1833 let mut columns = Vec::new();
1834
1835 let extract_quality_from_obj =
1837 |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1838 let mut quality_rules = Vec::new();
1839 if let Some(quality_val) = obj.get("quality") {
1840 if let Some(arr) = quality_val.as_array() {
1841 for item in arr {
1843 if let Some(rule_obj) = item.as_object() {
1844 let mut rule = HashMap::new();
1845 for (key, value) in rule_obj {
1846 rule.insert(key.clone(), json_value_to_serde_value(value));
1847 }
1848 quality_rules.push(rule);
1849 }
1850 }
1851 } else if let Some(rule_obj) = quality_val.as_object() {
1852 let mut rule = HashMap::new();
1854 for (key, value) in rule_obj {
1855 rule.insert(key.clone(), json_value_to_serde_value(value));
1856 }
1857 quality_rules.push(rule);
1858 }
1859 }
1860 quality_rules
1861 };
1862
1863 let description = field_data
1865 .get("description")
1866 .and_then(|v| v.as_str())
1867 .unwrap_or("")
1868 .to_string();
1869
1870 let mut quality_rules = extract_quality_from_obj(field_data);
1872
1873 if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1875 let ref_path = Some(ref_str.to_string());
1877
1878 if let Some(definition) = self.resolve_ref(ref_str, data) {
1879 if quality_rules.is_empty() {
1883 if let Some(def_obj) = definition.as_object() {
1884 quality_rules = extract_quality_from_obj(def_obj);
1885 }
1886 } else {
1887 if let Some(def_obj) = definition.as_object() {
1889 let def_quality = extract_quality_from_obj(def_obj);
1890 quality_rules.extend(def_quality);
1892 }
1893 }
1894
1895 let required = field_data
1896 .get("required")
1897 .and_then(|v| v.as_bool())
1898 .unwrap_or(false);
1899
1900 if required {
1902 let has_not_null = quality_rules.iter().any(|rule| {
1903 rule.get("type")
1904 .and_then(|v| v.as_str())
1905 .map(|s| {
1906 s.to_lowercase().contains("not_null")
1907 || s.to_lowercase().contains("notnull")
1908 })
1909 .unwrap_or(false)
1910 });
1911 if !has_not_null {
1912 let mut not_null_rule = HashMap::new();
1913 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
1914 not_null_rule.insert(
1915 "description".to_string(),
1916 serde_json::json!("Column must not be null"),
1917 );
1918 quality_rules.push(not_null_rule);
1919 }
1920 }
1921
1922 let has_nested = definition
1924 .get("type")
1925 .and_then(|v| v.as_str())
1926 .map(|s| s == "object")
1927 .unwrap_or(false)
1928 || definition.get("properties").is_some()
1929 || definition.get("fields").is_some();
1930
1931 if has_nested {
1932 if let Some(properties) =
1934 definition.get("properties").and_then(|v| v.as_object())
1935 {
1936 let nested_required: Vec<String> = definition
1938 .get("required")
1939 .and_then(|v| v.as_array())
1940 .map(|arr| {
1941 arr.iter()
1942 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1943 .collect()
1944 })
1945 .unwrap_or_default();
1946
1947 for (nested_name, nested_schema) in properties {
1948 let nested_required_field = nested_required.contains(nested_name);
1949 self.expand_nested_column(
1950 &format!("{}.{}", field_name, nested_name),
1951 nested_schema,
1952 !nested_required_field,
1953 &mut columns,
1954 errors,
1955 );
1956 }
1957 } else if let Some(fields) =
1958 definition.get("fields").and_then(|v| v.as_object())
1959 {
1960 for (nested_name, nested_schema) in fields {
1962 self.expand_nested_column(
1963 &format!("{}.{}", field_name, nested_name),
1964 nested_schema,
1965 true, &mut columns,
1967 errors,
1968 );
1969 }
1970 } else {
1971 columns.push(Column {
1973 name: field_name.to_string(),
1974 data_type: "OBJECT".to_string(),
1975 nullable: !required,
1976 primary_key: false,
1977 secondary_key: false,
1978 composite_key: None,
1979 foreign_key: None,
1980 constraints: Vec::new(),
1981 description: if description.is_empty() {
1982 definition
1983 .get("description")
1984 .and_then(|v| v.as_str())
1985 .unwrap_or("")
1986 .to_string()
1987 } else {
1988 description.clone()
1989 },
1990 errors: Vec::new(),
1991 quality: quality_rules.clone(),
1992 ref_path: ref_path.clone(),
1993 enum_values: Vec::new(),
1994 column_order: 0,
1995 });
1996 }
1997 } else {
1998 let def_type = definition
2000 .get("type")
2001 .and_then(|v| v.as_str())
2002 .unwrap_or("STRING")
2003 .to_uppercase();
2004
2005 let enum_values = definition
2006 .get("enum")
2007 .and_then(|v| v.as_array())
2008 .map(|arr| {
2009 arr.iter()
2010 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2011 .collect()
2012 })
2013 .unwrap_or_default();
2014
2015 columns.push(Column {
2016 name: field_name.to_string(),
2017 data_type: def_type,
2018 nullable: !required,
2019 primary_key: false,
2020 secondary_key: false,
2021 composite_key: None,
2022 foreign_key: None,
2023 constraints: Vec::new(),
2024 description: if description.is_empty() {
2025 definition
2026 .get("description")
2027 .and_then(|v| v.as_str())
2028 .unwrap_or("")
2029 .to_string()
2030 } else {
2031 description
2032 },
2033 errors: Vec::new(),
2034 quality: quality_rules,
2035 ref_path,
2036 enum_values,
2037 column_order: 0,
2038 });
2039 }
2040 return Ok(columns);
2041 } else {
2042 let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
2044 let mut error_map = HashMap::new();
2045 error_map.insert("type".to_string(), serde_json::json!("validation_error"));
2046 error_map.insert("field".to_string(), serde_json::json!("data_type"));
2047 error_map.insert(
2048 "message".to_string(),
2049 serde_json::json!(format!(
2050 "Field '{}' references undefined definition: {}",
2051 field_name, ref_str
2052 )),
2053 );
2054 col_errors.push(error_map);
2055 columns.push(Column {
2056 name: field_name.to_string(),
2057 data_type: "OBJECT".to_string(),
2058 nullable: true,
2059 primary_key: false,
2060 secondary_key: false,
2061 composite_key: None,
2062 foreign_key: None,
2063 constraints: Vec::new(),
2064 description,
2065 errors: col_errors,
2066 quality: Vec::new(),
2067 ref_path: Some(ref_str.to_string()), enum_values: Vec::new(),
2069 column_order: 0,
2070 });
2071 return Ok(columns);
2072 }
2073 }
2074
2075 let field_type_str = field_data
2077 .get("type")
2078 .and_then(|v| v.as_str())
2079 .unwrap_or("STRING");
2080
2081 if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
2083 match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
2084 Ok(nested_cols) if !nested_cols.is_empty() => {
2085 let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
2087 "ARRAY<STRUCT<...>>".to_string()
2088 } else {
2089 "STRUCT<...>".to_string()
2090 };
2091
2092 columns.push(Column {
2094 name: field_name.to_string(),
2095 data_type: parent_data_type,
2096 nullable: !field_data
2097 .get("required")
2098 .and_then(|v| v.as_bool())
2099 .unwrap_or(false),
2100 primary_key: false,
2101 secondary_key: false,
2102 composite_key: None,
2103 foreign_key: None,
2104 constraints: Vec::new(),
2105 description: description.clone(),
2106 errors: Vec::new(),
2107 quality: quality_rules.clone(),
2108 ref_path: field_data
2109 .get("$ref")
2110 .and_then(|v| v.as_str())
2111 .map(|s| s.to_string()),
2112 enum_values: Vec::new(),
2113 column_order: 0,
2114 });
2115
2116 columns.extend(nested_cols);
2118 return Ok(columns);
2119 }
2120 Ok(_) | Err(_) => {
2121 }
2123 }
2124 }
2125
2126 let field_type = normalize_data_type(field_type_str);
2127
2128 if field_type == "ARRAY" {
2130 let items = field_data.get("items");
2131 if let Some(items_val) = items {
2132 if let Some(items_obj) = items_val.as_object() {
2133 if items_obj.get("fields").is_some()
2135 || items_obj.get("type").and_then(|v| v.as_str()) == Some("object")
2136 {
2137 columns.push(Column {
2139 name: field_name.to_string(),
2140 data_type: "ARRAY<OBJECT>".to_string(),
2141 nullable: !field_data
2142 .get("required")
2143 .and_then(|v| v.as_bool())
2144 .unwrap_or(false),
2145 primary_key: false,
2146 secondary_key: false,
2147 composite_key: None,
2148 foreign_key: None,
2149 constraints: Vec::new(),
2150 description: field_data
2151 .get("description")
2152 .and_then(|v| v.as_str())
2153 .unwrap_or("")
2154 .to_string(),
2155 errors: Vec::new(),
2156 quality: Vec::new(),
2157 ref_path: None,
2158 enum_values: Vec::new(),
2159 column_order: 0,
2160 });
2161
2162 let nested_fields_obj = items_obj
2165 .get("properties")
2166 .and_then(|v| v.as_object())
2167 .or_else(|| items_obj.get("fields").and_then(|v| v.as_object()));
2168
2169 if let Some(fields_obj) = nested_fields_obj {
2170 for (nested_field_name, nested_field_data) in fields_obj {
2171 if let Some(nested_field_obj) = nested_field_data.as_object() {
2172 let nested_field_type = nested_field_obj
2173 .get("type")
2174 .and_then(|v| v.as_str())
2175 .unwrap_or("STRING");
2176
2177 let nested_col_name =
2179 format!("{}.{}", field_name, nested_field_name);
2180 let mut local_errors = Vec::new();
2181 match self.parse_data_contract_field(
2182 &nested_col_name,
2183 nested_field_obj,
2184 data,
2185 &mut local_errors,
2186 ) {
2187 Ok(mut nested_cols) => {
2188 columns.append(&mut nested_cols);
2191 }
2192 Err(_) => {
2193 columns.push(Column {
2195 name: nested_col_name,
2196 data_type: nested_field_type.to_uppercase(),
2197 nullable: !nested_field_obj
2198 .get("required")
2199 .and_then(|v| v.as_bool())
2200 .unwrap_or(false),
2201 primary_key: false,
2202 secondary_key: false,
2203 composite_key: None,
2204 foreign_key: None,
2205 constraints: Vec::new(),
2206 description: nested_field_obj
2207 .get("description")
2208 .and_then(|v| v.as_str())
2209 .unwrap_or("")
2210 .to_string(),
2211 errors: Vec::new(),
2212 quality: Vec::new(),
2213 ref_path: None,
2214 enum_values: Vec::new(),
2215 column_order: 0,
2216 });
2217 }
2218 }
2219 }
2220 }
2221 }
2222
2223 return Ok(columns);
2224 } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2225 columns.push(Column {
2227 name: field_name.to_string(),
2228 data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2229 nullable: !field_data
2230 .get("required")
2231 .and_then(|v| v.as_bool())
2232 .unwrap_or(false),
2233 primary_key: false,
2234 secondary_key: false,
2235 composite_key: None,
2236 foreign_key: None,
2237 constraints: Vec::new(),
2238 description: description.clone(),
2239 errors: Vec::new(),
2240 quality: quality_rules.clone(),
2241 ref_path: field_data
2242 .get("$ref")
2243 .and_then(|v| v.as_str())
2244 .map(|s| s.to_string()),
2245 enum_values: Vec::new(),
2246 column_order: 0,
2247 });
2248 return Ok(columns);
2249 }
2250 } else if let Some(item_type_str) = items_val.as_str() {
2251 columns.push(Column {
2253 name: field_name.to_string(),
2254 data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2255 nullable: !field_data
2256 .get("required")
2257 .and_then(|v| v.as_bool())
2258 .unwrap_or(false),
2259 primary_key: false,
2260 secondary_key: false,
2261 composite_key: None,
2262 foreign_key: None,
2263 constraints: Vec::new(),
2264 description: description.clone(),
2265 errors: Vec::new(),
2266 quality: quality_rules.clone(),
2267 ref_path: field_data
2268 .get("$ref")
2269 .and_then(|v| v.as_str())
2270 .map(|s| s.to_string()),
2271 enum_values: Vec::new(),
2272 column_order: 0,
2273 });
2274 return Ok(columns);
2275 }
2276 }
2277 columns.push(Column {
2279 name: field_name.to_string(),
2280 data_type: "ARRAY<STRING>".to_string(),
2281 nullable: !field_data
2282 .get("required")
2283 .and_then(|v| v.as_bool())
2284 .unwrap_or(false),
2285 primary_key: false,
2286 secondary_key: false,
2287 composite_key: None,
2288 foreign_key: None,
2289 constraints: Vec::new(),
2290 description: description.clone(),
2291 errors: Vec::new(),
2292 quality: quality_rules.clone(),
2293 ref_path: field_data
2294 .get("$ref")
2295 .and_then(|v| v.as_str())
2296 .map(|s| s.to_string()),
2297 enum_values: Vec::new(),
2298 column_order: 0,
2299 });
2300 return Ok(columns);
2301 }
2302
2303 let nested_fields_obj = field_data
2306 .get("properties")
2307 .and_then(|v| v.as_object())
2308 .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2309
2310 if field_type == "OBJECT"
2311 && let Some(fields_obj) = nested_fields_obj
2312 {
2313 columns.push(Column {
2317 name: field_name.to_string(),
2318 data_type: "OBJECT".to_string(),
2319 nullable: !field_data
2320 .get("required")
2321 .and_then(|v| v.as_bool())
2322 .unwrap_or(false),
2323 primary_key: false,
2324 secondary_key: false,
2325 composite_key: None,
2326 foreign_key: None,
2327 constraints: Vec::new(),
2328 description: description.clone(),
2329 errors: Vec::new(),
2330 quality: quality_rules.clone(),
2331 ref_path: field_data
2332 .get("$ref")
2333 .and_then(|v| v.as_str())
2334 .map(|s| s.to_string()),
2335 enum_values: Vec::new(),
2336 column_order: 0,
2337 });
2338
2339 for (nested_field_name, nested_field_data) in fields_obj {
2341 if let Some(nested_field_obj) = nested_field_data.as_object() {
2342 let nested_field_type = nested_field_obj
2343 .get("type")
2344 .and_then(|v| v.as_str())
2345 .unwrap_or("STRING");
2346
2347 let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2349 match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data) {
2350 Ok(mut nested_cols) => {
2351 columns.append(&mut nested_cols);
2354 }
2355 Err(_) => {
2356 columns.push(Column {
2358 name: nested_col_name,
2359 data_type: nested_field_type.to_uppercase(),
2360 nullable: !nested_field_obj
2361 .get("required")
2362 .and_then(|v| v.as_bool())
2363 .unwrap_or(false),
2364 primary_key: false,
2365 secondary_key: false,
2366 composite_key: None,
2367 foreign_key: None,
2368 constraints: Vec::new(),
2369 description: nested_field_obj
2370 .get("description")
2371 .and_then(|v| v.as_str())
2372 .unwrap_or("")
2373 .to_string(),
2374 errors: Vec::new(),
2375 quality: Vec::new(),
2376 ref_path: None,
2377 enum_values: Vec::new(),
2378 column_order: 0,
2379 });
2380 }
2381 }
2382 }
2383 }
2384
2385 return Ok(columns);
2386 }
2387
2388 let ref_path = field_data
2391 .get("$ref")
2392 .and_then(|v| v.as_str())
2393 .map(|s| s.to_string());
2394
2395 let required = field_data
2396 .get("required")
2397 .and_then(|v| v.as_bool())
2398 .unwrap_or(false);
2399
2400 let field_description = if description.is_empty() {
2402 field_data
2403 .get("description")
2404 .and_then(|v| v.as_str())
2405 .unwrap_or("")
2406 .to_string()
2407 } else {
2408 description
2409 };
2410
2411 let mut column_quality_rules = quality_rules;
2413
2414 if column_quality_rules.is_empty()
2416 && let Some(quality_val) = field_data.get("quality")
2417 {
2418 if let Some(arr) = quality_val.as_array() {
2419 for item in arr {
2421 if let Some(obj) = item.as_object() {
2422 let mut rule = HashMap::new();
2423 for (key, value) in obj {
2424 rule.insert(key.clone(), json_value_to_serde_value(value));
2425 }
2426 column_quality_rules.push(rule);
2427 }
2428 }
2429 } else if let Some(obj) = quality_val.as_object() {
2430 let mut rule = HashMap::new();
2432 for (key, value) in obj {
2433 rule.insert(key.clone(), json_value_to_serde_value(value));
2434 }
2435 column_quality_rules.push(rule);
2436 }
2437 }
2438
2439 if required {
2441 let has_not_null = column_quality_rules.iter().any(|rule| {
2442 rule.get("type")
2443 .and_then(|v| v.as_str())
2444 .map(|s| {
2445 s.to_lowercase().contains("not_null")
2446 || s.to_lowercase().contains("notnull")
2447 })
2448 .unwrap_or(false)
2449 });
2450 if !has_not_null {
2451 let mut not_null_rule = HashMap::new();
2452 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
2453 not_null_rule.insert(
2454 "description".to_string(),
2455 serde_json::json!("Column must not be null"),
2456 );
2457 column_quality_rules.push(not_null_rule);
2458 }
2459 }
2460
2461 columns.push(Column {
2462 name: field_name.to_string(),
2463 data_type: field_type,
2464 nullable: !required,
2465 primary_key: field_data
2466 .get("primaryKey")
2467 .and_then(|v| v.as_bool())
2468 .unwrap_or(false),
2469 secondary_key: false,
2470 composite_key: None,
2471 foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2472 constraints: Vec::new(),
2473 description: field_description,
2474 errors: Vec::new(),
2475 quality: column_quality_rules,
2476 ref_path,
2477 enum_values: Vec::new(),
2478 column_order: 0,
2479 });
2480
2481 Ok(columns)
2482 }
2483
2484 fn parse_foreign_key_from_data_contract(
2486 &self,
2487 field_data: &serde_json::Map<String, JsonValue>,
2488 ) -> Option<ForeignKey> {
2489 field_data
2490 .get("foreignKey")
2491 .and_then(|v| v.as_object())
2492 .map(|fk_obj| ForeignKey {
2493 table_id: fk_obj
2494 .get("table")
2495 .or_else(|| fk_obj.get("table_id"))
2496 .and_then(|v| v.as_str())
2497 .unwrap_or("")
2498 .to_string(),
2499 column_name: fk_obj
2500 .get("column")
2501 .or_else(|| fk_obj.get("column_name"))
2502 .and_then(|v| v.as_str())
2503 .unwrap_or("")
2504 .to_string(),
2505 })
2506 }
2507
2508 fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2510 if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2512 if let Some((_, server_data)) = servers_obj.iter().next()
2514 && let Some(server_obj) = server_data.as_object()
2515 {
2516 return server_obj
2517 .get("type")
2518 .and_then(|v| v.as_str())
2519 .and_then(|s| self.parse_database_type(s));
2520 }
2521 } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2522 if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2524 return server_obj
2525 .get("type")
2526 .and_then(|v| v.as_str())
2527 .and_then(|s| self.parse_database_type(s));
2528 }
2529 }
2530 None
2531 }
2532
2533 fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2535 match s.to_lowercase().as_str() {
2536 "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2537 "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2538 "mysql" => Some(DatabaseType::Mysql),
2539 "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2540 "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2541 _ => None,
2542 }
2543 }
2544
2545 fn parse_struct_type_from_string(
2547 &self,
2548 field_name: &str,
2549 type_str: &str,
2550 field_data: &serde_json::Map<String, JsonValue>,
2551 ) -> Result<Vec<Column>> {
2552 let mut columns = Vec::new();
2553
2554 let normalized_type = type_str
2556 .lines()
2557 .map(|line| line.trim())
2558 .filter(|line| !line.is_empty())
2559 .collect::<Vec<_>>()
2560 .join(" ");
2561
2562 let type_str_upper = normalized_type.to_uppercase();
2563
2564 let _is_array = type_str_upper.starts_with("ARRAY<");
2566 let struct_start = type_str_upper.find("STRUCT<");
2567
2568 if let Some(start_pos) = struct_start {
2569 let struct_content = &normalized_type[start_pos + 7..]; let mut depth = 1;
2574 let mut end_pos = None;
2575 for (i, ch) in struct_content.char_indices() {
2576 match ch {
2577 '<' => depth += 1,
2578 '>' => {
2579 depth -= 1;
2580 if depth == 0 {
2581 end_pos = Some(i);
2582 break;
2583 }
2584 }
2585 _ => {}
2586 }
2587 }
2588
2589 let struct_fields_str = if let Some(end) = end_pos {
2591 &struct_content[..end]
2592 } else {
2593 struct_content.trim_end_matches('>').trim()
2595 };
2596
2597 let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2599
2600 for (nested_name, nested_type) in fields {
2602 let nested_type_upper = nested_type.to_uppercase();
2603 let nested_col_name = format!("{}.{}", field_name, nested_name);
2604
2605 if nested_type_upper.starts_with("STRUCT<") {
2607 let nested_struct_type_str = if nested_type_upper.starts_with("ARRAY<STRUCT<") {
2610 nested_type.clone()
2612 } else {
2613 nested_type.clone()
2615 };
2616
2617 match self.parse_struct_type_from_string(
2619 &nested_col_name,
2620 &nested_struct_type_str,
2621 field_data,
2622 ) {
2623 Ok(nested_cols) => {
2624 columns.extend(nested_cols);
2626 }
2627 Err(_) => {
2628 columns.push(Column {
2630 name: nested_col_name,
2631 data_type: normalize_data_type(&nested_type),
2632 nullable: !field_data
2633 .get("required")
2634 .and_then(|v| v.as_bool())
2635 .unwrap_or(false),
2636 primary_key: false,
2637 secondary_key: false,
2638 composite_key: None,
2639 foreign_key: None,
2640 constraints: Vec::new(),
2641 description: field_data
2642 .get("description")
2643 .and_then(|v| v.as_str())
2644 .unwrap_or("")
2645 .to_string(),
2646 errors: Vec::new(),
2647 quality: Vec::new(),
2648 ref_path: None,
2649 enum_values: Vec::new(),
2650 column_order: 0,
2651 });
2652 }
2653 }
2654 } else {
2655 columns.push(Column {
2657 name: nested_col_name,
2658 data_type: normalize_data_type(&nested_type),
2659 nullable: !field_data
2660 .get("required")
2661 .and_then(|v| v.as_bool())
2662 .unwrap_or(false),
2663 primary_key: false,
2664 secondary_key: false,
2665 composite_key: None,
2666 foreign_key: None,
2667 constraints: Vec::new(),
2668 description: field_data
2669 .get("description")
2670 .and_then(|v| v.as_str())
2671 .unwrap_or("")
2672 .to_string(),
2673 errors: Vec::new(),
2674 quality: Vec::new(),
2675 ref_path: None,
2676 enum_values: Vec::new(),
2677 column_order: 0,
2678 });
2679 }
2680 }
2681
2682 return Ok(columns);
2683 }
2684
2685 Ok(Vec::new())
2687 }
2688
2689 fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2691 let mut fields = Vec::new();
2692 let mut current_field = String::new();
2693 let mut depth = 0;
2694 let mut in_string = false;
2695 let mut string_char = None;
2696
2697 for ch in fields_str.chars() {
2698 match ch {
2699 '\'' | '"' if !in_string || Some(ch) == string_char => {
2700 if in_string {
2701 in_string = false;
2702 string_char = None;
2703 } else {
2704 in_string = true;
2705 string_char = Some(ch);
2706 }
2707 current_field.push(ch);
2708 }
2709 '<' if !in_string => {
2710 depth += 1;
2711 current_field.push(ch);
2712 }
2713 '>' if !in_string => {
2714 depth -= 1;
2715 current_field.push(ch);
2716 }
2717 ',' if !in_string && depth == 0 => {
2718 let trimmed = current_field.trim();
2720 if !trimmed.is_empty()
2721 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2722 {
2723 fields.push((name, type_part));
2724 }
2725 current_field.clear();
2726 }
2727 _ => {
2728 current_field.push(ch);
2729 }
2730 }
2731 }
2732
2733 let trimmed = current_field.trim();
2735 if !trimmed.is_empty()
2736 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2737 {
2738 fields.push((name, type_part));
2739 }
2740
2741 Ok(fields)
2742 }
2743
2744 fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
2746 let colon_pos = field_def.find(':')?;
2748 let name = field_def[..colon_pos].trim().to_string();
2749 let type_part = field_def[colon_pos + 1..].trim().to_string();
2750
2751 if name.is_empty() || type_part.is_empty() {
2752 return None;
2753 }
2754
2755 Some((name, type_part))
2756 }
2757
2758 fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
2760 let mut catalog_name = None;
2761 let mut schema_name = None;
2762
2763 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2764 for prop in custom_props {
2765 if let Some(prop_obj) = prop.as_object() {
2766 let prop_key = prop_obj
2767 .get("property")
2768 .and_then(|v| v.as_str())
2769 .unwrap_or("");
2770 let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
2771
2772 match prop_key {
2773 "catalogName" | "catalog_name" => {
2774 catalog_name = prop_value.map(|s| s.to_string());
2775 }
2776 "schemaName" | "schema_name" => {
2777 schema_name = prop_value.map(|s| s.to_string());
2778 }
2779 _ => {}
2780 }
2781 }
2782 }
2783 }
2784
2785 if catalog_name.is_none() {
2787 catalog_name = data
2788 .get("catalog_name")
2789 .and_then(|v| v.as_str())
2790 .map(|s| s.to_string());
2791 }
2792 if schema_name.is_none() {
2793 schema_name = data
2794 .get("schema_name")
2795 .and_then(|v| v.as_str())
2796 .map(|s| s.to_string());
2797 }
2798
2799 (catalog_name, schema_name)
2800 }
2801}
2802
2803impl Default for ODCSImporter {
2804 fn default() -> Self {
2805 Self::new()
2806 }
2807}
2808
/// A problem reported alongside a successfully parsed table.
///
/// `ODCSImporter::parse` returns a `Vec<ParserError>` next to the `Table`, and
/// `ODCSImporter::import` forwards each entry as `ImportError::ParseError`
/// carrying only [`ParserError::message`].
#[derive(Debug, Clone)]
pub struct ParserError {
    /// Free-form label for the kind of problem.
    /// NOTE(review): the set of values used is not visible in this part of the file.
    pub error_type: String,
    /// Presumably the name of the field the problem relates to — TODO confirm against the producers.
    pub field: String,
    /// Human-readable description; this is the text surfaced by `import`.
    pub message: String,
}
2816
2817fn yaml_to_json_value(yaml: &serde_yaml::Value) -> Result<JsonValue> {
2819 let json_str = serde_json::to_string(yaml).context("Failed to convert YAML to JSON")?;
2821 serde_json::from_str(&json_str).context("Failed to parse JSON")
2822}
2823
2824fn json_value_to_serde_value(value: &JsonValue) -> serde_json::Value {
2826 value.clone()
2827}
2828
/// Normalizes the spelling of a column data type.
///
/// * Simple types are upper-cased (`varchar(255)` -> `VARCHAR(255)`).
/// * Types starting with `STRUCT`, `ARRAY` or `MAP` (ASCII case-insensitive)
///   get an upper-cased keyword while the text between the first `<` and the
///   last `>` keeps its original case, because it may contain field names.
///   Anything between the keyword and the first `<`, or after the last `>`,
///   is dropped.
/// * If such a type has no well-formed `<...>` pair, only the keyword is
///   upper-cased and the rest of the text is kept verbatim.
///
/// The function never panics: the keyword is matched on the original bytes
/// (so byte offsets stay valid for non-ASCII input) and a `>` that appears
/// before the first `<` is treated as "no bracket pair".
fn normalize_data_type(data_type: &str) -> String {
    const COMPLEX_TYPE_KEYWORDS: [&str; 3] = ["STRUCT", "ARRAY", "MAP"];

    // `get(..len)` yields `None` for short input or a non-char boundary,
    // so this prefix test cannot panic.
    let matched = COMPLEX_TYPE_KEYWORDS.iter().copied().find(|kw| {
        data_type
            .get(..kw.len())
            .map_or(false, |prefix| prefix.eq_ignore_ascii_case(kw))
    });

    let keyword = match matched {
        Some(keyword) => keyword,
        // Not a complex type (this also covers the empty string).
        None => return data_type.to_uppercase(),
    };

    match (data_type.find('<'), data_type.rfind('>')) {
        (Some(open), Some(close)) if open < close => {
            format!("{}<{}>", keyword, &data_type[open + 1..close])
        }
        _ => format!("{}{}", keyword, &data_type[keyword.len()..]),
    }
}
2866
2867fn parse_medallion_layer(s: &str) -> Result<MedallionLayer> {
2869 match s.to_uppercase().as_str() {
2870 "BRONZE" => Ok(MedallionLayer::Bronze),
2871 "SILVER" => Ok(MedallionLayer::Silver),
2872 "GOLD" => Ok(MedallionLayer::Gold),
2873 "OPERATIONAL" => Ok(MedallionLayer::Operational),
2874 _ => Err(anyhow::anyhow!("Unknown medallion layer: {}", s)),
2875 }
2876}
2877
2878fn parse_scd_pattern(s: &str) -> Result<SCDPattern> {
2879 match s.to_uppercase().as_str() {
2880 "TYPE_1" | "TYPE1" => Ok(SCDPattern::Type1),
2881 "TYPE_2" | "TYPE2" => Ok(SCDPattern::Type2),
2882 _ => Err(anyhow::anyhow!("Unknown SCD pattern: {}", s)),
2883 }
2884}
2885
2886fn parse_data_vault_classification(s: &str) -> Result<DataVaultClassification> {
2887 match s.to_uppercase().as_str() {
2888 "HUB" => Ok(DataVaultClassification::Hub),
2889 "LINK" => Ok(DataVaultClassification::Link),
2890 "SATELLITE" | "SAT" => Ok(DataVaultClassification::Satellite),
2891 _ => Err(anyhow::anyhow!("Unknown Data Vault classification: {}", s)),
2892 }
2893}
2894
// Unit tests driving the private `ODCSImporter::parse` entry point with small
// inline YAML fixtures.
#[cfg(test)]
mod tests {
    use super::*;

    /// A minimal table with two columns and a database type parses without errors.
    #[test]
    fn test_parse_simple_odcl_table() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
        assert_eq!(errors.len(), 0);
    }

    /// Medallion layer, SCD pattern and `odcl_metadata` are carried onto the table.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
        // NOTE(review): this check is skipped (and the test still passes) when
        // `description` is missing from the metadata or is not a string.
        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
            assert_eq!(desc, "User table");
        }
        assert_eq!(errors.len(), 0);
    }

    /// `data_vault_classification: Hub` is parsed into the matching enum variant.
    #[test]
    fn test_parse_odcl_with_data_vault() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: hub_customer
columns:
  - name: customer_key
    data_type: VARCHAR(50)
data_vault_classification: Hub
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(
            table.data_vault_classification,
            Some(DataVaultClassification::Hub)
        );
        assert_eq!(errors.len(), 0);
    }

    /// Malformed YAML must make `parse` return `Err`.
    #[test]
    fn test_parse_invalid_odcl() {
        let mut parser = ODCSImporter::new();
        let invalid_yaml = "not: valid: yaml: structure:";

        assert!(parser.parse(invalid_yaml).is_err());
    }

    /// A document that has a name but no `columns` must make `parse` return `Err`.
    #[test]
    fn test_parse_odcl_missing_required_fields() {
        let mut parser = ODCSImporter::new();
        let non_conformant = r#"
name: users
# Missing required columns field
"#;

        assert!(parser.parse(non_conformant).is_err());
    }

    /// A column-level `foreign_key` mapping is attached to the parsed column.
    #[test]
    fn test_parse_odcl_with_foreign_key() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: orders
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: user_id
    data_type: INT
    foreign_key:
      table_id: users
      column_name: id
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
        assert!(user_id_col.foreign_key.is_some());
        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
        assert_eq!(errors.len(), 0);
    }

    /// Column `constraints` entries and the `nullable` flag are preserved.
    #[test]
    fn test_parse_odcl_with_constraints() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: products
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
    constraints:
      - UNIQUE
      - NOT NULL
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
        assert!(!name_col.nullable);
        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
        assert_eq!(errors.len(), 0);
    }
}