data_modelling_core/import/
odcs.rs

1//! ODCS parser service for parsing Open Data Contract Standard YAML files.
2//!
3//! This service parses ODCS (Open Data Contract Standard) v3.1.0 and legacy ODCL (Data Contract Specification) YAML files
4//! and converts them to Table models. ODCL files are automatically converted to ODCS v3.1.0 format.
5//! Supports multiple formats:
6//! - ODCS v3.1.0 / v3.0.x format (apiVersion, kind, schema) - PRIMARY FORMAT
7//! - ODCL (Data Contract Specification) format (dataContractSpecification, models, definitions) - LEGACY, converted to ODCS
8//! - Simple ODCL format (name, columns) - LEGACY, converted to ODCS
9//! - Liquibase format
10
11use super::odcs_shared::{
12    ParserError, column_to_column_data, expand_nested_column, json_value_to_serde_value,
13    normalize_data_type, parse_data_vault_classification, parse_medallion_layer, parse_scd_pattern,
14    resolve_ref, yaml_to_json_value,
15};
16use super::{ImportError, ImportResult, TableData};
17use crate::models::column::ForeignKey;
18use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
19use crate::models::{Column, PropertyRelationship, Table, Tag};
20use anyhow::{Context, Result};
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::str::FromStr;
24use tracing::info;
25
26// Re-export ParserError for backward compatibility
27pub use super::odcs_shared::ParserError as OdcsParserError;
28
29/// Convert a $ref path to a PropertyRelationship.
30/// E.g., "#/definitions/order_id" -> PropertyRelationship { type: "foreignKey", to: "definitions/order_id" }
31fn ref_to_relationships(ref_path: &Option<String>) -> Vec<PropertyRelationship> {
32    match ref_path {
33        Some(ref_str) => {
34            let to = if ref_str.starts_with("#/definitions/") {
35                let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
36                format!("definitions/{}", def_path)
37            } else if ref_str.starts_with("#/") {
38                ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
39            } else {
40                ref_str.clone()
41            };
42            vec![PropertyRelationship {
43                relationship_type: "foreignKey".to_string(),
44                to,
45            }]
46        }
47        None => Vec::new(),
48    }
49}
50
/// ODCS parser service for parsing Open Data Contract Standard YAML files.
///
/// Handles ODCS v3.1.0 (primary format) and legacy ODCL formats (converted to ODCS).
/// The importer keeps the most recently parsed document so that `$ref` pointers can be
/// resolved against it.
pub struct ODCSImporter {
    /// Current YAML data for `$ref` resolution. `None` until a document has been parsed;
    /// replaced every time a new document is parsed.
    current_yaml_data: Option<serde_yaml::Value>,
}
57
58impl ODCSImporter {
59    /// Create a new ODCS parser instance.
60    ///
61    /// # Example
62    ///
63    /// ```rust
64    /// use data_modelling_core::import::odcs::ODCSImporter;
65    ///
66    /// let mut importer = ODCSImporter::new();
67    /// ```
68    pub fn new() -> Self {
69        Self {
70            current_yaml_data: None,
71        }
72    }
73
74    /// Import ODCS/ODCL YAML content and create Table (SDK interface).
75    ///
76    /// Supports ODCS v3.1.0 (primary), legacy ODCL formats (converted to ODCS), and Liquibase formats.
77    ///
78    /// # Arguments
79    ///
80    /// * `yaml_content` - ODCS/ODCL YAML content as a string
81    ///
82    /// # Returns
83    ///
84    /// An `ImportResult` containing the extracted table and any parse errors.
85    ///
86    /// # Example
87    ///
88    /// ```rust
89    /// use data_modelling_core::import::odcs::ODCSImporter;
90    ///
91    /// let mut importer = ODCSImporter::new();
92    /// let yaml = r#"
93    /// apiVersion: v3.1.0
94    /// kind: DataContract
95    /// id: 550e8400-e29b-41d4-a716-446655440000
96    /// version: 1.0.0
97    /// name: users
98    /// schema:
99    ///   fields:
100    ///     - name: id
101    ///       type: bigint
102    /// "#;
103    /// let result = importer.import(yaml).unwrap();
104    /// assert_eq!(result.tables.len(), 1);
105    /// ```
106    pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
107        // First parse YAML to get raw data for ODCS field extraction
108        let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
109            .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
110
111        let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
112            ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
113        })?;
114
115        match self.parse(yaml_content) {
116            Ok((table, errors)) => {
117                // Extract all ODCS contract-level fields from the raw JSON data
118                let sdk_tables = vec![TableData {
119                    table_index: 0,
120                    id: Some(table.id.to_string()),
121                    name: Some(table.name.clone()),
122                    api_version: json_data
123                        .get("apiVersion")
124                        .and_then(|v| v.as_str())
125                        .map(|s| s.to_string()),
126                    version: json_data
127                        .get("version")
128                        .and_then(|v| v.as_str())
129                        .map(|s| s.to_string()),
130                    status: json_data
131                        .get("status")
132                        .and_then(|v| v.as_str())
133                        .map(|s| s.to_string()),
134                    kind: json_data
135                        .get("kind")
136                        .and_then(|v| v.as_str())
137                        .map(|s| s.to_string()),
138                    domain: json_data
139                        .get("domain")
140                        .and_then(|v| v.as_str())
141                        .map(|s| s.to_string()),
142                    data_product: json_data
143                        .get("dataProduct")
144                        .and_then(|v| v.as_str())
145                        .map(|s| s.to_string()),
146                    tenant: json_data
147                        .get("tenant")
148                        .and_then(|v| v.as_str())
149                        .map(|s| s.to_string()),
150                    description: json_data.get("description").cloned(),
151                    columns: table.columns.iter().map(column_to_column_data).collect(),
152                    servers: json_data
153                        .get("servers")
154                        .and_then(|v| v.as_array())
155                        .cloned()
156                        .unwrap_or_default(),
157                    team: json_data.get("team").cloned(),
158                    support: json_data.get("support").cloned(),
159                    roles: json_data
160                        .get("roles")
161                        .and_then(|v| v.as_array())
162                        .cloned()
163                        .unwrap_or_default(),
164                    sla_properties: json_data
165                        .get("slaProperties")
166                        .and_then(|v| v.as_array())
167                        .cloned()
168                        .unwrap_or_default(),
169                    quality: table.quality.clone(),
170                    price: json_data.get("price").cloned(),
171                    tags: table.tags.iter().map(|t| t.to_string()).collect(),
172                    custom_properties: json_data
173                        .get("customProperties")
174                        .and_then(|v| v.as_array())
175                        .cloned()
176                        .unwrap_or_default(),
177                    authoritative_definitions: json_data
178                        .get("authoritativeDefinitions")
179                        .and_then(|v| v.as_array())
180                        .cloned()
181                        .unwrap_or_default(),
182                    contract_created_ts: json_data
183                        .get("contractCreatedTs")
184                        .and_then(|v| v.as_str())
185                        .map(|s| s.to_string()),
186                    odcs_metadata: table.odcl_metadata.clone(),
187                }];
188                let sdk_errors: Vec<ImportError> = errors
189                    .iter()
190                    .map(|e| ImportError::ParseError(e.message.clone()))
191                    .collect();
192                Ok(ImportResult {
193                    tables: sdk_tables,
194                    tables_requiring_name: Vec::new(),
195                    errors: sdk_errors,
196                    ai_suggestions: None,
197                })
198            }
199            Err(e) => Err(ImportError::ParseError(e.to_string())),
200        }
201    }
202
    /// Parse ODCS/ODCL YAML content and create Table (public method for native app use).
    ///
    /// This method returns the full Table object with all metadata, suitable for use in
    /// native applications that need direct access to the parsed table structure.
    /// For API use, prefer the `import()` method which returns ImportResult.
    ///
    /// This is a thin public wrapper that delegates to the internal `parse` method.
    ///
    /// # Returns
    ///
    /// Returns a tuple of (Table, list of errors/warnings).
    /// The list holds problems that did not prevent a table from being built.
    ///
    /// # Errors
    ///
    /// Returns an error when `parse` fails, e.g. for invalid or empty YAML.
    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
        self.parse(yaml_content)
    }
