1use super::contract::ODCSContract;
7use super::property::Property;
8use super::schema::SchemaObject;
9use super::supporting::{
10 AuthoritativeDefinition as OdcsAuthDef, CustomProperty,
11 LogicalTypeOptions as OdcsLogicalTypeOptions, PropertyRelationship as OdcsPropertyRelationship,
12 QualityRule,
13};
14use crate::import::{ColumnData, TableData};
15use crate::models::column::{
16 AuthoritativeDefinition as ColumnAuthDef, Column,
17 LogicalTypeOptions as ColumnLogicalTypeOptions,
18 PropertyRelationship as ColumnPropertyRelationship,
19};
20use crate::models::table::Table;
21
22pub fn map_data_type_to_logical_type(data_type: &str) -> (String, bool) {
33 let upper = data_type.to_uppercase();
34
35 if upper.starts_with("ARRAY<") {
37 return ("array".to_string(), true);
38 }
39 if upper == "STRUCT" || upper == "OBJECT" || upper.starts_with("STRUCT<") {
40 return ("object".to_string(), false);
41 }
42
43 if upper.contains("INT") || upper == "BIGINT" || upper == "SMALLINT" || upper == "TINYINT" {
45 ("integer".to_string(), false)
46 } else if upper.contains("DECIMAL")
47 || upper.contains("DOUBLE")
48 || upper.contains("FLOAT")
49 || upper.contains("NUMERIC")
50 || upper == "NUMBER"
51 {
52 ("number".to_string(), false)
53 } else if upper == "BOOLEAN" || upper == "BOOL" {
54 ("boolean".to_string(), false)
55 } else if upper == "DATE" {
56 ("date".to_string(), false)
57 } else if upper.contains("TIMESTAMP") {
58 ("timestamp".to_string(), false)
59 } else if upper == "TIME" {
60 ("time".to_string(), false)
61 } else {
62 ("string".to_string(), false)
64 }
65}
66
67fn enum_values_to_quality_rule(enum_values: &[String]) -> QualityRule {
72 let enum_list: String = enum_values
73 .iter()
74 .map(|e| format!("'{}'", e.replace('\'', "''")))
75 .collect::<Vec<_>>()
76 .join(", ");
77
78 let query = format!(
79 "SELECT COUNT(*) FROM ${{table}} WHERE ${{column}} NOT IN ({})",
80 enum_list
81 );
82
83 QualityRule {
84 rule_type: Some("sql".to_string()),
85 query: Some(query),
86 must_be: Some(serde_json::json!(0)),
87 description: Some(format!("Value must be one of: {}", enum_values.join(", "))),
88 ..Default::default()
89 }
90}
91
92impl From<&Property> for Column {
97 fn from(prop: &Property) -> Self {
102 Column {
103 id: prop.id.clone(),
104 name: prop.name.clone(),
105 business_name: prop.business_name.clone(),
106 description: prop.description.clone().unwrap_or_default(),
107 data_type: prop.logical_type.clone(),
108 physical_type: prop.physical_type.clone(),
109 physical_name: prop.physical_name.clone(),
110 logical_type_options: prop.logical_type_options.as_ref().map(|opts| {
111 ColumnLogicalTypeOptions {
112 min_length: opts.min_length,
113 max_length: opts.max_length,
114 pattern: opts.pattern.clone(),
115 format: opts.format.clone(),
116 minimum: opts.minimum.clone(),
117 maximum: opts.maximum.clone(),
118 exclusive_minimum: opts.exclusive_minimum.clone(),
119 exclusive_maximum: opts.exclusive_maximum.clone(),
120 precision: opts.precision,
121 scale: opts.scale,
122 }
123 }),
124 primary_key: prop.primary_key,
125 primary_key_position: prop.primary_key_position,
126 unique: prop.unique,
127 nullable: !prop.required, partitioned: prop.partitioned,
129 partition_key_position: prop.partition_key_position,
130 clustered: prop.clustered,
131 classification: prop.classification.clone(),
132 critical_data_element: prop.critical_data_element,
133 encrypted_name: prop.encrypted_name.clone(),
134 transform_source_objects: prop.transform_source_objects.clone(),
135 transform_logic: prop.transform_logic.clone(),
136 transform_description: prop.transform_description.clone(),
137 examples: prop.examples.clone(),
138 default_value: prop.default_value.clone(),
139 relationships: prop
140 .relationships
141 .iter()
142 .map(|r| ColumnPropertyRelationship {
143 relationship_type: r.relationship_type.clone(),
144 to: r.to.clone(),
145 })
146 .collect(),
147 authoritative_definitions: prop
148 .authoritative_definitions
149 .iter()
150 .map(|d| ColumnAuthDef {
151 definition_type: d.definition_type.clone(),
152 url: d.url.clone(),
153 })
154 .collect(),
155 quality: prop
156 .quality
157 .iter()
158 .map(|q| serde_json::to_value(q).ok())
159 .filter_map(|v| v.and_then(|v| v.as_object().cloned()))
160 .map(|m| m.into_iter().collect())
161 .collect(),
162 enum_values: prop.enum_values.clone(),
163 tags: prop.tags.clone(),
164 custom_properties: prop
165 .custom_properties
166 .iter()
167 .map(|cp| (cp.property.clone(), cp.value.clone()))
168 .collect(),
169 secondary_key: false,
171 composite_key: None,
172 foreign_key: None,
173 constraints: Vec::new(),
174 errors: Vec::new(),
175 column_order: 0,
176 nested_data: None,
177 }
178 }
179}
180
181impl From<&Column> for Property {
182 fn from(col: &Column) -> Self {
192 let (logical_type, _is_array) = map_data_type_to_logical_type(&col.data_type);
194
195 let physical_type = col
197 .physical_type
198 .clone()
199 .or_else(|| Some(col.data_type.clone()));
200
201 let mut quality: Vec<QualityRule> = col
203 .quality
204 .iter()
205 .filter_map(|q| serde_json::to_value(q).ok())
206 .filter_map(|v| serde_json::from_value(v).ok())
207 .collect();
208
209 if !col.enum_values.is_empty() {
211 quality.push(enum_values_to_quality_rule(&col.enum_values));
212 }
213
214 Property {
215 id: col.id.clone(),
216 name: col.name.clone(),
217 business_name: col.business_name.clone(),
218 description: if col.description.is_empty() {
219 None
220 } else {
221 Some(col.description.clone())
222 },
223 logical_type,
224 physical_type,
225 physical_name: col.physical_name.clone(),
226 logical_type_options: col.logical_type_options.as_ref().map(|opts| {
227 OdcsLogicalTypeOptions {
228 min_length: opts.min_length,
229 max_length: opts.max_length,
230 pattern: opts.pattern.clone(),
231 format: opts.format.clone(),
232 minimum: opts.minimum.clone(),
233 maximum: opts.maximum.clone(),
234 exclusive_minimum: opts.exclusive_minimum.clone(),
235 exclusive_maximum: opts.exclusive_maximum.clone(),
236 precision: opts.precision,
237 scale: opts.scale,
238 }
239 }),
240 required: !col.nullable, primary_key: col.primary_key,
242 primary_key_position: col.primary_key_position,
243 unique: col.unique,
244 partitioned: col.partitioned,
245 partition_key_position: col.partition_key_position,
246 clustered: col.clustered,
247 classification: col.classification.clone(),
248 critical_data_element: col.critical_data_element,
249 encrypted_name: col.encrypted_name.clone(),
250 transform_source_objects: col.transform_source_objects.clone(),
251 transform_logic: col.transform_logic.clone(),
252 transform_description: col.transform_description.clone(),
253 examples: col.examples.clone(),
254 default_value: col.default_value.clone(),
255 relationships: col
256 .relationships
257 .iter()
258 .map(|r| OdcsPropertyRelationship {
259 relationship_type: r.relationship_type.clone(),
260 to: r.to.clone(),
261 })
262 .collect(),
263 authoritative_definitions: col
264 .authoritative_definitions
265 .iter()
266 .map(|d| OdcsAuthDef {
267 definition_type: d.definition_type.clone(),
268 url: d.url.clone(),
269 })
270 .collect(),
271 quality,
272 enum_values: col.enum_values.clone(),
273 tags: col.tags.clone(),
274 custom_properties: col
275 .custom_properties
276 .iter()
277 .map(|(k, v)| CustomProperty::new(k.clone(), v.clone()))
278 .collect(),
279 items: None,
280 properties: Vec::new(),
281 }
282 }
283}
284
285impl From<&SchemaObject> for Table {
290 fn from(schema: &SchemaObject) -> Self {
294 let columns = flatten_properties_to_columns(&schema.properties, "");
296
297 let mut table = Table::new(schema.name.clone(), columns);
298
299 table.schema_name = schema.physical_name.clone();
301
302 if let Some(ref id) = schema.id {
304 table
305 .odcl_metadata
306 .insert("schemaId".to_string(), serde_json::json!(id));
307 }
308 if let Some(ref physical_name) = schema.physical_name {
309 table
310 .odcl_metadata
311 .insert("physicalName".to_string(), serde_json::json!(physical_name));
312 }
313 if let Some(ref physical_type) = schema.physical_type {
314 table
315 .odcl_metadata
316 .insert("physicalType".to_string(), serde_json::json!(physical_type));
317 }
318 if let Some(ref business_name) = schema.business_name {
319 table
320 .odcl_metadata
321 .insert("businessName".to_string(), serde_json::json!(business_name));
322 }
323 if let Some(ref description) = schema.description {
324 table.odcl_metadata.insert(
325 "schemaDescription".to_string(),
326 serde_json::json!(description),
327 );
328 }
329 if let Some(ref granularity) = schema.data_granularity_description {
330 table.odcl_metadata.insert(
331 "dataGranularityDescription".to_string(),
332 serde_json::json!(granularity),
333 );
334 }
335 if !schema.tags.is_empty() {
336 table
337 .odcl_metadata
338 .insert("schemaTags".to_string(), serde_json::json!(schema.tags));
339 }
340 if !schema.relationships.is_empty() {
341 table.odcl_metadata.insert(
342 "schemaRelationships".to_string(),
343 serde_json::to_value(&schema.relationships).unwrap_or_default(),
344 );
345 }
346 if !schema.quality.is_empty() {
347 table.quality = schema
348 .quality
349 .iter()
350 .filter_map(|q| serde_json::to_value(q).ok())
351 .filter_map(|v| v.as_object().cloned())
352 .map(|m| m.into_iter().collect())
353 .collect();
354 }
355 if !schema.authoritative_definitions.is_empty() {
356 table.odcl_metadata.insert(
357 "authoritativeDefinitions".to_string(),
358 serde_json::to_value(&schema.authoritative_definitions).unwrap_or_default(),
359 );
360 }
361 if !schema.custom_properties.is_empty() {
362 table.odcl_metadata.insert(
363 "customProperties".to_string(),
364 serde_json::to_value(&schema.custom_properties).unwrap_or_default(),
365 );
366 }
367
368 table
369 }
370}
371
372fn flatten_properties_to_columns(properties: &[Property], prefix: &str) -> Vec<Column> {
374 let mut columns = Vec::new();
375
376 for prop in properties {
377 let full_name = if prefix.is_empty() {
378 prop.name.clone()
379 } else {
380 format!("{}.{}", prefix, prop.name)
381 };
382
383 let mut col = Column::from(prop);
385 col.name = full_name.clone();
386
387 columns.push(col);
388
389 if !prop.properties.is_empty() {
391 let nested = flatten_properties_to_columns(&prop.properties, &full_name);
392 columns.extend(nested);
393 }
394
395 if let Some(ref items) = prop.items {
397 let items_prefix = format!("{}.[]", full_name);
398 let mut items_col = Column::from(items.as_ref());
399 items_col.name = items_prefix.clone();
400 columns.push(items_col);
401
402 if !items.properties.is_empty() {
404 let nested = flatten_properties_to_columns(&items.properties, &items_prefix);
405 columns.extend(nested);
406 }
407 }
408 }
409
410 columns
411}
412
413fn parse_struct_fields_from_data_type(data_type: &str) -> Vec<Property> {
416 use crate::import::odcs::ODCSImporter;
417
418 let importer = ODCSImporter::new();
419 let field_data = serde_json::Map::new();
420
421 let struct_type = if data_type.to_uppercase().starts_with("ARRAY<STRUCT<") {
423 if let Some(start) = data_type.find("STRUCT<") {
424 &data_type[start..]
425 } else {
426 data_type
427 }
428 } else {
429 data_type
430 };
431
432 if let Ok(nested_cols) = importer.parse_struct_type_from_string("", struct_type, &field_data) {
434 nested_cols
435 .iter()
436 .filter_map(|col| {
437 let field_name = col
439 .name
440 .strip_prefix(".[].")
441 .or_else(|| col.name.strip_prefix("."))
442 .unwrap_or(&col.name);
443
444 if field_name.is_empty() {
445 return None;
446 }
447
448 let (logical_type, _) = map_data_type_to_logical_type(&col.data_type);
449 Some(Property {
450 name: field_name.to_string(),
451 logical_type,
452 physical_type: Some(col.data_type.clone()),
453 ..Default::default()
454 })
455 })
456 .collect()
457 } else {
458 Vec::new()
459 }
460}
461
462fn fix_parent_logical_types(properties: &mut [Property], table: &Table) {
471 for prop in properties.iter_mut() {
472 if let Some(col) = table.columns.iter().find(|c| c.name == prop.name) {
474 let data_type_upper = col.data_type.to_uppercase();
475
476 if data_type_upper.starts_with("ARRAY<") {
478 prop.logical_type = "array".to_string();
479
480 if !prop.properties.is_empty() && prop.items.is_none() {
482 let items_prop = Property {
483 name: String::new(),
484 logical_type: "object".to_string(),
485 properties: std::mem::take(&mut prop.properties),
486 ..Default::default()
487 };
488 prop.items = Some(Box::new(items_prop));
489 }
490 else if prop.properties.is_empty()
492 && prop.items.is_none()
493 && data_type_upper.contains("STRUCT<")
494 {
495 let nested_props = parse_struct_fields_from_data_type(&col.data_type);
496 if !nested_props.is_empty() {
497 let items_prop = Property {
498 name: String::new(),
499 logical_type: "object".to_string(),
500 properties: nested_props,
501 ..Default::default()
502 };
503 prop.items = Some(Box::new(items_prop));
504 }
505 }
506 }
507 else if data_type_upper.starts_with("STRUCT<")
509 || data_type_upper == "STRUCT"
510 || data_type_upper == "OBJECT"
511 {
512 prop.logical_type = "object".to_string();
513
514 if prop.properties.is_empty() && data_type_upper.starts_with("STRUCT<") {
516 prop.properties = parse_struct_fields_from_data_type(&col.data_type);
517 }
518 }
519 }
520
521 if !prop.properties.is_empty() {
523 fix_parent_logical_types(&mut prop.properties, table);
524 }
525
526 if let Some(ref mut items) = prop.items
528 && !items.properties.is_empty()
529 {
530 let items_prefix = format!("{}.[]", prop.name);
532 if let Some(items_col) = table.columns.iter().find(|c| c.name == items_prefix) {
533 let items_type_upper = items_col.data_type.to_uppercase();
534 if items_type_upper.starts_with("STRUCT<")
535 || items_type_upper == "STRUCT"
536 || items_type_upper == "OBJECT"
537 {
538 items.logical_type = "object".to_string();
539 }
540 }
541 fix_parent_logical_types(&mut items.properties, table);
542 }
543 }
544}
545
546impl From<&Table> for SchemaObject {
547 fn from(table: &Table) -> Self {
552 let flat_props: Vec<(String, Property)> = table
554 .columns
555 .iter()
556 .map(|col| (col.name.clone(), Property::from(col)))
557 .collect();
558
559 let mut properties = Property::from_flat_paths(&flat_props);
561
562 fix_parent_logical_types(&mut properties, table);
564
565 let mut schema = SchemaObject::new(table.name.clone()).with_properties(properties);
566
567 if let Some(id) = table.odcl_metadata.get("schemaId").and_then(|v| v.as_str()) {
569 schema.id = Some(id.to_string());
570 }
571 if let Some(physical_name) = table
572 .odcl_metadata
573 .get("physicalName")
574 .and_then(|v| v.as_str())
575 {
576 schema.physical_name = Some(physical_name.to_string());
577 } else if let Some(ref sn) = table.schema_name {
578 schema.physical_name = Some(sn.clone());
579 }
580 if let Some(physical_type) = table
581 .odcl_metadata
582 .get("physicalType")
583 .and_then(|v| v.as_str())
584 {
585 schema.physical_type = Some(physical_type.to_string());
586 }
587 if let Some(business_name) = table
588 .odcl_metadata
589 .get("businessName")
590 .and_then(|v| v.as_str())
591 {
592 schema.business_name = Some(business_name.to_string());
593 }
594 if let Some(description) = table
595 .odcl_metadata
596 .get("schemaDescription")
597 .and_then(|v| v.as_str())
598 {
599 schema.description = Some(description.to_string());
600 }
601 if let Some(granularity) = table
602 .odcl_metadata
603 .get("dataGranularityDescription")
604 .and_then(|v| v.as_str())
605 {
606 schema.data_granularity_description = Some(granularity.to_string());
607 }
608 if let Some(tags) = table.odcl_metadata.get("schemaTags")
609 && let Ok(parsed_tags) = serde_json::from_value::<Vec<String>>(tags.clone())
610 {
611 schema.tags = parsed_tags;
612 }
613 if let Some(rels) = table.odcl_metadata.get("schemaRelationships")
614 && let Ok(parsed_rels) = serde_json::from_value(rels.clone())
615 {
616 schema.relationships = parsed_rels;
617 }
618 if !table.quality.is_empty() {
619 schema.quality = table
620 .quality
621 .iter()
622 .filter_map(|q| serde_json::to_value(q).ok())
623 .filter_map(|v| serde_json::from_value(v).ok())
624 .collect();
625 }
626 if let Some(auth_defs) = table.odcl_metadata.get("authoritativeDefinitions")
627 && let Ok(parsed) = serde_json::from_value(auth_defs.clone())
628 {
629 schema.authoritative_definitions = parsed;
630 }
631 if let Some(custom) = table.odcl_metadata.get("customProperties")
632 && let Ok(parsed) = serde_json::from_value(custom.clone())
633 {
634 schema.custom_properties = parsed;
635 }
636
637 schema
638 }
639}
640
641impl ODCSContract {
646 pub fn to_tables(&self) -> Vec<Table> {
651 self.schema
652 .iter()
653 .map(|schema| {
654 let mut table = Table::from(schema);
655
656 table.odcl_metadata.insert(
658 "apiVersion".to_string(),
659 serde_json::json!(self.api_version),
660 );
661 table
662 .odcl_metadata
663 .insert("kind".to_string(), serde_json::json!(self.kind));
664 table
665 .odcl_metadata
666 .insert("contractId".to_string(), serde_json::json!(self.id));
667 table
668 .odcl_metadata
669 .insert("version".to_string(), serde_json::json!(self.version));
670 table
671 .odcl_metadata
672 .insert("contractName".to_string(), serde_json::json!(self.name));
673
674 if let Some(ref status) = self.status {
675 table
676 .odcl_metadata
677 .insert("status".to_string(), serde_json::json!(status));
678 }
679 if let Some(ref domain) = self.domain {
680 table
681 .odcl_metadata
682 .insert("domain".to_string(), serde_json::json!(domain));
683 }
684 if let Some(ref data_product) = self.data_product {
685 table
686 .odcl_metadata
687 .insert("dataProduct".to_string(), serde_json::json!(data_product));
688 }
689 if let Some(ref tenant) = self.tenant {
690 table
691 .odcl_metadata
692 .insert("tenant".to_string(), serde_json::json!(tenant));
693 }
694 if let Some(ref description) = self.description {
695 table.odcl_metadata.insert(
696 "description".to_string(),
697 serde_json::to_value(description).unwrap_or_default(),
698 );
699 }
700 if !self.servers.is_empty() {
701 table.odcl_metadata.insert(
702 "servers".to_string(),
703 serde_json::to_value(&self.servers).unwrap_or_default(),
704 );
705 }
706 if let Some(ref team) = self.team {
707 table.odcl_metadata.insert(
708 "team".to_string(),
709 serde_json::to_value(team).unwrap_or_default(),
710 );
711 }
712 if let Some(ref support) = self.support {
713 table.odcl_metadata.insert(
714 "support".to_string(),
715 serde_json::to_value(support).unwrap_or_default(),
716 );
717 }
718 if !self.roles.is_empty() {
719 table.odcl_metadata.insert(
720 "roles".to_string(),
721 serde_json::to_value(&self.roles).unwrap_or_default(),
722 );
723 }
724 if !self.service_levels.is_empty() {
725 table.odcl_metadata.insert(
726 "serviceLevels".to_string(),
727 serde_json::to_value(&self.service_levels).unwrap_or_default(),
728 );
729 }
730 if !self.quality.is_empty() {
731 table.odcl_metadata.insert(
732 "contractQuality".to_string(),
733 serde_json::to_value(&self.quality).unwrap_or_default(),
734 );
735 }
736 if let Some(ref price) = self.price {
737 table.odcl_metadata.insert(
738 "price".to_string(),
739 serde_json::to_value(price).unwrap_or_default(),
740 );
741 }
742 if let Some(ref terms) = self.terms {
743 table.odcl_metadata.insert(
744 "terms".to_string(),
745 serde_json::to_value(terms).unwrap_or_default(),
746 );
747 }
748 if !self.links.is_empty() {
749 table.odcl_metadata.insert(
750 "links".to_string(),
751 serde_json::to_value(&self.links).unwrap_or_default(),
752 );
753 }
754 if !self.authoritative_definitions.is_empty() {
755 table.odcl_metadata.insert(
756 "contractAuthoritativeDefinitions".to_string(),
757 serde_json::to_value(&self.authoritative_definitions).unwrap_or_default(),
758 );
759 }
760 if !self.tags.is_empty() {
761 table
762 .odcl_metadata
763 .insert("contractTags".to_string(), serde_json::json!(self.tags));
764 }
765 if !self.custom_properties.is_empty() {
766 table.odcl_metadata.insert(
767 "contractCustomProperties".to_string(),
768 serde_json::to_value(&self.custom_properties).unwrap_or_default(),
769 );
770 }
771 if let Some(ref ts) = self.contract_created_ts {
772 table
773 .odcl_metadata
774 .insert("contractCreatedTs".to_string(), serde_json::json!(ts));
775 }
776
777 table
778 })
779 .collect()
780 }
781
782 pub fn from_tables(tables: &[Table]) -> Self {
787 if tables.is_empty() {
788 return ODCSContract::default();
789 }
790
791 let first_table = &tables[0];
792 let mut contract = ODCSContract::default();
793
794 if let Some(api_version) = first_table
796 .odcl_metadata
797 .get("apiVersion")
798 .and_then(|v| v.as_str())
799 {
800 contract.api_version = api_version.to_string();
801 }
802 if let Some(kind) = first_table
803 .odcl_metadata
804 .get("kind")
805 .and_then(|v| v.as_str())
806 {
807 contract.kind = kind.to_string();
808 }
809 if let Some(id) = first_table
810 .odcl_metadata
811 .get("contractId")
812 .and_then(|v| v.as_str())
813 {
814 contract.id = id.to_string();
815 }
816 if let Some(version) = first_table
817 .odcl_metadata
818 .get("version")
819 .and_then(|v| v.as_str())
820 {
821 contract.version = version.to_string();
822 }
823 if let Some(name) = first_table
824 .odcl_metadata
825 .get("contractName")
826 .and_then(|v| v.as_str())
827 {
828 contract.name = name.to_string();
829 }
830 if let Some(status) = first_table
831 .odcl_metadata
832 .get("status")
833 .and_then(|v| v.as_str())
834 {
835 contract.status = Some(status.to_string());
836 }
837 if let Some(domain) = first_table
838 .odcl_metadata
839 .get("domain")
840 .and_then(|v| v.as_str())
841 {
842 contract.domain = Some(domain.to_string());
843 }
844 if let Some(data_product) = first_table
845 .odcl_metadata
846 .get("dataProduct")
847 .and_then(|v| v.as_str())
848 {
849 contract.data_product = Some(data_product.to_string());
850 }
851 if let Some(tenant) = first_table
852 .odcl_metadata
853 .get("tenant")
854 .and_then(|v| v.as_str())
855 {
856 contract.tenant = Some(tenant.to_string());
857 }
858 if let Some(description) = first_table.odcl_metadata.get("description") {
859 contract.description = serde_json::from_value(description.clone()).ok();
860 }
861 if let Some(servers) = first_table.odcl_metadata.get("servers") {
862 contract.servers = serde_json::from_value(servers.clone()).unwrap_or_default();
863 }
864 if let Some(team) = first_table.odcl_metadata.get("team") {
865 contract.team = serde_json::from_value(team.clone()).ok();
866 }
867 if let Some(support) = first_table.odcl_metadata.get("support") {
868 contract.support = serde_json::from_value(support.clone()).ok();
869 }
870 if let Some(roles) = first_table.odcl_metadata.get("roles") {
871 contract.roles = serde_json::from_value(roles.clone()).unwrap_or_default();
872 }
873 if let Some(service_levels) = first_table.odcl_metadata.get("serviceLevels") {
874 contract.service_levels =
875 serde_json::from_value(service_levels.clone()).unwrap_or_default();
876 }
877 if let Some(quality) = first_table.odcl_metadata.get("contractQuality") {
878 contract.quality = serde_json::from_value(quality.clone()).unwrap_or_default();
879 }
880 if let Some(price) = first_table.odcl_metadata.get("price") {
881 contract.price = serde_json::from_value(price.clone()).ok();
882 }
883 if let Some(terms) = first_table.odcl_metadata.get("terms") {
884 contract.terms = serde_json::from_value(terms.clone()).ok();
885 }
886 if let Some(links) = first_table.odcl_metadata.get("links") {
887 contract.links = serde_json::from_value(links.clone()).unwrap_or_default();
888 }
889 if let Some(auth_defs) = first_table
890 .odcl_metadata
891 .get("contractAuthoritativeDefinitions")
892 {
893 contract.authoritative_definitions =
894 serde_json::from_value(auth_defs.clone()).unwrap_or_default();
895 }
896 if let Some(tags) = first_table.odcl_metadata.get("contractTags") {
897 contract.tags = serde_json::from_value(tags.clone()).unwrap_or_default();
898 }
899 if let Some(custom) = first_table.odcl_metadata.get("contractCustomProperties") {
900 contract.custom_properties = serde_json::from_value(custom.clone()).unwrap_or_default();
901 }
902 if let Some(ts) = first_table
903 .odcl_metadata
904 .get("contractCreatedTs")
905 .and_then(|v| v.as_str())
906 {
907 contract.contract_created_ts = Some(ts.to_string());
908 }
909
910 contract.schema = tables.iter().map(SchemaObject::from).collect();
912
913 contract
914 }
915
916 pub fn from_table(table: &Table) -> Self {
948 let mut contract = Self::from_tables(std::slice::from_ref(table));
950
951 contract.id = table.id.to_string();
953
954 if contract.name.is_empty() {
956 contract.name = table.name.clone();
957 }
958
959 if contract.version.is_empty() {
961 contract.version = "1.0.0".to_string();
962 }
963
964 if contract.status.is_none() {
966 contract.status = Some("draft".to_string());
967 }
968
969 if let Some(infrastructure) = table.odcl_metadata.get("infrastructure") {
972 if contract
974 .custom_properties
975 .iter()
976 .all(|cp| cp.property != "infrastructure")
977 {
978 contract.custom_properties.push(CustomProperty::new(
979 "infrastructure".to_string(),
980 infrastructure.clone(),
981 ));
982 }
983 }
984
985 if let Some(servicelevels) = table.odcl_metadata.get("servicelevels")
987 && contract.service_levels.is_empty()
988 {
989 contract.service_levels =
990 serde_json::from_value(servicelevels.clone()).unwrap_or_default();
991 }
992
993 if let Some(pricing) = table.odcl_metadata.get("pricing")
995 && contract.price.is_none()
996 {
997 contract.price = serde_json::from_value(pricing.clone()).ok();
998 }
999
1000 if contract.tags.is_empty() && !table.tags.is_empty() {
1002 contract.tags = table.tags.iter().map(|t| t.to_string()).collect();
1003 }
1004
1005 if contract.contract_created_ts.is_none() {
1007 contract.contract_created_ts = Some(table.created_at.to_rfc3339());
1008 }
1009
1010 contract
1011 }
1012
1013 pub fn to_table_data(&self) -> Vec<TableData> {
1015 self.schema
1016 .iter()
1017 .enumerate()
1018 .map(|(idx, schema)| {
1019 let description_value = self
1020 .description
1021 .as_ref()
1022 .map(|d| serde_json::to_value(d).unwrap_or(serde_json::Value::Null));
1023
1024 TableData {
1025 table_index: idx,
1026 id: Some(self.id.clone()),
1028 name: Some(schema.name.clone()),
1029 api_version: Some(self.api_version.clone()),
1030 version: Some(self.version.clone()),
1031 status: self.status.clone(),
1032 kind: Some(self.kind.clone()),
1033 domain: self.domain.clone(),
1035 data_product: self.data_product.clone(),
1036 tenant: self.tenant.clone(),
1037 description: description_value,
1039 physical_name: schema.physical_name.clone(),
1041 physical_type: schema.physical_type.clone(),
1042 business_name: schema.business_name.clone(),
1043 data_granularity_description: schema.data_granularity_description.clone(),
1044 columns: schema
1046 .properties
1047 .iter()
1048 .map(property_to_column_data)
1049 .collect(),
1050 servers: self
1052 .servers
1053 .iter()
1054 .filter_map(|s| serde_json::to_value(s).ok())
1055 .collect(),
1056 team: self
1058 .team
1059 .as_ref()
1060 .and_then(|t| serde_json::to_value(t).ok()),
1061 support: self
1062 .support
1063 .as_ref()
1064 .and_then(|s| serde_json::to_value(s).ok()),
1065 roles: self
1067 .roles
1068 .iter()
1069 .filter_map(|r| serde_json::to_value(r).ok())
1070 .collect(),
1071 sla_properties: self
1073 .service_levels
1074 .iter()
1075 .filter_map(|s| serde_json::to_value(s).ok())
1076 .collect(),
1077 quality: self
1078 .quality
1079 .iter()
1080 .filter_map(|q| serde_json::to_value(q).ok())
1081 .filter_map(|v| v.as_object().cloned())
1082 .map(|m| m.into_iter().collect())
1083 .collect(),
1084 price: self
1086 .price
1087 .as_ref()
1088 .and_then(|p| serde_json::to_value(p).ok()),
1089 tags: self.tags.clone(),
1091 custom_properties: self
1092 .custom_properties
1093 .iter()
1094 .filter_map(|cp| serde_json::to_value(cp).ok())
1095 .collect(),
1096 authoritative_definitions: self
1097 .authoritative_definitions
1098 .iter()
1099 .filter_map(|ad| serde_json::to_value(ad).ok())
1100 .collect(),
1101 contract_created_ts: self.contract_created_ts.clone(),
1103 odcs_metadata: std::collections::HashMap::new(),
1105 }
1106 })
1107 .collect()
1108 }
1109}
1110
1111fn property_to_column_data(prop: &Property) -> ColumnData {
1113 ColumnData {
1114 id: prop.id.clone(),
1115 name: prop.name.clone(),
1116 business_name: prop.business_name.clone(),
1117 description: prop.description.clone(),
1118 data_type: prop.logical_type.clone(),
1119 physical_type: prop.physical_type.clone(),
1120 physical_name: prop.physical_name.clone(),
1121 logical_type_options: prop.logical_type_options.as_ref().map(|opts| {
1122 ColumnLogicalTypeOptions {
1123 min_length: opts.min_length,
1124 max_length: opts.max_length,
1125 pattern: opts.pattern.clone(),
1126 format: opts.format.clone(),
1127 minimum: opts.minimum.clone(),
1128 maximum: opts.maximum.clone(),
1129 exclusive_minimum: opts.exclusive_minimum.clone(),
1130 exclusive_maximum: opts.exclusive_maximum.clone(),
1131 precision: opts.precision,
1132 scale: opts.scale,
1133 }
1134 }),
1135 primary_key: prop.primary_key,
1136 primary_key_position: prop.primary_key_position,
1137 unique: prop.unique,
1138 nullable: !prop.required,
1139 partitioned: prop.partitioned,
1140 partition_key_position: prop.partition_key_position,
1141 clustered: prop.clustered,
1142 classification: prop.classification.clone(),
1143 critical_data_element: prop.critical_data_element,
1144 encrypted_name: prop.encrypted_name.clone(),
1145 transform_source_objects: prop.transform_source_objects.clone(),
1146 transform_logic: prop.transform_logic.clone(),
1147 transform_description: prop.transform_description.clone(),
1148 examples: prop.examples.clone(),
1149 default_value: prop.default_value.clone(),
1150 relationships: prop
1151 .relationships
1152 .iter()
1153 .map(|r| ColumnPropertyRelationship {
1154 relationship_type: r.relationship_type.clone(),
1155 to: r.to.clone(),
1156 })
1157 .collect(),
1158 authoritative_definitions: prop
1159 .authoritative_definitions
1160 .iter()
1161 .map(|d| ColumnAuthDef {
1162 definition_type: d.definition_type.clone(),
1163 url: d.url.clone(),
1164 })
1165 .collect(),
1166 quality: if prop.quality.is_empty() {
1167 None
1168 } else {
1169 Some(
1170 prop.quality
1171 .iter()
1172 .filter_map(|q| serde_json::to_value(q).ok())
1173 .filter_map(|v| v.as_object().cloned())
1174 .map(|m| m.into_iter().collect())
1175 .collect(),
1176 )
1177 },
1178 enum_values: if prop.enum_values.is_empty() {
1179 None
1180 } else {
1181 Some(prop.enum_values.clone())
1182 },
1183 tags: prop.tags.clone(),
1184 custom_properties: prop
1185 .custom_properties
1186 .iter()
1187 .map(|cp| (cp.property.clone(), cp.value.clone()))
1188 .collect(),
1189 }
1190}
1191
1192#[cfg(test)]
1193mod tests {
1194 use super::*;
1195
1196 #[test]
1197 fn test_property_to_column_roundtrip() {
1198 let prop = Property::new("email", "string")
1199 .with_required(true)
1200 .with_description("User email address")
1201 .with_classification("pii");
1202
1203 let col = Column::from(&prop);
1204 assert_eq!(col.name, "email");
1205 assert_eq!(col.data_type, "string");
1206 assert!(!col.nullable); assert_eq!(col.description, "User email address");
1208 assert_eq!(col.classification, Some("pii".to_string()));
1209
1210 let prop2 = Property::from(&col);
1211 assert_eq!(prop2.name, "email");
1212 assert_eq!(prop2.logical_type, "string");
1213 assert!(prop2.required);
1214 assert_eq!(prop2.description, Some("User email address".to_string()));
1215 }
1216
1217 #[test]
1218 fn test_data_type_mapping() {
1219 assert_eq!(
1221 map_data_type_to_logical_type("BIGINT"),
1222 ("integer".to_string(), false)
1223 );
1224 assert_eq!(
1225 map_data_type_to_logical_type("INT"),
1226 ("integer".to_string(), false)
1227 );
1228
1229 assert_eq!(
1231 map_data_type_to_logical_type("DECIMAL(10,2)"),
1232 ("number".to_string(), false)
1233 );
1234 assert_eq!(
1235 map_data_type_to_logical_type("DOUBLE"),
1236 ("number".to_string(), false)
1237 );
1238
1239 assert_eq!(
1241 map_data_type_to_logical_type("BOOLEAN"),
1242 ("boolean".to_string(), false)
1243 );
1244
1245 assert_eq!(
1247 map_data_type_to_logical_type("DATE"),
1248 ("date".to_string(), false)
1249 );
1250 assert_eq!(
1251 map_data_type_to_logical_type("TIMESTAMP"),
1252 ("timestamp".to_string(), false)
1253 );
1254
1255 assert_eq!(
1257 map_data_type_to_logical_type("ARRAY<STRING>"),
1258 ("array".to_string(), true)
1259 );
1260
1261 assert_eq!(
1263 map_data_type_to_logical_type("STRUCT<name STRING, age INT>"),
1264 ("object".to_string(), false)
1265 );
1266
1267 assert_eq!(
1269 map_data_type_to_logical_type("VARCHAR(255)"),
1270 ("string".to_string(), false)
1271 );
1272 }
1273
1274 #[test]
1275 fn test_column_to_property_with_data_type_mapping() {
1276 let mut col = Column::new("age".to_string(), "BIGINT".to_string());
1277 col.nullable = false;
1278
1279 let prop = Property::from(&col);
1280 assert_eq!(prop.name, "age");
1281 assert_eq!(prop.logical_type, "integer"); assert_eq!(prop.physical_type, Some("BIGINT".to_string())); assert!(prop.required);
1284 }
1285
1286 #[test]
1287 fn test_enum_values_to_quality_rule() {
1288 let mut col = Column::new("status".to_string(), "VARCHAR(20)".to_string());
1289 col.enum_values = vec![
1290 "active".to_string(),
1291 "inactive".to_string(),
1292 "pending".to_string(),
1293 ];
1294
1295 let prop = Property::from(&col);
1296 assert_eq!(prop.logical_type, "string");
1297
1298 assert!(!prop.quality.is_empty());
1300 let enum_rule = prop
1301 .quality
1302 .iter()
1303 .find(|q| q.rule_type == Some("sql".to_string()));
1304 assert!(enum_rule.is_some());
1305 let rule = enum_rule.unwrap();
1306 assert!(rule.query.as_ref().unwrap().contains("NOT IN"));
1307 assert!(rule.query.as_ref().unwrap().contains("'active'"));
1308 }
1309
1310 #[test]
1311 fn test_schema_to_table_roundtrip() {
1312 let schema = SchemaObject::new("users")
1313 .with_physical_name("tbl_users")
1314 .with_physical_type("table")
1315 .with_business_name("User Accounts")
1316 .with_description("User data")
1317 .with_properties(vec![
1318 Property::new("id", "integer").with_primary_key(true),
1319 Property::new("email", "string").with_required(true),
1320 ]);
1321
1322 let table = Table::from(&schema);
1323 assert_eq!(table.name, "users");
1324 assert_eq!(table.columns.len(), 2);
1325 assert_eq!(
1326 table
1327 .odcl_metadata
1328 .get("physicalName")
1329 .and_then(|v| v.as_str()),
1330 Some("tbl_users")
1331 );
1332
1333 let schema2 = SchemaObject::from(&table);
1334 assert_eq!(schema2.name, "users");
1335 assert_eq!(schema2.physical_name, Some("tbl_users".to_string()));
1336 assert_eq!(schema2.physical_type, Some("table".to_string()));
1337 assert_eq!(schema2.properties.len(), 2);
1338 }
1339
1340 #[test]
1341 fn test_contract_to_tables_roundtrip() {
1342 let contract = ODCSContract::new("test-contract", "1.0.0")
1343 .with_domain("test")
1344 .with_status("active")
1345 .with_schema(
1346 SchemaObject::new("orders")
1347 .with_physical_type("table")
1348 .with_properties(vec![
1349 Property::new("id", "integer").with_primary_key(true),
1350 Property::new("total", "number"),
1351 ]),
1352 )
1353 .with_schema(
1354 SchemaObject::new("items")
1355 .with_physical_type("table")
1356 .with_properties(vec![Property::new("id", "integer").with_primary_key(true)]),
1357 );
1358
1359 let tables = contract.to_tables();
1360 assert_eq!(tables.len(), 2);
1361 assert_eq!(tables[0].name, "orders");
1362 assert_eq!(tables[1].name, "items");
1363
1364 assert_eq!(
1366 tables[0]
1367 .odcl_metadata
1368 .get("domain")
1369 .and_then(|v| v.as_str()),
1370 Some("test")
1371 );
1372
1373 let contract2 = ODCSContract::from_tables(&tables);
1374 assert_eq!(contract2.name, "test-contract");
1375 assert_eq!(contract2.version, "1.0.0");
1376 assert_eq!(contract2.domain, Some("test".to_string()));
1377 assert_eq!(contract2.schema_count(), 2);
1378 }
1379
1380 #[test]
1381 fn test_nested_property_flattening() {
1382 let schema = SchemaObject::new("events").with_properties(vec![
1383 Property::new("id", "string"),
1384 Property::new("address", "object").with_nested_properties(vec![
1385 Property::new("street", "string"),
1386 Property::new("city", "string"),
1387 ]),
1388 ]);
1389
1390 let table = Table::from(&schema);
1391
1392 let column_names: Vec<&str> = table.columns.iter().map(|c| c.name.as_str()).collect();
1394 assert!(column_names.contains(&"id"));
1395 assert!(column_names.contains(&"address"));
1396 assert!(column_names.contains(&"address.street"));
1397 assert!(column_names.contains(&"address.city"));
1398 }
1399
1400 #[test]
1401 fn test_to_table_data() {
1402 let contract = ODCSContract::new("test", "1.0.0")
1403 .with_domain("test-domain")
1404 .with_schema(
1405 SchemaObject::new("users")
1406 .with_description("User data")
1407 .with_properties(vec![
1408 Property::new("id", "integer").with_primary_key(true),
1409 Property::new("name", "string"),
1410 ]),
1411 );
1412
1413 let table_data = contract.to_table_data();
1414 assert_eq!(table_data.len(), 1);
1415 assert_eq!(table_data[0].name, Some("users".to_string()));
1416 assert_eq!(table_data[0].domain, Some("test-domain".to_string()));
1419 assert_eq!(table_data[0].columns.len(), 2);
1420 }
1421
1422 #[test]
1423 fn test_array_struct_type_handling() {
1424 let mut table = Table::new(
1426 "orders".to_string(),
1427 vec![
1428 Column::new("id".to_string(), "BIGINT".to_string()),
1429 Column::new(
1430 "items".to_string(),
1431 "ARRAY<STRUCT<name STRING, qty INT>>".to_string(),
1432 ),
1433 Column::new("items.[].name".to_string(), "STRING".to_string()),
1434 Column::new("items.[].qty".to_string(), "INT".to_string()),
1435 ],
1436 );
1437 table.columns[0].nullable = false;
1438
1439 let schema = SchemaObject::from(&table);
1440 assert_eq!(schema.properties.len(), 2); let id_prop = schema.get_property("id").unwrap();
1444 assert_eq!(id_prop.logical_type, "integer");
1445 assert!(id_prop.required);
1446
1447 let items_prop = schema.get_property("items").unwrap();
1449 assert_eq!(items_prop.logical_type, "array");
1450 assert!(items_prop.items.is_some());
1451
1452 let items_inner = items_prop.items.as_ref().unwrap();
1454 assert_eq!(items_inner.logical_type, "object");
1455 assert_eq!(items_inner.properties.len(), 2);
1456 }
1457
1458 #[test]
1459 fn test_struct_type_handling() {
1460 let table = Table::new(
1462 "users".to_string(),
1463 vec![
1464 Column::new("id".to_string(), "BIGINT".to_string()),
1465 Column::new(
1466 "address".to_string(),
1467 "STRUCT<street STRING, city STRING>".to_string(),
1468 ),
1469 Column::new("address.street".to_string(), "STRING".to_string()),
1470 Column::new("address.city".to_string(), "STRING".to_string()),
1471 ],
1472 );
1473
1474 let schema = SchemaObject::from(&table);
1475 assert_eq!(schema.properties.len(), 2); let address_prop = schema.get_property("address").unwrap();
1479 assert_eq!(address_prop.logical_type, "object");
1480 assert_eq!(address_prop.properties.len(), 2);
1481
1482 let street = address_prop.properties.iter().find(|p| p.name == "street");
1484 assert!(street.is_some());
1485 assert_eq!(street.unwrap().logical_type, "string");
1486 }
1487
1488 #[test]
1489 fn test_from_table() {
1490 use crate::models::tag::Tag;
1491
1492 let mut table = Table::new(
1493 "orders".to_string(),
1494 vec![
1495 Column::new("id".to_string(), "BIGINT".to_string()),
1496 Column::new("total".to_string(), "DECIMAL(10,2)".to_string()),
1497 ],
1498 );
1499
1500 table
1502 .odcl_metadata
1503 .insert("version".to_string(), serde_json::json!("2.0.0"));
1504 table
1505 .odcl_metadata
1506 .insert("status".to_string(), serde_json::json!("active"));
1507 table
1508 .odcl_metadata
1509 .insert("domain".to_string(), serde_json::json!("sales"));
1510 table.tags = vec![Tag::Simple("important".to_string())];
1511
1512 let contract = ODCSContract::from_table(&table);
1513
1514 assert_eq!(contract.id, table.id.to_string());
1516 assert_eq!(contract.name, "orders");
1517 assert_eq!(contract.version, "2.0.0");
1518 assert_eq!(contract.status, Some("active".to_string()));
1519 assert_eq!(contract.domain, Some("sales".to_string()));
1520 assert_eq!(contract.tags, vec!["important".to_string()]);
1521
1522 assert_eq!(contract.schema.len(), 1);
1524 let schema = &contract.schema[0];
1525 assert_eq!(schema.name, "orders");
1526 assert_eq!(schema.properties.len(), 2);
1527
1528 let id_prop = schema.get_property("id").unwrap();
1530 assert_eq!(id_prop.logical_type, "integer");
1531
1532 let total_prop = schema.get_property("total").unwrap();
1533 assert_eq!(total_prop.logical_type, "number");
1534 }
1535
1536 #[test]
1537 fn test_from_table_defaults() {
1538 let table = Table::new(
1540 "simple".to_string(),
1541 vec![Column::new("id".to_string(), "INT".to_string())],
1542 );
1543
1544 let contract = ODCSContract::from_table(&table);
1545
1546 assert_eq!(contract.name, "simple");
1547 assert_eq!(contract.version, "1.0.0"); assert_eq!(contract.status, Some("draft".to_string())); assert!(contract.contract_created_ts.is_some()); }
1551}