1use vibesql_ast::{CreateTableStmt, IndexColumn, OrderDirection};
4use vibesql_catalog::{ColumnSchema, TableSchema};
5use vibesql_storage::Database;
6
7use crate::{
8 constraint_validator::ConstraintValidator, errors::ExecutorError,
9 privilege_checker::PrivilegeChecker,
10};
11
/// Stateless executor for `CREATE TABLE` statements.
///
/// All functionality is exposed through associated functions (see
/// `CreateTableExecutor::execute`); the type carries no data of its own.
#[derive(Debug, Clone, Copy, Default)]
pub struct CreateTableExecutor;
14
15impl CreateTableExecutor {
16 pub fn execute(
65 stmt: &CreateTableStmt,
66 database: &mut Database,
67 ) -> Result<String, ExecutorError> {
68 let (schema_name, table_name) =
70 if let Some((schema_part, table_part)) = stmt.table_name.split_once('.') {
71 (schema_part.to_string(), table_part.to_string())
72 } else {
73 (database.catalog.get_current_schema().to_string(), stmt.table_name.clone())
74 };
75
76 PrivilegeChecker::check_create(database, &schema_name)?;
78
79 let qualified_name = format!("{}.{}", schema_name, table_name);
81 if database.catalog.table_exists(&qualified_name) {
82 if stmt.if_not_exists {
83 return Ok(format!(
85 "Table '{}' already exists in schema '{}' (skipped)",
86 table_name, schema_name
87 ));
88 }
89 return Err(ExecutorError::TableAlreadyExists(qualified_name));
90 }
91
92 let auto_increment_columns: Vec<&str> = stmt
95 .columns
96 .iter()
97 .filter(|col_def| {
98 col_def
99 .constraints
100 .iter()
101 .any(|c| matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement))
102 })
103 .map(|col_def| col_def.name.as_str())
104 .collect();
105
106 if auto_increment_columns.len() > 1 {
107 return Err(ExecutorError::ConstraintViolation(
108 "Only one AUTO_INCREMENT column allowed per table".to_string(),
109 ));
110 }
111
112 let mut columns: Vec<ColumnSchema> =
114 stmt.columns
115 .iter()
116 .map(|col_def| {
117 let default_value =
119 if col_def.constraints.iter().any(|c| {
120 matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement)
121 }) {
122 let sequence_name = format!("{}_{}_seq", table_name, col_def.name);
124 Some(vibesql_ast::Expression::NextValue { sequence_name })
125 } else {
126 col_def.default_value.as_ref().map(|expr| (**expr).clone())
127 };
128
129 ColumnSchema {
130 name: col_def.name.clone(),
131 data_type: col_def.data_type.clone(),
132 nullable: col_def.nullable,
133 default_value,
134 }
135 })
136 .collect();
137
138 let constraint_result =
140 ConstraintValidator::process_constraints(&stmt.columns, &stmt.table_constraints)?;
141
142 ConstraintValidator::apply_to_columns(&mut columns, &constraint_result);
144
145 let mut table_schema = TableSchema::new(table_name.clone(), columns);
147
148 ConstraintValidator::apply_to_schema(&mut table_schema, &constraint_result);
150
151 for option in &stmt.table_options {
153 if let vibesql_ast::TableOption::Storage(format) = option {
154 table_schema.set_storage_format(*format);
155 }
156 }
157
158 for constraint in &stmt.table_constraints {
160 if let vibesql_ast::TableConstraintKind::ForeignKey {
161 columns: fk_columns,
162 references_table,
163 references_columns,
164 on_delete,
165 on_update,
166 } = &constraint.kind
167 {
168 let column_indices: Vec<usize> = fk_columns
170 .iter()
171 .map(|col_name| {
172 table_schema.get_column_index(col_name).ok_or_else(|| {
173 ExecutorError::ColumnNotFound {
174 column_name: col_name.clone(),
175 table_name: table_name.clone(),
176 searched_tables: vec![table_name.clone()],
177 available_columns: table_schema
178 .columns
179 .iter()
180 .map(|c| c.name.clone())
181 .collect(),
182 }
183 })
184 })
185 .collect::<Result<Vec<_>, _>>()?;
186
187 let parent_schema = database
189 .catalog
190 .get_table(references_table)
191 .ok_or_else(|| ExecutorError::TableNotFound(references_table.clone()))?;
192
193 let parent_column_indices: Vec<usize> = references_columns
194 .iter()
195 .map(|col_name| {
196 parent_schema.get_column_index(col_name).ok_or_else(|| {
197 ExecutorError::ColumnNotFound {
198 column_name: col_name.clone(),
199 table_name: references_table.clone(),
200 searched_tables: vec![references_table.clone()],
201 available_columns: parent_schema
202 .columns
203 .iter()
204 .map(|c| c.name.clone())
205 .collect(),
206 }
207 })
208 })
209 .collect::<Result<Vec<_>, _>>()?;
210
211 let convert_action = |action: &Option<vibesql_ast::ReferentialAction>| match action
213 .as_ref()
214 .unwrap_or(&vibesql_ast::ReferentialAction::NoAction)
215 {
216 vibesql_ast::ReferentialAction::Cascade => {
217 vibesql_catalog::ReferentialAction::Cascade
218 }
219 vibesql_ast::ReferentialAction::SetNull => {
220 vibesql_catalog::ReferentialAction::SetNull
221 }
222 vibesql_ast::ReferentialAction::SetDefault => {
223 vibesql_catalog::ReferentialAction::SetDefault
224 }
225 vibesql_ast::ReferentialAction::Restrict => {
226 vibesql_catalog::ReferentialAction::Restrict
227 }
228 vibesql_ast::ReferentialAction::NoAction => {
229 vibesql_catalog::ReferentialAction::NoAction
230 }
231 };
232
233 let fk = vibesql_catalog::ForeignKeyConstraint {
234 name: constraint.name.clone(),
235 column_names: fk_columns.clone(),
236 column_indices,
237 parent_table: references_table.clone(),
238 parent_column_names: references_columns.clone(),
239 parent_column_indices,
240 on_delete: convert_action(on_delete),
241 on_update: convert_action(on_update),
242 };
243
244 table_schema.add_foreign_key(fk)?;
245 }
246 }
247
248 let original_schema = database.catalog.get_current_schema().to_string();
250 let needs_schema_switch = schema_name != original_schema;
251
252 if needs_schema_switch {
253 database
254 .catalog
255 .set_current_schema(&schema_name)
256 .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
257 }
258
259 for auto_inc_col in &auto_increment_columns {
261 let sequence_name = format!("{}_{}_seq", table_name, auto_inc_col);
262 database
263 .catalog
264 .create_sequence(
265 sequence_name.clone(),
266 Some(1), 1, Some(1), None, false, )
272 .map_err(|e| {
273 ExecutorError::StorageError(format!(
274 "Failed to create sequence for AUTO_INCREMENT: {:?}",
275 e
276 ))
277 })?;
278 }
279
280 let result = database
282 .create_table(table_schema.clone())
283 .map_err(|e| ExecutorError::StorageError(e.to_string()));
284
285 result?;
287
288 Self::create_implicit_indexes(database, &table_name, &table_schema)?;
290
291 if needs_schema_switch {
293 database
294 .catalog
295 .set_current_schema(&original_schema)
296 .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
297 }
298
299 Ok(format!("Table '{}' created successfully in schema '{}'", table_name, schema_name))
301 }
302
303 fn create_implicit_indexes(
308 database: &mut Database,
309 table_name: &str,
310 table_schema: &TableSchema,
311 ) -> Result<(), ExecutorError> {
312 if let Some(pk_cols) = &table_schema.primary_key {
314 let index_name = format!("pk_{}", table_name);
315
316 let index_columns: Vec<IndexColumn> = pk_cols
318 .iter()
319 .map(|col_name| IndexColumn {
320 column_name: col_name.clone(),
321 direction: OrderDirection::Asc,
322 prefix_length: None,
323 })
324 .collect();
325
326 let index_metadata = vibesql_catalog::IndexMetadata::new(
328 index_name.clone(),
329 table_name.to_string(),
330 vibesql_catalog::IndexType::BTree,
331 index_columns
332 .iter()
333 .map(|col| vibesql_catalog::IndexedColumn {
334 column_name: col.column_name.clone(),
335 order: vibesql_catalog::SortOrder::Ascending,
336 prefix_length: None,
337 })
338 .collect(),
339 true, );
341 database
342 .catalog
343 .add_index(index_metadata)
344 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
345
346 database
348 .create_index(index_name, table_name.to_string(), true, index_columns)
349 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
350 }
351
352 for unique_cols in &table_schema.unique_constraints {
354 let index_name = format!("uq_{}_{}", table_name, unique_cols.join("_"));
355
356 let index_columns: Vec<IndexColumn> = unique_cols
358 .iter()
359 .map(|col_name| IndexColumn {
360 column_name: col_name.clone(),
361 direction: OrderDirection::Asc,
362 prefix_length: None,
363 })
364 .collect();
365
366 let index_metadata = vibesql_catalog::IndexMetadata::new(
368 index_name.clone(),
369 table_name.to_string(),
370 vibesql_catalog::IndexType::BTree,
371 index_columns
372 .iter()
373 .map(|col| vibesql_catalog::IndexedColumn {
374 column_name: col.column_name.clone(),
375 order: vibesql_catalog::SortOrder::Ascending,
376 prefix_length: None,
377 })
378 .collect(),
379 true, );
381 database
382 .catalog
383 .add_index(index_metadata)
384 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
385
386 database
388 .create_index(index_name, table_name.to_string(), true, index_columns)
389 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
390 }
391
392 Ok(())
393 }
394}