216
217    /// Parse ODCS/ODCL YAML content and create Table (internal method).
218    ///
219    /// Supports ODCS v3.1.0/v3.0.x (primary), ODCL Data Contract spec (legacy, converted to ODCS),
220    /// simple ODCL (legacy, converted to ODCS), and Liquibase formats.
221    ///
222    /// # Returns
223    ///
224    /// Returns a tuple of (Table, list of errors/warnings).
225    /// Errors list is empty if parsing is successful.
226    fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
227        // Errors are collected in helper functions, not here
228        let _errors: Vec<ParserError> = Vec::new();
229
230        // Parse YAML
231        let data: serde_yaml::Value =
232            serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
233
234        if data.is_null() {
235            return Err(anyhow::anyhow!("Empty YAML content"));
236        }
237
238        // Store current YAML data for $ref resolution
239        self.current_yaml_data = Some(data.clone());
240
241        // Convert to JSON Value for easier manipulation
242        let json_data = yaml_to_json_value(&data)?;
243
244        // Check format and parse accordingly
245        if self.is_liquibase_format(&json_data) {
246            return self.parse_liquibase(&json_data);
247        }
248
249        if self.is_odcl_v3_format(&json_data) {
250            return self.parse_odcl_v3(&json_data);
251        }
252
253        if self.is_data_contract_format(&json_data) {
254            return self.parse_data_contract(&json_data);
255        }
256
257        // Fall back to simple ODCL format
258        self.parse_simple_odcl(&json_data)
259    }
260
261    /// Check if YAML is in Liquibase format.
262    fn is_liquibase_format(&self, data: &JsonValue) -> bool {
263        if data.get("databaseChangeLog").is_some() {
264            return true;
265        }
266        // Check for Liquibase-specific keys
267        if let Some(obj) = data.as_object() {
268            let obj_str = format!("{:?}", obj);
269            if obj_str.contains("changeSet") {
270                return true;
271            }
272        }
273        false
274    }
275
276    /// Check if YAML is in ODCS v3.0.x format.
277    fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
278        if let Some(obj) = data.as_object() {
279            let has_api_version = obj.contains_key("apiVersion");
280            let has_kind = obj
281                .get("kind")
282                .and_then(|v| v.as_str())
283                .map(|s| s == "DataContract")
284                .unwrap_or(false);
285            let has_id = obj.contains_key("id");
286            let has_version = obj.contains_key("version");
287            return has_api_version && has_kind && has_id && has_version;
288        }
289        false
290    }
291
292    /// Check if YAML is in Data Contract specification format.
293    fn is_data_contract_format(&self, data: &JsonValue) -> bool {
294        if let Some(obj) = data.as_object() {
295            let has_spec = obj.contains_key("dataContractSpecification");
296            let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
297            return has_spec && has_models;
298        }
299        false
300    }
301
302    /// Parse simple ODCL format.
303    fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
304        let mut errors = Vec::new();
305
306        // Extract table name
307        let name = data
308            .get("name")
309            .and_then(|v| v.as_str())
310            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
311            .to_string();
312
313        // Extract columns
314        let columns_data = data
315            .get("columns")
316            .and_then(|v| v.as_array())
317            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
318
319        let mut columns = Vec::new();
320        for (idx, col_data) in columns_data.iter().enumerate() {
321            match self.parse_column(col_data) {
322                Ok(col) => columns.push(col),
323                Err(e) => {
324                    errors.push(ParserError {
325                        error_type: "column_parse_error".to_string(),
326                        field: format!("columns[{}]", idx),
327                        message: e.to_string(),
328                    });
329                }
330            }
331        }
332
333        // Extract metadata
334        let database_type = self.extract_database_type(data);
335        let medallion_layers = self.extract_medallion_layers(data);
336        let scd_pattern = self.extract_scd_pattern(data);
337        let data_vault_classification = self.extract_data_vault_classification(data);
338        let quality_rules = self.extract_quality_rules(data);
339
340        // Validate pattern exclusivity
341        if scd_pattern.is_some() && data_vault_classification.is_some() {
342            errors.push(ParserError {
343                error_type: "validation_error".to_string(),
344                field: "patterns".to_string(),
345                message: "SCD pattern and Data Vault classification are mutually exclusive"
346                    .to_string(),
347            });
348        }
349
350        // Extract odcl_metadata
351        let mut odcl_metadata = HashMap::new();
352        if let Some(metadata) = data.get("odcl_metadata")
353            && let Some(obj) = metadata.as_object()
354        {
355            for (key, value) in obj {
356                odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
357            }
358        }
359
360        let table_uuid = self.extract_table_uuid(data);
361
362        let table = Table {
363            id: table_uuid,
364            name,
365            columns,
366            database_type,
367            catalog_name: None,
368            schema_name: None,
369            medallion_layers,
370            scd_pattern,
371            data_vault_classification,
372            modeling_level: None,
373            tags: Vec::<Tag>::new(),
374            odcl_metadata,
375            owner: None,
376            sla: None,
377            contact_details: None,
378            infrastructure_type: None,
379            notes: None,
380            position: None,
381            yaml_file_path: None,
382            drawio_cell_id: None,
383            quality: quality_rules,
384            errors: Vec::new(),
385            created_at: chrono::Utc::now(),
386            updated_at: chrono::Utc::now(),
387        };
388
389        info!("Parsed ODCL table: {}", table.name);
390        Ok((table, errors))
391    }
392
393    /// Parse a single column definition.
394    fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
395        let name = col_data
396            .get("name")
397            .and_then(|v| v.as_str())
398            .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
399            .to_string();
400
401        let data_type = col_data
402            .get("data_type")
403            .and_then(|v| v.as_str())
404            .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
405            .to_string();
406
407        // Normalize data_type to uppercase (preserve STRUCT<...> format)
408        let data_type = normalize_data_type(&data_type);
409
410        let nullable = col_data
411            .get("nullable")
412            .and_then(|v| v.as_bool())
413            .unwrap_or(true);
414
415        let primary_key = col_data
416            .get("primary_key")
417            .and_then(|v| v.as_bool())
418            .unwrap_or(false);
419
420        let foreign_key = col_data
421            .get("foreign_key")
422            .and_then(|v| self.parse_foreign_key(v));
423
424        let constraints = col_data
425            .get("constraints")
426            .and_then(|v| v.as_array())
427            .map(|arr| {
428                arr.iter()
429                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
430                    .collect()
431            })
432            .unwrap_or_default();
433
434        let description = col_data
435            .get("description")
436            .and_then(|v| v.as_str())
437            .map(|s| s.to_string())
438            .unwrap_or_default();
439
440        // Extract column-level quality rules
441        let mut column_quality_rules = Vec::new();
442        if let Some(quality_val) = col_data.get("quality") {
443            if let Some(arr) = quality_val.as_array() {
444                // Array of quality rules
445                for item in arr {
446                    if let Some(obj) = item.as_object() {
447                        let mut rule = HashMap::new();
448                        for (key, value) in obj {
449                            rule.insert(key.clone(), json_value_to_serde_value(value));
450                        }
451                        column_quality_rules.push(rule);
452                    }
453                }
454            } else if let Some(obj) = quality_val.as_object() {
455                // Single quality rule object
456                let mut rule = HashMap::new();
457                for (key, value) in obj {
458                    rule.insert(key.clone(), json_value_to_serde_value(value));
459                }
460                column_quality_rules.push(rule);
461            }
462        }
463
464        // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
465        // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
466        // The `nullable` field will be converted to `required` during export.
467
468        Ok(Column {
469            name,
470            data_type,
471            nullable,
472            primary_key,
473            foreign_key,
474            constraints,
475            description,
476            quality: column_quality_rules,
477            ..Default::default()
478        })
479    }
480
481    /// Parse foreign key from JSON value.
482    fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
483        let obj = fk_data.as_object()?;
484        Some(ForeignKey {
485            table_id: obj
486                .get("table_id")
487                .or_else(|| obj.get("table"))
488                .and_then(|v| v.as_str())
489                .unwrap_or("")
490                .to_string(),
491            column_name: obj
492                .get("column_name")
493                .or_else(|| obj.get("column"))
494                .and_then(|v| v.as_str())
495                .unwrap_or("")
496                .to_string(),
497        })
498    }
499
500    /// Extract database type from data.
501    fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
502        data.get("database_type")
503            .and_then(|v| v.as_str())
504            .and_then(|s| match s.to_uppercase().as_str() {
505                "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
506                "MYSQL" => Some(DatabaseType::Mysql),
507                "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
508                "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
509                "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
510                _ => None,
511            })
512    }
513
514    /// Extract medallion layers from data.
515    fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
516        let mut layers = Vec::new();
517
518        // Check plural form first
519        if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
520            for item in arr {
521                if let Some(s) = item.as_str()
522                    && let Ok(layer) = parse_medallion_layer(s)
523                {
524                    layers.push(layer);
525                }
526            }
527        }
528        // Check singular form (backward compatibility)
529        else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
530            && let Ok(layer) = parse_medallion_layer(s)
531        {
532            layers.push(layer);
533        }
534
535        layers
536    }
537
538    /// Extract SCD pattern from data.
539    fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
540        data.get("scd_pattern")
541            .and_then(|v| v.as_str())
542            .and_then(|s| parse_scd_pattern(s).ok())
543    }
544
545    /// Extract Data Vault classification from data.
546    fn extract_data_vault_classification(
547        &self,
548        data: &JsonValue,
549    ) -> Option<DataVaultClassification> {
550        data.get("data_vault_classification")
551            .and_then(|v| v.as_str())
552            .and_then(|s| parse_data_vault_classification(s).ok())
553    }
554
555    /// Extract quality rules from data.
556    fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
557        use serde_json::Value;
558        let mut quality_rules = Vec::new();
559
560        // Check for quality field at root level (array of objects or single object)
561        if let Some(quality_val) = data.get("quality") {
562            if let Some(arr) = quality_val.as_array() {
563                // Array of quality rules
564                for item in arr {
565                    if let Some(obj) = item.as_object() {
566                        let mut rule = HashMap::new();
567                        for (key, value) in obj {
568                            rule.insert(key.clone(), json_value_to_serde_value(value));
569                        }
570                        quality_rules.push(rule);
571                    }
572                }
573            } else if let Some(obj) = quality_val.as_object() {
574                // Single quality rule object
575                let mut rule = HashMap::new();
576                for (key, value) in obj {
577                    rule.insert(key.clone(), json_value_to_serde_value(value));
578                }
579                quality_rules.push(rule);
580            } else if let Some(s) = quality_val.as_str() {
581                // Simple string quality value
582                let mut rule = HashMap::new();
583                rule.insert("value".to_string(), Value::String(s.to_string()));
584                quality_rules.push(rule);
585            }
586        }
587
588        // Check for quality in metadata (ODCL v3 format)
589        if let Some(metadata) = data.get("metadata")
590            && let Some(metadata_obj) = metadata.as_object()
591            && let Some(quality_val) = metadata_obj.get("quality")
592        {
593            if let Some(arr) = quality_val.as_array() {
594                // Array of quality rules
595                for item in arr {
596                    if let Some(obj) = item.as_object() {
597                        let mut rule = HashMap::new();
598                        for (key, value) in obj {
599                            rule.insert(key.clone(), json_value_to_serde_value(value));
600                        }
601                        quality_rules.push(rule);
602                    }
603                }
604            } else if let Some(obj) = quality_val.as_object() {
605                // Single quality rule object
606                let mut rule = HashMap::new();
607                for (key, value) in obj {
608                    rule.insert(key.clone(), json_value_to_serde_value(value));
609                }
610                quality_rules.push(rule);
611            } else if let Some(s) = quality_val.as_str() {
612                // Simple string quality value
613                let mut rule = HashMap::new();
614                rule.insert("value".to_string(), Value::String(s.to_string()));
615                quality_rules.push(rule);
616            }
617        }
618
619        // Check for tblproperties field (similar to SQL TBLPROPERTIES)
620        if let Some(tblprops) = data.get("tblproperties")
621            && let Some(obj) = tblprops.as_object()
622        {
623            for (key, value) in obj {
624                let mut rule = HashMap::new();
625                rule.insert("property".to_string(), Value::String(key.clone()));
626                rule.insert("value".to_string(), json_value_to_serde_value(value));
627                quality_rules.push(rule);
628            }
629        }
630
631        quality_rules
632    }
633
634    /// Parse Liquibase YAML changelog format.
635    ///
636    /// Extracts the first `createTable` change from a Liquibase changelog.
637    /// Supports both direct column definitions and nested column structures.
638    ///
639    /// # Supported Format
640    ///
641    fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
642        // Supported Liquibase YAML changelog format:
643        // databaseChangeLog:
644        //   - changeSet:
645        //       - createTable:
646        //           tableName: my_table
647        //           columns:
648        //             - column:
649        //                 name: id
650        //                 type: int
651        //                 constraints:
652        //                   primaryKey: true
653        //                   nullable: false
654
655        let mut errors = Vec::new();
656
657        let changelog = data
658            .get("databaseChangeLog")
659            .and_then(|v| v.as_array())
660            .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
661
662        // Find first createTable change.
663        let mut table_name: Option<String> = None;
664        let mut columns: Vec<crate::models::column::Column> = Vec::new();
665
666        for entry in changelog {
667            // Entries are typically maps like { changeSet: [...] } or { changeSet: { changes: [...] } }
668            if let Some(change_set) = entry.get("changeSet") {
669                // Liquibase YAML can represent changeSet as map or list.
670                let changes = if let Some(obj) = change_set.as_object() {
671                    obj.get("changes")
672                        .and_then(|v| v.as_array())
673                        .cloned()
674                        .unwrap_or_default()
675                } else if let Some(arr) = change_set.as_array() {
676                    // Some variants encode changes directly as list entries inside changeSet.
677                    arr.clone()
678                } else {
679                    Vec::new()
680                };
681
682                for ch in changes {
683                    let create = ch.get("createTable").or_else(|| ch.get("create_table"));
684                    if let Some(create) = create {
685                        table_name = create
686                            .get("tableName")
687                            .or_else(|| create.get("table_name"))
688                            .and_then(|v| v.as_str())
689                            .map(|s| s.to_string());
690
691                        // columns: [ { column: { ... } }, ... ]
692                        if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
693                            for col_entry in cols {
694                                let col = col_entry.get("column").unwrap_or(col_entry);
695                                let name = col
696                                    .get("name")
697                                    .and_then(|v| v.as_str())
698                                    .unwrap_or("")
699                                    .to_string();
700                                let data_type = col
701                                    .get("type")
702                                    .and_then(|v| v.as_str())
703                                    .unwrap_or("")
704                                    .to_string();
705
706                                if name.is_empty() {
707                                    errors.push(ParserError {
708                                        error_type: "validation_error".to_string(),
709                                        field: "columns.name".to_string(),
710                                        message: "Liquibase createTable column missing name"
711                                            .to_string(),
712                                    });
713                                    continue;
714                                }
715
716                                let mut column =
717                                    crate::models::column::Column::new(name, data_type);
718
719                                if let Some(constraints) =
720                                    col.get("constraints").and_then(|v| v.as_object())
721                                {
722                                    if let Some(pk) =
723                                        constraints.get("primaryKey").and_then(|v| v.as_bool())
724                                    {
725                                        column.primary_key = pk;
726                                    }
727                                    if let Some(nullable) =
728                                        constraints.get("nullable").and_then(|v| v.as_bool())
729                                    {
730                                        column.nullable = nullable;
731                                    }
732                                }
733
734                                columns.push(column);
735                            }
736                        }
737
738                        // parse_table() returns a single Table, so we parse the first createTable.
739                        // If multiple tables are needed, call parse_table() multiple times or use import().
740                        break;
741                    }
742                }
743            }
744            if table_name.is_some() {
745                break;
746            }
747        }
748
749        let table_name = table_name
750            .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
751        let table = Table::new(table_name, columns);
752        // Preserve any errors collected.
753        Ok((table, errors))
754    }
755
756    /// Parse ODCS v3.0.x format.
757    fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
758        let mut errors = Vec::new();
759
760        // Extract table name
761        let table_name = data
762            .get("name")
763            .and_then(|v| v.as_str())
764            .map(|s| s.to_string())
765            .or_else(|| {
766                // Try to get from schema array
767                data.get("schema")
768                    .and_then(|v| v.as_array())
769                    .and_then(|arr| arr.first())
770                    .and_then(|obj| obj.as_object())
771                    .and_then(|obj| obj.get("name"))
772                    .and_then(|v| v.as_str())
773                    .map(|s| s.to_string())
774            })
775            .ok_or_else(|| {
776                anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
777            })?;
778
779        // Extract schema - ODCS v3.0.x uses array of SchemaObject
780        let schema = data
781            .get("schema")
782            .and_then(|v| v.as_array())
783            .ok_or_else(|| {
784                errors.push(ParserError {
785                    error_type: "validation_error".to_string(),
786                    field: "schema".to_string(),
787                    message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
788                });
789                anyhow::anyhow!("Missing schema")
790            });
791
792        let schema = match schema {
793            Ok(s) if s.is_empty() => {
794                errors.push(ParserError {
795                    error_type: "validation_error".to_string(),
796                    field: "schema".to_string(),
797                    message: "ODCS v3.0.x schema array is empty".to_string(),
798                });
799                let quality_rules = self.extract_quality_rules(data);
800                let table_uuid = self.extract_table_uuid(data);
801                let table = Table {
802                    id: table_uuid,
803                    name: table_name,
804                    columns: Vec::new(),
805                    database_type: None,
806                    catalog_name: None,
807                    schema_name: None,
808                    medallion_layers: Vec::new(),
809                    scd_pattern: None,
810                    data_vault_classification: None,
811                    modeling_level: None,
812                    tags: Vec::<Tag>::new(),
813                    odcl_metadata: HashMap::new(),
814                    owner: None,
815                    sla: None,
816                    contact_details: None,
817                    infrastructure_type: None,
818                    notes: None,
819                    position: None,
820                    yaml_file_path: None,
821                    drawio_cell_id: None,
822                    quality: quality_rules,
823                    errors: Vec::new(),
824                    created_at: chrono::Utc::now(),
825                    updated_at: chrono::Utc::now(),
826                };
827                return Ok((table, errors));
828            }
829            Ok(s) => s,
830            Err(_) => {
831                let quality_rules = self.extract_quality_rules(data);
832                let table_uuid = self.extract_table_uuid(data);
833                let table = Table {
834                    id: table_uuid,
835                    name: table_name,
836                    columns: Vec::new(),
837                    database_type: None,
838                    catalog_name: None,
839                    schema_name: None,
840                    medallion_layers: Vec::new(),
841                    scd_pattern: None,
842                    data_vault_classification: None,
843                    modeling_level: None,
844                    tags: Vec::<Tag>::new(),
845                    odcl_metadata: HashMap::new(),
846                    owner: None,
847                    sla: None,
848                    contact_details: None,
849                    infrastructure_type: None,
850                    notes: None,
851                    position: None,
852                    yaml_file_path: None,
853                    drawio_cell_id: None,
854                    quality: quality_rules,
855                    errors: Vec::new(),
856                    created_at: chrono::Utc::now(),
857                    updated_at: chrono::Utc::now(),
858                };
859                return Ok((table, errors));
860            }
861        };
862
863        // Get the first schema object (table)
864        let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
865            errors.push(ParserError {
866                error_type: "validation_error".to_string(),
867                field: "schema[0]".to_string(),
868                message: "First schema object must be a dictionary".to_string(),
869            });
870            anyhow::anyhow!("Invalid schema object")
871        })?;
872
873        let object_name = schema_object
874            .get("name")
875            .and_then(|v| v.as_str())
876            .unwrap_or(&table_name);
877
878        // Handle both object format (v3.0.x) and array format (v3.1.0) for properties
879        let mut columns = Vec::new();
880
881        if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
882            // Object format (v3.0.x): properties is a map of name -> property object
883            for (prop_name, prop_data) in properties_obj {
884                if let Some(prop_obj) = prop_data.as_object() {
885                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
886                        Ok(mut cols) => columns.append(&mut cols),
887                        Err(e) => {
888                            errors.push(ParserError {
889                                error_type: "property_parse_error".to_string(),
890                                field: format!("Property '{}'", prop_name),
891                                message: e.to_string(),
892                            });
893                        }
894                    }
895                } else {
896                    errors.push(ParserError {
897                        error_type: "validation_error".to_string(),
898                        field: format!("Property '{}'", prop_name),
899                        message: format!("Property '{}' must be an object", prop_name),
900                    });
901                }
902            }
903        } else if let Some(properties_arr) =
904            schema_object.get("properties").and_then(|v| v.as_array())
905        {
906            // Array format (v3.1.0): properties is an array of property objects with 'name' field
907            for (idx, prop_data) in properties_arr.iter().enumerate() {
908                if let Some(prop_obj) = prop_data.as_object() {
909                    // Extract name from property object (required in v3.1.0)
910                    // ODCS v3.1.0 requires 'name' field, but we'll also accept 'id' as fallback
911                    let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
912                        Some(JsonValue::String(s)) => s.as_str(),
913                        _ => {
914                            // Skip properties without name or id (not valid ODCS v3.1.0)
915                            errors.push(ParserError {
916                                error_type: "validation_error".to_string(),
917                                field: format!("Property[{}]", idx),
918                                message: format!(
919                                    "Property[{}] missing required 'name' or 'id' field",
920                                    idx
921                                ),
922                            });
923                            continue;
924                        }
925                    };
926
927                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
928                        Ok(mut cols) => columns.append(&mut cols),
929                        Err(e) => {
930                            errors.push(ParserError {
931                                error_type: "property_parse_error".to_string(),
932                                field: format!("Property[{}] '{}'", idx, prop_name),
933                                message: e.to_string(),
934                            });
935                        }
936                    }
937                } else {
938                    errors.push(ParserError {
939                        error_type: "validation_error".to_string(),
940                        field: format!("Property[{}]", idx),
941                        message: format!("Property[{}] must be an object", idx),
942                    });
943                }
944            }
945        } else {
946            errors.push(ParserError {
947                error_type: "validation_error".to_string(),
948                field: format!("Object '{}'", object_name),
949                message: format!(
950                    "Object '{}' missing 'properties' field or properties is invalid",
951                    object_name
952                ),
953            });
954        }
955
956        // Extract metadata from customProperties
957        let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
958            _,
959            _,
960            _,
961            Vec<Tag>,
962        ) = self.extract_metadata_from_custom_properties(data);
963
964        // Extract sharedDomains from customProperties
965        let mut shared_domains: Vec<String> = Vec::new();
966        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
967            for prop in custom_props {
968                if let Some(prop_obj) = prop.as_object() {
969                    let prop_key = prop_obj
970                        .get("property")
971                        .and_then(|v| v.as_str())
972                        .unwrap_or("");
973                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
974                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
975                    {
976                        for item in arr {
977                            if let Some(s) = item.as_str() {
978                                shared_domains.push(s.to_string());
979                            }
980                        }
981                    }
982                }
983            }
984        }
985
986        // Extract tags from top-level tags field (if not already extracted from customProperties)
987        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
988            for item in tags_arr {
989                if let Some(s) = item.as_str() {
990                    // Parse tag string to Tag enum
991                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
992                    if !tags.contains(&tag) {
993                        tags.push(tag);
994                    }
995                }
996            }
997        }
998
999        // Extract database type from servers
1000        let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1001
1002        // Extract quality rules
1003        let quality_rules = self.extract_quality_rules(data);
1004
1005        // Build ODCL metadata
1006        let mut odcl_metadata = HashMap::new();
1007        odcl_metadata.insert(
1008            "apiVersion".to_string(),
1009            json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
1010        );
1011        odcl_metadata.insert(
1012            "kind".to_string(),
1013            json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
1014        );
1015        odcl_metadata.insert(
1016            "id".to_string(),
1017            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1018        );
1019        odcl_metadata.insert(
1020            "version".to_string(),
1021            json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
1022        );
1023        odcl_metadata.insert(
1024            "status".to_string(),
1025            json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
1026        );
1027
1028        // Extract servicelevels if present
1029        if let Some(servicelevels_val) = data.get("servicelevels") {
1030            odcl_metadata.insert(
1031                "servicelevels".to_string(),
1032                json_value_to_serde_value(servicelevels_val),
1033            );
1034        }
1035
1036        // Extract links if present
1037        if let Some(links_val) = data.get("links") {
1038            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1039        }
1040
1041        // Extract domain, dataProduct, tenant
1042        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1043            odcl_metadata.insert(
1044                "domain".to_string(),
1045                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1046            );
1047        }
1048        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1049            odcl_metadata.insert(
1050                "dataProduct".to_string(),
1051                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1052            );
1053        }
1054        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1055            odcl_metadata.insert(
1056                "tenant".to_string(),
1057                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1058            );
1059        }
1060
1061        // Extract top-level description (can be object or string)
1062        if let Some(desc_val) = data.get("description") {
1063            odcl_metadata.insert(
1064                "description".to_string(),
1065                json_value_to_serde_value(desc_val),
1066            );
1067        }
1068
1069        // Extract pricing
1070        if let Some(pricing_val) = data.get("pricing") {
1071            odcl_metadata.insert(
1072                "pricing".to_string(),
1073                json_value_to_serde_value(pricing_val),
1074            );
1075        }
1076
1077        // Extract team
1078        if let Some(team_val) = data.get("team") {
1079            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1080        }
1081
1082        // Extract roles
1083        if let Some(roles_val) = data.get("roles") {
1084            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1085        }
1086
1087        // Extract terms
1088        if let Some(terms_val) = data.get("terms") {
1089            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1090        }
1091
1092        // Extract full servers array (not just type)
1093        if let Some(servers_val) = data.get("servers") {
1094            odcl_metadata.insert(
1095                "servers".to_string(),
1096                json_value_to_serde_value(servers_val),
1097            );
1098        }
1099
1100        // Extract infrastructure
1101        if let Some(infrastructure_val) = data.get("infrastructure") {
1102            odcl_metadata.insert(
1103                "infrastructure".to_string(),
1104                json_value_to_serde_value(infrastructure_val),
1105            );
1106        }
1107
1108        // Store sharedDomains in metadata (extracted from customProperties above)
1109        if !shared_domains.is_empty() {
1110            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1111                .iter()
1112                .map(|d| serde_json::Value::String(d.clone()))
1113                .collect();
1114            odcl_metadata.insert(
1115                "sharedDomains".to_string(),
1116                serde_json::Value::Array(shared_domains_json),
1117            );
1118        }
1119
1120        let table_uuid = self.extract_table_uuid(data);
1121
1122        let table = Table {
1123            id: table_uuid,
1124            name: table_name,
1125            columns,
1126            database_type,
1127            catalog_name: None,
1128            schema_name: None,
1129            medallion_layers,
1130            scd_pattern,
1131            data_vault_classification,
1132            modeling_level: None,
1133            tags,
1134            odcl_metadata,
1135            owner: None,
1136            sla: None,
1137            contact_details: None,
1138            infrastructure_type: None,
1139            notes: None,
1140            position: None,
1141            yaml_file_path: None,
1142            drawio_cell_id: None,
1143            quality: quality_rules,
1144            errors: Vec::new(),
1145            created_at: chrono::Utc::now(),
1146            updated_at: chrono::Utc::now(),
1147        };
1148
1149        info!(
1150            "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1151            table.name,
1152            errors.len()
1153        );
1154        Ok((table, errors))
1155    }
1156
1157    /// Parse a single property from ODCS v3.0.x format (similar to Data Contract field).
1158    fn parse_odcl_v3_property(
1159        &self,
1160        prop_name: &str,
1161        prop_data: &serde_json::Map<String, JsonValue>,
1162        data: &JsonValue,
1163    ) -> Result<Vec<Column>> {
1164        // Reuse Data Contract field parsing logic (they're similar)
1165        let mut errors = Vec::new();
1166        self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1167    }
1168
1169    /// Extract table UUID from ODCS `id` field (standard) or fallback to customProperties/odcl_metadata (backward compatibility).
1170    /// Returns the UUID if found, otherwise generates a new one.
1171    fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1172        // First check the top-level `id` field (ODCS spec: "A unique identifier used to reduce the risk of dataset name collisions, such as a UUID.")
1173        if let Some(id_val) = data.get("id")
1174            && let Some(id_str) = id_val.as_str()
1175        {
1176            if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1177                tracing::debug!(
1178                    "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1179                    uuid
1180                );
1181                return uuid;
1182            } else {
1183                tracing::warn!(
1184                    "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1185                    id_str
1186                );
1187            }
1188        }
1189
1190        // Backward compatibility: check customProperties for tableUuid (legacy format)
1191        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1192            for prop in custom_props {
1193                if let Some(prop_obj) = prop.as_object() {
1194                    let prop_key = prop_obj
1195                        .get("property")
1196                        .and_then(|v| v.as_str())
1197                        .unwrap_or("");
1198                    if prop_key == "tableUuid"
1199                        && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1200                        && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1201                    {
1202                        tracing::debug!(
1203                            "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1204                            uuid
1205                        );
1206                        return uuid;
1207                    }
1208                }
1209            }
1210        }
1211
1212        // Fallback: check odcl_metadata if present (legacy format)
1213        if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1214            && let Some(uuid_val) = metadata.get("tableUuid")
1215            && let Some(uuid_str) = uuid_val.as_str()
1216            && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1217        {
1218            tracing::debug!(
1219                "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1220                uuid
1221            );
1222            return uuid;
1223        }
1224
1225        // Generate deterministic UUID v5 if not found (based on table name)
1226        let table_name = data
1227            .get("name")
1228            .and_then(|v| v.as_str())
1229            .unwrap_or("unknown");
1230        let new_uuid = crate::models::table::Table::generate_id(
1231            table_name, None, // database_type not available here
1232            None, // catalog_name not available here
1233            None, // schema_name not available here
1234        );
1235        tracing::warn!(
1236            "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1237            table_name,
1238            new_uuid
1239        );
1240        new_uuid
1241    }
1242
1243    /// Extract metadata from customProperties in ODCS v3.0.x format.
1244    fn extract_metadata_from_custom_properties(
1245        &self,
1246        data: &JsonValue,
1247    ) -> (
1248        Vec<MedallionLayer>,
1249        Option<SCDPattern>,
1250        Option<DataVaultClassification>,
1251        Vec<Tag>,
1252    ) {
1253        let mut medallion_layers = Vec::new();
1254        let mut scd_pattern = None;
1255        let mut data_vault_classification = None;
1256        let mut tags: Vec<Tag> = Vec::new();
1257
1258        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1259            for prop in custom_props {
1260                if let Some(prop_obj) = prop.as_object() {
1261                    let prop_key = prop_obj
1262                        .get("property")
1263                        .and_then(|v| v.as_str())
1264                        .unwrap_or("");
1265                    let prop_value = prop_obj.get("value");
1266
1267                    match prop_key {
1268                        "medallionLayers" | "medallion_layers" => {
1269                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1270                                for item in arr {
1271                                    if let Some(s) = item.as_str()
1272                                        && let Ok(layer) = parse_medallion_layer(s)
1273                                    {
1274                                        medallion_layers.push(layer);
1275                                    }
1276                                }
1277                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1278                                // Comma-separated string
1279                                for part in s.split(',') {
1280                                    if let Ok(layer) = parse_medallion_layer(part.trim()) {
1281                                        medallion_layers.push(layer);
1282                                    }
1283                                }
1284                            }
1285                        }
1286                        "scdPattern" | "scd_pattern" => {
1287                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1288                                scd_pattern = parse_scd_pattern(s).ok();
1289                            }
1290                        }
1291                        "dataVaultClassification" | "data_vault_classification" => {
1292                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1293                                data_vault_classification = parse_data_vault_classification(s).ok();
1294                            }
1295                        }
1296                        "tags" => {
1297                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1298                                for item in arr {
1299                                    if let Some(s) = item.as_str() {
1300                                        // Parse tag string to Tag enum
1301                                        if let Ok(tag) = Tag::from_str(s) {
1302                                            tags.push(tag);
1303                                        } else {
1304                                            tags.push(Tag::Simple(s.to_string()));
1305                                        }
1306                                    }
1307                                }
1308                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1309                                // Comma-separated string
1310                                for part in s.split(',') {
1311                                    let part = part.trim();
1312                                    if let Ok(tag) = Tag::from_str(part) {
1313                                        tags.push(tag);
1314                                    } else {
1315                                        tags.push(Tag::Simple(part.to_string()));
1316                                    }
1317                                }
1318                            }
1319                        }
1320                        "sharedDomains" | "shared_domains" => {
1321                            // sharedDomains will be stored in metadata by the caller
1322                            // This match is here for completeness but sharedDomains is handled separately
1323                        }
1324                        _ => {}
1325                    }
1326                }
1327            }
1328        }
1329
1330        // Also extract tags from top-level tags field
1331        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1332            for item in tags_arr {
1333                if let Some(s) = item.as_str() {
1334                    // Parse tag string to Tag enum
1335                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1336                    if !tags.contains(&tag) {
1337                        tags.push(tag);
1338                    }
1339                }
1340            }
1341        }
1342
1343        (
1344            medallion_layers,
1345            scd_pattern,
1346            data_vault_classification,
1347            tags,
1348        )
1349    }
1350
1351    /// Extract database type from servers in ODCS v3.0.x format.
1352    fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1353        // ODCS v3.0.x: servers is an array of Server objects
1354        if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1355            && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1356        {
1357            return server_obj
1358                .get("type")
1359                .and_then(|v| v.as_str())
1360                .and_then(|s| self.parse_database_type(s));
1361        }
1362        None
1363    }
1364
1365    /// Parse Data Contract format.
1366    fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1367        let mut errors = Vec::new();
1368
1369        // Extract models
1370        let models = data
1371            .get("models")
1372            .and_then(|v| v.as_object())
1373            .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1374
1375        // parse_table() returns a single Table, so we parse the first model.
1376        // If multiple models are needed, call parse_table() multiple times or use import().
1377        let (model_name, model_data) = models
1378            .iter()
1379            .next()
1380            .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1381
1382        let model_data = model_data
1383            .as_object()
1384            .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1385
1386        // Extract fields (columns)
1387        let fields = model_data
1388            .get("fields")
1389            .and_then(|v| v.as_object())
1390            .ok_or_else(|| {
1391                errors.push(ParserError {
1392                    error_type: "validation_error".to_string(),
1393                    field: format!("Model '{}'", model_name),
1394                    message: format!("Model '{}' missing 'fields' field", model_name),
1395                });
1396                anyhow::anyhow!("Missing fields")
1397            });
1398
1399        let fields = match fields {
1400            Ok(f) => f,
1401            Err(_) => {
1402                // Return empty table with errors
1403                let quality_rules = self.extract_quality_rules(data);
1404                let table_uuid = self.extract_table_uuid(data);
1405                let table = Table {
1406                    id: table_uuid,
1407                    name: model_name.clone(),
1408                    columns: Vec::new(),
1409                    database_type: None,
1410                    catalog_name: None,
1411                    schema_name: None,
1412                    medallion_layers: Vec::new(),
1413                    scd_pattern: None,
1414                    data_vault_classification: None,
1415                    modeling_level: None,
1416                    tags: Vec::<Tag>::new(),
1417                    odcl_metadata: HashMap::new(),
1418                    owner: None,
1419                    sla: None,
1420                    contact_details: None,
1421                    infrastructure_type: None,
1422                    notes: None,
1423                    position: None,
1424                    yaml_file_path: None,
1425                    drawio_cell_id: None,
1426                    quality: quality_rules,
1427                    errors: Vec::new(),
1428                    created_at: chrono::Utc::now(),
1429                    updated_at: chrono::Utc::now(),
1430                };
1431                return Ok((table, errors));
1432            }
1433        };
1434
1435        // Parse fields as columns
1436        let mut columns = Vec::new();
1437        for (field_name, field_data) in fields {
1438            if let Some(field_obj) = field_data.as_object() {
1439                match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1440                    Ok(mut cols) => columns.append(&mut cols),
1441                    Err(e) => {
1442                        errors.push(ParserError {
1443                            error_type: "field_parse_error".to_string(),
1444                            field: format!("Field '{}'", field_name),
1445                            message: e.to_string(),
1446                        });
1447                    }
1448                }
1449            } else {
1450                errors.push(ParserError {
1451                    error_type: "validation_error".to_string(),
1452                    field: format!("Field '{}'", field_name),
1453                    message: format!("Field '{}' must be an object", field_name),
1454                });
1455            }
1456        }
1457
1458        // Extract metadata from info section
1459        // The frontend expects info fields to be nested under "info" key
1460        let mut odcl_metadata = HashMap::new();
1461
1462        // Extract info section and nest it properly
1463        // Convert JsonValue object to serde_json::Value::Object (Map)
1464        if let Some(info_val) = data.get("info") {
1465            // Convert JsonValue to serde_json::Value
1466            let info_json_value = json_value_to_serde_value(info_val);
1467            odcl_metadata.insert("info".to_string(), info_json_value);
1468        }
1469
1470        odcl_metadata.insert(
1471            "dataContractSpecification".to_string(),
1472            json_value_to_serde_value(
1473                data.get("dataContractSpecification")
1474                    .unwrap_or(&JsonValue::Null),
1475            ),
1476        );
1477        odcl_metadata.insert(
1478            "id".to_string(),
1479            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1480        );
1481        // Note: model_name is not added to metadata as it's redundant (it's the table name)
1482
1483        // Extract servicelevels if present
1484        if let Some(servicelevels_val) = data.get("servicelevels") {
1485            odcl_metadata.insert(
1486                "servicelevels".to_string(),
1487                json_value_to_serde_value(servicelevels_val),
1488            );
1489        }
1490
1491        // Extract links if present
1492        if let Some(links_val) = data.get("links") {
1493            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1494        }
1495
1496        // Extract domain, dataProduct, tenant
1497        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1498            odcl_metadata.insert(
1499                "domain".to_string(),
1500                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1501            );
1502        }
1503        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1504            odcl_metadata.insert(
1505                "dataProduct".to_string(),
1506                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1507            );
1508        }
1509        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1510            odcl_metadata.insert(
1511                "tenant".to_string(),
1512                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1513            );
1514        }
1515
1516        // Extract top-level description (can be object or string)
1517        if let Some(desc_val) = data.get("description") {
1518            odcl_metadata.insert(
1519                "description".to_string(),
1520                json_value_to_serde_value(desc_val),
1521            );
1522        }
1523
1524        // Extract pricing
1525        if let Some(pricing_val) = data.get("pricing") {
1526            odcl_metadata.insert(
1527                "pricing".to_string(),
1528                json_value_to_serde_value(pricing_val),
1529            );
1530        }
1531
1532        // Extract team
1533        if let Some(team_val) = data.get("team") {
1534            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1535        }
1536
1537        // Extract roles
1538        if let Some(roles_val) = data.get("roles") {
1539            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1540        }
1541
1542        // Extract terms
1543        if let Some(terms_val) = data.get("terms") {
1544            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1545        }
1546
1547        // Extract full servers array (not just type)
1548        if let Some(servers_val) = data.get("servers") {
1549            odcl_metadata.insert(
1550                "servers".to_string(),
1551                json_value_to_serde_value(servers_val),
1552            );
1553        }
1554
1555        // Extract infrastructure
1556        if let Some(infrastructure_val) = data.get("infrastructure") {
1557            odcl_metadata.insert(
1558                "infrastructure".to_string(),
1559                json_value_to_serde_value(infrastructure_val),
1560            );
1561        }
1562
1563        // Extract database type from servers if available
1564        let database_type = self.extract_database_type_from_servers(data);
1565
1566        // Extract catalog and schema from customProperties
1567        let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1568
1569        // Extract sharedDomains from customProperties
1570        let mut shared_domains: Vec<String> = Vec::new();
1571        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1572            for prop in custom_props {
1573                if let Some(prop_obj) = prop.as_object() {
1574                    let prop_key = prop_obj
1575                        .get("property")
1576                        .and_then(|v| v.as_str())
1577                        .unwrap_or("");
1578                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1579                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1580                    {
1581                        for item in arr {
1582                            if let Some(s) = item.as_str() {
1583                                shared_domains.push(s.to_string());
1584                            }
1585                        }
1586                    }
1587                }
1588            }
1589        }
1590
1591        // Extract tags from top-level tags field (Data Contract format)
1592        let mut tags: Vec<Tag> = Vec::new();
1593        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1594            for item in tags_arr {
1595                if let Some(s) = item.as_str() {
1596                    // Parse tag string to Tag enum (supports Simple, Pair, List formats)
1597                    if let Ok(tag) = Tag::from_str(s) {
1598                        tags.push(tag);
1599                    } else {
1600                        // Fallback: create Simple tag if parsing fails
1601                        tags.push(crate::models::Tag::Simple(s.to_string()));
1602                    }
1603                }
1604            }
1605        }
1606
1607        // Extract quality rules
1608        let quality_rules = self.extract_quality_rules(data);
1609
1610        // Store sharedDomains in metadata
1611        if !shared_domains.is_empty() {
1612            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1613                .iter()
1614                .map(|d| serde_json::Value::String(d.clone()))
1615                .collect();
1616            odcl_metadata.insert(
1617                "sharedDomains".to_string(),
1618                serde_json::Value::Array(shared_domains_json),
1619            );
1620        }
1621
1622        let table_uuid = self.extract_table_uuid(data);
1623
1624        let table = Table {
1625            id: table_uuid,
1626            name: model_name.clone(),
1627            columns,
1628            database_type,
1629            catalog_name,
1630            schema_name,
1631            medallion_layers: Vec::new(),
1632            scd_pattern: None,
1633            data_vault_classification: None,
1634            modeling_level: None,
1635            tags,
1636            odcl_metadata,
1637            owner: None,
1638            sla: None,
1639            contact_details: None,
1640            infrastructure_type: None,
1641            notes: None,
1642            position: None,
1643            yaml_file_path: None,
1644            drawio_cell_id: None,
1645            quality: quality_rules,
1646            errors: Vec::new(),
1647            created_at: chrono::Utc::now(),
1648            updated_at: chrono::Utc::now(),
1649        };
1650
1651        info!(
1652            "Parsed Data Contract table: {} with {} warnings/errors",
1653            model_name,
1654            errors.len()
1655        );
1656        Ok((table, errors))
1657    }
1658
1659    /// Parse a single field from Data Contract format.
1660    fn parse_data_contract_field(
1661        &self,
1662        field_name: &str,
1663        field_data: &serde_json::Map<String, JsonValue>,
1664        data: &JsonValue,
1665        errors: &mut Vec<ParserError>,
1666    ) -> Result<Vec<Column>> {
1667        let mut columns = Vec::new();
1668
1669        // Helper function to extract quality rules from a JSON object
1670        let extract_quality_from_obj =
1671            |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1672                let mut quality_rules = Vec::new();
1673                if let Some(quality_val) = obj.get("quality") {
1674                    if let Some(arr) = quality_val.as_array() {
1675                        // Array of quality rules
1676                        for item in arr {
1677                            if let Some(rule_obj) = item.as_object() {
1678                                let mut rule = HashMap::new();
1679                                for (key, value) in rule_obj {
1680                                    rule.insert(key.clone(), json_value_to_serde_value(value));
1681                                }
1682                                quality_rules.push(rule);
1683                            }
1684                        }
1685                    } else if let Some(rule_obj) = quality_val.as_object() {
1686                        // Single quality rule object
1687                        let mut rule = HashMap::new();
1688                        for (key, value) in rule_obj {
1689                            rule.insert(key.clone(), json_value_to_serde_value(value));
1690                        }
1691                        quality_rules.push(rule);
1692                    }
1693                }
1694                quality_rules
1695            };
1696
1697        // Extract description from field_data (preserve empty strings)
1698        let description = field_data
1699            .get("description")
1700            .and_then(|v| v.as_str())
1701            .unwrap_or("")
1702            .to_string();
1703
1704        // Extract quality rules from field_data
1705        let mut quality_rules = extract_quality_from_obj(field_data);
1706
1707        // Check for $ref
1708        if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1709            // Store ref_path (preserve even if definition doesn't exist)
1710            let ref_path = Some(ref_str.to_string());
1711
1712            if let Some(definition) = resolve_ref(ref_str, data) {
                // Combine quality rules with those of the referenced definition:
                // use the definition's rules when the field declares none, otherwise
                // append them after the field's own rules.
1716                if quality_rules.is_empty() {
1717                    if let Some(def_obj) = definition.as_object() {
1718                        quality_rules = extract_quality_from_obj(def_obj);
1719                    }
1720                } else {
1721                    // Merge definition quality rules if field has some
1722                    if let Some(def_obj) = definition.as_object() {
1723                        let def_quality = extract_quality_from_obj(def_obj);
1724                        // Append definition quality rules (field-level takes precedence)
1725                        quality_rules.extend(def_quality);
1726                    }
1727                }
1728
1729                let required = field_data
1730                    .get("required")
1731                    .and_then(|v| v.as_bool())
1732                    .unwrap_or(false);
1733
1734                // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
1735                // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
1736                // The `required` field will be set based on the `required` parameter during column creation.
1737
1738                // Check if definition is an object/struct with nested structure
1739                let has_nested = definition
1740                    .get("type")
1741                    .and_then(|v| v.as_str())
1742                    .map(|s| s == "object")
1743                    .unwrap_or(false)
1744                    || definition.get("properties").is_some()
1745                    || definition.get("fields").is_some();
1746
1747                if has_nested {
1748                    // Expand STRUCT from definition into nested columns with dot notation
1749                    if let Some(properties) =
1750                        definition.get("properties").and_then(|v| v.as_object())
1751                    {
1752                        // Recursively expand nested properties
1753                        let nested_required: Vec<String> = definition
1754                            .get("required")
1755                            .and_then(|v| v.as_array())
1756                            .map(|arr| {
1757                                arr.iter()
1758                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1759                                    .collect()
1760                            })
1761                            .unwrap_or_default();
1762
1763                        for (nested_name, nested_schema) in properties {
1764                            let nested_required_field = nested_required.contains(nested_name);
1765                            expand_nested_column(
1766                                &format!("{}.{}", field_name, nested_name),
1767                                nested_schema,
1768                                !nested_required_field,
1769                                &mut columns,
1770                                errors,
1771                            );
1772                        }
1773                    } else if let Some(fields) =
1774                        definition.get("fields").and_then(|v| v.as_object())
1775                    {
1776                        // Handle fields format (ODCL style)
1777                        for (nested_name, nested_schema) in fields {
1778                            expand_nested_column(
1779                                &format!("{}.{}", field_name, nested_name),
1780                                nested_schema,
1781                                true, // Assume nullable if not specified
1782                                &mut columns,
1783                                errors,
1784                            );
1785                        }
1786                    } else {
1787                        // Fallback: create parent column as OBJECT if we can't expand
1788                        let def_physical_type = definition
1789                            .get("physicalType")
1790                            .and_then(|v| v.as_str())
1791                            .map(|s| s.to_string());
1792                        columns.push(Column {
1793                            name: field_name.to_string(),
1794                            data_type: "OBJECT".to_string(),
1795                            physical_type: def_physical_type,
1796                            nullable: !required,
1797                            description: if description.is_empty() {
1798                                definition
1799                                    .get("description")
1800                                    .and_then(|v| v.as_str())
1801                                    .unwrap_or("")
1802                                    .to_string()
1803                            } else {
1804                                description.clone()
1805                            },
1806                            quality: quality_rules.clone(),
1807                            relationships: ref_to_relationships(&ref_path),
1808                            ..Default::default()
1809                        });
1810                    }
1811                } else {
1812                    // Simple type from definition
1813                    let def_type = definition
1814                        .get("type")
1815                        .and_then(|v| v.as_str())
1816                        .unwrap_or("STRING")
1817                        .to_uppercase();
1818
1819                    let enum_values = definition
1820                        .get("enum")
1821                        .and_then(|v| v.as_array())
1822                        .map(|arr| {
1823                            arr.iter()
1824                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1825                                .collect()
1826                        })
1827                        .unwrap_or_default();
1828
1829                    let def_physical_type = definition
1830                        .get("physicalType")
1831                        .and_then(|v| v.as_str())
1832                        .map(|s| s.to_string());
1833
1834                    columns.push(Column {
1835                        name: field_name.to_string(),
1836                        data_type: def_type,
1837                        physical_type: def_physical_type,
1838                        nullable: !required,
1839                        description: if description.is_empty() {
1840                            definition
1841                                .get("description")
1842                                .and_then(|v| v.as_str())
1843                                .unwrap_or("")
1844                                .to_string()
1845                        } else {
1846                            description
1847                        },
1848                        quality: quality_rules,
1849                        relationships: ref_to_relationships(&ref_path),
1850                        enum_values,
1851                        ..Default::default()
1852                    });
1853                }
1854                return Ok(columns);
1855            } else {
1856                // Undefined reference - create column with error
1857                let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
1858                let mut error_map = HashMap::new();
1859                error_map.insert("type".to_string(), serde_json::json!("validation_error"));
1860                error_map.insert("field".to_string(), serde_json::json!("data_type"));
1861                error_map.insert(
1862                    "message".to_string(),
1863                    serde_json::json!(format!(
1864                        "Field '{}' references undefined definition: {}",
1865                        field_name, ref_str
1866                    )),
1867                );
1868                col_errors.push(error_map);
1869                let field_physical_type = field_data
1870                    .get("physicalType")
1871                    .and_then(|v| v.as_str())
1872                    .map(|s| s.to_string());
1873                columns.push(Column {
1874                    name: field_name.to_string(),
1875                    data_type: "OBJECT".to_string(),
1876                    physical_type: field_physical_type,
1877                    description,
1878                    errors: col_errors,
1879                    relationships: ref_to_relationships(&Some(ref_str.to_string())),
1880                    ..Default::default()
1881                });
1882                return Ok(columns);
1883            }
1884        }
1885
1886        // Extract field type - check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
1887        // Default to STRING if missing
1888        let field_type_str = field_data
1889            .get("logicalType")
1890            .and_then(|v| v.as_str())
1891            .or_else(|| field_data.get("type").and_then(|v| v.as_str()))
1892            .unwrap_or("STRING");
1893
1894        // Check if type contains STRUCT definition (multiline STRUCT type)
1895        if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
1896            match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
1897                Ok(nested_cols) if !nested_cols.is_empty() => {
1898                    // We have nested columns - add parent column with full type, then nested columns
1899                    let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
1900                        "ARRAY<STRUCT<...>>".to_string()
1901                    } else {
1902                        "STRUCT<...>".to_string()
1903                    };
1904
1905                    let struct_physical_type = field_data
1906                        .get("physicalType")
1907                        .and_then(|v| v.as_str())
1908                        .map(|s| s.to_string());
1909
1910                    // Add parent column
1911                    columns.push(Column {
1912                        name: field_name.to_string(),
1913                        data_type: parent_data_type,
1914                        physical_type: struct_physical_type,
1915                        nullable: !field_data
1916                            .get("required")
1917                            .and_then(|v| v.as_bool())
1918                            .unwrap_or(false),
1919                        description: description.clone(),
1920                        quality: quality_rules.clone(),
1921                        relationships: ref_to_relationships(
1922                            &field_data
1923                                .get("$ref")
1924                                .and_then(|v| v.as_str())
1925                                .map(|s| s.to_string()),
1926                        ),
1927                        ..Default::default()
1928                    });
1929
1930                    // Add nested columns
1931                    columns.extend(nested_cols);
1932                    return Ok(columns);
1933                }
1934                Ok(_) | Err(_) => {
1935                    // If parsing fails or returns empty, fall back to using the type as-is
1936                }
1937            }
1938        }
1939
1940        let field_type = normalize_data_type(field_type_str);
1941
1942        // Handle ARRAY type
1943        if field_type == "ARRAY" {
1944            let items = field_data.get("items");
1945            if let Some(items_val) = items {
1946                if let Some(items_obj) = items_val.as_object() {
1947                    // Check if items is an object with fields (nested structure)
1948                    // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy) for backward compatibility
1949                    let items_type = items_obj
1950                        .get("logicalType")
1951                        .and_then(|v| v.as_str())
1952                        .or_else(|| items_obj.get("type").and_then(|v| v.as_str()));
1953
1954                    // Normalize legacy "type" values to "logicalType" equivalents
1955                    let normalized_items_type = match items_type {
1956                        Some("object") | Some("struct") => Some("object"),
1957                        Some("array") => Some("array"),
1958                        Some("string") | Some("varchar") | Some("char") | Some("text") => {
1959                            Some("string")
1960                        }
1961                        Some("integer") | Some("int") | Some("bigint") | Some("smallint")
1962                        | Some("tinyint") => Some("integer"),
1963                        Some("number") | Some("decimal") | Some("double") | Some("float")
1964                        | Some("numeric") => Some("number"),
1965                        Some("boolean") | Some("bool") => Some("boolean"),
1966                        Some("date") => Some("date"),
1967                        Some("timestamp") | Some("datetime") => Some("timestamp"),
1968                        Some("time") => Some("time"),
1969                        other => other,
1970                    };
1971
1972                    if items_obj.get("fields").is_some()
1973                        || items_obj.get("properties").is_some()
1974                        || normalized_items_type == Some("object")
1975                    {
1976                        // Array of objects - create parent column as ARRAY<OBJECT>
1977                        let array_physical_type = field_data
1978                            .get("physicalType")
1979                            .and_then(|v| v.as_str())
1980                            .map(|s| s.to_string());
1981                        columns.push(Column {
1982                            name: field_name.to_string(),
1983                            data_type: "ARRAY<OBJECT>".to_string(),
1984                            physical_type: array_physical_type,
1985                            nullable: !field_data
1986                                .get("required")
1987                                .and_then(|v| v.as_bool())
1988                                .unwrap_or(false),
1989                            description: field_data
1990                                .get("description")
1991                                .and_then(|v| v.as_str())
1992                                .unwrap_or("")
1993                                .to_string(),
1994                            ..Default::default()
1995                        });
1996
1997                        // Extract nested fields from items.properties or items.fields if present
1998                        // Handle both object format (legacy) and array format (ODCS v3.1.0)
1999                        let properties_obj =
2000                            items_obj.get("properties").and_then(|v| v.as_object());
2001                        let properties_arr = items_obj.get("properties").and_then(|v| v.as_array());
2002                        let fields_obj = items_obj.get("fields").and_then(|v| v.as_object());
2003
2004                        if let Some(fields_map) = properties_obj.or(fields_obj) {
2005                            // Object format (legacy): properties is a map
2006                            for (nested_field_name, nested_field_data) in fields_map {
2007                                if let Some(nested_field_obj) = nested_field_data.as_object() {
2008                                    // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
2009                                    let nested_field_type = nested_field_obj
2010                                        .get("logicalType")
2011                                        .and_then(|v| v.as_str())
2012                                        .or_else(|| {
2013                                            nested_field_obj.get("type").and_then(|v| v.as_str())
2014                                        })
2015                                        .unwrap_or("STRING");
2016
2017                                    // Recursively parse nested fields with array prefix
2018                                    let nested_col_name =
2019                                        format!("{}.[].{}", field_name, nested_field_name);
2020                                    let mut local_errors = Vec::new();
2021                                    match self.parse_data_contract_field(
2022                                        &nested_col_name,
2023                                        nested_field_obj,
2024                                        data,
2025                                        &mut local_errors,
2026                                    ) {
2027                                        Ok(mut nested_cols) => {
2028                                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2029                                            // Otherwise, it returns a single column
2030                                            columns.append(&mut nested_cols);
2031                                        }
2032                                        Err(_) => {
2033                                            // Fallback: create simple nested column
2034                                            let nested_physical_type = nested_field_obj
2035                                                .get("physicalType")
2036                                                .and_then(|v| v.as_str())
2037                                                .map(|s| s.to_string());
2038                                            columns.push(Column {
2039                                                name: nested_col_name,
2040                                                data_type: nested_field_type.to_uppercase(),
2041                                                physical_type: nested_physical_type,
2042                                                nullable: !nested_field_obj
2043                                                    .get("required")
2044                                                    .and_then(|v| v.as_bool())
2045                                                    .unwrap_or(false),
2046                                                description: nested_field_obj
2047                                                    .get("description")
2048                                                    .and_then(|v| v.as_str())
2049                                                    .unwrap_or("")
2050                                                    .to_string(),
2051                                                ..Default::default()
2052                                            });
2053                                        }
2054                                    }
2055                                }
2056                            }
2057                        } else if let Some(properties_list) = properties_arr {
2058                            // Array format (ODCS v3.1.0): properties is an array with 'name' field
2059                            let mut local_errors = Vec::new();
2060                            for prop_data in properties_list {
2061                                if let Some(prop_obj) = prop_data.as_object() {
2062                                    // Extract name from property object (required in v3.1.0)
2063                                    let nested_field_name = prop_obj
2064                                        .get("name")
2065                                        .or_else(|| prop_obj.get("id"))
2066                                        .and_then(|v| v.as_str())
2067                                        .unwrap_or("");
2068
2069                                    if !nested_field_name.is_empty() {
2070                                        // Recursively parse nested fields with array prefix
2071                                        let nested_col_name =
2072                                            format!("{}.[].{}", field_name, nested_field_name);
2073                                        match self.parse_data_contract_field(
2074                                            &nested_col_name,
2075                                            prop_obj,
2076                                            data,
2077                                            &mut local_errors,
2078                                        ) {
2079                                            Ok(mut nested_cols) => {
2080                                                // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2081                                                // Otherwise, it returns a single column
2082                                                columns.append(&mut nested_cols);
2083                                            }
2084                                            Err(_) => {
2085                                                // Fallback: create simple nested column
2086                                                // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
2087                                                let nested_field_type = prop_obj
2088                                                    .get("logicalType")
2089                                                    .and_then(|v| v.as_str())
2090                                                    .or_else(|| {
2091                                                        prop_obj
2092                                                            .get("type")
2093                                                            .and_then(|v| v.as_str())
2094                                                    })
2095                                                    .unwrap_or("STRING");
2096                                                let nested_physical_type = prop_obj
2097                                                    .get("physicalType")
2098                                                    .and_then(|v| v.as_str())
2099                                                    .map(|s| s.to_string());
2100                                                columns.push(Column {
2101                                                    name: nested_col_name,
2102                                                    data_type: nested_field_type.to_uppercase(),
2103                                                    physical_type: nested_physical_type,
2104                                                    nullable: !prop_obj
2105                                                        .get("required")
2106                                                        .and_then(|v| v.as_bool())
2107                                                        .unwrap_or(false),
2108                                                    description: prop_obj
2109                                                        .get("description")
2110                                                        .and_then(|v| v.as_str())
2111                                                        .unwrap_or("")
2112                                                        .to_string(),
2113                                                    ..Default::default()
2114                                                });
2115                                            }
2116                                        }
2117                                    }
2118                                }
2119                            }
2120                        }
2121
2122                        return Ok(columns);
2123                    } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2124                        // Array of simple type
2125                        let array_physical_type = field_data
2126                            .get("physicalType")
2127                            .and_then(|v| v.as_str())
2128                            .map(|s| s.to_string());
2129                        columns.push(Column {
2130                            name: field_name.to_string(),
2131                            data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2132                            physical_type: array_physical_type,
2133                            nullable: !field_data
2134                                .get("required")
2135                                .and_then(|v| v.as_bool())
2136                                .unwrap_or(false),
2137                            description: description.clone(),
2138                            quality: quality_rules.clone(),
2139                            relationships: ref_to_relationships(
2140                                &field_data
2141                                    .get("$ref")
2142                                    .and_then(|v| v.as_str())
2143                                    .map(|s| s.to_string()),
2144                            ),
2145                            ..Default::default()
2146                        });
2147                        return Ok(columns);
2148                    }
2149                } else if let Some(item_type_str) = items_val.as_str() {
2150                    // Array of simple type (string)
2151                    let array_physical_type = field_data
2152                        .get("physicalType")
2153                        .and_then(|v| v.as_str())
2154                        .map(|s| s.to_string());
2155                    columns.push(Column {
2156                        name: field_name.to_string(),
2157                        data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2158                        physical_type: array_physical_type,
2159                        nullable: !field_data
2160                            .get("required")
2161                            .and_then(|v| v.as_bool())
2162                            .unwrap_or(false),
2163                        description: description.clone(),
2164                        quality: quality_rules.clone(),
2165                        relationships: ref_to_relationships(
2166                            &field_data
2167                                .get("$ref")
2168                                .and_then(|v| v.as_str())
2169                                .map(|s| s.to_string()),
2170                        ),
2171                        ..Default::default()
2172                    });
2173                    return Ok(columns);
2174                }
2175            }
2176            // Array without items - default to ARRAY<STRING>
2177            let array_physical_type = field_data
2178                .get("physicalType")
2179                .and_then(|v| v.as_str())
2180                .map(|s| s.to_string());
2181            columns.push(Column {
2182                name: field_name.to_string(),
2183                data_type: "ARRAY<STRING>".to_string(),
2184                physical_type: array_physical_type,
2185                nullable: !field_data
2186                    .get("required")
2187                    .and_then(|v| v.as_bool())
2188                    .unwrap_or(false),
2189                description: description.clone(),
2190                quality: quality_rules.clone(),
2191                relationships: ref_to_relationships(
2192                    &field_data
2193                        .get("$ref")
2194                        .and_then(|v| v.as_str())
2195                        .map(|s| s.to_string()),
2196                ),
2197                ..Default::default()
2198            });
2199            return Ok(columns);
2200        }
2201
2202        // Check if this is a nested object with fields or properties
2203        // Note: Export saves nested columns in properties.properties, but some formats use fields
2204        // Handle both object format (legacy/ODCL) and array format (ODCS v3.1.0)
2205        let nested_fields_obj = field_data
2206            .get("properties")
2207            .and_then(|v| v.as_object())
2208            .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2209        let nested_fields_arr = field_data.get("properties").and_then(|v| v.as_array());
2210
2211        if field_type == "OBJECT" && (nested_fields_obj.is_some() || nested_fields_arr.is_some()) {
2212            // Inline nested object - create parent column as OBJECT and extract nested fields
2213
2214            // Extract physicalType for the parent OBJECT column
2215            let parent_physical_type = field_data
2216                .get("physicalType")
2217                .and_then(|v| v.as_str())
2218                .map(|s| s.to_string());
2219
2220            // Create parent column
2221            columns.push(Column {
2222                name: field_name.to_string(),
2223                data_type: "OBJECT".to_string(),
2224                physical_type: parent_physical_type,
2225                nullable: !field_data
2226                    .get("required")
2227                    .and_then(|v| v.as_bool())
2228                    .unwrap_or(false),
2229                description: description.clone(),
2230                quality: quality_rules.clone(),
2231                relationships: ref_to_relationships(
2232                    &field_data
2233                        .get("$ref")
2234                        .and_then(|v| v.as_str())
2235                        .map(|s| s.to_string()),
2236                ),
2237                ..Default::default()
2238            });
2239
2240            // Extract nested fields recursively - handle both object and array formats
2241            if let Some(fields_obj) = nested_fields_obj {
2242                // Object format (legacy/ODCL): properties is a map of name -> schema
2243                for (nested_field_name, nested_field_data) in fields_obj {
2244                    if let Some(nested_field_obj) = nested_field_data.as_object() {
2245                        let nested_field_type = nested_field_obj
2246                            .get("logicalType")
2247                            .and_then(|v| v.as_str())
2248                            .or_else(|| nested_field_obj.get("type").and_then(|v| v.as_str()))
2249                            .unwrap_or("STRING");
2250
2251                        // Recursively parse nested fields
2252                        let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2253                        match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data)
2254                        {
2255                            Ok(mut nested_cols) => {
2256                                // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2257                                // Otherwise, it returns a single column
2258                                columns.append(&mut nested_cols);
2259                            }
2260                            Err(_) => {
2261                                // Fallback: create simple nested column
2262                                let nested_physical_type = nested_field_obj
2263                                    .get("physicalType")
2264                                    .and_then(|v| v.as_str())
2265                                    .map(|s| s.to_string());
2266                                columns.push(Column {
2267                                    name: nested_col_name,
2268                                    data_type: nested_field_type.to_uppercase(),
2269                                    physical_type: nested_physical_type,
2270                                    nullable: !nested_field_obj
2271                                        .get("required")
2272                                        .and_then(|v| v.as_bool())
2273                                        .unwrap_or(false),
2274                                    description: nested_field_obj
2275                                        .get("description")
2276                                        .and_then(|v| v.as_str())
2277                                        .unwrap_or("")
2278                                        .to_string(),
2279                                    ..Default::default()
2280                                });
2281                            }
2282                        }
2283                    }
2284                }
2285            } else if let Some(fields_arr) = nested_fields_arr {
2286                // Array format (ODCS v3.1.0): properties is an array with 'name' field
2287                for prop_data in fields_arr {
2288                    if let Some(prop_obj) = prop_data.as_object() {
2289                        // Extract name from property object (required in v3.1.0)
2290                        let nested_field_name = prop_obj
2291                            .get("name")
2292                            .or_else(|| prop_obj.get("id"))
2293                            .and_then(|v| v.as_str())
2294                            .unwrap_or("");
2295
2296                        if !nested_field_name.is_empty() {
2297                            let nested_field_type = prop_obj
2298                                .get("logicalType")
2299                                .and_then(|v| v.as_str())
2300                                .or_else(|| prop_obj.get("type").and_then(|v| v.as_str()))
2301                                .unwrap_or("STRING");
2302
2303                            // Recursively parse nested fields
2304                            let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2305                            match self.parse_odcl_v3_property(&nested_col_name, prop_obj, data) {
2306                                Ok(mut nested_cols) => {
2307                                    // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2308                                    // Otherwise, it returns a single column
2309                                    columns.append(&mut nested_cols);
2310                                }
2311                                Err(_) => {
2312                                    // Fallback: create simple nested column
2313                                    let nested_physical_type = prop_obj
2314                                        .get("physicalType")
2315                                        .and_then(|v| v.as_str())
2316                                        .map(|s| s.to_string());
2317                                    columns.push(Column {
2318                                        name: nested_col_name,
2319                                        data_type: nested_field_type.to_uppercase(),
2320                                        physical_type: nested_physical_type,
2321                                        nullable: !prop_obj
2322                                            .get("required")
2323                                            .and_then(|v| v.as_bool())
2324                                            .unwrap_or(false),
2325                                        description: prop_obj
2326                                            .get("description")
2327                                            .and_then(|v| v.as_str())
2328                                            .unwrap_or("")
2329                                            .to_string(),
2330                                        ..Default::default()
2331                                    });
2332                                }
2333                            }
2334                        }
2335                    }
2336                }
2337            }
2338
2339            return Ok(columns);
2340        }
2341
2342        // Regular field (no $ref or $ref not found)
2343        let required = field_data
2344            .get("required")
2345            .and_then(|v| v.as_bool())
2346            .unwrap_or(false);
2347
2348        // Use description extracted at function start, or extract if not yet extracted
2349        let field_description = if description.is_empty() {
2350            field_data
2351                .get("description")
2352                .and_then(|v| v.as_str())
2353                .unwrap_or("")
2354                .to_string()
2355        } else {
2356            description
2357        };
2358
2359        // Use quality_rules extracted at function start, or extract if not yet extracted
2360        let mut column_quality_rules = quality_rules;
2361
2362        // Extract column-level quality rules if not already extracted
2363        if column_quality_rules.is_empty()
2364            && let Some(quality_val) = field_data.get("quality")
2365        {
2366            if let Some(arr) = quality_val.as_array() {
2367                // Array of quality rules
2368                for item in arr {
2369                    if let Some(obj) = item.as_object() {
2370                        let mut rule = HashMap::new();
2371                        for (key, value) in obj {
2372                            rule.insert(key.clone(), json_value_to_serde_value(value));
2373                        }
2374                        column_quality_rules.push(rule);
2375                    }
2376                }
2377            } else if let Some(obj) = quality_val.as_object() {
2378                // Single quality rule object
2379                let mut rule = HashMap::new();
2380                for (key, value) in obj {
2381                    rule.insert(key.clone(), json_value_to_serde_value(value));
2382                }
2383                column_quality_rules.push(rule);
2384            }
2385        }
2386
2387        // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
2388        // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
2389        // The `required` field will be set based on the `required` parameter during column creation.
2390
2391        // Extract physicalType (ODCS v3.1.0) - the actual database type
2392        let physical_type = field_data
2393            .get("physicalType")
2394            .and_then(|v| v.as_str())
2395            .map(|s| s.to_string());
2396
2397        // Parse the column with all ODCS v3.1.0 metadata fields
2398        let column = self.parse_column_metadata_from_field(
2399            field_name,
2400            &field_type,
2401            physical_type,
2402            !required,
2403            field_description,
2404            column_quality_rules,
2405            field_data,
2406        );
2407
2408        columns.push(column);
2409
2410        Ok(columns)
2411    }
2412
2413    /// Parse all column metadata fields from ODCS v3.1.0 field data.
2414    /// This extracts all supported column-level metadata including:
2415    /// - businessName, physicalName, logicalTypeOptions
2416    /// - primaryKey, primaryKeyPosition, unique
2417    /// - partitioned, partitionKeyPosition, clustered
2418    /// - classification, criticalDataElement, encryptedName
2419    /// - transformSourceObjects, transformLogic, transformDescription
2420    /// - examples, authoritativeDefinitions, tags, customProperties
2421    #[allow(clippy::too_many_arguments)]
2422    fn parse_column_metadata_from_field(
2423        &self,
2424        name: &str,
2425        data_type: &str,
2426        physical_type: Option<String>,
2427        nullable: bool,
2428        description: String,
2429        quality: Vec<HashMap<String, serde_json::Value>>,
2430        field_data: &serde_json::Map<String, JsonValue>,
2431    ) -> Column {
2432        use crate::models::{AuthoritativeDefinition, LogicalTypeOptions};
2433
2434        // businessName
2435        let business_name = field_data
2436            .get("businessName")
2437            .and_then(|v| v.as_str())
2438            .map(|s| s.to_string());
2439
2440        // physicalName
2441        let physical_name = field_data
2442            .get("physicalName")
2443            .and_then(|v| v.as_str())
2444            .map(|s| s.to_string());
2445
2446        // logicalTypeOptions
2447        let logical_type_options = field_data.get("logicalTypeOptions").and_then(|v| {
2448            v.as_object().map(|opts| LogicalTypeOptions {
2449                min_length: opts.get("minLength").and_then(|v| v.as_i64()),
2450                max_length: opts.get("maxLength").and_then(|v| v.as_i64()),
2451                pattern: opts
2452                    .get("pattern")
2453                    .and_then(|v| v.as_str())
2454                    .map(|s| s.to_string()),
2455                format: opts
2456                    .get("format")
2457                    .and_then(|v| v.as_str())
2458                    .map(|s| s.to_string()),
2459                minimum: opts.get("minimum").cloned(),
2460                maximum: opts.get("maximum").cloned(),
2461                exclusive_minimum: opts.get("exclusiveMinimum").cloned(),
2462                exclusive_maximum: opts.get("exclusiveMaximum").cloned(),
2463                precision: opts
2464                    .get("precision")
2465                    .and_then(|v| v.as_i64())
2466                    .map(|n| n as i32),
2467                scale: opts.get("scale").and_then(|v| v.as_i64()).map(|n| n as i32),
2468            })
2469        });
2470
2471        // primaryKey
2472        let primary_key = field_data
2473            .get("primaryKey")
2474            .and_then(|v| v.as_bool())
2475            .unwrap_or(false);
2476
2477        // primaryKeyPosition
2478        let primary_key_position = field_data
2479            .get("primaryKeyPosition")
2480            .and_then(|v| v.as_i64())
2481            .map(|n| n as i32);
2482
2483        // unique
2484        let unique = field_data
2485            .get("unique")
2486            .and_then(|v| v.as_bool())
2487            .unwrap_or(false);
2488
2489        // partitioned
2490        let partitioned = field_data
2491            .get("partitioned")
2492            .and_then(|v| v.as_bool())
2493            .unwrap_or(false);
2494
2495        // partitionKeyPosition
2496        let partition_key_position = field_data
2497            .get("partitionKeyPosition")
2498            .and_then(|v| v.as_i64())
2499            .map(|n| n as i32);
2500
2501        // clustered
2502        let clustered = field_data
2503            .get("clustered")
2504            .and_then(|v| v.as_bool())
2505            .unwrap_or(false);
2506
2507        // classification
2508        let classification = field_data
2509            .get("classification")
2510            .and_then(|v| v.as_str())
2511            .map(|s| s.to_string());
2512
2513        // criticalDataElement
2514        let critical_data_element = field_data
2515            .get("criticalDataElement")
2516            .and_then(|v| v.as_bool())
2517            .unwrap_or(false);
2518
2519        // encryptedName
2520        let encrypted_name = field_data
2521            .get("encryptedName")
2522            .and_then(|v| v.as_str())
2523            .map(|s| s.to_string());
2524
2525        // transformSourceObjects
2526        let transform_source_objects = field_data
2527            .get("transformSourceObjects")
2528            .and_then(|v| v.as_array())
2529            .map(|arr| {
2530                arr.iter()
2531                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2532                    .collect()
2533            })
2534            .unwrap_or_default();
2535
2536        // transformLogic
2537        let transform_logic = field_data
2538            .get("transformLogic")
2539            .and_then(|v| v.as_str())
2540            .map(|s| s.to_string());
2541
2542        // transformDescription
2543        let transform_description = field_data
2544            .get("transformDescription")
2545            .and_then(|v| v.as_str())
2546            .map(|s| s.to_string());
2547
2548        // examples
2549        let examples = field_data
2550            .get("examples")
2551            .and_then(|v| v.as_array())
2552            .map(|arr| arr.to_vec())
2553            .unwrap_or_default();
2554
2555        // authoritativeDefinitions
2556        let authoritative_definitions = field_data
2557            .get("authoritativeDefinitions")
2558            .and_then(|v| v.as_array())
2559            .map(|arr| {
2560                arr.iter()
2561                    .filter_map(|item| {
2562                        item.as_object().map(|obj| AuthoritativeDefinition {
2563                            definition_type: obj
2564                                .get("type")
2565                                .and_then(|v| v.as_str())
2566                                .unwrap_or("")
2567                                .to_string(),
2568                            url: obj
2569                                .get("url")
2570                                .and_then(|v| v.as_str())
2571                                .unwrap_or("")
2572                                .to_string(),
2573                        })
2574                    })
2575                    .collect()
2576            })
2577            .unwrap_or_default();
2578
2579        // tags
2580        let tags = field_data
2581            .get("tags")
2582            .and_then(|v| v.as_array())
2583            .map(|arr| {
2584                arr.iter()
2585                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2586                    .collect()
2587            })
2588            .unwrap_or_default();
2589
2590        // customProperties
2591        let custom_properties = field_data
2592            .get("customProperties")
2593            .and_then(|v| v.as_array())
2594            .map(|arr| {
2595                arr.iter()
2596                    .filter_map(|item| {
2597                        item.as_object().and_then(|obj| {
2598                            let key = obj.get("property").and_then(|v| v.as_str())?;
2599                            let value = obj.get("value").cloned()?;
2600                            Some((key.to_string(), value))
2601                        })
2602                    })
2603                    .collect()
2604            })
2605            .unwrap_or_default();
2606
2607        // businessKey (secondary_key)
2608        let secondary_key = field_data
2609            .get("businessKey")
2610            .and_then(|v| v.as_bool())
2611            .unwrap_or(false);
2612
2613        // enum values
2614        let enum_values = field_data
2615            .get("enum")
2616            .and_then(|v| v.as_array())
2617            .map(|arr| {
2618                arr.iter()
2619                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2620                    .collect()
2621            })
2622            .unwrap_or_default();
2623
2624        // constraints
2625        let constraints = field_data
2626            .get("constraints")
2627            .and_then(|v| v.as_array())
2628            .map(|arr| {
2629                arr.iter()
2630                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2631                    .collect()
2632            })
2633            .unwrap_or_default();
2634
2635        Column {
2636            name: name.to_string(),
2637            data_type: data_type.to_string(),
2638            physical_type,
2639            physical_name,
2640            nullable,
2641            description,
2642            quality,
2643            business_name,
2644            logical_type_options,
2645            primary_key,
2646            primary_key_position,
2647            unique,
2648            partitioned,
2649            partition_key_position,
2650            clustered,
2651            classification,
2652            critical_data_element,
2653            encrypted_name,
2654            transform_source_objects,
2655            transform_logic,
2656            transform_description,
2657            examples,
2658            authoritative_definitions,
2659            tags,
2660            custom_properties,
2661            secondary_key,
2662            enum_values,
2663            constraints,
2664            foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2665            relationships: self.parse_relationships_from_field(field_data),
2666            ..Default::default()
2667        }
2668    }
2669
2670    /// Parse foreign key from Data Contract field data.
2671    fn parse_foreign_key_from_data_contract(
2672        &self,
2673        field_data: &serde_json::Map<String, JsonValue>,
2674    ) -> Option<ForeignKey> {
2675        field_data
2676            .get("foreignKey")
2677            .and_then(|v| v.as_object())
2678            .map(|fk_obj| ForeignKey {
2679                table_id: fk_obj
2680                    .get("table")
2681                    .or_else(|| fk_obj.get("table_id"))
2682                    .and_then(|v| v.as_str())
2683                    .unwrap_or("")
2684                    .to_string(),
2685                column_name: fk_obj
2686                    .get("column")
2687                    .or_else(|| fk_obj.get("column_name"))
2688                    .and_then(|v| v.as_str())
2689                    .unwrap_or("")
2690                    .to_string(),
2691            })
2692    }
2693
2694    /// Parse relationships from Data Contract field data.
2695    /// Also converts $ref to a relationship entry if present.
2696    fn parse_relationships_from_field(
2697        &self,
2698        field_data: &serde_json::Map<String, JsonValue>,
2699    ) -> Vec<PropertyRelationship> {
2700        let mut relationships = Vec::new();
2701
2702        // Parse relationships array from ODCS v3.1.0 format
2703        if let Some(rels_array) = field_data.get("relationships").and_then(|v| v.as_array()) {
2704            for rel in rels_array {
2705                if let Some(rel_obj) = rel.as_object() {
2706                    let rel_type = rel_obj
2707                        .get("type")
2708                        .and_then(|v| v.as_str())
2709                        .unwrap_or("foreignKey")
2710                        .to_string();
2711                    let to = rel_obj
2712                        .get("to")
2713                        .and_then(|v| v.as_str())
2714                        .unwrap_or("")
2715                        .to_string();
2716
2717                    if !to.is_empty() {
2718                        relationships.push(PropertyRelationship {
2719                            relationship_type: rel_type,
2720                            to,
2721                        });
2722                    }
2723                }
2724            }
2725        }
2726
2727        // Convert $ref to relationship if present and no relationships were parsed
2728        if relationships.is_empty()
2729            && let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str())
2730        {
2731            // Convert $ref path to ODCS relationship format
2732            let to = if ref_str.starts_with("#/definitions/") {
2733                let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
2734                format!("definitions/{}", def_path)
2735            } else if ref_str.starts_with("#/") {
2736                ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
2737            } else {
2738                ref_str.to_string()
2739            };
2740
2741            relationships.push(PropertyRelationship {
2742                relationship_type: "foreignKey".to_string(),
2743                to,
2744            });
2745        }
2746
2747        relationships
2748    }
2749
2750    /// Extract database type from servers in Data Contract format.
2751    fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2752        // Data Contract format: servers can be object or array
2753        if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2754            // Object format: { "server_name": { "type": "..." } }
2755            if let Some((_, server_data)) = servers_obj.iter().next()
2756                && let Some(server_obj) = server_data.as_object()
2757            {
2758                return server_obj
2759                    .get("type")
2760                    .and_then(|v| v.as_str())
2761                    .and_then(|s| self.parse_database_type(s));
2762            }
2763        } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2764            // Array format: [ { "server": "...", "type": "..." } ]
2765            if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2766                return server_obj
2767                    .get("type")
2768                    .and_then(|v| v.as_str())
2769                    .and_then(|s| self.parse_database_type(s));
2770            }
2771        }
2772        None
2773    }
2774
2775    /// Parse database type string to enum.
2776    fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2777        match s.to_lowercase().as_str() {
2778            "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2779            "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2780            "mysql" => Some(DatabaseType::Mysql),
2781            "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2782            "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2783            _ => None,
2784        }
2785    }
2786
2787    /// Parse STRUCT type definition from string (e.g., "ARRAY<STRUCT<ID: STRING, NAME: STRING>>").
2788    /// Parse STRUCT type from string and create nested columns
2789    /// This is public so it can be used by SQL importer to parse STRUCT types
2790    pub fn parse_struct_type_from_string(
2791        &self,
2792        field_name: &str,
2793        type_str: &str,
2794        field_data: &serde_json::Map<String, JsonValue>,
2795    ) -> Result<Vec<Column>> {
2796        let mut columns = Vec::new();
2797
2798        // Normalize whitespace - replace newlines and multiple spaces with single space
2799        let normalized_type = type_str
2800            .lines()
2801            .map(|line| line.trim())
2802            .filter(|line| !line.is_empty())
2803            .collect::<Vec<_>>()
2804            .join(" ");
2805
2806        let type_str_upper = normalized_type.to_uppercase();
2807
2808        // Check if it's ARRAY<STRUCT<...>>
2809        let is_array = type_str_upper.starts_with("ARRAY<");
2810        let struct_start = type_str_upper.find("STRUCT<");
2811
2812        if let Some(start_pos) = struct_start {
2813            // Extract STRUCT content - start from STRUCT< and find its closing >
2814            // For ARRAY<STRUCT<...>>, we still need to find the STRUCT<...> closing bracket
2815            let struct_content_start = start_pos + 7; // Skip "STRUCT<"
2816            let struct_content = &normalized_type[struct_content_start..];
2817
2818            // Find matching closing bracket for STRUCT<, handling nested STRUCTs
2819            // We need to find the > that closes STRUCT<, not ARRAY<
2820            let mut depth = 1;
2821            let mut end_pos = None;
2822            for (i, ch) in struct_content.char_indices() {
2823                match ch {
2824                    '<' => depth += 1,
2825                    '>' => {
2826                        depth -= 1;
2827                        if depth == 0 {
2828                            end_pos = Some(i);
2829                            break;
2830                        }
2831                    }
2832                    _ => {}
2833                }
2834            }
2835
2836            // If no closing bracket found, try to infer it (handle malformed YAML)
2837            let struct_fields_str = if let Some(end) = end_pos {
2838                &struct_content[..end]
2839            } else {
2840                // Missing closing bracket - use everything up to the end
2841                struct_content.trim_end_matches('>').trim()
2842            };
2843
2844            // Parse fields: "ID: STRING, NAME: STRING, ..."
2845            let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2846
2847            // Create nested columns, handling nested STRUCTs recursively
2848            // For ARRAY<STRUCT<...>>, use .[] notation for array elements: parent.[].field
2849            // For STRUCT<...>, use dot notation: parent.field
2850            for (nested_name, nested_type) in fields {
2851                let nested_type_upper = nested_type.to_uppercase();
2852                let nested_col_name = if is_array {
2853                    format!("{}.[].{}", field_name, nested_name)
2854                } else {
2855                    format!("{}.{}", field_name, nested_name)
2856                };
2857
2858                // Check if this field is itself a STRUCT or ARRAY<STRUCT>
2859                // Handle multiple levels of nesting: STRUCT<...>, ARRAY<STRUCT<...>>, ARRAY<STRUCT<...>> within STRUCT, etc.
2860                let is_nested_struct = nested_type_upper.starts_with("STRUCT<");
2861                let is_nested_array_struct = nested_type_upper.starts_with("ARRAY<STRUCT<");
2862
2863                if is_nested_struct || is_nested_array_struct {
2864                    // Recursively parse nested STRUCT or ARRAY<STRUCT>
2865                    // The nested_col_name already includes the parent path:
2866                    // - For STRUCT within ARRAY: "items.[].details"
2867                    // - For STRUCT within STRUCT: "orderInfo.metadata"
2868                    // - For ARRAY<STRUCT> within STRUCT: "parent.items"
2869                    // - For ARRAY<STRUCT> within ARRAY<STRUCT>: "parent.[].items"
2870                    // Recursive calls will correctly build the full path with proper notation
2871                    match self.parse_struct_type_from_string(
2872                        &nested_col_name,
2873                        &nested_type,
2874                        field_data,
2875                    ) {
2876                        Ok(nested_cols) => {
2877                            // Add all recursively parsed columns
2878                            // These will have names like:
2879                            // - items.[].details.name (STRUCT within ARRAY<STRUCT>)
2880                            // - orderInfo.metadata.field (STRUCT within STRUCT)
2881                            // - parent.items.[].field (ARRAY<STRUCT> within STRUCT)
2882                            // - parent.[].items.[].field (ARRAY<STRUCT> within ARRAY<STRUCT>)
2883                            columns.extend(nested_cols);
2884                        }
2885                        Err(_) => {
2886                            // If recursive parsing fails, add the parent column with STRUCT/ARRAY type
2887                            let fallback_data_type = if is_nested_array_struct {
2888                                "ARRAY<STRUCT<...>>".to_string()
2889                            } else {
2890                                "STRUCT<...>".to_string()
2891                            };
2892                            let nested_physical_type = field_data
2893                                .get("physicalType")
2894                                .and_then(|v| v.as_str())
2895                                .map(|s| s.to_string());
2896                            columns.push(Column {
2897                                name: nested_col_name,
2898                                data_type: fallback_data_type,
2899                                physical_type: nested_physical_type,
2900                                nullable: !field_data
2901                                    .get("required")
2902                                    .and_then(|v| v.as_bool())
2903                                    .unwrap_or(false),
2904                                description: field_data
2905                                    .get("description")
2906                                    .and_then(|v| v.as_str())
2907                                    .unwrap_or("")
2908                                    .to_string(),
2909                                ..Default::default()
2910                            });
2911                        }
2912                    }
2913                } else if nested_type_upper.starts_with("ARRAY<") {
2914                    // Handle ARRAY of simple types (not STRUCT) - these don't need nested columns
2915                    // But if it's ARRAY<STRUCT<...>>, it would have been caught above
2916                    let nested_physical_type = field_data
2917                        .get("physicalType")
2918                        .and_then(|v| v.as_str())
2919                        .map(|s| s.to_string());
2920                    columns.push(Column {
2921                        name: nested_col_name,
2922                        data_type: normalize_data_type(&nested_type),
2923                        physical_type: nested_physical_type,
2924                        nullable: !field_data
2925                            .get("required")
2926                            .and_then(|v| v.as_bool())
2927                            .unwrap_or(false),
2928                        description: field_data
2929                            .get("description")
2930                            .and_then(|v| v.as_str())
2931                            .unwrap_or("")
2932                            .to_string(),
2933                        ..Default::default()
2934                    });
2935                } else {
2936                    // Simple nested field (not a STRUCT)
2937                    let nested_physical_type = field_data
2938                        .get("physicalType")
2939                        .and_then(|v| v.as_str())
2940                        .map(|s| s.to_string());
2941                    columns.push(Column {
2942                        name: nested_col_name,
2943                        data_type: normalize_data_type(&nested_type),
2944                        physical_type: nested_physical_type,
2945                        nullable: !field_data
2946                            .get("required")
2947                            .and_then(|v| v.as_bool())
2948                            .unwrap_or(false),
2949                        description: field_data
2950                            .get("description")
2951                            .and_then(|v| v.as_str())
2952                            .unwrap_or("")
2953                            .to_string(),
2954                        ..Default::default()
2955                    });
2956                }
2957            }
2958
2959            return Ok(columns);
2960        }
2961
2962        // If no STRUCT found, return empty (fallback to regular parsing)
2963        Ok(Vec::new())
2964    }
2965
2966    /// Parse STRUCT fields from string (e.g., "ID: STRING, NAME: STRING").
2967    fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2968        let mut fields = Vec::new();
2969        let mut current_field = String::new();
2970        let mut depth = 0;
2971        let mut in_string = false;
2972        let mut string_char = None;
2973
2974        for ch in fields_str.chars() {
2975            match ch {
2976                '\'' | '"' if !in_string || Some(ch) == string_char => {
2977                    if in_string {
2978                        in_string = false;
2979                        string_char = None;
2980                    } else {
2981                        in_string = true;
2982                        string_char = Some(ch);
2983                    }
2984                    current_field.push(ch);
2985                }
2986                '<' if !in_string => {
2987                    depth += 1;
2988                    current_field.push(ch);
2989                }
2990                '>' if !in_string => {
2991                    depth -= 1;
2992                    current_field.push(ch);
2993                }
2994                ',' if !in_string && depth == 0 => {
2995                    // End of current field
2996                    let trimmed = current_field.trim();
2997                    if !trimmed.is_empty()
2998                        && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2999                    {
3000                        fields.push((name, type_part));
3001                    }
3002                    current_field.clear();
3003                }
3004                _ => {
3005                    current_field.push(ch);
3006                }
3007            }
3008        }
3009
3010        // Handle last field
3011        let trimmed = current_field.trim();
3012        if !trimmed.is_empty()
3013            && let Some((name, type_part)) = self.parse_field_definition(trimmed)
3014        {
3015            fields.push((name, type_part));
3016        }
3017
3018        Ok(fields)
3019    }
3020
3021    /// Parse a single field definition (e.g., "ID: STRING" or "DETAILS: STRUCT<...>").
3022    fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
3023        // Split by colon, but handle nested STRUCTs
3024        let colon_pos = field_def.find(':')?;
3025        let name = field_def[..colon_pos].trim().to_string();
3026        let type_part = field_def[colon_pos + 1..].trim().to_string();
3027
3028        if name.is_empty() || type_part.is_empty() {
3029            return None;
3030        }
3031
3032        Some((name, type_part))
3033    }
3034
3035    /// Extract catalog and schema from customProperties.
3036    fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
3037        let mut catalog_name = None;
3038        let mut schema_name = None;
3039
3040        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
3041            for prop in custom_props {
3042                if let Some(prop_obj) = prop.as_object() {
3043                    let prop_key = prop_obj
3044                        .get("property")
3045                        .and_then(|v| v.as_str())
3046                        .unwrap_or("");
3047                    let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
3048
3049                    match prop_key {
3050                        "catalogName" | "catalog_name" => {
3051                            catalog_name = prop_value.map(|s| s.to_string());
3052                        }
3053                        "schemaName" | "schema_name" => {
3054                            schema_name = prop_value.map(|s| s.to_string());
3055                        }
3056                        _ => {}
3057                    }
3058                }
3059            }
3060        }
3061
3062        // Also check direct fields
3063        if catalog_name.is_none() {
3064            catalog_name = data
3065                .get("catalog_name")
3066                .and_then(|v| v.as_str())
3067                .map(|s| s.to_string());
3068        }
3069        if schema_name.is_none() {
3070            schema_name = data
3071                .get("schema_name")
3072                .and_then(|v| v.as_str())
3073                .map(|s| s.to_string());
3074        }
3075
3076        (catalog_name, schema_name)
3077    }
3078}
3079
3080impl Default for ODCSImporter {
3081    fn default() -> Self {
3082        Self::new()
3083    }
3084}
3085
// Unit tests for `ODCSImporter::parse` using small inline YAML documents in the
// simple `name` / `columns` layout.
#[cfg(test)]
mod tests {
    use super::*;

