data_modelling_core/import/
odcl.rs

1//! ODCL (Open Data Contract Language) parser service for parsing legacy ODCL YAML files.
2//!
3//! This service parses legacy ODCL (Data Contract Specification) YAML files and converts
4//! them to Table models. ODCL is the predecessor to ODCS (Open Data Contract Standard).
5//!
6//! Supports multiple legacy formats:
7//! - Data Contract Specification format (dataContractSpecification, models, definitions)
8//! - Simple ODCL format (name, columns)
9//!
10//! For ODCS v3.1.0/v3.0.x format, use the ODCSImporter instead.
11
12use super::odcs_shared::{
13    column_to_column_data, expand_nested_column, extract_catalog_schema, extract_quality_from_obj,
14    extract_shared_domains, json_value_to_serde_value, normalize_data_type,
15    parse_data_vault_classification, parse_foreign_key, parse_foreign_key_from_data_contract,
16    parse_medallion_layer, parse_scd_pattern, parse_struct_fields_from_string, resolve_ref,
17    yaml_to_json_value, ParserError,
18};
19use super::{ImportError, ImportResult, TableData};
20use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
21use crate::models::{Column, PropertyRelationship, Table, Tag};
22use anyhow::{Context, Result};
23use serde_json::Value as JsonValue;
24use std::collections::HashMap;
25use std::str::FromStr;
26use tracing::info;
27
28/// Convert a $ref path to a PropertyRelationship.
29/// E.g., "#/definitions/order_id" -> PropertyRelationship { type: "foreignKey", to: "definitions/order_id" }
30fn ref_to_relationships(ref_path: &Option<String>) -> Vec<PropertyRelationship> {
31    match ref_path {
32        Some(ref_str) => {
33            let to = if ref_str.starts_with("#/definitions/") {
34                let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
35                format!("definitions/{}", def_path)
36            } else if ref_str.starts_with("#/") {
37                ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
38            } else {
39                ref_str.clone()
40            };
41            vec![PropertyRelationship {
42                relationship_type: "foreignKey".to_string(),
43                to,
44            }]
45        }
46        None => Vec::new(),
47    }
48}
49
50/// ODCL parser service for parsing legacy Open Data Contract Language YAML files.
51/// Handles Data Contract Specification format and simple ODCL format.
52///
53/// For ODCS v3.1.0 format, use ODCSImporter instead.
54pub struct ODCLImporter {
55    /// Current YAML data for $ref resolution
56    current_yaml_data: Option<serde_yaml::Value>,
57}
58
59impl ODCLImporter {
60    /// Create a new ODCL parser instance.
61    ///
62    /// # Example
63    ///
64    /// ```rust
65    /// use data_modelling_core::import::odcl::ODCLImporter;
66    ///
67    /// let mut importer = ODCLImporter::new();
68    /// ```
69    pub fn new() -> Self {
70        Self {
71            current_yaml_data: None,
72        }
73    }
74
75    /// Import ODCL YAML content and create Table (SDK interface).
76    ///
77    /// Supports Data Contract Specification format and simple ODCL format.
78    ///
79    /// # Arguments
80    ///
81    /// * `yaml_content` - ODCL YAML content as a string
82    ///
83    /// # Returns
84    ///
85    /// An `ImportResult` containing the extracted table and any parse errors.
86    ///
87    /// # Example
88    ///
89    /// ```rust
90    /// use data_modelling_core::import::odcl::ODCLImporter;
91    ///
92    /// let mut importer = ODCLImporter::new();
93    /// let yaml = r#"
94    /// dataContractSpecification: 0.9.3
95    /// id: urn:datacontract:example
96    /// models:
97    ///   users:
98    ///     fields:
99    ///       id:
100    ///         type: bigint
101    /// "#;
102    /// let result = importer.import(yaml).unwrap();
103    /// assert_eq!(result.tables.len(), 1);
104    /// ```
105    pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
106        // First parse YAML to get raw data for field extraction
107        let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
108            .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
109
110        let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
111            ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
112        })?;
113
114        match self.parse(yaml_content) {
115            Ok((table, errors)) => {
116                // Extract contract-level fields from the raw JSON data (ODCL/Data Contract format)
117                let sdk_tables = vec![TableData {
118                    table_index: 0,
119                    id: Some(table.id.to_string()),
120                    name: Some(table.name.clone()),
121                    api_version: json_data
122                        .get("dataContractSpecification")
123                        .and_then(|v| v.as_str())
124                        .map(|s| s.to_string()),
125                    version: json_data
126                        .get("info")
127                        .and_then(|v| v.get("version"))
128                        .and_then(|v| v.as_str())
129                        .map(|s| s.to_string()),
130                    status: json_data
131                        .get("info")
132                        .and_then(|v| v.get("status"))
133                        .and_then(|v| v.as_str())
134                        .map(|s| s.to_string()),
135                    kind: Some("DataContract".to_string()),
136                    domain: json_data
137                        .get("info")
138                        .and_then(|v| v.get("domain"))
139                        .and_then(|v| v.as_str())
140                        .map(|s| s.to_string()),
141                    data_product: json_data
142                        .get("info")
143                        .and_then(|v| v.get("dataProduct"))
144                        .and_then(|v| v.as_str())
145                        .map(|s| s.to_string()),
146                    tenant: json_data
147                        .get("info")
148                        .and_then(|v| v.get("tenant"))
149                        .and_then(|v| v.as_str())
150                        .map(|s| s.to_string()),
151                    description: json_data
152                        .get("info")
153                        .and_then(|v| v.get("description"))
154                        .cloned(),
155                    // Schema-level fields (ODCL doesn't have these at schema level)
156                    physical_name: table.schema_name.clone(),
157                    physical_type: None,
158                    business_name: None,
159                    data_granularity_description: None,
160                    columns: table.columns.iter().map(column_to_column_data).collect(),
161                    servers: json_data
162                        .get("servers")
163                        .and_then(|v| v.as_array())
164                        .cloned()
165                        .unwrap_or_default(),
166                    team: json_data.get("info").and_then(|v| v.get("team")).cloned(),
167                    support: json_data
168                        .get("info")
169                        .and_then(|v| v.get("support"))
170                        .cloned(),
171                    roles: Vec::new(),
172                    sla_properties: json_data
173                        .get("servicelevels")
174                        .and_then(|v| v.as_array())
175                        .cloned()
176                        .unwrap_or_default(),
177                    quality: table.quality.clone(),
178                    price: json_data
179                        .get("info")
180                        .and_then(|v| v.get("pricing"))
181                        .cloned(),
182                    tags: table.tags.iter().map(|t| t.to_string()).collect(),
183                    custom_properties: Vec::new(),
184                    authoritative_definitions: Vec::new(),
185                    contract_created_ts: None,
186                    odcs_metadata: table.odcl_metadata.clone(),
187                }];
188                let sdk_errors: Vec<ImportError> = errors
189                    .iter()
190                    .map(|e| ImportError::ParseError(e.message.clone()))
191                    .collect();
192                Ok(ImportResult {
193                    tables: sdk_tables,
194                    tables_requiring_name: Vec::new(),
195                    errors: sdk_errors,
196                    ai_suggestions: None,
197                })
198            }
199            Err(e) => Err(ImportError::ParseError(e.to_string())),
200        }
201    }
202
203    /// Parse ODCL YAML content and create Table (public method for native app use).
204    ///
205    /// This method returns the full Table object with all metadata, suitable for use in
206    /// native applications that need direct access to the parsed table structure.
207    /// For API use, prefer the `import()` method which returns ImportResult.
208    ///
209    /// # Returns
210    ///
211    /// Returns a tuple of (Table, list of errors/warnings).
212    /// Errors list is empty if parsing is successful.
213    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
214        self.parse(yaml_content)
215    }
216
217    /// Parse ODCL YAML content and create Table (internal method).
218    ///
219    /// Supports Data Contract Specification format and simple ODCL format.
220    ///
221    /// # Returns
222    ///
223    /// Returns a tuple of (Table, list of errors/warnings).
224    /// Errors list is empty if parsing is successful.
225    fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
226        // Parse YAML
227        let data: serde_yaml::Value =
228            serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
229
230        if data.is_null() {
231            return Err(anyhow::anyhow!("Empty YAML content"));
232        }
233
234        // Store current YAML data for $ref resolution
235        self.current_yaml_data = Some(data.clone());
236
237        // Convert to JSON Value for easier manipulation
238        let json_data = yaml_to_json_value(&data)?;
239
240        // Check format and parse accordingly
241        if self.is_data_contract_format(&json_data) {
242            return self.parse_data_contract(&json_data);
243        }
244
245        // Fall back to simple ODCL format
246        self.parse_simple_odcl(&json_data)
247    }
248
249    /// Check if this importer can handle the given YAML content.
250    ///
251    /// Returns true if the content is in ODCL format (Data Contract Specification
252    /// or simple ODCL format), false if it's in ODCS v3.x format.
253    pub fn can_handle(&self, yaml_content: &str) -> bool {
254        let data: serde_yaml::Value = match serde_yaml::from_str(yaml_content) {
255            Ok(d) => d,
256            Err(_) => return false,
257        };
258
259        let json_data = match yaml_to_json_value(&data) {
260            Ok(j) => j,
261            Err(_) => return false,
262        };
263
264        // Check if it's ODCS v3.x format (should use ODCSImporter instead)
265        if self.is_odcs_v3_format(&json_data) {
266            return false;
267        }
268
269        // Check if it's Data Contract Specification format
270        if self.is_data_contract_format(&json_data) {
271            return true;
272        }
273
274        // Check if it's simple ODCL format (has name and columns)
275        if let Some(obj) = json_data.as_object() {
276            let has_name = obj.contains_key("name");
277            let has_columns = obj.get("columns").and_then(|v| v.as_array()).is_some();
278            return has_name && has_columns;
279        }
280
281        false
282    }
283
284    /// Check if YAML is in ODCS v3.x format.
285    fn is_odcs_v3_format(&self, data: &JsonValue) -> bool {
286        if let Some(obj) = data.as_object() {
287            let has_api_version = obj.contains_key("apiVersion");
288            let has_kind = obj
289                .get("kind")
290                .and_then(|v| v.as_str())
291                .map(|s| s == "DataContract")
292                .unwrap_or(false);
293            let has_id = obj.contains_key("id");
294            let has_version = obj.contains_key("version");
295            return has_api_version && has_kind && has_id && has_version;
296        }
297        false
298    }
299
300    /// Check if YAML is in Data Contract specification format.
301    fn is_data_contract_format(&self, data: &JsonValue) -> bool {
302        if let Some(obj) = data.as_object() {
303            let has_spec = obj.contains_key("dataContractSpecification");
304            let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
305            return has_spec && has_models;
306        }
307        false
308    }
309
310    /// Parse simple ODCL format.
311    fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
312        let mut errors = Vec::new();
313
314        // Extract table name
315        let name = data
316            .get("name")
317            .and_then(|v| v.as_str())
318            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
319            .to_string();
320
321        // Extract columns
322        let columns_data = data
323            .get("columns")
324            .and_then(|v| v.as_array())
325            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
326
327        let mut columns = Vec::new();
328        for (idx, col_data) in columns_data.iter().enumerate() {
329            match self.parse_column(col_data) {
330                Ok(col) => columns.push(col),
331                Err(e) => {
332                    errors.push(ParserError {
333                        error_type: "column_parse_error".to_string(),
334                        field: format!("columns[{}]", idx),
335                        message: e.to_string(),
336                    });
337                }
338            }
339        }
340
341        // Extract metadata
342        let database_type = self.extract_database_type(data);
343        let medallion_layers = self.extract_medallion_layers(data);
344        let scd_pattern = self.extract_scd_pattern(data);
345        let data_vault_classification = self.extract_data_vault_classification(data);
346        let quality_rules = self.extract_quality_rules(data);
347
348        // Validate pattern exclusivity
349        if scd_pattern.is_some() && data_vault_classification.is_some() {
350            errors.push(ParserError {
351                error_type: "validation_error".to_string(),
352                field: "patterns".to_string(),
353                message: "SCD pattern and Data Vault classification are mutually exclusive"
354                    .to_string(),
355            });
356        }
357
358        // Extract odcl_metadata
359        let mut odcl_metadata = HashMap::new();
360        if let Some(metadata) = data.get("odcl_metadata")
361            && let Some(obj) = metadata.as_object()
362        {
363            for (key, value) in obj {
364                odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
365            }
366        }
367
368        let table_uuid = self.extract_table_uuid(data);
369
370        let table = Table {
371            id: table_uuid,
372            name,
373            columns,
374            database_type,
375            catalog_name: None,
376            schema_name: None,
377            medallion_layers,
378            scd_pattern,
379            data_vault_classification,
380            modeling_level: None,
381            tags: Vec::<Tag>::new(),
382            odcl_metadata,
383            owner: None,
384            sla: None,
385            contact_details: None,
386            infrastructure_type: None,
387            notes: None,
388            position: None,
389            yaml_file_path: None,
390            drawio_cell_id: None,
391            quality: quality_rules,
392            errors: Vec::new(),
393            created_at: chrono::Utc::now(),
394            updated_at: chrono::Utc::now(),
395        };
396
397        info!("Parsed ODCL table: {}", table.name);
398        Ok((table, errors))
399    }
400
401    /// Parse a single column definition.
402    fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
403        let name = col_data
404            .get("name")
405            .and_then(|v| v.as_str())
406            .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
407            .to_string();
408
409        let data_type = col_data
410            .get("data_type")
411            .and_then(|v| v.as_str())
412            .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
413            .to_string();
414
415        // Normalize data_type to uppercase (preserve STRUCT<...> format)
416        let data_type = normalize_data_type(&data_type);
417
418        let nullable = col_data
419            .get("nullable")
420            .and_then(|v| v.as_bool())
421            .unwrap_or(true);
422
423        let primary_key = col_data
424            .get("primary_key")
425            .and_then(|v| v.as_bool())
426            .unwrap_or(false);
427
428        let foreign_key = col_data.get("foreign_key").and_then(parse_foreign_key);
429
430        let constraints = col_data
431            .get("constraints")
432            .and_then(|v| v.as_array())
433            .map(|arr| {
434                arr.iter()
435                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
436                    .collect()
437            })
438            .unwrap_or_default();
439
440        let description = col_data
441            .get("description")
442            .and_then(|v| v.as_str())
443            .map(|s| s.to_string())
444            .unwrap_or_default();
445
446        // Extract column-level quality rules
447        let mut column_quality_rules = Vec::new();
448        if let Some(quality_val) = col_data.get("quality") {
449            if let Some(arr) = quality_val.as_array() {
450                // Array of quality rules
451                for item in arr {
452                    if let Some(obj) = item.as_object() {
453                        let mut rule = HashMap::new();
454                        for (key, value) in obj {
455                            rule.insert(key.clone(), json_value_to_serde_value(value));
456                        }
457                        column_quality_rules.push(rule);
458                    }
459                }
460            } else if let Some(obj) = quality_val.as_object() {
461                // Single quality rule object
462                let mut rule = HashMap::new();
463                for (key, value) in obj {
464                    rule.insert(key.clone(), json_value_to_serde_value(value));
465                }
466                column_quality_rules.push(rule);
467            }
468        }
469
470        Ok(Column {
471            name,
472            data_type,
473            nullable,
474            primary_key,
475            foreign_key,
476            constraints,
477            description,
478            quality: column_quality_rules,
479            ..Default::default()
480        })
481    }
482
483    /// Extract database type from data.
484    fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
485        data.get("database_type")
486            .and_then(|v| v.as_str())
487            .and_then(|s| match s.to_uppercase().as_str() {
488                "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
489                "MYSQL" => Some(DatabaseType::Mysql),
490                "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
491                "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
492                "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
493                _ => None,
494            })
495    }
496
497    /// Extract medallion layers from data.
498    fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
499        let mut layers = Vec::new();
500
501        // Check plural form first
502        if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
503            for item in arr {
504                if let Some(s) = item.as_str()
505                    && let Ok(layer) = parse_medallion_layer(s)
506                {
507                    layers.push(layer);
508                }
509            }
510        }
511        // Check singular form (backward compatibility)
512        else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
513            && let Ok(layer) = parse_medallion_layer(s)
514        {
515            layers.push(layer);
516        }
517
518        layers
519    }
520
521    /// Extract SCD pattern from data.
522    fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
523        data.get("scd_pattern")
524            .and_then(|v| v.as_str())
525            .and_then(|s| parse_scd_pattern(s).ok())
526    }
527
528    /// Extract Data Vault classification from data.
529    fn extract_data_vault_classification(
530        &self,
531        data: &JsonValue,
532    ) -> Option<DataVaultClassification> {
533        data.get("data_vault_classification")
534            .and_then(|v| v.as_str())
535            .and_then(|s| parse_data_vault_classification(s).ok())
536    }
537
538    /// Extract quality rules from data.
539    fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
540        use serde_json::Value;
541        let mut quality_rules = Vec::new();
542
543        // Check for quality field at root level (array of objects or single object)
544        if let Some(quality_val) = data.get("quality") {
545            if let Some(arr) = quality_val.as_array() {
546                // Array of quality rules
547                for item in arr {
548                    if let Some(obj) = item.as_object() {
549                        let mut rule = HashMap::new();
550                        for (key, value) in obj {
551                            rule.insert(key.clone(), json_value_to_serde_value(value));
552                        }
553                        quality_rules.push(rule);
554                    }
555                }
556            } else if let Some(obj) = quality_val.as_object() {
557                // Single quality rule object
558                let mut rule = HashMap::new();
559                for (key, value) in obj {
560                    rule.insert(key.clone(), json_value_to_serde_value(value));
561                }
562                quality_rules.push(rule);
563            } else if let Some(s) = quality_val.as_str() {
564                // Simple string quality value
565                let mut rule = HashMap::new();
566                rule.insert("value".to_string(), Value::String(s.to_string()));
567                quality_rules.push(rule);
568            }
569        }
570
571        // Check for quality in metadata (ODCL format)
572        if let Some(metadata) = data.get("metadata")
573            && let Some(metadata_obj) = metadata.as_object()
574            && let Some(quality_val) = metadata_obj.get("quality")
575        {
576            if let Some(arr) = quality_val.as_array() {
577                // Array of quality rules
578                for item in arr {
579                    if let Some(obj) = item.as_object() {
580                        let mut rule = HashMap::new();
581                        for (key, value) in obj {
582                            rule.insert(key.clone(), json_value_to_serde_value(value));
583                        }
584                        quality_rules.push(rule);
585                    }
586                }
587            } else if let Some(obj) = quality_val.as_object() {
588                // Single quality rule object
589                let mut rule = HashMap::new();
590                for (key, value) in obj {
591                    rule.insert(key.clone(), json_value_to_serde_value(value));
592                }
593                quality_rules.push(rule);
594            } else if let Some(s) = quality_val.as_str() {
595                // Simple string quality value
596                let mut rule = HashMap::new();
597                rule.insert("value".to_string(), Value::String(s.to_string()));
598                quality_rules.push(rule);
599            }
600        }
601
602        // Check for tblproperties field (similar to SQL TBLPROPERTIES)
603        if let Some(tblprops) = data.get("tblproperties")
604            && let Some(obj) = tblprops.as_object()
605        {
606            for (key, value) in obj {
607                let mut rule = HashMap::new();
608                rule.insert("property".to_string(), Value::String(key.clone()));
609                rule.insert("value".to_string(), json_value_to_serde_value(value));
610                quality_rules.push(rule);
611            }
612        }
613
614        quality_rules
615    }
616
617    /// Parse Data Contract format.
618    fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
619        let mut errors = Vec::new();
620
621        // Extract models
622        let models = data
623            .get("models")
624            .and_then(|v| v.as_object())
625            .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
626
627        // parse_table() returns a single Table, so we parse the first model.
628        // If multiple models are needed, call parse_table() multiple times or use import().
629        let (model_name, model_data) = models
630            .iter()
631            .next()
632            .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
633
634        let model_data = model_data
635            .as_object()
636            .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
637
638        // Extract fields (columns)
639        let fields = model_data
640            .get("fields")
641            .and_then(|v| v.as_object())
642            .ok_or_else(|| {
643                errors.push(ParserError {
644                    error_type: "validation_error".to_string(),
645                    field: format!("Model '{}'", model_name),
646                    message: format!("Model '{}' missing 'fields' field", model_name),
647                });
648                anyhow::anyhow!("Missing fields")
649            });
650
651        let fields = match fields {
652            Ok(f) => f,
653            Err(_) => {
654                // Return empty table with errors
655                // Extract quality rules from both root and model level
656                let mut quality_rules = self.extract_quality_rules(data);
657                let model_data_value = JsonValue::Object(model_data.clone());
658                let model_quality_rules = self.extract_quality_rules(&model_data_value);
659                quality_rules.extend(model_quality_rules);
660                let table_uuid = self.extract_table_uuid(data);
661                let table = Table {
662                    id: table_uuid,
663                    name: model_name.clone(),
664                    columns: Vec::new(),
665                    database_type: None,
666                    catalog_name: None,
667                    schema_name: None,
668                    medallion_layers: Vec::new(),
669                    scd_pattern: None,
670                    data_vault_classification: None,
671                    modeling_level: None,
672                    tags: Vec::<Tag>::new(),
673                    odcl_metadata: HashMap::new(),
674                    owner: None,
675                    sla: None,
676                    contact_details: None,
677                    infrastructure_type: None,
678                    notes: None,
679                    position: None,
680                    yaml_file_path: None,
681                    drawio_cell_id: None,
682                    quality: quality_rules,
683                    errors: Vec::new(),
684                    created_at: chrono::Utc::now(),
685                    updated_at: chrono::Utc::now(),
686                };
687                return Ok((table, errors));
688            }
689        };
690
691        // Parse fields as columns
692        let mut columns = Vec::new();
693        for (field_name, field_data) in fields {
694            if let Some(field_obj) = field_data.as_object() {
695                match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
696                    Ok(mut cols) => columns.append(&mut cols),
697                    Err(e) => {
698                        errors.push(ParserError {
699                            error_type: "field_parse_error".to_string(),
700                            field: format!("Field '{}'", field_name),
701                            message: e.to_string(),
702                        });
703                    }
704                }
705            } else {
706                errors.push(ParserError {
707                    error_type: "validation_error".to_string(),
708                    field: format!("Field '{}'", field_name),
709                    message: format!("Field '{}' must be an object", field_name),
710                });
711            }
712        }
713
714        // Extract metadata from info section
715        let mut odcl_metadata = HashMap::new();
716
717        // Extract info section and nest it properly
718        if let Some(info_val) = data.get("info") {
719            let info_json_value = json_value_to_serde_value(info_val);
720            odcl_metadata.insert("info".to_string(), info_json_value);
721        }
722
723        odcl_metadata.insert(
724            "dataContractSpecification".to_string(),
725            json_value_to_serde_value(
726                data.get("dataContractSpecification")
727                    .unwrap_or(&JsonValue::Null),
728            ),
729        );
730        odcl_metadata.insert(
731            "id".to_string(),
732            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
733        );
734
735        // Extract servicelevels if present
736        if let Some(servicelevels_val) = data.get("servicelevels") {
737            odcl_metadata.insert(
738                "servicelevels".to_string(),
739                json_value_to_serde_value(servicelevels_val),
740            );
741        }
742
743        // Extract links if present
744        if let Some(links_val) = data.get("links") {
745            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
746        }
747
748        // Extract domain, dataProduct, tenant
749        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
750            odcl_metadata.insert(
751                "domain".to_string(),
752                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
753            );
754        }
755        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
756            odcl_metadata.insert(
757                "dataProduct".to_string(),
758                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
759            );
760        }
761        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
762            odcl_metadata.insert(
763                "tenant".to_string(),
764                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
765            );
766        }
767
768        // Extract top-level description (can be object or string)
769        if let Some(desc_val) = data.get("description") {
770            odcl_metadata.insert(
771                "description".to_string(),
772                json_value_to_serde_value(desc_val),
773            );
774        }
775
776        // Extract pricing
777        if let Some(pricing_val) = data.get("pricing") {
778            odcl_metadata.insert(
779                "pricing".to_string(),
780                json_value_to_serde_value(pricing_val),
781            );
782        }
783
784        // Extract team
785        if let Some(team_val) = data.get("team") {
786            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
787        }
788
789        // Extract roles
790        if let Some(roles_val) = data.get("roles") {
791            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
792        }
793
794        // Extract terms
795        if let Some(terms_val) = data.get("terms") {
796            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
797        }
798
799        // Extract full servers array (not just type)
800        if let Some(servers_val) = data.get("servers") {
801            odcl_metadata.insert(
802                "servers".to_string(),
803                json_value_to_serde_value(servers_val),
804            );
805        }
806
807        // Extract infrastructure
808        if let Some(infrastructure_val) = data.get("infrastructure") {
809            odcl_metadata.insert(
810                "infrastructure".to_string(),
811                json_value_to_serde_value(infrastructure_val),
812            );
813        }
814
815        // Extract database type from servers if available
816        let database_type = self.extract_database_type_from_servers(data);
817
818        // Extract catalog and schema from customProperties
819        let (catalog_name, schema_name) = extract_catalog_schema(data);
820
821        // Extract sharedDomains from customProperties
822        let shared_domains = extract_shared_domains(data);
823
824        // Extract tags from top-level tags field (Data Contract format)
825        let mut tags: Vec<Tag> = Vec::new();
826        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
827            for item in tags_arr {
828                if let Some(s) = item.as_str() {
829                    // Parse tag string to Tag enum (supports Simple, Pair, List formats)
830                    if let Ok(tag) = Tag::from_str(s) {
831                        tags.push(tag);
832                    } else {
833                        // Fallback: create Simple tag if parsing fails
834                        tags.push(crate::models::Tag::Simple(s.to_string()));
835                    }
836                }
837            }
838        }
839
840        // Extract quality rules from both root level and model level
841        // Root-level quality rules (contract-level)
842        let mut quality_rules = self.extract_quality_rules(data);
843
844        // Model-level quality rules (from models.<name>.quality)
845        // This is the primary location for quality rules in Data Contract Specification format
846        let model_data_value = JsonValue::Object(model_data.clone());
847        let model_quality_rules = self.extract_quality_rules(&model_data_value);
848        quality_rules.extend(model_quality_rules);
849
850        // Store sharedDomains in metadata
851        if !shared_domains.is_empty() {
852            let shared_domains_json: Vec<serde_json::Value> = shared_domains
853                .iter()
854                .map(|d| serde_json::Value::String(d.clone()))
855                .collect();
856            odcl_metadata.insert(
857                "sharedDomains".to_string(),
858                serde_json::Value::Array(shared_domains_json),
859            );
860        }
861
862        let table_uuid = self.extract_table_uuid(data);
863
864        let table = Table {
865            id: table_uuid,
866            name: model_name.clone(),
867            columns,
868            database_type,
869            catalog_name,
870            schema_name,
871            medallion_layers: Vec::new(),
872            scd_pattern: None,
873            data_vault_classification: None,
874            modeling_level: None,
875            tags,
876            odcl_metadata,
877            owner: None,
878            sla: None,
879            contact_details: None,
880            infrastructure_type: None,
881            notes: None,
882            position: None,
883            yaml_file_path: None,
884            drawio_cell_id: None,
885            quality: quality_rules,
886            errors: Vec::new(),
887            created_at: chrono::Utc::now(),
888            updated_at: chrono::Utc::now(),
889        };
890
891        info!(
892            "Parsed Data Contract table: {} with {} warnings/errors",
893            model_name,
894            errors.len()
895        );
896        Ok((table, errors))
897    }
898
899    /// Parse a single field from Data Contract format.
900    fn parse_data_contract_field(
901        &self,
902        field_name: &str,
903        field_data: &serde_json::Map<String, JsonValue>,
904        data: &JsonValue,
905        errors: &mut Vec<ParserError>,
906    ) -> Result<Vec<Column>> {
907        let mut columns = Vec::new();
908
909        // Extract description from field_data (preserve empty strings)
910        let description = field_data
911            .get("description")
912            .and_then(|v| v.as_str())
913            .unwrap_or("")
914            .to_string();
915
916        // Extract quality rules from field_data
917        let mut quality_rules = extract_quality_from_obj(field_data);
918
919        // Check for $ref
920        if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
921            // Store ref_path (preserve even if definition doesn't exist)
922            let ref_path = Some(ref_str.to_string());
923
924            if let Some(definition) = resolve_ref(ref_str, data) {
925                // Also extract quality rules from definition and merge (if field doesn't have any)
926                if quality_rules.is_empty() {
927                    if let Some(def_obj) = definition.as_object() {
928                        quality_rules = extract_quality_from_obj(def_obj);
929                    }
930                } else {
931                    // Merge definition quality rules if field has some
932                    if let Some(def_obj) = definition.as_object() {
933                        let def_quality = extract_quality_from_obj(def_obj);
934                        // Append definition quality rules (field-level takes precedence)
935                        quality_rules.extend(def_quality);
936                    }
937                }
938
939                let required = field_data
940                    .get("required")
941                    .and_then(|v| v.as_bool())
942                    .unwrap_or(false);
943
944                // Check if definition is an object/struct with nested structure
945                let has_nested = definition
946                    .get("type")
947                    .and_then(|v| v.as_str())
948                    .map(|s| s == "object")
949                    .unwrap_or(false)
950                    || definition.get("properties").is_some()
951                    || definition.get("fields").is_some();
952
953                if has_nested {
954                    // Expand STRUCT from definition into nested columns with dot notation
955                    if let Some(properties) =
956                        definition.get("properties").and_then(|v| v.as_object())
957                    {
958                        // Recursively expand nested properties
959                        let nested_required: Vec<String> = definition
960                            .get("required")
961                            .and_then(|v| v.as_array())
962                            .map(|arr| {
963                                arr.iter()
964                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
965                                    .collect()
966                            })
967                            .unwrap_or_default();
968
969                        for (nested_name, nested_schema) in properties {
970                            let nested_required_field = nested_required.contains(nested_name);
971                            expand_nested_column(
972                                &format!("{}.{}", field_name, nested_name),
973                                nested_schema,
974                                !nested_required_field,
975                                &mut columns,
976                                errors,
977                            );
978                        }
979                    } else if let Some(fields) =
980                        definition.get("fields").and_then(|v| v.as_object())
981                    {
982                        // Handle fields format (ODCL style)
983                        for (nested_name, nested_schema) in fields {
984                            expand_nested_column(
985                                &format!("{}.{}", field_name, nested_name),
986                                nested_schema,
987                                true, // Assume nullable if not specified
988                                &mut columns,
989                                errors,
990                            );
991                        }
992                    } else {
993                        // Fallback: create parent column as OBJECT if we can't expand
994                        columns.push(Column {
995                            name: field_name.to_string(),
996                            data_type: "OBJECT".to_string(),
997                            nullable: !required,
998                            description: if description.is_empty() {
999                                definition
1000                                    .get("description")
1001                                    .and_then(|v| v.as_str())
1002                                    .unwrap_or("")
1003                                    .to_string()
1004                            } else {
1005                                description.clone()
1006                            },
1007                            quality: quality_rules.clone(),
1008                            relationships: ref_to_relationships(&ref_path),
1009                            ..Default::default()
1010                        });
1011                    }
1012                } else {
1013                    // Simple type from definition
1014                    let def_type = definition
1015                        .get("type")
1016                        .and_then(|v| v.as_str())
1017                        .unwrap_or("STRING")
1018                        .to_uppercase();
1019
1020                    let enum_values = definition
1021                        .get("enum")
1022                        .and_then(|v| v.as_array())
1023                        .map(|arr| {
1024                            arr.iter()
1025                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1026                                .collect()
1027                        })
1028                        .unwrap_or_default();
1029
1030                    columns.push(Column {
1031                        name: field_name.to_string(),
1032                        data_type: def_type,
1033                        nullable: !required,
1034                        description: if description.is_empty() {
1035                            definition
1036                                .get("description")
1037                                .and_then(|v| v.as_str())
1038                                .unwrap_or("")
1039                                .to_string()
1040                        } else {
1041                            description
1042                        },
1043                        quality: quality_rules,
1044                        relationships: ref_to_relationships(&ref_path),
1045                        enum_values,
1046                        ..Default::default()
1047                    });
1048                }
1049                return Ok(columns);
1050            } else {
1051                // Undefined reference - create column with error
1052                let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
1053                let mut error_map = HashMap::new();
1054                error_map.insert("type".to_string(), serde_json::json!("validation_error"));
1055                error_map.insert("field".to_string(), serde_json::json!("data_type"));
1056                error_map.insert(
1057                    "message".to_string(),
1058                    serde_json::json!(format!(
1059                        "Field '{}' references undefined definition: {}",
1060                        field_name, ref_str
1061                    )),
1062                );
1063                col_errors.push(error_map);
1064                columns.push(Column {
1065                    name: field_name.to_string(),
1066                    data_type: "OBJECT".to_string(),
1067                    description,
1068                    errors: col_errors,
1069                    relationships: ref_to_relationships(&Some(ref_str.to_string())),
1070                    ..Default::default()
1071                });
1072                return Ok(columns);
1073            }
1074        }
1075
1076        // Extract field type - check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
1077        // Default to STRING if missing
1078        let field_type_str = field_data
1079            .get("logicalType")
1080            .and_then(|v| v.as_str())
1081            .or_else(|| field_data.get("type").and_then(|v| v.as_str()))
1082            .unwrap_or("STRING");
1083
1084        // Check if type contains STRUCT definition (multiline STRUCT type)
1085        if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
1086            match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
1087                Ok(nested_cols) if !nested_cols.is_empty() => {
1088                    // We have nested columns - add parent column with full type, then nested columns
1089                    let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
1090                        "ARRAY<STRUCT<...>>".to_string()
1091                    } else {
1092                        "STRUCT<...>".to_string()
1093                    };
1094
1095                    // Add parent column
1096                    columns.push(Column {
1097                        name: field_name.to_string(),
1098                        data_type: parent_data_type,
1099                        nullable: !field_data
1100                            .get("required")
1101                            .and_then(|v| v.as_bool())
1102                            .unwrap_or(false),
1103                        description: description.clone(),
1104                        quality: quality_rules.clone(),
1105                        relationships: ref_to_relationships(
1106                            &field_data
1107                                .get("$ref")
1108                                .and_then(|v| v.as_str())
1109                                .map(|s| s.to_string()),
1110                        ),
1111                        ..Default::default()
1112                    });
1113
1114                    // Add nested columns
1115                    columns.extend(nested_cols);
1116                    return Ok(columns);
1117                }
1118                Ok(_) | Err(_) => {
1119                    // If parsing fails or returns empty, fall back to using the type as-is
1120                }
1121            }
1122        }
1123
1124        let field_type = normalize_data_type(field_type_str);
1125
1126        // Handle ARRAY type
1127        if field_type == "ARRAY" {
1128            let items = field_data.get("items");
1129            if let Some(items_val) = items {
1130                if let Some(items_obj) = items_val.as_object() {
1131                    // Check if items is an object with fields (nested structure)
1132                    let items_type = items_obj
1133                        .get("logicalType")
1134                        .and_then(|v| v.as_str())
1135                        .or_else(|| items_obj.get("type").and_then(|v| v.as_str()));
1136
1137                    // Normalize legacy "type" values to "logicalType" equivalents
1138                    let normalized_items_type = match items_type {
1139                        Some("object") | Some("struct") => Some("object"),
1140                        Some("array") => Some("array"),
1141                        Some("string") | Some("varchar") | Some("char") | Some("text") => {
1142                            Some("string")
1143                        }
1144                        Some("integer") | Some("int") | Some("bigint") | Some("smallint")
1145                        | Some("tinyint") => Some("integer"),
1146                        Some("number") | Some("decimal") | Some("double") | Some("float")
1147                        | Some("numeric") => Some("number"),
1148                        Some("boolean") | Some("bool") => Some("boolean"),
1149                        Some("date") => Some("date"),
1150                        Some("timestamp") | Some("datetime") => Some("timestamp"),
1151                        Some("time") => Some("time"),
1152                        other => other,
1153                    };
1154
1155                    if items_obj.get("fields").is_some()
1156                        || items_obj.get("properties").is_some()
1157                        || normalized_items_type == Some("object")
1158                    {
1159                        // Array of objects - create parent column as ARRAY<OBJECT>
1160                        columns.push(Column {
1161                            name: field_name.to_string(),
1162                            data_type: "ARRAY<OBJECT>".to_string(),
1163                            nullable: !field_data
1164                                .get("required")
1165                                .and_then(|v| v.as_bool())
1166                                .unwrap_or(false),
1167                            description: field_data
1168                                .get("description")
1169                                .and_then(|v| v.as_str())
1170                                .unwrap_or("")
1171                                .to_string(),
1172                            ..Default::default()
1173                        });
1174
1175                        // Extract nested fields from items.properties or items.fields if present
1176                        let properties_obj =
1177                            items_obj.get("properties").and_then(|v| v.as_object());
1178                        let fields_obj = items_obj.get("fields").and_then(|v| v.as_object());
1179
1180                        if let Some(fields_map) = properties_obj.or(fields_obj) {
1181                            for (nested_field_name, nested_field_data) in fields_map {
1182                                if let Some(nested_field_obj) = nested_field_data.as_object() {
1183                                    let nested_field_type = nested_field_obj
1184                                        .get("logicalType")
1185                                        .and_then(|v| v.as_str())
1186                                        .or_else(|| {
1187                                            nested_field_obj.get("type").and_then(|v| v.as_str())
1188                                        })
1189                                        .unwrap_or("STRING");
1190
1191                                    // Recursively parse nested fields with array prefix
1192                                    let nested_col_name =
1193                                        format!("{}.[].{}", field_name, nested_field_name);
1194                                    let mut local_errors = Vec::new();
1195                                    match self.parse_data_contract_field(
1196                                        &nested_col_name,
1197                                        nested_field_obj,
1198                                        data,
1199                                        &mut local_errors,
1200                                    ) {
1201                                        Ok(mut nested_cols) => {
1202                                            columns.append(&mut nested_cols);
1203                                        }
1204                                        Err(_) => {
1205                                            // Fallback: create simple nested column
1206                                            columns.push(Column {
1207                                                name: nested_col_name,
1208                                                data_type: nested_field_type.to_uppercase(),
1209                                                nullable: !nested_field_obj
1210                                                    .get("required")
1211                                                    .and_then(|v| v.as_bool())
1212                                                    .unwrap_or(false),
1213                                                description: nested_field_obj
1214                                                    .get("description")
1215                                                    .and_then(|v| v.as_str())
1216                                                    .unwrap_or("")
1217                                                    .to_string(),
1218                                                ..Default::default()
1219                                            });
1220                                        }
1221                                    }
1222                                }
1223                            }
1224                        }
1225
1226                        return Ok(columns);
1227                    } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
1228                        // Array of simple type
1229                        columns.push(Column {
1230                            name: field_name.to_string(),
1231                            data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
1232                            nullable: !field_data
1233                                .get("required")
1234                                .and_then(|v| v.as_bool())
1235                                .unwrap_or(false),
1236                            description: description.clone(),
1237                            quality: quality_rules.clone(),
1238                            relationships: ref_to_relationships(
1239                                &field_data
1240                                    .get("$ref")
1241                                    .and_then(|v| v.as_str())
1242                                    .map(|s| s.to_string()),
1243                            ),
1244                            ..Default::default()
1245                        });
1246                        return Ok(columns);
1247                    }
1248                } else if let Some(item_type_str) = items_val.as_str() {
1249                    // Array of simple type (string)
1250                    columns.push(Column {
1251                        name: field_name.to_string(),
1252                        data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
1253                        nullable: !field_data
1254                            .get("required")
1255                            .and_then(|v| v.as_bool())
1256                            .unwrap_or(false),
1257                        description: description.clone(),
1258                        quality: quality_rules.clone(),
1259                        relationships: ref_to_relationships(
1260                            &field_data
1261                                .get("$ref")
1262                                .and_then(|v| v.as_str())
1263                                .map(|s| s.to_string()),
1264                        ),
1265                        ..Default::default()
1266                    });
1267                    return Ok(columns);
1268                }
1269            }
1270            // Array without items - default to ARRAY<STRING>
1271            columns.push(Column {
1272                name: field_name.to_string(),
1273                data_type: "ARRAY<STRING>".to_string(),
1274                nullable: !field_data
1275                    .get("required")
1276                    .and_then(|v| v.as_bool())
1277                    .unwrap_or(false),
1278                description: description.clone(),
1279                quality: quality_rules.clone(),
1280                relationships: ref_to_relationships(
1281                    &field_data
1282                        .get("$ref")
1283                        .and_then(|v| v.as_str())
1284                        .map(|s| s.to_string()),
1285                ),
1286                ..Default::default()
1287            });
1288            return Ok(columns);
1289        }
1290
1291        // Check if this is a nested object with fields or properties
1292        let nested_fields_obj = field_data
1293            .get("properties")
1294            .and_then(|v| v.as_object())
1295            .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
1296
1297        if field_type == "OBJECT" && nested_fields_obj.is_some() {
1298            // Inline nested object - create parent column as OBJECT and extract nested fields
1299            columns.push(Column {
1300                name: field_name.to_string(),
1301                data_type: "OBJECT".to_string(),
1302                nullable: !field_data
1303                    .get("required")
1304                    .and_then(|v| v.as_bool())
1305                    .unwrap_or(false),
1306                description: description.clone(),
1307                quality: quality_rules.clone(),
1308                relationships: ref_to_relationships(
1309                    &field_data
1310                        .get("$ref")
1311                        .and_then(|v| v.as_str())
1312                        .map(|s| s.to_string()),
1313                ),
1314                ..Default::default()
1315            });
1316
1317            // Extract nested fields recursively
1318            if let Some(fields_obj) = nested_fields_obj {
1319                for (nested_field_name, nested_field_data) in fields_obj {
1320                    if let Some(nested_field_obj) = nested_field_data.as_object() {
1321                        let nested_field_type = nested_field_obj
1322                            .get("logicalType")
1323                            .and_then(|v| v.as_str())
1324                            .or_else(|| nested_field_obj.get("type").and_then(|v| v.as_str()))
1325                            .unwrap_or("STRING");
1326
1327                        // Recursively parse nested fields
1328                        let nested_col_name = format!("{}.{}", field_name, nested_field_name);
1329                        match self.parse_data_contract_field(
1330                            &nested_col_name,
1331                            nested_field_obj,
1332                            data,
1333                            errors,
1334                        ) {
1335                            Ok(mut nested_cols) => {
1336                                columns.append(&mut nested_cols);
1337                            }
1338                            Err(_) => {
1339                                // Fallback: create simple nested column
1340                                columns.push(Column {
1341                                    name: nested_col_name,
1342                                    data_type: nested_field_type.to_uppercase(),
1343                                    nullable: !nested_field_obj
1344                                        .get("required")
1345                                        .and_then(|v| v.as_bool())
1346                                        .unwrap_or(false),
1347                                    description: nested_field_obj
1348                                        .get("description")
1349                                        .and_then(|v| v.as_str())
1350                                        .unwrap_or("")
1351                                        .to_string(),
1352                                    ..Default::default()
1353                                });
1354                            }
1355                        }
1356                    }
1357                }
1358            }
1359
1360            return Ok(columns);
1361        }
1362
1363        // Regular field (no $ref or $ref not found)
1364        let ref_path = field_data
1365            .get("$ref")
1366            .and_then(|v| v.as_str())
1367            .map(|s| s.to_string());
1368
1369        let required = field_data
1370            .get("required")
1371            .and_then(|v| v.as_bool())
1372            .unwrap_or(false);
1373
1374        let field_description = if description.is_empty() {
1375            field_data
1376                .get("description")
1377                .and_then(|v| v.as_str())
1378                .unwrap_or("")
1379                .to_string()
1380        } else {
1381            description
1382        };
1383
1384        // Extract column-level quality rules if not already extracted
1385        let mut column_quality_rules = quality_rules;
1386        if column_quality_rules.is_empty()
1387            && let Some(quality_val) = field_data.get("quality")
1388        {
1389            if let Some(arr) = quality_val.as_array() {
1390                for item in arr {
1391                    if let Some(obj) = item.as_object() {
1392                        let mut rule = HashMap::new();
1393                        for (key, value) in obj {
1394                            rule.insert(key.clone(), json_value_to_serde_value(value));
1395                        }
1396                        column_quality_rules.push(rule);
1397                    }
1398                }
1399            } else if let Some(obj) = quality_val.as_object() {
1400                let mut rule = HashMap::new();
1401                for (key, value) in obj {
1402                    rule.insert(key.clone(), json_value_to_serde_value(value));
1403                }
1404                column_quality_rules.push(rule);
1405            }
1406        }
1407
1408        columns.push(Column {
1409            name: field_name.to_string(),
1410            data_type: field_type,
1411            nullable: !required,
1412            primary_key: field_data
1413                .get("primaryKey")
1414                .and_then(|v| v.as_bool())
1415                .unwrap_or(false),
1416            foreign_key: parse_foreign_key_from_data_contract(field_data),
1417            description: field_description,
1418            quality: column_quality_rules,
1419            relationships: ref_to_relationships(&ref_path),
1420            ..Default::default()
1421        });
1422
1423        Ok(columns)
1424    }
1425
1426    /// Extract database type from servers in Data Contract format.
1427    fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
1428        // Data Contract format: servers can be object or array
1429        if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
1430            // Object format: { "server_name": { "type": "..." } }
1431            if let Some((_, server_data)) = servers_obj.iter().next()
1432                && let Some(server_obj) = server_data.as_object()
1433            {
1434                return server_obj
1435                    .get("type")
1436                    .and_then(|v| v.as_str())
1437                    .and_then(|s| self.parse_database_type(s));
1438            }
1439        } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
1440            // Array format: [ { "server": "...", "type": "..." } ]
1441            if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
1442                return server_obj
1443                    .get("type")
1444                    .and_then(|v| v.as_str())
1445                    .and_then(|s| self.parse_database_type(s));
1446            }
1447        }
1448        None
1449    }
1450
1451    /// Parse database type string to enum.
1452    fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
1453        match s.to_lowercase().as_str() {
1454            "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
1455            "postgres" | "postgresql" => Some(DatabaseType::Postgres),
1456            "mysql" => Some(DatabaseType::Mysql),
1457            "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
1458            "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
1459            _ => None,
1460        }
1461    }
1462
1463    /// Extract table UUID from ODCL `id` field or fallback.
1464    fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
1465        // First check the top-level `id` field
1466        if let Some(id_val) = data.get("id")
1467            && let Some(id_str) = id_val.as_str()
1468            && let Ok(uuid) = uuid::Uuid::parse_str(id_str)
1469        {
1470            tracing::debug!(
1471                "[ODCLImporter] Extracted UUID from top-level 'id' field: {}",
1472                uuid
1473            );
1474            return uuid;
1475        }
1476
1477        // Backward compatibility: check customProperties for tableUuid (legacy format)
1478        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1479            for prop in custom_props {
1480                if let Some(prop_obj) = prop.as_object() {
1481                    let prop_key = prop_obj
1482                        .get("property")
1483                        .and_then(|v| v.as_str())
1484                        .unwrap_or("");
1485                    if prop_key == "tableUuid"
1486                        && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
1487                        && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1488                    {
1489                        tracing::debug!(
1490                            "[ODCLImporter] Extracted UUID from customProperties.tableUuid: {}",
1491                            uuid
1492                        );
1493                        return uuid;
1494                    }
1495                }
1496            }
1497        }
1498
1499        // Fallback: check odcl_metadata if present (legacy format)
1500        if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
1501            && let Some(uuid_val) = metadata.get("tableUuid")
1502            && let Some(uuid_str) = uuid_val.as_str()
1503            && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
1504        {
1505            tracing::debug!(
1506                "[ODCLImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
1507                uuid
1508            );
1509            return uuid;
1510        }
1511
1512        // Generate deterministic UUID v5 if not found (based on table name)
1513        let table_name = data
1514            .get("name")
1515            .and_then(|v| v.as_str())
1516            .unwrap_or("unknown");
1517        let new_uuid = crate::models::table::Table::generate_id(table_name, None, None, None);
1518        tracing::warn!(
1519            "[ODCLImporter] No UUID found for table '{}', generating deterministic UUID: {}",
1520            table_name,
1521            new_uuid
1522        );
1523        new_uuid
1524    }
1525
1526    /// Parse STRUCT type from string and create nested columns.
1527    #[allow(clippy::only_used_in_recursion)]
1528    fn parse_struct_type_from_string(
1529        &self,
1530        field_name: &str,
1531        type_str: &str,
1532        field_data: &serde_json::Map<String, JsonValue>,
1533    ) -> Result<Vec<Column>> {
1534        let mut columns = Vec::new();
1535
1536        // Normalize whitespace
1537        let normalized_type = type_str
1538            .lines()
1539            .map(|line| line.trim())
1540            .filter(|line| !line.is_empty())
1541            .collect::<Vec<_>>()
1542            .join(" ");
1543
1544        let type_str_upper = normalized_type.to_uppercase();
1545
1546        // Check if it's ARRAY<STRUCT<...>>
1547        let is_array = type_str_upper.starts_with("ARRAY<");
1548        let struct_start = type_str_upper.find("STRUCT<");
1549
1550        if let Some(start_pos) = struct_start {
1551            let struct_content_start = start_pos + 7; // Skip "STRUCT<"
1552            let struct_content = &normalized_type[struct_content_start..];
1553
1554            // Find matching closing bracket for STRUCT<
1555            let mut depth = 1;
1556            let mut end_pos = None;
1557            for (i, ch) in struct_content.char_indices() {
1558                match ch {
1559                    '<' => depth += 1,
1560                    '>' => {
1561                        depth -= 1;
1562                        if depth == 0 {
1563                            end_pos = Some(i);
1564                            break;
1565                        }
1566                    }
1567                    _ => {}
1568                }
1569            }
1570
1571            let struct_fields_str = if let Some(end) = end_pos {
1572                &struct_content[..end]
1573            } else {
1574                struct_content.trim_end_matches('>').trim()
1575            };
1576
1577            // Parse fields: "ID: STRING, NAME: STRING, ..."
1578            let fields = parse_struct_fields_from_string(struct_fields_str)?;
1579
1580            // Create nested columns
1581            for (nested_name, nested_type) in fields {
1582                let nested_type_upper = nested_type.to_uppercase();
1583                let nested_col_name = if is_array {
1584                    format!("{}.[].{}", field_name, nested_name)
1585                } else {
1586                    format!("{}.{}", field_name, nested_name)
1587                };
1588
1589                let is_nested_struct = nested_type_upper.starts_with("STRUCT<");
1590                let is_nested_array_struct = nested_type_upper.starts_with("ARRAY<STRUCT<");
1591
1592                if is_nested_struct || is_nested_array_struct {
1593                    // Recursively parse nested STRUCT or ARRAY<STRUCT>
1594                    match self.parse_struct_type_from_string(
1595                        &nested_col_name,
1596                        &nested_type,
1597                        field_data,
1598                    ) {
1599                        Ok(nested_cols) => {
1600                            columns.extend(nested_cols);
1601                        }
1602                        Err(_) => {
1603                            let fallback_data_type = if is_nested_array_struct {
1604                                "ARRAY<STRUCT<...>>".to_string()
1605                            } else {
1606                                "STRUCT<...>".to_string()
1607                            };
1608                            columns.push(Column {
1609                                name: nested_col_name,
1610                                data_type: fallback_data_type,
1611                                nullable: !field_data
1612                                    .get("required")
1613                                    .and_then(|v| v.as_bool())
1614                                    .unwrap_or(false),
1615                                description: field_data
1616                                    .get("description")
1617                                    .and_then(|v| v.as_str())
1618                                    .unwrap_or("")
1619                                    .to_string(),
1620                                ..Default::default()
1621                            });
1622                        }
1623                    }
1624                } else if nested_type_upper.starts_with("ARRAY<") {
1625                    columns.push(Column {
1626                        name: nested_col_name,
1627                        data_type: normalize_data_type(&nested_type),
1628                        nullable: !field_data
1629                            .get("required")
1630                            .and_then(|v| v.as_bool())
1631                            .unwrap_or(false),
1632                        description: field_data
1633                            .get("description")
1634                            .and_then(|v| v.as_str())
1635                            .unwrap_or("")
1636                            .to_string(),
1637                        ..Default::default()
1638                    });
1639                } else {
1640                    // Simple nested field
1641                    columns.push(Column {
1642                        name: nested_col_name,
1643                        data_type: normalize_data_type(&nested_type),
1644                        nullable: !field_data
1645                            .get("required")
1646                            .and_then(|v| v.as_bool())
1647                            .unwrap_or(false),
1648                        description: field_data
1649                            .get("description")
1650                            .and_then(|v| v.as_str())
1651                            .unwrap_or("")
1652                            .to_string(),
1653                        ..Default::default()
1654                    });
1655                }
1656            }
1657
1658            return Ok(columns);
1659        }
1660
1661        // If no STRUCT found, return empty (fallback to regular parsing)
1662        Ok(Vec::new())
1663    }
1664}
1665
1666impl Default for ODCLImporter {
1667    fn default() -> Self {
1668        Self::new()
1669    }
1670}
1671
#[cfg(test)]
mod tests {
    use super::*;

