pub mod context;
pub mod expression;
pub mod parallel;
pub mod pattern_cache;
pub mod planner;
pub mod query_cache;
pub mod result;
pub mod semantic_cache;
pub mod statistics;
mod aggregation;
mod cte;
mod ddl;
mod dml;
mod explain;
mod information_schema;
mod join;
pub mod pushdown;
mod query;
mod set_ops;
mod show;
mod subquery;
pub mod system_schema;
pub mod triggers;
pub mod utils;
mod window;
use rustc_hash::FxHashMap;
use std::sync::{Arc, Mutex};
use crate::core::{Error, Result, Value};
use crate::functions::{global_registry, FunctionDataType, FunctionRegistry, FunctionSignature};
use crate::parser::ast::{Program, Statement};
use crate::parser::Parser;
use crate::storage::mvcc::engine::MVCCEngine;
use crate::storage::traits::{Engine, QueryResult, Table, Transaction};
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum DeferredDdlOperation {
CreateTable {
name: String,
}, DropTable {
name: String,
schema: crate::core::Schema,
}, CreateSchema {
name: String,
}, DropSchema {
name: String,
tables: Vec<(String, crate::core::Schema)>,
}, }
use crate::storage::functions::StoredParameter;
use serde_json;
pub use context::{ExecutionContext, TimeoutGuard};
pub use expression::{
CompileContext, CompileError, CompiledEvaluator, ExecuteContext, ExprCompiler, ExprVM,
Program as ExprProgram,
};
pub use parallel::{
hash_row_by_keys,
parallel_hash_build,
parallel_hash_join,
parallel_hash_probe,
parallel_order_by,
parallel_order_by_fn,
verify_key_match,
JoinType as ParallelJoinType,
ParallelConfig,
ParallelHashTable,
ParallelJoinResult,
ParallelStats,
SortDirection,
SortSpec,
DEFAULT_PARALLEL_CHUNK_SIZE,
DEFAULT_PARALLEL_FILTER_THRESHOLD,
DEFAULT_PARALLEL_JOIN_THRESHOLD,
DEFAULT_PARALLEL_SORT_THRESHOLD,
};
pub use planner::{
AccessPlan, ColumnStatsCache, JoinPlan, QueryPlanner, RuntimeJoinAlgorithm,
RuntimeJoinDecision, StatsHealth,
};
pub use query_cache::{CacheStats, CachedQueryPlan, QueryCache, DEFAULT_CACHE_SIZE};
pub use result::{ExecResult, ExecutorMemoryResult};
pub use semantic_cache::{
CacheLookupResult, CachedResult, QueryFingerprint, SemanticCache, SemanticCacheStats,
SemanticCacheStatsSnapshot, SubsumptionResult, DEFAULT_CACHE_TTL_SECS, DEFAULT_MAX_CACHED_ROWS,
DEFAULT_SEMANTIC_CACHE_SIZE,
};
pub(crate) struct ActiveTransaction {
transaction: Box<dyn Transaction>,
tables: FxHashMap<String, Box<dyn Table>>,
ddl_undo_log: Vec<DeferredDdlOperation>,
is_explicit_tx: bool,
}
pub struct Executor {
engine: Arc<MVCCEngine>,
function_registry: Arc<FunctionRegistry>,
default_isolation_level: crate::core::IsolationLevel,
query_cache: QueryCache,
semantic_cache: SemanticCache,
active_transaction: Mutex<Option<ActiveTransaction>>,
query_planner: std::sync::OnceLock<QueryPlanner>,
trigger_registry: Arc<triggers::TriggerRegistry>,
}
impl Executor {
pub fn new(engine: Arc<MVCCEngine>) -> Self {
let executor = Self {
engine,
function_registry: Arc::clone(global_registry()),
default_isolation_level: crate::core::IsolationLevel::ReadCommitted,
query_cache: QueryCache::default(),
semantic_cache: SemanticCache::default(),
active_transaction: Mutex::new(None),
query_planner: std::sync::OnceLock::new(),
trigger_registry: Arc::new(triggers::TriggerRegistry::new()),
};
let _ = executor.load_functions();
let _ = executor.load_procedures();
let _ = executor.load_triggers();
executor
}
pub fn with_function_registry(
engine: Arc<MVCCEngine>,
function_registry: Arc<FunctionRegistry>,
) -> Self {
let executor = Self {
engine,
function_registry,
default_isolation_level: crate::core::IsolationLevel::ReadCommitted,
query_cache: QueryCache::default(),
semantic_cache: SemanticCache::default(),
active_transaction: Mutex::new(None),
query_planner: std::sync::OnceLock::new(),
trigger_registry: Arc::new(triggers::TriggerRegistry::new()),
};
let _ = executor.load_functions();
let _ = executor.load_procedures();
let _ = executor.load_triggers();
executor
}
pub fn with_cache_size(engine: Arc<MVCCEngine>, cache_size: usize) -> Self {
let executor = Self {
engine,
function_registry: Arc::clone(crate::functions::global_registry()),
default_isolation_level: crate::core::IsolationLevel::ReadCommitted,
query_cache: QueryCache::new(cache_size),
semantic_cache: SemanticCache::default(),
active_transaction: Mutex::new(None),
query_planner: std::sync::OnceLock::new(),
trigger_registry: Arc::new(triggers::TriggerRegistry::new()),
};
let _ = executor.load_functions();
let _ = executor.load_procedures();
let _ = executor.load_triggers();
executor
}
pub fn has_active_transaction(&self) -> bool {
self.active_transaction.lock().unwrap().is_some()
}
fn get_query_planner(&self) -> &QueryPlanner {
self.query_planner
.get_or_init(|| QueryPlanner::new(Arc::clone(&self.engine)))
}
fn load_functions(&self) -> Result<()> {
use crate::storage::functions::SYS_FUNCTIONS;
let tx = self.engine.begin_transaction()?;
let tables = tx.list_tables()?;
let has_functions_table = tables.iter().any(|t| t.eq_ignore_ascii_case(SYS_FUNCTIONS));
if !has_functions_table {
return Ok(());
}
let table = tx.get_table(SYS_FUNCTIONS)?;
let mut scanner = table.scan(&[], None)?;
while scanner.next() {
let row = scanner.row();
if let (
Some(Value::Text(name)),
Some(Value::Text(parameters_json)),
Some(Value::Text(_return_type)),
Some(Value::Text(_language)),
Some(Value::Text(code)),
) = (row.get(1), row.get(2), row.get(3), row.get(4), row.get(5))
{
let stored_parameters: Vec<StoredParameter> = serde_json::from_str(parameters_json)
.map_err(|e| {
Error::internal(format!("Failed to parse function parameters: {}", e))
})?;
let parameters: Vec<crate::parser::ast::FunctionParameter> = stored_parameters
.into_iter()
.map(|sp| crate::parser::ast::FunctionParameter {
name: crate::parser::ast::Identifier::new(
crate::parser::token::Token::new(
crate::parser::token::TokenType::Identifier,
sp.name.clone(),
crate::parser::token::Position::default(),
),
sp.name,
),
data_type: sp.data_type,
})
.collect();
fn string_to_function_data_type(s: &str) -> FunctionDataType {
match s.to_uppercase().as_str() {
"ANY" => FunctionDataType::Any,
"INTEGER" => FunctionDataType::Integer,
"FLOAT" => FunctionDataType::Float,
"TEXT" => FunctionDataType::String,
"BOOLEAN" => FunctionDataType::Boolean,
"TIMESTAMP" => FunctionDataType::Timestamp,
"DATE" => FunctionDataType::Date,
"TIME" => FunctionDataType::Time,
"JSON" => FunctionDataType::Json,
_ => FunctionDataType::Unknown,
}
}
let language = if self.function_registry.is_language_supported(_language) {
_language.to_string()
} else {
"rhai".to_string()
};
let param_names: Vec<String> =
parameters.iter().map(|p| p.name.value.clone()).collect();
self.function_registry.register_user_defined(
name.to_string(),
code.to_string(),
language,
param_names,
FunctionSignature::new(
string_to_function_data_type(_return_type),
parameters
.iter()
.map(|p| string_to_function_data_type(&p.data_type))
.collect(),
parameters.len(),
parameters.len(),
),
)?;
}
}
Ok(())
}
fn load_procedures(&self) -> Result<()> {
use crate::storage::procedures::SYS_PROCEDURES;
let tx = self.engine.begin_transaction()?;
let tables = tx.list_tables()?;
let has_procedures_table = tables
.iter()
.any(|t| t.eq_ignore_ascii_case(SYS_PROCEDURES));
if !has_procedures_table {
return Ok(());
}
let table = tx.get_table(SYS_PROCEDURES)?;
let mut scanner = table.scan(&[], None)?;
while scanner.next() {
let row = scanner.row();
if let (
Some(Value::Integer(id)),
Some(Value::Text(name)),
Some(Value::Text(parameters_json)),
Some(Value::Text(language)),
Some(Value::Text(code)),
) = (row.get(0), row.get(2), row.get(3), row.get(4), row.get(5))
{
let schema_val = row.get(1).and_then(|v| match v {
Value::Text(s) => Some(s.to_string()),
_ => None,
});
let stored_parameters: Vec<crate::storage::procedures::StoredProcedureParameter> =
serde_json::from_str(parameters_json).map_err(|e| {
Error::internal(format!("Failed to parse procedure parameters: {}", e))
})?;
let procedure = crate::storage::procedures::StoredProcedure {
id: *id,
schema: schema_val,
name: name.to_string(),
parameters: stored_parameters,
language: language.to_string(),
code: code.to_string(),
};
self.function_registry.register_procedure(name, procedure);
}
}
Ok(())
}
fn load_triggers(&self) -> Result<()> {
use crate::storage::triggers::SYS_TRIGGERS;
let tx = self.engine.begin_transaction()?;
let tables = tx.list_tables()?;
let has_triggers_table = tables.iter().any(|t| t.eq_ignore_ascii_case(SYS_TRIGGERS));
if !has_triggers_table {
return Ok(());
}
let table = tx.get_table(SYS_TRIGGERS)?;
let mut scanner = table.scan(&[], None)?;
let mut triggers = Vec::new();
while scanner.next() {
let row = scanner.row();
if let (
Some(Value::Integer(id)),
Some(Value::Text(name)),
Some(Value::Text(table_name)),
Some(Value::Text(timing)),
Some(Value::Text(event)),
Some(Value::Boolean(for_each_row)),
Some(Value::Text(language)),
Some(Value::Text(code)),
) = (
row.get(0),
row.get(2),
row.get(3),
row.get(4),
row.get(5),
row.get(6),
row.get(7),
row.get(8),
) {
let schema_val = row.get(1).and_then(|v| match v {
Value::Text(s) => Some(s.to_string()),
_ => None,
});
triggers.push(crate::storage::triggers::StoredTrigger {
id: *id,
schema: schema_val,
name: name.to_string(),
table_name: table_name.to_string(),
timing: timing.to_string(),
event: event.to_string(),
for_each_row: *for_each_row,
language: language.to_string(),
code: code.to_string(),
});
}
}
self.trigger_registry.load_triggers(triggers);
Ok(())
}
#[allow(dead_code)]
fn get_table_for_dml(&self, table_name: &str) -> Result<(Box<dyn Table>, bool)> {
let mut active_tx = self.active_transaction.lock().unwrap();
if let Some(ref mut tx_state) = *active_tx {
let table_name_lower = table_name.to_lowercase();
if tx_state.tables.contains_key(&table_name_lower) {
let table = tx_state.transaction.get_table(table_name)?;
return Ok((table, false));
}
let table = tx_state.transaction.get_table(table_name)?;
tx_state.tables.insert(
table_name_lower.clone(),
tx_state.transaction.get_table(table_name)?,
);
Ok((table, false))
} else {
let tx = self.engine.begin_transaction()?;
let table = tx.get_table(table_name)?;
Ok((table, true))
}
}
#[allow(dead_code)]
#[allow(clippy::type_complexity)]
fn start_transaction_for_dml(
&self,
table_name: &str,
) -> Result<(Option<Box<dyn Transaction>>, Box<dyn Table>, bool)> {
let active_tx = self.active_transaction.lock().unwrap();
if active_tx.is_some() {
drop(active_tx);
let (table, auto_commit) = self.get_table_for_dml(table_name)?;
Ok((None, table, auto_commit))
} else {
drop(active_tx);
let tx = self.engine.begin_transaction()?;
let table = tx.get_table(table_name)?;
Ok((Some(tx), table, true))
}
}
pub fn set_default_isolation_level(&mut self, level: crate::core::IsolationLevel) {
self.default_isolation_level = level;
}
pub fn engine(&self) -> &Arc<MVCCEngine> {
&self.engine
}
pub fn function_registry(&self) -> &Arc<FunctionRegistry> {
&self.function_registry
}
pub fn execute(&self, sql: &str) -> Result<Box<dyn QueryResult>> {
let ctx = ExecutionContext::new();
self.execute_cached(sql, &ctx)
}
pub fn execute_with_params(&self, sql: &str, params: &[Value]) -> Result<Box<dyn QueryResult>> {
let ctx = ExecutionContext::with_params(params.to_vec());
self.execute_cached(sql, &ctx)
}
pub fn execute_with_named_params(
&self,
sql: &str,
params: std::collections::HashMap<String, Value>,
) -> Result<Box<dyn QueryResult>> {
let ctx = ExecutionContext::with_named_params(params);
self.execute_cached(sql, &ctx)
}
pub fn execute_with_context(
&self,
sql: &str,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
self.execute_cached(sql, ctx)
}
fn execute_cached(&self, sql: &str, ctx: &ExecutionContext) -> Result<Box<dyn QueryResult>> {
if let Some(cached) = self.query_cache.get(sql) {
if cached.has_params {
let provided = ctx.params().len();
if provided < cached.param_count {
return Err(Error::internal(format!(
"Query requires {} parameters but only {} provided",
cached.param_count, provided
)));
}
}
return self.execute_statement(&cached.statement, ctx);
}
let mut parser = Parser::new(sql);
let mut program = parser
.parse_program()
.map_err(|e| Error::parse(e.to_string()))?;
if program.statements.len() == 1 {
let stmt = program.statements.pop().unwrap();
let (has_params, param_count) = count_parameters(&stmt);
let stmt_arc = std::sync::Arc::new(stmt);
self.query_cache
.put(sql, stmt_arc.clone(), has_params, param_count);
return self.execute_statement(&stmt_arc, ctx);
}
self.execute_program_with_context(&program, ctx)
}
pub fn query_cache(&self) -> &QueryCache {
&self.query_cache
}
pub fn cache_stats(&self) -> CacheStats {
self.query_cache.stats()
}
pub fn clear_cache(&self) {
self.query_cache.clear();
}
pub fn semantic_cache(&self) -> &SemanticCache {
&self.semantic_cache
}
pub fn semantic_cache_stats(&self) -> SemanticCacheStatsSnapshot {
self.semantic_cache.stats()
}
pub fn clear_semantic_cache(&self) {
self.semantic_cache.clear();
}
pub fn invalidate_semantic_cache(&self, table_name: &str) {
self.semantic_cache.invalidate_table(table_name);
}
pub fn execute_program(&self, program: &Program) -> Result<Box<dyn QueryResult>> {
let ctx = ExecutionContext::new();
self.execute_program_with_context(program, &ctx)
}
pub fn execute_program_with_context(
&self,
program: &Program,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
if program.statements.is_empty() {
return Ok(Box::new(ExecResult::empty()));
}
let mut last_result: Option<Box<dyn QueryResult>> = None;
for statement in &program.statements {
last_result = Some(self.execute_statement(statement, ctx)?);
}
Ok(last_result.unwrap())
}
pub fn execute_statement(
&self,
statement: &Statement,
ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
let ctx = {
let active_tx = self.active_transaction.lock().unwrap();
if let Some(ref tx_state) = *active_tx {
let txn_id = tx_state.transaction.id();
ctx.with_transaction_id(txn_id as u64)
} else {
ctx.clone()
}
};
match statement {
Statement::CreateTable(stmt) => self.execute_create_table(stmt, &ctx),
Statement::DropTable(stmt) => self.execute_drop_table(stmt, &ctx),
Statement::CreateIndex(stmt) => self.execute_create_index(stmt, &ctx),
Statement::DropIndex(stmt) => self.execute_drop_index(stmt, &ctx),
Statement::AlterTable(stmt) => self.execute_alter_table(stmt, &ctx),
Statement::CreateView(stmt) => self.execute_create_view(stmt, &ctx),
Statement::DropView(stmt) => self.execute_drop_view(stmt, &ctx),
Statement::CreateColumnarIndex(stmt) => self.execute_create_columnar_index(stmt, &ctx),
Statement::DropColumnarIndex(stmt) => self.execute_drop_columnar_index(stmt, &ctx),
Statement::CreateSchema(stmt) => self.execute_create_schema(stmt, &ctx),
Statement::DropSchema(stmt) => self.execute_drop_schema(stmt, &ctx),
Statement::UseSchema(stmt) => self.execute_use_schema(stmt, &ctx),
Statement::Insert(stmt) => self.execute_insert(stmt, &ctx),
Statement::Update(stmt) => self.execute_update(stmt, &ctx),
Statement::Delete(stmt) => self.execute_delete(stmt, &ctx),
Statement::Truncate(stmt) => self.execute_truncate(stmt, &ctx),
Statement::Select(stmt) => self.execute_select(stmt, &ctx),
Statement::Begin(stmt) => self.execute_begin(stmt, &ctx),
Statement::Commit(stmt) => self.execute_commit_stmt(stmt, &ctx),
Statement::Rollback(stmt) => self.execute_rollback_stmt(stmt, &ctx),
Statement::Savepoint(stmt) => self.execute_savepoint(stmt, &ctx),
Statement::Set(stmt) => self.execute_set(stmt, &ctx),
Statement::ShowTables(stmt) => self.execute_show_tables(stmt, &ctx),
Statement::ShowViews(stmt) => self.execute_show_views(stmt, &ctx),
Statement::ShowFunctions(stmt) => self.execute_show_functions(stmt, &ctx),
Statement::ShowCreateTable(stmt) => self.execute_show_create_table(stmt, &ctx),
Statement::ShowCreateView(stmt) => self.execute_show_create_view(stmt, &ctx),
Statement::ShowIndexes(stmt) => self.execute_show_indexes(stmt, &ctx),
Statement::Describe(stmt) => self.execute_describe(stmt, &ctx),
Statement::Pragma(stmt) => self.execute_pragma(stmt, &ctx),
Statement::Expression(stmt) => self.execute_expression_stmt(stmt, &ctx),
Statement::Explain(stmt) => self.execute_explain(stmt, &ctx),
Statement::Analyze(stmt) => self.execute_analyze(stmt, &ctx),
Statement::CreateFunction(stmt) => self.execute_create_function(stmt, &ctx),
Statement::DropFunction(stmt) => self.execute_drop_function(stmt, &ctx),
Statement::CreateProcedure(stmt) => self.execute_create_procedure(stmt, &ctx),
Statement::CreateTrigger(stmt) => self.execute_create_trigger(stmt, &ctx),
Statement::DropTrigger(stmt) => self.execute_drop_trigger(stmt, &ctx),
Statement::Call(stmt) => self.execute_call(stmt, &ctx),
}
}
pub fn begin_transaction(&self) -> Result<Box<dyn Transaction>> {
self.engine.begin_transaction()
}
pub fn begin_transaction_with_isolation(
&self,
isolation: crate::core::IsolationLevel,
) -> Result<Box<dyn Transaction>> {
let mut tx = self.engine.begin_transaction()?;
let _ = tx.set_isolation_level(isolation);
Ok(tx)
}
}
fn count_parameters(stmt: &Statement) -> (bool, usize) {
use crate::parser::ast::*;
struct ParamCounter {
max_index: usize,
has_positional: bool,
}
impl ParamCounter {
fn new() -> Self {
Self {
max_index: 0,
has_positional: false,
}
}
fn visit_expr(&mut self, expr: &Expression) {
match expr {
Expression::Parameter(param) => {
if param.index > 0 {
self.max_index = self.max_index.max(param.index);
} else {
self.has_positional = true;
}
}
Expression::Infix(infix) => {
self.visit_expr(&infix.left);
self.visit_expr(&infix.right);
}
Expression::Prefix(prefix) => {
self.visit_expr(&prefix.right);
}
Expression::FunctionCall(func) => {
for arg in &func.arguments {
self.visit_expr(arg);
}
}
Expression::Case(case) => {
if let Some(val) = &case.value {
self.visit_expr(val);
}
for when in &case.when_clauses {
self.visit_expr(&when.condition);
self.visit_expr(&when.then_result);
}
if let Some(el) = &case.else_value {
self.visit_expr(el);
}
}
Expression::In(in_expr) => {
self.visit_expr(&in_expr.left);
self.visit_expr(&in_expr.right);
}
Expression::Between(between) => {
self.visit_expr(&between.expr);
self.visit_expr(&between.lower);
self.visit_expr(&between.upper);
}
Expression::Cast(cast) => {
self.visit_expr(&cast.expr);
}
Expression::ScalarSubquery(subq) => {
self.visit_select(&subq.subquery);
}
Expression::Exists(exists) => {
self.visit_select(&exists.subquery);
}
Expression::List(list) => {
for item in &list.elements {
self.visit_expr(item);
}
}
Expression::ExpressionList(list) => {
for item in &list.expressions {
self.visit_expr(item);
}
}
Expression::Aliased(aliased) => {
self.visit_expr(&aliased.expression);
}
Expression::Window(window) => {
for arg in &window.function.arguments {
self.visit_expr(arg);
}
}
_ => {}
}
}
fn visit_select(&mut self, select: &SelectStatement) {
for col in &select.columns {
self.visit_expr(col);
}
if let Some(table_expr) = &select.table_expr {
self.visit_expr(table_expr);
}
if let Some(where_clause) = &select.where_clause {
self.visit_expr(where_clause);
}
for group in &select.group_by.columns {
self.visit_expr(group);
}
if let Some(having) = &select.having {
self.visit_expr(having);
}
}
}
let mut counter = ParamCounter::new();
match stmt {
Statement::Select(select) => counter.visit_select(select),
Statement::Insert(insert) => {
for row in &insert.values {
for expr in row {
counter.visit_expr(expr);
}
}
}
Statement::Update(update) => {
for expr in update.updates.values() {
counter.visit_expr(expr);
}
if let Some(where_clause) = &update.where_clause {
counter.visit_expr(where_clause);
}
}
Statement::Delete(delete) => {
if let Some(where_clause) = &delete.where_clause {
counter.visit_expr(where_clause);
}
}
_ => {}
}
let has_params = counter.max_index > 0 || counter.has_positional;
let param_count = if counter.has_positional {
0
} else {
counter.max_index
};
(has_params, param_count)
}
impl crate::functions::backends::SqlRunner for Executor {
fn execute_query(
&self,
sql: &str,
) -> crate::core::Result<Box<dyn crate::storage::traits::QueryResult>> {
self.execute(sql)
}
fn execute_ast(
&self,
stmt: &crate::parser::ast::Statement,
) -> crate::core::Result<Box<dyn crate::storage::traits::QueryResult>> {
let ctx = crate::executor::context::ExecutionContext::new();
self.execute_statement(stmt, &ctx)
}
fn commit(&self) -> crate::core::Result<()> {
let mut active_tx = self.active_transaction.lock().unwrap();
let is_explicit = if let Some(tx_state) = active_tx.as_ref() {
tx_state.is_explicit_tx
} else {
return Err(crate::core::Error::internal(
"No active transaction to commit",
));
};
if is_explicit {
return Err(crate::core::Error::internal(
"invalid transaction termination",
));
}
if let Some(mut tx_state) = active_tx.take() {
tx_state.transaction.commit()?;
let new_tx = self.engine.begin_transaction()?;
*active_tx = Some(ActiveTransaction {
transaction: new_tx,
tables: rustc_hash::FxHashMap::default(),
ddl_undo_log: Vec::new(),
is_explicit_tx: false,
});
}
Ok(())
}
fn rollback(&self) -> crate::core::Result<()> {
let mut active_tx = self.active_transaction.lock().unwrap();
let is_explicit = if let Some(tx_state) = active_tx.as_ref() {
tx_state.is_explicit_tx
} else {
return Err(crate::core::Error::internal(
"No active transaction to rollback",
));
};
if is_explicit {
return Err(crate::core::Error::internal(
"invalid transaction termination",
));
}
if let Some(mut tx_state) = active_tx.take() {
for (_name, mut table) in tx_state.tables.drain() {
table.rollback();
}
tx_state.transaction.rollback()?;
while let Some(op) = tx_state.ddl_undo_log.pop() {
match op {
DeferredDdlOperation::CreateTable { name } => {
let _ = self.engine.drop_table_internal(&name);
}
DeferredDdlOperation::DropTable { schema, .. } => {
let _ = self.engine.create_table(schema);
}
DeferredDdlOperation::CreateSchema { name } => {
let mut schemas = self.engine.schemas.write().unwrap();
schemas.remove(&name);
}
DeferredDdlOperation::DropSchema { name, tables } => {
let mut schemas = self.engine.schemas.write().unwrap();
let mut table_map = rustc_hash::FxHashMap::default();
for (qualified_table_name, schema) in &tables {
let simple_table_name =
qualified_table_name[(name.len() + 1)..].to_string();
table_map.insert(simple_table_name, schema.clone());
}
schemas.insert(name.clone(), table_map);
}
}
}
let new_tx = self.engine.begin_transaction()?;
*active_tx = Some(ActiveTransaction {
transaction: new_tx,
tables: rustc_hash::FxHashMap::default(),
ddl_undo_log: Vec::new(),
is_explicit_tx: false,
});
}
Ok(())
}
fn begin(&self) -> crate::core::Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::mvcc::engine::MVCCEngine;
fn create_test_executor() -> Executor {
let engine = MVCCEngine::in_memory();
engine.open_engine().unwrap();
Executor::new(Arc::new(engine))
}
#[test]
fn test_executor_creation() {
let executor = create_test_executor();
assert!(executor.function_registry().exists("COUNT"));
assert!(executor.function_registry().exists("UPPER"));
}
#[test]
fn test_empty_program() {
let executor = create_test_executor();
let result = executor.execute("").unwrap();
assert_eq!(result.columns().len(), 0);
}
#[test]
fn test_create_table() {
let executor = create_test_executor();
let result = executor
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
.unwrap();
assert_eq!(result.rows_affected(), 0);
}
#[test]
fn test_insert_and_select() {
let executor = create_test_executor();
executor
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
.unwrap();
let result = executor
.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")
.unwrap();
assert_eq!(result.rows_affected(), 1);
let mut result = executor.execute("SELECT * FROM users").unwrap();
let columns = result.columns();
assert_eq!(columns.len(), 2);
assert!(result.next());
let row = result.row();
assert_eq!(row.get(0), Some(&Value::Integer(1)));
assert_eq!(row.get(1), Some(&Value::text("Alice")));
assert!(!result.next());
}
#[test]
fn test_parameterized_query() {
let executor = create_test_executor();
executor
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
.unwrap();
executor
.execute("INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')")
.unwrap();
let mut result = executor
.execute_with_params("SELECT * FROM users WHERE id = $1", &[Value::Integer(1)])
.unwrap();
assert!(result.next());
let row = result.row();
assert_eq!(row.get(0), Some(&Value::Integer(1)));
assert!(!result.next());
}
#[test]
fn test_query_cache_basic() {
let executor = create_test_executor();
executor
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
.unwrap();
executor
.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")
.unwrap();
let stats_before = executor.cache_stats();
executor.execute("SELECT * FROM users").unwrap();
let stats_after = executor.cache_stats();
assert!(stats_after.size > stats_before.size);
let size_before = executor.cache_stats().size;
executor.execute("SELECT * FROM users").unwrap();
let size_after = executor.cache_stats().size;
assert_eq!(size_before, size_after); }
#[test]
fn test_query_cache_parameterized() {
let executor = create_test_executor();
executor
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
.unwrap();
executor
.execute("INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')")
.unwrap();
let query = "SELECT * FROM users WHERE id = $1";
let mut result = executor
.execute_with_params(query, &[Value::Integer(1)])
.unwrap();
assert!(result.next());
assert_eq!(result.row().get(0), Some(&Value::Integer(1)));
let mut result = executor
.execute_with_params(query, &[Value::Integer(2)])
.unwrap();
assert!(result.next());
assert_eq!(result.row().get(0), Some(&Value::Integer(2)));
}
#[test]
fn test_query_cache_clear() {
let executor = create_test_executor();
executor.execute("SELECT 1").unwrap();
executor.execute("SELECT 2").unwrap();
assert!(executor.cache_stats().size > 0);
executor.clear_cache();
assert_eq!(executor.cache_stats().size, 0);
}
#[test]
fn test_query_cache_whitespace_normalization() {
let executor = create_test_executor();
executor.execute("SELECT 1").unwrap();
let size = executor.cache_stats().size;
executor.execute("SELECT 1").unwrap();
assert_eq!(executor.cache_stats().size, size);
}
}