    /// A minimal table with two columns and an explicit `database_type` parses without errors.
    #[test]
    fn test_parse_simple_odcl_table() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
        assert_eq!(errors.len(), 0);
    }

    /// `medallion_layer`, `scd_pattern` and `odcl_metadata` are carried onto the table.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
        // NOTE(review): this check is skipped silently when `description` is missing or
        // is not a string, so it cannot fail on an absent value.
        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
            assert_eq!(desc, "User table");
        }
        assert_eq!(errors.len(), 0);
    }

    /// `data_vault_classification: Hub` maps to `DataVaultClassification::Hub`.
    #[test]
    fn test_parse_odcl_with_data_vault() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: hub_customer
columns:
  - name: customer_key
    data_type: VARCHAR(50)
data_vault_classification: Hub
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(
            table.data_vault_classification,
            Some(DataVaultClassification::Hub)
        );
        assert_eq!(errors.len(), 0);
    }

    /// Input that is not well-formed YAML makes `parse` return an error.
    #[test]
    fn test_parse_invalid_odcl() {
        let mut parser = ODCSImporter::new();
        let invalid_yaml = "not: valid: yaml: structure:";

        // Should fail to parse YAML
        assert!(parser.parse(invalid_yaml).is_err());
    }

    /// A document with a `name` but no `columns` is rejected.
    #[test]
    fn test_parse_odcl_missing_required_fields() {
        let mut parser = ODCSImporter::new();
        let non_conformant = r#"
name: users
# Missing required columns field
"#;

        // Should fail with missing columns
        assert!(parser.parse(non_conformant).is_err());
    }

    /// A column-level `foreign_key` mapping is exposed on the parsed column.
    #[test]
    fn test_parse_odcl_with_foreign_key() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: orders
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: user_id
    data_type: INT
    foreign_key:
      table_id: users
      column_name: id
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
        assert!(user_id_col.foreign_key.is_some());
        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
        assert_eq!(errors.len(), 0);
    }

    /// Column `constraints` entries and `nullable: false` are preserved on the parsed column.
    #[test]
    fn test_parse_odcl_with_constraints() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: products
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
    constraints:
      - UNIQUE
      - NOT NULL
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
        assert!(!name_col.nullable);
        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
        assert_eq!(errors.len(), 0);
    }
}