1use super::{ImportError, ImportResult, TableData};
12use crate::models::column::ForeignKey;
13use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
14use crate::models::{Column, Table, Tag};
15use anyhow::{Context, Result};
16use serde_json::Value as JsonValue;
17use std::collections::HashMap;
18use std::str::FromStr;
19use tracing::info;
20
21pub struct ODCSImporter {
24 current_yaml_data: Option<serde_yaml::Value>,
26}
27
28impl ODCSImporter {
29 pub fn new() -> Self {
39 Self {
40 current_yaml_data: None,
41 }
42 }
43
44 pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
77 match self.parse(yaml_content) {
78 Ok((table, errors)) => {
79 let sdk_tables = vec![TableData {
80 table_index: 0,
81 name: Some(table.name.clone()),
82 columns: table
83 .columns
84 .iter()
85 .map(|c| super::ColumnData {
86 name: c.name.clone(),
87 data_type: c.data_type.clone(),
88 nullable: c.nullable,
89 primary_key: c.primary_key,
90 description: if c.description.is_empty() {
91 None
92 } else {
93 Some(c.description.clone())
94 },
95 quality: if c.quality.is_empty() {
96 None
97 } else {
98 Some(c.quality.clone())
99 },
100 ref_path: c.ref_path.clone(),
101 })
102 .collect(),
103 }];
104 let sdk_errors: Vec<ImportError> = errors
105 .iter()
106 .map(|e| ImportError::ParseError(e.message.clone()))
107 .collect();
108 Ok(ImportResult {
109 tables: sdk_tables,
110 tables_requiring_name: Vec::new(),
111 errors: sdk_errors,
112 ai_suggestions: None,
113 })
114 }
115 Err(e) => Err(ImportError::ParseError(e.to_string())),
116 }
117 }
118
119 pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
130 self.parse(yaml_content)
131 }
132
133 fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
143 let _errors: Vec<ParserError> = Vec::new();
145
146 let data: serde_yaml::Value =
148 serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
149
150 if data.is_null() {
151 return Err(anyhow::anyhow!("Empty YAML content"));
152 }
153
154 self.current_yaml_data = Some(data.clone());
156
157 let json_data = yaml_to_json_value(&data)?;
159
160 if self.is_liquibase_format(&json_data) {
162 return self.parse_liquibase(&json_data);
163 }
164
165 if self.is_odcl_v3_format(&json_data) {
166 return self.parse_odcl_v3(&json_data);
167 }
168
169 if self.is_data_contract_format(&json_data) {
170 return self.parse_data_contract(&json_data);
171 }
172
173 self.parse_simple_odcl(&json_data)
175 }
176
177 fn resolve_ref<'a>(&self, ref_str: &str, data: &'a JsonValue) -> Option<&'a JsonValue> {
179 if !ref_str.starts_with("#/") {
180 return None;
181 }
182
183 let path = &ref_str[2..];
185 let parts: Vec<&str> = path.split('/').collect();
186
187 let mut current = data;
189 for part in parts {
190 current = current.get(part)?;
191 }
192
193 if current.is_object() {
194 Some(current)
195 } else {
196 None
197 }
198 }
199
200 fn is_liquibase_format(&self, data: &JsonValue) -> bool {
202 if data.get("databaseChangeLog").is_some() {
203 return true;
204 }
205 if let Some(obj) = data.as_object() {
207 let obj_str = format!("{:?}", obj);
208 if obj_str.contains("changeSet") {
209 return true;
210 }
211 }
212 false
213 }
214
215 fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
217 if let Some(obj) = data.as_object() {
218 let has_api_version = obj.contains_key("apiVersion");
219 let has_kind = obj
220 .get("kind")
221 .and_then(|v| v.as_str())
222 .map(|s| s == "DataContract")
223 .unwrap_or(false);
224 let has_id = obj.contains_key("id");
225 let has_version = obj.contains_key("version");
226 return has_api_version && has_kind && has_id && has_version;
227 }
228 false
229 }
230
231 fn is_data_contract_format(&self, data: &JsonValue) -> bool {
233 if let Some(obj) = data.as_object() {
234 let has_spec = obj.contains_key("dataContractSpecification");
235 let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
236 return has_spec && has_models;
237 }
238 false
239 }
240
241 fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
243 let mut errors = Vec::new();
244
245 let name = data
247 .get("name")
248 .and_then(|v| v.as_str())
249 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
250 .to_string();
251
252 let columns_data = data
254 .get("columns")
255 .and_then(|v| v.as_array())
256 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
257
258 let mut columns = Vec::new();
259 for (idx, col_data) in columns_data.iter().enumerate() {
260 match self.parse_column(col_data) {
261 Ok(col) => columns.push(col),
262 Err(e) => {
263 errors.push(ParserError {
264 error_type: "column_parse_error".to_string(),
265 field: format!("columns[{}]", idx),
266 message: e.to_string(),
267 });
268 }
269 }
270 }
271
272 let database_type = self.extract_database_type(data);
274 let medallion_layers = self.extract_medallion_layers(data);
275 let scd_pattern = self.extract_scd_pattern(data);
276 let data_vault_classification = self.extract_data_vault_classification(data);
277 let quality_rules = self.extract_quality_rules(data);
278
279 if scd_pattern.is_some() && data_vault_classification.is_some() {
281 errors.push(ParserError {
282 error_type: "validation_error".to_string(),
283 field: "patterns".to_string(),
284 message: "SCD pattern and Data Vault classification are mutually exclusive"
285 .to_string(),
286 });
287 }
288
289 let mut odcl_metadata = HashMap::new();
291 if let Some(metadata) = data.get("odcl_metadata")
292 && let Some(obj) = metadata.as_object()
293 {
294 for (key, value) in obj {
295 odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
296 }
297 }
298
299 let table_uuid = self.extract_table_uuid(data);
300
301 let table = Table {
302 id: table_uuid,
303 name,
304 columns,
305 database_type,
306 catalog_name: None,
307 schema_name: None,
308 medallion_layers,
309 scd_pattern,
310 data_vault_classification,
311 modeling_level: None,
312 tags: Vec::<Tag>::new(),
313 odcl_metadata,
314 owner: None,
315 sla: None,
316 contact_details: None,
317 infrastructure_type: None,
318 notes: None,
319 position: None,
320 yaml_file_path: None,
321 drawio_cell_id: None,
322 quality: quality_rules,
323 errors: Vec::new(),
324 created_at: chrono::Utc::now(),
325 updated_at: chrono::Utc::now(),
326 };
327
328 info!("Parsed ODCL table: {}", table.name);
329 Ok((table, errors))
330 }
331
332 fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
334 let name = col_data
335 .get("name")
336 .and_then(|v| v.as_str())
337 .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
338 .to_string();
339
340 let data_type = col_data
341 .get("data_type")
342 .and_then(|v| v.as_str())
343 .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
344 .to_string();
345
346 let data_type = normalize_data_type(&data_type);
348
349 let nullable = col_data
350 .get("nullable")
351 .and_then(|v| v.as_bool())
352 .unwrap_or(true);
353
354 let primary_key = col_data
355 .get("primary_key")
356 .and_then(|v| v.as_bool())
357 .unwrap_or(false);
358
359 let foreign_key = col_data
360 .get("foreign_key")
361 .and_then(|v| self.parse_foreign_key(v));
362
363 let constraints = col_data
364 .get("constraints")
365 .and_then(|v| v.as_array())
366 .map(|arr| {
367 arr.iter()
368 .filter_map(|v| v.as_str().map(|s| s.to_string()))
369 .collect()
370 })
371 .unwrap_or_default();
372
373 let description = col_data
374 .get("description")
375 .and_then(|v| v.as_str())
376 .map(|s| s.to_string())
377 .unwrap_or_default();
378
379 let mut column_quality_rules = Vec::new();
381 if let Some(quality_val) = col_data.get("quality") {
382 if let Some(arr) = quality_val.as_array() {
383 for item in arr {
385 if let Some(obj) = item.as_object() {
386 let mut rule = HashMap::new();
387 for (key, value) in obj {
388 rule.insert(key.clone(), json_value_to_serde_value(value));
389 }
390 column_quality_rules.push(rule);
391 }
392 }
393 } else if let Some(obj) = quality_val.as_object() {
394 let mut rule = HashMap::new();
396 for (key, value) in obj {
397 rule.insert(key.clone(), json_value_to_serde_value(value));
398 }
399 column_quality_rules.push(rule);
400 }
401 }
402
403 if !nullable {
405 let has_not_null = column_quality_rules.iter().any(|rule| {
406 rule.get("type")
407 .and_then(|v| v.as_str())
408 .map(|s| {
409 s.to_lowercase().contains("not_null")
410 || s.to_lowercase().contains("notnull")
411 })
412 .unwrap_or(false)
413 });
414 if !has_not_null {
415 let mut not_null_rule = HashMap::new();
416 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
417 not_null_rule.insert(
418 "description".to_string(),
419 serde_json::json!("Column must not be null"),
420 );
421 column_quality_rules.push(not_null_rule);
422 }
423 }
424
425 Ok(Column {
426 name,
427 data_type,
428 nullable,
429 primary_key,
430 secondary_key: false,
431 composite_key: None,
432 foreign_key,
433 constraints,
434 description,
435 errors: Vec::new(),
436 quality: column_quality_rules,
437 ref_path: None,
438 enum_values: Vec::new(),
439 column_order: 0,
440 })
441 }
442
443 fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
445 let obj = fk_data.as_object()?;
446 Some(ForeignKey {
447 table_id: obj
448 .get("table_id")
449 .or_else(|| obj.get("table"))
450 .and_then(|v| v.as_str())
451 .unwrap_or("")
452 .to_string(),
453 column_name: obj
454 .get("column_name")
455 .or_else(|| obj.get("column"))
456 .and_then(|v| v.as_str())
457 .unwrap_or("")
458 .to_string(),
459 })
460 }
461
462 fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
464 data.get("database_type")
465 .and_then(|v| v.as_str())
466 .and_then(|s| match s.to_uppercase().as_str() {
467 "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
468 "MYSQL" => Some(DatabaseType::Mysql),
469 "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
470 "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
471 "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
472 _ => None,
473 })
474 }
475
476 fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
478 let mut layers = Vec::new();
479
480 if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
482 for item in arr {
483 if let Some(s) = item.as_str()
484 && let Ok(layer) = parse_medallion_layer(s)
485 {
486 layers.push(layer);
487 }
488 }
489 }
490 else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
492 && let Ok(layer) = parse_medallion_layer(s)
493 {
494 layers.push(layer);
495 }
496
497 layers
498 }
499
500 fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
502 data.get("scd_pattern")
503 .and_then(|v| v.as_str())
504 .and_then(|s| parse_scd_pattern(s).ok())
505 }
506
507 fn extract_data_vault_classification(
509 &self,
510 data: &JsonValue,
511 ) -> Option<DataVaultClassification> {
512 data.get("data_vault_classification")
513 .and_then(|v| v.as_str())
514 .and_then(|s| parse_data_vault_classification(s).ok())
515 }
516
517 fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
519 use serde_json::Value;
520 let mut quality_rules = Vec::new();
521
522 if let Some(quality_val) = data.get("quality") {
524 if let Some(arr) = quality_val.as_array() {
525 for item in arr {
527 if let Some(obj) = item.as_object() {
528 let mut rule = HashMap::new();
529 for (key, value) in obj {
530 rule.insert(key.clone(), json_value_to_serde_value(value));
531 }
532 quality_rules.push(rule);
533 }
534 }
535 } else if let Some(obj) = quality_val.as_object() {
536 let mut rule = HashMap::new();
538 for (key, value) in obj {
539 rule.insert(key.clone(), json_value_to_serde_value(value));
540 }
541 quality_rules.push(rule);
542 } else if let Some(s) = quality_val.as_str() {
543 let mut rule = HashMap::new();
545 rule.insert("value".to_string(), Value::String(s.to_string()));
546 quality_rules.push(rule);
547 }
548 }
549
550 if let Some(metadata) = data.get("metadata")
552 && let Some(metadata_obj) = metadata.as_object()
553 && let Some(quality_val) = metadata_obj.get("quality")
554 {
555 if let Some(arr) = quality_val.as_array() {
556 for item in arr {
558 if let Some(obj) = item.as_object() {
559 let mut rule = HashMap::new();
560 for (key, value) in obj {
561 rule.insert(key.clone(), json_value_to_serde_value(value));
562 }
563 quality_rules.push(rule);
564 }
565 }
566 } else if let Some(obj) = quality_val.as_object() {
567 let mut rule = HashMap::new();
569 for (key, value) in obj {
570 rule.insert(key.clone(), json_value_to_serde_value(value));
571 }
572 quality_rules.push(rule);
573 } else if let Some(s) = quality_val.as_str() {
574 let mut rule = HashMap::new();
576 rule.insert("value".to_string(), Value::String(s.to_string()));
577 quality_rules.push(rule);
578 }
579 }
580
581 if let Some(tblprops) = data.get("tblproperties")
583 && let Some(obj) = tblprops.as_object()
584 {
585 for (key, value) in obj {
586 let mut rule = HashMap::new();
587 rule.insert("property".to_string(), Value::String(key.clone()));
588 rule.insert("value".to_string(), json_value_to_serde_value(value));
589 quality_rules.push(rule);
590 }
591 }
592
593 quality_rules
594 }
595
596 fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
604 let mut errors = Vec::new();
618
619 let changelog = data
620 .get("databaseChangeLog")
621 .and_then(|v| v.as_array())
622 .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
623
624 let mut table_name: Option<String> = None;
626 let mut columns: Vec<crate::models::column::Column> = Vec::new();
627
628 for entry in changelog {
629 if let Some(change_set) = entry.get("changeSet") {
631 let changes = if let Some(obj) = change_set.as_object() {
633 obj.get("changes")
634 .and_then(|v| v.as_array())
635 .cloned()
636 .unwrap_or_default()
637 } else if let Some(arr) = change_set.as_array() {
638 arr.clone()
640 } else {
641 Vec::new()
642 };
643
644 for ch in changes {
645 let create = ch.get("createTable").or_else(|| ch.get("create_table"));
646 if let Some(create) = create {
647 table_name = create
648 .get("tableName")
649 .or_else(|| create.get("table_name"))
650 .and_then(|v| v.as_str())
651 .map(|s| s.to_string());
652
653 if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
655 for col_entry in cols {
656 let col = col_entry.get("column").unwrap_or(col_entry);
657 let name = col
658 .get("name")
659 .and_then(|v| v.as_str())
660 .unwrap_or("")
661 .to_string();
662 let data_type = col
663 .get("type")
664 .and_then(|v| v.as_str())
665 .unwrap_or("")
666 .to_string();
667
668 if name.is_empty() {
669 errors.push(ParserError {
670 error_type: "validation_error".to_string(),
671 field: "columns.name".to_string(),
672 message: "Liquibase createTable column missing name"
673 .to_string(),
674 });
675 continue;
676 }
677
678 let mut column =
679 crate::models::column::Column::new(name, data_type);
680
681 if let Some(constraints) =
682 col.get("constraints").and_then(|v| v.as_object())
683 {
684 if let Some(pk) =
685 constraints.get("primaryKey").and_then(|v| v.as_bool())
686 {
687 column.primary_key = pk;
688 }
689 if let Some(nullable) =
690 constraints.get("nullable").and_then(|v| v.as_bool())
691 {
692 column.nullable = nullable;
693 }
694 }
695
696 columns.push(column);
697 }
698 }
699
700 break;
703 }
704 }
705 }
706 if table_name.is_some() {
707 break;
708 }
709 }
710
711 let table_name = table_name
712 .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
713 let table = Table::new(table_name, columns);
714 Ok((table, errors))
716 }
717
718 fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
720 let mut errors = Vec::new();
721
722 let table_name = data
724 .get("name")
725 .and_then(|v| v.as_str())
726 .map(|s| s.to_string())
727 .or_else(|| {
728 data.get("schema")
730 .and_then(|v| v.as_array())
731 .and_then(|arr| arr.first())
732 .and_then(|obj| obj.as_object())
733 .and_then(|obj| obj.get("name"))
734 .and_then(|v| v.as_str())
735 .map(|s| s.to_string())
736 })
737 .ok_or_else(|| {
738 anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
739 })?;
740
741 let schema = data
743 .get("schema")
744 .and_then(|v| v.as_array())
745 .ok_or_else(|| {
746 errors.push(ParserError {
747 error_type: "validation_error".to_string(),
748 field: "schema".to_string(),
749 message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
750 });
751 anyhow::anyhow!("Missing schema")
752 });
753
754 let schema = match schema {
755 Ok(s) if s.is_empty() => {
756 errors.push(ParserError {
757 error_type: "validation_error".to_string(),
758 field: "schema".to_string(),
759 message: "ODCS v3.0.x schema array is empty".to_string(),
760 });
761 let quality_rules = self.extract_quality_rules(data);
762 let table_uuid = self.extract_table_uuid(data);
763 let table = Table {
764 id: table_uuid,
765 name: table_name,
766 columns: Vec::new(),
767 database_type: None,
768 catalog_name: None,
769 schema_name: None,
770 medallion_layers: Vec::new(),
771 scd_pattern: None,
772 data_vault_classification: None,
773 modeling_level: None,
774 tags: Vec::<Tag>::new(),
775 odcl_metadata: HashMap::new(),
776 owner: None,
777 sla: None,
778 contact_details: None,
779 infrastructure_type: None,
780 notes: None,
781 position: None,
782 yaml_file_path: None,
783 drawio_cell_id: None,
784 quality: quality_rules,
785 errors: Vec::new(),
786 created_at: chrono::Utc::now(),
787 updated_at: chrono::Utc::now(),
788 };
789 return Ok((table, errors));
790 }
791 Ok(s) => s,
792 Err(_) => {
793 let quality_rules = self.extract_quality_rules(data);
794 let table_uuid = self.extract_table_uuid(data);
795 let table = Table {
796 id: table_uuid,
797 name: table_name,
798 columns: Vec::new(),
799 database_type: None,
800 catalog_name: None,
801 schema_name: None,
802 medallion_layers: Vec::new(),
803 scd_pattern: None,
804 data_vault_classification: None,
805 modeling_level: None,
806 tags: Vec::<Tag>::new(),
807 odcl_metadata: HashMap::new(),
808 owner: None,
809 sla: None,
810 contact_details: None,
811 infrastructure_type: None,
812 notes: None,
813 position: None,
814 yaml_file_path: None,
815 drawio_cell_id: None,
816 quality: quality_rules,
817 errors: Vec::new(),
818 created_at: chrono::Utc::now(),
819 updated_at: chrono::Utc::now(),
820 };
821 return Ok((table, errors));
822 }
823 };
824
825 let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
827 errors.push(ParserError {
828 error_type: "validation_error".to_string(),
829 field: "schema[0]".to_string(),
830 message: "First schema object must be a dictionary".to_string(),
831 });
832 anyhow::anyhow!("Invalid schema object")
833 })?;
834
835 let object_name = schema_object
836 .get("name")
837 .and_then(|v| v.as_str())
838 .unwrap_or(&table_name);
839
840 let mut columns = Vec::new();
842
843 if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
844 for (prop_name, prop_data) in properties_obj {
846 if let Some(prop_obj) = prop_data.as_object() {
847 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
848 Ok(mut cols) => columns.append(&mut cols),
849 Err(e) => {
850 errors.push(ParserError {
851 error_type: "property_parse_error".to_string(),
852 field: format!("Property '{}'", prop_name),
853 message: e.to_string(),
854 });
855 }
856 }
857 } else {
858 errors.push(ParserError {
859 error_type: "validation_error".to_string(),
860 field: format!("Property '{}'", prop_name),
861 message: format!("Property '{}' must be an object", prop_name),
862 });
863 }
864 }
865 } else if let Some(properties_arr) =
866 schema_object.get("properties").and_then(|v| v.as_array())
867 {
868 for (idx, prop_data) in properties_arr.iter().enumerate() {
870 if let Some(prop_obj) = prop_data.as_object() {
871 let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
874 Some(JsonValue::String(s)) => s.as_str(),
875 _ => {
876 errors.push(ParserError {
878 error_type: "validation_error".to_string(),
879 field: format!("Property[{}]", idx),
880 message: format!(
881 "Property[{}] missing required 'name' or 'id' field",
882 idx
883 ),
884 });
885 continue;
886 }
887 };
888
889 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
890 Ok(mut cols) => columns.append(&mut cols),
891 Err(e) => {
892 errors.push(ParserError {
893 error_type: "property_parse_error".to_string(),
894 field: format!("Property[{}] '{}'", idx, prop_name),
895 message: e.to_string(),
896 });
897 }
898 }
899 } else {
900 errors.push(ParserError {
901 error_type: "validation_error".to_string(),
902 field: format!("Property[{}]", idx),
903 message: format!("Property[{}] must be an object", idx),
904 });
905 }
906 }
907 } else {
908 errors.push(ParserError {
909 error_type: "validation_error".to_string(),
910 field: format!("Object '{}'", object_name),
911 message: format!(
912 "Object '{}' missing 'properties' field or properties is invalid",
913 object_name
914 ),
915 });
916 }
917
918 let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
920 _,
921 _,
922 _,
923 Vec<Tag>,
924 ) = self.extract_metadata_from_custom_properties(data);
925
926 let mut shared_domains: Vec<String> = Vec::new();
928 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
929 for prop in custom_props {
930 if let Some(prop_obj) = prop.as_object() {
931 let prop_key = prop_obj
932 .get("property")
933 .and_then(|v| v.as_str())
934 .unwrap_or("");
935 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
936 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
937 {
938 for item in arr {
939 if let Some(s) = item.as_str() {
940 shared_domains.push(s.to_string());
941 }
942 }
943 }
944 }
945 }
946 }
947
948 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
950 for item in tags_arr {
951 if let Some(s) = item.as_str() {
952 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
954 if !tags.contains(&tag) {
955 tags.push(tag);
956 }
957 }
958 }
959 }
960
961 let database_type = self.extract_database_type_from_odcl_v3_servers(data);
963
964 let quality_rules = self.extract_quality_rules(data);
966
967 let mut odcl_metadata = HashMap::new();
969 odcl_metadata.insert(
970 "apiVersion".to_string(),
971 json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
972 );
973 odcl_metadata.insert(
974 "kind".to_string(),
975 json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
976 );
977 odcl_metadata.insert(
978 "id".to_string(),
979 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
980 );
981 odcl_metadata.insert(
982 "version".to_string(),
983 json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
984 );
985 odcl_metadata.insert(
986 "status".to_string(),
987 json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
988 );
989
990 if let Some(servicelevels_val) = data.get("servicelevels") {
992 odcl_metadata.insert(
993 "servicelevels".to_string(),
994 json_value_to_serde_value(servicelevels_val),
995 );
996 }
997
998 if let Some(links_val) = data.get("links") {
1000 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1001 }
1002
1003 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1005 odcl_metadata.insert(
1006 "domain".to_string(),
1007 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1008 );
1009 }
1010 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1011 odcl_metadata.insert(
1012 "dataProduct".to_string(),
1013 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1014 );
1015 }
1016 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1017 odcl_metadata.insert(
1018 "tenant".to_string(),
1019 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1020 );
1021 }
1022
1023 if let Some(desc_val) = data.get("description") {
1025 odcl_metadata.insert(
1026 "description".to_string(),
1027 json_value_to_serde_value(desc_val),
1028 );
1029 }
1030
1031 if let Some(pricing_val) = data.get("pricing") {
1033 odcl_metadata.insert(
1034 "pricing".to_string(),
1035 json_value_to_serde_value(pricing_val),
1036 );
1037 }
1038
1039 if let Some(team_val) = data.get("team") {
1041 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1042 }
1043
1044 if let Some(roles_val) = data.get("roles") {
1046 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1047 }
1048
1049 if let Some(terms_val) = data.get("terms") {
1051 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1052 }
1053
1054 if let Some(servers_val) = data.get("servers") {
1056 odcl_metadata.insert(
1057 "servers".to_string(),
1058 json_value_to_serde_value(servers_val),
1059 );
1060 }
1061
1062 if let Some(infrastructure_val) = data.get("infrastructure") {
1064 odcl_metadata.insert(
1065 "infrastructure".to_string(),
1066 json_value_to_serde_value(infrastructure_val),
1067 );
1068 }
1069
1070 if !shared_domains.is_empty() {
1072 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1073 .iter()
1074 .map(|d| serde_json::Value::String(d.clone()))
1075 .collect();
1076 odcl_metadata.insert(
1077 "sharedDomains".to_string(),
1078 serde_json::Value::Array(shared_domains_json),
1079 );
1080 }
1081
1082 let table_uuid = self.extract_table_uuid(data);
1083
1084 let table = Table {
1085 id: table_uuid,
1086 name: table_name,
1087 columns,
1088 database_type,
1089 catalog_name: None,
1090 schema_name: None,
1091 medallion_layers,
1092 scd_pattern,
1093 data_vault_classification,
1094 modeling_level: None,
1095 tags,
1096 odcl_metadata,
1097 owner: None,
1098 sla: None,
1099 contact_details: None,
1100 infrastructure_type: None,
1101 notes: None,
1102 position: None,
1103 yaml_file_path: None,
1104 drawio_cell_id: None,
1105 quality: quality_rules,
1106 errors: Vec::new(),
1107 created_at: chrono::Utc::now(),
1108 updated_at: chrono::Utc::now(),
1109 };
1110
1111 info!(
1112 "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1113 table.name,
1114 errors.len()
1115 );
1116 Ok((table, errors))
1117 }
1118
1119 fn parse_odcl_v3_property(
1121 &self,
1122 prop_name: &str,
1123 prop_data: &serde_json::Map<String, JsonValue>,
1124 data: &JsonValue,
1125 ) -> Result<Vec<Column>> {
1126 let mut errors = Vec::new();
1128 self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1129 }
1130
1131 fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1134 if let Some(id_val) = data.get("id")
1136 && let Some(id_str) = id_val.as_str()
1137 {
1138 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1139 tracing::debug!(
1140 "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1141 uuid
1142 );
1143 return uuid;
1144 } else {
1145 tracing::warn!(
1146 "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1147 id_str
1148 );
1149 }
1150 }
1151
1152 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1154 for prop in custom_props {
1155 if let Some(prop_obj) = prop.as_object() {
1156 let prop_key = prop_obj
1157 .get("property")
1158 .and_then(|v| v.as_str())
1159 .unwrap_or("");
1160 if prop_key == "tableUuid"
1161 && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1162 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1163 {
1164 tracing::debug!(
1165 "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1166 uuid
1167 );
1168 return uuid;
1169 }
1170 }
1171 }
1172 }
1173
1174 if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1176 && let Some(uuid_val) = metadata.get("tableUuid")
1177 && let Some(uuid_str) = uuid_val.as_str()
1178 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1179 {
1180 tracing::debug!(
1181 "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1182 uuid
1183 );
1184 return uuid;
1185 }
1186
1187 let table_name = data
1189 .get("name")
1190 .and_then(|v| v.as_str())
1191 .unwrap_or("unknown");
1192 let new_uuid = crate::models::table::Table::generate_id(
1193 table_name, None, None, None, );
1197 tracing::warn!(
1198 "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1199 table_name,
1200 new_uuid
1201 );
1202 new_uuid
1203 }
1204
1205 fn extract_metadata_from_custom_properties(
1207 &self,
1208 data: &JsonValue,
1209 ) -> (
1210 Vec<MedallionLayer>,
1211 Option<SCDPattern>,
1212 Option<DataVaultClassification>,
1213 Vec<Tag>,
1214 ) {
1215 let mut medallion_layers = Vec::new();
1216 let mut scd_pattern = None;
1217 let mut data_vault_classification = None;
1218 let mut tags: Vec<Tag> = Vec::new();
1219
1220 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1221 for prop in custom_props {
1222 if let Some(prop_obj) = prop.as_object() {
1223 let prop_key = prop_obj
1224 .get("property")
1225 .and_then(|v| v.as_str())
1226 .unwrap_or("");
1227 let prop_value = prop_obj.get("value");
1228
1229 match prop_key {
1230 "medallionLayers" | "medallion_layers" => {
1231 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1232 for item in arr {
1233 if let Some(s) = item.as_str()
1234 && let Ok(layer) = parse_medallion_layer(s)
1235 {
1236 medallion_layers.push(layer);
1237 }
1238 }
1239 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1240 for part in s.split(',') {
1242 if let Ok(layer) = parse_medallion_layer(part.trim()) {
1243 medallion_layers.push(layer);
1244 }
1245 }
1246 }
1247 }
1248 "scdPattern" | "scd_pattern" => {
1249 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1250 scd_pattern = parse_scd_pattern(s).ok();
1251 }
1252 }
1253 "dataVaultClassification" | "data_vault_classification" => {
1254 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1255 data_vault_classification = parse_data_vault_classification(s).ok();
1256 }
1257 }
1258 "tags" => {
1259 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1260 for item in arr {
1261 if let Some(s) = item.as_str() {
1262 if let Ok(tag) = Tag::from_str(s) {
1264 tags.push(tag);
1265 } else {
1266 tags.push(Tag::Simple(s.to_string()));
1267 }
1268 }
1269 }
1270 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1271 for part in s.split(',') {
1273 let part = part.trim();
1274 if let Ok(tag) = Tag::from_str(part) {
1275 tags.push(tag);
1276 } else {
1277 tags.push(Tag::Simple(part.to_string()));
1278 }
1279 }
1280 }
1281 }
1282 "sharedDomains" | "shared_domains" => {
1283 }
1286 _ => {}
1287 }
1288 }
1289 }
1290 }
1291
1292 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1294 for item in tags_arr {
1295 if let Some(s) = item.as_str() {
1296 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1298 if !tags.contains(&tag) {
1299 tags.push(tag);
1300 }
1301 }
1302 }
1303 }
1304
1305 (
1306 medallion_layers,
1307 scd_pattern,
1308 data_vault_classification,
1309 tags,
1310 )
1311 }
1312
1313 fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1315 if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1317 && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1318 {
1319 return server_obj
1320 .get("type")
1321 .and_then(|v| v.as_str())
1322 .and_then(|s| self.parse_database_type(s));
1323 }
1324 None
1325 }
1326
1327 fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1329 let mut errors = Vec::new();
1330
1331 let models = data
1333 .get("models")
1334 .and_then(|v| v.as_object())
1335 .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1336
1337 let (model_name, model_data) = models
1340 .iter()
1341 .next()
1342 .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1343
1344 let model_data = model_data
1345 .as_object()
1346 .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1347
1348 let fields = model_data
1350 .get("fields")
1351 .and_then(|v| v.as_object())
1352 .ok_or_else(|| {
1353 errors.push(ParserError {
1354 error_type: "validation_error".to_string(),
1355 field: format!("Model '{}'", model_name),
1356 message: format!("Model '{}' missing 'fields' field", model_name),
1357 });
1358 anyhow::anyhow!("Missing fields")
1359 });
1360
1361 let fields = match fields {
1362 Ok(f) => f,
1363 Err(_) => {
1364 let quality_rules = self.extract_quality_rules(data);
1366 let table_uuid = self.extract_table_uuid(data);
1367 let table = Table {
1368 id: table_uuid,
1369 name: model_name.clone(),
1370 columns: Vec::new(),
1371 database_type: None,
1372 catalog_name: None,
1373 schema_name: None,
1374 medallion_layers: Vec::new(),
1375 scd_pattern: None,
1376 data_vault_classification: None,
1377 modeling_level: None,
1378 tags: Vec::<Tag>::new(),
1379 odcl_metadata: HashMap::new(),
1380 owner: None,
1381 sla: None,
1382 contact_details: None,
1383 infrastructure_type: None,
1384 notes: None,
1385 position: None,
1386 yaml_file_path: None,
1387 drawio_cell_id: None,
1388 quality: quality_rules,
1389 errors: Vec::new(),
1390 created_at: chrono::Utc::now(),
1391 updated_at: chrono::Utc::now(),
1392 };
1393 return Ok((table, errors));
1394 }
1395 };
1396
1397 let mut columns = Vec::new();
1399 for (field_name, field_data) in fields {
1400 if let Some(field_obj) = field_data.as_object() {
1401 match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1402 Ok(mut cols) => columns.append(&mut cols),
1403 Err(e) => {
1404 errors.push(ParserError {
1405 error_type: "field_parse_error".to_string(),
1406 field: format!("Field '{}'", field_name),
1407 message: e.to_string(),
1408 });
1409 }
1410 }
1411 } else {
1412 errors.push(ParserError {
1413 error_type: "validation_error".to_string(),
1414 field: format!("Field '{}'", field_name),
1415 message: format!("Field '{}' must be an object", field_name),
1416 });
1417 }
1418 }
1419
1420 let mut odcl_metadata = HashMap::new();
1423
1424 if let Some(info_val) = data.get("info") {
1427 let info_json_value = json_value_to_serde_value(info_val);
1429 odcl_metadata.insert("info".to_string(), info_json_value);
1430 }
1431
1432 odcl_metadata.insert(
1433 "dataContractSpecification".to_string(),
1434 json_value_to_serde_value(
1435 data.get("dataContractSpecification")
1436 .unwrap_or(&JsonValue::Null),
1437 ),
1438 );
1439 odcl_metadata.insert(
1440 "id".to_string(),
1441 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1442 );
1443 if let Some(servicelevels_val) = data.get("servicelevels") {
1447 odcl_metadata.insert(
1448 "servicelevels".to_string(),
1449 json_value_to_serde_value(servicelevels_val),
1450 );
1451 }
1452
1453 if let Some(links_val) = data.get("links") {
1455 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1456 }
1457
1458 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1460 odcl_metadata.insert(
1461 "domain".to_string(),
1462 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1463 );
1464 }
1465 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1466 odcl_metadata.insert(
1467 "dataProduct".to_string(),
1468 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1469 );
1470 }
1471 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1472 odcl_metadata.insert(
1473 "tenant".to_string(),
1474 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1475 );
1476 }
1477
1478 if let Some(desc_val) = data.get("description") {
1480 odcl_metadata.insert(
1481 "description".to_string(),
1482 json_value_to_serde_value(desc_val),
1483 );
1484 }
1485
1486 if let Some(pricing_val) = data.get("pricing") {
1488 odcl_metadata.insert(
1489 "pricing".to_string(),
1490 json_value_to_serde_value(pricing_val),
1491 );
1492 }
1493
1494 if let Some(team_val) = data.get("team") {
1496 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1497 }
1498
1499 if let Some(roles_val) = data.get("roles") {
1501 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1502 }
1503
1504 if let Some(terms_val) = data.get("terms") {
1506 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1507 }
1508
1509 if let Some(servers_val) = data.get("servers") {
1511 odcl_metadata.insert(
1512 "servers".to_string(),
1513 json_value_to_serde_value(servers_val),
1514 );
1515 }
1516
1517 if let Some(infrastructure_val) = data.get("infrastructure") {
1519 odcl_metadata.insert(
1520 "infrastructure".to_string(),
1521 json_value_to_serde_value(infrastructure_val),
1522 );
1523 }
1524
1525 let database_type = self.extract_database_type_from_servers(data);
1527
1528 let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1530
1531 let mut shared_domains: Vec<String> = Vec::new();
1533 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1534 for prop in custom_props {
1535 if let Some(prop_obj) = prop.as_object() {
1536 let prop_key = prop_obj
1537 .get("property")
1538 .and_then(|v| v.as_str())
1539 .unwrap_or("");
1540 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1541 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1542 {
1543 for item in arr {
1544 if let Some(s) = item.as_str() {
1545 shared_domains.push(s.to_string());
1546 }
1547 }
1548 }
1549 }
1550 }
1551 }
1552
1553 let mut tags: Vec<Tag> = Vec::new();
1555 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1556 for item in tags_arr {
1557 if let Some(s) = item.as_str() {
1558 if let Ok(tag) = Tag::from_str(s) {
1560 tags.push(tag);
1561 } else {
1562 tags.push(crate::models::Tag::Simple(s.to_string()));
1564 }
1565 }
1566 }
1567 }
1568
1569 let quality_rules = self.extract_quality_rules(data);
1571
1572 if !shared_domains.is_empty() {
1574 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1575 .iter()
1576 .map(|d| serde_json::Value::String(d.clone()))
1577 .collect();
1578 odcl_metadata.insert(
1579 "sharedDomains".to_string(),
1580 serde_json::Value::Array(shared_domains_json),
1581 );
1582 }
1583
1584 let table_uuid = self.extract_table_uuid(data);
1585
1586 let table = Table {
1587 id: table_uuid,
1588 name: model_name.clone(),
1589 columns,
1590 database_type,
1591 catalog_name,
1592 schema_name,
1593 medallion_layers: Vec::new(),
1594 scd_pattern: None,
1595 data_vault_classification: None,
1596 modeling_level: None,
1597 tags,
1598 odcl_metadata,
1599 owner: None,
1600 sla: None,
1601 contact_details: None,
1602 infrastructure_type: None,
1603 notes: None,
1604 position: None,
1605 yaml_file_path: None,
1606 drawio_cell_id: None,
1607 quality: quality_rules,
1608 errors: Vec::new(),
1609 created_at: chrono::Utc::now(),
1610 updated_at: chrono::Utc::now(),
1611 };
1612
1613 info!(
1614 "Parsed Data Contract table: {} with {} warnings/errors",
1615 model_name,
1616 errors.len()
1617 );
1618 Ok((table, errors))
1619 }
1620
1621 #[allow(clippy::only_used_in_recursion)]
1626 fn expand_nested_column(
1627 &self,
1628 column_name: &str,
1629 schema: &JsonValue,
1630 nullable: bool,
1631 columns: &mut Vec<Column>,
1632 errors: &mut Vec<ParserError>,
1633 ) {
1634 let schema_obj = match schema.as_object() {
1635 Some(obj) => obj,
1636 None => {
1637 errors.push(ParserError {
1638 error_type: "parse_error".to_string(),
1639 field: column_name.to_string(),
1640 message: "Nested schema must be an object".to_string(),
1641 });
1642 return;
1643 }
1644 };
1645
1646 let schema_type = schema_obj
1647 .get("type")
1648 .and_then(|v| v.as_str())
1649 .unwrap_or("object");
1650
1651 match schema_type {
1652 "object" | "struct" => {
1653 if let Some(properties) = schema_obj.get("properties").and_then(|v| v.as_object()) {
1655 let nested_required: Vec<String> = schema_obj
1656 .get("required")
1657 .and_then(|v| v.as_array())
1658 .map(|arr| {
1659 arr.iter()
1660 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1661 .collect()
1662 })
1663 .unwrap_or_default();
1664
1665 for (nested_name, nested_schema) in properties {
1666 let nested_nullable = !nested_required.contains(nested_name);
1667 self.expand_nested_column(
1668 &format!("{}.{}", column_name, nested_name),
1669 nested_schema,
1670 nullable || nested_nullable,
1671 columns,
1672 errors,
1673 );
1674 }
1675 } else {
1676 let description = schema_obj
1678 .get("description")
1679 .and_then(|v| v.as_str())
1680 .unwrap_or("")
1681 .to_string();
1682 columns.push(Column {
1683 name: column_name.to_string(),
1684 data_type: "OBJECT".to_string(),
1685 nullable,
1686 primary_key: false,
1687 secondary_key: false,
1688 composite_key: None,
1689 foreign_key: None,
1690 constraints: Vec::new(),
1691 description,
1692 quality: Vec::new(),
1693 ref_path: None,
1694 enum_values: Vec::new(),
1695 errors: Vec::new(),
1696 column_order: 0,
1697 });
1698 }
1699 }
1700 "array" => {
1701 let items = schema_obj.get("items").unwrap_or(schema);
1703 let items_type = items
1704 .as_object()
1705 .and_then(|obj| obj.get("type").and_then(|v| v.as_str()))
1706 .unwrap_or("string");
1707
1708 if items_type == "object" || items_type == "struct" {
1709 let description = schema_obj
1711 .get("description")
1712 .and_then(|v| v.as_str())
1713 .unwrap_or("")
1714 .to_string();
1715 columns.push(Column {
1716 name: column_name.to_string(),
1717 data_type: "ARRAY<OBJECT>".to_string(),
1718 nullable,
1719 primary_key: false,
1720 secondary_key: false,
1721 composite_key: None,
1722 foreign_key: None,
1723 constraints: Vec::new(),
1724 description,
1725 quality: Vec::new(),
1726 ref_path: None,
1727 enum_values: Vec::new(),
1728 errors: Vec::new(),
1729 column_order: 0,
1730 });
1731 if let Some(properties) = items
1733 .as_object()
1734 .and_then(|obj| obj.get("properties").and_then(|v| v.as_object()))
1735 {
1736 let nested_required: Vec<String> = items
1737 .as_object()
1738 .and_then(|obj| obj.get("required").and_then(|v| v.as_array()))
1739 .map(|arr| {
1740 arr.iter()
1741 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1742 .collect()
1743 })
1744 .unwrap_or_default();
1745
1746 for (nested_name, nested_schema) in properties {
1747 let nested_nullable = !nested_required.contains(nested_name);
1748 self.expand_nested_column(
1749 &format!("{}.{}", column_name, nested_name),
1750 nested_schema,
1751 nullable || nested_nullable,
1752 columns,
1753 errors,
1754 );
1755 }
1756 }
1757 } else {
1758 let data_type = format!("ARRAY<{}>", items_type.to_uppercase());
1760 let description = schema_obj
1761 .get("description")
1762 .and_then(|v| v.as_str())
1763 .unwrap_or("")
1764 .to_string();
1765 columns.push(Column {
1766 name: column_name.to_string(),
1767 data_type,
1768 nullable,
1769 primary_key: false,
1770 secondary_key: false,
1771 composite_key: None,
1772 foreign_key: None,
1773 constraints: Vec::new(),
1774 description,
1775 quality: Vec::new(),
1776 ref_path: None,
1777 enum_values: Vec::new(),
1778 errors: Vec::new(),
1779 column_order: 0,
1780 });
1781 }
1782 }
1783 _ => {
1784 let data_type = schema_type.to_uppercase();
1786 let description = schema_obj
1787 .get("description")
1788 .and_then(|v| v.as_str())
1789 .unwrap_or("")
1790 .to_string();
1791 let enum_values = schema_obj
1792 .get("enum")
1793 .and_then(|v| v.as_array())
1794 .map(|arr| {
1795 arr.iter()
1796 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1797 .collect()
1798 })
1799 .unwrap_or_default();
1800 columns.push(Column {
1801 name: column_name.to_string(),
1802 data_type,
1803 nullable,
1804 primary_key: false,
1805 secondary_key: false,
1806 composite_key: None,
1807 foreign_key: None,
1808 constraints: Vec::new(),
1809 description,
1810 quality: Vec::new(),
1811 ref_path: None,
1812 enum_values,
1813 errors: Vec::new(),
1814 column_order: 0,
1815 });
1816 }
1817 }
1818 }
1819
1820 fn parse_data_contract_field(
1822 &self,
1823 field_name: &str,
1824 field_data: &serde_json::Map<String, JsonValue>,
1825 data: &JsonValue,
1826 errors: &mut Vec<ParserError>,
1827 ) -> Result<Vec<Column>> {
1828 let mut columns = Vec::new();
1829
1830 let extract_quality_from_obj =
1832 |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1833 let mut quality_rules = Vec::new();
1834 if let Some(quality_val) = obj.get("quality") {
1835 if let Some(arr) = quality_val.as_array() {
1836 for item in arr {
1838 if let Some(rule_obj) = item.as_object() {
1839 let mut rule = HashMap::new();
1840 for (key, value) in rule_obj {
1841 rule.insert(key.clone(), json_value_to_serde_value(value));
1842 }
1843 quality_rules.push(rule);
1844 }
1845 }
1846 } else if let Some(rule_obj) = quality_val.as_object() {
1847 let mut rule = HashMap::new();
1849 for (key, value) in rule_obj {
1850 rule.insert(key.clone(), json_value_to_serde_value(value));
1851 }
1852 quality_rules.push(rule);
1853 }
1854 }
1855 quality_rules
1856 };
1857
1858 let description = field_data
1860 .get("description")
1861 .and_then(|v| v.as_str())
1862 .unwrap_or("")
1863 .to_string();
1864
1865 let mut quality_rules = extract_quality_from_obj(field_data);
1867
1868 if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1870 let ref_path = Some(ref_str.to_string());
1872
1873 if let Some(definition) = self.resolve_ref(ref_str, data) {
1874 if quality_rules.is_empty() {
1878 if let Some(def_obj) = definition.as_object() {
1879 quality_rules = extract_quality_from_obj(def_obj);
1880 }
1881 } else {
1882 if let Some(def_obj) = definition.as_object() {
1884 let def_quality = extract_quality_from_obj(def_obj);
1885 quality_rules.extend(def_quality);
1887 }
1888 }
1889
1890 let required = field_data
1891 .get("required")
1892 .and_then(|v| v.as_bool())
1893 .unwrap_or(false);
1894
1895 if required {
1897 let has_not_null = quality_rules.iter().any(|rule| {
1898 rule.get("type")
1899 .and_then(|v| v.as_str())
1900 .map(|s| {
1901 s.to_lowercase().contains("not_null")
1902 || s.to_lowercase().contains("notnull")
1903 })
1904 .unwrap_or(false)
1905 });
1906 if !has_not_null {
1907 let mut not_null_rule = HashMap::new();
1908 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
1909 not_null_rule.insert(
1910 "description".to_string(),
1911 serde_json::json!("Column must not be null"),
1912 );
1913 quality_rules.push(not_null_rule);
1914 }
1915 }
1916
1917 let has_nested = definition
1919 .get("type")
1920 .and_then(|v| v.as_str())
1921 .map(|s| s == "object")
1922 .unwrap_or(false)
1923 || definition.get("properties").is_some()
1924 || definition.get("fields").is_some();
1925
1926 if has_nested {
1927 if let Some(properties) =
1929 definition.get("properties").and_then(|v| v.as_object())
1930 {
1931 let nested_required: Vec<String> = definition
1933 .get("required")
1934 .and_then(|v| v.as_array())
1935 .map(|arr| {
1936 arr.iter()
1937 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1938 .collect()
1939 })
1940 .unwrap_or_default();
1941
1942 for (nested_name, nested_schema) in properties {
1943 let nested_required_field = nested_required.contains(nested_name);
1944 self.expand_nested_column(
1945 &format!("{}.{}", field_name, nested_name),
1946 nested_schema,
1947 !nested_required_field,
1948 &mut columns,
1949 errors,
1950 );
1951 }
1952 } else if let Some(fields) =
1953 definition.get("fields").and_then(|v| v.as_object())
1954 {
1955 for (nested_name, nested_schema) in fields {
1957 self.expand_nested_column(
1958 &format!("{}.{}", field_name, nested_name),
1959 nested_schema,
1960 true, &mut columns,
1962 errors,
1963 );
1964 }
1965 } else {
1966 columns.push(Column {
1968 name: field_name.to_string(),
1969 data_type: "OBJECT".to_string(),
1970 nullable: !required,
1971 primary_key: false,
1972 secondary_key: false,
1973 composite_key: None,
1974 foreign_key: None,
1975 constraints: Vec::new(),
1976 description: if description.is_empty() {
1977 definition
1978 .get("description")
1979 .and_then(|v| v.as_str())
1980 .unwrap_or("")
1981 .to_string()
1982 } else {
1983 description.clone()
1984 },
1985 errors: Vec::new(),
1986 quality: quality_rules.clone(),
1987 ref_path: ref_path.clone(),
1988 enum_values: Vec::new(),
1989 column_order: 0,
1990 });
1991 }
1992 } else {
1993 let def_type = definition
1995 .get("type")
1996 .and_then(|v| v.as_str())
1997 .unwrap_or("STRING")
1998 .to_uppercase();
1999
2000 let enum_values = definition
2001 .get("enum")
2002 .and_then(|v| v.as_array())
2003 .map(|arr| {
2004 arr.iter()
2005 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2006 .collect()
2007 })
2008 .unwrap_or_default();
2009
2010 columns.push(Column {
2011 name: field_name.to_string(),
2012 data_type: def_type,
2013 nullable: !required,
2014 primary_key: false,
2015 secondary_key: false,
2016 composite_key: None,
2017 foreign_key: None,
2018 constraints: Vec::new(),
2019 description: if description.is_empty() {
2020 definition
2021 .get("description")
2022 .and_then(|v| v.as_str())
2023 .unwrap_or("")
2024 .to_string()
2025 } else {
2026 description
2027 },
2028 errors: Vec::new(),
2029 quality: quality_rules,
2030 ref_path,
2031 enum_values,
2032 column_order: 0,
2033 });
2034 }
2035 return Ok(columns);
2036 } else {
2037 let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
2039 let mut error_map = HashMap::new();
2040 error_map.insert("type".to_string(), serde_json::json!("validation_error"));
2041 error_map.insert("field".to_string(), serde_json::json!("data_type"));
2042 error_map.insert(
2043 "message".to_string(),
2044 serde_json::json!(format!(
2045 "Field '{}' references undefined definition: {}",
2046 field_name, ref_str
2047 )),
2048 );
2049 col_errors.push(error_map);
2050 columns.push(Column {
2051 name: field_name.to_string(),
2052 data_type: "OBJECT".to_string(),
2053 nullable: true,
2054 primary_key: false,
2055 secondary_key: false,
2056 composite_key: None,
2057 foreign_key: None,
2058 constraints: Vec::new(),
2059 description,
2060 errors: col_errors,
2061 quality: Vec::new(),
2062 ref_path: Some(ref_str.to_string()), enum_values: Vec::new(),
2064 column_order: 0,
2065 });
2066 return Ok(columns);
2067 }
2068 }
2069
2070 let field_type_str = field_data
2072 .get("type")
2073 .and_then(|v| v.as_str())
2074 .unwrap_or("STRING");
2075
2076 if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
2078 match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
2079 Ok(nested_cols) if !nested_cols.is_empty() => {
2080 let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
2082 "ARRAY<STRUCT<...>>".to_string()
2083 } else {
2084 "STRUCT<...>".to_string()
2085 };
2086
2087 columns.push(Column {
2089 name: field_name.to_string(),
2090 data_type: parent_data_type,
2091 nullable: !field_data
2092 .get("required")
2093 .and_then(|v| v.as_bool())
2094 .unwrap_or(false),
2095 primary_key: false,
2096 secondary_key: false,
2097 composite_key: None,
2098 foreign_key: None,
2099 constraints: Vec::new(),
2100 description: description.clone(),
2101 errors: Vec::new(),
2102 quality: quality_rules.clone(),
2103 ref_path: field_data
2104 .get("$ref")
2105 .and_then(|v| v.as_str())
2106 .map(|s| s.to_string()),
2107 enum_values: Vec::new(),
2108 column_order: 0,
2109 });
2110
2111 columns.extend(nested_cols);
2113 return Ok(columns);
2114 }
2115 Ok(_) | Err(_) => {
2116 }
2118 }
2119 }
2120
2121 let field_type = normalize_data_type(field_type_str);
2122
2123 if field_type == "ARRAY" {
2125 let items = field_data.get("items");
2126 if let Some(items_val) = items {
2127 if let Some(items_obj) = items_val.as_object() {
2128 if items_obj.get("fields").is_some()
2130 || items_obj.get("type").and_then(|v| v.as_str()) == Some("object")
2131 {
2132 columns.push(Column {
2134 name: field_name.to_string(),
2135 data_type: "ARRAY<OBJECT>".to_string(),
2136 nullable: !field_data
2137 .get("required")
2138 .and_then(|v| v.as_bool())
2139 .unwrap_or(false),
2140 primary_key: false,
2141 secondary_key: false,
2142 composite_key: None,
2143 foreign_key: None,
2144 constraints: Vec::new(),
2145 description: field_data
2146 .get("description")
2147 .and_then(|v| v.as_str())
2148 .unwrap_or("")
2149 .to_string(),
2150 errors: Vec::new(),
2151 quality: Vec::new(),
2152 ref_path: None,
2153 enum_values: Vec::new(),
2154 column_order: 0,
2155 });
2156
2157 let nested_fields_obj = items_obj
2160 .get("properties")
2161 .and_then(|v| v.as_object())
2162 .or_else(|| items_obj.get("fields").and_then(|v| v.as_object()));
2163
2164 if let Some(fields_obj) = nested_fields_obj {
2165 for (nested_field_name, nested_field_data) in fields_obj {
2166 if let Some(nested_field_obj) = nested_field_data.as_object() {
2167 let nested_field_type = nested_field_obj
2168 .get("type")
2169 .and_then(|v| v.as_str())
2170 .unwrap_or("STRING");
2171
2172 let nested_col_name =
2174 format!("{}.{}", field_name, nested_field_name);
2175 let mut local_errors = Vec::new();
2176 match self.parse_data_contract_field(
2177 &nested_col_name,
2178 nested_field_obj,
2179 data,
2180 &mut local_errors,
2181 ) {
2182 Ok(mut nested_cols) => {
2183 columns.append(&mut nested_cols);
2186 }
2187 Err(_) => {
2188 columns.push(Column {
2190 name: nested_col_name,
2191 data_type: nested_field_type.to_uppercase(),
2192 nullable: !nested_field_obj
2193 .get("required")
2194 .and_then(|v| v.as_bool())
2195 .unwrap_or(false),
2196 primary_key: false,
2197 secondary_key: false,
2198 composite_key: None,
2199 foreign_key: None,
2200 constraints: Vec::new(),
2201 description: nested_field_obj
2202 .get("description")
2203 .and_then(|v| v.as_str())
2204 .unwrap_or("")
2205 .to_string(),
2206 errors: Vec::new(),
2207 quality: Vec::new(),
2208 ref_path: None,
2209 enum_values: Vec::new(),
2210 column_order: 0,
2211 });
2212 }
2213 }
2214 }
2215 }
2216 }
2217
2218 return Ok(columns);
2219 } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2220 columns.push(Column {
2222 name: field_name.to_string(),
2223 data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2224 nullable: !field_data
2225 .get("required")
2226 .and_then(|v| v.as_bool())
2227 .unwrap_or(false),
2228 primary_key: false,
2229 secondary_key: false,
2230 composite_key: None,
2231 foreign_key: None,
2232 constraints: Vec::new(),
2233 description: description.clone(),
2234 errors: Vec::new(),
2235 quality: quality_rules.clone(),
2236 ref_path: field_data
2237 .get("$ref")
2238 .and_then(|v| v.as_str())
2239 .map(|s| s.to_string()),
2240 enum_values: Vec::new(),
2241 column_order: 0,
2242 });
2243 return Ok(columns);
2244 }
2245 } else if let Some(item_type_str) = items_val.as_str() {
2246 columns.push(Column {
2248 name: field_name.to_string(),
2249 data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2250 nullable: !field_data
2251 .get("required")
2252 .and_then(|v| v.as_bool())
2253 .unwrap_or(false),
2254 primary_key: false,
2255 secondary_key: false,
2256 composite_key: None,
2257 foreign_key: None,
2258 constraints: Vec::new(),
2259 description: description.clone(),
2260 errors: Vec::new(),
2261 quality: quality_rules.clone(),
2262 ref_path: field_data
2263 .get("$ref")
2264 .and_then(|v| v.as_str())
2265 .map(|s| s.to_string()),
2266 enum_values: Vec::new(),
2267 column_order: 0,
2268 });
2269 return Ok(columns);
2270 }
2271 }
2272 columns.push(Column {
2274 name: field_name.to_string(),
2275 data_type: "ARRAY<STRING>".to_string(),
2276 nullable: !field_data
2277 .get("required")
2278 .and_then(|v| v.as_bool())
2279 .unwrap_or(false),
2280 primary_key: false,
2281 secondary_key: false,
2282 composite_key: None,
2283 foreign_key: None,
2284 constraints: Vec::new(),
2285 description: description.clone(),
2286 errors: Vec::new(),
2287 quality: quality_rules.clone(),
2288 ref_path: field_data
2289 .get("$ref")
2290 .and_then(|v| v.as_str())
2291 .map(|s| s.to_string()),
2292 enum_values: Vec::new(),
2293 column_order: 0,
2294 });
2295 return Ok(columns);
2296 }
2297
2298 let nested_fields_obj = field_data
2301 .get("properties")
2302 .and_then(|v| v.as_object())
2303 .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2304
2305 if field_type == "OBJECT"
2306 && let Some(fields_obj) = nested_fields_obj
2307 {
2308 columns.push(Column {
2312 name: field_name.to_string(),
2313 data_type: "OBJECT".to_string(),
2314 nullable: !field_data
2315 .get("required")
2316 .and_then(|v| v.as_bool())
2317 .unwrap_or(false),
2318 primary_key: false,
2319 secondary_key: false,
2320 composite_key: None,
2321 foreign_key: None,
2322 constraints: Vec::new(),
2323 description: description.clone(),
2324 errors: Vec::new(),
2325 quality: quality_rules.clone(),
2326 ref_path: field_data
2327 .get("$ref")
2328 .and_then(|v| v.as_str())
2329 .map(|s| s.to_string()),
2330 enum_values: Vec::new(),
2331 column_order: 0,
2332 });
2333
2334 for (nested_field_name, nested_field_data) in fields_obj {
2336 if let Some(nested_field_obj) = nested_field_data.as_object() {
2337 let nested_field_type = nested_field_obj
2338 .get("type")
2339 .and_then(|v| v.as_str())
2340 .unwrap_or("STRING");
2341
2342 let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2344 match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data) {
2345 Ok(mut nested_cols) => {
2346 columns.append(&mut nested_cols);
2349 }
2350 Err(_) => {
2351 columns.push(Column {
2353 name: nested_col_name,
2354 data_type: nested_field_type.to_uppercase(),
2355 nullable: !nested_field_obj
2356 .get("required")
2357 .and_then(|v| v.as_bool())
2358 .unwrap_or(false),
2359 primary_key: false,
2360 secondary_key: false,
2361 composite_key: None,
2362 foreign_key: None,
2363 constraints: Vec::new(),
2364 description: nested_field_obj
2365 .get("description")
2366 .and_then(|v| v.as_str())
2367 .unwrap_or("")
2368 .to_string(),
2369 errors: Vec::new(),
2370 quality: Vec::new(),
2371 ref_path: None,
2372 enum_values: Vec::new(),
2373 column_order: 0,
2374 });
2375 }
2376 }
2377 }
2378 }
2379
2380 return Ok(columns);
2381 }
2382
2383 let ref_path = field_data
2386 .get("$ref")
2387 .and_then(|v| v.as_str())
2388 .map(|s| s.to_string());
2389
2390 let required = field_data
2391 .get("required")
2392 .and_then(|v| v.as_bool())
2393 .unwrap_or(false);
2394
2395 let field_description = if description.is_empty() {
2397 field_data
2398 .get("description")
2399 .and_then(|v| v.as_str())
2400 .unwrap_or("")
2401 .to_string()
2402 } else {
2403 description
2404 };
2405
2406 let mut column_quality_rules = quality_rules;
2408
2409 if column_quality_rules.is_empty()
2411 && let Some(quality_val) = field_data.get("quality")
2412 {
2413 if let Some(arr) = quality_val.as_array() {
2414 for item in arr {
2416 if let Some(obj) = item.as_object() {
2417 let mut rule = HashMap::new();
2418 for (key, value) in obj {
2419 rule.insert(key.clone(), json_value_to_serde_value(value));
2420 }
2421 column_quality_rules.push(rule);
2422 }
2423 }
2424 } else if let Some(obj) = quality_val.as_object() {
2425 let mut rule = HashMap::new();
2427 for (key, value) in obj {
2428 rule.insert(key.clone(), json_value_to_serde_value(value));
2429 }
2430 column_quality_rules.push(rule);
2431 }
2432 }
2433
2434 if required {
2436 let has_not_null = column_quality_rules.iter().any(|rule| {
2437 rule.get("type")
2438 .and_then(|v| v.as_str())
2439 .map(|s| {
2440 s.to_lowercase().contains("not_null")
2441 || s.to_lowercase().contains("notnull")
2442 })
2443 .unwrap_or(false)
2444 });
2445 if !has_not_null {
2446 let mut not_null_rule = HashMap::new();
2447 not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
2448 not_null_rule.insert(
2449 "description".to_string(),
2450 serde_json::json!("Column must not be null"),
2451 );
2452 column_quality_rules.push(not_null_rule);
2453 }
2454 }
2455
2456 columns.push(Column {
2457 name: field_name.to_string(),
2458 data_type: field_type,
2459 nullable: !required,
2460 primary_key: field_data
2461 .get("primaryKey")
2462 .and_then(|v| v.as_bool())
2463 .unwrap_or(false),
2464 secondary_key: false,
2465 composite_key: None,
2466 foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2467 constraints: Vec::new(),
2468 description: field_description,
2469 errors: Vec::new(),
2470 quality: column_quality_rules,
2471 ref_path,
2472 enum_values: Vec::new(),
2473 column_order: 0,
2474 });
2475
2476 Ok(columns)
2477 }
2478
2479 fn parse_foreign_key_from_data_contract(
2481 &self,
2482 field_data: &serde_json::Map<String, JsonValue>,
2483 ) -> Option<ForeignKey> {
2484 field_data
2485 .get("foreignKey")
2486 .and_then(|v| v.as_object())
2487 .map(|fk_obj| ForeignKey {
2488 table_id: fk_obj
2489 .get("table")
2490 .or_else(|| fk_obj.get("table_id"))
2491 .and_then(|v| v.as_str())
2492 .unwrap_or("")
2493 .to_string(),
2494 column_name: fk_obj
2495 .get("column")
2496 .or_else(|| fk_obj.get("column_name"))
2497 .and_then(|v| v.as_str())
2498 .unwrap_or("")
2499 .to_string(),
2500 })
2501 }
2502
2503 fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2505 if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2507 if let Some((_, server_data)) = servers_obj.iter().next()
2509 && let Some(server_obj) = server_data.as_object()
2510 {
2511 return server_obj
2512 .get("type")
2513 .and_then(|v| v.as_str())
2514 .and_then(|s| self.parse_database_type(s));
2515 }
2516 } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2517 if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2519 return server_obj
2520 .get("type")
2521 .and_then(|v| v.as_str())
2522 .and_then(|s| self.parse_database_type(s));
2523 }
2524 }
2525 None
2526 }
2527
2528 fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2530 match s.to_lowercase().as_str() {
2531 "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2532 "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2533 "mysql" => Some(DatabaseType::Mysql),
2534 "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2535 "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2536 _ => None,
2537 }
2538 }
2539
2540 fn parse_struct_type_from_string(
2542 &self,
2543 field_name: &str,
2544 type_str: &str,
2545 field_data: &serde_json::Map<String, JsonValue>,
2546 ) -> Result<Vec<Column>> {
2547 let mut columns = Vec::new();
2548
2549 let normalized_type = type_str
2551 .lines()
2552 .map(|line| line.trim())
2553 .filter(|line| !line.is_empty())
2554 .collect::<Vec<_>>()
2555 .join(" ");
2556
2557 let type_str_upper = normalized_type.to_uppercase();
2558
2559 let _is_array = type_str_upper.starts_with("ARRAY<");
2561 let struct_start = type_str_upper.find("STRUCT<");
2562
2563 if let Some(start_pos) = struct_start {
2564 let struct_content = &normalized_type[start_pos + 7..]; let mut depth = 1;
2569 let mut end_pos = None;
2570 for (i, ch) in struct_content.char_indices() {
2571 match ch {
2572 '<' => depth += 1,
2573 '>' => {
2574 depth -= 1;
2575 if depth == 0 {
2576 end_pos = Some(i);
2577 break;
2578 }
2579 }
2580 _ => {}
2581 }
2582 }
2583
2584 let struct_fields_str = if let Some(end) = end_pos {
2586 &struct_content[..end]
2587 } else {
2588 struct_content.trim_end_matches('>').trim()
2590 };
2591
2592 let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2594
2595 for (nested_name, nested_type) in fields {
2597 let nested_type_upper = nested_type.to_uppercase();
2598 let nested_col_name = format!("{}.{}", field_name, nested_name);
2599
2600 if nested_type_upper.starts_with("STRUCT<") {
2602 let nested_struct_type_str = if nested_type_upper.starts_with("ARRAY<STRUCT<") {
2605 nested_type.clone()
2607 } else {
2608 nested_type.clone()
2610 };
2611
2612 match self.parse_struct_type_from_string(
2614 &nested_col_name,
2615 &nested_struct_type_str,
2616 field_data,
2617 ) {
2618 Ok(nested_cols) => {
2619 columns.extend(nested_cols);
2621 }
2622 Err(_) => {
2623 columns.push(Column {
2625 name: nested_col_name,
2626 data_type: normalize_data_type(&nested_type),
2627 nullable: !field_data
2628 .get("required")
2629 .and_then(|v| v.as_bool())
2630 .unwrap_or(false),
2631 primary_key: false,
2632 secondary_key: false,
2633 composite_key: None,
2634 foreign_key: None,
2635 constraints: Vec::new(),
2636 description: field_data
2637 .get("description")
2638 .and_then(|v| v.as_str())
2639 .unwrap_or("")
2640 .to_string(),
2641 errors: Vec::new(),
2642 quality: Vec::new(),
2643 ref_path: None,
2644 enum_values: Vec::new(),
2645 column_order: 0,
2646 });
2647 }
2648 }
2649 } else {
2650 columns.push(Column {
2652 name: nested_col_name,
2653 data_type: normalize_data_type(&nested_type),
2654 nullable: !field_data
2655 .get("required")
2656 .and_then(|v| v.as_bool())
2657 .unwrap_or(false),
2658 primary_key: false,
2659 secondary_key: false,
2660 composite_key: None,
2661 foreign_key: None,
2662 constraints: Vec::new(),
2663 description: field_data
2664 .get("description")
2665 .and_then(|v| v.as_str())
2666 .unwrap_or("")
2667 .to_string(),
2668 errors: Vec::new(),
2669 quality: Vec::new(),
2670 ref_path: None,
2671 enum_values: Vec::new(),
2672 column_order: 0,
2673 });
2674 }
2675 }
2676
2677 return Ok(columns);
2678 }
2679
2680 Ok(Vec::new())
2682 }
2683
2684 fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2686 let mut fields = Vec::new();
2687 let mut current_field = String::new();
2688 let mut depth = 0;
2689 let mut in_string = false;
2690 let mut string_char = None;
2691
2692 for ch in fields_str.chars() {
2693 match ch {
2694 '\'' | '"' if !in_string || Some(ch) == string_char => {
2695 if in_string {
2696 in_string = false;
2697 string_char = None;
2698 } else {
2699 in_string = true;
2700 string_char = Some(ch);
2701 }
2702 current_field.push(ch);
2703 }
2704 '<' if !in_string => {
2705 depth += 1;
2706 current_field.push(ch);
2707 }
2708 '>' if !in_string => {
2709 depth -= 1;
2710 current_field.push(ch);
2711 }
2712 ',' if !in_string && depth == 0 => {
2713 let trimmed = current_field.trim();
2715 if !trimmed.is_empty()
2716 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2717 {
2718 fields.push((name, type_part));
2719 }
2720 current_field.clear();
2721 }
2722 _ => {
2723 current_field.push(ch);
2724 }
2725 }
2726 }
2727
2728 let trimmed = current_field.trim();
2730 if !trimmed.is_empty()
2731 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2732 {
2733 fields.push((name, type_part));
2734 }
2735
2736 Ok(fields)
2737 }
2738
2739 fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
2741 let colon_pos = field_def.find(':')?;
2743 let name = field_def[..colon_pos].trim().to_string();
2744 let type_part = field_def[colon_pos + 1..].trim().to_string();
2745
2746 if name.is_empty() || type_part.is_empty() {
2747 return None;
2748 }
2749
2750 Some((name, type_part))
2751 }
2752
2753 fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
2755 let mut catalog_name = None;
2756 let mut schema_name = None;
2757
2758 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2759 for prop in custom_props {
2760 if let Some(prop_obj) = prop.as_object() {
2761 let prop_key = prop_obj
2762 .get("property")
2763 .and_then(|v| v.as_str())
2764 .unwrap_or("");
2765 let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
2766
2767 match prop_key {
2768 "catalogName" | "catalog_name" => {
2769 catalog_name = prop_value.map(|s| s.to_string());
2770 }
2771 "schemaName" | "schema_name" => {
2772 schema_name = prop_value.map(|s| s.to_string());
2773 }
2774 _ => {}
2775 }
2776 }
2777 }
2778 }
2779
2780 if catalog_name.is_none() {
2782 catalog_name = data
2783 .get("catalog_name")
2784 .and_then(|v| v.as_str())
2785 .map(|s| s.to_string());
2786 }
2787 if schema_name.is_none() {
2788 schema_name = data
2789 .get("schema_name")
2790 .and_then(|v| v.as_str())
2791 .map(|s| s.to_string());
2792 }
2793
2794 (catalog_name, schema_name)
2795 }
2796}
2797
2798impl Default for ODCSImporter {
2799 fn default() -> Self {
2800 Self::new()
2801 }
2802}
2803
/// A non-fatal problem reported while parsing a contract.
///
/// Parsing returns these alongside the parsed table. The `import` path converts each
/// one into an `ImportError::ParseError` carrying only `message`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParserError {
    /// Category of the problem. NOTE(review): the set of values is not visible here.
    pub error_type: String,
    /// Name of the field the problem relates to (presumably a YAML key or column name; verify).
    pub field: String,
    /// Human-readable description of the problem.
    pub message: String,
}
2811
2812fn yaml_to_json_value(yaml: &serde_yaml::Value) -> Result<JsonValue> {
2814 let json_str = serde_json::to_string(yaml).context("Failed to convert YAML to JSON")?;
2816 serde_json::from_str(&json_str).context("Failed to parse JSON")
2817}
2818
2819fn json_value_to_serde_value(value: &JsonValue) -> serde_json::Value {
2821 value.clone()
2822}
2823
/// Normalises a column data type string for storage.
///
/// Complex types (`STRUCT`, `ARRAY`, `MAP`, matched case-insensitively) get an uppercased
/// keyword and keep their `<...>` body verbatim. Whitespace between the keyword and `<`
/// is dropped, as is anything after the last `>`. For example,
/// `"array <struct<a:int>>"` becomes `"ARRAY<struct<a:int>>"`. A complex type with an
/// opening `<` but no closing `>` keeps its remainder as-is after the keyword.
///
/// Every other type, including identifiers that merely *begin* with those keywords such
/// as `"mapping_type"`, is fully uppercased. An empty input yields an empty string.
///
/// This never panics. The previous version could slice with `end < start` on inputs like
/// `"ARRAY>x<"`, and could slice at a non-character boundary on non-ASCII input.
fn normalize_data_type(data_type: &str) -> String {
    const COMPLEX_TYPES: [&str; 3] = ["STRUCT", "ARRAY", "MAP"];

    let upper = data_type.to_uppercase();

    // Compare the raw prefix ASCII-case-insensitively so the byte length of the keyword
    // is also a valid slice index into `data_type`.
    let Some(keyword) = COMPLEX_TYPES.into_iter().find(|kw| {
        data_type
            .get(..kw.len())
            .is_some_and(|prefix| prefix.eq_ignore_ascii_case(kw))
    }) else {
        return upper;
    };

    let rest = &data_type[keyword.len()..];
    let body = rest.trim_start();

    // The keyword must stand alone or be followed by `<`; otherwise this is just an
    // identifier that happens to start with it (e.g. `MAPPING`, `STRUCTURED`).
    if !body.is_empty() && !body.starts_with('<') {
        return upper;
    }

    match body.rfind('>') {
        // `body` starts with '<' here, so `1..end` is a valid, non-inverted range.
        Some(end) => format!("{keyword}<{}>", &body[1..end]),
        None => format!("{keyword}{rest}"),
    }
}
2861
2862fn parse_medallion_layer(s: &str) -> Result<MedallionLayer> {
2864 match s.to_uppercase().as_str() {
2865 "BRONZE" => Ok(MedallionLayer::Bronze),
2866 "SILVER" => Ok(MedallionLayer::Silver),
2867 "GOLD" => Ok(MedallionLayer::Gold),
2868 "OPERATIONAL" => Ok(MedallionLayer::Operational),
2869 _ => Err(anyhow::anyhow!("Unknown medallion layer: {}", s)),
2870 }
2871}
2872
2873fn parse_scd_pattern(s: &str) -> Result<SCDPattern> {
2874 match s.to_uppercase().as_str() {
2875 "TYPE_1" | "TYPE1" => Ok(SCDPattern::Type1),
2876 "TYPE_2" | "TYPE2" => Ok(SCDPattern::Type2),
2877 _ => Err(anyhow::anyhow!("Unknown SCD pattern: {}", s)),
2878 }
2879}
2880
2881fn parse_data_vault_classification(s: &str) -> Result<DataVaultClassification> {
2882 match s.to_uppercase().as_str() {
2883 "HUB" => Ok(DataVaultClassification::Hub),
2884 "LINK" => Ok(DataVaultClassification::Link),
2885 "SATELLITE" | "SAT" => Ok(DataVaultClassification::Satellite),
2886 _ => Err(anyhow::anyhow!("Unknown Data Vault classification: {}", s)),
2887 }
2888}
2889
// Unit tests for the ODCS/ODCL importer. Each test feeds an inline YAML document to
// `ODCSImporter::parse` and checks the resulting `Table` and the non-fatal error list.
#[cfg(test)]
mod tests {
    use super::*;

