data_modelling_core/import/
odcs.rs

1//! ODCS parser service for parsing Open Data Contract Standard YAML files.
2//!
3//! This service parses ODCS (Open Data Contract Standard) v3.1.0 and legacy ODCL (Data Contract Specification) YAML files
4//! and converts them to Table models. ODCL files are automatically converted to ODCS v3.1.0 format.
5//! Supports multiple formats:
6//! - ODCS v3.1.0 / v3.0.x format (apiVersion, kind, schema) - PRIMARY FORMAT
7//! - ODCL (Data Contract Specification) format (dataContractSpecification, models, definitions) - LEGACY, converted to ODCS
8//! - Simple ODCL format (name, columns) - LEGACY, converted to ODCS
9//! - Liquibase format
10
11use super::odcs_shared::{
12    ParserError, column_to_column_data, expand_nested_column, json_value_to_serde_value,
13    normalize_data_type, parse_data_vault_classification, parse_medallion_layer, parse_scd_pattern,
14    resolve_ref, yaml_to_json_value,
15};
16use super::{ImportError, ImportResult, TableData};
17use crate::models::column::ForeignKey;
18use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
19use crate::models::{Column, PropertyRelationship, Table, Tag};
20use anyhow::{Context, Result};
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::str::FromStr;
24use tracing::info;
25
26// Re-export ParserError for backward compatibility
27pub use super::odcs_shared::ParserError as OdcsParserError;
28
29/// Convert a $ref path to a PropertyRelationship.
30/// E.g., "#/definitions/order_id" -> PropertyRelationship { type: "foreignKey", to: "definitions/order_id" }
31fn ref_to_relationships(ref_path: &Option<String>) -> Vec<PropertyRelationship> {
32    match ref_path {
33        Some(ref_str) => {
34            let to = if ref_str.starts_with("#/definitions/") {
35                let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
36                format!("definitions/{}", def_path)
37            } else if ref_str.starts_with("#/") {
38                ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
39            } else {
40                ref_str.clone()
41            };
42            vec![PropertyRelationship {
43                relationship_type: "foreignKey".to_string(),
44                to,
45            }]
46        }
47        None => Vec::new(),
48    }
49}
50
/// ODCS parser service for parsing Open Data Contract Standard YAML files.
///
/// Handles ODCS v3.1.0 (primary format) and legacy ODCL formats (converted to ODCS).
pub struct ODCSImporter {
    /// YAML document most recently handed to the parser, kept for `$ref` resolution.
    /// `None` until a document has been parsed.
    current_yaml_data: Option<serde_yaml::Value>,
}
57
58impl ODCSImporter {
59    /// Create a new ODCS parser instance.
60    ///
61    /// # Example
62    ///
63    /// ```rust
64    /// use data_modelling_core::import::odcs::ODCSImporter;
65    ///
66    /// let mut importer = ODCSImporter::new();
67    /// ```
68    pub fn new() -> Self {
69        Self {
70            current_yaml_data: None,
71        }
72    }
73
74    /// Import ODCS/ODCL YAML content and create Table (SDK interface).
75    ///
76    /// Supports ODCS v3.1.0 (primary), legacy ODCL formats (converted to ODCS), and Liquibase formats.
77    ///
78    /// # Arguments
79    ///
80    /// * `yaml_content` - ODCS/ODCL YAML content as a string
81    ///
82    /// # Returns
83    ///
84    /// An `ImportResult` containing the extracted table and any parse errors.
85    ///
86    /// # Example
87    ///
88    /// ```rust
89    /// use data_modelling_core::import::odcs::ODCSImporter;
90    ///
91    /// let mut importer = ODCSImporter::new();
92    /// let yaml = r#"
93    /// apiVersion: v3.1.0
94    /// kind: DataContract
95    /// id: 550e8400-e29b-41d4-a716-446655440000
96    /// version: 1.0.0
97    /// name: users
98    /// schema:
99    ///   fields:
100    ///     - name: id
101    ///       type: bigint
102    /// "#;
103    /// let result = importer.import(yaml).unwrap();
104    /// assert_eq!(result.tables.len(), 1);
105    /// ```
106    pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
107        // First parse YAML to get raw data for ODCS field extraction
108        let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
109            .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
110
111        let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
112            ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
113        })?;
114
115        match self.parse(yaml_content) {
116            Ok((table, errors)) => {
117                // Extract all ODCS contract-level fields from the raw JSON data
118                let sdk_tables = vec![TableData {
119                    table_index: 0,
120                    id: Some(table.id.to_string()),
121                    name: Some(table.name.clone()),
122                    api_version: json_data
123                        .get("apiVersion")
124                        .and_then(|v| v.as_str())
125                        .map(|s| s.to_string()),
126                    version: json_data
127                        .get("version")
128                        .and_then(|v| v.as_str())
129                        .map(|s| s.to_string()),
130                    status: json_data
131                        .get("status")
132                        .and_then(|v| v.as_str())
133                        .map(|s| s.to_string()),
134                    kind: json_data
135                        .get("kind")
136                        .and_then(|v| v.as_str())
137                        .map(|s| s.to_string()),
138                    domain: json_data
139                        .get("domain")
140                        .and_then(|v| v.as_str())
141                        .map(|s| s.to_string()),
142                    data_product: json_data
143                        .get("dataProduct")
144                        .and_then(|v| v.as_str())
145                        .map(|s| s.to_string()),
146                    tenant: json_data
147                        .get("tenant")
148                        .and_then(|v| v.as_str())
149                        .map(|s| s.to_string()),
150                    description: json_data.get("description").cloned(),
151                    columns: table.columns.iter().map(column_to_column_data).collect(),
152                    servers: json_data
153                        .get("servers")
154                        .and_then(|v| v.as_array())
155                        .cloned()
156                        .unwrap_or_default(),
157                    team: json_data.get("team").cloned(),
158                    support: json_data.get("support").cloned(),
159                    roles: json_data
160                        .get("roles")
161                        .and_then(|v| v.as_array())
162                        .cloned()
163                        .unwrap_or_default(),
164                    sla_properties: json_data
165                        .get("slaProperties")
166                        .and_then(|v| v.as_array())
167                        .cloned()
168                        .unwrap_or_default(),
169                    quality: table.quality.clone(),
170                    price: json_data.get("price").cloned(),
171                    tags: table.tags.iter().map(|t| t.to_string()).collect(),
172                    custom_properties: json_data
173                        .get("customProperties")
174                        .and_then(|v| v.as_array())
175                        .cloned()
176                        .unwrap_or_default(),
177                    authoritative_definitions: json_data
178                        .get("authoritativeDefinitions")
179                        .and_then(|v| v.as_array())
180                        .cloned()
181                        .unwrap_or_default(),
182                    contract_created_ts: json_data
183                        .get("contractCreatedTs")
184                        .and_then(|v| v.as_str())
185                        .map(|s| s.to_string()),
186                    odcs_metadata: table.odcl_metadata.clone(),
187                }];
188                let sdk_errors: Vec<ImportError> = errors
189                    .iter()
190                    .map(|e| ImportError::ParseError(e.message.clone()))
191                    .collect();
192                Ok(ImportResult {
193                    tables: sdk_tables,
194                    tables_requiring_name: Vec::new(),
195                    errors: sdk_errors,
196                    ai_suggestions: None,
197                })
198            }
199            Err(e) => Err(ImportError::ParseError(e.to_string())),
200        }
201    }
202
203    /// Parse ODCS/ODCL YAML content and create Table (public method for native app use).
204    ///
205    /// This method returns the full Table object with all metadata, suitable for use in
206    /// native applications that need direct access to the parsed table structure.
207    /// For API use, prefer the `import()` method which returns ImportResult.
208    ///
209    /// # Returns
210    ///
211    /// Returns a tuple of (Table, list of errors/warnings).
212    /// Errors list is empty if parsing is successful.
213    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
214        self.parse(yaml_content)
215    }
216
217    /// Parse ODCS/ODCL YAML content and create Table (internal method).
218    ///
219    /// Supports ODCS v3.1.0/v3.0.x (primary), ODCL Data Contract spec (legacy, converted to ODCS),
220    /// simple ODCL (legacy, converted to ODCS), and Liquibase formats.
221    ///
222    /// # Returns
223    ///
224    /// Returns a tuple of (Table, list of errors/warnings).
225    /// Errors list is empty if parsing is successful.
226    fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
227        // Errors are collected in helper functions, not here
228        let _errors: Vec<ParserError> = Vec::new();
229
230        // Parse YAML
231        let data: serde_yaml::Value =
232            serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
233
234        if data.is_null() {
235            return Err(anyhow::anyhow!("Empty YAML content"));
236        }
237
238        // Store current YAML data for $ref resolution
239        self.current_yaml_data = Some(data.clone());
240
241        // Convert to JSON Value for easier manipulation
242        let json_data = yaml_to_json_value(&data)?;
243
244        // Check format and parse accordingly
245        if self.is_liquibase_format(&json_data) {
246            return self.parse_liquibase(&json_data);
247        }
248
249        if self.is_odcl_v3_format(&json_data) {
250            return self.parse_odcl_v3(&json_data);
251        }
252
253        if self.is_data_contract_format(&json_data) {
254            return self.parse_data_contract(&json_data);
255        }
256
257        // Fall back to simple ODCL format
258        self.parse_simple_odcl(&json_data)
259    }
260
261    /// Check if YAML is in Liquibase format.
262    fn is_liquibase_format(&self, data: &JsonValue) -> bool {
263        if data.get("databaseChangeLog").is_some() {
264            return true;
265        }
266        // Check for Liquibase-specific keys
267        if let Some(obj) = data.as_object() {
268            let obj_str = format!("{:?}", obj);
269            if obj_str.contains("changeSet") {
270                return true;
271            }
272        }
273        false
274    }
275
276    /// Check if YAML is in ODCS v3.0.x format.
277    fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
278        if let Some(obj) = data.as_object() {
279            let has_api_version = obj.contains_key("apiVersion");
280            let has_kind = obj
281                .get("kind")
282                .and_then(|v| v.as_str())
283                .map(|s| s == "DataContract")
284                .unwrap_or(false);
285            let has_id = obj.contains_key("id");
286            let has_version = obj.contains_key("version");
287            return has_api_version && has_kind && has_id && has_version;
288        }
289        false
290    }
291
292    /// Check if YAML is in Data Contract specification format.
293    fn is_data_contract_format(&self, data: &JsonValue) -> bool {
294        if let Some(obj) = data.as_object() {
295            let has_spec = obj.contains_key("dataContractSpecification");
296            let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
297            return has_spec && has_models;
298        }
299        false
300    }
301
302    /// Parse simple ODCL format.
303    fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
304        let mut errors = Vec::new();
305
306        // Extract table name
307        let name = data
308            .get("name")
309            .and_then(|v| v.as_str())
310            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
311            .to_string();
312
313        // Extract columns
314        let columns_data = data
315            .get("columns")
316            .and_then(|v| v.as_array())
317            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
318
319        let mut columns = Vec::new();
320        for (idx, col_data) in columns_data.iter().enumerate() {
321            match self.parse_column(col_data) {
322                Ok(col) => columns.push(col),
323                Err(e) => {
324                    errors.push(ParserError {
325                        error_type: "column_parse_error".to_string(),
326                        field: format!("columns[{}]", idx),
327                        message: e.to_string(),
328                    });
329                }
330            }
331        }
332
333        // Extract metadata
334        let database_type = self.extract_database_type(data);
335        let medallion_layers = self.extract_medallion_layers(data);
336        let scd_pattern = self.extract_scd_pattern(data);
337        let data_vault_classification = self.extract_data_vault_classification(data);
338        let quality_rules = self.extract_quality_rules(data);
339
340        // Validate pattern exclusivity
341        if scd_pattern.is_some() && data_vault_classification.is_some() {
342            errors.push(ParserError {
343                error_type: "validation_error".to_string(),
344                field: "patterns".to_string(),
345                message: "SCD pattern and Data Vault classification are mutually exclusive"
346                    .to_string(),
347            });
348        }
349
350        // Extract odcl_metadata
351        let mut odcl_metadata = HashMap::new();
352        if let Some(metadata) = data.get("odcl_metadata")
353            && let Some(obj) = metadata.as_object()
354        {
355            for (key, value) in obj {
356                odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
357            }
358        }
359
360        let table_uuid = self.extract_table_uuid(data);
361
362        let table = Table {
363            id: table_uuid,
364            name,
365            columns,
366            database_type,
367            catalog_name: None,
368            schema_name: None,
369            medallion_layers,
370            scd_pattern,
371            data_vault_classification,
372            modeling_level: None,
373            tags: Vec::<Tag>::new(),
374            odcl_metadata,
375            owner: None,
376            sla: None,
377            contact_details: None,
378            infrastructure_type: None,
379            notes: None,
380            position: None,
381            yaml_file_path: None,
382            drawio_cell_id: None,
383            quality: quality_rules,
384            errors: Vec::new(),
385            created_at: chrono::Utc::now(),
386            updated_at: chrono::Utc::now(),
387        };
388
389        info!("Parsed ODCL table: {}", table.name);
390        Ok((table, errors))
391    }
392
393    /// Parse a single column definition.
394    fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
395        let name = col_data
396            .get("name")
397            .and_then(|v| v.as_str())
398            .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
399            .to_string();
400
401        let data_type = col_data
402            .get("data_type")
403            .and_then(|v| v.as_str())
404            .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
405            .to_string();
406
407        // Normalize data_type to uppercase (preserve STRUCT<...> format)
408        let data_type = normalize_data_type(&data_type);
409
410        let nullable = col_data
411            .get("nullable")
412            .and_then(|v| v.as_bool())
413            .unwrap_or(true);
414
415        let primary_key = col_data
416            .get("primary_key")
417            .and_then(|v| v.as_bool())
418            .unwrap_or(false);
419
420        let foreign_key = col_data
421            .get("foreign_key")
422            .and_then(|v| self.parse_foreign_key(v));
423
424        let constraints = col_data
425            .get("constraints")
426            .and_then(|v| v.as_array())
427            .map(|arr| {
428                arr.iter()
429                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
430                    .collect()
431            })
432            .unwrap_or_default();
433
434        let description = col_data
435            .get("description")
436            .and_then(|v| v.as_str())
437            .map(|s| s.to_string())
438            .unwrap_or_default();
439
440        // Extract column-level quality rules
441        let mut column_quality_rules = Vec::new();
442        if let Some(quality_val) = col_data.get("quality") {
443            if let Some(arr) = quality_val.as_array() {
444                // Array of quality rules
445                for item in arr {
446                    if let Some(obj) = item.as_object() {
447                        let mut rule = HashMap::new();
448                        for (key, value) in obj {
449                            rule.insert(key.clone(), json_value_to_serde_value(value));
450                        }
451                        column_quality_rules.push(rule);
452                    }
453                }
454            } else if let Some(obj) = quality_val.as_object() {
455                // Single quality rule object
456                let mut rule = HashMap::new();
457                for (key, value) in obj {
458                    rule.insert(key.clone(), json_value_to_serde_value(value));
459                }
460                column_quality_rules.push(rule);
461            }
462        }
463
464        // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
465        // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
466        // The `nullable` field will be converted to `required` during export.
467
468        Ok(Column {
469            name,
470            data_type,
471            nullable,
472            primary_key,
473            foreign_key,
474            constraints,
475            description,
476            quality: column_quality_rules,
477            ..Default::default()
478        })
479    }
480
481    /// Parse foreign key from JSON value.
482    fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
483        let obj = fk_data.as_object()?;
484        Some(ForeignKey {
485            table_id: obj
486                .get("table_id")
487                .or_else(|| obj.get("table"))
488                .and_then(|v| v.as_str())
489                .unwrap_or("")
490                .to_string(),
491            column_name: obj
492                .get("column_name")
493                .or_else(|| obj.get("column"))
494                .and_then(|v| v.as_str())
495                .unwrap_or("")
496                .to_string(),
497        })
498    }
499
500    /// Extract database type from data.
501    fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
502        data.get("database_type")
503            .and_then(|v| v.as_str())
504            .and_then(|s| match s.to_uppercase().as_str() {
505                "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
506                "MYSQL" => Some(DatabaseType::Mysql),
507                "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
508                "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
509                "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
510                _ => None,
511            })
512    }
513
514    /// Extract medallion layers from data.
515    fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
516        let mut layers = Vec::new();
517
518        // Check plural form first
519        if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
520            for item in arr {
521                if let Some(s) = item.as_str()
522                    && let Ok(layer) = parse_medallion_layer(s)
523                {
524                    layers.push(layer);
525                }
526            }
527        }
528        // Check singular form (backward compatibility)
529        else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
530            && let Ok(layer) = parse_medallion_layer(s)
531        {
532            layers.push(layer);
533        }
534
535        layers
536    }
537
538    /// Extract SCD pattern from data.
539    fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
540        data.get("scd_pattern")
541            .and_then(|v| v.as_str())
542            .and_then(|s| parse_scd_pattern(s).ok())
543    }
544
545    /// Extract Data Vault classification from data.
546    fn extract_data_vault_classification(
547        &self,
548        data: &JsonValue,
549    ) -> Option<DataVaultClassification> {
550        data.get("data_vault_classification")
551            .and_then(|v| v.as_str())
552            .and_then(|s| parse_data_vault_classification(s).ok())
553    }
554
555    /// Extract quality rules from data.
556    fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
557        use serde_json::Value;
558        let mut quality_rules = Vec::new();
559
560        // Check for quality field at root level (array of objects or single object)
561        if let Some(quality_val) = data.get("quality") {
562            if let Some(arr) = quality_val.as_array() {
563                // Array of quality rules
564                for item in arr {
565                    if let Some(obj) = item.as_object() {
566                        let mut rule = HashMap::new();
567                        for (key, value) in obj {
568                            rule.insert(key.clone(), json_value_to_serde_value(value));
569                        }
570                        quality_rules.push(rule);
571                    }
572                }
573            } else if let Some(obj) = quality_val.as_object() {
574                // Single quality rule object
575                let mut rule = HashMap::new();
576                for (key, value) in obj {
577                    rule.insert(key.clone(), json_value_to_serde_value(value));
578                }
579                quality_rules.push(rule);
580            } else if let Some(s) = quality_val.as_str() {
581                // Simple string quality value
582                let mut rule = HashMap::new();
583                rule.insert("value".to_string(), Value::String(s.to_string()));
584                quality_rules.push(rule);
585            }
586        }
587
588        // Check for quality in metadata (ODCL v3 format)
589        if let Some(metadata) = data.get("metadata")
590            && let Some(metadata_obj) = metadata.as_object()
591            && let Some(quality_val) = metadata_obj.get("quality")
592        {
593            if let Some(arr) = quality_val.as_array() {
594                // Array of quality rules
595                for item in arr {
596                    if let Some(obj) = item.as_object() {
597                        let mut rule = HashMap::new();
598                        for (key, value) in obj {
599                            rule.insert(key.clone(), json_value_to_serde_value(value));
600                        }
601                        quality_rules.push(rule);
602                    }
603                }
604            } else if let Some(obj) = quality_val.as_object() {
605                // Single quality rule object
606                let mut rule = HashMap::new();
607                for (key, value) in obj {
608                    rule.insert(key.clone(), json_value_to_serde_value(value));
609                }
610                quality_rules.push(rule);
611            } else if let Some(s) = quality_val.as_str() {
612                // Simple string quality value
613                let mut rule = HashMap::new();
614                rule.insert("value".to_string(), Value::String(s.to_string()));
615                quality_rules.push(rule);
616            }
617        }
618
619        // Check for tblproperties field (similar to SQL TBLPROPERTIES)
620        if let Some(tblprops) = data.get("tblproperties")
621            && let Some(obj) = tblprops.as_object()
622        {
623            for (key, value) in obj {
624                let mut rule = HashMap::new();
625                rule.insert("property".to_string(), Value::String(key.clone()));
626                rule.insert("value".to_string(), json_value_to_serde_value(value));
627                quality_rules.push(rule);
628            }
629        }
630
631        quality_rules
632    }
633
634    /// Parse Liquibase YAML changelog format.
635    ///
636    /// Extracts the first `createTable` change from a Liquibase changelog.
637    /// Supports both direct column definitions and nested column structures.
638    ///
639    /// # Supported Format
640    ///
641    fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
642        // Supported Liquibase YAML changelog format:
643        // databaseChangeLog:
644        //   - changeSet:
645        //       - createTable:
646        //           tableName: my_table
647        //           columns:
648        //             - column:
649        //                 name: id
650        //                 type: int
651        //                 constraints:
652        //                   primaryKey: true
653        //                   nullable: false
654
655        let mut errors = Vec::new();
656
657        let changelog = data
658            .get("databaseChangeLog")
659            .and_then(|v| v.as_array())
660            .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
661
662        // Find first createTable change.
663        let mut table_name: Option<String> = None;
664        let mut columns: Vec<crate::models::column::Column> = Vec::new();
665
666        for entry in changelog {
667            // Entries are typically maps like { changeSet: [...] } or { changeSet: { changes: [...] } }
668            if let Some(change_set) = entry.get("changeSet") {
669                // Liquibase YAML can represent changeSet as map or list.
670                let changes = if let Some(obj) = change_set.as_object() {
671                    obj.get("changes")
672                        .and_then(|v| v.as_array())
673                        .cloned()
674                        .unwrap_or_default()
675                } else if let Some(arr) = change_set.as_array() {
676                    // Some variants encode changes directly as list entries inside changeSet.
677                    arr.clone()
678                } else {
679                    Vec::new()
680                };
681
682                for ch in changes {
683                    let create = ch.get("createTable").or_else(|| ch.get("create_table"));
684                    if let Some(create) = create {
685                        table_name = create
686                            .get("tableName")
687                            .or_else(|| create.get("table_name"))
688                            .and_then(|v| v.as_str())
689                            .map(|s| s.to_string());
690
691                        // columns: [ { column: { ... } }, ... ]
692                        if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
693                            for col_entry in cols {
694                                let col = col_entry.get("column").unwrap_or(col_entry);
695                                let name = col
696                                    .get("name")
697                                    .and_then(|v| v.as_str())
698                                    .unwrap_or("")
699                                    .to_string();
700                                let data_type = col
701                                    .get("type")
702                                    .and_then(|v| v.as_str())
703                                    .unwrap_or("")
704                                    .to_string();
705
706                                if name.is_empty() {
707                                    errors.push(ParserError {
708                                        error_type: "validation_error".to_string(),
709                                        field: "columns.name".to_string(),
710                                        message: "Liquibase createTable column missing name"
711                                            .to_string(),
712                                    });
713                                    continue;
714                                }
715
716                                let mut column =
717                                    crate::models::column::Column::new(name, data_type);
718
719                                if let Some(constraints) =
720                                    col.get("constraints").and_then(|v| v.as_object())
721                                {
722                                    if let Some(pk) =
723                                        constraints.get("primaryKey").and_then(|v| v.as_bool())
724                                    {
725                                        column.primary_key = pk;
726                                    }
727                                    if let Some(nullable) =
728                                        constraints.get("nullable").and_then(|v| v.as_bool())
729                                    {
730                                        column.nullable = nullable;
731                                    }
732                                }
733
734                                columns.push(column);
735                            }
736                        }
737
738                        // parse_table() returns a single Table, so we parse the first createTable.
739                        // If multiple tables are needed, call parse_table() multiple times or use import().
740                        break;
741                    }
742                }
743            }
744            if table_name.is_some() {
745                break;
746            }
747        }
748
749        let table_name = table_name
750            .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
751        let table = Table::new(table_name, columns);
752        // Preserve any errors collected.
753        Ok((table, errors))
754    }
755
756    /// Parse ODCS v3.0.x format.
757    fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
758        let mut errors = Vec::new();
759
760        // Extract table name
761        let table_name = data
762            .get("name")
763            .and_then(|v| v.as_str())
764            .map(|s| s.to_string())
765            .or_else(|| {
766                // Try to get from schema array
767                data.get("schema")
768                    .and_then(|v| v.as_array())
769                    .and_then(|arr| arr.first())
770                    .and_then(|obj| obj.as_object())
771                    .and_then(|obj| obj.get("name"))
772                    .and_then(|v| v.as_str())
773                    .map(|s| s.to_string())
774            })
775            .ok_or_else(|| {
776                anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
777            })?;
778
779        // Extract schema - ODCS v3.0.x uses array of SchemaObject
780        let schema = data
781            .get("schema")
782            .and_then(|v| v.as_array())
783            .ok_or_else(|| {
784                errors.push(ParserError {
785                    error_type: "validation_error".to_string(),
786                    field: "schema".to_string(),
787                    message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
788                });
789                anyhow::anyhow!("Missing schema")
790            });
791
792        let schema = match schema {
793            Ok(s) if s.is_empty() => {
794                errors.push(ParserError {
795                    error_type: "validation_error".to_string(),
796                    field: "schema".to_string(),
797                    message: "ODCS v3.0.x schema array is empty".to_string(),
798                });
799                let quality_rules = self.extract_quality_rules(data);
800                let table_uuid = self.extract_table_uuid(data);
801                let table = Table {
802                    id: table_uuid,
803                    name: table_name,
804                    columns: Vec::new(),
805                    database_type: None,
806                    catalog_name: None,
807                    schema_name: None,
808                    medallion_layers: Vec::new(),
809                    scd_pattern: None,
810                    data_vault_classification: None,
811                    modeling_level: None,
812                    tags: Vec::<Tag>::new(),
813                    odcl_metadata: HashMap::new(),
814                    owner: None,
815                    sla: None,
816                    contact_details: None,
817                    infrastructure_type: None,
818                    notes: None,
819                    position: None,
820                    yaml_file_path: None,
821                    drawio_cell_id: None,
822                    quality: quality_rules,
823                    errors: Vec::new(),
824                    created_at: chrono::Utc::now(),
825                    updated_at: chrono::Utc::now(),
826                };
827                return Ok((table, errors));
828            }
829            Ok(s) => s,
830            Err(_) => {
831                let quality_rules = self.extract_quality_rules(data);
832                let table_uuid = self.extract_table_uuid(data);
833                let table = Table {
834                    id: table_uuid,
835                    name: table_name,
836                    columns: Vec::new(),
837                    database_type: None,
838                    catalog_name: None,
839                    schema_name: None,
840                    medallion_layers: Vec::new(),
841                    scd_pattern: None,
842                    data_vault_classification: None,
843                    modeling_level: None,
844                    tags: Vec::<Tag>::new(),
845                    odcl_metadata: HashMap::new(),
846                    owner: None,
847                    sla: None,
848                    contact_details: None,
849                    infrastructure_type: None,
850                    notes: None,
851                    position: None,
852                    yaml_file_path: None,
853                    drawio_cell_id: None,
854                    quality: quality_rules,
855                    errors: Vec::new(),
856                    created_at: chrono::Utc::now(),
857                    updated_at: chrono::Utc::now(),
858                };
859                return Ok((table, errors));
860            }
861        };
862
863        // Get the first schema object (table)
864        let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
865            errors.push(ParserError {
866                error_type: "validation_error".to_string(),
867                field: "schema[0]".to_string(),
868                message: "First schema object must be a dictionary".to_string(),
869            });
870            anyhow::anyhow!("Invalid schema object")
871        })?;
872
873        let object_name = schema_object
874            .get("name")
875            .and_then(|v| v.as_str())
876            .unwrap_or(&table_name);
877
878        // Handle both object format (v3.0.x) and array format (v3.1.0) for properties
879        let mut columns = Vec::new();
880
881        if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
882            // Object format (v3.0.x): properties is a map of name -> property object
883            for (prop_name, prop_data) in properties_obj {
884                if let Some(prop_obj) = prop_data.as_object() {
885                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
886                        Ok(mut cols) => columns.append(&mut cols),
887                        Err(e) => {
888                            errors.push(ParserError {
889                                error_type: "property_parse_error".to_string(),
890                                field: format!("Property '{}'", prop_name),
891                                message: e.to_string(),
892                            });
893                        }
894                    }
895                } else {
896                    errors.push(ParserError {
897                        error_type: "validation_error".to_string(),
898                        field: format!("Property '{}'", prop_name),
899                        message: format!("Property '{}' must be an object", prop_name),
900                    });
901                }
902            }
903        } else if let Some(properties_arr) =
904            schema_object.get("properties").and_then(|v| v.as_array())
905        {
906            // Array format (v3.1.0): properties is an array of property objects with 'name' field
907            for (idx, prop_data) in properties_arr.iter().enumerate() {
908                if let Some(prop_obj) = prop_data.as_object() {
909                    // Extract name from property object (required in v3.1.0)
910                    // ODCS v3.1.0 requires 'name' field, but we'll also accept 'id' as fallback
911                    let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
912                        Some(JsonValue::String(s)) => s.as_str(),
913                        _ => {
914                            // Skip properties without name or id (not valid ODCS v3.1.0)
915                            errors.push(ParserError {
916                                error_type: "validation_error".to_string(),
917                                field: format!("Property[{}]", idx),
918                                message: format!(
919                                    "Property[{}] missing required 'name' or 'id' field",
920                                    idx
921                                ),
922                            });
923                            continue;
924                        }
925                    };
926
927                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
928                        Ok(mut cols) => columns.append(&mut cols),
929                        Err(e) => {
930                            errors.push(ParserError {
931                                error_type: "property_parse_error".to_string(),
932                                field: format!("Property[{}] '{}'", idx, prop_name),
933                                message: e.to_string(),
934                            });
935                        }
936                    }
937                } else {
938                    errors.push(ParserError {
939                        error_type: "validation_error".to_string(),
940                        field: format!("Property[{}]", idx),
941                        message: format!("Property[{}] must be an object", idx),
942                    });
943                }
944            }
945        } else {
946            errors.push(ParserError {
947                error_type: "validation_error".to_string(),
948                field: format!("Object '{}'", object_name),
949                message: format!(
950                    "Object '{}' missing 'properties' field or properties is invalid",
951                    object_name
952                ),
953            });
954        }
955
956        // Extract metadata from customProperties
957        let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
958            _,
959            _,
960            _,
961            Vec<Tag>,
962        ) = self.extract_metadata_from_custom_properties(data);
963
964        // Extract sharedDomains from customProperties
965        let mut shared_domains: Vec<String> = Vec::new();
966        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
967            for prop in custom_props {
968                if let Some(prop_obj) = prop.as_object() {
969                    let prop_key = prop_obj
970                        .get("property")
971                        .and_then(|v| v.as_str())
972                        .unwrap_or("");
973                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
974                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
975                    {
976                        for item in arr {
977                            if let Some(s) = item.as_str() {
978                                shared_domains.push(s.to_string());
979                            }
980                        }
981                    }
982                }
983            }
984        }
985
986        // Extract tags from top-level tags field (if not already extracted from customProperties)
987        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
988            for item in tags_arr {
989                if let Some(s) = item.as_str() {
990                    // Parse tag string to Tag enum
991                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
992                    if !tags.contains(&tag) {
993                        tags.push(tag);
994                    }
995                }
996            }
997        }
998
999        // Extract database type from servers
1000        let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1001
1002        // Extract quality rules
1003        let quality_rules = self.extract_quality_rules(data);
1004
1005        // Build ODCL metadata
1006        let mut odcl_metadata = HashMap::new();
1007        odcl_metadata.insert(
1008            "apiVersion".to_string(),
1009            json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
1010        );
1011        odcl_metadata.insert(
1012            "kind".to_string(),
1013            json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
1014        );
1015        odcl_metadata.insert(
1016            "id".to_string(),
1017            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1018        );
1019        odcl_metadata.insert(
1020            "version".to_string(),
1021            json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
1022        );
1023        odcl_metadata.insert(
1024            "status".to_string(),
1025            json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
1026        );
1027
1028        // Extract servicelevels if present
1029        if let Some(servicelevels_val) = data.get("servicelevels") {
1030            odcl_metadata.insert(
1031                "servicelevels".to_string(),
1032                json_value_to_serde_value(servicelevels_val),
1033            );
1034        }
1035
1036        // Extract links if present
1037        if let Some(links_val) = data.get("links") {
1038            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1039        }
1040
1041        // Extract domain, dataProduct, tenant
1042        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1043            odcl_metadata.insert(
1044                "domain".to_string(),
1045                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1046            );
1047        }
1048        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1049            odcl_metadata.insert(
1050                "dataProduct".to_string(),
1051                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1052            );
1053        }
1054        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1055            odcl_metadata.insert(
1056                "tenant".to_string(),
1057                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1058            );
1059        }
1060
1061        // Extract top-level description (can be object or string)
1062        if let Some(desc_val) = data.get("description") {
1063            odcl_metadata.insert(
1064                "description".to_string(),
1065                json_value_to_serde_value(desc_val),
1066            );
1067        }
1068
1069        // Extract pricing
1070        if let Some(pricing_val) = data.get("pricing") {
1071            odcl_metadata.insert(
1072                "pricing".to_string(),
1073                json_value_to_serde_value(pricing_val),
1074            );
1075        }
1076
1077        // Extract team
1078        if let Some(team_val) = data.get("team") {
1079            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1080        }
1081
1082        // Extract roles
1083        if let Some(roles_val) = data.get("roles") {
1084            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1085        }
1086
1087        // Extract terms
1088        if let Some(terms_val) = data.get("terms") {
1089            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1090        }
1091
1092        // Extract full servers array (not just type)
1093        if let Some(servers_val) = data.get("servers") {
1094            odcl_metadata.insert(
1095                "servers".to_string(),
1096                json_value_to_serde_value(servers_val),
1097            );
1098        }
1099
1100        // Extract infrastructure
1101        if let Some(infrastructure_val) = data.get("infrastructure") {
1102            odcl_metadata.insert(
1103                "infrastructure".to_string(),
1104                json_value_to_serde_value(infrastructure_val),
1105            );
1106        }
1107
1108        // Store sharedDomains in metadata (extracted from customProperties above)
1109        if !shared_domains.is_empty() {
1110            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1111                .iter()
1112                .map(|d| serde_json::Value::String(d.clone()))
1113                .collect();
1114            odcl_metadata.insert(
1115                "sharedDomains".to_string(),
1116                serde_json::Value::Array(shared_domains_json),
1117            );
1118        }
1119
1120        let table_uuid = self.extract_table_uuid(data);
1121
1122        let table = Table {
1123            id: table_uuid,
1124            name: table_name,
1125            columns,
1126            database_type,
1127            catalog_name: None,
1128            schema_name: None,
1129            medallion_layers,
1130            scd_pattern,
1131            data_vault_classification,
1132            modeling_level: None,
1133            tags,
1134            odcl_metadata,
1135            owner: None,
1136            sla: None,
1137            contact_details: None,
1138            infrastructure_type: None,
1139            notes: None,
1140            position: None,
1141            yaml_file_path: None,
1142            drawio_cell_id: None,
1143            quality: quality_rules,
1144            errors: Vec::new(),
1145            created_at: chrono::Utc::now(),
1146            updated_at: chrono::Utc::now(),
1147        };
1148
1149        info!(
1150            "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1151            table.name,
1152            errors.len()
1153        );
1154        Ok((table, errors))
1155    }
1156
1157    /// Parse a single property from ODCS v3.0.x format (similar to Data Contract field).
1158    fn parse_odcl_v3_property(
1159        &self,
1160        prop_name: &str,
1161        prop_data: &serde_json::Map<String, JsonValue>,
1162        data: &JsonValue,
1163    ) -> Result<Vec<Column>> {
1164        // Reuse Data Contract field parsing logic (they're similar)
1165        let mut errors = Vec::new();
1166        self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1167    }
1168
1169    /// Extract table UUID from ODCS `id` field (standard) or fallback to customProperties/odcl_metadata (backward compatibility).
1170    /// Returns the UUID if found, otherwise generates a new one.
1171    fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1172        // First check the top-level `id` field (ODCS spec: "A unique identifier used to reduce the risk of dataset name collisions, such as a UUID.")
1173        if let Some(id_val) = data.get("id")
1174            && let Some(id_str) = id_val.as_str()
1175        {
1176            if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1177                tracing::debug!(
1178                    "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1179                    uuid
1180                );
1181                return uuid;
1182            } else {
1183                tracing::warn!(
1184                    "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1185                    id_str
1186                );
1187            }
1188        }
1189
1190        // Backward compatibility: check customProperties for tableUuid (legacy format)
1191        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1192            for prop in custom_props {
1193                if let Some(prop_obj) = prop.as_object() {
1194                    let prop_key = prop_obj
1195                        .get("property")
1196                        .and_then(|v| v.as_str())
1197                        .unwrap_or("");
1198                    if prop_key == "tableUuid"
1199                        && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1200                        && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1201                    {
1202                        tracing::debug!(
1203                            "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1204                            uuid
1205                        );
1206                        return uuid;
1207                    }
1208                }
1209            }
1210        }
1211
1212        // Fallback: check odcl_metadata if present (legacy format)
1213        if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1214            && let Some(uuid_val) = metadata.get("tableUuid")
1215            && let Some(uuid_str) = uuid_val.as_str()
1216            && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1217        {
1218            tracing::debug!(
1219                "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1220                uuid
1221            );
1222            return uuid;
1223        }
1224
1225        // Generate deterministic UUID v5 if not found (based on table name)
1226        let table_name = data
1227            .get("name")
1228            .and_then(|v| v.as_str())
1229            .unwrap_or("unknown");
1230        let new_uuid = crate::models::table::Table::generate_id(
1231            table_name, None, // database_type not available here
1232            None, // catalog_name not available here
1233            None, // schema_name not available here
1234        );
1235        tracing::warn!(
1236            "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1237            table_name,
1238            new_uuid
1239        );
1240        new_uuid
1241    }
1242
1243    /// Extract metadata from customProperties in ODCS v3.0.x format.
1244    fn extract_metadata_from_custom_properties(
1245        &self,
1246        data: &JsonValue,
1247    ) -> (
1248        Vec<MedallionLayer>,
1249        Option<SCDPattern>,
1250        Option<DataVaultClassification>,
1251        Vec<Tag>,
1252    ) {
1253        let mut medallion_layers = Vec::new();
1254        let mut scd_pattern = None;
1255        let mut data_vault_classification = None;
1256        let mut tags: Vec<Tag> = Vec::new();
1257
1258        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1259            for prop in custom_props {
1260                if let Some(prop_obj) = prop.as_object() {
1261                    let prop_key = prop_obj
1262                        .get("property")
1263                        .and_then(|v| v.as_str())
1264                        .unwrap_or("");
1265                    let prop_value = prop_obj.get("value");
1266
1267                    match prop_key {
1268                        "medallionLayers" | "medallion_layers" => {
1269                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1270                                for item in arr {
1271                                    if let Some(s) = item.as_str()
1272                                        && let Ok(layer) = parse_medallion_layer(s)
1273                                    {
1274                                        medallion_layers.push(layer);
1275                                    }
1276                                }
1277                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1278                                // Comma-separated string
1279                                for part in s.split(',') {
1280                                    if let Ok(layer) = parse_medallion_layer(part.trim()) {
1281                                        medallion_layers.push(layer);
1282                                    }
1283                                }
1284                            }
1285                        }
1286                        "scdPattern" | "scd_pattern" => {
1287                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1288                                scd_pattern = parse_scd_pattern(s).ok();
1289                            }
1290                        }
1291                        "dataVaultClassification" | "data_vault_classification" => {
1292                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1293                                data_vault_classification = parse_data_vault_classification(s).ok();
1294                            }
1295                        }
1296                        "tags" => {
1297                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1298                                for item in arr {
1299                                    if let Some(s) = item.as_str() {
1300                                        // Parse tag string to Tag enum
1301                                        if let Ok(tag) = Tag::from_str(s) {
1302                                            tags.push(tag);
1303                                        } else {
1304                                            tags.push(Tag::Simple(s.to_string()));
1305                                        }
1306                                    }
1307                                }
1308                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1309                                // Comma-separated string
1310                                for part in s.split(',') {
1311                                    let part = part.trim();
1312                                    if let Ok(tag) = Tag::from_str(part) {
1313                                        tags.push(tag);
1314                                    } else {
1315                                        tags.push(Tag::Simple(part.to_string()));
1316                                    }
1317                                }
1318                            }
1319                        }
1320                        "sharedDomains" | "shared_domains" => {
1321                            // sharedDomains will be stored in metadata by the caller
1322                            // This match is here for completeness but sharedDomains is handled separately
1323                        }
1324                        _ => {}
1325                    }
1326                }
1327            }
1328        }
1329
1330        // Also extract tags from top-level tags field
1331        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1332            for item in tags_arr {
1333                if let Some(s) = item.as_str() {
1334                    // Parse tag string to Tag enum
1335                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1336                    if !tags.contains(&tag) {
1337                        tags.push(tag);
1338                    }
1339                }
1340            }
1341        }
1342
1343        (
1344            medallion_layers,
1345            scd_pattern,
1346            data_vault_classification,
1347            tags,
1348        )
1349    }
1350
1351    /// Extract database type from servers in ODCS v3.0.x format.
1352    fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1353        // ODCS v3.0.x: servers is an array of Server objects
1354        if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1355            && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1356        {
1357            return server_obj
1358                .get("type")
1359                .and_then(|v| v.as_str())
1360                .and_then(|s| self.parse_database_type(s));
1361        }
1362        None
1363    }
1364
1365    /// Parse Data Contract format.
1366    fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1367        let mut errors = Vec::new();
1368
1369        // Extract models
1370        let models = data
1371            .get("models")
1372            .and_then(|v| v.as_object())
1373            .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1374
1375        // parse_table() returns a single Table, so we parse the first model.
1376        // If multiple models are needed, call parse_table() multiple times or use import().
1377        let (model_name, model_data) = models
1378            .iter()
1379            .next()
1380            .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1381
1382        let model_data = model_data
1383            .as_object()
1384            .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1385
1386        // Extract fields (columns)
1387        let fields = model_data
1388            .get("fields")
1389            .and_then(|v| v.as_object())
1390            .ok_or_else(|| {
1391                errors.push(ParserError {
1392                    error_type: "validation_error".to_string(),
1393                    field: format!("Model '{}'", model_name),
1394                    message: format!("Model '{}' missing 'fields' field", model_name),
1395                });
1396                anyhow::anyhow!("Missing fields")
1397            });
1398
1399        let fields = match fields {
1400            Ok(f) => f,
1401            Err(_) => {
1402                // Return empty table with errors
1403                let quality_rules = self.extract_quality_rules(data);
1404                let table_uuid = self.extract_table_uuid(data);
1405                let table = Table {
1406                    id: table_uuid,
1407                    name: model_name.clone(),
1408                    columns: Vec::new(),
1409                    database_type: None,
1410                    catalog_name: None,
1411                    schema_name: None,
1412                    medallion_layers: Vec::new(),
1413                    scd_pattern: None,
1414                    data_vault_classification: None,
1415                    modeling_level: None,
1416                    tags: Vec::<Tag>::new(),
1417                    odcl_metadata: HashMap::new(),
1418                    owner: None,
1419                    sla: None,
1420                    contact_details: None,
1421                    infrastructure_type: None,
1422                    notes: None,
1423                    position: None,
1424                    yaml_file_path: None,
1425                    drawio_cell_id: None,
1426                    quality: quality_rules,
1427                    errors: Vec::new(),
1428                    created_at: chrono::Utc::now(),
1429                    updated_at: chrono::Utc::now(),
1430                };
1431                return Ok((table, errors));
1432            }
1433        };
1434
1435        // Parse fields as columns
1436        let mut columns = Vec::new();
1437        for (field_name, field_data) in fields {
1438            if let Some(field_obj) = field_data.as_object() {
1439                match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1440                    Ok(mut cols) => columns.append(&mut cols),
1441                    Err(e) => {
1442                        errors.push(ParserError {
1443                            error_type: "field_parse_error".to_string(),
1444                            field: format!("Field '{}'", field_name),
1445                            message: e.to_string(),
1446                        });
1447                    }
1448                }
1449            } else {
1450                errors.push(ParserError {
1451                    error_type: "validation_error".to_string(),
1452                    field: format!("Field '{}'", field_name),
1453                    message: format!("Field '{}' must be an object", field_name),
1454                });
1455            }
1456        }
1457
1458        // Extract metadata from info section
1459        // The frontend expects info fields to be nested under "info" key
1460        let mut odcl_metadata = HashMap::new();
1461
1462        // Extract info section and nest it properly
1463        // Convert JsonValue object to serde_json::Value::Object (Map)
1464        if let Some(info_val) = data.get("info") {
1465            // Convert JsonValue to serde_json::Value
1466            let info_json_value = json_value_to_serde_value(info_val);
1467            odcl_metadata.insert("info".to_string(), info_json_value);
1468        }
1469
1470        odcl_metadata.insert(
1471            "dataContractSpecification".to_string(),
1472            json_value_to_serde_value(
1473                data.get("dataContractSpecification")
1474                    .unwrap_or(&JsonValue::Null),
1475            ),
1476        );
1477        odcl_metadata.insert(
1478            "id".to_string(),
1479            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1480        );
1481        // Note: model_name is not added to metadata as it's redundant (it's the table name)
1482
1483        // Extract servicelevels if present
1484        if let Some(servicelevels_val) = data.get("servicelevels") {
1485            odcl_metadata.insert(
1486                "servicelevels".to_string(),
1487                json_value_to_serde_value(servicelevels_val),
1488            );
1489        }
1490
1491        // Extract links if present
1492        if let Some(links_val) = data.get("links") {
1493            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1494        }
1495
1496        // Extract domain, dataProduct, tenant
1497        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1498            odcl_metadata.insert(
1499                "domain".to_string(),
1500                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1501            );
1502        }
1503        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1504            odcl_metadata.insert(
1505                "dataProduct".to_string(),
1506                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1507            );
1508        }
1509        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1510            odcl_metadata.insert(
1511                "tenant".to_string(),
1512                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1513            );
1514        }
1515
1516        // Extract top-level description (can be object or string)
1517        if let Some(desc_val) = data.get("description") {
1518            odcl_metadata.insert(
1519                "description".to_string(),
1520                json_value_to_serde_value(desc_val),
1521            );
1522        }
1523
1524        // Extract pricing
1525        if let Some(pricing_val) = data.get("pricing") {
1526            odcl_metadata.insert(
1527                "pricing".to_string(),
1528                json_value_to_serde_value(pricing_val),
1529            );
1530        }
1531
1532        // Extract team
1533        if let Some(team_val) = data.get("team") {
1534            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1535        }
1536
1537        // Extract roles
1538        if let Some(roles_val) = data.get("roles") {
1539            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1540        }
1541
1542        // Extract terms
1543        if let Some(terms_val) = data.get("terms") {
1544            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1545        }
1546
1547        // Extract full servers array (not just type)
1548        if let Some(servers_val) = data.get("servers") {
1549            odcl_metadata.insert(
1550                "servers".to_string(),
1551                json_value_to_serde_value(servers_val),
1552            );
1553        }
1554
1555        // Extract infrastructure
1556        if let Some(infrastructure_val) = data.get("infrastructure") {
1557            odcl_metadata.insert(
1558                "infrastructure".to_string(),
1559                json_value_to_serde_value(infrastructure_val),
1560            );
1561        }
1562
1563        // Extract database type from servers if available
1564        let database_type = self.extract_database_type_from_servers(data);
1565
1566        // Extract catalog and schema from customProperties
1567        let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1568
1569        // Extract sharedDomains from customProperties
1570        let mut shared_domains: Vec<String> = Vec::new();
1571        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1572            for prop in custom_props {
1573                if let Some(prop_obj) = prop.as_object() {
1574                    let prop_key = prop_obj
1575                        .get("property")
1576                        .and_then(|v| v.as_str())
1577                        .unwrap_or("");
1578                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1579                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1580                    {
1581                        for item in arr {
1582                            if let Some(s) = item.as_str() {
1583                                shared_domains.push(s.to_string());
1584                            }
1585                        }
1586                    }
1587                }
1588            }
1589        }
1590
1591        // Extract tags from top-level tags field (Data Contract format)
1592        let mut tags: Vec<Tag> = Vec::new();
1593        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1594            for item in tags_arr {
1595                if let Some(s) = item.as_str() {
1596                    // Parse tag string to Tag enum (supports Simple, Pair, List formats)
1597                    if let Ok(tag) = Tag::from_str(s) {
1598                        tags.push(tag);
1599                    } else {
1600                        // Fallback: create Simple tag if parsing fails
1601                        tags.push(crate::models::Tag::Simple(s.to_string()));
1602                    }
1603                }
1604            }
1605        }
1606
1607        // Extract quality rules
1608        let quality_rules = self.extract_quality_rules(data);
1609
1610        // Store sharedDomains in metadata
1611        if !shared_domains.is_empty() {
1612            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1613                .iter()
1614                .map(|d| serde_json::Value::String(d.clone()))
1615                .collect();
1616            odcl_metadata.insert(
1617                "sharedDomains".to_string(),
1618                serde_json::Value::Array(shared_domains_json),
1619            );
1620        }
1621
1622        let table_uuid = self.extract_table_uuid(data);
1623
1624        let table = Table {
1625            id: table_uuid,
1626            name: model_name.clone(),
1627            columns,
1628            database_type,
1629            catalog_name,
1630            schema_name,
1631            medallion_layers: Vec::new(),
1632            scd_pattern: None,
1633            data_vault_classification: None,
1634            modeling_level: None,
1635            tags,
1636            odcl_metadata,
1637            owner: None,
1638            sla: None,
1639            contact_details: None,
1640            infrastructure_type: None,
1641            notes: None,
1642            position: None,
1643            yaml_file_path: None,
1644            drawio_cell_id: None,
1645            quality: quality_rules,
1646            errors: Vec::new(),
1647            created_at: chrono::Utc::now(),
1648            updated_at: chrono::Utc::now(),
1649        };
1650
1651        info!(
1652            "Parsed Data Contract table: {} with {} warnings/errors",
1653            model_name,
1654            errors.len()
1655        );
1656        Ok((table, errors))
1657    }
1658
1659    /// Parse a single field from Data Contract format.
1660    fn parse_data_contract_field(
1661        &self,
1662        field_name: &str,
1663        field_data: &serde_json::Map<String, JsonValue>,
1664        data: &JsonValue,
1665        errors: &mut Vec<ParserError>,
1666    ) -> Result<Vec<Column>> {
1667        let mut columns = Vec::new();
1668
1669        // Helper function to extract quality rules from a JSON object
1670        let extract_quality_from_obj =
1671            |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1672                let mut quality_rules = Vec::new();
1673                if let Some(quality_val) = obj.get("quality") {
1674                    if let Some(arr) = quality_val.as_array() {
1675                        // Array of quality rules
1676                        for item in arr {
1677                            if let Some(rule_obj) = item.as_object() {
1678                                let mut rule = HashMap::new();
1679                                for (key, value) in rule_obj {
1680                                    rule.insert(key.clone(), json_value_to_serde_value(value));
1681                                }
1682                                quality_rules.push(rule);
1683                            }
1684                        }
1685                    } else if let Some(rule_obj) = quality_val.as_object() {
1686                        // Single quality rule object
1687                        let mut rule = HashMap::new();
1688                        for (key, value) in rule_obj {
1689                            rule.insert(key.clone(), json_value_to_serde_value(value));
1690                        }
1691                        quality_rules.push(rule);
1692                    }
1693                }
1694                quality_rules
1695            };
1696
1697        // Extract description from field_data (preserve empty strings)
1698        let description = field_data
1699            .get("description")
1700            .and_then(|v| v.as_str())
1701            .unwrap_or("")
1702            .to_string();
1703
1704        // Extract quality rules from field_data
1705        let mut quality_rules = extract_quality_from_obj(field_data);
1706
1707        // Check for $ref
1708        if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1709            // Store ref_path (preserve even if definition doesn't exist)
1710            let ref_path = Some(ref_str.to_string());
1711
1712            if let Some(definition) = resolve_ref(ref_str, data) {
1713                // Merge quality rules from definition if field doesn't have any
1714
1715                // Also extract quality rules from definition and merge (if field doesn't have any)
1716                if quality_rules.is_empty() {
1717                    if let Some(def_obj) = definition.as_object() {
1718                        quality_rules = extract_quality_from_obj(def_obj);
1719                    }
1720                } else {
1721                    // Merge definition quality rules if field has some
1722                    if let Some(def_obj) = definition.as_object() {
1723                        let def_quality = extract_quality_from_obj(def_obj);
1724                        // Append definition quality rules (field-level takes precedence)
1725                        quality_rules.extend(def_quality);
1726                    }
1727                }
1728
1729                let required = field_data
1730                    .get("required")
1731                    .and_then(|v| v.as_bool())
1732                    .unwrap_or(false);
1733
1734                // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
1735                // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
1736                // The `required` field will be set based on the `required` parameter during column creation.
1737
1738                // Check if definition is an object/struct with nested structure
1739                let has_nested = definition
1740                    .get("type")
1741                    .and_then(|v| v.as_str())
1742                    .map(|s| s == "object")
1743                    .unwrap_or(false)
1744                    || definition.get("properties").is_some()
1745                    || definition.get("fields").is_some();
1746
1747                if has_nested {
1748                    // Expand STRUCT from definition into nested columns with dot notation
1749                    if let Some(properties) =
1750                        definition.get("properties").and_then(|v| v.as_object())
1751                    {
1752                        // Recursively expand nested properties
1753                        let nested_required: Vec<String> = definition
1754                            .get("required")
1755                            .and_then(|v| v.as_array())
1756                            .map(|arr| {
1757                                arr.iter()
1758                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1759                                    .collect()
1760                            })
1761                            .unwrap_or_default();
1762
1763                        for (nested_name, nested_schema) in properties {
1764                            let nested_required_field = nested_required.contains(nested_name);
1765                            expand_nested_column(
1766                                &format!("{}.{}", field_name, nested_name),
1767                                nested_schema,
1768                                !nested_required_field,
1769                                &mut columns,
1770                                errors,
1771                            );
1772                        }
1773                    } else if let Some(fields) =
1774                        definition.get("fields").and_then(|v| v.as_object())
1775                    {
1776                        // Handle fields format (ODCL style)
1777                        for (nested_name, nested_schema) in fields {
1778                            expand_nested_column(
1779                                &format!("{}.{}", field_name, nested_name),
1780                                nested_schema,
1781                                true, // Assume nullable if not specified
1782                                &mut columns,
1783                                errors,
1784                            );
1785                        }
1786                    } else {
1787                        // Fallback: create parent column as OBJECT if we can't expand
1788                        let def_physical_type = definition
1789                            .get("physicalType")
1790                            .and_then(|v| v.as_str())
1791                            .map(|s| s.to_string());
1792                        columns.push(Column {
1793                            name: field_name.to_string(),
1794                            data_type: "OBJECT".to_string(),
1795                            physical_type: def_physical_type,
1796                            nullable: !required,
1797                            description: if description.is_empty() {
1798                                definition
1799                                    .get("description")
1800                                    .and_then(|v| v.as_str())
1801                                    .unwrap_or("")
1802                                    .to_string()
1803                            } else {
1804                                description.clone()
1805                            },
1806                            quality: quality_rules.clone(),
1807                            relationships: ref_to_relationships(&ref_path),
1808                            ..Default::default()
1809                        });
1810                    }
1811                } else {
1812                    // Simple type from definition
1813                    let def_type = definition
1814                        .get("type")
1815                        .and_then(|v| v.as_str())
1816                        .unwrap_or("STRING")
1817                        .to_uppercase();
1818
1819                    let enum_values = definition
1820                        .get("enum")
1821                        .and_then(|v| v.as_array())
1822                        .map(|arr| {
1823                            arr.iter()
1824                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1825                                .collect()
1826                        })
1827                        .unwrap_or_default();
1828
1829                    let def_physical_type = definition
1830                        .get("physicalType")
1831                        .and_then(|v| v.as_str())
1832                        .map(|s| s.to_string());
1833
1834                    columns.push(Column {
1835                        name: field_name.to_string(),
1836                        data_type: def_type,
1837                        physical_type: def_physical_type,
1838                        nullable: !required,
1839                        description: if description.is_empty() {
1840                            definition
1841                                .get("description")
1842                                .and_then(|v| v.as_str())
1843                                .unwrap_or("")
1844                                .to_string()
1845                        } else {
1846                            description
1847                        },
1848                        quality: quality_rules,
1849                        relationships: ref_to_relationships(&ref_path),
1850                        enum_values,
1851                        ..Default::default()
1852                    });
1853                }
1854                return Ok(columns);
1855            } else {
1856                // Undefined reference - create column with error
1857                let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
1858                let mut error_map = HashMap::new();
1859                error_map.insert("type".to_string(), serde_json::json!("validation_error"));
1860                error_map.insert("field".to_string(), serde_json::json!("data_type"));
1861                error_map.insert(
1862                    "message".to_string(),
1863                    serde_json::json!(format!(
1864                        "Field '{}' references undefined definition: {}",
1865                        field_name, ref_str
1866                    )),
1867                );
1868                col_errors.push(error_map);
1869                let field_physical_type = field_data
1870                    .get("physicalType")
1871                    .and_then(|v| v.as_str())
1872                    .map(|s| s.to_string());
1873                columns.push(Column {
1874                    name: field_name.to_string(),
1875                    data_type: "OBJECT".to_string(),
1876                    physical_type: field_physical_type,
1877                    description,
1878                    errors: col_errors,
1879                    relationships: ref_to_relationships(&Some(ref_str.to_string())),
1880                    ..Default::default()
1881                });
1882                return Ok(columns);
1883            }
1884        }
1885
1886        // Extract field type - check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
1887        // Default to STRING if missing
1888        let field_type_str = field_data
1889            .get("logicalType")
1890            .and_then(|v| v.as_str())
1891            .or_else(|| field_data.get("type").and_then(|v| v.as_str()))
1892            .unwrap_or("STRING");
1893
1894        // Check if type contains STRUCT definition (multiline STRUCT type)
1895        if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
1896            match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
1897                Ok(nested_cols) if !nested_cols.is_empty() => {
1898                    // We have nested columns - add parent column with full type, then nested columns
1899                    let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
1900                        "ARRAY<STRUCT<...>>".to_string()
1901                    } else {
1902                        "STRUCT<...>".to_string()
1903                    };
1904
1905                    let struct_physical_type = field_data
1906                        .get("physicalType")
1907                        .and_then(|v| v.as_str())
1908                        .map(|s| s.to_string());
1909
1910                    // Add parent column
1911                    columns.push(Column {
1912                        name: field_name.to_string(),
1913                        data_type: parent_data_type,
1914                        physical_type: struct_physical_type,
1915                        nullable: !field_data
1916                            .get("required")
1917                            .and_then(|v| v.as_bool())
1918                            .unwrap_or(false),
1919                        description: description.clone(),
1920                        quality: quality_rules.clone(),
1921                        relationships: ref_to_relationships(
1922                            &field_data
1923                                .get("$ref")
1924                                .and_then(|v| v.as_str())
1925                                .map(|s| s.to_string()),
1926                        ),
1927                        ..Default::default()
1928                    });
1929
1930                    // Add nested columns
1931                    columns.extend(nested_cols);
1932                    return Ok(columns);
1933                }
1934                Ok(_) | Err(_) => {
1935                    // If parsing fails or returns empty, fall back to using the type as-is
1936                }
1937            }
1938        }
1939
1940        let field_type = normalize_data_type(field_type_str);
1941
1942        // Handle ARRAY type
1943        if field_type == "ARRAY" {
1944            let items = field_data.get("items");
1945            if let Some(items_val) = items {
1946                if let Some(items_obj) = items_val.as_object() {
1947                    // Check if items is an object with fields (nested structure)
1948                    // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy) for backward compatibility
1949                    let items_type = items_obj
1950                        .get("logicalType")
1951                        .and_then(|v| v.as_str())
1952                        .or_else(|| items_obj.get("type").and_then(|v| v.as_str()));
1953
1954                    // Normalize legacy "type" values to "logicalType" equivalents
1955                    let normalized_items_type = match items_type {
1956                        Some("object") | Some("struct") => Some("object"),
1957                        Some("array") => Some("array"),
1958                        Some("string") | Some("varchar") | Some("char") | Some("text") => {
1959                            Some("string")
1960                        }
1961                        Some("integer") | Some("int") | Some("bigint") | Some("smallint")
1962                        | Some("tinyint") => Some("integer"),
1963                        Some("number") | Some("decimal") | Some("double") | Some("float")
1964                        | Some("numeric") => Some("number"),
1965                        Some("boolean") | Some("bool") => Some("boolean"),
1966                        Some("date") => Some("date"),
1967                        Some("timestamp") | Some("datetime") => Some("timestamp"),
1968                        Some("time") => Some("time"),
1969                        other => other,
1970                    };
1971
1972                    if items_obj.get("fields").is_some()
1973                        || items_obj.get("properties").is_some()
1974                        || normalized_items_type == Some("object")
1975                    {
1976                        // Array of objects - create parent column as ARRAY<OBJECT>
1977                        let array_physical_type = field_data
1978                            .get("physicalType")
1979                            .and_then(|v| v.as_str())
1980                            .map(|s| s.to_string());
1981                        columns.push(Column {
1982                            name: field_name.to_string(),
1983                            data_type: "ARRAY<OBJECT>".to_string(),
1984                            physical_type: array_physical_type,
1985                            nullable: !field_data
1986                                .get("required")
1987                                .and_then(|v| v.as_bool())
1988                                .unwrap_or(false),
1989                            description: field_data
1990                                .get("description")
1991                                .and_then(|v| v.as_str())
1992                                .unwrap_or("")
1993                                .to_string(),
1994                            ..Default::default()
1995                        });
1996
1997                        // Extract nested fields from items.properties or items.fields if present
1998                        // Handle both object format (legacy) and array format (ODCS v3.1.0)
1999                        let properties_obj =
2000                            items_obj.get("properties").and_then(|v| v.as_object());
2001                        let properties_arr = items_obj.get("properties").and_then(|v| v.as_array());
2002                        let fields_obj = items_obj.get("fields").and_then(|v| v.as_object());
2003
2004                        if let Some(fields_map) = properties_obj.or(fields_obj) {
2005                            // Object format (legacy): properties is a map
2006                            for (nested_field_name, nested_field_data) in fields_map {
2007                                if let Some(nested_field_obj) = nested_field_data.as_object() {
2008                                    // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
2009                                    let nested_field_type = nested_field_obj
2010                                        .get("logicalType")
2011                                        .and_then(|v| v.as_str())
2012                                        .or_else(|| {
2013                                            nested_field_obj.get("type").and_then(|v| v.as_str())
2014                                        })
2015                                        .unwrap_or("STRING");
2016
2017                                    // Recursively parse nested fields with array prefix
2018                                    let nested_col_name =
2019                                        format!("{}.[].{}", field_name, nested_field_name);
2020                                    let mut local_errors = Vec::new();
2021                                    match self.parse_data_contract_field(
2022                                        &nested_col_name,
2023                                        nested_field_obj,
2024                                        data,
2025                                        &mut local_errors,
2026                                    ) {
2027                                        Ok(mut nested_cols) => {
2028                                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2029                                            // Otherwise, it returns a single column
2030                                            columns.append(&mut nested_cols);
2031                                        }
2032                                        Err(_) => {
2033                                            // Fallback: create simple nested column
2034                                            let nested_physical_type = nested_field_obj
2035                                                .get("physicalType")
2036                                                .and_then(|v| v.as_str())
2037                                                .map(|s| s.to_string());
2038                                            columns.push(Column {
2039                                                name: nested_col_name,
2040                                                data_type: nested_field_type.to_uppercase(),
2041                                                physical_type: nested_physical_type,
2042                                                nullable: !nested_field_obj
2043                                                    .get("required")
2044                                                    .and_then(|v| v.as_bool())
2045                                                    .unwrap_or(false),
2046                                                description: nested_field_obj
2047                                                    .get("description")
2048                                                    .and_then(|v| v.as_str())
2049                                                    .unwrap_or("")
2050                                                    .to_string(),
2051                                                ..Default::default()
2052                                            });
2053                                        }
2054                                    }
2055                                }
2056                            }
2057                        } else if let Some(properties_list) = properties_arr {
2058                            // Array format (ODCS v3.1.0): properties is an array with 'name' field
2059                            let mut local_errors = Vec::new();
2060                            for prop_data in properties_list {
2061                                if let Some(prop_obj) = prop_data.as_object() {
2062                                    // Extract name from property object (required in v3.1.0)
2063                                    let nested_field_name = prop_obj
2064                                        .get("name")
2065                                        .or_else(|| prop_obj.get("id"))
2066                                        .and_then(|v| v.as_str())
2067                                        .unwrap_or("");
2068
2069                                    if !nested_field_name.is_empty() {
2070                                        // Recursively parse nested fields with array prefix
2071                                        let nested_col_name =
2072                                            format!("{}.[].{}", field_name, nested_field_name);
2073                                        match self.parse_data_contract_field(
2074                                            &nested_col_name,
2075                                            prop_obj,
2076                                            data,
2077                                            &mut local_errors,
2078                                        ) {
2079                                            Ok(mut nested_cols) => {
2080                                                // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2081                                                // Otherwise, it returns a single column
2082                                                columns.append(&mut nested_cols);
2083                                            }
2084                                            Err(_) => {
2085                                                // Fallback: create simple nested column
2086                                                // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
2087                                                let nested_field_type = prop_obj
2088                                                    .get("logicalType")
2089                                                    .and_then(|v| v.as_str())
2090                                                    .or_else(|| {
2091                                                        prop_obj
2092                                                            .get("type")
2093                                                            .and_then(|v| v.as_str())
2094                                                    })
2095                                                    .unwrap_or("STRING");
2096                                                let nested_physical_type = prop_obj
2097                                                    .get("physicalType")
2098                                                    .and_then(|v| v.as_str())
2099                                                    .map(|s| s.to_string());
2100                                                columns.push(Column {
2101                                                    name: nested_col_name,
2102                                                    data_type: nested_field_type.to_uppercase(),
2103                                                    physical_type: nested_physical_type,
2104                                                    nullable: !prop_obj
2105                                                        .get("required")
2106                                                        .and_then(|v| v.as_bool())
2107                                                        .unwrap_or(false),
2108                                                    description: prop_obj
2109                                                        .get("description")
2110                                                        .and_then(|v| v.as_str())
2111                                                        .unwrap_or("")
2112                                                        .to_string(),
2113                                                    ..Default::default()
2114                                                });
2115                                            }
2116                                        }
2117                                    }
2118                                }
2119                            }
2120                        }
2121
2122                        return Ok(columns);
2123                    } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2124                        // Array of simple type
2125                        let array_physical_type = field_data
2126                            .get("physicalType")
2127                            .and_then(|v| v.as_str())
2128                            .map(|s| s.to_string());
2129                        columns.push(Column {
2130                            name: field_name.to_string(),
2131                            data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2132                            physical_type: array_physical_type,
2133                            nullable: !field_data
2134                                .get("required")
2135                                .and_then(|v| v.as_bool())
2136                                .unwrap_or(false),
2137                            description: description.clone(),
2138                            quality: quality_rules.clone(),
2139                            relationships: ref_to_relationships(
2140                                &field_data
2141                                    .get("$ref")
2142                                    .and_then(|v| v.as_str())
2143                                    .map(|s| s.to_string()),
2144                            ),
2145                            ..Default::default()
2146                        });
2147                        return Ok(columns);
2148                    }
2149                } else if let Some(item_type_str) = items_val.as_str() {
2150                    // Array of simple type (string)
2151                    let array_physical_type = field_data
2152                        .get("physicalType")
2153                        .and_then(|v| v.as_str())
2154                        .map(|s| s.to_string());
2155                    columns.push(Column {
2156                        name: field_name.to_string(),
2157                        data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2158                        physical_type: array_physical_type,
2159                        nullable: !field_data
2160                            .get("required")
2161                            .and_then(|v| v.as_bool())
2162                            .unwrap_or(false),
2163                        description: description.clone(),
2164                        quality: quality_rules.clone(),
2165                        relationships: ref_to_relationships(
2166                            &field_data
2167                                .get("$ref")
2168                                .and_then(|v| v.as_str())
2169                                .map(|s| s.to_string()),
2170                        ),
2171                        ..Default::default()
2172                    });
2173                    return Ok(columns);
2174                }
2175            }
2176            // Array without items - default to ARRAY<STRING>
2177            let array_physical_type = field_data
2178                .get("physicalType")
2179                .and_then(|v| v.as_str())
2180                .map(|s| s.to_string());
2181            columns.push(Column {
2182                name: field_name.to_string(),
2183                data_type: "ARRAY<STRING>".to_string(),
2184                physical_type: array_physical_type,
2185                nullable: !field_data
2186                    .get("required")
2187                    .and_then(|v| v.as_bool())
2188                    .unwrap_or(false),
2189                description: description.clone(),
2190                quality: quality_rules.clone(),
2191                relationships: ref_to_relationships(
2192                    &field_data
2193                        .get("$ref")
2194                        .and_then(|v| v.as_str())
2195                        .map(|s| s.to_string()),
2196                ),
2197                ..Default::default()
2198            });
2199            return Ok(columns);
2200        }
2201
2202        // Check if this is a nested object with fields or properties
2203        // Note: Export saves nested columns in properties.properties, but some formats use fields
2204        // Handle both object format (legacy/ODCL) and array format (ODCS v3.1.0)
2205        let nested_fields_obj = field_data
2206            .get("properties")
2207            .and_then(|v| v.as_object())
2208            .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2209        let nested_fields_arr = field_data.get("properties").and_then(|v| v.as_array());
2210
2211        if field_type == "OBJECT" && (nested_fields_obj.is_some() || nested_fields_arr.is_some()) {
2212            // Inline nested object - create parent column as OBJECT and extract nested fields
2213
2214            // Extract physicalType for the parent OBJECT column
2215            let parent_physical_type = field_data
2216                .get("physicalType")
2217                .and_then(|v| v.as_str())
2218                .map(|s| s.to_string());
2219
2220            // Create parent column
2221            columns.push(Column {
2222                name: field_name.to_string(),
2223                data_type: "OBJECT".to_string(),
2224                physical_type: parent_physical_type,
2225                nullable: !field_data
2226                    .get("required")
2227                    .and_then(|v| v.as_bool())
2228                    .unwrap_or(false),
2229                description: description.clone(),
2230                quality: quality_rules.clone(),
2231                relationships: ref_to_relationships(
2232                    &field_data
2233                        .get("$ref")
2234                        .and_then(|v| v.as_str())
2235                        .map(|s| s.to_string()),
2236                ),
2237                ..Default::default()
2238            });
2239
2240            // Extract nested fields recursively - handle both object and array formats
2241            if let Some(fields_obj) = nested_fields_obj {
2242                // Object format (legacy/ODCL): properties is a map of name -> schema
2243                for (nested_field_name, nested_field_data) in fields_obj {
2244                    if let Some(nested_field_obj) = nested_field_data.as_object() {
2245                        let nested_field_type = nested_field_obj
2246                            .get("logicalType")
2247                            .and_then(|v| v.as_str())
2248                            .or_else(|| nested_field_obj.get("type").and_then(|v| v.as_str()))
2249                            .unwrap_or("STRING");
2250
2251                        // Recursively parse nested fields
2252                        let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2253                        match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data)
2254                        {
2255                            Ok(mut nested_cols) => {
2256                                // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2257                                // Otherwise, it returns a single column
2258                                columns.append(&mut nested_cols);
2259                            }
2260                            Err(_) => {
2261                                // Fallback: create simple nested column
2262                                let nested_physical_type = nested_field_obj
2263                                    .get("physicalType")
2264                                    .and_then(|v| v.as_str())
2265                                    .map(|s| s.to_string());
2266                                columns.push(Column {
2267                                    name: nested_col_name,
2268                                    data_type: nested_field_type.to_uppercase(),
2269                                    physical_type: nested_physical_type,
2270                                    nullable: !nested_field_obj
2271                                        .get("required")
2272                                        .and_then(|v| v.as_bool())
2273                                        .unwrap_or(false),
2274                                    description: nested_field_obj
2275                                        .get("description")
2276                                        .and_then(|v| v.as_str())
2277                                        .unwrap_or("")
2278                                        .to_string(),
2279                                    ..Default::default()
2280                                });
2281                            }
2282                        }
2283                    }
2284                }
2285            } else if let Some(fields_arr) = nested_fields_arr {
2286                // Array format (ODCS v3.1.0): properties is an array with 'name' field
2287                for prop_data in fields_arr {
2288                    if let Some(prop_obj) = prop_data.as_object() {
2289                        // Extract name from property object (required in v3.1.0)
2290                        let nested_field_name = prop_obj
2291                            .get("name")
2292                            .or_else(|| prop_obj.get("id"))
2293                            .and_then(|v| v.as_str())
2294                            .unwrap_or("");
2295
2296                        if !nested_field_name.is_empty() {
2297                            let nested_field_type = prop_obj
2298                                .get("logicalType")
2299                                .and_then(|v| v.as_str())
2300                                .or_else(|| prop_obj.get("type").and_then(|v| v.as_str()))
2301                                .unwrap_or("STRING");
2302
2303                            // Recursively parse nested fields
2304                            let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2305                            match self.parse_odcl_v3_property(&nested_col_name, prop_obj, data) {
2306                                Ok(mut nested_cols) => {
2307                                    // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2308                                    // Otherwise, it returns a single column
2309                                    columns.append(&mut nested_cols);
2310                                }
2311                                Err(_) => {
2312                                    // Fallback: create simple nested column
2313                                    let nested_physical_type = prop_obj
2314                                        .get("physicalType")
2315                                        .and_then(|v| v.as_str())
2316                                        .map(|s| s.to_string());
2317                                    columns.push(Column {
2318                                        name: nested_col_name,
2319                                        data_type: nested_field_type.to_uppercase(),
2320                                        physical_type: nested_physical_type,
2321                                        nullable: !prop_obj
2322                                            .get("required")
2323                                            .and_then(|v| v.as_bool())
2324                                            .unwrap_or(false),
2325                                        description: prop_obj
2326                                            .get("description")
2327                                            .and_then(|v| v.as_str())
2328                                            .unwrap_or("")
2329                                            .to_string(),
2330                                        ..Default::default()
2331                                    });
2332                                }
2333                            }
2334                        }
2335                    }
2336                }
2337            }
2338
2339            return Ok(columns);
2340        }
2341
2342        // Regular field (no $ref or $ref not found)
2343        let required = field_data
2344            .get("required")
2345            .and_then(|v| v.as_bool())
2346            .unwrap_or(false);
2347
2348        // Use description extracted at function start, or extract if not yet extracted
2349        let field_description = if description.is_empty() {
2350            field_data
2351                .get("description")
2352                .and_then(|v| v.as_str())
2353                .unwrap_or("")
2354                .to_string()
2355        } else {
2356            description
2357        };
2358
2359        // Use quality_rules extracted at function start, or extract if not yet extracted
2360        let mut column_quality_rules = quality_rules;
2361
2362        // Extract column-level quality rules if not already extracted
2363        if column_quality_rules.is_empty()
2364            && let Some(quality_val) = field_data.get("quality")
2365        {
2366            if let Some(arr) = quality_val.as_array() {
2367                // Array of quality rules
2368                for item in arr {
2369                    if let Some(obj) = item.as_object() {
2370                        let mut rule = HashMap::new();
2371                        for (key, value) in obj {
2372                            rule.insert(key.clone(), json_value_to_serde_value(value));
2373                        }
2374                        column_quality_rules.push(rule);
2375                    }
2376                }
2377            } else if let Some(obj) = quality_val.as_object() {
2378                // Single quality rule object
2379                let mut rule = HashMap::new();
2380                for (key, value) in obj {
2381                    rule.insert(key.clone(), json_value_to_serde_value(value));
2382                }
2383                column_quality_rules.push(rule);
2384            }
2385        }
2386
2387        // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
2388        // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
2389        // The `required` field will be set based on the `required` parameter during column creation.
2390
2391        // Extract physicalType (ODCS v3.1.0) - the actual database type
2392        let physical_type = field_data
2393            .get("physicalType")
2394            .and_then(|v| v.as_str())
2395            .map(|s| s.to_string());
2396
2397        columns.push(Column {
2398            name: field_name.to_string(),
2399            data_type: field_type,
2400            physical_type,
2401            nullable: !required,
2402            primary_key: field_data
2403                .get("primaryKey")
2404                .and_then(|v| v.as_bool())
2405                .unwrap_or(false),
2406            foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2407            description: field_description,
2408            quality: column_quality_rules,
2409            relationships: self.parse_relationships_from_field(field_data),
2410            ..Default::default()
2411        });
2412
2413        Ok(columns)
2414    }
2415
2416    /// Parse foreign key from Data Contract field data.
2417    fn parse_foreign_key_from_data_contract(
2418        &self,
2419        field_data: &serde_json::Map<String, JsonValue>,
2420    ) -> Option<ForeignKey> {
2421        field_data
2422            .get("foreignKey")
2423            .and_then(|v| v.as_object())
2424            .map(|fk_obj| ForeignKey {
2425                table_id: fk_obj
2426                    .get("table")
2427                    .or_else(|| fk_obj.get("table_id"))
2428                    .and_then(|v| v.as_str())
2429                    .unwrap_or("")
2430                    .to_string(),
2431                column_name: fk_obj
2432                    .get("column")
2433                    .or_else(|| fk_obj.get("column_name"))
2434                    .and_then(|v| v.as_str())
2435                    .unwrap_or("")
2436                    .to_string(),
2437            })
2438    }
2439
2440    /// Parse relationships from Data Contract field data.
2441    /// Also converts $ref to a relationship entry if present.
2442    fn parse_relationships_from_field(
2443        &self,
2444        field_data: &serde_json::Map<String, JsonValue>,
2445    ) -> Vec<PropertyRelationship> {
2446        let mut relationships = Vec::new();
2447
2448        // Parse relationships array from ODCS v3.1.0 format
2449        if let Some(rels_array) = field_data.get("relationships").and_then(|v| v.as_array()) {
2450            for rel in rels_array {
2451                if let Some(rel_obj) = rel.as_object() {
2452                    let rel_type = rel_obj
2453                        .get("type")
2454                        .and_then(|v| v.as_str())
2455                        .unwrap_or("foreignKey")
2456                        .to_string();
2457                    let to = rel_obj
2458                        .get("to")
2459                        .and_then(|v| v.as_str())
2460                        .unwrap_or("")
2461                        .to_string();
2462
2463                    if !to.is_empty() {
2464                        relationships.push(PropertyRelationship {
2465                            relationship_type: rel_type,
2466                            to,
2467                        });
2468                    }
2469                }
2470            }
2471        }
2472
2473        // Convert $ref to relationship if present and no relationships were parsed
2474        if relationships.is_empty()
2475            && let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str())
2476        {
2477            // Convert $ref path to ODCS relationship format
2478            let to = if ref_str.starts_with("#/definitions/") {
2479                let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
2480                format!("definitions/{}", def_path)
2481            } else if ref_str.starts_with("#/") {
2482                ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
2483            } else {
2484                ref_str.to_string()
2485            };
2486
2487            relationships.push(PropertyRelationship {
2488                relationship_type: "foreignKey".to_string(),
2489                to,
2490            });
2491        }
2492
2493        relationships
2494    }
2495
2496    /// Extract database type from servers in Data Contract format.
2497    fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2498        // Data Contract format: servers can be object or array
2499        if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2500            // Object format: { "server_name": { "type": "..." } }
2501            if let Some((_, server_data)) = servers_obj.iter().next()
2502                && let Some(server_obj) = server_data.as_object()
2503            {
2504                return server_obj
2505                    .get("type")
2506                    .and_then(|v| v.as_str())
2507                    .and_then(|s| self.parse_database_type(s));
2508            }
2509        } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2510            // Array format: [ { "server": "...", "type": "..." } ]
2511            if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2512                return server_obj
2513                    .get("type")
2514                    .and_then(|v| v.as_str())
2515                    .and_then(|s| self.parse_database_type(s));
2516            }
2517        }
2518        None
2519    }
2520
2521    /// Parse database type string to enum.
2522    fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2523        match s.to_lowercase().as_str() {
2524            "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2525            "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2526            "mysql" => Some(DatabaseType::Mysql),
2527            "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2528            "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2529            _ => None,
2530        }
2531    }
2532
2533    /// Parse STRUCT type definition from string (e.g., "ARRAY<STRUCT<ID: STRING, NAME: STRING>>").
2534    /// Parse STRUCT type from string and create nested columns
2535    /// This is public so it can be used by SQL importer to parse STRUCT types
2536    pub fn parse_struct_type_from_string(
2537        &self,
2538        field_name: &str,
2539        type_str: &str,
2540        field_data: &serde_json::Map<String, JsonValue>,
2541    ) -> Result<Vec<Column>> {
2542        let mut columns = Vec::new();
2543
2544        // Normalize whitespace - replace newlines and multiple spaces with single space
2545        let normalized_type = type_str
2546            .lines()
2547            .map(|line| line.trim())
2548            .filter(|line| !line.is_empty())
2549            .collect::<Vec<_>>()
2550            .join(" ");
2551
2552        let type_str_upper = normalized_type.to_uppercase();
2553
2554        // Check if it's ARRAY<STRUCT<...>>
2555        let is_array = type_str_upper.starts_with("ARRAY<");
2556        let struct_start = type_str_upper.find("STRUCT<");
2557
2558        if let Some(start_pos) = struct_start {
2559            // Extract STRUCT content - start from STRUCT< and find its closing >
2560            // For ARRAY<STRUCT<...>>, we still need to find the STRUCT<...> closing bracket
2561            let struct_content_start = start_pos + 7; // Skip "STRUCT<"
2562            let struct_content = &normalized_type[struct_content_start..];
2563
2564            // Find matching closing bracket for STRUCT<, handling nested STRUCTs
2565            // We need to find the > that closes STRUCT<, not ARRAY<
2566            let mut depth = 1;
2567            let mut end_pos = None;
2568            for (i, ch) in struct_content.char_indices() {
2569                match ch {
2570                    '<' => depth += 1,
2571                    '>' => {
2572                        depth -= 1;
2573                        if depth == 0 {
2574                            end_pos = Some(i);
2575                            break;
2576                        }
2577                    }
2578                    _ => {}
2579                }
2580            }
2581
2582            // If no closing bracket found, try to infer it (handle malformed YAML)
2583            let struct_fields_str = if let Some(end) = end_pos {
2584                &struct_content[..end]
2585            } else {
2586                // Missing closing bracket - use everything up to the end
2587                struct_content.trim_end_matches('>').trim()
2588            };
2589
2590            // Parse fields: "ID: STRING, NAME: STRING, ..."
2591            let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2592
2593            // Create nested columns, handling nested STRUCTs recursively
2594            // For ARRAY<STRUCT<...>>, use .[] notation for array elements: parent.[].field
2595            // For STRUCT<...>, use dot notation: parent.field
2596            for (nested_name, nested_type) in fields {
2597                let nested_type_upper = nested_type.to_uppercase();
2598                let nested_col_name = if is_array {
2599                    format!("{}.[].{}", field_name, nested_name)
2600                } else {
2601                    format!("{}.{}", field_name, nested_name)
2602                };
2603
2604                // Check if this field is itself a STRUCT or ARRAY<STRUCT>
2605                // Handle multiple levels of nesting: STRUCT<...>, ARRAY<STRUCT<...>>, ARRAY<STRUCT<...>> within STRUCT, etc.
2606                let is_nested_struct = nested_type_upper.starts_with("STRUCT<");
2607                let is_nested_array_struct = nested_type_upper.starts_with("ARRAY<STRUCT<");
2608
2609                if is_nested_struct || is_nested_array_struct {
2610                    // Recursively parse nested STRUCT or ARRAY<STRUCT>
2611                    // The nested_col_name already includes the parent path:
2612                    // - For STRUCT within ARRAY: "items.[].details"
2613                    // - For STRUCT within STRUCT: "orderInfo.metadata"
2614                    // - For ARRAY<STRUCT> within STRUCT: "parent.items"
2615                    // - For ARRAY<STRUCT> within ARRAY<STRUCT>: "parent.[].items"
2616                    // Recursive calls will correctly build the full path with proper notation
2617                    match self.parse_struct_type_from_string(
2618                        &nested_col_name,
2619                        &nested_type,
2620                        field_data,
2621                    ) {
2622                        Ok(nested_cols) => {
2623                            // Add all recursively parsed columns
2624                            // These will have names like:
2625                            // - items.[].details.name (STRUCT within ARRAY<STRUCT>)
2626                            // - orderInfo.metadata.field (STRUCT within STRUCT)
2627                            // - parent.items.[].field (ARRAY<STRUCT> within STRUCT)
2628                            // - parent.[].items.[].field (ARRAY<STRUCT> within ARRAY<STRUCT>)
2629                            columns.extend(nested_cols);
2630                        }
2631                        Err(_) => {
2632                            // If recursive parsing fails, add the parent column with STRUCT/ARRAY type
2633                            let fallback_data_type = if is_nested_array_struct {
2634                                "ARRAY<STRUCT<...>>".to_string()
2635                            } else {
2636                                "STRUCT<...>".to_string()
2637                            };
2638                            let nested_physical_type = field_data
2639                                .get("physicalType")
2640                                .and_then(|v| v.as_str())
2641                                .map(|s| s.to_string());
2642                            columns.push(Column {
2643                                name: nested_col_name,
2644                                data_type: fallback_data_type,
2645                                physical_type: nested_physical_type,
2646                                nullable: !field_data
2647                                    .get("required")
2648                                    .and_then(|v| v.as_bool())
2649                                    .unwrap_or(false),
2650                                description: field_data
2651                                    .get("description")
2652                                    .and_then(|v| v.as_str())
2653                                    .unwrap_or("")
2654                                    .to_string(),
2655                                ..Default::default()
2656                            });
2657                        }
2658                    }
2659                } else if nested_type_upper.starts_with("ARRAY<") {
2660                    // Handle ARRAY of simple types (not STRUCT) - these don't need nested columns
2661                    // But if it's ARRAY<STRUCT<...>>, it would have been caught above
2662                    let nested_physical_type = field_data
2663                        .get("physicalType")
2664                        .and_then(|v| v.as_str())
2665                        .map(|s| s.to_string());
2666                    columns.push(Column {
2667                        name: nested_col_name,
2668                        data_type: normalize_data_type(&nested_type),
2669                        physical_type: nested_physical_type,
2670                        nullable: !field_data
2671                            .get("required")
2672                            .and_then(|v| v.as_bool())
2673                            .unwrap_or(false),
2674                        description: field_data
2675                            .get("description")
2676                            .and_then(|v| v.as_str())
2677                            .unwrap_or("")
2678                            .to_string(),
2679                        ..Default::default()
2680                    });
2681                } else {
2682                    // Simple nested field (not a STRUCT)
2683                    let nested_physical_type = field_data
2684                        .get("physicalType")
2685                        .and_then(|v| v.as_str())
2686                        .map(|s| s.to_string());
2687                    columns.push(Column {
2688                        name: nested_col_name,
2689                        data_type: normalize_data_type(&nested_type),
2690                        physical_type: nested_physical_type,
2691                        nullable: !field_data
2692                            .get("required")
2693                            .and_then(|v| v.as_bool())
2694                            .unwrap_or(false),
2695                        description: field_data
2696                            .get("description")
2697                            .and_then(|v| v.as_str())
2698                            .unwrap_or("")
2699                            .to_string(),
2700                        ..Default::default()
2701                    });
2702                }
2703            }
2704
2705            return Ok(columns);
2706        }
2707
2708        // If no STRUCT found, return empty (fallback to regular parsing)
2709        Ok(Vec::new())
2710    }
2711
2712    /// Parse STRUCT fields from string (e.g., "ID: STRING, NAME: STRING").
2713    fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2714        let mut fields = Vec::new();
2715        let mut current_field = String::new();
2716        let mut depth = 0;
2717        let mut in_string = false;
2718        let mut string_char = None;
2719
2720        for ch in fields_str.chars() {
2721            match ch {
2722                '\'' | '"' if !in_string || Some(ch) == string_char => {
2723                    if in_string {
2724                        in_string = false;
2725                        string_char = None;
2726                    } else {
2727                        in_string = true;
2728                        string_char = Some(ch);
2729                    }
2730                    current_field.push(ch);
2731                }
2732                '<' if !in_string => {
2733                    depth += 1;
2734                    current_field.push(ch);
2735                }
2736                '>' if !in_string => {
2737                    depth -= 1;
2738                    current_field.push(ch);
2739                }
2740                ',' if !in_string && depth == 0 => {
2741                    // End of current field
2742                    let trimmed = current_field.trim();
2743                    if !trimmed.is_empty()
2744                        && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2745                    {
2746                        fields.push((name, type_part));
2747                    }
2748                    current_field.clear();
2749                }
2750                _ => {
2751                    current_field.push(ch);
2752                }
2753            }
2754        }
2755
2756        // Handle last field
2757        let trimmed = current_field.trim();
2758        if !trimmed.is_empty()
2759            && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2760        {
2761            fields.push((name, type_part));
2762        }
2763
2764        Ok(fields)
2765    }
2766
2767    /// Parse a single field definition (e.g., "ID: STRING" or "DETAILS: STRUCT<...>").
2768    fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
2769        // Split by colon, but handle nested STRUCTs
2770        let colon_pos = field_def.find(':')?;
2771        let name = field_def[..colon_pos].trim().to_string();
2772        let type_part = field_def[colon_pos + 1..].trim().to_string();
2773
2774        if name.is_empty() || type_part.is_empty() {
2775            return None;
2776        }
2777
2778        Some((name, type_part))
2779    }
2780
2781    /// Extract catalog and schema from customProperties.
2782    fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
2783        let mut catalog_name = None;
2784        let mut schema_name = None;
2785
2786        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2787            for prop in custom_props {
2788                if let Some(prop_obj) = prop.as_object() {
2789                    let prop_key = prop_obj
2790                        .get("property")
2791                        .and_then(|v| v.as_str())
2792                        .unwrap_or("");
2793                    let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
2794
2795                    match prop_key {
2796                        "catalogName" | "catalog_name" => {
2797                            catalog_name = prop_value.map(|s| s.to_string());
2798                        }
2799                        "schemaName" | "schema_name" => {
2800                            schema_name = prop_value.map(|s| s.to_string());
2801                        }
2802                        _ => {}
2803                    }
2804                }
2805            }
2806        }
2807
2808        // Also check direct fields
2809        if catalog_name.is_none() {
2810            catalog_name = data
2811                .get("catalog_name")
2812                .and_then(|v| v.as_str())
2813                .map(|s| s.to_string());
2814        }
2815        if schema_name.is_none() {
2816            schema_name = data
2817                .get("schema_name")
2818                .and_then(|v| v.as_str())
2819                .map(|s| s.to_string());
2820        }
2821
2822        (catalog_name, schema_name)
2823    }
2824}
2825
2826impl Default for ODCSImporter {
2827    fn default() -> Self {
2828        Self::new()
2829    }
2830}
2831
#[cfg(test)]
mod tests {
    use super::*;

