1use super::odcs_shared::{
12 ParserError, column_to_column_data, expand_nested_column, json_value_to_serde_value,
13 normalize_data_type, parse_data_vault_classification, parse_medallion_layer, parse_scd_pattern,
14 resolve_ref, yaml_to_json_value,
15};
16use super::{ImportError, ImportResult, TableData};
17use crate::models::column::ForeignKey;
18use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
19use crate::models::{Column, PropertyRelationship, Table, Tag};
20use anyhow::{Context, Result};
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::str::FromStr;
24use tracing::info;
25
26pub use super::odcs_shared::ParserError as OdcsParserError;
28
29fn ref_to_relationships(ref_path: &Option<String>) -> Vec<PropertyRelationship> {
32 match ref_path {
33 Some(ref_str) => {
34 let to = if ref_str.starts_with("#/definitions/") {
35 let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
36 format!("definitions/{}", def_path)
37 } else if ref_str.starts_with("#/") {
38 ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
39 } else {
40 ref_str.clone()
41 };
42 vec![PropertyRelationship {
43 relationship_type: "foreignKey".to_string(),
44 to,
45 }]
46 }
47 None => Vec::new(),
48 }
49}
50
/// Importer for ODCS / ODCL YAML documents.
///
/// Entry points visible in this module: `import` (SDK [`ImportResult`]), `import_contract`
/// (typed ODCS v3 contract) and `parse_table` (a single [`Table`]).
pub struct ODCSImporter {
    /// The most recently parsed YAML document, stored by the import/parse entry points.
    current_yaml_data: Option<serde_yaml::Value>,
}
57
58impl ODCSImporter {
59 pub fn new() -> Self {
69 Self {
70 current_yaml_data: None,
71 }
72 }
73
74 pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
108 let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
110 .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
111
112 let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
113 ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
114 })?;
115
116 self.current_yaml_data = Some(yaml_data);
118
119 if self.is_odcl_v3_format(&json_data) {
121 match self.parse_odcl_v3_all(&json_data) {
123 Ok(tables_with_errors) => {
124 let mut sdk_tables = Vec::new();
125 let mut all_errors = Vec::new();
126
127 for (table_index, (table, errors)) in tables_with_errors.into_iter().enumerate()
128 {
129 for e in errors {
131 all_errors.push(ImportError::ParseError(e.message));
132 }
133
134 sdk_tables.push(self.table_to_table_data(table, table_index, &json_data));
136 }
137
138 info!(
139 "ODCS import completed: {} tables extracted from schema array",
140 sdk_tables.len()
141 );
142
143 return Ok(ImportResult {
144 tables: sdk_tables,
145 tables_requiring_name: Vec::new(),
146 errors: all_errors,
147 ai_suggestions: None,
148 });
149 }
150 Err(e) => {
151 return Err(ImportError::ParseError(e.to_string()));
152 }
153 }
154 }
155
156 match self.parse(yaml_content) {
158 Ok((table, errors)) => {
159 let sdk_tables = vec![self.table_to_table_data(table, 0, &json_data)];
160 let sdk_errors: Vec<ImportError> = errors
161 .iter()
162 .map(|e| ImportError::ParseError(e.message.clone()))
163 .collect();
164 Ok(ImportResult {
165 tables: sdk_tables,
166 tables_requiring_name: Vec::new(),
167 errors: sdk_errors,
168 ai_suggestions: None,
169 })
170 }
171 Err(e) => Err(ImportError::ParseError(e.to_string())),
172 }
173 }
174
175 fn table_to_table_data(
177 &self,
178 table: Table,
179 table_index: usize,
180 json_data: &JsonValue,
181 ) -> TableData {
182 TableData {
183 table_index,
184 id: Some(table.id.to_string()),
185 name: Some(table.name.clone()),
186 api_version: json_data
187 .get("apiVersion")
188 .and_then(|v| v.as_str())
189 .map(|s| s.to_string()),
190 version: json_data
191 .get("version")
192 .and_then(|v| v.as_str())
193 .map(|s| s.to_string()),
194 status: json_data
195 .get("status")
196 .and_then(|v| v.as_str())
197 .map(|s| s.to_string()),
198 kind: json_data
199 .get("kind")
200 .and_then(|v| v.as_str())
201 .map(|s| s.to_string()),
202 domain: json_data
203 .get("domain")
204 .and_then(|v| v.as_str())
205 .map(|s| s.to_string()),
206 data_product: json_data
207 .get("dataProduct")
208 .and_then(|v| v.as_str())
209 .map(|s| s.to_string()),
210 tenant: json_data
211 .get("tenant")
212 .and_then(|v| v.as_str())
213 .map(|s| s.to_string()),
214 description: json_data.get("description").cloned(),
215 physical_name: table
217 .odcl_metadata
218 .get("physicalName")
219 .and_then(|v| v.as_str())
220 .map(|s| s.to_string())
221 .or_else(|| table.schema_name.clone()),
222 physical_type: table
223 .odcl_metadata
224 .get("physicalType")
225 .and_then(|v| v.as_str())
226 .map(|s| s.to_string()),
227 business_name: table
228 .odcl_metadata
229 .get("businessName")
230 .and_then(|v| v.as_str())
231 .map(|s| s.to_string()),
232 data_granularity_description: table
233 .odcl_metadata
234 .get("dataGranularityDescription")
235 .and_then(|v| v.as_str())
236 .map(|s| s.to_string()),
237 columns: table.columns.iter().map(column_to_column_data).collect(),
238 servers: json_data
239 .get("servers")
240 .and_then(|v| v.as_array())
241 .cloned()
242 .unwrap_or_default(),
243 team: json_data.get("team").cloned(),
244 support: json_data.get("support").cloned(),
245 roles: json_data
246 .get("roles")
247 .and_then(|v| v.as_array())
248 .cloned()
249 .unwrap_or_default(),
250 sla_properties: json_data
251 .get("slaProperties")
252 .and_then(|v| v.as_array())
253 .cloned()
254 .unwrap_or_default(),
255 quality: table.quality.clone(),
256 price: json_data.get("price").cloned(),
257 tags: table.tags.iter().map(|t| t.to_string()).collect(),
258 custom_properties: {
260 let mut props = json_data
261 .get("customProperties")
262 .and_then(|v| v.as_array())
263 .cloned()
264 .unwrap_or_default();
265 if let Some(schema_props) = table.odcl_metadata.get("schemaCustomProperties")
267 && let Some(schema_arr) = schema_props.as_array()
268 {
269 for prop in schema_arr {
270 let prop_name = prop.get("property").and_then(|v| v.as_str()).unwrap_or("");
272 let already_exists = props.iter().any(|p| {
273 p.get("property")
274 .and_then(|v| v.as_str())
275 .map(|n| n == prop_name)
276 .unwrap_or(false)
277 });
278 if !already_exists {
279 props.push(prop.clone());
280 }
281 }
282 }
283 props
284 },
285 authoritative_definitions: json_data
286 .get("authoritativeDefinitions")
287 .and_then(|v| v.as_array())
288 .cloned()
289 .unwrap_or_default(),
290 contract_created_ts: json_data
291 .get("contractCreatedTs")
292 .and_then(|v| v.as_str())
293 .map(|s| s.to_string()),
294 odcs_metadata: table.odcl_metadata.clone(),
295 }
296 }
297
298 pub fn import_contract(
346 &mut self,
347 yaml_content: &str,
348 ) -> Result<crate::models::odcs::ODCSContract, ImportError> {
349 use crate::models::odcs::{
350 AuthoritativeDefinition, CustomProperty, Description, ODCSContract, QualityRule, Role,
351 Server, ServiceLevel, Support, Team,
352 };
353
354 let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
356 .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
357
358 let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
359 ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
360 })?;
361
362 self.current_yaml_data = Some(yaml_data);
364
365 if !self.is_odcl_v3_format(&json_data) {
367 return Err(ImportError::ParseError(
368 "import_contract() only supports ODCS v3 format. Use import() for legacy formats."
369 .to_string(),
370 ));
371 }
372
373 let api_version = json_data
375 .get("apiVersion")
376 .and_then(|v| v.as_str())
377 .unwrap_or("v3.1.0")
378 .to_string();
379 let kind = json_data
380 .get("kind")
381 .and_then(|v| v.as_str())
382 .unwrap_or("DataContract")
383 .to_string();
384 let id = json_data
385 .get("id")
386 .and_then(|v| v.as_str())
387 .unwrap_or("")
388 .to_string();
389 let version = json_data
390 .get("version")
391 .and_then(|v| v.as_str())
392 .unwrap_or("1.0.0")
393 .to_string();
394 let name = json_data
395 .get("name")
396 .and_then(|v| v.as_str())
397 .unwrap_or("")
398 .to_string();
399 let status = json_data
400 .get("status")
401 .and_then(|v| v.as_str())
402 .map(|s| s.to_string());
403 let domain = json_data
404 .get("domain")
405 .and_then(|v| v.as_str())
406 .map(|s| s.to_string());
407 let data_product = json_data
408 .get("dataProduct")
409 .and_then(|v| v.as_str())
410 .map(|s| s.to_string());
411 let tenant = json_data
412 .get("tenant")
413 .and_then(|v| v.as_str())
414 .map(|s| s.to_string());
415
416 let description = json_data.get("description").and_then(|v| {
418 if v.is_string() {
419 Some(Description::Simple(v.as_str().unwrap().to_string()))
420 } else if v.is_object() {
421 serde_json::from_value::<Description>(json_value_to_serde_value(v)).ok()
422 } else {
423 None
424 }
425 });
426
427 let schema = self.parse_schema_array_to_odcs(&json_data)?;
429
430 let servers: Vec<Server> = json_data
432 .get("servers")
433 .and_then(|v| v.as_array())
434 .map(|arr| {
435 arr.iter()
436 .filter_map(|s| serde_json::from_value(json_value_to_serde_value(s)).ok())
437 .collect()
438 })
439 .unwrap_or_default();
440
441 let team: Option<Team> = json_data
443 .get("team")
444 .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
445
446 let support: Option<Support> = json_data
448 .get("support")
449 .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
450
451 let roles: Vec<Role> = json_data
453 .get("roles")
454 .and_then(|v| v.as_array())
455 .map(|arr| {
456 arr.iter()
457 .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
458 .collect()
459 })
460 .unwrap_or_default();
461
462 let service_levels: Vec<ServiceLevel> = json_data
464 .get("slaProperties")
465 .or_else(|| json_data.get("serviceLevels"))
466 .and_then(|v| v.as_array())
467 .map(|arr| {
468 arr.iter()
469 .filter_map(|s| serde_json::from_value(json_value_to_serde_value(s)).ok())
470 .collect()
471 })
472 .unwrap_or_default();
473
474 let quality: Vec<QualityRule> = json_data
476 .get("quality")
477 .and_then(|v| v.as_array())
478 .map(|arr| {
479 arr.iter()
480 .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
481 .collect()
482 })
483 .unwrap_or_default();
484
485 let price = json_data
487 .get("price")
488 .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
489
490 let terms = json_data
492 .get("terms")
493 .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
494
495 let links = json_data
497 .get("links")
498 .and_then(|v| v.as_array())
499 .map(|arr| {
500 arr.iter()
501 .filter_map(|l| serde_json::from_value(json_value_to_serde_value(l)).ok())
502 .collect()
503 })
504 .unwrap_or_default();
505
506 let authoritative_definitions: Vec<AuthoritativeDefinition> = json_data
508 .get("authoritativeDefinitions")
509 .and_then(|v| v.as_array())
510 .map(|arr| {
511 arr.iter()
512 .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
513 .collect()
514 })
515 .unwrap_or_default();
516
517 let tags: Vec<String> = json_data
519 .get("tags")
520 .and_then(|v| v.as_array())
521 .map(|arr| {
522 arr.iter()
523 .filter_map(|t| t.as_str().map(|s| s.to_string()))
524 .collect()
525 })
526 .unwrap_or_default();
527
528 let custom_properties: Vec<CustomProperty> = json_data
530 .get("customProperties")
531 .and_then(|v| v.as_array())
532 .map(|arr| {
533 arr.iter()
534 .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
535 .collect()
536 })
537 .unwrap_or_default();
538
539 let contract_created_ts = json_data
541 .get("contractCreatedTs")
542 .and_then(|v| v.as_str())
543 .map(|s| s.to_string());
544
545 Ok(ODCSContract {
546 api_version,
547 kind,
548 id,
549 version,
550 name,
551 status,
552 domain,
553 data_product,
554 tenant,
555 description,
556 schema,
557 servers,
558 team,
559 support,
560 roles,
561 service_levels,
562 quality,
563 price,
564 terms,
565 links,
566 authoritative_definitions,
567 tags,
568 custom_properties,
569 contract_created_ts,
570 })
571 }
572
573 fn parse_schema_array_to_odcs(
575 &self,
576 json_data: &JsonValue,
577 ) -> Result<Vec<crate::models::odcs::SchemaObject>, ImportError> {
578 use crate::models::odcs::{
579 AuthoritativeDefinition, CustomProperty, QualityRule, SchemaObject, SchemaRelationship,
580 };
581
582 let schema_arr = match json_data.get("schema").and_then(|v| v.as_array()) {
583 Some(arr) => arr,
584 None => return Ok(Vec::new()),
585 };
586
587 let mut schemas = Vec::new();
588
589 for (idx, schema_value) in schema_arr.iter().enumerate() {
590 let schema_obj = match schema_value.as_object() {
591 Some(obj) => obj,
592 None => {
593 return Err(ImportError::ParseError(format!(
594 "Schema object at index {} must be a dictionary",
595 idx
596 )));
597 }
598 };
599
600 let name = schema_obj
601 .get("name")
602 .and_then(|v| v.as_str())
603 .ok_or_else(|| {
604 ImportError::ParseError(format!(
605 "Schema object at index {} missing 'name' field",
606 idx
607 ))
608 })?
609 .to_string();
610
611 let id = schema_obj
612 .get("id")
613 .and_then(|v| v.as_str())
614 .map(|s| s.to_string());
615 let physical_name = schema_obj
616 .get("physicalName")
617 .and_then(|v| v.as_str())
618 .map(|s| s.to_string());
619 let physical_type = schema_obj
620 .get("physicalType")
621 .and_then(|v| v.as_str())
622 .map(|s| s.to_string());
623 let business_name = schema_obj
624 .get("businessName")
625 .and_then(|v| v.as_str())
626 .map(|s| s.to_string());
627 let description = schema_obj
628 .get("description")
629 .and_then(|v| v.as_str())
630 .map(|s| s.to_string());
631 let data_granularity_description = schema_obj
632 .get("dataGranularityDescription")
633 .and_then(|v| v.as_str())
634 .map(|s| s.to_string());
635
636 let properties = self.parse_properties_to_odcs(schema_obj, json_data)?;
638
639 let relationships: Vec<SchemaRelationship> = schema_obj
641 .get("relationships")
642 .and_then(|v| v.as_array())
643 .map(|arr| {
644 arr.iter()
645 .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
646 .collect()
647 })
648 .unwrap_or_default();
649
650 let quality: Vec<QualityRule> = schema_obj
652 .get("quality")
653 .and_then(|v| v.as_array())
654 .map(|arr| {
655 arr.iter()
656 .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
657 .collect()
658 })
659 .unwrap_or_default();
660
661 let authoritative_definitions: Vec<AuthoritativeDefinition> = schema_obj
663 .get("authoritativeDefinitions")
664 .and_then(|v| v.as_array())
665 .map(|arr| {
666 arr.iter()
667 .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
668 .collect()
669 })
670 .unwrap_or_default();
671
672 let tags: Vec<String> = schema_obj
674 .get("tags")
675 .and_then(|v| v.as_array())
676 .map(|arr| {
677 arr.iter()
678 .filter_map(|t| t.as_str().map(|s| s.to_string()))
679 .collect()
680 })
681 .unwrap_or_default();
682
683 let custom_properties: Vec<CustomProperty> = schema_obj
685 .get("customProperties")
686 .and_then(|v| v.as_array())
687 .map(|arr| {
688 arr.iter()
689 .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
690 .collect()
691 })
692 .unwrap_or_default();
693
694 schemas.push(SchemaObject {
695 id,
696 name,
697 physical_name,
698 physical_type,
699 business_name,
700 description,
701 data_granularity_description,
702 properties,
703 relationships,
704 quality,
705 authoritative_definitions,
706 tags,
707 custom_properties,
708 });
709 }
710
711 Ok(schemas)
712 }
713
714 fn parse_properties_to_odcs(
716 &self,
717 schema_obj: &serde_json::Map<String, JsonValue>,
718 root_data: &JsonValue,
719 ) -> Result<Vec<crate::models::odcs::Property>, ImportError> {
720 let props_arr = match schema_obj.get("properties").and_then(|v| v.as_array()) {
721 Some(arr) => arr,
722 None => return Ok(Vec::new()),
723 };
724
725 let mut properties = Vec::new();
726
727 for prop_value in props_arr {
728 let prop_obj = match prop_value.as_object() {
729 Some(obj) => obj,
730 None => continue,
731 };
732
733 let prop = self.parse_single_property_to_odcs(prop_obj, root_data)?;
734 properties.push(prop);
735 }
736
737 Ok(properties)
738 }
739
740 #[allow(clippy::only_used_in_recursion)]
742 fn parse_single_property_to_odcs(
743 &self,
744 prop_obj: &serde_json::Map<String, JsonValue>,
745 root_data: &JsonValue,
746 ) -> Result<crate::models::odcs::Property, ImportError> {
747 use crate::models::odcs::{
748 AuthoritativeDefinition, CustomProperty, LogicalTypeOptions, Property,
749 PropertyRelationship, QualityRule,
750 };
751
752 if let Some(ref_path) = prop_obj.get("$ref").and_then(|v| v.as_str())
754 && let Some(resolved) = resolve_ref(ref_path, root_data)
755 && let Some(obj) = resolved.as_object()
756 {
757 return self.parse_single_property_to_odcs(obj, root_data);
758 }
759
760 let name = prop_obj
761 .get("name")
762 .and_then(|v| v.as_str())
763 .unwrap_or("")
764 .to_string();
765 let logical_type = prop_obj
766 .get("logicalType")
767 .or_else(|| prop_obj.get("type"))
768 .and_then(|v| v.as_str())
769 .unwrap_or("string")
770 .to_string();
771
772 let id = prop_obj
773 .get("id")
774 .and_then(|v| v.as_str())
775 .map(|s| s.to_string());
776 let business_name = prop_obj
777 .get("businessName")
778 .and_then(|v| v.as_str())
779 .map(|s| s.to_string());
780 let description = prop_obj
781 .get("description")
782 .and_then(|v| v.as_str())
783 .map(|s| s.to_string());
784 let physical_type = prop_obj
785 .get("physicalType")
786 .and_then(|v| v.as_str())
787 .map(|s| s.to_string());
788 let physical_name = prop_obj
789 .get("physicalName")
790 .and_then(|v| v.as_str())
791 .map(|s| s.to_string());
792
793 let logical_type_options: Option<LogicalTypeOptions> = prop_obj
795 .get("logicalTypeOptions")
796 .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
797
798 let required = prop_obj
800 .get("required")
801 .and_then(|v| v.as_bool())
802 .unwrap_or(false);
803 let primary_key = prop_obj
804 .get("primaryKey")
805 .and_then(|v| v.as_bool())
806 .unwrap_or(false);
807 let primary_key_position = prop_obj
808 .get("primaryKeyPosition")
809 .and_then(|v| v.as_i64())
810 .map(|n| n as i32);
811 let unique = prop_obj
812 .get("unique")
813 .and_then(|v| v.as_bool())
814 .unwrap_or(false);
815
816 let partitioned = prop_obj
818 .get("partitioned")
819 .and_then(|v| v.as_bool())
820 .unwrap_or(false);
821 let partition_key_position = prop_obj
822 .get("partitionKeyPosition")
823 .and_then(|v| v.as_i64())
824 .map(|n| n as i32);
825 let clustered = prop_obj
826 .get("clustered")
827 .and_then(|v| v.as_bool())
828 .unwrap_or(false);
829
830 let classification = prop_obj
832 .get("classification")
833 .and_then(|v| v.as_str())
834 .map(|s| s.to_string());
835 let critical_data_element = prop_obj
836 .get("criticalDataElement")
837 .and_then(|v| v.as_bool())
838 .unwrap_or(false);
839 let encrypted_name = prop_obj
840 .get("encryptedName")
841 .and_then(|v| v.as_str())
842 .map(|s| s.to_string());
843
844 let transform_source_objects: Vec<String> = prop_obj
846 .get("transformSourceObjects")
847 .and_then(|v| v.as_array())
848 .map(|arr| {
849 arr.iter()
850 .filter_map(|t| t.as_str().map(|s| s.to_string()))
851 .collect()
852 })
853 .unwrap_or_default();
854 let transform_logic = prop_obj
855 .get("transformLogic")
856 .and_then(|v| v.as_str())
857 .map(|s| s.to_string());
858 let transform_description = prop_obj
859 .get("transformDescription")
860 .and_then(|v| v.as_str())
861 .map(|s| s.to_string());
862
863 let examples: Vec<serde_json::Value> = prop_obj
865 .get("examples")
866 .and_then(|v| v.as_array())
867 .map(|arr| arr.iter().map(json_value_to_serde_value).collect())
868 .unwrap_or_default();
869 let default_value = prop_obj.get("default").map(json_value_to_serde_value);
870
871 let relationships: Vec<PropertyRelationship> = prop_obj
873 .get("relationships")
874 .and_then(|v| v.as_array())
875 .map(|arr| {
876 arr.iter()
877 .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
878 .collect()
879 })
880 .unwrap_or_default();
881
882 let authoritative_definitions: Vec<AuthoritativeDefinition> = prop_obj
884 .get("authoritativeDefinitions")
885 .and_then(|v| v.as_array())
886 .map(|arr| {
887 arr.iter()
888 .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
889 .collect()
890 })
891 .unwrap_or_default();
892
893 let quality: Vec<QualityRule> = prop_obj
895 .get("quality")
896 .and_then(|v| v.as_array())
897 .map(|arr| {
898 arr.iter()
899 .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
900 .collect()
901 })
902 .unwrap_or_default();
903
904 let enum_values: Vec<String> = prop_obj
906 .get("enum")
907 .and_then(|v| v.as_array())
908 .map(|arr| {
909 arr.iter()
910 .filter_map(|e| e.as_str().map(|s| s.to_string()))
911 .collect()
912 })
913 .unwrap_or_default();
914
915 let tags: Vec<String> = prop_obj
917 .get("tags")
918 .and_then(|v| v.as_array())
919 .map(|arr| {
920 arr.iter()
921 .filter_map(|t| t.as_str().map(|s| s.to_string()))
922 .collect()
923 })
924 .unwrap_or_default();
925
926 let custom_properties: Vec<CustomProperty> = prop_obj
928 .get("customProperties")
929 .and_then(|v| v.as_array())
930 .map(|arr| {
931 arr.iter()
932 .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
933 .collect()
934 })
935 .unwrap_or_default();
936
937 let nested_properties: Vec<Property> =
939 if let Some(props) = prop_obj.get("properties").and_then(|v| v.as_array()) {
940 props
941 .iter()
942 .filter_map(|p| {
943 p.as_object()
944 .and_then(|obj| self.parse_single_property_to_odcs(obj, root_data).ok())
945 })
946 .collect()
947 } else {
948 Vec::new()
949 };
950
951 let items: Option<Box<Property>> = prop_obj.get("items").and_then(|v| {
953 v.as_object()
954 .and_then(|obj| self.parse_single_property_to_odcs(obj, root_data).ok())
955 .map(Box::new)
956 });
957
958 Ok(Property {
959 id,
960 name,
961 business_name,
962 description,
963 logical_type,
964 physical_type,
965 physical_name,
966 logical_type_options,
967 required,
968 primary_key,
969 primary_key_position,
970 unique,
971 partitioned,
972 partition_key_position,
973 clustered,
974 classification,
975 critical_data_element,
976 encrypted_name,
977 transform_source_objects,
978 transform_logic,
979 transform_description,
980 examples,
981 default_value,
982 relationships,
983 authoritative_definitions,
984 quality,
985 enum_values,
986 tags,
987 custom_properties,
988 items,
989 properties: nested_properties,
990 })
991 }
992
993 pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
1004 self.parse(yaml_content)
1005 }
1006
1007 fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
1017 let _errors: Vec<ParserError> = Vec::new();
1019
1020 let data: serde_yaml::Value =
1022 serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
1023
1024 if data.is_null() {
1025 return Err(anyhow::anyhow!("Empty YAML content"));
1026 }
1027
1028 self.current_yaml_data = Some(data.clone());
1030
1031 let json_data = yaml_to_json_value(&data)?;
1033
1034 if self.is_liquibase_format(&json_data) {
1036 return self.parse_liquibase(&json_data);
1037 }
1038
1039 if self.is_odcl_v3_format(&json_data) {
1040 return self.parse_odcl_v3(&json_data);
1041 }
1042
1043 if self.is_data_contract_format(&json_data) {
1044 return self.parse_data_contract(&json_data);
1045 }
1046
1047 self.parse_simple_odcl(&json_data)
1049 }
1050
1051 fn is_liquibase_format(&self, data: &JsonValue) -> bool {
1053 if data.get("databaseChangeLog").is_some() {
1054 return true;
1055 }
1056 if let Some(obj) = data.as_object() {
1058 let obj_str = format!("{:?}", obj);
1059 if obj_str.contains("changeSet") {
1060 return true;
1061 }
1062 }
1063 false
1064 }
1065
1066 fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
1068 if let Some(obj) = data.as_object() {
1069 let has_api_version = obj.contains_key("apiVersion");
1070 let has_kind = obj
1071 .get("kind")
1072 .and_then(|v| v.as_str())
1073 .map(|s| s == "DataContract")
1074 .unwrap_or(false);
1075 let has_id = obj.contains_key("id");
1076 let has_version = obj.contains_key("version");
1077 return has_api_version && has_kind && has_id && has_version;
1078 }
1079 false
1080 }
1081
1082 fn is_data_contract_format(&self, data: &JsonValue) -> bool {
1084 if let Some(obj) = data.as_object() {
1085 let has_spec = obj.contains_key("dataContractSpecification");
1086 let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
1087 return has_spec && has_models;
1088 }
1089 false
1090 }
1091
1092 fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1094 let mut errors = Vec::new();
1095
1096 let name = data
1098 .get("name")
1099 .and_then(|v| v.as_str())
1100 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
1101 .to_string();
1102
1103 let columns_data = data
1105 .get("columns")
1106 .and_then(|v| v.as_array())
1107 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
1108
1109 let mut columns = Vec::new();
1110 for (idx, col_data) in columns_data.iter().enumerate() {
1111 match self.parse_column(col_data) {
1112 Ok(col) => columns.push(col),
1113 Err(e) => {
1114 errors.push(ParserError {
1115 error_type: "column_parse_error".to_string(),
1116 field: format!("columns[{}]", idx),
1117 message: e.to_string(),
1118 });
1119 }
1120 }
1121 }
1122
1123 let database_type = self.extract_database_type(data);
1125 let medallion_layers = self.extract_medallion_layers(data);
1126 let scd_pattern = self.extract_scd_pattern(data);
1127 let data_vault_classification = self.extract_data_vault_classification(data);
1128 let quality_rules = self.extract_quality_rules(data);
1129
1130 if scd_pattern.is_some() && data_vault_classification.is_some() {
1132 errors.push(ParserError {
1133 error_type: "validation_error".to_string(),
1134 field: "patterns".to_string(),
1135 message: "SCD pattern and Data Vault classification are mutually exclusive"
1136 .to_string(),
1137 });
1138 }
1139
1140 let mut odcl_metadata = HashMap::new();
1142 if let Some(metadata) = data.get("odcl_metadata")
1143 && let Some(obj) = metadata.as_object()
1144 {
1145 for (key, value) in obj {
1146 odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
1147 }
1148 }
1149
1150 let table_uuid = self.extract_table_uuid(data);
1151
1152 let table = Table {
1153 id: table_uuid,
1154 name,
1155 columns,
1156 database_type,
1157 catalog_name: None,
1158 schema_name: None,
1159 medallion_layers,
1160 scd_pattern,
1161 data_vault_classification,
1162 modeling_level: None,
1163 tags: Vec::<Tag>::new(),
1164 odcl_metadata,
1165 owner: None,
1166 sla: None,
1167 contact_details: None,
1168 infrastructure_type: None,
1169 notes: None,
1170 position: None,
1171 yaml_file_path: None,
1172 drawio_cell_id: None,
1173 quality: quality_rules,
1174 errors: Vec::new(),
1175 created_at: chrono::Utc::now(),
1176 updated_at: chrono::Utc::now(),
1177 };
1178
1179 info!("Parsed ODCL table: {}", table.name);
1180 Ok((table, errors))
1181 }
1182
1183 fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
1185 let name = col_data
1186 .get("name")
1187 .and_then(|v| v.as_str())
1188 .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
1189 .to_string();
1190
1191 let data_type = col_data
1192 .get("data_type")
1193 .and_then(|v| v.as_str())
1194 .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
1195 .to_string();
1196
1197 let data_type = normalize_data_type(&data_type);
1199
1200 let nullable = col_data
1201 .get("nullable")
1202 .and_then(|v| v.as_bool())
1203 .unwrap_or(true);
1204
1205 let primary_key = col_data
1206 .get("primary_key")
1207 .and_then(|v| v.as_bool())
1208 .unwrap_or(false);
1209
1210 let foreign_key = col_data
1211 .get("foreign_key")
1212 .and_then(|v| self.parse_foreign_key(v));
1213
1214 let constraints = col_data
1215 .get("constraints")
1216 .and_then(|v| v.as_array())
1217 .map(|arr| {
1218 arr.iter()
1219 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1220 .collect()
1221 })
1222 .unwrap_or_default();
1223
1224 let description = col_data
1225 .get("description")
1226 .and_then(|v| v.as_str())
1227 .map(|s| s.to_string())
1228 .unwrap_or_default();
1229
1230 let mut column_quality_rules = Vec::new();
1232 if let Some(quality_val) = col_data.get("quality") {
1233 if let Some(arr) = quality_val.as_array() {
1234 for item in arr {
1236 if let Some(obj) = item.as_object() {
1237 let mut rule = HashMap::new();
1238 for (key, value) in obj {
1239 rule.insert(key.clone(), json_value_to_serde_value(value));
1240 }
1241 column_quality_rules.push(rule);
1242 }
1243 }
1244 } else if let Some(obj) = quality_val.as_object() {
1245 let mut rule = HashMap::new();
1247 for (key, value) in obj {
1248 rule.insert(key.clone(), json_value_to_serde_value(value));
1249 }
1250 column_quality_rules.push(rule);
1251 }
1252 }
1253
1254 Ok(Column {
1259 name,
1260 data_type,
1261 nullable,
1262 primary_key,
1263 foreign_key,
1264 constraints,
1265 description,
1266 quality: column_quality_rules,
1267 ..Default::default()
1268 })
1269 }
1270
1271 fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
1273 let obj = fk_data.as_object()?;
1274 Some(ForeignKey {
1275 table_id: obj
1276 .get("table_id")
1277 .or_else(|| obj.get("table"))
1278 .and_then(|v| v.as_str())
1279 .unwrap_or("")
1280 .to_string(),
1281 column_name: obj
1282 .get("column_name")
1283 .or_else(|| obj.get("column"))
1284 .and_then(|v| v.as_str())
1285 .unwrap_or("")
1286 .to_string(),
1287 })
1288 }
1289
1290 fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
1292 data.get("database_type")
1293 .and_then(|v| v.as_str())
1294 .and_then(|s| match s.to_uppercase().as_str() {
1295 "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
1296 "MYSQL" => Some(DatabaseType::Mysql),
1297 "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
1298 "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
1299 "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
1300 _ => None,
1301 })
1302 }
1303
1304 fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
1306 let mut layers = Vec::new();
1307
1308 if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
1310 for item in arr {
1311 if let Some(s) = item.as_str()
1312 && let Ok(layer) = parse_medallion_layer(s)
1313 {
1314 layers.push(layer);
1315 }
1316 }
1317 }
1318 else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
1320 && let Ok(layer) = parse_medallion_layer(s)
1321 {
1322 layers.push(layer);
1323 }
1324
1325 layers
1326 }
1327
1328 fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
1330 data.get("scd_pattern")
1331 .and_then(|v| v.as_str())
1332 .and_then(|s| parse_scd_pattern(s).ok())
1333 }
1334
1335 fn extract_data_vault_classification(
1337 &self,
1338 data: &JsonValue,
1339 ) -> Option<DataVaultClassification> {
1340 data.get("data_vault_classification")
1341 .and_then(|v| v.as_str())
1342 .and_then(|s| parse_data_vault_classification(s).ok())
1343 }
1344
1345 fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
1347 use serde_json::Value;
1348 let mut quality_rules = Vec::new();
1349
1350 if let Some(quality_val) = data.get("quality") {
1352 if let Some(arr) = quality_val.as_array() {
1353 for item in arr {
1355 if let Some(obj) = item.as_object() {
1356 let mut rule = HashMap::new();
1357 for (key, value) in obj {
1358 rule.insert(key.clone(), json_value_to_serde_value(value));
1359 }
1360 quality_rules.push(rule);
1361 }
1362 }
1363 } else if let Some(obj) = quality_val.as_object() {
1364 let mut rule = HashMap::new();
1366 for (key, value) in obj {
1367 rule.insert(key.clone(), json_value_to_serde_value(value));
1368 }
1369 quality_rules.push(rule);
1370 } else if let Some(s) = quality_val.as_str() {
1371 let mut rule = HashMap::new();
1373 rule.insert("value".to_string(), Value::String(s.to_string()));
1374 quality_rules.push(rule);
1375 }
1376 }
1377
1378 if let Some(metadata) = data.get("metadata")
1380 && let Some(metadata_obj) = metadata.as_object()
1381 && let Some(quality_val) = metadata_obj.get("quality")
1382 {
1383 if let Some(arr) = quality_val.as_array() {
1384 for item in arr {
1386 if let Some(obj) = item.as_object() {
1387 let mut rule = HashMap::new();
1388 for (key, value) in obj {
1389 rule.insert(key.clone(), json_value_to_serde_value(value));
1390 }
1391 quality_rules.push(rule);
1392 }
1393 }
1394 } else if let Some(obj) = quality_val.as_object() {
1395 let mut rule = HashMap::new();
1397 for (key, value) in obj {
1398 rule.insert(key.clone(), json_value_to_serde_value(value));
1399 }
1400 quality_rules.push(rule);
1401 } else if let Some(s) = quality_val.as_str() {
1402 let mut rule = HashMap::new();
1404 rule.insert("value".to_string(), Value::String(s.to_string()));
1405 quality_rules.push(rule);
1406 }
1407 }
1408
1409 if let Some(tblprops) = data.get("tblproperties")
1411 && let Some(obj) = tblprops.as_object()
1412 {
1413 for (key, value) in obj {
1414 let mut rule = HashMap::new();
1415 rule.insert("property".to_string(), Value::String(key.clone()));
1416 rule.insert("value".to_string(), json_value_to_serde_value(value));
1417 quality_rules.push(rule);
1418 }
1419 }
1420
1421 quality_rules
1422 }
1423
1424 fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1432 let mut errors = Vec::new();
1446
1447 let changelog = data
1448 .get("databaseChangeLog")
1449 .and_then(|v| v.as_array())
1450 .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
1451
1452 let mut table_name: Option<String> = None;
1454 let mut columns: Vec<crate::models::column::Column> = Vec::new();
1455
1456 for entry in changelog {
1457 if let Some(change_set) = entry.get("changeSet") {
1459 let changes = if let Some(obj) = change_set.as_object() {
1461 obj.get("changes")
1462 .and_then(|v| v.as_array())
1463 .cloned()
1464 .unwrap_or_default()
1465 } else if let Some(arr) = change_set.as_array() {
1466 arr.clone()
1468 } else {
1469 Vec::new()
1470 };
1471
1472 for ch in changes {
1473 let create = ch.get("createTable").or_else(|| ch.get("create_table"));
1474 if let Some(create) = create {
1475 table_name = create
1476 .get("tableName")
1477 .or_else(|| create.get("table_name"))
1478 .and_then(|v| v.as_str())
1479 .map(|s| s.to_string());
1480
1481 if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
1483 for col_entry in cols {
1484 let col = col_entry.get("column").unwrap_or(col_entry);
1485 let name = col
1486 .get("name")
1487 .and_then(|v| v.as_str())
1488 .unwrap_or("")
1489 .to_string();
1490 let data_type = col
1491 .get("type")
1492 .and_then(|v| v.as_str())
1493 .unwrap_or("")
1494 .to_string();
1495
1496 if name.is_empty() {
1497 errors.push(ParserError {
1498 error_type: "validation_error".to_string(),
1499 field: "columns.name".to_string(),
1500 message: "Liquibase createTable column missing name"
1501 .to_string(),
1502 });
1503 continue;
1504 }
1505
1506 let mut column =
1507 crate::models::column::Column::new(name, data_type);
1508
1509 if let Some(constraints) =
1510 col.get("constraints").and_then(|v| v.as_object())
1511 {
1512 if let Some(pk) =
1513 constraints.get("primaryKey").and_then(|v| v.as_bool())
1514 {
1515 column.primary_key = pk;
1516 }
1517 if let Some(nullable) =
1518 constraints.get("nullable").and_then(|v| v.as_bool())
1519 {
1520 column.nullable = nullable;
1521 }
1522 }
1523
1524 columns.push(column);
1525 }
1526 }
1527
1528 break;
1531 }
1532 }
1533 }
1534 if table_name.is_some() {
1535 break;
1536 }
1537 }
1538
1539 let table_name = table_name
1540 .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
1541 let table = Table::new(table_name, columns);
1542 Ok((table, errors))
1544 }
1545
1546 fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1548 let mut errors = Vec::new();
1549
1550 let table_name = data
1552 .get("name")
1553 .and_then(|v| v.as_str())
1554 .map(|s| s.to_string())
1555 .or_else(|| {
1556 data.get("schema")
1558 .and_then(|v| v.as_array())
1559 .and_then(|arr| arr.first())
1560 .and_then(|obj| obj.as_object())
1561 .and_then(|obj| obj.get("name"))
1562 .and_then(|v| v.as_str())
1563 .map(|s| s.to_string())
1564 })
1565 .ok_or_else(|| {
1566 anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
1567 })?;
1568
1569 let schema = data
1571 .get("schema")
1572 .and_then(|v| v.as_array())
1573 .ok_or_else(|| {
1574 errors.push(ParserError {
1575 error_type: "validation_error".to_string(),
1576 field: "schema".to_string(),
1577 message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
1578 });
1579 anyhow::anyhow!("Missing schema")
1580 });
1581
1582 let schema = match schema {
1583 Ok(s) if s.is_empty() => {
1584 errors.push(ParserError {
1585 error_type: "validation_error".to_string(),
1586 field: "schema".to_string(),
1587 message: "ODCS v3.0.x schema array is empty".to_string(),
1588 });
1589 let quality_rules = self.extract_quality_rules(data);
1590 let table_uuid = self.extract_table_uuid(data);
1591 let table = Table {
1592 id: table_uuid,
1593 name: table_name,
1594 columns: Vec::new(),
1595 database_type: None,
1596 catalog_name: None,
1597 schema_name: None,
1598 medallion_layers: Vec::new(),
1599 scd_pattern: None,
1600 data_vault_classification: None,
1601 modeling_level: None,
1602 tags: Vec::<Tag>::new(),
1603 odcl_metadata: HashMap::new(),
1604 owner: None,
1605 sla: None,
1606 contact_details: None,
1607 infrastructure_type: None,
1608 notes: None,
1609 position: None,
1610 yaml_file_path: None,
1611 drawio_cell_id: None,
1612 quality: quality_rules,
1613 errors: Vec::new(),
1614 created_at: chrono::Utc::now(),
1615 updated_at: chrono::Utc::now(),
1616 };
1617 return Ok((table, errors));
1618 }
1619 Ok(s) => s,
1620 Err(_) => {
1621 let quality_rules = self.extract_quality_rules(data);
1622 let table_uuid = self.extract_table_uuid(data);
1623 let table = Table {
1624 id: table_uuid,
1625 name: table_name,
1626 columns: Vec::new(),
1627 database_type: None,
1628 catalog_name: None,
1629 schema_name: None,
1630 medallion_layers: Vec::new(),
1631 scd_pattern: None,
1632 data_vault_classification: None,
1633 modeling_level: None,
1634 tags: Vec::<Tag>::new(),
1635 odcl_metadata: HashMap::new(),
1636 owner: None,
1637 sla: None,
1638 contact_details: None,
1639 infrastructure_type: None,
1640 notes: None,
1641 position: None,
1642 yaml_file_path: None,
1643 drawio_cell_id: None,
1644 quality: quality_rules,
1645 errors: Vec::new(),
1646 created_at: chrono::Utc::now(),
1647 updated_at: chrono::Utc::now(),
1648 };
1649 return Ok((table, errors));
1650 }
1651 };
1652
1653 let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
1655 errors.push(ParserError {
1656 error_type: "validation_error".to_string(),
1657 field: "schema[0]".to_string(),
1658 message: "First schema object must be a dictionary".to_string(),
1659 });
1660 anyhow::anyhow!("Invalid schema object")
1661 })?;
1662
1663 let object_name = schema_object
1664 .get("name")
1665 .and_then(|v| v.as_str())
1666 .unwrap_or(&table_name);
1667
1668 let mut columns = Vec::new();
1670
1671 if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
1672 for (prop_name, prop_data) in properties_obj {
1674 if let Some(prop_obj) = prop_data.as_object() {
1675 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
1676 Ok(mut cols) => columns.append(&mut cols),
1677 Err(e) => {
1678 errors.push(ParserError {
1679 error_type: "property_parse_error".to_string(),
1680 field: format!("Property '{}'", prop_name),
1681 message: e.to_string(),
1682 });
1683 }
1684 }
1685 } else {
1686 errors.push(ParserError {
1687 error_type: "validation_error".to_string(),
1688 field: format!("Property '{}'", prop_name),
1689 message: format!("Property '{}' must be an object", prop_name),
1690 });
1691 }
1692 }
1693 } else if let Some(properties_arr) =
1694 schema_object.get("properties").and_then(|v| v.as_array())
1695 {
1696 for (idx, prop_data) in properties_arr.iter().enumerate() {
1698 if let Some(prop_obj) = prop_data.as_object() {
1699 let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
1702 Some(JsonValue::String(s)) => s.as_str(),
1703 _ => {
1704 errors.push(ParserError {
1706 error_type: "validation_error".to_string(),
1707 field: format!("Property[{}]", idx),
1708 message: format!(
1709 "Property[{}] missing required 'name' or 'id' field",
1710 idx
1711 ),
1712 });
1713 continue;
1714 }
1715 };
1716
1717 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
1718 Ok(mut cols) => columns.append(&mut cols),
1719 Err(e) => {
1720 errors.push(ParserError {
1721 error_type: "property_parse_error".to_string(),
1722 field: format!("Property[{}] '{}'", idx, prop_name),
1723 message: e.to_string(),
1724 });
1725 }
1726 }
1727 } else {
1728 errors.push(ParserError {
1729 error_type: "validation_error".to_string(),
1730 field: format!("Property[{}]", idx),
1731 message: format!("Property[{}] must be an object", idx),
1732 });
1733 }
1734 }
1735 } else {
1736 errors.push(ParserError {
1737 error_type: "validation_error".to_string(),
1738 field: format!("Object '{}'", object_name),
1739 message: format!(
1740 "Object '{}' missing 'properties' field or properties is invalid",
1741 object_name
1742 ),
1743 });
1744 }
1745
1746 let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
1748 _,
1749 _,
1750 _,
1751 Vec<Tag>,
1752 ) = self.extract_metadata_from_custom_properties(data);
1753
1754 let mut shared_domains: Vec<String> = Vec::new();
1756 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1757 for prop in custom_props {
1758 if let Some(prop_obj) = prop.as_object() {
1759 let prop_key = prop_obj
1760 .get("property")
1761 .and_then(|v| v.as_str())
1762 .unwrap_or("");
1763 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1764 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1765 {
1766 for item in arr {
1767 if let Some(s) = item.as_str() {
1768 shared_domains.push(s.to_string());
1769 }
1770 }
1771 }
1772 }
1773 }
1774 }
1775
1776 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1778 for item in tags_arr {
1779 if let Some(s) = item.as_str() {
1780 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1782 if !tags.contains(&tag) {
1783 tags.push(tag);
1784 }
1785 }
1786 }
1787 }
1788
1789 let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1791
1792 let quality_rules = self.extract_quality_rules(data);
1794
1795 let mut odcl_metadata = HashMap::new();
1797 odcl_metadata.insert(
1798 "apiVersion".to_string(),
1799 json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
1800 );
1801 odcl_metadata.insert(
1802 "kind".to_string(),
1803 json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
1804 );
1805 odcl_metadata.insert(
1806 "id".to_string(),
1807 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1808 );
1809 odcl_metadata.insert(
1810 "version".to_string(),
1811 json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
1812 );
1813 odcl_metadata.insert(
1814 "status".to_string(),
1815 json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
1816 );
1817
1818 if let Some(servicelevels_val) = data.get("servicelevels") {
1820 odcl_metadata.insert(
1821 "servicelevels".to_string(),
1822 json_value_to_serde_value(servicelevels_val),
1823 );
1824 }
1825
1826 if let Some(links_val) = data.get("links") {
1828 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1829 }
1830
1831 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1833 odcl_metadata.insert(
1834 "domain".to_string(),
1835 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1836 );
1837 }
1838 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1839 odcl_metadata.insert(
1840 "dataProduct".to_string(),
1841 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1842 );
1843 }
1844 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1845 odcl_metadata.insert(
1846 "tenant".to_string(),
1847 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1848 );
1849 }
1850
1851 if let Some(desc_val) = data.get("description") {
1853 odcl_metadata.insert(
1854 "description".to_string(),
1855 json_value_to_serde_value(desc_val),
1856 );
1857 }
1858
1859 if let Some(pricing_val) = data.get("pricing") {
1861 odcl_metadata.insert(
1862 "pricing".to_string(),
1863 json_value_to_serde_value(pricing_val),
1864 );
1865 }
1866
1867 if let Some(team_val) = data.get("team") {
1869 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1870 }
1871
1872 if let Some(roles_val) = data.get("roles") {
1874 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1875 }
1876
1877 if let Some(terms_val) = data.get("terms") {
1879 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1880 }
1881
1882 if let Some(servers_val) = data.get("servers") {
1884 odcl_metadata.insert(
1885 "servers".to_string(),
1886 json_value_to_serde_value(servers_val),
1887 );
1888 }
1889
1890 if let Some(infrastructure_val) = data.get("infrastructure") {
1892 odcl_metadata.insert(
1893 "infrastructure".to_string(),
1894 json_value_to_serde_value(infrastructure_val),
1895 );
1896 }
1897
1898 if !shared_domains.is_empty() {
1900 let shared_domains_json: Vec<serde_json::Value> = shared_domains
1901 .iter()
1902 .map(|d| serde_json::Value::String(d.clone()))
1903 .collect();
1904 odcl_metadata.insert(
1905 "sharedDomains".to_string(),
1906 serde_json::Value::Array(shared_domains_json),
1907 );
1908 }
1909
1910 let table_uuid = self.extract_table_uuid(data);
1911
1912 let table = Table {
1913 id: table_uuid,
1914 name: table_name,
1915 columns,
1916 database_type,
1917 catalog_name: None,
1918 schema_name: None,
1919 medallion_layers,
1920 scd_pattern,
1921 data_vault_classification,
1922 modeling_level: None,
1923 tags,
1924 odcl_metadata,
1925 owner: None,
1926 sla: None,
1927 contact_details: None,
1928 infrastructure_type: None,
1929 notes: None,
1930 position: None,
1931 yaml_file_path: None,
1932 drawio_cell_id: None,
1933 quality: quality_rules,
1934 errors: Vec::new(),
1935 created_at: chrono::Utc::now(),
1936 updated_at: chrono::Utc::now(),
1937 };
1938
1939 info!(
1940 "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1941 table.name,
1942 errors.len()
1943 );
1944 Ok((table, errors))
1945 }
1946
1947 fn parse_odcl_v3_all(&self, data: &JsonValue) -> Result<Vec<(Table, Vec<ParserError>)>> {
1956 let mut all_tables = Vec::new();
1957
1958 let schema = match data.get("schema").and_then(|v| v.as_array()) {
1960 Some(s) if !s.is_empty() => s,
1961 Some(_) => {
1962 return Ok(Vec::new());
1964 }
1965 None => {
1966 return Ok(Vec::new());
1968 }
1969 };
1970
1971 let (medallion_layers, scd_pattern, data_vault_classification, contract_tags): (
1973 _,
1974 _,
1975 _,
1976 Vec<Tag>,
1977 ) = self.extract_metadata_from_custom_properties(data);
1978
1979 let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1981
1982 let contract_quality_rules = self.extract_quality_rules(data);
1984
1985 let base_odcl_metadata = self.build_odcl_metadata(data);
1987
1988 for (schema_idx, schema_value) in schema.iter().enumerate() {
1990 let mut errors = Vec::new();
1991
1992 let schema_object = match schema_value.as_object() {
1993 Some(obj) => obj,
1994 None => {
1995 errors.push(ParserError {
1996 error_type: "validation_error".to_string(),
1997 field: format!("schema[{}]", schema_idx),
1998 message: format!(
1999 "Schema object at index {} must be a dictionary",
2000 schema_idx
2001 ),
2002 });
2003 continue;
2004 }
2005 };
2006
2007 let table_name = match schema_object.get("name").and_then(|v| v.as_str()) {
2009 Some(name) => name.to_string(),
2010 None => {
2011 errors.push(ParserError {
2012 error_type: "validation_error".to_string(),
2013 field: format!("schema[{}].name", schema_idx),
2014 message: format!(
2015 "Schema object at index {} missing 'name' field",
2016 schema_idx
2017 ),
2018 });
2019 continue;
2020 }
2021 };
2022
2023 let columns = self.parse_schema_object_properties(schema_object, data, &mut errors);
2025
2026 let mut tags = contract_tags.clone();
2028 if let Some(tags_arr) = schema_object.get("tags").and_then(|v| v.as_array()) {
2029 for item in tags_arr {
2030 if let Some(s) = item.as_str() {
2031 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
2032 if !tags.contains(&tag) {
2033 tags.push(tag);
2034 }
2035 }
2036 }
2037 }
2038
2039 let mut quality_rules = contract_quality_rules.clone();
2041 if let Some(quality_arr) = schema_object.get("quality").and_then(|v| v.as_array()) {
2042 for rule in quality_arr {
2043 if let Some(rule_obj) = rule.as_object() {
2044 let mut rule_map = HashMap::new();
2045 for (k, v) in rule_obj {
2046 rule_map.insert(k.clone(), json_value_to_serde_value(v));
2047 }
2048 quality_rules.push(rule_map);
2049 }
2050 }
2051 }
2052
2053 let table_uuid = if let Some(id_str) = schema_object.get("id").and_then(|v| v.as_str())
2056 {
2057 uuid::Uuid::parse_str(id_str)
2058 .unwrap_or_else(|_| Table::generate_id(&table_name, None, None, None))
2059 } else {
2060 Table::generate_id(&table_name, None, None, None)
2062 };
2063
2064 let mut odcl_metadata = base_odcl_metadata.clone();
2066
2067 if let Some(desc) = schema_object.get("description") {
2069 odcl_metadata.insert(
2070 "schemaDescription".to_string(),
2071 json_value_to_serde_value(desc),
2072 );
2073 }
2074
2075 if let Some(phys_name) = schema_object.get("physicalName").and_then(|v| v.as_str()) {
2077 odcl_metadata.insert(
2078 "schemaPhysicalName".to_string(),
2079 serde_json::Value::String(phys_name.to_string()),
2080 );
2081 }
2082
2083 if let Some(phys_type) = schema_object.get("physicalType").and_then(|v| v.as_str()) {
2085 odcl_metadata.insert(
2086 "schemaPhysicalType".to_string(),
2087 serde_json::Value::String(phys_type.to_string()),
2088 );
2089 }
2090
2091 if let Some(biz_name) = schema_object.get("businessName").and_then(|v| v.as_str()) {
2093 odcl_metadata.insert(
2094 "schemaBusinessName".to_string(),
2095 serde_json::Value::String(biz_name.to_string()),
2096 );
2097 }
2098
2099 if let Some(granularity) = schema_object
2101 .get("dataGranularityDescription")
2102 .and_then(|v| v.as_str())
2103 {
2104 odcl_metadata.insert(
2105 "dataGranularityDescription".to_string(),
2106 serde_json::Value::String(granularity.to_string()),
2107 );
2108 }
2109
2110 if let Some(auth_defs) = schema_object.get("authoritativeDefinitions") {
2112 odcl_metadata.insert(
2113 "schemaAuthoritativeDefinitions".to_string(),
2114 json_value_to_serde_value(auth_defs),
2115 );
2116 }
2117
2118 if let Some(relationships) = schema_object.get("relationships") {
2120 odcl_metadata.insert(
2121 "schemaRelationships".to_string(),
2122 json_value_to_serde_value(relationships),
2123 );
2124 }
2125
2126 if let Some(custom_props) = schema_object.get("customProperties") {
2128 odcl_metadata.insert(
2129 "schemaCustomProperties".to_string(),
2130 json_value_to_serde_value(custom_props),
2131 );
2132 }
2133
2134 let table = Table {
2135 id: table_uuid,
2136 name: table_name.clone(),
2137 columns,
2138 database_type,
2139 catalog_name: None,
2140 schema_name: None,
2141 medallion_layers: medallion_layers.clone(),
2142 scd_pattern,
2143 data_vault_classification,
2144 modeling_level: None,
2145 tags,
2146 odcl_metadata,
2147 owner: None,
2148 sla: None,
2149 contact_details: None,
2150 infrastructure_type: None,
2151 notes: None,
2152 position: None,
2153 yaml_file_path: None,
2154 drawio_cell_id: None,
2155 quality: quality_rules,
2156 errors: Vec::new(),
2157 created_at: chrono::Utc::now(),
2158 updated_at: chrono::Utc::now(),
2159 };
2160
2161 info!(
2162 "Parsed ODCS v3 table {}/{}: {} with {} columns, {} warnings/errors",
2163 schema_idx + 1,
2164 schema.len(),
2165 table.name,
2166 table.columns.len(),
2167 errors.len()
2168 );
2169
2170 all_tables.push((table, errors));
2171 }
2172
2173 Ok(all_tables)
2174 }
2175
2176 fn parse_schema_object_properties(
2178 &self,
2179 schema_object: &serde_json::Map<String, JsonValue>,
2180 data: &JsonValue,
2181 errors: &mut Vec<ParserError>,
2182 ) -> Vec<Column> {
2183 let mut columns = Vec::new();
2184
2185 if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
2186 for (prop_name, prop_data) in properties_obj {
2188 if let Some(prop_obj) = prop_data.as_object() {
2189 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
2190 Ok(mut cols) => columns.append(&mut cols),
2191 Err(e) => {
2192 errors.push(ParserError {
2193 error_type: "property_parse_error".to_string(),
2194 field: format!("Property '{}'", prop_name),
2195 message: e.to_string(),
2196 });
2197 }
2198 }
2199 } else {
2200 errors.push(ParserError {
2201 error_type: "validation_error".to_string(),
2202 field: format!("Property '{}'", prop_name),
2203 message: format!("Property '{}' must be an object", prop_name),
2204 });
2205 }
2206 }
2207 } else if let Some(properties_arr) =
2208 schema_object.get("properties").and_then(|v| v.as_array())
2209 {
2210 for (idx, prop_data) in properties_arr.iter().enumerate() {
2212 if let Some(prop_obj) = prop_data.as_object() {
2213 let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
2214 Some(JsonValue::String(s)) => s.as_str(),
2215 _ => {
2216 errors.push(ParserError {
2217 error_type: "validation_error".to_string(),
2218 field: format!("Property[{}]", idx),
2219 message: format!(
2220 "Property[{}] missing required 'name' or 'id' field",
2221 idx
2222 ),
2223 });
2224 continue;
2225 }
2226 };
2227
2228 match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
2229 Ok(mut cols) => columns.append(&mut cols),
2230 Err(e) => {
2231 errors.push(ParserError {
2232 error_type: "property_parse_error".to_string(),
2233 field: format!("Property[{}] '{}'", idx, prop_name),
2234 message: e.to_string(),
2235 });
2236 }
2237 }
2238 } else {
2239 errors.push(ParserError {
2240 error_type: "validation_error".to_string(),
2241 field: format!("Property[{}]", idx),
2242 message: format!("Property[{}] must be an object", idx),
2243 });
2244 }
2245 }
2246 }
2247
2248 columns
2249 }
2250
2251 fn build_odcl_metadata(&self, data: &JsonValue) -> HashMap<String, serde_json::Value> {
2253 let mut odcl_metadata = HashMap::new();
2254
2255 odcl_metadata.insert(
2257 "apiVersion".to_string(),
2258 json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
2259 );
2260 odcl_metadata.insert(
2261 "kind".to_string(),
2262 json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
2263 );
2264 odcl_metadata.insert(
2265 "id".to_string(),
2266 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
2267 );
2268 odcl_metadata.insert(
2269 "version".to_string(),
2270 json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
2271 );
2272 odcl_metadata.insert(
2273 "status".to_string(),
2274 json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
2275 );
2276
2277 let optional_fields = [
2279 "servicelevels",
2280 "links",
2281 "domain",
2282 "dataProduct",
2283 "tenant",
2284 "description",
2285 "pricing",
2286 "team",
2287 "roles",
2288 "terms",
2289 "servers",
2290 "infrastructure",
2291 ];
2292
2293 for field in optional_fields {
2294 if let Some(val) = data.get(field) {
2295 odcl_metadata.insert(field.to_string(), json_value_to_serde_value(val));
2296 }
2297 }
2298
2299 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2301 for prop in custom_props {
2302 if let Some(prop_obj) = prop.as_object() {
2303 let prop_key = prop_obj
2304 .get("property")
2305 .and_then(|v| v.as_str())
2306 .unwrap_or("");
2307 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
2308 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
2309 {
2310 let shared_domains: Vec<serde_json::Value> = arr
2311 .iter()
2312 .filter_map(|item| {
2313 item.as_str()
2314 .map(|s| serde_json::Value::String(s.to_string()))
2315 })
2316 .collect();
2317 if !shared_domains.is_empty() {
2318 odcl_metadata.insert(
2319 "sharedDomains".to_string(),
2320 serde_json::Value::Array(shared_domains),
2321 );
2322 }
2323 }
2324 }
2325 }
2326 }
2327
2328 odcl_metadata
2329 }
2330
2331 fn parse_odcl_v3_property(
2333 &self,
2334 prop_name: &str,
2335 prop_data: &serde_json::Map<String, JsonValue>,
2336 data: &JsonValue,
2337 ) -> Result<Vec<Column>> {
2338 let mut errors = Vec::new();
2340 self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
2341 }
2342
2343 fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
2346 if let Some(id_val) = data.get("id")
2348 && let Some(id_str) = id_val.as_str()
2349 {
2350 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2351 tracing::debug!(
2352 "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
2353 uuid
2354 );
2355 return uuid;
2356 } else {
2357 tracing::warn!(
2358 "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
2359 id_str
2360 );
2361 }
2362 }
2363
2364 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2366 for prop in custom_props {
2367 if let Some(prop_obj) = prop.as_object() {
2368 let prop_key = prop_obj
2369 .get("property")
2370 .and_then(|v| v.as_str())
2371 .unwrap_or("");
2372 if prop_key == "tableUuid"
2373 && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
2374 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
2375 {
2376 tracing::debug!(
2377 "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
2378 uuid
2379 );
2380 return uuid;
2381 }
2382 }
2383 }
2384 }
2385
2386 if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
2388 && let Some(uuid_val) = metadata.get("tableUuid")
2389 && let Some(uuid_str) = uuid_val.as_str()
2390 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
2391 {
2392 tracing::debug!(
2393 "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
2394 uuid
2395 );
2396 return uuid;
2397 }
2398
2399 let table_name = data
2401 .get("name")
2402 .and_then(|v| v.as_str())
2403 .unwrap_or("unknown");
2404 let new_uuid = crate::models::table::Table::generate_id(
2405 table_name, None, None, None, );
2409 tracing::warn!(
2410 "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
2411 table_name,
2412 new_uuid
2413 );
2414 new_uuid
2415 }
2416
2417 fn extract_metadata_from_custom_properties(
2419 &self,
2420 data: &JsonValue,
2421 ) -> (
2422 Vec<MedallionLayer>,
2423 Option<SCDPattern>,
2424 Option<DataVaultClassification>,
2425 Vec<Tag>,
2426 ) {
2427 let mut medallion_layers = Vec::new();
2428 let mut scd_pattern = None;
2429 let mut data_vault_classification = None;
2430 let mut tags: Vec<Tag> = Vec::new();
2431
2432 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2433 for prop in custom_props {
2434 if let Some(prop_obj) = prop.as_object() {
2435 let prop_key = prop_obj
2436 .get("property")
2437 .and_then(|v| v.as_str())
2438 .unwrap_or("");
2439 let prop_value = prop_obj.get("value");
2440
2441 match prop_key {
2442 "medallionLayers" | "medallion_layers" => {
2443 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
2444 for item in arr {
2445 if let Some(s) = item.as_str()
2446 && let Ok(layer) = parse_medallion_layer(s)
2447 {
2448 medallion_layers.push(layer);
2449 }
2450 }
2451 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2452 for part in s.split(',') {
2454 if let Ok(layer) = parse_medallion_layer(part.trim()) {
2455 medallion_layers.push(layer);
2456 }
2457 }
2458 }
2459 }
2460 "scdPattern" | "scd_pattern" => {
2461 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2462 scd_pattern = parse_scd_pattern(s).ok();
2463 }
2464 }
2465 "dataVaultClassification" | "data_vault_classification" => {
2466 if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2467 data_vault_classification = parse_data_vault_classification(s).ok();
2468 }
2469 }
2470 "tags" => {
2471 if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
2472 for item in arr {
2473 if let Some(s) = item.as_str() {
2474 if let Ok(tag) = Tag::from_str(s) {
2476 tags.push(tag);
2477 } else {
2478 tags.push(Tag::Simple(s.to_string()));
2479 }
2480 }
2481 }
2482 } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2483 for part in s.split(',') {
2485 let part = part.trim();
2486 if let Ok(tag) = Tag::from_str(part) {
2487 tags.push(tag);
2488 } else {
2489 tags.push(Tag::Simple(part.to_string()));
2490 }
2491 }
2492 }
2493 }
2494 "sharedDomains" | "shared_domains" => {
2495 }
2498 _ => {}
2499 }
2500 }
2501 }
2502 }
2503
2504 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
2506 for item in tags_arr {
2507 if let Some(s) = item.as_str() {
2508 let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
2510 if !tags.contains(&tag) {
2511 tags.push(tag);
2512 }
2513 }
2514 }
2515 }
2516
2517 (
2518 medallion_layers,
2519 scd_pattern,
2520 data_vault_classification,
2521 tags,
2522 )
2523 }
2524
2525 fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2527 if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
2529 && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
2530 {
2531 return server_obj
2532 .get("type")
2533 .and_then(|v| v.as_str())
2534 .and_then(|s| self.parse_database_type(s));
2535 }
2536 None
2537 }
2538
2539 fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
2541 let mut errors = Vec::new();
2542
2543 let models = data
2545 .get("models")
2546 .and_then(|v| v.as_object())
2547 .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
2548
2549 let (model_name, model_data) = models
2552 .iter()
2553 .next()
2554 .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
2555
2556 let model_data = model_data
2557 .as_object()
2558 .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
2559
2560 let fields = model_data
2562 .get("fields")
2563 .and_then(|v| v.as_object())
2564 .ok_or_else(|| {
2565 errors.push(ParserError {
2566 error_type: "validation_error".to_string(),
2567 field: format!("Model '{}'", model_name),
2568 message: format!("Model '{}' missing 'fields' field", model_name),
2569 });
2570 anyhow::anyhow!("Missing fields")
2571 });
2572
2573 let fields = match fields {
2574 Ok(f) => f,
2575 Err(_) => {
2576 let quality_rules = self.extract_quality_rules(data);
2578 let table_uuid = self.extract_table_uuid(data);
2579 let table = Table {
2580 id: table_uuid,
2581 name: model_name.clone(),
2582 columns: Vec::new(),
2583 database_type: None,
2584 catalog_name: None,
2585 schema_name: None,
2586 medallion_layers: Vec::new(),
2587 scd_pattern: None,
2588 data_vault_classification: None,
2589 modeling_level: None,
2590 tags: Vec::<Tag>::new(),
2591 odcl_metadata: HashMap::new(),
2592 owner: None,
2593 sla: None,
2594 contact_details: None,
2595 infrastructure_type: None,
2596 notes: None,
2597 position: None,
2598 yaml_file_path: None,
2599 drawio_cell_id: None,
2600 quality: quality_rules,
2601 errors: Vec::new(),
2602 created_at: chrono::Utc::now(),
2603 updated_at: chrono::Utc::now(),
2604 };
2605 return Ok((table, errors));
2606 }
2607 };
2608
2609 let mut columns = Vec::new();
2611 for (field_name, field_data) in fields {
2612 if let Some(field_obj) = field_data.as_object() {
2613 match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
2614 Ok(mut cols) => columns.append(&mut cols),
2615 Err(e) => {
2616 errors.push(ParserError {
2617 error_type: "field_parse_error".to_string(),
2618 field: format!("Field '{}'", field_name),
2619 message: e.to_string(),
2620 });
2621 }
2622 }
2623 } else {
2624 errors.push(ParserError {
2625 error_type: "validation_error".to_string(),
2626 field: format!("Field '{}'", field_name),
2627 message: format!("Field '{}' must be an object", field_name),
2628 });
2629 }
2630 }
2631
2632 let mut odcl_metadata = HashMap::new();
2635
2636 if let Some(info_val) = data.get("info") {
2639 let info_json_value = json_value_to_serde_value(info_val);
2641 odcl_metadata.insert("info".to_string(), info_json_value);
2642 }
2643
2644 odcl_metadata.insert(
2645 "dataContractSpecification".to_string(),
2646 json_value_to_serde_value(
2647 data.get("dataContractSpecification")
2648 .unwrap_or(&JsonValue::Null),
2649 ),
2650 );
2651 odcl_metadata.insert(
2652 "id".to_string(),
2653 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
2654 );
2655 if let Some(servicelevels_val) = data.get("servicelevels") {
2659 odcl_metadata.insert(
2660 "servicelevels".to_string(),
2661 json_value_to_serde_value(servicelevels_val),
2662 );
2663 }
2664
2665 if let Some(links_val) = data.get("links") {
2667 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
2668 }
2669
2670 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
2672 odcl_metadata.insert(
2673 "domain".to_string(),
2674 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
2675 );
2676 }
2677 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
2678 odcl_metadata.insert(
2679 "dataProduct".to_string(),
2680 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
2681 );
2682 }
2683 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
2684 odcl_metadata.insert(
2685 "tenant".to_string(),
2686 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
2687 );
2688 }
2689
2690 if let Some(desc_val) = data.get("description") {
2692 odcl_metadata.insert(
2693 "description".to_string(),
2694 json_value_to_serde_value(desc_val),
2695 );
2696 }
2697
2698 if let Some(pricing_val) = data.get("pricing") {
2700 odcl_metadata.insert(
2701 "pricing".to_string(),
2702 json_value_to_serde_value(pricing_val),
2703 );
2704 }
2705
2706 if let Some(team_val) = data.get("team") {
2708 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
2709 }
2710
2711 if let Some(roles_val) = data.get("roles") {
2713 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
2714 }
2715
2716 if let Some(terms_val) = data.get("terms") {
2718 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
2719 }
2720
2721 if let Some(servers_val) = data.get("servers") {
2723 odcl_metadata.insert(
2724 "servers".to_string(),
2725 json_value_to_serde_value(servers_val),
2726 );
2727 }
2728
2729 if let Some(infrastructure_val) = data.get("infrastructure") {
2731 odcl_metadata.insert(
2732 "infrastructure".to_string(),
2733 json_value_to_serde_value(infrastructure_val),
2734 );
2735 }
2736
2737 let database_type = self.extract_database_type_from_servers(data);
2739
2740 let (catalog_name, schema_name) = self.extract_catalog_schema(data);
2742
2743 let mut shared_domains: Vec<String> = Vec::new();
2745 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2746 for prop in custom_props {
2747 if let Some(prop_obj) = prop.as_object() {
2748 let prop_key = prop_obj
2749 .get("property")
2750 .and_then(|v| v.as_str())
2751 .unwrap_or("");
2752 if (prop_key == "sharedDomains" || prop_key == "shared_domains")
2753 && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
2754 {
2755 for item in arr {
2756 if let Some(s) = item.as_str() {
2757 shared_domains.push(s.to_string());
2758 }
2759 }
2760 }
2761 }
2762 }
2763 }
2764
2765 let mut tags: Vec<Tag> = Vec::new();
2767 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
2768 for item in tags_arr {
2769 if let Some(s) = item.as_str() {
2770 if let Ok(tag) = Tag::from_str(s) {
2772 tags.push(tag);
2773 } else {
2774 tags.push(crate::models::Tag::Simple(s.to_string()));
2776 }
2777 }
2778 }
2779 }
2780
2781 let quality_rules = self.extract_quality_rules(data);
2783
2784 if !shared_domains.is_empty() {
2786 let shared_domains_json: Vec<serde_json::Value> = shared_domains
2787 .iter()
2788 .map(|d| serde_json::Value::String(d.clone()))
2789 .collect();
2790 odcl_metadata.insert(
2791 "sharedDomains".to_string(),
2792 serde_json::Value::Array(shared_domains_json),
2793 );
2794 }
2795
2796 let table_uuid = self.extract_table_uuid(data);
2797
2798 let table = Table {
2799 id: table_uuid,
2800 name: model_name.clone(),
2801 columns,
2802 database_type,
2803 catalog_name,
2804 schema_name,
2805 medallion_layers: Vec::new(),
2806 scd_pattern: None,
2807 data_vault_classification: None,
2808 modeling_level: None,
2809 tags,
2810 odcl_metadata,
2811 owner: None,
2812 sla: None,
2813 contact_details: None,
2814 infrastructure_type: None,
2815 notes: None,
2816 position: None,
2817 yaml_file_path: None,
2818 drawio_cell_id: None,
2819 quality: quality_rules,
2820 errors: Vec::new(),
2821 created_at: chrono::Utc::now(),
2822 updated_at: chrono::Utc::now(),
2823 };
2824
2825 info!(
2826 "Parsed Data Contract table: {} with {} warnings/errors",
2827 model_name,
2828 errors.len()
2829 );
2830 Ok((table, errors))
2831 }
2832
2833 fn parse_data_contract_field(
2835 &self,
2836 field_name: &str,
2837 field_data: &serde_json::Map<String, JsonValue>,
2838 data: &JsonValue,
2839 errors: &mut Vec<ParserError>,
2840 ) -> Result<Vec<Column>> {
2841 let mut columns = Vec::new();
2842
2843 let extract_quality_from_obj =
2845 |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
2846 let mut quality_rules = Vec::new();
2847 if let Some(quality_val) = obj.get("quality") {
2848 if let Some(arr) = quality_val.as_array() {
2849 for item in arr {
2851 if let Some(rule_obj) = item.as_object() {
2852 let mut rule = HashMap::new();
2853 for (key, value) in rule_obj {
2854 rule.insert(key.clone(), json_value_to_serde_value(value));
2855 }
2856 quality_rules.push(rule);
2857 }
2858 }
2859 } else if let Some(rule_obj) = quality_val.as_object() {
2860 let mut rule = HashMap::new();
2862 for (key, value) in rule_obj {
2863 rule.insert(key.clone(), json_value_to_serde_value(value));
2864 }
2865 quality_rules.push(rule);
2866 }
2867 }
2868 quality_rules
2869 };
2870
2871 let description = field_data
2873 .get("description")
2874 .and_then(|v| v.as_str())
2875 .unwrap_or("")
2876 .to_string();
2877
2878 let mut quality_rules = extract_quality_from_obj(field_data);
2880
2881 if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
2883 let ref_path = Some(ref_str.to_string());
2885
2886 if let Some(definition) = resolve_ref(ref_str, data) {
2887 if quality_rules.is_empty() {
2891 if let Some(def_obj) = definition.as_object() {
2892 quality_rules = extract_quality_from_obj(def_obj);
2893 }
2894 } else {
2895 if let Some(def_obj) = definition.as_object() {
2897 let def_quality = extract_quality_from_obj(def_obj);
2898 quality_rules.extend(def_quality);
2900 }
2901 }
2902
2903 let required = field_data
2904 .get("required")
2905 .and_then(|v| v.as_bool())
2906 .unwrap_or(false);
2907
2908 let has_nested = definition
2914 .get("type")
2915 .and_then(|v| v.as_str())
2916 .map(|s| s == "object")
2917 .unwrap_or(false)
2918 || definition.get("properties").is_some()
2919 || definition.get("fields").is_some();
2920
2921 if has_nested {
2922 if let Some(properties) =
2924 definition.get("properties").and_then(|v| v.as_object())
2925 {
2926 let nested_required: Vec<String> = definition
2928 .get("required")
2929 .and_then(|v| v.as_array())
2930 .map(|arr| {
2931 arr.iter()
2932 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2933 .collect()
2934 })
2935 .unwrap_or_default();
2936
2937 for (nested_name, nested_schema) in properties {
2938 let nested_required_field = nested_required.contains(nested_name);
2939 expand_nested_column(
2940 &format!("{}.{}", field_name, nested_name),
2941 nested_schema,
2942 !nested_required_field,
2943 &mut columns,
2944 errors,
2945 );
2946 }
2947 } else if let Some(fields) =
2948 definition.get("fields").and_then(|v| v.as_object())
2949 {
2950 for (nested_name, nested_schema) in fields {
2952 expand_nested_column(
2953 &format!("{}.{}", field_name, nested_name),
2954 nested_schema,
2955 true, &mut columns,
2957 errors,
2958 );
2959 }
2960 } else {
2961 let def_physical_type = definition
2963 .get("physicalType")
2964 .and_then(|v| v.as_str())
2965 .map(|s| s.to_string());
2966 columns.push(Column {
2967 name: field_name.to_string(),
2968 data_type: "OBJECT".to_string(),
2969 physical_type: def_physical_type,
2970 nullable: !required,
2971 description: if description.is_empty() {
2972 definition
2973 .get("description")
2974 .and_then(|v| v.as_str())
2975 .unwrap_or("")
2976 .to_string()
2977 } else {
2978 description.clone()
2979 },
2980 quality: quality_rules.clone(),
2981 relationships: ref_to_relationships(&ref_path),
2982 ..Default::default()
2983 });
2984 }
2985 } else {
2986 let def_type = definition
2988 .get("type")
2989 .and_then(|v| v.as_str())
2990 .unwrap_or("STRING")
2991 .to_uppercase();
2992
2993 let enum_values = definition
2994 .get("enum")
2995 .and_then(|v| v.as_array())
2996 .map(|arr| {
2997 arr.iter()
2998 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2999 .collect()
3000 })
3001 .unwrap_or_default();
3002
3003 let def_physical_type = definition
3004 .get("physicalType")
3005 .and_then(|v| v.as_str())
3006 .map(|s| s.to_string());
3007
3008 columns.push(Column {
3009 name: field_name.to_string(),
3010 data_type: def_type,
3011 physical_type: def_physical_type,
3012 nullable: !required,
3013 description: if description.is_empty() {
3014 definition
3015 .get("description")
3016 .and_then(|v| v.as_str())
3017 .unwrap_or("")
3018 .to_string()
3019 } else {
3020 description
3021 },
3022 quality: quality_rules,
3023 relationships: ref_to_relationships(&ref_path),
3024 enum_values,
3025 ..Default::default()
3026 });
3027 }
3028 return Ok(columns);
3029 } else {
3030 let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
3032 let mut error_map = HashMap::new();
3033 error_map.insert("type".to_string(), serde_json::json!("validation_error"));
3034 error_map.insert("field".to_string(), serde_json::json!("data_type"));
3035 error_map.insert(
3036 "message".to_string(),
3037 serde_json::json!(format!(
3038 "Field '{}' references undefined definition: {}",
3039 field_name, ref_str
3040 )),
3041 );
3042 col_errors.push(error_map);
3043 let field_physical_type = field_data
3044 .get("physicalType")
3045 .and_then(|v| v.as_str())
3046 .map(|s| s.to_string());
3047 columns.push(Column {
3048 name: field_name.to_string(),
3049 data_type: "OBJECT".to_string(),
3050 physical_type: field_physical_type,
3051 description,
3052 errors: col_errors,
3053 relationships: ref_to_relationships(&Some(ref_str.to_string())),
3054 ..Default::default()
3055 });
3056 return Ok(columns);
3057 }
3058 }
3059
3060 let field_type_str = field_data
3063 .get("logicalType")
3064 .and_then(|v| v.as_str())
3065 .or_else(|| field_data.get("type").and_then(|v| v.as_str()))
3066 .unwrap_or("STRING");
3067
3068 if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
3070 match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
3071 Ok(nested_cols) if !nested_cols.is_empty() => {
3072 let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
3074 "ARRAY<STRUCT<...>>".to_string()
3075 } else {
3076 "STRUCT<...>".to_string()
3077 };
3078
3079 let struct_physical_type = field_data
3080 .get("physicalType")
3081 .and_then(|v| v.as_str())
3082 .map(|s| s.to_string());
3083
3084 columns.push(Column {
3086 name: field_name.to_string(),
3087 data_type: parent_data_type,
3088 physical_type: struct_physical_type,
3089 nullable: !field_data
3090 .get("required")
3091 .and_then(|v| v.as_bool())
3092 .unwrap_or(false),
3093 description: description.clone(),
3094 quality: quality_rules.clone(),
3095 relationships: ref_to_relationships(
3096 &field_data
3097 .get("$ref")
3098 .and_then(|v| v.as_str())
3099 .map(|s| s.to_string()),
3100 ),
3101 ..Default::default()
3102 });
3103
3104 columns.extend(nested_cols);
3106 return Ok(columns);
3107 }
3108 Ok(_) | Err(_) => {
3109 }
3111 }
3112 }
3113
3114 let field_type = normalize_data_type(field_type_str);
3115
3116 if field_type == "ARRAY" {
3118 let items = field_data.get("items");
3119 if let Some(items_val) = items {
3120 if let Some(items_obj) = items_val.as_object() {
3121 let items_type = items_obj
3124 .get("logicalType")
3125 .and_then(|v| v.as_str())
3126 .or_else(|| items_obj.get("type").and_then(|v| v.as_str()));
3127
3128 let normalized_items_type = match items_type {
3130 Some("object") | Some("struct") => Some("object"),
3131 Some("array") => Some("array"),
3132 Some("string") | Some("varchar") | Some("char") | Some("text") => {
3133 Some("string")
3134 }
3135 Some("integer") | Some("int") | Some("bigint") | Some("smallint")
3136 | Some("tinyint") => Some("integer"),
3137 Some("number") | Some("decimal") | Some("double") | Some("float")
3138 | Some("numeric") => Some("number"),
3139 Some("boolean") | Some("bool") => Some("boolean"),
3140 Some("date") => Some("date"),
3141 Some("timestamp") | Some("datetime") => Some("timestamp"),
3142 Some("time") => Some("time"),
3143 other => other,
3144 };
3145
3146 if items_obj.get("fields").is_some()
3147 || items_obj.get("properties").is_some()
3148 || normalized_items_type == Some("object")
3149 {
3150 let array_physical_type = field_data
3152 .get("physicalType")
3153 .and_then(|v| v.as_str())
3154 .map(|s| s.to_string());
3155 columns.push(Column {
3156 name: field_name.to_string(),
3157 data_type: "ARRAY<OBJECT>".to_string(),
3158 physical_type: array_physical_type,
3159 nullable: !field_data
3160 .get("required")
3161 .and_then(|v| v.as_bool())
3162 .unwrap_or(false),
3163 description: field_data
3164 .get("description")
3165 .and_then(|v| v.as_str())
3166 .unwrap_or("")
3167 .to_string(),
3168 ..Default::default()
3169 });
3170
3171 let properties_obj =
3174 items_obj.get("properties").and_then(|v| v.as_object());
3175 let properties_arr = items_obj.get("properties").and_then(|v| v.as_array());
3176 let fields_obj = items_obj.get("fields").and_then(|v| v.as_object());
3177
3178 if let Some(fields_map) = properties_obj.or(fields_obj) {
3179 for (nested_field_name, nested_field_data) in fields_map {
3181 if let Some(nested_field_obj) = nested_field_data.as_object() {
3182 let nested_field_type = nested_field_obj
3184 .get("logicalType")
3185 .and_then(|v| v.as_str())
3186 .or_else(|| {
3187 nested_field_obj.get("type").and_then(|v| v.as_str())
3188 })
3189 .unwrap_or("STRING");
3190
3191 let nested_col_name =
3193 format!("{}.[].{}", field_name, nested_field_name);
3194 let mut local_errors = Vec::new();
3195 match self.parse_data_contract_field(
3196 &nested_col_name,
3197 nested_field_obj,
3198 data,
3199 &mut local_errors,
3200 ) {
3201 Ok(mut nested_cols) => {
3202 columns.append(&mut nested_cols);
3205 }
3206 Err(_) => {
3207 let nested_physical_type = nested_field_obj
3209 .get("physicalType")
3210 .and_then(|v| v.as_str())
3211 .map(|s| s.to_string());
3212 columns.push(Column {
3213 name: nested_col_name,
3214 data_type: nested_field_type.to_uppercase(),
3215 physical_type: nested_physical_type,
3216 nullable: !nested_field_obj
3217 .get("required")
3218 .and_then(|v| v.as_bool())
3219 .unwrap_or(false),
3220 description: nested_field_obj
3221 .get("description")
3222 .and_then(|v| v.as_str())
3223 .unwrap_or("")
3224 .to_string(),
3225 ..Default::default()
3226 });
3227 }
3228 }
3229 }
3230 }
3231 } else if let Some(properties_list) = properties_arr {
3232 let mut local_errors = Vec::new();
3234 for prop_data in properties_list {
3235 if let Some(prop_obj) = prop_data.as_object() {
3236 let nested_field_name = prop_obj
3238 .get("name")
3239 .or_else(|| prop_obj.get("id"))
3240 .and_then(|v| v.as_str())
3241 .unwrap_or("");
3242
3243 if !nested_field_name.is_empty() {
3244 let nested_col_name =
3246 format!("{}.[].{}", field_name, nested_field_name);
3247 match self.parse_data_contract_field(
3248 &nested_col_name,
3249 prop_obj,
3250 data,
3251 &mut local_errors,
3252 ) {
3253 Ok(mut nested_cols) => {
3254 columns.append(&mut nested_cols);
3257 }
3258 Err(_) => {
3259 let nested_field_type = prop_obj
3262 .get("logicalType")
3263 .and_then(|v| v.as_str())
3264 .or_else(|| {
3265 prop_obj
3266 .get("type")
3267 .and_then(|v| v.as_str())
3268 })
3269 .unwrap_or("STRING");
3270 let nested_physical_type = prop_obj
3271 .get("physicalType")
3272 .and_then(|v| v.as_str())
3273 .map(|s| s.to_string());
3274 columns.push(Column {
3275 name: nested_col_name,
3276 data_type: nested_field_type.to_uppercase(),
3277 physical_type: nested_physical_type,
3278 nullable: !prop_obj
3279 .get("required")
3280 .and_then(|v| v.as_bool())
3281 .unwrap_or(false),
3282 description: prop_obj
3283 .get("description")
3284 .and_then(|v| v.as_str())
3285 .unwrap_or("")
3286 .to_string(),
3287 ..Default::default()
3288 });
3289 }
3290 }
3291 }
3292 }
3293 }
3294 }
3295
3296 return Ok(columns);
3297 } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
3298 let array_physical_type = field_data
3300 .get("physicalType")
3301 .and_then(|v| v.as_str())
3302 .map(|s| s.to_string());
3303 columns.push(Column {
3304 name: field_name.to_string(),
3305 data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
3306 physical_type: array_physical_type,
3307 nullable: !field_data
3308 .get("required")
3309 .and_then(|v| v.as_bool())
3310 .unwrap_or(false),
3311 description: description.clone(),
3312 quality: quality_rules.clone(),
3313 relationships: ref_to_relationships(
3314 &field_data
3315 .get("$ref")
3316 .and_then(|v| v.as_str())
3317 .map(|s| s.to_string()),
3318 ),
3319 ..Default::default()
3320 });
3321 return Ok(columns);
3322 }
3323 } else if let Some(item_type_str) = items_val.as_str() {
3324 let array_physical_type = field_data
3326 .get("physicalType")
3327 .and_then(|v| v.as_str())
3328 .map(|s| s.to_string());
3329 columns.push(Column {
3330 name: field_name.to_string(),
3331 data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
3332 physical_type: array_physical_type,
3333 nullable: !field_data
3334 .get("required")
3335 .and_then(|v| v.as_bool())
3336 .unwrap_or(false),
3337 description: description.clone(),
3338 quality: quality_rules.clone(),
3339 relationships: ref_to_relationships(
3340 &field_data
3341 .get("$ref")
3342 .and_then(|v| v.as_str())
3343 .map(|s| s.to_string()),
3344 ),
3345 ..Default::default()
3346 });
3347 return Ok(columns);
3348 }
3349 }
3350 let array_physical_type = field_data
3352 .get("physicalType")
3353 .and_then(|v| v.as_str())
3354 .map(|s| s.to_string());
3355 columns.push(Column {
3356 name: field_name.to_string(),
3357 data_type: "ARRAY<STRING>".to_string(),
3358 physical_type: array_physical_type,
3359 nullable: !field_data
3360 .get("required")
3361 .and_then(|v| v.as_bool())
3362 .unwrap_or(false),
3363 description: description.clone(),
3364 quality: quality_rules.clone(),
3365 relationships: ref_to_relationships(
3366 &field_data
3367 .get("$ref")
3368 .and_then(|v| v.as_str())
3369 .map(|s| s.to_string()),
3370 ),
3371 ..Default::default()
3372 });
3373 return Ok(columns);
3374 }
3375
3376 let nested_fields_obj = field_data
3380 .get("properties")
3381 .and_then(|v| v.as_object())
3382 .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
3383 let nested_fields_arr = field_data.get("properties").and_then(|v| v.as_array());
3384
3385 if field_type == "OBJECT" && (nested_fields_obj.is_some() || nested_fields_arr.is_some()) {
3386 let parent_physical_type = field_data
3390 .get("physicalType")
3391 .and_then(|v| v.as_str())
3392 .map(|s| s.to_string());
3393
3394 columns.push(Column {
3396 name: field_name.to_string(),
3397 data_type: "OBJECT".to_string(),
3398 physical_type: parent_physical_type,
3399 nullable: !field_data
3400 .get("required")
3401 .and_then(|v| v.as_bool())
3402 .unwrap_or(false),
3403 description: description.clone(),
3404 quality: quality_rules.clone(),
3405 relationships: ref_to_relationships(
3406 &field_data
3407 .get("$ref")
3408 .and_then(|v| v.as_str())
3409 .map(|s| s.to_string()),
3410 ),
3411 ..Default::default()
3412 });
3413
3414 if let Some(fields_obj) = nested_fields_obj {
3416 for (nested_field_name, nested_field_data) in fields_obj {
3418 if let Some(nested_field_obj) = nested_field_data.as_object() {
3419 let nested_field_type = nested_field_obj
3420 .get("logicalType")
3421 .and_then(|v| v.as_str())
3422 .or_else(|| nested_field_obj.get("type").and_then(|v| v.as_str()))
3423 .unwrap_or("STRING");
3424
3425 let nested_col_name = format!("{}.{}", field_name, nested_field_name);
3427 match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data)
3428 {
3429 Ok(mut nested_cols) => {
3430 columns.append(&mut nested_cols);
3433 }
3434 Err(_) => {
3435 let nested_physical_type = nested_field_obj
3437 .get("physicalType")
3438 .and_then(|v| v.as_str())
3439 .map(|s| s.to_string());
3440 columns.push(Column {
3441 name: nested_col_name,
3442 data_type: nested_field_type.to_uppercase(),
3443 physical_type: nested_physical_type,
3444 nullable: !nested_field_obj
3445 .get("required")
3446 .and_then(|v| v.as_bool())
3447 .unwrap_or(false),
3448 description: nested_field_obj
3449 .get("description")
3450 .and_then(|v| v.as_str())
3451 .unwrap_or("")
3452 .to_string(),
3453 ..Default::default()
3454 });
3455 }
3456 }
3457 }
3458 }
3459 } else if let Some(fields_arr) = nested_fields_arr {
3460 for prop_data in fields_arr {
3462 if let Some(prop_obj) = prop_data.as_object() {
3463 let nested_field_name = prop_obj
3465 .get("name")
3466 .or_else(|| prop_obj.get("id"))
3467 .and_then(|v| v.as_str())
3468 .unwrap_or("");
3469
3470 if !nested_field_name.is_empty() {
3471 let nested_field_type = prop_obj
3472 .get("logicalType")
3473 .and_then(|v| v.as_str())
3474 .or_else(|| prop_obj.get("type").and_then(|v| v.as_str()))
3475 .unwrap_or("STRING");
3476
3477 let nested_col_name = format!("{}.{}", field_name, nested_field_name);
3479 match self.parse_odcl_v3_property(&nested_col_name, prop_obj, data) {
3480 Ok(mut nested_cols) => {
3481 columns.append(&mut nested_cols);
3484 }
3485 Err(_) => {
3486 let nested_physical_type = prop_obj
3488 .get("physicalType")
3489 .and_then(|v| v.as_str())
3490 .map(|s| s.to_string());
3491 columns.push(Column {
3492 name: nested_col_name,
3493 data_type: nested_field_type.to_uppercase(),
3494 physical_type: nested_physical_type,
3495 nullable: !prop_obj
3496 .get("required")
3497 .and_then(|v| v.as_bool())
3498 .unwrap_or(false),
3499 description: prop_obj
3500 .get("description")
3501 .and_then(|v| v.as_str())
3502 .unwrap_or("")
3503 .to_string(),
3504 ..Default::default()
3505 });
3506 }
3507 }
3508 }
3509 }
3510 }
3511 }
3512
3513 return Ok(columns);
3514 }
3515
3516 let required = field_data
3518 .get("required")
3519 .and_then(|v| v.as_bool())
3520 .unwrap_or(false);
3521
3522 let field_description = if description.is_empty() {
3524 field_data
3525 .get("description")
3526 .and_then(|v| v.as_str())
3527 .unwrap_or("")
3528 .to_string()
3529 } else {
3530 description
3531 };
3532
3533 let mut column_quality_rules = quality_rules;
3535
3536 if column_quality_rules.is_empty()
3538 && let Some(quality_val) = field_data.get("quality")
3539 {
3540 if let Some(arr) = quality_val.as_array() {
3541 for item in arr {
3543 if let Some(obj) = item.as_object() {
3544 let mut rule = HashMap::new();
3545 for (key, value) in obj {
3546 rule.insert(key.clone(), json_value_to_serde_value(value));
3547 }
3548 column_quality_rules.push(rule);
3549 }
3550 }
3551 } else if let Some(obj) = quality_val.as_object() {
3552 let mut rule = HashMap::new();
3554 for (key, value) in obj {
3555 rule.insert(key.clone(), json_value_to_serde_value(value));
3556 }
3557 column_quality_rules.push(rule);
3558 }
3559 }
3560
3561 let physical_type = field_data
3567 .get("physicalType")
3568 .and_then(|v| v.as_str())
3569 .map(|s| s.to_string());
3570
3571 let column = self.parse_column_metadata_from_field(
3573 field_name,
3574 &field_type,
3575 physical_type,
3576 !required,
3577 field_description,
3578 column_quality_rules,
3579 field_data,
3580 );
3581
3582 columns.push(column);
3583
3584 Ok(columns)
3585 }
3586
3587 #[allow(clippy::too_many_arguments)]
3596 fn parse_column_metadata_from_field(
3597 &self,
3598 name: &str,
3599 data_type: &str,
3600 physical_type: Option<String>,
3601 nullable: bool,
3602 description: String,
3603 quality: Vec<HashMap<String, serde_json::Value>>,
3604 field_data: &serde_json::Map<String, JsonValue>,
3605 ) -> Column {
3606 use crate::models::{AuthoritativeDefinition, LogicalTypeOptions};
3607
3608 let business_name = field_data
3610 .get("businessName")
3611 .and_then(|v| v.as_str())
3612 .map(|s| s.to_string());
3613
3614 let physical_name = field_data
3616 .get("physicalName")
3617 .and_then(|v| v.as_str())
3618 .map(|s| s.to_string());
3619
3620 let logical_type_options = field_data.get("logicalTypeOptions").and_then(|v| {
3622 v.as_object().map(|opts| LogicalTypeOptions {
3623 min_length: opts.get("minLength").and_then(|v| v.as_i64()),
3624 max_length: opts.get("maxLength").and_then(|v| v.as_i64()),
3625 pattern: opts
3626 .get("pattern")
3627 .and_then(|v| v.as_str())
3628 .map(|s| s.to_string()),
3629 format: opts
3630 .get("format")
3631 .and_then(|v| v.as_str())
3632 .map(|s| s.to_string()),
3633 minimum: opts.get("minimum").cloned(),
3634 maximum: opts.get("maximum").cloned(),
3635 exclusive_minimum: opts.get("exclusiveMinimum").cloned(),
3636 exclusive_maximum: opts.get("exclusiveMaximum").cloned(),
3637 precision: opts
3638 .get("precision")
3639 .and_then(|v| v.as_i64())
3640 .map(|n| n as i32),
3641 scale: opts.get("scale").and_then(|v| v.as_i64()).map(|n| n as i32),
3642 })
3643 });
3644
3645 let primary_key = field_data
3647 .get("primaryKey")
3648 .and_then(|v| v.as_bool())
3649 .unwrap_or(false);
3650
3651 let primary_key_position = field_data
3653 .get("primaryKeyPosition")
3654 .and_then(|v| v.as_i64())
3655 .map(|n| n as i32);
3656
3657 let unique = field_data
3659 .get("unique")
3660 .and_then(|v| v.as_bool())
3661 .unwrap_or(false);
3662
3663 let partitioned = field_data
3665 .get("partitioned")
3666 .and_then(|v| v.as_bool())
3667 .unwrap_or(false);
3668
3669 let partition_key_position = field_data
3671 .get("partitionKeyPosition")
3672 .and_then(|v| v.as_i64())
3673 .map(|n| n as i32);
3674
3675 let clustered = field_data
3677 .get("clustered")
3678 .and_then(|v| v.as_bool())
3679 .unwrap_or(false);
3680
3681 let classification = field_data
3683 .get("classification")
3684 .and_then(|v| v.as_str())
3685 .map(|s| s.to_string());
3686
3687 let critical_data_element = field_data
3689 .get("criticalDataElement")
3690 .and_then(|v| v.as_bool())
3691 .unwrap_or(false);
3692
3693 let encrypted_name = field_data
3695 .get("encryptedName")
3696 .and_then(|v| v.as_str())
3697 .map(|s| s.to_string());
3698
3699 let transform_source_objects = field_data
3701 .get("transformSourceObjects")
3702 .and_then(|v| v.as_array())
3703 .map(|arr| {
3704 arr.iter()
3705 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3706 .collect()
3707 })
3708 .unwrap_or_default();
3709
3710 let transform_logic = field_data
3712 .get("transformLogic")
3713 .and_then(|v| v.as_str())
3714 .map(|s| s.to_string());
3715
3716 let transform_description = field_data
3718 .get("transformDescription")
3719 .and_then(|v| v.as_str())
3720 .map(|s| s.to_string());
3721
3722 let examples = field_data
3724 .get("examples")
3725 .and_then(|v| v.as_array())
3726 .map(|arr| arr.to_vec())
3727 .unwrap_or_default();
3728
3729 let authoritative_definitions = field_data
3731 .get("authoritativeDefinitions")
3732 .and_then(|v| v.as_array())
3733 .map(|arr| {
3734 arr.iter()
3735 .filter_map(|item| {
3736 item.as_object().map(|obj| AuthoritativeDefinition {
3737 definition_type: obj
3738 .get("type")
3739 .and_then(|v| v.as_str())
3740 .unwrap_or("")
3741 .to_string(),
3742 url: obj
3743 .get("url")
3744 .and_then(|v| v.as_str())
3745 .unwrap_or("")
3746 .to_string(),
3747 })
3748 })
3749 .collect()
3750 })
3751 .unwrap_or_default();
3752
3753 let tags = field_data
3755 .get("tags")
3756 .and_then(|v| v.as_array())
3757 .map(|arr| {
3758 arr.iter()
3759 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3760 .collect()
3761 })
3762 .unwrap_or_default();
3763
3764 let custom_properties = field_data
3766 .get("customProperties")
3767 .and_then(|v| v.as_array())
3768 .map(|arr| {
3769 arr.iter()
3770 .filter_map(|item| {
3771 item.as_object().and_then(|obj| {
3772 let key = obj.get("property").and_then(|v| v.as_str())?;
3773 let value = obj.get("value").cloned()?;
3774 Some((key.to_string(), value))
3775 })
3776 })
3777 .collect()
3778 })
3779 .unwrap_or_default();
3780
3781 let secondary_key = field_data
3783 .get("businessKey")
3784 .and_then(|v| v.as_bool())
3785 .unwrap_or(false);
3786
3787 let enum_values = field_data
3789 .get("enum")
3790 .and_then(|v| v.as_array())
3791 .map(|arr| {
3792 arr.iter()
3793 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3794 .collect()
3795 })
3796 .unwrap_or_default();
3797
3798 let constraints = field_data
3800 .get("constraints")
3801 .and_then(|v| v.as_array())
3802 .map(|arr| {
3803 arr.iter()
3804 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3805 .collect()
3806 })
3807 .unwrap_or_default();
3808
3809 Column {
3810 name: name.to_string(),
3811 data_type: data_type.to_string(),
3812 physical_type,
3813 physical_name,
3814 nullable,
3815 description,
3816 quality,
3817 business_name,
3818 logical_type_options,
3819 primary_key,
3820 primary_key_position,
3821 unique,
3822 partitioned,
3823 partition_key_position,
3824 clustered,
3825 classification,
3826 critical_data_element,
3827 encrypted_name,
3828 transform_source_objects,
3829 transform_logic,
3830 transform_description,
3831 examples,
3832 authoritative_definitions,
3833 tags,
3834 custom_properties,
3835 secondary_key,
3836 enum_values,
3837 constraints,
3838 foreign_key: self.parse_foreign_key_from_data_contract(field_data),
3839 relationships: self.parse_relationships_from_field(field_data),
3840 ..Default::default()
3841 }
3842 }
3843
3844 fn parse_foreign_key_from_data_contract(
3846 &self,
3847 field_data: &serde_json::Map<String, JsonValue>,
3848 ) -> Option<ForeignKey> {
3849 field_data
3850 .get("foreignKey")
3851 .and_then(|v| v.as_object())
3852 .map(|fk_obj| ForeignKey {
3853 table_id: fk_obj
3854 .get("table")
3855 .or_else(|| fk_obj.get("table_id"))
3856 .and_then(|v| v.as_str())
3857 .unwrap_or("")
3858 .to_string(),
3859 column_name: fk_obj
3860 .get("column")
3861 .or_else(|| fk_obj.get("column_name"))
3862 .and_then(|v| v.as_str())
3863 .unwrap_or("")
3864 .to_string(),
3865 })
3866 }
3867
3868 fn parse_relationships_from_field(
3871 &self,
3872 field_data: &serde_json::Map<String, JsonValue>,
3873 ) -> Vec<PropertyRelationship> {
3874 let mut relationships = Vec::new();
3875
3876 if let Some(rels_array) = field_data.get("relationships").and_then(|v| v.as_array()) {
3878 for rel in rels_array {
3879 if let Some(rel_obj) = rel.as_object() {
3880 let rel_type = rel_obj
3881 .get("type")
3882 .and_then(|v| v.as_str())
3883 .unwrap_or("foreignKey")
3884 .to_string();
3885 let to = rel_obj
3886 .get("to")
3887 .and_then(|v| v.as_str())
3888 .unwrap_or("")
3889 .to_string();
3890
3891 if !to.is_empty() {
3892 relationships.push(PropertyRelationship {
3893 relationship_type: rel_type,
3894 to,
3895 });
3896 }
3897 }
3898 }
3899 }
3900
3901 if relationships.is_empty()
3903 && let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str())
3904 {
3905 let to = if ref_str.starts_with("#/definitions/") {
3907 let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
3908 format!("definitions/{}", def_path)
3909 } else if ref_str.starts_with("#/") {
3910 ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
3911 } else {
3912 ref_str.to_string()
3913 };
3914
3915 relationships.push(PropertyRelationship {
3916 relationship_type: "foreignKey".to_string(),
3917 to,
3918 });
3919 }
3920
3921 relationships
3922 }
3923
3924 fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
3926 if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
3928 if let Some((_, server_data)) = servers_obj.iter().next()
3930 && let Some(server_obj) = server_data.as_object()
3931 {
3932 return server_obj
3933 .get("type")
3934 .and_then(|v| v.as_str())
3935 .and_then(|s| self.parse_database_type(s));
3936 }
3937 } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
3938 if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
3940 return server_obj
3941 .get("type")
3942 .and_then(|v| v.as_str())
3943 .and_then(|s| self.parse_database_type(s));
3944 }
3945 }
3946 None
3947 }
3948
3949 fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
3951 match s.to_lowercase().as_str() {
3952 "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
3953 "postgres" | "postgresql" => Some(DatabaseType::Postgres),
3954 "mysql" => Some(DatabaseType::Mysql),
3955 "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
3956 "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
3957 _ => None,
3958 }
3959 }
3960
3961 pub fn parse_struct_type_from_string(
3965 &self,
3966 field_name: &str,
3967 type_str: &str,
3968 field_data: &serde_json::Map<String, JsonValue>,
3969 ) -> Result<Vec<Column>> {
3970 let mut columns = Vec::new();
3971
3972 let normalized_type = type_str
3974 .lines()
3975 .map(|line| line.trim())
3976 .filter(|line| !line.is_empty())
3977 .collect::<Vec<_>>()
3978 .join(" ");
3979
3980 let type_str_upper = normalized_type.to_uppercase();
3981
3982 let is_array = type_str_upper.starts_with("ARRAY<");
3984 let struct_start = type_str_upper.find("STRUCT<");
3985
3986 if let Some(start_pos) = struct_start {
3987 let struct_content_start = start_pos + 7; let struct_content = &normalized_type[struct_content_start..];
3991
3992 let mut depth = 1;
3995 let mut end_pos = None;
3996 for (i, ch) in struct_content.char_indices() {
3997 match ch {
3998 '<' => depth += 1,
3999 '>' => {
4000 depth -= 1;
4001 if depth == 0 {
4002 end_pos = Some(i);
4003 break;
4004 }
4005 }
4006 _ => {}
4007 }
4008 }
4009
4010 let struct_fields_str = if let Some(end) = end_pos {
4012 &struct_content[..end]
4013 } else {
4014 struct_content.trim_end_matches('>').trim()
4016 };
4017
4018 let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
4020
4021 for (nested_name, nested_type) in fields {
4025 let nested_type_upper = nested_type.to_uppercase();
4026 let nested_col_name = if is_array {
4027 format!("{}.[].{}", field_name, nested_name)
4028 } else {
4029 format!("{}.{}", field_name, nested_name)
4030 };
4031
4032 let is_nested_struct = nested_type_upper.starts_with("STRUCT<");
4035 let is_nested_array_struct = nested_type_upper.starts_with("ARRAY<STRUCT<");
4036
4037 if is_nested_struct || is_nested_array_struct {
4038 match self.parse_struct_type_from_string(
4046 &nested_col_name,
4047 &nested_type,
4048 field_data,
4049 ) {
4050 Ok(nested_cols) => {
4051 columns.extend(nested_cols);
4058 }
4059 Err(_) => {
4060 let fallback_data_type = if is_nested_array_struct {
4062 "ARRAY<STRUCT<...>>".to_string()
4063 } else {
4064 "STRUCT<...>".to_string()
4065 };
4066 let nested_physical_type = field_data
4067 .get("physicalType")
4068 .and_then(|v| v.as_str())
4069 .map(|s| s.to_string());
4070 columns.push(Column {
4071 name: nested_col_name,
4072 data_type: fallback_data_type,
4073 physical_type: nested_physical_type,
4074 nullable: !field_data
4075 .get("required")
4076 .and_then(|v| v.as_bool())
4077 .unwrap_or(false),
4078 description: field_data
4079 .get("description")
4080 .and_then(|v| v.as_str())
4081 .unwrap_or("")
4082 .to_string(),
4083 ..Default::default()
4084 });
4085 }
4086 }
4087 } else if nested_type_upper.starts_with("ARRAY<") {
4088 let nested_physical_type = field_data
4091 .get("physicalType")
4092 .and_then(|v| v.as_str())
4093 .map(|s| s.to_string());
4094 columns.push(Column {
4095 name: nested_col_name,
4096 data_type: normalize_data_type(&nested_type),
4097 physical_type: nested_physical_type,
4098 nullable: !field_data
4099 .get("required")
4100 .and_then(|v| v.as_bool())
4101 .unwrap_or(false),
4102 description: field_data
4103 .get("description")
4104 .and_then(|v| v.as_str())
4105 .unwrap_or("")
4106 .to_string(),
4107 ..Default::default()
4108 });
4109 } else {
4110 let nested_physical_type = field_data
4112 .get("physicalType")
4113 .and_then(|v| v.as_str())
4114 .map(|s| s.to_string());
4115 columns.push(Column {
4116 name: nested_col_name,
4117 data_type: normalize_data_type(&nested_type),
4118 physical_type: nested_physical_type,
4119 nullable: !field_data
4120 .get("required")
4121 .and_then(|v| v.as_bool())
4122 .unwrap_or(false),
4123 description: field_data
4124 .get("description")
4125 .and_then(|v| v.as_str())
4126 .unwrap_or("")
4127 .to_string(),
4128 ..Default::default()
4129 });
4130 }
4131 }
4132
4133 return Ok(columns);
4134 }
4135
4136 Ok(Vec::new())
4138 }
4139
4140 fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
4142 let mut fields = Vec::new();
4143 let mut current_field = String::new();
4144 let mut depth = 0;
4145 let mut in_string = false;
4146 let mut string_char = None;
4147
4148 for ch in fields_str.chars() {
4149 match ch {
4150 '\'' | '"' if !in_string || Some(ch) == string_char => {
4151 if in_string {
4152 in_string = false;
4153 string_char = None;
4154 } else {
4155 in_string = true;
4156 string_char = Some(ch);
4157 }
4158 current_field.push(ch);
4159 }
4160 '<' if !in_string => {
4161 depth += 1;
4162 current_field.push(ch);
4163 }
4164 '>' if !in_string => {
4165 depth -= 1;
4166 current_field.push(ch);
4167 }
4168 ',' if !in_string && depth == 0 => {
4169 let trimmed = current_field.trim();
4171 if !trimmed.is_empty()
4172 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
4173 {
4174 fields.push((name, type_part));
4175 }
4176 current_field.clear();
4177 }
4178 _ => {
4179 current_field.push(ch);
4180 }
4181 }
4182 }
4183
4184 let trimmed = current_field.trim();
4186 if !trimmed.is_empty()
4187 && let Some((name, type_part)) = self.parse_field_definition(trimmed)
4188 {
4189 fields.push((name, type_part));
4190 }
4191
4192 Ok(fields)
4193 }
4194
4195 fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
4197 let colon_pos = field_def.find(':')?;
4199 let name = field_def[..colon_pos].trim().to_string();
4200 let type_part = field_def[colon_pos + 1..].trim().to_string();
4201
4202 if name.is_empty() || type_part.is_empty() {
4203 return None;
4204 }
4205
4206 Some((name, type_part))
4207 }
4208
4209 fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
4211 let mut catalog_name = None;
4212 let mut schema_name = None;
4213
4214 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
4215 for prop in custom_props {
4216 if let Some(prop_obj) = prop.as_object() {
4217 let prop_key = prop_obj
4218 .get("property")
4219 .and_then(|v| v.as_str())
4220 .unwrap_or("");
4221 let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
4222
4223 match prop_key {
4224 "catalogName" | "catalog_name" => {
4225 catalog_name = prop_value.map(|s| s.to_string());
4226 }
4227 "schemaName" | "schema_name" => {
4228 schema_name = prop_value.map(|s| s.to_string());
4229 }
4230 _ => {}
4231 }
4232 }
4233 }
4234 }
4235
4236 if catalog_name.is_none() {
4238 catalog_name = data
4239 .get("catalog_name")
4240 .and_then(|v| v.as_str())
4241 .map(|s| s.to_string());
4242 }
4243 if schema_name.is_none() {
4244 schema_name = data
4245 .get("schema_name")
4246 .and_then(|v| v.as_str())
4247 .map(|s| s.to_string());
4248 }
4249
4250 (catalog_name, schema_name)
4251 }
4252}
4253
4254impl Default for ODCSImporter {
4255 fn default() -> Self {
4256 Self::new()
4257 }
4258}
4259
// Unit tests for `ODCSImporter`, covering the legacy flat ODCL layout (top-level `columns`,
// parsed via `parse`) and the ODCS v3 `schema` layout (parsed via `import`).
#[cfg(test)]
mod tests {
    use super::*;

