data_modelling_sdk/import/sql.rs
1//! SQL Import functionality
2//!
3//! Provides parsing of CREATE TABLE statements from various SQL dialects.
4//!
5//! Uses `sqlparser` to parse CREATE TABLE statements into SDK import primitives.
6//!
7//! # Validation
8//!
9//! All imported table and column names are validated for:
10//! - Valid identifier format
11//! - Maximum length limits
12//! - SQL reserved word detection
13
14use super::{ColumnData, ImportError, ImportResult, TableData};
15use crate::validation::input::{validate_column_name, validate_data_type, validate_table_name};
16use anyhow::Result;
17use sqlparser::ast::{ColumnDef, ColumnOption, ObjectName, Statement, TableConstraint};
18use sqlparser::dialect::{Dialect, GenericDialect, MySqlDialect, PostgreSqlDialect, SQLiteDialect};
19use sqlparser::parser::Parser;
20use std::collections::HashMap;
21
22/// Databricks SQL dialect implementation
23///
24/// Extends GenericDialect to support Databricks-specific syntax patterns:
25/// - Variable references (`:variable_name`) in identifiers
26/// - Backtick-quoted identifiers
27#[derive(Debug)]
28struct DatabricksDialect;
29
30impl Dialect for DatabricksDialect {
31 fn is_identifier_start(&self, ch: char) -> bool {
32 // Allow ':' as identifier start for variable references like :variable_name
33 ch.is_alphabetic() || ch == '_' || ch == ':'
34 }
35
36 fn is_identifier_part(&self, ch: char) -> bool {
37 // Allow ':' as identifier part for variable references
38 ch.is_alphanumeric() || ch == '_' || ch == ':'
39 }
40
41 fn is_delimited_identifier_start(&self, ch: char) -> bool {
42 // Support backtick-quoted identifiers (Databricks style)
43 ch == '"' || ch == '`' || ch == '['
44 }
45}
46
/// Tracks preprocessing transformations applied to SQL
#[derive(Debug, Default)]
struct PreprocessingState {
    /// Maps placeholder table names back to the original IDENTIFIER() expressions
    identifier_replacements: HashMap<String, String>,
    /// Tracks variable references replaced in type definitions
    #[allow(dead_code)] // Reserved for future use
    variable_replacements: Vec<(String, String)>,
}

impl PreprocessingState {
    /// Create a state with no recorded replacements.
    fn new() -> Self {
        Self::default()
    }
}
65
/// SQL Importer - parses CREATE TABLE statements
pub struct SQLImporter {
    /// SQL dialect to use for parsing
    pub dialect: String,
}

