data_modelling_sdk/import/
odcs.rs

1//! ODCS parser service for parsing Open Data Contract Standard YAML files.
2//!
3//! This service parses ODCS (Open Data Contract Standard) v3.1.0 and legacy ODCL (Data Contract Specification) YAML files
4//! and converts them to Table models. ODCL files are automatically converted to ODCS v3.1.0 format.
5//! Supports multiple formats:
6//! - ODCS v3.1.0 / v3.0.x format (apiVersion, kind, schema) - PRIMARY FORMAT
7//! - ODCL (Data Contract Specification) format (dataContractSpecification, models, definitions) - LEGACY, converted to ODCS
8//! - Simple ODCL format (name, columns) - LEGACY, converted to ODCS
9//! - Liquibase format
10
11use super::{ImportError, ImportResult, TableData};
12use crate::models::column::ForeignKey;
13use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
14use crate::models::{Column, Table, Tag};
15use anyhow::{Context, Result};
16use serde_json::Value as JsonValue;
17use std::collections::HashMap;
18use std::str::FromStr;
19use tracing::info;
20
/// ODCS parser service for parsing Open Data Contract Standard YAML files.
/// Handles ODCS v3.1.0 (primary format) and legacy ODCL formats (converted to ODCS).
///
/// The importer keeps a copy of the YAML document it parsed most recently; the
/// field is overwritten each time a new document is parsed.
pub struct ODCSImporter {
    /// Current YAML data for $ref resolution.
    /// `None` until a document has been parsed successfully as YAML.
    current_yaml_data: Option<serde_yaml::Value>,
}
27
28impl ODCSImporter {
29    /// Create a new ODCS parser instance.
30    ///
31    /// # Example
32    ///
33    /// ```rust
34    /// use data_modelling_sdk::import::odcs::ODCSImporter;
35    ///
36    /// let mut importer = ODCSImporter::new();
37    /// ```
38    pub fn new() -> Self {
39        Self {
40            current_yaml_data: None,
41        }
42    }
43
44    /// Import ODCS/ODCL YAML content and create Table (SDK interface).
45    ///
46    /// Supports ODCS v3.1.0 (primary), legacy ODCL formats (converted to ODCS), and Liquibase formats.
47    ///
48    /// # Arguments
49    ///
50    /// * `yaml_content` - ODCS/ODCL YAML content as a string
51    ///
52    /// # Returns
53    ///
54    /// An `ImportResult` containing the extracted table and any parse errors.
55    ///
56    /// # Example
57    ///
58    /// ```rust
59    /// use data_modelling_sdk::import::odcs::ODCSImporter;
60    ///
61    /// let mut importer = ODCSImporter::new();
62    /// let yaml = r#"
63    /// apiVersion: v3.1.0
64    /// kind: DataContract
65    /// id: 550e8400-e29b-41d4-a716-446655440000
66    /// version: 1.0.0
67    /// name: users
68    /// schema:
69    ///   fields:
70    ///     - name: id
71    ///       type: bigint
72    /// "#;
73    /// let result = importer.import(yaml).unwrap();
74    /// assert_eq!(result.tables.len(), 1);
75    /// ```
76    pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
77        match self.parse(yaml_content) {
78            Ok((table, errors)) => {
79                let sdk_tables = vec![TableData {
80                    table_index: 0,
81                    name: Some(table.name.clone()),
82                    columns: table
83                        .columns
84                        .iter()
85                        .map(|c| super::ColumnData {
86                            name: c.name.clone(),
87                            data_type: c.data_type.clone(),
88                            nullable: c.nullable,
89                            primary_key: c.primary_key,
90                            description: if c.description.is_empty() {
91                                None
92                            } else {
93                                Some(c.description.clone())
94                            },
95                            quality: if c.quality.is_empty() {
96                                None
97                            } else {
98                                Some(c.quality.clone())
99                            },
100                            ref_path: c.ref_path.clone(),
101                            enum_values: if c.enum_values.is_empty() {
102                                None
103                            } else {
104                                Some(c.enum_values.clone())
105                            },
106                        })
107                        .collect(),
108                }];
109                let sdk_errors: Vec<ImportError> = errors
110                    .iter()
111                    .map(|e| ImportError::ParseError(e.message.clone()))
112                    .collect();
113                Ok(ImportResult {
114                    tables: sdk_tables,
115                    tables_requiring_name: Vec::new(),
116                    errors: sdk_errors,
117                    ai_suggestions: None,
118                })
119            }
120            Err(e) => Err(ImportError::ParseError(e.to_string())),
121        }
122    }
123
124    /// Parse ODCS/ODCL YAML content and create Table (public method for native app use).
125    ///
126    /// This method returns the full Table object with all metadata, suitable for use in
127    /// native applications that need direct access to the parsed table structure.
128    /// For API use, prefer the `import()` method which returns ImportResult.
129    ///
130    /// # Returns
131    ///
132    /// Returns a tuple of (Table, list of errors/warnings).
133    /// Errors list is empty if parsing is successful.
134    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
135        self.parse(yaml_content)
136    }
137
138    /// Parse ODCS/ODCL YAML content and create Table (internal method).
139    ///
140    /// Supports ODCS v3.1.0/v3.0.x (primary), ODCL Data Contract spec (legacy, converted to ODCS),
141    /// simple ODCL (legacy, converted to ODCS), and Liquibase formats.
142    ///
143    /// # Returns
144    ///
145    /// Returns a tuple of (Table, list of errors/warnings).
146    /// Errors list is empty if parsing is successful.
147    fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
148        // Errors are collected in helper functions, not here
149        let _errors: Vec<ParserError> = Vec::new();
150
151        // Parse YAML
152        let data: serde_yaml::Value =
153            serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
154
155        if data.is_null() {
156            return Err(anyhow::anyhow!("Empty YAML content"));
157        }
158
159        // Store current YAML data for $ref resolution
160        self.current_yaml_data = Some(data.clone());
161
162        // Convert to JSON Value for easier manipulation
163        let json_data = yaml_to_json_value(&data)?;
164
165        // Check format and parse accordingly
166        if self.is_liquibase_format(&json_data) {
167            return self.parse_liquibase(&json_data);
168        }
169
170        if self.is_odcl_v3_format(&json_data) {
171            return self.parse_odcl_v3(&json_data);
172        }
173
174        if self.is_data_contract_format(&json_data) {
175            return self.parse_data_contract(&json_data);
176        }
177
178        // Fall back to simple ODCL format
179        self.parse_simple_odcl(&json_data)
180    }
181
182    /// Resolve a $ref reference like '#/definitions/betAction'.
183    fn resolve_ref<'a>(&self, ref_str: &str, data: &'a JsonValue) -> Option<&'a JsonValue> {
184        if !ref_str.starts_with("#/") {
185            return None;
186        }
187
188        // Remove the leading '#/'
189        let path = &ref_str[2..];
190        let parts: Vec<&str> = path.split('/').collect();
191
192        // Navigate through the data structure
193        let mut current = data;
194        for part in parts {
195            current = current.get(part)?;
196        }
197
198        if current.is_object() {
199            Some(current)
200        } else {
201            None
202        }
203    }
204
205    /// Check if YAML is in Liquibase format.
206    fn is_liquibase_format(&self, data: &JsonValue) -> bool {
207        if data.get("databaseChangeLog").is_some() {
208            return true;
209        }
210        // Check for Liquibase-specific keys
211        if let Some(obj) = data.as_object() {
212            let obj_str = format!("{:?}", obj);
213            if obj_str.contains("changeSet") {
214                return true;
215            }
216        }
217        false
218    }
219
220    /// Check if YAML is in ODCS v3.0.x format.
221    fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
222        if let Some(obj) = data.as_object() {
223            let has_api_version = obj.contains_key("apiVersion");
224            let has_kind = obj
225                .get("kind")
226                .and_then(|v| v.as_str())
227                .map(|s| s == "DataContract")
228                .unwrap_or(false);
229            let has_id = obj.contains_key("id");
230            let has_version = obj.contains_key("version");
231            return has_api_version && has_kind && has_id && has_version;
232        }
233        false
234    }
235
236    /// Check if YAML is in Data Contract specification format.
237    fn is_data_contract_format(&self, data: &JsonValue) -> bool {
238        if let Some(obj) = data.as_object() {
239            let has_spec = obj.contains_key("dataContractSpecification");
240            let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
241            return has_spec && has_models;
242        }
243        false
244    }
245
246    /// Parse simple ODCL format.
247    fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
248        let mut errors = Vec::new();
249
250        // Extract table name
251        let name = data
252            .get("name")
253            .and_then(|v| v.as_str())
254            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
255            .to_string();
256
257        // Extract columns
258        let columns_data = data
259            .get("columns")
260            .and_then(|v| v.as_array())
261            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
262
263        let mut columns = Vec::new();
264        for (idx, col_data) in columns_data.iter().enumerate() {
265            match self.parse_column(col_data) {
266                Ok(col) => columns.push(col),
267                Err(e) => {
268                    errors.push(ParserError {
269                        error_type: "column_parse_error".to_string(),
270                        field: format!("columns[{}]", idx),
271                        message: e.to_string(),
272                    });
273                }
274            }
275        }
276
277        // Extract metadata
278        let database_type = self.extract_database_type(data);
279        let medallion_layers = self.extract_medallion_layers(data);
280        let scd_pattern = self.extract_scd_pattern(data);
281        let data_vault_classification = self.extract_data_vault_classification(data);
282        let quality_rules = self.extract_quality_rules(data);
283
284        // Validate pattern exclusivity
285        if scd_pattern.is_some() && data_vault_classification.is_some() {
286            errors.push(ParserError {
287                error_type: "validation_error".to_string(),
288                field: "patterns".to_string(),
289                message: "SCD pattern and Data Vault classification are mutually exclusive"
290                    .to_string(),
291            });
292        }
293
294        // Extract odcl_metadata
295        let mut odcl_metadata = HashMap::new();
296        if let Some(metadata) = data.get("odcl_metadata")
297            && let Some(obj) = metadata.as_object()
298        {
299            for (key, value) in obj {
300                odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
301            }
302        }
303
304        let table_uuid = self.extract_table_uuid(data);
305
306        let table = Table {
307            id: table_uuid,
308            name,
309            columns,
310            database_type,
311            catalog_name: None,
312            schema_name: None,
313            medallion_layers,
314            scd_pattern,
315            data_vault_classification,
316            modeling_level: None,
317            tags: Vec::<Tag>::new(),
318            odcl_metadata,
319            owner: None,
320            sla: None,
321            contact_details: None,
322            infrastructure_type: None,
323            notes: None,
324            position: None,
325            yaml_file_path: None,
326            drawio_cell_id: None,
327            quality: quality_rules,
328            errors: Vec::new(),
329            created_at: chrono::Utc::now(),
330            updated_at: chrono::Utc::now(),
331        };
332
333        info!("Parsed ODCL table: {}", table.name);
334        Ok((table, errors))
335    }
336
337    /// Parse a single column definition.
338    fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
339        let name = col_data
340            .get("name")
341            .and_then(|v| v.as_str())
342            .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
343            .to_string();
344
345        let data_type = col_data
346            .get("data_type")
347            .and_then(|v| v.as_str())
348            .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
349            .to_string();
350
351        // Normalize data_type to uppercase (preserve STRUCT<...> format)
352        let data_type = normalize_data_type(&data_type);
353
354        let nullable = col_data
355            .get("nullable")
356            .and_then(|v| v.as_bool())
357            .unwrap_or(true);
358
359        let primary_key = col_data
360            .get("primary_key")
361            .and_then(|v| v.as_bool())
362            .unwrap_or(false);
363
364        let foreign_key = col_data
365            .get("foreign_key")
366            .and_then(|v| self.parse_foreign_key(v));
367
368        let constraints = col_data
369            .get("constraints")
370            .and_then(|v| v.as_array())
371            .map(|arr| {
372                arr.iter()
373                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
374                    .collect()
375            })
376            .unwrap_or_default();
377
378        let description = col_data
379            .get("description")
380            .and_then(|v| v.as_str())
381            .map(|s| s.to_string())
382            .unwrap_or_default();
383
384        // Extract column-level quality rules
385        let mut column_quality_rules = Vec::new();
386        if let Some(quality_val) = col_data.get("quality") {
387            if let Some(arr) = quality_val.as_array() {
388                // Array of quality rules
389                for item in arr {
390                    if let Some(obj) = item.as_object() {
391                        let mut rule = HashMap::new();
392                        for (key, value) in obj {
393                            rule.insert(key.clone(), json_value_to_serde_value(value));
394                        }
395                        column_quality_rules.push(rule);
396                    }
397                }
398            } else if let Some(obj) = quality_val.as_object() {
399                // Single quality rule object
400                let mut rule = HashMap::new();
401                for (key, value) in obj {
402                    rule.insert(key.clone(), json_value_to_serde_value(value));
403                }
404                column_quality_rules.push(rule);
405            }
406        }
407
408        // If nullable=false (required=true), add a "not_null" quality rule if not already present
409        if !nullable {
410            let has_not_null = column_quality_rules.iter().any(|rule| {
411                rule.get("type")
412                    .and_then(|v| v.as_str())
413                    .map(|s| {
414                        s.to_lowercase().contains("not_null")
415                            || s.to_lowercase().contains("notnull")
416                    })
417                    .unwrap_or(false)
418            });
419            if !has_not_null {
420                let mut not_null_rule = HashMap::new();
421                not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
422                not_null_rule.insert(
423                    "description".to_string(),
424                    serde_json::json!("Column must not be null"),
425                );
426                column_quality_rules.push(not_null_rule);
427            }
428        }
429
430        Ok(Column {
431            name,
432            data_type,
433            nullable,
434            primary_key,
435            secondary_key: false,
436            composite_key: None,
437            foreign_key,
438            constraints,
439            description,
440            errors: Vec::new(),
441            quality: column_quality_rules,
442            ref_path: None,
443            enum_values: Vec::new(),
444            column_order: 0,
445        })
446    }
447
448    /// Parse foreign key from JSON value.
449    fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
450        let obj = fk_data.as_object()?;
451        Some(ForeignKey {
452            table_id: obj
453                .get("table_id")
454                .or_else(|| obj.get("table"))
455                .and_then(|v| v.as_str())
456                .unwrap_or("")
457                .to_string(),
458            column_name: obj
459                .get("column_name")
460                .or_else(|| obj.get("column"))
461                .and_then(|v| v.as_str())
462                .unwrap_or("")
463                .to_string(),
464        })
465    }
466
467    /// Extract database type from data.
468    fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
469        data.get("database_type")
470            .and_then(|v| v.as_str())
471            .and_then(|s| match s.to_uppercase().as_str() {
472                "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
473                "MYSQL" => Some(DatabaseType::Mysql),
474                "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
475                "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
476                "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
477                _ => None,
478            })
479    }
480
481    /// Extract medallion layers from data.
482    fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
483        let mut layers = Vec::new();
484
485        // Check plural form first
486        if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
487            for item in arr {
488                if let Some(s) = item.as_str()
489                    && let Ok(layer) = parse_medallion_layer(s)
490                {
491                    layers.push(layer);
492                }
493            }
494        }
495        // Check singular form (backward compatibility)
496        else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
497            && let Ok(layer) = parse_medallion_layer(s)
498        {
499            layers.push(layer);
500        }
501
502        layers
503    }
504
505    /// Extract SCD pattern from data.
506    fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
507        data.get("scd_pattern")
508            .and_then(|v| v.as_str())
509            .and_then(|s| parse_scd_pattern(s).ok())
510    }
511
512    /// Extract Data Vault classification from data.
513    fn extract_data_vault_classification(
514        &self,
515        data: &JsonValue,
516    ) -> Option<DataVaultClassification> {
517        data.get("data_vault_classification")
518            .and_then(|v| v.as_str())
519            .and_then(|s| parse_data_vault_classification(s).ok())
520    }
521
522    /// Extract quality rules from data.
523    fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
524        use serde_json::Value;
525        let mut quality_rules = Vec::new();
526
527        // Check for quality field at root level (array of objects or single object)
528        if let Some(quality_val) = data.get("quality") {
529            if let Some(arr) = quality_val.as_array() {
530                // Array of quality rules
531                for item in arr {
532                    if let Some(obj) = item.as_object() {
533                        let mut rule = HashMap::new();
534                        for (key, value) in obj {
535                            rule.insert(key.clone(), json_value_to_serde_value(value));
536                        }
537                        quality_rules.push(rule);
538                    }
539                }
540            } else if let Some(obj) = quality_val.as_object() {
541                // Single quality rule object
542                let mut rule = HashMap::new();
543                for (key, value) in obj {
544                    rule.insert(key.clone(), json_value_to_serde_value(value));
545                }
546                quality_rules.push(rule);
547            } else if let Some(s) = quality_val.as_str() {
548                // Simple string quality value
549                let mut rule = HashMap::new();
550                rule.insert("value".to_string(), Value::String(s.to_string()));
551                quality_rules.push(rule);
552            }
553        }
554
555        // Check for quality in metadata (ODCL v3 format)
556        if let Some(metadata) = data.get("metadata")
557            && let Some(metadata_obj) = metadata.as_object()
558            && let Some(quality_val) = metadata_obj.get("quality")
559        {
560            if let Some(arr) = quality_val.as_array() {
561                // Array of quality rules
562                for item in arr {
563                    if let Some(obj) = item.as_object() {
564                        let mut rule = HashMap::new();
565                        for (key, value) in obj {
566                            rule.insert(key.clone(), json_value_to_serde_value(value));
567                        }
568                        quality_rules.push(rule);
569                    }
570                }
571            } else if let Some(obj) = quality_val.as_object() {
572                // Single quality rule object
573                let mut rule = HashMap::new();
574                for (key, value) in obj {
575                    rule.insert(key.clone(), json_value_to_serde_value(value));
576                }
577                quality_rules.push(rule);
578            } else if let Some(s) = quality_val.as_str() {
579                // Simple string quality value
580                let mut rule = HashMap::new();
581                rule.insert("value".to_string(), Value::String(s.to_string()));
582                quality_rules.push(rule);
583            }
584        }
585
586        // Check for tblproperties field (similar to SQL TBLPROPERTIES)
587        if let Some(tblprops) = data.get("tblproperties")
588            && let Some(obj) = tblprops.as_object()
589        {
590            for (key, value) in obj {
591                let mut rule = HashMap::new();
592                rule.insert("property".to_string(), Value::String(key.clone()));
593                rule.insert("value".to_string(), json_value_to_serde_value(value));
594                quality_rules.push(rule);
595            }
596        }
597
598        quality_rules
599    }
600
601    /// Parse Liquibase YAML changelog format.
602    ///
603    /// Extracts the first `createTable` change from a Liquibase changelog.
604    /// Supports both direct column definitions and nested column structures.
605    ///
606    /// # Supported Format
607    ///
608    fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
609        // Supported Liquibase YAML changelog format:
610        // databaseChangeLog:
611        //   - changeSet:
612        //       - createTable:
613        //           tableName: my_table
614        //           columns:
615        //             - column:
616        //                 name: id
617        //                 type: int
618        //                 constraints:
619        //                   primaryKey: true
620        //                   nullable: false
621
622        let mut errors = Vec::new();
623
624        let changelog = data
625            .get("databaseChangeLog")
626            .and_then(|v| v.as_array())
627            .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
628
629        // Find first createTable change.
630        let mut table_name: Option<String> = None;
631        let mut columns: Vec<crate::models::column::Column> = Vec::new();
632
633        for entry in changelog {
634            // Entries are typically maps like { changeSet: [...] } or { changeSet: { changes: [...] } }
635            if let Some(change_set) = entry.get("changeSet") {
636                // Liquibase YAML can represent changeSet as map or list.
637                let changes = if let Some(obj) = change_set.as_object() {
638                    obj.get("changes")
639                        .and_then(|v| v.as_array())
640                        .cloned()
641                        .unwrap_or_default()
642                } else if let Some(arr) = change_set.as_array() {
643                    // Some variants encode changes directly as list entries inside changeSet.
644                    arr.clone()
645                } else {
646                    Vec::new()
647                };
648
649                for ch in changes {
650                    let create = ch.get("createTable").or_else(|| ch.get("create_table"));
651                    if let Some(create) = create {
652                        table_name = create
653                            .get("tableName")
654                            .or_else(|| create.get("table_name"))
655                            .and_then(|v| v.as_str())
656                            .map(|s| s.to_string());
657
658                        // columns: [ { column: { ... } }, ... ]
659                        if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
660                            for col_entry in cols {
661                                let col = col_entry.get("column").unwrap_or(col_entry);
662                                let name = col
663                                    .get("name")
664                                    .and_then(|v| v.as_str())
665                                    .unwrap_or("")
666                                    .to_string();
667                                let data_type = col
668                                    .get("type")
669                                    .and_then(|v| v.as_str())
670                                    .unwrap_or("")
671                                    .to_string();
672
673                                if name.is_empty() {
674                                    errors.push(ParserError {
675                                        error_type: "validation_error".to_string(),
676                                        field: "columns.name".to_string(),
677                                        message: "Liquibase createTable column missing name"
678                                            .to_string(),
679                                    });
680                                    continue;
681                                }
682
683                                let mut column =
684                                    crate::models::column::Column::new(name, data_type);
685
686                                if let Some(constraints) =
687                                    col.get("constraints").and_then(|v| v.as_object())
688                                {
689                                    if let Some(pk) =
690                                        constraints.get("primaryKey").and_then(|v| v.as_bool())
691                                    {
692                                        column.primary_key = pk;
693                                    }
694                                    if let Some(nullable) =
695                                        constraints.get("nullable").and_then(|v| v.as_bool())
696                                    {
697                                        column.nullable = nullable;
698                                    }
699                                }
700
701                                columns.push(column);
702                            }
703                        }
704
705                        // parse_table() returns a single Table, so we parse the first createTable.
706                        // If multiple tables are needed, call parse_table() multiple times or use import().
707                        break;
708                    }
709                }
710            }
711            if table_name.is_some() {
712                break;
713            }
714        }
715
716        let table_name = table_name
717            .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
718        let table = Table::new(table_name, columns);
719        // Preserve any errors collected.
720        Ok((table, errors))
721    }
722
723    /// Parse ODCS v3.0.x format.
724    fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
725        let mut errors = Vec::new();
726
727        // Extract table name
728        let table_name = data
729            .get("name")
730            .and_then(|v| v.as_str())
731            .map(|s| s.to_string())
732            .or_else(|| {
733                // Try to get from schema array
734                data.get("schema")
735                    .and_then(|v| v.as_array())
736                    .and_then(|arr| arr.first())
737                    .and_then(|obj| obj.as_object())
738                    .and_then(|obj| obj.get("name"))
739                    .and_then(|v| v.as_str())
740                    .map(|s| s.to_string())
741            })
742            .ok_or_else(|| {
743                anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
744            })?;
745
746        // Extract schema - ODCS v3.0.x uses array of SchemaObject
747        let schema = data
748            .get("schema")
749            .and_then(|v| v.as_array())
750            .ok_or_else(|| {
751                errors.push(ParserError {
752                    error_type: "validation_error".to_string(),
753                    field: "schema".to_string(),
754                    message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
755                });
756                anyhow::anyhow!("Missing schema")
757            });
758
759        let schema = match schema {
760            Ok(s) if s.is_empty() => {
761                errors.push(ParserError {
762                    error_type: "validation_error".to_string(),
763                    field: "schema".to_string(),
764                    message: "ODCS v3.0.x schema array is empty".to_string(),
765                });
766                let quality_rules = self.extract_quality_rules(data);
767                let table_uuid = self.extract_table_uuid(data);
768                let table = Table {
769                    id: table_uuid,
770                    name: table_name,
771                    columns: Vec::new(),
772                    database_type: None,
773                    catalog_name: None,
774                    schema_name: None,
775                    medallion_layers: Vec::new(),
776                    scd_pattern: None,
777                    data_vault_classification: None,
778                    modeling_level: None,
779                    tags: Vec::<Tag>::new(),
780                    odcl_metadata: HashMap::new(),
781                    owner: None,
782                    sla: None,
783                    contact_details: None,
784                    infrastructure_type: None,
785                    notes: None,
786                    position: None,
787                    yaml_file_path: None,
788                    drawio_cell_id: None,
789                    quality: quality_rules,
790                    errors: Vec::new(),
791                    created_at: chrono::Utc::now(),
792                    updated_at: chrono::Utc::now(),
793                };
794                return Ok((table, errors));
795            }
796            Ok(s) => s,
797            Err(_) => {
798                let quality_rules = self.extract_quality_rules(data);
799                let table_uuid = self.extract_table_uuid(data);
800                let table = Table {
801                    id: table_uuid,
802                    name: table_name,
803                    columns: Vec::new(),
804                    database_type: None,
805                    catalog_name: None,
806                    schema_name: None,
807                    medallion_layers: Vec::new(),
808                    scd_pattern: None,
809                    data_vault_classification: None,
810                    modeling_level: None,
811                    tags: Vec::<Tag>::new(),
812                    odcl_metadata: HashMap::new(),
813                    owner: None,
814                    sla: None,
815                    contact_details: None,
816                    infrastructure_type: None,
817                    notes: None,
818                    position: None,
819                    yaml_file_path: None,
820                    drawio_cell_id: None,
821                    quality: quality_rules,
822                    errors: Vec::new(),
823                    created_at: chrono::Utc::now(),
824                    updated_at: chrono::Utc::now(),
825                };
826                return Ok((table, errors));
827            }
828        };
829
830        // Get the first schema object (table)
831        let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
832            errors.push(ParserError {
833                error_type: "validation_error".to_string(),
834                field: "schema[0]".to_string(),
835                message: "First schema object must be a dictionary".to_string(),
836            });
837            anyhow::anyhow!("Invalid schema object")
838        })?;
839
840        let object_name = schema_object
841            .get("name")
842            .and_then(|v| v.as_str())
843            .unwrap_or(&table_name);
844
845        // Handle both object format (v3.0.x) and array format (v3.1.0) for properties
846        let mut columns = Vec::new();
847
848        if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
849            // Object format (v3.0.x): properties is a map of name -> property object
850            for (prop_name, prop_data) in properties_obj {
851                if let Some(prop_obj) = prop_data.as_object() {
852                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
853                        Ok(mut cols) => columns.append(&mut cols),
854                        Err(e) => {
855                            errors.push(ParserError {
856                                error_type: "property_parse_error".to_string(),
857                                field: format!("Property '{}'", prop_name),
858                                message: e.to_string(),
859                            });
860                        }
861                    }
862                } else {
863                    errors.push(ParserError {
864                        error_type: "validation_error".to_string(),
865                        field: format!("Property '{}'", prop_name),
866                        message: format!("Property '{}' must be an object", prop_name),
867                    });
868                }
869            }
870        } else if let Some(properties_arr) =
871            schema_object.get("properties").and_then(|v| v.as_array())
872        {
873            // Array format (v3.1.0): properties is an array of property objects with 'name' field
874            for (idx, prop_data) in properties_arr.iter().enumerate() {
875                if let Some(prop_obj) = prop_data.as_object() {
876                    // Extract name from property object (required in v3.1.0)
877                    // ODCS v3.1.0 requires 'name' field, but we'll also accept 'id' as fallback
878                    let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
879                        Some(JsonValue::String(s)) => s.as_str(),
880                        _ => {
881                            // Skip properties without name or id (not valid ODCS v3.1.0)
882                            errors.push(ParserError {
883                                error_type: "validation_error".to_string(),
884                                field: format!("Property[{}]", idx),
885                                message: format!(
886                                    "Property[{}] missing required 'name' or 'id' field",
887                                    idx
888                                ),
889                            });
890                            continue;
891                        }
892                    };
893
894                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
895                        Ok(mut cols) => columns.append(&mut cols),
896                        Err(e) => {
897                            errors.push(ParserError {
898                                error_type: "property_parse_error".to_string(),
899                                field: format!("Property[{}] '{}'", idx, prop_name),
900                                message: e.to_string(),
901                            });
902                        }
903                    }
904                } else {
905                    errors.push(ParserError {
906                        error_type: "validation_error".to_string(),
907                        field: format!("Property[{}]", idx),
908                        message: format!("Property[{}] must be an object", idx),
909                    });
910                }
911            }
912        } else {
913            errors.push(ParserError {
914                error_type: "validation_error".to_string(),
915                field: format!("Object '{}'", object_name),
916                message: format!(
917                    "Object '{}' missing 'properties' field or properties is invalid",
918                    object_name
919                ),
920            });
921        }
922
923        // Extract metadata from customProperties
924        let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
925            _,
926            _,
927            _,
928            Vec<Tag>,
929        ) = self.extract_metadata_from_custom_properties(data);
930
931        // Extract sharedDomains from customProperties
932        let mut shared_domains: Vec<String> = Vec::new();
933        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
934            for prop in custom_props {
935                if let Some(prop_obj) = prop.as_object() {
936                    let prop_key = prop_obj
937                        .get("property")
938                        .and_then(|v| v.as_str())
939                        .unwrap_or("");
940                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
941                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
942                    {
943                        for item in arr {
944                            if let Some(s) = item.as_str() {
945                                shared_domains.push(s.to_string());
946                            }
947                        }
948                    }
949                }
950            }
951        }
952
953        // Extract tags from top-level tags field (if not already extracted from customProperties)
954        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
955            for item in tags_arr {
956                if let Some(s) = item.as_str() {
957                    // Parse tag string to Tag enum
958                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
959                    if !tags.contains(&tag) {
960                        tags.push(tag);
961                    }
962                }
963            }
964        }
965
966        // Extract database type from servers
967        let database_type = self.extract_database_type_from_odcl_v3_servers(data);
968
969        // Extract quality rules
970        let quality_rules = self.extract_quality_rules(data);
971
972        // Build ODCL metadata
973        let mut odcl_metadata = HashMap::new();
974        odcl_metadata.insert(
975            "apiVersion".to_string(),
976            json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
977        );
978        odcl_metadata.insert(
979            "kind".to_string(),
980            json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
981        );
982        odcl_metadata.insert(
983            "id".to_string(),
984            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
985        );
986        odcl_metadata.insert(
987            "version".to_string(),
988            json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
989        );
990        odcl_metadata.insert(
991            "status".to_string(),
992            json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
993        );
994
995        // Extract servicelevels if present
996        if let Some(servicelevels_val) = data.get("servicelevels") {
997            odcl_metadata.insert(
998                "servicelevels".to_string(),
999                json_value_to_serde_value(servicelevels_val),
1000            );
1001        }
1002
1003        // Extract links if present
1004        if let Some(links_val) = data.get("links") {
1005            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1006        }
1007
1008        // Extract domain, dataProduct, tenant
1009        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1010            odcl_metadata.insert(
1011                "domain".to_string(),
1012                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1013            );
1014        }
1015        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1016            odcl_metadata.insert(
1017                "dataProduct".to_string(),
1018                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1019            );
1020        }
1021        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1022            odcl_metadata.insert(
1023                "tenant".to_string(),
1024                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1025            );
1026        }
1027
1028        // Extract top-level description (can be object or string)
1029        if let Some(desc_val) = data.get("description") {
1030            odcl_metadata.insert(
1031                "description".to_string(),
1032                json_value_to_serde_value(desc_val),
1033            );
1034        }
1035
1036        // Extract pricing
1037        if let Some(pricing_val) = data.get("pricing") {
1038            odcl_metadata.insert(
1039                "pricing".to_string(),
1040                json_value_to_serde_value(pricing_val),
1041            );
1042        }
1043
1044        // Extract team
1045        if let Some(team_val) = data.get("team") {
1046            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1047        }
1048
1049        // Extract roles
1050        if let Some(roles_val) = data.get("roles") {
1051            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1052        }
1053
1054        // Extract terms
1055        if let Some(terms_val) = data.get("terms") {
1056            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1057        }
1058
1059        // Extract full servers array (not just type)
1060        if let Some(servers_val) = data.get("servers") {
1061            odcl_metadata.insert(
1062                "servers".to_string(),
1063                json_value_to_serde_value(servers_val),
1064            );
1065        }
1066
1067        // Extract infrastructure
1068        if let Some(infrastructure_val) = data.get("infrastructure") {
1069            odcl_metadata.insert(
1070                "infrastructure".to_string(),
1071                json_value_to_serde_value(infrastructure_val),
1072            );
1073        }
1074
1075        // Store sharedDomains in metadata (extracted from customProperties above)
1076        if !shared_domains.is_empty() {
1077            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1078                .iter()
1079                .map(|d| serde_json::Value::String(d.clone()))
1080                .collect();
1081            odcl_metadata.insert(
1082                "sharedDomains".to_string(),
1083                serde_json::Value::Array(shared_domains_json),
1084            );
1085        }
1086
1087        let table_uuid = self.extract_table_uuid(data);
1088
1089        let table = Table {
1090            id: table_uuid,
1091            name: table_name,
1092            columns,
1093            database_type,
1094            catalog_name: None,
1095            schema_name: None,
1096            medallion_layers,
1097            scd_pattern,
1098            data_vault_classification,
1099            modeling_level: None,
1100            tags,
1101            odcl_metadata,
1102            owner: None,
1103            sla: None,
1104            contact_details: None,
1105            infrastructure_type: None,
1106            notes: None,
1107            position: None,
1108            yaml_file_path: None,
1109            drawio_cell_id: None,
1110            quality: quality_rules,
1111            errors: Vec::new(),
1112            created_at: chrono::Utc::now(),
1113            updated_at: chrono::Utc::now(),
1114        };
1115
1116        info!(
1117            "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1118            table.name,
1119            errors.len()
1120        );
1121        Ok((table, errors))
1122    }
1123
1124    /// Parse a single property from ODCS v3.0.x format (similar to Data Contract field).
1125    fn parse_odcl_v3_property(
1126        &self,
1127        prop_name: &str,
1128        prop_data: &serde_json::Map<String, JsonValue>,
1129        data: &JsonValue,
1130    ) -> Result<Vec<Column>> {
1131        // Reuse Data Contract field parsing logic (they're similar)
1132        let mut errors = Vec::new();
1133        self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1134    }
1135
1136    /// Extract table UUID from ODCS `id` field (standard) or fallback to customProperties/odcl_metadata (backward compatibility).
1137    /// Returns the UUID if found, otherwise generates a new one.
1138    fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1139        // First check the top-level `id` field (ODCS spec: "A unique identifier used to reduce the risk of dataset name collisions, such as a UUID.")
1140        if let Some(id_val) = data.get("id")
1141            && let Some(id_str) = id_val.as_str()
1142        {
1143            if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1144                tracing::debug!(
1145                    "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1146                    uuid
1147                );
1148                return uuid;
1149            } else {
1150                tracing::warn!(
1151                    "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1152                    id_str
1153                );
1154            }
1155        }
1156
1157        // Backward compatibility: check customProperties for tableUuid (legacy format)
1158        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1159            for prop in custom_props {
1160                if let Some(prop_obj) = prop.as_object() {
1161                    let prop_key = prop_obj
1162                        .get("property")
1163                        .and_then(|v| v.as_str())
1164                        .unwrap_or("");
1165                    if prop_key == "tableUuid"
1166                        && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1167                        && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1168                    {
1169                        tracing::debug!(
1170                            "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1171                            uuid
1172                        );
1173                        return uuid;
1174                    }
1175                }
1176            }
1177        }
1178
1179        // Fallback: check odcl_metadata if present (legacy format)
1180        if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1181            && let Some(uuid_val) = metadata.get("tableUuid")
1182            && let Some(uuid_str) = uuid_val.as_str()
1183            && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1184        {
1185            tracing::debug!(
1186                "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1187                uuid
1188            );
1189            return uuid;
1190        }
1191
1192        // Generate deterministic UUID v5 if not found (based on table name)
1193        let table_name = data
1194            .get("name")
1195            .and_then(|v| v.as_str())
1196            .unwrap_or("unknown");
1197        let new_uuid = crate::models::table::Table::generate_id(
1198            table_name, None, // database_type not available here
1199            None, // catalog_name not available here
1200            None, // schema_name not available here
1201        );
1202        tracing::warn!(
1203            "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1204            table_name,
1205            new_uuid
1206        );
1207        new_uuid
1208    }
1209
1210    /// Extract metadata from customProperties in ODCS v3.0.x format.
1211    fn extract_metadata_from_custom_properties(
1212        &self,
1213        data: &JsonValue,
1214    ) -> (
1215        Vec<MedallionLayer>,
1216        Option<SCDPattern>,
1217        Option<DataVaultClassification>,
1218        Vec<Tag>,
1219    ) {
1220        let mut medallion_layers = Vec::new();
1221        let mut scd_pattern = None;
1222        let mut data_vault_classification = None;
1223        let mut tags: Vec<Tag> = Vec::new();
1224
1225        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1226            for prop in custom_props {
1227                if let Some(prop_obj) = prop.as_object() {
1228                    let prop_key = prop_obj
1229                        .get("property")
1230                        .and_then(|v| v.as_str())
1231                        .unwrap_or("");
1232                    let prop_value = prop_obj.get("value");
1233
1234                    match prop_key {
1235                        "medallionLayers" | "medallion_layers" => {
1236                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1237                                for item in arr {
1238                                    if let Some(s) = item.as_str()
1239                                        && let Ok(layer) = parse_medallion_layer(s)
1240                                    {
1241                                        medallion_layers.push(layer);
1242                                    }
1243                                }
1244                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1245                                // Comma-separated string
1246                                for part in s.split(',') {
1247                                    if let Ok(layer) = parse_medallion_layer(part.trim()) {
1248                                        medallion_layers.push(layer);
1249                                    }
1250                                }
1251                            }
1252                        }
1253                        "scdPattern" | "scd_pattern" => {
1254                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1255                                scd_pattern = parse_scd_pattern(s).ok();
1256                            }
1257                        }
1258                        "dataVaultClassification" | "data_vault_classification" => {
1259                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1260                                data_vault_classification = parse_data_vault_classification(s).ok();
1261                            }
1262                        }
1263                        "tags" => {
1264                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1265                                for item in arr {
1266                                    if let Some(s) = item.as_str() {
1267                                        // Parse tag string to Tag enum
1268                                        if let Ok(tag) = Tag::from_str(s) {
1269                                            tags.push(tag);
1270                                        } else {
1271                                            tags.push(Tag::Simple(s.to_string()));
1272                                        }
1273                                    }
1274                                }
1275                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1276                                // Comma-separated string
1277                                for part in s.split(',') {
1278                                    let part = part.trim();
1279                                    if let Ok(tag) = Tag::from_str(part) {
1280                                        tags.push(tag);
1281                                    } else {
1282                                        tags.push(Tag::Simple(part.to_string()));
1283                                    }
1284                                }
1285                            }
1286                        }
1287                        "sharedDomains" | "shared_domains" => {
1288                            // sharedDomains will be stored in metadata by the caller
1289                            // This match is here for completeness but sharedDomains is handled separately
1290                        }
1291                        _ => {}
1292                    }
1293                }
1294            }
1295        }
1296
1297        // Also extract tags from top-level tags field
1298        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1299            for item in tags_arr {
1300                if let Some(s) = item.as_str() {
1301                    // Parse tag string to Tag enum
1302                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1303                    if !tags.contains(&tag) {
1304                        tags.push(tag);
1305                    }
1306                }
1307            }
1308        }
1309
1310        (
1311            medallion_layers,
1312            scd_pattern,
1313            data_vault_classification,
1314            tags,
1315        )
1316    }
1317
1318    /// Extract database type from servers in ODCS v3.0.x format.
1319    fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1320        // ODCS v3.0.x: servers is an array of Server objects
1321        if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1322            && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1323        {
1324            return server_obj
1325                .get("type")
1326                .and_then(|v| v.as_str())
1327                .and_then(|s| self.parse_database_type(s));
1328        }
1329        None
1330    }
1331
1332    /// Parse Data Contract format.
1333    fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1334        let mut errors = Vec::new();
1335
1336        // Extract models
1337        let models = data
1338            .get("models")
1339            .and_then(|v| v.as_object())
1340            .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1341
1342        // parse_table() returns a single Table, so we parse the first model.
1343        // If multiple models are needed, call parse_table() multiple times or use import().
1344        let (model_name, model_data) = models
1345            .iter()
1346            .next()
1347            .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1348
1349        let model_data = model_data
1350            .as_object()
1351            .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1352
1353        // Extract fields (columns)
1354        let fields = model_data
1355            .get("fields")
1356            .and_then(|v| v.as_object())
1357            .ok_or_else(|| {
1358                errors.push(ParserError {
1359                    error_type: "validation_error".to_string(),
1360                    field: format!("Model '{}'", model_name),
1361                    message: format!("Model '{}' missing 'fields' field", model_name),
1362                });
1363                anyhow::anyhow!("Missing fields")
1364            });
1365
1366        let fields = match fields {
1367            Ok(f) => f,
1368            Err(_) => {
1369                // Return empty table with errors
1370                let quality_rules = self.extract_quality_rules(data);
1371                let table_uuid = self.extract_table_uuid(data);
1372                let table = Table {
1373                    id: table_uuid,
1374                    name: model_name.clone(),
1375                    columns: Vec::new(),
1376                    database_type: None,
1377                    catalog_name: None,
1378                    schema_name: None,
1379                    medallion_layers: Vec::new(),
1380                    scd_pattern: None,
1381                    data_vault_classification: None,
1382                    modeling_level: None,
1383                    tags: Vec::<Tag>::new(),
1384                    odcl_metadata: HashMap::new(),
1385                    owner: None,
1386                    sla: None,
1387                    contact_details: None,
1388                    infrastructure_type: None,
1389                    notes: None,
1390                    position: None,
1391                    yaml_file_path: None,
1392                    drawio_cell_id: None,
1393                    quality: quality_rules,
1394                    errors: Vec::new(),
1395                    created_at: chrono::Utc::now(),
1396                    updated_at: chrono::Utc::now(),
1397                };
1398                return Ok((table, errors));
1399            }
1400        };
1401
1402        // Parse fields as columns
1403        let mut columns = Vec::new();
1404        for (field_name, field_data) in fields {
1405            if let Some(field_obj) = field_data.as_object() {
1406                match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1407                    Ok(mut cols) => columns.append(&mut cols),
1408                    Err(e) => {
1409                        errors.push(ParserError {
1410                            error_type: "field_parse_error".to_string(),
1411                            field: format!("Field '{}'", field_name),
1412                            message: e.to_string(),
1413                        });
1414                    }
1415                }
1416            } else {
1417                errors.push(ParserError {
1418                    error_type: "validation_error".to_string(),
1419                    field: format!("Field '{}'", field_name),
1420                    message: format!("Field '{}' must be an object", field_name),
1421                });
1422            }
1423        }
1424
1425        // Extract metadata from info section
1426        // The frontend expects info fields to be nested under "info" key
1427        let mut odcl_metadata = HashMap::new();
1428
1429        // Extract info section and nest it properly
1430        // Convert JsonValue object to serde_json::Value::Object (Map)
1431        if let Some(info_val) = data.get("info") {
1432            // Convert JsonValue to serde_json::Value
1433            let info_json_value = json_value_to_serde_value(info_val);
1434            odcl_metadata.insert("info".to_string(), info_json_value);
1435        }
1436
1437        odcl_metadata.insert(
1438            "dataContractSpecification".to_string(),
1439            json_value_to_serde_value(
1440                data.get("dataContractSpecification")
1441                    .unwrap_or(&JsonValue::Null),
1442            ),
1443        );
1444        odcl_metadata.insert(
1445            "id".to_string(),
1446            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1447        );
1448        // Note: model_name is not added to metadata as it's redundant (it's the table name)
1449
1450        // Extract servicelevels if present
1451        if let Some(servicelevels_val) = data.get("servicelevels") {
1452            odcl_metadata.insert(
1453                "servicelevels".to_string(),
1454                json_value_to_serde_value(servicelevels_val),
1455            );
1456        }
1457
1458        // Extract links if present
1459        if let Some(links_val) = data.get("links") {
1460            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1461        }
1462
1463        // Extract domain, dataProduct, tenant
1464        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1465            odcl_metadata.insert(
1466                "domain".to_string(),
1467                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1468            );
1469        }
1470        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1471            odcl_metadata.insert(
1472                "dataProduct".to_string(),
1473                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1474            );
1475        }
1476        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1477            odcl_metadata.insert(
1478                "tenant".to_string(),
1479                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1480            );
1481        }
1482
1483        // Extract top-level description (can be object or string)
1484        if let Some(desc_val) = data.get("description") {
1485            odcl_metadata.insert(
1486                "description".to_string(),
1487                json_value_to_serde_value(desc_val),
1488            );
1489        }
1490
1491        // Extract pricing
1492        if let Some(pricing_val) = data.get("pricing") {
1493            odcl_metadata.insert(
1494                "pricing".to_string(),
1495                json_value_to_serde_value(pricing_val),
1496            );
1497        }
1498
1499        // Extract team
1500        if let Some(team_val) = data.get("team") {
1501            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1502        }
1503
1504        // Extract roles
1505        if let Some(roles_val) = data.get("roles") {
1506            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1507        }
1508
1509        // Extract terms
1510        if let Some(terms_val) = data.get("terms") {
1511            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1512        }
1513
1514        // Extract full servers array (not just type)
1515        if let Some(servers_val) = data.get("servers") {
1516            odcl_metadata.insert(
1517                "servers".to_string(),
1518                json_value_to_serde_value(servers_val),
1519            );
1520        }
1521
1522        // Extract infrastructure
1523        if let Some(infrastructure_val) = data.get("infrastructure") {
1524            odcl_metadata.insert(
1525                "infrastructure".to_string(),
1526                json_value_to_serde_value(infrastructure_val),
1527            );
1528        }
1529
1530        // Extract database type from servers if available
1531        let database_type = self.extract_database_type_from_servers(data);
1532
1533        // Extract catalog and schema from customProperties
1534        let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1535
1536        // Extract sharedDomains from customProperties
1537        let mut shared_domains: Vec<String> = Vec::new();
1538        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1539            for prop in custom_props {
1540                if let Some(prop_obj) = prop.as_object() {
1541                    let prop_key = prop_obj
1542                        .get("property")
1543                        .and_then(|v| v.as_str())
1544                        .unwrap_or("");
1545                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1546                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1547                    {
1548                        for item in arr {
1549                            if let Some(s) = item.as_str() {
1550                                shared_domains.push(s.to_string());
1551                            }
1552                        }
1553                    }
1554                }
1555            }
1556        }
1557
1558        // Extract tags from top-level tags field (Data Contract format)
1559        let mut tags: Vec<Tag> = Vec::new();
1560        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1561            for item in tags_arr {
1562                if let Some(s) = item.as_str() {
1563                    // Parse tag string to Tag enum (supports Simple, Pair, List formats)
1564                    if let Ok(tag) = Tag::from_str(s) {
1565                        tags.push(tag);
1566                    } else {
1567                        // Fallback: create Simple tag if parsing fails
1568                        tags.push(crate::models::Tag::Simple(s.to_string()));
1569                    }
1570                }
1571            }
1572        }
1573
1574        // Extract quality rules
1575        let quality_rules = self.extract_quality_rules(data);
1576
1577        // Store sharedDomains in metadata
1578        if !shared_domains.is_empty() {
1579            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1580                .iter()
1581                .map(|d| serde_json::Value::String(d.clone()))
1582                .collect();
1583            odcl_metadata.insert(
1584                "sharedDomains".to_string(),
1585                serde_json::Value::Array(shared_domains_json),
1586            );
1587        }
1588
1589        let table_uuid = self.extract_table_uuid(data);
1590
1591        let table = Table {
1592            id: table_uuid,
1593            name: model_name.clone(),
1594            columns,
1595            database_type,
1596            catalog_name,
1597            schema_name,
1598            medallion_layers: Vec::new(),
1599            scd_pattern: None,
1600            data_vault_classification: None,
1601            modeling_level: None,
1602            tags,
1603            odcl_metadata,
1604            owner: None,
1605            sla: None,
1606            contact_details: None,
1607            infrastructure_type: None,
1608            notes: None,
1609            position: None,
1610            yaml_file_path: None,
1611            drawio_cell_id: None,
1612            quality: quality_rules,
1613            errors: Vec::new(),
1614            created_at: chrono::Utc::now(),
1615            updated_at: chrono::Utc::now(),
1616        };
1617
1618        info!(
1619            "Parsed Data Contract table: {} with {} warnings/errors",
1620            model_name,
1621            errors.len()
1622        );
1623        Ok((table, errors))
1624    }
1625
1626    /// Expand a nested column from a schema definition, creating columns with dot notation.
1627    ///
1628    /// This helper function recursively expands nested structures (OBJECT/STRUCT types)
1629    /// into flat columns with dot notation (e.g., "address.street", "address.city").
1630    #[allow(clippy::only_used_in_recursion)]
1631    fn expand_nested_column(
1632        &self,
1633        column_name: &str,
1634        schema: &JsonValue,
1635        nullable: bool,
1636        columns: &mut Vec<Column>,
1637        errors: &mut Vec<ParserError>,
1638    ) {
1639        let schema_obj = match schema.as_object() {
1640            Some(obj) => obj,
1641            None => {
1642                errors.push(ParserError {
1643                    error_type: "parse_error".to_string(),
1644                    field: column_name.to_string(),
1645                    message: "Nested schema must be an object".to_string(),
1646                });
1647                return;
1648            }
1649        };
1650
1651        let schema_type = schema_obj
1652            .get("type")
1653            .and_then(|v| v.as_str())
1654            .unwrap_or("object");
1655
1656        match schema_type {
1657            "object" | "struct" => {
1658                // Check if it has nested properties
1659                if let Some(properties) = schema_obj.get("properties").and_then(|v| v.as_object()) {
1660                    let nested_required: Vec<String> = schema_obj
1661                        .get("required")
1662                        .and_then(|v| v.as_array())
1663                        .map(|arr| {
1664                            arr.iter()
1665                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1666                                .collect()
1667                        })
1668                        .unwrap_or_default();
1669
1670                    for (nested_name, nested_schema) in properties {
1671                        let nested_nullable = !nested_required.contains(nested_name);
1672                        self.expand_nested_column(
1673                            &format!("{}.{}", column_name, nested_name),
1674                            nested_schema,
1675                            nullable || nested_nullable,
1676                            columns,
1677                            errors,
1678                        );
1679                    }
1680                } else {
1681                    // Object without properties - create as OBJECT type
1682                    let description = schema_obj
1683                        .get("description")
1684                        .and_then(|v| v.as_str())
1685                        .unwrap_or("")
1686                        .to_string();
1687                    columns.push(Column {
1688                        name: column_name.to_string(),
1689                        data_type: "OBJECT".to_string(),
1690                        nullable,
1691                        primary_key: false,
1692                        secondary_key: false,
1693                        composite_key: None,
1694                        foreign_key: None,
1695                        constraints: Vec::new(),
1696                        description,
1697                        quality: Vec::new(),
1698                        ref_path: None,
1699                        enum_values: Vec::new(),
1700                        errors: Vec::new(),
1701                        column_order: 0,
1702                    });
1703                }
1704            }
1705            "array" => {
1706                // Handle array types
1707                let items = schema_obj.get("items").unwrap_or(schema);
1708                let items_type = items
1709                    .as_object()
1710                    .and_then(|obj| obj.get("type").and_then(|v| v.as_str()))
1711                    .unwrap_or("string");
1712
1713                if items_type == "object" || items_type == "struct" {
1714                    // Array of objects - expand nested structure
1715                    let description = schema_obj
1716                        .get("description")
1717                        .and_then(|v| v.as_str())
1718                        .unwrap_or("")
1719                        .to_string();
1720                    columns.push(Column {
1721                        name: column_name.to_string(),
1722                        data_type: "ARRAY<OBJECT>".to_string(),
1723                        nullable,
1724                        primary_key: false,
1725                        secondary_key: false,
1726                        composite_key: None,
1727                        foreign_key: None,
1728                        constraints: Vec::new(),
1729                        description,
1730                        quality: Vec::new(),
1731                        ref_path: None,
1732                        enum_values: Vec::new(),
1733                        errors: Vec::new(),
1734                        column_order: 0,
1735                    });
1736                    // Also expand nested properties with array prefix
1737                    if let Some(properties) = items
1738                        .as_object()
1739                        .and_then(|obj| obj.get("properties").and_then(|v| v.as_object()))
1740                    {
1741                        let nested_required: Vec<String> = items
1742                            .as_object()
1743                            .and_then(|obj| obj.get("required").and_then(|v| v.as_array()))
1744                            .map(|arr| {
1745                                arr.iter()
1746                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1747                                    .collect()
1748                            })
1749                            .unwrap_or_default();
1750
1751                        for (nested_name, nested_schema) in properties {
1752                            let nested_nullable = !nested_required.contains(nested_name);
1753                            self.expand_nested_column(
1754                                &format!("{}.{}", column_name, nested_name),
1755                                nested_schema,
1756                                nullable || nested_nullable,
1757                                columns,
1758                                errors,
1759                            );
1760                        }
1761                    }
1762                } else {
1763                    // Array of primitives
1764                    let data_type = format!("ARRAY<{}>", items_type.to_uppercase());
1765                    let description = schema_obj
1766                        .get("description")
1767                        .and_then(|v| v.as_str())
1768                        .unwrap_or("")
1769                        .to_string();
1770                    columns.push(Column {
1771                        name: column_name.to_string(),
1772                        data_type,
1773                        nullable,
1774                        primary_key: false,
1775                        secondary_key: false,
1776                        composite_key: None,
1777                        foreign_key: None,
1778                        constraints: Vec::new(),
1779                        description,
1780                        quality: Vec::new(),
1781                        ref_path: None,
1782                        enum_values: Vec::new(),
1783                        errors: Vec::new(),
1784                        column_order: 0,
1785                    });
1786                }
1787            }
1788            _ => {
1789                // Simple type
1790                let data_type = schema_type.to_uppercase();
1791                let description = schema_obj
1792                    .get("description")
1793                    .and_then(|v| v.as_str())
1794                    .unwrap_or("")
1795                    .to_string();
1796                let enum_values = schema_obj
1797                    .get("enum")
1798                    .and_then(|v| v.as_array())
1799                    .map(|arr| {
1800                        arr.iter()
1801                            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1802                            .collect()
1803                    })
1804                    .unwrap_or_default();
1805                columns.push(Column {
1806                    name: column_name.to_string(),
1807                    data_type,
1808                    nullable,
1809                    primary_key: false,
1810                    secondary_key: false,
1811                    composite_key: None,
1812                    foreign_key: None,
1813                    constraints: Vec::new(),
1814                    description,
1815                    quality: Vec::new(),
1816                    ref_path: None,
1817                    enum_values,
1818                    errors: Vec::new(),
1819                    column_order: 0,
1820                });
1821            }
1822        }
1823    }
1824
1825    /// Parse a single field from Data Contract format.
1826    fn parse_data_contract_field(
1827        &self,
1828        field_name: &str,
1829        field_data: &serde_json::Map<String, JsonValue>,
1830        data: &JsonValue,
1831        errors: &mut Vec<ParserError>,
1832    ) -> Result<Vec<Column>> {
1833        let mut columns = Vec::new();
1834
1835        // Helper function to extract quality rules from a JSON object
1836        let extract_quality_from_obj =
1837            |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1838                let mut quality_rules = Vec::new();
1839                if let Some(quality_val) = obj.get("quality") {
1840                    if let Some(arr) = quality_val.as_array() {
1841                        // Array of quality rules
1842                        for item in arr {
1843                            if let Some(rule_obj) = item.as_object() {
1844                                let mut rule = HashMap::new();
1845                                for (key, value) in rule_obj {
1846                                    rule.insert(key.clone(), json_value_to_serde_value(value));
1847                                }
1848                                quality_rules.push(rule);
1849                            }
1850                        }
1851                    } else if let Some(rule_obj) = quality_val.as_object() {
1852                        // Single quality rule object
1853                        let mut rule = HashMap::new();
1854                        for (key, value) in rule_obj {
1855                            rule.insert(key.clone(), json_value_to_serde_value(value));
1856                        }
1857                        quality_rules.push(rule);
1858                    }
1859                }
1860                quality_rules
1861            };
1862
1863        // Extract description from field_data (preserve empty strings)
1864        let description = field_data
1865            .get("description")
1866            .and_then(|v| v.as_str())
1867            .unwrap_or("")
1868            .to_string();
1869
1870        // Extract quality rules from field_data
1871        let mut quality_rules = extract_quality_from_obj(field_data);
1872
1873        // Check for $ref
1874        if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1875            // Store ref_path (preserve even if definition doesn't exist)
1876            let ref_path = Some(ref_str.to_string());
1877
1878            if let Some(definition) = self.resolve_ref(ref_str, data) {
1879                // Merge quality rules from definition if field doesn't have any
1880
1881                // Also extract quality rules from definition and merge (if field doesn't have any)
1882                if quality_rules.is_empty() {
1883                    if let Some(def_obj) = definition.as_object() {
1884                        quality_rules = extract_quality_from_obj(def_obj);
1885                    }
1886                } else {
1887                    // Merge definition quality rules if field has some
1888                    if let Some(def_obj) = definition.as_object() {
1889                        let def_quality = extract_quality_from_obj(def_obj);
1890                        // Append definition quality rules (field-level takes precedence)
1891                        quality_rules.extend(def_quality);
1892                    }
1893                }
1894
1895                let required = field_data
1896                    .get("required")
1897                    .and_then(|v| v.as_bool())
1898                    .unwrap_or(false);
1899
1900                // If required=true, add not_null quality rule if not present
1901                if required {
1902                    let has_not_null = quality_rules.iter().any(|rule| {
1903                        rule.get("type")
1904                            .and_then(|v| v.as_str())
1905                            .map(|s| {
1906                                s.to_lowercase().contains("not_null")
1907                                    || s.to_lowercase().contains("notnull")
1908                            })
1909                            .unwrap_or(false)
1910                    });
1911                    if !has_not_null {
1912                        let mut not_null_rule = HashMap::new();
1913                        not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
1914                        not_null_rule.insert(
1915                            "description".to_string(),
1916                            serde_json::json!("Column must not be null"),
1917                        );
1918                        quality_rules.push(not_null_rule);
1919                    }
1920                }
1921
1922                // Check if definition is an object/struct with nested structure
1923                let has_nested = definition
1924                    .get("type")
1925                    .and_then(|v| v.as_str())
1926                    .map(|s| s == "object")
1927                    .unwrap_or(false)
1928                    || definition.get("properties").is_some()
1929                    || definition.get("fields").is_some();
1930
1931                if has_nested {
1932                    // Expand STRUCT from definition into nested columns with dot notation
1933                    if let Some(properties) =
1934                        definition.get("properties").and_then(|v| v.as_object())
1935                    {
1936                        // Recursively expand nested properties
1937                        let nested_required: Vec<String> = definition
1938                            .get("required")
1939                            .and_then(|v| v.as_array())
1940                            .map(|arr| {
1941                                arr.iter()
1942                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1943                                    .collect()
1944                            })
1945                            .unwrap_or_default();
1946
1947                        for (nested_name, nested_schema) in properties {
1948                            let nested_required_field = nested_required.contains(nested_name);
1949                            self.expand_nested_column(
1950                                &format!("{}.{}", field_name, nested_name),
1951                                nested_schema,
1952                                !nested_required_field,
1953                                &mut columns,
1954                                errors,
1955                            );
1956                        }
1957                    } else if let Some(fields) =
1958                        definition.get("fields").and_then(|v| v.as_object())
1959                    {
1960                        // Handle fields format (ODCL style)
1961                        for (nested_name, nested_schema) in fields {
1962                            self.expand_nested_column(
1963                                &format!("{}.{}", field_name, nested_name),
1964                                nested_schema,
1965                                true, // Assume nullable if not specified
1966                                &mut columns,
1967                                errors,
1968                            );
1969                        }
1970                    } else {
1971                        // Fallback: create parent column as OBJECT if we can't expand
1972                        columns.push(Column {
1973                            name: field_name.to_string(),
1974                            data_type: "OBJECT".to_string(),
1975                            nullable: !required,
1976                            primary_key: false,
1977                            secondary_key: false,
1978                            composite_key: None,
1979                            foreign_key: None,
1980                            constraints: Vec::new(),
1981                            description: if description.is_empty() {
1982                                definition
1983                                    .get("description")
1984                                    .and_then(|v| v.as_str())
1985                                    .unwrap_or("")
1986                                    .to_string()
1987                            } else {
1988                                description.clone()
1989                            },
1990                            errors: Vec::new(),
1991                            quality: quality_rules.clone(),
1992                            ref_path: ref_path.clone(),
1993                            enum_values: Vec::new(),
1994                            column_order: 0,
1995                        });
1996                    }
1997                } else {
1998                    // Simple type from definition
1999                    let def_type = definition
2000                        .get("type")
2001                        .and_then(|v| v.as_str())
2002                        .unwrap_or("STRING")
2003                        .to_uppercase();
2004
2005                    let enum_values = definition
2006                        .get("enum")
2007                        .and_then(|v| v.as_array())
2008                        .map(|arr| {
2009                            arr.iter()
2010                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
2011                                .collect()
2012                        })
2013                        .unwrap_or_default();
2014
2015                    columns.push(Column {
2016                        name: field_name.to_string(),
2017                        data_type: def_type,
2018                        nullable: !required,
2019                        primary_key: false,
2020                        secondary_key: false,
2021                        composite_key: None,
2022                        foreign_key: None,
2023                        constraints: Vec::new(),
2024                        description: if description.is_empty() {
2025                            definition
2026                                .get("description")
2027                                .and_then(|v| v.as_str())
2028                                .unwrap_or("")
2029                                .to_string()
2030                        } else {
2031                            description
2032                        },
2033                        errors: Vec::new(),
2034                        quality: quality_rules,
2035                        ref_path,
2036                        enum_values,
2037                        column_order: 0,
2038                    });
2039                }
2040                return Ok(columns);
2041            } else {
2042                // Undefined reference - create column with error
2043                let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
2044                let mut error_map = HashMap::new();
2045                error_map.insert("type".to_string(), serde_json::json!("validation_error"));
2046                error_map.insert("field".to_string(), serde_json::json!("data_type"));
2047                error_map.insert(
2048                    "message".to_string(),
2049                    serde_json::json!(format!(
2050                        "Field '{}' references undefined definition: {}",
2051                        field_name, ref_str
2052                    )),
2053                );
2054                col_errors.push(error_map);
2055                columns.push(Column {
2056                    name: field_name.to_string(),
2057                    data_type: "OBJECT".to_string(),
2058                    nullable: true,
2059                    primary_key: false,
2060                    secondary_key: false,
2061                    composite_key: None,
2062                    foreign_key: None,
2063                    constraints: Vec::new(),
2064                    description,
2065                    errors: col_errors,
2066                    quality: Vec::new(),
2067                    ref_path: Some(ref_str.to_string()), // Preserve ref_path even if undefined
2068                    enum_values: Vec::new(),
2069                    column_order: 0,
2070                });
2071                return Ok(columns);
2072            }
2073        }
2074
2075        // Extract field type - default to STRING if missing
2076        let field_type_str = field_data
2077            .get("type")
2078            .and_then(|v| v.as_str())
2079            .unwrap_or("STRING");
2080
2081        // Check if type contains STRUCT definition (multiline STRUCT type)
2082        if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
2083            match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
2084                Ok(nested_cols) if !nested_cols.is_empty() => {
2085                    // We have nested columns - add parent column with full type, then nested columns
2086                    let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
2087                        "ARRAY<STRUCT<...>>".to_string()
2088                    } else {
2089                        "STRUCT<...>".to_string()
2090                    };
2091
2092                    // Add parent column
2093                    columns.push(Column {
2094                        name: field_name.to_string(),
2095                        data_type: parent_data_type,
2096                        nullable: !field_data
2097                            .get("required")
2098                            .and_then(|v| v.as_bool())
2099                            .unwrap_or(false),
2100                        primary_key: false,
2101                        secondary_key: false,
2102                        composite_key: None,
2103                        foreign_key: None,
2104                        constraints: Vec::new(),
2105                        description: description.clone(),
2106                        errors: Vec::new(),
2107                        quality: quality_rules.clone(),
2108                        ref_path: field_data
2109                            .get("$ref")
2110                            .and_then(|v| v.as_str())
2111                            .map(|s| s.to_string()),
2112                        enum_values: Vec::new(),
2113                        column_order: 0,
2114                    });
2115
2116                    // Add nested columns
2117                    columns.extend(nested_cols);
2118                    return Ok(columns);
2119                }
2120                Ok(_) | Err(_) => {
2121                    // If parsing fails or returns empty, fall back to using the type as-is
2122                }
2123            }
2124        }
2125
2126        let field_type = normalize_data_type(field_type_str);
2127
2128        // Handle ARRAY type
2129        if field_type == "ARRAY" {
2130            let items = field_data.get("items");
2131            if let Some(items_val) = items {
2132                if let Some(items_obj) = items_val.as_object() {
2133                    // Check if items is an object with fields (nested structure)
2134                    if items_obj.get("fields").is_some()
2135                        || items_obj.get("type").and_then(|v| v.as_str()) == Some("object")
2136                    {
2137                        // Array of objects - create parent column as ARRAY<OBJECT>
2138                        columns.push(Column {
2139                            name: field_name.to_string(),
2140                            data_type: "ARRAY<OBJECT>".to_string(),
2141                            nullable: !field_data
2142                                .get("required")
2143                                .and_then(|v| v.as_bool())
2144                                .unwrap_or(false),
2145                            primary_key: false,
2146                            secondary_key: false,
2147                            composite_key: None,
2148                            foreign_key: None,
2149                            constraints: Vec::new(),
2150                            description: field_data
2151                                .get("description")
2152                                .and_then(|v| v.as_str())
2153                                .unwrap_or("")
2154                                .to_string(),
2155                            errors: Vec::new(),
2156                            quality: Vec::new(),
2157                            ref_path: None,
2158                            enum_values: Vec::new(),
2159                            column_order: 0,
2160                        });
2161
2162                        // Extract nested fields from items.properties or items.fields if present
2163                        // Note: Export saves nested columns in properties.properties, but some formats use fields
2164                        let nested_fields_obj = items_obj
2165                            .get("properties")
2166                            .and_then(|v| v.as_object())
2167                            .or_else(|| items_obj.get("fields").and_then(|v| v.as_object()));
2168
2169                        if let Some(fields_obj) = nested_fields_obj {
2170                            for (nested_field_name, nested_field_data) in fields_obj {
2171                                if let Some(nested_field_obj) = nested_field_data.as_object() {
2172                                    let nested_field_type = nested_field_obj
2173                                        .get("type")
2174                                        .and_then(|v| v.as_str())
2175                                        .unwrap_or("STRING");
2176
2177                                    // Recursively parse nested fields with array prefix
2178                                    let nested_col_name =
2179                                        format!("{}.{}", field_name, nested_field_name);
2180                                    let mut local_errors = Vec::new();
2181                                    match self.parse_data_contract_field(
2182                                        &nested_col_name,
2183                                        nested_field_obj,
2184                                        data,
2185                                        &mut local_errors,
2186                                    ) {
2187                                        Ok(mut nested_cols) => {
2188                                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2189                                            // Otherwise, it returns a single column
2190                                            columns.append(&mut nested_cols);
2191                                        }
2192                                        Err(_) => {
2193                                            // Fallback: create simple nested column
2194                                            columns.push(Column {
2195                                                name: nested_col_name,
2196                                                data_type: nested_field_type.to_uppercase(),
2197                                                nullable: !nested_field_obj
2198                                                    .get("required")
2199                                                    .and_then(|v| v.as_bool())
2200                                                    .unwrap_or(false),
2201                                                primary_key: false,
2202                                                secondary_key: false,
2203                                                composite_key: None,
2204                                                foreign_key: None,
2205                                                constraints: Vec::new(),
2206                                                description: nested_field_obj
2207                                                    .get("description")
2208                                                    .and_then(|v| v.as_str())
2209                                                    .unwrap_or("")
2210                                                    .to_string(),
2211                                                errors: Vec::new(),
2212                                                quality: Vec::new(),
2213                                                ref_path: None,
2214                                                enum_values: Vec::new(),
2215                                                column_order: 0,
2216                                            });
2217                                        }
2218                                    }
2219                                }
2220                            }
2221                        }
2222
2223                        return Ok(columns);
2224                    } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2225                        // Array of simple type
2226                        columns.push(Column {
2227                            name: field_name.to_string(),
2228                            data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2229                            nullable: !field_data
2230                                .get("required")
2231                                .and_then(|v| v.as_bool())
2232                                .unwrap_or(false),
2233                            primary_key: false,
2234                            secondary_key: false,
2235                            composite_key: None,
2236                            foreign_key: None,
2237                            constraints: Vec::new(),
2238                            description: description.clone(),
2239                            errors: Vec::new(),
2240                            quality: quality_rules.clone(),
2241                            ref_path: field_data
2242                                .get("$ref")
2243                                .and_then(|v| v.as_str())
2244                                .map(|s| s.to_string()),
2245                            enum_values: Vec::new(),
2246                            column_order: 0,
2247                        });
2248                        return Ok(columns);
2249                    }
2250                } else if let Some(item_type_str) = items_val.as_str() {
2251                    // Array of simple type (string)
2252                    columns.push(Column {
2253                        name: field_name.to_string(),
2254                        data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2255                        nullable: !field_data
2256                            .get("required")
2257                            .and_then(|v| v.as_bool())
2258                            .unwrap_or(false),
2259                        primary_key: false,
2260                        secondary_key: false,
2261                        composite_key: None,
2262                        foreign_key: None,
2263                        constraints: Vec::new(),
2264                        description: description.clone(),
2265                        errors: Vec::new(),
2266                        quality: quality_rules.clone(),
2267                        ref_path: field_data
2268                            .get("$ref")
2269                            .and_then(|v| v.as_str())
2270                            .map(|s| s.to_string()),
2271                        enum_values: Vec::new(),
2272                        column_order: 0,
2273                    });
2274                    return Ok(columns);
2275                }
2276            }
2277            // Array without items - default to ARRAY<STRING>
2278            columns.push(Column {
2279                name: field_name.to_string(),
2280                data_type: "ARRAY<STRING>".to_string(),
2281                nullable: !field_data
2282                    .get("required")
2283                    .and_then(|v| v.as_bool())
2284                    .unwrap_or(false),
2285                primary_key: false,
2286                secondary_key: false,
2287                composite_key: None,
2288                foreign_key: None,
2289                constraints: Vec::new(),
2290                description: description.clone(),
2291                errors: Vec::new(),
2292                quality: quality_rules.clone(),
2293                ref_path: field_data
2294                    .get("$ref")
2295                    .and_then(|v| v.as_str())
2296                    .map(|s| s.to_string()),
2297                enum_values: Vec::new(),
2298                column_order: 0,
2299            });
2300            return Ok(columns);
2301        }
2302
2303        // Check if this is a nested object with fields or properties
2304        // Note: Export saves nested columns in properties.properties, but some formats use fields
2305        let nested_fields_obj = field_data
2306            .get("properties")
2307            .and_then(|v| v.as_object())
2308            .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2309
2310        if field_type == "OBJECT"
2311            && let Some(fields_obj) = nested_fields_obj
2312        {
2313            // Inline nested object - create parent column as OBJECT and extract nested fields
2314
2315            // Create parent column
2316            columns.push(Column {
2317                name: field_name.to_string(),
2318                data_type: "OBJECT".to_string(),
2319                nullable: !field_data
2320                    .get("required")
2321                    .and_then(|v| v.as_bool())
2322                    .unwrap_or(false),
2323                primary_key: false,
2324                secondary_key: false,
2325                composite_key: None,
2326                foreign_key: None,
2327                constraints: Vec::new(),
2328                description: description.clone(),
2329                errors: Vec::new(),
2330                quality: quality_rules.clone(),
2331                ref_path: field_data
2332                    .get("$ref")
2333                    .and_then(|v| v.as_str())
2334                    .map(|s| s.to_string()),
2335                enum_values: Vec::new(),
2336                column_order: 0,
2337            });
2338
2339            // Extract nested fields recursively
2340            for (nested_field_name, nested_field_data) in fields_obj {
2341                if let Some(nested_field_obj) = nested_field_data.as_object() {
2342                    let nested_field_type = nested_field_obj
2343                        .get("type")
2344                        .and_then(|v| v.as_str())
2345                        .unwrap_or("STRING");
2346
2347                    // Recursively parse nested fields
2348                    let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2349                    match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data) {
2350                        Ok(mut nested_cols) => {
2351                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2352                            // Otherwise, it returns a single column
2353                            columns.append(&mut nested_cols);
2354                        }
2355                        Err(_) => {
2356                            // Fallback: create simple nested column
2357                            columns.push(Column {
2358                                name: nested_col_name,
2359                                data_type: nested_field_type.to_uppercase(),
2360                                nullable: !nested_field_obj
2361                                    .get("required")
2362                                    .and_then(|v| v.as_bool())
2363                                    .unwrap_or(false),
2364                                primary_key: false,
2365                                secondary_key: false,
2366                                composite_key: None,
2367                                foreign_key: None,
2368                                constraints: Vec::new(),
2369                                description: nested_field_obj
2370                                    .get("description")
2371                                    .and_then(|v| v.as_str())
2372                                    .unwrap_or("")
2373                                    .to_string(),
2374                                errors: Vec::new(),
2375                                quality: Vec::new(),
2376                                ref_path: None,
2377                                enum_values: Vec::new(),
2378                                column_order: 0,
2379                            });
2380                        }
2381                    }
2382                }
2383            }
2384
2385            return Ok(columns);
2386        }
2387
2388        // Regular field (no $ref or $ref not found)
2389        // Check for $ref even in non-$ref path (in case $ref doesn't resolve)
2390        let ref_path = field_data
2391            .get("$ref")
2392            .and_then(|v| v.as_str())
2393            .map(|s| s.to_string());
2394
2395        let required = field_data
2396            .get("required")
2397            .and_then(|v| v.as_bool())
2398            .unwrap_or(false);
2399
2400        // Use description extracted at function start, or extract if not yet extracted
2401        let field_description = if description.is_empty() {
2402            field_data
2403                .get("description")
2404                .and_then(|v| v.as_str())
2405                .unwrap_or("")
2406                .to_string()
2407        } else {
2408            description
2409        };
2410
2411        // Use quality_rules extracted at function start, or extract if not yet extracted
2412        let mut column_quality_rules = quality_rules;
2413
2414        // Extract column-level quality rules if not already extracted
2415        if column_quality_rules.is_empty()
2416            && let Some(quality_val) = field_data.get("quality")
2417        {
2418            if let Some(arr) = quality_val.as_array() {
2419                // Array of quality rules
2420                for item in arr {
2421                    if let Some(obj) = item.as_object() {
2422                        let mut rule = HashMap::new();
2423                        for (key, value) in obj {
2424                            rule.insert(key.clone(), json_value_to_serde_value(value));
2425                        }
2426                        column_quality_rules.push(rule);
2427                    }
2428                }
2429            } else if let Some(obj) = quality_val.as_object() {
2430                // Single quality rule object
2431                let mut rule = HashMap::new();
2432                for (key, value) in obj {
2433                    rule.insert(key.clone(), json_value_to_serde_value(value));
2434                }
2435                column_quality_rules.push(rule);
2436            }
2437        }
2438
2439        // If required=true (nullable=false), add a "not_null" quality rule if not already present
2440        if required {
2441            let has_not_null = column_quality_rules.iter().any(|rule| {
2442                rule.get("type")
2443                    .and_then(|v| v.as_str())
2444                    .map(|s| {
2445                        s.to_lowercase().contains("not_null")
2446                            || s.to_lowercase().contains("notnull")
2447                    })
2448                    .unwrap_or(false)
2449            });
2450            if !has_not_null {
2451                let mut not_null_rule = HashMap::new();
2452                not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
2453                not_null_rule.insert(
2454                    "description".to_string(),
2455                    serde_json::json!("Column must not be null"),
2456                );
2457                column_quality_rules.push(not_null_rule);
2458            }
2459        }
2460
2461        columns.push(Column {
2462            name: field_name.to_string(),
2463            data_type: field_type,
2464            nullable: !required,
2465            primary_key: field_data
2466                .get("primaryKey")
2467                .and_then(|v| v.as_bool())
2468                .unwrap_or(false),
2469            secondary_key: false,
2470            composite_key: None,
2471            foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2472            constraints: Vec::new(),
2473            description: field_description,
2474            errors: Vec::new(),
2475            quality: column_quality_rules,
2476            ref_path,
2477            enum_values: Vec::new(),
2478            column_order: 0,
2479        });
2480
2481        Ok(columns)
2482    }
2483
2484    /// Parse foreign key from Data Contract field data.
2485    fn parse_foreign_key_from_data_contract(
2486        &self,
2487        field_data: &serde_json::Map<String, JsonValue>,
2488    ) -> Option<ForeignKey> {
2489        field_data
2490            .get("foreignKey")
2491            .and_then(|v| v.as_object())
2492            .map(|fk_obj| ForeignKey {
2493                table_id: fk_obj
2494                    .get("table")
2495                    .or_else(|| fk_obj.get("table_id"))
2496                    .and_then(|v| v.as_str())
2497                    .unwrap_or("")
2498                    .to_string(),
2499                column_name: fk_obj
2500                    .get("column")
2501                    .or_else(|| fk_obj.get("column_name"))
2502                    .and_then(|v| v.as_str())
2503                    .unwrap_or("")
2504                    .to_string(),
2505            })
2506    }
2507
2508    /// Extract database type from servers in Data Contract format.
2509    fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2510        // Data Contract format: servers can be object or array
2511        if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2512            // Object format: { "server_name": { "type": "..." } }
2513            if let Some((_, server_data)) = servers_obj.iter().next()
2514                && let Some(server_obj) = server_data.as_object()
2515            {
2516                return server_obj
2517                    .get("type")
2518                    .and_then(|v| v.as_str())
2519                    .and_then(|s| self.parse_database_type(s));
2520            }
2521        } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2522            // Array format: [ { "server": "...", "type": "..." } ]
2523            if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2524                return server_obj
2525                    .get("type")
2526                    .and_then(|v| v.as_str())
2527                    .and_then(|s| self.parse_database_type(s));
2528            }
2529        }
2530        None
2531    }
2532
2533    /// Parse database type string to enum.
2534    fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2535        match s.to_lowercase().as_str() {
2536            "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2537            "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2538            "mysql" => Some(DatabaseType::Mysql),
2539            "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2540            "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2541            _ => None,
2542        }
2543    }
2544
2545    /// Parse STRUCT type definition from string (e.g., "ARRAY<STRUCT<ID: STRING, NAME: STRING>>").
2546    fn parse_struct_type_from_string(
2547        &self,
2548        field_name: &str,
2549        type_str: &str,
2550        field_data: &serde_json::Map<String, JsonValue>,
2551    ) -> Result<Vec<Column>> {
2552        let mut columns = Vec::new();
2553
2554        // Normalize whitespace - replace newlines and multiple spaces with single space
2555        let normalized_type = type_str
2556            .lines()
2557            .map(|line| line.trim())
2558            .filter(|line| !line.is_empty())
2559            .collect::<Vec<_>>()
2560            .join(" ");
2561
2562        let type_str_upper = normalized_type.to_uppercase();
2563
2564        // Check if it's ARRAY<STRUCT<...>>
2565        let _is_array = type_str_upper.starts_with("ARRAY<");
2566        let struct_start = type_str_upper.find("STRUCT<");
2567
2568        if let Some(start_pos) = struct_start {
2569            // Extract STRUCT content
2570            let struct_content = &normalized_type[start_pos + 7..]; // Skip "STRUCT<"
2571
2572            // Find matching closing bracket, handling nested STRUCTs
2573            let mut depth = 1;
2574            let mut end_pos = None;
2575            for (i, ch) in struct_content.char_indices() {
2576                match ch {
2577                    '<' => depth += 1,
2578                    '>' => {
2579                        depth -= 1;
2580                        if depth == 0 {
2581                            end_pos = Some(i);
2582                            break;
2583                        }
2584                    }
2585                    _ => {}
2586                }
2587            }
2588
2589            // If no closing bracket found, try to infer it (handle malformed YAML)
2590            let struct_fields_str = if let Some(end) = end_pos {
2591                &struct_content[..end]
2592            } else {
2593                // Missing closing bracket - use everything up to the end
2594                struct_content.trim_end_matches('>').trim()
2595            };
2596
2597            // Parse fields: "ID: STRING, NAME: STRING, ..."
2598            let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2599
2600            // Create nested columns, handling nested STRUCTs recursively
2601            for (nested_name, nested_type) in fields {
2602                let nested_type_upper = nested_type.to_uppercase();
2603                let nested_col_name = format!("{}.{}", field_name, nested_name);
2604
2605                // Check if this field is itself a STRUCT or ARRAY<STRUCT>
2606                if nested_type_upper.starts_with("STRUCT<") {
2607                    // Recursively parse nested STRUCT by creating a synthetic field_data
2608                    // and calling parse_struct_type_from_string recursively
2609                    let nested_struct_type_str = if nested_type_upper.starts_with("ARRAY<STRUCT<") {
2610                        // Handle ARRAY<STRUCT<...>>
2611                        nested_type.clone()
2612                    } else {
2613                        // Handle STRUCT<...>
2614                        nested_type.clone()
2615                    };
2616
2617                    // Recursively parse the nested STRUCT
2618                    match self.parse_struct_type_from_string(
2619                        &nested_col_name,
2620                        &nested_struct_type_str,
2621                        field_data,
2622                    ) {
2623                        Ok(nested_cols) => {
2624                            // Add all recursively parsed columns
2625                            columns.extend(nested_cols);
2626                        }
2627                        Err(_) => {
2628                            // If recursive parsing fails, add the parent column with STRUCT type
2629                            columns.push(Column {
2630                                name: nested_col_name,
2631                                data_type: normalize_data_type(&nested_type),
2632                                nullable: !field_data
2633                                    .get("required")
2634                                    .and_then(|v| v.as_bool())
2635                                    .unwrap_or(false),
2636                                primary_key: false,
2637                                secondary_key: false,
2638                                composite_key: None,
2639                                foreign_key: None,
2640                                constraints: Vec::new(),
2641                                description: field_data
2642                                    .get("description")
2643                                    .and_then(|v| v.as_str())
2644                                    .unwrap_or("")
2645                                    .to_string(),
2646                                errors: Vec::new(),
2647                                quality: Vec::new(),
2648                                ref_path: None,
2649                                enum_values: Vec::new(),
2650                                column_order: 0,
2651                            });
2652                        }
2653                    }
2654                } else {
2655                    // Simple nested field (not a STRUCT)
2656                    columns.push(Column {
2657                        name: nested_col_name,
2658                        data_type: normalize_data_type(&nested_type),
2659                        nullable: !field_data
2660                            .get("required")
2661                            .and_then(|v| v.as_bool())
2662                            .unwrap_or(false),
2663                        primary_key: false,
2664                        secondary_key: false,
2665                        composite_key: None,
2666                        foreign_key: None,
2667                        constraints: Vec::new(),
2668                        description: field_data
2669                            .get("description")
2670                            .and_then(|v| v.as_str())
2671                            .unwrap_or("")
2672                            .to_string(),
2673                        errors: Vec::new(),
2674                        quality: Vec::new(),
2675                        ref_path: None,
2676                        enum_values: Vec::new(),
2677                        column_order: 0,
2678                    });
2679                }
2680            }
2681
2682            return Ok(columns);
2683        }
2684
2685        // If no STRUCT found, return empty (fallback to regular parsing)
2686        Ok(Vec::new())
2687    }
2688
2689    /// Parse STRUCT fields from string (e.g., "ID: STRING, NAME: STRING").
2690    fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2691        let mut fields = Vec::new();
2692        let mut current_field = String::new();
2693        let mut depth = 0;
2694        let mut in_string = false;
2695        let mut string_char = None;
2696
2697        for ch in fields_str.chars() {
2698            match ch {
2699                '\'' | '"' if !in_string || Some(ch) == string_char => {
2700                    if in_string {
2701                        in_string = false;
2702                        string_char = None;
2703                    } else {
2704                        in_string = true;
2705                        string_char = Some(ch);
2706                    }
2707                    current_field.push(ch);
2708                }
2709                '<' if !in_string => {
2710                    depth += 1;
2711                    current_field.push(ch);
2712                }
2713                '>' if !in_string => {
2714                    depth -= 1;
2715                    current_field.push(ch);
2716                }
2717                ',' if !in_string && depth == 0 => {
2718                    // End of current field
2719                    let trimmed = current_field.trim();
2720                    if !trimmed.is_empty()
2721                        && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2722                    {
2723                        fields.push((name, type_part));
2724                    }
2725                    current_field.clear();
2726                }
2727                _ => {
2728                    current_field.push(ch);
2729                }
2730            }
2731        }
2732
2733        // Handle last field
2734        let trimmed = current_field.trim();
2735        if !trimmed.is_empty()
2736            && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2737        {
2738            fields.push((name, type_part));
2739        }
2740
2741        Ok(fields)
2742    }
2743
2744    /// Parse a single field definition (e.g., "ID: STRING" or "ALERTOPERATION: STRUCT<...>").
2745    fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
2746        // Split by colon, but handle nested STRUCTs
2747        let colon_pos = field_def.find(':')?;
2748        let name = field_def[..colon_pos].trim().to_string();
2749        let type_part = field_def[colon_pos + 1..].trim().to_string();
2750
2751        if name.is_empty() || type_part.is_empty() {
2752            return None;
2753        }
2754
2755        Some((name, type_part))
2756    }
2757
2758    /// Extract catalog and schema from customProperties.
2759    fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
2760        let mut catalog_name = None;
2761        let mut schema_name = None;
2762
2763        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2764            for prop in custom_props {
2765                if let Some(prop_obj) = prop.as_object() {
2766                    let prop_key = prop_obj
2767                        .get("property")
2768                        .and_then(|v| v.as_str())
2769                        .unwrap_or("");
2770                    let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
2771
2772                    match prop_key {
2773                        "catalogName" | "catalog_name" => {
2774                            catalog_name = prop_value.map(|s| s.to_string());
2775                        }
2776                        "schemaName" | "schema_name" => {
2777                            schema_name = prop_value.map(|s| s.to_string());
2778                        }
2779                        _ => {}
2780                    }
2781                }
2782            }
2783        }
2784
2785        // Also check direct fields
2786        if catalog_name.is_none() {
2787            catalog_name = data
2788                .get("catalog_name")
2789                .and_then(|v| v.as_str())
2790                .map(|s| s.to_string());
2791        }
2792        if schema_name.is_none() {
2793            schema_name = data
2794                .get("schema_name")
2795                .and_then(|v| v.as_str())
2796                .map(|s| s.to_string());
2797        }
2798
2799        (catalog_name, schema_name)
2800    }
2801}
2802
2803impl Default for ODCSImporter {
2804    fn default() -> Self {
2805        Self::new()
2806    }
2807}
2808
/// A single problem reported while parsing, kept as plain strings for detailed error reporting.
#[derive(Clone, Debug)]
pub struct ParserError {
    /// Category of the error (free-form text).
    pub error_type: String,
    /// Name of the field the error refers to.
    pub field: String,
    /// Human-readable description of the problem.
    pub message: String,
}
2816
2817/// Convert YAML Value to JSON Value for easier manipulation.
2818fn yaml_to_json_value(yaml: &serde_yaml::Value) -> Result<JsonValue> {
2819    // Convert YAML to JSON via serialization
2820    let json_str = serde_json::to_string(yaml).context("Failed to convert YAML to JSON")?;
2821    serde_json::from_str(&json_str).context("Failed to parse JSON")
2822}
2823
2824/// Convert JSON Value to serde_json::Value for storage in HashMap.
2825fn json_value_to_serde_value(value: &JsonValue) -> serde_json::Value {
2826    value.clone()
2827}
2828
/// Normalize data type to uppercase, preserving STRUCT<...>, ARRAY<...>, MAP<...> format.
///
/// * A type that starts with `STRUCT`, `ARRAY` or `MAP` (ASCII case-insensitive) gets its
///   keyword upper-cased. If it contains a `<` followed somewhere later by a `>`, the result
///   is `KEYWORD<inner>` where `inner` is the text between the first `<` and the last `>`,
///   left exactly as written. Otherwise everything after the keyword is appended unchanged.
/// * Any other type is upper-cased as a whole.
/// * The empty string is returned unchanged.
///
/// Malformed bracket input such as `"array>x<"` is passed through with only the keyword
/// upper-cased instead of panicking on an inverted slice range.
fn normalize_data_type(data_type: &str) -> String {
    const GENERIC_KEYWORDS: [&str; 3] = ["STRUCT", "ARRAY", "MAP"];

    for keyword in GENERIC_KEYWORDS.iter().copied() {
        // Compare the leading bytes ASCII-case-insensitively. `get` returns `None` when the
        // input is shorter than the keyword or the cut is not on a char boundary, so the
        // slice by `keyword.len()` below is always valid once this check has passed.
        let has_keyword = data_type
            .get(..keyword.len())
            .is_some_and(|prefix| prefix.eq_ignore_ascii_case(keyword));
        if !has_keyword {
            continue;
        }

        // Text between the first `<` and the last `>`, only when they are in that order.
        let inner = data_type
            .find('<')
            .zip(data_type.rfind('>'))
            .filter(|&(open, close)| open < close)
            .map(|(open, close)| &data_type[open + 1..close]);

        return match inner {
            Some(inner) => format!("{keyword}<{inner}>"),
            None => format!("{keyword}{}", &data_type[keyword.len()..]),
        };
    }

    data_type.to_uppercase()
}
2866
2867// Helper functions for enum parsing
2868fn parse_medallion_layer(s: &str) -> Result<MedallionLayer> {
2869    match s.to_uppercase().as_str() {
2870        "BRONZE" => Ok(MedallionLayer::Bronze),
2871        "SILVER" => Ok(MedallionLayer::Silver),
2872        "GOLD" => Ok(MedallionLayer::Gold),
2873        "OPERATIONAL" => Ok(MedallionLayer::Operational),
2874        _ => Err(anyhow::anyhow!("Unknown medallion layer: {}", s)),
2875    }
2876}
2877
2878fn parse_scd_pattern(s: &str) -> Result<SCDPattern> {
2879    match s.to_uppercase().as_str() {
2880        "TYPE_1" | "TYPE1" => Ok(SCDPattern::Type1),
2881        "TYPE_2" | "TYPE2" => Ok(SCDPattern::Type2),
2882        _ => Err(anyhow::anyhow!("Unknown SCD pattern: {}", s)),
2883    }
2884}
2885
2886fn parse_data_vault_classification(s: &str) -> Result<DataVaultClassification> {
2887    match s.to_uppercase().as_str() {
2888        "HUB" => Ok(DataVaultClassification::Hub),
2889        "LINK" => Ok(DataVaultClassification::Link),
2890        "SATELLITE" | "SAT" => Ok(DataVaultClassification::Satellite),
2891        _ => Err(anyhow::anyhow!("Unknown Data Vault classification: {}", s)),
2892    }
2893}
2894
// Unit tests for `ODCSImporter::parse` using the simple ODCL layout
// (top-level `name` and `columns` keys).
#[cfg(test)]
mod tests {
    use super::*;

