data_modelling_core/import/
odcl.rs

1//! ODCL (Open Data Contract Language) parser service for parsing legacy ODCL YAML files.
2//!
3//! This service parses legacy ODCL (Data Contract Specification) YAML files and converts
4//! them to Table models. ODCL is the predecessor to ODCS (Open Data Contract Standard).
5//!
6//! Supports multiple legacy formats:
7//! - Data Contract Specification format (dataContractSpecification, models, definitions)
8//! - Simple ODCL format (name, columns)
9//!
10//! For ODCS v3.1.0/v3.0.x format, use the ODCSImporter instead.
11
12use super::odcs_shared::{
13    ParserError, column_to_column_data, expand_nested_column, extract_catalog_schema,
14    extract_quality_from_obj, extract_shared_domains, json_value_to_serde_value,
15    normalize_data_type, parse_data_vault_classification, parse_foreign_key,
16    parse_foreign_key_from_data_contract, parse_medallion_layer, parse_scd_pattern,
17    parse_struct_fields_from_string, resolve_ref, yaml_to_json_value,
18};
19use super::{ImportError, ImportResult, TableData};
20use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
21use crate::models::{Column, PropertyRelationship, Table, Tag};
22use anyhow::{Context, Result};
23use serde_json::Value as JsonValue;
24use std::collections::HashMap;
25use std::str::FromStr;
26use tracing::info;
27
28/// Convert a $ref path to a PropertyRelationship.
29/// E.g., "#/definitions/order_id" -> PropertyRelationship { type: "foreignKey", to: "definitions/order_id" }
30fn ref_to_relationships(ref_path: &Option<String>) -> Vec<PropertyRelationship> {
31    match ref_path {
32        Some(ref_str) => {
33            let to = if ref_str.starts_with("#/definitions/") {
34                let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
35                format!("definitions/{}", def_path)
36            } else if ref_str.starts_with("#/") {
37                ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
38            } else {
39                ref_str.clone()
40            };
41            vec![PropertyRelationship {
42                relationship_type: "foreignKey".to_string(),
43                to,
44            }]
45        }
46        None => Vec::new(),
47    }
48}
49
50/// ODCL parser service for parsing legacy Open Data Contract Language YAML files.
51/// Handles Data Contract Specification format and simple ODCL format.
52///
53/// For ODCS v3.1.0 format, use ODCSImporter instead.
54pub struct ODCLImporter {
55    /// Current YAML data for $ref resolution
56    current_yaml_data: Option<serde_yaml::Value>,
57}
58
59impl ODCLImporter {
60    /// Create a new ODCL parser instance.
61    ///
62    /// # Example
63    ///
64    /// ```rust
65    /// use data_modelling_core::import::odcl::ODCLImporter;
66    ///
67    /// let mut importer = ODCLImporter::new();
68    /// ```
69    pub fn new() -> Self {
70        Self {
71            current_yaml_data: None,
72        }
73    }
74
75    /// Import ODCL YAML content and create Table (SDK interface).
76    ///
77    /// Supports Data Contract Specification format and simple ODCL format.
78    ///
79    /// # Arguments
80    ///
81    /// * `yaml_content` - ODCL YAML content as a string
82    ///
83    /// # Returns
84    ///
85    /// An `ImportResult` containing the extracted table and any parse errors.
86    ///
87    /// # Example
88    ///
89    /// ```rust
90    /// use data_modelling_core::import::odcl::ODCLImporter;
91    ///
92    /// let mut importer = ODCLImporter::new();
93    /// let yaml = r#"
94    /// dataContractSpecification: 0.9.3
95    /// id: urn:datacontract:example
96    /// models:
97    ///   users:
98    ///     fields:
99    ///       id:
100    ///         type: bigint
101    /// "#;
102    /// let result = importer.import(yaml).unwrap();
103    /// assert_eq!(result.tables.len(), 1);
104    /// ```
105    pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
106        // First parse YAML to get raw data for field extraction
107        let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
108            .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
109
110        let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
111            ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
112        })?;
113
114        match self.parse(yaml_content) {
115            Ok((table, errors)) => {
116                // Extract contract-level fields from the raw JSON data (ODCL/Data Contract format)
117                let sdk_tables = vec![TableData {
118                    table_index: 0,
119                    id: Some(table.id.to_string()),
120                    name: Some(table.name.clone()),
121                    api_version: json_data
122                        .get("dataContractSpecification")
123                        .and_then(|v| v.as_str())
124                        .map(|s| s.to_string()),
125                    version: json_data
126                        .get("info")
127                        .and_then(|v| v.get("version"))
128                        .and_then(|v| v.as_str())
129                        .map(|s| s.to_string()),
130                    status: json_data
131                        .get("info")
132                        .and_then(|v| v.get("status"))
133                        .and_then(|v| v.as_str())
134                        .map(|s| s.to_string()),
135                    kind: Some("DataContract".to_string()),
136                    domain: json_data
137                        .get("info")
138                        .and_then(|v| v.get("domain"))
139                        .and_then(|v| v.as_str())
140                        .map(|s| s.to_string()),
141                    data_product: json_data
142                        .get("info")
143                        .and_then(|v| v.get("dataProduct"))
144                        .and_then(|v| v.as_str())
145                        .map(|s| s.to_string()),
146                    tenant: json_data
147                        .get("info")
148                        .and_then(|v| v.get("tenant"))
149                        .and_then(|v| v.as_str())
150                        .map(|s| s.to_string()),
151                    description: json_data
152                        .get("info")
153                        .and_then(|v| v.get("description"))
154                        .cloned(),
155                    columns: table.columns.iter().map(column_to_column_data).collect(),
156                    servers: json_data
157                        .get("servers")
158                        .and_then(|v| v.as_array())
159                        .cloned()
160                        .unwrap_or_default(),
161                    team: json_data.get("info").and_then(|v| v.get("team")).cloned(),
162                    support: json_data
163                        .get("info")
164                        .and_then(|v| v.get("support"))
165                        .cloned(),
166                    roles: Vec::new(),
167                    sla_properties: json_data
168                        .get("servicelevels")
169                        .and_then(|v| v.as_array())
170                        .cloned()
171                        .unwrap_or_default(),
172                    quality: table.quality.clone(),
173                    price: json_data
174                        .get("info")
175                        .and_then(|v| v.get("pricing"))
176                        .cloned(),
177                    tags: table.tags.iter().map(|t| t.to_string()).collect(),
178                    custom_properties: Vec::new(),
179                    authoritative_definitions: Vec::new(),
180                    contract_created_ts: None,
181                    odcs_metadata: table.odcl_metadata.clone(),
182                }];
183                let sdk_errors: Vec<ImportError> = errors
184                    .iter()
185                    .map(|e| ImportError::ParseError(e.message.clone()))
186                    .collect();
187                Ok(ImportResult {
188                    tables: sdk_tables,
189                    tables_requiring_name: Vec::new(),
190                    errors: sdk_errors,
191                    ai_suggestions: None,
192                })
193            }
194            Err(e) => Err(ImportError::ParseError(e.to_string())),
195        }
196    }
197
198    /// Parse ODCL YAML content and create Table (public method for native app use).
199    ///
200    /// This method returns the full Table object with all metadata, suitable for use in
201    /// native applications that need direct access to the parsed table structure.
202    /// For API use, prefer the `import()` method which returns ImportResult.
203    ///
204    /// # Returns
205    ///
206    /// Returns a tuple of (Table, list of errors/warnings).
207    /// Errors list is empty if parsing is successful.
208    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
209        self.parse(yaml_content)
210    }
211
212    /// Parse ODCL YAML content and create Table (internal method).
213    ///
214    /// Supports Data Contract Specification format and simple ODCL format.
215    ///
216    /// # Returns
217    ///
218    /// Returns a tuple of (Table, list of errors/warnings).
219    /// Errors list is empty if parsing is successful.
220    fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
221        // Parse YAML
222        let data: serde_yaml::Value =
223            serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
224
225        if data.is_null() {
226            return Err(anyhow::anyhow!("Empty YAML content"));
227        }
228
229        // Store current YAML data for $ref resolution
230        self.current_yaml_data = Some(data.clone());
231
232        // Convert to JSON Value for easier manipulation
233        let json_data = yaml_to_json_value(&data)?;
234
235        // Check format and parse accordingly
236        if self.is_data_contract_format(&json_data) {
237            return self.parse_data_contract(&json_data);
238        }
239
240        // Fall back to simple ODCL format
241        self.parse_simple_odcl(&json_data)
242    }
243
244    /// Check if this importer can handle the given YAML content.
245    ///
246    /// Returns true if the content is in ODCL format (Data Contract Specification
247    /// or simple ODCL format), false if it's in ODCS v3.x format.
248    pub fn can_handle(&self, yaml_content: &str) -> bool {
249        let data: serde_yaml::Value = match serde_yaml::from_str(yaml_content) {
250            Ok(d) => d,
251            Err(_) => return false,
252        };
253
254        let json_data = match yaml_to_json_value(&data) {
255            Ok(j) => j,
256            Err(_) => return false,
257        };
258
259        // Check if it's ODCS v3.x format (should use ODCSImporter instead)
260        if self.is_odcs_v3_format(&json_data) {
261            return false;
262        }
263
264        // Check if it's Data Contract Specification format
265        if self.is_data_contract_format(&json_data) {
266            return true;
267        }
268
269        // Check if it's simple ODCL format (has name and columns)
270        if let Some(obj) = json_data.as_object() {
271            let has_name = obj.contains_key("name");
272            let has_columns = obj.get("columns").and_then(|v| v.as_array()).is_some();
273            return has_name && has_columns;
274        }
275
276        false
277    }
278
279    /// Check if YAML is in ODCS v3.x format.
280    fn is_odcs_v3_format(&self, data: &JsonValue) -> bool {
281        if let Some(obj) = data.as_object() {
282            let has_api_version = obj.contains_key("apiVersion");
283            let has_kind = obj
284                .get("kind")
285                .and_then(|v| v.as_str())
286                .map(|s| s == "DataContract")
287                .unwrap_or(false);
288            let has_id = obj.contains_key("id");
289            let has_version = obj.contains_key("version");
290            return has_api_version && has_kind && has_id && has_version;
291        }
292        false
293    }
294
295    /// Check if YAML is in Data Contract specification format.
296    fn is_data_contract_format(&self, data: &JsonValue) -> bool {
297        if let Some(obj) = data.as_object() {
298            let has_spec = obj.contains_key("dataContractSpecification");
299            let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
300            return has_spec && has_models;
301        }
302        false
303    }
304
305    /// Parse simple ODCL format.
306    fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
307        let mut errors = Vec::new();
308
309        // Extract table name
310        let name = data
311            .get("name")
312            .and_then(|v| v.as_str())
313            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
314            .to_string();
315
316        // Extract columns
317        let columns_data = data
318            .get("columns")
319            .and_then(|v| v.as_array())
320            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
321
322        let mut columns = Vec::new();
323        for (idx, col_data) in columns_data.iter().enumerate() {
324            match self.parse_column(col_data) {
325                Ok(col) => columns.push(col),
326                Err(e) => {
327                    errors.push(ParserError {
328                        error_type: "column_parse_error".to_string(),
329                        field: format!("columns[{}]", idx),
330                        message: e.to_string(),
331                    });
332                }
333            }
334        }
335
336        // Extract metadata
337        let database_type = self.extract_database_type(data);
338        let medallion_layers = self.extract_medallion_layers(data);
339        let scd_pattern = self.extract_scd_pattern(data);
340        let data_vault_classification = self.extract_data_vault_classification(data);
341        let quality_rules = self.extract_quality_rules(data);
342
343        // Validate pattern exclusivity
344        if scd_pattern.is_some() && data_vault_classification.is_some() {
345            errors.push(ParserError {
346                error_type: "validation_error".to_string(),
347                field: "patterns".to_string(),
348                message: "SCD pattern and Data Vault classification are mutually exclusive"
349                    .to_string(),
350            });
351        }
352
353        // Extract odcl_metadata
354        let mut odcl_metadata = HashMap::new();
355        if let Some(metadata) = data.get("odcl_metadata")
356            && let Some(obj) = metadata.as_object()
357        {
358            for (key, value) in obj {
359                odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
360            }
361        }
362
363        let table_uuid = self.extract_table_uuid(data);
364
365        let table = Table {
366            id: table_uuid,
367            name,
368            columns,
369            database_type,
370            catalog_name: None,
371            schema_name: None,
372            medallion_layers,
373            scd_pattern,
374            data_vault_classification,
375            modeling_level: None,
376            tags: Vec::<Tag>::new(),
377            odcl_metadata,
378            owner: None,
379            sla: None,
380            contact_details: None,
381            infrastructure_type: None,
382            notes: None,
383            position: None,
384            yaml_file_path: None,
385            drawio_cell_id: None,
386            quality: quality_rules,
387            errors: Vec::new(),
388            created_at: chrono::Utc::now(),
389            updated_at: chrono::Utc::now(),
390        };
391
392        info!("Parsed ODCL table: {}", table.name);
393        Ok((table, errors))
394    }
395
396    /// Parse a single column definition.
397    fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
398        let name = col_data
399            .get("name")
400            .and_then(|v| v.as_str())
401            .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
402            .to_string();
403
404        let data_type = col_data
405            .get("data_type")
406            .and_then(|v| v.as_str())
407            .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
408            .to_string();
409
410        // Normalize data_type to uppercase (preserve STRUCT<...> format)
411        let data_type = normalize_data_type(&data_type);
412
413        let nullable = col_data
414            .get("nullable")
415            .and_then(|v| v.as_bool())
416            .unwrap_or(true);
417
418        let primary_key = col_data
419            .get("primary_key")
420            .and_then(|v| v.as_bool())
421            .unwrap_or(false);
422
423        let foreign_key = col_data.get("foreign_key").and_then(parse_foreign_key);
424
425        let constraints = col_data
426            .get("constraints")
427            .and_then(|v| v.as_array())
428            .map(|arr| {
429                arr.iter()
430                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
431                    .collect()
432            })
433            .unwrap_or_default();
434
435        let description = col_data
436            .get("description")
437            .and_then(|v| v.as_str())
438            .map(|s| s.to_string())
439            .unwrap_or_default();
440
441        // Extract column-level quality rules
442        let mut column_quality_rules = Vec::new();
443        if let Some(quality_val) = col_data.get("quality") {
444            if let Some(arr) = quality_val.as_array() {
445                // Array of quality rules
446                for item in arr {
447                    if let Some(obj) = item.as_object() {
448                        let mut rule = HashMap::new();
449                        for (key, value) in obj {
450                            rule.insert(key.clone(), json_value_to_serde_value(value));
451                        }
452                        column_quality_rules.push(rule);
453                    }
454                }
455            } else if let Some(obj) = quality_val.as_object() {
456                // Single quality rule object
457                let mut rule = HashMap::new();
458                for (key, value) in obj {
459                    rule.insert(key.clone(), json_value_to_serde_value(value));
460                }
461                column_quality_rules.push(rule);
462            }
463        }
464
465        Ok(Column {
466            name,
467            data_type,
468            nullable,
469            primary_key,
470            foreign_key,
471            constraints,
472            description,
473            quality: column_quality_rules,
474            ..Default::default()
475        })
476    }
477
478    /// Extract database type from data.
479    fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
480        data.get("database_type")
481            .and_then(|v| v.as_str())
482            .and_then(|s| match s.to_uppercase().as_str() {
483                "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
484                "MYSQL" => Some(DatabaseType::Mysql),
485                "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
486                "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
487                "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
488                _ => None,
489            })
490    }
491
492    /// Extract medallion layers from data.
493    fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
494        let mut layers = Vec::new();
495
496        // Check plural form first
497        if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
498            for item in arr {
499                if let Some(s) = item.as_str()
500                    && let Ok(layer) = parse_medallion_layer(s)
501                {
502                    layers.push(layer);
503                }
504            }
505        }
506        // Check singular form (backward compatibility)
507        else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
508            && let Ok(layer) = parse_medallion_layer(s)
509        {
510            layers.push(layer);
511        }
512
513        layers
514    }
515
516    /// Extract SCD pattern from data.
517    fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
518        data.get("scd_pattern")
519            .and_then(|v| v.as_str())
520            .and_then(|s| parse_scd_pattern(s).ok())
521    }
522
523    /// Extract Data Vault classification from data.
524    fn extract_data_vault_classification(
525        &self,
526        data: &JsonValue,
527    ) -> Option<DataVaultClassification> {
528        data.get("data_vault_classification")
529            .and_then(|v| v.as_str())
530            .and_then(|s| parse_data_vault_classification(s).ok())
531    }
532
533    /// Extract quality rules from data.
534    fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
535        use serde_json::Value;
536        let mut quality_rules = Vec::new();
537
538        // Check for quality field at root level (array of objects or single object)
539        if let Some(quality_val) = data.get("quality") {
540            if let Some(arr) = quality_val.as_array() {
541                // Array of quality rules
542                for item in arr {
543                    if let Some(obj) = item.as_object() {
544                        let mut rule = HashMap::new();
545                        for (key, value) in obj {
546                            rule.insert(key.clone(), json_value_to_serde_value(value));
547                        }
548                        quality_rules.push(rule);
549                    }
550                }
551            } else if let Some(obj) = quality_val.as_object() {
552                // Single quality rule object
553                let mut rule = HashMap::new();
554                for (key, value) in obj {
555                    rule.insert(key.clone(), json_value_to_serde_value(value));
556                }
557                quality_rules.push(rule);
558            } else if let Some(s) = quality_val.as_str() {
559                // Simple string quality value
560                let mut rule = HashMap::new();
561                rule.insert("value".to_string(), Value::String(s.to_string()));
562                quality_rules.push(rule);
563            }
564        }
565
566        // Check for quality in metadata (ODCL format)
567        if let Some(metadata) = data.get("metadata")
568            && let Some(metadata_obj) = metadata.as_object()
569            && let Some(quality_val) = metadata_obj.get("quality")
570        {
571            if let Some(arr) = quality_val.as_array() {
572                // Array of quality rules
573                for item in arr {
574                    if let Some(obj) = item.as_object() {
575                        let mut rule = HashMap::new();
576                        for (key, value) in obj {
577                            rule.insert(key.clone(), json_value_to_serde_value(value));
578                        }
579                        quality_rules.push(rule);
580                    }
581                }
582            } else if let Some(obj) = quality_val.as_object() {
583                // Single quality rule object
584                let mut rule = HashMap::new();
585                for (key, value) in obj {
586                    rule.insert(key.clone(), json_value_to_serde_value(value));
587                }
588                quality_rules.push(rule);
589            } else if let Some(s) = quality_val.as_str() {
590                // Simple string quality value
591                let mut rule = HashMap::new();
592                rule.insert("value".to_string(), Value::String(s.to_string()));
593                quality_rules.push(rule);
594            }
595        }
596
597        // Check for tblproperties field (similar to SQL TBLPROPERTIES)
598        if let Some(tblprops) = data.get("tblproperties")
599            && let Some(obj) = tblprops.as_object()
600        {
601            for (key, value) in obj {
602                let mut rule = HashMap::new();
603                rule.insert("property".to_string(), Value::String(key.clone()));
604                rule.insert("value".to_string(), json_value_to_serde_value(value));
605                quality_rules.push(rule);
606            }
607        }
608
609        quality_rules
610    }
611
612    /// Parse Data Contract format.
613    fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
614        let mut errors = Vec::new();
615
616        // Extract models
617        let models = data
618            .get("models")
619            .and_then(|v| v.as_object())
620            .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
621
622        // parse_table() returns a single Table, so we parse the first model.
623        // If multiple models are needed, call parse_table() multiple times or use import().
624        let (model_name, model_data) = models
625            .iter()
626            .next()
627            .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
628
629        let model_data = model_data
630            .as_object()
631            .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
632
633        // Extract fields (columns)
634        let fields = model_data
635            .get("fields")
636            .and_then(|v| v.as_object())
637            .ok_or_else(|| {
638                errors.push(ParserError {
639                    error_type: "validation_error".to_string(),
640                    field: format!("Model '{}'", model_name),
641                    message: format!("Model '{}' missing 'fields' field", model_name),
642                });
643                anyhow::anyhow!("Missing fields")
644            });
645
646        let fields = match fields {
647            Ok(f) => f,
648            Err(_) => {
649                // Return empty table with errors
650                let quality_rules = self.extract_quality_rules(data);
651                let table_uuid = self.extract_table_uuid(data);
652                let table = Table {
653                    id: table_uuid,
654                    name: model_name.clone(),
655                    columns: Vec::new(),
656                    database_type: None,
657                    catalog_name: None,
658                    schema_name: None,
659                    medallion_layers: Vec::new(),
660                    scd_pattern: None,
661                    data_vault_classification: None,
662                    modeling_level: None,
663                    tags: Vec::<Tag>::new(),
664                    odcl_metadata: HashMap::new(),
665                    owner: None,
666                    sla: None,
667                    contact_details: None,
668                    infrastructure_type: None,
669                    notes: None,
670                    position: None,
671                    yaml_file_path: None,
672                    drawio_cell_id: None,
673                    quality: quality_rules,
674                    errors: Vec::new(),
675                    created_at: chrono::Utc::now(),
676                    updated_at: chrono::Utc::now(),
677                };
678                return Ok((table, errors));
679            }
680        };
681
682        // Parse fields as columns
683        let mut columns = Vec::new();
684        for (field_name, field_data) in fields {
685            if let Some(field_obj) = field_data.as_object() {
686                match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
687                    Ok(mut cols) => columns.append(&mut cols),
688                    Err(e) => {
689                        errors.push(ParserError {
690                            error_type: "field_parse_error".to_string(),
691                            field: format!("Field '{}'", field_name),
692                            message: e.to_string(),
693                        });
694                    }
695                }
696            } else {
697                errors.push(ParserError {
698                    error_type: "validation_error".to_string(),
699                    field: format!("Field '{}'", field_name),
700                    message: format!("Field '{}' must be an object", field_name),
701                });
702            }
703        }
704
705        // Extract metadata from info section
706        let mut odcl_metadata = HashMap::new();
707
708        // Extract info section and nest it properly
709        if let Some(info_val) = data.get("info") {
710            let info_json_value = json_value_to_serde_value(info_val);
711            odcl_metadata.insert("info".to_string(), info_json_value);
712        }
713
714        odcl_metadata.insert(
715            "dataContractSpecification".to_string(),
716            json_value_to_serde_value(
717                data.get("dataContractSpecification")
718                    .unwrap_or(&JsonValue::Null),
719            ),
720        );
721        odcl_metadata.insert(
722            "id".to_string(),
723            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
724        );
725
726        // Extract servicelevels if present
727        if let Some(servicelevels_val) = data.get("servicelevels") {
728            odcl_metadata.insert(
729                "servicelevels".to_string(),
730                json_value_to_serde_value(servicelevels_val),
731            );
732        }
733
734        // Extract links if present
735        if let Some(links_val) = data.get("links") {
736            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
737        }
738
739        // Extract domain, dataProduct, tenant
740        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
741            odcl_metadata.insert(
742                "domain".to_string(),
743                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
744            );
745        }
746        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
747            odcl_metadata.insert(
748                "dataProduct".to_string(),
749                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
750            );
751        }
752        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
753            odcl_metadata.insert(
754                "tenant".to_string(),
755                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
756            );
757        }
758
759        // Extract top-level description (can be object or string)
760        if let Some(desc_val) = data.get("description") {
761            odcl_metadata.insert(
762                "description".to_string(),
763                json_value_to_serde_value(desc_val),
764            );
765        }
766
767        // Extract pricing
768        if let Some(pricing_val) = data.get("pricing") {
769            odcl_metadata.insert(
770                "pricing".to_string(),
771                json_value_to_serde_value(pricing_val),
772            );
773        }
774
775        // Extract team
776        if let Some(team_val) = data.get("team") {
777            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
778        }
779
780        // Extract roles
781        if let Some(roles_val) = data.get("roles") {
782            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
783        }
784
785        // Extract terms
786        if let Some(terms_val) = data.get("terms") {
787            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
788        }
789
790        // Extract full servers array (not just type)
791        if let Some(servers_val) = data.get("servers") {
792            odcl_metadata.insert(
793                "servers".to_string(),
794                json_value_to_serde_value(servers_val),
795            );
796        }
797
798        // Extract infrastructure
799        if let Some(infrastructure_val) = data.get("infrastructure") {
800            odcl_metadata.insert(
801                "infrastructure".to_string(),
802                json_value_to_serde_value(infrastructure_val),
803            );
804        }
805
806        // Extract database type from servers if available
807        let database_type = self.extract_database_type_from_servers(data);
808
809        // Extract catalog and schema from customProperties
810        let (catalog_name, schema_name) = extract_catalog_schema(data);
811
812        // Extract sharedDomains from customProperties
813        let shared_domains = extract_shared_domains(data);
814
815        // Extract tags from top-level tags field (Data Contract format)
816        let mut tags: Vec<Tag> = Vec::new();
817        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
818            for item in tags_arr {
819                if let Some(s) = item.as_str() {
820                    // Parse tag string to Tag enum (supports Simple, Pair, List formats)
821                    if let Ok(tag) = Tag::from_str(s) {
822                        tags.push(tag);
823                    } else {
824                        // Fallback: create Simple tag if parsing fails
825                        tags.push(crate::models::Tag::Simple(s.to_string()));
826                    }
827                }
828            }
829        }
830
831        // Extract quality rules
832        let quality_rules = self.extract_quality_rules(data);
833
834        // Store sharedDomains in metadata
835        if !shared_domains.is_empty() {
836            let shared_domains_json: Vec<serde_json::Value> = shared_domains
837                .iter()
838                .map(|d| serde_json::Value::String(d.clone()))
839                .collect();
840            odcl_metadata.insert(
841                "sharedDomains".to_string(),
842                serde_json::Value::Array(shared_domains_json),
843            );
844        }
845
846        let table_uuid = self.extract_table_uuid(data);
847
848        let table = Table {
849            id: table_uuid,
850            name: model_name.clone(),
851            columns,
852            database_type,
853            catalog_name,
854            schema_name,
855            medallion_layers: Vec::new(),
856            scd_pattern: None,
857            data_vault_classification: None,
858            modeling_level: None,
859            tags,
860            odcl_metadata,
861            owner: None,
862            sla: None,
863            contact_details: None,
864            infrastructure_type: None,
865            notes: None,
866            position: None,
867            yaml_file_path: None,
868            drawio_cell_id: None,
869            quality: quality_rules,
870            errors: Vec::new(),
871            created_at: chrono::Utc::now(),
872            updated_at: chrono::Utc::now(),
873        };
874
875        info!(
876            "Parsed Data Contract table: {} with {} warnings/errors",
877            model_name,
878            errors.len()
879        );
880        Ok((table, errors))
881    }
882
883    /// Parse a single field from Data Contract format.
884    fn parse_data_contract_field(
885        &self,
886        field_name: &str,
887        field_data: &serde_json::Map<String, JsonValue>,
888        data: &JsonValue,
889        errors: &mut Vec<ParserError>,
890    ) -> Result<Vec<Column>> {
891        let mut columns = Vec::new();
892
893        // Extract description from field_data (preserve empty strings)
894        let description = field_data
895            .get("description")
896            .and_then(|v| v.as_str())
897            .unwrap_or("")
898            .to_string();
899
900        // Extract quality rules from field_data
901        let mut quality_rules = extract_quality_from_obj(field_data);
902
903        // Check for $ref
904        if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
905            // Store ref_path (preserve even if definition doesn't exist)
906            let ref_path = Some(ref_str.to_string());
907
908            if let Some(definition) = resolve_ref(ref_str, data) {
909                // Also extract quality rules from definition and merge (if field doesn't have any)
910                if quality_rules.is_empty() {
911                    if let Some(def_obj) = definition.as_object() {
912                        quality_rules = extract_quality_from_obj(def_obj);
913                    }
914                } else {
915                    // Merge definition quality rules if field has some
916                    if let Some(def_obj) = definition.as_object() {
917                        let def_quality = extract_quality_from_obj(def_obj);
918                        // Append definition quality rules (field-level takes precedence)
919                        quality_rules.extend(def_quality);
920                    }
921                }
922
923                let required = field_data
924                    .get("required")
925                    .and_then(|v| v.as_bool())
926                    .unwrap_or(false);
927
928                // Check if definition is an object/struct with nested structure
929                let has_nested = definition
930                    .get("type")
931                    .and_then(|v| v.as_str())
932                    .map(|s| s == "object")
933                    .unwrap_or(false)
934                    || definition.get("properties").is_some()
935                    || definition.get("fields").is_some();
936
937                if has_nested {
938                    // Expand STRUCT from definition into nested columns with dot notation
939                    if let Some(properties) =
940                        definition.get("properties").and_then(|v| v.as_object())
941                    {
942                        // Recursively expand nested properties
943                        let nested_required: Vec<String> = definition
944                            .get("required")
945                            .and_then(|v| v.as_array())
946                            .map(|arr| {
947                                arr.iter()
948                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
949                                    .collect()
950                            })
951                            .unwrap_or_default();
952
953                        for (nested_name, nested_schema) in properties {
954                            let nested_required_field = nested_required.contains(nested_name);
955                            expand_nested_column(
956                                &format!("{}.{}", field_name, nested_name),
957                                nested_schema,
958                                !nested_required_field,
959                                &mut columns,
960                                errors,
961                            );
962                        }
963                    } else if let Some(fields) =
964                        definition.get("fields").and_then(|v| v.as_object())
965                    {
966                        // Handle fields format (ODCL style)
967                        for (nested_name, nested_schema) in fields {
968                            expand_nested_column(
969                                &format!("{}.{}", field_name, nested_name),
970                                nested_schema,
971                                true, // Assume nullable if not specified
972                                &mut columns,
973                                errors,
974                            );
975                        }
976                    } else {
977                        // Fallback: create parent column as OBJECT if we can't expand
978                        columns.push(Column {
979                            name: field_name.to_string(),
980                            data_type: "OBJECT".to_string(),
981                            nullable: !required,
982                            description: if description.is_empty() {
983                                definition
984                                    .get("description")
985                                    .and_then(|v| v.as_str())
986                                    .unwrap_or("")
987                                    .to_string()
988                            } else {
989                                description.clone()
990                            },
991                            quality: quality_rules.clone(),
992                            relationships: ref_to_relationships(&ref_path),
993                            ..Default::default()
994                        });
995                    }
996                } else {
997                    // Simple type from definition
998                    let def_type = definition
999                        .get("type")
1000                        .and_then(|v| v.as_str())
1001                        .unwrap_or("STRING")
1002                        .to_uppercase();
1003
1004                    let enum_values = definition
1005                        .get("enum")
1006                        .and_then(|v| v.as_array())
1007                        .map(|arr| {
1008                            arr.iter()
1009                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1010                                .collect()
1011                        })
1012                        .unwrap_or_default();
1013
1014                    columns.push(Column {
1015                        name: field_name.to_string(),
1016                        data_type: def_type,
1017                        nullable: !required,
1018                        description: if description.is_empty() {
1019                            definition
1020                                .get("description")
1021                                .and_then(|v| v.as_str())
1022                                .unwrap_or("")
1023                                .to_string()
1024                        } else {
1025                            description
1026                        },
1027                        quality: quality_rules,
1028                        relationships: ref_to_relationships(&ref_path),
1029                        enum_values,
1030                        ..Default::default()
1031                    });
1032                }
1033                return Ok(columns);
1034            } else {
1035                // Undefined reference - create column with error
1036                let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
1037                let mut error_map = HashMap::new();
1038                error_map.insert("type".to_string(), serde_json::json!("validation_error"));
1039                error_map.insert("field".to_string(), serde_json::json!("data_type"));
1040                error_map.insert(
1041                    "message".to_string(),
1042                    serde_json::json!(format!(
1043                        "Field '{}' references undefined definition: {}",
1044                        field_name, ref_str
1045                    )),
1046                );
1047                col_errors.push(error_map);
1048                columns.push(Column {
1049                    name: field_name.to_string(),
1050                    data_type: "OBJECT".to_string(),
1051                    description,
1052                    errors: col_errors,
1053                    relationships: ref_to_relationships(&Some(ref_str.to_string())),
1054                    ..Default::default()
1055                });
1056                return Ok(columns);
1057            }
1058        }
1059
1060        // Extract field type - check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
1061        // Default to STRING if missing
1062        let field_type_str = field_data
1063            .get("logicalType")
1064            .and_then(|v| v.as_str())
1065            .or_else(|| field_data.get("type").and_then(|v| v.as_str()))
1066            .unwrap_or("STRING");
1067
1068        // Check if type contains STRUCT definition (multiline STRUCT type)
1069        if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
1070            match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
1071                Ok(nested_cols) if !nested_cols.is_empty() => {
1072                    // We have nested columns - add parent column with full type, then nested columns
1073                    let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
1074                        "ARRAY<STRUCT<...>>".to_string()
1075                    } else {
1076                        "STRUCT<...>".to_string()
1077                    };
1078
1079                    // Add parent column
1080                    columns.push(Column {
1081                        name: field_name.to_string(),
1082                        data_type: parent_data_type,
1083                        nullable: !field_data
1084                            .get("required")
1085                            .and_then(|v| v.as_bool())
1086                            .unwrap_or(false),
1087                        description: description.clone(),
1088                        quality: quality_rules.clone(),
1089                        relationships: ref_to_relationships(
1090                            &field_data
1091                                .get("$ref")
1092                                .and_then(|v| v.as_str())
1093                                .map(|s| s.to_string()),
1094                        ),
1095                        ..Default::default()
1096                    });
1097
1098                    // Add nested columns
1099                    columns.extend(nested_cols);
1100                    return Ok(columns);
1101                }
1102                Ok(_) | Err(_) => {
1103                    // If parsing fails or returns empty, fall back to using the type as-is
1104                }
1105            }
1106        }
1107
1108        let field_type = normalize_data_type(field_type_str);
1109
1110        // Handle ARRAY type
1111        if field_type == "ARRAY" {
1112            let items = field_data.get("items");
1113            if let Some(items_val) = items {
1114                if let Some(items_obj) = items_val.as_object() {
1115                    // Check if items is an object with fields (nested structure)
1116                    let items_type = items_obj
1117                        .get("logicalType")
1118                        .and_then(|v| v.as_str())
1119                        .or_else(|| items_obj.get("type").and_then(|v| v.as_str()));
1120
1121                    // Normalize legacy "type" values to "logicalType" equivalents
1122                    let normalized_items_type = match items_type {
1123                        Some("object") | Some("struct") => Some("object"),
1124                        Some("array") => Some("array"),
1125                        Some("string") | Some("varchar") | Some("char") | Some("text") => {
1126                            Some("string")
1127                        }
1128                        Some("integer") | Some("int") | Some("bigint") | Some("smallint")
1129                        | Some("tinyint") => Some("integer"),
1130                        Some("number") | Some("decimal") | Some("double") | Some("float")
1131                        | Some("numeric") => Some("number"),
1132                        Some("boolean") | Some("bool") => Some("boolean"),
1133                        Some("date") => Some("date"),
1134                        Some("timestamp") | Some("datetime") => Some("timestamp"),
1135                        Some("time") => Some("time"),
1136                        other => other,
1137                    };
1138
1139                    if items_obj.get("fields").is_some()
1140                        || items_obj.get("properties").is_some()
1141                        || normalized_items_type == Some("object")
1142                    {
1143                        // Array of objects - create parent column as ARRAY<OBJECT>
1144                        columns.push(Column {
1145                            name: field_name.to_string(),
1146                            data_type: "ARRAY<OBJECT>".to_string(),
1147                            nullable: !field_data
1148                                .get("required")
1149                                .and_then(|v| v.as_bool())
1150                                .unwrap_or(false),
1151                            description: field_data
1152                                .get("description")
1153                                .and_then(|v| v.as_str())
1154                                .unwrap_or("")
1155                                .to_string(),
1156                            ..Default::default()
1157                        });
1158
1159                        // Extract nested fields from items.properties or items.fields if present
1160                        let properties_obj =
1161                            items_obj.get("properties").and_then(|v| v.as_object());
1162                        let fields_obj = items_obj.get("fields").and_then(|v| v.as_object());
1163
1164                        if let Some(fields_map) = properties_obj.or(fields_obj) {
1165                            for (nested_field_name, nested_field_data) in fields_map {
1166                                if let Some(nested_field_obj) = nested_field_data.as_object() {
1167                                    let nested_field_type = nested_field_obj
1168                                        .get("logicalType")
1169                                        .and_then(|v| v.as_str())
1170                                        .or_else(|| {
1171                                            nested_field_obj.get("type").and_then(|v| v.as_str())
1172                                        })
1173                                        .unwrap_or("STRING");
1174
1175                                    // Recursively parse nested fields with array prefix
1176                                    let nested_col_name =
1177                                        format!("{}.[].{}", field_name, nested_field_name);
1178                                    let mut local_errors = Vec::new();
1179                                    match self.parse_data_contract_field(
1180                                        &nested_col_name,
1181                                        nested_field_obj,
1182                                        data,
1183                                        &mut local_errors,
1184                                    ) {
1185                                        Ok(mut nested_cols) => {
1186                                            columns.append(&mut nested_cols);
1187                                        }
1188                                        Err(_) => {
1189                                            // Fallback: create simple nested column
1190                                            columns.push(Column {
1191                                                name: nested_col_name,
1192                                                data_type: nested_field_type.to_uppercase(),
1193                                                nullable: !nested_field_obj
1194                                                    .get("required")
1195                                                    .and_then(|v| v.as_bool())
1196                                                    .unwrap_or(false),
1197                                                description: nested_field_obj
1198                                                    .get("description")
1199                                                    .and_then(|v| v.as_str())
1200                                                    .unwrap_or("")
1201                                                    .to_string(),
1202                                                ..Default::default()
1203                                            });
1204                                        }
1205                                    }
1206                                }
1207                            }
1208                        }
1209
1210                        return Ok(columns);
1211                    } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
1212                        // Array of simple type
1213                        columns.push(Column {
1214                            name: field_name.to_string(),
1215                            data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
1216                            nullable: !field_data
1217                                .get("required")
1218                                .and_then(|v| v.as_bool())
1219                                .unwrap_or(false),
1220                            description: description.clone(),
1221                            quality: quality_rules.clone(),
1222                            relationships: ref_to_relationships(
1223                                &field_data
1224                                    .get("$ref")
1225                                    .and_then(|v| v.as_str())
1226                                    .map(|s| s.to_string()),
1227                            ),
1228                            ..Default::default()
1229                        });
1230                        return Ok(columns);
1231                    }
1232                } else if let Some(item_type_str) = items_val.as_str() {
1233                    // Array of simple type (string)
1234                    columns.push(Column {
1235                        name: field_name.to_string(),
1236                        data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
1237                        nullable: !field_data
1238                            .get("required")
1239                            .and_then(|v| v.as_bool())
1240                            .unwrap_or(false),
1241                        description: description.clone(),
1242                        quality: quality_rules.clone(),
1243                        relationships: ref_to_relationships(
1244                            &field_data
1245                                .get("$ref")
1246                                .and_then(|v| v.as_str())
1247                                .map(|s| s.to_string()),
1248                        ),
1249                        ..Default::default()
1250                    });
1251                    return Ok(columns);
1252                }
1253            }
1254            // Array without items - default to ARRAY<STRING>
1255            columns.push(Column {
1256                name: field_name.to_string(),
1257                data_type: "ARRAY<STRING>".to_string(),
1258                nullable: !field_data
1259                    .get("required")
1260                    .and_then(|v| v.as_bool())
1261                    .unwrap_or(false),
1262                description: description.clone(),
1263                quality: quality_rules.clone(),
1264                relationships: ref_to_relationships(
1265                    &field_data
1266                        .get("$ref")
1267                        .and_then(|v| v.as_str())
1268                        .map(|s| s.to_string()),
1269                ),
1270                ..Default::default()
1271            });
1272            return Ok(columns);
1273        }
1274
1275        // Check if this is a nested object with fields or properties
1276        let nested_fields_obj = field_data
1277            .get("properties")
1278            .and_then(|v| v.as_object())
1279            .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
1280
1281        if field_type == "OBJECT" && nested_fields_obj.is_some() {
1282            // Inline nested object - create parent column as OBJECT and extract nested fields
1283            columns.push(Column {
1284                name: field_name.to_string(),
1285                data_type: "OBJECT".to_string(),
1286                nullable: !field_data
1287                    .get("required")
1288                    .and_then(|v| v.as_bool())
1289                    .unwrap_or(false),
1290                description: description.clone(),
1291                quality: quality_rules.clone(),
1292                relationships: ref_to_relationships(
1293                    &field_data
1294                        .get("$ref")
1295                        .and_then(|v| v.as_str())
1296                        .map(|s| s.to_string()),
1297                ),
1298                ..Default::default()
1299            });
1300
1301            // Extract nested fields recursively
1302            if let Some(fields_obj) = nested_fields_obj {
1303                for (nested_field_name, nested_field_data) in fields_obj {
1304                    if let Some(nested_field_obj) = nested_field_data.as_object() {
1305                        let nested_field_type = nested_field_obj
1306                            .get("logicalType")
1307                            .and_then(|v| v.as_str())
1308                            .or_else(|| nested_field_obj.get("type").and_then(|v| v.as_str()))
1309                            .unwrap_or("STRING");
1310
1311                        // Recursively parse nested fields
1312                        let nested_col_name = format!("{}.{}", field_name, nested_field_name);
1313                        match self.parse_data_contract_field(
1314                            &nested_col_name,
1315                            nested_field_obj,
1316                            data,
1317                            errors,
1318                        ) {
1319                            Ok(mut nested_cols) => {
1320                                columns.append(&mut nested_cols);
1321                            }
1322                            Err(_) => {
1323                                // Fallback: create simple nested column
1324                                columns.push(Column {
1325                                    name: nested_col_name,
1326                                    data_type: nested_field_type.to_uppercase(),
1327                                    nullable: !nested_field_obj
1328                                        .get("required")
1329                                        .and_then(|v| v.as_bool())
1330                                        .unwrap_or(false),
1331                                    description: nested_field_obj
1332                                        .get("description")
1333                                        .and_then(|v| v.as_str())
1334                                        .unwrap_or("")
1335                                        .to_string(),
1336                                    ..Default::default()
1337                                });
1338                            }
1339                        }
1340                    }
1341                }
1342            }
1343
1344            return Ok(columns);
1345        }
1346
1347        // Regular field (no $ref or $ref not found)
1348        let ref_path = field_data
1349            .get("$ref")
1350            .and_then(|v| v.as_str())
1351            .map(|s| s.to_string());
1352
1353        let required = field_data
1354            .get("required")
1355            .and_then(|v| v.as_bool())
1356            .unwrap_or(false);
1357
1358        let field_description = if description.is_empty() {
1359            field_data
1360                .get("description")
1361                .and_then(|v| v.as_str())
1362                .unwrap_or("")
1363                .to_string()
1364        } else {
1365            description
1366        };
1367
1368        // Extract column-level quality rules if not already extracted
1369        let mut column_quality_rules = quality_rules;
1370        if column_quality_rules.is_empty()
1371            && let Some(quality_val) = field_data.get("quality")
1372        {
1373            if let Some(arr) = quality_val.as_array() {
1374                for item in arr {
1375                    if let Some(obj) = item.as_object() {
1376                        let mut rule = HashMap::new();
1377                        for (key, value) in obj {
1378                            rule.insert(key.clone(), json_value_to_serde_value(value));
1379                        }
1380                        column_quality_rules.push(rule);
1381                    }
1382                }
1383            } else if let Some(obj) = quality_val.as_object() {
1384                let mut rule = HashMap::new();
1385                for (key, value) in obj {
1386                    rule.insert(key.clone(), json_value_to_serde_value(value));
1387                }
1388                column_quality_rules.push(rule);
1389            }
1390        }
1391
1392        columns.push(Column {
1393            name: field_name.to_string(),
1394            data_type: field_type,
1395            nullable: !required,
1396            primary_key: field_data
1397                .get("primaryKey")
1398                .and_then(|v| v.as_bool())
1399                .unwrap_or(false),
1400            foreign_key: parse_foreign_key_from_data_contract(field_data),
1401            description: field_description,
1402            quality: column_quality_rules,
1403            relationships: ref_to_relationships(&ref_path),
1404            ..Default::default()
1405        });
1406
1407        Ok(columns)
1408    }
1409
1410    /// Extract database type from servers in Data Contract format.
1411    fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1412        // Data Contract format: servers can be object or array
1413        if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
1414            // Object format: { "server_name": { "type": "..." } }
1415            if let Some((_, server_data)) = servers_obj.iter().next()
1416                && let Some(server_obj) = server_data.as_object()
1417            {
1418                return server_obj
1419                    .get("type")
1420                    .and_then(|v| v.as_str())
1421                    .and_then(|s| self.parse_database_type(s));
1422            }
1423        } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
1424            // Array format: [ { "server": "...", "type": "..." } ]
1425            if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
1426                return server_obj
1427                    .get("type")
1428                    .and_then(|v| v.as_str())
1429                    .and_then(|s| self.parse_database_type(s));
1430            }
1431        }
1432        None
1433    }
1434
1435    /// Parse database type string to enum.
1436    fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
1437        match s.to_lowercase().as_str() {
1438            "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
1439            "postgres" | "postgresql" => Some(DatabaseType::Postgres),
1440            "mysql" => Some(DatabaseType::Mysql),
1441            "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
1442            "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
1443            _ => None,
1444        }
1445    }
1446
1447    /// Extract table UUID from ODCL `id` field or fallback.
1448    fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1449        // First check the top-level `id` field
1450        if let Some(id_val) = data.get("id")
1451            && let Some(id_str) = id_val.as_str()
1452            && let Ok(uuid) = uuid::Uuid::parse_str(id_str)
1453        {
1454            tracing::debug!(
1455                "[ODCLImporter] Extracted UUID from top-level 'id' field: {}",
1456                uuid
1457            );
1458            return uuid;
1459        }
1460
1461        // Backward compatibility: check customProperties for tableUuid (legacy format)
1462        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1463            for prop in custom_props {
1464                if let Some(prop_obj) = prop.as_object() {
1465                    let prop_key = prop_obj
1466                        .get("property")
1467                        .and_then(|v| v.as_str())
1468                        .unwrap_or("");
1469                    if prop_key == "tableUuid"
1470                        && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1471                        && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1472                    {
1473                        tracing::debug!(
1474                            "[ODCLImporter] Extracted UUID from customProperties.tableUuid: {}",
1475                            uuid
1476                        );
1477                        return uuid;
1478                    }
1479                }
1480            }
1481        }
1482
1483        // Fallback: check odcl_metadata if present (legacy format)
1484        if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1485            && let Some(uuid_val) = metadata.get("tableUuid")
1486            && let Some(uuid_str) = uuid_val.as_str()
1487            && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1488        {
1489            tracing::debug!(
1490                "[ODCLImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1491                uuid
1492            );
1493            return uuid;
1494        }
1495
1496        // Generate deterministic UUID v5 if not found (based on table name)
1497        let table_name = data
1498            .get("name")
1499            .and_then(|v| v.as_str())
1500            .unwrap_or("unknown");
1501        let new_uuid = crate::models::table::Table::generate_id(table_name, None, None, None);
1502        tracing::warn!(
1503            "[ODCLImporter] No UUID found for table '{}', generating deterministic UUID: {}",
1504            table_name,
1505            new_uuid
1506        );
1507        new_uuid
1508    }
1509
1510    /// Parse STRUCT type from string and create nested columns.
1511    #[allow(clippy::only_used_in_recursion)]
1512    fn parse_struct_type_from_string(
1513        &self,
1514        field_name: &str,
1515        type_str: &str,
1516        field_data: &serde_json::Map<String, JsonValue>,
1517    ) -> Result<Vec<Column>> {
1518        let mut columns = Vec::new();
1519
1520        // Normalize whitespace
1521        let normalized_type = type_str
1522            .lines()
1523            .map(|line| line.trim())
1524            .filter(|line| !line.is_empty())
1525            .collect::<Vec<_>>()
1526            .join(" ");
1527
1528        let type_str_upper = normalized_type.to_uppercase();
1529
1530        // Check if it's ARRAY<STRUCT<...>>
1531        let is_array = type_str_upper.starts_with("ARRAY<");
1532        let struct_start = type_str_upper.find("STRUCT<");
1533
1534        if let Some(start_pos) = struct_start {
1535            let struct_content_start = start_pos + 7; // Skip "STRUCT<"
1536            let struct_content = &normalized_type[struct_content_start..];
1537
1538            // Find matching closing bracket for STRUCT<
1539            let mut depth = 1;
1540            let mut end_pos = None;
1541            for (i, ch) in struct_content.char_indices() {
1542                match ch {
1543                    '<' => depth += 1,
1544                    '>' => {
1545                        depth -= 1;
1546                        if depth == 0 {
1547                            end_pos = Some(i);
1548                            break;
1549                        }
1550                    }
1551                    _ => {}
1552                }
1553            }
1554
1555            let struct_fields_str = if let Some(end) = end_pos {
1556                &struct_content[..end]
1557            } else {
1558                struct_content.trim_end_matches('>').trim()
1559            };
1560
1561            // Parse fields: "ID: STRING, NAME: STRING, ..."
1562            let fields = parse_struct_fields_from_string(struct_fields_str)?;
1563
1564            // Create nested columns
1565            for (nested_name, nested_type) in fields {
1566                let nested_type_upper = nested_type.to_uppercase();
1567                let nested_col_name = if is_array {
1568                    format!("{}.[].{}", field_name, nested_name)
1569                } else {
1570                    format!("{}.{}", field_name, nested_name)
1571                };
1572
1573                let is_nested_struct = nested_type_upper.starts_with("STRUCT<");
1574                let is_nested_array_struct = nested_type_upper.starts_with("ARRAY<STRUCT<");
1575
1576                if is_nested_struct || is_nested_array_struct {
1577                    // Recursively parse nested STRUCT or ARRAY<STRUCT>
1578                    match self.parse_struct_type_from_string(
1579                        &nested_col_name,
1580                        &nested_type,
1581                        field_data,
1582                    ) {
1583                        Ok(nested_cols) => {
1584                            columns.extend(nested_cols);
1585                        }
1586                        Err(_) => {
1587                            let fallback_data_type = if is_nested_array_struct {
1588                                "ARRAY<STRUCT<...>>".to_string()
1589                            } else {
1590                                "STRUCT<...>".to_string()
1591                            };
1592                            columns.push(Column {
1593                                name: nested_col_name,
1594                                data_type: fallback_data_type,
1595                                nullable: !field_data
1596                                    .get("required")
1597                                    .and_then(|v| v.as_bool())
1598                                    .unwrap_or(false),
1599                                description: field_data
1600                                    .get("description")
1601                                    .and_then(|v| v.as_str())
1602                                    .unwrap_or("")
1603                                    .to_string(),
1604                                ..Default::default()
1605                            });
1606                        }
1607                    }
1608                } else if nested_type_upper.starts_with("ARRAY<") {
1609                    columns.push(Column {
1610                        name: nested_col_name,
1611                        data_type: normalize_data_type(&nested_type),
1612                        nullable: !field_data
1613                            .get("required")
1614                            .and_then(|v| v.as_bool())
1615                            .unwrap_or(false),
1616                        description: field_data
1617                            .get("description")
1618                            .and_then(|v| v.as_str())
1619                            .unwrap_or("")
1620                            .to_string(),
1621                        ..Default::default()
1622                    });
1623                } else {
1624                    // Simple nested field
1625                    columns.push(Column {
1626                        name: nested_col_name,
1627                        data_type: normalize_data_type(&nested_type),
1628                        nullable: !field_data
1629                            .get("required")
1630                            .and_then(|v| v.as_bool())
1631                            .unwrap_or(false),
1632                        description: field_data
1633                            .get("description")
1634                            .and_then(|v| v.as_str())
1635                            .unwrap_or("")
1636                            .to_string(),
1637                        ..Default::default()
1638                    });
1639                }
1640            }
1641
1642            return Ok(columns);
1643        }
1644
1645        // If no STRUCT found, return empty (fallback to regular parsing)
1646        Ok(Vec::new())
1647    }
1648}
1649
1650impl Default for ODCLImporter {
1651    fn default() -> Self {
1652        Self::new()
1653    }
1654}
1655
/// Unit tests for `ODCLImporter`: parsing of the simple and Data Contract
/// formats, and format detection via `can_handle`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Simple ODCL document with two columns and an explicit database type.
    const SIMPLE_USERS_YAML: &str = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;

    /// Simple ODCL document carrying medallion layer, SCD pattern and metadata.
    const USERS_WITH_METADATA_YAML: &str = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;

    /// Data Contract Specification document with one model and two fields.
    const DATA_CONTRACT_USERS_YAML: &str = r#"
