1use super::{ImportError, ImportResult, TableData};
12use crate::models::column::ForeignKey;
13use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
14use crate::models::{Column, Table};
15use anyhow::{Context, Result};
16use serde_json::Value as JsonValue;
17use std::collections::HashMap;
18use tracing::info;
19
/// Importer that turns a YAML document into a [`Table`] plus a list of parser errors.
///
/// The document layout is detected after parsing: Liquibase changelogs, ODCS v3
/// contracts, Data Contract specification files and a simple `name` + `columns`
/// layout are each handled by a dedicated method.
pub struct ODCSImporter {
    /// The YAML document of the most recent parse; stays `None` until a parse has
    /// gone past the empty-document check.
    current_yaml_data: Option<serde_yaml::Value>,
}
26
27impl ODCSImporter {
28 pub fn new() -> Self {
38 Self {
39 current_yaml_data: None,
40 }
41 }
42
43 pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
76 match self.parse(yaml_content) {
77 Ok((table, errors)) => {
78 let sdk_tables = vec![TableData {
79 table_index: 0,
80 name: Some(table.name.clone()),
81 columns: table
82 .columns
83 .iter()
84 .map(|c| super::ColumnData {
85 name: c.name.clone(),
86 data_type: c.data_type.clone(),
87 nullable: c.nullable,
88 primary_key: c.primary_key,
89 })
90 .collect(),
91 }];
92 let sdk_errors: Vec<ImportError> = errors
93 .iter()
94 .map(|e| ImportError::ParseError(e.message.clone()))
95 .collect();
96 Ok(ImportResult {
97 tables: sdk_tables,
98 tables_requiring_name: Vec::new(),
99 errors: sdk_errors,
100 ai_suggestions: None,
101 })
102 }
103 Err(e) => Err(ImportError::ParseError(e.to_string())),
104 }
105 }
106
    /// Parses `yaml_content` into a [`Table`] together with the non-fatal parser
    /// errors collected along the way.
    ///
    /// Public wrapper around the private `parse` routine; it adds no behaviour of
    /// its own and returns that routine's result unchanged.
    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
        self.parse(yaml_content)
    }
