// data_modelling_sdk/import/sql.rs

//! SQL Import functionality
//!
//! Provides parsing of CREATE TABLE statements from various SQL dialects.
//!
//! Uses `sqlparser` to parse CREATE TABLE statements into SDK import primitives.
//!
//! # Validation
//!
//! All imported table and column names are validated for:
//! - Valid identifier format
//! - Maximum length limits
//! - SQL reserved word detection

14use super::{ColumnData, ImportError, ImportResult, TableData};
15use crate::validation::input::{validate_column_name, validate_data_type, validate_table_name};
16use anyhow::Result;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use sqlparser::ast::{ColumnDef, ColumnOption, ObjectName, Statement, TableConstraint};
20use sqlparser::dialect::{
21    AnsiDialect, BigQueryDialect, DatabricksDialect as OfficialDatabricksDialect, Dialect,
22    GenericDialect, HiveDialect, MsSqlDialect, MySqlDialect, PostgreSqlDialect, SQLiteDialect,
23};
24use sqlparser::parser::Parser;
25use std::collections::HashMap;
26
// Static regex patterns compiled once for performance (compiling a Regex per call
// would dominate preprocessing cost).
// Matches IDENTIFIER( ... ) calls; group 1 captures the inner expression.
static RE_IDENTIFIER: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(?i)IDENTIFIER\s*\(\s*([^)]+)\s*\)").expect("Invalid regex"));
// Matches a single-quoted (group 1) or double-quoted (group 2) string literal.
static RE_LITERAL: Lazy<Regex> =
    Lazy::new(|| Regex::new(r#"(?:'([^']*)'|"([^"]*)")"#).expect("Invalid regex"));
// Matches the CREATE MATERIALIZED VIEW prefix so it can be rewritten to CREATE VIEW.
static RE_MATERIALIZED_VIEW: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(?i)CREATE\s+MATERIALIZED\s+VIEW").expect("Invalid regex"));
// Table-level COMMENT after the closing paren of the column list (single-quoted form).
static RE_TABLE_COMMENT_SINGLE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r#"(?i)\)\s+COMMENT\s+'[^']*'"#).expect("Invalid regex"));
// Table-level COMMENT after the closing paren of the column list (double-quoted form).
static RE_TABLE_COMMENT_DOUBLE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r#"(?i)\)\s+COMMENT\s+"[^"]*""#).expect("Invalid regex"));
// Matches only the opening of TBLPROPERTIES( — the balanced close is found manually.
static RE_TBLPROPERTIES: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(?i)TBLPROPERTIES\s*\(").expect("Invalid regex"));
// CLUSTER BY AUTO | CLUSTER BY (cols) | CLUSTER BY col, col ...
static RE_CLUSTER_BY: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r"(?i)\s+CLUSTER\s+BY\s+(?:AUTO|\([^)]*\)|[\w,\s]+)").expect("Invalid regex")
});
// USING clause (e.g., USING DELTA, USING PARQUET, USING CSV, etc.)
static RE_USING_FORMAT: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r"(?i)\s+USING\s+(?:DELTA|PARQUET|CSV|JSON|ORC|AVRO|TEXT|BINARYFILE|JDBC|ICEBERG)\b")
        .expect("Invalid regex")
});
// ": :variable" inside STRUCT field type positions; group 1 is the variable name.
static RE_VARIABLE_TYPE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r":\s*:([a-zA-Z_][a-zA-Z0-9_]*)").expect("Invalid regex"));
// ARRAY< :variable > element types; group 1 is the variable name.
static RE_ARRAY_VARIABLE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"ARRAY\s*<\s*:([a-zA-Z_][a-zA-Z0-9_]*)\s*>").expect("Invalid regex"));
// "col :variable TYPE" column definitions; group 1 = column, group 2 = type.
static RE_FIELD_VARIABLE: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r"(\w+)\s+:\w+\s+([A-Z][A-Z0-9_]*(?:<[^>]*>)?)").expect("Invalid regex")
});
// "col STRUCT<" / "col ARRAY<" / "col MAP<" — start of a complex-typed column.
static RE_COMPLEX_TYPE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(\w+)\s+(STRUCT<|ARRAY<|MAP<)").expect("Invalid regex"));
57
58/// Custom Databricks SQL dialect implementation
59///
60/// Extends the official DatabricksDialect to support additional Databricks-specific syntax patterns:
61/// - Variable references (`:variable_name`) in identifiers
62/// - Enhanced backtick-quoted identifiers
63#[derive(Debug)]
64struct CustomDatabricksDialect {
65    official: OfficialDatabricksDialect,
66}
67
68impl CustomDatabricksDialect {
69    fn new() -> Self {
70        Self {
71            official: OfficialDatabricksDialect {},
72        }
73    }
74}
75
76impl Dialect for CustomDatabricksDialect {
77    fn is_identifier_start(&self, ch: char) -> bool {
78        // Allow ':' as identifier start for variable references like :variable_name
79        // Delegate to official dialect first, then add our custom support
80        self.official.is_identifier_start(ch) || ch == ':'
81    }
82
83    fn is_identifier_part(&self, ch: char) -> bool {
84        // Allow ':' as identifier part for variable references
85        self.official.is_identifier_part(ch) || ch == ':'
86    }
87
88    fn is_delimited_identifier_start(&self, ch: char) -> bool {
89        // Support backtick-quoted identifiers (Databricks style)
90        // Use official dialect's support, which should include backticks
91        self.official.is_delimited_identifier_start(ch)
92    }
93}
94
/// Tracks preprocessing transformations applied to SQL
#[derive(Debug, Default)]
struct PreprocessingState {
    /// Maps placeholder table names (e.g. `__databricks_table_1__`) back to the
    /// original `IDENTIFIER(...)` expressions they replaced.
    identifier_replacements: HashMap<String, String>,
}

impl PreprocessingState {
    /// Create an empty state with no recorded replacements.
    ///
    /// Delegates to the derived `Default` impl so the two construction paths
    /// cannot drift apart.
    fn new() -> Self {
        Self::default()
    }
}
109
/// SQL Importer - parses CREATE TABLE statements
pub struct SQLImporter {
    /// SQL dialect to use for parsing
    pub dialect: String,
}

