vibesql_executor/
create_table.rs

1//! CREATE TABLE statement execution
2
3use vibesql_ast::{CreateTableStmt, IndexColumn, OrderDirection};
4use vibesql_catalog::{ColumnSchema, TableSchema};
5use vibesql_storage::Database;
6
7use crate::{
8    constraint_validator::ConstraintValidator, errors::ExecutorError,
9    privilege_checker::PrivilegeChecker,
10};
11
12/// Executor for CREATE TABLE statements
13pub struct CreateTableExecutor;
14
15impl CreateTableExecutor {
16    /// Execute a CREATE TABLE statement
17    ///
18    /// # Arguments
19    ///
20    /// * `stmt` - The CREATE TABLE statement AST node
21    /// * `database` - The database to create the table in
22    ///
23    /// # Returns
24    ///
25    /// Success message or error
26    ///
27    /// # Examples
28    ///
29    /// ```
30    /// use vibesql_ast::{ColumnDef, CreateTableStmt};
31    /// use vibesql_executor::CreateTableExecutor;
32    /// use vibesql_storage::Database;
33    /// use vibesql_types::DataType;
34    ///
35    /// let mut db = Database::new();
36    /// let stmt = CreateTableStmt {
37    ///     table_name: "users".to_string(),
38    ///     columns: vec![
39    ///         ColumnDef {
40    ///             name: "id".to_string(),
41    ///             data_type: DataType::Integer,
42    ///             nullable: false,
43    ///             constraints: vec![],
44    ///             default_value: None,
45    ///             comment: None,
46    ///         },
47    ///         ColumnDef {
48    ///             name: "name".to_string(),
49    ///             data_type: DataType::Varchar { max_length: Some(255) },
50    ///             nullable: true,
51    ///             constraints: vec![],
52    ///             default_value: None,
53    ///             comment: None,
54    ///         },
55    ///     ],
56    ///     table_constraints: vec![],
57    ///     table_options: vec![],
58    /// };
59    ///
60    /// let result = CreateTableExecutor::execute(&stmt, &mut db);
61    /// assert!(result.is_ok());
62    /// ```
63    pub fn execute(
64        stmt: &CreateTableStmt,
65        database: &mut Database,
66    ) -> Result<String, ExecutorError> {
67        // Parse qualified table name (schema.table or just table)
68        let (schema_name, table_name) =
69            if let Some((schema_part, table_part)) = stmt.table_name.split_once('.') {
70                (schema_part.to_string(), table_part.to_string())
71            } else {
72                (database.catalog.get_current_schema().to_string(), stmt.table_name.clone())
73            };
74
75        // Check CREATE privilege on the schema
76        PrivilegeChecker::check_create(database, &schema_name)?;
77
78        // Check if table already exists in the target schema
79        let qualified_name = format!("{}.{}", schema_name, table_name);
80        if database.catalog.table_exists(&qualified_name) {
81            return Err(ExecutorError::TableAlreadyExists(qualified_name));
82        }
83
84        // Check for AUTO_INCREMENT constraints
85        // MySQL allows only one AUTO_INCREMENT column per table
86        let auto_increment_columns: Vec<&str> = stmt
87            .columns
88            .iter()
89            .filter(|col_def| {
90                col_def
91                    .constraints
92                    .iter()
93                    .any(|c| matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement))
94            })
95            .map(|col_def| col_def.name.as_str())
96            .collect();
97
98        if auto_increment_columns.len() > 1 {
99            return Err(ExecutorError::ConstraintViolation(
100                "Only one AUTO_INCREMENT column allowed per table".to_string(),
101            ));
102        }
103
104        // Convert AST ColumnDef → Catalog ColumnSchema
105        let mut columns: Vec<ColumnSchema> =
106            stmt.columns
107                .iter()
108                .map(|col_def| {
109                    // For AUTO_INCREMENT columns, set default to NEXT VALUE FOR sequence
110                    let default_value =
111                        if col_def.constraints.iter().any(|c| {
112                            matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement)
113                        }) {
114                            // Create sequence name: {table_name}_{column_name}_seq
115                            let sequence_name = format!("{}_{}_seq", table_name, col_def.name);
116                            Some(vibesql_ast::Expression::NextValue { sequence_name })
117                        } else {
118                            col_def.default_value.as_ref().map(|expr| (**expr).clone())
119                        };
120
121                    ColumnSchema {
122                        name: col_def.name.clone(),
123                        data_type: col_def.data_type.clone(),
124                        nullable: col_def.nullable,
125                        default_value,
126                    }
127                })
128                .collect();
129
130        // Process constraints using the constraint validator
131        let constraint_result =
132            ConstraintValidator::process_constraints(&stmt.columns, &stmt.table_constraints)?;
133
134        // Apply constraint results to columns (updates nullability)
135        ConstraintValidator::apply_to_columns(&mut columns, &constraint_result);
136
137        // Create TableSchema with unqualified name
138        let mut table_schema = TableSchema::new(table_name.clone(), columns);
139
140        // Apply constraint results to schema (sets PK, unique, and check constraints)
141        ConstraintValidator::apply_to_schema(&mut table_schema, &constraint_result);
142
143        // Check for STORAGE table option and apply storage format
144        for option in &stmt.table_options {
145            if let vibesql_ast::TableOption::Storage(format) = option {
146                table_schema.set_storage_format(*format);
147            }
148        }
149
150        // Process foreign key constraints from table_constraints
151        for constraint in &stmt.table_constraints {
152            if let vibesql_ast::TableConstraintKind::ForeignKey {
153                columns: fk_columns,
154                references_table,
155                references_columns,
156                on_delete,
157                on_update,
158            } = &constraint.kind
159            {
160                // Resolve column indices for FK columns
161                let column_indices: Vec<usize> = fk_columns
162                    .iter()
163                    .map(|col_name| {
164                        table_schema.get_column_index(col_name).ok_or_else(|| {
165                            ExecutorError::ColumnNotFound {
166                                column_name: col_name.clone(),
167                                table_name: table_name.clone(),
168                                searched_tables: vec![table_name.clone()],
169                                available_columns: table_schema
170                                    .columns
171                                    .iter()
172                                    .map(|c| c.name.clone())
173                                    .collect(),
174                            }
175                        })
176                    })
177                    .collect::<Result<Vec<_>, _>>()?;
178
179                // Lookup parent table to get parent column indices
180                let parent_schema = database
181                    .catalog
182                    .get_table(references_table)
183                    .ok_or_else(|| ExecutorError::TableNotFound(references_table.clone()))?;
184
185                let parent_column_indices: Vec<usize> = references_columns
186                    .iter()
187                    .map(|col_name| {
188                        parent_schema.get_column_index(col_name).ok_or_else(|| {
189                            ExecutorError::ColumnNotFound {
190                                column_name: col_name.clone(),
191                                table_name: references_table.clone(),
192                                searched_tables: vec![references_table.clone()],
193                                available_columns: parent_schema
194                                    .columns
195                                    .iter()
196                                    .map(|c| c.name.clone())
197                                    .collect(),
198                            }
199                        })
200                    })
201                    .collect::<Result<Vec<_>, _>>()?;
202
203                // Convert ReferentialAction from AST to catalog type
204                let convert_action = |action: &Option<vibesql_ast::ReferentialAction>| match action
205                    .as_ref()
206                    .unwrap_or(&vibesql_ast::ReferentialAction::NoAction)
207                {
208                    vibesql_ast::ReferentialAction::Cascade => {
209                        vibesql_catalog::ReferentialAction::Cascade
210                    }
211                    vibesql_ast::ReferentialAction::SetNull => {
212                        vibesql_catalog::ReferentialAction::SetNull
213                    }
214                    vibesql_ast::ReferentialAction::SetDefault => {
215                        vibesql_catalog::ReferentialAction::SetDefault
216                    }
217                    vibesql_ast::ReferentialAction::Restrict => {
218                        vibesql_catalog::ReferentialAction::Restrict
219                    }
220                    vibesql_ast::ReferentialAction::NoAction => {
221                        vibesql_catalog::ReferentialAction::NoAction
222                    }
223                };
224
225                let fk = vibesql_catalog::ForeignKeyConstraint {
226                    name: constraint.name.clone(),
227                    column_names: fk_columns.clone(),
228                    column_indices,
229                    parent_table: references_table.clone(),
230                    parent_column_names: references_columns.clone(),
231                    parent_column_indices,
232                    on_delete: convert_action(on_delete),
233                    on_update: convert_action(on_update),
234                };
235
236                table_schema.add_foreign_key(fk)?;
237            }
238        }
239
240        // If creating in a non-current schema, temporarily switch to it
241        let original_schema = database.catalog.get_current_schema().to_string();
242        let needs_schema_switch = schema_name != original_schema;
243
244        if needs_schema_switch {
245            database
246                .catalog
247                .set_current_schema(&schema_name)
248                .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
249        }
250
251        // Create internal sequences for AUTO_INCREMENT columns
252        for auto_inc_col in &auto_increment_columns {
253            let sequence_name = format!("{}_{}_seq", table_name, auto_inc_col);
254            database
255                .catalog
256                .create_sequence(
257                    sequence_name.clone(),
258                    Some(1), // start_with: 1
259                    1,       // increment_by: 1
260                    Some(1), // min_value: 1
261                    None,    // max_value: unlimited
262                    false,   // cycle: false
263                )
264                .map_err(|e| {
265                    ExecutorError::StorageError(format!(
266                        "Failed to create sequence for AUTO_INCREMENT: {:?}",
267                        e
268                    ))
269                })?;
270        }
271
272        // Create table using Database API (handles both catalog and storage)
273        let result = database
274            .create_table(table_schema.clone())
275            .map_err(|e| ExecutorError::StorageError(e.to_string()));
276
277        // Check if table creation succeeded before creating indexes
278        result?;
279
280        // Auto-create indexes for PRIMARY KEY and UNIQUE constraints
281        Self::create_implicit_indexes(database, &table_name, &table_schema)?;
282
283        // Restore original schema if we switched
284        if needs_schema_switch {
285            database
286                .catalog
287                .set_current_schema(&original_schema)
288                .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
289        }
290
291        // Return success message
292        Ok(format!("Table '{}' created successfully in schema '{}'", table_name, schema_name))
293    }
294
295    /// Create implicit indexes for PRIMARY KEY and UNIQUE constraints
296    ///
297    /// Production databases automatically create B-tree indexes for these constraints
298    /// to enable efficient query optimization. This function replicates that behavior.
299    fn create_implicit_indexes(
300        database: &mut Database,
301        table_name: &str,
302        table_schema: &TableSchema,
303    ) -> Result<(), ExecutorError> {
304        // Auto-create PRIMARY KEY index
305        if let Some(pk_cols) = &table_schema.primary_key {
306            let index_name = format!("pk_{}", table_name);
307
308            // Create IndexColumn specs for the PRIMARY KEY columns
309            let index_columns: Vec<IndexColumn> = pk_cols
310                .iter()
311                .map(|col_name| IndexColumn {
312                    column_name: col_name.clone(),
313                    direction: OrderDirection::Asc,
314                    prefix_length: None,
315                })
316                .collect();
317
318            // Add to catalog first
319            let index_metadata = vibesql_catalog::IndexMetadata::new(
320                index_name.clone(),
321                table_name.to_string(),
322                vibesql_catalog::IndexType::BTree,
323                index_columns
324                    .iter()
325                    .map(|col| vibesql_catalog::IndexedColumn {
326                        column_name: col.column_name.clone(),
327                        order: vibesql_catalog::SortOrder::Ascending,
328                        prefix_length: None,
329                    })
330                    .collect(),
331                true, // unique
332            );
333            database
334                .catalog
335                .add_index(index_metadata)
336                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
337
338            // Create the actual B-tree index
339            database
340                .create_index(index_name, table_name.to_string(), true, index_columns)
341                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
342        }
343
344        // Auto-create UNIQUE constraint indexes
345        for unique_cols in &table_schema.unique_constraints {
346            let index_name = format!("uq_{}_{}", table_name, unique_cols.join("_"));
347
348            // Create IndexColumn specs for the UNIQUE columns
349            let index_columns: Vec<IndexColumn> = unique_cols
350                .iter()
351                .map(|col_name| IndexColumn {
352                    column_name: col_name.clone(),
353                    direction: OrderDirection::Asc,
354                    prefix_length: None,
355                })
356                .collect();
357
358            // Add to catalog first
359            let index_metadata = vibesql_catalog::IndexMetadata::new(
360                index_name.clone(),
361                table_name.to_string(),
362                vibesql_catalog::IndexType::BTree,
363                index_columns
364                    .iter()
365                    .map(|col| vibesql_catalog::IndexedColumn {
366                        column_name: col.column_name.clone(),
367                        order: vibesql_catalog::SortOrder::Ascending,
368                        prefix_length: None,
369                    })
370                    .collect(),
371                true, // unique
372            );
373            database
374                .catalog
375                .add_index(index_metadata)
376                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
377
378            // Create the actual B-tree index
379            database
380                .create_index(index_name, table_name.to_string(), true, index_columns)
381                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
382        }
383
384        Ok(())
385    }
386}