vibesql_executor/
create_table.rs

1//! CREATE TABLE statement execution
2
3use vibesql_ast::{CreateTableStmt, IndexColumn, OrderDirection};
4use vibesql_catalog::{ColumnSchema, TableIdentifier, TableSchema};
5use vibesql_storage::Database;
6use vibesql_types::DataType;
7
8use crate::{
9    constraint_validator::ConstraintValidator, errors::ExecutorError,
10    privilege_checker::PrivilegeChecker, SelectExecutor,
11};
12
/// Executor for CREATE TABLE statements
///
/// This is a stateless unit type: it holds no data, and its functionality is
/// exposed through associated functions that receive the statement and the
/// target database as arguments.
pub struct CreateTableExecutor;
15
16impl CreateTableExecutor {
17    /// Execute a CREATE TABLE statement
18    ///
19    /// # Arguments
20    ///
21    /// * `stmt` - The CREATE TABLE statement AST node
22    /// * `database` - The database to create the table in
23    ///
24    /// # Returns
25    ///
26    /// Success message or error
27    ///
28    /// # Examples
29    ///
30    /// ```
31    /// use vibesql_ast::{ColumnDef, CreateTableStmt};
32    /// use vibesql_executor::CreateTableExecutor;
33    /// use vibesql_storage::Database;
34    /// use vibesql_types::DataType;
35    ///
36    /// let mut db = Database::new();
37    /// let stmt = CreateTableStmt { temporary: false,
38    ///     if_not_exists: false,
39    ///     table_name: "users".to_string(),
40    ///     columns: vec![
41    ///         ColumnDef {
42    ///             name: "id".to_string(),
43    ///             data_type: DataType::Integer,
44    ///             nullable: false,
45    ///             constraints: vec![],
46    ///             default_value: None,
47    ///             comment: None,
48    ///             generated_expr: None, is_exact_integer_type: false,
49    ///         },
50    ///         ColumnDef {
51    ///             name: "name".to_string(),
52    ///             data_type: DataType::Varchar { max_length: Some(255) },
53    ///             nullable: true,
54    ///             constraints: vec![],
55    ///             default_value: None,
56    ///             comment: None,
57    ///             generated_expr: None, is_exact_integer_type: false,
58    ///         },
59    ///     ],
60    ///     table_constraints: vec![],
61    ///     table_options: vec![],
62    ///     quoted: false,
63    ///     as_query: None, without_rowid: false,
64    /// };
65    ///
66    /// let result = CreateTableExecutor::execute(&stmt, &mut db);
67    /// assert!(result.is_ok());
68    /// ```
69    pub fn execute(
70        stmt: &CreateTableStmt,
71        database: &mut Database,
72    ) -> Result<String, ExecutorError> {
73        // Parse qualified table name (schema.table or just table)
74        // For TEMP tables, use the session-specific temp schema (SQLite compatibility)
75        let (schema_name, table_name, identifier) = if stmt.temporary {
76            // Temporary table - use session-specific temp schema
77            // Each session gets its own temp schema (e.g., "temp_1", "temp_2")
78            // for isolation between database connections
79            let temp_schema = database.catalog.temp_schema_name();
80            let id = TableIdentifier::qualified(
81                temp_schema,
82                false,
83                &stmt.table_name,
84                stmt.quoted,
85            );
86            (temp_schema.to_string(), stmt.table_name.clone(), id)
87        } else if let Some((schema_part, table_part)) = stmt.table_name.split_once('.') {
88            // Schema-qualified table name - use qualified identifier
89            // Note: We use stmt.quoted for both parts since the parser combined them
90            // In a future iteration, CREATE TABLE could also store schema/table quoted status separately
91            let id = TableIdentifier::qualified(schema_part, stmt.quoted, table_part, stmt.quoted);
92            (schema_part.to_string(), table_part.to_string(), id)
93        } else {
94            // Simple table name - use current schema
95            let id = TableIdentifier::new(&stmt.table_name, stmt.quoted);
96            (database.catalog.get_current_schema().to_string(), stmt.table_name.clone(), id)
97        };
98
99        // Check CREATE privilege on the schema
100        PrivilegeChecker::check_create(database, &schema_name)?;
101
102        // Handle CREATE TABLE AS SELECT syntax
103        if let Some(query) = &stmt.as_query {
104            return Self::execute_create_as_select(
105                database,
106                &table_name,
107                &schema_name,
108                identifier,
109                stmt.if_not_exists,
110                query,
111            );
112        }
113
114        // Check if table already exists in the target schema using SQL:1999 identifier semantics
115        // For CREATE TABLE, we only check the target schema (not temp schema)
116        // Temp tables can shadow main tables, but we allow creating in main even if temp exists
117        // Use table_exists_by_identifier which respects quoted/unquoted semantics:
118        // - Quoted identifiers: case-sensitive (exact match)
119        // - Unquoted identifiers: case-insensitive (lowercase canonical)
120        if database.catalog.table_exists_by_identifier(&identifier) {
121            if stmt.if_not_exists {
122                // IF NOT EXISTS - silently return success without creating the table
123                return Ok(format!(
124                    "Table '{}' already exists in schema '{}' (skipped)",
125                    table_name, schema_name
126                ));
127            }
128            return Err(ExecutorError::TableAlreadyExists(format!(
129                "{}.{}",
130                schema_name,
131                identifier.display()
132            )));
133        }
134
135        // Check for AUTO_INCREMENT constraints
136        // MySQL allows only one AUTO_INCREMENT column per table
137        let auto_increment_columns: Vec<&str> = stmt
138            .columns
139            .iter()
140            .filter(|col_def| {
141                col_def
142                    .constraints
143                    .iter()
144                    .any(|c| matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement))
145            })
146            .map(|col_def| col_def.name.as_str())
147            .collect();
148
149        if auto_increment_columns.len() > 1 {
150            return Err(ExecutorError::ConstraintViolation(
151                "Only one AUTO_INCREMENT column allowed per table".to_string(),
152            ));
153        }
154
155        // Convert AST ColumnDef → Catalog ColumnSchema
156        let mut columns: Vec<ColumnSchema> = stmt
157            .columns
158            .iter()
159            .map(|col_def| {
160                // For AUTO_INCREMENT columns, set default to NEXT VALUE FOR sequence
161                let default_value = if col_def
162                    .constraints
163                    .iter()
164                    .any(|c| matches!(c.kind, vibesql_ast::ColumnConstraintKind::AutoIncrement))
165                {
166                    // Create sequence name: {table_name}_{column_name}_seq
167                    let sequence_name = format!("{}_{}_seq", table_name, col_def.name);
168                    Some(vibesql_ast::Expression::NextValue { sequence_name })
169                } else {
170                    col_def.default_value.as_ref().map(|expr| (**expr).clone())
171                };
172
173                // Extract column-level collation from constraints
174                let collation = col_def.constraints.iter().find_map(|c| {
175                    if let vibesql_ast::ColumnConstraintKind::Collate(coll) = &c.kind {
176                        Some(coll.clone())
177                    } else {
178                        None
179                    }
180                });
181
182                ColumnSchema {
183                    name: col_def.name.clone(),
184                    data_type: col_def.data_type.clone(),
185                    nullable: col_def.nullable,
186                    default_value,
187                    generated_expr: col_def.generated_expr.as_ref().map(|expr| (**expr).clone()),
188                    collation,
189                    is_exact_integer_type: col_def.is_exact_integer_type,
190                }
191            })
192            .collect();
193
194        // Process constraints using the constraint validator
195        let constraint_result =
196            ConstraintValidator::process_constraints(&stmt.columns, &stmt.table_constraints)?;
197
198        // Apply constraint results to columns (updates nullability)
199        ConstraintValidator::apply_to_columns(&mut columns, &constraint_result);
200
201        // Create TableSchema with unqualified name
202        let mut table_schema = TableSchema::new(table_name.clone(), columns);
203
204        // Apply WITHOUT ROWID flag from AST (SQLite compatibility)
205        table_schema.without_rowid = stmt.without_rowid;
206
207        // Apply constraint results to schema (sets PK, unique, and check constraints)
208        ConstraintValidator::apply_to_schema(&mut table_schema, &constraint_result);
209
210        // WITHOUT ROWID tables must have a PRIMARY KEY (SQLite requirement, Issue #4953)
211        if stmt.without_rowid && table_schema.primary_key.is_none() {
212            return Err(ExecutorError::ConstraintViolation(
213                "WITHOUT ROWID tables must have a PRIMARY KEY".to_string(),
214            ));
215        }
216
217        // Detect INTEGER PRIMARY KEY for SQLite rowid aliasing (Issue #4536)
218        // In SQLite, a single-column PRIMARY KEY with exactly "INTEGER" type is an alias for rowid.
219        // The column's value IS the rowid, and SELECT rowid returns this column's value.
220        // IMPORTANT: Only exact "INTEGER" qualifies - "INT" does NOT (even though both parse to DataType::Integer)
221        if let Some(pk_cols) = &table_schema.primary_key {
222            if pk_cols.len() == 1 {
223                if let Some(col_idx) = table_schema.get_column_index(&pk_cols[0]) {
224                    let col = &table_schema.columns[col_idx];
225                    // Only exact "INTEGER" type qualifies for rowid aliasing, not "INT"
226                    if matches!(col.data_type, DataType::Integer) && col.is_exact_integer_type {
227                        table_schema.set_rowid_alias_column(Some(col_idx));
228                    }
229                }
230            }
231        }
232
233        // Check for STORAGE table option and apply storage format
234        for option in &stmt.table_options {
235            if let vibesql_ast::TableOption::Storage(format) = option {
236                table_schema.set_storage_format(*format);
237            }
238        }
239
240        // Process foreign key constraints from table_constraints
241        for constraint in &stmt.table_constraints {
242            if let vibesql_ast::TableConstraintKind::ForeignKey {
243                columns: fk_columns,
244                references_table,
245                references_columns,
246                on_delete,
247                on_update,
248                ..
249            } = &constraint.kind
250            {
251                // Resolve column indices for FK columns
252                let column_indices: Vec<usize> = fk_columns
253                    .iter()
254                    .map(|col_name| {
255                        table_schema.get_column_index(col_name).ok_or_else(|| {
256                            ExecutorError::ColumnNotFound {
257                                column_name: col_name.to_string(),
258                                table_name: table_name.clone(),
259                                searched_tables: vec![table_name.clone()],
260                                available_columns: table_schema
261                                    .columns
262                                    .iter()
263                                    .map(|c| c.name.clone())
264                                    .collect(),
265                            }
266                        })
267                    })
268                    .collect::<Result<Vec<_>, _>>()?;
269
270                // Lookup parent table to get parent column indices
271                let parent_schema = database
272                    .catalog
273                    .get_table(references_table)
274                    .ok_or_else(|| ExecutorError::TableNotFound(references_table.clone()))?;
275
276                let parent_column_indices: Vec<usize> = references_columns
277                    .iter()
278                    .map(|col_name| {
279                        parent_schema.get_column_index(col_name).ok_or_else(|| {
280                            ExecutorError::ColumnNotFound {
281                                column_name: col_name.to_string(),
282                                table_name: references_table.clone(),
283                                searched_tables: vec![references_table.clone()],
284                                available_columns: parent_schema
285                                    .columns
286                                    .iter()
287                                    .map(|c| c.name.clone())
288                                    .collect(),
289                            }
290                        })
291                    })
292                    .collect::<Result<Vec<_>, _>>()?;
293
294                // Convert ReferentialAction from AST to catalog type
295                let convert_action = |action: &Option<vibesql_ast::ReferentialAction>| match action
296                    .as_ref()
297                    .unwrap_or(&vibesql_ast::ReferentialAction::NoAction)
298                {
299                    vibesql_ast::ReferentialAction::Cascade => {
300                        vibesql_catalog::ReferentialAction::Cascade
301                    }
302                    vibesql_ast::ReferentialAction::SetNull => {
303                        vibesql_catalog::ReferentialAction::SetNull
304                    }
305                    vibesql_ast::ReferentialAction::SetDefault => {
306                        vibesql_catalog::ReferentialAction::SetDefault
307                    }
308                    vibesql_ast::ReferentialAction::Restrict => {
309                        vibesql_catalog::ReferentialAction::Restrict
310                    }
311                    vibesql_ast::ReferentialAction::NoAction => {
312                        vibesql_catalog::ReferentialAction::NoAction
313                    }
314                };
315
316                let fk = vibesql_catalog::ForeignKeyConstraint {
317                    name: constraint.name.clone(),
318                    column_names: fk_columns.clone(),
319                    column_indices,
320                    parent_table: references_table.clone(),
321                    parent_column_names: references_columns.clone(),
322                    parent_column_indices,
323                    on_delete: convert_action(on_delete),
324                    on_update: convert_action(on_update),
325                };
326
327                table_schema.add_foreign_key(fk)?;
328            }
329        }
330
331        // If creating in a non-current schema, temporarily switch to it
332        let original_schema = database.catalog.get_current_schema().to_string();
333        let needs_schema_switch = schema_name != original_schema;
334
335        if needs_schema_switch {
336            database
337                .catalog
338                .set_current_schema(&schema_name)
339                .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
340        }
341
342        // Create internal sequences for AUTO_INCREMENT columns
343        for auto_inc_col in &auto_increment_columns {
344            let sequence_name = format!("{}_{}_seq", table_name, auto_inc_col);
345            database
346                .catalog
347                .create_sequence(
348                    sequence_name.clone(),
349                    Some(1), // start_with: 1
350                    1,       // increment_by: 1
351                    Some(1), // min_value: 1
352                    None,    // max_value: unlimited
353                    false,   // cycle: false
354                )
355                .map_err(|e| {
356                    ExecutorError::StorageError(format!(
357                        "Failed to create sequence for AUTO_INCREMENT: {:?}",
358                        e
359                    ))
360                })?;
361        }
362
363        // Create table using Database API with TableIdentifier (handles both catalog and storage)
364        // Note: identifier was created at the start of this function with proper quoted semantics
365        let result = database
366            .create_table_with_identifier(table_schema.clone(), identifier.clone())
367            .map_err(|e| ExecutorError::StorageError(e.to_string()));
368
369        // Check if table creation succeeded before creating indexes
370        result?;
371
372        // Auto-create indexes for PRIMARY KEY and UNIQUE constraints
373        Self::create_implicit_indexes(database, &table_name, &table_schema)?;
374
375        // Restore original schema if we switched
376        if needs_schema_switch {
377            database
378                .catalog
379                .set_current_schema(&original_schema)
380                .map_err(|e| ExecutorError::StorageError(format!("Schema error: {:?}", e)))?;
381        }
382
383        // Return success message
384        Ok(format!("Table '{}' created successfully in schema '{}'", table_name, schema_name))
385    }
386
387    /// Create implicit indexes for PRIMARY KEY and UNIQUE constraints
388    ///
389    /// Production databases automatically create B-tree indexes for these constraints
390    /// to enable efficient query optimization. This function replicates that behavior.
391    fn create_implicit_indexes(
392        database: &mut Database,
393        table_name: &str,
394        table_schema: &TableSchema,
395    ) -> Result<(), ExecutorError> {
396        // Counter for SQLite-compatible auto-index naming: sqlite_autoindex_<table>_<n>
397        let mut autoindex_counter = 1;
398
399        // Auto-create PRIMARY KEY index
400        // Skip autoindex for INTEGER PRIMARY KEY - it's an alias for rowid
401        // and doesn't need a separate B-tree index (matches SQLite behavior)
402        if let Some(pk_cols) = &table_schema.primary_key {
403            if table_schema.rowid_alias_column.is_none() {
404                let index_name = format!("sqlite_autoindex_{}_{}", table_name, autoindex_counter);
405                autoindex_counter += 1;
406
407                // Create IndexColumn specs for the PRIMARY KEY columns
408                let index_columns: Vec<IndexColumn> = pk_cols
409                    .iter()
410                    .map(|col_name| IndexColumn::Column {
411                        column_name: col_name.to_string(),
412                        direction: OrderDirection::Asc,
413                        prefix_length: None,
414                    })
415                    .collect();
416
417                // Add to catalog first
418                let index_metadata = vibesql_catalog::IndexMetadata::new(
419                    index_name.clone(),
420                    table_name.to_string(),
421                    vibesql_catalog::IndexType::BTree,
422                    index_columns
423                        .iter()
424                        .map(|col| {
425                            vibesql_catalog::IndexedColumn::new_column(
426                                col.expect_column_name().to_string(),
427                                vibesql_catalog::SortOrder::Ascending,
428                            )
429                        })
430                        .collect(),
431                    true, // unique
432                );
433                database
434                    .catalog
435                    .add_index(index_metadata)
436                    .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
437
438                // Create the actual B-tree index
439                database
440                    .create_index(index_name, table_name.to_string(), true, index_columns)
441                    .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
442            }
443        }
444
445        // Auto-create UNIQUE constraint indexes
446        for unique_cols in &table_schema.unique_constraints {
447            let index_name = format!("sqlite_autoindex_{}_{}", table_name, autoindex_counter);
448            autoindex_counter += 1;
449
450            // Create IndexColumn specs for the UNIQUE columns
451            let index_columns: Vec<IndexColumn> = unique_cols
452                .iter()
453                .map(|col_name| IndexColumn::Column {
454                    column_name: col_name.to_string(),
455                    direction: OrderDirection::Asc,
456                    prefix_length: None,
457                })
458                .collect();
459
460            // Add to catalog first
461            let index_metadata = vibesql_catalog::IndexMetadata::new(
462                index_name.clone(),
463                table_name.to_string(),
464                vibesql_catalog::IndexType::BTree,
465                index_columns
466                    .iter()
467                    .map(|col| {
468                        vibesql_catalog::IndexedColumn::new_column(
469                            col.expect_column_name().to_string(),
470                            vibesql_catalog::SortOrder::Ascending,
471                        )
472                    })
473                    .collect(),
474                true, // unique
475            );
476            database
477                .catalog
478                .add_index(index_metadata)
479                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
480
481            // Create the actual B-tree index
482            database
483                .create_index(index_name, table_name.to_string(), true, index_columns)
484                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
485        }
486
487        Ok(())
488    }
489
490    /// Execute CREATE TABLE ... AS SELECT
491    ///
492    /// Creates a new table with schema derived from the SELECT result,
493    /// and populates it with the query results.
494    fn execute_create_as_select(
495        database: &mut Database,
496        table_name: &str,
497        schema_name: &str,
498        identifier: TableIdentifier,
499        if_not_exists: bool,
500        query: &vibesql_ast::SelectStmt,
501    ) -> Result<String, ExecutorError> {
502        // Check if table already exists
503        if database.catalog.table_exists_by_identifier(&identifier) {
504            if if_not_exists {
505                return Ok(format!(
506                    "Table '{}' already exists in schema '{}' (skipped)",
507                    table_name, schema_name
508                ));
509            }
510            return Err(ExecutorError::TableAlreadyExists(format!(
511                "{}.{}",
512                schema_name,
513                identifier.display()
514            )));
515        }
516
517        // Execute the SELECT query to get results
518        let rows = SelectExecutor::new(database).execute(query)?;
519
520        // Derive column names from the SELECT list (expanding wildcards if needed)
521        let column_names =
522            Self::derive_column_names_from_select_list(&query.select_list, &query.from, database)?;
523
524        // Derive column schema from the first row (if any) or default to BLOB
525        let columns: Vec<ColumnSchema> = column_names
526            .iter()
527            .enumerate()
528            .map(|(idx, col_name)| {
529                // Try to infer data type from the first row if available
530                let data_type = if !rows.is_empty() && idx < rows[0].values.len() {
531                    Self::infer_data_type(&rows[0].values[idx])
532                } else {
533                    // No rows or column - default to BLOB affinity
534                    DataType::BinaryLargeObject
535                };
536
537                ColumnSchema {
538                    name: col_name.to_string(),
539                    data_type,
540                    nullable: true, // Default to nullable for CTAS
541                    default_value: None,
542                    generated_expr: None,
543                    collation: None, // CTAS doesn't preserve collation
544                    is_exact_integer_type: false, // CTAS doesn't preserve exact type
545                }
546            })
547            .collect();
548
549        // Create the table schema
550        let table_schema = TableSchema::new(table_name.to_string(), columns);
551
552        // Create the table
553        database
554            .create_table_with_identifier(table_schema, identifier)
555            .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
556
557        // Insert the result rows into the new table
558        let row_count = rows.len();
559        for row in rows {
560            database
561                .insert_row(table_name, row)
562                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
563        }
564
565        Ok(format!(
566            "Table '{}' created successfully in schema '{}' with {} rows",
567            table_name, schema_name, row_count
568        ))
569    }
570
571    /// Derive column names from a SELECT list, expanding wildcards using the database schema
572    fn derive_column_names_from_select_list(
573        select_list: &[vibesql_ast::SelectItem],
574        from: &Option<vibesql_ast::FromClause>,
575        database: &Database,
576    ) -> Result<Vec<String>, ExecutorError> {
577        let mut names = Vec::new();
578        let mut counter = 0;
579
580        for item in select_list {
581            match item {
582                vibesql_ast::SelectItem::Wildcard { .. } => {
583                    // Expand wildcard using the FROM clause tables
584                    let table_names = Self::get_table_names_from_from(from)?;
585                    for table_name in table_names {
586                        if let Some(schema) = database.catalog.get_table(&table_name) {
587                            for col in &schema.columns {
588                                names.push(col.name.clone());
589                            }
590                        } else {
591                            return Err(ExecutorError::TableNotFound(table_name));
592                        }
593                    }
594                }
595                vibesql_ast::SelectItem::QualifiedWildcard { qualifier, .. } => {
596                    // Expand table.* using the specific table's schema
597                    if let Some(schema) = database.catalog.get_table(qualifier) {
598                        for col in &schema.columns {
599                            names.push(col.name.clone());
600                        }
601                    } else {
602                        return Err(ExecutorError::TableNotFound(qualifier.clone()));
603                    }
604                }
605                vibesql_ast::SelectItem::Expression { expr, alias, .. } => {
606                    let name = if let Some(alias) = alias {
607                        alias.clone()
608                    } else {
609                        // Try to derive from expression
610                        Self::derive_column_name_from_expr(expr, &mut counter)
611                    };
612                    names.push(name);
613                }
614            }
615        }
616
617        Ok(names)
618    }
619
620    /// Extract table names from a FROM clause
621    fn get_table_names_from_from(
622        from: &Option<vibesql_ast::FromClause>,
623    ) -> Result<Vec<String>, ExecutorError> {
624        let mut names = Vec::new();
625
626        match from {
627            None => {
628                // No FROM clause - can't expand wildcard
629                return Err(ExecutorError::UnsupportedFeature(
630                    "CREATE TABLE AS SELECT * requires a FROM clause".to_string(),
631                ));
632            }
633            Some(vibesql_ast::FromClause::Table { name, .. }) => {
634                names.push(name.clone());
635            }
636            Some(vibesql_ast::FromClause::Join { left, right, .. }) => {
637                // Recursively get tables from join
638                names.extend(Self::get_table_names_from_from(&Some(*left.clone()))?);
639                names.extend(Self::get_table_names_from_from(&Some(*right.clone()))?);
640            }
641            Some(vibesql_ast::FromClause::Subquery { alias, .. }) => {
642                // For derived tables (subqueries), we can't easily expand *
643                // because we'd need to recursively process the subquery
644                return Err(ExecutorError::UnsupportedFeature(format!(
645                    "CREATE TABLE AS SELECT * from subquery '{}' not supported - please specify columns explicitly",
646                    alias
647                )));
648            }
649            Some(vibesql_ast::FromClause::Values { alias, .. }) => {
650                // VALUES clause - can't determine column names from schema
651                return Err(ExecutorError::UnsupportedFeature(format!(
652                    "CREATE TABLE AS SELECT * from VALUES '{}' not supported - please specify columns explicitly",
653                    alias
654                )));
655            }
656        }
657
658        Ok(names)
659    }
660
661    /// Derive a column name from an expression
662    fn derive_column_name_from_expr(expr: &vibesql_ast::Expression, counter: &mut usize) -> String {
663        match expr {
664            vibesql_ast::Expression::ColumnRef(col_id) => col_id.column_canonical().to_string(),
665            vibesql_ast::Expression::Literal(_) => {
666                *counter += 1;
667                format!("column{}", counter)
668            }
669            vibesql_ast::Expression::Function { name, .. } => {
670                // Use the function name as the column name
671                name.to_string().to_lowercase()
672            }
673            _ => {
674                *counter += 1;
675                format!("column{}", counter)
676            }
677        }
678    }
679
680    /// Infer DataType from an SqlValue
681    fn infer_data_type(value: &vibesql_types::SqlValue) -> DataType {
682        use vibesql_types::SqlValue;
683        match value {
684            SqlValue::Null => DataType::BinaryLargeObject,
685            SqlValue::Boolean(_) => DataType::Boolean,
686            SqlValue::Integer(_) => DataType::Integer,
687            SqlValue::Bigint(_) => DataType::Bigint,
688            SqlValue::Smallint(_) => DataType::Smallint,
689            SqlValue::Unsigned(_) => DataType::Unsigned,
690            SqlValue::Float(_) | SqlValue::Real(_) => DataType::Real,
691            SqlValue::Double(_) | SqlValue::Numeric(_) => DataType::DoublePrecision,
692            SqlValue::Character(_) => DataType::Character { length: 255 },
693            SqlValue::Varchar(_) => DataType::Varchar { max_length: None },
694            SqlValue::Date(_) => DataType::Date,
695            SqlValue::Time(_) => DataType::Time { with_timezone: false },
696            SqlValue::Timestamp(_) => DataType::Timestamp { with_timezone: false },
697            SqlValue::Interval(_) => DataType::Interval {
698                start_field: vibesql_types::IntervalField::Day,
699                end_field: None,
700            },
701            SqlValue::Vector(v) => DataType::Vector { dimensions: v.len() as u32 },
702            SqlValue::Blob(_) => DataType::BinaryLargeObject,
703        }
704    }
705}