1use super::odcs_shared::{
13 column_to_column_data, expand_nested_column, extract_catalog_schema, extract_quality_from_obj,
14 extract_shared_domains, json_value_to_serde_value, normalize_data_type,
15 parse_data_vault_classification, parse_foreign_key, parse_foreign_key_from_data_contract,
16 parse_medallion_layer, parse_scd_pattern, parse_struct_fields_from_string, resolve_ref,
17 yaml_to_json_value, ParserError,
18};
19use super::{ImportError, ImportResult, TableData};
20use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
21use crate::models::{Column, PropertyRelationship, Table, Tag};
22use anyhow::{Context, Result};
23use serde_json::Value as JsonValue;
24use std::collections::HashMap;
25use std::str::FromStr;
26use tracing::info;
27
28fn ref_to_relationships(ref_path: &Option<String>) -> Vec<PropertyRelationship> {
31 match ref_path {
32 Some(ref_str) => {
33 let to = if ref_str.starts_with("#/definitions/") {
34 let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
35 format!("definitions/{}", def_path)
36 } else if ref_str.starts_with("#/") {
37 ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
38 } else {
39 ref_str.clone()
40 };
41 vec![PropertyRelationship {
42 relationship_type: "foreignKey".to_string(),
43 to,
44 }]
45 }
46 None => Vec::new(),
47 }
48}
49
50pub struct ODCLImporter {
55 current_yaml_data: Option<serde_yaml::Value>,
57}
58
59impl ODCLImporter {
60 pub fn new() -> Self {
70 Self {
71 current_yaml_data: None,
72 }
73 }
74
75 pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
106 let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
108 .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
109
110 let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
111 ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
112 })?;
113
114 match self.parse(yaml_content) {
115 Ok((table, errors)) => {
116 let sdk_tables = vec![TableData {
118 table_index: 0,
119 id: Some(table.id.to_string()),
120 name: Some(table.name.clone()),
121 api_version: json_data
122 .get("dataContractSpecification")
123 .and_then(|v| v.as_str())
124 .map(|s| s.to_string()),
125 version: json_data
126 .get("info")
127 .and_then(|v| v.get("version"))
128 .and_then(|v| v.as_str())
129 .map(|s| s.to_string()),
130 status: json_data
131 .get("info")
132 .and_then(|v| v.get("status"))
133 .and_then(|v| v.as_str())
134 .map(|s| s.to_string()),
135 kind: Some("DataContract".to_string()),
136 domain: json_data
137 .get("info")
138 .and_then(|v| v.get("domain"))
139 .and_then(|v| v.as_str())
140 .map(|s| s.to_string()),
141 data_product: json_data
142 .get("info")
143 .and_then(|v| v.get("dataProduct"))
144 .and_then(|v| v.as_str())
145 .map(|s| s.to_string()),
146 tenant: json_data
147 .get("info")
148 .and_then(|v| v.get("tenant"))
149 .and_then(|v| v.as_str())
150 .map(|s| s.to_string()),
151 description: json_data
152 .get("info")
153 .and_then(|v| v.get("description"))
154 .cloned(),
155 physical_name: table.schema_name.clone(),
157 physical_type: None,
158 business_name: None,
159 data_granularity_description: None,
160 columns: table.columns.iter().map(column_to_column_data).collect(),
161 servers: json_data
162 .get("servers")
163 .and_then(|v| v.as_array())
164 .cloned()
165 .unwrap_or_default(),
166 team: json_data.get("info").and_then(|v| v.get("team")).cloned(),
167 support: json_data
168 .get("info")
169 .and_then(|v| v.get("support"))
170 .cloned(),
171 roles: Vec::new(),
172 sla_properties: json_data
173 .get("servicelevels")
174 .and_then(|v| v.as_array())
175 .cloned()
176 .unwrap_or_default(),
177 quality: table.quality.clone(),
178 price: json_data
179 .get("info")
180 .and_then(|v| v.get("pricing"))
181 .cloned(),
182 tags: table.tags.iter().map(|t| t.to_string()).collect(),
183 custom_properties: Vec::new(),
184 authoritative_definitions: Vec::new(),
185 contract_created_ts: None,
186 odcs_metadata: table.odcl_metadata.clone(),
187 }];
188 let sdk_errors: Vec<ImportError> = errors
189 .iter()
190 .map(|e| ImportError::ParseError(e.message.clone()))
191 .collect();
192 Ok(ImportResult {
193 tables: sdk_tables,
194 tables_requiring_name: Vec::new(),
195 errors: sdk_errors,
196 ai_suggestions: None,
197 })
198 }
199 Err(e) => Err(ImportError::ParseError(e.to_string())),
200 }
201 }
202
203 pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
214 self.parse(yaml_content)
215 }
216
217 fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
226 let data: serde_yaml::Value =
228 serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
229
230 if data.is_null() {
231 return Err(anyhow::anyhow!("Empty YAML content"));
232 }
233
234 self.current_yaml_data = Some(data.clone());
236
237 let json_data = yaml_to_json_value(&data)?;
239
240 if self.is_data_contract_format(&json_data) {
242 return self.parse_data_contract(&json_data);
243 }
244
245 self.parse_simple_odcl(&json_data)
247 }
248
249 pub fn can_handle(&self, yaml_content: &str) -> bool {
254 let data: serde_yaml::Value = match serde_yaml::from_str(yaml_content) {
255 Ok(d) => d,
256 Err(_) => return false,
257 };
258
259 let json_data = match yaml_to_json_value(&data) {
260 Ok(j) => j,
261 Err(_) => return false,
262 };
263
264 if self.is_odcs_v3_format(&json_data) {
266 return false;
267 }
268
269 if self.is_data_contract_format(&json_data) {
271 return true;
272 }
273
274 if let Some(obj) = json_data.as_object() {
276 let has_name = obj.contains_key("name");
277 let has_columns = obj.get("columns").and_then(|v| v.as_array()).is_some();
278 return has_name && has_columns;
279 }
280
281 false
282 }
283
284 fn is_odcs_v3_format(&self, data: &JsonValue) -> bool {
286 if let Some(obj) = data.as_object() {
287 let has_api_version = obj.contains_key("apiVersion");
288 let has_kind = obj
289 .get("kind")
290 .and_then(|v| v.as_str())
291 .map(|s| s == "DataContract")
292 .unwrap_or(false);
293 let has_id = obj.contains_key("id");
294 let has_version = obj.contains_key("version");
295 return has_api_version && has_kind && has_id && has_version;
296 }
297 false
298 }
299
300 fn is_data_contract_format(&self, data: &JsonValue) -> bool {
302 if let Some(obj) = data.as_object() {
303 let has_spec = obj.contains_key("dataContractSpecification");
304 let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
305 return has_spec && has_models;
306 }
307 false
308 }
309
310 fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
312 let mut errors = Vec::new();
313
314 let name = data
316 .get("name")
317 .and_then(|v| v.as_str())
318 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
319 .to_string();
320
321 let columns_data = data
323 .get("columns")
324 .and_then(|v| v.as_array())
325 .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
326
327 let mut columns = Vec::new();
328 for (idx, col_data) in columns_data.iter().enumerate() {
329 match self.parse_column(col_data) {
330 Ok(col) => columns.push(col),
331 Err(e) => {
332 errors.push(ParserError {
333 error_type: "column_parse_error".to_string(),
334 field: format!("columns[{}]", idx),
335 message: e.to_string(),
336 });
337 }
338 }
339 }
340
341 let database_type = self.extract_database_type(data);
343 let medallion_layers = self.extract_medallion_layers(data);
344 let scd_pattern = self.extract_scd_pattern(data);
345 let data_vault_classification = self.extract_data_vault_classification(data);
346 let quality_rules = self.extract_quality_rules(data);
347
348 if scd_pattern.is_some() && data_vault_classification.is_some() {
350 errors.push(ParserError {
351 error_type: "validation_error".to_string(),
352 field: "patterns".to_string(),
353 message: "SCD pattern and Data Vault classification are mutually exclusive"
354 .to_string(),
355 });
356 }
357
358 let mut odcl_metadata = HashMap::new();
360 if let Some(metadata) = data.get("odcl_metadata")
361 && let Some(obj) = metadata.as_object()
362 {
363 for (key, value) in obj {
364 odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
365 }
366 }
367
368 let table_uuid = self.extract_table_uuid(data);
369
370 let table = Table {
371 id: table_uuid,
372 name,
373 columns,
374 database_type,
375 catalog_name: None,
376 schema_name: None,
377 medallion_layers,
378 scd_pattern,
379 data_vault_classification,
380 modeling_level: None,
381 tags: Vec::<Tag>::new(),
382 odcl_metadata,
383 owner: None,
384 sla: None,
385 contact_details: None,
386 infrastructure_type: None,
387 notes: None,
388 position: None,
389 yaml_file_path: None,
390 drawio_cell_id: None,
391 quality: quality_rules,
392 errors: Vec::new(),
393 created_at: chrono::Utc::now(),
394 updated_at: chrono::Utc::now(),
395 };
396
397 info!("Parsed ODCL table: {}", table.name);
398 Ok((table, errors))
399 }
400
401 fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
403 let name = col_data
404 .get("name")
405 .and_then(|v| v.as_str())
406 .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
407 .to_string();
408
409 let data_type = col_data
410 .get("data_type")
411 .and_then(|v| v.as_str())
412 .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
413 .to_string();
414
415 let data_type = normalize_data_type(&data_type);
417
418 let nullable = col_data
419 .get("nullable")
420 .and_then(|v| v.as_bool())
421 .unwrap_or(true);
422
423 let primary_key = col_data
424 .get("primary_key")
425 .and_then(|v| v.as_bool())
426 .unwrap_or(false);
427
428 let foreign_key = col_data.get("foreign_key").and_then(parse_foreign_key);
429
430 let constraints = col_data
431 .get("constraints")
432 .and_then(|v| v.as_array())
433 .map(|arr| {
434 arr.iter()
435 .filter_map(|v| v.as_str().map(|s| s.to_string()))
436 .collect()
437 })
438 .unwrap_or_default();
439
440 let description = col_data
441 .get("description")
442 .and_then(|v| v.as_str())
443 .map(|s| s.to_string())
444 .unwrap_or_default();
445
446 let mut column_quality_rules = Vec::new();
448 if let Some(quality_val) = col_data.get("quality") {
449 if let Some(arr) = quality_val.as_array() {
450 for item in arr {
452 if let Some(obj) = item.as_object() {
453 let mut rule = HashMap::new();
454 for (key, value) in obj {
455 rule.insert(key.clone(), json_value_to_serde_value(value));
456 }
457 column_quality_rules.push(rule);
458 }
459 }
460 } else if let Some(obj) = quality_val.as_object() {
461 let mut rule = HashMap::new();
463 for (key, value) in obj {
464 rule.insert(key.clone(), json_value_to_serde_value(value));
465 }
466 column_quality_rules.push(rule);
467 }
468 }
469
470 Ok(Column {
471 name,
472 data_type,
473 nullable,
474 primary_key,
475 foreign_key,
476 constraints,
477 description,
478 quality: column_quality_rules,
479 ..Default::default()
480 })
481 }
482
483 fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
485 data.get("database_type")
486 .and_then(|v| v.as_str())
487 .and_then(|s| match s.to_uppercase().as_str() {
488 "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
489 "MYSQL" => Some(DatabaseType::Mysql),
490 "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
491 "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
492 "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
493 _ => None,
494 })
495 }
496
497 fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
499 let mut layers = Vec::new();
500
501 if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
503 for item in arr {
504 if let Some(s) = item.as_str()
505 && let Ok(layer) = parse_medallion_layer(s)
506 {
507 layers.push(layer);
508 }
509 }
510 }
511 else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
513 && let Ok(layer) = parse_medallion_layer(s)
514 {
515 layers.push(layer);
516 }
517
518 layers
519 }
520
521 fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
523 data.get("scd_pattern")
524 .and_then(|v| v.as_str())
525 .and_then(|s| parse_scd_pattern(s).ok())
526 }
527
528 fn extract_data_vault_classification(
530 &self,
531 data: &JsonValue,
532 ) -> Option<DataVaultClassification> {
533 data.get("data_vault_classification")
534 .and_then(|v| v.as_str())
535 .and_then(|s| parse_data_vault_classification(s).ok())
536 }
537
538 fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
540 use serde_json::Value;
541 let mut quality_rules = Vec::new();
542
543 if let Some(quality_val) = data.get("quality") {
545 if let Some(arr) = quality_val.as_array() {
546 for item in arr {
548 if let Some(obj) = item.as_object() {
549 let mut rule = HashMap::new();
550 for (key, value) in obj {
551 rule.insert(key.clone(), json_value_to_serde_value(value));
552 }
553 quality_rules.push(rule);
554 }
555 }
556 } else if let Some(obj) = quality_val.as_object() {
557 let mut rule = HashMap::new();
559 for (key, value) in obj {
560 rule.insert(key.clone(), json_value_to_serde_value(value));
561 }
562 quality_rules.push(rule);
563 } else if let Some(s) = quality_val.as_str() {
564 let mut rule = HashMap::new();
566 rule.insert("value".to_string(), Value::String(s.to_string()));
567 quality_rules.push(rule);
568 }
569 }
570
571 if let Some(metadata) = data.get("metadata")
573 && let Some(metadata_obj) = metadata.as_object()
574 && let Some(quality_val) = metadata_obj.get("quality")
575 {
576 if let Some(arr) = quality_val.as_array() {
577 for item in arr {
579 if let Some(obj) = item.as_object() {
580 let mut rule = HashMap::new();
581 for (key, value) in obj {
582 rule.insert(key.clone(), json_value_to_serde_value(value));
583 }
584 quality_rules.push(rule);
585 }
586 }
587 } else if let Some(obj) = quality_val.as_object() {
588 let mut rule = HashMap::new();
590 for (key, value) in obj {
591 rule.insert(key.clone(), json_value_to_serde_value(value));
592 }
593 quality_rules.push(rule);
594 } else if let Some(s) = quality_val.as_str() {
595 let mut rule = HashMap::new();
597 rule.insert("value".to_string(), Value::String(s.to_string()));
598 quality_rules.push(rule);
599 }
600 }
601
602 if let Some(tblprops) = data.get("tblproperties")
604 && let Some(obj) = tblprops.as_object()
605 {
606 for (key, value) in obj {
607 let mut rule = HashMap::new();
608 rule.insert("property".to_string(), Value::String(key.clone()));
609 rule.insert("value".to_string(), json_value_to_serde_value(value));
610 quality_rules.push(rule);
611 }
612 }
613
614 quality_rules
615 }
616
617 fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
619 let mut errors = Vec::new();
620
621 let models = data
623 .get("models")
624 .and_then(|v| v.as_object())
625 .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
626
627 let (model_name, model_data) = models
630 .iter()
631 .next()
632 .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
633
634 let model_data = model_data
635 .as_object()
636 .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
637
638 let fields = model_data
640 .get("fields")
641 .and_then(|v| v.as_object())
642 .ok_or_else(|| {
643 errors.push(ParserError {
644 error_type: "validation_error".to_string(),
645 field: format!("Model '{}'", model_name),
646 message: format!("Model '{}' missing 'fields' field", model_name),
647 });
648 anyhow::anyhow!("Missing fields")
649 });
650
651 let fields = match fields {
652 Ok(f) => f,
653 Err(_) => {
654 let mut quality_rules = self.extract_quality_rules(data);
657 let model_data_value = JsonValue::Object(model_data.clone());
658 let model_quality_rules = self.extract_quality_rules(&model_data_value);
659 quality_rules.extend(model_quality_rules);
660 let table_uuid = self.extract_table_uuid(data);
661 let table = Table {
662 id: table_uuid,
663 name: model_name.clone(),
664 columns: Vec::new(),
665 database_type: None,
666 catalog_name: None,
667 schema_name: None,
668 medallion_layers: Vec::new(),
669 scd_pattern: None,
670 data_vault_classification: None,
671 modeling_level: None,
672 tags: Vec::<Tag>::new(),
673 odcl_metadata: HashMap::new(),
674 owner: None,
675 sla: None,
676 contact_details: None,
677 infrastructure_type: None,
678 notes: None,
679 position: None,
680 yaml_file_path: None,
681 drawio_cell_id: None,
682 quality: quality_rules,
683 errors: Vec::new(),
684 created_at: chrono::Utc::now(),
685 updated_at: chrono::Utc::now(),
686 };
687 return Ok((table, errors));
688 }
689 };
690
691 let mut columns = Vec::new();
693 for (field_name, field_data) in fields {
694 if let Some(field_obj) = field_data.as_object() {
695 match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
696 Ok(mut cols) => columns.append(&mut cols),
697 Err(e) => {
698 errors.push(ParserError {
699 error_type: "field_parse_error".to_string(),
700 field: format!("Field '{}'", field_name),
701 message: e.to_string(),
702 });
703 }
704 }
705 } else {
706 errors.push(ParserError {
707 error_type: "validation_error".to_string(),
708 field: format!("Field '{}'", field_name),
709 message: format!("Field '{}' must be an object", field_name),
710 });
711 }
712 }
713
714 let mut odcl_metadata = HashMap::new();
716
717 if let Some(info_val) = data.get("info") {
719 let info_json_value = json_value_to_serde_value(info_val);
720 odcl_metadata.insert("info".to_string(), info_json_value);
721 }
722
723 odcl_metadata.insert(
724 "dataContractSpecification".to_string(),
725 json_value_to_serde_value(
726 data.get("dataContractSpecification")
727 .unwrap_or(&JsonValue::Null),
728 ),
729 );
730 odcl_metadata.insert(
731 "id".to_string(),
732 json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
733 );
734
735 if let Some(servicelevels_val) = data.get("servicelevels") {
737 odcl_metadata.insert(
738 "servicelevels".to_string(),
739 json_value_to_serde_value(servicelevels_val),
740 );
741 }
742
743 if let Some(links_val) = data.get("links") {
745 odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
746 }
747
748 if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
750 odcl_metadata.insert(
751 "domain".to_string(),
752 json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
753 );
754 }
755 if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
756 odcl_metadata.insert(
757 "dataProduct".to_string(),
758 json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
759 );
760 }
761 if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
762 odcl_metadata.insert(
763 "tenant".to_string(),
764 json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
765 );
766 }
767
768 if let Some(desc_val) = data.get("description") {
770 odcl_metadata.insert(
771 "description".to_string(),
772 json_value_to_serde_value(desc_val),
773 );
774 }
775
776 if let Some(pricing_val) = data.get("pricing") {
778 odcl_metadata.insert(
779 "pricing".to_string(),
780 json_value_to_serde_value(pricing_val),
781 );
782 }
783
784 if let Some(team_val) = data.get("team") {
786 odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
787 }
788
789 if let Some(roles_val) = data.get("roles") {
791 odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
792 }
793
794 if let Some(terms_val) = data.get("terms") {
796 odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
797 }
798
799 if let Some(servers_val) = data.get("servers") {
801 odcl_metadata.insert(
802 "servers".to_string(),
803 json_value_to_serde_value(servers_val),
804 );
805 }
806
807 if let Some(infrastructure_val) = data.get("infrastructure") {
809 odcl_metadata.insert(
810 "infrastructure".to_string(),
811 json_value_to_serde_value(infrastructure_val),
812 );
813 }
814
815 let database_type = self.extract_database_type_from_servers(data);
817
818 let (catalog_name, schema_name) = extract_catalog_schema(data);
820
821 let shared_domains = extract_shared_domains(data);
823
824 let mut tags: Vec<Tag> = Vec::new();
826 if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
827 for item in tags_arr {
828 if let Some(s) = item.as_str() {
829 if let Ok(tag) = Tag::from_str(s) {
831 tags.push(tag);
832 } else {
833 tags.push(crate::models::Tag::Simple(s.to_string()));
835 }
836 }
837 }
838 }
839
840 let mut quality_rules = self.extract_quality_rules(data);
843
844 let model_data_value = JsonValue::Object(model_data.clone());
847 let model_quality_rules = self.extract_quality_rules(&model_data_value);
848 quality_rules.extend(model_quality_rules);
849
850 if !shared_domains.is_empty() {
852 let shared_domains_json: Vec<serde_json::Value> = shared_domains
853 .iter()
854 .map(|d| serde_json::Value::String(d.clone()))
855 .collect();
856 odcl_metadata.insert(
857 "sharedDomains".to_string(),
858 serde_json::Value::Array(shared_domains_json),
859 );
860 }
861
862 let table_uuid = self.extract_table_uuid(data);
863
864 let table = Table {
865 id: table_uuid,
866 name: model_name.clone(),
867 columns,
868 database_type,
869 catalog_name,
870 schema_name,
871 medallion_layers: Vec::new(),
872 scd_pattern: None,
873 data_vault_classification: None,
874 modeling_level: None,
875 tags,
876 odcl_metadata,
877 owner: None,
878 sla: None,
879 contact_details: None,
880 infrastructure_type: None,
881 notes: None,
882 position: None,
883 yaml_file_path: None,
884 drawio_cell_id: None,
885 quality: quality_rules,
886 errors: Vec::new(),
887 created_at: chrono::Utc::now(),
888 updated_at: chrono::Utc::now(),
889 };
890
891 info!(
892 "Parsed Data Contract table: {} with {} warnings/errors",
893 model_name,
894 errors.len()
895 );
896 Ok((table, errors))
897 }
898
899 fn parse_data_contract_field(
901 &self,
902 field_name: &str,
903 field_data: &serde_json::Map<String, JsonValue>,
904 data: &JsonValue,
905 errors: &mut Vec<ParserError>,
906 ) -> Result<Vec<Column>> {
907 let mut columns = Vec::new();
908
909 let description = field_data
911 .get("description")
912 .and_then(|v| v.as_str())
913 .unwrap_or("")
914 .to_string();
915
916 let mut quality_rules = extract_quality_from_obj(field_data);
918
919 if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
921 let ref_path = Some(ref_str.to_string());
923
924 if let Some(definition) = resolve_ref(ref_str, data) {
925 if quality_rules.is_empty() {
927 if let Some(def_obj) = definition.as_object() {
928 quality_rules = extract_quality_from_obj(def_obj);
929 }
930 } else {
931 if let Some(def_obj) = definition.as_object() {
933 let def_quality = extract_quality_from_obj(def_obj);
934 quality_rules.extend(def_quality);
936 }
937 }
938
939 let required = field_data
940 .get("required")
941 .and_then(|v| v.as_bool())
942 .unwrap_or(false);
943
944 let has_nested = definition
946 .get("type")
947 .and_then(|v| v.as_str())
948 .map(|s| s == "object")
949 .unwrap_or(false)
950 || definition.get("properties").is_some()
951 || definition.get("fields").is_some();
952
953 if has_nested {
954 if let Some(properties) =
956 definition.get("properties").and_then(|v| v.as_object())
957 {
958 let nested_required: Vec<String> = definition
960 .get("required")
961 .and_then(|v| v.as_array())
962 .map(|arr| {
963 arr.iter()
964 .filter_map(|v| v.as_str().map(|s| s.to_string()))
965 .collect()
966 })
967 .unwrap_or_default();
968
969 for (nested_name, nested_schema) in properties {
970 let nested_required_field = nested_required.contains(nested_name);
971 expand_nested_column(
972 &format!("{}.{}", field_name, nested_name),
973 nested_schema,
974 !nested_required_field,
975 &mut columns,
976 errors,
977 );
978 }
979 } else if let Some(fields) =
980 definition.get("fields").and_then(|v| v.as_object())
981 {
982 for (nested_name, nested_schema) in fields {
984 expand_nested_column(
985 &format!("{}.{}", field_name, nested_name),
986 nested_schema,
987 true, &mut columns,
989 errors,
990 );
991 }
992 } else {
993 columns.push(Column {
995 name: field_name.to_string(),
996 data_type: "OBJECT".to_string(),
997 nullable: !required,
998 description: if description.is_empty() {
999 definition
1000 .get("description")
1001 .and_then(|v| v.as_str())
1002 .unwrap_or("")
1003 .to_string()
1004 } else {
1005 description.clone()
1006 },
1007 quality: quality_rules.clone(),
1008 relationships: ref_to_relationships(&ref_path),
1009 ..Default::default()
1010 });
1011 }
1012 } else {
1013 let def_type = definition
1015 .get("type")
1016 .and_then(|v| v.as_str())
1017 .unwrap_or("STRING")
1018 .to_uppercase();
1019
1020 let enum_values = definition
1021 .get("enum")
1022 .and_then(|v| v.as_array())
1023 .map(|arr| {
1024 arr.iter()
1025 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1026 .collect()
1027 })
1028 .unwrap_or_default();
1029
1030 columns.push(Column {
1031 name: field_name.to_string(),
1032 data_type: def_type,
1033 nullable: !required,
1034 description: if description.is_empty() {
1035 definition
1036 .get("description")
1037 .and_then(|v| v.as_str())
1038 .unwrap_or("")
1039 .to_string()
1040 } else {
1041 description
1042 },
1043 quality: quality_rules,
1044 relationships: ref_to_relationships(&ref_path),
1045 enum_values,
1046 ..Default::default()
1047 });
1048 }
1049 return Ok(columns);
1050 } else {
1051 let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
1053 let mut error_map = HashMap::new();
1054 error_map.insert("type".to_string(), serde_json::json!("validation_error"));
1055 error_map.insert("field".to_string(), serde_json::json!("data_type"));
1056 error_map.insert(
1057 "message".to_string(),
1058 serde_json::json!(format!(
1059 "Field '{}' references undefined definition: {}",
1060 field_name, ref_str
1061 )),
1062 );
1063 col_errors.push(error_map);
1064 columns.push(Column {
1065 name: field_name.to_string(),
1066 data_type: "OBJECT".to_string(),
1067 description,
1068 errors: col_errors,
1069 relationships: ref_to_relationships(&Some(ref_str.to_string())),
1070 ..Default::default()
1071 });
1072 return Ok(columns);
1073 }
1074 }
1075
1076 let field_type_str = field_data
1079 .get("logicalType")
1080 .and_then(|v| v.as_str())
1081 .or_else(|| field_data.get("type").and_then(|v| v.as_str()))
1082 .unwrap_or("STRING");
1083
1084 if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
1086 match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
1087 Ok(nested_cols) if !nested_cols.is_empty() => {
1088 let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
1090 "ARRAY<STRUCT<...>>".to_string()
1091 } else {
1092 "STRUCT<...>".to_string()
1093 };
1094
1095 columns.push(Column {
1097 name: field_name.to_string(),
1098 data_type: parent_data_type,
1099 nullable: !field_data
1100 .get("required")
1101 .and_then(|v| v.as_bool())
1102 .unwrap_or(false),
1103 description: description.clone(),
1104 quality: quality_rules.clone(),
1105 relationships: ref_to_relationships(
1106 &field_data
1107 .get("$ref")
1108 .and_then(|v| v.as_str())
1109 .map(|s| s.to_string()),
1110 ),
1111 ..Default::default()
1112 });
1113
1114 columns.extend(nested_cols);
1116 return Ok(columns);
1117 }
1118 Ok(_) | Err(_) => {
1119 }
1121 }
1122 }
1123
1124 let field_type = normalize_data_type(field_type_str);
1125
1126 if field_type == "ARRAY" {
1128 let items = field_data.get("items");
1129 if let Some(items_val) = items {
1130 if let Some(items_obj) = items_val.as_object() {
1131 let items_type = items_obj
1133 .get("logicalType")
1134 .and_then(|v| v.as_str())
1135 .or_else(|| items_obj.get("type").and_then(|v| v.as_str()));
1136
1137 let normalized_items_type = match items_type {
1139 Some("object") | Some("struct") => Some("object"),
1140 Some("array") => Some("array"),
1141 Some("string") | Some("varchar") | Some("char") | Some("text") => {
1142 Some("string")
1143 }
1144 Some("integer") | Some("int") | Some("bigint") | Some("smallint")
1145 | Some("tinyint") => Some("integer"),
1146 Some("number") | Some("decimal") | Some("double") | Some("float")
1147 | Some("numeric") => Some("number"),
1148 Some("boolean") | Some("bool") => Some("boolean"),
1149 Some("date") => Some("date"),
1150 Some("timestamp") | Some("datetime") => Some("timestamp"),
1151 Some("time") => Some("time"),
1152 other => other,
1153 };
1154
1155 if items_obj.get("fields").is_some()
1156 || items_obj.get("properties").is_some()
1157 || normalized_items_type == Some("object")
1158 {
1159 columns.push(Column {
1161 name: field_name.to_string(),
1162 data_type: "ARRAY<OBJECT>".to_string(),
1163 nullable: !field_data
1164 .get("required")
1165 .and_then(|v| v.as_bool())
1166 .unwrap_or(false),
1167 description: field_data
1168 .get("description")
1169 .and_then(|v| v.as_str())
1170 .unwrap_or("")
1171 .to_string(),
1172 ..Default::default()
1173 });
1174
1175 let properties_obj =
1177 items_obj.get("properties").and_then(|v| v.as_object());
1178 let fields_obj = items_obj.get("fields").and_then(|v| v.as_object());
1179
1180 if let Some(fields_map) = properties_obj.or(fields_obj) {
1181 for (nested_field_name, nested_field_data) in fields_map {
1182 if let Some(nested_field_obj) = nested_field_data.as_object() {
1183 let nested_field_type = nested_field_obj
1184 .get("logicalType")
1185 .and_then(|v| v.as_str())
1186 .or_else(|| {
1187 nested_field_obj.get("type").and_then(|v| v.as_str())
1188 })
1189 .unwrap_or("STRING");
1190
1191 let nested_col_name =
1193 format!("{}.[].{}", field_name, nested_field_name);
1194 let mut local_errors = Vec::new();
1195 match self.parse_data_contract_field(
1196 &nested_col_name,
1197 nested_field_obj,
1198 data,
1199 &mut local_errors,
1200 ) {
1201 Ok(mut nested_cols) => {
1202 columns.append(&mut nested_cols);
1203 }
1204 Err(_) => {
1205 columns.push(Column {
1207 name: nested_col_name,
1208 data_type: nested_field_type.to_uppercase(),
1209 nullable: !nested_field_obj
1210 .get("required")
1211 .and_then(|v| v.as_bool())
1212 .unwrap_or(false),
1213 description: nested_field_obj
1214 .get("description")
1215 .and_then(|v| v.as_str())
1216 .unwrap_or("")
1217 .to_string(),
1218 ..Default::default()
1219 });
1220 }
1221 }
1222 }
1223 }
1224 }
1225
1226 return Ok(columns);
1227 } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
1228 columns.push(Column {
1230 name: field_name.to_string(),
1231 data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
1232 nullable: !field_data
1233 .get("required")
1234 .and_then(|v| v.as_bool())
1235 .unwrap_or(false),
1236 description: description.clone(),
1237 quality: quality_rules.clone(),
1238 relationships: ref_to_relationships(
1239 &field_data
1240 .get("$ref")
1241 .and_then(|v| v.as_str())
1242 .map(|s| s.to_string()),
1243 ),
1244 ..Default::default()
1245 });
1246 return Ok(columns);
1247 }
1248 } else if let Some(item_type_str) = items_val.as_str() {
1249 columns.push(Column {
1251 name: field_name.to_string(),
1252 data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
1253 nullable: !field_data
1254 .get("required")
1255 .and_then(|v| v.as_bool())
1256 .unwrap_or(false),
1257 description: description.clone(),
1258 quality: quality_rules.clone(),
1259 relationships: ref_to_relationships(
1260 &field_data
1261 .get("$ref")
1262 .and_then(|v| v.as_str())
1263 .map(|s| s.to_string()),
1264 ),
1265 ..Default::default()
1266 });
1267 return Ok(columns);
1268 }
1269 }
1270 columns.push(Column {
1272 name: field_name.to_string(),
1273 data_type: "ARRAY<STRING>".to_string(),
1274 nullable: !field_data
1275 .get("required")
1276 .and_then(|v| v.as_bool())
1277 .unwrap_or(false),
1278 description: description.clone(),
1279 quality: quality_rules.clone(),
1280 relationships: ref_to_relationships(
1281 &field_data
1282 .get("$ref")
1283 .and_then(|v| v.as_str())
1284 .map(|s| s.to_string()),
1285 ),
1286 ..Default::default()
1287 });
1288 return Ok(columns);
1289 }
1290
1291 let nested_fields_obj = field_data
1293 .get("properties")
1294 .and_then(|v| v.as_object())
1295 .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
1296
1297 if field_type == "OBJECT" && nested_fields_obj.is_some() {
1298 columns.push(Column {
1300 name: field_name.to_string(),
1301 data_type: "OBJECT".to_string(),
1302 nullable: !field_data
1303 .get("required")
1304 .and_then(|v| v.as_bool())
1305 .unwrap_or(false),
1306 description: description.clone(),
1307 quality: quality_rules.clone(),
1308 relationships: ref_to_relationships(
1309 &field_data
1310 .get("$ref")
1311 .and_then(|v| v.as_str())
1312 .map(|s| s.to_string()),
1313 ),
1314 ..Default::default()
1315 });
1316
1317 if let Some(fields_obj) = nested_fields_obj {
1319 for (nested_field_name, nested_field_data) in fields_obj {
1320 if let Some(nested_field_obj) = nested_field_data.as_object() {
1321 let nested_field_type = nested_field_obj
1322 .get("logicalType")
1323 .and_then(|v| v.as_str())
1324 .or_else(|| nested_field_obj.get("type").and_then(|v| v.as_str()))
1325 .unwrap_or("STRING");
1326
1327 let nested_col_name = format!("{}.{}", field_name, nested_field_name);
1329 match self.parse_data_contract_field(
1330 &nested_col_name,
1331 nested_field_obj,
1332 data,
1333 errors,
1334 ) {
1335 Ok(mut nested_cols) => {
1336 columns.append(&mut nested_cols);
1337 }
1338 Err(_) => {
1339 columns.push(Column {
1341 name: nested_col_name,
1342 data_type: nested_field_type.to_uppercase(),
1343 nullable: !nested_field_obj
1344 .get("required")
1345 .and_then(|v| v.as_bool())
1346 .unwrap_or(false),
1347 description: nested_field_obj
1348 .get("description")
1349 .and_then(|v| v.as_str())
1350 .unwrap_or("")
1351 .to_string(),
1352 ..Default::default()
1353 });
1354 }
1355 }
1356 }
1357 }
1358 }
1359
1360 return Ok(columns);
1361 }
1362
1363 let ref_path = field_data
1365 .get("$ref")
1366 .and_then(|v| v.as_str())
1367 .map(|s| s.to_string());
1368
1369 let required = field_data
1370 .get("required")
1371 .and_then(|v| v.as_bool())
1372 .unwrap_or(false);
1373
1374 let field_description = if description.is_empty() {
1375 field_data
1376 .get("description")
1377 .and_then(|v| v.as_str())
1378 .unwrap_or("")
1379 .to_string()
1380 } else {
1381 description
1382 };
1383
1384 let mut column_quality_rules = quality_rules;
1386 if column_quality_rules.is_empty()
1387 && let Some(quality_val) = field_data.get("quality")
1388 {
1389 if let Some(arr) = quality_val.as_array() {
1390 for item in arr {
1391 if let Some(obj) = item.as_object() {
1392 let mut rule = HashMap::new();
1393 for (key, value) in obj {
1394 rule.insert(key.clone(), json_value_to_serde_value(value));
1395 }
1396 column_quality_rules.push(rule);
1397 }
1398 }
1399 } else if let Some(obj) = quality_val.as_object() {
1400 let mut rule = HashMap::new();
1401 for (key, value) in obj {
1402 rule.insert(key.clone(), json_value_to_serde_value(value));
1403 }
1404 column_quality_rules.push(rule);
1405 }
1406 }
1407
1408 columns.push(Column {
1409 name: field_name.to_string(),
1410 data_type: field_type,
1411 nullable: !required,
1412 primary_key: field_data
1413 .get("primaryKey")
1414 .and_then(|v| v.as_bool())
1415 .unwrap_or(false),
1416 foreign_key: parse_foreign_key_from_data_contract(field_data),
1417 description: field_description,
1418 quality: column_quality_rules,
1419 relationships: ref_to_relationships(&ref_path),
1420 ..Default::default()
1421 });
1422
1423 Ok(columns)
1424 }
1425
1426 fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1428 if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
1430 if let Some((_, server_data)) = servers_obj.iter().next()
1432 && let Some(server_obj) = server_data.as_object()
1433 {
1434 return server_obj
1435 .get("type")
1436 .and_then(|v| v.as_str())
1437 .and_then(|s| self.parse_database_type(s));
1438 }
1439 } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
1440 if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
1442 return server_obj
1443 .get("type")
1444 .and_then(|v| v.as_str())
1445 .and_then(|s| self.parse_database_type(s));
1446 }
1447 }
1448 None
1449 }
1450
1451 fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
1453 match s.to_lowercase().as_str() {
1454 "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
1455 "postgres" | "postgresql" => Some(DatabaseType::Postgres),
1456 "mysql" => Some(DatabaseType::Mysql),
1457 "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
1458 "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
1459 _ => None,
1460 }
1461 }
1462
1463 fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1465 if let Some(id_val) = data.get("id")
1467 && let Some(id_str) = id_val.as_str()
1468 && let Ok(uuid) = uuid::Uuid::parse_str(id_str)
1469 {
1470 tracing::debug!(
1471 "[ODCLImporter] Extracted UUID from top-level 'id' field: {}",
1472 uuid
1473 );
1474 return uuid;
1475 }
1476
1477 if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1479 for prop in custom_props {
1480 if let Some(prop_obj) = prop.as_object() {
1481 let prop_key = prop_obj
1482 .get("property")
1483 .and_then(|v| v.as_str())
1484 .unwrap_or("");
1485 if prop_key == "tableUuid"
1486 && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1487 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1488 {
1489 tracing::debug!(
1490 "[ODCLImporter] Extracted UUID from customProperties.tableUuid: {}",
1491 uuid
1492 );
1493 return uuid;
1494 }
1495 }
1496 }
1497 }
1498
1499 if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1501 && let Some(uuid_val) = metadata.get("tableUuid")
1502 && let Some(uuid_str) = uuid_val.as_str()
1503 && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1504 {
1505 tracing::debug!(
1506 "[ODCLImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1507 uuid
1508 );
1509 return uuid;
1510 }
1511
1512 let table_name = data
1514 .get("name")
1515 .and_then(|v| v.as_str())
1516 .unwrap_or("unknown");
1517 let new_uuid = crate::models::table::Table::generate_id(table_name, None, None, None);
1518 tracing::warn!(
1519 "[ODCLImporter] No UUID found for table '{}', generating deterministic UUID: {}",
1520 table_name,
1521 new_uuid
1522 );
1523 new_uuid
1524 }
1525
1526 #[allow(clippy::only_used_in_recursion)]
1528 fn parse_struct_type_from_string(
1529 &self,
1530 field_name: &str,
1531 type_str: &str,
1532 field_data: &serde_json::Map<String, JsonValue>,
1533 ) -> Result<Vec<Column>> {
1534 let mut columns = Vec::new();
1535
1536 let normalized_type = type_str
1538 .lines()
1539 .map(|line| line.trim())
1540 .filter(|line| !line.is_empty())
1541 .collect::<Vec<_>>()
1542 .join(" ");
1543
1544 let type_str_upper = normalized_type.to_uppercase();
1545
1546 let is_array = type_str_upper.starts_with("ARRAY<");
1548 let struct_start = type_str_upper.find("STRUCT<");
1549
1550 if let Some(start_pos) = struct_start {
1551 let struct_content_start = start_pos + 7; let struct_content = &normalized_type[struct_content_start..];
1553
1554 let mut depth = 1;
1556 let mut end_pos = None;
1557 for (i, ch) in struct_content.char_indices() {
1558 match ch {
1559 '<' => depth += 1,
1560 '>' => {
1561 depth -= 1;
1562 if depth == 0 {
1563 end_pos = Some(i);
1564 break;
1565 }
1566 }
1567 _ => {}
1568 }
1569 }
1570
1571 let struct_fields_str = if let Some(end) = end_pos {
1572 &struct_content[..end]
1573 } else {
1574 struct_content.trim_end_matches('>').trim()
1575 };
1576
1577 let fields = parse_struct_fields_from_string(struct_fields_str)?;
1579
1580 for (nested_name, nested_type) in fields {
1582 let nested_type_upper = nested_type.to_uppercase();
1583 let nested_col_name = if is_array {
1584 format!("{}.[].{}", field_name, nested_name)
1585 } else {
1586 format!("{}.{}", field_name, nested_name)
1587 };
1588
1589 let is_nested_struct = nested_type_upper.starts_with("STRUCT<");
1590 let is_nested_array_struct = nested_type_upper.starts_with("ARRAY<STRUCT<");
1591
1592 if is_nested_struct || is_nested_array_struct {
1593 match self.parse_struct_type_from_string(
1595 &nested_col_name,
1596 &nested_type,
1597 field_data,
1598 ) {
1599 Ok(nested_cols) => {
1600 columns.extend(nested_cols);
1601 }
1602 Err(_) => {
1603 let fallback_data_type = if is_nested_array_struct {
1604 "ARRAY<STRUCT<...>>".to_string()
1605 } else {
1606 "STRUCT<...>".to_string()
1607 };
1608 columns.push(Column {
1609 name: nested_col_name,
1610 data_type: fallback_data_type,
1611 nullable: !field_data
1612 .get("required")
1613 .and_then(|v| v.as_bool())
1614 .unwrap_or(false),
1615 description: field_data
1616 .get("description")
1617 .and_then(|v| v.as_str())
1618 .unwrap_or("")
1619 .to_string(),
1620 ..Default::default()
1621 });
1622 }
1623 }
1624 } else if nested_type_upper.starts_with("ARRAY<") {
1625 columns.push(Column {
1626 name: nested_col_name,
1627 data_type: normalize_data_type(&nested_type),
1628 nullable: !field_data
1629 .get("required")
1630 .and_then(|v| v.as_bool())
1631 .unwrap_or(false),
1632 description: field_data
1633 .get("description")
1634 .and_then(|v| v.as_str())
1635 .unwrap_or("")
1636 .to_string(),
1637 ..Default::default()
1638 });
1639 } else {
1640 columns.push(Column {
1642 name: nested_col_name,
1643 data_type: normalize_data_type(&nested_type),
1644 nullable: !field_data
1645 .get("required")
1646 .and_then(|v| v.as_bool())
1647 .unwrap_or(false),
1648 description: field_data
1649 .get("description")
1650 .and_then(|v| v.as_str())
1651 .unwrap_or("")
1652 .to_string(),
1653 ..Default::default()
1654 });
1655 }
1656 }
1657
1658 return Ok(columns);
1659 }
1660
1661 Ok(Vec::new())
1663 }
1664}
1665
1666impl Default for ODCLImporter {
1667 fn default() -> Self {
1668 Self::new()
1669 }
1670}
1671
#[cfg(test)]
mod tests {
    use super::*;

