data_modelling_sdk/import/
odcs.rs

1//! ODCS parser service for parsing Open Data Contract Standard YAML files.
2//!
3//! This service parses ODCS (Open Data Contract Standard) v3.1.0 and legacy ODCL (Data Contract Specification) YAML files
4//! and converts them to Table models. ODCL files are automatically converted to ODCS v3.1.0 format.
5//! Supports multiple formats:
6//! - ODCS v3.1.0 / v3.0.x format (apiVersion, kind, schema) - PRIMARY FORMAT
7//! - ODCL (Data Contract Specification) format (dataContractSpecification, models, definitions) - LEGACY, converted to ODCS
8//! - Simple ODCL format (name, columns) - LEGACY, converted to ODCS
9//! - Liquibase format
10
11use super::{ImportError, ImportResult, TableData};
12use crate::models::column::ForeignKey;
13use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
14use crate::models::{Column, Table, Tag};
15use anyhow::{Context, Result};
16use serde_json::Value as JsonValue;
17use std::collections::HashMap;
18use std::str::FromStr;
19use tracing::info;
20
21/// ODCS parser service for parsing Open Data Contract Standard YAML files.
22/// Handles ODCS v3.1.0 (primary format) and legacy ODCL formats (converted to ODCS).
23pub struct ODCSImporter {
24    /// Current YAML data for $ref resolution
25    current_yaml_data: Option<serde_yaml::Value>,
26}
27
28impl ODCSImporter {
29    /// Create a new ODCS parser instance.
30    ///
31    /// # Example
32    ///
33    /// ```rust
34    /// use data_modelling_sdk::import::odcs::ODCSImporter;
35    ///
36    /// let mut importer = ODCSImporter::new();
37    /// ```
38    pub fn new() -> Self {
39        Self {
40            current_yaml_data: None,
41        }
42    }
43
44    /// Import ODCS/ODCL YAML content and create Table (SDK interface).
45    ///
46    /// Supports ODCS v3.1.0 (primary), legacy ODCL formats (converted to ODCS), and Liquibase formats.
47    ///
48    /// # Arguments
49    ///
50    /// * `yaml_content` - ODCS/ODCL YAML content as a string
51    ///
52    /// # Returns
53    ///
54    /// An `ImportResult` containing the extracted table and any parse errors.
55    ///
56    /// # Example
57    ///
58    /// ```rust
59    /// use data_modelling_sdk::import::odcs::ODCSImporter;
60    ///
61    /// let mut importer = ODCSImporter::new();
62    /// let yaml = r#"
63    /// apiVersion: v3.1.0
64    /// kind: DataContract
65    /// id: 550e8400-e29b-41d4-a716-446655440000
66    /// version: 1.0.0
67    /// name: users
68    /// schema:
69    ///   fields:
70    ///     - name: id
71    ///       type: bigint
72    /// "#;
73    /// let result = importer.import(yaml).unwrap();
74    /// assert_eq!(result.tables.len(), 1);
75    /// ```
76    pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
77        match self.parse(yaml_content) {
78            Ok((table, errors)) => {
79                let sdk_tables = vec![TableData {
80                    table_index: 0,
81                    name: Some(table.name.clone()),
82                    columns: table
83                        .columns
84                        .iter()
85                        .map(|c| super::ColumnData {
86                            name: c.name.clone(),
87                            data_type: c.data_type.clone(),
88                            nullable: c.nullable,
89                            primary_key: c.primary_key,
90                            description: if c.description.is_empty() {
91                                None
92                            } else {
93                                Some(c.description.clone())
94                            },
95                            quality: if c.quality.is_empty() {
96                                None
97                            } else {
98                                Some(c.quality.clone())
99                            },
100                            ref_path: c.ref_path.clone(),
101                        })
102                        .collect(),
103                }];
104                let sdk_errors: Vec<ImportError> = errors
105                    .iter()
106                    .map(|e| ImportError::ParseError(e.message.clone()))
107                    .collect();
108                Ok(ImportResult {
109                    tables: sdk_tables,
110                    tables_requiring_name: Vec::new(),
111                    errors: sdk_errors,
112                    ai_suggestions: None,
113                })
114            }
115            Err(e) => Err(ImportError::ParseError(e.to_string())),
116        }
117    }
118
119    /// Parse ODCS/ODCL YAML content and create Table (public method for native app use).
120    ///
121    /// This method returns the full Table object with all metadata, suitable for use in
122    /// native applications that need direct access to the parsed table structure.
123    /// For API use, prefer the `import()` method which returns ImportResult.
124    ///
125    /// # Returns
126    ///
127    /// Returns a tuple of (Table, list of errors/warnings).
128    /// Errors list is empty if parsing is successful.
129    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
130        self.parse(yaml_content)
131    }
132
133    /// Parse ODCS/ODCL YAML content and create Table (internal method).
134    ///
135    /// Supports ODCS v3.1.0/v3.0.x (primary), ODCL Data Contract spec (legacy, converted to ODCS),
136    /// simple ODCL (legacy, converted to ODCS), and Liquibase formats.
137    ///
138    /// # Returns
139    ///
140    /// Returns a tuple of (Table, list of errors/warnings).
141    /// Errors list is empty if parsing is successful.
142    fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
143        // Errors are collected in helper functions, not here
144        let _errors: Vec<ParserError> = Vec::new();
145
146        // Parse YAML
147        let data: serde_yaml::Value =
148            serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
149
150        if data.is_null() {
151            return Err(anyhow::anyhow!("Empty YAML content"));
152        }
153
154        // Store current YAML data for $ref resolution
155        self.current_yaml_data = Some(data.clone());
156
157        // Convert to JSON Value for easier manipulation
158        let json_data = yaml_to_json_value(&data)?;
159
160        // Check format and parse accordingly
161        if self.is_liquibase_format(&json_data) {
162            return self.parse_liquibase(&json_data);
163        }
164
165        if self.is_odcl_v3_format(&json_data) {
166            return self.parse_odcl_v3(&json_data);
167        }
168
169        if self.is_data_contract_format(&json_data) {
170            return self.parse_data_contract(&json_data);
171        }
172
173        // Fall back to simple ODCL format
174        self.parse_simple_odcl(&json_data)
175    }
176
177    /// Resolve a $ref reference like '#/definitions/betAction'.
178    fn resolve_ref<'a>(&self, ref_str: &str, data: &'a JsonValue) -> Option<&'a JsonValue> {
179        if !ref_str.starts_with("#/") {
180            return None;
181        }
182
183        // Remove the leading '#/'
184        let path = &ref_str[2..];
185        let parts: Vec<&str> = path.split('/').collect();
186
187        // Navigate through the data structure
188        let mut current = data;
189        for part in parts {
190            current = current.get(part)?;
191        }
192
193        if current.is_object() {
194            Some(current)
195        } else {
196            None
197        }
198    }
199
200    /// Check if YAML is in Liquibase format.
201    fn is_liquibase_format(&self, data: &JsonValue) -> bool {
202        if data.get("databaseChangeLog").is_some() {
203            return true;
204        }
205        // Check for Liquibase-specific keys
206        if let Some(obj) = data.as_object() {
207            let obj_str = format!("{:?}", obj);
208            if obj_str.contains("changeSet") {
209                return true;
210            }
211        }
212        false
213    }
214
215    /// Check if YAML is in ODCS v3.0.x format.
216    fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
217        if let Some(obj) = data.as_object() {
218            let has_api_version = obj.contains_key("apiVersion");
219            let has_kind = obj
220                .get("kind")
221                .and_then(|v| v.as_str())
222                .map(|s| s == "DataContract")
223                .unwrap_or(false);
224            let has_id = obj.contains_key("id");
225            let has_version = obj.contains_key("version");
226            return has_api_version && has_kind && has_id && has_version;
227        }
228        false
229    }
230
231    /// Check if YAML is in Data Contract specification format.
232    fn is_data_contract_format(&self, data: &JsonValue) -> bool {
233        if let Some(obj) = data.as_object() {
234            let has_spec = obj.contains_key("dataContractSpecification");
235            let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
236            return has_spec && has_models;
237        }
238        false
239    }
240
241    /// Parse simple ODCL format.
242    fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
243        let mut errors = Vec::new();
244
245        // Extract table name
246        let name = data
247            .get("name")
248            .and_then(|v| v.as_str())
249            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
250            .to_string();
251
252        // Extract columns
253        let columns_data = data
254            .get("columns")
255            .and_then(|v| v.as_array())
256            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
257
258        let mut columns = Vec::new();
259        for (idx, col_data) in columns_data.iter().enumerate() {
260            match self.parse_column(col_data) {
261                Ok(col) => columns.push(col),
262                Err(e) => {
263                    errors.push(ParserError {
264                        error_type: "column_parse_error".to_string(),
265                        field: format!("columns[{}]", idx),
266                        message: e.to_string(),
267                    });
268                }
269            }
270        }
271
272        // Extract metadata
273        let database_type = self.extract_database_type(data);
274        let medallion_layers = self.extract_medallion_layers(data);
275        let scd_pattern = self.extract_scd_pattern(data);
276        let data_vault_classification = self.extract_data_vault_classification(data);
277        let quality_rules = self.extract_quality_rules(data);
278
279        // Validate pattern exclusivity
280        if scd_pattern.is_some() && data_vault_classification.is_some() {
281            errors.push(ParserError {
282                error_type: "validation_error".to_string(),
283                field: "patterns".to_string(),
284                message: "SCD pattern and Data Vault classification are mutually exclusive"
285                    .to_string(),
286            });
287        }
288
289        // Extract odcl_metadata
290        let mut odcl_metadata = HashMap::new();
291        if let Some(metadata) = data.get("odcl_metadata")
292            && let Some(obj) = metadata.as_object()
293        {
294            for (key, value) in obj {
295                odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
296            }
297        }
298
299        let table_uuid = self.extract_table_uuid(data);
300
301        let table = Table {
302            id: table_uuid,
303            name,
304            columns,
305            database_type,
306            catalog_name: None,
307            schema_name: None,
308            medallion_layers,
309            scd_pattern,
310            data_vault_classification,
311            modeling_level: None,
312            tags: Vec::<Tag>::new(),
313            odcl_metadata,
314            owner: None,
315            sla: None,
316            contact_details: None,
317            infrastructure_type: None,
318            notes: None,
319            position: None,
320            yaml_file_path: None,
321            drawio_cell_id: None,
322            quality: quality_rules,
323            errors: Vec::new(),
324            created_at: chrono::Utc::now(),
325            updated_at: chrono::Utc::now(),
326        };
327
328        info!("Parsed ODCL table: {}", table.name);
329        Ok((table, errors))
330    }
331
332    /// Parse a single column definition.
333    fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
334        let name = col_data
335            .get("name")
336            .and_then(|v| v.as_str())
337            .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
338            .to_string();
339
340        let data_type = col_data
341            .get("data_type")
342            .and_then(|v| v.as_str())
343            .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
344            .to_string();
345
346        // Normalize data_type to uppercase (preserve STRUCT<...> format)
347        let data_type = normalize_data_type(&data_type);
348
349        let nullable = col_data
350            .get("nullable")
351            .and_then(|v| v.as_bool())
352            .unwrap_or(true);
353
354        let primary_key = col_data
355            .get("primary_key")
356            .and_then(|v| v.as_bool())
357            .unwrap_or(false);
358
359        let foreign_key = col_data
360            .get("foreign_key")
361            .and_then(|v| self.parse_foreign_key(v));
362
363        let constraints = col_data
364            .get("constraints")
365            .and_then(|v| v.as_array())
366            .map(|arr| {
367                arr.iter()
368                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
369                    .collect()
370            })
371            .unwrap_or_default();
372
373        let description = col_data
374            .get("description")
375            .and_then(|v| v.as_str())
376            .map(|s| s.to_string())
377            .unwrap_or_default();
378
379        // Extract column-level quality rules
380        let mut column_quality_rules = Vec::new();
381        if let Some(quality_val) = col_data.get("quality") {
382            if let Some(arr) = quality_val.as_array() {
383                // Array of quality rules
384                for item in arr {
385                    if let Some(obj) = item.as_object() {
386                        let mut rule = HashMap::new();
387                        for (key, value) in obj {
388                            rule.insert(key.clone(), json_value_to_serde_value(value));
389                        }
390                        column_quality_rules.push(rule);
391                    }
392                }
393            } else if let Some(obj) = quality_val.as_object() {
394                // Single quality rule object
395                let mut rule = HashMap::new();
396                for (key, value) in obj {
397                    rule.insert(key.clone(), json_value_to_serde_value(value));
398                }
399                column_quality_rules.push(rule);
400            }
401        }
402
403        // If nullable=false (required=true), add a "not_null" quality rule if not already present
404        if !nullable {
405            let has_not_null = column_quality_rules.iter().any(|rule| {
406                rule.get("type")
407                    .and_then(|v| v.as_str())
408                    .map(|s| {
409                        s.to_lowercase().contains("not_null")
410                            || s.to_lowercase().contains("notnull")
411                    })
412                    .unwrap_or(false)
413            });
414            if !has_not_null {
415                let mut not_null_rule = HashMap::new();
416                not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
417                not_null_rule.insert(
418                    "description".to_string(),
419                    serde_json::json!("Column must not be null"),
420                );
421                column_quality_rules.push(not_null_rule);
422            }
423        }
424
425        Ok(Column {
426            name,
427            data_type,
428            nullable,
429            primary_key,
430            secondary_key: false,
431            composite_key: None,
432            foreign_key,
433            constraints,
434            description,
435            errors: Vec::new(),
436            quality: column_quality_rules,
437            ref_path: None,
438            enum_values: Vec::new(),
439            column_order: 0,
440        })
441    }
442
443    /// Parse foreign key from JSON value.
444    fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
445        let obj = fk_data.as_object()?;
446        Some(ForeignKey {
447            table_id: obj
448                .get("table_id")
449                .or_else(|| obj.get("table"))
450                .and_then(|v| v.as_str())
451                .unwrap_or("")
452                .to_string(),
453            column_name: obj
454                .get("column_name")
455                .or_else(|| obj.get("column"))
456                .and_then(|v| v.as_str())
457                .unwrap_or("")
458                .to_string(),
459        })
460    }
461
462    /// Extract database type from data.
463    fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
464        data.get("database_type")
465            .and_then(|v| v.as_str())
466            .and_then(|s| match s.to_uppercase().as_str() {
467                "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
468                "MYSQL" => Some(DatabaseType::Mysql),
469                "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
470                "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
471                "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
472                _ => None,
473            })
474    }
475
476    /// Extract medallion layers from data.
477    fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
478        let mut layers = Vec::new();
479
480        // Check plural form first
481        if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
482            for item in arr {
483                if let Some(s) = item.as_str()
484                    && let Ok(layer) = parse_medallion_layer(s)
485                {
486                    layers.push(layer);
487                }
488            }
489        }
490        // Check singular form (backward compatibility)
491        else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
492            && let Ok(layer) = parse_medallion_layer(s)
493        {
494            layers.push(layer);
495        }
496
497        layers
498    }
499
500    /// Extract SCD pattern from data.
501    fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
502        data.get("scd_pattern")
503            .and_then(|v| v.as_str())
504            .and_then(|s| parse_scd_pattern(s).ok())
505    }
506
507    /// Extract Data Vault classification from data.
508    fn extract_data_vault_classification(
509        &self,
510        data: &JsonValue,
511    ) -> Option<DataVaultClassification> {
512        data.get("data_vault_classification")
513            .and_then(|v| v.as_str())
514            .and_then(|s| parse_data_vault_classification(s).ok())
515    }
516
517    /// Extract quality rules from data.
518    fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
519        use serde_json::Value;
520        let mut quality_rules = Vec::new();
521
522        // Check for quality field at root level (array of objects or single object)
523        if let Some(quality_val) = data.get("quality") {
524            if let Some(arr) = quality_val.as_array() {
525                // Array of quality rules
526                for item in arr {
527                    if let Some(obj) = item.as_object() {
528                        let mut rule = HashMap::new();
529                        for (key, value) in obj {
530                            rule.insert(key.clone(), json_value_to_serde_value(value));
531                        }
532                        quality_rules.push(rule);
533                    }
534                }
535            } else if let Some(obj) = quality_val.as_object() {
536                // Single quality rule object
537                let mut rule = HashMap::new();
538                for (key, value) in obj {
539                    rule.insert(key.clone(), json_value_to_serde_value(value));
540                }
541                quality_rules.push(rule);
542            } else if let Some(s) = quality_val.as_str() {
543                // Simple string quality value
544                let mut rule = HashMap::new();
545                rule.insert("value".to_string(), Value::String(s.to_string()));
546                quality_rules.push(rule);
547            }
548        }
549
550        // Check for quality in metadata (ODCL v3 format)
551        if let Some(metadata) = data.get("metadata")
552            && let Some(metadata_obj) = metadata.as_object()
553            && let Some(quality_val) = metadata_obj.get("quality")
554        {
555            if let Some(arr) = quality_val.as_array() {
556                // Array of quality rules
557                for item in arr {
558                    if let Some(obj) = item.as_object() {
559                        let mut rule = HashMap::new();
560                        for (key, value) in obj {
561                            rule.insert(key.clone(), json_value_to_serde_value(value));
562                        }
563                        quality_rules.push(rule);
564                    }
565                }
566            } else if let Some(obj) = quality_val.as_object() {
567                // Single quality rule object
568                let mut rule = HashMap::new();
569                for (key, value) in obj {
570                    rule.insert(key.clone(), json_value_to_serde_value(value));
571                }
572                quality_rules.push(rule);
573            } else if let Some(s) = quality_val.as_str() {
574                // Simple string quality value
575                let mut rule = HashMap::new();
576                rule.insert("value".to_string(), Value::String(s.to_string()));
577                quality_rules.push(rule);
578            }
579        }
580
581        // Check for tblproperties field (similar to SQL TBLPROPERTIES)
582        if let Some(tblprops) = data.get("tblproperties")
583            && let Some(obj) = tblprops.as_object()
584        {
585            for (key, value) in obj {
586                let mut rule = HashMap::new();
587                rule.insert("property".to_string(), Value::String(key.clone()));
588                rule.insert("value".to_string(), json_value_to_serde_value(value));
589                quality_rules.push(rule);
590            }
591        }
592
593        quality_rules
594    }
595
596    /// Parse Liquibase YAML changelog format.
597    ///
598    /// Extracts the first `createTable` change from a Liquibase changelog.
599    /// Supports both direct column definitions and nested column structures.
600    ///
601    /// # Supported Format
602    ///
603    fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
604        // Supported Liquibase YAML changelog format:
605        // databaseChangeLog:
606        //   - changeSet:
607        //       - createTable:
608        //           tableName: my_table
609        //           columns:
610        //             - column:
611        //                 name: id
612        //                 type: int
613        //                 constraints:
614        //                   primaryKey: true
615        //                   nullable: false
616
617        let mut errors = Vec::new();
618
619        let changelog = data
620            .get("databaseChangeLog")
621            .and_then(|v| v.as_array())
622            .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
623
624        // Find first createTable change.
625        let mut table_name: Option<String> = None;
626        let mut columns: Vec<crate::models::column::Column> = Vec::new();
627
628        for entry in changelog {
629            // Entries are typically maps like { changeSet: [...] } or { changeSet: { changes: [...] } }
630            if let Some(change_set) = entry.get("changeSet") {
631                // Liquibase YAML can represent changeSet as map or list.
632                let changes = if let Some(obj) = change_set.as_object() {
633                    obj.get("changes")
634                        .and_then(|v| v.as_array())
635                        .cloned()
636                        .unwrap_or_default()
637                } else if let Some(arr) = change_set.as_array() {
638                    // Some variants encode changes directly as list entries inside changeSet.
639                    arr.clone()
640                } else {
641                    Vec::new()
642                };
643
644                for ch in changes {
645                    let create = ch.get("createTable").or_else(|| ch.get("create_table"));
646                    if let Some(create) = create {
647                        table_name = create
648                            .get("tableName")
649                            .or_else(|| create.get("table_name"))
650                            .and_then(|v| v.as_str())
651                            .map(|s| s.to_string());
652
653                        // columns: [ { column: { ... } }, ... ]
654                        if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
655                            for col_entry in cols {
656                                let col = col_entry.get("column").unwrap_or(col_entry);
657                                let name = col
658                                    .get("name")
659                                    .and_then(|v| v.as_str())
660                                    .unwrap_or("")
661                                    .to_string();
662                                let data_type = col
663                                    .get("type")
664                                    .and_then(|v| v.as_str())
665                                    .unwrap_or("")
666                                    .to_string();
667
668                                if name.is_empty() {
669                                    errors.push(ParserError {
670                                        error_type: "validation_error".to_string(),
671                                        field: "columns.name".to_string(),
672                                        message: "Liquibase createTable column missing name"
673                                            .to_string(),
674                                    });
675                                    continue;
676                                }
677
678                                let mut column =
679                                    crate::models::column::Column::new(name, data_type);
680
681                                if let Some(constraints) =
682                                    col.get("constraints").and_then(|v| v.as_object())
683                                {
684                                    if let Some(pk) =
685                                        constraints.get("primaryKey").and_then(|v| v.as_bool())
686                                    {
687                                        column.primary_key = pk;
688                                    }
689                                    if let Some(nullable) =
690                                        constraints.get("nullable").and_then(|v| v.as_bool())
691                                    {
692                                        column.nullable = nullable;
693                                    }
694                                }
695
696                                columns.push(column);
697                            }
698                        }
699
700                        // parse_table() returns a single Table, so we parse the first createTable.
701                        // If multiple tables are needed, call parse_table() multiple times or use import().
702                        break;
703                    }
704                }
705            }
706            if table_name.is_some() {
707                break;
708            }
709        }
710
711        let table_name = table_name
712            .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
713        let table = Table::new(table_name, columns);
714        // Preserve any errors collected.
715        Ok((table, errors))
716    }
717
718    /// Parse ODCS v3.0.x format.
719    fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
720        let mut errors = Vec::new();
721
722        // Extract table name
723        let table_name = data
724            .get("name")
725            .and_then(|v| v.as_str())
726            .map(|s| s.to_string())
727            .or_else(|| {
728                // Try to get from schema array
729                data.get("schema")
730                    .and_then(|v| v.as_array())
731                    .and_then(|arr| arr.first())
732                    .and_then(|obj| obj.as_object())
733                    .and_then(|obj| obj.get("name"))
734                    .and_then(|v| v.as_str())
735                    .map(|s| s.to_string())
736            })
737            .ok_or_else(|| {
738                anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
739            })?;
740
741        // Extract schema - ODCS v3.0.x uses array of SchemaObject
742        let schema = data
743            .get("schema")
744            .and_then(|v| v.as_array())
745            .ok_or_else(|| {
746                errors.push(ParserError {
747                    error_type: "validation_error".to_string(),
748                    field: "schema".to_string(),
749                    message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
750                });
751                anyhow::anyhow!("Missing schema")
752            });
753
754        let schema = match schema {
755            Ok(s) if s.is_empty() => {
756                errors.push(ParserError {
757                    error_type: "validation_error".to_string(),
758                    field: "schema".to_string(),
759                    message: "ODCS v3.0.x schema array is empty".to_string(),
760                });
761                let quality_rules = self.extract_quality_rules(data);
762                let table_uuid = self.extract_table_uuid(data);
763                let table = Table {
764                    id: table_uuid,
765                    name: table_name,
766                    columns: Vec::new(),
767                    database_type: None,
768                    catalog_name: None,
769                    schema_name: None,
770                    medallion_layers: Vec::new(),
771                    scd_pattern: None,
772                    data_vault_classification: None,
773                    modeling_level: None,
774                    tags: Vec::<Tag>::new(),
775                    odcl_metadata: HashMap::new(),
776                    owner: None,
777                    sla: None,
778                    contact_details: None,
779                    infrastructure_type: None,
780                    notes: None,
781                    position: None,
782                    yaml_file_path: None,
783                    drawio_cell_id: None,
784                    quality: quality_rules,
785                    errors: Vec::new(),
786                    created_at: chrono::Utc::now(),
787                    updated_at: chrono::Utc::now(),
788                };
789                return Ok((table, errors));
790            }
791            Ok(s) => s,
792            Err(_) => {
793                let quality_rules = self.extract_quality_rules(data);
794                let table_uuid = self.extract_table_uuid(data);
795                let table = Table {
796                    id: table_uuid,
797                    name: table_name,
798                    columns: Vec::new(),
799                    database_type: None,
800                    catalog_name: None,
801                    schema_name: None,
802                    medallion_layers: Vec::new(),
803                    scd_pattern: None,
804                    data_vault_classification: None,
805                    modeling_level: None,
806                    tags: Vec::<Tag>::new(),
807                    odcl_metadata: HashMap::new(),
808                    owner: None,
809                    sla: None,
810                    contact_details: None,
811                    infrastructure_type: None,
812                    notes: None,
813                    position: None,
814                    yaml_file_path: None,
815                    drawio_cell_id: None,
816                    quality: quality_rules,
817                    errors: Vec::new(),
818                    created_at: chrono::Utc::now(),
819                    updated_at: chrono::Utc::now(),
820                };
821                return Ok((table, errors));
822            }
823        };
824
825        // Get the first schema object (table)
826        let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
827            errors.push(ParserError {
828                error_type: "validation_error".to_string(),
829                field: "schema[0]".to_string(),
830                message: "First schema object must be a dictionary".to_string(),
831            });
832            anyhow::anyhow!("Invalid schema object")
833        })?;
834
835        let object_name = schema_object
836            .get("name")
837            .and_then(|v| v.as_str())
838            .unwrap_or(&table_name);
839
840        // Handle both object format (v3.0.x) and array format (v3.1.0) for properties
841        let mut columns = Vec::new();
842
843        if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
844            // Object format (v3.0.x): properties is a map of name -> property object
845            for (prop_name, prop_data) in properties_obj {
846                if let Some(prop_obj) = prop_data.as_object() {
847                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
848                        Ok(mut cols) => columns.append(&mut cols),
849                        Err(e) => {
850                            errors.push(ParserError {
851                                error_type: "property_parse_error".to_string(),
852                                field: format!("Property '{}'", prop_name),
853                                message: e.to_string(),
854                            });
855                        }
856                    }
857                } else {
858                    errors.push(ParserError {
859                        error_type: "validation_error".to_string(),
860                        field: format!("Property '{}'", prop_name),
861                        message: format!("Property '{}' must be an object", prop_name),
862                    });
863                }
864            }
865        } else if let Some(properties_arr) =
866            schema_object.get("properties").and_then(|v| v.as_array())
867        {
868            // Array format (v3.1.0): properties is an array of property objects with 'name' field
869            for (idx, prop_data) in properties_arr.iter().enumerate() {
870                if let Some(prop_obj) = prop_data.as_object() {
871                    // Extract name from property object (required in v3.1.0)
872                    // ODCS v3.1.0 requires 'name' field, but we'll also accept 'id' as fallback
873                    let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
874                        Some(JsonValue::String(s)) => s.as_str(),
875                        _ => {
876                            // Skip properties without name or id (not valid ODCS v3.1.0)
877                            errors.push(ParserError {
878                                error_type: "validation_error".to_string(),
879                                field: format!("Property[{}]", idx),
880                                message: format!(
881                                    "Property[{}] missing required 'name' or 'id' field",
882                                    idx
883                                ),
884                            });
885                            continue;
886                        }
887                    };
888
889                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
890                        Ok(mut cols) => columns.append(&mut cols),
891                        Err(e) => {
892                            errors.push(ParserError {
893                                error_type: "property_parse_error".to_string(),
894                                field: format!("Property[{}] '{}'", idx, prop_name),
895                                message: e.to_string(),
896                            });
897                        }
898                    }
899                } else {
900                    errors.push(ParserError {
901                        error_type: "validation_error".to_string(),
902                        field: format!("Property[{}]", idx),
903                        message: format!("Property[{}] must be an object", idx),
904                    });
905                }
906            }
907        } else {
908            errors.push(ParserError {
909                error_type: "validation_error".to_string(),
910                field: format!("Object '{}'", object_name),
911                message: format!(
912                    "Object '{}' missing 'properties' field or properties is invalid",
913                    object_name
914                ),
915            });
916        }
917
918        // Extract metadata from customProperties
919        let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
920            _,
921            _,
922            _,
923            Vec<Tag>,
924        ) = self.extract_metadata_from_custom_properties(data);
925
926        // Extract sharedDomains from customProperties
927        let mut shared_domains: Vec<String> = Vec::new();
928        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
929            for prop in custom_props {
930                if let Some(prop_obj) = prop.as_object() {
931                    let prop_key = prop_obj
932                        .get("property")
933                        .and_then(|v| v.as_str())
934                        .unwrap_or("");
935                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
936                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
937                    {
938                        for item in arr {
939                            if let Some(s) = item.as_str() {
940                                shared_domains.push(s.to_string());
941                            }
942                        }
943                    }
944                }
945            }
946        }
947
948        // Extract tags from top-level tags field (if not already extracted from customProperties)
949        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
950            for item in tags_arr {
951                if let Some(s) = item.as_str() {
952                    // Parse tag string to Tag enum
953                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
954                    if !tags.contains(&tag) {
955                        tags.push(tag);
956                    }
957                }
958            }
959        }
960
961        // Extract database type from servers
962        let database_type = self.extract_database_type_from_odcl_v3_servers(data);
963
964        // Extract quality rules
965        let quality_rules = self.extract_quality_rules(data);
966
967        // Build ODCL metadata
968        let mut odcl_metadata = HashMap::new();
969        odcl_metadata.insert(
970            "apiVersion".to_string(),
971            json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
972        );
973        odcl_metadata.insert(
974            "kind".to_string(),
975            json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
976        );
977        odcl_metadata.insert(
978            "id".to_string(),
979            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
980        );
981        odcl_metadata.insert(
982            "version".to_string(),
983            json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
984        );
985        odcl_metadata.insert(
986            "status".to_string(),
987            json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
988        );
989
990        // Extract servicelevels if present
991        if let Some(servicelevels_val) = data.get("servicelevels") {
992            odcl_metadata.insert(
993                "servicelevels".to_string(),
994                json_value_to_serde_value(servicelevels_val),
995            );
996        }
997
998        // Extract links if present
999        if let Some(links_val) = data.get("links") {
1000            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1001        }
1002
1003        // Extract domain, dataProduct, tenant
1004        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1005            odcl_metadata.insert(
1006                "domain".to_string(),
1007                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1008            );
1009        }
1010        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1011            odcl_metadata.insert(
1012                "dataProduct".to_string(),
1013                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1014            );
1015        }
1016        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1017            odcl_metadata.insert(
1018                "tenant".to_string(),
1019                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1020            );
1021        }
1022
1023        // Extract top-level description (can be object or string)
1024        if let Some(desc_val) = data.get("description") {
1025            odcl_metadata.insert(
1026                "description".to_string(),
1027                json_value_to_serde_value(desc_val),
1028            );
1029        }
1030
1031        // Extract pricing
1032        if let Some(pricing_val) = data.get("pricing") {
1033            odcl_metadata.insert(
1034                "pricing".to_string(),
1035                json_value_to_serde_value(pricing_val),
1036            );
1037        }
1038
1039        // Extract team
1040        if let Some(team_val) = data.get("team") {
1041            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1042        }
1043
1044        // Extract roles
1045        if let Some(roles_val) = data.get("roles") {
1046            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1047        }
1048
1049        // Extract terms
1050        if let Some(terms_val) = data.get("terms") {
1051            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1052        }
1053
1054        // Extract full servers array (not just type)
1055        if let Some(servers_val) = data.get("servers") {
1056            odcl_metadata.insert(
1057                "servers".to_string(),
1058                json_value_to_serde_value(servers_val),
1059            );
1060        }
1061
1062        // Extract infrastructure
1063        if let Some(infrastructure_val) = data.get("infrastructure") {
1064            odcl_metadata.insert(
1065                "infrastructure".to_string(),
1066                json_value_to_serde_value(infrastructure_val),
1067            );
1068        }
1069
1070        // Store sharedDomains in metadata (extracted from customProperties above)
1071        if !shared_domains.is_empty() {
1072            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1073                .iter()
1074                .map(|d| serde_json::Value::String(d.clone()))
1075                .collect();
1076            odcl_metadata.insert(
1077                "sharedDomains".to_string(),
1078                serde_json::Value::Array(shared_domains_json),
1079            );
1080        }
1081
1082        let table_uuid = self.extract_table_uuid(data);
1083
1084        let table = Table {
1085            id: table_uuid,
1086            name: table_name,
1087            columns,
1088            database_type,
1089            catalog_name: None,
1090            schema_name: None,
1091            medallion_layers,
1092            scd_pattern,
1093            data_vault_classification,
1094            modeling_level: None,
1095            tags,
1096            odcl_metadata,
1097            owner: None,
1098            sla: None,
1099            contact_details: None,
1100            infrastructure_type: None,
1101            notes: None,
1102            position: None,
1103            yaml_file_path: None,
1104            drawio_cell_id: None,
1105            quality: quality_rules,
1106            errors: Vec::new(),
1107            created_at: chrono::Utc::now(),
1108            updated_at: chrono::Utc::now(),
1109        };
1110
1111        info!(
1112            "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1113            table.name,
1114            errors.len()
1115        );
1116        Ok((table, errors))
1117    }
1118
1119    /// Parse a single property from ODCS v3.0.x format (similar to Data Contract field).
1120    fn parse_odcl_v3_property(
1121        &self,
1122        prop_name: &str,
1123        prop_data: &serde_json::Map<String, JsonValue>,
1124        data: &JsonValue,
1125    ) -> Result<Vec<Column>> {
1126        // Reuse Data Contract field parsing logic (they're similar)
1127        let mut errors = Vec::new();
1128        self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1129    }
1130
1131    /// Extract table UUID from ODCS `id` field (standard) or fallback to customProperties/odcl_metadata (backward compatibility).
1132    /// Returns the UUID if found, otherwise generates a new one.
1133    fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1134        // First check the top-level `id` field (ODCS spec: "A unique identifier used to reduce the risk of dataset name collisions, such as a UUID.")
1135        if let Some(id_val) = data.get("id")
1136            && let Some(id_str) = id_val.as_str()
1137        {
1138            if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1139                tracing::debug!(
1140                    "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1141                    uuid
1142                );
1143                return uuid;
1144            } else {
1145                tracing::warn!(
1146                    "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1147                    id_str
1148                );
1149            }
1150        }
1151
1152        // Backward compatibility: check customProperties for tableUuid (legacy format)
1153        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1154            for prop in custom_props {
1155                if let Some(prop_obj) = prop.as_object() {
1156                    let prop_key = prop_obj
1157                        .get("property")
1158                        .and_then(|v| v.as_str())
1159                        .unwrap_or("");
1160                    if prop_key == "tableUuid"
1161                        && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1162                        && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1163                    {
1164                        tracing::debug!(
1165                            "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1166                            uuid
1167                        );
1168                        return uuid;
1169                    }
1170                }
1171            }
1172        }
1173
1174        // Fallback: check odcl_metadata if present (legacy format)
1175        if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1176            && let Some(uuid_val) = metadata.get("tableUuid")
1177            && let Some(uuid_str) = uuid_val.as_str()
1178            && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1179        {
1180            tracing::debug!(
1181                "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1182                uuid
1183            );
1184            return uuid;
1185        }
1186
1187        // Generate deterministic UUID v5 if not found (based on table name)
1188        let table_name = data
1189            .get("name")
1190            .and_then(|v| v.as_str())
1191            .unwrap_or("unknown");
1192        let new_uuid = crate::models::table::Table::generate_id(
1193            table_name, None, // database_type not available here
1194            None, // catalog_name not available here
1195            None, // schema_name not available here
1196        );
1197        tracing::warn!(
1198            "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1199            table_name,
1200            new_uuid
1201        );
1202        new_uuid
1203    }
1204
1205    /// Extract metadata from customProperties in ODCS v3.0.x format.
1206    fn extract_metadata_from_custom_properties(
1207        &self,
1208        data: &JsonValue,
1209    ) -> (
1210        Vec<MedallionLayer>,
1211        Option<SCDPattern>,
1212        Option<DataVaultClassification>,
1213        Vec<Tag>,
1214    ) {
1215        let mut medallion_layers = Vec::new();
1216        let mut scd_pattern = None;
1217        let mut data_vault_classification = None;
1218        let mut tags: Vec<Tag> = Vec::new();
1219
1220        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1221            for prop in custom_props {
1222                if let Some(prop_obj) = prop.as_object() {
1223                    let prop_key = prop_obj
1224                        .get("property")
1225                        .and_then(|v| v.as_str())
1226                        .unwrap_or("");
1227                    let prop_value = prop_obj.get("value");
1228
1229                    match prop_key {
1230                        "medallionLayers" | "medallion_layers" => {
1231                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1232                                for item in arr {
1233                                    if let Some(s) = item.as_str()
1234                                        && let Ok(layer) = parse_medallion_layer(s)
1235                                    {
1236                                        medallion_layers.push(layer);
1237                                    }
1238                                }
1239                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1240                                // Comma-separated string
1241                                for part in s.split(',') {
1242                                    if let Ok(layer) = parse_medallion_layer(part.trim()) {
1243                                        medallion_layers.push(layer);
1244                                    }
1245                                }
1246                            }
1247                        }
1248                        "scdPattern" | "scd_pattern" => {
1249                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1250                                scd_pattern = parse_scd_pattern(s).ok();
1251                            }
1252                        }
1253                        "dataVaultClassification" | "data_vault_classification" => {
1254                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1255                                data_vault_classification = parse_data_vault_classification(s).ok();
1256                            }
1257                        }
1258                        "tags" => {
1259                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1260                                for item in arr {
1261                                    if let Some(s) = item.as_str() {
1262                                        // Parse tag string to Tag enum
1263                                        if let Ok(tag) = Tag::from_str(s) {
1264                                            tags.push(tag);
1265                                        } else {
1266                                            tags.push(Tag::Simple(s.to_string()));
1267                                        }
1268                                    }
1269                                }
1270                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1271                                // Comma-separated string
1272                                for part in s.split(',') {
1273                                    let part = part.trim();
1274                                    if let Ok(tag) = Tag::from_str(part) {
1275                                        tags.push(tag);
1276                                    } else {
1277                                        tags.push(Tag::Simple(part.to_string()));
1278                                    }
1279                                }
1280                            }
1281                        }
1282                        "sharedDomains" | "shared_domains" => {
1283                            // sharedDomains will be stored in metadata by the caller
1284                            // This match is here for completeness but sharedDomains is handled separately
1285                        }
1286                        _ => {}
1287                    }
1288                }
1289            }
1290        }
1291
1292        // Also extract tags from top-level tags field
1293        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1294            for item in tags_arr {
1295                if let Some(s) = item.as_str() {
1296                    // Parse tag string to Tag enum
1297                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1298                    if !tags.contains(&tag) {
1299                        tags.push(tag);
1300                    }
1301                }
1302            }
1303        }
1304
1305        (
1306            medallion_layers,
1307            scd_pattern,
1308            data_vault_classification,
1309            tags,
1310        )
1311    }
1312
1313    /// Extract database type from servers in ODCS v3.0.x format.
1314    fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1315        // ODCS v3.0.x: servers is an array of Server objects
1316        if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1317            && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1318        {
1319            return server_obj
1320                .get("type")
1321                .and_then(|v| v.as_str())
1322                .and_then(|s| self.parse_database_type(s));
1323        }
1324        None
1325    }
1326
1327    /// Parse Data Contract format.
1328    fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1329        let mut errors = Vec::new();
1330
1331        // Extract models
1332        let models = data
1333            .get("models")
1334            .and_then(|v| v.as_object())
1335            .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1336
1337        // parse_table() returns a single Table, so we parse the first model.
1338        // If multiple models are needed, call parse_table() multiple times or use import().
1339        let (model_name, model_data) = models
1340            .iter()
1341            .next()
1342            .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1343
1344        let model_data = model_data
1345            .as_object()
1346            .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1347
1348        // Extract fields (columns)
1349        let fields = model_data
1350            .get("fields")
1351            .and_then(|v| v.as_object())
1352            .ok_or_else(|| {
1353                errors.push(ParserError {
1354                    error_type: "validation_error".to_string(),
1355                    field: format!("Model '{}'", model_name),
1356                    message: format!("Model '{}' missing 'fields' field", model_name),
1357                });
1358                anyhow::anyhow!("Missing fields")
1359            });
1360
1361        let fields = match fields {
1362            Ok(f) => f,
1363            Err(_) => {
1364                // Return empty table with errors
1365                let quality_rules = self.extract_quality_rules(data);
1366                let table_uuid = self.extract_table_uuid(data);
1367                let table = Table {
1368                    id: table_uuid,
1369                    name: model_name.clone(),
1370                    columns: Vec::new(),
1371                    database_type: None,
1372                    catalog_name: None,
1373                    schema_name: None,
1374                    medallion_layers: Vec::new(),
1375                    scd_pattern: None,
1376                    data_vault_classification: None,
1377                    modeling_level: None,
1378                    tags: Vec::<Tag>::new(),
1379                    odcl_metadata: HashMap::new(),
1380                    owner: None,
1381                    sla: None,
1382                    contact_details: None,
1383                    infrastructure_type: None,
1384                    notes: None,
1385                    position: None,
1386                    yaml_file_path: None,
1387                    drawio_cell_id: None,
1388                    quality: quality_rules,
1389                    errors: Vec::new(),
1390                    created_at: chrono::Utc::now(),
1391                    updated_at: chrono::Utc::now(),
1392                };
1393                return Ok((table, errors));
1394            }
1395        };
1396
1397        // Parse fields as columns
1398        let mut columns = Vec::new();
1399        for (field_name, field_data) in fields {
1400            if let Some(field_obj) = field_data.as_object() {
1401                match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1402                    Ok(mut cols) => columns.append(&mut cols),
1403                    Err(e) => {
1404                        errors.push(ParserError {
1405                            error_type: "field_parse_error".to_string(),
1406                            field: format!("Field '{}'", field_name),
1407                            message: e.to_string(),
1408                        });
1409                    }
1410                }
1411            } else {
1412                errors.push(ParserError {
1413                    error_type: "validation_error".to_string(),
1414                    field: format!("Field '{}'", field_name),
1415                    message: format!("Field '{}' must be an object", field_name),
1416                });
1417            }
1418        }
1419
1420        // Extract metadata from info section
1421        // The frontend expects info fields to be nested under "info" key
1422        let mut odcl_metadata = HashMap::new();
1423
1424        // Extract info section and nest it properly
1425        // Convert JsonValue object to serde_json::Value::Object (Map)
1426        if let Some(info_val) = data.get("info") {
1427            // Convert JsonValue to serde_json::Value
1428            let info_json_value = json_value_to_serde_value(info_val);
1429            odcl_metadata.insert("info".to_string(), info_json_value);
1430        }
1431
1432        odcl_metadata.insert(
1433            "dataContractSpecification".to_string(),
1434            json_value_to_serde_value(
1435                data.get("dataContractSpecification")
1436                    .unwrap_or(&JsonValue::Null),
1437            ),
1438        );
1439        odcl_metadata.insert(
1440            "id".to_string(),
1441            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1442        );
1443        // Note: model_name is not added to metadata as it's redundant (it's the table name)
1444
1445        // Extract servicelevels if present
1446        if let Some(servicelevels_val) = data.get("servicelevels") {
1447            odcl_metadata.insert(
1448                "servicelevels".to_string(),
1449                json_value_to_serde_value(servicelevels_val),
1450            );
1451        }
1452
1453        // Extract links if present
1454        if let Some(links_val) = data.get("links") {
1455            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1456        }
1457
1458        // Extract domain, dataProduct, tenant
1459        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1460            odcl_metadata.insert(
1461                "domain".to_string(),
1462                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1463            );
1464        }
1465        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1466            odcl_metadata.insert(
1467                "dataProduct".to_string(),
1468                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1469            );
1470        }
1471        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1472            odcl_metadata.insert(
1473                "tenant".to_string(),
1474                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1475            );
1476        }
1477
1478        // Extract top-level description (can be object or string)
1479        if let Some(desc_val) = data.get("description") {
1480            odcl_metadata.insert(
1481                "description".to_string(),
1482                json_value_to_serde_value(desc_val),
1483            );
1484        }
1485
1486        // Extract pricing
1487        if let Some(pricing_val) = data.get("pricing") {
1488            odcl_metadata.insert(
1489                "pricing".to_string(),
1490                json_value_to_serde_value(pricing_val),
1491            );
1492        }
1493
1494        // Extract team
1495        if let Some(team_val) = data.get("team") {
1496            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1497        }
1498
1499        // Extract roles
1500        if let Some(roles_val) = data.get("roles") {
1501            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1502        }
1503
1504        // Extract terms
1505        if let Some(terms_val) = data.get("terms") {
1506            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1507        }
1508
1509        // Extract full servers array (not just type)
1510        if let Some(servers_val) = data.get("servers") {
1511            odcl_metadata.insert(
1512                "servers".to_string(),
1513                json_value_to_serde_value(servers_val),
1514            );
1515        }
1516
1517        // Extract infrastructure
1518        if let Some(infrastructure_val) = data.get("infrastructure") {
1519            odcl_metadata.insert(
1520                "infrastructure".to_string(),
1521                json_value_to_serde_value(infrastructure_val),
1522            );
1523        }
1524
1525        // Extract database type from servers if available
1526        let database_type = self.extract_database_type_from_servers(data);
1527
1528        // Extract catalog and schema from customProperties
1529        let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1530
1531        // Extract sharedDomains from customProperties
1532        let mut shared_domains: Vec<String> = Vec::new();
1533        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1534            for prop in custom_props {
1535                if let Some(prop_obj) = prop.as_object() {
1536                    let prop_key = prop_obj
1537                        .get("property")
1538                        .and_then(|v| v.as_str())
1539                        .unwrap_or("");
1540                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1541                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1542                    {
1543                        for item in arr {
1544                            if let Some(s) = item.as_str() {
1545                                shared_domains.push(s.to_string());
1546                            }
1547                        }
1548                    }
1549                }
1550            }
1551        }
1552
1553        // Extract tags from top-level tags field (Data Contract format)
1554        let mut tags: Vec<Tag> = Vec::new();
1555        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1556            for item in tags_arr {
1557                if let Some(s) = item.as_str() {
1558                    // Parse tag string to Tag enum (supports Simple, Pair, List formats)
1559                    if let Ok(tag) = Tag::from_str(s) {
1560                        tags.push(tag);
1561                    } else {
1562                        // Fallback: create Simple tag if parsing fails
1563                        tags.push(crate::models::Tag::Simple(s.to_string()));
1564                    }
1565                }
1566            }
1567        }
1568
1569        // Extract quality rules
1570        let quality_rules = self.extract_quality_rules(data);
1571
1572        // Store sharedDomains in metadata
1573        if !shared_domains.is_empty() {
1574            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1575                .iter()
1576                .map(|d| serde_json::Value::String(d.clone()))
1577                .collect();
1578            odcl_metadata.insert(
1579                "sharedDomains".to_string(),
1580                serde_json::Value::Array(shared_domains_json),
1581            );
1582        }
1583
1584        let table_uuid = self.extract_table_uuid(data);
1585
1586        let table = Table {
1587            id: table_uuid,
1588            name: model_name.clone(),
1589            columns,
1590            database_type,
1591            catalog_name,
1592            schema_name,
1593            medallion_layers: Vec::new(),
1594            scd_pattern: None,
1595            data_vault_classification: None,
1596            modeling_level: None,
1597            tags,
1598            odcl_metadata,
1599            owner: None,
1600            sla: None,
1601            contact_details: None,
1602            infrastructure_type: None,
1603            notes: None,
1604            position: None,
1605            yaml_file_path: None,
1606            drawio_cell_id: None,
1607            quality: quality_rules,
1608            errors: Vec::new(),
1609            created_at: chrono::Utc::now(),
1610            updated_at: chrono::Utc::now(),
1611        };
1612
1613        info!(
1614            "Parsed Data Contract table: {} with {} warnings/errors",
1615            model_name,
1616            errors.len()
1617        );
1618        Ok((table, errors))
1619    }
1620
1621    /// Expand a nested column from a schema definition, creating columns with dot notation.
1622    ///
1623    /// This helper function recursively expands nested structures (OBJECT/STRUCT types)
1624    /// into flat columns with dot notation (e.g., "address.street", "address.city").
1625    #[allow(clippy::only_used_in_recursion)]
1626    fn expand_nested_column(
1627        &self,
1628        column_name: &str,
1629        schema: &JsonValue,
1630        nullable: bool,
1631        columns: &mut Vec<Column>,
1632        errors: &mut Vec<ParserError>,
1633    ) {
1634        let schema_obj = match schema.as_object() {
1635            Some(obj) => obj,
1636            None => {
1637                errors.push(ParserError {
1638                    error_type: "parse_error".to_string(),
1639                    field: column_name.to_string(),
1640                    message: "Nested schema must be an object".to_string(),
1641                });
1642                return;
1643            }
1644        };
1645
1646        let schema_type = schema_obj
1647            .get("type")
1648            .and_then(|v| v.as_str())
1649            .unwrap_or("object");
1650
1651        match schema_type {
1652            "object" | "struct" => {
1653                // Check if it has nested properties
1654                if let Some(properties) = schema_obj.get("properties").and_then(|v| v.as_object()) {
1655                    let nested_required: Vec<String> = schema_obj
1656                        .get("required")
1657                        .and_then(|v| v.as_array())
1658                        .map(|arr| {
1659                            arr.iter()
1660                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1661                                .collect()
1662                        })
1663                        .unwrap_or_default();
1664
1665                    for (nested_name, nested_schema) in properties {
1666                        let nested_nullable = !nested_required.contains(nested_name);
1667                        self.expand_nested_column(
1668                            &format!("{}.{}", column_name, nested_name),
1669                            nested_schema,
1670                            nullable || nested_nullable,
1671                            columns,
1672                            errors,
1673                        );
1674                    }
1675                } else {
1676                    // Object without properties - create as OBJECT type
1677                    let description = schema_obj
1678                        .get("description")
1679                        .and_then(|v| v.as_str())
1680                        .unwrap_or("")
1681                        .to_string();
1682                    columns.push(Column {
1683                        name: column_name.to_string(),
1684                        data_type: "OBJECT".to_string(),
1685                        nullable,
1686                        primary_key: false,
1687                        secondary_key: false,
1688                        composite_key: None,
1689                        foreign_key: None,
1690                        constraints: Vec::new(),
1691                        description,
1692                        quality: Vec::new(),
1693                        ref_path: None,
1694                        enum_values: Vec::new(),
1695                        errors: Vec::new(),
1696                        column_order: 0,
1697                    });
1698                }
1699            }
1700            "array" => {
1701                // Handle array types
1702                let items = schema_obj.get("items").unwrap_or(schema);
1703                let items_type = items
1704                    .as_object()
1705                    .and_then(|obj| obj.get("type").and_then(|v| v.as_str()))
1706                    .unwrap_or("string");
1707
1708                if items_type == "object" || items_type == "struct" {
1709                    // Array of objects - expand nested structure
1710                    let description = schema_obj
1711                        .get("description")
1712                        .and_then(|v| v.as_str())
1713                        .unwrap_or("")
1714                        .to_string();
1715                    columns.push(Column {
1716                        name: column_name.to_string(),
1717                        data_type: "ARRAY<OBJECT>".to_string(),
1718                        nullable,
1719                        primary_key: false,
1720                        secondary_key: false,
1721                        composite_key: None,
1722                        foreign_key: None,
1723                        constraints: Vec::new(),
1724                        description,
1725                        quality: Vec::new(),
1726                        ref_path: None,
1727                        enum_values: Vec::new(),
1728                        errors: Vec::new(),
1729                        column_order: 0,
1730                    });
1731                    // Also expand nested properties with array prefix
1732                    if let Some(properties) = items
1733                        .as_object()
1734                        .and_then(|obj| obj.get("properties").and_then(|v| v.as_object()))
1735                    {
1736                        let nested_required: Vec<String> = items
1737                            .as_object()
1738                            .and_then(|obj| obj.get("required").and_then(|v| v.as_array()))
1739                            .map(|arr| {
1740                                arr.iter()
1741                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1742                                    .collect()
1743                            })
1744                            .unwrap_or_default();
1745
1746                        for (nested_name, nested_schema) in properties {
1747                            let nested_nullable = !nested_required.contains(nested_name);
1748                            self.expand_nested_column(
1749                                &format!("{}.{}", column_name, nested_name),
1750                                nested_schema,
1751                                nullable || nested_nullable,
1752                                columns,
1753                                errors,
1754                            );
1755                        }
1756                    }
1757                } else {
1758                    // Array of primitives
1759                    let data_type = format!("ARRAY<{}>", items_type.to_uppercase());
1760                    let description = schema_obj
1761                        .get("description")
1762                        .and_then(|v| v.as_str())
1763                        .unwrap_or("")
1764                        .to_string();
1765                    columns.push(Column {
1766                        name: column_name.to_string(),
1767                        data_type,
1768                        nullable,
1769                        primary_key: false,
1770                        secondary_key: false,
1771                        composite_key: None,
1772                        foreign_key: None,
1773                        constraints: Vec::new(),
1774                        description,
1775                        quality: Vec::new(),
1776                        ref_path: None,
1777                        enum_values: Vec::new(),
1778                        errors: Vec::new(),
1779                        column_order: 0,
1780                    });
1781                }
1782            }
1783            _ => {
1784                // Simple type
1785                let data_type = schema_type.to_uppercase();
1786                let description = schema_obj
1787                    .get("description")
1788                    .and_then(|v| v.as_str())
1789                    .unwrap_or("")
1790                    .to_string();
1791                let enum_values = schema_obj
1792                    .get("enum")
1793                    .and_then(|v| v.as_array())
1794                    .map(|arr| {
1795                        arr.iter()
1796                            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1797                            .collect()
1798                    })
1799                    .unwrap_or_default();
1800                columns.push(Column {
1801                    name: column_name.to_string(),
1802                    data_type,
1803                    nullable,
1804                    primary_key: false,
1805                    secondary_key: false,
1806                    composite_key: None,
1807                    foreign_key: None,
1808                    constraints: Vec::new(),
1809                    description,
1810                    quality: Vec::new(),
1811                    ref_path: None,
1812                    enum_values,
1813                    errors: Vec::new(),
1814                    column_order: 0,
1815                });
1816            }
1817        }
1818    }
1819
1820    /// Parse a single field from Data Contract format.
1821    fn parse_data_contract_field(
1822        &self,
1823        field_name: &str,
1824        field_data: &serde_json::Map<String, JsonValue>,
1825        data: &JsonValue,
1826        errors: &mut Vec<ParserError>,
1827    ) -> Result<Vec<Column>> {
1828        let mut columns = Vec::new();
1829
1830        // Helper function to extract quality rules from a JSON object
1831        let extract_quality_from_obj =
1832            |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1833                let mut quality_rules = Vec::new();
1834                if let Some(quality_val) = obj.get("quality") {
1835                    if let Some(arr) = quality_val.as_array() {
1836                        // Array of quality rules
1837                        for item in arr {
1838                            if let Some(rule_obj) = item.as_object() {
1839                                let mut rule = HashMap::new();
1840                                for (key, value) in rule_obj {
1841                                    rule.insert(key.clone(), json_value_to_serde_value(value));
1842                                }
1843                                quality_rules.push(rule);
1844                            }
1845                        }
1846                    } else if let Some(rule_obj) = quality_val.as_object() {
1847                        // Single quality rule object
1848                        let mut rule = HashMap::new();
1849                        for (key, value) in rule_obj {
1850                            rule.insert(key.clone(), json_value_to_serde_value(value));
1851                        }
1852                        quality_rules.push(rule);
1853                    }
1854                }
1855                quality_rules
1856            };
1857
1858        // Extract description from field_data (preserve empty strings)
1859        let description = field_data
1860            .get("description")
1861            .and_then(|v| v.as_str())
1862            .unwrap_or("")
1863            .to_string();
1864
1865        // Extract quality rules from field_data
1866        let mut quality_rules = extract_quality_from_obj(field_data);
1867
1868        // Check for $ref
1869        if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1870            // Store ref_path (preserve even if definition doesn't exist)
1871            let ref_path = Some(ref_str.to_string());
1872
1873            if let Some(definition) = self.resolve_ref(ref_str, data) {
1874                // Merge quality rules from definition if field doesn't have any
1875
1876                // Also extract quality rules from definition and merge (if field doesn't have any)
1877                if quality_rules.is_empty() {
1878                    if let Some(def_obj) = definition.as_object() {
1879                        quality_rules = extract_quality_from_obj(def_obj);
1880                    }
1881                } else {
1882                    // Merge definition quality rules if field has some
1883                    if let Some(def_obj) = definition.as_object() {
1884                        let def_quality = extract_quality_from_obj(def_obj);
1885                        // Append definition quality rules (field-level takes precedence)
1886                        quality_rules.extend(def_quality);
1887                    }
1888                }
1889
1890                let required = field_data
1891                    .get("required")
1892                    .and_then(|v| v.as_bool())
1893                    .unwrap_or(false);
1894
1895                // If required=true, add not_null quality rule if not present
1896                if required {
1897                    let has_not_null = quality_rules.iter().any(|rule| {
1898                        rule.get("type")
1899                            .and_then(|v| v.as_str())
1900                            .map(|s| {
1901                                s.to_lowercase().contains("not_null")
1902                                    || s.to_lowercase().contains("notnull")
1903                            })
1904                            .unwrap_or(false)
1905                    });
1906                    if !has_not_null {
1907                        let mut not_null_rule = HashMap::new();
1908                        not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
1909                        not_null_rule.insert(
1910                            "description".to_string(),
1911                            serde_json::json!("Column must not be null"),
1912                        );
1913                        quality_rules.push(not_null_rule);
1914                    }
1915                }
1916
1917                // Check if definition is an object/struct with nested structure
1918                let has_nested = definition
1919                    .get("type")
1920                    .and_then(|v| v.as_str())
1921                    .map(|s| s == "object")
1922                    .unwrap_or(false)
1923                    || definition.get("properties").is_some()
1924                    || definition.get("fields").is_some();
1925
1926                if has_nested {
1927                    // Expand STRUCT from definition into nested columns with dot notation
1928                    if let Some(properties) =
1929                        definition.get("properties").and_then(|v| v.as_object())
1930                    {
1931                        // Recursively expand nested properties
1932                        let nested_required: Vec<String> = definition
1933                            .get("required")
1934                            .and_then(|v| v.as_array())
1935                            .map(|arr| {
1936                                arr.iter()
1937                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1938                                    .collect()
1939                            })
1940                            .unwrap_or_default();
1941
1942                        for (nested_name, nested_schema) in properties {
1943                            let nested_required_field = nested_required.contains(nested_name);
1944                            self.expand_nested_column(
1945                                &format!("{}.{}", field_name, nested_name),
1946                                nested_schema,
1947                                !nested_required_field,
1948                                &mut columns,
1949                                errors,
1950                            );
1951                        }
1952                    } else if let Some(fields) =
1953                        definition.get("fields").and_then(|v| v.as_object())
1954                    {
1955                        // Handle fields format (ODCL style)
1956                        for (nested_name, nested_schema) in fields {
1957                            self.expand_nested_column(
1958                                &format!("{}.{}", field_name, nested_name),
1959                                nested_schema,
1960                                true, // Assume nullable if not specified
1961                                &mut columns,
1962                                errors,
1963                            );
1964                        }
1965                    } else {
1966                        // Fallback: create parent column as OBJECT if we can't expand
1967                        columns.push(Column {
1968                            name: field_name.to_string(),
1969                            data_type: "OBJECT".to_string(),
1970                            nullable: !required,
1971                            primary_key: false,
1972                            secondary_key: false,
1973                            composite_key: None,
1974                            foreign_key: None,
1975                            constraints: Vec::new(),
1976                            description: if description.is_empty() {
1977                                definition
1978                                    .get("description")
1979                                    .and_then(|v| v.as_str())
1980                                    .unwrap_or("")
1981                                    .to_string()
1982                            } else {
1983                                description.clone()
1984                            },
1985                            errors: Vec::new(),
1986                            quality: quality_rules.clone(),
1987                            ref_path: ref_path.clone(),
1988                            enum_values: Vec::new(),
1989                            column_order: 0,
1990                        });
1991                    }
1992                } else {
1993                    // Simple type from definition
1994                    let def_type = definition
1995                        .get("type")
1996                        .and_then(|v| v.as_str())
1997                        .unwrap_or("STRING")
1998                        .to_uppercase();
1999
2000                    let enum_values = definition
2001                        .get("enum")
2002                        .and_then(|v| v.as_array())
2003                        .map(|arr| {
2004                            arr.iter()
2005                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
2006                                .collect()
2007                        })
2008                        .unwrap_or_default();
2009
2010                    columns.push(Column {
2011                        name: field_name.to_string(),
2012                        data_type: def_type,
2013                        nullable: !required,
2014                        primary_key: false,
2015                        secondary_key: false,
2016                        composite_key: None,
2017                        foreign_key: None,
2018                        constraints: Vec::new(),
2019                        description: if description.is_empty() {
2020                            definition
2021                                .get("description")
2022                                .and_then(|v| v.as_str())
2023                                .unwrap_or("")
2024                                .to_string()
2025                        } else {
2026                            description
2027                        },
2028                        errors: Vec::new(),
2029                        quality: quality_rules,
2030                        ref_path,
2031                        enum_values,
2032                        column_order: 0,
2033                    });
2034                }
2035                return Ok(columns);
2036            } else {
2037                // Undefined reference - create column with error
2038                let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
2039                let mut error_map = HashMap::new();
2040                error_map.insert("type".to_string(), serde_json::json!("validation_error"));
2041                error_map.insert("field".to_string(), serde_json::json!("data_type"));
2042                error_map.insert(
2043                    "message".to_string(),
2044                    serde_json::json!(format!(
2045                        "Field '{}' references undefined definition: {}",
2046                        field_name, ref_str
2047                    )),
2048                );
2049                col_errors.push(error_map);
2050                columns.push(Column {
2051                    name: field_name.to_string(),
2052                    data_type: "OBJECT".to_string(),
2053                    nullable: true,
2054                    primary_key: false,
2055                    secondary_key: false,
2056                    composite_key: None,
2057                    foreign_key: None,
2058                    constraints: Vec::new(),
2059                    description,
2060                    errors: col_errors,
2061                    quality: Vec::new(),
2062                    ref_path: Some(ref_str.to_string()), // Preserve ref_path even if undefined
2063                    enum_values: Vec::new(),
2064                    column_order: 0,
2065                });
2066                return Ok(columns);
2067            }
2068        }
2069
2070        // Extract field type - default to STRING if missing
2071        let field_type_str = field_data
2072            .get("type")
2073            .and_then(|v| v.as_str())
2074            .unwrap_or("STRING");
2075
2076        // Check if type contains STRUCT definition (multiline STRUCT type)
2077        if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
2078            match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
2079                Ok(nested_cols) if !nested_cols.is_empty() => {
2080                    // We have nested columns - add parent column with full type, then nested columns
2081                    let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
2082                        "ARRAY<STRUCT<...>>".to_string()
2083                    } else {
2084                        "STRUCT<...>".to_string()
2085                    };
2086
2087                    // Add parent column
2088                    columns.push(Column {
2089                        name: field_name.to_string(),
2090                        data_type: parent_data_type,
2091                        nullable: !field_data
2092                            .get("required")
2093                            .and_then(|v| v.as_bool())
2094                            .unwrap_or(false),
2095                        primary_key: false,
2096                        secondary_key: false,
2097                        composite_key: None,
2098                        foreign_key: None,
2099                        constraints: Vec::new(),
2100                        description: description.clone(),
2101                        errors: Vec::new(),
2102                        quality: quality_rules.clone(),
2103                        ref_path: field_data
2104                            .get("$ref")
2105                            .and_then(|v| v.as_str())
2106                            .map(|s| s.to_string()),
2107                        enum_values: Vec::new(),
2108                        column_order: 0,
2109                    });
2110
2111                    // Add nested columns
2112                    columns.extend(nested_cols);
2113                    return Ok(columns);
2114                }
2115                Ok(_) | Err(_) => {
2116                    // If parsing fails or returns empty, fall back to using the type as-is
2117                }
2118            }
2119        }
2120
2121        let field_type = normalize_data_type(field_type_str);
2122
2123        // Handle ARRAY type
2124        if field_type == "ARRAY" {
2125            let items = field_data.get("items");
2126            if let Some(items_val) = items {
2127                if let Some(items_obj) = items_val.as_object() {
2128                    // Check if items is an object with fields (nested structure)
2129                    if items_obj.get("fields").is_some()
2130                        || items_obj.get("type").and_then(|v| v.as_str()) == Some("object")
2131                    {
2132                        // Array of objects - create parent column as ARRAY<OBJECT>
2133                        columns.push(Column {
2134                            name: field_name.to_string(),
2135                            data_type: "ARRAY<OBJECT>".to_string(),
2136                            nullable: !field_data
2137                                .get("required")
2138                                .and_then(|v| v.as_bool())
2139                                .unwrap_or(false),
2140                            primary_key: false,
2141                            secondary_key: false,
2142                            composite_key: None,
2143                            foreign_key: None,
2144                            constraints: Vec::new(),
2145                            description: field_data
2146                                .get("description")
2147                                .and_then(|v| v.as_str())
2148                                .unwrap_or("")
2149                                .to_string(),
2150                            errors: Vec::new(),
2151                            quality: Vec::new(),
2152                            ref_path: None,
2153                            enum_values: Vec::new(),
2154                            column_order: 0,
2155                        });
2156
2157                        // Extract nested fields from items.properties or items.fields if present
2158                        // Note: Export saves nested columns in properties.properties, but some formats use fields
2159                        let nested_fields_obj = items_obj
2160                            .get("properties")
2161                            .and_then(|v| v.as_object())
2162                            .or_else(|| items_obj.get("fields").and_then(|v| v.as_object()));
2163
2164                        if let Some(fields_obj) = nested_fields_obj {
2165                            for (nested_field_name, nested_field_data) in fields_obj {
2166                                if let Some(nested_field_obj) = nested_field_data.as_object() {
2167                                    let nested_field_type = nested_field_obj
2168                                        .get("type")
2169                                        .and_then(|v| v.as_str())
2170                                        .unwrap_or("STRING");
2171
2172                                    // Recursively parse nested fields with array prefix
2173                                    let nested_col_name =
2174                                        format!("{}.{}", field_name, nested_field_name);
2175                                    let mut local_errors = Vec::new();
2176                                    match self.parse_data_contract_field(
2177                                        &nested_col_name,
2178                                        nested_field_obj,
2179                                        data,
2180                                        &mut local_errors,
2181                                    ) {
2182                                        Ok(mut nested_cols) => {
2183                                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2184                                            // Otherwise, it returns a single column
2185                                            columns.append(&mut nested_cols);
2186                                        }
2187                                        Err(_) => {
2188                                            // Fallback: create simple nested column
2189                                            columns.push(Column {
2190                                                name: nested_col_name,
2191                                                data_type: nested_field_type.to_uppercase(),
2192                                                nullable: !nested_field_obj
2193                                                    .get("required")
2194                                                    .and_then(|v| v.as_bool())
2195                                                    .unwrap_or(false),
2196                                                primary_key: false,
2197                                                secondary_key: false,
2198                                                composite_key: None,
2199                                                foreign_key: None,
2200                                                constraints: Vec::new(),
2201                                                description: nested_field_obj
2202                                                    .get("description")
2203                                                    .and_then(|v| v.as_str())
2204                                                    .unwrap_or("")
2205                                                    .to_string(),
2206                                                errors: Vec::new(),
2207                                                quality: Vec::new(),
2208                                                ref_path: None,
2209                                                enum_values: Vec::new(),
2210                                                column_order: 0,
2211                                            });
2212                                        }
2213                                    }
2214                                }
2215                            }
2216                        }
2217
2218                        return Ok(columns);
2219                    } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2220                        // Array of simple type
2221                        columns.push(Column {
2222                            name: field_name.to_string(),
2223                            data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2224                            nullable: !field_data
2225                                .get("required")
2226                                .and_then(|v| v.as_bool())
2227                                .unwrap_or(false),
2228                            primary_key: false,
2229                            secondary_key: false,
2230                            composite_key: None,
2231                            foreign_key: None,
2232                            constraints: Vec::new(),
2233                            description: description.clone(),
2234                            errors: Vec::new(),
2235                            quality: quality_rules.clone(),
2236                            ref_path: field_data
2237                                .get("$ref")
2238                                .and_then(|v| v.as_str())
2239                                .map(|s| s.to_string()),
2240                            enum_values: Vec::new(),
2241                            column_order: 0,
2242                        });
2243                        return Ok(columns);
2244                    }
2245                } else if let Some(item_type_str) = items_val.as_str() {
2246                    // Array of simple type (string)
2247                    columns.push(Column {
2248                        name: field_name.to_string(),
2249                        data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2250                        nullable: !field_data
2251                            .get("required")
2252                            .and_then(|v| v.as_bool())
2253                            .unwrap_or(false),
2254                        primary_key: false,
2255                        secondary_key: false,
2256                        composite_key: None,
2257                        foreign_key: None,
2258                        constraints: Vec::new(),
2259                        description: description.clone(),
2260                        errors: Vec::new(),
2261                        quality: quality_rules.clone(),
2262                        ref_path: field_data
2263                            .get("$ref")
2264                            .and_then(|v| v.as_str())
2265                            .map(|s| s.to_string()),
2266                        enum_values: Vec::new(),
2267                        column_order: 0,
2268                    });
2269                    return Ok(columns);
2270                }
2271            }
2272            // Array without items - default to ARRAY<STRING>
2273            columns.push(Column {
2274                name: field_name.to_string(),
2275                data_type: "ARRAY<STRING>".to_string(),
2276                nullable: !field_data
2277                    .get("required")
2278                    .and_then(|v| v.as_bool())
2279                    .unwrap_or(false),
2280                primary_key: false,
2281                secondary_key: false,
2282                composite_key: None,
2283                foreign_key: None,
2284                constraints: Vec::new(),
2285                description: description.clone(),
2286                errors: Vec::new(),
2287                quality: quality_rules.clone(),
2288                ref_path: field_data
2289                    .get("$ref")
2290                    .and_then(|v| v.as_str())
2291                    .map(|s| s.to_string()),
2292                enum_values: Vec::new(),
2293                column_order: 0,
2294            });
2295            return Ok(columns);
2296        }
2297
2298        // Check if this is a nested object with fields or properties
2299        // Note: Export saves nested columns in properties.properties, but some formats use fields
2300        let nested_fields_obj = field_data
2301            .get("properties")
2302            .and_then(|v| v.as_object())
2303            .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2304
2305        if field_type == "OBJECT"
2306            && let Some(fields_obj) = nested_fields_obj
2307        {
2308            // Inline nested object - create parent column as OBJECT and extract nested fields
2309
2310            // Create parent column
2311            columns.push(Column {
2312                name: field_name.to_string(),
2313                data_type: "OBJECT".to_string(),
2314                nullable: !field_data
2315                    .get("required")
2316                    .and_then(|v| v.as_bool())
2317                    .unwrap_or(false),
2318                primary_key: false,
2319                secondary_key: false,
2320                composite_key: None,
2321                foreign_key: None,
2322                constraints: Vec::new(),
2323                description: description.clone(),
2324                errors: Vec::new(),
2325                quality: quality_rules.clone(),
2326                ref_path: field_data
2327                    .get("$ref")
2328                    .and_then(|v| v.as_str())
2329                    .map(|s| s.to_string()),
2330                enum_values: Vec::new(),
2331                column_order: 0,
2332            });
2333
2334            // Extract nested fields recursively
2335            for (nested_field_name, nested_field_data) in fields_obj {
2336                if let Some(nested_field_obj) = nested_field_data.as_object() {
2337                    let nested_field_type = nested_field_obj
2338                        .get("type")
2339                        .and_then(|v| v.as_str())
2340                        .unwrap_or("STRING");
2341
2342                    // Recursively parse nested fields
2343                    let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2344                    match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data) {
2345                        Ok(mut nested_cols) => {
2346                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2347                            // Otherwise, it returns a single column
2348                            columns.append(&mut nested_cols);
2349                        }
2350                        Err(_) => {
2351                            // Fallback: create simple nested column
2352                            columns.push(Column {
2353                                name: nested_col_name,
2354                                data_type: nested_field_type.to_uppercase(),
2355                                nullable: !nested_field_obj
2356                                    .get("required")
2357                                    .and_then(|v| v.as_bool())
2358                                    .unwrap_or(false),
2359                                primary_key: false,
2360                                secondary_key: false,
2361                                composite_key: None,
2362                                foreign_key: None,
2363                                constraints: Vec::new(),
2364                                description: nested_field_obj
2365                                    .get("description")
2366                                    .and_then(|v| v.as_str())
2367                                    .unwrap_or("")
2368                                    .to_string(),
2369                                errors: Vec::new(),
2370                                quality: Vec::new(),
2371                                ref_path: None,
2372                                enum_values: Vec::new(),
2373                                column_order: 0,
2374                            });
2375                        }
2376                    }
2377                }
2378            }
2379
2380            return Ok(columns);
2381        }
2382
2383        // Regular field (no $ref or $ref not found)
2384        // Check for $ref even in non-$ref path (in case $ref doesn't resolve)
2385        let ref_path = field_data
2386            .get("$ref")
2387            .and_then(|v| v.as_str())
2388            .map(|s| s.to_string());
2389
2390        let required = field_data
2391            .get("required")
2392            .and_then(|v| v.as_bool())
2393            .unwrap_or(false);
2394
2395        // Use description extracted at function start, or extract if not yet extracted
2396        let field_description = if description.is_empty() {
2397            field_data
2398                .get("description")
2399                .and_then(|v| v.as_str())
2400                .unwrap_or("")
2401                .to_string()
2402        } else {
2403            description
2404        };
2405
2406        // Use quality_rules extracted at function start, or extract if not yet extracted
2407        let mut column_quality_rules = quality_rules;
2408
2409        // Extract column-level quality rules if not already extracted
2410        if column_quality_rules.is_empty()
2411            && let Some(quality_val) = field_data.get("quality")
2412        {
2413            if let Some(arr) = quality_val.as_array() {
2414                // Array of quality rules
2415                for item in arr {
2416                    if let Some(obj) = item.as_object() {
2417                        let mut rule = HashMap::new();
2418                        for (key, value) in obj {
2419                            rule.insert(key.clone(), json_value_to_serde_value(value));
2420                        }
2421                        column_quality_rules.push(rule);
2422                    }
2423                }
2424            } else if let Some(obj) = quality_val.as_object() {
2425                // Single quality rule object
2426                let mut rule = HashMap::new();
2427                for (key, value) in obj {
2428                    rule.insert(key.clone(), json_value_to_serde_value(value));
2429                }
2430                column_quality_rules.push(rule);
2431            }
2432        }
2433
2434        // If required=true (nullable=false), add a "not_null" quality rule if not already present
2435        if required {
2436            let has_not_null = column_quality_rules.iter().any(|rule| {
2437                rule.get("type")
2438                    .and_then(|v| v.as_str())
2439                    .map(|s| {
2440                        s.to_lowercase().contains("not_null")
2441                            || s.to_lowercase().contains("notnull")
2442                    })
2443                    .unwrap_or(false)
2444            });
2445            if !has_not_null {
2446                let mut not_null_rule = HashMap::new();
2447                not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
2448                not_null_rule.insert(
2449                    "description".to_string(),
2450                    serde_json::json!("Column must not be null"),
2451                );
2452                column_quality_rules.push(not_null_rule);
2453            }
2454        }
2455
2456        columns.push(Column {
2457            name: field_name.to_string(),
2458            data_type: field_type,
2459            nullable: !required,
2460            primary_key: field_data
2461                .get("primaryKey")
2462                .and_then(|v| v.as_bool())
2463                .unwrap_or(false),
2464            secondary_key: false,
2465            composite_key: None,
2466            foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2467            constraints: Vec::new(),
2468            description: field_description,
2469            errors: Vec::new(),
2470            quality: column_quality_rules,
2471            ref_path,
2472            enum_values: Vec::new(),
2473            column_order: 0,
2474        });
2475
2476        Ok(columns)
2477    }
2478
2479    /// Parse foreign key from Data Contract field data.
2480    fn parse_foreign_key_from_data_contract(
2481        &self,
2482        field_data: &serde_json::Map<String, JsonValue>,
2483    ) -> Option<ForeignKey> {
2484        field_data
2485            .get("foreignKey")
2486            .and_then(|v| v.as_object())
2487            .map(|fk_obj| ForeignKey {
2488                table_id: fk_obj
2489                    .get("table")
2490                    .or_else(|| fk_obj.get("table_id"))
2491                    .and_then(|v| v.as_str())
2492                    .unwrap_or("")
2493                    .to_string(),
2494                column_name: fk_obj
2495                    .get("column")
2496                    .or_else(|| fk_obj.get("column_name"))
2497                    .and_then(|v| v.as_str())
2498                    .unwrap_or("")
2499                    .to_string(),
2500            })
2501    }
2502
2503    /// Extract database type from servers in Data Contract format.
2504    fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2505        // Data Contract format: servers can be object or array
2506        if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2507            // Object format: { "server_name": { "type": "..." } }
2508            if let Some((_, server_data)) = servers_obj.iter().next()
2509                && let Some(server_obj) = server_data.as_object()
2510            {
2511                return server_obj
2512                    .get("type")
2513                    .and_then(|v| v.as_str())
2514                    .and_then(|s| self.parse_database_type(s));
2515            }
2516        } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2517            // Array format: [ { "server": "...", "type": "..." } ]
2518            if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2519                return server_obj
2520                    .get("type")
2521                    .and_then(|v| v.as_str())
2522                    .and_then(|s| self.parse_database_type(s));
2523            }
2524        }
2525        None
2526    }
2527
2528    /// Parse database type string to enum.
2529    fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2530        match s.to_lowercase().as_str() {
2531            "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2532            "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2533            "mysql" => Some(DatabaseType::Mysql),
2534            "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2535            "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2536            _ => None,
2537        }
2538    }
2539
2540    /// Parse STRUCT type definition from string (e.g., "ARRAY<STRUCT<ID: STRING, NAME: STRING>>").
2541    fn parse_struct_type_from_string(
2542        &self,
2543        field_name: &str,
2544        type_str: &str,
2545        field_data: &serde_json::Map<String, JsonValue>,
2546    ) -> Result<Vec<Column>> {
2547        let mut columns = Vec::new();
2548
2549        // Normalize whitespace - replace newlines and multiple spaces with single space
2550        let normalized_type = type_str
2551            .lines()
2552            .map(|line| line.trim())
2553            .filter(|line| !line.is_empty())
2554            .collect::<Vec<_>>()
2555            .join(" ");
2556
2557        let type_str_upper = normalized_type.to_uppercase();
2558
2559        // Check if it's ARRAY<STRUCT<...>>
2560        let _is_array = type_str_upper.starts_with("ARRAY<");
2561        let struct_start = type_str_upper.find("STRUCT<");
2562
2563        if let Some(start_pos) = struct_start {
2564            // Extract STRUCT content
2565            let struct_content = &normalized_type[start_pos + 7..]; // Skip "STRUCT<"
2566
2567            // Find matching closing bracket, handling nested STRUCTs
2568            let mut depth = 1;
2569            let mut end_pos = None;
2570            for (i, ch) in struct_content.char_indices() {
2571                match ch {
2572                    '<' => depth += 1,
2573                    '>' => {
2574                        depth -= 1;
2575                        if depth == 0 {
2576                            end_pos = Some(i);
2577                            break;
2578                        }
2579                    }
2580                    _ => {}
2581                }
2582            }
2583
2584            // If no closing bracket found, try to infer it (handle malformed YAML)
2585            let struct_fields_str = if let Some(end) = end_pos {
2586                &struct_content[..end]
2587            } else {
2588                // Missing closing bracket - use everything up to the end
2589                struct_content.trim_end_matches('>').trim()
2590            };
2591
2592            // Parse fields: "ID: STRING, NAME: STRING, ..."
2593            let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2594
2595            // Create nested columns, handling nested STRUCTs recursively
2596            for (nested_name, nested_type) in fields {
2597                let nested_type_upper = nested_type.to_uppercase();
2598                let nested_col_name = format!("{}.{}", field_name, nested_name);
2599
2600                // Check if this field is itself a STRUCT or ARRAY<STRUCT>
2601                if nested_type_upper.starts_with("STRUCT<") {
2602                    // Recursively parse nested STRUCT by creating a synthetic field_data
2603                    // and calling parse_struct_type_from_string recursively
2604                    let nested_struct_type_str = if nested_type_upper.starts_with("ARRAY<STRUCT<") {
2605                        // Handle ARRAY<STRUCT<...>>
2606                        nested_type.clone()
2607                    } else {
2608                        // Handle STRUCT<...>
2609                        nested_type.clone()
2610                    };
2611
2612                    // Recursively parse the nested STRUCT
2613                    match self.parse_struct_type_from_string(
2614                        &nested_col_name,
2615                        &nested_struct_type_str,
2616                        field_data,
2617                    ) {
2618                        Ok(nested_cols) => {
2619                            // Add all recursively parsed columns
2620                            columns.extend(nested_cols);
2621                        }
2622                        Err(_) => {
2623                            // If recursive parsing fails, add the parent column with STRUCT type
2624                            columns.push(Column {
2625                                name: nested_col_name,
2626                                data_type: normalize_data_type(&nested_type),
2627                                nullable: !field_data
2628                                    .get("required")
2629                                    .and_then(|v| v.as_bool())
2630                                    .unwrap_or(false),
2631                                primary_key: false,
2632                                secondary_key: false,
2633                                composite_key: None,
2634                                foreign_key: None,
2635                                constraints: Vec::new(),
2636                                description: field_data
2637                                    .get("description")
2638                                    .and_then(|v| v.as_str())
2639                                    .unwrap_or("")
2640                                    .to_string(),
2641                                errors: Vec::new(),
2642                                quality: Vec::new(),
2643                                ref_path: None,
2644                                enum_values: Vec::new(),
2645                                column_order: 0,
2646                            });
2647                        }
2648                    }
2649                } else {
2650                    // Simple nested field (not a STRUCT)
2651                    columns.push(Column {
2652                        name: nested_col_name,
2653                        data_type: normalize_data_type(&nested_type),
2654                        nullable: !field_data
2655                            .get("required")
2656                            .and_then(|v| v.as_bool())
2657                            .unwrap_or(false),
2658                        primary_key: false,
2659                        secondary_key: false,
2660                        composite_key: None,
2661                        foreign_key: None,
2662                        constraints: Vec::new(),
2663                        description: field_data
2664                            .get("description")
2665                            .and_then(|v| v.as_str())
2666                            .unwrap_or("")
2667                            .to_string(),
2668                        errors: Vec::new(),
2669                        quality: Vec::new(),
2670                        ref_path: None,
2671                        enum_values: Vec::new(),
2672                        column_order: 0,
2673                    });
2674                }
2675            }
2676
2677            return Ok(columns);
2678        }
2679
2680        // If no STRUCT found, return empty (fallback to regular parsing)
2681        Ok(Vec::new())
2682    }
2683
2684    /// Parse STRUCT fields from string (e.g., "ID: STRING, NAME: STRING").
2685    fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2686        let mut fields = Vec::new();
2687        let mut current_field = String::new();
2688        let mut depth = 0;
2689        let mut in_string = false;
2690        let mut string_char = None;
2691
2692        for ch in fields_str.chars() {
2693            match ch {
2694                '\'' | '"' if !in_string || Some(ch) == string_char => {
2695                    if in_string {
2696                        in_string = false;
2697                        string_char = None;
2698                    } else {
2699                        in_string = true;
2700                        string_char = Some(ch);
2701                    }
2702                    current_field.push(ch);
2703                }
2704                '<' if !in_string => {
2705                    depth += 1;
2706                    current_field.push(ch);
2707                }
2708                '>' if !in_string => {
2709                    depth -= 1;
2710                    current_field.push(ch);
2711                }
2712                ',' if !in_string && depth == 0 => {
2713                    // End of current field
2714                    let trimmed = current_field.trim();
2715                    if !trimmed.is_empty()
2716                        && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2717                    {
2718                        fields.push((name, type_part));
2719                    }
2720                    current_field.clear();
2721                }
2722                _ => {
2723                    current_field.push(ch);
2724                }
2725            }
2726        }
2727
2728        // Handle last field
2729        let trimmed = current_field.trim();
2730        if !trimmed.is_empty()
2731            && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2732        {
2733            fields.push((name, type_part));
2734        }
2735
2736        Ok(fields)
2737    }
2738
2739    /// Parse a single field definition (e.g., "ID: STRING" or "ALERTOPERATION: STRUCT<...>").
2740    fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
2741        // Split by colon, but handle nested STRUCTs
2742        let colon_pos = field_def.find(':')?;
2743        let name = field_def[..colon_pos].trim().to_string();
2744        let type_part = field_def[colon_pos + 1..].trim().to_string();
2745
2746        if name.is_empty() || type_part.is_empty() {
2747            return None;
2748        }
2749
2750        Some((name, type_part))
2751    }
2752
2753    /// Extract catalog and schema from customProperties.
2754    fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
2755        let mut catalog_name = None;
2756        let mut schema_name = None;
2757
2758        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2759            for prop in custom_props {
2760                if let Some(prop_obj) = prop.as_object() {
2761                    let prop_key = prop_obj
2762                        .get("property")
2763                        .and_then(|v| v.as_str())
2764                        .unwrap_or("");
2765                    let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
2766
2767                    match prop_key {
2768                        "catalogName" | "catalog_name" => {
2769                            catalog_name = prop_value.map(|s| s.to_string());
2770                        }
2771                        "schemaName" | "schema_name" => {
2772                            schema_name = prop_value.map(|s| s.to_string());
2773                        }
2774                        _ => {}
2775                    }
2776                }
2777            }
2778        }
2779
2780        // Also check direct fields
2781        if catalog_name.is_none() {
2782            catalog_name = data
2783                .get("catalog_name")
2784                .and_then(|v| v.as_str())
2785                .map(|s| s.to_string());
2786        }
2787        if schema_name.is_none() {
2788            schema_name = data
2789                .get("schema_name")
2790                .and_then(|v| v.as_str())
2791                .map(|s| s.to_string());
2792        }
2793
2794        (catalog_name, schema_name)
2795    }
2796}
2797
2798impl Default for ODCSImporter {
2799    fn default() -> Self {
2800        Self::new()
2801    }
2802}
2803
2804/// Parser error structure for detailed error reporting.
2805#[derive(Debug, Clone)]
2806pub struct ParserError {
2807    pub error_type: String,
2808    pub field: String,
2809    pub message: String,
2810}
2811
2812/// Convert YAML Value to JSON Value for easier manipulation.
2813fn yaml_to_json_value(yaml: &serde_yaml::Value) -> Result<JsonValue> {
2814    // Convert YAML to JSON via serialization
2815    let json_str = serde_json::to_string(yaml).context("Failed to convert YAML to JSON")?;
2816    serde_json::from_str(&json_str).context("Failed to parse JSON")
2817}
2818
2819/// Convert JSON Value to serde_json::Value for storage in HashMap.
2820fn json_value_to_serde_value(value: &JsonValue) -> serde_json::Value {
2821    value.clone()
2822}
2823
2824/// Normalize data type to uppercase, preserving STRUCT<...>, ARRAY<...>, MAP<...> format.
2825fn normalize_data_type(data_type: &str) -> String {
2826    if data_type.is_empty() {
2827        return data_type.to_string();
2828    }
2829
2830    let upper = data_type.to_uppercase();
2831
2832    // Handle STRUCT<...>, ARRAY<...>, MAP<...> preserving inner content
2833    if upper.starts_with("STRUCT") {
2834        if let Some(start) = data_type.find('<')
2835            && let Some(end) = data_type.rfind('>')
2836        {
2837            let inner = &data_type[start + 1..end];
2838            return format!("STRUCT<{}>", inner);
2839        }
2840        return format!("STRUCT{}", &data_type[6..]);
2841    } else if upper.starts_with("ARRAY") {
2842        if let Some(start) = data_type.find('<')
2843            && let Some(end) = data_type.rfind('>')
2844        {
2845            let inner = &data_type[start + 1..end];
2846            return format!("ARRAY<{}>", inner);
2847        }
2848        return format!("ARRAY{}", &data_type[5..]);
2849    } else if upper.starts_with("MAP") {
2850        if let Some(start) = data_type.find('<')
2851            && let Some(end) = data_type.rfind('>')
2852        {
2853            let inner = &data_type[start + 1..end];
2854            return format!("MAP<{}>", inner);
2855        }
2856        return format!("MAP{}", &data_type[3..]);
2857    }
2858
2859    upper
2860}
2861
2862// Helper functions for enum parsing
2863fn parse_medallion_layer(s: &str) -> Result<MedallionLayer> {
2864    match s.to_uppercase().as_str() {
2865        "BRONZE" => Ok(MedallionLayer::Bronze),
2866        "SILVER" => Ok(MedallionLayer::Silver),
2867        "GOLD" => Ok(MedallionLayer::Gold),
2868        "OPERATIONAL" => Ok(MedallionLayer::Operational),
2869        _ => Err(anyhow::anyhow!("Unknown medallion layer: {}", s)),
2870    }
2871}
2872
2873fn parse_scd_pattern(s: &str) -> Result<SCDPattern> {
2874    match s.to_uppercase().as_str() {
2875        "TYPE_1" | "TYPE1" => Ok(SCDPattern::Type1),
2876        "TYPE_2" | "TYPE2" => Ok(SCDPattern::Type2),
2877        _ => Err(anyhow::anyhow!("Unknown SCD pattern: {}", s)),
2878    }
2879}
2880
2881fn parse_data_vault_classification(s: &str) -> Result<DataVaultClassification> {
2882    match s.to_uppercase().as_str() {
2883        "HUB" => Ok(DataVaultClassification::Hub),
2884        "LINK" => Ok(DataVaultClassification::Link),
2885        "SATELLITE" | "SAT" => Ok(DataVaultClassification::Satellite),
2886        _ => Err(anyhow::anyhow!("Unknown Data Vault classification: {}", s)),
2887    }
2888}
2889
2890#[cfg(test)]
2891mod tests {
2892    use super::*;
2893
2894    #[test]
2895    fn test_parse_simple_odcl_table() {
2896        let mut parser = ODCSImporter::new();
2897        let odcl_yaml = r#"
2898name: users
2899columns:
2900  - name: id
2901    data_type: INT
2902    nullable: false
2903    primary_key: true
2904  - name: name
2905    data_type: VARCHAR(255)
2906    nullable: false
2907database_type: Postgres
2908"#;
2909
2910        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2911        assert_eq!(table.name, "users");
2912        assert_eq!(table.columns.len(), 2);
2913        assert_eq!(table.columns[0].name, "id");
2914        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
2915        assert_eq!(errors.len(), 0);
2916    }
2917
2918    #[test]
2919    fn test_parse_odcl_with_metadata() {
2920        let mut parser = ODCSImporter::new();
2921        let odcl_yaml = r#"
2922name: users
2923columns:
2924  - name: id
2925    data_type: INT
2926medallion_layer: gold
2927scd_pattern: TYPE_2
2928odcl_metadata:
2929  description: "User table"
2930  owner: "data-team"
2931"#;
2932
2933        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2934        assert_eq!(table.medallion_layers.len(), 1);
2935        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
2936        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
2937        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
2938            assert_eq!(desc, "User table");
2939        }
2940        assert_eq!(errors.len(), 0);
2941    }
2942
2943    #[test]
2944    fn test_parse_odcl_with_data_vault() {
2945        let mut parser = ODCSImporter::new();
2946        let odcl_yaml = r#"
2947name: hub_customer
2948columns:
2949  - name: customer_key
2950    data_type: VARCHAR(50)
2951data_vault_classification: Hub
2952"#;
2953
2954        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2955        assert_eq!(
2956            table.data_vault_classification,
2957            Some(DataVaultClassification::Hub)
2958        );
2959        assert_eq!(errors.len(), 0);
2960    }
2961
2962    #[test]
2963    fn test_parse_invalid_odcl() {
2964        let mut parser = ODCSImporter::new();
2965        let invalid_yaml = "not: valid: yaml: structure:";
2966
2967        // Should fail to parse YAML
2968        assert!(parser.parse(invalid_yaml).is_err());
2969    }
2970
2971    #[test]
2972    fn test_parse_odcl_missing_required_fields() {
2973        let mut parser = ODCSImporter::new();
2974        let non_conformant = r#"
2975name: users
2976# Missing required columns field
2977"#;
2978
2979        // Should fail with missing columns
2980        assert!(parser.parse(non_conformant).is_err());
2981    }
2982
2983    #[test]
2984    fn test_parse_odcl_with_foreign_key() {
2985        let mut parser = ODCSImporter::new();
2986        let odcl_yaml = r#"
2987name: orders
2988columns:
2989  - name: id
2990    data_type: INT
2991    primary_key: true
2992  - name: user_id
2993    data_type: INT
2994    foreign_key:
2995      table_id: users
2996      column_name: id
2997"#;
2998
2999        let (table, errors) = parser.parse(odcl_yaml).unwrap();
3000        assert_eq!(table.columns.len(), 2);
3001        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
3002        assert!(user_id_col.foreign_key.is_some());
3003        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
3004        assert_eq!(errors.len(), 0);
3005    }
3006
3007    #[test]
3008    fn test_parse_odcl_with_constraints() {
3009        let mut parser = ODCSImporter::new();
3010        let odcl_yaml = r#"
3011name: products
3012columns:
3013  - name: id
3014    data_type: INT
3015    primary_key: true
3016  - name: name
3017    data_type: VARCHAR(255)
3018    nullable: false
3019    constraints:
3020      - UNIQUE
3021      - NOT NULL
3022"#;
3023
3024        let (table, errors) = parser.parse(odcl_yaml).unwrap();
3025        assert_eq!(table.columns.len(), 2);
3026        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
3027        assert!(!name_col.nullable);
3028        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
3029        assert_eq!(errors.len(), 0);
3030    }
3031}