pub mod context;
pub mod expression;
pub mod parallel;
pub mod pattern_cache;
pub mod planner;
pub mod query_cache;
pub mod result;
pub mod semantic_cache;
pub mod statistics;
mod aggregation;
mod cte;
mod ddl;
mod dml;
mod explain;
mod join;
pub mod pushdown;
mod query;
mod set_ops;
mod show;
mod subquery;
pub mod utils;
mod window;
use rustc_hash::FxHashMap;
use std::sync::{Arc, Mutex};
use crate::core::{Error, Result, Value};
use crate::functions::FunctionRegistry;
use crate::parser::ast::{Program, Statement};
use crate::parser::Parser;
use crate::storage::mvcc::engine::MVCCEngine;
use crate::storage::traits::{Engine, QueryResult, Table, Transaction};
/// A schema-level (DDL) operation recorded while a transaction is open.
///
/// Values of this type are stored in `ActiveTransaction::ddl_undo_log`. The
/// code that consumes the log is not in this part of the file; the variant
/// payloads suggest they carry what is needed to reverse each operation
/// (NOTE(review): confirm against the commit/rollback implementation).
#[derive(Debug, Clone)]
pub(crate) enum DeferredDdlOperation {
    /// Refers to a table creation; only the table name is kept.
    CreateTable { name: String },
    /// Refers to a table drop; keeps the table name and its schema.
    DropTable {
        name: String,
        schema: crate::core::Schema,
    },
    /// Refers to a schema creation; only the schema name is kept.
    CreateSchema { name: String },
    /// Refers to a schema drop; keeps the schema name plus a
    /// `(table name, table schema)` pair for each table listed.
    DropSchema {
        name: String,
        tables: Vec<(String, crate::core::Schema)>,
    },
}
pub use context::{ExecutionContext, TimeoutGuard};
pub use expression::{
CompileContext, CompileError, CompiledEvaluator, ExecuteContext, ExprCompiler, ExprVM,
Program as ExprProgram,
};
pub use parallel::{
hash_row_by_keys,
parallel_hash_build,
parallel_hash_join,
parallel_hash_probe,
parallel_order_by,
parallel_order_by_fn,
verify_key_match,
JoinType as ParallelJoinType,
ParallelConfig,
ParallelHashTable,
ParallelJoinResult,
ParallelStats,
SortDirection,
SortSpec,
DEFAULT_PARALLEL_CHUNK_SIZE,
DEFAULT_PARALLEL_FILTER_THRESHOLD,
DEFAULT_PARALLEL_JOIN_THRESHOLD,
DEFAULT_PARALLEL_SORT_THRESHOLD,
};
pub use planner::{
AccessPlan, ColumnStatsCache, JoinPlan, QueryPlanner, RuntimeJoinAlgorithm,
RuntimeJoinDecision, StatsHealth,
};
pub use query_cache::{CacheStats, CachedQueryPlan, QueryCache, DEFAULT_CACHE_SIZE};
pub use result::{ExecResult, ExecutorMemoryResult};
pub use semantic_cache::{
CacheLookupResult, CachedResult, QueryFingerprint, SemanticCache, SemanticCacheStats,
SemanticCacheStatsSnapshot, SubsumptionResult, DEFAULT_CACHE_TTL_SECS, DEFAULT_MAX_CACHED_ROWS,
DEFAULT_SEMANTIC_CACHE_SIZE,
};
/// Bookkeeping for the transaction currently open on an `Executor`.
struct ActiveTransaction {
/// Handle to the open transaction.
transaction: Box<dyn Transaction>,
/// Table handles obtained through `transaction`, keyed by the lower-cased
/// table name.
tables: FxHashMap<String, Box<dyn Table>>,
/// DDL operations recorded while the transaction is open.
/// NOTE(review): presumably used to undo DDL on rollback — confirm where
/// this log is consumed.
ddl_undo_log: Vec<DeferredDdlOperation>,
}
/// Parses SQL text and dispatches each statement to the matching handler,
/// running against a shared `MVCCEngine`.
pub struct Executor {
/// Storage engine shared with the rest of the system.
engine: Arc<MVCCEngine>,
/// Registry of SQL functions available to statements.
function_registry: Arc<FunctionRegistry>,
/// Isolation level stored for new transactions; every constructor in this
/// file initialises it to `ReadCommitted`.
default_isolation_level: crate::core::IsolationLevel,
/// Cache of parsed single-statement queries, looked up by SQL text.
query_cache: QueryCache,
/// Semantic result cache; supports per-table invalidation.
semantic_cache: SemanticCache,
/// The explicitly opened transaction, or `None` when there is none.
active_transaction: Mutex<Option<ActiveTransaction>>,
/// Query planner, created on first use by `get_query_planner`.
query_planner: std::sync::OnceLock<QueryPlanner>,
}
impl Executor {
pub fn new(engine: Arc<MVCCEngine>) -> Self {
Self {
engine,
function_registry: Arc::new(FunctionRegistry::new()),
default_isolation_level: crate::core::IsolationLevel::ReadCommitted,
query_cache: QueryCache::default(),
semantic_cache: SemanticCache::default(),
active_transaction: Mutex::new(None),
query_planner: std::sync::OnceLock::new(),
}
}
/// Creates an executor over `engine` that resolves SQL functions through the
/// supplied `function_registry`.
///
/// Caches use their default configuration, no transaction is active, the
/// default isolation level is `ReadCommitted`, and the query planner is left
/// uninitialised until it is first needed.
pub fn with_function_registry(
    engine: Arc<MVCCEngine>,
    function_registry: Arc<FunctionRegistry>,
) -> Self {
    let default_isolation_level = crate::core::IsolationLevel::ReadCommitted;
    let query_cache = QueryCache::default();
    let semantic_cache = SemanticCache::default();
    let active_transaction = Mutex::new(None);
    let query_planner = std::sync::OnceLock::new();

    Self {
        engine,
        function_registry,
        default_isolation_level,
        query_cache,
        semantic_cache,
        active_transaction,
        query_planner,
    }
}
/// Creates an executor over `engine` whose query cache is built with
/// `QueryCache::new(cache_size)`.
///
/// Everything else matches [`Executor::new`]: a fresh function registry, a
/// default semantic cache, `ReadCommitted` as the default isolation level,
/// no active transaction and a lazily created query planner.
pub fn with_cache_size(engine: Arc<MVCCEngine>, cache_size: usize) -> Self {
    let function_registry = Arc::new(FunctionRegistry::new());
    let default_isolation_level = crate::core::IsolationLevel::ReadCommitted;
    let query_cache = QueryCache::new(cache_size);
    let semantic_cache = SemanticCache::default();
    let active_transaction = Mutex::new(None);
    let query_planner = std::sync::OnceLock::new();

    Self {
        engine,
        function_registry,
        default_isolation_level,
        query_cache,
        semantic_cache,
        active_transaction,
        query_planner,
    }
}
/// Reports whether an explicit transaction is currently open on this
/// executor.
///
/// # Panics
///
/// Panics if the internal transaction mutex has been poisoned.
pub fn has_active_transaction(&self) -> bool {
    let guard = self.active_transaction.lock().unwrap();
    guard.is_some()
}
/// Returns the query planner, building it on the first call.
///
/// The planner is constructed from a clone of the engine handle and then
/// stored in the `OnceLock`, so later calls return the same instance.
fn get_query_planner(&self) -> &QueryPlanner {
    let build_planner = || QueryPlanner::new(Arc::clone(&self.engine));
    self.query_planner.get_or_init(build_planner)
}
/// Obtains a table handle for a DML statement.
///
/// Returns `(table, auto_commit)`:
///
/// * With an active transaction, the table is fetched through that
///   transaction and `auto_commit` is `false`. The first time a table is
///   requested, a second handle is stored in the transaction's table map
///   under the lower-cased table name.
/// * Without an active transaction, a new engine transaction is started, the
///   table is fetched through it, and `auto_commit` is `true`.
///
/// # Errors
///
/// Propagates any error from starting the transaction or from `get_table`.
///
/// # Panics
///
/// Panics if the internal transaction mutex has been poisoned.
#[allow(dead_code)]
fn get_table_for_dml(&self, table_name: &str) -> Result<(Box<dyn Table>, bool)> {
    let mut active_tx = self.active_transaction.lock().unwrap();

    if let Some(tx_state) = active_tx.as_mut() {
        // The caller always receives a fresh handle; the map entry below is
        // a separate handle kept alongside the transaction.
        let table = tx_state.transaction.get_table(table_name)?;

        // One hash lookup via the entry API; the extra handle is only
        // fetched when this table has not been recorded yet.
        if let std::collections::hash_map::Entry::Vacant(slot) =
            tx_state.tables.entry(table_name.to_lowercase())
        {
            slot.insert(tx_state.transaction.get_table(table_name)?);
        }

        Ok((table, false))
    } else {
        // NOTE(review): `tx` goes out of scope when this function returns
        // while the table handle is passed to the caller, who has no way to
        // commit it. Whether the handle stays usable depends on the
        // `Transaction`/`Table` implementations — confirm before removing
        // the `dead_code` allowance and relying on this path.
        let tx = self.engine.begin_transaction()?;
        let table = tx.get_table(table_name)?;
        Ok((table, true))
    }
}
/// Prepares a transaction and table handle for a DML statement.
///
/// Returns `(transaction, table, auto_commit)`:
///
/// * If an explicit transaction is active, the table comes from
///   `get_table_for_dml`, no transaction is returned (`None`), and
///   `auto_commit` is whatever that helper reports.
/// * Otherwise a new engine transaction is started and returned together
///   with the table, with `auto_commit` set to `true`.
///
/// The transaction mutex is released before either path runs.
///
/// # Errors
///
/// Propagates errors from starting a transaction or fetching the table.
///
/// # Panics
///
/// Panics if the internal transaction mutex has been poisoned.
#[allow(dead_code)]
#[allow(clippy::type_complexity)]
fn start_transaction_for_dml(
    &self,
    table_name: &str,
) -> Result<(Option<Box<dyn Transaction>>, Box<dyn Table>, bool)> {
    // The guard is a temporary here, so the lock is released at the end of
    // this statement.
    let in_explicit_tx = self.active_transaction.lock().unwrap().is_some();

    if !in_explicit_tx {
        let tx = self.engine.begin_transaction()?;
        let table = tx.get_table(table_name)?;
        return Ok((Some(tx), table, true));
    }

    let (table, auto_commit) = self.get_table_for_dml(table_name)?;
    Ok((None, table, auto_commit))
}
/// Replaces the executor's stored default isolation level with `level`.
pub fn set_default_isolation_level(&mut self, level: crate::core::IsolationLevel) {
self.default_isolation_level = level;
}
/// Returns the shared handle to the storage engine this executor uses.
pub fn engine(&self) -> &Arc<MVCCEngine> {
&self.engine
}
/// Returns the shared handle to the SQL function registry.
pub fn function_registry(&self) -> &Arc<FunctionRegistry> {
&self.function_registry
}
/// Executes `sql` with a fresh, parameterless execution context.
///
/// # Errors
///
/// Returns a parse error if `sql` cannot be parsed, or whatever error the
/// executed statement produces.
pub fn execute(&self, sql: &str) -> Result<Box<dyn QueryResult>> {
    self.execute_cached(sql, &ExecutionContext::new())
}
/// Executes `sql` with the given positional parameter values.
///
/// `params` is copied into a new execution context, so the caller keeps
/// ownership of the slice.
///
/// # Errors
///
/// Returns a parse error if `sql` cannot be parsed, or whatever error the
/// executed statement produces.
pub fn execute_with_params(&self, sql: &str, params: &[Value]) -> Result<Box<dyn QueryResult>> {
    let owned_params = params.to_vec();
    self.execute_cached(sql, &ExecutionContext::with_params(owned_params))
}
/// Executes `sql` with the given named parameter values.
///
/// The map is moved into a new execution context.
///
/// # Errors
///
/// Returns a parse error if `sql` cannot be parsed, or whatever error the
/// executed statement produces.
pub fn execute_with_named_params(
    &self,
    sql: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<Box<dyn QueryResult>> {
    self.execute_cached(sql, &ExecutionContext::with_named_params(params))
}
/// Executes `sql` using a caller-supplied execution context.
///
/// This is the public entry point to the cached execution path; the context
/// is passed through unchanged.
///
/// # Errors
///
/// Returns a parse error if `sql` cannot be parsed, or whatever error the
/// executed statement produces.
pub fn execute_with_context(
    &self,
    sql: &str,
    ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
    let outcome = self.execute_cached(sql, ctx);
    outcome
}
/// Executes `sql`, reusing a previously parsed statement when the query
/// cache has one.
///
/// * Cache hit: if the cached entry is flagged as parameterised, the number
///   of positional parameters in `ctx` must be at least the cached count;
///   the cached statement is then executed.
/// * Cache miss: `sql` is parsed. A program with exactly one statement is
///   stored in the cache (with its parameter information) and executed;
///   any other program is executed statement by statement without caching.
///
/// # Errors
///
/// Returns an internal error when too few parameters are supplied for a
/// cached statement, a parse error when `sql` is invalid, or the error
/// produced by the executed statement(s).
fn execute_cached(&self, sql: &str, ctx: &ExecutionContext) -> Result<Box<dyn QueryResult>> {
    // Fast path: the statement was parsed before.
    if let Some(plan) = self.query_cache.get(sql) {
        if plan.has_params {
            let supplied = ctx.params().len();
            if supplied < plan.param_count {
                return Err(Error::internal(format!(
                    "Query requires {} parameters but only {} provided",
                    plan.param_count, supplied
                )));
            }
        }
        return self.execute_statement(&plan.statement, ctx);
    }

    // Slow path: parse the text.
    let mut program = Parser::new(sql)
        .parse_program()
        .map_err(|e| Error::parse(e.to_string()))?;

    // Only single-statement programs are cached.
    if program.statements.len() != 1 {
        return self.execute_program_with_context(&program, ctx);
    }

    let only_statement = program.statements.pop().unwrap();
    let (has_params, param_count) = count_parameters(&only_statement);
    let shared_statement = Arc::new(only_statement);
    self.query_cache.put(
        sql,
        Arc::clone(&shared_statement),
        has_params,
        param_count,
    );
    self.execute_statement(&shared_statement, ctx)
}
/// Returns a reference to the parsed-query cache.
pub fn query_cache(&self) -> &QueryCache {
&self.query_cache
}
/// Returns the statistics reported by the parsed-query cache.
pub fn cache_stats(&self) -> CacheStats {
self.query_cache.stats()
}
/// Clears the parsed-query cache.
pub fn clear_cache(&self) {
self.query_cache.clear();
}
/// Returns a reference to the semantic cache.
pub fn semantic_cache(&self) -> &SemanticCache {
&self.semantic_cache
}
/// Returns a statistics snapshot reported by the semantic cache.
pub fn semantic_cache_stats(&self) -> SemanticCacheStatsSnapshot {
self.semantic_cache.stats()
}
/// Clears the semantic cache.
pub fn clear_semantic_cache(&self) {
self.semantic_cache.clear();
}
/// Asks the semantic cache to invalidate its entries for `table_name`.
pub fn invalidate_semantic_cache(&self, table_name: &str) {
self.semantic_cache.invalidate_table(table_name);
}
/// Executes every statement of an already parsed `program` using a fresh,
/// parameterless execution context, returning the last statement's result.
///
/// # Errors
///
/// Returns the first error produced by any statement.
pub fn execute_program(&self, program: &Program) -> Result<Box<dyn QueryResult>> {
    self.execute_program_with_context(program, &ExecutionContext::new())
}
/// Executes the statements of `program` in order with the given context.
///
/// Returns the result of the last statement, or an empty `ExecResult` when
/// the program contains no statements. Execution stops at the first
/// statement that fails.
///
/// # Errors
///
/// Returns the error of the first failing statement.
pub fn execute_program_with_context(
    &self,
    program: &Program,
    ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
    let mut latest = None;
    for stmt in &program.statements {
        latest = Some(self.execute_statement(stmt, ctx)?);
    }

    match latest {
        Some(result) => Ok(result),
        // No statements were present.
        None => Ok(Box::new(ExecResult::empty())),
    }
}
/// Executes a single parsed statement by dispatching to its handler.
///
/// When an explicit transaction is open, the handler receives a context
/// carrying that transaction's id; otherwise it receives a clone of `ctx`.
///
/// # Errors
///
/// Returns whatever error the selected handler produces.
///
/// # Panics
///
/// Panics if the internal transaction mutex has been poisoned.
pub fn execute_statement(
    &self,
    statement: &Statement,
    ctx: &ExecutionContext,
) -> Result<Box<dyn QueryResult>> {
    // Read the id of the open transaction, if any; the lock is only needed
    // for this lookup.
    let open_txn_id = self
        .active_transaction
        .lock()
        .unwrap()
        .as_ref()
        .map(|state| state.transaction.id());

    let ctx = match open_txn_id {
        Some(txn_id) => ctx.with_transaction_id(txn_id as u64),
        None => ctx.clone(),
    };
    let ctx = &ctx;

    match statement {
        // Schema definition.
        Statement::CreateTable(stmt) => self.execute_create_table(stmt, ctx),
        Statement::DropTable(stmt) => self.execute_drop_table(stmt, ctx),
        Statement::CreateIndex(stmt) => self.execute_create_index(stmt, ctx),
        Statement::DropIndex(stmt) => self.execute_drop_index(stmt, ctx),
        Statement::AlterTable(stmt) => self.execute_alter_table(stmt, ctx),
        Statement::CreateView(stmt) => self.execute_create_view(stmt, ctx),
        Statement::DropView(stmt) => self.execute_drop_view(stmt, ctx),
        Statement::CreateColumnarIndex(stmt) => self.execute_create_columnar_index(stmt, ctx),
        Statement::DropColumnarIndex(stmt) => self.execute_drop_columnar_index(stmt, ctx),
        Statement::CreateSchema(stmt) => self.execute_create_schema(stmt, ctx),
        Statement::DropSchema(stmt) => self.execute_drop_schema(stmt, ctx),
        Statement::UseSchema(stmt) => self.execute_use_schema(stmt, ctx),

        // Data modification and queries.
        Statement::Insert(stmt) => self.execute_insert(stmt, ctx),
        Statement::Update(stmt) => self.execute_update(stmt, ctx),
        Statement::Delete(stmt) => self.execute_delete(stmt, ctx),
        Statement::Truncate(stmt) => self.execute_truncate(stmt, ctx),
        Statement::Select(stmt) => self.execute_select(stmt, ctx),

        // Transaction control and session settings.
        Statement::Begin(stmt) => self.execute_begin(stmt, ctx),
        Statement::Commit(stmt) => self.execute_commit_stmt(stmt, ctx),
        Statement::Rollback(stmt) => self.execute_rollback_stmt(stmt, ctx),
        Statement::Savepoint(stmt) => self.execute_savepoint(stmt, ctx),
        Statement::Set(stmt) => self.execute_set(stmt, ctx),

        // Introspection.
        Statement::ShowTables(stmt) => self.execute_show_tables(stmt, ctx),
        Statement::ShowViews(stmt) => self.execute_show_views(stmt, ctx),
        Statement::ShowCreateTable(stmt) => self.execute_show_create_table(stmt, ctx),
        Statement::ShowCreateView(stmt) => self.execute_show_create_view(stmt, ctx),
        Statement::ShowIndexes(stmt) => self.execute_show_indexes(stmt, ctx),
        Statement::Describe(stmt) => self.execute_describe(stmt, ctx),

        // Miscellaneous.
        Statement::Pragma(stmt) => self.execute_pragma(stmt, ctx),
        Statement::Expression(stmt) => self.execute_expression_stmt(stmt, ctx),
        Statement::Explain(stmt) => self.execute_explain(stmt, ctx),
        Statement::Analyze(stmt) => self.execute_analyze(stmt, ctx),
    }
}
/// Starts a new transaction directly on the engine and hands it to the
/// caller; the executor's `active_transaction` slot is not touched here.
pub fn begin_transaction(&self) -> Result<Box<dyn Transaction>> {
self.engine.begin_transaction()
}
/// Starts a new transaction on the engine and applies `isolation` to it.
///
/// The executor's `active_transaction` slot is not touched; the transaction
/// is handed to the caller.
///
/// # Errors
///
/// Returns an error if the engine cannot start a transaction, or if the
/// requested isolation level cannot be applied. In the latter case the new
/// transaction is dropped instead of being returned at a different
/// isolation level than the caller asked for.
pub fn begin_transaction_with_isolation(
    &self,
    isolation: crate::core::IsolationLevel,
) -> Result<Box<dyn Transaction>> {
    let mut tx = self.engine.begin_transaction()?;
    // A failure here used to be discarded, silently yielding a transaction
    // at the wrong isolation level; surface it to the caller instead.
    tx.set_isolation_level(isolation)?;
    Ok(tx)
}
}
/// Scans a statement for parameter placeholders.
///
/// Returns `(has_params, param_count)`:
///
/// * `has_params` is `true` when at least one placeholder was found.
/// * `param_count` is the highest placeholder index seen, or `0` when any
///   placeholder with index `0` (an un-numbered placeholder) was found.
///
/// Only these parts are inspected: for `SELECT`, the column list, table
/// expression, `WHERE`, `GROUP BY` columns and `HAVING`; for `INSERT`, the
/// value rows; for `UPDATE`, the assigned values and `WHERE`; for `DELETE`,
/// the `WHERE`. Placeholders anywhere else in a statement, and in any other
/// statement kind, are not counted by this function.
fn count_parameters(stmt: &Statement) -> (bool, usize) {
    use crate::parser::ast::*;

    /// Accumulates what has been seen while walking the AST.
    #[derive(Default)]
    struct ParamScan {
        /// Largest non-zero placeholder index encountered.
        highest_index: usize,
        /// Whether a placeholder with index `0` was encountered.
        saw_unnumbered: bool,
    }

    impl ParamScan {
        /// Walks one expression tree, recording every placeholder in it.
        fn scan_expr(&mut self, expr: &Expression) {
            match expr {
                Expression::Parameter(param) => match param.index {
                    0 => self.saw_unnumbered = true,
                    idx => self.highest_index = self.highest_index.max(idx),
                },

                // Operators.
                Expression::Prefix(prefix) => self.scan_expr(&prefix.right),
                Expression::Infix(infix) => {
                    self.scan_expr(&infix.left);
                    self.scan_expr(&infix.right);
                }
                Expression::In(in_expr) => {
                    self.scan_expr(&in_expr.left);
                    self.scan_expr(&in_expr.right);
                }
                Expression::Between(between) => {
                    self.scan_expr(&between.expr);
                    self.scan_expr(&between.lower);
                    self.scan_expr(&between.upper);
                }

                // Wrappers around a single inner expression.
                Expression::Cast(cast) => self.scan_expr(&cast.expr),
                Expression::Aliased(aliased) => self.scan_expr(&aliased.expression),

                // Calls: only the call arguments are walked.
                Expression::FunctionCall(func) => {
                    for argument in &func.arguments {
                        self.scan_expr(argument);
                    }
                }
                Expression::Window(window) => {
                    for argument in &window.function.arguments {
                        self.scan_expr(argument);
                    }
                }

                Expression::Case(case) => {
                    if let Some(subject) = &case.value {
                        self.scan_expr(subject);
                    }
                    for branch in &case.when_clauses {
                        self.scan_expr(&branch.condition);
                        self.scan_expr(&branch.then_result);
                    }
                    if let Some(fallback) = &case.else_value {
                        self.scan_expr(fallback);
                    }
                }

                // Nested queries.
                Expression::ScalarSubquery(subq) => self.scan_select(&subq.subquery),
                Expression::Exists(exists) => self.scan_select(&exists.subquery),

                // Collections of expressions.
                Expression::List(list) => {
                    for element in &list.elements {
                        self.scan_expr(element);
                    }
                }
                Expression::ExpressionList(list) => {
                    for element in &list.expressions {
                        self.scan_expr(element);
                    }
                }

                // Every other expression kind is not descended into.
                _ => {}
            }
        }

        /// Walks the inspected clauses of a `SELECT` statement.
        fn scan_select(&mut self, select: &SelectStatement) {
            for column in &select.columns {
                self.scan_expr(column);
            }
            if let Some(source) = &select.table_expr {
                self.scan_expr(source);
            }
            if let Some(predicate) = &select.where_clause {
                self.scan_expr(predicate);
            }
            for key in &select.group_by.columns {
                self.scan_expr(key);
            }
            if let Some(predicate) = &select.having {
                self.scan_expr(predicate);
            }
        }
    }

    let mut scan = ParamScan::default();

    match stmt {
        Statement::Select(select) => scan.scan_select(select),
        Statement::Insert(insert) => {
            for value in insert.values.iter().flatten() {
                scan.scan_expr(value);
            }
        }
        Statement::Update(update) => {
            for assigned in update.updates.values() {
                scan.scan_expr(assigned);
            }
            if let Some(predicate) = &update.where_clause {
                scan.scan_expr(predicate);
            }
        }
        Statement::Delete(delete) => {
            if let Some(predicate) = &delete.where_clause {
                scan.scan_expr(predicate);
            }
        }
        _ => {}
    }

    let ParamScan {
        highest_index,
        saw_unnumbered,
    } = scan;

    if saw_unnumbered {
        // Un-numbered placeholders give no usable count, so report zero.
        (true, 0)
    } else {
        (highest_index > 0, highest_index)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::mvcc::engine::MVCCEngine;

    /// DDL shared by the tests that need a `users` table.
    const CREATE_USERS: &str = "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)";
    /// Seeds two rows into `users`.
    const INSERT_ALICE_AND_BOB: &str =
        "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')";
    /// Parameterised lookup by primary key.
    const SELECT_USER_BY_ID: &str = "SELECT * FROM users WHERE id = $1";

    /// Builds an executor backed by an opened in-memory engine.
    fn create_test_executor() -> Executor {
        let engine = MVCCEngine::in_memory();
        engine.open_engine().unwrap();
        Executor::new(Arc::new(engine))
    }

    /// A new executor exposes the built-in functions.
    #[test]
    fn test_executor_creation() {
        let executor = create_test_executor();
        let registry = executor.function_registry();
        assert!(registry.exists("COUNT"));
        assert!(registry.exists("UPPER"));
    }

    /// Empty SQL text yields a result without columns.
    #[test]
    fn test_empty_program() {
        let executor = create_test_executor();
        let result = executor.execute("").unwrap();
        assert_eq!(result.columns().len(), 0);
    }

    /// `CREATE TABLE` reports zero affected rows.
    #[test]
    fn test_create_table() {
        let executor = create_test_executor();
        let created = executor.execute(CREATE_USERS).unwrap();
        assert_eq!(created.rows_affected(), 0);
    }

    /// An inserted row can be read back with `SELECT *`.
    #[test]
    fn test_insert_and_select() {
        let executor = create_test_executor();
        executor.execute(CREATE_USERS).unwrap();

        let inserted = executor
            .execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")
            .unwrap();
        assert_eq!(inserted.rows_affected(), 1);

        let mut rows = executor.execute("SELECT * FROM users").unwrap();
        assert_eq!(rows.columns().len(), 2);

        assert!(rows.next());
        let first = rows.row();
        assert_eq!(first.get(0), Some(&Value::Integer(1)));
        assert_eq!(first.get(1), Some(&Value::text("Alice")));
        assert!(!rows.next());
    }

    /// A positional parameter filters the result set.
    #[test]
    fn test_parameterized_query() {
        let executor = create_test_executor();
        executor.execute(CREATE_USERS).unwrap();
        executor.execute(INSERT_ALICE_AND_BOB).unwrap();

        let mut rows = executor
            .execute_with_params(SELECT_USER_BY_ID, &[Value::Integer(1)])
            .unwrap();
        assert!(rows.next());
        assert_eq!(rows.row().get(0), Some(&Value::Integer(1)));
        assert!(!rows.next());
    }

    /// The first execution adds a cache entry; repeating it does not.
    #[test]
    fn test_query_cache_basic() {
        let executor = create_test_executor();
        executor.execute(CREATE_USERS).unwrap();
        executor
            .execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")
            .unwrap();

        let size_before_select = executor.cache_stats().size;
        executor.execute("SELECT * FROM users").unwrap();
        let size_after_select = executor.cache_stats().size;
        assert!(size_after_select > size_before_select);

        let size_before_repeat = executor.cache_stats().size;
        executor.execute("SELECT * FROM users").unwrap();
        let size_after_repeat = executor.cache_stats().size;
        assert_eq!(size_before_repeat, size_after_repeat);
    }

    /// A cached parameterised statement honours the parameters of each call.
    #[test]
    fn test_query_cache_parameterized() {
        let executor = create_test_executor();
        executor.execute(CREATE_USERS).unwrap();
        executor.execute(INSERT_ALICE_AND_BOB).unwrap();

        let mut first_run = executor
            .execute_with_params(SELECT_USER_BY_ID, &[Value::Integer(1)])
            .unwrap();
        assert!(first_run.next());
        assert_eq!(first_run.row().get(0), Some(&Value::Integer(1)));

        let mut second_run = executor
            .execute_with_params(SELECT_USER_BY_ID, &[Value::Integer(2)])
            .unwrap();
        assert!(second_run.next());
        assert_eq!(second_run.row().get(0), Some(&Value::Integer(2)));
    }

    /// Clearing the cache removes every entry.
    #[test]
    fn test_query_cache_clear() {
        let executor = create_test_executor();
        executor.execute("SELECT 1").unwrap();
        executor.execute("SELECT 2").unwrap();
        assert!(executor.cache_stats().size > 0);

        executor.clear_cache();
        assert_eq!(executor.cache_stats().size, 0);
    }

    /// Re-running a query must not grow the cache.
    ///
    /// NOTE(review): both SQL strings below are byte-identical, so this only
    /// exercises exact-match caching. If the test is meant to cover
    /// whitespace normalisation, the second string should differ in
    /// whitespace — confirm the intended input.
    #[test]
    fn test_query_cache_whitespace_normalization() {
        let executor = create_test_executor();
        executor.execute("SELECT 1").unwrap();
        let size_after_first = executor.cache_stats().size;

        executor.execute("SELECT 1").unwrap();
        assert_eq!(executor.cache_stats().size, size_after_first);
    }
}