1use vibesql_ast::{CreateTableStmt, IndexColumn, OrderDirection};
4use vibesql_catalog::{ColumnSchema, TableSchema};
5use vibesql_storage::Database;
6
7use crate::{
8 constraint_validator::ConstraintValidator, errors::ExecutorError,
9 privilege_checker::PrivilegeChecker,
10};
11
12pub struct CreateTableExecutor;
14
15impl CreateTableExecutor {
16 pub fn execute(
64 stmt: &CreateTableStmt,
65 database: &mut Database,
66 ) -> Result<String, ExecutorError> {
67 let (schema_name, table_name) =
69 if let Some((schema_part, table_part)) = stmt.table_name.split_once('.') {
70 (schema_part.to_string(), table_part.to_string())
71 } else {
72 (database.catalog.get_current_schema().to_string(), stmt.table_name.clone())
73 };
74
75 PrivilegeChecker::check_create(database, &schema_name)?;
77
78 let qualified_name = format!("{}.{}", schema_name, table_name);
80 if database.catalog.table_exists(&qualified_name) {
81 return Err(ExecutorError::TableAlreadyExists(qualified_name));
82 }
83
84 let auto_increment_columns: Vec<&str> = stmt
87 .columns
88 .iter()
89 .filter(|col_def| {
90 col_def.constraints.iter().any(|c| {
91 matches!(
92 c.kind,
93 vibesql_ast::ColumnConstraintKind::AutoIncrement
94 )
95 })
96 })
97 .map(|col_def| col_def.name.as_str())
98 .collect();
99
100 if auto_increment_columns.len() > 1 {
101 return Err(ExecutorError::ConstraintViolation(
102 "Only one AUTO_INCREMENT column allowed per table".to_string(),
103 ));
104 }
105
106 let mut columns: Vec<ColumnSchema> = stmt
108 .columns
109 .iter()
110 .map(|col_def| {
111 let default_value = if col_def.constraints.iter().any(|c| {
113 matches!(
114 c.kind,
115 vibesql_ast::ColumnConstraintKind::AutoIncrement
116 )
117 }) {
118 let sequence_name = format!("{}_{}_seq", table_name, col_def.name);
120 Some(vibesql_ast::Expression::NextValue { sequence_name })
121 } else {
122 col_def.default_value.as_ref().map(|expr| (**expr).clone())
123 };
124
125 ColumnSchema {
126 name: col_def.name.clone(),
127 data_type: col_def.data_type.clone(),
128 nullable: col_def.nullable,
129 default_value,
130 }
131 })
132 .collect();
133
134 let constraint_result =
136 ConstraintValidator::process_constraints(&stmt.columns, &stmt.table_constraints)?;
137
138 ConstraintValidator::apply_to_columns(&mut columns, &constraint_result);
140
141 let mut table_schema = TableSchema::new(table_name.clone(), columns);
143
144 ConstraintValidator::apply_to_schema(&mut table_schema, &constraint_result);
146
147 for option in &stmt.table_options {
149 if let vibesql_ast::TableOption::Storage(format) = option {
150 table_schema.set_storage_format(*format);
151 }
152 }
153
154 for constraint in &stmt.table_constraints {
156 if let vibesql_ast::TableConstraintKind::ForeignKey {
157 columns: fk_columns,
158 references_table,
159 references_columns,
160 on_delete,
161 on_update,
162 } = &constraint.kind
163 {
164 let column_indices: Vec<usize> = fk_columns
166 .iter()
167 .map(|col_name| {
168 table_schema.get_column_index(col_name).ok_or_else(|| {
169 ExecutorError::ColumnNotFound {
170 column_name: col_name.clone(),
171 table_name: table_name.clone(),
172 searched_tables: vec![table_name.clone()],
173 available_columns: table_schema
174 .columns
175 .iter()
176 .map(|c| c.name.clone())
177 .collect(),
178 }
179 })
180 })
181 .collect::<Result<Vec<_>, _>>()?;
182
183 let parent_schema = database
185 .catalog
186 .get_table(references_table)
187 .ok_or_else(|| ExecutorError::TableNotFound(references_table.clone()))?;
188
189 let parent_column_indices: Vec<usize> = references_columns
190 .iter()
191 .map(|col_name| {
192 parent_schema.get_column_index(col_name).ok_or_else(|| {
193 ExecutorError::ColumnNotFound {
194 column_name: col_name.clone(),
195 table_name: references_table.clone(),
196 searched_tables: vec![references_table.clone()],
197 available_columns: parent_schema
198 .columns
199 .iter()
200 .map(|c| c.name.clone())
201 .collect(),
202 }
203 })
204 })
205 .collect::<Result<Vec<_>, _>>()?;
206
207 let convert_action = |action: &Option<vibesql_ast::ReferentialAction>| {
209 match action.as_ref().unwrap_or(&vibesql_ast::ReferentialAction::NoAction) {
210 vibesql_ast::ReferentialAction::Cascade => {
211 vibesql_catalog::ReferentialAction::Cascade
212 }
213 vibesql_ast::ReferentialAction::SetNull => {
214 vibesql_catalog::ReferentialAction::SetNull
215 }
216 vibesql_ast::ReferentialAction::SetDefault => {
217 vibesql_catalog::ReferentialAction::SetDefault
218 }
219 vibesql_ast::ReferentialAction::Restrict => {
220 vibesql_catalog::ReferentialAction::Restrict
221 }
222 vibesql_ast::ReferentialAction::NoAction => {
223 vibesql_catalog::ReferentialAction::NoAction
224 }
225 }
226 };
227
228 let fk = vibesql_catalog::ForeignKeyConstraint {
229 name: constraint.name.clone(),
230 column_names: fk_columns.clone(),
231 column_indices,
232 parent_table: references_table.clone(),
233 parent_column_names: references_columns.clone(),
234 parent_column_indices,
235 on_delete: convert_action(on_delete),
236 on_update: convert_action(on_update),
237 };
238
239 table_schema.add_foreign_key(fk)?;
240 }
241 }
242
243 let original_schema = database.catalog.get_current_schema().to_string();
245 let needs_schema_switch = schema_name != original_schema;
246
247 if needs_schema_switch {
248 database
249 .catalog
250 .set_current_schema(&schema_name)
251 .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
252 }
253
254 for auto_inc_col in &auto_increment_columns {
256 let sequence_name = format!("{}_{}_seq", table_name, auto_inc_col);
257 database
258 .catalog
259 .create_sequence(
260 sequence_name.clone(),
261 Some(1), 1, Some(1), None, false, )
267 .map_err(|e| ExecutorError::StorageError(format!("Failed to create sequence for AUTO_INCREMENT: {:?}", e)))?;
268 }
269
270 let result = database
272 .create_table(table_schema.clone())
273 .map_err(|e| ExecutorError::StorageError(e.to_string()));
274
275 result?;
277
278 Self::create_implicit_indexes(database, &table_name, &table_schema)?;
280
281 if needs_schema_switch {
283 database
284 .catalog
285 .set_current_schema(&original_schema)
286 .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
287 }
288
289 Ok(format!("Table '{}' created successfully in schema '{}'", table_name, schema_name))
291 }
292
293 fn create_implicit_indexes(
298 database: &mut Database,
299 table_name: &str,
300 table_schema: &TableSchema,
301 ) -> Result<(), ExecutorError> {
302 if let Some(pk_cols) = &table_schema.primary_key {
304 let index_name = format!("pk_{}", table_name);
305
306 let index_columns: Vec<IndexColumn> = pk_cols
308 .iter()
309 .map(|col_name| IndexColumn {
310 column_name: col_name.clone(),
311 direction: OrderDirection::Asc,
312 prefix_length: None,
313 })
314 .collect();
315
316 let index_metadata = vibesql_catalog::IndexMetadata::new(
318 index_name.clone(),
319 table_name.to_string(),
320 vibesql_catalog::IndexType::BTree,
321 index_columns
322 .iter()
323 .map(|col| vibesql_catalog::IndexedColumn {
324 column_name: col.column_name.clone(),
325 order: vibesql_catalog::SortOrder::Ascending,
326 prefix_length: None,
327 })
328 .collect(),
329 true, );
331 database
332 .catalog
333 .add_index(index_metadata)
334 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
335
336 database
338 .create_index(index_name, table_name.to_string(), true, index_columns)
339 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
340 }
341
342 for unique_cols in &table_schema.unique_constraints {
344 let index_name = format!("uq_{}_{}", table_name, unique_cols.join("_"));
345
346 let index_columns: Vec<IndexColumn> = unique_cols
348 .iter()
349 .map(|col_name| IndexColumn {
350 column_name: col_name.clone(),
351 direction: OrderDirection::Asc,
352 prefix_length: None,
353 })
354 .collect();
355
356 let index_metadata = vibesql_catalog::IndexMetadata::new(
358 index_name.clone(),
359 table_name.to_string(),
360 vibesql_catalog::IndexType::BTree,
361 index_columns
362 .iter()
363 .map(|col| vibesql_catalog::IndexedColumn {
364 column_name: col.column_name.clone(),
365 order: vibesql_catalog::SortOrder::Ascending,
366 prefix_length: None,
367 })
368 .collect(),
369 true, );
371 database
372 .catalog
373 .add_index(index_metadata)
374 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
375
376 database
378 .create_index(index_name, table_name.to_string(), true, index_columns)
379 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
380 }
381
382 Ok(())
383 }
384}