1use super::odcs_shared::{
12 ParserError, column_to_column_data, expand_nested_column, json_value_to_serde_value,
13 normalize_data_type, parse_data_vault_classification, parse_medallion_layer, parse_scd_pattern,
14 resolve_ref, yaml_to_json_value,
15};
16use super::{ImportError, ImportResult, TableData};
17use crate::models::column::ForeignKey;
18use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
19use crate::models::{Column, PropertyRelationship, Table, Tag};
20use anyhow::{Context, Result};
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::str::FromStr;
24use tracing::info;
25
26pub use super::odcs_shared::ParserError as OdcsParserError;
28
29fn ref_to_relationships(ref_path: &Option<String>) -> Vec<PropertyRelationship> {
32 match ref_path {
33 Some(ref_str) => {
34 let to = if ref_str.starts_with("#/definitions/") {
35 let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
36 format!("definitions/{}", def_path)
37 } else if ref_str.starts_with("#/") {
38 ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
39 } else {
40 ref_str.clone()
41 };
42 vec![PropertyRelationship {
43 relationship_type: "foreignKey".to_string(),
44 to,
45 }]
46 }
47 None => Vec::new(),
48 }
49}
50
/// Importer that turns YAML contract documents into tables or ODCS contracts.
pub struct ODCSImporter {
    /// The YAML document most recently handed to one of the import/parse entry points.
    current_yaml_data: Option<serde_yaml::Value>,
}
57
58impl ODCSImporter {
    /// Creates an importer with no YAML document loaded yet.
    pub fn new() -> Self {
        Self {
            current_yaml_data: None,
        }
    }
73
74 pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
108 let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
110 .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
111
112 let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
113 ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
114 })?;
115
116 self.current_yaml_data = Some(yaml_data);
118
119 if self.is_odcl_v3_format(&json_data) {
121 match self.parse_odcl_v3_all(&json_data) {
123 Ok(tables_with_errors) => {
124 let mut sdk_tables = Vec::new();
125 let mut all_errors = Vec::new();
126
127 for (table_index, (table, errors)) in tables_with_errors.into_iter().enumerate()
128 {
129 for e in errors {
131 all_errors.push(ImportError::ParseError(e.message));
132 }
133
134 sdk_tables.push(self.table_to_table_data(table, table_index, &json_data));
136 }
137
138 info!(
139 "ODCS import completed: {} tables extracted from schema array",
140 sdk_tables.len()
141 );
142
143 return Ok(ImportResult {
144 tables: sdk_tables,
145 tables_requiring_name: Vec::new(),
146 errors: all_errors,
147 ai_suggestions: None,
148 });
149 }
150 Err(e) => {
151 return Err(ImportError::ParseError(e.to_string()));
152 }
153 }
154 }
155
156 match self.parse(yaml_content) {
158 Ok((table, errors)) => {
159 let sdk_tables = vec![self.table_to_table_data(table, 0, &json_data)];
160 let sdk_errors: Vec<ImportError> = errors
161 .iter()
162 .map(|e| ImportError::ParseError(e.message.clone()))
163 .collect();
164 Ok(ImportResult {
165 tables: sdk_tables,
166 tables_requiring_name: Vec::new(),
167 errors: sdk_errors,
168 ai_suggestions: None,
169 })
170 }
171 Err(e) => Err(ImportError::ParseError(e.to_string())),
172 }
173 }
174
175 fn table_to_table_data(
177 &self,
178 table: Table,
179 table_index: usize,
180 json_data: &JsonValue,
181 ) -> TableData {
182 TableData {
183 table_index,
184 id: Some(table.id.to_string()),
185 name: Some(table.name.clone()),
186 api_version: json_data
187 .get("apiVersion")
188 .and_then(|v| v.as_str())
189 .map(|s| s.to_string()),
190 version: json_data
191 .get("version")
192 .and_then(|v| v.as_str())
193 .map(|s| s.to_string()),
194 status: json_data
195 .get("status")
196 .and_then(|v| v.as_str())
197 .map(|s| s.to_string()),
198 kind: json_data
199 .get("kind")
200 .and_then(|v| v.as_str())
201 .map(|s| s.to_string()),
202 domain: json_data
203 .get("domain")
204 .and_then(|v| v.as_str())
205 .map(|s| s.to_string()),
206 data_product: json_data
207 .get("dataProduct")
208 .and_then(|v| v.as_str())
209 .map(|s| s.to_string()),
210 tenant: json_data
211 .get("tenant")
212 .and_then(|v| v.as_str())
213 .map(|s| s.to_string()),
214 description: json_data.get("description").cloned(),
215 physical_name: table
217 .odcl_metadata
218 .get("physicalName")
219 .and_then(|v| v.as_str())
220 .map(|s| s.to_string())
221 .or_else(|| table.schema_name.clone()),
222 physical_type: table
223 .odcl_metadata
224 .get("physicalType")
225 .and_then(|v| v.as_str())
226 .map(|s| s.to_string()),
227 business_name: table
228 .odcl_metadata
229 .get("businessName")
230 .and_then(|v| v.as_str())
231 .map(|s| s.to_string()),
232 data_granularity_description: table
233 .odcl_metadata
234 .get("dataGranularityDescription")
235 .and_then(|v| v.as_str())
236 .map(|s| s.to_string()),
237 columns: table.columns.iter().map(column_to_column_data).collect(),
238 servers: json_data
239 .get("servers")
240 .and_then(|v| v.as_array())
241 .cloned()
242 .unwrap_or_default(),
243 team: json_data.get("team").cloned(),
244 support: json_data.get("support").cloned(),
245 roles: json_data
246 .get("roles")
247 .and_then(|v| v.as_array())
248 .cloned()
249 .unwrap_or_default(),
250 sla_properties: json_data
251 .get("slaProperties")
252 .and_then(|v| v.as_array())
253 .cloned()
254 .unwrap_or_default(),
255 quality: table.quality.clone(),
256 price: json_data.get("price").cloned(),
257 tags: table.tags.iter().map(|t| t.to_string()).collect(),
258 custom_properties: json_data
259 .get("customProperties")
260 .and_then(|v| v.as_array())
261 .cloned()
262 .unwrap_or_default(),
263 authoritative_definitions: json_data
264 .get("authoritativeDefinitions")
265 .and_then(|v| v.as_array())
266 .cloned()
267 .unwrap_or_default(),
268 contract_created_ts: json_data
269 .get("contractCreatedTs")
270 .and_then(|v| v.as_str())
271 .map(|s| s.to_string()),
272 odcs_metadata: table.odcl_metadata.clone(),
273 }
274 }
275
276 pub fn import_contract(
324 &mut self,
325 yaml_content: &str,
326 ) -> Result<crate::models::odcs::ODCSContract, ImportError> {
327 use crate::models::odcs::{
328 AuthoritativeDefinition, CustomProperty, Description, ODCSContract, QualityRule, Role,
329 Server, ServiceLevel, Support, Team,
330 };
331
332 let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
334 .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
335
336 let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
337 ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
338 })?;
339
340 self.current_yaml_data = Some(yaml_data);
342
343 if !self.is_odcl_v3_format(&json_data) {
345 return Err(ImportError::ParseError(
346 "import_contract() only supports ODCS v3 format. Use import() for legacy formats."
347 .to_string(),
348 ));
349 }
350
351 let api_version = json_data
353 .get("apiVersion")
354 .and_then(|v| v.as_str())
355 .unwrap_or("v3.1.0")
356 .to_string();
357 let kind = json_data
358 .get("kind")
359 .and_then(|v| v.as_str())
360 .unwrap_or("DataContract")
361 .to_string();
362 let id = json_data
363 .get("id")
364 .and_then(|v| v.as_str())
365 .unwrap_or("")
366 .to_string();
367 let version = json_data
368 .get("version")
369 .and_then(|v| v.as_str())
370 .unwrap_or("1.0.0")
371 .to_string();
372 let name = json_data
373 .get("name")
374 .and_then(|v| v.as_str())
375 .unwrap_or("")
376 .to_string();
377 let status = json_data
378 .get("status")
379 .and_then(|v| v.as_str())
380 .map(|s| s.to_string());
381 let domain = json_data
382 .get("domain")
383 .and_then(|v| v.as_str())
384 .map(|s| s.to_string());
385 let data_product = json_data
386 .get("dataProduct")
387 .and_then(|v| v.as_str())
388 .map(|s| s.to_string());
389 let tenant = json_data
390 .get("tenant")
391 .and_then(|v| v.as_str())
392 .map(|s| s.to_string());
393
394 let description = json_data.get("description").and_then(|v| {
396 if v.is_string() {
397 Some(Description::Simple(v.as_str().unwrap().to_string()))
398 } else if v.is_object() {
399 serde_json::from_value::<Description>(json_value_to_serde_value(v)).ok()
400 } else {
401 None
402 }
403 });
404
405 let schema = self.parse_schema_array_to_odcs(&json_data)?;
407
408 let servers: Vec<Server> = json_data
410 .get("servers")
411 .and_then(|v| v.as_array())
412 .map(|arr| {
413 arr.iter()
414 .filter_map(|s| serde_json::from_value(json_value_to_serde_value(s)).ok())
415 .collect()
416 })
417 .unwrap_or_default();
418
419 let team: Option<Team> = json_data
421 .get("team")
422 .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
423
424 let support: Option<Support> = json_data
426 .get("support")
427 .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
428
429 let roles: Vec<Role> = json_data
431 .get("roles")
432 .and_then(|v| v.as_array())
433 .map(|arr| {
434 arr.iter()
435 .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
436 .collect()
437 })
438 .unwrap_or_default();
439
440 let service_levels: Vec<ServiceLevel> = json_data
442 .get("slaProperties")
443 .or_else(|| json_data.get("serviceLevels"))
444 .and_then(|v| v.as_array())
445 .map(|arr| {
446 arr.iter()
447 .filter_map(|s| serde_json::from_value(json_value_to_serde_value(s)).ok())
448 .collect()
449 })
450 .unwrap_or_default();
451
452 let quality: Vec<QualityRule> = json_data
454 .get("quality")
455 .and_then(|v| v.as_array())
456 .map(|arr| {
457 arr.iter()
458 .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
459 .collect()
460 })
461 .unwrap_or_default();
462
463 let price = json_data
465 .get("price")
466 .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
467
468 let terms = json_data
470 .get("terms")
471 .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
472
473 let links = json_data
475 .get("links")
476 .and_then(|v| v.as_array())
477 .map(|arr| {
478 arr.iter()
479 .filter_map(|l| serde_json::from_value(json_value_to_serde_value(l)).ok())
480 .collect()
481 })
482 .unwrap_or_default();
483
484 let authoritative_definitions: Vec<AuthoritativeDefinition> = json_data
486 .get("authoritativeDefinitions")
487 .and_then(|v| v.as_array())
488 .map(|arr| {
489 arr.iter()
490 .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
491 .collect()
492 })
493 .unwrap_or_default();
494
495 let tags: Vec<String> = json_data
497 .get("tags")
498 .and_then(|v| v.as_array())
499 .map(|arr| {
500 arr.iter()
501 .filter_map(|t| t.as_str().map(|s| s.to_string()))
502 .collect()
503 })
504 .unwrap_or_default();
505
506 let custom_properties: Vec<CustomProperty> = json_data
508 .get("customProperties")
509 .and_then(|v| v.as_array())
510 .map(|arr| {
511 arr.iter()
512 .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
513 .collect()
514 })
515 .unwrap_or_default();
516
517 let contract_created_ts = json_data
519 .get("contractCreatedTs")
520 .and_then(|v| v.as_str())
521 .map(|s| s.to_string());
522
523 Ok(ODCSContract {
524 api_version,
525 kind,
526 id,
527 version,
528 name,
529 status,
530 domain,
531 data_product,
532 tenant,
533 description,
534 schema,
535 servers,
536 team,
537 support,
538 roles,
539 service_levels,
540 quality,
541 price,
542 terms,
543 links,
544 authoritative_definitions,
545 tags,
546 custom_properties,
547 contract_created_ts,
548 })
549 }
550
551 fn parse_schema_array_to_odcs(
553 &self,
554 json_data: &JsonValue,
555 ) -> Result<Vec<crate::models::odcs::SchemaObject>, ImportError> {
556 use crate::models::odcs::{
557 AuthoritativeDefinition, CustomProperty, QualityRule, SchemaObject, SchemaRelationship,
558 };
559
560 let schema_arr = match json_data.get("schema").and_then(|v| v.as_array()) {
561 Some(arr) => arr,
562 None => return Ok(Vec::new()),
563 };
564
565 let mut schemas = Vec::new();
566
567 for (idx, schema_value) in schema_arr.iter().enumerate() {
568 let schema_obj = match schema_value.as_object() {
569 Some(obj) => obj,
570 None => {
571 return Err(ImportError::ParseError(format!(
572 "Schema object at index {} must be a dictionary",
573 idx
574 )));
575 }
576 };
577
578 let name = schema_obj
579 .get("name")
580 .and_then(|v| v.as_str())
581 .ok_or_else(|| {
582 ImportError::ParseError(format!(
583 "Schema object at index {} missing 'name' field",
584 idx
585 ))
586 })?
587 .to_string();
588
589 let id = schema_obj
590 .get("id")
591 .and_then(|v| v.as_str())
592 .map(|s| s.to_string());
593 let physical_name = schema_obj
594 .get("physicalName")
595 .and_then(|v| v.as_str())
596 .map(|s| s.to_string());
597 let physical_type = schema_obj
598 .get("physicalType")
599 .and_then(|v| v.as_str())
600 .map(|s| s.to_string());
601 let business_name = schema_obj
602 .get("businessName")
603 .and_then(|v| v.as_str())
604 .map(|s| s.to_string());
605 let description = schema_obj
606 .get("description")
607 .and_then(|v| v.as_str())
608 .map(|s| s.to_string());
609 let data_granularity_description = schema_obj
610 .get("dataGranularityDescription")
611 .and_then(|v| v.as_str())
612 .map(|s| s.to_string());
613
614 let properties = self.parse_properties_to_odcs(schema_obj, json_data)?;
616
617 let relationships: Vec<SchemaRelationship> = schema_obj
619 .get("relationships")
620 .and_then(|v| v.as_array())
621 .map(|arr| {
622 arr.iter()
623 .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
624 .collect()
625 })
626 .unwrap_or_default();
627
628 let quality: Vec<QualityRule> = schema_obj
630 .get("quality")
631 .and_then(|v| v.as_array())
632 .map(|arr| {
633 arr.iter()
634 .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
635 .collect()
636 })
637 .unwrap_or_default();
638
639 let authoritative_definitions: Vec<AuthoritativeDefinition> = schema_obj
641 .get("authoritativeDefinitions")
642 .and_then(|v| v.as_array())
643 .map(|arr| {
644 arr.iter()
645 .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
646 .collect()
647 })
648 .unwrap_or_default();
649
650 let tags: Vec<String> = schema_obj
652 .get("tags")
653 .and_then(|v| v.as_array())
654 .map(|arr| {
655 arr.iter()
656 .filter_map(|t| t.as_str().map(|s| s.to_string()))
657 .collect()
658 })
659 .unwrap_or_default();
660
661 let custom_properties: Vec<CustomProperty> = schema_obj
663 .get("customProperties")
664 .and_then(|v| v.as_array())
665 .map(|arr| {
666 arr.iter()
667 .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
668 .collect()
669 })
670 .unwrap_or_default();
671
672 schemas.push(SchemaObject {
673 id,
674 name,
675 physical_name,
676 physical_type,
677 business_name,
678 description,
679 data_granularity_description,
680 properties,
681 relationships,
682 quality,
683 authoritative_definitions,
684 tags,
685 custom_properties,
686 });
687 }
688
689 Ok(schemas)
690 }
691
692 fn parse_properties_to_odcs(
694 &self,
695 schema_obj: &serde_json::Map<String, JsonValue>,
696 root_data: &JsonValue,
697 ) -> Result<Vec<crate::models::odcs::Property>, ImportError> {
698 let props_arr = match schema_obj.get("properties").and_then(|v| v.as_array()) {
699 Some(arr) => arr,
700 None => return Ok(Vec::new()),
701 };
702
703 let mut properties = Vec::new();
704
705 for prop_value in props_arr {
706 let prop_obj = match prop_value.as_object() {
707 Some(obj) => obj,
708 None => continue,
709 };
710
711 let prop = self.parse_single_property_to_odcs(prop_obj, root_data)?;
712 properties.push(prop);
713 }
714
715 Ok(properties)
716 }
717
718 #[allow(clippy::only_used_in_recursion)]
720 fn parse_single_property_to_odcs(
721 &self,
722 prop_obj: &serde_json::Map<String, JsonValue>,
723 root_data: &JsonValue,
724 ) -> Result<crate::models::odcs::Property, ImportError> {
725 use crate::models::odcs::{
726 AuthoritativeDefinition, CustomProperty, LogicalTypeOptions, Property,
727 PropertyRelationship, QualityRule,
728 };
729
730 if let Some(ref_path) = prop_obj.get("$ref").and_then(|v| v.as_str())
732 && let Some(resolved) = resolve_ref(ref_path, root_data)
733 && let Some(obj) = resolved.as_object()
734 {
735 return self.parse_single_property_to_odcs(obj, root_data);
736 }
737
738 let name = prop_obj
739 .get("name")
740 .and_then(|v| v.as_str())
741 .unwrap_or("")
742 .to_string();
743 let logical_type = prop_obj
744 .get("logicalType")
745 .or_else(|| prop_obj.get("type"))
746 .and_then(|v| v.as_str())
747 .unwrap_or("string")
748 .to_string();
749
750 let id = prop_obj
751 .get("id")
752 .and_then(|v| v.as_str())
753 .map(|s| s.to_string());
754 let business_name = prop_obj
755 .get("businessName")
756 .and_then(|v| v.as_str())
757 .map(|s| s.to_string());
758 let description = prop_obj
759 .get("description")
760 .and_then(|v| v.as_str())
761 .map(|s| s.to_string());
762 let physical_type = prop_obj
763 .get("physicalType")
764 .and_then(|v| v.as_str())
765 .map(|s| s.to_string());
766 let physical_name = prop_obj
767 .get("physicalName")
768 .and_then(|v| v.as_str())
769 .map(|s| s.to_string());
770
771 let logical_type_options: Option<LogicalTypeOptions> = prop_obj
773 .get("logicalTypeOptions")
774 .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
775
776 let required = prop_obj
778 .get("required")
779 .and_then(|v| v.as_bool())
780 .unwrap_or(false);
781 let primary_key = prop_obj
782 .get("primaryKey")
783 .and_then(|v| v.as_bool())
784 .unwrap_or(false);
785 let primary_key_position = prop_obj
786 .get("primaryKeyPosition")
787 .and_then(|v| v.as_i64())
788 .map(|n| n as i32);
789 let unique = prop_obj
790 .get("unique")
791 .and_then(|v| v.as_bool())
792 .unwrap_or(false);
793
794 let partitioned = prop_obj
796 .get("partitioned")
797 .and_then(|v| v.as_bool())
798 .unwrap_or(false);
799 let partition_key_position = prop_obj
800 .get("partitionKeyPosition")
801 .and_then(|v| v.as_i64())
802 .map(|n| n as i32);
803 let clustered = prop_obj
804 .get("clustered")
805 .and_then(|v| v.as_bool())
806 .unwrap_or(false);
807
808 let classification = prop_obj
810 .get("classification")
811 .and_then(|v| v.as_str())
812 .map(|s| s.to_string());
813 let critical_data_element = prop_obj
814 .get("criticalDataElement")
815 .and_then(|v| v.as_bool())
816 .unwrap_or(false);
817 let encrypted_name = prop_obj
818 .get("encryptedName")
819 .and_then(|v| v.as_str())
820 .map(|s| s.to_string());
821
822 let transform_source_objects: Vec<String> = prop_obj
824 .get("transformSourceObjects")
825 .and_then(|v| v.as_array())
826 .map(|arr| {
827 arr.iter()
828 .filter_map(|t| t.as_str().map(|s| s.to_string()))
829 .collect()
830 })
831 .unwrap_or_default();
832 let transform_logic = prop_obj
833 .get("transformLogic")
834 .and_then(|v| v.as_str())
835 .map(|s| s.to_string());
836 let transform_description = prop_obj
837 .get("transformDescription")
838 .and_then(|v| v.as_str())
839 .map(|s| s.to_string());
840
841 let examples: Vec<serde_json::Value> = prop_obj
843 .get("examples")
844 .and_then(|v| v.as_array())
845 .map(|arr| arr.iter().map(json_value_to_serde_value).collect())
846 .unwrap_or_default();
847 let default_value = prop_obj.get("default").map(json_value_to_serde_value);
848
849 let relationships: Vec<PropertyRelationship> = prop_obj
851 .get("relationships")
852 .and_then(|v| v.as_array())
853 .map(|arr| {
854 arr.iter()
855 .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
856 .collect()
857 })
858 .unwrap_or_default();
859
860 let authoritative_definitions: Vec<AuthoritativeDefinition> = prop_obj
862 .get("authoritativeDefinitions")
863 .and_then(|v| v.as_array())
864 .map(|arr| {
865 arr.iter()
866 .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
867 .collect()
868 })
869 .unwrap_or_default();
870
871 let quality: Vec<QualityRule> = prop_obj
873 .get("quality")
874 .and_then(|v| v.as_array())
875 .map(|arr| {
876 arr.iter()
877 .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
878 .collect()
879 })
880 .unwrap_or_default();
881
882 let enum_values: Vec<String> = prop_obj
884 .get("enum")
885 .and_then(|v| v.as_array())
886 .map(|arr| {
887 arr.iter()
888 .filter_map(|e| e.as_str().map(|s| s.to_string()))
889 .collect()
890 })
891 .unwrap_or_default();
892
893 let tags: Vec<String> = prop_obj
895 .get("tags")
896 .and_then(|v| v.as_array())
897 .map(|arr| {
898 arr.iter()
899 .filter_map(|t| t.as_str().map(|s| s.to_string()))
900 .collect()
901 })
902 .unwrap_or_default();
903
904 let custom_properties: Vec<CustomProperty> = prop_obj
906 .get("customProperties")
907 .and_then(|v| v.as_array())
908 .map(|arr| {
909 arr.iter()
910 .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
911 .collect()
912 })
913 .unwrap_or_default();
914
915 let nested_properties: Vec<Property> =
917 if let Some(props) = prop_obj.get("properties").and_then(|v| v.as_array()) {
918 props
919 .iter()
920 .filter_map(|p| {
921 p.as_object()
922 .and_then(|obj| self.parse_single_property_to_odcs(obj, root_data).ok())
923 })
924 .collect()
925 } else {
926 Vec::new()
927 };
928
929 let items: Option<Box<Property>> = prop_obj.get("items").and_then(|v| {
931 v.as_object()
932 .and_then(|obj| self.parse_single_property_to_odcs(obj, root_data).ok())
933 .map(Box::new)
934 });
935
936 Ok(Property {
937 id,
938 name,
939 business_name,
940 description,
941 logical_type,
942 physical_type,
943 physical_name,
944 logical_type_options,
945 required,
946 primary_key,
947 primary_key_position,
948 unique,
949 partitioned,
950 partition_key_position,
951 clustered,
952 classification,
953 critical_data_element,
954 encrypted_name,
955 transform_source_objects,
956 transform_logic,
957 transform_description,
958 examples,
959 default_value,
960 relationships,
961 authoritative_definitions,
962 quality,
963 enum_values,
964 tags,
965 custom_properties,
966 items,
967 properties: nested_properties,
968 })
969 }
970
    /// Public entry point that parses YAML content into a single table.
    ///
    /// Delegates directly to `parse`; returns the table together with any non-fatal
    /// parser errors.
    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
        self.parse(yaml_content)
    }
