// data_modelling_sdk/import/sql.rs

//! SQL Import functionality
//!
//! Provides parsing of CREATE TABLE statements from various SQL dialects.
//!
//! Uses `sqlparser` to parse CREATE TABLE statements into SDK import primitives.
//!
//! # Validation
//!
//! All imported table and column names are validated for:
//! - Valid identifier format
//! - Maximum length limits
//! - SQL reserved word detection

14use super::{ColumnData, ImportError, ImportResult, TableData};
15use crate::validation::input::{validate_column_name, validate_data_type, validate_table_name};
16use anyhow::Result;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use sqlparser::ast::{ColumnDef, ColumnOption, ObjectName, Statement, TableConstraint};
20use sqlparser::dialect::{
21    AnsiDialect, BigQueryDialect, DatabricksDialect as OfficialDatabricksDialect, Dialect,
22    GenericDialect, HiveDialect, MsSqlDialect, MySqlDialect, PostgreSqlDialect, SQLiteDialect,
23};
24use sqlparser::parser::Parser;
25use std::collections::HashMap;
26
// Static regex patterns compiled once for performance.
//
// Each pattern supports one preprocessing step that rewrites dialect-specific
// syntax sqlparser cannot handle. `(?i)` makes the SQL keywords case-insensitive.

// Matches IDENTIFIER( ... ) calls; group 1 captures the inner expression.
static RE_IDENTIFIER: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(?i)IDENTIFIER\s*\(\s*([^)]+)\s*\)").expect("Invalid regex"));
// Matches one single- or double-quoted string literal; the literal body lands
// in group 1 (single quotes) or group 2 (double quotes).
static RE_LITERAL: Lazy<Regex> =
    Lazy::new(|| Regex::new(r#"(?:'([^']*)'|"([^"]*)")"#).expect("Invalid regex"));
// Matches the CREATE MATERIALIZED VIEW prefix so it can be downgraded to CREATE VIEW.
static RE_MATERIALIZED_VIEW: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(?i)CREATE\s+MATERIALIZED\s+VIEW").expect("Invalid regex"));
// Matches a table-level COMMENT '...' clause after the closing parenthesis.
static RE_TABLE_COMMENT_SINGLE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r#"(?i)\)\s+COMMENT\s+'[^']*'"#).expect("Invalid regex"));
// Same as above, but for double-quoted COMMENT "..." strings.
static RE_TABLE_COMMENT_DOUBLE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r#"(?i)\)\s+COMMENT\s+"[^"]*""#).expect("Invalid regex"));
// Matches only the start of a TBLPROPERTIES clause; the balanced-paren body is
// removed by manual depth counting (a regex cannot match nested parentheses).
static RE_TBLPROPERTIES: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(?i)TBLPROPERTIES\s*\(").expect("Invalid regex"));
// Matches CLUSTER BY AUTO, CLUSTER BY (col, ...), or a bare CLUSTER BY col list.
static RE_CLUSTER_BY: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r"(?i)\s+CLUSTER\s+BY\s+(?:AUTO|\([^)]*\)|[\w,\s]+)").expect("Invalid regex")
});
// Matches `: :variable` type references inside STRUCT field definitions.
static RE_VARIABLE_TYPE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r":\s*:([a-zA-Z_][a-zA-Z0-9_]*)").expect("Invalid regex"));
// Matches ARRAY<:variable> element-type references.
static RE_ARRAY_VARIABLE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"ARRAY\s*<\s*:([a-zA-Z_][a-zA-Z0-9_]*)\s*>").expect("Invalid regex"));
// Matches `column :variable TYPE` definitions; groups capture the column name
// and the (upper-case) type so the variable reference can be dropped.
static RE_FIELD_VARIABLE: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r"(\w+)\s+:\w+\s+([A-Z][A-Z0-9_]*(?:<[^>]*>)?)").expect("Invalid regex")
});
// Matches `column STRUCT<` / `ARRAY<` / `MAP<` so complex types can be located.
static RE_COMPLEX_TYPE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(\w+)\s+(STRUCT<|ARRAY<|MAP<)").expect("Invalid regex"));
52
53/// Custom Databricks SQL dialect implementation
54///
55/// Extends the official DatabricksDialect to support additional Databricks-specific syntax patterns:
56/// - Variable references (`:variable_name`) in identifiers
57/// - Enhanced backtick-quoted identifiers
58#[derive(Debug)]
59struct CustomDatabricksDialect {
60    official: OfficialDatabricksDialect,
61}
62
63impl CustomDatabricksDialect {
64    fn new() -> Self {
65        Self {
66            official: OfficialDatabricksDialect {},
67        }
68    }
69}
70
71impl Dialect for CustomDatabricksDialect {
72    fn is_identifier_start(&self, ch: char) -> bool {
73        // Allow ':' as identifier start for variable references like :variable_name
74        // Delegate to official dialect first, then add our custom support
75        self.official.is_identifier_start(ch) || ch == ':'
76    }
77
78    fn is_identifier_part(&self, ch: char) -> bool {
79        // Allow ':' as identifier part for variable references
80        self.official.is_identifier_part(ch) || ch == ':'
81    }
82
83    fn is_delimited_identifier_start(&self, ch: char) -> bool {
84        // Support backtick-quoted identifiers (Databricks style)
85        // Use official dialect's support, which should include backticks
86        self.official.is_delimited_identifier_start(ch)
87    }
88}
89
/// Tracks preprocessing transformations applied to SQL
#[derive(Debug)]
struct PreprocessingState {
    /// Maps placeholder table names to original IDENTIFIER() expressions
    identifier_replacements: HashMap<String, String>,
}

impl PreprocessingState {
    /// Start with no recorded replacements.
    fn new() -> Self {
        let identifier_replacements = HashMap::new();
        Self {
            identifier_replacements,
        }
    }
}
104
/// SQL Importer - parses CREATE TABLE statements
pub struct SQLImporter {
    /// SQL dialect to use for parsing
    pub dialect: String,
}

