// data_modelling_sdk/import/sql.rs

1//! SQL Import functionality
2//!
3//! Provides parsing of CREATE TABLE statements from various SQL dialects.
4//!
5//! Uses `sqlparser` to parse CREATE TABLE statements into SDK import primitives.
6//!
7//! # Validation
8//!
9//! All imported table and column names are validated for:
10//! - Valid identifier format
11//! - Maximum length limits
12//! - SQL reserved word detection
13
14use super::{ColumnData, ImportError, ImportResult, TableData};
15use crate::validation::input::{validate_column_name, validate_data_type, validate_table_name};
16use anyhow::Result;
17use sqlparser::ast::{ColumnDef, ColumnOption, ObjectName, Statement, TableConstraint};
18use sqlparser::dialect::{Dialect, GenericDialect, MySqlDialect, PostgreSqlDialect, SQLiteDialect};
19use sqlparser::parser::Parser;
20use std::collections::HashMap;
21
22/// Databricks SQL dialect implementation
23///
24/// Extends GenericDialect to support Databricks-specific syntax patterns:
25/// - Variable references (`:variable_name`) in identifiers
26/// - Backtick-quoted identifiers
27#[derive(Debug)]
28struct DatabricksDialect;
29
30impl Dialect for DatabricksDialect {
31    fn is_identifier_start(&self, ch: char) -> bool {
32        // Allow ':' as identifier start for variable references like :variable_name
33        ch.is_alphabetic() || ch == '_' || ch == ':'
34    }
35
36    fn is_identifier_part(&self, ch: char) -> bool {
37        // Allow ':' as identifier part for variable references
38        ch.is_alphanumeric() || ch == '_' || ch == ':'
39    }
40
41    fn is_delimited_identifier_start(&self, ch: char) -> bool {
42        // Support backtick-quoted identifiers (Databricks style)
43        ch == '"' || ch == '`' || ch == '['
44    }
45}
46
/// Tracks preprocessing transformations applied to SQL
#[derive(Debug)]
struct PreprocessingState {
    /// Maps placeholder table names to original IDENTIFIER() expressions
    identifier_replacements: HashMap<String, String>,
    /// Tracks variable references replaced in type definitions
    #[allow(dead_code)] // Reserved for future use
    variable_replacements: Vec<(String, String)>,
}

impl PreprocessingState {
    /// Create an empty state with no recorded replacements.
    fn new() -> Self {
        PreprocessingState {
            identifier_replacements: HashMap::default(),
            variable_replacements: Vec::default(),
        }
    }
}
65
/// SQL Importer - parses CREATE TABLE statements
pub struct SQLImporter {
    /// SQL dialect to use for parsing
    pub dialect: String,
}