984
985 fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
995 let _errors: Vec<ParserError> = Vec::new();
997
998 let data: serde_yaml::Value =
1000 serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
1001
1002 if data.is_null() {
1003 return Err(anyhow::anyhow!("Empty YAML content"));
1004 }
1005
1006 self.current_yaml_data = Some(data.clone());
1008
1009 let json_data = yaml_to_json_value(&data)?;
1011
1012 if self.is_liquibase_format(&json_data) {
1014 return self.parse_liquibase(&json_data);
1015 }
1016
1017 if self.is_odcl_v3_format(&json_data) {
1018 return self.parse_odcl_v3(&json_data);
1019 }
1020
1021 if self.is_data_contract_format(&json_data) {
1022 return self.parse_data_contract(&json_data);
1023 }
1024
1025 self.parse_simple_odcl(&json_data)
1027 }
1028
1029 fn is_liquibase_format(&self, data: &JsonValue) -> bool {
1031 if data.get("databaseChangeLog").is_some() {
1032 return true;
1033 }
1034 if let Some(obj) = data.as_object() {
1036 let obj_str = format!("{:?}", obj);
1037 if obj_str.contains("changeSet") {
1038 return true;
1039 }
1040 }
1041 false
1042 }
1043
1044 fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
1046 if let Some(obj) = data.as_object() {
1047 let has_api_version = obj.contains_key("apiVersion");
1048 let has_kind = obj
1049 .get("kind")
1050 .and_then(|v| v.as_str())
1051 .map(|s| s == "DataContract")
1052 .unwrap_or(false);
1053 let has_id = obj.contains_key("id");
1054 let has_version = obj.contains_key("version");
1055 return has_api_version && has_kind && has_id && has_version;
1056 }
1057 false
1058 }
1059
1060 fn is_data_contract_format(&self, data: &JsonValue) -> bool {
1062 if let Some(obj) = data.as_object() {
1063 let has_spec = obj.contains_key("dataContractSpecification");
1064 let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
1065 return has_spec && has_models;
1066 }
1067 false
1068 }
1069
1070 fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1072 let mut errors = Vec::new();
1073
1074 let name = data
1076 .get("name")
1077 .and_then(|v| v.as_str())
1078 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
1079 .to_string();
1080
1081 let columns_data = data
1083 .get("columns")
1084 .and_then(|v| v.as_array())
1085 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
1086
1087 let mut columns = Vec::new();
1088 for (idx, col_data) in columns_data.iter().enumerate() {
1089 match self.parse_column(col_data) {
1090 Ok(col) => columns.push(col),
1091 Err(e) => {
1092 errors.push(ParserError {
1093 error_type: "column_parse_error".to_string(),
1094 field: format!("columns[{}]", idx),
1095 message: e.to_string(),
1096 });
1097 }
1098 }
1099 }
1100
1101 let database_type = self.extract_database_type(data);
1103 let medallion_layers = self.extract_medallion_layers(data);
1104 let scd_pattern = self.extract_scd_pattern(data);
1105 let data_vault_classification = self.extract_data_vault_classification(data);
1106 let quality_rules = self.extract_quality_rules(data);
1107
1108 if scd_pattern.is_some() && data_vault_classification.is_some() {
1110 errors.push(ParserError {
1111 error_type: "validation_error".to_string(),
1112 field: "patterns".to_string(),
1113 message: "SCD pattern and Data Vault classification are mutually exclusive"
1114 .to_string(),
1115 });
1116 }
1117
1118 let mut odcl_metadata = HashMap::new();
1120 if let Some(metadata) = data.get("odcl_metadata")
1121 && let Some(obj) = metadata.as_object()
1122 {
1123 for (key, value) in obj {
1124 odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
1125 }
1126 }
1127
1128 let table_uuid = self.extract_table_uuid(data);
1129
1130 let table = Table {
1131 id: table_uuid,
1132 name,
1133 columns,
1134 database_type,
1135 catalog_name: None,
1136 schema_name: None,
1137 medallion_layers,
1138 scd_pattern,
1139 data_vault_classification,
1140 modeling_level: None,
1141 tags: Vec::<Tag>::new(),
1142 odcl_metadata,
1143 owner: None,
1144 sla: None,
1145 contact_details: None,
1146 infrastructure_type: None,
1147 notes: None,
1148 position: None,
1149 yaml_file_path: None,
1150 drawio_cell_id: None,
1151 quality: quality_rules,
1152 errors: Vec::new(),
1153 created_at: chrono::Utc::now(),
1154 updated_at: chrono::Utc::now(),
1155 };
1156
1157 info!("Parsed ODCL table: {}", table.name);
1158 Ok((table, errors))
1159 }
1160
1161 fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
1163 let name = col_data
1164 .get("name")
1165 .and_then(|v| v.as_str())
1166 .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
1167 .to_string();
1168
1169 let data_type = col_data
1170 .get("data_type")
1171 .and_then(|v| v.as_str())
1172 .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
1173 .to_string();
1174
1175 let data_type = normalize_data_type(&data_type);
1177
1178 let nullable = col_data
1179 .get("nullable")
1180 .and_then(|v| v.as_bool())
1181 .unwrap_or(true);
1182
1183 let primary_key = col_data
1184 .get("primary_key")
1185 .and_then(|v| v.as_bool())
1186 .unwrap_or(false);
1187
1188 let foreign_key = col_data
1189 .get("foreign_key")
1190 .and_then(|v| self.parse_foreign_key(v));
1191
1192 let constraints = col_data
1193 .get("constraints")
1194 .and_then(|v| v.as_array())
1195 .map(|arr| {
1196 arr.iter()
1197 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1198 .collect()
1199 })
1200 .unwrap_or_default();
1201
1202 let description = col_data
1203 .get("description")
1204 .and_then(|v| v.as_str())
1205 .map(|s| s.to_string())
1206 .unwrap_or_default();
1207
1208 let mut column_quality_rules = Vec::new();
1210 if let Some(quality_val) = col_data.get("quality") {
1211 if let Some(arr) = quality_val.as_array() {
1212 for item in arr {
1214 if let Some(obj) = item.as_object() {
1215 let mut rule = HashMap::new();
1216 for (key, value) in obj {
1217 rule.insert(key.clone(), json_value_to_serde_value(value));
1218 }
1219 column_quality_rules.push(rule);
1220 }
1221 }
1222 } else if let Some(obj) = quality_val.as_object() {
1223 let mut rule = HashMap::new();
1225 for (key, value) in obj {
1226 rule.insert(key.clone(), json_value_to_serde_value(value));
1227 }
1228 column_quality_rules.push(rule);
1229 }
1230 }
1231
1232 Ok(Column {
1237 name,
1238 data_type,
1239 nullable,
1240 primary_key,
1241 foreign_key,
1242 constraints,
1243 description,
1244 quality: column_quality_rules,
1245 ..Default::default()
1246 })
1247 }
1248
1249 fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
1251 let obj = fk_data.as_object()?;
1252 Some(ForeignKey {
1253 table_id: obj
1254 .get("table_id")
1255 .or_else(|| obj.get("table"))
1256 .and_then(|v| v.as_str())
1257 .unwrap_or("")
1258 .to_string(),
1259 column_name: obj
1260 .get("column_name")
1261 .or_else(|| obj.get("column"))
1262 .and_then(|v| v.as_str())
1263 .unwrap_or("")
1264 .to_string(),
1265 })
1266 }
1267
1268 fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
1270 data.get("database_type")
1271 .and_then(|v| v.as_str())
1272 .and_then(|s| match s.to_uppercase().as_str() {
1273 "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
1274 "MYSQL" => Some(DatabaseType::Mysql),
1275 "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
1276 "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
1277 "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
1278 _ => None,
1279 })
1280 }
1281
1282 fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
1284 let mut layers = Vec::new();
1285
1286 if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
1288 for item in arr {
1289 if let Some(s) = item.as_str()
1290 && let Ok(layer) = parse_medallion_layer(s)
1291 {
1292 layers.push(layer);
1293 }
1294 }
1295 }
1296 else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
1298 && let Ok(layer) = parse_medallion_layer(s)
1299 {
1300 layers.push(layer);
1301 }
1302
1303 layers
1304 }
1305
1306 fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
1308 data.get("scd_pattern")
1309 .and_then(|v| v.as_str())
1310 .and_then(|s| parse_scd_pattern(s).ok())
1311 }
1312
1313 fn extract_data_vault_classification(
1315 &self,
1316 data: &JsonValue,
1317 ) -> Option<DataVaultClassification> {
1318 data.get("data_vault_classification")
1319 .and_then(|v| v.as_str())
1320 .and_then(|s| parse_data_vault_classification(s).ok())
1321 }
1322
1323 fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
1325 use serde_json::Value;
1326 let mut quality_rules = Vec::new();
1327
1328 if let Some(quality_val) = data.get("quality") {
1330 if let Some(arr) = quality_val.as_array() {
1331 for item in arr {
1333 if let Some(obj) = item.as_object() {
1334 let mut rule = HashMap::new();
1335 for (key, value) in obj {
1336 rule.insert(key.clone(), json_value_to_serde_value(value));
1337 }
1338 quality_rules.push(rule);
1339 }
1340 }
1341 } else if let Some(obj) = quality_val.as_object() {
1342 let mut rule = HashMap::new();
1344 for (key, value) in obj {
1345 rule.insert(key.clone(), json_value_to_serde_value(value));
1346 }
1347 quality_rules.push(rule);
1348 } else if let Some(s) = quality_val.as_str() {
1349 let mut rule = HashMap::new();
1351 rule.insert("value".to_string(), Value::String(s.to_string()));
1352 quality_rules.push(rule);
1353 }
1354 }
1355
1356 if let Some(metadata) = data.get("metadata")
1358 && let Some(metadata_obj) = metadata.as_object()
1359 && let Some(quality_val) = metadata_obj.get("quality")
1360 {
1361 if let Some(arr) = quality_val.as_array() {
1362 for item in arr {
1364 if let Some(obj) = item.as_object() {
1365 let mut rule = HashMap::new();
1366 for (key, value) in obj {
1367 rule.insert(key.clone(), json_value_to_serde_value(value));
1368 }
1369 quality_rules.push(rule);
1370 }
1371 }
1372 } else if let Some(obj) = quality_val.as_object() {
1373 let mut rule = HashMap::new();
1375 for (key, value) in obj {
1376 rule.insert(key.clone(), json_value_to_serde_value(value));
1377 }
1378 quality_rules.push(rule);
1379 } else if let Some(s) = quality_val.as_str() {
1380 let mut rule = HashMap::new();
1382 rule.insert("value".to_string(), Value::String(s.to_string()));
1383 quality_rules.push(rule);
1384 }
1385 }
1386
1387 if let Some(tblprops) = data.get("tblproperties")
1389 && let Some(obj) = tblprops.as_object()
1390 {
1391 for (key, value) in obj {
1392 let mut rule = HashMap::new();
1393 rule.insert("property".to_string(), Value::String(key.clone()));
1394 rule.insert("value".to_string(), json_value_to_serde_value(value));
1395 quality_rules.push(rule);
1396 }
1397 }
1398
1399 quality_rules
1400 }
1401
1402 fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1410 let mut errors = Vec::new();
1424
1425 let changelog = data
1426 .get("databaseChangeLog")
1427 .and_then(|v| v.as_array())
1428 .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
1429
1430 let mut table_name: Option<String> = None;
1432 let mut columns: Vec<crate::models::column::Column> = Vec::new();
1433
1434 for entry in changelog {
1435 if let Some(change_set) = entry.get("changeSet") {
1437 let changes = if let Some(obj) = change_set.as_object() {
1439 obj.get("changes")
1440 .and_then(|v| v.as_array())
1441 .cloned()
1442 .unwrap_or_default()
1443 } else if let Some(arr) = change_set.as_array() {
1444 arr.clone()
1446 } else {
1447 Vec::new()
1448 };
1449
1450 for ch in changes {
1451 let create = ch.get("createTable").or_else(|| ch.get("create_table"));
1452 if let Some(create) = create {
1453 table_name = create
1454 .get("tableName")
1455 .or_else(|| create.get("table_name"))
1456 .and_then(|v| v.as_str())
1457 .map(|s| s.to_string());
1458
1459 if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
1461 for col_entry in cols {
1462 let col = col_entry.get("column").unwrap_or(col_entry);
1463 let name = col
1464 .get("name")
1465 .and_then(|v| v.as_str())
1466 .unwrap_or("")
1467 .to_string();
1468 let data_type = col
1469 .get("type")
1470 .and_then(|v| v.as_str())
1471 .unwrap_or("")
1472 .to_string();
1473
1474 if name.is_empty() {
1475 errors.push(ParserError {
1476 error_type: "validation_error".to_string(),
1477 field: "columns.name".to_string(),
1478 message: "Liquibase createTable column missing name"
1479 .to_string(),
1480 });
1481 continue;
1482 }
1483
1484 let mut column =
1485 crate::models::column::Column::new(name, data_type);
1486
1487 if let Some(constraints) =
1488 col.get("constraints").and_then(|v| v.as_object())
1489 {
1490 if let Some(pk) =
1491 constraints.get("primaryKey").and_then(|v| v.as_bool())
1492 {
1493 column.primary_key = pk;
1494 }
1495 if let Some(nullable) =
1496 constraints.get("nullable").and_then(|v| v.as_bool())
1497 {
1498 column.nullable = nullable;
1499 }
1500 }
1501
1502 columns.push(column);
1503 }
1504 }
1505
1506 break;
1509 }
1510 }
1511 }
1512 if table_name.is_some() {
1513 break;
1514 }
1515 }
1516
1517 let table_name = table_name
1518 .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
1519 let table = Table::new(table_name, columns);
1520 Ok((table, errors))
1522 }
1523
1524 fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1526 let mut errors = Vec::new();
1527
1528 let table_name = data
1530 .get("name")
1531 .and_then(|v| v.as_str())
1532 .map(|s| s.to_string())
1533 .or_else(|| {
1534 data.get("schema")
1536 .and_then(|v| v.as_array())
1537 .and_then(|arr| arr.first())
1538 .and_then(|obj| obj.as_object())
1539 .and_then(|obj| obj.get("name"))
1540 .and_then(|v| v.as_str())
1541 .map(|s| s.to_string())
1542 })
1543 .ok_or_else(|| {
1544 anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
1545 })?;
1546
1547 let schema = data
1549 .get("schema")
1550 .and_then(|v| v.as_array())
1551 .ok_or_else(|| {
1552 errors.push(ParserError {
1553 error_type: "validation_error".to_string(),
1554 field: "schema".to_string(),
1555 message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
1556 });
1557 anyhow::anyhow!("Missing schema")
1558 });
1559
1560 let schema = match schema {
1561 Ok(s) if s.is_empty() => {
1562 errors.push(ParserError {
1563 error_type: "validation_error".to_string(),
1564 field: "schema".to_string(),
1565 message: "ODCS v3.0.x schema array is empty".to_string(),
1566 });
1567 let quality_rules = self.extract_quality_rules(data);
1568 let table_uuid = self.extract_table_uuid(data);
1569 let table = Table {
1570 id: table_uuid,
1571 name: table_name,
1572 columns: Vec::new(),
1573 database_type: None,
1574 catalog_name: None,
1575 schema_name: None,
1576 medallion_layers: Vec::new(),
1577 scd_pattern: None,
1578 data_vault_classification: None,
1579 modeling_level: None,
1580 tags: Vec::<Tag>::new(),
1581 odcl_metadata: HashMap::new(),
1582 owner: None,
1583 sla: None,
1584 contact_details: None,
1585 infrastructure_type: None,
1586 notes: None,
1587 position: None,
1588 yaml_file_path: None,
1589 drawio_cell_id: None,
1590 quality: quality_rules,
1591 errors: Vec::new(),
1592 created_at: chrono::Utc::now(),
1593 updated_at: chrono::Utc::now(),
1594 };
1595 return Ok((table, errors));
1596 }
1597 Ok(s) => s,
1598 Err(_) => {
1599 let quality_rules = self.extract_quality_rules(data);
1600 let table_uuid = self.extract_table_uuid(data);
1601 let table = Table {
1602 id: table_uuid,
1603 name: table_name,
1604 columns: Vec::new(),
1605 database_type: None,
1606 catalog_name: None,
1607 schema_name: None,
1608 medallion_layers: Vec::new(),
1609 scd_pattern: None,
1610 data_vault_classification: None,
1611 modeling_level: None,
1612 tags: Vec::<Tag>::new(),
1613 odcl_metadata: HashMap::new(),
1614 owner: None,
1615 sla: None,
1616 contact_details: None,
1617 infrastructure_type: None,
1618 notes: None,
1619 position: None,
1620 yaml_file_path: None,
1621 drawio_cell_id: None,
1622 quality: quality_rules,
1623 errors: Vec::new(),
1624 created_at: chrono::Utc::now(),
1625 updated_at: chrono::Utc::now(),
1626 };
1627 return Ok((table, errors));
1628 }
1629 };
1630
1631 let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
1633 errors.push(ParserError {
1634 error_type: "validation_error".to_string(),
1635 field: "schema[0]".to_string(),
1636 message: "First schema object must be a dictionary".to_string(),
1637 });
1638 anyhow::anyhow!("Invalid schema object")
1639 })?;
1640
1641 let object_name = schema_object
1642 .get("name")
1643 .and_then(|v| v.as_str())
1644 .unwrap_or(&table_name);
1645
1646 let mut columns = Vec::new();
1648
1649 if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
1650 for (prop_name, prop_data) in properties_obj {
1652 if let Some(prop_obj) = prop_data.as_object() {
1653 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
1654 Ok(mut cols) => columns.append(&mut cols),
1655 Err(e) => {
1656 errors.push(ParserError {
1657 error_type: "property_parse_error".to_string(),
1658 field: format!("Property '{}'", prop_name),
1659 message: e.to_string(),
1660 });
1661 }
1662 }
1663 } else {
1664 errors.push(ParserError {
1665 error_type: "validation_error".to_string(),
1666 field: format!("Property '{}'", prop_name),
1667 message: format!("Property '{}' must be an object", prop_name),
1668 });
1669 }
1670 }
1671 } else if let Some(properties_arr) =
1672 schema_object.get("properties").and_then(|v| v.as_array())
1673 {
1674 for (idx, prop_data) in properties_arr.iter().enumerate() {
1676 if let Some(prop_obj) = prop_data.as_object() {
1677 let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
1680 Some(JsonValue::String(s)) => s.as_str(),
1681 _ => {
1682 errors.push(ParserError {
1684 error_type: "validation_error".to_string(),
1685 field: format!("Property[{}]", idx),
1686 message: format!(
1687 "Property[{}] missing required 'name' or 'id' field",
1688 idx
1689 ),
1690 });
1691 continue;
1692 }
1693 };
1694
1695 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
1696 Ok(mut cols) => columns.append(&mut cols),
1697 Err(e) => {
1698 errors.push(ParserError {
1699 error_type: "property_parse_error".to_string(),
1700 field: format!("Property[{}] '{}'", idx, prop_name),
1701 message: e.to_string(),
1702 });
1703 }
1704 }
1705 } else {
1706 errors.push(ParserError {
1707 error_type: "validation_error".to_string(),
1708 field: format!("Property[{}]", idx),
1709 message: format!("Property[{}] must be an object", idx),
1710 });
1711 }
1712 }
1713 } else {
1714 errors.push(ParserError {
1715 error_type: "validation_error".to_string(),
1716 field: format!("Object '{}'", object_name),
1717 message: format!(
1718 "Object '{}' missing 'properties' field or properties is invalid",
1719 object_name
1720 ),
1721 });
1722 }
1723
1724 let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
1726 _,
1727 _,
1728 _,
1729 Vec<Tag>,
1730 ) = self.extract_metadata_from_custom_properties(data);
1731
1732 let mut shared_domains: Vec<String> = Vec::new();
1734 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1735 for prop in custom_props {
1736 if let Some(prop_obj) = prop.as_object() {
1737 let prop_key = prop_obj
1738 .get("property")
1739 .and_then(|v| v.as_str())
1740 .unwrap_or("");
1741 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1742 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1743 {
1744 for item in arr {
1745 if let Some(s) = item.as_str() {
1746 shared_domains.push(s.to_string());
1747 }
1748 }
1749 }
1750 }
1751 }
1752 }
1753
1754 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1756 for item in tags_arr {
1757 if let Some(s) = item.as_str() {
1758 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1760 if !tags.contains(&tag) {
1761 tags.push(tag);
1762 }
1763 }
1764 }
1765 }
1766
1767 let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1769
1770 let quality_rules = self.extract_quality_rules(data);
1772
1773 let mut odcl_metadata = HashMap::new();
1775 odcl_metadata.insert(
1776 "apiVersion".to_string(),
1777 json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
1778 );
1779 odcl_metadata.insert(
1780 "kind".to_string(),
1781 json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
1782 );
1783 odcl_metadata.insert(
1784 "id".to_string(),
1785 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1786 );
1787 odcl_metadata.insert(
1788 "version".to_string(),
1789 json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
1790 );
1791 odcl_metadata.insert(
1792 "status".to_string(),
1793 json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
1794 );
1795
1796 if let Some(servicelevels_val) = data.get("servicelevels") {
1798 odcl_metadata.insert(
1799 "servicelevels".to_string(),
1800 json_value_to_serde_value(servicelevels_val),
1801 );
1802 }
1803
1804 if let Some(links_val) = data.get("links") {
1806 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1807 }
1808
1809 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1811 odcl_metadata.insert(
1812 "domain".to_string(),
1813 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1814 );
1815 }
1816 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1817 odcl_metadata.insert(
1818 "dataProduct".to_string(),
1819 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1820 );
1821 }
1822 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1823 odcl_metadata.insert(
1824 "tenant".to_string(),
1825 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1826 );
1827 }
1828
1829 if let Some(desc_val) = data.get("description") {
1831 odcl_metadata.insert(
1832 "description".to_string(),
1833 json_value_to_serde_value(desc_val),
1834 );
1835 }
1836
1837 if let Some(pricing_val) = data.get("pricing") {
1839 odcl_metadata.insert(
1840 "pricing".to_string(),
1841 json_value_to_serde_value(pricing_val),
1842 );
1843 }
1844
1845 if let Some(team_val) = data.get("team") {
1847 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1848 }
1849
1850 if let Some(roles_val) = data.get("roles") {
1852 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1853 }
1854
1855 if let Some(terms_val) = data.get("terms") {
1857 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1858 }
1859
1860 if let Some(servers_val) = data.get("servers") {
1862 odcl_metadata.insert(
1863 "servers".to_string(),
1864 json_value_to_serde_value(servers_val),
1865 );
1866 }
1867
1868 if let Some(infrastructure_val) = data.get("infrastructure") {
1870 odcl_metadata.insert(
1871 "infrastructure".to_string(),
1872 json_value_to_serde_value(infrastructure_val),
1873 );
1874 }
1875
1876 if !shared_domains.is_empty() {
1878 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1879 .iter()
1880 .map(|d| serde_json::Value::String(d.clone()))
1881 .collect();
1882 odcl_metadata.insert(
1883 "sharedDomains".to_string(),
1884 serde_json::Value::Array(shared_domains_json),
1885 );
1886 }
1887
1888 let table_uuid = self.extract_table_uuid(data);
1889
1890 let table = Table {
1891 id: table_uuid,
1892 name: table_name,
1893 columns,
1894 database_type,
1895 catalog_name: None,
1896 schema_name: None,
1897 medallion_layers,
1898 scd_pattern,
1899 data_vault_classification,
1900 modeling_level: None,
1901 tags,
1902 odcl_metadata,
1903 owner: None,
1904 sla: None,
1905 contact_details: None,
1906 infrastructure_type: None,
1907 notes: None,
1908 position: None,
1909 yaml_file_path: None,
1910 drawio_cell_id: None,
1911 quality: quality_rules,
1912 errors: Vec::new(),
1913 created_at: chrono::Utc::now(),
1914 updated_at: chrono::Utc::now(),
1915 };
1916
1917 info!(
1918 "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1919 table.name,
1920 errors.len()
1921 );
1922 Ok((table, errors))
1923 }
1924
1925 fn parse_odcl_v3_all(&self, data: &JsonValue) -> Result<Vec<(Table, Vec<ParserError>)>> {
1934 let mut all_tables = Vec::new();
1935
1936 let schema = match data.get("schema").and_then(|v| v.as_array()) {
1938 Some(s) if !s.is_empty() => s,
1939 Some(_) => {
1940 return Ok(Vec::new());
1942 }
1943 None => {
1944 return Ok(Vec::new());
1946 }
1947 };
1948
1949 let (medallion_layers, scd_pattern, data_vault_classification, contract_tags): (
1951 _,
1952 _,
1953 _,
1954 Vec<Tag>,
1955 ) = self.extract_metadata_from_custom_properties(data);
1956
1957 let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1959
1960 let contract_quality_rules = self.extract_quality_rules(data);
1962
1963 let base_odcl_metadata = self.build_odcl_metadata(data);
1965
1966 for (schema_idx, schema_value) in schema.iter().enumerate() {
1968 let mut errors = Vec::new();
1969
1970 let schema_object = match schema_value.as_object() {
1971 Some(obj) => obj,
1972 None => {
1973 errors.push(ParserError {
1974 error_type: "validation_error".to_string(),
1975 field: format!("schema[{}]", schema_idx),
1976 message: format!(
1977 "Schema object at index {} must be a dictionary",
1978 schema_idx
1979 ),
1980 });
1981 continue;
1982 }
1983 };
1984
1985 let table_name = match schema_object.get("name").and_then(|v| v.as_str()) {
1987 Some(name) => name.to_string(),
1988 None => {
1989 errors.push(ParserError {
1990 error_type: "validation_error".to_string(),
1991 field: format!("schema[{}].name", schema_idx),
1992 message: format!(
1993 "Schema object at index {} missing 'name' field",
1994 schema_idx
1995 ),
1996 });
1997 continue;
1998 }
1999 };
2000
2001 let columns = self.parse_schema_object_properties(schema_object, data, &mut errors);
2003
2004 let mut tags = contract_tags.clone();
2006 if let Some(tags_arr) = schema_object.get("tags").and_then(|v| v.as_array()) {
2007 for item in tags_arr {
2008 if let Some(s) = item.as_str() {
2009 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
2010 if !tags.contains(&tag) {
2011 tags.push(tag);
2012 }
2013 }
2014 }
2015 }
2016
2017 let mut quality_rules = contract_quality_rules.clone();
2019 if let Some(quality_arr) = schema_object.get("quality").and_then(|v| v.as_array()) {
2020 for rule in quality_arr {
2021 if let Some(rule_obj) = rule.as_object() {
2022 let mut rule_map = HashMap::new();
2023 for (k, v) in rule_obj {
2024 rule_map.insert(k.clone(), json_value_to_serde_value(v));
2025 }
2026 quality_rules.push(rule_map);
2027 }
2028 }
2029 }
2030
2031 let table_uuid = if let Some(id_str) = schema_object.get("id").and_then(|v| v.as_str())
2034 {
2035 uuid::Uuid::parse_str(id_str)
2036 .unwrap_or_else(|_| Table::generate_id(&table_name, None, None, None))
2037 } else {
2038 Table::generate_id(&table_name, None, None, None)
2040 };
2041
2042 let mut odcl_metadata = base_odcl_metadata.clone();
2044
2045 if let Some(desc) = schema_object.get("description") {
2047 odcl_metadata.insert(
2048 "schemaDescription".to_string(),
2049 json_value_to_serde_value(desc),
2050 );
2051 }
2052
2053 if let Some(phys_name) = schema_object.get("physicalName").and_then(|v| v.as_str()) {
2055 odcl_metadata.insert(
2056 "schemaPhysicalName".to_string(),
2057 serde_json::Value::String(phys_name.to_string()),
2058 );
2059 }
2060
2061 if let Some(phys_type) = schema_object.get("physicalType").and_then(|v| v.as_str()) {
2063 odcl_metadata.insert(
2064 "schemaPhysicalType".to_string(),
2065 serde_json::Value::String(phys_type.to_string()),
2066 );
2067 }
2068
2069 if let Some(biz_name) = schema_object.get("businessName").and_then(|v| v.as_str()) {
2071 odcl_metadata.insert(
2072 "schemaBusinessName".to_string(),
2073 serde_json::Value::String(biz_name.to_string()),
2074 );
2075 }
2076
2077 if let Some(granularity) = schema_object
2079 .get("dataGranularityDescription")
2080 .and_then(|v| v.as_str())
2081 {
2082 odcl_metadata.insert(
2083 "dataGranularityDescription".to_string(),
2084 serde_json::Value::String(granularity.to_string()),
2085 );
2086 }
2087
2088 if let Some(auth_defs) = schema_object.get("authoritativeDefinitions") {
2090 odcl_metadata.insert(
2091 "schemaAuthoritativeDefinitions".to_string(),
2092 json_value_to_serde_value(auth_defs),
2093 );
2094 }
2095
2096 if let Some(relationships) = schema_object.get("relationships") {
2098 odcl_metadata.insert(
2099 "schemaRelationships".to_string(),
2100 json_value_to_serde_value(relationships),
2101 );
2102 }
2103
2104 let table = Table {
2105 id: table_uuid,
2106 name: table_name.clone(),
2107 columns,
2108 database_type,
2109 catalog_name: None,
2110 schema_name: None,
2111 medallion_layers: medallion_layers.clone(),
2112 scd_pattern,
2113 data_vault_classification,
2114 modeling_level: None,
2115 tags,
2116 odcl_metadata,
2117 owner: None,
2118 sla: None,
2119 contact_details: None,
2120 infrastructure_type: None,
2121 notes: None,
2122 position: None,
2123 yaml_file_path: None,
2124 drawio_cell_id: None,
2125 quality: quality_rules,
2126 errors: Vec::new(),
2127 created_at: chrono::Utc::now(),
2128 updated_at: chrono::Utc::now(),
2129 };
2130
2131 info!(
2132 "Parsed ODCS v3 table {}/{}: {} with {} columns, {} warnings/errors",
2133 schema_idx + 1,
2134 schema.len(),
2135 table.name,
2136 table.columns.len(),
2137 errors.len()
2138 );
2139
2140 all_tables.push((table, errors));
2141 }
2142
2143 Ok(all_tables)
2144 }
2145
2146 fn parse_schema_object_properties(
2148 &self,
2149 schema_object: &serde_json::Map<String, JsonValue>,
2150 data: &JsonValue,
2151 errors: &mut Vec<ParserError>,
2152 ) -> Vec<Column> {
2153 let mut columns = Vec::new();
2154
2155 if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
2156 for (prop_name, prop_data) in properties_obj {
2158 if let Some(prop_obj) = prop_data.as_object() {
2159 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
2160 Ok(mut cols) => columns.append(&mut cols),
2161 Err(e) => {
2162 errors.push(ParserError {
2163 error_type: "property_parse_error".to_string(),
2164 field: format!("Property '{}'", prop_name),
2165 message: e.to_string(),
2166 });
2167 }
2168 }
2169 } else {
2170 errors.push(ParserError {
2171 error_type: "validation_error".to_string(),
2172 field: format!("Property '{}'", prop_name),
2173 message: format!("Property '{}' must be an object", prop_name),
2174 });
2175 }
2176 }
2177 } else if let Some(properties_arr) =
2178 schema_object.get("properties").and_then(|v| v.as_array())
2179 {
2180 for (idx, prop_data) in properties_arr.iter().enumerate() {
2182 if let Some(prop_obj) = prop_data.as_object() {
2183 let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
2184 Some(JsonValue::String(s)) => s.as_str(),
2185 _ => {
2186 errors.push(ParserError {
2187 error_type: "validation_error".to_string(),
2188 field: format!("Property[{}]", idx),
2189 message: format!(
2190 "Property[{}] missing required 'name' or 'id' field",
2191 idx
2192 ),
2193 });
2194 continue;
2195 }
2196 };
2197
2198 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
2199 Ok(mut cols) => columns.append(&mut cols),
2200 Err(e) => {
2201 errors.push(ParserError {
2202 error_type: "property_parse_error".to_string(),
2203 field: format!("Property[{}] '{}'", idx, prop_name),
2204 message: e.to_string(),
2205 });
2206 }
2207 }
2208 } else {
2209 errors.push(ParserError {
2210 error_type: "validation_error".to_string(),
2211 field: format!("Property[{}]", idx),
2212 message: format!("Property[{}] must be an object", idx),
2213 });
2214 }
2215 }
2216 }
2217
2218 columns
2219 }
2220
2221 fn build_odcl_metadata(&self, data: &JsonValue) -> HashMap<String, serde_json::Value> {
2223 let mut odcl_metadata = HashMap::new();
2224
2225 odcl_metadata.insert(
2227 "apiVersion".to_string(),
2228 json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
2229 );
2230 odcl_metadata.insert(
2231 "kind".to_string(),
2232 json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
2233 );
2234 odcl_metadata.insert(
2235 "id".to_string(),
2236 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
2237 );
2238 odcl_metadata.insert(
2239 "version".to_string(),
2240 json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
2241 );
2242 odcl_metadata.insert(
2243 "status".to_string(),
2244 json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
2245 );
2246
2247 let optional_fields = [
2249 "servicelevels",
2250 "links",
2251 "domain",
2252 "dataProduct",
2253 "tenant",
2254 "description",
2255 "pricing",
2256 "team",
2257 "roles",
2258 "terms",
2259 "servers",
2260 "infrastructure",
2261 ];
2262
2263 for field in optional_fields {
2264 if let Some(val) = data.get(field) {
2265 odcl_metadata.insert(field.to_string(), json_value_to_serde_value(val));
2266 }
2267 }
2268
2269 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2271 for prop in custom_props {
2272 if let Some(prop_obj) = prop.as_object() {
2273 let prop_key = prop_obj
2274 .get("property")
2275 .and_then(|v| v.as_str())
2276 .unwrap_or("");
2277 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
2278 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
2279 {
2280 let shared_domains: Vec<serde_json::Value> = arr
2281 .iter()
2282 .filter_map(|item| {
2283 item.as_str()
2284 .map(|s| serde_json::Value::String(s.to_string()))
2285 })
2286 .collect();
2287 if !shared_domains.is_empty() {
2288 odcl_metadata.insert(
2289 "sharedDomains".to_string(),
2290 serde_json::Value::Array(shared_domains),
2291 );
2292 }
2293 }
2294 }
2295 }
2296 }
2297
2298 odcl_metadata
2299 }
2300
2301 fn parse_odcl_v3_property(
2303 &self,
2304 prop_name: &str,
2305 prop_data: &serde_json::Map<String, JsonValue>,
2306 data: &JsonValue,
2307 ) -> Result<Vec<Column>> {
2308 let mut errors = Vec::new();
2310 self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
2311 }
2312
2313 fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
2316 if let Some(id_val) = data.get("id")
2318 && let Some(id_str) = id_val.as_str()
2319 {
2320 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2321 tracing::debug!(
2322 "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
2323 uuid
2324 );
2325 return uuid;
2326 } else {
2327 tracing::warn!(
2328 "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
2329 id_str
2330 );
2331 }
2332 }
2333
2334 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2336 for prop in custom_props {
2337 if let Some(prop_obj) = prop.as_object() {
2338 let prop_key = prop_obj
2339 .get("property")
2340 .and_then(|v| v.as_str())
2341 .unwrap_or("");
2342 if prop_key == "tableUuid"
2343 && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
2344 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
2345 {
2346 tracing::debug!(
2347 "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
2348 uuid
2349 );
2350 return uuid;
2351 }
2352 }
2353 }
2354 }
2355
2356 if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
2358 && let Some(uuid_val) = metadata.get("tableUuid")
2359 && let Some(uuid_str) = uuid_val.as_str()
2360 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
2361 {
2362 tracing::debug!(
2363 "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
2364 uuid
2365 );
2366 return uuid;
2367 }
2368
2369 let table_name = data
2371 .get("name")
2372 .and_then(|v| v.as_str())
2373 .unwrap_or("unknown");
2374 let new_uuid = crate::models::table::Table::generate_id(
2375 table_name, None, None, None, );
2379 tracing::warn!(
2380 "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
2381 table_name,
2382 new_uuid
2383 );
2384 new_uuid
2385 }
2386
2387 fn extract_metadata_from_custom_properties(
2389 &self,
2390 data: &JsonValue,
2391 ) -> (
2392 Vec<MedallionLayer>,
2393 Option<SCDPattern>,
2394 Option<DataVaultClassification>,
2395 Vec<Tag>,
2396 ) {
2397 let mut medallion_layers = Vec::new();
2398 let mut scd_pattern = None;
2399 let mut data_vault_classification = None;
2400 let mut tags: Vec<Tag> = Vec::new();
2401
2402 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2403 for prop in custom_props {
2404 if let Some(prop_obj) = prop.as_object() {
2405 let prop_key = prop_obj
2406 .get("property")
2407 .and_then(|v| v.as_str())
2408 .unwrap_or("");
2409 let prop_value = prop_obj.get("value");
2410
2411 match prop_key {
2412 "medallionLayers" | "medallion_layers" => {
2413 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
2414 for item in arr {
2415 if let Some(s) = item.as_str()
2416 && let Ok(layer) = parse_medallion_layer(s)
2417 {
2418 medallion_layers.push(layer);
2419 }
2420 }
2421 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2422 for part in s.split(',') {
2424 if let Ok(layer) = parse_medallion_layer(part.trim()) {
2425 medallion_layers.push(layer);
2426 }
2427 }
2428 }
2429 }
2430 "scdPattern" | "scd_pattern" => {
2431 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2432 scd_pattern = parse_scd_pattern(s).ok();
2433 }
2434 }
2435 "dataVaultClassification" | "data_vault_classification" => {
2436 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2437 data_vault_classification = parse_data_vault_classification(s).ok();
2438 }
2439 }
2440 "tags" => {
2441 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
2442 for item in arr {
2443 if let Some(s) = item.as_str() {
2444 if let Ok(tag) = Tag::from_str(s) {
2446 tags.push(tag);
2447 } else {
2448 tags.push(Tag::Simple(s.to_string()));
2449 }
2450 }
2451 }
2452 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2453 for part in s.split(',') {
2455 let part = part.trim();
2456 if let Ok(tag) = Tag::from_str(part) {
2457 tags.push(tag);
2458 } else {
2459 tags.push(Tag::Simple(part.to_string()));
2460 }
2461 }
2462 }
2463 }
2464 "sharedDomains" | "shared_domains" => {
2465 }
2468 _ => {}
2469 }
2470 }
2471 }
2472 }
2473
2474 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
2476 for item in tags_arr {
2477 if let Some(s) = item.as_str() {
2478 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
2480 if !tags.contains(&tag) {
2481 tags.push(tag);
2482 }
2483 }
2484 }
2485 }
2486
2487 (
2488 medallion_layers,
2489 scd_pattern,
2490 data_vault_classification,
2491 tags,
2492 )
2493 }
2494
2495 fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2497 if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
2499 && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
2500 {
2501 return server_obj
2502 .get("type")
2503 .and_then(|v| v.as_str())
2504 .and_then(|s| self.parse_database_type(s));
2505 }
2506 None
2507 }
2508
2509 fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
2511 let mut errors = Vec::new();
2512
2513 let models = data
2515 .get("models")
2516 .and_then(|v| v.as_object())
2517 .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
2518
2519 let (model_name, model_data) = models
2522 .iter()
2523 .next()
2524 .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
2525
2526 let model_data = model_data
2527 .as_object()
2528 .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
2529
2530 let fields = model_data
2532 .get("fields")
2533 .and_then(|v| v.as_object())
2534 .ok_or_else(|| {
2535 errors.push(ParserError {
2536 error_type: "validation_error".to_string(),
2537 field: format!("Model '{}'", model_name),
2538 message: format!("Model '{}' missing 'fields' field", model_name),
2539 });
2540 anyhow::anyhow!("Missing fields")
2541 });
2542
2543 let fields = match fields {
2544 Ok(f) => f,
2545 Err(_) => {
2546 let quality_rules = self.extract_quality_rules(data);
2548 let table_uuid = self.extract_table_uuid(data);
2549 let table = Table {
2550 id: table_uuid,
2551 name: model_name.clone(),
2552 columns: Vec::new(),
2553 database_type: None,
2554 catalog_name: None,
2555 schema_name: None,
2556 medallion_layers: Vec::new(),
2557 scd_pattern: None,
2558 data_vault_classification: None,
2559 modeling_level: None,
2560 tags: Vec::<Tag>::new(),
2561 odcl_metadata: HashMap::new(),
2562 owner: None,
2563 sla: None,
2564 contact_details: None,
2565 infrastructure_type: None,
2566 notes: None,
2567 position: None,
2568 yaml_file_path: None,
2569 drawio_cell_id: None,
2570 quality: quality_rules,
2571 errors: Vec::new(),
2572 created_at: chrono::Utc::now(),
2573 updated_at: chrono::Utc::now(),
2574 };
2575 return Ok((table, errors));
2576 }
2577 };
2578
2579 let mut columns = Vec::new();
2581 for (field_name, field_data) in fields {
2582 if let Some(field_obj) = field_data.as_object() {
2583 match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
2584 Ok(mut cols) => columns.append(&mut cols),
2585 Err(e) => {
2586 errors.push(ParserError {
2587 error_type: "field_parse_error".to_string(),
2588 field: format!("Field '{}'", field_name),
2589 message: e.to_string(),
2590 });
2591 }
2592 }
2593 } else {
2594 errors.push(ParserError {
2595 error_type: "validation_error".to_string(),
2596 field: format!("Field '{}'", field_name),
2597 message: format!("Field '{}' must be an object", field_name),
2598 });
2599 }
2600 }
2601
2602 let mut odcl_metadata = HashMap::new();
2605
2606 if let Some(info_val) = data.get("info") {
2609 let info_json_value = json_value_to_serde_value(info_val);
2611 odcl_metadata.insert("info".to_string(), info_json_value);
2612 }
2613
2614 odcl_metadata.insert(
2615 "dataContractSpecification".to_string(),
2616 json_value_to_serde_value(
2617 data.get("dataContractSpecification")
2618 .unwrap_or(&JsonValue::Null),
2619 ),
2620 );
2621 odcl_metadata.insert(
2622 "id".to_string(),
2623 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
2624 );
2625 if let Some(servicelevels_val) = data.get("servicelevels") {
2629 odcl_metadata.insert(
2630 "servicelevels".to_string(),
2631 json_value_to_serde_value(servicelevels_val),
2632 );
2633 }
2634
2635 if let Some(links_val) = data.get("links") {
2637 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
2638 }
2639
2640 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
2642 odcl_metadata.insert(
2643 "domain".to_string(),
2644 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
2645 );
2646 }
2647 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
2648 odcl_metadata.insert(
2649 "dataProduct".to_string(),
2650 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
2651 );
2652 }
2653 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
2654 odcl_metadata.insert(
2655 "tenant".to_string(),
2656 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
2657 );
2658 }
2659
2660 if let Some(desc_val) = data.get("description") {
2662 odcl_metadata.insert(
2663 "description".to_string(),
2664 json_value_to_serde_value(desc_val),
2665 );
2666 }
2667
2668 if let Some(pricing_val) = data.get("pricing") {
2670 odcl_metadata.insert(
2671 "pricing".to_string(),
2672 json_value_to_serde_value(pricing_val),
2673 );
2674 }
2675
2676 if let Some(team_val) = data.get("team") {
2678 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
2679 }
2680
2681 if let Some(roles_val) = data.get("roles") {
2683 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
2684 }
2685
2686 if let Some(terms_val) = data.get("terms") {
2688 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
2689 }
2690
2691 if let Some(servers_val) = data.get("servers") {
2693 odcl_metadata.insert(
2694 "servers".to_string(),
2695 json_value_to_serde_value(servers_val),
2696 );
2697 }
2698
2699 if let Some(infrastructure_val) = data.get("infrastructure") {
2701 odcl_metadata.insert(
2702 "infrastructure".to_string(),
2703 json_value_to_serde_value(infrastructure_val),
2704 );
2705 }
2706
2707 let database_type = self.extract_database_type_from_servers(data);
2709
2710 let (catalog_name, schema_name) = self.extract_catalog_schema(data);
2712
2713 let mut shared_domains: Vec<String> = Vec::new();
2715 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2716 for prop in custom_props {
2717 if let Some(prop_obj) = prop.as_object() {
2718 let prop_key = prop_obj
2719 .get("property")
2720 .and_then(|v| v.as_str())
2721 .unwrap_or("");
2722 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
2723 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
2724 {
2725 for item in arr {
2726 if let Some(s) = item.as_str() {
2727 shared_domains.push(s.to_string());
2728 }
2729 }
2730 }
2731 }
2732 }
2733 }
2734
2735 let mut tags: Vec<Tag> = Vec::new();
2737 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
2738 for item in tags_arr {
2739 if let Some(s) = item.as_str() {
2740 if let Ok(tag) = Tag::from_str(s) {
2742 tags.push(tag);
2743 } else {
2744 tags.push(crate::models::Tag::Simple(s.to_string()));
2746 }
2747 }
2748 }
2749 }
2750
2751 let quality_rules = self.extract_quality_rules(data);
2753
2754 if !shared_domains.is_empty() {
2756 let shared_domains_json: Vec<serde_json::Value> = shared_domains
2757 .iter()
2758 .map(|d| serde_json::Value::String(d.clone()))
2759 .collect();
2760 odcl_metadata.insert(
2761 "sharedDomains".to_string(),
2762 serde_json::Value::Array(shared_domains_json),
2763 );
2764 }
2765
2766 let table_uuid = self.extract_table_uuid(data);
2767
2768 let table = Table {
2769 id: table_uuid,
2770 name: model_name.clone(),
2771 columns,
2772 database_type,
2773 catalog_name,
2774 schema_name,
2775 medallion_layers: Vec::new(),
2776 scd_pattern: None,
2777 data_vault_classification: None,
2778 modeling_level: None,
2779 tags,
2780 odcl_metadata,
2781 owner: None,
2782 sla: None,
2783 contact_details: None,
2784 infrastructure_type: None,
2785 notes: None,
2786 position: None,
2787 yaml_file_path: None,
2788 drawio_cell_id: None,
2789 quality: quality_rules,
2790 errors: Vec::new(),
2791 created_at: chrono::Utc::now(),
2792 updated_at: chrono::Utc::now(),
2793 };
2794
2795 info!(
2796 "Parsed Data Contract table: {} with {} warnings/errors",
2797 model_name,
2798 errors.len()
2799 );
2800 Ok((table, errors))
2801 }
2802
    /// Converts one Data Contract field definition into one or more [`Column`]s.
    ///
    /// A single field can yield several columns: nested structures are flattened
    /// into dotted names (`parent.child`), and array-of-object items use the
    /// `parent.[].child` form.
    ///
    /// Handling order (the first matching case returns):
    /// 1. `$ref` (string): resolved with `resolve_ref` against `data`; an
    ///    unresolvable reference yields an `OBJECT` column carrying a
    ///    `validation_error` entry in its `errors`.
    /// 2. Type strings containing `STRUCT<`: delegated to
    ///    `parse_struct_type_from_string`; falls through when that yields nothing.
    /// 3. `ARRAY` types, driven by the `items` entry.
    /// 4. `OBJECT` types that carry `properties` / `fields`.
    /// 5. Everything else: a single column built by
    ///    `parse_column_metadata_from_field`.
    ///
    /// # Arguments
    /// * `field_name` - Column name (already prefixed by the caller for nested fields).
    /// * `field_data` - The field's JSON object.
    /// * `data` - The whole contract document, used for `$ref` resolution.
    /// * `errors` - Sink handed to `expand_nested_column` for `$ref`-expanded
    ///   nested columns.
    ///
    /// # Returns
    /// The generated columns. Every return path in this function produces `Ok`.
    fn parse_data_contract_field(
        &self,
        field_name: &str,
        field_data: &serde_json::Map<String, JsonValue>,
        data: &JsonValue,
        errors: &mut Vec<ParserError>,
    ) -> Result<Vec<Column>> {
        let mut columns = Vec::new();

        // Collects the `quality` entry of an object as a list of rule maps.
        // Accepts either an array (non-object items are skipped) or a single object.
        let extract_quality_from_obj =
            |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
                let mut quality_rules = Vec::new();
                if let Some(quality_val) = obj.get("quality") {
                    if let Some(arr) = quality_val.as_array() {
                        for item in arr {
                            if let Some(rule_obj) = item.as_object() {
                                let mut rule = HashMap::new();
                                for (key, value) in rule_obj {
                                    rule.insert(key.clone(), json_value_to_serde_value(value));
                                }
                                quality_rules.push(rule);
                            }
                        }
                    } else if let Some(rule_obj) = quality_val.as_object() {
                        let mut rule = HashMap::new();
                        for (key, value) in rule_obj {
                            rule.insert(key.clone(), json_value_to_serde_value(value));
                        }
                        quality_rules.push(rule);
                    }
                }
                quality_rules
            };

        // Missing or non-string descriptions become the empty string.
        let description = field_data
            .get("description")
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_string();

        let mut quality_rules = extract_quality_from_obj(field_data);

        // ---- Case 1: the field references a shared definition via `$ref` ----
        if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
            let ref_path = Some(ref_str.to_string());

            if let Some(definition) = resolve_ref(ref_str, data) {
                // Field-level rules come first; rules declared on the referenced
                // definition are used as-is when the field has none, otherwise appended.
                if quality_rules.is_empty() {
                    if let Some(def_obj) = definition.as_object() {
                        quality_rules = extract_quality_from_obj(def_obj);
                    }
                } else {
                    if let Some(def_obj) = definition.as_object() {
                        let def_quality = extract_quality_from_obj(def_obj);
                        quality_rules.extend(def_quality);
                    }
                }

                let required = field_data
                    .get("required")
                    .and_then(|v| v.as_bool())
                    .unwrap_or(false);

                // A definition counts as nested when it is typed "object" or when it
                // declares `properties` / `fields` of any shape.
                let has_nested = definition
                    .get("type")
                    .and_then(|v| v.as_str())
                    .map(|s| s == "object")
                    .unwrap_or(false)
                    || definition.get("properties").is_some()
                    || definition.get("fields").is_some();

                if has_nested {
                    if let Some(properties) =
                        definition.get("properties").and_then(|v| v.as_object())
                    {
                        // Names listed in the definition's `required` array.
                        let nested_required: Vec<String> = definition
                            .get("required")
                            .and_then(|v| v.as_array())
                            .map(|arr| {
                                arr.iter()
                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Only the expanded children are emitted here; no parent column
                        // is pushed for the referencing field in this branch.
                        for (nested_name, nested_schema) in properties {
                            let nested_required_field = nested_required.contains(nested_name);
                            // The third argument is presumably the nullable flag
                            // (not-required => nullable) — confirm against
                            // `expand_nested_column`.
                            expand_nested_column(
                                &format!("{}.{}", field_name, nested_name),
                                nested_schema,
                                !nested_required_field,
                                &mut columns,
                                errors,
                            );
                        }
                    } else if let Some(fields) =
                        definition.get("fields").and_then(|v| v.as_object())
                    {
                        // `fields`-style definitions have no `required` list consulted
                        // here, so `true` is passed for every child.
                        for (nested_name, nested_schema) in fields {
                            expand_nested_column(
                                &format!("{}.{}", field_name, nested_name),
                                nested_schema,
                                true,
                                &mut columns,
                                errors,
                            );
                        }
                    } else {
                        // Nested by declaration but with no expandable members:
                        // emit a single OBJECT column.
                        let def_physical_type = definition
                            .get("physicalType")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string());
                        columns.push(Column {
                            name: field_name.to_string(),
                            data_type: "OBJECT".to_string(),
                            physical_type: def_physical_type,
                            nullable: !required,
                            // Prefer the field's own description, else the definition's.
                            description: if description.is_empty() {
                                definition
                                    .get("description")
                                    .and_then(|v| v.as_str())
                                    .unwrap_or("")
                                    .to_string()
                            } else {
                                description.clone()
                            },
                            quality: quality_rules.clone(),
                            relationships: ref_to_relationships(&ref_path),
                            ..Default::default()
                        });
                    }
                } else {
                    // Scalar definition: the type comes from the definition
                    // (upper-cased, defaulting to "STRING").
                    let def_type = definition
                        .get("type")
                        .and_then(|v| v.as_str())
                        .unwrap_or("STRING")
                        .to_uppercase();

                    // Only string entries of `enum` are kept.
                    let enum_values = definition
                        .get("enum")
                        .and_then(|v| v.as_array())
                        .map(|arr| {
                            arr.iter()
                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
                                .collect()
                        })
                        .unwrap_or_default();

                    let def_physical_type = definition
                        .get("physicalType")
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string());

                    columns.push(Column {
                        name: field_name.to_string(),
                        data_type: def_type,
                        physical_type: def_physical_type,
                        nullable: !required,
                        // Prefer the field's own description, else the definition's.
                        description: if description.is_empty() {
                            definition
                                .get("description")
                                .and_then(|v| v.as_str())
                                .unwrap_or("")
                                .to_string()
                        } else {
                            description
                        },
                        quality: quality_rules,
                        relationships: ref_to_relationships(&ref_path),
                        enum_values,
                        ..Default::default()
                    });
                }
                return Ok(columns);
            } else {
                // The reference could not be resolved: keep the column (typed OBJECT)
                // and attach a validation error to it instead of failing the parse.
                let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
                let mut error_map = HashMap::new();
                error_map.insert("type".to_string(), serde_json::json!("validation_error"));
                error_map.insert("field".to_string(), serde_json::json!("data_type"));
                error_map.insert(
                    "message".to_string(),
                    serde_json::json!(format!(
                        "Field '{}' references undefined definition: {}",
                        field_name, ref_str
                    )),
                );
                col_errors.push(error_map);
                let field_physical_type = field_data
                    .get("physicalType")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string());
                columns.push(Column {
                    name: field_name.to_string(),
                    data_type: "OBJECT".to_string(),
                    physical_type: field_physical_type,
                    description,
                    errors: col_errors,
                    relationships: ref_to_relationships(&Some(ref_str.to_string())),
                    ..Default::default()
                });
                return Ok(columns);
            }
        }

        // NOTE(review): both branches of the `$ref` block above return, so no field
        // with a string `$ref` reaches this point. The `field_data.get("$ref")`
        // lookups further down therefore always pass `None` to
        // `ref_to_relationships`.

        // `logicalType` takes precedence over `type`; default is "STRING".
        let field_type_str = field_data
            .get("logicalType")
            .and_then(|v| v.as_str())
            .or_else(|| field_data.get("type").and_then(|v| v.as_str()))
            .unwrap_or("STRING");

        // ---- Case 2: inline STRUCT<...> / ARRAY<STRUCT<...>> type strings ----
        // The substring test is case-sensitive.
        if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
            match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
                Ok(nested_cols) if !nested_cols.is_empty() => {
                    // The parent column gets a placeholder type; the real structure is
                    // carried by the nested columns appended after it.
                    let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
                        "ARRAY<STRUCT<...>>".to_string()
                    } else {
                        "STRUCT<...>".to_string()
                    };

                    let struct_physical_type = field_data
                        .get("physicalType")
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string());

                    columns.push(Column {
                        name: field_name.to_string(),
                        data_type: parent_data_type,
                        physical_type: struct_physical_type,
                        nullable: !field_data
                            .get("required")
                            .and_then(|v| v.as_bool())
                            .unwrap_or(false),
                        description: description.clone(),
                        quality: quality_rules.clone(),
                        relationships: ref_to_relationships(
                            &field_data
                                .get("$ref")
                                .and_then(|v| v.as_str())
                                .map(|s| s.to_string()),
                        ),
                        ..Default::default()
                    });

                    columns.extend(nested_cols);
                    return Ok(columns);
                }
                Ok(_) | Err(_) => {
                    // Nothing usable was parsed: fall through to the generic handling below.
                }
            }
        }

        let field_type = normalize_data_type(field_type_str);

        // ---- Case 3: arrays ----
        if field_type == "ARRAY" {
            let items = field_data.get("items");
            if let Some(items_val) = items {
                if let Some(items_obj) = items_val.as_object() {
                    let items_type = items_obj
                        .get("logicalType")
                        .and_then(|v| v.as_str())
                        .or_else(|| items_obj.get("type").and_then(|v| v.as_str()));

                    // Folds type aliases onto canonical lowercase names (matching is
                    // case-sensitive). Only the "object" outcome is consulted below.
                    let normalized_items_type = match items_type {
                        Some("object") | Some("struct") => Some("object"),
                        Some("array") => Some("array"),
                        Some("string") | Some("varchar") | Some("char") | Some("text") => {
                            Some("string")
                        }
                        Some("integer") | Some("int") | Some("bigint") | Some("smallint")
                        | Some("tinyint") => Some("integer"),
                        Some("number") | Some("decimal") | Some("double") | Some("float")
                        | Some("numeric") => Some("number"),
                        Some("boolean") | Some("bool") => Some("boolean"),
                        Some("date") => Some("date"),
                        Some("timestamp") | Some("datetime") => Some("timestamp"),
                        Some("time") => Some("time"),
                        other => other,
                    };

                    // Array of objects: emit the parent, then one column per member.
                    if items_obj.get("fields").is_some()
                        || items_obj.get("properties").is_some()
                        || normalized_items_type == Some("object")
                    {
                        let array_physical_type = field_data
                            .get("physicalType")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string());
                        // Unlike the other parent columns in this function, this one is
                        // built without `quality` or `relationships` (defaults apply).
                        columns.push(Column {
                            name: field_name.to_string(),
                            data_type: "ARRAY<OBJECT>".to_string(),
                            physical_type: array_physical_type,
                            nullable: !field_data
                                .get("required")
                                .and_then(|v| v.as_bool())
                                .unwrap_or(false),
                            description: field_data
                                .get("description")
                                .and_then(|v| v.as_str())
                                .unwrap_or("")
                                .to_string(),
                            ..Default::default()
                        });

                        // Members may be given as a `properties` map, a `properties`
                        // list (entries named by `name`/`id`), or a `fields` map.
                        let properties_obj =
                            items_obj.get("properties").and_then(|v| v.as_object());
                        let properties_arr = items_obj.get("properties").and_then(|v| v.as_array());
                        let fields_obj = items_obj.get("fields").and_then(|v| v.as_object());

                        if let Some(fields_map) = properties_obj.or(fields_obj) {
                            for (nested_field_name, nested_field_data) in fields_map {
                                if let Some(nested_field_obj) = nested_field_data.as_object() {
                                    let nested_field_type = nested_field_obj
                                        .get("logicalType")
                                        .and_then(|v| v.as_str())
                                        .or_else(|| {
                                            nested_field_obj.get("type").and_then(|v| v.as_str())
                                        })
                                        .unwrap_or("STRING");

                                    // `.[].` marks "member of each array element".
                                    let nested_col_name =
                                        format!("{}.[].{}", field_name, nested_field_name);
                                    // NOTE(review): `local_errors` is never read after the
                                    // call, so errors reported by the recursive parse are
                                    // not forwarded to `errors` — confirm this is intended.
                                    let mut local_errors = Vec::new();
                                    match self.parse_data_contract_field(
                                        &nested_col_name,
                                        nested_field_obj,
                                        data,
                                        &mut local_errors,
                                    ) {
                                        Ok(mut nested_cols) => {
                                            columns.append(&mut nested_cols);
                                        }
                                        Err(_) => {
                                            // Fallback: a minimal column from the raw member data.
                                            let nested_physical_type = nested_field_obj
                                                .get("physicalType")
                                                .and_then(|v| v.as_str())
                                                .map(|s| s.to_string());
                                            columns.push(Column {
                                                name: nested_col_name,
                                                data_type: nested_field_type.to_uppercase(),
                                                physical_type: nested_physical_type,
                                                nullable: !nested_field_obj
                                                    .get("required")
                                                    .and_then(|v| v.as_bool())
                                                    .unwrap_or(false),
                                                description: nested_field_obj
                                                    .get("description")
                                                    .and_then(|v| v.as_str())
                                                    .unwrap_or("")
                                                    .to_string(),
                                                ..Default::default()
                                            });
                                        }
                                    }
                                }
                            }
                        } else if let Some(properties_list) = properties_arr {
                            // NOTE(review): as above, `local_errors` is only written to.
                            let mut local_errors = Vec::new();
                            for prop_data in properties_list {
                                if let Some(prop_obj) = prop_data.as_object() {
                                    // List entries are named by `name`, falling back to `id`.
                                    let nested_field_name = prop_obj
                                        .get("name")
                                        .or_else(|| prop_obj.get("id"))
                                        .and_then(|v| v.as_str())
                                        .unwrap_or("");

                                    // Entries without a usable name are skipped.
                                    if !nested_field_name.is_empty() {
                                        let nested_col_name =
                                            format!("{}.[].{}", field_name, nested_field_name);
                                        match self.parse_data_contract_field(
                                            &nested_col_name,
                                            prop_obj,
                                            data,
                                            &mut local_errors,
                                        ) {
                                            Ok(mut nested_cols) => {
                                                columns.append(&mut nested_cols);
                                            }
                                            Err(_) => {
                                                // Fallback: a minimal column from the raw entry.
                                                let nested_field_type = prop_obj
                                                    .get("logicalType")
                                                    .and_then(|v| v.as_str())
                                                    .or_else(|| {
                                                        prop_obj
                                                            .get("type")
                                                            .and_then(|v| v.as_str())
                                                    })
                                                    .unwrap_or("STRING");
                                                let nested_physical_type = prop_obj
                                                    .get("physicalType")
                                                    .and_then(|v| v.as_str())
                                                    .map(|s| s.to_string());
                                                columns.push(Column {
                                                    name: nested_col_name,
                                                    data_type: nested_field_type.to_uppercase(),
                                                    physical_type: nested_physical_type,
                                                    nullable: !prop_obj
                                                        .get("required")
                                                        .and_then(|v| v.as_bool())
                                                        .unwrap_or(false),
                                                    description: prop_obj
                                                        .get("description")
                                                        .and_then(|v| v.as_str())
                                                        .unwrap_or("")
                                                        .to_string(),
                                                    ..Default::default()
                                                });
                                            }
                                        }
                                    }
                                }
                            }
                        }

                        return Ok(columns);
                    } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
                        // Array of scalars described by an `items` object. Only `type`
                        // is read here (not `logicalType`).
                        let array_physical_type = field_data
                            .get("physicalType")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string());
                        columns.push(Column {
                            name: field_name.to_string(),
                            data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
                            physical_type: array_physical_type,
                            nullable: !field_data
                                .get("required")
                                .and_then(|v| v.as_bool())
                                .unwrap_or(false),
                            description: description.clone(),
                            quality: quality_rules.clone(),
                            relationships: ref_to_relationships(
                                &field_data
                                    .get("$ref")
                                    .and_then(|v| v.as_str())
                                    .map(|s| s.to_string()),
                            ),
                            ..Default::default()
                        });
                        return Ok(columns);
                    }
                } else if let Some(item_type_str) = items_val.as_str() {
                    // Shorthand: `items` given directly as a type-name string.
                    let array_physical_type = field_data
                        .get("physicalType")
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string());
                    columns.push(Column {
                        name: field_name.to_string(),
                        data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
                        physical_type: array_physical_type,
                        nullable: !field_data
                            .get("required")
                            .and_then(|v| v.as_bool())
                            .unwrap_or(false),
                        description: description.clone(),
                        quality: quality_rules.clone(),
                        relationships: ref_to_relationships(
                            &field_data
                                .get("$ref")
                                .and_then(|v| v.as_str())
                                .map(|s| s.to_string()),
                        ),
                        ..Default::default()
                    });
                    return Ok(columns);
                }
            }
            // No usable `items` information: default to an array of strings.
            let array_physical_type = field_data
                .get("physicalType")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());
            columns.push(Column {
                name: field_name.to_string(),
                data_type: "ARRAY<STRING>".to_string(),
                physical_type: array_physical_type,
                nullable: !field_data
                    .get("required")
                    .and_then(|v| v.as_bool())
                    .unwrap_or(false),
                description: description.clone(),
                quality: quality_rules.clone(),
                relationships: ref_to_relationships(
                    &field_data
                        .get("$ref")
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string()),
                ),
                ..Default::default()
            });
            return Ok(columns);
        }

        // ---- Case 4: objects with inline members ----
        // Members may be a `properties` map, a `fields` map, or a `properties` list.
        let nested_fields_obj = field_data
            .get("properties")
            .and_then(|v| v.as_object())
            .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
        let nested_fields_arr = field_data.get("properties").and_then(|v| v.as_array());

        if field_type == "OBJECT" && (nested_fields_obj.is_some() || nested_fields_arr.is_some()) {
            let parent_physical_type = field_data
                .get("physicalType")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());

            // Parent column first, followed by its flattened members.
            columns.push(Column {
                name: field_name.to_string(),
                data_type: "OBJECT".to_string(),
                physical_type: parent_physical_type,
                nullable: !field_data
                    .get("required")
                    .and_then(|v| v.as_bool())
                    .unwrap_or(false),
                description: description.clone(),
                quality: quality_rules.clone(),
                relationships: ref_to_relationships(
                    &field_data
                        .get("$ref")
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string()),
                ),
                ..Default::default()
            });

            if let Some(fields_obj) = nested_fields_obj {
                for (nested_field_name, nested_field_data) in fields_obj {
                    if let Some(nested_field_obj) = nested_field_data.as_object() {
                        let nested_field_type = nested_field_obj
                            .get("logicalType")
                            .and_then(|v| v.as_str())
                            .or_else(|| nested_field_obj.get("type").and_then(|v| v.as_str()))
                            .unwrap_or("STRING");

                        let nested_col_name = format!("{}.{}", field_name, nested_field_name);
                        // Members are parsed with `parse_odcl_v3_property` (defined
                        // elsewhere in this impl) rather than by recursing into this
                        // function as the array branch does.
                        match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data)
                        {
                            Ok(mut nested_cols) => {
                                columns.append(&mut nested_cols);
                            }
                            Err(_) => {
                                // Fallback: a minimal column from the raw member data.
                                let nested_physical_type = nested_field_obj
                                    .get("physicalType")
                                    .and_then(|v| v.as_str())
                                    .map(|s| s.to_string());
                                columns.push(Column {
                                    name: nested_col_name,
                                    data_type: nested_field_type.to_uppercase(),
                                    physical_type: nested_physical_type,
                                    nullable: !nested_field_obj
                                        .get("required")
                                        .and_then(|v| v.as_bool())
                                        .unwrap_or(false),
                                    description: nested_field_obj
                                        .get("description")
                                        .and_then(|v| v.as_str())
                                        .unwrap_or("")
                                        .to_string(),
                                    ..Default::default()
                                });
                            }
                        }
                    }
                }
            } else if let Some(fields_arr) = nested_fields_arr {
                for prop_data in fields_arr {
                    if let Some(prop_obj) = prop_data.as_object() {
                        // List entries are named by `name`, falling back to `id`.
                        let nested_field_name = prop_obj
                            .get("name")
                            .or_else(|| prop_obj.get("id"))
                            .and_then(|v| v.as_str())
                            .unwrap_or("");

                        // Entries without a usable name are skipped.
                        if !nested_field_name.is_empty() {
                            let nested_field_type = prop_obj
                                .get("logicalType")
                                .and_then(|v| v.as_str())
                                .or_else(|| prop_obj.get("type").and_then(|v| v.as_str()))
                                .unwrap_or("STRING");

                            let nested_col_name = format!("{}.{}", field_name, nested_field_name);
                            match self.parse_odcl_v3_property(&nested_col_name, prop_obj, data) {
                                Ok(mut nested_cols) => {
                                    columns.append(&mut nested_cols);
                                }
                                Err(_) => {
                                    // Fallback: a minimal column from the raw entry.
                                    let nested_physical_type = prop_obj
                                        .get("physicalType")
                                        .and_then(|v| v.as_str())
                                        .map(|s| s.to_string());
                                    columns.push(Column {
                                        name: nested_col_name,
                                        data_type: nested_field_type.to_uppercase(),
                                        physical_type: nested_physical_type,
                                        nullable: !prop_obj
                                            .get("required")
                                            .and_then(|v| v.as_bool())
                                            .unwrap_or(false),
                                        description: prop_obj
                                            .get("description")
                                            .and_then(|v| v.as_str())
                                            .unwrap_or("")
                                            .to_string(),
                                        ..Default::default()
                                    });
                                }
                            }
                        }
                    }
                }
            }

            return Ok(columns);
        }

        // ---- Case 5: plain (non-nested) column ----
        let required = field_data
            .get("required")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);

        // NOTE(review): `description` was read from this same `field_data` entry at
        // the top of the function, so this fallback looks like it can only
        // reproduce the empty string.
        let field_description = if description.is_empty() {
            field_data
                .get("description")
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_string()
        } else {
            description
        };

        let mut column_quality_rules = quality_rules;

        // NOTE(review): `quality_rules` was already extracted from `field_data` by
        // `extract_quality_from_obj`, which applies the same array/object rules as
        // this block; when it is empty this second pass appears unable to add
        // anything.
        if column_quality_rules.is_empty()
            && let Some(quality_val) = field_data.get("quality")
        {
            if let Some(arr) = quality_val.as_array() {
                for item in arr {
                    if let Some(obj) = item.as_object() {
                        let mut rule = HashMap::new();
                        for (key, value) in obj {
                            rule.insert(key.clone(), json_value_to_serde_value(value));
                        }
                        column_quality_rules.push(rule);
                    }
                }
            } else if let Some(obj) = quality_val.as_object() {
                let mut rule = HashMap::new();
                for (key, value) in obj {
                    rule.insert(key.clone(), json_value_to_serde_value(value));
                }
                column_quality_rules.push(rule);
            }
        }

        let physical_type = field_data
            .get("physicalType")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string());

        // The remaining column metadata (keys, classification, tags, ...) is read
        // from `field_data` by the helper.
        let column = self.parse_column_metadata_from_field(
            field_name,
            &field_type,
            physical_type,
            !required,
            field_description,
            column_quality_rules,
            field_data,
        );

        columns.push(column);

        Ok(columns)
    }