    // Legacy ODCL document with a `columns` list and a `database_type` string: expects the
    // table name, both columns and `DatabaseType::Postgres`, with no reported errors.
    #[test]
    fn test_parse_simple_odcl_table() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
        assert_eq!(errors.len(), 0);
    }

    // ODCL metadata fields: `medallion_layer`, `scd_pattern` and a free-form `odcl_metadata`
    // map are expected to land on the corresponding `Table` fields.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
        // NOTE(review): this `if let` silently passes when `description` is missing or is not
        // a string; consider asserting its presence explicitly.
        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
            assert_eq!(desc, "User table");
        }
        assert_eq!(errors.len(), 0);
    }

    // `data_vault_classification: Hub` should map to `DataVaultClassification::Hub`.
    #[test]
    fn test_parse_odcl_with_data_vault() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: hub_customer
columns:
  - name: customer_key
    data_type: VARCHAR(50)
data_vault_classification: Hub
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(
            table.data_vault_classification,
            Some(DataVaultClassification::Hub)
        );
        assert_eq!(errors.len(), 0);
    }

    // Malformed input (chained `key: value:` on one line) must be reported as an error.
    #[test]
    fn test_parse_invalid_odcl() {
        let mut parser = ODCSImporter::new();
        let invalid_yaml = "not: valid: yaml: structure:";

        assert!(parser.parse(invalid_yaml).is_err());
    }

    // A document that has `name` but no `columns` must be rejected.
    #[test]
    fn test_parse_odcl_missing_required_fields() {
        let mut parser = ODCSImporter::new();
        let non_conformant = r#"
name: users
# Missing required columns field
"#;

        assert!(parser.parse(non_conformant).is_err());
    }

    // snake_case `foreign_key` (`table_id` / `column_name`) on a legacy ODCL column should
    // populate `Column::foreign_key`.
    #[test]
    fn test_parse_odcl_with_foreign_key() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: orders
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: user_id
    data_type: INT
    foreign_key:
      table_id: users
      column_name: id
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
        assert!(user_id_col.foreign_key.is_some());
        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
        assert_eq!(errors.len(), 0);
    }

    // A column-level `constraints` list is kept as strings, and `nullable: false` is honoured.
    #[test]
    fn test_parse_odcl_with_constraints() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: products
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
    constraints:
      - UNIQUE
      - NOT NULL
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
        assert!(!name_col.nullable);
        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
        assert_eq!(errors.len(), 0);
    }

    // ODCS v3: each entry under `schema` becomes its own table. Each table carries its own
    // column count, the contract-level `apiVersion`, `kind` and `domain`, and a
    // `table_index` matching its position in the document.
    #[test]
    fn test_parse_odcs_v3_multiple_tables() {
        let mut parser = ODCSImporter::new();
        let odcs_yaml = r#"
apiVersion: v3.1.0
kind: DataContract
id: 550e8400-e29b-41d4-a716-446655440000
version: 1.0.0
name: multi_table_contract
domain: sales
schema:
  - name: orders
    properties:
      - name: order_id
        logicalType: integer
        primaryKey: true
      - name: customer_id
        logicalType: integer
      - name: total_amount
        logicalType: decimal
  - name: order_items
    properties:
      - name: item_id
        logicalType: integer
        primaryKey: true
      - name: order_id
        logicalType: integer
      - name: product_name
        logicalType: string
      - name: quantity
        logicalType: integer
  - name: customers
    properties:
      - name: customer_id
        logicalType: integer
        primaryKey: true
      - name: name
        logicalType: string
      - name: email
        logicalType: string
"#;

        let result = parser.import(odcs_yaml).unwrap();

        // One table per `schema` entry.
        assert_eq!(
            result.tables.len(),
            3,
            "Expected 3 tables, got {}",
            result.tables.len()
        );

        // Every schema entry name must appear among the imported tables.
        let table_names: Vec<&str> = result
            .tables
            .iter()
            .map(|t| t.name.as_deref().unwrap_or(""))
            .collect();
        assert!(table_names.contains(&"orders"), "Missing 'orders' table");
        assert!(
            table_names.contains(&"order_items"),
            "Missing 'order_items' table"
        );
        assert!(
            table_names.contains(&"customers"),
            "Missing 'customers' table"
        );

        // Each table only receives the `properties` of its own schema entry.
        let orders_table = result
            .tables
            .iter()
            .find(|t| t.name.as_deref() == Some("orders"))
            .unwrap();
        assert_eq!(
            orders_table.columns.len(),
            3,
            "orders table should have 3 columns"
        );

        let order_items_table = result
            .tables
            .iter()
            .find(|t| t.name.as_deref() == Some("order_items"))
            .unwrap();
        assert_eq!(
            order_items_table.columns.len(),
            4,
            "order_items table should have 4 columns"
        );

        let customers_table = result
            .tables
            .iter()
            .find(|t| t.name.as_deref() == Some("customers"))
            .unwrap();
        assert_eq!(
            customers_table.columns.len(),
            3,
            "customers table should have 3 columns"
        );

        // Contract-level metadata is copied onto every table.
        for table in &result.tables {
            assert_eq!(table.api_version.as_deref(), Some("v3.1.0"));
            assert_eq!(table.kind.as_deref(), Some("DataContract"));
            assert_eq!(table.domain.as_deref(), Some("sales"));
        }

        // `table_index` follows document order.
        assert_eq!(result.tables[0].table_index, 0);
        assert_eq!(result.tables[1].table_index, 1);
        assert_eq!(result.tables[2].table_index, 2);
    }

    // An ODCS v3 contract with a single `schema` entry still yields exactly one table, named
    // after that entry (not after the contract), with that entry's columns.
    #[test]
    fn test_parse_odcs_v3_single_table_backwards_compatible() {
        let mut parser = ODCSImporter::new();
        let odcs_yaml = r#"
apiVersion: v3.1.0
kind: DataContract
id: 550e8400-e29b-41d4-a716-446655440001
version: 1.0.0
name: single_table_contract
schema:
  - name: users
    properties:
      - name: user_id
        logicalType: integer
        primaryKey: true
      - name: username
        logicalType: string
"#;

        let result = parser.import(odcs_yaml).unwrap();

        assert_eq!(
            result.tables.len(),
            1,
            "Expected 1 table for single-schema ODCS"
        );
        assert_eq!(result.tables[0].name.as_deref(), Some("users"));
        assert_eq!(result.tables[0].columns.len(), 2);
    }
}