impl Default for SQLImporter {
    /// The default importer parses with the "generic" SQL dialect.
    fn default() -> Self {
        Self {
            dialect: String::from("generic"),
        }
    }
}
79
80impl SQLImporter {
81    /// Create a new SQL importer with the specified dialect
82    ///
83    /// # Arguments
84    ///
85    /// * `dialect` - SQL dialect name ("postgres", "mysql", "sqlite", "generic", "databricks")
86    ///
87    /// # Supported Dialects
88    ///
89    /// - **postgres** / **postgresql**: PostgreSQL dialect
90    /// - **mysql**: MySQL dialect
91    /// - **sqlite**: SQLite dialect
92    /// - **generic**: Generic SQL dialect (default)
93    /// - **databricks**: Databricks SQL dialect with support for:
94    ///   - `IDENTIFIER()` function calls in table/view names
95    ///   - Variable references (`:variable_name`) in type definitions, column definitions, and metadata
96    ///   - `STRUCT` and `ARRAY` complex types
97    ///   - `CREATE VIEW` and `CREATE MATERIALIZED VIEW` statements
98    ///
99    /// # Example
100    ///
101    /// ```rust
102    /// use data_modelling_sdk::import::sql::SQLImporter;
103    ///
104    /// // Standard SQL dialect
105    /// let importer = SQLImporter::new("postgres");
106    ///
107    /// // Databricks SQL dialect
108    /// let databricks_importer = SQLImporter::new("databricks");
109    /// ```
110    pub fn new(dialect: &str) -> Self {
111        Self {
112            dialect: dialect.to_string(),
113        }
114    }
115
116    /// Preprocess Databricks SQL to handle IDENTIFIER() expressions
117    ///
118    /// Replaces IDENTIFIER() function calls with placeholder table names
119    /// and tracks the original expressions for later extraction.
120    fn preprocess_identifier_expressions(sql: &str, state: &mut PreprocessingState) -> String {
121        use regex::Regex;
122
123        // Pattern to match IDENTIFIER(...) expressions
124        let re = Regex::new(r"(?i)IDENTIFIER\s*\(\s*([^)]+)\s*\)").unwrap();
125        let mut counter = 0;
126
127        re.replace_all(sql, |caps: &regex::Captures| {
128            let expr = caps.get(1).map(|m| m.as_str()).unwrap_or("");
129            counter += 1;
130            let placeholder = format!("__databricks_table_{}__", counter);
131
132            // Store the mapping for later extraction
133            state
134                .identifier_replacements
135                .insert(placeholder.clone(), expr.to_string());
136
137            placeholder
138        })
139        .to_string()
140    }
141
142    /// Extract table name from IDENTIFIER() expression
143    ///
144    /// If the expression contains string literals, extracts and constructs the table name.
145    /// Returns None if expression contains only variables.
146    fn extract_identifier_table_name(expr: &str) -> Option<String> {
147        use regex::Regex;
148
149        // Check if expression contains string literals (single or double quoted)
150        let literal_re = Regex::new(r#"(?:'([^']*)'|"([^"]*)")"#).unwrap();
151        let mut parts = Vec::new();
152
153        // Extract all string literals
154        for cap in literal_re.captures_iter(expr) {
155            if let Some(m) = cap.get(1) {
156                parts.push(m.as_str().to_string());
157            } else if let Some(m) = cap.get(2) {
158                parts.push(m.as_str().to_string());
159            }
160        }
161
162        if parts.is_empty() {
163            // No literals found - expression contains only variables
164            return None;
165        }
166
167        // Join literals and remove leading/trailing dots
168        let result = parts.join("");
169        Some(result.trim_matches('.').to_string())
170    }
171
172    /// Handle IDENTIFIER() expressions containing only variables
173    ///
174    /// Creates placeholder table names and marks tables as requiring name resolution.
175    #[allow(dead_code)] // Reserved for future use
176    fn handle_identifier_variables(placeholder: &str, _expr: &str) -> String {
177        // Return the placeholder as-is - it will be marked in tables_requiring_name
178        placeholder.to_string()
179    }
180
181    /// Preprocess CREATE MATERIALIZED VIEW to CREATE VIEW
182    ///
183    /// sqlparser may not support MATERIALIZED VIEW directly, so we convert it to CREATE VIEW
184    /// This allows parsing to succeed while preserving the intent.
185    fn preprocess_materialized_views(sql: &str) -> String {
186        use regex::Regex;
187
188        // Replace CREATE MATERIALIZED VIEW with CREATE VIEW
189        let re = Regex::new(r"(?i)CREATE\s+MATERIALIZED\s+VIEW").unwrap();
190        re.replace_all(sql, "CREATE VIEW").to_string()
191    }
192
193    /// Preprocess table-level COMMENT clause removal
194    ///
195    /// sqlparser does not support table-level COMMENT clauses, so we remove them before parsing.
196    /// Handles both single-quoted and double-quoted COMMENT strings.
197    /// Table-level COMMENT appears after the closing parenthesis, not inside column definitions.
198    fn preprocess_table_comment(sql: &str) -> String {
199        use regex::Regex;
200
201        // Remove COMMENT clause that appears after closing parenthesis of CREATE TABLE
202        // Pattern: ) followed by whitespace, then COMMENT, then quoted string
203        // This ensures we only match table-level COMMENT, not column-level COMMENT
204        // Match: )\s+COMMENT\s+['"]...['"]
205        let re_single = Regex::new(r#"(?i)\)\s+COMMENT\s+'[^']*'"#).unwrap();
206        let re_double = Regex::new(r#"(?i)\)\s+COMMENT\s+"[^"]*""#).unwrap();
207
208        // Replace with just the closing parenthesis
209        let result = re_single.replace_all(sql, ")");
210        re_double.replace_all(&result, ")").to_string()
211    }
212
213    /// Preprocess TBLPROPERTIES clause removal
214    ///
215    /// sqlparser does not support TBLPROPERTIES, so we remove it before parsing.
216    /// This preserves the rest of the SQL structure while allowing parsing to succeed.
217    fn preprocess_tblproperties(sql: &str) -> String {
218        use regex::Regex;
219
220        // Remove TBLPROPERTIES clause (may span multiple lines)
221        // Pattern matches: TBLPROPERTIES ( ... ) where ... can contain nested parentheses
222        // We need to match balanced parentheses
223        let mut result = sql.to_string();
224        let re = Regex::new(r"(?i)TBLPROPERTIES\s*\(").unwrap();
225
226        // Find all TBLPROPERTIES occurrences and remove them with balanced parentheses
227        let mut search_start = 0;
228        while let Some(m) = re.find_at(&result, search_start) {
229            let start = m.start();
230            let mut pos = m.end();
231            let mut paren_count = 1;
232
233            // Find matching closing parenthesis (using byte positions)
234            let bytes = result.as_bytes();
235            while pos < bytes.len() && paren_count > 0 {
236                if let Some(ch) = result[pos..].chars().next() {
237                    if ch == '(' {
238                        paren_count += 1;
239                    } else if ch == ')' {
240                        paren_count -= 1;
241                    }
242                    pos += ch.len_utf8();
243                } else {
244                    break;
245                }
246            }
247
248            if paren_count == 0 {
249                // Remove TBLPROPERTIES clause including the closing parenthesis
250                result.replace_range(start..pos, "");
251                search_start = start;
252            } else {
253                // Unbalanced parentheses, skip this match
254                search_start = pos;
255            }
256        }
257
258        result
259    }
260
261    /// Preprocess CLUSTER BY clause removal
262    ///
263    /// sqlparser does not support CLUSTER BY, so we remove it before parsing.
264    fn preprocess_cluster_by(sql: &str) -> String {
265        use regex::Regex;
266
267        // Remove CLUSTER BY clause (may have AUTO or column list)
268        // Pattern: CLUSTER BY followed by AUTO or column list
269        let re = Regex::new(r"(?i)\s+CLUSTER\s+BY\s+(?:AUTO|\([^)]*\)|[\w,\s]+)").unwrap();
270        re.replace_all(sql, "").to_string()
271    }
272
273    /// Normalize SQL while preserving quoted strings
274    ///
275    /// Converts multiline SQL to single line, but preserves quoted strings
276    /// (both single and double quotes) to avoid breaking COMMENT clauses and other string literals.
277    /// Handles escape sequences: `\'` (escaped quote) and `\\` (escaped backslash).
278    fn normalize_sql_preserving_quotes(sql: &str) -> String {
279        let mut result = String::with_capacity(sql.len());
280        let mut chars = sql.chars().peekable();
281        let mut in_single_quote = false;
282        let mut in_double_quote = false;
283        let mut last_char_was_space = false;
284
285        while let Some(ch) = chars.next() {
286            match ch {
287                '\\' if in_single_quote || in_double_quote => {
288                    // Handle escape sequences inside quoted strings
289                    // \' or \" or \\ - preserve the escape sequence
290                    if let Some(&next_ch) = chars.peek() {
291                        result.push(ch);
292                        result.push(next_ch);
293                        chars.next(); // Consume the escaped character
294                        last_char_was_space = false;
295                    } else {
296                        // Backslash at end of string - just add it
297                        result.push(ch);
298                        last_char_was_space = false;
299                    }
300                }
301                '\'' if !in_double_quote => {
302                    // Check if this is an escaped quote (doubled quotes: '')
303                    // In SQL standard, '' inside a string means a single quote
304                    if in_single_quote && chars.peek() == Some(&'\'') {
305                        // Doubled quote - this is an escaped quote, not the end of string
306                        result.push(ch);
307                        result.push(ch);
308                        chars.next(); // Consume the second quote
309                        last_char_was_space = false;
310                    } else {
311                        // Regular quote - toggle quote state
312                        in_single_quote = !in_single_quote;
313                        result.push(ch);
314                        last_char_was_space = false;
315                    }
316                }
317                '"' if !in_single_quote => {
318                    // Check if this is an escaped quote (doubled quotes: "")
319                    if in_double_quote && chars.peek() == Some(&'"') {
320                        // Doubled quote - this is an escaped quote, not the end of string
321                        result.push(ch);
322                        result.push(ch);
323                        chars.next(); // Consume the second quote
324                        last_char_was_space = false;
325                    } else {
326                        // Regular quote - toggle quote state
327                        in_double_quote = !in_double_quote;
328                        result.push(ch);
329                        last_char_was_space = false;
330                    }
331                }
332                '\n' | '\r' => {
333                    if in_single_quote || in_double_quote {
334                        // Replace newlines inside quoted strings with space
335                        // sqlparser doesn't support multiline string literals
336                        if !last_char_was_space {
337                            result.push(' ');
338                            last_char_was_space = true;
339                        }
340                    } else {
341                        // Replace newlines outside quotes with space
342                        if !last_char_was_space {
343                            result.push(' ');
344                            last_char_was_space = true;
345                        }
346                    }
347                }
348                ' ' | '\t' => {
349                    if in_single_quote || in_double_quote {
350                        // Preserve spaces inside quoted strings
351                        result.push(ch);
352                        last_char_was_space = false;
353                    } else {
354                        // Collapse multiple spaces to single space
355                        if !last_char_was_space {
356                            result.push(' ');
357                            last_char_was_space = true;
358                        }
359                    }
360                }
361                '-' if !in_single_quote && !in_double_quote => {
362                    // Check for SQL comment (--)
363                    if let Some(&'-') = chars.peek() {
364                        // Skip rest of line comment
365                        for c in chars.by_ref() {
366                            if c == '\n' || c == '\r' {
367                                break;
368                            }
369                        }
370                        if !last_char_was_space {
371                            result.push(' ');
372                            last_char_was_space = true;
373                        }
374                    } else {
375                        result.push(ch);
376                        last_char_was_space = false;
377                    }
378                }
379                _ => {
380                    result.push(ch);
381                    last_char_was_space = false;
382                }
383            }
384        }
385
386        result.trim().to_string()
387    }
388
389    /// Convert backslash-escaped quotes to SQL standard doubled quotes
390    ///
391    /// sqlparser doesn't support `\'` escape sequences, so we convert them to `''`
392    /// which is the SQL standard way to escape quotes in string literals.
393    fn convert_backslash_escaped_quotes(sql: &str) -> String {
394        let mut result = String::with_capacity(sql.len());
395        let mut chars = sql.chars().peekable();
396        let mut in_single_quote = false;
397        let mut in_double_quote = false;
398
399        while let Some(ch) = chars.next() {
400            match ch {
401                '\\' if (in_single_quote || in_double_quote) => {
402                    // Handle escape sequences inside quoted strings
403                    if let Some(&next_ch) = chars.peek() {
404                        match next_ch {
405                            '\'' if in_single_quote => {
406                                // Convert \' to '' (SQL standard escaped quote)
407                                result.push_str("''");
408                                chars.next(); // Consume the escaped quote
409                            }
410                            '"' if in_double_quote => {
411                                // Convert \" to "" (SQL standard escaped quote)
412                                result.push_str("\"\"");
413                                chars.next(); // Consume the escaped quote
414                            }
415                            '\\' => {
416                                // Convert \\ to \\ (keep as is, but we need to handle it)
417                                result.push('\\');
418                                result.push('\\');
419                                chars.next(); // Consume the escaped backslash
420                            }
421                            _ => {
422                                // Other escape sequences - preserve as is
423                                result.push(ch);
424                                result.push(next_ch);
425                                chars.next();
426                            }
427                        }
428                    } else {
429                        // Backslash at end - just add it
430                        result.push(ch);
431                    }
432                }
433                '\'' if !in_double_quote => {
434                    in_single_quote = !in_single_quote;
435                    result.push(ch);
436                }
437                '"' if !in_single_quote => {
438                    in_double_quote = !in_double_quote;
439                    result.push(ch);
440                }
441                _ => {
442                    result.push(ch);
443                }
444            }
445        }
446
447        result
448    }
449
450    /// Replace variable references in STRUCT field types with STRING
451    ///
452    /// Handles patterns like STRUCT<field: :variable_type> -> STRUCT<field: STRING>
453    fn replace_variables_in_struct_types(sql: &str) -> String {
454        use regex::Regex;
455
456        // Pattern to match :variable_type in STRUCT field type definitions
457        // Matches: :variable_name where it appears after a colon (field: :variable)
458        // The pattern looks for colon, optional whitespace, colon, then variable name
459        let re = Regex::new(r":\s*:([a-zA-Z_][a-zA-Z0-9_]*)").unwrap();
460
461        re.replace_all(sql, |_caps: &regex::Captures| {
462            // Replace :variable_name with STRING
463            ": STRING".to_string()
464        })
465        .to_string()
466    }
467
468    /// Replace variable references in ARRAY element types with STRING
469    ///
470    /// Handles patterns like ARRAY<:element_type> -> ARRAY<STRING>
471    fn replace_variables_in_array_types(sql: &str) -> String {
472        use regex::Regex;
473
474        // Pattern to match ARRAY<:variable_type> (but not ARRAY<STRUCT<...>>)
475        // This is tricky - we need to avoid matching inside STRUCT definitions
476        // Simple approach: match ARRAY<:variable> where variable is not STRUCT
477        let re = Regex::new(r"ARRAY\s*<\s*:([a-zA-Z_][a-zA-Z0-9_]*)\s*>").unwrap();
478
479        re.replace_all(sql, |_caps: &regex::Captures| "ARRAY<STRING>".to_string())
480            .to_string()
481    }
482
483    /// Replace variable references in column definitions
484    ///
485    /// Handles patterns like `column_name :variable STRING` by removing the variable reference.
486    /// Example: `id :id_var STRING` -> `id STRING`
487    fn replace_variables_in_column_definitions(sql: &str) -> String {
488        use regex::Regex;
489
490        // Pattern to match column definitions with variable references
491        // Matches: word(s) :variable_name TYPE
492        // Example: "id :id_var STRING" -> "id STRING"
493        let re = Regex::new(r"(\w+)\s+:\w+\s+([A-Z][A-Z0-9_]*(?:<[^>]*>)?)").unwrap();
494
495        re.replace_all(sql, |caps: &regex::Captures| {
496            let col_name = caps.get(1).map(|m| m.as_str()).unwrap_or("");
497            let type_name = caps.get(2).map(|m| m.as_str()).unwrap_or("");
498            format!("{} {}", col_name, type_name)
499        })
500        .to_string()
501    }
502
503    /// Replace nested variable references recursively
504    ///
505    /// Handles patterns like ARRAY<STRUCT<field: :type>> by applying both
506    /// STRUCT and ARRAY replacements recursively.
507    fn replace_nested_variables(sql: &str) -> String {
508        let mut result = sql.to_string();
509        let mut changed = true;
510        let mut iterations = 0;
511        const MAX_ITERATIONS: usize = 10; // Prevent infinite loops
512
513        // Keep applying replacements until no more changes occur
514        while changed && iterations < MAX_ITERATIONS {
515            let before = result.clone();
516
517            // First replace variables in STRUCT types
518            result = Self::replace_variables_in_struct_types(&result);
519
520            // Then replace variables in ARRAY types
521            result = Self::replace_variables_in_array_types(&result);
522
523            // Check if anything changed
524            changed = before != result;
525            iterations += 1;
526        }
527
528        result
529    }
530
531    /// Extract STRUCT, ARRAY, and MAP column definitions and replace with placeholders
532    ///
533    /// sqlparser doesn't support these complex types, so we need to extract them manually
534    /// and replace with a simple type that can be parsed, then restore the original
535    /// type string after parsing.
536    ///
537    /// Assumes SQL is already normalized (single line, single spaces).
538    fn extract_complex_type_columns(sql: &str) -> (String, Vec<(String, String)>) {
539        use regex::Regex;
540
541        let mut column_types = Vec::new();
542        let mut result = sql.to_string();
543
544        // Use regex to find all STRUCT<...>, ARRAY<...>, and MAP<...> patterns with their preceding column names
545        // Pattern: word(s) followed by STRUCT<, ARRAY<, or MAP<, then match balanced brackets
546        let re = Regex::new(r"(\w+)\s+(STRUCT<|ARRAY<|MAP<)").unwrap();
547
548        // Find all matches and extract the full type
549        let mut matches_to_replace: Vec<(usize, usize, String, String)> = Vec::new();
550
551        for cap in re.captures_iter(sql) {
552            let col_name = cap.get(1).map(|m| m.as_str()).unwrap_or("");
553            let type_start = cap.get(0).map(|m| m.start()).unwrap_or(0);
554            let struct_or_array = cap.get(2).map(|m| m.as_str()).unwrap_or("");
555
556            // Find the matching closing bracket
557            // Start counting from the '<' in STRUCT<, ARRAY<, or MAP<
558            let bracket_start = type_start + col_name.len() + 1 + struct_or_array.len() - 1; // After "column_name STRUCT<", "column_name ARRAY<", or "column_name MAP<"
559            let mut bracket_count = 0;
560            let mut type_end = bracket_start;
561
562            for (idx, ch) in sql[bracket_start..].char_indices() {
563                let pos = bracket_start + idx;
564                if ch == '<' {
565                    bracket_count += 1;
566                } else if ch == '>' {
567                    bracket_count -= 1;
568                    if bracket_count == 0 {
569                        type_end = pos + 1;
570                        break;
571                    }
572                }
573            }
574
575            if bracket_count == 0 && type_end > type_start {
576                // Extract the full type (STRUCT<...>, ARRAY<...>, or MAP<...>)
577                // Start from after the column name and space
578                let type_start_pos = type_start + col_name.len() + 1;
579                let full_type = sql[type_start_pos..type_end].trim().to_string();
580                matches_to_replace.push((
581                    type_start_pos,
582                    type_end,
583                    col_name.to_string(),
584                    full_type,
585                ));
586            }
587        }
588
589        // Replace matches in reverse order
590        for (start, end, col_name, full_type) in matches_to_replace.iter().rev() {
591            column_types.push((col_name.clone(), full_type.clone()));
592            result.replace_range(*start..*end, "STRING");
593        }
594
595        (result, column_types)
596    }
597
598    /// Parse SQL and extract table definitions
599    ///
600    /// # Arguments
601    ///
602    /// * `sql` - SQL string containing CREATE TABLE, CREATE VIEW, or CREATE MATERIALIZED VIEW statements
603    ///
604    /// # Returns
605    ///
606    /// An `ImportResult` containing extracted tables/views and any parse errors.
607    ///
608    /// # Example - Standard SQL
609    ///
610    /// ```rust
611    /// use data_modelling_sdk::import::sql::SQLImporter;
612    ///
613    /// let importer = SQLImporter::new("postgres");
614    /// let sql = "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(100));";
615    /// let result = importer.parse(sql).unwrap();
616    /// assert_eq!(result.tables.len(), 1);
617    /// ```
618    ///
619    /// # Example - Databricks SQL with IDENTIFIER()
620    ///
621    /// ```rust
622    /// use data_modelling_sdk::import::sql::SQLImporter;
623    ///
624    /// let importer = SQLImporter::new("databricks");
625    /// let sql = "CREATE TABLE IDENTIFIER(:catalog || '.schema.table') (id STRING, name STRING);";
626    /// let result = importer.parse(sql).unwrap();
627    /// assert_eq!(result.tables.len(), 1);
628    /// assert_eq!(result.tables[0].name.as_deref(), Some("schema.table"));
629    /// ```
630    ///
631    /// # Example - Databricks SQL with Views
632    ///
633    /// ```rust
634    /// use data_modelling_sdk::import::sql::SQLImporter;
635    ///
636    /// let importer = SQLImporter::new("databricks");
637    /// let sql = "CREATE VIEW example_view AS SELECT * FROM table1;";
638    /// let result = importer.parse(sql).unwrap();
639    /// assert_eq!(result.tables.len(), 1);
640    /// ```
641    pub fn parse(&self, sql: &str) -> Result<ImportResult> {
642        // Preprocess SQL if Databricks dialect
643        let (preprocessed_sql, preprocessing_state, complex_types) = if self.dialect.to_lowercase()
644            == "databricks"
645        {
646            let mut state = PreprocessingState::new();
647            // Step 1: Preprocess MATERIALIZED VIEW (convert to CREATE VIEW for sqlparser compatibility)
648            let mut preprocessed = Self::preprocess_materialized_views(sql);
649            // Step 2: Remove table-level COMMENT clauses (sqlparser doesn't support them)
650            preprocessed = Self::preprocess_table_comment(&preprocessed);
651            // Step 3: Remove TBLPROPERTIES (sqlparser doesn't support it)
652            preprocessed = Self::preprocess_tblproperties(&preprocessed);
653            // Step 3.5: Remove CLUSTER BY clause (sqlparser doesn't support it)
654            preprocessed = Self::preprocess_cluster_by(&preprocessed);
655            // Step 4: Replace IDENTIFIER() expressions
656            preprocessed = Self::preprocess_identifier_expressions(&preprocessed, &mut state);
657            // Step 5: Replace variable references in column definitions (e.g., "id :var STRING" -> "id STRING")
658            preprocessed = Self::replace_variables_in_column_definitions(&preprocessed);
659            // Step 6: Replace variable references in type definitions
660            // This replaces :variable_type with STRING in STRUCT and ARRAY types
661            preprocessed = Self::replace_nested_variables(&preprocessed);
662            // Step 7: Normalize SQL (handle multiline) before extraction
663            // Preserve quoted strings during normalization to avoid breaking COMMENT clauses
664            let normalized = Self::normalize_sql_preserving_quotes(&preprocessed);
665            // Step 7.5: Convert backslash-escaped quotes to SQL standard doubled quotes
666            // sqlparser doesn't support \' escape sequences, so convert to ''
667            let normalized = Self::convert_backslash_escaped_quotes(&normalized);
668            // Step 8: Extract STRUCT/ARRAY/MAP columns (sqlparser doesn't support them)
669            let (simplified_sql, complex_cols) = Self::extract_complex_type_columns(&normalized);
670            (simplified_sql, state, complex_cols)
671        } else {
672            (sql.to_string(), PreprocessingState::new(), Vec::new())
673        };
674
675        let dialect = self.dialect_impl();
676        let statements = match Parser::parse_sql(dialect.as_ref(), &preprocessed_sql) {
677            Ok(stmts) => stmts,
678            Err(e) => {
679                return Ok(ImportResult {
680                    tables: Vec::new(),
681                    tables_requiring_name: Vec::new(),
682                    errors: vec![ImportError::ParseError(e.to_string())],
683                    ai_suggestions: None,
684                });
685            }
686        };
687
688        let mut tables = Vec::new();
689        let mut errors = Vec::new();
690        let mut tables_requiring_name = Vec::new();
691
692        for (idx, stmt) in statements.into_iter().enumerate() {
693            match stmt {
694                Statement::CreateTable(create) => {
695                    match self.parse_create_table_with_preprocessing(
696                        idx,
697                        &create.name,
698                        &create.columns,
699                        &create.constraints,
700                        &preprocessing_state,
701                        &complex_types,
702                    ) {
703                        Ok((table, requires_name)) => {
704                            if requires_name {
705                                tables_requiring_name.push(super::TableRequiringName {
706                                    table_index: idx,
707                                    suggested_name: None,
708                                });
709                            }
710                            tables.push(table);
711                        }
712                        Err(e) => errors.push(ImportError::ParseError(e)),
713                    }
714                }
715                Statement::CreateView { name, .. } => {
716                    match self.parse_create_view(idx, &name, &preprocessing_state) {
717                        Ok((table, requires_name)) => {
718                            if requires_name {
719                                tables_requiring_name.push(super::TableRequiringName {
720                                    table_index: idx,
721                                    suggested_name: None,
722                                });
723                            }
724                            tables.push(table);
725                        }
726                        Err(e) => errors.push(ImportError::ParseError(e)),
727                    }
728                }
729                _ => {
730                    // Other statements (INSERT, UPDATE, DELETE, etc.) are ignored.
731                }
732            }
733        }
734
735        Ok(ImportResult {
736            tables,
737            tables_requiring_name,
738            errors,
739            ai_suggestions: None,
740        })
741    }
742
743    /// Parse SQL with Liquibase format support
744    ///
745    /// Strips Liquibase directive comments (--liquibase formatted sql, --changeset, etc.)
746    /// before parsing the SQL.
747    ///
748    /// # Arguments
749    ///
750    /// * `sql` - SQL string with optional Liquibase comments
751    ///
752    /// # Returns
753    ///
754    /// An `ImportResult` containing extracted tables.
755    ///
756    /// # Example
757    ///
758    /// ```rust
759    /// use data_modelling_sdk::import::sql::SQLImporter;
760    ///
761    /// let importer = SQLImporter::new("postgres");
762    /// let sql = r#"
763    /// --liquibase formatted sql
764    /// --changeset user:1
765    /// CREATE TABLE users (id INT);
766    /// "#;
767    /// let result = importer.parse_liquibase(sql).unwrap();
768    /// ```
769    pub fn parse_liquibase(&self, sql: &str) -> Result<ImportResult> {
770        // Liquibase "formatted SQL" is still SQL, but often includes directive comments like:
771        // --liquibase formatted sql
772        // --changeset user:id
773        // We strip those comment lines, then parse the remaining SQL.
774        let cleaned = sql
775            .lines()
776            .filter(|l| {
777                let t = l.trim_start();
778                if !t.starts_with("--") {
779                    return true;
780                }
781                // Keep regular SQL comments? For now, drop all -- lines to avoid parser issues.
782                false
783            })
784            .collect::<Vec<_>>()
785            .join("\n");
786
787        self.parse(&cleaned)
788    }
789
790    fn dialect_impl(&self) -> Box<dyn Dialect + Send + Sync> {
791        match self.dialect.to_lowercase().as_str() {
792            "postgres" | "postgresql" => Box::new(PostgreSqlDialect {}),
793            "mysql" => Box::new(MySqlDialect {}),
794            "sqlite" => Box::new(SQLiteDialect {}),
795            "databricks" => Box::new(DatabricksDialect {}),
796            _ => Box::new(GenericDialect {}),
797        }
798    }
799
800    fn object_name_to_string(name: &ObjectName) -> String {
801        // Use final identifier (supports schema-qualified names).
802        name.0
803            .last()
804            .map(|ident| ident.value.clone())
805            .unwrap_or_else(|| name.to_string())
806    }
807
808    fn parse_create_table_with_preprocessing(
809        &self,
810        table_index: usize,
811        name: &ObjectName,
812        columns: &[ColumnDef],
813        constraints: &[TableConstraint],
814        preprocessing_state: &PreprocessingState,
815        complex_types: &[(String, String)],
816    ) -> std::result::Result<(TableData, bool), String> {
817        let mut table_name = Self::object_name_to_string(name);
818        let mut requires_name = false;
819
820        // Check if this is a placeholder table name from IDENTIFIER() preprocessing
821        if table_name.starts_with("__databricks_table_")
822            && let Some(original_expr) =
823                preprocessing_state.identifier_replacements.get(&table_name)
824        {
825            // Try to extract table name from the original expression
826            if let Some(extracted_name) = Self::extract_identifier_table_name(original_expr) {
827                table_name = extracted_name;
828            } else {
829                // Expression contains only variables - mark as requiring name
830                requires_name = true;
831            }
832        }
833
834        // Validate table name (warnings are logged but don't fail import)
835        if let Err(e) = validate_table_name(&table_name) {
836            // Log warning but continue - imported SQL may have valid but unusual names
837            tracing::warn!("Table name validation warning: {}", e);
838        }
839
840        // Collect PK columns from table-level constraints.
841        let mut pk_cols = std::collections::HashSet::<String>::new();
842        for c in constraints {
843            if let TableConstraint::PrimaryKey { columns, .. } = c {
844                for col in columns {
845                    pk_cols.insert(col.value.clone());
846                }
847            }
848        }
849
850        let mut out_cols = Vec::new();
851        for col in columns {
852            let mut nullable = true;
853            let mut is_pk = false;
854
855            for opt_def in &col.options {
856                match &opt_def.option {
857                    ColumnOption::NotNull => nullable = false,
858                    ColumnOption::Null => nullable = true,
859                    ColumnOption::Unique { is_primary, .. } => {
860                        if *is_primary {
861                            is_pk = true;
862                        }
863                    }
864                    _ => {}
865                }
866            }
867
868            if pk_cols.contains(&col.name.value) {
869                is_pk = true;
870            }
871
872            let col_name = col.name.value.clone();
873            let mut data_type = col.data_type.to_string();
874            let mut description = None;
875
876            // Extract COMMENT clause from column options
877            for opt_def in &col.options {
878                if let ColumnOption::Comment(comment) = &opt_def.option {
879                    description = Some(comment.clone());
880                }
881            }
882
883            // Restore complex types (STRUCT/ARRAY) if this column was extracted
884            if let Some((_, original_type)) =
885                complex_types.iter().find(|(name, _)| name == &col_name)
886            {
887                data_type = original_type.clone();
888            }
889
890            // Validate column name and data type (warnings are logged but don't fail import)
891            if let Err(e) = validate_column_name(&col_name) {
892                tracing::warn!("Column name validation warning for '{}': {}", col_name, e);
893            }
894            if let Err(e) = validate_data_type(&data_type) {
895                tracing::warn!("Data type validation warning for '{}': {}", data_type, e);
896            }
897
898            out_cols.push(ColumnData {
899                name: col_name,
900                data_type,
901                nullable,
902                primary_key: is_pk,
903                description,
904                quality: None,
905                ref_path: None,
906                enum_values: None,
907            });
908        }
909
910        Ok((
911            TableData {
912                table_index,
913                name: Some(table_name),
914                columns: out_cols,
915            },
916            requires_name,
917        ))
918    }
919
920    #[allow(dead_code)] // Used by non-Databricks dialects
921    fn parse_create_table(
922        &self,
923        table_index: usize,
924        name: &ObjectName,
925        columns: &[ColumnDef],
926        constraints: &[TableConstraint],
927    ) -> std::result::Result<TableData, String> {
928        // Use empty preprocessing state for non-Databricks dialects
929        let empty_state = PreprocessingState::new();
930        self.parse_create_table_with_preprocessing(
931            table_index,
932            name,
933            columns,
934            constraints,
935            &empty_state,
936            &[],
937        )
938        .map(|(table, _)| table)
939    }
940
941    /// Parse CREATE VIEW statement
942    ///
943    /// Extracts view name and creates a TableData entry for the view.
944    /// Views are treated as table-like entities for data modeling purposes.
945    fn parse_create_view(
946        &self,
947        view_index: usize,
948        name: &ObjectName,
949        preprocessing_state: &PreprocessingState,
950    ) -> std::result::Result<(TableData, bool), String> {
951        let mut view_name = Self::object_name_to_string(name);
952        let mut requires_name = false;
953
954        // Check if this is a placeholder view name from IDENTIFIER() preprocessing
955        if view_name.starts_with("__databricks_table_")
956            && let Some(original_expr) = preprocessing_state.identifier_replacements.get(&view_name)
957        {
958            // Try to extract view name from the original expression
959            if let Some(extracted_name) = Self::extract_identifier_table_name(original_expr) {
960                view_name = extracted_name;
961            } else {
962                // Expression contains only variables - mark as requiring name
963                requires_name = true;
964            }
965        }
966
967        // Validate view name
968        if let Err(e) = validate_table_name(&view_name) {
969            tracing::warn!("View name validation warning: {}", e);
970        }
971
972        // Views don't have explicit column definitions in CREATE VIEW statements
973        // The columns are derived from the SELECT query, which we don't parse here
974        // So we create a view with empty columns - the actual columns would need
975        // to be extracted from the query if needed in the future
976        Ok((
977            TableData {
978                table_index: view_index,
979                name: Some(view_name),
980                columns: Vec::new(), // Views don't have explicit column definitions
981            },
982            requires_name,
983        ))
984    }
985}
986
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand: build an importer for `dialect`, parse `sql`, and unwrap
    /// the outer `Result` (per-statement errors remain inside the result).
    fn run(dialect: &str, sql: &str) -> ImportResult {
        SQLImporter::new(dialect).parse(sql).unwrap()
    }

    #[test]
    fn test_sql_importer_default() {
        // The default importer falls back to the generic dialect.
        assert_eq!(SQLImporter::default().dialect, "generic");
    }

    #[test]
    fn test_sql_importer_parse_basic() {
        let res = run(
            "postgres",
            "CREATE TABLE test (id INT PRIMARY KEY, name TEXT NOT NULL);",
        );
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);

        let table = &res.tables[0];
        assert_eq!(table.name.as_deref(), Some("test"));
        assert_eq!(table.columns.len(), 2);
        assert!(table.columns.iter().any(|c| c.name == "id" && c.primary_key));
        assert!(table.columns.iter().any(|c| c.name == "name" && !c.nullable));
    }

    #[test]
    fn test_sql_importer_parse_table_pk_constraint() {
        // A table-level PRIMARY KEY constraint must mark the column as PK.
        let res = run(
            "postgres",
            "CREATE TABLE t (id INT, name TEXT, CONSTRAINT pk PRIMARY KEY (id));",
        );
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
        assert!(res.tables[0]
            .columns
            .iter()
            .any(|c| c.name == "id" && c.primary_key));
    }

    #[test]
    fn test_sql_importer_parse_liquibase_formatted_sql() {
        // Liquibase directive comments are stripped before parsing.
        let res = SQLImporter::new("postgres")
            .parse_liquibase(
                "--liquibase formatted sql\n--changeset user:1\nCREATE TABLE test (id INT);\n",
            )
            .unwrap();
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
    }

    #[test]
    fn test_databricks_identifier_with_literal() {
        let res = run(
            "databricks",
            "CREATE TABLE IDENTIFIER('test_table') (id STRING);",
        );
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
        assert_eq!(res.tables[0].name.as_deref(), Some("test_table"));
    }

    #[test]
    fn test_databricks_identifier_with_variable() {
        let res = run(
            "databricks",
            "CREATE TABLE IDENTIFIER(:table_name) (id STRING);",
        );
        // A placeholder name is generated and the table is flagged as unnamed.
        assert_eq!(res.tables.len(), 1);
        let placeholder = res.tables[0].name.as_deref().unwrap();
        assert!(placeholder.starts_with("__databricks_table_"));
        assert_eq!(res.tables_requiring_name.len(), 1);
    }

    #[test]
    fn test_databricks_identifier_with_concatenation() {
        let res = run(
            "databricks",
            "CREATE TABLE IDENTIFIER(:catalog || '.schema.table') (id STRING);",
        );
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
        // The literal tail of the concatenation supplies the table name.
        assert_eq!(res.tables[0].name.as_deref(), Some("schema.table"));
    }

    #[test]
    fn test_databricks_variable_in_struct() {
        let res = run(
            "databricks",
            "CREATE TABLE example (metadata STRUCT<key: STRING, value: :variable_type, timestamp: TIMESTAMP>);",
        );
        if !res.errors.is_empty() {
            eprintln!("Parse errors: {:?}", res.errors);
        }
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
        // The variable placeholder is rewritten to STRING.
        assert!(res.tables[0].columns[0].data_type.contains("value: STRING"));
    }

    #[test]
    fn test_databricks_variable_in_array() {
        let res = run(
            "databricks",
            "CREATE TABLE example (items ARRAY<:element_type>);",
        );
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
        // The variable placeholder is rewritten to STRING.
        assert_eq!(res.tables[0].columns[0].data_type, "ARRAY<STRING>");
    }

    #[test]
    fn test_databricks_nested_variables() {
        let res = run(
            "databricks",
            "CREATE TABLE example (rulesTriggered ARRAY<STRUCT<id: STRING, name: STRING, alertOperation: STRUCT<name: STRING, revert: :variable_type, timestamp: TIMESTAMP>>>);",
        );
        if !res.errors.is_empty() {
            eprintln!("Parse errors: {:?}", res.errors);
        }
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
        // Deeply nested variables are rewritten to STRING as well.
        assert!(res.tables[0].columns[0].data_type.contains("revert: STRING"));
    }

    #[test]
    fn test_databricks_comment_variable() {
        let res = run(
            "databricks",
            "CREATE TABLE example (id STRING) COMMENT ':comment_variable';",
        );
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
    }

    #[test]
    fn test_databricks_tblproperties_variable() {
        let res = run(
            "databricks",
            "CREATE TABLE example (id STRING) TBLPROPERTIES ('key1' = ':variable_value', 'key2' = 'static_value');",
        );
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
    }

    #[test]
    fn test_databricks_column_variable() {
        // Column definitions of the form "name :variable TYPE" rely on
        // preprocessing to remove the variable token before parsing.
        let res = run(
            "databricks",
            "CREATE TABLE example (id :id_var STRING, name :name_var STRING);",
        );
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
        assert_eq!(res.tables[0].columns.len(), 2);
    }

    #[test]
    fn test_databricks_create_view() {
        // Views are imported as table-like entities.
        let res = run(
            "databricks",
            "CREATE VIEW example_view AS SELECT id, name FROM source_table;",
        );
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
        assert_eq!(res.tables[0].name.as_deref(), Some("example_view"));
    }

    #[test]
    fn test_databricks_view_with_identifier() {
        let res = run(
            "databricks",
            "CREATE VIEW IDENTIFIER(:catalog || '.schema.view_name') AS SELECT * FROM table1;",
        );
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
        // The view name comes out of the IDENTIFIER() expression.
        assert_eq!(res.tables[0].name.as_deref(), Some("schema.view_name"));
    }

    #[test]
    fn test_databricks_create_materialized_view() {
        // MATERIALIZED VIEW is preprocessed to CREATE VIEW for sqlparser compatibility.
        let res = run(
            "databricks",
            "CREATE MATERIALIZED VIEW mv_example AS SELECT id, name FROM source_table;",
        );
        assert!(res.errors.is_empty());
        assert_eq!(res.tables.len(), 1);
        assert_eq!(res.tables[0].name.as_deref(), Some("mv_example"));
    }
}