impl Default for SQLImporter {
    /// The default importer parses with the "generic" dialect.
    fn default() -> Self {
        SQLImporter {
            dialect: "generic".into(),
        }
    }
}
118
119impl SQLImporter {
120    /// Create a new SQL importer with the specified dialect
121    ///
122    /// # Arguments
123    ///
124    /// * `dialect` - SQL dialect name (see supported dialects below)
125    ///
126    /// # Supported Dialects
127    ///
128    /// - **ansi**: ANSI SQL dialect
129    /// - **bigquery**: Google BigQuery dialect (natively supports STRUCT/ARRAY types)
130    /// - **databricks**: Databricks SQL dialect with support for:
131    ///   - `IDENTIFIER()` function calls in table/view names
132    ///   - Variable references (`:variable_name`) in type definitions, column definitions, and metadata
133    ///   - `STRUCT` and `ARRAY` complex types
134    ///   - `CREATE VIEW` and `CREATE MATERIALIZED VIEW` statements
135    /// - **hive**: Apache Hive dialect (natively supports STRUCT/ARRAY types)
136    /// - **mssql** / **sqlserver**: Microsoft SQL Server dialect
137    /// - **mysql**: MySQL dialect
138    /// - **postgres** / **postgresql**: PostgreSQL dialect
139    /// - **sqlite**: SQLite dialect
140    /// - **generic**: Generic SQL dialect (default)
141    ///
142    /// # Example
143    ///
144    /// ```rust
145    /// use data_modelling_sdk::import::sql::SQLImporter;
146    ///
147    /// // Standard SQL dialect
148    /// let importer = SQLImporter::new("postgres");
149    ///
150    /// // Databricks SQL dialect
151    /// let databricks_importer = SQLImporter::new("databricks");
152    /// ```
153    pub fn new(dialect: &str) -> Self {
154        Self {
155            dialect: dialect.to_string(),
156        }
157    }
158
159    /// Preprocess Databricks SQL to handle IDENTIFIER() expressions
160    ///
161    /// Replaces IDENTIFIER() function calls with placeholder table names
162    /// and tracks the original expressions for later extraction.
163    fn preprocess_identifier_expressions(sql: &str, state: &mut PreprocessingState) -> String {
164        let mut counter = 0;
165
166        RE_IDENTIFIER
167            .replace_all(sql, |caps: &regex::Captures| {
168                let expr = caps.get(1).map(|m| m.as_str()).unwrap_or("");
169                counter += 1;
170                let placeholder = format!("__databricks_table_{}__", counter);
171
172                // Store the mapping for later extraction
173                state
174                    .identifier_replacements
175                    .insert(placeholder.clone(), expr.to_string());
176
177                placeholder
178            })
179            .to_string()
180    }
181
182    /// Extract table name from IDENTIFIER() expression
183    ///
184    /// If the expression contains string literals, extracts and constructs the table name.
185    /// Returns None if expression contains only variables.
186    fn extract_identifier_table_name(expr: &str) -> Option<String> {
187        let mut parts = Vec::new();
188
189        // Extract all string literals
190        for cap in RE_LITERAL.captures_iter(expr) {
191            if let Some(m) = cap.get(1) {
192                parts.push(m.as_str().to_string());
193            } else if let Some(m) = cap.get(2) {
194                parts.push(m.as_str().to_string());
195            }
196        }
197
198        if parts.is_empty() {
199            // No literals found - expression contains only variables
200            return None;
201        }
202
203        // Join literals and remove leading/trailing dots
204        let result = parts.join("");
205        Some(result.trim_matches('.').to_string())
206    }
207
208    /// Preprocess CREATE MATERIALIZED VIEW to CREATE VIEW
209    ///
210    /// sqlparser may not support MATERIALIZED VIEW directly, so we convert it to CREATE VIEW
211    /// This allows parsing to succeed while preserving the intent.
212    fn preprocess_materialized_views(sql: &str) -> String {
213        RE_MATERIALIZED_VIEW
214            .replace_all(sql, "CREATE VIEW")
215            .to_string()
216    }
217
218    /// Preprocess table-level COMMENT clause removal
219    ///
220    /// sqlparser does not support table-level COMMENT clauses, so we remove them before parsing.
221    /// Handles both single-quoted and double-quoted COMMENT strings.
222    /// Table-level COMMENT appears after the closing parenthesis, not inside column definitions.
223    fn preprocess_table_comment(sql: &str) -> String {
224        // Replace with just the closing parenthesis
225        let result = RE_TABLE_COMMENT_SINGLE.replace_all(sql, ")");
226        RE_TABLE_COMMENT_DOUBLE
227            .replace_all(&result, ")")
228            .to_string()
229    }
230
    /// Preprocess TBLPROPERTIES clause removal
    ///
    /// sqlparser does not support TBLPROPERTIES, so the clause is removed before
    /// parsing. The property list may contain nested parentheses, which a regex
    /// alone cannot match, so the regex only anchors the opening
    /// `TBLPROPERTIES (` and the matching close paren is found by depth counting.
    fn preprocess_tblproperties(sql: &str) -> String {
        let mut result = sql.to_string();

        // Find each TBLPROPERTIES occurrence and remove it together with its
        // balanced parenthesis body.
        let mut search_start = 0;
        while let Some(m) = RE_TBLPROPERTIES.find_at(&result, search_start) {
            let start = m.start();
            let mut pos = m.end();
            // The regex already consumed the opening '(' of the clause.
            let mut paren_count = 1;

            // Scan forward for the matching ')'. `pos` always stays on a char
            // boundary because it advances by ch.len_utf8().
            let bytes = result.as_bytes();
            while pos < bytes.len() && paren_count > 0 {
                if let Some(ch) = result[pos..].chars().next() {
                    if ch == '(' {
                        paren_count += 1;
                    } else if ch == ')' {
                        paren_count -= 1;
                    }
                    pos += ch.len_utf8();
                } else {
                    break;
                }
            }

            if paren_count == 0 {
                // Remove the whole clause (including the closing parenthesis),
                // then resume the search at the same offset since the remaining
                // text has shifted left into it.
                result.replace_range(start..pos, "");
                search_start = start;
            } else {
                // Unbalanced parentheses: leave the text intact and skip past it
                // so the loop cannot spin on the same match.
                search_start = pos;
            }
        }

        result
    }
275
276    /// Preprocess CLUSTER BY clause removal
277    ///
278    /// sqlparser does not support CLUSTER BY, so we remove it before parsing.
279    fn preprocess_cluster_by(sql: &str) -> String {
280        RE_CLUSTER_BY.replace_all(sql, "").to_string()
281    }
282
283    /// Normalize SQL while preserving quoted strings
284    ///
285    /// Converts multiline SQL to single line, but preserves quoted strings
286    /// (both single and double quotes) to avoid breaking COMMENT clauses and other string literals.
287    /// Handles escape sequences: `\'` (escaped quote) and `\\` (escaped backslash).
288    fn normalize_sql_preserving_quotes(sql: &str) -> String {
289        let mut result = String::with_capacity(sql.len());
290        let mut chars = sql.chars().peekable();
291        let mut in_single_quote = false;
292        let mut in_double_quote = false;
293        let mut last_char_was_space = false;
294
295        while let Some(ch) = chars.next() {
296            match ch {
297                '\\' if in_single_quote || in_double_quote => {
298                    // Handle escape sequences inside quoted strings
299                    // \' or \" or \\ - preserve the escape sequence
300                    if let Some(&next_ch) = chars.peek() {
301                        result.push(ch);
302                        result.push(next_ch);
303                        chars.next(); // Consume the escaped character
304                        last_char_was_space = false;
305                    } else {
306                        // Backslash at end of string - just add it
307                        result.push(ch);
308                        last_char_was_space = false;
309                    }
310                }
311                '\'' if !in_double_quote => {
312                    // Check if this is an escaped quote (doubled quotes: '')
313                    // In SQL standard, '' inside a string means a single quote
314                    if in_single_quote && chars.peek() == Some(&'\'') {
315                        // Doubled quote - this is an escaped quote, not the end of string
316                        result.push(ch);
317                        result.push(ch);
318                        chars.next(); // Consume the second quote
319                        last_char_was_space = false;
320                    } else {
321                        // Regular quote - toggle quote state
322                        in_single_quote = !in_single_quote;
323                        result.push(ch);
324                        last_char_was_space = false;
325                    }
326                }
327                '"' if !in_single_quote => {
328                    // Check if this is an escaped quote (doubled quotes: "")
329                    if in_double_quote && chars.peek() == Some(&'"') {
330                        // Doubled quote - this is an escaped quote, not the end of string
331                        result.push(ch);
332                        result.push(ch);
333                        chars.next(); // Consume the second quote
334                        last_char_was_space = false;
335                    } else {
336                        // Regular quote - toggle quote state
337                        in_double_quote = !in_double_quote;
338                        result.push(ch);
339                        last_char_was_space = false;
340                    }
341                }
342                '\n' | '\r' => {
343                    if in_single_quote || in_double_quote {
344                        // Replace newlines inside quoted strings with space
345                        // sqlparser doesn't support multiline string literals
346                        if !last_char_was_space {
347                            result.push(' ');
348                            last_char_was_space = true;
349                        }
350                    } else {
351                        // Replace newlines outside quotes with space
352                        if !last_char_was_space {
353                            result.push(' ');
354                            last_char_was_space = true;
355                        }
356                    }
357                }
358                ' ' | '\t' => {
359                    if in_single_quote || in_double_quote {
360                        // Preserve spaces inside quoted strings
361                        result.push(ch);
362                        last_char_was_space = false;
363                    } else {
364                        // Collapse multiple spaces to single space
365                        if !last_char_was_space {
366                            result.push(' ');
367                            last_char_was_space = true;
368                        }
369                    }
370                }
371                '-' if !in_single_quote && !in_double_quote => {
372                    // Check for SQL comment (--)
373                    if let Some(&'-') = chars.peek() {
374                        // Skip rest of line comment
375                        for c in chars.by_ref() {
376                            if c == '\n' || c == '\r' {
377                                break;
378                            }
379                        }
380                        if !last_char_was_space {
381                            result.push(' ');
382                            last_char_was_space = true;
383                        }
384                    } else {
385                        result.push(ch);
386                        last_char_was_space = false;
387                    }
388                }
389                _ => {
390                    result.push(ch);
391                    last_char_was_space = false;
392                }
393            }
394        }
395
396        result.trim().to_string()
397    }
398
399    /// Convert backslash-escaped quotes to SQL standard doubled quotes
400    ///
401    /// sqlparser doesn't support `\'` escape sequences, so we convert them to `''`
402    /// which is the SQL standard way to escape quotes in string literals.
403    fn convert_backslash_escaped_quotes(sql: &str) -> String {
404        let mut result = String::with_capacity(sql.len());
405        let mut chars = sql.chars().peekable();
406        let mut in_single_quote = false;
407        let mut in_double_quote = false;
408
409        while let Some(ch) = chars.next() {
410            match ch {
411                '\\' if (in_single_quote || in_double_quote) => {
412                    // Handle escape sequences inside quoted strings
413                    if let Some(&next_ch) = chars.peek() {
414                        match next_ch {
415                            '\'' if in_single_quote => {
416                                // Convert \' to '' (SQL standard escaped quote)
417                                result.push_str("''");
418                                chars.next(); // Consume the escaped quote
419                            }
420                            '"' if in_double_quote => {
421                                // Convert \" to "" (SQL standard escaped quote)
422                                result.push_str("\"\"");
423                                chars.next(); // Consume the escaped quote
424                            }
425                            '\\' => {
426                                // Convert \\ to \\ (keep as is, but we need to handle it)
427                                result.push('\\');
428                                result.push('\\');
429                                chars.next(); // Consume the escaped backslash
430                            }
431                            _ => {
432                                // Other escape sequences - preserve as is
433                                result.push(ch);
434                                result.push(next_ch);
435                                chars.next();
436                            }
437                        }
438                    } else {
439                        // Backslash at end - just add it
440                        result.push(ch);
441                    }
442                }
443                '\'' if !in_double_quote => {
444                    in_single_quote = !in_single_quote;
445                    result.push(ch);
446                }
447                '"' if !in_single_quote => {
448                    in_double_quote = !in_double_quote;
449                    result.push(ch);
450                }
451                _ => {
452                    result.push(ch);
453                }
454            }
455        }
456
457        result
458    }
459
460    /// Replace variable references in STRUCT field types with STRING
461    ///
462    /// Handles patterns like STRUCT<field: :variable_type> -> STRUCT<field: STRING>
463    fn replace_variables_in_struct_types(sql: &str) -> String {
464        RE_VARIABLE_TYPE
465            .replace_all(sql, |_caps: &regex::Captures| ": STRING".to_string())
466            .to_string()
467    }
468
469    /// Replace variable references in ARRAY element types with STRING
470    ///
471    /// Handles patterns like ARRAY<:element_type> -> ARRAY<STRING>
472    fn replace_variables_in_array_types(sql: &str) -> String {
473        RE_ARRAY_VARIABLE
474            .replace_all(sql, |_caps: &regex::Captures| "ARRAY<STRING>".to_string())
475            .to_string()
476    }
477
478    /// Replace variable references in column definitions
479    ///
480    /// Handles patterns like `column_name :variable STRING` by removing the variable reference.
481    /// Example: `id :id_var STRING` -> `id STRING`
482    fn replace_variables_in_column_definitions(sql: &str) -> String {
483        RE_FIELD_VARIABLE
484            .replace_all(sql, |caps: &regex::Captures| {
485                let col_name = caps.get(1).map(|m| m.as_str()).unwrap_or("");
486                let type_name = caps.get(2).map(|m| m.as_str()).unwrap_or("");
487                format!("{} {}", col_name, type_name)
488            })
489            .to_string()
490    }
491
492    /// Replace nested variable references recursively
493    ///
494    /// Handles patterns like ARRAY<STRUCT<field: :type>> by applying both
495    /// STRUCT and ARRAY replacements recursively.
496    fn replace_nested_variables(sql: &str) -> String {
497        let mut result = sql.to_string();
498        let mut changed = true;
499        let mut iterations = 0;
500        const MAX_ITERATIONS: usize = 10; // Prevent infinite loops
501
502        // Keep applying replacements until no more changes occur
503        while changed && iterations < MAX_ITERATIONS {
504            let before = result.clone();
505
506            // First replace variables in STRUCT types
507            result = Self::replace_variables_in_struct_types(&result);
508
509            // Then replace variables in ARRAY types
510            result = Self::replace_variables_in_array_types(&result);
511
512            // Check if anything changed
513            changed = before != result;
514            iterations += 1;
515        }
516
517        result
518    }
519
520    /// Extract STRUCT, ARRAY, and MAP column definitions and replace with placeholders
521    ///
522    /// sqlparser doesn't support these complex types, so we need to extract them manually
523    /// and replace with a simple type that can be parsed, then restore the original
524    /// type string after parsing.
525    ///
526    /// Assumes SQL is already normalized (single line, single spaces).
527    fn extract_complex_type_columns(sql: &str) -> (String, Vec<(String, String)>) {
528        let mut column_types = Vec::new();
529        let mut result = sql.to_string();
530
531        // Find all matches and extract the full type
532        let mut matches_to_replace: Vec<(usize, usize, String, String)> = Vec::new();
533
534        for cap in RE_COMPLEX_TYPE.captures_iter(sql) {
535            let col_name = cap.get(1).map(|m| m.as_str()).unwrap_or("");
536            let type_start = cap.get(0).map(|m| m.start()).unwrap_or(0);
537            let struct_or_array = cap.get(2).map(|m| m.as_str()).unwrap_or("");
538
539            // Find the matching closing bracket
540            // Start counting from the '<' in STRUCT<, ARRAY<, or MAP<
541            let bracket_start = type_start + col_name.len() + 1 + struct_or_array.len() - 1; // After "column_name STRUCT<", "column_name ARRAY<", or "column_name MAP<"
542            let mut bracket_count = 0;
543            let mut type_end = bracket_start;
544
545            for (idx, ch) in sql[bracket_start..].char_indices() {
546                let pos = bracket_start + idx;
547                if ch == '<' {
548                    bracket_count += 1;
549                } else if ch == '>' {
550                    bracket_count -= 1;
551                    if bracket_count == 0 {
552                        type_end = pos + 1;
553                        break;
554                    }
555                }
556            }
557
558            if bracket_count == 0 && type_end > type_start {
559                // Extract the full type (STRUCT<...>, ARRAY<...>, or MAP<...>)
560                // Start from after the column name and space
561                let type_start_pos = type_start + col_name.len() + 1;
562                let full_type = sql[type_start_pos..type_end].trim().to_string();
563                matches_to_replace.push((
564                    type_start_pos,
565                    type_end,
566                    col_name.to_string(),
567                    full_type,
568                ));
569            }
570        }
571
572        // Replace matches in reverse order
573        for (start, end, col_name, full_type) in matches_to_replace.iter().rev() {
574            column_types.push((col_name.clone(), full_type.clone()));
575            result.replace_range(*start..*end, "STRING");
576        }
577
578        (result, column_types)
579    }
580
    /// Parse SQL and extract table definitions
    ///
    /// # Arguments
    ///
    /// * `sql` - SQL string containing CREATE TABLE, CREATE VIEW, or CREATE MATERIALIZED VIEW statements
    ///
    /// # Returns
    ///
    /// An `ImportResult` containing extracted tables/views and any parse errors.
    /// A top-level parse failure is reported inside the result's `errors` list
    /// rather than as an `Err`; per-statement failures are likewise collected
    /// without aborting the rest of the import.
    ///
    /// # Example - Standard SQL
    ///
    /// ```rust
    /// use data_modelling_sdk::import::sql::SQLImporter;
    ///
    /// let importer = SQLImporter::new("postgres");
    /// let sql = "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(100));";
    /// let result = importer.parse(sql).unwrap();
    /// assert_eq!(result.tables.len(), 1);
    /// ```
    ///
    /// # Example - Databricks SQL with IDENTIFIER()
    ///
    /// ```rust
    /// use data_modelling_sdk::import::sql::SQLImporter;
    ///
    /// let importer = SQLImporter::new("databricks");
    /// let sql = "CREATE TABLE IDENTIFIER(:catalog || '.schema.table') (id STRING, name STRING);";
    /// let result = importer.parse(sql).unwrap();
    /// assert_eq!(result.tables.len(), 1);
    /// assert_eq!(result.tables[0].name.as_deref(), Some("schema.table"));
    /// ```
    ///
    /// # Example - Databricks SQL with Views
    ///
    /// ```rust
    /// use data_modelling_sdk::import::sql::SQLImporter;
    ///
    /// let importer = SQLImporter::new("databricks");
    /// let sql = "CREATE VIEW example_view AS SELECT * FROM table1;";
    /// let result = importer.parse(sql).unwrap();
    /// assert_eq!(result.tables.len(), 1);
    /// ```
    pub fn parse(&self, sql: &str) -> Result<ImportResult> {
        // Dialect-specific preprocessing:
        // - Databricks: rewrite variables / unsupported clauses, then let the
        //   DatabricksDialect try STRUCT/ARRAY itself; full type strings are
        //   restored after parsing.
        // - BigQuery/Hive: parse STRUCT/ARRAY natively, no extraction needed.
        // - Other dialects: extract STRUCT/ARRAY up front (they may not support them).
        let (preprocessed_sql, preprocessing_state, complex_types) = if self.dialect.to_lowercase()
            == "databricks"
        {
            let mut state = PreprocessingState::new();
            let mut preprocessed = sql.to_string();

            // Step 1: Replace IDENTIFIER() expressions (needed for variable table names)
            preprocessed = Self::preprocess_identifier_expressions(&preprocessed, &mut state);
            // Step 2: Replace variable references in column definitions (e.g. "id :var STRING" -> "id STRING")
            preprocessed = Self::replace_variables_in_column_definitions(&preprocessed);
            // Step 3: Replace variable references in type definitions (e.g. STRUCT<field: :type> -> STRUCT<field: STRING>)
            preprocessed = Self::replace_nested_variables(&preprocessed);
            // Step 4: Remove unsupported clauses that break parsing
            preprocessed = Self::preprocess_materialized_views(&preprocessed);
            preprocessed = Self::preprocess_table_comment(&preprocessed);
            preprocessed = Self::preprocess_tblproperties(&preprocessed);
            preprocessed = Self::preprocess_cluster_by(&preprocessed);
            // Step 5: Normalize SQL (handle multiline) - needed for regex matching
            let normalized = Self::normalize_sql_preserving_quotes(&preprocessed);
            // Step 6: Convert backslash-escaped quotes (sqlparser doesn't support \' escape sequences)
            let normalized = Self::convert_backslash_escaped_quotes(&normalized);

            // Step 7: Probe-parse with the Databricks dialect, without extraction.
            // NOTE(review): on the success path the same SQL is parsed again
            // below - the duplicate parse is a deliberate simplicity/perf
            // trade-off for typical input sizes.
            let dialect = self.dialect_impl();
            let parse_result = Parser::parse_sql(dialect.as_ref(), &normalized);

            // If parsing fails, fall back to extracting STRUCT/ARRAY/MAP types;
            // otherwise keep the SQL intact and only collect the type strings so
            // they can be restored onto the parsed columns later.
            let (final_sql, complex_cols) = if parse_result.is_err() {
                // Parsing failed - extract complex columns and replace with STRING
                let (simplified, cols) = Self::extract_complex_type_columns(&normalized);
                (simplified, cols)
            } else {
                // Parsing succeeded - the extraction result's SQL is discarded;
                // only the (column, type) pairs are kept for restoration in
                // parse_create_table_with_preprocessing.
                let (_, cols) = Self::extract_complex_type_columns(&normalized);
                (normalized, cols)
            };

            (final_sql, state, complex_cols)
        } else if matches!(self.dialect.to_lowercase().as_str(), "bigquery" | "hive") {
            // BigQuery/Hive: can parse STRUCT/ARRAY directly, just normalize
            let normalized = Self::normalize_sql_preserving_quotes(sql);
            let normalized = Self::convert_backslash_escaped_quotes(&normalized);
            (normalized, PreprocessingState::new(), Vec::new())
        } else {
            // Other dialects: extract STRUCT/ARRAY if present (they may not support them)
            let normalized = Self::normalize_sql_preserving_quotes(sql);
            let normalized = Self::convert_backslash_escaped_quotes(&normalized);
            let (simplified_sql, complex_cols) = Self::extract_complex_type_columns(&normalized);
            (simplified_sql, PreprocessingState::new(), complex_cols)
        };

        // The actual parse; a failure here is surfaced as an ImportError in the
        // result rather than an Err, so callers always get an ImportResult.
        let dialect = self.dialect_impl();
        let statements = match Parser::parse_sql(dialect.as_ref(), &preprocessed_sql) {
            Ok(stmts) => stmts,
            Err(e) => {
                return Ok(ImportResult {
                    tables: Vec::new(),
                    tables_requiring_name: Vec::new(),
                    errors: vec![ImportError::ParseError(e.to_string())],
                    ai_suggestions: None,
                });
            }
        };

        let mut tables = Vec::new();
        let mut errors = Vec::new();
        let mut tables_requiring_name = Vec::new();

        // Walk every statement; only CREATE TABLE / CREATE VIEW contribute
        // tables, and each statement's failure is recorded independently.
        for (idx, stmt) in statements.into_iter().enumerate() {
            match stmt {
                Statement::CreateTable(create) => {
                    match self.parse_create_table_with_preprocessing(
                        idx,
                        &create.name,
                        &create.columns,
                        &create.constraints,
                        &preprocessing_state,
                        &complex_types,
                    ) {
                        Ok((table, requires_name)) => {
                            // A table "requires a name" when its IDENTIFIER()
                            // expression contained only variables.
                            if requires_name {
                                tables_requiring_name.push(super::TableRequiringName {
                                    table_index: idx,
                                    suggested_name: None,
                                });
                            }
                            tables.push(table);
                        }
                        Err(e) => errors.push(ImportError::ParseError(e)),
                    }
                }
                Statement::CreateView(create_view) => {
                    match self.parse_create_view(idx, &create_view.name, &preprocessing_state) {
                        Ok((table, requires_name)) => {
                            if requires_name {
                                tables_requiring_name.push(super::TableRequiringName {
                                    table_index: idx,
                                    suggested_name: None,
                                });
                            }
                            tables.push(table);
                        }
                        Err(e) => errors.push(ImportError::ParseError(e)),
                    }
                }
                _ => {
                    // Other statements (INSERT, UPDATE, DELETE, etc.) are ignored.
                }
            }
        }

        Ok(ImportResult {
            tables,
            tables_requiring_name,
            errors,
            ai_suggestions: None,
        })
    }
749
750    /// Parse SQL with Liquibase format support
751    ///
752    /// Strips Liquibase directive comments (--liquibase formatted sql, --changeset, etc.)
753    /// before parsing the SQL.
754    ///
755    /// # Arguments
756    ///
757    /// * `sql` - SQL string with optional Liquibase comments
758    ///
759    /// # Returns
760    ///
761    /// An `ImportResult` containing extracted tables.
762    ///
763    /// # Example
764    ///
765    /// ```rust
766    /// use data_modelling_sdk::import::sql::SQLImporter;
767    ///
768    /// let importer = SQLImporter::new("postgres");
769    /// let sql = r#"
770    /// --liquibase formatted sql
771    /// --changeset user:1
772    /// CREATE TABLE users (id INT);
773    /// "#;
774    /// let result = importer.parse_liquibase(sql).unwrap();
775    /// ```
776    pub fn parse_liquibase(&self, sql: &str) -> Result<ImportResult> {
777        // Liquibase "formatted SQL" is still SQL, but often includes directive comments like:
778        // --liquibase formatted sql
779        // --changeset user:id
780        // We strip those comment lines, then parse the remaining SQL.
781        let cleaned = sql
782            .lines()
783            .filter(|l| {
784                let t = l.trim_start();
785                if !t.starts_with("--") {
786                    return true;
787                }
788                // Keep regular SQL comments? For now, drop all -- lines to avoid parser issues.
789                false
790            })
791            .collect::<Vec<_>>()
792            .join("\n");
793
794        self.parse(&cleaned)
795    }
796
797    fn dialect_impl(&self) -> Box<dyn Dialect + Send + Sync> {
798        match self.dialect.to_lowercase().as_str() {
799            "ansi" => Box::new(AnsiDialect {}),
800            "bigquery" => Box::new(BigQueryDialect {}),
801            "databricks" => Box::new(CustomDatabricksDialect::new()),
802            "hive" => Box::new(HiveDialect {}),
803            "mssql" | "sqlserver" => Box::new(MsSqlDialect {}),
804            "mysql" => Box::new(MySqlDialect {}),
805            "postgres" | "postgresql" => Box::new(PostgreSqlDialect {}),
806            "sqlite" => Box::new(SQLiteDialect {}),
807            _ => Box::new(GenericDialect {}),
808        }
809    }
810
811    /// Strip quote characters from an identifier
812    ///
813    /// Handles various SQL quoting styles:
814    /// - Double quotes: `"identifier"` -> `identifier`
815    /// - Backticks: `` `identifier` `` -> `identifier`
816    /// - Brackets: `[identifier]` -> `identifier`
817    ///
818    /// Also handles escaped quotes within identifiers:
819    /// - `""` -> `"` (PostgreSQL style)
820    /// - ``` `` ``` -> `` ` `` (MySQL style)
821    /// - `]]` -> `]` (SQL Server style)
822    fn unquote_identifier(identifier: &str) -> String {
823        let trimmed = identifier.trim();
824
825        // Check for double quotes
826        if trimmed.starts_with('"') && trimmed.ends_with('"') && trimmed.len() >= 2 {
827            let inner = &trimmed[1..trimmed.len() - 1];
828            return inner.replace("\"\"", "\"");
829        }
830
831        // Check for backticks (MySQL)
832        if trimmed.starts_with('`') && trimmed.ends_with('`') && trimmed.len() >= 2 {
833            let inner = &trimmed[1..trimmed.len() - 1];
834            return inner.replace("``", "`");
835        }
836
837        // Check for brackets (SQL Server)
838        if trimmed.starts_with('[') && trimmed.ends_with(']') && trimmed.len() >= 2 {
839            let inner = &trimmed[1..trimmed.len() - 1];
840            return inner.replace("]]", "]");
841        }
842
843        // No quoting detected, return as-is
844        trimmed.to_string()
845    }
846
847    fn object_name_to_string(name: &ObjectName) -> String {
848        // Use final identifier (supports schema-qualified names).
849        // In sqlparser 0.60, ObjectNamePart might be a different structure
850        // Fall back to to_string() which should work
851        let raw_name = name
852            .0
853            .last()
854            .map(|ident| {
855                // Try to get the identifier value - structure may have changed in 0.60
856                // Use to_string() as fallback
857                ident.to_string()
858            })
859            .unwrap_or_else(|| name.to_string());
860
861        // Strip any quote characters from the identifier
862        Self::unquote_identifier(&raw_name)
863    }
864
865    fn parse_create_table_with_preprocessing(
866        &self,
867        table_index: usize,
868        name: &ObjectName,
869        columns: &[ColumnDef],
870        constraints: &[TableConstraint],
871        preprocessing_state: &PreprocessingState,
872        complex_types: &[(String, String)],
873    ) -> std::result::Result<(TableData, bool), String> {
874        let mut table_name = Self::object_name_to_string(name);
875        let mut requires_name = false;
876
877        // Check if this is a placeholder table name from IDENTIFIER() preprocessing
878        if table_name.starts_with("__databricks_table_")
879            && let Some(original_expr) =
880                preprocessing_state.identifier_replacements.get(&table_name)
881        {
882            // Try to extract table name from the original expression
883            if let Some(extracted_name) = Self::extract_identifier_table_name(original_expr) {
884                table_name = extracted_name;
885            } else {
886                // Expression contains only variables - mark as requiring name
887                requires_name = true;
888            }
889        }
890
891        // Validate table name (warnings are logged but don't fail import)
892        if let Err(e) = validate_table_name(&table_name) {
893            // Log warning but continue - imported SQL may have valid but unusual names
894            tracing::warn!("Table name validation warning: {}", e);
895        }
896
897        // Collect PK columns from table-level constraints.
898        let mut pk_cols = std::collections::HashSet::<String>::new();
899        for c in constraints {
900            if let TableConstraint::PrimaryKey(pk_constraint) = c {
901                for col in &pk_constraint.columns {
902                    // In sqlparser 0.60, IndexColumn structure may have changed
903                    // Try to get the column name - might be a field or method
904                    // Unquote the column name to match against column definitions
905                    pk_cols.insert(Self::unquote_identifier(&col.to_string()));
906                }
907            }
908        }
909
910        let mut out_cols = Vec::new();
911        for col in columns {
912            let mut nullable = true;
913            let mut is_pk = false;
914
915            for opt_def in &col.options {
916                match &opt_def.option {
917                    ColumnOption::NotNull => nullable = false,
918                    ColumnOption::Null => nullable = true,
919                    ColumnOption::Unique(_) => {
920                        // UNIQUE constraint (not primary key)
921                    }
922                    ColumnOption::PrimaryKey(_) => {
923                        // In sqlparser 0.60, PRIMARY KEY is a separate variant
924                        is_pk = true;
925                    }
926                    _ => {}
927                }
928            }
929
930            let col_name = Self::unquote_identifier(&col.name.value);
931
932            if pk_cols.contains(&col_name) {
933                is_pk = true;
934            }
935            let mut data_type = col.data_type.to_string();
936            let mut description = None;
937
938            // Extract COMMENT clause from column options
939            for opt_def in &col.options {
940                if let ColumnOption::Comment(comment) = &opt_def.option {
941                    description = Some(comment.clone());
942                }
943            }
944
945            // Restore complex types (STRUCT/ARRAY/MAP) if this column was extracted
946            // Keep the full type string - we'll simplify it when converting to Column model
947            if let Some((_, original_type)) =
948                complex_types.iter().find(|(name, _)| name == &col_name)
949            {
950                data_type = original_type.clone();
951            }
952
953            // Validate column name and data type (warnings are logged but don't fail import)
954            if let Err(e) = validate_column_name(&col_name) {
955                tracing::warn!("Column name validation warning for '{}': {}", col_name, e);
956            }
957            if let Err(e) = validate_data_type(&data_type) {
958                tracing::warn!("Data type validation warning for '{}': {}", data_type, e);
959            }
960
961            out_cols.push(ColumnData {
962                name: col_name,
963                data_type,
964                nullable,
965                primary_key: is_pk,
966                description,
967                ..Default::default()
968            });
969        }
970
971        Ok((
972            TableData {
973                table_index,
974                name: Some(table_name),
975                columns: out_cols,
976            },
977            requires_name,
978        ))
979    }
980
981    /// Parse CREATE VIEW statement
982    ///
983    /// Extracts view name and creates a TableData entry for the view.
984    /// Views are treated as table-like entities for data modeling purposes.
985    fn parse_create_view(
986        &self,
987        view_index: usize,
988        name: &ObjectName,
989        preprocessing_state: &PreprocessingState,
990    ) -> std::result::Result<(TableData, bool), String> {
991        let mut view_name = Self::object_name_to_string(name);
992        let mut requires_name = false;
993
994        // Check if this is a placeholder view name from IDENTIFIER() preprocessing
995        if view_name.starts_with("__databricks_table_")
996            && let Some(original_expr) = preprocessing_state.identifier_replacements.get(&view_name)
997        {
998            // Try to extract view name from the original expression
999            if let Some(extracted_name) = Self::extract_identifier_table_name(original_expr) {
1000                view_name = extracted_name;
1001            } else {
1002                // Expression contains only variables - mark as requiring name
1003                requires_name = true;
1004            }
1005        }
1006
1007        // Validate view name
1008        if let Err(e) = validate_table_name(&view_name) {
1009            tracing::warn!("View name validation warning: {}", e);
1010        }
1011
1012        // Views don't have explicit column definitions in CREATE VIEW statements
1013        // The columns are derived from the SELECT query, which we don't parse here
1014        // So we create a view with empty columns - the actual columns would need
1015        // to be extracted from the query if needed in the future
1016        Ok((
1017            TableData {
1018                table_index: view_index,
1019                name: Some(view_name),
1020                columns: Vec::new(), // Views don't have explicit column definitions
1021            },
1022            requires_name,
1023        ))
1024    }
1025}
1026
#[cfg(test)]
mod tests {
    //! Unit tests for the SQL importer: basic CREATE TABLE parsing,
    //! Liquibase comment stripping, and Databricks-specific preprocessing
    //! (IDENTIFIER() expressions, `:variable` substitution, and views).

