data_modelling_sdk/import/
odcs.rs

1//! ODCS parser service for parsing Open Data Contract Standard YAML files.
2//!
3//! This service parses ODCS (Open Data Contract Standard) v3.1.0 and legacy ODCL (Data Contract Specification) YAML files
4//! and converts them to Table models. ODCL files are automatically converted to ODCS v3.1.0 format.
5//! Supports multiple formats:
6//! - ODCS v3.1.0 / v3.0.x format (apiVersion, kind, schema) - PRIMARY FORMAT
7//! - ODCL (Data Contract Specification) format (dataContractSpecification, models, definitions) - LEGACY, converted to ODCS
8//! - Simple ODCL format (name, columns) - LEGACY, converted to ODCS
9//! - Liquibase format
10
11use super::{ImportError, ImportResult, TableData};
12use crate::models::column::ForeignKey;
13use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
14use crate::models::{Column, Table};
15use anyhow::{Context, Result};
16use serde_json::Value as JsonValue;
17use std::collections::HashMap;
18use tracing::info;
19
20/// ODCS parser service for parsing Open Data Contract Standard YAML files.
21/// Handles ODCS v3.1.0 (primary format) and legacy ODCL formats (converted to ODCS).
22pub struct ODCSImporter {
23    /// Current YAML data for $ref resolution
24    current_yaml_data: Option<serde_yaml::Value>,
25}
26
27impl ODCSImporter {
28    /// Create a new ODCS parser instance.
29    ///
30    /// # Example
31    ///
32    /// ```rust
33    /// use data_modelling_sdk::import::odcs::ODCSImporter;
34    ///
35    /// let mut importer = ODCSImporter::new();
36    /// ```
37    pub fn new() -> Self {
38        Self {
39            current_yaml_data: None,
40        }
41    }
42
43    /// Import ODCS/ODCL YAML content and create Table (SDK interface).
44    ///
45    /// Supports ODCS v3.1.0 (primary), legacy ODCL formats (converted to ODCS), and Liquibase formats.
46    ///
47    /// # Arguments
48    ///
49    /// * `yaml_content` - ODCS/ODCL YAML content as a string
50    ///
51    /// # Returns
52    ///
53    /// An `ImportResult` containing the extracted table and any parse errors.
54    ///
55    /// # Example
56    ///
57    /// ```rust
58    /// use data_modelling_sdk::import::odcs::ODCSImporter;
59    ///
60    /// let mut importer = ODCSImporter::new();
61    /// let yaml = r#"
62    /// apiVersion: v3.1.0
63    /// kind: DataContract
64    /// id: 550e8400-e29b-41d4-a716-446655440000
65    /// version: 1.0.0
66    /// name: users
67    /// schema:
68    ///   fields:
69    ///     - name: id
70    ///       type: bigint
71    /// "#;
72    /// let result = importer.import(yaml).unwrap();
73    /// assert_eq!(result.tables.len(), 1);
74    /// ```
75    pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
76        match self.parse(yaml_content) {
77            Ok((table, errors)) => {
78                let sdk_tables = vec![TableData {
79                    table_index: 0,
80                    name: Some(table.name.clone()),
81                    columns: table
82                        .columns
83                        .iter()
84                        .map(|c| super::ColumnData {
85                            name: c.name.clone(),
86                            data_type: c.data_type.clone(),
87                            nullable: c.nullable,
88                            primary_key: c.primary_key,
89                        })
90                        .collect(),
91                }];
92                let sdk_errors: Vec<ImportError> = errors
93                    .iter()
94                    .map(|e| ImportError::ParseError(e.message.clone()))
95                    .collect();
96                Ok(ImportResult {
97                    tables: sdk_tables,
98                    tables_requiring_name: Vec::new(),
99                    errors: sdk_errors,
100                    ai_suggestions: None,
101                })
102            }
103            Err(e) => Err(ImportError::ParseError(e.to_string())),
104        }
105    }
106
107    /// Parse ODCS/ODCL YAML content and create Table (public method for native app use).
108    ///
109    /// This method returns the full Table object with all metadata, suitable for use in
110    /// native applications that need direct access to the parsed table structure.
111    /// For API use, prefer the `import()` method which returns ImportResult.
112    ///
113    /// # Returns
114    ///
115    /// Returns a tuple of (Table, list of errors/warnings).
116    /// Errors list is empty if parsing is successful.
117    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
118        self.parse(yaml_content)
119    }
120
121    /// Parse ODCS/ODCL YAML content and create Table (internal method).
122    ///
123    /// Supports ODCS v3.1.0/v3.0.x (primary), ODCL Data Contract spec (legacy, converted to ODCS),
124    /// simple ODCL (legacy, converted to ODCS), and Liquibase formats.
125    ///
126    /// # Returns
127    ///
128    /// Returns a tuple of (Table, list of errors/warnings).
129    /// Errors list is empty if parsing is successful.
130    fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
131        // Errors are collected in helper functions, not here
132        let _errors: Vec<ParserError> = Vec::new();
133
134        // Parse YAML
135        let data: serde_yaml::Value =
136            serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
137
138        if data.is_null() {
139            return Err(anyhow::anyhow!("Empty YAML content"));
140        }
141
142        // Store current YAML data for $ref resolution
143        self.current_yaml_data = Some(data.clone());
144
145        // Convert to JSON Value for easier manipulation
146        let json_data = yaml_to_json_value(&data)?;
147
148        // Check format and parse accordingly
149        if self.is_liquibase_format(&json_data) {
150            return self.parse_liquibase(&json_data);
151        }
152
153        if self.is_odcl_v3_format(&json_data) {
154            return self.parse_odcl_v3(&json_data);
155        }
156
157        if self.is_data_contract_format(&json_data) {
158            return self.parse_data_contract(&json_data);
159        }
160
161        // Fall back to simple ODCL format
162        self.parse_simple_odcl(&json_data)
163    }
164
165    /// Resolve a $ref reference like '#/definitions/betAction'.
166    fn resolve_ref<'a>(&self, ref_str: &str, data: &'a JsonValue) -> Option<&'a JsonValue> {
167        if !ref_str.starts_with("#/") {
168            return None;
169        }
170
171        // Remove the leading '#/'
172        let path = &ref_str[2..];
173        let parts: Vec<&str> = path.split('/').collect();
174
175        // Navigate through the data structure
176        let mut current = data;
177        for part in parts {
178            current = current.get(part)?;
179        }
180
181        if current.is_object() {
182            Some(current)
183        } else {
184            None
185        }
186    }
187
188    /// Check if YAML is in Liquibase format.
189    fn is_liquibase_format(&self, data: &JsonValue) -> bool {
190        if data.get("databaseChangeLog").is_some() {
191            return true;
192        }
193        // Check for Liquibase-specific keys
194        if let Some(obj) = data.as_object() {
195            let obj_str = format!("{:?}", obj);
196            if obj_str.contains("changeSet") {
197                return true;
198            }
199        }
200        false
201    }
202
203    /// Check if YAML is in ODCS v3.0.x format.
204    fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
205        if let Some(obj) = data.as_object() {
206            let has_api_version = obj.contains_key("apiVersion");
207            let has_kind = obj
208                .get("kind")
209                .and_then(|v| v.as_str())
210                .map(|s| s == "DataContract")
211                .unwrap_or(false);
212            let has_id = obj.contains_key("id");
213            let has_version = obj.contains_key("version");
214            return has_api_version && has_kind && has_id && has_version;
215        }
216        false
217    }
218
219    /// Check if YAML is in Data Contract specification format.
220    fn is_data_contract_format(&self, data: &JsonValue) -> bool {
221        if let Some(obj) = data.as_object() {
222            let has_spec = obj.contains_key("dataContractSpecification");
223            let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
224            return has_spec && has_models;
225        }
226        false
227    }
228
229    /// Parse simple ODCL format.
230    fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
231        let mut errors = Vec::new();
232
233        // Extract table name
234        let name = data
235            .get("name")
236            .and_then(|v| v.as_str())
237            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
238            .to_string();
239
240        // Extract columns
241        let columns_data = data
242            .get("columns")
243            .and_then(|v| v.as_array())
244            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
245
246        let mut columns = Vec::new();
247        for (idx, col_data) in columns_data.iter().enumerate() {
248            match self.parse_column(col_data) {
249                Ok(col) => columns.push(col),
250                Err(e) => {
251                    errors.push(ParserError {
252                        error_type: "column_parse_error".to_string(),
253                        field: format!("columns[{}]", idx),
254                        message: e.to_string(),
255                    });
256                }
257            }
258        }
259
260        // Extract metadata
261        let database_type = self.extract_database_type(data);
262        let medallion_layers = self.extract_medallion_layers(data);
263        let scd_pattern = self.extract_scd_pattern(data);
264        let data_vault_classification = self.extract_data_vault_classification(data);
265        let quality_rules = self.extract_quality_rules(data);
266
267        // Validate pattern exclusivity
268        if scd_pattern.is_some() && data_vault_classification.is_some() {
269            errors.push(ParserError {
270                error_type: "validation_error".to_string(),
271                field: "patterns".to_string(),
272                message: "SCD pattern and Data Vault classification are mutually exclusive"
273                    .to_string(),
274            });
275        }
276
277        // Extract odcl_metadata
278        let mut odcl_metadata = HashMap::new();
279        if let Some(metadata) = data.get("odcl_metadata")
280            && let Some(obj) = metadata.as_object()
281        {
282            for (key, value) in obj {
283                odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
284            }
285        }
286
287        let table_uuid = self.extract_table_uuid(data);
288
289        let table = Table {
290            id: table_uuid,
291            name,
292            columns,
293            database_type,
294            catalog_name: None,
295            schema_name: None,
296            medallion_layers,
297            scd_pattern,
298            data_vault_classification,
299            modeling_level: None,
300            tags: Vec::new(),
301            odcl_metadata,
302            owner: None,
303            sla: None,
304            contact_details: None,
305            infrastructure_type: None,
306            notes: None,
307            position: None,
308            yaml_file_path: None,
309            drawio_cell_id: None,
310            quality: quality_rules,
311            errors: Vec::new(),
312            created_at: chrono::Utc::now(),
313            updated_at: chrono::Utc::now(),
314        };
315
316        info!("Parsed ODCL table: {}", table.name);
317        Ok((table, errors))
318    }
319
320    /// Parse a single column definition.
321    fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
322        let name = col_data
323            .get("name")
324            .and_then(|v| v.as_str())
325            .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
326            .to_string();
327
328        let data_type = col_data
329            .get("data_type")
330            .and_then(|v| v.as_str())
331            .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
332            .to_string();
333
334        // Normalize data_type to uppercase (preserve STRUCT<...> format)
335        let data_type = normalize_data_type(&data_type);
336
337        let nullable = col_data
338            .get("nullable")
339            .and_then(|v| v.as_bool())
340            .unwrap_or(true);
341
342        let primary_key = col_data
343            .get("primary_key")
344            .and_then(|v| v.as_bool())
345            .unwrap_or(false);
346
347        let foreign_key = col_data
348            .get("foreign_key")
349            .and_then(|v| self.parse_foreign_key(v));
350
351        let constraints = col_data
352            .get("constraints")
353            .and_then(|v| v.as_array())
354            .map(|arr| {
355                arr.iter()
356                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
357                    .collect()
358            })
359            .unwrap_or_default();
360
361        let description = col_data
362            .get("description")
363            .and_then(|v| v.as_str())
364            .map(|s| s.to_string())
365            .unwrap_or_default();
366
367        // Extract column-level quality rules
368        let mut column_quality_rules = Vec::new();
369        if let Some(quality_val) = col_data.get("quality") {
370            if let Some(arr) = quality_val.as_array() {
371                // Array of quality rules
372                for item in arr {
373                    if let Some(obj) = item.as_object() {
374                        let mut rule = HashMap::new();
375                        for (key, value) in obj {
376                            rule.insert(key.clone(), json_value_to_serde_value(value));
377                        }
378                        column_quality_rules.push(rule);
379                    }
380                }
381            } else if let Some(obj) = quality_val.as_object() {
382                // Single quality rule object
383                let mut rule = HashMap::new();
384                for (key, value) in obj {
385                    rule.insert(key.clone(), json_value_to_serde_value(value));
386                }
387                column_quality_rules.push(rule);
388            }
389        }
390
391        // If nullable=false (required=true), add a "not_null" quality rule if not already present
392        if !nullable {
393            let has_not_null = column_quality_rules.iter().any(|rule| {
394                rule.get("type")
395                    .and_then(|v| v.as_str())
396                    .map(|s| {
397                        s.to_lowercase().contains("not_null")
398                            || s.to_lowercase().contains("notnull")
399                    })
400                    .unwrap_or(false)
401            });
402            if !has_not_null {
403                let mut not_null_rule = HashMap::new();
404                not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
405                not_null_rule.insert(
406                    "description".to_string(),
407                    serde_json::json!("Column must not be null"),
408                );
409                column_quality_rules.push(not_null_rule);
410            }
411        }
412
413        Ok(Column {
414            name,
415            data_type,
416            nullable,
417            primary_key,
418            secondary_key: false,
419            composite_key: None,
420            foreign_key,
421            constraints,
422            description,
423            errors: Vec::new(),
424            quality: column_quality_rules,
425            enum_values: Vec::new(),
426            column_order: 0,
427        })
428    }
429
430    /// Parse foreign key from JSON value.
431    fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
432        let obj = fk_data.as_object()?;
433        Some(ForeignKey {
434            table_id: obj
435                .get("table_id")
436                .or_else(|| obj.get("table"))
437                .and_then(|v| v.as_str())
438                .unwrap_or("")
439                .to_string(),
440            column_name: obj
441                .get("column_name")
442                .or_else(|| obj.get("column"))
443                .and_then(|v| v.as_str())
444                .unwrap_or("")
445                .to_string(),
446        })
447    }
448
449    /// Extract database type from data.
450    fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
451        data.get("database_type")
452            .and_then(|v| v.as_str())
453            .and_then(|s| match s.to_uppercase().as_str() {
454                "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
455                "MYSQL" => Some(DatabaseType::Mysql),
456                "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
457                "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
458                "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
459                _ => None,
460            })
461    }
462
463    /// Extract medallion layers from data.
464    fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
465        let mut layers = Vec::new();
466
467        // Check plural form first
468        if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
469            for item in arr {
470                if let Some(s) = item.as_str()
471                    && let Ok(layer) = parse_medallion_layer(s)
472                {
473                    layers.push(layer);
474                }
475            }
476        }
477        // Check singular form (backward compatibility)
478        else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
479            && let Ok(layer) = parse_medallion_layer(s)
480        {
481            layers.push(layer);
482        }
483
484        layers
485    }
486
487    /// Extract SCD pattern from data.
488    fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
489        data.get("scd_pattern")
490            .and_then(|v| v.as_str())
491            .and_then(|s| parse_scd_pattern(s).ok())
492    }
493
494    /// Extract Data Vault classification from data.
495    fn extract_data_vault_classification(
496        &self,
497        data: &JsonValue,
498    ) -> Option<DataVaultClassification> {
499        data.get("data_vault_classification")
500            .and_then(|v| v.as_str())
501            .and_then(|s| parse_data_vault_classification(s).ok())
502    }
503
504    /// Extract quality rules from data.
505    fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
506        use serde_json::Value;
507        let mut quality_rules = Vec::new();
508
509        // Check for quality field at root level (array of objects or single object)
510        if let Some(quality_val) = data.get("quality") {
511            if let Some(arr) = quality_val.as_array() {
512                // Array of quality rules
513                for item in arr {
514                    if let Some(obj) = item.as_object() {
515                        let mut rule = HashMap::new();
516                        for (key, value) in obj {
517                            rule.insert(key.clone(), json_value_to_serde_value(value));
518                        }
519                        quality_rules.push(rule);
520                    }
521                }
522            } else if let Some(obj) = quality_val.as_object() {
523                // Single quality rule object
524                let mut rule = HashMap::new();
525                for (key, value) in obj {
526                    rule.insert(key.clone(), json_value_to_serde_value(value));
527                }
528                quality_rules.push(rule);
529            } else if let Some(s) = quality_val.as_str() {
530                // Simple string quality value
531                let mut rule = HashMap::new();
532                rule.insert("value".to_string(), Value::String(s.to_string()));
533                quality_rules.push(rule);
534            }
535        }
536
537        // Check for quality in metadata (ODCL v3 format)
538        if let Some(metadata) = data.get("metadata")
539            && let Some(metadata_obj) = metadata.as_object()
540            && let Some(quality_val) = metadata_obj.get("quality")
541        {
542            if let Some(arr) = quality_val.as_array() {
543                // Array of quality rules
544                for item in arr {
545                    if let Some(obj) = item.as_object() {
546                        let mut rule = HashMap::new();
547                        for (key, value) in obj {
548                            rule.insert(key.clone(), json_value_to_serde_value(value));
549                        }
550                        quality_rules.push(rule);
551                    }
552                }
553            } else if let Some(obj) = quality_val.as_object() {
554                // Single quality rule object
555                let mut rule = HashMap::new();
556                for (key, value) in obj {
557                    rule.insert(key.clone(), json_value_to_serde_value(value));
558                }
559                quality_rules.push(rule);
560            } else if let Some(s) = quality_val.as_str() {
561                // Simple string quality value
562                let mut rule = HashMap::new();
563                rule.insert("value".to_string(), Value::String(s.to_string()));
564                quality_rules.push(rule);
565            }
566        }
567
568        // Check for tblproperties field (similar to SQL TBLPROPERTIES)
569        if let Some(tblprops) = data.get("tblproperties")
570            && let Some(obj) = tblprops.as_object()
571        {
572            for (key, value) in obj {
573                let mut rule = HashMap::new();
574                rule.insert("property".to_string(), Value::String(key.clone()));
575                rule.insert("value".to_string(), json_value_to_serde_value(value));
576                quality_rules.push(rule);
577            }
578        }
579
580        quality_rules
581    }
582
583    /// Parse Liquibase YAML changelog format.
584    ///
585    /// Extracts the first `createTable` change from a Liquibase changelog.
586    /// Supports both direct column definitions and nested column structures.
587    ///
588    /// # Supported Format
589    ///
590    fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
591        // Supported Liquibase YAML changelog format:
592        // databaseChangeLog:
593        //   - changeSet:
594        //       - createTable:
595        //           tableName: my_table
596        //           columns:
597        //             - column:
598        //                 name: id
599        //                 type: int
600        //                 constraints:
601        //                   primaryKey: true
602        //                   nullable: false
603
604        let mut errors = Vec::new();
605
606        let changelog = data
607            .get("databaseChangeLog")
608            .and_then(|v| v.as_array())
609            .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
610
611        // Find first createTable change.
612        let mut table_name: Option<String> = None;
613        let mut columns: Vec<crate::models::column::Column> = Vec::new();
614
615        for entry in changelog {
616            // Entries are typically maps like { changeSet: [...] } or { changeSet: { changes: [...] } }
617            if let Some(change_set) = entry.get("changeSet") {
618                // Liquibase YAML can represent changeSet as map or list.
619                let changes = if let Some(obj) = change_set.as_object() {
620                    obj.get("changes")
621                        .and_then(|v| v.as_array())
622                        .cloned()
623                        .unwrap_or_default()
624                } else if let Some(arr) = change_set.as_array() {
625                    // Some variants encode changes directly as list entries inside changeSet.
626                    arr.clone()
627                } else {
628                    Vec::new()
629                };
630
631                for ch in changes {
632                    let create = ch.get("createTable").or_else(|| ch.get("create_table"));
633                    if let Some(create) = create {
634                        table_name = create
635                            .get("tableName")
636                            .or_else(|| create.get("table_name"))
637                            .and_then(|v| v.as_str())
638                            .map(|s| s.to_string());
639
640                        // columns: [ { column: { ... } }, ... ]
641                        if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
642                            for col_entry in cols {
643                                let col = col_entry.get("column").unwrap_or(col_entry);
644                                let name = col
645                                    .get("name")
646                                    .and_then(|v| v.as_str())
647                                    .unwrap_or("")
648                                    .to_string();
649                                let data_type = col
650                                    .get("type")
651                                    .and_then(|v| v.as_str())
652                                    .unwrap_or("")
653                                    .to_string();
654
655                                if name.is_empty() {
656                                    errors.push(ParserError {
657                                        error_type: "validation_error".to_string(),
658                                        field: "columns.name".to_string(),
659                                        message: "Liquibase createTable column missing name"
660                                            .to_string(),
661                                    });
662                                    continue;
663                                }
664
665                                let mut column =
666                                    crate::models::column::Column::new(name, data_type);
667
668                                if let Some(constraints) =
669                                    col.get("constraints").and_then(|v| v.as_object())
670                                {
671                                    if let Some(pk) =
672                                        constraints.get("primaryKey").and_then(|v| v.as_bool())
673                                    {
674                                        column.primary_key = pk;
675                                    }
676                                    if let Some(nullable) =
677                                        constraints.get("nullable").and_then(|v| v.as_bool())
678                                    {
679                                        column.nullable = nullable;
680                                    }
681                                }
682
683                                columns.push(column);
684                            }
685                        }
686
687                        // parse_table() returns a single Table, so we parse the first createTable.
688                        // If multiple tables are needed, call parse_table() multiple times or use import().
689                        break;
690                    }
691                }
692            }
693            if table_name.is_some() {
694                break;
695            }
696        }
697
698        let table_name = table_name
699            .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
700        let table = Table::new(table_name, columns);
701        // Preserve any errors collected.
702        Ok((table, errors))
703    }
704
705    /// Parse ODCS v3.0.x format.
706    fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
707        let mut errors = Vec::new();
708
709        // Extract table name
710        let table_name = data
711            .get("name")
712            .and_then(|v| v.as_str())
713            .map(|s| s.to_string())
714            .or_else(|| {
715                // Try to get from schema array
716                data.get("schema")
717                    .and_then(|v| v.as_array())
718                    .and_then(|arr| arr.first())
719                    .and_then(|obj| obj.as_object())
720                    .and_then(|obj| obj.get("name"))
721                    .and_then(|v| v.as_str())
722                    .map(|s| s.to_string())
723            })
724            .ok_or_else(|| {
725                anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
726            })?;
727
728        // Extract schema - ODCS v3.0.x uses array of SchemaObject
729        let schema = data
730            .get("schema")
731            .and_then(|v| v.as_array())
732            .ok_or_else(|| {
733                errors.push(ParserError {
734                    error_type: "validation_error".to_string(),
735                    field: "schema".to_string(),
736                    message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
737                });
738                anyhow::anyhow!("Missing schema")
739            });
740
741        let schema = match schema {
742            Ok(s) if s.is_empty() => {
743                errors.push(ParserError {
744                    error_type: "validation_error".to_string(),
745                    field: "schema".to_string(),
746                    message: "ODCS v3.0.x schema array is empty".to_string(),
747                });
748                let quality_rules = self.extract_quality_rules(data);
749                let table_uuid = self.extract_table_uuid(data);
750                let table = Table {
751                    id: table_uuid,
752                    name: table_name,
753                    columns: Vec::new(),
754                    database_type: None,
755                    catalog_name: None,
756                    schema_name: None,
757                    medallion_layers: Vec::new(),
758                    scd_pattern: None,
759                    data_vault_classification: None,
760                    modeling_level: None,
761                    tags: Vec::new(),
762                    odcl_metadata: HashMap::new(),
763                    owner: None,
764                    sla: None,
765                    contact_details: None,
766                    infrastructure_type: None,
767                    notes: None,
768                    position: None,
769                    yaml_file_path: None,
770                    drawio_cell_id: None,
771                    quality: quality_rules,
772                    errors: Vec::new(),
773                    created_at: chrono::Utc::now(),
774                    updated_at: chrono::Utc::now(),
775                };
776                return Ok((table, errors));
777            }
778            Ok(s) => s,
779            Err(_) => {
780                let quality_rules = self.extract_quality_rules(data);
781                let table_uuid = self.extract_table_uuid(data);
782                let table = Table {
783                    id: table_uuid,
784                    name: table_name,
785                    columns: Vec::new(),
786                    database_type: None,
787                    catalog_name: None,
788                    schema_name: None,
789                    medallion_layers: Vec::new(),
790                    scd_pattern: None,
791                    data_vault_classification: None,
792                    modeling_level: None,
793                    tags: Vec::new(),
794                    odcl_metadata: HashMap::new(),
795                    owner: None,
796                    sla: None,
797                    contact_details: None,
798                    infrastructure_type: None,
799                    notes: None,
800                    position: None,
801                    yaml_file_path: None,
802                    drawio_cell_id: None,
803                    quality: quality_rules,
804                    errors: Vec::new(),
805                    created_at: chrono::Utc::now(),
806                    updated_at: chrono::Utc::now(),
807                };
808                return Ok((table, errors));
809            }
810        };
811
812        // Get the first schema object (table)
813        let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
814            errors.push(ParserError {
815                error_type: "validation_error".to_string(),
816                field: "schema[0]".to_string(),
817                message: "First schema object must be a dictionary".to_string(),
818            });
819            anyhow::anyhow!("Invalid schema object")
820        })?;
821
822        let object_name = schema_object
823            .get("name")
824            .and_then(|v| v.as_str())
825            .unwrap_or(&table_name);
826
827        let properties = schema_object
828            .get("properties")
829            .and_then(|v| v.as_object())
830            .ok_or_else(|| {
831                errors.push(ParserError {
832                    error_type: "validation_error".to_string(),
833                    field: format!("Object '{}'", object_name),
834                    message: format!("Object '{}' missing 'properties' field", object_name),
835                });
836                anyhow::anyhow!("Missing properties")
837            })?;
838
839        // Parse properties as columns (similar to Data Contract)
840        let mut columns = Vec::new();
841        for (prop_name, prop_data) in properties {
842            if let Some(prop_obj) = prop_data.as_object() {
843                match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
844                    Ok(mut cols) => columns.append(&mut cols),
845                    Err(e) => {
846                        errors.push(ParserError {
847                            error_type: "property_parse_error".to_string(),
848                            field: format!("Property '{}'", prop_name),
849                            message: e.to_string(),
850                        });
851                    }
852                }
853            } else {
854                errors.push(ParserError {
855                    error_type: "validation_error".to_string(),
856                    field: format!("Property '{}'", prop_name),
857                    message: format!("Property '{}' must be an object", prop_name),
858                });
859            }
860        }
861
862        // Extract metadata from customProperties
863        let (medallion_layers, scd_pattern, data_vault_classification, mut tags) =
864            self.extract_metadata_from_custom_properties(data);
865
866        // Extract sharedDomains from customProperties
867        let mut shared_domains: Vec<String> = Vec::new();
868        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
869            for prop in custom_props {
870                if let Some(prop_obj) = prop.as_object() {
871                    let prop_key = prop_obj
872                        .get("property")
873                        .and_then(|v| v.as_str())
874                        .unwrap_or("");
875                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
876                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
877                    {
878                        for item in arr {
879                            if let Some(s) = item.as_str() {
880                                shared_domains.push(s.to_string());
881                            }
882                        }
883                    }
884                }
885            }
886        }
887
888        // Extract tags from top-level tags field (if not already extracted from customProperties)
889        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
890            for item in tags_arr {
891                if let Some(s) = item.as_str()
892                    && !tags.contains(&s.to_string())
893                {
894                    tags.push(s.to_string());
895                }
896            }
897        }
898
899        // Extract database type from servers
900        let database_type = self.extract_database_type_from_odcl_v3_servers(data);
901
902        // Extract quality rules
903        let quality_rules = self.extract_quality_rules(data);
904
905        // Build ODCL metadata
906        let mut odcl_metadata = HashMap::new();
907        odcl_metadata.insert(
908            "apiVersion".to_string(),
909            json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
910        );
911        odcl_metadata.insert(
912            "kind".to_string(),
913            json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
914        );
915        odcl_metadata.insert(
916            "id".to_string(),
917            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
918        );
919        odcl_metadata.insert(
920            "version".to_string(),
921            json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
922        );
923        odcl_metadata.insert(
924            "status".to_string(),
925            json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
926        );
927
928        // Extract servicelevels if present
929        if let Some(servicelevels_val) = data.get("servicelevels") {
930            odcl_metadata.insert(
931                "servicelevels".to_string(),
932                json_value_to_serde_value(servicelevels_val),
933            );
934        }
935
936        // Extract links if present
937        if let Some(links_val) = data.get("links") {
938            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
939        }
940
941        // Extract domain, dataProduct, tenant
942        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
943            odcl_metadata.insert(
944                "domain".to_string(),
945                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
946            );
947        }
948        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
949            odcl_metadata.insert(
950                "dataProduct".to_string(),
951                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
952            );
953        }
954        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
955            odcl_metadata.insert(
956                "tenant".to_string(),
957                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
958            );
959        }
960
961        // Extract top-level description (can be object or string)
962        if let Some(desc_val) = data.get("description") {
963            odcl_metadata.insert(
964                "description".to_string(),
965                json_value_to_serde_value(desc_val),
966            );
967        }
968
969        // Extract pricing
970        if let Some(pricing_val) = data.get("pricing") {
971            odcl_metadata.insert(
972                "pricing".to_string(),
973                json_value_to_serde_value(pricing_val),
974            );
975        }
976
977        // Extract team
978        if let Some(team_val) = data.get("team") {
979            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
980        }
981
982        // Extract roles
983        if let Some(roles_val) = data.get("roles") {
984            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
985        }
986
987        // Extract terms
988        if let Some(terms_val) = data.get("terms") {
989            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
990        }
991
992        // Extract full servers array (not just type)
993        if let Some(servers_val) = data.get("servers") {
994            odcl_metadata.insert(
995                "servers".to_string(),
996                json_value_to_serde_value(servers_val),
997            );
998        }
999
1000        // Extract infrastructure
1001        if let Some(infrastructure_val) = data.get("infrastructure") {
1002            odcl_metadata.insert(
1003                "infrastructure".to_string(),
1004                json_value_to_serde_value(infrastructure_val),
1005            );
1006        }
1007
1008        // Store sharedDomains in metadata (extracted from customProperties above)
1009        if !shared_domains.is_empty() {
1010            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1011                .iter()
1012                .map(|d| serde_json::Value::String(d.clone()))
1013                .collect();
1014            odcl_metadata.insert(
1015                "sharedDomains".to_string(),
1016                serde_json::Value::Array(shared_domains_json),
1017            );
1018        }
1019
1020        let table_uuid = self.extract_table_uuid(data);
1021
1022        let table = Table {
1023            id: table_uuid,
1024            name: table_name,
1025            columns,
1026            database_type,
1027            catalog_name: None,
1028            schema_name: None,
1029            medallion_layers,
1030            scd_pattern,
1031            data_vault_classification,
1032            modeling_level: None,
1033            tags,
1034            odcl_metadata,
1035            owner: None,
1036            sla: None,
1037            contact_details: None,
1038            infrastructure_type: None,
1039            notes: None,
1040            position: None,
1041            yaml_file_path: None,
1042            drawio_cell_id: None,
1043            quality: quality_rules,
1044            errors: Vec::new(),
1045            created_at: chrono::Utc::now(),
1046            updated_at: chrono::Utc::now(),
1047        };
1048
1049        info!(
1050            "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1051            table.name,
1052            errors.len()
1053        );
1054        Ok((table, errors))
1055    }
1056
1057    /// Parse a single property from ODCS v3.0.x format (similar to Data Contract field).
1058    fn parse_odcl_v3_property(
1059        &self,
1060        prop_name: &str,
1061        prop_data: &serde_json::Map<String, JsonValue>,
1062        data: &JsonValue,
1063    ) -> Result<Vec<Column>> {
1064        // Reuse Data Contract field parsing logic (they're similar)
1065        let mut errors = Vec::new();
1066        self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
1067    }
1068
1069    /// Extract table UUID from ODCS `id` field (standard) or fallback to customProperties/odcl_metadata (backward compatibility).
1070    /// Returns the UUID if found, otherwise generates a new one.
1071    fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1072        // First check the top-level `id` field (ODCS spec: "A unique identifier used to reduce the risk of dataset name collisions, such as a UUID.")
1073        if let Some(id_val) = data.get("id")
1074            && let Some(id_str) = id_val.as_str()
1075        {
1076            if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1077                tracing::debug!(
1078                    "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
1079                    uuid
1080                );
1081                return uuid;
1082            } else {
1083                tracing::warn!(
1084                    "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
1085                    id_str
1086                );
1087            }
1088        }
1089
1090        // Backward compatibility: check customProperties for tableUuid (legacy format)
1091        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1092            for prop in custom_props {
1093                if let Some(prop_obj) = prop.as_object() {
1094                    let prop_key = prop_obj
1095                        .get("property")
1096                        .and_then(|v| v.as_str())
1097                        .unwrap_or("");
1098                    if prop_key == "tableUuid"
1099                        && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1100                        && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1101                    {
1102                        tracing::debug!(
1103                            "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
1104                            uuid
1105                        );
1106                        return uuid;
1107                    }
1108                }
1109            }
1110        }
1111
1112        // Fallback: check odcl_metadata if present (legacy format)
1113        if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1114            && let Some(uuid_val) = metadata.get("tableUuid")
1115            && let Some(uuid_str) = uuid_val.as_str()
1116            && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1117        {
1118            tracing::debug!(
1119                "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1120                uuid
1121            );
1122            return uuid;
1123        }
1124
1125        // Generate deterministic UUID v5 if not found (based on table name)
1126        let table_name = data
1127            .get("name")
1128            .and_then(|v| v.as_str())
1129            .unwrap_or("unknown");
1130        let new_uuid = crate::models::table::Table::generate_id(
1131            table_name, None, // database_type not available here
1132            None, // catalog_name not available here
1133            None, // schema_name not available here
1134        );
1135        tracing::warn!(
1136            "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
1137            table_name,
1138            new_uuid
1139        );
1140        new_uuid
1141    }
1142
1143    /// Extract metadata from customProperties in ODCS v3.0.x format.
1144    fn extract_metadata_from_custom_properties(
1145        &self,
1146        data: &JsonValue,
1147    ) -> (
1148        Vec<MedallionLayer>,
1149        Option<SCDPattern>,
1150        Option<DataVaultClassification>,
1151        Vec<String>,
1152    ) {
1153        let mut medallion_layers = Vec::new();
1154        let mut scd_pattern = None;
1155        let mut data_vault_classification = None;
1156        let mut tags = Vec::new();
1157
1158        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1159            for prop in custom_props {
1160                if let Some(prop_obj) = prop.as_object() {
1161                    let prop_key = prop_obj
1162                        .get("property")
1163                        .and_then(|v| v.as_str())
1164                        .unwrap_or("");
1165                    let prop_value = prop_obj.get("value");
1166
1167                    match prop_key {
1168                        "medallionLayers" | "medallion_layers" => {
1169                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1170                                for item in arr {
1171                                    if let Some(s) = item.as_str()
1172                                        && let Ok(layer) = parse_medallion_layer(s)
1173                                    {
1174                                        medallion_layers.push(layer);
1175                                    }
1176                                }
1177                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1178                                // Comma-separated string
1179                                for part in s.split(',') {
1180                                    if let Ok(layer) = parse_medallion_layer(part.trim()) {
1181                                        medallion_layers.push(layer);
1182                                    }
1183                                }
1184                            }
1185                        }
1186                        "scdPattern" | "scd_pattern" => {
1187                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1188                                scd_pattern = parse_scd_pattern(s).ok();
1189                            }
1190                        }
1191                        "dataVaultClassification" | "data_vault_classification" => {
1192                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1193                                data_vault_classification = parse_data_vault_classification(s).ok();
1194                            }
1195                        }
1196                        "tags" => {
1197                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
1198                                for item in arr {
1199                                    if let Some(s) = item.as_str() {
1200                                        tags.push(s.to_string());
1201                                    }
1202                                }
1203                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
1204                                // Comma-separated string
1205                                for part in s.split(',') {
1206                                    tags.push(part.trim().to_string());
1207                                }
1208                            }
1209                        }
1210                        "sharedDomains" | "shared_domains" => {
1211                            // sharedDomains will be stored in metadata by the caller
1212                            // This match is here for completeness but sharedDomains is handled separately
1213                        }
1214                        _ => {}
1215                    }
1216                }
1217            }
1218        }
1219
1220        // Also extract tags from top-level tags field
1221        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1222            for item in tags_arr {
1223                if let Some(s) = item.as_str()
1224                    && !tags.contains(&s.to_string())
1225                {
1226                    tags.push(s.to_string());
1227                }
1228            }
1229        }
1230
1231        (
1232            medallion_layers,
1233            scd_pattern,
1234            data_vault_classification,
1235            tags,
1236        )
1237    }
1238
1239    /// Extract database type from servers in ODCS v3.0.x format.
1240    fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1241        // ODCS v3.0.x: servers is an array of Server objects
1242        if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
1243            && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
1244        {
1245            return server_obj
1246                .get("type")
1247                .and_then(|v| v.as_str())
1248                .and_then(|s| self.parse_database_type(s));
1249        }
1250        None
1251    }
1252
1253    /// Parse Data Contract format.
1254    fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1255        let mut errors = Vec::new();
1256
1257        // Extract models
1258        let models = data
1259            .get("models")
1260            .and_then(|v| v.as_object())
1261            .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
1262
1263        // parse_table() returns a single Table, so we parse the first model.
1264        // If multiple models are needed, call parse_table() multiple times or use import().
1265        let (model_name, model_data) = models
1266            .iter()
1267            .next()
1268            .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
1269
1270        let model_data = model_data
1271            .as_object()
1272            .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
1273
1274        // Extract fields (columns)
1275        let fields = model_data
1276            .get("fields")
1277            .and_then(|v| v.as_object())
1278            .ok_or_else(|| {
1279                errors.push(ParserError {
1280                    error_type: "validation_error".to_string(),
1281                    field: format!("Model '{}'", model_name),
1282                    message: format!("Model '{}' missing 'fields' field", model_name),
1283                });
1284                anyhow::anyhow!("Missing fields")
1285            });
1286
1287        let fields = match fields {
1288            Ok(f) => f,
1289            Err(_) => {
1290                // Return empty table with errors
1291                let quality_rules = self.extract_quality_rules(data);
1292                let table_uuid = self.extract_table_uuid(data);
1293                let table = Table {
1294                    id: table_uuid,
1295                    name: model_name.clone(),
1296                    columns: Vec::new(),
1297                    database_type: None,
1298                    catalog_name: None,
1299                    schema_name: None,
1300                    medallion_layers: Vec::new(),
1301                    scd_pattern: None,
1302                    data_vault_classification: None,
1303                    modeling_level: None,
1304                    tags: Vec::new(),
1305                    odcl_metadata: HashMap::new(),
1306                    owner: None,
1307                    sla: None,
1308                    contact_details: None,
1309                    infrastructure_type: None,
1310                    notes: None,
1311                    position: None,
1312                    yaml_file_path: None,
1313                    drawio_cell_id: None,
1314                    quality: quality_rules,
1315                    errors: Vec::new(),
1316                    created_at: chrono::Utc::now(),
1317                    updated_at: chrono::Utc::now(),
1318                };
1319                return Ok((table, errors));
1320            }
1321        };
1322
1323        // Parse fields as columns
1324        let mut columns = Vec::new();
1325        for (field_name, field_data) in fields {
1326            if let Some(field_obj) = field_data.as_object() {
1327                match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
1328                    Ok(mut cols) => columns.append(&mut cols),
1329                    Err(e) => {
1330                        errors.push(ParserError {
1331                            error_type: "field_parse_error".to_string(),
1332                            field: format!("Field '{}'", field_name),
1333                            message: e.to_string(),
1334                        });
1335                    }
1336                }
1337            } else {
1338                errors.push(ParserError {
1339                    error_type: "validation_error".to_string(),
1340                    field: format!("Field '{}'", field_name),
1341                    message: format!("Field '{}' must be an object", field_name),
1342                });
1343            }
1344        }
1345
1346        // Extract metadata from info section
1347        // The frontend expects info fields to be nested under "info" key
1348        let mut odcl_metadata = HashMap::new();
1349
1350        // Extract info section and nest it properly
1351        // Convert JsonValue object to serde_json::Value::Object (Map)
1352        if let Some(info_val) = data.get("info") {
1353            // Convert JsonValue to serde_json::Value
1354            let info_json_value = json_value_to_serde_value(info_val);
1355            odcl_metadata.insert("info".to_string(), info_json_value);
1356        }
1357
1358        odcl_metadata.insert(
1359            "dataContractSpecification".to_string(),
1360            json_value_to_serde_value(
1361                data.get("dataContractSpecification")
1362                    .unwrap_or(&JsonValue::Null),
1363            ),
1364        );
1365        odcl_metadata.insert(
1366            "id".to_string(),
1367            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1368        );
1369        // Note: model_name is not added to metadata as it's redundant (it's the table name)
1370
1371        // Extract servicelevels if present
1372        if let Some(servicelevels_val) = data.get("servicelevels") {
1373            odcl_metadata.insert(
1374                "servicelevels".to_string(),
1375                json_value_to_serde_value(servicelevels_val),
1376            );
1377        }
1378
1379        // Extract links if present
1380        if let Some(links_val) = data.get("links") {
1381            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1382        }
1383
1384        // Extract domain, dataProduct, tenant
1385        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1386            odcl_metadata.insert(
1387                "domain".to_string(),
1388                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1389            );
1390        }
1391        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1392            odcl_metadata.insert(
1393                "dataProduct".to_string(),
1394                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1395            );
1396        }
1397        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1398            odcl_metadata.insert(
1399                "tenant".to_string(),
1400                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1401            );
1402        }
1403
1404        // Extract top-level description (can be object or string)
1405        if let Some(desc_val) = data.get("description") {
1406            odcl_metadata.insert(
1407                "description".to_string(),
1408                json_value_to_serde_value(desc_val),
1409            );
1410        }
1411
1412        // Extract pricing
1413        if let Some(pricing_val) = data.get("pricing") {
1414            odcl_metadata.insert(
1415                "pricing".to_string(),
1416                json_value_to_serde_value(pricing_val),
1417            );
1418        }
1419
1420        // Extract team
1421        if let Some(team_val) = data.get("team") {
1422            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1423        }
1424
1425        // Extract roles
1426        if let Some(roles_val) = data.get("roles") {
1427            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1428        }
1429
1430        // Extract terms
1431        if let Some(terms_val) = data.get("terms") {
1432            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1433        }
1434
1435        // Extract full servers array (not just type)
1436        if let Some(servers_val) = data.get("servers") {
1437            odcl_metadata.insert(
1438                "servers".to_string(),
1439                json_value_to_serde_value(servers_val),
1440            );
1441        }
1442
1443        // Extract infrastructure
1444        if let Some(infrastructure_val) = data.get("infrastructure") {
1445            odcl_metadata.insert(
1446                "infrastructure".to_string(),
1447                json_value_to_serde_value(infrastructure_val),
1448            );
1449        }
1450
1451        // Extract database type from servers if available
1452        let database_type = self.extract_database_type_from_servers(data);
1453
1454        // Extract catalog and schema from customProperties
1455        let (catalog_name, schema_name) = self.extract_catalog_schema(data);
1456
1457        // Extract sharedDomains from customProperties
1458        let mut shared_domains: Vec<String> = Vec::new();
1459        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1460            for prop in custom_props {
1461                if let Some(prop_obj) = prop.as_object() {
1462                    let prop_key = prop_obj
1463                        .get("property")
1464                        .and_then(|v| v.as_str())
1465                        .unwrap_or("");
1466                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1467                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1468                    {
1469                        for item in arr {
1470                            if let Some(s) = item.as_str() {
1471                                shared_domains.push(s.to_string());
1472                            }
1473                        }
1474                    }
1475                }
1476            }
1477        }
1478
1479        // Extract tags from top-level tags field (Data Contract format)
1480        let mut tags = Vec::new();
1481        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1482            for item in tags_arr {
1483                if let Some(s) = item.as_str() {
1484                    tags.push(s.to_string());
1485                }
1486            }
1487        }
1488
1489        // Extract quality rules
1490        let quality_rules = self.extract_quality_rules(data);
1491
1492        // Store sharedDomains in metadata
1493        if !shared_domains.is_empty() {
1494            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1495                .iter()
1496                .map(|d| serde_json::Value::String(d.clone()))
1497                .collect();
1498            odcl_metadata.insert(
1499                "sharedDomains".to_string(),
1500                serde_json::Value::Array(shared_domains_json),
1501            );
1502        }
1503
1504        let table_uuid = self.extract_table_uuid(data);
1505
1506        let table = Table {
1507            id: table_uuid,
1508            name: model_name.clone(),
1509            columns,
1510            database_type,
1511            catalog_name,
1512            schema_name,
1513            medallion_layers: Vec::new(),
1514            scd_pattern: None,
1515            data_vault_classification: None,
1516            modeling_level: None,
1517            tags,
1518            odcl_metadata,
1519            owner: None,
1520            sla: None,
1521            contact_details: None,
1522            infrastructure_type: None,
1523            notes: None,
1524            position: None,
1525            yaml_file_path: None,
1526            drawio_cell_id: None,
1527            quality: quality_rules,
1528            errors: Vec::new(),
1529            created_at: chrono::Utc::now(),
1530            updated_at: chrono::Utc::now(),
1531        };
1532
1533        info!(
1534            "Parsed Data Contract table: {} with {} warnings/errors",
1535            model_name,
1536            errors.len()
1537        );
1538        Ok((table, errors))
1539    }
1540
1541    /// Expand a nested column from a schema definition, creating columns with dot notation.
1542    ///
1543    /// This helper function recursively expands nested structures (OBJECT/STRUCT types)
1544    /// into flat columns with dot notation (e.g., "address.street", "address.city").
1545    #[allow(clippy::only_used_in_recursion)]
1546    fn expand_nested_column(
1547        &self,
1548        column_name: &str,
1549        schema: &JsonValue,
1550        nullable: bool,
1551        columns: &mut Vec<Column>,
1552        errors: &mut Vec<ParserError>,
1553    ) {
1554        let schema_obj = match schema.as_object() {
1555            Some(obj) => obj,
1556            None => {
1557                errors.push(ParserError {
1558                    error_type: "parse_error".to_string(),
1559                    field: column_name.to_string(),
1560                    message: "Nested schema must be an object".to_string(),
1561                });
1562                return;
1563            }
1564        };
1565
1566        let schema_type = schema_obj
1567            .get("type")
1568            .and_then(|v| v.as_str())
1569            .unwrap_or("object");
1570
1571        match schema_type {
1572            "object" | "struct" => {
1573                // Check if it has nested properties
1574                if let Some(properties) = schema_obj.get("properties").and_then(|v| v.as_object()) {
1575                    let nested_required: Vec<String> = schema_obj
1576                        .get("required")
1577                        .and_then(|v| v.as_array())
1578                        .map(|arr| {
1579                            arr.iter()
1580                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1581                                .collect()
1582                        })
1583                        .unwrap_or_default();
1584
1585                    for (nested_name, nested_schema) in properties {
1586                        let nested_nullable = !nested_required.contains(nested_name);
1587                        self.expand_nested_column(
1588                            &format!("{}.{}", column_name, nested_name),
1589                            nested_schema,
1590                            nullable || nested_nullable,
1591                            columns,
1592                            errors,
1593                        );
1594                    }
1595                } else {
1596                    // Object without properties - create as OBJECT type
1597                    let description = schema_obj
1598                        .get("description")
1599                        .and_then(|v| v.as_str())
1600                        .unwrap_or("")
1601                        .to_string();
1602                    columns.push(Column {
1603                        name: column_name.to_string(),
1604                        data_type: "OBJECT".to_string(),
1605                        nullable,
1606                        primary_key: false,
1607                        secondary_key: false,
1608                        composite_key: None,
1609                        foreign_key: None,
1610                        constraints: Vec::new(),
1611                        description,
1612                        quality: Vec::new(),
1613                        enum_values: Vec::new(),
1614                        errors: Vec::new(),
1615                        column_order: 0,
1616                    });
1617                }
1618            }
1619            "array" => {
1620                // Handle array types
1621                let items = schema_obj.get("items").unwrap_or(schema);
1622                let items_type = items
1623                    .as_object()
1624                    .and_then(|obj| obj.get("type").and_then(|v| v.as_str()))
1625                    .unwrap_or("string");
1626
1627                if items_type == "object" || items_type == "struct" {
1628                    // Array of objects - expand nested structure
1629                    let description = schema_obj
1630                        .get("description")
1631                        .and_then(|v| v.as_str())
1632                        .unwrap_or("")
1633                        .to_string();
1634                    columns.push(Column {
1635                        name: column_name.to_string(),
1636                        data_type: "ARRAY<OBJECT>".to_string(),
1637                        nullable,
1638                        primary_key: false,
1639                        secondary_key: false,
1640                        composite_key: None,
1641                        foreign_key: None,
1642                        constraints: Vec::new(),
1643                        description,
1644                        quality: Vec::new(),
1645                        enum_values: Vec::new(),
1646                        errors: Vec::new(),
1647                        column_order: 0,
1648                    });
1649                    // Also expand nested properties with array prefix
1650                    if let Some(properties) = items
1651                        .as_object()
1652                        .and_then(|obj| obj.get("properties").and_then(|v| v.as_object()))
1653                    {
1654                        let nested_required: Vec<String> = items
1655                            .as_object()
1656                            .and_then(|obj| obj.get("required").and_then(|v| v.as_array()))
1657                            .map(|arr| {
1658                                arr.iter()
1659                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1660                                    .collect()
1661                            })
1662                            .unwrap_or_default();
1663
1664                        for (nested_name, nested_schema) in properties {
1665                            let nested_nullable = !nested_required.contains(nested_name);
1666                            self.expand_nested_column(
1667                                &format!("{}.{}", column_name, nested_name),
1668                                nested_schema,
1669                                nullable || nested_nullable,
1670                                columns,
1671                                errors,
1672                            );
1673                        }
1674                    }
1675                } else {
1676                    // Array of primitives
1677                    let data_type = format!("ARRAY<{}>", items_type.to_uppercase());
1678                    let description = schema_obj
1679                        .get("description")
1680                        .and_then(|v| v.as_str())
1681                        .unwrap_or("")
1682                        .to_string();
1683                    columns.push(Column {
1684                        name: column_name.to_string(),
1685                        data_type,
1686                        nullable,
1687                        primary_key: false,
1688                        secondary_key: false,
1689                        composite_key: None,
1690                        foreign_key: None,
1691                        constraints: Vec::new(),
1692                        description,
1693                        quality: Vec::new(),
1694                        enum_values: Vec::new(),
1695                        errors: Vec::new(),
1696                        column_order: 0,
1697                    });
1698                }
1699            }
1700            _ => {
1701                // Simple type
1702                let data_type = schema_type.to_uppercase();
1703                let description = schema_obj
1704                    .get("description")
1705                    .and_then(|v| v.as_str())
1706                    .unwrap_or("")
1707                    .to_string();
1708                let enum_values = schema_obj
1709                    .get("enum")
1710                    .and_then(|v| v.as_array())
1711                    .map(|arr| {
1712                        arr.iter()
1713                            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1714                            .collect()
1715                    })
1716                    .unwrap_or_default();
1717                columns.push(Column {
1718                    name: column_name.to_string(),
1719                    data_type,
1720                    nullable,
1721                    primary_key: false,
1722                    secondary_key: false,
1723                    composite_key: None,
1724                    foreign_key: None,
1725                    constraints: Vec::new(),
1726                    description,
1727                    quality: Vec::new(),
1728                    enum_values,
1729                    errors: Vec::new(),
1730                    column_order: 0,
1731                });
1732            }
1733        }
1734    }
1735
1736    /// Parse a single field from Data Contract format.
1737    fn parse_data_contract_field(
1738        &self,
1739        field_name: &str,
1740        field_data: &serde_json::Map<String, JsonValue>,
1741        data: &JsonValue,
1742        errors: &mut Vec<ParserError>,
1743    ) -> Result<Vec<Column>> {
1744        let mut columns = Vec::new();
1745
1746        // Helper function to extract quality rules from a JSON object
1747        let extract_quality_from_obj =
1748            |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
1749                let mut quality_rules = Vec::new();
1750                if let Some(quality_val) = obj.get("quality") {
1751                    if let Some(arr) = quality_val.as_array() {
1752                        // Array of quality rules
1753                        for item in arr {
1754                            if let Some(rule_obj) = item.as_object() {
1755                                let mut rule = HashMap::new();
1756                                for (key, value) in rule_obj {
1757                                    rule.insert(key.clone(), json_value_to_serde_value(value));
1758                                }
1759                                quality_rules.push(rule);
1760                            }
1761                        }
1762                    } else if let Some(rule_obj) = quality_val.as_object() {
1763                        // Single quality rule object
1764                        let mut rule = HashMap::new();
1765                        for (key, value) in rule_obj {
1766                            rule.insert(key.clone(), json_value_to_serde_value(value));
1767                        }
1768                        quality_rules.push(rule);
1769                    }
1770                }
1771                quality_rules
1772            };
1773
1774        // Check for $ref
1775        if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
1776            if let Some(definition) = self.resolve_ref(ref_str, data) {
1777                // Extract quality rules from field-level first (takes precedence)
1778                let mut quality_rules = extract_quality_from_obj(field_data);
1779
1780                // Also extract quality rules from definition and merge (if field doesn't have any)
1781                if quality_rules.is_empty() {
1782                    if let Some(def_obj) = definition.as_object() {
1783                        quality_rules = extract_quality_from_obj(def_obj);
1784                    }
1785                } else {
1786                    // Merge definition quality rules if field has some
1787                    if let Some(def_obj) = definition.as_object() {
1788                        let def_quality = extract_quality_from_obj(def_obj);
1789                        // Append definition quality rules (field-level takes precedence)
1790                        quality_rules.extend(def_quality);
1791                    }
1792                }
1793
1794                let required = field_data
1795                    .get("required")
1796                    .and_then(|v| v.as_bool())
1797                    .unwrap_or(false);
1798
1799                // If required=true, add not_null quality rule if not present
1800                if required {
1801                    let has_not_null = quality_rules.iter().any(|rule| {
1802                        rule.get("type")
1803                            .and_then(|v| v.as_str())
1804                            .map(|s| {
1805                                s.to_lowercase().contains("not_null")
1806                                    || s.to_lowercase().contains("notnull")
1807                            })
1808                            .unwrap_or(false)
1809                    });
1810                    if !has_not_null {
1811                        let mut not_null_rule = HashMap::new();
1812                        not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
1813                        not_null_rule.insert(
1814                            "description".to_string(),
1815                            serde_json::json!("Column must not be null"),
1816                        );
1817                        quality_rules.push(not_null_rule);
1818                    }
1819                }
1820
1821                // Check if definition is an object/struct with nested structure
1822                let has_nested = definition
1823                    .get("type")
1824                    .and_then(|v| v.as_str())
1825                    .map(|s| s == "object")
1826                    .unwrap_or(false)
1827                    || definition.get("properties").is_some()
1828                    || definition.get("fields").is_some();
1829
1830                if has_nested {
1831                    // Expand STRUCT from definition into nested columns with dot notation
1832                    if let Some(properties) =
1833                        definition.get("properties").and_then(|v| v.as_object())
1834                    {
1835                        // Recursively expand nested properties
1836                        let nested_required: Vec<String> = definition
1837                            .get("required")
1838                            .and_then(|v| v.as_array())
1839                            .map(|arr| {
1840                                arr.iter()
1841                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1842                                    .collect()
1843                            })
1844                            .unwrap_or_default();
1845
1846                        for (nested_name, nested_schema) in properties {
1847                            let nested_required_field = nested_required.contains(nested_name);
1848                            self.expand_nested_column(
1849                                &format!("{}.{}", field_name, nested_name),
1850                                nested_schema,
1851                                !nested_required_field,
1852                                &mut columns,
1853                                errors,
1854                            );
1855                        }
1856                    } else if let Some(fields) =
1857                        definition.get("fields").and_then(|v| v.as_object())
1858                    {
1859                        // Handle fields format (ODCL style)
1860                        for (nested_name, nested_schema) in fields {
1861                            self.expand_nested_column(
1862                                &format!("{}.{}", field_name, nested_name),
1863                                nested_schema,
1864                                true, // Assume nullable if not specified
1865                                &mut columns,
1866                                errors,
1867                            );
1868                        }
1869                    } else {
1870                        // Fallback: create parent column as OBJECT if we can't expand
1871                        columns.push(Column {
1872                            name: field_name.to_string(),
1873                            data_type: "OBJECT".to_string(),
1874                            nullable: !required,
1875                            primary_key: false,
1876                            secondary_key: false,
1877                            composite_key: None,
1878                            foreign_key: None,
1879                            constraints: Vec::new(),
1880                            description: field_data
1881                                .get("description")
1882                                .or_else(|| definition.get("description"))
1883                                .and_then(|v| v.as_str())
1884                                .unwrap_or("")
1885                                .to_string(),
1886                            errors: Vec::new(),
1887                            quality: quality_rules.clone(),
1888                            enum_values: Vec::new(),
1889                            column_order: 0,
1890                        });
1891                    }
1892                } else {
1893                    // Simple type from definition
1894                    let def_type = definition
1895                        .get("type")
1896                        .and_then(|v| v.as_str())
1897                        .unwrap_or("STRING")
1898                        .to_uppercase();
1899
1900                    let enum_values = definition
1901                        .get("enum")
1902                        .and_then(|v| v.as_array())
1903                        .map(|arr| {
1904                            arr.iter()
1905                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1906                                .collect()
1907                        })
1908                        .unwrap_or_default();
1909
1910                    columns.push(Column {
1911                        name: field_name.to_string(),
1912                        data_type: def_type,
1913                        nullable: !required,
1914                        primary_key: false,
1915                        secondary_key: false,
1916                        composite_key: None,
1917                        foreign_key: None,
1918                        constraints: Vec::new(),
1919                        description: field_data
1920                            .get("description")
1921                            .or_else(|| definition.get("description"))
1922                            .and_then(|v| v.as_str())
1923                            .unwrap_or("")
1924                            .to_string(),
1925                        errors: Vec::new(),
1926                        quality: quality_rules,
1927                        enum_values,
1928                        column_order: 0,
1929                    });
1930                }
1931                return Ok(columns);
1932            } else {
1933                // Undefined reference - create column with error
1934                let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
1935                let mut error_map = HashMap::new();
1936                error_map.insert("type".to_string(), serde_json::json!("validation_error"));
1937                error_map.insert("field".to_string(), serde_json::json!("data_type"));
1938                error_map.insert(
1939                    "message".to_string(),
1940                    serde_json::json!(format!(
1941                        "Field '{}' references undefined definition: {}",
1942                        field_name, ref_str
1943                    )),
1944                );
1945                col_errors.push(error_map);
1946                columns.push(Column {
1947                    name: field_name.to_string(),
1948                    data_type: "OBJECT".to_string(),
1949                    nullable: true,
1950                    primary_key: false,
1951                    secondary_key: false,
1952                    composite_key: None,
1953                    foreign_key: None,
1954                    constraints: Vec::new(),
1955                    description: field_data
1956                        .get("description")
1957                        .and_then(|v| v.as_str())
1958                        .unwrap_or("")
1959                        .to_string(),
1960                    errors: col_errors,
1961                    quality: Vec::new(),
1962                    enum_values: Vec::new(),
1963                    column_order: 0,
1964                });
1965                return Ok(columns);
1966            }
1967        }
1968
1969        // Extract field type - default to STRING if missing
1970        let field_type_str = field_data
1971            .get("type")
1972            .and_then(|v| v.as_str())
1973            .unwrap_or("STRING");
1974
1975        // Check if type contains STRUCT definition (multiline STRUCT type)
1976        if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
1977            match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
1978                Ok(nested_cols) if !nested_cols.is_empty() => {
1979                    // We have nested columns - add parent column with full type, then nested columns
1980                    let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
1981                        "ARRAY<STRUCT<...>>".to_string()
1982                    } else {
1983                        "STRUCT<...>".to_string()
1984                    };
1985
1986                    // Add parent column
1987                    columns.push(Column {
1988                        name: field_name.to_string(),
1989                        data_type: parent_data_type,
1990                        nullable: !field_data
1991                            .get("required")
1992                            .and_then(|v| v.as_bool())
1993                            .unwrap_or(false),
1994                        primary_key: false,
1995                        secondary_key: false,
1996                        composite_key: None,
1997                        foreign_key: None,
1998                        constraints: Vec::new(),
1999                        description: field_data
2000                            .get("description")
2001                            .and_then(|v| v.as_str())
2002                            .unwrap_or("")
2003                            .to_string(),
2004                        errors: Vec::new(),
2005                        quality: Vec::new(),
2006                        enum_values: Vec::new(),
2007                        column_order: 0,
2008                    });
2009
2010                    // Add nested columns
2011                    columns.extend(nested_cols);
2012                    return Ok(columns);
2013                }
2014                Ok(_) | Err(_) => {
2015                    // If parsing fails or returns empty, fall back to using the type as-is
2016                }
2017            }
2018        }
2019
2020        let field_type = normalize_data_type(field_type_str);
2021
2022        // Handle ARRAY type
2023        if field_type == "ARRAY" {
2024            let items = field_data.get("items");
2025            if let Some(items_val) = items {
2026                if let Some(items_obj) = items_val.as_object() {
2027                    // Check if items is an object with fields (nested structure)
2028                    if items_obj.get("fields").is_some()
2029                        || items_obj.get("type").and_then(|v| v.as_str()) == Some("object")
2030                    {
2031                        // Array of objects - create parent column as ARRAY<OBJECT>
2032                        columns.push(Column {
2033                            name: field_name.to_string(),
2034                            data_type: "ARRAY<OBJECT>".to_string(),
2035                            nullable: !field_data
2036                                .get("required")
2037                                .and_then(|v| v.as_bool())
2038                                .unwrap_or(false),
2039                            primary_key: false,
2040                            secondary_key: false,
2041                            composite_key: None,
2042                            foreign_key: None,
2043                            constraints: Vec::new(),
2044                            description: field_data
2045                                .get("description")
2046                                .and_then(|v| v.as_str())
2047                                .unwrap_or("")
2048                                .to_string(),
2049                            errors: Vec::new(),
2050                            quality: Vec::new(),
2051                            enum_values: Vec::new(),
2052                            column_order: 0,
2053                        });
2054
2055                        // Extract nested fields from items.properties or items.fields if present
2056                        // Note: Export saves nested columns in properties.properties, but some formats use fields
2057                        let nested_fields_obj = items_obj
2058                            .get("properties")
2059                            .and_then(|v| v.as_object())
2060                            .or_else(|| items_obj.get("fields").and_then(|v| v.as_object()));
2061
2062                        if let Some(fields_obj) = nested_fields_obj {
2063                            for (nested_field_name, nested_field_data) in fields_obj {
2064                                if let Some(nested_field_obj) = nested_field_data.as_object() {
2065                                    let nested_field_type = nested_field_obj
2066                                        .get("type")
2067                                        .and_then(|v| v.as_str())
2068                                        .unwrap_or("STRING");
2069
2070                                    // Recursively parse nested fields with array prefix
2071                                    let nested_col_name =
2072                                        format!("{}.{}", field_name, nested_field_name);
2073                                    let mut local_errors = Vec::new();
2074                                    match self.parse_data_contract_field(
2075                                        &nested_col_name,
2076                                        nested_field_obj,
2077                                        data,
2078                                        &mut local_errors,
2079                                    ) {
2080                                        Ok(mut nested_cols) => {
2081                                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2082                                            // Otherwise, it returns a single column
2083                                            columns.append(&mut nested_cols);
2084                                        }
2085                                        Err(_) => {
2086                                            // Fallback: create simple nested column
2087                                            columns.push(Column {
2088                                                name: nested_col_name,
2089                                                data_type: nested_field_type.to_uppercase(),
2090                                                nullable: !nested_field_obj
2091                                                    .get("required")
2092                                                    .and_then(|v| v.as_bool())
2093                                                    .unwrap_or(false),
2094                                                primary_key: false,
2095                                                secondary_key: false,
2096                                                composite_key: None,
2097                                                foreign_key: None,
2098                                                constraints: Vec::new(),
2099                                                description: nested_field_obj
2100                                                    .get("description")
2101                                                    .and_then(|v| v.as_str())
2102                                                    .unwrap_or("")
2103                                                    .to_string(),
2104                                                errors: Vec::new(),
2105                                                quality: Vec::new(),
2106                                                enum_values: Vec::new(),
2107                                                column_order: 0,
2108                                            });
2109                                        }
2110                                    }
2111                                }
2112                            }
2113                        }
2114
2115                        return Ok(columns);
2116                    } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
2117                        // Array of simple type
2118                        columns.push(Column {
2119                            name: field_name.to_string(),
2120                            data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
2121                            nullable: !field_data
2122                                .get("required")
2123                                .and_then(|v| v.as_bool())
2124                                .unwrap_or(false),
2125                            primary_key: false,
2126                            secondary_key: false,
2127                            composite_key: None,
2128                            foreign_key: None,
2129                            constraints: Vec::new(),
2130                            description: field_data
2131                                .get("description")
2132                                .and_then(|v| v.as_str())
2133                                .unwrap_or("")
2134                                .to_string(),
2135                            errors: Vec::new(),
2136                            quality: Vec::new(),
2137                            enum_values: Vec::new(),
2138                            column_order: 0,
2139                        });
2140                        return Ok(columns);
2141                    }
2142                } else if let Some(item_type_str) = items_val.as_str() {
2143                    // Array of simple type (string)
2144                    columns.push(Column {
2145                        name: field_name.to_string(),
2146                        data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
2147                        nullable: !field_data
2148                            .get("required")
2149                            .and_then(|v| v.as_bool())
2150                            .unwrap_or(false),
2151                        primary_key: false,
2152                        secondary_key: false,
2153                        composite_key: None,
2154                        foreign_key: None,
2155                        constraints: Vec::new(),
2156                        description: field_data
2157                            .get("description")
2158                            .and_then(|v| v.as_str())
2159                            .unwrap_or("")
2160                            .to_string(),
2161                        errors: Vec::new(),
2162                        quality: Vec::new(),
2163                        enum_values: Vec::new(),
2164                        column_order: 0,
2165                    });
2166                    return Ok(columns);
2167                }
2168            }
2169            // Array without items - default to ARRAY<STRING>
2170            columns.push(Column {
2171                name: field_name.to_string(),
2172                data_type: "ARRAY<STRING>".to_string(),
2173                nullable: !field_data
2174                    .get("required")
2175                    .and_then(|v| v.as_bool())
2176                    .unwrap_or(false),
2177                primary_key: false,
2178                secondary_key: false,
2179                composite_key: None,
2180                foreign_key: None,
2181                constraints: Vec::new(),
2182                description: field_data
2183                    .get("description")
2184                    .and_then(|v| v.as_str())
2185                    .unwrap_or("")
2186                    .to_string(),
2187                errors: Vec::new(),
2188                quality: Vec::new(),
2189                enum_values: Vec::new(),
2190                column_order: 0,
2191            });
2192            return Ok(columns);
2193        }
2194
2195        // Check if this is a nested object with fields or properties
2196        // Note: Export saves nested columns in properties.properties, but some formats use fields
2197        let nested_fields_obj = field_data
2198            .get("properties")
2199            .and_then(|v| v.as_object())
2200            .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
2201
2202        if field_type == "OBJECT"
2203            && let Some(fields_obj) = nested_fields_obj
2204        {
2205            // Inline nested object - create parent column as OBJECT and extract nested fields
2206
2207            // Create parent column
2208            columns.push(Column {
2209                name: field_name.to_string(),
2210                data_type: "OBJECT".to_string(),
2211                nullable: !field_data
2212                    .get("required")
2213                    .and_then(|v| v.as_bool())
2214                    .unwrap_or(false),
2215                primary_key: false,
2216                secondary_key: false,
2217                composite_key: None,
2218                foreign_key: None,
2219                constraints: Vec::new(),
2220                description: field_data
2221                    .get("description")
2222                    .and_then(|v| v.as_str())
2223                    .unwrap_or("")
2224                    .to_string(),
2225                errors: Vec::new(),
2226                quality: Vec::new(),
2227                enum_values: Vec::new(),
2228                column_order: 0,
2229            });
2230
2231            // Extract nested fields recursively
2232            for (nested_field_name, nested_field_data) in fields_obj {
2233                if let Some(nested_field_obj) = nested_field_data.as_object() {
2234                    let nested_field_type = nested_field_obj
2235                        .get("type")
2236                        .and_then(|v| v.as_str())
2237                        .unwrap_or("STRING");
2238
2239                    // Recursively parse nested fields
2240                    let nested_col_name = format!("{}.{}", field_name, nested_field_name);
2241                    match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data) {
2242                        Ok(mut nested_cols) => {
2243                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
2244                            // Otherwise, it returns a single column
2245                            columns.append(&mut nested_cols);
2246                        }
2247                        Err(_) => {
2248                            // Fallback: create simple nested column
2249                            columns.push(Column {
2250                                name: nested_col_name,
2251                                data_type: nested_field_type.to_uppercase(),
2252                                nullable: !nested_field_obj
2253                                    .get("required")
2254                                    .and_then(|v| v.as_bool())
2255                                    .unwrap_or(false),
2256                                primary_key: false,
2257                                secondary_key: false,
2258                                composite_key: None,
2259                                foreign_key: None,
2260                                constraints: Vec::new(),
2261                                description: nested_field_obj
2262                                    .get("description")
2263                                    .and_then(|v| v.as_str())
2264                                    .unwrap_or("")
2265                                    .to_string(),
2266                                errors: Vec::new(),
2267                                quality: Vec::new(),
2268                                enum_values: Vec::new(),
2269                                column_order: 0,
2270                            });
2271                        }
2272                    }
2273                }
2274            }
2275
2276            return Ok(columns);
2277        }
2278
2279        // Regular field
2280        let required = field_data
2281            .get("required")
2282            .and_then(|v| v.as_bool())
2283            .unwrap_or(false);
2284        let description = field_data
2285            .get("description")
2286            .and_then(|v| v.as_str())
2287            .unwrap_or("")
2288            .to_string();
2289
2290        // Extract column-level quality rules
2291        let mut column_quality_rules = Vec::new();
2292        if let Some(quality_val) = field_data.get("quality") {
2293            if let Some(arr) = quality_val.as_array() {
2294                // Array of quality rules
2295                for item in arr {
2296                    if let Some(obj) = item.as_object() {
2297                        let mut rule = HashMap::new();
2298                        for (key, value) in obj {
2299                            rule.insert(key.clone(), json_value_to_serde_value(value));
2300                        }
2301                        column_quality_rules.push(rule);
2302                    }
2303                }
2304            } else if let Some(obj) = quality_val.as_object() {
2305                // Single quality rule object
2306                let mut rule = HashMap::new();
2307                for (key, value) in obj {
2308                    rule.insert(key.clone(), json_value_to_serde_value(value));
2309                }
2310                column_quality_rules.push(rule);
2311            }
2312        }
2313
2314        // If required=true (nullable=false), add a "not_null" quality rule if not already present
2315        if required {
2316            let has_not_null = column_quality_rules.iter().any(|rule| {
2317                rule.get("type")
2318                    .and_then(|v| v.as_str())
2319                    .map(|s| {
2320                        s.to_lowercase().contains("not_null")
2321                            || s.to_lowercase().contains("notnull")
2322                    })
2323                    .unwrap_or(false)
2324            });
2325            if !has_not_null {
2326                let mut not_null_rule = HashMap::new();
2327                not_null_rule.insert("type".to_string(), serde_json::json!("not_null"));
2328                not_null_rule.insert(
2329                    "description".to_string(),
2330                    serde_json::json!("Column must not be null"),
2331                );
2332                column_quality_rules.push(not_null_rule);
2333            }
2334        }
2335
2336        columns.push(Column {
2337            name: field_name.to_string(),
2338            data_type: field_type,
2339            nullable: !required,
2340            primary_key: field_data
2341                .get("primaryKey")
2342                .and_then(|v| v.as_bool())
2343                .unwrap_or(false),
2344            secondary_key: false,
2345            composite_key: None,
2346            foreign_key: self.parse_foreign_key_from_data_contract(field_data),
2347            constraints: Vec::new(),
2348            description,
2349            errors: Vec::new(),
2350            quality: column_quality_rules,
2351            enum_values: Vec::new(),
2352            column_order: 0,
2353        });
2354
2355        Ok(columns)
2356    }
2357
2358    /// Parse foreign key from Data Contract field data.
2359    fn parse_foreign_key_from_data_contract(
2360        &self,
2361        field_data: &serde_json::Map<String, JsonValue>,
2362    ) -> Option<ForeignKey> {
2363        field_data
2364            .get("foreignKey")
2365            .and_then(|v| v.as_object())
2366            .map(|fk_obj| ForeignKey {
2367                table_id: fk_obj
2368                    .get("table")
2369                    .or_else(|| fk_obj.get("table_id"))
2370                    .and_then(|v| v.as_str())
2371                    .unwrap_or("")
2372                    .to_string(),
2373                column_name: fk_obj
2374                    .get("column")
2375                    .or_else(|| fk_obj.get("column_name"))
2376                    .and_then(|v| v.as_str())
2377                    .unwrap_or("")
2378                    .to_string(),
2379            })
2380    }
2381
2382    /// Extract database type from servers in Data Contract format.
2383    fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2384        // Data Contract format: servers can be object or array
2385        if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
2386            // Object format: { "server_name": { "type": "..." } }
2387            if let Some((_, server_data)) = servers_obj.iter().next()
2388                && let Some(server_obj) = server_data.as_object()
2389            {
2390                return server_obj
2391                    .get("type")
2392                    .and_then(|v| v.as_str())
2393                    .and_then(|s| self.parse_database_type(s));
2394            }
2395        } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
2396            // Array format: [ { "server": "...", "type": "..." } ]
2397            if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
2398                return server_obj
2399                    .get("type")
2400                    .and_then(|v| v.as_str())
2401                    .and_then(|s| self.parse_database_type(s));
2402            }
2403        }
2404        None
2405    }
2406
2407    /// Parse database type string to enum.
2408    fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
2409        match s.to_lowercase().as_str() {
2410            "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
2411            "postgres" | "postgresql" => Some(DatabaseType::Postgres),
2412            "mysql" => Some(DatabaseType::Mysql),
2413            "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
2414            "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
2415            _ => None,
2416        }
2417    }
2418
2419    /// Parse STRUCT type definition from string (e.g., "ARRAY<STRUCT<ID: STRING, NAME: STRING>>").
2420    fn parse_struct_type_from_string(
2421        &self,
2422        field_name: &str,
2423        type_str: &str,
2424        field_data: &serde_json::Map<String, JsonValue>,
2425    ) -> Result<Vec<Column>> {
2426        let mut columns = Vec::new();
2427
2428        // Normalize whitespace - replace newlines and multiple spaces with single space
2429        let normalized_type = type_str
2430            .lines()
2431            .map(|line| line.trim())
2432            .filter(|line| !line.is_empty())
2433            .collect::<Vec<_>>()
2434            .join(" ");
2435
2436        let type_str_upper = normalized_type.to_uppercase();
2437
2438        // Check if it's ARRAY<STRUCT<...>>
2439        let _is_array = type_str_upper.starts_with("ARRAY<");
2440        let struct_start = type_str_upper.find("STRUCT<");
2441
2442        if let Some(start_pos) = struct_start {
2443            // Extract STRUCT content
2444            let struct_content = &normalized_type[start_pos + 7..]; // Skip "STRUCT<"
2445
2446            // Find matching closing bracket, handling nested STRUCTs
2447            let mut depth = 1;
2448            let mut end_pos = None;
2449            for (i, ch) in struct_content.char_indices() {
2450                match ch {
2451                    '<' => depth += 1,
2452                    '>' => {
2453                        depth -= 1;
2454                        if depth == 0 {
2455                            end_pos = Some(i);
2456                            break;
2457                        }
2458                    }
2459                    _ => {}
2460                }
2461            }
2462
2463            // If no closing bracket found, try to infer it (handle malformed YAML)
2464            let struct_fields_str = if let Some(end) = end_pos {
2465                &struct_content[..end]
2466            } else {
2467                // Missing closing bracket - use everything up to the end
2468                struct_content.trim_end_matches('>').trim()
2469            };
2470
2471            // Parse fields: "ID: STRING, NAME: STRING, ..."
2472            let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
2473
2474            // Create nested columns, handling nested STRUCTs recursively
2475            for (nested_name, nested_type) in fields {
2476                let nested_type_upper = nested_type.to_uppercase();
2477                let nested_col_name = format!("{}.{}", field_name, nested_name);
2478
2479                // Check if this field is itself a STRUCT or ARRAY<STRUCT>
2480                if nested_type_upper.starts_with("STRUCT<") {
2481                    // Recursively parse nested STRUCT by creating a synthetic field_data
2482                    // and calling parse_struct_type_from_string recursively
2483                    let nested_struct_type_str = if nested_type_upper.starts_with("ARRAY<STRUCT<") {
2484                        // Handle ARRAY<STRUCT<...>>
2485                        nested_type.clone()
2486                    } else {
2487                        // Handle STRUCT<...>
2488                        nested_type.clone()
2489                    };
2490
2491                    // Recursively parse the nested STRUCT
2492                    match self.parse_struct_type_from_string(
2493                        &nested_col_name,
2494                        &nested_struct_type_str,
2495                        field_data,
2496                    ) {
2497                        Ok(nested_cols) => {
2498                            // Add all recursively parsed columns
2499                            columns.extend(nested_cols);
2500                        }
2501                        Err(_) => {
2502                            // If recursive parsing fails, add the parent column with STRUCT type
2503                            columns.push(Column {
2504                                name: nested_col_name,
2505                                data_type: normalize_data_type(&nested_type),
2506                                nullable: !field_data
2507                                    .get("required")
2508                                    .and_then(|v| v.as_bool())
2509                                    .unwrap_or(false),
2510                                primary_key: false,
2511                                secondary_key: false,
2512                                composite_key: None,
2513                                foreign_key: None,
2514                                constraints: Vec::new(),
2515                                description: field_data
2516                                    .get("description")
2517                                    .and_then(|v| v.as_str())
2518                                    .unwrap_or("")
2519                                    .to_string(),
2520                                errors: Vec::new(),
2521                                quality: Vec::new(),
2522                                enum_values: Vec::new(),
2523                                column_order: 0,
2524                            });
2525                        }
2526                    }
2527                } else {
2528                    // Simple nested field (not a STRUCT)
2529                    columns.push(Column {
2530                        name: nested_col_name,
2531                        data_type: normalize_data_type(&nested_type),
2532                        nullable: !field_data
2533                            .get("required")
2534                            .and_then(|v| v.as_bool())
2535                            .unwrap_or(false),
2536                        primary_key: false,
2537                        secondary_key: false,
2538                        composite_key: None,
2539                        foreign_key: None,
2540                        constraints: Vec::new(),
2541                        description: field_data
2542                            .get("description")
2543                            .and_then(|v| v.as_str())
2544                            .unwrap_or("")
2545                            .to_string(),
2546                        errors: Vec::new(),
2547                        quality: Vec::new(),
2548                        enum_values: Vec::new(),
2549                        column_order: 0,
2550                    });
2551                }
2552            }
2553
2554            return Ok(columns);
2555        }
2556
2557        // If no STRUCT found, return empty (fallback to regular parsing)
2558        Ok(Vec::new())
2559    }
2560
2561    /// Parse STRUCT fields from string (e.g., "ID: STRING, NAME: STRING").
2562    fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
2563        let mut fields = Vec::new();
2564        let mut current_field = String::new();
2565        let mut depth = 0;
2566        let mut in_string = false;
2567        let mut string_char = None;
2568
2569        for ch in fields_str.chars() {
2570            match ch {
2571                '\'' | '"' if !in_string || Some(ch) == string_char => {
2572                    if in_string {
2573                        in_string = false;
2574                        string_char = None;
2575                    } else {
2576                        in_string = true;
2577                        string_char = Some(ch);
2578                    }
2579                    current_field.push(ch);
2580                }
2581                '<' if !in_string => {
2582                    depth += 1;
2583                    current_field.push(ch);
2584                }
2585                '>' if !in_string => {
2586                    depth -= 1;
2587                    current_field.push(ch);
2588                }
2589                ',' if !in_string && depth == 0 => {
2590                    // End of current field
2591                    let trimmed = current_field.trim();
2592                    if !trimmed.is_empty()
2593                        && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2594                    {
2595                        fields.push((name, type_part));
2596                    }
2597                    current_field.clear();
2598                }
2599                _ => {
2600                    current_field.push(ch);
2601                }
2602            }
2603        }
2604
2605        // Handle last field
2606        let trimmed = current_field.trim();
2607        if !trimmed.is_empty()
2608            && let Some((name, type_part)) = self.parse_field_definition(trimmed)
2609        {
2610            fields.push((name, type_part));
2611        }
2612
2613        Ok(fields)
2614    }
2615
2616    /// Parse a single field definition (e.g., "ID: STRING" or "ALERTOPERATION: STRUCT<...>").
2617    fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
2618        // Split by colon, but handle nested STRUCTs
2619        let colon_pos = field_def.find(':')?;
2620        let name = field_def[..colon_pos].trim().to_string();
2621        let type_part = field_def[colon_pos + 1..].trim().to_string();
2622
2623        if name.is_empty() || type_part.is_empty() {
2624            return None;
2625        }
2626
2627        Some((name, type_part))
2628    }
2629
2630    /// Extract catalog and schema from customProperties.
2631    fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
2632        let mut catalog_name = None;
2633        let mut schema_name = None;
2634
2635        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2636            for prop in custom_props {
2637                if let Some(prop_obj) = prop.as_object() {
2638                    let prop_key = prop_obj
2639                        .get("property")
2640                        .and_then(|v| v.as_str())
2641                        .unwrap_or("");
2642                    let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
2643
2644                    match prop_key {
2645                        "catalogName" | "catalog_name" => {
2646                            catalog_name = prop_value.map(|s| s.to_string());
2647                        }
2648                        "schemaName" | "schema_name" => {
2649                            schema_name = prop_value.map(|s| s.to_string());
2650                        }
2651                        _ => {}
2652                    }
2653                }
2654            }
2655        }
2656
2657        // Also check direct fields
2658        if catalog_name.is_none() {
2659            catalog_name = data
2660                .get("catalog_name")
2661                .and_then(|v| v.as_str())
2662                .map(|s| s.to_string());
2663        }
2664        if schema_name.is_none() {
2665            schema_name = data
2666                .get("schema_name")
2667                .and_then(|v| v.as_str())
2668                .map(|s| s.to_string());
2669        }
2670
2671        (catalog_name, schema_name)
2672    }
2673}
2674
2675impl Default for ODCSImporter {
2676    fn default() -> Self {
2677        Self::new()
2678    }
2679}
2680
2681/// Parser error structure for detailed error reporting.
2682#[derive(Debug, Clone)]
2683pub struct ParserError {
2684    pub error_type: String,
2685    pub field: String,
2686    pub message: String,
2687}
2688
2689/// Convert YAML Value to JSON Value for easier manipulation.
2690fn yaml_to_json_value(yaml: &serde_yaml::Value) -> Result<JsonValue> {
2691    // Convert YAML to JSON via serialization
2692    let json_str = serde_json::to_string(yaml).context("Failed to convert YAML to JSON")?;
2693    serde_json::from_str(&json_str).context("Failed to parse JSON")
2694}
2695
2696/// Convert JSON Value to serde_json::Value for storage in HashMap.
2697fn json_value_to_serde_value(value: &JsonValue) -> serde_json::Value {
2698    value.clone()
2699}
2700
2701/// Normalize data type to uppercase, preserving STRUCT<...>, ARRAY<...>, MAP<...> format.
2702fn normalize_data_type(data_type: &str) -> String {
2703    if data_type.is_empty() {
2704        return data_type.to_string();
2705    }
2706
2707    let upper = data_type.to_uppercase();
2708
2709    // Handle STRUCT<...>, ARRAY<...>, MAP<...> preserving inner content
2710    if upper.starts_with("STRUCT") {
2711        if let Some(start) = data_type.find('<')
2712            && let Some(end) = data_type.rfind('>')
2713        {
2714            let inner = &data_type[start + 1..end];
2715            return format!("STRUCT<{}>", inner);
2716        }
2717        return format!("STRUCT{}", &data_type[6..]);
2718    } else if upper.starts_with("ARRAY") {
2719        if let Some(start) = data_type.find('<')
2720            && let Some(end) = data_type.rfind('>')
2721        {
2722            let inner = &data_type[start + 1..end];
2723            return format!("ARRAY<{}>", inner);
2724        }
2725        return format!("ARRAY{}", &data_type[5..]);
2726    } else if upper.starts_with("MAP") {
2727        if let Some(start) = data_type.find('<')
2728            && let Some(end) = data_type.rfind('>')
2729        {
2730            let inner = &data_type[start + 1..end];
2731            return format!("MAP<{}>", inner);
2732        }
2733        return format!("MAP{}", &data_type[3..]);
2734    }
2735
2736    upper
2737}
2738
2739// Helper functions for enum parsing
2740fn parse_medallion_layer(s: &str) -> Result<MedallionLayer> {
2741    match s.to_uppercase().as_str() {
2742        "BRONZE" => Ok(MedallionLayer::Bronze),
2743        "SILVER" => Ok(MedallionLayer::Silver),
2744        "GOLD" => Ok(MedallionLayer::Gold),
2745        "OPERATIONAL" => Ok(MedallionLayer::Operational),
2746        _ => Err(anyhow::anyhow!("Unknown medallion layer: {}", s)),
2747    }
2748}
2749
2750fn parse_scd_pattern(s: &str) -> Result<SCDPattern> {
2751    match s.to_uppercase().as_str() {
2752        "TYPE_1" | "TYPE1" => Ok(SCDPattern::Type1),
2753        "TYPE_2" | "TYPE2" => Ok(SCDPattern::Type2),
2754        _ => Err(anyhow::anyhow!("Unknown SCD pattern: {}", s)),
2755    }
2756}
2757
2758fn parse_data_vault_classification(s: &str) -> Result<DataVaultClassification> {
2759    match s.to_uppercase().as_str() {
2760        "HUB" => Ok(DataVaultClassification::Hub),
2761        "LINK" => Ok(DataVaultClassification::Link),
2762        "SATELLITE" | "SAT" => Ok(DataVaultClassification::Satellite),
2763        _ => Err(anyhow::anyhow!("Unknown Data Vault classification: {}", s)),
2764    }
2765}
2766
2767#[cfg(test)]
2768mod tests {
2769    use super::*;
2770
2771    #[test]
2772    fn test_parse_simple_odcl_table() {
2773        let mut parser = ODCSImporter::new();
2774        let odcl_yaml = r#"
2775name: users
2776columns:
2777  - name: id
2778    data_type: INT
2779    nullable: false
2780    primary_key: true
2781  - name: name
2782    data_type: VARCHAR(255)
2783    nullable: false
2784database_type: Postgres
2785"#;
2786
2787        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2788        assert_eq!(table.name, "users");
2789        assert_eq!(table.columns.len(), 2);
2790        assert_eq!(table.columns[0].name, "id");
2791        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
2792        assert_eq!(errors.len(), 0);
2793    }
2794
2795    #[test]
2796    fn test_parse_odcl_with_metadata() {
2797        let mut parser = ODCSImporter::new();
2798        let odcl_yaml = r#"
2799name: users
2800columns:
2801  - name: id
2802    data_type: INT
2803medallion_layer: gold
2804scd_pattern: TYPE_2
2805odcl_metadata:
2806  description: "User table"
2807  owner: "data-team"
2808"#;
2809
2810        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2811        assert_eq!(table.medallion_layers.len(), 1);
2812        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
2813        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
2814        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
2815            assert_eq!(desc, "User table");
2816        }
2817        assert_eq!(errors.len(), 0);
2818    }
2819
2820    #[test]
2821    fn test_parse_odcl_with_data_vault() {
2822        let mut parser = ODCSImporter::new();
2823        let odcl_yaml = r#"
2824name: hub_customer
2825columns:
2826  - name: customer_key
2827    data_type: VARCHAR(50)
2828data_vault_classification: Hub
2829"#;
2830
2831        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2832        assert_eq!(
2833            table.data_vault_classification,
2834            Some(DataVaultClassification::Hub)
2835        );
2836        assert_eq!(errors.len(), 0);
2837    }
2838
2839    #[test]
2840    fn test_parse_invalid_odcl() {
2841        let mut parser = ODCSImporter::new();
2842        let invalid_yaml = "not: valid: yaml: structure:";
2843
2844        // Should fail to parse YAML
2845        assert!(parser.parse(invalid_yaml).is_err());
2846    }
2847
2848    #[test]
2849    fn test_parse_odcl_missing_required_fields() {
2850        let mut parser = ODCSImporter::new();
2851        let non_conformant = r#"
2852name: users
2853# Missing required columns field
2854"#;
2855
2856        // Should fail with missing columns
2857        assert!(parser.parse(non_conformant).is_err());
2858    }
2859
2860    #[test]
2861    fn test_parse_odcl_with_foreign_key() {
2862        let mut parser = ODCSImporter::new();
2863        let odcl_yaml = r#"
2864name: orders
2865columns:
2866  - name: id
2867    data_type: INT
2868    primary_key: true
2869  - name: user_id
2870    data_type: INT
2871    foreign_key:
2872      table_id: users
2873      column_name: id
2874"#;
2875
2876        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2877        assert_eq!(table.columns.len(), 2);
2878        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
2879        assert!(user_id_col.foreign_key.is_some());
2880        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
2881        assert_eq!(errors.len(), 0);
2882    }
2883
2884    #[test]
2885    fn test_parse_odcl_with_constraints() {
2886        let mut parser = ODCSImporter::new();
2887        let odcl_yaml = r#"
2888name: products
2889columns:
2890  - name: id
2891    data_type: INT
2892    primary_key: true
2893  - name: name
2894    data_type: VARCHAR(255)
2895    nullable: false
2896    constraints:
2897      - UNIQUE
2898      - NOT NULL
2899"#;
2900
2901        let (table, errors) = parser.parse(odcl_yaml).unwrap();
2902        assert_eq!(table.columns.len(), 2);
2903        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
2904        assert!(!name_col.nullable);
2905        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
2906        assert_eq!(errors.len(), 0);
2907    }
2908}