    /// Simple ODCL (`name` + `columns`) yields a table with both columns and
    /// the declared database type, without import errors.
    #[test]
    fn test_parse_simple_odcl_table() {
        let yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
    nullable: false
    primary_key: true
  - name: name
    data_type: VARCHAR(255)
    nullable: false
database_type: Postgres
"#;
        let mut importer = ODCLImporter::new();

        let (table, errors) = importer.parse(yaml).unwrap();

        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
        assert!(errors.is_empty());
    }

    /// Medallion layer, SCD pattern and `odcl_metadata` are carried over.
    #[test]
    fn test_parse_odcl_with_metadata() {
        let yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
medallion_layer: gold
scd_pattern: TYPE_2
odcl_metadata:
  description: "User table"
  owner: "data-team"
"#;
        let mut importer = ODCLImporter::new();

        let (table, errors) = importer.parse(yaml).unwrap();

        assert_eq!(table.medallion_layers.len(), 1);
        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
        // Soft check: the description is only compared when it is present as a string.
        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
            assert_eq!(desc, "User table");
        }
        assert!(errors.is_empty());
    }

    /// Data Contract Specification documents are parsed via `models.<name>.fields`.
    #[test]
    fn test_parse_data_contract_format() {
        let yaml = r#"
dataContractSpecification: 0.9.3
id: urn:datacontract:example
models:
  users:
    fields:
      id:
        type: bigint
        description: User ID
      name:
        type: string
        description: User name
"#;
        let mut importer = ODCLImporter::new();

        let (table, errors) = importer.parse(yaml).unwrap();

        assert_eq!(table.name, "users");
        assert_eq!(table.columns.len(), 2);
        assert!(errors.is_empty());
    }

    /// `can_handle` accepts both legacy formats and rejects ODCS v3.x documents.
    #[test]
    fn test_can_handle_odcl_format() {
        let importer = ODCLImporter::new();

        // Data Contract format should be handled
        let data_contract_yaml = r#"
dataContractSpecification: 0.9.3
id: test
models:
  users:
    fields:
      id:
        type: int
"#;
        assert!(importer.can_handle(data_contract_yaml));

        // Simple ODCL format should be handled
        let simple_odcl_yaml = r#"
name: users
columns:
  - name: id
    data_type: INT
"#;
        assert!(importer.can_handle(simple_odcl_yaml));

        // ODCS v3.x format should NOT be handled
        let odcs_v3_yaml = r#"
apiVersion: v3.1.0
kind: DataContract
id: test-uuid
version: 1.0.0
name: users
schema:
  - name: users
    properties:
      - name: id
        logicalType: integer
"#;
        assert!(!importer.can_handle(odcs_v3_yaml));
    }
}