1use super::{ImportError, ImportResult, TableData};
12use crate::models::column::ForeignKey;
13use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
14use crate::models::{Column, Table};
15use anyhow::{Context, Result};
16use serde_json::Value as JsonValue;
17use std::collections::HashMap;
18use tracing::info;
19
/// Importer that turns a YAML contract document into a `Table` plus a list of
/// non-fatal parser errors.
pub struct ODCSImporter {
    /// The YAML document most recently handed to `parse`; `None` until the first
    /// successful YAML decode.
    current_yaml_data: Option<serde_yaml::Value>,
}
26
27impl ODCSImporter {
28 pub fn new() -> Self {
38 Self {
39 current_yaml_data: None,
40 }
41 }
42
43 pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
76 match self.parse(yaml_content) {
77 Ok((table, errors)) => {
78 let sdk_tables = vec![TableData {
79 table_index: 0,
80 name: Some(table.name.clone()),
81 columns: table
82 .columns
83 .iter()
84 .map(|c| super::ColumnData {
85 name: c.name.clone(),
86 data_type: c.data_type.clone(),
87 nullable: c.nullable,
88 primary_key: c.primary_key,
89 })
90 .collect(),
91 }];
92 let sdk_errors: Vec<ImportError> = errors
93 .iter()
94 .map(|e| ImportError::ParseError(e.message.clone()))
95 .collect();
96 Ok(ImportResult {
97 tables: sdk_tables,
98 tables_requiring_name: Vec::new(),
99 errors: sdk_errors,
100 ai_suggestions: None,
101 })
102 }
103 Err(e) => Err(ImportError::ParseError(e.to_string())),
104 }
105 }
106
107 pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
118 self.parse(yaml_content)
119 }
120
121 fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
131 let _errors: Vec<ParserError> = Vec::new();
133
134 let data: serde_yaml::Value =
136 serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
137
138 if data.is_null() {
139 return Err(anyhow::anyhow!("Empty YAML content"));
140 }
141
142 self.current_yaml_data = Some(data.clone());
144
145 let json_data = yaml_to_json_value(&data)?;
147
148 if self.is_liquibase_format(&json_data) {
150 return self.parse_liquibase(&json_data);
151 }
152
153 if self.is_odcl_v3_format(&json_data) {
154 return self.parse_odcl_v3(&json_data);
155 }
156
157 if self.is_data_contract_format(&json_data) {
158 return self.parse_data_contract(&json_data);
159 }
160
161 self.parse_simple_odcl(&json_data)
163 }
164
165 fn resolve_ref<'a>(&self, ref_str: &str, data: &'a JsonValue) -> Option<&'a JsonValue> {
167 if !ref_str.starts_with("#/") {
168 return None;
169 }
170
171 let path = &ref_str[2..];
173 let parts: Vec<&str> = path.split('/').collect();
174
175 let mut current = data;
177 for part in parts {
178 current = current.get(part)?;
179 }
180
181 if current.is_object() {
182 Some(current)
183 } else {
184 None
185 }
186 }
187
188 fn is_liquibase_format(&self, data: &JsonValue) -> bool {
190 if data.get("databaseChangeLog").is_some() {
191 return true;
192 }
193 if let Some(obj) = data.as_object() {
195 let obj_str = format!("{:?}", obj);
196 if obj_str.contains("changeSet") {
197 return true;
198 }
199 }
200 false
201 }
202
203 fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
205 if let Some(obj) = data.as_object() {
206 let has_api_version = obj.contains_key("apiVersion");
207 let has_kind = obj
208 .get("kind")
209 .and_then(|v| v.as_str())
210 .map(|s| s == "DataContract")
211 .unwrap_or(false);
212 let has_id = obj.contains_key("id");
213 let has_version = obj.contains_key("version");
214 return has_api_version && has_kind && has_id && has_version;
215 }
216 false
217 }
218
219 fn is_data_contract_format(&self, data: &JsonValue) -> bool {
221 if let Some(obj) = data.as_object() {
222 let has_spec = obj.contains_key("dataContractSpecification");
223 let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
224 return has_spec && has_models;
225 }
226 false
227 }
228
229 fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
231 let mut errors = Vec::new();
232
233 let name = data
235 .get("name")
236 .and_then(|v| v.as_str())
237 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
238 .to_string();
239
240 let columns_data = data
242 .get("columns")
243 .and_then(|v| v.as_array())
244 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
245
246 let mut columns = Vec::new();
247 for (idx, col_data) in columns_data.iter().enumerate() {
248 match self.parse_column(col_data) {
249 Ok(col) => columns.push(col),
250 Err(e) => {
251 errors.push(ParserError {
252 error_type: "column_parse_error".to_string(),
253 field: format!("columns[{}]", idx),
254 message: e.to_string(),
255 });
256 }
257 }
258 }
259
260 let database_type = self.extract_database_type(data);
262 let medallion_layers = self.extract_medallion_layers(data);
263 let scd_pattern = self.extract_scd_pattern(data);
264 let data_vault_classification = self.extract_data_vault_classification(data);
265 let quality_rules = self.extract_quality_rules(data);
266
267 if scd_pattern.is_some() && data_vault_classification.is_some() {
269 errors.push(ParserError {
270 error_type: "validation_error".to_string(),
271 field: "patterns".to_string(),
272 message: "SCD pattern and Data Vault classification are mutually exclusive"
273 .to_string(),
274 });
275 }
276
277 let mut odcl_metadata = HashMap::new();
279 if let Some(metadata) = data.get("odcl_metadata")
280 && let Some(obj) = metadata.as_object()
281 {
282 for (key, value) in obj {
283 odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
284 }
285 }
286
287 let table_uuid = self.extract_table_uuid(data);
288
289 let table = Table {
290 id: table_uuid,
291 name,
292 columns,
293 database_type,
294 catalog_name: None,
295 schema_name: None,
296 medallion_layers,
297 scd_pattern,
298 data_vault_classification,
299 modeling_level: None,
300 tags: Vec::new(),
301 odcl_metadata,
302 position: None,
303 yaml_file_path: None,
304 drawio_cell_id: None,
305 quality: quality_rules,
306 errors: Vec::new(),
307 created_at: chrono::Utc::now(),
308 updated_at: chrono::Utc::now(),
309 };
310
311 info!("Parsed ODCL table: {}", table.name);
312 Ok((table, errors))
313 }
314
315 fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
317 let name = col_data
318 .get("name")
319 .and_then(|v| v.as_str())
320 .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
321 .to_string();
322
323 let data_type = col_data
324 .get("data_type")
325 .and_then(|v| v.as_str())
326 .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
327 .to_string();
328
329 let data_type = normalize_data_type(&data_type);
331
332 let nullable = col_data
333 .get("nullable")
334 .and_then(|v| v.as_bool())
335 .unwrap_or(true);
336
337 let primary_key = col_data
338 .get("primary_key")
339 .and_then(|v| v.as_bool())
340 .unwrap_or(false);
341
342 let foreign_key = col_data
343 .get("foreign_key")
344 .and_then(|v| self.parse_foreign_key(v));
345
346 let constraints = col_data
347 .get("constraints")
348 .and_then(|v| v.as_array())
349 .map(|arr| {
350 arr.iter()
351 .filter_map(|v| v.as_str().map(|s| s.to_string()))
352 .collect()
353 })
354 .unwrap_or_default();
355
356 let description = col_data
357 .get("description")
358 .and_then(|v| v.as_str())
359 .map(|s| s.to_string())
360 .unwrap_or_default();
361
362 let mut column_quality_rules = Vec::new();
364 if let Some(quality_val) = col_data.get("quality") {
365 if let Some(arr) = quality_val.as_array() {
366 for item in arr {
368 if let Some(obj) = item.as_object() {
369 let mut rule = HashMap::new();
370 for (key, value) in obj {
371 rule.insert(key.clone(), json_value_to_serde_value(value));
372 }
373 column_quality_rules.push(rule);
374 }
375 }
376 } else if let Some(obj) = quality_val.as_object() {
377 let mut rule = HashMap::new();
379 for (key, value) in obj {
380 rule.insert(key.clone(), json_value_to_serde_value(value));
381 }
382 column_quality_rules.push(rule);
383 }
384 }
385
386 if !nullable {
388 let has_not_null = column_quality_rules.iter().any(|rule| {
389 rule.get("type")
390 .and_then(|v| v.as_str())
391 .map(|s| {
392 s.to_lowercase().contains("not_null")
393 || s.to_lowercase().contains("notnull")
394 })
395 .unwrap_or(false)
396 });
397 if !has_not_null {
398 let mut not_null_rule = HashMap::new();
399 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
400 not_null_rule.insert(
401 "description".to_string(),
402 serde_json::json!("Column must not be null"),
403 );
404 column_quality_rules.push(not_null_rule);
405 }
406 }
407
408 Ok(Column {
409 name,
410 data_type,
411 nullable,
412 primary_key,
413 secondary_key: false,
414 composite_key: None,
415 foreign_key,
416 constraints,
417 description,
418 errors: Vec::new(),
419 quality: column_quality_rules,
420 enum_values: Vec::new(),
421 column_order: 0,
422 })
423 }
424
425 fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
427 let obj = fk_data.as_object()?;
428 Some(ForeignKey {
429 table_id: obj
430 .get("table_id")
431 .or_else(|| obj.get("table"))
432 .and_then(|v| v.as_str())
433 .unwrap_or("")
434 .to_string(),
435 column_name: obj
436 .get("column_name")
437 .or_else(|| obj.get("column"))
438 .and_then(|v| v.as_str())
439 .unwrap_or("")
440 .to_string(),
441 })
442 }
443
444 fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
446 data.get("database_type")
447 .and_then(|v| v.as_str())
448 .and_then(|s| match s.to_uppercase().as_str() {
449 "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
450 "MYSQL" => Some(DatabaseType::Mysql),
451 "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
452 "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
453 "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
454 _ => None,
455 })
456 }
457
458 fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
460 let mut layers = Vec::new();
461
462 if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
464 for item in arr {
465 if let Some(s) = item.as_str()
466 && let Ok(layer) = parse_medallion_layer(s)
467 {
468 layers.push(layer);
469 }
470 }
471 }
472 else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
474 && let Ok(layer) = parse_medallion_layer(s)
475 {
476 layers.push(layer);
477 }
478
479 layers
480 }
481
482 fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
484 data.get("scd_pattern")
485 .and_then(|v| v.as_str())
486 .and_then(|s| parse_scd_pattern(s).ok())
487 }
488
489 fn extract_data_vault_classification(
491 &self,
492 data: &JsonValue,
493 ) -> Option<DataVaultClassification> {
494 data.get("data_vault_classification")
495 .and_then(|v| v.as_str())
496 .and_then(|s| parse_data_vault_classification(s).ok())
497 }
498
499 fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
501 use serde_json::Value;
502 let mut quality_rules = Vec::new();
503
504 if let Some(quality_val) = data.get("quality") {
506 if let Some(arr) = quality_val.as_array() {
507 for item in arr {
509 if let Some(obj) = item.as_object() {
510 let mut rule = HashMap::new();
511 for (key, value) in obj {
512 rule.insert(key.clone(), json_value_to_serde_value(value));
513 }
514 quality_rules.push(rule);
515 }
516 }
517 } else if let Some(obj) = quality_val.as_object() {
518 let mut rule = HashMap::new();
520 for (key, value) in obj {
521 rule.insert(key.clone(), json_value_to_serde_value(value));
522 }
523 quality_rules.push(rule);
524 } else if let Some(s) = quality_val.as_str() {
525 let mut rule = HashMap::new();
527 rule.insert("value".to_string(), Value::String(s.to_string()));
528 quality_rules.push(rule);
529 }
530 }
531
532 if let Some(metadata) = data.get("metadata")
534 && let Some(metadata_obj) = metadata.as_object()
535 && let Some(quality_val) = metadata_obj.get("quality")
536 {
537 if let Some(arr) = quality_val.as_array() {
538 for item in arr {
540 if let Some(obj) = item.as_object() {
541 let mut rule = HashMap::new();
542 for (key, value) in obj {
543 rule.insert(key.clone(), json_value_to_serde_value(value));
544 }
545 quality_rules.push(rule);
546 }
547 }
548 } else if let Some(obj) = quality_val.as_object() {
549 let mut rule = HashMap::new();
551 for (key, value) in obj {
552 rule.insert(key.clone(), json_value_to_serde_value(value));
553 }
554 quality_rules.push(rule);
555 } else if let Some(s) = quality_val.as_str() {
556 let mut rule = HashMap::new();
558 rule.insert("value".to_string(), Value::String(s.to_string()));
559 quality_rules.push(rule);
560 }
561 }
562
563 if let Some(tblprops) = data.get("tblproperties")
565 && let Some(obj) = tblprops.as_object()
566 {
567 for (key, value) in obj {
568 let mut rule = HashMap::new();
569 rule.insert("property".to_string(), Value::String(key.clone()));
570 rule.insert("value".to_string(), json_value_to_serde_value(value));
571 quality_rules.push(rule);
572 }
573 }
574
575 quality_rules
576 }
577
578 fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
586 let mut errors = Vec::new();
600
601 let changelog = data
602 .get("databaseChangeLog")
603 .and_then(|v| v.as_array())
604 .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
605
606 let mut table_name: Option<String> = None;
608 let mut columns: Vec<crate::models::column::Column> = Vec::new();
609
610 for entry in changelog {
611 if let Some(change_set) = entry.get("changeSet") {
613 let changes = if let Some(obj) = change_set.as_object() {
615 obj.get("changes")
616 .and_then(|v| v.as_array())
617 .cloned()
618 .unwrap_or_default()
619 } else if let Some(arr) = change_set.as_array() {
620 arr.clone()
622 } else {
623 Vec::new()
624 };
625
626 for ch in changes {
627 let create = ch.get("createTable").or_else(|| ch.get("create_table"));
628 if let Some(create) = create {
629 table_name = create
630 .get("tableName")
631 .or_else(|| create.get("table_name"))
632 .and_then(|v| v.as_str())
633 .map(|s| s.to_string());
634
635 if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
637 for col_entry in cols {
638 let col = col_entry.get("column").unwrap_or(col_entry);
639 let name = col
640 .get("name")
641 .and_then(|v| v.as_str())
642 .unwrap_or("")
643 .to_string();
644 let data_type = col
645 .get("type")
646 .and_then(|v| v.as_str())
647 .unwrap_or("")
648 .to_string();
649
650 if name.is_empty() {
651 errors.push(ParserError {
652 error_type: "validation_error".to_string(),
653 field: "columns.name".to_string(),
654 message: "Liquibase createTable column missing name"
655 .to_string(),
656 });
657 continue;
658 }
659
660 let mut column =
661 crate::models::column::Column::new(name, data_type);
662
663 if let Some(constraints) =
664 col.get("constraints").and_then(|v| v.as_object())
665 {
666 if let Some(pk) =
667 constraints.get("primaryKey").and_then(|v| v.as_bool())
668 {
669 column.primary_key = pk;
670 }
671 if let Some(nullable) =
672 constraints.get("nullable").and_then(|v| v.as_bool())
673 {
674 column.nullable = nullable;
675 }
676 }
677
678 columns.push(column);
679 }
680 }
681
682 break;
685 }
686 }
687 }
688 if table_name.is_some() {
689 break;
690 }
691 }
692
693 let table_name = table_name
694 .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
695 let table = Table::new(table_name, columns);
696 Ok((table, errors))
698 }
699
700 fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
702 let mut errors = Vec::new();
703
704 let table_name = data
706 .get("name")
707 .and_then(|v| v.as_str())
708 .map(|s| s.to_string())
709 .or_else(|| {
710 data.get("schema")
712 .and_then(|v| v.as_array())
713 .and_then(|arr| arr.first())
714 .and_then(|obj| obj.as_object())
715 .and_then(|obj| obj.get("name"))
716 .and_then(|v| v.as_str())
717 .map(|s| s.to_string())
718 })
719 .ok_or_else(|| {
720 anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
721 })?;
722
723 let schema = data
725 .get("schema")
726 .and_then(|v| v.as_array())
727 .ok_or_else(|| {
728 errors.push(ParserError {
729 error_type: "validation_error".to_string(),
730 field: "schema".to_string(),
731 message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
732 });
733 anyhow::anyhow!("Missing schema")
734 });
735
736 let schema = match schema {
737 Ok(s) if s.is_empty() => {
738 errors.push(ParserError {
739 error_type: "validation_error".to_string(),
740 field: "schema".to_string(),
741 message: "ODCS v3.0.x schema array is empty".to_string(),
742 });
743 let quality_rules = self.extract_quality_rules(data);
744 let table_uuid = self.extract_table_uuid(data);
745 let table = Table {
746 id: table_uuid,
747 name: table_name,
748 columns: Vec::new(),
749 database_type: None,
750 catalog_name: None,
751 schema_name: None,
752 medallion_layers: Vec::new(),
753 scd_pattern: None,
754 data_vault_classification: None,
755 modeling_level: None,
756 tags: Vec::new(),
757 odcl_metadata: HashMap::new(),
758 position: None,
759 yaml_file_path: None,
760 drawio_cell_id: None,
761 quality: quality_rules,
762 errors: Vec::new(),
763 created_at: chrono::Utc::now(),
764 updated_at: chrono::Utc::now(),
765 };
766 return Ok((table, errors));
767 }
768 Ok(s) => s,
769 Err(_) => {
770 let quality_rules = self.extract_quality_rules(data);
771 let table_uuid = self.extract_table_uuid(data);
772 let table = Table {
773 id: table_uuid,
774 name: table_name,
775 columns: Vec::new(),
776 database_type: None,
777 catalog_name: None,
778 schema_name: None,
779 medallion_layers: Vec::new(),
780 scd_pattern: None,
781 data_vault_classification: None,
782 modeling_level: None,
783 tags: Vec::new(),
784 odcl_metadata: HashMap::new(),
785 position: None,
786 yaml_file_path: None,
787 drawio_cell_id: None,
788 quality: quality_rules,
789 errors: Vec::new(),
790 created_at: chrono::Utc::now(),
791 updated_at: chrono::Utc::now(),
792 };
793 return Ok((table, errors));
794 }
795 };
796
797 let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
799 errors.push(ParserError {
800 error_type: "validation_error".to_string(),
801 field: "schema[0]".to_string(),
802 message: "First schema object must be a dictionary".to_string(),
803 });
804 anyhow::anyhow!("Invalid schema object")
805 })?;
806
807 let object_name = schema_object
808 .get("name")
809 .and_then(|v| v.as_str())
810 .unwrap_or(&table_name);
811
812 let properties = schema_object
813 .get("properties")
814 .and_then(|v| v.as_object())
815 .ok_or_else(|| {
816 errors.push(ParserError {
817 error_type: "validation_error".to_string(),
818 field: format!("Object '{}'", object_name),
819 message: format!("Object '{}' missing 'properties' field", object_name),
820 });
821 anyhow::anyhow!("Missing properties")
822 })?;
823
824 let mut columns = Vec::new();
826 for (prop_name, prop_data) in properties {
827 if let Some(prop_obj) = prop_data.as_object() {
828 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
829 Ok(mut cols) => columns.append(&mut cols),
830 Err(e) => {
831 errors.push(ParserError {
832 error_type: "property_parse_error".to_string(),
833 field: format!("Property '{}'", prop_name),
834 message: e.to_string(),
835 });
836 }
837 }
838 } else {
839 errors.push(ParserError {
840 error_type: "validation_error".to_string(),
841 field: format!("Property '{}'", prop_name),
842 message: format!("Property '{}' must be an object", prop_name),
843 });
844 }
845 }
846
847 let (medallion_layers, scd_pattern, data_vault_classification, mut tags) =
849 self.extract_metadata_from_custom_properties(data);
850
851 let mut shared_domains: Vec<String> = Vec::new();
853 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
854 for prop in custom_props {
855 if let Some(prop_obj) = prop.as_object() {
856 let prop_key = prop_obj
857 .get("property")
858 .and_then(|v| v.as_str())
859 .unwrap_or("");
860 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
861 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
862 {
863 for item in arr {
864 if let Some(s) = item.as_str() {
865 shared_domains.push(s.to_string());
866 }
867 }
868 }
869 }
870 }
871 }
872
873 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
875 for item in tags_arr {
876 if let Some(s) = item.as_str()
877 && !tags.contains(&s.to_string())
878 {
879 tags.push(s.to_string());
880 }
881 }
882 }
883
884 let database_type = self.extract_database_type_from_odcl_v3_servers(data);
886
887 let quality_rules = self.extract_quality_rules(data);
889
890 let mut odcl_metadata = HashMap::new();
892 odcl_metadata.insert(
893 "apiVersion".to_string(),
894 json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
895 );
896 odcl_metadata.insert(
897 "kind".to_string(),
898 json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
899 );
900 odcl_metadata.insert(
901 "id".to_string(),
902 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
903 );
904 odcl_metadata.insert(
905 "version".to_string(),
906 json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
907 );
908 odcl_metadata.insert(
909 "status".to_string(),
910 json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
911 );
912
913 if let Some(servicelevels_val) = data.get("servicelevels") {
915 odcl_metadata.insert(
916 "servicelevels".to_string(),
917 json_value_to_serde_value(servicelevels_val),
918 );
919 }
920
921 if let Some(links_val) = data.get("links") {
923 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
924 }
925
926 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
928 odcl_metadata.insert(
929 "domain".to_string(),
930 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
931 );
932 }
933 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
934 odcl_metadata.insert(
935 "dataProduct".to_string(),
936 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
937 );
938 }
939 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
940 odcl_metadata.insert(
941 "tenant".to_string(),
942 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
943 );
944 }
945
946 if let Some(desc_val) = data.get("description") {
948 odcl_metadata.insert(
949 "description".to_string(),
950 json_value_to_serde_value(desc_val),
951 );
952 }
953
954 if let Some(pricing_val) = data.get("pricing") {
956 odcl_metadata.insert(
957 "pricing".to_string(),
958 json_value_to_serde_value(pricing_val),
959 );
960 }
961
962 if let Some(team_val) = data.get("team") {
964 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
965 }
966
967 if let Some(roles_val) = data.get("roles") {
969 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
970 }
971
972 if let Some(terms_val) = data.get("terms") {
974 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
975 }
976
977 if let Some(servers_val) = data.get("servers") {
979 odcl_metadata.insert(
980 "servers".to_string(),
981 json_value_to_serde_value(servers_val),
982 );
983 }
984
985 if let Some(infrastructure_val) = data.get("infrastructure") {
987 odcl_metadata.insert(
988 "infrastructure".to_string(),
989 json_value_to_serde_value(infrastructure_val),
990 );
991 }
992
993 if !shared_domains.is_empty() {
995 let shared_domains_json: Vec<serde_json::Value> = shared_domains
996 .iter()
997 .map(|d| serde_json::Value::String(d.clone()))
998 .collect();
999 odcl_metadata.insert(
1000 "sharedDomains".to_string(),
1001 serde_json::Value::Array(shared_domains_json),
1002 );
1003 }
1004
1005 let table_uuid = self.extract_table_uuid(data);
1006
1007 let table = Table {
1008 id: table_uuid,
1009 name: table_name,
1010 columns,
1011 database_type,
1012 catalog_name: None,
1013 schema_name: None,
1014 medallion_layers,
1015 scd_pattern,
1016 data_vault_classification,
1017 modeling_level: None,
1018 tags,
1019 odcl_metadata,
1020 position: None,
1021 yaml_file_path: None,
1022 drawio_cell_id: None,
1023 quality: quality_rules,
1024 errors: Vec::new(),
1025 created_at: chrono::Utc::now(),
1026 updated_at: chrono::Utc::now(),
1027 };
1028
1029 info!(
1030 "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1031 table.name,
1032 errors.len()
1033 );
1034 Ok((table, errors))
1035 }
1036
1037 fn parse_odcl_v3_property(
1039 &self,
1040 prop_name: &str,
1041 prop_data: &serde_json::Map<String, JsonValue>,
1042 data: &JsonValue,
1043 ) -> Result<Vec<Column>> {
1044 let mut errors = Vec::new();
1046 self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1047 }
1048
1049 fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1052 if let Some(id_val) = data.get("id")
1054 && let Some(id_str) = id_val.as_str()
1055 {
1056 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1057 tracing::debug!(
1058 "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1059 uuid
1060 );
1061 return uuid;
1062 } else {
1063 tracing::warn!(
1064 "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1065 id_str
1066 );
1067 }
1068 }
1069
1070 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1072 for prop in custom_props {
1073 if let Some(prop_obj) = prop.as_object() {
1074 let prop_key = prop_obj
1075 .get("property")
1076 .and_then(|v| v.as_str())
1077 .unwrap_or("");
1078 if prop_key == "tableUuid"
1079 && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1080 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1081 {
1082 tracing::debug!(
1083 "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1084 uuid
1085 );
1086 return uuid;
1087 }
1088 }
1089 }
1090 }
1091
1092 if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1094 && let Some(uuid_val) = metadata.get("tableUuid")
1095 && let Some(uuid_str) = uuid_val.as_str()
1096 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1097 {
1098 tracing::debug!(
1099 "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1100 uuid
1101 );
1102 return uuid;
1103 }
1104
1105 let table_name = data
1107 .get("name")
1108 .and_then(|v| v.as_str())
1109 .unwrap_or("unknown");
1110 let new_uuid = crate::models::table::Table::generate_id(
1111 table_name, None, None, None, );
1115 tracing::warn!(
1116 "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1117 table_name,
1118 new_uuid
1119 );
1120 new_uuid
1121 }
1122
1123 fn extract_metadata_from_custom_properties(
1125 &self,
1126 data: &JsonValue,
1127 ) -> (
1128 Vec<MedallionLayer>,
1129 Option<SCDPattern>,
1130 Option<DataVaultClassification>,
1131 Vec<String>,
1132 ) {
1133 let mut medallion_layers = Vec::new();
1134 let mut scd_pattern = None;
1135 let mut data_vault_classification = None;
1136 let mut tags = Vec::new();
1137
1138 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1139 for prop in custom_props {
1140 if let Some(prop_obj) = prop.as_object() {
1141 let prop_key = prop_obj
1142 .get("property")
1143 .and_then(|v| v.as_str())
1144 .unwrap_or("");
1145 let prop_value = prop_obj.get("value");
1146
1147 match prop_key {
1148 "medallionLayers" | "medallion_layers" => {
1149 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1150 for item in arr {
1151 if let Some(s) = item.as_str()
1152 && let Ok(layer) = parse_medallion_layer(s)
1153 {
1154 medallion_layers.push(layer);
1155 }
1156 }
1157 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1158 for part in s.split(',') {
1160 if let Ok(layer) = parse_medallion_layer(part.trim()) {
1161 medallion_layers.push(layer);
1162 }
1163 }
1164 }
1165 }
1166 "scdPattern" | "scd_pattern" => {
1167 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1168 scd_pattern = parse_scd_pattern(s).ok();
1169 }
1170 }
1171 "dataVaultClassification" | "data_vault_classification" => {
1172 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1173 data_vault_classification = parse_data_vault_classification(s).ok();
1174 }
1175 }
1176 "tags" => {
1177 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1178 for item in arr {
1179 if let Some(s) = item.as_str() {
1180 tags.push(s.to_string());
1181 }
1182 }
1183 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1184 for part in s.split(',') {
1186 tags.push(part.trim().to_string());
1187 }
1188 }
1189 }
1190 "sharedDomains" | "shared_domains" => {
1191 }
1194 _ => {}
1195 }
1196 }
1197 }
1198 }
1199
1200 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1202 for item in tags_arr {
1203 if let Some(s) = item.as_str()
1204 && !tags.contains(&s.to_string())
1205 {
1206 tags.push(s.to_string());
1207 }
1208 }
1209 }
1210
1211 (
1212 medallion_layers,
1213 scd_pattern,
1214 data_vault_classification,
1215 tags,
1216 )
1217 }
1218
1219 fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1221 if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1223 && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1224 {
1225 return server_obj
1226 .get("type")
1227 .and_then(|v| v.as_str())
1228 .and_then(|s| self.parse_database_type(s));
1229 }
1230 None
1231 }
1232
1233 fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1235 let mut errors = Vec::new();
1236
1237 let models = data
1239 .get("models")
1240 .and_then(|v| v.as_object())
1241 .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1242
1243 let (model_name, model_data) = models
1246 .iter()
1247 .next()
1248 .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1249
1250 let model_data = model_data
1251 .as_object()
1252 .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1253
1254 let fields = model_data
1256 .get("fields")
1257 .and_then(|v| v.as_object())
1258 .ok_or_else(|| {
1259 errors.push(ParserError {
1260 error_type: "validation_error".to_string(),
1261 field: format!("Model '{}'", model_name),
1262 message: format!("Model '{}' missing 'fields' field", model_name),
1263 });
1264 anyhow::anyhow!("Missing fields")
1265 });
1266
1267 let fields = match fields {
1268 Ok(f) => f,
1269 Err(_) => {
1270 let quality_rules = self.extract_quality_rules(data);
1272 let table_uuid = self.extract_table_uuid(data);
1273 let table = Table {
1274 id: table_uuid,
1275 name: model_name.clone(),
1276 columns: Vec::new(),
1277 database_type: None,
1278 catalog_name: None,
1279 schema_name: None,
1280 medallion_layers: Vec::new(),
1281 scd_pattern: None,
1282 data_vault_classification: None,
1283 modeling_level: None,
1284 tags: Vec::new(),
1285 odcl_metadata: HashMap::new(),
1286 position: None,
1287 yaml_file_path: None,
1288 drawio_cell_id: None,
1289 quality: quality_rules,
1290 errors: Vec::new(),
1291 created_at: chrono::Utc::now(),
1292 updated_at: chrono::Utc::now(),
1293 };
1294 return Ok((table, errors));
1295 }
1296 };
1297
1298 let mut columns = Vec::new();
1300 for (field_name, field_data) in fields {
1301 if let Some(field_obj) = field_data.as_object() {
1302 match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1303 Ok(mut cols) => columns.append(&mut cols),
1304 Err(e) => {
1305 errors.push(ParserError {
1306 error_type: "field_parse_error".to_string(),
1307 field: format!("Field '{}'", field_name),
1308 message: e.to_string(),
1309 });
1310 }
1311 }
1312 } else {
1313 errors.push(ParserError {
1314 error_type: "validation_error".to_string(),
1315 field: format!("Field '{}'", field_name),
1316 message: format!("Field '{}' must be an object", field_name),
1317 });
1318 }
1319 }
1320
1321 let mut odcl_metadata = HashMap::new();
1324
1325 if let Some(info_val) = data.get("info") {
1328 let info_json_value = json_value_to_serde_value(info_val);
1330 odcl_metadata.insert("info".to_string(), info_json_value);
1331 }
1332
1333 odcl_metadata.insert(
1334 "dataContractSpecification".to_string(),
1335 json_value_to_serde_value(
1336 data.get("dataContractSpecification")
1337 .unwrap_or(&JsonValue::Null),
1338 ),
1339 );
1340 odcl_metadata.insert(
1341 "id".to_string(),
1342 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1343 );
1344 if let Some(servicelevels_val) = data.get("servicelevels") {
1348 odcl_metadata.insert(
1349 "servicelevels".to_string(),
1350 json_value_to_serde_value(servicelevels_val),
1351 );
1352 }
1353
1354 if let Some(links_val) = data.get("links") {
1356 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1357 }
1358
1359 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1361 odcl_metadata.insert(
1362 "domain".to_string(),
1363 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1364 );
1365 }
1366 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1367 odcl_metadata.insert(
1368 "dataProduct".to_string(),
1369 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1370 );
1371 }
1372 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1373 odcl_metadata.insert(
1374 "tenant".to_string(),
1375 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1376 );
1377 }
1378
1379 if let Some(desc_val) = data.get("description") {
1381 odcl_metadata.insert(
1382 "description".to_string(),
1383 json_value_to_serde_value(desc_val),
1384 );
1385 }
1386
1387 if let Some(pricing_val) = data.get("pricing") {
1389 odcl_metadata.insert(
1390 "pricing".to_string(),
1391 json_value_to_serde_value(pricing_val),
1392 );
1393 }
1394
1395 if let Some(team_val) = data.get("team") {
1397 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1398 }
1399
1400 if let Some(roles_val) = data.get("roles") {
1402 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1403 }
1404
1405 if let Some(terms_val) = data.get("terms") {
1407 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1408 }
1409
1410 if let Some(servers_val) = data.get("servers") {
1412 odcl_metadata.insert(
1413 "servers".to_string(),
1414 json_value_to_serde_value(servers_val),
1415 );
1416 }
1417
1418 if let Some(infrastructure_val) = data.get("infrastructure") {
1420 odcl_metadata.insert(
1421 "infrastructure".to_string(),
1422 json_value_to_serde_value(infrastructure_val),
1423 );
1424 }
1425
1426 let database_type = self.extract_database_type_from_servers(data);
1428
1429 let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1431
1432 let mut shared_domains: Vec<String> = Vec::new();
1434 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1435 for prop in custom_props {
1436 if let Some(prop_obj) = prop.as_object() {
1437 let prop_key = prop_obj
1438 .get("property")
1439 .and_then(|v| v.as_str())
1440 .unwrap_or("");
1441 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1442 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1443 {
1444 for item in arr {
1445 if let Some(s) = item.as_str() {
1446 shared_domains.push(s.to_string());
1447 }
1448 }
1449 }
1450 }
1451 }
1452 }
1453
1454 let mut tags = Vec::new();
1456 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1457 for item in tags_arr {
1458 if let Some(s) = item.as_str() {
1459 tags.push(s.to_string());
1460 }
1461 }
1462 }
1463
1464 let quality_rules = self.extract_quality_rules(data);
1466
1467 if !shared_domains.is_empty() {
1469 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1470 .iter()
1471 .map(|d| serde_json::Value::String(d.clone()))
1472 .collect();
1473 odcl_metadata.insert(
1474 "sharedDomains".to_string(),
1475 serde_json::Value::Array(shared_domains_json),
1476 );
1477 }
1478
1479 let table_uuid = self.extract_table_uuid(data);
1480
1481 let table = Table {
1482 id: table_uuid,
1483 name: model_name.clone(),
1484 columns,
1485 database_type,
1486 catalog_name,
1487 schema_name,
1488 medallion_layers: Vec::new(),
1489 scd_pattern: None,
1490 data_vault_classification: None,
1491 modeling_level: None,
1492 tags,
1493 odcl_metadata,
1494 position: None,
1495 yaml_file_path: None,
1496 drawio_cell_id: None,
1497 quality: quality_rules,
1498 errors: Vec::new(),
1499 created_at: chrono::Utc::now(),
1500 updated_at: chrono::Utc::now(),
1501 };
1502
1503 info!(
1504 "Parsed Data Contract table: {} with {} warnings/errors",
1505 model_name,
1506 errors.len()
1507 );
1508 Ok((table, errors))
1509 }
1510
1511 #[allow(clippy::only_used_in_recursion)]
1516 fn expand_nested_column(
1517 &self,
1518 column_name: &str,
1519 schema: &JsonValue,
1520 nullable: bool,
1521 columns: &mut Vec<Column>,
1522 errors: &mut Vec<ParserError>,
1523 ) {
1524 let schema_obj = match schema.as_object() {
1525 Some(obj) => obj,
1526 None => {
1527 errors.push(ParserError {
1528 error_type: "parse_error".to_string(),
1529 field: column_name.to_string(),
1530 message: "Nested schema must be an object".to_string(),
1531 });
1532 return;
1533 }
1534 };
1535
1536 let schema_type = schema_obj
1537 .get("type")
1538 .and_then(|v| v.as_str())
1539 .unwrap_or("object");
1540
1541 match schema_type {
1542 "object" | "struct" => {
1543 if let Some(properties) = schema_obj.get("properties").and_then(|v| v.as_object()) {
1545 let nested_required: Vec<String> = schema_obj
1546 .get("required")
1547 .and_then(|v| v.as_array())
1548 .map(|arr| {
1549 arr.iter()
1550 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1551 .collect()
1552 })
1553 .unwrap_or_default();
1554
1555 for (nested_name, nested_schema) in properties {
1556 let nested_nullable = !nested_required.contains(nested_name);
1557 self.expand_nested_column(
1558 &format!("{}.{}", column_name, nested_name),
1559 nested_schema,
1560 nullable || nested_nullable,
1561 columns,
1562 errors,
1563 );
1564 }
1565 } else {
1566 let description = schema_obj
1568 .get("description")
1569 .and_then(|v| v.as_str())
1570 .unwrap_or("")
1571 .to_string();
1572 columns.push(Column {
1573 name: column_name.to_string(),
1574 data_type: "OBJECT".to_string(),
1575 nullable,
1576 primary_key: false,
1577 secondary_key: false,
1578 composite_key: None,
1579 foreign_key: None,
1580 constraints: Vec::new(),
1581 description,
1582 quality: Vec::new(),
1583 enum_values: Vec::new(),
1584 errors: Vec::new(),
1585 column_order: 0,
1586 });
1587 }
1588 }
1589 "array" => {
1590 let items = schema_obj.get("items").unwrap_or(schema);
1592 let items_type = items
1593 .as_object()
1594 .and_then(|obj| obj.get("type").and_then(|v| v.as_str()))
1595 .unwrap_or("string");
1596
1597 if items_type == "object" || items_type == "struct" {
1598 let description = schema_obj
1600 .get("description")
1601 .and_then(|v| v.as_str())
1602 .unwrap_or("")
1603 .to_string();
1604 columns.push(Column {
1605 name: column_name.to_string(),
1606 data_type: "ARRAY<OBJECT>".to_string(),
1607 nullable,
1608 primary_key: false,
1609 secondary_key: false,
1610 composite_key: None,
1611 foreign_key: None,
1612 constraints: Vec::new(),
1613 description,
1614 quality: Vec::new(),
1615 enum_values: Vec::new(),
1616 errors: Vec::new(),
1617 column_order: 0,
1618 });
1619 if let Some(properties) = items
1621 .as_object()
1622 .and_then(|obj| obj.get("properties").and_then(|v| v.as_object()))
1623 {
1624 let nested_required: Vec<String> = items
1625 .as_object()
1626 .and_then(|obj| obj.get("required").and_then(|v| v.as_array()))
1627 .map(|arr| {
1628 arr.iter()
1629 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1630 .collect()
1631 })
1632 .unwrap_or_default();
1633
1634 for (nested_name, nested_schema) in properties {
1635 let nested_nullable = !nested_required.contains(nested_name);
1636 self.expand_nested_column(
1637 &format!("{}.{}", column_name, nested_name),
1638 nested_schema,
1639 nullable || nested_nullable,
1640 columns,
1641 errors,
1642 );
1643 }
1644 }
1645 } else {
1646 let data_type = format!("ARRAY<{}>", items_type.to_uppercase());
1648 let description = schema_obj
1649 .get("description")
1650 .and_then(|v| v.as_str())
1651 .unwrap_or("")
1652 .to_string();
1653 columns.push(Column {
1654 name: column_name.to_string(),
1655 data_type,
1656 nullable,
1657 primary_key: false,
1658 secondary_key: false,
1659 composite_key: None,
1660 foreign_key: None,
1661 constraints: Vec::new(),
1662 description,
1663 quality: Vec::new(),
1664 enum_values: Vec::new(),
1665 errors: Vec::new(),
1666 column_order: 0,
1667 });
1668 }
1669 }
1670 _ => {
1671 let data_type = schema_type.to_uppercase();
1673 let description = schema_obj
1674 .get("description")
1675 .and_then(|v| v.as_str())
1676 .unwrap_or("")
1677 .to_string();
1678 let enum_values = schema_obj
1679 .get("enum")
1680 .and_then(|v| v.as_array())
1681 .map(|arr| {
1682 arr.iter()
1683 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1684 .collect()
1685 })
1686 .unwrap_or_default();
1687 columns.push(Column {
1688 name: column_name.to_string(),
1689 data_type,
1690 nullable,
1691 primary_key: false,
1692 secondary_key: false,
1693 composite_key: None,
1694 foreign_key: None,
1695 constraints: Vec::new(),
1696 description,
1697 quality: Vec::new(),
1698 enum_values,
1699 errors: Vec::new(),
1700 column_order: 0,
1701 });
1702 }
1703 }
1704 }
1705
1706 fn parse_data_contract_field(
1708 &self,
1709 field_name: &str,
1710 field_data: &serde_json::Map<String, JsonValue>,
1711 data: &JsonValue,
1712 errors: &mut Vec<ParserError>,
1713 ) -> Result<Vec<Column>> {
1714 let mut columns = Vec::new();
1715
1716 let extract_quality_from_obj =
1718 |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1719 let mut quality_rules = Vec::new();
1720 if let Some(quality_val) = obj.get("quality") {
1721 if let Some(arr) = quality_val.as_array() {
1722 for item in arr {
1724 if let Some(rule_obj) = item.as_object() {
1725 let mut rule = HashMap::new();
1726 for (key, value) in rule_obj {
1727 rule.insert(key.clone(), json_value_to_serde_value(value));
1728 }
1729 quality_rules.push(rule);
1730 }
1731 }
1732 } else if let Some(rule_obj) = quality_val.as_object() {
1733 let mut rule = HashMap::new();
1735 for (key, value) in rule_obj {
1736 rule.insert(key.clone(), json_value_to_serde_value(value));
1737 }
1738 quality_rules.push(rule);
1739 }
1740 }
1741 quality_rules
1742 };
1743
1744 if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1746 if let Some(definition) = self.resolve_ref(ref_str, data) {
1747 let mut quality_rules = extract_quality_from_obj(field_data);
1749
1750 if quality_rules.is_empty() {
1752 if let Some(def_obj) = definition.as_object() {
1753 quality_rules = extract_quality_from_obj(def_obj);
1754 }
1755 } else {
1756 if let Some(def_obj) = definition.as_object() {
1758 let def_quality = extract_quality_from_obj(def_obj);
1759 quality_rules.extend(def_quality);
1761 }
1762 }
1763
1764 let required = field_data
1765 .get("required")
1766 .and_then(|v| v.as_bool())
1767 .unwrap_or(false);
1768
1769 if required {
1771 let has_not_null = quality_rules.iter().any(|rule| {
1772 rule.get("type")
1773 .and_then(|v| v.as_str())
1774 .map(|s| {
1775 s.to_lowercase().contains("not_null")
1776 || s.to_lowercase().contains("notnull")
1777 })
1778 .unwrap_or(false)
1779 });
1780 if !has_not_null {
1781 let mut not_null_rule = HashMap::new();
1782 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
1783 not_null_rule.insert(
1784 "description".to_string(),
1785 serde_json::json!("Column must not be null"),
1786 );
1787 quality_rules.push(not_null_rule);
1788 }
1789 }
1790
1791 let has_nested = definition
1793 .get("type")
1794 .and_then(|v| v.as_str())
1795 .map(|s| s == "object")
1796 .unwrap_or(false)
1797 || definition.get("properties").is_some()
1798 || definition.get("fields").is_some();
1799
1800 if has_nested {
1801 if let Some(properties) =
1803 definition.get("properties").and_then(|v| v.as_object())
1804 {
1805 let nested_required: Vec<String> = definition
1807 .get("required")
1808 .and_then(|v| v.as_array())
1809 .map(|arr| {
1810 arr.iter()
1811 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1812 .collect()
1813 })
1814 .unwrap_or_default();
1815
1816 for (nested_name, nested_schema) in properties {
1817 let nested_required_field = nested_required.contains(nested_name);
1818 self.expand_nested_column(
1819 &format!("{}.{}", field_name, nested_name),
1820 nested_schema,
1821 !nested_required_field,
1822 &mut columns,
1823 errors,
1824 );
1825 }
1826 } else if let Some(fields) =
1827 definition.get("fields").and_then(|v| v.as_object())
1828 {
1829 for (nested_name, nested_schema) in fields {
1831 self.expand_nested_column(
1832 &format!("{}.{}", field_name, nested_name),
1833 nested_schema,
1834 true, &mut columns,
1836 errors,
1837 );
1838 }
1839 } else {
1840 columns.push(Column {
1842 name: field_name.to_string(),
1843 data_type: "OBJECT".to_string(),
1844 nullable: !required,
1845 primary_key: false,
1846 secondary_key: false,
1847 composite_key: None,
1848 foreign_key: None,
1849 constraints: Vec::new(),
1850 description: field_data
1851 .get("description")
1852 .or_else(|| definition.get("description"))
1853 .and_then(|v| v.as_str())
1854 .unwrap_or("")
1855 .to_string(),
1856 errors: Vec::new(),
1857 quality: quality_rules.clone(),
1858 enum_values: Vec::new(),
1859 column_order: 0,
1860 });
1861 }
1862 } else {
1863 let def_type = definition
1865 .get("type")
1866 .and_then(|v| v.as_str())
1867 .unwrap_or("STRING")
1868 .to_uppercase();
1869
1870 let enum_values = definition
1871 .get("enum")
1872 .and_then(|v| v.as_array())
1873 .map(|arr| {
1874 arr.iter()
1875 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1876 .collect()
1877 })
1878 .unwrap_or_default();
1879
1880 columns.push(Column {
1881 name: field_name.to_string(),
1882 data_type: def_type,
1883 nullable: !required,
1884 primary_key: false,
1885 secondary_key: false,
1886 composite_key: None,
1887 foreign_key: None,
1888 constraints: Vec::new(),
1889 description: field_data
1890 .get("description")
1891 .or_else(|| definition.get("description"))
1892 .and_then(|v| v.as_str())
1893 .unwrap_or("")
1894 .to_string(),
1895 errors: Vec::new(),
1896 quality: quality_rules,
1897 enum_values,
1898 column_order: 0,
1899 });
1900 }
1901 return Ok(columns);
1902 } else {
1903 let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
1905 let mut error_map = HashMap::new();
1906 error_map.insert("type".to_string(), serde_json::json!("validation_error"));
1907 error_map.insert("field".to_string(), serde_json::json!("data_type"));
1908 error_map.insert(
1909 "message".to_string(),
1910 serde_json::json!(format!(
1911 "Field '{}' references undefined definition: {}",
1912 field_name, ref_str
1913 )),
1914 );
1915 col_errors.push(error_map);
1916 columns.push(Column {
1917 name: field_name.to_string(),
1918 data_type: "OBJECT".to_string(),
1919 nullable: true,
1920 primary_key: false,
1921 secondary_key: false,
1922 composite_key: None,
1923 foreign_key: None,
1924 constraints: Vec::new(),
1925 description: field_data
1926 .get("description")
1927 .and_then(|v| v.as_str())
1928 .unwrap_or("")
1929 .to_string(),
1930 errors: col_errors,
1931 quality: Vec::new(),
1932 enum_values: Vec::new(),
1933 column_order: 0,
1934 });
1935 return Ok(columns);
1936 }
1937 }
1938
1939 let field_type_str = field_data
1941 .get("type")
1942 .and_then(|v| v.as_str())
1943 .unwrap_or("STRING");
1944
1945 if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
1947 match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
1948 Ok(nested_cols) if !nested_cols.is_empty() => {
1949 let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
1951 "ARRAY<STRUCT<...>>".to_string()
1952 } else {
1953 "STRUCT<...>".to_string()
1954 };
1955
1956 columns.push(Column {
1958 name: field_name.to_string(),
1959 data_type: parent_data_type,
1960 nullable: !field_data
1961 .get("required")
1962 .and_then(|v| v.as_bool())
1963 .unwrap_or(false),
1964 primary_key: false,
1965 secondary_key: false,
1966 composite_key: None,
1967 foreign_key: None,
1968 constraints: Vec::new(),
1969 description: field_data
1970 .get("description")
1971 .and_then(|v| v.as_str())
1972 .unwrap_or("")
1973 .to_string(),
1974 errors: Vec::new(),
1975 quality: Vec::new(),
1976 enum_values: Vec::new(),
1977 column_order: 0,
1978 });
1979
1980 columns.extend(nested_cols);
1982 return Ok(columns);
1983 }
1984 Ok(_) | Err(_) => {
1985 }
1987 }
1988 }
1989
1990 let field_type = normalize_data_type(field_type_str);
1991
1992 if field_type == "ARRAY" {
1994 let items = field_data.get("items");
1995 if let Some(items_val) = items {
1996 if let Some(items_obj) = items_val.as_object() {
1997 if items_obj.get("fields").is_some()
1999 || items_obj.get("type").and_then(|v| v.as_str()) == Some("object")
2000 {
2001 columns.push(Column {
2003 name: field_name.to_string(),
2004 data_type: "ARRAY<OBJECT>".to_string(),
2005 nullable: !field_data
2006 .get("required")
2007 .and_then(|v| v.as_bool())
2008 .unwrap_or(false),
2009 primary_key: false,
2010 secondary_key: false,
2011 composite_key: None,
2012 foreign_key: None,
2013 constraints: Vec::new(),
2014 description: field_data
2015 .get("description")
2016 .and_then(|v| v.as_str())
2017 .unwrap_or("")
2018 .to_string(),
2019 errors: Vec::new(),
2020 quality: Vec::new(),
2021 enum_values: Vec::new(),
2022 column_order: 0,
2023 });
2024
2025 let nested_fields_obj = items_obj
2028 .get("properties")
2029 .and_then(|v| v.as_object())
2030 .or_else(|| items_obj.get("fields").and_then(|v| v.as_object()));
2031
2032 if let Some(fields_obj) = nested_fields_obj {
2033 for (nested_field_name, nested_field_data) in fields_obj {
2034 if let Some(nested_field_obj) = nested_field_data.as_object() {
2035 let nested_field_type = nested_field_obj
2036 .get("type")
2037 .and_then(|v| v.as_str())
2038 .unwrap_or("STRING");
2039
2040 let nested_col_name =
2042 format!("{}.{}", field_name, nested_field_name);
2043 let mut local_errors = Vec::new();
2044 match self.parse_data_contract_field(
2045 &nested_col_name,
2046 nested_field_obj,
2047 data,
2048 &mut local_errors,
2049 ) {
2050 Ok(mut nested_cols) => {
2051 columns.append(&mut nested_cols);
2054 }
2055 Err(_) => {
2056 columns.push(Column {
2058 name: nested_col_name,
2059 data_type: nested_field_type.to_uppercase(),
2060 nullable: !nested_field_obj
2061 .get("required")
2062 .and_then(|v| v.as_bool())
2063 .unwrap_or(false),
2064 primary_key: false,
2065 secondary_key: false,
2066 composite_key: None,
2067 foreign_key: None,
2068 constraints: Vec::new(),
2069 description: nested_field_obj
2070 .get("description")
2071 .and_then(|v| v.as_str())
2072 .unwrap_or("")
2073 .to_string(),
2074 errors: Vec::new(),
2075 quality: Vec::new(),
2076 enum_values: Vec::new(),
2077 column_order: 0,
2078 });
2079 }
2080 }
2081 }
2082 }
2083 }
2084
2085 return Ok(columns);
2086 } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2087 columns.push(Column {
2089 name: field_name.to_string(),
2090 data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2091 nullable: !field_data
2092 .get("required")
2093 .and_then(|v| v.as_bool())
2094 .unwrap_or(false),
2095 primary_key: false,
2096 secondary_key: false,
2097 composite_key: None,
2098 foreign_key: None,
2099 constraints: Vec::new(),
2100 description: field_data
2101 .get("description")
2102 .and_then(|v| v.as_str())
2103 .unwrap_or("")
2104 .to_string(),
2105 errors: Vec::new(),
2106 quality: Vec::new(),
2107 enum_values: Vec::new(),
2108 column_order: 0,
2109 });
2110 return Ok(columns);
2111 }
2112 } else if let Some(item_type_str) = items_val.as_str() {
2113 columns.push(Column {
2115 name: field_name.to_string(),
2116 data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2117 nullable: !field_data
2118 .get("required")
2119 .and_then(|v| v.as_bool())
2120 .unwrap_or(false),
2121 primary_key: false,
2122 secondary_key: false,
2123 composite_key: None,
2124 foreign_key: None,
2125 constraints: Vec::new(),
2126 description: field_data
2127 .get("description")
2128 .and_then(|v| v.as_str())
2129 .unwrap_or("")
2130 .to_string(),
2131 errors: Vec::new(),
2132 quality: Vec::new(),
2133 enum_values: Vec::new(),
2134 column_order: 0,
2135 });
2136 return Ok(columns);
2137 }
2138 }
2139 columns.push(Column {
2141 name: field_name.to_string(),
2142 data_type: "ARRAY<STRING>".to_string(),
2143 nullable: !field_data
2144 .get("required")
2145 .and_then(|v| v.as_bool())
2146 .unwrap_or(false),
2147 primary_key: false,
2148 secondary_key: false,
2149 composite_key: None,
2150 foreign_key: None,
2151 constraints: Vec::new(),
2152 description: field_data
2153 .get("description")
2154 .and_then(|v| v.as_str())
2155 .unwrap_or("")
2156 .to_string(),
2157 errors: Vec::new(),
2158 quality: Vec::new(),
2159 enum_values: Vec::new(),
2160 column_order: 0,
2161 });
2162 return Ok(columns);
2163 }
2164
2165 let nested_fields_obj = field_data
2168 .get("properties")
2169 .and_then(|v| v.as_object())
2170 .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2171
2172 if field_type == "OBJECT"
2173 && let Some(fields_obj) = nested_fields_obj
2174 {
2175 columns.push(Column {
2179 name: field_name.to_string(),
2180 data_type: "OBJECT".to_string(),
2181 nullable: !field_data
2182 .get("required")
2183 .and_then(|v| v.as_bool())
2184 .unwrap_or(false),
2185 primary_key: false,
2186 secondary_key: false,
2187 composite_key: None,
2188 foreign_key: None,
2189 constraints: Vec::new(),
2190 description: field_data
2191 .get("description")
2192 .and_then(|v| v.as_str())
2193 .unwrap_or("")
2194 .to_string(),
2195 errors: Vec::new(),
2196 quality: Vec::new(),
2197 enum_values: Vec::new(),
2198 column_order: 0,
2199 });
2200
2201 for (nested_field_name, nested_field_data) in fields_obj {
2203 if let Some(nested_field_obj) = nested_field_data.as_object() {
2204 let nested_field_type = nested_field_obj
2205 .get("type")
2206 .and_then(|v| v.as_str())
2207 .unwrap_or("STRING");
2208
2209 let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2211 match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data) {
2212 Ok(mut nested_cols) => {
2213 columns.append(&mut nested_cols);
2216 }
2217 Err(_) => {
2218 columns.push(Column {
2220 name: nested_col_name,
2221 data_type: nested_field_type.to_uppercase(),
2222 nullable: !nested_field_obj
2223 .get("required")
2224 .and_then(|v| v.as_bool())
2225 .unwrap_or(false),
2226 primary_key: false,
2227 secondary_key: false,
2228 composite_key: None,
2229 foreign_key: None,
2230 constraints: Vec::new(),
2231 description: nested_field_obj
2232 .get("description")
2233 .and_then(|v| v.as_str())
2234 .unwrap_or("")
2235 .to_string(),
2236 errors: Vec::new(),
2237 quality: Vec::new(),
2238 enum_values: Vec::new(),
2239 column_order: 0,
2240 });
2241 }
2242 }
2243 }
2244 }
2245
2246 return Ok(columns);
2247 }
2248
2249 let required = field_data
2251 .get("required")
2252 .and_then(|v| v.as_bool())
2253 .unwrap_or(false);
2254 let description = field_data
2255 .get("description")
2256 .and_then(|v| v.as_str())
2257 .unwrap_or("")
2258 .to_string();
2259
2260 let mut column_quality_rules = Vec::new();
2262 if let Some(quality_val) = field_data.get("quality") {
2263 if let Some(arr) = quality_val.as_array() {
2264 for item in arr {
2266 if let Some(obj) = item.as_object() {
2267 let mut rule = HashMap::new();
2268 for (key, value) in obj {
2269 rule.insert(key.clone(), json_value_to_serde_value(value));
2270 }
2271 column_quality_rules.push(rule);
2272 }
2273 }
2274 } else if let Some(obj) = quality_val.as_object() {
2275 let mut rule = HashMap::new();
2277 for (key, value) in obj {
2278 rule.insert(key.clone(), json_value_to_serde_value(value));
2279 }
2280 column_quality_rules.push(rule);
2281 }
2282 }
2283
2284 if required {
2286 let has_not_null = column_quality_rules.iter().any(|rule| {
2287 rule.get("type")
2288 .and_then(|v| v.as_str())
2289 .map(|s| {
2290 s.to_lowercase().contains("not_null")
2291 || s.to_lowercase().contains("notnull")
2292 })
2293 .unwrap_or(false)
2294 });
2295 if !has_not_null {
2296 let mut not_null_rule = HashMap::new();
2297 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
2298 not_null_rule.insert(
2299 "description".to_string(),
2300 serde_json::json!("Column must not be null"),
2301 );
2302 column_quality_rules.push(not_null_rule);
2303 }
2304 }
2305
2306 columns.push(Column {
2307 name: field_name.to_string(),
2308 data_type: field_type,
2309 nullable: !required,
2310 primary_key: field_data
2311 .get("primaryKey")
2312 .and_then(|v| v.as_bool())
2313 .unwrap_or(false),
2314 secondary_key: false,
2315 composite_key: None,
2316 foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2317 constraints: Vec::new(),
2318 description,
2319 errors: Vec::new(),
2320 quality: column_quality_rules,
2321 enum_values: Vec::new(),
2322 column_order: 0,
2323 });
2324
2325 Ok(columns)
2326 }
2327
2328 fn parse_foreign_key_from_data_contract(
2330 &self,
2331 field_data: &serde_json::Map<String, JsonValue>,
2332 ) -> Option<ForeignKey> {
2333 field_data
2334 .get("foreignKey")
2335 .and_then(|v| v.as_object())
2336 .map(|fk_obj| ForeignKey {
2337 table_id: fk_obj
2338 .get("table")
2339 .or_else(|| fk_obj.get("table_id"))
2340 .and_then(|v| v.as_str())
2341 .unwrap_or("")
2342 .to_string(),
2343 column_name: fk_obj
2344 .get("column")
2345 .or_else(|| fk_obj.get("column_name"))
2346 .and_then(|v| v.as_str())
2347 .unwrap_or("")
2348 .to_string(),
2349 })
2350 }
2351
2352 fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2354 if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2356 if let Some((_, server_data)) = servers_obj.iter().next()
2358 && let Some(server_obj) = server_data.as_object()
2359 {
2360 return server_obj
2361 .get("type")
2362 .and_then(|v| v.as_str())
2363 .and_then(|s| self.parse_database_type(s));
2364 }
2365 } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2366 if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2368 return server_obj
2369 .get("type")
2370 .and_then(|v| v.as_str())
2371 .and_then(|s| self.parse_database_type(s));
2372 }
2373 }
2374 None
2375 }
2376
2377 fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2379 match s.to_lowercase().as_str() {
2380 "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2381 "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2382 "mysql" => Some(DatabaseType::Mysql),
2383 "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2384 "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2385 _ => None,
2386 }
2387 }
2388
2389 fn parse_struct_type_from_string(
2391 &self,
2392 field_name: &str,
2393 type_str: &str,
2394 field_data: &serde_json::Map<String, JsonValue>,
2395 ) -> Result<Vec<Column>> {
2396 let mut columns = Vec::new();
2397
2398 let normalized_type = type_str
2400 .lines()
2401 .map(|line| line.trim())
2402 .filter(|line| !line.is_empty())
2403 .collect::<Vec<_>>()
2404 .join(" ");
2405
2406 let type_str_upper = normalized_type.to_uppercase();
2407
2408 let _is_array = type_str_upper.starts_with("ARRAY<");
2410 let struct_start = type_str_upper.find("STRUCT<");
2411
2412 if let Some(start_pos) = struct_start {
2413 let struct_content = &normalized_type[start_pos + 7..]; let mut depth = 1;
2418 let mut end_pos = None;
2419 for (i, ch) in struct_content.char_indices() {
2420 match ch {
2421 '<' => depth += 1,
2422 '>' => {
2423 depth -= 1;
2424 if depth == 0 {
2425 end_pos = Some(i);
2426 break;
2427 }
2428 }
2429 _ => {}
2430 }
2431 }
2432
2433 let struct_fields_str = if let Some(end) = end_pos {
2435 &struct_content[..end]
2436 } else {
2437 struct_content.trim_end_matches('>').trim()
2439 };
2440
2441 let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2443
2444 for (nested_name, nested_type) in fields {
2446 let nested_type_upper = nested_type.to_uppercase();
2447 let nested_col_name = format!("{}.{}", field_name, nested_name);
2448
2449 if nested_type_upper.starts_with("STRUCT<") {
2451 let nested_struct_type_str = if nested_type_upper.starts_with("ARRAY<STRUCT<") {
2454 nested_type.clone()
2456 } else {
2457 nested_type.clone()
2459 };
2460
2461 match self.parse_struct_type_from_string(
2463 &nested_col_name,
2464 &nested_struct_type_str,
2465 field_data,
2466 ) {
2467 Ok(nested_cols) => {
2468 columns.extend(nested_cols);
2470 }
2471 Err(_) => {
2472 columns.push(Column {
2474 name: nested_col_name,
2475 data_type: normalize_data_type(&nested_type),
2476 nullable: !field_data
2477 .get("required")
2478 .and_then(|v| v.as_bool())
2479 .unwrap_or(false),
2480 primary_key: false,
2481 secondary_key: false,
2482 composite_key: None,
2483 foreign_key: None,
2484 constraints: Vec::new(),
2485 description: field_data
2486 .get("description")
2487 .and_then(|v| v.as_str())
2488 .unwrap_or("")
2489 .to_string(),
2490 errors: Vec::new(),
2491 quality: Vec::new(),
2492 enum_values: Vec::new(),
2493 column_order: 0,
2494 });
2495 }
2496 }
2497 } else {
2498 columns.push(Column {
2500 name: nested_col_name,
2501 data_type: normalize_data_type(&nested_type),
2502 nullable: !field_data
2503 .get("required")
2504 .and_then(|v| v.as_bool())
2505 .unwrap_or(false),
2506 primary_key: false,
2507 secondary_key: false,
2508 composite_key: None,
2509 foreign_key: None,
2510 constraints: Vec::new(),
2511 description: field_data
2512 .get("description")
2513 .and_then(|v| v.as_str())
2514 .unwrap_or("")
2515 .to_string(),
2516 errors: Vec::new(),
2517 quality: Vec::new(),
2518 enum_values: Vec::new(),
2519 column_order: 0,
2520 });
2521 }
2522 }
2523
2524 return Ok(columns);
2525 }
2526
2527 Ok(Vec::new())
2529 }
2530
2531 fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2533 let mut fields = Vec::new();
2534 let mut current_field = String::new();
2535 let mut depth = 0;
2536 let mut in_string = false;
2537 let mut string_char = None;
2538
2539 for ch in fields_str.chars() {
2540 match ch {
2541 '\'' | '"' if !in_string || Some(ch) == string_char => {
2542 if in_string {
2543 in_string = false;
2544 string_char = None;
2545 } else {
2546 in_string = true;
2547 string_char = Some(ch);
2548 }
2549 current_field.push(ch);
2550 }
2551 '<' if !in_string => {
2552 depth += 1;
2553 current_field.push(ch);
2554 }
2555 '>' if !in_string => {
2556 depth -= 1;
2557 current_field.push(ch);
2558 }
2559 ',' if !in_string && depth == 0 => {
2560 let trimmed = current_field.trim();
2562 if !trimmed.is_empty()
2563 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2564 {
2565 fields.push((name, type_part));
2566 }
2567 current_field.clear();
2568 }
2569 _ => {
2570 current_field.push(ch);
2571 }
2572 }
2573 }
2574
2575 let trimmed = current_field.trim();
2577 if !trimmed.is_empty()
2578 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2579 {
2580 fields.push((name, type_part));
2581 }
2582
2583 Ok(fields)
2584 }
2585
2586 fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
2588 let colon_pos = field_def.find(':')?;
2590 let name = field_def[..colon_pos].trim().to_string();
2591 let type_part = field_def[colon_pos + 1..].trim().to_string();
2592
2593 if name.is_empty() || type_part.is_empty() {
2594 return None;
2595 }
2596
2597 Some((name, type_part))
2598 }
2599
2600 fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
2602 let mut catalog_name = None;
2603 let mut schema_name = None;
2604
2605 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2606 for prop in custom_props {
2607 if let Some(prop_obj) = prop.as_object() {
2608 let prop_key = prop_obj
2609 .get("property")
2610 .and_then(|v| v.as_str())
2611 .unwrap_or("");
2612 let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
2613
2614 match prop_key {
2615 "catalogName" | "catalog_name" => {
2616 catalog_name = prop_value.map(|s| s.to_string());
2617 }
2618 "schemaName" | "schema_name" => {
2619 schema_name = prop_value.map(|s| s.to_string());
2620 }
2621 _ => {}
2622 }
2623 }
2624 }
2625 }
2626
2627 if catalog_name.is_none() {
2629 catalog_name = data
2630 .get("catalog_name")
2631 .and_then(|v| v.as_str())
2632 .map(|s| s.to_string());
2633 }
2634 if schema_name.is_none() {
2635 schema_name = data
2636 .get("schema_name")
2637 .and_then(|v| v.as_str())
2638 .map(|s| s.to_string());
2639 }
2640
2641 (catalog_name, schema_name)
2642 }
2643}
2644
2645impl Default for ODCSImporter {
2646 fn default() -> Self {
2647 Self::new()
2648 }
2649}
2650
/// A problem recorded while parsing a document, reported next to the parsed
/// table (`parse` returns `(Table, Vec<ParserError>)`).
///
/// `ODCSImporter::import` forwards only `message` to its callers, wrapping it
/// in `ImportError::ParseError`.
#[derive(Debug, Clone)]
pub struct ParserError {
    /// Label for the kind of problem — presumably a category name; confirm
    /// against the code that constructs these values.
    pub error_type: String,
    /// Presumably the name or path of the offending field — TODO confirm.
    pub field: String,
    /// Human-readable description; this is the text `import` surfaces.
    pub message: String,
}
2658
2659fn yaml_to_json_value(yaml: &serde_yaml::Value) -> Result<JsonValue> {
2661 let json_str = serde_json::to_string(yaml).context("Failed to convert YAML to JSON")?;
2663 serde_json::from_str(&json_str).context("Failed to parse JSON")
2664}
2665
2666fn json_value_to_serde_value(value: &JsonValue) -> serde_json::Value {
2668 value.clone()
2669}
2670
/// Normalises a data-type string.
///
/// * Simple types are upper-cased as a whole (`varchar(255)` -> `VARCHAR(255)`).
/// * Types introduced by the keyword `STRUCT`, `ARRAY` or `MAP` (matched
///   case-insensitively) get only the keyword upper-cased, so the case of
///   nested field names and inner types is preserved:
///   * with a well-formed `<...>` pair the result is `KEYWORD<inner>`, where
///     `inner` is the text between the first `<` and the last `>`;
///   * otherwise the text after the keyword is appended unchanged.
/// * An empty input yields an empty string.
///
/// The keyword must be a whole word: `mapping` is an ordinary type name and
/// becomes `MAPPING`, not `MAPping`. Malformed brackets (a `>` that only occurs
/// before the first `<`) no longer panic; such input takes the
/// "no brackets" path.
fn normalize_data_type(data_type: &str) -> String {
    const COMPLEX_KEYWORDS: [&str; 3] = ["STRUCT", "ARRAY", "MAP"];

    for keyword in COMPLEX_KEYWORDS.iter() {
        // `str::get` yields `None` when the input is shorter than the keyword
        // or the split point is not a char boundary, so the slices below can
        // never panic on multi-byte input.
        let (head, tail) = match (
            data_type.get(..keyword.len()),
            data_type.get(keyword.len()..),
        ) {
            (Some(head), Some(tail)) => (head, tail),
            _ => continue,
        };

        if !head.eq_ignore_ascii_case(keyword) {
            continue;
        }

        // Require a word boundary after the keyword so identifiers that merely
        // start with it (e.g. `mapping`, `array_agg`) are not half upper-cased.
        let continues_identifier = tail
            .chars()
            .next()
            .map_or(false, |c| c.is_alphanumeric() || c == '_');
        if continues_identifier {
            continue;
        }

        // Only trust the brackets when the opening one comes first.
        let inner = tail
            .find('<')
            .zip(tail.rfind('>'))
            .filter(|&(open, close)| open < close)
            .map(|(open, close)| &tail[open + 1..close]);

        return match inner {
            Some(inner) => format!("{}<{}>", keyword, inner),
            None => format!("{}{}", keyword, tail),
        };
    }

    data_type.to_uppercase()
}
2708
2709fn parse_medallion_layer(s: &str) -> Result<MedallionLayer> {
2711 match s.to_uppercase().as_str() {
2712 "BRONZE" => Ok(MedallionLayer::Bronze),
2713 "SILVER" => Ok(MedallionLayer::Silver),
2714 "GOLD" => Ok(MedallionLayer::Gold),
2715 "OPERATIONAL" => Ok(MedallionLayer::Operational),
2716 _ => Err(anyhow::anyhow!("Unknown medallion layer: {}", s)),
2717 }
2718}
2719
2720fn parse_scd_pattern(s: &str) -> Result<SCDPattern> {
2721 match s.to_uppercase().as_str() {
2722 "TYPE_1" | "TYPE1" => Ok(SCDPattern::Type1),
2723 "TYPE_2" | "TYPE2" => Ok(SCDPattern::Type2),
2724 _ => Err(anyhow::anyhow!("Unknown SCD pattern: {}", s)),
2725 }
2726}
2727
2728fn parse_data_vault_classification(s: &str) -> Result<DataVaultClassification> {
2729 match s.to_uppercase().as_str() {
2730 "HUB" => Ok(DataVaultClassification::Hub),
2731 "LINK" => Ok(DataVaultClassification::Link),
2732 "SATELLITE" | "SAT" => Ok(DataVaultClassification::Satellite),
2733 _ => Err(anyhow::anyhow!("Unknown Data Vault classification: {}", s)),
2734 }
2735}
2736
// Unit tests for `ODCSImporter::parse`: each test feeds an inline YAML
// document to a fresh importer and checks the resulting table and the list of
// recorded parser errors.
#[cfg(test)]
mod tests {
    use super::*;