120
121 fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
131 let _errors: Vec<ParserError> = Vec::new();
133
134 let data: serde_yaml::Value =
136 serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
137
138 if data.is_null() {
139 return Err(anyhow::anyhow!("Empty YAML content"));
140 }
141
142 self.current_yaml_data = Some(data.clone());
144
145 let json_data = yaml_to_json_value(&data)?;
147
148 if self.is_liquibase_format(&json_data) {
150 return self.parse_liquibase(&json_data);
151 }
152
153 if self.is_odcl_v3_format(&json_data) {
154 return self.parse_odcl_v3(&json_data);
155 }
156
157 if self.is_data_contract_format(&json_data) {
158 return self.parse_data_contract(&json_data);
159 }
160
161 self.parse_simple_odcl(&json_data)
163 }
164
165 fn resolve_ref<'a>(&self, ref_str: &str, data: &'a JsonValue) -> Option<&'a JsonValue> {
167 if !ref_str.starts_with("#/") {
168 return None;
169 }
170
171 let path = &ref_str[2..];
173 let parts: Vec<&str> = path.split('/').collect();
174
175 let mut current = data;
177 for part in parts {
178 current = current.get(part)?;
179 }
180
181 if current.is_object() {
182 Some(current)
183 } else {
184 None
185 }
186 }
187
188 fn is_liquibase_format(&self, data: &JsonValue) -> bool {
190 if data.get("databaseChangeLog").is_some() {
191 return true;
192 }
193 if let Some(obj) = data.as_object() {
195 let obj_str = format!("{:?}", obj);
196 if obj_str.contains("changeSet") {
197 return true;
198 }
199 }
200 false
201 }
202
203 fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
205 if let Some(obj) = data.as_object() {
206 let has_api_version = obj.contains_key("apiVersion");
207 let has_kind = obj
208 .get("kind")
209 .and_then(|v| v.as_str())
210 .map(|s| s == "DataContract")
211 .unwrap_or(false);
212 let has_id = obj.contains_key("id");
213 let has_version = obj.contains_key("version");
214 return has_api_version && has_kind && has_id && has_version;
215 }
216 false
217 }
218
219 fn is_data_contract_format(&self, data: &JsonValue) -> bool {
221 if let Some(obj) = data.as_object() {
222 let has_spec = obj.contains_key("dataContractSpecification");
223 let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
224 return has_spec && has_models;
225 }
226 false
227 }
228
229 fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
231 let mut errors = Vec::new();
232
233 let name = data
235 .get("name")
236 .and_then(|v| v.as_str())
237 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
238 .to_string();
239
240 let columns_data = data
242 .get("columns")
243 .and_then(|v| v.as_array())
244 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
245
246 let mut columns = Vec::new();
247 for (idx, col_data) in columns_data.iter().enumerate() {
248 match self.parse_column(col_data) {
249 Ok(col) => columns.push(col),
250 Err(e) => {
251 errors.push(ParserError {
252 error_type: "column_parse_error".to_string(),
253 field: format!("columns[{}]", idx),
254 message: e.to_string(),
255 });
256 }
257 }
258 }
259
260 let database_type = self.extract_database_type(data);
262 let medallion_layers = self.extract_medallion_layers(data);
263 let scd_pattern = self.extract_scd_pattern(data);
264 let data_vault_classification = self.extract_data_vault_classification(data);
265 let quality_rules = self.extract_quality_rules(data);
266
267 if scd_pattern.is_some() && data_vault_classification.is_some() {
269 errors.push(ParserError {
270 error_type: "validation_error".to_string(),
271 field: "patterns".to_string(),
272 message: "SCD pattern and Data Vault classification are mutually exclusive"
273 .to_string(),
274 });
275 }
276
277 let mut odcl_metadata = HashMap::new();
279 if let Some(metadata) = data.get("odcl_metadata")
280 && let Some(obj) = metadata.as_object()
281 {
282 for (key, value) in obj {
283 odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
284 }
285 }
286
287 let table_uuid = self.extract_table_uuid(data);
288
289 let table = Table {
290 id: table_uuid,
291 name,
292 columns,
293 database_type,
294 catalog_name: None,
295 schema_name: None,
296 medallion_layers,
297 scd_pattern,
298 data_vault_classification,
299 modeling_level: None,
300 tags: Vec::new(),
301 odcl_metadata,
302 owner: None,
303 sla: None,
304 contact_details: None,
305 infrastructure_type: None,
306 notes: None,
307 position: None,
308 yaml_file_path: None,
309 drawio_cell_id: None,
310 quality: quality_rules,
311 errors: Vec::new(),
312 created_at: chrono::Utc::now(),
313 updated_at: chrono::Utc::now(),
314 };
315
316 info!("Parsed ODCL table: {}", table.name);
317 Ok((table, errors))
318 }
319
320 fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
322 let name = col_data
323 .get("name")
324 .and_then(|v| v.as_str())
325 .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
326 .to_string();
327
328 let data_type = col_data
329 .get("data_type")
330 .and_then(|v| v.as_str())
331 .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
332 .to_string();
333
334 let data_type = normalize_data_type(&data_type);
336
337 let nullable = col_data
338 .get("nullable")
339 .and_then(|v| v.as_bool())
340 .unwrap_or(true);
341
342 let primary_key = col_data
343 .get("primary_key")
344 .and_then(|v| v.as_bool())
345 .unwrap_or(false);
346
347 let foreign_key = col_data
348 .get("foreign_key")
349 .and_then(|v| self.parse_foreign_key(v));
350
351 let constraints = col_data
352 .get("constraints")
353 .and_then(|v| v.as_array())
354 .map(|arr| {
355 arr.iter()
356 .filter_map(|v| v.as_str().map(|s| s.to_string()))
357 .collect()
358 })
359 .unwrap_or_default();
360
361 let description = col_data
362 .get("description")
363 .and_then(|v| v.as_str())
364 .map(|s| s.to_string())
365 .unwrap_or_default();
366
367 let mut column_quality_rules = Vec::new();
369 if let Some(quality_val) = col_data.get("quality") {
370 if let Some(arr) = quality_val.as_array() {
371 for item in arr {
373 if let Some(obj) = item.as_object() {
374 let mut rule = HashMap::new();
375 for (key, value) in obj {
376 rule.insert(key.clone(), json_value_to_serde_value(value));
377 }
378 column_quality_rules.push(rule);
379 }
380 }
381 } else if let Some(obj) = quality_val.as_object() {
382 let mut rule = HashMap::new();
384 for (key, value) in obj {
385 rule.insert(key.clone(), json_value_to_serde_value(value));
386 }
387 column_quality_rules.push(rule);
388 }
389 }
390
391 if !nullable {
393 let has_not_null = column_quality_rules.iter().any(|rule| {
394 rule.get("type")
395 .and_then(|v| v.as_str())
396 .map(|s| {
397 s.to_lowercase().contains("not_null")
398 || s.to_lowercase().contains("notnull")
399 })
400 .unwrap_or(false)
401 });
402 if !has_not_null {
403 let mut not_null_rule = HashMap::new();
404 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
405 not_null_rule.insert(
406 "description".to_string(),
407 serde_json::json!("Column must not be null"),
408 );
409 column_quality_rules.push(not_null_rule);
410 }
411 }
412
413 Ok(Column {
414 name,
415 data_type,
416 nullable,
417 primary_key,
418 secondary_key: false,
419 composite_key: None,
420 foreign_key,
421 constraints,
422 description,
423 errors: Vec::new(),
424 quality: column_quality_rules,
425 enum_values: Vec::new(),
426 column_order: 0,
427 })
428 }
429
430 fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
432 let obj = fk_data.as_object()?;
433 Some(ForeignKey {
434 table_id: obj
435 .get("table_id")
436 .or_else(|| obj.get("table"))
437 .and_then(|v| v.as_str())
438 .unwrap_or("")
439 .to_string(),
440 column_name: obj
441 .get("column_name")
442 .or_else(|| obj.get("column"))
443 .and_then(|v| v.as_str())
444 .unwrap_or("")
445 .to_string(),
446 })
447 }
448
449 fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
451 data.get("database_type")
452 .and_then(|v| v.as_str())
453 .and_then(|s| match s.to_uppercase().as_str() {
454 "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
455 "MYSQL" => Some(DatabaseType::Mysql),
456 "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
457 "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
458 "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
459 _ => None,
460 })
461 }
462
463 fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
465 let mut layers = Vec::new();
466
467 if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
469 for item in arr {
470 if let Some(s) = item.as_str()
471 && let Ok(layer) = parse_medallion_layer(s)
472 {
473 layers.push(layer);
474 }
475 }
476 }
477 else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
479 && let Ok(layer) = parse_medallion_layer(s)
480 {
481 layers.push(layer);
482 }
483
484 layers
485 }
486
487 fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
489 data.get("scd_pattern")
490 .and_then(|v| v.as_str())
491 .and_then(|s| parse_scd_pattern(s).ok())
492 }
493
494 fn extract_data_vault_classification(
496 &self,
497 data: &JsonValue,
498 ) -> Option<DataVaultClassification> {
499 data.get("data_vault_classification")
500 .and_then(|v| v.as_str())
501 .and_then(|s| parse_data_vault_classification(s).ok())
502 }
503
504 fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
506 use serde_json::Value;
507 let mut quality_rules = Vec::new();
508
509 if let Some(quality_val) = data.get("quality") {
511 if let Some(arr) = quality_val.as_array() {
512 for item in arr {
514 if let Some(obj) = item.as_object() {
515 let mut rule = HashMap::new();
516 for (key, value) in obj {
517 rule.insert(key.clone(), json_value_to_serde_value(value));
518 }
519 quality_rules.push(rule);
520 }
521 }
522 } else if let Some(obj) = quality_val.as_object() {
523 let mut rule = HashMap::new();
525 for (key, value) in obj {
526 rule.insert(key.clone(), json_value_to_serde_value(value));
527 }
528 quality_rules.push(rule);
529 } else if let Some(s) = quality_val.as_str() {
530 let mut rule = HashMap::new();
532 rule.insert("value".to_string(), Value::String(s.to_string()));
533 quality_rules.push(rule);
534 }
535 }
536
537 if let Some(metadata) = data.get("metadata")
539 && let Some(metadata_obj) = metadata.as_object()
540 && let Some(quality_val) = metadata_obj.get("quality")
541 {
542 if let Some(arr) = quality_val.as_array() {
543 for item in arr {
545 if let Some(obj) = item.as_object() {
546 let mut rule = HashMap::new();
547 for (key, value) in obj {
548 rule.insert(key.clone(), json_value_to_serde_value(value));
549 }
550 quality_rules.push(rule);
551 }
552 }
553 } else if let Some(obj) = quality_val.as_object() {
554 let mut rule = HashMap::new();
556 for (key, value) in obj {
557 rule.insert(key.clone(), json_value_to_serde_value(value));
558 }
559 quality_rules.push(rule);
560 } else if let Some(s) = quality_val.as_str() {
561 let mut rule = HashMap::new();
563 rule.insert("value".to_string(), Value::String(s.to_string()));
564 quality_rules.push(rule);
565 }
566 }
567
568 if let Some(tblprops) = data.get("tblproperties")
570 && let Some(obj) = tblprops.as_object()
571 {
572 for (key, value) in obj {
573 let mut rule = HashMap::new();
574 rule.insert("property".to_string(), Value::String(key.clone()));
575 rule.insert("value".to_string(), json_value_to_serde_value(value));
576 quality_rules.push(rule);
577 }
578 }
579
580 quality_rules
581 }
582
583 fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
591 let mut errors = Vec::new();
605
606 let changelog = data
607 .get("databaseChangeLog")
608 .and_then(|v| v.as_array())
609 .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
610
611 let mut table_name: Option<String> = None;
613 let mut columns: Vec<crate::models::column::Column> = Vec::new();
614
615 for entry in changelog {
616 if let Some(change_set) = entry.get("changeSet") {
618 let changes = if let Some(obj) = change_set.as_object() {
620 obj.get("changes")
621 .and_then(|v| v.as_array())
622 .cloned()
623 .unwrap_or_default()
624 } else if let Some(arr) = change_set.as_array() {
625 arr.clone()
627 } else {
628 Vec::new()
629 };
630
631 for ch in changes {
632 let create = ch.get("createTable").or_else(|| ch.get("create_table"));
633 if let Some(create) = create {
634 table_name = create
635 .get("tableName")
636 .or_else(|| create.get("table_name"))
637 .and_then(|v| v.as_str())
638 .map(|s| s.to_string());
639
640 if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
642 for col_entry in cols {
643 let col = col_entry.get("column").unwrap_or(col_entry);
644 let name = col
645 .get("name")
646 .and_then(|v| v.as_str())
647 .unwrap_or("")
648 .to_string();
649 let data_type = col
650 .get("type")
651 .and_then(|v| v.as_str())
652 .unwrap_or("")
653 .to_string();
654
655 if name.is_empty() {
656 errors.push(ParserError {
657 error_type: "validation_error".to_string(),
658 field: "columns.name".to_string(),
659 message: "Liquibase createTable column missing name"
660 .to_string(),
661 });
662 continue;
663 }
664
665 let mut column =
666 crate::models::column::Column::new(name, data_type);
667
668 if let Some(constraints) =
669 col.get("constraints").and_then(|v| v.as_object())
670 {
671 if let Some(pk) =
672 constraints.get("primaryKey").and_then(|v| v.as_bool())
673 {
674 column.primary_key = pk;
675 }
676 if let Some(nullable) =
677 constraints.get("nullable").and_then(|v| v.as_bool())
678 {
679 column.nullable = nullable;
680 }
681 }
682
683 columns.push(column);
684 }
685 }
686
687 break;
690 }
691 }
692 }
693 if table_name.is_some() {
694 break;
695 }
696 }
697
698 let table_name = table_name
699 .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
700 let table = Table::new(table_name, columns);
701 Ok((table, errors))
703 }
704
705 fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
707 let mut errors = Vec::new();
708
709 let table_name = data
711 .get("name")
712 .and_then(|v| v.as_str())
713 .map(|s| s.to_string())
714 .or_else(|| {
715 data.get("schema")
717 .and_then(|v| v.as_array())
718 .and_then(|arr| arr.first())
719 .and_then(|obj| obj.as_object())
720 .and_then(|obj| obj.get("name"))
721 .and_then(|v| v.as_str())
722 .map(|s| s.to_string())
723 })
724 .ok_or_else(|| {
725 anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
726 })?;
727
728 let schema = data
730 .get("schema")
731 .and_then(|v| v.as_array())
732 .ok_or_else(|| {
733 errors.push(ParserError {
734 error_type: "validation_error".to_string(),
735 field: "schema".to_string(),
736 message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
737 });
738 anyhow::anyhow!("Missing schema")
739 });
740
741 let schema = match schema {
742 Ok(s) if s.is_empty() => {
743 errors.push(ParserError {
744 error_type: "validation_error".to_string(),
745 field: "schema".to_string(),
746 message: "ODCS v3.0.x schema array is empty".to_string(),
747 });
748 let quality_rules = self.extract_quality_rules(data);
749 let table_uuid = self.extract_table_uuid(data);
750 let table = Table {
751 id: table_uuid,
752 name: table_name,
753 columns: Vec::new(),
754 database_type: None,
755 catalog_name: None,
756 schema_name: None,
757 medallion_layers: Vec::new(),
758 scd_pattern: None,
759 data_vault_classification: None,
760 modeling_level: None,
761 tags: Vec::new(),
762 odcl_metadata: HashMap::new(),
763 owner: None,
764 sla: None,
765 contact_details: None,
766 infrastructure_type: None,
767 notes: None,
768 position: None,
769 yaml_file_path: None,
770 drawio_cell_id: None,
771 quality: quality_rules,
772 errors: Vec::new(),
773 created_at: chrono::Utc::now(),
774 updated_at: chrono::Utc::now(),
775 };
776 return Ok((table, errors));
777 }
778 Ok(s) => s,
779 Err(_) => {
780 let quality_rules = self.extract_quality_rules(data);
781 let table_uuid = self.extract_table_uuid(data);
782 let table = Table {
783 id: table_uuid,
784 name: table_name,
785 columns: Vec::new(),
786 database_type: None,
787 catalog_name: None,
788 schema_name: None,
789 medallion_layers: Vec::new(),
790 scd_pattern: None,
791 data_vault_classification: None,
792 modeling_level: None,
793 tags: Vec::new(),
794 odcl_metadata: HashMap::new(),
795 owner: None,
796 sla: None,
797 contact_details: None,
798 infrastructure_type: None,
799 notes: None,
800 position: None,
801 yaml_file_path: None,
802 drawio_cell_id: None,
803 quality: quality_rules,
804 errors: Vec::new(),
805 created_at: chrono::Utc::now(),
806 updated_at: chrono::Utc::now(),
807 };
808 return Ok((table, errors));
809 }
810 };
811
812 let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
814 errors.push(ParserError {
815 error_type: "validation_error".to_string(),
816 field: "schema[0]".to_string(),
817 message: "First schema object must be a dictionary".to_string(),
818 });
819 anyhow::anyhow!("Invalid schema object")
820 })?;
821
822 let object_name = schema_object
823 .get("name")
824 .and_then(|v| v.as_str())
825 .unwrap_or(&table_name);
826
827 let properties = schema_object
828 .get("properties")
829 .and_then(|v| v.as_object())
830 .ok_or_else(|| {
831 errors.push(ParserError {
832 error_type: "validation_error".to_string(),
833 field: format!("Object '{}'", object_name),
834 message: format!("Object '{}' missing 'properties' field", object_name),
835 });
836 anyhow::anyhow!("Missing properties")
837 })?;
838
839 let mut columns = Vec::new();
841 for (prop_name, prop_data) in properties {
842 if let Some(prop_obj) = prop_data.as_object() {
843 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
844 Ok(mut cols) => columns.append(&mut cols),
845 Err(e) => {
846 errors.push(ParserError {
847 error_type: "property_parse_error".to_string(),
848 field: format!("Property '{}'", prop_name),
849 message: e.to_string(),
850 });
851 }
852 }
853 } else {
854 errors.push(ParserError {
855 error_type: "validation_error".to_string(),
856 field: format!("Property '{}'", prop_name),
857 message: format!("Property '{}' must be an object", prop_name),
858 });
859 }
860 }
861
862 let (medallion_layers, scd_pattern, data_vault_classification, mut tags) =
864 self.extract_metadata_from_custom_properties(data);
865
866 let mut shared_domains: Vec<String> = Vec::new();
868 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
869 for prop in custom_props {
870 if let Some(prop_obj) = prop.as_object() {
871 let prop_key = prop_obj
872 .get("property")
873 .and_then(|v| v.as_str())
874 .unwrap_or("");
875 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
876 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
877 {
878 for item in arr {
879 if let Some(s) = item.as_str() {
880 shared_domains.push(s.to_string());
881 }
882 }
883 }
884 }
885 }
886 }
887
888 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
890 for item in tags_arr {
891 if let Some(s) = item.as_str()
892 && !tags.contains(&s.to_string())
893 {
894 tags.push(s.to_string());
895 }
896 }
897 }
898
899 let database_type = self.extract_database_type_from_odcl_v3_servers(data);
901
902 let quality_rules = self.extract_quality_rules(data);
904
905 let mut odcl_metadata = HashMap::new();
907 odcl_metadata.insert(
908 "apiVersion".to_string(),
909 json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
910 );
911 odcl_metadata.insert(
912 "kind".to_string(),
913 json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
914 );
915 odcl_metadata.insert(
916 "id".to_string(),
917 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
918 );
919 odcl_metadata.insert(
920 "version".to_string(),
921 json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
922 );
923 odcl_metadata.insert(
924 "status".to_string(),
925 json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
926 );
927
928 if let Some(servicelevels_val) = data.get("servicelevels") {
930 odcl_metadata.insert(
931 "servicelevels".to_string(),
932 json_value_to_serde_value(servicelevels_val),
933 );
934 }
935
936 if let Some(links_val) = data.get("links") {
938 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
939 }
940
941 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
943 odcl_metadata.insert(
944 "domain".to_string(),
945 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
946 );
947 }
948 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
949 odcl_metadata.insert(
950 "dataProduct".to_string(),
951 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
952 );
953 }
954 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
955 odcl_metadata.insert(
956 "tenant".to_string(),
957 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
958 );
959 }
960
961 if let Some(desc_val) = data.get("description") {
963 odcl_metadata.insert(
964 "description".to_string(),
965 json_value_to_serde_value(desc_val),
966 );
967 }
968
969 if let Some(pricing_val) = data.get("pricing") {
971 odcl_metadata.insert(
972 "pricing".to_string(),
973 json_value_to_serde_value(pricing_val),
974 );
975 }
976
977 if let Some(team_val) = data.get("team") {
979 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
980 }
981
982 if let Some(roles_val) = data.get("roles") {
984 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
985 }
986
987 if let Some(terms_val) = data.get("terms") {
989 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
990 }
991
992 if let Some(servers_val) = data.get("servers") {
994 odcl_metadata.insert(
995 "servers".to_string(),
996 json_value_to_serde_value(servers_val),
997 );
998 }
999
1000 if let Some(infrastructure_val) = data.get("infrastructure") {
1002 odcl_metadata.insert(
1003 "infrastructure".to_string(),
1004 json_value_to_serde_value(infrastructure_val),
1005 );
1006 }
1007
1008 if !shared_domains.is_empty() {
1010 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1011 .iter()
1012 .map(|d| serde_json::Value::String(d.clone()))
1013 .collect();
1014 odcl_metadata.insert(
1015 "sharedDomains".to_string(),
1016 serde_json::Value::Array(shared_domains_json),
1017 );
1018 }
1019
1020 let table_uuid = self.extract_table_uuid(data);
1021
1022 let table = Table {
1023 id: table_uuid,
1024 name: table_name,
1025 columns,
1026 database_type,
1027 catalog_name: None,
1028 schema_name: None,
1029 medallion_layers,
1030 scd_pattern,
1031 data_vault_classification,
1032 modeling_level: None,
1033 tags,
1034 odcl_metadata,
1035 owner: None,
1036 sla: None,
1037 contact_details: None,
1038 infrastructure_type: None,
1039 notes: None,
1040 position: None,
1041 yaml_file_path: None,
1042 drawio_cell_id: None,
1043 quality: quality_rules,
1044 errors: Vec::new(),
1045 created_at: chrono::Utc::now(),
1046 updated_at: chrono::Utc::now(),
1047 };
1048
1049 info!(
1050 "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1051 table.name,
1052 errors.len()
1053 );
1054 Ok((table, errors))
1055 }
1056
1057 fn parse_odcl_v3_property(
1059 &self,
1060 prop_name: &str,
1061 prop_data: &serde_json::Map<String, JsonValue>,
1062 data: &JsonValue,
1063 ) -> Result<Vec<Column>> {
1064 let mut errors = Vec::new();
1066 self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1067 }
1068
1069 fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1072 if let Some(id_val) = data.get("id")
1074 && let Some(id_str) = id_val.as_str()
1075 {
1076 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1077 tracing::debug!(
1078 "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1079 uuid
1080 );
1081 return uuid;
1082 } else {
1083 tracing::warn!(
1084 "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1085 id_str
1086 );
1087 }
1088 }
1089
1090 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1092 for prop in custom_props {
1093 if let Some(prop_obj) = prop.as_object() {
1094 let prop_key = prop_obj
1095 .get("property")
1096 .and_then(|v| v.as_str())
1097 .unwrap_or("");
1098 if prop_key == "tableUuid"
1099 && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1100 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1101 {
1102 tracing::debug!(
1103 "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1104 uuid
1105 );
1106 return uuid;
1107 }
1108 }
1109 }
1110 }
1111
1112 if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1114 && let Some(uuid_val) = metadata.get("tableUuid")
1115 && let Some(uuid_str) = uuid_val.as_str()
1116 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1117 {
1118 tracing::debug!(
1119 "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1120 uuid
1121 );
1122 return uuid;
1123 }
1124
1125 let table_name = data
1127 .get("name")
1128 .and_then(|v| v.as_str())
1129 .unwrap_or("unknown");
1130 let new_uuid = crate::models::table::Table::generate_id(
1131 table_name, None, None, None, );
1135 tracing::warn!(
1136 "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1137 table_name,
1138 new_uuid
1139 );
1140 new_uuid
1141 }
1142
1143 fn extract_metadata_from_custom_properties(
1145 &self,
1146 data: &JsonValue,
1147 ) -> (
1148 Vec<MedallionLayer>,
1149 Option<SCDPattern>,
1150 Option<DataVaultClassification>,
1151 Vec<String>,
1152 ) {
1153 let mut medallion_layers = Vec::new();
1154 let mut scd_pattern = None;
1155 let mut data_vault_classification = None;
1156 let mut tags = Vec::new();
1157
1158 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1159 for prop in custom_props {
1160 if let Some(prop_obj) = prop.as_object() {
1161 let prop_key = prop_obj
1162 .get("property")
1163 .and_then(|v| v.as_str())
1164 .unwrap_or("");
1165 let prop_value = prop_obj.get("value");
1166
1167 match prop_key {
1168 "medallionLayers" | "medallion_layers" => {
1169 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1170 for item in arr {
1171 if let Some(s) = item.as_str()
1172 && let Ok(layer) = parse_medallion_layer(s)
1173 {
1174 medallion_layers.push(layer);
1175 }
1176 }
1177 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1178 for part in s.split(',') {
1180 if let Ok(layer) = parse_medallion_layer(part.trim()) {
1181 medallion_layers.push(layer);
1182 }
1183 }
1184 }
1185 }
1186 "scdPattern" | "scd_pattern" => {
1187 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1188 scd_pattern = parse_scd_pattern(s).ok();
1189 }
1190 }
1191 "dataVaultClassification" | "data_vault_classification" => {
1192 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1193 data_vault_classification = parse_data_vault_classification(s).ok();
1194 }
1195 }
1196 "tags" => {
1197 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1198 for item in arr {
1199 if let Some(s) = item.as_str() {
1200 tags.push(s.to_string());
1201 }
1202 }
1203 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1204 for part in s.split(',') {
1206 tags.push(part.trim().to_string());
1207 }
1208 }
1209 }
1210 "sharedDomains" | "shared_domains" => {
1211 }
1214 _ => {}
1215 }
1216 }
1217 }
1218 }
1219
1220 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1222 for item in tags_arr {
1223 if let Some(s) = item.as_str()
1224 && !tags.contains(&s.to_string())
1225 {
1226 tags.push(s.to_string());
1227 }
1228 }
1229 }
1230
1231 (
1232 medallion_layers,
1233 scd_pattern,
1234 data_vault_classification,
1235 tags,
1236 )
1237 }
1238
1239 fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1241 if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1243 && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1244 {
1245 return server_obj
1246 .get("type")
1247 .and_then(|v| v.as_str())
1248 .and_then(|s| self.parse_database_type(s));
1249 }
1250 None
1251 }
1252
1253 fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1255 let mut errors = Vec::new();
1256
1257 let models = data
1259 .get("models")
1260 .and_then(|v| v.as_object())
1261 .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1262
1263 let (model_name, model_data) = models
1266 .iter()
1267 .next()
1268 .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1269
1270 let model_data = model_data
1271 .as_object()
1272 .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1273
1274 let fields = model_data
1276 .get("fields")
1277 .and_then(|v| v.as_object())
1278 .ok_or_else(|| {
1279 errors.push(ParserError {
1280 error_type: "validation_error".to_string(),
1281 field: format!("Model '{}'", model_name),
1282 message: format!("Model '{}' missing 'fields' field", model_name),
1283 });
1284 anyhow::anyhow!("Missing fields")
1285 });
1286
1287 let fields = match fields {
1288 Ok(f) => f,
1289 Err(_) => {
1290 let quality_rules = self.extract_quality_rules(data);
1292 let table_uuid = self.extract_table_uuid(data);
1293 let table = Table {
1294 id: table_uuid,
1295 name: model_name.clone(),
1296 columns: Vec::new(),
1297 database_type: None,
1298 catalog_name: None,
1299 schema_name: None,
1300 medallion_layers: Vec::new(),
1301 scd_pattern: None,
1302 data_vault_classification: None,
1303 modeling_level: None,
1304 tags: Vec::new(),
1305 odcl_metadata: HashMap::new(),
1306 owner: None,
1307 sla: None,
1308 contact_details: None,
1309 infrastructure_type: None,
1310 notes: None,
1311 position: None,
1312 yaml_file_path: None,
1313 drawio_cell_id: None,
1314 quality: quality_rules,
1315 errors: Vec::new(),
1316 created_at: chrono::Utc::now(),
1317 updated_at: chrono::Utc::now(),
1318 };
1319 return Ok((table, errors));
1320 }
1321 };
1322
1323 let mut columns = Vec::new();
1325 for (field_name, field_data) in fields {
1326 if let Some(field_obj) = field_data.as_object() {
1327 match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1328 Ok(mut cols) => columns.append(&mut cols),
1329 Err(e) => {
1330 errors.push(ParserError {
1331 error_type: "field_parse_error".to_string(),
1332 field: format!("Field '{}'", field_name),
1333 message: e.to_string(),
1334 });
1335 }
1336 }
1337 } else {
1338 errors.push(ParserError {
1339 error_type: "validation_error".to_string(),
1340 field: format!("Field '{}'", field_name),
1341 message: format!("Field '{}' must be an object", field_name),
1342 });
1343 }
1344 }
1345
1346 let mut odcl_metadata = HashMap::new();
1349
1350 if let Some(info_val) = data.get("info") {
1353 let info_json_value = json_value_to_serde_value(info_val);
1355 odcl_metadata.insert("info".to_string(), info_json_value);
1356 }
1357
1358 odcl_metadata.insert(
1359 "dataContractSpecification".to_string(),
1360 json_value_to_serde_value(
1361 data.get("dataContractSpecification")
1362 .unwrap_or(&JsonValue::Null),
1363 ),
1364 );
1365 odcl_metadata.insert(
1366 "id".to_string(),
1367 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1368 );
1369 if let Some(servicelevels_val) = data.get("servicelevels") {
1373 odcl_metadata.insert(
1374 "servicelevels".to_string(),
1375 json_value_to_serde_value(servicelevels_val),
1376 );
1377 }
1378
1379 if let Some(links_val) = data.get("links") {
1381 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1382 }
1383
1384 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1386 odcl_metadata.insert(
1387 "domain".to_string(),
1388 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1389 );
1390 }
1391 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1392 odcl_metadata.insert(
1393 "dataProduct".to_string(),
1394 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1395 );
1396 }
1397 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1398 odcl_metadata.insert(
1399 "tenant".to_string(),
1400 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1401 );
1402 }
1403
1404 if let Some(desc_val) = data.get("description") {
1406 odcl_metadata.insert(
1407 "description".to_string(),
1408 json_value_to_serde_value(desc_val),
1409 );
1410 }
1411
1412 if let Some(pricing_val) = data.get("pricing") {
1414 odcl_metadata.insert(
1415 "pricing".to_string(),
1416 json_value_to_serde_value(pricing_val),
1417 );
1418 }
1419
1420 if let Some(team_val) = data.get("team") {
1422 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1423 }
1424
1425 if let Some(roles_val) = data.get("roles") {
1427 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1428 }
1429
1430 if let Some(terms_val) = data.get("terms") {
1432 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1433 }
1434
1435 if let Some(servers_val) = data.get("servers") {
1437 odcl_metadata.insert(
1438 "servers".to_string(),
1439 json_value_to_serde_value(servers_val),
1440 );
1441 }
1442
1443 if let Some(infrastructure_val) = data.get("infrastructure") {
1445 odcl_metadata.insert(
1446 "infrastructure".to_string(),
1447 json_value_to_serde_value(infrastructure_val),
1448 );
1449 }
1450
1451 let database_type = self.extract_database_type_from_servers(data);
1453
1454 let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1456
1457 let mut shared_domains: Vec<String> = Vec::new();
1459 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1460 for prop in custom_props {
1461 if let Some(prop_obj) = prop.as_object() {
1462 let prop_key = prop_obj
1463 .get("property")
1464 .and_then(|v| v.as_str())
1465 .unwrap_or("");
1466 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1467 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1468 {
1469 for item in arr {
1470 if let Some(s) = item.as_str() {
1471 shared_domains.push(s.to_string());
1472 }
1473 }
1474 }
1475 }
1476 }
1477 }
1478
1479 let mut tags = Vec::new();
1481 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1482 for item in tags_arr {
1483 if let Some(s) = item.as_str() {
1484 tags.push(s.to_string());
1485 }
1486 }
1487 }
1488
1489 let quality_rules = self.extract_quality_rules(data);
1491
1492 if !shared_domains.is_empty() {
1494 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1495 .iter()
1496 .map(|d| serde_json::Value::String(d.clone()))
1497 .collect();
1498 odcl_metadata.insert(
1499 "sharedDomains".to_string(),
1500 serde_json::Value::Array(shared_domains_json),
1501 );
1502 }
1503
1504 let table_uuid = self.extract_table_uuid(data);
1505
1506 let table = Table {
1507 id: table_uuid,
1508 name: model_name.clone(),
1509 columns,
1510 database_type,
1511 catalog_name,
1512 schema_name,
1513 medallion_layers: Vec::new(),
1514 scd_pattern: None,
1515 data_vault_classification: None,
1516 modeling_level: None,
1517 tags,
1518 odcl_metadata,
1519 owner: None,
1520 sla: None,
1521 contact_details: None,
1522 infrastructure_type: None,
1523 notes: None,
1524 position: None,
1525 yaml_file_path: None,
1526 drawio_cell_id: None,
1527 quality: quality_rules,
1528 errors: Vec::new(),
1529 created_at: chrono::Utc::now(),
1530 updated_at: chrono::Utc::now(),
1531 };
1532
1533 info!(
1534 "Parsed Data Contract table: {} with {} warnings/errors",
1535 model_name,
1536 errors.len()
1537 );
1538 Ok((table, errors))
1539 }
1540
1541 #[allow(clippy::only_used_in_recursion)]
1546 fn expand_nested_column(
1547 &self,
1548 column_name: &str,
1549 schema: &JsonValue,
1550 nullable: bool,
1551 columns: &mut Vec<Column>,
1552 errors: &mut Vec<ParserError>,
1553 ) {
1554 let schema_obj = match schema.as_object() {
1555 Some(obj) => obj,
1556 None => {
1557 errors.push(ParserError {
1558 error_type: "parse_error".to_string(),
1559 field: column_name.to_string(),
1560 message: "Nested schema must be an object".to_string(),
1561 });
1562 return;
1563 }
1564 };
1565
1566 let schema_type = schema_obj
1567 .get("type")
1568 .and_then(|v| v.as_str())
1569 .unwrap_or("object");
1570
1571 match schema_type {
1572 "object" | "struct" => {
1573 if let Some(properties) = schema_obj.get("properties").and_then(|v| v.as_object()) {
1575 let nested_required: Vec<String> = schema_obj
1576 .get("required")
1577 .and_then(|v| v.as_array())
1578 .map(|arr| {
1579 arr.iter()
1580 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1581 .collect()
1582 })
1583 .unwrap_or_default();
1584
1585 for (nested_name, nested_schema) in properties {
1586 let nested_nullable = !nested_required.contains(nested_name);
1587 self.expand_nested_column(
1588 &format!("{}.{}", column_name, nested_name),
1589 nested_schema,
1590 nullable || nested_nullable,
1591 columns,
1592 errors,
1593 );
1594 }
1595 } else {
1596 let description = schema_obj
1598 .get("description")
1599 .and_then(|v| v.as_str())
1600 .unwrap_or("")
1601 .to_string();
1602 columns.push(Column {
1603 name: column_name.to_string(),
1604 data_type: "OBJECT".to_string(),
1605 nullable,
1606 primary_key: false,
1607 secondary_key: false,
1608 composite_key: None,
1609 foreign_key: None,
1610 constraints: Vec::new(),
1611 description,
1612 quality: Vec::new(),
1613 enum_values: Vec::new(),
1614 errors: Vec::new(),
1615 column_order: 0,
1616 });
1617 }
1618 }
1619 "array" => {
1620 let items = schema_obj.get("items").unwrap_or(schema);
1622 let items_type = items
1623 .as_object()
1624 .and_then(|obj| obj.get("type").and_then(|v| v.as_str()))
1625 .unwrap_or("string");
1626
1627 if items_type == "object" || items_type == "struct" {
1628 let description = schema_obj
1630 .get("description")
1631 .and_then(|v| v.as_str())
1632 .unwrap_or("")
1633 .to_string();
1634 columns.push(Column {
1635 name: column_name.to_string(),
1636 data_type: "ARRAY<OBJECT>".to_string(),
1637 nullable,
1638 primary_key: false,
1639 secondary_key: false,
1640 composite_key: None,
1641 foreign_key: None,
1642 constraints: Vec::new(),
1643 description,
1644 quality: Vec::new(),
1645 enum_values: Vec::new(),
1646 errors: Vec::new(),
1647 column_order: 0,
1648 });
1649 if let Some(properties) = items
1651 .as_object()
1652 .and_then(|obj| obj.get("properties").and_then(|v| v.as_object()))
1653 {
1654 let nested_required: Vec<String> = items
1655 .as_object()
1656 .and_then(|obj| obj.get("required").and_then(|v| v.as_array()))
1657 .map(|arr| {
1658 arr.iter()
1659 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1660 .collect()
1661 })
1662 .unwrap_or_default();
1663
1664 for (nested_name, nested_schema) in properties {
1665 let nested_nullable = !nested_required.contains(nested_name);
1666 self.expand_nested_column(
1667 &format!("{}.{}", column_name, nested_name),
1668 nested_schema,
1669 nullable || nested_nullable,
1670 columns,
1671 errors,
1672 );
1673 }
1674 }
1675 } else {
1676 let data_type = format!("ARRAY<{}>", items_type.to_uppercase());
1678 let description = schema_obj
1679 .get("description")
1680 .and_then(|v| v.as_str())
1681 .unwrap_or("")
1682 .to_string();
1683 columns.push(Column {
1684 name: column_name.to_string(),
1685 data_type,
1686 nullable,
1687 primary_key: false,
1688 secondary_key: false,
1689 composite_key: None,
1690 foreign_key: None,
1691 constraints: Vec::new(),
1692 description,
1693 quality: Vec::new(),
1694 enum_values: Vec::new(),
1695 errors: Vec::new(),
1696 column_order: 0,
1697 });
1698 }
1699 }
1700 _ => {
1701 let data_type = schema_type.to_uppercase();
1703 let description = schema_obj
1704 .get("description")
1705 .and_then(|v| v.as_str())
1706 .unwrap_or("")
1707 .to_string();
1708 let enum_values = schema_obj
1709 .get("enum")
1710 .and_then(|v| v.as_array())
1711 .map(|arr| {
1712 arr.iter()
1713 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1714 .collect()
1715 })
1716 .unwrap_or_default();
1717 columns.push(Column {
1718 name: column_name.to_string(),
1719 data_type,
1720 nullable,
1721 primary_key: false,
1722 secondary_key: false,
1723 composite_key: None,
1724 foreign_key: None,
1725 constraints: Vec::new(),
1726 description,
1727 quality: Vec::new(),
1728 enum_values,
1729 errors: Vec::new(),
1730 column_order: 0,
1731 });
1732 }
1733 }
1734 }
1735
1736 fn parse_data_contract_field(
1738 &self,
1739 field_name: &str,
1740 field_data: &serde_json::Map<String, JsonValue>,
1741 data: &JsonValue,
1742 errors: &mut Vec<ParserError>,
1743 ) -> Result<Vec<Column>> {
1744 let mut columns = Vec::new();
1745
1746 let extract_quality_from_obj =
1748 |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1749 let mut quality_rules = Vec::new();
1750 if let Some(quality_val) = obj.get("quality") {
1751 if let Some(arr) = quality_val.as_array() {
1752 for item in arr {
1754 if let Some(rule_obj) = item.as_object() {
1755 let mut rule = HashMap::new();
1756 for (key, value) in rule_obj {
1757 rule.insert(key.clone(), json_value_to_serde_value(value));
1758 }
1759 quality_rules.push(rule);
1760 }
1761 }
1762 } else if let Some(rule_obj) = quality_val.as_object() {
1763 let mut rule = HashMap::new();
1765 for (key, value) in rule_obj {
1766 rule.insert(key.clone(), json_value_to_serde_value(value));
1767 }
1768 quality_rules.push(rule);
1769 }
1770 }
1771 quality_rules
1772 };
1773
1774 if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1776 if let Some(definition) = self.resolve_ref(ref_str, data) {
1777 let mut quality_rules = extract_quality_from_obj(field_data);
1779
1780 if quality_rules.is_empty() {
1782 if let Some(def_obj) = definition.as_object() {
1783 quality_rules = extract_quality_from_obj(def_obj);
1784 }
1785 } else {
1786 if let Some(def_obj) = definition.as_object() {
1788 let def_quality = extract_quality_from_obj(def_obj);
1789 quality_rules.extend(def_quality);
1791 }
1792 }
1793
1794 let required = field_data
1795 .get("required")
1796 .and_then(|v| v.as_bool())
1797 .unwrap_or(false);
1798
1799 if required {
1801 let has_not_null = quality_rules.iter().any(|rule| {
1802 rule.get("type")
1803 .and_then(|v| v.as_str())
1804 .map(|s| {
1805 s.to_lowercase().contains("not_null")
1806 || s.to_lowercase().contains("notnull")
1807 })
1808 .unwrap_or(false)
1809 });
1810 if !has_not_null {
1811 let mut not_null_rule = HashMap::new();
1812 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
1813 not_null_rule.insert(
1814 "description".to_string(),
1815 serde_json::json!("Column must not be null"),
1816 );
1817 quality_rules.push(not_null_rule);
1818 }
1819 }
1820
1821 let has_nested = definition
1823 .get("type")
1824 .and_then(|v| v.as_str())
1825 .map(|s| s == "object")
1826 .unwrap_or(false)
1827 || definition.get("properties").is_some()
1828 || definition.get("fields").is_some();
1829
1830 if has_nested {
1831 if let Some(properties) =
1833 definition.get("properties").and_then(|v| v.as_object())
1834 {
1835 let nested_required: Vec<String> = definition
1837 .get("required")
1838 .and_then(|v| v.as_array())
1839 .map(|arr| {
1840 arr.iter()
1841 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1842 .collect()
1843 })
1844 .unwrap_or_default();
1845
1846 for (nested_name, nested_schema) in properties {
1847 let nested_required_field = nested_required.contains(nested_name);
1848 self.expand_nested_column(
1849 &format!("{}.{}", field_name, nested_name),
1850 nested_schema,
1851 !nested_required_field,
1852 &mut columns,
1853 errors,
1854 );
1855 }
1856 } else if let Some(fields) =
1857 definition.get("fields").and_then(|v| v.as_object())
1858 {
1859 for (nested_name, nested_schema) in fields {
1861 self.expand_nested_column(
1862 &format!("{}.{}", field_name, nested_name),
1863 nested_schema,
1864 true, &mut columns,
1866 errors,
1867 );
1868 }
1869 } else {
1870 columns.push(Column {
1872 name: field_name.to_string(),
1873 data_type: "OBJECT".to_string(),
1874 nullable: !required,
1875 primary_key: false,
1876 secondary_key: false,
1877 composite_key: None,
1878 foreign_key: None,
1879 constraints: Vec::new(),
1880 description: field_data
1881 .get("description")
1882 .or_else(|| definition.get("description"))
1883 .and_then(|v| v.as_str())
1884 .unwrap_or("")
1885 .to_string(),
1886 errors: Vec::new(),
1887 quality: quality_rules.clone(),
1888 enum_values: Vec::new(),
1889 column_order: 0,
1890 });
1891 }
1892 } else {
1893 let def_type = definition
1895 .get("type")
1896 .and_then(|v| v.as_str())
1897 .unwrap_or("STRING")
1898 .to_uppercase();
1899
1900 let enum_values = definition
1901 .get("enum")
1902 .and_then(|v| v.as_array())
1903 .map(|arr| {
1904 arr.iter()
1905 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1906 .collect()
1907 })
1908 .unwrap_or_default();
1909
1910 columns.push(Column {
1911 name: field_name.to_string(),
1912 data_type: def_type,
1913 nullable: !required,
1914 primary_key: false,
1915 secondary_key: false,
1916 composite_key: None,
1917 foreign_key: None,
1918 constraints: Vec::new(),
1919 description: field_data
1920 .get("description")
1921 .or_else(|| definition.get("description"))
1922 .and_then(|v| v.as_str())
1923 .unwrap_or("")
1924 .to_string(),
1925 errors: Vec::new(),
1926 quality: quality_rules,
1927 enum_values,
1928 column_order: 0,
1929 });
1930 }
1931 return Ok(columns);
1932 } else {
1933 let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
1935 let mut error_map = HashMap::new();
1936 error_map.insert("type".to_string(), serde_json::json!("validation_error"));
1937 error_map.insert("field".to_string(), serde_json::json!("data_type"));
1938 error_map.insert(
1939 "message".to_string(),
1940 serde_json::json!(format!(
1941 "Field '{}' references undefined definition: {}",
1942 field_name, ref_str
1943 )),
1944 );
1945 col_errors.push(error_map);
1946 columns.push(Column {
1947 name: field_name.to_string(),
1948 data_type: "OBJECT".to_string(),
1949 nullable: true,
1950 primary_key: false,
1951 secondary_key: false,
1952 composite_key: None,
1953 foreign_key: None,
1954 constraints: Vec::new(),
1955 description: field_data
1956 .get("description")
1957 .and_then(|v| v.as_str())
1958 .unwrap_or("")
1959 .to_string(),
1960 errors: col_errors,
1961 quality: Vec::new(),
1962 enum_values: Vec::new(),
1963 column_order: 0,
1964 });
1965 return Ok(columns);
1966 }
1967 }
1968
1969 let field_type_str = field_data
1971 .get("type")
1972 .and_then(|v| v.as_str())
1973 .unwrap_or("STRING");
1974
1975 if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
1977 match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
1978 Ok(nested_cols) if !nested_cols.is_empty() => {
1979 let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
1981 "ARRAY<STRUCT<...>>".to_string()
1982 } else {
1983 "STRUCT<...>".to_string()
1984 };
1985
1986 columns.push(Column {
1988 name: field_name.to_string(),
1989 data_type: parent_data_type,
1990 nullable: !field_data
1991 .get("required")
1992 .and_then(|v| v.as_bool())
1993 .unwrap_or(false),
1994 primary_key: false,
1995 secondary_key: false,
1996 composite_key: None,
1997 foreign_key: None,
1998 constraints: Vec::new(),
1999 description: field_data
2000 .get("description")
2001 .and_then(|v| v.as_str())
2002 .unwrap_or("")
2003 .to_string(),
2004 errors: Vec::new(),
2005 quality: Vec::new(),
2006 enum_values: Vec::new(),
2007 column_order: 0,
2008 });
2009
2010 columns.extend(nested_cols);
2012 return Ok(columns);
2013 }
2014 Ok(_) | Err(_) => {
2015 }
2017 }
2018 }
2019
2020 let field_type = normalize_data_type(field_type_str);
2021
2022 if field_type == "ARRAY" {
2024 let items = field_data.get("items");
2025 if let Some(items_val) = items {
2026 if let Some(items_obj) = items_val.as_object() {
2027 if items_obj.get("fields").is_some()
2029 || items_obj.get("type").and_then(|v| v.as_str()) == Some("object")
2030 {
2031 columns.push(Column {
2033 name: field_name.to_string(),
2034 data_type: "ARRAY<OBJECT>".to_string(),
2035 nullable: !field_data
2036 .get("required")
2037 .and_then(|v| v.as_bool())
2038 .unwrap_or(false),
2039 primary_key: false,
2040 secondary_key: false,
2041 composite_key: None,
2042 foreign_key: None,
2043 constraints: Vec::new(),
2044 description: field_data
2045 .get("description")
2046 .and_then(|v| v.as_str())
2047 .unwrap_or("")
2048 .to_string(),
2049 errors: Vec::new(),
2050 quality: Vec::new(),
2051 enum_values: Vec::new(),
2052 column_order: 0,
2053 });
2054
2055 let nested_fields_obj = items_obj
2058 .get("properties")
2059 .and_then(|v| v.as_object())
2060 .or_else(|| items_obj.get("fields").and_then(|v| v.as_object()));
2061
2062 if let Some(fields_obj) = nested_fields_obj {
2063 for (nested_field_name, nested_field_data) in fields_obj {
2064 if let Some(nested_field_obj) = nested_field_data.as_object() {
2065 let nested_field_type = nested_field_obj
2066 .get("type")
2067 .and_then(|v| v.as_str())
2068 .unwrap_or("STRING");
2069
2070 let nested_col_name =
2072 format!("{}.{}", field_name, nested_field_name);
2073 let mut local_errors = Vec::new();
2074 match self.parse_data_contract_field(
2075 &nested_col_name,
2076 nested_field_obj,
2077 data,
2078 &mut local_errors,
2079 ) {
2080 Ok(mut nested_cols) => {
2081 columns.append(&mut nested_cols);
2084 }
2085 Err(_) => {
2086 columns.push(Column {
2088 name: nested_col_name,
2089 data_type: nested_field_type.to_uppercase(),
2090 nullable: !nested_field_obj
2091 .get("required")
2092 .and_then(|v| v.as_bool())
2093 .unwrap_or(false),
2094 primary_key: false,
2095 secondary_key: false,
2096 composite_key: None,
2097 foreign_key: None,
2098 constraints: Vec::new(),
2099 description: nested_field_obj
2100 .get("description")
2101 .and_then(|v| v.as_str())
2102 .unwrap_or("")
2103 .to_string(),
2104 errors: Vec::new(),
2105 quality: Vec::new(),
2106 enum_values: Vec::new(),
2107 column_order: 0,
2108 });
2109 }
2110 }
2111 }
2112 }
2113 }
2114
2115 return Ok(columns);
2116 } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2117 columns.push(Column {
2119 name: field_name.to_string(),
2120 data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2121 nullable: !field_data
2122 .get("required")
2123 .and_then(|v| v.as_bool())
2124 .unwrap_or(false),
2125 primary_key: false,
2126 secondary_key: false,
2127 composite_key: None,
2128 foreign_key: None,
2129 constraints: Vec::new(),
2130 description: field_data
2131 .get("description")
2132 .and_then(|v| v.as_str())
2133 .unwrap_or("")
2134 .to_string(),
2135 errors: Vec::new(),
2136 quality: Vec::new(),
2137 enum_values: Vec::new(),
2138 column_order: 0,
2139 });
2140 return Ok(columns);
2141 }
2142 } else if let Some(item_type_str) = items_val.as_str() {
2143 columns.push(Column {
2145 name: field_name.to_string(),
2146 data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2147 nullable: !field_data
2148 .get("required")
2149 .and_then(|v| v.as_bool())
2150 .unwrap_or(false),
2151 primary_key: false,
2152 secondary_key: false,
2153 composite_key: None,
2154 foreign_key: None,
2155 constraints: Vec::new(),
2156 description: field_data
2157 .get("description")
2158 .and_then(|v| v.as_str())
2159 .unwrap_or("")
2160 .to_string(),
2161 errors: Vec::new(),
2162 quality: Vec::new(),
2163 enum_values: Vec::new(),
2164 column_order: 0,
2165 });
2166 return Ok(columns);
2167 }
2168 }
2169 columns.push(Column {
2171 name: field_name.to_string(),
2172 data_type: "ARRAY<STRING>".to_string(),
2173 nullable: !field_data
2174 .get("required")
2175 .and_then(|v| v.as_bool())
2176 .unwrap_or(false),
2177 primary_key: false,
2178 secondary_key: false,
2179 composite_key: None,
2180 foreign_key: None,
2181 constraints: Vec::new(),
2182 description: field_data
2183 .get("description")
2184 .and_then(|v| v.as_str())
2185 .unwrap_or("")
2186 .to_string(),
2187 errors: Vec::new(),
2188 quality: Vec::new(),
2189 enum_values: Vec::new(),
2190 column_order: 0,
2191 });
2192 return Ok(columns);
2193 }
2194
2195 let nested_fields_obj = field_data
2198 .get("properties")
2199 .and_then(|v| v.as_object())
2200 .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2201
2202 if field_type == "OBJECT"
2203 && let Some(fields_obj) = nested_fields_obj
2204 {
2205 columns.push(Column {
2209 name: field_name.to_string(),
2210 data_type: "OBJECT".to_string(),
2211 nullable: !field_data
2212 .get("required")
2213 .and_then(|v| v.as_bool())
2214 .unwrap_or(false),
2215 primary_key: false,
2216 secondary_key: false,
2217 composite_key: None,
2218 foreign_key: None,
2219 constraints: Vec::new(),
2220 description: field_data
2221 .get("description")
2222 .and_then(|v| v.as_str())
2223 .unwrap_or("")
2224 .to_string(),
2225 errors: Vec::new(),
2226 quality: Vec::new(),
2227 enum_values: Vec::new(),
2228 column_order: 0,
2229 });
2230
2231 for (nested_field_name, nested_field_data) in fields_obj {
2233 if let Some(nested_field_obj) = nested_field_data.as_object() {
2234 let nested_field_type = nested_field_obj
2235 .get("type")
2236 .and_then(|v| v.as_str())
2237 .unwrap_or("STRING");
2238
2239 let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2241 match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data) {
2242 Ok(mut nested_cols) => {
2243 columns.append(&mut nested_cols);
2246 }
2247 Err(_) => {
2248 columns.push(Column {
2250 name: nested_col_name,
2251 data_type: nested_field_type.to_uppercase(),
2252 nullable: !nested_field_obj
2253 .get("required")
2254 .and_then(|v| v.as_bool())
2255 .unwrap_or(false),
2256 primary_key: false,
2257 secondary_key: false,
2258 composite_key: None,
2259 foreign_key: None,
2260 constraints: Vec::new(),
2261 description: nested_field_obj
2262 .get("description")
2263 .and_then(|v| v.as_str())
2264 .unwrap_or("")
2265 .to_string(),
2266 errors: Vec::new(),
2267 quality: Vec::new(),
2268 enum_values: Vec::new(),
2269 column_order: 0,
2270 });
2271 }
2272 }
2273 }
2274 }
2275
2276 return Ok(columns);
2277 }
2278
2279 let required = field_data
2281 .get("required")
2282 .and_then(|v| v.as_bool())
2283 .unwrap_or(false);
2284 let description = field_data
2285 .get("description")
2286 .and_then(|v| v.as_str())
2287 .unwrap_or("")
2288 .to_string();
2289
2290 let mut column_quality_rules = Vec::new();
2292 if let Some(quality_val) = field_data.get("quality") {
2293 if let Some(arr) = quality_val.as_array() {
2294 for item in arr {
2296 if let Some(obj) = item.as_object() {
2297 let mut rule = HashMap::new();
2298 for (key, value) in obj {
2299 rule.insert(key.clone(), json_value_to_serde_value(value));
2300 }
2301 column_quality_rules.push(rule);
2302 }
2303 }
2304 } else if let Some(obj) = quality_val.as_object() {
2305 let mut rule = HashMap::new();
2307 for (key, value) in obj {
2308 rule.insert(key.clone(), json_value_to_serde_value(value));
2309 }
2310 column_quality_rules.push(rule);
2311 }
2312 }
2313
2314 if required {
2316 let has_not_null = column_quality_rules.iter().any(|rule| {
2317 rule.get("type")
2318 .and_then(|v| v.as_str())
2319 .map(|s| {
2320 s.to_lowercase().contains("not_null")
2321 || s.to_lowercase().contains("notnull")
2322 })
2323 .unwrap_or(false)
2324 });
2325 if !has_not_null {
2326 let mut not_null_rule = HashMap::new();
2327 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
2328 not_null_rule.insert(
2329 "description".to_string(),
2330 serde_json::json!("Column must not be null"),
2331 );
2332 column_quality_rules.push(not_null_rule);
2333 }
2334 }
2335
2336 columns.push(Column {
2337 name: field_name.to_string(),
2338 data_type: field_type,
2339 nullable: !required,
2340 primary_key: field_data
2341 .get("primaryKey")
2342 .and_then(|v| v.as_bool())
2343 .unwrap_or(false),
2344 secondary_key: false,
2345 composite_key: None,
2346 foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2347 constraints: Vec::new(),
2348 description,
2349 errors: Vec::new(),
2350 quality: column_quality_rules,
2351 enum_values: Vec::new(),
2352 column_order: 0,
2353 });
2354
2355 Ok(columns)
2356 }
2357
2358 fn parse_foreign_key_from_data_contract(
2360 &self,
2361 field_data: &serde_json::Map<String, JsonValue>,
2362 ) -> Option<ForeignKey> {
2363 field_data
2364 .get("foreignKey")
2365 .and_then(|v| v.as_object())
2366 .map(|fk_obj| ForeignKey {
2367 table_id: fk_obj
2368 .get("table")
2369 .or_else(|| fk_obj.get("table_id"))
2370 .and_then(|v| v.as_str())
2371 .unwrap_or("")
2372 .to_string(),
2373 column_name: fk_obj
2374 .get("column")
2375 .or_else(|| fk_obj.get("column_name"))
2376 .and_then(|v| v.as_str())
2377 .unwrap_or("")
2378 .to_string(),
2379 })
2380 }
2381
2382 fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2384 if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2386 if let Some((_, server_data)) = servers_obj.iter().next()
2388 && let Some(server_obj) = server_data.as_object()
2389 {
2390 return server_obj
2391 .get("type")
2392 .and_then(|v| v.as_str())
2393 .and_then(|s| self.parse_database_type(s));
2394 }
2395 } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2396 if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2398 return server_obj
2399 .get("type")
2400 .and_then(|v| v.as_str())
2401 .and_then(|s| self.parse_database_type(s));
2402 }
2403 }
2404 None
2405 }
2406
2407 fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2409 match s.to_lowercase().as_str() {
2410 "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2411 "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2412 "mysql" => Some(DatabaseType::Mysql),
2413 "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2414 "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2415 _ => None,
2416 }
2417 }
2418
2419 fn parse_struct_type_from_string(
2421 &self,
2422 field_name: &str,
2423 type_str: &str,
2424 field_data: &serde_json::Map<String, JsonValue>,
2425 ) -> Result<Vec<Column>> {
2426 let mut columns = Vec::new();
2427
2428 let normalized_type = type_str
2430 .lines()
2431 .map(|line| line.trim())
2432 .filter(|line| !line.is_empty())
2433 .collect::<Vec<_>>()
2434 .join(" ");
2435
2436 let type_str_upper = normalized_type.to_uppercase();
2437
2438 let _is_array = type_str_upper.starts_with("ARRAY<");
2440 let struct_start = type_str_upper.find("STRUCT<");
2441
2442 if let Some(start_pos) = struct_start {
2443 let struct_content = &normalized_type[start_pos + 7..]; let mut depth = 1;
2448 let mut end_pos = None;
2449 for (i, ch) in struct_content.char_indices() {
2450 match ch {
2451 '<' => depth += 1,
2452 '>' => {
2453 depth -= 1;
2454 if depth == 0 {
2455 end_pos = Some(i);
2456 break;
2457 }
2458 }
2459 _ => {}
2460 }
2461 }
2462
2463 let struct_fields_str = if let Some(end) = end_pos {
2465 &struct_content[..end]
2466 } else {
2467 struct_content.trim_end_matches('>').trim()
2469 };
2470
2471 let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2473
2474 for (nested_name, nested_type) in fields {
2476 let nested_type_upper = nested_type.to_uppercase();
2477 let nested_col_name = format!("{}.{}", field_name, nested_name);
2478
2479 if nested_type_upper.starts_with("STRUCT<") {
2481 let nested_struct_type_str = if nested_type_upper.starts_with("ARRAY<STRUCT<") {
2484 nested_type.clone()
2486 } else {
2487 nested_type.clone()
2489 };
2490
2491 match self.parse_struct_type_from_string(
2493 &nested_col_name,
2494 &nested_struct_type_str,
2495 field_data,
2496 ) {
2497 Ok(nested_cols) => {
2498 columns.extend(nested_cols);
2500 }
2501 Err(_) => {
2502 columns.push(Column {
2504 name: nested_col_name,
2505 data_type: normalize_data_type(&nested_type),
2506 nullable: !field_data
2507 .get("required")
2508 .and_then(|v| v.as_bool())
2509 .unwrap_or(false),
2510 primary_key: false,
2511 secondary_key: false,
2512 composite_key: None,
2513 foreign_key: None,
2514 constraints: Vec::new(),
2515 description: field_data
2516 .get("description")
2517 .and_then(|v| v.as_str())
2518 .unwrap_or("")
2519 .to_string(),
2520 errors: Vec::new(),
2521 quality: Vec::new(),
2522 enum_values: Vec::new(),
2523 column_order: 0,
2524 });
2525 }
2526 }
2527 } else {
2528 columns.push(Column {
2530 name: nested_col_name,
2531 data_type: normalize_data_type(&nested_type),
2532 nullable: !field_data
2533 .get("required")
2534 .and_then(|v| v.as_bool())
2535 .unwrap_or(false),
2536 primary_key: false,
2537 secondary_key: false,
2538 composite_key: None,
2539 foreign_key: None,
2540 constraints: Vec::new(),
2541 description: field_data
2542 .get("description")
2543 .and_then(|v| v.as_str())
2544 .unwrap_or("")
2545 .to_string(),
2546 errors: Vec::new(),
2547 quality: Vec::new(),
2548 enum_values: Vec::new(),
2549 column_order: 0,
2550 });
2551 }
2552 }
2553
2554 return Ok(columns);
2555 }
2556
2557 Ok(Vec::new())
2559 }
2560
2561 fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2563 let mut fields = Vec::new();
2564 let mut current_field = String::new();
2565 let mut depth = 0;
2566 let mut in_string = false;
2567 let mut string_char = None;
2568
2569 for ch in fields_str.chars() {
2570 match ch {
2571 '\'' | '"' if !in_string || Some(ch) == string_char => {
2572 if in_string {
2573 in_string = false;
2574 string_char = None;
2575 } else {
2576 in_string = true;
2577 string_char = Some(ch);
2578 }
2579 current_field.push(ch);
2580 }
2581 '<' if !in_string => {
2582 depth += 1;
2583 current_field.push(ch);
2584 }
2585 '>' if !in_string => {
2586 depth -= 1;
2587 current_field.push(ch);
2588 }
2589 ',' if !in_string && depth == 0 => {
2590 let trimmed = current_field.trim();
2592 if !trimmed.is_empty()
2593 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2594 {
2595 fields.push((name, type_part));
2596 }
2597 current_field.clear();
2598 }
2599 _ => {
2600 current_field.push(ch);
2601 }
2602 }
2603 }
2604
2605 let trimmed = current_field.trim();
2607 if !trimmed.is_empty()
2608 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2609 {
2610 fields.push((name, type_part));
2611 }
2612
2613 Ok(fields)
2614 }
2615
2616 fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
2618 let colon_pos = field_def.find(':')?;
2620 let name = field_def[..colon_pos].trim().to_string();
2621 let type_part = field_def[colon_pos + 1..].trim().to_string();
2622
2623 if name.is_empty() || type_part.is_empty() {
2624 return None;
2625 }
2626
2627 Some((name, type_part))
2628 }
2629
2630 fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
2632 let mut catalog_name = None;
2633 let mut schema_name = None;
2634
2635 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2636 for prop in custom_props {
2637 if let Some(prop_obj) = prop.as_object() {
2638 let prop_key = prop_obj
2639 .get("property")
2640 .and_then(|v| v.as_str())
2641 .unwrap_or("");
2642 let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
2643
2644 match prop_key {
2645 "catalogName" | "catalog_name" => {
2646 catalog_name = prop_value.map(|s| s.to_string());
2647 }
2648 "schemaName" | "schema_name" => {
2649 schema_name = prop_value.map(|s| s.to_string());
2650 }
2651 _ => {}
2652 }
2653 }
2654 }
2655 }
2656
2657 if catalog_name.is_none() {
2659 catalog_name = data
2660 .get("catalog_name")
2661 .and_then(|v| v.as_str())
2662 .map(|s| s.to_string());
2663 }
2664 if schema_name.is_none() {
2665 schema_name = data
2666 .get("schema_name")
2667 .and_then(|v| v.as_str())
2668 .map(|s| s.to_string());
2669 }
2670
2671 (catalog_name, schema_name)
2672 }
2673}
2674
2675impl Default for ODCSImporter {
2676 fn default() -> Self {
2677 Self::new()
2678 }
2679}
2680
/// A problem recorded while parsing a contract.
///
/// The parser returns a list of these alongside the parsed table, and
/// `ODCSImporter::import` forwards each `message` as an
/// `ImportError::ParseError`.
#[derive(Debug, Clone)]
pub struct ParserError {
    /// Category label for the error.
    /// NOTE(review): the set of values used is not visible here — confirm against the parser.
    pub error_type: String,
    /// Presumably the name or path of the offending field — TODO confirm against the parser.
    pub field: String,
    /// Text describing the problem; this is the part surfaced by `import`.
    pub message: String,
}
2688
2689fn yaml_to_json_value(yaml: &serde_yaml::Value) -> Result<JsonValue> {
2691 let json_str = serde_json::to_string(yaml).context("Failed to convert YAML to JSON")?;
2693 serde_json::from_str(&json_str).context("Failed to parse JSON")
2694}
2695
2696fn json_value_to_serde_value(value: &JsonValue) -> serde_json::Value {
2698 value.clone()
2699}
2700
/// Normalizes the spelling of a column data type.
///
/// * Simple types are upper-cased as a whole (`varchar(255)` becomes `VARCHAR(255)`).
/// * Types whose upper-cased form starts with `STRUCT`, `ARRAY` or `MAP` get an
///   upper-case keyword while the text between the first `<` and the last `>`
///   is kept verbatim, so nested field names keep their case
///   (`struct<name: string>` becomes `STRUCT<name: string>`).
/// * When such a type has no well-formed `<...>` pair, the keyword is
///   upper-cased and whatever follows it is appended unchanged.
///
/// An empty input yields an empty string. The function never panics:
/// a `>` that precedes the first `<`, or input too short / not sliceable at
/// the keyword length, falls back to the "no brackets" form.
fn normalize_data_type(data_type: &str) -> String {
    let upper = data_type.to_uppercase();

    for prefix in ["STRUCT", "ARRAY", "MAP"] {
        if !upper.starts_with(prefix) {
            continue;
        }

        // Only slice when the brackets are in a sensible order; `<` and `>`
        // are single-byte, so `open + 1` is always a valid boundary.
        if let Some((open, close)) = data_type.find('<').zip(data_type.rfind('>')) {
            if open < close {
                return format!("{prefix}<{}>", &data_type[open + 1..close]);
            }
        }

        // `get` instead of indexing: upper-casing can change byte lengths for
        // non-ASCII input, so the keyword length may not be a valid offset
        // into the original string.
        let rest = data_type.get(prefix.len()..).unwrap_or_default();
        return format!("{prefix}{rest}");
    }

    upper
}
2738
2739fn parse_medallion_layer(s: &str) -> Result<MedallionLayer> {
2741 match s.to_uppercase().as_str() {
2742 "BRONZE" => Ok(MedallionLayer::Bronze),
2743 "SILVER" => Ok(MedallionLayer::Silver),
2744 "GOLD" => Ok(MedallionLayer::Gold),
2745 "OPERATIONAL" => Ok(MedallionLayer::Operational),
2746 _ => Err(anyhow::anyhow!("Unknown medallion layer: {}", s)),
2747 }
2748}
2749
2750fn parse_scd_pattern(s: &str) -> Result<SCDPattern> {
2751 match s.to_uppercase().as_str() {
2752 "TYPE_1" | "TYPE1" => Ok(SCDPattern::Type1),
2753 "TYPE_2" | "TYPE2" => Ok(SCDPattern::Type2),
2754 _ => Err(anyhow::anyhow!("Unknown SCD pattern: {}", s)),
2755 }
2756}
2757
2758fn parse_data_vault_classification(s: &str) -> Result<DataVaultClassification> {
2759 match s.to_uppercase().as_str() {
2760 "HUB" => Ok(DataVaultClassification::Hub),
2761 "LINK" => Ok(DataVaultClassification::Link),
2762 "SATELLITE" | "SAT" => Ok(DataVaultClassification::Satellite),
2763 _ => Err(anyhow::anyhow!("Unknown Data Vault classification: {}", s)),
2764 }
2765}
2766
// Unit tests for the private `ODCSImporter::parse` entry point, driven by
// small inline YAML documents.
#[cfg(test)]
mod tests {
    use super::*;