impl Default for SQLImporter {
    /// The default importer uses the "generic" dialect.
    fn default() -> Self {
        SQLImporter {
            dialect: String::from("generic"),
        }
    }
}
123
124impl SQLImporter {
125    /// Create a new SQL importer with the specified dialect
126    ///
127    /// # Arguments
128    ///
129    /// * `dialect` - SQL dialect name (see supported dialects below)
130    ///
131    /// # Supported Dialects
132    ///
133    /// - **ansi**: ANSI SQL dialect
134    /// - **bigquery**: Google BigQuery dialect (natively supports STRUCT/ARRAY types)
135    /// - **databricks**: Databricks SQL dialect with support for:
136    ///   - `IDENTIFIER()` function calls in table/view names
137    ///   - Variable references (`:variable_name`) in type definitions, column definitions, and metadata
138    ///   - `STRUCT` and `ARRAY` complex types
139    ///   - `CREATE VIEW` and `CREATE MATERIALIZED VIEW` statements
140    /// - **hive**: Apache Hive dialect (natively supports STRUCT/ARRAY types)
141    /// - **mssql** / **sqlserver**: Microsoft SQL Server dialect
142    /// - **mysql**: MySQL dialect
143    /// - **postgres** / **postgresql**: PostgreSQL dialect
144    /// - **sqlite**: SQLite dialect
145    /// - **generic**: Generic SQL dialect (default)
146    ///
147    /// # Example
148    ///
149    /// ```rust
150    /// use data_modelling_sdk::import::sql::SQLImporter;
151    ///
152    /// // Standard SQL dialect
153    /// let importer = SQLImporter::new("postgres");
154    ///
155    /// // Databricks SQL dialect
156    /// let databricks_importer = SQLImporter::new("databricks");
157    /// ```
158    pub fn new(dialect: &str) -> Self {
159        Self {
160            dialect: dialect.to_string(),
161        }
162    }
163
164    /// Preprocess Databricks SQL to handle IDENTIFIER() expressions
165    ///
166    /// Replaces IDENTIFIER() function calls with placeholder table names
167    /// and tracks the original expressions for later extraction.
168    fn preprocess_identifier_expressions(sql: &str, state: &mut PreprocessingState) -> String {
169        let mut counter = 0;
170
171        RE_IDENTIFIER
172            .replace_all(sql, |caps: &regex::Captures| {
173                let expr = caps.get(1).map(|m| m.as_str()).unwrap_or("");
174                counter += 1;
175                let placeholder = format!("__databricks_table_{}__", counter);
176
177                // Store the mapping for later extraction
178                state
179                    .identifier_replacements
180                    .insert(placeholder.clone(), expr.to_string());
181
182                placeholder
183            })
184            .to_string()
185    }
186
187    /// Extract table name from IDENTIFIER() expression
188    ///
189    /// If the expression contains string literals, extracts and constructs the table name.
190    /// Returns None if expression contains only variables.
191    fn extract_identifier_table_name(expr: &str) -> Option<String> {
192        let mut parts = Vec::new();
193
194        // Extract all string literals
195        for cap in RE_LITERAL.captures_iter(expr) {
196            if let Some(m) = cap.get(1) {
197                parts.push(m.as_str().to_string());
198            } else if let Some(m) = cap.get(2) {
199                parts.push(m.as_str().to_string());
200            }
201        }
202
203        if parts.is_empty() {
204            // No literals found - expression contains only variables
205            return None;
206        }
207
208        // Join literals and remove leading/trailing dots
209        let result = parts.join("");
210        Some(result.trim_matches('.').to_string())
211    }
212
213    /// Preprocess CREATE MATERIALIZED VIEW to CREATE VIEW
214    ///
215    /// sqlparser may not support MATERIALIZED VIEW directly, so we convert it to CREATE VIEW
216    /// This allows parsing to succeed while preserving the intent.
217    fn preprocess_materialized_views(sql: &str) -> String {
218        RE_MATERIALIZED_VIEW
219            .replace_all(sql, "CREATE VIEW")
220            .to_string()
221    }
222
223    /// Preprocess table-level COMMENT clause removal
224    ///
225    /// sqlparser does not support table-level COMMENT clauses, so we remove them before parsing.
226    /// Handles both single-quoted and double-quoted COMMENT strings.
227    /// Table-level COMMENT appears after the closing parenthesis, not inside column definitions.
228    fn preprocess_table_comment(sql: &str) -> String {
229        // Replace with just the closing parenthesis
230        let result = RE_TABLE_COMMENT_SINGLE.replace_all(sql, ")");
231        RE_TABLE_COMMENT_DOUBLE
232            .replace_all(&result, ")")
233            .to_string()
234    }
235
    /// Preprocess TBLPROPERTIES clause removal
    ///
    /// sqlparser does not support TBLPROPERTIES, so we remove it before parsing.
    /// This preserves the rest of the SQL structure while allowing parsing to succeed.
    ///
    /// The regex only matches the opening `TBLPROPERTIES (`; the matching close
    /// paren is located by hand so nested parentheses inside the property list
    /// are handled correctly.
    ///
    /// NOTE(review): the paren counter is not quote-aware — a '(' or ')' inside
    /// a quoted property value would throw off the balance and truncate the
    /// clause early or late. Confirm whether such values occur in practice.
    fn preprocess_tblproperties(sql: &str) -> String {
        // Remove TBLPROPERTIES clause (may span multiple lines)
        // Pattern matches: TBLPROPERTIES ( ... ) where ... can contain nested parentheses
        // We need to match balanced parentheses
        let mut result = sql.to_string();

        // Find all TBLPROPERTIES occurrences and remove them with balanced parentheses
        let mut search_start = 0;
        while let Some(m) = RE_TBLPROPERTIES.find_at(&result, search_start) {
            let start = m.start();
            let mut pos = m.end(); // byte offset just past the opening '('
            let mut paren_count = 1;

            // Find matching closing parenthesis (using byte positions)
            let bytes = result.as_bytes();
            while pos < bytes.len() && paren_count > 0 {
                if let Some(ch) = result[pos..].chars().next() {
                    if ch == '(' {
                        paren_count += 1;
                    } else if ch == ')' {
                        paren_count -= 1;
                    }
                    // Advance by the full UTF-8 width so we never split a char.
                    pos += ch.len_utf8();
                } else {
                    break;
                }
            }

            if paren_count == 0 {
                // Remove TBLPROPERTIES clause including the closing parenthesis
                result.replace_range(start..pos, "");
                // Resume scanning at the removal point — later text shifted left.
                search_start = start;
            } else {
                // Unbalanced parentheses, skip this match
                search_start = pos;
            }
        }

        result
    }
280
281    /// Preprocess CLUSTER BY clause removal
282    ///
283    /// sqlparser does not support CLUSTER BY, so we remove it before parsing.
284    fn preprocess_cluster_by(sql: &str) -> String {
285        RE_CLUSTER_BY.replace_all(sql, "").to_string()
286    }
287
288    /// Preprocess USING format clause removal
289    ///
290    /// Databricks supports USING DELTA, USING PARQUET, etc. which sqlparser doesn't support.
291    /// We remove these clauses before parsing as they don't affect the table structure.
292    fn preprocess_using_format(sql: &str) -> String {
293        RE_USING_FORMAT.replace_all(sql, "").to_string()
294    }
295
296    /// Normalize SQL while preserving quoted strings
297    ///
298    /// Converts multiline SQL to single line, but preserves quoted strings
299    /// (both single and double quotes) to avoid breaking COMMENT clauses and other string literals.
300    /// Handles escape sequences: `\'` (escaped quote) and `\\` (escaped backslash).
301    fn normalize_sql_preserving_quotes(sql: &str) -> String {
302        let mut result = String::with_capacity(sql.len());
303        let mut chars = sql.chars().peekable();
304        let mut in_single_quote = false;
305        let mut in_double_quote = false;
306        let mut last_char_was_space = false;
307
308        while let Some(ch) = chars.next() {
309            match ch {
310                '\\' if in_single_quote || in_double_quote => {
311                    // Handle escape sequences inside quoted strings
312                    // \' or \" or \\ - preserve the escape sequence
313                    if let Some(&next_ch) = chars.peek() {
314                        result.push(ch);
315                        result.push(next_ch);
316                        chars.next(); // Consume the escaped character
317                        last_char_was_space = false;
318                    } else {
319                        // Backslash at end of string - just add it
320                        result.push(ch);
321                        last_char_was_space = false;
322                    }
323                }
324                '\'' if !in_double_quote => {
325                    // Check if this is an escaped quote (doubled quotes: '')
326                    // In SQL standard, '' inside a string means a single quote
327                    if in_single_quote && chars.peek() == Some(&'\'') {
328                        // Doubled quote - this is an escaped quote, not the end of string
329                        result.push(ch);
330                        result.push(ch);
331                        chars.next(); // Consume the second quote
332                        last_char_was_space = false;
333                    } else {
334                        // Regular quote - toggle quote state
335                        in_single_quote = !in_single_quote;
336                        result.push(ch);
337                        last_char_was_space = false;
338                    }
339                }
340                '"' if !in_single_quote => {
341                    // Check if this is an escaped quote (doubled quotes: "")
342                    if in_double_quote && chars.peek() == Some(&'"') {
343                        // Doubled quote - this is an escaped quote, not the end of string
344                        result.push(ch);
345                        result.push(ch);
346                        chars.next(); // Consume the second quote
347                        last_char_was_space = false;
348                    } else {
349                        // Regular quote - toggle quote state
350                        in_double_quote = !in_double_quote;
351                        result.push(ch);
352                        last_char_was_space = false;
353                    }
354                }
355                '\n' | '\r' => {
356                    if in_single_quote || in_double_quote {
357                        // Replace newlines inside quoted strings with space
358                        // sqlparser doesn't support multiline string literals
359                        if !last_char_was_space {
360                            result.push(' ');
361                            last_char_was_space = true;
362                        }
363                    } else {
364                        // Replace newlines outside quotes with space
365                        if !last_char_was_space {
366                            result.push(' ');
367                            last_char_was_space = true;
368                        }
369                    }
370                }
371                ' ' | '\t' => {
372                    if in_single_quote || in_double_quote {
373                        // Preserve spaces inside quoted strings
374                        result.push(ch);
375                        last_char_was_space = false;
376                    } else {
377                        // Collapse multiple spaces to single space
378                        if !last_char_was_space {
379                            result.push(' ');
380                            last_char_was_space = true;
381                        }
382                    }
383                }
384                '-' if !in_single_quote && !in_double_quote => {
385                    // Check for SQL comment (--)
386                    if let Some(&'-') = chars.peek() {
387                        // Skip rest of line comment
388                        for c in chars.by_ref() {
389                            if c == '\n' || c == '\r' {
390                                break;
391                            }
392                        }
393                        if !last_char_was_space {
394                            result.push(' ');
395                            last_char_was_space = true;
396                        }
397                    } else {
398                        result.push(ch);
399                        last_char_was_space = false;
400                    }
401                }
402                _ => {
403                    result.push(ch);
404                    last_char_was_space = false;
405                }
406            }
407        }
408
409        result.trim().to_string()
410    }
411
412    /// Convert backslash-escaped quotes to SQL standard doubled quotes
413    ///
414    /// sqlparser doesn't support `\'` escape sequences, so we convert them to `''`
415    /// which is the SQL standard way to escape quotes in string literals.
416    fn convert_backslash_escaped_quotes(sql: &str) -> String {
417        let mut result = String::with_capacity(sql.len());
418        let mut chars = sql.chars().peekable();
419        let mut in_single_quote = false;
420        let mut in_double_quote = false;
421
422        while let Some(ch) = chars.next() {
423            match ch {
424                '\\' if (in_single_quote || in_double_quote) => {
425                    // Handle escape sequences inside quoted strings
426                    if let Some(&next_ch) = chars.peek() {
427                        match next_ch {
428                            '\'' if in_single_quote => {
429                                // Convert \' to '' (SQL standard escaped quote)
430                                result.push_str("''");
431                                chars.next(); // Consume the escaped quote
432                            }
433                            '"' if in_double_quote => {
434                                // Convert \" to "" (SQL standard escaped quote)
435                                result.push_str("\"\"");
436                                chars.next(); // Consume the escaped quote
437                            }
438                            '\\' => {
439                                // Convert \\ to \\ (keep as is, but we need to handle it)
440                                result.push('\\');
441                                result.push('\\');
442                                chars.next(); // Consume the escaped backslash
443                            }
444                            _ => {
445                                // Other escape sequences - preserve as is
446                                result.push(ch);
447                                result.push(next_ch);
448                                chars.next();
449                            }
450                        }
451                    } else {
452                        // Backslash at end - just add it
453                        result.push(ch);
454                    }
455                }
456                '\'' if !in_double_quote => {
457                    in_single_quote = !in_single_quote;
458                    result.push(ch);
459                }
460                '"' if !in_single_quote => {
461                    in_double_quote = !in_double_quote;
462                    result.push(ch);
463                }
464                _ => {
465                    result.push(ch);
466                }
467            }
468        }
469
470        result
471    }
472
473    /// Replace variable references in STRUCT field types with STRING
474    ///
475    /// Handles patterns like STRUCT<field: :variable_type> -> STRUCT<field: STRING>
476    fn replace_variables_in_struct_types(sql: &str) -> String {
477        RE_VARIABLE_TYPE
478            .replace_all(sql, |_caps: &regex::Captures| ": STRING".to_string())
479            .to_string()
480    }
481
482    /// Replace variable references in ARRAY element types with STRING
483    ///
484    /// Handles patterns like ARRAY<:element_type> -> ARRAY<STRING>
485    fn replace_variables_in_array_types(sql: &str) -> String {
486        RE_ARRAY_VARIABLE
487            .replace_all(sql, |_caps: &regex::Captures| "ARRAY<STRING>".to_string())
488            .to_string()
489    }
490
491    /// Replace variable references in column definitions
492    ///
493    /// Handles patterns like `column_name :variable STRING` by removing the variable reference.
494    /// Example: `id :id_var STRING` -> `id STRING`
495    fn replace_variables_in_column_definitions(sql: &str) -> String {
496        RE_FIELD_VARIABLE
497            .replace_all(sql, |caps: &regex::Captures| {
498                let col_name = caps.get(1).map(|m| m.as_str()).unwrap_or("");
499                let type_name = caps.get(2).map(|m| m.as_str()).unwrap_or("");
500                format!("{} {}", col_name, type_name)
501            })
502            .to_string()
503    }
504
505    /// Replace nested variable references recursively
506    ///
507    /// Handles patterns like ARRAY<STRUCT<field: :type>> by applying both
508    /// STRUCT and ARRAY replacements recursively.
509    fn replace_nested_variables(sql: &str) -> String {
510        let mut result = sql.to_string();
511        let mut changed = true;
512        let mut iterations = 0;
513        const MAX_ITERATIONS: usize = 10; // Prevent infinite loops
514
515        // Keep applying replacements until no more changes occur
516        while changed && iterations < MAX_ITERATIONS {
517            let before = result.clone();
518
519            // First replace variables in STRUCT types
520            result = Self::replace_variables_in_struct_types(&result);
521
522            // Then replace variables in ARRAY types
523            result = Self::replace_variables_in_array_types(&result);
524
525            // Check if anything changed
526            changed = before != result;
527            iterations += 1;
528        }
529
530        result
531    }
532
533    /// Extract STRUCT, ARRAY, and MAP column definitions and replace with placeholders
534    ///
535    /// sqlparser doesn't support these complex types, so we need to extract them manually
536    /// and replace with a simple type that can be parsed, then restore the original
537    /// type string after parsing.
538    ///
539    /// Assumes SQL is already normalized (single line, single spaces).
540    fn extract_complex_type_columns(sql: &str) -> (String, Vec<(String, String)>) {
541        let mut column_types = Vec::new();
542        let mut result = sql.to_string();
543
544        // Find all matches and extract the full type
545        let mut matches_to_replace: Vec<(usize, usize, String, String)> = Vec::new();
546
547        for cap in RE_COMPLEX_TYPE.captures_iter(sql) {
548            let col_name = cap.get(1).map(|m| m.as_str()).unwrap_or("");
549            let type_start = cap.get(0).map(|m| m.start()).unwrap_or(0);
550            let struct_or_array = cap.get(2).map(|m| m.as_str()).unwrap_or("");
551
552            // Find the matching closing bracket
553            // Start counting from the '<' in STRUCT<, ARRAY<, or MAP<
554            let bracket_start = type_start + col_name.len() + 1 + struct_or_array.len() - 1; // After "column_name STRUCT<", "column_name ARRAY<", or "column_name MAP<"
555            let mut bracket_count = 0;
556            let mut type_end = bracket_start;
557
558            for (idx, ch) in sql[bracket_start..].char_indices() {
559                let pos = bracket_start + idx;
560                if ch == '<' {
561                    bracket_count += 1;
562                } else if ch == '>' {
563                    bracket_count -= 1;
564                    if bracket_count == 0 {
565                        type_end = pos + 1;
566                        break;
567                    }
568                }
569            }
570
571            if bracket_count == 0 && type_end > type_start {
572                // Extract the full type (STRUCT<...>, ARRAY<...>, or MAP<...>)
573                // Start from after the column name and space
574                let type_start_pos = type_start + col_name.len() + 1;
575                let full_type = sql[type_start_pos..type_end].trim().to_string();
576                matches_to_replace.push((
577                    type_start_pos,
578                    type_end,
579                    col_name.to_string(),
580                    full_type,
581                ));
582            }
583        }
584
585        // Replace matches in reverse order
586        for (start, end, col_name, full_type) in matches_to_replace.iter().rev() {
587            column_types.push((col_name.clone(), full_type.clone()));
588            result.replace_range(*start..*end, "STRING");
589        }
590
591        (result, column_types)
592    }
593
594    /// Parse SQL and extract table definitions
595    ///
596    /// # Arguments
597    ///
598    /// * `sql` - SQL string containing CREATE TABLE, CREATE VIEW, or CREATE MATERIALIZED VIEW statements
599    ///
600    /// # Returns
601    ///
602    /// An `ImportResult` containing extracted tables/views and any parse errors.
603    ///
604    /// # Example - Standard SQL
605    ///
606    /// ```rust
607    /// use data_modelling_sdk::import::sql::SQLImporter;
608    ///
609    /// let importer = SQLImporter::new("postgres");
610    /// let sql = "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(100));";
611    /// let result = importer.parse(sql).unwrap();
612    /// assert_eq!(result.tables.len(), 1);
613    /// ```
614    ///
615    /// # Example - Databricks SQL with IDENTIFIER()
616    ///
617    /// ```rust
618    /// use data_modelling_sdk::import::sql::SQLImporter;
619    ///
620    /// let importer = SQLImporter::new("databricks");
621    /// let sql = "CREATE TABLE IDENTIFIER(:catalog || '.schema.table') (id STRING, name STRING);";
622    /// let result = importer.parse(sql).unwrap();
623    /// assert_eq!(result.tables.len(), 1);
624    /// assert_eq!(result.tables[0].name.as_deref(), Some("schema.table"));
625    /// ```
626    ///
627    /// # Example - Databricks SQL with Views
628    ///
629    /// ```rust
630    /// use data_modelling_sdk::import::sql::SQLImporter;
631    ///
632    /// let importer = SQLImporter::new("databricks");
633    /// let sql = "CREATE VIEW example_view AS SELECT * FROM table1;";
634    /// let result = importer.parse(sql).unwrap();
635    /// assert_eq!(result.tables.len(), 1);
636    /// ```
637    pub fn parse(&self, sql: &str) -> Result<ImportResult> {
638        // Minimal preprocessing: only handle variable replacement and unsupported clauses
639        // For Databricks: let DatabricksDialect try to parse STRUCT/ARRAY first, then restore full type strings
640        // For BigQuery/Hive: can parse STRUCT/ARRAY directly without extraction
641        // For other dialects: extract STRUCT/ARRAY if present (they may not support them)
642        let (preprocessed_sql, preprocessing_state, complex_types) = if self.dialect.to_lowercase()
643            == "databricks"
644        {
645            let mut state = PreprocessingState::new();
646            let mut preprocessed = sql.to_string();
647
648            // Step 1: Replace IDENTIFIER() expressions (needed for variable table names)
649            preprocessed = Self::preprocess_identifier_expressions(&preprocessed, &mut state);
650            // Step 2: Replace variable references in column definitions (e.g., "id :var STRING" -> "id STRING")
651            preprocessed = Self::replace_variables_in_column_definitions(&preprocessed);
652            // Step 3: Replace variable references in type definitions (e.g., STRUCT<field: :type> -> STRUCT<field: STRING>)
653            preprocessed = Self::replace_nested_variables(&preprocessed);
654            // Step 4: Remove unsupported clauses that break parsing
655            preprocessed = Self::preprocess_materialized_views(&preprocessed);
656            preprocessed = Self::preprocess_table_comment(&preprocessed);
657            preprocessed = Self::preprocess_tblproperties(&preprocessed);
658            preprocessed = Self::preprocess_cluster_by(&preprocessed);
659            preprocessed = Self::preprocess_using_format(&preprocessed);
660            // Step 5: Normalize SQL (handle multiline) - needed for regex matching
661            let normalized = Self::normalize_sql_preserving_quotes(&preprocessed);
662            // Step 6: Convert backslash-escaped quotes (sqlparser doesn't support \' escape sequences)
663            let normalized = Self::convert_backslash_escaped_quotes(&normalized);
664
665            // Step 7: Try parsing with DatabricksDialect first (without extraction)
666            let dialect = self.dialect_impl();
667            let parse_result = Parser::parse_sql(dialect.as_ref(), &normalized);
668
669            // If parsing fails, extract STRUCT/ARRAY types and try again
670            // Otherwise, extract type strings for restoration (without modifying SQL)
671            let (final_sql, complex_cols) = if parse_result.is_err() {
672                // Parsing failed - extract STRUCT/ARRAY/MAP columns and replace with STRING
673                let (simplified, cols) = Self::extract_complex_type_columns(&normalized);
674                (simplified, cols)
675            } else {
676                // Parsing succeeded - extract type strings for restoration after parsing
677                // We'll restore the full type strings in parse_create_table_with_preprocessing
678                let (_, cols) = Self::extract_complex_type_columns(&normalized);
679                (normalized, cols)
680            };
681
682            (final_sql, state, complex_cols)
683        } else if matches!(self.dialect.to_lowercase().as_str(), "bigquery" | "hive") {
684            // BigQuery/Hive: can parse STRUCT/ARRAY directly, just normalize
685            let normalized = Self::normalize_sql_preserving_quotes(sql);
686            let normalized = Self::convert_backslash_escaped_quotes(&normalized);
687            (normalized, PreprocessingState::new(), Vec::new())
688        } else {
689            // Other dialects: extract STRUCT/ARRAY if present (they may not support them)
690            let normalized = Self::normalize_sql_preserving_quotes(sql);
691            let normalized = Self::convert_backslash_escaped_quotes(&normalized);
692            let (simplified_sql, complex_cols) = Self::extract_complex_type_columns(&normalized);
693            (simplified_sql, PreprocessingState::new(), complex_cols)
694        };
695
696        let dialect = self.dialect_impl();
697        let statements = match Parser::parse_sql(dialect.as_ref(), &preprocessed_sql) {
698            Ok(stmts) => stmts,
699            Err(e) => {
700                return Ok(ImportResult {
701                    tables: Vec::new(),
702                    tables_requiring_name: Vec::new(),
703                    errors: vec![ImportError::ParseError(e.to_string())],
704                    ai_suggestions: None,
705                });
706            }
707        };
708
709        let mut tables = Vec::new();
710        let mut errors = Vec::new();
711        let mut tables_requiring_name = Vec::new();
712
713        for (idx, stmt) in statements.into_iter().enumerate() {
714            match stmt {
715                Statement::CreateTable(create) => {
716                    match self.parse_create_table_with_preprocessing(
717                        idx,
718                        &create.name,
719                        &create.columns,
720                        &create.constraints,
721                        &preprocessing_state,
722                        &complex_types,
723                    ) {
724                        Ok((table, requires_name)) => {
725                            if requires_name {
726                                tables_requiring_name.push(super::TableRequiringName {
727                                    table_index: idx,
728                                    suggested_name: None,
729                                });
730                            }
731                            tables.push(table);
732                        }
733                        Err(e) => errors.push(ImportError::ParseError(e)),
734                    }
735                }
736                Statement::CreateView(create_view) => {
737                    match self.parse_create_view(idx, &create_view.name, &preprocessing_state) {
738                        Ok((table, requires_name)) => {
739                            if requires_name {
740                                tables_requiring_name.push(super::TableRequiringName {
741                                    table_index: idx,
742                                    suggested_name: None,
743                                });
744                            }
745                            tables.push(table);
746                        }
747                        Err(e) => errors.push(ImportError::ParseError(e)),
748                    }
749                }
750                _ => {
751                    // Other statements (INSERT, UPDATE, DELETE, etc.) are ignored.
752                }
753            }
754        }
755
756        Ok(ImportResult {
757            tables,
758            tables_requiring_name,
759            errors,
760            ai_suggestions: None,
761        })
762    }
763
764    /// Parse SQL with Liquibase format support
765    ///
766    /// Strips Liquibase directive comments (--liquibase formatted sql, --changeset, etc.)
767    /// before parsing the SQL.
768    ///
769    /// # Arguments
770    ///
771    /// * `sql` - SQL string with optional Liquibase comments
772    ///
773    /// # Returns
774    ///
775    /// An `ImportResult` containing extracted tables.
776    ///
777    /// # Example
778    ///
779    /// ```rust
780    /// use data_modelling_sdk::import::sql::SQLImporter;
781    ///
782    /// let importer = SQLImporter::new("postgres");
783    /// let sql = r#"
784    /// --liquibase formatted sql
785    /// --changeset user:1
786    /// CREATE TABLE users (id INT);
787    /// "#;
788    /// let result = importer.parse_liquibase(sql).unwrap();
789    /// ```
790    pub fn parse_liquibase(&self, sql: &str) -> Result<ImportResult> {
791        // Liquibase "formatted SQL" is still SQL, but often includes directive comments like:
792        // --liquibase formatted sql
793        // --changeset user:id
794        // We strip those comment lines, then parse the remaining SQL.
795        let cleaned = sql
796            .lines()
797            .filter(|l| {
798                let t = l.trim_start();
799                if !t.starts_with("--") {
800                    return true;
801                }
802                // Keep regular SQL comments? For now, drop all -- lines to avoid parser issues.
803                false
804            })
805            .collect::<Vec<_>>()
806            .join("\n");
807
808        self.parse(&cleaned)
809    }
810
811    fn dialect_impl(&self) -> Box<dyn Dialect + Send + Sync> {
812        match self.dialect.to_lowercase().as_str() {
813            "ansi" => Box::new(AnsiDialect {}),
814            "bigquery" => Box::new(BigQueryDialect {}),
815            "databricks" => Box::new(CustomDatabricksDialect::new()),
816            "hive" => Box::new(HiveDialect {}),
817            "mssql" | "sqlserver" => Box::new(MsSqlDialect {}),
818            "mysql" => Box::new(MySqlDialect {}),
819            "postgres" | "postgresql" => Box::new(PostgreSqlDialect {}),
820            "sqlite" => Box::new(SQLiteDialect {}),
821            _ => Box::new(GenericDialect {}),
822        }
823    }
824
825    /// Strip quote characters from an identifier
826    ///
827    /// Handles various SQL quoting styles:
828    /// - Double quotes: `"identifier"` -> `identifier`
829    /// - Backticks: `` `identifier` `` -> `identifier`
830    /// - Brackets: `[identifier]` -> `identifier`
831    ///
832    /// Also handles escaped quotes within identifiers:
833    /// - `""` -> `"` (PostgreSQL style)
834    /// - ``` `` ``` -> `` ` `` (MySQL style)
835    /// - `]]` -> `]` (SQL Server style)
836    fn unquote_identifier(identifier: &str) -> String {
837        let trimmed = identifier.trim();
838
839        // Check for double quotes
840        if trimmed.starts_with('"') && trimmed.ends_with('"') && trimmed.len() >= 2 {
841            let inner = &trimmed[1..trimmed.len() - 1];
842            return inner.replace("\"\"", "\"");
843        }
844
845        // Check for backticks (MySQL)
846        if trimmed.starts_with('`') && trimmed.ends_with('`') && trimmed.len() >= 2 {
847            let inner = &trimmed[1..trimmed.len() - 1];
848            return inner.replace("``", "`");
849        }
850
851        // Check for brackets (SQL Server)
852        if trimmed.starts_with('[') && trimmed.ends_with(']') && trimmed.len() >= 2 {
853            let inner = &trimmed[1..trimmed.len() - 1];
854            return inner.replace("]]", "]");
855        }
856
857        // No quoting detected, return as-is
858        trimmed.to_string()
859    }
860
861    fn object_name_to_string(name: &ObjectName) -> String {
862        // Use final identifier (supports schema-qualified names).
863        // In sqlparser 0.60, ObjectNamePart might be a different structure
864        // Fall back to to_string() which should work
865        let raw_name = name
866            .0
867            .last()
868            .map(|ident| {
869                // Try to get the identifier value - structure may have changed in 0.60
870                // Use to_string() as fallback
871                ident.to_string()
872            })
873            .unwrap_or_else(|| name.to_string());
874
875        // Strip any quote characters from the identifier
876        Self::unquote_identifier(&raw_name)
877    }
878
879    fn parse_create_table_with_preprocessing(
880        &self,
881        table_index: usize,
882        name: &ObjectName,
883        columns: &[ColumnDef],
884        constraints: &[TableConstraint],
885        preprocessing_state: &PreprocessingState,
886        complex_types: &[(String, String)],
887    ) -> std::result::Result<(TableData, bool), String> {
888        let mut table_name = Self::object_name_to_string(name);
889        let mut requires_name = false;
890
891        // Check if this is a placeholder table name from IDENTIFIER() preprocessing
892        if table_name.starts_with("__databricks_table_")
893            && let Some(original_expr) =
894                preprocessing_state.identifier_replacements.get(&table_name)
895        {
896            // Try to extract table name from the original expression
897            if let Some(extracted_name) = Self::extract_identifier_table_name(original_expr) {
898                table_name = extracted_name;
899            } else {
900                // Expression contains only variables - mark as requiring name
901                requires_name = true;
902            }
903        }
904
905        // Validate table name (warnings are logged but don't fail import)
906        if let Err(e) = validate_table_name(&table_name) {
907            // Log warning but continue - imported SQL may have valid but unusual names
908            tracing::warn!("Table name validation warning: {}", e);
909        }
910
911        // Collect PK columns from table-level constraints.
912        let mut pk_cols = std::collections::HashSet::<String>::new();
913        for c in constraints {
914            if let TableConstraint::PrimaryKey(pk_constraint) = c {
915                for col in &pk_constraint.columns {
916                    // In sqlparser 0.60, IndexColumn structure may have changed
917                    // Try to get the column name - might be a field or method
918                    // Unquote the column name to match against column definitions
919                    pk_cols.insert(Self::unquote_identifier(&col.to_string()));
920                }
921            }
922        }
923
924        let mut out_cols = Vec::new();
925        for col in columns {
926            let mut nullable = true;
927            let mut is_pk = false;
928
929            for opt_def in &col.options {
930                match &opt_def.option {
931                    ColumnOption::NotNull => nullable = false,
932                    ColumnOption::Null => nullable = true,
933                    ColumnOption::Unique(_) => {
934                        // UNIQUE constraint (not primary key)
935                    }
936                    ColumnOption::PrimaryKey(_) => {
937                        // In sqlparser 0.60, PRIMARY KEY is a separate variant
938                        is_pk = true;
939                    }
940                    _ => {}
941                }
942            }
943
944            let col_name = Self::unquote_identifier(&col.name.value);
945
946            if pk_cols.contains(&col_name) {
947                is_pk = true;
948            }
949            let mut data_type = col.data_type.to_string();
950            let mut description = None;
951
952            // Extract COMMENT clause from column options
953            for opt_def in &col.options {
954                if let ColumnOption::Comment(comment) = &opt_def.option {
955                    description = Some(comment.clone());
956                }
957            }
958
959            // Restore complex types (STRUCT/ARRAY/MAP) if this column was extracted
960            // Keep the full type string - we'll simplify it when converting to Column model
961            if let Some((_, original_type)) =
962                complex_types.iter().find(|(name, _)| name == &col_name)
963            {
964                data_type = original_type.clone();
965            }
966
967            // Validate column name and data type (warnings are logged but don't fail import)
968            if let Err(e) = validate_column_name(&col_name) {
969                tracing::warn!("Column name validation warning for '{}': {}", col_name, e);
970            }
971            if let Err(e) = validate_data_type(&data_type) {
972                tracing::warn!("Data type validation warning for '{}': {}", data_type, e);
973            }
974
975            out_cols.push(ColumnData {
976                name: col_name,
977                data_type,
978                nullable,
979                primary_key: is_pk,
980                description,
981                ..Default::default()
982            });
983        }
984
985        Ok((
986            TableData {
987                table_index,
988                name: Some(table_name),
989                columns: out_cols,
990            },
991            requires_name,
992        ))
993    }
994
995    /// Parse CREATE VIEW statement
996    ///
997    /// Extracts view name and creates a TableData entry for the view.
998    /// Views are treated as table-like entities for data modeling purposes.
999    fn parse_create_view(
1000        &self,
1001        view_index: usize,
1002        name: &ObjectName,
1003        preprocessing_state: &PreprocessingState,
1004    ) -> std::result::Result<(TableData, bool), String> {
1005        let mut view_name = Self::object_name_to_string(name);
1006        let mut requires_name = false;
1007
1008        // Check if this is a placeholder view name from IDENTIFIER() preprocessing
1009        if view_name.starts_with("__databricks_table_")
1010            && let Some(original_expr) = preprocessing_state.identifier_replacements.get(&view_name)
1011        {
1012            // Try to extract view name from the original expression
1013            if let Some(extracted_name) = Self::extract_identifier_table_name(original_expr) {
1014                view_name = extracted_name;
1015            } else {
1016                // Expression contains only variables - mark as requiring name
1017                requires_name = true;
1018            }
1019        }
1020
1021        // Validate view name
1022        if let Err(e) = validate_table_name(&view_name) {
1023            tracing::warn!("View name validation warning: {}", e);
1024        }
1025
1026        // Views don't have explicit column definitions in CREATE VIEW statements
1027        // The columns are derived from the SELECT query, which we don't parse here
1028        // So we create a view with empty columns - the actual columns would need
1029        // to be extracted from the query if needed in the future
1030        Ok((
1031            TableData {
1032                table_index: view_index,
1033                name: Some(view_name),
1034                columns: Vec::new(), // Views don't have explicit column definitions
1035            },
1036            requires_name,
1037        ))
1038    }
1039}
1040
#[cfg(test)]
mod tests {
    use super::*;

