1use vibesql_ast::{CreateTableStmt, IndexColumn, OrderDirection};
4use vibesql_catalog::{ColumnSchema, TableSchema};
5use vibesql_storage::Database;
6
7use crate::{
8 constraint_validator::ConstraintValidator, errors::ExecutorError,
9 privilege_checker::PrivilegeChecker,
10};
11
12pub struct CreateTableExecutor;
14
15impl CreateTableExecutor {
16 pub fn execute(
64 stmt: &CreateTableStmt,
65 database: &mut Database,
66 ) -> Result<String, ExecutorError> {
67 let (schema_name, table_name) =
69 if let Some((schema_part, table_part)) = stmt.table_name.split_once('.') {
70 (schema_part.to_string(), table_part.to_string())
71 } else {
72 (database.catalog.get_current_schema().to_string(), stmt.table_name.clone())
73 };
74
75 PrivilegeChecker::check_create(database, &schema_name)?;
77
78 let qualified_name = format!("{}.{}", schema_name, table_name);
80 if database.catalog.table_exists(&qualified_name) {
81 return Err(ExecutorError::TableAlreadyExists(qualified_name));
82 }
83
84 let auto_increment_columns: Vec<&str> = stmt
87 .columns
88 .iter()
89 .filter(|col_def| {
90 col_def
91 .constraints
92 .iter()
93 .any(|c| matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement))
94 })
95 .map(|col_def| col_def.name.as_str())
96 .collect();
97
98 if auto_increment_columns.len() > 1 {
99 return Err(ExecutorError::ConstraintViolation(
100 "Only one AUTO_INCREMENT column allowed per table".to_string(),
101 ));
102 }
103
104 let mut columns: Vec<ColumnSchema> =
106 stmt.columns
107 .iter()
108 .map(|col_def| {
109 let default_value =
111 if col_def.constraints.iter().any(|c| {
112 matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement)
113 }) {
114 let sequence_name = format!("{}_{}_seq", table_name, col_def.name);
116 Some(vibesql_ast::Expression::NextValue { sequence_name })
117 } else {
118 col_def.default_value.as_ref().map(|expr| (**expr).clone())
119 };
120
121 ColumnSchema {
122 name: col_def.name.clone(),
123 data_type: col_def.data_type.clone(),
124 nullable: col_def.nullable,
125 default_value,
126 }
127 })
128 .collect();
129
130 let constraint_result =
132 ConstraintValidator::process_constraints(&stmt.columns, &stmt.table_constraints)?;
133
134 ConstraintValidator::apply_to_columns(&mut columns, &constraint_result);
136
137 let mut table_schema = TableSchema::new(table_name.clone(), columns);
139
140 ConstraintValidator::apply_to_schema(&mut table_schema, &constraint_result);
142
143 for option in &stmt.table_options {
145 if let vibesql_ast::TableOption::Storage(format) = option {
146 table_schema.set_storage_format(*format);
147 }
148 }
149
150 for constraint in &stmt.table_constraints {
152 if let vibesql_ast::TableConstraintKind::ForeignKey {
153 columns: fk_columns,
154 references_table,
155 references_columns,
156 on_delete,
157 on_update,
158 } = &constraint.kind
159 {
160 let column_indices: Vec<usize> = fk_columns
162 .iter()
163 .map(|col_name| {
164 table_schema.get_column_index(col_name).ok_or_else(|| {
165 ExecutorError::ColumnNotFound {
166 column_name: col_name.clone(),
167 table_name: table_name.clone(),
168 searched_tables: vec![table_name.clone()],
169 available_columns: table_schema
170 .columns
171 .iter()
172 .map(|c| c.name.clone())
173 .collect(),
174 }
175 })
176 })
177 .collect::<Result<Vec<_>, _>>()?;
178
179 let parent_schema = database
181 .catalog
182 .get_table(references_table)
183 .ok_or_else(|| ExecutorError::TableNotFound(references_table.clone()))?;
184
185 let parent_column_indices: Vec<usize> = references_columns
186 .iter()
187 .map(|col_name| {
188 parent_schema.get_column_index(col_name).ok_or_else(|| {
189 ExecutorError::ColumnNotFound {
190 column_name: col_name.clone(),
191 table_name: references_table.clone(),
192 searched_tables: vec![references_table.clone()],
193 available_columns: parent_schema
194 .columns
195 .iter()
196 .map(|c| c.name.clone())
197 .collect(),
198 }
199 })
200 })
201 .collect::<Result<Vec<_>, _>>()?;
202
203 let convert_action = |action: &Option<vibesql_ast::ReferentialAction>| match action
205 .as_ref()
206 .unwrap_or(&vibesql_ast::ReferentialAction::NoAction)
207 {
208 vibesql_ast::ReferentialAction::Cascade => {
209 vibesql_catalog::ReferentialAction::Cascade
210 }
211 vibesql_ast::ReferentialAction::SetNull => {
212 vibesql_catalog::ReferentialAction::SetNull
213 }
214 vibesql_ast::ReferentialAction::SetDefault => {
215 vibesql_catalog::ReferentialAction::SetDefault
216 }
217 vibesql_ast::ReferentialAction::Restrict => {
218 vibesql_catalog::ReferentialAction::Restrict
219 }
220 vibesql_ast::ReferentialAction::NoAction => {
221 vibesql_catalog::ReferentialAction::NoAction
222 }
223 };
224
225 let fk = vibesql_catalog::ForeignKeyConstraint {
226 name: constraint.name.clone(),
227 column_names: fk_columns.clone(),
228 column_indices,
229 parent_table: references_table.clone(),
230 parent_column_names: references_columns.clone(),
231 parent_column_indices,
232 on_delete: convert_action(on_delete),
233 on_update: convert_action(on_update),
234 };
235
236 table_schema.add_foreign_key(fk)?;
237 }
238 }
239
240 let original_schema = database.catalog.get_current_schema().to_string();
242 let needs_schema_switch = schema_name != original_schema;
243
244 if needs_schema_switch {
245 database
246 .catalog
247 .set_current_schema(&schema_name)
248 .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
249 }
250
251 for auto_inc_col in &auto_increment_columns {
253 let sequence_name = format!("{}_{}_seq", table_name, auto_inc_col);
254 database
255 .catalog
256 .create_sequence(
257 sequence_name.clone(),
258 Some(1), 1, Some(1), None, false, )
264 .map_err(|e| {
265 ExecutorError::StorageError(format!(
266 "Failed to create sequence for AUTO_INCREMENT: {:?}",
267 e
268 ))
269 })?;
270 }
271
272 let result = database
274 .create_table(table_schema.clone())
275 .map_err(|e| ExecutorError::StorageError(e.to_string()));
276
277 result?;
279
280 Self::create_implicit_indexes(database, &table_name, &table_schema)?;
282
283 if needs_schema_switch {
285 database
286 .catalog
287 .set_current_schema(&original_schema)
288 .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
289 }
290
291 Ok(format!("Table '{}' created successfully in schema '{}'", table_name, schema_name))
293 }
294
295 fn create_implicit_indexes(
300 database: &mut Database,
301 table_name: &str,
302 table_schema: &TableSchema,
303 ) -> Result<(), ExecutorError> {
304 if let Some(pk_cols) = &table_schema.primary_key {
306 let index_name = format!("pk_{}", table_name);
307
308 let index_columns: Vec<IndexColumn> = pk_cols
310 .iter()
311 .map(|col_name| IndexColumn {
312 column_name: col_name.clone(),
313 direction: OrderDirection::Asc,
314 prefix_length: None,
315 })
316 .collect();
317
318 let index_metadata = vibesql_catalog::IndexMetadata::new(
320 index_name.clone(),
321 table_name.to_string(),
322 vibesql_catalog::IndexType::BTree,
323 index_columns
324 .iter()
325 .map(|col| vibesql_catalog::IndexedColumn {
326 column_name: col.column_name.clone(),
327 order: vibesql_catalog::SortOrder::Ascending,
328 prefix_length: None,
329 })
330 .collect(),
331 true, );
333 database
334 .catalog
335 .add_index(index_metadata)
336 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
337
338 database
340 .create_index(index_name, table_name.to_string(), true, index_columns)
341 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
342 }
343
344 for unique_cols in &table_schema.unique_constraints {
346 let index_name = format!("uq_{}_{}", table_name, unique_cols.join("_"));
347
348 let index_columns: Vec<IndexColumn> = unique_cols
350 .iter()
351 .map(|col_name| IndexColumn {
352 column_name: col_name.clone(),
353 direction: OrderDirection::Asc,
354 prefix_length: None,
355 })
356 .collect();
357
358 let index_metadata = vibesql_catalog::IndexMetadata::new(
360 index_name.clone(),
361 table_name.to_string(),
362 vibesql_catalog::IndexType::BTree,
363 index_columns
364 .iter()
365 .map(|col| vibesql_catalog::IndexedColumn {
366 column_name: col.column_name.clone(),
367 order: vibesql_catalog::SortOrder::Ascending,
368 prefix_length: None,
369 })
370 .collect(),
371 true, );
373 database
374 .catalog
375 .add_index(index_metadata)
376 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
377
378 database
380 .create_index(index_name, table_name.to_string(), true, index_columns)
381 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
382 }
383
384 Ok(())
385 }
386}