    /// Flat ODCL table with two columns and an explicit database type.
    const SIMPLE_TABLE_YAML: &str = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;

    /// Flat ODCL table carrying medallion, SCD and free-form metadata.
    const TABLE_WITH_METADATA_YAML: &str = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;

    /// Data Contract Specification document with one model and two fields.
    const DATA_CONTRACT_YAML: &str = r#"
dataContractSpecification: 0.9.3
id: urn:datacontract:example
models:
  users:
    fields:
      id:
        type: bigint
        description: User ID
      name:
        type: string
        description: User name
"#;

    #[test]
    fn test_parse_simple_odcl_table() {
        let mut importer = ODCLImporter::new();

        let (parsed, problems) = importer.parse(SIMPLE_TABLE_YAML).unwrap();

        assert_eq!(parsed.name, "users");
        assert_eq!(parsed.columns.len(), 2);
        assert_eq!(parsed.columns[0].name, "id");
        assert_eq!(parsed.database_type, Some(DatabaseType::Postgres));
        assert_eq!(problems.len(), 0);
    }

    #[test]
    fn test_parse_odcl_with_metadata() {
        let mut importer = ODCLImporter::new();

        let (parsed, problems) = importer.parse(TABLE_WITH_METADATA_YAML).unwrap();

        assert_eq!(parsed.medallion_layers.len(), 1);
        assert_eq!(parsed.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(parsed.scd_pattern, Some(SCDPattern::Type2));
        // The description is only checked when the key is present and holds a string.
        let description = parsed
            .odcl_metadata
            .get("description")
            .and_then(|value| value.as_str());
        if let Some(text) = description {
            assert_eq!(text, "User table");
        }
        assert_eq!(problems.len(), 0);
    }

    #[test]
    fn test_parse_data_contract_format() {
        let mut importer = ODCLImporter::new();

        let (parsed, problems) = importer.parse(DATA_CONTRACT_YAML).unwrap();

        assert_eq!(parsed.name, "users");
        assert_eq!(parsed.columns.len(), 2);
        assert!(problems.is_empty());
    }

    #[test]
    fn test_can_handle_odcl_format() {
        let importer = ODCLImporter::new();

        // Data Contract Specification documents are accepted.
        let data_contract_doc = r#"
dataContractSpecification: 0.9.3
id: test
models:
  users:
    fields:
      id:
        type: int
"#;
        assert!(importer.can_handle(data_contract_doc));

        // The flat name/columns ODCL layout is accepted.
        let flat_odcl_doc = r#"
name: users
columns:
  - name: id
    data_type: INT
"#;
        assert!(importer.can_handle(flat_odcl_doc));

        // An ODCS v3 document (apiVersion/kind/schema) is rejected.
        let odcs_v3_doc = r#"
apiVersion: v3.1.0
kind: DataContract
id: test-uuid
version: 1.0.0
name: users
schema:
  - name: users
    properties:
      - name: id
        logicalType: integer
"#;
        assert!(!importer.can_handle(odcs_v3_doc));
    }
}