    // --- Construction and basic parsing (generic/postgres dialects) ---

    #[test]
    fn test_sql_importer_default() {
        let importer = SQLImporter::default();
        assert_eq!(importer.dialect, "generic");
    }

    #[test]
    fn test_sql_importer_parse_basic() {
        // Column-level PRIMARY KEY and NOT NULL options must be reflected
        // in the imported column metadata.
        let importer = SQLImporter::new("postgres");
        let result = importer
            .parse("CREATE TABLE test (id INT PRIMARY KEY, name TEXT NOT NULL);")
            .unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        let t = &result.tables[0];
        assert_eq!(t.name.as_deref(), Some("test"));
        assert_eq!(t.columns.len(), 2);
        assert!(t.columns.iter().any(|c| c.name == "id" && c.primary_key));
        assert!(t.columns.iter().any(|c| c.name == "name" && !c.nullable));
    }

    #[test]
    fn test_sql_importer_parse_table_pk_constraint() {
        // Table-level PRIMARY KEY constraints must also mark the column as PK.
        let importer = SQLImporter::new("postgres");
        let result = importer
            .parse("CREATE TABLE t (id INT, name TEXT, CONSTRAINT pk PRIMARY KEY (id));")
            .unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        let t = &result.tables[0];
        assert!(t.columns.iter().any(|c| c.name == "id" && c.primary_key));
    }