    use super::*;

    // A default importer should use the "generic" dialect.
    #[test]
    fn test_sql_importer_default() {
        let importer = SQLImporter::default();
        assert_eq!(importer.dialect, "generic");
    }

    // Basic CREATE TABLE: table name, column count, inline PRIMARY KEY,
    // and NOT NULL must all be captured.
    #[test]
    fn test_sql_importer_parse_basic() {
        let importer = SQLImporter::new("postgres");
        let result = importer
            .parse("CREATE TABLE test (id INT PRIMARY KEY, name TEXT NOT NULL);")
            .unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        let t = &result.tables[0];
        assert_eq!(t.name.as_deref(), Some("test"));
        assert_eq!(t.columns.len(), 2);
        assert!(t.columns.iter().any(|c| c.name == "id" && c.primary_key));
        assert!(t.columns.iter().any(|c| c.name == "name" && !c.nullable));
    }

    // A table-level PRIMARY KEY constraint should mark the referenced
    // column as a primary key, same as an inline declaration.
    #[test]
    fn test_sql_importer_parse_table_pk_constraint() {
        let importer = SQLImporter::new("postgres");
        let result = importer
            .parse("CREATE TABLE t (id INT, name TEXT, CONSTRAINT pk PRIMARY KEY (id));")
            .unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        let t = &result.tables[0];
        assert!(t.columns.iter().any(|c| c.name == "id" && c.primary_key));
    }