    /// A table with two columns and a `database_type` parses into a table
    /// named `users` with both columns, the Postgres database type and no
    /// recorded errors.
    #[test]
    fn test_parse_simple_odcl_table() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
 - name: id
 data_type: INT
 nullable: false
 primary_key: true
 - name: name
 data_type: VARCHAR(255)
 nullable: false
database_type: Postgres
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
        assert_eq!(errors.len(), 0);
    }

    /// `medallion_layer`, `scd_pattern` and `odcl_metadata` are carried over
    /// onto the table; the lower-case `gold` is expected to map to
    /// `MedallionLayer::Gold`.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
 - name: id
 data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
 description: "User table"
 owner: "data-team"
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
        // The description is only checked when it is present as a string;
        // a missing key does not fail this test.
        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
            assert_eq!(desc, "User table");
        }
        assert_eq!(errors.len(), 0);
    }

    /// `data_vault_classification: Hub` is expected to map to
    /// `DataVaultClassification::Hub`.
    #[test]
    fn test_parse_odcl_with_data_vault() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: hub_customer
columns:
 - name: customer_key
 data_type: VARCHAR(50)
data_vault_classification: Hub
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(
            table.data_vault_classification,
            Some(DataVaultClassification::Hub)
        );
        assert_eq!(errors.len(), 0);
    }

    /// Input with several colons on one line must make `parse` return an error.
    #[test]
    fn test_parse_invalid_odcl() {
        let mut parser = ODCSImporter::new();
        let invalid_yaml = "not: valid: yaml: structure:";

        assert!(parser.parse(invalid_yaml).is_err());
    }

    /// A document that has a `name` but no `columns` must make `parse` return
    /// an error.
    #[test]
    fn test_parse_odcl_missing_required_fields() {
        let mut parser = ODCSImporter::new();
        let non_conformant = r#"
name: users
# Missing required columns field
"#;

        assert!(parser.parse(non_conformant).is_err());
    }

    /// A column-level `foreign_key` mapping ends up on the parsed column with
    /// its `table_id` preserved.
    #[test]
    fn test_parse_odcl_with_foreign_key() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: orders
columns:
 - name: id
 data_type: INT
 primary_key: true
 - name: user_id
 data_type: INT
 foreign_key:
 table_id: users
 column_name: id
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
        assert!(user_id_col.foreign_key.is_some());
        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
        assert_eq!(errors.len(), 0);
    }

    /// Column `constraints` are kept as strings and `nullable: false` is
    /// honoured on the same column.
    #[test]
    fn test_parse_odcl_with_constraints() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: products
columns:
 - name: id
 data_type: INT
 primary_key: true
 - name: name
 data_type: VARCHAR(255)
 nullable: false
 constraints:
 - UNIQUE
 - NOT NULL
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
        assert!(!name_col.nullable);
        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
        assert_eq!(errors.len(), 0);
    }
}
2878}