sql_splitter/schema/
ddl.rs

//! DDL parsing for schema extraction (MySQL-oriented).
//!
//! Parses CREATE TABLE and ALTER TABLE statements to extract:
//! - Column definitions with types
//! - Primary key constraints
//! - Foreign key constraints
//!
//! Identifiers may be backtick-quoted (MySQL), double-quoted (PostgreSQL),
//! or unquoted.
7
8use super::{Column, ColumnId, ColumnType, ForeignKey, Schema, TableId, TableSchema};
9use once_cell::sync::Lazy;
10use regex::Regex;
11
12/// Regex to extract table name from CREATE TABLE
13/// Supports: `table` (MySQL), "table" (PostgreSQL), table (SQLite/unquoted), schema.table
14static CREATE_TABLE_NAME_RE: Lazy<Regex> = Lazy::new(|| {
15    Regex::new(r#"(?i)CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(?:\w+\.)?[`"]?([^`"\s(]+)[`"]?"#)
16        .unwrap()
17});
18
19/// Regex to extract table name from ALTER TABLE
20/// Supports: `table` (MySQL), "table" (PostgreSQL), table (SQLite/unquoted), schema.table
21static ALTER_TABLE_NAME_RE: Lazy<Regex> = Lazy::new(|| {
22    Regex::new(r#"(?i)ALTER\s+TABLE\s+(?:ONLY\s+)?(?:\w+\.)?[`"]?([^`"\s]+)[`"]?"#).unwrap()
23});
24
/// Regex for the leading `name type` portion of a column definition.
///
/// Supports: `column` (MySQL), "column" (PostgreSQL), column (unquoted).
///
/// - Capture 1: the column name, without surrounding quotes.
/// - Capture 2: the type token: a word, an optional parenthesised argument
///   list such as `(10)` or `(10,2)`, and an optional trailing `unsigned`.
///
/// The pattern is anchored at the start of the input and carries no `(?i)`
/// flag, so only a lowercase `unsigned` is folded into capture 2.
static COLUMN_DEF_RE: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r#"^\s*[`"]?([^`"\s,]+)[`"]?\s+(\w+(?:\([^)]+\))?(?:\s+unsigned)?)"#).unwrap()
});
30
/// Regex for a table-level PRIMARY KEY constraint (case-insensitive).
///
/// Capture 1 is the raw text between the parentheses, i.e. the
/// comma-separated key column list. The capture stops at the first `)`, so a
/// key part that itself contains parentheses (e.g. a prefix length such as
/// `name(10)`) truncates the list at that point.
static PRIMARY_KEY_RE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(?i)PRIMARY\s+KEY\s*\(([^)]+)\)").unwrap());
34
/// Regex for an inline PRIMARY KEY marker on a column definition.
///
/// Case-insensitive, no capture groups. It matches the words `PRIMARY KEY`
/// anywhere in the text it is applied to, including inside a quoted string
/// such as a column comment.
static INLINE_PRIMARY_KEY_RE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(?i)\bPRIMARY\s+KEY\b").unwrap());
38
39/// Regex for FOREIGN KEY constraint with optional constraint name
40/// Supports: `name` (MySQL), "name" (PostgreSQL), name (unquoted)
41static FOREIGN_KEY_RE: Lazy<Regex> = Lazy::new(|| {
42    Regex::new(
43        r#"(?i)(?:CONSTRAINT\s+[`"]?([^`"\s]+)[`"]?\s+)?FOREIGN\s+KEY\s*\(([^)]+)\)\s*REFERENCES\s+(?:\w+\.)?[`"]?([^`"\s(]+)[`"]?\s*\(([^)]+)\)"#,
44    )
45    .unwrap()
46});
47
/// Regex to detect a NOT NULL constraint.
///
/// Case-insensitive, no capture groups; matches the words `NOT NULL`
/// (separated by any whitespace) anywhere in the text it is applied to.
static NOT_NULL_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?i)\bNOT\s+NULL\b").unwrap());
50
/// Builder for constructing a [`Schema`] from DDL statements.
///
/// Statements are fed in one at a time through
/// [`SchemaBuilder::parse_create_table`] and
/// [`SchemaBuilder::parse_alter_table`]; [`SchemaBuilder::build`] then
/// resolves foreign keys and hands back the finished schema.
#[derive(Debug, Default)]
pub struct SchemaBuilder {
    /// Schema accumulated from the statements parsed so far.
    schema: Schema,
}
56
57impl SchemaBuilder {
58    /// Create a new schema builder
59    pub fn new() -> Self {
60        Self {
61            schema: Schema::new(),
62        }
63    }
64
65    /// Parse a CREATE TABLE statement and add to schema
66    pub fn parse_create_table(&mut self, stmt: &str) -> Option<TableId> {
67        let table_name = extract_create_table_name(stmt)?;
68
69        // Check if table already exists
70        if self.schema.get_table_id(&table_name).is_some() {
71            return self.schema.get_table_id(&table_name);
72        }
73
74        let mut table = TableSchema::new(table_name, TableId(0));
75        table.create_statement = Some(stmt.to_string());
76
77        // Extract the body between first ( and last )
78        let body = extract_table_body(stmt)?;
79
80        // Parse columns and constraints
81        parse_table_body(&body, &mut table);
82
83        // Add table to schema
84        Some(self.schema.add_table(table))
85    }
86
87    /// Parse an ALTER TABLE statement and update existing table
88    pub fn parse_alter_table(&mut self, stmt: &str) -> Option<TableId> {
89        let table_name = extract_alter_table_name(stmt)?;
90        let table_id = self.schema.get_table_id(&table_name)?;
91
92        // Parse any FK constraints added by ALTER TABLE
93        for fk in parse_foreign_keys(stmt) {
94            if let Some(table) = self.schema.table_mut(table_id) {
95                // Resolve column names to IDs
96                let mut resolved_fk = fk;
97                resolved_fk.columns = resolved_fk
98                    .column_names
99                    .iter()
100                    .filter_map(|name| table.get_column_id(name))
101                    .collect();
102                table.foreign_keys.push(resolved_fk);
103            }
104        }
105
106        Some(table_id)
107    }
108
109    /// Finalize the schema, resolving all FK references
110    pub fn build(mut self) -> Schema {
111        self.schema.resolve_foreign_keys();
112        self.schema
113    }
114
115    /// Get current schema (for inspection during building)
116    pub fn schema(&self) -> &Schema {
117        &self.schema
118    }
119}
120
121/// Extract table name from CREATE TABLE statement
122pub fn extract_create_table_name(stmt: &str) -> Option<String> {
123    CREATE_TABLE_NAME_RE
124        .captures(stmt)
125        .and_then(|c| c.get(1))
126        .map(|m| m.as_str().to_string())
127}
128
129/// Extract table name from ALTER TABLE statement
130pub fn extract_alter_table_name(stmt: &str) -> Option<String> {
131    ALTER_TABLE_NAME_RE
132        .captures(stmt)
133        .and_then(|c| c.get(1))
134        .map(|m| m.as_str().to_string())
135}
136
/// Extract the body of a CREATE TABLE statement: the text between the first
/// top-level `(` and its matching `)`, exclusive of both.
///
/// Parentheses inside single-quoted strings are ignored, and a backslash
/// inside a string escapes the following byte. A `)` seen while no
/// parenthesis is open (for example inside a leading comment) is skipped
/// instead of corrupting the nesting depth.
///
/// Returns `None` when the statement has no balanced parenthesised body.
fn extract_table_body(stmt: &str) -> Option<String> {
    let mut depth: usize = 0;
    let mut start = None;
    let mut in_string = false;
    let mut escape_next = false;

    for (i, b) in stmt.bytes().enumerate() {
        if escape_next {
            escape_next = false;
            continue;
        }

        if in_string {
            match b {
                b'\\' => escape_next = true,
                b'\'' => in_string = false,
                _ => {}
            }
            continue;
        }

        match b {
            b'\'' => in_string = true,
            b'(' => {
                if depth == 0 {
                    start = Some(i + 1);
                }
                depth += 1;
            }
            b')' => {
                // Unmatched closer: nothing is open, so there is nothing to close.
                if depth == 0 {
                    continue;
                }
                depth -= 1;
                if depth == 0 {
                    // `(` and `)` are ASCII, so both offsets are char boundaries.
                    return start.map(|s| stmt[s..i].to_string());
                }
            }
            _ => {}
        }
    }

    None
}
182
183/// Parse the body of a CREATE TABLE to extract columns and constraints
184fn parse_table_body(body: &str, table: &mut TableSchema) {
185    // Split by commas, but respect nested parentheses
186    let parts = split_table_body(body);
187
188    for part in parts {
189        let trimmed = part.trim();
190        if trimmed.is_empty() {
191            continue;
192        }
193
194        // Check if this is a constraint or a column
195        let upper = trimmed.to_uppercase();
196        if upper.starts_with("PRIMARY KEY")
197            || upper.starts_with("CONSTRAINT")
198            || upper.starts_with("FOREIGN KEY")
199            || upper.starts_with("KEY ")
200            || upper.starts_with("INDEX ")
201            || upper.starts_with("UNIQUE ")
202            || upper.starts_with("FULLTEXT ")
203            || upper.starts_with("SPATIAL ")
204            || upper.starts_with("CHECK ")
205        {
206            // Parse constraints
207            if let Some(pk_cols) = parse_primary_key_constraint(trimmed) {
208                for col_name in pk_cols {
209                    if let Some(col) = table
210                        .columns
211                        .iter_mut()
212                        .find(|c| c.name.eq_ignore_ascii_case(&col_name))
213                    {
214                        col.is_primary_key = true;
215                        if !table.primary_key.contains(&col.ordinal) {
216                            table.primary_key.push(col.ordinal);
217                        }
218                    }
219                }
220            }
221
222            for fk in parse_foreign_keys(trimmed) {
223                let mut resolved_fk = fk;
224                resolved_fk.columns = resolved_fk
225                    .column_names
226                    .iter()
227                    .filter_map(|name| table.get_column_id(name))
228                    .collect();
229                table.foreign_keys.push(resolved_fk);
230            }
231        } else {
232            // Parse column definition
233            if let Some(col) = parse_column_def(trimmed, ColumnId(table.columns.len() as u16)) {
234                // Check for inline PRIMARY KEY
235                if INLINE_PRIMARY_KEY_RE.is_match(trimmed) {
236                    let mut col = col;
237                    col.is_primary_key = true;
238                    table.primary_key.push(col.ordinal);
239                    table.columns.push(col);
240                } else {
241                    table.columns.push(col);
242                }
243            }
244        }
245    }
246}
247
/// Split a table body into its comma-separated parts.
///
/// Only commas at parenthesis depth zero and outside single-quoted strings
/// act as separators; inside a string a backslash escapes the next
/// character. Each part is whitespace-trimmed. A trailing part that is empty
/// after trimming is dropped; empty parts between two separators are kept.
pub fn split_table_body(body: &str) -> Vec<String> {
    let mut pieces: Vec<String> = Vec::new();
    let mut buf = String::new();
    let mut nesting: i32 = 0;
    let mut quoted = false;
    let mut skip = false;

    for c in body.chars() {
        // A top-level comma is the only character that is not copied through.
        if !skip && !quoted && nesting == 0 && c == ',' {
            pieces.push(buf.trim().to_string());
            buf.clear();
            continue;
        }

        buf.push(c);

        if skip {
            // This character was escaped by the preceding backslash.
            skip = false;
        } else if quoted {
            match c {
                '\\' => skip = true,
                '\'' => quoted = false,
                _ => {}
            }
        } else {
            match c {
                '\'' => quoted = true,
                '(' => nesting += 1,
                ')' => nesting -= 1,
                _ => {}
            }
        }
    }

    let tail = buf.trim();
    if !tail.is_empty() {
        pieces.push(tail.to_string());
    }

    pieces
}
305
306/// Parse a column definition
307fn parse_column_def(def: &str, ordinal: ColumnId) -> Option<Column> {
308    let caps = COLUMN_DEF_RE.captures(def)?;
309    let name = caps.get(1)?.as_str().to_string();
310    let type_str = caps.get(2)?.as_str();
311
312    let col_type = ColumnType::from_mysql_type(type_str);
313    let is_nullable = !NOT_NULL_RE.is_match(def);
314
315    Some(Column {
316        name,
317        col_type,
318        ordinal,
319        is_primary_key: false,
320        is_nullable,
321    })
322}
323
324/// Parse PRIMARY KEY constraint, returns column names
325fn parse_primary_key_constraint(constraint: &str) -> Option<Vec<String>> {
326    let caps = PRIMARY_KEY_RE.captures(constraint)?;
327    let cols_str = caps.get(1)?.as_str();
328    Some(parse_column_list(cols_str))
329}
330
331/// Parse FOREIGN KEY constraints from a statement
332fn parse_foreign_keys(stmt: &str) -> Vec<ForeignKey> {
333    let mut fks = Vec::new();
334
335    for caps in FOREIGN_KEY_RE.captures_iter(stmt) {
336        let name = caps.get(1).map(|m| m.as_str().to_string());
337        let local_cols = caps
338            .get(2)
339            .map(|m| parse_column_list(m.as_str()))
340            .unwrap_or_default();
341        let ref_table = caps
342            .get(3)
343            .map(|m| m.as_str().to_string())
344            .unwrap_or_default();
345        let ref_cols = caps
346            .get(4)
347            .map(|m| parse_column_list(m.as_str()))
348            .unwrap_or_default();
349
350        if !local_cols.is_empty() && !ref_table.is_empty() && !ref_cols.is_empty() {
351            fks.push(ForeignKey {
352                name,
353                columns: Vec::new(), // Will be resolved later
354                column_names: local_cols,
355                referenced_table: ref_table,
356                referenced_columns: ref_cols,
357                referenced_table_id: None,
358            });
359        }
360    }
361
362    fks
363}
364
/// Parse a comma-separated column list into bare column names.
///
/// Each item is trimmed and reduced to its identifier: surrounding backticks
/// or double quotes are removed, and anything following the name is
/// discarded. That covers a sort direction (`` `id` DESC ``) or an index
/// prefix length (`` `title`(10) ``). Items that yield an empty name are
/// skipped.
pub fn parse_column_list(s: &str) -> Vec<String> {
    s.split(',')
        .map(|item| column_identifier(item.trim()))
        .filter(|name| !name.is_empty())
        .map(str::to_string)
        .collect()
}

/// Return the identifier at the start of a single, already-trimmed list item.
///
/// A quoted identifier ends at the next matching quote (or at the end of the
/// item if the quote is never closed); an unquoted one ends at the first
/// whitespace or `(`.
fn column_identifier(item: &str) -> &str {
    match item.chars().next() {
        Some(quote) if quote == '`' || quote == '"' => {
            // Both quote characters are one byte long, so slicing at 1 is safe.
            let rest = &item[1..];
            rest.find(quote).map_or(rest, |end| &rest[..end])
        }
        _ => item
            .split(|c: char| c.is_whitespace() || c == '(')
            .next()
            .unwrap_or("")
            .trim_matches('`')
            .trim_matches('"'),
    }
}