    // Liquibase directive comments (`--liquibase`, `--changeset`) must be
    // stripped so the remaining SQL parses cleanly.
    #[test]
    fn test_sql_importer_parse_liquibase_formatted_sql() {
        let importer = SQLImporter::new("postgres");
        let result = importer
            .parse_liquibase(
                "--liquibase formatted sql\n--changeset user:1\nCREATE TABLE test (id INT);\n",
            )
            .unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
    }

    // IDENTIFIER('literal') should resolve to the literal table name.
    #[test]
    fn test_databricks_identifier_with_literal() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE IDENTIFIER('test_table') (id STRING);";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].name.as_deref(), Some("test_table"));
    }

    // IDENTIFIER(:variable) cannot be resolved: a placeholder name is used
    // and the table is flagged as requiring a user-supplied name.
    #[test]
    fn test_databricks_identifier_with_variable() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE IDENTIFIER(:table_name) (id STRING);";
        let result = importer.parse(sql).unwrap();
        // Should create placeholder table name and add to tables_requiring_name
        assert_eq!(result.tables.len(), 1);
        assert!(
            result.tables[0]
                .name
                .as_deref()
                .unwrap()
                .starts_with("__databricks_table_")
        );
        assert_eq!(result.tables_requiring_name.len(), 1);
    }

    // IDENTIFIER(:var || '.schema.table') should recover the literal part
    // of the concatenation as the table name.
    #[test]
    fn test_databricks_identifier_with_concatenation() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE IDENTIFIER(:catalog || '.schema.table') (id STRING);";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        // Should extract table name from concatenation
        assert_eq!(result.tables[0].name.as_deref(), Some("schema.table"));
    }

    // A `:variable` used as a field type inside a STRUCT should be replaced
    // with STRING during preprocessing.
    #[test]
    fn test_databricks_variable_in_struct() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE example (metadata STRUCT<key: STRING, value: :variable_type, timestamp: TIMESTAMP>);";
        let result = importer.parse(sql).unwrap();
        if !result.errors.is_empty() {
            eprintln!("Parse errors: {:?}", result.errors);
        }
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        // Variable should be replaced with STRING
        assert!(
            result.tables[0].columns[0]
                .data_type
                .contains("value: STRING")
        );
    }

    // A `:variable` as an ARRAY element type should also become STRING.
    #[test]
    fn test_databricks_variable_in_array() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE example (items ARRAY<:element_type>);";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        // Variable should be replaced with STRING
        assert_eq!(result.tables[0].columns[0].data_type, "ARRAY<STRING>");
    }

    // Variable substitution must work even inside deeply nested
    // ARRAY<STRUCT<...STRUCT<...>>> types.
    #[test]
    fn test_databricks_nested_variables() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE example (events ARRAY<STRUCT<id: STRING, name: STRING, details: STRUCT<name: STRING, status: :variable_type, timestamp: TIMESTAMP>>>);";
        let result = importer.parse(sql).unwrap();
        if !result.errors.is_empty() {
            eprintln!("Parse errors: {:?}", result.errors);
        }
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        // Nested variable should be replaced with STRING
        assert!(
            result.tables[0].columns[0]
                .data_type
                .contains("status: STRING")
        );
    }

    // A variable inside a table-level COMMENT string must not break parsing.
    #[test]
    fn test_databricks_comment_variable() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE example (id STRING) COMMENT ':comment_variable';";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
    }

    // A variable inside TBLPROPERTIES values must not break parsing.
    #[test]
    fn test_databricks_tblproperties_variable() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE example (id STRING) TBLPROPERTIES ('key1' = ':variable_value', 'key2' = 'static_value');";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
    }

    // Variables interleaved in column definitions should be preprocessed
    // away, leaving parseable "name TYPE" definitions.
    #[test]
    fn test_databricks_column_variable() {
        let importer = SQLImporter::new("databricks");
        // Test column definition with variable reference like "column_name :variable STRING"
        // This pattern may need preprocessing to remove the variable
        let sql = "CREATE TABLE example (id :id_var STRING, name :name_var STRING);";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].columns.len(), 2);
    }

    // CREATE VIEW is imported as a table-like entity (empty column list).
    #[test]
    fn test_databricks_create_view() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE VIEW example_view AS SELECT id, name FROM source_table;";
        let result = importer.parse(sql).unwrap();
        // Views should be imported as table-like entities
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].name.as_deref(), Some("example_view"));
    }

    // IDENTIFIER() name recovery also applies to CREATE VIEW statements.
    #[test]
    fn test_databricks_view_with_identifier() {
        let importer = SQLImporter::new("databricks");
        let sql =
            "CREATE VIEW IDENTIFIER(:catalog || '.schema.view_name') AS SELECT * FROM table1;";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        // Should extract view name from IDENTIFIER() expression
        assert_eq!(result.tables[0].name.as_deref(), Some("schema.view_name"));
    }

    // CREATE MATERIALIZED VIEW is rewritten to CREATE VIEW during
    // preprocessing so sqlparser can handle it.
    #[test]
    fn test_databricks_create_materialized_view() {
        let importer = SQLImporter::new("databricks");
        // MATERIALIZED VIEW is preprocessed to CREATE VIEW for sqlparser compatibility
        let sql = "CREATE MATERIALIZED VIEW mv_example AS SELECT id, name FROM source_table;";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].name.as_deref(), Some("mv_example"));
    }
}