    #[test]
    fn test_sql_importer_parse_liquibase_formatted_sql() {
        // Liquibase directive comments must be stripped before parsing.
        let importer = SQLImporter::new("postgres");
        let result = importer
            .parse_liquibase(
                "--liquibase formatted sql\n--changeset user:1\nCREATE TABLE test (id INT);\n",
            )
            .unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
    }

    // --- Databricks IDENTIFIER() preprocessing ---

    #[test]
    fn test_databricks_identifier_with_literal() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE IDENTIFIER('test_table') (id STRING);";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].name.as_deref(), Some("test_table"));
    }

    #[test]
    fn test_databricks_identifier_with_variable() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE IDENTIFIER(:table_name) (id STRING);";
        let result = importer.parse(sql).unwrap();
        // Should create placeholder table name and add to tables_requiring_name
        assert_eq!(result.tables.len(), 1);
        assert!(
            result.tables[0]
                .name
                .as_deref()
                .unwrap()
                .starts_with("__databricks_table_")
        );
        assert_eq!(result.tables_requiring_name.len(), 1);
    }

    #[test]
    fn test_databricks_identifier_with_concatenation() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE IDENTIFIER(:catalog || '.schema.table') (id STRING);";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        // Should extract table name from concatenation
        assert_eq!(result.tables[0].name.as_deref(), Some("schema.table"));
    }

    // --- Databricks variable substitution in complex types ---

    #[test]
    fn test_databricks_variable_in_struct() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE example (metadata STRUCT<key: STRING, value: :variable_type, timestamp: TIMESTAMP>);";
        let result = importer.parse(sql).unwrap();
        if !result.errors.is_empty() {
            eprintln!("Parse errors: {:?}", result.errors);
        }
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        // Variable should be replaced with STRING
        assert!(
            result.tables[0].columns[0]
                .data_type
                .contains("value: STRING")
        );
    }

    #[test]
    fn test_databricks_variable_in_array() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE example (items ARRAY<:element_type>);";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        // Variable should be replaced with STRING
        assert_eq!(result.tables[0].columns[0].data_type, "ARRAY<STRING>");
    }

    #[test]
    fn test_databricks_nested_variables() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE example (events ARRAY<STRUCT<id: STRING, name: STRING, details: STRUCT<name: STRING, status: :variable_type, timestamp: TIMESTAMP>>>);";
        let result = importer.parse(sql).unwrap();
        if !result.errors.is_empty() {
            eprintln!("Parse errors: {:?}", result.errors);
        }
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        // Nested variable should be replaced with STRING
        assert!(
            result.tables[0].columns[0]
                .data_type
                .contains("status: STRING")
        );
    }

    #[test]
    fn test_databricks_comment_variable() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE example (id STRING) COMMENT ':comment_variable';";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
    }

    #[test]
    fn test_databricks_tblproperties_variable() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE TABLE example (id STRING) TBLPROPERTIES ('key1' = ':variable_value', 'key2' = 'static_value');";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
    }

    #[test]
    fn test_databricks_column_variable() {
        let importer = SQLImporter::new("databricks");
        // Test column definition with variable reference like "column_name :variable STRING"
        // This pattern may need preprocessing to remove the variable
        let sql = "CREATE TABLE example (id :id_var STRING, name :name_var STRING);";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].columns.len(), 2);
    }

    // --- Databricks views ---

    #[test]
    fn test_databricks_create_view() {
        let importer = SQLImporter::new("databricks");
        let sql = "CREATE VIEW example_view AS SELECT id, name FROM source_table;";
        let result = importer.parse(sql).unwrap();
        // Views should be imported as table-like entities
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].name.as_deref(), Some("example_view"));
    }

    #[test]
    fn test_databricks_view_with_identifier() {
        let importer = SQLImporter::new("databricks");
        let sql =
            "CREATE VIEW IDENTIFIER(:catalog || '.schema.view_name') AS SELECT * FROM table1;";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        // Should extract view name from IDENTIFIER() expression
        assert_eq!(result.tables[0].name.as_deref(), Some("schema.view_name"));
    }

    #[test]
    fn test_databricks_create_materialized_view() {
        let importer = SQLImporter::new("databricks");
        // MATERIALIZED VIEW is preprocessed to CREATE VIEW for sqlparser compatibility
        let sql = "CREATE MATERIALIZED VIEW mv_example AS SELECT id, name FROM source_table;";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty());
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].name.as_deref(), Some("mv_example"));
    }

    // --- Databricks table clauses (USING / TBLPROPERTIES) ---

    #[test]
    fn test_databricks_using_delta() {
        let importer = SQLImporter::new("databricks");
        // USING DELTA is preprocessed out for sqlparser compatibility
        let sql = "CREATE TABLE delta_table (id INT, name STRING) USING DELTA;";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty(), "Errors: {:?}", result.errors);
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].name.as_deref(), Some("delta_table"));
        assert_eq!(result.tables[0].columns.len(), 2);
    }

    #[test]
    fn test_databricks_using_parquet() {
        let importer = SQLImporter::new("databricks");
        // USING PARQUET is preprocessed out for sqlparser compatibility
        let sql = "CREATE TABLE parquet_table (id INT, data STRING) USING PARQUET;";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty(), "Errors: {:?}", result.errors);
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].name.as_deref(), Some("parquet_table"));
    }

    #[test]
    fn test_databricks_using_delta_with_tblproperties() {
        let importer = SQLImporter::new("databricks");
        // Combined USING DELTA and TBLPROPERTIES - both should be removed
        let sql = r#"CREATE TABLE complex_table (
            id INT,
            name STRING
        ) USING DELTA
        TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true');"#;
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty(), "Errors: {:?}", result.errors);
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].name.as_deref(), Some("complex_table"));
        assert_eq!(result.tables[0].columns.len(), 2);
    }

    #[test]
    fn test_databricks_materialized_view_using_delta() {
        let importer = SQLImporter::new("databricks");
        // Combined MATERIALIZED VIEW - should work
        let sql = "CREATE MATERIALIZED VIEW mv_delta AS SELECT id FROM source;";
        let result = importer.parse(sql).unwrap();
        assert!(result.errors.is_empty(), "Errors: {:?}", result.errors);
        assert_eq!(result.tables.len(), 1);
        assert_eq!(result.tables[0].name.as_deref(), Some("mv_delta"));
    }
}