3556
3557 #[allow(clippy::too_many_arguments)]
3566 fn parse_column_metadata_from_field(
3567 &self,
3568 name: &str,
3569 data_type: &str,
3570 physical_type: Option<String>,
3571 nullable: bool,
3572 description: String,
3573 quality: Vec<HashMap<String, serde_json::Value>>,
3574 field_data: &serde_json::Map<String, JsonValue>,
3575 ) -> Column {
3576 use crate::models::{AuthoritativeDefinition, LogicalTypeOptions};
3577
3578 let business_name = field_data
3580 .get("businessName")
3581 .and_then(|v| v.as_str())
3582 .map(|s| s.to_string());
3583
3584 let physical_name = field_data
3586 .get("physicalName")
3587 .and_then(|v| v.as_str())
3588 .map(|s| s.to_string());
3589
3590 let logical_type_options = field_data.get("logicalTypeOptions").and_then(|v| {
3592 v.as_object().map(|opts| LogicalTypeOptions {
3593 min_length: opts.get("minLength").and_then(|v| v.as_i64()),
3594 max_length: opts.get("maxLength").and_then(|v| v.as_i64()),
3595 pattern: opts
3596 .get("pattern")
3597 .and_then(|v| v.as_str())
3598 .map(|s| s.to_string()),
3599 format: opts
3600 .get("format")
3601 .and_then(|v| v.as_str())
3602 .map(|s| s.to_string()),
3603 minimum: opts.get("minimum").cloned(),
3604 maximum: opts.get("maximum").cloned(),
3605 exclusive_minimum: opts.get("exclusiveMinimum").cloned(),
3606 exclusive_maximum: opts.get("exclusiveMaximum").cloned(),
3607 precision: opts
3608 .get("precision")
3609 .and_then(|v| v.as_i64())
3610 .map(|n| n as i32),
3611 scale: opts.get("scale").and_then(|v| v.as_i64()).map(|n| n as i32),
3612 })
3613 });
3614
3615 let primary_key = field_data
3617 .get("primaryKey")
3618 .and_then(|v| v.as_bool())
3619 .unwrap_or(false);
3620
3621 let primary_key_position = field_data
3623 .get("primaryKeyPosition")
3624 .and_then(|v| v.as_i64())
3625 .map(|n| n as i32);
3626
3627 let unique = field_data
3629 .get("unique")
3630 .and_then(|v| v.as_bool())
3631 .unwrap_or(false);
3632
3633 let partitioned = field_data
3635 .get("partitioned")
3636 .and_then(|v| v.as_bool())
3637 .unwrap_or(false);
3638
3639 let partition_key_position = field_data
3641 .get("partitionKeyPosition")
3642 .and_then(|v| v.as_i64())
3643 .map(|n| n as i32);
3644
3645 let clustered = field_data
3647 .get("clustered")
3648 .and_then(|v| v.as_bool())
3649 .unwrap_or(false);
3650
3651 let classification = field_data
3653 .get("classification")
3654 .and_then(|v| v.as_str())
3655 .map(|s| s.to_string());
3656
3657 let critical_data_element = field_data
3659 .get("criticalDataElement")
3660 .and_then(|v| v.as_bool())
3661 .unwrap_or(false);
3662
3663 let encrypted_name = field_data
3665 .get("encryptedName")
3666 .and_then(|v| v.as_str())
3667 .map(|s| s.to_string());
3668
3669 let transform_source_objects = field_data
3671 .get("transformSourceObjects")
3672 .and_then(|v| v.as_array())
3673 .map(|arr| {
3674 arr.iter()
3675 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3676 .collect()
3677 })
3678 .unwrap_or_default();
3679
3680 let transform_logic = field_data
3682 .get("transformLogic")
3683 .and_then(|v| v.as_str())
3684 .map(|s| s.to_string());
3685
3686 let transform_description = field_data
3688 .get("transformDescription")
3689 .and_then(|v| v.as_str())
3690 .map(|s| s.to_string());
3691
3692 let examples = field_data
3694 .get("examples")
3695 .and_then(|v| v.as_array())
3696 .map(|arr| arr.to_vec())
3697 .unwrap_or_default();
3698
3699 let authoritative_definitions = field_data
3701 .get("authoritativeDefinitions")
3702 .and_then(|v| v.as_array())
3703 .map(|arr| {
3704 arr.iter()
3705 .filter_map(|item| {
3706 item.as_object().map(|obj| AuthoritativeDefinition {
3707 definition_type: obj
3708 .get("type")
3709 .and_then(|v| v.as_str())
3710 .unwrap_or("")
3711 .to_string(),
3712 url: obj
3713 .get("url")
3714 .and_then(|v| v.as_str())
3715 .unwrap_or("")
3716 .to_string(),
3717 })
3718 })
3719 .collect()
3720 })
3721 .unwrap_or_default();
3722
3723 let tags = field_data
3725 .get("tags")
3726 .and_then(|v| v.as_array())
3727 .map(|arr| {
3728 arr.iter()
3729 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3730 .collect()
3731 })
3732 .unwrap_or_default();
3733
3734 let custom_properties = field_data
3736 .get("customProperties")
3737 .and_then(|v| v.as_array())
3738 .map(|arr| {
3739 arr.iter()
3740 .filter_map(|item| {
3741 item.as_object().and_then(|obj| {
3742 let key = obj.get("property").and_then(|v| v.as_str())?;
3743 let value = obj.get("value").cloned()?;
3744 Some((key.to_string(), value))
3745 })
3746 })
3747 .collect()
3748 })
3749 .unwrap_or_default();
3750
3751 let secondary_key = field_data
3753 .get("businessKey")
3754 .and_then(|v| v.as_bool())
3755 .unwrap_or(false);
3756
3757 let enum_values = field_data
3759 .get("enum")
3760 .and_then(|v| v.as_array())
3761 .map(|arr| {
3762 arr.iter()
3763 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3764 .collect()
3765 })
3766 .unwrap_or_default();
3767
3768 let constraints = field_data
3770 .get("constraints")
3771 .and_then(|v| v.as_array())
3772 .map(|arr| {
3773 arr.iter()
3774 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3775 .collect()
3776 })
3777 .unwrap_or_default();
3778
3779 Column {
3780 name: name.to_string(),
3781 data_type: data_type.to_string(),
3782 physical_type,
3783 physical_name,
3784 nullable,
3785 description,
3786 quality,
3787 business_name,
3788 logical_type_options,
3789 primary_key,
3790 primary_key_position,
3791 unique,
3792 partitioned,
3793 partition_key_position,
3794 clustered,
3795 classification,
3796 critical_data_element,
3797 encrypted_name,
3798 transform_source_objects,
3799 transform_logic,
3800 transform_description,
3801 examples,
3802 authoritative_definitions,
3803 tags,
3804 custom_properties,
3805 secondary_key,
3806 enum_values,
3807 constraints,
3808 foreign_key: self.parse_foreign_key_from_data_contract(field_data),
3809 relationships: self.parse_relationships_from_field(field_data),
3810 ..Default::default()
3811 }
3812 }
3813
3814 fn parse_foreign_key_from_data_contract(
3816 &self,
3817 field_data: &serde_json::Map<String, JsonValue>,
3818 ) -> Option<ForeignKey> {
3819 field_data
3820 .get("foreignKey")
3821 .and_then(|v| v.as_object())
3822 .map(|fk_obj| ForeignKey {
3823 table_id: fk_obj
3824 .get("table")
3825 .or_else(|| fk_obj.get("table_id"))
3826 .and_then(|v| v.as_str())
3827 .unwrap_or("")
3828 .to_string(),
3829 column_name: fk_obj
3830 .get("column")
3831 .or_else(|| fk_obj.get("column_name"))
3832 .and_then(|v| v.as_str())
3833 .unwrap_or("")
3834 .to_string(),
3835 })
3836 }
3837
3838 fn parse_relationships_from_field(
3841 &self,
3842 field_data: &serde_json::Map<String, JsonValue>,
3843 ) -> Vec<PropertyRelationship> {
3844 let mut relationships = Vec::new();
3845
3846 if let Some(rels_array) = field_data.get("relationships").and_then(|v| v.as_array()) {
3848 for rel in rels_array {
3849 if let Some(rel_obj) = rel.as_object() {
3850 let rel_type = rel_obj
3851 .get("type")
3852 .and_then(|v| v.as_str())
3853 .unwrap_or("foreignKey")
3854 .to_string();
3855 let to = rel_obj
3856 .get("to")
3857 .and_then(|v| v.as_str())
3858 .unwrap_or("")
3859 .to_string();
3860
3861 if !to.is_empty() {
3862 relationships.push(PropertyRelationship {
3863 relationship_type: rel_type,
3864 to,
3865 });
3866 }
3867 }
3868 }
3869 }
3870
3871 if relationships.is_empty()
3873 && let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str())
3874 {
3875 let to = if ref_str.starts_with("#/definitions/") {
3877 let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
3878 format!("definitions/{}", def_path)
3879 } else if ref_str.starts_with("#/") {
3880 ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
3881 } else {
3882 ref_str.to_string()
3883 };
3884
3885 relationships.push(PropertyRelationship {
3886 relationship_type: "foreignKey".to_string(),
3887 to,
3888 });
3889 }
3890
3891 relationships
3892 }
3893
3894 fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
3896 if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
3898 if let Some((_, server_data)) = servers_obj.iter().next()
3900 && let Some(server_obj) = server_data.as_object()
3901 {
3902 return server_obj
3903 .get("type")
3904 .and_then(|v| v.as_str())
3905 .and_then(|s| self.parse_database_type(s));
3906 }
3907 } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
3908 if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
3910 return server_obj
3911 .get("type")
3912 .and_then(|v| v.as_str())
3913 .and_then(|s| self.parse_database_type(s));
3914 }
3915 }
3916 None
3917 }
3918
3919 fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
3921 match s.to_lowercase().as_str() {
3922 "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
3923 "postgres" | "postgresql" => Some(DatabaseType::Postgres),
3924 "mysql" => Some(DatabaseType::Mysql),
3925 "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
3926 "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
3927 _ => None,
3928 }
3929 }
3930
3931 pub fn parse_struct_type_from_string(
3935 &self,
3936 field_name: &str,
3937 type_str: &str,
3938 field_data: &serde_json::Map<String, JsonValue>,
3939 ) -> Result<Vec<Column>> {
3940 let mut columns = Vec::new();
3941
3942 let normalized_type = type_str
3944 .lines()
3945 .map(|line| line.trim())
3946 .filter(|line| !line.is_empty())
3947 .collect::<Vec<_>>()
3948 .join(" ");
3949
3950 let type_str_upper = normalized_type.to_uppercase();
3951
3952 let is_array = type_str_upper.starts_with("ARRAY<");
3954 let struct_start = type_str_upper.find("STRUCT<");
3955
3956 if let Some(start_pos) = struct_start {
3957 let struct_content_start = start_pos + 7; let struct_content = &normalized_type[struct_content_start..];
3961
3962 let mut depth = 1;
3965 let mut end_pos = None;
3966 for (i, ch) in struct_content.char_indices() {
3967 match ch {
3968 '<' => depth += 1,
3969 '>' => {
3970 depth -= 1;
3971 if depth == 0 {
3972 end_pos = Some(i);
3973 break;
3974 }
3975 }
3976 _ => {}
3977 }
3978 }
3979
3980 let struct_fields_str = if let Some(end) = end_pos {
3982 &struct_content[..end]
3983 } else {
3984 struct_content.trim_end_matches('>').trim()
3986 };
3987
3988 let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
3990
3991 for (nested_name, nested_type) in fields {
3995 let nested_type_upper = nested_type.to_uppercase();
3996 let nested_col_name = if is_array {
3997 format!("{}.[].{}", field_name, nested_name)
3998 } else {
3999 format!("{}.{}", field_name, nested_name)
4000 };
4001
4002 let is_nested_struct = nested_type_upper.starts_with("STRUCT<");
4005 let is_nested_array_struct = nested_type_upper.starts_with("ARRAY<STRUCT<");
4006
4007 if is_nested_struct || is_nested_array_struct {
4008 match self.parse_struct_type_from_string(
4016 &nested_col_name,
4017 &nested_type,
4018 field_data,
4019 ) {
4020 Ok(nested_cols) => {
4021 columns.extend(nested_cols);
4028 }
4029 Err(_) => {
4030 let fallback_data_type = if is_nested_array_struct {
4032 "ARRAY<STRUCT<...>>".to_string()
4033 } else {
4034 "STRUCT<...>".to_string()
4035 };
4036 let nested_physical_type = field_data
4037 .get("physicalType")
4038 .and_then(|v| v.as_str())
4039 .map(|s| s.to_string());
4040 columns.push(Column {
4041 name: nested_col_name,
4042 data_type: fallback_data_type,
4043 physical_type: nested_physical_type,
4044 nullable: !field_data
4045 .get("required")
4046 .and_then(|v| v.as_bool())
4047 .unwrap_or(false),
4048 description: field_data
4049 .get("description")
4050 .and_then(|v| v.as_str())
4051 .unwrap_or("")
4052 .to_string(),
4053 ..Default::default()
4054 });
4055 }
4056 }
4057 } else if nested_type_upper.starts_with("ARRAY<") {
4058 let nested_physical_type = field_data
4061 .get("physicalType")
4062 .and_then(|v| v.as_str())
4063 .map(|s| s.to_string());
4064 columns.push(Column {
4065 name: nested_col_name,
4066 data_type: normalize_data_type(&nested_type),
4067 physical_type: nested_physical_type,
4068 nullable: !field_data
4069 .get("required")
4070 .and_then(|v| v.as_bool())
4071 .unwrap_or(false),
4072 description: field_data
4073 .get("description")
4074 .and_then(|v| v.as_str())
4075 .unwrap_or("")
4076 .to_string(),
4077 ..Default::default()
4078 });
4079 } else {
4080 let nested_physical_type = field_data
4082 .get("physicalType")
4083 .and_then(|v| v.as_str())
4084 .map(|s| s.to_string());
4085 columns.push(Column {
4086 name: nested_col_name,
4087 data_type: normalize_data_type(&nested_type),
4088 physical_type: nested_physical_type,
4089 nullable: !field_data
4090 .get("required")
4091 .and_then(|v| v.as_bool())
4092 .unwrap_or(false),
4093 description: field_data
4094 .get("description")
4095 .and_then(|v| v.as_str())
4096 .unwrap_or("")
4097 .to_string(),
4098 ..Default::default()
4099 });
4100 }
4101 }
4102
4103 return Ok(columns);
4104 }
4105
4106 Ok(Vec::new())
4108 }
4109
4110 fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
4112 let mut fields = Vec::new();
4113 let mut current_field = String::new();
4114 let mut depth = 0;
4115 let mut in_string = false;
4116 let mut string_char = None;
4117
4118 for ch in fields_str.chars() {
4119 match ch {
4120 '\'' | '"' if !in_string || Some(ch) == string_char => {
4121 if in_string {
4122 in_string = false;
4123 string_char = None;
4124 } else {
4125 in_string = true;
4126 string_char = Some(ch);
4127 }
4128 current_field.push(ch);
4129 }
4130 '<' if !in_string => {
4131 depth += 1;
4132 current_field.push(ch);
4133 }
4134 '>' if !in_string => {
4135 depth -= 1;
4136 current_field.push(ch);
4137 }
4138 ',' if !in_string && depth == 0 => {
4139 let trimmed = current_field.trim();
4141 if !trimmed.is_empty()
4142 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
4143 {
4144 fields.push((name, type_part));
4145 }
4146 current_field.clear();
4147 }
4148 _ => {
4149 current_field.push(ch);
4150 }
4151 }
4152 }
4153
4154 let trimmed = current_field.trim();
4156 if !trimmed.is_empty()
4157 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
4158 {
4159 fields.push((name, type_part));
4160 }
4161
4162 Ok(fields)
4163 }
4164
4165 fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
4167 let colon_pos = field_def.find(':')?;
4169 let name = field_def[..colon_pos].trim().to_string();
4170 let type_part = field_def[colon_pos + 1..].trim().to_string();
4171
4172 if name.is_empty() || type_part.is_empty() {
4173 return None;
4174 }
4175
4176 Some((name, type_part))
4177 }
4178
4179 fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
4181 let mut catalog_name = None;
4182 let mut schema_name = None;
4183
4184 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
4185 for prop in custom_props {
4186 if let Some(prop_obj) = prop.as_object() {
4187 let prop_key = prop_obj
4188 .get("property")
4189 .and_then(|v| v.as_str())
4190 .unwrap_or("");
4191 let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
4192
4193 match prop_key {
4194 "catalogName" | "catalog_name" => {
4195 catalog_name = prop_value.map(|s| s.to_string());
4196 }
4197 "schemaName" | "schema_name" => {
4198 schema_name = prop_value.map(|s| s.to_string());
4199 }
4200 _ => {}
4201 }
4202 }
4203 }
4204 }
4205
4206 if catalog_name.is_none() {
4208 catalog_name = data
4209 .get("catalog_name")
4210 .and_then(|v| v.as_str())
4211 .map(|s| s.to_string());
4212 }
4213 if schema_name.is_none() {
4214 schema_name = data
4215 .get("schema_name")
4216 .and_then(|v| v.as_str())
4217 .map(|s| s.to_string());
4218 }
4219
4220 (catalog_name, schema_name)
4221 }
4222}
4223
4224impl Default for ODCSImporter {
4225 fn default() -> Self {
4226 Self::new()
4227 }
4228}
4229
#[cfg(test)]
mod tests {
    use super::*;

