data_modelling_core/import/
odcs.rs

1//! ODCS parser service for parsing Open Data Contract Standard YAML files.
2//!
3//! This service parses ODCS (Open Data Contract Standard) v3.1.0 and legacy ODCL (Data Contract Specification) YAML files
4//! and converts them to Table models. ODCL files are automatically converted to ODCS v3.1.0 format.
5//! Supports multiple formats:
6//! - ODCS v3.1.0 / v3.0.x format (apiVersion, kind, schema) - PRIMARY FORMAT
7//! - ODCL (Data Contract Specification) format (dataContractSpecification, models, definitions) - LEGACY, converted to ODCS
8//! - Simple ODCL format (name, columns) - LEGACY, converted to ODCS
9//! - Liquibase format
10
11use super::odcs_shared::{
12    ParserError, column_to_column_data, expand_nested_column, json_value_to_serde_value,
13    normalize_data_type, parse_data_vault_classification, parse_medallion_layer, parse_scd_pattern,
14    resolve_ref, yaml_to_json_value,
15};
16use super::{ImportError, ImportResult, TableData};
17use crate::models::column::ForeignKey;
18use crate::models::enums::{DataVaultClassification, DatabaseType, MedallionLayer, SCDPattern};
19use crate::models::{Column, PropertyRelationship, Table, Tag};
20use anyhow::{Context, Result};
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::str::FromStr;
24use tracing::info;
25
26// Re-export ParserError for backward compatibility
27pub use super::odcs_shared::ParserError as OdcsParserError;
28
29/// Convert a $ref path to a PropertyRelationship.
30/// E.g., "#/definitions/order_id" -> PropertyRelationship { type: "foreignKey", to: "definitions/order_id" }
31fn ref_to_relationships(ref_path: &Option<String>) -> Vec<PropertyRelationship> {
32    match ref_path {
33        Some(ref_str) => {
34            let to = if ref_str.starts_with("#/definitions/") {
35                let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
36                format!("definitions/{}", def_path)
37            } else if ref_str.starts_with("#/") {
38                ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
39            } else {
40                ref_str.clone()
41            };
42            vec![PropertyRelationship {
43                relationship_type: "foreignKey".to_string(),
44                to,
45            }]
46        }
47        None => Vec::new(),
48    }
49}
50
51/// ODCS parser service for parsing Open Data Contract Standard YAML files.
52/// Handles ODCS v3.1.0 (primary format) and legacy ODCL formats (converted to ODCS).
53pub struct ODCSImporter {
54    /// Current YAML data for $ref resolution
55    current_yaml_data: Option<serde_yaml::Value>,
56}
57
58impl ODCSImporter {
59    /// Create a new ODCS parser instance.
60    ///
61    /// # Example
62    ///
63    /// ```rust
64    /// use data_modelling_core::import::odcs::ODCSImporter;
65    ///
66    /// let mut importer = ODCSImporter::new();
67    /// ```
68    pub fn new() -> Self {
69        Self {
70            current_yaml_data: None,
71        }
72    }
73
74    /// Import ODCS/ODCL YAML content and create Table (SDK interface).
75    ///
76    /// Supports ODCS v3.1.0 (primary), legacy ODCL formats (converted to ODCS), and Liquibase formats.
77    ///
78    /// # Arguments
79    ///
80    /// * `yaml_content` - ODCS/ODCL YAML content as a string
81    ///
82    /// # Returns
83    ///
84    /// An `ImportResult` containing the extracted table and any parse errors.
85    ///
86    /// # Example
87    ///
88    /// ```rust
89    /// use data_modelling_core::import::odcs::ODCSImporter;
90    ///
91    /// let mut importer = ODCSImporter::new();
92    /// let yaml = r#"
93    /// apiVersion: v3.1.0
94    /// kind: DataContract
95    /// id: 550e8400-e29b-41d4-a716-446655440000
96    /// version: 1.0.0
97    /// name: users
98    /// schema:
99    ///   - name: users
100    ///     properties:
101    ///       - name: id
102    ///         logicalType: bigint
103    /// "#;
104    /// let result = importer.import(yaml).unwrap();
105    /// assert_eq!(result.tables.len(), 1);
106    /// ```
107    pub fn import(&mut self, yaml_content: &str) -> Result<ImportResult, ImportError> {
108        // First parse YAML to get raw data for ODCS field extraction
109        let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
110            .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
111
112        let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
113            ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
114        })?;
115
116        // Store current YAML data for $ref resolution
117        self.current_yaml_data = Some(yaml_data);
118
119        // Check if this is ODCS v3 format with multiple tables in schema array
120        if self.is_odcl_v3_format(&json_data) {
121            // Use parse_odcl_v3_all to get ALL tables from the schema array
122            match self.parse_odcl_v3_all(&json_data) {
123                Ok(tables_with_errors) => {
124                    let mut sdk_tables = Vec::new();
125                    let mut all_errors = Vec::new();
126
127                    for (table_index, (table, errors)) in tables_with_errors.into_iter().enumerate()
128                    {
129                        // Convert errors
130                        for e in errors {
131                            all_errors.push(ImportError::ParseError(e.message));
132                        }
133
134                        // Create TableData for this table
135                        sdk_tables.push(self.table_to_table_data(table, table_index, &json_data));
136                    }
137
138                    info!(
139                        "ODCS import completed: {} tables extracted from schema array",
140                        sdk_tables.len()
141                    );
142
143                    return Ok(ImportResult {
144                        tables: sdk_tables,
145                        tables_requiring_name: Vec::new(),
146                        errors: all_errors,
147                        ai_suggestions: None,
148                    });
149                }
150                Err(e) => {
151                    return Err(ImportError::ParseError(e.to_string()));
152                }
153            }
154        }
155
156        // Fall back to single-table parsing for non-ODCS v3 formats
157        match self.parse(yaml_content) {
158            Ok((table, errors)) => {
159                let sdk_tables = vec![self.table_to_table_data(table, 0, &json_data)];
160                let sdk_errors: Vec<ImportError> = errors
161                    .iter()
162                    .map(|e| ImportError::ParseError(e.message.clone()))
163                    .collect();
164                Ok(ImportResult {
165                    tables: sdk_tables,
166                    tables_requiring_name: Vec::new(),
167                    errors: sdk_errors,
168                    ai_suggestions: None,
169                })
170            }
171            Err(e) => Err(ImportError::ParseError(e.to_string())),
172        }
173    }
174
175    /// Convert a Table to TableData for the SDK interface.
176    fn table_to_table_data(
177        &self,
178        table: Table,
179        table_index: usize,
180        json_data: &JsonValue,
181    ) -> TableData {
182        TableData {
183            table_index,
184            id: Some(table.id.to_string()),
185            name: Some(table.name.clone()),
186            api_version: json_data
187                .get("apiVersion")
188                .and_then(|v| v.as_str())
189                .map(|s| s.to_string()),
190            version: json_data
191                .get("version")
192                .and_then(|v| v.as_str())
193                .map(|s| s.to_string()),
194            status: json_data
195                .get("status")
196                .and_then(|v| v.as_str())
197                .map(|s| s.to_string()),
198            kind: json_data
199                .get("kind")
200                .and_then(|v| v.as_str())
201                .map(|s| s.to_string()),
202            domain: json_data
203                .get("domain")
204                .and_then(|v| v.as_str())
205                .map(|s| s.to_string()),
206            data_product: json_data
207                .get("dataProduct")
208                .and_then(|v| v.as_str())
209                .map(|s| s.to_string()),
210            tenant: json_data
211                .get("tenant")
212                .and_then(|v| v.as_str())
213                .map(|s| s.to_string()),
214            description: json_data.get("description").cloned(),
215            // Schema-level fields (from table's odcl_metadata or direct fields)
216            physical_name: table
217                .odcl_metadata
218                .get("physicalName")
219                .and_then(|v| v.as_str())
220                .map(|s| s.to_string())
221                .or_else(|| table.schema_name.clone()),
222            physical_type: table
223                .odcl_metadata
224                .get("physicalType")
225                .and_then(|v| v.as_str())
226                .map(|s| s.to_string()),
227            business_name: table
228                .odcl_metadata
229                .get("businessName")
230                .and_then(|v| v.as_str())
231                .map(|s| s.to_string()),
232            data_granularity_description: table
233                .odcl_metadata
234                .get("dataGranularityDescription")
235                .and_then(|v| v.as_str())
236                .map(|s| s.to_string()),
237            columns: table.columns.iter().map(column_to_column_data).collect(),
238            servers: json_data
239                .get("servers")
240                .and_then(|v| v.as_array())
241                .cloned()
242                .unwrap_or_default(),
243            team: json_data.get("team").cloned(),
244            support: json_data.get("support").cloned(),
245            roles: json_data
246                .get("roles")
247                .and_then(|v| v.as_array())
248                .cloned()
249                .unwrap_or_default(),
250            sla_properties: json_data
251                .get("slaProperties")
252                .and_then(|v| v.as_array())
253                .cloned()
254                .unwrap_or_default(),
255            quality: table.quality.clone(),
256            price: json_data.get("price").cloned(),
257            tags: table.tags.iter().map(|t| t.to_string()).collect(),
258            custom_properties: json_data
259                .get("customProperties")
260                .and_then(|v| v.as_array())
261                .cloned()
262                .unwrap_or_default(),
263            authoritative_definitions: json_data
264                .get("authoritativeDefinitions")
265                .and_then(|v| v.as_array())
266                .cloned()
267                .unwrap_or_default(),
268            contract_created_ts: json_data
269                .get("contractCreatedTs")
270                .and_then(|v| v.as_str())
271                .map(|s| s.to_string()),
272            odcs_metadata: table.odcl_metadata.clone(),
273        }
274    }
275
276    /// Import ODCS YAML content and return an ODCSContract (new v2 API).
277    ///
278    /// This method returns the native ODCS contract structure, which properly models
279    /// the ODCS v3.1.0 three-level hierarchy:
280    /// - Contract level (apiVersion, domain, etc.)
281    /// - Schema level (tables/views with physicalName, businessName, etc.)
282    /// - Property level (columns with all metadata)
283    ///
284    /// This API provides lossless round-trip import/export and supports multi-table contracts.
285    ///
286    /// # Arguments
287    ///
288    /// * `yaml_content` - ODCS YAML content as a string
289    ///
290    /// # Returns
291    ///
292    /// An `ODCSContract` containing the full contract with all schemas and properties.
293    ///
294    /// # Example
295    ///
296    /// ```rust
297    /// use data_modelling_core::import::odcs::ODCSImporter;
298    ///
299    /// let mut importer = ODCSImporter::new();
300    /// let yaml = r#"
301    /// apiVersion: v3.1.0
302    /// kind: DataContract
303    /// id: 550e8400-e29b-41d4-a716-446655440000
304    /// version: 1.0.0
305    /// name: my-contract
306    /// domain: retail
307    /// schema:
308    ///   - name: users
309    ///     physicalName: tbl_users
310    ///     properties:
311    ///       - name: id
312    ///         logicalType: integer
313    ///         primaryKey: true
314    ///       - name: email
315    ///         logicalType: string
316    ///         required: true
317    /// "#;
318    /// let contract = importer.import_contract(yaml).unwrap();
319    /// assert_eq!(contract.name, "my-contract");
320    /// assert_eq!(contract.schema.len(), 1);
321    /// assert_eq!(contract.schema[0].name, "users");
322    /// ```
323    pub fn import_contract(
324        &mut self,
325        yaml_content: &str,
326    ) -> Result<crate::models::odcs::ODCSContract, ImportError> {
327        use crate::models::odcs::{
328            AuthoritativeDefinition, CustomProperty, Description, ODCSContract, QualityRule, Role,
329            Server, ServiceLevel, Support, Team,
330        };
331
332        // Parse YAML
333        let yaml_data: serde_yaml::Value = serde_yaml::from_str(yaml_content)
334            .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
335
336        let json_data = yaml_to_json_value(&yaml_data).map_err(|e| {
337            ImportError::ParseError(format!("Failed to convert YAML to JSON: {}", e))
338        })?;
339
340        // Store current YAML data for $ref resolution
341        self.current_yaml_data = Some(yaml_data);
342
343        // Check if this is ODCS v3 format
344        if !self.is_odcl_v3_format(&json_data) {
345            return Err(ImportError::ParseError(
346                "import_contract() only supports ODCS v3 format. Use import() for legacy formats."
347                    .to_string(),
348            ));
349        }
350
351        // Extract contract-level fields
352        let api_version = json_data
353            .get("apiVersion")
354            .and_then(|v| v.as_str())
355            .unwrap_or("v3.1.0")
356            .to_string();
357        let kind = json_data
358            .get("kind")
359            .and_then(|v| v.as_str())
360            .unwrap_or("DataContract")
361            .to_string();
362        let id = json_data
363            .get("id")
364            .and_then(|v| v.as_str())
365            .unwrap_or("")
366            .to_string();
367        let version = json_data
368            .get("version")
369            .and_then(|v| v.as_str())
370            .unwrap_or("1.0.0")
371            .to_string();
372        let name = json_data
373            .get("name")
374            .and_then(|v| v.as_str())
375            .unwrap_or("")
376            .to_string();
377        let status = json_data
378            .get("status")
379            .and_then(|v| v.as_str())
380            .map(|s| s.to_string());
381        let domain = json_data
382            .get("domain")
383            .and_then(|v| v.as_str())
384            .map(|s| s.to_string());
385        let data_product = json_data
386            .get("dataProduct")
387            .and_then(|v| v.as_str())
388            .map(|s| s.to_string());
389        let tenant = json_data
390            .get("tenant")
391            .and_then(|v| v.as_str())
392            .map(|s| s.to_string());
393
394        // Description can be string or object
395        let description = json_data.get("description").and_then(|v| {
396            if v.is_string() {
397                Some(Description::Simple(v.as_str().unwrap().to_string()))
398            } else if v.is_object() {
399                serde_json::from_value::<Description>(json_value_to_serde_value(v)).ok()
400            } else {
401                None
402            }
403        });
404
405        // Parse schema array
406        let schema = self.parse_schema_array_to_odcs(&json_data)?;
407
408        // Parse servers
409        let servers: Vec<Server> = json_data
410            .get("servers")
411            .and_then(|v| v.as_array())
412            .map(|arr| {
413                arr.iter()
414                    .filter_map(|s| serde_json::from_value(json_value_to_serde_value(s)).ok())
415                    .collect()
416            })
417            .unwrap_or_default();
418
419        // Parse team
420        let team: Option<Team> = json_data
421            .get("team")
422            .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
423
424        // Parse support
425        let support: Option<Support> = json_data
426            .get("support")
427            .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
428
429        // Parse roles
430        let roles: Vec<Role> = json_data
431            .get("roles")
432            .and_then(|v| v.as_array())
433            .map(|arr| {
434                arr.iter()
435                    .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
436                    .collect()
437            })
438            .unwrap_or_default();
439
440        // Parse service levels
441        let service_levels: Vec<ServiceLevel> = json_data
442            .get("slaProperties")
443            .or_else(|| json_data.get("serviceLevels"))
444            .and_then(|v| v.as_array())
445            .map(|arr| {
446                arr.iter()
447                    .filter_map(|s| serde_json::from_value(json_value_to_serde_value(s)).ok())
448                    .collect()
449            })
450            .unwrap_or_default();
451
452        // Parse contract-level quality rules
453        let quality: Vec<QualityRule> = json_data
454            .get("quality")
455            .and_then(|v| v.as_array())
456            .map(|arr| {
457                arr.iter()
458                    .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
459                    .collect()
460            })
461            .unwrap_or_default();
462
463        // Parse price
464        let price = json_data
465            .get("price")
466            .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
467
468        // Parse terms
469        let terms = json_data
470            .get("terms")
471            .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
472
473        // Parse links
474        let links = json_data
475            .get("links")
476            .and_then(|v| v.as_array())
477            .map(|arr| {
478                arr.iter()
479                    .filter_map(|l| serde_json::from_value(json_value_to_serde_value(l)).ok())
480                    .collect()
481            })
482            .unwrap_or_default();
483
484        // Parse authoritative definitions
485        let authoritative_definitions: Vec<AuthoritativeDefinition> = json_data
486            .get("authoritativeDefinitions")
487            .and_then(|v| v.as_array())
488            .map(|arr| {
489                arr.iter()
490                    .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
491                    .collect()
492            })
493            .unwrap_or_default();
494
495        // Parse tags
496        let tags: Vec<String> = json_data
497            .get("tags")
498            .and_then(|v| v.as_array())
499            .map(|arr| {
500                arr.iter()
501                    .filter_map(|t| t.as_str().map(|s| s.to_string()))
502                    .collect()
503            })
504            .unwrap_or_default();
505
506        // Parse custom properties
507        let custom_properties: Vec<CustomProperty> = json_data
508            .get("customProperties")
509            .and_then(|v| v.as_array())
510            .map(|arr| {
511                arr.iter()
512                    .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
513                    .collect()
514            })
515            .unwrap_or_default();
516
517        // Parse contract created timestamp
518        let contract_created_ts = json_data
519            .get("contractCreatedTs")
520            .and_then(|v| v.as_str())
521            .map(|s| s.to_string());
522
523        Ok(ODCSContract {
524            api_version,
525            kind,
526            id,
527            version,
528            name,
529            status,
530            domain,
531            data_product,
532            tenant,
533            description,
534            schema,
535            servers,
536            team,
537            support,
538            roles,
539            service_levels,
540            quality,
541            price,
542            terms,
543            links,
544            authoritative_definitions,
545            tags,
546            custom_properties,
547            contract_created_ts,
548        })
549    }
550
551    /// Parse the schema array into ODCS SchemaObject types
552    fn parse_schema_array_to_odcs(
553        &self,
554        json_data: &JsonValue,
555    ) -> Result<Vec<crate::models::odcs::SchemaObject>, ImportError> {
556        use crate::models::odcs::{
557            AuthoritativeDefinition, CustomProperty, QualityRule, SchemaObject, SchemaRelationship,
558        };
559
560        let schema_arr = match json_data.get("schema").and_then(|v| v.as_array()) {
561            Some(arr) => arr,
562            None => return Ok(Vec::new()),
563        };
564
565        let mut schemas = Vec::new();
566
567        for (idx, schema_value) in schema_arr.iter().enumerate() {
568            let schema_obj = match schema_value.as_object() {
569                Some(obj) => obj,
570                None => {
571                    return Err(ImportError::ParseError(format!(
572                        "Schema object at index {} must be a dictionary",
573                        idx
574                    )));
575                }
576            };
577
578            let name = schema_obj
579                .get("name")
580                .and_then(|v| v.as_str())
581                .ok_or_else(|| {
582                    ImportError::ParseError(format!(
583                        "Schema object at index {} missing 'name' field",
584                        idx
585                    ))
586                })?
587                .to_string();
588
589            let id = schema_obj
590                .get("id")
591                .and_then(|v| v.as_str())
592                .map(|s| s.to_string());
593            let physical_name = schema_obj
594                .get("physicalName")
595                .and_then(|v| v.as_str())
596                .map(|s| s.to_string());
597            let physical_type = schema_obj
598                .get("physicalType")
599                .and_then(|v| v.as_str())
600                .map(|s| s.to_string());
601            let business_name = schema_obj
602                .get("businessName")
603                .and_then(|v| v.as_str())
604                .map(|s| s.to_string());
605            let description = schema_obj
606                .get("description")
607                .and_then(|v| v.as_str())
608                .map(|s| s.to_string());
609            let data_granularity_description = schema_obj
610                .get("dataGranularityDescription")
611                .and_then(|v| v.as_str())
612                .map(|s| s.to_string());
613
614            // Parse properties
615            let properties = self.parse_properties_to_odcs(schema_obj, json_data)?;
616
617            // Parse schema-level relationships
618            let relationships: Vec<SchemaRelationship> = schema_obj
619                .get("relationships")
620                .and_then(|v| v.as_array())
621                .map(|arr| {
622                    arr.iter()
623                        .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
624                        .collect()
625                })
626                .unwrap_or_default();
627
628            // Parse schema-level quality rules
629            let quality: Vec<QualityRule> = schema_obj
630                .get("quality")
631                .and_then(|v| v.as_array())
632                .map(|arr| {
633                    arr.iter()
634                        .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
635                        .collect()
636                })
637                .unwrap_or_default();
638
639            // Parse authoritative definitions
640            let authoritative_definitions: Vec<AuthoritativeDefinition> = schema_obj
641                .get("authoritativeDefinitions")
642                .and_then(|v| v.as_array())
643                .map(|arr| {
644                    arr.iter()
645                        .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
646                        .collect()
647                })
648                .unwrap_or_default();
649
650            // Parse tags
651            let tags: Vec<String> = schema_obj
652                .get("tags")
653                .and_then(|v| v.as_array())
654                .map(|arr| {
655                    arr.iter()
656                        .filter_map(|t| t.as_str().map(|s| s.to_string()))
657                        .collect()
658                })
659                .unwrap_or_default();
660
661            // Parse custom properties
662            let custom_properties: Vec<CustomProperty> = schema_obj
663                .get("customProperties")
664                .and_then(|v| v.as_array())
665                .map(|arr| {
666                    arr.iter()
667                        .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
668                        .collect()
669                })
670                .unwrap_or_default();
671
672            schemas.push(SchemaObject {
673                id,
674                name,
675                physical_name,
676                physical_type,
677                business_name,
678                description,
679                data_granularity_description,
680                properties,
681                relationships,
682                quality,
683                authoritative_definitions,
684                tags,
685                custom_properties,
686            });
687        }
688
689        Ok(schemas)
690    }
691
692    /// Parse properties array into ODCS Property types (with nested support)
693    fn parse_properties_to_odcs(
694        &self,
695        schema_obj: &serde_json::Map<String, JsonValue>,
696        root_data: &JsonValue,
697    ) -> Result<Vec<crate::models::odcs::Property>, ImportError> {
698        let props_arr = match schema_obj.get("properties").and_then(|v| v.as_array()) {
699            Some(arr) => arr,
700            None => return Ok(Vec::new()),
701        };
702
703        let mut properties = Vec::new();
704
705        for prop_value in props_arr {
706            let prop_obj = match prop_value.as_object() {
707                Some(obj) => obj,
708                None => continue,
709            };
710
711            let prop = self.parse_single_property_to_odcs(prop_obj, root_data)?;
712            properties.push(prop);
713        }
714
715        Ok(properties)
716    }
717
718    /// Parse a single property object to ODCS Property (recursive for nested)
719    #[allow(clippy::only_used_in_recursion)]
720    fn parse_single_property_to_odcs(
721        &self,
722        prop_obj: &serde_json::Map<String, JsonValue>,
723        root_data: &JsonValue,
724    ) -> Result<crate::models::odcs::Property, ImportError> {
725        use crate::models::odcs::{
726            AuthoritativeDefinition, CustomProperty, LogicalTypeOptions, Property,
727            PropertyRelationship, QualityRule,
728        };
729
730        // Handle $ref
731        if let Some(ref_path) = prop_obj.get("$ref").and_then(|v| v.as_str())
732            && let Some(resolved) = resolve_ref(ref_path, root_data)
733            && let Some(obj) = resolved.as_object()
734        {
735            return self.parse_single_property_to_odcs(obj, root_data);
736        }
737
738        let name = prop_obj
739            .get("name")
740            .and_then(|v| v.as_str())
741            .unwrap_or("")
742            .to_string();
743        let logical_type = prop_obj
744            .get("logicalType")
745            .or_else(|| prop_obj.get("type"))
746            .and_then(|v| v.as_str())
747            .unwrap_or("string")
748            .to_string();
749
750        let id = prop_obj
751            .get("id")
752            .and_then(|v| v.as_str())
753            .map(|s| s.to_string());
754        let business_name = prop_obj
755            .get("businessName")
756            .and_then(|v| v.as_str())
757            .map(|s| s.to_string());
758        let description = prop_obj
759            .get("description")
760            .and_then(|v| v.as_str())
761            .map(|s| s.to_string());
762        let physical_type = prop_obj
763            .get("physicalType")
764            .and_then(|v| v.as_str())
765            .map(|s| s.to_string());
766        let physical_name = prop_obj
767            .get("physicalName")
768            .and_then(|v| v.as_str())
769            .map(|s| s.to_string());
770
771        // Parse logicalTypeOptions
772        let logical_type_options: Option<LogicalTypeOptions> = prop_obj
773            .get("logicalTypeOptions")
774            .and_then(|v| serde_json::from_value(json_value_to_serde_value(v)).ok());
775
776        // Key constraints
777        let required = prop_obj
778            .get("required")
779            .and_then(|v| v.as_bool())
780            .unwrap_or(false);
781        let primary_key = prop_obj
782            .get("primaryKey")
783            .and_then(|v| v.as_bool())
784            .unwrap_or(false);
785        let primary_key_position = prop_obj
786            .get("primaryKeyPosition")
787            .and_then(|v| v.as_i64())
788            .map(|n| n as i32);
789        let unique = prop_obj
790            .get("unique")
791            .and_then(|v| v.as_bool())
792            .unwrap_or(false);
793
794        // Partitioning & clustering
795        let partitioned = prop_obj
796            .get("partitioned")
797            .and_then(|v| v.as_bool())
798            .unwrap_or(false);
799        let partition_key_position = prop_obj
800            .get("partitionKeyPosition")
801            .and_then(|v| v.as_i64())
802            .map(|n| n as i32);
803        let clustered = prop_obj
804            .get("clustered")
805            .and_then(|v| v.as_bool())
806            .unwrap_or(false);
807
808        // Classification & security
809        let classification = prop_obj
810            .get("classification")
811            .and_then(|v| v.as_str())
812            .map(|s| s.to_string());
813        let critical_data_element = prop_obj
814            .get("criticalDataElement")
815            .and_then(|v| v.as_bool())
816            .unwrap_or(false);
817        let encrypted_name = prop_obj
818            .get("encryptedName")
819            .and_then(|v| v.as_str())
820            .map(|s| s.to_string());
821
822        // Transformation
823        let transform_source_objects: Vec<String> = prop_obj
824            .get("transformSourceObjects")
825            .and_then(|v| v.as_array())
826            .map(|arr| {
827                arr.iter()
828                    .filter_map(|t| t.as_str().map(|s| s.to_string()))
829                    .collect()
830            })
831            .unwrap_or_default();
832        let transform_logic = prop_obj
833            .get("transformLogic")
834            .and_then(|v| v.as_str())
835            .map(|s| s.to_string());
836        let transform_description = prop_obj
837            .get("transformDescription")
838            .and_then(|v| v.as_str())
839            .map(|s| s.to_string());
840
841        // Examples & defaults
842        let examples: Vec<serde_json::Value> = prop_obj
843            .get("examples")
844            .and_then(|v| v.as_array())
845            .map(|arr| arr.iter().map(json_value_to_serde_value).collect())
846            .unwrap_or_default();
847        let default_value = prop_obj.get("default").map(json_value_to_serde_value);
848
849        // Relationships
850        let relationships: Vec<PropertyRelationship> = prop_obj
851            .get("relationships")
852            .and_then(|v| v.as_array())
853            .map(|arr| {
854                arr.iter()
855                    .filter_map(|r| serde_json::from_value(json_value_to_serde_value(r)).ok())
856                    .collect()
857            })
858            .unwrap_or_default();
859
860        // Authoritative definitions
861        let authoritative_definitions: Vec<AuthoritativeDefinition> = prop_obj
862            .get("authoritativeDefinitions")
863            .and_then(|v| v.as_array())
864            .map(|arr| {
865                arr.iter()
866                    .filter_map(|ad| serde_json::from_value(json_value_to_serde_value(ad)).ok())
867                    .collect()
868            })
869            .unwrap_or_default();
870
871        // Quality rules
872        let quality: Vec<QualityRule> = prop_obj
873            .get("quality")
874            .and_then(|v| v.as_array())
875            .map(|arr| {
876                arr.iter()
877                    .filter_map(|q| serde_json::from_value(json_value_to_serde_value(q)).ok())
878                    .collect()
879            })
880            .unwrap_or_default();
881
882        // Enum values
883        let enum_values: Vec<String> = prop_obj
884            .get("enum")
885            .and_then(|v| v.as_array())
886            .map(|arr| {
887                arr.iter()
888                    .filter_map(|e| e.as_str().map(|s| s.to_string()))
889                    .collect()
890            })
891            .unwrap_or_default();
892
893        // Tags
894        let tags: Vec<String> = prop_obj
895            .get("tags")
896            .and_then(|v| v.as_array())
897            .map(|arr| {
898                arr.iter()
899                    .filter_map(|t| t.as_str().map(|s| s.to_string()))
900                    .collect()
901            })
902            .unwrap_or_default();
903
904        // Custom properties
905        let custom_properties: Vec<CustomProperty> = prop_obj
906            .get("customProperties")
907            .and_then(|v| v.as_array())
908            .map(|arr| {
909                arr.iter()
910                    .filter_map(|cp| serde_json::from_value(json_value_to_serde_value(cp)).ok())
911                    .collect()
912            })
913            .unwrap_or_default();
914
915        // Parse nested properties (for OBJECT types)
916        let nested_properties: Vec<Property> =
917            if let Some(props) = prop_obj.get("properties").and_then(|v| v.as_array()) {
918                props
919                    .iter()
920                    .filter_map(|p| {
921                        p.as_object()
922                            .and_then(|obj| self.parse_single_property_to_odcs(obj, root_data).ok())
923                    })
924                    .collect()
925            } else {
926                Vec::new()
927            };
928
929        // Parse items (for ARRAY types)
930        let items: Option<Box<Property>> = prop_obj.get("items").and_then(|v| {
931            v.as_object()
932                .and_then(|obj| self.parse_single_property_to_odcs(obj, root_data).ok())
933                .map(Box::new)
934        });
935
936        Ok(Property {
937            id,
938            name,
939            business_name,
940            description,
941            logical_type,
942            physical_type,
943            physical_name,
944            logical_type_options,
945            required,
946            primary_key,
947            primary_key_position,
948            unique,
949            partitioned,
950            partition_key_position,
951            clustered,
952            classification,
953            critical_data_element,
954            encrypted_name,
955            transform_source_objects,
956            transform_logic,
957            transform_description,
958            examples,
959            default_value,
960            relationships,
961            authoritative_definitions,
962            quality,
963            enum_values,
964            tags,
965            custom_properties,
966            items,
967            properties: nested_properties,
968        })
969    }
970
971    /// Parse ODCS/ODCL YAML content and create Table (public method for native app use).
972    ///
973    /// This method returns the full Table object with all metadata, suitable for use in
974    /// native applications that need direct access to the parsed table structure.
975    /// For API use, prefer the `import()` method which returns ImportResult.
976    ///
977    /// # Returns
978    ///
979    /// Returns a tuple of (Table, list of errors/warnings).
980    /// Errors list is empty if parsing is successful.
981    pub fn parse_table(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
982        self.parse(yaml_content)
983    }
984
985    /// Parse ODCS/ODCL YAML content and create Table (internal method).
986    ///
987    /// Supports ODCS v3.1.0/v3.0.x (primary), ODCL Data Contract spec (legacy, converted to ODCS),
988    /// simple ODCL (legacy, converted to ODCS), and Liquibase formats.
989    ///
990    /// # Returns
991    ///
992    /// Returns a tuple of (Table, list of errors/warnings).
993    /// Errors list is empty if parsing is successful.
994    fn parse(&mut self, yaml_content: &str) -> Result<(Table, Vec<ParserError>)> {
995        // Errors are collected in helper functions, not here
996        let _errors: Vec<ParserError> = Vec::new();
997
998        // Parse YAML
999        let data: serde_yaml::Value =
1000            serde_yaml::from_str(yaml_content).context("Failed to parse YAML")?;
1001
1002        if data.is_null() {
1003            return Err(anyhow::anyhow!("Empty YAML content"));
1004        }
1005
1006        // Store current YAML data for $ref resolution
1007        self.current_yaml_data = Some(data.clone());
1008
1009        // Convert to JSON Value for easier manipulation
1010        let json_data = yaml_to_json_value(&data)?;
1011
1012        // Check format and parse accordingly
1013        if self.is_liquibase_format(&json_data) {
1014            return self.parse_liquibase(&json_data);
1015        }
1016
1017        if self.is_odcl_v3_format(&json_data) {
1018            return self.parse_odcl_v3(&json_data);
1019        }
1020
1021        if self.is_data_contract_format(&json_data) {
1022            return self.parse_data_contract(&json_data);
1023        }
1024
1025        // Fall back to simple ODCL format
1026        self.parse_simple_odcl(&json_data)
1027    }
1028
1029    /// Check if YAML is in Liquibase format.
1030    fn is_liquibase_format(&self, data: &JsonValue) -> bool {
1031        if data.get("databaseChangeLog").is_some() {
1032            return true;
1033        }
1034        // Check for Liquibase-specific keys
1035        if let Some(obj) = data.as_object() {
1036            let obj_str = format!("{:?}", obj);
1037            if obj_str.contains("changeSet") {
1038                return true;
1039            }
1040        }
1041        false
1042    }
1043
1044    /// Check if YAML is in ODCS v3.0.x format.
1045    fn is_odcl_v3_format(&self, data: &JsonValue) -> bool {
1046        if let Some(obj) = data.as_object() {
1047            let has_api_version = obj.contains_key("apiVersion");
1048            let has_kind = obj
1049                .get("kind")
1050                .and_then(|v| v.as_str())
1051                .map(|s| s == "DataContract")
1052                .unwrap_or(false);
1053            let has_id = obj.contains_key("id");
1054            let has_version = obj.contains_key("version");
1055            return has_api_version && has_kind && has_id && has_version;
1056        }
1057        false
1058    }
1059
1060    /// Check if YAML is in Data Contract specification format.
1061    fn is_data_contract_format(&self, data: &JsonValue) -> bool {
1062        if let Some(obj) = data.as_object() {
1063            let has_spec = obj.contains_key("dataContractSpecification");
1064            let has_models = obj.get("models").and_then(|v| v.as_object()).is_some();
1065            return has_spec && has_models;
1066        }
1067        false
1068    }
1069
1070    /// Parse simple ODCL format.
1071    fn parse_simple_odcl(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1072        let mut errors = Vec::new();
1073
1074        // Extract table name
1075        let name = data
1076            .get("name")
1077            .and_then(|v| v.as_str())
1078            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'name' field"))?
1079            .to_string();
1080
1081        // Extract columns
1082        let columns_data = data
1083            .get("columns")
1084            .and_then(|v| v.as_array())
1085            .ok_or_else(|| anyhow::anyhow!("ODCL YAML missing required 'columns' field"))?;
1086
1087        let mut columns = Vec::new();
1088        for (idx, col_data) in columns_data.iter().enumerate() {
1089            match self.parse_column(col_data) {
1090                Ok(col) => columns.push(col),
1091                Err(e) => {
1092                    errors.push(ParserError {
1093                        error_type: "column_parse_error".to_string(),
1094                        field: format!("columns[{}]", idx),
1095                        message: e.to_string(),
1096                    });
1097                }
1098            }
1099        }
1100
1101        // Extract metadata
1102        let database_type = self.extract_database_type(data);
1103        let medallion_layers = self.extract_medallion_layers(data);
1104        let scd_pattern = self.extract_scd_pattern(data);
1105        let data_vault_classification = self.extract_data_vault_classification(data);
1106        let quality_rules = self.extract_quality_rules(data);
1107
1108        // Validate pattern exclusivity
1109        if scd_pattern.is_some() && data_vault_classification.is_some() {
1110            errors.push(ParserError {
1111                error_type: "validation_error".to_string(),
1112                field: "patterns".to_string(),
1113                message: "SCD pattern and Data Vault classification are mutually exclusive"
1114                    .to_string(),
1115            });
1116        }
1117
1118        // Extract odcl_metadata
1119        let mut odcl_metadata = HashMap::new();
1120        if let Some(metadata) = data.get("odcl_metadata")
1121            && let Some(obj) = metadata.as_object()
1122        {
1123            for (key, value) in obj {
1124                odcl_metadata.insert(key.clone(), json_value_to_serde_value(value));
1125            }
1126        }
1127
1128        let table_uuid = self.extract_table_uuid(data);
1129
1130        let table = Table {
1131            id: table_uuid,
1132            name,
1133            columns,
1134            database_type,
1135            catalog_name: None,
1136            schema_name: None,
1137            medallion_layers,
1138            scd_pattern,
1139            data_vault_classification,
1140            modeling_level: None,
1141            tags: Vec::<Tag>::new(),
1142            odcl_metadata,
1143            owner: None,
1144            sla: None,
1145            contact_details: None,
1146            infrastructure_type: None,
1147            notes: None,
1148            position: None,
1149            yaml_file_path: None,
1150            drawio_cell_id: None,
1151            quality: quality_rules,
1152            errors: Vec::new(),
1153            created_at: chrono::Utc::now(),
1154            updated_at: chrono::Utc::now(),
1155        };
1156
1157        info!("Parsed ODCL table: {}", table.name);
1158        Ok((table, errors))
1159    }
1160
1161    /// Parse a single column definition.
1162    fn parse_column(&self, col_data: &JsonValue) -> Result<Column> {
1163        let name = col_data
1164            .get("name")
1165            .and_then(|v| v.as_str())
1166            .ok_or_else(|| anyhow::anyhow!("Column missing 'name' field"))?
1167            .to_string();
1168
1169        let data_type = col_data
1170            .get("data_type")
1171            .and_then(|v| v.as_str())
1172            .ok_or_else(|| anyhow::anyhow!("Column missing 'data_type' field"))?
1173            .to_string();
1174
1175        // Normalize data_type to uppercase (preserve STRUCT<...> format)
1176        let data_type = normalize_data_type(&data_type);
1177
1178        let nullable = col_data
1179            .get("nullable")
1180            .and_then(|v| v.as_bool())
1181            .unwrap_or(true);
1182
1183        let primary_key = col_data
1184            .get("primary_key")
1185            .and_then(|v| v.as_bool())
1186            .unwrap_or(false);
1187
1188        let foreign_key = col_data
1189            .get("foreign_key")
1190            .and_then(|v| self.parse_foreign_key(v));
1191
1192        let constraints = col_data
1193            .get("constraints")
1194            .and_then(|v| v.as_array())
1195            .map(|arr| {
1196                arr.iter()
1197                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1198                    .collect()
1199            })
1200            .unwrap_or_default();
1201
1202        let description = col_data
1203            .get("description")
1204            .and_then(|v| v.as_str())
1205            .map(|s| s.to_string())
1206            .unwrap_or_default();
1207
1208        // Extract column-level quality rules
1209        let mut column_quality_rules = Vec::new();
1210        if let Some(quality_val) = col_data.get("quality") {
1211            if let Some(arr) = quality_val.as_array() {
1212                // Array of quality rules
1213                for item in arr {
1214                    if let Some(obj) = item.as_object() {
1215                        let mut rule = HashMap::new();
1216                        for (key, value) in obj {
1217                            rule.insert(key.clone(), json_value_to_serde_value(value));
1218                        }
1219                        column_quality_rules.push(rule);
1220                    }
1221                }
1222            } else if let Some(obj) = quality_val.as_object() {
1223                // Single quality rule object
1224                let mut rule = HashMap::new();
1225                for (key, value) in obj {
1226                    rule.insert(key.clone(), json_value_to_serde_value(value));
1227                }
1228                column_quality_rules.push(rule);
1229            }
1230        }
1231
1232        // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
1233        // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
1234        // The `nullable` field will be converted to `required` during export.
1235
1236        Ok(Column {
1237            name,
1238            data_type,
1239            nullable,
1240            primary_key,
1241            foreign_key,
1242            constraints,
1243            description,
1244            quality: column_quality_rules,
1245            ..Default::default()
1246        })
1247    }
1248
1249    /// Parse foreign key from JSON value.
1250    fn parse_foreign_key(&self, fk_data: &JsonValue) -> Option<ForeignKey> {
1251        let obj = fk_data.as_object()?;
1252        Some(ForeignKey {
1253            table_id: obj
1254                .get("table_id")
1255                .or_else(|| obj.get("table"))
1256                .and_then(|v| v.as_str())
1257                .unwrap_or("")
1258                .to_string(),
1259            column_name: obj
1260                .get("column_name")
1261                .or_else(|| obj.get("column"))
1262                .and_then(|v| v.as_str())
1263                .unwrap_or("")
1264                .to_string(),
1265        })
1266    }
1267
1268    /// Extract database type from data.
1269    fn extract_database_type(&self, data: &JsonValue) -> Option<DatabaseType> {
1270        data.get("database_type")
1271            .and_then(|v| v.as_str())
1272            .and_then(|s| match s.to_uppercase().as_str() {
1273                "POSTGRES" | "POSTGRESQL" => Some(DatabaseType::Postgres),
1274                "MYSQL" => Some(DatabaseType::Mysql),
1275                "SQL_SERVER" | "SQLSERVER" => Some(DatabaseType::SqlServer),
1276                "DATABRICKS" | "DATABRICKS_DELTA" => Some(DatabaseType::DatabricksDelta),
1277                "AWS_GLUE" | "GLUE" => Some(DatabaseType::AwsGlue),
1278                _ => None,
1279            })
1280    }
1281
1282    /// Extract medallion layers from data.
1283    fn extract_medallion_layers(&self, data: &JsonValue) -> Vec<MedallionLayer> {
1284        let mut layers = Vec::new();
1285
1286        // Check plural form first
1287        if let Some(arr) = data.get("medallion_layers").and_then(|v| v.as_array()) {
1288            for item in arr {
1289                if let Some(s) = item.as_str()
1290                    && let Ok(layer) = parse_medallion_layer(s)
1291                {
1292                    layers.push(layer);
1293                }
1294            }
1295        }
1296        // Check singular form (backward compatibility)
1297        else if let Some(s) = data.get("medallion_layer").and_then(|v| v.as_str())
1298            && let Ok(layer) = parse_medallion_layer(s)
1299        {
1300            layers.push(layer);
1301        }
1302
1303        layers
1304    }
1305
1306    /// Extract SCD pattern from data.
1307    fn extract_scd_pattern(&self, data: &JsonValue) -> Option<SCDPattern> {
1308        data.get("scd_pattern")
1309            .and_then(|v| v.as_str())
1310            .and_then(|s| parse_scd_pattern(s).ok())
1311    }
1312
1313    /// Extract Data Vault classification from data.
1314    fn extract_data_vault_classification(
1315        &self,
1316        data: &JsonValue,
1317    ) -> Option<DataVaultClassification> {
1318        data.get("data_vault_classification")
1319            .and_then(|v| v.as_str())
1320            .and_then(|s| parse_data_vault_classification(s).ok())
1321    }
1322
1323    /// Extract quality rules from data.
1324    fn extract_quality_rules(&self, data: &JsonValue) -> Vec<HashMap<String, serde_json::Value>> {
1325        use serde_json::Value;
1326        let mut quality_rules = Vec::new();
1327
1328        // Check for quality field at root level (array of objects or single object)
1329        if let Some(quality_val) = data.get("quality") {
1330            if let Some(arr) = quality_val.as_array() {
1331                // Array of quality rules
1332                for item in arr {
1333                    if let Some(obj) = item.as_object() {
1334                        let mut rule = HashMap::new();
1335                        for (key, value) in obj {
1336                            rule.insert(key.clone(), json_value_to_serde_value(value));
1337                        }
1338                        quality_rules.push(rule);
1339                    }
1340                }
1341            } else if let Some(obj) = quality_val.as_object() {
1342                // Single quality rule object
1343                let mut rule = HashMap::new();
1344                for (key, value) in obj {
1345                    rule.insert(key.clone(), json_value_to_serde_value(value));
1346                }
1347                quality_rules.push(rule);
1348            } else if let Some(s) = quality_val.as_str() {
1349                // Simple string quality value
1350                let mut rule = HashMap::new();
1351                rule.insert("value".to_string(), Value::String(s.to_string()));
1352                quality_rules.push(rule);
1353            }
1354        }
1355
1356        // Check for quality in metadata (ODCL v3 format)
1357        if let Some(metadata) = data.get("metadata")
1358            && let Some(metadata_obj) = metadata.as_object()
1359            && let Some(quality_val) = metadata_obj.get("quality")
1360        {
1361            if let Some(arr) = quality_val.as_array() {
1362                // Array of quality rules
1363                for item in arr {
1364                    if let Some(obj) = item.as_object() {
1365                        let mut rule = HashMap::new();
1366                        for (key, value) in obj {
1367                            rule.insert(key.clone(), json_value_to_serde_value(value));
1368                        }
1369                        quality_rules.push(rule);
1370                    }
1371                }
1372            } else if let Some(obj) = quality_val.as_object() {
1373                // Single quality rule object
1374                let mut rule = HashMap::new();
1375                for (key, value) in obj {
1376                    rule.insert(key.clone(), json_value_to_serde_value(value));
1377                }
1378                quality_rules.push(rule);
1379            } else if let Some(s) = quality_val.as_str() {
1380                // Simple string quality value
1381                let mut rule = HashMap::new();
1382                rule.insert("value".to_string(), Value::String(s.to_string()));
1383                quality_rules.push(rule);
1384            }
1385        }
1386
1387        // Check for tblproperties field (similar to SQL TBLPROPERTIES)
1388        if let Some(tblprops) = data.get("tblproperties")
1389            && let Some(obj) = tblprops.as_object()
1390        {
1391            for (key, value) in obj {
1392                let mut rule = HashMap::new();
1393                rule.insert("property".to_string(), Value::String(key.clone()));
1394                rule.insert("value".to_string(), json_value_to_serde_value(value));
1395                quality_rules.push(rule);
1396            }
1397        }
1398
1399        quality_rules
1400    }
1401
1402    /// Parse Liquibase YAML changelog format.
1403    ///
1404    /// Extracts the first `createTable` change from a Liquibase changelog.
1405    /// Supports both direct column definitions and nested column structures.
1406    ///
1407    /// # Supported Format
1408    ///
1409    fn parse_liquibase(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1410        // Supported Liquibase YAML changelog format:
1411        // databaseChangeLog:
1412        //   - changeSet:
1413        //       - createTable:
1414        //           tableName: my_table
1415        //           columns:
1416        //             - column:
1417        //                 name: id
1418        //                 type: int
1419        //                 constraints:
1420        //                   primaryKey: true
1421        //                   nullable: false
1422
1423        let mut errors = Vec::new();
1424
1425        let changelog = data
1426            .get("databaseChangeLog")
1427            .and_then(|v| v.as_array())
1428            .ok_or_else(|| anyhow::anyhow!("Liquibase YAML missing databaseChangeLog array"))?;
1429
1430        // Find first createTable change.
1431        let mut table_name: Option<String> = None;
1432        let mut columns: Vec<crate::models::column::Column> = Vec::new();
1433
1434        for entry in changelog {
1435            // Entries are typically maps like { changeSet: [...] } or { changeSet: { changes: [...] } }
1436            if let Some(change_set) = entry.get("changeSet") {
1437                // Liquibase YAML can represent changeSet as map or list.
1438                let changes = if let Some(obj) = change_set.as_object() {
1439                    obj.get("changes")
1440                        .and_then(|v| v.as_array())
1441                        .cloned()
1442                        .unwrap_or_default()
1443                } else if let Some(arr) = change_set.as_array() {
1444                    // Some variants encode changes directly as list entries inside changeSet.
1445                    arr.clone()
1446                } else {
1447                    Vec::new()
1448                };
1449
1450                for ch in changes {
1451                    let create = ch.get("createTable").or_else(|| ch.get("create_table"));
1452                    if let Some(create) = create {
1453                        table_name = create
1454                            .get("tableName")
1455                            .or_else(|| create.get("table_name"))
1456                            .and_then(|v| v.as_str())
1457                            .map(|s| s.to_string());
1458
1459                        // columns: [ { column: { ... } }, ... ]
1460                        if let Some(cols) = create.get("columns").and_then(|v| v.as_array()) {
1461                            for col_entry in cols {
1462                                let col = col_entry.get("column").unwrap_or(col_entry);
1463                                let name = col
1464                                    .get("name")
1465                                    .and_then(|v| v.as_str())
1466                                    .unwrap_or("")
1467                                    .to_string();
1468                                let data_type = col
1469                                    .get("type")
1470                                    .and_then(|v| v.as_str())
1471                                    .unwrap_or("")
1472                                    .to_string();
1473
1474                                if name.is_empty() {
1475                                    errors.push(ParserError {
1476                                        error_type: "validation_error".to_string(),
1477                                        field: "columns.name".to_string(),
1478                                        message: "Liquibase createTable column missing name"
1479                                            .to_string(),
1480                                    });
1481                                    continue;
1482                                }
1483
1484                                let mut column =
1485                                    crate::models::column::Column::new(name, data_type);
1486
1487                                if let Some(constraints) =
1488                                    col.get("constraints").and_then(|v| v.as_object())
1489                                {
1490                                    if let Some(pk) =
1491                                        constraints.get("primaryKey").and_then(|v| v.as_bool())
1492                                    {
1493                                        column.primary_key = pk;
1494                                    }
1495                                    if let Some(nullable) =
1496                                        constraints.get("nullable").and_then(|v| v.as_bool())
1497                                    {
1498                                        column.nullable = nullable;
1499                                    }
1500                                }
1501
1502                                columns.push(column);
1503                            }
1504                        }
1505
1506                        // parse_table() returns a single Table, so we parse the first createTable.
1507                        // If multiple tables are needed, call parse_table() multiple times or use import().
1508                        break;
1509                    }
1510                }
1511            }
1512            if table_name.is_some() {
1513                break;
1514            }
1515        }
1516
1517        let table_name = table_name
1518            .ok_or_else(|| anyhow::anyhow!("Liquibase changelog did not contain a createTable"))?;
1519        let table = Table::new(table_name, columns);
1520        // Preserve any errors collected.
1521        Ok((table, errors))
1522    }
1523
1524    /// Parse ODCS v3.0.x format.
1525    fn parse_odcl_v3(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
1526        let mut errors = Vec::new();
1527
1528        // Extract table name
1529        let table_name = data
1530            .get("name")
1531            .and_then(|v| v.as_str())
1532            .map(|s| s.to_string())
1533            .or_else(|| {
1534                // Try to get from schema array
1535                data.get("schema")
1536                    .and_then(|v| v.as_array())
1537                    .and_then(|arr| arr.first())
1538                    .and_then(|obj| obj.as_object())
1539                    .and_then(|obj| obj.get("name"))
1540                    .and_then(|v| v.as_str())
1541                    .map(|s| s.to_string())
1542            })
1543            .ok_or_else(|| {
1544                anyhow::anyhow!("ODCS v3.0.x YAML missing 'name' field and no schema objects")
1545            })?;
1546
1547        // Extract schema - ODCS v3.0.x uses array of SchemaObject
1548        let schema = data
1549            .get("schema")
1550            .and_then(|v| v.as_array())
1551            .ok_or_else(|| {
1552                errors.push(ParserError {
1553                    error_type: "validation_error".to_string(),
1554                    field: "schema".to_string(),
1555                    message: "ODCS v3.0.x YAML missing 'schema' field".to_string(),
1556                });
1557                anyhow::anyhow!("Missing schema")
1558            });
1559
1560        let schema = match schema {
1561            Ok(s) if s.is_empty() => {
1562                errors.push(ParserError {
1563                    error_type: "validation_error".to_string(),
1564                    field: "schema".to_string(),
1565                    message: "ODCS v3.0.x schema array is empty".to_string(),
1566                });
1567                let quality_rules = self.extract_quality_rules(data);
1568                let table_uuid = self.extract_table_uuid(data);
1569                let table = Table {
1570                    id: table_uuid,
1571                    name: table_name,
1572                    columns: Vec::new(),
1573                    database_type: None,
1574                    catalog_name: None,
1575                    schema_name: None,
1576                    medallion_layers: Vec::new(),
1577                    scd_pattern: None,
1578                    data_vault_classification: None,
1579                    modeling_level: None,
1580                    tags: Vec::<Tag>::new(),
1581                    odcl_metadata: HashMap::new(),
1582                    owner: None,
1583                    sla: None,
1584                    contact_details: None,
1585                    infrastructure_type: None,
1586                    notes: None,
1587                    position: None,
1588                    yaml_file_path: None,
1589                    drawio_cell_id: None,
1590                    quality: quality_rules,
1591                    errors: Vec::new(),
1592                    created_at: chrono::Utc::now(),
1593                    updated_at: chrono::Utc::now(),
1594                };
1595                return Ok((table, errors));
1596            }
1597            Ok(s) => s,
1598            Err(_) => {
1599                let quality_rules = self.extract_quality_rules(data);
1600                let table_uuid = self.extract_table_uuid(data);
1601                let table = Table {
1602                    id: table_uuid,
1603                    name: table_name,
1604                    columns: Vec::new(),
1605                    database_type: None,
1606                    catalog_name: None,
1607                    schema_name: None,
1608                    medallion_layers: Vec::new(),
1609                    scd_pattern: None,
1610                    data_vault_classification: None,
1611                    modeling_level: None,
1612                    tags: Vec::<Tag>::new(),
1613                    odcl_metadata: HashMap::new(),
1614                    owner: None,
1615                    sla: None,
1616                    contact_details: None,
1617                    infrastructure_type: None,
1618                    notes: None,
1619                    position: None,
1620                    yaml_file_path: None,
1621                    drawio_cell_id: None,
1622                    quality: quality_rules,
1623                    errors: Vec::new(),
1624                    created_at: chrono::Utc::now(),
1625                    updated_at: chrono::Utc::now(),
1626                };
1627                return Ok((table, errors));
1628            }
1629        };
1630
1631        // Get the first schema object (table)
1632        let schema_object = schema.first().and_then(|v| v.as_object()).ok_or_else(|| {
1633            errors.push(ParserError {
1634                error_type: "validation_error".to_string(),
1635                field: "schema[0]".to_string(),
1636                message: "First schema object must be a dictionary".to_string(),
1637            });
1638            anyhow::anyhow!("Invalid schema object")
1639        })?;
1640
1641        let object_name = schema_object
1642            .get("name")
1643            .and_then(|v| v.as_str())
1644            .unwrap_or(&table_name);
1645
1646        // Handle both object format (v3.0.x) and array format (v3.1.0) for properties
1647        let mut columns = Vec::new();
1648
1649        if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
1650            // Object format (v3.0.x): properties is a map of name -> property object
1651            for (prop_name, prop_data) in properties_obj {
1652                if let Some(prop_obj) = prop_data.as_object() {
1653                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
1654                        Ok(mut cols) => columns.append(&mut cols),
1655                        Err(e) => {
1656                            errors.push(ParserError {
1657                                error_type: "property_parse_error".to_string(),
1658                                field: format!("Property '{}'", prop_name),
1659                                message: e.to_string(),
1660                            });
1661                        }
1662                    }
1663                } else {
1664                    errors.push(ParserError {
1665                        error_type: "validation_error".to_string(),
1666                        field: format!("Property '{}'", prop_name),
1667                        message: format!("Property '{}' must be an object", prop_name),
1668                    });
1669                }
1670            }
1671        } else if let Some(properties_arr) =
1672            schema_object.get("properties").and_then(|v| v.as_array())
1673        {
1674            // Array format (v3.1.0): properties is an array of property objects with 'name' field
1675            for (idx, prop_data) in properties_arr.iter().enumerate() {
1676                if let Some(prop_obj) = prop_data.as_object() {
1677                    // Extract name from property object (required in v3.1.0)
1678                    // ODCS v3.1.0 requires 'name' field, but we'll also accept 'id' as fallback
1679                    let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
1680                        Some(JsonValue::String(s)) => s.as_str(),
1681                        _ => {
1682                            // Skip properties without name or id (not valid ODCS v3.1.0)
1683                            errors.push(ParserError {
1684                                error_type: "validation_error".to_string(),
1685                                field: format!("Property[{}]", idx),
1686                                message: format!(
1687                                    "Property[{}] missing required 'name' or 'id' field",
1688                                    idx
1689                                ),
1690                            });
1691                            continue;
1692                        }
1693                    };
1694
1695                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
1696                        Ok(mut cols) => columns.append(&mut cols),
1697                        Err(e) => {
1698                            errors.push(ParserError {
1699                                error_type: "property_parse_error".to_string(),
1700                                field: format!("Property[{}] '{}'", idx, prop_name),
1701                                message: e.to_string(),
1702                            });
1703                        }
1704                    }
1705                } else {
1706                    errors.push(ParserError {
1707                        error_type: "validation_error".to_string(),
1708                        field: format!("Property[{}]", idx),
1709                        message: format!("Property[{}] must be an object", idx),
1710                    });
1711                }
1712            }
1713        } else {
1714            errors.push(ParserError {
1715                error_type: "validation_error".to_string(),
1716                field: format!("Object '{}'", object_name),
1717                message: format!(
1718                    "Object '{}' missing 'properties' field or properties is invalid",
1719                    object_name
1720                ),
1721            });
1722        }
1723
1724        // Extract metadata from customProperties
1725        let (medallion_layers, scd_pattern, data_vault_classification, mut tags): (
1726            _,
1727            _,
1728            _,
1729            Vec<Tag>,
1730        ) = self.extract_metadata_from_custom_properties(data);
1731
1732        // Extract sharedDomains from customProperties
1733        let mut shared_domains: Vec<String> = Vec::new();
1734        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
1735            for prop in custom_props {
1736                if let Some(prop_obj) = prop.as_object() {
1737                    let prop_key = prop_obj
1738                        .get("property")
1739                        .and_then(|v| v.as_str())
1740                        .unwrap_or("");
1741                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
1742                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
1743                    {
1744                        for item in arr {
1745                            if let Some(s) = item.as_str() {
1746                                shared_domains.push(s.to_string());
1747                            }
1748                        }
1749                    }
1750                }
1751            }
1752        }
1753
1754        // Extract tags from top-level tags field (if not already extracted from customProperties)
1755        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
1756            for item in tags_arr {
1757                if let Some(s) = item.as_str() {
1758                    // Parse tag string to Tag enum
1759                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
1760                    if !tags.contains(&tag) {
1761                        tags.push(tag);
1762                    }
1763                }
1764            }
1765        }
1766
1767        // Extract database type from servers
1768        let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1769
1770        // Extract quality rules
1771        let quality_rules = self.extract_quality_rules(data);
1772
1773        // Build ODCL metadata
1774        let mut odcl_metadata = HashMap::new();
1775        odcl_metadata.insert(
1776            "apiVersion".to_string(),
1777            json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
1778        );
1779        odcl_metadata.insert(
1780            "kind".to_string(),
1781            json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
1782        );
1783        odcl_metadata.insert(
1784            "id".to_string(),
1785            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
1786        );
1787        odcl_metadata.insert(
1788            "version".to_string(),
1789            json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
1790        );
1791        odcl_metadata.insert(
1792            "status".to_string(),
1793            json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
1794        );
1795
1796        // Extract servicelevels if present
1797        if let Some(servicelevels_val) = data.get("servicelevels") {
1798            odcl_metadata.insert(
1799                "servicelevels".to_string(),
1800                json_value_to_serde_value(servicelevels_val),
1801            );
1802        }
1803
1804        // Extract links if present
1805        if let Some(links_val) = data.get("links") {
1806            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
1807        }
1808
1809        // Extract domain, dataProduct, tenant
1810        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
1811            odcl_metadata.insert(
1812                "domain".to_string(),
1813                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
1814            );
1815        }
1816        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
1817            odcl_metadata.insert(
1818                "dataProduct".to_string(),
1819                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
1820            );
1821        }
1822        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
1823            odcl_metadata.insert(
1824                "tenant".to_string(),
1825                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
1826            );
1827        }
1828
1829        // Extract top-level description (can be object or string)
1830        if let Some(desc_val) = data.get("description") {
1831            odcl_metadata.insert(
1832                "description".to_string(),
1833                json_value_to_serde_value(desc_val),
1834            );
1835        }
1836
1837        // Extract pricing
1838        if let Some(pricing_val) = data.get("pricing") {
1839            odcl_metadata.insert(
1840                "pricing".to_string(),
1841                json_value_to_serde_value(pricing_val),
1842            );
1843        }
1844
1845        // Extract team
1846        if let Some(team_val) = data.get("team") {
1847            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
1848        }
1849
1850        // Extract roles
1851        if let Some(roles_val) = data.get("roles") {
1852            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
1853        }
1854
1855        // Extract terms
1856        if let Some(terms_val) = data.get("terms") {
1857            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
1858        }
1859
1860        // Extract full servers array (not just type)
1861        if let Some(servers_val) = data.get("servers") {
1862            odcl_metadata.insert(
1863                "servers".to_string(),
1864                json_value_to_serde_value(servers_val),
1865            );
1866        }
1867
1868        // Extract infrastructure
1869        if let Some(infrastructure_val) = data.get("infrastructure") {
1870            odcl_metadata.insert(
1871                "infrastructure".to_string(),
1872                json_value_to_serde_value(infrastructure_val),
1873            );
1874        }
1875
1876        // Store sharedDomains in metadata (extracted from customProperties above)
1877        if !shared_domains.is_empty() {
1878            let shared_domains_json: Vec<serde_json::Value> = shared_domains
1879                .iter()
1880                .map(|d| serde_json::Value::String(d.clone()))
1881                .collect();
1882            odcl_metadata.insert(
1883                "sharedDomains".to_string(),
1884                serde_json::Value::Array(shared_domains_json),
1885            );
1886        }
1887
1888        let table_uuid = self.extract_table_uuid(data);
1889
1890        let table = Table {
1891            id: table_uuid,
1892            name: table_name,
1893            columns,
1894            database_type,
1895            catalog_name: None,
1896            schema_name: None,
1897            medallion_layers,
1898            scd_pattern,
1899            data_vault_classification,
1900            modeling_level: None,
1901            tags,
1902            odcl_metadata,
1903            owner: None,
1904            sla: None,
1905            contact_details: None,
1906            infrastructure_type: None,
1907            notes: None,
1908            position: None,
1909            yaml_file_path: None,
1910            drawio_cell_id: None,
1911            quality: quality_rules,
1912            errors: Vec::new(),
1913            created_at: chrono::Utc::now(),
1914            updated_at: chrono::Utc::now(),
1915        };
1916
1917        info!(
1918            "Parsed ODCL v3.0.0 table: {} with {} warnings/errors",
1919            table.name,
1920            errors.len()
1921        );
1922        Ok((table, errors))
1923    }
1924
1925    /// Parse ALL tables from ODCS v3.0.x/v3.1.0 format schema array.
1926    ///
1927    /// Unlike `parse_odcl_v3` which only returns the first table, this method
1928    /// iterates over all schema objects and returns a vector of tables.
1929    ///
1930    /// # Returns
1931    ///
1932    /// Returns a vector of (Table, Vec<ParserError>) tuples, one for each schema object.
1933    fn parse_odcl_v3_all(&self, data: &JsonValue) -> Result<Vec<(Table, Vec<ParserError>)>> {
1934        let mut all_tables = Vec::new();
1935
1936        // Extract schema array - ODCS v3.0.x/v3.1.0 uses array of SchemaObject
1937        let schema = match data.get("schema").and_then(|v| v.as_array()) {
1938            Some(s) if !s.is_empty() => s,
1939            Some(_) => {
1940                // Empty schema array - return empty result
1941                return Ok(Vec::new());
1942            }
1943            None => {
1944                // No schema field - return empty result
1945                return Ok(Vec::new());
1946            }
1947        };
1948
1949        // Extract contract-level metadata (shared across all tables)
1950        let (medallion_layers, scd_pattern, data_vault_classification, contract_tags): (
1951            _,
1952            _,
1953            _,
1954            Vec<Tag>,
1955        ) = self.extract_metadata_from_custom_properties(data);
1956
1957        // Extract database type from servers (shared)
1958        let database_type = self.extract_database_type_from_odcl_v3_servers(data);
1959
1960        // Extract quality rules (contract-level, shared)
1961        let contract_quality_rules = self.extract_quality_rules(data);
1962
1963        // Build base ODCL metadata (shared across all tables)
1964        let base_odcl_metadata = self.build_odcl_metadata(data);
1965
1966        // Parse each schema object as a separate table
1967        for (schema_idx, schema_value) in schema.iter().enumerate() {
1968            let mut errors = Vec::new();
1969
1970            let schema_object = match schema_value.as_object() {
1971                Some(obj) => obj,
1972                None => {
1973                    errors.push(ParserError {
1974                        error_type: "validation_error".to_string(),
1975                        field: format!("schema[{}]", schema_idx),
1976                        message: format!(
1977                            "Schema object at index {} must be a dictionary",
1978                            schema_idx
1979                        ),
1980                    });
1981                    continue;
1982                }
1983            };
1984
1985            // Get table name from schema object
1986            let table_name = match schema_object.get("name").and_then(|v| v.as_str()) {
1987                Some(name) => name.to_string(),
1988                None => {
1989                    errors.push(ParserError {
1990                        error_type: "validation_error".to_string(),
1991                        field: format!("schema[{}].name", schema_idx),
1992                        message: format!(
1993                            "Schema object at index {} missing 'name' field",
1994                            schema_idx
1995                        ),
1996                    });
1997                    continue;
1998                }
1999            };
2000
2001            // Parse columns from properties
2002            let columns = self.parse_schema_object_properties(schema_object, data, &mut errors);
2003
2004            // Extract table-specific tags (merge with contract tags)
2005            let mut tags = contract_tags.clone();
2006            if let Some(tags_arr) = schema_object.get("tags").and_then(|v| v.as_array()) {
2007                for item in tags_arr {
2008                    if let Some(s) = item.as_str() {
2009                        let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
2010                        if !tags.contains(&tag) {
2011                            tags.push(tag);
2012                        }
2013                    }
2014                }
2015            }
2016
2017            // Extract table-specific quality rules (merge with contract rules)
2018            let mut quality_rules = contract_quality_rules.clone();
2019            if let Some(quality_arr) = schema_object.get("quality").and_then(|v| v.as_array()) {
2020                for rule in quality_arr {
2021                    if let Some(rule_obj) = rule.as_object() {
2022                        let mut rule_map = HashMap::new();
2023                        for (k, v) in rule_obj {
2024                            rule_map.insert(k.clone(), json_value_to_serde_value(v));
2025                        }
2026                        quality_rules.push(rule_map);
2027                    }
2028                }
2029            }
2030
2031            // Generate UUID for this table
2032            // First check if schema object has its own id
2033            let table_uuid = if let Some(id_str) = schema_object.get("id").and_then(|v| v.as_str())
2034            {
2035                uuid::Uuid::parse_str(id_str)
2036                    .unwrap_or_else(|_| Table::generate_id(&table_name, None, None, None))
2037            } else {
2038                // For multi-table ODCS, generate deterministic UUID from table name
2039                Table::generate_id(&table_name, None, None, None)
2040            };
2041
2042            // Clone metadata for this table
2043            let mut odcl_metadata = base_odcl_metadata.clone();
2044
2045            // Add schema-object-specific metadata (ODCS v3.1.0 schema-level fields)
2046            if let Some(desc) = schema_object.get("description") {
2047                odcl_metadata.insert(
2048                    "schemaDescription".to_string(),
2049                    json_value_to_serde_value(desc),
2050                );
2051            }
2052
2053            // physicalName at schema object level
2054            if let Some(phys_name) = schema_object.get("physicalName").and_then(|v| v.as_str()) {
2055                odcl_metadata.insert(
2056                    "schemaPhysicalName".to_string(),
2057                    serde_json::Value::String(phys_name.to_string()),
2058                );
2059            }
2060
2061            // physicalType at schema object level
2062            if let Some(phys_type) = schema_object.get("physicalType").and_then(|v| v.as_str()) {
2063                odcl_metadata.insert(
2064                    "schemaPhysicalType".to_string(),
2065                    serde_json::Value::String(phys_type.to_string()),
2066                );
2067            }
2068
2069            // businessName at schema object level
2070            if let Some(biz_name) = schema_object.get("businessName").and_then(|v| v.as_str()) {
2071                odcl_metadata.insert(
2072                    "schemaBusinessName".to_string(),
2073                    serde_json::Value::String(biz_name.to_string()),
2074                );
2075            }
2076
2077            // dataGranularityDescription at schema object level
2078            if let Some(granularity) = schema_object
2079                .get("dataGranularityDescription")
2080                .and_then(|v| v.as_str())
2081            {
2082                odcl_metadata.insert(
2083                    "dataGranularityDescription".to_string(),
2084                    serde_json::Value::String(granularity.to_string()),
2085                );
2086            }
2087
2088            // authoritativeDefinitions at schema object level
2089            if let Some(auth_defs) = schema_object.get("authoritativeDefinitions") {
2090                odcl_metadata.insert(
2091                    "schemaAuthoritativeDefinitions".to_string(),
2092                    json_value_to_serde_value(auth_defs),
2093                );
2094            }
2095
2096            // relationships at schema object level
2097            if let Some(relationships) = schema_object.get("relationships") {
2098                odcl_metadata.insert(
2099                    "schemaRelationships".to_string(),
2100                    json_value_to_serde_value(relationships),
2101                );
2102            }
2103
2104            let table = Table {
2105                id: table_uuid,
2106                name: table_name.clone(),
2107                columns,
2108                database_type,
2109                catalog_name: None,
2110                schema_name: None,
2111                medallion_layers: medallion_layers.clone(),
2112                scd_pattern,
2113                data_vault_classification,
2114                modeling_level: None,
2115                tags,
2116                odcl_metadata,
2117                owner: None,
2118                sla: None,
2119                contact_details: None,
2120                infrastructure_type: None,
2121                notes: None,
2122                position: None,
2123                yaml_file_path: None,
2124                drawio_cell_id: None,
2125                quality: quality_rules,
2126                errors: Vec::new(),
2127                created_at: chrono::Utc::now(),
2128                updated_at: chrono::Utc::now(),
2129            };
2130
2131            info!(
2132                "Parsed ODCS v3 table {}/{}: {} with {} columns, {} warnings/errors",
2133                schema_idx + 1,
2134                schema.len(),
2135                table.name,
2136                table.columns.len(),
2137                errors.len()
2138            );
2139
2140            all_tables.push((table, errors));
2141        }
2142
2143        Ok(all_tables)
2144    }
2145
2146    /// Parse properties from a schema object into columns.
2147    fn parse_schema_object_properties(
2148        &self,
2149        schema_object: &serde_json::Map<String, JsonValue>,
2150        data: &JsonValue,
2151        errors: &mut Vec<ParserError>,
2152    ) -> Vec<Column> {
2153        let mut columns = Vec::new();
2154
2155        if let Some(properties_obj) = schema_object.get("properties").and_then(|v| v.as_object()) {
2156            // Object format (v3.0.x): properties is a map of name -> property object
2157            for (prop_name, prop_data) in properties_obj {
2158                if let Some(prop_obj) = prop_data.as_object() {
2159                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
2160                        Ok(mut cols) => columns.append(&mut cols),
2161                        Err(e) => {
2162                            errors.push(ParserError {
2163                                error_type: "property_parse_error".to_string(),
2164                                field: format!("Property '{}'", prop_name),
2165                                message: e.to_string(),
2166                            });
2167                        }
2168                    }
2169                } else {
2170                    errors.push(ParserError {
2171                        error_type: "validation_error".to_string(),
2172                        field: format!("Property '{}'", prop_name),
2173                        message: format!("Property '{}' must be an object", prop_name),
2174                    });
2175                }
2176            }
2177        } else if let Some(properties_arr) =
2178            schema_object.get("properties").and_then(|v| v.as_array())
2179        {
2180            // Array format (v3.1.0): properties is an array of property objects with 'name' field
2181            for (idx, prop_data) in properties_arr.iter().enumerate() {
2182                if let Some(prop_obj) = prop_data.as_object() {
2183                    let prop_name = match prop_obj.get("name").or_else(|| prop_obj.get("id")) {
2184                        Some(JsonValue::String(s)) => s.as_str(),
2185                        _ => {
2186                            errors.push(ParserError {
2187                                error_type: "validation_error".to_string(),
2188                                field: format!("Property[{}]", idx),
2189                                message: format!(
2190                                    "Property[{}] missing required 'name' or 'id' field",
2191                                    idx
2192                                ),
2193                            });
2194                            continue;
2195                        }
2196                    };
2197
2198                    match self.parse_odcl_v3_property(prop_name, prop_obj, data) {
2199                        Ok(mut cols) => columns.append(&mut cols),
2200                        Err(e) => {
2201                            errors.push(ParserError {
2202                                error_type: "property_parse_error".to_string(),
2203                                field: format!("Property[{}] '{}'", idx, prop_name),
2204                                message: e.to_string(),
2205                            });
2206                        }
2207                    }
2208                } else {
2209                    errors.push(ParserError {
2210                        error_type: "validation_error".to_string(),
2211                        field: format!("Property[{}]", idx),
2212                        message: format!("Property[{}] must be an object", idx),
2213                    });
2214                }
2215            }
2216        }
2217
2218        columns
2219    }
2220
2221    /// Build base ODCL metadata from contract-level fields.
2222    fn build_odcl_metadata(&self, data: &JsonValue) -> HashMap<String, serde_json::Value> {
2223        let mut odcl_metadata = HashMap::new();
2224
2225        // Core identity fields
2226        odcl_metadata.insert(
2227            "apiVersion".to_string(),
2228            json_value_to_serde_value(data.get("apiVersion").unwrap_or(&JsonValue::Null)),
2229        );
2230        odcl_metadata.insert(
2231            "kind".to_string(),
2232            json_value_to_serde_value(data.get("kind").unwrap_or(&JsonValue::Null)),
2233        );
2234        odcl_metadata.insert(
2235            "id".to_string(),
2236            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
2237        );
2238        odcl_metadata.insert(
2239            "version".to_string(),
2240            json_value_to_serde_value(data.get("version").unwrap_or(&JsonValue::Null)),
2241        );
2242        odcl_metadata.insert(
2243            "status".to_string(),
2244            json_value_to_serde_value(data.get("status").unwrap_or(&JsonValue::Null)),
2245        );
2246
2247        // Optional contract-level fields
2248        let optional_fields = [
2249            "servicelevels",
2250            "links",
2251            "domain",
2252            "dataProduct",
2253            "tenant",
2254            "description",
2255            "pricing",
2256            "team",
2257            "roles",
2258            "terms",
2259            "servers",
2260            "infrastructure",
2261        ];
2262
2263        for field in optional_fields {
2264            if let Some(val) = data.get(field) {
2265                odcl_metadata.insert(field.to_string(), json_value_to_serde_value(val));
2266            }
2267        }
2268
2269        // Extract sharedDomains from customProperties
2270        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2271            for prop in custom_props {
2272                if let Some(prop_obj) = prop.as_object() {
2273                    let prop_key = prop_obj
2274                        .get("property")
2275                        .and_then(|v| v.as_str())
2276                        .unwrap_or("");
2277                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
2278                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
2279                    {
2280                        let shared_domains: Vec<serde_json::Value> = arr
2281                            .iter()
2282                            .filter_map(|item| {
2283                                item.as_str()
2284                                    .map(|s| serde_json::Value::String(s.to_string()))
2285                            })
2286                            .collect();
2287                        if !shared_domains.is_empty() {
2288                            odcl_metadata.insert(
2289                                "sharedDomains".to_string(),
2290                                serde_json::Value::Array(shared_domains),
2291                            );
2292                        }
2293                    }
2294                }
2295            }
2296        }
2297
2298        odcl_metadata
2299    }
2300
2301    /// Parse a single property from ODCS v3.0.x format (similar to Data Contract field).
2302    fn parse_odcl_v3_property(
2303        &self,
2304        prop_name: &str,
2305        prop_data: &serde_json::Map<String, JsonValue>,
2306        data: &JsonValue,
2307    ) -> Result<Vec<Column>> {
2308        // Reuse Data Contract field parsing logic (they're similar)
2309        let mut errors = Vec::new();
2310        self.parse_data_contract_field(prop_name, prop_data, data, &mut errors)
2311    }
2312
2313    /// Extract table UUID from ODCS `id` field (standard) or fallback to customProperties/odcl_metadata (backward compatibility).
2314    /// Returns the UUID if found, otherwise generates a new one.
2315    fn extract_table_uuid(&self, data: &JsonValue) -> uuid::Uuid {
2316        // First check the top-level `id` field (ODCS spec: "A unique identifier used to reduce the risk of dataset name collisions, such as a UUID.")
2317        if let Some(id_val) = data.get("id")
2318            && let Some(id_str) = id_val.as_str()
2319        {
2320            if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2321                tracing::debug!(
2322                    "[ODCSImporter] Extracted UUID from top-level 'id' field: {}",
2323                    uuid
2324                );
2325                return uuid;
2326            } else {
2327                tracing::warn!(
2328                    "[ODCSImporter] Found 'id' field but failed to parse as UUID: {}",
2329                    id_str
2330                );
2331            }
2332        }
2333
2334        // Backward compatibility: check customProperties for tableUuid (legacy format)
2335        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2336            for prop in custom_props {
2337                if let Some(prop_obj) = prop.as_object() {
2338                    let prop_key = prop_obj
2339                        .get("property")
2340                        .and_then(|v| v.as_str())
2341                        .unwrap_or("");
2342                    if prop_key == "tableUuid"
2343                        && let Some(uuid_str) = prop_obj.get("value").and_then(|v| v.as_str())
2344                        && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
2345                    {
2346                        tracing::debug!(
2347                            "[ODCSImporter] Extracted UUID from customProperties.tableUuid: {}",
2348                            uuid
2349                        );
2350                        return uuid;
2351                    }
2352                }
2353            }
2354        }
2355
2356        // Fallback: check odcl_metadata if present (legacy format)
2357        if let Some(metadata) = data.get("odcl_metadata").and_then(|v| v.as_object())
2358            && let Some(uuid_val) = metadata.get("tableUuid")
2359            && let Some(uuid_str) = uuid_val.as_str()
2360            && let Ok(uuid) = uuid::Uuid::parse_str(uuid_str)
2361        {
2362            tracing::debug!(
2363                "[ODCSImporter] Extracted UUID from odcl_metadata.tableUuid: {}",
2364                uuid
2365            );
2366            return uuid;
2367        }
2368
2369        // Generate deterministic UUID v5 if not found (based on table name)
2370        let table_name = data
2371            .get("name")
2372            .and_then(|v| v.as_str())
2373            .unwrap_or("unknown");
2374        let new_uuid = crate::models::table::Table::generate_id(
2375            table_name, None, // database_type not available here
2376            None, // catalog_name not available here
2377            None, // schema_name not available here
2378        );
2379        tracing::warn!(
2380            "[ODCSImporter] No UUID found for table '{}', generating deterministic UUID: {}. This may cause relationships to become orphaned!",
2381            table_name,
2382            new_uuid
2383        );
2384        new_uuid
2385    }
2386
2387    /// Extract metadata from customProperties in ODCS v3.0.x format.
2388    fn extract_metadata_from_custom_properties(
2389        &self,
2390        data: &JsonValue,
2391    ) -> (
2392        Vec<MedallionLayer>,
2393        Option<SCDPattern>,
2394        Option<DataVaultClassification>,
2395        Vec<Tag>,
2396    ) {
2397        let mut medallion_layers = Vec::new();
2398        let mut scd_pattern = None;
2399        let mut data_vault_classification = None;
2400        let mut tags: Vec<Tag> = Vec::new();
2401
2402        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2403            for prop in custom_props {
2404                if let Some(prop_obj) = prop.as_object() {
2405                    let prop_key = prop_obj
2406                        .get("property")
2407                        .and_then(|v| v.as_str())
2408                        .unwrap_or("");
2409                    let prop_value = prop_obj.get("value");
2410
2411                    match prop_key {
2412                        "medallionLayers" | "medallion_layers" => {
2413                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
2414                                for item in arr {
2415                                    if let Some(s) = item.as_str()
2416                                        && let Ok(layer) = parse_medallion_layer(s)
2417                                    {
2418                                        medallion_layers.push(layer);
2419                                    }
2420                                }
2421                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2422                                // Comma-separated string
2423                                for part in s.split(',') {
2424                                    if let Ok(layer) = parse_medallion_layer(part.trim()) {
2425                                        medallion_layers.push(layer);
2426                                    }
2427                                }
2428                            }
2429                        }
2430                        "scdPattern" | "scd_pattern" => {
2431                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2432                                scd_pattern = parse_scd_pattern(s).ok();
2433                            }
2434                        }
2435                        "dataVaultClassification" | "data_vault_classification" => {
2436                            if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2437                                data_vault_classification = parse_data_vault_classification(s).ok();
2438                            }
2439                        }
2440                        "tags" => {
2441                            if let Some(arr) = prop_value.and_then(|v| v.as_array()) {
2442                                for item in arr {
2443                                    if let Some(s) = item.as_str() {
2444                                        // Parse tag string to Tag enum
2445                                        if let Ok(tag) = Tag::from_str(s) {
2446                                            tags.push(tag);
2447                                        } else {
2448                                            tags.push(Tag::Simple(s.to_string()));
2449                                        }
2450                                    }
2451                                }
2452                            } else if let Some(s) = prop_value.and_then(|v| v.as_str()) {
2453                                // Comma-separated string
2454                                for part in s.split(',') {
2455                                    let part = part.trim();
2456                                    if let Ok(tag) = Tag::from_str(part) {
2457                                        tags.push(tag);
2458                                    } else {
2459                                        tags.push(Tag::Simple(part.to_string()));
2460                                    }
2461                                }
2462                            }
2463                        }
2464                        "sharedDomains" | "shared_domains" => {
2465                            // sharedDomains will be stored in metadata by the caller
2466                            // This match is here for completeness but sharedDomains is handled separately
2467                        }
2468                        _ => {}
2469                    }
2470                }
2471            }
2472        }
2473
2474        // Also extract tags from top-level tags field
2475        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
2476            for item in tags_arr {
2477                if let Some(s) = item.as_str() {
2478                    // Parse tag string to Tag enum
2479                    let tag = Tag::from_str(s).unwrap_or_else(|_| Tag::Simple(s.to_string()));
2480                    if !tags.contains(&tag) {
2481                        tags.push(tag);
2482                    }
2483                }
2484            }
2485        }
2486
2487        (
2488            medallion_layers,
2489            scd_pattern,
2490            data_vault_classification,
2491            tags,
2492        )
2493    }
2494
2495    /// Extract database type from servers in ODCS v3.0.x format.
2496    fn extract_database_type_from_odcl_v3_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
2497        // ODCS v3.0.x: servers is an array of Server objects
2498        if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array())
2499            && let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object())
2500        {
2501            return server_obj
2502                .get("type")
2503                .and_then(|v| v.as_str())
2504                .and_then(|s| self.parse_database_type(s));
2505        }
2506        None
2507    }
2508
2509    /// Parse Data Contract format.
2510    fn parse_data_contract(&self, data: &JsonValue) -> Result<(Table, Vec<ParserError>)> {
2511        let mut errors = Vec::new();
2512
2513        // Extract models
2514        let models = data
2515            .get("models")
2516            .and_then(|v| v.as_object())
2517            .ok_or_else(|| anyhow::anyhow!("Data Contract YAML missing 'models' field"))?;
2518
2519        // parse_table() returns a single Table, so we parse the first model.
2520        // If multiple models are needed, call parse_table() multiple times or use import().
2521        let (model_name, model_data) = models
2522            .iter()
2523            .next()
2524            .ok_or_else(|| anyhow::anyhow!("Data Contract 'models' object is empty"))?;
2525
2526        let model_data = model_data
2527            .as_object()
2528            .ok_or_else(|| anyhow::anyhow!("Model '{}' must be an object", model_name))?;
2529
2530        // Extract fields (columns)
2531        let fields = model_data
2532            .get("fields")
2533            .and_then(|v| v.as_object())
2534            .ok_or_else(|| {
2535                errors.push(ParserError {
2536                    error_type: "validation_error".to_string(),
2537                    field: format!("Model '{}'", model_name),
2538                    message: format!("Model '{}' missing 'fields' field", model_name),
2539                });
2540                anyhow::anyhow!("Missing fields")
2541            });
2542
2543        let fields = match fields {
2544            Ok(f) => f,
2545            Err(_) => {
2546                // Return empty table with errors
2547                let quality_rules = self.extract_quality_rules(data);
2548                let table_uuid = self.extract_table_uuid(data);
2549                let table = Table {
2550                    id: table_uuid,
2551                    name: model_name.clone(),
2552                    columns: Vec::new(),
2553                    database_type: None,
2554                    catalog_name: None,
2555                    schema_name: None,
2556                    medallion_layers: Vec::new(),
2557                    scd_pattern: None,
2558                    data_vault_classification: None,
2559                    modeling_level: None,
2560                    tags: Vec::<Tag>::new(),
2561                    odcl_metadata: HashMap::new(),
2562                    owner: None,
2563                    sla: None,
2564                    contact_details: None,
2565                    infrastructure_type: None,
2566                    notes: None,
2567                    position: None,
2568                    yaml_file_path: None,
2569                    drawio_cell_id: None,
2570                    quality: quality_rules,
2571                    errors: Vec::new(),
2572                    created_at: chrono::Utc::now(),
2573                    updated_at: chrono::Utc::now(),
2574                };
2575                return Ok((table, errors));
2576            }
2577        };
2578
2579        // Parse fields as columns
2580        let mut columns = Vec::new();
2581        for (field_name, field_data) in fields {
2582            if let Some(field_obj) = field_data.as_object() {
2583                match self.parse_data_contract_field(field_name, field_obj, data, &mut errors) {
2584                    Ok(mut cols) => columns.append(&mut cols),
2585                    Err(e) => {
2586                        errors.push(ParserError {
2587                            error_type: "field_parse_error".to_string(),
2588                            field: format!("Field '{}'", field_name),
2589                            message: e.to_string(),
2590                        });
2591                    }
2592                }
2593            } else {
2594                errors.push(ParserError {
2595                    error_type: "validation_error".to_string(),
2596                    field: format!("Field '{}'", field_name),
2597                    message: format!("Field '{}' must be an object", field_name),
2598                });
2599            }
2600        }
2601
2602        // Extract metadata from info section
2603        // The frontend expects info fields to be nested under "info" key
2604        let mut odcl_metadata = HashMap::new();
2605
2606        // Extract info section and nest it properly
2607        // Convert JsonValue object to serde_json::Value::Object (Map)
2608        if let Some(info_val) = data.get("info") {
2609            // Convert JsonValue to serde_json::Value
2610            let info_json_value = json_value_to_serde_value(info_val);
2611            odcl_metadata.insert("info".to_string(), info_json_value);
2612        }
2613
2614        odcl_metadata.insert(
2615            "dataContractSpecification".to_string(),
2616            json_value_to_serde_value(
2617                data.get("dataContractSpecification")
2618                    .unwrap_or(&JsonValue::Null),
2619            ),
2620        );
2621        odcl_metadata.insert(
2622            "id".to_string(),
2623            json_value_to_serde_value(data.get("id").unwrap_or(&JsonValue::Null)),
2624        );
2625        // Note: model_name is not added to metadata as it's redundant (it's the table name)
2626
2627        // Extract servicelevels if present
2628        if let Some(servicelevels_val) = data.get("servicelevels") {
2629            odcl_metadata.insert(
2630                "servicelevels".to_string(),
2631                json_value_to_serde_value(servicelevels_val),
2632            );
2633        }
2634
2635        // Extract links if present
2636        if let Some(links_val) = data.get("links") {
2637            odcl_metadata.insert("links".to_string(), json_value_to_serde_value(links_val));
2638        }
2639
2640        // Extract domain, dataProduct, tenant
2641        if let Some(domain_val) = data.get("domain").and_then(|v| v.as_str()) {
2642            odcl_metadata.insert(
2643                "domain".to_string(),
2644                json_value_to_serde_value(&JsonValue::String(domain_val.to_string())),
2645            );
2646        }
2647        if let Some(data_product_val) = data.get("dataProduct").and_then(|v| v.as_str()) {
2648            odcl_metadata.insert(
2649                "dataProduct".to_string(),
2650                json_value_to_serde_value(&JsonValue::String(data_product_val.to_string())),
2651            );
2652        }
2653        if let Some(tenant_val) = data.get("tenant").and_then(|v| v.as_str()) {
2654            odcl_metadata.insert(
2655                "tenant".to_string(),
2656                json_value_to_serde_value(&JsonValue::String(tenant_val.to_string())),
2657            );
2658        }
2659
2660        // Extract top-level description (can be object or string)
2661        if let Some(desc_val) = data.get("description") {
2662            odcl_metadata.insert(
2663                "description".to_string(),
2664                json_value_to_serde_value(desc_val),
2665            );
2666        }
2667
2668        // Extract pricing
2669        if let Some(pricing_val) = data.get("pricing") {
2670            odcl_metadata.insert(
2671                "pricing".to_string(),
2672                json_value_to_serde_value(pricing_val),
2673            );
2674        }
2675
2676        // Extract team
2677        if let Some(team_val) = data.get("team") {
2678            odcl_metadata.insert("team".to_string(), json_value_to_serde_value(team_val));
2679        }
2680
2681        // Extract roles
2682        if let Some(roles_val) = data.get("roles") {
2683            odcl_metadata.insert("roles".to_string(), json_value_to_serde_value(roles_val));
2684        }
2685
2686        // Extract terms
2687        if let Some(terms_val) = data.get("terms") {
2688            odcl_metadata.insert("terms".to_string(), json_value_to_serde_value(terms_val));
2689        }
2690
2691        // Extract full servers array (not just type)
2692        if let Some(servers_val) = data.get("servers") {
2693            odcl_metadata.insert(
2694                "servers".to_string(),
2695                json_value_to_serde_value(servers_val),
2696            );
2697        }
2698
2699        // Extract infrastructure
2700        if let Some(infrastructure_val) = data.get("infrastructure") {
2701            odcl_metadata.insert(
2702                "infrastructure".to_string(),
2703                json_value_to_serde_value(infrastructure_val),
2704            );
2705        }
2706
2707        // Extract database type from servers if available
2708        let database_type = self.extract_database_type_from_servers(data);
2709
2710        // Extract catalog and schema from customProperties
2711        let (catalog_name, schema_name) = self.extract_catalog_schema(data);
2712
2713        // Extract sharedDomains from customProperties
2714        let mut shared_domains: Vec<String> = Vec::new();
2715        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
2716            for prop in custom_props {
2717                if let Some(prop_obj) = prop.as_object() {
2718                    let prop_key = prop_obj
2719                        .get("property")
2720                        .and_then(|v| v.as_str())
2721                        .unwrap_or("");
2722                    if (prop_key == "sharedDomains" || prop_key == "shared_domains")
2723                        && let Some(arr) = prop_obj.get("value").and_then(|v| v.as_array())
2724                    {
2725                        for item in arr {
2726                            if let Some(s) = item.as_str() {
2727                                shared_domains.push(s.to_string());
2728                            }
2729                        }
2730                    }
2731                }
2732            }
2733        }
2734
2735        // Extract tags from top-level tags field (Data Contract format)
2736        let mut tags: Vec<Tag> = Vec::new();
2737        if let Some(tags_arr) = data.get("tags").and_then(|v| v.as_array()) {
2738            for item in tags_arr {
2739                if let Some(s) = item.as_str() {
2740                    // Parse tag string to Tag enum (supports Simple, Pair, List formats)
2741                    if let Ok(tag) = Tag::from_str(s) {
2742                        tags.push(tag);
2743                    } else {
2744                        // Fallback: create Simple tag if parsing fails
2745                        tags.push(crate::models::Tag::Simple(s.to_string()));
2746                    }
2747                }
2748            }
2749        }
2750
2751        // Extract quality rules
2752        let quality_rules = self.extract_quality_rules(data);
2753
2754        // Store sharedDomains in metadata
2755        if !shared_domains.is_empty() {
2756            let shared_domains_json: Vec<serde_json::Value> = shared_domains
2757                .iter()
2758                .map(|d| serde_json::Value::String(d.clone()))
2759                .collect();
2760            odcl_metadata.insert(
2761                "sharedDomains".to_string(),
2762                serde_json::Value::Array(shared_domains_json),
2763            );
2764        }
2765
2766        let table_uuid = self.extract_table_uuid(data);
2767
2768        let table = Table {
2769            id: table_uuid,
2770            name: model_name.clone(),
2771            columns,
2772            database_type,
2773            catalog_name,
2774            schema_name,
2775            medallion_layers: Vec::new(),
2776            scd_pattern: None,
2777            data_vault_classification: None,
2778            modeling_level: None,
2779            tags,
2780            odcl_metadata,
2781            owner: None,
2782            sla: None,
2783            contact_details: None,
2784            infrastructure_type: None,
2785            notes: None,
2786            position: None,
2787            yaml_file_path: None,
2788            drawio_cell_id: None,
2789            quality: quality_rules,
2790            errors: Vec::new(),
2791            created_at: chrono::Utc::now(),
2792            updated_at: chrono::Utc::now(),
2793        };
2794
2795        info!(
2796            "Parsed Data Contract table: {} with {} warnings/errors",
2797            model_name,
2798            errors.len()
2799        );
2800        Ok((table, errors))
2801    }
2802
2803    /// Parse a single field from Data Contract format.
2804    fn parse_data_contract_field(
2805        &self,
2806        field_name: &str,
2807        field_data: &serde_json::Map<String, JsonValue>,
2808        data: &JsonValue,
2809        errors: &mut Vec<ParserError>,
2810    ) -> Result<Vec<Column>> {
2811        let mut columns = Vec::new();
2812
2813        // Helper function to extract quality rules from a JSON object
2814        let extract_quality_from_obj =
2815            |obj: &serde_json::Map<String, JsonValue>| -> Vec<HashMap<String, serde_json::Value>> {
2816                let mut quality_rules = Vec::new();
2817                if let Some(quality_val) = obj.get("quality") {
2818                    if let Some(arr) = quality_val.as_array() {
2819                        // Array of quality rules
2820                        for item in arr {
2821                            if let Some(rule_obj) = item.as_object() {
2822                                let mut rule = HashMap::new();
2823                                for (key, value) in rule_obj {
2824                                    rule.insert(key.clone(), json_value_to_serde_value(value));
2825                                }
2826                                quality_rules.push(rule);
2827                            }
2828                        }
2829                    } else if let Some(rule_obj) = quality_val.as_object() {
2830                        // Single quality rule object
2831                        let mut rule = HashMap::new();
2832                        for (key, value) in rule_obj {
2833                            rule.insert(key.clone(), json_value_to_serde_value(value));
2834                        }
2835                        quality_rules.push(rule);
2836                    }
2837                }
2838                quality_rules
2839            };
2840
2841        // Extract description from field_data (preserve empty strings)
2842        let description = field_data
2843            .get("description")
2844            .and_then(|v| v.as_str())
2845            .unwrap_or("")
2846            .to_string();
2847
2848        // Extract quality rules from field_data
2849        let mut quality_rules = extract_quality_from_obj(field_data);
2850
2851        // Check for $ref
2852        if let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str()) {
2853            // Store ref_path (preserve even if definition doesn't exist)
2854            let ref_path = Some(ref_str.to_string());
2855
2856            if let Some(definition) = resolve_ref(ref_str, data) {
2857                // Merge quality rules from definition if field doesn't have any
2858
2859                // Also extract quality rules from definition and merge (if field doesn't have any)
2860                if quality_rules.is_empty() {
2861                    if let Some(def_obj) = definition.as_object() {
2862                        quality_rules = extract_quality_from_obj(def_obj);
2863                    }
2864                } else {
2865                    // Merge definition quality rules if field has some
2866                    if let Some(def_obj) = definition.as_object() {
2867                        let def_quality = extract_quality_from_obj(def_obj);
2868                        // Append definition quality rules (field-level takes precedence)
2869                        quality_rules.extend(def_quality);
2870                    }
2871                }
2872
2873                let required = field_data
2874                    .get("required")
2875                    .and_then(|v| v.as_bool())
2876                    .unwrap_or(false);
2877
2878                // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
2879                // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
2880                // The `required` field will be set based on the `required` parameter during column creation.
2881
2882                // Check if definition is an object/struct with nested structure
2883                let has_nested = definition
2884                    .get("type")
2885                    .and_then(|v| v.as_str())
2886                    .map(|s| s == "object")
2887                    .unwrap_or(false)
2888                    || definition.get("properties").is_some()
2889                    || definition.get("fields").is_some();
2890
2891                if has_nested {
2892                    // Expand STRUCT from definition into nested columns with dot notation
2893                    if let Some(properties) =
2894                        definition.get("properties").and_then(|v| v.as_object())
2895                    {
2896                        // Recursively expand nested properties
2897                        let nested_required: Vec<String> = definition
2898                            .get("required")
2899                            .and_then(|v| v.as_array())
2900                            .map(|arr| {
2901                                arr.iter()
2902                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2903                                    .collect()
2904                            })
2905                            .unwrap_or_default();
2906
2907                        for (nested_name, nested_schema) in properties {
2908                            let nested_required_field = nested_required.contains(nested_name);
2909                            expand_nested_column(
2910                                &format!("{}.{}", field_name, nested_name),
2911                                nested_schema,
2912                                !nested_required_field,
2913                                &mut columns,
2914                                errors,
2915                            );
2916                        }
2917                    } else if let Some(fields) =
2918                        definition.get("fields").and_then(|v| v.as_object())
2919                    {
2920                        // Handle fields format (ODCL style)
2921                        for (nested_name, nested_schema) in fields {
2922                            expand_nested_column(
2923                                &format!("{}.{}", field_name, nested_name),
2924                                nested_schema,
2925                                true, // Assume nullable if not specified
2926                                &mut columns,
2927                                errors,
2928                            );
2929                        }
2930                    } else {
2931                        // Fallback: create parent column as OBJECT if we can't expand
2932                        let def_physical_type = definition
2933                            .get("physicalType")
2934                            .and_then(|v| v.as_str())
2935                            .map(|s| s.to_string());
2936                        columns.push(Column {
2937                            name: field_name.to_string(),
2938                            data_type: "OBJECT".to_string(),
2939                            physical_type: def_physical_type,
2940                            nullable: !required,
2941                            description: if description.is_empty() {
2942                                definition
2943                                    .get("description")
2944                                    .and_then(|v| v.as_str())
2945                                    .unwrap_or("")
2946                                    .to_string()
2947                            } else {
2948                                description.clone()
2949                            },
2950                            quality: quality_rules.clone(),
2951                            relationships: ref_to_relationships(&ref_path),
2952                            ..Default::default()
2953                        });
2954                    }
2955                } else {
2956                    // Simple type from definition
2957                    let def_type = definition
2958                        .get("type")
2959                        .and_then(|v| v.as_str())
2960                        .unwrap_or("STRING")
2961                        .to_uppercase();
2962
2963                    let enum_values = definition
2964                        .get("enum")
2965                        .and_then(|v| v.as_array())
2966                        .map(|arr| {
2967                            arr.iter()
2968                                .filter_map(|v| v.as_str().map(|s| s.to_string()))
2969                                .collect()
2970                        })
2971                        .unwrap_or_default();
2972
2973                    let def_physical_type = definition
2974                        .get("physicalType")
2975                        .and_then(|v| v.as_str())
2976                        .map(|s| s.to_string());
2977
2978                    columns.push(Column {
2979                        name: field_name.to_string(),
2980                        data_type: def_type,
2981                        physical_type: def_physical_type,
2982                        nullable: !required,
2983                        description: if description.is_empty() {
2984                            definition
2985                                .get("description")
2986                                .and_then(|v| v.as_str())
2987                                .unwrap_or("")
2988                                .to_string()
2989                        } else {
2990                            description
2991                        },
2992                        quality: quality_rules,
2993                        relationships: ref_to_relationships(&ref_path),
2994                        enum_values,
2995                        ..Default::default()
2996                    });
2997                }
2998                return Ok(columns);
2999            } else {
3000                // Undefined reference - create column with error
3001                let mut col_errors: Vec<HashMap<String, serde_json::Value>> = Vec::new();
3002                let mut error_map = HashMap::new();
3003                error_map.insert("type".to_string(), serde_json::json!("validation_error"));
3004                error_map.insert("field".to_string(), serde_json::json!("data_type"));
3005                error_map.insert(
3006                    "message".to_string(),
3007                    serde_json::json!(format!(
3008                        "Field '{}' references undefined definition: {}",
3009                        field_name, ref_str
3010                    )),
3011                );
3012                col_errors.push(error_map);
3013                let field_physical_type = field_data
3014                    .get("physicalType")
3015                    .and_then(|v| v.as_str())
3016                    .map(|s| s.to_string());
3017                columns.push(Column {
3018                    name: field_name.to_string(),
3019                    data_type: "OBJECT".to_string(),
3020                    physical_type: field_physical_type,
3021                    description,
3022                    errors: col_errors,
3023                    relationships: ref_to_relationships(&Some(ref_str.to_string())),
3024                    ..Default::default()
3025                });
3026                return Ok(columns);
3027            }
3028        }
3029
3030        // Extract field type - check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
3031        // Default to STRING if missing
3032        let field_type_str = field_data
3033            .get("logicalType")
3034            .and_then(|v| v.as_str())
3035            .or_else(|| field_data.get("type").and_then(|v| v.as_str()))
3036            .unwrap_or("STRING");
3037
3038        // Check if type contains STRUCT definition (multiline STRUCT type)
3039        if field_type_str.contains("STRUCT<") || field_type_str.contains("ARRAY<STRUCT<") {
3040            match self.parse_struct_type_from_string(field_name, field_type_str, field_data) {
3041                Ok(nested_cols) if !nested_cols.is_empty() => {
3042                    // We have nested columns - add parent column with full type, then nested columns
3043                    let parent_data_type = if field_type_str.to_uppercase().starts_with("ARRAY<") {
3044                        "ARRAY<STRUCT<...>>".to_string()
3045                    } else {
3046                        "STRUCT<...>".to_string()
3047                    };
3048
3049                    let struct_physical_type = field_data
3050                        .get("physicalType")
3051                        .and_then(|v| v.as_str())
3052                        .map(|s| s.to_string());
3053
3054                    // Add parent column
3055                    columns.push(Column {
3056                        name: field_name.to_string(),
3057                        data_type: parent_data_type,
3058                        physical_type: struct_physical_type,
3059                        nullable: !field_data
3060                            .get("required")
3061                            .and_then(|v| v.as_bool())
3062                            .unwrap_or(false),
3063                        description: description.clone(),
3064                        quality: quality_rules.clone(),
3065                        relationships: ref_to_relationships(
3066                            &field_data
3067                                .get("$ref")
3068                                .and_then(|v| v.as_str())
3069                                .map(|s| s.to_string()),
3070                        ),
3071                        ..Default::default()
3072                    });
3073
3074                    // Add nested columns
3075                    columns.extend(nested_cols);
3076                    return Ok(columns);
3077                }
3078                Ok(_) | Err(_) => {
3079                    // If parsing fails or returns empty, fall back to using the type as-is
3080                }
3081            }
3082        }
3083
3084        let field_type = normalize_data_type(field_type_str);
3085
3086        // Handle ARRAY type
3087        if field_type == "ARRAY" {
3088            let items = field_data.get("items");
3089            if let Some(items_val) = items {
3090                if let Some(items_obj) = items_val.as_object() {
3091                    // Check if items is an object with fields (nested structure)
3092                    // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy) for backward compatibility
3093                    let items_type = items_obj
3094                        .get("logicalType")
3095                        .and_then(|v| v.as_str())
3096                        .or_else(|| items_obj.get("type").and_then(|v| v.as_str()));
3097
3098                    // Normalize legacy "type" values to "logicalType" equivalents
3099                    let normalized_items_type = match items_type {
3100                        Some("object") | Some("struct") => Some("object"),
3101                        Some("array") => Some("array"),
3102                        Some("string") | Some("varchar") | Some("char") | Some("text") => {
3103                            Some("string")
3104                        }
3105                        Some("integer") | Some("int") | Some("bigint") | Some("smallint")
3106                        | Some("tinyint") => Some("integer"),
3107                        Some("number") | Some("decimal") | Some("double") | Some("float")
3108                        | Some("numeric") => Some("number"),
3109                        Some("boolean") | Some("bool") => Some("boolean"),
3110                        Some("date") => Some("date"),
3111                        Some("timestamp") | Some("datetime") => Some("timestamp"),
3112                        Some("time") => Some("time"),
3113                        other => other,
3114                    };
3115
3116                    if items_obj.get("fields").is_some()
3117                        || items_obj.get("properties").is_some()
3118                        || normalized_items_type == Some("object")
3119                    {
3120                        // Array of objects - create parent column as ARRAY<OBJECT>
3121                        let array_physical_type = field_data
3122                            .get("physicalType")
3123                            .and_then(|v| v.as_str())
3124                            .map(|s| s.to_string());
3125                        columns.push(Column {
3126                            name: field_name.to_string(),
3127                            data_type: "ARRAY<OBJECT>".to_string(),
3128                            physical_type: array_physical_type,
3129                            nullable: !field_data
3130                                .get("required")
3131                                .and_then(|v| v.as_bool())
3132                                .unwrap_or(false),
3133                            description: field_data
3134                                .get("description")
3135                                .and_then(|v| v.as_str())
3136                                .unwrap_or("")
3137                                .to_string(),
3138                            ..Default::default()
3139                        });
3140
3141                        // Extract nested fields from items.properties or items.fields if present
3142                        // Handle both object format (legacy) and array format (ODCS v3.1.0)
3143                        let properties_obj =
3144                            items_obj.get("properties").and_then(|v| v.as_object());
3145                        let properties_arr = items_obj.get("properties").and_then(|v| v.as_array());
3146                        let fields_obj = items_obj.get("fields").and_then(|v| v.as_object());
3147
3148                        if let Some(fields_map) = properties_obj.or(fields_obj) {
3149                            // Object format (legacy): properties is a map
3150                            for (nested_field_name, nested_field_data) in fields_map {
3151                                if let Some(nested_field_obj) = nested_field_data.as_object() {
3152                                    // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
3153                                    let nested_field_type = nested_field_obj
3154                                        .get("logicalType")
3155                                        .and_then(|v| v.as_str())
3156                                        .or_else(|| {
3157                                            nested_field_obj.get("type").and_then(|v| v.as_str())
3158                                        })
3159                                        .unwrap_or("STRING");
3160
3161                                    // Recursively parse nested fields with array prefix
3162                                    let nested_col_name =
3163                                        format!("{}.[].{}", field_name, nested_field_name);
3164                                    let mut local_errors = Vec::new();
3165                                    match self.parse_data_contract_field(
3166                                        &nested_col_name,
3167                                        nested_field_obj,
3168                                        data,
3169                                        &mut local_errors,
3170                                    ) {
3171                                        Ok(mut nested_cols) => {
3172                                            // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
3173                                            // Otherwise, it returns a single column
3174                                            columns.append(&mut nested_cols);
3175                                        }
3176                                        Err(_) => {
3177                                            // Fallback: create simple nested column
3178                                            let nested_physical_type = nested_field_obj
3179                                                .get("physicalType")
3180                                                .and_then(|v| v.as_str())
3181                                                .map(|s| s.to_string());
3182                                            columns.push(Column {
3183                                                name: nested_col_name,
3184                                                data_type: nested_field_type.to_uppercase(),
3185                                                physical_type: nested_physical_type,
3186                                                nullable: !nested_field_obj
3187                                                    .get("required")
3188                                                    .and_then(|v| v.as_bool())
3189                                                    .unwrap_or(false),
3190                                                description: nested_field_obj
3191                                                    .get("description")
3192                                                    .and_then(|v| v.as_str())
3193                                                    .unwrap_or("")
3194                                                    .to_string(),
3195                                                ..Default::default()
3196                                            });
3197                                        }
3198                                    }
3199                                }
3200                            }
3201                        } else if let Some(properties_list) = properties_arr {
3202                            // Array format (ODCS v3.1.0): properties is an array with 'name' field
3203                            let mut local_errors = Vec::new();
3204                            for prop_data in properties_list {
3205                                if let Some(prop_obj) = prop_data.as_object() {
3206                                    // Extract name from property object (required in v3.1.0)
3207                                    let nested_field_name = prop_obj
3208                                        .get("name")
3209                                        .or_else(|| prop_obj.get("id"))
3210                                        .and_then(|v| v.as_str())
3211                                        .unwrap_or("");
3212
3213                                    if !nested_field_name.is_empty() {
3214                                        // Recursively parse nested fields with array prefix
3215                                        let nested_col_name =
3216                                            format!("{}.[].{}", field_name, nested_field_name);
3217                                        match self.parse_data_contract_field(
3218                                            &nested_col_name,
3219                                            prop_obj,
3220                                            data,
3221                                            &mut local_errors,
3222                                        ) {
3223                                            Ok(mut nested_cols) => {
3224                                                // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
3225                                                // Otherwise, it returns a single column
3226                                                columns.append(&mut nested_cols);
3227                                            }
3228                                            Err(_) => {
3229                                                // Fallback: create simple nested column
3230                                                // Check both "logicalType" (ODCS v3.1.0) and "type" (legacy)
3231                                                let nested_field_type = prop_obj
3232                                                    .get("logicalType")
3233                                                    .and_then(|v| v.as_str())
3234                                                    .or_else(|| {
3235                                                        prop_obj
3236                                                            .get("type")
3237                                                            .and_then(|v| v.as_str())
3238                                                    })
3239                                                    .unwrap_or("STRING");
3240                                                let nested_physical_type = prop_obj
3241                                                    .get("physicalType")
3242                                                    .and_then(|v| v.as_str())
3243                                                    .map(|s| s.to_string());
3244                                                columns.push(Column {
3245                                                    name: nested_col_name,
3246                                                    data_type: nested_field_type.to_uppercase(),
3247                                                    physical_type: nested_physical_type,
3248                                                    nullable: !prop_obj
3249                                                        .get("required")
3250                                                        .and_then(|v| v.as_bool())
3251                                                        .unwrap_or(false),
3252                                                    description: prop_obj
3253                                                        .get("description")
3254                                                        .and_then(|v| v.as_str())
3255                                                        .unwrap_or("")
3256                                                        .to_string(),
3257                                                    ..Default::default()
3258                                                });
3259                                            }
3260                                        }
3261                                    }
3262                                }
3263                            }
3264                        }
3265
3266                        return Ok(columns);
3267                    } else if let Some(item_type) = items_obj.get("type").and_then(|v| v.as_str()) {
3268                        // Array of simple type
3269                        let array_physical_type = field_data
3270                            .get("physicalType")
3271                            .and_then(|v| v.as_str())
3272                            .map(|s| s.to_string());
3273                        columns.push(Column {
3274                            name: field_name.to_string(),
3275                            data_type: format!("ARRAY<{}>", normalize_data_type(item_type)),
3276                            physical_type: array_physical_type,
3277                            nullable: !field_data
3278                                .get("required")
3279                                .and_then(|v| v.as_bool())
3280                                .unwrap_or(false),
3281                            description: description.clone(),
3282                            quality: quality_rules.clone(),
3283                            relationships: ref_to_relationships(
3284                                &field_data
3285                                    .get("$ref")
3286                                    .and_then(|v| v.as_str())
3287                                    .map(|s| s.to_string()),
3288                            ),
3289                            ..Default::default()
3290                        });
3291                        return Ok(columns);
3292                    }
3293                } else if let Some(item_type_str) = items_val.as_str() {
3294                    // Array of simple type (string)
3295                    let array_physical_type = field_data
3296                        .get("physicalType")
3297                        .and_then(|v| v.as_str())
3298                        .map(|s| s.to_string());
3299                    columns.push(Column {
3300                        name: field_name.to_string(),
3301                        data_type: format!("ARRAY<{}>", normalize_data_type(item_type_str)),
3302                        physical_type: array_physical_type,
3303                        nullable: !field_data
3304                            .get("required")
3305                            .and_then(|v| v.as_bool())
3306                            .unwrap_or(false),
3307                        description: description.clone(),
3308                        quality: quality_rules.clone(),
3309                        relationships: ref_to_relationships(
3310                            &field_data
3311                                .get("$ref")
3312                                .and_then(|v| v.as_str())
3313                                .map(|s| s.to_string()),
3314                        ),
3315                        ..Default::default()
3316                    });
3317                    return Ok(columns);
3318                }
3319            }
3320            // Array without items - default to ARRAY<STRING>
3321            let array_physical_type = field_data
3322                .get("physicalType")
3323                .and_then(|v| v.as_str())
3324                .map(|s| s.to_string());
3325            columns.push(Column {
3326                name: field_name.to_string(),
3327                data_type: "ARRAY<STRING>".to_string(),
3328                physical_type: array_physical_type,
3329                nullable: !field_data
3330                    .get("required")
3331                    .and_then(|v| v.as_bool())
3332                    .unwrap_or(false),
3333                description: description.clone(),
3334                quality: quality_rules.clone(),
3335                relationships: ref_to_relationships(
3336                    &field_data
3337                        .get("$ref")
3338                        .and_then(|v| v.as_str())
3339                        .map(|s| s.to_string()),
3340                ),
3341                ..Default::default()
3342            });
3343            return Ok(columns);
3344        }
3345
3346        // Check if this is a nested object with fields or properties
3347        // Note: Export saves nested columns in properties.properties, but some formats use fields
3348        // Handle both object format (legacy/ODCL) and array format (ODCS v3.1.0)
3349        let nested_fields_obj = field_data
3350            .get("properties")
3351            .and_then(|v| v.as_object())
3352            .or_else(|| field_data.get("fields").and_then(|v| v.as_object()));
3353        let nested_fields_arr = field_data.get("properties").and_then(|v| v.as_array());
3354
3355        if field_type == "OBJECT" && (nested_fields_obj.is_some() || nested_fields_arr.is_some()) {
3356            // Inline nested object - create parent column as OBJECT and extract nested fields
3357
3358            // Extract physicalType for the parent OBJECT column
3359            let parent_physical_type = field_data
3360                .get("physicalType")
3361                .and_then(|v| v.as_str())
3362                .map(|s| s.to_string());
3363
3364            // Create parent column
3365            columns.push(Column {
3366                name: field_name.to_string(),
3367                data_type: "OBJECT".to_string(),
3368                physical_type: parent_physical_type,
3369                nullable: !field_data
3370                    .get("required")
3371                    .and_then(|v| v.as_bool())
3372                    .unwrap_or(false),
3373                description: description.clone(),
3374                quality: quality_rules.clone(),
3375                relationships: ref_to_relationships(
3376                    &field_data
3377                        .get("$ref")
3378                        .and_then(|v| v.as_str())
3379                        .map(|s| s.to_string()),
3380                ),
3381                ..Default::default()
3382            });
3383
3384            // Extract nested fields recursively - handle both object and array formats
3385            if let Some(fields_obj) = nested_fields_obj {
3386                // Object format (legacy/ODCL): properties is a map of name -> schema
3387                for (nested_field_name, nested_field_data) in fields_obj {
3388                    if let Some(nested_field_obj) = nested_field_data.as_object() {
3389                        let nested_field_type = nested_field_obj
3390                            .get("logicalType")
3391                            .and_then(|v| v.as_str())
3392                            .or_else(|| nested_field_obj.get("type").and_then(|v| v.as_str()))
3393                            .unwrap_or("STRING");
3394
3395                        // Recursively parse nested fields
3396                        let nested_col_name = format!("{}.{}", field_name, nested_field_name);
3397                        match self.parse_odcl_v3_property(&nested_col_name, nested_field_obj, data)
3398                        {
3399                            Ok(mut nested_cols) => {
3400                                // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
3401                                // Otherwise, it returns a single column
3402                                columns.append(&mut nested_cols);
3403                            }
3404                            Err(_) => {
3405                                // Fallback: create simple nested column
3406                                let nested_physical_type = nested_field_obj
3407                                    .get("physicalType")
3408                                    .and_then(|v| v.as_str())
3409                                    .map(|s| s.to_string());
3410                                columns.push(Column {
3411                                    name: nested_col_name,
3412                                    data_type: nested_field_type.to_uppercase(),
3413                                    physical_type: nested_physical_type,
3414                                    nullable: !nested_field_obj
3415                                        .get("required")
3416                                        .and_then(|v| v.as_bool())
3417                                        .unwrap_or(false),
3418                                    description: nested_field_obj
3419                                        .get("description")
3420                                        .and_then(|v| v.as_str())
3421                                        .unwrap_or("")
3422                                        .to_string(),
3423                                    ..Default::default()
3424                                });
3425                            }
3426                        }
3427                    }
3428                }
3429            } else if let Some(fields_arr) = nested_fields_arr {
3430                // Array format (ODCS v3.1.0): properties is an array with 'name' field
3431                for prop_data in fields_arr {
3432                    if let Some(prop_obj) = prop_data.as_object() {
3433                        // Extract name from property object (required in v3.1.0)
3434                        let nested_field_name = prop_obj
3435                            .get("name")
3436                            .or_else(|| prop_obj.get("id"))
3437                            .and_then(|v| v.as_str())
3438                            .unwrap_or("");
3439
3440                        if !nested_field_name.is_empty() {
3441                            let nested_field_type = prop_obj
3442                                .get("logicalType")
3443                                .and_then(|v| v.as_str())
3444                                .or_else(|| prop_obj.get("type").and_then(|v| v.as_str()))
3445                                .unwrap_or("STRING");
3446
3447                            // Recursively parse nested fields
3448                            let nested_col_name = format!("{}.{}", field_name, nested_field_name);
3449                            match self.parse_odcl_v3_property(&nested_col_name, prop_obj, data) {
3450                                Ok(mut nested_cols) => {
3451                                    // If nested field is itself an OBJECT/STRUCT, it will return multiple columns
3452                                    // Otherwise, it returns a single column
3453                                    columns.append(&mut nested_cols);
3454                                }
3455                                Err(_) => {
3456                                    // Fallback: create simple nested column
3457                                    let nested_physical_type = prop_obj
3458                                        .get("physicalType")
3459                                        .and_then(|v| v.as_str())
3460                                        .map(|s| s.to_string());
3461                                    columns.push(Column {
3462                                        name: nested_col_name,
3463                                        data_type: nested_field_type.to_uppercase(),
3464                                        physical_type: nested_physical_type,
3465                                        nullable: !prop_obj
3466                                            .get("required")
3467                                            .and_then(|v| v.as_bool())
3468                                            .unwrap_or(false),
3469                                        description: prop_obj
3470                                            .get("description")
3471                                            .and_then(|v| v.as_str())
3472                                            .unwrap_or("")
3473                                            .to_string(),
3474                                        ..Default::default()
3475                                    });
3476                                }
3477                            }
3478                        }
3479                    }
3480                }
3481            }
3482
3483            return Ok(columns);
3484        }
3485
3486        // Regular field (no $ref or $ref not found)
3487        let required = field_data
3488            .get("required")
3489            .and_then(|v| v.as_bool())
3490            .unwrap_or(false);
3491
3492        // Use description extracted at function start, or extract if not yet extracted
3493        let field_description = if description.is_empty() {
3494            field_data
3495                .get("description")
3496                .and_then(|v| v.as_str())
3497                .unwrap_or("")
3498                .to_string()
3499        } else {
3500            description
3501        };
3502
3503        // Use quality_rules extracted at function start, or extract if not yet extracted
3504        let mut column_quality_rules = quality_rules;
3505
3506        // Extract column-level quality rules if not already extracted
3507        if column_quality_rules.is_empty()
3508            && let Some(quality_val) = field_data.get("quality")
3509        {
3510            if let Some(arr) = quality_val.as_array() {
3511                // Array of quality rules
3512                for item in arr {
3513                    if let Some(obj) = item.as_object() {
3514                        let mut rule = HashMap::new();
3515                        for (key, value) in obj {
3516                            rule.insert(key.clone(), json_value_to_serde_value(value));
3517                        }
3518                        column_quality_rules.push(rule);
3519                    }
3520                }
3521            } else if let Some(obj) = quality_val.as_object() {
3522                // Single quality rule object
3523                let mut rule = HashMap::new();
3524                for (key, value) in obj {
3525                    rule.insert(key.clone(), json_value_to_serde_value(value));
3526                }
3527                column_quality_rules.push(rule);
3528            }
3529        }
3530
3531        // Note: ODCS v3.1.0 uses the `required` field at the property level to indicate non-nullable columns,
3532        // not a quality rule. We should NOT add a "not_null" quality rule as it's not a valid ODCS quality type.
3533        // The `required` field will be set based on the `required` parameter during column creation.
3534
3535        // Extract physicalType (ODCS v3.1.0) - the actual database type
3536        let physical_type = field_data
3537            .get("physicalType")
3538            .and_then(|v| v.as_str())
3539            .map(|s| s.to_string());
3540
3541        // Parse the column with all ODCS v3.1.0 metadata fields
3542        let column = self.parse_column_metadata_from_field(
3543            field_name,
3544            &field_type,
3545            physical_type,
3546            !required,
3547            field_description,
3548            column_quality_rules,
3549            field_data,
3550        );
3551
3552        columns.push(column);
3553
3554        Ok(columns)
3555    }
3556
3557    /// Parse all column metadata fields from ODCS v3.1.0 field data.
3558    /// This extracts all supported column-level metadata including:
3559    /// - businessName, physicalName, logicalTypeOptions
3560    /// - primaryKey, primaryKeyPosition, unique
3561    /// - partitioned, partitionKeyPosition, clustered
3562    /// - classification, criticalDataElement, encryptedName
3563    /// - transformSourceObjects, transformLogic, transformDescription
3564    /// - examples, authoritativeDefinitions, tags, customProperties
3565    #[allow(clippy::too_many_arguments)]
3566    fn parse_column_metadata_from_field(
3567        &self,
3568        name: &str,
3569        data_type: &str,
3570        physical_type: Option<String>,
3571        nullable: bool,
3572        description: String,
3573        quality: Vec<HashMap<String, serde_json::Value>>,
3574        field_data: &serde_json::Map<String, JsonValue>,
3575    ) -> Column {
3576        use crate::models::{AuthoritativeDefinition, LogicalTypeOptions};
3577
3578        // businessName
3579        let business_name = field_data
3580            .get("businessName")
3581            .and_then(|v| v.as_str())
3582            .map(|s| s.to_string());
3583
3584        // physicalName
3585        let physical_name = field_data
3586            .get("physicalName")
3587            .and_then(|v| v.as_str())
3588            .map(|s| s.to_string());
3589
3590        // logicalTypeOptions
3591        let logical_type_options = field_data.get("logicalTypeOptions").and_then(|v| {
3592            v.as_object().map(|opts| LogicalTypeOptions {
3593                min_length: opts.get("minLength").and_then(|v| v.as_i64()),
3594                max_length: opts.get("maxLength").and_then(|v| v.as_i64()),
3595                pattern: opts
3596                    .get("pattern")
3597                    .and_then(|v| v.as_str())
3598                    .map(|s| s.to_string()),
3599                format: opts
3600                    .get("format")
3601                    .and_then(|v| v.as_str())
3602                    .map(|s| s.to_string()),
3603                minimum: opts.get("minimum").cloned(),
3604                maximum: opts.get("maximum").cloned(),
3605                exclusive_minimum: opts.get("exclusiveMinimum").cloned(),
3606                exclusive_maximum: opts.get("exclusiveMaximum").cloned(),
3607                precision: opts
3608                    .get("precision")
3609                    .and_then(|v| v.as_i64())
3610                    .map(|n| n as i32),
3611                scale: opts.get("scale").and_then(|v| v.as_i64()).map(|n| n as i32),
3612            })
3613        });
3614
3615        // primaryKey
3616        let primary_key = field_data
3617            .get("primaryKey")
3618            .and_then(|v| v.as_bool())
3619            .unwrap_or(false);
3620
3621        // primaryKeyPosition
3622        let primary_key_position = field_data
3623            .get("primaryKeyPosition")
3624            .and_then(|v| v.as_i64())
3625            .map(|n| n as i32);
3626
3627        // unique
3628        let unique = field_data
3629            .get("unique")
3630            .and_then(|v| v.as_bool())
3631            .unwrap_or(false);
3632
3633        // partitioned
3634        let partitioned = field_data
3635            .get("partitioned")
3636            .and_then(|v| v.as_bool())
3637            .unwrap_or(false);
3638
3639        // partitionKeyPosition
3640        let partition_key_position = field_data
3641            .get("partitionKeyPosition")
3642            .and_then(|v| v.as_i64())
3643            .map(|n| n as i32);
3644
3645        // clustered
3646        let clustered = field_data
3647            .get("clustered")
3648            .and_then(|v| v.as_bool())
3649            .unwrap_or(false);
3650
3651        // classification
3652        let classification = field_data
3653            .get("classification")
3654            .and_then(|v| v.as_str())
3655            .map(|s| s.to_string());
3656
3657        // criticalDataElement
3658        let critical_data_element = field_data
3659            .get("criticalDataElement")
3660            .and_then(|v| v.as_bool())
3661            .unwrap_or(false);
3662
3663        // encryptedName
3664        let encrypted_name = field_data
3665            .get("encryptedName")
3666            .and_then(|v| v.as_str())
3667            .map(|s| s.to_string());
3668
3669        // transformSourceObjects
3670        let transform_source_objects = field_data
3671            .get("transformSourceObjects")
3672            .and_then(|v| v.as_array())
3673            .map(|arr| {
3674                arr.iter()
3675                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3676                    .collect()
3677            })
3678            .unwrap_or_default();
3679
3680        // transformLogic
3681        let transform_logic = field_data
3682            .get("transformLogic")
3683            .and_then(|v| v.as_str())
3684            .map(|s| s.to_string());
3685
3686        // transformDescription
3687        let transform_description = field_data
3688            .get("transformDescription")
3689            .and_then(|v| v.as_str())
3690            .map(|s| s.to_string());
3691
3692        // examples
3693        let examples = field_data
3694            .get("examples")
3695            .and_then(|v| v.as_array())
3696            .map(|arr| arr.to_vec())
3697            .unwrap_or_default();
3698
3699        // authoritativeDefinitions
3700        let authoritative_definitions = field_data
3701            .get("authoritativeDefinitions")
3702            .and_then(|v| v.as_array())
3703            .map(|arr| {
3704                arr.iter()
3705                    .filter_map(|item| {
3706                        item.as_object().map(|obj| AuthoritativeDefinition {
3707                            definition_type: obj
3708                                .get("type")
3709                                .and_then(|v| v.as_str())
3710                                .unwrap_or("")
3711                                .to_string(),
3712                            url: obj
3713                                .get("url")
3714                                .and_then(|v| v.as_str())
3715                                .unwrap_or("")
3716                                .to_string(),
3717                        })
3718                    })
3719                    .collect()
3720            })
3721            .unwrap_or_default();
3722
3723        // tags
3724        let tags = field_data
3725            .get("tags")
3726            .and_then(|v| v.as_array())
3727            .map(|arr| {
3728                arr.iter()
3729                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3730                    .collect()
3731            })
3732            .unwrap_or_default();
3733
3734        // customProperties
3735        let custom_properties = field_data
3736            .get("customProperties")
3737            .and_then(|v| v.as_array())
3738            .map(|arr| {
3739                arr.iter()
3740                    .filter_map(|item| {
3741                        item.as_object().and_then(|obj| {
3742                            let key = obj.get("property").and_then(|v| v.as_str())?;
3743                            let value = obj.get("value").cloned()?;
3744                            Some((key.to_string(), value))
3745                        })
3746                    })
3747                    .collect()
3748            })
3749            .unwrap_or_default();
3750
3751        // businessKey (secondary_key)
3752        let secondary_key = field_data
3753            .get("businessKey")
3754            .and_then(|v| v.as_bool())
3755            .unwrap_or(false);
3756
3757        // enum values
3758        let enum_values = field_data
3759            .get("enum")
3760            .and_then(|v| v.as_array())
3761            .map(|arr| {
3762                arr.iter()
3763                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3764                    .collect()
3765            })
3766            .unwrap_or_default();
3767
3768        // constraints
3769        let constraints = field_data
3770            .get("constraints")
3771            .and_then(|v| v.as_array())
3772            .map(|arr| {
3773                arr.iter()
3774                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3775                    .collect()
3776            })
3777            .unwrap_or_default();
3778
3779        Column {
3780            name: name.to_string(),
3781            data_type: data_type.to_string(),
3782            physical_type,
3783            physical_name,
3784            nullable,
3785            description,
3786            quality,
3787            business_name,
3788            logical_type_options,
3789            primary_key,
3790            primary_key_position,
3791            unique,
3792            partitioned,
3793            partition_key_position,
3794            clustered,
3795            classification,
3796            critical_data_element,
3797            encrypted_name,
3798            transform_source_objects,
3799            transform_logic,
3800            transform_description,
3801            examples,
3802            authoritative_definitions,
3803            tags,
3804            custom_properties,
3805            secondary_key,
3806            enum_values,
3807            constraints,
3808            foreign_key: self.parse_foreign_key_from_data_contract(field_data),
3809            relationships: self.parse_relationships_from_field(field_data),
3810            ..Default::default()
3811        }
3812    }
3813
3814    /// Parse foreign key from Data Contract field data.
3815    fn parse_foreign_key_from_data_contract(
3816        &self,
3817        field_data: &serde_json::Map<String, JsonValue>,
3818    ) -> Option<ForeignKey> {
3819        field_data
3820            .get("foreignKey")
3821            .and_then(|v| v.as_object())
3822            .map(|fk_obj| ForeignKey {
3823                table_id: fk_obj
3824                    .get("table")
3825                    .or_else(|| fk_obj.get("table_id"))
3826                    .and_then(|v| v.as_str())
3827                    .unwrap_or("")
3828                    .to_string(),
3829                column_name: fk_obj
3830                    .get("column")
3831                    .or_else(|| fk_obj.get("column_name"))
3832                    .and_then(|v| v.as_str())
3833                    .unwrap_or("")
3834                    .to_string(),
3835            })
3836    }
3837
3838    /// Parse relationships from Data Contract field data.
3839    /// Also converts $ref to a relationship entry if present.
3840    fn parse_relationships_from_field(
3841        &self,
3842        field_data: &serde_json::Map<String, JsonValue>,
3843    ) -> Vec<PropertyRelationship> {
3844        let mut relationships = Vec::new();
3845
3846        // Parse relationships array from ODCS v3.1.0 format
3847        if let Some(rels_array) = field_data.get("relationships").and_then(|v| v.as_array()) {
3848            for rel in rels_array {
3849                if let Some(rel_obj) = rel.as_object() {
3850                    let rel_type = rel_obj
3851                        .get("type")
3852                        .and_then(|v| v.as_str())
3853                        .unwrap_or("foreignKey")
3854                        .to_string();
3855                    let to = rel_obj
3856                        .get("to")
3857                        .and_then(|v| v.as_str())
3858                        .unwrap_or("")
3859                        .to_string();
3860
3861                    if !to.is_empty() {
3862                        relationships.push(PropertyRelationship {
3863                            relationship_type: rel_type,
3864                            to,
3865                        });
3866                    }
3867                }
3868            }
3869        }
3870
3871        // Convert $ref to relationship if present and no relationships were parsed
3872        if relationships.is_empty()
3873            && let Some(ref_str) = field_data.get("$ref").and_then(|v| v.as_str())
3874        {
3875            // Convert $ref path to ODCS relationship format
3876            let to = if ref_str.starts_with("#/definitions/") {
3877                let def_path = ref_str.strip_prefix("#/definitions/").unwrap_or(ref_str);
3878                format!("definitions/{}", def_path)
3879            } else if ref_str.starts_with("#/") {
3880                ref_str.strip_prefix("#/").unwrap_or(ref_str).to_string()
3881            } else {
3882                ref_str.to_string()
3883            };
3884
3885            relationships.push(PropertyRelationship {
3886                relationship_type: "foreignKey".to_string(),
3887                to,
3888            });
3889        }
3890
3891        relationships
3892    }
3893
3894    /// Extract database type from servers in Data Contract format.
3895    fn extract_database_type_from_servers(&self, data: &JsonValue) -> Option<DatabaseType> {
3896        // Data Contract format: servers can be object or array
3897        if let Some(servers_obj) = data.get("servers").and_then(|v| v.as_object()) {
3898            // Object format: { "server_name": { "type": "..." } }
3899            if let Some((_, server_data)) = servers_obj.iter().next()
3900                && let Some(server_obj) = server_data.as_object()
3901            {
3902                return server_obj
3903                    .get("type")
3904                    .and_then(|v| v.as_str())
3905                    .and_then(|s| self.parse_database_type(s));
3906            }
3907        } else if let Some(servers_arr) = data.get("servers").and_then(|v| v.as_array()) {
3908            // Array format: [ { "server": "...", "type": "..." } ]
3909            if let Some(server_obj) = servers_arr.first().and_then(|v| v.as_object()) {
3910                return server_obj
3911                    .get("type")
3912                    .and_then(|v| v.as_str())
3913                    .and_then(|s| self.parse_database_type(s));
3914            }
3915        }
3916        None
3917    }
3918
3919    /// Parse database type string to enum.
3920    fn parse_database_type(&self, s: &str) -> Option<DatabaseType> {
3921        match s.to_lowercase().as_str() {
3922            "databricks" | "databricks_delta" => Some(DatabaseType::DatabricksDelta),
3923            "postgres" | "postgresql" => Some(DatabaseType::Postgres),
3924            "mysql" => Some(DatabaseType::Mysql),
3925            "sql_server" | "sqlserver" => Some(DatabaseType::SqlServer),
3926            "aws_glue" | "glue" => Some(DatabaseType::AwsGlue),
3927            _ => None,
3928        }
3929    }
3930
3931    /// Parse STRUCT type definition from string (e.g., "ARRAY<STRUCT<ID: STRING, NAME: STRING>>").
3932    /// Parse STRUCT type from string and create nested columns
3933    /// This is public so it can be used by SQL importer to parse STRUCT types
3934    pub fn parse_struct_type_from_string(
3935        &self,
3936        field_name: &str,
3937        type_str: &str,
3938        field_data: &serde_json::Map<String, JsonValue>,
3939    ) -> Result<Vec<Column>> {
3940        let mut columns = Vec::new();
3941
3942        // Normalize whitespace - replace newlines and multiple spaces with single space
3943        let normalized_type = type_str
3944            .lines()
3945            .map(|line| line.trim())
3946            .filter(|line| !line.is_empty())
3947            .collect::<Vec<_>>()
3948            .join(" ");
3949
3950        let type_str_upper = normalized_type.to_uppercase();
3951
3952        // Check if it's ARRAY<STRUCT<...>>
3953        let is_array = type_str_upper.starts_with("ARRAY<");
3954        let struct_start = type_str_upper.find("STRUCT<");
3955
3956        if let Some(start_pos) = struct_start {
3957            // Extract STRUCT content - start from STRUCT< and find its closing >
3958            // For ARRAY<STRUCT<...>>, we still need to find the STRUCT<...> closing bracket
3959            let struct_content_start = start_pos + 7; // Skip "STRUCT<"
3960            let struct_content = &normalized_type[struct_content_start..];
3961
3962            // Find matching closing bracket for STRUCT<, handling nested STRUCTs
3963            // We need to find the > that closes STRUCT<, not ARRAY<
3964            let mut depth = 1;
3965            let mut end_pos = None;
3966            for (i, ch) in struct_content.char_indices() {
3967                match ch {
3968                    '<' => depth += 1,
3969                    '>' => {
3970                        depth -= 1;
3971                        if depth == 0 {
3972                            end_pos = Some(i);
3973                            break;
3974                        }
3975                    }
3976                    _ => {}
3977                }
3978            }
3979
3980            // If no closing bracket found, try to infer it (handle malformed YAML)
3981            let struct_fields_str = if let Some(end) = end_pos {
3982                &struct_content[..end]
3983            } else {
3984                // Missing closing bracket - use everything up to the end
3985                struct_content.trim_end_matches('>').trim()
3986            };
3987
3988            // Parse fields: "ID: STRING, NAME: STRING, ..."
3989            let fields = self.parse_struct_fields_from_string(struct_fields_str)?;
3990
3991            // Create nested columns, handling nested STRUCTs recursively
3992            // For ARRAY<STRUCT<...>>, use .[] notation for array elements: parent.[].field
3993            // For STRUCT<...>, use dot notation: parent.field
3994            for (nested_name, nested_type) in fields {
3995                let nested_type_upper = nested_type.to_uppercase();
3996                let nested_col_name = if is_array {
3997                    format!("{}.[].{}", field_name, nested_name)
3998                } else {
3999                    format!("{}.{}", field_name, nested_name)
4000                };
4001
4002                // Check if this field is itself a STRUCT or ARRAY<STRUCT>
4003                // Handle multiple levels of nesting: STRUCT<...>, ARRAY<STRUCT<...>>, ARRAY<STRUCT<...>> within STRUCT, etc.
4004                let is_nested_struct = nested_type_upper.starts_with("STRUCT<");
4005                let is_nested_array_struct = nested_type_upper.starts_with("ARRAY<STRUCT<");
4006
4007                if is_nested_struct || is_nested_array_struct {
4008                    // Recursively parse nested STRUCT or ARRAY<STRUCT>
4009                    // The nested_col_name already includes the parent path:
4010                    // - For STRUCT within ARRAY: "items.[].details"
4011                    // - For STRUCT within STRUCT: "orderInfo.metadata"
4012                    // - For ARRAY<STRUCT> within STRUCT: "parent.items"
4013                    // - For ARRAY<STRUCT> within ARRAY<STRUCT>: "parent.[].items"
4014                    // Recursive calls will correctly build the full path with proper notation
4015                    match self.parse_struct_type_from_string(
4016                        &nested_col_name,
4017                        &nested_type,
4018                        field_data,
4019                    ) {
4020                        Ok(nested_cols) => {
4021                            // Add all recursively parsed columns
4022                            // These will have names like:
4023                            // - items.[].details.name (STRUCT within ARRAY<STRUCT>)
4024                            // - orderInfo.metadata.field (STRUCT within STRUCT)
4025                            // - parent.items.[].field (ARRAY<STRUCT> within STRUCT)
4026                            // - parent.[].items.[].field (ARRAY<STRUCT> within ARRAY<STRUCT>)
4027                            columns.extend(nested_cols);
4028                        }
4029                        Err(_) => {
4030                            // If recursive parsing fails, add the parent column with STRUCT/ARRAY type
4031                            let fallback_data_type = if is_nested_array_struct {
4032                                "ARRAY<STRUCT<...>>".to_string()
4033                            } else {
4034                                "STRUCT<...>".to_string()
4035                            };
4036                            let nested_physical_type = field_data
4037                                .get("physicalType")
4038                                .and_then(|v| v.as_str())
4039                                .map(|s| s.to_string());
4040                            columns.push(Column {
4041                                name: nested_col_name,
4042                                data_type: fallback_data_type,
4043                                physical_type: nested_physical_type,
4044                                nullable: !field_data
4045                                    .get("required")
4046                                    .and_then(|v| v.as_bool())
4047                                    .unwrap_or(false),
4048                                description: field_data
4049                                    .get("description")
4050                                    .and_then(|v| v.as_str())
4051                                    .unwrap_or("")
4052                                    .to_string(),
4053                                ..Default::default()
4054                            });
4055                        }
4056                    }
4057                } else if nested_type_upper.starts_with("ARRAY<") {
4058                    // Handle ARRAY of simple types (not STRUCT) - these don't need nested columns
4059                    // But if it's ARRAY<STRUCT<...>>, it would have been caught above
4060                    let nested_physical_type = field_data
4061                        .get("physicalType")
4062                        .and_then(|v| v.as_str())
4063                        .map(|s| s.to_string());
4064                    columns.push(Column {
4065                        name: nested_col_name,
4066                        data_type: normalize_data_type(&nested_type),
4067                        physical_type: nested_physical_type,
4068                        nullable: !field_data
4069                            .get("required")
4070                            .and_then(|v| v.as_bool())
4071                            .unwrap_or(false),
4072                        description: field_data
4073                            .get("description")
4074                            .and_then(|v| v.as_str())
4075                            .unwrap_or("")
4076                            .to_string(),
4077                        ..Default::default()
4078                    });
4079                } else {
4080                    // Simple nested field (not a STRUCT)
4081                    let nested_physical_type = field_data
4082                        .get("physicalType")
4083                        .and_then(|v| v.as_str())
4084                        .map(|s| s.to_string());
4085                    columns.push(Column {
4086                        name: nested_col_name,
4087                        data_type: normalize_data_type(&nested_type),
4088                        physical_type: nested_physical_type,
4089                        nullable: !field_data
4090                            .get("required")
4091                            .and_then(|v| v.as_bool())
4092                            .unwrap_or(false),
4093                        description: field_data
4094                            .get("description")
4095                            .and_then(|v| v.as_str())
4096                            .unwrap_or("")
4097                            .to_string(),
4098                        ..Default::default()
4099                    });
4100                }
4101            }
4102
4103            return Ok(columns);
4104        }
4105
4106        // If no STRUCT found, return empty (fallback to regular parsing)
4107        Ok(Vec::new())
4108    }
4109
4110    /// Parse STRUCT fields from string (e.g., "ID: STRING, NAME: STRING").
4111    fn parse_struct_fields_from_string(&self, fields_str: &str) -> Result<Vec<(String, String)>> {
4112        let mut fields = Vec::new();
4113        let mut current_field = String::new();
4114        let mut depth = 0;
4115        let mut in_string = false;
4116        let mut string_char = None;
4117
4118        for ch in fields_str.chars() {
4119            match ch {
4120                '\'' | '"' if !in_string || Some(ch) == string_char => {
4121                    if in_string {
4122                        in_string = false;
4123                        string_char = None;
4124                    } else {
4125                        in_string = true;
4126                        string_char = Some(ch);
4127                    }
4128                    current_field.push(ch);
4129                }
4130                '<' if !in_string => {
4131                    depth += 1;
4132                    current_field.push(ch);
4133                }
4134                '>' if !in_string => {
4135                    depth -= 1;
4136                    current_field.push(ch);
4137                }
4138                ',' if !in_string && depth == 0 => {
4139                    // End of current field
4140                    let trimmed = current_field.trim();
4141                    if !trimmed.is_empty()
4142                        && let Some((name, type_part)) = self.parse_field_definition(trimmed)
4143                    {
4144                        fields.push((name, type_part));
4145                    }
4146                    current_field.clear();
4147                }
4148                _ => {
4149                    current_field.push(ch);
4150                }
4151            }
4152        }
4153
4154        // Handle last field
4155        let trimmed = current_field.trim();
4156        if !trimmed.is_empty()
4157            && let Some((name, type_part)) = self.parse_field_definition(trimmed)
4158        {
4159            fields.push((name, type_part));
4160        }
4161
4162        Ok(fields)
4163    }
4164
4165    /// Parse a single field definition (e.g., "ID: STRING" or "DETAILS: STRUCT<...>").
4166    fn parse_field_definition(&self, field_def: &str) -> Option<(String, String)> {
4167        // Split by colon, but handle nested STRUCTs
4168        let colon_pos = field_def.find(':')?;
4169        let name = field_def[..colon_pos].trim().to_string();
4170        let type_part = field_def[colon_pos + 1..].trim().to_string();
4171
4172        if name.is_empty() || type_part.is_empty() {
4173            return None;
4174        }
4175
4176        Some((name, type_part))
4177    }
4178
4179    /// Extract catalog and schema from customProperties.
4180    fn extract_catalog_schema(&self, data: &JsonValue) -> (Option<String>, Option<String>) {
4181        let mut catalog_name = None;
4182        let mut schema_name = None;
4183
4184        if let Some(custom_props) = data.get("customProperties").and_then(|v| v.as_array()) {
4185            for prop in custom_props {
4186                if let Some(prop_obj) = prop.as_object() {
4187                    let prop_key = prop_obj
4188                        .get("property")
4189                        .and_then(|v| v.as_str())
4190                        .unwrap_or("");
4191                    let prop_value = prop_obj.get("value").and_then(|v| v.as_str());
4192
4193                    match prop_key {
4194                        "catalogName" | "catalog_name" => {
4195                            catalog_name = prop_value.map(|s| s.to_string());
4196                        }
4197                        "schemaName" | "schema_name" => {
4198                            schema_name = prop_value.map(|s| s.to_string());
4199                        }
4200                        _ => {}
4201                    }
4202                }
4203            }
4204        }
4205
4206        // Also check direct fields
4207        if catalog_name.is_none() {
4208            catalog_name = data
4209                .get("catalog_name")
4210                .and_then(|v| v.as_str())
4211                .map(|s| s.to_string());
4212        }
4213        if schema_name.is_none() {
4214            schema_name = data
4215                .get("schema_name")
4216                .and_then(|v| v.as_str())
4217                .map(|s| s.to_string());
4218        }
4219
4220        (catalog_name, schema_name)
4221    }
4222}
4223
4224impl Default for ODCSImporter {
4225    fn default() -> Self {
4226        Self::new()
4227    }
4228}
4229
4230#[cfg(test)]
4231mod tests {
4232    use super::*;
4233
4234    #[test]
4235    fn test_parse_simple_odcl_table() {
4236        let mut parser = ODCSImporter::new();
4237        let odcl_yaml = r#"
4238name: users
4239columns:
4240  - name: id
4241    data_type: INT
4242    nullable: false
4243    primary_key: true
4244  - name: name
4245    data_type: VARCHAR(255)
4246    nullable: false
4247database_type: Postgres
4248"#;
4249
4250        let (table, errors) = parser.parse(odcl_yaml).unwrap();
4251        assert_eq!(table.name, "users");
4252        assert_eq!(table.columns.len(), 2);
4253        assert_eq!(table.columns[0].name, "id");
4254        assert_eq!(table.database_type, Some(DatabaseType::Postgres));
4255        assert_eq!(errors.len(), 0);
4256    }
4257
4258    #[test]
4259    fn test_parse_odcl_with_metadata() {
4260        let mut parser = ODCSImporter::new();
4261        let odcl_yaml = r#"
4262name: users
4263columns:
4264  - name: id
4265    data_type: INT
4266medallion_layer: gold
4267scd_pattern: TYPE_2
4268odcl_metadata:
4269  description: "User table"
4270  owner: "data-team"
4271"#;
4272
4273        let (table, errors) = parser.parse(odcl_yaml).unwrap();
4274        assert_eq!(table.medallion_layers.len(), 1);
4275        assert_eq!(table.medallion_layers[0], MedallionLayer::Gold);
4276        assert_eq!(table.scd_pattern, Some(SCDPattern::Type2));
4277        if let Some(serde_json::Value::String(desc)) = table.odcl_metadata.get("description") {
4278            assert_eq!(desc, "User table");
4279        }
4280        assert_eq!(errors.len(), 0);
4281    }
4282
4283    #[test]
4284    fn test_parse_odcl_with_data_vault() {
4285        let mut parser = ODCSImporter::new();
4286        let odcl_yaml = r#"
4287name: hub_customer
4288columns:
4289  - name: customer_key
4290    data_type: VARCHAR(50)
4291data_vault_classification: Hub
4292"#;
4293
4294        let (table, errors) = parser.parse(odcl_yaml).unwrap();
4295        assert_eq!(
4296            table.data_vault_classification,
4297            Some(DataVaultClassification::Hub)
4298        );
4299        assert_eq!(errors.len(), 0);
4300    }
4301
4302    #[test]
4303    fn test_parse_invalid_odcl() {
4304        let mut parser = ODCSImporter::new();
4305        let invalid_yaml = "not: valid: yaml: structure:";
4306
4307        // Should fail to parse YAML
4308        assert!(parser.parse(invalid_yaml).is_err());
4309    }
4310
4311    #[test]
4312    fn test_parse_odcl_missing_required_fields() {
4313        let mut parser = ODCSImporter::new();
4314        let non_conformant = r#"
4315name: users
4316# Missing required columns field
4317"#;
4318
4319        // Should fail with missing columns
4320        assert!(parser.parse(non_conformant).is_err());
4321    }
4322
4323    #[test]
4324    fn test_parse_odcl_with_foreign_key() {
4325        let mut parser = ODCSImporter::new();
4326        let odcl_yaml = r#"
4327name: orders
4328columns:
4329  - name: id
4330    data_type: INT
4331    primary_key: true
4332  - name: user_id
4333    data_type: INT
4334    foreign_key:
4335      table_id: users
4336      column_name: id
4337"#;
4338
4339        let (table, errors) = parser.parse(odcl_yaml).unwrap();
4340        assert_eq!(table.columns.len(), 2);
4341        let user_id_col = table.columns.iter().find(|c| c.name == "user_id").unwrap();
4342        assert!(user_id_col.foreign_key.is_some());
4343        assert_eq!(user_id_col.foreign_key.as_ref().unwrap().table_id, "users");
4344        assert_eq!(errors.len(), 0);
4345    }
4346
4347    #[test]
4348    fn test_parse_odcl_with_constraints() {
4349        let mut parser = ODCSImporter::new();
4350        let odcl_yaml = r#"
4351name: products
4352columns:
4353  - name: id
4354    data_type: INT
4355    primary_key: true
4356  - name: name
4357    data_type: VARCHAR(255)
4358    nullable: false
4359    constraints:
4360      - UNIQUE
4361      - NOT NULL
4362"#;
4363
4364        let (table, errors) = parser.parse(odcl_yaml).unwrap();
4365        assert_eq!(table.columns.len(), 2);
4366        let name_col = table.columns.iter().find(|c| c.name == "name").unwrap();
4367        assert!(!name_col.nullable);
4368        assert!(name_col.constraints.contains(&"UNIQUE".to_string()));
4369        assert_eq!(errors.len(), 0);
4370    }
4371
4372    #[test]
4373    fn test_parse_odcs_v3_multiple_tables() {
4374        let mut parser = ODCSImporter::new();
4375        let odcs_yaml = r#"
4376apiVersion: v3.1.0
4377kind: DataContract
4378id: 550e8400-e29b-41d4-a716-446655440000
4379version: 1.0.0
4380name: multi_table_contract
4381domain: sales
4382schema:
4383  - name: orders
4384    properties:
4385      - name: order_id
4386        logicalType: integer
4387        primaryKey: true
4388      - name: customer_id
4389        logicalType: integer
4390      - name: total_amount
4391        logicalType: decimal
4392  - name: order_items
4393    properties:
4394      - name: item_id
4395        logicalType: integer
4396        primaryKey: true
4397      - name: order_id
4398        logicalType: integer
4399      - name: product_name
4400        logicalType: string
4401      - name: quantity
4402        logicalType: integer
4403  - name: customers
4404    properties:
4405      - name: customer_id
4406        logicalType: integer
4407        primaryKey: true
4408      - name: name
4409        logicalType: string
4410      - name: email
4411        logicalType: string
4412"#;
4413
4414        let result = parser.import(odcs_yaml).unwrap();
4415
4416        // Should return 3 tables
4417        assert_eq!(
4418            result.tables.len(),
4419            3,
4420            "Expected 3 tables, got {}",
4421            result.tables.len()
4422        );
4423
4424        // Verify table names
4425        let table_names: Vec<&str> = result
4426            .tables
4427            .iter()
4428            .map(|t| t.name.as_deref().unwrap_or(""))
4429            .collect();
4430        assert!(table_names.contains(&"orders"), "Missing 'orders' table");
4431        assert!(
4432            table_names.contains(&"order_items"),
4433            "Missing 'order_items' table"
4434        );
4435        assert!(
4436            table_names.contains(&"customers"),
4437            "Missing 'customers' table"
4438        );
4439
4440        // Verify column counts for each table
4441        let orders_table = result
4442            .tables
4443            .iter()
4444            .find(|t| t.name.as_deref() == Some("orders"))
4445            .unwrap();
4446        assert_eq!(
4447            orders_table.columns.len(),
4448            3,
4449            "orders table should have 3 columns"
4450        );
4451
4452        let order_items_table = result
4453            .tables
4454            .iter()
4455            .find(|t| t.name.as_deref() == Some("order_items"))
4456            .unwrap();
4457        assert_eq!(
4458            order_items_table.columns.len(),
4459            4,
4460            "order_items table should have 4 columns"
4461        );
4462
4463        let customers_table = result
4464            .tables
4465            .iter()
4466            .find(|t| t.name.as_deref() == Some("customers"))
4467            .unwrap();
4468        assert_eq!(
4469            customers_table.columns.len(),
4470            3,
4471            "customers table should have 3 columns"
4472        );
4473
4474        // Verify contract-level metadata is shared across all tables
4475        for table in &result.tables {
4476            assert_eq!(table.api_version.as_deref(), Some("v3.1.0"));
4477            assert_eq!(table.kind.as_deref(), Some("DataContract"));
4478            assert_eq!(table.domain.as_deref(), Some("sales"));
4479        }
4480
4481        // Verify table indices are sequential
4482        assert_eq!(result.tables[0].table_index, 0);
4483        assert_eq!(result.tables[1].table_index, 1);
4484        assert_eq!(result.tables[2].table_index, 2);
4485    }
4486
4487    #[test]
4488    fn test_parse_odcs_v3_single_table_backwards_compatible() {
4489        // Ensure single-table ODCS files still work correctly
4490        let mut parser = ODCSImporter::new();
4491        let odcs_yaml = r#"
4492apiVersion: v3.1.0
4493kind: DataContract
4494id: 550e8400-e29b-41d4-a716-446655440001
4495version: 1.0.0
4496name: single_table_contract
4497schema:
4498  - name: users
4499    properties:
4500      - name: user_id
4501        logicalType: integer
4502        primaryKey: true
4503      - name: username
4504        logicalType: string
4505"#;
4506
4507        let result = parser.import(odcs_yaml).unwrap();
4508
4509        // Should return exactly 1 table
4510        assert_eq!(
4511            result.tables.len(),
4512            1,
4513            "Expected 1 table for single-schema ODCS"
4514        );
4515        assert_eq!(result.tables[0].name.as_deref(), Some("users"));
4516        assert_eq!(result.tables[0].columns.len(), 2);
4517    }
4518}