    // A flat document with two columns and a database type parses into a
    // table with matching name, columns and `database_type`, and records no
    // parser errors.
    #[test]
    fn test_parse_simple_odcl_table() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
        assert_eq!(errors.len(), 0);
    }

    // Table-level metadata: a lower-case `medallion_layer` and an upper-case
    // `scd_pattern` are both recognized.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
        // NOTE(review): this check is skipped silently when `odcl_metadata`
        // holds no string `description`, so it cannot fail on a missing key.
        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
            assert_eq!(desc, "User table");
        }
        assert_eq!(errors.len(), 0);
    }

    // A mixed-case `data_vault_classification` value maps to the `Hub` variant.
    #[test]
    fn test_parse_odcl_with_data_vault() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: hub_customer
columns:
  - name: customer_key
    data_type: VARCHAR(50)
data_vault_classification: Hub
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(
            table.data_vault_classification,
            Some(DataVaultClassification::Hub)
        );
        assert_eq!(errors.len(), 0);
    }

    // Input with several mapping colons on one line must be rejected.
    // NOTE(review): presumably this fails at the YAML parsing step — the test
    // only checks that `parse` returns `Err`.
    #[test]
    fn test_parse_invalid_odcl() {
        let mut parser = ODCSImporter::new();
        let invalid_yaml = "not: valid: yaml: structure:";

        assert!(parser.parse(invalid_yaml).is_err());
    }

    // Well-formed YAML that carries only a `name` (no `columns`) must be
    // rejected.
    #[test]
    fn test_parse_odcl_missing_required_fields() {
        let mut parser = ODCSImporter::new();
        let non_conformant = r#"
name: users
# Missing required columns field
"#;

        assert!(parser.parse(non_conformant).is_err());
    }

    // A column-level `foreign_key` mapping ends up on the parsed column, with
    // the referenced `table_id` preserved.
    #[test]
    fn test_parse_odcl_with_foreign_key() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: orders
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: user_id
    data_type: INT
    foreign_key:
      table_id: users
      column_name: id
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
        assert!(user_id_col.foreign_key.is_some());
        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
        assert_eq!(errors.len(), 0);
    }

    // A column-level `constraints` list is carried over as strings, and
    // `nullable: false` is honored.
    #[test]
    fn test_parse_odcl_with_constraints() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: products
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
    constraints:
      - UNIQUE
      - NOT NULL
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
        assert!(!name_col.nullable);
        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
        assert_eq!(errors.len(), 0);
    }
}