    /// A minimal ODCL document yields the table name, both columns and the database type.
    #[test]
    fn test_parse_simple_odcl_table() {
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer.parse(odcl_yaml).unwrap();

        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
        assert_eq!(errors.len(), 0);
    }

    /// Medallion layer, SCD pattern and free-form ODCL metadata are picked up.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer.parse(odcl_yaml).unwrap();

        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
        // The description is only checked when it was stored as a string.
        if let Some(serde_json::Value::String(description)) =
            table.odcl_metadata.get("description")
        {
            assert_eq!(description, "User table");
        }
        assert_eq!(errors.len(), 0);
    }

    /// The Data Vault classification of a table is parsed.
    #[test]
    fn test_parse_odcl_with_data_vault() {
        let odcl_yaml = r#"
name: hub_customer
columns:
  - name: customer_key
    data_type: VARCHAR(50)
data_vault_classification: Hub
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer.parse(odcl_yaml).unwrap();

        assert_eq!(
            table.data_vault_classification,
            Some(DataVaultClassification::Hub)
        );
        assert_eq!(errors.len(), 0);
    }

    /// Malformed YAML is rejected.
    #[test]
    fn test_parse_invalid_odcl() {
        let invalid_yaml = "not: valid: yaml: structure:";
        let mut importer = ODCSImporter::new();

        assert!(importer.parse(invalid_yaml).is_err());
    }

    /// A document without a `columns` section is rejected.
    #[test]
    fn test_parse_odcl_missing_required_fields() {
        let non_conformant = r#"
name: users
# Missing required columns field
"#;
        let mut importer = ODCSImporter::new();

        assert!(importer.parse(non_conformant).is_err());
    }

    /// A column-level `foreign_key` ends up on the parsed column.
    #[test]
    fn test_parse_odcl_with_foreign_key() {
        let odcl_yaml = r#"
name: orders
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: user_id
    data_type: INT
    foreign_key:
      table_id: users
      column_name: id
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer.parse(odcl_yaml).unwrap();

        assert_eq!(table.columns.len(), 2);
        let user_id_col = table
            .columns
            .iter()
            .find(|column| column.name == "user_id")
            .unwrap();
        assert!(user_id_col.foreign_key.is_some());
        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
        assert_eq!(errors.len(), 0);
    }

    /// Column constraints and nullability are preserved.
    #[test]
    fn test_parse_odcl_with_constraints() {
        let odcl_yaml = r#"
name: products
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
    constraints:
      - UNIQUE
      - NOT NULL
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer.parse(odcl_yaml).unwrap();

        assert_eq!(table.columns.len(), 2);
        let name_col = table
            .columns
            .iter()
            .find(|column| column.name == "name")
            .unwrap();
        assert!(!name_col.nullable);
        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
        assert_eq!(errors.len(), 0);
    }

    /// An ODCS v3 contract with three schema entries imports as three tables that share
    /// the contract-level metadata and are indexed in document order.
    #[test]
    fn test_parse_odcs_v3_multiple_tables() {
        let odcs_yaml = r#"
apiVersion: v3.1.0
kind: DataContract
id: 550e8400-e29b-41d4-a716-446655440000
version: 1.0.0
name: multi_table_contract
domain: sales
schema:
  - name: orders
    properties:
      - name: order_id
        logicalType: integer
        primaryKey: true
      - name: customer_id
        logicalType: integer
      - name: total_amount
        logicalType: decimal
  - name: order_items
    properties:
      - name: item_id
        logicalType: integer
        primaryKey: true
      - name: order_id
        logicalType: integer
      - name: product_name
        logicalType: string
      - name: quantity
        logicalType: integer
  - name: customers
    properties:
      - name: customer_id
        logicalType: integer
        primaryKey: true
      - name: name
        logicalType: string
      - name: email
        logicalType: string
"#;
        let mut importer = ODCSImporter::new();

        let result = importer.import(odcs_yaml).unwrap();

        assert_eq!(
            result.tables.len(),
            3,
            "Expected 3 tables, got {}",
            result.tables.len()
        );

        let table_names: Vec<&str> = result
            .tables
            .iter()
            .map(|table| table.name.as_deref().unwrap_or(""))
            .collect();
        for (name, message) in [
            ("orders", "Missing 'orders' table"),
            ("order_items", "Missing 'order_items' table"),
            ("customers", "Missing 'customers' table"),
        ] {
            assert!(table_names.contains(&name), "{}", message);
        }

        for (name, expected_columns, message) in [
            ("orders", 3, "orders table should have 3 columns"),
            ("order_items", 4, "order_items table should have 4 columns"),
            ("customers", 3, "customers table should have 3 columns"),
        ] {
            let table = result
                .tables
                .iter()
                .find(|table| table.name.as_deref() == Some(name))
                .unwrap();
            assert_eq!(table.columns.len(), expected_columns, "{}", message);
        }

        // Contract-level metadata is repeated on every table.
        for table in &result.tables {
            assert_eq!(table.api_version.as_deref(), Some("v3.1.0"));
            assert_eq!(table.kind.as_deref(), Some("DataContract"));
            assert_eq!(table.domain.as_deref(), Some("sales"));
        }

        assert_eq!(result.tables[0].table_index, 0);
        assert_eq!(result.tables[1].table_index, 1);
        assert_eq!(result.tables[2].table_index, 2);
    }

    /// A contract with a single schema entry still imports as exactly one table.
    #[test]
    fn test_parse_odcs_v3_single_table_backwards_compatible() {
        let odcs_yaml = r#"
apiVersion: v3.1.0
kind: DataContract
id: 550e8400-e29b-41d4-a716-446655440001
version: 1.0.0
name: single_table_contract
schema:
  - name: users
    properties:
      - name: user_id
        logicalType: integer
        primaryKey: true
      - name: username
        logicalType: string
"#;
        let mut importer = ODCSImporter::new();

        let result = importer.import(odcs_yaml).unwrap();

        assert_eq!(
            result.tables.len(),
            1,
            "Expected 1 table for single-schema ODCS"
        );
        assert_eq!(result.tables[0].name.as_deref(), Some("users"));
        assert_eq!(result.tables[0].columns.len(), 2);
    }
}