use crate::core::{DataType, Error, Result, Row, Schema, SchemaBuilder, Value};
use crate::functions::{FunctionDataType, FunctionSignature};
use crate::parser::ast::*;
use crate::storage::expression::Expression;
use crate::storage::functions::{
StoredFunction, StoredParameter, CREATE_FUNCTIONS_SQL, SYS_FUNCTIONS,
};
use crate::storage::procedures::{CREATE_PROCEDURES_SQL, SYS_PROCEDURES};
use crate::storage::traits::{result::EmptyResult, Engine, QueryResult};
use rustc_hash::FxHashMap;
use serde_json;
use std::sync::Arc;
use super::context::ExecutionContext;
use super::expression::ExpressionEval;
use super::result::ExecResult;
use super::Executor;
impl Executor {
pub(crate) fn execute_create_table(
&self,
stmt: &CreateTableStatement,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
let table_name = &stmt.table_name.value();
if Schema::is_reserved_namespace(table_name) && !ctx.is_internal() {
return Err(Error::ReservedNamespaceModification(table_name.clone()));
}
if let Some(schema_name) = stmt.table_name.schema() {
let schemas = self.engine.schemas.read().unwrap();
if !schemas.contains_key(&schema_name.to_lowercase()) {
return Err(Error::SchemaNotFound(schema_name));
}
}
if self.engine.table_exists(table_name)? {
if stmt.if_not_exists {
return Ok(Box::new(ExecResult::empty()));
}
return Err(Error::TableExists(table_name.clone()));
}
let schema_name = stmt
.table_name
.schema()
.unwrap_or_else(|| ctx.current_schema().unwrap_or("public").to_string())
.to_lowercase();
if self.engine.view_exists(&schema_name, table_name)? {
return Err(Error::ViewAlreadyExists(table_name.clone()));
}
if let Some(ref select_stmt) = stmt.as_select {
return self.execute_create_table_as_select(
table_name,
select_stmt,
stmt.if_not_exists,
ctx,
);
}
let mut schema_builder = SchemaBuilder::new(table_name);
let mut unique_columns: Vec<String> = Vec::new();
for col_def in &stmt.columns {
let col_name = &col_def.name.value;
let data_type = self.parse_data_type(&col_def.data_type)?;
let nullable = !col_def
.constraints
.iter()
.any(|c| matches!(c, ColumnConstraint::NotNull));
let is_primary_key = col_def
.constraints
.iter()
.any(|c| matches!(c, ColumnConstraint::PrimaryKey));
if is_primary_key && data_type != DataType::Integer {
return Err(Error::ParseError(format!(
"PRIMARY KEY column '{}' must be INTEGER type, got {:?}. Only INTEGER PRIMARY KEY is supported.",
col_name, data_type
)));
}
let is_unique = col_def
.constraints
.iter()
.any(|c| matches!(c, ColumnConstraint::Unique));
let is_auto_increment = col_def
.constraints
.iter()
.any(|c| matches!(c, ColumnConstraint::AutoIncrement));
let default_expr = col_def.constraints.iter().find_map(|c| {
if let ColumnConstraint::Default(expr) = c {
Some(format!("{}", expr))
} else {
None
}
});
let check_expr = col_def.constraints.iter().find_map(|c| {
if let ColumnConstraint::Check(expr) = c {
Some(format!("{}", expr))
} else {
None
}
});
schema_builder = schema_builder.add_with_constraints(
col_name,
data_type,
nullable && !is_primary_key,
is_primary_key,
is_auto_increment,
default_expr,
check_expr,
);
if is_unique && !is_primary_key {
unique_columns.push(col_name.clone());
}
}
let mut schema = schema_builder.build();
let mut table_unique_constraints: Vec<Vec<String>> = Vec::new();
let mut schemas_to_update: Vec<crate::core::Schema> = Vec::new();
for constraint in &stmt.table_constraints {
match constraint {
TableConstraint::Unique(cols) => {
let col_names: Vec<String> = cols.iter().map(|c| c.value.clone()).collect();
table_unique_constraints.push(col_names);
}
TableConstraint::ForeignKey {
column,
foreign_table,
foreign_column,
on_delete,
on_update,
..
} => {
let (col_idx, _) = schema
.find_column(&column.value)
.ok_or_else(|| Error::column_not_found_by_name(column.value.clone()))?;
let is_self_referencing = foreign_table.value.eq_ignore_ascii_case(table_name);
let mut ref_schema = if is_self_referencing {
schema.clone()
} else {
self.engine.get_table_schema(&foreign_table.value)?
};
let ref_col = ref_schema
.get_column_by_name(&foreign_column.value)
.ok_or_else(|| {
Error::column_not_found_by_name(foreign_column.value.clone())
})?;
if !ref_col.primary_key {
}
if ref_col.data_type != schema.columns[col_idx].data_type {
return Err(Error::Type(format!(
"foreign key type mismatch: {} ({:?}) vs {} ({:?})",
column.value,
schema.columns[col_idx].data_type,
foreign_column.value,
ref_col.data_type
)));
}
let fk_meta = crate::core::schema::ForeignKeyMetadata {
column_id: col_idx,
referenced_table: foreign_table.value.clone(),
referenced_column_name: foreign_column.value.clone(),
on_delete: *on_delete,
on_update: *on_update,
};
schema.foreign_keys.push(fk_meta);
if is_self_referencing {
schema.referenced_by.push(table_name.clone());
} else {
ref_schema.referenced_by.push(table_name.clone());
schemas_to_update.push(ref_schema);
}
}
_ => {} }
}
let mut active_tx = self.active_transaction.lock().unwrap();
if let Some(ref mut _tx_state) = *active_tx {
tracing::info!(
"Executing CREATE TABLE for '{}' (in transaction)",
table_name
);
self.engine.create_table(schema.clone())?;
for ref_schema in schemas_to_update {
self.engine
.update_table_schema(&ref_schema.table_name.clone(), ref_schema)?;
}
if let Some(ref mut tx_state) = *active_tx {
tx_state
.ddl_undo_log
.push(super::DeferredDdlOperation::CreateTable {
name: table_name.clone(),
});
}
} else {
tracing::info!("Executing CREATE TABLE for '{}'", table_name);
self.engine.create_table(schema)?;
for ref_schema in schemas_to_update {
self.engine
.update_table_schema(&ref_schema.table_name.clone(), ref_schema)?;
}
}
let needs_indexes = !unique_columns.is_empty() || !table_unique_constraints.is_empty();
if needs_indexes {
let mut tx = self.engine.begin_transaction()?;
let table = tx.get_table(table_name)?;
for col_name in &unique_columns {
let index_name = format!("unique_{}_{}", table_name, col_name);
table.create_index(&index_name, &[col_name.as_str()], true)?;
let idx_type = table
.get_index(&index_name)
.map(|idx| idx.index_type())
.unwrap_or(crate::core::IndexType::BTree);
self.engine.record_create_index(
table_name,
&index_name,
std::slice::from_ref(col_name),
true,
idx_type,
);
}
for (i, col_names) in table_unique_constraints.iter().enumerate() {
let index_name = format!("unique_{}_{}", table_name, i);
let col_refs: Vec<&str> = col_names.iter().map(|s| s.as_str()).collect();
table.create_index(&index_name, &col_refs, true)?;
let idx_type = table
.get_index(&index_name)
.map(|idx| idx.index_type())
.unwrap_or(crate::core::IndexType::BTree);
self.engine
.record_create_index(table_name, &index_name, col_names, true, idx_type);
}
tx.commit()?;
}
Ok(Box::new(ExecResult::empty()))
}
fn execute_create_table_as_select(
&self,
table_name: &str,
select_stmt: &SelectStatement,
_if_not_exists: bool,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
use crate::core::Row;
let mut result = self.execute_select(select_stmt, ctx)?;
let columns: Vec<String> = result.columns().to_vec();
let mut rows: Vec<Row> = Vec::new();
while result.next() {
rows.push(result.take_row());
}
let mut schema_builder = SchemaBuilder::new(table_name);
for (i, col_name) in columns.iter().enumerate() {
let base_name = if let Some(pos) = col_name.rfind('.') {
&col_name[pos + 1..]
} else {
col_name.as_str()
};
let data_type = if let Some(first_row) = rows.first() {
if let Some(value) = first_row.get(i) {
Self::infer_data_type(value)
} else {
DataType::Text }
} else {
DataType::Text };
schema_builder = schema_builder.add_nullable(base_name, data_type);
}
let schema = schema_builder.build();
self.engine.create_table(schema)?;
let rows_count = rows.len();
if !rows.is_empty() {
let mut tx = self.engine.begin_transaction()?;
let mut table = tx.get_table(table_name)?;
for row in rows {
let _ = table.insert(row)?;
}
tx.commit()?;
}
Ok(Box::new(ExecResult::with_rows_affected(rows_count as i64)))
}
fn infer_data_type(value: &crate::core::Value) -> DataType {
match value {
crate::core::Value::Integer(_) => DataType::Integer,
crate::core::Value::Float(_) => DataType::Float,
crate::core::Value::Text(_) => DataType::Text,
crate::core::Value::Boolean(_) => DataType::Boolean,
crate::core::Value::Timestamp(_) => DataType::Timestamp,
crate::core::Value::Json(_) => DataType::Json,
crate::core::Value::Null(_) => DataType::Text, }
}
pub(crate) fn execute_drop_table(
&self,
stmt: &DropTableStatement,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
let table_name = &stmt.table_name.value();
if Schema::is_reserved_namespace(table_name) && !ctx.is_internal() {
return Err(Error::ReservedNamespaceModification(table_name.clone()));
}
if !self.engine.table_exists(table_name)? {
if stmt.if_exists {
return Ok(Box::new(ExecResult::empty()));
}
return Err(Error::TableNotFoundByName(table_name.clone()));
}
let mut active_tx = self.active_transaction.lock().unwrap();
if let Some(ref mut _tx_state) = *active_tx {
tracing::info!("Executing DROP TABLE for '{}' (in transaction)", table_name);
let schema = self.engine.get_table_schema(table_name)?;
if let Ok(true) = self
.engine
.table_exists(crate::storage::triggers::SYS_TRIGGERS)
{
let _ = self.delete_table_triggers(table_name);
}
self.engine.drop_table_internal(table_name)?;
if let Some(ref mut tx_state) = *active_tx {
tx_state
.ddl_undo_log
.push(super::DeferredDdlOperation::DropTable {
name: table_name.clone(),
schema,
});
}
eprintln!(
"Warning: DROP TABLE '{}' within transaction - data cannot be recovered on rollback",
table_name
);
} else {
tracing::info!("Executing DROP TABLE for '{}'", table_name);
if let Ok(true) = self
.engine
.table_exists(crate::storage::triggers::SYS_TRIGGERS)
{
let _ = self.delete_table_triggers(table_name);
}
self.engine.drop_table_internal(table_name)?;
}
Ok(Box::new(ExecResult::empty()))
}
pub(crate) fn execute_create_index(
&self,
stmt: &CreateIndexStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
if let Some(schema) = stmt.table_name.schema() {
let schemas = self.engine.schemas.read().unwrap();
if !schemas.contains_key(&schema.to_lowercase()) {
return Err(Error::SchemaNotFound(schema));
}
}
let table_name = &stmt.table_name.value();
let index_name = &stmt.index_name.value;
if !self.engine.table_exists(table_name)? {
return Err(Error::TableNotFoundByName(table_name.clone()));
}
if self.engine.index_exists(index_name, table_name)? {
if stmt.if_not_exists {
return Ok(Box::new(ExecResult::empty()));
}
return Err(Error::internal(format!(
"index already exists: {}",
index_name
)));
}
let is_unique = stmt.is_unique;
let tx = self.engine.begin_transaction()?;
let table = tx.get_table(table_name)?;
let schema = table.schema();
for col_id in &stmt.columns {
let col_name = &col_id.value;
if !schema
.columns
.iter()
.any(|c| c.name.eq_ignore_ascii_case(col_name))
{
return Err(Error::ColumnNotFoundNamed(col_name.clone()));
}
}
let column_names: Vec<String> = stmt.columns.iter().map(|c| c.value.clone()).collect();
let column_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
if stmt.if_not_exists {
if table.get_index(index_name).is_some() {
return Ok(Box::new(ExecResult::empty()));
}
if column_names.len() == 1 && table.has_index_on_column(&column_names[0]) {
return Ok(Box::new(ExecResult::empty()));
}
}
let requested_index_type = stmt.index_method.map(|method| match method {
crate::parser::ast::IndexMethod::BTree => crate::core::IndexType::BTree,
crate::parser::ast::IndexMethod::Hash => crate::core::IndexType::Hash,
crate::parser::ast::IndexMethod::Bitmap => crate::core::IndexType::Bitmap,
});
table.create_index_with_type(index_name, &column_refs, is_unique, requested_index_type)?;
let index_type = table
.get_index(index_name)
.map(|idx| idx.index_type())
.unwrap_or(crate::core::IndexType::BTree);
self.engine.record_create_index(
table_name,
index_name,
&column_names,
is_unique,
index_type,
);
Ok(Box::new(ExecResult::empty()))
}
pub(crate) fn execute_drop_index(
&self,
stmt: &DropIndexStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
let index_name = &stmt.index_name.value;
let table_name = match &stmt.table_name {
Some(t) => t.value.clone(),
None => {
return Err(Error::InvalidArgumentMessage(
"DROP INDEX requires table name".to_string(),
))
}
};
if !self.engine.table_exists(&table_name)? {
if stmt.if_exists {
return Ok(Box::new(ExecResult::empty()));
}
return Err(Error::TableNotFoundByName(table_name));
}
if !self.engine.index_exists(index_name, &table_name)? {
if stmt.if_exists {
return Ok(Box::new(ExecResult::empty()));
}
return Err(Error::IndexNotFoundByName(index_name.to_string()));
}
let tx = self.engine.begin_transaction()?;
let table = tx.get_table(&table_name)?;
table.drop_index(index_name)?;
self.engine.record_drop_index(&table_name, index_name);
Ok(Box::new(ExecResult::empty()))
}
pub(crate) fn execute_alter_table(
&self,
stmt: &AlterTableStatement,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
let table_name = &stmt.table_name.value();
if Schema::is_reserved_namespace(table_name) && !ctx.is_internal() {
return Err(Error::ReservedNamespaceModification(table_name.clone()));
}
if !self.engine.table_exists(table_name)? {
return Err(Error::TableNotFoundByName(table_name.clone()));
}
let mut tx = self.engine.begin_transaction()?;
let mut table = tx.get_table(table_name)?;
match stmt.operation {
AlterTableOperation::AddColumn => {
if let Some(ref col_def) = stmt.column_def {
let data_type = self.parse_data_type(&col_def.data_type)?;
let nullable = !col_def
.constraints
.iter()
.any(|c| matches!(c, ColumnConstraint::NotNull));
let default_expr = col_def.constraints.iter().find_map(|c| {
if let ColumnConstraint::Default(expr) = c {
Some(expr.to_string())
} else {
None
}
});
let default_value = if let Some(ref expr_str) = default_expr {
let val = self.evaluate_default_expression(expr_str, data_type)?;
if val.is_null() {
None
} else {
Some(val)
}
} else {
None
};
table.create_column_with_default_value(
&col_def.name.value,
data_type,
nullable,
default_expr.clone(),
default_value,
)?;
let schema = table.schema().clone();
self.engine.update_table_schema(table_name, schema)?;
self.engine.record_alter_table_add_column(
table_name,
&col_def.name.value,
data_type,
nullable,
default_expr.as_deref(),
);
} else {
return Err(Error::InvalidArgumentMessage(
"ADD COLUMN requires column definition".to_string(),
));
}
}
AlterTableOperation::DropColumn => {
if let Some(ref col_name) = stmt.column_name {
table.drop_column(&col_name.value)?;
let schema = table.schema().clone();
self.engine.update_table_schema(table_name, schema)?;
self.engine
.record_alter_table_drop_column(table_name, &col_name.value);
} else {
return Err(Error::InvalidArgumentMessage(
"DROP COLUMN requires column name".to_string(),
));
}
}
AlterTableOperation::RenameColumn => match (&stmt.column_name, &stmt.new_column_name) {
(Some(old_name), Some(new_name)) => {
table.rename_column(&old_name.value, &new_name.value)?;
let schema = table.schema().clone();
self.engine.update_table_schema(table_name, schema)?;
self.engine.record_alter_table_rename_column(
table_name,
&old_name.value,
&new_name.value,
);
}
_ => {
return Err(Error::InvalidArgumentMessage(
"RENAME COLUMN requires old and new column names".to_string(),
));
}
},
AlterTableOperation::ModifyColumn => {
if let Some(ref col_def) = stmt.column_def {
let data_type = self.parse_data_type(&col_def.data_type)?;
let nullable = !col_def
.constraints
.iter()
.any(|c| matches!(c, ColumnConstraint::NotNull));
let auto_increment = col_def
.constraints
.iter()
.any(|c| matches!(c, ColumnConstraint::AutoIncrement));
if auto_increment && data_type != crate::core::DataType::Integer {
return Err(Error::InvalidArgumentMessage(
"AUTOINCREMENT is only allowed on INTEGER columns".to_string(),
));
}
let auto_increment_opt = if auto_increment { Some(true) } else { None };
let check_expr = col_def.constraints.iter().find_map(|c| {
if let ColumnConstraint::Check(expr) = c {
Some(expr.to_string())
} else {
None
}
});
let check_expr_opt = check_expr.map(Some);
self.engine.modify_column(
table_name,
&col_def.name.value,
data_type,
nullable,
auto_increment_opt,
check_expr_opt.clone(),
)?;
let is_unique = col_def
.constraints
.iter()
.any(|c| matches!(c, ColumnConstraint::Unique));
if is_unique {
let index_name = format!("unique_{}_{}", table_name, col_def.name.value);
table.create_index_with_type(
&index_name,
&[&col_def.name.value],
true, None,
)?;
}
self.engine.record_alter_table_modify_column(
table_name,
&col_def.name.value,
data_type,
nullable,
auto_increment_opt,
check_expr_opt,
);
} else {
return Err(Error::InvalidArgumentMessage(
"MODIFY COLUMN requires column definition".to_string(),
));
}
}
AlterTableOperation::RenameTable => {
if let Some(ref new_name) = stmt.new_table_name {
tx.rename_table(table_name, &new_name.value)?;
self.engine
.record_alter_table_rename(table_name, &new_name.value);
} else {
return Err(Error::InvalidArgumentMessage(
"RENAME TABLE requires new table name".to_string(),
));
}
}
AlterTableOperation::AddConstraint => {
if let Some(ref constraint) = stmt.constraint {
match constraint {
TableConstraint::ForeignKey {
column,
foreign_table,
foreign_column,
on_delete,
on_update,
..
} => {
let mut schema = self.engine.get_table_schema(table_name)?;
let (col_idx, _) =
schema.find_column(&column.value).ok_or_else(|| {
Error::column_not_found_by_name(column.value.clone())
})?;
let mut ref_schema =
self.engine.get_table_schema(&foreign_table.value)?;
let ref_col = ref_schema
.get_column_by_name(&foreign_column.value)
.ok_or_else(|| {
Error::column_not_found_by_name(foreign_column.value.clone())
})?;
if ref_col.data_type != schema.columns[col_idx].data_type {
return Err(Error::Type(format!(
"foreign key type mismatch: {} ({:?}) vs {} ({:?})",
column.value,
schema.columns[col_idx].data_type,
foreign_column.value,
ref_col.data_type
)));
}
let col_indices = vec![col_idx];
let mut scanner = table.scan(&col_indices, None)?;
while scanner.next() {
let row = scanner.row();
if let Some(val) = row.get(0) {
if val.is_null() {
continue;
}
let ref_table_name = foreign_table.value.to_lowercase();
let ref_table = tx.get_table(&ref_table_name)?;
let mut check_expr =
crate::storage::expression::ComparisonExpr::new(
foreign_column.value.clone(),
crate::core::Operator::Eq,
val.clone(),
);
check_expr.prepare_for_schema(&ref_schema);
let ref_col_indices = vec![0]; let mut ref_scanner =
ref_table.scan(&ref_col_indices, Some(&check_expr))?;
if !ref_scanner.next() {
return Err(Error::ReferentialIntegrityViolation {
message: format!(
"ALTER TABLE ADD CONSTRAINT failed: row contains value '{}' not present in {}({})",
val, foreign_table.value, foreign_column.value
),
});
}
}
}
drop(scanner);
let fk_meta = crate::core::schema::ForeignKeyMetadata {
column_id: col_idx,
referenced_table: foreign_table.value.clone(),
referenced_column_name: foreign_column.value.clone(),
on_delete: *on_delete,
on_update: *on_update,
};
schema.foreign_keys.push(fk_meta);
self.engine.update_table_schema(table_name, schema)?;
ref_schema.referenced_by.push(table_name.clone());
self.engine
.update_table_schema(&foreign_table.value, ref_schema)?;
}
_ => {
return Err(Error::NotSupportedMessage(
"Only ADD CONSTRAINT FOREIGN KEY is supported".to_string(),
));
}
}
} else {
return Err(Error::InvalidArgumentMessage(
"ADD CONSTRAINT requires a constraint definition".to_string(),
));
}
}
}
tx.commit()?;
Ok(Box::new(ExecResult::empty()))
}
pub(crate) fn execute_create_view(
&self,
stmt: &CreateViewStatement,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
let schema_name = stmt
.view_name
.schema()
.unwrap_or_else(|| ctx.current_schema().unwrap_or("public").to_string())
.to_lowercase();
let schemas = self.engine.schemas.read().unwrap();
if !schemas.contains_key(&schema_name) {
return Err(Error::SchemaNotFound(schema_name));
}
drop(schemas);
let view_name = &stmt.view_name.table();
if self.engine.table_exists(view_name)? {
return Err(Error::TableAlreadyExists);
}
let query_sql = stmt.query.to_string();
self.engine
.create_view(&schema_name, view_name, query_sql, stmt.if_not_exists)?;
Ok(Box::new(ExecResult::empty()))
}
pub(crate) fn execute_drop_view(
&self,
stmt: &DropViewStatement,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
let schema_name = ctx.current_schema().unwrap_or("public").to_lowercase();
let view_name = &stmt.view_name.value;
self.engine
.drop_view(&schema_name, view_name, stmt.if_exists)?;
Ok(Box::new(ExecResult::empty()))
}
pub(crate) fn execute_create_columnar_index(
&self,
_stmt: &CreateColumnarIndexStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
Err(Error::internal(
"CREATE COLUMNAR INDEX syntax is deprecated. Use CREATE INDEX instead - the index type is auto-selected based on column type.",
))
}
pub(crate) fn ensure_procedures_table_exists(&self) -> Result<()> {
let tx = self.engine.begin_transaction()?;
let tables = tx.list_tables()?;
let has_procedures_table = tables
.iter()
.any(|t| t.eq_ignore_ascii_case(SYS_PROCEDURES));
drop(tx);
if !has_procedures_table {
self.execute_functions_sql(CREATE_PROCEDURES_SQL)?;
}
Ok(())
}
fn procedure_exists(&self, procedure_name: &str) -> Result<bool> {
let tx = self.engine.begin_transaction()?;
let table = match tx.get_table(SYS_PROCEDURES) {
Ok(table) => table,
Err(_) => return Ok(false),
};
let mut scanner = table.scan(&[], None)?;
while scanner.next() {
let row = scanner.row();
if let Some(crate::core::value::Value::Text(name)) = row.get(2) {
if name.eq_ignore_ascii_case(procedure_name) {
return Ok(true);
}
}
}
Ok(false)
}
fn insert_procedure(
&self,
procedure: &crate::storage::procedures::StoredProcedure,
) -> Result<()> {
let mut tx = self.engine.begin_transaction()?;
let mut table = tx.get_table(SYS_PROCEDURES)?;
let parameters_json = serde_json::to_string(&procedure.parameters).map_err(|e| {
Error::internal(format!("Failed to serialize procedure parameters: {}", e))
})?;
let schema_value = match &procedure.schema {
Some(schema) => crate::core::value::Value::text(schema.clone()),
None => crate::core::value::Value::Null(crate::core::types::DataType::Text),
};
let row = crate::core::row::Row::from_values(vec![
crate::core::value::Value::Null(crate::core::types::DataType::Integer),
schema_value,
crate::core::value::Value::text(procedure.name.clone()),
crate::core::value::Value::text(parameters_json),
crate::core::value::Value::text(procedure.language.clone()),
crate::core::value::Value::text(procedure.code.clone()),
]);
table.insert(row)?;
tx.commit()?;
Ok(())
}
fn update_procedure(
&self,
procedure: &crate::storage::procedures::StoredProcedure,
) -> Result<()> {
let mut tx = self.engine.begin_transaction()?;
let mut table = tx.get_table(SYS_PROCEDURES)?;
let mut function_id: Option<crate::core::value::Value> = None;
let mut scanner = table.scan(&[], None)?;
while scanner.next() {
let row = scanner.row();
if let Some(crate::core::value::Value::Text(name)) = row.get(2) {
if name.eq_ignore_ascii_case(&procedure.name) {
function_id = row.get(0).cloned();
break;
}
}
}
if let Some(id_value) = &function_id {
use crate::storage::expression::{ComparisonExpr, Expression as StorageExpr};
let mut id_expr =
ComparisonExpr::new("id", crate::core::Operator::Eq, id_value.clone());
let schema = table.schema();
id_expr.prepare_for_schema(schema);
table.delete(Some(&id_expr))?;
}
let parameters_json = serde_json::to_string(&procedure.parameters).map_err(|e| {
Error::internal(format!("Failed to serialize procedure parameters: {}", e))
})?;
let schema_value = match &procedure.schema {
Some(schema) => crate::core::value::Value::text(schema.clone()),
None => crate::core::value::Value::Null(crate::core::types::DataType::Text),
};
let id_value = function_id.unwrap_or(crate::core::value::Value::Null(
crate::core::types::DataType::Integer,
));
let new_row = crate::core::row::Row::from_values(vec![
id_value,
schema_value,
crate::core::value::Value::text(procedure.name.clone()),
crate::core::value::Value::text(parameters_json),
crate::core::value::Value::text(procedure.language.clone()),
crate::core::value::Value::text(procedure.code.clone()),
]);
table.insert(new_row)?;
tx.commit()?;
Ok(())
}
fn delete_procedure(&self, procedure_name: &str) -> Result<()> {
let mut tx = self.engine.begin_transaction()?;
let mut table = tx.get_table(SYS_PROCEDURES)?;
let mut scanner = table.scan(&[], None)?;
let mut procedure_id: Option<crate::core::value::Value> = None;
while scanner.next() {
let row = scanner.row();
if let Some(crate::core::value::Value::Text(name)) = row.get(2) {
if name.eq_ignore_ascii_case(procedure_name) {
procedure_id = row.get(0).cloned(); break;
}
}
}
if let Some(id_value) = procedure_id {
use crate::storage::expression::{ComparisonExpr, Expression as StorageExpr};
let mut id_expr = ComparisonExpr::new("id", crate::core::Operator::Eq, id_value);
let schema = table.schema();
id_expr.prepare_for_schema(schema);
table.delete(Some(&id_expr))?;
}
tx.commit()?;
Ok(())
}
pub(crate) fn execute_create_procedure(
&self,
stmt: &crate::parser::ast::CreateProcedureStatement,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
self.ensure_procedures_table_exists()?;
let procedure_name_upper = stmt.procedure_name.function().to_uppercase();
let exists = self.procedure_exists(&procedure_name_upper)?;
if exists && !stmt.or_replace {
return Err(Error::FunctionAlreadyExists(procedure_name_upper.clone()));
}
let is_sql = stmt.language.eq_ignore_ascii_case("sql")
|| stmt.language.eq_ignore_ascii_case("plsql")
|| stmt.language.eq_ignore_ascii_case("pl/sql");
if !is_sql && !self.function_registry.is_language_supported(&stmt.language) {
return Err(Error::internal(format!(
"Unsupported language: {}",
stmt.language
)));
}
let stored_parameters: Vec<crate::storage::procedures::StoredProcedureParameter> = stmt
.parameters
.iter()
.map(|p| crate::storage::procedures::StoredProcedureParameter {
mode: p.mode.to_string(),
name: p.name.value.clone(),
data_type: p.data_type.clone(),
})
.collect();
let stored_procedure = crate::storage::procedures::StoredProcedure {
id: 0,
schema: Some(
stmt.procedure_name
.schema()
.unwrap_or_else(|| ctx.current_schema().unwrap_or("public").to_string())
.to_uppercase(),
),
name: procedure_name_upper.clone(),
parameters: stored_parameters,
language: stmt.language.clone(),
code: stmt.body.clone(),
};
if exists {
self.update_procedure(&stored_procedure)?;
} else {
self.insert_procedure(&stored_procedure)?;
}
self.function_registry
.register_procedure(&procedure_name_upper, stored_procedure);
Ok(Box::new(EmptyResult::new()))
}
pub(crate) fn execute_drop_procedure(
&self,
stmt: &crate::parser::ast::DropProcedureStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
let procedure_name = stmt.procedure_name.function();
let procedure_name_upper = procedure_name.to_uppercase();
if !self.procedure_exists(&procedure_name_upper)? {
if stmt.if_exists {
return Ok(Box::new(EmptyResult::new()));
}
return Err(Error::FunctionNotFound(procedure_name.clone()));
}
self.delete_procedure(&procedure_name_upper)?;
self.function_registry
.unregister_procedure(&procedure_name_upper);
Ok(Box::new(EmptyResult::new()))
}
pub(crate) fn execute_create_function(
&self,
stmt: &CreateFunctionStatement,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
self.ensure_functions_table_exists()?;
let function_name_upper = stmt.function_name.function().to_uppercase();
if self.function_exists(&function_name_upper)? {
if stmt.if_not_exists {
return Ok(Box::new(EmptyResult::new()));
}
return Err(Error::FunctionAlreadyExists(function_name_upper.clone()));
}
let stored_parameters: Vec<StoredParameter> = stmt
.parameters
.iter()
.map(|p| StoredParameter {
name: p.name.value.clone(),
data_type: p.data_type.clone(),
})
.collect();
let stored_function = StoredFunction {
id: 0, schema: Some(
stmt.function_name
.schema()
.unwrap_or_else(|| ctx.current_schema().unwrap_or("public").to_string())
.to_uppercase(),
),
name: function_name_upper.clone(),
parameters: stored_parameters,
return_type: stmt.return_type.clone(),
language: stmt.language.clone(),
code: stmt.body.clone(),
};
let param_names: Vec<String> = stmt
.parameters
.iter()
.map(|p| p.name.value.clone())
.collect();
if !self.function_registry.is_language_supported(&stmt.language) {
return Err(Error::internal(format!(
"Unsupported language: {}",
stmt.language
)));
}
self.function_registry.register_user_defined(
function_name_upper.clone(),
stmt.body.clone(),
stmt.language.clone(),
param_names,
FunctionSignature::new(
FunctionDataType::Unknown,
vec![],
stmt.parameters.len(),
stmt.parameters.len(),
),
)?;
if let Err(e) = self.insert_function(&stored_function) {
let _ = self
.function_registry
.unregister_user_defined(&function_name_upper);
return Err(e);
}
Ok(Box::new(EmptyResult::new()))
}
pub(crate) fn execute_drop_function(
&self,
stmt: &DropFunctionStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
let function_name = stmt.function_name.function();
if !self.function_exists(&function_name)? {
if stmt.if_exists {
return Ok(Box::new(EmptyResult::new()));
}
return Err(Error::FunctionNotFound(function_name.clone()));
}
self.delete_function(&function_name)?;
self.function_registry
.unregister_user_defined(&function_name)?;
Ok(Box::new(EmptyResult::new()))
}
pub(crate) fn ensure_functions_table_exists(&self) -> Result<()> {
let tx = self.engine.begin_transaction()?;
let tables = tx.list_tables()?;
let has_functions_table = tables.iter().any(|t| t.eq_ignore_ascii_case(SYS_FUNCTIONS));
drop(tx);
if !has_functions_table {
self.execute_functions_sql(CREATE_FUNCTIONS_SQL)?;
}
Ok(())
}
fn function_exists(&self, function_name: &str) -> Result<bool> {
let tx = self.engine.begin_transaction()?;
let table = match tx.get_table(SYS_FUNCTIONS) {
Ok(table) => table,
Err(_) => return Ok(false), };
let mut scanner = table.scan(&[], None)?;
while scanner.next() {
let row = scanner.row();
if let Some(Value::Text(name)) = row.get(2) {
if name.eq_ignore_ascii_case(function_name) {
return Ok(true);
}
}
}
Ok(false)
}
fn insert_function(&self, function: &StoredFunction) -> Result<()> {
let mut tx = self.engine.begin_transaction()?;
let mut table = tx.get_table(SYS_FUNCTIONS)?;
let parameters_json = serde_json::to_string(&function.parameters).map_err(|e| {
Error::internal(format!("Failed to serialize function parameters: {}", e))
})?;
let schema_value = match &function.schema {
Some(schema) => Value::Text(Arc::from(schema.clone())),
None => Value::Null(DataType::Text),
};
let values = vec![
Value::Null(DataType::Integer), schema_value, Value::Text(Arc::from(function.name.clone())), Value::Text(Arc::from(parameters_json)), Value::Text(Arc::from(function.return_type.clone())), Value::Text(Arc::from(function.language.clone())), Value::Text(Arc::from(function.code.clone())), ];
let row = Row::from_values(values);
table.insert(row)?;
tx.commit()?;
Ok(())
}
fn delete_function(&self, function_name: &str) -> Result<()> {
let mut tx = self.engine.begin_transaction()?;
let mut table = tx.get_table(SYS_FUNCTIONS)?;
let mut scanner = table.scan(&[], None)?;
let mut function_id: Option<Value> = None;
while scanner.next() {
let row = scanner.row();
if let Some(Value::Text(name)) = row.get(2) {
if name.eq_ignore_ascii_case(function_name) {
function_id = row.get(0).cloned(); break;
}
}
}
if let Some(id_value) = function_id {
use crate::storage::expression::{ComparisonExpr, Expression as StorageExpr};
let mut id_expr = ComparisonExpr::new("id", crate::core::Operator::Eq, id_value);
let schema = table.schema();
id_expr.prepare_for_schema(schema);
table.delete(Some(&id_expr))?;
}
tx.commit()?;
Ok(())
}
fn execute_functions_sql(&self, sql: &str) -> Result<()> {
let mut parser = crate::parser::Parser::new(sql);
let program = parser
.parse_program()
.map_err(|e| Error::parse(e.to_string()))?;
for stmt in &program.statements {
let ctx = crate::executor::context::ExecutionContextBuilder::new()
.with_internal(true)
.build();
self.execute_statement(stmt, &ctx)?;
}
Ok(())
}
pub(crate) fn execute_drop_columnar_index(
&self,
_stmt: &DropColumnarIndexStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
Err(Error::internal(
"DROP COLUMNAR INDEX syntax is deprecated. Use DROP INDEX index_name ON table_name instead.",
))
}
pub(crate) fn parse_data_type(&self, type_str: &str) -> Result<DataType> {
let upper = type_str.to_uppercase();
let base_type = upper.split('(').next().unwrap_or(&upper);
match base_type {
"INTEGER" | "INT" | "BIGINT" | "SMALLINT" | "TINYINT" => Ok(DataType::Integer),
"FLOAT" | "DOUBLE" | "REAL" | "DECIMAL" | "NUMERIC" => Ok(DataType::Float),
"TEXT" | "VARCHAR" | "CHAR" | "STRING" | "CLOB" => Ok(DataType::Text),
"BOOLEAN" | "BOOL" => Ok(DataType::Boolean),
"TIMESTAMP" | "DATETIME" | "DATE" | "TIME" => Ok(DataType::Timestamp),
"JSON" | "JSONB" => Ok(DataType::Json),
"BLOB" | "BINARY" | "VARBINARY" => Ok(DataType::Text),
_ => Err(Error::Type(format!("Unknown data type: {}", type_str))),
}
}
fn evaluate_default_expression(
&self,
default_expr: &str,
target_type: DataType,
) -> Result<Value> {
use crate::parser::parse_sql;
let sql = format!("SELECT {}", default_expr);
let stmts = match parse_sql(&sql) {
Ok(s) => s,
Err(_) => return Ok(Value::null(target_type)),
};
if stmts.is_empty() {
return Ok(Value::null(target_type));
}
if let Statement::Select(select) = &stmts[0] {
if let Some(expr) = select.columns.first() {
let mut eval = ExpressionEval::compile(expr, &[])?;
let value = eval.eval_slice(&[])?;
return Ok(value.into_coerce_to_type(target_type));
}
}
Ok(Value::null(target_type))
}
pub(crate) fn execute_create_sequence(
&self,
stmt: &CreateSequenceStatement,
ctx: &ExecutionContext,
) -> Result<Box<dyn crate::storage::traits::QueryResult>> {
let schema_name = stmt
.name
.schema()
.unwrap_or_else(|| ctx.current_schema().unwrap_or("public").to_string())
.to_lowercase();
let name = stmt.name.table().to_string();
if self.engine.sequence_exists(&schema_name, &name)? {
if stmt.if_not_exists {
return Ok(Box::new(crate::executor::result::ExecResult::new(0, 0)));
}
return Err(Error::SequenceAlreadyExists(name));
}
let mut options = crate::core::SequenceOptions::default();
if let Some(v) = stmt.start_with {
options.start_with = v;
}
if let Some(v) = stmt.increment_by {
options.increment_by = v;
}
if let Some(v) = stmt.min_value {
options.min_value = v;
}
if let Some(v) = stmt.max_value {
options.max_value = v;
}
options.cycle = stmt.cycle;
self.engine.create_sequence(&schema_name, &name, options)?;
Ok(Box::new(crate::executor::result::ExecResult::new(0, 0)))
}
pub(crate) fn execute_alter_sequence(
&self,
stmt: &AlterSequenceStatement,
ctx: &ExecutionContext,
) -> Result<Box<dyn crate::storage::traits::QueryResult>> {
let schema_name = stmt
.name
.schema()
.unwrap_or_else(|| ctx.current_schema().unwrap_or("public").to_string())
.to_lowercase();
let name = stmt.name.table().to_string();
if !self.engine.sequence_exists(&schema_name, &name)? {
if stmt.if_exists {
return Ok(Box::new(crate::executor::result::ExecResult::new(0, 0)));
}
return Err(Error::SequenceNotFound(name));
}
let mut options = crate::core::SequenceOptions::default();
if let Some(v) = stmt.restart_with {
options.start_with = v;
}
if let Some(v) = stmt.increment_by {
options.increment_by = v;
}
if let Some(v) = stmt.min_value {
options.min_value = v;
}
if let Some(v) = stmt.max_value {
options.max_value = v;
}
if let Some(v) = stmt.cycle {
options.cycle = v;
}
self.engine.alter_sequence(&schema_name, &name, options)?;
Ok(Box::new(crate::executor::result::ExecResult::new(0, 0)))
}
pub(crate) fn execute_drop_sequence(
&self,
stmt: &DropSequenceStatement,
ctx: &ExecutionContext,
) -> Result<Box<dyn crate::storage::traits::QueryResult>> {
let schema_name = stmt
.name
.schema()
.unwrap_or_else(|| ctx.current_schema().unwrap_or("public").to_string())
.to_lowercase();
let name = stmt.name.table().to_string();
if !self.engine.sequence_exists(&schema_name, &name)? {
if stmt.if_exists {
return Ok(Box::new(crate::executor::result::ExecResult::new(0, 0)));
}
return Err(Error::SequenceNotFound(name));
}
self.engine.drop_sequence(&schema_name, &name)?;
Ok(Box::new(crate::executor::result::ExecResult::new(0, 0)))
}
pub(crate) fn execute_create_schema(
&self,
stmt: &CreateSchemaStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
let schema_name = stmt.schema_name.value.to_lowercase();
{
let schemas = self.engine.schemas.read().unwrap();
if schemas.contains_key(&schema_name) {
if stmt.if_not_exists {
return Ok(Box::new(ExecResult::empty()));
}
return Err(Error::SchemaAlreadyExists);
}
}
let mut active_tx = self.active_transaction.lock().unwrap();
if let Some(ref mut tx_state) = *active_tx {
{
let mut schemas = self.engine.schemas.write().unwrap();
schemas.insert(schema_name.clone(), FxHashMap::default());
}
tx_state
.ddl_undo_log
.push(super::DeferredDdlOperation::CreateSchema { name: schema_name });
} else {
{
let mut schemas = self.engine.schemas.write().unwrap();
schemas.insert(schema_name, FxHashMap::default());
}
}
Ok(Box::new(ExecResult::empty()))
}
pub(crate) fn execute_drop_schema(
&self,
stmt: &DropSchemaStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
let schema_name = stmt.schema_name.value.to_lowercase();
let tables = if let Some(ref tx_state) = *self.active_transaction.lock().unwrap() {
tx_state.transaction.list_tables()?
} else {
let mut tx = self.engine.begin_transaction()?;
let t = tx.list_tables()?;
tx.commit()?;
t
};
let mut tables_to_drop = Vec::new();
for table_name in tables {
if table_name.starts_with(&format!("{}.", schema_name)) {
let schema = self.engine.get_table_schema(&table_name)?;
tables_to_drop.push((table_name, schema));
}
}
let mut active_tx = self.active_transaction.lock().unwrap();
if let Some(ref mut tx_state) = *active_tx {
for (table_name, _) in &tables_to_drop {
self.engine.drop_table_internal(table_name)?;
}
{
let mut schemas = self.engine.schemas.write().unwrap();
schemas.remove(&schema_name);
}
tx_state
.ddl_undo_log
.push(super::DeferredDdlOperation::DropSchema {
name: schema_name,
tables: tables_to_drop,
});
} else {
{
let mut tx = self.engine.begin_transaction()?;
for (table_name, _) in &tables_to_drop {
tx.drop_table(table_name)?;
}
tx.commit()?;
}
{
let mut schemas = self.engine.schemas.write().unwrap();
schemas.remove(&schema_name);
}
}
Ok(Box::new(ExecResult::empty()))
}
pub(crate) fn execute_use_schema(
&self,
_stmt: &UseSchemaStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
Ok(Box::new(ExecResult::empty()))
}
pub(crate) fn ensure_triggers_table_exists(&self) -> Result<()> {
let tx = self.engine.begin_transaction()?;
let tables = tx.list_tables()?;
let has_triggers_table = tables
.iter()
.any(|t| t.eq_ignore_ascii_case(crate::storage::triggers::SYS_TRIGGERS));
drop(tx);
if !has_triggers_table {
self.execute_functions_sql(crate::storage::triggers::CREATE_TRIGGERS_SQL)?;
}
Ok(())
}
fn trigger_exists(&self, trigger_name: &str) -> Result<bool> {
let tx = self.engine.begin_transaction()?;
let tables = tx.list_tables()?;
if !tables
.iter()
.any(|t| t.eq_ignore_ascii_case(crate::storage::triggers::SYS_TRIGGERS))
{
return Ok(false);
}
let table = tx.get_table(crate::storage::triggers::SYS_TRIGGERS)?;
let mut scanner = table.scan(&[], None)?;
while scanner.next() {
let row = scanner.row();
if let Some(Value::Text(name)) = row.get(2) {
if name.eq_ignore_ascii_case(trigger_name) {
return Ok(true);
}
}
}
Ok(false)
}
fn insert_trigger(&self, trigger: &crate::storage::triggers::StoredTrigger) -> Result<()> {
let (tx, mut table, auto_commit) =
self.start_transaction_for_dml(crate::storage::triggers::SYS_TRIGGERS)?;
let row_values = vec![
Value::Null(crate::core::DataType::Integer), trigger
.schema
.as_ref()
.map(|s| Value::text(s.clone()))
.unwrap_or(Value::Null(crate::core::DataType::Null)),
Value::text(trigger.name.clone()),
Value::text(trigger.table_name.clone()),
Value::text(trigger.timing.clone()),
Value::text(trigger.event.clone()),
Value::Boolean(trigger.for_each_row),
Value::text(trigger.language.clone()),
Value::text(trigger.code.clone()),
];
table.insert(crate::core::Row::from(row_values))?;
if auto_commit {
if let Some(mut tx) = tx {
tx.commit()?;
}
}
Ok(())
}
fn delete_table_triggers(&self, table_name: &str) -> Result<()> {
let (tx, mut table, auto_commit) =
self.start_transaction_for_dml(crate::storage::triggers::SYS_TRIGGERS)?;
let mut ids_to_delete = Vec::new();
let mut scanner = table.scan(&[], None)?;
while scanner.next() {
let row = scanner.row();
if let (Some(Value::Integer(id)), Some(Value::Text(target))) = (row.get(0), row.get(3))
{
if target.eq_ignore_ascii_case(table_name) {
ids_to_delete.push(*id);
}
}
}
for id in ids_to_delete {
let mut pk_expr = crate::storage::expression::ComparisonExpr::new(
"id",
crate::core::Operator::Eq,
Value::Integer(id),
);
pk_expr.prepare_for_schema(table.schema());
let _ = table.delete(Some(&pk_expr))?;
}
if auto_commit {
if let Some(mut tx) = tx {
tx.commit()?;
}
}
self.trigger_registry.remove_table_triggers(table_name);
Ok(())
}
fn delete_trigger(&self, trigger_name: &str) -> Result<bool> {
let (tx, mut table, auto_commit) =
self.start_transaction_for_dml(crate::storage::triggers::SYS_TRIGGERS)?;
let mut ids_to_delete = Vec::new();
let mut scanner = table.scan(&[], None)?;
while scanner.next() {
let row = scanner.row();
if let (Some(Value::Integer(id)), Some(Value::Text(name))) = (row.get(0), row.get(2)) {
if name.eq_ignore_ascii_case(trigger_name) {
ids_to_delete.push(*id);
}
}
}
let mut deleted = false;
use crate::storage::expression::{ComparisonExpr, Expression as StorageExpr};
for id in ids_to_delete {
let mut id_expr =
ComparisonExpr::new("id", crate::core::Operator::Eq, Value::Integer(id));
let schema = table.schema();
id_expr.prepare_for_schema(schema);
table.delete(Some(&id_expr))?;
deleted = true;
}
if auto_commit {
if let Some(mut tx) = tx {
tx.commit()?;
}
}
Ok(deleted)
}
pub(crate) fn execute_create_schedule(
&self,
stmt: &CreateScheduleStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
self.ensure_cron_tables_exist()?;
let mut tx = self.engine.begin_transaction()?;
let mut table = tx.get_table(crate::storage::jobs::SYS_CRON)?;
if let Err(e) = stmt.cron_expr.parse::<cron::Schedule>() {
return Err(Error::internal(format!("Invalid CRON expression: {}", e)));
}
let values = vec![
crate::core::value::Value::Null(DataType::Integer), crate::core::value::Value::text(stmt.name.to_uppercase()),
crate::core::value::Value::text(stmt.cron_expr.clone()),
crate::core::value::Value::text(stmt.command.clone()),
crate::core::value::Value::Boolean(true), ];
let row = Row::from_values(values);
table.insert(row)?;
tx.commit()?;
Ok(Box::new(ExecResult::new(1, 0)))
}
pub(crate) fn execute_alter_schedule(
&self,
stmt: &AlterScheduleStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
use crate::executor::expression::RowFilter;
self.ensure_cron_tables_exist()?;
let mut tx = self.engine.begin_transaction()?;
let mut table = tx.get_table(crate::storage::jobs::SYS_CRON)?;
let name_upper = stmt.name.to_uppercase();
let where_expr =
crate::parser::ast::Expression::Infix(crate::parser::ast::InfixExpression::new(
crate::parser::token::Token::new(
crate::parser::token::TokenType::Operator,
"=",
crate::parser::token::Position::default(),
),
Box::new(crate::parser::ast::Expression::Identifier(
crate::parser::ast::Identifier {
token: crate::parser::token::Token::new(
crate::parser::token::TokenType::Identifier,
"name",
crate::parser::token::Position::default(),
),
value: "name".to_string(),
value_lower: "name".to_string(),
},
)),
"=".to_string(),
Box::new(crate::parser::ast::Expression::StringLiteral(
crate::parser::ast::StringLiteral {
token: crate::parser::token::Token::new(
crate::parser::token::TokenType::String,
name_upper.clone(),
crate::parser::token::Position::default(),
),
value: name_upper.clone(),
type_hint: None,
},
)),
));
let schema = table.schema();
let col_names: Vec<String> = schema
.column_names()
.iter()
.map(|s| s.to_string())
.collect();
let row_filter = RowFilter::new(&where_expr, &col_names)?;
let mut scanner = table.scan(&[], None)?;
let mut id_to_update = None;
while scanner.next() {
let row = scanner.row();
if row_filter.matches(row) {
if let Some(crate::core::value::Value::Integer(id)) = row.get(0) {
id_to_update = Some(*id);
break;
}
}
}
if let Some(id) = id_to_update {
let pk_expr = crate::storage::expression::ComparisonExpr::new(
"id".to_string(),
crate::core::Operator::Eq,
crate::core::value::Value::Integer(id),
);
let mut setter = |mut row: Row| -> Result<(Row, bool)> {
let _ = row.set(4, crate::core::value::Value::Boolean(stmt.active));
Ok((row, true))
};
table.update(Some(&pk_expr), &mut setter)?;
tx.commit()?;
Ok(Box::new(ExecResult::new(0, 1)))
} else {
Err(Error::internal(format!(
"Schedule not found: {}",
name_upper
)))
}
}
pub(crate) fn execute_drop_schedule(
&self,
stmt: &DropScheduleStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
use crate::executor::expression::RowFilter;
self.ensure_cron_tables_exist()?;
let mut tx = self.engine.begin_transaction()?;
let mut table = tx.get_table(crate::storage::jobs::SYS_CRON)?;
let name_upper = stmt.name.to_uppercase();
let where_expr =
crate::parser::ast::Expression::Infix(crate::parser::ast::InfixExpression::new(
crate::parser::token::Token::new(
crate::parser::token::TokenType::Operator,
"=",
crate::parser::token::Position::default(),
),
Box::new(crate::parser::ast::Expression::Identifier(
crate::parser::ast::Identifier {
token: crate::parser::token::Token::new(
crate::parser::token::TokenType::Identifier,
"name",
crate::parser::token::Position::default(),
),
value: "name".to_string(),
value_lower: "name".to_string(),
},
)),
"=".to_string(),
Box::new(crate::parser::ast::Expression::StringLiteral(
crate::parser::ast::StringLiteral {
token: crate::parser::token::Token::new(
crate::parser::token::TokenType::String,
name_upper.clone(),
crate::parser::token::Position::default(),
),
value: name_upper.clone(),
type_hint: None,
},
)),
));
let schema = table.schema();
let col_names: Vec<String> = schema
.column_names()
.iter()
.map(|s| s.to_string())
.collect();
let row_filter = RowFilter::new(&where_expr, &col_names)?;
let mut scanner = table.scan(&[], None)?;
let mut id_to_delete = None;
while scanner.next() {
let row = scanner.row();
if row_filter.matches(row) {
if let Some(crate::core::value::Value::Integer(id)) = row.get(0) {
id_to_delete = Some(*id);
break;
}
}
}
let affected = if let Some(id) = id_to_delete {
let mut pk_expr = crate::storage::expression::ComparisonExpr::new(
"id".to_string(),
crate::core::Operator::Eq,
crate::core::value::Value::Integer(id),
);
pk_expr.prepare_for_schema(schema);
table.delete(Some(&pk_expr))?
} else {
0
};
tx.commit()?;
Ok(Box::new(ExecResult::new(0, affected as i64)))
}
pub(crate) fn execute_create_trigger(
&self,
stmt: &crate::parser::ast::CreateTriggerStatement,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
self.ensure_triggers_table_exists()?;
let trigger_name_upper = stmt.trigger_name.value.to_uppercase();
let exists = self.trigger_exists(&trigger_name_upper)?;
if exists && !stmt.if_not_exists {
return Err(Error::internal(format!(
"Trigger {} already exists",
trigger_name_upper
)));
} else if exists && stmt.if_not_exists {
return Ok(Box::new(ExecResult::new(0, 0)));
}
if !self.function_registry.is_language_supported(&stmt.language) {
return Err(Error::internal(format!(
"Unsupported language for trigger: {}",
stmt.language
)));
}
let stored_trigger = crate::storage::triggers::StoredTrigger {
id: 0,
schema: Some(ctx.current_schema().unwrap_or("public").to_uppercase()),
name: trigger_name_upper.clone(),
table_name: stmt.table_name.value().to_uppercase(),
timing: stmt.timing.to_string(),
event: stmt.event.to_string(),
for_each_row: stmt.for_each_row,
language: stmt.language.clone(),
code: stmt.body.clone(),
};
self.insert_trigger(&stored_trigger)?;
self.trigger_registry.add_trigger(stored_trigger);
Ok(Box::new(ExecResult::new(1, 0)))
}
pub(crate) fn execute_drop_trigger(
&self,
stmt: &crate::parser::ast::DropTriggerStatement,
_ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
self.ensure_triggers_table_exists()?;
let trigger_name_upper = stmt.trigger_name.value.to_uppercase();
let deleted = self.delete_trigger(&trigger_name_upper)?;
if deleted {
self.trigger_registry.remove_trigger(&trigger_name_upper);
}
if !deleted && !stmt.if_exists {
return Err(Error::internal(format!(
"Trigger {} does not exist",
trigger_name_upper
)));
}
Ok(Box::new(ExecResult::new(if deleted { 1 } else { 0 }, 0)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::mvcc::engine::MVCCEngine;
use std::sync::Arc;
fn create_test_executor() -> Executor {
let engine = MVCCEngine::in_memory();
engine.open_engine().unwrap();
Executor::new(Arc::new(engine))
}
#[test]
fn test_create_table() {
let executor = create_test_executor();
let result = executor
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
.unwrap();
assert_eq!(result.rows_affected(), 0);
assert!(executor.engine().table_exists("users").unwrap());
}
#[test]
fn test_create_table_if_not_exists() {
let executor = create_test_executor();
executor
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)")
.unwrap();
let result = executor
.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY)")
.unwrap();
assert_eq!(result.rows_affected(), 0);
}
#[test]
fn test_create_table_already_exists() {
let executor = create_test_executor();
executor
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)")
.unwrap();
let result = executor.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)");
assert!(result.is_err());
}
#[test]
fn test_drop_table() {
let executor = create_test_executor();
executor
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)")
.unwrap();
assert!(executor.engine().table_exists("users").unwrap());
executor.execute("DROP TABLE users").unwrap();
assert!(!executor.engine().table_exists("users").unwrap());
}
#[test]
fn test_drop_table_if_exists() {
let executor = create_test_executor();
let result = executor
.execute("DROP TABLE IF EXISTS nonexistent")
.unwrap();
assert_eq!(result.rows_affected(), 0);
}
#[test]
fn test_drop_table_not_found() {
let executor = create_test_executor();
let result = executor.execute("DROP TABLE nonexistent");
assert!(result.is_err());
}
#[test]
fn test_create_index() {
let executor = create_test_executor();
executor
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
.unwrap();
let result = executor
.execute("CREATE INDEX idx_name ON users (name)")
.unwrap();
assert_eq!(result.rows_affected(), 0);
}
#[test]
fn test_create_unique_index() {
let executor = create_test_executor();
executor
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
.unwrap();
let result = executor
.execute("CREATE UNIQUE INDEX idx_email ON users (email)")
.unwrap();
assert_eq!(result.rows_affected(), 0);
}
#[test]
fn test_parse_data_type() {
let executor = create_test_executor();
assert_eq!(
executor.parse_data_type("INTEGER").unwrap(),
DataType::Integer
);
assert_eq!(executor.parse_data_type("INT").unwrap(), DataType::Integer);
assert_eq!(
executor.parse_data_type("BIGINT").unwrap(),
DataType::Integer
);
assert_eq!(executor.parse_data_type("FLOAT").unwrap(), DataType::Float);
assert_eq!(executor.parse_data_type("DOUBLE").unwrap(), DataType::Float);
assert_eq!(executor.parse_data_type("TEXT").unwrap(), DataType::Text);
assert_eq!(
executor.parse_data_type("VARCHAR(255)").unwrap(),
DataType::Text
);
assert_eq!(
executor.parse_data_type("BOOLEAN").unwrap(),
DataType::Boolean
);
assert_eq!(
executor.parse_data_type("TIMESTAMP").unwrap(),
DataType::Timestamp
);
assert_eq!(executor.parse_data_type("JSON").unwrap(), DataType::Json);
}
}