impl Default for SQLImporter {
    /// The default importer uses the permissive "generic" dialect.
    fn default() -> Self {
        SQLImporter {
            dialect: String::from("generic"),
        }
    }
}
79
80impl SQLImporter {
    /// Create a new SQL importer with the specified dialect
    ///
    /// # Arguments
    ///
    /// * `dialect` - SQL dialect name ("postgres", "mysql", "sqlite", "generic", "databricks")
    ///
    /// # Supported Dialects
    ///
    /// - **postgres** / **postgresql**: PostgreSQL dialect
    /// - **mysql**: MySQL dialect
    /// - **sqlite**: SQLite dialect
    /// - **generic**: Generic SQL dialect (default)
    /// - **databricks**: Databricks SQL dialect with support for:
    ///   - `IDENTIFIER()` function calls in table/view names
    ///   - Variable references (`:variable_name`) in type definitions, column definitions, and metadata
    ///   - `STRUCT` and `ARRAY` complex types
    ///   - `CREATE VIEW` and `CREATE MATERIALIZED VIEW` statements
    ///
    /// Any unrecognized dialect name is accepted here; it falls back to the
    /// generic dialect when parsing (see `dialect_impl`).
    ///
    /// # Example
    ///
    /// ```rust
    /// use data_modelling_sdk::import::sql::SQLImporter;
    ///
    /// // Standard SQL dialect
    /// let importer = SQLImporter::new("postgres");
    ///
    /// // Databricks SQL dialect
    /// let databricks_importer = SQLImporter::new("databricks");
    /// ```
    pub fn new(dialect: &str) -> Self {
        Self {
            dialect: dialect.to_string(),
        }
    }
115
116 /// Preprocess Databricks SQL to handle IDENTIFIER() expressions
117 ///
118 /// Replaces IDENTIFIER() function calls with placeholder table names
119 /// and tracks the original expressions for later extraction.
120 fn preprocess_identifier_expressions(sql: &str, state: &mut PreprocessingState) -> String {
121 use regex::Regex;
122
123 // Pattern to match IDENTIFIER(...) expressions
124 let re = Regex::new(r"(?i)IDENTIFIER\s*\(\s*([^)]+)\s*\)").unwrap();
125 let mut counter = 0;
126
127 re.replace_all(sql, |caps: ®ex::Captures| {
128 let expr = caps.get(1).map(|m| m.as_str()).unwrap_or("");
129 counter += 1;
130 let placeholder = format!("__databricks_table_{}__", counter);
131
132 // Store the mapping for later extraction
133 state
134 .identifier_replacements
135 .insert(placeholder.clone(), expr.to_string());
136
137 placeholder
138 })
139 .to_string()
140 }
141
142 /// Extract table name from IDENTIFIER() expression
143 ///
144 /// If the expression contains string literals, extracts and constructs the table name.
145 /// Returns None if expression contains only variables.
146 fn extract_identifier_table_name(expr: &str) -> Option<String> {
147 use regex::Regex;
148
149 // Check if expression contains string literals (single or double quoted)
150 let literal_re = Regex::new(r#"(?:'([^']*)'|"([^"]*)")"#).unwrap();
151 let mut parts = Vec::new();
152
153 // Extract all string literals
154 for cap in literal_re.captures_iter(expr) {
155 if let Some(m) = cap.get(1) {
156 parts.push(m.as_str().to_string());
157 } else if let Some(m) = cap.get(2) {
158 parts.push(m.as_str().to_string());
159 }
160 }
161
162 if parts.is_empty() {
163 // No literals found - expression contains only variables
164 return None;
165 }
166
167 // Join literals and remove leading/trailing dots
168 let result = parts.join("");
169 Some(result.trim_matches('.').to_string())
170 }
171
172 /// Handle IDENTIFIER() expressions containing only variables
173 ///
174 /// Creates placeholder table names and marks tables as requiring name resolution.
175 #[allow(dead_code)] // Reserved for future use
176 fn handle_identifier_variables(placeholder: &str, _expr: &str) -> String {
177 // Return the placeholder as-is - it will be marked in tables_requiring_name
178 placeholder.to_string()
179 }
180
181 /// Preprocess CREATE MATERIALIZED VIEW to CREATE VIEW
182 ///
183 /// sqlparser may not support MATERIALIZED VIEW directly, so we convert it to CREATE VIEW
184 /// This allows parsing to succeed while preserving the intent.
185 fn preprocess_materialized_views(sql: &str) -> String {
186 use regex::Regex;
187
188 // Replace CREATE MATERIALIZED VIEW with CREATE VIEW
189 let re = Regex::new(r"(?i)CREATE\s+MATERIALIZED\s+VIEW").unwrap();
190 re.replace_all(sql, "CREATE VIEW").to_string()
191 }
192
193 /// Preprocess table-level COMMENT clause removal
194 ///
195 /// sqlparser does not support table-level COMMENT clauses, so we remove them before parsing.
196 /// Handles both single-quoted and double-quoted COMMENT strings.
197 /// Table-level COMMENT appears after the closing parenthesis, not inside column definitions.
198 fn preprocess_table_comment(sql: &str) -> String {
199 use regex::Regex;
200
201 // Remove COMMENT clause that appears after closing parenthesis of CREATE TABLE
202 // Pattern: ) followed by whitespace, then COMMENT, then quoted string
203 // This ensures we only match table-level COMMENT, not column-level COMMENT
204 // Match: )\s+COMMENT\s+['"]...['"]
205 let re_single = Regex::new(r#"(?i)\)\s+COMMENT\s+'[^']*'"#).unwrap();
206 let re_double = Regex::new(r#"(?i)\)\s+COMMENT\s+"[^"]*""#).unwrap();
207
208 // Replace with just the closing parenthesis
209 let result = re_single.replace_all(sql, ")");
210 re_double.replace_all(&result, ")").to_string()
211 }
212
    /// Preprocess TBLPROPERTIES clause removal
    ///
    /// sqlparser does not support TBLPROPERTIES, so we remove it before parsing.
    /// Because the property list may contain nested parentheses, a regex alone
    /// is not enough: the opening `TBLPROPERTIES (` is located with a regex and
    /// the matching `)` is then found by counting paren depth by hand.
    fn preprocess_tblproperties(sql: &str) -> String {
        use regex::Regex;

        let mut result = sql.to_string();
        let re = Regex::new(r"(?i)TBLPROPERTIES\s*\(").unwrap();

        // Repeatedly find the next TBLPROPERTIES occurrence and splice it out.
        // `search_start` is a byte offset into `result`, which shrinks as
        // clauses are removed.
        let mut search_start = 0;
        while let Some(m) = re.find_at(&result, search_start) {
            let start = m.start();
            let mut pos = m.end();
            let mut paren_count = 1; // counts the '(' matched by the regex

            // Scan forward for the balancing ')'. `pos` advances by each
            // character's UTF-8 width so slicing stays on char boundaries.
            let bytes = result.as_bytes();
            while pos < bytes.len() && paren_count > 0 {
                if let Some(ch) = result[pos..].chars().next() {
                    if ch == '(' {
                        paren_count += 1;
                    } else if ch == ')' {
                        paren_count -= 1;
                    }
                    pos += ch.len_utf8();
                } else {
                    break;
                }
            }

            if paren_count == 0 {
                // Balanced: remove the whole clause (keyword through closing
                // paren) and resume at the same offset - the text shifted left.
                result.replace_range(start..pos, "");
                search_start = start;
            } else {
                // Unbalanced parentheses: leave the text as-is and skip past
                // it so the loop cannot spin on the same match forever.
                search_start = pos;
            }
        }

        result
    }
260
    /// Preprocess CLUSTER BY clause removal
    ///
    /// sqlparser does not support CLUSTER BY, so we remove it before parsing.
    /// Handles `CLUSTER BY AUTO`, `CLUSTER BY (col, ...)`, and a bare column
    /// list.
    ///
    /// NOTE(review): the bare-column alternative `[\w,\s]+` is greedy and may
    /// also consume words that follow the column list - confirm against real
    /// Databricks DDL samples.
    fn preprocess_cluster_by(sql: &str) -> String {
        use regex::Regex;

        // Remove CLUSTER BY followed by AUTO, a parenthesised list, or a bare
        // comma-separated column list (greedy - see note above).
        let re = Regex::new(r"(?i)\s+CLUSTER\s+BY\s+(?:AUTO|\([^)]*\)|[\w,\s]+)").unwrap();
        re.replace_all(sql, "").to_string()
    }
272
273 /// Normalize SQL while preserving quoted strings
274 ///
275 /// Converts multiline SQL to single line, but preserves quoted strings
276 /// (both single and double quotes) to avoid breaking COMMENT clauses and other string literals.
277 /// Handles escape sequences: `\'` (escaped quote) and `\\` (escaped backslash).
278 fn normalize_sql_preserving_quotes(sql: &str) -> String {
279 let mut result = String::with_capacity(sql.len());
280 let mut chars = sql.chars().peekable();
281 let mut in_single_quote = false;
282 let mut in_double_quote = false;
283 let mut last_char_was_space = false;
284
285 while let Some(ch) = chars.next() {
286 match ch {
287 '\\' if in_single_quote || in_double_quote => {
288 // Handle escape sequences inside quoted strings
289 // \' or \" or \\ - preserve the escape sequence
290 if let Some(&next_ch) = chars.peek() {
291 result.push(ch);
292 result.push(next_ch);
293 chars.next(); // Consume the escaped character
294 last_char_was_space = false;
295 } else {
296 // Backslash at end of string - just add it
297 result.push(ch);
298 last_char_was_space = false;
299 }
300 }
301 '\'' if !in_double_quote => {
302 // Check if this is an escaped quote (doubled quotes: '')
303 // In SQL standard, '' inside a string means a single quote
304 if in_single_quote && chars.peek() == Some(&'\'') {
305 // Doubled quote - this is an escaped quote, not the end of string
306 result.push(ch);
307 result.push(ch);
308 chars.next(); // Consume the second quote
309 last_char_was_space = false;
310 } else {
311 // Regular quote - toggle quote state
312 in_single_quote = !in_single_quote;
313 result.push(ch);
314 last_char_was_space = false;
315 }
316 }
317 '"' if !in_single_quote => {
318 // Check if this is an escaped quote (doubled quotes: "")
319 if in_double_quote && chars.peek() == Some(&'"') {
320 // Doubled quote - this is an escaped quote, not the end of string
321 result.push(ch);
322 result.push(ch);
323 chars.next(); // Consume the second quote
324 last_char_was_space = false;
325 } else {
326 // Regular quote - toggle quote state
327 in_double_quote = !in_double_quote;
328 result.push(ch);
329 last_char_was_space = false;
330 }
331 }
332 '\n' | '\r' => {
333 if in_single_quote || in_double_quote {
334 // Replace newlines inside quoted strings with space
335 // sqlparser doesn't support multiline string literals
336 if !last_char_was_space {
337 result.push(' ');
338 last_char_was_space = true;
339 }
340 } else {
341 // Replace newlines outside quotes with space
342 if !last_char_was_space {
343 result.push(' ');
344 last_char_was_space = true;
345 }
346 }
347 }
348 ' ' | '\t' => {
349 if in_single_quote || in_double_quote {
350 // Preserve spaces inside quoted strings
351 result.push(ch);
352 last_char_was_space = false;
353 } else {
354 // Collapse multiple spaces to single space
355 if !last_char_was_space {
356 result.push(' ');
357 last_char_was_space = true;
358 }
359 }
360 }
361 '-' if !in_single_quote && !in_double_quote => {
362 // Check for SQL comment (--)
363 if let Some(&'-') = chars.peek() {
364 // Skip rest of line comment
365 for c in chars.by_ref() {
366 if c == '\n' || c == '\r' {
367 break;
368 }
369 }
370 if !last_char_was_space {
371 result.push(' ');
372 last_char_was_space = true;
373 }
374 } else {
375 result.push(ch);
376 last_char_was_space = false;
377 }
378 }
379 _ => {
380 result.push(ch);
381 last_char_was_space = false;
382 }
383 }
384 }
385
386 result.trim().to_string()
387 }
388
389 /// Convert backslash-escaped quotes to SQL standard doubled quotes
390 ///
391 /// sqlparser doesn't support `\'` escape sequences, so we convert them to `''`
392 /// which is the SQL standard way to escape quotes in string literals.
393 fn convert_backslash_escaped_quotes(sql: &str) -> String {
394 let mut result = String::with_capacity(sql.len());
395 let mut chars = sql.chars().peekable();
396 let mut in_single_quote = false;
397 let mut in_double_quote = false;
398
399 while let Some(ch) = chars.next() {
400 match ch {
401 '\\' if (in_single_quote || in_double_quote) => {
402 // Handle escape sequences inside quoted strings
403 if let Some(&next_ch) = chars.peek() {
404 match next_ch {
405 '\'' if in_single_quote => {
406 // Convert \' to '' (SQL standard escaped quote)
407 result.push_str("''");
408 chars.next(); // Consume the escaped quote
409 }
410 '"' if in_double_quote => {
411 // Convert \" to "" (SQL standard escaped quote)
412 result.push_str("\"\"");
413 chars.next(); // Consume the escaped quote
414 }
415 '\\' => {
416 // Convert \\ to \\ (keep as is, but we need to handle it)
417 result.push('\\');
418 result.push('\\');
419 chars.next(); // Consume the escaped backslash
420 }
421 _ => {
422 // Other escape sequences - preserve as is
423 result.push(ch);
424 result.push(next_ch);
425 chars.next();
426 }
427 }
428 } else {
429 // Backslash at end - just add it
430 result.push(ch);
431 }
432 }
433 '\'' if !in_double_quote => {
434 in_single_quote = !in_single_quote;
435 result.push(ch);
436 }
437 '"' if !in_single_quote => {
438 in_double_quote = !in_double_quote;
439 result.push(ch);
440 }
441 _ => {
442 result.push(ch);
443 }
444 }
445 }
446
447 result
448 }
449
450 /// Replace variable references in STRUCT field types with STRING
451 ///
452 /// Handles patterns like STRUCT<field: :variable_type> -> STRUCT<field: STRING>
453 fn replace_variables_in_struct_types(sql: &str) -> String {
454 use regex::Regex;
455
456 // Pattern to match :variable_type in STRUCT field type definitions
457 // Matches: :variable_name where it appears after a colon (field: :variable)
458 // The pattern looks for colon, optional whitespace, colon, then variable name
459 let re = Regex::new(r":\s*:([a-zA-Z_][a-zA-Z0-9_]*)").unwrap();
460
461 re.replace_all(sql, |_caps: ®ex::Captures| {
462 // Replace :variable_name with STRING
463 ": STRING".to_string()
464 })
465 .to_string()
466 }
467
468 /// Replace variable references in ARRAY element types with STRING
469 ///
470 /// Handles patterns like ARRAY<:element_type> -> ARRAY<STRING>
471 fn replace_variables_in_array_types(sql: &str) -> String {
472 use regex::Regex;
473
474 // Pattern to match ARRAY<:variable_type> (but not ARRAY<STRUCT<...>>)
475 // This is tricky - we need to avoid matching inside STRUCT definitions
476 // Simple approach: match ARRAY<:variable> where variable is not STRUCT
477 let re = Regex::new(r"ARRAY\s*<\s*:([a-zA-Z_][a-zA-Z0-9_]*)\s*>").unwrap();
478
479 re.replace_all(sql, |_caps: ®ex::Captures| "ARRAY<STRING>".to_string())
480 .to_string()
481 }
482
483 /// Replace variable references in column definitions
484 ///
485 /// Handles patterns like `column_name :variable STRING` by removing the variable reference.
486 /// Example: `id :id_var STRING` -> `id STRING`
487 fn replace_variables_in_column_definitions(sql: &str) -> String {
488 use regex::Regex;
489
490 // Pattern to match column definitions with variable references
491 // Matches: word(s) :variable_name TYPE
492 // Example: "id :id_var STRING" -> "id STRING"
493 let re = Regex::new(r"(\w+)\s+:\w+\s+([A-Z][A-Z0-9_]*(?:<[^>]*>)?)").unwrap();
494
495 re.replace_all(sql, |caps: ®ex::Captures| {
496 let col_name = caps.get(1).map(|m| m.as_str()).unwrap_or("");
497 let type_name = caps.get(2).map(|m| m.as_str()).unwrap_or("");
498 format!("{} {}", col_name, type_name)
499 })
500 .to_string()
501 }
502
503 /// Replace nested variable references recursively
504 ///
505 /// Handles patterns like ARRAY<STRUCT<field: :type>> by applying both
506 /// STRUCT and ARRAY replacements recursively.
507 fn replace_nested_variables(sql: &str) -> String {
508 let mut result = sql.to_string();
509 let mut changed = true;
510 let mut iterations = 0;
511 const MAX_ITERATIONS: usize = 10; // Prevent infinite loops
512
513 // Keep applying replacements until no more changes occur
514 while changed && iterations < MAX_ITERATIONS {
515 let before = result.clone();
516
517 // First replace variables in STRUCT types
518 result = Self::replace_variables_in_struct_types(&result);
519
520 // Then replace variables in ARRAY types
521 result = Self::replace_variables_in_array_types(&result);
522
523 // Check if anything changed
524 changed = before != result;
525 iterations += 1;
526 }
527
528 result
529 }
530
    /// Extract STRUCT, ARRAY, and MAP column definitions and replace with placeholders
    ///
    /// sqlparser doesn't support these complex types, so we need to extract them manually
    /// and replace each with a simple type (STRING) that can be parsed, then restore the
    /// original type string after parsing.
    ///
    /// Returns the simplified SQL plus `(column_name, original_type_text)` pairs.
    ///
    /// Assumes SQL is already normalized (single line, exactly one space
    /// between the column name and its type) - the byte-offset arithmetic
    /// below depends on that.
    fn extract_complex_type_columns(sql: &str) -> (String, Vec<(String, String)>) {
        use regex::Regex;

        let mut column_types = Vec::new();
        let mut result = sql.to_string();

        // The regex only anchors the opening "name STRUCT<"/"ARRAY<"/"MAP<";
        // the body is matched by bracket counting because generics may nest
        // (e.g. ARRAY<STRUCT<...>>).
        let re = Regex::new(r"(\w+)\s+(STRUCT<|ARRAY<|MAP<)").unwrap();

        // (start, end, column name, full type text) for each replacement,
        // collected first so `result` can be edited afterwards.
        let mut matches_to_replace: Vec<(usize, usize, String, String)> = Vec::new();

        for cap in re.captures_iter(sql) {
            let col_name = cap.get(1).map(|m| m.as_str()).unwrap_or("");
            let type_start = cap.get(0).map(|m| m.start()).unwrap_or(0);
            let struct_or_array = cap.get(2).map(|m| m.as_str()).unwrap_or("");

            // Byte offset of the '<' that opens the generic parameter list:
            // just past "col_name " plus the type keyword (keyword text
            // includes the '<', hence the trailing "- 1").
            let bracket_start = type_start + col_name.len() + 1 + struct_or_array.len() - 1;
            let mut bracket_count = 0;
            let mut type_end = bracket_start;

            // Walk forward counting '<'/'>' to find the balancing '>'.
            for (idx, ch) in sql[bracket_start..].char_indices() {
                let pos = bracket_start + idx;
                if ch == '<' {
                    bracket_count += 1;
                } else if ch == '>' {
                    bracket_count -= 1;
                    if bracket_count == 0 {
                        type_end = pos + 1; // one past the closing '>'
                        break;
                    }
                }
            }

            if bracket_count == 0 && type_end > type_start {
                // Record the full type text (e.g. "STRUCT<a: INT>"), starting
                // just after "col_name ".
                let type_start_pos = type_start + col_name.len() + 1;
                let full_type = sql[type_start_pos..type_end].trim().to_string();
                matches_to_replace.push((
                    type_start_pos,
                    type_end,
                    col_name.to_string(),
                    full_type,
                ));
            }
        }

        // Apply replacements back-to-front so earlier byte offsets stay valid.
        for (start, end, col_name, full_type) in matches_to_replace.iter().rev() {
            column_types.push((col_name.clone(), full_type.clone()));
            result.replace_range(*start..*end, "STRING");
        }

        (result, column_types)
    }
597
    /// Parse SQL and extract table definitions
    ///
    /// # Arguments
    ///
    /// * `sql` - SQL string containing CREATE TABLE, CREATE VIEW, or CREATE MATERIALIZED VIEW statements
    ///
    /// # Returns
    ///
    /// An `ImportResult` containing extracted tables/views and any parse errors.
    /// A top-level parse failure is reported inside `ImportResult::errors`
    /// rather than as an `Err` - callers always get a result to inspect.
    ///
    /// # Example - Standard SQL
    ///
    /// ```rust
    /// use data_modelling_sdk::import::sql::SQLImporter;
    ///
    /// let importer = SQLImporter::new("postgres");
    /// let sql = "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(100));";
    /// let result = importer.parse(sql).unwrap();
    /// assert_eq!(result.tables.len(), 1);
    /// ```
    ///
    /// # Example - Databricks SQL with IDENTIFIER()
    ///
    /// ```rust
    /// use data_modelling_sdk::import::sql::SQLImporter;
    ///
    /// let importer = SQLImporter::new("databricks");
    /// let sql = "CREATE TABLE IDENTIFIER(:catalog || '.schema.table') (id STRING, name STRING);";
    /// let result = importer.parse(sql).unwrap();
    /// assert_eq!(result.tables.len(), 1);
    /// assert_eq!(result.tables[0].name.as_deref(), Some("schema.table"));
    /// ```
    ///
    /// # Example - Databricks SQL with Views
    ///
    /// ```rust
    /// use data_modelling_sdk::import::sql::SQLImporter;
    ///
    /// let importer = SQLImporter::new("databricks");
    /// let sql = "CREATE VIEW example_view AS SELECT * FROM table1;";
    /// let result = importer.parse(sql).unwrap();
    /// assert_eq!(result.tables.len(), 1);
    /// ```
    pub fn parse(&self, sql: &str) -> Result<ImportResult> {
        // Databricks SQL goes through a preprocessing pipeline that rewrites
        // constructs sqlparser cannot handle; other dialects parse as-is.
        let (preprocessed_sql, preprocessing_state, complex_types) = if self.dialect.to_lowercase()
            == "databricks"
        {
            let mut state = PreprocessingState::new();
            // Step 1: Preprocess MATERIALIZED VIEW (convert to CREATE VIEW for sqlparser compatibility)
            let mut preprocessed = Self::preprocess_materialized_views(sql);
            // Step 2: Remove table-level COMMENT clauses (sqlparser doesn't support them)
            preprocessed = Self::preprocess_table_comment(&preprocessed);
            // Step 3: Remove TBLPROPERTIES (sqlparser doesn't support it)
            preprocessed = Self::preprocess_tblproperties(&preprocessed);
            // Step 3.5: Remove CLUSTER BY clause (sqlparser doesn't support it)
            preprocessed = Self::preprocess_cluster_by(&preprocessed);
            // Step 4: Replace IDENTIFIER() expressions with placeholders,
            // recording the originals in `state` for later name recovery
            preprocessed = Self::preprocess_identifier_expressions(&preprocessed, &mut state);
            // Step 5: Replace variable references in column definitions (e.g. "id :var STRING" -> "id STRING")
            preprocessed = Self::replace_variables_in_column_definitions(&preprocessed);
            // Step 6: Replace variable references in type definitions
            // (":variable_type" -> STRING inside STRUCT and ARRAY types)
            preprocessed = Self::replace_nested_variables(&preprocessed);
            // Step 7: Normalize SQL (handle multiline) before extraction,
            // preserving quoted strings so COMMENT clauses stay intact
            let normalized = Self::normalize_sql_preserving_quotes(&preprocessed);
            // Step 7.5: Convert backslash-escaped quotes to SQL standard
            // doubled quotes (sqlparser doesn't support \' escapes)
            let normalized = Self::convert_backslash_escaped_quotes(&normalized);
            // Step 8: Extract STRUCT/ARRAY/MAP columns (sqlparser doesn't
            // support them); originals are restored after parsing
            let (simplified_sql, complex_cols) = Self::extract_complex_type_columns(&normalized);
            (simplified_sql, state, complex_cols)
        } else {
            (sql.to_string(), PreprocessingState::new(), Vec::new())
        };

        let dialect = self.dialect_impl();
        let statements = match Parser::parse_sql(dialect.as_ref(), &preprocessed_sql) {
            Ok(stmts) => stmts,
            Err(e) => {
                // Parse failure is data, not a hard error: return it in the
                // result so callers can surface it to the user.
                return Ok(ImportResult {
                    tables: Vec::new(),
                    tables_requiring_name: Vec::new(),
                    errors: vec![ImportError::ParseError(e.to_string())],
                    ai_suggestions: None,
                });
            }
        };

        let mut tables = Vec::new();
        let mut errors = Vec::new();
        let mut tables_requiring_name = Vec::new();

        // `idx` is the statement index, also used as the table index so that
        // `tables_requiring_name` entries can be matched back to their tables.
        for (idx, stmt) in statements.into_iter().enumerate() {
            match stmt {
                Statement::CreateTable(create) => {
                    match self.parse_create_table_with_preprocessing(
                        idx,
                        &create.name,
                        &create.columns,
                        &create.constraints,
                        &preprocessing_state,
                        &complex_types,
                    ) {
                        Ok((table, requires_name)) => {
                            if requires_name {
                                // IDENTIFIER() expression held only variables:
                                // the caller must supply a concrete name.
                                tables_requiring_name.push(super::TableRequiringName {
                                    table_index: idx,
                                    suggested_name: None,
                                });
                            }
                            tables.push(table);
                        }
                        Err(e) => errors.push(ImportError::ParseError(e)),
                    }
                }
                Statement::CreateView { name, .. } => {
                    match self.parse_create_view(idx, &name, &preprocessing_state) {
                        Ok((table, requires_name)) => {
                            if requires_name {
                                tables_requiring_name.push(super::TableRequiringName {
                                    table_index: idx,
                                    suggested_name: None,
                                });
                            }
                            tables.push(table);
                        }
                        Err(e) => errors.push(ImportError::ParseError(e)),
                    }
                }
                _ => {
                    // Other statements (INSERT, UPDATE, DELETE, etc.) are ignored.
                }
            }
        }

        Ok(ImportResult {
            tables,
            tables_requiring_name,
            errors,
            ai_suggestions: None,
        })
    }
742
743 /// Parse SQL with Liquibase format support
744 ///
745 /// Strips Liquibase directive comments (--liquibase formatted sql, --changeset, etc.)
746 /// before parsing the SQL.
747 ///
748 /// # Arguments
749 ///
750 /// * `sql` - SQL string with optional Liquibase comments
751 ///
752 /// # Returns
753 ///
754 /// An `ImportResult` containing extracted tables.
755 ///
756 /// # Example
757 ///
758 /// ```rust
759 /// use data_modelling_sdk::import::sql::SQLImporter;
760 ///
761 /// let importer = SQLImporter::new("postgres");
762 /// let sql = r#"
763 /// --liquibase formatted sql
764 /// --changeset user:1
765 /// CREATE TABLE users (id INT);
766 /// "#;
767 /// let result = importer.parse_liquibase(sql).unwrap();
768 /// ```
769 pub fn parse_liquibase(&self, sql: &str) -> Result<ImportResult> {
770 // Liquibase "formatted SQL" is still SQL, but often includes directive comments like:
771 // --liquibase formatted sql
772 // --changeset user:id
773 // We strip those comment lines, then parse the remaining SQL.
774 let cleaned = sql
775 .lines()
776 .filter(|l| {
777 let t = l.trim_start();
778 if !t.starts_with("--") {
779 return true;
780 }
781 // Keep regular SQL comments? For now, drop all -- lines to avoid parser issues.
782 false
783 })
784 .collect::<Vec<_>>()
785 .join("\n");
786
787 self.parse(&cleaned)
788 }
789
790 fn dialect_impl(&self) -> Box<dyn Dialect + Send + Sync> {
791 match self.dialect.to_lowercase().as_str() {
792 "postgres" | "postgresql" => Box::new(PostgreSqlDialect {}),
793 "mysql" => Box::new(MySqlDialect {}),
794 "sqlite" => Box::new(SQLiteDialect {}),
795 "databricks" => Box::new(DatabricksDialect {}),
796 _ => Box::new(GenericDialect {}),
797 }
798 }
799
800 fn object_name_to_string(name: &ObjectName) -> String {
801 // Use final identifier (supports schema-qualified names).
802 name.0
803 .last()
804 .map(|ident| ident.value.clone())
805 .unwrap_or_else(|| name.to_string())
806 }
807
    /// Build a `TableData` from a parsed CREATE TABLE statement, resolving
    /// placeholder names recorded during Databricks preprocessing and
    /// restoring extracted complex column types.
    ///
    /// Returns `(table, requires_name)`, where `requires_name` is true when
    /// the table name came from an IDENTIFIER() expression containing only
    /// variables, so the caller must ask the user for a concrete name.
    /// Name/type validation failures are logged as warnings, never fatal.
    fn parse_create_table_with_preprocessing(
        &self,
        table_index: usize,
        name: &ObjectName,
        columns: &[ColumnDef],
        constraints: &[TableConstraint],
        preprocessing_state: &PreprocessingState,
        complex_types: &[(String, String)],
    ) -> std::result::Result<(TableData, bool), String> {
        let mut table_name = Self::object_name_to_string(name);
        let mut requires_name = false;

        // Placeholder names come from IDENTIFIER() preprocessing; try to map
        // them back to a concrete name using the recorded expression.
        if table_name.starts_with("__databricks_table_")
            && let Some(original_expr) =
                preprocessing_state.identifier_replacements.get(&table_name)
        {
            if let Some(extracted_name) = Self::extract_identifier_table_name(original_expr) {
                table_name = extracted_name;
            } else {
                // Expression contains only variables - mark as requiring name
                requires_name = true;
            }
        }

        // Validate table name (warnings are logged but don't fail import -
        // imported SQL may have valid but unusual names)
        if let Err(e) = validate_table_name(&table_name) {
            tracing::warn!("Table name validation warning: {}", e);
        }

        // Collect PK columns from table-level PRIMARY KEY constraints so they
        // can be merged with inline PRIMARY KEY column options below.
        let mut pk_cols = std::collections::HashSet::<String>::new();
        for c in constraints {
            if let TableConstraint::PrimaryKey { columns, .. } = c {
                for col in columns {
                    pk_cols.insert(col.value.clone());
                }
            }
        }

        let mut out_cols = Vec::new();
        for col in columns {
            // Columns default to nullable unless NOT NULL is present.
            let mut nullable = true;
            let mut is_pk = false;

            for opt_def in &col.options {
                match &opt_def.option {
                    ColumnOption::NotNull => nullable = false,
                    ColumnOption::Null => nullable = true,
                    // Inline `PRIMARY KEY` is modelled as Unique{is_primary}.
                    ColumnOption::Unique { is_primary, .. } => {
                        if *is_primary {
                            is_pk = true;
                        }
                    }
                    _ => {}
                }
            }

            // Table-level PRIMARY KEY constraint also marks the column.
            if pk_cols.contains(&col.name.value) {
                is_pk = true;
            }

            let col_name = col.name.value.clone();
            let mut data_type = col.data_type.to_string();
            let mut description = None;

            // Column-level COMMENT becomes the column description.
            for opt_def in &col.options {
                if let ColumnOption::Comment(comment) = &opt_def.option {
                    description = Some(comment.clone());
                }
            }

            // Restore complex types (STRUCT/ARRAY/MAP) that preprocessing
            // replaced with STRING so sqlparser could handle the statement.
            if let Some((_, original_type)) =
                complex_types.iter().find(|(name, _)| name == &col_name)
            {
                data_type = original_type.clone();
            }

            // Validate column name and data type (warnings are logged but don't fail import)
            if let Err(e) = validate_column_name(&col_name) {
                tracing::warn!("Column name validation warning for '{}': {}", col_name, e);
            }
            if let Err(e) = validate_data_type(&data_type) {
                tracing::warn!("Data type validation warning for '{}': {}", data_type, e);
            }

            out_cols.push(ColumnData {
                name: col_name,
                data_type,
                nullable,
                primary_key: is_pk,
                description,
                quality: None,
                ref_path: None,
                enum_values: None,
            });
        }

        Ok((
            TableData {
                table_index,
                name: Some(table_name),
                columns: out_cols,
            },
            requires_name,
        ))
    }
919
920 #[allow(dead_code)] // Used by non-Databricks dialects
921 fn parse_create_table(
922 &self,
923 table_index: usize,
924 name: &ObjectName,
925 columns: &[ColumnDef],
926 constraints: &[TableConstraint],
927 ) -> std::result::Result<TableData, String> {
928 // Use empty preprocessing state for non-Databricks dialects
929 let empty_state = PreprocessingState::new();
930 self.parse_create_table_with_preprocessing(
931 table_index,
932 name,
933 columns,
934 constraints,
935 &empty_state,
936 &[],
937 )
938 .map(|(table, _)| table)
939 }
940
941 /// Parse CREATE VIEW statement
942 ///
943 /// Extracts view name and creates a TableData entry for the view.
944 /// Views are treated as table-like entities for data modeling purposes.
945 fn parse_create_view(
946 &self,
947 view_index: usize,
948 name: &ObjectName,
949 preprocessing_state: &PreprocessingState,
950 ) -> std::result::Result<(TableData, bool), String> {
951 let mut view_name = Self::object_name_to_string(name);
952 let mut requires_name = false;
953
954 // Check if this is a placeholder view name from IDENTIFIER() preprocessing
955 if view_name.starts_with("__databricks_table_")
956 && let Some(original_expr) = preprocessing_state.identifier_replacements.get(&view_name)
957 {
958 // Try to extract view name from the original expression
959 if let Some(extracted_name) = Self::extract_identifier_table_name(original_expr) {
960 view_name = extracted_name;
961 } else {
962 // Expression contains only variables - mark as requiring name
963 requires_name = true;
964 }
965 }
966
967 // Validate view name
968 if let Err(e) = validate_table_name(&view_name) {
969 tracing::warn!("View name validation warning: {}", e);
970 }
971
972 // Views don't have explicit column definitions in CREATE VIEW statements
973 // The columns are derived from the SELECT query, which we don't parse here
974 // So we create a view with empty columns - the actual columns would need
975 // to be extracted from the query if needed in the future
976 Ok((
977 TableData {
978 table_index: view_index,
979 name: Some(view_name),
980 columns: Vec::new(), // Views don't have explicit column definitions
981 },
982 requires_name,
983 ))
984 }
985}
986
987#[cfg(test)]
988mod tests {
989 use super::*;
990
991 #[test]
992 fn test_sql_importer_default() {
993 let importer = SQLImporter::default();
994 assert_eq!(importer.dialect, "generic");
995 }
996
997 #[test]
998 fn test_sql_importer_parse_basic() {
999 let importer = SQLImporter::new("postgres");
1000 let result = importer
1001 .parse("CREATE TABLE test (id INT PRIMARY KEY, name TEXT NOT NULL);")
1002 .unwrap();
1003 assert!(result.errors.is_empty());
1004 assert_eq!(result.tables.len(), 1);
1005 let t = &result.tables[0];
1006 assert_eq!(t.name.as_deref(), Some("test"));
1007 assert_eq!(t.columns.len(), 2);
1008 assert!(t.columns.iter().any(|c| c.name == "id" && c.primary_key));
1009 assert!(t.columns.iter().any(|c| c.name == "name" && !c.nullable));
1010 }
1011
1012 #[test]
1013 fn test_sql_importer_parse_table_pk_constraint() {
1014 let importer = SQLImporter::new("postgres");
1015 let result = importer
1016 .parse("CREATE TABLE t (id INT, name TEXT, CONSTRAINT pk PRIMARY KEY (id));")
1017 .unwrap();
1018 assert!(result.errors.is_empty());
1019 assert_eq!(result.tables.len(), 1);
1020 let t = &result.tables[0];
1021 assert!(t.columns.iter().any(|c| c.name == "id" && c.primary_key));
1022 }
1023
1024 #[test]
1025 fn test_sql_importer_parse_liquibase_formatted_sql() {
1026 let importer = SQLImporter::new("postgres");
1027 let result = importer
1028 .parse_liquibase(
1029 "--liquibase formatted sql\n--changeset user:1\nCREATE TABLE test (id INT);\n",
1030 )
1031 .unwrap();
1032 assert!(result.errors.is_empty());
1033 assert_eq!(result.tables.len(), 1);
1034 }
1035
1036 #[test]
1037 fn test_databricks_identifier_with_literal() {
1038 let importer = SQLImporter::new("databricks");
1039 let sql = "CREATE TABLE IDENTIFIER('test_table') (id STRING);";
1040 let result = importer.parse(sql).unwrap();
1041 assert!(result.errors.is_empty());
1042 assert_eq!(result.tables.len(), 1);
1043 assert_eq!(result.tables[0].name.as_deref(), Some("test_table"));
1044 }
1045
1046 #[test]
1047 fn test_databricks_identifier_with_variable() {
1048 let importer = SQLImporter::new("databricks");
1049 let sql = "CREATE TABLE IDENTIFIER(:table_name) (id STRING);";
1050 let result = importer.parse(sql).unwrap();
1051 // Should create placeholder table name and add to tables_requiring_name
1052 assert_eq!(result.tables.len(), 1);
1053 assert!(
1054 result.tables[0]
1055 .name
1056 .as_deref()
1057 .unwrap()
1058 .starts_with("__databricks_table_")
1059 );
1060 assert_eq!(result.tables_requiring_name.len(), 1);
1061 }
1062
1063 #[test]
1064 fn test_databricks_identifier_with_concatenation() {
1065 let importer = SQLImporter::new("databricks");
1066 let sql = "CREATE TABLE IDENTIFIER(:catalog || '.schema.table') (id STRING);";
1067 let result = importer.parse(sql).unwrap();
1068 assert!(result.errors.is_empty());
1069 assert_eq!(result.tables.len(), 1);
1070 // Should extract table name from concatenation
1071 assert_eq!(result.tables[0].name.as_deref(), Some("schema.table"));
1072 }
1073
1074 #[test]
1075 fn test_databricks_variable_in_struct() {
1076 let importer = SQLImporter::new("databricks");
1077 let sql = "CREATE TABLE example (metadata STRUCT<key: STRING, value: :variable_type, timestamp: TIMESTAMP>);";
1078 let result = importer.parse(sql).unwrap();
1079 if !result.errors.is_empty() {
1080 eprintln!("Parse errors: {:?}", result.errors);
1081 }
1082 assert!(result.errors.is_empty());
1083 assert_eq!(result.tables.len(), 1);
1084 // Variable should be replaced with STRING
1085 assert!(
1086 result.tables[0].columns[0]
1087 .data_type
1088 .contains("value: STRING")
1089 );
1090 }
1091
1092 #[test]
1093 fn test_databricks_variable_in_array() {
1094 let importer = SQLImporter::new("databricks");
1095 let sql = "CREATE TABLE example (items ARRAY<:element_type>);";
1096 let result = importer.parse(sql).unwrap();
1097 assert!(result.errors.is_empty());
1098 assert_eq!(result.tables.len(), 1);
1099 // Variable should be replaced with STRING
1100 assert_eq!(result.tables[0].columns[0].data_type, "ARRAY<STRING>");
1101 }
1102
1103 #[test]
1104 fn test_databricks_nested_variables() {
1105 let importer = SQLImporter::new("databricks");
1106 let sql = "CREATE TABLE example (rulesTriggered ARRAY<STRUCT<id: STRING, name: STRING, alertOperation: STRUCT<name: STRING, revert: :variable_type, timestamp: TIMESTAMP>>>);";
1107 let result = importer.parse(sql).unwrap();
1108 if !result.errors.is_empty() {
1109 eprintln!("Parse errors: {:?}", result.errors);
1110 }
1111 assert!(result.errors.is_empty());
1112 assert_eq!(result.tables.len(), 1);
1113 // Nested variable should be replaced with STRING
1114 assert!(
1115 result.tables[0].columns[0]
1116 .data_type
1117 .contains("revert: STRING")
1118 );
1119 }
1120
1121 #[test]
1122 fn test_databricks_comment_variable() {
1123 let importer = SQLImporter::new("databricks");
1124 let sql = "CREATE TABLE example (id STRING) COMMENT ':comment_variable';";
1125 let result = importer.parse(sql).unwrap();
1126 assert!(result.errors.is_empty());
1127 assert_eq!(result.tables.len(), 1);
1128 }
1129
1130 #[test]
1131 fn test_databricks_tblproperties_variable() {
1132 let importer = SQLImporter::new("databricks");
1133 let sql = "CREATE TABLE example (id STRING) TBLPROPERTIES ('key1' = ':variable_value', 'key2' = 'static_value');";
1134 let result = importer.parse(sql).unwrap();
1135 assert!(result.errors.is_empty());
1136 assert_eq!(result.tables.len(), 1);
1137 }
1138
1139 #[test]
1140 fn test_databricks_column_variable() {
1141 let importer = SQLImporter::new("databricks");
1142 // Test column definition with variable reference like "column_name :variable STRING"
1143 // This pattern may need preprocessing to remove the variable
1144 let sql = "CREATE TABLE example (id :id_var STRING, name :name_var STRING);";
1145 let result = importer.parse(sql).unwrap();
1146 assert!(result.errors.is_empty());
1147 assert_eq!(result.tables.len(), 1);
1148 assert_eq!(result.tables[0].columns.len(), 2);
1149 }
1150
1151 #[test]
1152 fn test_databricks_create_view() {
1153 let importer = SQLImporter::new("databricks");
1154 let sql = "CREATE VIEW example_view AS SELECT id, name FROM source_table;";
1155 let result = importer.parse(sql).unwrap();
1156 // Views should be imported as table-like entities
1157 assert!(result.errors.is_empty());
1158 assert_eq!(result.tables.len(), 1);
1159 assert_eq!(result.tables[0].name.as_deref(), Some("example_view"));
1160 }
1161
1162 #[test]
1163 fn test_databricks_view_with_identifier() {
1164 let importer = SQLImporter::new("databricks");
1165 let sql =
1166 "CREATE VIEW IDENTIFIER(:catalog || '.schema.view_name') AS SELECT * FROM table1;";
1167 let result = importer.parse(sql).unwrap();
1168 assert!(result.errors.is_empty());
1169 assert_eq!(result.tables.len(), 1);
1170 // Should extract view name from IDENTIFIER() expression
1171 assert_eq!(result.tables[0].name.as_deref(), Some("schema.view_name"));
1172 }
1173
1174 #[test]
1175 fn test_databricks_create_materialized_view() {
1176 let importer = SQLImporter::new("databricks");
1177 // MATERIALIZED VIEW is preprocessed to CREATE VIEW for sqlparser compatibility
1178 let sql = "CREATE MATERIALIZED VIEW mv_example AS SELECT id, name FROM source_table;";
1179 let result = importer.parse(sql).unwrap();
1180 assert!(result.errors.is_empty());
1181 assert_eq!(result.tables.len(), 1);
1182 assert_eq!(result.tables[0].name.as_deref(), Some("mv_example"));
1183 }
1184}