vibesql_executor/
create_table.rs

1//! CREATE TABLE statement execution
2
3use vibesql_ast::{CreateTableStmt, IndexColumn, OrderDirection};
4use vibesql_catalog::{ColumnSchema, TableSchema};
5use vibesql_storage::Database;
6
7use crate::{
8    constraint_validator::ConstraintValidator, errors::ExecutorError,
9    privilege_checker::PrivilegeChecker,
10};
11
12/// Executor for CREATE TABLE statements
13pub struct CreateTableExecutor;
14
15impl CreateTableExecutor {
16    /// Execute a CREATE TABLE statement
17    ///
18    /// # Arguments
19    ///
20    /// * `stmt` - The CREATE TABLE statement AST node
21    /// * `database` - The database to create the table in
22    ///
23    /// # Returns
24    ///
25    /// Success message or error
26    ///
27    /// # Examples
28    ///
29    /// ```
30    /// use vibesql_ast::{ColumnDef, CreateTableStmt};
31    /// use vibesql_executor::CreateTableExecutor;
32    /// use vibesql_storage::Database;
33    /// use vibesql_types::DataType;
34    ///
35    /// let mut db = Database::new();
36    /// let stmt = CreateTableStmt {
37    ///     if_not_exists: false,
38    ///     table_name: "users".to_string(),
39    ///     columns: vec![
40    ///         ColumnDef {
41    ///             name: "id".to_string(),
42    ///             data_type: DataType::Integer,
43    ///             nullable: false,
44    ///             constraints: vec![],
45    ///             default_value: None,
46    ///             comment: None,
47    ///         },
48    ///         ColumnDef {
49    ///             name: "name".to_string(),
50    ///             data_type: DataType::Varchar { max_length: Some(255) },
51    ///             nullable: true,
52    ///             constraints: vec![],
53    ///             default_value: None,
54    ///             comment: None,
55    ///         },
56    ///     ],
57    ///     table_constraints: vec![],
58    ///     table_options: vec![],
59    /// };
60    ///
61    /// let result = CreateTableExecutor::execute(&stmt, &mut db);
62    /// assert!(result.is_ok());
63    /// ```
64    pub fn execute(
65        stmt: &CreateTableStmt,
66        database: &mut Database,
67    ) -> Result<String, ExecutorError> {
68        // Parse qualified table name (schema.table or just table)
69        let (schema_name, table_name) =
70            if let Some((schema_part, table_part)) = stmt.table_name.split_once('.') {
71                (schema_part.to_string(), table_part.to_string())
72            } else {
73                (database.catalog.get_current_schema().to_string(), stmt.table_name.clone())
74            };
75
76        // Check CREATE privilege on the schema
77        PrivilegeChecker::check_create(database, &schema_name)?;
78
79        // Check if table already exists in the target schema
80        let qualified_name = format!("{}.{}", schema_name, table_name);
81        if database.catalog.table_exists(&qualified_name) {
82            if stmt.if_not_exists {
83                // IF NOT EXISTS - silently return success without creating the table
84                return Ok(format!(
85                    "Table '{}' already exists in schema '{}' (skipped)",
86                    table_name, schema_name
87                ));
88            }
89            return Err(ExecutorError::TableAlreadyExists(qualified_name));
90        }
91
92        // Check for AUTO_INCREMENT constraints
93        // MySQL allows only one AUTO_INCREMENT column per table
94        let auto_increment_columns: Vec<&str> = stmt
95            .columns
96            .iter()
97            .filter(|col_def| {
98                col_def
99                    .constraints
100                    .iter()
101                    .any(|c| matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement))
102            })
103            .map(|col_def| col_def.name.as_str())
104            .collect();
105
106        if auto_increment_columns.len() > 1 {
107            return Err(ExecutorError::ConstraintViolation(
108                "Only one AUTO_INCREMENT column allowed per table".to_string(),
109            ));
110        }
111
112        // Convert AST ColumnDef → Catalog ColumnSchema
113        let mut columns: Vec<ColumnSchema> =
114            stmt.columns
115                .iter()
116                .map(|col_def| {
117                    // For AUTO_INCREMENT columns, set default to NEXT VALUE FOR sequence
118                    let default_value =
119                        if col_def.constraints.iter().any(|c| {
120                            matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement)
121                        }) {
122                            // Create sequence name: {table_name}_{column_name}_seq
123                            let sequence_name = format!("{}_{}_seq", table_name, col_def.name);
124                            Some(vibesql_ast::Expression::NextValue { sequence_name })
125                        } else {
126                            col_def.default_value.as_ref().map(|expr| (**expr).clone())
127                        };
128
129                    ColumnSchema {
130                        name: col_def.name.clone(),
131                        data_type: col_def.data_type.clone(),
132                        nullable: col_def.nullable,
133                        default_value,
134                    }
135                })
136                .collect();
137
138        // Process constraints using the constraint validator
139        let constraint_result =
140            ConstraintValidator::process_constraints(&stmt.columns, &stmt.table_constraints)?;
141
142        // Apply constraint results to columns (updates nullability)
143        ConstraintValidator::apply_to_columns(&mut columns, &constraint_result);
144
145        // Create TableSchema with unqualified name
146        let mut table_schema = TableSchema::new(table_name.clone(), columns);
147
148        // Apply constraint results to schema (sets PK, unique, and check constraints)
149        ConstraintValidator::apply_to_schema(&mut table_schema, &constraint_result);
150
151        // Check for STORAGE table option and apply storage format
152        for option in &stmt.table_options {
153            if let vibesql_ast::TableOption::Storage(format) = option {
154                table_schema.set_storage_format(*format);
155            }
156        }
157
158        // Process foreign key constraints from table_constraints
159        for constraint in &stmt.table_constraints {
160            if let vibesql_ast::TableConstraintKind::ForeignKey {
161                columns: fk_columns,
162                references_table,
163                references_columns,
164                on_delete,
165                on_update,
166            } = &constraint.kind
167            {
168                // Resolve column indices for FK columns
169                let column_indices: Vec<usize> = fk_columns
170                    .iter()
171                    .map(|col_name| {
172                        table_schema.get_column_index(col_name).ok_or_else(|| {
173                            ExecutorError::ColumnNotFound {
174                                column_name: col_name.clone(),
175                                table_name: table_name.clone(),
176                                searched_tables: vec![table_name.clone()],
177                                available_columns: table_schema
178                                    .columns
179                                    .iter()
180                                    .map(|c| c.name.clone())
181                                    .collect(),
182                            }
183                        })
184                    })
185                    .collect::<Result<Vec<_>, _>>()?;
186
187                // Lookup parent table to get parent column indices
188                let parent_schema = database
189                    .catalog
190                    .get_table(references_table)
191                    .ok_or_else(|| ExecutorError::TableNotFound(references_table.clone()))?;
192
193                let parent_column_indices: Vec<usize> = references_columns
194                    .iter()
195                    .map(|col_name| {
196                        parent_schema.get_column_index(col_name).ok_or_else(|| {
197                            ExecutorError::ColumnNotFound {
198                                column_name: col_name.clone(),
199                                table_name: references_table.clone(),
200                                searched_tables: vec![references_table.clone()],
201                                available_columns: parent_schema
202                                    .columns
203                                    .iter()
204                                    .map(|c| c.name.clone())
205                                    .collect(),
206                            }
207                        })
208                    })
209                    .collect::<Result<Vec<_>, _>>()?;
210
211                // Convert ReferentialAction from AST to catalog type
212                let convert_action = |action: &Option<vibesql_ast::ReferentialAction>| match action
213                    .as_ref()
214                    .unwrap_or(&vibesql_ast::ReferentialAction::NoAction)
215                {
216                    vibesql_ast::ReferentialAction::Cascade => {
217                        vibesql_catalog::ReferentialAction::Cascade
218                    }
219                    vibesql_ast::ReferentialAction::SetNull => {
220                        vibesql_catalog::ReferentialAction::SetNull
221                    }
222                    vibesql_ast::ReferentialAction::SetDefault => {
223                        vibesql_catalog::ReferentialAction::SetDefault
224                    }
225                    vibesql_ast::ReferentialAction::Restrict => {
226                        vibesql_catalog::ReferentialAction::Restrict
227                    }
228                    vibesql_ast::ReferentialAction::NoAction => {
229                        vibesql_catalog::ReferentialAction::NoAction
230                    }
231                };
232
233                let fk = vibesql_catalog::ForeignKeyConstraint {
234                    name: constraint.name.clone(),
235                    column_names: fk_columns.clone(),
236                    column_indices,
237                    parent_table: references_table.clone(),
238                    parent_column_names: references_columns.clone(),
239                    parent_column_indices,
240                    on_delete: convert_action(on_delete),
241                    on_update: convert_action(on_update),
242                };
243
244                table_schema.add_foreign_key(fk)?;
245            }
246        }
247
248        // If creating in a non-current schema, temporarily switch to it
249        let original_schema = database.catalog.get_current_schema().to_string();
250        let needs_schema_switch = schema_name != original_schema;
251
252        if needs_schema_switch {
253            database
254                .catalog
255                .set_current_schema(&schema_name)
256                .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
257        }
258
259        // Create internal sequences for AUTO_INCREMENT columns
260        for auto_inc_col in &auto_increment_columns {
261            let sequence_name = format!("{}_{}_seq", table_name, auto_inc_col);
262            database
263                .catalog
264                .create_sequence(
265                    sequence_name.clone(),
266                    Some(1), // start_with: 1
267                    1,       // increment_by: 1
268                    Some(1), // min_value: 1
269                    None,    // max_value: unlimited
270                    false,   // cycle: false
271                )
272                .map_err(|e| {
273                    ExecutorError::StorageError(format!(
274                        "Failed to create sequence for AUTO_INCREMENT: {:?}",
275                        e
276                    ))
277                })?;
278        }
279
280        // Create table using Database API (handles both catalog and storage)
281        let result = database
282            .create_table(table_schema.clone())
283            .map_err(|e| ExecutorError::StorageError(e.to_string()));
284
285        // Check if table creation succeeded before creating indexes
286        result?;
287
288        // Auto-create indexes for PRIMARY KEY and UNIQUE constraints
289        Self::create_implicit_indexes(database, &table_name, &table_schema)?;
290
291        // Restore original schema if we switched
292        if needs_schema_switch {
293            database
294                .catalog
295                .set_current_schema(&original_schema)
296                .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
297        }
298
299        // Return success message
300        Ok(format!("Table '{}' created successfully in schema '{}'", table_name, schema_name))
301    }
302
303    /// Create implicit indexes for PRIMARY KEY and UNIQUE constraints
304    ///
305    /// Production databases automatically create B-tree indexes for these constraints
306    /// to enable efficient query optimization. This function replicates that behavior.
307    fn create_implicit_indexes(
308        database: &mut Database,
309        table_name: &str,
310        table_schema: &TableSchema,
311    ) -> Result<(), ExecutorError> {
312        // Auto-create PRIMARY KEY index
313        if let Some(pk_cols) = &table_schema.primary_key {
314            let index_name = format!("pk_{}", table_name);
315
316            // Create IndexColumn specs for the PRIMARY KEY columns
317            let index_columns: Vec<IndexColumn> = pk_cols
318                .iter()
319                .map(|col_name| IndexColumn {
320                    column_name: col_name.clone(),
321                    direction: OrderDirection::Asc,
322                    prefix_length: None,
323                })
324                .collect();
325
326            // Add to catalog first
327            let index_metadata = vibesql_catalog::IndexMetadata::new(
328                index_name.clone(),
329                table_name.to_string(),
330                vibesql_catalog::IndexType::BTree,
331                index_columns
332                    .iter()
333                    .map(|col| vibesql_catalog::IndexedColumn {
334                        column_name: col.column_name.clone(),
335                        order: vibesql_catalog::SortOrder::Ascending,
336                        prefix_length: None,
337                    })
338                    .collect(),
339                true, // unique
340            );
341            database
342                .catalog
343                .add_index(index_metadata)
344                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
345
346            // Create the actual B-tree index
347            database
348                .create_index(index_name, table_name.to_string(), true, index_columns)
349                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
350        }
351
352        // Auto-create UNIQUE constraint indexes
353        for unique_cols in &table_schema.unique_constraints {
354            let index_name = format!("uq_{}_{}", table_name, unique_cols.join("_"));
355
356            // Create IndexColumn specs for the UNIQUE columns
357            let index_columns: Vec<IndexColumn> = unique_cols
358                .iter()
359                .map(|col_name| IndexColumn {
360                    column_name: col_name.clone(),
361                    direction: OrderDirection::Asc,
362                    prefix_length: None,
363                })
364                .collect();
365
366            // Add to catalog first
367            let index_metadata = vibesql_catalog::IndexMetadata::new(
368                index_name.clone(),
369                table_name.to_string(),
370                vibesql_catalog::IndexType::BTree,
371                index_columns
372                    .iter()
373                    .map(|col| vibesql_catalog::IndexedColumn {
374                        column_name: col.column_name.clone(),
375                        order: vibesql_catalog::SortOrder::Ascending,
376                        prefix_length: None,
377                    })
378                    .collect(),
379                true, // unique
380            );
381            database
382                .catalog
383                .add_index(index_metadata)
384                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
385
386            // Create the actual B-tree index
387            database
388                .create_index(index_name, table_name.to_string(), true, index_columns)
389                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
390        }
391
392        Ok(())
393    }
394}