data_modelling_core/import/
odcs.rs

1//! ODCS parser service for parsing Open Data Contract Standard YAML files.
2//!
3//! This service parses ODCS (Open Data Contract Standard) v3.1.0 and legacy ODCL (Data Contract Specification) YAML files
4//! and converts them to Table models. ODCL files are automatically converted to ODCS v3.1.0 format.
5//! Supports multiple formats:
6//! - ODCS v3.1.0 / v3.0.x format (apiVersion, kind, schema) - PRIMARY FORMAT
7//! - ODCL (Data Contract Specification) format (dataContractSpecification, models, definitions) - LEGACY, converted to ODCS
8//! - Simple ODCL format (name, columns) - LEGACY, converted to ODCS
9//! - Liquibase format
10
11use super::odcs_shared::{
12    ParserError, column_to_column_data, expand_nested_column, json_value_to_serde_value,
13    normalize_data_type, parse_data_vault_classification, parse_medallion_layer, parse_scd_pattern,
14    resolve_ref, yaml_to_json_value,
15};
16use super::{ImportError, ImportResult, TableData};
17use crate::models::column::ForeignKey;
18use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
19use crate::models::{Column, PropertyRelationship, Table, Tag};
20use anyhow::{Context, Result};
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::str::FromStr;
24use tracing::info;
25
26// Re-export ParserError for backward compatibility
27pub use super::odcs_shared::ParserError as OdcsParserError;
28
29/// Convert a $ref path to a PropertyRelationship.
30/// E.g., "#/definitions/order_id" -> PropertyRelationship { type: "foreignKey", to: "definitions/order_id" }
31fn ref_to_relationships(ref_path: &Option<String>) -> Vec<PropertyRelationship> {
32    match ref_path {
33        Some(ref_str) => {
34            let to = if ref_str.starts_with("#/definitions/") {
35                let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
36                format!("definitions/{}", def_path)
37            } else if ref_str.starts_with("#/") {
38                ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
39            } else {
40                ref_str.clone()
41            };
42            vec![PropertyRelationship {
43                relationship_type: "foreignKey".to_string(),
44                to,
45            }]
46        }
47        None => Vec::new(),
48    }
49}
50
51/// ODCS parser service for parsing Open Data Contract Standard YAML files.
52/// Handles ODCS v3.1.0 (primary format) and legacy ODCL formats (converted to ODCS).
53pub struct ODCSImporter {
54    /// Current YAML data for $ref resolution
55    current_yaml_data: Option<serde_yaml::Value>,
56}
57
58impl ODCSImporter {
59    /// Create a new ODCS parser instance.
60    ///
61    /// # Example
62    ///
63    /// ```rust
64    /// use data_modelling_core::import::odcs::ODCSImporter;
65    ///
66    /// let mut importer = ODCSImporter::new();
67    /// ```
68    pub fn new() -> Self {
69        Self {
70            current_yaml_data: None,
71        }
72    }
73
74    /// Import ODCS/ODCL YAML content and create Table (SDK interface).
75    ///
76    /// Supports ODCS v3.1.0 (primary), legacy ODCL formats (converted to ODCS), and Liquibase formats.
77    ///
78    /// # Arguments
79    ///
80    /// * `yaml_content` - ODCS/ODCL YAML content as a string
81    ///
82    /// # Returns
83    ///
84    /// An `ImportResult` containing the extracted table and any parse errors.
85    ///
86    /// # Example
87    ///
88    /// ```rust
89    /// use data_modelling_core::import::odcs::ODCSImporter;
90    ///
91    /// let mut importer = ODCSImporter::new();
92    /// let yaml = r#"
93    /// apiVersion: v3.1.0
94    /// kind: DataContract
95    /// id: 550e8400-e29b-41d4-a716-446655440000
96    /// version: 1.0.0
97    /// name: users
98    /// schema:
99    ///   - name: users
100    ///     properties:
101    ///       - name: id
102    ///         logicalType: bigint
103    /// "#;
104    /// let result = importer.import(yaml).unwrap();
105    /// assert_eq!(result.tables.len(), 1);
106    /// ```
107    pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
108        // First parse YAML to get raw data for ODCS field extraction
109        let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
110            .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
111
112        let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
113            ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
114        })?;
115
116        // Store current YAML data for $ref resolution
117        self.current_yaml_data = Some(yaml_data);
118
119        // Check if this is ODCS v3 format with multiple tables in schema array
120        if self.is_odcl_v3_format(&json_data) {
121            // Use parse_odcl_v3_all to get ALL tables from the schema array
122            match self.parse_odcl_v3_all(&json_data) {
123                Ok(tables_with_errors) => {
124                    let mut sdk_tables = Vec::new();
125                    let mut all_errors = Vec::new();
126
127                    for (table_index, (table, errors)) in tables_with_errors.into_iter().enumerate()
128                    {
129                        // Convert errors
130                        for e in errors {
131                            all_errors.push(ImportError::ParseError(e.message));
132                        }
133
134                        // Create TableData for this table
135                        sdk_tables.push(self.table_to_table_data(table, table_index, &json_data));
136                    }
137
138                    info!(
139                        "ODCS import completed: {} tables extracted from schema array",
140                        sdk_tables.len()
141                    );
142
143                    return Ok(ImportResult {
144                        tables: sdk_tables,
145                        tables_requiring_name: Vec::new(),
146                        errors: all_errors,
147                        ai_suggestions: None,
148                    });
149                }
150                Err(e) => {
151                    return Err(ImportError::ParseError(e.to_string()));
152                }
153            }
154        }
155
156        // Fall back to single-table parsing for non-ODCS v3 formats
157        match self.parse(yaml_content) {
158            Ok((table, errors)) => {
159                let sdk_tables = vec![self.table_to_table_data(table, 0, &json_data)];
160                let sdk_errors: Vec<ImportError> = errors
161                    .iter()
162                    .map(|e| ImportError::ParseError(e.message.clone()))
163                    .collect();
164                Ok(ImportResult {
165                    tables: sdk_tables,
166                    tables_requiring_name: Vec::new(),
167                    errors: sdk_errors,
168                    ai_suggestions: None,
169                })
170            }
171            Err(e) => Err(ImportError::ParseError(e.to_string())),
172        }
173    }
174
175    /// Convert a Table to TableData for the SDK interface.
176    fn table_to_table_data(
177        &self,
178        table: Table,
179        table_index: usize,
180        json_data: &JsonValue,
181    ) -> TableData {
182        TableData {
183            table_index,
184            id: Some(table.id.to_string()),
185            name: Some(table.name.clone()),
186            api_version: json_data
187                .get("apiVersion")
188                .and_then(|v| v.as_str())
189                .map(|s| s.to_string()),
190            version: json_data
191                .get("version")
192                .and_then(|v| v.as_str())
193                .map(|s| s.to_string()),
194            status: json_data
195                .get("status")
196                .and_then(|v| v.as_str())
197                .map(|s| s.to_string()),
198            kind: json_data
199                .get("kind")
200                .and_then(|v| v.as_str())
201                .map(|s| s.to_string()),
202            domain: json_data
203                .get("domain")
204                .and_then(|v| v.as_str())
205                .map(|s| s.to_string()),
206            data_product: json_data
207                .get("dataProduct")
208                .and_then(|v| v.as_str())
209                .map(|s| s.to_string()),
210            tenant: json_data
211                .get("tenant")
212                .and_then(|v| v.as_str())
213                .map(|s| s.to_string()),
214            description: json_data.get("description").cloned(),
215            // Schema-level fields (from table's odcl_metadata or direct fields)
216            physical_name: table
217                .odcl_metadata
218                .get("physicalName")
219                .and_then(|v| v.as_str())
220                .map(|s| s.to_string())
221                .or_else(|| table.schema_name.clone()),
222            physical_type: table
223                .odcl_metadata
224                .get("physicalType")
225                .and_then(|v| v.as_str())
226                .map(|s| s.to_string()),
227            business_name: table
228                .odcl_metadata
229                .get("businessName")
230                .and_then(|v| v.as_str())
231                .map(|s| s.to_string()),
232            data_granularity_description: table
233                .odcl_metadata
234                .get("dataGranularityDescription")
235                .and_then(|v| v.as_str())
236                .map(|s| s.to_string()),
237            columns: table.columns.iter().map(column_to_column_data).collect(),
238            servers: json_data
239                .get("servers")
240                .and_then(|v| v.as_array())
241                .cloned()
242                .unwrap_or_default(),
243            team: json_data.get("team").cloned(),
244            support: json_data.get("support").cloned(),
245            roles: json_data
246                .get("roles")
247                .and_then(|v| v.as_array())
248                .cloned()
249                .unwrap_or_default(),
250            sla_properties: json_data
251                .get("slaProperties")
252                .and_then(|v| v.as_array())
253                .cloned()
254                .unwrap_or_default(),
255            quality: table.quality.clone(),
256            price: json_data.get("price").cloned(),
257            tags: table.tags.iter().map(|t| t.to_string()).collect(),
258            // Merge contract-level and schema-level customProperties
259            custom_properties: {
260                let mut props = json_data
261                    .get("customProperties")
262                    .and_then(|v| v.as_array())
263                    .cloned()
264                    .unwrap_or_default();
265                // Add schema-level customProperties if present in odcl_metadata
266                if let Some(schema_props) = table.odcl_metadata.get("schemaCustomProperties")
267                    && let Some(schema_arr) = schema_props.as_array()
268                {
269                    for prop in schema_arr {
270                        // Avoid duplicates by checking if property already exists
271                        let prop_name = prop.get("property").and_then(|v| v.as_str()).unwrap_or("");
272                        let already_exists = props.iter().any(|p| {
273                            p.get("property")
274                                .and_then(|v| v.as_str())
275                                .map(|n| n == prop_name)
276                                .unwrap_or(false)
277                        });
278                        if !already_exists {
279                            props.push(prop.clone());
280                        }
281                    }
282                }
283                props
284            },
285            authoritative_definitions: json_data
286                .get("authoritativeDefinitions")
287                .and_then(|v| v.as_array())
288                .cloned()
289                .unwrap_or_default(),
290            contract_created_ts: json_data
291                .get("contractCreatedTs")
292                .and_then(|v| v.as_str())
293                .map(|s| s.to_string()),
294            odcs_metadata: table.odcl_metadata.clone(),
295        }
296    }
297
298    /// Import ODCS YAML content and return an ODCSContract (new v2 API).
299    ///
300    /// This method returns the native ODCS contract structure, which properly models
301    /// the ODCS v3.1.0 three-level hierarchy:
302    /// - Contract level (apiVersion, domain, etc.)
303    /// - Schema level (tables/views with physicalName, businessName, etc.)
304    /// - Property level (columns with all metadata)
305    ///
306    /// This API provides lossless round-trip import/export and supports multi-table contracts.
307    ///
308    /// # Arguments
309    ///
310    /// * `yaml_content` - ODCS YAML content as a string
311    ///
312    /// # Returns
313    ///
314    /// An `ODCSContract` containing the full contract with all schemas and properties.
315    ///
316    /// # Example
317    ///
318    /// ```rust
319    /// use data_modelling_core::import::odcs::ODCSImporter;
320    ///
321    /// let mut importer = ODCSImporter::new();
322    /// let yaml = r#"
323    /// apiVersion: v3.1.0
324    /// kind: DataContract
325    /// id: 550e8400-e29b-41d4-a716-446655440000
326    /// version: 1.0.0
327    /// name: my-contract
328    /// domain: retail
329    /// schema:
330    ///   - name: users
331    ///     physicalName: tbl_users
332    ///     properties:
333    ///       - name: id
334    ///         logicalType: integer
335    ///         primaryKey: true
336    ///       - name: email
337    ///         logicalType: string
338    ///         required: true
339    /// "#;
340    /// let contract = importer.import_contract(yaml).unwrap();
341    /// assert_eq!(contract.name, "my-contract");
342    /// assert_eq!(contract.schema.len(), 1);
343    /// assert_eq!(contract.schema[0].name, "users");
344    /// ```
345    pub fn import_contract(
346        &mut self,
347        yaml_content: &str,
348    ) -> Result<crate::models::odcs::ODCSContract, ImportError> {
349        use crate::models::odcs::{
350            AuthoritativeDefinition, CustomProperty, Description, ODCSContract, QualityRule, Role,
351            Server, ServiceLevel, Support, Team,
352        };
353
354        // Parse YAML
355        let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
356            .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
357
358        let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
359            ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
360        })?;
361
362        // Store current YAML data for $ref resolution
363        self.current_yaml_data = Some(yaml_data);
364
365        // Check if this is ODCS v3 format
366        if !self.is_odcl_v3_format(&json_data) {
367            return Err(ImportError::ParseError(
368                "import_contract() only supports ODCS v3 format. Use import() for legacy formats."
369                    .to_string(),
370            ));
371        }
372
373        // Extract contract-level fields
374        let api_version = json_data
375            .get("apiVersion")
376            .and_then(|v| v.as_str())
377            .unwrap_or("v3.1.0")
378            .to_string();
379        let kind = json_data
380            .get("kind")
381            .and_then(|v| v.as_str())
382            .unwrap_or("DataContract")
383            .to_string();
384        let id = json_data
385            .get("id")
386            .and_then(|v| v.as_str())
387            .unwrap_or("")
388            .to_string();
389        let version = json_data
390            .get("version")
391            .and_then(|v| v.as_str())
392            .unwrap_or("1.0.0")
393            .to_string();
394        let name = json_data
395            .get("name")
396            .and_then(|v| v.as_str())
397            .unwrap_or("")
398            .to_string();
399        let status = json_data
400            .get("status")
401            .and_then(|v| v.as_str())
402            .map(|s| s.to_string());
403        let domain = json_data
404            .get("domain")
405            .and_then(|v| v.as_str())
406            .map(|s| s.to_string());
407        let data_product = json_data
408            .get("dataProduct")
409            .and_then(|v| v.as_str())
410            .map(|s| s.to_string());
411        let tenant = json_data
412            .get("tenant")
413            .and_then(|v| v.as_str())
414            .map(|s| s.to_string());
415
416        // Description can be string or object
417        let description = json_data.get("description").and_then(|v| {
418            if v.is_string() {
419                Some(Description::Simple(v.as_str().unwrap().to_string()))
420            } else if v.is_object() {
421                serde_json::from_value::<Description>(json_value_to_serde_value(v)).ok()
422            } else {
423                None
424            }
425        });
426
427        // Parse schema array
428        let schema = self.parse_schema_array_to_odcs(&json_data)?;
429
430        // Parse servers
431        let servers: Vec<Server> = json_data
432            .get("servers")
433            .and_then(|v| v.as_array())
434            .map(|arr| {
435                arr.iter()
436                    .filter_map(|s| serde_json::from_value(json_value_to_serde_value(s)).ok())
437                    .collect()
438            })
439            .unwrap_or_default();
440
441        // Parse team
442        let team: Option<Team> = json_data
443            .get("team")
444            .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
445
446        // Parse support
447        let support: Option<Support> = json_data
448            .get("support")
449            .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
450
451        // Parse roles
452        let roles: Vec<Role> = json_data
453            .get("roles")
454            .and_then(|v| v.as_array())
455            .map(|arr| {
456                arr.iter()
457                    .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
458                    .collect()
459            })
460            .unwrap_or_default();
461
462        // Parse service levels
463        let service_levels: Vec<ServiceLevel> = json_data
464            .get("slaProperties")
465            .or_else(|| json_data.get("serviceLevels"))
466            .and_then(|v| v.as_array())
467            .map(|arr| {
468                arr.iter()
469                    .filter_map(|s| serde_json::from_value(json_value_to_serde_value(s)).ok())
470                    .collect()
471            })
472            .unwrap_or_default();
473
474        // Parse contract-level quality rules
475        let quality: Vec<QualityRule> = json_data
476            .get("quality")
477            .and_then(|v| v.as_array())
478            .map(|arr| {
479                arr.iter()
480                    .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
481                    .collect()
482            })
483            .unwrap_or_default();
484
485        // Parse price
486        let price = json_data
487            .get("price")
488            .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
489
490        // Parse terms
491        let terms = json_data
492            .get("terms")
493            .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
494
495        // Parse links
496        let links = json_data
497            .get("links")
498            .and_then(|v| v.as_array())
499            .map(|arr| {
500                arr.iter()
501                    .filter_map(|l| serde_json::from_value(json_value_to_serde_value(l)).ok())
502                    .collect()
503            })
504            .unwrap_or_default();
505
506        // Parse authoritative definitions
507        let authoritative_definitions: Vec<AuthoritativeDefinition> = json_data
508            .get("authoritativeDefinitions")
509            .and_then(|v| v.as_array())
510            .map(|arr| {
511                arr.iter()
512                    .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
513                    .collect()
514            })
515            .unwrap_or_default();
516
517        // Parse tags
518        let tags: Vec<String> = json_data
519            .get("tags")
520            .and_then(|v| v.as_array())
521            .map(|arr| {
522                arr.iter()
523                    .filter_map(|t| t.as_str().map(|s| s.to_string()))
524                    .collect()
525            })
526            .unwrap_or_default();
527
528        // Parse custom properties
529        let custom_properties: Vec<CustomProperty> = json_data
530            .get("customProperties")
531            .and_then(|v| v.as_array())
532            .map(|arr| {
533                arr.iter()
534                    .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
535                    .collect()
536            })
537            .unwrap_or_default();
538
539        // Parse contract created timestamp
540        let contract_created_ts = json_data
541            .get("contractCreatedTs")
542            .and_then(|v| v.as_str())
543            .map(|s| s.to_string());
544
545        Ok(ODCSContract {
546            api_version,
547            kind,
548            id,
549            version,
550            name,
551            status,
552            domain,
553            data_product,
554            tenant,
555            description,
556            schema,
557            servers,
558            team,
559            support,
560            roles,
561            service_levels,
562            quality,
563            price,
564            terms,
565            links,
566            authoritative_definitions,
567            tags,
568            custom_properties,
569            contract_created_ts,
570        })
571    }
572
573    /// Parse the schema array into ODCS SchemaObject types
574    fn parse_schema_array_to_odcs(
575        &self,
576        json_data: &JsonValue,
577    ) -> Result<Vec<crate::models::odcs::SchemaObject>, ImportError> {
578        use crate::models::odcs::{
579            AuthoritativeDefinition, CustomProperty, QualityRule, SchemaObject, SchemaRelationship,
580        };
581
582        let schema_arr = match json_data.get("schema").and_then(|v| v.as_array()) {
583            Some(arr) => arr,
584            None => return Ok(Vec::new()),
585        };
586
587        let mut schemas = Vec::new();
588
589        for (idx, schema_value) in schema_arr.iter().enumerate() {
590            let schema_obj = match schema_value.as_object() {
591                Some(obj) => obj,
592                None => {
593                    return Err(ImportError::ParseError(format!(
594                        "Schema object at index {} must be a dictionary",
595                        idx
596                    )));
597                }
598            };
599
600            let name = schema_obj
601                .get("name")
602                .and_then(|v| v.as_str())
603                .ok_or_else(|| {
604                    ImportError::ParseError(format!(
605                        "Schema object at index {} missing 'name' field",
606                        idx
607                    ))
608                })?
609                .to_string();
610
611            let id = schema_obj
612                .get("id")
613                .and_then(|v| v.as_str())
614                .map(|s| s.to_string());
615            let physical_name = schema_obj
616                .get("physicalName")
617                .and_then(|v| v.as_str())
618                .map(|s| s.to_string());
619            let physical_type = schema_obj
620                .get("physicalType")
621                .and_then(|v| v.as_str())
622                .map(|s| s.to_string());
623            let business_name = schema_obj
624                .get("businessName")
625                .and_then(|v| v.as_str())
626                .map(|s| s.to_string());
627            let description = schema_obj
628                .get("description")
629                .and_then(|v| v.as_str())
630                .map(|s| s.to_string());
631            let data_granularity_description = schema_obj
632                .get("dataGranularityDescription")
633                .and_then(|v| v.as_str())
634                .map(|s| s.to_string());
635
636            // Parse properties
637            let properties = self.parse_properties_to_odcs(schema_obj, json_data)?;
638
639            // Parse schema-level relationships
640            let relationships: Vec<SchemaRelationship> = schema_obj
641                .get("relationships")
642                .and_then(|v| v.as_array())
643                .map(|arr| {
644                    arr.iter()
645                        .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
646                        .collect()
647                })
648                .unwrap_or_default();
649
650            // Parse schema-level quality rules
651            let quality: Vec<QualityRule> = schema_obj
652                .get("quality")
653                .and_then(|v| v.as_array())
654                .map(|arr| {
655                    arr.iter()
656                        .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
657                        .collect()
658                })
659                .unwrap_or_default();
660
661            // Parse authoritative definitions
662            let authoritative_definitions: Vec<AuthoritativeDefinition> = schema_obj
663                .get("authoritativeDefinitions")
664                .and_then(|v| v.as_array())
665                .map(|arr| {
666                    arr.iter()
667                        .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
668                        .collect()
669                })
670                .unwrap_or_default();
671
672            // Parse tags
673            let tags: Vec<String> = schema_obj
674                .get("tags")
675                .and_then(|v| v.as_array())
676                .map(|arr| {
677                    arr.iter()
678                        .filter_map(|t| t.as_str().map(|s| s.to_string()))
679                        .collect()
680                })
681                .unwrap_or_default();
682
683            // Parse custom properties
684            let custom_properties: Vec<CustomProperty> = schema_obj
685                .get("customProperties")
686                .and_then(|v| v.as_array())
687                .map(|arr| {
688                    arr.iter()
689                        .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
690                        .collect()
691                })
692                .unwrap_or_default();
693
694            schemas.push(SchemaObject {
695                id,
696                name,
697                physical_name,
698                physical_type,
699                business_name,
700                description,
701                data_granularity_description,
702                properties,
703                relationships,
704                quality,
705                authoritative_definitions,
706                tags,
707                custom_properties,
708            });
709        }
710
711        Ok(schemas)
712    }
713
714    /// Parse properties array into ODCS Property types (with nested support)
715    fn parse_properties_to_odcs(
716        &self,
717        schema_obj: &serde_json::Map<String, JsonValue>,
718        root_data: &JsonValue,
719    ) -> Result<Vec<crate::models::odcs::Property>, ImportError> {
720        let props_arr = match schema_obj.get("properties").and_then(|v| v.as_array()) {
721            Some(arr) => arr,
722            None => return Ok(Vec::new()),
723        };
724
725        let mut properties = Vec::new();
726
727        for prop_value in props_arr {
728            let prop_obj = match prop_value.as_object() {
729                Some(obj) => obj,
730                None => continue,
731            };
732
733            let prop = self.parse_single_property_to_odcs(prop_obj, root_data)?;
734            properties.push(prop);
735        }
736
737        Ok(properties)
738    }
739
740    /// Parse a single property object to ODCS Property (recursive for nested)
741    #[allow(clippy::only_used_in_recursion)]
742    fn parse_single_property_to_odcs(
743        &self,
744        prop_obj: &serde_json::Map<String, JsonValue>,
745        root_data: &JsonValue,
746    ) -> Result<crate::models::odcs::Property, ImportError> {
747        use crate::models::odcs::{
748            AuthoritativeDefinition, CustomProperty, LogicalTypeOptions, Property,
749            PropertyRelationship, QualityRule,
750        };
751
752        // Handle $ref
753        if let Some(ref_path) = prop_obj.get("$ref").and_then(|v| v.as_str())
754            && let Some(resolved) = resolve_ref(ref_path, root_data)
755            && let Some(obj) = resolved.as_object()
756        {
757            return self.parse_single_property_to_odcs(obj, root_data);
758        }
759
760        let name = prop_obj
761            .get("name")
762            .and_then(|v| v.as_str())
763            .unwrap_or("")
764            .to_string();
765        let logical_type = prop_obj
766            .get("logicalType")
767            .or_else(|| prop_obj.get("type"))
768            .and_then(|v| v.as_str())
769            .unwrap_or("string")
770            .to_string();
771
772        let id = prop_obj
773            .get("id")
774            .and_then(|v| v.as_str())
775            .map(|s| s.to_string());
776        let business_name = prop_obj
777            .get("businessName")
778            .and_then(|v| v.as_str())
779            .map(|s| s.to_string());
780        let description = prop_obj
781            .get("description")
782            .and_then(|v| v.as_str())
783            .map(|s| s.to_string());
784        let physical_type = prop_obj
785            .get("physicalType")
786            .and_then(|v| v.as_str())
787            .map(|s| s.to_string());
788        let physical_name = prop_obj
789            .get("physicalName")
790            .and_then(|v| v.as_str())
791            .map(|s| s.to_string());
792
793        // Parse logicalTypeOptions
794        let logical_type_options: Option<LogicalTypeOptions> = prop_obj
795            .get("logicalTypeOptions")
796            .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
797
798        // Key constraints
799        let required = prop_obj
800            .get("required")
801            .and_then(|v| v.as_bool())
802            .unwrap_or(false);
803        let primary_key = prop_obj
804            .get("primaryKey")
805            .and_then(|v| v.as_bool())
806            .unwrap_or(false);
807        let primary_key_position = prop_obj
808            .get("primaryKeyPosition")
809            .and_then(|v| v.as_i64())
810            .map(|n| n as i32);
811        let unique = prop_obj
812            .get("unique")
813            .and_then(|v| v.as_bool())
814            .unwrap_or(false);
815
816        // Partitioning & clustering
817        let partitioned = prop_obj
818            .get("partitioned")
819            .and_then(|v| v.as_bool())
820            .unwrap_or(false);
821        let partition_key_position = prop_obj
822            .get("partitionKeyPosition")
823            .and_then(|v| v.as_i64())
824            .map(|n| n as i32);
825        let clustered = prop_obj
826            .get("clustered")
827            .and_then(|v| v.as_bool())
828            .unwrap_or(false);
829
830        // Classification & security
831        let classification = prop_obj
832            .get("classification")
833            .and_then(|v| v.as_str())
834            .map(|s| s.to_string());
835        let critical_data_element = prop_obj
836            .get("criticalDataElement")
837            .and_then(|v| v.as_bool())
838            .unwrap_or(false);
839        let encrypted_name = prop_obj
840            .get("encryptedName")
841            .and_then(|v| v.as_str())
842            .map(|s| s.to_string());
843
844        // Transformation
845        let transform_source_objects: Vec<String> = prop_obj
846            .get("transformSourceObjects")
847            .and_then(|v| v.as_array())
848            .map(|arr| {
849                arr.iter()
850                    .filter_map(|t| t.as_str().map(|s| s.to_string()))
851                    .collect()
852            })
853            .unwrap_or_default();
854        let transform_logic = prop_obj
855            .get("transformLogic")
856            .and_then(|v| v.as_str())
857            .map(|s| s.to_string());
858        let transform_description = prop_obj
859            .get("transformDescription")
860            .and_then(|v| v.as_str())
861            .map(|s| s.to_string());
862
863        // Examples & defaults
864        let examples: Vec<serde_json::Value> = prop_obj
865            .get("examples")
866            .and_then(|v| v.as_array())
867            .map(|arr| arr.iter().map(json_value_to_serde_value).collect())
868            .unwrap_or_default();
869        let default_value = prop_obj.get("default").map(json_value_to_serde_value);
870
871        // Relationships
872        let relationships: Vec<PropertyRelationship> = prop_obj
873            .get("relationships")
874            .and_then(|v| v.as_array())
875            .map(|arr| {
876                arr.iter()
877                    .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
878                    .collect()
879            })
880            .unwrap_or_default();
881
882        // Authoritative definitions
883        let authoritative_definitions: Vec<AuthoritativeDefinition> = prop_obj
884            .get("authoritativeDefinitions")
885            .and_then(|v| v.as_array())
886            .map(|arr| {
887                arr.iter()
888                    .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
889                    .collect()
890            })
891            .unwrap_or_default();
892
893        // Quality rules
894        let quality: Vec<QualityRule> = prop_obj
895            .get("quality")
896            .and_then(|v| v.as_array())
897            .map(|arr| {
898                arr.iter()
899                    .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
900                    .collect()
901            })
902            .unwrap_or_default();
903
904        // Enum values
905        let enum_values: Vec<String> = prop_obj
906            .get("enum")
907            .and_then(|v| v.as_array())
908            .map(|arr| {
909                arr.iter()
910                    .filter_map(|e| e.as_str().map(|s| s.to_string()))
911                    .collect()
912            })
913            .unwrap_or_default();
914
915        // Tags
916        let tags: Vec<String> = prop_obj
917            .get("tags")
918            .and_then(|v| v.as_array())
919            .map(|arr| {
920                arr.iter()
921                    .filter_map(|t| t.as_str().map(|s| s.to_string()))
922                    .collect()
923            })
924            .unwrap_or_default();
925
926        // Custom properties
927        let custom_properties: Vec<CustomProperty> = prop_obj
928            .get("customProperties")
929            .and_then(|v| v.as_array())
930            .map(|arr| {
931                arr.iter()
932                    .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
933                    .collect()
934            })
935            .unwrap_or_default();
936
937        // Parse nested properties (for OBJECT types)
938        let nested_properties: Vec<Property> =
939            if let Some(props) = prop_obj.get("properties").and_then(|v| v.as_array()) {
940                props
941                    .iter()
942                    .filter_map(|p| {
943                        p.as_object()
944                            .and_then(|obj| self.parse_single_property_to_odcs(obj, root_data).ok())
945                    })
946                    .collect()
947            } else {
948                Vec::new()
949            };
950
951        // Parse items (for ARRAY types)
952        let items: Option<Box<Property>> = prop_obj.get("items").and_then(|v| {
953            v.as_object()
954                .and_then(|obj| self.parse_single_property_to_odcs(obj, root_data).ok())
955                .map(Box::new)
956        });
957
958        Ok(Property {
959            id,
960            name,
961            business_name,
962            description,
963            logical_type,
964            physical_type,
965            physical_name,
966            logical_type_options,
967            required,
968            primary_key,
969            primary_key_position,
970            unique,
971            partitioned,
972            partition_key_position,
973            clustered,
974            classification,
975            critical_data_element,
976            encrypted_name,
977            transform_source_objects,
978            transform_logic,
979            transform_description,
980            examples,
981            default_value,
982            relationships,
983            authoritative_definitions,
984            quality,
985            enum_values,
986            tags,
987            custom_properties,
988            items,
989            properties: nested_properties,
990        })
991    }
992
993    /// Parse ODCS/ODCL YAML content and create Table (public method for native app use).
994    ///
995    /// This method returns the full Table object with all metadata, suitable for use in
996    /// native applications that need direct access to the parsed table structure.
997    /// For API use, prefer the `import()` method which returns ImportResult.
998    ///
999    /// # Returns
1000    ///
1001    /// Returns a tuple of (Table, list of errors/warnings).
1002    /// Errors list is empty if parsing is successful.
1003    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
1004        self.parse(yaml_content)
1005    }
1006
1007    /// Parse ODCS/ODCL YAML content and create Table (internal method).
1008    ///
1009    /// Supports ODCS v3.1.0/v3.0.x (primary), ODCL Data Contract spec (legacy, converted to ODCS),
1010    /// simple ODCL (legacy, converted to ODCS), and Liquibase formats.
1011    ///
1012    /// # Returns
1013    ///
1014    /// Returns a tuple of (Table, list of errors/warnings).
1015    /// Errors list is empty if parsing is successful.
1016    fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
1017        // Errors are collected in helper functions, not here
1018        let _errors: Vec<ParserError> = Vec::new();
1019
1020        // Parse YAML
1021        let data: serde_yaml::Value =
1022            serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
1023
1024        if data.is_null() {
1025            return Err(anyhow::anyhow!("Empty YAML content"));
1026        }
1027
1028        // Store current YAML data for $ref resolution
1029        self.current_yaml_data = Some(data.clone());
1030
1031        // Convert to JSON Value for easier manipulation
1032        let json_data = yaml_to_json_value(&data)?;
1033
1034        // Check format and parse accordingly
1035        if self.is_liquibase_format(&json_data) {
1036            return self.parse_liquibase(&json_data);
1037        }
1038
1039        if self.is_odcl_v3_format(&json_data) {
1040            return self.parse_odcl_v3(&json_data);
1041        }
1042
1043        if self.is_data_contract_format(&json_data) {
1044            return self.parse_data_contract(&json_data);
1045        }
1046
1047        // Fall back to simple ODCL format
1048        self.parse_simple_odcl(&json_data)
1049    }
1050
1051    /// Check if YAML is in Liquibase format.
1052    fn is_liquibase_format(&self, data: &JsonValue) -> bool {
1053        if data.get("databaseChangeLog").is_some() {
1054            return true;
1055        }
1056        // Check for Liquibase-specific keys
1057        if let Some(obj) = data.as_object() {
1058            let obj_str = format!("{:?}", obj);
1059            if obj_str.contains("changeSet") {
1060                return true;
1061            }
1062        }
1063        false
1064    }
1065
1066    /// Check if YAML is in ODCS v3.0.x format.
1067    fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
1068        if let Some(obj) = data.as_object() {
1069            let has_api_version = obj.contains_key("apiVersion");
1070            let has_kind = obj
1071                .get("kind")
1072                .and_then(|v| v.as_str())
1073                .map(|s| s == "DataContract")
1074                .unwrap_or(false);
1075            let has_id = obj.contains_key("id");
1076            let has_version = obj.contains_key("version");
1077            return has_api_version && has_kind && has_id && has_version;
1078        }
1079        false
1080    }
1081
1082    /// Check if YAML is in Data Contract specification format.
1083    fn is_data_contract_format(&self, data: &JsonValue) -> bool {
1084        if let Some(obj) = data.as_object() {
1085            let has_spec = obj.contains_key("dataContractSpecification");
1086            let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
1087            return has_spec && has_models;
1088        }
1089        false
1090    }
1091
1092    /// Parse simple ODCL format.
1093    fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1094        let mut errors = Vec::new();
1095
1096        // Extract table name
1097        let name = data
1098            .get("name")
1099            .and_then(|v| v.as_str())
1100            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
1101            .to_string();
1102
1103        // Extract columns
1104        let columns_data = data
1105            .get("columns")
1106            .and_then(|v| v.as_array())
1107            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
1108
1109        let mut columns = Vec::new();
1110        for (idx, col_data) in columns_data.iter().enumerate() {
1111            match self.parse_column(col_data) {
1112                Ok(col) => columns.push(col),
1113                Err(e) => {
1114                    errors.push(ParserError {
1115                        error_type: "column_parse_error".to_string(),
1116                        field: format!("columns[{}]", idx),
1117                        message: e.to_string(),
1118                    });
1119                }
1120            }
1121        }
1122
1123        // Extract metadata
1124        let database_type = self.extract_database_type(data);
1125        let medallion_layers = self.extract_medallion_layers(data);
1126        let scd_pattern = self.extract_scd_pattern(data);
1127        let data_vault_classification = self.extract_data_vault_classification(data);
1128        let quality_rules = self.extract_quality_rules(data);
1129
1130        // Validate pattern exclusivity
1131        if scd_pattern.is_some() && data_vault_classification.is_some() {
1132            errors.push(ParserError {
1133                error_type: "validation_error".to_string(),
1134                field: "patterns".to_string(),
1135                message: "SCD pattern and Data Vault classification are mutually exclusive"
1136                    .to_string(),
1137            });
1138        }
1139
1140        // Extract odcl_metadata
1141        let mut odcl_metadata = HashMap::new();
1142        if let Some(metadata) = data.get("odcl_metadata")
1143            && let Some(obj) = metadata.as_object()
1144        {
1145            for (key, value) in obj {
1146                odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
1147            }
1148        }
1149
1150        let table_uuid = self.extract_table_uuid(data);
1151
1152        let table = Table {
1153            id: table_uuid,
1154            name,
1155            columns,
1156            database_type,
1157            catalog_name: None,
1158            schema_name: None,
1159            medallion_layers,
1160            scd_pattern,
1161            data_vault_classification,
1162            modeling_level: None,
1163            tags: Vec::<Tag>::new(),
1164            odcl_metadata,
1165            owner: None,
1166            sla: None,
1167            contact_details: None,
1168            infrastructure_type: None,
1169            notes: None,
1170            position: None,
1171            yaml_file_path: None,
1172            drawio_cell_id: None,
1173            quality: quality_rules,
1174            errors: Vec::new(),
1175            created_at: chrono::Utc::now(),
1176            updated_at: chrono::Utc::now(),
1177        };
1178
1179        info!("Parsed ODCL table: {}", table.name);
1180        Ok((table, errors))
1181    }
1182
1183    /// Parse a single column definition.
1184    fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
1185        let name = col_data
1186            .get("name")
1187            .and_then(|v| v.as_str())
1188            .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
1189            .to_string();
1190
1191        let data_type = col_data
1192            .get("data_type")
1193            .and_then(|v| v.as_str())
1194            .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
1195            .to_string();
1196
1197        // Normalize data_type to uppercase (preserve STRUCT<...> format)
1198        let data_type = normalize_data_type(&data_type);
1199
1200        let nullable = col_data
1201            .get("nullable")
1202            .and_then(|v| v.as_bool())
1203            .unwrap_or(true);
1204
1205        let primary_key = col_data
1206            .get("primary_key")
1207            .and_then(|v| v.as_bool())
1208            .unwrap_or(false);
1209
1210        let foreign_key = col_data
1211            .get("foreign_key")
1212            .and_then(|v| self.parse_foreign_key(v));
1213
1214        let constraints = col_data
1215            .get("constraints")
1216            .and_then(|v| v.as_array())
1217            .map(|arr| {
1218                arr.iter()
1219                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1220                    .collect()
1221            })
1222            .unwrap_or_default();
1223
1224        let description = col_data
1225            .get("description")
1226            .and_then(|v| v.as_str())
1227            .map(|s| s.to_string())
1228            .unwrap_or_default();
1229
1230        // Extract column-level quality rules
1231        let mut column_quality_rules = Vec::new();
1232        if let Some(quality_val) = col_data.get("quality") {
1233            if let Some(arr) = quality_val.as_array() {
1234                // Array of quality rules
1235                for item in arr {
1236                    if let Some(obj) = item.as_object() {
1237                        let mut rule = HashMap::new();
1238                        for (key, value) in obj {
1239                            rule.insert(key.clone(), json_value_to_serde_value(value));
1240                        }
1241                        column_quality_rules.push(rule);
1242                    }
1243                }
1244            } else if let Some(obj) = quality_val.as_object() {
1245                // Single quality rule object
1246                let mut rule = HashMap::new();
1247                for (key, value) in obj {
1248                    rule.insert(key.clone(), json_value_to_serde_value(value));
1249                }
1250                column_quality_rules.push(rule);
1251            }
1252        }
1253
1254        // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
1255        // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
1256        // The `nullable` field will be converted to `required` during export.
1257
1258        Ok(Column {
1259            name,
1260            data_type,
1261            nullable,
1262            primary_key,
1263            foreign_key,
1264            constraints,
1265            description,
1266            quality: column_quality_rules,
1267            ..Default::default()
1268        })
1269    }
1270
1271    /// Parse foreign key from JSON value.
1272    fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
1273        let obj = fk_data.as_object()?;
1274        Some(ForeignKey {
1275            table_id: obj
1276                .get("table_id")
1277                .or_else(|| obj.get("table"))
1278                .and_then(|v| v.as_str())
1279                .unwrap_or("")
1280                .to_string(),
1281            column_name: obj
1282                .get("column_name")
1283                .or_else(|| obj.get("column"))
1284                .and_then(|v| v.as_str())
1285                .unwrap_or("")
1286                .to_string(),
1287        })
1288    }
1289
1290    /// Extract database type from data.
1291    fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
1292        data.get("database_type")
1293            .and_then(|v| v.as_str())
1294            .and_then(|s| match s.to_uppercase().as_str() {
1295                "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
1296                "MYSQL" => Some(DatabaseType::Mysql),
1297                "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
1298                "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
1299                "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
1300                _ => None,
1301            })
1302    }
1303
1304    /// Extract medallion layers from data.
1305    fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
1306        let mut layers = Vec::new();
1307
1308        // Check plural form first
1309        if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
1310            for item in arr {
1311                if let Some(s) = item.as_str()
1312                    && let Ok(layer) = parse_medallion_layer(s)
1313                {
1314                    layers.push(layer);
1315                }
1316            }
1317        }
1318        // Check singular form (backward compatibility)
1319        else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
1320            && let Ok(layer) = parse_medallion_layer(s)
1321        {
1322            layers.push(layer);
1323        }
1324
1325        layers
1326    }
1327
1328    /// Extract SCD pattern from data.
1329    fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
1330        data.get("scd_pattern")
1331            .and_then(|v| v.as_str())
1332            .and_then(|s| parse_scd_pattern(s).ok())
1333    }
1334
1335    /// Extract Data Vault classification from data.
1336    fn extract_data_vault_classification(
1337        &self,
1338        data: &JsonValue,
1339    ) -> Option<DataVaultClassification> {
1340        data.get("data_vault_classification")
1341            .and_then(|v| v.as_str())
1342            .and_then(|s| parse_data_vault_classification(s).ok())
1343    }
1344
1345    /// Extract quality rules from data.
1346    fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
1347        use serde_json::Value;
1348        let mut quality_rules = Vec::new();
1349
1350        // Check for quality field at root level (array of objects or single object)
1351        if let Some(quality_val) = data.get("quality") {
1352            if let Some(arr) = quality_val.as_array() {
1353                // Array of quality rules
1354                for item in arr {
1355                    if let Some(obj) = item.as_object() {
1356                        let mut rule = HashMap::new();
1357                        for (key, value) in obj {
1358                            rule.insert(key.clone(), json_value_to_serde_value(value));
1359                        }
1360                        quality_rules.push(rule);
1361                    }
1362                }
1363            } else if let Some(obj) = quality_val.as_object() {
1364                // Single quality rule object
1365                let mut rule = HashMap::new();
1366                for (key, value) in obj {
1367                    rule.insert(key.clone(), json_value_to_serde_value(value));
1368                }
1369                quality_rules.push(rule);
1370            } else if let Some(s) = quality_val.as_str() {
1371                // Simple string quality value
1372                let mut rule = HashMap::new();
1373                rule.insert("value".to_string(), Value::String(s.to_string()));
1374                quality_rules.push(rule);
1375            }
1376        }
1377
1378        // Check for quality in metadata (ODCL v3 format)
1379        if let Some(metadata) = data.get("metadata")
1380            && let Some(metadata_obj) = metadata.as_object()
1381            && let Some(quality_val) = metadata_obj.get("quality")
1382        {
1383            if let Some(arr) = quality_val.as_array() {
1384                // Array of quality rules
1385                for item in arr {
1386                    if let Some(obj) = item.as_object() {
1387                        let mut rule = HashMap::new();
1388                        for (key, value) in obj {
1389                            rule.insert(key.clone(), json_value_to_serde_value(value));
1390                        }
1391                        quality_rules.push(rule);
1392                    }
1393                }
1394            } else if let Some(obj) = quality_val.as_object() {
1395                // Single quality rule object
1396                let mut rule = HashMap::new();
1397                for (key, value) in obj {
1398                    rule.insert(key.clone(), json_value_to_serde_value(value));
1399                }
1400                quality_rules.push(rule);
1401            } else if let Some(s) = quality_val.as_str() {
1402                // Simple string quality value
1403                let mut rule = HashMap::new();
1404                rule.insert("value".to_string(), Value::String(s.to_string()));
1405                quality_rules.push(rule);
1406            }
1407        }
1408
1409        // Check for tblproperties field (similar to SQL TBLPROPERTIES)
1410        if let Some(tblprops) = data.get("tblproperties")
1411            && let Some(obj) = tblprops.as_object()
1412        {
1413            for (key, value) in obj {
1414                let mut rule = HashMap::new();
1415                rule.insert("property".to_string(), Value::String(key.clone()));
1416                rule.insert("value".to_string(), json_value_to_serde_value(value));
1417                quality_rules.push(rule);
1418            }
1419        }
1420
1421        quality_rules
1422    }
1423
1424    /// Parse Liquibase YAML changelog format.
1425    ///
1426    /// Extracts the first `createTable` change from a Liquibase changelog.
1427    /// Supports both direct column definitions and nested column structures.
1428    ///
1429    /// # Supported Format
1430    ///
1431    fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1432        // Supported Liquibase YAML changelog format:
1433        // databaseChangeLog:
1434        //   - changeSet:
1435        //       - createTable:
1436        //           tableName: my_table
1437        //           columns:
1438        //             - column:
1439        //                 name: id
1440        //                 type: int
1441        //                 constraints:
1442        //                   primaryKey: true
1443        //                   nullable: false
1444
1445        let mut errors = Vec::new();
1446
1447        let changelog = data
1448            .get("databaseChangeLog")
1449            .and_then(|v| v.as_array())
1450            .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
1451
1452        // Find first createTable change.
1453        let mut table_name: Option<String> = None;
1454        let mut columns: Vec<crate::models::column::Column> = Vec::new();
1455
1456        for entry in changelog {
1457            // Entries are typically maps like { changeSet: [...] } or { changeSet: { changes: [...] } }
1458            if let Some(change_set) = entry.get("changeSet") {
1459                // Liquibase YAML can represent changeSet as map or list.
1460                let changes = if let Some(obj) = change_set.as_object() {
1461                    obj.get("changes")
1462                        .and_then(|v| v.as_array())
1463                        .cloned()
1464                        .unwrap_or_default()
1465                } else if let Some(arr) = change_set.as_array() {
1466                    // Some variants encode changes directly as list entries inside changeSet.
1467                    arr.clone()
1468                } else {
1469                    Vec::new()
1470                };
1471
1472                for ch in changes {
1473                    let create = ch.get("createTable").or_else(|| ch.get("create_table"));
1474                    if let Some(create) = create {
1475                        table_name = create
1476                            .get("tableName")
1477                            .or_else(|| create.get("table_name"))
1478                            .and_then(|v| v.as_str())
1479                            .map(|s| s.to_string());
1480
1481                        // columns: [ { column: { ... } }, ... ]
1482                        if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
1483                            for col_entry in cols {
1484                                let col = col_entry.get("column").unwrap_or(col_entry);
1485                                let name = col
1486                                    .get("name")
1487                                    .and_then(|v| v.as_str())
1488                                    .unwrap_or("")
1489                                    .to_string();
1490                                let data_type = col
1491                                    .get("type")
1492                                    .and_then(|v| v.as_str())
1493                                    .unwrap_or("")
1494                                    .to_string();
1495
1496                                if name.is_empty() {
1497                                    errors.push(ParserError {
1498                                        error_type: "validation_error".to_string(),
1499                                        field: "columns.name".to_string(),
1500                                        message: "Liquibase createTable column missing name"
1501                                            .to_string(),
1502                                    });
1503                                    continue;
1504                                }
1505
1506                                let mut column =
1507                                    crate::models::column::Column::new(name, data_type);
1508
1509                                if let Some(constraints) =
1510                                    col.get("constraints").and_then(|v| v.as_object())
1511                                {
1512                                    if let Some(pk) =
1513                                        constraints.get("primaryKey").and_then(|v| v.as_bool())
1514                                    {
1515                                        column.primary_key = pk;
1516                                    }
1517                                    if let Some(nullable) =
1518                                        constraints.get("nullable").and_then(|v| v.as_bool())
1519                                    {
1520                                        column.nullable = nullable;
1521                                    }
1522                                }
1523
1524                                columns.push(column);
1525                            }
1526                        }
1527
1528                        // parse_table() returns a single Table, so we parse the first createTable.
1529                        // If multiple tables are needed, call parse_table() multiple times or use import().
1530                        break;
1531                    }
1532                }
1533            }
1534            if table_name.is_some() {
1535                break;
1536            }
1537        }
1538
1539        let table_name = table_name
1540            .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
1541        let table = Table::new(table_name, columns);
1542        // Preserve any errors collected.
1543        Ok((table, errors))
1544    }
1545
1546    /// Parse ODCS v3.0.x format.
1547    fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1548        let mut errors = Vec::new();
1549
1550        // Extract table name
1551        let table_name = data
1552            .get("name")
1553            .and_then(|v| v.as_str())
1554            .map(|s| s.to_string())
1555            .or_else(|| {
1556                // Try to get from schema array
1557                data.get("schema")
1558                    .and_then(|v| v.as_array())
1559                    .and_then(|arr| arr.first())
1560                    .and_then(|obj| obj.as_object())
1561                    .and_then(|obj| obj.get("name"))
1562                    .and_then(|v| v.as_str())
1563                    .map(|s| s.to_string())
1564            })
1565            .ok_or_else(|| {
1566                anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
1567            })?;
1568
1569        // Extract schema - ODCS v3.0.x uses array of SchemaObject
1570        let schema = data
1571            .get("schema")
1572            .and_then(|v| v.as_array())
1573            .ok_or_else(|| {
1574                errors.push(ParserError {
1575                    error_type: "validation_error".to_string(),
1576                    field: "schema".to_string(),
1577                    message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
1578                });
1579                anyhow::anyhow!("Missing schema")
1580            });
1581
1582        let schema = match schema {
1583            Ok(s) if s.is_empty() => {
1584                errors.push(ParserError {
1585                    error_type: "validation_error".to_string(),
1586                    field: "schema".to_string(),
1587                    message: "ODCS v3.0.x schema array is empty".to_string(),
1588                });
1589                let quality_rules = self.extract_quality_rules(data);
1590                let table_uuid = self.extract_table_uuid(data);
1591                let table = Table {
1592                    id: table_uuid,
1593                    name: table_name,
1594                    columns: Vec::new(),
1595                    database_type: None,
1596                    catalog_name: None,
1597                    schema_name: None,
1598                    medallion_layers: Vec::new(),
1599                    scd_pattern: None,
1600                    data_vault_classification: None,
1601                    modeling_level: None,
1602                    tags: Vec::<Tag>::new(),
1603                    odcl_metadata: HashMap::new(),
1604                    owner: None,
1605                    sla: None,
1606                    contact_details: None,
1607                    infrastructure_type: None,
1608                    notes: None,
1609                    position: None,
1610                    yaml_file_path: None,
1611                    drawio_cell_id: None,
1612                    quality: quality_rules,
1613                    errors: Vec::new(),
1614                    created_at: chrono::Utc::now(),
1615                    updated_at: chrono::Utc::now(),
1616                };
1617                return Ok((table, errors));
1618            }
1619            Ok(s) => s,
1620            Err(_) => {
1621                let quality_rules = self.extract_quality_rules(data);
1622                let table_uuid = self.extract_table_uuid(data);
1623                let table = Table {
1624                    id: table_uuid,
1625                    name: table_name,
1626                    columns: Vec::new(),
1627                    database_type: None,
1628                    catalog_name: None,
1629                    schema_name: None,
1630                    medallion_layers: Vec::new(),
1631                    scd_pattern: None,
1632                    data_vault_classification: None,
1633                    modeling_level: None,
1634                    tags: Vec::<Tag>::new(),
1635                    odcl_metadata: HashMap::new(),
1636                    owner: None,
1637                    sla: None,
1638                    contact_details: None,
1639                    infrastructure_type: None,
1640                    notes: None,
1641                    position: None,
1642                    yaml_file_path: None,
1643                    drawio_cell_id: None,
1644                    quality: quality_rules,
1645                    errors: Vec::new(),
1646                    created_at: chrono::Utc::now(),
1647                    updated_at: chrono::Utc::now(),
1648                };
1649                return Ok((table, errors));
1650            }
1651        };
1652
1653        // Get the first schema object (table)
1654        let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
1655            errors.push(ParserError {
1656                error_type: "validation_error".to_string(),
1657                field: "schema[0]".to_string(),
1658                message: "First schema object must be a dictionary".to_string(),
1659            });
1660            anyhow::anyhow!("Invalid schema object")
1661        })?;
1662
1663        let object_name = schema_object
1664            .get("name")
1665            .and_then(|v| v.as_str())
1666            .unwrap_or(&table_name);
1667
1668        // Handle both object format (v3.0.x) and array format (v3.1.0) for properties
1669        let mut columns = Vec::new();
1670
1671        if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
1672            // Object format (v3.0.x): properties is a map of name -> property object
1673            for (prop_name, prop_data) in properties_obj {
1674                if let Some(prop_obj) = prop_data.as_object() {
1675                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
1676                        Ok(mut cols) => columns.append(&mut cols),
1677                        Err(e) => {
1678                            errors.push(ParserError {
1679                                error_type: "property_parse_error".to_string(),
1680                                field: format!("Property '{}'", prop_name),
1681                                message: e.to_string(),
1682                            });
1683                        }
1684                    }
1685                } else {
1686                    errors.push(ParserError {
1687                        error_type: "validation_error".to_string(),
1688                        field: format!("Property '{}'", prop_name),
1689                        message: format!("Property '{}' must be an object", prop_name),
1690                    });
1691                }
1692            }
1693        } else if let Some(properties_arr) =
1694            schema_object.get("properties").and_then(|v| v.as_array())
1695        {
1696            // Array format (v3.1.0): properties is an array of property objects with 'name' field
1697            for (idx, prop_data) in properties_arr.iter().enumerate() {
1698                if let Some(prop_obj) = prop_data.as_object() {
1699                    // Extract name from property object (required in v3.1.0)
1700                    // ODCS v3.1.0 requires 'name' field, but we'll also accept 'id' as fallback
1701                    let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
1702                        Some(JsonValue::String(s)) => s.as_str(),
1703                        _ => {
1704                            // Skip properties without name or id (not valid ODCS v3.1.0)
1705                            errors.push(ParserError {
1706                                error_type: "validation_error".to_string(),
1707                                field: format!("Property[{}]", idx),
1708                                message: format!(
1709                                    "Property[{}] missing required 'name' or 'id' field",
1710                                    idx
1711                                ),
1712                            });
1713                            continue;
1714                        }
1715                    };
1716
1717                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
1718                        Ok(mut cols) => columns.append(&mut cols),
1719                        Err(e) => {
1720                            errors.push(ParserError {
1721                                error_type: "property_parse_error".to_string(),
1722                                field: format!("Property[{}] '{}'", idx, prop_name),
1723                                message: e.to_string(),
1724                            });
1725                        }
1726                    }
1727                } else {
1728                    errors.push(ParserError {
1729                        error_type: "validation_error".to_string(),
1730                        field: format!("Property[{}]", idx),
1731                        message: format!("Property[{}] must be an object", idx),
1732                    });
1733                }
1734            }
1735        } else {
1736            errors.push(ParserError {
1737                error_type: "validation_error".to_string(),
1738                field: format!("Object '{}'", object_name),
1739                message: format!(
1740                    "Object '{}' missing 'properties' field or properties is invalid",
1741                    object_name
1742                ),
1743            });
1744        }
1745
1746        // Extract metadata from customProperties
1747        let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
1748            _,
1749            _,
1750            _,
1751            Vec<Tag>,
1752        ) = self.extract_metadata_from_custom_properties(data);
1753
1754        // Extract sharedDomains from customProperties
1755        let mut shared_domains: Vec<String> = Vec::new();
1756        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1757            for prop in custom_props {
1758                if let Some(prop_obj) = prop.as_object() {
1759                    let prop_key = prop_obj
1760                        .get("property")
1761                        .and_then(|v| v.as_str())
1762                        .unwrap_or("");
1763                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1764                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1765                    {
1766                        for item in arr {
1767                            if let Some(s) = item.as_str() {
1768                                shared_domains.push(s.to_string());
1769                            }
1770                        }
1771                    }
1772                }
1773            }
1774        }
1775
1776        // Extract tags from top-level tags field (if not already extracted from customProperties)
1777        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1778            for item in tags_arr {
1779                if let Some(s) = item.as_str() {
1780                    // Parse tag string to Tag enum
1781                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1782                    if !tags.contains(&tag) {
1783                        tags.push(tag);
1784                    }
1785                }
1786            }
1787        }
1788
1789        // Extract database type from servers
1790        let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1791
1792        // Extract quality rules
1793        let quality_rules = self.extract_quality_rules(data);
1794
1795        // Build ODCL metadata
1796        let mut odcl_metadata = HashMap::new();
1797        odcl_metadata.insert(
1798            "apiVersion".to_string(),
1799            json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
1800        );
1801        odcl_metadata.insert(
1802            "kind".to_string(),
1803            json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
1804        );
1805        odcl_metadata.insert(
1806            "id".to_string(),
1807            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1808        );
1809        odcl_metadata.insert(
1810            "version".to_string(),
1811            json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
1812        );
1813        odcl_metadata.insert(
1814            "status".to_string(),
1815            json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
1816        );
1817
1818        // Extract servicelevels if present
1819        if let Some(servicelevels_val) = data.get("servicelevels") {
1820            odcl_metadata.insert(
1821                "servicelevels".to_string(),
1822                json_value_to_serde_value(servicelevels_val),
1823            );
1824        }
1825
1826        // Extract links if present
1827        if let Some(links_val) = data.get("links") {
1828            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1829        }
1830
1831        // Extract domain, dataProduct, tenant
1832        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1833            odcl_metadata.insert(
1834                "domain".to_string(),
1835                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1836            );
1837        }
1838        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1839            odcl_metadata.insert(
1840                "dataProduct".to_string(),
1841                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1842            );
1843        }
1844        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1845            odcl_metadata.insert(
1846                "tenant".to_string(),
1847                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1848            );
1849        }
1850
1851        // Extract top-level description (can be object or string)
1852        if let Some(desc_val) = data.get("description") {
1853            odcl_metadata.insert(
1854                "description".to_string(),
1855                json_value_to_serde_value(desc_val),
1856            );
1857        }
1858
1859        // Extract pricing
1860        if let Some(pricing_val) = data.get("pricing") {
1861            odcl_metadata.insert(
1862                "pricing".to_string(),
1863                json_value_to_serde_value(pricing_val),
1864            );
1865        }
1866
1867        // Extract team
1868        if let Some(team_val) = data.get("team") {
1869            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1870        }
1871
1872        // Extract roles
1873        if let Some(roles_val) = data.get("roles") {
1874            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1875        }
1876
1877        // Extract terms
1878        if let Some(terms_val) = data.get("terms") {
1879            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1880        }
1881
1882        // Extract full servers array (not just type)
1883        if let Some(servers_val) = data.get("servers") {
1884            odcl_metadata.insert(
1885                "servers".to_string(),
1886                json_value_to_serde_value(servers_val),
1887            );
1888        }
1889
1890        // Extract infrastructure
1891        if let Some(infrastructure_val) = data.get("infrastructure") {
1892            odcl_metadata.insert(
1893                "infrastructure".to_string(),
1894                json_value_to_serde_value(infrastructure_val),
1895            );
1896        }
1897
1898        // Store sharedDomains in metadata (extracted from customProperties above)
1899        if !shared_domains.is_empty() {
1900            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1901                .iter()
1902                .map(|d| serde_json::Value::String(d.clone()))
1903                .collect();
1904            odcl_metadata.insert(
1905                "sharedDomains".to_string(),
1906                serde_json::Value::Array(shared_domains_json),
1907            );
1908        }
1909
1910        let table_uuid = self.extract_table_uuid(data);
1911
1912        let table = Table {
1913            id: table_uuid,
1914            name: table_name,
1915            columns,
1916            database_type,
1917            catalog_name: None,
1918            schema_name: None,
1919            medallion_layers,
1920            scd_pattern,
1921            data_vault_classification,
1922            modeling_level: None,
1923            tags,
1924            odcl_metadata,
1925            owner: None,
1926            sla: None,
1927            contact_details: None,
1928            infrastructure_type: None,
1929            notes: None,
1930            position: None,
1931            yaml_file_path: None,
1932            drawio_cell_id: None,
1933            quality: quality_rules,
1934            errors: Vec::new(),
1935            created_at: chrono::Utc::now(),
1936            updated_at: chrono::Utc::now(),
1937        };
1938
1939        info!(
1940            "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1941            table.name,
1942            errors.len()
1943        );
1944        Ok((table, errors))
1945    }
1946
1947    /// Parse ALL tables from ODCS v3.0.x/v3.1.0 format schema array.
1948    ///
1949    /// Unlike `parse_odcl_v3` which only returns the first table, this method
1950    /// iterates over all schema objects and returns a vector of tables.
1951    ///
1952    /// # Returns
1953    ///
1954    /// Returns a vector of (Table, Vec<ParserError>) tuples, one for each schema object.
1955    fn parse_odcl_v3_all(&self, data: &JsonValue) -> Result<Vec<(Table, Vec<ParserError>)>> {
1956        let mut all_tables = Vec::new();
1957
1958        // Extract schema array - ODCS v3.0.x/v3.1.0 uses array of SchemaObject
1959        let schema = match data.get("schema").and_then(|v| v.as_array()) {
1960            Some(s) if !s.is_empty() => s,
1961            Some(_) => {
1962                // Empty schema array - return empty result
1963                return Ok(Vec::new());
1964            }
1965            None => {
1966                // No schema field - return empty result
1967                return Ok(Vec::new());
1968            }
1969        };
1970
1971        // Extract contract-level metadata (shared across all tables)
1972        let (medallion_layers, scd_pattern, data_vault_classification, contract_tags): (
1973            _,
1974            _,
1975            _,
1976            Vec<Tag>,
1977        ) = self.extract_metadata_from_custom_properties(data);
1978
1979        // Extract database type from servers (shared)
1980        let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1981
1982        // Extract quality rules (contract-level, shared)
1983        let contract_quality_rules = self.extract_quality_rules(data);
1984
1985        // Build base ODCL metadata (shared across all tables)
1986        let base_odcl_metadata = self.build_odcl_metadata(data);
1987
1988        // Parse each schema object as a separate table
1989        for (schema_idx, schema_value) in schema.iter().enumerate() {
1990            let mut errors = Vec::new();
1991
1992            let schema_object = match schema_value.as_object() {
1993                Some(obj) => obj,
1994                None => {
1995                    errors.push(ParserError {
1996                        error_type: "validation_error".to_string(),
1997                        field: format!("schema[{}]", schema_idx),
1998                        message: format!(
1999                            "Schema object at index {} must be a dictionary",
2000                            schema_idx
2001                        ),
2002                    });
2003                    continue;
2004                }
2005            };
2006
2007            // Get table name from schema object
2008            let table_name = match schema_object.get("name").and_then(|v| v.as_str()) {
2009                Some(name) => name.to_string(),
2010                None => {
2011                    errors.push(ParserError {
2012                        error_type: "validation_error".to_string(),
2013                        field: format!("schema[{}].name", schema_idx),
2014                        message: format!(
2015                            "Schema object at index {} missing 'name' field",
2016                            schema_idx
2017                        ),
2018                    });
2019                    continue;
2020                }
2021            };
2022
2023            // Parse columns from properties
2024            let columns = self.parse_schema_object_properties(schema_object, data, &mut errors);
2025
2026            // Extract table-specific tags (merge with contract tags)
2027            let mut tags = contract_tags.clone();
2028            if let Some(tags_arr) = schema_object.get("tags").and_then(|v| v.as_array()) {
2029                for item in tags_arr {
2030                    if let Some(s) = item.as_str() {
2031                        let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
2032                        if !tags.contains(&tag) {
2033                            tags.push(tag);
2034                        }
2035                    }
2036                }
2037            }
2038
2039            // Extract table-specific quality rules (merge with contract rules)
2040            let mut quality_rules = contract_quality_rules.clone();
2041            if let Some(quality_arr) = schema_object.get("quality").and_then(|v| v.as_array()) {
2042                for rule in quality_arr {
2043                    if let Some(rule_obj) = rule.as_object() {
2044                        let mut rule_map = HashMap::new();
2045                        for (k, v) in rule_obj {
2046                            rule_map.insert(k.clone(), json_value_to_serde_value(v));
2047                        }
2048                        quality_rules.push(rule_map);
2049                    }
2050                }
2051            }
2052
2053            // Generate UUID for this table
2054            // First check if schema object has its own id
2055            let table_uuid = if let Some(id_str) = schema_object.get("id").and_then(|v| v.as_str())
2056            {
2057                uuid::Uuid::parse_str(id_str)
2058                    .unwrap_or_else(|_| Table::generate_id(&table_name, None, None, None))
2059            } else {
2060                // For multi-table ODCS, generate deterministic UUID from table name
2061                Table::generate_id(&table_name, None, None, None)
2062            };
2063
2064            // Clone metadata for this table
2065            let mut odcl_metadata = base_odcl_metadata.clone();
2066
2067            // Add schema-object-specific metadata (ODCS v3.1.0 schema-level fields)
2068            if let Some(desc) = schema_object.get("description") {
2069                odcl_metadata.insert(
2070                    "schemaDescription".to_string(),
2071                    json_value_to_serde_value(desc),
2072                );
2073            }
2074
2075            // physicalName at schema object level
2076            if let Some(phys_name) = schema_object.get("physicalName").and_then(|v| v.as_str()) {
2077                odcl_metadata.insert(
2078                    "schemaPhysicalName".to_string(),
2079                    serde_json::Value::String(phys_name.to_string()),
2080                );
2081            }
2082
2083            // physicalType at schema object level
2084            if let Some(phys_type) = schema_object.get("physicalType").and_then(|v| v.as_str()) {
2085                odcl_metadata.insert(
2086                    "schemaPhysicalType".to_string(),
2087                    serde_json::Value::String(phys_type.to_string()),
2088                );
2089            }
2090
2091            // businessName at schema object level
2092            if let Some(biz_name) = schema_object.get("businessName").and_then(|v| v.as_str()) {
2093                odcl_metadata.insert(
2094                    "schemaBusinessName".to_string(),
2095                    serde_json::Value::String(biz_name.to_string()),
2096                );
2097            }
2098
2099            // dataGranularityDescription at schema object level
2100            if let Some(granularity) = schema_object
2101                .get("dataGranularityDescription")
2102                .and_then(|v| v.as_str())
2103            {
2104                odcl_metadata.insert(
2105                    "dataGranularityDescription".to_string(),
2106                    serde_json::Value::String(granularity.to_string()),
2107                );
2108            }
2109
2110            // authoritativeDefinitions at schema object level
2111            if let Some(auth_defs) = schema_object.get("authoritativeDefinitions") {
2112                odcl_metadata.insert(
2113                    "schemaAuthoritativeDefinitions".to_string(),
2114                    json_value_to_serde_value(auth_defs),
2115                );
2116            }
2117
2118            // relationships at schema object level
2119            if let Some(relationships) = schema_object.get("relationships") {
2120                odcl_metadata.insert(
2121                    "schemaRelationships".to_string(),
2122                    json_value_to_serde_value(relationships),
2123                );
2124            }
2125
2126            // customProperties at schema object level (ODCS v3.1.0)
2127            if let Some(custom_props) = schema_object.get("customProperties") {
2128                odcl_metadata.insert(
2129                    "schemaCustomProperties".to_string(),
2130                    json_value_to_serde_value(custom_props),
2131                );
2132            }
2133
2134            let table = Table {
2135                id: table_uuid,
2136                name: table_name.clone(),
2137                columns,
2138                database_type,
2139                catalog_name: None,
2140                schema_name: None,
2141                medallion_layers: medallion_layers.clone(),
2142                scd_pattern,
2143                data_vault_classification,
2144                modeling_level: None,
2145                tags,
2146                odcl_metadata,
2147                owner: None,
2148                sla: None,
2149                contact_details: None,
2150                infrastructure_type: None,
2151                notes: None,
2152                position: None,
2153                yaml_file_path: None,
2154                drawio_cell_id: None,
2155                quality: quality_rules,
2156                errors: Vec::new(),
2157                created_at: chrono::Utc::now(),
2158                updated_at: chrono::Utc::now(),
2159            };
2160
2161            info!(
2162                "Parsed ODCS v3 table {}/{}: {} with {} columns, {} warnings/errors",
2163                schema_idx + 1,
2164                schema.len(),
2165                table.name,
2166                table.columns.len(),
2167                errors.len()
2168            );
2169
2170            all_tables.push((table, errors));
2171        }
2172
2173        Ok(all_tables)
2174    }
2175
2176    /// Parse properties from a schema object into columns.
2177    fn parse_schema_object_properties(
2178        &self,
2179        schema_object: &serde_json::Map<String, JsonValue>,
2180        data: &JsonValue,
2181        errors: &mut Vec<ParserError>,
2182    ) -> Vec<Column> {
2183        let mut columns = Vec::new();
2184
2185        if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
2186            // Object format (v3.0.x): properties is a map of name -> property object
2187            for (prop_name, prop_data) in properties_obj {
2188                if let Some(prop_obj) = prop_data.as_object() {
2189                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
2190                        Ok(mut cols) => columns.append(&mut cols),
2191                        Err(e) => {
2192                            errors.push(ParserError {
2193                                error_type: "property_parse_error".to_string(),
2194                                field: format!("Property '{}'", prop_name),
2195                                message: e.to_string(),
2196                            });
2197                        }
2198                    }
2199                } else {
2200                    errors.push(ParserError {
2201                        error_type: "validation_error".to_string(),
2202                        field: format!("Property '{}'", prop_name),
2203                        message: format!("Property '{}' must be an object", prop_name),
2204                    });
2205                }
2206            }
2207        } else if let Some(properties_arr) =
2208            schema_object.get("properties").and_then(|v| v.as_array())
2209        {
2210            // Array format (v3.1.0): properties is an array of property objects with 'name' field
2211            for (idx, prop_data) in properties_arr.iter().enumerate() {
2212                if let Some(prop_obj) = prop_data.as_object() {
2213                    let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
2214                        Some(JsonValue::String(s)) => s.as_str(),
2215                        _ => {
2216                            errors.push(ParserError {
2217                                error_type: "validation_error".to_string(),
2218                                field: format!("Property[{}]", idx),
2219                                message: format!(
2220                                    "Property[{}] missing required 'name' or 'id' field",
2221                                    idx
2222                                ),
2223                            });
2224                            continue;
2225                        }
2226                    };
2227
2228                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
2229                        Ok(mut cols) => columns.append(&mut cols),
2230                        Err(e) => {
2231                            errors.push(ParserError {
2232                                error_type: "property_parse_error".to_string(),
2233                                field: format!("Property[{}] '{}'", idx, prop_name),
2234                                message: e.to_string(),
2235                            });
2236                        }
2237                    }
2238                } else {
2239                    errors.push(ParserError {
2240                        error_type: "validation_error".to_string(),
2241                        field: format!("Property[{}]", idx),
2242                        message: format!("Property[{}] must be an object", idx),
2243                    });
2244                }
2245            }
2246        }
2247
2248        columns
2249    }
2250
2251    /// Build base ODCL metadata from contract-level fields.
2252    fn build_odcl_metadata(&self, data: &JsonValue) -> HashMap<String, serde_json::Value> {
2253        let mut odcl_metadata = HashMap::new();
2254
2255        // Core identity fields
2256        odcl_metadata.insert(
2257            "apiVersion".to_string(),
2258            json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
2259        );
2260        odcl_metadata.insert(
2261            "kind".to_string(),
2262            json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
2263        );
2264        odcl_metadata.insert(
2265            "id".to_string(),
2266            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
2267        );
2268        odcl_metadata.insert(
2269            "version".to_string(),
2270            json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
2271        );
2272        odcl_metadata.insert(
2273            "status".to_string(),
2274            json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
2275        );
2276
2277        // Optional contract-level fields
2278        let optional_fields = [
2279            "servicelevels",
2280            "links",
2281            "domain",
2282            "dataProduct",
2283            "tenant",
2284            "description",
2285            "pricing",
2286            "team",
2287            "roles",
2288            "terms",
2289            "servers",
2290            "infrastructure",
2291        ];
2292
2293        for field in optional_fields {
2294            if let Some(val) = data.get(field) {
2295                odcl_metadata.insert(field.to_string(), json_value_to_serde_value(val));
2296            }
2297        }
2298
2299        // Extract sharedDomains from customProperties
2300        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2301            for prop in custom_props {
2302                if let Some(prop_obj) = prop.as_object() {
2303                    let prop_key = prop_obj
2304                        .get("property")
2305                        .and_then(|v| v.as_str())
2306                        .unwrap_or("");
2307                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
2308                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
2309                    {
2310                        let shared_domains: Vec<serde_json::Value> = arr
2311                            .iter()
2312                            .filter_map(|item| {
2313                                item.as_str()
2314                                    .map(|s| serde_json::Value::String(s.to_string()))
2315                            })
2316                            .collect();
2317                        if !shared_domains.is_empty() {
2318                            odcl_metadata.insert(
2319                                "sharedDomains".to_string(),
2320                                serde_json::Value::Array(shared_domains),
2321                            );
2322                        }
2323                    }
2324                }
2325            }
2326        }
2327
2328        odcl_metadata
2329    }
2330
2331    /// Parse a single property from ODCS v3.0.x format (similar to Data Contract field).
2332    fn parse_odcl_v3_property(
2333        &self,
2334        prop_name: &str,
2335        prop_data: &serde_json::Map<String, JsonValue>,
2336        data: &JsonValue,
2337    ) -> Result<Vec<Column>> {
2338        // Reuse Data Contract field parsing logic (they're similar)
2339        let mut errors = Vec::new();
2340        self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
2341    }
2342
2343    /// Extract table UUID from ODCS `id` field (standard) or fallback to customProperties/odcl_metadata (backward compatibility).
2344    /// Returns the UUID if found, otherwise generates a new one.
2345    fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
2346        // First check the top-level `id` field (ODCS spec: "A unique identifier used to reduce the risk of dataset name collisions, such as a UUID.")
2347        if let Some(id_val) = data.get("id")
2348            && let Some(id_str) = id_val.as_str()
2349        {
2350            if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2351                tracing::debug!(
2352                    "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
2353                    uuid
2354                );
2355                return uuid;
2356            } else {
2357                tracing::warn!(
2358                    "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
2359                    id_str
2360                );
2361            }
2362        }
2363
2364        // Backward compatibility: check customProperties for tableUuid (legacy format)
2365        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2366            for prop in custom_props {
2367                if let Some(prop_obj) = prop.as_object() {
2368                    let prop_key = prop_obj
2369                        .get("property")
2370                        .and_then(|v| v.as_str())
2371                        .unwrap_or("");
2372                    if prop_key == "tableUuid"
2373                        && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
2374                        && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
2375                    {
2376                        tracing::debug!(
2377                            "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
2378                            uuid
2379                        );
2380                        return uuid;
2381                    }
2382                }
2383            }
2384        }
2385
2386        // Fallback: check odcl_metadata if present (legacy format)
2387        if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
2388            && let Some(uuid_val) = metadata.get("tableUuid")
2389            && let Some(uuid_str) = uuid_val.as_str()
2390            && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
2391        {
2392            tracing::debug!(
2393                "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
2394                uuid
2395            );
2396            return uuid;
2397        }
2398
2399        // Generate deterministic UUID v5 if not found (based on table name)
2400        let table_name = data
2401            .get("name")
2402            .and_then(|v| v.as_str())
2403            .unwrap_or("unknown");
2404        let new_uuid = crate::models::table::Table::generate_id(
2405            table_name, None, // database_type not available here
2406            None, // catalog_name not available here
2407            None, // schema_name not available here
2408        );
2409        tracing::warn!(
2410            "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
2411            table_name,
2412            new_uuid
2413        );
2414        new_uuid
2415    }
2416
2417    /// Extract metadata from customProperties in ODCS v3.0.x format.
2418    fn extract_metadata_from_custom_properties(
2419        &self,
2420        data: &JsonValue,
2421    ) -> (
2422        Vec<MedallionLayer>,
2423        Option<SCDPattern>,
2424        Option<DataVaultClassification>,
2425        Vec<Tag>,
2426    ) {
2427        let mut medallion_layers = Vec::new();
2428        let mut scd_pattern = None;
2429        let mut data_vault_classification = None;
2430        let mut tags: Vec<Tag> = Vec::new();
2431
2432        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2433            for prop in custom_props {
2434                if let Some(prop_obj) = prop.as_object() {
2435                    let prop_key = prop_obj
2436                        .get("property")
2437                        .and_then(|v| v.as_str())
2438                        .unwrap_or("");
2439                    let prop_value = prop_obj.get("value");
2440
2441                    match prop_key {
2442                        "medallionLayers" | "medallion_layers" => {
2443                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
2444                                for item in arr {
2445                                    if let Some(s) = item.as_str()
2446                                        && let Ok(layer) = parse_medallion_layer(s)
2447                                    {
2448                                        medallion_layers.push(layer);
2449                                    }
2450                                }
2451                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2452                                // Comma-separated string
2453                                for part in s.split(',') {
2454                                    if let Ok(layer) = parse_medallion_layer(part.trim()) {
2455                                        medallion_layers.push(layer);
2456                                    }
2457                                }
2458                            }
2459                        }
2460                        "scdPattern" | "scd_pattern" => {
2461                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2462                                scd_pattern = parse_scd_pattern(s).ok();
2463                            }
2464                        }
2465                        "dataVaultClassification" | "data_vault_classification" => {
2466                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2467                                data_vault_classification = parse_data_vault_classification(s).ok();
2468                            }
2469                        }
2470                        "tags" => {
2471                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
2472                                for item in arr {
2473                                    if let Some(s) = item.as_str() {
2474                                        // Parse tag string to Tag enum
2475                                        if let Ok(tag) = Tag::from_str(s) {
2476                                            tags.push(tag);
2477                                        } else {
2478                                            tags.push(Tag::Simple(s.to_string()));
2479                                        }
2480                                    }
2481                                }
2482                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2483                                // Comma-separated string
2484                                for part in s.split(',') {
2485                                    let part = part.trim();
2486                                    if let Ok(tag) = Tag::from_str(part) {
2487                                        tags.push(tag);
2488                                    } else {
2489                                        tags.push(Tag::Simple(part.to_string()));
2490                                    }
2491                                }
2492                            }
2493                        }
2494                        "sharedDomains" | "shared_domains" => {
2495                            // sharedDomains will be stored in metadata by the caller
2496                            // This match is here for completeness but sharedDomains is handled separately
2497                        }
2498                        _ => {}
2499                    }
2500                }
2501            }
2502        }
2503
2504        // Also extract tags from top-level tags field
2505        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
2506            for item in tags_arr {
2507                if let Some(s) = item.as_str() {
2508                    // Parse tag string to Tag enum
2509                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
2510                    if !tags.contains(&tag) {
2511                        tags.push(tag);
2512                    }
2513                }
2514            }
2515        }
2516
2517        (
2518            medallion_layers,
2519            scd_pattern,
2520            data_vault_classification,
2521            tags,
2522        )
2523    }
2524
2525    /// Extract database type from servers in ODCS v3.0.x format.
2526    fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2527        // ODCS v3.0.x: servers is an array of Server objects
2528        if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
2529            && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
2530        {
2531            return server_obj
2532                .get("type")
2533                .and_then(|v| v.as_str())
2534                .and_then(|s| self.parse_database_type(s));
2535        }
2536        None
2537    }
2538
2539    /// Parse Data Contract format.
2540    fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
2541        let mut errors = Vec::new();
2542
2543        // Extract models
2544        let models = data
2545            .get("models")
2546            .and_then(|v| v.as_object())
2547            .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
2548
2549        // parse_table() returns a single Table, so we parse the first model.
2550        // If multiple models are needed, call parse_table() multiple times or use import().
2551        let (model_name, model_data) = models
2552            .iter()
2553            .next()
2554            .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
2555
2556        let model_data = model_data
2557            .as_object()
2558            .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
2559
2560        // Extract fields (columns)
2561        let fields = model_data
2562            .get("fields")
2563            .and_then(|v| v.as_object())
2564            .ok_or_else(|| {
2565                errors.push(ParserError {
2566                    error_type: "validation_error".to_string(),
2567                    field: format!("Model '{}'", model_name),
2568                    message: format!("Model '{}' missing 'fields' field", model_name),
2569                });
2570                anyhow::anyhow!("Missing fields")
2571            });
2572
2573        let fields = match fields {
2574            Ok(f) => f,
2575            Err(_) => {
2576                // Return empty table with errors
2577                let quality_rules = self.extract_quality_rules(data);
2578                let table_uuid = self.extract_table_uuid(data);
2579                let table = Table {
2580                    id: table_uuid,
2581                    name: model_name.clone(),
2582                    columns: Vec::new(),
2583                    database_type: None,
2584                    catalog_name: None,
2585                    schema_name: None,
2586                    medallion_layers: Vec::new(),
2587                    scd_pattern: None,
2588                    data_vault_classification: None,
2589                    modeling_level: None,
2590                    tags: Vec::<Tag>::new(),
2591                    odcl_metadata: HashMap::new(),
2592                    owner: None,
2593                    sla: None,
2594                    contact_details: None,
2595                    infrastructure_type: None,
2596                    notes: None,
2597                    position: None,
2598                    yaml_file_path: None,
2599                    drawio_cell_id: None,
2600                    quality: quality_rules,
2601                    errors: Vec::new(),
2602                    created_at: chrono::Utc::now(),
2603                    updated_at: chrono::Utc::now(),
2604                };
2605                return Ok((table, errors));
2606            }
2607        };
2608
2609        // Parse fields as columns
2610        let mut columns = Vec::new();
2611        for (field_name, field_data) in fields {
2612            if let Some(field_obj) = field_data.as_object() {
2613                match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
2614                    Ok(mut cols) => columns.append(&mut cols),
2615                    Err(e) => {
2616                        errors.push(ParserError {
2617                            error_type: "field_parse_error".to_string(),
2618                            field: format!("Field '{}'", field_name),
2619                            message: e.to_string(),
2620                        });
2621                    }
2622                }
2623            } else {
2624                errors.push(ParserError {
2625                    error_type: "validation_error".to_string(),
2626                    field: format!("Field '{}'", field_name),
2627                    message: format!("Field '{}' must be an object", field_name),
2628                });
2629            }
2630        }
2631
2632        // Extract metadata from info section
2633        // The frontend expects info fields to be nested under "info" key
2634        let mut odcl_metadata = HashMap::new();
2635
2636        // Extract info section and nest it properly
2637        // Convert JsonValue object to serde_json::Value::Object (Map)
2638        if let Some(info_val) = data.get("info") {
2639            // Convert JsonValue to serde_json::Value
2640            let info_json_value = json_value_to_serde_value(info_val);
2641            odcl_metadata.insert("info".to_string(), info_json_value);
2642        }
2643
2644        odcl_metadata.insert(
2645            "dataContractSpecification".to_string(),
2646            json_value_to_serde_value(
2647                data.get("dataContractSpecification")
2648                    .unwrap_or(&JsonValue::Null),
2649            ),
2650        );
2651        odcl_metadata.insert(
2652            "id".to_string(),
2653            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
2654        );
2655        // Note: model_name is not added to metadata as it's redundant (it's the table name)
2656
2657        // Extract servicelevels if present
2658        if let Some(servicelevels_val) = data.get("servicelevels") {
2659            odcl_metadata.insert(
2660                "servicelevels".to_string(),
2661                json_value_to_serde_value(servicelevels_val),
2662            );
2663        }
2664
2665        // Extract links if present
2666        if let Some(links_val) = data.get("links") {
2667            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
2668        }
2669
2670        // Extract domain, dataProduct, tenant
2671        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
2672            odcl_metadata.insert(
2673                "domain".to_string(),
2674                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
2675            );
2676        }
2677        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
2678            odcl_metadata.insert(
2679                "dataProduct".to_string(),
2680                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
2681            );
2682        }
2683        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
2684            odcl_metadata.insert(
2685                "tenant".to_string(),
2686                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
2687            );
2688        }
2689
2690        // Extract top-level description (can be object or string)
2691        if let Some(desc_val) = data.get("description") {
2692            odcl_metadata.insert(
2693                "description".to_string(),
2694                json_value_to_serde_value(desc_val),
2695            );
2696        }
2697
2698        // Extract pricing
2699        if let Some(pricing_val) = data.get("pricing") {
2700            odcl_metadata.insert(
2701                "pricing".to_string(),
2702                json_value_to_serde_value(pricing_val),
2703            );
2704        }
2705
2706        // Extract team
2707        if let Some(team_val) = data.get("team") {
2708            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
2709        }
2710
2711        // Extract roles
2712        if let Some(roles_val) = data.get("roles") {
2713            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
2714        }
2715
2716        // Extract terms
2717        if let Some(terms_val) = data.get("terms") {
2718            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
2719        }
2720
2721        // Extract full servers array (not just type)
2722        if let Some(servers_val) = data.get("servers") {
2723            odcl_metadata.insert(
2724                "servers".to_string(),
2725                json_value_to_serde_value(servers_val),
2726            );
2727        }
2728
2729        // Extract infrastructure
2730        if let Some(infrastructure_val) = data.get("infrastructure") {
2731            odcl_metadata.insert(
2732                "infrastructure".to_string(),
2733                json_value_to_serde_value(infrastructure_val),
2734            );
2735        }
2736
2737        // Extract database type from servers if available
2738        let database_type = self.extract_database_type_from_servers(data);
2739
2740        // Extract catalog and schema from customProperties
2741        let (catalog_name, schema_name) = self.extract_catalog_schema(data);
2742
2743        // Extract sharedDomains from customProperties
2744        let mut shared_domains: Vec<String> = Vec::new();
2745        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2746            for prop in custom_props {
2747                if let Some(prop_obj) = prop.as_object() {
2748                    let prop_key = prop_obj
2749                        .get("property")
2750                        .and_then(|v| v.as_str())
2751                        .unwrap_or("");
2752                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
2753                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
2754                    {
2755                        for item in arr {
2756                            if let Some(s) = item.as_str() {
2757                                shared_domains.push(s.to_string());
2758                            }
2759                        }
2760                    }
2761                }
2762            }
2763        }
2764
2765        // Extract tags from top-level tags field (Data Contract format)
2766        let mut tags: Vec<Tag> = Vec::new();
2767        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
2768            for item in tags_arr {
2769                if let Some(s) = item.as_str() {
2770                    // Parse tag string to Tag enum (supports Simple, Pair, List formats)
2771                    if let Ok(tag) = Tag::from_str(s) {
2772                        tags.push(tag);
2773                    } else {
2774                        // Fallback: create Simple tag if parsing fails
2775                        tags.push(crate::models::Tag::Simple(s.to_string()));
2776                    }
2777                }
2778            }
2779        }
2780
2781        // Extract quality rules
2782        let quality_rules = self.extract_quality_rules(data);
2783
2784        // Store sharedDomains in metadata
2785        if !shared_domains.is_empty() {
2786            let shared_domains_json: Vec<serde_json::Value> = shared_domains
2787                .iter()
2788                .map(|d| serde_json::Value::String(d.clone()))
2789                .collect();
2790            odcl_metadata.insert(
2791                "sharedDomains".to_string(),
2792                serde_json::Value::Array(shared_domains_json),
2793            );
2794        }
2795
2796        let table_uuid = self.extract_table_uuid(data);
2797
2798        let table = Table {
2799            id: table_uuid,
2800            name: model_name.clone(),
2801            columns,
2802            database_type,
2803            catalog_name,
2804            schema_name,
2805            medallion_layers: Vec::new(),
2806            scd_pattern: None,
2807            data_vault_classification: None,
2808            modeling_level: None,
2809            tags,
2810            odcl_metadata,
2811            owner: None,
2812            sla: None,
2813            contact_details: None,
2814            infrastructure_type: None,
2815            notes: None,
2816            position: None,
2817            yaml_file_path: None,
2818            drawio_cell_id: None,
2819            quality: quality_rules,
2820            errors: Vec::new(),
2821            created_at: chrono::Utc::now(),
2822            updated_at: chrono::Utc::now(),
2823        };
2824
2825        info!(
2826            "Parsed Data Contract table: {} with {} warnings/errors",
2827            model_name,
2828            errors.len()
2829        );
2830        Ok((table, errors))
2831    }
2832
2833    /// Parse a single field from Data Contract format.
2834    fn parse_data_contract_field(
2835        &self,
2836        field_name: &str,
2837        field_data: &serde_json::Map<String, JsonValue>,
2838        data: &JsonValue,
2839        errors: &mut Vec<ParserError>,
2840    ) -> Result<Vec<Column>> {
2841        let mut columns = Vec::new();
2842
2843        // Helper function to extract quality rules from a JSON object
2844        let extract_quality_from_obj =
2845            |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
2846                let mut quality_rules = Vec::new();
2847                if let Some(quality_val) = obj.get("quality") {
2848                    if let Some(arr) = quality_val.as_array() {
2849                        // Array of quality rules
2850                        for item in arr {
2851                            if let Some(rule_obj) = item.as_object() {
2852                                let mut rule = HashMap::new();
2853                                for (key, value) in rule_obj {
2854                                    rule.insert(key.clone(), json_value_to_serde_value(value));
2855                                }
2856                                quality_rules.push(rule);
2857                            }
2858                        }
2859                    } else if let Some(rule_obj) = quality_val.as_object() {
2860                        // Single quality rule object
2861                        let mut rule = HashMap::new();
2862                        for (key, value) in rule_obj {
2863                            rule.insert(key.clone(), json_value_to_serde_value(value));
2864                        }
2865                        quality_rules.push(rule);
2866                    }
2867                }
2868                quality_rules
2869            };
2870
2871        // Extract description from field_data (preserve empty strings)
2872        let description = field_data
2873            .get("description")
2874            .and_then(|v| v.as_str())
2875            .unwrap_or("")
2876            .to_string();
2877
2878        // Extract quality rules from field_data
2879        let mut quality_rules = extract_quality_from_obj(field_data);
2880
2881        // Check for $ref
2882        if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
2883            // Store ref_path (preserve even if definition doesn't exist)
2884            let ref_path = Some(ref_str.to_string());
2885
2886            if let Some(definition) = resolve_ref(ref_str, data) {
2887                // Merge quality rules from definition if field doesn't have any
2888
2889                // Also extract quality rules from definition and merge (if field doesn't have any)
2890                if quality_rules.is_empty() {
2891                    if let Some(def_obj) = definition.as_object() {
2892                        quality_rules = extract_quality_from_obj(def_obj);
2893                    }
2894                } else {
2895                    // Merge definition quality rules if field has some
2896                    if let Some(def_obj) = definition.as_object() {
2897                        let def_quality = extract_quality_from_obj(def_obj);
2898                        // Append definition quality rules (field-level takes precedence)
2899                        quality_rules.extend(def_quality);
2900                    }
2901                }
2902
2903                let required = field_data
2904                    .get("required")
2905                    .and_then(|v| v.as_bool())
2906                    .unwrap_or(false);
2907
2908                // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
2909                // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
2910                // The `required` field will be set based on the `required` parameter during column creation.
2911
2912                // Check if definition is an object/struct with nested structure
2913                let has_nested = definition
2914                    .get("type")
2915                    .and_then(|v| v.as_str())
2916                    .map(|s| s == "object")
2917                    .unwrap_or(false)
2918                    || definition.get("properties").is_some()
2919                    || definition.get("fields").is_some();
2920
2921                if has_nested {
2922                    // Expand STRUCT from definition into nested columns with dot notation
2923                    if let Some(properties) =
2924                        definition.get("properties").and_then(|v| v.as_object())
2925                    {
2926                        // Recursively expand nested properties
2927                        let nested_required: Vec<String> = definition
2928                            .get("required")
2929                            .and_then(|v| v.as_array())
2930                            .map(|arr| {
2931                                arr.iter()
2932                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2933                                    .collect()
2934                            })
2935                            .unwrap_or_default();
2936
2937                        for (nested_name, nested_schema) in properties {
2938                            let nested_required_field = nested_required.contains(nested_name);
2939                            expand_nested_column(
2940                                &format!("{}.{}", field_name, nested_name),
2941                                nested_schema,
2942                                !nested_required_field,
2943                                &mut columns,
2944                                errors,
2945                            );
2946                        }
2947                    } else if let Some(fields) =
2948                        definition.get("fields").and_then(|v| v.as_object())
2949                    {
2950                        // Handle fields format (ODCL style)
2951                        for (nested_name, nested_schema) in fields {
2952                            expand_nested_column(
2953                                &format!("{}.{}", field_name, nested_name),
2954                                nested_schema,
2955                                true, // Assume nullable if not specified
2956                                &mut columns,
2957                                errors,
2958                            );
2959                        }
2960                    } else {
2961                        // Fallback: create parent column as OBJECT if we can't expand
2962                        let def_physical_type = definition
2963                            .get("physicalType")
2964                            .and_then(|v| v.as_str())
2965                            .map(|s| s.to_string());
2966                        columns.push(Column {
2967                            name: field_name.to_string(),
2968                            data_type: "OBJECT".to_string(),
2969                            physical_type: def_physical_type,
2970                            nullable: !required,
2971                            description: if description.is_empty() {
2972                                definition
2973                                    .get("description")
2974                                    .and_then(|v| v.as_str())
2975                                    .unwrap_or("")
2976                                    .to_string()
2977                            } else {
2978                                description.clone()
2979                            },
2980                            quality: quality_rules.clone(),
2981                            relationships: ref_to_relationships(&ref_path),
2982                            ..Default::default()
2983                        });
2984                    }
2985                } else {
2986                    // Simple type from definition
2987                    let def_type = definition
2988                        .get("type")
2989                        .and_then(|v| v.as_str())
2990                        .unwrap_or("STRING")
2991                        .to_uppercase();
2992
2993                    let enum_values = definition
2994                        .get("enum")
2995                        .and_then(|v| v.as_array())
2996                        .map(|arr| {
2997                            arr.iter()
2998                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
2999                                .collect()
3000                        })
3001                        .unwrap_or_default();
3002
3003                    let def_physical_type = definition
3004                        .get("physicalType")
3005                        .and_then(|v| v.as_str())
3006                        .map(|s| s.to_string());
3007
3008                    columns.push(Column {
3009                        name: field_name.to_string(),
3010                        data_type: def_type,
3011                        physical_type: def_physical_type,
3012                        nullable: !required,
3013                        description: if description.is_empty() {
3014                            definition
3015                                .get("description")
3016                                .and_then(|v| v.as_str())
3017                                .unwrap_or("")
3018                                .to_string()
3019                        } else {
3020                            description
3021                        },
3022                        quality: quality_rules,
3023                        relationships: ref_to_relationships(&ref_path),
3024                        enum_values,
3025                        ..Default::default()
3026                    });
3027                }
3028                return Ok(columns);
3029            } else {
3030                // Undefined reference - create column with error
3031                let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
3032                let mut error_map = HashMap::new();
3033                error_map.insert("type".to_string(), serde_json::json!("validation_error"));
3034                error_map.insert("field".to_string(), serde_json::json!("data_type"));
3035                error_map.insert(
3036                    "message".to_string(),
3037                    serde_json::json!(format!(
3038                        "Field '{}' references undefined definition: {}",
3039                        field_name, ref_str
3040                    )),
3041                );
3042                col_errors.push(error_map);
3043                let field_physical_type = field_data
3044                    .get("physicalType")
3045                    .and_then(|v| v.as_str())
3046                    .map(|s| s.to_string());
3047                columns.push(Column {
3048                    name: field_name.to_string(),
3049                    data_type: "OBJECT".to_string(),
3050                    physical_type: field_physical_type,
3051                    description,
3052                    errors: col_errors,
3053                    relationships: ref_to_relationships(&Some(ref_str.to_string())),
3054                    ..Default::default()
3055                });
3056                return Ok(columns);
3057            }
3058        }
3059
3060        // Extract field type - check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
3061        // Default to STRING if missing
3062        let field_type_str = field_data
3063            .get("logicalType")
3064            .and_then(|v| v.as_str())
3065            .or_else(|| field_data.get("type").and_then(|v| v.as_str()))
3066            .unwrap_or("STRING");
3067
3068        // Check if type contains STRUCT definition (multiline STRUCT type)
3069        if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
3070            match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
3071                Ok(nested_cols) if !nested_cols.is_empty() => {
3072                    // We have nested columns - add parent column with full type, then nested columns
3073                    let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
3074                        "ARRAY<STRUCT<...>>".to_string()
3075                    } else {
3076                        "STRUCT<...>".to_string()
3077                    };
3078
3079                    let struct_physical_type = field_data
3080                        .get("physicalType")
3081                        .and_then(|v| v.as_str())
3082                        .map(|s| s.to_string());
3083
3084                    // Add parent column
3085                    columns.push(Column {
3086                        name: field_name.to_string(),
3087                        data_type: parent_data_type,
3088                        physical_type: struct_physical_type,
3089                        nullable: !field_data
3090                            .get("required")
3091                            .and_then(|v| v.as_bool())
3092                            .unwrap_or(false),
3093                        description: description.clone(),
3094                        quality: quality_rules.clone(),
3095                        relationships: ref_to_relationships(
3096                            &field_data
3097                                .get("$ref")
3098                                .and_then(|v| v.as_str())
3099                                .map(|s| s.to_string()),
3100                        ),
3101                        ..Default::default()
3102                    });
3103
3104                    // Add nested columns
3105                    columns.extend(nested_cols);
3106                    return Ok(columns);
3107                }
3108                Ok(_) | Err(_) => {
3109                    // If parsing fails or returns empty, fall back to using the type as-is
3110                }
3111            }
3112        }
3113
3114        let field_type = normalize_data_type(field_type_str);
3115
3116        // Handle ARRAY type
3117        if field_type == "ARRAY" {
3118            let items = field_data.get("items");
3119            if let Some(items_val) = items {
3120                if let Some(items_obj) = items_val.as_object() {
3121                    // Check if items is an object with fields (nested structure)
3122                    // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy) for backward compatibility
3123                    let items_type = items_obj
3124                        .get("logicalType")
3125                        .and_then(|v| v.as_str())
3126                        .or_else(|| items_obj.get("type").and_then(|v| v.as_str()));
3127
3128                    // Normalize legacy "type" values to "logicalType" equivalents
3129                    let normalized_items_type = match items_type {
3130                        Some("object") | Some("struct") => Some("object"),
3131                        Some("array") => Some("array"),
3132                        Some("string") | Some("varchar") | Some("char") | Some("text") => {
3133                            Some("string")
3134                        }
3135                        Some("integer") | Some("int") | Some("bigint") | Some("smallint")
3136                        | Some("tinyint") => Some("integer"),
3137                        Some("number") | Some("decimal") | Some("double") | Some("float")
3138                        | Some("numeric") => Some("number"),
3139                        Some("boolean") | Some("bool") => Some("boolean"),
3140                        Some("date") => Some("date"),
3141                        Some("timestamp") | Some("datetime") => Some("timestamp"),
3142                        Some("time") => Some("time"),
3143                        other => other,
3144                    };
3145
3146                    if items_obj.get("fields").is_some()
3147                        || items_obj.get("properties").is_some()
3148                        || normalized_items_type == Some("object")
3149                    {
3150                        // Array of objects - create parent column as ARRAY<OBJECT>
3151                        let array_physical_type = field_data
3152                            .get("physicalType")
3153                            .and_then(|v| v.as_str())
3154                            .map(|s| s.to_string());
3155                        columns.push(Column {
3156                            name: field_name.to_string(),
3157                            data_type: "ARRAY<OBJECT>".to_string(),
3158                            physical_type: array_physical_type,
3159                            nullable: !field_data
3160                                .get("required")
3161                                .and_then(|v| v.as_bool())
3162                                .unwrap_or(false),
3163                            description: field_data
3164                                .get("description")
3165                                .and_then(|v| v.as_str())
3166                                .unwrap_or("")
3167                                .to_string(),
3168                            ..Default::default()
3169                        });
3170
3171                        // Extract nested fields from items.properties or items.fields if present
3172                        // Handle both object format (legacy) and array format (ODCS v3.1.0)
3173                        let properties_obj =
3174                            items_obj.get("properties").and_then(|v| v.as_object());
3175                        let properties_arr = items_obj.get("properties").and_then(|v| v.as_array());
3176                        let fields_obj = items_obj.get("fields").and_then(|v| v.as_object());
3177
3178                        if let Some(fields_map) = properties_obj.or(fields_obj) {
3179                            // Object format (legacy): properties is a map
3180                            for (nested_field_name, nested_field_data) in fields_map {
3181                                if let Some(nested_field_obj) = nested_field_data.as_object() {
3182                                    // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
3183                                    let nested_field_type = nested_field_obj
3184                                        .get("logicalType")
3185                                        .and_then(|v| v.as_str())
3186                                        .or_else(|| {
3187                                            nested_field_obj.get("type").and_then(|v| v.as_str())
3188                                        })
3189                                        .unwrap_or("STRING");
3190
3191                                    // Recursively parse nested fields with array prefix
3192                                    let nested_col_name =
3193                                        format!("{}.[].{}", field_name, nested_field_name);
3194                                    let mut local_errors = Vec::new();
3195                                    match self.parse_data_contract_field(
3196                                        &nested_col_name,
3197                                        nested_field_obj,
3198                                        data,
3199                                        &mut local_errors,
3200                                    ) {
3201                                        Ok(mut nested_cols) => {
3202                                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
3203                                            // Otherwise, it returns a single column
3204                                            columns.append(&mut nested_cols);
3205                                        }
3206                                        Err(_) => {
3207                                            // Fallback: create simple nested column
3208                                            let nested_physical_type = nested_field_obj
3209                                                .get("physicalType")
3210                                                .and_then(|v| v.as_str())
3211                                                .map(|s| s.to_string());
3212                                            columns.push(Column {
3213                                                name: nested_col_name,
3214                                                data_type: nested_field_type.to_uppercase(),
3215                                                physical_type: nested_physical_type,
3216                                                nullable: !nested_field_obj
3217                                                    .get("required")
3218                                                    .and_then(|v| v.as_bool())
3219                                                    .unwrap_or(false),
3220                                                description: nested_field_obj
3221                                                    .get("description")
3222                                                    .and_then(|v| v.as_str())
3223                                                    .unwrap_or("")
3224                                                    .to_string(),
3225                                                ..Default::default()
3226                                            });
3227                                        }
3228                                    }
3229                                }
3230                            }
3231                        } else if let Some(properties_list) = properties_arr {
3232                            // Array format (ODCS v3.1.0): properties is an array with 'name' field
3233                            let mut local_errors = Vec::new();
3234                            for prop_data in properties_list {
3235                                if let Some(prop_obj) = prop_data.as_object() {
3236                                    // Extract name from property object (required in v3.1.0)
3237                                    let nested_field_name = prop_obj
3238                                        .get("name")
3239                                        .or_else(|| prop_obj.get("id"))
3240                                        .and_then(|v| v.as_str())
3241                                        .unwrap_or("");
3242
3243                                    if !nested_field_name.is_empty() {
3244                                        // Recursively parse nested fields with array prefix
3245                                        let nested_col_name =
3246                                            format!("{}.[].{}", field_name, nested_field_name);
3247                                        match self.parse_data_contract_field(
3248                                            &nested_col_name,
3249                                            prop_obj,
3250                                            data,
3251                                            &mut local_errors,
3252                                        ) {
3253                                            Ok(mut nested_cols) => {
3254                                                // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
3255                                                // Otherwise, it returns a single column
3256                                                columns.append(&mut nested_cols);
3257                                            }
3258                                            Err(_) => {
3259                                                // Fallback: create simple nested column
3260                                                // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
3261                                                let nested_field_type = prop_obj
3262                                                    .get("logicalType")
3263                                                    .and_then(|v| v.as_str())
3264                                                    .or_else(|| {
3265                                                        prop_obj
3266                                                            .get("type")
3267                                                            .and_then(|v| v.as_str())
3268                                                    })
3269                                                    .unwrap_or("STRING");
3270                                                let nested_physical_type = prop_obj
3271                                                    .get("physicalType")
3272                                                    .and_then(|v| v.as_str())
3273                                                    .map(|s| s.to_string());
3274                                                columns.push(Column {
3275                                                    name: nested_col_name,
3276                                                    data_type: nested_field_type.to_uppercase(),
3277                                                    physical_type: nested_physical_type,
3278                                                    nullable: !prop_obj
3279                                                        .get("required")
3280                                                        .and_then(|v| v.as_bool())
3281                                                        .unwrap_or(false),
3282                                                    description: prop_obj
3283                                                        .get("description")
3284                                                        .and_then(|v| v.as_str())
3285                                                        .unwrap_or("")
3286                                                        .to_string(),
3287                                                    ..Default::default()
3288                                                });
3289                                            }
3290                                        }
3291                                    }
3292                                }
3293                            }
3294                        }
3295
3296                        return Ok(columns);
3297                    } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
3298                        // Array of simple type
3299                        let array_physical_type = field_data
3300                            .get("physicalType")
3301                            .and_then(|v| v.as_str())
3302                            .map(|s| s.to_string());
3303                        columns.push(Column {
3304                            name: field_name.to_string(),
3305                            data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
3306                            physical_type: array_physical_type,
3307                            nullable: !field_data
3308                                .get("required")
3309                                .and_then(|v| v.as_bool())
3310                                .unwrap_or(false),
3311                            description: description.clone(),
3312                            quality: quality_rules.clone(),
3313                            relationships: ref_to_relationships(
3314                                &field_data
3315                                    .get("$ref")
3316                                    .and_then(|v| v.as_str())
3317                                    .map(|s| s.to_string()),
3318                            ),
3319                            ..Default::default()
3320                        });
3321                        return Ok(columns);
3322                    }
3323                } else if let Some(item_type_str) = items_val.as_str() {
3324                    // Array of simple type (string)
3325                    let array_physical_type = field_data
3326                        .get("physicalType")
3327                        .and_then(|v| v.as_str())
3328                        .map(|s| s.to_string());
3329                    columns.push(Column {
3330                        name: field_name.to_string(),
3331                        data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
3332                        physical_type: array_physical_type,
3333                        nullable: !field_data
3334                            .get("required")
3335                            .and_then(|v| v.as_bool())
3336                            .unwrap_or(false),
3337                        description: description.clone(),
3338                        quality: quality_rules.clone(),
3339                        relationships: ref_to_relationships(
3340                            &field_data
3341                                .get("$ref")
3342                                .and_then(|v| v.as_str())
3343                                .map(|s| s.to_string()),
3344                        ),
3345                        ..Default::default()
3346                    });
3347                    return Ok(columns);
3348                }
3349            }
3350            // Array without items - default to ARRAY<STRING>
3351            let array_physical_type = field_data
3352                .get("physicalType")
3353                .and_then(|v| v.as_str())
3354                .map(|s| s.to_string());
3355            columns.push(Column {
3356                name: field_name.to_string(),
3357                data_type: "ARRAY<STRING>".to_string(),
3358                physical_type: array_physical_type,
3359                nullable: !field_data
3360                    .get("required")
3361                    .and_then(|v| v.as_bool())
3362                    .unwrap_or(false),
3363                description: description.clone(),
3364                quality: quality_rules.clone(),
3365                relationships: ref_to_relationships(
3366                    &field_data
3367                        .get("$ref")
3368                        .and_then(|v| v.as_str())
3369                        .map(|s| s.to_string()),
3370                ),
3371                ..Default::default()
3372            });
3373            return Ok(columns);
3374        }
3375
3376        // Check if this is a nested object with fields or properties
3377        // Note: Export saves nested columns in properties.properties, but some formats use fields
3378        // Handle both object format (legacy/ODCL) and array format (ODCS v3.1.0)
3379        let nested_fields_obj = field_data
3380            .get("properties")
3381            .and_then(|v| v.as_object())
3382            .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
3383        let nested_fields_arr = field_data.get("properties").and_then(|v| v.as_array());
3384
3385        if field_type == "OBJECT" && (nested_fields_obj.is_some() || nested_fields_arr.is_some()) {
3386            // Inline nested object - create parent column as OBJECT and extract nested fields
3387
3388            // Extract physicalType for the parent OBJECT column
3389            let parent_physical_type = field_data
3390                .get("physicalType")
3391                .and_then(|v| v.as_str())
3392                .map(|s| s.to_string());
3393
3394            // Create parent column
3395            columns.push(Column {
3396                name: field_name.to_string(),
3397                data_type: "OBJECT".to_string(),
3398                physical_type: parent_physical_type,
3399                nullable: !field_data
3400                    .get("required")
3401                    .and_then(|v| v.as_bool())
3402                    .unwrap_or(false),
3403                description: description.clone(),
3404                quality: quality_rules.clone(),
3405                relationships: ref_to_relationships(
3406                    &field_data
3407                        .get("$ref")
3408                        .and_then(|v| v.as_str())
3409                        .map(|s| s.to_string()),
3410                ),
3411                ..Default::default()
3412            });
3413
3414            // Extract nested fields recursively - handle both object and array formats
3415            if let Some(fields_obj) = nested_fields_obj {
3416                // Object format (legacy/ODCL): properties is a map of name -> schema
3417                for (nested_field_name, nested_field_data) in fields_obj {
3418                    if let Some(nested_field_obj) = nested_field_data.as_object() {
3419                        let nested_field_type = nested_field_obj
3420                            .get("logicalType")
3421                            .and_then(|v| v.as_str())
3422                            .or_else(|| nested_field_obj.get("type").and_then(|v| v.as_str()))
3423                            .unwrap_or("STRING");
3424
3425                        // Recursively parse nested fields
3426                        let nested_col_name = format!("{}.{}", field_name, nested_field_name);
3427                        match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data)
3428                        {
3429                            Ok(mut nested_cols) => {
3430                                // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
3431                                // Otherwise, it returns a single column
3432                                columns.append(&mut nested_cols);
3433                            }
3434                            Err(_) => {
3435                                // Fallback: create simple nested column
3436                                let nested_physical_type = nested_field_obj
3437                                    .get("physicalType")
3438                                    .and_then(|v| v.as_str())
3439                                    .map(|s| s.to_string());
3440                                columns.push(Column {
3441                                    name: nested_col_name,
3442                                    data_type: nested_field_type.to_uppercase(),
3443                                    physical_type: nested_physical_type,
3444                                    nullable: !nested_field_obj
3445                                        .get("required")
3446                                        .and_then(|v| v.as_bool())
3447                                        .unwrap_or(false),
3448                                    description: nested_field_obj
3449                                        .get("description")
3450                                        .and_then(|v| v.as_str())
3451                                        .unwrap_or("")
3452                                        .to_string(),
3453                                    ..Default::default()
3454                                });
3455                            }
3456                        }
3457                    }
3458                }
3459            } else if let Some(fields_arr) = nested_fields_arr {
3460                // Array format (ODCS v3.1.0): properties is an array with 'name' field
3461                for prop_data in fields_arr {
3462                    if let Some(prop_obj) = prop_data.as_object() {
3463                        // Extract name from property object (required in v3.1.0)
3464                        let nested_field_name = prop_obj
3465                            .get("name")
3466                            .or_else(|| prop_obj.get("id"))
3467                            .and_then(|v| v.as_str())
3468                            .unwrap_or("");
3469
3470                        if !nested_field_name.is_empty() {
3471                            let nested_field_type = prop_obj
3472                                .get("logicalType")
3473                                .and_then(|v| v.as_str())
3474                                .or_else(|| prop_obj.get("type").and_then(|v| v.as_str()))
3475                                .unwrap_or("STRING");
3476
3477                            // Recursively parse nested fields
3478                            let nested_col_name = format!("{}.{}", field_name, nested_field_name);
3479                            match self.parse_odcl_v3_property(&nested_col_name, prop_obj, data) {
3480                                Ok(mut nested_cols) => {
3481                                    // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
3482                                    // Otherwise, it returns a single column
3483                                    columns.append(&mut nested_cols);
3484                                }
3485                                Err(_) => {
3486                                    // Fallback: create simple nested column
3487                                    let nested_physical_type = prop_obj
3488                                        .get("physicalType")
3489                                        .and_then(|v| v.as_str())
3490                                        .map(|s| s.to_string());
3491                                    columns.push(Column {
3492                                        name: nested_col_name,
3493                                        data_type: nested_field_type.to_uppercase(),
3494                                        physical_type: nested_physical_type,
3495                                        nullable: !prop_obj
3496                                            .get("required")
3497                                            .and_then(|v| v.as_bool())
3498                                            .unwrap_or(false),
3499                                        description: prop_obj
3500                                            .get("description")
3501                                            .and_then(|v| v.as_str())
3502                                            .unwrap_or("")
3503                                            .to_string(),
3504                                        ..Default::default()
3505                                    });
3506                                }
3507                            }
3508                        }
3509                    }
3510                }
3511            }
3512
3513            return Ok(columns);
3514        }
3515
3516        // Regular field (no $ref or $ref not found)
3517        let required = field_data
3518            .get("required")
3519            .and_then(|v| v.as_bool())
3520            .unwrap_or(false);
3521
3522        // Use description extracted at function start, or extract if not yet extracted
3523        let field_description = if description.is_empty() {
3524            field_data
3525                .get("description")
3526                .and_then(|v| v.as_str())
3527                .unwrap_or("")
3528                .to_string()
3529        } else {
3530            description
3531        };
3532
3533        // Use quality_rules extracted at function start, or extract if not yet extracted
3534        let mut column_quality_rules = quality_rules;
3535
3536        // Extract column-level quality rules if not already extracted
3537        if column_quality_rules.is_empty()
3538            && let Some(quality_val) = field_data.get("quality")
3539        {
3540            if let Some(arr) = quality_val.as_array() {
3541                // Array of quality rules
3542                for item in arr {
3543                    if let Some(obj) = item.as_object() {
3544                        let mut rule = HashMap::new();
3545                        for (key, value) in obj {
3546                            rule.insert(key.clone(), json_value_to_serde_value(value));
3547                        }
3548                        column_quality_rules.push(rule);
3549                    }
3550                }
3551            } else if let Some(obj) = quality_val.as_object() {
3552                // Single quality rule object
3553                let mut rule = HashMap::new();
3554                for (key, value) in obj {
3555                    rule.insert(key.clone(), json_value_to_serde_value(value));
3556                }
3557                column_quality_rules.push(rule);
3558            }
3559        }
3560
3561        // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
3562        // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
3563        // The `required` field will be set based on the `required` parameter during column creation.
3564
3565        // Extract physicalType (ODCS v3.1.0) - the actual database type
3566        let physical_type = field_data
3567            .get("physicalType")
3568            .and_then(|v| v.as_str())
3569            .map(|s| s.to_string());
3570
3571        // Parse the column with all ODCS v3.1.0 metadata fields
3572        let column = self.parse_column_metadata_from_field(
3573            field_name,
3574            &field_type,
3575            physical_type,
3576            !required,
3577            field_description,
3578            column_quality_rules,
3579            field_data,
3580        );
3581
3582        columns.push(column);
3583
3584        Ok(columns)
3585    }
3586
3587    /// Parse all column metadata fields from ODCS v3.1.0 field data.
3588    /// This extracts all supported column-level metadata including:
3589    /// - businessName, physicalName, logicalTypeOptions
3590    /// - primaryKey, primaryKeyPosition, unique
3591    /// - partitioned, partitionKeyPosition, clustered
3592    /// - classification, criticalDataElement, encryptedName
3593    /// - transformSourceObjects, transformLogic, transformDescription
3594    /// - examples, authoritativeDefinitions, tags, customProperties
3595    #[allow(clippy::too_many_arguments)]
3596    fn parse_column_metadata_from_field(
3597        &self,
3598        name: &str,
3599        data_type: &str,
3600        physical_type: Option<String>,
3601        nullable: bool,
3602        description: String,
3603        quality: Vec<HashMap<String, serde_json::Value>>,
3604        field_data: &serde_json::Map<String, JsonValue>,
3605    ) -> Column {
3606        use crate::models::{AuthoritativeDefinition, LogicalTypeOptions};
3607
3608        // businessName
3609        let business_name = field_data
3610            .get("businessName")
3611            .and_then(|v| v.as_str())
3612            .map(|s| s.to_string());
3613
3614        // physicalName
3615        let physical_name = field_data
3616            .get("physicalName")
3617            .and_then(|v| v.as_str())
3618            .map(|s| s.to_string());
3619
3620        // logicalTypeOptions
3621        let logical_type_options = field_data.get("logicalTypeOptions").and_then(|v| {
3622            v.as_object().map(|opts| LogicalTypeOptions {
3623                min_length: opts.get("minLength").and_then(|v| v.as_i64()),
3624                max_length: opts.get("maxLength").and_then(|v| v.as_i64()),
3625                pattern: opts
3626                    .get("pattern")
3627                    .and_then(|v| v.as_str())
3628                    .map(|s| s.to_string()),
3629                format: opts
3630                    .get("format")
3631                    .and_then(|v| v.as_str())
3632                    .map(|s| s.to_string()),
3633                minimum: opts.get("minimum").cloned(),
3634                maximum: opts.get("maximum").cloned(),
3635                exclusive_minimum: opts.get("exclusiveMinimum").cloned(),
3636                exclusive_maximum: opts.get("exclusiveMaximum").cloned(),
3637                precision: opts
3638                    .get("precision")
3639                    .and_then(|v| v.as_i64())
3640                    .map(|n| n as i32),
3641                scale: opts.get("scale").and_then(|v| v.as_i64()).map(|n| n as i32),
3642            })
3643        });
3644
3645        // primaryKey
3646        let primary_key = field_data
3647            .get("primaryKey")
3648            .and_then(|v| v.as_bool())
3649            .unwrap_or(false);
3650
3651        // primaryKeyPosition
3652        let primary_key_position = field_data
3653            .get("primaryKeyPosition")
3654            .and_then(|v| v.as_i64())
3655            .map(|n| n as i32);
3656
3657        // unique
3658        let unique = field_data
3659            .get("unique")
3660            .and_then(|v| v.as_bool())
3661            .unwrap_or(false);
3662
3663        // partitioned
3664        let partitioned = field_data
3665            .get("partitioned")
3666            .and_then(|v| v.as_bool())
3667            .unwrap_or(false);
3668
3669        // partitionKeyPosition
3670        let partition_key_position = field_data
3671            .get("partitionKeyPosition")
3672            .and_then(|v| v.as_i64())
3673            .map(|n| n as i32);
3674
3675        // clustered
3676        let clustered = field_data
3677            .get("clustered")
3678            .and_then(|v| v.as_bool())
3679            .unwrap_or(false);
3680
3681        // classification
3682        let classification = field_data
3683            .get("classification")
3684            .and_then(|v| v.as_str())
3685            .map(|s| s.to_string());
3686
3687        // criticalDataElement
3688        let critical_data_element = field_data
3689            .get("criticalDataElement")
3690            .and_then(|v| v.as_bool())
3691            .unwrap_or(false);
3692
3693        // encryptedName
3694        let encrypted_name = field_data
3695            .get("encryptedName")
3696            .and_then(|v| v.as_str())
3697            .map(|s| s.to_string());
3698
3699        // transformSourceObjects
3700        let transform_source_objects = field_data
3701            .get("transformSourceObjects")
3702            .and_then(|v| v.as_array())
3703            .map(|arr| {
3704                arr.iter()
3705                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3706                    .collect()
3707            })
3708            .unwrap_or_default();
3709
3710        // transformLogic
3711        let transform_logic = field_data
3712            .get("transformLogic")
3713            .and_then(|v| v.as_str())
3714            .map(|s| s.to_string());
3715
3716        // transformDescription
3717        let transform_description = field_data
3718            .get("transformDescription")
3719            .and_then(|v| v.as_str())
3720            .map(|s| s.to_string());
3721
3722        // examples
3723        let examples = field_data
3724            .get("examples")
3725            .and_then(|v| v.as_array())
3726            .map(|arr| arr.to_vec())
3727            .unwrap_or_default();
3728
3729        // authoritativeDefinitions
3730        let authoritative_definitions = field_data
3731            .get("authoritativeDefinitions")
3732            .and_then(|v| v.as_array())
3733            .map(|arr| {
3734                arr.iter()
3735                    .filter_map(|item| {
3736                        item.as_object().map(|obj| AuthoritativeDefinition {
3737                            definition_type: obj
3738                                .get("type")
3739                                .and_then(|v| v.as_str())
3740                                .unwrap_or("")
3741                                .to_string(),
3742                            url: obj
3743                                .get("url")
3744                                .and_then(|v| v.as_str())
3745                                .unwrap_or("")
3746                                .to_string(),
3747                        })
3748                    })
3749                    .collect()
3750            })
3751            .unwrap_or_default();
3752
3753        // tags
3754        let tags = field_data
3755            .get("tags")
3756            .and_then(|v| v.as_array())
3757            .map(|arr| {
3758                arr.iter()
3759                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3760                    .collect()
3761            })
3762            .unwrap_or_default();
3763
3764        // customProperties
3765        let custom_properties = field_data
3766            .get("customProperties")
3767            .and_then(|v| v.as_array())
3768            .map(|arr| {
3769                arr.iter()
3770                    .filter_map(|item| {
3771                        item.as_object().and_then(|obj| {
3772                            let key = obj.get("property").and_then(|v| v.as_str())?;
3773                            let value = obj.get("value").cloned()?;
3774                            Some((key.to_string(), value))
3775                        })
3776                    })
3777                    .collect()
3778            })
3779            .unwrap_or_default();
3780
3781        // businessKey (secondary_key)
3782        let secondary_key = field_data
3783            .get("businessKey")
3784            .and_then(|v| v.as_bool())
3785            .unwrap_or(false);
3786
3787        // enum values
3788        let enum_values = field_data
3789            .get("enum")
3790            .and_then(|v| v.as_array())
3791            .map(|arr| {
3792                arr.iter()
3793                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3794                    .collect()
3795            })
3796            .unwrap_or_default();
3797
3798        // constraints
3799        let constraints = field_data
3800            .get("constraints")
3801            .and_then(|v| v.as_array())
3802            .map(|arr| {
3803                arr.iter()
3804                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3805                    .collect()
3806            })
3807            .unwrap_or_default();
3808
3809        Column {
3810            name: name.to_string(),
3811            data_type: data_type.to_string(),
3812            physical_type,
3813            physical_name,
3814            nullable,
3815            description,
3816            quality,
3817            business_name,
3818            logical_type_options,
3819            primary_key,
3820            primary_key_position,
3821            unique,
3822            partitioned,
3823            partition_key_position,
3824            clustered,
3825            classification,
3826            critical_data_element,
3827            encrypted_name,
3828            transform_source_objects,
3829            transform_logic,
3830            transform_description,
3831            examples,
3832            authoritative_definitions,
3833            tags,
3834            custom_properties,
3835            secondary_key,
3836            enum_values,
3837            constraints,
3838            foreign_key: self.parse_foreign_key_from_data_contract(field_data),
3839            relationships: self.parse_relationships_from_field(field_data),
3840            ..Default::default()
3841        }
3842    }
3843
3844    /// Parse foreign key from Data Contract field data.
3845    fn parse_foreign_key_from_data_contract(
3846        &self,
3847        field_data: &serde_json::Map<String, JsonValue>,
3848    ) -> Option<ForeignKey> {
3849        field_data
3850            .get("foreignKey")
3851            .and_then(|v| v.as_object())
3852            .map(|fk_obj| ForeignKey {
3853                table_id: fk_obj
3854                    .get("table")
3855                    .or_else(|| fk_obj.get("table_id"))
3856                    .and_then(|v| v.as_str())
3857                    .unwrap_or("")
3858                    .to_string(),
3859                column_name: fk_obj
3860                    .get("column")
3861                    .or_else(|| fk_obj.get("column_name"))
3862                    .and_then(|v| v.as_str())
3863                    .unwrap_or("")
3864                    .to_string(),
3865            })
3866    }
3867
3868    /// Parse relationships from Data Contract field data.
3869    /// Also converts $ref to a relationship entry if present.
3870    fn parse_relationships_from_field(
3871        &self,
3872        field_data: &serde_json::Map<String, JsonValue>,
3873    ) -> Vec<PropertyRelationship> {
3874        let mut relationships = Vec::new();
3875
3876        // Parse relationships array from ODCS v3.1.0 format
3877        if let Some(rels_array) = field_data.get("relationships").and_then(|v| v.as_array()) {
3878            for rel in rels_array {
3879                if let Some(rel_obj) = rel.as_object() {
3880                    let rel_type = rel_obj
3881                        .get("type")
3882                        .and_then(|v| v.as_str())
3883                        .unwrap_or("foreignKey")
3884                        .to_string();
3885                    let to = rel_obj
3886                        .get("to")
3887                        .and_then(|v| v.as_str())
3888                        .unwrap_or("")
3889                        .to_string();
3890
3891                    if !to.is_empty() {
3892                        relationships.push(PropertyRelationship {
3893                            relationship_type: rel_type,
3894                            to,
3895                        });
3896                    }
3897                }
3898            }
3899        }
3900
3901        // Convert $ref to relationship if present and no relationships were parsed
3902        if relationships.is_empty()
3903            && let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str())
3904        {
3905            // Convert $ref path to ODCS relationship format
3906            let to = if ref_str.starts_with("#/definitions/") {
3907                let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
3908                format!("definitions/{}", def_path)
3909            } else if ref_str.starts_with("#/") {
3910                ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
3911            } else {
3912                ref_str.to_string()
3913            };
3914
3915            relationships.push(PropertyRelationship {
3916                relationship_type: "foreignKey".to_string(),
3917                to,
3918            });
3919        }
3920
3921        relationships
3922    }
3923
3924    /// Extract database type from servers in Data Contract format.
3925    fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
3926        // Data Contract format: servers can be object or array
3927        if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
3928            // Object format: { "server_name": { "type": "..." } }
3929            if let Some((_, server_data)) = servers_obj.iter().next()
3930                && let Some(server_obj) = server_data.as_object()
3931            {
3932                return server_obj
3933                    .get("type")
3934                    .and_then(|v| v.as_str())
3935                    .and_then(|s| self.parse_database_type(s));
3936            }
3937        } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
3938            // Array format: [ { "server": "...", "type": "..." } ]
3939            if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
3940                return server_obj
3941                    .get("type")
3942                    .and_then(|v| v.as_str())
3943                    .and_then(|s| self.parse_database_type(s));
3944            }
3945        }
3946        None
3947    }
3948
3949    /// Parse database type string to enum.
3950    fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
3951        match s.to_lowercase().as_str() {
3952            "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
3953            "postgres" | "postgresql" => Some(DatabaseType::Postgres),
3954            "mysql" => Some(DatabaseType::Mysql),
3955            "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
3956            "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
3957            _ => None,
3958        }
3959    }
3960
3961    /// Parse STRUCT type definition from string (e.g., "ARRAY<STRUCT<ID: STRING, NAME: STRING>>").
3962    /// Parse STRUCT type from string and create nested columns
3963    /// This is public so it can be used by SQL importer to parse STRUCT types
3964    pub fn parse_struct_type_from_string(
3965        &self,
3966        field_name: &str,
3967        type_str: &str,
3968        field_data: &serde_json::Map<String, JsonValue>,
3969    ) -> Result<Vec<Column>> {
3970        let mut columns = Vec::new();
3971
3972        // Normalize whitespace - replace newlines and multiple spaces with single space
3973        let normalized_type = type_str
3974            .lines()
3975            .map(|line| line.trim())
3976            .filter(|line| !line.is_empty())
3977            .collect::<Vec<_>>()
3978            .join(" ");
3979
3980        let type_str_upper = normalized_type.to_uppercase();
3981
3982        // Check if it's ARRAY<STRUCT<...>>
3983        let is_array = type_str_upper.starts_with("ARRAY<");
3984        let struct_start = type_str_upper.find("STRUCT<");
3985
3986        if let Some(start_pos) = struct_start {
3987            // Extract STRUCT content - start from STRUCT< and find its closing >
3988            // For ARRAY<STRUCT<...>>, we still need to find the STRUCT<...> closing bracket
3989            let struct_content_start = start_pos + 7; // Skip "STRUCT<"
3990            let struct_content = &normalized_type[struct_content_start..];
3991
3992            // Find matching closing bracket for STRUCT<, handling nested STRUCTs
3993            // We need to find the > that closes STRUCT<, not ARRAY<
3994            let mut depth = 1;
3995            let mut end_pos = None;
3996            for (i, ch) in struct_content.char_indices() {
3997                match ch {
3998                    '<' => depth += 1,
3999                    '>' => {
4000                        depth -= 1;
4001                        if depth == 0 {
4002                            end_pos = Some(i);
4003                            break;
4004                        }
4005                    }
4006                    _ => {}
4007                }
4008            }
4009
4010            // If no closing bracket found, try to infer it (handle malformed YAML)
4011            let struct_fields_str = if let Some(end) = end_pos {
4012                &struct_content[..end]
4013            } else {
4014                // Missing closing bracket - use everything up to the end
4015                struct_content.trim_end_matches('>').trim()
4016            };
4017
4018            // Parse fields: "ID: STRING, NAME: STRING, ..."
4019            let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
4020
4021            // Create nested columns, handling nested STRUCTs recursively
4022            // For ARRAY<STRUCT<...>>, use .[] notation for array elements: parent.[].field
4023            // For STRUCT<...>, use dot notation: parent.field
4024            for (nested_name, nested_type) in fields {
4025                let nested_type_upper = nested_type.to_uppercase();
4026                let nested_col_name = if is_array {
4027                    format!("{}.[].{}", field_name, nested_name)
4028                } else {
4029                    format!("{}.{}", field_name, nested_name)
4030                };
4031
4032                // Check if this field is itself a STRUCT or ARRAY<STRUCT>
4033                // Handle multiple levels of nesting: STRUCT<...>, ARRAY<STRUCT<...>>, ARRAY<STRUCT<...>> within STRUCT, etc.
4034                let is_nested_struct = nested_type_upper.starts_with("STRUCT<");
4035                let is_nested_array_struct = nested_type_upper.starts_with("ARRAY<STRUCT<");
4036
4037                if is_nested_struct || is_nested_array_struct {
4038                    // Recursively parse nested STRUCT or ARRAY<STRUCT>
4039                    // The nested_col_name already includes the parent path:
4040                    // - For STRUCT within ARRAY: "items.[].details"
4041                    // - For STRUCT within STRUCT: "orderInfo.metadata"
4042                    // - For ARRAY<STRUCT> within STRUCT: "parent.items"
4043                    // - For ARRAY<STRUCT> within ARRAY<STRUCT>: "parent.[].items"
4044                    // Recursive calls will correctly build the full path with proper notation
4045                    match self.parse_struct_type_from_string(
4046                        &nested_col_name,
4047                        &nested_type,
4048                        field_data,
4049                    ) {
4050                        Ok(nested_cols) => {
4051                            // Add all recursively parsed columns
4052                            // These will have names like:
4053                            // - items.[].details.name (STRUCT within ARRAY<STRUCT>)
4054                            // - orderInfo.metadata.field (STRUCT within STRUCT)
4055                            // - parent.items.[].field (ARRAY<STRUCT> within STRUCT)
4056                            // - parent.[].items.[].field (ARRAY<STRUCT> within ARRAY<STRUCT>)
4057                            columns.extend(nested_cols);
4058                        }
4059                        Err(_) => {
4060                            // If recursive parsing fails, add the parent column with STRUCT/ARRAY type
4061                            let fallback_data_type = if is_nested_array_struct {
4062                                "ARRAY<STRUCT<...>>".to_string()
4063                            } else {
4064                                "STRUCT<...>".to_string()
4065                            };
4066                            let nested_physical_type = field_data
4067                                .get("physicalType")
4068                                .and_then(|v| v.as_str())
4069                                .map(|s| s.to_string());
4070                            columns.push(Column {
4071                                name: nested_col_name,
4072                                data_type: fallback_data_type,
4073                                physical_type: nested_physical_type,
4074                                nullable: !field_data
4075                                    .get("required")
4076                                    .and_then(|v| v.as_bool())
4077                                    .unwrap_or(false),
4078                                description: field_data
4079                                    .get("description")
4080                                    .and_then(|v| v.as_str())
4081                                    .unwrap_or("")
4082                                    .to_string(),
4083                                ..Default::default()
4084                            });
4085                        }
4086                    }
4087                } else if nested_type_upper.starts_with("ARRAY<") {
4088                    // Handle ARRAY of simple types (not STRUCT) - these don't need nested columns
4089                    // But if it's ARRAY<STRUCT<...>>, it would have been caught above
4090                    let nested_physical_type = field_data
4091                        .get("physicalType")
4092                        .and_then(|v| v.as_str())
4093                        .map(|s| s.to_string());
4094                    columns.push(Column {
4095                        name: nested_col_name,
4096                        data_type: normalize_data_type(&nested_type),
4097                        physical_type: nested_physical_type,
4098                        nullable: !field_data
4099                            .get("required")
4100                            .and_then(|v| v.as_bool())
4101                            .unwrap_or(false),
4102                        description: field_data
4103                            .get("description")
4104                            .and_then(|v| v.as_str())
4105                            .unwrap_or("")
4106                            .to_string(),
4107                        ..Default::default()
4108                    });
4109                } else {
4110                    // Simple nested field (not a STRUCT)
4111                    let nested_physical_type = field_data
4112                        .get("physicalType")
4113                        .and_then(|v| v.as_str())
4114                        .map(|s| s.to_string());
4115                    columns.push(Column {
4116                        name: nested_col_name,
4117                        data_type: normalize_data_type(&nested_type),
4118                        physical_type: nested_physical_type,
4119                        nullable: !field_data
4120                            .get("required")
4121                            .and_then(|v| v.as_bool())
4122                            .unwrap_or(false),
4123                        description: field_data
4124                            .get("description")
4125                            .and_then(|v| v.as_str())
4126                            .unwrap_or("")
4127                            .to_string(),
4128                        ..Default::default()
4129                    });
4130                }
4131            }
4132
4133            return Ok(columns);
4134        }
4135
4136        // If no STRUCT found, return empty (fallback to regular parsing)
4137        Ok(Vec::new())
4138    }
4139
4140    /// Parse STRUCT fields from string (e.g., "ID: STRING, NAME: STRING").
4141    fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
4142        let mut fields = Vec::new();
4143        let mut current_field = String::new();
4144        let mut depth = 0;
4145        let mut in_string = false;
4146        let mut string_char = None;
4147
4148        for ch in fields_str.chars() {
4149            match ch {
4150                '\'' | '"' if !in_string || Some(ch) == string_char => {
4151                    if in_string {
4152                        in_string = false;
4153                        string_char = None;
4154                    } else {
4155                        in_string = true;
4156                        string_char = Some(ch);
4157                    }
4158                    current_field.push(ch);
4159                }
4160                '<' if !in_string => {
4161                    depth += 1;
4162                    current_field.push(ch);
4163                }
4164                '>' if !in_string => {
4165                    depth -= 1;
4166                    current_field.push(ch);
4167                }
4168                ',' if !in_string && depth == 0 => {
4169                    // End of current field
4170                    let trimmed = current_field.trim();
4171                    if !trimmed.is_empty()
4172                        && let Some((name, type_part)) = self.parse_field_definition(trimmed)
4173                    {
4174                        fields.push((name, type_part));
4175                    }
4176                    current_field.clear();
4177                }
4178                _ => {
4179                    current_field.push(ch);
4180                }
4181            }
4182        }
4183
4184        // Handle last field
4185        let trimmed = current_field.trim();
4186        if !trimmed.is_empty()
4187            && let Some((name, type_part)) = self.parse_field_definition(trimmed)
4188        {
4189            fields.push((name, type_part));
4190        }
4191
4192        Ok(fields)
4193    }
4194
4195    /// Parse a single field definition (e.g., "ID: STRING" or "DETAILS: STRUCT<...>").
4196    fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
4197        // Split by colon, but handle nested STRUCTs
4198        let colon_pos = field_def.find(':')?;
4199        let name = field_def[..colon_pos].trim().to_string();
4200        let type_part = field_def[colon_pos + 1..].trim().to_string();
4201
4202        if name.is_empty() || type_part.is_empty() {
4203            return None;
4204        }
4205
4206        Some((name, type_part))
4207    }
4208
4209    /// Extract catalog and schema from customProperties.
4210    fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
4211        let mut catalog_name = None;
4212        let mut schema_name = None;
4213
4214        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
4215            for prop in custom_props {
4216                if let Some(prop_obj) = prop.as_object() {
4217                    let prop_key = prop_obj
4218                        .get("property")
4219                        .and_then(|v| v.as_str())
4220                        .unwrap_or("");
4221                    let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
4222
4223                    match prop_key {
4224                        "catalogName" | "catalog_name" => {
4225                            catalog_name = prop_value.map(|s| s.to_string());
4226                        }
4227                        "schemaName" | "schema_name" => {
4228                            schema_name = prop_value.map(|s| s.to_string());
4229                        }
4230                        _ => {}
4231                    }
4232                }
4233            }
4234        }
4235
4236        // Also check direct fields
4237        if catalog_name.is_none() {
4238            catalog_name = data
4239                .get("catalog_name")
4240                .and_then(|v| v.as_str())
4241                .map(|s| s.to_string());
4242        }
4243        if schema_name.is_none() {
4244            schema_name = data
4245                .get("schema_name")
4246                .and_then(|v| v.as_str())
4247                .map(|s| s.to_string());
4248        }
4249
4250        (catalog_name, schema_name)
4251    }
4252}
4253
4254impl Default for ODCSImporter {
4255    fn default() -> Self {
4256        Self::new()
4257    }
4258}
4259
4260#[cfg(test)]
4261mod tests {
4262    use super::*;
4263
4264    #[test]
4265    fn test_parse_simple_odcl_table() {
4266        let mut parser = ODCSImporter::new();
4267        let odcl_yaml = r#"
4268name: users
4269columns:
4270  - name: id
4271    data_type: INT
4272    nullable: false
4273    primary_key: true
4274  - name: name
4275    data_type: VARCHAR(255)
4276    nullable: false
4277database_type: Postgres
4278"#;
4279
4280        let (table, errors) = parser.parse(odcl_yaml).unwrap();
4281        assert_eq!(table.name, "users");
4282        assert_eq!(table.columns.len(), 2);
4283        assert_eq!(table.columns[0].name, "id");
4284        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
4285        assert_eq!(errors.len(), 0);
4286    }
4287
4288    #[test]
4289    fn test_parse_odcl_with_metadata() {
4290        let mut parser = ODCSImporter::new();
4291        let odcl_yaml = r#"
4292name: users
4293columns:
4294  - name: id
4295    data_type: INT
4296medallion_layer: gold
4297scd_pattern: TYPE_2
4298odcl_metadata:
4299  description: "User table"
4300  owner: "data-team"
4301"#;
4302
4303        let (table, errors) = parser.parse(odcl_yaml).unwrap();
4304        assert_eq!(table.medallion_layers.len(), 1);
4305        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
4306        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
4307        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
4308            assert_eq!(desc, "User table");
4309        }
4310        assert_eq!(errors.len(), 0);
4311    }
4312
4313    #[test]
4314    fn test_parse_odcl_with_data_vault() {
4315        let mut parser = ODCSImporter::new();
4316        let odcl_yaml = r#"
4317name: hub_customer
4318columns:
4319  - name: customer_key
4320    data_type: VARCHAR(50)
4321data_vault_classification: Hub
4322"#;
4323
4324        let (table, errors) = parser.parse(odcl_yaml).unwrap();
4325        assert_eq!(
4326            table.data_vault_classification,
4327            Some(DataVaultClassification::Hub)
4328        );
4329        assert_eq!(errors.len(), 0);
4330    }
4331
4332    #[test]
4333    fn test_parse_invalid_odcl() {
4334        let mut parser = ODCSImporter::new();
4335        let invalid_yaml = "not: valid: yaml: structure:";
4336
4337        // Should fail to parse YAML
4338        assert!(parser.parse(invalid_yaml).is_err());
4339    }
4340
4341    #[test]
4342    fn test_parse_odcl_missing_required_fields() {
4343        let mut parser = ODCSImporter::new();
4344        let non_conformant = r#"
4345name: users
4346# Missing required columns field
4347"#;
4348
4349        // Should fail with missing columns
4350        assert!(parser.parse(non_conformant).is_err());
4351    }
4352
4353    #[test]
4354    fn test_parse_odcl_with_foreign_key() {
4355        let mut parser = ODCSImporter::new();
4356        let odcl_yaml = r#"
4357name: orders
4358columns:
4359  - name: id
4360    data_type: INT
4361    primary_key: true
4362  - name: user_id
4363    data_type: INT
4364    foreign_key:
4365      table_id: users
4366      column_name: id
4367"#;
4368
4369        let (table, errors) = parser.parse(odcl_yaml).unwrap();
4370        assert_eq!(table.columns.len(), 2);
4371        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
4372        assert!(user_id_col.foreign_key.is_some());
4373        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
4374        assert_eq!(errors.len(), 0);
4375    }
4376
4377    #[test]
4378    fn test_parse_odcl_with_constraints() {
4379        let mut parser = ODCSImporter::new();
4380        let odcl_yaml = r#"
4381name: products
4382columns:
4383  - name: id
4384    data_type: INT
4385    primary_key: true
4386  - name: name
4387    data_type: VARCHAR(255)
4388    nullable: false
4389    constraints:
4390      - UNIQUE
4391      - NOT NULL
4392"#;
4393
4394        let (table, errors) = parser.parse(odcl_yaml).unwrap();
4395        assert_eq!(table.columns.len(), 2);
4396        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
4397        assert!(!name_col.nullable);
4398        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
4399        assert_eq!(errors.len(), 0);
4400    }
4401
4402    #[test]
4403    fn test_parse_odcs_v3_multiple_tables() {
4404        let mut parser = ODCSImporter::new();
4405        let odcs_yaml = r#"
4406apiVersion: v3.1.0
4407kind: DataContract
4408id: 550e8400-e29b-41d4-a716-446655440000
4409version: 1.0.0
4410name: multi_table_contract
4411domain: sales
4412schema:
4413  - name: orders
4414    properties:
4415      - name: order_id
4416        logicalType: integer
4417        primaryKey: true
4418      - name: customer_id
4419        logicalType: integer
4420      - name: total_amount
4421        logicalType: decimal
4422  - name: order_items
4423    properties:
4424      - name: item_id
4425        logicalType: integer
4426        primaryKey: true
4427      - name: order_id
4428        logicalType: integer
4429      - name: product_name
4430        logicalType: string
4431      - name: quantity
4432        logicalType: integer
4433  - name: customers
4434    properties:
4435      - name: customer_id
4436        logicalType: integer
4437        primaryKey: true
4438      - name: name
4439        logicalType: string
4440      - name: email
4441        logicalType: string
4442"#;
4443
4444        let result = parser.import(odcs_yaml).unwrap();
4445
4446        // Should return 3 tables
4447        assert_eq!(
4448            result.tables.len(),
4449            3,
4450            "Expected 3 tables, got {}",
4451            result.tables.len()
4452        );
4453
4454        // Verify table names
4455        let table_names: Vec<&str> = result
4456            .tables
4457            .iter()
4458            .map(|t| t.name.as_deref().unwrap_or(""))
4459            .collect();
4460        assert!(table_names.contains(&"orders"), "Missing 'orders' table");
4461        assert!(
4462            table_names.contains(&"order_items"),
4463            "Missing 'order_items' table"
4464        );
4465        assert!(
4466            table_names.contains(&"customers"),
4467            "Missing 'customers' table"
4468        );
4469
4470        // Verify column counts for each table
4471        let orders_table = result
4472            .tables
4473            .iter()
4474            .find(|t| t.name.as_deref() == Some("orders"))
4475            .unwrap();
4476        assert_eq!(
4477            orders_table.columns.len(),
4478            3,
4479            "orders table should have 3 columns"
4480        );
4481
4482        let order_items_table = result
4483            .tables
4484            .iter()
4485            .find(|t| t.name.as_deref() == Some("order_items"))
4486            .unwrap();
4487        assert_eq!(
4488            order_items_table.columns.len(),
4489            4,
4490            "order_items table should have 4 columns"
4491        );
4492
4493        let customers_table = result
4494            .tables
4495            .iter()
4496            .find(|t| t.name.as_deref() == Some("customers"))
4497            .unwrap();
4498        assert_eq!(
4499            customers_table.columns.len(),
4500            3,
4501            "customers table should have 3 columns"
4502        );
4503
4504        // Verify contract-level metadata is shared across all tables
4505        for table in &result.tables {
4506            assert_eq!(table.api_version.as_deref(), Some("v3.1.0"));
4507            assert_eq!(table.kind.as_deref(), Some("DataContract"));
4508            assert_eq!(table.domain.as_deref(), Some("sales"));
4509        }
4510
4511        // Verify table indices are sequential
4512        assert_eq!(result.tables[0].table_index, 0);
4513        assert_eq!(result.tables[1].table_index, 1);
4514        assert_eq!(result.tables[2].table_index, 2);
4515    }
4516
4517    #[test]
4518    fn test_parse_odcs_v3_single_table_backwards_compatible() {
4519        // Ensure single-table ODCS files still work correctly
4520        let mut parser = ODCSImporter::new();
4521        let odcs_yaml = r#"
4522apiVersion: v3.1.0
4523kind: DataContract
4524id: 550e8400-e29b-41d4-a716-446655440001
4525version: 1.0.0
4526name: single_table_contract
4527schema:
4528  - name: users
4529    properties:
4530      - name: user_id
4531        logicalType: integer
4532        primaryKey: true
4533      - name: username
4534        logicalType: string
4535"#;
4536
4537        let result = parser.import(odcs_yaml).unwrap();
4538
4539        // Should return exactly 1 table
4540        assert_eq!(
4541            result.tables.len(),
4542            1,
4543            "Expected 1 table for single-schema ODCS"
4544        );
4545        assert_eq!(result.tables[0].name.as_deref(), Some("users"));
4546        assert_eq!(result.tables[0].columns.len(), 2);
4547    }
4548}