Skip to main content

sql_splitter/schema/
ddl.rs

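//! SQL DDL parsing for schema extraction.
//!
//! Parses CREATE TABLE, ALTER TABLE and CREATE INDEX statements to extract:
//! - Column definitions with types
//! - Primary key constraints
//! - Foreign key constraints
//! - Index definitions
//!
//! Identifiers may be quoted with backticks (MySQL), double quotes (PostgreSQL),
//! or brackets (MSSQL), or left unquoted.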
7
8use super::{Column, ColumnId, ColumnType, ForeignKey, IndexDef, Schema, TableId, TableSchema};
9use once_cell::sync::Lazy;
10use regex::Regex;
11
12/// Regex to extract table name from CREATE TABLE
13/// Supports: `table` (MySQL), "table" (PostgreSQL), [table] (MSSQL), table (SQLite/unquoted), schema.table
14static CREATE_TABLE_NAME_RE: Lazy<Regex> = Lazy::new(|| {
15    // Match table name with various quoting styles including MSSQL brackets
16    // Pattern handles: schema.table, [schema].[table], `schema`.`table`, "schema"."table"
17    Regex::new(r#"(?i)CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(?:[\[\]`"\w]+\s*\.\s*)*[\[`"]?([^\[\]`"\s(]+)[\]`"]?"#)
18        .unwrap()
19});
20
21/// Regex to extract table name from ALTER TABLE
22/// Supports: `table` (MySQL), "table" (PostgreSQL), [table] (MSSQL), table (SQLite/unquoted), schema.table
23static ALTER_TABLE_NAME_RE: Lazy<Regex> = Lazy::new(|| {
24    Regex::new(
25        r#"(?i)ALTER\s+TABLE\s+(?:ONLY\s+)?(?:[\[\]`"\w]+\s*\.\s*)*[\[`"]?([^\[\]`"\s]+)[\]`"]?"#,
26    )
27    .unwrap()
28});
29
30/// Regex for column definition
31/// Supports: `column` (MySQL), "column" (PostgreSQL), [column] (MSSQL), column (unquoted)
32static COLUMN_DEF_RE: Lazy<Regex> = Lazy::new(|| {
33    Regex::new(r#"^\s*[\[`"]?([^\[\]`"\s,]+)[\]`"]?\s+(\w+(?:\([^)]+\))?(?:\s+unsigned)?)"#)
34        .unwrap()
35});
36
37/// Regex for PRIMARY KEY constraint
38/// Supports MSSQL CLUSTERED/NONCLUSTERED keywords: PRIMARY KEY CLUSTERED ([col])
39static PRIMARY_KEY_RE: Lazy<Regex> = Lazy::new(|| {
40    Regex::new(r"(?i)PRIMARY\s+KEY\s*(?:CLUSTERED\s+|NONCLUSTERED\s+)?\(([^)]+)\)").unwrap()
41});
42
43/// Regex for inline PRIMARY KEY on column
44static INLINE_PRIMARY_KEY_RE: Lazy<Regex> =
45    Lazy::new(|| Regex::new(r"(?i)\bPRIMARY\s+KEY\b").unwrap());
46
47/// Regex for FOREIGN KEY constraint with optional constraint name
48/// Supports: `name` (MySQL), "name" (PostgreSQL), [name] (MSSQL), name (unquoted)
49static FOREIGN_KEY_RE: Lazy<Regex> = Lazy::new(|| {
50    Regex::new(
51        r#"(?i)(?:CONSTRAINT\s+[\[`"]?([^\[\]`"\s]+)[\]`"]?\s+)?FOREIGN\s+KEY\s*\(([^)]+)\)\s*REFERENCES\s+(?:[\[\]`"\w]+\s*\.\s*)*[\[`"]?([^\[\]`"\s(]+)[\]`"]?\s*\(([^)]+)\)"#,
52    )
53    .unwrap()
54});
55
56/// Regex to detect NOT NULL constraint
57static NOT_NULL_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?i)\bNOT\s+NULL\b").unwrap());
58
59/// Regex for inline INDEX/KEY in CREATE TABLE
60/// Matches: INDEX idx_name (col1, col2), KEY idx_name (col1), UNIQUE INDEX idx_name (col1)
61/// Supports MSSQL bracket quoting: INDEX [idx_name] ([col])
62static INLINE_INDEX_RE: Lazy<Regex> = Lazy::new(|| {
63    Regex::new(r#"(?i)(?:(UNIQUE)\s+)?(?:INDEX|KEY)\s+[\[`"]?(\w+)[\]`"]?\s*\(([^)]+)\)"#).unwrap()
64});
65
66/// Regex for CREATE INDEX statement
67/// Matches: CREATE [UNIQUE] [CLUSTERED|NONCLUSTERED] INDEX [IF NOT EXISTS] idx_name ON table [USING method] (columns)
68/// Supports MSSQL bracket quoting and schema prefixes
69static CREATE_INDEX_RE: Lazy<Regex> = Lazy::new(|| {
70    Regex::new(
71        r#"(?i)CREATE\s+(UNIQUE\s+)?(?:CLUSTERED\s+|NONCLUSTERED\s+)?INDEX\s+(?:IF\s+NOT\s+EXISTS\s+)?[\[`"]?(\w+)[\]`"]?\s+ON\s+(?:[\[\]`"\w]+\s*\.\s*)*[\[`"]?(\w+)[\]`"]?\s*(?:USING\s+(\w+)\s*)?\(([^)]+)\)"#,
72    )
73    .unwrap()
74});
75
76/// Builder for constructing schema from DDL statements
77#[derive(Debug, Default)]
78pub struct SchemaBuilder {
79    schema: Schema,
80}
81
82impl SchemaBuilder {
83    /// Create a new schema builder
84    pub fn new() -> Self {
85        Self {
86            schema: Schema::new(),
87        }
88    }
89
90    /// Parse a CREATE TABLE statement and add to schema
91    pub fn parse_create_table(&mut self, stmt: &str) -> Option<TableId> {
92        let table_name = extract_create_table_name(stmt)?;
93
94        // Check if table already exists
95        if self.schema.get_table_id(&table_name).is_some() {
96            return self.schema.get_table_id(&table_name);
97        }
98
99        let mut table = TableSchema::new(table_name, TableId(0));
100        table.create_statement = Some(stmt.to_string());
101
102        // Extract the body between first ( and last )
103        let body = extract_table_body(stmt)?;
104
105        // Parse columns and constraints
106        parse_table_body(&body, &mut table);
107
108        // Add table to schema
109        Some(self.schema.add_table(table))
110    }
111
112    /// Parse an ALTER TABLE statement and update existing table
113    pub fn parse_alter_table(&mut self, stmt: &str) -> Option<TableId> {
114        let table_name = extract_alter_table_name(stmt)?;
115        let table_id = self.schema.get_table_id(&table_name)?;
116
117        // Parse any FK constraints added by ALTER TABLE
118        for fk in parse_foreign_keys(stmt) {
119            if let Some(table) = self.schema.table_mut(table_id) {
120                // Resolve column names to IDs
121                let mut resolved_fk = fk;
122                resolved_fk.columns = resolved_fk
123                    .column_names
124                    .iter()
125                    .filter_map(|name| table.get_column_id(name))
126                    .collect();
127                table.foreign_keys.push(resolved_fk);
128            }
129        }
130
131        Some(table_id)
132    }
133
134    /// Parse a CREATE INDEX statement and add to the appropriate table
135    pub fn parse_create_index(&mut self, stmt: &str) -> Option<TableId> {
136        let caps = CREATE_INDEX_RE.captures(stmt)?;
137
138        let is_unique = caps.get(1).is_some();
139        let index_name = caps.get(2)?.as_str().to_string();
140        let table_name = caps.get(3)?.as_str().to_string();
141        let index_type = caps.get(4).map(|m| m.as_str().to_uppercase());
142        let columns_str = caps.get(5)?.as_str();
143        let columns = parse_column_list(columns_str);
144
145        let table_id = self.schema.get_table_id(&table_name)?;
146
147        if let Some(table) = self.schema.table_mut(table_id) {
148            table.indexes.push(IndexDef {
149                name: index_name,
150                columns,
151                is_unique,
152                index_type,
153            });
154        }
155
156        Some(table_id)
157    }
158
159    /// Finalize the schema, resolving all FK references
160    pub fn build(mut self) -> Schema {
161        self.schema.resolve_foreign_keys();
162        self.schema
163    }
164
165    /// Get current schema (for inspection during building)
166    pub fn schema(&self) -> &Schema {
167        &self.schema
168    }
169}
170
171/// Extract table name from CREATE TABLE statement
172pub fn extract_create_table_name(stmt: &str) -> Option<String> {
173    CREATE_TABLE_NAME_RE
174        .captures(stmt)
175        .and_then(|c| c.get(1))
176        .map(|m| m.as_str().to_string())
177}
178
179/// Extract table name from ALTER TABLE statement
180pub fn extract_alter_table_name(stmt: &str) -> Option<String> {
181    ALTER_TABLE_NAME_RE
182        .captures(stmt)
183        .and_then(|c| c.get(1))
184        .map(|m| m.as_str().to_string())
185}
186
/// Extract the body of a CREATE TABLE statement: the text between the first
/// top-level `(` and its matching `)`, exclusive.
///
/// Parentheses inside quoted text are ignored. Three quoting styles are tracked:
/// single quotes (string literals), double quotes and backticks. A backslash escapes
/// the next byte inside single-quoted strings only. A doubled quote (`''`) needs no
/// special handling because it closes and immediately reopens the quoted run.
///
/// A `)` seen before any `(` is ignored rather than pushing the nesting depth
/// negative, so a stray closing parenthesis cannot hide the real body.
///
/// Returns `None` when the statement has no `(` or the parentheses never balance.
fn extract_table_body(stmt: &str) -> Option<String> {
    let mut depth: usize = 0;
    let mut start: Option<usize> = None;
    // The quote byte that opened the quoted run we are currently inside, if any.
    let mut quote: Option<u8> = None;
    let mut escape_next = false;

    for (i, &b) in stmt.as_bytes().iter().enumerate() {
        if let Some(q) = quote {
            if escape_next {
                escape_next = false;
            } else if b == b'\\' && q == b'\'' {
                escape_next = true;
            } else if b == q {
                quote = None;
            }
            continue;
        }

        match b {
            b'\'' | b'"' | b'`' => quote = Some(b),
            b'(' => {
                if depth == 0 {
                    start = Some(i + 1);
                }
                depth += 1;
            }
            b')' if depth > 0 => {
                depth -= 1;
                if depth == 0 {
                    // `start` and `i` both sit next to ASCII bytes, so the slice
                    // boundaries are always valid UTF-8 boundaries.
                    return start.map(|s| stmt[s..i].to_string());
                }
            }
            _ => {}
        }
    }

    None
}
232
233/// Parse the body of a CREATE TABLE to extract columns and constraints
234fn parse_table_body(body: &str, table: &mut TableSchema) {
235    // Split by commas, but respect nested parentheses
236    let parts = split_table_body(body);
237
238    for part in parts {
239        let trimmed = part.trim();
240        if trimmed.is_empty() {
241            continue;
242        }
243
244        // Check if this is a constraint or a column
245        let upper = trimmed.to_uppercase();
246        if upper.starts_with("PRIMARY KEY")
247            || upper.starts_with("CONSTRAINT")
248            || upper.starts_with("FOREIGN KEY")
249            || upper.starts_with("KEY ")
250            || upper.starts_with("INDEX ")
251            || upper.starts_with("UNIQUE ")
252            || upper.starts_with("FULLTEXT ")
253            || upper.starts_with("SPATIAL ")
254            || upper.starts_with("CHECK ")
255        {
256            // Parse constraints
257            if let Some(pk_cols) = parse_primary_key_constraint(trimmed) {
258                for col_name in pk_cols {
259                    if let Some(col) = table
260                        .columns
261                        .iter_mut()
262                        .find(|c| c.name.eq_ignore_ascii_case(&col_name))
263                    {
264                        col.is_primary_key = true;
265                        if !table.primary_key.contains(&col.ordinal) {
266                            table.primary_key.push(col.ordinal);
267                        }
268                    }
269                }
270            }
271
272            for fk in parse_foreign_keys(trimmed) {
273                let mut resolved_fk = fk;
274                resolved_fk.columns = resolved_fk
275                    .column_names
276                    .iter()
277                    .filter_map(|name| table.get_column_id(name))
278                    .collect();
279                table.foreign_keys.push(resolved_fk);
280            }
281
282            // Parse inline indexes (INDEX, KEY, UNIQUE INDEX, UNIQUE KEY)
283            if let Some(idx) = parse_inline_index(trimmed) {
284                table.indexes.push(idx);
285            }
286        } else {
287            // Parse column definition
288            if let Some(col) = parse_column_def(trimmed, ColumnId(table.columns.len() as u16)) {
289                // Check for inline PRIMARY KEY
290                if INLINE_PRIMARY_KEY_RE.is_match(trimmed) {
291                    let mut col = col;
292                    col.is_primary_key = true;
293                    table.primary_key.push(col.ordinal);
294                    table.columns.push(col);
295                } else {
296                    table.columns.push(col);
297                }
298            }
299        }
300    }
301}
302
/// Split a CREATE TABLE body into its comma-separated clauses.
///
/// Only commas at parenthesis depth zero and outside quoted text act as separators.
/// Three quoting styles are tracked: single quotes (string literals), double quotes
/// and backticks. A backslash escapes the next character inside single-quoted strings
/// only. An unmatched `)` is kept in the output but never pushes the depth below
/// zero, so later top-level commas still split.
///
/// Every returned clause is trimmed. Empty clauses between two commas are kept, but a
/// trailing empty clause is dropped.
pub fn split_table_body(body: &str) -> Vec<String> {
    let mut parts = Vec::new();
    let mut current = String::new();
    let mut depth: usize = 0;
    // The quote character that opened the quoted run we are currently inside, if any.
    let mut quote: Option<char> = None;
    let mut escape_next = false;

    for ch in body.chars() {
        if let Some(q) = quote {
            current.push(ch);
            if escape_next {
                escape_next = false;
            } else if ch == '\\' && q == '\'' {
                escape_next = true;
            } else if ch == q {
                quote = None;
            }
            continue;
        }

        match ch {
            '\'' | '"' | '`' => {
                quote = Some(ch);
                current.push(ch);
            }
            '(' => {
                depth += 1;
                current.push(ch);
            }
            ')' => {
                depth = depth.saturating_sub(1);
                current.push(ch);
            }
            ',' if depth == 0 => {
                parts.push(current.trim().to_string());
                current.clear();
            }
            _ => current.push(ch),
        }
    }

    let tail = current.trim();
    if !tail.is_empty() {
        parts.push(tail.to_string());
    }

    parts
}
360
361/// Parse a column definition
362fn parse_column_def(def: &str, ordinal: ColumnId) -> Option<Column> {
363    let caps = COLUMN_DEF_RE.captures(def)?;
364    let name = caps.get(1)?.as_str().to_string();
365    let type_str = caps.get(2)?.as_str();
366
367    let col_type = ColumnType::from_mysql_type(type_str);
368    let is_nullable = !NOT_NULL_RE.is_match(def);
369
370    Some(Column {
371        name,
372        col_type,
373        ordinal,
374        is_primary_key: false,
375        is_nullable,
376    })
377}
378
379/// Parse PRIMARY KEY constraint, returns column names
380fn parse_primary_key_constraint(constraint: &str) -> Option<Vec<String>> {
381    let caps = PRIMARY_KEY_RE.captures(constraint)?;
382    let cols_str = caps.get(1)?.as_str();
383    Some(parse_column_list(cols_str))
384}
385
386/// Parse inline INDEX/KEY constraint from CREATE TABLE body
387fn parse_inline_index(constraint: &str) -> Option<IndexDef> {
388    let caps = INLINE_INDEX_RE.captures(constraint)?;
389
390    let is_unique = caps.get(1).is_some();
391    let index_name = caps.get(2)?.as_str().to_string();
392    let columns_str = caps.get(3)?.as_str();
393    let columns = parse_column_list(columns_str);
394
395    Some(IndexDef {
396        name: index_name,
397        columns,
398        is_unique,
399        index_type: None, // Inline indexes don't specify type
400    })
401}
402
403/// Parse FOREIGN KEY constraints from a statement
404fn parse_foreign_keys(stmt: &str) -> Vec<ForeignKey> {
405    let mut fks = Vec::new();
406
407    for caps in FOREIGN_KEY_RE.captures_iter(stmt) {
408        let name = caps.get(1).map(|m| m.as_str().to_string());
409        let local_cols = caps
410            .get(2)
411            .map(|m| parse_column_list(m.as_str()))
412            .unwrap_or_default();
413        let ref_table = caps
414            .get(3)
415            .map(|m| m.as_str().to_string())
416            .unwrap_or_default();
417        let ref_cols = caps
418            .get(4)
419            .map(|m| parse_column_list(m.as_str()))
420            .unwrap_or_default();
421
422        if !local_cols.is_empty() && !ref_table.is_empty() && !ref_cols.is_empty() {
423            fks.push(ForeignKey {
424                name,
425                columns: Vec::new(), // Will be resolved later
426                column_names: local_cols,
427                referenced_table: ref_table,
428                referenced_columns: ref_cols,
429                referenced_table_id: None,
430            });
431        }
432    }
433
434    fks
435}
436
/// Reduce one entry of a column list to the bare column name.
///
/// A quoted entry (backticks, double quotes, or `[...]`) yields the text between the
/// opening quote and its first matching closer, so anything after the closing quote
/// (`ASC`, `DESC`, a `(10)` prefix length) is discarded while spaces inside the quotes
/// are kept. An unquoted or unterminated entry is cut at the first whitespace or `(`
/// and then has any stray quote characters trimmed from both ends.
fn column_name_of(entry: &str) -> &str {
    let entry = entry.trim();

    let closer = match entry.chars().next() {
        Some('`') => Some('`'),
        Some('"') => Some('"'),
        Some('[') => Some(']'),
        _ => None,
    };
    if let Some(closer) = closer {
        // The opening quote is a single ASCII byte, so slicing at 1 is safe.
        let inner = &entry[1..];
        if let Some(end) = inner.find(closer) {
            return &inner[..end];
        }
    }

    entry
        .split(|c: char| c.is_whitespace() || c == '(')
        .next()
        .unwrap_or("")
        .trim_matches('`')
        .trim_matches('"')
        .trim_matches('[')
        .trim_matches(']')
}

/// Parse a comma-separated column list into bare column names.
///
/// Quotes (backticks, double quotes, brackets) are stripped, as are per-column
/// decorations such as `ASC` / `DESC` and index prefix lengths like `(10)`.
/// Entries that end up empty are dropped.
pub fn parse_column_list(s: &str) -> Vec<String> {
    s.split(',')
        .map(column_name_of)
        .filter(|name| !name.is_empty())
        .map(|name| name.to_string())
        .collect()
}