    // Minimal ODCL table: two columns plus a top-level `database_type`.
    #[test]
    fn test_parse_simple_odcl_table() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
        assert_eq!(errors.len(), 0);
    }

    // Table-level metadata: medallion layer, SCD pattern and the free-form
    // `odcl_metadata` map.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
        // NOTE(review): this `if let` passes silently when `description` is absent from
        // `odcl_metadata`; consider asserting that the key is present.
        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
            assert_eq!(desc, "User table");
        }
        assert_eq!(errors.len(), 0);
    }

    // Data Vault classification given in mixed case ("Hub").
    #[test]
    fn test_parse_odcl_with_data_vault() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: hub_customer
columns:
  - name: customer_key
    data_type: VARCHAR(50)
data_vault_classification: Hub
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(
            table.data_vault_classification,
            Some(DataVaultClassification::Hub)
        );
        assert_eq!(errors.len(), 0);
    }

    // Malformed YAML must surface as a hard error rather than a partial table.
    #[test]
    fn test_parse_invalid_odcl() {
        let mut parser = ODCSImporter::new();
        let invalid_yaml = "not: valid: yaml: structure:";

        assert!(parser.parse(invalid_yaml).is_err());
    }

    // A document without a `columns` field is rejected.
    #[test]
    fn test_parse_odcl_missing_required_fields() {
        let mut parser = ODCSImporter::new();
        let non_conformant = r#"
name: users
# Missing required columns field
"#;

        assert!(parser.parse(non_conformant).is_err());
    }

    // Column-level foreign key given as a `table_id` / `column_name` mapping.
    #[test]
    fn test_parse_odcl_with_foreign_key() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: orders
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: user_id
    data_type: INT
    foreign_key:
      table_id: users
      column_name: id
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
        assert!(user_id_col.foreign_key.is_some());
        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
        assert_eq!(errors.len(), 0);
    }

    // Column constraints listed as plain strings, alongside an explicit `nullable: false`.
    #[test]
    fn test_parse_odcl_with_constraints() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: products
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
    constraints:
      - UNIQUE
      - NOT NULL
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
        assert!(!name_col.nullable);
        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
        assert_eq!(errors.len(), 0);
    }
}