1use vibesql_ast::{CreateTableStmt, IndexColumn, OrderDirection};
4use vibesql_catalog::{ColumnSchema, TableIdentifier, TableSchema};
5use vibesql_storage::Database;
6use vibesql_types::DataType;
7
8use crate::{
9 constraint_validator::ConstraintValidator, errors::ExecutorError,
10 privilege_checker::PrivilegeChecker, SelectExecutor,
11};
12
/// Stateless executor for `CREATE TABLE` statements.
///
/// All functionality is exposed through associated functions; the type
/// carries no data and exists only as a namespace. The common traits are
/// derived so the type can be logged, copied, and default-constructed like
/// any other zero-sized marker.
#[derive(Debug, Clone, Copy, Default)]
pub struct CreateTableExecutor;
15
16impl CreateTableExecutor {
17 pub fn execute(
70 stmt: &CreateTableStmt,
71 database: &mut Database,
72 ) -> Result<String, ExecutorError> {
73 let (schema_name, table_name, identifier) = if stmt.temporary {
76 let temp_schema = database.catalog.temp_schema_name();
80 let id = TableIdentifier::qualified(
81 temp_schema,
82 false,
83 &stmt.table_name,
84 stmt.quoted,
85 );
86 (temp_schema.to_string(), stmt.table_name.clone(), id)
87 } else if let Some((schema_part, table_part)) = stmt.table_name.split_once('.') {
88 let id = TableIdentifier::qualified(schema_part, stmt.quoted, table_part, stmt.quoted);
92 (schema_part.to_string(), table_part.to_string(), id)
93 } else {
94 let id = TableIdentifier::new(&stmt.table_name, stmt.quoted);
96 (database.catalog.get_current_schema().to_string(), stmt.table_name.clone(), id)
97 };
98
99 PrivilegeChecker::check_create(database, &schema_name)?;
101
102 if let Some(query) = &stmt.as_query {
104 return Self::execute_create_as_select(
105 database,
106 &table_name,
107 &schema_name,
108 identifier,
109 stmt.if_not_exists,
110 query,
111 );
112 }
113
114 if database.catalog.table_exists_by_identifier(&identifier) {
121 if stmt.if_not_exists {
122 return Ok(format!(
124 "Table '{}' already exists in schema '{}' (skipped)",
125 table_name, schema_name
126 ));
127 }
128 return Err(ExecutorError::TableAlreadyExists(format!(
129 "{}.{}",
130 schema_name,
131 identifier.display()
132 )));
133 }
134
135 let auto_increment_columns: Vec<&str> = stmt
138 .columns
139 .iter()
140 .filter(|col_def| {
141 col_def
142 .constraints
143 .iter()
144 .any(|c| matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement))
145 })
146 .map(|col_def| col_def.name.as_str())
147 .collect();
148
149 if auto_increment_columns.len() > 1 {
150 return Err(ExecutorError::ConstraintViolation(
151 "Only one AUTO_INCREMENT column allowed per table".to_string(),
152 ));
153 }
154
155 let mut columns: Vec<ColumnSchema> = stmt
157 .columns
158 .iter()
159 .map(|col_def| {
160 let default_value = if col_def
162 .constraints
163 .iter()
164 .any(|c| matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement))
165 {
166 let sequence_name = format!("{}_{}_seq", table_name, col_def.name);
168 Some(vibesql_ast::Expression::NextValue { sequence_name })
169 } else {
170 col_def.default_value.as_ref().map(|expr| (**expr).clone())
171 };
172
173 let collation = col_def.constraints.iter().find_map(|c| {
175 if let vibesql_ast::ColumnConstraintKind::Collate(coll) = &c.kind {
176 Some(coll.clone())
177 } else {
178 None
179 }
180 });
181
182 ColumnSchema {
183 name: col_def.name.clone(),
184 data_type: col_def.data_type.clone(),
185 nullable: col_def.nullable,
186 default_value,
187 generated_expr: col_def.generated_expr.as_ref().map(|expr| (**expr).clone()),
188 collation,
189 is_exact_integer_type: col_def.is_exact_integer_type,
190 }
191 })
192 .collect();
193
194 let constraint_result =
196 ConstraintValidator::process_constraints(&stmt.columns, &stmt.table_constraints)?;
197
198 ConstraintValidator::apply_to_columns(&mut columns, &constraint_result);
200
201 let mut table_schema = TableSchema::new(table_name.clone(), columns);
203
204 table_schema.without_rowid = stmt.without_rowid;
206
207 ConstraintValidator::apply_to_schema(&mut table_schema, &constraint_result);
209
210 if stmt.without_rowid && table_schema.primary_key.is_none() {
212 return Err(ExecutorError::ConstraintViolation(
213 "WITHOUT ROWID tables must have a PRIMARY KEY".to_string(),
214 ));
215 }
216
217 if let Some(pk_cols) = &table_schema.primary_key {
222 if pk_cols.len() == 1 {
223 if let Some(col_idx) = table_schema.get_column_index(&pk_cols[0]) {
224 let col = &table_schema.columns[col_idx];
225 if matches!(col.data_type, DataType::Integer) && col.is_exact_integer_type {
227 table_schema.set_rowid_alias_column(Some(col_idx));
228 }
229 }
230 }
231 }
232
233 for option in &stmt.table_options {
235 if let vibesql_ast::TableOption::Storage(format) = option {
236 table_schema.set_storage_format(*format);
237 }
238 }
239
240 for constraint in &stmt.table_constraints {
242 if let vibesql_ast::TableConstraintKind::ForeignKey {
243 columns: fk_columns,
244 references_table,
245 references_columns,
246 on_delete,
247 on_update,
248 ..
249 } = &constraint.kind
250 {
251 let column_indices: Vec<usize> = fk_columns
253 .iter()
254 .map(|col_name| {
255 table_schema.get_column_index(col_name).ok_or_else(|| {
256 ExecutorError::ColumnNotFound {
257 column_name: col_name.to_string(),
258 table_name: table_name.clone(),
259 searched_tables: vec![table_name.clone()],
260 available_columns: table_schema
261 .columns
262 .iter()
263 .map(|c| c.name.clone())
264 .collect(),
265 }
266 })
267 })
268 .collect::<Result<Vec<_>, _>>()?;
269
270 let parent_schema = database
272 .catalog
273 .get_table(references_table)
274 .ok_or_else(|| ExecutorError::TableNotFound(references_table.clone()))?;
275
276 let parent_column_indices: Vec<usize> = references_columns
277 .iter()
278 .map(|col_name| {
279 parent_schema.get_column_index(col_name).ok_or_else(|| {
280 ExecutorError::ColumnNotFound {
281 column_name: col_name.to_string(),
282 table_name: references_table.clone(),
283 searched_tables: vec![references_table.clone()],
284 available_columns: parent_schema
285 .columns
286 .iter()
287 .map(|c| c.name.clone())
288 .collect(),
289 }
290 })
291 })
292 .collect::<Result<Vec<_>, _>>()?;
293
294 let convert_action = |action: &Option<vibesql_ast::ReferentialAction>| match action
296 .as_ref()
297 .unwrap_or(&vibesql_ast::ReferentialAction::NoAction)
298 {
299 vibesql_ast::ReferentialAction::Cascade => {
300 vibesql_catalog::ReferentialAction::Cascade
301 }
302 vibesql_ast::ReferentialAction::SetNull => {
303 vibesql_catalog::ReferentialAction::SetNull
304 }
305 vibesql_ast::ReferentialAction::SetDefault => {
306 vibesql_catalog::ReferentialAction::SetDefault
307 }
308 vibesql_ast::ReferentialAction::Restrict => {
309 vibesql_catalog::ReferentialAction::Restrict
310 }
311 vibesql_ast::ReferentialAction::NoAction => {
312 vibesql_catalog::ReferentialAction::NoAction
313 }
314 };
315
316 let fk = vibesql_catalog::ForeignKeyConstraint {
317 name: constraint.name.clone(),
318 column_names: fk_columns.clone(),
319 column_indices,
320 parent_table: references_table.clone(),
321 parent_column_names: references_columns.clone(),
322 parent_column_indices,
323 on_delete: convert_action(on_delete),
324 on_update: convert_action(on_update),
325 };
326
327 table_schema.add_foreign_key(fk)?;
328 }
329 }
330
331 let original_schema = database.catalog.get_current_schema().to_string();
333 let needs_schema_switch = schema_name != original_schema;
334
335 if needs_schema_switch {
336 database
337 .catalog
338 .set_current_schema(&schema_name)
339 .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
340 }
341
342 for auto_inc_col in &auto_increment_columns {
344 let sequence_name = format!("{}_{}_seq", table_name, auto_inc_col);
345 database
346 .catalog
347 .create_sequence(
348 sequence_name.clone(),
349 Some(1), 1, Some(1), None, false, )
355 .map_err(|e| {
356 ExecutorError::StorageError(format!(
357 "Failed to create sequence for AUTO_INCREMENT: {:?}",
358 e
359 ))
360 })?;
361 }
362
363 let result = database
366 .create_table_with_identifier(table_schema.clone(), identifier.clone())
367 .map_err(|e| ExecutorError::StorageError(e.to_string()));
368
369 result?;
371
372 Self::create_implicit_indexes(database, &table_name, &table_schema)?;
374
375 if needs_schema_switch {
377 database
378 .catalog
379 .set_current_schema(&original_schema)
380 .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
381 }
382
383 Ok(format!("Table '{}' created successfully in schema '{}'", table_name, schema_name))
385 }
386
387 fn create_implicit_indexes(
392 database: &mut Database,
393 table_name: &str,
394 table_schema: &TableSchema,
395 ) -> Result<(), ExecutorError> {
396 let mut autoindex_counter = 1;
398
399 if let Some(pk_cols) = &table_schema.primary_key {
403 if table_schema.rowid_alias_column.is_none() {
404 let index_name = format!("sqlite_autoindex_{}_{}", table_name, autoindex_counter);
405 autoindex_counter += 1;
406
407 let index_columns: Vec<IndexColumn> = pk_cols
409 .iter()
410 .map(|col_name| IndexColumn::Column {
411 column_name: col_name.to_string(),
412 direction: OrderDirection::Asc,
413 prefix_length: None,
414 })
415 .collect();
416
417 let index_metadata = vibesql_catalog::IndexMetadata::new(
419 index_name.clone(),
420 table_name.to_string(),
421 vibesql_catalog::IndexType::BTree,
422 index_columns
423 .iter()
424 .map(|col| {
425 vibesql_catalog::IndexedColumn::new_column(
426 col.expect_column_name().to_string(),
427 vibesql_catalog::SortOrder::Ascending,
428 )
429 })
430 .collect(),
431 true, );
433 database
434 .catalog
435 .add_index(index_metadata)
436 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
437
438 database
440 .create_index(index_name, table_name.to_string(), true, index_columns)
441 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
442 }
443 }
444
445 for unique_cols in &table_schema.unique_constraints {
447 let index_name = format!("sqlite_autoindex_{}_{}", table_name, autoindex_counter);
448 autoindex_counter += 1;
449
450 let index_columns: Vec<IndexColumn> = unique_cols
452 .iter()
453 .map(|col_name| IndexColumn::Column {
454 column_name: col_name.to_string(),
455 direction: OrderDirection::Asc,
456 prefix_length: None,
457 })
458 .collect();
459
460 let index_metadata = vibesql_catalog::IndexMetadata::new(
462 index_name.clone(),
463 table_name.to_string(),
464 vibesql_catalog::IndexType::BTree,
465 index_columns
466 .iter()
467 .map(|col| {
468 vibesql_catalog::IndexedColumn::new_column(
469 col.expect_column_name().to_string(),
470 vibesql_catalog::SortOrder::Ascending,
471 )
472 })
473 .collect(),
474 true, );
476 database
477 .catalog
478 .add_index(index_metadata)
479 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
480
481 database
483 .create_index(index_name, table_name.to_string(), true, index_columns)
484 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
485 }
486
487 Ok(())
488 }
489
490 fn execute_create_as_select(
495 database: &mut Database,
496 table_name: &str,
497 schema_name: &str,
498 identifier: TableIdentifier,
499 if_not_exists: bool,
500 query: &vibesql_ast::SelectStmt,
501 ) -> Result<String, ExecutorError> {
502 if database.catalog.table_exists_by_identifier(&identifier) {
504 if if_not_exists {
505 return Ok(format!(
506 "Table '{}' already exists in schema '{}' (skipped)",
507 table_name, schema_name
508 ));
509 }
510 return Err(ExecutorError::TableAlreadyExists(format!(
511 "{}.{}",
512 schema_name,
513 identifier.display()
514 )));
515 }
516
517 let rows = SelectExecutor::new(database).execute(query)?;
519
520 let column_names =
522 Self::derive_column_names_from_select_list(&query.select_list, &query.from, database)?;
523
524 let columns: Vec<ColumnSchema> = column_names
526 .iter()
527 .enumerate()
528 .map(|(idx, col_name)| {
529 let data_type = if !rows.is_empty() && idx < rows[0].values.len() {
531 Self::infer_data_type(&rows[0].values[idx])
532 } else {
533 DataType::BinaryLargeObject
535 };
536
537 ColumnSchema {
538 name: col_name.to_string(),
539 data_type,
540 nullable: true, default_value: None,
542 generated_expr: None,
543 collation: None, is_exact_integer_type: false, }
546 })
547 .collect();
548
549 let table_schema = TableSchema::new(table_name.to_string(), columns);
551
552 database
554 .create_table_with_identifier(table_schema, identifier)
555 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
556
557 let row_count = rows.len();
559 for row in rows {
560 database
561 .insert_row(table_name, row)
562 .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
563 }
564
565 Ok(format!(
566 "Table '{}' created successfully in schema '{}' with {} rows",
567 table_name, schema_name, row_count
568 ))
569 }
570
571 fn derive_column_names_from_select_list(
573 select_list: &[vibesql_ast::SelectItem],
574 from: &Option<vibesql_ast::FromClause>,
575 database: &Database,
576 ) -> Result<Vec<String>, ExecutorError> {
577 let mut names = Vec::new();
578 let mut counter = 0;
579
580 for item in select_list {
581 match item {
582 vibesql_ast::SelectItem::Wildcard { .. } => {
583 let table_names = Self::get_table_names_from_from(from)?;
585 for table_name in table_names {
586 if let Some(schema) = database.catalog.get_table(&table_name) {
587 for col in &schema.columns {
588 names.push(col.name.clone());
589 }
590 } else {
591 return Err(ExecutorError::TableNotFound(table_name));
592 }
593 }
594 }
595 vibesql_ast::SelectItem::QualifiedWildcard { qualifier, .. } => {
596 if let Some(schema) = database.catalog.get_table(qualifier) {
598 for col in &schema.columns {
599 names.push(col.name.clone());
600 }
601 } else {
602 return Err(ExecutorError::TableNotFound(qualifier.clone()));
603 }
604 }
605 vibesql_ast::SelectItem::Expression { expr, alias, .. } => {
606 let name = if let Some(alias) = alias {
607 alias.clone()
608 } else {
609 Self::derive_column_name_from_expr(expr, &mut counter)
611 };
612 names.push(name);
613 }
614 }
615 }
616
617 Ok(names)
618 }
619
620 fn get_table_names_from_from(
622 from: &Option<vibesql_ast::FromClause>,
623 ) -> Result<Vec<String>, ExecutorError> {
624 let mut names = Vec::new();
625
626 match from {
627 None => {
628 return Err(ExecutorError::UnsupportedFeature(
630 "CREATE TABLE AS SELECT * requires a FROM clause".to_string(),
631 ));
632 }
633 Some(vibesql_ast::FromClause::Table { name, .. }) => {
634 names.push(name.clone());
635 }
636 Some(vibesql_ast::FromClause::Join { left, right, .. }) => {
637 names.extend(Self::get_table_names_from_from(&Some(*left.clone()))?);
639 names.extend(Self::get_table_names_from_from(&Some(*right.clone()))?);
640 }
641 Some(vibesql_ast::FromClause::Subquery { alias, .. }) => {
642 return Err(ExecutorError::UnsupportedFeature(format!(
645 "CREATE TABLE AS SELECT * from subquery '{}' not supported - please specify columns explicitly",
646 alias
647 )));
648 }
649 Some(vibesql_ast::FromClause::Values { alias, .. }) => {
650 return Err(ExecutorError::UnsupportedFeature(format!(
652 "CREATE TABLE AS SELECT * from VALUES '{}' not supported - please specify columns explicitly",
653 alias
654 )));
655 }
656 }
657
658 Ok(names)
659 }
660
661 fn derive_column_name_from_expr(expr: &vibesql_ast::Expression, counter: &mut usize) -> String {
663 match expr {
664 vibesql_ast::Expression::ColumnRef(col_id) => col_id.column_canonical().to_string(),
665 vibesql_ast::Expression::Literal(_) => {
666 *counter += 1;
667 format!("column{}", counter)
668 }
669 vibesql_ast::Expression::Function { name, .. } => {
670 name.to_string().to_lowercase()
672 }
673 _ => {
674 *counter += 1;
675 format!("column{}", counter)
676 }
677 }
678 }
679
680 fn infer_data_type(value: &vibesql_types::SqlValue) -> DataType {
682 use vibesql_types::SqlValue;
683 match value {
684 SqlValue::Null => DataType::BinaryLargeObject,
685 SqlValue::Boolean(_) => DataType::Boolean,
686 SqlValue::Integer(_) => DataType::Integer,
687 SqlValue::Bigint(_) => DataType::Bigint,
688 SqlValue::Smallint(_) => DataType::Smallint,
689 SqlValue::Unsigned(_) => DataType::Unsigned,
690 SqlValue::Float(_) | SqlValue::Real(_) => DataType::Real,
691 SqlValue::Double(_) | SqlValue::Numeric(_) => DataType::DoublePrecision,
692 SqlValue::Character(_) => DataType::Character { length: 255 },
693 SqlValue::Varchar(_) => DataType::Varchar { max_length: None },
694 SqlValue::Date(_) => DataType::Date,
695 SqlValue::Time(_) => DataType::Time { with_timezone: false },
696 SqlValue::Timestamp(_) => DataType::Timestamp { with_timezone: false },
697 SqlValue::Interval(_) => DataType::Interval {
698 start_field: vibesql_types::IntervalField::Day,
699 end_field: None,
700 },
701 SqlValue::Vector(v) => DataType::Vector { dimensions: v.len() as u32 },
702 SqlValue::Blob(_) => DataType::BinaryLargeObject,
703 }
704 }
705}