dataContractSpecification: 0.9.3
id: urn:datacontract:example
models:
  users:
    fields:
      id:
        type: bigint
        description: User ID
      name:
        type: string
        description: User name
"#;

    /// Minimal Data Contract Specification document used for format detection.
    const DETECT_DATA_CONTRACT_YAML: &str = r#"
dataContractSpecification: 0.9.3
id: test
models:
  users:
    fields:
      id:
        type: int
"#;

    /// Minimal simple ODCL document used for format detection.
    const DETECT_SIMPLE_ODCL_YAML: &str = r#"
name: users
columns:
  - name: id
    data_type: INT
"#;

    /// ODCS v3.x document, which this importer must decline.
    const DETECT_ODCS_V3_YAML: &str = r#"
apiVersion: v3.1.0
kind: DataContract
id: test-uuid
version: 1.0.0
name: users
schema:
  - name: users
    properties:
      - name: id
        logicalType: integer
"#;

    #[test]
    fn test_parse_simple_odcl_table() {
        let mut importer = ODCLImporter::new();

        let (parsed, problems) = importer.parse(SIMPLE_USERS_YAML).unwrap();

        assert!(problems.is_empty());
        assert_eq!(parsed.name, "users");
        assert_eq!(parsed.columns.len(), 2);
        assert_eq!(parsed.columns[0].name, "id");
        assert_eq!(parsed.database_type, Some(DatabaseType::Postgres));
    }

    #[test]
    fn test_parse_odcl_with_metadata() {
        let mut importer = ODCLImporter::new();

        let (parsed, problems) = importer.parse(USERS_WITH_METADATA_YAML).unwrap();

        assert_eq!(parsed.medallion_layers.len(), 1);
        assert_eq!(parsed.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(parsed.scd_pattern, Some(SCDPattern::Type2));
        // The description is only checked when it is present as a string.
        let description = parsed
            .odcl_metadata
            .get("description")
            .and_then(|value| value.as_str());
        if let Some(text) = description {
            assert_eq!(text, "User table");
        }
        assert!(problems.is_empty());
    }

    #[test]
    fn test_parse_data_contract_format() {
        let mut importer = ODCLImporter::new();

        let (parsed, problems) = importer.parse(DATA_CONTRACT_USERS_YAML).unwrap();

        assert_eq!(parsed.name, "users");
        assert_eq!(parsed.columns.len(), 2);
        assert_eq!(problems.len(), 0);
    }

    #[test]
    fn test_can_handle_odcl_format() {
        let importer = ODCLImporter::new();

        // Data Contract format should be handled
        assert!(importer.can_handle(DETECT_DATA_CONTRACT_YAML));

        // Simple ODCL format should be handled
        assert!(importer.can_handle(DETECT_SIMPLE_ODCL_YAML));

        // ODCS v3.x format should NOT be handled
        assert!(!importer.can_handle(DETECT_ODCS_V3_YAML));
    }
}