    /// A simple `name` + `columns` document parses into a table with both
    /// columns, the declared database type, and no reported errors.
    #[test]
    fn test_parse_simple_odcl_table() {
        let yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer
            .parse(yaml)
            .expect("simple ODCL document should parse");

        assert_eq!(errors.len(), 0);
        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
    }

    /// Medallion layer and SCD pattern are read from the document; the
    /// description inside `odcl_metadata` is checked only when it is present
    /// as a string.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer
            .parse(yaml)
            .expect("ODCL document with metadata should parse");

        assert_eq!(errors.len(), 0);
        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));

        let description = table
            .odcl_metadata
            .get("description")
            .and_then(|value| value.as_str());
        if let Some(text) = description {
            assert_eq!(text, "User table");
        }
    }

    /// `data_vault_classification: Hub` maps to `DataVaultClassification::Hub`.
    #[test]
    fn test_parse_odcl_with_data_vault() {
        let yaml = r#"
name: hub_customer
columns:
  - name: customer_key
    data_type: VARCHAR(50)
data_vault_classification: Hub
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer
            .parse(yaml)
            .expect("ODCL document with data vault classification should parse");

        assert_eq!(errors.len(), 0);
        assert_eq!(
            table.data_vault_classification,
            Some(DataVaultClassification::Hub)
        );
    }

    /// Input that is not well-formed YAML is rejected with an error.
    #[test]
    fn test_parse_invalid_odcl() {
        let malformed = "not: valid: yaml: structure:";
        let mut importer = ODCSImporter::new();

        let outcome = importer.parse(malformed);

        assert!(outcome.is_err());
    }

    /// A document that has a name but no `columns` field is rejected.
    #[test]
    fn test_parse_odcl_missing_required_fields() {
        let without_columns = r#"
name: users
# Missing required columns field
"#;
        let mut importer = ODCSImporter::new();

        let outcome = importer.parse(without_columns);

        assert!(outcome.is_err());
    }

    /// A column-level `foreign_key` mapping is carried onto the parsed column.
    #[test]
    fn test_parse_odcl_with_foreign_key() {
        let yaml = r#"
name: orders
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: user_id
    data_type: INT
    foreign_key:
      table_id: users
      column_name: id
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer
            .parse(yaml)
            .expect("ODCL document with foreign key should parse");

        assert_eq!(errors.len(), 0);
        assert_eq!(table.columns.len(), 2);

        let user_id = table
            .columns
            .iter()
            .find(|column| column.name == "user_id")
            .expect("user_id column should be present");
        let foreign_key = user_id
            .foreign_key
            .as_ref()
            .expect("user_id should carry a foreign key");
        assert_eq!(foreign_key.table_id, "users");
    }

    /// Column constraints and the `nullable` flag survive parsing.
    #[test]
    fn test_parse_odcl_with_constraints() {
        let yaml = r#"
name: products
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
    constraints:
      - UNIQUE
      - NOT NULL
"#;
        let mut importer = ODCSImporter::new();

        let (table, errors) = importer
            .parse(yaml)
            .expect("ODCL document with constraints should parse");

        assert_eq!(errors.len(), 0);
        assert_eq!(table.columns.len(), 2);

        let name_column = table
            .columns
            .iter()
            .find(|column| column.name == "name")
            .expect("name column should be present");
        assert!(!name_column.nullable);
        assert!(
            name_column
                .constraints
                .iter()
                .any(|constraint| constraint == "UNIQUE")
        );
    }
}