    /// A minimal table with two columns and a `database_type` parses without
    /// errors and exposes the name, column count, first column name and
    /// database type.
    #[test]
    fn test_parse_simple_odcl_table() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
        assert_eq!(errors.len(), 0);
    }

    /// `medallion_layer` (given in lowercase) and `scd_pattern` are parsed
    /// into their enum values, and `odcl_metadata` is carried onto the table.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
        // NOTE(review): this check is skipped silently when `description` is
        // missing or not a string; consider asserting its presence.
        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
            assert_eq!(desc, "User table");
        }
        assert_eq!(errors.len(), 0);
    }

    /// `data_vault_classification: Hub` is parsed into
    /// `DataVaultClassification::Hub`.
    #[test]
    fn test_parse_odcl_with_data_vault() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: hub_customer
columns:
  - name: customer_key
    data_type: VARCHAR(50)
data_vault_classification: Hub
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(
            table.data_vault_classification,
            Some(DataVaultClassification::Hub)
        );
        assert_eq!(errors.len(), 0);
    }

    /// Input that is not well-formed YAML makes `parse` return `Err`.
    #[test]
    fn test_parse_invalid_odcl() {
        let mut parser = ODCSImporter::new();
        let invalid_yaml = "not: valid: yaml: structure:";

        // Should fail to parse YAML
        assert!(parser.parse(invalid_yaml).is_err());
    }

    /// A document with a `name` but no `columns` key makes `parse` return
    /// `Err`.
    #[test]
    fn test_parse_odcl_missing_required_fields() {
        let mut parser = ODCSImporter::new();
        let non_conformant = r#"
name: users
# Missing required columns field
"#;

        // Should fail with missing columns
        assert!(parser.parse(non_conformant).is_err());
    }

    /// A column-level `foreign_key` mapping is attached to the column and its
    /// `table_id` is kept as written.
    #[test]
    fn test_parse_odcl_with_foreign_key() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: orders
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: user_id
    data_type: INT
    foreign_key:
      table_id: users
      column_name: id
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        // Look the column up by name so the test does not depend on column order.
        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
        assert!(user_id_col.foreign_key.is_some());
        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
        assert_eq!(errors.len(), 0);
    }

    /// A column-level `constraints` list is kept on the column, and
    /// `nullable: false` is honoured.
    #[test]
    fn test_parse_odcl_with_constraints() {
        let mut parser = ODCSImporter::new();
        let odcl_yaml = r#"
name: products
columns:
  - name: id
    data_type: INT
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
    constraints:
      - UNIQUE
      - NOT NULL
"#;

        let (table, errors) = parser.parse(odcl_yaml).unwrap();
        assert_eq!(table.columns.len(), 2);
        // Look the column up by name so the test does not depend on column order.
        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
        assert!(!name_col.nullable);
        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
        assert_eq!(errors.len(), 0);
    }
}