vibesql_executor/
create_table.rs

1//! CREATE TABLE statement execution
2
3use vibesql_ast::{CreateTableStmt, IndexColumn, OrderDirection};
4use vibesql_catalog::{ColumnSchema, TableSchema};
5use vibesql_storage::Database;
6
7use crate::{
8    constraint_validator::ConstraintValidator, errors::ExecutorError,
9    privilege_checker::PrivilegeChecker,
10};
11
12/// Executor for CREATE TABLE statements
13pub struct CreateTableExecutor;
14
15impl CreateTableExecutor {
16    /// Execute a CREATE TABLE statement
17    ///
18    /// # Arguments
19    ///
20    /// * `stmt` - The CREATE TABLE statement AST node
21    /// * `database` - The database to create the table in
22    ///
23    /// # Returns
24    ///
25    /// Success message or error
26    ///
27    /// # Examples
28    ///
29    /// ```
30    /// use vibesql_ast::{ColumnDef, CreateTableStmt};
31    /// use vibesql_executor::CreateTableExecutor;
32    /// use vibesql_storage::Database;
33    /// use vibesql_types::DataType;
34    ///
35    /// let mut db = Database::new();
36    /// let stmt = CreateTableStmt {
37    ///     table_name: "users".to_string(),
38    ///     columns: vec![
39    ///         ColumnDef {
40    ///             name: "id".to_string(),
41    ///             data_type: DataType::Integer,
42    ///             nullable: false,
43    ///             constraints: vec![],
44    ///             default_value: None,
45    ///             comment: None,
46    ///         },
47    ///         ColumnDef {
48    ///             name: "name".to_string(),
49    ///             data_type: DataType::Varchar { max_length: Some(255) },
50    ///             nullable: true,
51    ///             constraints: vec![],
52    ///             default_value: None,
53    ///             comment: None,
54    ///         },
55    ///     ],
56    ///     table_constraints: vec![],
57    ///     table_options: vec![],
58    /// };
59    ///
60    /// let result = CreateTableExecutor::execute(&stmt, &mut db);
61    /// assert!(result.is_ok());
62    /// ```
63    pub fn execute(
64        stmt: &CreateTableStmt,
65        database: &mut Database,
66    ) -> Result<String, ExecutorError> {
67        // Parse qualified table name (schema.table or just table)
68        let (schema_name, table_name) =
69            if let Some((schema_part, table_part)) = stmt.table_name.split_once('.') {
70                (schema_part.to_string(), table_part.to_string())
71            } else {
72                (database.catalog.get_current_schema().to_string(), stmt.table_name.clone())
73            };
74
75        // Check CREATE privilege on the schema
76        PrivilegeChecker::check_create(database, &schema_name)?;
77
78        // Check if table already exists in the target schema
79        let qualified_name = format!("{}.{}", schema_name, table_name);
80        if database.catalog.table_exists(&qualified_name) {
81            return Err(ExecutorError::TableAlreadyExists(qualified_name));
82        }
83
84        // Check for AUTO_INCREMENT constraints
85        // MySQL allows only one AUTO_INCREMENT column per table
86        let auto_increment_columns: Vec<&str> = stmt
87            .columns
88            .iter()
89            .filter(|col_def| {
90                col_def.constraints.iter().any(|c| {
91                    matches!(
92                        c.kind,
93                        vibesql_ast::ColumnConstraintKind::AutoIncrement
94                    )
95                })
96            })
97            .map(|col_def| col_def.name.as_str())
98            .collect();
99
100        if auto_increment_columns.len() > 1 {
101            return Err(ExecutorError::ConstraintViolation(
102                "Only one AUTO_INCREMENT column allowed per table".to_string(),
103            ));
104        }
105
106        // Convert AST ColumnDef → Catalog ColumnSchema
107        let mut columns: Vec<ColumnSchema> = stmt
108            .columns
109            .iter()
110            .map(|col_def| {
111                // For AUTO_INCREMENT columns, set default to NEXT VALUE FOR sequence
112                let default_value = if col_def.constraints.iter().any(|c| {
113                    matches!(
114                        c.kind,
115                        vibesql_ast::ColumnConstraintKind::AutoIncrement
116                    )
117                }) {
118                    // Create sequence name: {table_name}_{column_name}_seq
119                    let sequence_name = format!("{}_{}_seq", table_name, col_def.name);
120                    Some(vibesql_ast::Expression::NextValue { sequence_name })
121                } else {
122                    col_def.default_value.as_ref().map(|expr| (**expr).clone())
123                };
124
125                ColumnSchema {
126                    name: col_def.name.clone(),
127                    data_type: col_def.data_type.clone(),
128                    nullable: col_def.nullable,
129                    default_value,
130                }
131            })
132            .collect();
133
134        // Process constraints using the constraint validator
135        let constraint_result =
136            ConstraintValidator::process_constraints(&stmt.columns, &stmt.table_constraints)?;
137
138        // Apply constraint results to columns (updates nullability)
139        ConstraintValidator::apply_to_columns(&mut columns, &constraint_result);
140
141        // Create TableSchema with unqualified name
142        let mut table_schema = TableSchema::new(table_name.clone(), columns);
143
144        // Apply constraint results to schema (sets PK, unique, and check constraints)
145        ConstraintValidator::apply_to_schema(&mut table_schema, &constraint_result);
146
147        // Check for STORAGE table option and apply storage format
148        for option in &stmt.table_options {
149            if let vibesql_ast::TableOption::Storage(format) = option {
150                table_schema.set_storage_format(*format);
151            }
152        }
153
154        // Process foreign key constraints from table_constraints
155        for constraint in &stmt.table_constraints {
156            if let vibesql_ast::TableConstraintKind::ForeignKey {
157                columns: fk_columns,
158                references_table,
159                references_columns,
160                on_delete,
161                on_update,
162            } = &constraint.kind
163            {
164                // Resolve column indices for FK columns
165                let column_indices: Vec<usize> = fk_columns
166                    .iter()
167                    .map(|col_name| {
168                        table_schema.get_column_index(col_name).ok_or_else(|| {
169                            ExecutorError::ColumnNotFound {
170                                column_name: col_name.clone(),
171                                table_name: table_name.clone(),
172                                searched_tables: vec![table_name.clone()],
173                                available_columns: table_schema
174                                    .columns
175                                    .iter()
176                                    .map(|c| c.name.clone())
177                                    .collect(),
178                            }
179                        })
180                    })
181                    .collect::<Result<Vec<_>, _>>()?;
182
183                // Lookup parent table to get parent column indices
184                let parent_schema = database
185                    .catalog
186                    .get_table(references_table)
187                    .ok_or_else(|| ExecutorError::TableNotFound(references_table.clone()))?;
188
189                let parent_column_indices: Vec<usize> = references_columns
190                    .iter()
191                    .map(|col_name| {
192                        parent_schema.get_column_index(col_name).ok_or_else(|| {
193                            ExecutorError::ColumnNotFound {
194                                column_name: col_name.clone(),
195                                table_name: references_table.clone(),
196                                searched_tables: vec![references_table.clone()],
197                                available_columns: parent_schema
198                                    .columns
199                                    .iter()
200                                    .map(|c| c.name.clone())
201                                    .collect(),
202                            }
203                        })
204                    })
205                    .collect::<Result<Vec<_>, _>>()?;
206
207                // Convert ReferentialAction from AST to catalog type
208                let convert_action = |action: &Option<vibesql_ast::ReferentialAction>| {
209                    match action.as_ref().unwrap_or(&vibesql_ast::ReferentialAction::NoAction) {
210                        vibesql_ast::ReferentialAction::Cascade => {
211                            vibesql_catalog::ReferentialAction::Cascade
212                        }
213                        vibesql_ast::ReferentialAction::SetNull => {
214                            vibesql_catalog::ReferentialAction::SetNull
215                        }
216                        vibesql_ast::ReferentialAction::SetDefault => {
217                            vibesql_catalog::ReferentialAction::SetDefault
218                        }
219                        vibesql_ast::ReferentialAction::Restrict => {
220                            vibesql_catalog::ReferentialAction::Restrict
221                        }
222                        vibesql_ast::ReferentialAction::NoAction => {
223                            vibesql_catalog::ReferentialAction::NoAction
224                        }
225                    }
226                };
227
228                let fk = vibesql_catalog::ForeignKeyConstraint {
229                    name: constraint.name.clone(),
230                    column_names: fk_columns.clone(),
231                    column_indices,
232                    parent_table: references_table.clone(),
233                    parent_column_names: references_columns.clone(),
234                    parent_column_indices,
235                    on_delete: convert_action(on_delete),
236                    on_update: convert_action(on_update),
237                };
238
239                table_schema.add_foreign_key(fk)?;
240            }
241        }
242
243        // If creating in a non-current schema, temporarily switch to it
244        let original_schema = database.catalog.get_current_schema().to_string();
245        let needs_schema_switch = schema_name != original_schema;
246
247        if needs_schema_switch {
248            database
249                .catalog
250                .set_current_schema(&schema_name)
251                .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
252        }
253
254        // Create internal sequences for AUTO_INCREMENT columns
255        for auto_inc_col in &auto_increment_columns {
256            let sequence_name = format!("{}_{}_seq", table_name, auto_inc_col);
257            database
258                .catalog
259                .create_sequence(
260                    sequence_name.clone(),
261                    Some(1),  // start_with: 1
262                    1,        // increment_by: 1
263                    Some(1),  // min_value: 1
264                    None,     // max_value: unlimited
265                    false,    // cycle: false
266                )
267                .map_err(|e| ExecutorError::StorageError(format!("Failed to create sequence for AUTO_INCREMENT: {:?}", e)))?;
268        }
269
270        // Create table using Database API (handles both catalog and storage)
271        let result = database
272            .create_table(table_schema.clone())
273            .map_err(|e| ExecutorError::StorageError(e.to_string()));
274
275        // Check if table creation succeeded before creating indexes
276        result?;
277
278        // Auto-create indexes for PRIMARY KEY and UNIQUE constraints
279        Self::create_implicit_indexes(database, &table_name, &table_schema)?;
280
281        // Restore original schema if we switched
282        if needs_schema_switch {
283            database
284                .catalog
285                .set_current_schema(&original_schema)
286                .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
287        }
288
289        // Return success message
290        Ok(format!("Table '{}' created successfully in schema '{}'", table_name, schema_name))
291    }
292
293    /// Create implicit indexes for PRIMARY KEY and UNIQUE constraints
294    ///
295    /// Production databases automatically create B-tree indexes for these constraints
296    /// to enable efficient query optimization. This function replicates that behavior.
297    fn create_implicit_indexes(
298        database: &mut Database,
299        table_name: &str,
300        table_schema: &TableSchema,
301    ) -> Result<(), ExecutorError> {
302        // Auto-create PRIMARY KEY index
303        if let Some(pk_cols) = &table_schema.primary_key {
304            let index_name = format!("pk_{}", table_name);
305
306            // Create IndexColumn specs for the PRIMARY KEY columns
307            let index_columns: Vec<IndexColumn> = pk_cols
308                .iter()
309                .map(|col_name| IndexColumn {
310                    column_name: col_name.clone(),
311                    direction: OrderDirection::Asc,
312                    prefix_length: None,
313                })
314                .collect();
315
316            // Add to catalog first
317            let index_metadata = vibesql_catalog::IndexMetadata::new(
318                index_name.clone(),
319                table_name.to_string(),
320                vibesql_catalog::IndexType::BTree,
321                index_columns
322                    .iter()
323                    .map(|col| vibesql_catalog::IndexedColumn {
324                        column_name: col.column_name.clone(),
325                        order: vibesql_catalog::SortOrder::Ascending,
326                        prefix_length: None,
327                    })
328                    .collect(),
329                true, // unique
330            );
331            database
332                .catalog
333                .add_index(index_metadata)
334                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
335
336            // Create the actual B-tree index
337            database
338                .create_index(index_name, table_name.to_string(), true, index_columns)
339                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
340        }
341
342        // Auto-create UNIQUE constraint indexes
343        for unique_cols in &table_schema.unique_constraints {
344            let index_name = format!("uq_{}_{}", table_name, unique_cols.join("_"));
345
346            // Create IndexColumn specs for the UNIQUE columns
347            let index_columns: Vec<IndexColumn> = unique_cols
348                .iter()
349                .map(|col_name| IndexColumn {
350                    column_name: col_name.clone(),
351                    direction: OrderDirection::Asc,
352                    prefix_length: None,
353                })
354                .collect();
355
356            // Add to catalog first
357            let index_metadata = vibesql_catalog::IndexMetadata::new(
358                index_name.clone(),
359                table_name.to_string(),
360                vibesql_catalog::IndexType::BTree,
361                index_columns
362                    .iter()
363                    .map(|col| vibesql_catalog::IndexedColumn {
364                        column_name: col.column_name.clone(),
365                        order: vibesql_catalog::SortOrder::Ascending,
366                        prefix_length: None,
367                    })
368                    .collect(),
369                true, // unique
370            );
371            database
372                .catalog
373                .add_index(index_metadata)
374                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
375
376            // Create the actual B-tree index
377            database
378                .create_index(index_name, table_name.to_string(), true, index_columns)
379                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
380        }
381
382        Ok(())
383    }
384}