data_modelling_sdk/import/
odcs.rs

1//! ODCS parser service for parsing Open Data Contract Standard YAML files.
2//!
3//! This service parses ODCS (Open Data Contract Standard) v3.1.0 and legacy ODCL (Data Contract Specification) YAML files
4//! and converts them to Table models. ODCL files are automatically converted to ODCS v3.1.0 format.
5//! Supports multiple formats:
6//! - ODCS v3.1.0 / v3.0.x format (apiVersion, kind, schema) - PRIMARY FORMAT
7//! - ODCL (Data Contract Specification) format (dataContractSpecification, models, definitions) - LEGACY, converted to ODCS
8//! - Simple ODCL format (name, columns) - LEGACY, converted to ODCS
9//! - Liquibase format
10
11use super::{ImportError, ImportResult, TableData};
12use crate::models::column::ForeignKey;
13use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
14use crate::models::{Column, Table};
15use anyhow::{Context, Result};
16use serde_json::Value as JsonValue;
17use std::collections::HashMap;
18use tracing::info;
19
20/// ODCS parser service for parsing Open Data Contract Standard YAML files.
21/// Handles ODCS v3.1.0 (primary format) and legacy ODCL formats (converted to ODCS).
22pub struct ODCSImporter {
23    /// Current YAML data for $ref resolution
24    current_yaml_data: Option<serde_yaml::Value>,
25}
26
27impl ODCSImporter {
28    /// Create a new ODCS parser instance.
29    ///
30    /// # Example
31    ///
32    /// ```rust
33    /// use data_modelling_sdk::import::odcs::ODCSImporter;
34    ///
35    /// let mut importer = ODCSImporter::new();
36    /// ```
37    pub fn new() -> Self {
38        Self {
39            current_yaml_data: None,
40        }
41    }
42
43    /// Import ODCS/ODCL YAML content and create Table (SDK interface).
44    ///
45    /// Supports ODCS v3.1.0 (primary), legacy ODCL formats (converted to ODCS), and Liquibase formats.
46    ///
47    /// # Arguments
48    ///
49    /// * `yaml_content` - ODCS/ODCL YAML content as a string
50    ///
51    /// # Returns
52    ///
53    /// An `ImportResult` containing the extracted table and any parse errors.
54    ///
55    /// # Example
56    ///
57    /// ```rust
58    /// use data_modelling_sdk::import::odcs::ODCSImporter;
59    ///
60    /// let mut importer = ODCSImporter::new();
61    /// let yaml = r#"
62    /// apiVersion: v3.1.0
63    /// kind: DataContract
64    /// id: 550e8400-e29b-41d4-a716-446655440000
65    /// version: 1.0.0
66    /// name: users
67    /// schema:
68    ///   fields:
69    ///     - name: id
70    ///       type: bigint
71    /// "#;
72    /// let result = importer.import(yaml).unwrap();
73    /// assert_eq!(result.tables.len(), 1);
74    /// ```
75    pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
76        match self.parse(yaml_content) {
77            Ok((table, errors)) => {
78                let sdk_tables = vec![TableData {
79                    table_index: 0,
80                    name: Some(table.name.clone()),
81                    columns: table
82                        .columns
83                        .iter()
84                        .map(|c| super::ColumnData {
85                            name: c.name.clone(),
86                            data_type: c.data_type.clone(),
87                            nullable: c.nullable,
88                            primary_key: c.primary_key,
89                        })
90                        .collect(),
91                }];
92                let sdk_errors: Vec<ImportError> = errors
93                    .iter()
94                    .map(|e| ImportError::ParseError(e.message.clone()))
95                    .collect();
96                Ok(ImportResult {
97                    tables: sdk_tables,
98                    tables_requiring_name: Vec::new(),
99                    errors: sdk_errors,
100                    ai_suggestions: None,
101                })
102            }
103            Err(e) => Err(ImportError::ParseError(e.to_string())),
104        }
105    }
106
107    /// Parse ODCS/ODCL YAML content and create Table (public method for native app use).
108    ///
109    /// This method returns the full Table object with all metadata, suitable for use in
110    /// native applications that need direct access to the parsed table structure.
111    /// For API use, prefer the `import()` method which returns ImportResult.
112    ///
113    /// # Returns
114    ///
115    /// Returns a tuple of (Table, list of errors/warnings).
116    /// Errors list is empty if parsing is successful.
117    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
118        self.parse(yaml_content)
119    }
120
121    /// Parse ODCS/ODCL YAML content and create Table (internal method).
122    ///
123    /// Supports ODCS v3.1.0/v3.0.x (primary), ODCL Data Contract spec (legacy, converted to ODCS),
124    /// simple ODCL (legacy, converted to ODCS), and Liquibase formats.
125    ///
126    /// # Returns
127    ///
128    /// Returns a tuple of (Table, list of errors/warnings).
129    /// Errors list is empty if parsing is successful.
130    fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
131        // Errors are collected in helper functions, not here
132        let _errors: Vec<ParserError> = Vec::new();
133
134        // Parse YAML
135        let data: serde_yaml::Value =
136            serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
137
138        if data.is_null() {
139            return Err(anyhow::anyhow!("Empty YAML content"));
140        }
141
142        // Store current YAML data for $ref resolution
143        self.current_yaml_data = Some(data.clone());
144
145        // Convert to JSON Value for easier manipulation
146        let json_data = yaml_to_json_value(&data)?;
147
148        // Check format and parse accordingly
149        if self.is_liquibase_format(&json_data) {
150            return self.parse_liquibase(&json_data);
151        }
152
153        if self.is_odcl_v3_format(&json_data) {
154            return self.parse_odcl_v3(&json_data);
155        }
156
157        if self.is_data_contract_format(&json_data) {
158            return self.parse_data_contract(&json_data);
159        }
160
161        // Fall back to simple ODCL format
162        self.parse_simple_odcl(&json_data)
163    }
164
165    /// Resolve a $ref reference like '#/definitions/betAction'.
166    fn resolve_ref<'a>(&self, ref_str: &str, data: &'a JsonValue) -> Option<&'a JsonValue> {
167        if !ref_str.starts_with("#/") {
168            return None;
169        }
170
171        // Remove the leading '#/'
172        let path = &ref_str[2..];
173        let parts: Vec<&str> = path.split('/').collect();
174
175        // Navigate through the data structure
176        let mut current = data;
177        for part in parts {
178            current = current.get(part)?;
179        }
180
181        if current.is_object() {
182            Some(current)
183        } else {
184            None
185        }
186    }
187
188    /// Check if YAML is in Liquibase format.
189    fn is_liquibase_format(&self, data: &JsonValue) -> bool {
190        if data.get("databaseChangeLog").is_some() {
191            return true;
192        }
193        // Check for Liquibase-specific keys
194        if let Some(obj) = data.as_object() {
195            let obj_str = format!("{:?}", obj);
196            if obj_str.contains("changeSet") {
197                return true;
198            }
199        }
200        false
201    }
202
203    /// Check if YAML is in ODCS v3.0.x format.
204    fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
205        if let Some(obj) = data.as_object() {
206            let has_api_version = obj.contains_key("apiVersion");
207            let has_kind = obj
208                .get("kind")
209                .and_then(|v| v.as_str())
210                .map(|s| s == "DataContract")
211                .unwrap_or(false);
212            let has_id = obj.contains_key("id");
213            let has_version = obj.contains_key("version");
214            return has_api_version && has_kind && has_id && has_version;
215        }
216        false
217    }
218
219    /// Check if YAML is in Data Contract specification format.
220    fn is_data_contract_format(&self, data: &JsonValue) -> bool {
221        if let Some(obj) = data.as_object() {
222            let has_spec = obj.contains_key("dataContractSpecification");
223            let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
224            return has_spec && has_models;
225        }
226        false
227    }
228
229    /// Parse simple ODCL format.
230    fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
231        let mut errors = Vec::new();
232
233        // Extract table name
234        let name = data
235            .get("name")
236            .and_then(|v| v.as_str())
237            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
238            .to_string();
239
240        // Extract columns
241        let columns_data = data
242            .get("columns")
243            .and_then(|v| v.as_array())
244            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
245
246        let mut columns = Vec::new();
247        for (idx, col_data) in columns_data.iter().enumerate() {
248            match self.parse_column(col_data) {
249                Ok(col) => columns.push(col),
250                Err(e) => {
251                    errors.push(ParserError {
252                        error_type: "column_parse_error".to_string(),
253                        field: format!("columns[{}]", idx),
254                        message: e.to_string(),
255                    });
256                }
257            }
258        }
259
260        // Extract metadata
261        let database_type = self.extract_database_type(data);
262        let medallion_layers = self.extract_medallion_layers(data);
263        let scd_pattern = self.extract_scd_pattern(data);
264        let data_vault_classification = self.extract_data_vault_classification(data);
265        let quality_rules = self.extract_quality_rules(data);
266
267        // Validate pattern exclusivity
268        if scd_pattern.is_some() && data_vault_classification.is_some() {
269            errors.push(ParserError {
270                error_type: "validation_error".to_string(),
271                field: "patterns".to_string(),
272                message: "SCD pattern and Data Vault classification are mutually exclusive"
273                    .to_string(),
274            });
275        }
276
277        // Extract odcl_metadata
278        let mut odcl_metadata = HashMap::new();
279        if let Some(metadata) = data.get("odcl_metadata")
280            && let Some(obj) = metadata.as_object()
281        {
282            for (key, value) in obj {
283                odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
284            }
285        }
286
287        let table_uuid = self.extract_table_uuid(data);
288
289        let table = Table {
290            id: table_uuid,
291            name,
292            columns,
293            database_type,
294            catalog_name: None,
295            schema_name: None,
296            medallion_layers,
297            scd_pattern,
298            data_vault_classification,
299            modeling_level: None,
300            tags: Vec::new(),
301            odcl_metadata,
302            position: None,
303            yaml_file_path: None,
304            drawio_cell_id: None,
305            quality: quality_rules,
306            errors: Vec::new(),
307            created_at: chrono::Utc::now(),
308            updated_at: chrono::Utc::now(),
309        };
310
311        info!("Parsed ODCL table: {}", table.name);
312        Ok((table, errors))
313    }
314
315    /// Parse a single column definition.
316    fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
317        let name = col_data
318            .get("name")
319            .and_then(|v| v.as_str())
320            .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
321            .to_string();
322
323        let data_type = col_data
324            .get("data_type")
325            .and_then(|v| v.as_str())
326            .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
327            .to_string();
328
329        // Normalize data_type to uppercase (preserve STRUCT<...> format)
330        let data_type = normalize_data_type(&data_type);
331
332        let nullable = col_data
333            .get("nullable")
334            .and_then(|v| v.as_bool())
335            .unwrap_or(true);
336
337        let primary_key = col_data
338            .get("primary_key")
339            .and_then(|v| v.as_bool())
340            .unwrap_or(false);
341
342        let foreign_key = col_data
343            .get("foreign_key")
344            .and_then(|v| self.parse_foreign_key(v));
345
346        let constraints = col_data
347            .get("constraints")
348            .and_then(|v| v.as_array())
349            .map(|arr| {
350                arr.iter()
351                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
352                    .collect()
353            })
354            .unwrap_or_default();
355
356        let description = col_data
357            .get("description")
358            .and_then(|v| v.as_str())
359            .map(|s| s.to_string())
360            .unwrap_or_default();
361
362        // Extract column-level quality rules
363        let mut column_quality_rules = Vec::new();
364        if let Some(quality_val) = col_data.get("quality") {
365            if let Some(arr) = quality_val.as_array() {
366                // Array of quality rules
367                for item in arr {
368                    if let Some(obj) = item.as_object() {
369                        let mut rule = HashMap::new();
370                        for (key, value) in obj {
371                            rule.insert(key.clone(), json_value_to_serde_value(value));
372                        }
373                        column_quality_rules.push(rule);
374                    }
375                }
376            } else if let Some(obj) = quality_val.as_object() {
377                // Single quality rule object
378                let mut rule = HashMap::new();
379                for (key, value) in obj {
380                    rule.insert(key.clone(), json_value_to_serde_value(value));
381                }
382                column_quality_rules.push(rule);
383            }
384        }
385
386        // If nullable=false (required=true), add a "not_null" quality rule if not already present
387        if !nullable {
388            let has_not_null = column_quality_rules.iter().any(|rule| {
389                rule.get("type")
390                    .and_then(|v| v.as_str())
391                    .map(|s| {
392                        s.to_lowercase().contains("not_null")
393                            || s.to_lowercase().contains("notnull")
394                    })
395                    .unwrap_or(false)
396            });
397            if !has_not_null {
398                let mut not_null_rule = HashMap::new();
399                not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
400                not_null_rule.insert(
401                    "description".to_string(),
402                    serde_json::json!("Column must not be null"),
403                );
404                column_quality_rules.push(not_null_rule);
405            }
406        }
407
408        Ok(Column {
409            name,
410            data_type,
411            nullable,
412            primary_key,
413            secondary_key: false,
414            composite_key: None,
415            foreign_key,
416            constraints,
417            description,
418            errors: Vec::new(),
419            quality: column_quality_rules,
420            enum_values: Vec::new(),
421            column_order: 0,
422        })
423    }
424
425    /// Parse foreign key from JSON value.
426    fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
427        let obj = fk_data.as_object()?;
428        Some(ForeignKey {
429            table_id: obj
430                .get("table_id")
431                .or_else(|| obj.get("table"))
432                .and_then(|v| v.as_str())
433                .unwrap_or("")
434                .to_string(),
435            column_name: obj
436                .get("column_name")
437                .or_else(|| obj.get("column"))
438                .and_then(|v| v.as_str())
439                .unwrap_or("")
440                .to_string(),
441        })
442    }
443
444    /// Extract database type from data.
445    fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
446        data.get("database_type")
447            .and_then(|v| v.as_str())
448            .and_then(|s| match s.to_uppercase().as_str() {
449                "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
450                "MYSQL" => Some(DatabaseType::Mysql),
451                "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
452                "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
453                "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
454                _ => None,
455            })
456    }
457
458    /// Extract medallion layers from data.
459    fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
460        let mut layers = Vec::new();
461
462        // Check plural form first
463        if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
464            for item in arr {
465                if let Some(s) = item.as_str()
466                    && let Ok(layer) = parse_medallion_layer(s)
467                {
468                    layers.push(layer);
469                }
470            }
471        }
472        // Check singular form (backward compatibility)
473        else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
474            && let Ok(layer) = parse_medallion_layer(s)
475        {
476            layers.push(layer);
477        }
478
479        layers
480    }
481
482    /// Extract SCD pattern from data.
483    fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
484        data.get("scd_pattern")
485            .and_then(|v| v.as_str())
486            .and_then(|s| parse_scd_pattern(s).ok())
487    }
488
489    /// Extract Data Vault classification from data.
490    fn extract_data_vault_classification(
491        &self,
492        data: &JsonValue,
493    ) -> Option<DataVaultClassification> {
494        data.get("data_vault_classification")
495            .and_then(|v| v.as_str())
496            .and_then(|s| parse_data_vault_classification(s).ok())
497    }
498
499    /// Extract quality rules from data.
500    fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
501        use serde_json::Value;
502        let mut quality_rules = Vec::new();
503
504        // Check for quality field at root level (array of objects or single object)
505        if let Some(quality_val) = data.get("quality") {
506            if let Some(arr) = quality_val.as_array() {
507                // Array of quality rules
508                for item in arr {
509                    if let Some(obj) = item.as_object() {
510                        let mut rule = HashMap::new();
511                        for (key, value) in obj {
512                            rule.insert(key.clone(), json_value_to_serde_value(value));
513                        }
514                        quality_rules.push(rule);
515                    }
516                }
517            } else if let Some(obj) = quality_val.as_object() {
518                // Single quality rule object
519                let mut rule = HashMap::new();
520                for (key, value) in obj {
521                    rule.insert(key.clone(), json_value_to_serde_value(value));
522                }
523                quality_rules.push(rule);
524            } else if let Some(s) = quality_val.as_str() {
525                // Simple string quality value
526                let mut rule = HashMap::new();
527                rule.insert("value".to_string(), Value::String(s.to_string()));
528                quality_rules.push(rule);
529            }
530        }
531
532        // Check for quality in metadata (ODCL v3 format)
533        if let Some(metadata) = data.get("metadata")
534            && let Some(metadata_obj) = metadata.as_object()
535            && let Some(quality_val) = metadata_obj.get("quality")
536        {
537            if let Some(arr) = quality_val.as_array() {
538                // Array of quality rules
539                for item in arr {
540                    if let Some(obj) = item.as_object() {
541                        let mut rule = HashMap::new();
542                        for (key, value) in obj {
543                            rule.insert(key.clone(), json_value_to_serde_value(value));
544                        }
545                        quality_rules.push(rule);
546                    }
547                }
548            } else if let Some(obj) = quality_val.as_object() {
549                // Single quality rule object
550                let mut rule = HashMap::new();
551                for (key, value) in obj {
552                    rule.insert(key.clone(), json_value_to_serde_value(value));
553                }
554                quality_rules.push(rule);
555            } else if let Some(s) = quality_val.as_str() {
556                // Simple string quality value
557                let mut rule = HashMap::new();
558                rule.insert("value".to_string(), Value::String(s.to_string()));
559                quality_rules.push(rule);
560            }
561        }
562
563        // Check for tblproperties field (similar to SQL TBLPROPERTIES)
564        if let Some(tblprops) = data.get("tblproperties")
565            && let Some(obj) = tblprops.as_object()
566        {
567            for (key, value) in obj {
568                let mut rule = HashMap::new();
569                rule.insert("property".to_string(), Value::String(key.clone()));
570                rule.insert("value".to_string(), json_value_to_serde_value(value));
571                quality_rules.push(rule);
572            }
573        }
574
575        quality_rules
576    }
577
578    /// Parse Liquibase YAML changelog format.
579    ///
580    /// Extracts the first `createTable` change from a Liquibase changelog.
581    /// Supports both direct column definitions and nested column structures.
582    ///
583    /// # Supported Format
584    ///
585    fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
586        // Supported Liquibase YAML changelog format:
587        // databaseChangeLog:
588        //   - changeSet:
589        //       - createTable:
590        //           tableName: my_table
591        //           columns:
592        //             - column:
593        //                 name: id
594        //                 type: int
595        //                 constraints:
596        //                   primaryKey: true
597        //                   nullable: false
598
599        let mut errors = Vec::new();
600
601        let changelog = data
602            .get("databaseChangeLog")
603            .and_then(|v| v.as_array())
604            .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
605
606        // Find first createTable change.
607        let mut table_name: Option<String> = None;
608        let mut columns: Vec<crate::models::column::Column> = Vec::new();
609
610        for entry in changelog {
611            // Entries are typically maps like { changeSet: [...] } or { changeSet: { changes: [...] } }
612            if let Some(change_set) = entry.get("changeSet") {
613                // Liquibase YAML can represent changeSet as map or list.
614                let changes = if let Some(obj) = change_set.as_object() {
615                    obj.get("changes")
616                        .and_then(|v| v.as_array())
617                        .cloned()
618                        .unwrap_or_default()
619                } else if let Some(arr) = change_set.as_array() {
620                    // Some variants encode changes directly as list entries inside changeSet.
621                    arr.clone()
622                } else {
623                    Vec::new()
624                };
625
626                for ch in changes {
627                    let create = ch.get("createTable").or_else(|| ch.get("create_table"));
628                    if let Some(create) = create {
629                        table_name = create
630                            .get("tableName")
631                            .or_else(|| create.get("table_name"))
632                            .and_then(|v| v.as_str())
633                            .map(|s| s.to_string());
634
635                        // columns: [ { column: { ... } }, ... ]
636                        if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
637                            for col_entry in cols {
638                                let col = col_entry.get("column").unwrap_or(col_entry);
639                                let name = col
640                                    .get("name")
641                                    .and_then(|v| v.as_str())
642                                    .unwrap_or("")
643                                    .to_string();
644                                let data_type = col
645                                    .get("type")
646                                    .and_then(|v| v.as_str())
647                                    .unwrap_or("")
648                                    .to_string();
649
650                                if name.is_empty() {
651                                    errors.push(ParserError {
652                                        error_type: "validation_error".to_string(),
653                                        field: "columns.name".to_string(),
654                                        message: "Liquibase createTable column missing name"
655                                            .to_string(),
656                                    });
657                                    continue;
658                                }
659
660                                let mut column =
661                                    crate::models::column::Column::new(name, data_type);
662
663                                if let Some(constraints) =
664                                    col.get("constraints").and_then(|v| v.as_object())
665                                {
666                                    if let Some(pk) =
667                                        constraints.get("primaryKey").and_then(|v| v.as_bool())
668                                    {
669                                        column.primary_key = pk;
670                                    }
671                                    if let Some(nullable) =
672                                        constraints.get("nullable").and_then(|v| v.as_bool())
673                                    {
674                                        column.nullable = nullable;
675                                    }
676                                }
677
678                                columns.push(column);
679                            }
680                        }
681
682                        // parse_table() returns a single Table, so we parse the first createTable.
683                        // If multiple tables are needed, call parse_table() multiple times or use import().
684                        break;
685                    }
686                }
687            }
688            if table_name.is_some() {
689                break;
690            }
691        }
692
693        let table_name = table_name
694            .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
695        let table = Table::new(table_name, columns);
696        // Preserve any errors collected.
697        Ok((table, errors))
698    }
699
700    /// Parse ODCS v3.0.x format.
701    fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
702        let mut errors = Vec::new();
703
704        // Extract table name
705        let table_name = data
706            .get("name")
707            .and_then(|v| v.as_str())
708            .map(|s| s.to_string())
709            .or_else(|| {
710                // Try to get from schema array
711                data.get("schema")
712                    .and_then(|v| v.as_array())
713                    .and_then(|arr| arr.first())
714                    .and_then(|obj| obj.as_object())
715                    .and_then(|obj| obj.get("name"))
716                    .and_then(|v| v.as_str())
717                    .map(|s| s.to_string())
718            })
719            .ok_or_else(|| {
720                anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
721            })?;
722
723        // Extract schema - ODCS v3.0.x uses array of SchemaObject
724        let schema = data
725            .get("schema")
726            .and_then(|v| v.as_array())
727            .ok_or_else(|| {
728                errors.push(ParserError {
729                    error_type: "validation_error".to_string(),
730                    field: "schema".to_string(),
731                    message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
732                });
733                anyhow::anyhow!("Missing schema")
734            });
735
736        let schema = match schema {
737            Ok(s) if s.is_empty() => {
738                errors.push(ParserError {
739                    error_type: "validation_error".to_string(),
740                    field: "schema".to_string(),
741                    message: "ODCS v3.0.x schema array is empty".to_string(),
742                });
743                let quality_rules = self.extract_quality_rules(data);
744                let table_uuid = self.extract_table_uuid(data);
745                let table = Table {
746                    id: table_uuid,
747                    name: table_name,
748                    columns: Vec::new(),
749                    database_type: None,
750                    catalog_name: None,
751                    schema_name: None,
752                    medallion_layers: Vec::new(),
753                    scd_pattern: None,
754                    data_vault_classification: None,
755                    modeling_level: None,
756                    tags: Vec::new(),
757                    odcl_metadata: HashMap::new(),
758                    position: None,
759                    yaml_file_path: None,
760                    drawio_cell_id: None,
761                    quality: quality_rules,
762                    errors: Vec::new(),
763                    created_at: chrono::Utc::now(),
764                    updated_at: chrono::Utc::now(),
765                };
766                return Ok((table, errors));
767            }
768            Ok(s) => s,
769            Err(_) => {
770                let quality_rules = self.extract_quality_rules(data);
771                let table_uuid = self.extract_table_uuid(data);
772                let table = Table {
773                    id: table_uuid,
774                    name: table_name,
775                    columns: Vec::new(),
776                    database_type: None,
777                    catalog_name: None,
778                    schema_name: None,
779                    medallion_layers: Vec::new(),
780                    scd_pattern: None,
781                    data_vault_classification: None,
782                    modeling_level: None,
783                    tags: Vec::new(),
784                    odcl_metadata: HashMap::new(),
785                    position: None,
786                    yaml_file_path: None,
787                    drawio_cell_id: None,
788                    quality: quality_rules,
789                    errors: Vec::new(),
790                    created_at: chrono::Utc::now(),
791                    updated_at: chrono::Utc::now(),
792                };
793                return Ok((table, errors));
794            }
795        };
796
797        // Get the first schema object (table)
798        let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
799            errors.push(ParserError {
800                error_type: "validation_error".to_string(),
801                field: "schema[0]".to_string(),
802                message: "First schema object must be a dictionary".to_string(),
803            });
804            anyhow::anyhow!("Invalid schema object")
805        })?;
806
807        let object_name = schema_object
808            .get("name")
809            .and_then(|v| v.as_str())
810            .unwrap_or(&table_name);
811
812        let properties = schema_object
813            .get("properties")
814            .and_then(|v| v.as_object())
815            .ok_or_else(|| {
816                errors.push(ParserError {
817                    error_type: "validation_error".to_string(),
818                    field: format!("Object '{}'", object_name),
819                    message: format!("Object '{}' missing 'properties' field", object_name),
820                });
821                anyhow::anyhow!("Missing properties")
822            })?;
823
824        // Parse properties as columns (similar to Data Contract)
825        let mut columns = Vec::new();
826        for (prop_name, prop_data) in properties {
827            if let Some(prop_obj) = prop_data.as_object() {
828                match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
829                    Ok(mut cols) => columns.append(&mut cols),
830                    Err(e) => {
831                        errors.push(ParserError {
832                            error_type: "property_parse_error".to_string(),
833                            field: format!("Property '{}'", prop_name),
834                            message: e.to_string(),
835                        });
836                    }
837                }
838            } else {
839                errors.push(ParserError {
840                    error_type: "validation_error".to_string(),
841                    field: format!("Property '{}'", prop_name),
842                    message: format!("Property '{}' must be an object", prop_name),
843                });
844            }
845        }
846
847        // Extract metadata from customProperties
848        let (medallion_layers, scd_pattern, data_vault_classification, mut tags) =
849            self.extract_metadata_from_custom_properties(data);
850
851        // Extract sharedDomains from customProperties
852        let mut shared_domains: Vec<String> = Vec::new();
853        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
854            for prop in custom_props {
855                if let Some(prop_obj) = prop.as_object() {
856                    let prop_key = prop_obj
857                        .get("property")
858                        .and_then(|v| v.as_str())
859                        .unwrap_or("");
860                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
861                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
862                    {
863                        for item in arr {
864                            if let Some(s) = item.as_str() {
865                                shared_domains.push(s.to_string());
866                            }
867                        }
868                    }
869                }
870            }
871        }
872
873        // Extract tags from top-level tags field (if not already extracted from customProperties)
874        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
875            for item in tags_arr {
876                if let Some(s) = item.as_str()
877                    && !tags.contains(&s.to_string())
878                {
879                    tags.push(s.to_string());
880                }
881            }
882        }
883
884        // Extract database type from servers
885        let database_type = self.extract_database_type_from_odcl_v3_servers(data);
886
887        // Extract quality rules
888        let quality_rules = self.extract_quality_rules(data);
889
890        // Build ODCL metadata
891        let mut odcl_metadata = HashMap::new();
892        odcl_metadata.insert(
893            "apiVersion".to_string(),
894            json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
895        );
896        odcl_metadata.insert(
897            "kind".to_string(),
898            json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
899        );
900        odcl_metadata.insert(
901            "id".to_string(),
902            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
903        );
904        odcl_metadata.insert(
905            "version".to_string(),
906            json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
907        );
908        odcl_metadata.insert(
909            "status".to_string(),
910            json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
911        );
912
913        // Extract servicelevels if present
914        if let Some(servicelevels_val) = data.get("servicelevels") {
915            odcl_metadata.insert(
916                "servicelevels".to_string(),
917                json_value_to_serde_value(servicelevels_val),
918            );
919        }
920
921        // Extract links if present
922        if let Some(links_val) = data.get("links") {
923            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
924        }
925
926        // Extract domain, dataProduct, tenant
927        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
928            odcl_metadata.insert(
929                "domain".to_string(),
930                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
931            );
932        }
933        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
934            odcl_metadata.insert(
935                "dataProduct".to_string(),
936                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
937            );
938        }
939        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
940            odcl_metadata.insert(
941                "tenant".to_string(),
942                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
943            );
944        }
945
946        // Extract top-level description (can be object or string)
947        if let Some(desc_val) = data.get("description") {
948            odcl_metadata.insert(
949                "description".to_string(),
950                json_value_to_serde_value(desc_val),
951            );
952        }
953
954        // Extract pricing
955        if let Some(pricing_val) = data.get("pricing") {
956            odcl_metadata.insert(
957                "pricing".to_string(),
958                json_value_to_serde_value(pricing_val),
959            );
960        }
961
962        // Extract team
963        if let Some(team_val) = data.get("team") {
964            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
965        }
966
967        // Extract roles
968        if let Some(roles_val) = data.get("roles") {
969            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
970        }
971
972        // Extract terms
973        if let Some(terms_val) = data.get("terms") {
974            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
975        }
976
977        // Extract full servers array (not just type)
978        if let Some(servers_val) = data.get("servers") {
979            odcl_metadata.insert(
980                "servers".to_string(),
981                json_value_to_serde_value(servers_val),
982            );
983        }
984
985        // Extract infrastructure
986        if let Some(infrastructure_val) = data.get("infrastructure") {
987            odcl_metadata.insert(
988                "infrastructure".to_string(),
989                json_value_to_serde_value(infrastructure_val),
990            );
991        }
992
993        // Store sharedDomains in metadata (extracted from customProperties above)
994        if !shared_domains.is_empty() {
995            let shared_domains_json: Vec<serde_json::Value> = shared_domains
996                .iter()
997                .map(|d| serde_json::Value::String(d.clone()))
998                .collect();
999            odcl_metadata.insert(
1000                "sharedDomains".to_string(),
1001                serde_json::Value::Array(shared_domains_json),
1002            );
1003        }
1004
1005        let table_uuid = self.extract_table_uuid(data);
1006
1007        let table = Table {
1008            id: table_uuid,
1009            name: table_name,
1010            columns,
1011            database_type,
1012            catalog_name: None,
1013            schema_name: None,
1014            medallion_layers,
1015            scd_pattern,
1016            data_vault_classification,
1017            modeling_level: None,
1018            tags,
1019            odcl_metadata,
1020            position: None,
1021            yaml_file_path: None,
1022            drawio_cell_id: None,
1023            quality: quality_rules,
1024            errors: Vec::new(),
1025            created_at: chrono::Utc::now(),
1026            updated_at: chrono::Utc::now(),
1027        };
1028
1029        info!(
1030            "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1031            table.name,
1032            errors.len()
1033        );
1034        Ok((table, errors))
1035    }
1036
1037    /// Parse a single property from ODCS v3.0.x format (similar to Data Contract field).
1038    fn parse_odcl_v3_property(
1039        &self,
1040        prop_name: &str,
1041        prop_data: &serde_json::Map<String, JsonValue>,
1042        data: &JsonValue,
1043    ) -> Result<Vec<Column>> {
1044        // Reuse Data Contract field parsing logic (they're similar)
1045        let mut errors = Vec::new();
1046        self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1047    }
1048
1049    /// Extract table UUID from ODCS `id` field (standard) or fallback to customProperties/odcl_metadata (backward compatibility).
1050    /// Returns the UUID if found, otherwise generates a new one.
1051    fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1052        // First check the top-level `id` field (ODCS spec: "A unique identifier used to reduce the risk of dataset name collisions, such as a UUID.")
1053        if let Some(id_val) = data.get("id")
1054            && let Some(id_str) = id_val.as_str()
1055        {
1056            if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1057                tracing::debug!(
1058                    "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1059                    uuid
1060                );
1061                return uuid;
1062            } else {
1063                tracing::warn!(
1064                    "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1065                    id_str
1066                );
1067            }
1068        }
1069
1070        // Backward compatibility: check customProperties for tableUuid (legacy format)
1071        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1072            for prop in custom_props {
1073                if let Some(prop_obj) = prop.as_object() {
1074                    let prop_key = prop_obj
1075                        .get("property")
1076                        .and_then(|v| v.as_str())
1077                        .unwrap_or("");
1078                    if prop_key == "tableUuid"
1079                        && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1080                        && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1081                    {
1082                        tracing::debug!(
1083                            "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1084                            uuid
1085                        );
1086                        return uuid;
1087                    }
1088                }
1089            }
1090        }
1091
1092        // Fallback: check odcl_metadata if present (legacy format)
1093        if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1094            && let Some(uuid_val) = metadata.get("tableUuid")
1095            && let Some(uuid_str) = uuid_val.as_str()
1096            && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1097        {
1098            tracing::debug!(
1099                "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1100                uuid
1101            );
1102            return uuid;
1103        }
1104
1105        // Generate deterministic UUID v5 if not found (based on table name)
1106        let table_name = data
1107            .get("name")
1108            .and_then(|v| v.as_str())
1109            .unwrap_or("unknown");
1110        let new_uuid = crate::models::table::Table::generate_id(
1111            table_name, None, // database_type not available here
1112            None, // catalog_name not available here
1113            None, // schema_name not available here
1114        );
1115        tracing::warn!(
1116            "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1117            table_name,
1118            new_uuid
1119        );
1120        new_uuid
1121    }
1122
1123    /// Extract metadata from customProperties in ODCS v3.0.x format.
1124    fn extract_metadata_from_custom_properties(
1125        &self,
1126        data: &JsonValue,
1127    ) -> (
1128        Vec<MedallionLayer>,
1129        Option<SCDPattern>,
1130        Option<DataVaultClassification>,
1131        Vec<String>,
1132    ) {
1133        let mut medallion_layers = Vec::new();
1134        let mut scd_pattern = None;
1135        let mut data_vault_classification = None;
1136        let mut tags = Vec::new();
1137
1138        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1139            for prop in custom_props {
1140                if let Some(prop_obj) = prop.as_object() {
1141                    let prop_key = prop_obj
1142                        .get("property")
1143                        .and_then(|v| v.as_str())
1144                        .unwrap_or("");
1145                    let prop_value = prop_obj.get("value");
1146
1147                    match prop_key {
1148                        "medallionLayers" | "medallion_layers" => {
1149                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1150                                for item in arr {
1151                                    if let Some(s) = item.as_str()
1152                                        && let Ok(layer) = parse_medallion_layer(s)
1153                                    {
1154                                        medallion_layers.push(layer);
1155                                    }
1156                                }
1157                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1158                                // Comma-separated string
1159                                for part in s.split(',') {
1160                                    if let Ok(layer) = parse_medallion_layer(part.trim()) {
1161                                        medallion_layers.push(layer);
1162                                    }
1163                                }
1164                            }
1165                        }
1166                        "scdPattern" | "scd_pattern" => {
1167                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1168                                scd_pattern = parse_scd_pattern(s).ok();
1169                            }
1170                        }
1171                        "dataVaultClassification" | "data_vault_classification" => {
1172                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1173                                data_vault_classification = parse_data_vault_classification(s).ok();
1174                            }
1175                        }
1176                        "tags" => {
1177                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1178                                for item in arr {
1179                                    if let Some(s) = item.as_str() {
1180                                        tags.push(s.to_string());
1181                                    }
1182                                }
1183                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1184                                // Comma-separated string
1185                                for part in s.split(',') {
1186                                    tags.push(part.trim().to_string());
1187                                }
1188                            }
1189                        }
1190                        "sharedDomains" | "shared_domains" => {
1191                            // sharedDomains will be stored in metadata by the caller
1192                            // This match is here for completeness but sharedDomains is handled separately
1193                        }
1194                        _ => {}
1195                    }
1196                }
1197            }
1198        }
1199
1200        // Also extract tags from top-level tags field
1201        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1202            for item in tags_arr {
1203                if let Some(s) = item.as_str()
1204                    && !tags.contains(&s.to_string())
1205                {
1206                    tags.push(s.to_string());
1207                }
1208            }
1209        }
1210
1211        (
1212            medallion_layers,
1213            scd_pattern,
1214            data_vault_classification,
1215            tags,
1216        )
1217    }
1218
1219    /// Extract database type from servers in ODCS v3.0.x format.
1220    fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1221        // ODCS v3.0.x: servers is an array of Server objects
1222        if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1223            && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1224        {
1225            return server_obj
1226                .get("type")
1227                .and_then(|v| v.as_str())
1228                .and_then(|s| self.parse_database_type(s));
1229        }
1230        None
1231    }
1232
1233    /// Parse Data Contract format.
1234    fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1235        let mut errors = Vec::new();
1236
1237        // Extract models
1238        let models = data
1239            .get("models")
1240            .and_then(|v| v.as_object())
1241            .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1242
1243        // parse_table() returns a single Table, so we parse the first model.
1244        // If multiple models are needed, call parse_table() multiple times or use import().
1245        let (model_name, model_data) = models
1246            .iter()
1247            .next()
1248            .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1249
1250        let model_data = model_data
1251            .as_object()
1252            .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1253
1254        // Extract fields (columns)
1255        let fields = model_data
1256            .get("fields")
1257            .and_then(|v| v.as_object())
1258            .ok_or_else(|| {
1259                errors.push(ParserError {
1260                    error_type: "validation_error".to_string(),
1261                    field: format!("Model '{}'", model_name),
1262                    message: format!("Model '{}' missing 'fields' field", model_name),
1263                });
1264                anyhow::anyhow!("Missing fields")
1265            });
1266
1267        let fields = match fields {
1268            Ok(f) => f,
1269            Err(_) => {
1270                // Return empty table with errors
1271                let quality_rules = self.extract_quality_rules(data);
1272                let table_uuid = self.extract_table_uuid(data);
1273                let table = Table {
1274                    id: table_uuid,
1275                    name: model_name.clone(),
1276                    columns: Vec::new(),
1277                    database_type: None,
1278                    catalog_name: None,
1279                    schema_name: None,
1280                    medallion_layers: Vec::new(),
1281                    scd_pattern: None,
1282                    data_vault_classification: None,
1283                    modeling_level: None,
1284                    tags: Vec::new(),
1285                    odcl_metadata: HashMap::new(),
1286                    position: None,
1287                    yaml_file_path: None,
1288                    drawio_cell_id: None,
1289                    quality: quality_rules,
1290                    errors: Vec::new(),
1291                    created_at: chrono::Utc::now(),
1292                    updated_at: chrono::Utc::now(),
1293                };
1294                return Ok((table, errors));
1295            }
1296        };
1297
1298        // Parse fields as columns
1299        let mut columns = Vec::new();
1300        for (field_name, field_data) in fields {
1301            if let Some(field_obj) = field_data.as_object() {
1302                match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1303                    Ok(mut cols) => columns.append(&mut cols),
1304                    Err(e) => {
1305                        errors.push(ParserError {
1306                            error_type: "field_parse_error".to_string(),
1307                            field: format!("Field '{}'", field_name),
1308                            message: e.to_string(),
1309                        });
1310                    }
1311                }
1312            } else {
1313                errors.push(ParserError {
1314                    error_type: "validation_error".to_string(),
1315                    field: format!("Field '{}'", field_name),
1316                    message: format!("Field '{}' must be an object", field_name),
1317                });
1318            }
1319        }
1320
1321        // Extract metadata from info section
1322        // The frontend expects info fields to be nested under "info" key
1323        let mut odcl_metadata = HashMap::new();
1324
1325        // Extract info section and nest it properly
1326        // Convert JsonValue object to serde_json::Value::Object (Map)
1327        if let Some(info_val) = data.get("info") {
1328            // Convert JsonValue to serde_json::Value
1329            let info_json_value = json_value_to_serde_value(info_val);
1330            odcl_metadata.insert("info".to_string(), info_json_value);
1331        }
1332
1333        odcl_metadata.insert(
1334            "dataContractSpecification".to_string(),
1335            json_value_to_serde_value(
1336                data.get("dataContractSpecification")
1337                    .unwrap_or(&JsonValue::Null),
1338            ),
1339        );
1340        odcl_metadata.insert(
1341            "id".to_string(),
1342            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1343        );
1344        // Note: model_name is not added to metadata as it's redundant (it's the table name)
1345
1346        // Extract servicelevels if present
1347        if let Some(servicelevels_val) = data.get("servicelevels") {
1348            odcl_metadata.insert(
1349                "servicelevels".to_string(),
1350                json_value_to_serde_value(servicelevels_val),
1351            );
1352        }
1353
1354        // Extract links if present
1355        if let Some(links_val) = data.get("links") {
1356            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1357        }
1358
1359        // Extract domain, dataProduct, tenant
1360        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1361            odcl_metadata.insert(
1362                "domain".to_string(),
1363                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1364            );
1365        }
1366        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1367            odcl_metadata.insert(
1368                "dataProduct".to_string(),
1369                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1370            );
1371        }
1372        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1373            odcl_metadata.insert(
1374                "tenant".to_string(),
1375                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1376            );
1377        }
1378
1379        // Extract top-level description (can be object or string)
1380        if let Some(desc_val) = data.get("description") {
1381            odcl_metadata.insert(
1382                "description".to_string(),
1383                json_value_to_serde_value(desc_val),
1384            );
1385        }
1386
1387        // Extract pricing
1388        if let Some(pricing_val) = data.get("pricing") {
1389            odcl_metadata.insert(
1390                "pricing".to_string(),
1391                json_value_to_serde_value(pricing_val),
1392            );
1393        }
1394
1395        // Extract team
1396        if let Some(team_val) = data.get("team") {
1397            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1398        }
1399
1400        // Extract roles
1401        if let Some(roles_val) = data.get("roles") {
1402            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1403        }
1404
1405        // Extract terms
1406        if let Some(terms_val) = data.get("terms") {
1407            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1408        }
1409
1410        // Extract full servers array (not just type)
1411        if let Some(servers_val) = data.get("servers") {
1412            odcl_metadata.insert(
1413                "servers".to_string(),
1414                json_value_to_serde_value(servers_val),
1415            );
1416        }
1417
1418        // Extract infrastructure
1419        if let Some(infrastructure_val) = data.get("infrastructure") {
1420            odcl_metadata.insert(
1421                "infrastructure".to_string(),
1422                json_value_to_serde_value(infrastructure_val),
1423            );
1424        }
1425
1426        // Extract database type from servers if available
1427        let database_type = self.extract_database_type_from_servers(data);
1428
1429        // Extract catalog and schema from customProperties
1430        let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1431
1432        // Extract sharedDomains from customProperties
1433        let mut shared_domains: Vec<String> = Vec::new();
1434        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1435            for prop in custom_props {
1436                if let Some(prop_obj) = prop.as_object() {
1437                    let prop_key = prop_obj
1438                        .get("property")
1439                        .and_then(|v| v.as_str())
1440                        .unwrap_or("");
1441                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1442                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1443                    {
1444                        for item in arr {
1445                            if let Some(s) = item.as_str() {
1446                                shared_domains.push(s.to_string());
1447                            }
1448                        }
1449                    }
1450                }
1451            }
1452        }
1453
1454        // Extract tags from top-level tags field (Data Contract format)
1455        let mut tags = Vec::new();
1456        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1457            for item in tags_arr {
1458                if let Some(s) = item.as_str() {
1459                    tags.push(s.to_string());
1460                }
1461            }
1462        }
1463
1464        // Extract quality rules
1465        let quality_rules = self.extract_quality_rules(data);
1466
1467        // Store sharedDomains in metadata
1468        if !shared_domains.is_empty() {
1469            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1470                .iter()
1471                .map(|d| serde_json::Value::String(d.clone()))
1472                .collect();
1473            odcl_metadata.insert(
1474                "sharedDomains".to_string(),
1475                serde_json::Value::Array(shared_domains_json),
1476            );
1477        }
1478
1479        let table_uuid = self.extract_table_uuid(data);
1480
1481        let table = Table {
1482            id: table_uuid,
1483            name: model_name.clone(),
1484            columns,
1485            database_type,
1486            catalog_name,
1487            schema_name,
1488            medallion_layers: Vec::new(),
1489            scd_pattern: None,
1490            data_vault_classification: None,
1491            modeling_level: None,
1492            tags,
1493            odcl_metadata,
1494            position: None,
1495            yaml_file_path: None,
1496            drawio_cell_id: None,
1497            quality: quality_rules,
1498            errors: Vec::new(),
1499            created_at: chrono::Utc::now(),
1500            updated_at: chrono::Utc::now(),
1501        };
1502
1503        info!(
1504            "Parsed Data Contract table: {} with {} warnings/errors",
1505            model_name,
1506            errors.len()
1507        );
1508        Ok((table, errors))
1509    }
1510
1511    /// Expand a nested column from a schema definition, creating columns with dot notation.
1512    ///
1513    /// This helper function recursively expands nested structures (OBJECT/STRUCT types)
1514    /// into flat columns with dot notation (e.g., "address.street", "address.city").
1515    #[allow(clippy::only_used_in_recursion)]
1516    fn expand_nested_column(
1517        &self,
1518        column_name: &str,
1519        schema: &JsonValue,
1520        nullable: bool,
1521        columns: &mut Vec<Column>,
1522        errors: &mut Vec<ParserError>,
1523    ) {
1524        let schema_obj = match schema.as_object() {
1525            Some(obj) => obj,
1526            None => {
1527                errors.push(ParserError {
1528                    error_type: "parse_error".to_string(),
1529                    field: column_name.to_string(),
1530                    message: "Nested schema must be an object".to_string(),
1531                });
1532                return;
1533            }
1534        };
1535
1536        let schema_type = schema_obj
1537            .get("type")
1538            .and_then(|v| v.as_str())
1539            .unwrap_or("object");
1540
1541        match schema_type {
1542            "object" | "struct" => {
1543                // Check if it has nested properties
1544                if let Some(properties) = schema_obj.get("properties").and_then(|v| v.as_object()) {
1545                    let nested_required: Vec<String> = schema_obj
1546                        .get("required")
1547                        .and_then(|v| v.as_array())
1548                        .map(|arr| {
1549                            arr.iter()
1550                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1551                                .collect()
1552                        })
1553                        .unwrap_or_default();
1554
1555                    for (nested_name, nested_schema) in properties {
1556                        let nested_nullable = !nested_required.contains(nested_name);
1557                        self.expand_nested_column(
1558                            &format!("{}.{}", column_name, nested_name),
1559                            nested_schema,
1560                            nullable || nested_nullable,
1561                            columns,
1562                            errors,
1563                        );
1564                    }
1565                } else {
1566                    // Object without properties - create as OBJECT type
1567                    let description = schema_obj
1568                        .get("description")
1569                        .and_then(|v| v.as_str())
1570                        .unwrap_or("")
1571                        .to_string();
1572                    columns.push(Column {
1573                        name: column_name.to_string(),
1574                        data_type: "OBJECT".to_string(),
1575                        nullable,
1576                        primary_key: false,
1577                        secondary_key: false,
1578                        composite_key: None,
1579                        foreign_key: None,
1580                        constraints: Vec::new(),
1581                        description,
1582                        quality: Vec::new(),
1583                        enum_values: Vec::new(),
1584                        errors: Vec::new(),
1585                        column_order: 0,
1586                    });
1587                }
1588            }
1589            "array" => {
1590                // Handle array types
1591                let items = schema_obj.get("items").unwrap_or(schema);
1592                let items_type = items
1593                    .as_object()
1594                    .and_then(|obj| obj.get("type").and_then(|v| v.as_str()))
1595                    .unwrap_or("string");
1596
1597                if items_type == "object" || items_type == "struct" {
1598                    // Array of objects - expand nested structure
1599                    let description = schema_obj
1600                        .get("description")
1601                        .and_then(|v| v.as_str())
1602                        .unwrap_or("")
1603                        .to_string();
1604                    columns.push(Column {
1605                        name: column_name.to_string(),
1606                        data_type: "ARRAY<OBJECT>".to_string(),
1607                        nullable,
1608                        primary_key: false,
1609                        secondary_key: false,
1610                        composite_key: None,
1611                        foreign_key: None,
1612                        constraints: Vec::new(),
1613                        description,
1614                        quality: Vec::new(),
1615                        enum_values: Vec::new(),
1616                        errors: Vec::new(),
1617                        column_order: 0,
1618                    });
1619                    // Also expand nested properties with array prefix
1620                    if let Some(properties) = items
1621                        .as_object()
1622                        .and_then(|obj| obj.get("properties").and_then(|v| v.as_object()))
1623                    {
1624                        let nested_required: Vec<String> = items
1625                            .as_object()
1626                            .and_then(|obj| obj.get("required").and_then(|v| v.as_array()))
1627                            .map(|arr| {
1628                                arr.iter()
1629                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1630                                    .collect()
1631                            })
1632                            .unwrap_or_default();
1633
1634                        for (nested_name, nested_schema) in properties {
1635                            let nested_nullable = !nested_required.contains(nested_name);
1636                            self.expand_nested_column(
1637                                &format!("{}.{}", column_name, nested_name),
1638                                nested_schema,
1639                                nullable || nested_nullable,
1640                                columns,
1641                                errors,
1642                            );
1643                        }
1644                    }
1645                } else {
1646                    // Array of primitives
1647                    let data_type = format!("ARRAY<{}>", items_type.to_uppercase());
1648                    let description = schema_obj
1649                        .get("description")
1650                        .and_then(|v| v.as_str())
1651                        .unwrap_or("")
1652                        .to_string();
1653                    columns.push(Column {
1654                        name: column_name.to_string(),
1655                        data_type,
1656                        nullable,
1657                        primary_key: false,
1658                        secondary_key: false,
1659                        composite_key: None,
1660                        foreign_key: None,
1661                        constraints: Vec::new(),
1662                        description,
1663                        quality: Vec::new(),
1664                        enum_values: Vec::new(),
1665                        errors: Vec::new(),
1666                        column_order: 0,
1667                    });
1668                }
1669            }
1670            _ => {
1671                // Simple type
1672                let data_type = schema_type.to_uppercase();
1673                let description = schema_obj
1674                    .get("description")
1675                    .and_then(|v| v.as_str())
1676                    .unwrap_or("")
1677                    .to_string();
1678                let enum_values = schema_obj
1679                    .get("enum")
1680                    .and_then(|v| v.as_array())
1681                    .map(|arr| {
1682                        arr.iter()
1683                            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1684                            .collect()
1685                    })
1686                    .unwrap_or_default();
1687                columns.push(Column {
1688                    name: column_name.to_string(),
1689                    data_type,
1690                    nullable,
1691                    primary_key: false,
1692                    secondary_key: false,
1693                    composite_key: None,
1694                    foreign_key: None,
1695                    constraints: Vec::new(),
1696                    description,
1697                    quality: Vec::new(),
1698                    enum_values,
1699                    errors: Vec::new(),
1700                    column_order: 0,
1701                });
1702            }
1703        }
1704    }
1705
1706    /// Parse a single field from Data Contract format.
1707    fn parse_data_contract_field(
1708        &self,
1709        field_name: &str,
1710        field_data: &serde_json::Map<String, JsonValue>,
1711        data: &JsonValue,
1712        errors: &mut Vec<ParserError>,
1713    ) -> Result<Vec<Column>> {
1714        let mut columns = Vec::new();
1715
1716        // Helper function to extract quality rules from a JSON object
1717        let extract_quality_from_obj =
1718            |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1719                let mut quality_rules = Vec::new();
1720                if let Some(quality_val) = obj.get("quality") {
1721                    if let Some(arr) = quality_val.as_array() {
1722                        // Array of quality rules
1723                        for item in arr {
1724                            if let Some(rule_obj) = item.as_object() {
1725                                let mut rule = HashMap::new();
1726                                for (key, value) in rule_obj {
1727                                    rule.insert(key.clone(), json_value_to_serde_value(value));
1728                                }
1729                                quality_rules.push(rule);
1730                            }
1731                        }
1732                    } else if let Some(rule_obj) = quality_val.as_object() {
1733                        // Single quality rule object
1734                        let mut rule = HashMap::new();
1735                        for (key, value) in rule_obj {
1736                            rule.insert(key.clone(), json_value_to_serde_value(value));
1737                        }
1738                        quality_rules.push(rule);
1739                    }
1740                }
1741                quality_rules
1742            };
1743
1744        // Check for $ref
1745        if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1746            if let Some(definition) = self.resolve_ref(ref_str, data) {
1747                // Extract quality rules from field-level first (takes precedence)
1748                let mut quality_rules = extract_quality_from_obj(field_data);
1749
1750                // Also extract quality rules from definition and merge (if field doesn't have any)
1751                if quality_rules.is_empty() {
1752                    if let Some(def_obj) = definition.as_object() {
1753                        quality_rules = extract_quality_from_obj(def_obj);
1754                    }
1755                } else {
1756                    // Merge definition quality rules if field has some
1757                    if let Some(def_obj) = definition.as_object() {
1758                        let def_quality = extract_quality_from_obj(def_obj);
1759                        // Append definition quality rules (field-level takes precedence)
1760                        quality_rules.extend(def_quality);
1761                    }
1762                }
1763
1764                let required = field_data
1765                    .get("required")
1766                    .and_then(|v| v.as_bool())
1767                    .unwrap_or(false);
1768
1769                // If required=true, add not_null quality rule if not present
1770                if required {
1771                    let has_not_null = quality_rules.iter().any(|rule| {
1772                        rule.get("type")
1773                            .and_then(|v| v.as_str())
1774                            .map(|s| {
1775                                s.to_lowercase().contains("not_null")
1776                                    || s.to_lowercase().contains("notnull")
1777                            })
1778                            .unwrap_or(false)
1779                    });
1780                    if !has_not_null {
1781                        let mut not_null_rule = HashMap::new();
1782                        not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
1783                        not_null_rule.insert(
1784                            "description".to_string(),
1785                            serde_json::json!("Column must not be null"),
1786                        );
1787                        quality_rules.push(not_null_rule);
1788                    }
1789                }
1790
1791                // Check if definition is an object/struct with nested structure
1792                let has_nested = definition
1793                    .get("type")
1794                    .and_then(|v| v.as_str())
1795                    .map(|s| s == "object")
1796                    .unwrap_or(false)
1797                    || definition.get("properties").is_some()
1798                    || definition.get("fields").is_some();
1799
1800                if has_nested {
1801                    // Expand STRUCT from definition into nested columns with dot notation
1802                    if let Some(properties) =
1803                        definition.get("properties").and_then(|v| v.as_object())
1804                    {
1805                        // Recursively expand nested properties
1806                        let nested_required: Vec<String> = definition
1807                            .get("required")
1808                            .and_then(|v| v.as_array())
1809                            .map(|arr| {
1810                                arr.iter()
1811                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1812                                    .collect()
1813                            })
1814                            .unwrap_or_default();
1815
1816                        for (nested_name, nested_schema) in properties {
1817                            let nested_required_field = nested_required.contains(nested_name);
1818                            self.expand_nested_column(
1819                                &format!("{}.{}", field_name, nested_name),
1820                                nested_schema,
1821                                !nested_required_field,
1822                                &mut columns,
1823                                errors,
1824                            );
1825                        }
1826                    } else if let Some(fields) =
1827                        definition.get("fields").and_then(|v| v.as_object())
1828                    {
1829                        // Handle fields format (ODCL style)
1830                        for (nested_name, nested_schema) in fields {
1831                            self.expand_nested_column(
1832                                &format!("{}.{}", field_name, nested_name),
1833                                nested_schema,
1834                                true, // Assume nullable if not specified
1835                                &mut columns,
1836                                errors,
1837                            );
1838                        }
1839                    } else {
1840                        // Fallback: create parent column as OBJECT if we can't expand
1841                        columns.push(Column {
1842                            name: field_name.to_string(),
1843                            data_type: "OBJECT".to_string(),
1844                            nullable: !required,
1845                            primary_key: false,
1846                            secondary_key: false,
1847                            composite_key: None,
1848                            foreign_key: None,
1849                            constraints: Vec::new(),
1850                            description: field_data
1851                                .get("description")
1852                                .or_else(|| definition.get("description"))
1853                                .and_then(|v| v.as_str())
1854                                .unwrap_or("")
1855                                .to_string(),
1856                            errors: Vec::new(),
1857                            quality: quality_rules.clone(),
1858                            enum_values: Vec::new(),
1859                            column_order: 0,
1860                        });
1861                    }
1862                } else {
1863                    // Simple type from definition
1864                    let def_type = definition
1865                        .get("type")
1866                        .and_then(|v| v.as_str())
1867                        .unwrap_or("STRING")
1868                        .to_uppercase();
1869
1870                    let enum_values = definition
1871                        .get("enum")
1872                        .and_then(|v| v.as_array())
1873                        .map(|arr| {
1874                            arr.iter()
1875                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1876                                .collect()
1877                        })
1878                        .unwrap_or_default();
1879
1880                    columns.push(Column {
1881                        name: field_name.to_string(),
1882                        data_type: def_type,
1883                        nullable: !required,
1884                        primary_key: false,
1885                        secondary_key: false,
1886                        composite_key: None,
1887                        foreign_key: None,
1888                        constraints: Vec::new(),
1889                        description: field_data
1890                            .get("description")
1891                            .or_else(|| definition.get("description"))
1892                            .and_then(|v| v.as_str())
1893                            .unwrap_or("")
1894                            .to_string(),
1895                        errors: Vec::new(),
1896                        quality: quality_rules,
1897                        enum_values,
1898                        column_order: 0,
1899                    });
1900                }
1901                return Ok(columns);
1902            } else {
1903                // Undefined reference - create column with error
1904                let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
1905                let mut error_map = HashMap::new();
1906                error_map.insert("type".to_string(), serde_json::json!("validation_error"));
1907                error_map.insert("field".to_string(), serde_json::json!("data_type"));
1908                error_map.insert(
1909                    "message".to_string(),
1910                    serde_json::json!(format!(
1911                        "Field '{}' references undefined definition: {}",
1912                        field_name, ref_str
1913                    )),
1914                );
1915                col_errors.push(error_map);
1916                columns.push(Column {
1917                    name: field_name.to_string(),
1918                    data_type: "OBJECT".to_string(),
1919                    nullable: true,
1920                    primary_key: false,
1921                    secondary_key: false,
1922                    composite_key: None,
1923                    foreign_key: None,
1924                    constraints: Vec::new(),
1925                    description: field_data
1926                        .get("description")
1927                        .and_then(|v| v.as_str())
1928                        .unwrap_or("")
1929                        .to_string(),
1930                    errors: col_errors,
1931                    quality: Vec::new(),
1932                    enum_values: Vec::new(),
1933                    column_order: 0,
1934                });
1935                return Ok(columns);
1936            }
1937        }
1938
1939        // Extract field type - default to STRING if missing
1940        let field_type_str = field_data
1941            .get("type")
1942            .and_then(|v| v.as_str())
1943            .unwrap_or("STRING");
1944
1945        // Check if type contains STRUCT definition (multiline STRUCT type)
1946        if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
1947            match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
1948                Ok(nested_cols) if !nested_cols.is_empty() => {
1949                    // We have nested columns - add parent column with full type, then nested columns
1950                    let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
1951                        "ARRAY<STRUCT<...>>".to_string()
1952                    } else {
1953                        "STRUCT<...>".to_string()
1954                    };
1955
1956                    // Add parent column
1957                    columns.push(Column {
1958                        name: field_name.to_string(),
1959                        data_type: parent_data_type,
1960                        nullable: !field_data
1961                            .get("required")
1962                            .and_then(|v| v.as_bool())
1963                            .unwrap_or(false),
1964                        primary_key: false,
1965                        secondary_key: false,
1966                        composite_key: None,
1967                        foreign_key: None,
1968                        constraints: Vec::new(),
1969                        description: field_data
1970                            .get("description")
1971                            .and_then(|v| v.as_str())
1972                            .unwrap_or("")
1973                            .to_string(),
1974                        errors: Vec::new(),
1975                        quality: Vec::new(),
1976                        enum_values: Vec::new(),
1977                        column_order: 0,
1978                    });
1979
1980                    // Add nested columns
1981                    columns.extend(nested_cols);
1982                    return Ok(columns);
1983                }
1984                Ok(_) | Err(_) => {
1985                    // If parsing fails or returns empty, fall back to using the type as-is
1986                }
1987            }
1988        }
1989
1990        let field_type = normalize_data_type(field_type_str);
1991
1992        // Handle ARRAY type
1993        if field_type == "ARRAY" {
1994            let items = field_data.get("items");
1995            if let Some(items_val) = items {
1996                if let Some(items_obj) = items_val.as_object() {
1997                    // Check if items is an object with fields (nested structure)
1998                    if items_obj.get("fields").is_some()
1999                        || items_obj.get("type").and_then(|v| v.as_str()) == Some("object")
2000                    {
2001                        // Array of objects - create parent column as ARRAY<OBJECT>
2002                        columns.push(Column {
2003                            name: field_name.to_string(),
2004                            data_type: "ARRAY<OBJECT>".to_string(),
2005                            nullable: !field_data
2006                                .get("required")
2007                                .and_then(|v| v.as_bool())
2008                                .unwrap_or(false),
2009                            primary_key: false,
2010                            secondary_key: false,
2011                            composite_key: None,
2012                            foreign_key: None,
2013                            constraints: Vec::new(),
2014                            description: field_data
2015                                .get("description")
2016                                .and_then(|v| v.as_str())
2017                                .unwrap_or("")
2018                                .to_string(),
2019                            errors: Vec::new(),
2020                            quality: Vec::new(),
2021                            enum_values: Vec::new(),
2022                            column_order: 0,
2023                        });
2024
2025                        // Extract nested fields from items.properties or items.fields if present
2026                        // Note: Export saves nested columns in properties.properties, but some formats use fields
2027                        let nested_fields_obj = items_obj
2028                            .get("properties")
2029                            .and_then(|v| v.as_object())
2030                            .or_else(|| items_obj.get("fields").and_then(|v| v.as_object()));
2031
2032                        if let Some(fields_obj) = nested_fields_obj {
2033                            for (nested_field_name, nested_field_data) in fields_obj {
2034                                if let Some(nested_field_obj) = nested_field_data.as_object() {
2035                                    let nested_field_type = nested_field_obj
2036                                        .get("type")
2037                                        .and_then(|v| v.as_str())
2038                                        .unwrap_or("STRING");
2039
2040                                    // Recursively parse nested fields with array prefix
2041                                    let nested_col_name =
2042                                        format!("{}.{}", field_name, nested_field_name);
2043                                    let mut local_errors = Vec::new();
2044                                    match self.parse_data_contract_field(
2045                                        &nested_col_name,
2046                                        nested_field_obj,
2047                                        data,
2048                                        &mut local_errors,
2049                                    ) {
2050                                        Ok(mut nested_cols) => {
2051                                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2052                                            // Otherwise, it returns a single column
2053                                            columns.append(&mut nested_cols);
2054                                        }
2055                                        Err(_) => {
2056                                            // Fallback: create simple nested column
2057                                            columns.push(Column {
2058                                                name: nested_col_name,
2059                                                data_type: nested_field_type.to_uppercase(),
2060                                                nullable: !nested_field_obj
2061                                                    .get("required")
2062                                                    .and_then(|v| v.as_bool())
2063                                                    .unwrap_or(false),
2064                                                primary_key: false,
2065                                                secondary_key: false,
2066                                                composite_key: None,
2067                                                foreign_key: None,
2068                                                constraints: Vec::new(),
2069                                                description: nested_field_obj
2070                                                    .get("description")
2071                                                    .and_then(|v| v.as_str())
2072                                                    .unwrap_or("")
2073                                                    .to_string(),
2074                                                errors: Vec::new(),
2075                                                quality: Vec::new(),
2076                                                enum_values: Vec::new(),
2077                                                column_order: 0,
2078                                            });
2079                                        }
2080                                    }
2081                                }
2082                            }
2083                        }
2084
2085                        return Ok(columns);
2086                    } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2087                        // Array of simple type
2088                        columns.push(Column {
2089                            name: field_name.to_string(),
2090                            data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2091                            nullable: !field_data
2092                                .get("required")
2093                                .and_then(|v| v.as_bool())
2094                                .unwrap_or(false),
2095                            primary_key: false,
2096                            secondary_key: false,
2097                            composite_key: None,
2098                            foreign_key: None,
2099                            constraints: Vec::new(),
2100                            description: field_data
2101                                .get("description")
2102                                .and_then(|v| v.as_str())
2103                                .unwrap_or("")
2104                                .to_string(),
2105                            errors: Vec::new(),
2106                            quality: Vec::new(),
2107                            enum_values: Vec::new(),
2108                            column_order: 0,
2109                        });
2110                        return Ok(columns);
2111                    }
2112                } else if let Some(item_type_str) = items_val.as_str() {
2113                    // Array of simple type (string)
2114                    columns.push(Column {
2115                        name: field_name.to_string(),
2116                        data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2117                        nullable: !field_data
2118                            .get("required")
2119                            .and_then(|v| v.as_bool())
2120                            .unwrap_or(false),
2121                        primary_key: false,
2122                        secondary_key: false,
2123                        composite_key: None,
2124                        foreign_key: None,
2125                        constraints: Vec::new(),
2126                        description: field_data
2127                            .get("description")
2128                            .and_then(|v| v.as_str())
2129                            .unwrap_or("")
2130                            .to_string(),
2131                        errors: Vec::new(),
2132                        quality: Vec::new(),
2133                        enum_values: Vec::new(),
2134                        column_order: 0,
2135                    });
2136                    return Ok(columns);
2137                }
2138            }
2139            // Array without items - default to ARRAY<STRING>
2140            columns.push(Column {
2141                name: field_name.to_string(),
2142                data_type: "ARRAY<STRING>".to_string(),
2143                nullable: !field_data
2144                    .get("required")
2145                    .and_then(|v| v.as_bool())
2146                    .unwrap_or(false),
2147                primary_key: false,
2148                secondary_key: false,
2149                composite_key: None,
2150                foreign_key: None,
2151                constraints: Vec::new(),
2152                description: field_data
2153                    .get("description")
2154                    .and_then(|v| v.as_str())
2155                    .unwrap_or("")
2156                    .to_string(),
2157                errors: Vec::new(),
2158                quality: Vec::new(),
2159                enum_values: Vec::new(),
2160                column_order: 0,
2161            });
2162            return Ok(columns);
2163        }
2164
2165        // Check if this is a nested object with fields or properties
2166        // Note: Export saves nested columns in properties.properties, but some formats use fields
2167        let nested_fields_obj = field_data
2168            .get("properties")
2169            .and_then(|v| v.as_object())
2170            .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2171
2172        if field_type == "OBJECT"
2173            && let Some(fields_obj) = nested_fields_obj
2174        {
2175            // Inline nested object - create parent column as OBJECT and extract nested fields
2176
2177            // Create parent column
2178            columns.push(Column {
2179                name: field_name.to_string(),
2180                data_type: "OBJECT".to_string(),
2181                nullable: !field_data
2182                    .get("required")
2183                    .and_then(|v| v.as_bool())
2184                    .unwrap_or(false),
2185                primary_key: false,
2186                secondary_key: false,
2187                composite_key: None,
2188                foreign_key: None,
2189                constraints: Vec::new(),
2190                description: field_data
2191                    .get("description")
2192                    .and_then(|v| v.as_str())
2193                    .unwrap_or("")
2194                    .to_string(),
2195                errors: Vec::new(),
2196                quality: Vec::new(),
2197                enum_values: Vec::new(),
2198                column_order: 0,
2199            });
2200
2201            // Extract nested fields recursively
2202            for (nested_field_name, nested_field_data) in fields_obj {
2203                if let Some(nested_field_obj) = nested_field_data.as_object() {
2204                    let nested_field_type = nested_field_obj
2205                        .get("type")
2206                        .and_then(|v| v.as_str())
2207                        .unwrap_or("STRING");
2208
2209                    // Recursively parse nested fields
2210                    let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2211                    match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data) {
2212                        Ok(mut nested_cols) => {
2213                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2214                            // Otherwise, it returns a single column
2215                            columns.append(&mut nested_cols);
2216                        }
2217                        Err(_) => {
2218                            // Fallback: create simple nested column
2219                            columns.push(Column {
2220                                name: nested_col_name,
2221                                data_type: nested_field_type.to_uppercase(),
2222                                nullable: !nested_field_obj
2223                                    .get("required")
2224                                    .and_then(|v| v.as_bool())
2225                                    .unwrap_or(false),
2226                                primary_key: false,
2227                                secondary_key: false,
2228                                composite_key: None,
2229                                foreign_key: None,
2230                                constraints: Vec::new(),
2231                                description: nested_field_obj
2232                                    .get("description")
2233                                    .and_then(|v| v.as_str())
2234                                    .unwrap_or("")
2235                                    .to_string(),
2236                                errors: Vec::new(),
2237                                quality: Vec::new(),
2238                                enum_values: Vec::new(),
2239                                column_order: 0,
2240                            });
2241                        }
2242                    }
2243                }
2244            }
2245
2246            return Ok(columns);
2247        }
2248
2249        // Regular field
2250        let required = field_data
2251            .get("required")
2252            .and_then(|v| v.as_bool())
2253            .unwrap_or(false);
2254        let description = field_data
2255            .get("description")
2256            .and_then(|v| v.as_str())
2257            .unwrap_or("")
2258            .to_string();
2259
2260        // Extract column-level quality rules
2261        let mut column_quality_rules = Vec::new();
2262        if let Some(quality_val) = field_data.get("quality") {
2263            if let Some(arr) = quality_val.as_array() {
2264                // Array of quality rules
2265                for item in arr {
2266                    if let Some(obj) = item.as_object() {
2267                        let mut rule = HashMap::new();
2268                        for (key, value) in obj {
2269                            rule.insert(key.clone(), json_value_to_serde_value(value));
2270                        }
2271                        column_quality_rules.push(rule);
2272                    }
2273                }
2274            } else if let Some(obj) = quality_val.as_object() {
2275                // Single quality rule object
2276                let mut rule = HashMap::new();
2277                for (key, value) in obj {
2278                    rule.insert(key.clone(), json_value_to_serde_value(value));
2279                }
2280                column_quality_rules.push(rule);
2281            }
2282        }
2283
2284        // If required=true (nullable=false), add a "not_null" quality rule if not already present
2285        if required {
2286            let has_not_null = column_quality_rules.iter().any(|rule| {
2287                rule.get("type")
2288                    .and_then(|v| v.as_str())
2289                    .map(|s| {
2290                        s.to_lowercase().contains("not_null")
2291                            || s.to_lowercase().contains("notnull")
2292                    })
2293                    .unwrap_or(false)
2294            });
2295            if !has_not_null {
2296                let mut not_null_rule = HashMap::new();
2297                not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
2298                not_null_rule.insert(
2299                    "description".to_string(),
2300                    serde_json::json!("Column must not be null"),
2301                );
2302                column_quality_rules.push(not_null_rule);
2303            }
2304        }
2305
2306        columns.push(Column {
2307            name: field_name.to_string(),
2308            data_type: field_type,
2309            nullable: !required,
2310            primary_key: field_data
2311                .get("primaryKey")
2312                .and_then(|v| v.as_bool())
2313                .unwrap_or(false),
2314            secondary_key: false,
2315            composite_key: None,
2316            foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2317            constraints: Vec::new(),
2318            description,
2319            errors: Vec::new(),
2320            quality: column_quality_rules,
2321            enum_values: Vec::new(),
2322            column_order: 0,
2323        });
2324
2325        Ok(columns)
2326    }
2327
2328    /// Parse foreign key from Data Contract field data.
2329    fn parse_foreign_key_from_data_contract(
2330        &self,
2331        field_data: &serde_json::Map<String, JsonValue>,
2332    ) -> Option<ForeignKey> {
2333        field_data
2334            .get("foreignKey")
2335            .and_then(|v| v.as_object())
2336            .map(|fk_obj| ForeignKey {
2337                table_id: fk_obj
2338                    .get("table")
2339                    .or_else(|| fk_obj.get("table_id"))
2340                    .and_then(|v| v.as_str())
2341                    .unwrap_or("")
2342                    .to_string(),
2343                column_name: fk_obj
2344                    .get("column")
2345                    .or_else(|| fk_obj.get("column_name"))
2346                    .and_then(|v| v.as_str())
2347                    .unwrap_or("")
2348                    .to_string(),
2349            })
2350    }
2351
2352    /// Extract database type from servers in Data Contract format.
2353    fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2354        // Data Contract format: servers can be object or array
2355        if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2356            // Object format: { "server_name": { "type": "..." } }
2357            if let Some((_, server_data)) = servers_obj.iter().next()
2358                && let Some(server_obj) = server_data.as_object()
2359            {
2360                return server_obj
2361                    .get("type")
2362                    .and_then(|v| v.as_str())
2363                    .and_then(|s| self.parse_database_type(s));
2364            }
2365        } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2366            // Array format: [ { "server": "...", "type": "..." } ]
2367            if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2368                return server_obj
2369                    .get("type")
2370                    .and_then(|v| v.as_str())
2371                    .and_then(|s| self.parse_database_type(s));
2372            }
2373        }
2374        None
2375    }
2376
2377    /// Parse database type string to enum.
2378    fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2379        match s.to_lowercase().as_str() {
2380            "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2381            "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2382            "mysql" => Some(DatabaseType::Mysql),
2383            "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2384            "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2385            _ => None,
2386        }
2387    }
2388
2389    /// Parse STRUCT type definition from string (e.g., "ARRAY<STRUCT<ID: STRING, NAME: STRING>>").
2390    fn parse_struct_type_from_string(
2391        &self,
2392        field_name: &str,
2393        type_str: &str,
2394        field_data: &serde_json::Map<String, JsonValue>,
2395    ) -> Result<Vec<Column>> {
2396        let mut columns = Vec::new();
2397
2398        // Normalize whitespace - replace newlines and multiple spaces with single space
2399        let normalized_type = type_str
2400            .lines()
2401            .map(|line| line.trim())
2402            .filter(|line| !line.is_empty())
2403            .collect::<Vec<_>>()
2404            .join(" ");
2405
2406        let type_str_upper = normalized_type.to_uppercase();
2407
2408        // Check if it's ARRAY<STRUCT<...>>
2409        let _is_array = type_str_upper.starts_with("ARRAY<");
2410        let struct_start = type_str_upper.find("STRUCT<");
2411
2412        if let Some(start_pos) = struct_start {
2413            // Extract STRUCT content
2414            let struct_content = &normalized_type[start_pos + 7..]; // Skip "STRUCT<"
2415
2416            // Find matching closing bracket, handling nested STRUCTs
2417            let mut depth = 1;
2418            let mut end_pos = None;
2419            for (i, ch) in struct_content.char_indices() {
2420                match ch {
2421                    '<' => depth += 1,
2422                    '>' => {
2423                        depth -= 1;
2424                        if depth == 0 {
2425                            end_pos = Some(i);
2426                            break;
2427                        }
2428                    }
2429                    _ => {}
2430                }
2431            }
2432
2433            // If no closing bracket found, try to infer it (handle malformed YAML)
2434            let struct_fields_str = if let Some(end) = end_pos {
2435                &struct_content[..end]
2436            } else {
2437                // Missing closing bracket - use everything up to the end
2438                struct_content.trim_end_matches('>').trim()
2439            };
2440
2441            // Parse fields: "ID: STRING, NAME: STRING, ..."
2442            let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2443
2444            // Create nested columns, handling nested STRUCTs recursively
2445            for (nested_name, nested_type) in fields {
2446                let nested_type_upper = nested_type.to_uppercase();
2447                let nested_col_name = format!("{}.{}", field_name, nested_name);
2448
2449                // Check if this field is itself a STRUCT or ARRAY<STRUCT>
2450                if nested_type_upper.starts_with("STRUCT<") {
2451                    // Recursively parse nested STRUCT by creating a synthetic field_data
2452                    // and calling parse_struct_type_from_string recursively
2453                    let nested_struct_type_str = if nested_type_upper.starts_with("ARRAY<STRUCT<") {
2454                        // Handle ARRAY<STRUCT<...>>
2455                        nested_type.clone()
2456                    } else {
2457                        // Handle STRUCT<...>
2458                        nested_type.clone()
2459                    };
2460
2461                    // Recursively parse the nested STRUCT
2462                    match self.parse_struct_type_from_string(
2463                        &nested_col_name,
2464                        &nested_struct_type_str,
2465                        field_data,
2466                    ) {
2467                        Ok(nested_cols) => {
2468                            // Add all recursively parsed columns
2469                            columns.extend(nested_cols);
2470                        }
2471                        Err(_) => {
2472                            // If recursive parsing fails, add the parent column with STRUCT type
2473                            columns.push(Column {
2474                                name: nested_col_name,
2475                                data_type: normalize_data_type(&nested_type),
2476                                nullable: !field_data
2477                                    .get("required")
2478                                    .and_then(|v| v.as_bool())
2479                                    .unwrap_or(false),
2480                                primary_key: false,
2481                                secondary_key: false,
2482                                composite_key: None,
2483                                foreign_key: None,
2484                                constraints: Vec::new(),
2485                                description: field_data
2486                                    .get("description")
2487                                    .and_then(|v| v.as_str())
2488                                    .unwrap_or("")
2489                                    .to_string(),
2490                                errors: Vec::new(),
2491                                quality: Vec::new(),
2492                                enum_values: Vec::new(),
2493                                column_order: 0,
2494                            });
2495                        }
2496                    }
2497                } else {
2498                    // Simple nested field (not a STRUCT)
2499                    columns.push(Column {
2500                        name: nested_col_name,
2501                        data_type: normalize_data_type(&nested_type),
2502                        nullable: !field_data
2503                            .get("required")
2504                            .and_then(|v| v.as_bool())
2505                            .unwrap_or(false),
2506                        primary_key: false,
2507                        secondary_key: false,
2508                        composite_key: None,
2509                        foreign_key: None,
2510                        constraints: Vec::new(),
2511                        description: field_data
2512                            .get("description")
2513                            .and_then(|v| v.as_str())
2514                            .unwrap_or("")
2515                            .to_string(),
2516                        errors: Vec::new(),
2517                        quality: Vec::new(),
2518                        enum_values: Vec::new(),
2519                        column_order: 0,
2520                    });
2521                }
2522            }
2523
2524            return Ok(columns);
2525        }
2526
2527        // If no STRUCT found, return empty (fallback to regular parsing)
2528        Ok(Vec::new())
2529    }
2530
2531    /// Parse STRUCT fields from string (e.g., "ID: STRING, NAME: STRING").
2532    fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2533        let mut fields = Vec::new();
2534        let mut current_field = String::new();
2535        let mut depth = 0;
2536        let mut in_string = false;
2537        let mut string_char = None;
2538
2539        for ch in fields_str.chars() {
2540            match ch {
2541                '\'' | '"' if !in_string || Some(ch) == string_char => {
2542                    if in_string {
2543                        in_string = false;
2544                        string_char = None;
2545                    } else {
2546                        in_string = true;
2547                        string_char = Some(ch);
2548                    }
2549                    current_field.push(ch);
2550                }
2551                '<' if !in_string => {
2552                    depth += 1;
2553                    current_field.push(ch);
2554                }
2555                '>' if !in_string => {
2556                    depth -= 1;
2557                    current_field.push(ch);
2558                }
2559                ',' if !in_string && depth == 0 => {
2560                    // End of current field
2561                    let trimmed = current_field.trim();
2562                    if !trimmed.is_empty()
2563                        && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2564                    {
2565                        fields.push((name, type_part));
2566                    }
2567                    current_field.clear();
2568                }
2569                _ => {
2570                    current_field.push(ch);
2571                }
2572            }
2573        }
2574
2575        // Handle last field
2576        let trimmed = current_field.trim();
2577        if !trimmed.is_empty()
2578            && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2579        {
2580            fields.push((name, type_part));
2581        }
2582
2583        Ok(fields)
2584    }
2585
2586    /// Parse a single field definition (e.g., "ID: STRING" or "ALERTOPERATION: STRUCT<...>").
2587    fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
2588        // Split by colon, but handle nested STRUCTs
2589        let colon_pos = field_def.find(':')?;
2590        let name = field_def[..colon_pos].trim().to_string();
2591        let type_part = field_def[colon_pos + 1..].trim().to_string();
2592
2593        if name.is_empty() || type_part.is_empty() {
2594            return None;
2595        }
2596
2597        Some((name, type_part))
2598    }
2599
2600    /// Extract catalog and schema from customProperties.
2601    fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
2602        let mut catalog_name = None;
2603        let mut schema_name = None;
2604
2605        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2606            for prop in custom_props {
2607                if let Some(prop_obj) = prop.as_object() {
2608                    let prop_key = prop_obj
2609                        .get("property")
2610                        .and_then(|v| v.as_str())
2611                        .unwrap_or("");
2612                    let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
2613
2614                    match prop_key {
2615                        "catalogName" | "catalog_name" => {
2616                            catalog_name = prop_value.map(|s| s.to_string());
2617                        }
2618                        "schemaName" | "schema_name" => {
2619                            schema_name = prop_value.map(|s| s.to_string());
2620                        }
2621                        _ => {}
2622                    }
2623                }
2624            }
2625        }
2626
2627        // Also check direct fields
2628        if catalog_name.is_none() {
2629            catalog_name = data
2630                .get("catalog_name")
2631                .and_then(|v| v.as_str())
2632                .map(|s| s.to_string());
2633        }
2634        if schema_name.is_none() {
2635            schema_name = data
2636                .get("schema_name")
2637                .and_then(|v| v.as_str())
2638                .map(|s| s.to_string());
2639        }
2640
2641        (catalog_name, schema_name)
2642    }
2643}
2644
2645impl Default for ODCSImporter {
2646    fn default() -> Self {
2647        Self::new()
2648    }
2649}
2650
2651/// Parser error structure for detailed error reporting.
2652#[derive(Debug, Clone)]
2653pub struct ParserError {
2654    pub error_type: String,
2655    pub field: String,
2656    pub message: String,
2657}
2658
2659/// Convert YAML Value to JSON Value for easier manipulation.
2660fn yaml_to_json_value(yaml: &serde_yaml::Value) -> Result<JsonValue> {
2661    // Convert YAML to JSON via serialization
2662    let json_str = serde_json::to_string(yaml).context("Failed to convert YAML to JSON")?;
2663    serde_json::from_str(&json_str).context("Failed to parse JSON")
2664}
2665
2666/// Convert JSON Value to serde_json::Value for storage in HashMap.
2667fn json_value_to_serde_value(value: &JsonValue) -> serde_json::Value {
2668    value.clone()
2669}
2670
2671/// Normalize data type to uppercase, preserving STRUCT<...>, ARRAY<...>, MAP<...> format.
2672fn normalize_data_type(data_type: &str) -> String {
2673    if data_type.is_empty() {
2674        return data_type.to_string();
2675    }
2676
2677    let upper = data_type.to_uppercase();
2678
2679    // Handle STRUCT<...>, ARRAY<...>, MAP<...> preserving inner content
2680    if upper.starts_with("STRUCT") {
2681        if let Some(start) = data_type.find('<')
2682            && let Some(end) = data_type.rfind('>')
2683        {
2684            let inner = &data_type[start + 1..end];
2685            return format!("STRUCT<{}>", inner);
2686        }
2687        return format!("STRUCT{}", &data_type[6..]);
2688    } else if upper.starts_with("ARRAY") {
2689        if let Some(start) = data_type.find('<')
2690            && let Some(end) = data_type.rfind('>')
2691        {
2692            let inner = &data_type[start + 1..end];
2693            return format!("ARRAY<{}>", inner);
2694        }
2695        return format!("ARRAY{}", &data_type[5..]);
2696    } else if upper.starts_with("MAP") {
2697        if let Some(start) = data_type.find('<')
2698            && let Some(end) = data_type.rfind('>')
2699        {
2700            let inner = &data_type[start + 1..end];
2701            return format!("MAP<{}>", inner);
2702        }
2703        return format!("MAP{}", &data_type[3..]);
2704    }
2705
2706    upper
2707}
2708
2709// Helper functions for enum parsing
2710fn parse_medallion_layer(s: &str) -> Result<MedallionLayer> {
2711    match s.to_uppercase().as_str() {
2712        "BRONZE" => Ok(MedallionLayer::Bronze),
2713        "SILVER" => Ok(MedallionLayer::Silver),
2714        "GOLD" => Ok(MedallionLayer::Gold),
2715        "OPERATIONAL" => Ok(MedallionLayer::Operational),
2716        _ => Err(anyhow::anyhow!("Unknown medallion layer: {}", s)),
2717    }
2718}
2719
2720fn parse_scd_pattern(s: &str) -> Result<SCDPattern> {
2721    match s.to_uppercase().as_str() {
2722        "TYPE_1" | "TYPE1" => Ok(SCDPattern::Type1),
2723        "TYPE_2" | "TYPE2" => Ok(SCDPattern::Type2),
2724        _ => Err(anyhow::anyhow!("Unknown SCD pattern: {}", s)),
2725    }
2726}
2727
2728fn parse_data_vault_classification(s: &str) -> Result<DataVaultClassification> {
2729    match s.to_uppercase().as_str() {
2730        "HUB" => Ok(DataVaultClassification::Hub),
2731        "LINK" => Ok(DataVaultClassification::Link),
2732        "SATELLITE" | "SAT" => Ok(DataVaultClassification::Satellite),
2733        _ => Err(anyhow::anyhow!("Unknown Data Vault classification: {}", s)),
2734    }
2735}
2736
2737#[cfg(test)]
2738mod tests {
2739    use super::*;
2740
2741    #[test]
2742    fn test_parse_simple_odcl_table() {
2743        let mut parser = ODCSImporter::new();
2744        let odcl_yaml = r#"
2745name: users
2746columns:
2747  - name: id
2748    data_type: INT
2749    nullable: false
2750    primary_key: true
2751  - name: name
2752    data_type: VARCHAR(255)
2753    nullable: false
2754database_type: Postgres
2755"#;
2756
2757        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2758        assert_eq!(table.name, "users");
2759        assert_eq!(table.columns.len(), 2);
2760        assert_eq!(table.columns[0].name, "id");
2761        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
2762        assert_eq!(errors.len(), 0);
2763    }
2764
2765    #[test]
2766    fn test_parse_odcl_with_metadata() {
2767        let mut parser = ODCSImporter::new();
2768        let odcl_yaml = r#"
2769name: users
2770columns:
2771  - name: id
2772    data_type: INT
2773medallion_layer: gold
2774scd_pattern: TYPE_2
2775odcl_metadata:
2776  description: "User table"
2777  owner: "data-team"
2778"#;
2779
2780        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2781        assert_eq!(table.medallion_layers.len(), 1);
2782        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
2783        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
2784        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
2785            assert_eq!(desc, "User table");
2786        }
2787        assert_eq!(errors.len(), 0);
2788    }
2789
2790    #[test]
2791    fn test_parse_odcl_with_data_vault() {
2792        let mut parser = ODCSImporter::new();
2793        let odcl_yaml = r#"
2794name: hub_customer
2795columns:
2796  - name: customer_key
2797    data_type: VARCHAR(50)
2798data_vault_classification: Hub
2799"#;
2800
2801        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2802        assert_eq!(
2803            table.data_vault_classification,
2804            Some(DataVaultClassification::Hub)
2805        );
2806        assert_eq!(errors.len(), 0);
2807    }
2808
2809    #[test]
2810    fn test_parse_invalid_odcl() {
2811        let mut parser = ODCSImporter::new();
2812        let invalid_yaml = "not: valid: yaml: structure:";
2813
2814        // Should fail to parse YAML
2815        assert!(parser.parse(invalid_yaml).is_err());
2816    }
2817
2818    #[test]
2819    fn test_parse_odcl_missing_required_fields() {
2820        let mut parser = ODCSImporter::new();
2821        let non_conformant = r#"
2822name: users
2823# Missing required columns field
2824"#;
2825
2826        // Should fail with missing columns
2827        assert!(parser.parse(non_conformant).is_err());
2828    }
2829
2830    #[test]
2831    fn test_parse_odcl_with_foreign_key() {
2832        let mut parser = ODCSImporter::new();
2833        let odcl_yaml = r#"
2834name: orders
2835columns:
2836  - name: id
2837    data_type: INT
2838    primary_key: true
2839  - name: user_id
2840    data_type: INT
2841    foreign_key:
2842      table_id: users
2843      column_name: id
2844"#;
2845
2846        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2847        assert_eq!(table.columns.len(), 2);
2848        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
2849        assert!(user_id_col.foreign_key.is_some());
2850        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
2851        assert_eq!(errors.len(), 0);
2852    }
2853
2854    #[test]
2855    fn test_parse_odcl_with_constraints() {
2856        let mut parser = ODCSImporter::new();
2857        let odcl_yaml = r#"
2858name: products
2859columns:
2860  - name: id
2861    data_type: INT
2862    primary_key: true
2863  - name: name
2864    data_type: VARCHAR(255)
2865    nullable: false
2866    constraints:
2867      - UNIQUE
2868      - NOT NULL
2869"#;
2870
2871        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2872        assert_eq!(table.columns.len(), 2);
2873        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
2874        assert!(!name_col.nullable);
2875        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
2876        assert_eq!(errors.len(), 0);
2877    }
2878}